diff --git a/README.md b/README.md index 4ac5b68..78b5852 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,20 @@ coinhunter external-gate coinhunter rotate-external-gate-log ``` +Supported top-level commands and aliases remain: + +- `check-api` / `api-check` +- `doctor` / `diag` +- `external-gate` / `gate` +- `init` +- `market-probe` / `probe` +- `paths` +- `precheck` +- `review-context` / `review` +- `review-engine` / `recap` +- `rotate-external-gate-log` / `rotate-gate-log` / `rotate-log` +- `smart-executor` / `exec` + ## Runtime model Default layout: diff --git a/src/coinhunter/auto_trader.py b/src/coinhunter/auto_trader.py deleted file mode 100755 index f37ff92..0000000 --- a/src/coinhunter/auto_trader.py +++ /dev/null @@ -1,289 +0,0 @@ -#!/usr/bin/env python3 -""" -Coin Hunter Auto Trader -全自动妖币猎人 + 币安执行器 - -运行前请在 ~/.hermes/.env 配置: - BINANCE_API_KEY=你的API_KEY - BINANCE_API_SECRET=你的API_SECRET - -首次运行建议用 DRY_RUN=True 测试逻辑。 -""" -import json -import os -import sys -import time -from datetime import datetime, timezone, timedelta -from pathlib import Path - -import ccxt - -from .runtime import get_runtime_paths, load_env_file - -# ============== 配置 ============== -PATHS = get_runtime_paths() -COINS_DIR = PATHS.root -POSITIONS_FILE = PATHS.positions_file -ENV_FILE = PATHS.env_file - -CST = timezone(timedelta(hours=8)) - -# 风控参数 -DRY_RUN = os.getenv("DRY_RUN", "true").lower() == "true" # 默认测试模式 -MAX_POSITIONS = 2 # 最大同时持仓数 - -# 资金配置(根据总资产动态计算) -CAPITAL_ALLOCATION_PCT = 0.95 # 用总资产的95%玩这个策略(留5%缓冲给手续费和滑点) -MIN_POSITION_USDT = 50 # 单次最小下单金额(避免过小) - -MIN_VOLUME_24H = 1_000_000 # 最小24h成交额 ($) -MIN_PRICE_CHANGE_24H = 0.05 # 最小涨幅 5% -MAX_PRICE = 1.0 # 只玩低价币(meme特征) -STOP_LOSS_PCT = -0.07 # 止损 -7% -TAKE_PROFIT_1_PCT = 0.15 # 止盈1 +15% -TAKE_PROFIT_2_PCT = 0.30 # 止盈2 +30% -BLACKLIST = {"USDC", "BUSD", "TUSD", "FDUSD", "USTC", "PAXG", "XRP", "ETH", "BTC"} - -# ============== 工具函数 ============== -def log(msg: str): - print(f"[{datetime.now(CST).strftime('%Y-%m-%d %H:%M:%S')} CST] {msg}") - - -def load_positions() -> list: - if POSITIONS_FILE.exists(): - return json.loads(POSITIONS_FILE.read_text(encoding="utf-8")).get("positions", []) - return [] - - -def save_positions(positions: list): - COINS_DIR.mkdir(parents=True, exist_ok=True) - POSITIONS_FILE.write_text(json.dumps({"positions": positions}, indent=2, ensure_ascii=False), encoding="utf-8") - - -def load_env(): - load_env_file(PATHS) - - -def calculate_position_size(total_usdt: float, available_usdt: float, open_slots: int) -> float: - """ - 根据总资产动态计算每次下单金额。 - 逻辑:先确定策略总上限,再按剩余开仓位均分。 - """ - strategy_cap = total_usdt * CAPITAL_ALLOCATION_PCT - # 已用于策略的资金约等于总上限 − 可用余额 - used_in_strategy = max(0, strategy_cap - available_usdt) - remaining_strategy_cap = max(0, strategy_cap - used_in_strategy) - - if open_slots <= 0 or remaining_strategy_cap < MIN_POSITION_USDT: - return 0 - - size = remaining_strategy_cap / open_slots - # 同时不能超过当前可用余额 - size = min(size, available_usdt) - # 四舍五入到整数 - size = max(0, round(size, 2)) - return size if size >= MIN_POSITION_USDT else 0 - - -# ============== 币安客户端 ============== -class BinanceTrader: - def __init__(self): - api_key = os.getenv("BINANCE_API_KEY") - secret = os.getenv("BINANCE_API_SECRET") - if not api_key or not secret: - raise RuntimeError("缺少 BINANCE_API_KEY 或 BINANCE_API_SECRET,请配置 ~/.hermes/.env") - self.exchange = ccxt.binance({ - "apiKey": api_key, - "secret": secret, - "options": {"defaultType": "spot"}, - "enableRateLimit": True, - }) - self.exchange.load_markets() - - def get_balance(self, asset: str = "USDT") -> float: - bal = self.exchange.fetch_balance()["free"].get(asset, 0) - return float(bal) - - def fetch_tickers(self) -> dict: - return self.exchange.fetch_tickers() - - def create_market_buy_order(self, symbol: str, amount_usdt: float): - if DRY_RUN: - log(f"[DRY RUN] 模拟买入 {symbol},金额 ${amount_usdt}") - return {"id": "dry-run-buy", "price": None, "amount": amount_usdt} - ticker = self.exchange.fetch_ticker(symbol) - price = float(ticker["last"]) - qty = amount_usdt / price - order = self.exchange.create_market_buy_order(symbol, qty) - log(f"✅ 买入 {symbol} | 数量 {qty:.4f} | 价格 ~${price}") - return order - - def create_market_sell_order(self, symbol: str, qty: float): - if DRY_RUN: - log(f"[DRY RUN] 模拟卖出 {symbol},数量 {qty}") - return {"id": "dry-run-sell"} - order = self.exchange.create_market_sell_order(symbol, qty) - log(f"✅ 卖出 {symbol} | 数量 {qty:.4f}") - return order - - -# ============== 选币引擎 ============== -class CoinPicker: - def __init__(self, exchange: ccxt.binance): - self.exchange = exchange - - def scan(self) -> list: - tickers = self.exchange.fetch_tickers() - candidates = [] - for symbol, t in tickers.items(): - if not symbol.endswith("/USDT"): - continue - base = symbol.replace("/USDT", "") - if base in BLACKLIST: - continue - - price = float(t["last"] or 0) - change = float(t.get("percentage", 0)) / 100 - volume = float(t.get("quoteVolume", 0)) - - if price <= 0 or price > MAX_PRICE: - continue - if volume < MIN_VOLUME_24H: - continue - if change < MIN_PRICE_CHANGE_24H: - continue - - score = change * (volume / MIN_VOLUME_24H) - candidates.append({ - "symbol": symbol, - "base": base, - "price": price, - "change_24h": change, - "volume_24h": volume, - "score": score, - }) - - candidates.sort(key=lambda x: x["score"], reverse=True) - return candidates[:5] - - -# ============== 主控制器 ============== -def run_cycle(): - load_env() - trader = BinanceTrader() - picker = CoinPicker(trader.exchange) - positions = load_positions() - - log(f"当前持仓数: {len(positions)} | 最大允许: {MAX_POSITIONS} | DRY_RUN={DRY_RUN}") - - # 1. 检查现有持仓(止盈止损) - tickers = trader.fetch_tickers() - new_positions = [] - for pos in positions: - sym = pos["symbol"] - qty = float(pos["quantity"]) - cost = float(pos["avg_cost"]) - # ccxt tickers 使用 slash 格式,如 PENGU/USDT - sym_ccxt = sym.replace("USDT", "/USDT") if "/" not in sym else sym - ticker = tickers.get(sym_ccxt) - if not ticker: - new_positions.append(pos) - continue - - price = float(ticker["last"]) - pnl_pct = (price - cost) / cost - log(f"监控 {sym} | 现价 ${price:.8f} | 成本 ${cost:.8f} | 盈亏 {pnl_pct:+.2%}") - - action = None - if pnl_pct <= STOP_LOSS_PCT: - action = "STOP_LOSS" - elif pnl_pct >= TAKE_PROFIT_2_PCT: - action = "TAKE_PROFIT_2" - elif pnl_pct >= TAKE_PROFIT_1_PCT: - # 检查是否已经止盈过一部分 - sold_pct = float(pos.get("take_profit_1_sold_pct", 0)) - if sold_pct == 0: - action = "TAKE_PROFIT_1" - - if action == "STOP_LOSS": - trader.create_market_sell_order(sym, qty) - log(f"🛑 {sym} 触发止损,全部清仓") - continue - - if action == "TAKE_PROFIT_1": - sell_qty = qty * 0.5 - trader.create_market_sell_order(sym, sell_qty) - pos["quantity"] = qty - sell_qty - pos["take_profit_1_sold_pct"] = 50 - pos["updated_at"] = datetime.now(CST).isoformat() - log(f"🎯 {sym} 触发止盈1,卖出50%,剩余 {pos['quantity']:.4f}") - new_positions.append(pos) - continue - - if action == "TAKE_PROFIT_2": - trader.create_market_sell_order(sym, float(pos["quantity"])) - log(f"🚀 {sym} 触发止盈2,全部清仓") - continue - - new_positions.append(pos) - - # 2. 开新仓 - if len(new_positions) < MAX_POSITIONS: - candidates = picker.scan() - held_bases = {p["base_asset"] for p in new_positions} - total_usdt = trader.get_balance("USDT") - # 计算持仓市值并加入总资产 - for pos in new_positions: - sym_ccxt = pos["symbol"].replace("USDT", "/USDT") if "/" not in pos["symbol"] else pos["symbol"] - ticker = tickers.get(sym_ccxt) - if ticker: - total_usdt += float(pos["quantity"]) * float(ticker["last"]) - - available_usdt = trader.get_balance("USDT") - open_slots = MAX_POSITIONS - len(new_positions) - position_size = calculate_position_size(total_usdt, available_usdt, open_slots) - - log(f"总资产 USDT: ${total_usdt:.2f} | 策略上限({CAPITAL_ALLOCATION_PCT:.0%}): ${total_usdt*CAPITAL_ALLOCATION_PCT:.2f} | 每仓建议金额: ${position_size:.2f}") - - for cand in candidates: - if len(new_positions) >= MAX_POSITIONS: - break - base = cand["base"] - if base in held_bases: - continue - if position_size <= 0: - log("策略资金已用完或余额不足,停止开新仓") - break - - symbol = cand["symbol"] - order = trader.create_market_buy_order(symbol, position_size) - avg_price = float(order.get("price") or cand["price"]) - qty = position_size / avg_price if avg_price else 0 - - new_positions.append({ - "account_id": "binance-main", - "symbol": symbol.replace("/", ""), - "base_asset": base, - "quote_asset": "USDT", - "market_type": "spot", - "quantity": qty, - "avg_cost": avg_price, - "opened_at": datetime.now(CST).isoformat(), - "updated_at": datetime.now(CST).isoformat(), - "note": "Auto-trader entry", - }) - held_bases.add(base) - available_usdt -= position_size - position_size = calculate_position_size(total_usdt, available_usdt, MAX_POSITIONS - len(new_positions)) - log(f"📈 新开仓 {symbol} | 买入价 ${avg_price:.8f} | 数量 {qty:.2f}") - - save_positions(new_positions) - log("周期结束,持仓已保存") - - -if __name__ == "__main__": - try: - run_cycle() - except Exception as e: - log(f"❌ 错误: {e}") - sys.exit(1) diff --git a/src/coinhunter/check_api.py b/src/coinhunter/check_api.py index b6308bb..15dbe7f 100755 --- a/src/coinhunter/check_api.py +++ b/src/coinhunter/check_api.py @@ -1,26 +1,8 @@ -#!/usr/bin/env python3 -"""检查自动交易的环境配置是否就绪""" -import os +"""Backward-compatible facade for check_api.""" -from .runtime import load_env_file - - -def main(): - load_env_file() - - api_key = os.getenv("BINANCE_API_KEY", "") - secret = os.getenv("BINANCE_API_SECRET", "") - - if not api_key or api_key.startswith("***") or api_key.startswith("your_"): - print("❌ 未配置 BINANCE_API_KEY") - return 1 - if not secret or secret.startswith("***") or secret.startswith("your_"): - print("❌ 未配置 BINANCE_API_SECRET") - return 1 - - print("✅ API 配置正常") - return 0 +from __future__ import annotations +from .commands.check_api import main if __name__ == "__main__": raise SystemExit(main()) diff --git a/src/coinhunter/cli.py b/src/coinhunter/cli.py index 9c995fa..46140d0 100755 --- a/src/coinhunter/cli.py +++ b/src/coinhunter/cli.py @@ -9,18 +9,17 @@ import sys from . import __version__ MODULE_MAP = { - "check-api": "check_api", - "doctor": "doctor", - "external-gate": "external_gate", - "init": "init_user_state", - "market-probe": "market_probe", - "paths": "paths", + "check-api": "commands.check_api", + "doctor": "commands.doctor", + "external-gate": "commands.external_gate", + "init": "commands.init_user_state", + "market-probe": "commands.market_probe", + "paths": "commands.paths", "precheck": "commands.precheck", "review-context": "review_context", "review-engine": "review_engine", - "rotate-external-gate-log": "rotate_external_gate_log", + "rotate-external-gate-log": "commands.rotate_external_gate_log", "smart-executor": "commands.smart_executor", - "auto-trader": "auto_trader", } ALIASES = { @@ -47,7 +46,6 @@ COMMAND_HELP = [ ("recap", "review-engine", "Generate review recap/engine output"), ("rotate-gate-log, rotate-log", "rotate-external-gate-log", "Rotate external gate logs"), ("exec", "smart-executor", "Trading and execution actions"), - ("auto-trader", None, "Auto trader entrypoint"), ] diff --git a/src/coinhunter/commands/check_api.py b/src/coinhunter/commands/check_api.py new file mode 100755 index 0000000..8f1e533 --- /dev/null +++ b/src/coinhunter/commands/check_api.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python3 +"""检查自动交易的环境配置是否就绪""" +import os + +from ..runtime import load_env_file + + +def main(): + load_env_file() + + api_key = os.getenv("BINANCE_API_KEY", "") + secret = os.getenv("BINANCE_API_SECRET", "") + + if not api_key or api_key.startswith("***") or api_key.startswith("your_"): + print("❌ 未配置 BINANCE_API_KEY") + return 1 + if not secret or secret.startswith("***") or secret.startswith("your_"): + print("❌ 未配置 BINANCE_API_SECRET") + return 1 + + print("✅ API 配置正常") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/coinhunter/commands/doctor.py b/src/coinhunter/commands/doctor.py new file mode 100644 index 0000000..ce1d220 --- /dev/null +++ b/src/coinhunter/commands/doctor.py @@ -0,0 +1,66 @@ +"""Runtime diagnostics for CoinHunter CLI.""" + +from __future__ import annotations + +import json +import os +import platform +import shutil +import sys + +from ..runtime import ensure_runtime_dirs, get_runtime_paths, load_env_file, resolve_hermes_executable + + +REQUIRED_ENV_VARS = ["BINANCE_API_KEY", "BINANCE_API_SECRET"] + + +def main() -> int: + paths = ensure_runtime_dirs(get_runtime_paths()) + env_file = load_env_file(paths) + hermes_executable = resolve_hermes_executable(paths) + + env_checks = {} + missing_env = [] + for name in REQUIRED_ENV_VARS: + present = bool(os.getenv(name)) + env_checks[name] = present + if not present: + missing_env.append(name) + + file_checks = { + "env_file_exists": env_file.exists(), + "config_exists": paths.config_file.exists(), + "positions_exists": paths.positions_file.exists(), + "logrotate_config_exists": paths.logrotate_config.exists(), + } + dir_checks = { + "root_exists": paths.root.exists(), + "state_dir_exists": paths.state_dir.exists(), + "logs_dir_exists": paths.logs_dir.exists(), + "reviews_dir_exists": paths.reviews_dir.exists(), + "cache_dir_exists": paths.cache_dir.exists(), + } + command_checks = { + "hermes": bool(shutil.which("hermes") or paths.hermes_bin.exists()), + "logrotate": bool(shutil.which("logrotate") or shutil.which("/usr/sbin/logrotate")), + } + + report = { + "ok": not missing_env, + "python": sys.version.split()[0], + "platform": platform.platform(), + "env_file": str(env_file), + "hermes_executable": hermes_executable, + "paths": paths.as_dict(), + "env_checks": env_checks, + "missing_env": missing_env, + "file_checks": file_checks, + "dir_checks": dir_checks, + "command_checks": command_checks, + } + print(json.dumps(report, ensure_ascii=False, indent=2)) + return 0 if report["ok"] else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/coinhunter/commands/external_gate.py b/src/coinhunter/commands/external_gate.py new file mode 100755 index 0000000..154f099 --- /dev/null +++ b/src/coinhunter/commands/external_gate.py @@ -0,0 +1,82 @@ +#!/usr/bin/env python3 +import fcntl +import json +import subprocess +import sys +from datetime import datetime, timezone + +from ..runtime import ensure_runtime_dirs, get_runtime_paths, resolve_hermes_executable + +PATHS = get_runtime_paths() +STATE_DIR = PATHS.state_dir +LOCK_FILE = PATHS.external_gate_lock +COINHUNTER_MODULE = [sys.executable, "-m", "coinhunter"] +TRADE_JOB_ID = "4e6593fff158" + + +def utc_now(): + return datetime.now(timezone.utc).isoformat() + + +def log(message: str): + print(f"[{utc_now()}] {message}") + + +def run_cmd(args: list[str]) -> subprocess.CompletedProcess: + return subprocess.run(args, capture_output=True, text=True) + + +def parse_json_output(text: str) -> dict: + text = (text or "").strip() + if not text: + return {} + return json.loads(text) + + +def main(): + ensure_runtime_dirs(PATHS) + with open(LOCK_FILE, "w", encoding="utf-8") as lockf: + try: + fcntl.flock(lockf.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) + except BlockingIOError: + log("gate already running; skip") + return 0 + + precheck = run_cmd(COINHUNTER_MODULE + ["precheck"]) + if precheck.returncode != 0: + log(f"precheck returned non-zero ({precheck.returncode}); stdout={precheck.stdout.strip()} stderr={precheck.stderr.strip()}") + return 1 + + try: + data = parse_json_output(precheck.stdout) + except Exception as e: + log(f"failed to parse precheck JSON: {e}; raw={precheck.stdout.strip()[:1000]}") + return 1 + + if not data.get("should_analyze"): + log("no trigger; skip model run") + return 0 + + if data.get("run_requested"): + log(f"trigger already queued at {data.get('run_requested_at')}; skip duplicate") + return 0 + + mark = run_cmd(COINHUNTER_MODULE + ["precheck", "--mark-run-requested", "external-gate queued cron run"]) + if mark.returncode != 0: + log(f"failed to mark run requested; stdout={mark.stdout.strip()} stderr={mark.stderr.strip()}") + return 1 + + trigger = run_cmd([resolve_hermes_executable(PATHS), "cron", "run", TRADE_JOB_ID]) + if trigger.returncode != 0: + log(f"failed to trigger trade cron job; stdout={trigger.stdout.strip()} stderr={trigger.stderr.strip()}") + return 1 + + reasons = ", ".join(data.get("reasons", [])) or "unknown" + log(f"queued trade job {TRADE_JOB_ID}; reasons={reasons}") + if trigger.stdout.strip(): + log(trigger.stdout.strip()) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/coinhunter/commands/init_user_state.py b/src/coinhunter/commands/init_user_state.py new file mode 100755 index 0000000..ca95724 --- /dev/null +++ b/src/coinhunter/commands/init_user_state.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3 +import json +from datetime import datetime, timezone +from pathlib import Path + +from ..runtime import ensure_runtime_dirs, get_runtime_paths + +PATHS = get_runtime_paths() +ROOT = PATHS.root +CACHE_DIR = PATHS.cache_dir + + +def now_iso(): + return datetime.now(timezone.utc).replace(microsecond=0).isoformat() + + +def ensure_file(path: Path, payload: dict): + if path.exists(): + return False + path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") + return True + + +def main(): + ensure_runtime_dirs(PATHS) + + created = [] + ts = now_iso() + + templates = { + ROOT / "config.json": { + "default_exchange": "bybit", + "default_quote_currency": "USDT", + "timezone": "Asia/Shanghai", + "preferred_chains": ["solana", "base"], + "created_at": ts, + "updated_at": ts, + }, + ROOT / "accounts.json": { + "accounts": [] + }, + ROOT / "positions.json": { + "positions": [] + }, + ROOT / "watchlist.json": { + "watchlist": [] + }, + ROOT / "notes.json": { + "notes": [] + }, + } + + for path, payload in templates.items(): + if ensure_file(path, payload): + created.append(str(path)) + + print(json.dumps({ + "root": str(ROOT), + "created": created, + "cache_dir": str(CACHE_DIR), + }, ensure_ascii=False, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/src/coinhunter/commands/market_probe.py b/src/coinhunter/commands/market_probe.py new file mode 100755 index 0000000..2e41344 --- /dev/null +++ b/src/coinhunter/commands/market_probe.py @@ -0,0 +1,243 @@ +#!/usr/bin/env python3 +import argparse +import json +import os +import sys +import urllib.parse +import urllib.request + +DEFAULT_TIMEOUT = 20 + + +def fetch_json(url, headers=None, timeout=DEFAULT_TIMEOUT): + merged_headers = { + "Accept": "application/json", + "User-Agent": "Mozilla/5.0 (compatible; OpenClaw Coin Hunter/1.0)", + } + if headers: + merged_headers.update(headers) + req = urllib.request.Request(url, headers=merged_headers) + with urllib.request.urlopen(req, timeout=timeout) as resp: + data = resp.read() + return json.loads(data.decode("utf-8")) + + +def print_json(data): + print(json.dumps(data, ensure_ascii=False, indent=2)) + + +def bybit_ticker(symbol: str): + url = ( + "https://api.bybit.com/v5/market/tickers?category=spot&symbol=" + + urllib.parse.quote(symbol.upper()) + ) + payload = fetch_json(url) + items = payload.get("result", {}).get("list", []) + if not items: + raise SystemExit(f"No Bybit spot ticker found for {symbol}") + item = items[0] + out = { + "provider": "bybit", + "symbol": symbol.upper(), + "lastPrice": item.get("lastPrice"), + "price24hPcnt": item.get("price24hPcnt"), + "highPrice24h": item.get("highPrice24h"), + "lowPrice24h": item.get("lowPrice24h"), + "turnover24h": item.get("turnover24h"), + "volume24h": item.get("volume24h"), + "bid1Price": item.get("bid1Price"), + "ask1Price": item.get("ask1Price"), + } + print_json(out) + + +def bybit_klines(symbol: str, interval: str, limit: int): + params = urllib.parse.urlencode({ + "category": "spot", + "symbol": symbol.upper(), + "interval": interval, + "limit": str(limit), + }) + url = f"https://api.bybit.com/v5/market/kline?{params}" + payload = fetch_json(url) + rows = payload.get("result", {}).get("list", []) + out = { + "provider": "bybit", + "symbol": symbol.upper(), + "interval": interval, + "candles": [ + { + "startTime": r[0], + "open": r[1], + "high": r[2], + "low": r[3], + "close": r[4], + "volume": r[5], + "turnover": r[6], + } + for r in rows + ], + } + print_json(out) + + +def dexscreener_search(query: str): + url = "https://api.dexscreener.com/latest/dex/search/?q=" + urllib.parse.quote(query) + payload = fetch_json(url) + pairs = payload.get("pairs") or [] + out = [] + for p in pairs[:10]: + out.append({ + "chainId": p.get("chainId"), + "dexId": p.get("dexId"), + "pairAddress": p.get("pairAddress"), + "url": p.get("url"), + "baseToken": p.get("baseToken"), + "quoteToken": p.get("quoteToken"), + "priceUsd": p.get("priceUsd"), + "liquidityUsd": (p.get("liquidity") or {}).get("usd"), + "fdv": p.get("fdv"), + "marketCap": p.get("marketCap"), + "volume24h": (p.get("volume") or {}).get("h24"), + "buys24h": ((p.get("txns") or {}).get("h24") or {}).get("buys"), + "sells24h": ((p.get("txns") or {}).get("h24") or {}).get("sells"), + }) + print_json({"provider": "dexscreener", "query": query, "pairs": out}) + + +def dexscreener_token(chain: str, address: str): + url = f"https://api.dexscreener.com/tokens/v1/{urllib.parse.quote(chain)}/{urllib.parse.quote(address)}" + payload = fetch_json(url) + pairs = payload if isinstance(payload, list) else payload.get("pairs") or [] + out = [] + for p in pairs[:10]: + out.append({ + "chainId": p.get("chainId"), + "dexId": p.get("dexId"), + "pairAddress": p.get("pairAddress"), + "baseToken": p.get("baseToken"), + "quoteToken": p.get("quoteToken"), + "priceUsd": p.get("priceUsd"), + "liquidityUsd": (p.get("liquidity") or {}).get("usd"), + "fdv": p.get("fdv"), + "marketCap": p.get("marketCap"), + "volume24h": (p.get("volume") or {}).get("h24"), + }) + print_json({"provider": "dexscreener", "chain": chain, "address": address, "pairs": out}) + + +def coingecko_search(query: str): + url = "https://api.coingecko.com/api/v3/search?query=" + urllib.parse.quote(query) + payload = fetch_json(url) + coins = payload.get("coins") or [] + out = [] + for c in coins[:10]: + out.append({ + "id": c.get("id"), + "name": c.get("name"), + "symbol": c.get("symbol"), + "marketCapRank": c.get("market_cap_rank"), + "thumb": c.get("thumb"), + }) + print_json({"provider": "coingecko", "query": query, "coins": out}) + + +def coingecko_coin(coin_id: str): + params = urllib.parse.urlencode({ + "localization": "false", + "tickers": "false", + "market_data": "true", + "community_data": "false", + "developer_data": "false", + "sparkline": "false", + }) + url = f"https://api.coingecko.com/api/v3/coins/{urllib.parse.quote(coin_id)}?{params}" + payload = fetch_json(url) + md = payload.get("market_data") or {} + out = { + "provider": "coingecko", + "id": payload.get("id"), + "symbol": payload.get("symbol"), + "name": payload.get("name"), + "marketCapRank": payload.get("market_cap_rank"), + "currentPriceUsd": (md.get("current_price") or {}).get("usd"), + "marketCapUsd": (md.get("market_cap") or {}).get("usd"), + "fullyDilutedValuationUsd": (md.get("fully_diluted_valuation") or {}).get("usd"), + "totalVolumeUsd": (md.get("total_volume") or {}).get("usd"), + "priceChangePercentage24h": md.get("price_change_percentage_24h"), + "priceChangePercentage7d": md.get("price_change_percentage_7d"), + "priceChangePercentage30d": md.get("price_change_percentage_30d"), + "circulatingSupply": md.get("circulating_supply"), + "totalSupply": md.get("total_supply"), + "maxSupply": md.get("max_supply"), + "homepage": (payload.get("links") or {}).get("homepage", [None])[0], + } + print_json(out) + + +def birdeye_token(address: str): + api_key = os.getenv("BIRDEYE_API_KEY") or os.getenv("BIRDEYE_APIKEY") + if not api_key: + raise SystemExit("Birdeye requires BIRDEYE_API_KEY in the environment") + url = "https://public-api.birdeye.so/defi/token_overview?address=" + urllib.parse.quote(address) + payload = fetch_json(url, headers={ + "x-api-key": api_key, + "x-chain": "solana", + }) + print_json({"provider": "birdeye", "address": address, "data": payload.get("data")}) + + +def build_parser(): + parser = argparse.ArgumentParser(description="Coin Hunter market data probe") + sub = parser.add_subparsers(dest="command", required=True) + + p = sub.add_parser("bybit-ticker", help="Fetch Bybit spot ticker") + p.add_argument("symbol") + + p = sub.add_parser("bybit-klines", help="Fetch Bybit spot klines") + p.add_argument("symbol") + p.add_argument("--interval", default="60", help="Bybit interval, e.g. 1, 5, 15, 60, 240, D") + p.add_argument("--limit", type=int, default=10) + + p = sub.add_parser("dex-search", help="Search DexScreener by query") + p.add_argument("query") + + p = sub.add_parser("dex-token", help="Fetch DexScreener token pairs by chain/address") + p.add_argument("chain") + p.add_argument("address") + + p = sub.add_parser("gecko-search", help="Search CoinGecko") + p.add_argument("query") + + p = sub.add_parser("gecko-coin", help="Fetch CoinGecko coin by id") + p.add_argument("coin_id") + + p = sub.add_parser("birdeye-token", help="Fetch Birdeye token overview (Solana)") + p.add_argument("address") + + return parser + + +def main(): + parser = build_parser() + args = parser.parse_args() + if args.command == "bybit-ticker": + bybit_ticker(args.symbol) + elif args.command == "bybit-klines": + bybit_klines(args.symbol, args.interval, args.limit) + elif args.command == "dex-search": + dexscreener_search(args.query) + elif args.command == "dex-token": + dexscreener_token(args.chain, args.address) + elif args.command == "gecko-search": + coingecko_search(args.query) + elif args.command == "gecko-coin": + coingecko_coin(args.coin_id) + elif args.command == "birdeye-token": + birdeye_token(args.address) + else: + parser.error("Unknown command") + + +if __name__ == "__main__": + main() diff --git a/src/coinhunter/commands/paths.py b/src/coinhunter/commands/paths.py new file mode 100644 index 0000000..270bab2 --- /dev/null +++ b/src/coinhunter/commands/paths.py @@ -0,0 +1,16 @@ +"""Print CoinHunter runtime paths.""" + +from __future__ import annotations + +import json + +from ..runtime import get_runtime_paths + + +def main() -> int: + print(json.dumps(get_runtime_paths().as_dict(), ensure_ascii=False, indent=2)) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/coinhunter/commands/rotate_external_gate_log.py b/src/coinhunter/commands/rotate_external_gate_log.py new file mode 100755 index 0000000..335fb5f --- /dev/null +++ b/src/coinhunter/commands/rotate_external_gate_log.py @@ -0,0 +1,28 @@ +#!/usr/bin/env python3 +"""Rotate external gate log using the user's logrotate config/state.""" +import shutil +import subprocess + +from ..runtime import ensure_runtime_dirs, get_runtime_paths + +PATHS = get_runtime_paths() +STATE_DIR = PATHS.state_dir +LOGROTATE_STATUS = PATHS.logrotate_status +LOGROTATE_CONF = PATHS.logrotate_config +LOGS_DIR = PATHS.logs_dir + + +def main(): + ensure_runtime_dirs(PATHS) + logrotate_bin = shutil.which("logrotate") or "/usr/sbin/logrotate" + cmd = [logrotate_bin, "-s", str(LOGROTATE_STATUS), str(LOGROTATE_CONF)] + result = subprocess.run(cmd, capture_output=True, text=True) + if result.stdout.strip(): + print(result.stdout.strip()) + if result.stderr.strip(): + print(result.stderr.strip()) + return result.returncode + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/coinhunter/doctor.py b/src/coinhunter/doctor.py index fe99c70..24f1497 100644 --- a/src/coinhunter/doctor.py +++ b/src/coinhunter/doctor.py @@ -1,66 +1,8 @@ -"""Runtime diagnostics for CoinHunter CLI.""" +"""Backward-compatible facade for doctor.""" from __future__ import annotations -import json -import os -import platform -import shutil -import sys - -from .runtime import ensure_runtime_dirs, get_runtime_paths, load_env_file, resolve_hermes_executable - - -REQUIRED_ENV_VARS = ["BINANCE_API_KEY", "BINANCE_API_SECRET"] - - -def main() -> int: - paths = ensure_runtime_dirs(get_runtime_paths()) - env_file = load_env_file(paths) - hermes_executable = resolve_hermes_executable(paths) - - env_checks = {} - missing_env = [] - for name in REQUIRED_ENV_VARS: - present = bool(os.getenv(name)) - env_checks[name] = present - if not present: - missing_env.append(name) - - file_checks = { - "env_file_exists": env_file.exists(), - "config_exists": paths.config_file.exists(), - "positions_exists": paths.positions_file.exists(), - "logrotate_config_exists": paths.logrotate_config.exists(), - } - dir_checks = { - "root_exists": paths.root.exists(), - "state_dir_exists": paths.state_dir.exists(), - "logs_dir_exists": paths.logs_dir.exists(), - "reviews_dir_exists": paths.reviews_dir.exists(), - "cache_dir_exists": paths.cache_dir.exists(), - } - command_checks = { - "hermes": bool(shutil.which("hermes") or paths.hermes_bin.exists()), - "logrotate": bool(shutil.which("logrotate") or shutil.which("/usr/sbin/logrotate")), - } - - report = { - "ok": not missing_env, - "python": sys.version.split()[0], - "platform": platform.platform(), - "env_file": str(env_file), - "hermes_executable": hermes_executable, - "paths": paths.as_dict(), - "env_checks": env_checks, - "missing_env": missing_env, - "file_checks": file_checks, - "dir_checks": dir_checks, - "command_checks": command_checks, - } - print(json.dumps(report, ensure_ascii=False, indent=2)) - return 0 if report["ok"] else 1 - +from .commands.doctor import main if __name__ == "__main__": raise SystemExit(main()) diff --git a/src/coinhunter/external_gate.py b/src/coinhunter/external_gate.py index 18f6c42..f5d0c64 100755 --- a/src/coinhunter/external_gate.py +++ b/src/coinhunter/external_gate.py @@ -1,82 +1,8 @@ -#!/usr/bin/env python3 -import fcntl -import json -import subprocess -import sys -from datetime import datetime, timezone +"""Backward-compatible facade for external_gate.""" -from .runtime import ensure_runtime_dirs, get_runtime_paths, resolve_hermes_executable - -PATHS = get_runtime_paths() -STATE_DIR = PATHS.state_dir -LOCK_FILE = PATHS.external_gate_lock -COINHUNTER_MODULE = [sys.executable, "-m", "coinhunter"] -TRADE_JOB_ID = "4e6593fff158" - - -def utc_now(): - return datetime.now(timezone.utc).isoformat() - - -def log(message: str): - print(f"[{utc_now()}] {message}") - - -def run_cmd(args: list[str]) -> subprocess.CompletedProcess: - return subprocess.run(args, capture_output=True, text=True) - - -def parse_json_output(text: str) -> dict: - text = (text or "").strip() - if not text: - return {} - return json.loads(text) - - -def main(): - ensure_runtime_dirs(PATHS) - with open(LOCK_FILE, "w", encoding="utf-8") as lockf: - try: - fcntl.flock(lockf.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) - except BlockingIOError: - log("gate already running; skip") - return 0 - - precheck = run_cmd(COINHUNTER_MODULE + ["precheck"]) - if precheck.returncode != 0: - log(f"precheck returned non-zero ({precheck.returncode}); stdout={precheck.stdout.strip()} stderr={precheck.stderr.strip()}") - return 1 - - try: - data = parse_json_output(precheck.stdout) - except Exception as e: - log(f"failed to parse precheck JSON: {e}; raw={precheck.stdout.strip()[:1000]}") - return 1 - - if not data.get("should_analyze"): - log("no trigger; skip model run") - return 0 - - if data.get("run_requested"): - log(f"trigger already queued at {data.get('run_requested_at')}; skip duplicate") - return 0 - - mark = run_cmd(COINHUNTER_MODULE + ["precheck", "--mark-run-requested", "external-gate queued cron run"]) - if mark.returncode != 0: - log(f"failed to mark run requested; stdout={mark.stdout.strip()} stderr={mark.stderr.strip()}") - return 1 - - trigger = run_cmd([resolve_hermes_executable(PATHS), "cron", "run", TRADE_JOB_ID]) - if trigger.returncode != 0: - log(f"failed to trigger trade cron job; stdout={trigger.stdout.strip()} stderr={trigger.stderr.strip()}") - return 1 - - reasons = ", ".join(data.get("reasons", [])) or "unknown" - log(f"queued trade job {TRADE_JOB_ID}; reasons={reasons}") - if trigger.stdout.strip(): - log(trigger.stdout.strip()) - return 0 +from __future__ import annotations +from .commands.external_gate import main if __name__ == "__main__": raise SystemExit(main()) diff --git a/src/coinhunter/init_user_state.py b/src/coinhunter/init_user_state.py index 420773f..d5925fd 100755 --- a/src/coinhunter/init_user_state.py +++ b/src/coinhunter/init_user_state.py @@ -1,65 +1,8 @@ -#!/usr/bin/env python3 -import json -from datetime import datetime, timezone -from pathlib import Path +"""Backward-compatible facade for init_user_state.""" -from .runtime import ensure_runtime_dirs, get_runtime_paths - -PATHS = get_runtime_paths() -ROOT = PATHS.root -CACHE_DIR = PATHS.cache_dir - - -def now_iso(): - return datetime.now(timezone.utc).replace(microsecond=0).isoformat() - - -def ensure_file(path: Path, payload: dict): - if path.exists(): - return False - path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8") - return True - - -def main(): - ensure_runtime_dirs(PATHS) - - created = [] - ts = now_iso() - - templates = { - ROOT / "config.json": { - "default_exchange": "bybit", - "default_quote_currency": "USDT", - "timezone": "Asia/Shanghai", - "preferred_chains": ["solana", "base"], - "created_at": ts, - "updated_at": ts, - }, - ROOT / "accounts.json": { - "accounts": [] - }, - ROOT / "positions.json": { - "positions": [] - }, - ROOT / "watchlist.json": { - "watchlist": [] - }, - ROOT / "notes.json": { - "notes": [] - }, - } - - for path, payload in templates.items(): - if ensure_file(path, payload): - created.append(str(path)) - - print(json.dumps({ - "root": str(ROOT), - "created": created, - "cache_dir": str(CACHE_DIR), - }, ensure_ascii=False, indent=2)) +from __future__ import annotations +from .commands.init_user_state import main if __name__ == "__main__": - main() + raise SystemExit(main()) diff --git a/src/coinhunter/market_probe.py b/src/coinhunter/market_probe.py index 2e41344..6cb7504 100755 --- a/src/coinhunter/market_probe.py +++ b/src/coinhunter/market_probe.py @@ -1,243 +1,8 @@ -#!/usr/bin/env python3 -import argparse -import json -import os -import sys -import urllib.parse -import urllib.request +"""Backward-compatible facade for market_probe.""" -DEFAULT_TIMEOUT = 20 - - -def fetch_json(url, headers=None, timeout=DEFAULT_TIMEOUT): - merged_headers = { - "Accept": "application/json", - "User-Agent": "Mozilla/5.0 (compatible; OpenClaw Coin Hunter/1.0)", - } - if headers: - merged_headers.update(headers) - req = urllib.request.Request(url, headers=merged_headers) - with urllib.request.urlopen(req, timeout=timeout) as resp: - data = resp.read() - return json.loads(data.decode("utf-8")) - - -def print_json(data): - print(json.dumps(data, ensure_ascii=False, indent=2)) - - -def bybit_ticker(symbol: str): - url = ( - "https://api.bybit.com/v5/market/tickers?category=spot&symbol=" - + urllib.parse.quote(symbol.upper()) - ) - payload = fetch_json(url) - items = payload.get("result", {}).get("list", []) - if not items: - raise SystemExit(f"No Bybit spot ticker found for {symbol}") - item = items[0] - out = { - "provider": "bybit", - "symbol": symbol.upper(), - "lastPrice": item.get("lastPrice"), - "price24hPcnt": item.get("price24hPcnt"), - "highPrice24h": item.get("highPrice24h"), - "lowPrice24h": item.get("lowPrice24h"), - "turnover24h": item.get("turnover24h"), - "volume24h": item.get("volume24h"), - "bid1Price": item.get("bid1Price"), - "ask1Price": item.get("ask1Price"), - } - print_json(out) - - -def bybit_klines(symbol: str, interval: str, limit: int): - params = urllib.parse.urlencode({ - "category": "spot", - "symbol": symbol.upper(), - "interval": interval, - "limit": str(limit), - }) - url = f"https://api.bybit.com/v5/market/kline?{params}" - payload = fetch_json(url) - rows = payload.get("result", {}).get("list", []) - out = { - "provider": "bybit", - "symbol": symbol.upper(), - "interval": interval, - "candles": [ - { - "startTime": r[0], - "open": r[1], - "high": r[2], - "low": r[3], - "close": r[4], - "volume": r[5], - "turnover": r[6], - } - for r in rows - ], - } - print_json(out) - - -def dexscreener_search(query: str): - url = "https://api.dexscreener.com/latest/dex/search/?q=" + urllib.parse.quote(query) - payload = fetch_json(url) - pairs = payload.get("pairs") or [] - out = [] - for p in pairs[:10]: - out.append({ - "chainId": p.get("chainId"), - "dexId": p.get("dexId"), - "pairAddress": p.get("pairAddress"), - "url": p.get("url"), - "baseToken": p.get("baseToken"), - "quoteToken": p.get("quoteToken"), - "priceUsd": p.get("priceUsd"), - "liquidityUsd": (p.get("liquidity") or {}).get("usd"), - "fdv": p.get("fdv"), - "marketCap": p.get("marketCap"), - "volume24h": (p.get("volume") or {}).get("h24"), - "buys24h": ((p.get("txns") or {}).get("h24") or {}).get("buys"), - "sells24h": ((p.get("txns") or {}).get("h24") or {}).get("sells"), - }) - print_json({"provider": "dexscreener", "query": query, "pairs": out}) - - -def dexscreener_token(chain: str, address: str): - url = f"https://api.dexscreener.com/tokens/v1/{urllib.parse.quote(chain)}/{urllib.parse.quote(address)}" - payload = fetch_json(url) - pairs = payload if isinstance(payload, list) else payload.get("pairs") or [] - out = [] - for p in pairs[:10]: - out.append({ - "chainId": p.get("chainId"), - "dexId": p.get("dexId"), - "pairAddress": p.get("pairAddress"), - "baseToken": p.get("baseToken"), - "quoteToken": p.get("quoteToken"), - "priceUsd": p.get("priceUsd"), - "liquidityUsd": (p.get("liquidity") or {}).get("usd"), - "fdv": p.get("fdv"), - "marketCap": p.get("marketCap"), - "volume24h": (p.get("volume") or {}).get("h24"), - }) - print_json({"provider": "dexscreener", "chain": chain, "address": address, "pairs": out}) - - -def coingecko_search(query: str): - url = "https://api.coingecko.com/api/v3/search?query=" + urllib.parse.quote(query) - payload = fetch_json(url) - coins = payload.get("coins") or [] - out = [] - for c in coins[:10]: - out.append({ - "id": c.get("id"), - "name": c.get("name"), - "symbol": c.get("symbol"), - "marketCapRank": c.get("market_cap_rank"), - "thumb": c.get("thumb"), - }) - print_json({"provider": "coingecko", "query": query, "coins": out}) - - -def coingecko_coin(coin_id: str): - params = urllib.parse.urlencode({ - "localization": "false", - "tickers": "false", - "market_data": "true", - "community_data": "false", - "developer_data": "false", - "sparkline": "false", - }) - url = f"https://api.coingecko.com/api/v3/coins/{urllib.parse.quote(coin_id)}?{params}" - payload = fetch_json(url) - md = payload.get("market_data") or {} - out = { - "provider": "coingecko", - "id": payload.get("id"), - "symbol": payload.get("symbol"), - "name": payload.get("name"), - "marketCapRank": payload.get("market_cap_rank"), - "currentPriceUsd": (md.get("current_price") or {}).get("usd"), - "marketCapUsd": (md.get("market_cap") or {}).get("usd"), - "fullyDilutedValuationUsd": (md.get("fully_diluted_valuation") or {}).get("usd"), - "totalVolumeUsd": (md.get("total_volume") or {}).get("usd"), - "priceChangePercentage24h": md.get("price_change_percentage_24h"), - "priceChangePercentage7d": md.get("price_change_percentage_7d"), - "priceChangePercentage30d": md.get("price_change_percentage_30d"), - "circulatingSupply": md.get("circulating_supply"), - "totalSupply": md.get("total_supply"), - "maxSupply": md.get("max_supply"), - "homepage": (payload.get("links") or {}).get("homepage", [None])[0], - } - print_json(out) - - -def birdeye_token(address: str): - api_key = os.getenv("BIRDEYE_API_KEY") or os.getenv("BIRDEYE_APIKEY") - if not api_key: - raise SystemExit("Birdeye requires BIRDEYE_API_KEY in the environment") - url = "https://public-api.birdeye.so/defi/token_overview?address=" + urllib.parse.quote(address) - payload = fetch_json(url, headers={ - "x-api-key": api_key, - "x-chain": "solana", - }) - print_json({"provider": "birdeye", "address": address, "data": payload.get("data")}) - - -def build_parser(): - parser = argparse.ArgumentParser(description="Coin Hunter market data probe") - sub = parser.add_subparsers(dest="command", required=True) - - p = sub.add_parser("bybit-ticker", help="Fetch Bybit spot ticker") - p.add_argument("symbol") - - p = sub.add_parser("bybit-klines", help="Fetch Bybit spot klines") - p.add_argument("symbol") - p.add_argument("--interval", default="60", help="Bybit interval, e.g. 1, 5, 15, 60, 240, D") - p.add_argument("--limit", type=int, default=10) - - p = sub.add_parser("dex-search", help="Search DexScreener by query") - p.add_argument("query") - - p = sub.add_parser("dex-token", help="Fetch DexScreener token pairs by chain/address") - p.add_argument("chain") - p.add_argument("address") - - p = sub.add_parser("gecko-search", help="Search CoinGecko") - p.add_argument("query") - - p = sub.add_parser("gecko-coin", help="Fetch CoinGecko coin by id") - p.add_argument("coin_id") - - p = sub.add_parser("birdeye-token", help="Fetch Birdeye token overview (Solana)") - p.add_argument("address") - - return parser - - -def main(): - parser = build_parser() - args = parser.parse_args() - if args.command == "bybit-ticker": - bybit_ticker(args.symbol) - elif args.command == "bybit-klines": - bybit_klines(args.symbol, args.interval, args.limit) - elif args.command == "dex-search": - dexscreener_search(args.query) - elif args.command == "dex-token": - dexscreener_token(args.chain, args.address) - elif args.command == "gecko-search": - coingecko_search(args.query) - elif args.command == "gecko-coin": - coingecko_coin(args.coin_id) - elif args.command == "birdeye-token": - birdeye_token(args.address) - else: - parser.error("Unknown command") +from __future__ import annotations +from .commands.market_probe import main if __name__ == "__main__": - main() + raise SystemExit(main()) diff --git a/src/coinhunter/paths.py b/src/coinhunter/paths.py index 188b32e..ad4dacc 100644 --- a/src/coinhunter/paths.py +++ b/src/coinhunter/paths.py @@ -1,16 +1,8 @@ -"""Print CoinHunter runtime paths.""" +"""Backward-compatible facade for paths.""" from __future__ import annotations -import json - -from .runtime import get_runtime_paths - - -def main() -> int: - print(json.dumps(get_runtime_paths().as_dict(), ensure_ascii=False, indent=2)) - return 0 - +from .commands.paths import main if __name__ == "__main__": raise SystemExit(main()) diff --git a/src/coinhunter/rotate_external_gate_log.py b/src/coinhunter/rotate_external_gate_log.py index 3bacea1..28db305 100755 --- a/src/coinhunter/rotate_external_gate_log.py +++ b/src/coinhunter/rotate_external_gate_log.py @@ -1,28 +1,8 @@ -#!/usr/bin/env python3 -"""Rotate external gate log using the user's logrotate config/state.""" -import shutil -import subprocess +"""Backward-compatible facade for rotate_external_gate_log.""" -from .runtime import ensure_runtime_dirs, get_runtime_paths - -PATHS = get_runtime_paths() -STATE_DIR = PATHS.state_dir -LOGROTATE_STATUS = PATHS.logrotate_status -LOGROTATE_CONF = PATHS.logrotate_config -LOGS_DIR = PATHS.logs_dir - - -def main(): - ensure_runtime_dirs(PATHS) - logrotate_bin = shutil.which("logrotate") or "/usr/sbin/logrotate" - cmd = [logrotate_bin, "-s", str(LOGROTATE_STATUS), str(LOGROTATE_CONF)] - result = subprocess.run(cmd, capture_output=True, text=True) - if result.stdout.strip(): - print(result.stdout.strip()) - if result.stderr.strip(): - print(result.stderr.strip()) - return result.returncode +from __future__ import annotations +from .commands.rotate_external_gate_log import main if __name__ == "__main__": raise SystemExit(main()) diff --git a/src/coinhunter/services/adaptive_profile.py b/src/coinhunter/services/adaptive_profile.py new file mode 100644 index 0000000..8b49b4a --- /dev/null +++ b/src/coinhunter/services/adaptive_profile.py @@ -0,0 +1,105 @@ +"""Adaptive trigger profile builder for precheck.""" + +from __future__ import annotations + +from .data_utils import to_float +from .precheck_constants import ( + BASE_CANDIDATE_SCORE_TRIGGER_RATIO, + BASE_COOLDOWN_MINUTES, + BASE_FORCE_ANALYSIS_AFTER_MINUTES, + BASE_PNL_TRIGGER_PCT, + BASE_PORTFOLIO_MOVE_TRIGGER_PCT, + BASE_PRICE_MOVE_TRIGGER_PCT, + MIN_ACTIONABLE_USDT, + MIN_REAL_POSITION_VALUE_USDT, +) + + +def build_adaptive_profile(snapshot: dict): + portfolio_value = snapshot.get("portfolio_value_usdt", 0) + free_usdt = snapshot.get("free_usdt", 0) + session = snapshot.get("session") + market = snapshot.get("market_regime", {}) + volatility_score = to_float(market.get("volatility_score"), 0) + leader_score = to_float(market.get("leader_score"), 0) + actionable_positions = int(snapshot.get("actionable_positions") or 0) + largest_position_value = to_float(snapshot.get("largest_position_value_usdt"), 0) + + capital_band = "micro" if portfolio_value < 25 else "small" if portfolio_value < 100 else "normal" + session_mode = "quiet" if session in {"overnight", "asia-morning"} else "active" + volatility_mode = "high" if volatility_score >= 2.5 or leader_score >= 120 else "normal" + dust_mode = free_usdt < MIN_ACTIONABLE_USDT and largest_position_value < MIN_REAL_POSITION_VALUE_USDT + + price_trigger = BASE_PRICE_MOVE_TRIGGER_PCT + pnl_trigger = BASE_PN_L_TRIGGER_PCT + portfolio_trigger = BASE_PORTFOLIO_MOVE_TRIGGER_PCT + candidate_ratio = BASE_CANDIDATE_SCORE_TRIGGER_RATIO + force_minutes = BASE_FORCE_ANALYSIS_AFTER_MINUTES + cooldown_minutes = BASE_COOLDOWN_MINUTES + soft_score_threshold = 2.0 + + if capital_band == "micro": + price_trigger += 0.02 + pnl_trigger += 0.03 + portfolio_trigger += 0.04 + candidate_ratio += 0.25 + force_minutes += 180 + cooldown_minutes += 30 + soft_score_threshold += 1.0 + elif capital_band == "small": + price_trigger += 0.01 + pnl_trigger += 0.01 + portfolio_trigger += 0.01 + candidate_ratio += 0.1 + force_minutes += 60 + cooldown_minutes += 10 + soft_score_threshold += 0.5 + + if session_mode == "quiet": + price_trigger += 0.01 + pnl_trigger += 0.01 + portfolio_trigger += 0.01 + candidate_ratio += 0.05 + soft_score_threshold += 0.5 + else: + force_minutes = max(120, force_minutes - 30) + + if volatility_mode == "high": + price_trigger = max(0.02, price_trigger - 0.01) + pnl_trigger = max(0.025, pnl_trigger - 0.005) + portfolio_trigger = max(0.025, portfolio_trigger - 0.005) + candidate_ratio = max(1.1, candidate_ratio - 0.1) + cooldown_minutes = max(20, cooldown_minutes - 10) + soft_score_threshold = max(1.0, soft_score_threshold - 0.5) + + if dust_mode: + candidate_ratio += 0.3 + force_minutes += 180 + cooldown_minutes += 30 + soft_score_threshold += 1.5 + + return { + "capital_band": capital_band, + "session_mode": session_mode, + "volatility_mode": volatility_mode, + "dust_mode": dust_mode, + "price_move_trigger_pct": round(price_trigger, 4), + "pnl_trigger_pct": round(pnl_trigger, 4), + "portfolio_move_trigger_pct": round(portfolio_trigger, 4), + "candidate_score_trigger_ratio": round(candidate_ratio, 4), + "force_analysis_after_minutes": int(force_minutes), + "cooldown_minutes": int(cooldown_minutes), + "soft_score_threshold": round(soft_score_threshold, 2), + "new_entries_allowed": free_usdt >= MIN_ACTIONABLE_USDT and not dust_mode, + "switching_allowed": actionable_positions > 0 or portfolio_value >= 25, + } + + +def _candidate_weight(snapshot: dict, profile: dict) -> float: + if not profile.get("new_entries_allowed"): + return 0.5 + if profile.get("volatility_mode") == "high": + return 1.5 + if snapshot.get("session") in {"europe-open", "us-session"}: + return 1.25 + return 1.0 diff --git a/src/coinhunter/services/candidate_scoring.py b/src/coinhunter/services/candidate_scoring.py new file mode 100644 index 0000000..71b2dfb --- /dev/null +++ b/src/coinhunter/services/candidate_scoring.py @@ -0,0 +1,71 @@ +"""Candidate coin scoring and selection for precheck.""" + +from __future__ import annotations + +import re + +from .data_utils import to_float +from .precheck_constants import BLACKLIST, MAX_PRICE_CAP, MIN_CHANGE_PCT, TOP_CANDIDATES + + +def _liquidity_score(volume: float) -> float: + return min(1.0, max(0.0, volume / 50_000_000)) + + +def _breakout_score(price: float, avg_price: float | None) -> float: + if not avg_price or avg_price <= 0: + return 0.0 + return (price - avg_price) / avg_price + + +def top_candidates_from_tickers(tickers: dict): + candidates = [] + for symbol, ticker in tickers.items(): + if not symbol.endswith("/USDT"): + continue + base = symbol.replace("/USDT", "") + if base in BLACKLIST: + continue + if not re.fullmatch(r"[A-Z0-9]{2,20}", base): + continue + price = to_float(ticker.get("last")) + change_pct = to_float(ticker.get("percentage")) + volume = to_float(ticker.get("quoteVolume")) + high = to_float(ticker.get("high")) + low = to_float(ticker.get("low")) + avg_price = to_float(ticker.get("average"), None) + if price <= 0: + continue + if MAX_PRICE_CAP is not None and price > MAX_PRICE_CAP: + continue + if volume < 500_000: + continue + if change_pct < MIN_CHANGE_PCT: + continue + momentum = change_pct / 10.0 + liquidity = _liquidity_score(volume) + breakout = _breakout_score(price, avg_price) + score = round(momentum * 0.5 + liquidity * 0.3 + breakout * 0.2, 4) + band = "major" if price >= 10 else "mid" if price >= 1 else "meme" + distance_from_high = (high - price) / max(high, 1e-9) if high else None + candidates.append({ + "symbol": symbol, + "base": base, + "price": round(price, 8), + "change_24h_pct": round(change_pct, 2), + "volume_24h": round(volume, 2), + "breakout_pct": round(breakout * 100, 2), + "high_24h": round(high, 8) if high else None, + "low_24h": round(low, 8) if low else None, + "distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None, + "score": score, + "band": band, + }) + candidates.sort(key=lambda x: x["score"], reverse=True) + global_top = candidates[:TOP_CANDIDATES] + layers = {"major": [], "mid": [], "meme": []} + for c in candidates: + layers[c["band"]].append(c) + for k in layers: + layers[k] = layers[k][:5] + return global_top, layers diff --git a/src/coinhunter/services/data_utils.py b/src/coinhunter/services/data_utils.py new file mode 100644 index 0000000..5a74ea6 --- /dev/null +++ b/src/coinhunter/services/data_utils.py @@ -0,0 +1,39 @@ +"""Generic data helpers for precheck.""" + +from __future__ import annotations + +import hashlib +import json +from pathlib import Path + + +def load_json(path: Path, default): + if not path.exists(): + return default + try: + return json.loads(path.read_text(encoding="utf-8")) + except Exception: + return default + + +def stable_hash(data) -> str: + payload = json.dumps(data, sort_keys=True, ensure_ascii=False, separators=(",", ":")) + return hashlib.sha1(payload.encode("utf-8")).hexdigest() + + +def to_float(value, default=0.0): + try: + if value is None: + return default + return float(value) + except Exception: + return default + + +def norm_symbol(symbol: str) -> str: + s = symbol.upper().replace("-", "").replace("_", "") + if "/" in s: + return s + if s.endswith("USDT"): + return s[:-4] + "/USDT" + return s diff --git a/src/coinhunter/services/market_data.py b/src/coinhunter/services/market_data.py new file mode 100644 index 0000000..5dc2886 --- /dev/null +++ b/src/coinhunter/services/market_data.py @@ -0,0 +1,136 @@ +"""Market data fetching and metric computation for precheck.""" + +from __future__ import annotations + +import os + +import ccxt + +from .data_utils import norm_symbol, to_float +from .precheck_constants import BLACKLIST, MAX_PRICE_CAP, MIN_CHANGE_PCT +from .time_utils import utc_now + + +def get_exchange(): + from ..runtime import load_env_file + from .precheck_constants import ENV_FILE + + load_env_file(ENV_FILE) + api_key = os.getenv("BINANCE_API_KEY") + secret = os.getenv("BINANCE_API_SECRET") + if not api_key or not secret: + raise RuntimeError("Missing BINANCE_API_KEY or BINANCE_API_SECRET in ~/.hermes/.env") + ex = ccxt.binance({ + "apiKey": api_key, + "secret": secret, + "options": {"defaultType": "spot"}, + "enableRateLimit": True, + }) + ex.load_markets() + return ex + + +def fetch_ohlcv_batch(ex, symbols: set, timeframe: str, limit: int): + results = {} + for sym in sorted(symbols): + try: + ohlcv = ex.fetch_ohlcv(sym, timeframe=timeframe, limit=limit) + if ohlcv and len(ohlcv) >= 2: + results[sym] = ohlcv + except Exception: + pass + return results + + +def compute_ohlcv_metrics(ohlcv_1h, ohlcv_4h, current_price, volume_24h=None): + metrics = {} + if ohlcv_1h and len(ohlcv_1h) >= 2: + closes = [c[4] for c in ohlcv_1h] + volumes = [c[5] for c in ohlcv_1h] + metrics["change_1h_pct"] = round((closes[-1] - closes[-2]) / closes[-2] * 100, 2) if closes[-2] != 0 else None + if len(closes) >= 5: + metrics["change_4h_pct"] = round((closes[-1] - closes[-5]) / closes[-5] * 100, 2) if closes[-5] != 0 else None + recent_vol = sum(volumes[-4:]) / 4 if len(volumes) >= 4 else None + metrics["volume_1h_avg"] = round(recent_vol, 2) if recent_vol else None + highs = [c[2] for c in ohlcv_1h[-4:]] + lows = [c[3] for c in ohlcv_1h[-4:]] + metrics["high_4h"] = round(max(highs), 8) if highs else None + metrics["low_4h"] = round(min(lows), 8) if lows else None + + if ohlcv_4h and len(ohlcv_4h) >= 2: + closes_4h = [c[4] for c in ohlcv_4h] + volumes_4h = [c[5] for c in ohlcv_4h] + metrics["change_4h_pct_from_4h"] = round((closes_4h[-1] - closes_4h[-2]) / closes_4h[-2] * 100, 2) if closes_4h[-2] != 0 else None + recent_vol_4h = sum(volumes_4h[-2:]) / 2 if len(volumes_4h) >= 2 else None + metrics["volume_4h_avg"] = round(recent_vol_4h, 2) if recent_vol_4h else None + highs_4h = [c[2] for c in ohlcv_4h] + lows_4h = [c[3] for c in ohlcv_4h] + metrics["high_24h_calc"] = round(max(highs_4h), 8) if highs_4h else None + metrics["low_24h_calc"] = round(min(lows_4h), 8) if lows_4h else None + if highs_4h and lows_4h: + avg_price = sum(closes_4h) / len(closes_4h) + metrics["volatility_4h_pct"] = round((max(highs_4h) - min(lows_4h)) / avg_price * 100, 2) + + if current_price: + if metrics.get("high_4h"): + metrics["distance_from_4h_high_pct"] = round((metrics["high_4h"] - current_price) / metrics["high_4h"] * 100, 2) + if metrics.get("low_4h"): + metrics["distance_from_4h_low_pct"] = round((current_price - metrics["low_4h"]) / metrics["low_4h"] * 100, 2) + if metrics.get("high_24h_calc"): + metrics["distance_from_24h_high_pct"] = round((metrics["high_24h_calc"] - current_price) / metrics["high_24h_calc"] * 100, 2) + if metrics.get("low_24h_calc"): + metrics["distance_from_24h_low_pct"] = round((current_price - metrics["low_24h_calc"]) / metrics["low_24h_calc"] * 100, 2) + + if volume_24h and volume_24h > 0 and metrics.get("volume_1h_avg"): + daily_avg_1h = volume_24h / 24 + metrics["volume_1h_multiple"] = round(metrics["volume_1h_avg"] / daily_avg_1h, 2) + if volume_24h and volume_24h > 0 and metrics.get("volume_4h_avg"): + daily_avg_4h = volume_24h / 6 + metrics["volume_4h_multiple"] = round(metrics["volume_4h_avg"] / daily_avg_4h, 2) + + return metrics + + +def enrich_candidates_and_positions(global_candidates, candidate_layers, positions_view, tickers, ex): + symbols = set() + for c in global_candidates: + symbols.add(c["symbol"]) + for p in positions_view: + sym = p.get("symbol") + if sym: + sym_ccxt = norm_symbol(sym) + symbols.add(sym_ccxt) + + ohlcv_1h = fetch_ohlcv_batch(ex, symbols, "1h", 24) + ohlcv_4h = fetch_ohlcv_batch(ex, symbols, "4h", 12) + + def _apply(target_list): + for item in target_list: + sym = item.get("symbol") + if not sym: + continue + sym_ccxt = norm_symbol(sym) + v24h = to_float(tickers.get(sym_ccxt, {}).get("quoteVolume")) + metrics = compute_ohlcv_metrics( + ohlcv_1h.get(sym_ccxt), + ohlcv_4h.get(sym_ccxt), + item.get("price") or item.get("last_price"), + volume_24h=v24h, + ) + item["metrics"] = metrics + + _apply(global_candidates) + for band_list in candidate_layers.values(): + _apply(band_list) + _apply(positions_view) + return global_candidates, candidate_layers, positions_view + + +def regime_from_pct(pct: float | None) -> str: + if pct is None: + return "unknown" + if pct >= 2.0: + return "bullish" + if pct <= -2.0: + return "bearish" + return "neutral" diff --git a/src/coinhunter/services/precheck_analysis.py b/src/coinhunter/services/precheck_analysis.py index 7f83b5d..0135000 100644 --- a/src/coinhunter/services/precheck_analysis.py +++ b/src/coinhunter/services/precheck_analysis.py @@ -2,16 +2,13 @@ from __future__ import annotations -from . import precheck_core - - -def analyze_trigger(snapshot: dict, state: dict) -> dict: - return precheck_core.analyze_trigger(snapshot, state) +from .time_utils import utc_iso +from .trigger_analyzer import analyze_trigger def build_failure_payload(exc: Exception) -> dict: return { - "generated_at": precheck_core.utc_iso(), + "generated_at": utc_iso(), "status": "deep_analysis_required", "should_analyze": True, "pending_trigger": True, @@ -21,5 +18,5 @@ def build_failure_payload(exc: Exception) -> dict: "soft_reasons": [], "soft_score": 0, "details": [str(exc)], - "compact_summary": f"预检查失败,转入深度分析兜底: {exc}", + "compact_summary": f"预检查失败,转入深度分析兑底: {exc}", } diff --git a/src/coinhunter/services/precheck_constants.py b/src/coinhunter/services/precheck_constants.py new file mode 100644 index 0000000..e776afb --- /dev/null +++ b/src/coinhunter/services/precheck_constants.py @@ -0,0 +1,31 @@ +"""Precheck constants and runtime paths.""" + +from __future__ import annotations + +from ..runtime import get_runtime_paths + +PATHS = get_runtime_paths() +BASE_DIR = PATHS.root +STATE_DIR = PATHS.state_dir +STATE_FILE = PATHS.precheck_state_file +POSITIONS_FILE = PATHS.positions_file +CONFIG_FILE = PATHS.config_file +ENV_FILE = PATHS.env_file + +BASE_PRICE_MOVE_TRIGGER_PCT = 0.025 +BASE_PNL_TRIGGER_PCT = 0.03 +BASE_PORTFOLIO_MOVE_TRIGGER_PCT = 0.03 +BASE_CANDIDATE_SCORE_TRIGGER_RATIO = 1.15 +BASE_FORCE_ANALYSIS_AFTER_MINUTES = 180 +BASE_COOLDOWN_MINUTES = 45 +TOP_CANDIDATES = 10 +MIN_ACTIONABLE_USDT = 12.0 +MIN_REAL_POSITION_VALUE_USDT = 8.0 +BLACKLIST = {"USDC", "BUSD", "TUSD", "FDUSD", "USTC", "PAXG"} +HARD_STOP_PCT = -0.08 +HARD_MOON_PCT = 0.25 +MIN_CHANGE_PCT = 1.0 +MAX_PRICE_CAP = None +HARD_REASON_DEDUP_MINUTES = 15 +MAX_PENDING_TRIGGER_MINUTES = 30 +MAX_RUN_REQUEST_MINUTES = 20 diff --git a/src/coinhunter/services/precheck_core.py b/src/coinhunter/services/precheck_core.py index 8f2b440..ad0b6fc 100644 --- a/src/coinhunter/services/precheck_core.py +++ b/src/coinhunter/services/precheck_core.py @@ -1,900 +1,99 @@ -"""Service-owned precheck core logic. +"""Backward-compatible facade for precheck internals. -This module holds the reusable implementation. Root-level ``coinhunter.precheck`` -is intentionally kept as a compatibility facade for older imports and direct -module execution. +The reusable implementation has been split into smaller modules: +- precheck_constants : paths and thresholds +- time_utils : UTC/local time helpers +- data_utils : json, hash, float, symbol normalization +- state_manager : load/save/sanitize state +- market_data : exchange, ohlcv, metrics +- candidate_scoring : top candidate selection +- snapshot_builder : build_snapshot +- adaptive_profile : trigger profile builder +- trigger_analyzer : analyze_trigger + +Keep this module importable so older entrypoints continue to work. """ from __future__ import annotations -import hashlib -import json -import os -import re -from datetime import datetime, timedelta, timezone -from pathlib import Path -from zoneinfo import ZoneInfo - -import ccxt - -from ..runtime import get_runtime_paths, load_env_file - -PATHS = get_runtime_paths() -BASE_DIR = PATHS.root -STATE_DIR = PATHS.state_dir -STATE_FILE = PATHS.precheck_state_file -POSITIONS_FILE = PATHS.positions_file -CONFIG_FILE = PATHS.config_file -ENV_FILE = PATHS.env_file - -BASE_PRICE_MOVE_TRIGGER_PCT = 0.025 -BASE_PNL_TRIGGER_PCT = 0.03 -BASE_PORTFOLIO_MOVE_TRIGGER_PCT = 0.03 -BASE_CANDIDATE_SCORE_TRIGGER_RATIO = 1.15 -BASE_FORCE_ANALYSIS_AFTER_MINUTES = 180 -BASE_COOLDOWN_MINUTES = 45 -TOP_CANDIDATES = 10 -MIN_ACTIONABLE_USDT = 12.0 -MIN_REAL_POSITION_VALUE_USDT = 8.0 -BLACKLIST = {"USDC", "BUSD", "TUSD", "FDUSD", "USTC", "PAXG"} -HARD_STOP_PCT = -0.08 -HARD_MOON_PCT = 0.25 -MIN_CHANGE_PCT = 1.0 -MAX_PRICE_CAP = None -HARD_REASON_DEDUP_MINUTES = 15 -MAX_PENDING_TRIGGER_MINUTES = 30 -MAX_RUN_REQUEST_MINUTES = 20 - - -def utc_now(): - return datetime.now(timezone.utc) - - -def utc_iso(): - return utc_now().isoformat() - - -def parse_ts(value: str | None): - if not value: - return None - try: - ts = datetime.fromisoformat(value) - if ts.tzinfo is None: - ts = ts.replace(tzinfo=timezone.utc) - return ts - except Exception: - return None - - -def load_json(path: Path, default): - if not path.exists(): - return default - try: - return json.loads(path.read_text(encoding="utf-8")) - except Exception: - return default - - -def load_env(): - load_env_file(PATHS) - - -def load_positions(): - return load_json(POSITIONS_FILE, {}).get("positions", []) - - -def load_state(): - return load_json(STATE_FILE, {}) - - -def load_config(): - return load_json(CONFIG_FILE, {}) - - -def clear_run_request_fields(state: dict): - state.pop("run_requested_at", None) - state.pop("run_request_note", None) - - -def sanitize_state_for_stale_triggers(state: dict): - sanitized = dict(state) - notes = [] - now = utc_now() - run_requested_at = parse_ts(sanitized.get("run_requested_at")) - last_deep_analysis_at = parse_ts(sanitized.get("last_deep_analysis_at")) - last_triggered_at = parse_ts(sanitized.get("last_triggered_at")) - pending_trigger = bool(sanitized.get("pending_trigger")) - - if run_requested_at and last_deep_analysis_at and last_deep_analysis_at >= run_requested_at: - clear_run_request_fields(sanitized) - if pending_trigger and (not last_triggered_at or last_deep_analysis_at >= last_triggered_at): - sanitized["pending_trigger"] = False - sanitized["pending_reasons"] = [] - sanitized["last_ack_note"] = ( - f"auto-cleared completed trigger at {utc_iso()} because last_deep_analysis_at >= run_requested_at" - ) - pending_trigger = False - notes.append( - f"自动清理已完成的 run_requested 标记:最近深度分析时间 {last_deep_analysis_at.isoformat()} >= 请求时间 {run_requested_at.isoformat()}" - ) - run_requested_at = None - - if run_requested_at and now - run_requested_at > timedelta(minutes=MAX_RUN_REQUEST_MINUTES): - clear_run_request_fields(sanitized) - notes.append( - f"自动清理超时 run_requested 标记:已等待 {(now - run_requested_at).total_seconds() / 60:.1f} 分钟,超过 {MAX_RUN_REQUEST_MINUTES} 分钟" - ) - run_requested_at = None - - pending_anchor = run_requested_at or last_triggered_at or last_deep_analysis_at - if pending_trigger and pending_anchor and now - pending_anchor > timedelta(minutes=MAX_PENDING_TRIGGER_MINUTES): - sanitized["pending_trigger"] = False - sanitized["pending_reasons"] = [] - sanitized["last_ack_note"] = ( - f"auto-recovered stale pending trigger at {utc_iso()} after waiting " - f"{(now - pending_anchor).total_seconds() / 60:.1f} minutes" - ) - notes.append( - f"自动解除 pending_trigger:触发状态已悬挂 {(now - pending_anchor).total_seconds() / 60:.1f} 分钟,超过 {MAX_PENDING_TRIGGER_MINUTES} 分钟" - ) - - sanitized["_stale_recovery_notes"] = notes - return sanitized - - -def save_state(state: dict): - STATE_DIR.mkdir(parents=True, exist_ok=True) - state_to_save = dict(state) - state_to_save.pop("_stale_recovery_notes", None) - STATE_FILE.write_text(json.dumps(state_to_save, indent=2, ensure_ascii=False), encoding="utf-8") - - -def stable_hash(data) -> str: - payload = json.dumps(data, sort_keys=True, ensure_ascii=False, separators=(",", ":")) - return hashlib.sha1(payload.encode("utf-8")).hexdigest() - - -def get_exchange(): - load_env() - api_key = os.getenv("BINANCE_API_KEY") - secret = os.getenv("BINANCE_API_SECRET") - if not api_key or not secret: - raise RuntimeError("Missing BINANCE_API_KEY or BINANCE_API_SECRET in ~/.hermes/.env") - ex = ccxt.binance({ - "apiKey": api_key, - "secret": secret, - "options": {"defaultType": "spot"}, - "enableRateLimit": True, - }) - ex.load_markets() - return ex - - -def fetch_ohlcv_batch(ex, symbols: set, timeframe: str, limit: int): - results = {} - for sym in sorted(symbols): - try: - ohlcv = ex.fetch_ohlcv(sym, timeframe=timeframe, limit=limit) - if ohlcv and len(ohlcv) >= 2: - results[sym] = ohlcv - except Exception: - pass - return results - - -def compute_ohlcv_metrics(ohlcv_1h, ohlcv_4h, current_price, volume_24h=None): - metrics = {} - if ohlcv_1h and len(ohlcv_1h) >= 2: - closes = [c[4] for c in ohlcv_1h] - volumes = [c[5] for c in ohlcv_1h] - metrics["change_1h_pct"] = round((closes[-1] - closes[-2]) / closes[-2] * 100, 2) if closes[-2] != 0 else None - if len(closes) >= 5: - metrics["change_4h_pct"] = round((closes[-1] - closes[-5]) / closes[-5] * 100, 2) if closes[-5] != 0 else None - recent_vol = sum(volumes[-4:]) / 4 if len(volumes) >= 4 else None - metrics["volume_1h_avg"] = round(recent_vol, 2) if recent_vol else None - highs = [c[2] for c in ohlcv_1h[-4:]] - lows = [c[3] for c in ohlcv_1h[-4:]] - metrics["high_4h"] = round(max(highs), 8) if highs else None - metrics["low_4h"] = round(min(lows), 8) if lows else None - - if ohlcv_4h and len(ohlcv_4h) >= 2: - closes_4h = [c[4] for c in ohlcv_4h] - volumes_4h = [c[5] for c in ohlcv_4h] - metrics["change_4h_pct_from_4h"] = round((closes_4h[-1] - closes_4h[-2]) / closes_4h[-2] * 100, 2) if closes_4h[-2] != 0 else None - recent_vol_4h = sum(volumes_4h[-2:]) / 2 if len(volumes_4h) >= 2 else None - metrics["volume_4h_avg"] = round(recent_vol_4h, 2) if recent_vol_4h else None - highs_4h = [c[2] for c in ohlcv_4h] - lows_4h = [c[3] for c in ohlcv_4h] - metrics["high_24h_calc"] = round(max(highs_4h), 8) if highs_4h else None - metrics["low_24h_calc"] = round(min(lows_4h), 8) if lows_4h else None - if highs_4h and lows_4h: - avg_price = sum(closes_4h) / len(closes_4h) - metrics["volatility_4h_pct"] = round((max(highs_4h) - min(lows_4h)) / avg_price * 100, 2) - - if current_price: - if metrics.get("high_4h"): - metrics["distance_from_4h_high_pct"] = round((metrics["high_4h"] - current_price) / metrics["high_4h"] * 100, 2) - if metrics.get("low_4h"): - metrics["distance_from_4h_low_pct"] = round((current_price - metrics["low_4h"]) / metrics["low_4h"] * 100, 2) - if metrics.get("high_24h_calc"): - metrics["distance_from_24h_high_pct"] = round((metrics["high_24h_calc"] - current_price) / metrics["high_24h_calc"] * 100, 2) - if metrics.get("low_24h_calc"): - metrics["distance_from_24h_low_pct"] = round((current_price - metrics["low_24h_calc"]) / metrics["low_24h_calc"] * 100, 2) - - if volume_24h and volume_24h > 0 and metrics.get("volume_1h_avg"): - daily_avg_1h = volume_24h / 24 - metrics["volume_1h_multiple"] = round(metrics["volume_1h_avg"] / daily_avg_1h, 2) - if volume_24h and volume_24h > 0 and metrics.get("volume_4h_avg"): - daily_avg_4h = volume_24h / 6 - metrics["volume_4h_multiple"] = round(metrics["volume_4h_avg"] / daily_avg_4h, 2) - - return metrics - - -def enrich_candidates_and_positions(global_candidates, candidate_layers, positions_view, tickers, ex): - symbols = set() - for c in global_candidates: - symbols.add(c["symbol"]) - for p in positions_view: - sym = p.get("symbol") - if sym: - sym_ccxt = norm_symbol(sym) - symbols.add(sym_ccxt) - - ohlcv_1h = fetch_ohlcv_batch(ex, symbols, "1h", 24) - ohlcv_4h = fetch_ohlcv_batch(ex, symbols, "4h", 12) - - def _apply(target_list): - for item in target_list: - sym = item.get("symbol") - if not sym: - continue - sym_ccxt = norm_symbol(sym) - v24h = to_float(tickers.get(sym_ccxt, {}).get("quoteVolume")) - metrics = compute_ohlcv_metrics( - ohlcv_1h.get(sym_ccxt), - ohlcv_4h.get(sym_ccxt), - item.get("price") or item.get("last_price"), - volume_24h=v24h, - ) - item["metrics"] = metrics - - _apply(global_candidates) - for band_list in candidate_layers.values(): - _apply(band_list) - _apply(positions_view) - return global_candidates, candidate_layers, positions_view - - -def regime_from_pct(pct: float | None) -> str: - if pct is None: - return "unknown" - if pct >= 2.0: - return "bullish" - if pct <= -2.0: - return "bearish" - return "neutral" - - -def to_float(value, default=0.0): - try: - if value is None: - return default - return float(value) - except Exception: - return default - - -def norm_symbol(symbol: str) -> str: - s = symbol.upper().replace("-", "").replace("_", "") - if "/" in s: - return s - if s.endswith("USDT"): - return s[:-4] + "/USDT" - return s - - -def get_local_now(config: dict): - tz_name = config.get("timezone") or "Asia/Shanghai" - try: - tz = ZoneInfo(tz_name) - except Exception: - tz = ZoneInfo("Asia/Shanghai") - tz_name = "Asia/Shanghai" - return utc_now().astimezone(tz), tz_name - - -def session_label(local_dt: datetime) -> str: - hour = local_dt.hour - if 0 <= hour < 7: - return "overnight" - if 7 <= hour < 12: - return "asia-morning" - if 12 <= hour < 17: - return "asia-afternoon" - if 17 <= hour < 21: - return "europe-open" - return "us-session" - - -def _liquidity_score(volume: float) -> float: - return min(1.0, max(0.0, volume / 50_000_000)) - - -def _breakout_score(price: float, avg_price: float | None) -> float: - if not avg_price or avg_price <= 0: - return 0.0 - return (price - avg_price) / avg_price - - -def top_candidates_from_tickers(tickers: dict): - candidates = [] - for symbol, ticker in tickers.items(): - if not symbol.endswith("/USDT"): - continue - base = symbol.replace("/USDT", "") - if base in BLACKLIST: - continue - if not re.fullmatch(r"[A-Z0-9]{2,20}", base): - continue - price = to_float(ticker.get("last")) - change_pct = to_float(ticker.get("percentage")) - volume = to_float(ticker.get("quoteVolume")) - high = to_float(ticker.get("high")) - low = to_float(ticker.get("low")) - avg_price = to_float(ticker.get("average"), None) - if price <= 0: - continue - if MAX_PRICE_CAP is not None and price > MAX_PRICE_CAP: - continue - if volume < 500_000: - continue - if change_pct < MIN_CHANGE_PCT: - continue - momentum = change_pct / 10.0 - liquidity = _liquidity_score(volume) - breakout = _breakout_score(price, avg_price) - score = round(momentum * 0.5 + liquidity * 0.3 + breakout * 0.2, 4) - band = "major" if price >= 10 else "mid" if price >= 1 else "meme" - distance_from_high = (high - price) / max(high, 1e-9) if high else None - candidates.append({ - "symbol": symbol, - "base": base, - "price": round(price, 8), - "change_24h_pct": round(change_pct, 2), - "volume_24h": round(volume, 2), - "breakout_pct": round(breakout * 100, 2), - "high_24h": round(high, 8) if high else None, - "low_24h": round(low, 8) if low else None, - "distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None, - "score": score, - "band": band, - }) - candidates.sort(key=lambda x: x["score"], reverse=True) - global_top = candidates[:TOP_CANDIDATES] - layers = {"major": [], "mid": [], "meme": []} - for c in candidates: - layers[c["band"]].append(c) - for k in layers: - layers[k] = layers[k][:5] - return global_top, layers - - -def build_snapshot(): - config = load_config() - local_dt, tz_name = get_local_now(config) - ex = get_exchange() - positions = load_positions() - tickers = ex.fetch_tickers() - balances = ex.fetch_balance()["free"] - free_usdt = to_float(balances.get("USDT")) - - positions_view = [] - total_position_value = 0.0 - largest_position_value = 0.0 - actionable_positions = 0 - for pos in positions: - symbol = pos.get("symbol") or "" - sym_ccxt = norm_symbol(symbol) - ticker = tickers.get(sym_ccxt, {}) - last = to_float(ticker.get("last"), None) - qty = to_float(pos.get("quantity")) - avg_cost = to_float(pos.get("avg_cost"), None) - value = round(qty * last, 4) if last is not None else None - pnl_pct = round((last - avg_cost) / avg_cost, 4) if last is not None and avg_cost else None - high = to_float(ticker.get("high")) - low = to_float(ticker.get("low")) - distance_from_high = (high - last) / max(high, 1e-9) if high and last else None - if value is not None: - total_position_value += value - largest_position_value = max(largest_position_value, value) - if value >= MIN_REAL_POSITION_VALUE_USDT: - actionable_positions += 1 - positions_view.append({ - "symbol": symbol, - "base_asset": pos.get("base_asset"), - "quantity": qty, - "avg_cost": avg_cost, - "last_price": last, - "market_value_usdt": value, - "pnl_pct": pnl_pct, - "high_24h": round(high, 8) if high else None, - "low_24h": round(low, 8) if low else None, - "distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None, - }) - - btc_pct = to_float((tickers.get("BTC/USDT") or {}).get("percentage"), None) - eth_pct = to_float((tickers.get("ETH/USDT") or {}).get("percentage"), None) - global_candidates, candidate_layers = top_candidates_from_tickers(tickers) - global_candidates, candidate_layers, positions_view = enrich_candidates_and_positions( - global_candidates, candidate_layers, positions_view, tickers, ex - ) - leader_score = global_candidates[0]["score"] if global_candidates else 0.0 - portfolio_value = round(free_usdt + total_position_value, 4) - volatility_score = round(max(abs(to_float(btc_pct, 0)), abs(to_float(eth_pct, 0))), 2) - - position_structure = [ - { - "symbol": p.get("symbol"), - "base_asset": p.get("base_asset"), - "quantity": round(to_float(p.get("quantity"), 0), 10), - "avg_cost": to_float(p.get("avg_cost"), None), - } - for p in positions_view - ] - - snapshot = { - "generated_at": utc_iso(), - "timezone": tz_name, - "local_time": local_dt.isoformat(), - "session": session_label(local_dt), - "free_usdt": round(free_usdt, 4), - "portfolio_value_usdt": portfolio_value, - "largest_position_value_usdt": round(largest_position_value, 4), - "actionable_positions": actionable_positions, - "positions": positions_view, - "positions_hash": stable_hash(position_structure), - "top_candidates": global_candidates, - "top_candidates_layers": candidate_layers, - "candidates_hash": stable_hash({"global": global_candidates, "layers": candidate_layers}), - "market_regime": { - "btc_24h_pct": round(btc_pct, 2) if btc_pct is not None else None, - "btc_regime": regime_from_pct(btc_pct), - "eth_24h_pct": round(eth_pct, 2) if eth_pct is not None else None, - "eth_regime": regime_from_pct(eth_pct), - "volatility_score": volatility_score, - "leader_score": round(leader_score, 4), - }, - } - snapshot["snapshot_hash"] = stable_hash({ - "portfolio_value_usdt": snapshot["portfolio_value_usdt"], - "positions_hash": snapshot["positions_hash"], - "candidates_hash": snapshot["candidates_hash"], - "market_regime": snapshot["market_regime"], - "session": snapshot["session"], - }) - return snapshot - - -def build_adaptive_profile(snapshot: dict): - portfolio_value = snapshot.get("portfolio_value_usdt", 0) - free_usdt = snapshot.get("free_usdt", 0) - session = snapshot.get("session") - market = snapshot.get("market_regime", {}) - volatility_score = to_float(market.get("volatility_score"), 0) - leader_score = to_float(market.get("leader_score"), 0) - actionable_positions = int(snapshot.get("actionable_positions") or 0) - largest_position_value = to_float(snapshot.get("largest_position_value_usdt"), 0) - - capital_band = "micro" if portfolio_value < 25 else "small" if portfolio_value < 100 else "normal" - session_mode = "quiet" if session in {"overnight", "asia-morning"} else "active" - volatility_mode = "high" if volatility_score >= 2.5 or leader_score >= 120 else "normal" - dust_mode = free_usdt < MIN_ACTIONABLE_USDT and largest_position_value < MIN_REAL_POSITION_VALUE_USDT - - price_trigger = BASE_PRICE_MOVE_TRIGGER_PCT - pnl_trigger = BASE_PNL_TRIGGER_PCT - portfolio_trigger = BASE_PORTFOLIO_MOVE_TRIGGER_PCT - candidate_ratio = BASE_CANDIDATE_SCORE_TRIGGER_RATIO - force_minutes = BASE_FORCE_ANALYSIS_AFTER_MINUTES - cooldown_minutes = BASE_COOLDOWN_MINUTES - soft_score_threshold = 2.0 - - if capital_band == "micro": - price_trigger += 0.02 - pnl_trigger += 0.03 - portfolio_trigger += 0.04 - candidate_ratio += 0.25 - force_minutes += 180 - cooldown_minutes += 30 - soft_score_threshold += 1.0 - elif capital_band == "small": - price_trigger += 0.01 - pnl_trigger += 0.01 - portfolio_trigger += 0.01 - candidate_ratio += 0.1 - force_minutes += 60 - cooldown_minutes += 10 - soft_score_threshold += 0.5 - - if session_mode == "quiet": - price_trigger += 0.01 - pnl_trigger += 0.01 - portfolio_trigger += 0.01 - candidate_ratio += 0.05 - soft_score_threshold += 0.5 - else: - force_minutes = max(120, force_minutes - 30) - - if volatility_mode == "high": - price_trigger = max(0.02, price_trigger - 0.01) - pnl_trigger = max(0.025, pnl_trigger - 0.005) - portfolio_trigger = max(0.025, portfolio_trigger - 0.005) - candidate_ratio = max(1.1, candidate_ratio - 0.1) - cooldown_minutes = max(20, cooldown_minutes - 10) - soft_score_threshold = max(1.0, soft_score_threshold - 0.5) - - if dust_mode: - candidate_ratio += 0.3 - force_minutes += 180 - cooldown_minutes += 30 - soft_score_threshold += 1.5 - - return { - "capital_band": capital_band, - "session_mode": session_mode, - "volatility_mode": volatility_mode, - "dust_mode": dust_mode, - "price_move_trigger_pct": round(price_trigger, 4), - "pnl_trigger_pct": round(pnl_trigger, 4), - "portfolio_move_trigger_pct": round(portfolio_trigger, 4), - "candidate_score_trigger_ratio": round(candidate_ratio, 4), - "force_analysis_after_minutes": int(force_minutes), - "cooldown_minutes": int(cooldown_minutes), - "soft_score_threshold": round(soft_score_threshold, 2), - "new_entries_allowed": free_usdt >= MIN_ACTIONABLE_USDT and not dust_mode, - "switching_allowed": actionable_positions > 0 or portfolio_value >= 25, - } - - -def _candidate_weight(snapshot: dict, profile: dict) -> float: - if not profile.get("new_entries_allowed"): - return 0.5 - if profile.get("volatility_mode") == "high": - return 1.5 - if snapshot.get("session") in {"europe-open", "us-session"}: - return 1.25 - return 1.0 - - -def analyze_trigger(snapshot: dict, state: dict): - reasons = [] - details = list(state.get("_stale_recovery_notes", [])) - hard_reasons = [] - soft_reasons = [] - soft_score = 0.0 - - profile = build_adaptive_profile(snapshot) - market = snapshot.get("market_regime", {}) - now = utc_now() - - last_positions_hash = state.get("last_positions_hash") - last_portfolio_value = state.get("last_portfolio_value_usdt") - last_market_regime = state.get("last_market_regime", {}) - last_positions_map = state.get("last_positions_map", {}) - last_top_candidate = state.get("last_top_candidate") - pending_trigger = bool(state.get("pending_trigger")) - run_requested_at = parse_ts(state.get("run_requested_at")) - last_deep_analysis_at = parse_ts(state.get("last_deep_analysis_at")) - last_triggered_at = parse_ts(state.get("last_triggered_at")) - last_trigger_snapshot_hash = state.get("last_trigger_snapshot_hash") - last_hard_reasons_at = state.get("last_hard_reasons_at", {}) - - price_trigger = profile["price_move_trigger_pct"] - pnl_trigger = profile["pnl_trigger_pct"] - portfolio_trigger = profile["portfolio_move_trigger_pct"] - candidate_ratio_trigger = profile["candidate_score_trigger_ratio"] - force_minutes = profile["force_analysis_after_minutes"] - cooldown_minutes = profile["cooldown_minutes"] - soft_score_threshold = profile["soft_score_threshold"] - - if pending_trigger: - reasons.append("pending-trigger-unacked") - hard_reasons.append("pending-trigger-unacked") - details.append("上次已触发深度分析但尚未确认完成") - if run_requested_at: - details.append(f"外部门控已在 {run_requested_at.isoformat()} 请求运行分析任务") - - if not last_deep_analysis_at: - reasons.append("first-analysis") - hard_reasons.append("first-analysis") - details.append("尚未记录过深度分析") - elif now - last_deep_analysis_at >= timedelta(minutes=force_minutes): - reasons.append("stale-analysis") - hard_reasons.append("stale-analysis") - details.append(f"距离上次深度分析已超过 {force_minutes} 分钟") - - if last_positions_hash and snapshot["positions_hash"] != last_positions_hash: - reasons.append("positions-changed") - hard_reasons.append("positions-changed") - details.append("持仓结构发生变化") - - if last_portfolio_value not in (None, 0): - portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9) - if portfolio_delta >= portfolio_trigger: - if portfolio_delta >= 1.0: - reasons.append("portfolio-extreme-move") - hard_reasons.append("portfolio-extreme-move") - details.append(f"组合净值剧烈变化 {portfolio_delta:.1%},超过 100%,视为硬触发") - else: - reasons.append("portfolio-move") - soft_reasons.append("portfolio-move") - soft_score += 1.0 - details.append(f"组合净值变化 {portfolio_delta:.1%},阈值 {portfolio_trigger:.1%}") - - for pos in snapshot["positions"]: - symbol = pos["symbol"] - prev = last_positions_map.get(symbol, {}) - cur_price = pos.get("last_price") - prev_price = prev.get("last_price") - cur_pnl = pos.get("pnl_pct") - prev_pnl = prev.get("pnl_pct") - market_value = to_float(pos.get("market_value_usdt"), 0) - actionable_position = market_value >= MIN_REAL_POSITION_VALUE_USDT - - if cur_price and prev_price: - price_move = abs(cur_price - prev_price) / max(prev_price, 1e-9) - if price_move >= price_trigger: - reasons.append(f"price-move:{symbol}") - soft_reasons.append(f"price-move:{symbol}") - soft_score += 1.0 if actionable_position else 0.4 - details.append(f"{symbol} 价格变化 {price_move:.1%},阈值 {price_trigger:.1%}") - if cur_pnl is not None and prev_pnl is not None: - pnl_move = abs(cur_pnl - prev_pnl) - if pnl_move >= pnl_trigger: - reasons.append(f"pnl-move:{symbol}") - soft_reasons.append(f"pnl-move:{symbol}") - soft_score += 1.0 if actionable_position else 0.4 - details.append(f"{symbol} 盈亏变化 {pnl_move:.1%},阈值 {pnl_trigger:.1%}") - if cur_pnl is not None: - stop_band = -0.06 if actionable_position else -0.12 - take_band = 0.14 if actionable_position else 0.25 - if cur_pnl <= stop_band or cur_pnl >= take_band: - reasons.append(f"risk-band:{symbol}") - hard_reasons.append(f"risk-band:{symbol}") - details.append(f"{symbol} 接近执行阈值,当前盈亏 {cur_pnl:.1%}") - if cur_pnl <= HARD_STOP_PCT: - reasons.append(f"hard-stop:{symbol}") - hard_reasons.append(f"hard-stop:{symbol}") - details.append(f"{symbol} 盈亏超过 {HARD_STOP_PCT:.1%},触发紧急硬触发") - - current_market = snapshot.get("market_regime", {}) - if last_market_regime: - if current_market.get("btc_regime") != last_market_regime.get("btc_regime"): - reasons.append("btc-regime-change") - hard_reasons.append("btc-regime-change") - details.append(f"BTC 由 {last_market_regime.get('btc_regime')} 切换为 {current_market.get('btc_regime')}") - if current_market.get("eth_regime") != last_market_regime.get("eth_regime"): - reasons.append("eth-regime-change") - hard_reasons.append("eth-regime-change") - details.append(f"ETH 由 {last_market_regime.get('eth_regime')} 切换为 {current_market.get('eth_regime')}") - - for cand in snapshot.get("top_candidates", []): - if cand.get("change_24h_pct", 0) >= HARD_MOON_PCT * 100: - reasons.append(f"hard-moon:{cand['symbol']}") - hard_reasons.append(f"hard-moon:{cand['symbol']}") - details.append(f"候选币 {cand['symbol']} 24h 涨幅 {cand['change_24h_pct']:.1f}%,触发强势硬触发") - - candidate_weight = _candidate_weight(snapshot, profile) - - last_layers = state.get("last_candidates_layers", {}) - current_layers = snapshot.get("top_candidates_layers", {}) - for band in ("major", "mid", "meme"): - cur_band = current_layers.get(band, []) - prev_band = last_layers.get(band, []) - cur_leader = cur_band[0] if cur_band else None - prev_leader = prev_band[0] if prev_band else None - if cur_leader and prev_leader and cur_leader["symbol"] != prev_leader["symbol"]: - score_ratio = cur_leader.get("score", 0) / max(prev_leader.get("score", 0.0001), 0.0001) - if score_ratio >= candidate_ratio_trigger: - reasons.append(f"new-leader-{band}:{cur_leader['symbol']}") - soft_reasons.append(f"new-leader-{band}:{cur_leader['symbol']}") - soft_score += candidate_weight * 0.7 - details.append( - f"{band} 层新榜首 {cur_leader['symbol']} 替代 {prev_leader['symbol']},score 比例 {score_ratio:.2f}" - ) - - current_leader = snapshot.get("top_candidates", [{}])[0] if snapshot.get("top_candidates") else None - if last_top_candidate and current_leader: - if current_leader.get("symbol") != last_top_candidate.get("symbol"): - score_ratio = current_leader.get("score", 0) / max(last_top_candidate.get("score", 0.0001), 0.0001) - if score_ratio >= candidate_ratio_trigger: - reasons.append("new-leader") - soft_reasons.append("new-leader") - soft_score += candidate_weight - details.append( - f"新候选币 {current_leader.get('symbol')} 领先上次榜首,score 比例 {score_ratio:.2f},阈值 {candidate_ratio_trigger:.2f}" - ) - elif current_leader and not last_top_candidate: - reasons.append("candidate-leader-init") - soft_reasons.append("candidate-leader-init") - soft_score += candidate_weight - details.append(f"首次记录候选榜首 {current_leader.get('symbol')}") - - def _signal_delta() -> float: - delta = 0.0 - if last_trigger_snapshot_hash and snapshot.get("snapshot_hash") != last_trigger_snapshot_hash: - delta += 0.5 - if snapshot["positions_hash"] != last_positions_hash: - delta += 1.5 - for pos in snapshot["positions"]: - symbol = pos["symbol"] - prev = last_positions_map.get(symbol, {}) - cur_price = pos.get("last_price") - prev_price = prev.get("last_price") - cur_pnl = pos.get("pnl_pct") - prev_pnl = prev.get("pnl_pct") - if cur_price and prev_price and abs(cur_price - prev_price) / max(prev_price, 1e-9) >= 0.02: - delta += 0.5 - if cur_pnl is not None and prev_pnl is not None and abs(cur_pnl - prev_pnl) >= 0.03: - delta += 0.5 - last_leader = state.get("last_top_candidate") - if current_leader and last_leader and current_leader.get("symbol") != last_leader.get("symbol"): - delta += 1.0 - for band in ("major", "mid", "meme"): - cur_band = current_layers.get(band, []) - prev_band = last_layers.get(band, []) - cur_l = cur_band[0] if cur_band else None - prev_l = prev_band[0] if prev_band else None - if cur_l and prev_l and cur_l.get("symbol") != prev_l.get("symbol"): - delta += 0.5 - if last_market_regime: - if current_market.get("btc_regime") != last_market_regime.get("btc_regime"): - delta += 1.5 - if current_market.get("eth_regime") != last_market_regime.get("eth_regime"): - delta += 1.5 - if last_portfolio_value not in (None, 0): - portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9) - if portfolio_delta >= 0.05: - delta += 1.0 - last_trigger_hard_types = {r.split(":")[0] for r in (state.get("last_trigger_hard_reasons") or [])} - current_hard_types = {r.split(":")[0] for r in hard_reasons} - if current_hard_types - last_trigger_hard_types: - delta += 2.0 - return delta - - signal_delta = _signal_delta() - effective_cooldown = cooldown_minutes - if signal_delta < 1.0: - effective_cooldown = max(cooldown_minutes, 90) - elif signal_delta >= 2.5: - effective_cooldown = max(0, cooldown_minutes - 15) - - cooldown_active = bool(last_triggered_at and now - last_triggered_at < timedelta(minutes=effective_cooldown)) - - dedup_window = timedelta(minutes=HARD_REASON_DEDUP_MINUTES) - for hr in list(hard_reasons): - last_at = parse_ts(last_hard_reasons_at.get(hr)) - if last_at and now - last_at < dedup_window: - hard_reasons.remove(hr) - details.append(f"{hr} 近期已触发,{HARD_REASON_DEDUP_MINUTES}分钟内去重") - - hard_trigger = bool(hard_reasons) - if profile.get("dust_mode") and not hard_trigger and soft_score < soft_score_threshold + 1.0: - details.append("微型资金/粉尘仓位模式:抬高软触发门槛,避免无意义分析") - - if profile.get("dust_mode") and not profile.get("new_entries_allowed") and any( - r in {"new-leader", "candidate-leader-init"} for r in soft_reasons - ): - details.append("当前可用资金低于可执行阈值,新候选币仅做观察,不单独触发深度分析") - soft_score = max(0.0, soft_score - 0.75) - - should_analyze = hard_trigger or soft_score >= soft_score_threshold - - if cooldown_active and not hard_trigger and should_analyze: - should_analyze = False - details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,软触发先记录不升级") - - if cooldown_active and not hard_trigger and reasons and soft_score < soft_score_threshold: - details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,且软信号强度不足 ({soft_score:.2f} < {soft_score_threshold:.2f})") - - status = "deep_analysis_required" if should_analyze else "stable" - - compact_lines = [ - f"状态: {status}", - f"组合净值: ${snapshot['portfolio_value_usdt']:.4f} | 可用USDT: ${snapshot['free_usdt']:.4f}", - f"本地时段: {snapshot['session']} | 时区: {snapshot['timezone']}", - f"BTC/ETH: {market.get('btc_regime')} ({market.get('btc_24h_pct')}%), {market.get('eth_regime')} ({market.get('eth_24h_pct')}%) | 波动分数 {market.get('volatility_score')}", - f"门控画像: capital={profile['capital_band']}, session={profile['session_mode']}, volatility={profile['volatility_mode']}, dust={profile['dust_mode']}", - f"阈值: price={price_trigger:.1%}, pnl={pnl_trigger:.1%}, portfolio={portfolio_trigger:.1%}, candidate={candidate_ratio_trigger:.2f}, cooldown={effective_cooldown}m({cooldown_minutes}m基础), force={force_minutes}m", - f"软信号分: {soft_score:.2f} / {soft_score_threshold:.2f}", - f"信号变化度: {signal_delta:.1f}", - ] - if snapshot["positions"]: - compact_lines.append("持仓:") - for pos in snapshot["positions"][:4]: - pnl = pos.get("pnl_pct") - pnl_text = f"{pnl:+.1%}" if pnl is not None else "n/a" - compact_lines.append( - f"- {pos['symbol']}: qty={pos['quantity']}, px={pos.get('last_price')}, pnl={pnl_text}, value=${pos.get('market_value_usdt')}" - ) - else: - compact_lines.append("持仓: 当前无现货仓位") - if snapshot["top_candidates"]: - compact_lines.append("候选榜:") - for cand in snapshot["top_candidates"]: - compact_lines.append( - f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}" - ) - layers = snapshot.get("top_candidates_layers", {}) - for band, band_cands in layers.items(): - if band_cands: - compact_lines.append(f"{band} 层:") - for cand in band_cands: - compact_lines.append( - f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}" - ) - if details: - compact_lines.append("触发说明:") - for item in details: - compact_lines.append(f"- {item}") - - return { - "generated_at": snapshot["generated_at"], - "status": status, - "should_analyze": should_analyze, - "pending_trigger": pending_trigger, - "run_requested": bool(run_requested_at), - "run_requested_at": run_requested_at.isoformat() if run_requested_at else None, - "cooldown_active": cooldown_active, - "effective_cooldown_minutes": effective_cooldown, - "signal_delta": round(signal_delta, 2), - "reasons": reasons, - "hard_reasons": hard_reasons, - "soft_reasons": soft_reasons, - "soft_score": round(soft_score, 3), - "adaptive_profile": profile, - "portfolio_value_usdt": snapshot["portfolio_value_usdt"], - "free_usdt": snapshot["free_usdt"], - "market_regime": snapshot["market_regime"], - "session": snapshot["session"], - "positions": snapshot["positions"], - "top_candidates": snapshot["top_candidates"], - "top_candidates_layers": layers, - "snapshot_hash": snapshot["snapshot_hash"], - "compact_summary": "\n".join(compact_lines), - "details": details, - } - - -def update_state_after_observation(state: dict, snapshot: dict, analysis: dict): - new_state = dict(state) - new_state.update({ - "last_observed_at": snapshot["generated_at"], - "last_snapshot_hash": snapshot["snapshot_hash"], - "last_positions_hash": snapshot["positions_hash"], - "last_candidates_hash": snapshot["candidates_hash"], - "last_portfolio_value_usdt": snapshot["portfolio_value_usdt"], - "last_market_regime": snapshot["market_regime"], - "last_positions_map": { - p["symbol"]: {"last_price": p.get("last_price"), "pnl_pct": p.get("pnl_pct")} - for p in snapshot["positions"] - }, - "last_top_candidate": snapshot["top_candidates"][0] if snapshot["top_candidates"] else None, - "last_candidates_layers": snapshot.get("top_candidates_layers", {}), - "last_adaptive_profile": analysis.get("adaptive_profile", {}), - }) - if analysis["should_analyze"]: - new_state["pending_trigger"] = True - new_state["pending_reasons"] = analysis["details"] - new_state["last_triggered_at"] = snapshot["generated_at"] - new_state["last_trigger_snapshot_hash"] = snapshot["snapshot_hash"] - new_state["last_trigger_hard_reasons"] = analysis.get("hard_reasons", []) - new_state["last_trigger_signal_delta"] = analysis.get("signal_delta", 0.0) - - last_hard_reasons_at = dict(state.get("last_hard_reasons_at", {})) - for hr in analysis.get("hard_reasons", []): - last_hard_reasons_at[hr] = snapshot["generated_at"] - cutoff = utc_now() - timedelta(hours=24) - pruned = {k: v for k, v in last_hard_reasons_at.items() if parse_ts(v) and parse_ts(v) > cutoff} - new_state["last_hard_reasons_at"] = pruned - return new_state +from importlib import import_module + +_MODULE_MAP = { + "PATHS": ".precheck_constants", + "BASE_DIR": ".precheck_constants", + "STATE_DIR": ".precheck_constants", + "STATE_FILE": ".precheck_constants", + "POSITIONS_FILE": ".precheck_constants", + "CONFIG_FILE": ".precheck_constants", + "ENV_FILE": ".precheck_constants", + "BASE_PRICE_MOVE_TRIGGER_PCT": ".precheck_constants", + "BASE_PNL_TRIGGER_PCT": ".precheck_constants", + "BASE_PORTFOLIO_MOVE_TRIGGER_PCT": ".precheck_constants", + "BASE_CANDIDATE_SCORE_TRIGGER_RATIO": ".precheck_constants", + "BASE_FORCE_ANALYSIS_AFTER_MINUTES": ".precheck_constants", + "BASE_COOLDOWN_MINUTES": ".precheck_constants", + "TOP_CANDIDATES": ".precheck_constants", + "MIN_ACTIONABLE_USDT": ".precheck_constants", + "MIN_REAL_POSITION_VALUE_USDT": ".precheck_constants", + "BLACKLIST": ".precheck_constants", + "HARD_STOP_PCT": ".precheck_constants", + "HARD_MOON_PCT": ".precheck_constants", + "MIN_CHANGE_PCT": ".precheck_constants", + "MAX_PRICE_CAP": ".precheck_constants", + "HARD_REASON_DEDUP_MINUTES": ".precheck_constants", + "MAX_PENDING_TRIGGER_MINUTES": ".precheck_constants", + "MAX_RUN_REQUEST_MINUTES": ".precheck_constants", + "utc_now": ".time_utils", + "utc_iso": ".time_utils", + "parse_ts": ".time_utils", + "get_local_now": ".time_utils", + "session_label": ".time_utils", + "load_json": ".data_utils", + "stable_hash": ".data_utils", + "to_float": ".data_utils", + "norm_symbol": ".data_utils", + "load_env": ".state_manager", + "load_positions": ".state_manager", + "load_state": ".state_manager", + "load_config": ".state_manager", + "clear_run_request_fields": ".state_manager", + "sanitize_state_for_stale_triggers": ".state_manager", + "save_state": ".state_manager", + "update_state_after_observation": ".state_manager", + "get_exchange": ".market_data", + "fetch_ohlcv_batch": ".market_data", + "compute_ohlcv_metrics": ".market_data", + "enrich_candidates_and_positions": ".market_data", + "regime_from_pct": ".market_data", + "_liquidity_score": ".candidate_scoring", + "_breakout_score": ".candidate_scoring", + "top_candidates_from_tickers": ".candidate_scoring", + "build_snapshot": ".snapshot_builder", + "build_adaptive_profile": ".adaptive_profile", + "_candidate_weight": ".adaptive_profile", + "analyze_trigger": ".trigger_analyzer", +} + +__all__ = sorted(set(_MODULE_MAP) | {"main"}) + + +def __getattr__(name: str): + if name not in _MODULE_MAP: + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") + module_name = _MODULE_MAP[name] + module = import_module(module_name, __package__) + return getattr(module, name) + + +def __dir__(): + return sorted(set(globals()) | set(__all__)) + + +def main(): + from .precheck_service import run as _run_service + import sys + return _run_service(sys.argv[1:]) + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/coinhunter/services/precheck_snapshot.py b/src/coinhunter/services/precheck_snapshot.py index 649ffaf..4d0c957 100644 --- a/src/coinhunter/services/precheck_snapshot.py +++ b/src/coinhunter/services/precheck_snapshot.py @@ -2,8 +2,4 @@ from __future__ import annotations -from . import precheck_core - - -def build_snapshot() -> dict: - return precheck_core.build_snapshot() +from .snapshot_builder import build_snapshot diff --git a/src/coinhunter/services/precheck_state.py b/src/coinhunter/services/precheck_state.py index c6665fd..29018e4 100644 --- a/src/coinhunter/services/precheck_state.py +++ b/src/coinhunter/services/precheck_state.py @@ -4,28 +4,18 @@ from __future__ import annotations import json -from . import precheck_core - - -def load_state() -> dict: - return precheck_core.load_state() - - -def save_state(state: dict) -> None: - precheck_core.save_state(state) - - -def sanitize_state_for_stale_triggers(state: dict) -> dict: - return precheck_core.sanitize_state_for_stale_triggers(state) - - -def update_state_after_observation(state: dict, snapshot: dict, analysis: dict) -> dict: - return precheck_core.update_state_after_observation(state, snapshot, analysis) +from .state_manager import ( + load_state, + sanitize_state_for_stale_triggers, + save_state, + update_state_after_observation, +) +from .time_utils import utc_iso def mark_run_requested(note: str = "") -> dict: state = load_state() - state["run_requested_at"] = precheck_core.utc_iso() + state["run_requested_at"] = utc_iso() state["run_request_note"] = note save_state(state) payload = {"ok": True, "run_requested_at": state["run_requested_at"], "note": note} @@ -35,7 +25,7 @@ def mark_run_requested(note: str = "") -> dict: def ack_analysis(note: str = "") -> dict: state = load_state() - state["last_deep_analysis_at"] = precheck_core.utc_iso() + state["last_deep_analysis_at"] = utc_iso() state["pending_trigger"] = False state["pending_reasons"] = [] state["last_ack_note"] = note diff --git a/src/coinhunter/services/snapshot_builder.py b/src/coinhunter/services/snapshot_builder.py new file mode 100644 index 0000000..6c16a80 --- /dev/null +++ b/src/coinhunter/services/snapshot_builder.py @@ -0,0 +1,110 @@ +"""Snapshot construction for precheck.""" + +from __future__ import annotations + +from .candidate_scoring import top_candidates_from_tickers +from .data_utils import norm_symbol, stable_hash, to_float +from .market_data import enrich_candidates_and_positions, get_exchange, regime_from_pct +from .precheck_constants import MIN_ACTIONABLE_USDT, MIN_REAL_POSITION_VALUE_USDT +from .state_manager import load_config, load_positions +from .time_utils import get_local_now, utc_iso + + +def build_snapshot(): + config = load_config() + local_dt, tz_name = get_local_now(config) + ex = get_exchange() + positions = load_positions() + tickers = ex.fetch_tickers() + balances = ex.fetch_balance()["free"] + free_usdt = to_float(balances.get("USDT")) + + positions_view = [] + total_position_value = 0.0 + largest_position_value = 0.0 + actionable_positions = 0 + for pos in positions: + symbol = pos.get("symbol") or "" + sym_ccxt = norm_symbol(symbol) + ticker = tickers.get(sym_ccxt, {}) + last = to_float(ticker.get("last"), None) + qty = to_float(pos.get("quantity")) + avg_cost = to_float(pos.get("avg_cost"), None) + value = round(qty * last, 4) if last is not None else None + pnl_pct = round((last - avg_cost) / avg_cost, 4) if last is not None and avg_cost else None + high = to_float(ticker.get("high")) + low = to_float(ticker.get("low")) + distance_from_high = (high - last) / max(high, 1e-9) if high and last else None + if value is not None: + total_position_value += value + largest_position_value = max(largest_position_value, value) + if value >= MIN_REAL_POSITION_VALUE_USDT: + actionable_positions += 1 + positions_view.append({ + "symbol": symbol, + "base_asset": pos.get("base_asset"), + "quantity": qty, + "avg_cost": avg_cost, + "last_price": last, + "market_value_usdt": value, + "pnl_pct": pnl_pct, + "high_24h": round(high, 8) if high else None, + "low_24h": round(low, 8) if low else None, + "distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None, + }) + + btc_pct = to_float((tickers.get("BTC/USDT") or {}).get("percentage"), None) + eth_pct = to_float((tickers.get("ETH/USDT") or {}).get("percentage"), None) + global_candidates, candidate_layers = top_candidates_from_tickers(tickers) + global_candidates, candidate_layers, positions_view = enrich_candidates_and_positions( + global_candidates, candidate_layers, positions_view, tickers, ex + ) + leader_score = global_candidates[0]["score"] if global_candidates else 0.0 + portfolio_value = round(free_usdt + total_position_value, 4) + volatility_score = round(max(abs(to_float(btc_pct, 0)), abs(to_float(eth_pct, 0))), 2) + + position_structure = [ + { + "symbol": p.get("symbol"), + "base_asset": p.get("base_asset"), + "quantity": round(to_float(p.get("quantity"), 0), 10), + "avg_cost": to_float(p.get("avg_cost"), None), + } + for p in positions_view + ] + + snapshot = { + "generated_at": utc_iso(), + "timezone": tz_name, + "local_time": local_dt.isoformat(), + "session": get_local_now(config)[0] if False else None, # will be replaced below + "free_usdt": round(free_usdt, 4), + "portfolio_value_usdt": portfolio_value, + "largest_position_value_usdt": round(largest_position_value, 4), + "actionable_positions": actionable_positions, + "positions": positions_view, + "positions_hash": stable_hash(position_structure), + "top_candidates": global_candidates, + "top_candidates_layers": candidate_layers, + "candidates_hash": stable_hash({"global": global_candidates, "layers": candidate_layers}), + "market_regime": { + "btc_24h_pct": round(btc_pct, 2) if btc_pct is not None else None, + "btc_regime": regime_from_pct(btc_pct), + "eth_24h_pct": round(eth_pct, 2) if eth_pct is not None else None, + "eth_regime": regime_from_pct(eth_pct), + "volatility_score": volatility_score, + "leader_score": round(leader_score, 4), + }, + } + # fix session after the fact to avoid re-fetching config + snapshot["session"] = None + from .time_utils import session_label + snapshot["session"] = session_label(local_dt) + snapshot["snapshot_hash"] = stable_hash({ + "portfolio_value_usdt": snapshot["portfolio_value_usdt"], + "positions_hash": snapshot["positions_hash"], + "candidates_hash": snapshot["candidates_hash"], + "market_regime": snapshot["market_regime"], + "session": snapshot["session"], + }) + return snapshot diff --git a/src/coinhunter/services/state_manager.py b/src/coinhunter/services/state_manager.py new file mode 100644 index 0000000..d6a7b11 --- /dev/null +++ b/src/coinhunter/services/state_manager.py @@ -0,0 +1,128 @@ +"""State management for precheck workflows.""" + +from __future__ import annotations + +from datetime import timedelta + +from ..runtime import load_env_file +from .data_utils import load_json +from .precheck_constants import ( + CONFIG_FILE, + ENV_FILE, + MAX_PENDING_TRIGGER_MINUTES, + MAX_RUN_REQUEST_MINUTES, + POSITIONS_FILE, + STATE_DIR, + STATE_FILE, +) +from .time_utils import parse_ts, utc_iso, utc_now + + +def load_env() -> None: + load_env_file() + + +def load_positions(): + return load_json(POSITIONS_FILE, {}).get("positions", []) + + +def load_state(): + return load_json(STATE_FILE, {}) + + +def load_config(): + return load_json(CONFIG_FILE, {}) + + +def clear_run_request_fields(state: dict): + state.pop("run_requested_at", None) + state.pop("run_request_note", None) + + +def sanitize_state_for_stale_triggers(state: dict): + sanitized = dict(state) + notes = [] + now = utc_now() + run_requested_at = parse_ts(sanitized.get("run_requested_at")) + last_deep_analysis_at = parse_ts(sanitized.get("last_deep_analysis_at")) + last_triggered_at = parse_ts(sanitized.get("last_triggered_at")) + pending_trigger = bool(sanitized.get("pending_trigger")) + + if run_requested_at and last_deep_analysis_at and last_deep_analysis_at >= run_requested_at: + clear_run_request_fields(sanitized) + if pending_trigger and (not last_triggered_at or last_deep_analysis_at >= last_triggered_at): + sanitized["pending_trigger"] = False + sanitized["pending_reasons"] = [] + sanitized["last_ack_note"] = ( + f"auto-cleared completed trigger at {utc_iso()} because last_deep_analysis_at >= run_requested_at" + ) + pending_trigger = False + notes.append( + f"自动清理已完成的 run_requested 标记:最近深度分析时间 {last_deep_analysis_at.isoformat()} >= 请求时间 {run_requested_at.isoformat()}" + ) + run_requested_at = None + + if run_requested_at and now - run_requested_at > timedelta(minutes=MAX_RUN_REQUEST_MINUTES): + clear_run_request_fields(sanitized) + notes.append( + f"自动清理超时 run_requested 标记:已等待 {(now - run_requested_at).total_seconds() / 60:.1f} 分钟,超过 {MAX_RUN_REQUEST_MINUTES} 分钟" + ) + run_requested_at = None + + pending_anchor = run_requested_at or last_triggered_at or last_deep_analysis_at + if pending_trigger and pending_anchor and now - pending_anchor > timedelta(minutes=MAX_PENDING_TRIGGER_MINUTES): + sanitized["pending_trigger"] = False + sanitized["pending_reasons"] = [] + sanitized["last_ack_note"] = ( + f"auto-recovered stale pending trigger at {utc_iso()} after waiting " + f"{(now - pending_anchor).total_seconds() / 60:.1f} minutes" + ) + notes.append( + f"自动解除 pending_trigger:触发状态已悬挂 {(now - pending_anchor).total_seconds() / 60:.1f} 分钟,超过 {MAX_PENDING_TRIGGER_MINUTES} 分钟" + ) + + sanitized["_stale_recovery_notes"] = notes + return sanitized + + +def save_state(state: dict): + import json + + STATE_DIR.mkdir(parents=True, exist_ok=True) + state_to_save = dict(state) + state_to_save.pop("_stale_recovery_notes", None) + STATE_FILE.write_text(json.dumps(state_to_save, indent=2, ensure_ascii=False), encoding="utf-8") + + +def update_state_after_observation(state: dict, snapshot: dict, analysis: dict): + new_state = dict(state) + new_state.update({ + "last_observed_at": snapshot["generated_at"], + "last_snapshot_hash": snapshot["snapshot_hash"], + "last_positions_hash": snapshot["positions_hash"], + "last_candidates_hash": snapshot["candidates_hash"], + "last_portfolio_value_usdt": snapshot["portfolio_value_usdt"], + "last_market_regime": snapshot["market_regime"], + "last_positions_map": { + p["symbol"]: {"last_price": p.get("last_price"), "pnl_pct": p.get("pnl_pct")} + for p in snapshot["positions"] + }, + "last_top_candidate": snapshot["top_candidates"][0] if snapshot["top_candidates"] else None, + "last_candidates_layers": snapshot.get("top_candidates_layers", {}), + "last_adaptive_profile": analysis.get("adaptive_profile", {}), + }) + if analysis["should_analyze"]: + new_state["pending_trigger"] = True + new_state["pending_reasons"] = analysis["details"] + new_state["last_triggered_at"] = snapshot["generated_at"] + new_state["last_trigger_snapshot_hash"] = snapshot["snapshot_hash"] + new_state["last_trigger_hard_reasons"] = analysis.get("hard_reasons", []) + new_state["last_trigger_signal_delta"] = analysis.get("signal_delta", 0.0) + + last_hard_reasons_at = dict(state.get("last_hard_reasons_at", {})) + for hr in analysis.get("hard_reasons", []): + last_hard_reasons_at[hr] = snapshot["generated_at"] + cutoff = utc_now() - timedelta(hours=24) + pruned = {k: v for k, v in last_hard_reasons_at.items() if parse_ts(v) and parse_ts(v) > cutoff} + new_state["last_hard_reasons_at"] = pruned + return new_state diff --git a/src/coinhunter/services/time_utils.py b/src/coinhunter/services/time_utils.py new file mode 100644 index 0000000..ad2be63 --- /dev/null +++ b/src/coinhunter/services/time_utils.py @@ -0,0 +1,49 @@ +"""Time utilities for precheck.""" + +from __future__ import annotations + +from datetime import datetime, timedelta, timezone +from zoneinfo import ZoneInfo + + +def utc_now() -> datetime: + return datetime.now(timezone.utc) + + +def utc_iso() -> str: + return utc_now().isoformat() + + +def parse_ts(value: str | None) -> datetime | None: + if not value: + return None + try: + ts = datetime.fromisoformat(value) + if ts.tzinfo is None: + ts = ts.replace(tzinfo=timezone.utc) + return ts + except Exception: + return None + + +def get_local_now(config: dict) -> tuple[datetime, str]: + tz_name = config.get("timezone") or "Asia/Shanghai" + try: + tz = ZoneInfo(tz_name) + except Exception: + tz = ZoneInfo("Asia/Shanghai") + tz_name = "Asia/Shanghai" + return utc_now().astimezone(tz), tz_name + + +def session_label(local_dt: datetime) -> str: + hour = local_dt.hour + if 0 <= hour < 7: + return "overnight" + if 7 <= hour < 12: + return "asia-morning" + if 12 <= hour < 17: + return "asia-afternoon" + if 17 <= hour < 21: + return "europe-open" + return "us-session" diff --git a/src/coinhunter/services/trigger_analyzer.py b/src/coinhunter/services/trigger_analyzer.py new file mode 100644 index 0000000..732e0fd --- /dev/null +++ b/src/coinhunter/services/trigger_analyzer.py @@ -0,0 +1,317 @@ +"""Trigger analysis logic for precheck.""" + +from __future__ import annotations + +from datetime import timedelta + +from .adaptive_profile import _candidate_weight, build_adaptive_profile +from .data_utils import to_float +from .precheck_constants import ( + HARD_MOON_PCT, + HARD_REASON_DEDUP_MINUTES, + HARD_STOP_PCT, + MAX_PENDING_TRIGGER_MINUTES, + MAX_RUN_REQUEST_MINUTES, + MIN_REAL_POSITION_VALUE_USDT, +) +from .time_utils import parse_ts, utc_iso, utc_now + + +def analyze_trigger(snapshot: dict, state: dict): + reasons = [] + details = list(state.get("_stale_recovery_notes", [])) + hard_reasons = [] + soft_reasons = [] + soft_score = 0.0 + + profile = build_adaptive_profile(snapshot) + market = snapshot.get("market_regime", {}) + now = utc_now() + + last_positions_hash = state.get("last_positions_hash") + last_portfolio_value = state.get("last_portfolio_value_usdt") + last_market_regime = state.get("last_market_regime", {}) + last_positions_map = state.get("last_positions_map", {}) + last_top_candidate = state.get("last_top_candidate") + pending_trigger = bool(state.get("pending_trigger")) + run_requested_at = parse_ts(state.get("run_requested_at")) + last_deep_analysis_at = parse_ts(state.get("last_deep_analysis_at")) + last_triggered_at = parse_ts(state.get("last_triggered_at")) + last_trigger_snapshot_hash = state.get("last_trigger_snapshot_hash") + last_hard_reasons_at = state.get("last_hard_reasons_at", {}) + + price_trigger = profile["price_move_trigger_pct"] + pnl_trigger = profile["pnl_trigger_pct"] + portfolio_trigger = profile["portfolio_move_trigger_pct"] + candidate_ratio_trigger = profile["candidate_score_trigger_ratio"] + force_minutes = profile["force_analysis_after_minutes"] + cooldown_minutes = profile["cooldown_minutes"] + soft_score_threshold = profile["soft_score_threshold"] + + if pending_trigger: + reasons.append("pending-trigger-unacked") + hard_reasons.append("pending-trigger-unacked") + details.append("上次已触发深度分析但尚未确认完成") + if run_requested_at: + details.append(f"外部门控已在 {run_requested_at.isoformat()} 请求运行分析任务") + + if not last_deep_analysis_at: + reasons.append("first-analysis") + hard_reasons.append("first-analysis") + details.append("尚未记录过深度分析") + elif now - last_deep_analysis_at >= timedelta(minutes=force_minutes): + reasons.append("stale-analysis") + hard_reasons.append("stale-analysis") + details.append(f"距离上次深度分析已超过 {force_minutes} 分钟") + + if last_positions_hash and snapshot["positions_hash"] != last_positions_hash: + reasons.append("positions-changed") + hard_reasons.append("positions-changed") + details.append("持仓结构发生变化") + + if last_portfolio_value not in (None, 0): + portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9) + if portfolio_delta >= portfolio_trigger: + if portfolio_delta >= 1.0: + reasons.append("portfolio-extreme-move") + hard_reasons.append("portfolio-extreme-move") + details.append(f"组合净值剧烈变化 {portfolio_delta:.1%},超过 100%,视为硬触发") + else: + reasons.append("portfolio-move") + soft_reasons.append("portfolio-move") + soft_score += 1.0 + details.append(f"组合净值变化 {portfolio_delta:.1%},阈值 {portfolio_trigger:.1%}") + + for pos in snapshot["positions"]: + symbol = pos["symbol"] + prev = last_positions_map.get(symbol, {}) + cur_price = pos.get("last_price") + prev_price = prev.get("last_price") + cur_pnl = pos.get("pnl_pct") + prev_pnl = prev.get("pnl_pct") + market_value = to_float(pos.get("market_value_usdt"), 0) + actionable_position = market_value >= MIN_REAL_POSITION_VALUE_USDT + + if cur_price and prev_price: + price_move = abs(cur_price - prev_price) / max(prev_price, 1e-9) + if price_move >= price_trigger: + reasons.append(f"price-move:{symbol}") + soft_reasons.append(f"price-move:{symbol}") + soft_score += 1.0 if actionable_position else 0.4 + details.append(f"{symbol} 价格变化 {price_move:.1%},阈值 {price_trigger:.1%}") + if cur_pnl is not None and prev_pnl is not None: + pnl_move = abs(cur_pnl - prev_pnl) + if pnl_move >= pnl_trigger: + reasons.append(f"pnl-move:{symbol}") + soft_reasons.append(f"pnl-move:{symbol}") + soft_score += 1.0 if actionable_position else 0.4 + details.append(f"{symbol} 盈亏变化 {pnl_move:.1%},阈值 {pnl_trigger:.1%}") + if cur_pnl is not None: + stop_band = -0.06 if actionable_position else -0.12 + take_band = 0.14 if actionable_position else 0.25 + if cur_pnl <= stop_band or cur_pnl >= take_band: + reasons.append(f"risk-band:{symbol}") + hard_reasons.append(f"risk-band:{symbol}") + details.append(f"{symbol} 接近执行阈值,当前盈亏 {cur_pnl:.1%}") + if cur_pnl <= HARD_STOP_PCT: + reasons.append(f"hard-stop:{symbol}") + hard_reasons.append(f"hard-stop:{symbol}") + details.append(f"{symbol} 盈亏超过 {HARD_STOP_PCT:.1%},触发紧急硬触发") + + current_market = snapshot.get("market_regime", {}) + if last_market_regime: + if current_market.get("btc_regime") != last_market_regime.get("btc_regime"): + reasons.append("btc-regime-change") + hard_reasons.append("btc-regime-change") + details.append(f"BTC 由 {last_market_regime.get('btc_regime')} 切换为 {current_market.get('btc_regime')}") + if current_market.get("eth_regime") != last_market_regime.get("eth_regime"): + reasons.append("eth-regime-change") + hard_reasons.append("eth-regime-change") + details.append(f"ETH 由 {last_market_regime.get('eth_regime')} 切换为 {current_market.get('eth_regime')}") + + for cand in snapshot.get("top_candidates", []): + if cand.get("change_24h_pct", 0) >= HARD_MOON_PCT * 100: + reasons.append(f"hard-moon:{cand['symbol']}") + hard_reasons.append(f"hard-moon:{cand['symbol']}") + details.append(f"候选币 {cand['symbol']} 24h 涨幅 {cand['change_24h_pct']:.1f}%,触发强势硬触发") + + candidate_weight = _candidate_weight(snapshot, profile) + + last_layers = state.get("last_candidates_layers", {}) + current_layers = snapshot.get("top_candidates_layers", {}) + for band in ("major", "mid", "meme"): + cur_band = current_layers.get(band, []) + prev_band = last_layers.get(band, []) + cur_leader = cur_band[0] if cur_band else None + prev_leader = prev_band[0] if prev_band else None + if cur_leader and prev_leader and cur_leader["symbol"] != prev_leader["symbol"]: + score_ratio = cur_leader.get("score", 0) / max(prev_leader.get("score", 0.0001), 0.0001) + if score_ratio >= candidate_ratio_trigger: + reasons.append(f"new-leader-{band}:{cur_leader['symbol']}") + soft_reasons.append(f"new-leader-{band}:{cur_leader['symbol']}") + soft_score += candidate_weight * 0.7 + details.append( + f"{band} 层新榜首 {cur_leader['symbol']} 替代 {prev_leader['symbol']},score 比例 {score_ratio:.2f}" + ) + + current_leader = snapshot.get("top_candidates", [{}])[0] if snapshot.get("top_candidates") else None + if last_top_candidate and current_leader: + if current_leader.get("symbol") != last_top_candidate.get("symbol"): + score_ratio = current_leader.get("score", 0) / max(last_top_candidate.get("score", 0.0001), 0.0001) + if score_ratio >= candidate_ratio_trigger: + reasons.append("new-leader") + soft_reasons.append("new-leader") + soft_score += candidate_weight + details.append( + f"新候选币 {current_leader.get('symbol')} 领先上次榜首,score 比例 {score_ratio:.2f},阈值 {candidate_ratio_trigger:.2f}" + ) + elif current_leader and not last_top_candidate: + reasons.append("candidate-leader-init") + soft_reasons.append("candidate-leader-init") + soft_score += candidate_weight + details.append(f"首次记录候选榜首 {current_leader.get('symbol')}") + + def _signal_delta() -> float: + delta = 0.0 + if last_trigger_snapshot_hash and snapshot.get("snapshot_hash") != last_trigger_snapshot_hash: + delta += 0.5 + if snapshot["positions_hash"] != last_positions_hash: + delta += 1.5 + for pos in snapshot["positions"]: + symbol = pos["symbol"] + prev = last_positions_map.get(symbol, {}) + cur_price = pos.get("last_price") + prev_price = prev.get("last_price") + cur_pnl = pos.get("pnl_pct") + prev_pnl = prev.get("pnl_pct") + if cur_price and prev_price and abs(cur_price - prev_price) / max(prev_price, 1e-9) >= 0.02: + delta += 0.5 + if cur_pnl is not None and prev_pnl is not None and abs(cur_pnl - prev_pnl) >= 0.03: + delta += 0.5 + last_leader = state.get("last_top_candidate") + if current_leader and last_leader and current_leader.get("symbol") != last_leader.get("symbol"): + delta += 1.0 + for band in ("major", "mid", "meme"): + cur_band = current_layers.get(band, []) + prev_band = last_layers.get(band, []) + cur_l = cur_band[0] if cur_band else None + prev_l = prev_band[0] if prev_band else None + if cur_l and prev_l and cur_l.get("symbol") != prev_l.get("symbol"): + delta += 0.5 + if last_market_regime: + if current_market.get("btc_regime") != last_market_regime.get("btc_regime"): + delta += 1.5 + if current_market.get("eth_regime") != last_market_regime.get("eth_regime"): + delta += 1.5 + if last_portfolio_value not in (None, 0): + portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9) + if portfolio_delta >= 0.05: + delta += 1.0 + last_trigger_hard_types = {r.split(":")[0] for r in (state.get("last_trigger_hard_reasons") or [])} + current_hard_types = {r.split(":")[0] for r in hard_reasons} + if current_hard_types - last_trigger_hard_types: + delta += 2.0 + return delta + + signal_delta = _signal_delta() + effective_cooldown = cooldown_minutes + if signal_delta < 1.0: + effective_cooldown = max(cooldown_minutes, 90) + elif signal_delta >= 2.5: + effective_cooldown = max(0, cooldown_minutes - 15) + + cooldown_active = bool(last_triggered_at and now - last_triggered_at < timedelta(minutes=effective_cooldown)) + + dedup_window = timedelta(minutes=HARD_REASON_DEDUP_MINUTES) + for hr in list(hard_reasons): + last_at = parse_ts(last_hard_reasons_at.get(hr)) + if last_at and now - last_at < dedup_window: + hard_reasons.remove(hr) + details.append(f"{hr} 近期已触发,{HARD_REASON_DEDUP_MINUTES}分钟内去重") + + hard_trigger = bool(hard_reasons) + if profile.get("dust_mode") and not hard_trigger and soft_score < soft_score_threshold + 1.0: + details.append("微型资金/粉尘仓位模式:抬高软触发门槛,避免无意义分析") + + if profile.get("dust_mode") and not profile.get("new_entries_allowed") and any( + r in {"new-leader", "candidate-leader-init"} for r in soft_reasons + ): + details.append("当前可用资金低于可执行阈值,新候选币仅做观察,不单独触发深度分析") + soft_score = max(0.0, soft_score - 0.75) + + should_analyze = hard_trigger or soft_score >= soft_score_threshold + + if cooldown_active and not hard_trigger and should_analyze: + should_analyze = False + details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,软触发先记录不升级") + + if cooldown_active and not hard_trigger and reasons and soft_score < soft_score_threshold: + details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,且软信号强度不足 ({soft_score:.2f} < {soft_score_threshold:.2f})") + + status = "deep_analysis_required" if should_analyze else "stable" + + compact_lines = [ + f"状态: {status}", + f"组合净值: ${snapshot['portfolio_value_usdt']:.4f} | 可用USDT: ${snapshot['free_usdt']:.4f}", + f"本地时段: {snapshot['session']} | 时区: {snapshot['timezone']}", + f"BTC/ETH: {market.get('btc_regime')} ({market.get('btc_24h_pct')}%), {market.get('eth_regime')} ({market.get('eth_24h_pct')}%) | 波动分数 {market.get('volatility_score')}", + f"门控画像: capital={profile['capital_band']}, session={profile['session_mode']}, volatility={profile['volatility_mode']}, dust={profile['dust_mode']}", + f"阈值: price={price_trigger:.1%}, pnl={pnl_trigger:.1%}, portfolio={portfolio_trigger:.1%}, candidate={candidate_ratio_trigger:.2f}, cooldown={effective_cooldown}m({cooldown_minutes}m基础), force={force_minutes}m", + f"软信号分: {soft_score:.2f} / {soft_score_threshold:.2f}", + f"信号变化度: {signal_delta:.1f}", + ] + if snapshot["positions"]: + compact_lines.append("持仓:") + for pos in snapshot["positions"][:4]: + pnl = pos.get("pnl_pct") + pnl_text = f"{pnl:+.1%}" if pnl is not None else "n/a" + compact_lines.append( + f"- {pos['symbol']}: qty={pos['quantity']}, px={pos.get('last_price')}, pnl={pnl_text}, value=${pos.get('market_value_usdt')}" + ) + else: + compact_lines.append("持仓: 当前无现货仓位") + if snapshot["top_candidates"]: + compact_lines.append("候选榜:") + for cand in snapshot["top_candidates"]: + compact_lines.append( + f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}" + ) + layers = snapshot.get("top_candidates_layers", {}) + for band, band_cands in layers.items(): + if band_cands: + compact_lines.append(f"{band} 层:") + for cand in band_cands: + compact_lines.append( + f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}" + ) + if details: + compact_lines.append("触发说明:") + for item in details: + compact_lines.append(f"- {item}") + + return { + "generated_at": snapshot["generated_at"], + "status": status, + "should_analyze": should_analyze, + "pending_trigger": pending_trigger, + "run_requested": bool(run_requested_at), + "run_requested_at": run_requested_at.isoformat() if run_requested_at else None, + "cooldown_active": cooldown_active, + "effective_cooldown_minutes": effective_cooldown, + "signal_delta": round(signal_delta, 2), + "reasons": reasons, + "hard_reasons": hard_reasons, + "soft_reasons": soft_reasons, + "soft_score": round(soft_score, 3), + "adaptive_profile": profile, + "portfolio_value_usdt": snapshot["portfolio_value_usdt"], + "free_usdt": snapshot["free_usdt"], + "market_regime": snapshot["market_regime"], + "session": snapshot["session"], + "positions": snapshot["positions"], + "top_candidates": snapshot["top_candidates"], + "top_candidates_layers": layers, + "snapshot_hash": snapshot["snapshot_hash"], + "compact_summary": "\n".join(compact_lines), + "details": details, + }