"""External research signal providers for opportunity scoring.""" from __future__ import annotations import json from collections.abc import Callable from math import log10 from typing import Any from urllib.parse import urlencode from urllib.request import Request, urlopen from .market_service import base_asset, normalize_symbol HttpGet = Callable[[str, dict[str, str], float], Any] def _clamp(value: float, low: float = 0.0, high: float = 1.0) -> float: return min(max(value, low), high) def _as_float(value: Any, default: float = 0.0) -> float: try: return float(value) except (TypeError, ValueError): return default def _safe_ratio(numerator: float, denominator: float) -> float: if denominator <= 0: return 0.0 return numerator / denominator def _log_score(value: float, *, floor: float, span: float) -> float: if value <= 0: return 0.0 return _clamp((log10(value) - floor) / span) def _pct_score(value: float, *, low: float, high: float) -> float: if high <= low: return 0.0 return _clamp((value - low) / (high - low)) def _public_http_get(url: str, headers: dict[str, str], timeout: float) -> Any: request = Request(url, headers=headers) with urlopen(request, timeout=timeout) as response: # noqa: S310 - user-configured market data endpoint return json.loads(response.read().decode("utf-8")) def _build_url(base_url: str, path: str, params: dict[str, str]) -> str: return f"{base_url.rstrip('/')}{path}?{urlencode(params)}" def _chunked(items: list[str], size: int) -> list[list[str]]: return [items[index : index + size] for index in range(0, len(items), size)] def _coingecko_market_to_signals(row: dict[str, Any], *, is_trending: bool = False) -> dict[str, float]: market_cap = _as_float(row.get("market_cap")) fdv = _as_float(row.get("fully_diluted_valuation")) volume = _as_float(row.get("total_volume")) rank = _as_float(row.get("market_cap_rank"), 9999.0) circulating = _as_float(row.get("circulating_supply")) total_supply = _as_float(row.get("total_supply")) max_supply = _as_float(row.get("max_supply")) supply_cap = max_supply or total_supply rank_score = _clamp(1.0 - (log10(max(rank, 1.0)) / 4.0)) size_score = _log_score(market_cap, floor=7.0, span=5.0) volume_to_mcap = _safe_ratio(volume, market_cap) liquidity_quality = _clamp(volume_to_mcap / 0.10) fdv_ratio = _safe_ratio(fdv, market_cap) if fdv and market_cap else 1.0 fdv_dilution_risk = _clamp((fdv_ratio - 1.0) / 4.0) supply_unlocked = _clamp(_safe_ratio(circulating, supply_cap)) if supply_cap else max(0.0, 1.0 - fdv_dilution_risk) supply_dilution_risk = 1.0 - supply_unlocked unlock_risk = max(fdv_dilution_risk, supply_dilution_risk * 0.8) pct_7d = _as_float(row.get("price_change_percentage_7d_in_currency")) pct_30d = _as_float(row.get("price_change_percentage_30d_in_currency")) pct_200d = _as_float(row.get("price_change_percentage_200d_in_currency")) medium_momentum = _pct_score(pct_30d, low=-15.0, high=60.0) long_momentum = _pct_score(pct_200d, low=-40.0, high=150.0) trend_catalyst = _pct_score(pct_7d, low=-5.0, high=25.0) trend_bonus = 1.0 if is_trending else 0.0 tokenomics = _clamp(0.65 * supply_unlocked + 0.35 * (1.0 - fdv_dilution_risk)) fundamental = _clamp(0.40 * rank_score + 0.35 * size_score + 0.25 * liquidity_quality) catalyst = _clamp(0.45 * trend_catalyst + 0.40 * medium_momentum + 0.15 * trend_bonus) adoption = _clamp(0.45 * rank_score + 0.35 * liquidity_quality + 0.20 * long_momentum) smart_money = _clamp(0.35 * rank_score + 0.35 * liquidity_quality + 0.30 * (1.0 - unlock_risk)) regulatory_risk = 0.10 if rank <= 100 else 0.20 if rank <= 500 else 0.35 populated_fields = sum( 1 for value in (market_cap, fdv, volume, rank, circulating, supply_cap, pct_7d, pct_30d, pct_200d) if value ) confidence = _clamp(populated_fields / 9.0) return { "fundamental": round(fundamental, 4), "tokenomics": round(tokenomics, 4), "catalyst": round(catalyst, 4), "adoption": round(adoption, 4), "smart_money": round(smart_money, 4), "unlock_risk": round(unlock_risk, 4), "regulatory_risk": round(regulatory_risk, 4), "research_confidence": round(confidence, 4), } def _coingecko_headers(config: dict[str, Any]) -> dict[str, str]: coingecko_config = config.get("coingecko", {}) headers = {"accept": "application/json", "user-agent": "coinhunter/2"} api_key = str(coingecko_config.get("api_key", "")).strip() if api_key: headers["x-cg-demo-api-key"] = api_key return headers def _fetch_coingecko_research( config: dict[str, Any], *, symbols: list[str], quote: str, http_get: HttpGet | None = None, ) -> dict[str, dict[str, float]]: if not symbols: return {} opportunity_config = config.get("opportunity", {}) coingecko_config = config.get("coingecko", {}) base_url = str(coingecko_config.get("base_url", "https://api.coingecko.com/api/v3")) timeout = _as_float(opportunity_config.get("research_timeout_seconds"), 4.0) headers = _coingecko_headers(config) http_get = http_get or _public_http_get base_to_symbol = { base_asset(normalize_symbol(symbol), quote).lower(): normalize_symbol(symbol) for symbol in symbols if normalize_symbol(symbol) } bases = sorted(base_to_symbol) if not bases: return {} trending_ids: set[str] = set() try: trending_url = _build_url(base_url, "/search/trending", {}) trending_payload = http_get(trending_url, headers, timeout) for item in trending_payload.get("coins", []): coin = item.get("item", {}) coin_id = str(coin.get("id", "")).strip() if coin_id: trending_ids.add(coin_id) except Exception: trending_ids = set() research: dict[str, dict[str, float]] = {} for chunk in _chunked(bases, 50): params = { "vs_currency": "usd", "symbols": ",".join(chunk), "include_tokens": "top", "order": "market_cap_desc", "per_page": "250", "page": "1", "sparkline": "false", "price_change_percentage": "7d,30d,200d", } try: markets_url = _build_url(base_url, "/coins/markets", params) rows = http_get(markets_url, headers, timeout) except Exception: continue seen_bases: set[str] = set() for row in rows if isinstance(rows, list) else []: symbol = str(row.get("symbol", "")).lower() if symbol in seen_bases or symbol not in base_to_symbol: continue seen_bases.add(symbol) normalized = base_to_symbol[symbol] research[normalized] = _coingecko_market_to_signals( row, is_trending=str(row.get("id", "")) in trending_ids, ) return research def get_external_research( config: dict[str, Any], *, symbols: list[str], quote: str, http_get: HttpGet | None = None, ) -> dict[str, dict[str, float]]: """Fetch automated research signals for symbols. Returns an empty map when disabled or when the configured provider is unavailable. Opportunity scans should continue rather than fail because a research endpoint timed out. """ opportunity_config = config.get("opportunity", {}) if not bool(opportunity_config.get("auto_research", True)): return {} provider = str(opportunity_config.get("research_provider", "coingecko")).strip().lower() if provider in {"", "off", "none", "disabled"}: return {} if provider != "coingecko": return {} return _fetch_coingecko_research(config, symbols=symbols, quote=quote, http_get=http_get)