Refactor opportunity scoring model

This commit is contained in:
2026-04-21 11:25:38 +08:00
parent 4761067c30
commit 50402e4aa7
7 changed files with 269 additions and 42 deletions

View File

@@ -27,7 +27,7 @@
## What's New in 3.0
- **Split decision models** — portfolio (add/hold/trim/exit) and opportunity (enter/watch/skip) now use independent scoring logic.
- **Split decision models** — portfolio (add/hold/trim/exit) and opportunity (trigger/setup/chase/skip) now use independent scoring logic.
- **Configurable ticker windows** — `market tickers` supports `--window 1h`, `4h`, or `1d`.
- **Live / dry-run audit logs** — audit logs are written to separate subdirectories; use `catlog --dry-run` to review simulations.
- **Flattened commands** — `account`, `opportunity`, and `config` are now top-level for fewer keystrokes.

View File

@@ -402,49 +402,57 @@ Fields:
"tui": """\
TUI Output:
RECOMMENDATIONS count=5
1. ETHUSDT action=enter score=0.8200
· trend, momentum, and breakout are aligned for a fresh entry
1. ETHUSDT action=trigger score=1.7200
· fresh breakout trigger is forming without excessive extension
· base asset ETH passed liquidity and tradability filters
trend=1.0 momentum=0.03 breakout=0.9 volume_confirmation=1.5 volatility=0.02 signal_score=0.82 position_weight=0.0
2. BTCUSDT action=watch score=0.6000
· market structure is constructive but still needs confirmation
setup_score=0.74 trigger_score=0.61 liquidity_score=1.0 extension_penalty=0.0 opportunity_score=1.72 position_weight=0.0
2. BTCUSDT action=setup score=0.7800
· setup is constructive but still needs a cleaner trigger
· base asset BTC passed liquidity and tradability filters
· symbol is already held, so the opportunity score is discounted for overlap
trend=1.0 momentum=0.01 breakout=0.6 volume_confirmation=1.1 volatility=0.01 signal_score=0.78 position_weight=0.3
setup_score=0.68 trigger_score=0.25 liquidity_score=1.0 extension_penalty=0.1 opportunity_score=0.96 position_weight=0.3
JSON Output:
{
"recommendations": [
{"symbol": "ETHUSDT", "action": "enter", "score": 0.82,
"reasons": ["trend, momentum, and breakout are aligned for a fresh entry", "base asset ETH passed liquidity and tradability filters"],
"metrics": {"trend": 1.0, "momentum": 0.03, "breakout": 0.9, "volume_confirmation": 1.5, "volatility": 0.02, "signal_score": 0.82, "position_weight": 0.0}}
{"symbol": "ETHUSDT", "action": "trigger", "score": 1.72,
"reasons": ["fresh breakout trigger is forming without excessive extension", "base asset ETH passed liquidity and tradability filters"],
"metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}}
]
}
Fields:
symbol trading pair (e.g. "ETHUSDT")
action enum: "enter" | "watch" | "skip"
score opportunity score after overlap/risk discounts
action enum: "trigger" | "setup" | "chase" | "skip"
score opportunity score after extension and overlap/risk discounts
reasons list of human-readable explanations (includes liquidity filter note for scan)
metrics scoring breakdown
signal_score raw shared market signal score before overlap discount
setup_score compression, higher-lows, and breakout-proximity quality
trigger_score fresh-breakout, volume, and controlled-momentum quality
liquidity_score relative quote-volume quality after liquidity filters
extension_penalty overextension/chase risk from run-up and MA distance
opportunity_score raw opportunity score before overlap discount
position_weight current portfolio overlap in the same symbol
""",
"json": """\
JSON Output:
{
"recommendations": [
{"symbol": "ETHUSDT", "action": "enter", "score": 0.82,
"reasons": ["trend, momentum, and breakout are aligned for a fresh entry", "base asset ETH passed liquidity and tradability filters"],
"metrics": {"trend": 1.0, "momentum": 0.03, "breakout": 0.9, "volume_confirmation": 1.5, "volatility": 0.02, "signal_score": 0.82, "position_weight": 0.0}}
{"symbol": "ETHUSDT", "action": "trigger", "score": 1.72,
"reasons": ["fresh breakout trigger is forming without excessive extension", "base asset ETH passed liquidity and tradability filters"],
"metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}}
]
}
Fields:
symbol trading pair (e.g. "ETHUSDT")
action enum: "enter" | "watch" | "skip"
score opportunity score after overlap/risk discounts
action enum: "trigger" | "setup" | "chase" | "skip"
score opportunity score after extension and overlap/risk discounts
reasons list of human-readable explanations (includes liquidity filter note for scan)
metrics scoring breakdown
signal_score raw shared market signal score before overlap discount
setup_score compression, higher-lows, and breakout-proximity quality
trigger_score fresh-breakout, volume, and controlled-momentum quality
liquidity_score relative quote-volume quality after liquidity filters
extension_penalty overextension/chase risk from run-up and MA distance
opportunity_score raw opportunity score before overlap discount
position_weight current portfolio overlap in the same symbol
""",
},

View File

@@ -335,11 +335,11 @@ def _render_tui(payload: Any) -> None:
action = r.get("action", "")
action_color = (
_GREEN
if action in {"add", "enter"}
if action in {"add", "trigger"}
else _YELLOW
if action in {"hold", "watch", "review"}
if action in {"hold", "setup", "review"}
else _RED
if action in {"exit", "skip", "trim"}
if action in {"chase", "exit", "skip", "trim"}
else _CYAN
)
print(

View File

@@ -8,7 +8,7 @@ from typing import Any
from ..audit import audit_event
from .account_service import get_positions
from .market_service import base_asset, get_scan_universe, normalize_symbol
from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
from .signal_service import get_signal_interval, score_opportunity_signal
@dataclass
@@ -29,15 +29,18 @@ def _opportunity_thresholds(config: dict[str, Any]) -> dict[str, float]:
}
def _action_for_opportunity(score: float, thresholds: dict[str, float]) -> tuple[str, list[str]]:
def _action_for_opportunity(score: float, metrics: dict[str, float], thresholds: dict[str, float]) -> tuple[str, list[str]]:
reasons: list[str] = []
if metrics["extension_penalty"] >= 1.0 and (metrics["recent_runup"] >= 0.10 or metrics["breakout_pct"] >= 0.03):
reasons.append("price is already extended, so this is treated as a chase setup")
return "chase", reasons
if score >= thresholds["entry_threshold"]:
reasons.append("trend, momentum, and breakout are aligned for a fresh entry")
return "enter", reasons
reasons.append("fresh breakout trigger is forming without excessive extension")
return "trigger", reasons
if score >= thresholds["watch_threshold"]:
reasons.append("market structure is constructive but still needs confirmation")
return "watch", reasons
reasons.append("edge is too weak for a new entry")
reasons.append("setup is constructive but still needs a cleaner trigger")
return "setup", reasons
reasons.append("setup, trigger, or liquidity quality is too weak")
return "skip", reasons
@@ -49,7 +52,6 @@ def scan_opportunities(
) -> dict[str, Any]:
opportunity_config = config.get("opportunity", {})
ignore_dust = bool(opportunity_config.get("ignore_dust", True))
signal_weights = get_signal_weights(config)
interval = get_signal_interval(config)
thresholds = _opportunity_thresholds(config)
scan_limit = int(opportunity_config.get("scan_limit", 50))
@@ -67,10 +69,10 @@ def scan_opportunities(
closes = [float(item[4]) for item in klines]
volumes = [float(item[5]) for item in klines]
concentration = concentration_map.get(symbol, 0.0) / total_held
signal_score, metrics = score_market_signal(closes, volumes, ticker, signal_weights)
score = signal_score - thresholds["overlap_penalty"] * concentration
action, reasons = _action_for_opportunity(score, thresholds)
metrics["signal_score"] = round(signal_score, 4)
opportunity_score, metrics = score_opportunity_signal(closes, volumes, ticker, opportunity_config)
score = opportunity_score - thresholds["overlap_penalty"] * concentration
action, reasons = _action_for_opportunity(score, metrics, thresholds)
metrics["opportunity_score"] = round(opportunity_score, 4)
metrics["position_weight"] = round(concentration, 4)
if symbol.endswith(quote):
reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters")

View File

@@ -8,7 +8,7 @@ from typing import Any
from ..audit import audit_event
from .account_service import get_positions
from .market_service import normalize_symbol
from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
from .signal_service import get_signal_interval, get_signal_weights, score_portfolio_signal
@dataclass
@@ -70,7 +70,7 @@ def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str,
tickers = spot_client.ticker_stats([symbol], window="1d")
ticker = tickers[0] if tickers else {"priceChangePercent": "0"}
concentration = position["notional_usdt"] / total_notional
score, metrics = score_market_signal(
score, metrics = score_portfolio_signal(
closes,
volumes,
{"price_change_pct": float(ticker.get("priceChangePercent") or 0.0)},

View File

@@ -1,17 +1,28 @@
"""Shared market signal scoring."""
"""Market signal scoring primitives and domain-specific models."""
from __future__ import annotations
from math import log10
from statistics import mean
from typing import Any
def _clamp(value: float, low: float, high: float) -> float:
return max(low, min(value, high))
def _safe_pct(new: float, old: float) -> float:
if old == 0:
return 0.0
return (new - old) / old
def _range_pct(values: list[float], denominator: float) -> float:
if not values or denominator == 0:
return 0.0
return (max(values) - min(values)) / denominator
def get_signal_weights(config: dict[str, Any]) -> dict[str, float]:
signal_config = config.get("signal", {})
return {
@@ -35,6 +46,15 @@ def score_market_signal(
volumes: list[float],
ticker: dict[str, Any],
weights: dict[str, float],
) -> tuple[float, dict[str, float]]:
return score_portfolio_signal(closes, volumes, ticker, weights)
def score_portfolio_signal(
closes: list[float],
volumes: list[float],
ticker: dict[str, Any],
weights: dict[str, float],
) -> tuple[float, dict[str, float]]:
if len(closes) < 2 or not volumes:
return 0.0, {
@@ -76,3 +96,97 @@ def score_market_signal(
"volatility": round(volatility, 4),
}
return score, metrics
def score_opportunity_signal(
closes: list[float],
volumes: list[float],
ticker: dict[str, Any],
opportunity_config: dict[str, Any],
) -> tuple[float, dict[str, float]]:
if len(closes) < 6 or len(volumes) < 2:
return 0.0, {
"setup_score": 0.0,
"trigger_score": 0.0,
"liquidity_score": 0.0,
"extension_penalty": 0.0,
"breakout_pct": 0.0,
"recent_runup": 0.0,
"volume_confirmation": 1.0,
"volatility": 0.0,
}
current = closes[-1]
prior_closes = closes[:-1]
prev_high = max(prior_closes[-20:]) if prior_closes else current
recent_low = min(closes[-20:])
range_width = prev_high - recent_low
range_position = _clamp((current - recent_low) / range_width, 0.0, 1.2) if range_width else 0.0
breakout_pct = _safe_pct(current, prev_high)
recent_range = _range_pct(closes[-6:], current)
prior_window = closes[-20:-6] if len(closes) >= 20 else closes[:-6]
prior_range = _range_pct(prior_window, current) if prior_window else recent_range
compression = _clamp(1.0 - (recent_range / prior_range), -1.0, 1.0) if prior_range else 0.0
recent_low_window = min(closes[-5:])
prior_low_window = min(closes[-10:-5]) if len(closes) >= 10 else min(closes[:-5])
higher_lows = 1.0 if recent_low_window > prior_low_window else 0.0
breakout_proximity = _clamp(1.0 - abs(breakout_pct) / 0.03, 0.0, 1.0)
setup_score = _clamp(0.45 * compression + 0.35 * breakout_proximity + 0.20 * higher_lows, 0.0, 1.0)
avg_volume = mean(volumes[:-1])
volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
volume_score = _clamp((volume_confirmation - 1.0) / 1.5, -0.5, 1.0)
momentum_3 = _safe_pct(closes[-1], closes[-4])
if momentum_3 <= 0:
controlled_momentum = _clamp(momentum_3 / 0.05, -0.5, 0.0)
elif momentum_3 <= 0.05:
controlled_momentum = momentum_3 / 0.05
elif momentum_3 <= 0.12:
controlled_momentum = 1.0 - ((momentum_3 - 0.05) / 0.07) * 0.5
else:
controlled_momentum = 0.2
fresh_breakout = _clamp(1.0 - abs(breakout_pct) / 0.025, 0.0, 1.0)
trigger_score = _clamp(0.40 * fresh_breakout + 0.35 * volume_score + 0.25 * controlled_momentum, 0.0, 1.0)
sma_short = mean(closes[-5:])
sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
extension_from_short = _safe_pct(current, sma_short)
recent_runup = _safe_pct(current, closes[-6])
extension_penalty = (
_clamp((extension_from_short - 0.025) / 0.075, 0.0, 1.0)
+ _clamp((recent_runup - 0.08) / 0.12, 0.0, 1.0)
+ _clamp((float(ticker.get("price_change_pct", 0.0)) / 100.0 - 0.12) / 0.18, 0.0, 1.0)
)
volatility = _range_pct(closes[-10:], current)
min_quote_volume = float(opportunity_config.get("min_quote_volume", 0.0))
quote_volume = float(ticker.get("quote_volume") or ticker.get("quoteVolume") or 0.0)
if min_quote_volume > 0 and quote_volume > 0:
liquidity_score = _clamp(log10(max(quote_volume / min_quote_volume, 1.0)) / 2.0, 0.0, 1.0)
else:
liquidity_score = 1.0
score = (
setup_score
+ 1.2 * trigger_score
+ 0.4 * liquidity_score
- 0.8 * volatility
- 0.9 * extension_penalty
)
metrics = {
"setup_score": round(setup_score, 4),
"trigger_score": round(trigger_score, 4),
"liquidity_score": round(liquidity_score, 4),
"extension_penalty": round(extension_penalty, 4),
"compression": round(compression, 4),
"range_position": round(range_position, 4),
"breakout_pct": round(breakout_pct, 4),
"recent_runup": round(recent_runup, 4),
"volume_confirmation": round(volume_confirmation, 4),
"volatility": round(volatility, 4),
"sma_short_distance": round(extension_from_short, 4),
"sma_long_distance": round(_safe_pct(current, sma_long), 4),
}
return score, metrics

View File

@@ -128,11 +128,111 @@ class DustOverlapSpotClient(FakeSpotClient):
def klines(self, symbol, interval, limit):
rows = []
for index, close in enumerate([1.0, 1.1, 1.2, 1.3, 1.4, 1.45, 1.5][-limit:]):
setup_curve = [
1.4151,
1.4858,
1.3868,
1.5,
1.4009,
1.5142,
1.4151,
1.5,
1.4292,
1.4858,
1.4434,
1.4717,
1.4505,
1.4575,
1.4547,
1.4604,
1.4575,
1.4632,
1.4599,
1.466,
1.4618,
1.4698,
1.4745,
1.5,
]
for index, close in enumerate(setup_curve[-limit:]):
rows.append([index, close * 0.98, close * 1.01, close * 0.97, close, 100 + index * 10, index + 1, close * 100])
return rows
class OpportunityPatternSpotClient:
def account_info(self):
return {"balances": [{"asset": "USDT", "free": "100", "locked": "0"}]}
def ticker_price(self, symbols=None):
return []
def ticker_stats(self, symbols=None, *, window="1d"):
rows = {
"SETUPUSDT": {
"symbol": "SETUPUSDT",
"lastPrice": "106",
"priceChangePercent": "4",
"quoteVolume": "10000000",
"highPrice": "107",
"lowPrice": "98",
},
"CHASEUSDT": {
"symbol": "CHASEUSDT",
"lastPrice": "150",
"priceChangePercent": "18",
"quoteVolume": "9000000",
"highPrice": "152",
"lowPrice": "120",
},
}
if not symbols:
return list(rows.values())
return [rows[symbol] for symbol in symbols]
def exchange_info(self):
return {
"symbols": [
{"symbol": "SETUPUSDT", "status": "TRADING"},
{"symbol": "CHASEUSDT", "status": "TRADING"},
]
}
def klines(self, symbol, interval, limit):
curves = {
"SETUPUSDT": [
100,
105,
98,
106,
99,
107,
100,
106,
101,
105,
102,
104,
102.5,
103,
102.8,
103.2,
103.0,
103.4,
103.1,
103.6,
103.3,
103.8,
104.2,
106,
],
"CHASEUSDT": [120, 125, 130, 135, 140, 145, 150],
}[symbol]
rows = []
for index, close in enumerate(curves[-limit:]):
rows.append([index, close * 0.98, close * 1.01, close * 0.97, close, 100 + index * 20, index + 1, close * 100])
return rows
class OpportunityServiceTestCase(unittest.TestCase):
def setUp(self):
self.config = {
@@ -177,10 +277,13 @@ class OpportunityServiceTestCase(unittest.TestCase):
def test_scan_is_deterministic(self):
with patch.object(opportunity_service, "audit_event", return_value=None):
payload = opportunity_service.scan_opportunities(
self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()
self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}},
spot_client=OpportunityPatternSpotClient(),
)
self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"])
self.assertEqual([item["action"] for item in payload["recommendations"]], ["enter", "enter"])
self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SETUPUSDT", "CHASEUSDT"])
self.assertEqual([item["action"] for item in payload["recommendations"]], ["trigger", "chase"])
self.assertGreater(payload["recommendations"][0]["metrics"]["setup_score"], 0.6)
self.assertGreater(payload["recommendations"][1]["metrics"]["extension_penalty"], 1.0)
def test_scan_respects_ignore_dust_for_overlap_penalty(self):
client = DustOverlapSpotClient()
@@ -201,7 +304,7 @@ class OpportunityServiceTestCase(unittest.TestCase):
ignored_rec = ignored["recommendations"][0]
included_rec = included["recommendations"][0]
self.assertEqual(ignored_rec["action"], "enter")
self.assertEqual(ignored_rec["action"], "trigger")
self.assertEqual(ignored_rec["metrics"]["position_weight"], 0.0)
self.assertEqual(included_rec["action"], "skip")
self.assertEqual(included_rec["metrics"]["position_weight"], 1.0)