Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
428 changes: 428 additions & 0 deletions examples/tool_safety/README.md

Large diffs are not rendered by default.

6 changes: 6 additions & 0 deletions examples/tool_safety/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Tencent is pleased to support the open source community by making trpc-agent-python available.
#
# Copyright (C) 2026 Tencent. All rights reserved.
#
# trpc-agent-python is licensed under the Apache License Version 2.0
"""Tool Script Safety Guard example package."""
84 changes: 84 additions & 0 deletions examples/tool_safety/safety/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
# Tencent is pleased to support the open source community by making trpc-agent-python available.
#
# Copyright (C) 2026 Tencent. All rights reserved.
#
# trpc-agent-python is licensed under the Apache License Version 2.0
"""Tool Script Safety Guard.

A pluggable pre-execution safety scanner for Tool / Skill / CodeExecutor
scripts. Scans Python and Bash content for dangerous file ops, network
egress, process spawning, dependency installs, resource abuse, and secret
leakage, then emits allow / deny / needs_human_review decisions plus
structured reports and audit events.

Public API::

from examples.tool_safety.safety import (
PolicyConfig, SafetyScanner, ToolSafetyFilter, AuditLogger,
Decision, RiskLevel, SafetyReport, SafetyFinding, ScanInput,
)
"""
from __future__ import annotations

from .audit import AuditLogger
from .audit import emit_telemetry
from .policy import PolicyConfig
from .rules import default_rules
from .rules.base import SafetyRule
from .scanner import SCANNER_VERSION
from .scanner import SafetyScanner
from .scanner import register_custom_rule
from .types import Decision
from .types import max_risk_level
from .types import RiskLevel
from .types import SafetyFinding
from .types import SafetyReport
from .types import ScanInput

# SDK-bound integration layers. Imported lazily so the core scanner works even
# when the full tRPC-Agent SDK dependency tree (e.g. google-genai) is absent.
# tool_filter and wrapper are imported independently so one failing does not
# disable the other.
_SDK_AVAILABLE = False
try: # pragma: no cover - exercised only when SDK is importable
from .tool_filter import ToolSafetyFilter
_SDK_AVAILABLE = True
except Exception: # pylint: disable=broad-except
ToolSafetyFilter = None # type: ignore[assignment]

try: # pragma: no cover
from .wrapper import SafeCodeExecutor
from .wrapper import wrap_tool
from .wrapper import safety_wrapper
from .wrapper import SafetyDeniedError
from .wrapper import SafetyReviewedSkillRunner
except Exception: # pylint: disable=broad-except
SafeCodeExecutor = None # type: ignore[assignment]
wrap_tool = None # type: ignore[assignment]
safety_wrapper = None # type: ignore[assignment]
SafetyDeniedError = None # type: ignore[assignment]
SafetyReviewedSkillRunner = None # type: ignore[assignment]

__all__ = [
"AuditLogger",
"emit_telemetry",
"PolicyConfig",
"default_rules",
"SafetyRule",
"SCANNER_VERSION",
"SafetyScanner",
"register_custom_rule",
"ToolSafetyFilter",
"Decision",
"max_risk_level",
"RiskLevel",
"SafetyFinding",
"SafetyReport",
"ScanInput",
"SafeCodeExecutor",
"wrap_tool",
"safety_wrapper",
"SafetyDeniedError",
"SafetyReviewedSkillRunner",
"_SDK_AVAILABLE",
]
86 changes: 86 additions & 0 deletions examples/tool_safety/safety/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Tencent is pleased to support the open source community by making trpc-agent-python available.
#
# Copyright (C) 2026 Tencent. All rights reserved.
#
# trpc-agent-python is licensed under the Apache License Version 2.0
"""Audit logging and OpenTelemetry span reporting for safety scans.

Writes one JSONL line per scan decision to an audit file, and sets the
reserved ``tool.safety.*`` span attributes on the current OTel span when
OpenTelemetry is configured in the host process.
"""
from __future__ import annotations

import json
import os
import time
from pathlib import Path
from typing import Any
from typing import Optional

from .types import SafetyReport


class AuditLogger:
"""Append structured audit events to a JSONL file."""

def __init__(self, path: str | Path | None):
self.path = Path(path) if path else None

def log(self, report: SafetyReport, *, script_path: Optional[str] = None, intercepted: bool = False) -> dict[str, Any]:
"""Emit one audit record. Safe to call when *path* is None (no-op)."""
record = self._build_record(report, script_path=script_path, intercepted=intercepted)
if self.path is not None:
self.path.parent.mkdir(parents=True, exist_ok=True)
with self.path.open("a", encoding="utf-8") as fh:
fh.write(json.dumps(record, ensure_ascii=False) + "\n")
_emit_telemetry(report)
return record

@staticmethod
def _build_record(report: SafetyReport, *, script_path: Optional[str], intercepted: bool) -> dict[str, Any]:
return {
"timestamp": time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime()),
"tool_name": report.tool_name,
"decision": report.decision.value,
"risk_level": report.risk_level.value,
"rule_ids": report.rule_ids,
"scan_duration_ms": round(report.scan_duration_ms, 3),
"sanitized": report.sanitized,
"intercepted": intercepted,
"blocked": report.blocked,
"scanner_version": report.scanner_version,
"language": report.language,
"script_path": script_path,
"findings_count": len(report.findings),
}


def emit_telemetry(report: SafetyReport) -> None:
"""Set ``tool.safety.*`` span attributes on the current OTel span.

Public alias of the internal helper; safe to call when OTel is absent.
"""
_emit_telemetry(report)


def _emit_telemetry(report: SafetyReport) -> None:
"""Best-effort span attribute injection. No-op when OTel is unavailable."""
try:
from opentelemetry import trace # type: ignore
except ImportError:
return
try:
span = trace.get_current_span()
if span is None or not getattr(span, "is_recording", lambda: False)():
return
span.set_attribute("tool.safety.decision", report.decision.value)
span.set_attribute("tool.safety.risk_level", report.risk_level.value)
span.set_attribute("tool.safety.rule_id", ",".join(report.rule_ids))
span.set_attribute("tool.safety.scan_duration_ms", report.scan_duration_ms)
span.set_attribute("tool.safety.sanitized", report.sanitized)
span.set_attribute("tool.safety.blocked", report.blocked)
span.set_attribute("tool.safety.tool_name", report.tool_name)
except Exception: # pylint: disable=broad-except
# Telemetry must never break the safety path.
return
122 changes: 122 additions & 0 deletions examples/tool_safety/safety/policy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Tencent is pleased to support the open source community by making trpc-agent-python available.
#
# Copyright (C) 2026 Tencent. All rights reserved.
#
# trpc-agent-python is licensed under the Apache License Version 2.0.
"""Policy configuration loading for the Tool Script Safety Guard.

Reads ``tool_safety_policy.yaml`` into a :class:`PolicyConfig` object. The
policy drives every rule: allow-listed domains, forbidden paths, allowed
commands, thresholds, and the deny/review decision boundaries.

Changing the YAML is sufficient to change behavior — no code edits required.
"""
from __future__ import annotations

from dataclasses import dataclass
from dataclasses import field
from pathlib import Path
from typing import Any

import yaml

from .types import Decision
from .types import RiskLevel


@dataclass
class PolicyConfig:
"""In-memory representation of the safety policy.

Attributes:
whitelisted_domains: Domains network access is allowed to (suffix match).
forbidden_paths: Path substrings/regex that must never be touched.
allowed_commands: Bash commands permitted without further scrutiny.
max_timeout_seconds: Hard cap on script execution timeout.
max_output_bytes: Hard cap on captured output size.
max_file_write_bytes: Threshold above which file writes are flagged.
deny_risk_level: Findings at or above this level produce a DENY.
review_risk_level: Findings at or above this level (below deny) produce REVIEW.
secret_patterns: Regex patterns that look like leaked secrets.
disabled_rules: Rule ids to skip entirely.
extra: Free-form per-rule overrides keyed by rule id.
"""
whitelisted_domains: list[str] = field(default_factory=list)
forbidden_paths: list[str] = field(default_factory=list)
allowed_commands: list[str] = field(default_factory=list)
max_timeout_seconds: int = 300
max_output_bytes: int = 10 * 1024 * 1024
max_file_write_bytes: int = 100 * 1024 * 1024
deny_risk_level: RiskLevel = RiskLevel.HIGH
review_risk_level: RiskLevel = RiskLevel.MEDIUM
secret_patterns: list[str] = field(default_factory=list)
disabled_rules: list[str] = field(default_factory=list)
extra: dict[str, Any] = field(default_factory=dict)

@classmethod
def from_dict(cls, data: dict[str, Any]) -> "PolicyConfig":
"""Build a PolicyConfig from a parsed YAML mapping."""
data = data or {}
# Normalize risk levels from strings.
deny_lvl = _parse_risk_level(data.get("deny_risk_level"), RiskLevel.HIGH)
review_lvl = _parse_risk_level(data.get("review_risk_level"), RiskLevel.MEDIUM)

return cls(
whitelisted_domains=list(data.get("whitelisted_domains", []) or []),
forbidden_paths=list(data.get("forbidden_paths", []) or []),
allowed_commands=list(data.get("allowed_commands", []) or []),
max_timeout_seconds=int(data.get("max_timeout_seconds", 300)),
max_output_bytes=int(data.get("max_output_bytes", 10 * 1024 * 1024)),
max_file_write_bytes=int(data.get("max_file_write_bytes", 100 * 1024 * 1024)),
deny_risk_level=deny_lvl,
review_risk_level=review_lvl,
secret_patterns=list(data.get("secret_patterns", []) or []),
disabled_rules=list(data.get("disabled_rules", []) or []),
extra=dict(data.get("extra", {}) or {}),
)

@classmethod
def from_yaml(cls, path: str | Path) -> "PolicyConfig":
"""Load policy from a YAML file on disk."""
text = Path(path).read_text(encoding="utf-8")
data = yaml.safe_load(text) or {}
if not isinstance(data, dict):
raise ValueError(f"policy file {path} must contain a YAML mapping at top level")
return cls.from_dict(data)

def decision_for(self, max_level: RiskLevel) -> Decision:
"""Map an aggregate risk level to a final decision per policy."""
order = _RISK_ORDER
if order[max_level] >= order[self.deny_risk_level]:
return Decision.DENY
if order[max_level] >= order[self.review_risk_level]:
return Decision.NEEDS_HUMAN_REVIEW
return Decision.ALLOW

def is_domain_allowed(self, host: str) -> bool:
"""True when *host* matches any whitelisted suffix (empty list => none allowed)."""
if not self.whitelisted_domains:
# No allow-list configured: deny all network egress by default.
return False
host = (host or "").lower().strip()
return any(host == d or host.endswith("." + d) for d in self.whitelisted_domains)


_RISK_ORDER = {
RiskLevel.NONE: 0,
RiskLevel.LOW: 1,
RiskLevel.MEDIUM: 2,
RiskLevel.HIGH: 3,
RiskLevel.CRITICAL: 4,
}


def _parse_risk_level(value: Any, default: RiskLevel) -> RiskLevel:
if value is None:
return default
if isinstance(value, RiskLevel):
return value
try:
return RiskLevel(str(value).lower())
except ValueError:
return default
39 changes: 39 additions & 0 deletions examples/tool_safety/safety/rules/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# Tencent is pleased to support the open source community by making trpc-agent-python available.
#
# Copyright (C) 2026 Tencent. All rights reserved.
#
# trpc-agent-python is licensed under the Apache License Version 2.0
"""Built-in safety rules registry."""
from __future__ import annotations

from .base import SafetyRule
from .dangerous_files import DangerousFilesRule
from .dependency_install import DependencyInstallRule
from .network import NetworkRule
from .process import ProcessRule
from .resource_abuse import ResourceAbuseRule
from .secret_leak import SecretLeakRule


def default_rules() -> list[SafetyRule]:
"""Return the default ordered set of built-in safety rules."""
return [
DangerousFilesRule(),
NetworkRule(),
ProcessRule(),
DependencyInstallRule(),
ResourceAbuseRule(),
SecretLeakRule(),
]


__all__ = [
"SafetyRule",
"DangerousFilesRule",
"NetworkRule",
"ProcessRule",
"DependencyInstallRule",
"ResourceAbuseRule",
"SecretLeakRule",
"default_rules",
]
Loading
Loading