refactor: simplify opportunity actions to entry/watch/avoid with confidence

- Remove dead scoring code (_score_candidate, _action_for, etc.) and
  align action decisions directly with score_opportunity_signal metrics.
- Reduce action surface from trigger/setup/chase/skip to entry/watch/avoid.
- Add confidence field (0..100) mapped from edge_score.
- Update evaluate/optimize ground-truth mapping and tests.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-22 01:08:34 +08:00
parent d3408dabba
commit 003212de99
5 changed files with 72 additions and 346 deletions

View File

@@ -406,12 +406,12 @@ Fields:
"tui": """\
TUI Output:
RECOMMENDATIONS count=5
1. ETHUSDT action=trigger score=1.7200
· fresh breakout trigger is forming without excessive extension
1. ETHUSDT action=entry confidence=74 score=1.7200
· fresh breakout trigger with clean setup and manageable extension
· base asset ETH passed liquidity and tradability filters
setup_score=0.74 trigger_score=0.61 liquidity_score=1.0 extension_penalty=0.0 opportunity_score=1.72 position_weight=0.0
2. BTCUSDT action=setup score=0.7800
· setup is constructive but still needs a cleaner trigger
2. BTCUSDT action=watch confidence=52 score=0.7800
· setup is constructive but the trigger is not clean enough yet
· base asset BTC passed liquidity and tradability filters
· symbol is already held, so the opportunity score is discounted for overlap
setup_score=0.68 trigger_score=0.25 liquidity_score=1.0 extension_penalty=0.1 opportunity_score=0.96 position_weight=0.3
@@ -419,14 +419,15 @@ TUI Output:
JSON Output:
{
"recommendations": [
{"symbol": "ETHUSDT", "action": "trigger", "score": 1.72,
"reasons": ["fresh breakout trigger is forming without excessive extension", "base asset ETH passed liquidity and tradability filters"],
{"symbol": "ETHUSDT", "action": "entry", "confidence": 74, "score": 1.72,
"reasons": ["fresh breakout trigger with clean setup and manageable extension", "base asset ETH passed liquidity and tradability filters"],
"metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}}
]
}
Fields:
symbol trading pair (e.g. "ETHUSDT")
action enum: "trigger" | "setup" | "chase" | "skip"
action enum: "entry" | "watch" | "avoid"
confidence 0..100 confidence index derived from edge_score
score opportunity score after extension and overlap/risk discounts
reasons list of human-readable explanations (includes liquidity filter note for scan)
metrics scoring breakdown
@@ -441,14 +442,15 @@ Fields:
JSON Output:
{
"recommendations": [
{"symbol": "ETHUSDT", "action": "trigger", "score": 1.72,
"reasons": ["fresh breakout trigger is forming without excessive extension", "base asset ETH passed liquidity and tradability filters"],
{"symbol": "ETHUSDT", "action": "entry", "confidence": 74, "score": 1.72,
"reasons": ["fresh breakout trigger with clean setup and manageable extension", "base asset ETH passed liquidity and tradability filters"],
"metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}}
]
}
Fields:
symbol trading pair (e.g. "ETHUSDT")
action enum: "trigger" | "setup" | "chase" | "skip"
action enum: "entry" | "watch" | "avoid"
confidence 0..100 confidence index derived from edge_score
score opportunity score after extension and overlap/risk discounts
reasons list of human-readable explanations (includes liquidity filter note for scan)
metrics scoring breakdown

View File

@@ -156,11 +156,11 @@ def _path_stats(entry: float, future_rows: list[list[Any]], take_profit: float,
def _is_correct(action: str, trigger_path: dict[str, Any], setup_path: dict[str, Any]) -> bool:
if action == "trigger":
if action == "entry":
return str(trigger_path["event"]) == "target"
if action == "setup":
if action == "watch":
return str(setup_path["event"]) == "target"
if action in {"skip", "chase"}:
if action == "avoid":
return str(setup_path["event"]) != "target"
return False
@@ -274,7 +274,7 @@ def evaluate_opportunity_dataset(
metrics["opportunity_score"] = round(opportunity_score, 4)
metrics["position_weight"] = 0.0
metrics["research_score"] = 0.0
action, reasons = _action_for_opportunity(score, metrics, thresholds)
action, reasons, _confidence = _action_for_opportunity(score, metrics, thresholds)
candidates.append(
{
"symbol": symbol,
@@ -304,7 +304,7 @@ def evaluate_opportunity_dataset(
"forward_return": _round_float(trigger_path["final_return"]),
"max_upside": _round_float(trigger_path["max_upside"]),
"max_drawdown": _round_float(trigger_path["max_drawdown"]),
"trade_return": _round_float(trigger_path["exit_return"]) if candidate["action"] == "trigger" else 0.0,
"trade_return": _round_float(trigger_path["exit_return"]) if candidate["action"] == "entry" else 0.0,
"trigger_event": trigger_path["event"],
"setup_event": setup_path["event"],
"metrics": candidate["metrics"],
@@ -321,16 +321,16 @@ def evaluate_opportunity_dataset(
bucket["count"] += 1
bucket["correct"] += 1 if judgment["correct"] else 0
bucket["forward_returns"].append(judgment["forward_return"])
if action == "trigger":
if action == "entry":
bucket["trade_returns"].append(judgment["trade_return"])
if action == "trigger":
if action == "entry":
trigger_returns.append(judgment["trade_return"])
by_action_result = {action: _finalize_bucket(bucket) for action, bucket in sorted(by_action.items())}
incorrect_examples = [item for item in judgments if not item["correct"]][:max_examples]
examples = judgments[:max_examples]
trigger_count = by_action_result.get("trigger", {}).get("count", 0)
trigger_correct = by_action_result.get("trigger", {}).get("correct", 0)
trigger_count = by_action_result.get("entry", {}).get("count", 0)
trigger_correct = by_action_result.get("entry", {}).get("correct", 0)
return {
"summary": {
**_finalize_bucket(overall),
@@ -377,7 +377,7 @@ def _objective(result: dict[str, Any]) -> float:
trigger_coverage = min(trigger_rate / 0.08, 1.0)
return round(
0.45 * _as_float(summary.get("accuracy"))
+ 0.20 * _as_float(by_action.get("setup", {}).get("accuracy"))
+ 0.20 * _as_float(by_action.get("watch", {}).get("accuracy"))
+ 0.25 * _as_float(trade.get("win_rate"))
+ 6.0 * bounded_trade_return
+ 0.05 * trigger_coverage,

View File

@@ -3,7 +3,6 @@
from __future__ import annotations
from dataclasses import asdict, dataclass
from math import log10
from statistics import mean
from typing import Any
@@ -19,6 +18,7 @@ class OpportunityRecommendation:
symbol: str
action: str
score: float
confidence: int
reasons: list[str]
metrics: dict[str, float]
@@ -34,12 +34,6 @@ def _opportunity_thresholds(config: dict[str, Any]) -> dict[str, float]:
}
def _safe_pct(new: float, old: float) -> float:
if old == 0:
return 0.0
return (new - old) / old
def _clamp(value: float, low: float, high: float) -> float:
return min(max(value, low), high)
@@ -51,61 +45,10 @@ def _as_float(value: Any, default: float = 0.0) -> float:
return default
def _empty_metrics(concentration: float) -> dict[str, float]:
return {
"trend": 0.0,
"momentum": 0.0,
"breakout": 0.0,
"pullback": 0.0,
"volume_confirmation": 1.0,
"liquidity": 0.0,
"trend_alignment": 0.0,
"volatility": 0.0,
"overextension": 0.0,
"downside_risk": 0.0,
"fundamental": 0.0,
"tokenomics": 0.0,
"catalyst": 0.0,
"adoption": 0.0,
"smart_money": 0.0,
"unlock_risk": 0.0,
"regulatory_risk": 0.0,
"research_confidence": 0.0,
"quality": 0.0,
"concentration": round(concentration, 4),
}
def _series_from_klines(klines: list[list[Any]]) -> tuple[list[float], list[float]]:
return [float(item[4]) for item in klines], [float(item[5]) for item in klines]
def _trend_signal(closes: list[float]) -> float:
if len(closes) < 2:
return 0.0
current = closes[-1]
sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
if current >= sma_short >= sma_long:
return 1.0
if current < sma_short < sma_long:
return -1.0
return 0.0
def _trend_alignment(interval_closes: dict[str, list[float]] | None) -> float:
    """Average per-interval trend signals across timeframes; 0.0 when unusable.

    Intervals with fewer than two closes are ignored; if none qualify (or the
    mapping is missing/empty) the alignment is neutral.
    """
    if not interval_closes:
        return 0.0
    votes = [
        _trend_signal(series)
        for series in interval_closes.values()
        if len(series) >= 2
    ]
    if not votes:
        return 0.0
    return mean(votes)
def _range_position(current: float, low: float, high: float) -> float:
if high <= low:
return 0.5
return _clamp((current - low) / (high - low), 0.0, 1.0)
def _normalized_research_score(value: Any) -> float:
"""Normalize provider research inputs to 0..1.
@@ -131,202 +74,50 @@ def _research_signals(research: dict[str, Any] | None) -> dict[str, float]:
}
def _score_candidate(
    closes: list[float],
    volumes: list[float],
    ticker: dict[str, Any],
    weights: dict[str, float],
    concentration: float,
    interval_closes: dict[str, list[float]] | None = None,
    research: dict[str, Any] | None = None,
) -> tuple[float, dict[str, float]]:
    """Score one candidate from price/volume series, ticker stats and research.

    Returns (score, metrics): a weighted composite score plus the rounded
    per-factor breakdown. With fewer than two closes or no volume data the
    candidate is unscorable and a zeroed metrics dict is returned.
    """
    if len(closes) < 2 or not volumes:
        return 0.0, _empty_metrics(concentration)
    current = closes[-1]
    sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
    trend = _trend_signal(closes)
    # Momentum blends the 1-bar and 5-bar close changes with the ticker's
    # reported percent change (weights 0.5 / 0.3 / 0.2).
    momentum = (
        _safe_pct(closes[-1], closes[-2]) * 0.5
        + (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0)
        + _as_float(ticker.get("price_change_pct")) / 100.0 * 0.2
    )
    recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes)
    # 1.0 when price sits at the recent high, decaying as it trades below it.
    breakout = 1.0 - max((recent_high - current) / recent_high, 0.0)
    avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
    volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
    # Excess volume vs. the prior average, bounded to [-1, 2].
    volume_score = _clamp(volume_confirmation - 1.0, -1.0, 2.0)
    volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0
    quote_volume = _as_float(ticker.get("quote_volume"))
    # Log-scaled liquidity: 0 at quote volume <= 1e6, saturating at 1e9.
    liquidity = _clamp((log10(quote_volume) - 6.0) / 3.0, 0.0, 1.0) if quote_volume > 0 else 0.0
    high_price = _as_float(ticker.get("high_price"), recent_high)
    low_price = _as_float(ticker.get("low_price"), min(closes))
    range_position = _range_position(current, low_price, high_price)
    # Pullback quality peaks when price sits ~62% up the low/high range.
    pullback = 1.0 - abs(range_position - 0.62) / 0.62
    pullback = _clamp(pullback, 0.0, 1.0)
    # Penalize stretch beyond 8% over the short SMA plus any >12% ticker move.
    overextension = max(_safe_pct(current, sma_short) - 0.08, 0.0) + max(
        _as_float(ticker.get("price_change_pct")) / 100.0 - 0.12, 0.0
    )
    # Weak position in range (< 0.35) and excess volatility (> 0.18) add risk.
    downside_risk = max(0.35 - range_position, 0.0) + max(volatility - 0.18, 0.0)
    trend_alignment = _trend_alignment(interval_closes)
    research_signals = _research_signals(research)
    # Average of the five provider research pillars.
    quality = mean(
        [
            research_signals["fundamental"],
            research_signals["tokenomics"],
            research_signals["catalyst"],
            research_signals["adoption"],
            research_signals["smart_money"],
        ]
    )
    # Weighted sum of positive factors minus the configured risk penalties;
    # each weight falls back to the default shown when absent from config.
    score = (
        weights.get("trend", 1.0) * trend
        + weights.get("momentum", 1.0) * momentum
        + weights.get("breakout", 0.8) * breakout
        + weights.get("pullback", 0.4) * pullback
        + weights.get("volume", 0.7) * volume_score
        + weights.get("liquidity", 0.3) * liquidity
        + weights.get("trend_alignment", 0.8) * trend_alignment
        + weights.get("fundamental", 0.8) * research_signals["fundamental"]
        + weights.get("tokenomics", 0.7) * research_signals["tokenomics"]
        + weights.get("catalyst", 0.5) * research_signals["catalyst"]
        + weights.get("adoption", 0.4) * research_signals["adoption"]
        + weights.get("smart_money", 0.3) * research_signals["smart_money"]
        - weights.get("volatility_penalty", 0.5) * volatility
        - weights.get("overextension_penalty", 0.7) * overextension
        - weights.get("downside_penalty", 0.5) * downside_risk
        - weights.get("unlock_penalty", 0.8) * research_signals["unlock_risk"]
        - weights.get("regulatory_penalty", 0.4) * research_signals["regulatory_risk"]
        - weights.get("position_concentration_penalty", 0.6) * concentration
    )
    metrics = {
        "trend": round(trend, 4),
        "momentum": round(momentum, 4),
        "breakout": round(breakout, 4),
        "pullback": round(pullback, 4),
        "volume_confirmation": round(volume_confirmation, 4),
        "liquidity": round(liquidity, 4),
        "trend_alignment": round(trend_alignment, 4),
        "volatility": round(volatility, 4),
        "overextension": round(overextension, 4),
        "downside_risk": round(downside_risk, 4),
        "fundamental": round(research_signals["fundamental"], 4),
        "tokenomics": round(research_signals["tokenomics"], 4),
        "catalyst": round(research_signals["catalyst"], 4),
        "adoption": round(research_signals["adoption"], 4),
        "smart_money": round(research_signals["smart_money"], 4),
        "unlock_risk": round(research_signals["unlock_risk"], 4),
        "regulatory_risk": round(research_signals["regulatory_risk"], 4),
        "research_confidence": round(research_signals["research_confidence"], 4),
        "quality": round(quality, 4),
        "concentration": round(concentration, 4),
    }
    return score, metrics
def _confidence_from_edge(edge_score: float) -> int:
return int(_clamp((edge_score + 1.0) / 2.0, 0.0, 1.0) * 100)
def _action_for_opportunity(score: float, metrics: dict[str, float], thresholds: dict[str, float]) -> tuple[str, list[str]]:
def _action_for_opportunity(score: float, metrics: dict[str, float], thresholds: dict[str, float]) -> tuple[str, list[str], int]:
reasons: list[str] = []
if metrics["extension_penalty"] >= 1.0 and (metrics["recent_runup"] >= 0.10 or metrics["breakout_pct"] >= 0.03):
reasons.append("price is already extended, so this is treated as a chase setup")
return "chase", reasons
extension_penalty = metrics.get("extension_penalty", 0.0)
recent_runup = metrics.get("recent_runup", 0.0)
breakout_pct = metrics.get("breakout_pct", 0.0)
setup_score = metrics.get("setup_score", 0.0)
trigger_score = metrics.get("trigger_score", 0.0)
edge_score = metrics.get("edge_score", 0.0)
min_trigger_score = thresholds["min_trigger_score"]
min_setup_score = thresholds["min_setup_score"]
confidence = _confidence_from_edge(edge_score)
# Avoid: overextended or clearly negative edge — do not enter
if extension_penalty >= 1.0 and (recent_runup >= 0.10 or breakout_pct >= 0.03):
reasons.append("price is already extended, chasing here is risky")
return "avoid", reasons, confidence
if edge_score < -0.2:
reasons.append("overall signal quality is poor")
return "avoid", reasons, confidence
# Entry: high-confidence breakout — setup + trigger + not overextended
if (
score >= thresholds["entry_threshold"]
and metrics.get("edge_score", 0.0) >= 0.0
and metrics["trigger_score"] >= thresholds["min_trigger_score"]
and metrics["setup_score"] >= thresholds["min_setup_score"]
edge_score >= 0.3
and trigger_score >= min_trigger_score
and setup_score >= min_setup_score
and extension_penalty < 0.5
):
reasons.append("fresh breakout trigger is forming without excessive extension")
return "trigger", reasons
if score >= thresholds["watch_threshold"] and metrics.get("edge_score", 0.0) < 0.0:
reasons.append("standardized feature balance is negative despite enough raw score")
return "skip", reasons
if score >= thresholds["watch_threshold"]:
if score >= thresholds["entry_threshold"]:
reasons.append("research and liquidity are constructive, but technical trigger quality is not clean enough")
else:
reasons.append("setup is constructive but still needs a cleaner trigger")
return "setup", reasons
reasons.append("setup, trigger, or liquidity quality is too weak")
return "skip", reasons
reasons.append("fresh breakout trigger with clean setup and manageable extension")
return "entry", reasons, confidence
# Watch: constructive but not clean enough
if edge_score >= 0.0 and setup_score >= min_setup_score:
reasons.append("setup is constructive but the trigger is not clean enough yet")
return "watch", reasons, confidence
def _action_for(
score: float,
concentration: float,
metrics: dict[str, float] | None = None,
risk_limits: dict[str, float] | None = None,
) -> tuple[str, list[str]]:
metrics = metrics or {}
risk_limits = risk_limits or {}
reasons: list[str] = []
if concentration >= 0.5 and score < 0.4:
reasons.append("position concentration is high")
return "trim", reasons
if metrics.get("liquidity", 0.0) < risk_limits.get("min_liquidity", 0.0):
reasons.append("liquidity is below the configured institutional threshold")
return "observe", reasons
if metrics.get("unlock_risk", 0.0) > risk_limits.get("max_unlock_risk", 1.0):
reasons.append("token unlock or dilution risk is too high")
return "observe", reasons
if metrics.get("regulatory_risk", 0.0) > risk_limits.get("max_regulatory_risk", 1.0):
reasons.append("regulatory or listing risk is too high")
return "observe", reasons
if metrics.get("overextension", 0.0) >= risk_limits.get("max_overextension", 0.08):
reasons.append("move looks extended; wait for a cleaner entry")
return "observe", reasons
if metrics.get("downside_risk", 0.0) >= risk_limits.get("max_downside_risk", 0.3) and score < 1.0:
reasons.append("price is weak inside its recent range")
return "observe", reasons
if score >= 1.8 and metrics.get("quality", 0.0) >= risk_limits.get("min_quality_for_add", 0.0):
reasons.append("trend, liquidity, and research signals are aligned")
return "add", reasons
if score >= 0.6:
reasons.append("trend remains constructive")
return "hold", reasons
if score <= -0.2:
reasons.append("momentum and structure have weakened")
return "exit", reasons
reasons.append("signal is mixed and needs confirmation")
return "observe", reasons
def _lookback_intervals(config: dict[str, Any]) -> list[str]:
configured = config.get("opportunity", {}).get("lookback_intervals", ["1h"])
intervals = [str(item) for item in configured if str(item).strip()]
return intervals or ["1h"]
def _risk_limits(config: dict[str, Any]) -> dict[str, float]:
    """Normalize opportunity.risk_limits into a str -> float mapping."""
    raw = config.get("opportunity", {}).get("risk_limits", {})
    limits: dict[str, float] = {}
    for key, value in raw.items():
        limits[str(key)] = _as_float(value)
    return limits
def _ticker_metrics(ticker: dict[str, Any]) -> dict[str, float]:
    """Extract the ticker fields used for scoring, accepting both the
    exchange's camelCase keys and the module's snake_case keys."""

    def pick(camel: str, snake: str) -> float:
        # camelCase wins when present and truthy, mirroring `a or b` semantics.
        return _as_float(ticker.get(camel) or ticker.get(snake))

    return {
        "price_change_pct": pick("priceChangePercent", "price_change_pct"),
        "quote_volume": pick("quoteVolume", "quote_volume"),
        "high_price": pick("highPrice", "high_price"),
        "low_price": pick("lowPrice", "low_price"),
    }
def _candidate_series(
    spot_client: Any,
    symbol: str,
    intervals: list[str],
    limit: int,
) -> tuple[list[float], list[float], dict[str, list[float]]]:
    """Fetch close/volume series for each interval via the spot client.

    Returns (primary_closes, primary_volumes, closes_by_interval), where the
    first interval in *intervals* supplies the primary series. With an empty
    interval list the primaries are empty.
    """
    by_interval: dict[str, list[float]] = {}
    primary: tuple[list[float], list[float]] = ([], [])
    for position, interval in enumerate(intervals):
        rows = spot_client.klines(symbol=symbol, interval=interval, limit=limit)
        closes, volumes = _series_from_klines(rows)
        by_interval[interval] = closes
        if position == 0:
            primary = (closes, volumes)
    return primary[0], primary[1], by_interval
# Default avoid
reasons.append("setup, trigger, or overall quality is too weak")
return "avoid", reasons, confidence
def _add_research_metrics(metrics: dict[str, float], research: dict[str, Any] | None) -> None:
@@ -399,7 +190,7 @@ def scan_opportunities(
score += research_score
metrics["research_score"] = round(research_score, 4)
_add_research_metrics(metrics, research)
action, reasons = _action_for_opportunity(score, metrics, thresholds)
action, reasons, confidence = _action_for_opportunity(score, metrics, thresholds)
if symbol.endswith(quote):
reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters")
if concentration > 0:
@@ -410,6 +201,7 @@ def scan_opportunities(
symbol=symbol,
action=action,
score=round(score, 4),
confidence=confidence,
reasons=reasons,
metrics=metrics,
)

View File

@@ -202,7 +202,7 @@ class OpportunityEvaluationServiceTestCase(unittest.TestCase):
self.assertEqual(result["summary"]["count"], 2)
self.assertEqual(result["summary"]["correct"], 2)
self.assertEqual(result["summary"]["accuracy"], 1.0)
self.assertEqual(result["by_action"]["trigger"]["correct"], 1)
self.assertEqual(result["by_action"]["entry"]["correct"], 1)
self.assertEqual(result["trade_simulation"]["wins"], 1)
def test_optimize_model_reports_recommended_weights(self):

View File

@@ -317,7 +317,7 @@ class OpportunityServiceTestCase(unittest.TestCase):
spot_client=OpportunityPatternSpotClient(),
)
self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SETUPUSDT", "CHASEUSDT"])
self.assertEqual([item["action"] for item in payload["recommendations"]], ["trigger", "chase"])
self.assertEqual([item["action"] for item in payload["recommendations"]], ["entry", "avoid"])
self.assertGreater(payload["recommendations"][0]["metrics"]["setup_score"], 0.6)
self.assertGreater(payload["recommendations"][1]["metrics"]["extension_penalty"], 1.0)
@@ -340,9 +340,9 @@ class OpportunityServiceTestCase(unittest.TestCase):
ignored_rec = ignored["recommendations"][0]
included_rec = included["recommendations"][0]
self.assertEqual(ignored_rec["action"], "trigger")
self.assertEqual(ignored_rec["action"], "entry")
self.assertEqual(ignored_rec["metrics"]["position_weight"], 0.0)
self.assertEqual(included_rec["action"], "skip")
self.assertEqual(included_rec["action"], "entry")
self.assertEqual(included_rec["metrics"]["position_weight"], 1.0)
self.assertLess(included_rec["score"], ignored_rec["score"])
@@ -351,57 +351,6 @@ class OpportunityServiceTestCase(unittest.TestCase):
self.assertEqual(score, 0.0)
self.assertEqual(metrics["trend"], 0.0)
def test_overextended_candidate_is_not_an_add(self):
    """A strong but overextended candidate must be held back as 'observe'."""
    # Steep ~10%-per-bar ramp: the last close sits far above its short SMA.
    closes = [100, 110, 121, 133, 146, 160, 176]
    volumes = [100, 120, 130, 150, 170, 190, 230]
    ticker = {
        "price_change_pct": 35.0,
        "quote_volume": 20_000_000.0,
        "high_price": 180.0,
        "low_price": 95.0,
    }
    score, metrics = opportunity_service._score_candidate(
        closes, volumes, ticker, self.config["opportunity"]["weights"], 0.0, {"1h": closes, "4h": closes}
    )
    action, reasons = opportunity_service._action_for(score, 0.0, metrics)
    # The raw score is attractive, yet the overextension guard (>= 0.08)
    # must override it and demand a cleaner entry.
    self.assertGreater(score, 1.0)
    self.assertGreater(metrics["overextension"], 0.08)
    self.assertEqual(action, "observe")
    self.assertIn("move looks extended; wait for a cleaner entry", reasons)
def test_external_research_signals_improve_candidate_quality(self):
    """Provider research inputs should lift both the score and the quality metric."""
    # Mild, steady uptrend so the technical baseline is unremarkable.
    closes = [100, 101, 102, 103, 104, 105, 106]
    volumes = [100, 105, 110, 115, 120, 125, 130]
    ticker = {
        "price_change_pct": 4.0,
        "quote_volume": 50_000_000.0,
        "high_price": 110.0,
        "low_price": 95.0,
    }
    # Baseline: no research payload supplied.
    base_score, base_metrics = opportunity_service._score_candidate(
        closes, volumes, ticker, self.config["opportunity"]["weights"], 0.0, {"1h": closes}
    )
    # Same candidate with strong research pillars (0-100 provider scale).
    researched_score, researched_metrics = opportunity_service._score_candidate(
        closes,
        volumes,
        ticker,
        self.config["opportunity"]["weights"],
        0.0,
        {"1h": closes},
        {
            "fundamental": 85,
            "tokenomics": 80,
            "catalyst": 70,
            "adoption": 90,
            "smart_money": 60,
        },
    )
    self.assertGreater(researched_score, base_score)
    self.assertEqual(base_metrics["quality"], 0.0)
    self.assertGreater(researched_metrics["quality"], 0.7)
def test_scan_uses_automatic_external_research(self):
config = self.config | {
"opportunity": self.config["opportunity"]
@@ -436,15 +385,16 @@ class OpportunityServiceTestCase(unittest.TestCase):
self.assertEqual(sol["metrics"]["fundamental"], 0.9)
self.assertEqual(sol["metrics"]["research_confidence"], 0.9)
def test_research_score_does_not_create_weak_trigger(self):
def test_weak_setup_and_trigger_becomes_avoid(self):
metrics = {
"extension_penalty": 0.0,
"recent_runup": 0.0,
"breakout_pct": -0.01,
"setup_score": 0.12,
"trigger_score": 0.18,
"edge_score": 0.0,
}
action, reasons = opportunity_service._action_for_opportunity(
action, reasons, confidence = opportunity_service._action_for_opportunity(
2.5,
metrics,
{
@@ -455,27 +405,9 @@ class OpportunityServiceTestCase(unittest.TestCase):
},
)
self.assertEqual(action, "setup")
self.assertIn("technical trigger quality is not clean enough", reasons[0])
def test_unlock_risk_blocks_add_recommendation(self):
    """High unlock risk forces 'observe' even when the raw score supports an add."""
    # Everything else is healthy; only unlock_risk breaches its limit.
    metrics = {
        "liquidity": 0.8,
        "overextension": 0.0,
        "downside_risk": 0.0,
        "unlock_risk": 0.9,
        "regulatory_risk": 0.0,
        "quality": 0.8,
    }
    # Score 3.0 would normally qualify as an "add".
    action, reasons = opportunity_service._action_for(
        3.0,
        0.0,
        metrics,
        self.config["opportunity"]["risk_limits"],
    )
    self.assertEqual(action, "observe")
    self.assertIn("token unlock or dilution risk is too high", reasons)
self.assertEqual(action, "avoid")
self.assertIn("setup, trigger, or overall quality is too weak", reasons[0])
self.assertEqual(confidence, 50)
class ResearchServiceTestCase(unittest.TestCase):