feat: split portfolio and opportunity decision models
This commit is contained in:
@@ -1,14 +1,14 @@
|
||||
"""Opportunity analysis services."""
|
||||
"""Opportunity scanning services."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict, dataclass
|
||||
from statistics import mean
|
||||
from typing import Any
|
||||
|
||||
from ..audit import audit_event
|
||||
from .account_service import get_positions
|
||||
from .market_service import base_asset, get_scan_universe, normalize_symbol
|
||||
from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -20,132 +20,25 @@ class OpportunityRecommendation:
|
||||
metrics: dict[str, float]
|
||||
|
||||
|
||||
def _safe_pct(new: float, old: float) -> float:
|
||||
if old == 0:
|
||||
return 0.0
|
||||
return (new - old) / old
|
||||
|
||||
|
||||
def _score_candidate(
|
||||
closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float
|
||||
) -> tuple[float, dict[str, float]]:
|
||||
if len(closes) < 2 or not volumes:
|
||||
return 0.0, {
|
||||
"trend": 0.0,
|
||||
"momentum": 0.0,
|
||||
"breakout": 0.0,
|
||||
"volume_confirmation": 1.0,
|
||||
"volatility": 0.0,
|
||||
"concentration": round(concentration, 4),
|
||||
}
|
||||
|
||||
current = closes[-1]
|
||||
sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
|
||||
sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
|
||||
trend = 1.0 if current >= sma_short >= sma_long else -1.0 if current < sma_short < sma_long else 0.0
|
||||
momentum = (
|
||||
_safe_pct(closes[-1], closes[-2]) * 0.5
|
||||
+ (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0)
|
||||
+ float(ticker.get("price_change_pct", 0.0)) / 100.0 * 0.2
|
||||
)
|
||||
recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes)
|
||||
breakout = 1.0 - max((recent_high - current) / recent_high, 0.0)
|
||||
avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
|
||||
volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
|
||||
volume_score = min(max(volume_confirmation - 1.0, -1.0), 2.0)
|
||||
volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0
|
||||
|
||||
score = (
|
||||
weights.get("trend", 1.0) * trend
|
||||
+ weights.get("momentum", 1.0) * momentum
|
||||
+ weights.get("breakout", 0.8) * breakout
|
||||
+ weights.get("volume", 0.7) * volume_score
|
||||
- weights.get("volatility_penalty", 0.5) * volatility
|
||||
- weights.get("position_concentration_penalty", 0.6) * concentration
|
||||
)
|
||||
metrics = {
|
||||
"trend": round(trend, 4),
|
||||
"momentum": round(momentum, 4),
|
||||
"breakout": round(breakout, 4),
|
||||
"volume_confirmation": round(volume_confirmation, 4),
|
||||
"volatility": round(volatility, 4),
|
||||
"concentration": round(concentration, 4),
|
||||
def _opportunity_thresholds(config: dict[str, Any]) -> dict[str, float]:
|
||||
opportunity_config = config.get("opportunity", {})
|
||||
return {
|
||||
"entry_threshold": float(opportunity_config.get("entry_threshold", 1.5)),
|
||||
"watch_threshold": float(opportunity_config.get("watch_threshold", 0.6)),
|
||||
"overlap_penalty": float(opportunity_config.get("overlap_penalty", 0.6)),
|
||||
}
|
||||
return score, metrics
|
||||
|
||||
|
||||
def _action_for(score: float, concentration: float) -> tuple[str, list[str]]:
|
||||
def _action_for_opportunity(score: float, thresholds: dict[str, float]) -> tuple[str, list[str]]:
|
||||
reasons: list[str] = []
|
||||
if concentration >= 0.5 and score < 0.4:
|
||||
reasons.append("position concentration is high")
|
||||
return "trim", reasons
|
||||
if score >= 1.5:
|
||||
reasons.append("trend, momentum, and breakout are aligned")
|
||||
return "add", reasons
|
||||
if score >= 0.6:
|
||||
reasons.append("trend remains constructive")
|
||||
return "hold", reasons
|
||||
if score <= -0.2:
|
||||
reasons.append("momentum and structure have weakened")
|
||||
return "exit", reasons
|
||||
reasons.append("signal is mixed and needs confirmation")
|
||||
return "observe", reasons
|
||||
|
||||
|
||||
def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str, Any]:
|
||||
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
|
||||
weights = config.get("opportunity", {}).get("weights", {})
|
||||
positions = get_positions(config, spot_client=spot_client)["positions"]
|
||||
positions = [item for item in positions if item["symbol"] != quote]
|
||||
total_notional = sum(item["notional_usdt"] for item in positions) or 1.0
|
||||
recommendations = []
|
||||
for position in positions:
|
||||
symbol = normalize_symbol(position["symbol"])
|
||||
klines = spot_client.klines(symbol=symbol, interval="1h", limit=24)
|
||||
closes = [float(item[4]) for item in klines]
|
||||
volumes = [float(item[5]) for item in klines]
|
||||
tickers = spot_client.ticker_stats([symbol], window="1d")
|
||||
ticker = tickers[0] if tickers else {"priceChangePercent": "0"}
|
||||
concentration = position["notional_usdt"] / total_notional
|
||||
score, metrics = _score_candidate(
|
||||
closes,
|
||||
volumes,
|
||||
{
|
||||
"price_change_pct": float(ticker.get("priceChangePercent") or 0.0),
|
||||
},
|
||||
weights,
|
||||
concentration,
|
||||
)
|
||||
action, reasons = _action_for(score, concentration)
|
||||
recommendations.append(
|
||||
asdict(
|
||||
OpportunityRecommendation(
|
||||
symbol=symbol,
|
||||
action=action,
|
||||
score=round(score, 4),
|
||||
reasons=reasons,
|
||||
metrics=metrics,
|
||||
)
|
||||
)
|
||||
)
|
||||
payload = {"recommendations": sorted(recommendations, key=lambda item: item["score"], reverse=True)}
|
||||
audit_event(
|
||||
"opportunity_portfolio_generated",
|
||||
{
|
||||
"market_type": "spot",
|
||||
"symbol": None,
|
||||
"side": None,
|
||||
"qty": None,
|
||||
"quote_amount": None,
|
||||
"order_type": None,
|
||||
"dry_run": True,
|
||||
"request_payload": {"mode": "portfolio"},
|
||||
"response_payload": payload,
|
||||
"status": "generated",
|
||||
"error": None,
|
||||
},
|
||||
)
|
||||
return payload
|
||||
if score >= thresholds["entry_threshold"]:
|
||||
reasons.append("trend, momentum, and breakout are aligned for a fresh entry")
|
||||
return "enter", reasons
|
||||
if score >= thresholds["watch_threshold"]:
|
||||
reasons.append("market structure is constructive but still needs confirmation")
|
||||
return "watch", reasons
|
||||
reasons.append("edge is too weak for a new entry")
|
||||
return "skip", reasons
|
||||
|
||||
|
||||
def scan_opportunities(
|
||||
@@ -155,7 +48,9 @@ def scan_opportunities(
|
||||
symbols: list[str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
opportunity_config = config.get("opportunity", {})
|
||||
weights = opportunity_config.get("weights", {})
|
||||
signal_weights = get_signal_weights(config)
|
||||
interval = get_signal_interval(config)
|
||||
thresholds = _opportunity_thresholds(config)
|
||||
scan_limit = int(opportunity_config.get("scan_limit", 50))
|
||||
top_n = int(opportunity_config.get("top_n", 10))
|
||||
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
|
||||
@@ -167,14 +62,19 @@ def scan_opportunities(
|
||||
recommendations = []
|
||||
for ticker in universe:
|
||||
symbol = normalize_symbol(ticker["symbol"])
|
||||
klines = spot_client.klines(symbol=symbol, interval="1h", limit=24)
|
||||
klines = spot_client.klines(symbol=symbol, interval=interval, limit=24)
|
||||
closes = [float(item[4]) for item in klines]
|
||||
volumes = [float(item[5]) for item in klines]
|
||||
concentration = concentration_map.get(symbol, 0.0) / total_held
|
||||
score, metrics = _score_candidate(closes, volumes, ticker, weights, concentration)
|
||||
action, reasons = _action_for(score, concentration)
|
||||
signal_score, metrics = score_market_signal(closes, volumes, ticker, signal_weights)
|
||||
score = signal_score - thresholds["overlap_penalty"] * concentration
|
||||
action, reasons = _action_for_opportunity(score, thresholds)
|
||||
metrics["signal_score"] = round(signal_score, 4)
|
||||
metrics["position_weight"] = round(concentration, 4)
|
||||
if symbol.endswith(quote):
|
||||
reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters")
|
||||
if concentration > 0:
|
||||
reasons.append("symbol is already held, so the opportunity score is discounted for overlap")
|
||||
recommendations.append(
|
||||
asdict(
|
||||
OpportunityRecommendation(
|
||||
|
||||
109
src/coinhunter/services/portfolio_service.py
Normal file
109
src/coinhunter/services/portfolio_service.py
Normal file
@@ -0,0 +1,109 @@
|
||||
"""Portfolio analysis and position management signals."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import asdict, dataclass
|
||||
from typing import Any
|
||||
|
||||
from ..audit import audit_event
|
||||
from .account_service import get_positions
|
||||
from .market_service import normalize_symbol
|
||||
from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
|
||||
|
||||
|
||||
@dataclass
|
||||
class PortfolioRecommendation:
|
||||
symbol: str
|
||||
action: str
|
||||
score: float
|
||||
reasons: list[str]
|
||||
metrics: dict[str, float]
|
||||
|
||||
|
||||
def _portfolio_thresholds(config: dict[str, Any]) -> dict[str, float]:
|
||||
portfolio_config = config.get("portfolio", {})
|
||||
return {
|
||||
"add_threshold": float(portfolio_config.get("add_threshold", 1.5)),
|
||||
"hold_threshold": float(portfolio_config.get("hold_threshold", 0.6)),
|
||||
"trim_threshold": float(portfolio_config.get("trim_threshold", 0.2)),
|
||||
"exit_threshold": float(portfolio_config.get("exit_threshold", -0.2)),
|
||||
"max_position_weight": float(portfolio_config.get("max_position_weight", 0.6)),
|
||||
}
|
||||
|
||||
|
||||
def _action_for_position(score: float, concentration: float, thresholds: dict[str, float]) -> tuple[str, list[str]]:
|
||||
reasons: list[str] = []
|
||||
max_weight = thresholds["max_position_weight"]
|
||||
if concentration >= max_weight and score < thresholds["hold_threshold"]:
|
||||
reasons.append("position weight is above the portfolio risk budget")
|
||||
return "trim", reasons
|
||||
if score >= thresholds["add_threshold"] and concentration < max_weight:
|
||||
reasons.append("market signal is strong and position still has room")
|
||||
return "add", reasons
|
||||
if score >= thresholds["hold_threshold"]:
|
||||
reasons.append("market structure remains supportive for holding")
|
||||
return "hold", reasons
|
||||
if score <= thresholds["exit_threshold"]:
|
||||
reasons.append("market signal has weakened enough to justify an exit review")
|
||||
return "exit", reasons
|
||||
if score <= thresholds["trim_threshold"]:
|
||||
reasons.append("edge has faded and the position should be reduced")
|
||||
return "trim", reasons
|
||||
reasons.append("signal is mixed and the position needs review")
|
||||
return "review", reasons
|
||||
|
||||
|
||||
def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str, Any]:
|
||||
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
|
||||
signal_weights = get_signal_weights(config)
|
||||
interval = get_signal_interval(config)
|
||||
thresholds = _portfolio_thresholds(config)
|
||||
positions = get_positions(config, spot_client=spot_client)["positions"]
|
||||
positions = [item for item in positions if item["symbol"] != quote]
|
||||
total_notional = sum(item["notional_usdt"] for item in positions) or 1.0
|
||||
recommendations = []
|
||||
for position in positions:
|
||||
symbol = normalize_symbol(position["symbol"])
|
||||
klines = spot_client.klines(symbol=symbol, interval=interval, limit=24)
|
||||
closes = [float(item[4]) for item in klines]
|
||||
volumes = [float(item[5]) for item in klines]
|
||||
tickers = spot_client.ticker_stats([symbol], window="1d")
|
||||
ticker = tickers[0] if tickers else {"priceChangePercent": "0"}
|
||||
concentration = position["notional_usdt"] / total_notional
|
||||
score, metrics = score_market_signal(
|
||||
closes,
|
||||
volumes,
|
||||
{"price_change_pct": float(ticker.get("priceChangePercent") or 0.0)},
|
||||
signal_weights,
|
||||
)
|
||||
action, reasons = _action_for_position(score, concentration, thresholds)
|
||||
metrics["position_weight"] = round(concentration, 4)
|
||||
recommendations.append(
|
||||
asdict(
|
||||
PortfolioRecommendation(
|
||||
symbol=symbol,
|
||||
action=action,
|
||||
score=round(score, 4),
|
||||
reasons=reasons,
|
||||
metrics=metrics,
|
||||
)
|
||||
)
|
||||
)
|
||||
payload = {"recommendations": sorted(recommendations, key=lambda item: item["score"], reverse=True)}
|
||||
audit_event(
|
||||
"opportunity_portfolio_generated",
|
||||
{
|
||||
"market_type": "spot",
|
||||
"symbol": None,
|
||||
"side": None,
|
||||
"qty": None,
|
||||
"quote_amount": None,
|
||||
"order_type": None,
|
||||
"dry_run": True,
|
||||
"request_payload": {"mode": "portfolio"},
|
||||
"response_payload": payload,
|
||||
"status": "generated",
|
||||
"error": None,
|
||||
},
|
||||
)
|
||||
return payload
|
||||
78
src/coinhunter/services/signal_service.py
Normal file
78
src/coinhunter/services/signal_service.py
Normal file
@@ -0,0 +1,78 @@
|
||||
"""Shared market signal scoring."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from statistics import mean
|
||||
from typing import Any
|
||||
|
||||
|
||||
def _safe_pct(new: float, old: float) -> float:
|
||||
if old == 0:
|
||||
return 0.0
|
||||
return (new - old) / old
|
||||
|
||||
|
||||
def get_signal_weights(config: dict[str, Any]) -> dict[str, float]:
|
||||
signal_config = config.get("signal", {})
|
||||
return {
|
||||
"trend": float(signal_config.get("trend", 1.0)),
|
||||
"momentum": float(signal_config.get("momentum", 1.0)),
|
||||
"breakout": float(signal_config.get("breakout", 0.8)),
|
||||
"volume": float(signal_config.get("volume", 0.7)),
|
||||
"volatility_penalty": float(signal_config.get("volatility_penalty", 0.5)),
|
||||
}
|
||||
|
||||
|
||||
def get_signal_interval(config: dict[str, Any]) -> str:
|
||||
signal_config = config.get("signal", {})
|
||||
if signal_config.get("lookback_interval"):
|
||||
return str(signal_config["lookback_interval"])
|
||||
return "1h"
|
||||
|
||||
|
||||
def score_market_signal(
|
||||
closes: list[float],
|
||||
volumes: list[float],
|
||||
ticker: dict[str, Any],
|
||||
weights: dict[str, float],
|
||||
) -> tuple[float, dict[str, float]]:
|
||||
if len(closes) < 2 or not volumes:
|
||||
return 0.0, {
|
||||
"trend": 0.0,
|
||||
"momentum": 0.0,
|
||||
"breakout": 0.0,
|
||||
"volume_confirmation": 1.0,
|
||||
"volatility": 0.0,
|
||||
}
|
||||
|
||||
current = closes[-1]
|
||||
sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
|
||||
sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
|
||||
trend = 1.0 if current >= sma_short >= sma_long else -1.0 if current < sma_short < sma_long else 0.0
|
||||
momentum = (
|
||||
_safe_pct(closes[-1], closes[-2]) * 0.5
|
||||
+ (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0)
|
||||
+ float(ticker.get("price_change_pct", 0.0)) / 100.0 * 0.2
|
||||
)
|
||||
recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes)
|
||||
breakout = 1.0 - max((recent_high - current) / recent_high, 0.0)
|
||||
avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
|
||||
volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
|
||||
volume_score = min(max(volume_confirmation - 1.0, -1.0), 2.0)
|
||||
volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0
|
||||
|
||||
score = (
|
||||
weights.get("trend", 1.0) * trend
|
||||
+ weights.get("momentum", 1.0) * momentum
|
||||
+ weights.get("breakout", 0.8) * breakout
|
||||
+ weights.get("volume", 0.7) * volume_score
|
||||
- weights.get("volatility_penalty", 0.5) * volatility
|
||||
)
|
||||
metrics = {
|
||||
"trend": round(trend, 4),
|
||||
"momentum": round(momentum, 4),
|
||||
"breakout": round(breakout, 4),
|
||||
"volume_confirmation": round(volume_confirmation, 4),
|
||||
"volatility": round(volatility, 4),
|
||||
}
|
||||
return score, metrics
|
||||
Reference in New Issue
Block a user