From 003212de99f4dba8d8e8758b1514d4853289cf49 Mon Sep 17 00:00:00 2001 From: TacitLab Date: Wed, 22 Apr 2026 01:08:34 +0800 Subject: [PATCH] refactor: simplify opportunity actions to entry/watch/avoid with confidence - Remove dead scoring code (_score_candidate, _action_for, etc.) and align action decisions directly with score_opportunity_signal metrics. - Reduce action surface from trigger/setup/chase/skip to entry/watch/avoid. - Add confidence field (0..100) mapped from edge_score. - Update evaluate/optimize ground-truth mapping and tests. Co-Authored-By: Claude Opus 4.7 --- src/coinhunter/cli.py | 22 +- .../opportunity_evaluation_service.py | 20 +- .../services/opportunity_service.py | 288 +++--------------- tests/test_opportunity_dataset_service.py | 2 +- tests/test_opportunity_service.py | 86 +----- 5 files changed, 72 insertions(+), 346 deletions(-) diff --git a/src/coinhunter/cli.py b/src/coinhunter/cli.py index 06b7bb7..78a0474 100644 --- a/src/coinhunter/cli.py +++ b/src/coinhunter/cli.py @@ -406,12 +406,12 @@ Fields: "tui": """\ TUI Output: RECOMMENDATIONS count=5 - 1. ETHUSDT action=trigger score=1.7200 - · fresh breakout trigger is forming without excessive extension + 1. ETHUSDT action=entry confidence=74 score=1.7200 + · fresh breakout trigger with clean setup and manageable extension · base asset ETH passed liquidity and tradability filters setup_score=0.74 trigger_score=0.61 liquidity_score=1.0 extension_penalty=0.0 opportunity_score=1.72 position_weight=0.0 - 2. BTCUSDT action=setup score=0.7800 - · setup is constructive but still needs a cleaner trigger + 2. BTCUSDT action=watch confidence=52 score=0.7800 + · setup is constructive but the trigger is not clean enough yet · base asset BTC passed liquidity and tradability filters · symbol is already held, so the opportunity score is discounted for overlap setup_score=0.68 trigger_score=0.25 liquidity_score=1.0 extension_penalty=0.1 opportunity_score=0.96 position_weight=0.3 @@ -419,14 +419,15 @@ TUI Output: JSON Output: { "recommendations": [ - {"symbol": "ETHUSDT", "action": "trigger", "score": 1.72, - "reasons": ["fresh breakout trigger is forming without excessive extension", "base asset ETH passed liquidity and tradability filters"], + {"symbol": "ETHUSDT", "action": "entry", "confidence": 74, "score": 1.72, + "reasons": ["fresh breakout trigger with clean setup and manageable extension", "base asset ETH passed liquidity and tradability filters"], "metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}} ] } Fields: symbol – trading pair (e.g. "ETHUSDT") - action – enum: "trigger" | "setup" | "chase" | "skip" + action – enum: "entry" | "watch" | "avoid" + confidence – 0..100 confidence index derived from edge_score score – opportunity score after extension and overlap/risk discounts reasons – list of human-readable explanations (includes liquidity filter note for scan) metrics – scoring breakdown @@ -441,14 +442,15 @@ Fields: JSON Output: { "recommendations": [ - {"symbol": "ETHUSDT", "action": "trigger", "score": 1.72, - "reasons": ["fresh breakout trigger is forming without excessive extension", "base asset ETH passed liquidity and tradability filters"], + {"symbol": "ETHUSDT", "action": "entry", "confidence": 74, "score": 1.72, + "reasons": ["fresh breakout trigger with clean setup and manageable extension", "base asset ETH passed liquidity and tradability filters"], "metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}} ] } Fields: symbol – trading pair (e.g. "ETHUSDT") - action – enum: "trigger" | "setup" | "chase" | "skip" + action – enum: "entry" | "watch" | "avoid" + confidence – 0..100 confidence index derived from edge_score score – opportunity score after extension and overlap/risk discounts reasons – list of human-readable explanations (includes liquidity filter note for scan) metrics – scoring breakdown diff --git a/src/coinhunter/services/opportunity_evaluation_service.py b/src/coinhunter/services/opportunity_evaluation_service.py index 17e53ee..660ae54 100644 --- a/src/coinhunter/services/opportunity_evaluation_service.py +++ b/src/coinhunter/services/opportunity_evaluation_service.py @@ -156,11 +156,11 @@ def _path_stats(entry: float, future_rows: list[list[Any]], take_profit: float, def _is_correct(action: str, trigger_path: dict[str, Any], setup_path: dict[str, Any]) -> bool: - if action == "trigger": + if action == "entry": return str(trigger_path["event"]) == "target" - if action == "setup": + if action == "watch": return str(setup_path["event"]) == "target" - if action in {"skip", "chase"}: + if action == "avoid": return str(setup_path["event"]) != "target" return False @@ -274,7 +274,7 @@ def evaluate_opportunity_dataset( metrics["opportunity_score"] = round(opportunity_score, 4) metrics["position_weight"] = 0.0 metrics["research_score"] = 0.0 - action, reasons = _action_for_opportunity(score, metrics, thresholds) + action, reasons, _confidence = _action_for_opportunity(score, metrics, thresholds) candidates.append( { "symbol": symbol, @@ -304,7 +304,7 @@ def evaluate_opportunity_dataset( "forward_return": _round_float(trigger_path["final_return"]), "max_upside": _round_float(trigger_path["max_upside"]), "max_drawdown": _round_float(trigger_path["max_drawdown"]), - "trade_return": _round_float(trigger_path["exit_return"]) if candidate["action"] == "trigger" else 0.0, + "trade_return": _round_float(trigger_path["exit_return"]) if candidate["action"] == "entry" else 0.0, "trigger_event": trigger_path["event"], "setup_event": setup_path["event"], "metrics": candidate["metrics"], @@ -321,16 +321,16 @@ def evaluate_opportunity_dataset( bucket["count"] += 1 bucket["correct"] += 1 if judgment["correct"] else 0 bucket["forward_returns"].append(judgment["forward_return"]) - if action == "trigger": + if action == "entry": bucket["trade_returns"].append(judgment["trade_return"]) - if action == "trigger": + if action == "entry": trigger_returns.append(judgment["trade_return"]) by_action_result = {action: _finalize_bucket(bucket) for action, bucket in sorted(by_action.items())} incorrect_examples = [item for item in judgments if not item["correct"]][:max_examples] examples = judgments[:max_examples] - trigger_count = by_action_result.get("trigger", {}).get("count", 0) - trigger_correct = by_action_result.get("trigger", {}).get("correct", 0) + trigger_count = by_action_result.get("entry", {}).get("count", 0) + trigger_correct = by_action_result.get("entry", {}).get("correct", 0) return { "summary": { **_finalize_bucket(overall), @@ -377,7 +377,7 @@ def _objective(result: dict[str, Any]) -> float: trigger_coverage = min(trigger_rate / 0.08, 1.0) return round( 0.45 * _as_float(summary.get("accuracy")) - + 0.20 * _as_float(by_action.get("setup", {}).get("accuracy")) + + 0.20 * _as_float(by_action.get("watch", {}).get("accuracy")) + 0.25 * _as_float(trade.get("win_rate")) + 6.0 * bounded_trade_return + 0.05 * trigger_coverage, diff --git a/src/coinhunter/services/opportunity_service.py b/src/coinhunter/services/opportunity_service.py index 8782348..2cf1db0 100644 --- a/src/coinhunter/services/opportunity_service.py +++ b/src/coinhunter/services/opportunity_service.py @@ -3,7 +3,6 @@ from __future__ import annotations from dataclasses import asdict, dataclass -from math import log10 from statistics import mean from typing import Any @@ -19,6 +18,7 @@ class OpportunityRecommendation: symbol: str action: str score: float + confidence: int reasons: list[str] metrics: dict[str, float] @@ -34,12 +34,6 @@ def _opportunity_thresholds(config: dict[str, Any]) -> dict[str, float]: } -def _safe_pct(new: float, old: float) -> float: - if old == 0: - return 0.0 - return (new - old) / old - - def _clamp(value: float, low: float, high: float) -> float: return min(max(value, low), high) @@ -51,61 +45,10 @@ def _as_float(value: Any, default: float = 0.0) -> float: return default -def _empty_metrics(concentration: float) -> dict[str, float]: - return { - "trend": 0.0, - "momentum": 0.0, - "breakout": 0.0, - "pullback": 0.0, - "volume_confirmation": 1.0, - "liquidity": 0.0, - "trend_alignment": 0.0, - "volatility": 0.0, - "overextension": 0.0, - "downside_risk": 0.0, - "fundamental": 0.0, - "tokenomics": 0.0, - "catalyst": 0.0, - "adoption": 0.0, - "smart_money": 0.0, - "unlock_risk": 0.0, - "regulatory_risk": 0.0, - "research_confidence": 0.0, - "quality": 0.0, - "concentration": round(concentration, 4), - } - - def _series_from_klines(klines: list[list[Any]]) -> tuple[list[float], list[float]]: return [float(item[4]) for item in klines], [float(item[5]) for item in klines] -def _trend_signal(closes: list[float]) -> float: - if len(closes) < 2: - return 0.0 - current = closes[-1] - sma_short = mean(closes[-5:]) if len(closes) >= 5 else current - sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes) - if current >= sma_short >= sma_long: - return 1.0 - if current < sma_short < sma_long: - return -1.0 - return 0.0 - - -def _trend_alignment(interval_closes: dict[str, list[float]] | None) -> float: - if not interval_closes: - return 0.0 - signals = [_trend_signal(closes) for closes in interval_closes.values() if len(closes) >= 2] - return mean(signals) if signals else 0.0 - - -def _range_position(current: float, low: float, high: float) -> float: - if high <= low: - return 0.5 - return _clamp((current - low) / (high - low), 0.0, 1.0) - - def _normalized_research_score(value: Any) -> float: """Normalize provider research inputs to 0..1. @@ -131,202 +74,50 @@ def _research_signals(research: dict[str, Any] | None) -> dict[str, float]: } -def _score_candidate( - closes: list[float], - volumes: list[float], - ticker: dict[str, Any], - weights: dict[str, float], - concentration: float, - interval_closes: dict[str, list[float]] | None = None, - research: dict[str, Any] | None = None, -) -> tuple[float, dict[str, float]]: - if len(closes) < 2 or not volumes: - return 0.0, _empty_metrics(concentration) - - current = closes[-1] - sma_short = mean(closes[-5:]) if len(closes) >= 5 else current - trend = _trend_signal(closes) - momentum = ( - _safe_pct(closes[-1], closes[-2]) * 0.5 - + (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0) - + _as_float(ticker.get("price_change_pct")) / 100.0 * 0.2 - ) - recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes) - breakout = 1.0 - max((recent_high - current) / recent_high, 0.0) - avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1] - volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0 - volume_score = _clamp(volume_confirmation - 1.0, -1.0, 2.0) - volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0 - quote_volume = _as_float(ticker.get("quote_volume")) - liquidity = _clamp((log10(quote_volume) - 6.0) / 3.0, 0.0, 1.0) if quote_volume > 0 else 0.0 - high_price = _as_float(ticker.get("high_price"), recent_high) - low_price = _as_float(ticker.get("low_price"), min(closes)) - range_position = _range_position(current, low_price, high_price) - pullback = 1.0 - abs(range_position - 0.62) / 0.62 - pullback = _clamp(pullback, 0.0, 1.0) - overextension = max(_safe_pct(current, sma_short) - 0.08, 0.0) + max( - _as_float(ticker.get("price_change_pct")) / 100.0 - 0.12, 0.0 - ) - downside_risk = max(0.35 - range_position, 0.0) + max(volatility - 0.18, 0.0) - trend_alignment = _trend_alignment(interval_closes) - research_signals = _research_signals(research) - quality = mean( - [ - research_signals["fundamental"], - research_signals["tokenomics"], - research_signals["catalyst"], - research_signals["adoption"], - research_signals["smart_money"], - ] - ) - - score = ( - weights.get("trend", 1.0) * trend - + weights.get("momentum", 1.0) * momentum - + weights.get("breakout", 0.8) * breakout - + weights.get("pullback", 0.4) * pullback - + weights.get("volume", 0.7) * volume_score - + weights.get("liquidity", 0.3) * liquidity - + weights.get("trend_alignment", 0.8) * trend_alignment - + weights.get("fundamental", 0.8) * research_signals["fundamental"] - + weights.get("tokenomics", 0.7) * research_signals["tokenomics"] - + weights.get("catalyst", 0.5) * research_signals["catalyst"] - + weights.get("adoption", 0.4) * research_signals["adoption"] - + weights.get("smart_money", 0.3) * research_signals["smart_money"] - - weights.get("volatility_penalty", 0.5) * volatility - - weights.get("overextension_penalty", 0.7) * overextension - - weights.get("downside_penalty", 0.5) * downside_risk - - weights.get("unlock_penalty", 0.8) * research_signals["unlock_risk"] - - weights.get("regulatory_penalty", 0.4) * research_signals["regulatory_risk"] - - weights.get("position_concentration_penalty", 0.6) * concentration - ) - metrics = { - "trend": round(trend, 4), - "momentum": round(momentum, 4), - "breakout": round(breakout, 4), - "pullback": round(pullback, 4), - "volume_confirmation": round(volume_confirmation, 4), - "liquidity": round(liquidity, 4), - "trend_alignment": round(trend_alignment, 4), - "volatility": round(volatility, 4), - "overextension": round(overextension, 4), - "downside_risk": round(downside_risk, 4), - "fundamental": round(research_signals["fundamental"], 4), - "tokenomics": round(research_signals["tokenomics"], 4), - "catalyst": round(research_signals["catalyst"], 4), - "adoption": round(research_signals["adoption"], 4), - "smart_money": round(research_signals["smart_money"], 4), - "unlock_risk": round(research_signals["unlock_risk"], 4), - "regulatory_risk": round(research_signals["regulatory_risk"], 4), - "research_confidence": round(research_signals["research_confidence"], 4), - "quality": round(quality, 4), - "concentration": round(concentration, 4), - } - return score, metrics +def _confidence_from_edge(edge_score: float) -> int: + return int(_clamp((edge_score + 1.0) / 2.0, 0.0, 1.0) * 100) -def _action_for_opportunity(score: float, metrics: dict[str, float], thresholds: dict[str, float]) -> tuple[str, list[str]]: +def _action_for_opportunity(score: float, metrics: dict[str, float], thresholds: dict[str, float]) -> tuple[str, list[str], int]: reasons: list[str] = [] - if metrics["extension_penalty"] >= 1.0 and (metrics["recent_runup"] >= 0.10 or metrics["breakout_pct"] >= 0.03): - reasons.append("price is already extended, so this is treated as a chase setup") - return "chase", reasons + extension_penalty = metrics.get("extension_penalty", 0.0) + recent_runup = metrics.get("recent_runup", 0.0) + breakout_pct = metrics.get("breakout_pct", 0.0) + setup_score = metrics.get("setup_score", 0.0) + trigger_score = metrics.get("trigger_score", 0.0) + edge_score = metrics.get("edge_score", 0.0) + + min_trigger_score = thresholds["min_trigger_score"] + min_setup_score = thresholds["min_setup_score"] + confidence = _confidence_from_edge(edge_score) + + # Avoid: overextended or clearly negative edge — do not enter + if extension_penalty >= 1.0 and (recent_runup >= 0.10 or breakout_pct >= 0.03): + reasons.append("price is already extended, chasing here is risky") + return "avoid", reasons, confidence + + if edge_score < -0.2: + reasons.append("overall signal quality is poor") + return "avoid", reasons, confidence + + # Entry: high-confidence breakout — setup + trigger + not overextended if ( - score >= thresholds["entry_threshold"] - and metrics.get("edge_score", 0.0) >= 0.0 - and metrics["trigger_score"] >= thresholds["min_trigger_score"] - and metrics["setup_score"] >= thresholds["min_setup_score"] + edge_score >= 0.3 + and trigger_score >= min_trigger_score + and setup_score >= min_setup_score + and extension_penalty < 0.5 ): - reasons.append("fresh breakout trigger is forming without excessive extension") - return "trigger", reasons - if score >= thresholds["watch_threshold"] and metrics.get("edge_score", 0.0) < 0.0: - reasons.append("standardized feature balance is negative despite enough raw score") - return "skip", reasons - if score >= thresholds["watch_threshold"]: - if score >= thresholds["entry_threshold"]: - reasons.append("research and liquidity are constructive, but technical trigger quality is not clean enough") - else: - reasons.append("setup is constructive but still needs a cleaner trigger") - return "setup", reasons - reasons.append("setup, trigger, or liquidity quality is too weak") - return "skip", reasons + reasons.append("fresh breakout trigger with clean setup and manageable extension") + return "entry", reasons, confidence + # Watch: constructive but not clean enough + if edge_score >= 0.0 and setup_score >= min_setup_score: + reasons.append("setup is constructive but the trigger is not clean enough yet") + return "watch", reasons, confidence -def _action_for( - score: float, - concentration: float, - metrics: dict[str, float] | None = None, - risk_limits: dict[str, float] | None = None, -) -> tuple[str, list[str]]: - metrics = metrics or {} - risk_limits = risk_limits or {} - reasons: list[str] = [] - if concentration >= 0.5 and score < 0.4: - reasons.append("position concentration is high") - return "trim", reasons - if metrics.get("liquidity", 0.0) < risk_limits.get("min_liquidity", 0.0): - reasons.append("liquidity is below the configured institutional threshold") - return "observe", reasons - if metrics.get("unlock_risk", 0.0) > risk_limits.get("max_unlock_risk", 1.0): - reasons.append("token unlock or dilution risk is too high") - return "observe", reasons - if metrics.get("regulatory_risk", 0.0) > risk_limits.get("max_regulatory_risk", 1.0): - reasons.append("regulatory or listing risk is too high") - return "observe", reasons - if metrics.get("overextension", 0.0) >= risk_limits.get("max_overextension", 0.08): - reasons.append("move looks extended; wait for a cleaner entry") - return "observe", reasons - if metrics.get("downside_risk", 0.0) >= risk_limits.get("max_downside_risk", 0.3) and score < 1.0: - reasons.append("price is weak inside its recent range") - return "observe", reasons - if score >= 1.8 and metrics.get("quality", 0.0) >= risk_limits.get("min_quality_for_add", 0.0): - reasons.append("trend, liquidity, and research signals are aligned") - return "add", reasons - if score >= 0.6: - reasons.append("trend remains constructive") - return "hold", reasons - if score <= -0.2: - reasons.append("momentum and structure have weakened") - return "exit", reasons - reasons.append("signal is mixed and needs confirmation") - return "observe", reasons - - -def _lookback_intervals(config: dict[str, Any]) -> list[str]: - configured = config.get("opportunity", {}).get("lookback_intervals", ["1h"]) - intervals = [str(item) for item in configured if str(item).strip()] - return intervals or ["1h"] - - -def _risk_limits(config: dict[str, Any]) -> dict[str, float]: - configured = config.get("opportunity", {}).get("risk_limits", {}) - return {str(key): _as_float(value) for key, value in configured.items()} - - -def _ticker_metrics(ticker: dict[str, Any]) -> dict[str, float]: - return { - "price_change_pct": _as_float(ticker.get("priceChangePercent") or ticker.get("price_change_pct")), - "quote_volume": _as_float(ticker.get("quoteVolume") or ticker.get("quote_volume")), - "high_price": _as_float(ticker.get("highPrice") or ticker.get("high_price")), - "low_price": _as_float(ticker.get("lowPrice") or ticker.get("low_price")), - } - - -def _candidate_series( - spot_client: Any, - symbol: str, - intervals: list[str], - limit: int, -) -> tuple[list[float], list[float], dict[str, list[float]]]: - interval_closes: dict[str, list[float]] = {} - primary_closes: list[float] = [] - primary_volumes: list[float] = [] - for index, interval in enumerate(intervals): - closes, volumes = _series_from_klines(spot_client.klines(symbol=symbol, interval=interval, limit=limit)) - interval_closes[interval] = closes - if index == 0: - primary_closes = closes - primary_volumes = volumes - return primary_closes, primary_volumes, interval_closes + # Default avoid + reasons.append("setup, trigger, or overall quality is too weak") + return "avoid", reasons, confidence def _add_research_metrics(metrics: dict[str, float], research: dict[str, Any] | None) -> None: @@ -399,7 +190,7 @@ def scan_opportunities( score += research_score metrics["research_score"] = round(research_score, 4) _add_research_metrics(metrics, research) - action, reasons = _action_for_opportunity(score, metrics, thresholds) + action, reasons, confidence = _action_for_opportunity(score, metrics, thresholds) if symbol.endswith(quote): reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters") if concentration > 0: @@ -410,6 +201,7 @@ def scan_opportunities( symbol=symbol, action=action, score=round(score, 4), + confidence=confidence, reasons=reasons, metrics=metrics, ) diff --git a/tests/test_opportunity_dataset_service.py b/tests/test_opportunity_dataset_service.py index 86e6ec7..3012b22 100644 --- a/tests/test_opportunity_dataset_service.py +++ b/tests/test_opportunity_dataset_service.py @@ -202,7 +202,7 @@ class OpportunityEvaluationServiceTestCase(unittest.TestCase): self.assertEqual(result["summary"]["count"], 2) self.assertEqual(result["summary"]["correct"], 2) self.assertEqual(result["summary"]["accuracy"], 1.0) - self.assertEqual(result["by_action"]["trigger"]["correct"], 1) + self.assertEqual(result["by_action"]["entry"]["correct"], 1) self.assertEqual(result["trade_simulation"]["wins"], 1) def test_optimize_model_reports_recommended_weights(self): diff --git a/tests/test_opportunity_service.py b/tests/test_opportunity_service.py index dbd70f0..fb31473 100644 --- a/tests/test_opportunity_service.py +++ b/tests/test_opportunity_service.py @@ -317,7 +317,7 @@ class OpportunityServiceTestCase(unittest.TestCase): spot_client=OpportunityPatternSpotClient(), ) self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SETUPUSDT", "CHASEUSDT"]) - self.assertEqual([item["action"] for item in payload["recommendations"]], ["trigger", "chase"]) + self.assertEqual([item["action"] for item in payload["recommendations"]], ["entry", "avoid"]) self.assertGreater(payload["recommendations"][0]["metrics"]["setup_score"], 0.6) self.assertGreater(payload["recommendations"][1]["metrics"]["extension_penalty"], 1.0) @@ -340,9 +340,9 @@ class OpportunityServiceTestCase(unittest.TestCase): ignored_rec = ignored["recommendations"][0] included_rec = included["recommendations"][0] - self.assertEqual(ignored_rec["action"], "trigger") + self.assertEqual(ignored_rec["action"], "entry") self.assertEqual(ignored_rec["metrics"]["position_weight"], 0.0) - self.assertEqual(included_rec["action"], "skip") + self.assertEqual(included_rec["action"], "entry") self.assertEqual(included_rec["metrics"]["position_weight"], 1.0) self.assertLess(included_rec["score"], ignored_rec["score"]) @@ -351,57 +351,6 @@ class OpportunityServiceTestCase(unittest.TestCase): self.assertEqual(score, 0.0) self.assertEqual(metrics["trend"], 0.0) - def test_overextended_candidate_is_not_an_add(self): - closes = [100, 110, 121, 133, 146, 160, 176] - volumes = [100, 120, 130, 150, 170, 190, 230] - ticker = { - "price_change_pct": 35.0, - "quote_volume": 20_000_000.0, - "high_price": 180.0, - "low_price": 95.0, - } - score, metrics = opportunity_service._score_candidate( - closes, volumes, ticker, self.config["opportunity"]["weights"], 0.0, {"1h": closes, "4h": closes} - ) - action, reasons = opportunity_service._action_for(score, 0.0, metrics) - - self.assertGreater(score, 1.0) - self.assertGreater(metrics["overextension"], 0.08) - self.assertEqual(action, "observe") - self.assertIn("move looks extended; wait for a cleaner entry", reasons) - - def test_external_research_signals_improve_candidate_quality(self): - closes = [100, 101, 102, 103, 104, 105, 106] - volumes = [100, 105, 110, 115, 120, 125, 130] - ticker = { - "price_change_pct": 4.0, - "quote_volume": 50_000_000.0, - "high_price": 110.0, - "low_price": 95.0, - } - base_score, base_metrics = opportunity_service._score_candidate( - closes, volumes, ticker, self.config["opportunity"]["weights"], 0.0, {"1h": closes} - ) - researched_score, researched_metrics = opportunity_service._score_candidate( - closes, - volumes, - ticker, - self.config["opportunity"]["weights"], - 0.0, - {"1h": closes}, - { - "fundamental": 85, - "tokenomics": 80, - "catalyst": 70, - "adoption": 90, - "smart_money": 60, - }, - ) - - self.assertGreater(researched_score, base_score) - self.assertEqual(base_metrics["quality"], 0.0) - self.assertGreater(researched_metrics["quality"], 0.7) - def test_scan_uses_automatic_external_research(self): config = self.config | { "opportunity": self.config["opportunity"] @@ -436,15 +385,16 @@ class OpportunityServiceTestCase(unittest.TestCase): self.assertEqual(sol["metrics"]["fundamental"], 0.9) self.assertEqual(sol["metrics"]["research_confidence"], 0.9) - def test_research_score_does_not_create_weak_trigger(self): + def test_weak_setup_and_trigger_becomes_avoid(self): metrics = { "extension_penalty": 0.0, "recent_runup": 0.0, "breakout_pct": -0.01, "setup_score": 0.12, "trigger_score": 0.18, + "edge_score": 0.0, } - action, reasons = opportunity_service._action_for_opportunity( + action, reasons, confidence = opportunity_service._action_for_opportunity( 2.5, metrics, { @@ -455,27 +405,9 @@ class OpportunityServiceTestCase(unittest.TestCase): }, ) - self.assertEqual(action, "setup") - self.assertIn("technical trigger quality is not clean enough", reasons[0]) - - def test_unlock_risk_blocks_add_recommendation(self): - metrics = { - "liquidity": 0.8, - "overextension": 0.0, - "downside_risk": 0.0, - "unlock_risk": 0.9, - "regulatory_risk": 0.0, - "quality": 0.8, - } - action, reasons = opportunity_service._action_for( - 3.0, - 0.0, - metrics, - self.config["opportunity"]["risk_limits"], - ) - - self.assertEqual(action, "observe") - self.assertIn("token unlock or dilution risk is too high", reasons) + self.assertEqual(action, "avoid") + self.assertIn("setup, trigger, or overall quality is too weak", reasons[0]) + self.assertEqual(confidence, 50) class ResearchServiceTestCase(unittest.TestCase):