Files
coinhunter-cli/src/coinhunter/services/smart_executor_service.py

112 lines
4.4 KiB
Python

"""Service entrypoint for smart executor workflows."""
from __future__ import annotations
import os
import sys
from .. import smart_executor as smart_executor_module
def run(argv: list[str] | None = None) -> int:
    """Parse CLI arguments and carry out a single smart-executor action.

    Args:
        argv: Command-line arguments without the program name. When ``None``,
            ``sys.argv[1:]`` is used instead.

    Returns:
        ``0`` when the action completes, when a read-only action finishes, or
        when the decision is already recorded as successful and is skipped;
        ``1`` when an exception is raised inside the guarded execution phase.

    Note:
        Argument parsing and the execution-state lookup happen before the
        ``try`` block, so exceptions raised there propagate to the caller.
    """
    executor = smart_executor_module
    cli_argv = list(sys.argv[1:] if argv is None else argv)
    args, normalized_argv = executor.parse_cli_args(cli_argv)
    action = args.command.replace("-", "_")
    argv_tail = executor.cli_action_args(args, action)

    # Precedence: explicit CLI value, then environment, then a derived default.
    decision_id = args.decision_id or os.getenv("DECISION_ID")
    if not decision_id:
        decision_id = executor.default_decision_id(action, normalized_argv)

    if args.dry_run:
        executor.DRY_RUN = True

    def save_state(status: str, stamp_field: str, **extra: object) -> None:
        # Every state record shares the same leading keys; only the timestamp
        # field name and the trailing payload differ between phases.
        executor.record_execution_state(
            decision_id,
            {
                "status": status,
                stamp_field: executor.bj_now_iso(),
                "action": action,
                "args": argv_tail,
                **extra,
            },
        )

    def perform_action(exchange: object, context: dict) -> object:
        # Dispatch the mutating actions; anything unrecognised is an error.
        if action == "sell_all":
            return executor.action_sell_all(exchange, args.symbol, decision_id, context)
        if action == "buy":
            return executor.action_buy(
                exchange, args.symbol, float(args.amount_usdt), decision_id, context
            )
        if action == "rebalance":
            return executor.action_rebalance(
                exchange, args.from_symbol, args.to_symbol, decision_id, context
            )
        if action != "hold":
            raise RuntimeError(f"未知动作: {action};请运行 --help 查看正确 CLI 用法")

        # "hold" places no order: it only logs the decision with a snapshot.
        balances = executor.fetch_balances(exchange)
        positions = executor.load_positions()
        market_snapshot = executor.build_market_snapshot(exchange)
        entry = {
            **context,
            "balances_after": balances,
            "positions_after": positions,
            "market_snapshot": market_snapshot,
        }
        # Keep analysis/reasoning from the context when present, else "hold".
        entry.setdefault("analysis", "hold")
        entry.setdefault("reasoning", "hold")
        entry["execution_result"] = {"status": "hold"}
        executor.log_decision(entry)
        executor.log("😴 决策: 持续持有,无操作")
        return {"status": "hold"}

    prior_state = executor.get_execution_state(decision_id)
    is_read_only = action in ("balance", "balances", "status")
    already_succeeded = bool(prior_state) and prior_state.get("status") == "success"
    if already_succeeded and not is_read_only:
        # A decision that already succeeded is not executed a second time.
        executor.log(f"⚠️ decision_id={decision_id} 已执行成功,跳过重复执行")
        return 0

    try:
        exchange = executor.get_exchange()

        if is_read_only:
            if action == "status":
                executor.command_status(exchange)
            else:
                executor.command_balances(exchange)
            return 0

        context = executor.build_decision_context(exchange, action, argv_tail, decision_id)
        overrides = (
            ("analysis", "DECISION_ANALYSIS"),
            ("reasoning", "DECISION_REASONING"),
        )
        for field, env_name in overrides:
            # The CLI value wins; the environment variable is the fallback.
            supplied = getattr(args, field) or os.getenv(env_name)
            if supplied:
                context[field] = supplied

        save_state("pending", "started_at")
        outcome = perform_action(exchange, context)
        save_state("success", "finished_at", result=outcome)
        executor.log(f"✅ 执行完成 decision_id={decision_id}")
        return 0
    except Exception as error:
        save_state("failed", "finished_at", error=str(error))
        executor.log_error(
            "smart_executor",
            error,
            decision_id=decision_id,
            action=action,
            args=argv_tail,
        )
        executor.log(f"❌ 执行失败: {error}")
        return 1