From 436bef48146de854b914d5a3e6d4ce753a00b13e Mon Sep 17 00:00:00 2001 From: Carlos Ouyang Date: Tue, 21 Apr 2026 19:41:48 +0800 Subject: [PATCH] Add opportunity dataset collection --- pyproject.toml | 1 + src/coinhunter/cli.py | 110 +++++- src/coinhunter/config.py | 51 ++- .../services/opportunity_dataset_service.py | 353 ++++++++++++++++++ .../services/opportunity_service.py | 319 +++++++++++++++- src/coinhunter/services/portfolio_service.py | 6 +- src/coinhunter/services/research_service.py | 214 +++++++++++ tests/test_cli.py | 29 ++ tests/test_opportunity_dataset_service.py | 76 ++++ tests/test_opportunity_service.py | 168 ++++++++- 10 files changed, 1295 insertions(+), 32 deletions(-) create mode 100644 src/coinhunter/services/opportunity_dataset_service.py create mode 100644 src/coinhunter/services/research_service.py create mode 100644 tests/test_opportunity_dataset_service.py diff --git a/pyproject.toml b/pyproject.toml index e51cd4c..f61f160 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,7 @@ dev = [ "pytest>=8.0", "ruff>=0.5.0", "mypy>=1.10.0", + "types-requests>=2.31.0", ] [project.scripts] diff --git a/src/coinhunter/cli.py b/src/coinhunter/cli.py index d7b3c7c..7bc2432 100644 --- a/src/coinhunter/cli.py +++ b/src/coinhunter/cli.py @@ -27,6 +27,7 @@ from .runtime import ( from .services import ( account_service, market_service, + opportunity_dataset_service, opportunity_service, portfolio_service, trade_service, @@ -454,6 +455,46 @@ Fields: extension_penalty – overextension/chase risk from run-up and MA distance opportunity_score – raw opportunity score before overlap discount position_weight – current portfolio overlap in the same symbol +""", + }, + "opportunity/dataset": { + "tui": """\ +TUI Output: + DATASET COLLECTED + Path: ~/.coinhunter/datasets/opportunity_dataset_20260421T120000Z.json + Symbols: BTCUSDT, ETHUSDT + Window: reference=48.0d simulate=7.0d run=7.0d + +JSON Output: + { + "path": "~/.coinhunter/datasets/opportunity_dataset_20260421T120000Z.json", + "symbols": ["BTCUSDT", "ETHUSDT"], + "counts": {"BTCUSDT": {"1h": 1488}}, + "plan": {"reference_days": 48.0, "simulate_days": 7.0, "run_days": 7.0, "total_days": 62.0}, + "external_history": {"provider": "coingecko", "status": "available"} + } +Fields: + path – JSON dataset file written locally + symbols – symbols included in the dataset + counts – kline row counts by symbol and interval + plan – reference/simulation/run windows used for collection + external_history – external provider historical capability probe result +""", + "json": """\ +JSON Output: + { + "path": "~/.coinhunter/datasets/opportunity_dataset_20260421T120000Z.json", + "symbols": ["BTCUSDT", "ETHUSDT"], + "counts": {"BTCUSDT": {"1h": 1488}}, + "plan": {"reference_days": 48.0, "simulate_days": 7.0, "run_days": 7.0, "total_days": 62.0}, + "external_history": {"provider": "coingecko", "status": "available"} + } +Fields: + path – JSON dataset file written locally + symbols – symbols included in the dataset + counts – kline row counts by symbol and interval + plan – reference/simulation/run windows used for collection + external_history – external provider historical capability probe result """, }, "upgrade": { @@ -792,11 +833,27 @@ def build_parser() -> argparse.ArgumentParser: _add_global_flags(portfolio_parser) opportunity_parser = subparsers.add_parser( - "opportunity", aliases=["o"], help="Scan market for opportunities", + "opportunity", aliases=["opp", "o"], help="Scan market for opportunities", description="Scan the market for 
trading opportunities and return the top-N candidates with signals.", ) opportunity_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols") _add_global_flags(opportunity_parser) + opportunity_subparsers = opportunity_parser.add_subparsers(dest="opportunity_command") + scan_parser = opportunity_subparsers.add_parser( + "scan", help="Scan market for opportunities", + description="Scan the market for trading opportunities and return the top-N candidates with signals.", + ) + scan_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols") + _add_global_flags(scan_parser) + dataset_parser = opportunity_subparsers.add_parser( + "dataset", aliases=["ds"], help="Collect historical opportunity evaluation dataset", + description="Collect point-in-time market data for opportunity simulation and evaluation.", + ) + dataset_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict dataset to symbols") + dataset_parser.add_argument("--simulate-days", type=float, help="Forward simulation/evaluation window in days") + dataset_parser.add_argument("--run-days", type=float, help="Continuous scan simulation window in days") + dataset_parser.add_argument("-o", "--output", help="Output dataset JSON path") + _add_global_flags(dataset_parser) upgrade_parser = subparsers.add_parser( "upgrade", help="Upgrade coinhunter to the latest version", @@ -833,6 +890,7 @@ _CANONICAL_COMMANDS = { "m": "market", "pf": "portfolio", "p": "portfolio", + "opp": "opportunity", "o": "opportunity", "cfg": "config", "c": "config", @@ -842,9 +900,10 @@ _CANONICAL_SUBCOMMANDS = { "tk": "tickers", "t": "tickers", "k": "klines", + "ds": "dataset", } -_COMMANDS_WITH_SUBCOMMANDS = {"market", "config"} +_COMMANDS_WITH_SUBCOMMANDS = {"market", "config", "opportunity"} def _get_doc_key(argv: list[str]) -> str | None: @@ -855,7 +914,9 @@ def _get_doc_key(argv: list[str]) -> str | None: cmd = _CANONICAL_COMMANDS.get(tokens[0], tokens[0]) if cmd in _COMMANDS_WITH_SUBCOMMANDS and len(tokens) > 1: sub = _CANONICAL_SUBCOMMANDS.get(tokens[1], tokens[1]) - return f"{cmd}/{sub}" + sub_key = f"{cmd}/{sub}" + if sub_key in COMMAND_DOCS: + return sub_key return cmd @@ -907,7 +968,7 @@ def main(argv: list[str] | None = None) -> int: # Normalize aliases to canonical command names if args.command: args.command = _CANONICAL_COMMANDS.get(args.command, args.command) - for attr in ("account_command", "market_command", "config_command"): + for attr in ("account_command", "market_command", "config_command", "opportunity_command"): val = getattr(args, attr, None) if val: setattr(args, attr, _CANONICAL_SUBCOMMANDS.get(val, val)) @@ -999,6 +1060,21 @@ def main(argv: list[str] | None = None) -> int: print(shtab.complete(parser, shell=args.shell, preamble="")) return 0 + if args.command == "upgrade": + with with_spinner("Upgrading coinhunter...", enabled=not args.agent): + result = self_upgrade() + print_output(result, agent=args.agent) + return 0 + + if args.command == "catlog": + with with_spinner("Reading audit logs...", enabled=not args.agent): + entries = read_audit_log(limit=args.limit, offset=args.offset, dry_run=args.dry_run) + print_output( + {"entries": entries, "limit": args.limit, "offset": args.offset, "total": len(entries), "dry_run": args.dry_run}, + agent=args.agent, + ) + return 0 + config = load_config() if args.command == "account": @@ -1072,6 +1148,17 @@ def main(argv: list[str] | None = None) -> int: return 0 if args.command == 
"opportunity": + if args.opportunity_command == "dataset": + with with_spinner("Collecting opportunity dataset...", enabled=not args.agent): + result = opportunity_dataset_service.collect_opportunity_dataset( + config, + symbols=args.symbols, + simulate_days=args.simulate_days, + run_days=args.run_days, + output_path=args.output, + ) + print_output(result, agent=args.agent) + return 0 spot_client = _load_spot_client(config) with with_spinner("Scanning opportunities...", enabled=not args.agent): result = opportunity_service.scan_opportunities( @@ -1080,21 +1167,6 @@ def main(argv: list[str] | None = None) -> int: print_output(result, agent=args.agent) return 0 - if args.command == "upgrade": - with with_spinner("Upgrading coinhunter...", enabled=not args.agent): - result = self_upgrade() - print_output(result, agent=args.agent) - return 0 - - if args.command == "catlog": - with with_spinner("Reading audit logs...", enabled=not args.agent): - entries = read_audit_log(limit=args.limit, offset=args.offset, dry_run=args.dry_run) - print_output( - {"entries": entries, "limit": args.limit, "offset": args.offset, "total": len(entries), "dry_run": args.dry_run}, - agent=args.agent, - ) - return 0 - parser.error(f"Unsupported command {args.command}") return 2 except Exception as exc: diff --git a/src/coinhunter/config.py b/src/coinhunter/config.py index 9e8ebb9..64763a6 100644 --- a/src/coinhunter/config.py +++ b/src/coinhunter/config.py @@ -38,14 +38,6 @@ spot_enabled = true dry_run_default = false dust_usdt_threshold = 10.0 -[signal] -lookback_interval = "1h" -trend = 1.0 -momentum = 1.0 -breakout = 0.8 -volume = 0.7 -volatility_penalty = 0.5 - [opportunity] min_quote_volume = 1000000.0 top_n = 10 @@ -54,6 +46,49 @@ ignore_dust = true entry_threshold = 1.5 watch_threshold = 0.6 overlap_penalty = 0.6 +lookback_intervals = ["1h", "4h", "1d"] +auto_research = true +research_provider = "coingecko" +research_timeout_seconds = 4.0 +simulate_days = 7 +run_days = 7 +dataset_timeout_seconds = 15.0 + +[opportunity.risk_limits] +min_liquidity = 0.0 +max_overextension = 0.08 +max_downside_risk = 0.3 +max_unlock_risk = 0.75 +max_regulatory_risk = 0.75 +min_quality_for_add = 0.0 + +[opportunity.weights] +trend = 1.0 +momentum = 1.0 +breakout = 0.8 +pullback = 0.4 +volume = 0.7 +liquidity = 0.3 +trend_alignment = 0.8 +fundamental = 0.8 +tokenomics = 0.7 +catalyst = 0.5 +adoption = 0.4 +smart_money = 0.3 +volatility_penalty = 0.5 +overextension_penalty = 0.7 +downside_penalty = 0.5 +unlock_penalty = 0.8 +regulatory_penalty = 0.4 +position_concentration_penalty = 0.6 + +[signal] +lookback_interval = "1h" +trend = 1.0 +momentum = 1.0 +breakout = 0.8 +volume = 0.7 +volatility_penalty = 0.5 [portfolio] add_threshold = 1.5 diff --git a/src/coinhunter/services/opportunity_dataset_service.py b/src/coinhunter/services/opportunity_dataset_service.py new file mode 100644 index 0000000..0120528 --- /dev/null +++ b/src/coinhunter/services/opportunity_dataset_service.py @@ -0,0 +1,353 @@ +"""Historical dataset collection for opportunity evaluation.""" + +from __future__ import annotations + +import json +from collections.abc import Callable +from dataclasses import asdict, dataclass +from datetime import datetime, timedelta, timezone +from pathlib import Path +from typing import Any +from urllib.error import HTTPError, URLError +from urllib.parse import parse_qs, urlencode, urlparse +from urllib.request import Request, urlopen + +from ..runtime import get_runtime_paths +from .market_service import normalize_symbol, 
normalize_symbols + +HttpGet = Callable[[str, dict[str, str], float], Any] + +_INTERVAL_SECONDS = { + "1m": 60, + "3m": 180, + "5m": 300, + "15m": 900, + "30m": 1800, + "1h": 3600, + "2h": 7200, + "4h": 14400, + "6h": 21600, + "8h": 28800, + "12h": 43200, + "1d": 86400, + "3d": 259200, + "1w": 604800, +} + + +@dataclass(frozen=True) +class DatasetPlan: + intervals: list[str] + kline_limit: int + reference_days: float + simulate_days: float + run_days: float + total_days: float + start: datetime + simulation_start: datetime + simulation_end: datetime + end: datetime + + +def _as_float(value: Any, default: float = 0.0) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default + + +def _as_int(value: Any, default: int = 0) -> int: + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _public_http_get(url: str, headers: dict[str, str], timeout: float) -> Any: + request = Request(url, headers=headers) + with urlopen(request, timeout=timeout) as response: # noqa: S310 - market data endpoints are user-configurable + return json.loads(response.read().decode("utf-8")) + + +def _public_http_status(url: str, headers: dict[str, str], timeout: float) -> tuple[int, str]: + request = Request(url, headers=headers) + try: + with urlopen(request, timeout=timeout) as response: # noqa: S310 - market data endpoints are user-configurable + return response.status, response.read().decode("utf-8") + except HTTPError as exc: + return exc.code, exc.read().decode("utf-8") + + +def _build_url(base_url: str, path: str, params: dict[str, str]) -> str: + return f"{base_url.rstrip('/')}{path}?{urlencode(params)}" + + +def _iso(dt: datetime) -> str: + return dt.astimezone(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") + + +def _ms(dt: datetime) -> int: + return int(dt.timestamp() * 1000) + + +def _default_intervals(config: dict[str, Any]) -> list[str]: + configured = config.get("opportunity", {}).get("lookback_intervals", ["1h", "4h", "1d"]) + intervals = [str(item).strip() for item in configured if str(item).strip()] + return intervals or ["1h"] + + +def reference_days_for(config: dict[str, Any]) -> float: + opportunity_config = config.get("opportunity", {}) + intervals = _default_intervals(config) + kline_limit = _as_int(opportunity_config.get("kline_limit"), 48) + seconds = [(_INTERVAL_SECONDS.get(interval) or 0) * kline_limit for interval in intervals] + return round(max(seconds or [0]) / 86400, 4) + + +def build_dataset_plan( + config: dict[str, Any], + *, + simulate_days: float | None = None, + run_days: float | None = None, + now: datetime | None = None, +) -> DatasetPlan: + opportunity_config = config.get("opportunity", {}) + intervals = _default_intervals(config) + kline_limit = _as_int(opportunity_config.get("kline_limit"), 48) + reference_days = reference_days_for(config) + simulate = _as_float( + simulate_days if simulate_days is not None else opportunity_config.get("simulate_days"), + 7.0, + ) + run = _as_float(run_days if run_days is not None else opportunity_config.get("run_days"), 7.0) + end = (now or datetime.now(timezone.utc)).astimezone(timezone.utc).replace(microsecond=0) + total = reference_days + simulate + run + start = end - timedelta(days=total) + simulation_start = start + timedelta(days=reference_days) + simulation_end = simulation_start + timedelta(days=run) + return DatasetPlan( + intervals=intervals, + kline_limit=kline_limit, + reference_days=reference_days, + simulate_days=simulate, + run_days=run, + 
total_days=round(total, 4), + start=start, + simulation_start=simulation_start, + simulation_end=simulation_end, + end=end, + ) + + +def _binance_base_url(config: dict[str, Any]) -> str: + return str(config.get("binance", {}).get("spot_base_url", "https://api.binance.com")) + + +def _select_universe( + config: dict[str, Any], + *, + symbols: list[str] | None, + http_get: HttpGet, + timeout: float, +) -> list[str]: + if symbols: + return normalize_symbols(symbols) + + market_config = config.get("market", {}) + opportunity_config = config.get("opportunity", {}) + quote = str(market_config.get("default_quote", "USDT")).upper() + allowlist = set(normalize_symbols(market_config.get("universe_allowlist", []))) + denylist = set(normalize_symbols(market_config.get("universe_denylist", []))) + scan_limit = _as_int(opportunity_config.get("scan_limit"), 50) + min_quote_volume = _as_float(opportunity_config.get("min_quote_volume"), 0.0) + base_url = _binance_base_url(config) + headers = {"accept": "application/json", "user-agent": "coinhunter/2"} + + exchange_info = http_get(_build_url(base_url, "/api/v3/exchangeInfo", {}), headers, timeout) + status_map = {normalize_symbol(item["symbol"]): item.get("status", "") for item in exchange_info.get("symbols", [])} + rows = http_get(_build_url(base_url, "/api/v3/ticker/24hr", {}), headers, timeout) + + universe: list[tuple[str, float]] = [] + for ticker in rows if isinstance(rows, list) else []: + symbol = normalize_symbol(ticker.get("symbol", "")) + if not symbol.endswith(quote): + continue + if allowlist and symbol not in allowlist: + continue + if symbol in denylist: + continue + if status_map.get(symbol) != "TRADING": + continue + quote_volume = _as_float(ticker.get("quoteVolume")) + if quote_volume < min_quote_volume: + continue + universe.append((symbol, quote_volume)) + universe.sort(key=lambda item: item[1], reverse=True) + return [symbol for symbol, _ in universe[:scan_limit]] + + +def _fetch_klines( + config: dict[str, Any], + *, + symbol: str, + interval: str, + start: datetime, + end: datetime, + http_get: HttpGet, + timeout: float, +) -> list[list[Any]]: + base_url = _binance_base_url(config) + headers = {"accept": "application/json", "user-agent": "coinhunter/2"} + interval_ms = (_INTERVAL_SECONDS.get(interval) or 60) * 1000 + cursor = _ms(start) + end_ms = _ms(end) + rows: list[list[Any]] = [] + while cursor <= end_ms: + url = _build_url( + base_url, + "/api/v3/klines", + { + "symbol": symbol, + "interval": interval, + "startTime": str(cursor), + "endTime": str(end_ms), + "limit": "1000", + }, + ) + chunk = http_get(url, headers, timeout) + if not chunk: + break + rows.extend(chunk) + last_open = int(chunk[-1][0]) + next_cursor = last_open + interval_ms + if next_cursor <= cursor: + break + cursor = next_cursor + if len(chunk) < 1000: + break + return rows + + +def _probe_external_history( + config: dict[str, Any], + *, + plan: DatasetPlan, + timeout: float, + http_status: Callable[[str, dict[str, str], float], tuple[int, str]] | None = None, +) -> dict[str, Any]: + opportunity_config = config.get("opportunity", {}) + provider = str(opportunity_config.get("research_provider", "coingecko")).lower().strip() + if not bool(opportunity_config.get("auto_research", True)) or provider in {"", "off", "none", "disabled"}: + return {"provider": provider or "disabled", "status": "disabled"} + if provider != "coingecko": + return {"provider": provider, "status": "unsupported"} + + coingecko_config = config.get("coingecko", {}) + base_url = 
str(coingecko_config.get("base_url", "https://api.coingecko.com/api/v3")) + api_key = str(coingecko_config.get("api_key", "")).strip() + headers = {"accept": "application/json", "user-agent": "coinhunter/2"} + if api_key: + headers["x-cg-demo-api-key"] = api_key + sample_date = plan.simulation_start.strftime("%d-%m-%Y") + url = _build_url(base_url, "/coins/bitcoin/history", {"date": sample_date}) + http_status = http_status or _public_http_status + try: + status, body = http_status(url, headers, timeout) + except (TimeoutError, URLError, OSError) as exc: + return {"provider": "coingecko", "status": "failed", "sample_date": sample_date, "error": str(exc)} + if status == 200: + return {"provider": "coingecko", "status": "available", "sample_date": sample_date} + lowered = body.lower() + if "allowed time range" in lowered or "365 days" in lowered: + result_status = "limited" + elif status == 429: + result_status = "rate_limited" + elif status in {401, 403}: + result_status = "unauthorized" + else: + result_status = "failed" + return { + "provider": "coingecko", + "status": result_status, + "sample_date": sample_date, + "http_status": status, + "message": body[:240], + } + + +def _default_output_path(plan: DatasetPlan) -> Path: + dataset_dir = get_runtime_paths().root / "datasets" + dataset_dir.mkdir(parents=True, exist_ok=True) + stamp = plan.end.strftime("%Y%m%dT%H%M%SZ") + return dataset_dir / f"opportunity_dataset_{stamp}.json" + + +def collect_opportunity_dataset( + config: dict[str, Any], + *, + symbols: list[str] | None = None, + simulate_days: float | None = None, + run_days: float | None = None, + output_path: str | None = None, + http_get: HttpGet | None = None, + http_status: Callable[[str, dict[str, str], float], tuple[int, str]] | None = None, + now: datetime | None = None, +) -> dict[str, Any]: + opportunity_config = config.get("opportunity", {}) + timeout = _as_float(opportunity_config.get("dataset_timeout_seconds"), 15.0) + plan = build_dataset_plan(config, simulate_days=simulate_days, run_days=run_days, now=now) + http_get = http_get or _public_http_get + selected_symbols = _select_universe(config, symbols=symbols, http_get=http_get, timeout=timeout) + klines: dict[str, dict[str, list[list[Any]]]] = {} + counts: dict[str, dict[str, int]] = {} + for symbol in selected_symbols: + klines[symbol] = {} + counts[symbol] = {} + for interval in plan.intervals: + rows = _fetch_klines( + config, + symbol=symbol, + interval=interval, + start=plan.start, + end=plan.end, + http_get=http_get, + timeout=timeout, + ) + klines[symbol][interval] = rows + counts[symbol][interval] = len(rows) + + external_history = _probe_external_history(config, plan=plan, timeout=timeout, http_status=http_status) + path = Path(output_path).expanduser() if output_path else _default_output_path(plan) + path.parent.mkdir(parents=True, exist_ok=True) + metadata = { + "created_at": _iso(datetime.now(timezone.utc)), + "quote": str(config.get("market", {}).get("default_quote", "USDT")).upper(), + "symbols": selected_symbols, + "plan": { + **{ + key: value + for key, value in asdict(plan).items() + if key not in {"start", "simulation_start", "simulation_end", "end"} + }, + "start": _iso(plan.start), + "simulation_start": _iso(plan.simulation_start), + "simulation_end": _iso(plan.simulation_end), + "end": _iso(plan.end), + }, + "external_history": external_history, + } + dataset = {"metadata": metadata, "klines": klines} + path.write_text(json.dumps(dataset, ensure_ascii=False, indent=2), encoding="utf-8") + return { + 
"path": str(path), + "symbols": selected_symbols, + "counts": counts, + "plan": metadata["plan"], + "external_history": external_history, + } + + +def parse_query(url: str) -> dict[str, str]: + """Test helper for fake HTTP clients.""" + parsed = urlparse(url) + return {key: values[-1] for key, values in parse_qs(parsed.query).items()} diff --git a/src/coinhunter/services/opportunity_service.py b/src/coinhunter/services/opportunity_service.py index a951857..f8622b7 100644 --- a/src/coinhunter/services/opportunity_service.py +++ b/src/coinhunter/services/opportunity_service.py @@ -3,11 +3,14 @@ from __future__ import annotations from dataclasses import asdict, dataclass +from math import log10 +from statistics import mean from typing import Any from ..audit import audit_event from .account_service import get_positions from .market_service import base_asset, get_scan_universe, normalize_symbol +from .research_service import get_external_research from .signal_service import get_signal_interval, score_opportunity_signal @@ -29,6 +32,197 @@ def _opportunity_thresholds(config: dict[str, Any]) -> dict[str, float]: } +def _safe_pct(new: float, old: float) -> float: + if old == 0: + return 0.0 + return (new - old) / old + + +def _clamp(value: float, low: float, high: float) -> float: + return min(max(value, low), high) + + +def _as_float(value: Any, default: float = 0.0) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default + + +def _empty_metrics(concentration: float) -> dict[str, float]: + return { + "trend": 0.0, + "momentum": 0.0, + "breakout": 0.0, + "pullback": 0.0, + "volume_confirmation": 1.0, + "liquidity": 0.0, + "trend_alignment": 0.0, + "volatility": 0.0, + "overextension": 0.0, + "downside_risk": 0.0, + "fundamental": 0.0, + "tokenomics": 0.0, + "catalyst": 0.0, + "adoption": 0.0, + "smart_money": 0.0, + "unlock_risk": 0.0, + "regulatory_risk": 0.0, + "research_confidence": 0.0, + "quality": 0.0, + "concentration": round(concentration, 4), + } + + +def _series_from_klines(klines: list[list[Any]]) -> tuple[list[float], list[float]]: + return [float(item[4]) for item in klines], [float(item[5]) for item in klines] + + +def _trend_signal(closes: list[float]) -> float: + if len(closes) < 2: + return 0.0 + current = closes[-1] + sma_short = mean(closes[-5:]) if len(closes) >= 5 else current + sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes) + if current >= sma_short >= sma_long: + return 1.0 + if current < sma_short < sma_long: + return -1.0 + return 0.0 + + +def _trend_alignment(interval_closes: dict[str, list[float]] | None) -> float: + if not interval_closes: + return 0.0 + signals = [_trend_signal(closes) for closes in interval_closes.values() if len(closes) >= 2] + return mean(signals) if signals else 0.0 + + +def _range_position(current: float, low: float, high: float) -> float: + if high <= low: + return 0.5 + return _clamp((current - low) / (high - low), 0.0, 1.0) + + +def _normalized_research_score(value: Any) -> float: + """Normalize provider research inputs to 0..1. + + Provider values can be expressed as either 0..1 or 0..100. 
+ """ + score = _as_float(value) + if score > 1.0: + score = score / 100.0 + return _clamp(score, 0.0, 1.0) + + +def _research_signals(research: dict[str, Any] | None) -> dict[str, float]: + research = research or {} + return { + "fundamental": _normalized_research_score(research.get("fundamental")), + "tokenomics": _normalized_research_score(research.get("tokenomics")), + "catalyst": _normalized_research_score(research.get("catalyst")), + "adoption": _normalized_research_score(research.get("adoption")), + "smart_money": _normalized_research_score(research.get("smart_money")), + "unlock_risk": _normalized_research_score(research.get("unlock_risk")), + "regulatory_risk": _normalized_research_score(research.get("regulatory_risk")), + "research_confidence": _normalized_research_score(research.get("research_confidence")), + } + + +def _score_candidate( + closes: list[float], + volumes: list[float], + ticker: dict[str, Any], + weights: dict[str, float], + concentration: float, + interval_closes: dict[str, list[float]] | None = None, + research: dict[str, Any] | None = None, +) -> tuple[float, dict[str, float]]: + if len(closes) < 2 or not volumes: + return 0.0, _empty_metrics(concentration) + + current = closes[-1] + sma_short = mean(closes[-5:]) if len(closes) >= 5 else current + trend = _trend_signal(closes) + momentum = ( + _safe_pct(closes[-1], closes[-2]) * 0.5 + + (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0) + + _as_float(ticker.get("price_change_pct")) / 100.0 * 0.2 + ) + recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes) + breakout = 1.0 - max((recent_high - current) / recent_high, 0.0) + avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1] + volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0 + volume_score = _clamp(volume_confirmation - 1.0, -1.0, 2.0) + volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0 + quote_volume = _as_float(ticker.get("quote_volume")) + liquidity = _clamp((log10(quote_volume) - 6.0) / 3.0, 0.0, 1.0) if quote_volume > 0 else 0.0 + high_price = _as_float(ticker.get("high_price"), recent_high) + low_price = _as_float(ticker.get("low_price"), min(closes)) + range_position = _range_position(current, low_price, high_price) + pullback = 1.0 - abs(range_position - 0.62) / 0.62 + pullback = _clamp(pullback, 0.0, 1.0) + overextension = max(_safe_pct(current, sma_short) - 0.08, 0.0) + max( + _as_float(ticker.get("price_change_pct")) / 100.0 - 0.12, 0.0 + ) + downside_risk = max(0.35 - range_position, 0.0) + max(volatility - 0.18, 0.0) + trend_alignment = _trend_alignment(interval_closes) + research_signals = _research_signals(research) + quality = mean( + [ + research_signals["fundamental"], + research_signals["tokenomics"], + research_signals["catalyst"], + research_signals["adoption"], + research_signals["smart_money"], + ] + ) + + score = ( + weights.get("trend", 1.0) * trend + + weights.get("momentum", 1.0) * momentum + + weights.get("breakout", 0.8) * breakout + + weights.get("pullback", 0.4) * pullback + + weights.get("volume", 0.7) * volume_score + + weights.get("liquidity", 0.3) * liquidity + + weights.get("trend_alignment", 0.8) * trend_alignment + + weights.get("fundamental", 0.8) * research_signals["fundamental"] + + weights.get("tokenomics", 0.7) * research_signals["tokenomics"] + + weights.get("catalyst", 0.5) * research_signals["catalyst"] + + weights.get("adoption", 0.4) * research_signals["adoption"] + + 
weights.get("smart_money", 0.3) * research_signals["smart_money"] + - weights.get("volatility_penalty", 0.5) * volatility + - weights.get("overextension_penalty", 0.7) * overextension + - weights.get("downside_penalty", 0.5) * downside_risk + - weights.get("unlock_penalty", 0.8) * research_signals["unlock_risk"] + - weights.get("regulatory_penalty", 0.4) * research_signals["regulatory_risk"] + - weights.get("position_concentration_penalty", 0.6) * concentration + ) + metrics = { + "trend": round(trend, 4), + "momentum": round(momentum, 4), + "breakout": round(breakout, 4), + "pullback": round(pullback, 4), + "volume_confirmation": round(volume_confirmation, 4), + "liquidity": round(liquidity, 4), + "trend_alignment": round(trend_alignment, 4), + "volatility": round(volatility, 4), + "overextension": round(overextension, 4), + "downside_risk": round(downside_risk, 4), + "fundamental": round(research_signals["fundamental"], 4), + "tokenomics": round(research_signals["tokenomics"], 4), + "catalyst": round(research_signals["catalyst"], 4), + "adoption": round(research_signals["adoption"], 4), + "smart_money": round(research_signals["smart_money"], 4), + "unlock_risk": round(research_signals["unlock_risk"], 4), + "regulatory_risk": round(research_signals["regulatory_risk"], 4), + "research_confidence": round(research_signals["research_confidence"], 4), + "quality": round(quality, 4), + "concentration": round(concentration, 4), + } + return score, metrics + + def _action_for_opportunity(score: float, metrics: dict[str, float], thresholds: dict[str, float]) -> tuple[str, list[str]]: reasons: list[str] = [] if metrics["extension_penalty"] >= 1.0 and (metrics["recent_runup"] >= 0.10 or metrics["breakout_pct"] >= 0.03): @@ -44,6 +238,115 @@ def _action_for_opportunity(score: float, metrics: dict[str, float], thresholds: return "skip", reasons +def _action_for( + score: float, + concentration: float, + metrics: dict[str, float] | None = None, + risk_limits: dict[str, float] | None = None, +) -> tuple[str, list[str]]: + metrics = metrics or {} + risk_limits = risk_limits or {} + reasons: list[str] = [] + if concentration >= 0.5 and score < 0.4: + reasons.append("position concentration is high") + return "trim", reasons + if metrics.get("liquidity", 0.0) < risk_limits.get("min_liquidity", 0.0): + reasons.append("liquidity is below the configured institutional threshold") + return "observe", reasons + if metrics.get("unlock_risk", 0.0) > risk_limits.get("max_unlock_risk", 1.0): + reasons.append("token unlock or dilution risk is too high") + return "observe", reasons + if metrics.get("regulatory_risk", 0.0) > risk_limits.get("max_regulatory_risk", 1.0): + reasons.append("regulatory or listing risk is too high") + return "observe", reasons + if metrics.get("overextension", 0.0) >= risk_limits.get("max_overextension", 0.08): + reasons.append("move looks extended; wait for a cleaner entry") + return "observe", reasons + if metrics.get("downside_risk", 0.0) >= risk_limits.get("max_downside_risk", 0.3) and score < 1.0: + reasons.append("price is weak inside its recent range") + return "observe", reasons + if score >= 1.8 and metrics.get("quality", 0.0) >= risk_limits.get("min_quality_for_add", 0.0): + reasons.append("trend, liquidity, and research signals are aligned") + return "add", reasons + if score >= 0.6: + reasons.append("trend remains constructive") + return "hold", reasons + if score <= -0.2: + reasons.append("momentum and structure have weakened") + return "exit", reasons + reasons.append("signal is 
mixed and needs confirmation") + return "observe", reasons + + +def _lookback_intervals(config: dict[str, Any]) -> list[str]: + configured = config.get("opportunity", {}).get("lookback_intervals", ["1h"]) + intervals = [str(item) for item in configured if str(item).strip()] + return intervals or ["1h"] + + +def _risk_limits(config: dict[str, Any]) -> dict[str, float]: + configured = config.get("opportunity", {}).get("risk_limits", {}) + return {str(key): _as_float(value) for key, value in configured.items()} + + +def _ticker_metrics(ticker: dict[str, Any]) -> dict[str, float]: + return { + "price_change_pct": _as_float(ticker.get("priceChangePercent") or ticker.get("price_change_pct")), + "quote_volume": _as_float(ticker.get("quoteVolume") or ticker.get("quote_volume")), + "high_price": _as_float(ticker.get("highPrice") or ticker.get("high_price")), + "low_price": _as_float(ticker.get("lowPrice") or ticker.get("low_price")), + } + + +def _candidate_series( + spot_client: Any, + symbol: str, + intervals: list[str], + limit: int, +) -> tuple[list[float], list[float], dict[str, list[float]]]: + interval_closes: dict[str, list[float]] = {} + primary_closes: list[float] = [] + primary_volumes: list[float] = [] + for index, interval in enumerate(intervals): + closes, volumes = _series_from_klines(spot_client.klines(symbol=symbol, interval=interval, limit=limit)) + interval_closes[interval] = closes + if index == 0: + primary_closes = closes + primary_volumes = volumes + return primary_closes, primary_volumes, interval_closes + + +def _add_research_metrics(metrics: dict[str, float], research: dict[str, Any] | None) -> None: + research_signals = _research_signals(research) + for key, value in research_signals.items(): + metrics[key] = round(value, 4) + metrics["quality"] = round( + mean( + [ + research_signals["fundamental"], + research_signals["tokenomics"], + research_signals["catalyst"], + research_signals["adoption"], + research_signals["smart_money"], + ] + ), + 4, + ) + + +def _research_score(research: dict[str, Any] | None, weights: dict[str, float]) -> float: + signals = _research_signals(research) + return ( + weights.get("fundamental", 0.8) * signals["fundamental"] + + weights.get("tokenomics", 0.7) * signals["tokenomics"] + + weights.get("catalyst", 0.5) * signals["catalyst"] + + weights.get("adoption", 0.4) * signals["adoption"] + + weights.get("smart_money", 0.3) * signals["smart_money"] + - weights.get("unlock_penalty", 0.8) * signals["unlock_risk"] + - weights.get("regulatory_penalty", 0.4) * signals["regulatory_risk"] + ) + + def scan_opportunities( config: dict[str, Any], *, @@ -51,6 +354,7 @@ def scan_opportunities( symbols: list[str] | None = None, ) -> dict[str, Any]: opportunity_config = config.get("opportunity", {}) + weights = opportunity_config.get("weights", {}) ignore_dust = bool(opportunity_config.get("ignore_dust", True)) interval = get_signal_interval(config) thresholds = _opportunity_thresholds(config) @@ -62,18 +366,27 @@ def scan_opportunities( total_held = sum(concentration_map.values()) or 1.0 universe = get_scan_universe(config, spot_client=spot_client, symbols=symbols)[:scan_limit] + external_research = get_external_research( + config, + symbols=[normalize_symbol(ticker["symbol"]) for ticker in universe], + quote=quote, + ) recommendations = [] for ticker in universe: symbol = normalize_symbol(ticker["symbol"]) klines = spot_client.klines(symbol=symbol, interval=interval, limit=24) - closes = [float(item[4]) for item in klines] - volumes = [float(item[5]) for 
item in klines] + closes, volumes = _series_from_klines(klines) concentration = concentration_map.get(symbol, 0.0) / total_held opportunity_score, metrics = score_opportunity_signal(closes, volumes, ticker, opportunity_config) score = opportunity_score - thresholds["overlap_penalty"] * concentration - action, reasons = _action_for_opportunity(score, metrics, thresholds) metrics["opportunity_score"] = round(opportunity_score, 4) metrics["position_weight"] = round(concentration, 4) + research = external_research.get(symbol, {}) + research_score = _research_score(research, weights) + score += research_score + metrics["research_score"] = round(research_score, 4) + _add_research_metrics(metrics, research) + action, reasons = _action_for_opportunity(score, metrics, thresholds) if symbol.endswith(quote): reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters") if concentration > 0: diff --git a/src/coinhunter/services/portfolio_service.py b/src/coinhunter/services/portfolio_service.py index fe376d3..141b710 100644 --- a/src/coinhunter/services/portfolio_service.py +++ b/src/coinhunter/services/portfolio_service.py @@ -8,7 +8,11 @@ from typing import Any from ..audit import audit_event from .account_service import get_positions from .market_service import normalize_symbol -from .signal_service import get_signal_interval, get_signal_weights, score_portfolio_signal +from .signal_service import ( + get_signal_interval, + get_signal_weights, + score_portfolio_signal, +) @dataclass diff --git a/src/coinhunter/services/research_service.py b/src/coinhunter/services/research_service.py new file mode 100644 index 0000000..38c1787 --- /dev/null +++ b/src/coinhunter/services/research_service.py @@ -0,0 +1,214 @@ +"""External research signal providers for opportunity scoring.""" + +from __future__ import annotations + +import json +from collections.abc import Callable +from math import log10 +from typing import Any +from urllib.parse import urlencode +from urllib.request import Request, urlopen + +from .market_service import base_asset, normalize_symbol + +HttpGet = Callable[[str, dict[str, str], float], Any] + + +def _clamp(value: float, low: float = 0.0, high: float = 1.0) -> float: + return min(max(value, low), high) + + +def _as_float(value: Any, default: float = 0.0) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default + + +def _safe_ratio(numerator: float, denominator: float) -> float: + if denominator <= 0: + return 0.0 + return numerator / denominator + + +def _log_score(value: float, *, floor: float, span: float) -> float: + if value <= 0: + return 0.0 + return _clamp((log10(value) - floor) / span) + + +def _pct_score(value: float, *, low: float, high: float) -> float: + if high <= low: + return 0.0 + return _clamp((value - low) / (high - low)) + + +def _public_http_get(url: str, headers: dict[str, str], timeout: float) -> Any: + request = Request(url, headers=headers) + with urlopen(request, timeout=timeout) as response: # noqa: S310 - user-configured market data endpoint + return json.loads(response.read().decode("utf-8")) + + +def _build_url(base_url: str, path: str, params: dict[str, str]) -> str: + return f"{base_url.rstrip('/')}{path}?{urlencode(params)}" + + +def _chunked(items: list[str], size: int) -> list[list[str]]: + return [items[index : index + size] for index in range(0, len(items), size)] + + +def _coingecko_market_to_signals(row: dict[str, Any], *, is_trending: bool = False) -> dict[str, float]: + 
market_cap = _as_float(row.get("market_cap")) + fdv = _as_float(row.get("fully_diluted_valuation")) + volume = _as_float(row.get("total_volume")) + rank = _as_float(row.get("market_cap_rank"), 9999.0) + circulating = _as_float(row.get("circulating_supply")) + total_supply = _as_float(row.get("total_supply")) + max_supply = _as_float(row.get("max_supply")) + supply_cap = max_supply or total_supply + + rank_score = _clamp(1.0 - (log10(max(rank, 1.0)) / 4.0)) + size_score = _log_score(market_cap, floor=7.0, span=5.0) + volume_to_mcap = _safe_ratio(volume, market_cap) + liquidity_quality = _clamp(volume_to_mcap / 0.10) + + fdv_ratio = _safe_ratio(fdv, market_cap) if fdv and market_cap else 1.0 + fdv_dilution_risk = _clamp((fdv_ratio - 1.0) / 4.0) + supply_unlocked = _clamp(_safe_ratio(circulating, supply_cap)) if supply_cap else max(0.0, 1.0 - fdv_dilution_risk) + supply_dilution_risk = 1.0 - supply_unlocked + unlock_risk = max(fdv_dilution_risk, supply_dilution_risk * 0.8) + + pct_7d = _as_float(row.get("price_change_percentage_7d_in_currency")) + pct_30d = _as_float(row.get("price_change_percentage_30d_in_currency")) + pct_200d = _as_float(row.get("price_change_percentage_200d_in_currency")) + medium_momentum = _pct_score(pct_30d, low=-15.0, high=60.0) + long_momentum = _pct_score(pct_200d, low=-40.0, high=150.0) + trend_catalyst = _pct_score(pct_7d, low=-5.0, high=25.0) + trend_bonus = 1.0 if is_trending else 0.0 + + tokenomics = _clamp(0.65 * supply_unlocked + 0.35 * (1.0 - fdv_dilution_risk)) + fundamental = _clamp(0.40 * rank_score + 0.35 * size_score + 0.25 * liquidity_quality) + catalyst = _clamp(0.45 * trend_catalyst + 0.40 * medium_momentum + 0.15 * trend_bonus) + adoption = _clamp(0.45 * rank_score + 0.35 * liquidity_quality + 0.20 * long_momentum) + smart_money = _clamp(0.35 * rank_score + 0.35 * liquidity_quality + 0.30 * (1.0 - unlock_risk)) + regulatory_risk = 0.10 if rank <= 100 else 0.20 if rank <= 500 else 0.35 + + populated_fields = sum( + 1 + for value in (market_cap, fdv, volume, rank, circulating, supply_cap, pct_7d, pct_30d, pct_200d) + if value + ) + confidence = _clamp(populated_fields / 9.0) + + return { + "fundamental": round(fundamental, 4), + "tokenomics": round(tokenomics, 4), + "catalyst": round(catalyst, 4), + "adoption": round(adoption, 4), + "smart_money": round(smart_money, 4), + "unlock_risk": round(unlock_risk, 4), + "regulatory_risk": round(regulatory_risk, 4), + "research_confidence": round(confidence, 4), + } + + +def _coingecko_headers(config: dict[str, Any]) -> dict[str, str]: + coingecko_config = config.get("coingecko", {}) + headers = {"accept": "application/json", "user-agent": "coinhunter/2"} + api_key = str(coingecko_config.get("api_key", "")).strip() + if api_key: + headers["x-cg-demo-api-key"] = api_key + return headers + + +def _fetch_coingecko_research( + config: dict[str, Any], + *, + symbols: list[str], + quote: str, + http_get: HttpGet | None = None, +) -> dict[str, dict[str, float]]: + if not symbols: + return {} + + opportunity_config = config.get("opportunity", {}) + coingecko_config = config.get("coingecko", {}) + base_url = str(coingecko_config.get("base_url", "https://api.coingecko.com/api/v3")) + timeout = _as_float(opportunity_config.get("research_timeout_seconds"), 4.0) + headers = _coingecko_headers(config) + http_get = http_get or _public_http_get + + base_to_symbol = { + base_asset(normalize_symbol(symbol), quote).lower(): normalize_symbol(symbol) + for symbol in symbols + if normalize_symbol(symbol) + } + bases = 
sorted(base_to_symbol) + if not bases: + return {} + + trending_ids: set[str] = set() + try: + trending_url = _build_url(base_url, "/search/trending", {}) + trending_payload = http_get(trending_url, headers, timeout) + for item in trending_payload.get("coins", []): + coin = item.get("item", {}) + coin_id = str(coin.get("id", "")).strip() + if coin_id: + trending_ids.add(coin_id) + except Exception: + trending_ids = set() + + research: dict[str, dict[str, float]] = {} + for chunk in _chunked(bases, 50): + params = { + "vs_currency": "usd", + "symbols": ",".join(chunk), + "include_tokens": "top", + "order": "market_cap_desc", + "per_page": "250", + "page": "1", + "sparkline": "false", + "price_change_percentage": "7d,30d,200d", + } + try: + markets_url = _build_url(base_url, "/coins/markets", params) + rows = http_get(markets_url, headers, timeout) + except Exception: + continue + + seen_bases: set[str] = set() + for row in rows if isinstance(rows, list) else []: + symbol = str(row.get("symbol", "")).lower() + if symbol in seen_bases or symbol not in base_to_symbol: + continue + seen_bases.add(symbol) + normalized = base_to_symbol[symbol] + research[normalized] = _coingecko_market_to_signals( + row, + is_trending=str(row.get("id", "")) in trending_ids, + ) + return research + + +def get_external_research( + config: dict[str, Any], + *, + symbols: list[str], + quote: str, + http_get: HttpGet | None = None, +) -> dict[str, dict[str, float]]: + """Fetch automated research signals for symbols. + + Returns an empty map when disabled or when the configured provider is unavailable. + Opportunity scans should continue rather than fail because a research endpoint timed out. + """ + opportunity_config = config.get("opportunity", {}) + if not bool(opportunity_config.get("auto_research", True)): + return {} + provider = str(opportunity_config.get("research_provider", "coingecko")).strip().lower() + if provider in {"", "off", "none", "disabled"}: + return {} + if provider != "coingecko": + return {} + return _fetch_coingecko_research(config, symbols=symbols, quote=quote, http_get=http_get) diff --git a/tests/test_cli.py b/tests/test_cli.py index 3b0107d..7b7ff78 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -248,3 +248,32 @@ class CLITestCase(unittest.TestCase): content = __import__("pathlib").Path(tmp_path).read_text() self.assertIn("BINANCE_API_SECRET=test_secret_value", content) __import__("os").unlink(tmp_path) + + def test_opportunity_dataset_dispatches_without_private_client(self): + captured = {} + config = {"market": {"default_quote": "USDT"}, "opportunity": {}} + with ( + patch.object(cli, "load_config", return_value=config), + patch.object(cli, "_load_spot_client", side_effect=AssertionError("dataset should use public data")), + patch.object( + cli.opportunity_dataset_service, + "collect_opportunity_dataset", + return_value={"path": "/tmp/dataset.json", "symbols": ["BTCUSDT"]}, + ) as collect_mock, + patch.object( + cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) + ), + ): + result = cli.main( + ["opportunity", "dataset", "--symbols", "BTCUSDT", "--simulate-days", "3", "--run-days", "7"] + ) + + self.assertEqual(result, 0) + self.assertEqual(captured["payload"]["path"], "/tmp/dataset.json") + collect_mock.assert_called_once_with( + config, + symbols=["BTCUSDT"], + simulate_days=3.0, + run_days=7.0, + output_path=None, + ) diff --git a/tests/test_opportunity_dataset_service.py b/tests/test_opportunity_dataset_service.py new file mode 
100644 index 0000000..5ca9c95 --- /dev/null +++ b/tests/test_opportunity_dataset_service.py @@ -0,0 +1,76 @@ +"""Opportunity dataset collection tests.""" + +from __future__ import annotations + +import json +import tempfile +import unittest +from datetime import datetime, timezone +from pathlib import Path + +from coinhunter.services import opportunity_dataset_service + + +class OpportunityDatasetServiceTestCase(unittest.TestCase): + def test_default_plan_uses_widest_scan_reference_window(self): + config = {"opportunity": {"lookback_intervals": ["1h", "4h", "1d"]}} + plan = opportunity_dataset_service.build_dataset_plan( + config, + now=datetime(2026, 4, 21, tzinfo=timezone.utc), + ) + + self.assertEqual(plan.kline_limit, 48) + self.assertEqual(plan.reference_days, 48.0) + self.assertEqual(plan.simulate_days, 7.0) + self.assertEqual(plan.run_days, 7.0) + self.assertEqual(plan.total_days, 62.0) + + def test_collect_dataset_writes_klines_and_probe_metadata(self): + config = { + "binance": {"spot_base_url": "https://api.binance.test"}, + "market": {"default_quote": "USDT"}, + "opportunity": { + "lookback_intervals": ["1d"], + "kline_limit": 2, + "simulate_days": 1, + "run_days": 1, + "auto_research": True, + "research_provider": "coingecko", + }, + } + + def fake_http_get(url, headers, timeout): + query = opportunity_dataset_service.parse_query(url) + interval_seconds = 86400 + start = int(query["startTime"]) + end = int(query["endTime"]) + rows = [] + cursor = start + index = 0 + while cursor <= end: + close = 100 + index + rows.append([cursor, close - 1, close + 1, close - 2, close, 10, cursor + interval_seconds * 1000 - 1, close * 10]) + cursor += interval_seconds * 1000 + index += 1 + return rows + + def fake_http_status(url, headers, timeout): + return 200, "{}" + + with tempfile.TemporaryDirectory() as tmpdir: + output = Path(tmpdir) / "dataset.json" + payload = opportunity_dataset_service.collect_opportunity_dataset( + config, + symbols=["BTCUSDT"], + output_path=str(output), + http_get=fake_http_get, + http_status=fake_http_status, + now=datetime(2026, 4, 21, tzinfo=timezone.utc), + ) + dataset = json.loads(output.read_text(encoding="utf-8")) + + self.assertEqual(payload["plan"]["reference_days"], 2.0) + self.assertEqual(payload["plan"]["total_days"], 4.0) + self.assertEqual(payload["external_history"]["status"], "available") + self.assertEqual(payload["counts"]["BTCUSDT"]["1d"], 5) + self.assertEqual(len(dataset["klines"]["BTCUSDT"]["1d"]), 5) diff --git a/tests/test_opportunity_service.py b/tests/test_opportunity_service.py index df64445..242e494 100644 --- a/tests/test_opportunity_service.py +++ b/tests/test_opportunity_service.py @@ -5,7 +5,12 @@ from __future__ import annotations import unittest from unittest.mock import patch -from coinhunter.services import opportunity_service, portfolio_service, signal_service +from coinhunter.services import ( + opportunity_service, + portfolio_service, + research_service, + signal_service, +) class FakeSpotClient: @@ -253,6 +258,37 @@ class OpportunityServiceTestCase(unittest.TestCase): "entry_threshold": 1.5, "watch_threshold": 0.6, "overlap_penalty": 0.6, + "auto_research": False, + "research_provider": "coingecko", + "research_timeout_seconds": 4.0, + "risk_limits": { + "min_liquidity": 0.0, + "max_overextension": 0.08, + "max_downside_risk": 0.3, + "max_unlock_risk": 0.75, + "max_regulatory_risk": 0.75, + "min_quality_for_add": 0.0, + }, + "weights": { + "trend": 1.0, + "momentum": 1.0, + "breakout": 0.8, + "pullback": 0.4, + "volume": 
0.7, + "liquidity": 0.3, + "trend_alignment": 0.8, + "fundamental": 0.8, + "tokenomics": 0.7, + "catalyst": 0.5, + "adoption": 0.4, + "smart_money": 0.3, + "volatility_penalty": 0.5, + "overextension_penalty": 0.7, + "downside_penalty": 0.5, + "unlock_penalty": 0.8, + "regulatory_penalty": 0.4, + "position_concentration_penalty": 0.6, + }, }, "portfolio": { "add_threshold": 1.5, @@ -314,3 +350,133 @@ class OpportunityServiceTestCase(unittest.TestCase): score, metrics = signal_service.score_market_signal([], [], {"price_change_pct": 1.0}, {}) self.assertEqual(score, 0.0) self.assertEqual(metrics["trend"], 0.0) + + def test_overextended_candidate_is_not_an_add(self): + closes = [100, 110, 121, 133, 146, 160, 176] + volumes = [100, 120, 130, 150, 170, 190, 230] + ticker = { + "price_change_pct": 35.0, + "quote_volume": 20_000_000.0, + "high_price": 180.0, + "low_price": 95.0, + } + score, metrics = opportunity_service._score_candidate( + closes, volumes, ticker, self.config["opportunity"]["weights"], 0.0, {"1h": closes, "4h": closes} + ) + action, reasons = opportunity_service._action_for(score, 0.0, metrics) + + self.assertGreater(score, 1.0) + self.assertGreater(metrics["overextension"], 0.08) + self.assertEqual(action, "observe") + self.assertIn("move looks extended; wait for a cleaner entry", reasons) + + def test_external_research_signals_improve_candidate_quality(self): + closes = [100, 101, 102, 103, 104, 105, 106] + volumes = [100, 105, 110, 115, 120, 125, 130] + ticker = { + "price_change_pct": 4.0, + "quote_volume": 50_000_000.0, + "high_price": 110.0, + "low_price": 95.0, + } + base_score, base_metrics = opportunity_service._score_candidate( + closes, volumes, ticker, self.config["opportunity"]["weights"], 0.0, {"1h": closes} + ) + researched_score, researched_metrics = opportunity_service._score_candidate( + closes, + volumes, + ticker, + self.config["opportunity"]["weights"], + 0.0, + {"1h": closes}, + { + "fundamental": 85, + "tokenomics": 80, + "catalyst": 70, + "adoption": 90, + "smart_money": 60, + }, + ) + + self.assertGreater(researched_score, base_score) + self.assertEqual(base_metrics["quality"], 0.0) + self.assertGreater(researched_metrics["quality"], 0.7) + + def test_scan_uses_automatic_external_research(self): + config = self.config | { + "opportunity": self.config["opportunity"] + | { + "auto_research": True, + "top_n": 2, + } + } + with ( + patch.object(opportunity_service, "audit_event", return_value=None), + patch.object( + opportunity_service, + "get_external_research", + return_value={ + "SOLUSDT": { + "fundamental": 0.9, + "tokenomics": 0.8, + "catalyst": 0.9, + "adoption": 0.8, + "smart_money": 0.7, + "unlock_risk": 0.1, + "regulatory_risk": 0.1, + "research_confidence": 0.9, + } + }, + ) as research_mock, + ): + payload = opportunity_service.scan_opportunities(config, spot_client=FakeSpotClient()) + + research_mock.assert_called_once() + sol = next(item for item in payload["recommendations"] if item["symbol"] == "SOLUSDT") + self.assertEqual(sol["metrics"]["fundamental"], 0.9) + self.assertEqual(sol["metrics"]["research_confidence"], 0.9) + + def test_unlock_risk_blocks_add_recommendation(self): + metrics = { + "liquidity": 0.8, + "overextension": 0.0, + "downside_risk": 0.0, + "unlock_risk": 0.9, + "regulatory_risk": 0.0, + "quality": 0.8, + } + action, reasons = opportunity_service._action_for( + 3.0, + 0.0, + metrics, + self.config["opportunity"]["risk_limits"], + ) + + self.assertEqual(action, "observe") + self.assertIn("token unlock or dilution risk is 
too high", reasons) + + +class ResearchServiceTestCase(unittest.TestCase): + def test_coingecko_market_data_becomes_research_signals(self): + signals = research_service._coingecko_market_to_signals( + { + "id": "solana", + "symbol": "sol", + "market_cap": 80_000_000_000, + "fully_diluted_valuation": 95_000_000_000, + "total_volume": 5_000_000_000, + "market_cap_rank": 6, + "circulating_supply": 550_000_000, + "total_supply": 600_000_000, + "max_supply": None, + "price_change_percentage_7d_in_currency": 12.0, + "price_change_percentage_30d_in_currency": 35.0, + "price_change_percentage_200d_in_currency": 80.0, + }, + is_trending=True, + ) + + self.assertGreater(signals["fundamental"], 0.6) + self.assertGreater(signals["tokenomics"], 0.8) + self.assertGreater(signals["catalyst"], 0.6) + self.assertLess(signals["unlock_risk"], 0.2)