"""Trigger analysis logic for precheck.""" from __future__ import annotations from datetime import timedelta from .adaptive_profile import _candidate_weight, build_adaptive_profile from .data_utils import to_float from .precheck_constants import ( HARD_MOON_PCT, HARD_REASON_DEDUP_MINUTES, HARD_STOP_PCT, MIN_REAL_POSITION_VALUE_USDT, ) from .time_utils import parse_ts, utc_now def analyze_trigger(snapshot: dict, state: dict): reasons = [] details = list(state.get("_stale_recovery_notes", [])) hard_reasons = [] soft_reasons = [] soft_score = 0.0 profile = build_adaptive_profile(snapshot) market = snapshot.get("market_regime", {}) now = utc_now() last_positions_hash = state.get("last_positions_hash") last_portfolio_value = state.get("last_portfolio_value_usdt") last_market_regime = state.get("last_market_regime", {}) last_positions_map = state.get("last_positions_map", {}) last_top_candidate = state.get("last_top_candidate") pending_trigger = bool(state.get("pending_trigger")) run_requested_at = parse_ts(state.get("run_requested_at")) last_deep_analysis_at = parse_ts(state.get("last_deep_analysis_at")) last_triggered_at = parse_ts(state.get("last_triggered_at")) last_trigger_snapshot_hash = state.get("last_trigger_snapshot_hash") last_hard_reasons_at = state.get("last_hard_reasons_at", {}) price_trigger = profile["price_move_trigger_pct"] pnl_trigger = profile["pnl_trigger_pct"] portfolio_trigger = profile["portfolio_move_trigger_pct"] candidate_ratio_trigger = profile["candidate_score_trigger_ratio"] force_minutes = profile["force_analysis_after_minutes"] cooldown_minutes = profile["cooldown_minutes"] soft_score_threshold = profile["soft_score_threshold"] if pending_trigger: reasons.append("pending-trigger-unacked") hard_reasons.append("pending-trigger-unacked") details.append("Previous deep analysis trigger has not been acknowledged yet") if run_requested_at: details.append(f"External gate requested analysis at {run_requested_at.isoformat()}") if not last_deep_analysis_at: reasons.append("first-analysis") hard_reasons.append("first-analysis") details.append("No deep analysis has been recorded yet") elif now - last_deep_analysis_at >= timedelta(minutes=force_minutes): reasons.append("stale-analysis") hard_reasons.append("stale-analysis") details.append(f"Time since last deep analysis exceeds {force_minutes} minutes") if last_positions_hash and snapshot["positions_hash"] != last_positions_hash: reasons.append("positions-changed") hard_reasons.append("positions-changed") details.append("Position structure has changed") if last_portfolio_value not in (None, 0): lpf = float(str(last_portfolio_value)) portfolio_delta = abs(snapshot["portfolio_value_usdt"] - lpf) / max(lpf, 1e-9) if portfolio_delta >= portfolio_trigger: if portfolio_delta >= 1.0: reasons.append("portfolio-extreme-move") hard_reasons.append("portfolio-extreme-move") details.append(f"Portfolio value moved extremely {portfolio_delta:.1%}, exceeding 100%, treated as hard trigger") else: reasons.append("portfolio-move") soft_reasons.append("portfolio-move") soft_score += 1.0 details.append(f"Portfolio value moved {portfolio_delta:.1%}, threshold {portfolio_trigger:.1%}") for pos in snapshot["positions"]: symbol = pos["symbol"] prev = last_positions_map.get(symbol, {}) cur_price = pos.get("last_price") prev_price = prev.get("last_price") cur_pnl = pos.get("pnl_pct") prev_pnl = prev.get("pnl_pct") market_value = to_float(pos.get("market_value_usdt"), 0) actionable_position = market_value >= MIN_REAL_POSITION_VALUE_USDT if cur_price and prev_price: price_move = abs(cur_price - prev_price) / max(prev_price, 1e-9) if price_move >= price_trigger: reasons.append(f"price-move:{symbol}") soft_reasons.append(f"price-move:{symbol}") soft_score += 1.0 if actionable_position else 0.4 details.append(f"{symbol} price moved {price_move:.1%}, threshold {price_trigger:.1%}") if cur_pnl is not None and prev_pnl is not None: pnl_move = abs(cur_pnl - prev_pnl) if pnl_move >= pnl_trigger: reasons.append(f"pnl-move:{symbol}") soft_reasons.append(f"pnl-move:{symbol}") soft_score += 1.0 if actionable_position else 0.4 details.append(f"{symbol} PnL moved {pnl_move:.1%}, threshold {pnl_trigger:.1%}") if cur_pnl is not None: stop_band = -0.06 if actionable_position else -0.12 take_band = 0.14 if actionable_position else 0.25 if cur_pnl <= stop_band or cur_pnl >= take_band: reasons.append(f"risk-band:{symbol}") hard_reasons.append(f"risk-band:{symbol}") details.append(f"{symbol} near execution threshold, current PnL {cur_pnl:.1%}") if cur_pnl <= HARD_STOP_PCT: reasons.append(f"hard-stop:{symbol}") hard_reasons.append(f"hard-stop:{symbol}") details.append(f"{symbol} PnL exceeded {HARD_STOP_PCT:.1%}, emergency hard trigger") current_market = snapshot.get("market_regime", {}) if last_market_regime: if current_market.get("btc_regime") != last_market_regime.get("btc_regime"): reasons.append("btc-regime-change") hard_reasons.append("btc-regime-change") details.append(f"BTC regime changed from {last_market_regime.get('btc_regime')} to {current_market.get('btc_regime')}") if current_market.get("eth_regime") != last_market_regime.get("eth_regime"): reasons.append("eth-regime-change") hard_reasons.append("eth-regime-change") details.append(f"ETH regime changed from {last_market_regime.get('eth_regime')} to {current_market.get('eth_regime')}") for cand in snapshot.get("top_candidates", []): if cand.get("change_24h_pct", 0) >= HARD_MOON_PCT * 100: reasons.append(f"hard-moon:{cand['symbol']}") hard_reasons.append(f"hard-moon:{cand['symbol']}") details.append(f"Candidate {cand['symbol']} 24h change {cand['change_24h_pct']:.1f}%, hard moon trigger") candidate_weight = _candidate_weight(snapshot, profile) last_layers = state.get("last_candidates_layers", {}) current_layers = snapshot.get("top_candidates_layers", {}) for band in ("major", "mid", "meme"): cur_band = current_layers.get(band, []) prev_band = last_layers.get(band, []) cur_leader = cur_band[0] if cur_band else None prev_leader = prev_band[0] if prev_band else None if cur_leader and prev_leader and cur_leader["symbol"] != prev_leader["symbol"]: score_ratio = cur_leader.get("score", 0) / max(prev_leader.get("score", 0.0001), 0.0001) if score_ratio >= candidate_ratio_trigger: reasons.append(f"new-leader-{band}:{cur_leader['symbol']}") soft_reasons.append(f"new-leader-{band}:{cur_leader['symbol']}") soft_score += candidate_weight * 0.7 details.append( f"{band} tier new leader {cur_leader['symbol']} replaced {prev_leader['symbol']}, score ratio {score_ratio:.2f}" ) current_leader = snapshot.get("top_candidates", [{}])[0] if snapshot.get("top_candidates") else None if last_top_candidate and current_leader: if current_leader.get("symbol") != last_top_candidate.get("symbol"): score_ratio = current_leader.get("score", 0) / max(last_top_candidate.get("score", 0.0001), 0.0001) if score_ratio >= candidate_ratio_trigger: reasons.append("new-leader") soft_reasons.append("new-leader") soft_score += candidate_weight details.append( f"New candidate {current_leader.get('symbol')} leads previous top, score ratio {score_ratio:.2f}, threshold {candidate_ratio_trigger:.2f}" ) elif current_leader and not last_top_candidate: reasons.append("candidate-leader-init") soft_reasons.append("candidate-leader-init") soft_score += candidate_weight details.append(f"First recorded candidate leader {current_leader.get('symbol')}") def _signal_delta() -> float: delta = 0.0 if last_trigger_snapshot_hash and snapshot.get("snapshot_hash") != last_trigger_snapshot_hash: delta += 0.5 if snapshot["positions_hash"] != last_positions_hash: delta += 1.5 for pos in snapshot["positions"]: symbol = pos["symbol"] prev = last_positions_map.get(symbol, {}) cur_price = pos.get("last_price") prev_price = prev.get("last_price") cur_pnl = pos.get("pnl_pct") prev_pnl = prev.get("pnl_pct") if cur_price and prev_price and abs(cur_price - prev_price) / max(prev_price, 1e-9) >= 0.02: delta += 0.5 if cur_pnl is not None and prev_pnl is not None and abs(cur_pnl - prev_pnl) >= 0.03: delta += 0.5 last_leader = state.get("last_top_candidate") if current_leader and last_leader and current_leader.get("symbol") != last_leader.get("symbol"): delta += 1.0 for band in ("major", "mid", "meme"): cur_band = current_layers.get(band, []) prev_band = last_layers.get(band, []) cur_l = cur_band[0] if cur_band else None prev_l = prev_band[0] if prev_band else None if cur_l and prev_l and cur_l.get("symbol") != prev_l.get("symbol"): delta += 0.5 if last_market_regime: if current_market.get("btc_regime") != last_market_regime.get("btc_regime"): delta += 1.5 if current_market.get("eth_regime") != last_market_regime.get("eth_regime"): delta += 1.5 if last_portfolio_value not in (None, 0): lpf = float(str(last_portfolio_value)) portfolio_delta = abs(snapshot["portfolio_value_usdt"] - lpf) / max(lpf, 1e-9) if portfolio_delta >= 0.05: delta += 1.0 last_trigger_hard_types = {r.split(":")[0] for r in (state.get("last_trigger_hard_reasons") or [])} current_hard_types = {r.split(":")[0] for r in hard_reasons} if current_hard_types - last_trigger_hard_types: delta += 2.0 return delta signal_delta = _signal_delta() effective_cooldown = cooldown_minutes if signal_delta < 1.0: effective_cooldown = max(cooldown_minutes, 90) elif signal_delta >= 2.5: effective_cooldown = max(0, cooldown_minutes - 15) cooldown_active = bool(last_triggered_at and now - last_triggered_at < timedelta(minutes=effective_cooldown)) dedup_window = timedelta(minutes=HARD_REASON_DEDUP_MINUTES) for hr in list(hard_reasons): last_at = parse_ts(last_hard_reasons_at.get(hr)) if last_at and now - last_at < dedup_window: hard_reasons.remove(hr) details.append(f"{hr} triggered recently, deduplicated within {HARD_REASON_DEDUP_MINUTES} minutes") hard_trigger = bool(hard_reasons) if profile.get("dust_mode") and not hard_trigger and soft_score < soft_score_threshold + 1.0: details.append("Dust-mode portfolio: raising soft-trigger threshold to avoid noise") if profile.get("dust_mode") and not profile.get("new_entries_allowed") and any( r in {"new-leader", "candidate-leader-init"} for r in soft_reasons ): details.append("Available capital below executable threshold; new candidates are observation-only") soft_score = max(0.0, soft_score - 0.75) should_analyze = hard_trigger or soft_score >= soft_score_threshold if cooldown_active and not hard_trigger and should_analyze: should_analyze = False details.append(f"In {cooldown_minutes} minute cooldown window; soft trigger logged but not escalated") if cooldown_active and not hard_trigger and reasons and soft_score < soft_score_threshold: details.append(f"In {cooldown_minutes} minute cooldown window with insufficient soft signal ({soft_score:.2f} < {soft_score_threshold:.2f})") status = "deep_analysis_required" if should_analyze else "stable" compact_lines = [ f"Status: {status}", f"Portfolio: ${snapshot['portfolio_value_usdt']:.4f} | Free USDT: ${snapshot['free_usdt']:.4f}", f"Session: {snapshot['session']} | TZ: {snapshot['timezone']}", f"BTC/ETH: {market.get('btc_regime')} ({market.get('btc_24h_pct')}%), {market.get('eth_regime')} ({market.get('eth_24h_pct')}%) | Volatility score {market.get('volatility_score')}", f"Profile: capital={profile['capital_band']}, session={profile['session_mode']}, volatility={profile['volatility_mode']}, dust={profile['dust_mode']}", f"Thresholds: price={price_trigger:.1%}, pnl={pnl_trigger:.1%}, portfolio={portfolio_trigger:.1%}, candidate={candidate_ratio_trigger:.2f}, cooldown={effective_cooldown}m({cooldown_minutes}m base), force={force_minutes}m", f"Soft score: {soft_score:.2f} / {soft_score_threshold:.2f}", f"Signal delta: {signal_delta:.1f}", ] if snapshot["positions"]: compact_lines.append("Positions:") for pos in snapshot["positions"][:4]: pnl = pos.get("pnl_pct") pnl_text = f"{pnl:+.1%}" if pnl is not None else "n/a" compact_lines.append( f"- {pos['symbol']}: qty={pos['quantity']}, px={pos.get('last_price')}, pnl={pnl_text}, value=${pos.get('market_value_usdt')}" ) else: compact_lines.append("Positions: no spot positions currently") if snapshot["top_candidates"]: compact_lines.append("Candidates:") for cand in snapshot["top_candidates"]: compact_lines.append( f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}" ) layers = snapshot.get("top_candidates_layers", {}) for band, band_cands in layers.items(): if band_cands: compact_lines.append(f"{band} tier:") for cand in band_cands: compact_lines.append( f"- {cand['symbol']}: score={cand['score']}, 24h={cand['change_24h_pct']}%, vol=${cand['volume_24h']}" ) if details: compact_lines.append("Trigger notes:") for item in details: compact_lines.append(f"- {item}") return { "generated_at": snapshot["generated_at"], "status": status, "should_analyze": should_analyze, "pending_trigger": pending_trigger, "run_requested": bool(run_requested_at), "run_requested_at": run_requested_at.isoformat() if run_requested_at else None, "cooldown_active": cooldown_active, "effective_cooldown_minutes": effective_cooldown, "signal_delta": round(signal_delta, 2), "reasons": reasons, "hard_reasons": hard_reasons, "soft_reasons": soft_reasons, "soft_score": round(soft_score, 3), "adaptive_profile": profile, "portfolio_value_usdt": snapshot["portfolio_value_usdt"], "free_usdt": snapshot["free_usdt"], "market_regime": snapshot["market_regime"], "session": snapshot["session"], "positions": snapshot["positions"], "top_candidates": snapshot["top_candidates"], "top_candidates_layers": layers, "snapshot_hash": snapshot["snapshot_hash"], "compact_summary": "\n".join(compact_lines), "details": details, }