refactor: split precheck_core and migrate commands to commands/

- Split 900-line precheck_core.py into 9 focused modules:
  precheck_constants, time_utils, data_utils, state_manager,
  market_data, candidate_scoring, snapshot_builder,
  adaptive_profile, trigger_analyzer
- Remove dead auto_trader command and module
- Migrate 7 root-level command modules into commands/:
  check_api, doctor, external_gate, init_user_state,
  market_probe, paths, rotate_external_gate_log
- Keep thin backward-compatible facades in root package
- Update cli.py MODULE_MAP to route through commands/
- Verified compileall and smoke tests for all key commands
This commit is contained in:
2026-04-15 21:29:18 +08:00
parent db981e8e5f
commit a61c329496
30 changed files with 1662 additions and 1715 deletions

View File

@@ -168,6 +168,20 @@ coinhunter external-gate
coinhunter rotate-external-gate-log
```
Supported top-level commands and aliases remain:
- `check-api` / `api-check`
- `doctor` / `diag`
- `external-gate` / `gate`
- `init`
- `market-probe` / `probe`
- `paths`
- `precheck`
- `review-context` / `review`
- `review-engine` / `recap`
- `rotate-external-gate-log` / `rotate-gate-log` / `rotate-log`
- `smart-executor` / `exec`
## Runtime model
Default layout:

View File

@@ -1,289 +0,0 @@
#!/usr/bin/env python3
"""
Coin Hunter Auto Trader
全自动妖币猎人 + 币安执行器
运行前请在 ~/.hermes/.env 配置:
BINANCE_API_KEY=你的API_KEY
BINANCE_API_SECRET=你的API_SECRET
首次运行建议用 DRY_RUN=True 测试逻辑。
"""
import json
import os
import sys
import time
from datetime import datetime, timezone, timedelta
from pathlib import Path
import ccxt
from .runtime import get_runtime_paths, load_env_file
# ============== 配置 ==============
PATHS = get_runtime_paths()
COINS_DIR = PATHS.root
POSITIONS_FILE = PATHS.positions_file
ENV_FILE = PATHS.env_file
CST = timezone(timedelta(hours=8))
# 风控参数
DRY_RUN = os.getenv("DRY_RUN", "true").lower() == "true" # 默认测试模式
MAX_POSITIONS = 2 # 最大同时持仓数
# 资金配置(根据总资产动态计算)
CAPITAL_ALLOCATION_PCT = 0.95 # 用总资产的95%玩这个策略留5%缓冲给手续费和滑点)
MIN_POSITION_USDT = 50 # 单次最小下单金额(避免过小)
MIN_VOLUME_24H = 1_000_000 # 最小24h成交额 ($)
MIN_PRICE_CHANGE_24H = 0.05 # 最小涨幅 5%
MAX_PRICE = 1.0 # 只玩低价币meme特征
STOP_LOSS_PCT = -0.07 # 止损 -7%
TAKE_PROFIT_1_PCT = 0.15 # 止盈1 +15%
TAKE_PROFIT_2_PCT = 0.30 # 止盈2 +30%
BLACKLIST = {"USDC", "BUSD", "TUSD", "FDUSD", "USTC", "PAXG", "XRP", "ETH", "BTC"}
# ============== 工具函数 ==============
def log(msg: str):
print(f"[{datetime.now(CST).strftime('%Y-%m-%d %H:%M:%S')} CST] {msg}")
def load_positions() -> list:
if POSITIONS_FILE.exists():
return json.loads(POSITIONS_FILE.read_text(encoding="utf-8")).get("positions", [])
return []
def save_positions(positions: list):
COINS_DIR.mkdir(parents=True, exist_ok=True)
POSITIONS_FILE.write_text(json.dumps({"positions": positions}, indent=2, ensure_ascii=False), encoding="utf-8")
def load_env():
load_env_file(PATHS)
def calculate_position_size(total_usdt: float, available_usdt: float, open_slots: int) -> float:
"""
根据总资产动态计算每次下单金额。
逻辑:先确定策略总上限,再按剩余开仓位均分。
"""
strategy_cap = total_usdt * CAPITAL_ALLOCATION_PCT
# 已用于策略的资金约等于总上限 可用余额
used_in_strategy = max(0, strategy_cap - available_usdt)
remaining_strategy_cap = max(0, strategy_cap - used_in_strategy)
if open_slots <= 0 or remaining_strategy_cap < MIN_POSITION_USDT:
return 0
size = remaining_strategy_cap / open_slots
# 同时不能超过当前可用余额
size = min(size, available_usdt)
# 四舍五入到整数
size = max(0, round(size, 2))
return size if size >= MIN_POSITION_USDT else 0
# ============== 币安客户端 ==============
class BinanceTrader:
def __init__(self):
api_key = os.getenv("BINANCE_API_KEY")
secret = os.getenv("BINANCE_API_SECRET")
if not api_key or not secret:
raise RuntimeError("缺少 BINANCE_API_KEY 或 BINANCE_API_SECRET请配置 ~/.hermes/.env")
self.exchange = ccxt.binance({
"apiKey": api_key,
"secret": secret,
"options": {"defaultType": "spot"},
"enableRateLimit": True,
})
self.exchange.load_markets()
def get_balance(self, asset: str = "USDT") -> float:
bal = self.exchange.fetch_balance()["free"].get(asset, 0)
return float(bal)
def fetch_tickers(self) -> dict:
return self.exchange.fetch_tickers()
def create_market_buy_order(self, symbol: str, amount_usdt: float):
if DRY_RUN:
log(f"[DRY RUN] 模拟买入 {symbol},金额 ${amount_usdt}")
return {"id": "dry-run-buy", "price": None, "amount": amount_usdt}
ticker = self.exchange.fetch_ticker(symbol)
price = float(ticker["last"])
qty = amount_usdt / price
order = self.exchange.create_market_buy_order(symbol, qty)
log(f"✅ 买入 {symbol} | 数量 {qty:.4f} | 价格 ~${price}")
return order
def create_market_sell_order(self, symbol: str, qty: float):
if DRY_RUN:
log(f"[DRY RUN] 模拟卖出 {symbol},数量 {qty}")
return {"id": "dry-run-sell"}
order = self.exchange.create_market_sell_order(symbol, qty)
log(f"✅ 卖出 {symbol} | 数量 {qty:.4f}")
return order
# ============== 选币引擎 ==============
class CoinPicker:
def __init__(self, exchange: ccxt.binance):
self.exchange = exchange
def scan(self) -> list:
tickers = self.exchange.fetch_tickers()
candidates = []
for symbol, t in tickers.items():
if not symbol.endswith("/USDT"):
continue
base = symbol.replace("/USDT", "")
if base in BLACKLIST:
continue
price = float(t["last"] or 0)
change = float(t.get("percentage", 0)) / 100
volume = float(t.get("quoteVolume", 0))
if price <= 0 or price > MAX_PRICE:
continue
if volume < MIN_VOLUME_24H:
continue
if change < MIN_PRICE_CHANGE_24H:
continue
score = change * (volume / MIN_VOLUME_24H)
candidates.append({
"symbol": symbol,
"base": base,
"price": price,
"change_24h": change,
"volume_24h": volume,
"score": score,
})
candidates.sort(key=lambda x: x["score"], reverse=True)
return candidates[:5]
# ============== 主控制器 ==============
def run_cycle():
load_env()
trader = BinanceTrader()
picker = CoinPicker(trader.exchange)
positions = load_positions()
log(f"当前持仓数: {len(positions)} | 最大允许: {MAX_POSITIONS} | DRY_RUN={DRY_RUN}")
# 1. 检查现有持仓(止盈止损)
tickers = trader.fetch_tickers()
new_positions = []
for pos in positions:
sym = pos["symbol"]
qty = float(pos["quantity"])
cost = float(pos["avg_cost"])
# ccxt tickers 使用 slash 格式,如 PENGU/USDT
sym_ccxt = sym.replace("USDT", "/USDT") if "/" not in sym else sym
ticker = tickers.get(sym_ccxt)
if not ticker:
new_positions.append(pos)
continue
price = float(ticker["last"])
pnl_pct = (price - cost) / cost
log(f"监控 {sym} | 现价 ${price:.8f} | 成本 ${cost:.8f} | 盈亏 {pnl_pct:+.2%}")
action = None
if pnl_pct <= STOP_LOSS_PCT:
action = "STOP_LOSS"
elif pnl_pct >= TAKE_PROFIT_2_PCT:
action = "TAKE_PROFIT_2"
elif pnl_pct >= TAKE_PROFIT_1_PCT:
# 检查是否已经止盈过一部分
sold_pct = float(pos.get("take_profit_1_sold_pct", 0))
if sold_pct == 0:
action = "TAKE_PROFIT_1"
if action == "STOP_LOSS":
trader.create_market_sell_order(sym, qty)
log(f"🛑 {sym} 触发止损,全部清仓")
continue
if action == "TAKE_PROFIT_1":
sell_qty = qty * 0.5
trader.create_market_sell_order(sym, sell_qty)
pos["quantity"] = qty - sell_qty
pos["take_profit_1_sold_pct"] = 50
pos["updated_at"] = datetime.now(CST).isoformat()
log(f"🎯 {sym} 触发止盈1卖出50%,剩余 {pos['quantity']:.4f}")
new_positions.append(pos)
continue
if action == "TAKE_PROFIT_2":
trader.create_market_sell_order(sym, float(pos["quantity"]))
log(f"🚀 {sym} 触发止盈2全部清仓")
continue
new_positions.append(pos)
# 2. 开新仓
if len(new_positions) < MAX_POSITIONS:
candidates = picker.scan()
held_bases = {p["base_asset"] for p in new_positions}
total_usdt = trader.get_balance("USDT")
# 计算持仓市值并加入总资产
for pos in new_positions:
sym_ccxt = pos["symbol"].replace("USDT", "/USDT") if "/" not in pos["symbol"] else pos["symbol"]
ticker = tickers.get(sym_ccxt)
if ticker:
total_usdt += float(pos["quantity"]) * float(ticker["last"])
available_usdt = trader.get_balance("USDT")
open_slots = MAX_POSITIONS - len(new_positions)
position_size = calculate_position_size(total_usdt, available_usdt, open_slots)
log(f"总资产 USDT: ${total_usdt:.2f} | 策略上限({CAPITAL_ALLOCATION_PCT:.0%}): ${total_usdt*CAPITAL_ALLOCATION_PCT:.2f} | 每仓建议金额: ${position_size:.2f}")
for cand in candidates:
if len(new_positions) >= MAX_POSITIONS:
break
base = cand["base"]
if base in held_bases:
continue
if position_size <= 0:
log("策略资金已用完或余额不足,停止开新仓")
break
symbol = cand["symbol"]
order = trader.create_market_buy_order(symbol, position_size)
avg_price = float(order.get("price") or cand["price"])
qty = position_size / avg_price if avg_price else 0
new_positions.append({
"account_id": "binance-main",
"symbol": symbol.replace("/", ""),
"base_asset": base,
"quote_asset": "USDT",
"market_type": "spot",
"quantity": qty,
"avg_cost": avg_price,
"opened_at": datetime.now(CST).isoformat(),
"updated_at": datetime.now(CST).isoformat(),
"note": "Auto-trader entry",
})
held_bases.add(base)
available_usdt -= position_size
position_size = calculate_position_size(total_usdt, available_usdt, MAX_POSITIONS - len(new_positions))
log(f"📈 新开仓 {symbol} | 买入价 ${avg_price:.8f} | 数量 {qty:.2f}")
save_positions(new_positions)
log("周期结束,持仓已保存")
if __name__ == "__main__":
try:
run_cycle()
except Exception as e:
log(f"❌ 错误: {e}")
sys.exit(1)

View File

@@ -1,26 +1,8 @@
#!/usr/bin/env python3
"""检查自动交易的环境配置是否就绪"""
import os
"""Backward-compatible facade for check_api."""
from .runtime import load_env_file
def main():
load_env_file()
api_key = os.getenv("BINANCE_API_KEY", "")
secret = os.getenv("BINANCE_API_SECRET", "")
if not api_key or api_key.startswith("***") or api_key.startswith("your_"):
print("❌ 未配置 BINANCE_API_KEY")
return 1
if not secret or secret.startswith("***") or secret.startswith("your_"):
print("❌ 未配置 BINANCE_API_SECRET")
return 1
print("✅ API 配置正常")
return 0
from __future__ import annotations
from .commands.check_api import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -9,18 +9,17 @@ import sys
from . import __version__
MODULE_MAP = {
"check-api": "check_api",
"doctor": "doctor",
"external-gate": "external_gate",
"init": "init_user_state",
"market-probe": "market_probe",
"paths": "paths",
"check-api": "commands.check_api",
"doctor": "commands.doctor",
"external-gate": "commands.external_gate",
"init": "commands.init_user_state",
"market-probe": "commands.market_probe",
"paths": "commands.paths",
"precheck": "commands.precheck",
"review-context": "review_context",
"review-engine": "review_engine",
"rotate-external-gate-log": "rotate_external_gate_log",
"rotate-external-gate-log": "commands.rotate_external_gate_log",
"smart-executor": "commands.smart_executor",
"auto-trader": "auto_trader",
}
ALIASES = {
@@ -47,7 +46,6 @@ COMMAND_HELP = [
("recap", "review-engine", "Generate review recap/engine output"),
("rotate-gate-log, rotate-log", "rotate-external-gate-log", "Rotate external gate logs"),
("exec", "smart-executor", "Trading and execution actions"),
("auto-trader", None, "Auto trader entrypoint"),
]

View File

@@ -0,0 +1,26 @@
#!/usr/bin/env python3
"""检查自动交易的环境配置是否就绪"""
import os
from ..runtime import load_env_file
def main():
load_env_file()
api_key = os.getenv("BINANCE_API_KEY", "")
secret = os.getenv("BINANCE_API_SECRET", "")
if not api_key or api_key.startswith("***") or api_key.startswith("your_"):
print("❌ 未配置 BINANCE_API_KEY")
return 1
if not secret or secret.startswith("***") or secret.startswith("your_"):
print("❌ 未配置 BINANCE_API_SECRET")
return 1
print("✅ API 配置正常")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,66 @@
"""Runtime diagnostics for CoinHunter CLI."""
from __future__ import annotations
import json
import os
import platform
import shutil
import sys
from ..runtime import ensure_runtime_dirs, get_runtime_paths, load_env_file, resolve_hermes_executable
REQUIRED_ENV_VARS = ["BINANCE_API_KEY", "BINANCE_API_SECRET"]
def main() -> int:
paths = ensure_runtime_dirs(get_runtime_paths())
env_file = load_env_file(paths)
hermes_executable = resolve_hermes_executable(paths)
env_checks = {}
missing_env = []
for name in REQUIRED_ENV_VARS:
present = bool(os.getenv(name))
env_checks[name] = present
if not present:
missing_env.append(name)
file_checks = {
"env_file_exists": env_file.exists(),
"config_exists": paths.config_file.exists(),
"positions_exists": paths.positions_file.exists(),
"logrotate_config_exists": paths.logrotate_config.exists(),
}
dir_checks = {
"root_exists": paths.root.exists(),
"state_dir_exists": paths.state_dir.exists(),
"logs_dir_exists": paths.logs_dir.exists(),
"reviews_dir_exists": paths.reviews_dir.exists(),
"cache_dir_exists": paths.cache_dir.exists(),
}
command_checks = {
"hermes": bool(shutil.which("hermes") or paths.hermes_bin.exists()),
"logrotate": bool(shutil.which("logrotate") or shutil.which("/usr/sbin/logrotate")),
}
report = {
"ok": not missing_env,
"python": sys.version.split()[0],
"platform": platform.platform(),
"env_file": str(env_file),
"hermes_executable": hermes_executable,
"paths": paths.as_dict(),
"env_checks": env_checks,
"missing_env": missing_env,
"file_checks": file_checks,
"dir_checks": dir_checks,
"command_checks": command_checks,
}
print(json.dumps(report, ensure_ascii=False, indent=2))
return 0 if report["ok"] else 1
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,82 @@
#!/usr/bin/env python3
import fcntl
import json
import subprocess
import sys
from datetime import datetime, timezone
from ..runtime import ensure_runtime_dirs, get_runtime_paths, resolve_hermes_executable
PATHS = get_runtime_paths()
STATE_DIR = PATHS.state_dir
LOCK_FILE = PATHS.external_gate_lock
COINHUNTER_MODULE = [sys.executable, "-m", "coinhunter"]
TRADE_JOB_ID = "4e6593fff158"
def utc_now():
return datetime.now(timezone.utc).isoformat()
def log(message: str):
print(f"[{utc_now()}] {message}")
def run_cmd(args: list[str]) -> subprocess.CompletedProcess:
return subprocess.run(args, capture_output=True, text=True)
def parse_json_output(text: str) -> dict:
text = (text or "").strip()
if not text:
return {}
return json.loads(text)
def main():
ensure_runtime_dirs(PATHS)
with open(LOCK_FILE, "w", encoding="utf-8") as lockf:
try:
fcntl.flock(lockf.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
log("gate already running; skip")
return 0
precheck = run_cmd(COINHUNTER_MODULE + ["precheck"])
if precheck.returncode != 0:
log(f"precheck returned non-zero ({precheck.returncode}); stdout={precheck.stdout.strip()} stderr={precheck.stderr.strip()}")
return 1
try:
data = parse_json_output(precheck.stdout)
except Exception as e:
log(f"failed to parse precheck JSON: {e}; raw={precheck.stdout.strip()[:1000]}")
return 1
if not data.get("should_analyze"):
log("no trigger; skip model run")
return 0
if data.get("run_requested"):
log(f"trigger already queued at {data.get('run_requested_at')}; skip duplicate")
return 0
mark = run_cmd(COINHUNTER_MODULE + ["precheck", "--mark-run-requested", "external-gate queued cron run"])
if mark.returncode != 0:
log(f"failed to mark run requested; stdout={mark.stdout.strip()} stderr={mark.stderr.strip()}")
return 1
trigger = run_cmd([resolve_hermes_executable(PATHS), "cron", "run", TRADE_JOB_ID])
if trigger.returncode != 0:
log(f"failed to trigger trade cron job; stdout={trigger.stdout.strip()} stderr={trigger.stderr.strip()}")
return 1
reasons = ", ".join(data.get("reasons", [])) or "unknown"
log(f"queued trade job {TRADE_JOB_ID}; reasons={reasons}")
if trigger.stdout.strip():
log(trigger.stdout.strip())
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,65 @@
#!/usr/bin/env python3
import json
from datetime import datetime, timezone
from pathlib import Path
from ..runtime import ensure_runtime_dirs, get_runtime_paths
PATHS = get_runtime_paths()
ROOT = PATHS.root
CACHE_DIR = PATHS.cache_dir
def now_iso():
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def ensure_file(path: Path, payload: dict):
if path.exists():
return False
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
return True
def main():
ensure_runtime_dirs(PATHS)
created = []
ts = now_iso()
templates = {
ROOT / "config.json": {
"default_exchange": "bybit",
"default_quote_currency": "USDT",
"timezone": "Asia/Shanghai",
"preferred_chains": ["solana", "base"],
"created_at": ts,
"updated_at": ts,
},
ROOT / "accounts.json": {
"accounts": []
},
ROOT / "positions.json": {
"positions": []
},
ROOT / "watchlist.json": {
"watchlist": []
},
ROOT / "notes.json": {
"notes": []
},
}
for path, payload in templates.items():
if ensure_file(path, payload):
created.append(str(path))
print(json.dumps({
"root": str(ROOT),
"created": created,
"cache_dir": str(CACHE_DIR),
}, ensure_ascii=False, indent=2))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,243 @@
#!/usr/bin/env python3
import argparse
import json
import os
import sys
import urllib.parse
import urllib.request
DEFAULT_TIMEOUT = 20
def fetch_json(url, headers=None, timeout=DEFAULT_TIMEOUT):
merged_headers = {
"Accept": "application/json",
"User-Agent": "Mozilla/5.0 (compatible; OpenClaw Coin Hunter/1.0)",
}
if headers:
merged_headers.update(headers)
req = urllib.request.Request(url, headers=merged_headers)
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = resp.read()
return json.loads(data.decode("utf-8"))
def print_json(data):
print(json.dumps(data, ensure_ascii=False, indent=2))
def bybit_ticker(symbol: str):
url = (
"https://api.bybit.com/v5/market/tickers?category=spot&symbol="
+ urllib.parse.quote(symbol.upper())
)
payload = fetch_json(url)
items = payload.get("result", {}).get("list", [])
if not items:
raise SystemExit(f"No Bybit spot ticker found for {symbol}")
item = items[0]
out = {
"provider": "bybit",
"symbol": symbol.upper(),
"lastPrice": item.get("lastPrice"),
"price24hPcnt": item.get("price24hPcnt"),
"highPrice24h": item.get("highPrice24h"),
"lowPrice24h": item.get("lowPrice24h"),
"turnover24h": item.get("turnover24h"),
"volume24h": item.get("volume24h"),
"bid1Price": item.get("bid1Price"),
"ask1Price": item.get("ask1Price"),
}
print_json(out)
def bybit_klines(symbol: str, interval: str, limit: int):
params = urllib.parse.urlencode({
"category": "spot",
"symbol": symbol.upper(),
"interval": interval,
"limit": str(limit),
})
url = f"https://api.bybit.com/v5/market/kline?{params}"
payload = fetch_json(url)
rows = payload.get("result", {}).get("list", [])
out = {
"provider": "bybit",
"symbol": symbol.upper(),
"interval": interval,
"candles": [
{
"startTime": r[0],
"open": r[1],
"high": r[2],
"low": r[3],
"close": r[4],
"volume": r[5],
"turnover": r[6],
}
for r in rows
],
}
print_json(out)
def dexscreener_search(query: str):
url = "https://api.dexscreener.com/latest/dex/search/?q=" + urllib.parse.quote(query)
payload = fetch_json(url)
pairs = payload.get("pairs") or []
out = []
for p in pairs[:10]:
out.append({
"chainId": p.get("chainId"),
"dexId": p.get("dexId"),
"pairAddress": p.get("pairAddress"),
"url": p.get("url"),
"baseToken": p.get("baseToken"),
"quoteToken": p.get("quoteToken"),
"priceUsd": p.get("priceUsd"),
"liquidityUsd": (p.get("liquidity") or {}).get("usd"),
"fdv": p.get("fdv"),
"marketCap": p.get("marketCap"),
"volume24h": (p.get("volume") or {}).get("h24"),
"buys24h": ((p.get("txns") or {}).get("h24") or {}).get("buys"),
"sells24h": ((p.get("txns") or {}).get("h24") or {}).get("sells"),
})
print_json({"provider": "dexscreener", "query": query, "pairs": out})
def dexscreener_token(chain: str, address: str):
url = f"https://api.dexscreener.com/tokens/v1/{urllib.parse.quote(chain)}/{urllib.parse.quote(address)}"
payload = fetch_json(url)
pairs = payload if isinstance(payload, list) else payload.get("pairs") or []
out = []
for p in pairs[:10]:
out.append({
"chainId": p.get("chainId"),
"dexId": p.get("dexId"),
"pairAddress": p.get("pairAddress"),
"baseToken": p.get("baseToken"),
"quoteToken": p.get("quoteToken"),
"priceUsd": p.get("priceUsd"),
"liquidityUsd": (p.get("liquidity") or {}).get("usd"),
"fdv": p.get("fdv"),
"marketCap": p.get("marketCap"),
"volume24h": (p.get("volume") or {}).get("h24"),
})
print_json({"provider": "dexscreener", "chain": chain, "address": address, "pairs": out})
def coingecko_search(query: str):
url = "https://api.coingecko.com/api/v3/search?query=" + urllib.parse.quote(query)
payload = fetch_json(url)
coins = payload.get("coins") or []
out = []
for c in coins[:10]:
out.append({
"id": c.get("id"),
"name": c.get("name"),
"symbol": c.get("symbol"),
"marketCapRank": c.get("market_cap_rank"),
"thumb": c.get("thumb"),
})
print_json({"provider": "coingecko", "query": query, "coins": out})
def coingecko_coin(coin_id: str):
params = urllib.parse.urlencode({
"localization": "false",
"tickers": "false",
"market_data": "true",
"community_data": "false",
"developer_data": "false",
"sparkline": "false",
})
url = f"https://api.coingecko.com/api/v3/coins/{urllib.parse.quote(coin_id)}?{params}"
payload = fetch_json(url)
md = payload.get("market_data") or {}
out = {
"provider": "coingecko",
"id": payload.get("id"),
"symbol": payload.get("symbol"),
"name": payload.get("name"),
"marketCapRank": payload.get("market_cap_rank"),
"currentPriceUsd": (md.get("current_price") or {}).get("usd"),
"marketCapUsd": (md.get("market_cap") or {}).get("usd"),
"fullyDilutedValuationUsd": (md.get("fully_diluted_valuation") or {}).get("usd"),
"totalVolumeUsd": (md.get("total_volume") or {}).get("usd"),
"priceChangePercentage24h": md.get("price_change_percentage_24h"),
"priceChangePercentage7d": md.get("price_change_percentage_7d"),
"priceChangePercentage30d": md.get("price_change_percentage_30d"),
"circulatingSupply": md.get("circulating_supply"),
"totalSupply": md.get("total_supply"),
"maxSupply": md.get("max_supply"),
"homepage": (payload.get("links") or {}).get("homepage", [None])[0],
}
print_json(out)
def birdeye_token(address: str):
api_key = os.getenv("BIRDEYE_API_KEY") or os.getenv("BIRDEYE_APIKEY")
if not api_key:
raise SystemExit("Birdeye requires BIRDEYE_API_KEY in the environment")
url = "https://public-api.birdeye.so/defi/token_overview?address=" + urllib.parse.quote(address)
payload = fetch_json(url, headers={
"x-api-key": api_key,
"x-chain": "solana",
})
print_json({"provider": "birdeye", "address": address, "data": payload.get("data")})
def build_parser():
parser = argparse.ArgumentParser(description="Coin Hunter market data probe")
sub = parser.add_subparsers(dest="command", required=True)
p = sub.add_parser("bybit-ticker", help="Fetch Bybit spot ticker")
p.add_argument("symbol")
p = sub.add_parser("bybit-klines", help="Fetch Bybit spot klines")
p.add_argument("symbol")
p.add_argument("--interval", default="60", help="Bybit interval, e.g. 1, 5, 15, 60, 240, D")
p.add_argument("--limit", type=int, default=10)
p = sub.add_parser("dex-search", help="Search DexScreener by query")
p.add_argument("query")
p = sub.add_parser("dex-token", help="Fetch DexScreener token pairs by chain/address")
p.add_argument("chain")
p.add_argument("address")
p = sub.add_parser("gecko-search", help="Search CoinGecko")
p.add_argument("query")
p = sub.add_parser("gecko-coin", help="Fetch CoinGecko coin by id")
p.add_argument("coin_id")
p = sub.add_parser("birdeye-token", help="Fetch Birdeye token overview (Solana)")
p.add_argument("address")
return parser
def main():
parser = build_parser()
args = parser.parse_args()
if args.command == "bybit-ticker":
bybit_ticker(args.symbol)
elif args.command == "bybit-klines":
bybit_klines(args.symbol, args.interval, args.limit)
elif args.command == "dex-search":
dexscreener_search(args.query)
elif args.command == "dex-token":
dexscreener_token(args.chain, args.address)
elif args.command == "gecko-search":
coingecko_search(args.query)
elif args.command == "gecko-coin":
coingecko_coin(args.coin_id)
elif args.command == "birdeye-token":
birdeye_token(args.address)
else:
parser.error("Unknown command")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,16 @@
"""Print CoinHunter runtime paths."""
from __future__ import annotations
import json
from ..runtime import get_runtime_paths
def main() -> int:
print(json.dumps(get_runtime_paths().as_dict(), ensure_ascii=False, indent=2))
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,28 @@
#!/usr/bin/env python3
"""Rotate external gate log using the user's logrotate config/state."""
import shutil
import subprocess
from ..runtime import ensure_runtime_dirs, get_runtime_paths
PATHS = get_runtime_paths()
STATE_DIR = PATHS.state_dir
LOGROTATE_STATUS = PATHS.logrotate_status
LOGROTATE_CONF = PATHS.logrotate_config
LOGS_DIR = PATHS.logs_dir
def main():
ensure_runtime_dirs(PATHS)
logrotate_bin = shutil.which("logrotate") or "/usr/sbin/logrotate"
cmd = [logrotate_bin, "-s", str(LOGROTATE_STATUS), str(LOGROTATE_CONF)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.stdout.strip():
print(result.stdout.strip())
if result.stderr.strip():
print(result.stderr.strip())
return result.returncode
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,66 +1,8 @@
"""Runtime diagnostics for CoinHunter CLI."""
"""Backward-compatible facade for doctor."""
from __future__ import annotations
import json
import os
import platform
import shutil
import sys
from .runtime import ensure_runtime_dirs, get_runtime_paths, load_env_file, resolve_hermes_executable
REQUIRED_ENV_VARS = ["BINANCE_API_KEY", "BINANCE_API_SECRET"]
def main() -> int:
paths = ensure_runtime_dirs(get_runtime_paths())
env_file = load_env_file(paths)
hermes_executable = resolve_hermes_executable(paths)
env_checks = {}
missing_env = []
for name in REQUIRED_ENV_VARS:
present = bool(os.getenv(name))
env_checks[name] = present
if not present:
missing_env.append(name)
file_checks = {
"env_file_exists": env_file.exists(),
"config_exists": paths.config_file.exists(),
"positions_exists": paths.positions_file.exists(),
"logrotate_config_exists": paths.logrotate_config.exists(),
}
dir_checks = {
"root_exists": paths.root.exists(),
"state_dir_exists": paths.state_dir.exists(),
"logs_dir_exists": paths.logs_dir.exists(),
"reviews_dir_exists": paths.reviews_dir.exists(),
"cache_dir_exists": paths.cache_dir.exists(),
}
command_checks = {
"hermes": bool(shutil.which("hermes") or paths.hermes_bin.exists()),
"logrotate": bool(shutil.which("logrotate") or shutil.which("/usr/sbin/logrotate")),
}
report = {
"ok": not missing_env,
"python": sys.version.split()[0],
"platform": platform.platform(),
"env_file": str(env_file),
"hermes_executable": hermes_executable,
"paths": paths.as_dict(),
"env_checks": env_checks,
"missing_env": missing_env,
"file_checks": file_checks,
"dir_checks": dir_checks,
"command_checks": command_checks,
}
print(json.dumps(report, ensure_ascii=False, indent=2))
return 0 if report["ok"] else 1
from .commands.doctor import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,82 +1,8 @@
#!/usr/bin/env python3
import fcntl
import json
import subprocess
import sys
from datetime import datetime, timezone
"""Backward-compatible facade for external_gate."""
from .runtime import ensure_runtime_dirs, get_runtime_paths, resolve_hermes_executable
PATHS = get_runtime_paths()
STATE_DIR = PATHS.state_dir
LOCK_FILE = PATHS.external_gate_lock
COINHUNTER_MODULE = [sys.executable, "-m", "coinhunter"]
TRADE_JOB_ID = "4e6593fff158"
def utc_now():
return datetime.now(timezone.utc).isoformat()
def log(message: str):
print(f"[{utc_now()}] {message}")
def run_cmd(args: list[str]) -> subprocess.CompletedProcess:
return subprocess.run(args, capture_output=True, text=True)
def parse_json_output(text: str) -> dict:
text = (text or "").strip()
if not text:
return {}
return json.loads(text)
def main():
ensure_runtime_dirs(PATHS)
with open(LOCK_FILE, "w", encoding="utf-8") as lockf:
try:
fcntl.flock(lockf.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
except BlockingIOError:
log("gate already running; skip")
return 0
precheck = run_cmd(COINHUNTER_MODULE + ["precheck"])
if precheck.returncode != 0:
log(f"precheck returned non-zero ({precheck.returncode}); stdout={precheck.stdout.strip()} stderr={precheck.stderr.strip()}")
return 1
try:
data = parse_json_output(precheck.stdout)
except Exception as e:
log(f"failed to parse precheck JSON: {e}; raw={precheck.stdout.strip()[:1000]}")
return 1
if not data.get("should_analyze"):
log("no trigger; skip model run")
return 0
if data.get("run_requested"):
log(f"trigger already queued at {data.get('run_requested_at')}; skip duplicate")
return 0
mark = run_cmd(COINHUNTER_MODULE + ["precheck", "--mark-run-requested", "external-gate queued cron run"])
if mark.returncode != 0:
log(f"failed to mark run requested; stdout={mark.stdout.strip()} stderr={mark.stderr.strip()}")
return 1
trigger = run_cmd([resolve_hermes_executable(PATHS), "cron", "run", TRADE_JOB_ID])
if trigger.returncode != 0:
log(f"failed to trigger trade cron job; stdout={trigger.stdout.strip()} stderr={trigger.stderr.strip()}")
return 1
reasons = ", ".join(data.get("reasons", [])) or "unknown"
log(f"queued trade job {TRADE_JOB_ID}; reasons={reasons}")
if trigger.stdout.strip():
log(trigger.stdout.strip())
return 0
from __future__ import annotations
from .commands.external_gate import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,65 +1,8 @@
#!/usr/bin/env python3
import json
from datetime import datetime, timezone
from pathlib import Path
"""Backward-compatible facade for init_user_state."""
from .runtime import ensure_runtime_dirs, get_runtime_paths
PATHS = get_runtime_paths()
ROOT = PATHS.root
CACHE_DIR = PATHS.cache_dir
def now_iso():
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def ensure_file(path: Path, payload: dict):
if path.exists():
return False
path.write_text(json.dumps(payload, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
return True
def main():
ensure_runtime_dirs(PATHS)
created = []
ts = now_iso()
templates = {
ROOT / "config.json": {
"default_exchange": "bybit",
"default_quote_currency": "USDT",
"timezone": "Asia/Shanghai",
"preferred_chains": ["solana", "base"],
"created_at": ts,
"updated_at": ts,
},
ROOT / "accounts.json": {
"accounts": []
},
ROOT / "positions.json": {
"positions": []
},
ROOT / "watchlist.json": {
"watchlist": []
},
ROOT / "notes.json": {
"notes": []
},
}
for path, payload in templates.items():
if ensure_file(path, payload):
created.append(str(path))
print(json.dumps({
"root": str(ROOT),
"created": created,
"cache_dir": str(CACHE_DIR),
}, ensure_ascii=False, indent=2))
from __future__ import annotations
from .commands.init_user_state import main
if __name__ == "__main__":
main()
raise SystemExit(main())

View File

@@ -1,243 +1,8 @@
#!/usr/bin/env python3
import argparse
import json
import os
import sys
import urllib.parse
import urllib.request
"""Backward-compatible facade for market_probe."""
DEFAULT_TIMEOUT = 20
def fetch_json(url, headers=None, timeout=DEFAULT_TIMEOUT):
merged_headers = {
"Accept": "application/json",
"User-Agent": "Mozilla/5.0 (compatible; OpenClaw Coin Hunter/1.0)",
}
if headers:
merged_headers.update(headers)
req = urllib.request.Request(url, headers=merged_headers)
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = resp.read()
return json.loads(data.decode("utf-8"))
def print_json(data):
print(json.dumps(data, ensure_ascii=False, indent=2))
def bybit_ticker(symbol: str):
url = (
"https://api.bybit.com/v5/market/tickers?category=spot&symbol="
+ urllib.parse.quote(symbol.upper())
)
payload = fetch_json(url)
items = payload.get("result", {}).get("list", [])
if not items:
raise SystemExit(f"No Bybit spot ticker found for {symbol}")
item = items[0]
out = {
"provider": "bybit",
"symbol": symbol.upper(),
"lastPrice": item.get("lastPrice"),
"price24hPcnt": item.get("price24hPcnt"),
"highPrice24h": item.get("highPrice24h"),
"lowPrice24h": item.get("lowPrice24h"),
"turnover24h": item.get("turnover24h"),
"volume24h": item.get("volume24h"),
"bid1Price": item.get("bid1Price"),
"ask1Price": item.get("ask1Price"),
}
print_json(out)
def bybit_klines(symbol: str, interval: str, limit: int):
params = urllib.parse.urlencode({
"category": "spot",
"symbol": symbol.upper(),
"interval": interval,
"limit": str(limit),
})
url = f"https://api.bybit.com/v5/market/kline?{params}"
payload = fetch_json(url)
rows = payload.get("result", {}).get("list", [])
out = {
"provider": "bybit",
"symbol": symbol.upper(),
"interval": interval,
"candles": [
{
"startTime": r[0],
"open": r[1],
"high": r[2],
"low": r[3],
"close": r[4],
"volume": r[5],
"turnover": r[6],
}
for r in rows
],
}
print_json(out)
def dexscreener_search(query: str):
url = "https://api.dexscreener.com/latest/dex/search/?q=" + urllib.parse.quote(query)
payload = fetch_json(url)
pairs = payload.get("pairs") or []
out = []
for p in pairs[:10]:
out.append({
"chainId": p.get("chainId"),
"dexId": p.get("dexId"),
"pairAddress": p.get("pairAddress"),
"url": p.get("url"),
"baseToken": p.get("baseToken"),
"quoteToken": p.get("quoteToken"),
"priceUsd": p.get("priceUsd"),
"liquidityUsd": (p.get("liquidity") or {}).get("usd"),
"fdv": p.get("fdv"),
"marketCap": p.get("marketCap"),
"volume24h": (p.get("volume") or {}).get("h24"),
"buys24h": ((p.get("txns") or {}).get("h24") or {}).get("buys"),
"sells24h": ((p.get("txns") or {}).get("h24") or {}).get("sells"),
})
print_json({"provider": "dexscreener", "query": query, "pairs": out})
def dexscreener_token(chain: str, address: str):
url = f"https://api.dexscreener.com/tokens/v1/{urllib.parse.quote(chain)}/{urllib.parse.quote(address)}"
payload = fetch_json(url)
pairs = payload if isinstance(payload, list) else payload.get("pairs") or []
out = []
for p in pairs[:10]:
out.append({
"chainId": p.get("chainId"),
"dexId": p.get("dexId"),
"pairAddress": p.get("pairAddress"),
"baseToken": p.get("baseToken"),
"quoteToken": p.get("quoteToken"),
"priceUsd": p.get("priceUsd"),
"liquidityUsd": (p.get("liquidity") or {}).get("usd"),
"fdv": p.get("fdv"),
"marketCap": p.get("marketCap"),
"volume24h": (p.get("volume") or {}).get("h24"),
})
print_json({"provider": "dexscreener", "chain": chain, "address": address, "pairs": out})
def coingecko_search(query: str):
url = "https://api.coingecko.com/api/v3/search?query=" + urllib.parse.quote(query)
payload = fetch_json(url)
coins = payload.get("coins") or []
out = []
for c in coins[:10]:
out.append({
"id": c.get("id"),
"name": c.get("name"),
"symbol": c.get("symbol"),
"marketCapRank": c.get("market_cap_rank"),
"thumb": c.get("thumb"),
})
print_json({"provider": "coingecko", "query": query, "coins": out})
def coingecko_coin(coin_id: str):
params = urllib.parse.urlencode({
"localization": "false",
"tickers": "false",
"market_data": "true",
"community_data": "false",
"developer_data": "false",
"sparkline": "false",
})
url = f"https://api.coingecko.com/api/v3/coins/{urllib.parse.quote(coin_id)}?{params}"
payload = fetch_json(url)
md = payload.get("market_data") or {}
out = {
"provider": "coingecko",
"id": payload.get("id"),
"symbol": payload.get("symbol"),
"name": payload.get("name"),
"marketCapRank": payload.get("market_cap_rank"),
"currentPriceUsd": (md.get("current_price") or {}).get("usd"),
"marketCapUsd": (md.get("market_cap") or {}).get("usd"),
"fullyDilutedValuationUsd": (md.get("fully_diluted_valuation") or {}).get("usd"),
"totalVolumeUsd": (md.get("total_volume") or {}).get("usd"),
"priceChangePercentage24h": md.get("price_change_percentage_24h"),
"priceChangePercentage7d": md.get("price_change_percentage_7d"),
"priceChangePercentage30d": md.get("price_change_percentage_30d"),
"circulatingSupply": md.get("circulating_supply"),
"totalSupply": md.get("total_supply"),
"maxSupply": md.get("max_supply"),
"homepage": (payload.get("links") or {}).get("homepage", [None])[0],
}
print_json(out)
def birdeye_token(address: str):
api_key = os.getenv("BIRDEYE_API_KEY") or os.getenv("BIRDEYE_APIKEY")
if not api_key:
raise SystemExit("Birdeye requires BIRDEYE_API_KEY in the environment")
url = "https://public-api.birdeye.so/defi/token_overview?address=" + urllib.parse.quote(address)
payload = fetch_json(url, headers={
"x-api-key": api_key,
"x-chain": "solana",
})
print_json({"provider": "birdeye", "address": address, "data": payload.get("data")})
def build_parser():
parser = argparse.ArgumentParser(description="Coin Hunter market data probe")
sub = parser.add_subparsers(dest="command", required=True)
p = sub.add_parser("bybit-ticker", help="Fetch Bybit spot ticker")
p.add_argument("symbol")
p = sub.add_parser("bybit-klines", help="Fetch Bybit spot klines")
p.add_argument("symbol")
p.add_argument("--interval", default="60", help="Bybit interval, e.g. 1, 5, 15, 60, 240, D")
p.add_argument("--limit", type=int, default=10)
p = sub.add_parser("dex-search", help="Search DexScreener by query")
p.add_argument("query")
p = sub.add_parser("dex-token", help="Fetch DexScreener token pairs by chain/address")
p.add_argument("chain")
p.add_argument("address")
p = sub.add_parser("gecko-search", help="Search CoinGecko")
p.add_argument("query")
p = sub.add_parser("gecko-coin", help="Fetch CoinGecko coin by id")
p.add_argument("coin_id")
p = sub.add_parser("birdeye-token", help="Fetch Birdeye token overview (Solana)")
p.add_argument("address")
return parser
def main():
parser = build_parser()
args = parser.parse_args()
if args.command == "bybit-ticker":
bybit_ticker(args.symbol)
elif args.command == "bybit-klines":
bybit_klines(args.symbol, args.interval, args.limit)
elif args.command == "dex-search":
dexscreener_search(args.query)
elif args.command == "dex-token":
dexscreener_token(args.chain, args.address)
elif args.command == "gecko-search":
coingecko_search(args.query)
elif args.command == "gecko-coin":
coingecko_coin(args.coin_id)
elif args.command == "birdeye-token":
birdeye_token(args.address)
else:
parser.error("Unknown command")
from __future__ import annotations
from .commands.market_probe import main
if __name__ == "__main__":
main()
raise SystemExit(main())

View File

@@ -1,16 +1,8 @@
"""Print CoinHunter runtime paths."""
"""Backward-compatible facade for paths."""
from __future__ import annotations
import json
from .runtime import get_runtime_paths
def main() -> int:
print(json.dumps(get_runtime_paths().as_dict(), ensure_ascii=False, indent=2))
return 0
from .commands.paths import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -1,28 +1,8 @@
#!/usr/bin/env python3
"""Rotate external gate log using the user's logrotate config/state."""
import shutil
import subprocess
"""Backward-compatible facade for rotate_external_gate_log."""
from .runtime import ensure_runtime_dirs, get_runtime_paths
PATHS = get_runtime_paths()
STATE_DIR = PATHS.state_dir
LOGROTATE_STATUS = PATHS.logrotate_status
LOGROTATE_CONF = PATHS.logrotate_config
LOGS_DIR = PATHS.logs_dir
def main():
ensure_runtime_dirs(PATHS)
logrotate_bin = shutil.which("logrotate") or "/usr/sbin/logrotate"
cmd = [logrotate_bin, "-s", str(LOGROTATE_STATUS), str(LOGROTATE_CONF)]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.stdout.strip():
print(result.stdout.strip())
if result.stderr.strip():
print(result.stderr.strip())
return result.returncode
from __future__ import annotations
from .commands.rotate_external_gate_log import main
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,105 @@
"""Adaptive trigger profile builder for precheck."""
from __future__ import annotations
from .data_utils import to_float
from .precheck_constants import (
BASE_CANDIDATE_SCORE_TRIGGER_RATIO,
BASE_COOLDOWN_MINUTES,
BASE_FORCE_ANALYSIS_AFTER_MINUTES,
BASE_PNL_TRIGGER_PCT,
BASE_PORTFOLIO_MOVE_TRIGGER_PCT,
BASE_PRICE_MOVE_TRIGGER_PCT,
MIN_ACTIONABLE_USDT,
MIN_REAL_POSITION_VALUE_USDT,
)
def build_adaptive_profile(snapshot: dict):
portfolio_value = snapshot.get("portfolio_value_usdt", 0)
free_usdt = snapshot.get("free_usdt", 0)
session = snapshot.get("session")
market = snapshot.get("market_regime", {})
volatility_score = to_float(market.get("volatility_score"), 0)
leader_score = to_float(market.get("leader_score"), 0)
actionable_positions = int(snapshot.get("actionable_positions") or 0)
largest_position_value = to_float(snapshot.get("largest_position_value_usdt"), 0)
capital_band = "micro" if portfolio_value < 25 else "small" if portfolio_value < 100 else "normal"
session_mode = "quiet" if session in {"overnight", "asia-morning"} else "active"
volatility_mode = "high" if volatility_score >= 2.5 or leader_score >= 120 else "normal"
dust_mode = free_usdt < MIN_ACTIONABLE_USDT and largest_position_value < MIN_REAL_POSITION_VALUE_USDT
price_trigger = BASE_PRICE_MOVE_TRIGGER_PCT
pnl_trigger = BASE_PN_L_TRIGGER_PCT
portfolio_trigger = BASE_PORTFOLIO_MOVE_TRIGGER_PCT
candidate_ratio = BASE_CANDIDATE_SCORE_TRIGGER_RATIO
force_minutes = BASE_FORCE_ANALYSIS_AFTER_MINUTES
cooldown_minutes = BASE_COOLDOWN_MINUTES
soft_score_threshold = 2.0
if capital_band == "micro":
price_trigger += 0.02
pnl_trigger += 0.03
portfolio_trigger += 0.04
candidate_ratio += 0.25
force_minutes += 180
cooldown_minutes += 30
soft_score_threshold += 1.0
elif capital_band == "small":
price_trigger += 0.01
pnl_trigger += 0.01
portfolio_trigger += 0.01
candidate_ratio += 0.1
force_minutes += 60
cooldown_minutes += 10
soft_score_threshold += 0.5
if session_mode == "quiet":
price_trigger += 0.01
pnl_trigger += 0.01
portfolio_trigger += 0.01
candidate_ratio += 0.05
soft_score_threshold += 0.5
else:
force_minutes = max(120, force_minutes - 30)
if volatility_mode == "high":
price_trigger = max(0.02, price_trigger - 0.01)
pnl_trigger = max(0.025, pnl_trigger - 0.005)
portfolio_trigger = max(0.025, portfolio_trigger - 0.005)
candidate_ratio = max(1.1, candidate_ratio - 0.1)
cooldown_minutes = max(20, cooldown_minutes - 10)
soft_score_threshold = max(1.0, soft_score_threshold - 0.5)
if dust_mode:
candidate_ratio += 0.3
force_minutes += 180
cooldown_minutes += 30
soft_score_threshold += 1.5
return {
"capital_band": capital_band,
"session_mode": session_mode,
"volatility_mode": volatility_mode,
"dust_mode": dust_mode,
"price_move_trigger_pct": round(price_trigger, 4),
"pnl_trigger_pct": round(pnl_trigger, 4),
"portfolio_move_trigger_pct": round(portfolio_trigger, 4),
"candidate_score_trigger_ratio": round(candidate_ratio, 4),
"force_analysis_after_minutes": int(force_minutes),
"cooldown_minutes": int(cooldown_minutes),
"soft_score_threshold": round(soft_score_threshold, 2),
"new_entries_allowed": free_usdt >= MIN_ACTIONABLE_USDT and not dust_mode,
"switching_allowed": actionable_positions > 0 or portfolio_value >= 25,
}
def _candidate_weight(snapshot: dict, profile: dict) -> float:
if not profile.get("new_entries_allowed"):
return 0.5
if profile.get("volatility_mode") == "high":
return 1.5
if snapshot.get("session") in {"europe-open", "us-session"}:
return 1.25
return 1.0

View File

@@ -0,0 +1,71 @@
"""Candidate coin scoring and selection for precheck."""
from __future__ import annotations
import re
from .data_utils import to_float
from .precheck_constants import BLACKLIST, MAX_PRICE_CAP, MIN_CHANGE_PCT, TOP_CANDIDATES
def _liquidity_score(volume: float) -> float:
return min(1.0, max(0.0, volume / 50_000_000))
def _breakout_score(price: float, avg_price: float | None) -> float:
if not avg_price or avg_price <= 0:
return 0.0
return (price - avg_price) / avg_price
def top_candidates_from_tickers(tickers: dict):
candidates = []
for symbol, ticker in tickers.items():
if not symbol.endswith("/USDT"):
continue
base = symbol.replace("/USDT", "")
if base in BLACKLIST:
continue
if not re.fullmatch(r"[A-Z0-9]{2,20}", base):
continue
price = to_float(ticker.get("last"))
change_pct = to_float(ticker.get("percentage"))
volume = to_float(ticker.get("quoteVolume"))
high = to_float(ticker.get("high"))
low = to_float(ticker.get("low"))
avg_price = to_float(ticker.get("average"), None)
if price <= 0:
continue
if MAX_PRICE_CAP is not None and price > MAX_PRICE_CAP:
continue
if volume < 500_000:
continue
if change_pct < MIN_CHANGE_PCT:
continue
momentum = change_pct / 10.0
liquidity = _liquidity_score(volume)
breakout = _breakout_score(price, avg_price)
score = round(momentum * 0.5 + liquidity * 0.3 + breakout * 0.2, 4)
band = "major" if price >= 10 else "mid" if price >= 1 else "meme"
distance_from_high = (high - price) / max(high, 1e-9) if high else None
candidates.append({
"symbol": symbol,
"base": base,
"price": round(price, 8),
"change_24h_pct": round(change_pct, 2),
"volume_24h": round(volume, 2),
"breakout_pct": round(breakout * 100, 2),
"high_24h": round(high, 8) if high else None,
"low_24h": round(low, 8) if low else None,
"distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None,
"score": score,
"band": band,
})
candidates.sort(key=lambda x: x["score"], reverse=True)
global_top = candidates[:TOP_CANDIDATES]
layers = {"major": [], "mid": [], "meme": []}
for c in candidates:
layers[c["band"]].append(c)
for k in layers:
layers[k] = layers[k][:5]
return global_top, layers

View File

@@ -0,0 +1,39 @@
"""Generic data helpers for precheck."""
from __future__ import annotations
import hashlib
import json
from pathlib import Path
def load_json(path: Path, default):
if not path.exists():
return default
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return default
def stable_hash(data) -> str:
payload = json.dumps(data, sort_keys=True, ensure_ascii=False, separators=(",", ":"))
return hashlib.sha1(payload.encode("utf-8")).hexdigest()
def to_float(value, default=0.0):
try:
if value is None:
return default
return float(value)
except Exception:
return default
def norm_symbol(symbol: str) -> str:
s = symbol.upper().replace("-", "").replace("_", "")
if "/" in s:
return s
if s.endswith("USDT"):
return s[:-4] + "/USDT"
return s

View File

@@ -0,0 +1,136 @@
"""Market data fetching and metric computation for precheck."""
from __future__ import annotations
import os
import ccxt
from .data_utils import norm_symbol, to_float
from .precheck_constants import BLACKLIST, MAX_PRICE_CAP, MIN_CHANGE_PCT
from .time_utils import utc_now
def get_exchange():
from ..runtime import load_env_file
from .precheck_constants import ENV_FILE
load_env_file(ENV_FILE)
api_key = os.getenv("BINANCE_API_KEY")
secret = os.getenv("BINANCE_API_SECRET")
if not api_key or not secret:
raise RuntimeError("Missing BINANCE_API_KEY or BINANCE_API_SECRET in ~/.hermes/.env")
ex = ccxt.binance({
"apiKey": api_key,
"secret": secret,
"options": {"defaultType": "spot"},
"enableRateLimit": True,
})
ex.load_markets()
return ex
def fetch_ohlcv_batch(ex, symbols: set, timeframe: str, limit: int):
results = {}
for sym in sorted(symbols):
try:
ohlcv = ex.fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
if ohlcv and len(ohlcv) >= 2:
results[sym] = ohlcv
except Exception:
pass
return results
def compute_ohlcv_metrics(ohlcv_1h, ohlcv_4h, current_price, volume_24h=None):
metrics = {}
if ohlcv_1h and len(ohlcv_1h) >= 2:
closes = [c[4] for c in ohlcv_1h]
volumes = [c[5] for c in ohlcv_1h]
metrics["change_1h_pct"] = round((closes[-1] - closes[-2]) / closes[-2] * 100, 2) if closes[-2] != 0 else None
if len(closes) >= 5:
metrics["change_4h_pct"] = round((closes[-1] - closes[-5]) / closes[-5] * 100, 2) if closes[-5] != 0 else None
recent_vol = sum(volumes[-4:]) / 4 if len(volumes) >= 4 else None
metrics["volume_1h_avg"] = round(recent_vol, 2) if recent_vol else None
highs = [c[2] for c in ohlcv_1h[-4:]]
lows = [c[3] for c in ohlcv_1h[-4:]]
metrics["high_4h"] = round(max(highs), 8) if highs else None
metrics["low_4h"] = round(min(lows), 8) if lows else None
if ohlcv_4h and len(ohlcv_4h) >= 2:
closes_4h = [c[4] for c in ohlcv_4h]
volumes_4h = [c[5] for c in ohlcv_4h]
metrics["change_4h_pct_from_4h"] = round((closes_4h[-1] - closes_4h[-2]) / closes_4h[-2] * 100, 2) if closes_4h[-2] != 0 else None
recent_vol_4h = sum(volumes_4h[-2:]) / 2 if len(volumes_4h) >= 2 else None
metrics["volume_4h_avg"] = round(recent_vol_4h, 2) if recent_vol_4h else None
highs_4h = [c[2] for c in ohlcv_4h]
lows_4h = [c[3] for c in ohlcv_4h]
metrics["high_24h_calc"] = round(max(highs_4h), 8) if highs_4h else None
metrics["low_24h_calc"] = round(min(lows_4h), 8) if lows_4h else None
if highs_4h and lows_4h:
avg_price = sum(closes_4h) / len(closes_4h)
metrics["volatility_4h_pct"] = round((max(highs_4h) - min(lows_4h)) / avg_price * 100, 2)
if current_price:
if metrics.get("high_4h"):
metrics["distance_from_4h_high_pct"] = round((metrics["high_4h"] - current_price) / metrics["high_4h"] * 100, 2)
if metrics.get("low_4h"):
metrics["distance_from_4h_low_pct"] = round((current_price - metrics["low_4h"]) / metrics["low_4h"] * 100, 2)
if metrics.get("high_24h_calc"):
metrics["distance_from_24h_high_pct"] = round((metrics["high_24h_calc"] - current_price) / metrics["high_24h_calc"] * 100, 2)
if metrics.get("low_24h_calc"):
metrics["distance_from_24h_low_pct"] = round((current_price - metrics["low_24h_calc"]) / metrics["low_24h_calc"] * 100, 2)
if volume_24h and volume_24h > 0 and metrics.get("volume_1h_avg"):
daily_avg_1h = volume_24h / 24
metrics["volume_1h_multiple"] = round(metrics["volume_1h_avg"] / daily_avg_1h, 2)
if volume_24h and volume_24h > 0 and metrics.get("volume_4h_avg"):
daily_avg_4h = volume_24h / 6
metrics["volume_4h_multiple"] = round(metrics["volume_4h_avg"] / daily_avg_4h, 2)
return metrics
def enrich_candidates_and_positions(global_candidates, candidate_layers, positions_view, tickers, ex):
symbols = set()
for c in global_candidates:
symbols.add(c["symbol"])
for p in positions_view:
sym = p.get("symbol")
if sym:
sym_ccxt = norm_symbol(sym)
symbols.add(sym_ccxt)
ohlcv_1h = fetch_ohlcv_batch(ex, symbols, "1h", 24)
ohlcv_4h = fetch_ohlcv_batch(ex, symbols, "4h", 12)
def _apply(target_list):
for item in target_list:
sym = item.get("symbol")
if not sym:
continue
sym_ccxt = norm_symbol(sym)
v24h = to_float(tickers.get(sym_ccxt, {}).get("quoteVolume"))
metrics = compute_ohlcv_metrics(
ohlcv_1h.get(sym_ccxt),
ohlcv_4h.get(sym_ccxt),
item.get("price") or item.get("last_price"),
volume_24h=v24h,
)
item["metrics"] = metrics
_apply(global_candidates)
for band_list in candidate_layers.values():
_apply(band_list)
_apply(positions_view)
return global_candidates, candidate_layers, positions_view
def regime_from_pct(pct: float | None) -> str:
if pct is None:
return "unknown"
if pct >= 2.0:
return "bullish"
if pct <= -2.0:
return "bearish"
return "neutral"

View File

@@ -2,16 +2,13 @@
from __future__ import annotations
from . import precheck_core
def analyze_trigger(snapshot: dict, state: dict) -> dict:
return precheck_core.analyze_trigger(snapshot, state)
from .time_utils import utc_iso
from .trigger_analyzer import analyze_trigger
def build_failure_payload(exc: Exception) -> dict:
return {
"generated_at": precheck_core.utc_iso(),
"generated_at": utc_iso(),
"status": "deep_analysis_required",
"should_analyze": True,
"pending_trigger": True,
@@ -21,5 +18,5 @@ def build_failure_payload(exc: Exception) -> dict:
"soft_reasons": [],
"soft_score": 0,
"details": [str(exc)],
"compact_summary": f"预检查失败,转入深度分析底: {exc}",
"compact_summary": f"预检查失败,转入深度分析底: {exc}",
}

View File

@@ -0,0 +1,31 @@
"""Precheck constants and runtime paths."""
from __future__ import annotations
from ..runtime import get_runtime_paths
PATHS = get_runtime_paths()
BASE_DIR = PATHS.root
STATE_DIR = PATHS.state_dir
STATE_FILE = PATHS.precheck_state_file
POSITIONS_FILE = PATHS.positions_file
CONFIG_FILE = PATHS.config_file
ENV_FILE = PATHS.env_file
BASE_PRICE_MOVE_TRIGGER_PCT = 0.025
BASE_PNL_TRIGGER_PCT = 0.03
BASE_PORTFOLIO_MOVE_TRIGGER_PCT = 0.03
BASE_CANDIDATE_SCORE_TRIGGER_RATIO = 1.15
BASE_FORCE_ANALYSIS_AFTER_MINUTES = 180
BASE_COOLDOWN_MINUTES = 45
TOP_CANDIDATES = 10
MIN_ACTIONABLE_USDT = 12.0
MIN_REAL_POSITION_VALUE_USDT = 8.0
BLACKLIST = {"USDC", "BUSD", "TUSD", "FDUSD", "USTC", "PAXG"}
HARD_STOP_PCT = -0.08
HARD_MOON_PCT = 0.25
MIN_CHANGE_PCT = 1.0
MAX_PRICE_CAP = None
HARD_REASON_DEDUP_MINUTES = 15
MAX_PENDING_TRIGGER_MINUTES = 30
MAX_RUN_REQUEST_MINUTES = 20

View File

@@ -1,900 +1,99 @@
"""Service-owned precheck core logic.
"""Backward-compatible facade for precheck internals.
This module holds the reusable implementation. Root-level ``coinhunter.precheck``
is intentionally kept as a compatibility facade for older imports and direct
module execution.
The reusable implementation has been split into smaller modules:
- precheck_constants : paths and thresholds
- time_utils : UTC/local time helpers
- data_utils : json, hash, float, symbol normalization
- state_manager : load/save/sanitize state
- market_data : exchange, ohlcv, metrics
- candidate_scoring : top candidate selection
- snapshot_builder : build_snapshot
- adaptive_profile : trigger profile builder
- trigger_analyzer : analyze_trigger
Keep this module importable so older entrypoints continue to work.
"""
from __future__ import annotations
import hashlib
import json
import os
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from zoneinfo import ZoneInfo
import ccxt
from ..runtime import get_runtime_paths, load_env_file
PATHS = get_runtime_paths()
BASE_DIR = PATHS.root
STATE_DIR = PATHS.state_dir
STATE_FILE = PATHS.precheck_state_file
POSITIONS_FILE = PATHS.positions_file
CONFIG_FILE = PATHS.config_file
ENV_FILE = PATHS.env_file
BASE_PRICE_MOVE_TRIGGER_PCT = 0.025
BASE_PNL_TRIGGER_PCT = 0.03
BASE_PORTFOLIO_MOVE_TRIGGER_PCT = 0.03
BASE_CANDIDATE_SCORE_TRIGGER_RATIO = 1.15
BASE_FORCE_ANALYSIS_AFTER_MINUTES = 180
BASE_COOLDOWN_MINUTES = 45
TOP_CANDIDATES = 10
MIN_ACTIONABLE_USDT = 12.0
MIN_REAL_POSITION_VALUE_USDT = 8.0
BLACKLIST = {"USDC", "BUSD", "TUSD", "FDUSD", "USTC", "PAXG"}
HARD_STOP_PCT = -0.08
HARD_MOON_PCT = 0.25
MIN_CHANGE_PCT = 1.0
MAX_PRICE_CAP = None
HARD_REASON_DEDUP_MINUTES = 15
MAX_PENDING_TRIGGER_MINUTES = 30
MAX_RUN_REQUEST_MINUTES = 20
def utc_now():
return datetime.now(timezone.utc)
def utc_iso():
return utc_now().isoformat()
def parse_ts(value: str | None):
if not value:
return None
try:
ts = datetime.fromisoformat(value)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
return ts
except Exception:
return None
def load_json(path: Path, default):
if not path.exists():
return default
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return default
def load_env():
load_env_file(PATHS)
def load_positions():
return load_json(POSITIONS_FILE, {}).get("positions", [])
def load_state():
return load_json(STATE_FILE, {})
def load_config():
return load_json(CONFIG_FILE, {})
def clear_run_request_fields(state: dict):
state.pop("run_requested_at", None)
state.pop("run_request_note", None)
def sanitize_state_for_stale_triggers(state: dict):
sanitized = dict(state)
notes = []
now = utc_now()
run_requested_at = parse_ts(sanitized.get("run_requested_at"))
last_deep_analysis_at = parse_ts(sanitized.get("last_deep_analysis_at"))
last_triggered_at = parse_ts(sanitized.get("last_triggered_at"))
pending_trigger = bool(sanitized.get("pending_trigger"))
if run_requested_at and last_deep_analysis_at and last_deep_analysis_at >= run_requested_at:
clear_run_request_fields(sanitized)
if pending_trigger and (not last_triggered_at or last_deep_analysis_at >= last_triggered_at):
sanitized["pending_trigger"] = False
sanitized["pending_reasons"] = []
sanitized["last_ack_note"] = (
f"auto-cleared completed trigger at {utc_iso()} because last_deep_analysis_at >= run_requested_at"
)
pending_trigger = False
notes.append(
f"自动清理已完成的 run_requested 标记:最近深度分析时间 {last_deep_analysis_at.isoformat()} >= 请求时间 {run_requested_at.isoformat()}"
)
run_requested_at = None
if run_requested_at and now - run_requested_at > timedelta(minutes=MAX_RUN_REQUEST_MINUTES):
clear_run_request_fields(sanitized)
notes.append(
f"自动清理超时 run_requested 标记:已等待 {(now - run_requested_at).total_seconds() / 60:.1f} 分钟,超过 {MAX_RUN_REQUEST_MINUTES} 分钟"
)
run_requested_at = None
pending_anchor = run_requested_at or last_triggered_at or last_deep_analysis_at
if pending_trigger and pending_anchor and now - pending_anchor > timedelta(minutes=MAX_PENDING_TRIGGER_MINUTES):
sanitized["pending_trigger"] = False
sanitized["pending_reasons"] = []
sanitized["last_ack_note"] = (
f"auto-recovered stale pending trigger at {utc_iso()} after waiting "
f"{(now - pending_anchor).total_seconds() / 60:.1f} minutes"
)
notes.append(
f"自动解除 pending_trigger触发状态已悬挂 {(now - pending_anchor).total_seconds() / 60:.1f} 分钟,超过 {MAX_PENDING_TRIGGER_MINUTES} 分钟"
)
sanitized["_stale_recovery_notes"] = notes
return sanitized
def save_state(state: dict):
STATE_DIR.mkdir(parents=True, exist_ok=True)
state_to_save = dict(state)
state_to_save.pop("_stale_recovery_notes", None)
STATE_FILE.write_text(json.dumps(state_to_save, indent=2, ensure_ascii=False), encoding="utf-8")
def stable_hash(data) -> str:
payload = json.dumps(data, sort_keys=True, ensure_ascii=False, separators=(",", ":"))
return hashlib.sha1(payload.encode("utf-8")).hexdigest()
def get_exchange():
load_env()
api_key = os.getenv("BINANCE_API_KEY")
secret = os.getenv("BINANCE_API_SECRET")
if not api_key or not secret:
raise RuntimeError("Missing BINANCE_API_KEY or BINANCE_API_SECRET in ~/.hermes/.env")
ex = ccxt.binance({
"apiKey": api_key,
"secret": secret,
"options": {"defaultType": "spot"},
"enableRateLimit": True,
})
ex.load_markets()
return ex
def fetch_ohlcv_batch(ex, symbols: set, timeframe: str, limit: int):
results = {}
for sym in sorted(symbols):
try:
ohlcv = ex.fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
if ohlcv and len(ohlcv) >= 2:
results[sym] = ohlcv
except Exception:
pass
return results
def compute_ohlcv_metrics(ohlcv_1h, ohlcv_4h, current_price, volume_24h=None):
metrics = {}
if ohlcv_1h and len(ohlcv_1h) >= 2:
closes = [c[4] for c in ohlcv_1h]
volumes = [c[5] for c in ohlcv_1h]
metrics["change_1h_pct"] = round((closes[-1] - closes[-2]) / closes[-2] * 100, 2) if closes[-2] != 0 else None
if len(closes) >= 5:
metrics["change_4h_pct"] = round((closes[-1] - closes[-5]) / closes[-5] * 100, 2) if closes[-5] != 0 else None
recent_vol = sum(volumes[-4:]) / 4 if len(volumes) >= 4 else None
metrics["volume_1h_avg"] = round(recent_vol, 2) if recent_vol else None
highs = [c[2] for c in ohlcv_1h[-4:]]
lows = [c[3] for c in ohlcv_1h[-4:]]
metrics["high_4h"] = round(max(highs), 8) if highs else None
metrics["low_4h"] = round(min(lows), 8) if lows else None
if ohlcv_4h and len(ohlcv_4h) >= 2:
closes_4h = [c[4] for c in ohlcv_4h]
volumes_4h = [c[5] for c in ohlcv_4h]
metrics["change_4h_pct_from_4h"] = round((closes_4h[-1] - closes_4h[-2]) / closes_4h[-2] * 100, 2) if closes_4h[-2] != 0 else None
recent_vol_4h = sum(volumes_4h[-2:]) / 2 if len(volumes_4h) >= 2 else None
metrics["volume_4h_avg"] = round(recent_vol_4h, 2) if recent_vol_4h else None
highs_4h = [c[2] for c in ohlcv_4h]
lows_4h = [c[3] for c in ohlcv_4h]
metrics["high_24h_calc"] = round(max(highs_4h), 8) if highs_4h else None
metrics["low_24h_calc"] = round(min(lows_4h), 8) if lows_4h else None
if highs_4h and lows_4h:
avg_price = sum(closes_4h) / len(closes_4h)
metrics["volatility_4h_pct"] = round((max(highs_4h) - min(lows_4h)) / avg_price * 100, 2)
if current_price:
if metrics.get("high_4h"):
metrics["distance_from_4h_high_pct"] = round((metrics["high_4h"] - current_price) / metrics["high_4h"] * 100, 2)
if metrics.get("low_4h"):
metrics["distance_from_4h_low_pct"] = round((current_price - metrics["low_4h"]) / metrics["low_4h"] * 100, 2)
if metrics.get("high_24h_calc"):
metrics["distance_from_24h_high_pct"] = round((metrics["high_24h_calc"] - current_price) / metrics["high_24h_calc"] * 100, 2)
if metrics.get("low_24h_calc"):
metrics["distance_from_24h_low_pct"] = round((current_price - metrics["low_24h_calc"]) / metrics["low_24h_calc"] * 100, 2)
if volume_24h and volume_24h > 0 and metrics.get("volume_1h_avg"):
daily_avg_1h = volume_24h / 24
metrics["volume_1h_multiple"] = round(metrics["volume_1h_avg"] / daily_avg_1h, 2)
if volume_24h and volume_24h > 0 and metrics.get("volume_4h_avg"):
daily_avg_4h = volume_24h / 6
metrics["volume_4h_multiple"] = round(metrics["volume_4h_avg"] / daily_avg_4h, 2)
return metrics
def enrich_candidates_and_positions(global_candidates, candidate_layers, positions_view, tickers, ex):
symbols = set()
for c in global_candidates:
symbols.add(c["symbol"])
for p in positions_view:
sym = p.get("symbol")
if sym:
sym_ccxt = norm_symbol(sym)
symbols.add(sym_ccxt)
ohlcv_1h = fetch_ohlcv_batch(ex, symbols, "1h", 24)
ohlcv_4h = fetch_ohlcv_batch(ex, symbols, "4h", 12)
def _apply(target_list):
for item in target_list:
sym = item.get("symbol")
if not sym:
continue
sym_ccxt = norm_symbol(sym)
v24h = to_float(tickers.get(sym_ccxt, {}).get("quoteVolume"))
metrics = compute_ohlcv_metrics(
ohlcv_1h.get(sym_ccxt),
ohlcv_4h.get(sym_ccxt),
item.get("price") or item.get("last_price"),
volume_24h=v24h,
)
item["metrics"] = metrics
_apply(global_candidates)
for band_list in candidate_layers.values():
_apply(band_list)
_apply(positions_view)
return global_candidates, candidate_layers, positions_view
def regime_from_pct(pct: float | None) -> str:
if pct is None:
return "unknown"
if pct >= 2.0:
return "bullish"
if pct <= -2.0:
return "bearish"
return "neutral"
def to_float(value, default=0.0):
try:
if value is None:
return default
return float(value)
except Exception:
return default
def norm_symbol(symbol: str) -> str:
s = symbol.upper().replace("-", "").replace("_", "")
if "/" in s:
return s
if s.endswith("USDT"):
return s[:-4] + "/USDT"
return s
def get_local_now(config: dict):
tz_name = config.get("timezone") or "Asia/Shanghai"
try:
tz = ZoneInfo(tz_name)
except Exception:
tz = ZoneInfo("Asia/Shanghai")
tz_name = "Asia/Shanghai"
return utc_now().astimezone(tz), tz_name
def session_label(local_dt: datetime) -> str:
hour = local_dt.hour
if 0 <= hour < 7:
return "overnight"
if 7 <= hour < 12:
return "asia-morning"
if 12 <= hour < 17:
return "asia-afternoon"
if 17 <= hour < 21:
return "europe-open"
return "us-session"
def _liquidity_score(volume: float) -> float:
return min(1.0, max(0.0, volume / 50_000_000))
def _breakout_score(price: float, avg_price: float | None) -> float:
if not avg_price or avg_price <= 0:
return 0.0
return (price - avg_price) / avg_price
def top_candidates_from_tickers(tickers: dict):
candidates = []
for symbol, ticker in tickers.items():
if not symbol.endswith("/USDT"):
continue
base = symbol.replace("/USDT", "")
if base in BLACKLIST:
continue
if not re.fullmatch(r"[A-Z0-9]{2,20}", base):
continue
price = to_float(ticker.get("last"))
change_pct = to_float(ticker.get("percentage"))
volume = to_float(ticker.get("quoteVolume"))
high = to_float(ticker.get("high"))
low = to_float(ticker.get("low"))
avg_price = to_float(ticker.get("average"), None)
if price <= 0:
continue
if MAX_PRICE_CAP is not None and price > MAX_PRICE_CAP:
continue
if volume < 500_000:
continue
if change_pct < MIN_CHANGE_PCT:
continue
momentum = change_pct / 10.0
liquidity = _liquidity_score(volume)
breakout = _breakout_score(price, avg_price)
score = round(momentum * 0.5 + liquidity * 0.3 + breakout * 0.2, 4)
band = "major" if price >= 10 else "mid" if price >= 1 else "meme"
distance_from_high = (high - price) / max(high, 1e-9) if high else None
candidates.append({
"symbol": symbol,
"base": base,
"price": round(price, 8),
"change_24h_pct": round(change_pct, 2),
"volume_24h": round(volume, 2),
"breakout_pct": round(breakout * 100, 2),
"high_24h": round(high, 8) if high else None,
"low_24h": round(low, 8) if low else None,
"distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None,
"score": score,
"band": band,
})
candidates.sort(key=lambda x: x["score"], reverse=True)
global_top = candidates[:TOP_CANDIDATES]
layers = {"major": [], "mid": [], "meme": []}
for c in candidates:
layers[c["band"]].append(c)
for k in layers:
layers[k] = layers[k][:5]
return global_top, layers
def build_snapshot():
config = load_config()
local_dt, tz_name = get_local_now(config)
ex = get_exchange()
positions = load_positions()
tickers = ex.fetch_tickers()
balances = ex.fetch_balance()["free"]
free_usdt = to_float(balances.get("USDT"))
positions_view = []
total_position_value = 0.0
largest_position_value = 0.0
actionable_positions = 0
for pos in positions:
symbol = pos.get("symbol") or ""
sym_ccxt = norm_symbol(symbol)
ticker = tickers.get(sym_ccxt, {})
last = to_float(ticker.get("last"), None)
qty = to_float(pos.get("quantity"))
avg_cost = to_float(pos.get("avg_cost"), None)
value = round(qty * last, 4) if last is not None else None
pnl_pct = round((last - avg_cost) / avg_cost, 4) if last is not None and avg_cost else None
high = to_float(ticker.get("high"))
low = to_float(ticker.get("low"))
distance_from_high = (high - last) / max(high, 1e-9) if high and last else None
if value is not None:
total_position_value += value
largest_position_value = max(largest_position_value, value)
if value >= MIN_REAL_POSITION_VALUE_USDT:
actionable_positions += 1
positions_view.append({
"symbol": symbol,
"base_asset": pos.get("base_asset"),
"quantity": qty,
"avg_cost": avg_cost,
"last_price": last,
"market_value_usdt": value,
"pnl_pct": pnl_pct,
"high_24h": round(high, 8) if high else None,
"low_24h": round(low, 8) if low else None,
"distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None,
})
btc_pct = to_float((tickers.get("BTC/USDT") or {}).get("percentage"), None)
eth_pct = to_float((tickers.get("ETH/USDT") or {}).get("percentage"), None)
global_candidates, candidate_layers = top_candidates_from_tickers(tickers)
global_candidates, candidate_layers, positions_view = enrich_candidates_and_positions(
global_candidates, candidate_layers, positions_view, tickers, ex
)
leader_score = global_candidates[0]["score"] if global_candidates else 0.0
portfolio_value = round(free_usdt + total_position_value, 4)
volatility_score = round(max(abs(to_float(btc_pct, 0)), abs(to_float(eth_pct, 0))), 2)
position_structure = [
{
"symbol": p.get("symbol"),
"base_asset": p.get("base_asset"),
"quantity": round(to_float(p.get("quantity"), 0), 10),
"avg_cost": to_float(p.get("avg_cost"), None),
}
for p in positions_view
]
snapshot = {
"generated_at": utc_iso(),
"timezone": tz_name,
"local_time": local_dt.isoformat(),
"session": session_label(local_dt),
"free_usdt": round(free_usdt, 4),
"portfolio_value_usdt": portfolio_value,
"largest_position_value_usdt": round(largest_position_value, 4),
"actionable_positions": actionable_positions,
"positions": positions_view,
"positions_hash": stable_hash(position_structure),
"top_candidates": global_candidates,
"top_candidates_layers": candidate_layers,
"candidates_hash": stable_hash({"global": global_candidates, "layers": candidate_layers}),
"market_regime": {
"btc_24h_pct": round(btc_pct, 2) if btc_pct is not None else None,
"btc_regime": regime_from_pct(btc_pct),
"eth_24h_pct": round(eth_pct, 2) if eth_pct is not None else None,
"eth_regime": regime_from_pct(eth_pct),
"volatility_score": volatility_score,
"leader_score": round(leader_score, 4),
},
}
snapshot["snapshot_hash"] = stable_hash({
"portfolio_value_usdt": snapshot["portfolio_value_usdt"],
"positions_hash": snapshot["positions_hash"],
"candidates_hash": snapshot["candidates_hash"],
"market_regime": snapshot["market_regime"],
"session": snapshot["session"],
})
return snapshot
def build_adaptive_profile(snapshot: dict):
portfolio_value = snapshot.get("portfolio_value_usdt", 0)
free_usdt = snapshot.get("free_usdt", 0)
session = snapshot.get("session")
market = snapshot.get("market_regime", {})
volatility_score = to_float(market.get("volatility_score"), 0)
leader_score = to_float(market.get("leader_score"), 0)
actionable_positions = int(snapshot.get("actionable_positions") or 0)
largest_position_value = to_float(snapshot.get("largest_position_value_usdt"), 0)
capital_band = "micro" if portfolio_value < 25 else "small" if portfolio_value < 100 else "normal"
session_mode = "quiet" if session in {"overnight", "asia-morning"} else "active"
volatility_mode = "high" if volatility_score >= 2.5 or leader_score >= 120 else "normal"
dust_mode = free_usdt < MIN_ACTIONABLE_USDT and largest_position_value < MIN_REAL_POSITION_VALUE_USDT
price_trigger = BASE_PRICE_MOVE_TRIGGER_PCT
pnl_trigger = BASE_PNL_TRIGGER_PCT
portfolio_trigger = BASE_PORTFOLIO_MOVE_TRIGGER_PCT
candidate_ratio = BASE_CANDIDATE_SCORE_TRIGGER_RATIO
force_minutes = BASE_FORCE_ANALYSIS_AFTER_MINUTES
cooldown_minutes = BASE_COOLDOWN_MINUTES
soft_score_threshold = 2.0
if capital_band == "micro":
price_trigger += 0.02
pnl_trigger += 0.03
portfolio_trigger += 0.04
candidate_ratio += 0.25
force_minutes += 180
cooldown_minutes += 30
soft_score_threshold += 1.0
elif capital_band == "small":
price_trigger += 0.01
pnl_trigger += 0.01
portfolio_trigger += 0.01
candidate_ratio += 0.1
force_minutes += 60
cooldown_minutes += 10
soft_score_threshold += 0.5
if session_mode == "quiet":
price_trigger += 0.01
pnl_trigger += 0.01
portfolio_trigger += 0.01
candidate_ratio += 0.05
soft_score_threshold += 0.5
else:
force_minutes = max(120, force_minutes - 30)
if volatility_mode == "high":
price_trigger = max(0.02, price_trigger - 0.01)
pnl_trigger = max(0.025, pnl_trigger - 0.005)
portfolio_trigger = max(0.025, portfolio_trigger - 0.005)
candidate_ratio = max(1.1, candidate_ratio - 0.1)
cooldown_minutes = max(20, cooldown_minutes - 10)
soft_score_threshold = max(1.0, soft_score_threshold - 0.5)
if dust_mode:
candidate_ratio += 0.3
force_minutes += 180
cooldown_minutes += 30
soft_score_threshold += 1.5
return {
"capital_band": capital_band,
"session_mode": session_mode,
"volatility_mode": volatility_mode,
"dust_mode": dust_mode,
"price_move_trigger_pct": round(price_trigger, 4),
"pnl_trigger_pct": round(pnl_trigger, 4),
"portfolio_move_trigger_pct": round(portfolio_trigger, 4),
"candidate_score_trigger_ratio": round(candidate_ratio, 4),
"force_analysis_after_minutes": int(force_minutes),
"cooldown_minutes": int(cooldown_minutes),
"soft_score_threshold": round(soft_score_threshold, 2),
"new_entries_allowed": free_usdt >= MIN_ACTIONABLE_USDT and not dust_mode,
"switching_allowed": actionable_positions > 0 or portfolio_value >= 25,
}
def _candidate_weight(snapshot: dict, profile: dict) -> float:
if not profile.get("new_entries_allowed"):
return 0.5
if profile.get("volatility_mode") == "high":
return 1.5
if snapshot.get("session") in {"europe-open", "us-session"}:
return 1.25
return 1.0
def analyze_trigger(snapshot: dict, state: dict):
reasons = []
details = list(state.get("_stale_recovery_notes", []))
hard_reasons = []
soft_reasons = []
soft_score = 0.0
profile = build_adaptive_profile(snapshot)
market = snapshot.get("market_regime", {})
now = utc_now()
last_positions_hash = state.get("last_positions_hash")
last_portfolio_value = state.get("last_portfolio_value_usdt")
last_market_regime = state.get("last_market_regime", {})
last_positions_map = state.get("last_positions_map", {})
last_top_candidate = state.get("last_top_candidate")
pending_trigger = bool(state.get("pending_trigger"))
run_requested_at = parse_ts(state.get("run_requested_at"))
last_deep_analysis_at = parse_ts(state.get("last_deep_analysis_at"))
last_triggered_at = parse_ts(state.get("last_triggered_at"))
last_trigger_snapshot_hash = state.get("last_trigger_snapshot_hash")
last_hard_reasons_at = state.get("last_hard_reasons_at", {})
price_trigger = profile["price_move_trigger_pct"]
pnl_trigger = profile["pnl_trigger_pct"]
portfolio_trigger = profile["portfolio_move_trigger_pct"]
candidate_ratio_trigger = profile["candidate_score_trigger_ratio"]
force_minutes = profile["force_analysis_after_minutes"]
cooldown_minutes = profile["cooldown_minutes"]
soft_score_threshold = profile["soft_score_threshold"]
if pending_trigger:
reasons.append("pending-trigger-unacked")
hard_reasons.append("pending-trigger-unacked")
details.append("上次已触发深度分析但尚未确认完成")
if run_requested_at:
details.append(f"外部门控已在 {run_requested_at.isoformat()} 请求运行分析任务")
if not last_deep_analysis_at:
reasons.append("first-analysis")
hard_reasons.append("first-analysis")
details.append("尚未记录过深度分析")
elif now - last_deep_analysis_at >= timedelta(minutes=force_minutes):
reasons.append("stale-analysis")
hard_reasons.append("stale-analysis")
details.append(f"距离上次深度分析已超过 {force_minutes} 分钟")
if last_positions_hash and snapshot["positions_hash"] != last_positions_hash:
reasons.append("positions-changed")
hard_reasons.append("positions-changed")
details.append("持仓结构发生变化")
if last_portfolio_value not in (None, 0):
portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9)
if portfolio_delta >= portfolio_trigger:
if portfolio_delta >= 1.0:
reasons.append("portfolio-extreme-move")
hard_reasons.append("portfolio-extreme-move")
details.append(f"组合净值剧烈变化 {portfolio_delta:.1%},超过 100%,视为硬触发")
else:
reasons.append("portfolio-move")
soft_reasons.append("portfolio-move")
soft_score += 1.0
details.append(f"组合净值变化 {portfolio_delta:.1%},阈值 {portfolio_trigger:.1%}")
for pos in snapshot["positions"]:
symbol = pos["symbol"]
prev = last_positions_map.get(symbol, {})
cur_price = pos.get("last_price")
prev_price = prev.get("last_price")
cur_pnl = pos.get("pnl_pct")
prev_pnl = prev.get("pnl_pct")
market_value = to_float(pos.get("market_value_usdt"), 0)
actionable_position = market_value >= MIN_REAL_POSITION_VALUE_USDT
if cur_price and prev_price:
price_move = abs(cur_price - prev_price) / max(prev_price, 1e-9)
if price_move >= price_trigger:
reasons.append(f"price-move:{symbol}")
soft_reasons.append(f"price-move:{symbol}")
soft_score += 1.0 if actionable_position else 0.4
details.append(f"{symbol} 价格变化 {price_move:.1%},阈值 {price_trigger:.1%}")
if cur_pnl is not None and prev_pnl is not None:
pnl_move = abs(cur_pnl - prev_pnl)
if pnl_move >= pnl_trigger:
reasons.append(f"pnl-move:{symbol}")
soft_reasons.append(f"pnl-move:{symbol}")
soft_score += 1.0 if actionable_position else 0.4
details.append(f"{symbol} 盈亏变化 {pnl_move:.1%},阈值 {pnl_trigger:.1%}")
if cur_pnl is not None:
stop_band = -0.06 if actionable_position else -0.12
take_band = 0.14 if actionable_position else 0.25
if cur_pnl <= stop_band or cur_pnl >= take_band:
reasons.append(f"risk-band:{symbol}")
hard_reasons.append(f"risk-band:{symbol}")
details.append(f"{symbol} 接近执行阈值,当前盈亏 {cur_pnl:.1%}")
if cur_pnl <= HARD_STOP_PCT:
reasons.append(f"hard-stop:{symbol}")
hard_reasons.append(f"hard-stop:{symbol}")
details.append(f"{symbol} 盈亏超过 {HARD_STOP_PCT:.1%},触发紧急硬触发")
current_market = snapshot.get("market_regime", {})
if last_market_regime:
if current_market.get("btc_regime") != last_market_regime.get("btc_regime"):
reasons.append("btc-regime-change")
hard_reasons.append("btc-regime-change")
details.append(f"BTC 由 {last_market_regime.get('btc_regime')} 切换为 {current_market.get('btc_regime')}")
if current_market.get("eth_regime") != last_market_regime.get("eth_regime"):
reasons.append("eth-regime-change")
hard_reasons.append("eth-regime-change")
details.append(f"ETH 由 {last_market_regime.get('eth_regime')} 切换为 {current_market.get('eth_regime')}")
for cand in snapshot.get("top_candidates", []):
if cand.get("change_24h_pct", 0) >= HARD_MOON_PCT * 100:
reasons.append(f"hard-moon:{cand['symbol']}")
hard_reasons.append(f"hard-moon:{cand['symbol']}")
details.append(f"候选币 {cand['symbol']} 24h 涨幅 {cand['change_24h_pct']:.1f}%,触发强势硬触发")
candidate_weight = _candidate_weight(snapshot, profile)
last_layers = state.get("last_candidates_layers", {})
current_layers = snapshot.get("top_candidates_layers", {})
for band in ("major", "mid", "meme"):
cur_band = current_layers.get(band, [])
prev_band = last_layers.get(band, [])
cur_leader = cur_band[0] if cur_band else None
prev_leader = prev_band[0] if prev_band else None
if cur_leader and prev_leader and cur_leader["symbol"] != prev_leader["symbol"]:
score_ratio = cur_leader.get("score", 0) / max(prev_leader.get("score", 0.0001), 0.0001)
if score_ratio >= candidate_ratio_trigger:
reasons.append(f"new-leader-{band}:{cur_leader['symbol']}")
soft_reasons.append(f"new-leader-{band}:{cur_leader['symbol']}")
soft_score += candidate_weight * 0.7
details.append(
f"{band} 层新榜首 {cur_leader['symbol']} 替代 {prev_leader['symbol']}score 比例 {score_ratio:.2f}"
)
current_leader = snapshot.get("top_candidates", [{}])[0] if snapshot.get("top_candidates") else None
if last_top_candidate and current_leader:
if current_leader.get("symbol") != last_top_candidate.get("symbol"):
score_ratio = current_leader.get("score", 0) / max(last_top_candidate.get("score", 0.0001), 0.0001)
if score_ratio >= candidate_ratio_trigger:
reasons.append("new-leader")
soft_reasons.append("new-leader")
soft_score += candidate_weight
details.append(
f"新候选币 {current_leader.get('symbol')} 领先上次榜首score 比例 {score_ratio:.2f},阈值 {candidate_ratio_trigger:.2f}"
)
elif current_leader and not last_top_candidate:
reasons.append("candidate-leader-init")
soft_reasons.append("candidate-leader-init")
soft_score += candidate_weight
details.append(f"首次记录候选榜首 {current_leader.get('symbol')}")
def _signal_delta() -> float:
delta = 0.0
if last_trigger_snapshot_hash and snapshot.get("snapshot_hash") != last_trigger_snapshot_hash:
delta += 0.5
if snapshot["positions_hash"] != last_positions_hash:
delta += 1.5
for pos in snapshot["positions"]:
symbol = pos["symbol"]
prev = last_positions_map.get(symbol, {})
cur_price = pos.get("last_price")
prev_price = prev.get("last_price")
cur_pnl = pos.get("pnl_pct")
prev_pnl = prev.get("pnl_pct")
if cur_price and prev_price and abs(cur_price - prev_price) / max(prev_price, 1e-9) >= 0.02:
delta += 0.5
if cur_pnl is not None and prev_pnl is not None and abs(cur_pnl - prev_pnl) >= 0.03:
delta += 0.5
last_leader = state.get("last_top_candidate")
if current_leader and last_leader and current_leader.get("symbol") != last_leader.get("symbol"):
delta += 1.0
for band in ("major", "mid", "meme"):
cur_band = current_layers.get(band, [])
prev_band = last_layers.get(band, [])
cur_l = cur_band[0] if cur_band else None
prev_l = prev_band[0] if prev_band else None
if cur_l and prev_l and cur_l.get("symbol") != prev_l.get("symbol"):
delta += 0.5
if last_market_regime:
if current_market.get("btc_regime") != last_market_regime.get("btc_regime"):
delta += 1.5
if current_market.get("eth_regime") != last_market_regime.get("eth_regime"):
delta += 1.5
if last_portfolio_value not in (None, 0):
portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9)
if portfolio_delta >= 0.05:
delta += 1.0
last_trigger_hard_types = {r.split(":")[0] for r in (state.get("last_trigger_hard_reasons") or [])}
current_hard_types = {r.split(":")[0] for r in hard_reasons}
if current_hard_types - last_trigger_hard_types:
delta += 2.0
return delta
signal_delta = _signal_delta()
effective_cooldown = cooldown_minutes
if signal_delta < 1.0:
effective_cooldown = max(cooldown_minutes, 90)
elif signal_delta >= 2.5:
effective_cooldown = max(0, cooldown_minutes - 15)
cooldown_active = bool(last_triggered_at and now - last_triggered_at < timedelta(minutes=effective_cooldown))
dedup_window = timedelta(minutes=HARD_REASON_DEDUP_MINUTES)
for hr in list(hard_reasons):
last_at = parse_ts(last_hard_reasons_at.get(hr))
if last_at and now - last_at < dedup_window:
hard_reasons.remove(hr)
details.append(f"{hr} 近期已触发,{HARD_REASON_DEDUP_MINUTES}分钟内去重")
hard_trigger = bool(hard_reasons)
if profile.get("dust_mode") and not hard_trigger and soft_score < soft_score_threshold + 1.0:
details.append("微型资金/粉尘仓位模式:抬高软触发门槛,避免无意义分析")
if profile.get("dust_mode") and not profile.get("new_entries_allowed") and any(
r in {"new-leader", "candidate-leader-init"} for r in soft_reasons
):
details.append("当前可用资金低于可执行阈值,新候选币仅做观察,不单独触发深度分析")
soft_score = max(0.0, soft_score - 0.75)
should_analyze = hard_trigger or soft_score >= soft_score_threshold
if cooldown_active and not hard_trigger and should_analyze:
should_analyze = False
details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,软触发先记录不升级")
if cooldown_active and not hard_trigger and reasons and soft_score < soft_score_threshold:
details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,且软信号强度不足 ({soft_score:.2f} < {soft_score_threshold:.2f})")
status = "deep_analysis_required" if should_analyze else "stable"
compact_lines = [
f"状态: {status}",
f"组合净值: ${snapshot['portfolio_value_usdt']:.4f} | 可用USDT: ${snapshot['free_usdt']:.4f}",
f"本地时段: {snapshot['session']} | 时区: {snapshot['timezone']}",
f"BTC/ETH: {market.get('btc_regime')} ({market.get('btc_24h_pct')}%), {market.get('eth_regime')} ({market.get('eth_24h_pct')}%) | 波动分数 {market.get('volatility_score')}",
f"门控画像: capital={profile['capital_band']}, session={profile['session_mode']}, volatility={profile['volatility_mode']}, dust={profile['dust_mode']}",
f"阈值: price={price_trigger:.1%}, pnl={pnl_trigger:.1%}, portfolio={portfolio_trigger:.1%}, candidate={candidate_ratio_trigger:.2f}, cooldown={effective_cooldown}m({cooldown_minutes}m基础), force={force_minutes}m",
f"软信号分: {soft_score:.2f} / {soft_score_threshold:.2f}",
f"信号变化度: {signal_delta:.1f}",
]
if snapshot["positions"]:
compact_lines.append("持仓:")
for pos in snapshot["positions"][:4]:
pnl = pos.get("pnl_pct")
pnl_text = f"{pnl:+.1%}" if pnl is not None else "n/a"
compact_lines.append(
f"- {pos['symbol']}: qty={pos['quantity']}, px={pos.get('last_price')}, pnl={pnl_text}, value=${pos.get('market_value_usdt')}"
)
else:
compact_lines.append("持仓: 当前无现货仓位")
if snapshot["top_candidates"]:
compact_lines.append("候选榜:")
for cand in snapshot["top_candidates"]:
compact_lines.append(
f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}"
)
layers = snapshot.get("top_candidates_layers", {})
for band, band_cands in layers.items():
if band_cands:
compact_lines.append(f"{band} 层:")
for cand in band_cands:
compact_lines.append(
f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}"
)
if details:
compact_lines.append("触发说明:")
for item in details:
compact_lines.append(f"- {item}")
return {
"generated_at": snapshot["generated_at"],
"status": status,
"should_analyze": should_analyze,
"pending_trigger": pending_trigger,
"run_requested": bool(run_requested_at),
"run_requested_at": run_requested_at.isoformat() if run_requested_at else None,
"cooldown_active": cooldown_active,
"effective_cooldown_minutes": effective_cooldown,
"signal_delta": round(signal_delta, 2),
"reasons": reasons,
"hard_reasons": hard_reasons,
"soft_reasons": soft_reasons,
"soft_score": round(soft_score, 3),
"adaptive_profile": profile,
"portfolio_value_usdt": snapshot["portfolio_value_usdt"],
"free_usdt": snapshot["free_usdt"],
"market_regime": snapshot["market_regime"],
"session": snapshot["session"],
"positions": snapshot["positions"],
"top_candidates": snapshot["top_candidates"],
"top_candidates_layers": layers,
"snapshot_hash": snapshot["snapshot_hash"],
"compact_summary": "\n".join(compact_lines),
"details": details,
}
def update_state_after_observation(state: dict, snapshot: dict, analysis: dict):
new_state = dict(state)
new_state.update({
"last_observed_at": snapshot["generated_at"],
"last_snapshot_hash": snapshot["snapshot_hash"],
"last_positions_hash": snapshot["positions_hash"],
"last_candidates_hash": snapshot["candidates_hash"],
"last_portfolio_value_usdt": snapshot["portfolio_value_usdt"],
"last_market_regime": snapshot["market_regime"],
"last_positions_map": {
p["symbol"]: {"last_price": p.get("last_price"), "pnl_pct": p.get("pnl_pct")}
for p in snapshot["positions"]
},
"last_top_candidate": snapshot["top_candidates"][0] if snapshot["top_candidates"] else None,
"last_candidates_layers": snapshot.get("top_candidates_layers", {}),
"last_adaptive_profile": analysis.get("adaptive_profile", {}),
})
if analysis["should_analyze"]:
new_state["pending_trigger"] = True
new_state["pending_reasons"] = analysis["details"]
new_state["last_triggered_at"] = snapshot["generated_at"]
new_state["last_trigger_snapshot_hash"] = snapshot["snapshot_hash"]
new_state["last_trigger_hard_reasons"] = analysis.get("hard_reasons", [])
new_state["last_trigger_signal_delta"] = analysis.get("signal_delta", 0.0)
last_hard_reasons_at = dict(state.get("last_hard_reasons_at", {}))
for hr in analysis.get("hard_reasons", []):
last_hard_reasons_at[hr] = snapshot["generated_at"]
cutoff = utc_now() - timedelta(hours=24)
pruned = {k: v for k, v in last_hard_reasons_at.items() if parse_ts(v) and parse_ts(v) > cutoff}
new_state["last_hard_reasons_at"] = pruned
return new_state
from importlib import import_module
_MODULE_MAP = {
"PATHS": ".precheck_constants",
"BASE_DIR": ".precheck_constants",
"STATE_DIR": ".precheck_constants",
"STATE_FILE": ".precheck_constants",
"POSITIONS_FILE": ".precheck_constants",
"CONFIG_FILE": ".precheck_constants",
"ENV_FILE": ".precheck_constants",
"BASE_PRICE_MOVE_TRIGGER_PCT": ".precheck_constants",
"BASE_PNL_TRIGGER_PCT": ".precheck_constants",
"BASE_PORTFOLIO_MOVE_TRIGGER_PCT": ".precheck_constants",
"BASE_CANDIDATE_SCORE_TRIGGER_RATIO": ".precheck_constants",
"BASE_FORCE_ANALYSIS_AFTER_MINUTES": ".precheck_constants",
"BASE_COOLDOWN_MINUTES": ".precheck_constants",
"TOP_CANDIDATES": ".precheck_constants",
"MIN_ACTIONABLE_USDT": ".precheck_constants",
"MIN_REAL_POSITION_VALUE_USDT": ".precheck_constants",
"BLACKLIST": ".precheck_constants",
"HARD_STOP_PCT": ".precheck_constants",
"HARD_MOON_PCT": ".precheck_constants",
"MIN_CHANGE_PCT": ".precheck_constants",
"MAX_PRICE_CAP": ".precheck_constants",
"HARD_REASON_DEDUP_MINUTES": ".precheck_constants",
"MAX_PENDING_TRIGGER_MINUTES": ".precheck_constants",
"MAX_RUN_REQUEST_MINUTES": ".precheck_constants",
"utc_now": ".time_utils",
"utc_iso": ".time_utils",
"parse_ts": ".time_utils",
"get_local_now": ".time_utils",
"session_label": ".time_utils",
"load_json": ".data_utils",
"stable_hash": ".data_utils",
"to_float": ".data_utils",
"norm_symbol": ".data_utils",
"load_env": ".state_manager",
"load_positions": ".state_manager",
"load_state": ".state_manager",
"load_config": ".state_manager",
"clear_run_request_fields": ".state_manager",
"sanitize_state_for_stale_triggers": ".state_manager",
"save_state": ".state_manager",
"update_state_after_observation": ".state_manager",
"get_exchange": ".market_data",
"fetch_ohlcv_batch": ".market_data",
"compute_ohlcv_metrics": ".market_data",
"enrich_candidates_and_positions": ".market_data",
"regime_from_pct": ".market_data",
"_liquidity_score": ".candidate_scoring",
"_breakout_score": ".candidate_scoring",
"top_candidates_from_tickers": ".candidate_scoring",
"build_snapshot": ".snapshot_builder",
"build_adaptive_profile": ".adaptive_profile",
"_candidate_weight": ".adaptive_profile",
"analyze_trigger": ".trigger_analyzer",
}
__all__ = sorted(set(_MODULE_MAP) | {"main"})
def __getattr__(name: str):
if name not in _MODULE_MAP:
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
module_name = _MODULE_MAP[name]
module = import_module(module_name, __package__)
return getattr(module, name)
def __dir__():
return sorted(set(globals()) | set(__all__))
def main():
from .precheck_service import run as _run_service
import sys
return _run_service(sys.argv[1:])
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -2,8 +2,4 @@
from __future__ import annotations
from . import precheck_core
def build_snapshot() -> dict:
return precheck_core.build_snapshot()
from .snapshot_builder import build_snapshot

View File

@@ -4,28 +4,18 @@ from __future__ import annotations
import json
from . import precheck_core
def load_state() -> dict:
return precheck_core.load_state()
def save_state(state: dict) -> None:
precheck_core.save_state(state)
def sanitize_state_for_stale_triggers(state: dict) -> dict:
return precheck_core.sanitize_state_for_stale_triggers(state)
def update_state_after_observation(state: dict, snapshot: dict, analysis: dict) -> dict:
return precheck_core.update_state_after_observation(state, snapshot, analysis)
from .state_manager import (
load_state,
sanitize_state_for_stale_triggers,
save_state,
update_state_after_observation,
)
from .time_utils import utc_iso
def mark_run_requested(note: str = "") -> dict:
state = load_state()
state["run_requested_at"] = precheck_core.utc_iso()
state["run_requested_at"] = utc_iso()
state["run_request_note"] = note
save_state(state)
payload = {"ok": True, "run_requested_at": state["run_requested_at"], "note": note}
@@ -35,7 +25,7 @@ def mark_run_requested(note: str = "") -> dict:
def ack_analysis(note: str = "") -> dict:
state = load_state()
state["last_deep_analysis_at"] = precheck_core.utc_iso()
state["last_deep_analysis_at"] = utc_iso()
state["pending_trigger"] = False
state["pending_reasons"] = []
state["last_ack_note"] = note

View File

@@ -0,0 +1,110 @@
"""Snapshot construction for precheck."""
from __future__ import annotations
from .candidate_scoring import top_candidates_from_tickers
from .data_utils import norm_symbol, stable_hash, to_float
from .market_data import enrich_candidates_and_positions, get_exchange, regime_from_pct
from .precheck_constants import MIN_ACTIONABLE_USDT, MIN_REAL_POSITION_VALUE_USDT
from .state_manager import load_config, load_positions
from .time_utils import get_local_now, utc_iso
def build_snapshot():
config = load_config()
local_dt, tz_name = get_local_now(config)
ex = get_exchange()
positions = load_positions()
tickers = ex.fetch_tickers()
balances = ex.fetch_balance()["free"]
free_usdt = to_float(balances.get("USDT"))
positions_view = []
total_position_value = 0.0
largest_position_value = 0.0
actionable_positions = 0
for pos in positions:
symbol = pos.get("symbol") or ""
sym_ccxt = norm_symbol(symbol)
ticker = tickers.get(sym_ccxt, {})
last = to_float(ticker.get("last"), None)
qty = to_float(pos.get("quantity"))
avg_cost = to_float(pos.get("avg_cost"), None)
value = round(qty * last, 4) if last is not None else None
pnl_pct = round((last - avg_cost) / avg_cost, 4) if last is not None and avg_cost else None
high = to_float(ticker.get("high"))
low = to_float(ticker.get("low"))
distance_from_high = (high - last) / max(high, 1e-9) if high and last else None
if value is not None:
total_position_value += value
largest_position_value = max(largest_position_value, value)
if value >= MIN_REAL_POSITION_VALUE_USDT:
actionable_positions += 1
positions_view.append({
"symbol": symbol,
"base_asset": pos.get("base_asset"),
"quantity": qty,
"avg_cost": avg_cost,
"last_price": last,
"market_value_usdt": value,
"pnl_pct": pnl_pct,
"high_24h": round(high, 8) if high else None,
"low_24h": round(low, 8) if low else None,
"distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None,
})
btc_pct = to_float((tickers.get("BTC/USDT") or {}).get("percentage"), None)
eth_pct = to_float((tickers.get("ETH/USDT") or {}).get("percentage"), None)
global_candidates, candidate_layers = top_candidates_from_tickers(tickers)
global_candidates, candidate_layers, positions_view = enrich_candidates_and_positions(
global_candidates, candidate_layers, positions_view, tickers, ex
)
leader_score = global_candidates[0]["score"] if global_candidates else 0.0
portfolio_value = round(free_usdt + total_position_value, 4)
volatility_score = round(max(abs(to_float(btc_pct, 0)), abs(to_float(eth_pct, 0))), 2)
position_structure = [
{
"symbol": p.get("symbol"),
"base_asset": p.get("base_asset"),
"quantity": round(to_float(p.get("quantity"), 0), 10),
"avg_cost": to_float(p.get("avg_cost"), None),
}
for p in positions_view
]
snapshot = {
"generated_at": utc_iso(),
"timezone": tz_name,
"local_time": local_dt.isoformat(),
"session": get_local_now(config)[0] if False else None, # will be replaced below
"free_usdt": round(free_usdt, 4),
"portfolio_value_usdt": portfolio_value,
"largest_position_value_usdt": round(largest_position_value, 4),
"actionable_positions": actionable_positions,
"positions": positions_view,
"positions_hash": stable_hash(position_structure),
"top_candidates": global_candidates,
"top_candidates_layers": candidate_layers,
"candidates_hash": stable_hash({"global": global_candidates, "layers": candidate_layers}),
"market_regime": {
"btc_24h_pct": round(btc_pct, 2) if btc_pct is not None else None,
"btc_regime": regime_from_pct(btc_pct),
"eth_24h_pct": round(eth_pct, 2) if eth_pct is not None else None,
"eth_regime": regime_from_pct(eth_pct),
"volatility_score": volatility_score,
"leader_score": round(leader_score, 4),
},
}
# fix session after the fact to avoid re-fetching config
snapshot["session"] = None
from .time_utils import session_label
snapshot["session"] = session_label(local_dt)
snapshot["snapshot_hash"] = stable_hash({
"portfolio_value_usdt": snapshot["portfolio_value_usdt"],
"positions_hash": snapshot["positions_hash"],
"candidates_hash": snapshot["candidates_hash"],
"market_regime": snapshot["market_regime"],
"session": snapshot["session"],
})
return snapshot

View File

@@ -0,0 +1,128 @@
"""State management for precheck workflows."""
from __future__ import annotations
from datetime import timedelta
from ..runtime import load_env_file
from .data_utils import load_json
from .precheck_constants import (
CONFIG_FILE,
ENV_FILE,
MAX_PENDING_TRIGGER_MINUTES,
MAX_RUN_REQUEST_MINUTES,
POSITIONS_FILE,
STATE_DIR,
STATE_FILE,
)
from .time_utils import parse_ts, utc_iso, utc_now
def load_env() -> None:
load_env_file()
def load_positions():
return load_json(POSITIONS_FILE, {}).get("positions", [])
def load_state():
return load_json(STATE_FILE, {})
def load_config():
return load_json(CONFIG_FILE, {})
def clear_run_request_fields(state: dict):
state.pop("run_requested_at", None)
state.pop("run_request_note", None)
def sanitize_state_for_stale_triggers(state: dict):
sanitized = dict(state)
notes = []
now = utc_now()
run_requested_at = parse_ts(sanitized.get("run_requested_at"))
last_deep_analysis_at = parse_ts(sanitized.get("last_deep_analysis_at"))
last_triggered_at = parse_ts(sanitized.get("last_triggered_at"))
pending_trigger = bool(sanitized.get("pending_trigger"))
if run_requested_at and last_deep_analysis_at and last_deep_analysis_at >= run_requested_at:
clear_run_request_fields(sanitized)
if pending_trigger and (not last_triggered_at or last_deep_analysis_at >= last_triggered_at):
sanitized["pending_trigger"] = False
sanitized["pending_reasons"] = []
sanitized["last_ack_note"] = (
f"auto-cleared completed trigger at {utc_iso()} because last_deep_analysis_at >= run_requested_at"
)
pending_trigger = False
notes.append(
f"自动清理已完成的 run_requested 标记:最近深度分析时间 {last_deep_analysis_at.isoformat()} >= 请求时间 {run_requested_at.isoformat()}"
)
run_requested_at = None
if run_requested_at and now - run_requested_at > timedelta(minutes=MAX_RUN_REQUEST_MINUTES):
clear_run_request_fields(sanitized)
notes.append(
f"自动清理超时 run_requested 标记:已等待 {(now - run_requested_at).total_seconds() / 60:.1f} 分钟,超过 {MAX_RUN_REQUEST_MINUTES} 分钟"
)
run_requested_at = None
pending_anchor = run_requested_at or last_triggered_at or last_deep_analysis_at
if pending_trigger and pending_anchor and now - pending_anchor > timedelta(minutes=MAX_PENDING_TRIGGER_MINUTES):
sanitized["pending_trigger"] = False
sanitized["pending_reasons"] = []
sanitized["last_ack_note"] = (
f"auto-recovered stale pending trigger at {utc_iso()} after waiting "
f"{(now - pending_anchor).total_seconds() / 60:.1f} minutes"
)
notes.append(
f"自动解除 pending_trigger触发状态已悬挂 {(now - pending_anchor).total_seconds() / 60:.1f} 分钟,超过 {MAX_PENDING_TRIGGER_MINUTES} 分钟"
)
sanitized["_stale_recovery_notes"] = notes
return sanitized
def save_state(state: dict):
import json
STATE_DIR.mkdir(parents=True, exist_ok=True)
state_to_save = dict(state)
state_to_save.pop("_stale_recovery_notes", None)
STATE_FILE.write_text(json.dumps(state_to_save, indent=2, ensure_ascii=False), encoding="utf-8")
def update_state_after_observation(state: dict, snapshot: dict, analysis: dict):
new_state = dict(state)
new_state.update({
"last_observed_at": snapshot["generated_at"],
"last_snapshot_hash": snapshot["snapshot_hash"],
"last_positions_hash": snapshot["positions_hash"],
"last_candidates_hash": snapshot["candidates_hash"],
"last_portfolio_value_usdt": snapshot["portfolio_value_usdt"],
"last_market_regime": snapshot["market_regime"],
"last_positions_map": {
p["symbol"]: {"last_price": p.get("last_price"), "pnl_pct": p.get("pnl_pct")}
for p in snapshot["positions"]
},
"last_top_candidate": snapshot["top_candidates"][0] if snapshot["top_candidates"] else None,
"last_candidates_layers": snapshot.get("top_candidates_layers", {}),
"last_adaptive_profile": analysis.get("adaptive_profile", {}),
})
if analysis["should_analyze"]:
new_state["pending_trigger"] = True
new_state["pending_reasons"] = analysis["details"]
new_state["last_triggered_at"] = snapshot["generated_at"]
new_state["last_trigger_snapshot_hash"] = snapshot["snapshot_hash"]
new_state["last_trigger_hard_reasons"] = analysis.get("hard_reasons", [])
new_state["last_trigger_signal_delta"] = analysis.get("signal_delta", 0.0)
last_hard_reasons_at = dict(state.get("last_hard_reasons_at", {}))
for hr in analysis.get("hard_reasons", []):
last_hard_reasons_at[hr] = snapshot["generated_at"]
cutoff = utc_now() - timedelta(hours=24)
pruned = {k: v for k, v in last_hard_reasons_at.items() if parse_ts(v) and parse_ts(v) > cutoff}
new_state["last_hard_reasons_at"] = pruned
return new_state

View File

@@ -0,0 +1,49 @@
"""Time utilities for precheck."""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from zoneinfo import ZoneInfo
def utc_now() -> datetime:
return datetime.now(timezone.utc)
def utc_iso() -> str:
return utc_now().isoformat()
def parse_ts(value: str | None) -> datetime | None:
if not value:
return None
try:
ts = datetime.fromisoformat(value)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
return ts
except Exception:
return None
def get_local_now(config: dict) -> tuple[datetime, str]:
tz_name = config.get("timezone") or "Asia/Shanghai"
try:
tz = ZoneInfo(tz_name)
except Exception:
tz = ZoneInfo("Asia/Shanghai")
tz_name = "Asia/Shanghai"
return utc_now().astimezone(tz), tz_name
def session_label(local_dt: datetime) -> str:
hour = local_dt.hour
if 0 <= hour < 7:
return "overnight"
if 7 <= hour < 12:
return "asia-morning"
if 12 <= hour < 17:
return "asia-afternoon"
if 17 <= hour < 21:
return "europe-open"
return "us-session"

View File

@@ -0,0 +1,317 @@
"""Trigger analysis logic for precheck."""
from __future__ import annotations
from datetime import timedelta
from .adaptive_profile import _candidate_weight, build_adaptive_profile
from .data_utils import to_float
from .precheck_constants import (
HARD_MOON_PCT,
HARD_REASON_DEDUP_MINUTES,
HARD_STOP_PCT,
MAX_PENDING_TRIGGER_MINUTES,
MAX_RUN_REQUEST_MINUTES,
MIN_REAL_POSITION_VALUE_USDT,
)
from .time_utils import parse_ts, utc_iso, utc_now
def analyze_trigger(snapshot: dict, state: dict):
reasons = []
details = list(state.get("_stale_recovery_notes", []))
hard_reasons = []
soft_reasons = []
soft_score = 0.0
profile = build_adaptive_profile(snapshot)
market = snapshot.get("market_regime", {})
now = utc_now()
last_positions_hash = state.get("last_positions_hash")
last_portfolio_value = state.get("last_portfolio_value_usdt")
last_market_regime = state.get("last_market_regime", {})
last_positions_map = state.get("last_positions_map", {})
last_top_candidate = state.get("last_top_candidate")
pending_trigger = bool(state.get("pending_trigger"))
run_requested_at = parse_ts(state.get("run_requested_at"))
last_deep_analysis_at = parse_ts(state.get("last_deep_analysis_at"))
last_triggered_at = parse_ts(state.get("last_triggered_at"))
last_trigger_snapshot_hash = state.get("last_trigger_snapshot_hash")
last_hard_reasons_at = state.get("last_hard_reasons_at", {})
price_trigger = profile["price_move_trigger_pct"]
pnl_trigger = profile["pnl_trigger_pct"]
portfolio_trigger = profile["portfolio_move_trigger_pct"]
candidate_ratio_trigger = profile["candidate_score_trigger_ratio"]
force_minutes = profile["force_analysis_after_minutes"]
cooldown_minutes = profile["cooldown_minutes"]
soft_score_threshold = profile["soft_score_threshold"]
if pending_trigger:
reasons.append("pending-trigger-unacked")
hard_reasons.append("pending-trigger-unacked")
details.append("上次已触发深度分析但尚未确认完成")
if run_requested_at:
details.append(f"外部门控已在 {run_requested_at.isoformat()} 请求运行分析任务")
if not last_deep_analysis_at:
reasons.append("first-analysis")
hard_reasons.append("first-analysis")
details.append("尚未记录过深度分析")
elif now - last_deep_analysis_at >= timedelta(minutes=force_minutes):
reasons.append("stale-analysis")
hard_reasons.append("stale-analysis")
details.append(f"距离上次深度分析已超过 {force_minutes} 分钟")
if last_positions_hash and snapshot["positions_hash"] != last_positions_hash:
reasons.append("positions-changed")
hard_reasons.append("positions-changed")
details.append("持仓结构发生变化")
if last_portfolio_value not in (None, 0):
portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9)
if portfolio_delta >= portfolio_trigger:
if portfolio_delta >= 1.0:
reasons.append("portfolio-extreme-move")
hard_reasons.append("portfolio-extreme-move")
details.append(f"组合净值剧烈变化 {portfolio_delta:.1%},超过 100%,视为硬触发")
else:
reasons.append("portfolio-move")
soft_reasons.append("portfolio-move")
soft_score += 1.0
details.append(f"组合净值变化 {portfolio_delta:.1%},阈值 {portfolio_trigger:.1%}")
for pos in snapshot["positions"]:
symbol = pos["symbol"]
prev = last_positions_map.get(symbol, {})
cur_price = pos.get("last_price")
prev_price = prev.get("last_price")
cur_pnl = pos.get("pnl_pct")
prev_pnl = prev.get("pnl_pct")
market_value = to_float(pos.get("market_value_usdt"), 0)
actionable_position = market_value >= MIN_REAL_POSITION_VALUE_USDT
if cur_price and prev_price:
price_move = abs(cur_price - prev_price) / max(prev_price, 1e-9)
if price_move >= price_trigger:
reasons.append(f"price-move:{symbol}")
soft_reasons.append(f"price-move:{symbol}")
soft_score += 1.0 if actionable_position else 0.4
details.append(f"{symbol} 价格变化 {price_move:.1%},阈值 {price_trigger:.1%}")
if cur_pnl is not None and prev_pnl is not None:
pnl_move = abs(cur_pnl - prev_pnl)
if pnl_move >= pnl_trigger:
reasons.append(f"pnl-move:{symbol}")
soft_reasons.append(f"pnl-move:{symbol}")
soft_score += 1.0 if actionable_position else 0.4
details.append(f"{symbol} 盈亏变化 {pnl_move:.1%},阈值 {pnl_trigger:.1%}")
if cur_pnl is not None:
stop_band = -0.06 if actionable_position else -0.12
take_band = 0.14 if actionable_position else 0.25
if cur_pnl <= stop_band or cur_pnl >= take_band:
reasons.append(f"risk-band:{symbol}")
hard_reasons.append(f"risk-band:{symbol}")
details.append(f"{symbol} 接近执行阈值,当前盈亏 {cur_pnl:.1%}")
if cur_pnl <= HARD_STOP_PCT:
reasons.append(f"hard-stop:{symbol}")
hard_reasons.append(f"hard-stop:{symbol}")
details.append(f"{symbol} 盈亏超过 {HARD_STOP_PCT:.1%},触发紧急硬触发")
current_market = snapshot.get("market_regime", {})
if last_market_regime:
if current_market.get("btc_regime") != last_market_regime.get("btc_regime"):
reasons.append("btc-regime-change")
hard_reasons.append("btc-regime-change")
details.append(f"BTC 由 {last_market_regime.get('btc_regime')} 切换为 {current_market.get('btc_regime')}")
if current_market.get("eth_regime") != last_market_regime.get("eth_regime"):
reasons.append("eth-regime-change")
hard_reasons.append("eth-regime-change")
details.append(f"ETH 由 {last_market_regime.get('eth_regime')} 切换为 {current_market.get('eth_regime')}")
for cand in snapshot.get("top_candidates", []):
if cand.get("change_24h_pct", 0) >= HARD_MOON_PCT * 100:
reasons.append(f"hard-moon:{cand['symbol']}")
hard_reasons.append(f"hard-moon:{cand['symbol']}")
details.append(f"候选币 {cand['symbol']} 24h 涨幅 {cand['change_24h_pct']:.1f}%,触发强势硬触发")
candidate_weight = _candidate_weight(snapshot, profile)
last_layers = state.get("last_candidates_layers", {})
current_layers = snapshot.get("top_candidates_layers", {})
for band in ("major", "mid", "meme"):
cur_band = current_layers.get(band, [])
prev_band = last_layers.get(band, [])
cur_leader = cur_band[0] if cur_band else None
prev_leader = prev_band[0] if prev_band else None
if cur_leader and prev_leader and cur_leader["symbol"] != prev_leader["symbol"]:
score_ratio = cur_leader.get("score", 0) / max(prev_leader.get("score", 0.0001), 0.0001)
if score_ratio >= candidate_ratio_trigger:
reasons.append(f"new-leader-{band}:{cur_leader['symbol']}")
soft_reasons.append(f"new-leader-{band}:{cur_leader['symbol']}")
soft_score += candidate_weight * 0.7
details.append(
f"{band} 层新榜首 {cur_leader['symbol']} 替代 {prev_leader['symbol']}score 比例 {score_ratio:.2f}"
)
current_leader = snapshot.get("top_candidates", [{}])[0] if snapshot.get("top_candidates") else None
if last_top_candidate and current_leader:
if current_leader.get("symbol") != last_top_candidate.get("symbol"):
score_ratio = current_leader.get("score", 0) / max(last_top_candidate.get("score", 0.0001), 0.0001)
if score_ratio >= candidate_ratio_trigger:
reasons.append("new-leader")
soft_reasons.append("new-leader")
soft_score += candidate_weight
details.append(
f"新候选币 {current_leader.get('symbol')} 领先上次榜首score 比例 {score_ratio:.2f},阈值 {candidate_ratio_trigger:.2f}"
)
elif current_leader and not last_top_candidate:
reasons.append("candidate-leader-init")
soft_reasons.append("candidate-leader-init")
soft_score += candidate_weight
details.append(f"首次记录候选榜首 {current_leader.get('symbol')}")
def _signal_delta() -> float:
delta = 0.0
if last_trigger_snapshot_hash and snapshot.get("snapshot_hash") != last_trigger_snapshot_hash:
delta += 0.5
if snapshot["positions_hash"] != last_positions_hash:
delta += 1.5
for pos in snapshot["positions"]:
symbol = pos["symbol"]
prev = last_positions_map.get(symbol, {})
cur_price = pos.get("last_price")
prev_price = prev.get("last_price")
cur_pnl = pos.get("pnl_pct")
prev_pnl = prev.get("pnl_pct")
if cur_price and prev_price and abs(cur_price - prev_price) / max(prev_price, 1e-9) >= 0.02:
delta += 0.5
if cur_pnl is not None and prev_pnl is not None and abs(cur_pnl - prev_pnl) >= 0.03:
delta += 0.5
last_leader = state.get("last_top_candidate")
if current_leader and last_leader and current_leader.get("symbol") != last_leader.get("symbol"):
delta += 1.0
for band in ("major", "mid", "meme"):
cur_band = current_layers.get(band, [])
prev_band = last_layers.get(band, [])
cur_l = cur_band[0] if cur_band else None
prev_l = prev_band[0] if prev_band else None
if cur_l and prev_l and cur_l.get("symbol") != prev_l.get("symbol"):
delta += 0.5
if last_market_regime:
if current_market.get("btc_regime") != last_market_regime.get("btc_regime"):
delta += 1.5
if current_market.get("eth_regime") != last_market_regime.get("eth_regime"):
delta += 1.5
if last_portfolio_value not in (None, 0):
portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9)
if portfolio_delta >= 0.05:
delta += 1.0
last_trigger_hard_types = {r.split(":")[0] for r in (state.get("last_trigger_hard_reasons") or [])}
current_hard_types = {r.split(":")[0] for r in hard_reasons}
if current_hard_types - last_trigger_hard_types:
delta += 2.0
return delta
signal_delta = _signal_delta()
effective_cooldown = cooldown_minutes
if signal_delta < 1.0:
effective_cooldown = max(cooldown_minutes, 90)
elif signal_delta >= 2.5:
effective_cooldown = max(0, cooldown_minutes - 15)
cooldown_active = bool(last_triggered_at and now - last_triggered_at < timedelta(minutes=effective_cooldown))
dedup_window = timedelta(minutes=HARD_REASON_DEDUP_MINUTES)
for hr in list(hard_reasons):
last_at = parse_ts(last_hard_reasons_at.get(hr))
if last_at and now - last_at < dedup_window:
hard_reasons.remove(hr)
details.append(f"{hr} 近期已触发,{HARD_REASON_DEDUP_MINUTES}分钟内去重")
hard_trigger = bool(hard_reasons)
if profile.get("dust_mode") and not hard_trigger and soft_score < soft_score_threshold + 1.0:
details.append("微型资金/粉尘仓位模式:抬高软触发门槛,避免无意义分析")
if profile.get("dust_mode") and not profile.get("new_entries_allowed") and any(
r in {"new-leader", "candidate-leader-init"} for r in soft_reasons
):
details.append("当前可用资金低于可执行阈值,新候选币仅做观察,不单独触发深度分析")
soft_score = max(0.0, soft_score - 0.75)
should_analyze = hard_trigger or soft_score >= soft_score_threshold
if cooldown_active and not hard_trigger and should_analyze:
should_analyze = False
details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,软触发先记录不升级")
if cooldown_active and not hard_trigger and reasons and soft_score < soft_score_threshold:
details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,且软信号强度不足 ({soft_score:.2f} < {soft_score_threshold:.2f})")
status = "deep_analysis_required" if should_analyze else "stable"
compact_lines = [
f"状态: {status}",
f"组合净值: ${snapshot['portfolio_value_usdt']:.4f} | 可用USDT: ${snapshot['free_usdt']:.4f}",
f"本地时段: {snapshot['session']} | 时区: {snapshot['timezone']}",
f"BTC/ETH: {market.get('btc_regime')} ({market.get('btc_24h_pct')}%), {market.get('eth_regime')} ({market.get('eth_24h_pct')}%) | 波动分数 {market.get('volatility_score')}",
f"门控画像: capital={profile['capital_band']}, session={profile['session_mode']}, volatility={profile['volatility_mode']}, dust={profile['dust_mode']}",
f"阈值: price={price_trigger:.1%}, pnl={pnl_trigger:.1%}, portfolio={portfolio_trigger:.1%}, candidate={candidate_ratio_trigger:.2f}, cooldown={effective_cooldown}m({cooldown_minutes}m基础), force={force_minutes}m",
f"软信号分: {soft_score:.2f} / {soft_score_threshold:.2f}",
f"信号变化度: {signal_delta:.1f}",
]
if snapshot["positions"]:
compact_lines.append("持仓:")
for pos in snapshot["positions"][:4]:
pnl = pos.get("pnl_pct")
pnl_text = f"{pnl:+.1%}" if pnl is not None else "n/a"
compact_lines.append(
f"- {pos['symbol']}: qty={pos['quantity']}, px={pos.get('last_price')}, pnl={pnl_text}, value=${pos.get('market_value_usdt')}"
)
else:
compact_lines.append("持仓: 当前无现货仓位")
if snapshot["top_candidates"]:
compact_lines.append("候选榜:")
for cand in snapshot["top_candidates"]:
compact_lines.append(
f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}"
)
layers = snapshot.get("top_candidates_layers", {})
for band, band_cands in layers.items():
if band_cands:
compact_lines.append(f"{band} 层:")
for cand in band_cands:
compact_lines.append(
f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}"
)
if details:
compact_lines.append("触发说明:")
for item in details:
compact_lines.append(f"- {item}")
return {
"generated_at": snapshot["generated_at"],
"status": status,
"should_analyze": should_analyze,
"pending_trigger": pending_trigger,
"run_requested": bool(run_requested_at),
"run_requested_at": run_requested_at.isoformat() if run_requested_at else None,
"cooldown_active": cooldown_active,
"effective_cooldown_minutes": effective_cooldown,
"signal_delta": round(signal_delta, 2),
"reasons": reasons,
"hard_reasons": hard_reasons,
"soft_reasons": soft_reasons,
"soft_score": round(soft_score, 3),
"adaptive_profile": profile,
"portfolio_value_usdt": snapshot["portfolio_value_usdt"],
"free_usdt": snapshot["free_usdt"],
"market_regime": snapshot["market_regime"],
"session": snapshot["session"],
"positions": snapshot["positions"],
"top_candidates": snapshot["top_candidates"],
"top_candidates_layers": layers,
"snapshot_hash": snapshot["snapshot_hash"],
"compact_summary": "\n".join(compact_lines),
"details": details,
}