refactor: finish facade migration for precheck and executor

This commit is contained in:
2026-04-15 20:50:38 +08:00
parent e6274d3a00
commit db981e8e5f
8 changed files with 1131 additions and 944 deletions

View File

@@ -56,7 +56,11 @@ The first extraction pass is already live:
- `smart-executor``commands.smart_executor` + `services.smart_executor_service`
- `precheck``commands.precheck` + `services.precheck_service`
- `precheck` internals now also have dedicated service modules for:
- root modules are compatibility facades only:
- `src/coinhunter/precheck.py`
- `src/coinhunter/smart_executor.py`
- `precheck` internals now live in dedicated service modules:
- `services.precheck_core`
- `services.precheck_state`
- `services.precheck_snapshot`
- `services.precheck_analysis`
@@ -65,7 +69,20 @@ This keeps behavior stable while giving the codebase a cleaner landing zone for
## Installation
Editable install:
Dedicated user-local install:
```bash
./scripts/install_local.sh
```
This creates:
- app environment: `~/.local/share/coinhunter-cli/venv`
- launcher: `~/.local/bin/coinhunter`
The launcher behaves like a normal installed CLI and simply forwards into the dedicated virtualenv.
Editable install for development:
```bash
pip install -e .

35
scripts/install_local.sh Executable file
View File

@@ -0,0 +1,35 @@
#!/usr/bin/env bash
set -euo pipefail
APP_HOME="${COINHUNTER_APP_HOME:-$HOME/.local/share/coinhunter-cli}"
VENV_DIR="$APP_HOME/venv"
BIN_DIR="${COINHUNTER_BIN_DIR:-$HOME/.local/bin}"
LAUNCHER="$BIN_DIR/coinhunter"
PYTHON_BIN="${PYTHON:-}"
if [[ -z "$PYTHON_BIN" ]]; then
if command -v python3 >/dev/null 2>&1; then
PYTHON_BIN="$(command -v python3)"
elif command -v python >/dev/null 2>&1; then
PYTHON_BIN="$(command -v python)"
else
echo "error: python3/python not found in PATH" >&2
exit 1
fi
fi
mkdir -p "$APP_HOME" "$BIN_DIR"
"$PYTHON_BIN" -m venv "$VENV_DIR"
"$VENV_DIR/bin/python" -m pip install --upgrade pip setuptools wheel
"$VENV_DIR/bin/python" -m pip install --upgrade "$(pwd)"
cat >"$LAUNCHER" <<EOF
#!/usr/bin/env bash
exec "$VENV_DIR/bin/coinhunter" "\$@"
EOF
chmod +x "$LAUNCHER"
echo "Installed coinhunter into:"
echo " venv: $VENV_DIR"
echo " launcher: $LAUNCHER"

998
src/coinhunter/precheck.py Executable file → Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -2,16 +2,16 @@
from __future__ import annotations
from .. import precheck as precheck_module
from . import precheck_core
def analyze_trigger(snapshot: dict, state: dict) -> dict:
return precheck_module.analyze_trigger(snapshot, state)
return precheck_core.analyze_trigger(snapshot, state)
def build_failure_payload(exc: Exception) -> dict:
return {
"generated_at": precheck_module.utc_iso(),
"generated_at": precheck_core.utc_iso(),
"status": "deep_analysis_required",
"should_analyze": True,
"pending_trigger": True,

View File

@@ -0,0 +1,900 @@
"""Service-owned precheck core logic.
This module holds the reusable implementation. Root-level ``coinhunter.precheck``
is intentionally kept as a compatibility facade for older imports and direct
module execution.
"""
from __future__ import annotations
import hashlib
import json
import os
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from zoneinfo import ZoneInfo
import ccxt
from ..runtime import get_runtime_paths, load_env_file
PATHS = get_runtime_paths()
BASE_DIR = PATHS.root
STATE_DIR = PATHS.state_dir
STATE_FILE = PATHS.precheck_state_file
POSITIONS_FILE = PATHS.positions_file
CONFIG_FILE = PATHS.config_file
ENV_FILE = PATHS.env_file
BASE_PRICE_MOVE_TRIGGER_PCT = 0.025
BASE_PNL_TRIGGER_PCT = 0.03
BASE_PORTFOLIO_MOVE_TRIGGER_PCT = 0.03
BASE_CANDIDATE_SCORE_TRIGGER_RATIO = 1.15
BASE_FORCE_ANALYSIS_AFTER_MINUTES = 180
BASE_COOLDOWN_MINUTES = 45
TOP_CANDIDATES = 10
MIN_ACTIONABLE_USDT = 12.0
MIN_REAL_POSITION_VALUE_USDT = 8.0
BLACKLIST = {"USDC", "BUSD", "TUSD", "FDUSD", "USTC", "PAXG"}
HARD_STOP_PCT = -0.08
HARD_MOON_PCT = 0.25
MIN_CHANGE_PCT = 1.0
MAX_PRICE_CAP = None
HARD_REASON_DEDUP_MINUTES = 15
MAX_PENDING_TRIGGER_MINUTES = 30
MAX_RUN_REQUEST_MINUTES = 20
def utc_now():
return datetime.now(timezone.utc)
def utc_iso():
return utc_now().isoformat()
def parse_ts(value: str | None):
if not value:
return None
try:
ts = datetime.fromisoformat(value)
if ts.tzinfo is None:
ts = ts.replace(tzinfo=timezone.utc)
return ts
except Exception:
return None
def load_json(path: Path, default):
if not path.exists():
return default
try:
return json.loads(path.read_text(encoding="utf-8"))
except Exception:
return default
def load_env():
load_env_file(PATHS)
def load_positions():
return load_json(POSITIONS_FILE, {}).get("positions", [])
def load_state():
return load_json(STATE_FILE, {})
def load_config():
return load_json(CONFIG_FILE, {})
def clear_run_request_fields(state: dict):
state.pop("run_requested_at", None)
state.pop("run_request_note", None)
def sanitize_state_for_stale_triggers(state: dict):
sanitized = dict(state)
notes = []
now = utc_now()
run_requested_at = parse_ts(sanitized.get("run_requested_at"))
last_deep_analysis_at = parse_ts(sanitized.get("last_deep_analysis_at"))
last_triggered_at = parse_ts(sanitized.get("last_triggered_at"))
pending_trigger = bool(sanitized.get("pending_trigger"))
if run_requested_at and last_deep_analysis_at and last_deep_analysis_at >= run_requested_at:
clear_run_request_fields(sanitized)
if pending_trigger and (not last_triggered_at or last_deep_analysis_at >= last_triggered_at):
sanitized["pending_trigger"] = False
sanitized["pending_reasons"] = []
sanitized["last_ack_note"] = (
f"auto-cleared completed trigger at {utc_iso()} because last_deep_analysis_at >= run_requested_at"
)
pending_trigger = False
notes.append(
f"自动清理已完成的 run_requested 标记:最近深度分析时间 {last_deep_analysis_at.isoformat()} >= 请求时间 {run_requested_at.isoformat()}"
)
run_requested_at = None
if run_requested_at and now - run_requested_at > timedelta(minutes=MAX_RUN_REQUEST_MINUTES):
clear_run_request_fields(sanitized)
notes.append(
f"自动清理超时 run_requested 标记:已等待 {(now - run_requested_at).total_seconds() / 60:.1f} 分钟,超过 {MAX_RUN_REQUEST_MINUTES} 分钟"
)
run_requested_at = None
pending_anchor = run_requested_at or last_triggered_at or last_deep_analysis_at
if pending_trigger and pending_anchor and now - pending_anchor > timedelta(minutes=MAX_PENDING_TRIGGER_MINUTES):
sanitized["pending_trigger"] = False
sanitized["pending_reasons"] = []
sanitized["last_ack_note"] = (
f"auto-recovered stale pending trigger at {utc_iso()} after waiting "
f"{(now - pending_anchor).total_seconds() / 60:.1f} minutes"
)
notes.append(
f"自动解除 pending_trigger触发状态已悬挂 {(now - pending_anchor).total_seconds() / 60:.1f} 分钟,超过 {MAX_PENDING_TRIGGER_MINUTES} 分钟"
)
sanitized["_stale_recovery_notes"] = notes
return sanitized
def save_state(state: dict):
STATE_DIR.mkdir(parents=True, exist_ok=True)
state_to_save = dict(state)
state_to_save.pop("_stale_recovery_notes", None)
STATE_FILE.write_text(json.dumps(state_to_save, indent=2, ensure_ascii=False), encoding="utf-8")
def stable_hash(data) -> str:
payload = json.dumps(data, sort_keys=True, ensure_ascii=False, separators=(",", ":"))
return hashlib.sha1(payload.encode("utf-8")).hexdigest()
def get_exchange():
load_env()
api_key = os.getenv("BINANCE_API_KEY")
secret = os.getenv("BINANCE_API_SECRET")
if not api_key or not secret:
raise RuntimeError("Missing BINANCE_API_KEY or BINANCE_API_SECRET in ~/.hermes/.env")
ex = ccxt.binance({
"apiKey": api_key,
"secret": secret,
"options": {"defaultType": "spot"},
"enableRateLimit": True,
})
ex.load_markets()
return ex
def fetch_ohlcv_batch(ex, symbols: set, timeframe: str, limit: int):
results = {}
for sym in sorted(symbols):
try:
ohlcv = ex.fetch_ohlcv(sym, timeframe=timeframe, limit=limit)
if ohlcv and len(ohlcv) >= 2:
results[sym] = ohlcv
except Exception:
pass
return results
def compute_ohlcv_metrics(ohlcv_1h, ohlcv_4h, current_price, volume_24h=None):
metrics = {}
if ohlcv_1h and len(ohlcv_1h) >= 2:
closes = [c[4] for c in ohlcv_1h]
volumes = [c[5] for c in ohlcv_1h]
metrics["change_1h_pct"] = round((closes[-1] - closes[-2]) / closes[-2] * 100, 2) if closes[-2] != 0 else None
if len(closes) >= 5:
metrics["change_4h_pct"] = round((closes[-1] - closes[-5]) / closes[-5] * 100, 2) if closes[-5] != 0 else None
recent_vol = sum(volumes[-4:]) / 4 if len(volumes) >= 4 else None
metrics["volume_1h_avg"] = round(recent_vol, 2) if recent_vol else None
highs = [c[2] for c in ohlcv_1h[-4:]]
lows = [c[3] for c in ohlcv_1h[-4:]]
metrics["high_4h"] = round(max(highs), 8) if highs else None
metrics["low_4h"] = round(min(lows), 8) if lows else None
if ohlcv_4h and len(ohlcv_4h) >= 2:
closes_4h = [c[4] for c in ohlcv_4h]
volumes_4h = [c[5] for c in ohlcv_4h]
metrics["change_4h_pct_from_4h"] = round((closes_4h[-1] - closes_4h[-2]) / closes_4h[-2] * 100, 2) if closes_4h[-2] != 0 else None
recent_vol_4h = sum(volumes_4h[-2:]) / 2 if len(volumes_4h) >= 2 else None
metrics["volume_4h_avg"] = round(recent_vol_4h, 2) if recent_vol_4h else None
highs_4h = [c[2] for c in ohlcv_4h]
lows_4h = [c[3] for c in ohlcv_4h]
metrics["high_24h_calc"] = round(max(highs_4h), 8) if highs_4h else None
metrics["low_24h_calc"] = round(min(lows_4h), 8) if lows_4h else None
if highs_4h and lows_4h:
avg_price = sum(closes_4h) / len(closes_4h)
metrics["volatility_4h_pct"] = round((max(highs_4h) - min(lows_4h)) / avg_price * 100, 2)
if current_price:
if metrics.get("high_4h"):
metrics["distance_from_4h_high_pct"] = round((metrics["high_4h"] - current_price) / metrics["high_4h"] * 100, 2)
if metrics.get("low_4h"):
metrics["distance_from_4h_low_pct"] = round((current_price - metrics["low_4h"]) / metrics["low_4h"] * 100, 2)
if metrics.get("high_24h_calc"):
metrics["distance_from_24h_high_pct"] = round((metrics["high_24h_calc"] - current_price) / metrics["high_24h_calc"] * 100, 2)
if metrics.get("low_24h_calc"):
metrics["distance_from_24h_low_pct"] = round((current_price - metrics["low_24h_calc"]) / metrics["low_24h_calc"] * 100, 2)
if volume_24h and volume_24h > 0 and metrics.get("volume_1h_avg"):
daily_avg_1h = volume_24h / 24
metrics["volume_1h_multiple"] = round(metrics["volume_1h_avg"] / daily_avg_1h, 2)
if volume_24h and volume_24h > 0 and metrics.get("volume_4h_avg"):
daily_avg_4h = volume_24h / 6
metrics["volume_4h_multiple"] = round(metrics["volume_4h_avg"] / daily_avg_4h, 2)
return metrics
def enrich_candidates_and_positions(global_candidates, candidate_layers, positions_view, tickers, ex):
symbols = set()
for c in global_candidates:
symbols.add(c["symbol"])
for p in positions_view:
sym = p.get("symbol")
if sym:
sym_ccxt = norm_symbol(sym)
symbols.add(sym_ccxt)
ohlcv_1h = fetch_ohlcv_batch(ex, symbols, "1h", 24)
ohlcv_4h = fetch_ohlcv_batch(ex, symbols, "4h", 12)
def _apply(target_list):
for item in target_list:
sym = item.get("symbol")
if not sym:
continue
sym_ccxt = norm_symbol(sym)
v24h = to_float(tickers.get(sym_ccxt, {}).get("quoteVolume"))
metrics = compute_ohlcv_metrics(
ohlcv_1h.get(sym_ccxt),
ohlcv_4h.get(sym_ccxt),
item.get("price") or item.get("last_price"),
volume_24h=v24h,
)
item["metrics"] = metrics
_apply(global_candidates)
for band_list in candidate_layers.values():
_apply(band_list)
_apply(positions_view)
return global_candidates, candidate_layers, positions_view
def regime_from_pct(pct: float | None) -> str:
if pct is None:
return "unknown"
if pct >= 2.0:
return "bullish"
if pct <= -2.0:
return "bearish"
return "neutral"
def to_float(value, default=0.0):
try:
if value is None:
return default
return float(value)
except Exception:
return default
def norm_symbol(symbol: str) -> str:
s = symbol.upper().replace("-", "").replace("_", "")
if "/" in s:
return s
if s.endswith("USDT"):
return s[:-4] + "/USDT"
return s
def get_local_now(config: dict):
tz_name = config.get("timezone") or "Asia/Shanghai"
try:
tz = ZoneInfo(tz_name)
except Exception:
tz = ZoneInfo("Asia/Shanghai")
tz_name = "Asia/Shanghai"
return utc_now().astimezone(tz), tz_name
def session_label(local_dt: datetime) -> str:
hour = local_dt.hour
if 0 <= hour < 7:
return "overnight"
if 7 <= hour < 12:
return "asia-morning"
if 12 <= hour < 17:
return "asia-afternoon"
if 17 <= hour < 21:
return "europe-open"
return "us-session"
def _liquidity_score(volume: float) -> float:
return min(1.0, max(0.0, volume / 50_000_000))
def _breakout_score(price: float, avg_price: float | None) -> float:
if not avg_price or avg_price <= 0:
return 0.0
return (price - avg_price) / avg_price
def top_candidates_from_tickers(tickers: dict):
candidates = []
for symbol, ticker in tickers.items():
if not symbol.endswith("/USDT"):
continue
base = symbol.replace("/USDT", "")
if base in BLACKLIST:
continue
if not re.fullmatch(r"[A-Z0-9]{2,20}", base):
continue
price = to_float(ticker.get("last"))
change_pct = to_float(ticker.get("percentage"))
volume = to_float(ticker.get("quoteVolume"))
high = to_float(ticker.get("high"))
low = to_float(ticker.get("low"))
avg_price = to_float(ticker.get("average"), None)
if price <= 0:
continue
if MAX_PRICE_CAP is not None and price > MAX_PRICE_CAP:
continue
if volume < 500_000:
continue
if change_pct < MIN_CHANGE_PCT:
continue
momentum = change_pct / 10.0
liquidity = _liquidity_score(volume)
breakout = _breakout_score(price, avg_price)
score = round(momentum * 0.5 + liquidity * 0.3 + breakout * 0.2, 4)
band = "major" if price >= 10 else "mid" if price >= 1 else "meme"
distance_from_high = (high - price) / max(high, 1e-9) if high else None
candidates.append({
"symbol": symbol,
"base": base,
"price": round(price, 8),
"change_24h_pct": round(change_pct, 2),
"volume_24h": round(volume, 2),
"breakout_pct": round(breakout * 100, 2),
"high_24h": round(high, 8) if high else None,
"low_24h": round(low, 8) if low else None,
"distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None,
"score": score,
"band": band,
})
candidates.sort(key=lambda x: x["score"], reverse=True)
global_top = candidates[:TOP_CANDIDATES]
layers = {"major": [], "mid": [], "meme": []}
for c in candidates:
layers[c["band"]].append(c)
for k in layers:
layers[k] = layers[k][:5]
return global_top, layers
def build_snapshot():
config = load_config()
local_dt, tz_name = get_local_now(config)
ex = get_exchange()
positions = load_positions()
tickers = ex.fetch_tickers()
balances = ex.fetch_balance()["free"]
free_usdt = to_float(balances.get("USDT"))
positions_view = []
total_position_value = 0.0
largest_position_value = 0.0
actionable_positions = 0
for pos in positions:
symbol = pos.get("symbol") or ""
sym_ccxt = norm_symbol(symbol)
ticker = tickers.get(sym_ccxt, {})
last = to_float(ticker.get("last"), None)
qty = to_float(pos.get("quantity"))
avg_cost = to_float(pos.get("avg_cost"), None)
value = round(qty * last, 4) if last is not None else None
pnl_pct = round((last - avg_cost) / avg_cost, 4) if last is not None and avg_cost else None
high = to_float(ticker.get("high"))
low = to_float(ticker.get("low"))
distance_from_high = (high - last) / max(high, 1e-9) if high and last else None
if value is not None:
total_position_value += value
largest_position_value = max(largest_position_value, value)
if value >= MIN_REAL_POSITION_VALUE_USDT:
actionable_positions += 1
positions_view.append({
"symbol": symbol,
"base_asset": pos.get("base_asset"),
"quantity": qty,
"avg_cost": avg_cost,
"last_price": last,
"market_value_usdt": value,
"pnl_pct": pnl_pct,
"high_24h": round(high, 8) if high else None,
"low_24h": round(low, 8) if low else None,
"distance_from_high_pct": round(distance_from_high * 100, 2) if distance_from_high is not None else None,
})
btc_pct = to_float((tickers.get("BTC/USDT") or {}).get("percentage"), None)
eth_pct = to_float((tickers.get("ETH/USDT") or {}).get("percentage"), None)
global_candidates, candidate_layers = top_candidates_from_tickers(tickers)
global_candidates, candidate_layers, positions_view = enrich_candidates_and_positions(
global_candidates, candidate_layers, positions_view, tickers, ex
)
leader_score = global_candidates[0]["score"] if global_candidates else 0.0
portfolio_value = round(free_usdt + total_position_value, 4)
volatility_score = round(max(abs(to_float(btc_pct, 0)), abs(to_float(eth_pct, 0))), 2)
position_structure = [
{
"symbol": p.get("symbol"),
"base_asset": p.get("base_asset"),
"quantity": round(to_float(p.get("quantity"), 0), 10),
"avg_cost": to_float(p.get("avg_cost"), None),
}
for p in positions_view
]
snapshot = {
"generated_at": utc_iso(),
"timezone": tz_name,
"local_time": local_dt.isoformat(),
"session": session_label(local_dt),
"free_usdt": round(free_usdt, 4),
"portfolio_value_usdt": portfolio_value,
"largest_position_value_usdt": round(largest_position_value, 4),
"actionable_positions": actionable_positions,
"positions": positions_view,
"positions_hash": stable_hash(position_structure),
"top_candidates": global_candidates,
"top_candidates_layers": candidate_layers,
"candidates_hash": stable_hash({"global": global_candidates, "layers": candidate_layers}),
"market_regime": {
"btc_24h_pct": round(btc_pct, 2) if btc_pct is not None else None,
"btc_regime": regime_from_pct(btc_pct),
"eth_24h_pct": round(eth_pct, 2) if eth_pct is not None else None,
"eth_regime": regime_from_pct(eth_pct),
"volatility_score": volatility_score,
"leader_score": round(leader_score, 4),
},
}
snapshot["snapshot_hash"] = stable_hash({
"portfolio_value_usdt": snapshot["portfolio_value_usdt"],
"positions_hash": snapshot["positions_hash"],
"candidates_hash": snapshot["candidates_hash"],
"market_regime": snapshot["market_regime"],
"session": snapshot["session"],
})
return snapshot
def build_adaptive_profile(snapshot: dict):
portfolio_value = snapshot.get("portfolio_value_usdt", 0)
free_usdt = snapshot.get("free_usdt", 0)
session = snapshot.get("session")
market = snapshot.get("market_regime", {})
volatility_score = to_float(market.get("volatility_score"), 0)
leader_score = to_float(market.get("leader_score"), 0)
actionable_positions = int(snapshot.get("actionable_positions") or 0)
largest_position_value = to_float(snapshot.get("largest_position_value_usdt"), 0)
capital_band = "micro" if portfolio_value < 25 else "small" if portfolio_value < 100 else "normal"
session_mode = "quiet" if session in {"overnight", "asia-morning"} else "active"
volatility_mode = "high" if volatility_score >= 2.5 or leader_score >= 120 else "normal"
dust_mode = free_usdt < MIN_ACTIONABLE_USDT and largest_position_value < MIN_REAL_POSITION_VALUE_USDT
price_trigger = BASE_PRICE_MOVE_TRIGGER_PCT
pnl_trigger = BASE_PNL_TRIGGER_PCT
portfolio_trigger = BASE_PORTFOLIO_MOVE_TRIGGER_PCT
candidate_ratio = BASE_CANDIDATE_SCORE_TRIGGER_RATIO
force_minutes = BASE_FORCE_ANALYSIS_AFTER_MINUTES
cooldown_minutes = BASE_COOLDOWN_MINUTES
soft_score_threshold = 2.0
if capital_band == "micro":
price_trigger += 0.02
pnl_trigger += 0.03
portfolio_trigger += 0.04
candidate_ratio += 0.25
force_minutes += 180
cooldown_minutes += 30
soft_score_threshold += 1.0
elif capital_band == "small":
price_trigger += 0.01
pnl_trigger += 0.01
portfolio_trigger += 0.01
candidate_ratio += 0.1
force_minutes += 60
cooldown_minutes += 10
soft_score_threshold += 0.5
if session_mode == "quiet":
price_trigger += 0.01
pnl_trigger += 0.01
portfolio_trigger += 0.01
candidate_ratio += 0.05
soft_score_threshold += 0.5
else:
force_minutes = max(120, force_minutes - 30)
if volatility_mode == "high":
price_trigger = max(0.02, price_trigger - 0.01)
pnl_trigger = max(0.025, pnl_trigger - 0.005)
portfolio_trigger = max(0.025, portfolio_trigger - 0.005)
candidate_ratio = max(1.1, candidate_ratio - 0.1)
cooldown_minutes = max(20, cooldown_minutes - 10)
soft_score_threshold = max(1.0, soft_score_threshold - 0.5)
if dust_mode:
candidate_ratio += 0.3
force_minutes += 180
cooldown_minutes += 30
soft_score_threshold += 1.5
return {
"capital_band": capital_band,
"session_mode": session_mode,
"volatility_mode": volatility_mode,
"dust_mode": dust_mode,
"price_move_trigger_pct": round(price_trigger, 4),
"pnl_trigger_pct": round(pnl_trigger, 4),
"portfolio_move_trigger_pct": round(portfolio_trigger, 4),
"candidate_score_trigger_ratio": round(candidate_ratio, 4),
"force_analysis_after_minutes": int(force_minutes),
"cooldown_minutes": int(cooldown_minutes),
"soft_score_threshold": round(soft_score_threshold, 2),
"new_entries_allowed": free_usdt >= MIN_ACTIONABLE_USDT and not dust_mode,
"switching_allowed": actionable_positions > 0 or portfolio_value >= 25,
}
def _candidate_weight(snapshot: dict, profile: dict) -> float:
if not profile.get("new_entries_allowed"):
return 0.5
if profile.get("volatility_mode") == "high":
return 1.5
if snapshot.get("session") in {"europe-open", "us-session"}:
return 1.25
return 1.0
def analyze_trigger(snapshot: dict, state: dict):
reasons = []
details = list(state.get("_stale_recovery_notes", []))
hard_reasons = []
soft_reasons = []
soft_score = 0.0
profile = build_adaptive_profile(snapshot)
market = snapshot.get("market_regime", {})
now = utc_now()
last_positions_hash = state.get("last_positions_hash")
last_portfolio_value = state.get("last_portfolio_value_usdt")
last_market_regime = state.get("last_market_regime", {})
last_positions_map = state.get("last_positions_map", {})
last_top_candidate = state.get("last_top_candidate")
pending_trigger = bool(state.get("pending_trigger"))
run_requested_at = parse_ts(state.get("run_requested_at"))
last_deep_analysis_at = parse_ts(state.get("last_deep_analysis_at"))
last_triggered_at = parse_ts(state.get("last_triggered_at"))
last_trigger_snapshot_hash = state.get("last_trigger_snapshot_hash")
last_hard_reasons_at = state.get("last_hard_reasons_at", {})
price_trigger = profile["price_move_trigger_pct"]
pnl_trigger = profile["pnl_trigger_pct"]
portfolio_trigger = profile["portfolio_move_trigger_pct"]
candidate_ratio_trigger = profile["candidate_score_trigger_ratio"]
force_minutes = profile["force_analysis_after_minutes"]
cooldown_minutes = profile["cooldown_minutes"]
soft_score_threshold = profile["soft_score_threshold"]
if pending_trigger:
reasons.append("pending-trigger-unacked")
hard_reasons.append("pending-trigger-unacked")
details.append("上次已触发深度分析但尚未确认完成")
if run_requested_at:
details.append(f"外部门控已在 {run_requested_at.isoformat()} 请求运行分析任务")
if not last_deep_analysis_at:
reasons.append("first-analysis")
hard_reasons.append("first-analysis")
details.append("尚未记录过深度分析")
elif now - last_deep_analysis_at >= timedelta(minutes=force_minutes):
reasons.append("stale-analysis")
hard_reasons.append("stale-analysis")
details.append(f"距离上次深度分析已超过 {force_minutes} 分钟")
if last_positions_hash and snapshot["positions_hash"] != last_positions_hash:
reasons.append("positions-changed")
hard_reasons.append("positions-changed")
details.append("持仓结构发生变化")
if last_portfolio_value not in (None, 0):
portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9)
if portfolio_delta >= portfolio_trigger:
if portfolio_delta >= 1.0:
reasons.append("portfolio-extreme-move")
hard_reasons.append("portfolio-extreme-move")
details.append(f"组合净值剧烈变化 {portfolio_delta:.1%},超过 100%,视为硬触发")
else:
reasons.append("portfolio-move")
soft_reasons.append("portfolio-move")
soft_score += 1.0
details.append(f"组合净值变化 {portfolio_delta:.1%},阈值 {portfolio_trigger:.1%}")
for pos in snapshot["positions"]:
symbol = pos["symbol"]
prev = last_positions_map.get(symbol, {})
cur_price = pos.get("last_price")
prev_price = prev.get("last_price")
cur_pnl = pos.get("pnl_pct")
prev_pnl = prev.get("pnl_pct")
market_value = to_float(pos.get("market_value_usdt"), 0)
actionable_position = market_value >= MIN_REAL_POSITION_VALUE_USDT
if cur_price and prev_price:
price_move = abs(cur_price - prev_price) / max(prev_price, 1e-9)
if price_move >= price_trigger:
reasons.append(f"price-move:{symbol}")
soft_reasons.append(f"price-move:{symbol}")
soft_score += 1.0 if actionable_position else 0.4
details.append(f"{symbol} 价格变化 {price_move:.1%},阈值 {price_trigger:.1%}")
if cur_pnl is not None and prev_pnl is not None:
pnl_move = abs(cur_pnl - prev_pnl)
if pnl_move >= pnl_trigger:
reasons.append(f"pnl-move:{symbol}")
soft_reasons.append(f"pnl-move:{symbol}")
soft_score += 1.0 if actionable_position else 0.4
details.append(f"{symbol} 盈亏变化 {pnl_move:.1%},阈值 {pnl_trigger:.1%}")
if cur_pnl is not None:
stop_band = -0.06 if actionable_position else -0.12
take_band = 0.14 if actionable_position else 0.25
if cur_pnl <= stop_band or cur_pnl >= take_band:
reasons.append(f"risk-band:{symbol}")
hard_reasons.append(f"risk-band:{symbol}")
details.append(f"{symbol} 接近执行阈值,当前盈亏 {cur_pnl:.1%}")
if cur_pnl <= HARD_STOP_PCT:
reasons.append(f"hard-stop:{symbol}")
hard_reasons.append(f"hard-stop:{symbol}")
details.append(f"{symbol} 盈亏超过 {HARD_STOP_PCT:.1%},触发紧急硬触发")
current_market = snapshot.get("market_regime", {})
if last_market_regime:
if current_market.get("btc_regime") != last_market_regime.get("btc_regime"):
reasons.append("btc-regime-change")
hard_reasons.append("btc-regime-change")
details.append(f"BTC 由 {last_market_regime.get('btc_regime')} 切换为 {current_market.get('btc_regime')}")
if current_market.get("eth_regime") != last_market_regime.get("eth_regime"):
reasons.append("eth-regime-change")
hard_reasons.append("eth-regime-change")
details.append(f"ETH 由 {last_market_regime.get('eth_regime')} 切换为 {current_market.get('eth_regime')}")
for cand in snapshot.get("top_candidates", []):
if cand.get("change_24h_pct", 0) >= HARD_MOON_PCT * 100:
reasons.append(f"hard-moon:{cand['symbol']}")
hard_reasons.append(f"hard-moon:{cand['symbol']}")
details.append(f"候选币 {cand['symbol']} 24h 涨幅 {cand['change_24h_pct']:.1f}%,触发强势硬触发")
candidate_weight = _candidate_weight(snapshot, profile)
last_layers = state.get("last_candidates_layers", {})
current_layers = snapshot.get("top_candidates_layers", {})
for band in ("major", "mid", "meme"):
cur_band = current_layers.get(band, [])
prev_band = last_layers.get(band, [])
cur_leader = cur_band[0] if cur_band else None
prev_leader = prev_band[0] if prev_band else None
if cur_leader and prev_leader and cur_leader["symbol"] != prev_leader["symbol"]:
score_ratio = cur_leader.get("score", 0) / max(prev_leader.get("score", 0.0001), 0.0001)
if score_ratio >= candidate_ratio_trigger:
reasons.append(f"new-leader-{band}:{cur_leader['symbol']}")
soft_reasons.append(f"new-leader-{band}:{cur_leader['symbol']}")
soft_score += candidate_weight * 0.7
details.append(
f"{band} 层新榜首 {cur_leader['symbol']} 替代 {prev_leader['symbol']}score 比例 {score_ratio:.2f}"
)
current_leader = snapshot.get("top_candidates", [{}])[0] if snapshot.get("top_candidates") else None
if last_top_candidate and current_leader:
if current_leader.get("symbol") != last_top_candidate.get("symbol"):
score_ratio = current_leader.get("score", 0) / max(last_top_candidate.get("score", 0.0001), 0.0001)
if score_ratio >= candidate_ratio_trigger:
reasons.append("new-leader")
soft_reasons.append("new-leader")
soft_score += candidate_weight
details.append(
f"新候选币 {current_leader.get('symbol')} 领先上次榜首score 比例 {score_ratio:.2f},阈值 {candidate_ratio_trigger:.2f}"
)
elif current_leader and not last_top_candidate:
reasons.append("candidate-leader-init")
soft_reasons.append("candidate-leader-init")
soft_score += candidate_weight
details.append(f"首次记录候选榜首 {current_leader.get('symbol')}")
def _signal_delta() -> float:
delta = 0.0
if last_trigger_snapshot_hash and snapshot.get("snapshot_hash") != last_trigger_snapshot_hash:
delta += 0.5
if snapshot["positions_hash"] != last_positions_hash:
delta += 1.5
for pos in snapshot["positions"]:
symbol = pos["symbol"]
prev = last_positions_map.get(symbol, {})
cur_price = pos.get("last_price")
prev_price = prev.get("last_price")
cur_pnl = pos.get("pnl_pct")
prev_pnl = prev.get("pnl_pct")
if cur_price and prev_price and abs(cur_price - prev_price) / max(prev_price, 1e-9) >= 0.02:
delta += 0.5
if cur_pnl is not None and prev_pnl is not None and abs(cur_pnl - prev_pnl) >= 0.03:
delta += 0.5
last_leader = state.get("last_top_candidate")
if current_leader and last_leader and current_leader.get("symbol") != last_leader.get("symbol"):
delta += 1.0
for band in ("major", "mid", "meme"):
cur_band = current_layers.get(band, [])
prev_band = last_layers.get(band, [])
cur_l = cur_band[0] if cur_band else None
prev_l = prev_band[0] if prev_band else None
if cur_l and prev_l and cur_l.get("symbol") != prev_l.get("symbol"):
delta += 0.5
if last_market_regime:
if current_market.get("btc_regime") != last_market_regime.get("btc_regime"):
delta += 1.5
if current_market.get("eth_regime") != last_market_regime.get("eth_regime"):
delta += 1.5
if last_portfolio_value not in (None, 0):
portfolio_delta = abs(snapshot["portfolio_value_usdt"] - last_portfolio_value) / max(last_portfolio_value, 1e-9)
if portfolio_delta >= 0.05:
delta += 1.0
last_trigger_hard_types = {r.split(":")[0] for r in (state.get("last_trigger_hard_reasons") or [])}
current_hard_types = {r.split(":")[0] for r in hard_reasons}
if current_hard_types - last_trigger_hard_types:
delta += 2.0
return delta
signal_delta = _signal_delta()
effective_cooldown = cooldown_minutes
if signal_delta < 1.0:
effective_cooldown = max(cooldown_minutes, 90)
elif signal_delta >= 2.5:
effective_cooldown = max(0, cooldown_minutes - 15)
cooldown_active = bool(last_triggered_at and now - last_triggered_at < timedelta(minutes=effective_cooldown))
dedup_window = timedelta(minutes=HARD_REASON_DEDUP_MINUTES)
for hr in list(hard_reasons):
last_at = parse_ts(last_hard_reasons_at.get(hr))
if last_at and now - last_at < dedup_window:
hard_reasons.remove(hr)
details.append(f"{hr} 近期已触发,{HARD_REASON_DEDUP_MINUTES}分钟内去重")
hard_trigger = bool(hard_reasons)
if profile.get("dust_mode") and not hard_trigger and soft_score < soft_score_threshold + 1.0:
details.append("微型资金/粉尘仓位模式:抬高软触发门槛,避免无意义分析")
if profile.get("dust_mode") and not profile.get("new_entries_allowed") and any(
r in {"new-leader", "candidate-leader-init"} for r in soft_reasons
):
details.append("当前可用资金低于可执行阈值,新候选币仅做观察,不单独触发深度分析")
soft_score = max(0.0, soft_score - 0.75)
should_analyze = hard_trigger or soft_score >= soft_score_threshold
if cooldown_active and not hard_trigger and should_analyze:
should_analyze = False
details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,软触发先记录不升级")
if cooldown_active and not hard_trigger and reasons and soft_score < soft_score_threshold:
details.append(f"处于 {cooldown_minutes} 分钟冷却窗口,且软信号强度不足 ({soft_score:.2f} < {soft_score_threshold:.2f})")
status = "deep_analysis_required" if should_analyze else "stable"
compact_lines = [
f"状态: {status}",
f"组合净值: ${snapshot['portfolio_value_usdt']:.4f} | 可用USDT: ${snapshot['free_usdt']:.4f}",
f"本地时段: {snapshot['session']} | 时区: {snapshot['timezone']}",
f"BTC/ETH: {market.get('btc_regime')} ({market.get('btc_24h_pct')}%), {market.get('eth_regime')} ({market.get('eth_24h_pct')}%) | 波动分数 {market.get('volatility_score')}",
f"门控画像: capital={profile['capital_band']}, session={profile['session_mode']}, volatility={profile['volatility_mode']}, dust={profile['dust_mode']}",
f"阈值: price={price_trigger:.1%}, pnl={pnl_trigger:.1%}, portfolio={portfolio_trigger:.1%}, candidate={candidate_ratio_trigger:.2f}, cooldown={effective_cooldown}m({cooldown_minutes}m基础), force={force_minutes}m",
f"软信号分: {soft_score:.2f} / {soft_score_threshold:.2f}",
f"信号变化度: {signal_delta:.1f}",
]
if snapshot["positions"]:
compact_lines.append("持仓:")
for pos in snapshot["positions"][:4]:
pnl = pos.get("pnl_pct")
pnl_text = f"{pnl:+.1%}" if pnl is not None else "n/a"
compact_lines.append(
f"- {pos['symbol']}: qty={pos['quantity']}, px={pos.get('last_price')}, pnl={pnl_text}, value=${pos.get('market_value_usdt')}"
)
else:
compact_lines.append("持仓: 当前无现货仓位")
if snapshot["top_candidates"]:
compact_lines.append("候选榜:")
for cand in snapshot["top_candidates"]:
compact_lines.append(
f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}"
)
layers = snapshot.get("top_candidates_layers", {})
for band, band_cands in layers.items():
if band_cands:
compact_lines.append(f"{band} 层:")
for cand in band_cands:
compact_lines.append(
f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}"
)
if details:
compact_lines.append("触发说明:")
for item in details:
compact_lines.append(f"- {item}")
return {
"generated_at": snapshot["generated_at"],
"status": status,
"should_analyze": should_analyze,
"pending_trigger": pending_trigger,
"run_requested": bool(run_requested_at),
"run_requested_at": run_requested_at.isoformat() if run_requested_at else None,
"cooldown_active": cooldown_active,
"effective_cooldown_minutes": effective_cooldown,
"signal_delta": round(signal_delta, 2),
"reasons": reasons,
"hard_reasons": hard_reasons,
"soft_reasons": soft_reasons,
"soft_score": round(soft_score, 3),
"adaptive_profile": profile,
"portfolio_value_usdt": snapshot["portfolio_value_usdt"],
"free_usdt": snapshot["free_usdt"],
"market_regime": snapshot["market_regime"],
"session": snapshot["session"],
"positions": snapshot["positions"],
"top_candidates": snapshot["top_candidates"],
"top_candidates_layers": layers,
"snapshot_hash": snapshot["snapshot_hash"],
"compact_summary": "\n".join(compact_lines),
"details": details,
}
def update_state_after_observation(state: dict, snapshot: dict, analysis: dict):
new_state = dict(state)
new_state.update({
"last_observed_at": snapshot["generated_at"],
"last_snapshot_hash": snapshot["snapshot_hash"],
"last_positions_hash": snapshot["positions_hash"],
"last_candidates_hash": snapshot["candidates_hash"],
"last_portfolio_value_usdt": snapshot["portfolio_value_usdt"],
"last_market_regime": snapshot["market_regime"],
"last_positions_map": {
p["symbol"]: {"last_price": p.get("last_price"), "pnl_pct": p.get("pnl_pct")}
for p in snapshot["positions"]
},
"last_top_candidate": snapshot["top_candidates"][0] if snapshot["top_candidates"] else None,
"last_candidates_layers": snapshot.get("top_candidates_layers", {}),
"last_adaptive_profile": analysis.get("adaptive_profile", {}),
})
if analysis["should_analyze"]:
new_state["pending_trigger"] = True
new_state["pending_reasons"] = analysis["details"]
new_state["last_triggered_at"] = snapshot["generated_at"]
new_state["last_trigger_snapshot_hash"] = snapshot["snapshot_hash"]
new_state["last_trigger_hard_reasons"] = analysis.get("hard_reasons", [])
new_state["last_trigger_signal_delta"] = analysis.get("signal_delta", 0.0)
last_hard_reasons_at = dict(state.get("last_hard_reasons_at", {}))
for hr in analysis.get("hard_reasons", []):
last_hard_reasons_at[hr] = snapshot["generated_at"]
cutoff = utc_now() - timedelta(hours=24)
pruned = {k: v for k, v in last_hard_reasons_at.items() if parse_ts(v) and parse_ts(v) > cutoff}
new_state["last_hard_reasons_at"] = pruned
return new_state

View File

@@ -2,8 +2,8 @@
from __future__ import annotations
from .. import precheck as precheck_module
from . import precheck_core
def build_snapshot() -> dict:
return precheck_module.build_snapshot()
return precheck_core.build_snapshot()

View File

@@ -4,28 +4,28 @@ from __future__ import annotations
import json
from .. import precheck as precheck_module
from . import precheck_core
def load_state() -> dict:
return precheck_module.load_state()
return precheck_core.load_state()
def save_state(state: dict) -> None:
precheck_module.save_state(state)
precheck_core.save_state(state)
def sanitize_state_for_stale_triggers(state: dict) -> dict:
return precheck_module.sanitize_state_for_stale_triggers(state)
return precheck_core.sanitize_state_for_stale_triggers(state)
def update_state_after_observation(state: dict, snapshot: dict, analysis: dict) -> dict:
return precheck_module.update_state_after_observation(state, snapshot, analysis)
return precheck_core.update_state_after_observation(state, snapshot, analysis)
def mark_run_requested(note: str = "") -> dict:
state = load_state()
state["run_requested_at"] = precheck_module.utc_iso()
state["run_requested_at"] = precheck_core.utc_iso()
state["run_request_note"] = note
save_state(state)
payload = {"ok": True, "run_requested_at": state["run_requested_at"], "note": note}
@@ -35,7 +35,7 @@ def mark_run_requested(note: str = "") -> dict:
def ack_analysis(note: str = "") -> dict:
state = load_state()
state["last_deep_analysis_at"] = precheck_module.utc_iso()
state["last_deep_analysis_at"] = precheck_core.utc_iso()
state["pending_trigger"] = False
state["pending_reasons"] = []
state["last_ack_note"] = note

97
src/coinhunter/smart_executor.py Executable file → Normal file
View File

@@ -1,28 +1,99 @@
#!/usr/bin/env python3
"""Coin Hunter robust smart executor — compatibility facade."""
"""Backward-compatible facade for smart executor workflows.
The executable implementation lives in ``coinhunter.services.smart_executor_service``.
This module stays importable for older callers without importing the whole trading
stack up front.
"""
from __future__ import annotations
import sys
from importlib import import_module
from .runtime import get_runtime_paths, load_env_file
from .services.trade_common import CST, is_dry_run, USDT_BUFFER_PCT, MIN_REMAINING_DUST_USDT, log, bj_now_iso, set_dry_run
from .services.file_utils import locked_file, atomic_write_json, load_json_locked, save_json_locked
from .services.smart_executor_parser import build_parser, normalize_legacy_argv, parse_cli_args, cli_action_args
from .services.execution_state import default_decision_id, record_execution_state, get_execution_state, load_executions, save_executions
from .services.portfolio_service import load_positions, save_positions, upsert_position, reconcile_positions_with_exchange
from .services.exchange_service import get_exchange, norm_symbol, storage_symbol, fetch_balances, build_market_snapshot, market_and_ticker, floor_to_step, prepare_buy_quantity, prepare_sell_quantity
from .services.trade_execution import build_decision_context, market_sell, market_buy, action_sell_all, action_buy, action_rebalance, command_status, command_balances
from .services.smart_executor_service import run as _run_service
PATHS = get_runtime_paths()
ENV_FILE = PATHS.env_file
_EXPORT_MAP = {
"PATHS": (".runtime", "get_runtime_paths"),
"ENV_FILE": (".runtime", "get_runtime_paths"),
"load_env_file": (".runtime", "load_env_file"),
"CST": (".services.trade_common", "CST"),
"USDT_BUFFER_PCT": (".services.trade_common", "USDT_BUFFER_PCT"),
"MIN_REMAINING_DUST_USDT": (".services.trade_common", "MIN_REMAINING_DUST_USDT"),
"is_dry_run": (".services.trade_common", "is_dry_run"),
"log": (".services.trade_common", "log"),
"bj_now_iso": (".services.trade_common", "bj_now_iso"),
"set_dry_run": (".services.trade_common", "set_dry_run"),
"locked_file": (".services.file_utils", "locked_file"),
"atomic_write_json": (".services.file_utils", "atomic_write_json"),
"load_json_locked": (".services.file_utils", "load_json_locked"),
"save_json_locked": (".services.file_utils", "save_json_locked"),
"build_parser": (".services.smart_executor_parser", "build_parser"),
"normalize_legacy_argv": (".services.smart_executor_parser", "normalize_legacy_argv"),
"parse_cli_args": (".services.smart_executor_parser", "parse_cli_args"),
"cli_action_args": (".services.smart_executor_parser", "cli_action_args"),
"default_decision_id": (".services.execution_state", "default_decision_id"),
"record_execution_state": (".services.execution_state", "record_execution_state"),
"get_execution_state": (".services.execution_state", "get_execution_state"),
"load_executions": (".services.execution_state", "load_executions"),
"save_executions": (".services.execution_state", "save_executions"),
"load_positions": (".services.portfolio_service", "load_positions"),
"save_positions": (".services.portfolio_service", "save_positions"),
"upsert_position": (".services.portfolio_service", "upsert_position"),
"reconcile_positions_with_exchange": (".services.portfolio_service", "reconcile_positions_with_exchange"),
"get_exchange": (".services.exchange_service", "get_exchange"),
"norm_symbol": (".services.exchange_service", "norm_symbol"),
"storage_symbol": (".services.exchange_service", "storage_symbol"),
"fetch_balances": (".services.exchange_service", "fetch_balances"),
"build_market_snapshot": (".services.exchange_service", "build_market_snapshot"),
"market_and_ticker": (".services.exchange_service", "market_and_ticker"),
"floor_to_step": (".services.exchange_service", "floor_to_step"),
"prepare_buy_quantity": (".services.exchange_service", "prepare_buy_quantity"),
"prepare_sell_quantity": (".services.exchange_service", "prepare_sell_quantity"),
"build_decision_context": (".services.trade_execution", "build_decision_context"),
"market_sell": (".services.trade_execution", "market_sell"),
"market_buy": (".services.trade_execution", "market_buy"),
"action_sell_all": (".services.trade_execution", "action_sell_all"),
"action_buy": (".services.trade_execution", "action_buy"),
"action_rebalance": (".services.trade_execution", "action_rebalance"),
"command_status": (".services.trade_execution", "command_status"),
"command_balances": (".services.trade_execution", "command_balances"),
}
__all__ = sorted(set(_EXPORT_MAP) | {"ENV_FILE", "PATHS", "load_env", "main"})
def __getattr__(name: str):
if name == "PATHS":
runtime = import_module(".runtime", __package__)
return runtime.get_runtime_paths()
if name == "ENV_FILE":
runtime = import_module(".runtime", __package__)
return runtime.get_runtime_paths().env_file
if name == "load_env":
return load_env
if name not in _EXPORT_MAP:
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
module_name, attr_name = _EXPORT_MAP[name]
module = import_module(module_name, __package__)
if name == "PATHS":
return getattr(module, attr_name)()
if name == "ENV_FILE":
return getattr(module, attr_name)().env_file
return getattr(module, attr_name)
def __dir__():
return sorted(set(globals()) | set(__all__))
def load_env():
load_env_file(PATHS)
runtime = import_module(".runtime", __package__)
runtime.load_env_file(runtime.get_runtime_paths())
def main(argv=None):
return _run_service(argv)
return _run_service(sys.argv[1:] if argv is None else argv)
if __name__ == "__main__":