diff --git a/src/coinhunter/services/exchange_service.py b/src/coinhunter/services/exchange_service.py new file mode 100644 index 0000000..5f916c6 --- /dev/null +++ b/src/coinhunter/services/exchange_service.py @@ -0,0 +1,125 @@ +"""Exchange helpers (ccxt, markets, balances, order prep).""" +import math +import os + +import ccxt + +from ..runtime import get_runtime_paths, load_env_file +from .trade_common import log + +PATHS = get_runtime_paths() + + +def load_env(): + load_env_file(PATHS) + + +def get_exchange(): + load_env() + api_key = os.getenv("BINANCE_API_KEY") + secret = os.getenv("BINANCE_API_SECRET") + if not api_key or not secret: + raise RuntimeError("缺少 BINANCE_API_KEY 或 BINANCE_API_SECRET") + ex = ccxt.binance( + { + "apiKey": api_key, + "secret": secret, + "options": {"defaultType": "spot", "createMarketBuyOrderRequiresPrice": False}, + "enableRateLimit": True, + } + ) + ex.load_markets() + return ex + + +def norm_symbol(symbol: str) -> str: + s = symbol.upper().replace("-", "").replace("_", "") + if "/" in s: + return s + if s.endswith("USDT"): + return s[:-4] + "/USDT" + raise ValueError(f"不支持的 symbol: {symbol}") + + +def storage_symbol(symbol: str) -> str: + return norm_symbol(symbol).replace("/", "") + + +def fetch_balances(ex): + bal = ex.fetch_balance()["free"] + return {k: float(v) for k, v in bal.items() if float(v) > 0} + + +def build_market_snapshot(ex): + try: + tickers = ex.fetch_tickers() + except Exception: + return {} + snapshot = {} + for sym, t in tickers.items(): + if not sym.endswith("/USDT"): + continue + price = t.get("last") + if price is None or float(price) <= 0: + continue + vol = float(t.get("quoteVolume") or 0) + if vol < 200_000: + continue + base = sym.replace("/", "") + snapshot[base] = { + "lastPrice": round(float(price), 8), + "price24hPcnt": round(float(t.get("percentage") or 0), 4), + "highPrice24h": round(float(t.get("high") or 0), 8) if t.get("high") else None, + "lowPrice24h": round(float(t.get("low") or 0), 8) if t.get("low") else None, + "turnover24h": round(float(vol), 2), + } + return snapshot + + +def market_and_ticker(ex, symbol: str): + sym = norm_symbol(symbol) + market = ex.market(sym) + ticker = ex.fetch_ticker(sym) + return sym, market, ticker + + +def floor_to_step(value: float, step: float) -> float: + if not step or step <= 0: + return value + return math.floor(value / step) * step + + +def prepare_buy_quantity(ex, symbol: str, amount_usdt: float): + from .trade_common import USDT_BUFFER_PCT + + sym, market, ticker = market_and_ticker(ex, symbol) + ask = float(ticker.get("ask") or ticker.get("last") or 0) + if ask <= 0: + raise RuntimeError(f"{sym} 无法获取有效 ask 价格") + budget = amount_usdt * (1 - USDT_BUFFER_PCT) + raw_qty = budget / ask + qty = float(ex.amount_to_precision(sym, raw_qty)) + min_amt = (market.get("limits", {}).get("amount", {}) or {}).get("min") or 0 + min_cost = (market.get("limits", {}).get("cost", {}) or {}).get("min") or 0 + if min_amt and qty < float(min_amt): + raise RuntimeError(f"{sym} 买入数量 {qty} 小于最小数量 {min_amt}") + est_cost = qty * ask + if min_cost and est_cost < float(min_cost): + raise RuntimeError(f"{sym} 买入金额 ${est_cost:.4f} 小于最小成交额 ${float(min_cost):.4f}") + return sym, qty, ask, est_cost + + +def prepare_sell_quantity(ex, symbol: str, free_qty: float): + sym, market, ticker = market_and_ticker(ex, symbol) + bid = float(ticker.get("bid") or ticker.get("last") or 0) + if bid <= 0: + raise RuntimeError(f"{sym} 无法获取有效 bid 价格") + qty = float(ex.amount_to_precision(sym, free_qty)) + min_amt = (market.get("limits", {}).get("amount", {}) or {}).get("min") or 0 + min_cost = (market.get("limits", {}).get("cost", {}) or {}).get("min") or 0 + if min_amt and qty < float(min_amt): + raise RuntimeError(f"{sym} 卖出数量 {qty} 小于最小数量 {min_amt}") + est_cost = qty * bid + if min_cost and est_cost < float(min_cost): + raise RuntimeError(f"{sym} 卖出金额 ${est_cost:.4f} 小于最小成交额 ${float(min_cost):.4f}") + return sym, qty, bid, est_cost diff --git a/src/coinhunter/services/execution_state.py b/src/coinhunter/services/execution_state.py new file mode 100644 index 0000000..3a34a1a --- /dev/null +++ b/src/coinhunter/services/execution_state.py @@ -0,0 +1,39 @@ +"""Execution state helpers (decision deduplication, executions.json).""" +import hashlib + +from ..runtime import get_runtime_paths +from .file_utils import load_json_locked, save_json_locked +from .trade_common import bj_now_iso + +PATHS = get_runtime_paths() +EXECUTIONS_FILE = PATHS.executions_file +EXECUTIONS_LOCK = PATHS.executions_lock + + +def default_decision_id(action: str, argv_tail: list[str]) -> str: + from datetime import datetime + from .trade_common import CST + + now = datetime.now(CST) + bucket_min = (now.minute // 15) * 15 + bucket = now.strftime(f"%Y%m%dT%H{bucket_min:02d}") + raw = f"{bucket}|{action}|{'|'.join(argv_tail)}" + return hashlib.sha1(raw.encode()).hexdigest()[:16] + + +def load_executions() -> dict: + return load_json_locked(EXECUTIONS_FILE, EXECUTIONS_LOCK, {"executions": {}}).get("executions", {}) + + +def save_executions(executions: dict): + save_json_locked(EXECUTIONS_FILE, EXECUTIONS_LOCK, {"executions": executions}) + + +def record_execution_state(decision_id: str, payload: dict): + executions = load_executions() + executions[decision_id] = payload + save_executions(executions) + + +def get_execution_state(decision_id: str): + return load_executions().get(decision_id) diff --git a/src/coinhunter/services/file_utils.py b/src/coinhunter/services/file_utils.py new file mode 100644 index 0000000..661332f --- /dev/null +++ b/src/coinhunter/services/file_utils.py @@ -0,0 +1,40 @@ +"""File locking and atomic JSON helpers.""" +import fcntl +import json +import os +from contextlib import contextmanager +from pathlib import Path + + +@contextmanager +def locked_file(path: Path): + path.parent.mkdir(parents=True, exist_ok=True) + with open(path, "a+", encoding="utf-8") as f: + fcntl.flock(f.fileno(), fcntl.LOCK_EX) + f.seek(0) + yield f + f.flush() + os.fsync(f.fileno()) + fcntl.flock(f.fileno(), fcntl.LOCK_UN) + + +def atomic_write_json(path: Path, data: dict): + path.parent.mkdir(parents=True, exist_ok=True) + tmp = path.with_suffix(path.suffix + ".tmp") + tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") + os.replace(tmp, path) + + +def load_json_locked(path: Path, lock_path: Path, default): + with locked_file(lock_path): + if not path.exists(): + return default + try: + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + return default + + +def save_json_locked(path: Path, lock_path: Path, data: dict): + with locked_file(lock_path): + atomic_write_json(path, data) diff --git a/src/coinhunter/services/portfolio_service.py b/src/coinhunter/services/portfolio_service.py new file mode 100644 index 0000000..4478553 --- /dev/null +++ b/src/coinhunter/services/portfolio_service.py @@ -0,0 +1,57 @@ +"""Portfolio state helpers (positions.json, reconcile with exchange).""" +from ..runtime import get_runtime_paths +from .file_utils import load_json_locked, save_json_locked +from .trade_common import bj_now_iso + +PATHS = get_runtime_paths() +POSITIONS_FILE = PATHS.positions_file +POSITIONS_LOCK = PATHS.positions_lock + + +def load_positions() -> list: + return load_json_locked(POSITIONS_FILE, POSITIONS_LOCK, {"positions": []}).get("positions", []) + + +def save_positions(positions: list): + save_json_locked(POSITIONS_FILE, POSITIONS_LOCK, {"positions": positions}) + + +def upsert_position(positions: list, position: dict): + sym = position["symbol"] + for i, existing in enumerate(positions): + if existing.get("symbol") == sym: + positions[i] = position + return positions + positions.append(position) + return positions + + +def reconcile_positions_with_exchange(ex, positions: list): + from .exchange_service import fetch_balances + + balances = fetch_balances(ex) + existing_by_symbol = {p.get("symbol"): p for p in positions} + reconciled = [] + for asset, qty in balances.items(): + if asset == "USDT": + continue + if qty <= 0: + continue + sym = f"{asset}USDT" + old = existing_by_symbol.get(sym, {}) + reconciled.append( + { + "account_id": old.get("account_id", "binance-main"), + "symbol": sym, + "base_asset": asset, + "quote_asset": "USDT", + "market_type": "spot", + "quantity": qty, + "avg_cost": old.get("avg_cost"), + "opened_at": old.get("opened_at", bj_now_iso()), + "updated_at": bj_now_iso(), + "note": old.get("note", "Reconciled from Binance balances"), + } + ) + save_positions(reconciled) + return reconciled, balances diff --git a/src/coinhunter/services/smart_executor_parser.py b/src/coinhunter/services/smart_executor_parser.py new file mode 100644 index 0000000..9821751 --- /dev/null +++ b/src/coinhunter/services/smart_executor_parser.py @@ -0,0 +1,145 @@ +"""CLI parser and legacy argument normalization for smart executor.""" +import argparse + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Coin Hunter Smart Executor", + formatter_class=argparse.RawTextHelpFormatter, + epilog=( + "示例:\n" + " python smart_executor.py hold\n" + " python smart_executor.py sell-all ETHUSDT\n" + " python smart_executor.py buy ENJUSDT 100\n" + " python smart_executor.py rebalance PEPEUSDT ETHUSDT\n" + " python smart_executor.py balances\n\n" + "兼容旧调用:\n" + " python smart_executor.py HOLD\n" + " python smart_executor.py --decision HOLD --dry-run\n" + ), + ) + parser.add_argument("--decision-id", help="Override decision id (otherwise derived automatically)") + parser.add_argument("--analysis", help="Decision analysis text to persist into logs") + parser.add_argument("--reasoning", help="Decision reasoning text to persist into logs") + parser.add_argument("--dry-run", action="store_true", help="Force dry-run mode for this invocation") + + subparsers = parser.add_subparsers(dest="command") + + subparsers.add_parser("hold", help="Log a HOLD decision without trading") + subparsers.add_parser("balances", help="Print live balances as JSON") + subparsers.add_parser("balance", help="Alias of balances") + subparsers.add_parser("status", help="Print balances + positions + snapshot as JSON") + + sell_all = subparsers.add_parser("sell-all", help="Sell all of one symbol") + sell_all.add_argument("symbol") + sell_all_legacy = subparsers.add_parser("sell_all", help=argparse.SUPPRESS) + sell_all_legacy.add_argument("symbol") + + buy = subparsers.add_parser("buy", help="Buy symbol with USDT amount") + buy.add_argument("symbol") + buy.add_argument("amount_usdt", type=float) + + rebalance = subparsers.add_parser("rebalance", help="Sell one symbol and rotate to another") + rebalance.add_argument("from_symbol") + rebalance.add_argument("to_symbol") + + return parser + + +def normalize_legacy_argv(argv: list[str]) -> list[str]: + if not argv: + return argv + + action_aliases = { + "HOLD": ["hold"], + "hold": ["hold"], + "SELL_ALL": ["sell-all"], + "sell_all": ["sell-all"], + "sell-all": ["sell-all"], + "BUY": ["buy"], + "buy": ["buy"], + "REBALANCE": ["rebalance"], + "rebalance": ["rebalance"], + "BALANCE": ["balances"], + "balance": ["balances"], + "BALANCES": ["balances"], + "balances": ["balances"], + "STATUS": ["status"], + "status": ["status"], + } + + has_legacy_flag = any(t.startswith("--decision") for t in argv) + if not has_legacy_flag: + for idx, token in enumerate(argv): + if token in action_aliases: + prefix = argv[:idx] + suffix = argv[idx + 1 :] + return prefix + action_aliases[token] + suffix + + if argv[0].startswith("-"): + legacy = argparse.ArgumentParser(add_help=False) + legacy.add_argument("--decision") + legacy.add_argument("--symbol") + legacy.add_argument("--from-symbol") + legacy.add_argument("--to-symbol") + legacy.add_argument("--amount-usdt", type=float) + legacy.add_argument("--decision-id") + legacy.add_argument("--analysis") + legacy.add_argument("--reasoning") + legacy.add_argument("--dry-run", action="store_true") + ns, unknown = legacy.parse_known_args(argv) + + if ns.decision: + decision = (ns.decision or "").strip().upper() + rebuilt = [] + if ns.decision_id: + rebuilt += ["--decision-id", ns.decision_id] + if ns.analysis: + rebuilt += ["--analysis", ns.analysis] + if ns.reasoning: + rebuilt += ["--reasoning", ns.reasoning] + if ns.dry_run: + rebuilt += ["--dry-run"] + + if decision == "HOLD": + rebuilt += ["hold"] + elif decision == "SELL_ALL": + if not ns.symbol: + raise RuntimeError("旧式 --decision SELL_ALL 需要搭配 --symbol") + rebuilt += ["sell-all", ns.symbol] + elif decision == "BUY": + if not ns.symbol or ns.amount_usdt is None: + raise RuntimeError("旧式 --decision BUY 需要 --symbol 和 --amount-usdt") + rebuilt += ["buy", ns.symbol, str(ns.amount_usdt)] + elif decision == "REBALANCE": + if not ns.from_symbol or not ns.to_symbol: + raise RuntimeError("旧式 --decision REBALANCE 需要 --from-symbol 和 --to-symbol") + rebuilt += ["rebalance", ns.from_symbol, ns.to_symbol] + else: + raise RuntimeError(f"不支持的旧式 decision: {decision}") + + return rebuilt + unknown + + return argv + + +def parse_cli_args(argv: list[str]): + parser = build_parser() + normalized = normalize_legacy_argv(argv) + args = parser.parse_args(normalized) + if not args.command: + parser.print_help() + raise SystemExit(1) + if args.command == "sell_all": + args.command = "sell-all" + return args, normalized + + +def cli_action_args(args, action: str) -> list[str]: + if action == "sell_all": + return [args.symbol] + if action == "buy": + return [args.symbol, str(args.amount_usdt)] + if action == "rebalance": + return [args.from_symbol, args.to_symbol] + return [] diff --git a/src/coinhunter/services/smart_executor_service.py b/src/coinhunter/services/smart_executor_service.py index 4d122bd..fe82968 100644 --- a/src/coinhunter/services/smart_executor_service.py +++ b/src/coinhunter/services/smart_executor_service.py @@ -5,38 +5,54 @@ from __future__ import annotations import os import sys -from .. import smart_executor as smart_executor_module +from ..logger import log_decision, log_error +from .exchange_service import fetch_balances, build_market_snapshot +from .execution_state import default_decision_id, get_execution_state, record_execution_state +from .portfolio_service import load_positions +from .smart_executor_parser import parse_cli_args, cli_action_args +from .trade_common import is_dry_run, log, set_dry_run, bj_now_iso +from .trade_execution import ( + command_balances, + command_status, + build_decision_context, + action_sell_all, + action_buy, + action_rebalance, +) def run(argv: list[str] | None = None) -> int: argv = list(sys.argv[1:] if argv is None else argv) - args, normalized_argv = smart_executor_module.parse_cli_args(argv) + args, normalized_argv = parse_cli_args(argv) action = args.command.replace("-", "_") - argv_tail = smart_executor_module.cli_action_args(args, action) + argv_tail = cli_action_args(args, action) decision_id = ( args.decision_id or os.getenv("DECISION_ID") - or smart_executor_module.default_decision_id(action, normalized_argv) + or default_decision_id(action, normalized_argv) ) - if args.dry_run: - smart_executor_module.DRY_RUN = True - previous = smart_executor_module.get_execution_state(decision_id) + if args.dry_run: + set_dry_run(True) + + previous = get_execution_state(decision_id) read_only_action = action in {"balance", "balances", "status"} if previous and previous.get("status") == "success" and not read_only_action: - smart_executor_module.log(f"⚠️ decision_id={decision_id} 已执行成功,跳过重复执行") + log(f"⚠️ decision_id={decision_id} 已执行成功,跳过重复执行") return 0 try: - ex = smart_executor_module.get_exchange() + from .exchange_service import get_exchange + ex = get_exchange() + if read_only_action: if action in {"balance", "balances"}: - smart_executor_module.command_balances(ex) + command_balances(ex) else: - smart_executor_module.command_status(ex) + command_status(ex) return 0 - decision_context = smart_executor_module.build_decision_context(ex, action, argv_tail, decision_id) + decision_context = build_decision_context(ex, action, argv_tail, decision_id) if args.analysis: decision_context["analysis"] = args.analysis elif os.getenv("DECISION_ANALYSIS"): @@ -46,22 +62,22 @@ def run(argv: list[str] | None = None) -> int: elif os.getenv("DECISION_REASONING"): decision_context["reasoning"] = os.getenv("DECISION_REASONING") - smart_executor_module.record_execution_state( + record_execution_state( decision_id, - {"status": "pending", "started_at": smart_executor_module.bj_now_iso(), "action": action, "args": argv_tail}, + {"status": "pending", "started_at": bj_now_iso(), "action": action, "args": argv_tail}, ) if action == "sell_all": - result = smart_executor_module.action_sell_all(ex, args.symbol, decision_id, decision_context) + result = action_sell_all(ex, args.symbol, decision_id, decision_context) elif action == "buy": - result = smart_executor_module.action_buy(ex, args.symbol, float(args.amount_usdt), decision_id, decision_context) + result = action_buy(ex, args.symbol, float(args.amount_usdt), decision_id, decision_context) elif action == "rebalance": - result = smart_executor_module.action_rebalance(ex, args.from_symbol, args.to_symbol, decision_id, decision_context) + result = action_rebalance(ex, args.from_symbol, args.to_symbol, decision_id, decision_context) elif action == "hold": - balances = smart_executor_module.fetch_balances(ex) - positions = smart_executor_module.load_positions() - market_snapshot = smart_executor_module.build_market_snapshot(ex) - smart_executor_module.log_decision( + balances = fetch_balances(ex) + positions = load_positions() + market_snapshot = build_market_snapshot(ex) + log_decision( { **decision_context, "balances_after": balances, @@ -72,40 +88,41 @@ def run(argv: list[str] | None = None) -> int: "execution_result": {"status": "hold"}, } ) - smart_executor_module.log("😴 决策: 持续持有,无操作") + log("😴 决策: 持续持有,无操作") result = {"status": "hold"} else: raise RuntimeError(f"未知动作: {action};请运行 --help 查看正确 CLI 用法") - smart_executor_module.record_execution_state( + record_execution_state( decision_id, { "status": "success", - "finished_at": smart_executor_module.bj_now_iso(), + "finished_at": bj_now_iso(), "action": action, "args": argv_tail, "result": result, }, ) - smart_executor_module.log(f"✅ 执行完成 decision_id={decision_id}") + log(f"✅ 执行完成 decision_id={decision_id}") return 0 + except Exception as exc: - smart_executor_module.record_execution_state( + record_execution_state( decision_id, { "status": "failed", - "finished_at": smart_executor_module.bj_now_iso(), + "finished_at": bj_now_iso(), "action": action, "args": argv_tail, "error": str(exc), }, ) - smart_executor_module.log_error( + log_error( "smart_executor", exc, decision_id=decision_id, action=action, args=argv_tail, ) - smart_executor_module.log(f"❌ 执行失败: {exc}") + log(f"❌ 执行失败: {exc}") return 1 diff --git a/src/coinhunter/services/trade_common.py b/src/coinhunter/services/trade_common.py new file mode 100644 index 0000000..0d1f454 --- /dev/null +++ b/src/coinhunter/services/trade_common.py @@ -0,0 +1,25 @@ +"""Common trade utilities (time, logging, constants).""" +import os +from datetime import datetime, timezone, timedelta + +CST = timezone(timedelta(hours=8)) + +_DRY_RUN = {"value": os.getenv("DRY_RUN", "false").lower() == "true"} +USDT_BUFFER_PCT = 0.03 +MIN_REMAINING_DUST_USDT = 1.0 + + +def is_dry_run() -> bool: + return _DRY_RUN["value"] + + +def set_dry_run(value: bool): + _DRY_RUN["value"] = value + + +def log(msg: str): + print(f"[{datetime.now(CST).strftime('%Y-%m-%d %H:%M:%S')} CST] {msg}") + + +def bj_now_iso(): + return datetime.now(CST).isoformat() diff --git a/src/coinhunter/services/trade_execution.py b/src/coinhunter/services/trade_execution.py new file mode 100644 index 0000000..778fa88 --- /dev/null +++ b/src/coinhunter/services/trade_execution.py @@ -0,0 +1,178 @@ +"""Trade execution actions (buy, sell, rebalance, hold, status).""" +from ..logger import log_decision, log_trade +from .exchange_service import ( + fetch_balances, + norm_symbol, + storage_symbol, + build_market_snapshot, + prepare_buy_quantity, + prepare_sell_quantity, +) +from .portfolio_service import load_positions, save_positions, upsert_position, reconcile_positions_with_exchange +from .trade_common import is_dry_run, USDT_BUFFER_PCT, log, bj_now_iso + + +def build_decision_context(ex, action: str, argv_tail: list[str], decision_id: str): + balances = fetch_balances(ex) + positions = load_positions() + return { + "decision_id": decision_id, + "balances_before": balances, + "positions_before": positions, + "decision": action.upper(), + "action_taken": f"{action} {' '.join(argv_tail)}".strip(), + "risk_level": "high" if len(positions) <= 1 else "medium", + "data_sources": ["binance"], + } + + +def market_sell(ex, symbol: str, qty: float, decision_id: str): + sym, qty, bid, est_cost = prepare_sell_quantity(ex, symbol, qty) + if is_dry_run(): + log(f"[DRY RUN] 卖出 {sym} 数量 {qty}") + return {"id": f"dry-sell-{decision_id}", "symbol": sym, "amount": qty, "price": bid, "cost": est_cost, "status": "closed"} + order = ex.create_market_sell_order(sym, qty, params={"newClientOrderId": f"ch-{decision_id}-sell"}) + return order + + +def market_buy(ex, symbol: str, amount_usdt: float, decision_id: str): + sym, qty, ask, est_cost = prepare_buy_quantity(ex, symbol, amount_usdt) + if is_dry_run(): + log(f"[DRY RUN] 买入 {sym} 金额 ${est_cost:.4f} 数量 {qty}") + return {"id": f"dry-buy-{decision_id}", "symbol": sym, "amount": qty, "price": ask, "cost": est_cost, "status": "closed"} + order = ex.create_market_buy_order(sym, qty, params={"newClientOrderId": f"ch-{decision_id}-buy"}) + return order + + +def action_sell_all(ex, symbol: str, decision_id: str, decision_context: dict): + balances_before = fetch_balances(ex) + base = norm_symbol(symbol).split("/")[0] + qty = float(balances_before.get(base, 0)) + if qty <= 0: + raise RuntimeError(f"{base} 余额为0,无法卖出") + order = market_sell(ex, symbol, qty, decision_id) + positions_after, balances_after = ( + reconcile_positions_with_exchange(ex, load_positions()) + if not is_dry_run() + else (load_positions(), balances_before) + ) + log_trade( + "SELL_ALL", + norm_symbol(symbol), + qty=order.get("amount"), + price=order.get("price"), + amount_usdt=order.get("cost"), + note="Smart executor sell_all", + decision_id=decision_id, + order_id=order.get("id"), + status=order.get("status"), + balances_before=balances_before, + balances_after=balances_after, + ) + log_decision( + { + **decision_context, + "balances_after": balances_after, + "positions_after": positions_after, + "execution_result": {"order": order}, + "analysis": decision_context.get("analysis", ""), + "reasoning": decision_context.get("reasoning", "sell_all execution"), + } + ) + return order + + +def action_buy(ex, symbol: str, amount_usdt: float, decision_id: str, decision_context: dict, simulated_usdt_balance: float = None): + balances_before = fetch_balances(ex) if simulated_usdt_balance is None else {"USDT": simulated_usdt_balance} + usdt = float(balances_before.get("USDT", 0)) + if usdt < amount_usdt: + raise RuntimeError(f"USDT 余额不足(${usdt:.4f} < ${amount_usdt:.4f})") + order = market_buy(ex, symbol, amount_usdt, decision_id) + positions_existing = load_positions() + sym_store = storage_symbol(symbol) + price = float(order.get("price") or 0) + qty = float(order.get("amount") or 0) + position = { + "account_id": "binance-main", + "symbol": sym_store, + "base_asset": norm_symbol(symbol).split("/")[0], + "quote_asset": "USDT", + "market_type": "spot", + "quantity": qty, + "avg_cost": price, + "opened_at": bj_now_iso(), + "updated_at": bj_now_iso(), + "note": "Smart executor entry", + } + upsert_position(positions_existing, position) + if is_dry_run(): + balances_after = balances_before + positions_after = positions_existing + else: + save_positions(positions_existing) + positions_after, balances_after = reconcile_positions_with_exchange(ex, positions_existing) + for p in positions_after: + if p["symbol"] == sym_store and price: + p["avg_cost"] = price + p["updated_at"] = bj_now_iso() + save_positions(positions_after) + log_trade( + "BUY", + norm_symbol(symbol), + qty=qty, + amount_usdt=order.get("cost"), + price=price, + note="Smart executor buy", + decision_id=decision_id, + order_id=order.get("id"), + status=order.get("status"), + balances_before=balances_before, + balances_after=balances_after, + ) + log_decision( + { + **decision_context, + "balances_after": balances_after, + "positions_after": positions_after, + "execution_result": {"order": order}, + "analysis": decision_context.get("analysis", ""), + "reasoning": decision_context.get("reasoning", "buy execution"), + } + ) + return order + + +def action_rebalance(ex, from_symbol: str, to_symbol: str, decision_id: str, decision_context: dict): + sell_order = action_sell_all(ex, from_symbol, decision_id + "s", decision_context) + if is_dry_run(): + sell_cost = float(sell_order.get("cost") or 0) + spend = sell_cost * (1 - USDT_BUFFER_PCT) + simulated_usdt = sell_cost + else: + balances = fetch_balances(ex) + usdt = float(balances.get("USDT", 0)) + spend = usdt * (1 - USDT_BUFFER_PCT) + simulated_usdt = None + if spend < 5: + raise RuntimeError(f"卖出后 USDT ${spend:.4f} 不足,无法买入新币") + buy_order = action_buy(ex, to_symbol, spend, decision_id + "b", decision_context, simulated_usdt_balance=simulated_usdt) + return {"sell": sell_order, "buy": buy_order} + + +def command_status(ex): + balances = fetch_balances(ex) + positions = load_positions() + market_snapshot = build_market_snapshot(ex) + payload = { + "balances": balances, + "positions": positions, + "market_snapshot": market_snapshot, + } + print(payload) + return payload + + +def command_balances(ex): + balances = fetch_balances(ex) + print({"balances": balances}) + return balances diff --git a/src/coinhunter/smart_executor.py b/src/coinhunter/smart_executor.py index 954bb02..8269aef 100755 --- a/src/coinhunter/smart_executor.py +++ b/src/coinhunter/smart_executor.py @@ -1,547 +1,29 @@ #!/usr/bin/env python3 -"""Coin Hunter robust smart executor. +"""Coin Hunter robust smart executor — compatibility facade.""" -Supports both: -1. Clear argparse CLI subcommands for humans / cron prompts -2. Backward-compatible legacy flag forms previously emitted by AI agents -""" -import argparse -import fcntl -import hashlib -import json -import math -import os import sys -import time -from contextlib import contextmanager -from datetime import datetime, timezone, timedelta -from pathlib import Path -import ccxt - -from .logger import log_decision, log_error, log_trade from .runtime import get_runtime_paths, load_env_file +from .services.trade_common import CST, is_dry_run, USDT_BUFFER_PCT, MIN_REMAINING_DUST_USDT, log, bj_now_iso, set_dry_run +from .services.file_utils import locked_file, atomic_write_json, load_json_locked, save_json_locked +from .services.smart_executor_parser import build_parser, normalize_legacy_argv, parse_cli_args, cli_action_args +from .services.execution_state import default_decision_id, record_execution_state, get_execution_state, load_executions, save_executions +from .services.portfolio_service import load_positions, save_positions, upsert_position, reconcile_positions_with_exchange +from .services.exchange_service import get_exchange, norm_symbol, storage_symbol, fetch_balances, build_market_snapshot, market_and_ticker, floor_to_step, prepare_buy_quantity, prepare_sell_quantity +from .services.trade_execution import build_decision_context, market_sell, market_buy, action_sell_all, action_buy, action_rebalance, command_status, command_balances +from .services.smart_executor_service import run as _run_service PATHS = get_runtime_paths() -POSITIONS_FILE = PATHS.positions_file -POSITIONS_LOCK = PATHS.positions_lock -EXECUTIONS_FILE = PATHS.executions_file -EXECUTIONS_LOCK = PATHS.executions_lock ENV_FILE = PATHS.env_file -DRY_RUN = os.getenv("DRY_RUN", "false").lower() == "true" -USDT_BUFFER_PCT = 0.03 -MIN_REMAINING_DUST_USDT = 1.0 - -CST = timezone(timedelta(hours=8)) - - -def log(msg: str): - print(f"[{datetime.now(CST).strftime('%Y-%m-%d %H:%M:%S')} CST] {msg}") - - -def bj_now_iso(): - return datetime.now(CST).isoformat() def load_env(): load_env_file(PATHS) -@contextmanager -def locked_file(path: Path): - path.parent.mkdir(parents=True, exist_ok=True) - with open(path, "a+", encoding="utf-8") as f: - fcntl.flock(f.fileno(), fcntl.LOCK_EX) - f.seek(0) - yield f - f.flush() - os.fsync(f.fileno()) - fcntl.flock(f.fileno(), fcntl.LOCK_UN) - - -def atomic_write_json(path: Path, data: dict): - path.parent.mkdir(parents=True, exist_ok=True) - tmp = path.with_suffix(path.suffix + ".tmp") - tmp.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8") - os.replace(tmp, path) - - -def load_json_locked(path: Path, lock_path: Path, default): - with locked_file(lock_path): - if not path.exists(): - return default - try: - return json.loads(path.read_text(encoding="utf-8")) - except Exception: - return default - - -def save_json_locked(path: Path, lock_path: Path, data: dict): - with locked_file(lock_path): - atomic_write_json(path, data) - - -def load_positions() -> list: - return load_json_locked(POSITIONS_FILE, POSITIONS_LOCK, {"positions": []}).get("positions", []) - - -def save_positions(positions: list): - save_json_locked(POSITIONS_FILE, POSITIONS_LOCK, {"positions": positions}) - - -def load_executions() -> dict: - return load_json_locked(EXECUTIONS_FILE, EXECUTIONS_LOCK, {"executions": {}}).get("executions", {}) - - -def save_executions(executions: dict): - save_json_locked(EXECUTIONS_FILE, EXECUTIONS_LOCK, {"executions": executions}) - - -def default_decision_id(action: str, argv_tail: list[str]) -> str: - now = datetime.now(CST) - bucket_min = (now.minute // 15) * 15 - bucket = now.strftime(f"%Y%m%dT%H{bucket_min:02d}") - raw = f"{bucket}|{action}|{'|'.join(argv_tail)}" - return hashlib.sha1(raw.encode()).hexdigest()[:16] - - -def get_exchange(): - load_env() - api_key = os.getenv("BINANCE_API_KEY") - secret = os.getenv("BINANCE_API_SECRET") - if not api_key or not secret: - raise RuntimeError("缺少 BINANCE_API_KEY 或 BINANCE_API_SECRET") - ex = ccxt.binance({ - "apiKey": api_key, - "secret": secret, - "options": {"defaultType": "spot", "createMarketBuyOrderRequiresPrice": False}, - "enableRateLimit": True, - }) - ex.load_markets() - return ex - - -def norm_symbol(symbol: str) -> str: - s = symbol.upper().replace("-", "").replace("_", "") - if "/" in s: - return s - if s.endswith("USDT"): - return s[:-4] + "/USDT" - raise ValueError(f"不支持的 symbol: {symbol}") - - -def storage_symbol(symbol: str) -> str: - return norm_symbol(symbol).replace("/", "") - - -def fetch_balances(ex): - bal = ex.fetch_balance()["free"] - return {k: float(v) for k, v in bal.items() if float(v) > 0} - - -def build_market_snapshot(ex): - try: - tickers = ex.fetch_tickers() - except Exception: - return {} - snapshot = {} - for sym, t in tickers.items(): - if not sym.endswith("/USDT"): - continue - price = t.get("last") - if price is None or float(price) <= 0: - continue - vol = float(t.get("quoteVolume") or 0) - if vol < 200_000: - continue - base = sym.replace("/", "") - snapshot[base] = { - "lastPrice": round(float(price), 8), - "price24hPcnt": round(float(t.get("percentage") or 0), 4), - "highPrice24h": round(float(t.get("high") or 0), 8) if t.get("high") else None, - "lowPrice24h": round(float(t.get("low") or 0), 8) if t.get("low") else None, - "turnover24h": round(float(vol), 2), - } - return snapshot - - -def market_and_ticker(ex, symbol: str): - sym = norm_symbol(symbol) - market = ex.market(sym) - ticker = ex.fetch_ticker(sym) - return sym, market, ticker - - -def floor_to_step(value: float, step: float) -> float: - if not step or step <= 0: - return value - return math.floor(value / step) * step - - -def prepare_buy_quantity(ex, symbol: str, amount_usdt: float): - sym, market, ticker = market_and_ticker(ex, symbol) - ask = float(ticker.get("ask") or ticker.get("last") or 0) - if ask <= 0: - raise RuntimeError(f"{sym} 无法获取有效 ask 价格") - budget = amount_usdt * (1 - USDT_BUFFER_PCT) - raw_qty = budget / ask - qty = float(ex.amount_to_precision(sym, raw_qty)) - min_amt = (market.get("limits", {}).get("amount", {}) or {}).get("min") or 0 - min_cost = (market.get("limits", {}).get("cost", {}) or {}).get("min") or 0 - if min_amt and qty < float(min_amt): - raise RuntimeError(f"{sym} 买入数量 {qty} 小于最小数量 {min_amt}") - est_cost = qty * ask - if min_cost and est_cost < float(min_cost): - raise RuntimeError(f"{sym} 买入金额 ${est_cost:.4f} 小于最小成交额 ${float(min_cost):.4f}") - return sym, qty, ask, est_cost - - -def prepare_sell_quantity(ex, symbol: str, free_qty: float): - sym, market, ticker = market_and_ticker(ex, symbol) - bid = float(ticker.get("bid") or ticker.get("last") or 0) - if bid <= 0: - raise RuntimeError(f"{sym} 无法获取有效 bid 价格") - qty = float(ex.amount_to_precision(sym, free_qty)) - min_amt = (market.get("limits", {}).get("amount", {}) or {}).get("min") or 0 - min_cost = (market.get("limits", {}).get("cost", {}) or {}).get("min") or 0 - if min_amt and qty < float(min_amt): - raise RuntimeError(f"{sym} 卖出数量 {qty} 小于最小数量 {min_amt}") - est_cost = qty * bid - if min_cost and est_cost < float(min_cost): - raise RuntimeError(f"{sym} 卖出金额 ${est_cost:.4f} 小于最小成交额 ${float(min_cost):.4f}") - return sym, qty, bid, est_cost - - -def upsert_position(positions: list, position: dict): - sym = position["symbol"] - for i, existing in enumerate(positions): - if existing.get("symbol") == sym: - positions[i] = position - return positions - positions.append(position) - return positions - - -def reconcile_positions_with_exchange(ex, positions: list): - balances = fetch_balances(ex) - existing_by_symbol = {p.get("symbol"): p for p in positions} - reconciled = [] - for asset, qty in balances.items(): - if asset == "USDT": - continue - if qty <= 0: - continue - sym = f"{asset}USDT" - old = existing_by_symbol.get(sym, {}) - reconciled.append({ - "account_id": old.get("account_id", "binance-main"), - "symbol": sym, - "base_asset": asset, - "quote_asset": "USDT", - "market_type": "spot", - "quantity": qty, - "avg_cost": old.get("avg_cost"), - "opened_at": old.get("opened_at", bj_now_iso()), - "updated_at": bj_now_iso(), - "note": old.get("note", "Reconciled from Binance balances"), - }) - save_positions(reconciled) - return reconciled, balances - - -def record_execution_state(decision_id: str, payload: dict): - executions = load_executions() - executions[decision_id] = payload - save_executions(executions) - - -def get_execution_state(decision_id: str): - return load_executions().get(decision_id) - - -def build_decision_context(ex, action: str, argv_tail: list[str], decision_id: str): - balances = fetch_balances(ex) - positions = load_positions() - return { - "decision_id": decision_id, - "balances_before": balances, - "positions_before": positions, - "decision": action.upper(), - "action_taken": f"{action} {' '.join(argv_tail)}".strip(), - "risk_level": "high" if len(positions) <= 1 else "medium", - "data_sources": ["binance"], - } - - -def market_sell(ex, symbol: str, qty: float, decision_id: str): - sym, qty, bid, est_cost = prepare_sell_quantity(ex, symbol, qty) - if DRY_RUN: - log(f"[DRY RUN] 卖出 {sym} 数量 {qty}") - return {"id": f"dry-sell-{decision_id}", "symbol": sym, "amount": qty, "price": bid, "cost": est_cost, "status": "closed"} - order = ex.create_market_sell_order(sym, qty, params={"newClientOrderId": f"ch-{decision_id}-sell"}) - return order - - -def market_buy(ex, symbol: str, amount_usdt: float, decision_id: str): - sym, qty, ask, est_cost = prepare_buy_quantity(ex, symbol, amount_usdt) - if DRY_RUN: - log(f"[DRY RUN] 买入 {sym} 金额 ${est_cost:.4f} 数量 {qty}") - return {"id": f"dry-buy-{decision_id}", "symbol": sym, "amount": qty, "price": ask, "cost": est_cost, "status": "closed"} - order = ex.create_market_buy_order(sym, qty, params={"newClientOrderId": f"ch-{decision_id}-buy"}) - return order - - -def action_sell_all(ex, symbol: str, decision_id: str, decision_context: dict): - balances_before = fetch_balances(ex) - base = norm_symbol(symbol).split("/")[0] - qty = float(balances_before.get(base, 0)) - if qty <= 0: - raise RuntimeError(f"{base} 余额为0,无法卖出") - order = market_sell(ex, symbol, qty, decision_id) - positions_after, balances_after = reconcile_positions_with_exchange(ex, load_positions()) if not DRY_RUN else (load_positions(), balances_before) - log_trade( - "SELL_ALL", norm_symbol(symbol), qty=order.get("amount"), price=order.get("price"), - amount_usdt=order.get("cost"), note="Smart executor sell_all", - decision_id=decision_id, order_id=order.get("id"), status=order.get("status"), - balances_before=balances_before, balances_after=balances_after, - ) - log_decision({ - **decision_context, - "balances_after": balances_after, - "positions_after": positions_after, - "execution_result": {"order": order}, - "analysis": decision_context.get("analysis", ""), - "reasoning": decision_context.get("reasoning", "sell_all execution"), - }) - return order - - -def action_buy(ex, symbol: str, amount_usdt: float, decision_id: str, decision_context: dict): - balances_before = fetch_balances(ex) - usdt = float(balances_before.get("USDT", 0)) - if usdt < amount_usdt: - raise RuntimeError(f"USDT 余额不足(${usdt:.4f} < ${amount_usdt:.4f})") - order = market_buy(ex, symbol, amount_usdt, decision_id) - positions_existing = load_positions() - sym_store = storage_symbol(symbol) - price = float(order.get("price") or 0) - qty = float(order.get("amount") or 0) - position = { - "account_id": "binance-main", - "symbol": sym_store, - "base_asset": norm_symbol(symbol).split("/")[0], - "quote_asset": "USDT", - "market_type": "spot", - "quantity": qty, - "avg_cost": price, - "opened_at": bj_now_iso(), - "updated_at": bj_now_iso(), - "note": "Smart executor entry", - } - upsert_position(positions_existing, position) - if DRY_RUN: - save_positions(positions_existing) - balances_after = balances_before - positions_after = positions_existing - else: - positions_after, balances_after = reconcile_positions_with_exchange(ex, positions_existing) - for p in positions_after: - if p["symbol"] == sym_store and price: - p["avg_cost"] = price - p["updated_at"] = bj_now_iso() - save_positions(positions_after) - log_trade( - "BUY", norm_symbol(symbol), qty=qty, amount_usdt=order.get("cost"), price=price, - note="Smart executor buy", decision_id=decision_id, order_id=order.get("id"), - status=order.get("status"), balances_before=balances_before, balances_after=balances_after, - ) - log_decision({ - **decision_context, - "balances_after": balances_after, - "positions_after": positions_after, - "execution_result": {"order": order}, - "analysis": decision_context.get("analysis", ""), - "reasoning": decision_context.get("reasoning", "buy execution"), - }) - return order - - -def action_rebalance(ex, from_symbol: str, to_symbol: str, decision_id: str, decision_context: dict): - sell_order = action_sell_all(ex, from_symbol, decision_id + "s", decision_context) - balances = fetch_balances(ex) - usdt = float(balances.get("USDT", 0)) - spend = usdt * (1 - USDT_BUFFER_PCT) - if spend < 5: - raise RuntimeError(f"卖出后 USDT ${usdt:.4f} 不足,无法买入新币") - buy_order = action_buy(ex, to_symbol, spend, decision_id + "b", decision_context) - return {"sell": sell_order, "buy": buy_order} - - -def build_parser() -> argparse.ArgumentParser: - parser = argparse.ArgumentParser( - description="Coin Hunter Smart Executor", - formatter_class=argparse.RawTextHelpFormatter, - epilog=( - "示例:\n" - " python smart_executor.py hold\n" - " python smart_executor.py sell-all ETHUSDT\n" - " python smart_executor.py buy ENJUSDT 100\n" - " python smart_executor.py rebalance PEPEUSDT ETHUSDT\n" - " python smart_executor.py balances\n\n" - "兼容旧调用:\n" - " python smart_executor.py HOLD\n" - " python smart_executor.py --decision HOLD --dry-run\n" - ), - ) - parser.add_argument("--decision-id", help="Override decision id (otherwise derived automatically)") - parser.add_argument("--analysis", help="Decision analysis text to persist into logs") - parser.add_argument("--reasoning", help="Decision reasoning text to persist into logs") - parser.add_argument("--dry-run", action="store_true", help="Force dry-run mode for this invocation") - - subparsers = parser.add_subparsers(dest="command") - - subparsers.add_parser("hold", help="Log a HOLD decision without trading") - subparsers.add_parser("balances", help="Print live balances as JSON") - subparsers.add_parser("balance", help="Alias of balances") - subparsers.add_parser("status", help="Print balances + positions + snapshot as JSON") - - sell_all = subparsers.add_parser("sell-all", help="Sell all of one symbol") - sell_all.add_argument("symbol") - sell_all_legacy = subparsers.add_parser("sell_all", help=argparse.SUPPRESS) - sell_all_legacy.add_argument("symbol") - - buy = subparsers.add_parser("buy", help="Buy symbol with USDT amount") - buy.add_argument("symbol") - buy.add_argument("amount_usdt", type=float) - - rebalance = subparsers.add_parser("rebalance", help="Sell one symbol and rotate to another") - rebalance.add_argument("from_symbol") - rebalance.add_argument("to_symbol") - - return parser - - -def normalize_legacy_argv(argv: list[str]) -> list[str]: - if not argv: - return argv - - action_aliases = { - "HOLD": ["hold"], - "hold": ["hold"], - "SELL_ALL": ["sell-all"], - "sell_all": ["sell-all"], - "sell-all": ["sell-all"], - "BUY": ["buy"], - "buy": ["buy"], - "REBALANCE": ["rebalance"], - "rebalance": ["rebalance"], - "BALANCE": ["balances"], - "balance": ["balances"], - "BALANCES": ["balances"], - "balances": ["balances"], - "STATUS": ["status"], - "status": ["status"], - } - - for idx, token in enumerate(argv): - if token in action_aliases: - prefix = argv[:idx] - suffix = argv[idx + 1:] - return prefix + action_aliases[token] + suffix - - if argv[0].startswith("-"): - legacy = argparse.ArgumentParser(add_help=False) - legacy.add_argument("--decision") - legacy.add_argument("--symbol") - legacy.add_argument("--from-symbol") - legacy.add_argument("--to-symbol") - legacy.add_argument("--amount-usdt", type=float) - legacy.add_argument("--decision-id") - legacy.add_argument("--analysis") - legacy.add_argument("--reasoning") - legacy.add_argument("--dry-run", action="store_true") - ns, unknown = legacy.parse_known_args(argv) - - if ns.decision: - decision = (ns.decision or "").strip().upper() - rebuilt = [] - if ns.decision_id: - rebuilt += ["--decision-id", ns.decision_id] - if ns.analysis: - rebuilt += ["--analysis", ns.analysis] - if ns.reasoning: - rebuilt += ["--reasoning", ns.reasoning] - if ns.dry_run: - rebuilt += ["--dry-run"] - - if decision == "HOLD": - rebuilt += ["hold"] - elif decision == "SELL_ALL": - if not ns.symbol: - raise RuntimeError("旧式 --decision SELL_ALL 需要搭配 --symbol") - rebuilt += ["sell-all", ns.symbol] - elif decision == "BUY": - if not ns.symbol or ns.amount_usdt is None: - raise RuntimeError("旧式 --decision BUY 需要 --symbol 和 --amount-usdt") - rebuilt += ["buy", ns.symbol, str(ns.amount_usdt)] - elif decision == "REBALANCE": - if not ns.from_symbol or not ns.to_symbol: - raise RuntimeError("旧式 --decision REBALANCE 需要 --from-symbol 和 --to-symbol") - rebuilt += ["rebalance", ns.from_symbol, ns.to_symbol] - else: - raise RuntimeError(f"不支持的旧式 decision: {decision}") - - return rebuilt + unknown - - return argv - - -def parse_cli_args(argv: list[str]): - parser = build_parser() - normalized = normalize_legacy_argv(argv) - args = parser.parse_args(normalized) - if not args.command: - parser.print_help() - raise SystemExit(1) - if args.command == "sell_all": - args.command = "sell-all" - return args, normalized - - -def command_status(ex): - balances = fetch_balances(ex) - positions = load_positions() - market_snapshot = build_market_snapshot(ex) - payload = { - "balances": balances, - "positions": positions, - "market_snapshot": market_snapshot, - } - print(json.dumps(payload, ensure_ascii=False, indent=2)) - return payload - - -def command_balances(ex): - balances = fetch_balances(ex) - print(json.dumps({"balances": balances}, ensure_ascii=False, indent=2)) - return balances - - -def cli_action_args(args, action: str) -> list[str]: - if action == "sell_all": - return [args.symbol] - if action == "buy": - return [args.symbol, str(args.amount_usdt)] - if action == "rebalance": - return [args.from_symbol, args.to_symbol] - return [] - - -def main(): - from .services.smart_executor_service import run - - return run(sys.argv[1:]) +def main(argv=None): + return _run_service(argv) if __name__ == "__main__": - main() + raise SystemExit(main())