Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ python -m pip install -e .
python -m pytest -q
```

## Strategy lifecycle CLI

`quant-lifecycle` provides the shared lifecycle entrypoint formerly wrapped by `QuantStrategyLifecycle`.
Production schedules should live in domain repositories and call this CLI or the underlying
`quant_platform_kit.strategy_lifecycle` modules directly.

```bash
quant-lifecycle monitor --domain us_equity
quant-lifecycle drift --domain us_equity
quant-lifecycle autopilot --domain us_equity --dry-run
quant-lifecycle dashboard --format all
```

## Cloud provider abstraction

QuantPlatformKit includes a cloud provider abstraction layer at `quant_platform_kit.cloud`. It defines protocol interfaces for common cloud services — secret management, object storage, document databases, compute discovery, and deployment context — so that platform code can be written without hard-wiring to a specific cloud provider.
Expand Down
13 changes: 13 additions & 0 deletions README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,19 @@ python -m pip install -e .
python -m pytest -q
```

## 策略生命周期 CLI

`quant-lifecycle` 是原 `QuantStrategyLifecycle` wrapper 职责迁入后的共享入口。
生产定时任务应放在各 domain 仓库,并调用这个 CLI 或底层
`quant_platform_kit.strategy_lifecycle` 模块。

```bash
quant-lifecycle monitor --domain us_equity
quant-lifecycle drift --domain us_equity
quant-lifecycle autopilot --domain us_equity --dry-run
quant-lifecycle dashboard --format all
```

## 云服务抽象层

`quant_platform_kit.cloud` 包为常用云服务定义了协议接口——密钥管理、对象存储、文档数据库、计算发现和部署上下文。平台代码可以通过这些接口编写,无需硬编码到特定云厂商。
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ authors = [
]
dependencies = []

[project.scripts]
quant-lifecycle = "quant_platform_kit.strategy_lifecycle.cli:main"

[tool.setuptools]
package-dir = { "" = "src" }

Expand Down
224 changes: 224 additions & 0 deletions src/quant_platform_kit/strategy_lifecycle/cli.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,224 @@
"""Command-line entrypoint for strategy lifecycle operations."""

from __future__ import annotations

import argparse
import importlib
import sys
from collections.abc import Callable, Sequence
from typing import Any


def _load_callable(module_name: str, function_name: str) -> Callable[..., Any]:
module = importlib.import_module(module_name)
return getattr(module, function_name)


def _print(message: str) -> None:
print(message)


def _run_monitor(args: argparse.Namespace) -> int:
_print(f"[monitor] Running performance monitor for domain={args.domain}")
run_monitor = _load_callable(
"quant_platform_kit.strategy_lifecycle.performance_monitor",
"run_monitor",
)
snapshots = run_monitor(
domain=args.domain,
strategy_profile=args.strategy,
output_dir=args.output_dir,
)
_print(f"[monitor] Generated {len(snapshots)} performance snapshots")
return 0


def _run_drift(args: argparse.Namespace) -> int:
_print(f"[drift] Running drift detection for domain={args.domain}")
run_drift_detection = _load_callable(
"quant_platform_kit.strategy_lifecycle.drift_detector",
"run_drift_detection",
)
results = run_drift_detection(domain=args.domain, strategy_profile=args.strategy)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Publish drift alerts from the drift command

For direct schedules that run quant-lifecycle drift, this only calls run_drift_detection, which saves drift results but does not build or publish notification events; the existing alert path is in strategy_lifecycle.drift_alerts (build_drift_alert/publish_drift_alerts). Because the new subcommand help says it emits alerts, operators can end up with critical/review drift persisted but no Telegram/email/webhook alert sent.

Useful? React with 👍 / 👎.

critical_count = sum(1 for item in results if getattr(getattr(item, "status", None), "value", None) == "critical")
review_count = sum(1 for item in results if getattr(getattr(item, "status", None), "value", None) == "review")
_print(f"[drift] {len(results)} strategies checked, {critical_count} critical, {review_count} review")
if not getattr(args, "no_alerts", False):
build_drift_alert = _load_callable(
"quant_platform_kit.strategy_lifecycle.drift_alerts",
"build_drift_alert",
)
publish_drift_alerts = _load_callable(
"quant_platform_kit.strategy_lifecycle.drift_alerts",
"publish_drift_alerts",
)
events = [event for event in (build_drift_alert(result) for result in results) if event is not None]
counts = publish_drift_alerts(events, dry_run=getattr(args, "dry_run_alerts", False))
_print(f"[drift] Alerts published: {sum(counts.values())}")
return 0


def _run_optimize(args: argparse.Namespace) -> int:
_print(f"[optimize] Optimizing {args.strategy} with method={args.method}")
run_optimization = _load_callable(
"quant_platform_kit.strategy_lifecycle.param_optimizer",
"run_optimization",
)
proposal = run_optimization(strategy_profile=args.strategy, method=args.method)
_print(f"[optimize] Recommendation: {getattr(proposal, 'recommendation', '')}")
improvement_score = getattr(proposal, "improvement_score", None)
if improvement_score is not None:
_print(f"[optimize] Improvement score: {improvement_score:.3f}")
return 0


def _run_update(args: argparse.Namespace) -> int:
_print(f"[update] Processing proposal: {args.proposal}")
process_update = _load_callable(
"quant_platform_kit.strategy_lifecycle.update_orchestrator",
"process_update",
)
result = process_update(proposal_path=args.proposal, auto_approve=args.auto_approve)
_print(f"[update] Result: stage={result.get('stage')}, reason={result.get('reason', '')}")
return 1 if result.get("stage") == "error" else 0


def _run_dashboard(args: argparse.Namespace) -> int:
_print(f"[dashboard] Building health dashboard (format={args.output_format})")
build_dashboard = _load_callable(
"quant_platform_kit.strategy_lifecycle.health_dashboard",
"build_dashboard",
)
result = build_dashboard(output_dir=args.output_dir, output_format=args.output_format)
_print(f"[dashboard] Dashboard built with {result.get('strategy_count', 0)} strategies")
return 0


def _run_autopilot(args: argparse.Namespace) -> int:
_print(f"[autopilot] Running auto-pilot cycle for domain={args.domain} (dry_run={args.dry_run})")
run_auto_pilot_cycle = _load_callable(
"quant_platform_kit.strategy_lifecycle.codex_integration",
"run_auto_pilot_cycle",
)
summary = run_auto_pilot_cycle(
args.domain,
dry_run=args.dry_run,
create_issues=not args.no_issues,
trigger_optimization=True,
)
_print(f"[autopilot] Snapshots: {summary.get('snapshots_count', 0)}")
_print(f"[autopilot] Drifts checked: {summary.get('drifts_checked', 0)}")
_print(f"[autopilot] Drifts alerting: {summary.get('drifts_alerting', 0)}")
_print(f"[autopilot] Issues created: {summary.get('issues_created', 0)}")
_print(f"[autopilot] Actions: {len(summary.get('actions', []))}")
for action in summary.get("actions", []):
decision = action.get("ai_decision", {})
_print(
f" - {action['strategy']}: drift={action['drift_status']}, "
f"optimize={decision.get('optimization_needed')}, "
f"method={decision.get('recommended_method', 'none')}"
)
return 0


def _run_lifecycle(args: argparse.Namespace) -> int:
_print(f"[lifecycle] Running full lifecycle for domain={args.domain}")
_print("[lifecycle] Step: monitor")
monitor_status = _run_monitor(argparse.Namespace(domain=args.domain, strategy=None, output_dir=None))
if monitor_status != 0:
return monitor_status

_print("[lifecycle] Step: drift")
drift_status = _run_drift(
argparse.Namespace(
domain=args.domain,
strategy=None,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Propagate the lifecycle strategy filter

When a domain schedule runs quant-lifecycle lifecycle --domain us_equity --strategy foo, this hard-codes the drift stage to strategy=None (and the monitor stage above does the same), so the lifecycle still processes and can publish drift alerts for every strategy in the domain while only optimizing foo. The standalone monitor and drift commands already pass --strategy through, so a strategy-scoped lifecycle run can produce unrelated snapshots/alerts and extra work; pass args.strategy into these stages or clarify/remove the option.

Useful? React with 👍 / 👎.

no_alerts=args.no_alerts,
dry_run_alerts=args.dry_run_alerts,
)
)
if drift_status != 0:
return drift_status

if not args.skip_optimization:
if args.strategy:
_print("[lifecycle] Step: optimize")
optimize_status = _run_optimize(argparse.Namespace(strategy=args.strategy, method=args.method))
if optimize_status != 0:
return optimize_status
else:
_print("[lifecycle] Step: optimize skipped (no --strategy provided)")

_print("[lifecycle] Step: dashboard")
dashboard_status = _run_dashboard(argparse.Namespace(output_dir=None, output_format=args.output_format))
if dashboard_status != 0:
return dashboard_status

_print("[lifecycle] Full lifecycle complete")
return 0


def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="quant-lifecycle", description="Quant strategy lifecycle CLI.")
parser.add_argument("--version", action="version", version="quant-lifecycle 0.10.0")
subparsers = parser.add_subparsers(dest="command", required=True)

monitor = subparsers.add_parser("monitor", help="Run the continuous performance monitor for one domain.")
monitor.add_argument("--domain", default="us_equity")
monitor.add_argument("--strategy", default=None)
monitor.add_argument("--output-dir", default=None)
monitor.set_defaults(func=_run_monitor)

drift = subparsers.add_parser("drift", help="Run drift detection and publish drift alerts.")
drift.add_argument("--domain", default="us_equity")
drift.add_argument("--strategy", default=None)
drift.add_argument("--no-alerts", action="store_true")
drift.add_argument("--dry-run-alerts", action="store_true")
drift.set_defaults(func=_run_drift)

optimize = subparsers.add_parser("optimize", help="Run parameter optimization for one strategy.")
optimize.add_argument("--strategy", required=True)
optimize.add_argument("--method", default="grid_search")
optimize.set_defaults(func=_run_optimize)

update = subparsers.add_parser("update", help="Process a parameter update proposal.")
update.add_argument("--proposal", required=True)
update.add_argument("--auto-approve", action="store_true")
update.set_defaults(func=_run_update)

dashboard = subparsers.add_parser("dashboard", help="Build the unified strategy health dashboard.")
dashboard.add_argument("--output-dir", default=None)
dashboard.add_argument("--format", dest="output_format", default="all")
dashboard.set_defaults(func=_run_dashboard)

autopilot = subparsers.add_parser("autopilot", help="Run a full auto-pilot cycle.")
autopilot.add_argument("--domain", default="us_equity")
autopilot.add_argument("--dry-run", action="store_true")
autopilot.add_argument("--no-issues", action="store_true")
autopilot.set_defaults(func=_run_autopilot)

lifecycle = subparsers.add_parser("lifecycle", help="Run the full lifecycle pipeline.")
lifecycle.add_argument("--domain", default="us_equity")
lifecycle.add_argument("--strategy", default=None)
lifecycle.add_argument("--method", default="grid_search")
lifecycle.add_argument("--format", dest="output_format", default="all")
lifecycle.add_argument("--skip-optimization", action="store_true")
lifecycle.add_argument("--no-alerts", action="store_true")
lifecycle.add_argument("--dry-run-alerts", action="store_true")
lifecycle.set_defaults(func=_run_lifecycle)

return parser


def main(argv: Sequence[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
try:
return int(args.func(args))
except Exception as exc: # noqa: BLE001
print(f"[{args.command}] Error: {exc}", file=sys.stderr)
return 1


if __name__ == "__main__":
raise SystemExit(main())
Loading
Loading