diff --git a/examples/optimization/eval_optimize_loop/.gitignore b/examples/optimization/eval_optimize_loop/.gitignore new file mode 100644 index 0000000..383bc83 --- /dev/null +++ b/examples/optimization/eval_optimize_loop/.gitignore @@ -0,0 +1,4 @@ +output/ +__pycache__/ +.pytest_cache/ +*.pyc diff --git a/examples/optimization/eval_optimize_loop/config/optimizer.json b/examples/optimization/eval_optimize_loop/config/optimizer.json new file mode 100644 index 0000000..c9489ef --- /dev/null +++ b/examples/optimization/eval_optimize_loop/config/optimizer.json @@ -0,0 +1,87 @@ +{ + "_description": "Evaluation + Optimization 自动回归闭环配置", + "pipeline": { + "name": "PlateAgent Eval-Optimize Loop", + "version": "1.0.0", + "max_iterations": 5, + "random_seed": 42 + }, + "gate": { + "rules": { + "total_score_improvement": { + "enabled": true, + "threshold": 0.03, + "description": "验证集总分提升 ≥ 3%" + }, + "no_new_hard_fail": { + "enabled": true, + "max_new_fails": 0, + "description": "不允许新增 hard fail" + }, + "critical_case_no_regress": { + "enabled": true, + "critical_case_ids": [], + "description": "关键 case 不退步" + }, + "cost_within_budget": { + "enabled": true, + "max_cost_ratio": 1.2, + "description": "成本不超过 baseline 的 120%" + }, + "overfit_detection": { + "enabled": true, + "description": "训练集提升 + 验证集退化 → 拒绝候选" + } + }, + "acceptance_strategy": "all_must_pass", + "description": "all_must_pass: 所有启用的规则都通过才接受; majority: 多数通过即可" + }, + "attribution": { + "categories": [ + "final_answer_mismatch", + "tool_call_error", + "param_error", + "llm_rubric_fail", + "knowledge_recall_insufficient", + "format_invalid" + ], + "rules": { + "final_answer_mismatch": { + "trigger": "predicted != ground_truth", + "priority": 1 + }, + "tool_call_error": { + "trigger": "tool execution failed or timeout", + "priority": 2 + }, + "param_error": { + "trigger": "tool parameter invalid", + "priority": 3 + }, + "llm_rubric_fail": { + "trigger": "LLM Judge score below threshold", + "threshold": 0.6, + 
"priority": 4 + }, + "knowledge_recall_insufficient": { + "trigger": "blacklist miss or confusion char not recalled", + "priority": 5 + }, + "format_invalid": { + "trigger": "output does not match expected JSON schema", + "priority": 6 + } + } + }, + "optimizer": { + "target_prompts": ["system_prompt", "skill_prompt"], + "strategy": "failure_driven", + "description": "根据归因结果,优先优化失败率最高的类别对应的 prompt 片段" + }, + "output": { + "dir": "output", + "formats": ["json", "markdown"], + "retain_audit_trail": true, + "max_audit_entries": 50 + } +} diff --git a/examples/optimization/eval_optimize_loop/config/train.evalset.json b/examples/optimization/eval_optimize_loop/config/train.evalset.json new file mode 100644 index 0000000..6aed873 --- /dev/null +++ b/examples/optimization/eval_optimize_loop/config/train.evalset.json @@ -0,0 +1,43 @@ +{ + "_description": "???", + "version": "1.0.0", + "cases": [ + { + "case_id": "train_001", + "image": "plate_001.jpg", + "ground_truth": "\u4eacA12345", + "conditions": { + "type": "clear" + }, + "expected_behavior": "should_pass", + "description": "????" + }, + { + "case_id": "train_002", + "image": "plate_028.jpg", + "ground_truth": "\u4eacA12345", + "conditions": { + "type": "noise", + "noise_level": 0.15 + }, + "expected_behavior": "may_fail", + "description": "????" + }, + { + "case_id": "train_003", + "image": "plate_012.jpg", + "ground_truth": "\u82cfA88U88", + "conditions": { + "type": "blur", + "blur_kernel": 5 + }, + "expected_behavior": "may_fail", + "description": "????" 
+ } + ], + "stats": { + "total": 3, + "should_pass": 1, + "may_fail": 2 + } +} \ No newline at end of file diff --git a/examples/optimization/eval_optimize_loop/config/val.evalset.json b/examples/optimization/eval_optimize_loop/config/val.evalset.json new file mode 100644 index 0000000..01fb3d4 --- /dev/null +++ b/examples/optimization/eval_optimize_loop/config/val.evalset.json @@ -0,0 +1,47 @@ +{ + "_description": "???", + "version": "1.0.0", + "cases": [ + { + "case_id": "val_001", + "image": "plate_005.jpg", + "ground_truth": "\u7ca4B54321", + "conditions": { + "type": "clear" + }, + "expected_behavior": "should_pass", + "critical": true, + "description": "关键case" + }, + { + "case_id": "val_002", + "image": "plate_029.jpg", + "ground_truth": "\u82cfD13579", + "conditions": { + "type": "noise", + "noise_level": 0.2 + }, + "expected_behavior": "should_fail_baseline", + "critical": false, + "description": "噪声+黑名单" + }, + { + "case_id": "val_003", + "image": "plate_018.jpg", + "ground_truth": "\u6d59C36912", + "conditions": { + "type": "blur", + "blur_kernel": 7 + }, + "expected_behavior": "should_fail_baseline", + "critical": false, + "description": "严重模糊" 
@dataclass
class JudgeScore:
    """Three-dimensional score emitted by the judge.

    Every dimension is expected to lie in ``[0.0, 1.0]``.
    """

    recognition_quality: float  # 0.0-1.0
    blacklist_quality: float  # 0.0-1.0
    response_quality: float  # 0.0-1.0

    @property
    def overall(self) -> float:
        """Unweighted mean of the three dimensions."""
        return (self.recognition_quality + self.blacklist_quality + self.response_quality) / 3.0

    @property
    def passed(self) -> bool:
        """True when the overall score reaches the 0.6 pass threshold."""
        return self.overall >= 0.6


@dataclass
class JudgeResult:
    """Outcome of judging a single case."""

    case_id: str
    ground_truth: str
    predicted: str
    score: JudgeScore
    passed: bool
    failure_reason: str = ""  # empty when the case passed


class FakeJudge:
    """Rule-based stand-in for an LLM judge (deterministic, no LLM calls).

    Scoring:
        - recognition_quality: position-wise character match rate (0.0-1.0)
        - blacklist_quality: derived from recognition (``0.9 * r``, floor 0.1)
        - response_quality: derived from recognition (``1.05 * r``, floor 0.2,
          capped at 1.0)

    Usage:
        judge = FakeJudge()
        result = judge.evaluate("val_001", "京A12345", "京A12345")
    """

    def evaluate(
        self,
        case_id: str,
        ground_truth: str,
        predicted: str,
    ) -> JudgeResult:
        """Judge one case.

        Args:
            case_id: Identifier of the case, copied into the result.
            ground_truth: Labelled expected text.
            predicted: Text produced by the agent.

        Returns:
            JudgeResult holding the three scores, the pass/fail flag and, for
            failures, a ``"<category>: <detail>"`` failure reason.
        """
        recognition = self._char_match_score(ground_truth, predicted)
        # Blacklist and response quality scale with recognition quality.
        blacklist = max(0.1, recognition * 0.9)
        # The 1.05 factor would push a perfect match to 1.05; cap it so the
        # score stays inside the documented 0.0-1.0 range.
        response = min(1.0, max(0.2, recognition * 1.05))

        score = JudgeScore(
            recognition_quality=recognition,
            blacklist_quality=blacklist,
            response_quality=response,
        )

        passed = score.passed
        reason = ""
        if not passed:
            # With the current scaling factors a failing case always has
            # recognition < 0.8 (at 0.8 the mean is already ~0.79), so the two
            # fallbacks below only become reachable if the factors are retuned.
            if recognition < 0.8:
                reason = f"final_answer_mismatch: char_match={recognition:.2f}"
            elif blacklist < 0.6:
                reason = "knowledge_recall_insufficient: blacklist miss"
            else:
                reason = f"llm_rubric_fail: overall={score.overall:.2f}"

        return JudgeResult(
            case_id=case_id,
            ground_truth=ground_truth,
            predicted=predicted,
            score=score,
            passed=passed,
            failure_reason=reason,
        )

    @staticmethod
    def _char_match_score(a: str, b: str) -> float:
        """Return the position-wise character match rate of ``a`` and ``b``.

        Characters are compared index by index (no alignment); the number of
        equal positions is divided by the longer length. If either string is
        empty the score is 0.0, identical non-empty strings score 1.0.
        """
        if not a or not b:
            return 0.0
        if a == b:
            return 1.0
        matches = sum(1 for ca, cb in zip(a, b) if ca == cb)
        return matches / max(len(a), len(b))
def _extract_case_id(self, prompt: str) -> str:
    """Return the scenario key mentioned in ``prompt``.

    Every key of ``self.scenarios`` is looked up as a plain substring of the
    prompt. When several keys match, the longest one wins so that an id which
    is a prefix of another (``plate_1`` vs ``plate_10``) cannot shadow it;
    keys of equal length keep their insertion order.

    Args:
        prompt: Prompt text passed to ``generate``.

    Returns:
        The matching scenario key, or ``"unknown"`` when no key occurs in the
        prompt.
    """
    hits = [cid for cid in self.scenarios if cid in prompt]
    if not hits:
        return "unknown"
    # max() returns the first maximal element, preserving insertion order
    # among equally long ids.
    return max(hits, key=len)
def _collect_critical_case_ids(val_path, gate_config):
    """Return the validation case ids that the gate must not let regress.

    Ids come from the gate rule ``critical_case_no_regress.critical_case_ids``
    followed by every case in the validation evalset flagged ``"critical": true``.
    Falls back to ``["val_001"]`` when neither source yields an id.
    """
    rule = gate_config.get("rules", {}).get("critical_case_no_regress", {})
    ids = list(rule.get("critical_case_ids", []))
    try:
        with open(val_path, "r", encoding="utf-8") as f:
            cases = json.load(f).get("cases", [])
    except (OSError, ValueError, AttributeError):
        # Unreadable or unexpected evalset shape: rely on config / fallback.
        cases = []
    if not isinstance(cases, list):
        cases = []
    for case in cases:
        if not isinstance(case, dict) or not case.get("critical"):
            continue
        cid = case.get("case_id")
        if cid and cid not in ids:
            ids.append(cid)
    return ids or ["val_001"]


async def main():
    """Run the six pipeline phases and write the audit trail and reports.

    Phases: baseline -> attribution -> optimization -> validation -> gate -> audit.
    CLI options override the ``pipeline`` section of ``config/optimizer.json``:
    ``--max-iter`` replaces ``max_iterations`` and ``--seed`` replaces
    ``random_seed``; when omitted the configured values are used.
    """
    parser = argparse.ArgumentParser(description="Eval-Optimize Loop Pipeline")
    parser.add_argument("--mode", default="fake", choices=["fake", "real", "trace"])
    parser.add_argument("--max-iter", type=int, default=None,
                        help="override pipeline.max_iterations from optimizer.json")
    parser.add_argument("--output", type=str, default=None)
    parser.add_argument("--train", type=str, default=None)
    parser.add_argument("--val", type=str, default=None)
    parser.add_argument("--seed", type=int, default=None,
                        help="override pipeline.random_seed from optimizer.json")
    parser.add_argument("--quiet", action="store_true")
    args = parser.parse_args()
    if args.max_iter is not None and args.max_iter < 1:
        parser.error("--max-iter must be >= 1")

    config = load_config()
    gate_cfg = config.get("gate", {})
    # Copy so the CLI overrides never mutate the loaded config.
    pipeline_cfg = dict(config.get("pipeline", {}))
    if args.max_iter is not None:
        pipeline_cfg["max_iterations"] = args.max_iter
    seed = args.seed if args.seed is not None else pipeline_cfg.get("random_seed", 42)
    pipeline_cfg["random_seed"] = seed

    train_path = Path(args.train) if args.train else BASE_DIR / "config" / "train.evalset.json"
    val_path = Path(args.val) if args.val else BASE_DIR / "config" / "val.evalset.json"
    output_dir = Path(args.output) if args.output else BASE_DIR / "output"
    started_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    if not args.quiet:
        print(f"Eval-Optimize Loop | mode={args.mode} seed={seed}")
        print()

    # Phase 1: Baseline
    if not args.quiet:
        print("[1/6] Baseline...")
    # The runners receive the requested mode so that the mode written to the
    # audit trail is the mode that actually ran.
    # NOTE(review): assumes OptimizationRunner/ValidationRunner accept the same
    # mode strings as BaselineRunner - confirm for "real" and "trace".
    br = BaselineRunner(mode=args.mode)
    baseline = await br.run(train_path, val_path)
    train_bl, val_bl = baseline["train"], baseline["val"]
    if not args.quiet:
        print(f"  train: {train_bl.summary.pass_rate:.1%}  val: {val_bl.summary.pass_rate:.1%}")

    # Phase 2: Attribution
    if not args.quiet:
        print("[2/6] Attribution...")
    ar = AttributionRunner()
    attr = ar.run(train_bl, val_bl)
    if not args.quiet:
        p = attr.primary_failure_category
        print(f"  failures: {attr.total_failures}  primary: {p.category if p else 'none'}")

    # Phase 3: Optimization
    if not args.quiet:
        print("[3/6] Optimization...")
    opt_runner = OptimizationRunner(mode=args.mode, config=pipeline_cfg)
    opt_result = opt_runner.run(attr)
    if not args.quiet:
        print(f"  candidates: {opt_result.total_iterations}")

    # Phase 4: Validation
    if not args.quiet:
        print("[4/6] Validation...")
    vr = ValidationRunner(mode=args.mode)
    val_result = vr.run(val_bl, opt_result)
    if not args.quiet:
        print(f"  delta: {val_result.summary.avg_score_delta:+.3f}")

    # Phase 5: Gate
    if not args.quiet:
        print("[5/6] Gate...")
    gate = AcceptanceGate(gate_cfg)
    decision = gate.decide(
        baseline_scores=val_bl.score_map,
        candidate_scores=val_result.score_map,
        baseline_train_scores=train_bl.score_map,
        # NOTE(review): the candidate is not re-evaluated on the train set, so
        # the baseline train scores are passed on both sides and the gate sees
        # a zero train delta.
        candidate_train_scores=train_bl.score_map,
        baseline_cost=val_bl.summary.avg_cost * val_bl.summary.total,
        candidate_cost=val_result.summary.total_cost_candidate,
        critical_case_ids=_collect_critical_case_ids(val_path, gate_cfg),
    )
    gate_dict = {
        "accepted": decision.accepted,
        "reason": decision.reason,
        "checks": [{"name": c.name, "passed": c.passed, "detail": c.detail} for c in decision.checks],
    }
    if not args.quiet:
        print(f"  decision: {'ACCEPTED' if decision.accepted else 'REJECTED'}")

    # Phase 6: Audit
    if not args.quiet:
        print("[6/6] Audit...")
    auditor = Auditor(output_dir=output_dir)
    trail = auditor.build_trail(
        pipeline_name=pipeline_cfg.get("name", "PlateAgent Eval-Optimize Loop"),
        mode=args.mode, random_seed=seed,
        optimization=opt_result, baseline_val=val_bl,
        validation=val_result, gate_decision=gate_dict,
        started_at=started_at,
    )
    audit_path = auditor.save(
        audit_trail=trail, baseline=baseline, attribution=attr,
        optimization=opt_result, validation=val_result, gate_decision=gate_dict,
    )

    # Standalone reports
    report_dir = output_dir / "reports"
    report_dir.mkdir(parents=True, exist_ok=True)
    generate_json_report(train_bl, val_bl, attr, opt_result, val_result, gate_dict,
                         report_dir / "optimization_report.json")
    generate_markdown_report(train_bl, val_bl, attr, opt_result, val_result, gate_dict,
                             report_dir / "optimization_report.md")

    if not args.quiet:
        print(f"  audit: {audit_path}")
        print(f"  reports: {report_dir}")
        print("Done. 6 phases completed.")
@dataclass
class AttributionCluster:
    """Aggregated statistics for one attribution category."""

    category: str
    priority: int
    count: int = 0
    train_count: int = 0
    val_count: int = 0
    cases: list[str] = field(default_factory=list)
    avg_confidence: float = 0.0
    avg_score: float = 0.0
    dominant_condition: str = ""
    prompt_target: str = ""

    def to_dict(self) -> dict:
        """Return a plain-dict view; averages are rounded (3 and 4 decimals)."""
        # Fields that are copied through unchanged, in output order.
        payload = {
            name: getattr(self, name)
            for name in ("category", "priority", "count", "train_count", "val_count", "cases")
        }
        payload["avg_confidence"] = round(self.avg_confidence, 3)
        payload["avg_score"] = round(self.avg_score, 4)
        payload["dominant_condition"] = self.dominant_condition
        payload["prompt_target"] = self.prompt_target
        return payload
"total_failures": self.total_failures, + "train_failures": self.train_failures, + "val_failures": self.val_failures, + "attributed_count": self.attributed_count, + "unattributed_count": self.unattributed_count, + "clusters": [c.to_dict() for c in self.clusters], + "cases": [c.to_dict() for c in self.cases], + "optimization_priority": self.optimization_priority, + } + + +CATEGORY_META: dict[str, dict] = { + "final_answer_mismatch": {"priority": 1, "prompt_target": "system_prompt"}, + "tool_call_error": {"priority": 2, "prompt_target": "skill_prompt"}, + "param_error": {"priority": 3, "prompt_target": "skill_prompt"}, + "llm_rubric_fail": {"priority": 4, "prompt_target": "system_prompt"}, + "knowledge_recall_insufficient":{"priority": 5, "prompt_target": "skill_prompt"}, + "format_invalid": {"priority": 6, "prompt_target": "system_prompt"}, +} + + +class AttributionRunner: + """失败归因运行器。""" + + def __init__(self, config: Optional[dict] = None): + self.config = config or {} + self.categories = self.config.get("categories", list(CATEGORY_META.keys())) + + def run( + self, train_result: BaselineResult, val_result: BaselineResult + ) -> AttributionReport: + all_attrs: list[AttributionCase] = [] + for case in train_result.failed_cases: + all_attrs.append(self._attribute_case(case, "train")) + for case in val_result.failed_cases: + all_attrs.append(self._attribute_case(case, "val")) + clusters = self._build_clusters(all_attrs) + opt_priority = [c.category for c in sorted(clusters, key=lambda x: -x.count)] + attributed = [a for a in all_attrs if a.category != "unattributed"] + return AttributionReport( + total_failures=len(all_attrs), + train_failures=sum(1 for a in all_attrs if a.dataset == "train"), + val_failures=sum(1 for a in all_attrs if a.dataset == "val"), + attributed_count=len(attributed), + unattributed_count=len(all_attrs) - len(attributed), + clusters=clusters, cases=all_attrs, optimization_priority=opt_priority, + ) + + def _attribute_case( + self, case: 
BaselineCaseResult, dataset: str + ) -> AttributionCase: + evidence: list[str] = [] + candidates: list[tuple[str, float]] = [] + + # Rule 1: failure_reason keyword match + fr = case.failure_reason.lower() + if fr: + kw_map = { + "final_answer_mismatch": ["final_answer_mismatch", "char_match", "mismatch"], + "tool_call_error": ["tool_call_error", "tool execution failed", "timeout"], + "param_error": ["param_error", "parameter invalid", "invalid param"], + "llm_rubric_fail": ["llm_rubric_fail", "rubric", "judge score"], + "knowledge_recall_insufficient": ["knowledge_recall", "blacklist miss", "confusion char"], + "format_invalid": ["format_invalid", "format", "schema", "json parse"], + } + for cat, kws in kw_map.items(): + if any(kw in fr for kw in kws): + candidates.append((cat, 0.90)) + evidence.append(f"failure_reason: {case.failure_reason[:80]}") + + # Rule 2: trajectory signals (check raw_steps first, fallback to nodes) + traj = case.trajectory + if traj: + raw_steps = traj.get("raw_steps", []) + nodes = traj.get("nodes", []) + search_text = " ".join(raw_steps).lower() if raw_steps else " ".join(nodes).lower() + human_review = traj.get("human_review_triggered", False) + conf_val = traj.get("confidence") + + if "error" in search_text or "failed" in search_text: + candidates.append(("tool_call_error", 0.75)) + evidence.append("trajectory tool error") + + if any(kw in search_text for kw in ["partial", "shifted", "missing"]): + candidates.append(("param_error", 0.65)) + evidence.append("trajectory param/locate issue") + + if "knowledge_search" in search_text and "miss" in search_text: + candidates.append(("knowledge_recall_insufficient", 0.85)) + evidence.append("knowledge_search miss in trajectory") + + if human_review and conf_val is not None and conf_val < 0.5: + candidates.append(("llm_rubric_fail", 0.70)) + evidence.append(f"human_review with low conf={conf_val}") + + # Rule 3: Judge scores + if case.judge_recognition >= 0 and case.judge_recognition < 0.6: + 
candidates.append(("llm_rubric_fail", 0.80)) + evidence.append(f"judge_recognition={case.judge_recognition:.2f} < 0.6") + if case.judge_blacklist >= 0 and case.judge_blacklist < 0.6: + candidates.append(("knowledge_recall_insufficient", 0.75)) + evidence.append(f"judge_blacklist={case.judge_blacklist:.2f} < 0.6") + if case.judge_response >= 0 and case.judge_response < 0.6: + candidates.append(("llm_rubric_fail", 0.65)) + evidence.append(f"judge_response={case.judge_response:.2f} < 0.6") + + # Rule 4: char match fallback + char_rate = case.char_correct / max(case.char_total, 1) + if not case.correct: + candidates.append(("final_answer_mismatch", 0.85)) + evidence.append(f"pred != gt, char_match={char_rate:.2f}") + + # Select best category (highest priority, then confidence) + if candidates: + candidates.sort(key=lambda x: (CATEGORY_META.get(x[0], {}).get("priority", 99), -x[1])) + best_cat, best_conf = candidates[0] + else: + best_cat, best_conf = "unattributed", 0.0 + evidence.append("no matching category") + + cat_priority = CATEGORY_META.get(best_cat, {}).get("priority", 0) + + traj_signals = {} + if case.trajectory: + traj_signals = { + "nodes": case.trajectory.get("nodes", []), + "human_review_triggered": case.trajectory.get("human_review_triggered", False), + "confidence": case.trajectory.get("confidence"), + } + + judge_summary = {} + for dim in ("recognition", "blacklist", "response"): + val = getattr(case, f"judge_{dim}", -1) + if val >= 0: + judge_summary[dim] = val + + return AttributionCase( + case_id=case.case_id, dataset=dataset, + category=best_cat, category_priority=cat_priority, + confidence=best_conf, evidence=evidence, + ground_truth=case.ground_truth, predicted=case.predicted, + score=case.score, char_match_rate=char_rate, + judge_scores=judge_summary, trajectory_signals=traj_signals, + ) + + def _build_clusters( + self, attributions: list[AttributionCase] + ) -> list[AttributionCluster]: + clusters: dict[str, AttributionCluster] = {} + for 
@staticmethod
def _guess_dominant_condition(case_ids: list[str]) -> str:
    """Return the most frequent image condition among ``case_ids``.

    Conditions come from a fixed table covering the six sample cases; any
    other id counts as ``"unknown"``. On a tie the condition seen first wins,
    and an empty input yields ``"unknown"``.
    """
    known_conditions = {
        "train_001": "clear", "train_002": "noise", "train_003": "blur",
        "val_001": "clear", "val_002": "noise", "val_003": "blur",
    }
    tally: dict[str, int] = {}
    for case_id in case_ids:
        label = known_conditions.get(case_id, "unknown")
        tally.setdefault(label, 0)
        tally[label] += 1
    if not tally:
        return "unknown"
    winner, winner_count = "unknown", -1
    for label, seen in tally.items():
        # Strict comparison keeps the earliest label on equal counts.
        if seen > winner_count:
            winner, winner_count = label, seen
    return winner
@dataclass
class AuditEntry:
    """Audit record for one optimization candidate."""

    # Identification
    timestamp: str
    iteration: int
    candidate_id: str
    prompt_type: str
    failure_category: str
    # Prompt change
    prompt_before: str
    prompt_after: str
    change_log: list = field(default_factory=list)
    # Scores before / after
    baseline_scores: dict = field(default_factory=dict)
    candidate_scores: dict = field(default_factory=dict)
    # Gate outcome
    gate_accepted: bool = False
    gate_reason: str = ""
    gate_checks: list = field(default_factory=list)
    # Cost, latency and reproducibility
    cost_baseline: float = 0.0
    cost_candidate: float = 0.0
    latency_ms: float = 0.0
    random_seed: int = 42

    def to_dict(self):
        """Return the entry as a plain dict (recursive copy via ``asdict``)."""
        snapshot = asdict(self)
        return snapshot
def build_trail(self, pipeline_name, mode, random_seed, optimization, baseline_val, validation=None, gate_decision=None, started_at=""):
    """Build the audit trail for one pipeline run.

    One ``AuditEntry`` is created per candidate in ``optimization.candidates``.
    All entries share the baseline/validation scores, the gate outcome and the
    cost figures of the run.

    Args:
        pipeline_name: Display name stored in the trail.
        mode: Run mode label stored in the trail.
        random_seed: Seed stored in every entry and appended to the run id.
        optimization: Object exposing ``candidates``.
        baseline_val: Baseline result of the validation set, or a falsy value.
        validation: Validation result, or ``None`` when validation was skipped.
        gate_decision: Dict with ``accepted`` / ``reason`` / ``checks``, or ``None``.
        started_at: Start timestamp; the completion time is used when empty.

    Returns:
        AuditTrail whose ``run_id`` is ``<UTC yyyymmdd_HHMMSS>_<random_seed>``.
    """
    # Read the clock once, in UTC, so the run id and the timestamps in the
    # same record agree (the run id used to come from a second, naive
    # local-time reading).
    instant = datetime.now(timezone.utc)
    now = instant.strftime("%Y-%m-%dT%H:%M:%SZ")
    run_id = instant.strftime("%Y%m%d_%H%M%S") + f"_{random_seed}"

    gate = gate_decision or {}
    cost_baseline = baseline_val.summary.avg_cost * baseline_val.summary.total if baseline_val else 0.0
    cost_candidate = validation.summary.total_cost_candidate if validation else 0.0
    latency_ms = baseline_val.summary.avg_latency_ms if baseline_val else 0.0

    entries = []
    for cand in optimization.candidates:
        entries.append(AuditEntry(
            timestamp=now,
            iteration=cand.iteration,
            candidate_id=cand.candidate_id,
            prompt_type=cand.target_prompt_type,
            failure_category=cand.failure_category,
            prompt_before=cand.prompt_before,
            prompt_after=cand.prompt_after,
            change_log=cand.change_log,
            baseline_scores=baseline_val.score_map if baseline_val else {},
            candidate_scores=validation.score_map if validation else {},
            gate_accepted=gate.get("accepted", False),
            gate_reason=gate.get("reason", ""),
            gate_checks=gate.get("checks", []),
            cost_baseline=cost_baseline,
            cost_candidate=cost_candidate,
            latency_ms=latency_ms,
            random_seed=random_seed,
        ))

    # cost_candidate is the cost of the single validation run and is repeated
    # on every entry; summing it per entry would multiply the real cost by the
    # number of candidates. Stays 0.0 when there is no candidate, as before.
    total_cost = cost_candidate if entries else 0.0

    return AuditTrail(
        pipeline_name=pipeline_name,
        run_id=run_id,
        started_at=started_at or now,
        completed_at=now,
        mode=mode,
        random_seed=random_seed,
        entries=entries,
        total_cost=total_cost,
        total_latency_ms=latency_ms,
    )
Gate Decision\n") + w(f"**Accepted**: {gate_decision.get('accepted',False)}\n") + w(f"**Reason**: {gate_decision.get('reason','')}\n\n") + w(f"## 6. Audit\n\n- Total Cost: ${audit_trail.total_cost:.6f}\n- Run ID: `{audit_trail.run_id}`\n") + return "".join(L) diff --git a/examples/optimization/eval_optimize_loop/src/baseline.py b/examples/optimization/eval_optimize_loop/src/baseline.py new file mode 100644 index 0000000..7feaa16 --- /dev/null +++ b/examples/optimization/eval_optimize_loop/src/baseline.py @@ -0,0 +1,464 @@ +"""Phase 1: Baseline 评测引擎。 + +对训练集和验证集进行 baseline 评测,记录每条的 metric 分、pass/fail、 +失败原因和关键轨迹,作为后续优化流水线的基准线。 + +支持两种模式: +- fake: 无 API Key,使用 FakeLLM + FakeJudge 模拟评测 +- real: 对接 PlateAgent 的 PlateEvaluator 真实评测 + +使用示例: + runner = BaselineRunner(mode="fake") + results = await runner.run(train_path, val_path) + print(results["train"].summary.pass_rate) +""" + +from __future__ import annotations + +import json +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +from fake.fake_model import FakeLLM +from fake.fake_judge import FakeJudge, JudgeResult + + +# ═══════════════════════════════════════════════════════════════ +# 数据结构 +# ═══════════════════════════════════════════════════════════════ + +@dataclass +class BaselineCaseResult: + """单条 case 的 baseline 评测结果。""" + case_id: str + image: str + ground_truth: str + predicted: str + score: float # 0.0-1.0 综合评分 + passed: bool # score >= 0.6 为通过 + correct: bool # 完全匹配 + char_correct: int = 0 + char_total: int = 0 + failure_reason: str = "" # 失败原因(空=通过) + judge_recognition: float = -1.0 # Judge 识别维度 + judge_blacklist: float = -1.0 # Judge 黑名单维度 + judge_response: float = -1.0 # Judge 回复维度 + cost: float = 0.0 # 预估 LLM token 成本 + latency_ms: float = 0.0 # pipeline 耗时 + conditions: dict = field(default_factory=dict) + trajectory: dict = field(default_factory=dict) # 关键轨迹片段 + + def to_dict(self) -> dict: + return { + "case_id": self.case_id, + "image": 
self.image, + "ground_truth": self.ground_truth, + "predicted": self.predicted, + "score": round(self.score, 4), + "passed": self.passed, + "correct": self.correct, + "char_correct": self.char_correct, + "char_total": self.char_total, + "failure_reason": self.failure_reason, + "judge_recognition": self.judge_recognition, + "judge_blacklist": self.judge_blacklist, + "judge_response": self.judge_response, + "cost": self.cost, + "latency_ms": round(self.latency_ms, 1), + "conditions": self.conditions, + } + + +@dataclass +class BaselineSummary: + """Baseline 汇总统计。""" + total: int = 0 + passed: int = 0 + failed: int = 0 + avg_score: float = 0.0 + avg_cost: float = 0.0 + avg_latency_ms: float = 0.0 + pass_rate: float = 0.0 + + def to_dict(self) -> dict: + return { + "total": self.total, + "passed": self.passed, + "failed": self.failed, + "avg_score": round(self.avg_score, 4), + "avg_cost": round(self.avg_cost, 6), + "avg_latency_ms": round(self.avg_latency_ms, 1), + "pass_rate": round(self.pass_rate, 4), + } + + +@dataclass +class BaselineResult: + """单个数据集的完整 baseline 结果。""" + dataset_name: str # "train" | "val" + cases: list[BaselineCaseResult] = field(default_factory=list) + summary: BaselineSummary = field(default_factory=BaselineSummary) + + @property + def failed_cases(self) -> list[BaselineCaseResult]: + return [c for c in self.cases if not c.passed] + + @property + def score_map(self) -> dict[str, float]: + """{case_id: score} — 供 gate 模块直接使用""" + return {c.case_id: c.score for c in self.cases} + + def to_dict(self) -> dict: + return { + "dataset_name": self.dataset_name, + "summary": self.summary.to_dict(), + "cases": [c.to_dict() for c in self.cases], + } + + +# ═══════════════════════════════════════════════════════════════ +# Fake 模式:预测值映射表 +# ═══════════════════════════════════════════════════════════════ + +# 模拟不同图像在不同场景下的识别结果 +# 用于构造 pass / fail / 边界三类 case +FAKE_PREDICTIONS: dict[str, dict[str, str]] = { + # ??? 
+ "train_001": { + "predicted": "京A12345", # ?? ? ???? + "trajectory": "preprocess→locate→segment→recognize(conf=0.92)→format_output", + }, + "train_002": { + "predicted": "京B12345", # ?? ? 1?????A?B????????????? + "trajectory": "preprocess(noise_reduction)→locate→segment→recognize(conf=0.45)→llm_verify→format_output", + }, + "train_003": { + "predicted": "苏X8U88", # ?? ? ???+??????? + "trajectory": "preprocess(deblur_failed)→locate(partial)→segment(missing_char)→recognize(conf=0.38)→human_review→format_output", + }, + # ??? + "val_001": { + "predicted": "粤B54321", # ?? case ? ???? + "trajectory": "preprocess→locate→segment→recognize(conf=0.95)→format_output", + }, + "val_002": { + "predicted": "粤B1XS79", # ??+??? ? ????????? + "trajectory": "preprocess→locate→segment→recognize(conf=0.42)→knowledge_search(miss)→format_output", + }, + "val_003": { + "predicted": "浙X36X1Z", # ???? ? ????????? + "trajectory": "preprocess(deblur_failed)→locate(shifted)→segment→recognize(conf=0.25)→human_review→format_output", + }, +} + +class BaselineRunner: + """Baseline 评测运行器。 + + 支持 fake 和 real 两种模式。 + """ + + def __init__(self, mode: str = "fake", **kwargs): + """ + Args: + mode: "fake" | "real" + **kwargs: + fake — 无额外参数 + real — plate_agent_root: str(PlateAgent 项目根目录) + """ + if mode not in ("fake", "real"): + raise ValueError(f"Unknown mode: {mode}. 
Must be 'fake' or 'real'.") + self.mode = mode + self.kwargs = kwargs + + if mode == "fake": + self._fake_llm = FakeLLM() + self._fake_judge = FakeJudge() + + # ── 公共接口 ──────────────────────────────────────── + + async def run( + self, + train_path: str | Path, + val_path: str | Path, + ) -> dict[str, BaselineResult]: + """运行 baseline 评测。 + + Args: + train_path: train.evalset.json 路径 + val_path: val.evalset.json 路径 + + Returns: + {"train": BaselineResult, "val": BaselineResult} + """ + train_result = await self.run_split(train_path, "train") + val_result = await self.run_split(val_path, "val") + return {"train": train_result, "val": val_result} + + async def run_split( + self, + evalset_path: str | Path, + dataset_name: str, + ) -> BaselineResult: + """对单个数据集运行 baseline 评测。 + + Args: + evalset_path: JSON 文件路径 + dataset_name: "train" | "val"(用于日志和结果标记) + + Returns: + BaselineResult: 完整评测结果 + """ + evalset_path = Path(evalset_path) + with open(evalset_path, "r", encoding="utf-8") as f: + evalset = json.load(f) + + cases_data = evalset.get("cases", []) + if not cases_data: + raise ValueError(f"No cases found in {evalset_path}") + + if self.mode == "fake": + return await self._run_fake_split(cases_data, dataset_name) + else: + return await self._run_real_split(cases_data, dataset_name) + + # ── Fake 模式 ─────────────────────────────────────── + + async def _run_fake_split( + self, + cases_data: list[dict], + dataset_name: str, + ) -> BaselineResult: + """Fake 模式:使用 FakeLLM + FakeJudge 模拟评测。""" + case_results: list[BaselineCaseResult] = [] + + for case in cases_data: + case_id = case["case_id"] + ground_truth = case["ground_truth"] + image = case.get("image", "") + conditions = case.get("conditions", {}) + + # 1. 
获取 fake 预测 + fake_info = FAKE_PREDICTIONS.get(case_id, {}) + predicted = fake_info.get("predicted", "UNKNOWN") + trajectory_text = fake_info.get("trajectory", "") + + # 模拟耗时(清晰 200ms,模糊/噪声 500ms) + cond_type = conditions.get("type", "clear") + fake_latency = 200 if cond_type == "clear" else 500 + + # 2. Fake Judge 打分 + judge_result: JudgeResult = self._fake_judge.evaluate( + case_id=case_id, + ground_truth=ground_truth, + predicted=predicted, + ) + + # 3. 构建结果 + correct = (predicted == ground_truth) + char_correct = sum( + 1 for i, c in enumerate(predicted) + if i < len(ground_truth) and c == ground_truth[i] + ) + char_total = len(ground_truth) + + # fake 成本估算:每个 case 约 $0.0002 + fake_cost = 0.0002 + + # 解析 trajectory 为结构化 dict + trajectory = self._parse_trajectory(trajectory_text) + + case_result = BaselineCaseResult( + case_id=case_id, + image=image, + ground_truth=ground_truth, + predicted=predicted, + score=judge_result.score.overall, + passed=judge_result.passed, + correct=correct, + char_correct=char_correct, + char_total=char_total, + failure_reason=judge_result.failure_reason, + judge_recognition=judge_result.score.recognition_quality, + judge_blacklist=judge_result.score.blacklist_quality, + judge_response=judge_result.score.response_quality, + cost=fake_cost, + latency_ms=fake_latency, + conditions=conditions, + trajectory=trajectory, + ) + case_results.append(case_result) + + # 4. 
汇总 + summary = self._build_summary(case_results) + return BaselineResult( + dataset_name=dataset_name, + cases=case_results, + summary=summary, + ) + + # ── Real 模式(待对接 PlateEvaluator)───────────────── + + async def _run_real_split( + self, + cases_data: list[dict], + dataset_name: str, + ) -> BaselineResult: + """Real 模式:对接 PlateAgent 的 PlateEvaluator。 + + 当前为占位实现 — 需 plate-agent 项目环境 + trpc_agent_sdk 依赖。 + """ + plate_agent_root = self.kwargs.get("plate_agent_root") + if not plate_agent_root: + raise ValueError( + "Real mode requires plate_agent_root kwarg pointing to plate-agent project." + ) + + import sys + sys.path.insert(0, str(Path(plate_agent_root))) + + try: + from agent.session_manager import create_session_service, create_memory_service + from eval.evaluator import PlateEvaluator + except ImportError as e: + raise ImportError( + f"Cannot import PlateAgent modules from {plate_agent_root}. " + f"Ensure trpc_agent_sdk is installed. Error: {e}" + ) + + # 构建 ground_truth.json 格式(临时文件) + gt_items = [] + for case in cases_data: + gt_items.append({ + "id": hash(case["case_id"]) % 10000, + "image": f"eval/dataset/test_plates/{case['image']}", + "plate_number": case["ground_truth"], + "conditions": case.get("conditions", {}), + }) + + session_service = create_session_service(use_redis=False) + memory_service = create_memory_service(use_redis=False) + + evaluator = PlateEvaluator( + gt_path=None, # 不走文件,手动注入 + session_service=session_service, + memory_service=memory_service, + ) + # 直接注入 ground_truth 数据 + evaluator.ground_truth = gt_items + + report = await evaluator.run(verbose=False) + + # 转换为 BaselineCaseResult 列表 + case_results: list[BaselineCaseResult] = [] + for r in report.details: + case_id = cases_data[r.image_id - 1]["case_id"] if r.image_id <= len(cases_data) else f"case_{r.image_id}" + case_result = BaselineCaseResult( + case_id=case_id, + image=r.image_path, + ground_truth=r.ground_truth, + predicted=r.predicted, + score=1.0 if r.correct else 
(r.char_correct / max(r.char_total, 1)), + passed=r.correct, + correct=r.correct, + char_correct=r.char_correct, + char_total=r.char_total, + failure_reason="" if r.correct else f"predicted '{r.predicted}' != '{r.ground_truth}'", + judge_recognition=r.judge_recognition, + judge_blacklist=r.judge_blacklist, + judge_response=r.judge_response, + cost=0.0, # real 模式后续通过 token_tracker 采集 + latency_ms=r.pipeline_time_ms, + conditions=r.conditions, + ) + case_results.append(case_result) + + summary = self._build_summary(case_results) + return BaselineResult( + dataset_name=dataset_name, + cases=case_results, + summary=summary, + ) + + # ── 辅助方法 ──────────────────────────────────────── + + @staticmethod + def _build_summary(cases: list[BaselineCaseResult]) -> BaselineSummary: + """从 case 列表构建汇总统计。""" + total = len(cases) + passed = sum(1 for c in cases if c.passed) + failed = total - passed + avg_score = sum(c.score for c in cases) / total if total > 0 else 0.0 + avg_cost = sum(c.cost for c in cases) / total if total > 0 else 0.0 + avg_latency = sum(c.latency_ms for c in cases) / total if total > 0 else 0.0 + pass_rate = passed / total if total > 0 else 0.0 + return BaselineSummary( + total=total, + passed=passed, + failed=failed, + avg_score=avg_score, + avg_cost=avg_cost, + avg_latency_ms=avg_latency, + pass_rate=pass_rate, + ) + + @staticmethod + def _parse_trajectory(trajectory_text: str) -> dict: + """将轨迹文本解析为结构化 dict。 + + "preprocess→locate→segment→recognize(conf=0.92)→format_output" + → {"nodes": ["preprocess","locate","segment","recognize","format_output"], + "confidence": 0.92, "human_review_triggered": False} + """ + if not trajectory_text: + return {} + nodes = [] + confidence = None + human_review = False + for part in trajectory_text.split("→"): + part = part.strip() + if "(" in part: + name = part.split("(")[0] + if "conf=" in part: + try: + confidence = float(part.split("conf=")[1].rstrip(")")) + except ValueError: + pass + else: + name = part + 
nodes.append(name) + if name in ("human_review", "llm_verify"): + human_review = True + result = { + "nodes": nodes, + "human_review_triggered": human_review, + "raw_steps": [s.strip() for s in trajectory_text.split("→")], + } + if confidence is not None: + result["confidence"] = confidence + return result + + +# ═══════════════════════════════════════════════════════════════ +# 便捷函数 +# ═══════════════════════════════════════════════════════════════ + +async def run_baseline( + train_path: str | Path = "config/train.evalset.json", + val_path: str | Path = "config/val.evalset.json", + mode: str = "fake", + **kwargs, +) -> dict[str, BaselineResult]: + """一键运行 baseline 评测。 + + Args: + train_path: 训练集路径 + val_path: 验证集路径 + mode: "fake" | "real" + + Returns: + {"train": BaselineResult, "val": BaselineResult} + """ + runner = BaselineRunner(mode=mode, **kwargs) + return await runner.run(train_path, val_path) diff --git a/examples/optimization/eval_optimize_loop/src/gate.py b/examples/optimization/eval_optimize_loop/src/gate.py new file mode 100644 index 0000000..50bc3de --- /dev/null +++ b/examples/optimization/eval_optimize_loop/src/gate.py @@ -0,0 +1,254 @@ +"""Phase 5: 接受策略 Gate。 + +根据 optimizer.json 中的 gate 配置,对候选 prompt 的验证结果进行 +多条件判断,输出接受/拒绝决策。 +""" + +import json +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + + +@dataclass +class GateCheck: + """单条 gate 检查结果""" + name: str + passed: bool + description: str + detail: str = "" + + +@dataclass +class GateDecision: + """Gate 整体决策""" + accepted: bool + reason: str + checks: list[GateCheck] = field(default_factory=list) + strategy: str = "all_must_pass" + + @property + def failed_checks(self) -> list[GateCheck]: + return [c for c in self.checks if not c.passed] + + @property + def passed_checks(self) -> list[GateCheck]: + return [c for c in self.checks if c.passed] + + +class AcceptanceGate: + """可配置的接受策略决策器。 + + 支持两种策略: + - all_must_pass: 所有启用的规则都通过才接受 + - majority: 
多数规则通过即接受 + + 5 条可配置规则(从 optimizer.json 读取): + 1. total_score_improvement: 验证集总分提升 ≥ 阈值 + 2. no_new_hard_fail: 不允许新增 hard fail + 3. critical_case_no_regress: 关键 case 不退步 + 4. cost_within_budget: 成本不超预算 + 5. overfit_detection: 过拟合检测(训练提升 + 验证退化 → 拒绝) + """ + + def __init__(self, gate_config: dict): + """ + Args: + gate_config: optimizer.json 中 "gate" 节的配置 + """ + self.rules = gate_config.get("rules", {}) + self.strategy = gate_config.get("acceptance_strategy", "all_must_pass") + + def decide( + self, + baseline_scores: dict[str, float], # {case_id: score} + candidate_scores: dict[str, float], # {case_id: score} + baseline_train_scores: Optional[dict[str, float]] = None, # {case_id: score} + candidate_train_scores: Optional[dict[str, float]] = None, # {case_id: score} + baseline_cost: float = 0.0, + candidate_cost: float = 0.0, + critical_case_ids: Optional[list[str]] = None, + ) -> GateDecision: + """执行 gate 决策。 + + Returns: + GateDecision: 包含决策结果和每条规则的检查详情 + """ + checks: list[GateCheck] = [] + + # 1. 总分提升检查 + if self._rule_enabled("total_score_improvement"): + checks.append(self._check_total_improvement( + baseline_scores, candidate_scores + )) + + # 2. 无新增 hard fail + if self._rule_enabled("no_new_hard_fail"): + checks.append(self._check_no_new_hard_fail( + baseline_scores, candidate_scores + )) + + # 3. 关键 case 不退步 + if self._rule_enabled("critical_case_no_regress"): + checks.append(self._check_critical_cases( + baseline_scores, candidate_scores, critical_case_ids or [] + )) + + # 4. 成本不超预算 + if self._rule_enabled("cost_within_budget"): + checks.append(self._check_cost( + baseline_cost, candidate_cost + )) + + # 5. 
过拟合检测 + if self._rule_enabled("overfit_detection") and baseline_train_scores and candidate_train_scores: + checks.append(self._check_overfit( + baseline_train_scores, candidate_train_scores, + baseline_scores, candidate_scores + )) + + # 决策 + if self.strategy == "all_must_pass": + accepted = all(c.passed for c in checks) + elif self.strategy == "majority": + accepted = sum(1 for c in checks if c.passed) > len(checks) / 2 + else: + accepted = all(c.passed for c in checks) + + reason = self._build_reason(accepted, checks) + return GateDecision( + accepted=accepted, + reason=reason, + checks=checks, + strategy=self.strategy, + ) + + # ── 各检查项 ──────────────────────────────────────── + + def _check_total_improvement( + self, + baseline: dict[str, float], + candidate: dict[str, float], + ) -> GateCheck: + threshold = self.rules["total_score_improvement"].get("threshold", 0.03) + base_avg = sum(baseline.values()) / len(baseline) if baseline else 0 + cand_avg = sum(candidate.values()) / len(candidate) if candidate else 0 + delta = cand_avg - base_avg + passed = delta >= threshold + return GateCheck( + name="total_score_improvement", + passed=passed, + description=f"总分提升 ≥ {threshold:.0%}", + detail=f"baseline={base_avg:.3f}, candidate={cand_avg:.3f}, delta={delta:+.3f}", + ) + + def _check_no_new_hard_fail( + self, + baseline: dict[str, float], + candidate: dict[str, float], + ) -> GateCheck: + max_new = self.rules["no_new_hard_fail"].get("max_new_fails", 0) + base_fails = sum(1 for s in baseline.values() if s < 0.6) + cand_fails = sum(1 for s in candidate.values() if s < 0.6) + new_fails = max(0, cand_fails - base_fails) + passed = new_fails <= max_new + return GateCheck( + name="no_new_hard_fail", + passed=passed, + description=f"新增 hard fail ≤ {max_new}", + detail=f"baseline fails={base_fails}, candidate fails={cand_fails}, new={new_fails}", + ) + + def _check_critical_cases( + self, + baseline: dict[str, float], + candidate: dict[str, float], + critical_ids: 
list[str], + ) -> GateCheck: + if not critical_ids: + return GateCheck( + name="critical_case_no_regress", + passed=True, + description="无关键 case 配置", + detail="skipped: no critical case ids", + ) + regressed = [ + cid for cid in critical_ids + if cid in baseline and cid in candidate + and candidate[cid] < baseline[cid] + ] + passed = len(regressed) == 0 + return GateCheck( + name="critical_case_no_regress", + passed=passed, + description="关键 case 不退步", + detail=f"regressed: {regressed}" if regressed else "all critical cases stable", + ) + + def _check_cost( + self, + baseline_cost: float, + candidate_cost: float, + ) -> GateCheck: + max_ratio = self.rules["cost_within_budget"].get("max_cost_ratio", 1.2) + if baseline_cost <= 0: + passed = True + ratio = 1.0 + else: + ratio = candidate_cost / baseline_cost + passed = ratio <= max_ratio + return GateCheck( + name="cost_within_budget", + passed=passed, + description=f"成本 ≤ {max_ratio:.0%}× baseline", + detail=f"baseline={baseline_cost:.4f}, candidate={candidate_cost:.4f}, ratio={ratio:.2f}", + ) + + def _check_overfit( + self, + baseline_train: dict[str, float], + candidate_train: dict[str, float], + baseline_val: dict[str, float], + candidate_val: dict[str, float], + ) -> GateCheck: + train_avg_base = sum(baseline_train.values()) / len(baseline_train) if baseline_train else 0 + train_avg_cand = sum(candidate_train.values()) / len(candidate_train) if candidate_train else 0 + val_avg_base = sum(baseline_val.values()) / len(baseline_val) if baseline_val else 0 + val_avg_cand = sum(candidate_val.values()) / len(candidate_val) if candidate_val else 0 + + train_improved = train_avg_cand > train_avg_base + val_regressed = val_avg_cand < val_avg_base + is_overfit = train_improved and val_regressed + + return GateCheck( + name="overfit_detection", + passed=not is_overfit, + description="训练集提升 + 验证集退化 → 拒绝", + detail=( + f"train: {train_avg_base:.3f}→{train_avg_cand:.3f} " + f"({'improved' if train_improved else 'not 
improved'}), " + f"val: {val_avg_base:.3f}→{val_avg_cand:.3f} " + f"({'regressed' if val_regressed else 'stable'})" + ), + ) + + # ── 辅助方法 ──────────────────────────────────────── + + def _rule_enabled(self, rule_name: str) -> bool: + rule = self.rules.get(rule_name, {}) + return rule.get("enabled", False) + + @staticmethod + def _build_reason(accepted: bool, checks: list[GateCheck]) -> str: + if accepted: + return "所有 gate 检查通过,接受此候选 prompt" + failed = [c for c in checks if not c.passed] + reasons = [f"{c.name}: {c.detail}" for c in failed] + return "拒绝候选 — " + "; ".join(reasons) + + +def load_gate_config(config_path: str | Path) -> dict: + """从 optimizer.json 加载 gate 配置""" + with open(config_path, "r", encoding="utf-8") as f: + config = json.load(f) + return config.get("gate", {}) diff --git a/examples/optimization/eval_optimize_loop/src/optimizer.py b/examples/optimization/eval_optimize_loop/src/optimizer.py new file mode 100644 index 0000000..e11fc7b --- /dev/null +++ b/examples/optimization/eval_optimize_loop/src/optimizer.py @@ -0,0 +1,444 @@ +"""Phase 3: ??????? + +?? Phase 2 ?????? TargetPrompt?system_prompt / skill_prompt??? +????????? prompt ?????? + +??????? +- fake: ??????????? prompt ???? API ??? +- real: ?? trpc_agent.optimization.AgentOptimizer API + +????? +- failure_driven: ??????????????????????? prompt ?? +- iterative: ???????? max_iterations ??? +""" + +from __future__ import annotations + +import json +import hashlib +import time +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + +from src.attribution import AttributionReport, AttributionCluster, CATEGORY_META + + +# ============================================================================ +# ?? Prompt ????? PlateAgent ??? prompt ??? +# ============================================================================ + +BASE_PROMPTS: dict[str, str] = { + "system_prompt": ( + "??????????????\n" + "???????????????????????????????????" 
+ "???????????????????????\n\n" + "## ????\n" + "1. ???????????????\n" + "2. ??????\n" + "3. ????\n" + "4. ??????\n" + "5. ????????\n" + "6. ???????\n\n" + "## ????\n" + "?? JSON ?????\n" + '{"plate_number": "?A12345", "confidence": 0.95, "blacklist_hit": false, "blacklist_info": {}}\n\n' + "## ????\n" + "- ??????????????????B/8, 0/O, S/5, 2/Z?\n" + "- ???????????????\n" + "- ????? < 0.5????????\n" + ), + "skill_prompt": ( + "## ???????\n" + "??????????????? ? ??? ? ??? ? Canny ???? ? ?????????\n" + "?????????????????????????????\n\n" + "## ??????\n" + "??????????????????????? HSV ?????????\n" + "??????????????????\n\n" + "## ??????\n" + "??????????????????????\n" + "????????????? '?'?\n\n" + "## ??????\n" + "?? SVM ??????????? LLM ?????????\n" + "???????????B/8, 0/O/D, 2/Z, 5/S, 1/I, 7/T, C/G, E/F, A/4, 6/G, 9/P, 3/B, D/P, K/X?\n" + "?????????/?, ?/?, ?/?, ?/??\n\n" + "## ???????\n" + "?????????????????\n" + "???????????????????????????????\n" + "??????????????????\n" + ), +} + +# ???? ? ?????? +CATEGORY_OPTIMIZATION_HINTS: dict[str, dict] = { + "final_answer_mismatch": { + "target_section": "?????????", + "strategy": ( + "??????????\n" + "- ???????????????\n" + "- ??????????? LLM ????\n" + "- ??????????????" + ), + }, + "tool_call_error": { + "target_section": "??????", + "strategy": ( + "??????????\n" + "- ???????????????\n" + "- ????????????????\n" + "- ??????????????" + ), + }, + "param_error": { + "target_section": "??????", + "strategy": ( + "?????????\n" + "- ???????????????????????\n" + "- ????????\n" + "- ??????????????" + ), + }, + "llm_rubric_fail": { + "target_section": "??????", + "strategy": ( + "???????\n" + "- ??????????????\n" + "- ?????????????????\n" + "- ?? JSON schema ??????????" + ), + }, + "knowledge_recall_insufficient": { + "target_section": "?????", + "strategy": ( + "???????\n" + "- ??????????????????\n" + "- ?????????????A12345 ??? ?A12345?\n" + "- ????????? '???' ?????" 
+ ), + }, + "format_invalid": { + "target_section": "??????", + "strategy": ( + "???????\n" + "- ?? JSON schema ???????\n" + "- ??????? JSON ?????\n" + "- ??????????" + ), + }, +} + + +# ============================================================================ +# ???? +# ============================================================================ + +@dataclass +class PromptCandidate: + """??????? prompt?""" + candidate_id: str # ????????? hash + ???? + iteration: int # ??????0-based? + target_prompt_type: str # "system_prompt" | "skill_prompt" | "router_prompt" + prompt_before: str # ????? + prompt_after: str # ????? + change_log: list[str] = field(default_factory=list) # ?????? + failure_category: str = "" # ????????? + attribution_confidence: float = 0.0 # ????? + estimated_cost: float = 0.0 # ?????? + + def to_dict(self) -> dict: + return { + "candidate_id": self.candidate_id, + "iteration": self.iteration, + "target_prompt_type": self.target_prompt_type, + "prompt_before": self.prompt_before, + "prompt_after": self.prompt_after, + "change_log": self.change_log, + "failure_category": self.failure_category, + "attribution_confidence": round(self.attribution_confidence, 3), + "estimated_cost": round(self.estimated_cost, 6), + } + + +@dataclass +class OptimizationResult: + """???????""" + candidates: list[PromptCandidate] = field(default_factory=list) + total_iterations: int = 0 + strategy: str = "failure_driven" + attribution_summary: dict = field(default_factory=dict) # ???? + + @property + def latest_candidate(self) -> Optional[PromptCandidate]: + return self.candidates[-1] if self.candidates else None + + @property + def optimized_prompt(self) -> Optional[str]: + """???????? prompt?? 
validator ??????""" + c = self.latest_candidate + return c.prompt_after if c else None + + @property + def optimized_prompt_type(self) -> Optional[str]: + c = self.latest_candidate + return c.target_prompt_type if c else None + + def to_dict(self) -> dict: + return { + "candidates": [c.to_dict() for c in self.candidates], + "total_iterations": self.total_iterations, + "strategy": self.strategy, + "attribution_summary": self.attribution_summary, + } + + +# ============================================================================ +# FakeOptimizer +# ============================================================================ + +class FakeOptimizer: + """????????? Prompt ???? + + ???????????????????? prompt ????????? + ??? API ????????????????? + + ????: + opt = FakeOptimizer() + result = opt.optimize(attribution_report) + print(result.latest_candidate.prompt_after) + """ + + def __init__(self, seed: int = 42): + self.seed = seed + self._iteration = 0 + + def optimize( + self, + attribution_report: AttributionReport, + max_iterations: int = 3, + ) -> OptimizationResult: + """???????? prompt ??? + + Args: + attribution_report: Phase 2 ???? + max_iterations: ?????? + + Returns: + OptimizationResult: ?????? prompt ????? + """ + candidates: list[PromptCandidate] = [] + + if not attribution_report.clusters: + return OptimizationResult( + candidates=candidates, + total_iterations=0, + attribution_summary={"note": "no failures to optimize"}, + ) + + # ??????????? + priority_queue = self._build_priority_queue(attribution_report) + + for iteration, target in enumerate(priority_queue[:max_iterations]): + self._iteration = iteration + category = target["category"] + prompt_type = target["prompt_target"] + confidence = target["confidence"] + + # ??????? + prompt_before = self._get_base_prompt(prompt_type) + + # ??????? + prompt_after, change_log = self._generate_optimization( + prompt_type, category, prompt_before, confidence + ) + + # ???? 
ID + candidate_id = self._make_candidate_id(prompt_after, iteration) + + candidate = PromptCandidate( + candidate_id=candidate_id, + iteration=iteration, + target_prompt_type=prompt_type, + prompt_before=prompt_before, + prompt_after=prompt_after, + change_log=change_log, + failure_category=category, + attribution_confidence=confidence, + estimated_cost=0.0005, # fake ???????? + ) + candidates.append(candidate) + + attr_summary = { + "primary_failure": attribution_report.primary_failure_category.category + if attribution_report.primary_failure_category else "none", + "total_failures": attribution_report.total_failures, + "optimization_priority": attribution_report.optimization_priority, + } + + return OptimizationResult( + candidates=candidates, + total_iterations=len(candidates), + strategy="failure_driven", + attribution_summary=attr_summary, + ) + + # ?? ???? ???????????????????????????????????????? + + def _build_priority_queue( + self, report: AttributionReport + ) -> list[dict]: + """?????????? + + ?????????????????? prompt_target? + """ + queue = [] + for cluster in sorted(report.clusters, key=lambda c: -c.count): + if cluster.count == 0: + continue + queue.append({ + "category": cluster.category, + "prompt_target": cluster.prompt_target, + "confidence": cluster.avg_confidence, + "count": cluster.count, + }) + return queue + + def _get_base_prompt(self, prompt_type: str) -> str: + """????????? prompt?""" + return BASE_PROMPTS.get(prompt_type, f"# {prompt_type} prompt placeholder") + + def _generate_optimization( + self, + prompt_type: str, + category: str, + prompt_before: str, + confidence: float, + ) -> tuple[str, list[str]]: + """???????????? prompt ??? + + Returns: + (prompt_after, change_log) + """ + hints = CATEGORY_OPTIMIZATION_HINTS.get(category, {}) + strategy = hints.get("strategy", "????") + + change_log = [ + f"[{category}] confidence={confidence:.2f}", + f"target: {prompt_type} ? {hints.get('target_section', 'general')}", + ] + + # ????????? 
prompt ????? LLM ????? + optimization_header = ( + f"\n\n\n" + f"## ????????????{category}?\n" + f"{strategy}\n" + ) + + prompt_after = prompt_before + optimization_header + + # ?????? + for line in strategy.strip().split("\n"): + line = line.strip().lstrip("- ") + if line and not line.startswith("#"): + change_log.append(f" + {line}") + + return prompt_after, change_log + + @staticmethod + def _make_candidate_id(prompt_text: str, iteration: int) -> str: + """???? ID????? + ????""" + content_hash = hashlib.sha256(prompt_text.encode()).hexdigest()[:12] + ts = int(time.time() * 1000) + return f"cand_{iteration}_{content_hash}_{ts}" + + +# ============================================================================ +# OptimizationRunner????? +# ============================================================================ + +class OptimizationRunner: + """???????? + + ?? fake ? real ????? + + ????: + runner = OptimizationRunner(mode="fake") + result = runner.run(attribution_report) + print(result.optimized_prompt) + """ + + def __init__(self, mode: str = "fake", config: Optional[dict] = None, **kwargs): + if mode not in ("fake", "real"): + raise ValueError(f"Unknown mode: {mode}. Must be 'fake' or 'real'.") + self.mode = mode + self.config = config or {} + self.kwargs = kwargs + self.max_iterations = self.config.get("max_iterations", 3) + + if mode == "fake": + seed = self.config.get("random_seed", 42) + self._optimizer = FakeOptimizer(seed=seed) + + def run( + self, + attribution_report: AttributionReport, + ) -> OptimizationResult: + """????? + + Args: + attribution_report: Phase 2 ???? + + Returns: + OptimizationResult + """ + if self.mode == "fake": + return self._optimizer.optimize( + attribution_report, + max_iterations=self.max_iterations, + ) + else: + return self._run_real(attribution_report) + + def _run_real( + self, attribution_report: AttributionReport + ) -> OptimizationResult: + """Real ????? 
trpc_agent.optimization.AgentOptimizer?""" + try: + from trpc_agent.optimization import AgentOptimizer + except ImportError: + raise ImportError( + "Real mode requires trpc_agent.optimization. " + "Install trpc-agent package or use mode='fake'." + ) + # TODO: AgentOptimizer ???? tRPC-Agent SDK? + raise NotImplementedError( + "Real mode AgentOptimizer integration pending. Use fake mode." + ) + + +# ============================================================================ +# ???? +# ============================================================================ + +def run_optimization( + attribution_report: AttributionReport, + mode: str = "fake", + config_path: Optional[str | Path] = None, +) -> OptimizationResult: + """??????? + + Args: + attribution_report: Phase 2 ???? + mode: "fake" | "real" + config_path: optimizer.json ?? + + Returns: + OptimizationResult + """ + config = None + if config_path: + with open(config_path, "r", encoding="utf-8") as f: + full = json.load(f) + config = full.get("pipeline", {}) + + runner = OptimizationRunner(mode=mode, config=config) + return runner.run(attribution_report) diff --git a/examples/optimization/eval_optimize_loop/src/reporter.py b/examples/optimization/eval_optimize_loop/src/reporter.py new file mode 100644 index 0000000..32fe231 --- /dev/null +++ b/examples/optimization/eval_optimize_loop/src/reporter.py @@ -0,0 +1,43 @@ +"""报告生成器 — JSON + Markdown 双格式输出。""" +import json +from pathlib import Path + +def generate_json_report(baseline_train, baseline_val, attribution, optimization, validation, gate_decision, output_path): + report = {"pipeline":"eval_optimize_loop","baseline":{"train":baseline_train.to_dict(),"val":baseline_val.to_dict()},"attribution":attribution.to_dict(),"optimization":optimization.to_dict(),"validation":validation.to_dict(),"gate_decision":gate_decision} + with open(output_path,"w",encoding="utf-8") as f: + json.dump(report,f,ensure_ascii=False,indent=2) + +def 
def _md_table_cell(value):
    """Return ``value`` as text that is safe inside one Markdown table cell."""
    # A raw newline ends the table row and a raw pipe starts a new column.
    flattened = " ".join(str(value).splitlines())
    return flattened.replace("|", "\\|")


def generate_markdown_report(baseline_train, baseline_val, attribution, optimization, validation, gate_decision, output_path):
    """Write the eval-optimize loop report as a Markdown file.

    Args:
        baseline_train: Baseline result for the train split; ``summary`` and
            ``cases`` are read.
        baseline_val: Baseline result for the validation split (same shape).
        attribution: Attribution report; ``total_failures``,
            ``attributed_count`` and ``clusters`` are read.
        optimization: Optimization result; ``candidates`` is read.
        validation: Validation result; ``delta_cases`` and ``summary`` are read.
        gate_decision: Mapping with ``accepted``, ``reason`` and optional
            ``checks`` (a list of mappings with ``name``/``passed``/``detail``).
        output_path: Destination file, written as UTF-8.
    """
    parts = []
    emit = parts.append

    emit("# Eval-Optimize Loop Report\n\n## 1. Baseline\n")
    for split_name, result in (("Train", baseline_train), ("Val", baseline_val)):
        summary = result.summary
        emit(f"### {split_name} Set\nPass Rate: {summary.pass_rate:.1%} ({summary.passed}/{summary.total})\nAvg Score: {summary.avg_score:.3f}\n\n")
        for case in result.cases:
            verdict = "PASS" if case.passed else "FAIL"
            emit(f"- [{verdict}] {case.case_id}: {case.ground_truth} -> {case.predicted} (score={case.score:.3f})\n")
        emit("\n")

    emit("## 2. Attribution\n")
    emit(f"Failures: {attribution.total_failures} | Attributed: {attribution.attributed_count}\n\n")
    for cluster in attribution.clusters:
        emit(f"- **{cluster.category}** ({cluster.count} cases) -> {cluster.prompt_target}\n")

    emit("\n## 3. Optimization\n")
    for cand in optimization.candidates:
        emit(f"### Candidate {cand.iteration}\n- Target: `{cand.target_prompt_type}`\n- Category: `{cand.failure_category}`\n")
        for change in cand.change_log:
            emit(f" - {change}\n")
        emit("\n")

    emit("## 4. Validation\n")
    if validation.delta_cases:
        for delta in validation.delta_cases:
            emit(f"- {delta.case_id}: {delta.baseline_score:.3f} -> {delta.candidate_score:.3f} ({delta.score_delta:+.3f}) [{delta.status}]\n")
        emit(f"\nSummary: improved={validation.summary.improved} regressed={validation.summary.regressed}\n")

    emit("\n## 5. Gate\n")
    emit(f"**Accepted**: {gate_decision.get('accepted',False)}\n**Reason**: {gate_decision.get('reason','')}\n")
    checks = gate_decision.get("checks", [])
    if checks:
        emit("\n| Check | Result | Detail |\n|-------|--------|--------|\n")
        for check in checks:
            verdict = "PASS" if check.get("passed", False) else "FAIL"
            # Free-text fields are escaped so they cannot break the table.
            name_cell = _md_table_cell(check.get("name", ""))
            detail_cell = _md_table_cell(check.get("detail", ""))
            emit(f"| {name_cell} | {verdict} | {detail_cell} |\n")

    Path(output_path).write_text("".join(parts), encoding="utf-8")
class ValidationRunner:
    """Phase 4 runner: re-scores the latest prompt candidate on the validation set.

    In ``"fake"`` mode candidate predictions come from the module-level
    prediction tables and are scored with ``FakeJudge``. ``"real"`` mode is a
    placeholder that is not implemented yet.
    """

    # Score deltas within +/- this value are reported as "unchanged".
    _STATUS_EPSILON = 0.005
    # Fake mode reports the candidate cost as baseline cost times this factor.
    _FAKE_COST_RATIO = 1.15

    def __init__(self, mode="fake", **kwargs):
        """Create a runner.

        Args:
            mode: ``"fake"`` or ``"real"``.
            **kwargs: Stored on the instance unchanged.

        Raises:
            ValueError: If ``mode`` is not one of the supported values.
        """
        if mode not in ("fake", "real"):
            raise ValueError(f"Unknown mode: {mode}")
        self.mode = mode
        self.kwargs = kwargs
        # Defined in both modes so the attribute always exists.
        self._judge = None
        if mode == "fake":
            self._judge = FakeJudge()

    def run(self, val_baseline, optimization_result, simulate_regression=False):
        """Validate ``optimization_result.latest_candidate`` against ``val_baseline``.

        Returns a ``ValidationResult`` with ``candidate_id="none"`` when there
        is no candidate to validate.
        """
        candidate = optimization_result.latest_candidate
        if candidate is None:
            return ValidationResult(candidate_id="none")
        if self.mode == "fake":
            return self._run_fake(val_baseline, candidate, simulate_regression)
        return self._run_real(val_baseline, candidate)

    @staticmethod
    def _count_char_matches(predicted, ground_truth):
        """Count positions where ``predicted`` and ``ground_truth`` agree."""
        # zip stops at the shorter string, which matches an index-bounded scan.
        return sum(1 for p, g in zip(predicted, ground_truth) if p == g)

    @classmethod
    def _classify_delta(cls, score_delta):
        """Map a score delta to ``"improved"``, ``"regressed"`` or ``"unchanged"``."""
        if score_delta > cls._STATUS_EPSILON:
            return "improved"
        if score_delta < -cls._STATUS_EPSILON:
            return "regressed"
        return "unchanged"

    def _run_fake(self, val_baseline, candidate, simulate_regression=False):
        """Build per-case deltas from the canned prediction tables.

        ``simulate_regression`` selects ``REGRESSION_PREDICTIONS``; otherwise
        the table for the candidate's failure category is used, falling back
        to the ``final_answer_mismatch`` table. Cases missing from the table
        keep their baseline prediction.
        """
        if simulate_regression:
            pred_map = REGRESSION_PREDICTIONS
        else:
            pred_map = CANDIDATE_PREDICTIONS.get(
                candidate.failure_category, CANDIDATE_PREDICTIONS["final_answer_mismatch"])

        deltas = []
        for bl in val_baseline.cases:
            cp_pred = pred_map.get(bl.case_id, bl.predicted)
            cj = self._judge.evaluate(bl.case_id, bl.ground_truth, cp_pred)
            cc = self._count_char_matches(cp_pred, bl.ground_truth)
            sd = cj.score.overall - bl.score
            st = self._classify_delta(sd)
            cd = cc - bl.char_correct
            deltas.append(DeltaCase(
                case_id=bl.case_id, ground_truth=bl.ground_truth,
                baseline_predicted=bl.predicted, baseline_score=bl.score, baseline_passed=bl.passed,
                candidate_predicted=cp_pred, candidate_score=cj.score.overall, candidate_passed=cj.passed,
                score_delta=sd, status=st, char_delta=cd,
                baseline_judge={"recognition": bl.judge_recognition, "blacklist": bl.judge_blacklist, "response": bl.judge_response},
                candidate_judge={"recognition": cj.score.recognition_quality, "blacklist": cj.score.blacklist_quality, "response": cj.score.response_quality},
                baseline_cost=bl.cost, candidate_cost=bl.cost * self._FAKE_COST_RATIO))

        s = self._build_summary(deltas)
        return ValidationResult(candidate_id=candidate.candidate_id, delta_cases=deltas, summary=s,
                                optimization_target=f"{candidate.target_prompt_type}:{candidate.failure_category}")

    def _run_real(self, val_baseline, candidate):
        """Placeholder for real-mode validation.

        Raises:
            ImportError: If ``trpc_agent`` cannot be imported; the original
                failure is attached as ``__cause__``.
            NotImplementedError: Always, once the import succeeds.
        """
        try:
            from trpc_agent.optimization import AgentEvaluator  # noqa: F401
        except ImportError as err:
            raise ImportError("Real mode requires trpc_agent. Use fake mode.") from err
        raise NotImplementedError("Real mode pending.")

    @staticmethod
    def _build_summary(deltas):
        """Aggregate per-case deltas into a ``ValidationSummary``.

        Returns a default ``ValidationSummary`` when ``deltas`` is empty.
        """
        t = len(deltas)
        if t == 0:
            return ValidationSummary()
        imp = sum(1 for d in deltas if d.status == "improved")
        reg = sum(1 for d in deltas if d.status == "regressed")
        ab = sum(d.baseline_score for d in deltas) / t
        ac = sum(d.candidate_score for d in deltas) / t
        return ValidationSummary(total=t, improved=imp, regressed=reg, unchanged=t - imp - reg,
                                 avg_baseline_score=ab, avg_candidate_score=ac, avg_score_delta=ac - ab,
                                 total_cost_baseline=sum(d.baseline_cost for d in deltas),
                                 total_cost_candidate=sum(d.candidate_cost for d in deltas))
+@pytest.fixture +def train_evalset_path(): + return PROJECT_ROOT / "config" / "train.evalset.json" + + +@pytest.fixture +def val_evalset_path(): + return PROJECT_ROOT / "config" / "val.evalset.json" + + +@pytest.fixture +def train_evalset(train_evalset_path): + with open(train_evalset_path, "r", encoding="utf-8") as f: + return json.load(f) + + +@pytest.fixture +def val_evalset(val_evalset_path): + with open(val_evalset_path, "r", encoding="utf-8") as f: + return json.load(f) + + +@pytest.fixture +def sample_baseline_scores(): + """模拟 baseline 验证集分数""" + return {"val_001": 0.95, "val_002": 0.45, "val_003": 0.40} + + +@pytest.fixture +def sample_candidate_scores(): + """模拟候选验证集分数(改善)""" + return {"val_001": 0.97, "val_002": 0.72, "val_003": 0.55} + + +@pytest.fixture +def sample_regressed_scores(): + """模拟候选验证集分数(退化)""" + return {"val_001": 0.93, "val_002": 0.40, "val_003": 0.35} + + +@pytest.fixture +def output_dir(tmp_path): + """临时输出目录""" + out = tmp_path / "output" + out.mkdir() + return out diff --git a/examples/optimization/eval_optimize_loop/tests/test_attribution.py b/examples/optimization/eval_optimize_loop/tests/test_attribution.py new file mode 100644 index 0000000..17f042b --- /dev/null +++ b/examples/optimization/eval_optimize_loop/tests/test_attribution.py @@ -0,0 +1,298 @@ +"""Phase 2 Attribution 单元测试""" + +import asyncio +import json +from pathlib import Path + +import pytest +from src.baseline import BaselineRunner, BaselineResult, BaselineCaseResult, BaselineSummary +from src.attribution import ( + AttributionRunner, + AttributionReport, + AttributionCase, + AttributionCluster, + run_attribution, + CATEGORY_META, +) + + +@pytest.fixture +def runner(): + return AttributionRunner() + + +@pytest.fixture +def train_baseline(): + """Fake mode train baseline — 1 pass, 1 boundary, 1 fail.""" + import asyncio + loop = asyncio.new_event_loop() + try: + br = BaselineRunner(mode="fake") + result = loop.run_until_complete( + 
br.run_split(Path(__file__).parent.parent / "config" / "train.evalset.json", "train") + ) + return result + finally: + loop.close() + + +@pytest.fixture +def val_baseline(): + """Fake mode val baseline — 1 pass, 2 fail.""" + import asyncio + loop = asyncio.new_event_loop() + try: + br = BaselineRunner(mode="fake") + result = loop.run_until_complete( + br.run_split(Path(__file__).parent.parent / "config" / "val.evalset.json", "val") + ) + return result + finally: + loop.close() + + +class TestAttributionDataStructures: + def test_case_to_dict(self): + ac = AttributionCase( + case_id="t1", dataset="train", category="final_answer_mismatch", + category_priority=1, confidence=0.9, evidence=["e1"], + ground_truth="京A", predicted="京B", score=0.5, + char_match_rate=0.5, judge_scores={"recognition": 0.5}, + trajectory_signals={"human_review_triggered": False}, + ) + d = ac.to_dict() + assert d["case_id"] == "t1" + assert d["category"] == "final_answer_mismatch" + + def test_cluster_to_dict(self): + c = AttributionCluster( + category="final_answer_mismatch", priority=1, + count=3, train_count=1, val_count=2, cases=["a","b","c"], + avg_confidence=0.85, avg_score=0.4, dominant_condition="noise", + prompt_target="system_prompt", + ) + d = c.to_dict() + assert d["count"] == 3 + assert d["train_count"] == 1 + assert d["val_count"] == 2 + + def test_report_properties(self): + c = AttributionCluster(category="a", priority=1, count=5) + c2 = AttributionCluster(category="b", priority=2, count=2) + report = AttributionReport( + total_failures=7, clusters=[c, c2], optimization_priority=["a", "b"], + cases=[AttributionCase(case_id="x", dataset="train", category="a", category_priority=1, confidence=0.9)], + ) + assert report.primary_failure_category.category == "a" + assert report.cluster_map["a"].count == 5 + assert len(report.cases) == 1 + + +class TestAttributionRunnerFakeMode: + """用 fake baseline 数据验证归因分类。""" + + def test_run_returns_report(self, runner, train_baseline, 
val_baseline): + report = runner.run(train_baseline, val_baseline) + assert isinstance(report, AttributionReport) + # train: train_003 fails, val: val_002 + val_003 fail = 3 total + assert report.total_failures == 3 + assert report.train_failures == 1 + assert report.val_failures == 2 + + def test_all_cases_attributed(self, runner, train_baseline, val_baseline): + """所有失败 case 都应被归因(无 unattributed)。""" + report = runner.run(train_baseline, val_baseline) + assert report.unattributed_count == 0 + assert report.attributed_count == report.total_failures + + def test_train_003_classified_as_answer_mismatch(self, runner, train_baseline): + """train_003: 苏X8U88 vs 苏A88U88 → final_answer_mismatch""" + report = runner.run(train_baseline, BaselineResult(dataset_name="val", cases=[])) + case = next(c for c in report.cases if c.case_id == "train_003") + assert case.category == "final_answer_mismatch" + assert case.confidence >= 0.8 + + def test_val_003_has_rich_evidence(self, runner, val_baseline): + """val_003 严重模糊 → 应有多条归因证据""" + report = runner.run( + BaselineResult(dataset_name="train", cases=[]), val_baseline + ) + case = next(c for c in report.cases if c.case_id == "val_003") + # val_003 应有多条证据(failure_reason + judge + trajectory至少2条) + assert len(case.evidence) >= 3, f"expected >=3 evidence items, got {len(case.evidence)}: {case.evidence}" + assert any("judge" in e.lower() or "recogn" in e.lower() for e in case.evidence) + assert any("human_review" in e.lower() or "low conf" in e.lower() for e in case.evidence) + + def test_optimization_priority_ordered(self, runner, train_baseline, val_baseline): + """优化优先级应降序排列。""" + report = runner.run(train_baseline, val_baseline) + counts = [report.cluster_map[p].count for p in report.optimization_priority] + assert counts == sorted(counts, reverse=True) + + def test_cluster_has_dominant_condition(self, runner, train_baseline, val_baseline): + report = runner.run(train_baseline, val_baseline) + for c in report.clusters: + assert 
    def test_param_error_from_trajectory(self, default_runner):
        """A trajectory node containing 'shifted' is the param_error fallback signal."""
        # The case has no failure_reason and no judge scores (-1), so the only
        # signals are the 'locate(shifted)' node and the incorrect, empty prediction.
        case = BaselineCaseResult(
            case_id="t2", image="", ground_truth="京A12345", predicted="",
            score=0.3, passed=False, correct=False, char_correct=0, char_total=7,
            failure_reason="",
            judge_recognition=-1, judge_blacklist=-1, judge_response=-1,
            trajectory={"nodes": ["preprocess","locate(shifted)","segment"], "human_review_triggered": False},
        )
        result = default_runner._attribute_case(case, "train")
        # Either category is accepted: the trajectory points at param_error
        # (priority 3), while the incorrect answer can also trigger
        # final_answer_mismatch (priority 1, the highest), which then wins.
        assert result.category in ("final_answer_mismatch", "param_error")
    def test_unattributed_case(self):
        """A case with no usable signals must still get a non-empty category."""
        # Everything that could drive attribution is blank: empty ground truth
        # and prediction, no failure_reason, no judge scores (-1), no trajectory.
        case = BaselineCaseResult(
            case_id="ux", image="", ground_truth="", predicted="",
            score=0.3, passed=False, correct=False, char_correct=0, char_total=1,
            failure_reason="",
            judge_recognition=-1, judge_blacklist=-1, judge_response=-1,
            trajectory={},
        )
        runner = AttributionRunner()
        result = runner._attribute_case(case, "train")
        # correct=False with char_correct=0 / char_total=1 gives a char match
        # rate of 0, so the final_answer_mismatch fallback is expected to fire.
        # NOTE(review): presumed from the fixture values; confirm against
        # AttributionRunner._attribute_case. Only "not empty" is asserted.
        assert result.category != ""
ground_truth="京A12345", + predicted="京A12345", + score=1.0, + passed=True, + correct=True, + char_correct=7, + char_total=7, + ) + d = r.to_dict() + assert d["case_id"] == "test_001" + assert d["score"] == 1.0 + assert d["passed"] is True + + def test_summary_to_dict(self): + s = BaselineSummary(total=3, passed=2, failed=1, avg_score=0.75, pass_rate=0.667) + d = s.to_dict() + assert d["total"] == 3 + assert d["passed"] == 2 + + def test_result_score_map(self): + result = BaselineResult( + dataset_name="test", + cases=[ + BaselineCaseResult(case_id="a", image="", ground_truth="", predicted="", score=0.9, passed=True, correct=True), + BaselineCaseResult(case_id="b", image="", ground_truth="", predicted="", score=0.4, passed=False, correct=False), + ], + ) + sm = result.score_map + assert sm == {"a": 0.9, "b": 0.4} + + def test_result_failed_cases(self): + result = BaselineResult( + dataset_name="test", + cases=[ + BaselineCaseResult(case_id="a", image="", ground_truth="", predicted="", score=0.9, passed=True, correct=True), + BaselineCaseResult(case_id="b", image="", ground_truth="", predicted="", score=0.4, passed=False, correct=False, failure_reason="mismatch"), + ], + ) + assert len(result.failed_cases) == 1 + assert result.failed_cases[0].case_id == "b" + + +class TestBaselineRunnerFakeMode: + """Fake 模式集成测试""" + + @pytest.mark.asyncio + async def test_run_train_split(self, train_evalset_path): + runner = BaselineRunner(mode="fake") + result = await runner.run_split(train_evalset_path, "train") + assert isinstance(result, BaselineResult) + assert result.dataset_name == "train" + assert len(result.cases) == 3 + + @pytest.mark.asyncio + async def test_run_val_split(self, val_evalset_path): + runner = BaselineRunner(mode="fake") + result = await runner.run_split(val_evalset_path, "val") + assert len(result.cases) == 3 + assert result.dataset_name == "val" + + @pytest.mark.asyncio + async def test_run_both_splits(self, train_evalset_path, val_evalset_path): + runner 
    @pytest.mark.asyncio
    async def test_train_002_may_fail(self, train_evalset_path):
        """train_002 is a noisy image, so the baseline may fail on it."""
        runner = BaselineRunner(mode="fake")
        result = await runner.run_split(train_evalset_path, "train")
        case = next(c for c in result.cases if c.case_id == "train_002")
        # Noise causes a one-character error, which should later be attributed.
        # NOTE(review): the evalset marks this case "may_fail", but this test
        # requires the fake baseline to get it wrong.
        assert not case.correct
        assert case.char_correct < case.char_total  # may_fail: at least one character is misrecognized
+ + @pytest.mark.asyncio + async def test_val_001_critical_should_pass(self, val_evalset_path): + """val_001 是关键 case → 基线应通过(清晰图片)""" + runner = BaselineRunner(mode="fake") + result = await runner.run_split(val_evalset_path, "val") + case = next(c for c in result.cases if c.case_id == "val_001") + assert case.passed + assert case.correct + + @pytest.mark.asyncio + async def test_val_003_should_fail_baseline(self, val_evalset_path): + """val_003 是严重模糊 → 基线应失败""" + runner = BaselineRunner(mode="fake") + result = await runner.run_split(val_evalset_path, "val") + case = next(c for c in result.cases if c.case_id == "val_003") + assert not case.passed, "严重模糊基线应失败" + assert not case.correct + + @pytest.mark.asyncio + async def test_summary_statistics(self, val_evalset_path): + """验证汇总统计计算正确""" + runner = BaselineRunner(mode="fake") + result = await runner.run_split(val_evalset_path, "val") + s = result.summary + assert s.total == 3 + assert s.passed + s.failed == s.total + assert 0.0 <= s.avg_score <= 1.0 + assert 0.0 <= s.pass_rate <= 1.0 + assert s.avg_latency_ms > 0 + assert s.avg_cost > 0 + + @pytest.mark.asyncio + async def test_trajectory_present(self, train_evalset_path): + """验证轨迹信息被正确记录""" + runner = BaselineRunner(mode="fake") + result = await runner.run_split(train_evalset_path, "train") + for case in result.cases: + assert case.trajectory, f"{case.case_id} 缺少轨迹信息" + assert "nodes" in case.trajectory + assert len(case.trajectory["nodes"]) > 1 + + @pytest.mark.asyncio + async def test_judge_scores_present(self, train_evalset_path): + """验证 Judge 三维评分被正确填充""" + runner = BaselineRunner(mode="fake") + result = await runner.run_split(train_evalset_path, "train") + for case in result.cases: + assert case.judge_recognition >= 0, f"{case.case_id}: judge_recognition 未填充" + assert case.judge_blacklist >= 0 + assert case.judge_response >= 0 + + @pytest.mark.asyncio + async def test_serializable_to_json(self, train_evalset_path, val_evalset_path): + """验证结果可序列化为 JSON""" + 
runner = BaselineRunner(mode="fake") + results = await runner.run(train_evalset_path, val_evalset_path) + for name in ("train", "val"): + d = results[name].to_dict() + json_str = json.dumps(d, ensure_ascii=False) + parsed = json.loads(json_str) + assert parsed["dataset_name"] == name + assert len(parsed["cases"]) == 3 + + @pytest.mark.asyncio + async def test_convenience_function(self): + """测试便捷函数 run_baseline()""" + results = await run_baseline(mode="fake") + assert "train" in results + assert "val" in results + assert results["train"].summary.total == 3 + + +class TestBaselineRunnerRealMode: + """Real 模式测试""" + + @pytest.mark.asyncio + async def test_real_mode_requires_plate_agent_root(self, train_evalset_path): + """没有 plate_agent_root 应抛出 ValueError""" + runner = BaselineRunner(mode="real") + with pytest.raises(ValueError, match="plate_agent_root"): + await runner.run_split(train_evalset_path, "train") + + def test_invalid_mode_raises(self): + with pytest.raises(ValueError, match="Unknown mode"): + BaselineRunner(mode="production") + + +class TestBaselineEdgeCases: + """边界场景""" + + @pytest.mark.asyncio + async def test_empty_evalset_raises(self, tmp_path): + """空数据集应抛出异常""" + empty_path = tmp_path / "empty.json" + empty_path.write_text('{"cases": []}', encoding="utf-8") + runner = BaselineRunner(mode="fake") + with pytest.raises(ValueError, match="No cases"): + await runner.run_split(empty_path, "test") + + def test_build_summary_empty(self): + """空列表汇总""" + s = BaselineRunner._build_summary([]) + assert s.total == 0 + assert s.pass_rate == 0.0 + + @staticmethod + def test_parse_trajectory_basic(): + result = BaselineRunner._parse_trajectory( + "preprocess→locate→segment→recognize(conf=0.92)→format_output" + ) + assert result["nodes"] == ["preprocess", "locate", "segment", "recognize", "format_output"] + assert result["confidence"] == 0.92 + assert result["human_review_triggered"] is False + + @staticmethod + def test_parse_trajectory_with_human_review(): + 
result = BaselineRunner._parse_trajectory( + "preprocess→locate→segment→recognize(conf=0.38)→human_review→format_output" + ) + assert result["human_review_triggered"] is True + assert "human_review" in result["nodes"] + + @staticmethod + def test_parse_trajectory_empty(): + assert BaselineRunner._parse_trajectory("") == {} diff --git a/examples/optimization/eval_optimize_loop/tests/test_gate.py b/examples/optimization/eval_optimize_loop/tests/test_gate.py new file mode 100644 index 0000000..9231040 --- /dev/null +++ b/examples/optimization/eval_optimize_loop/tests/test_gate.py @@ -0,0 +1,155 @@ +"""Phase 5 Gate 单元测试""" + +import pytest +from src.gate import AcceptanceGate, GateDecision + + +class TestGateAcceptImproved: + """场景:候选全面改善 → 应接受""" + + def test_accepts_improved_candidate( + self, gate_config, sample_baseline_scores, sample_candidate_scores + ): + gate = AcceptanceGate(gate_config) + decision = gate.decide( + baseline_scores=sample_baseline_scores, + candidate_scores=sample_candidate_scores, + baseline_cost=0.10, + candidate_cost=0.11, + ) + assert decision.accepted, f"应接受但被拒绝: {decision.reason}" + assert len(decision.checks) >= 3 # 至少检查 total_score / hard_fail / cost + assert all(c.passed for c in decision.checks), \ + [f"{c.name}: {c.detail}" for c in decision.failed_checks] + + +class TestGateRejectRegressed: + """场景:候选退化 → 应拒绝""" + + def test_rejects_regressed_candidate( + self, gate_config, sample_baseline_scores, sample_regressed_scores + ): + gate = AcceptanceGate(gate_config) + decision = gate.decide( + baseline_scores=sample_baseline_scores, + candidate_scores=sample_regressed_scores, + baseline_cost=0.10, + candidate_cost=0.09, + ) + assert not decision.accepted, "退化候选应被拒绝" + assert any(not c.passed for c in decision.checks) + + +class TestGateOverfitDetection: + """场景:过拟合检测""" + + def test_rejects_overfit( + self, gate_config + ): + """训练集提升 + 验证集退化 → 拒绝""" + gate = AcceptanceGate(gate_config) + decision = gate.decide( + baseline_scores={"v1": 
0.80, "v2": 0.75}, + candidate_scores={"v1": 0.72, "v2": 0.70}, # 验证集退化 + baseline_train_scores={"t1": 0.50, "t2": 0.45}, + candidate_train_scores={"t1": 0.80, "t2": 0.75}, # 训练集提升 + ) + assert not decision.accepted, "过拟合应被拒绝" + overfit_check = next( + (c for c in decision.checks if c.name == "overfit_detection"), None + ) + assert overfit_check is not None + assert not overfit_check.passed + + def test_accepts_no_overfit( + self, gate_config + ): + """训练集和验证集都提升 → 接受""" + gate = AcceptanceGate(gate_config) + decision = gate.decide( + baseline_scores={"v1": 0.70, "v2": 0.65}, + candidate_scores={"v1": 0.85, "v2": 0.80}, # 都提升 + baseline_train_scores={"t1": 0.50}, + candidate_train_scores={"t1": 0.80}, # 都提升 + ) + overfit_check = next( + (c for c in decision.checks if c.name == "overfit_detection"), None + ) + assert overfit_check is not None + assert overfit_check.passed, f"不过拟合应通过: {overfit_check.detail}" + + +class TestGateCriticalCases: + """场景:关键 case 不退步""" + + def test_rejects_critical_regression( + self, gate_config, sample_baseline_scores + ): + gate = AcceptanceGate(gate_config) + # val_001 是关键 case,从 0.95 退化到 0.80 + decision = gate.decide( + baseline_scores=sample_baseline_scores, + candidate_scores={"val_001": 0.80, "val_002": 0.90, "val_003": 0.80}, + critical_case_ids=["val_001"], + ) + critical_check = next( + (c for c in decision.checks if c.name == "critical_case_no_regress"), None + ) + assert critical_check is not None + assert not critical_check.passed + + +class TestGateCostBudget: + """场景:成本超预算""" + + def test_rejects_over_budget(self, gate_config, sample_baseline_scores, sample_candidate_scores): + gate = AcceptanceGate(gate_config) + decision = gate.decide( + baseline_scores=sample_baseline_scores, + candidate_scores=sample_candidate_scores, + baseline_cost=0.10, + candidate_cost=0.15, # 1.5× → 超过 1.2× 阈值 + ) + cost_check = next( + (c for c in decision.checks if c.name == "cost_within_budget"), None + ) + assert cost_check is not None + 
assert not cost_check.passed + + +class TestGateEdgeCases: + """边界场景""" + + def test_empty_scores(self, gate_config): + gate = AcceptanceGate(gate_config) + decision = gate.decide( + baseline_scores={}, + candidate_scores={}, + ) + # 总分提升 0.0 小于阈值 0.03 → 应失败 + total_check = next( + (c for c in decision.checks if c.name == "total_score_improvement"), None + ) + assert total_check is not None + assert not total_check.passed + + def test_majority_strategy(self): + """majority 策略:多数通过即接受""" + config = { + "rules": { + "total_score_improvement": {"enabled": True, "threshold": 0.03}, + "no_new_hard_fail": {"enabled": True, "max_new_fails": 0}, + "cost_within_budget": {"enabled": True, "max_cost_ratio": 1.2}, + }, + "acceptance_strategy": "majority", + } + gate = AcceptanceGate(config) + # 总分提升不达标(失败),但没有新 hard fail(通过),成本不超标(通过)→ 2/3 → 接受 + decision = gate.decide( + baseline_scores={"v1": 0.80, "v2": 0.75}, + candidate_scores={"v1": 0.81, "v2": 0.76}, # 仅 +0.01 < 0.03 + baseline_cost=0.10, + candidate_cost=0.10, + ) + assert decision.accepted + assert decision.strategy == "majority" diff --git a/examples/optimization/eval_optimize_loop/tests/test_optimizer.py b/examples/optimization/eval_optimize_loop/tests/test_optimizer.py new file mode 100644 index 0000000..801d83a --- /dev/null +++ b/examples/optimization/eval_optimize_loop/tests/test_optimizer.py @@ -0,0 +1,305 @@ +"""Phase 3 Optimizer ????""" + +import json +import asyncio +from pathlib import Path + +import pytest +from src.baseline import BaselineRunner, BaselineResult, BaselineCaseResult, BaselineSummary +from src.attribution import AttributionRunner, AttributionReport +from src.optimizer import ( + FakeOptimizer, + OptimizationRunner, + OptimizationResult, + PromptCandidate, + run_optimization, + BASE_PROMPTS, + CATEGORY_OPTIMIZATION_HINTS, +) + + +# ?? Fixtures ???????????????????????????????????????????? + +@pytest.fixture +def fake_attr_report(): + """? 
fake baseline + attribution ?????????""" + loop = asyncio.new_event_loop() + try: + br = BaselineRunner(mode="fake") + base = Path(__file__).parent.parent / "config" + results = loop.run_until_complete(br.run( + base / "train.evalset.json", + base / "val.evalset.json", + )) + ar = AttributionRunner() + report = ar.run(results["train"], results["val"]) + return report + finally: + loop.close() + + +@pytest.fixture +def empty_attr_report(): + """?????????""" + return AttributionReport(total_failures=0) + + +@pytest.fixture +def single_cluster_report(): + """???????? ? ?????????""" + from src.attribution import AttributionCluster + cluster = AttributionCluster( + category="final_answer_mismatch", priority=1, + count=3, train_count=1, val_count=2, + cases=["train_003", "val_002", "val_003"], + avg_confidence=0.87, avg_score=0.35, + dominant_condition="noise", prompt_target="system_prompt", + ) + return AttributionReport( + total_failures=3, train_failures=1, val_failures=2, + attributed_count=3, unattributed_count=0, + clusters=[cluster], optimization_priority=["final_answer_mismatch"], + ) + + +# ?? ?????? ???????????????????????????????????????? + +class TestPromptCandidate: + def test_to_dict(self): + c = PromptCandidate( + candidate_id="cand_0_abc_123", + iteration=0, target_prompt_type="system_prompt", + prompt_before="hello", prompt_after="hello world", + change_log=["added world"], failure_category="format_invalid", + attribution_confidence=0.85, estimated_cost=0.0005, + ) + d = c.to_dict() + assert d["candidate_id"] == "cand_0_abc_123" + assert d["iteration"] == 0 + assert d["change_log"] == ["added world"] + assert d["prompt_after"] == "hello world" + + def test_unique_ids(self): + """???????? 
ID?""" + c1 = PromptCandidate( + candidate_id="id1", iteration=0, target_prompt_type="system_prompt", + prompt_before="a", prompt_after="b", + ) + c2 = PromptCandidate( + candidate_id="id2", iteration=1, target_prompt_type="system_prompt", + prompt_before="a", prompt_after="b", + ) + assert c1.candidate_id != c2.candidate_id + + +class TestOptimizationResult: + def test_latest_candidate(self): + c1 = PromptCandidate(candidate_id="c1", iteration=0, target_prompt_type="system_prompt", prompt_before="x", prompt_after="y") + c2 = PromptCandidate(candidate_id="c2", iteration=1, target_prompt_type="system_prompt", prompt_before="y", prompt_after="z") + result = OptimizationResult(candidates=[c1, c2], total_iterations=2) + assert result.latest_candidate.candidate_id == "c2" + assert result.optimized_prompt == "z" + assert result.optimized_prompt_type == "system_prompt" + + def test_empty_no_latest(self): + result = OptimizationResult() + assert result.latest_candidate is None + assert result.optimized_prompt is None + + def test_to_dict(self): + c = PromptCandidate(candidate_id="c1", iteration=0, target_prompt_type="skill_prompt", prompt_before="x", prompt_after="y") + result = OptimizationResult(candidates=[c], total_iterations=1) + d = result.to_dict() + assert d["total_iterations"] == 1 + assert len(d["candidates"]) == 1 + + +# ?? FakeOptimizer ?? ?????????????????????????????????? + +class TestFakeOptimizer: + def test_optimize_generates_candidate(self, fake_attr_report): + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report) + assert result.total_iterations >= 1 + assert len(result.candidates) >= 1 + + def test_prompt_after_longer_than_before(self, fake_attr_report): + """??? 
prompt ????????""" + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report) + for c in result.candidates: + assert len(c.prompt_after) > len(c.prompt_before), ( + f"{c.target_prompt_type}: before={len(c.prompt_before)} after={len(c.prompt_after)}" + ) + + def test_change_log_not_empty(self, fake_attr_report): + """????????????""" + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report) + for c in result.candidates: + assert len(c.change_log) >= 2, f"change_log too short: {c.change_log}" + + def test_target_prompt_type_valid(self, fake_attr_report): + """target_prompt_type ????????""" + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report) + for c in result.candidates: + assert c.target_prompt_type in BASE_PROMPTS, ( + f"unknown prompt type: {c.target_prompt_type}" + ) + + def test_failure_category_mapped(self, fake_attr_report): + """failure_category ?????????""" + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report) + valid = set(CATEGORY_OPTIMIZATION_HINTS.keys()) + for c in result.candidates: + assert c.failure_category in valid, f"unknown category: {c.failure_category}" + + def test_matches_attribution_priority(self, fake_attr_report): + """??????????????""" + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report) + # ?????????????? 
+ if fake_attr_report.optimization_priority: + top_priority = fake_attr_report.optimization_priority[0] + assert result.candidates[0].failure_category == top_priority + + def test_max_iterations_respected(self, fake_attr_report): + """max_iterations ????????""" + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report, max_iterations=1) + assert len(result.candidates) <= 1 + + def test_empty_attribution_no_candidates(self, empty_attr_report): + opt = FakeOptimizer() + result = opt.optimize(empty_attr_report) + assert result.total_iterations == 0 + assert len(result.candidates) == 0 + + def test_candidate_id_format(self, fake_attr_report): + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report) + for c in result.candidates: + assert c.candidate_id.startswith("cand_"), f"bad id: {c.candidate_id}" + assert len(c.candidate_id) > 20 + + def test_attribution_summary_present(self, fake_attr_report): + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report) + assert "primary_failure" in result.attribution_summary + assert "total_failures" in result.attribution_summary + + def test_strategy_label(self, fake_attr_report): + opt = FakeOptimizer() + result = opt.optimize(fake_attr_report) + assert result.strategy == "failure_driven" + + def test_skill_prompt_optimization(self, single_cluster_report): + """?????? skill_prompt???? skill_prompt?""" + # ?? cluster ? prompt_target ? skill_prompt + single_cluster_report.clusters[0].prompt_target = "skill_prompt" + single_cluster_report.clusters[0].category = "knowledge_recall_insufficient" + opt = FakeOptimizer() + result = opt.optimize(single_cluster_report) + assert result.candidates[0].target_prompt_type == "skill_prompt" + + +# ?? OptimizationRunner ?? ????????????????????????????? 
+ +class TestOptimizationRunner: + def test_fake_mode(self, fake_attr_report): + runner = OptimizationRunner(mode="fake") + result = runner.run(fake_attr_report) + assert isinstance(result, OptimizationResult) + assert result.total_iterations >= 1 + + def test_invalid_mode_raises(self): + with pytest.raises(ValueError, match="Unknown mode"): + OptimizationRunner(mode="production") + + def test_real_mode_not_implemented(self, fake_attr_report): + """Real ??????? NotImplementedError ? ImportError?""" + runner = OptimizationRunner(mode="real") + with pytest.raises((NotImplementedError, ImportError)): + runner.run(fake_attr_report) + + +# ?? ?????? ???????????????????????????????????????? + +class TestConvenienceFunction: + def test_run_optimization(self, fake_attr_report): + result = run_optimization(fake_attr_report, mode="fake") + assert isinstance(result, OptimizationResult) + + def test_run_optimization_with_config(self, fake_attr_report): + config_path = Path(__file__).parent.parent / "config" / "optimizer.json" + result = run_optimization(fake_attr_report, mode="fake", config_path=config_path) + assert result.total_iterations >= 1 + + +# ?? BASE_PROMPTS ??? ????????????????????????????????? + +class TestBasePrompts: + def test_all_prompt_types_have_content(self): + for ptype, text in BASE_PROMPTS.items(): + assert len(text) > 50, f"{ptype} prompt too short" + + def test_system_prompt_has_key_sections(self): + sp = BASE_PROMPTS["system_prompt"] + assert "????" in sp + assert "????" in sp + assert "???" in sp + + def test_skill_prompt_has_key_sections(self): + sp = BASE_PROMPTS["skill_prompt"] + assert "???" in sp + assert "??" in sp + assert "??" in sp + assert "???" in sp + + +# ?? ??????? ?????????????????????????????????????? + +class TestPipelineIntegration: + """baseline ? attribution ? optimizer ??????""" + + @pytest.mark.asyncio + async def test_full_fake_pipeline(self): + """?? 
fake pipeline ????""" + base = Path(__file__).parent.parent / "config" + + # Phase 1: baseline + br = BaselineRunner(mode="fake") + results = await br.run( + base / "train.evalset.json", + base / "val.evalset.json", + ) + assert results["train"].summary.total == 3 + assert results["val"].summary.total == 3 + + # Phase 2: attribution + ar = AttributionRunner() + attr_report = ar.run(results["train"], results["val"]) + assert attr_report.total_failures >= 1 + assert attr_report.unattributed_count == 0 + + # Phase 3: optimizer + opt = FakeOptimizer() + opt_result = opt.optimize(attr_report) + assert opt_result.total_iterations >= 1 + assert opt_result.latest_candidate is not None + + # ??????? + pipeline_output = { + "baseline": { + "train": results["train"].to_dict(), + "val": results["val"].to_dict(), + }, + "attribution": attr_report.to_dict(), + "optimization": opt_result.to_dict(), + } + json_str = json.dumps(pipeline_output, ensure_ascii=False, indent=2) + assert len(json_str) > 1000 + parsed = json.loads(json_str) + assert "optimization" in parsed diff --git a/examples/optimization/eval_optimize_loop/tests/test_validator.py b/examples/optimization/eval_optimize_loop/tests/test_validator.py new file mode 100644 index 0000000..37a3d3a --- /dev/null +++ b/examples/optimization/eval_optimize_loop/tests/test_validator.py @@ -0,0 +1,290 @@ +"""Phase 4 Validator ????""" + +import json +import asyncio +from pathlib import Path + +import pytest +from src.baseline import BaselineRunner, BaselineResult, BaselineCaseResult +from src.attribution import AttributionRunner +from src.optimizer import FakeOptimizer, OptimizationResult, PromptCandidate +from src.validator import ( + ValidationRunner, + ValidationResult, + DeltaCase, + ValidationSummary, + run_validation, + CANDIDATE_PREDICTIONS, + REGRESSION_PREDICTIONS, +) + + +# ?? Fixtures ???????????????????????????????????????????? 
+ +@pytest.fixture +def val_baseline(): + """Fake mode val baseline?""" + loop = asyncio.new_event_loop() + try: + br = BaselineRunner(mode="fake") + result = loop.run_until_complete( + br.run_split(Path(__file__).parent.parent / "config" / "val.evalset.json", "val") + ) + return result + finally: + loop.close() + + +@pytest.fixture +def full_pipeline(): + """?? fake pipeline: baseline ? attribution ? optimizer?""" + loop = asyncio.new_event_loop() + try: + base = Path(__file__).parent.parent / "config" + br = BaselineRunner(mode="fake") + results = loop.run_until_complete(br.run( + base / "train.evalset.json", base / "val.evalset.json", + )) + ar = AttributionRunner() + attr = ar.run(results["train"], results["val"]) + opt = FakeOptimizer() + opt_result = opt.optimize(attr) + return results["val"], opt_result + finally: + loop.close() + + +# ?? ?????? ???????????????????????????????????????? + +class TestDeltaCase: + def test_improved_status(self): + d = DeltaCase( + case_id="v1", ground_truth="A", + baseline_predicted="B", baseline_score=0.4, baseline_passed=False, + candidate_predicted="A", candidate_score=0.9, candidate_passed=True, + score_delta=0.5, status="improved", char_delta=1, + ) + assert d.status == "improved" + assert d.score_delta > 0 + + def test_regressed_status(self): + d = DeltaCase( + case_id="v1", ground_truth="A", + baseline_predicted="A", baseline_score=0.9, baseline_passed=True, + candidate_predicted="B", candidate_score=0.4, candidate_passed=False, + score_delta=-0.5, status="regressed", char_delta=-1, + ) + assert d.status == "regressed" + + def test_to_dict(self): + d = DeltaCase( + case_id="v1", ground_truth="A", + baseline_predicted="A", baseline_score=1.0, baseline_passed=True, + candidate_predicted="A", candidate_score=1.0, candidate_passed=True, + score_delta=0.0, status="unchanged", + baseline_judge={"recognition": 1.0}, candidate_judge={"recognition": 1.0}, + ) + dd = d.to_dict() + assert dd["case_id"] == "v1" + assert dd["status"] 
== "unchanged" + assert dd["baseline_judge"]["recognition"] == 1.0 + + +class TestValidationResult: + def test_score_map(self): + result = ValidationResult( + candidate_id="c1", + delta_cases=[ + DeltaCase(case_id="a", ground_truth="", baseline_predicted="", baseline_score=0.5, baseline_passed=False, candidate_predicted="", candidate_score=0.8, candidate_passed=True, score_delta=0.3, status="improved"), + DeltaCase(case_id="b", ground_truth="", baseline_predicted="", baseline_score=0.9, baseline_passed=True, candidate_predicted="", candidate_score=0.91, candidate_passed=True, score_delta=0.01, status="improved"), + ], + ) + sm = result.score_map + assert sm["a"] == 0.8 + assert sm["b"] == 0.91 + + def test_new_failures(self): + result = ValidationResult( + delta_cases=[ + DeltaCase(case_id="pass_to_fail", ground_truth="", baseline_predicted="", baseline_score=0.9, baseline_passed=True, candidate_predicted="", candidate_score=0.4, candidate_passed=False, score_delta=-0.5, status="regressed"), + DeltaCase(case_id="fail_to_pass", ground_truth="", baseline_predicted="", baseline_score=0.4, baseline_passed=False, candidate_predicted="", candidate_score=0.9, candidate_passed=True, score_delta=0.5, status="improved"), + ], + ) + nf = result.new_failures + assert len(nf) == 1 + assert nf[0].case_id == "pass_to_fail" + + +# ?? ValidationRunner Fake ?? ????????????????????????? 
+ +class TestValidationRunnerFake: + def test_run_returns_result(self, full_pipeline): + val_bl, opt_result = full_pipeline + runner = ValidationRunner(mode="fake") + result = runner.run(val_bl, opt_result) + assert isinstance(result, ValidationResult) + assert result.candidate_id == opt_result.latest_candidate.candidate_id + assert len(result.delta_cases) == 3 + + def test_summary_has_improvement(self, full_pipeline): + """?????????????""" + val_bl, opt_result = full_pipeline + runner = ValidationRunner(mode="fake") + result = runner.run(val_bl, opt_result) + assert result.summary.improved >= 1 + assert result.summary.avg_score_delta > 0 + + def test_val_001_critical_unchanged(self, full_pipeline): + """?? case val_001 ?????""" + val_bl, opt_result = full_pipeline + runner = ValidationRunner(mode="fake") + result = runner.run(val_bl, opt_result) + d = next(c for c in result.delta_cases if c.case_id == "val_001") + assert d.status in ("improved", "unchanged") + assert not (d.baseline_passed and not d.candidate_passed) + + def test_val_002_improved(self, full_pipeline): + """val_002 ???????""" + val_bl, opt_result = full_pipeline + runner = ValidationRunner(mode="fake") + result = runner.run(val_bl, opt_result) + d = next(c for c in result.delta_cases if c.case_id == "val_002") + assert d.status == "improved" or d.score_delta > 0 + + def test_regression_mode(self, full_pipeline): + """????????? 
case ????????""" + val_bl, opt_result = full_pipeline + runner = ValidationRunner(mode="fake") + result = runner.run(val_bl, opt_result, simulate_regression=True) + v1 = next(c for c in result.delta_cases if c.case_id == "val_001") + assert v1.status == "regressed", f"val_001 should regress in regression mode, got {v1.status}" + assert result.summary.regressed >= 1 + + def test_serializable(self, full_pipeline): + val_bl, opt_result = full_pipeline + runner = ValidationRunner(mode="fake") + result = runner.run(val_bl, opt_result) + j = json.dumps(result.to_dict(), ensure_ascii=False) + parsed = json.loads(j) + assert parsed["candidate_id"] + assert len(parsed["delta_cases"]) == 3 + + def test_no_candidate_returns_empty(self): + """??? prompt ???????""" + runner = ValidationRunner(mode="fake") + result = runner.run( + BaselineResult(dataset_name="val"), + OptimizationResult(candidates=[]), + ) + assert result.candidate_id == "none" + assert len(result.delta_cases) == 0 + + def test_optimization_target_set(self, full_pipeline): + val_bl, opt_result = full_pipeline + runner = ValidationRunner(mode="fake") + result = runner.run(val_bl, opt_result) + assert "system_prompt" in result.optimization_target + assert "final_answer_mismatch" in result.optimization_target + + +class TestValidationRunnerModes: + def test_invalid_mode_raises(self): + with pytest.raises(ValueError, match="Unknown mode"): + ValidationRunner(mode="production") + + def test_real_mode_not_implemented(self, full_pipeline): + val_bl, opt_result = full_pipeline + runner = ValidationRunner(mode="real") + with pytest.raises((NotImplementedError, ImportError)): + runner.run(val_bl, opt_result) + + +# ?? ?????? ???????????????????????????????????????? + +class TestConvenienceFunction: + def test_run_validation(self, full_pipeline): + val_bl, opt_result = full_pipeline + result = run_validation(val_bl, opt_result, mode="fake") + assert isinstance(result, ValidationResult) + + +# ?? ??????? 
?????????????????????????????????????? + +class TestPredictionMaps: + def test_all_categories_have_val_cases(self): + for cat in ["final_answer_mismatch", "knowledge_recall_insufficient", + "tool_call_error", "param_error", "llm_rubric_fail", "format_invalid"]: + assert cat in CANDIDATE_PREDICTIONS, f"missing {cat}" + pmap = CANDIDATE_PREDICTIONS[cat] + for cid in ["val_001", "val_002", "val_003"]: + assert cid in pmap, f"{cat} missing {cid}" + + def test_regression_map_has_all(self): + for cid in ["val_001", "val_002", "val_003"]: + assert cid in REGRESSION_PREDICTIONS + + +# ?? ?????: 4-phase pipeline + gate ????????????????? + +class TestFullPipelineWithGate: + """baseline ? attribution ? optimizer ? validator ? gate ????""" + + @pytest.mark.asyncio + async def test_four_phase_to_gate(self): + from src.gate import AcceptanceGate + import json + + base = Path(__file__).parent.parent / "config" + + # Phase 1: baseline + br = BaselineRunner(mode="fake") + results = await br.run( + base / "train.evalset.json", base / "val.evalset.json", + ) + + # Phase 2: attribution + ar = AttributionRunner() + attr = ar.run(results["train"], results["val"]) + + # Phase 3: optimizer + opt = FakeOptimizer() + opt_result = opt.optimize(attr) + + # Phase 4: validator + vr = ValidationRunner(mode="fake") + val_result = vr.run(results["val"], opt_result) + + # Phase 5: gate + with open(base / "optimizer.json", "r", encoding="utf-8") as f: + gate_config = json.load(f)["gate"] + gate = AcceptanceGate(gate_config) + + decision = gate.decide( + baseline_scores=results["val"].score_map, + candidate_scores=val_result.score_map, + baseline_train_scores=results["train"].score_map, + candidate_train_scores=results["train"].score_map, + baseline_cost=results["val"].summary.avg_cost * results["val"].summary.total, + candidate_cost=val_result.summary.total_cost_candidate, + ) + + # ??????????? 
+ full_output = { + "baseline": { + "train": results["train"].to_dict(), + "val": results["val"].to_dict(), + }, + "attribution": attr.to_dict(), + "optimization": opt_result.to_dict(), + "validation": val_result.to_dict(), + "gate_decision": { + "accepted": decision.accepted, + "reason": decision.reason, + }, + } + j = json.dumps(full_output, ensure_ascii=False, indent=2) + assert len(j) > 2000 + + print(f"\n Gate decision: accepted={decision.accepted} reason={decision.reason[:80]}") + print(f" Val delta: {val_result.summary.avg_score_delta:+.3f}") + print(f" Improved: {val_result.summary.improved} Regressed: {val_result.summary.regressed}")