Files
coinhunter-cli/src/coinhunter/services/signal_service.py

293 lines
11 KiB
Python

"""Market signal scoring primitives and domain-specific models."""
from __future__ import annotations
from math import log10
from statistics import mean
from typing import Any
def _clamp(value: float, low: float, high: float) -> float:
return max(low, min(value, high))
def _safe_pct(new: float, old: float) -> float:
if old == 0:
return 0.0
return (new - old) / old
def _range_pct(values: list[float], denominator: float) -> float:
if not values or denominator == 0:
return 0.0
return (max(values) - min(values)) / denominator
_DEFAULT_OPPORTUNITY_MODEL_WEIGHTS = {
"trend": 0.1406,
"compression": 0.1688,
"breakout_proximity": 0.0875,
"higher_lows": 0.15,
"range_position": 0.45,
"fresh_breakout": 0.2,
"volume": 0.525,
"momentum": 0.1562,
"setup": 1.875,
"trigger": 1.875,
"liquidity": 0.3,
"volatility_penalty": 0.8,
"extension_penalty": 0.45,
}
def get_opportunity_model_weights(opportunity_config: dict[str, Any]) -> dict[str, float]:
configured = opportunity_config.get("model_weights", {})
return {
key: float(configured.get(key, default))
for key, default in _DEFAULT_OPPORTUNITY_MODEL_WEIGHTS.items()
}
def _weighted_quality(values: dict[str, float], weights: dict[str, float]) -> float:
weighted_sum = 0.0
total_weight = 0.0
for key, value in values.items():
weight = max(float(weights.get(key, 0.0)), 0.0)
if weight == 0:
continue
weighted_sum += weight * value
total_weight += weight
if total_weight == 0:
return 0.0
return _clamp(weighted_sum / total_weight, -1.0, 1.0)
def get_signal_weights(config: dict[str, Any]) -> dict[str, float]:
signal_config = config.get("signal", {})
return {
"trend": float(signal_config.get("trend", 1.0)),
"momentum": float(signal_config.get("momentum", 1.0)),
"breakout": float(signal_config.get("breakout", 0.8)),
"volume": float(signal_config.get("volume", 0.7)),
"volatility_penalty": float(signal_config.get("volatility_penalty", 0.5)),
}
def get_signal_interval(config: dict[str, Any]) -> str:
signal_config = config.get("signal", {})
if signal_config.get("lookback_interval"):
return str(signal_config["lookback_interval"])
return "1h"
def score_market_signal(
closes: list[float],
volumes: list[float],
ticker: dict[str, Any],
weights: dict[str, float],
) -> tuple[float, dict[str, float]]:
return score_portfolio_signal(closes, volumes, ticker, weights)
def score_portfolio_signal(
closes: list[float],
volumes: list[float],
ticker: dict[str, Any],
weights: dict[str, float],
) -> tuple[float, dict[str, float]]:
if len(closes) < 2 or not volumes:
return 0.0, {
"trend": 0.0,
"momentum": 0.0,
"breakout": 0.0,
"volume_confirmation": 1.0,
"volatility": 0.0,
}
current = closes[-1]
sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
trend = 1.0 if current >= sma_short >= sma_long else -1.0 if current < sma_short < sma_long else 0.0
momentum = (
_safe_pct(closes[-1], closes[-2]) * 0.5
+ (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0)
+ float(ticker.get("price_change_pct", 0.0)) / 100.0 * 0.2
)
recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes)
breakout = 1.0 - max((recent_high - current) / recent_high, 0.0)
avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
volume_score = min(max(volume_confirmation - 1.0, -1.0), 2.0)
volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0
score = (
weights.get("trend", 1.0) * trend
+ weights.get("momentum", 1.0) * momentum
+ weights.get("breakout", 0.8) * breakout
+ weights.get("volume", 0.7) * volume_score
- weights.get("volatility_penalty", 0.5) * volatility
)
metrics = {
"trend": round(trend, 4),
"momentum": round(momentum, 4),
"breakout": round(breakout, 4),
"volume_confirmation": round(volume_confirmation, 4),
"volatility": round(volatility, 4),
}
return score, metrics
def score_opportunity_signal(
closes: list[float],
volumes: list[float],
ticker: dict[str, Any],
opportunity_config: dict[str, Any],
) -> tuple[float, dict[str, float]]:
model_weights = get_opportunity_model_weights(opportunity_config)
if len(closes) < 6 or len(volumes) < 2:
return 0.0, {
"setup_score": 0.0,
"trigger_score": 0.0,
"liquidity_score": 0.0,
"edge_score": 0.0,
"setup_quality": 0.0,
"trigger_quality": 0.0,
"liquidity_quality": 0.0,
"risk_quality": 0.0,
"extension_penalty": 0.0,
"breakout_pct": 0.0,
"recent_runup": 0.0,
"volume_confirmation": 1.0,
"volatility": 0.0,
}
current = closes[-1]
sma_short = mean(closes[-5:])
sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
if current >= sma_short >= sma_long:
trend_quality = 1.0
elif current < sma_short < sma_long:
trend_quality = -1.0
else:
trend_quality = 0.0
prior_closes = closes[:-1]
prev_high = max(prior_closes[-20:]) if prior_closes else current
recent_low = min(closes[-20:])
range_width = prev_high - recent_low
range_position = _clamp((current - recent_low) / range_width, 0.0, 1.2) if range_width else 0.0
range_position_quality = 2.0 * _clamp(1.0 - abs(range_position - 0.62) / 0.62, 0.0, 1.0) - 1.0
breakout_pct = _safe_pct(current, prev_high)
recent_range = _range_pct(closes[-6:], current)
prior_window = closes[-20:-6] if len(closes) >= 20 else closes[:-6]
prior_range = _range_pct(prior_window, current) if prior_window else recent_range
compression = _clamp(1.0 - (recent_range / prior_range), -1.0, 1.0) if prior_range else 0.0
recent_low_window = min(closes[-5:])
prior_low_window = min(closes[-10:-5]) if len(closes) >= 10 else min(closes[:-5])
higher_lows = 1.0 if recent_low_window > prior_low_window else -1.0
breakout_proximity = _clamp(1.0 - abs(breakout_pct) / 0.03, 0.0, 1.0)
breakout_proximity_quality = 2.0 * breakout_proximity - 1.0
setup_quality = _weighted_quality(
{
"trend": trend_quality,
"compression": compression,
"breakout_proximity": breakout_proximity_quality,
"higher_lows": higher_lows,
"range_position": range_position_quality,
},
model_weights,
)
setup_score = _clamp((setup_quality + 1.0) / 2.0, 0.0, 1.0)
avg_volume = mean(volumes[:-1])
volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
volume_score = _clamp((volume_confirmation - 1.0) / 1.5, -1.0, 1.0)
momentum_3 = _safe_pct(closes[-1], closes[-4])
if momentum_3 <= 0:
controlled_momentum = _clamp(momentum_3 / 0.05, -1.0, 0.0)
elif momentum_3 <= 0.05:
controlled_momentum = momentum_3 / 0.05
elif momentum_3 <= 0.12:
controlled_momentum = 1.0 - ((momentum_3 - 0.05) / 0.07) * 0.5
else:
controlled_momentum = -0.2
fresh_breakout = _clamp(1.0 - abs(breakout_pct) / 0.025, 0.0, 1.0)
fresh_breakout_quality = 2.0 * fresh_breakout - 1.0
trigger_quality = _weighted_quality(
{
"fresh_breakout": fresh_breakout_quality,
"volume": volume_score,
"momentum": controlled_momentum,
},
model_weights,
)
trigger_score = _clamp((trigger_quality + 1.0) / 2.0, 0.0, 1.0)
extension_from_short = _safe_pct(current, sma_short)
recent_runup = _safe_pct(current, closes[-6])
extension_penalty = (
_clamp((extension_from_short - 0.025) / 0.075, 0.0, 1.0)
+ _clamp((recent_runup - 0.08) / 0.12, 0.0, 1.0)
+ _clamp((float(ticker.get("price_change_pct", 0.0)) / 100.0 - 0.12) / 0.18, 0.0, 1.0)
)
volatility = _range_pct(closes[-10:], current)
min_quote_volume = float(opportunity_config.get("min_quote_volume", 0.0))
quote_volume = float(ticker.get("quote_volume") or ticker.get("quoteVolume") or 0.0)
if min_quote_volume > 0 and quote_volume > 0:
liquidity_score = _clamp(log10(max(quote_volume / min_quote_volume, 1.0)) / 2.0, 0.0, 1.0)
else:
liquidity_score = 1.0
liquidity_quality = 2.0 * liquidity_score - 1.0
volatility_quality = 1.0 - 2.0 * _clamp(volatility / 0.12, 0.0, 1.0)
extension_quality = 1.0 - 2.0 * _clamp(extension_penalty / 2.0, 0.0, 1.0)
risk_quality = _weighted_quality(
{
"volatility_penalty": volatility_quality,
"extension_penalty": extension_quality,
},
model_weights,
)
edge_score = _weighted_quality(
{
"setup": setup_quality,
"trigger": trigger_quality,
"liquidity": liquidity_quality,
"trend": trend_quality,
"range_position": range_position_quality,
"volatility_penalty": volatility_quality,
"extension_penalty": extension_quality,
},
model_weights,
)
score = 1.0 + edge_score
metrics = {
"setup_score": round(setup_score, 4),
"trigger_score": round(trigger_score, 4),
"liquidity_score": round(liquidity_score, 4),
"edge_score": round(edge_score, 4),
"setup_quality": round(setup_quality, 4),
"trigger_quality": round(trigger_quality, 4),
"liquidity_quality": round(liquidity_quality, 4),
"risk_quality": round(risk_quality, 4),
"trend_quality": round(trend_quality, 4),
"range_position_quality": round(range_position_quality, 4),
"breakout_proximity_quality": round(breakout_proximity_quality, 4),
"volume_quality": round(volume_score, 4),
"momentum_quality": round(controlled_momentum, 4),
"extension_quality": round(extension_quality, 4),
"volatility_quality": round(volatility_quality, 4),
"extension_penalty": round(extension_penalty, 4),
"compression": round(compression, 4),
"range_position": round(range_position, 4),
"breakout_pct": round(breakout_pct, 4),
"recent_runup": round(recent_runup, 4),
"volume_confirmation": round(volume_confirmation, 4),
"volatility": round(volatility, 4),
"sma_short_distance": round(extension_from_short, 4),
"sma_long_distance": round(_safe_pct(current, sma_long), 4),
}
return score, metrics