From e4b2239bcd52245d161d66afc3568a6011d59469 Mon Sep 17 00:00:00 2001 From: Carlos Ouyang Date: Mon, 27 Apr 2026 13:21:35 +0800 Subject: [PATCH] feat: add strategy and backtest services - strategy_service.py combines opportunity + portfolio signals into unified buy/sell/hold recommendations - backtest_service.py runs walk-forward backtests on historical datasets with virtual cash and positions - CLI adds `strategy` and `backtest` commands with `--decision-interval` and other tuning parameters - Add tests for both services and CLI dispatch - Update CLAUDE.md with new architecture docs - Optimize model weights via opportunity optimizer Co-Authored-By: Claude Opus 4.7 --- CLAUDE.md | 29 +- src/coinhunter/cli.py | 51 +++ src/coinhunter/services/backtest_service.py | 370 ++++++++++++++++++++ src/coinhunter/services/strategy_service.py | 339 ++++++++++++++++++ tests/test_backtest_service.py | 129 +++++++ tests/test_cli.py | 70 ++++ tests/test_strategy_service.py | 100 ++++++ 7 files changed, 1078 insertions(+), 10 deletions(-) create mode 100644 src/coinhunter/services/backtest_service.py create mode 100644 src/coinhunter/services/strategy_service.py create mode 100644 tests/test_backtest_service.py create mode 100644 tests/test_strategy_service.py diff --git a/CLAUDE.md b/CLAUDE.md index 4e6336b..62d2784 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co ## Development commands -- **Install (dev):** `pip install -e ".[dev]"` +- **Install (dev):** `pip install -e ".[dev]"` or `conda env create -f environment.yml && conda activate coinhunter` - **Run CLI locally:** `python -m coinhunter --help` - **Run tests:** `pytest` or `python -m pytest tests/` - **Run single test file:** `pytest tests/test_cli.py -v` @@ -16,18 +16,20 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co CoinHunter V2 is a Binance-first crypto trading CLI with a flat, direct architecture: -- **`src/coinhunter/cli.py`** — Single entrypoint (`main()`). Uses `argparse` to parse commands and directly dispatches to service functions. There is no separate `commands/` adapter layer. +- **`src/coinhunter/cli.py`** — Single entrypoint (`main()`). Uses `argparse` to parse commands and directly dispatches to service functions. There is no separate `commands/` adapter layer. Top-level commands include `init`, `config`, `account`, `market`, `buy`, `sell`, `portfolio`, `opportunity`, `strategy`, `backtest`, `catlog`, `upgrade`, and `completion`. - **`src/coinhunter/services/`** — Contains all domain logic: - `account_service.py` — balances, positions, overview - `market_service.py` — tickers, klines, scan universe, symbol normalization - `signal_service.py` — shared market signal scoring used by scan and portfolio analysis - `portfolio_service.py` — held-position review and add/hold/trim/exit recommendations - - `trade_service.py` — spot and USDT-M futures order execution + - `trade_service.py` — spot order execution only - `opportunity_service.py` — market scanning and entry/watch/skip recommendations -- **`src/coinhunter/binance/`** — Thin wrappers around official Binance connectors: - - `spot_client.py` wraps `binance.spot.Spot` - - `um_futures_client.py` wraps `binance.um_futures.UMFutures` - Both normalize request errors into `RuntimeError` and handle single/multi-symbol ticker responses. + - `opportunity_dataset_service.py` — historical kline dataset collection for backtesting + - `opportunity_evaluation_service.py` — walk-forward evaluation and model-weight optimization + - `research_service.py` — external research signal providers for opportunity scoring + - `strategy_service.py` — combines opportunity scanning and portfolio analysis into unified buy/sell/hold trade signals + - `backtest_service.py` — walk-forward backtest engine using historical kline datasets with virtual cash and positions +- **`src/coinhunter/binance/spot_client.py`** — Thin wrapper around `binance.spot.Spot`. Normalizes request errors into `RuntimeError` and handles single/multi-symbol ticker responses. - **`src/coinhunter/config.py`** — `load_config()`, `get_binance_credentials()`, `ensure_init_files()`. - **`src/coinhunter/runtime.py`** — `RuntimePaths`, `get_runtime_paths()`, `print_json()`. - **`src/coinhunter/audit.py`** — Writes JSONL audit events to dated files. @@ -39,6 +41,7 @@ User data lives in `~/.coinhunter/` by default (override with `COINHUNTER_HOME`) - `config.toml` — runtime, binance, trading, signal, opportunity, and portfolio settings - `.env` — `BINANCE_API_KEY` and `BINANCE_API_SECRET` - `logs/audit_YYYYMMDD.jsonl` — structured audit log +- `logs/dry-run/audit_YYYYMMDD.jsonl` — dry-run audit log Run `coinhunter init` to generate the config and env templates. @@ -46,8 +49,10 @@ Run `coinhunter init` to generate the config and env templates. - **Symbol normalization:** `market_service.normalize_symbol()` strips `/`, `-`, `_`, and uppercases the symbol. CLI inputs like `ETH/USDT`, `eth-usdt`, and `ETHUSDT` are all normalized to `ETHUSDT`. - **Dry-run behavior:** Trade commands support `--dry-run`. If omitted, the default falls back to `trading.dry_run_default` in `config.toml`. -- **Client injection:** Service functions accept `spot_client` / `futures_client` as keyword arguments. This enables easy unit testing with mocks. -- **Error handling:** Binance client wrappers catch `requests.exceptions.SSLError` and `RequestException` and re-raise as human-readable `RuntimeError`. The CLI catches all exceptions in `main()` and prints `error: {message}` to stderr with exit code 1. +- **Client injection:** Service functions accept `spot_client` as a keyword argument. This enables easy unit testing with mocks. +- **Error handling:** `spot_client.py` catches `requests.exceptions.SSLError` and `RequestException` and re-raises as human-readable `RuntimeError`. The CLI catches all exceptions in `main()` and prints `error: {message}` to stderr with exit code 1. +- **Ticker API fallback:** `spot_client.ticker_stats()` uses `rolling_window_ticker` for symbol-specific queries and `ticker_24hr` for full-market scans (no symbols). +- **Output modes:** All commands support `--agent` for JSON output and `--doc` to print the command's output schema. ## Testing @@ -56,8 +61,12 @@ Tests live in `tests/` and use `unittest.TestCase` with `unittest.mock.patch`. T - `test_cli.py` — parser smoke tests and dispatch behavior - `test_config_runtime.py` — config loading, env parsing, path resolution - `test_account_market_services.py` — balance/position/ticker/klines logic with mocked clients -- `test_trade_service.py` — spot and futures trade execution paths +- `test_trade_service.py` — spot trade execution paths - `test_opportunity_service.py` — portfolio and scan scoring logic +- `test_opportunity_dataset_service.py` — dataset collection and walk-forward evaluation +- `test_opportunity_evaluation_service.py` — model weight optimization +- `test_strategy_service.py` — combined signal generation logic +- `test_backtest_service.py` — historical backtest engine ## Notes diff --git a/src/coinhunter/cli.py b/src/coinhunter/cli.py index 78a0474..cce28e0 100644 --- a/src/coinhunter/cli.py +++ b/src/coinhunter/cli.py @@ -26,11 +26,13 @@ from .runtime import ( ) from .services import ( account_service, + backtest_service, market_service, opportunity_dataset_service, opportunity_evaluation_service, opportunity_service, portfolio_service, + strategy_service, trade_service, ) @@ -45,6 +47,8 @@ examples: coin opportunity -s BTCUSDT ETHUSDT coin opportunity evaluate ~/.coinhunter/datasets/opportunity_dataset.json --agent coin opportunity optimize ~/.coinhunter/datasets/opportunity_dataset.json --agent + coin strategy -s BTCUSDT ETHUSDT + coin backtest ~/.coinhunter/datasets/opportunity_dataset_20260101T000000Z.json coin upgrade """ @@ -967,6 +971,26 @@ def build_parser() -> argparse.ArgumentParser: optimize_parser.add_argument("--passes", type=int, default=2, help="Coordinate-search passes over model weights") _add_global_flags(optimize_parser) + strategy_parser = subparsers.add_parser( + "strategy", aliases=["strat", "st"], help="Combined opportunity + portfolio trade signals", + description="Generate unified buy/sell/hold signals by combining opportunity scanning and portfolio analysis.", + ) + strategy_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols") + _add_global_flags(strategy_parser) + + backtest_parser = subparsers.add_parser( + "backtest", aliases=["bt"], help="Backtest combined strategy on historical dataset", + description="Run a walk-forward backtest using historical kline datasets with virtual cash and positions.", + ) + backtest_parser.add_argument("dataset", help="Path to an opportunity dataset JSON file") + backtest_parser.add_argument("--initial-cash", type=float, help="Initial cash allocation (default: 10000)") + backtest_parser.add_argument("--max-positions", type=int, help="Maximum simultaneous positions (default: 5)") + backtest_parser.add_argument("--position-size-pct", type=float, help="Cash percentage per position (default: 0.2)") + backtest_parser.add_argument("--commission-pct", type=float, help="Commission per trade in percent (default: 0.1)") + backtest_parser.add_argument("--lookback", type=int, help="Closed candles used for each point-in-time score") + backtest_parser.add_argument("--decision-interval", type=int, help="Minimum minutes between decision points (default: 0 = every candle)") + _add_global_flags(backtest_parser) + upgrade_parser = subparsers.add_parser( "upgrade", help="Upgrade coinhunter to the latest version", description="Upgrade the coinhunter package using pipx (preferred) or pip.", @@ -1006,6 +1030,9 @@ _CANONICAL_COMMANDS = { "o": "opportunity", "cfg": "config", "c": "config", + "strat": "strategy", + "st": "strategy", + "bt": "backtest", } _CANONICAL_SUBCOMMANDS = { @@ -1263,6 +1290,30 @@ def main(argv: list[str] | None = None) -> int: print_output(result, agent=args.agent) return 0 + if args.command == "strategy": + spot_client = _load_spot_client(config) + with with_spinner("Generating trade signals...", enabled=not args.agent): + result = strategy_service.generate_trade_signals( + config, spot_client=spot_client, symbols=args.symbols + ) + print_output(result, agent=args.agent) + return 0 + + if args.command == "backtest": + with with_spinner("Running backtest...", enabled=not args.agent): + result = backtest_service.run_backtest( + config, + dataset_path=args.dataset, + initial_cash=args.initial_cash, + max_positions=args.max_positions, + position_size_pct=args.position_size_pct / 100.0 if args.position_size_pct is not None else None, + commission_pct=args.commission_pct / 100.0 if args.commission_pct is not None else None, + lookback=args.lookback, + decision_interval_minutes=args.decision_interval, + ) + print_output(result, agent=args.agent) + return 0 + if args.command == "opportunity": if args.opportunity_command == "optimize": with with_spinner("Optimizing opportunity model...", enabled=not args.agent): diff --git a/src/coinhunter/services/backtest_service.py b/src/coinhunter/services/backtest_service.py new file mode 100644 index 0000000..6a73c50 --- /dev/null +++ b/src/coinhunter/services/backtest_service.py @@ -0,0 +1,370 @@ +"""Backtest engine for combined opportunity + portfolio strategy.""" + +from __future__ import annotations + +import json +from collections import defaultdict +from copy import deepcopy +from dataclasses import asdict, dataclass +from datetime import datetime, timezone +from pathlib import Path +from statistics import mean +from typing import Any + +from .market_service import normalize_symbol +from .signal_service import get_signal_interval, score_opportunity_signal, score_portfolio_signal +from .strategy_service import generate_signals_from_klines + + +@dataclass +class Position: + symbol: str + qty: float + entry_price: float + entry_time: str + notional_usdt: float + + +@dataclass +class Trade: + time: str + symbol: str + side: str + price: float + qty: float + notional: float + commission: float + reason: str + + +def _as_float(value: Any, default: float = 0.0) -> float: + try: + return float(value) + except (TypeError, ValueError): + return default + + +def _as_int(value: Any, default: int = 0) -> int: + try: + return int(value) + except (TypeError, ValueError): + return default + + +def _parse_dt(value: Any) -> datetime | None: + if not value: + return None + try: + return datetime.fromisoformat(str(value).replace("Z", "+00:00")).astimezone(timezone.utc) + except ValueError: + return None + + +def _iso_from_ms(value: int) -> str: + return ( + datetime.fromtimestamp(value / 1000, tz=timezone.utc) + .replace(microsecond=0) + .isoformat() + .replace("+00:00", "Z") + ) + + +def _close(row: list[Any]) -> float: + return _as_float(row[4]) + + +def _open_ms(row: list[Any]) -> int: + return int(row[0]) + + +def _ticker_from_window(symbol: str, rows: list[list[Any]]) -> dict[str, Any]: + first = _close(rows[0]) + last = _close(rows[-1]) + price_change_pct = ((last - first) / first * 100.0) if first else 0.0 + return { + "symbol": symbol, + "price_change_pct": price_change_pct, + "quote_volume": sum(_close(row) * _as_float(row[5]) for row in rows), + "high_price": max(_as_float(row[2]) for row in rows), + "low_price": min(_as_float(row[3]) for row in rows), + } + + +def _window_series(rows: list[list[Any]]) -> tuple[list[float], list[float]]: + return [_close(row) for row in rows], [_as_float(row[5]) for row in rows] + + +def _portfolio_value(cash: float, positions: list[Position], prices: dict[str, float]) -> float: + return cash + sum(p.qty * prices.get(p.symbol, p.entry_price) for p in positions) + + +def _pct(new: float, old: float) -> float: + if old == 0: + return 0.0 + return (new - old) / old + + +def run_backtest( + config: dict[str, Any], + *, + dataset_path: str, + initial_cash: float | None = None, + max_positions: int | None = None, + position_size_pct: float | None = None, + commission_pct: float | None = None, + lookback: int | None = None, + decision_interval_minutes: int | None = None, +) -> dict[str, Any]: + """Run a walk-forward backtest using historical kline datasets. + + Maintains virtual cash and positions. At each decision point: + 1. Sells positions where portfolio signals "exit" or "trim" + 2. Buys top opportunity "entry" signals within cash and position limits + """ + dataset_file = Path(dataset_path).expanduser() + dataset = json.loads(dataset_file.read_text(encoding="utf-8")) + metadata = dataset.get("metadata", {}) + plan = metadata.get("plan", {}) + klines = dataset.get("klines", {}) + + intervals = list(plan.get("intervals") or []) + configured_interval = get_signal_interval(config) + primary_interval = configured_interval if configured_interval in intervals else (intervals[0] if intervals else "1h") + + simulation_start = _parse_dt(plan.get("simulation_start")) + simulation_end = _parse_dt(plan.get("simulation_end")) + if simulation_start is None or simulation_end is None: + raise ValueError("dataset metadata must include plan.simulation_start and plan.simulation_end") + + opportunity_config = config.get("opportunity", {}) + portfolio_config = config.get("portfolio", {}) + + cash = _as_float(initial_cash, 10000.0) + max_pos = _as_int(max_positions, _as_int(portfolio_config.get("max_positions"), 5)) + size_pct = _as_float(position_size_pct, _as_float(opportunity_config.get("backtest_position_size_pct"), 0.2)) + commission = _as_float(commission_pct, _as_float(config.get("trading", {}).get("commission_pct"), 0.001)) + lookback_bars = lookback or _as_int(opportunity_config.get("evaluation_lookback"), 24) + + start_ms = int(simulation_start.timestamp() * 1000) + end_ms = int(simulation_end.timestamp() * 1000) + + rows_by_symbol: dict[str, list[list[Any]]] = {} + index_by_symbol: dict[str, dict[int, int]] = {} + for symbol, by_interval in klines.items(): + rows = by_interval.get(primary_interval, []) + normalized = normalize_symbol(symbol) + if rows: + rows_by_symbol[normalized] = rows + index_by_symbol[normalized] = {_open_ms(row): index for index, row in enumerate(rows)} + + decision_times = sorted( + { + _open_ms(row) + for rows in rows_by_symbol.values() + for row in rows + if start_ms <= _open_ms(row) < end_ms + } + ) + + interval_ms = _as_int(decision_interval_minutes, 0) * 60 * 1000 + if interval_ms > 0 and decision_times: + filtered: list[int] = [] + anchor = decision_times[0] + for t in decision_times: + if t - anchor >= interval_ms: + filtered.append(t) + anchor = t + decision_times = filtered + + positions: list[Position] = [] + trades: list[dict[str, Any]] = [] + equity_curve: list[dict[str, Any]] = [] + + skipped_warmup = 0 + skipped_missing_future = 0 + + for decision_time in decision_times: + current_prices: dict[str, float] = {} + klines_snapshot: dict[str, list[list[Any]]] = {} + + for symbol, rows in rows_by_symbol.items(): + index = index_by_symbol[symbol].get(decision_time) + if index is None: + continue + window = rows[max(0, index - lookback_bars + 1) : index + 1] + if len(window) < lookback_bars: + skipped_warmup += 1 + continue + future_rows = [row for row in rows[index + 1 :]] + if not future_rows: + skipped_missing_future += 1 + continue + klines_snapshot[symbol] = window + current_prices[symbol] = _close(window[-1]) + + # Build held positions for portfolio signal generation + held_positions = [ + { + "symbol": p.symbol, + "notional_usdt": p.qty * current_prices.get(p.symbol, p.entry_price), + } + for p in positions + if p.symbol in current_prices + ] + + signals = generate_signals_from_klines(config, klines_by_symbol=klines_snapshot, held_positions=held_positions) + + # Execute sells first to free cash + sell_symbols = {normalize_symbol(s["symbol"]) for s in signals.get("sell", [])} + new_positions: list[Position] = [] + for pos in positions: + if pos.symbol in sell_symbols and pos.symbol in current_prices: + price = current_prices[pos.symbol] + notional = pos.qty * price + comm = notional * commission + cash += notional - comm + trades.append( + asdict( + Trade( + time=_iso_from_ms(decision_time), + symbol=pos.symbol, + side="SELL", + price=round(price, 8), + qty=round(pos.qty, 8), + notional=round(notional, 4), + commission=round(comm, 4), + reason="portfolio signal: exit/trim", + ) + ) + ) + else: + new_positions.append(pos) + positions = new_positions + + # Execute buys with available cash + available_slots = max_pos - len(positions) + if available_slots > 0 and cash > 0: + for buy_signal in signals.get("buy", [])[:available_slots]: + symbol = normalize_symbol(buy_signal["symbol"]) + if symbol not in current_prices: + continue + # Skip if already held + if any(p.symbol == symbol for p in positions): + continue + price = current_prices[symbol] + allocation = cash * size_pct + if allocation <= 0: + continue + qty = allocation / price + comm = allocation * commission + actual_notional = allocation - comm + if actual_notional <= 0: + continue + cash -= allocation + positions.append( + Position( + symbol=symbol, + qty=round(qty, 8), + entry_price=round(price, 8), + entry_time=_iso_from_ms(decision_time), + notional_usdt=round(actual_notional, 4), + ) + ) + trades.append( + asdict( + Trade( + time=_iso_from_ms(decision_time), + symbol=symbol, + side="BUY", + price=round(price, 8), + qty=round(qty, 8), + notional=round(allocation, 4), + commission=round(comm, 4), + reason=buy_signal.get("reasons", ["opportunity entry"])[0], + ) + ) + ) + + # Record equity + equity = _portfolio_value(cash, positions, current_prices) + equity_curve.append( + { + "time": _iso_from_ms(decision_time), + "equity": round(equity, 4), + "cash": round(cash, 4), + "positions_count": len(positions), + } + ) + + # Final valuation + final_prices: dict[str, float] = {} + for symbol, rows in rows_by_symbol.items(): + if rows: + final_prices[symbol] = _close(rows[-1]) + + final_equity = _portfolio_value(cash, positions, final_prices) + if equity_curve: + equity_curve[-1]["equity"] = round(final_equity, 4) + + # Performance metrics + initial_equity = equity_curve[0]["equity"] if equity_curve else cash + total_return = _pct(final_equity, initial_equity) + + equity_values = [e["equity"] for e in equity_curve] + peak = initial_equity + max_drawdown = 0.0 + for val in equity_values: + if val > peak: + peak = val + dd = _pct(val, peak) + if dd < max_drawdown: + max_drawdown = dd + + buy_trades = [t for t in trades if t["side"] == "BUY"] + sell_trades = [t for t in trades if t["side"] == "SELL"] + + trade_returns: list[float] = [] + position_map: dict[str, dict[str, Any]] = {} + for t in buy_trades: + position_map[t["symbol"]] = t + for t in sell_trades: + buy_trade = position_map.get(t["symbol"]) + if buy_trade: + trade_return = _pct(t["notional"] - t["commission"], buy_trade["notional"] + buy_trade["commission"]) + trade_returns.append(trade_return) + + wins = sum(1 for r in trade_returns if r > 0) + losses = len(trade_returns) - wins + + return { + "summary": { + "initial_cash": round(initial_equity, 4), + "final_equity": round(final_equity, 4), + "total_return_pct": round(total_return * 100, 4), + "max_drawdown_pct": round(max_drawdown * 100, 4), + "buy_signals": len(buy_trades), + "sell_signals": len(sell_trades), + "completed_trades": len(trade_returns), + "win_rate": round(wins / len(trade_returns), 4) if trade_returns else 0.0, + "wins": wins, + "losses": losses, + "avg_trade_return_pct": round(mean(trade_returns) * 100, 4) if trade_returns else 0.0, + "open_positions": len(positions), + "decision_points": len(decision_times), + "skipped_warmup": skipped_warmup, + "skipped_missing_future": skipped_missing_future, + }, + "trades": trades, + "equity_curve": equity_curve, + "open_positions": [asdict(p) for p in positions], + "parameters": { + "dataset": str(dataset_file), + "interval": primary_interval, + "initial_cash": cash if not trades else initial_equity, + "max_positions": max_pos, + "position_size_pct": size_pct, + "commission_pct": commission, + "lookback_bars": lookback_bars, + "decision_interval_minutes": _as_int(decision_interval_minutes, 0), + }, + } diff --git a/src/coinhunter/services/strategy_service.py b/src/coinhunter/services/strategy_service.py new file mode 100644 index 0000000..7db73c5 --- /dev/null +++ b/src/coinhunter/services/strategy_service.py @@ -0,0 +1,339 @@ +"""Unified strategy combining opportunity scanning and portfolio management.""" + +from __future__ import annotations + +from dataclasses import asdict, dataclass +from typing import Any + +from .market_service import normalize_symbol +from .opportunity_service import ( + _action_for_opportunity, + _opportunity_thresholds, + scan_opportunities, +) +from .portfolio_service import ( + _action_for_position, + _portfolio_thresholds, + analyze_portfolio, +) +from .signal_service import score_opportunity_signal, score_portfolio_signal + + +@dataclass +class TradeSignal: + symbol: str + action: str + side: str + score: float + reasons: list[str] + opportunity_metrics: dict[str, float] + portfolio_metrics: dict[str, float] + + +def _held_symbols(positions: list[dict[str, Any]]) -> set[str]: + return {normalize_symbol(p["symbol"]) for p in positions} + + +def generate_trade_signals( + config: dict[str, Any], + *, + spot_client: Any, + symbols: list[str] | None = None, +) -> dict[str, Any]: + """Combine opportunity and portfolio signals into unified buy/sell/hold recommendations. + + Buy criteria: + - Opportunity action is "entry" + - Not already held OR portfolio allows "add" + - Position concentration below max weight + + Sell criteria: + - Position exists and portfolio action is "exit" or "trim" + + Hold criteria: + - Position exists and portfolio action is "hold" + """ + portfolio_config = config.get("portfolio", {}) + max_position_weight = float(portfolio_config.get("max_position_weight", 0.6)) + + opp_result = scan_opportunities(config, spot_client=spot_client, symbols=symbols) + pf_result = analyze_portfolio(config, spot_client=spot_client) + + held = {normalize_symbol(p["symbol"]): p for p in pf_result.get("recommendations", [])} + total_notional = sum(p.get("metrics", {}).get("position_weight", 0) for p in held.values()) or 1.0 + + buys: list[dict[str, Any]] = [] + sells: list[dict[str, Any]] = [] + holds: list[dict[str, Any]] = [] + + for rec in opp_result.get("recommendations", []): + symbol = normalize_symbol(rec["symbol"]) + opp_action = rec["action"] + score = rec["score"] + reasons = list(rec.get("reasons", [])) + opp_metrics = dict(rec.get("metrics", {})) + + pf_rec = held.get(symbol) + pf_action = pf_rec["action"] if pf_rec else "none" + pf_metrics = dict(pf_rec.get("metrics", {})) if pf_rec else {} + concentration = pf_metrics.get("position_weight", 0.0) + + if opp_action == "entry" and (symbol not in held or pf_action in ("add", "hold")): + if concentration < max_position_weight: + reasons.append(f"portfolio: {pf_action or 'not held'} -> buy") + buys.append( + asdict( + TradeSignal( + symbol=symbol, + action="buy", + side="BUY", + score=round(score, 4), + reasons=reasons, + opportunity_metrics=opp_metrics, + portfolio_metrics=pf_metrics, + ) + ) + ) + else: + reasons.append(f"portfolio: position weight {concentration:.2%} at max -> skip") + holds.append( + asdict( + TradeSignal( + symbol=symbol, + action="hold", + side="HOLD", + score=round(score, 4), + reasons=reasons, + opportunity_metrics=opp_metrics, + portfolio_metrics=pf_metrics, + ) + ) + ) + + for symbol, pf_rec in held.items(): + pf_action = pf_rec["action"] + score = pf_rec["score"] + reasons = list(pf_rec.get("reasons", [])) + pf_metrics = dict(pf_rec.get("metrics", {})) + + opp_rec = next((r for r in opp_result.get("recommendations", []) if normalize_symbol(r["symbol"]) == symbol), None) + opp_metrics = dict(opp_rec.get("metrics", {})) if opp_rec else {} + + if pf_action in ("exit", "trim"): + reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> sell") + sells.append( + asdict( + TradeSignal( + symbol=symbol, + action="sell", + side="SELL", + score=round(score, 4), + reasons=reasons, + opportunity_metrics=opp_metrics, + portfolio_metrics=pf_metrics, + ) + ) + ) + elif pf_action == "hold": + reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> hold") + holds.append( + asdict( + TradeSignal( + symbol=symbol, + action="hold", + side="HOLD", + score=round(score, 4), + reasons=reasons, + opportunity_metrics=opp_metrics, + portfolio_metrics=pf_metrics, + ) + ) + ) + elif pf_action == "add": + # Already handled in buy loop if opp is entry; otherwise treat as hold + if not any(normalize_symbol(b["symbol"]) == symbol for b in buys): + reasons.append("opportunity: no entry signal -> hold") + holds.append( + asdict( + TradeSignal( + symbol=symbol, + action="hold", + side="HOLD", + score=round(score, 4), + reasons=reasons, + opportunity_metrics=opp_metrics, + portfolio_metrics=pf_metrics, + ) + ) + ) + + return { + "buy": sorted(buys, key=lambda item: item["score"], reverse=True), + "sell": sorted(sells, key=lambda item: item["score"]), + "hold": sorted(holds, key=lambda item: item["score"], reverse=True), + } + + +def _series_from_klines(klines: list[list[Any]]) -> tuple[list[float], list[float]]: + return [float(item[4]) for item in klines], [float(item[5]) for item in klines] + + +def generate_signals_from_klines( + config: dict[str, Any], + *, + klines_by_symbol: dict[str, list[list[Any]]], + held_positions: list[dict[str, Any]], +) -> dict[str, Any]: + """Pure version of signal generation that works on in-memory klines. + + Used by backtest to avoid network calls. + """ + opportunity_config = config.get("opportunity", {}) + portfolio_config = config.get("portfolio", {}) + thresholds = _opportunity_thresholds(config) + pf_thresholds = _portfolio_thresholds(config) + max_position_weight = pf_thresholds["max_position_weight"] + weights = opportunity_config.get("weights", {}) + signal_weights = config.get("signal", {}) + top_n = int(opportunity_config.get("top_n", 10)) + + held = {normalize_symbol(p["symbol"]): p for p in held_positions} + total_notional = sum(p.get("notional_usdt", 0) for p in held_positions) or 1.0 + + opp_candidates: list[dict[str, Any]] = [] + for symbol, rows in klines_by_symbol.items(): + if len(rows) < 6: + continue + closes, volumes = _series_from_klines(rows) + ticker = { + "symbol": symbol, + "price_change_pct": ((closes[-1] - closes[0]) / closes[0] * 100) if closes[0] else 0.0, + "quote_volume": sum(c * v for c, v in zip(closes, volumes)), + } + opportunity_score, metrics = score_opportunity_signal(closes, volumes, ticker, opportunity_config) + score = opportunity_score + metrics["opportunity_score"] = round(opportunity_score, 4) + action, reasons, _confidence = _action_for_opportunity(score, metrics, thresholds) + opp_candidates.append({ + "symbol": symbol, + "action": action, + "score": round(score, 4), + "metrics": metrics, + "reasons": reasons, + }) + + pf_results: dict[str, dict[str, Any]] = {} + for symbol, position in held.items(): + rows = klines_by_symbol.get(symbol, []) + if len(rows) < 2: + continue + closes, volumes = _series_from_klines(rows) + ticker = {"price_change_pct": ((closes[-1] - closes[0]) / closes[0] * 100) if closes[0] else 0.0} + concentration = position.get("notional_usdt", 0) / total_notional + score, metrics = score_portfolio_signal(closes, volumes, ticker, signal_weights) + pf_action, pf_reasons = _action_for_position(score, concentration, pf_thresholds) + metrics["position_weight"] = round(concentration, 4) + pf_results[symbol] = { + "symbol": symbol, + "action": pf_action, + "score": round(score, 4), + "reasons": pf_reasons, + "metrics": metrics, + "notional_usdt": position.get("notional_usdt", 0), + } + + buys: list[dict[str, Any]] = [] + sells: list[dict[str, Any]] = [] + holds: list[dict[str, Any]] = [] + + for rec in sorted(opp_candidates, key=lambda item: item["score"], reverse=True)[:top_n]: + symbol = normalize_symbol(rec["symbol"]) + opp_action = rec["action"] + score = rec["score"] + reasons = list(rec.get("reasons", [])) + opp_metrics = dict(rec.get("metrics", {})) + + pf_rec = pf_results.get(symbol) + pf_action = pf_rec["action"] if pf_rec else "none" + pf_metrics = dict(pf_rec.get("metrics", {})) if pf_rec else {} + concentration = pf_metrics.get("position_weight", 0.0) + + if opp_action == "entry" and (symbol not in held or pf_action in ("add", "hold")): + if concentration < max_position_weight: + reasons.append(f"portfolio: {pf_action or 'not held'} -> buy") + buys.append( + asdict( + TradeSignal( + symbol=symbol, + action="buy", + side="BUY", + score=round(score, 4), + reasons=reasons, + opportunity_metrics=opp_metrics, + portfolio_metrics=pf_metrics, + ) + ) + ) + else: + reasons.append(f"portfolio: position weight {concentration:.2%} at max -> skip") + + for symbol, pf_rec in pf_results.items(): + pf_action = pf_rec["action"] + score = pf_rec["score"] + reasons = list(pf_rec.get("reasons", [])) + pf_metrics = dict(pf_rec.get("metrics", {})) + opp_rec = next((r for r in opp_candidates if normalize_symbol(r["symbol"]) == symbol), None) + opp_metrics = dict(opp_rec.get("metrics", {})) if opp_rec else {} + + if pf_action in ("exit", "trim"): + reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> sell") + sells.append( + asdict( + TradeSignal( + symbol=symbol, + action="sell", + side="SELL", + score=round(score, 4), + reasons=reasons, + opportunity_metrics=opp_metrics, + portfolio_metrics=pf_metrics, + ) + ) + ) + elif pf_action == "hold": + reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> hold") + holds.append( + asdict( + TradeSignal( + symbol=symbol, + action="hold", + side="HOLD", + score=round(score, 4), + reasons=reasons, + opportunity_metrics=opp_metrics, + portfolio_metrics=pf_metrics, + ) + ) + ) + elif pf_action == "add": + if not any(normalize_symbol(b["symbol"]) == symbol for b in buys): + reasons.append("opportunity: no entry signal -> hold") + holds.append( + asdict( + TradeSignal( + symbol=symbol, + action="hold", + side="HOLD", + score=round(score, 4), + reasons=reasons, + opportunity_metrics=opp_metrics, + portfolio_metrics=pf_metrics, + ) + ) + ) + + return { + "buy": sorted(buys, key=lambda item: item["score"], reverse=True), + "sell": sorted(sells, key=lambda item: item["score"]), + "hold": sorted(holds, key=lambda item: item["score"], reverse=True), + } diff --git a/tests/test_backtest_service.py b/tests/test_backtest_service.py new file mode 100644 index 0000000..1975be9 --- /dev/null +++ b/tests/test_backtest_service.py @@ -0,0 +1,129 @@ +"""Tests for backtest_service.""" + +from __future__ import annotations + +import json +import tempfile +import unittest +from pathlib import Path +from typing import Any + +from coinhunter.services import backtest_service + + +class BacktestServiceTestCase(unittest.TestCase): + def _klines(self, closes: list[float], start_ms: int = 0, volumes: list[float] | None = None) -> list[list[float]]: + volumes = volumes or [1.0] * len(closes) + return [ + [start_ms + i * 3600000, c * 0.98, c * 1.02, c * 0.97, c, v, 0.0, c * v, 100, 0.0, 0.0, 0.0] + for i, (c, v) in enumerate(zip(closes, volumes)) + ] + + def _config(self) -> dict[str, Any]: + return { + "opportunity": { + "entry_threshold": 1.5, + "watch_threshold": 0.6, + "min_trigger_score": 0.45, + "min_setup_score": 0.35, + "overlap_penalty": 0.6, + "top_n": 10, + "scan_limit": 50, + "kline_limit": 48, + "weights": {}, + "model_weights": {}, + }, + "portfolio": { + "add_threshold": 1.5, + "hold_threshold": 0.6, + "trim_threshold": 0.2, + "exit_threshold": -0.2, + "max_position_weight": 0.6, + "max_positions": 5, + }, + "signal": { + "lookback_interval": "1h", + }, + "market": { + "default_quote": "USDT", + }, + "trading": { + "commission_pct": 0.001, + }, + } + + def _make_dataset(self, closes_by_symbol: dict[str, list[float]], start_iso: str = "2025-12-28T00:00:00Z", sim_start_iso: str = "2025-12-30T00:00:00Z", sim_end_iso: str = "2026-01-01T00:00:00Z") -> Path: + from datetime import datetime, timezone + start_ms = int(datetime.fromisoformat(start_iso.replace("Z", "+00:00")).timestamp() * 1000) + klines: dict[str, dict[str, list[list[float]]]] = {} + for symbol, closes in closes_by_symbol.items(): + klines[symbol] = {"1h": self._klines(closes, start_ms=start_ms)} + dataset = { + "metadata": { + "created_at": "2026-01-01T00:00:00Z", + "quote": "USDT", + "symbols": list(closes_by_symbol.keys()), + "plan": { + "intervals": ["1h"], + "kline_limit": 48, + "reference_days": 2.0, + "simulate_days": 1.0, + "run_days": 1.0, + "total_days": 4.0, + "start": start_iso, + "simulation_start": sim_start_iso, + "simulation_end": sim_end_iso, + "end": sim_end_iso, + }, + "external_history": {"provider": "disabled", "status": "disabled"}, + }, + "klines": klines, + } + fp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) + json.dump(dataset, fp) + fp.close() + return Path(fp.name) + + def test_run_backtest_produces_summary(self) -> None: + config = self._config() + closes = list(range(20, 92)) + path = self._make_dataset({"BTCUSDT": closes}) + try: + result = backtest_service.run_backtest(config, dataset_path=str(path), initial_cash=10000.0) + self.assertIn("summary", result) + self.assertIn("trades", result) + self.assertIn("equity_curve", result) + self.assertIn("parameters", result) + summary = result["summary"] + self.assertIn("initial_cash", summary) + self.assertIn("final_equity", summary) + self.assertIn("total_return_pct", summary) + self.assertIn("max_drawdown_pct", summary) + self.assertIn("win_rate", summary) + finally: + path.unlink() + + def test_run_backtest_missing_simulation_dates_raises(self) -> None: + config = self._config() + path = self._make_dataset({"BTCUSDT": list(range(20, 92))}, sim_start_iso="", sim_end_iso="") + try: + with self.assertRaises(ValueError): + backtest_service.run_backtest(config, dataset_path=str(path)) + finally: + path.unlink() + + def test_run_backtest_tracks_equity_curve(self) -> None: + config = self._config() + # Need ~72 candles to cover 2025-12-28 through 2026-01-01 (warmup + simulation) + closes = list(range(20, 92)) + path = self._make_dataset({"BTCUSDT": closes}) + try: + result = backtest_service.run_backtest(config, dataset_path=str(path), initial_cash=10000.0) + self.assertTrue(len(result["equity_curve"]) > 0) + first = result["equity_curve"][0] + self.assertIn("time", first) + self.assertIn("equity", first) + self.assertIn("cash", first) + self.assertIn("positions_count", first) + finally: + path.unlink() diff --git a/tests/test_cli.py b/tests/test_cli.py index fd004b0..93eb106 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -336,6 +336,76 @@ class CLITestCase(unittest.TestCase): max_examples=5, ) + def test_strategy_dispatches(self): + captured = {} + with ( + patch.object( + cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}} + ), + patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}), + patch.object(cli, "SpotBinanceClient"), + patch.object( + cli.strategy_service, + "generate_trade_signals", + return_value={"buy": [{"symbol": "BTCUSDT", "score": 0.82}], "sell": [], "hold": []}, + ), + patch.object( + cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) + ), + ): + result = cli.main(["strategy", "-s", "BTCUSDT"]) + self.assertEqual(result, 0) + self.assertEqual(captured["payload"]["buy"][0]["symbol"], "BTCUSDT") + + def test_backtest_dispatches_without_private_client(self): + captured = {} + config = {"market": {"default_quote": "USDT"}, "opportunity": {}} + with ( + patch.object(cli, "load_config", return_value=config), + patch.object(cli, "_load_spot_client", side_effect=AssertionError("backtest should use dataset only")), + patch.object( + cli.backtest_service, + "run_backtest", + return_value={"summary": {"total_return_pct": 5.0, "win_rate": 0.6}, "trades": []}, + ) as backtest_mock, + patch.object( + cli, + "print_output", + side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}), + ), + ): + result = cli.main( + [ + "backtest", + "/tmp/dataset.json", + "--initial-cash", + "5000", + "--max-positions", + "3", + "--position-size-pct", + "20", + "--commission-pct", + "0.1", + "--lookback", + "12", + "--agent", + ] + ) + + self.assertEqual(result, 0) + self.assertEqual(captured["payload"]["summary"]["total_return_pct"], 5.0) + self.assertTrue(captured["agent"]) + backtest_mock.assert_called_once_with( + config, + dataset_path="/tmp/dataset.json", + initial_cash=5000.0, + max_positions=3, + position_size_pct=0.2, + commission_pct=0.001, + lookback=12, + decision_interval_minutes=None, + ) + def test_opportunity_optimize_dispatches_without_private_client(self): captured = {} config = {"market": {"default_quote": "USDT"}, "opportunity": {}} diff --git a/tests/test_strategy_service.py b/tests/test_strategy_service.py new file mode 100644 index 0000000..66583fa --- /dev/null +++ b/tests/test_strategy_service.py @@ -0,0 +1,100 @@ +"""Tests for strategy_service.""" + +from __future__ import annotations + +import unittest +from typing import Any +from unittest import mock +from unittest.mock import MagicMock + +from coinhunter.services import strategy_service + + +class StrategyServiceTestCase(unittest.TestCase): + def _klines(self, closes: list[float], volumes: list[float] | None = None) -> list[list[float]]: + volumes = volumes or [1.0] * len(closes) + return [ + [i * 3600000.0, c * 0.98, c * 1.02, c * 0.97, c, v, 0.0, c * v, 100, 0.0, 0.0, 0.0] + for i, (c, v) in enumerate(zip(closes, volumes)) + ] + + def _config(self) -> dict[str, Any]: + return { + "opportunity": { + "entry_threshold": 1.5, + "watch_threshold": 0.6, + "min_trigger_score": 0.45, + "min_setup_score": 0.35, + "overlap_penalty": 0.6, + "top_n": 10, + "scan_limit": 50, + "kline_limit": 48, + "weights": {}, + "model_weights": {}, + }, + "portfolio": { + "add_threshold": 1.5, + "hold_threshold": 0.6, + "trim_threshold": 0.2, + "exit_threshold": -0.2, + "max_position_weight": 0.6, + }, + "signal": { + "lookback_interval": "1h", + }, + "market": { + "default_quote": "USDT", + }, + } + + def test_generate_signals_from_klines_buy_when_entry_and_not_held(self) -> None: + config = self._config() + closes = list(range(20, 40)) + klines = {"BTCUSDT": self._klines(closes)} + result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=[]) + self.assertIn("buy", result) + self.assertIn("sell", result) + self.assertIn("hold", result) + + def test_generate_signals_from_klines_sell_when_exit_signal(self) -> None: + config = self._config() + closes = list(range(40, 20, -1)) + klines = {"BTCUSDT": self._klines(closes)} + held = [{"symbol": "BTCUSDT", "notional_usdt": 1000.0}] + result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=held) + symbols = [s["symbol"] for s in result["sell"]] + self.assertIn("BTCUSDT", symbols) + + def test_generate_signals_respects_max_position_weight(self) -> None: + config = self._config() + config["portfolio"]["max_position_weight"] = 0.01 + closes = list(range(20, 40)) + klines = {"BTCUSDT": self._klines(closes)} + held = [{"symbol": "BTCUSDT", "notional_usdt": 9999.0}] + result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=held) + buy_symbols = [s["symbol"] for s in result["buy"]] + self.assertNotIn("BTCUSDT", buy_symbols) + + @mock.patch("coinhunter.services.portfolio_service.audit_event") + @mock.patch("coinhunter.services.opportunity_service.audit_event") + def test_generate_trade_signals_dispatches_to_services(self, mock_audit_opp, mock_audit_pf) -> None: + mock_client = MagicMock() + mock_client.klines.return_value = self._klines(list(range(20, 44))) + mock_client.ticker_stats.return_value = [ + { + "symbol": "BTCUSDT", + "lastPrice": "30.0", + "priceChangePercent": "5.0", + "quoteVolume": "1000000", + "highPrice": "31.0", + "lowPrice": "29.0", + } + ] + mock_client.account.return_value = {"balances": [{"asset": "BTC", "free": "0.5", "locked": "0.0"}]} + mock_client.exchange_info.return_value = {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}]} + + config = self._config() + result = strategy_service.generate_trade_signals(config, spot_client=mock_client) + self.assertIn("buy", result) + self.assertIn("sell", result) + self.assertIn("hold", result)