From 76c4129c8d664ba5ff10adc49fe06b87e9065ea0 Mon Sep 17 00:00:00 2001 From: Tacit Lab Date: Mon, 27 Apr 2026 16:35:33 +0800 Subject: [PATCH] refactor: simplify CLI to data layer for AI-assisted trading Transform CoinHunter from an over-engineered auto-trading system into a lightweight data-layer CLI paired with the coinbuddy AI Skill. Key changes: - Remove non-core commands: backtest, strategy, opportunity dataset/evaluate/optimize - Add scan: rule-based market screening (zero token cost) - Add analyze: multi-timeframe technical analysis for AI consumption - Add watch: lightweight portfolio anomaly monitoring (zero token cost) - Remove services: backtest, dataset, evaluation, research, strategy - Add analyze_service with RSI, key levels, alerts, and AI-friendly summaries - Add watch_portfolio with drawdown/spike/concentration/technical triggers - Simplify config: remove research/dataset settings, add watch thresholds - Update TUI rendering for analyze and watch outputs - Update tests and CLAUDE.md for new architecture Co-Authored-By: Claude Opus 4.7 --- CLAUDE.md | 63 +- src/coinhunter/cli.py | 384 +++++-------- src/coinhunter/config.py | 47 +- src/coinhunter/runtime.py | 53 ++ src/coinhunter/services/analyze_service.py | 201 +++++++ src/coinhunter/services/backtest_service.py | 370 ------------ .../services/opportunity_dataset_service.py | 372 ------------ .../opportunity_evaluation_service.py | 536 ------------------ .../services/opportunity_service.py | 76 --- src/coinhunter/services/portfolio_service.py | 103 ++++ src/coinhunter/services/research_service.py | 227 -------- src/coinhunter/services/strategy_service.py | 339 ----------- tests/test_backtest_service.py | 129 ----- tests/test_cli.py | 266 ++------- tests/test_opportunity_dataset_service.py | 280 --------- tests/test_opportunity_evaluation_service.py | 90 --- tests/test_opportunity_service.py | 106 +--- tests/test_strategy_service.py | 100 ---- 18 files changed, 600 insertions(+), 3142 deletions(-) create mode 100644 src/coinhunter/services/analyze_service.py delete mode 100644 src/coinhunter/services/backtest_service.py delete mode 100644 src/coinhunter/services/opportunity_dataset_service.py delete mode 100644 src/coinhunter/services/opportunity_evaluation_service.py delete mode 100644 src/coinhunter/services/research_service.py delete mode 100644 src/coinhunter/services/strategy_service.py delete mode 100644 tests/test_backtest_service.py delete mode 100644 tests/test_opportunity_dataset_service.py delete mode 100644 tests/test_opportunity_evaluation_service.py delete mode 100644 tests/test_strategy_service.py diff --git a/CLAUDE.md b/CLAUDE.md index 62d2784..4623ccc 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -14,31 +14,36 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Architecture -CoinHunter V2 is a Binance-first crypto trading CLI with a flat, direct architecture: +CoinHunter is a **lightweight data-layer CLI** designed to pair with the `coinbuddy` AI Skill for crypto trading on Binance. The philosophy is **layered screening**: the CLI handles cheap rule-based filtering and monitoring, while the AI Skill handles expensive deep analysis on a small set of curated candidates. -- **`src/coinhunter/cli.py`** — Single entrypoint (`main()`). Uses `argparse` to parse commands and directly dispatches to service functions. There is no separate `commands/` adapter layer. Top-level commands include `init`, `config`, `account`, `market`, `buy`, `sell`, `portfolio`, `opportunity`, `strategy`, `backtest`, `catlog`, `upgrade`, and `completion`. -- **`src/coinhunter/services/`** — Contains all domain logic: - - `account_service.py` — balances, positions, overview +### CLI layer (data + execution) + +- **`src/coinhunter/cli.py`** — Single entrypoint (`main()`). Uses `argparse` to parse commands and directly dispatches to service functions. Core commands: `init`, `config`, `account`, `market`, `buy`, `sell`, `portfolio`, `scan`, `analyze`, `watch`, `upgrade`, `catlog`, `completion`. +- **`src/coinhunter/services/`** — Domain logic: + - `account_service.py` — balances, positions - `market_service.py` — tickers, klines, scan universe, symbol normalization - - `signal_service.py` — shared market signal scoring used by scan and portfolio analysis - - `portfolio_service.py` — held-position review and add/hold/trim/exit recommendations + - `signal_service.py` — shared market signal scoring (rule-based, zero token cost) + - `portfolio_service.py` — held-position review (`analyze_portfolio`) and lightweight anomaly monitoring (`watch_portfolio`) - `trade_service.py` — spot order execution only - - `opportunity_service.py` — market scanning and entry/watch/skip recommendations - - `opportunity_dataset_service.py` — historical kline dataset collection for backtesting - - `opportunity_evaluation_service.py` — walk-forward evaluation and model-weight optimization - - `research_service.py` — external research signal providers for opportunity scoring - - `strategy_service.py` — combines opportunity scanning and portfolio analysis into unified buy/sell/hold trade signals - - `backtest_service.py` — walk-forward backtest engine using historical kline datasets with virtual cash and positions -- **`src/coinhunter/binance/spot_client.py`** — Thin wrapper around `binance.spot.Spot`. Normalizes request errors into `RuntimeError` and handles single/multi-symbol ticker responses. + - `opportunity_service.py` — market scanning (`scan_opportunities`) returning top-N candidates + - `analyze_service.py` — multi-timeframe deep technical analysis for AI consumption +- **`src/coinhunter/binance/spot_client.py`** — Thin wrapper around `binance.spot.Spot`. Normalizes request errors into `RuntimeError`. - **`src/coinhunter/config.py`** — `load_config()`, `get_binance_credentials()`, `ensure_init_files()`. -- **`src/coinhunter/runtime.py`** — `RuntimePaths`, `get_runtime_paths()`, `print_json()`. +- **`src/coinhunter/runtime.py`** — `RuntimePaths`, `get_runtime_paths()`, `print_json()`, TUI rendering. - **`src/coinhunter/audit.py`** — Writes JSONL audit events to dated files. +### AI layer (decision) + +- **`coinbuddy` Skill** — Lives at `~/.claude/skills/coinbuddy/SKILL.md`. Governs how the AI interacts with the CLI: + - **Discovery flow:** `scan` → `analyze` → AI synthesis → user confirm → `trade` + - **Portfolio flow:** `watch` → flag NEED_REVIEW → `analyze` → AI synthesis → user confirm → `trade` + - The Skill always uses `--agent` for structured JSON consumption. + ## Runtime and environment User data lives in `~/.coinhunter/` by default (override with `COINHUNTER_HOME`): -- `config.toml` — runtime, binance, trading, signal, opportunity, and portfolio settings +- `config.toml` — runtime, binance, trading, signal, opportunity, portfolio, and watch settings - `.env` — `BINANCE_API_KEY` and `BINANCE_API_SECRET` - `logs/audit_YYYYMMDD.jsonl` — structured audit log - `logs/dry-run/audit_YYYYMMDD.jsonl` — dry-run audit log @@ -49,25 +54,29 @@ Run `coinhunter init` to generate the config and env templates. - **Symbol normalization:** `market_service.normalize_symbol()` strips `/`, `-`, `_`, and uppercases the symbol. CLI inputs like `ETH/USDT`, `eth-usdt`, and `ETHUSDT` are all normalized to `ETHUSDT`. - **Dry-run behavior:** Trade commands support `--dry-run`. If omitted, the default falls back to `trading.dry_run_default` in `config.toml`. -- **Client injection:** Service functions accept `spot_client` as a keyword argument. This enables easy unit testing with mocks. +- **Client injection:** Service functions accept `spot_client` as a keyword argument for easy unit testing with mocks. - **Error handling:** `spot_client.py` catches `requests.exceptions.SSLError` and `RequestException` and re-raises as human-readable `RuntimeError`. The CLI catches all exceptions in `main()` and prints `error: {message}` to stderr with exit code 1. - **Ticker API fallback:** `spot_client.ticker_stats()` uses `rolling_window_ticker` for symbol-specific queries and `ticker_24hr` for full-market scans (no symbols). - **Output modes:** All commands support `--agent` for JSON output and `--doc` to print the command's output schema. +- **Watch rules:** `portfolio_service.watch_portfolio()` monitors held positions for anomalies (1h/24h drawdowns, spikes, concentration risk, technical score deterioration). This is pure rule-based and costs zero tokens. +- **Analyze design:** `analyze_service.analyze_symbols()` fetches multi-timeframe klines (1h, 4h, 1d) and produces an AI-friendly output with `summary`, `timeframes`, `key_levels`, `alerts`, and `signal_score`. It is designed for LLM consumption. + +## CLI command reference + +| Command | Purpose | Token cost | +|---------|---------|-----------| +| `coin scan` | Rule-based market scan, returns top-N candidates | 0 | +| `coin analyze ` | Multi-timeframe deep technical analysis | 0 | +| `coin watch` | Portfolio anomaly monitoring | 0 | +| `coin portfolio` | Full portfolio scoring | 0 | +| `coin account` | Balances | 0 | +| `coin buy/sell` | Trade execution | 0 | ## Testing -Tests live in `tests/` and use `unittest.TestCase` with `unittest.mock.patch`. The test suite covers: - -- `test_cli.py` — parser smoke tests and dispatch behavior -- `test_config_runtime.py` — config loading, env parsing, path resolution -- `test_account_market_services.py` — balance/position/ticker/klines logic with mocked clients -- `test_trade_service.py` — spot trade execution paths -- `test_opportunity_service.py` — portfolio and scan scoring logic -- `test_opportunity_dataset_service.py` — dataset collection and walk-forward evaluation -- `test_opportunity_evaluation_service.py` — model weight optimization -- `test_strategy_service.py` — combined signal generation logic -- `test_backtest_service.py` — historical backtest engine +Tests live in `tests/` and use `unittest.TestCase` with `unittest.mock.patch`. The test suite covers CLI parser smoke tests, config loading, service logic with mocked clients, and trade execution paths. ## Notes - `AGENTS.md` in this repo is stale and describes a prior V1 architecture (commands/, smart executor, precheck, review engine). Do not rely on it. +- Removed in the V2 simplification: `backtest`, `strategy`, `opportunity dataset/evaluate/optimize`, `research_service` (CoinGecko). These were over-engineered for the AI-assisted trading flow and have been archived out of the core codebase. diff --git a/src/coinhunter/cli.py b/src/coinhunter/cli.py index cce28e0..d66a691 100644 --- a/src/coinhunter/cli.py +++ b/src/coinhunter/cli.py @@ -1,4 +1,4 @@ -"""CoinHunter V2 CLI.""" +"""CoinHunter V2 CLI — lightweight data layer for AI-assisted crypto trading.""" from __future__ import annotations @@ -26,13 +26,10 @@ from .runtime import ( ) from .services import ( account_service, - backtest_service, + analyze_service, market_service, - opportunity_dataset_service, - opportunity_evaluation_service, opportunity_service, portfolio_service, - strategy_service, trade_service, ) @@ -44,11 +41,11 @@ examples: coin m k BTCUSDT -i 1h -l 50 coin buy BTCUSDT -Q 100 -d coin sell BTCUSDT --qty 0.01 --type limit --price 90000 - coin opportunity -s BTCUSDT ETHUSDT - coin opportunity evaluate ~/.coinhunter/datasets/opportunity_dataset.json --agent - coin opportunity optimize ~/.coinhunter/datasets/opportunity_dataset.json --agent - coin strategy -s BTCUSDT ETHUSDT - coin backtest ~/.coinhunter/datasets/opportunity_dataset_20260101T000000Z.json + coin scan + coin scan -s BTCUSDT ETHUSDT + coin analyze BTCUSDT ETHUSDT + coin portfolio + coin watch coin upgrade """ @@ -406,17 +403,15 @@ Fields: position_weight – position weight in portfolio (float, 0-1) """, }, - "opportunity": { + "scan": { "tui": """\ TUI Output: RECOMMENDATIONS count=5 1. ETHUSDT action=entry confidence=74 score=1.7200 · fresh breakout trigger with clean setup and manageable extension - · base asset ETH passed liquidity and tradability filters setup_score=0.74 trigger_score=0.61 liquidity_score=1.0 extension_penalty=0.0 opportunity_score=1.72 position_weight=0.0 2. BTCUSDT action=watch confidence=52 score=0.7800 · setup is constructive but the trigger is not clean enough yet - · base asset BTC passed liquidity and tradability filters · symbol is already held, so the opportunity score is discounted for overlap setup_score=0.68 trigger_score=0.25 liquidity_score=1.0 extension_penalty=0.1 opportunity_score=0.96 position_weight=0.3 @@ -424,7 +419,7 @@ JSON Output: { "recommendations": [ {"symbol": "ETHUSDT", "action": "entry", "confidence": 74, "score": 1.72, - "reasons": ["fresh breakout trigger with clean setup and manageable extension", "base asset ETH passed liquidity and tradability filters"], + "reasons": ["fresh breakout trigger with clean setup and manageable extension"], "metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}} ] } @@ -433,7 +428,7 @@ Fields: action – enum: "entry" | "watch" | "avoid" confidence – 0..100 confidence index derived from edge_score score – opportunity score after extension and overlap/risk discounts - reasons – list of human-readable explanations (includes liquidity filter note for scan) + reasons – list of human-readable explanations metrics – scoring breakdown setup_score – compression, higher-lows, and breakout-proximity quality trigger_score – fresh-breakout, volume, and controlled-momentum quality @@ -447,7 +442,7 @@ JSON Output: { "recommendations": [ {"symbol": "ETHUSDT", "action": "entry", "confidence": 74, "score": 1.72, - "reasons": ["fresh breakout trigger with clean setup and manageable extension", "base asset ETH passed liquidity and tradability filters"], + "reasons": ["fresh breakout trigger with clean setup and manageable extension"], "metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}} ] } @@ -456,7 +451,7 @@ Fields: action – enum: "entry" | "watch" | "avoid" confidence – 0..100 confidence index derived from edge_score score – opportunity score after extension and overlap/risk discounts - reasons – list of human-readable explanations (includes liquidity filter note for scan) + reasons – list of human-readable explanations metrics – scoring breakdown setup_score – compression, higher-lows, and breakout-proximity quality trigger_score – fresh-breakout, volume, and controlled-momentum quality @@ -466,125 +461,119 @@ Fields: position_weight – current portfolio overlap in the same symbol """, }, - "opportunity/dataset": { + "analyze": { "tui": """\ TUI Output: - DATASET COLLECTED - Path: ~/.coinhunter/datasets/opportunity_dataset_20260421T120000Z.json - Symbols: BTCUSDT, ETHUSDT - Window: reference=48.0d simulate=7.0d run=7.0d + ANALYSIS count=2 + BTCUSDT at 70,000.00 (+2.50% 24h). 1h trend: uptrend, 4h: uptrend, 1d: sideways. 1h RSI 65.0. No significant alerts. + 1h:uptrend RSI=65.0 | 4h:uptrend RSI=58.0 | 1d:sideways RSI=52.0 + S=[68000.0, 65000.0] R=[71000.0, 73000.0] JSON Output: { - "path": "~/.coinhunter/datasets/opportunity_dataset_20260421T120000Z.json", - "symbols": ["BTCUSDT", "ETHUSDT"], - "counts": {"BTCUSDT": {"1h": 1488}}, - "plan": {"reference_days": 48.0, "simulate_days": 7.0, "run_days": 7.0, "total_days": 62.0}, - "external_history": {"provider": "coingecko", "status": "available"} + "analyses": [ + { + "symbol": "BTCUSDT", + "summary": "BTCUSDT at 70000.00 (+2.50% 24h)...", + "price": {"current": 70000.0, "change_24h_pct": 2.5, "high_24h": 71000.0, "low_24h": 68000.0, "volume_24h": 123456789.0}, + "timeframes": {"1h": {"trend": "uptrend", "sma20": 69000.0, "rsi": 65.0, "volatility_pct": 1.2, "volume_ratio": 1.3}, ...}, + "key_levels": {"support": [68000.0, 65000.0], "resistance": [71000.0, 73000.0], "recent_high": 71000.0, "recent_low": 68000.0}, + "alerts": [], + "signal_score": 0.75, + "signal_metrics": {"trend": 1.0, "momentum": 0.02, ...} + } + ] } Fields: - path – JSON dataset file written locally - symbols – symbols included in the dataset - counts – kline row counts by symbol and interval - plan – reference/simulation/run windows used for collection - external_history – external provider historical capability probe result + symbol – trading pair + summary – human-readable one-line technical summary + price – current price, 24h change, high/low, volume + timeframes – 1h/4h/1d trend, sma20, rsi, volatility, volume_ratio + key_levels – support, resistance, recent_high, recent_low + alerts – list of technical alerts (e.g. RSI overbought, near support) + signal_score – portfolio-style signal score + signal_metrics – raw scoring breakdown """, "json": """\ JSON Output: { - "path": "~/.coinhunter/datasets/opportunity_dataset_20260421T120000Z.json", - "symbols": ["BTCUSDT", "ETHUSDT"], - "counts": {"BTCUSDT": {"1h": 1488}}, - "plan": {"reference_days": 48.0, "simulate_days": 7.0, "run_days": 7.0, "total_days": 62.0}, - "external_history": {"provider": "coingecko", "status": "available"} + "analyses": [ + { + "symbol": "BTCUSDT", + "summary": "BTCUSDT at 70000.00 (+2.50% 24h)...", + "price": {"current": 70000.0, "change_24h_pct": 2.5, "high_24h": 71000.0, "low_24h": 68000.0, "volume_24h": 123456789.0}, + "timeframes": {"1h": {"trend": "uptrend", "sma20": 69000.0, "rsi": 65.0, "volatility_pct": 1.2, "volume_ratio": 1.3}, ...}, + "key_levels": {"support": [68000.0, 65000.0], "resistance": [71000.0, 73000.0], "recent_high": 71000.0, "recent_low": 68000.0}, + "alerts": [], + "signal_score": 0.75, + "signal_metrics": {"trend": 1.0, "momentum": 0.02, ...} + } + ] } Fields: - path – JSON dataset file written locally - symbols – symbols included in the dataset - counts – kline row counts by symbol and interval - plan – reference/simulation/run windows used for collection - external_history – external provider historical capability probe result + symbol – trading pair + summary – human-readable one-line technical summary + price – current price, 24h change, high/low, volume + timeframes – 1h/4h/1d trend, sma20, rsi, volatility, volume_ratio + key_levels – support, resistance, recent_high, recent_low + alerts – list of technical alerts (e.g. RSI overbought, near support) + signal_score – portfolio-style signal score + signal_metrics – raw scoring breakdown """, }, - "opportunity/evaluate": { + "watch": { "tui": """\ TUI Output: - SUMMARY - count=120 correct=76 incorrect=44 accuracy=0.6333 - interval=1h top_n=10 decision_times=24 + PORTFOLIO WATCH 2 position(s) need review, 1 healthy - BY ACTION - trigger count=12 correct=7 accuracy=0.5833 avg_trade_return=0.0062 - setup count=78 correct=52 accuracy=0.6667 - skip count=30 correct=17 accuracy=0.5667 + ⚠ NEED_REVIEW ETHUSDT + · 1h drop -8.00% (alert threshold -5.0%) + · 24h drop -12.00% (alert threshold -10.0%) + · position weight 60.0% exceeds max 50.0% + · technical score -0.30 below exit threshold -0.20 + + ✓ HEALTHY BTCUSDT weight=30.0% JSON Output: { - "summary": {"count": 120, "correct": 76, "incorrect": 44, "accuracy": 0.6333}, - "by_action": {"trigger": {"count": 12, "correct": 7, "accuracy": 0.5833}}, - "trade_simulation": {"trigger_trades": 12, "wins": 7, "losses": 5, "win_rate": 0.5833}, - "rules": {"horizon_hours": 24.0, "take_profit": 0.02, "stop_loss": 0.015, "setup_target": 0.01} + "watch_results": [ + {"symbol": "ETHUSDT", "status": "need_review", "reasons": ["1h drop -8.00%..."], "metrics": {...}}, + {"symbol": "BTCUSDT", "status": "healthy", "reasons": [], "metrics": {"position_weight": 0.3, ...}} + ], + "summary": "2 position(s) need review, 1 healthy", + "need_review_count": 2, + "healthy_count": 1 } Fields: - summary – aggregate walk-forward judgment accuracy - by_action – accuracy and average returns grouped by trigger/setup/chase/skip - trade_simulation – trigger-only trade outcome using take-profit/stop-loss rules - rules – objective evaluation assumptions used for the run - examples – first evaluated judgments with outcome labels + watch_results – per-position watch status + symbol – trading pair + status – "need_review" | "healthy" + reasons – list of triggered alert reasons + metrics – position_weight, signal_score, price_change_24h_pct, volatility, trend + summary – human-readable summary string + need_review_count – number of positions flagged for review + healthy_count – number of positions with no alerts """, "json": """\ JSON Output: { - "summary": {"count": 120, "correct": 76, "incorrect": 44, "accuracy": 0.6333}, - "by_action": {"trigger": {"count": 12, "correct": 7, "accuracy": 0.5833}}, - "trade_simulation": {"trigger_trades": 12, "wins": 7, "losses": 5, "win_rate": 0.5833}, - "rules": {"horizon_hours": 24.0, "take_profit": 0.02, "stop_loss": 0.015, "setup_target": 0.01} + "watch_results": [ + {"symbol": "ETHUSDT", "status": "need_review", "reasons": ["1h drop -8.00%..."], "metrics": {...}}, + {"symbol": "BTCUSDT", "status": "healthy", "reasons": [], "metrics": {"position_weight": 0.3, ...}} + ], + "summary": "2 position(s) need review, 1 healthy", + "need_review_count": 2, + "healthy_count": 1 } Fields: - summary – aggregate walk-forward judgment accuracy - by_action – accuracy and average returns grouped by trigger/setup/chase/skip - trade_simulation – trigger-only trade outcome using take-profit/stop-loss rules - rules – objective evaluation assumptions used for the run - examples – first evaluated judgments with outcome labels -""", - }, - "opportunity/optimize": { - "tui": """\ -TUI Output: - BASELINE - objective=0.5012 accuracy=0.5970 trigger_win_rate=0.4312 - - BEST - objective=0.5341 accuracy=0.6214 trigger_win_rate=0.4862 - -JSON Output: - { - "baseline": {"objective": 0.5012, "summary": {"accuracy": 0.597}}, - "best": {"objective": 0.5341, "summary": {"accuracy": 0.6214}}, - "improvement": {"accuracy": 0.0244, "trigger_win_rate": 0.055}, - "recommended_config": {"opportunity.model_weights.trigger": 1.5} - } -Fields: - baseline – evaluation snapshot with current model weights - best – best walk-forward snapshot found by coordinate search - improvement – deltas from baseline to best - recommended_config – config keys that can be written with `coin config set` - search – optimizer metadata; thresholds are fixed -""", - "json": """\ -JSON Output: - { - "baseline": {"objective": 0.5012, "summary": {"accuracy": 0.597}}, - "best": {"objective": 0.5341, "summary": {"accuracy": 0.6214}}, - "improvement": {"accuracy": 0.0244, "trigger_win_rate": 0.055}, - "recommended_config": {"opportunity.model_weights.trigger": 1.5} - } -Fields: - baseline – evaluation snapshot with current model weights - best – best walk-forward snapshot found by coordinate search - improvement – deltas from baseline to best - recommended_config – config keys that can be written with `coin config set` - search – optimizer metadata; thresholds are fixed + watch_results – per-position watch status + symbol – trading pair + status – "need_review" | "healthy" + reasons – list of triggered alert reasons + metrics – position_weight, signal_score, price_change_24h_pct, volatility, trend + summary – human-readable summary string + need_review_count – number of positions flagged for review + healthy_count – number of positions with no alerts """, }, "upgrade": { @@ -684,17 +673,17 @@ Fields: TUI Output: CONFIG binance.recv_window = 5000 - opportunity.top_n = 10 + opportunity.top_n = 5 JSON Output: - {"binance.recv_window": 5000, "opportunity.top_n": 10} + {"binance.recv_window": 5000, "opportunity.top_n": 5} Fields: key – dot-notation config key (e.g. "binance.recv_window") value – current value (type depends on key: bool, int, float, list, str) """, "json": """\ JSON Output: - {"binance.recv_window": 5000, "opportunity.top_n": 10} + {"binance.recv_window": 5000, "opportunity.top_n": 5} Fields: key – dot-notation config key (e.g. "binance.recv_window") value – current value (type depends on key: bool, int, float, list, str) @@ -922,74 +911,28 @@ def build_parser() -> argparse.ArgumentParser: ) _add_global_flags(portfolio_parser) - opportunity_parser = subparsers.add_parser( - "opportunity", aliases=["opp", "o"], help="Scan market for opportunities", - description="Scan the market for trading opportunities and return the top-N candidates with signals.", - ) - opportunity_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols") - _add_global_flags(opportunity_parser) - opportunity_subparsers = opportunity_parser.add_subparsers(dest="opportunity_command") - scan_parser = opportunity_subparsers.add_parser( - "scan", help="Scan market for opportunities", - description="Scan the market for trading opportunities and return the top-N candidates with signals.", + scan_parser = subparsers.add_parser( + "scan", aliases=["sc"], help="Scan market for top-N opportunities", + description="Rule-based market scan that returns the top-N candidates. Zero token cost. " + "Use `analyze` for deep-dive on selected symbols.", ) scan_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols") _add_global_flags(scan_parser) - dataset_parser = opportunity_subparsers.add_parser( - "dataset", aliases=["ds"], help="Collect historical opportunity evaluation dataset", - description="Collect point-in-time market data for opportunity simulation and evaluation.", - ) - dataset_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict dataset to symbols") - dataset_parser.add_argument("--simulate-days", type=float, help="Forward simulation/evaluation window in days") - dataset_parser.add_argument("--run-days", type=float, help="Continuous scan simulation window in days") - dataset_parser.add_argument("-o", "--output", help="Output dataset JSON path") - _add_global_flags(dataset_parser) - evaluate_parser = opportunity_subparsers.add_parser( - "evaluate", aliases=["eval", "ev"], help="Evaluate opportunity accuracy from a historical dataset", - description="Run a walk-forward evaluation over an opportunity dataset using point-in-time candles only.", - ) - evaluate_parser.add_argument("dataset", help="Path to an opportunity dataset JSON file") - evaluate_parser.add_argument("--horizon-hours", type=float, help="Forward evaluation horizon in hours") - evaluate_parser.add_argument("--take-profit-pct", type=float, help="Trigger success take-profit threshold in percent") - evaluate_parser.add_argument("--stop-loss-pct", type=float, help="Stop-loss threshold in percent") - evaluate_parser.add_argument("--setup-target-pct", type=float, help="Setup success target threshold in percent") - evaluate_parser.add_argument("--lookback", type=int, help="Closed candles used for each point-in-time score") - evaluate_parser.add_argument("--top-n", type=int, help="Evaluate only the top-N ranked symbols at each decision time") - evaluate_parser.add_argument("--examples", type=int, default=20, help="Number of example judgments to include") - _add_global_flags(evaluate_parser) - optimize_parser = opportunity_subparsers.add_parser( - "optimize", aliases=["opt"], help="Optimize opportunity model weights from a historical dataset", - description="Coordinate-search normalized model weights while keeping decision thresholds fixed.", - ) - optimize_parser.add_argument("dataset", help="Path to an opportunity dataset JSON file") - optimize_parser.add_argument("--horizon-hours", type=float, help="Forward evaluation horizon in hours") - optimize_parser.add_argument("--take-profit-pct", type=float, help="Trigger success take-profit threshold in percent") - optimize_parser.add_argument("--stop-loss-pct", type=float, help="Stop-loss threshold in percent") - optimize_parser.add_argument("--setup-target-pct", type=float, help="Setup success target threshold in percent") - optimize_parser.add_argument("--lookback", type=int, help="Closed candles used for each point-in-time score") - optimize_parser.add_argument("--top-n", type=int, help="Evaluate only the top-N ranked symbols at each decision time") - optimize_parser.add_argument("--passes", type=int, default=2, help="Coordinate-search passes over model weights") - _add_global_flags(optimize_parser) - strategy_parser = subparsers.add_parser( - "strategy", aliases=["strat", "st"], help="Combined opportunity + portfolio trade signals", - description="Generate unified buy/sell/hold signals by combining opportunity scanning and portfolio analysis.", + analyze_parser = subparsers.add_parser( + "analyze", aliases=["an"], help="Detailed technical analysis for one or more symbols", + description="Multi-timeframe technical analysis (1h/4h/1d) with key levels, alerts, and signal scores. " + "Designed for AI consumption — use with --agent for structured JSON output.", ) - strategy_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols") - _add_global_flags(strategy_parser) + analyze_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to analyze (e.g. BTCUSDT ETHUSDT)") + _add_global_flags(analyze_parser) - backtest_parser = subparsers.add_parser( - "backtest", aliases=["bt"], help="Backtest combined strategy on historical dataset", - description="Run a walk-forward backtest using historical kline datasets with virtual cash and positions.", + watch_parser = subparsers.add_parser( + "watch", aliases=["w"], help="Lightweight portfolio monitoring", + description="Monitor held positions for anomalies (drawdowns, spikes, concentration risk, technical deterioration). " + "Returns NEED_REVIEW or HEALTHY for each position. Zero token cost.", ) - backtest_parser.add_argument("dataset", help="Path to an opportunity dataset JSON file") - backtest_parser.add_argument("--initial-cash", type=float, help="Initial cash allocation (default: 10000)") - backtest_parser.add_argument("--max-positions", type=int, help="Maximum simultaneous positions (default: 5)") - backtest_parser.add_argument("--position-size-pct", type=float, help="Cash percentage per position (default: 0.2)") - backtest_parser.add_argument("--commission-pct", type=float, help="Commission per trade in percent (default: 0.1)") - backtest_parser.add_argument("--lookback", type=int, help="Closed candles used for each point-in-time score") - backtest_parser.add_argument("--decision-interval", type=int, help="Minimum minutes between decision points (default: 0 = every candle)") - _add_global_flags(backtest_parser) + _add_global_flags(watch_parser) upgrade_parser = subparsers.add_parser( "upgrade", help="Upgrade coinhunter to the latest version", @@ -1026,26 +969,20 @@ _CANONICAL_COMMANDS = { "m": "market", "pf": "portfolio", "p": "portfolio", - "opp": "opportunity", - "o": "opportunity", "cfg": "config", "c": "config", - "strat": "strategy", - "st": "strategy", - "bt": "backtest", + "sc": "scan", + "an": "analyze", + "w": "watch", } _CANONICAL_SUBCOMMANDS = { "tk": "tickers", "t": "tickers", "k": "klines", - "ds": "dataset", - "eval": "evaluate", - "ev": "evaluate", - "opt": "optimize", } -_COMMANDS_WITH_SUBCOMMANDS = {"market", "config", "opportunity"} +_COMMANDS_WITH_SUBCOMMANDS = {"market", "config"} def _get_doc_key(argv: list[str]) -> str | None: @@ -1111,7 +1048,7 @@ def main(argv: list[str] | None = None) -> int: # Normalize aliases to canonical command names if args.command: args.command = _CANONICAL_COMMANDS.get(args.command, args.command) - for attr in ("account_command", "market_command", "config_command", "opportunity_command"): + for attr in ("market_command", "config_command"): val = getattr(args, attr, None) if val: setattr(args, attr, _CANONICAL_SUBCOMMANDS.get(val, val)) @@ -1290,72 +1227,7 @@ def main(argv: list[str] | None = None) -> int: print_output(result, agent=args.agent) return 0 - if args.command == "strategy": - spot_client = _load_spot_client(config) - with with_spinner("Generating trade signals...", enabled=not args.agent): - result = strategy_service.generate_trade_signals( - config, spot_client=spot_client, symbols=args.symbols - ) - print_output(result, agent=args.agent) - return 0 - - if args.command == "backtest": - with with_spinner("Running backtest...", enabled=not args.agent): - result = backtest_service.run_backtest( - config, - dataset_path=args.dataset, - initial_cash=args.initial_cash, - max_positions=args.max_positions, - position_size_pct=args.position_size_pct / 100.0 if args.position_size_pct is not None else None, - commission_pct=args.commission_pct / 100.0 if args.commission_pct is not None else None, - lookback=args.lookback, - decision_interval_minutes=args.decision_interval, - ) - print_output(result, agent=args.agent) - return 0 - - if args.command == "opportunity": - if args.opportunity_command == "optimize": - with with_spinner("Optimizing opportunity model...", enabled=not args.agent): - result = opportunity_evaluation_service.optimize_opportunity_model( - config, - dataset_path=args.dataset, - horizon_hours=args.horizon_hours, - take_profit=args.take_profit_pct / 100.0 if args.take_profit_pct is not None else None, - stop_loss=args.stop_loss_pct / 100.0 if args.stop_loss_pct is not None else None, - setup_target=args.setup_target_pct / 100.0 if args.setup_target_pct is not None else None, - lookback=args.lookback, - top_n=args.top_n, - passes=args.passes, - ) - print_output(result, agent=args.agent) - return 0 - if args.opportunity_command == "evaluate": - with with_spinner("Evaluating opportunity dataset...", enabled=not args.agent): - result = opportunity_evaluation_service.evaluate_opportunity_dataset( - config, - dataset_path=args.dataset, - horizon_hours=args.horizon_hours, - take_profit=args.take_profit_pct / 100.0 if args.take_profit_pct is not None else None, - stop_loss=args.stop_loss_pct / 100.0 if args.stop_loss_pct is not None else None, - setup_target=args.setup_target_pct / 100.0 if args.setup_target_pct is not None else None, - lookback=args.lookback, - top_n=args.top_n, - max_examples=args.examples, - ) - print_output(result, agent=args.agent) - return 0 - if args.opportunity_command == "dataset": - with with_spinner("Collecting opportunity dataset...", enabled=not args.agent): - result = opportunity_dataset_service.collect_opportunity_dataset( - config, - symbols=args.symbols, - simulate_days=args.simulate_days, - run_days=args.run_days, - output_path=args.output, - ) - print_output(result, agent=args.agent) - return 0 + if args.command == "scan": spot_client = _load_spot_client(config) with with_spinner("Scanning opportunities...", enabled=not args.agent): result = opportunity_service.scan_opportunities( @@ -1364,6 +1236,22 @@ def main(argv: list[str] | None = None) -> int: print_output(result, agent=args.agent) return 0 + if args.command == "analyze": + spot_client = _load_spot_client(config) + with with_spinner("Analyzing symbols...", enabled=not args.agent): + result = analyze_service.analyze_symbols( + config, spot_client=spot_client, symbols=args.symbols + ) + print_output(result, agent=args.agent) + return 0 + + if args.command == "watch": + spot_client = _load_spot_client(config) + with with_spinner("Watching portfolio...", enabled=not args.agent): + result = portfolio_service.watch_portfolio(config, spot_client=spot_client) + print_output(result, agent=args.agent) + return 0 + parser.error(f"Unsupported command {args.command}") return 2 except Exception as exc: diff --git a/src/coinhunter/config.py b/src/coinhunter/config.py index 1106c20..cc07099 100644 --- a/src/coinhunter/config.py +++ b/src/coinhunter/config.py @@ -40,7 +40,7 @@ dust_usdt_threshold = 10.0 [opportunity] min_quote_volume = 1000000.0 -top_n = 10 +top_n = 5 scan_limit = 50 ignore_dust = true entry_threshold = 1.5 @@ -49,45 +49,6 @@ min_trigger_score = 0.45 min_setup_score = 0.35 overlap_penalty = 0.6 lookback_intervals = ["1h", "4h", "1d"] -auto_research = true -research_provider = "coingecko" -research_timeout_seconds = 4.0 -simulate_days = 7 -run_days = 7 -dataset_timeout_seconds = 15.0 -evaluation_horizon_hours = 24.0 -evaluation_take_profit_pct = 2.0 -evaluation_stop_loss_pct = 1.5 -evaluation_setup_target_pct = 1.0 -evaluation_lookback = 24 - -[opportunity.risk_limits] -min_liquidity = 0.0 -max_overextension = 0.08 -max_downside_risk = 0.3 -max_unlock_risk = 0.75 -max_regulatory_risk = 0.75 -min_quality_for_add = 0.0 - -[opportunity.weights] -trend = 1.0 -momentum = 1.0 -breakout = 0.8 -pullback = 0.4 -volume = 0.7 -liquidity = 0.3 -trend_alignment = 0.8 -fundamental = 0.8 -tokenomics = 0.7 -catalyst = 0.5 -adoption = 0.4 -smart_money = 0.3 -volatility_penalty = 0.5 -overextension_penalty = 0.7 -downside_penalty = 0.5 -unlock_penalty = 0.8 -regulatory_penalty = 0.4 -position_concentration_penalty = 0.6 [opportunity.model_weights] trend = 0.1406 @@ -118,6 +79,12 @@ hold_threshold = 0.6 trim_threshold = 0.2 exit_threshold = -0.2 max_position_weight = 0.6 + +[watch] +alert_drawdown_1h_pct = -5.0 +alert_drawdown_24h_pct = -10.0 +alert_spike_1h_pct = 8.0 +max_position_weight = 0.5 """ DEFAULT_ENV = "BINANCE_API_KEY=\nBINANCE_API_SECRET=\n" diff --git a/src/coinhunter/runtime.py b/src/coinhunter/runtime.py index b9c3656..f2045cd 100644 --- a/src/coinhunter/runtime.py +++ b/src/coinhunter/runtime.py @@ -353,6 +353,59 @@ def _render_tui(payload: Any) -> None: print(f" {_DIM}{metric_str}{_RESET}") return + if "analyses" in payload: + rows = payload["analyses"] + print(f"\n{_BOLD}{_CYAN} ANALYSIS {_RESET} count={len(rows)}") + for r in rows: + symbol = r.get("symbol", "") + price = r.get("price", {}) + current = price.get("current", 0) + change = price.get("change_24h_pct", 0) + change_color = _GREEN if change >= 0 else _RED + print(f"\n {_BOLD}{symbol}{_RESET} {current:,.2f} {_color(f'{change:+.2f}%', change_color)}") + print(f" {r.get('summary', '')}") + alerts = r.get("alerts", []) + if alerts: + for alert in alerts: + print(f" {_YELLOW}! {_RESET}{alert}") + timeframes = r.get("timeframes", {}) + if timeframes: + tf_parts = [] + for tf_name, tf_data in timeframes.items(): + trend = tf_data.get("trend", "?") + rsi = tf_data.get("rsi") + rsi_str = f" RSI={rsi:.1f}" if rsi is not None else "" + tf_parts.append(f"{tf_name}:{trend}{rsi_str}") + print(f" {_DIM}{' | '.join(tf_parts)}{_RESET}") + levels = r.get("key_levels", {}) + if levels: + sup = levels.get("support", []) + res = levels.get("resistance", []) + if sup or res: + print(f" S={sup} R={res}") + return + + if "watch_results" in payload: + rows = payload["watch_results"] + summary = payload.get("summary", "") + print(f"\n{_BOLD}{_CYAN} PORTFOLIO WATCH {_RESET} {summary}") + for r in rows: + status = r.get("status", "") + symbol = r.get("symbol", "") + if status == "need_review": + print(f"\n {_YELLOW}⚠ NEED_REVIEW{_RESET} {_BOLD}{symbol}{_RESET}") + for reason in r.get("reasons", []): + print(f" · {reason}") + metrics = r.get("metrics", {}) + if metrics: + metric_str = " ".join(f"{k}={v}" for k, v in metrics.items()) + print(f" {_DIM}{metric_str}{_RESET}") + else: + metrics = r.get("metrics", {}) + weight = metrics.get("position_weight", 0) + print(f" {_GREEN}✓ HEALTHY{_RESET} {symbol} weight={weight:.2%}") + return + if "command" in payload and "returncode" in payload: rc = payload.get("returncode", 0) stdout = payload.get("stdout", "") diff --git a/src/coinhunter/services/analyze_service.py b/src/coinhunter/services/analyze_service.py new file mode 100644 index 0000000..48456fd --- /dev/null +++ b/src/coinhunter/services/analyze_service.py @@ -0,0 +1,201 @@ +"""Detailed symbol analysis for AI consumption.""" + +from __future__ import annotations + +from statistics import mean +from typing import Any + +from .market_service import normalize_symbol +from .signal_service import score_portfolio_signal + + +def _clamp(value: float, low: float, high: float) -> float: + return max(low, min(value, high)) + + +def _safe_pct(new: float, old: float) -> float: + if old == 0: + return 0.0 + return (new - old) / old + + +def _rsi(closes: list[float], period: int = 14) -> float | None: + if len(closes) < period + 1: + return None + gains = [] + losses = [] + for i in range(1, period + 1): + delta = closes[-i] - closes[-i - 1] + if delta > 0: + gains.append(delta) + losses.append(0.0) + else: + gains.append(0.0) + losses.append(abs(delta)) + avg_gain = mean(gains) if gains else 0.0 + avg_loss = mean(losses) if losses else 0.0 + if avg_loss == 0: + return 100.0 + rs = avg_gain / avg_loss + return 100.0 - (100.0 / (1.0 + rs)) + + +def _analyze_timeframe(klines: list[list[Any]]) -> dict[str, Any]: + if not klines: + return {"trend": "unknown", "sma20": None, "rsi": None, "volatility_pct": None} + closes = [float(item[4]) for item in klines] + volumes = [float(item[5]) for item in klines] + current = closes[-1] + sma20 = mean(closes[-20:]) if len(closes) >= 20 else mean(closes) + trend = ( + "uptrend" + if current >= sma20 * 1.02 + else "downtrend" + if current <= sma20 * 0.98 + else "sideways" + ) + rsi_val = _rsi(closes) + if len(closes) >= 10 and current: + volatility = (max(closes[-10:]) - min(closes[-10:])) / current * 100 + else: + volatility = None + avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1] + volume_ratio = volumes[-1] / avg_volume if avg_volume else 1.0 + return { + "trend": trend, + "sma20": round(sma20, 4) if sma20 else None, + "rsi": round(rsi_val, 2) if rsi_val is not None else None, + "volatility_pct": round(volatility, 4) if volatility is not None else None, + "volume_ratio": round(volume_ratio, 4), + } + + +def _key_levels(klines: list[list[Any]]) -> dict[str, Any]: + if not klines: + return {"support": [], "resistance": [], "recent_high": None, "recent_low": None} + closes = [float(item[4]) for item in klines] + highs = [float(item[2]) for item in klines] + lows = [float(item[3]) for item in klines] + recent_high = max(highs[-20:]) if len(highs) >= 20 else max(highs) + recent_low = min(lows[-20:]) if len(lows) >= 20 else min(lows) + # Simple support/resistance: recent local extremes + support = sorted(set([round(min(lows[-10:]), 2), round(recent_low, 2)])) + resistance = sorted(set([round(max(highs[-10:]), 2), round(recent_high, 2)])) + return { + "support": support, + "resistance": resistance, + "recent_high": round(recent_high, 2), + "recent_low": round(recent_low, 2), + } + + +def _generate_alerts( + ticker: dict[str, Any], + tf_1h: dict[str, Any], + tf_4h: dict[str, Any], + tf_1d: dict[str, Any], + levels: dict[str, Any], + current_price: float, +) -> list[str]: + alerts: list[str] = [] + change_24h = float(ticker.get("price_change_pct", 0.0)) + if abs(change_24h) >= 10: + alerts.append(f"24h price change is extreme ({change_24h:+.2f}%)") + elif abs(change_24h) >= 5: + alerts.append(f"24h price change is significant ({change_24h:+.2f}%)") + + rsi_1h = tf_1h.get("rsi") + if rsi_1h is not None: + if rsi_1h >= 70: + alerts.append(f"1h RSI is overbought ({rsi_1h:.1f})") + elif rsi_1h <= 30: + alerts.append(f"1h RSI is oversold ({rsi_1h:.1f})") + + for level in levels.get("resistance", []): + if level > 0 and abs(current_price - level) / level < 0.02: + alerts.append(f"price is near resistance ({level:,.2f})") + for level in levels.get("support", []): + if level > 0 and abs(current_price - level) / level < 0.02: + alerts.append(f"price is near support ({level:,.2f})") + + if tf_1h.get("trend") != tf_4h.get("trend"): + alerts.append(f"timeframe divergence: 1h={tf_1h['trend']} vs 4h={tf_4h['trend']}") + + vol_ratio = tf_1h.get("volume_ratio", 1.0) + if vol_ratio >= 2.0: + alerts.append(f"volume spike detected ({vol_ratio:.2f}x average)") + + return alerts + + +def analyze_symbols( + config: dict[str, Any], + *, + spot_client: Any, + symbols: list[str], +) -> dict[str, Any]: + quote = str(config.get("market", {}).get("default_quote", "USDT")).upper() + analyses = [] + for raw_symbol in symbols: + symbol = normalize_symbol(raw_symbol) + # Fetch multi-timeframe klines + klines_1h = spot_client.klines(symbol=symbol, interval="1h", limit=72) + klines_4h = spot_client.klines(symbol=symbol, interval="4h", limit=42) + klines_1d = spot_client.klines(symbol=symbol, interval="1d", limit=30) + tickers = spot_client.ticker_stats([symbol], window="1d") + ticker = tickers[0] if tickers else {"priceChangePercent": "0", "lastPrice": "0", "quoteVolume": "0"} + + current_price = float(ticker.get("lastPrice") or ticker.get("last_price") or 0.0) + change_24h = float(ticker.get("priceChangePercent") or ticker.get("price_change_pct") or 0.0) + volume_24h = float(ticker.get("quoteVolume") or ticker.get("quote_volume") or 0.0) + + tf_1h = _analyze_timeframe(klines_1h) + tf_4h = _analyze_timeframe(klines_4h) + tf_1d = _analyze_timeframe(klines_1d) + levels = _key_levels(klines_1h) + alerts = _generate_alerts(ticker, tf_1h, tf_4h, tf_1d, levels, current_price) + + # Portfolio-style signal for context + closes_1h = [float(item[4]) for item in klines_1h] + volumes_1h = [float(item[5]) for item in klines_1h] + signal_score, signal_metrics = score_portfolio_signal( + closes_1h, + volumes_1h, + {"price_change_pct": change_24h}, + {"trend": 1.0, "momentum": 1.0, "breakout": 0.8, "volume": 0.7, "volatility_penalty": 0.5}, + ) + + # Build human-readable summary for AI + summary_parts = [ + f"{symbol} at {current_price:,.2f} ({change_24h:+.2f}% 24h).", + f"1h trend: {tf_1h['trend']}, 4h: {tf_4h['trend']}, 1d: {tf_1d['trend']}.", + ] + if tf_1h["rsi"] is not None: + summary_parts.append(f"1h RSI {tf_1h['rsi']:.1f}.") + if alerts: + summary_parts.append(f"Alerts: {'; '.join(alerts)}.") + else: + summary_parts.append("No significant alerts.") + + analyses.append({ + "symbol": symbol, + "summary": " ".join(summary_parts), + "price": { + "current": round(current_price, 4), + "change_24h_pct": round(change_24h, 4), + "high_24h": float(ticker.get("highPrice") or 0.0), + "low_24h": float(ticker.get("lowPrice") or 0.0), + "volume_24h": round(volume_24h, 4), + }, + "timeframes": { + "1h": tf_1h, + "4h": tf_4h, + "1d": tf_1d, + }, + "key_levels": levels, + "alerts": alerts, + "signal_score": round(signal_score, 4), + "signal_metrics": signal_metrics, + }) + + return {"analyses": analyses} diff --git a/src/coinhunter/services/backtest_service.py b/src/coinhunter/services/backtest_service.py deleted file mode 100644 index 6a73c50..0000000 --- a/src/coinhunter/services/backtest_service.py +++ /dev/null @@ -1,370 +0,0 @@ -"""Backtest engine for combined opportunity + portfolio strategy.""" - -from __future__ import annotations - -import json -from collections import defaultdict -from copy import deepcopy -from dataclasses import asdict, dataclass -from datetime import datetime, timezone -from pathlib import Path -from statistics import mean -from typing import Any - -from .market_service import normalize_symbol -from .signal_service import get_signal_interval, score_opportunity_signal, score_portfolio_signal -from .strategy_service import generate_signals_from_klines - - -@dataclass -class Position: - symbol: str - qty: float - entry_price: float - entry_time: str - notional_usdt: float - - -@dataclass -class Trade: - time: str - symbol: str - side: str - price: float - qty: float - notional: float - commission: float - reason: str - - -def _as_float(value: Any, default: float = 0.0) -> float: - try: - return float(value) - except (TypeError, ValueError): - return default - - -def _as_int(value: Any, default: int = 0) -> int: - try: - return int(value) - except (TypeError, ValueError): - return default - - -def _parse_dt(value: Any) -> datetime | None: - if not value: - return None - try: - return datetime.fromisoformat(str(value).replace("Z", "+00:00")).astimezone(timezone.utc) - except ValueError: - return None - - -def _iso_from_ms(value: int) -> str: - return ( - datetime.fromtimestamp(value / 1000, tz=timezone.utc) - .replace(microsecond=0) - .isoformat() - .replace("+00:00", "Z") - ) - - -def _close(row: list[Any]) -> float: - return _as_float(row[4]) - - -def _open_ms(row: list[Any]) -> int: - return int(row[0]) - - -def _ticker_from_window(symbol: str, rows: list[list[Any]]) -> dict[str, Any]: - first = _close(rows[0]) - last = _close(rows[-1]) - price_change_pct = ((last - first) / first * 100.0) if first else 0.0 - return { - "symbol": symbol, - "price_change_pct": price_change_pct, - "quote_volume": sum(_close(row) * _as_float(row[5]) for row in rows), - "high_price": max(_as_float(row[2]) for row in rows), - "low_price": min(_as_float(row[3]) for row in rows), - } - - -def _window_series(rows: list[list[Any]]) -> tuple[list[float], list[float]]: - return [_close(row) for row in rows], [_as_float(row[5]) for row in rows] - - -def _portfolio_value(cash: float, positions: list[Position], prices: dict[str, float]) -> float: - return cash + sum(p.qty * prices.get(p.symbol, p.entry_price) for p in positions) - - -def _pct(new: float, old: float) -> float: - if old == 0: - return 0.0 - return (new - old) / old - - -def run_backtest( - config: dict[str, Any], - *, - dataset_path: str, - initial_cash: float | None = None, - max_positions: int | None = None, - position_size_pct: float | None = None, - commission_pct: float | None = None, - lookback: int | None = None, - decision_interval_minutes: int | None = None, -) -> dict[str, Any]: - """Run a walk-forward backtest using historical kline datasets. - - Maintains virtual cash and positions. At each decision point: - 1. Sells positions where portfolio signals "exit" or "trim" - 2. Buys top opportunity "entry" signals within cash and position limits - """ - dataset_file = Path(dataset_path).expanduser() - dataset = json.loads(dataset_file.read_text(encoding="utf-8")) - metadata = dataset.get("metadata", {}) - plan = metadata.get("plan", {}) - klines = dataset.get("klines", {}) - - intervals = list(plan.get("intervals") or []) - configured_interval = get_signal_interval(config) - primary_interval = configured_interval if configured_interval in intervals else (intervals[0] if intervals else "1h") - - simulation_start = _parse_dt(plan.get("simulation_start")) - simulation_end = _parse_dt(plan.get("simulation_end")) - if simulation_start is None or simulation_end is None: - raise ValueError("dataset metadata must include plan.simulation_start and plan.simulation_end") - - opportunity_config = config.get("opportunity", {}) - portfolio_config = config.get("portfolio", {}) - - cash = _as_float(initial_cash, 10000.0) - max_pos = _as_int(max_positions, _as_int(portfolio_config.get("max_positions"), 5)) - size_pct = _as_float(position_size_pct, _as_float(opportunity_config.get("backtest_position_size_pct"), 0.2)) - commission = _as_float(commission_pct, _as_float(config.get("trading", {}).get("commission_pct"), 0.001)) - lookback_bars = lookback or _as_int(opportunity_config.get("evaluation_lookback"), 24) - - start_ms = int(simulation_start.timestamp() * 1000) - end_ms = int(simulation_end.timestamp() * 1000) - - rows_by_symbol: dict[str, list[list[Any]]] = {} - index_by_symbol: dict[str, dict[int, int]] = {} - for symbol, by_interval in klines.items(): - rows = by_interval.get(primary_interval, []) - normalized = normalize_symbol(symbol) - if rows: - rows_by_symbol[normalized] = rows - index_by_symbol[normalized] = {_open_ms(row): index for index, row in enumerate(rows)} - - decision_times = sorted( - { - _open_ms(row) - for rows in rows_by_symbol.values() - for row in rows - if start_ms <= _open_ms(row) < end_ms - } - ) - - interval_ms = _as_int(decision_interval_minutes, 0) * 60 * 1000 - if interval_ms > 0 and decision_times: - filtered: list[int] = [] - anchor = decision_times[0] - for t in decision_times: - if t - anchor >= interval_ms: - filtered.append(t) - anchor = t - decision_times = filtered - - positions: list[Position] = [] - trades: list[dict[str, Any]] = [] - equity_curve: list[dict[str, Any]] = [] - - skipped_warmup = 0 - skipped_missing_future = 0 - - for decision_time in decision_times: - current_prices: dict[str, float] = {} - klines_snapshot: dict[str, list[list[Any]]] = {} - - for symbol, rows in rows_by_symbol.items(): - index = index_by_symbol[symbol].get(decision_time) - if index is None: - continue - window = rows[max(0, index - lookback_bars + 1) : index + 1] - if len(window) < lookback_bars: - skipped_warmup += 1 - continue - future_rows = [row for row in rows[index + 1 :]] - if not future_rows: - skipped_missing_future += 1 - continue - klines_snapshot[symbol] = window - current_prices[symbol] = _close(window[-1]) - - # Build held positions for portfolio signal generation - held_positions = [ - { - "symbol": p.symbol, - "notional_usdt": p.qty * current_prices.get(p.symbol, p.entry_price), - } - for p in positions - if p.symbol in current_prices - ] - - signals = generate_signals_from_klines(config, klines_by_symbol=klines_snapshot, held_positions=held_positions) - - # Execute sells first to free cash - sell_symbols = {normalize_symbol(s["symbol"]) for s in signals.get("sell", [])} - new_positions: list[Position] = [] - for pos in positions: - if pos.symbol in sell_symbols and pos.symbol in current_prices: - price = current_prices[pos.symbol] - notional = pos.qty * price - comm = notional * commission - cash += notional - comm - trades.append( - asdict( - Trade( - time=_iso_from_ms(decision_time), - symbol=pos.symbol, - side="SELL", - price=round(price, 8), - qty=round(pos.qty, 8), - notional=round(notional, 4), - commission=round(comm, 4), - reason="portfolio signal: exit/trim", - ) - ) - ) - else: - new_positions.append(pos) - positions = new_positions - - # Execute buys with available cash - available_slots = max_pos - len(positions) - if available_slots > 0 and cash > 0: - for buy_signal in signals.get("buy", [])[:available_slots]: - symbol = normalize_symbol(buy_signal["symbol"]) - if symbol not in current_prices: - continue - # Skip if already held - if any(p.symbol == symbol for p in positions): - continue - price = current_prices[symbol] - allocation = cash * size_pct - if allocation <= 0: - continue - qty = allocation / price - comm = allocation * commission - actual_notional = allocation - comm - if actual_notional <= 0: - continue - cash -= allocation - positions.append( - Position( - symbol=symbol, - qty=round(qty, 8), - entry_price=round(price, 8), - entry_time=_iso_from_ms(decision_time), - notional_usdt=round(actual_notional, 4), - ) - ) - trades.append( - asdict( - Trade( - time=_iso_from_ms(decision_time), - symbol=symbol, - side="BUY", - price=round(price, 8), - qty=round(qty, 8), - notional=round(allocation, 4), - commission=round(comm, 4), - reason=buy_signal.get("reasons", ["opportunity entry"])[0], - ) - ) - ) - - # Record equity - equity = _portfolio_value(cash, positions, current_prices) - equity_curve.append( - { - "time": _iso_from_ms(decision_time), - "equity": round(equity, 4), - "cash": round(cash, 4), - "positions_count": len(positions), - } - ) - - # Final valuation - final_prices: dict[str, float] = {} - for symbol, rows in rows_by_symbol.items(): - if rows: - final_prices[symbol] = _close(rows[-1]) - - final_equity = _portfolio_value(cash, positions, final_prices) - if equity_curve: - equity_curve[-1]["equity"] = round(final_equity, 4) - - # Performance metrics - initial_equity = equity_curve[0]["equity"] if equity_curve else cash - total_return = _pct(final_equity, initial_equity) - - equity_values = [e["equity"] for e in equity_curve] - peak = initial_equity - max_drawdown = 0.0 - for val in equity_values: - if val > peak: - peak = val - dd = _pct(val, peak) - if dd < max_drawdown: - max_drawdown = dd - - buy_trades = [t for t in trades if t["side"] == "BUY"] - sell_trades = [t for t in trades if t["side"] == "SELL"] - - trade_returns: list[float] = [] - position_map: dict[str, dict[str, Any]] = {} - for t in buy_trades: - position_map[t["symbol"]] = t - for t in sell_trades: - buy_trade = position_map.get(t["symbol"]) - if buy_trade: - trade_return = _pct(t["notional"] - t["commission"], buy_trade["notional"] + buy_trade["commission"]) - trade_returns.append(trade_return) - - wins = sum(1 for r in trade_returns if r > 0) - losses = len(trade_returns) - wins - - return { - "summary": { - "initial_cash": round(initial_equity, 4), - "final_equity": round(final_equity, 4), - "total_return_pct": round(total_return * 100, 4), - "max_drawdown_pct": round(max_drawdown * 100, 4), - "buy_signals": len(buy_trades), - "sell_signals": len(sell_trades), - "completed_trades": len(trade_returns), - "win_rate": round(wins / len(trade_returns), 4) if trade_returns else 0.0, - "wins": wins, - "losses": losses, - "avg_trade_return_pct": round(mean(trade_returns) * 100, 4) if trade_returns else 0.0, - "open_positions": len(positions), - "decision_points": len(decision_times), - "skipped_warmup": skipped_warmup, - "skipped_missing_future": skipped_missing_future, - }, - "trades": trades, - "equity_curve": equity_curve, - "open_positions": [asdict(p) for p in positions], - "parameters": { - "dataset": str(dataset_file), - "interval": primary_interval, - "initial_cash": cash if not trades else initial_equity, - "max_positions": max_pos, - "position_size_pct": size_pct, - "commission_pct": commission, - "lookback_bars": lookback_bars, - "decision_interval_minutes": _as_int(decision_interval_minutes, 0), - }, - } diff --git a/src/coinhunter/services/opportunity_dataset_service.py b/src/coinhunter/services/opportunity_dataset_service.py deleted file mode 100644 index 0034e42..0000000 --- a/src/coinhunter/services/opportunity_dataset_service.py +++ /dev/null @@ -1,372 +0,0 @@ -"""Historical dataset collection for opportunity evaluation.""" - -from __future__ import annotations - -import json -import time -from collections.abc import Callable -from dataclasses import asdict, dataclass -from datetime import datetime, timedelta, timezone -from pathlib import Path -from typing import Any -from urllib.parse import parse_qs, urlencode, urlparse - -import requests -from requests.exceptions import RequestException - -from ..runtime import get_runtime_paths -from .market_service import normalize_symbol, normalize_symbols - -HttpGet = Callable[[str, dict[str, str], float], Any] -_PUBLIC_HTTP_ATTEMPTS = 5 - -_INTERVAL_SECONDS = { - "1m": 60, - "3m": 180, - "5m": 300, - "15m": 900, - "30m": 1800, - "1h": 3600, - "2h": 7200, - "4h": 14400, - "6h": 21600, - "8h": 28800, - "12h": 43200, - "1d": 86400, - "3d": 259200, - "1w": 604800, -} - - -@dataclass(frozen=True) -class DatasetPlan: - intervals: list[str] - kline_limit: int - reference_days: float - simulate_days: float - run_days: float - total_days: float - start: datetime - simulation_start: datetime - simulation_end: datetime - end: datetime - - -def _as_float(value: Any, default: float = 0.0) -> float: - try: - return float(value) - except (TypeError, ValueError): - return default - - -def _as_int(value: Any, default: int = 0) -> int: - try: - return int(value) - except (TypeError, ValueError): - return default - - -def _public_http_get(url: str, headers: dict[str, str], timeout: float) -> Any: - last_error: RequestException | None = None - for attempt in range(_PUBLIC_HTTP_ATTEMPTS): - try: - response = requests.get(url, headers=headers, timeout=timeout) - response.raise_for_status() - return response.json() - except RequestException as exc: - last_error = exc - if attempt < _PUBLIC_HTTP_ATTEMPTS - 1: - time.sleep(0.5 * (attempt + 1)) - if last_error is not None: - raise last_error - raise RuntimeError("public HTTP request failed") - - -def _public_http_status(url: str, headers: dict[str, str], timeout: float) -> tuple[int, str]: - last_error: RequestException | None = None - for attempt in range(_PUBLIC_HTTP_ATTEMPTS): - try: - response = requests.get(url, headers=headers, timeout=timeout) - return response.status_code, response.text - except RequestException as exc: - last_error = exc - if attempt < _PUBLIC_HTTP_ATTEMPTS - 1: - time.sleep(0.5 * (attempt + 1)) - if last_error is not None: - raise last_error - raise RuntimeError("public HTTP status request failed") - - -def _build_url(base_url: str, path: str, params: dict[str, str]) -> str: - return f"{base_url.rstrip('/')}{path}?{urlencode(params)}" - - -def _iso(dt: datetime) -> str: - return dt.astimezone(timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") - - -def _ms(dt: datetime) -> int: - return int(dt.timestamp() * 1000) - - -def _default_intervals(config: dict[str, Any]) -> list[str]: - configured = config.get("opportunity", {}).get("lookback_intervals", ["1h", "4h", "1d"]) - intervals = [str(item).strip() for item in configured if str(item).strip()] - return intervals or ["1h"] - - -def reference_days_for(config: dict[str, Any]) -> float: - opportunity_config = config.get("opportunity", {}) - intervals = _default_intervals(config) - kline_limit = _as_int(opportunity_config.get("kline_limit"), 48) - seconds = [(_INTERVAL_SECONDS.get(interval) or 0) * kline_limit for interval in intervals] - return round(max(seconds or [0]) / 86400, 4) - - -def build_dataset_plan( - config: dict[str, Any], - *, - simulate_days: float | None = None, - run_days: float | None = None, - now: datetime | None = None, -) -> DatasetPlan: - opportunity_config = config.get("opportunity", {}) - intervals = _default_intervals(config) - kline_limit = _as_int(opportunity_config.get("kline_limit"), 48) - reference_days = reference_days_for(config) - simulate = _as_float( - simulate_days if simulate_days is not None else opportunity_config.get("simulate_days"), - 7.0, - ) - run = _as_float(run_days if run_days is not None else opportunity_config.get("run_days"), 7.0) - end = (now or datetime.now(timezone.utc)).astimezone(timezone.utc).replace(microsecond=0) - total = reference_days + simulate + run - start = end - timedelta(days=total) - simulation_start = start + timedelta(days=reference_days) - simulation_end = simulation_start + timedelta(days=run) - return DatasetPlan( - intervals=intervals, - kline_limit=kline_limit, - reference_days=reference_days, - simulate_days=simulate, - run_days=run, - total_days=round(total, 4), - start=start, - simulation_start=simulation_start, - simulation_end=simulation_end, - end=end, - ) - - -def _binance_base_url(config: dict[str, Any]) -> str: - return str(config.get("binance", {}).get("spot_base_url", "https://api.binance.com")) - - -def _select_universe( - config: dict[str, Any], - *, - symbols: list[str] | None, - http_get: HttpGet, - timeout: float, -) -> list[str]: - if symbols: - return normalize_symbols(symbols) - - market_config = config.get("market", {}) - opportunity_config = config.get("opportunity", {}) - quote = str(market_config.get("default_quote", "USDT")).upper() - allowlist = set(normalize_symbols(market_config.get("universe_allowlist", []))) - denylist = set(normalize_symbols(market_config.get("universe_denylist", []))) - scan_limit = _as_int(opportunity_config.get("scan_limit"), 50) - min_quote_volume = _as_float(opportunity_config.get("min_quote_volume"), 0.0) - base_url = _binance_base_url(config) - headers = {"accept": "application/json", "user-agent": "coinhunter/2"} - - exchange_info = http_get(_build_url(base_url, "/api/v3/exchangeInfo", {}), headers, timeout) - status_map = {normalize_symbol(item["symbol"]): item.get("status", "") for item in exchange_info.get("symbols", [])} - rows = http_get(_build_url(base_url, "/api/v3/ticker/24hr", {}), headers, timeout) - - universe: list[tuple[str, float]] = [] - for ticker in rows if isinstance(rows, list) else []: - symbol = normalize_symbol(ticker.get("symbol", "")) - if not symbol.endswith(quote): - continue - if allowlist and symbol not in allowlist: - continue - if symbol in denylist: - continue - if status_map.get(symbol) != "TRADING": - continue - quote_volume = _as_float(ticker.get("quoteVolume")) - if quote_volume < min_quote_volume: - continue - universe.append((symbol, quote_volume)) - universe.sort(key=lambda item: item[1], reverse=True) - return [symbol for symbol, _ in universe[:scan_limit]] - - -def _fetch_klines( - config: dict[str, Any], - *, - symbol: str, - interval: str, - start: datetime, - end: datetime, - http_get: HttpGet, - timeout: float, -) -> list[list[Any]]: - base_url = _binance_base_url(config) - headers = {"accept": "application/json", "user-agent": "coinhunter/2"} - interval_ms = (_INTERVAL_SECONDS.get(interval) or 60) * 1000 - cursor = _ms(start) - end_ms = _ms(end) - rows: list[list[Any]] = [] - while cursor <= end_ms: - url = _build_url( - base_url, - "/api/v3/klines", - { - "symbol": symbol, - "interval": interval, - "startTime": str(cursor), - "endTime": str(end_ms), - "limit": "1000", - }, - ) - chunk = http_get(url, headers, timeout) - if not chunk: - break - rows.extend(chunk) - last_open = int(chunk[-1][0]) - next_cursor = last_open + interval_ms - if next_cursor <= cursor: - break - cursor = next_cursor - if len(chunk) < 1000: - break - return rows - - -def _probe_external_history( - config: dict[str, Any], - *, - plan: DatasetPlan, - timeout: float, - http_status: Callable[[str, dict[str, str], float], tuple[int, str]] | None = None, -) -> dict[str, Any]: - opportunity_config = config.get("opportunity", {}) - provider = str(opportunity_config.get("research_provider", "coingecko")).lower().strip() - if not bool(opportunity_config.get("auto_research", True)) or provider in {"", "off", "none", "disabled"}: - return {"provider": provider or "disabled", "status": "disabled"} - if provider != "coingecko": - return {"provider": provider, "status": "unsupported"} - - coingecko_config = config.get("coingecko", {}) - base_url = str(coingecko_config.get("base_url", "https://api.coingecko.com/api/v3")) - api_key = str(coingecko_config.get("api_key", "")).strip() - headers = {"accept": "application/json", "user-agent": "coinhunter/2"} - if api_key: - headers["x-cg-demo-api-key"] = api_key - sample_date = plan.simulation_start.strftime("%d-%m-%Y") - url = _build_url(base_url, "/coins/bitcoin/history", {"date": sample_date}) - http_status = http_status or _public_http_status - try: - status, body = http_status(url, headers, timeout) - except (TimeoutError, RequestException, OSError) as exc: - return {"provider": "coingecko", "status": "failed", "sample_date": sample_date, "error": str(exc)} - if status == 200: - return {"provider": "coingecko", "status": "available", "sample_date": sample_date} - lowered = body.lower() - if "allowed time range" in lowered or "365 days" in lowered: - result_status = "limited" - elif status == 429: - result_status = "rate_limited" - elif status in {401, 403}: - result_status = "unauthorized" - else: - result_status = "failed" - return { - "provider": "coingecko", - "status": result_status, - "sample_date": sample_date, - "http_status": status, - "message": body[:240], - } - - -def _default_output_path(plan: DatasetPlan) -> Path: - dataset_dir = get_runtime_paths().root / "datasets" - dataset_dir.mkdir(parents=True, exist_ok=True) - stamp = plan.end.strftime("%Y%m%dT%H%M%SZ") - return dataset_dir / f"opportunity_dataset_{stamp}.json" - - -def collect_opportunity_dataset( - config: dict[str, Any], - *, - symbols: list[str] | None = None, - simulate_days: float | None = None, - run_days: float | None = None, - output_path: str | None = None, - http_get: HttpGet | None = None, - http_status: Callable[[str, dict[str, str], float], tuple[int, str]] | None = None, - now: datetime | None = None, -) -> dict[str, Any]: - opportunity_config = config.get("opportunity", {}) - timeout = _as_float(opportunity_config.get("dataset_timeout_seconds"), 15.0) - plan = build_dataset_plan(config, simulate_days=simulate_days, run_days=run_days, now=now) - http_get = http_get or _public_http_get - selected_symbols = _select_universe(config, symbols=symbols, http_get=http_get, timeout=timeout) - klines: dict[str, dict[str, list[list[Any]]]] = {} - counts: dict[str, dict[str, int]] = {} - for symbol in selected_symbols: - klines[symbol] = {} - counts[symbol] = {} - for interval in plan.intervals: - rows = _fetch_klines( - config, - symbol=symbol, - interval=interval, - start=plan.start, - end=plan.end, - http_get=http_get, - timeout=timeout, - ) - klines[symbol][interval] = rows - counts[symbol][interval] = len(rows) - - external_history = _probe_external_history(config, plan=plan, timeout=timeout, http_status=http_status) - path = Path(output_path).expanduser() if output_path else _default_output_path(plan) - path.parent.mkdir(parents=True, exist_ok=True) - metadata = { - "created_at": _iso(datetime.now(timezone.utc)), - "quote": str(config.get("market", {}).get("default_quote", "USDT")).upper(), - "symbols": selected_symbols, - "plan": { - **{ - key: value - for key, value in asdict(plan).items() - if key not in {"start", "simulation_start", "simulation_end", "end"} - }, - "start": _iso(plan.start), - "simulation_start": _iso(plan.simulation_start), - "simulation_end": _iso(plan.simulation_end), - "end": _iso(plan.end), - }, - "external_history": external_history, - } - dataset = {"metadata": metadata, "klines": klines} - path.write_text(json.dumps(dataset, ensure_ascii=False, indent=2), encoding="utf-8") - return { - "path": str(path), - "symbols": selected_symbols, - "counts": counts, - "plan": metadata["plan"], - "external_history": external_history, - } - - -def parse_query(url: str) -> dict[str, str]: - """Test helper for fake HTTP clients.""" - parsed = urlparse(url) - return {key: values[-1] for key, values in parse_qs(parsed.query).items()} diff --git a/src/coinhunter/services/opportunity_evaluation_service.py b/src/coinhunter/services/opportunity_evaluation_service.py deleted file mode 100644 index 660ae54..0000000 --- a/src/coinhunter/services/opportunity_evaluation_service.py +++ /dev/null @@ -1,536 +0,0 @@ -"""Walk-forward evaluation for historical opportunity datasets.""" - -from __future__ import annotations - -import json -from collections import defaultdict -from copy import deepcopy -from datetime import datetime, timezone -from pathlib import Path -from statistics import mean -from typing import Any - -from .market_service import normalize_symbol -from .opportunity_service import _action_for_opportunity, _opportunity_thresholds -from .signal_service import ( - get_opportunity_model_weights, - get_signal_interval, - score_opportunity_signal, -) - -_OPTIMIZE_WEIGHT_KEYS = [ - "trend", - "compression", - "breakout_proximity", - "higher_lows", - "range_position", - "fresh_breakout", - "volume", - "momentum", - "setup", - "trigger", - "liquidity", - "volatility_penalty", - "extension_penalty", -] -_OPTIMIZE_MULTIPLIERS = [0.5, 0.75, 1.25, 1.5] - - -def _as_float(value: Any, default: float = 0.0) -> float: - try: - return float(value) - except (TypeError, ValueError): - return default - - -def _as_int(value: Any, default: int = 0) -> int: - try: - return int(value) - except (TypeError, ValueError): - return default - - -def _parse_dt(value: Any) -> datetime | None: - if not value: - return None - try: - return datetime.fromisoformat(str(value).replace("Z", "+00:00")).astimezone(timezone.utc) - except ValueError: - return None - - -def _iso_from_ms(value: int) -> str: - return datetime.fromtimestamp(value / 1000, tz=timezone.utc).replace(microsecond=0).isoformat().replace("+00:00", "Z") - - -def _close(row: list[Any]) -> float: - return _as_float(row[4]) - - -def _high(row: list[Any]) -> float: - return _as_float(row[2]) - - -def _low(row: list[Any]) -> float: - return _as_float(row[3]) - - -def _volume(row: list[Any]) -> float: - return _as_float(row[5]) - - -def _quote_volume(row: list[Any]) -> float: - if len(row) > 7: - return _as_float(row[7]) - return _close(row) * _volume(row) - - -def _open_ms(row: list[Any]) -> int: - return int(row[0]) - - -def _ticker_from_window(symbol: str, rows: list[list[Any]]) -> dict[str, Any]: - first = _close(rows[0]) - last = _close(rows[-1]) - price_change_pct = ((last - first) / first * 100.0) if first else 0.0 - return { - "symbol": symbol, - "price_change_pct": price_change_pct, - "quote_volume": sum(_quote_volume(row) for row in rows), - "high_price": max(_high(row) for row in rows), - "low_price": min(_low(row) for row in rows), - } - - -def _window_series(rows: list[list[Any]]) -> tuple[list[float], list[float]]: - return [_close(row) for row in rows], [_volume(row) for row in rows] - - -def _pct(new: float, old: float) -> float: - if old == 0: - return 0.0 - return (new - old) / old - - -def _path_stats(entry: float, future_rows: list[list[Any]], take_profit: float, stop_loss: float) -> dict[str, Any]: - if not future_rows: - return { - "event": "missing", - "exit_return": 0.0, - "final_return": 0.0, - "max_upside": 0.0, - "max_drawdown": 0.0, - "bars": 0, - } - - for row in future_rows: - high_return = _pct(_high(row), entry) - low_return = _pct(_low(row), entry) - if low_return <= -stop_loss: - return { - "event": "stop", - "exit_return": -stop_loss, - "final_return": _pct(_close(future_rows[-1]), entry), - "max_upside": max(_pct(_high(item), entry) for item in future_rows), - "max_drawdown": min(_pct(_low(item), entry) for item in future_rows), - "bars": len(future_rows), - } - if high_return >= take_profit: - return { - "event": "target", - "exit_return": take_profit, - "final_return": _pct(_close(future_rows[-1]), entry), - "max_upside": max(_pct(_high(item), entry) for item in future_rows), - "max_drawdown": min(_pct(_low(item), entry) for item in future_rows), - "bars": len(future_rows), - } - - return { - "event": "horizon", - "exit_return": _pct(_close(future_rows[-1]), entry), - "final_return": _pct(_close(future_rows[-1]), entry), - "max_upside": max(_pct(_high(item), entry) for item in future_rows), - "max_drawdown": min(_pct(_low(item), entry) for item in future_rows), - "bars": len(future_rows), - } - - -def _is_correct(action: str, trigger_path: dict[str, Any], setup_path: dict[str, Any]) -> bool: - if action == "entry": - return str(trigger_path["event"]) == "target" - if action == "watch": - return str(setup_path["event"]) == "target" - if action == "avoid": - return str(setup_path["event"]) != "target" - return False - - -def _round_float(value: Any, digits: int = 4) -> float: - return round(_as_float(value), digits) - - -def _finalize_bucket(bucket: dict[str, Any]) -> dict[str, Any]: - count = int(bucket["count"]) - correct = int(bucket["correct"]) - returns = bucket["forward_returns"] - trade_returns = bucket["trade_returns"] - return { - "count": count, - "correct": correct, - "incorrect": count - correct, - "accuracy": round(correct / count, 4) if count else 0.0, - "avg_forward_return": round(mean(returns), 4) if returns else 0.0, - "avg_trade_return": round(mean(trade_returns), 4) if trade_returns else 0.0, - } - - -def _bucket() -> dict[str, Any]: - return {"count": 0, "correct": 0, "forward_returns": [], "trade_returns": []} - - -def evaluate_opportunity_dataset( - config: dict[str, Any], - *, - dataset_path: str, - horizon_hours: float | None = None, - take_profit: float | None = None, - stop_loss: float | None = None, - setup_target: float | None = None, - lookback: int | None = None, - top_n: int | None = None, - max_examples: int = 20, -) -> dict[str, Any]: - """Evaluate opportunity actions using only point-in-time historical candles.""" - dataset_file = Path(dataset_path).expanduser() - dataset = json.loads(dataset_file.read_text(encoding="utf-8")) - metadata = dataset.get("metadata", {}) - plan = metadata.get("plan", {}) - klines = dataset.get("klines", {}) - opportunity_config = config.get("opportunity", {}) - - intervals = list(plan.get("intervals") or []) - configured_interval = get_signal_interval(config) - primary_interval = configured_interval if configured_interval in intervals else (intervals[0] if intervals else "1h") - simulation_start = _parse_dt(plan.get("simulation_start")) - simulation_end = _parse_dt(plan.get("simulation_end")) - if simulation_start is None or simulation_end is None: - raise ValueError("dataset metadata must include plan.simulation_start and plan.simulation_end") - - horizon = _as_float(horizon_hours, 0.0) - if horizon <= 0: - horizon = _as_float(plan.get("simulate_days"), 0.0) * 24.0 - if horizon <= 0: - horizon = _as_float(opportunity_config.get("evaluation_horizon_hours"), 24.0) - - take_profit_value = take_profit if take_profit is not None else _as_float(opportunity_config.get("evaluation_take_profit_pct"), 2.0) / 100.0 - stop_loss_value = stop_loss if stop_loss is not None else _as_float(opportunity_config.get("evaluation_stop_loss_pct"), 1.5) / 100.0 - setup_target_value = setup_target if setup_target is not None else _as_float(opportunity_config.get("evaluation_setup_target_pct"), 1.0) / 100.0 - lookback_bars = lookback or _as_int(opportunity_config.get("evaluation_lookback"), 24) - selected_top_n = top_n or _as_int(opportunity_config.get("top_n"), 10) - thresholds = _opportunity_thresholds(config) - horizon_ms = int(horizon * 60 * 60 * 1000) - start_ms = int(simulation_start.timestamp() * 1000) - end_ms = int(simulation_end.timestamp() * 1000) - - rows_by_symbol: dict[str, list[list[Any]]] = {} - index_by_symbol: dict[str, dict[int, int]] = {} - for symbol, by_interval in klines.items(): - rows = by_interval.get(primary_interval, []) - normalized = normalize_symbol(symbol) - if rows: - rows_by_symbol[normalized] = rows - index_by_symbol[normalized] = {_open_ms(row): index for index, row in enumerate(rows)} - - decision_times = sorted( - { - _open_ms(row) - for rows in rows_by_symbol.values() - for row in rows - if start_ms <= _open_ms(row) < end_ms - } - ) - - judgments: list[dict[str, Any]] = [] - skipped_missing_future = 0 - skipped_warmup = 0 - for decision_time in decision_times: - candidates: list[dict[str, Any]] = [] - for symbol, rows in rows_by_symbol.items(): - index = index_by_symbol[symbol].get(decision_time) - if index is None: - continue - window = rows[max(0, index - lookback_bars + 1) : index + 1] - if len(window) < lookback_bars: - skipped_warmup += 1 - continue - future_rows = [row for row in rows[index + 1 :] if _open_ms(row) <= decision_time + horizon_ms] - if not future_rows: - skipped_missing_future += 1 - continue - closes, volumes = _window_series(window) - ticker = _ticker_from_window(symbol, window) - opportunity_score, metrics = score_opportunity_signal(closes, volumes, ticker, opportunity_config) - score = opportunity_score - metrics["opportunity_score"] = round(opportunity_score, 4) - metrics["position_weight"] = 0.0 - metrics["research_score"] = 0.0 - action, reasons, _confidence = _action_for_opportunity(score, metrics, thresholds) - candidates.append( - { - "symbol": symbol, - "time": decision_time, - "action": action, - "score": round(score, 4), - "metrics": metrics, - "reasons": reasons, - "entry_price": _close(window[-1]), - "future_rows": future_rows, - } - ) - - for rank, candidate in enumerate(sorted(candidates, key=lambda item: item["score"], reverse=True)[:selected_top_n], start=1): - trigger_path = _path_stats(candidate["entry_price"], candidate["future_rows"], take_profit_value, stop_loss_value) - setup_path = _path_stats(candidate["entry_price"], candidate["future_rows"], setup_target_value, stop_loss_value) - correct = _is_correct(candidate["action"], trigger_path, setup_path) - judgments.append( - { - "time": _iso_from_ms(candidate["time"]), - "rank": rank, - "symbol": candidate["symbol"], - "action": candidate["action"], - "score": candidate["score"], - "correct": correct, - "entry_price": round(candidate["entry_price"], 8), - "forward_return": _round_float(trigger_path["final_return"]), - "max_upside": _round_float(trigger_path["max_upside"]), - "max_drawdown": _round_float(trigger_path["max_drawdown"]), - "trade_return": _round_float(trigger_path["exit_return"]) if candidate["action"] == "entry" else 0.0, - "trigger_event": trigger_path["event"], - "setup_event": setup_path["event"], - "metrics": candidate["metrics"], - "reason": candidate["reasons"][0] if candidate["reasons"] else "", - } - ) - - overall = _bucket() - by_action: dict[str, dict[str, Any]] = defaultdict(_bucket) - trigger_returns: list[float] = [] - for judgment in judgments: - action = judgment["action"] - for bucket in (overall, by_action[action]): - bucket["count"] += 1 - bucket["correct"] += 1 if judgment["correct"] else 0 - bucket["forward_returns"].append(judgment["forward_return"]) - if action == "entry": - bucket["trade_returns"].append(judgment["trade_return"]) - if action == "entry": - trigger_returns.append(judgment["trade_return"]) - - by_action_result = {action: _finalize_bucket(bucket) for action, bucket in sorted(by_action.items())} - incorrect_examples = [item for item in judgments if not item["correct"]][:max_examples] - examples = judgments[:max_examples] - trigger_count = by_action_result.get("entry", {}).get("count", 0) - trigger_correct = by_action_result.get("entry", {}).get("correct", 0) - return { - "summary": { - **_finalize_bucket(overall), - "decision_times": len(decision_times), - "symbols": sorted(rows_by_symbol), - "interval": primary_interval, - "top_n": selected_top_n, - "skipped_warmup": skipped_warmup, - "skipped_missing_future": skipped_missing_future, - }, - "by_action": by_action_result, - "trade_simulation": { - "trigger_trades": trigger_count, - "wins": trigger_correct, - "losses": trigger_count - trigger_correct, - "win_rate": round(trigger_correct / trigger_count, 4) if trigger_count else 0.0, - "avg_trade_return": round(mean(trigger_returns), 4) if trigger_returns else 0.0, - }, - "rules": { - "dataset": str(dataset_file), - "interval": primary_interval, - "horizon_hours": round(horizon, 4), - "lookback_bars": lookback_bars, - "take_profit": round(take_profit_value, 4), - "stop_loss": round(stop_loss_value, 4), - "setup_target": round(setup_target_value, 4), - "same_candle_policy": "stop_first", - "research_mode": "disabled: dataset has no point-in-time research snapshots", - }, - "examples": examples, - "incorrect_examples": incorrect_examples, - } - - -def _objective(result: dict[str, Any]) -> float: - summary = result.get("summary", {}) - by_action = result.get("by_action", {}) - trade = result.get("trade_simulation", {}) - count = _as_float(summary.get("count")) - trigger_trades = _as_float(trade.get("trigger_trades")) - trigger_rate = trigger_trades / count if count else 0.0 - avg_trade_return = _as_float(trade.get("avg_trade_return")) - bounded_trade_return = max(min(avg_trade_return, 0.03), -0.03) - trigger_coverage = min(trigger_rate / 0.08, 1.0) - return round( - 0.45 * _as_float(summary.get("accuracy")) - + 0.20 * _as_float(by_action.get("watch", {}).get("accuracy")) - + 0.25 * _as_float(trade.get("win_rate")) - + 6.0 * bounded_trade_return - + 0.05 * trigger_coverage, - 6, - ) - - -def _copy_config_with_weights(config: dict[str, Any], weights: dict[str, float]) -> dict[str, Any]: - candidate = deepcopy(config) - candidate.setdefault("opportunity", {})["model_weights"] = weights - return candidate - - -def _evaluation_snapshot(result: dict[str, Any], objective: float, weights: dict[str, float]) -> dict[str, Any]: - return { - "objective": objective, - "weights": {key: round(value, 4) for key, value in sorted(weights.items())}, - "summary": result.get("summary", {}), - "by_action": result.get("by_action", {}), - "trade_simulation": result.get("trade_simulation", {}), - } - - -def optimize_opportunity_model( - config: dict[str, Any], - *, - dataset_path: str, - horizon_hours: float | None = None, - take_profit: float | None = None, - stop_loss: float | None = None, - setup_target: float | None = None, - lookback: int | None = None, - top_n: int | None = None, - passes: int = 2, -) -> dict[str, Any]: - """Coordinate-search model weights against a walk-forward dataset. - - This intentionally optimizes model feature weights only. Entry/watch policy - thresholds remain fixed so the search improves signal construction instead - of fitting decision cutoffs to a sample. - """ - base_weights = get_opportunity_model_weights(config.get("opportunity", {})) - - def evaluate(weights: dict[str, float]) -> tuple[dict[str, Any], float]: - result = evaluate_opportunity_dataset( - _copy_config_with_weights(config, weights), - dataset_path=dataset_path, - horizon_hours=horizon_hours, - take_profit=take_profit, - stop_loss=stop_loss, - setup_target=setup_target, - lookback=lookback, - top_n=top_n, - max_examples=0, - ) - return result, _objective(result) - - baseline_result, baseline_objective = evaluate(base_weights) - best_weights = dict(base_weights) - best_result = baseline_result - best_objective = baseline_objective - evaluations = 1 - history: list[dict[str, Any]] = [ - { - "pass": 0, - "key": "baseline", - "multiplier": 1.0, - "objective": baseline_objective, - "accuracy": baseline_result["summary"]["accuracy"], - "trigger_win_rate": baseline_result["trade_simulation"]["win_rate"], - } - ] - - for pass_index in range(max(passes, 0)): - improved = False - for key in _OPTIMIZE_WEIGHT_KEYS: - current_value = best_weights.get(key, 0.0) - if current_value <= 0: - continue - local_best_weights = best_weights - local_best_result = best_result - local_best_objective = best_objective - local_best_multiplier = 1.0 - for multiplier in _OPTIMIZE_MULTIPLIERS: - candidate_weights = dict(best_weights) - candidate_weights[key] = round(max(current_value * multiplier, 0.01), 4) - candidate_result, candidate_objective = evaluate(candidate_weights) - evaluations += 1 - history.append( - { - "pass": pass_index + 1, - "key": key, - "multiplier": multiplier, - "objective": candidate_objective, - "accuracy": candidate_result["summary"]["accuracy"], - "trigger_win_rate": candidate_result["trade_simulation"]["win_rate"], - } - ) - if candidate_objective > local_best_objective: - local_best_weights = candidate_weights - local_best_result = candidate_result - local_best_objective = candidate_objective - local_best_multiplier = multiplier - if local_best_objective > best_objective: - best_weights = local_best_weights - best_result = local_best_result - best_objective = local_best_objective - improved = True - history.append( - { - "pass": pass_index + 1, - "key": key, - "multiplier": local_best_multiplier, - "objective": best_objective, - "accuracy": best_result["summary"]["accuracy"], - "trigger_win_rate": best_result["trade_simulation"]["win_rate"], - "selected": True, - } - ) - if not improved: - break - - recommended_config = { - f"opportunity.model_weights.{key}": round(value, 4) - for key, value in sorted(best_weights.items()) - } - return { - "baseline": _evaluation_snapshot(baseline_result, baseline_objective, base_weights), - "best": _evaluation_snapshot(best_result, best_objective, best_weights), - "improvement": { - "objective": round(best_objective - baseline_objective, 6), - "accuracy": round( - _as_float(best_result["summary"].get("accuracy")) - _as_float(baseline_result["summary"].get("accuracy")), - 4, - ), - "trigger_win_rate": round( - _as_float(best_result["trade_simulation"].get("win_rate")) - - _as_float(baseline_result["trade_simulation"].get("win_rate")), - 4, - ), - "avg_trade_return": round( - _as_float(best_result["trade_simulation"].get("avg_trade_return")) - - _as_float(baseline_result["trade_simulation"].get("avg_trade_return")), - 4, - ), - }, - "recommended_config": recommended_config, - "search": { - "passes": passes, - "evaluations": evaluations, - "optimized": "model_weights_only", - "thresholds": "fixed", - "objective": "0.45*accuracy + 0.20*setup_accuracy + 0.25*trigger_win_rate + 6*avg_trade_return + 0.05*trigger_coverage", - }, - "history": history[-20:], - } diff --git a/src/coinhunter/services/opportunity_service.py b/src/coinhunter/services/opportunity_service.py index 2cf1db0..9793777 100644 --- a/src/coinhunter/services/opportunity_service.py +++ b/src/coinhunter/services/opportunity_service.py @@ -3,13 +3,11 @@ from __future__ import annotations from dataclasses import asdict, dataclass -from statistics import mean from typing import Any from ..audit import audit_event from .account_service import get_positions from .market_service import base_asset, get_scan_universe, normalize_symbol -from .research_service import get_external_research from .signal_service import get_signal_interval, score_opportunity_signal @@ -38,42 +36,10 @@ def _clamp(value: float, low: float, high: float) -> float: return min(max(value, low), high) -def _as_float(value: Any, default: float = 0.0) -> float: - try: - return float(value) - except (TypeError, ValueError): - return default - - def _series_from_klines(klines: list[list[Any]]) -> tuple[list[float], list[float]]: return [float(item[4]) for item in klines], [float(item[5]) for item in klines] -def _normalized_research_score(value: Any) -> float: - """Normalize provider research inputs to 0..1. - - Provider values can be expressed as either 0..1 or 0..100. - """ - score = _as_float(value) - if score > 1.0: - score = score / 100.0 - return _clamp(score, 0.0, 1.0) - - -def _research_signals(research: dict[str, Any] | None) -> dict[str, float]: - research = research or {} - return { - "fundamental": _normalized_research_score(research.get("fundamental")), - "tokenomics": _normalized_research_score(research.get("tokenomics")), - "catalyst": _normalized_research_score(research.get("catalyst")), - "adoption": _normalized_research_score(research.get("adoption")), - "smart_money": _normalized_research_score(research.get("smart_money")), - "unlock_risk": _normalized_research_score(research.get("unlock_risk")), - "regulatory_risk": _normalized_research_score(research.get("regulatory_risk")), - "research_confidence": _normalized_research_score(research.get("research_confidence")), - } - - def _confidence_from_edge(edge_score: float) -> int: return int(_clamp((edge_score + 1.0) / 2.0, 0.0, 1.0) * 100) @@ -120,37 +86,6 @@ def _action_for_opportunity(score: float, metrics: dict[str, float], thresholds: return "avoid", reasons, confidence -def _add_research_metrics(metrics: dict[str, float], research: dict[str, Any] | None) -> None: - research_signals = _research_signals(research) - for key, value in research_signals.items(): - metrics[key] = round(value, 4) - metrics["quality"] = round( - mean( - [ - research_signals["fundamental"], - research_signals["tokenomics"], - research_signals["catalyst"], - research_signals["adoption"], - research_signals["smart_money"], - ] - ), - 4, - ) - - -def _research_score(research: dict[str, Any] | None, weights: dict[str, float]) -> float: - signals = _research_signals(research) - return ( - weights.get("fundamental", 0.8) * signals["fundamental"] - + weights.get("tokenomics", 0.7) * signals["tokenomics"] - + weights.get("catalyst", 0.5) * signals["catalyst"] - + weights.get("adoption", 0.4) * signals["adoption"] - + weights.get("smart_money", 0.3) * signals["smart_money"] - - weights.get("unlock_penalty", 0.8) * signals["unlock_risk"] - - weights.get("regulatory_penalty", 0.4) * signals["regulatory_risk"] - ) - - def scan_opportunities( config: dict[str, Any], *, @@ -158,7 +93,6 @@ def scan_opportunities( symbols: list[str] | None = None, ) -> dict[str, Any]: opportunity_config = config.get("opportunity", {}) - weights = opportunity_config.get("weights", {}) ignore_dust = bool(opportunity_config.get("ignore_dust", True)) interval = get_signal_interval(config) thresholds = _opportunity_thresholds(config) @@ -170,11 +104,6 @@ def scan_opportunities( total_held = sum(concentration_map.values()) or 1.0 universe = get_scan_universe(config, spot_client=spot_client, symbols=symbols)[:scan_limit] - external_research = get_external_research( - config, - symbols=[normalize_symbol(ticker["symbol"]) for ticker in universe], - quote=quote, - ) recommendations = [] for ticker in universe: symbol = normalize_symbol(ticker["symbol"]) @@ -185,11 +114,6 @@ def scan_opportunities( score = opportunity_score - thresholds["overlap_penalty"] * concentration metrics["opportunity_score"] = round(opportunity_score, 4) metrics["position_weight"] = round(concentration, 4) - research = external_research.get(symbol, {}) - research_score = _research_score(research, weights) - score += research_score - metrics["research_score"] = round(research_score, 4) - _add_research_metrics(metrics, research) action, reasons, confidence = _action_for_opportunity(score, metrics, thresholds) if symbol.endswith(quote): reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters") diff --git a/src/coinhunter/services/portfolio_service.py b/src/coinhunter/services/portfolio_service.py index 141b710..4e0acf7 100644 --- a/src/coinhunter/services/portfolio_service.py +++ b/src/coinhunter/services/portfolio_service.py @@ -24,6 +24,14 @@ class PortfolioRecommendation: metrics: dict[str, float] +@dataclass +class WatchResult: + symbol: str + status: str + reasons: list[str] + metrics: dict[str, float] + + def _portfolio_thresholds(config: dict[str, Any]) -> dict[str, float]: portfolio_config = config.get("portfolio", {}) return { @@ -111,3 +119,98 @@ def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str, }, ) return payload + + +def watch_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str, Any]: + """Lightweight portfolio monitoring. Returns NEED_REVIEW or HEALTHY for each position. + + Zero-token-cost rule-based screening. AI should only deep-analyze NEED_REVIEW items. + """ + quote = str(config.get("market", {}).get("default_quote", "USDT")).upper() + watch_config = config.get("watch", {}) + alert_drawdown_1h_pct = float(watch_config.get("alert_drawdown_1h_pct", -5.0)) + alert_drawdown_24h_pct = float(watch_config.get("alert_drawdown_24h_pct", -10.0)) + alert_spike_1h_pct = float(watch_config.get("alert_spike_1h_pct", 8.0)) + max_position_weight = float(watch_config.get("max_position_weight", 0.5)) + exit_threshold = float(config.get("portfolio", {}).get("exit_threshold", -0.2)) + signal_weights = get_signal_weights(config) + interval = get_signal_interval(config) + + positions = get_positions(config, spot_client=spot_client)["positions"] + positions = [item for item in positions if item["symbol"] != quote] + total_notional = sum(item["notional_usdt"] for item in positions) or 1.0 + + watch_results = [] + need_review_count = 0 + for position in positions: + symbol = normalize_symbol(position["symbol"]) + klines = spot_client.klines(symbol=symbol, interval=interval, limit=24) + closes = [float(item[4]) for item in klines] + volumes = [float(item[5]) for item in klines] + tickers = spot_client.ticker_stats([symbol], window="1d") + ticker = tickers[0] if tickers else {"priceChangePercent": "0"} + price_change_24h = float(ticker.get("priceChangePercent") or 0.0) + concentration = position["notional_usdt"] / total_notional + + score, metrics = score_portfolio_signal( + closes, + volumes, + {"price_change_pct": price_change_24h}, + signal_weights, + ) + + reasons: list[str] = [] + # Rule 1: 1h price crash + if len(closes) >= 2: + price_change_1h = (closes[-1] - closes[-2]) / closes[-2] * 100 if closes[-2] != 0 else 0.0 + if price_change_1h <= alert_drawdown_1h_pct: + reasons.append(f"1h drop {price_change_1h:.2f}% (alert threshold {alert_drawdown_1h_pct:.1f}%)") + if price_change_1h >= alert_spike_1h_pct: + reasons.append(f"1h spike +{price_change_1h:.2f}% (alert threshold {alert_spike_1h_pct:.1f}%)") + + # Rule 2: 24h price crash + if price_change_24h <= alert_drawdown_24h_pct: + reasons.append(f"24h drop {price_change_24h:.2f}% (alert threshold {alert_drawdown_24h_pct:.1f}%)") + + # Rule 3: Concentration risk + if concentration >= max_position_weight: + reasons.append(f"position weight {concentration:.1%} exceeds max {max_position_weight:.1%}") + + # Rule 4: Technical deterioration + if score <= exit_threshold: + reasons.append(f"technical score {score:.2f} below exit threshold {exit_threshold:.2f}") + + if reasons: + status = "need_review" + need_review_count += 1 + else: + status = "healthy" + + watch_results.append( + asdict( + WatchResult( + symbol=symbol, + status=status, + reasons=reasons, + metrics={ + "position_weight": round(concentration, 4), + "signal_score": round(score, 4), + "price_change_24h_pct": round(price_change_24h, 4), + "volatility": metrics.get("volatility", 0.0), + "trend": metrics.get("trend", 0.0), + }, + ) + ) + ) + + healthy_count = len(watch_results) - need_review_count + summary = f"{need_review_count} position(s) need review, {healthy_count} healthy" + if need_review_count == 0: + summary = "All positions healthy — no action needed" + + return { + "watch_results": watch_results, + "summary": summary, + "need_review_count": need_review_count, + "healthy_count": healthy_count, + } diff --git a/src/coinhunter/services/research_service.py b/src/coinhunter/services/research_service.py deleted file mode 100644 index 3d66bea..0000000 --- a/src/coinhunter/services/research_service.py +++ /dev/null @@ -1,227 +0,0 @@ -"""External research signal providers for opportunity scoring.""" - -from __future__ import annotations - -import time -from collections.abc import Callable -from math import log10 -from typing import Any -from urllib.parse import urlencode - -import requests -from requests.exceptions import RequestException - -from .market_service import base_asset, normalize_symbol - -HttpGet = Callable[[str, dict[str, str], float], Any] -_PUBLIC_HTTP_ATTEMPTS = 5 - - -def _clamp(value: float, low: float = 0.0, high: float = 1.0) -> float: - return min(max(value, low), high) - - -def _as_float(value: Any, default: float = 0.0) -> float: - try: - return float(value) - except (TypeError, ValueError): - return default - - -def _safe_ratio(numerator: float, denominator: float) -> float: - if denominator <= 0: - return 0.0 - return numerator / denominator - - -def _log_score(value: float, *, floor: float, span: float) -> float: - if value <= 0: - return 0.0 - return _clamp((log10(value) - floor) / span) - - -def _pct_score(value: float, *, low: float, high: float) -> float: - if high <= low: - return 0.0 - return _clamp((value - low) / (high - low)) - - -def _public_http_get(url: str, headers: dict[str, str], timeout: float) -> Any: - last_error: RequestException | None = None - for attempt in range(_PUBLIC_HTTP_ATTEMPTS): - try: - response = requests.get(url, headers=headers, timeout=timeout) - response.raise_for_status() - return response.json() - except RequestException as exc: - last_error = exc - if attempt < _PUBLIC_HTTP_ATTEMPTS - 1: - time.sleep(0.5 * (attempt + 1)) - if last_error is not None: - raise last_error - raise RuntimeError("public HTTP request failed") - - -def _build_url(base_url: str, path: str, params: dict[str, str]) -> str: - return f"{base_url.rstrip('/')}{path}?{urlencode(params)}" - - -def _chunked(items: list[str], size: int) -> list[list[str]]: - return [items[index : index + size] for index in range(0, len(items), size)] - - -def _coingecko_market_to_signals(row: dict[str, Any], *, is_trending: bool = False) -> dict[str, float]: - market_cap = _as_float(row.get("market_cap")) - fdv = _as_float(row.get("fully_diluted_valuation")) - volume = _as_float(row.get("total_volume")) - rank = _as_float(row.get("market_cap_rank"), 9999.0) - circulating = _as_float(row.get("circulating_supply")) - total_supply = _as_float(row.get("total_supply")) - max_supply = _as_float(row.get("max_supply")) - supply_cap = max_supply or total_supply - - rank_score = _clamp(1.0 - (log10(max(rank, 1.0)) / 4.0)) - size_score = _log_score(market_cap, floor=7.0, span=5.0) - volume_to_mcap = _safe_ratio(volume, market_cap) - liquidity_quality = _clamp(volume_to_mcap / 0.10) - - fdv_ratio = _safe_ratio(fdv, market_cap) if fdv and market_cap else 1.0 - fdv_dilution_risk = _clamp((fdv_ratio - 1.0) / 4.0) - supply_unlocked = _clamp(_safe_ratio(circulating, supply_cap)) if supply_cap else max(0.0, 1.0 - fdv_dilution_risk) - supply_dilution_risk = 1.0 - supply_unlocked - unlock_risk = max(fdv_dilution_risk, supply_dilution_risk * 0.8) - - pct_7d = _as_float(row.get("price_change_percentage_7d_in_currency")) - pct_30d = _as_float(row.get("price_change_percentage_30d_in_currency")) - pct_200d = _as_float(row.get("price_change_percentage_200d_in_currency")) - medium_momentum = _pct_score(pct_30d, low=-15.0, high=60.0) - long_momentum = _pct_score(pct_200d, low=-40.0, high=150.0) - trend_catalyst = _pct_score(pct_7d, low=-5.0, high=25.0) - trend_bonus = 1.0 if is_trending else 0.0 - - tokenomics = _clamp(0.65 * supply_unlocked + 0.35 * (1.0 - fdv_dilution_risk)) - fundamental = _clamp(0.40 * rank_score + 0.35 * size_score + 0.25 * liquidity_quality) - catalyst = _clamp(0.45 * trend_catalyst + 0.40 * medium_momentum + 0.15 * trend_bonus) - adoption = _clamp(0.45 * rank_score + 0.35 * liquidity_quality + 0.20 * long_momentum) - smart_money = _clamp(0.35 * rank_score + 0.35 * liquidity_quality + 0.30 * (1.0 - unlock_risk)) - regulatory_risk = 0.10 if rank <= 100 else 0.20 if rank <= 500 else 0.35 - - populated_fields = sum( - 1 - for value in (market_cap, fdv, volume, rank, circulating, supply_cap, pct_7d, pct_30d, pct_200d) - if value - ) - confidence = _clamp(populated_fields / 9.0) - - return { - "fundamental": round(fundamental, 4), - "tokenomics": round(tokenomics, 4), - "catalyst": round(catalyst, 4), - "adoption": round(adoption, 4), - "smart_money": round(smart_money, 4), - "unlock_risk": round(unlock_risk, 4), - "regulatory_risk": round(regulatory_risk, 4), - "research_confidence": round(confidence, 4), - } - - -def _coingecko_headers(config: dict[str, Any]) -> dict[str, str]: - coingecko_config = config.get("coingecko", {}) - headers = {"accept": "application/json", "user-agent": "coinhunter/2"} - api_key = str(coingecko_config.get("api_key", "")).strip() - if api_key: - headers["x-cg-demo-api-key"] = api_key - return headers - - -def _fetch_coingecko_research( - config: dict[str, Any], - *, - symbols: list[str], - quote: str, - http_get: HttpGet | None = None, -) -> dict[str, dict[str, float]]: - if not symbols: - return {} - - opportunity_config = config.get("opportunity", {}) - coingecko_config = config.get("coingecko", {}) - base_url = str(coingecko_config.get("base_url", "https://api.coingecko.com/api/v3")) - timeout = _as_float(opportunity_config.get("research_timeout_seconds"), 4.0) - headers = _coingecko_headers(config) - http_get = http_get or _public_http_get - - base_to_symbol = { - base_asset(normalize_symbol(symbol), quote).lower(): normalize_symbol(symbol) - for symbol in symbols - if normalize_symbol(symbol) - } - bases = sorted(base_to_symbol) - if not bases: - return {} - - trending_ids: set[str] = set() - try: - trending_url = _build_url(base_url, "/search/trending", {}) - trending_payload = http_get(trending_url, headers, timeout) - for item in trending_payload.get("coins", []): - coin = item.get("item", {}) - coin_id = str(coin.get("id", "")).strip() - if coin_id: - trending_ids.add(coin_id) - except Exception: - trending_ids = set() - - research: dict[str, dict[str, float]] = {} - for chunk in _chunked(bases, 50): - params = { - "vs_currency": "usd", - "symbols": ",".join(chunk), - "include_tokens": "top", - "order": "market_cap_desc", - "per_page": "250", - "page": "1", - "sparkline": "false", - "price_change_percentage": "7d,30d,200d", - } - try: - markets_url = _build_url(base_url, "/coins/markets", params) - rows = http_get(markets_url, headers, timeout) - except Exception: - continue - - seen_bases: set[str] = set() - for row in rows if isinstance(rows, list) else []: - symbol = str(row.get("symbol", "")).lower() - if symbol in seen_bases or symbol not in base_to_symbol: - continue - seen_bases.add(symbol) - normalized = base_to_symbol[symbol] - research[normalized] = _coingecko_market_to_signals( - row, - is_trending=str(row.get("id", "")) in trending_ids, - ) - return research - - -def get_external_research( - config: dict[str, Any], - *, - symbols: list[str], - quote: str, - http_get: HttpGet | None = None, -) -> dict[str, dict[str, float]]: - """Fetch automated research signals for symbols. - - Returns an empty map when disabled or when the configured provider is unavailable. - Opportunity scans should continue rather than fail because a research endpoint timed out. - """ - opportunity_config = config.get("opportunity", {}) - if not bool(opportunity_config.get("auto_research", True)): - return {} - provider = str(opportunity_config.get("research_provider", "coingecko")).strip().lower() - if provider in {"", "off", "none", "disabled"}: - return {} - if provider != "coingecko": - return {} - return _fetch_coingecko_research(config, symbols=symbols, quote=quote, http_get=http_get) diff --git a/src/coinhunter/services/strategy_service.py b/src/coinhunter/services/strategy_service.py deleted file mode 100644 index 7db73c5..0000000 --- a/src/coinhunter/services/strategy_service.py +++ /dev/null @@ -1,339 +0,0 @@ -"""Unified strategy combining opportunity scanning and portfolio management.""" - -from __future__ import annotations - -from dataclasses import asdict, dataclass -from typing import Any - -from .market_service import normalize_symbol -from .opportunity_service import ( - _action_for_opportunity, - _opportunity_thresholds, - scan_opportunities, -) -from .portfolio_service import ( - _action_for_position, - _portfolio_thresholds, - analyze_portfolio, -) -from .signal_service import score_opportunity_signal, score_portfolio_signal - - -@dataclass -class TradeSignal: - symbol: str - action: str - side: str - score: float - reasons: list[str] - opportunity_metrics: dict[str, float] - portfolio_metrics: dict[str, float] - - -def _held_symbols(positions: list[dict[str, Any]]) -> set[str]: - return {normalize_symbol(p["symbol"]) for p in positions} - - -def generate_trade_signals( - config: dict[str, Any], - *, - spot_client: Any, - symbols: list[str] | None = None, -) -> dict[str, Any]: - """Combine opportunity and portfolio signals into unified buy/sell/hold recommendations. - - Buy criteria: - - Opportunity action is "entry" - - Not already held OR portfolio allows "add" - - Position concentration below max weight - - Sell criteria: - - Position exists and portfolio action is "exit" or "trim" - - Hold criteria: - - Position exists and portfolio action is "hold" - """ - portfolio_config = config.get("portfolio", {}) - max_position_weight = float(portfolio_config.get("max_position_weight", 0.6)) - - opp_result = scan_opportunities(config, spot_client=spot_client, symbols=symbols) - pf_result = analyze_portfolio(config, spot_client=spot_client) - - held = {normalize_symbol(p["symbol"]): p for p in pf_result.get("recommendations", [])} - total_notional = sum(p.get("metrics", {}).get("position_weight", 0) for p in held.values()) or 1.0 - - buys: list[dict[str, Any]] = [] - sells: list[dict[str, Any]] = [] - holds: list[dict[str, Any]] = [] - - for rec in opp_result.get("recommendations", []): - symbol = normalize_symbol(rec["symbol"]) - opp_action = rec["action"] - score = rec["score"] - reasons = list(rec.get("reasons", [])) - opp_metrics = dict(rec.get("metrics", {})) - - pf_rec = held.get(symbol) - pf_action = pf_rec["action"] if pf_rec else "none" - pf_metrics = dict(pf_rec.get("metrics", {})) if pf_rec else {} - concentration = pf_metrics.get("position_weight", 0.0) - - if opp_action == "entry" and (symbol not in held or pf_action in ("add", "hold")): - if concentration < max_position_weight: - reasons.append(f"portfolio: {pf_action or 'not held'} -> buy") - buys.append( - asdict( - TradeSignal( - symbol=symbol, - action="buy", - side="BUY", - score=round(score, 4), - reasons=reasons, - opportunity_metrics=opp_metrics, - portfolio_metrics=pf_metrics, - ) - ) - ) - else: - reasons.append(f"portfolio: position weight {concentration:.2%} at max -> skip") - holds.append( - asdict( - TradeSignal( - symbol=symbol, - action="hold", - side="HOLD", - score=round(score, 4), - reasons=reasons, - opportunity_metrics=opp_metrics, - portfolio_metrics=pf_metrics, - ) - ) - ) - - for symbol, pf_rec in held.items(): - pf_action = pf_rec["action"] - score = pf_rec["score"] - reasons = list(pf_rec.get("reasons", [])) - pf_metrics = dict(pf_rec.get("metrics", {})) - - opp_rec = next((r for r in opp_result.get("recommendations", []) if normalize_symbol(r["symbol"]) == symbol), None) - opp_metrics = dict(opp_rec.get("metrics", {})) if opp_rec else {} - - if pf_action in ("exit", "trim"): - reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> sell") - sells.append( - asdict( - TradeSignal( - symbol=symbol, - action="sell", - side="SELL", - score=round(score, 4), - reasons=reasons, - opportunity_metrics=opp_metrics, - portfolio_metrics=pf_metrics, - ) - ) - ) - elif pf_action == "hold": - reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> hold") - holds.append( - asdict( - TradeSignal( - symbol=symbol, - action="hold", - side="HOLD", - score=round(score, 4), - reasons=reasons, - opportunity_metrics=opp_metrics, - portfolio_metrics=pf_metrics, - ) - ) - ) - elif pf_action == "add": - # Already handled in buy loop if opp is entry; otherwise treat as hold - if not any(normalize_symbol(b["symbol"]) == symbol for b in buys): - reasons.append("opportunity: no entry signal -> hold") - holds.append( - asdict( - TradeSignal( - symbol=symbol, - action="hold", - side="HOLD", - score=round(score, 4), - reasons=reasons, - opportunity_metrics=opp_metrics, - portfolio_metrics=pf_metrics, - ) - ) - ) - - return { - "buy": sorted(buys, key=lambda item: item["score"], reverse=True), - "sell": sorted(sells, key=lambda item: item["score"]), - "hold": sorted(holds, key=lambda item: item["score"], reverse=True), - } - - -def _series_from_klines(klines: list[list[Any]]) -> tuple[list[float], list[float]]: - return [float(item[4]) for item in klines], [float(item[5]) for item in klines] - - -def generate_signals_from_klines( - config: dict[str, Any], - *, - klines_by_symbol: dict[str, list[list[Any]]], - held_positions: list[dict[str, Any]], -) -> dict[str, Any]: - """Pure version of signal generation that works on in-memory klines. - - Used by backtest to avoid network calls. - """ - opportunity_config = config.get("opportunity", {}) - portfolio_config = config.get("portfolio", {}) - thresholds = _opportunity_thresholds(config) - pf_thresholds = _portfolio_thresholds(config) - max_position_weight = pf_thresholds["max_position_weight"] - weights = opportunity_config.get("weights", {}) - signal_weights = config.get("signal", {}) - top_n = int(opportunity_config.get("top_n", 10)) - - held = {normalize_symbol(p["symbol"]): p for p in held_positions} - total_notional = sum(p.get("notional_usdt", 0) for p in held_positions) or 1.0 - - opp_candidates: list[dict[str, Any]] = [] - for symbol, rows in klines_by_symbol.items(): - if len(rows) < 6: - continue - closes, volumes = _series_from_klines(rows) - ticker = { - "symbol": symbol, - "price_change_pct": ((closes[-1] - closes[0]) / closes[0] * 100) if closes[0] else 0.0, - "quote_volume": sum(c * v for c, v in zip(closes, volumes)), - } - opportunity_score, metrics = score_opportunity_signal(closes, volumes, ticker, opportunity_config) - score = opportunity_score - metrics["opportunity_score"] = round(opportunity_score, 4) - action, reasons, _confidence = _action_for_opportunity(score, metrics, thresholds) - opp_candidates.append({ - "symbol": symbol, - "action": action, - "score": round(score, 4), - "metrics": metrics, - "reasons": reasons, - }) - - pf_results: dict[str, dict[str, Any]] = {} - for symbol, position in held.items(): - rows = klines_by_symbol.get(symbol, []) - if len(rows) < 2: - continue - closes, volumes = _series_from_klines(rows) - ticker = {"price_change_pct": ((closes[-1] - closes[0]) / closes[0] * 100) if closes[0] else 0.0} - concentration = position.get("notional_usdt", 0) / total_notional - score, metrics = score_portfolio_signal(closes, volumes, ticker, signal_weights) - pf_action, pf_reasons = _action_for_position(score, concentration, pf_thresholds) - metrics["position_weight"] = round(concentration, 4) - pf_results[symbol] = { - "symbol": symbol, - "action": pf_action, - "score": round(score, 4), - "reasons": pf_reasons, - "metrics": metrics, - "notional_usdt": position.get("notional_usdt", 0), - } - - buys: list[dict[str, Any]] = [] - sells: list[dict[str, Any]] = [] - holds: list[dict[str, Any]] = [] - - for rec in sorted(opp_candidates, key=lambda item: item["score"], reverse=True)[:top_n]: - symbol = normalize_symbol(rec["symbol"]) - opp_action = rec["action"] - score = rec["score"] - reasons = list(rec.get("reasons", [])) - opp_metrics = dict(rec.get("metrics", {})) - - pf_rec = pf_results.get(symbol) - pf_action = pf_rec["action"] if pf_rec else "none" - pf_metrics = dict(pf_rec.get("metrics", {})) if pf_rec else {} - concentration = pf_metrics.get("position_weight", 0.0) - - if opp_action == "entry" and (symbol not in held or pf_action in ("add", "hold")): - if concentration < max_position_weight: - reasons.append(f"portfolio: {pf_action or 'not held'} -> buy") - buys.append( - asdict( - TradeSignal( - symbol=symbol, - action="buy", - side="BUY", - score=round(score, 4), - reasons=reasons, - opportunity_metrics=opp_metrics, - portfolio_metrics=pf_metrics, - ) - ) - ) - else: - reasons.append(f"portfolio: position weight {concentration:.2%} at max -> skip") - - for symbol, pf_rec in pf_results.items(): - pf_action = pf_rec["action"] - score = pf_rec["score"] - reasons = list(pf_rec.get("reasons", [])) - pf_metrics = dict(pf_rec.get("metrics", {})) - opp_rec = next((r for r in opp_candidates if normalize_symbol(r["symbol"]) == symbol), None) - opp_metrics = dict(opp_rec.get("metrics", {})) if opp_rec else {} - - if pf_action in ("exit", "trim"): - reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> sell") - sells.append( - asdict( - TradeSignal( - symbol=symbol, - action="sell", - side="SELL", - score=round(score, 4), - reasons=reasons, - opportunity_metrics=opp_metrics, - portfolio_metrics=pf_metrics, - ) - ) - ) - elif pf_action == "hold": - reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> hold") - holds.append( - asdict( - TradeSignal( - symbol=symbol, - action="hold", - side="HOLD", - score=round(score, 4), - reasons=reasons, - opportunity_metrics=opp_metrics, - portfolio_metrics=pf_metrics, - ) - ) - ) - elif pf_action == "add": - if not any(normalize_symbol(b["symbol"]) == symbol for b in buys): - reasons.append("opportunity: no entry signal -> hold") - holds.append( - asdict( - TradeSignal( - symbol=symbol, - action="hold", - side="HOLD", - score=round(score, 4), - reasons=reasons, - opportunity_metrics=opp_metrics, - portfolio_metrics=pf_metrics, - ) - ) - ) - - return { - "buy": sorted(buys, key=lambda item: item["score"], reverse=True), - "sell": sorted(sells, key=lambda item: item["score"]), - "hold": sorted(holds, key=lambda item: item["score"], reverse=True), - } diff --git a/tests/test_backtest_service.py b/tests/test_backtest_service.py deleted file mode 100644 index 1975be9..0000000 --- a/tests/test_backtest_service.py +++ /dev/null @@ -1,129 +0,0 @@ -"""Tests for backtest_service.""" - -from __future__ import annotations - -import json -import tempfile -import unittest -from pathlib import Path -from typing import Any - -from coinhunter.services import backtest_service - - -class BacktestServiceTestCase(unittest.TestCase): - def _klines(self, closes: list[float], start_ms: int = 0, volumes: list[float] | None = None) -> list[list[float]]: - volumes = volumes or [1.0] * len(closes) - return [ - [start_ms + i * 3600000, c * 0.98, c * 1.02, c * 0.97, c, v, 0.0, c * v, 100, 0.0, 0.0, 0.0] - for i, (c, v) in enumerate(zip(closes, volumes)) - ] - - def _config(self) -> dict[str, Any]: - return { - "opportunity": { - "entry_threshold": 1.5, - "watch_threshold": 0.6, - "min_trigger_score": 0.45, - "min_setup_score": 0.35, - "overlap_penalty": 0.6, - "top_n": 10, - "scan_limit": 50, - "kline_limit": 48, - "weights": {}, - "model_weights": {}, - }, - "portfolio": { - "add_threshold": 1.5, - "hold_threshold": 0.6, - "trim_threshold": 0.2, - "exit_threshold": -0.2, - "max_position_weight": 0.6, - "max_positions": 5, - }, - "signal": { - "lookback_interval": "1h", - }, - "market": { - "default_quote": "USDT", - }, - "trading": { - "commission_pct": 0.001, - }, - } - - def _make_dataset(self, closes_by_symbol: dict[str, list[float]], start_iso: str = "2025-12-28T00:00:00Z", sim_start_iso: str = "2025-12-30T00:00:00Z", sim_end_iso: str = "2026-01-01T00:00:00Z") -> Path: - from datetime import datetime, timezone - start_ms = int(datetime.fromisoformat(start_iso.replace("Z", "+00:00")).timestamp() * 1000) - klines: dict[str, dict[str, list[list[float]]]] = {} - for symbol, closes in closes_by_symbol.items(): - klines[symbol] = {"1h": self._klines(closes, start_ms=start_ms)} - dataset = { - "metadata": { - "created_at": "2026-01-01T00:00:00Z", - "quote": "USDT", - "symbols": list(closes_by_symbol.keys()), - "plan": { - "intervals": ["1h"], - "kline_limit": 48, - "reference_days": 2.0, - "simulate_days": 1.0, - "run_days": 1.0, - "total_days": 4.0, - "start": start_iso, - "simulation_start": sim_start_iso, - "simulation_end": sim_end_iso, - "end": sim_end_iso, - }, - "external_history": {"provider": "disabled", "status": "disabled"}, - }, - "klines": klines, - } - fp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) - json.dump(dataset, fp) - fp.close() - return Path(fp.name) - - def test_run_backtest_produces_summary(self) -> None: - config = self._config() - closes = list(range(20, 92)) - path = self._make_dataset({"BTCUSDT": closes}) - try: - result = backtest_service.run_backtest(config, dataset_path=str(path), initial_cash=10000.0) - self.assertIn("summary", result) - self.assertIn("trades", result) - self.assertIn("equity_curve", result) - self.assertIn("parameters", result) - summary = result["summary"] - self.assertIn("initial_cash", summary) - self.assertIn("final_equity", summary) - self.assertIn("total_return_pct", summary) - self.assertIn("max_drawdown_pct", summary) - self.assertIn("win_rate", summary) - finally: - path.unlink() - - def test_run_backtest_missing_simulation_dates_raises(self) -> None: - config = self._config() - path = self._make_dataset({"BTCUSDT": list(range(20, 92))}, sim_start_iso="", sim_end_iso="") - try: - with self.assertRaises(ValueError): - backtest_service.run_backtest(config, dataset_path=str(path)) - finally: - path.unlink() - - def test_run_backtest_tracks_equity_curve(self) -> None: - config = self._config() - # Need ~72 candles to cover 2025-12-28 through 2026-01-01 (warmup + simulation) - closes = list(range(20, 92)) - path = self._make_dataset({"BTCUSDT": closes}) - try: - result = backtest_service.run_backtest(config, dataset_path=str(path), initial_cash=10000.0) - self.assertTrue(len(result["equity_curve"]) > 0) - first = result["equity_curve"][0] - self.assertIn("time", first) - self.assertIn("equity", first) - self.assertIn("cash", first) - self.assertIn("positions_count", first) - finally: - path.unlink() diff --git a/tests/test_cli.py b/tests/test_cli.py index 93eb106..7a3cd2b 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -10,7 +10,7 @@ from coinhunter import cli class CLITestCase(unittest.TestCase): - def test_help_includes_v2_commands(self): + def test_help_includes_core_commands(self): parser = cli.build_parser() help_text = parser.format_help() self.assertIn("init", help_text) @@ -18,7 +18,9 @@ class CLITestCase(unittest.TestCase): self.assertIn("buy", help_text) self.assertIn("sell", help_text) self.assertIn("portfolio", help_text) - self.assertIn("opportunity", help_text) + self.assertIn("scan", help_text) + self.assertIn("analyze", help_text) + self.assertIn("watch", help_text) self.assertIn("--doc", help_text) def test_init_dispatches(self): @@ -150,11 +152,11 @@ class CLITestCase(unittest.TestCase): self.assertEqual(result, 0) self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT") - def test_opportunity_dispatches(self): + def test_scan_dispatches(self): captured = {} with ( patch.object( - cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}} + cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 5}} ), patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}), patch.object(cli, "SpotBinanceClient"), @@ -167,10 +169,52 @@ class CLITestCase(unittest.TestCase): cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) ), ): - result = cli.main(["opportunity", "-s", "BTCUSDT", "ETHUSDT"]) + result = cli.main(["scan", "-s", "BTCUSDT", "ETHUSDT"]) self.assertEqual(result, 0) self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT") + def test_analyze_dispatches(self): + captured = {} + with ( + patch.object( + cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}} + ), + patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}), + patch.object(cli, "SpotBinanceClient"), + patch.object( + cli.analyze_service, + "analyze_symbols", + return_value={"analyses": [{"symbol": "BTCUSDT", "summary": "test"}]}, + ), + patch.object( + cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) + ), + ): + result = cli.main(["analyze", "BTCUSDT", "ETHUSDT"]) + self.assertEqual(result, 0) + self.assertEqual(captured["payload"]["analyses"][0]["symbol"], "BTCUSDT") + + def test_watch_dispatches(self): + captured = {} + with ( + patch.object( + cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "watch": {}} + ), + patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}), + patch.object(cli, "SpotBinanceClient"), + patch.object( + cli.portfolio_service, + "watch_portfolio", + return_value={"watch_results": [{"symbol": "BTCUSDT", "status": "healthy"}], "summary": "1 healthy", "need_review_count": 0, "healthy_count": 1}, + ), + patch.object( + cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) + ), + ): + result = cli.main(["watch"]) + self.assertEqual(result, 0) + self.assertEqual(captured["payload"]["watch_results"][0]["symbol"], "BTCUSDT") + def test_catlog_dispatches(self): captured = {} with ( @@ -248,215 +292,3 @@ class CLITestCase(unittest.TestCase): content = __import__("pathlib").Path(tmp_path).read_text() self.assertIn("BINANCE_API_SECRET=test_secret_value", content) __import__("os").unlink(tmp_path) - - def test_opportunity_dataset_dispatches_without_private_client(self): - captured = {} - config = {"market": {"default_quote": "USDT"}, "opportunity": {}} - with ( - patch.object(cli, "load_config", return_value=config), - patch.object(cli, "_load_spot_client", side_effect=AssertionError("dataset should use public data")), - patch.object( - cli.opportunity_dataset_service, - "collect_opportunity_dataset", - return_value={"path": "/tmp/dataset.json", "symbols": ["BTCUSDT"]}, - ) as collect_mock, - patch.object( - cli, - "print_output", - side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}), - ), - ): - result = cli.main( - ["opportunity", "dataset", "--symbols", "BTCUSDT", "--simulate-days", "3", "--run-days", "7", "--agent"] - ) - - self.assertEqual(result, 0) - self.assertEqual(captured["payload"]["path"], "/tmp/dataset.json") - self.assertTrue(captured["agent"]) - collect_mock.assert_called_once_with( - config, - symbols=["BTCUSDT"], - simulate_days=3.0, - run_days=7.0, - output_path=None, - ) - - def test_opportunity_evaluate_dispatches_without_private_client(self): - captured = {} - config = {"market": {"default_quote": "USDT"}, "opportunity": {}} - with ( - patch.object(cli, "load_config", return_value=config), - patch.object(cli, "_load_spot_client", side_effect=AssertionError("evaluate should use dataset only")), - patch.object( - cli.opportunity_evaluation_service, - "evaluate_opportunity_dataset", - return_value={"summary": {"count": 1, "correct": 1}}, - ) as evaluate_mock, - patch.object( - cli, - "print_output", - side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}), - ), - ): - result = cli.main( - [ - "opportunity", - "evaluate", - "/tmp/dataset.json", - "--horizon-hours", - "6", - "--take-profit-pct", - "2", - "--stop-loss-pct", - "1.5", - "--setup-target-pct", - "1", - "--lookback", - "24", - "--top-n", - "3", - "--examples", - "5", - "--agent", - ] - ) - - self.assertEqual(result, 0) - self.assertEqual(captured["payload"]["summary"]["correct"], 1) - self.assertTrue(captured["agent"]) - evaluate_mock.assert_called_once_with( - config, - dataset_path="/tmp/dataset.json", - horizon_hours=6.0, - take_profit=0.02, - stop_loss=0.015, - setup_target=0.01, - lookback=24, - top_n=3, - max_examples=5, - ) - - def test_strategy_dispatches(self): - captured = {} - with ( - patch.object( - cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}} - ), - patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}), - patch.object(cli, "SpotBinanceClient"), - patch.object( - cli.strategy_service, - "generate_trade_signals", - return_value={"buy": [{"symbol": "BTCUSDT", "score": 0.82}], "sell": [], "hold": []}, - ), - patch.object( - cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) - ), - ): - result = cli.main(["strategy", "-s", "BTCUSDT"]) - self.assertEqual(result, 0) - self.assertEqual(captured["payload"]["buy"][0]["symbol"], "BTCUSDT") - - def test_backtest_dispatches_without_private_client(self): - captured = {} - config = {"market": {"default_quote": "USDT"}, "opportunity": {}} - with ( - patch.object(cli, "load_config", return_value=config), - patch.object(cli, "_load_spot_client", side_effect=AssertionError("backtest should use dataset only")), - patch.object( - cli.backtest_service, - "run_backtest", - return_value={"summary": {"total_return_pct": 5.0, "win_rate": 0.6}, "trades": []}, - ) as backtest_mock, - patch.object( - cli, - "print_output", - side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}), - ), - ): - result = cli.main( - [ - "backtest", - "/tmp/dataset.json", - "--initial-cash", - "5000", - "--max-positions", - "3", - "--position-size-pct", - "20", - "--commission-pct", - "0.1", - "--lookback", - "12", - "--agent", - ] - ) - - self.assertEqual(result, 0) - self.assertEqual(captured["payload"]["summary"]["total_return_pct"], 5.0) - self.assertTrue(captured["agent"]) - backtest_mock.assert_called_once_with( - config, - dataset_path="/tmp/dataset.json", - initial_cash=5000.0, - max_positions=3, - position_size_pct=0.2, - commission_pct=0.001, - lookback=12, - decision_interval_minutes=None, - ) - - def test_opportunity_optimize_dispatches_without_private_client(self): - captured = {} - config = {"market": {"default_quote": "USDT"}, "opportunity": {}} - with ( - patch.object(cli, "load_config", return_value=config), - patch.object(cli, "_load_spot_client", side_effect=AssertionError("optimize should use dataset only")), - patch.object( - cli.opportunity_evaluation_service, - "optimize_opportunity_model", - return_value={"best": {"summary": {"accuracy": 0.7}}}, - ) as optimize_mock, - patch.object( - cli, - "print_output", - side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}), - ), - ): - result = cli.main( - [ - "opportunity", - "optimize", - "/tmp/dataset.json", - "--horizon-hours", - "6", - "--take-profit-pct", - "2", - "--stop-loss-pct", - "1.5", - "--setup-target-pct", - "1", - "--lookback", - "24", - "--top-n", - "3", - "--passes", - "1", - "--agent", - ] - ) - - self.assertEqual(result, 0) - self.assertEqual(captured["payload"]["best"]["summary"]["accuracy"], 0.7) - self.assertTrue(captured["agent"]) - optimize_mock.assert_called_once_with( - config, - dataset_path="/tmp/dataset.json", - horizon_hours=6.0, - take_profit=0.02, - stop_loss=0.015, - setup_target=0.01, - lookback=24, - top_n=3, - passes=1, - ) diff --git a/tests/test_opportunity_dataset_service.py b/tests/test_opportunity_dataset_service.py deleted file mode 100644 index 3012b22..0000000 --- a/tests/test_opportunity_dataset_service.py +++ /dev/null @@ -1,280 +0,0 @@ -"""Opportunity dataset collection tests.""" - -from __future__ import annotations - -import json -import tempfile -import unittest -from datetime import datetime, timezone -from pathlib import Path - -from coinhunter.services import ( - opportunity_dataset_service, - opportunity_evaluation_service, -) - - -class OpportunityDatasetServiceTestCase(unittest.TestCase): - def test_default_plan_uses_widest_scan_reference_window(self): - config = {"opportunity": {"lookback_intervals": ["1h", "4h", "1d"]}} - plan = opportunity_dataset_service.build_dataset_plan( - config, - now=datetime(2026, 4, 21, tzinfo=timezone.utc), - ) - - self.assertEqual(plan.kline_limit, 48) - self.assertEqual(plan.reference_days, 48.0) - self.assertEqual(plan.simulate_days, 7.0) - self.assertEqual(plan.run_days, 7.0) - self.assertEqual(plan.total_days, 62.0) - - def test_collect_dataset_writes_klines_and_probe_metadata(self): - config = { - "binance": {"spot_base_url": "https://api.binance.test"}, - "market": {"default_quote": "USDT"}, - "opportunity": { - "lookback_intervals": ["1d"], - "kline_limit": 2, - "simulate_days": 1, - "run_days": 1, - "auto_research": True, - "research_provider": "coingecko", - }, - } - - def fake_http_get(url, headers, timeout): - query = opportunity_dataset_service.parse_query(url) - interval_seconds = 86400 - start = int(query["startTime"]) - end = int(query["endTime"]) - rows = [] - cursor = start - index = 0 - while cursor <= end: - close = 100 + index - rows.append([cursor, close - 1, close + 1, close - 2, close, 10, cursor + interval_seconds * 1000 - 1, close * 10]) - cursor += interval_seconds * 1000 - index += 1 - return rows - - def fake_http_status(url, headers, timeout): - return 200, "{}" - - with tempfile.TemporaryDirectory() as tmpdir: - output = Path(tmpdir) / "dataset.json" - payload = opportunity_dataset_service.collect_opportunity_dataset( - config, - symbols=["BTCUSDT"], - output_path=str(output), - http_get=fake_http_get, - http_status=fake_http_status, - now=datetime(2026, 4, 21, tzinfo=timezone.utc), - ) - dataset = json.loads(output.read_text(encoding="utf-8")) - - self.assertEqual(payload["plan"]["reference_days"], 2.0) - self.assertEqual(payload["plan"]["total_days"], 4.0) - self.assertEqual(payload["external_history"]["status"], "available") - self.assertEqual(payload["counts"]["BTCUSDT"]["1d"], 5) - self.assertEqual(len(dataset["klines"]["BTCUSDT"]["1d"]), 5) - - -class OpportunityEvaluationServiceTestCase(unittest.TestCase): - def _rows(self, closes): - start = int(datetime(2026, 4, 20, tzinfo=timezone.utc).timestamp() * 1000) - rows = [] - for index, close in enumerate(closes): - open_time = start + index * 60 * 60 * 1000 - rows.append( - [ - open_time, - close * 0.995, - close * 1.01, - close * 0.995, - close, - 100 + index * 10, - open_time + 60 * 60 * 1000 - 1, - close * (100 + index * 10), - ] - ) - return rows - - def test_evaluate_dataset_counts_walk_forward_accuracy(self): - good = [ - 100, - 105, - 98, - 106, - 99, - 107, - 100, - 106, - 101, - 105, - 102, - 104, - 102.5, - 103, - 102.8, - 103.2, - 103.0, - 103.4, - 103.1, - 103.6, - 103.3, - 103.8, - 104.2, - 106, - 108.5, - 109, - ] - weak = [ - 100, - 99, - 98, - 97, - 96, - 95, - 94, - 93, - 92, - 91, - 90, - 89, - 88, - 87, - 86, - 85, - 84, - 83, - 82, - 81, - 80, - 79, - 78, - 77, - 76, - 75, - ] - good_rows = self._rows(good) - weak_rows = self._rows(weak) - simulation_start = datetime.fromtimestamp(good_rows[23][0] / 1000, tz=timezone.utc) - simulation_end = datetime.fromtimestamp(good_rows[24][0] / 1000, tz=timezone.utc) - dataset = { - "metadata": { - "symbols": ["GOODUSDT", "WEAKUSDT"], - "plan": { - "intervals": ["1h"], - "simulate_days": 1 / 12, - "simulation_start": simulation_start.isoformat().replace("+00:00", "Z"), - "simulation_end": simulation_end.isoformat().replace("+00:00", "Z"), - }, - }, - "klines": { - "GOODUSDT": {"1h": good_rows}, - "WEAKUSDT": {"1h": weak_rows}, - }, - } - config = { - "signal": {"lookback_interval": "1h"}, - "opportunity": { - "top_n": 2, - "min_quote_volume": 0.0, - "entry_threshold": 1.5, - "watch_threshold": 0.6, - "min_trigger_score": 0.45, - "min_setup_score": 0.35, - }, - } - - with tempfile.TemporaryDirectory() as tmpdir: - path = Path(tmpdir) / "dataset.json" - path.write_text(json.dumps(dataset), encoding="utf-8") - result = opportunity_evaluation_service.evaluate_opportunity_dataset( - config, - dataset_path=str(path), - take_profit=0.02, - stop_loss=0.015, - setup_target=0.01, - max_examples=2, - ) - - self.assertEqual(result["summary"]["count"], 2) - self.assertEqual(result["summary"]["correct"], 2) - self.assertEqual(result["summary"]["accuracy"], 1.0) - self.assertEqual(result["by_action"]["entry"]["correct"], 1) - self.assertEqual(result["trade_simulation"]["wins"], 1) - - def test_optimize_model_reports_recommended_weights(self): - rows = self._rows( - [ - 100, - 105, - 98, - 106, - 99, - 107, - 100, - 106, - 101, - 105, - 102, - 104, - 102.5, - 103, - 102.8, - 103.2, - 103.0, - 103.4, - 103.1, - 103.6, - 103.3, - 103.8, - 104.2, - 106, - 108.5, - 109, - ] - ) - simulation_start = datetime.fromtimestamp(rows[23][0] / 1000, tz=timezone.utc) - simulation_end = datetime.fromtimestamp(rows[24][0] / 1000, tz=timezone.utc) - dataset = { - "metadata": { - "symbols": ["GOODUSDT"], - "plan": { - "intervals": ["1h"], - "simulate_days": 1 / 12, - "simulation_start": simulation_start.isoformat().replace("+00:00", "Z"), - "simulation_end": simulation_end.isoformat().replace("+00:00", "Z"), - }, - }, - "klines": {"GOODUSDT": {"1h": rows}}, - } - config = { - "signal": {"lookback_interval": "1h"}, - "opportunity": { - "top_n": 1, - "min_quote_volume": 0.0, - "entry_threshold": 1.5, - "watch_threshold": 0.6, - "min_trigger_score": 0.45, - "min_setup_score": 0.35, - }, - } - - with tempfile.TemporaryDirectory() as tmpdir: - path = Path(tmpdir) / "dataset.json" - path.write_text(json.dumps(dataset), encoding="utf-8") - result = opportunity_evaluation_service.optimize_opportunity_model( - config, - dataset_path=str(path), - passes=1, - take_profit=0.02, - stop_loss=0.015, - setup_target=0.01, - ) - - self.assertIn("baseline", result) - self.assertIn("best", result) - self.assertIn("opportunity.model_weights.trigger", result["recommended_config"]) - self.assertEqual(result["search"]["optimized"], "model_weights_only") diff --git a/tests/test_opportunity_evaluation_service.py b/tests/test_opportunity_evaluation_service.py deleted file mode 100644 index c75113b..0000000 --- a/tests/test_opportunity_evaluation_service.py +++ /dev/null @@ -1,90 +0,0 @@ -"""Opportunity historical evaluation tests.""" - -from __future__ import annotations - -import json -import tempfile -import unittest -from pathlib import Path - -from coinhunter.services import opportunity_evaluation_service - - -def _rows(start_ms: int, closes: list[float]) -> list[list[float]]: - rows = [] - for index, close in enumerate(closes): - open_time = start_ms + index * 3_600_000 - volume = 1_000 + index * 10 - rows.append( - [ - float(open_time), - close * 0.99, - close * 1.02, - close * 0.98, - close, - float(volume), - float(open_time + 3_599_999), - close * volume, - ] - ) - return rows - - -class OpportunityEvaluationServiceTestCase(unittest.TestCase): - def test_evaluate_opportunity_dataset_scores_historical_samples(self): - start_ms = 1_767_225_600_000 - dataset = { - "metadata": { - "plan": { - "intervals": ["1h"], - "simulation_start": "2026-01-01T04:00:00Z", - "simulation_end": "2026-01-01T07:00:00Z", - "simulate_days": 1, - } - }, - "klines": { - "GOODUSDT": {"1h": _rows(start_ms, [100, 101, 102, 103, 104, 106, 108, 109, 110])}, - "BADUSDT": {"1h": _rows(start_ms, [100, 99, 98, 97, 96, 95, 94, 93, 92])}, - }, - } - config = { - "market": {"default_quote": "USDT"}, - "opportunity": { - "entry_threshold": 1.5, - "watch_threshold": 0.6, - "evaluation_horizon_hours": 2.0, - "evaluation_take_profit_pct": 1.0, - "evaluation_stop_loss_pct": 2.0, - "evaluation_setup_target_pct": 0.5, - "evaluation_lookback": 4, - "top_n": 2, - }, - } - - with tempfile.TemporaryDirectory() as tmp_dir: - dataset_path = Path(tmp_dir) / "opportunity-dataset.json" - dataset_path.write_text(json.dumps(dataset), encoding="utf-8") - - payload = opportunity_evaluation_service.evaluate_opportunity_dataset( - config, - dataset_path=str(dataset_path), - horizon_hours=2.0, - take_profit=0.01, - stop_loss=0.02, - setup_target=0.005, - lookback=4, - top_n=2, - max_examples=3, - ) - - self.assertEqual(payload["summary"]["symbols"], ["BADUSDT", "GOODUSDT"]) - self.assertEqual(payload["summary"]["interval"], "1h") - self.assertGreater(payload["summary"]["count"], 0) - self.assertIn("by_action", payload) - self.assertIn("trade_simulation", payload) - self.assertEqual(payload["rules"]["research_mode"], "disabled: dataset has no point-in-time research snapshots") - self.assertLessEqual(len(payload["examples"]), 3) - - -if __name__ == "__main__": - unittest.main() diff --git a/tests/test_opportunity_service.py b/tests/test_opportunity_service.py index fb31473..eab9815 100644 --- a/tests/test_opportunity_service.py +++ b/tests/test_opportunity_service.py @@ -8,7 +8,6 @@ from unittest.mock import patch from coinhunter.services import ( opportunity_service, portfolio_service, - research_service, signal_service, ) @@ -258,37 +257,6 @@ class OpportunityServiceTestCase(unittest.TestCase): "entry_threshold": 1.5, "watch_threshold": 0.6, "overlap_penalty": 0.6, - "auto_research": False, - "research_provider": "coingecko", - "research_timeout_seconds": 4.0, - "risk_limits": { - "min_liquidity": 0.0, - "max_overextension": 0.08, - "max_downside_risk": 0.3, - "max_unlock_risk": 0.75, - "max_regulatory_risk": 0.75, - "min_quality_for_add": 0.0, - }, - "weights": { - "trend": 1.0, - "momentum": 1.0, - "breakout": 0.8, - "pullback": 0.4, - "volume": 0.7, - "liquidity": 0.3, - "trend_alignment": 0.8, - "fundamental": 0.8, - "tokenomics": 0.7, - "catalyst": 0.5, - "adoption": 0.4, - "smart_money": 0.3, - "volatility_penalty": 0.5, - "overextension_penalty": 0.7, - "downside_penalty": 0.5, - "unlock_penalty": 0.8, - "regulatory_penalty": 0.4, - "position_concentration_penalty": 0.6, - }, }, "portfolio": { "add_threshold": 1.5, @@ -351,40 +319,6 @@ class OpportunityServiceTestCase(unittest.TestCase): self.assertEqual(score, 0.0) self.assertEqual(metrics["trend"], 0.0) - def test_scan_uses_automatic_external_research(self): - config = self.config | { - "opportunity": self.config["opportunity"] - | { - "auto_research": True, - "top_n": 2, - } - } - with ( - patch.object(opportunity_service, "audit_event", return_value=None), - patch.object( - opportunity_service, - "get_external_research", - return_value={ - "SOLUSDT": { - "fundamental": 0.9, - "tokenomics": 0.8, - "catalyst": 0.9, - "adoption": 0.8, - "smart_money": 0.7, - "unlock_risk": 0.1, - "regulatory_risk": 0.1, - "research_confidence": 0.9, - } - }, - ) as research_mock, - ): - payload = opportunity_service.scan_opportunities(config, spot_client=FakeSpotClient()) - - research_mock.assert_called_once() - sol = next(item for item in payload["recommendations"] if item["symbol"] == "SOLUSDT") - self.assertEqual(sol["metrics"]["fundamental"], 0.9) - self.assertEqual(sol["metrics"]["research_confidence"], 0.9) - def test_weak_setup_and_trigger_becomes_avoid(self): metrics = { "extension_penalty": 0.0, @@ -409,28 +343,18 @@ class OpportunityServiceTestCase(unittest.TestCase): self.assertIn("setup, trigger, or overall quality is too weak", reasons[0]) self.assertEqual(confidence, 50) - -class ResearchServiceTestCase(unittest.TestCase): - def test_coingecko_market_data_becomes_research_signals(self): - signals = research_service._coingecko_market_to_signals( - { - "id": "solana", - "symbol": "sol", - "market_cap": 80_000_000_000, - "fully_diluted_valuation": 95_000_000_000, - "total_volume": 5_000_000_000, - "market_cap_rank": 6, - "circulating_supply": 550_000_000, - "total_supply": 600_000_000, - "max_supply": None, - "price_change_percentage_7d_in_currency": 12.0, - "price_change_percentage_30d_in_currency": 35.0, - "price_change_percentage_200d_in_currency": 80.0, - }, - is_trending=True, - ) - - self.assertGreater(signals["fundamental"], 0.6) - self.assertGreater(signals["tokenomics"], 0.8) - self.assertGreater(signals["catalyst"], 0.6) - self.assertLess(signals["unlock_risk"], 0.2) + def test_watch_flags_anomalies(self): + config = self.config | { + "watch": { + "alert_drawdown_1h_pct": -5.0, + "alert_drawdown_24h_pct": -10.0, + "alert_spike_1h_pct": 8.0, + "max_position_weight": 0.5, + } + } + with patch.object(portfolio_service, "audit_event", return_value=None): + payload = portfolio_service.watch_portfolio(config, spot_client=FakeSpotClient()) + # FakeSpotClient BTC is +5% 24h, ETH is +3% — both should be healthy + self.assertGreaterEqual(payload["healthy_count"], 1) + for result in payload["watch_results"]: + self.assertIn(result["status"], {"healthy", "need_review"}) diff --git a/tests/test_strategy_service.py b/tests/test_strategy_service.py deleted file mode 100644 index 66583fa..0000000 --- a/tests/test_strategy_service.py +++ /dev/null @@ -1,100 +0,0 @@ -"""Tests for strategy_service.""" - -from __future__ import annotations - -import unittest -from typing import Any -from unittest import mock -from unittest.mock import MagicMock - -from coinhunter.services import strategy_service - - -class StrategyServiceTestCase(unittest.TestCase): - def _klines(self, closes: list[float], volumes: list[float] | None = None) -> list[list[float]]: - volumes = volumes or [1.0] * len(closes) - return [ - [i * 3600000.0, c * 0.98, c * 1.02, c * 0.97, c, v, 0.0, c * v, 100, 0.0, 0.0, 0.0] - for i, (c, v) in enumerate(zip(closes, volumes)) - ] - - def _config(self) -> dict[str, Any]: - return { - "opportunity": { - "entry_threshold": 1.5, - "watch_threshold": 0.6, - "min_trigger_score": 0.45, - "min_setup_score": 0.35, - "overlap_penalty": 0.6, - "top_n": 10, - "scan_limit": 50, - "kline_limit": 48, - "weights": {}, - "model_weights": {}, - }, - "portfolio": { - "add_threshold": 1.5, - "hold_threshold": 0.6, - "trim_threshold": 0.2, - "exit_threshold": -0.2, - "max_position_weight": 0.6, - }, - "signal": { - "lookback_interval": "1h", - }, - "market": { - "default_quote": "USDT", - }, - } - - def test_generate_signals_from_klines_buy_when_entry_and_not_held(self) -> None: - config = self._config() - closes = list(range(20, 40)) - klines = {"BTCUSDT": self._klines(closes)} - result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=[]) - self.assertIn("buy", result) - self.assertIn("sell", result) - self.assertIn("hold", result) - - def test_generate_signals_from_klines_sell_when_exit_signal(self) -> None: - config = self._config() - closes = list(range(40, 20, -1)) - klines = {"BTCUSDT": self._klines(closes)} - held = [{"symbol": "BTCUSDT", "notional_usdt": 1000.0}] - result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=held) - symbols = [s["symbol"] for s in result["sell"]] - self.assertIn("BTCUSDT", symbols) - - def test_generate_signals_respects_max_position_weight(self) -> None: - config = self._config() - config["portfolio"]["max_position_weight"] = 0.01 - closes = list(range(20, 40)) - klines = {"BTCUSDT": self._klines(closes)} - held = [{"symbol": "BTCUSDT", "notional_usdt": 9999.0}] - result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=held) - buy_symbols = [s["symbol"] for s in result["buy"]] - self.assertNotIn("BTCUSDT", buy_symbols) - - @mock.patch("coinhunter.services.portfolio_service.audit_event") - @mock.patch("coinhunter.services.opportunity_service.audit_event") - def test_generate_trade_signals_dispatches_to_services(self, mock_audit_opp, mock_audit_pf) -> None: - mock_client = MagicMock() - mock_client.klines.return_value = self._klines(list(range(20, 44))) - mock_client.ticker_stats.return_value = [ - { - "symbol": "BTCUSDT", - "lastPrice": "30.0", - "priceChangePercent": "5.0", - "quoteVolume": "1000000", - "highPrice": "31.0", - "lowPrice": "29.0", - } - ] - mock_client.account.return_value = {"balances": [{"asset": "BTC", "free": "0.5", "locked": "0.0"}]} - mock_client.exchange_info.return_value = {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}]} - - config = self._config() - result = strategy_service.generate_trade_signals(config, spot_client=mock_client) - self.assertIn("buy", result) - self.assertIn("sell", result) - self.assertIn("hold", result)