feat: add strategy and backtest services

- strategy_service.py combines opportunity + portfolio signals into
  unified buy/sell/hold recommendations
- backtest_service.py runs walk-forward backtests on historical datasets
  with virtual cash and positions
- CLI adds `strategy` and `backtest` commands with `--decision-interval`
  and other tuning parameters
- Add tests for both services and CLI dispatch
- Update CLAUDE.md with new architecture docs
- Optimize model weights via opportunity optimizer

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Carlos Ouyang committed 2026-04-27 13:21:35 +08:00
commit e4b2239bcd (parent 10b314aa2b)
7 changed files with 1078 additions and 10 deletions


@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Development commands
- **Install (dev):** `pip install -e ".[dev]"` or `conda env create -f environment.yml && conda activate coinhunter`
- **Run CLI locally:** `python -m coinhunter --help`
- **Run tests:** `pytest` or `python -m pytest tests/`
- **Run single test file:** `pytest tests/test_cli.py -v`
@@ -16,18 +16,20 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
CoinHunter V2 is a Binance-first crypto trading CLI with a flat, direct architecture:
- **`src/coinhunter/cli.py`** — Single entrypoint (`main()`). Uses `argparse` to parse commands and directly dispatches to service functions. There is no separate `commands/` adapter layer. Top-level commands include `init`, `config`, `account`, `market`, `buy`, `sell`, `portfolio`, `opportunity`, `strategy`, `backtest`, `catlog`, `upgrade`, and `completion`.
- **`src/coinhunter/services/`** — Contains all domain logic:
- `account_service.py` — balances, positions, overview
- `market_service.py` — tickers, klines, scan universe, symbol normalization
- `signal_service.py` — shared market signal scoring used by scan and portfolio analysis
- `portfolio_service.py` — held-position review and add/hold/trim/exit recommendations
- `trade_service.py` — spot order execution only
- `opportunity_service.py` — market scanning and entry/watch/skip recommendations
- `opportunity_dataset_service.py` — historical kline dataset collection for backtesting
- `opportunity_evaluation_service.py` — walk-forward evaluation and model-weight optimization
- `research_service.py` — external research signal providers for opportunity scoring
- `strategy_service.py` — combines opportunity scanning and portfolio analysis into unified buy/sell/hold trade signals
- `backtest_service.py` — walk-forward backtest engine using historical kline datasets with virtual cash and positions
- **`src/coinhunter/binance/spot_client.py`** — Thin wrapper around `binance.spot.Spot`. Normalizes request errors into `RuntimeError` and handles single/multi-symbol ticker responses.
- **`src/coinhunter/config.py`** — `load_config()`, `get_binance_credentials()`, `ensure_init_files()`.
- **`src/coinhunter/runtime.py`** — `RuntimePaths`, `get_runtime_paths()`, `print_json()`.
- **`src/coinhunter/audit.py`** — Writes JSONL audit events to dated files.
@@ -39,6 +41,7 @@ User data lives in `~/.coinhunter/` by default (override with `COINHUNTER_HOME`)
- `config.toml` — runtime, binance, trading, signal, opportunity, and portfolio settings
- `.env` — `BINANCE_API_KEY` and `BINANCE_API_SECRET`
- `logs/audit_YYYYMMDD.jsonl` — structured audit log
- `logs/dry-run/audit_YYYYMMDD.jsonl` — dry-run audit log
Run `coinhunter init` to generate the config and env templates.
@@ -46,8 +49,10 @@ Run `coinhunter init` to generate the config and env templates.
- **Symbol normalization:** `market_service.normalize_symbol()` strips `/`, `-`, `_`, and uppercases the symbol. CLI inputs like `ETH/USDT`, `eth-usdt`, and `ETHUSDT` are all normalized to `ETHUSDT`.
- **Dry-run behavior:** Trade commands support `--dry-run`. If omitted, the default falls back to `trading.dry_run_default` in `config.toml`.
- **Client injection:** Service functions accept `spot_client` as a keyword argument. This enables easy unit testing with mocks.
- **Error handling:** `spot_client.py` catches `requests.exceptions.SSLError` and `RequestException` and re-raises as human-readable `RuntimeError`. The CLI catches all exceptions in `main()` and prints `error: {message}` to stderr with exit code 1.
- **Ticker API fallback:** `spot_client.ticker_stats()` uses `rolling_window_ticker` for symbol-specific queries and `ticker_24hr` for full-market scans (no symbols).
- **Output modes:** All commands support `--agent` for JSON output and `--doc` to print the command's output schema.
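The normalization rule described above can be sketched in a few lines (an illustrative reimplementation for reference, not the actual `market_service` code):

```python
import re

def normalize_symbol(symbol: str) -> str:
    # Strip "/", "-", "_" separators and uppercase, as described above
    return re.sub(r"[/\-_]", "", symbol).upper()

print(normalize_symbol("eth-usdt"))  # ETHUSDT
```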
## Testing
@@ -56,8 +61,12 @@ Tests live in `tests/` and use `unittest.TestCase` with `unittest.mock.patch`. T
- `test_cli.py` — parser smoke tests and dispatch behavior
- `test_config_runtime.py` — config loading, env parsing, path resolution
- `test_account_market_services.py` — balance/position/ticker/klines logic with mocked clients
- `test_trade_service.py` — spot trade execution paths
- `test_opportunity_service.py` — portfolio and scan scoring logic
- `test_opportunity_dataset_service.py` — dataset collection and walk-forward evaluation
- `test_opportunity_evaluation_service.py` — model weight optimization
- `test_strategy_service.py` — combined signal generation logic
- `test_backtest_service.py` — historical backtest engine
## Notes


@@ -26,11 +26,13 @@ from .runtime import (
)
from .services import (
account_service,
backtest_service,
market_service,
opportunity_dataset_service,
opportunity_evaluation_service,
opportunity_service,
portfolio_service,
strategy_service,
trade_service,
)
@@ -45,6 +47,8 @@ examples:
coin opportunity -s BTCUSDT ETHUSDT
coin opportunity evaluate ~/.coinhunter/datasets/opportunity_dataset.json --agent
coin opportunity optimize ~/.coinhunter/datasets/opportunity_dataset.json --agent
coin strategy -s BTCUSDT ETHUSDT
coin backtest ~/.coinhunter/datasets/opportunity_dataset_20260101T000000Z.json
coin upgrade
"""
@@ -967,6 +971,26 @@ def build_parser() -> argparse.ArgumentParser:
optimize_parser.add_argument("--passes", type=int, default=2, help="Coordinate-search passes over model weights")
_add_global_flags(optimize_parser)
strategy_parser = subparsers.add_parser(
"strategy", aliases=["strat", "st"], help="Combined opportunity + portfolio trade signals",
description="Generate unified buy/sell/hold signals by combining opportunity scanning and portfolio analysis.",
)
strategy_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols")
_add_global_flags(strategy_parser)
backtest_parser = subparsers.add_parser(
"backtest", aliases=["bt"], help="Backtest combined strategy on historical dataset",
description="Run a walk-forward backtest using historical kline datasets with virtual cash and positions.",
)
backtest_parser.add_argument("dataset", help="Path to an opportunity dataset JSON file")
backtest_parser.add_argument("--initial-cash", type=float, help="Initial cash allocation (default: 10000)")
backtest_parser.add_argument("--max-positions", type=int, help="Maximum simultaneous positions (default: 5)")
backtest_parser.add_argument("--position-size-pct", type=float, help="Cash percentage per position (default: 0.2)")
backtest_parser.add_argument("--commission-pct", type=float, help="Commission per trade in percent (default: 0.1)")
backtest_parser.add_argument("--lookback", type=int, help="Closed candles used for each point-in-time score")
backtest_parser.add_argument("--decision-interval", type=int, help="Minimum minutes between decision points (default: 0 = every candle)")
_add_global_flags(backtest_parser)
upgrade_parser = subparsers.add_parser(
"upgrade", help="Upgrade coinhunter to the latest version",
description="Upgrade the coinhunter package using pipx (preferred) or pip.",
@@ -1006,6 +1030,9 @@ _CANONICAL_COMMANDS = {
"o": "opportunity", "o": "opportunity",
"cfg": "config", "cfg": "config",
"c": "config", "c": "config",
"strat": "strategy",
"st": "strategy",
"bt": "backtest",
}
_CANONICAL_SUBCOMMANDS = {
@@ -1263,6 +1290,30 @@ def main(argv: list[str] | None = None) -> int:
print_output(result, agent=args.agent)
return 0
if args.command == "strategy":
spot_client = _load_spot_client(config)
with with_spinner("Generating trade signals...", enabled=not args.agent):
result = strategy_service.generate_trade_signals(
config, spot_client=spot_client, symbols=args.symbols
)
print_output(result, agent=args.agent)
return 0
if args.command == "backtest":
with with_spinner("Running backtest...", enabled=not args.agent):
result = backtest_service.run_backtest(
config,
dataset_path=args.dataset,
initial_cash=args.initial_cash,
max_positions=args.max_positions,
position_size_pct=args.position_size_pct / 100.0 if args.position_size_pct is not None else None,
commission_pct=args.commission_pct / 100.0 if args.commission_pct is not None else None,
lookback=args.lookback,
decision_interval_minutes=args.decision_interval,
)
print_output(result, agent=args.agent)
return 0
if args.command == "opportunity":
if args.opportunity_command == "optimize":
with with_spinner("Optimizing opportunity model...", enabled=not args.agent):


@@ -0,0 +1,370 @@
"""Backtest engine for combined opportunity + portfolio strategy."""
from __future__ import annotations
import json
from collections import defaultdict
from copy import deepcopy
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from pathlib import Path
from statistics import mean
from typing import Any
from .market_service import normalize_symbol
from .signal_service import get_signal_interval, score_opportunity_signal, score_portfolio_signal
from .strategy_service import generate_signals_from_klines
@dataclass
class Position:
symbol: str
qty: float
entry_price: float
entry_time: str
notional_usdt: float
@dataclass
class Trade:
time: str
symbol: str
side: str
price: float
qty: float
notional: float
commission: float
reason: str
def _as_float(value: Any, default: float = 0.0) -> float:
try:
return float(value)
except (TypeError, ValueError):
return default
def _as_int(value: Any, default: int = 0) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
def _parse_dt(value: Any) -> datetime | None:
if not value:
return None
try:
return datetime.fromisoformat(str(value).replace("Z", "+00:00")).astimezone(timezone.utc)
except ValueError:
return None
def _iso_from_ms(value: int) -> str:
return (
datetime.fromtimestamp(value / 1000, tz=timezone.utc)
.replace(microsecond=0)
.isoformat()
.replace("+00:00", "Z")
)
def _close(row: list[Any]) -> float:
return _as_float(row[4])
def _open_ms(row: list[Any]) -> int:
return int(row[0])
def _ticker_from_window(symbol: str, rows: list[list[Any]]) -> dict[str, Any]:
first = _close(rows[0])
last = _close(rows[-1])
price_change_pct = ((last - first) / first * 100.0) if first else 0.0
return {
"symbol": symbol,
"price_change_pct": price_change_pct,
"quote_volume": sum(_close(row) * _as_float(row[5]) for row in rows),
"high_price": max(_as_float(row[2]) for row in rows),
"low_price": min(_as_float(row[3]) for row in rows),
}
def _window_series(rows: list[list[Any]]) -> tuple[list[float], list[float]]:
return [_close(row) for row in rows], [_as_float(row[5]) for row in rows]
def _portfolio_value(cash: float, positions: list[Position], prices: dict[str, float]) -> float:
return cash + sum(p.qty * prices.get(p.symbol, p.entry_price) for p in positions)
def _pct(new: float, old: float) -> float:
if old == 0:
return 0.0
return (new - old) / old
def run_backtest(
config: dict[str, Any],
*,
dataset_path: str,
initial_cash: float | None = None,
max_positions: int | None = None,
position_size_pct: float | None = None,
commission_pct: float | None = None,
lookback: int | None = None,
decision_interval_minutes: int | None = None,
) -> dict[str, Any]:
"""Run a walk-forward backtest using historical kline datasets.
Maintains virtual cash and positions. At each decision point:
1. Sells positions where portfolio signals "exit" or "trim"
2. Buys top opportunity "entry" signals within cash and position limits
"""
dataset_file = Path(dataset_path).expanduser()
dataset = json.loads(dataset_file.read_text(encoding="utf-8"))
metadata = dataset.get("metadata", {})
plan = metadata.get("plan", {})
klines = dataset.get("klines", {})
intervals = list(plan.get("intervals") or [])
configured_interval = get_signal_interval(config)
primary_interval = configured_interval if configured_interval in intervals else (intervals[0] if intervals else "1h")
simulation_start = _parse_dt(plan.get("simulation_start"))
simulation_end = _parse_dt(plan.get("simulation_end"))
if simulation_start is None or simulation_end is None:
raise ValueError("dataset metadata must include plan.simulation_start and plan.simulation_end")
opportunity_config = config.get("opportunity", {})
portfolio_config = config.get("portfolio", {})
cash = _as_float(initial_cash, 10000.0)
max_pos = _as_int(max_positions, _as_int(portfolio_config.get("max_positions"), 5))
size_pct = _as_float(position_size_pct, _as_float(opportunity_config.get("backtest_position_size_pct"), 0.2))
commission = _as_float(commission_pct, _as_float(config.get("trading", {}).get("commission_pct"), 0.001))
lookback_bars = lookback or _as_int(opportunity_config.get("evaluation_lookback"), 24)
start_ms = int(simulation_start.timestamp() * 1000)
end_ms = int(simulation_end.timestamp() * 1000)
rows_by_symbol: dict[str, list[list[Any]]] = {}
index_by_symbol: dict[str, dict[int, int]] = {}
for symbol, by_interval in klines.items():
rows = by_interval.get(primary_interval, [])
normalized = normalize_symbol(symbol)
if rows:
rows_by_symbol[normalized] = rows
index_by_symbol[normalized] = {_open_ms(row): index for index, row in enumerate(rows)}
decision_times = sorted(
{
_open_ms(row)
for rows in rows_by_symbol.values()
for row in rows
if start_ms <= _open_ms(row) < end_ms
}
)
interval_ms = _as_int(decision_interval_minutes, 0) * 60 * 1000
if interval_ms > 0 and decision_times:
filtered: list[int] = [decision_times[0]]
anchor = decision_times[0]
for t in decision_times:
if t - anchor >= interval_ms:
filtered.append(t)
anchor = t
decision_times = filtered
positions: list[Position] = []
trades: list[dict[str, Any]] = []
equity_curve: list[dict[str, Any]] = []
skipped_warmup = 0
skipped_missing_future = 0
for decision_time in decision_times:
current_prices: dict[str, float] = {}
klines_snapshot: dict[str, list[list[Any]]] = {}
for symbol, rows in rows_by_symbol.items():
index = index_by_symbol[symbol].get(decision_time)
if index is None:
continue
window = rows[max(0, index - lookback_bars + 1) : index + 1]
if len(window) < lookback_bars:
skipped_warmup += 1
continue
future_rows = rows[index + 1 :]
if not future_rows:
skipped_missing_future += 1
continue
klines_snapshot[symbol] = window
current_prices[symbol] = _close(window[-1])
# Build held positions for portfolio signal generation
held_positions = [
{
"symbol": p.symbol,
"notional_usdt": p.qty * current_prices.get(p.symbol, p.entry_price),
}
for p in positions
if p.symbol in current_prices
]
signals = generate_signals_from_klines(config, klines_by_symbol=klines_snapshot, held_positions=held_positions)
# Execute sells first to free cash
sell_symbols = {normalize_symbol(s["symbol"]) for s in signals.get("sell", [])}
new_positions: list[Position] = []
for pos in positions:
if pos.symbol in sell_symbols and pos.symbol in current_prices:
price = current_prices[pos.symbol]
notional = pos.qty * price
comm = notional * commission
cash += notional - comm
trades.append(
asdict(
Trade(
time=_iso_from_ms(decision_time),
symbol=pos.symbol,
side="SELL",
price=round(price, 8),
qty=round(pos.qty, 8),
notional=round(notional, 4),
commission=round(comm, 4),
reason="portfolio signal: exit/trim",
)
)
)
else:
new_positions.append(pos)
positions = new_positions
# Execute buys with available cash
available_slots = max_pos - len(positions)
if available_slots > 0 and cash > 0:
for buy_signal in signals.get("buy", [])[:available_slots]:
symbol = normalize_symbol(buy_signal["symbol"])
if symbol not in current_prices:
continue
# Skip if already held
if any(p.symbol == symbol for p in positions):
continue
price = current_prices[symbol]
allocation = cash * size_pct
if allocation <= 0:
continue
qty = allocation / price
comm = allocation * commission
actual_notional = allocation - comm
if actual_notional <= 0:
continue
cash -= allocation
positions.append(
Position(
symbol=symbol,
qty=round(qty, 8),
entry_price=round(price, 8),
entry_time=_iso_from_ms(decision_time),
notional_usdt=round(actual_notional, 4),
)
)
trades.append(
asdict(
Trade(
time=_iso_from_ms(decision_time),
symbol=symbol,
side="BUY",
price=round(price, 8),
qty=round(qty, 8),
notional=round(allocation, 4),
commission=round(comm, 4),
reason=buy_signal.get("reasons", ["opportunity entry"])[0],
)
)
)
# Record equity
equity = _portfolio_value(cash, positions, current_prices)
equity_curve.append(
{
"time": _iso_from_ms(decision_time),
"equity": round(equity, 4),
"cash": round(cash, 4),
"positions_count": len(positions),
}
)
# Final valuation
final_prices: dict[str, float] = {}
for symbol, rows in rows_by_symbol.items():
if rows:
final_prices[symbol] = _close(rows[-1])
final_equity = _portfolio_value(cash, positions, final_prices)
if equity_curve:
equity_curve[-1]["equity"] = round(final_equity, 4)
# Performance metrics
initial_equity = equity_curve[0]["equity"] if equity_curve else cash
total_return = _pct(final_equity, initial_equity)
equity_values = [e["equity"] for e in equity_curve]
peak = initial_equity
max_drawdown = 0.0
for val in equity_values:
if val > peak:
peak = val
dd = _pct(val, peak)
if dd < max_drawdown:
max_drawdown = dd
buy_trades = [t for t in trades if t["side"] == "BUY"]
sell_trades = [t for t in trades if t["side"] == "SELL"]
trade_returns: list[float] = []
position_map: dict[str, dict[str, Any]] = {}
for t in buy_trades:
position_map[t["symbol"]] = t
for t in sell_trades:
buy_trade = position_map.get(t["symbol"])
if buy_trade:
trade_return = _pct(t["notional"] - t["commission"], buy_trade["notional"] + buy_trade["commission"])
trade_returns.append(trade_return)
wins = sum(1 for r in trade_returns if r > 0)
losses = len(trade_returns) - wins
return {
"summary": {
"initial_cash": round(initial_equity, 4),
"final_equity": round(final_equity, 4),
"total_return_pct": round(total_return * 100, 4),
"max_drawdown_pct": round(max_drawdown * 100, 4),
"buy_signals": len(buy_trades),
"sell_signals": len(sell_trades),
"completed_trades": len(trade_returns),
"win_rate": round(wins / len(trade_returns), 4) if trade_returns else 0.0,
"wins": wins,
"losses": losses,
"avg_trade_return_pct": round(mean(trade_returns) * 100, 4) if trade_returns else 0.0,
"open_positions": len(positions),
"decision_points": len(decision_times),
"skipped_warmup": skipped_warmup,
"skipped_missing_future": skipped_missing_future,
},
"trades": trades,
"equity_curve": equity_curve,
"open_positions": [asdict(p) for p in positions],
"parameters": {
"dataset": str(dataset_file),
"interval": primary_interval,
"initial_cash": cash if not trades else initial_equity,
"max_positions": max_pos,
"position_size_pct": size_pct,
"commission_pct": commission,
"lookback_bars": lookback_bars,
"decision_interval_minutes": _as_int(decision_interval_minutes, 0),
},
}
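The drawdown metric in `run_backtest` tracks a running equity peak and records the worst peak-to-trough decline as a signed fraction. Pulled out of the loop above into a standalone sketch of the same logic:

```python
def max_drawdown(equity_values: list[float], initial_equity: float) -> float:
    # Running peak starts at initial equity; drawdown is (value - peak) / peak,
    # so the result is 0.0 or negative (e.g. -0.25 for a 25% decline)
    peak = initial_equity
    worst = 0.0
    for value in equity_values:
        if value > peak:
            peak = value
        drawdown = (value - peak) / peak if peak else 0.0
        if drawdown < worst:
            worst = drawdown
    return worst

print(max_drawdown([100.0, 120.0, 90.0, 110.0], 100.0))  # -0.25
```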


@@ -0,0 +1,339 @@
"""Unified strategy combining opportunity scanning and portfolio management."""
from __future__ import annotations
from dataclasses import asdict, dataclass
from typing import Any
from .market_service import normalize_symbol
from .opportunity_service import (
_action_for_opportunity,
_opportunity_thresholds,
scan_opportunities,
)
from .portfolio_service import (
_action_for_position,
_portfolio_thresholds,
analyze_portfolio,
)
from .signal_service import score_opportunity_signal, score_portfolio_signal
@dataclass
class TradeSignal:
symbol: str
action: str
side: str
score: float
reasons: list[str]
opportunity_metrics: dict[str, float]
portfolio_metrics: dict[str, float]
def _held_symbols(positions: list[dict[str, Any]]) -> set[str]:
return {normalize_symbol(p["symbol"]) for p in positions}
def generate_trade_signals(
config: dict[str, Any],
*,
spot_client: Any,
symbols: list[str] | None = None,
) -> dict[str, Any]:
"""Combine opportunity and portfolio signals into unified buy/sell/hold recommendations.
Buy criteria:
- Opportunity action is "entry"
- Not already held OR portfolio allows "add"
- Position concentration below max weight
Sell criteria:
- Position exists and portfolio action is "exit" or "trim"
Hold criteria:
- Position exists and portfolio action is "hold"
"""
portfolio_config = config.get("portfolio", {})
max_position_weight = float(portfolio_config.get("max_position_weight", 0.6))
opp_result = scan_opportunities(config, spot_client=spot_client, symbols=symbols)
pf_result = analyze_portfolio(config, spot_client=spot_client)
held = {normalize_symbol(p["symbol"]): p for p in pf_result.get("recommendations", [])}
total_notional = sum(p.get("metrics", {}).get("position_weight", 0) for p in held.values()) or 1.0
buys: list[dict[str, Any]] = []
sells: list[dict[str, Any]] = []
holds: list[dict[str, Any]] = []
for rec in opp_result.get("recommendations", []):
symbol = normalize_symbol(rec["symbol"])
opp_action = rec["action"]
score = rec["score"]
reasons = list(rec.get("reasons", []))
opp_metrics = dict(rec.get("metrics", {}))
pf_rec = held.get(symbol)
pf_action = pf_rec["action"] if pf_rec else "none"
pf_metrics = dict(pf_rec.get("metrics", {})) if pf_rec else {}
concentration = pf_metrics.get("position_weight", 0.0)
if opp_action == "entry" and (symbol not in held or pf_action in ("add", "hold")):
if concentration < max_position_weight:
reasons.append(f"portfolio: {pf_action or 'not held'} -> buy")
buys.append(
asdict(
TradeSignal(
symbol=symbol,
action="buy",
side="BUY",
score=round(score, 4),
reasons=reasons,
opportunity_metrics=opp_metrics,
portfolio_metrics=pf_metrics,
)
)
)
else:
reasons.append(f"portfolio: position weight {concentration:.2%} at max -> skip")
holds.append(
asdict(
TradeSignal(
symbol=symbol,
action="hold",
side="HOLD",
score=round(score, 4),
reasons=reasons,
opportunity_metrics=opp_metrics,
portfolio_metrics=pf_metrics,
)
)
)
for symbol, pf_rec in held.items():
pf_action = pf_rec["action"]
score = pf_rec["score"]
reasons = list(pf_rec.get("reasons", []))
pf_metrics = dict(pf_rec.get("metrics", {}))
opp_rec = next((r for r in opp_result.get("recommendations", []) if normalize_symbol(r["symbol"]) == symbol), None)
opp_metrics = dict(opp_rec.get("metrics", {})) if opp_rec else {}
if pf_action in ("exit", "trim"):
reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> sell")
sells.append(
asdict(
TradeSignal(
symbol=symbol,
action="sell",
side="SELL",
score=round(score, 4),
reasons=reasons,
opportunity_metrics=opp_metrics,
portfolio_metrics=pf_metrics,
)
)
)
elif pf_action == "hold":
reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> hold")
holds.append(
asdict(
TradeSignal(
symbol=symbol,
action="hold",
side="HOLD",
score=round(score, 4),
reasons=reasons,
opportunity_metrics=opp_metrics,
portfolio_metrics=pf_metrics,
)
)
)
elif pf_action == "add":
# Already handled in buy loop if opp is entry; otherwise treat as hold
if not any(normalize_symbol(b["symbol"]) == symbol for b in buys):
reasons.append("opportunity: no entry signal -> hold")
holds.append(
asdict(
TradeSignal(
symbol=symbol,
action="hold",
side="HOLD",
score=round(score, 4),
reasons=reasons,
opportunity_metrics=opp_metrics,
portfolio_metrics=pf_metrics,
)
)
)
return {
"buy": sorted(buys, key=lambda item: item["score"], reverse=True),
"sell": sorted(sells, key=lambda item: item["score"]),
"hold": sorted(holds, key=lambda item: item["score"], reverse=True),
}
def _series_from_klines(klines: list[list[Any]]) -> tuple[list[float], list[float]]:
return [float(item[4]) for item in klines], [float(item[5]) for item in klines]
def generate_signals_from_klines(
config: dict[str, Any],
*,
klines_by_symbol: dict[str, list[list[Any]]],
held_positions: list[dict[str, Any]],
) -> dict[str, Any]:
"""Pure version of signal generation that works on in-memory klines.
Used by backtest to avoid network calls.
"""
opportunity_config = config.get("opportunity", {})
portfolio_config = config.get("portfolio", {})
thresholds = _opportunity_thresholds(config)
pf_thresholds = _portfolio_thresholds(config)
max_position_weight = pf_thresholds["max_position_weight"]
weights = opportunity_config.get("weights", {})
signal_weights = config.get("signal", {})
top_n = int(opportunity_config.get("top_n", 10))
held = {normalize_symbol(p["symbol"]): p for p in held_positions}
total_notional = sum(p.get("notional_usdt", 0) for p in held_positions) or 1.0
opp_candidates: list[dict[str, Any]] = []
for symbol, rows in klines_by_symbol.items():
if len(rows) < 6:
continue
closes, volumes = _series_from_klines(rows)
ticker = {
"symbol": symbol,
"price_change_pct": ((closes[-1] - closes[0]) / closes[0] * 100) if closes[0] else 0.0,
"quote_volume": sum(c * v for c, v in zip(closes, volumes)),
}
opportunity_score, metrics = score_opportunity_signal(closes, volumes, ticker, opportunity_config)
score = opportunity_score
metrics["opportunity_score"] = round(opportunity_score, 4)
action, reasons, _confidence = _action_for_opportunity(score, metrics, thresholds)
opp_candidates.append({
"symbol": symbol,
"action": action,
"score": round(score, 4),
"metrics": metrics,
"reasons": reasons,
})
pf_results: dict[str, dict[str, Any]] = {}
for symbol, position in held.items():
rows = klines_by_symbol.get(symbol, [])
if len(rows) < 2:
continue
closes, volumes = _series_from_klines(rows)
ticker = {"price_change_pct": ((closes[-1] - closes[0]) / closes[0] * 100) if closes[0] else 0.0}
concentration = position.get("notional_usdt", 0) / total_notional
score, metrics = score_portfolio_signal(closes, volumes, ticker, signal_weights)
pf_action, pf_reasons = _action_for_position(score, concentration, pf_thresholds)
metrics["position_weight"] = round(concentration, 4)
pf_results[symbol] = {
"symbol": symbol,
"action": pf_action,
"score": round(score, 4),
"reasons": pf_reasons,
"metrics": metrics,
"notional_usdt": position.get("notional_usdt", 0),
}
buys: list[dict[str, Any]] = []
sells: list[dict[str, Any]] = []
holds: list[dict[str, Any]] = []
for rec in sorted(opp_candidates, key=lambda item: item["score"], reverse=True)[:top_n]:
symbol = normalize_symbol(rec["symbol"])
opp_action = rec["action"]
score = rec["score"]
reasons = list(rec.get("reasons", []))
opp_metrics = dict(rec.get("metrics", {}))
pf_rec = pf_results.get(symbol)
pf_action = pf_rec["action"] if pf_rec else "none"
pf_metrics = dict(pf_rec.get("metrics", {})) if pf_rec else {}
concentration = pf_metrics.get("position_weight", 0.0)
if opp_action == "entry" and (symbol not in held or pf_action in ("add", "hold")):
if concentration < max_position_weight:
reasons.append(f"portfolio: {pf_action or 'not held'} -> buy")
buys.append(
asdict(
TradeSignal(
symbol=symbol,
action="buy",
side="BUY",
score=round(score, 4),
reasons=reasons,
opportunity_metrics=opp_metrics,
portfolio_metrics=pf_metrics,
)
)
)
else:
reasons.append(f"portfolio: position weight {concentration:.2%} at max -> skip")
for symbol, pf_rec in pf_results.items():
pf_action = pf_rec["action"]
score = pf_rec["score"]
reasons = list(pf_rec.get("reasons", []))
pf_metrics = dict(pf_rec.get("metrics", {}))
opp_rec = next((r for r in opp_candidates if normalize_symbol(r["symbol"]) == symbol), None)
opp_metrics = dict(opp_rec.get("metrics", {})) if opp_rec else {}
if pf_action in ("exit", "trim"):
reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> sell")
sells.append(
asdict(
TradeSignal(
symbol=symbol,
action="sell",
side="SELL",
score=round(score, 4),
reasons=reasons,
opportunity_metrics=opp_metrics,
portfolio_metrics=pf_metrics,
)
)
)
elif pf_action == "hold":
reasons.append(f"opportunity: {opp_rec['action'] if opp_rec else 'not in scan'} -> hold")
holds.append(
asdict(
TradeSignal(
symbol=symbol,
action="hold",
side="HOLD",
score=round(score, 4),
reasons=reasons,
opportunity_metrics=opp_metrics,
portfolio_metrics=pf_metrics,
)
)
)
elif pf_action == "add":
if not any(normalize_symbol(b["symbol"]) == symbol for b in buys):
reasons.append("opportunity: no entry signal -> hold")
holds.append(
asdict(
TradeSignal(
symbol=symbol,
action="hold",
side="HOLD",
score=round(score, 4),
reasons=reasons,
opportunity_metrics=opp_metrics,
portfolio_metrics=pf_metrics,
)
)
)
return {
"buy": sorted(buys, key=lambda item: item["score"], reverse=True),
"sell": sorted(sells, key=lambda item: item["score"]),
"hold": sorted(holds, key=lambda item: item["score"], reverse=True),
}
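The three buckets above are returned with a fixed ranking convention: buys and holds descend by score, while sells ascend so the weakest positions surface first. A minimal standalone sketch of that convention (symbols and scores are made up for illustration):

```python
# Illustrative signal buckets; symbols and scores are invented.
signals = {
    "buy": [{"symbol": "ETHUSDT", "score": 0.7}, {"symbol": "BTCUSDT", "score": 0.9}],
    "sell": [{"symbol": "DOGEUSDT", "score": -0.5}, {"symbol": "ADAUSDT", "score": -0.9}],
    "hold": [{"symbol": "BNBUSDT", "score": 0.4}],
}

# Same ordering as the return statement above: strongest buys/holds first,
# lowest-scoring sells first (most urgent exits at the top).
ranked = {
    "buy": sorted(signals["buy"], key=lambda item: item["score"], reverse=True),
    "sell": sorted(signals["sell"], key=lambda item: item["score"]),
    "hold": sorted(signals["hold"], key=lambda item: item["score"], reverse=True),
}
```

A consumer iterating `ranked["sell"]` therefore handles the most negative scores first.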


@@ -0,0 +1,129 @@
"""Tests for backtest_service."""
from __future__ import annotations
import json
import tempfile
import unittest
from pathlib import Path
from typing import Any
from coinhunter.services import backtest_service
class BacktestServiceTestCase(unittest.TestCase):
def _klines(self, closes: list[float], start_ms: int = 0, volumes: list[float] | None = None) -> list[list[float]]:
volumes = volumes or [1.0] * len(closes)
return [
[start_ms + i * 3600000, c * 0.98, c * 1.02, c * 0.97, c, v, 0.0, c * v, 100, 0.0, 0.0, 0.0]
for i, (c, v) in enumerate(zip(closes, volumes))
]
def _config(self) -> dict[str, Any]:
return {
"opportunity": {
"entry_threshold": 1.5,
"watch_threshold": 0.6,
"min_trigger_score": 0.45,
"min_setup_score": 0.35,
"overlap_penalty": 0.6,
"top_n": 10,
"scan_limit": 50,
"kline_limit": 48,
"weights": {},
"model_weights": {},
},
"portfolio": {
"add_threshold": 1.5,
"hold_threshold": 0.6,
"trim_threshold": 0.2,
"exit_threshold": -0.2,
"max_position_weight": 0.6,
"max_positions": 5,
},
"signal": {
"lookback_interval": "1h",
},
"market": {
"default_quote": "USDT",
},
"trading": {
"commission_pct": 0.001,
},
}
def _make_dataset(self, closes_by_symbol: dict[str, list[float]], start_iso: str = "2025-12-28T00:00:00Z", sim_start_iso: str = "2025-12-30T00:00:00Z", sim_end_iso: str = "2026-01-01T00:00:00Z") -> Path:
from datetime import datetime, timezone
start_ms = int(datetime.fromisoformat(start_iso.replace("Z", "+00:00")).timestamp() * 1000)
klines: dict[str, dict[str, list[list[float]]]] = {}
for symbol, closes in closes_by_symbol.items():
klines[symbol] = {"1h": self._klines(closes, start_ms=start_ms)}
dataset = {
"metadata": {
"created_at": "2026-01-01T00:00:00Z",
"quote": "USDT",
"symbols": list(closes_by_symbol.keys()),
"plan": {
"intervals": ["1h"],
"kline_limit": 48,
"reference_days": 2.0,
"simulate_days": 1.0,
"run_days": 1.0,
"total_days": 4.0,
"start": start_iso,
"simulation_start": sim_start_iso,
"simulation_end": sim_end_iso,
"end": sim_end_iso,
},
"external_history": {"provider": "disabled", "status": "disabled"},
},
"klines": klines,
}
fp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
json.dump(dataset, fp)
fp.close()
return Path(fp.name)
def test_run_backtest_produces_summary(self) -> None:
config = self._config()
closes = list(range(20, 92))
path = self._make_dataset({"BTCUSDT": closes})
try:
result = backtest_service.run_backtest(config, dataset_path=str(path), initial_cash=10000.0)
self.assertIn("summary", result)
self.assertIn("trades", result)
self.assertIn("equity_curve", result)
self.assertIn("parameters", result)
summary = result["summary"]
self.assertIn("initial_cash", summary)
self.assertIn("final_equity", summary)
self.assertIn("total_return_pct", summary)
self.assertIn("max_drawdown_pct", summary)
self.assertIn("win_rate", summary)
finally:
path.unlink()
def test_run_backtest_missing_simulation_dates_raises(self) -> None:
config = self._config()
path = self._make_dataset({"BTCUSDT": list(range(20, 92))}, sim_start_iso="", sim_end_iso="")
try:
with self.assertRaises(ValueError):
backtest_service.run_backtest(config, dataset_path=str(path))
finally:
path.unlink()
def test_run_backtest_tracks_equity_curve(self) -> None:
config = self._config()
# Need ~72 candles to cover 2025-12-28 through 2026-01-01 (warmup + simulation)
closes = list(range(20, 92))
path = self._make_dataset({"BTCUSDT": closes})
try:
result = backtest_service.run_backtest(config, dataset_path=str(path), initial_cash=10000.0)
self.assertTrue(len(result["equity_curve"]) > 0)
first = result["equity_curve"][0]
self.assertIn("time", first)
self.assertIn("equity", first)
self.assertIn("cash", first)
self.assertIn("positions_count", first)
finally:
path.unlink()
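The `_klines` helper above emits rows in the Binance REST kline layout: index 0 is the open time in milliseconds, indices 1-4 are open/high/low/close, and index 5 is the base-asset volume. A standalone sketch of how those indices map onto one row (values are illustrative, not from a live API):

```python
# One synthetic kline row in the same 12-field layout _klines builds:
# [open_time_ms, open, high, low, close, volume, close_time,
#  quote_volume, trade_count, taker_base, taker_quote, ignore]
row = [0, 98.0, 102.0, 97.0, 100.0, 1.5, 0.0, 150.0, 100, 0.0, 0.0, 0.0]

open_time_ms = row[0]
close = row[4]   # the services score on close prices
volume = row[5]  # and weight by base-asset volume
```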


@@ -336,6 +336,76 @@ class CLITestCase(unittest.TestCase):
max_examples=5,
)
def test_strategy_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.strategy_service,
"generate_trade_signals",
return_value={"buy": [{"symbol": "BTCUSDT", "score": 0.82}], "sell": [], "hold": []},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["strategy", "-s", "BTCUSDT"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["buy"][0]["symbol"], "BTCUSDT")
def test_backtest_dispatches_without_private_client(self):
captured = {}
config = {"market": {"default_quote": "USDT"}, "opportunity": {}}
with (
patch.object(cli, "load_config", return_value=config),
patch.object(cli, "_load_spot_client", side_effect=AssertionError("backtest should use dataset only")),
patch.object(
cli.backtest_service,
"run_backtest",
return_value={"summary": {"total_return_pct": 5.0, "win_rate": 0.6}, "trades": []},
) as backtest_mock,
patch.object(
cli,
"print_output",
side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}),
),
):
result = cli.main(
[
"backtest",
"/tmp/dataset.json",
"--initial-cash",
"5000",
"--max-positions",
"3",
"--position-size-pct",
"20",
"--commission-pct",
"0.1",
"--lookback",
"12",
"--agent",
]
)
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["summary"]["total_return_pct"], 5.0)
self.assertTrue(captured["agent"])
backtest_mock.assert_called_once_with(
config,
dataset_path="/tmp/dataset.json",
initial_cash=5000.0,
max_positions=3,
position_size_pct=0.2,
commission_pct=0.001,
lookback=12,
decision_interval_minutes=None,
)
def test_opportunity_optimize_dispatches_without_private_client(self):
captured = {}
config = {"market": {"default_quote": "USDT"}, "opportunity": {}}


@@ -0,0 +1,100 @@
"""Tests for strategy_service."""
from __future__ import annotations
import unittest
from typing import Any
from unittest import mock
from unittest.mock import MagicMock
from coinhunter.services import strategy_service
class StrategyServiceTestCase(unittest.TestCase):
def _klines(self, closes: list[float], volumes: list[float] | None = None) -> list[list[float]]:
volumes = volumes or [1.0] * len(closes)
return [
[i * 3600000.0, c * 0.98, c * 1.02, c * 0.97, c, v, 0.0, c * v, 100, 0.0, 0.0, 0.0]
for i, (c, v) in enumerate(zip(closes, volumes))
]
def _config(self) -> dict[str, Any]:
return {
"opportunity": {
"entry_threshold": 1.5,
"watch_threshold": 0.6,
"min_trigger_score": 0.45,
"min_setup_score": 0.35,
"overlap_penalty": 0.6,
"top_n": 10,
"scan_limit": 50,
"kline_limit": 48,
"weights": {},
"model_weights": {},
},
"portfolio": {
"add_threshold": 1.5,
"hold_threshold": 0.6,
"trim_threshold": 0.2,
"exit_threshold": -0.2,
"max_position_weight": 0.6,
},
"signal": {
"lookback_interval": "1h",
},
"market": {
"default_quote": "USDT",
},
}
def test_generate_signals_from_klines_buy_when_entry_and_not_held(self) -> None:
config = self._config()
closes = list(range(20, 40))
klines = {"BTCUSDT": self._klines(closes)}
result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=[])
self.assertIn("buy", result)
self.assertIn("sell", result)
self.assertIn("hold", result)
def test_generate_signals_from_klines_sell_when_exit_signal(self) -> None:
config = self._config()
closes = list(range(40, 20, -1))
klines = {"BTCUSDT": self._klines(closes)}
held = [{"symbol": "BTCUSDT", "notional_usdt": 1000.0}]
result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=held)
symbols = [s["symbol"] for s in result["sell"]]
self.assertIn("BTCUSDT", symbols)
def test_generate_signals_respects_max_position_weight(self) -> None:
config = self._config()
config["portfolio"]["max_position_weight"] = 0.01
closes = list(range(20, 40))
klines = {"BTCUSDT": self._klines(closes)}
held = [{"symbol": "BTCUSDT", "notional_usdt": 9999.0}]
result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=held)
buy_symbols = [s["symbol"] for s in result["buy"]]
self.assertNotIn("BTCUSDT", buy_symbols)
@mock.patch("coinhunter.services.portfolio_service.audit_event")
@mock.patch("coinhunter.services.opportunity_service.audit_event")
def test_generate_trade_signals_dispatches_to_services(self, mock_audit_opp, mock_audit_pf) -> None:
mock_client = MagicMock()
mock_client.klines.return_value = self._klines(list(range(20, 44)))
mock_client.ticker_stats.return_value = [
{
"symbol": "BTCUSDT",
"lastPrice": "30.0",
"priceChangePercent": "5.0",
"quoteVolume": "1000000",
"highPrice": "31.0",
"lowPrice": "29.0",
}
]
mock_client.account.return_value = {"balances": [{"asset": "BTC", "free": "0.5", "locked": "0.0"}]}
mock_client.exchange_info.return_value = {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}]}
config = self._config()
result = strategy_service.generate_trade_signals(config, spot_client=mock_client)
self.assertIn("buy", result)
self.assertIn("sell", result)
self.assertIn("hold", result)
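The max-position-weight test above exercises the concentration gate: a symbol whose notional already dominates equity must not be re-bought. A rough standalone sketch of that check (`passes_weight_gate` is a hypothetical helper for illustration, not part of the service API):

```python
def passes_weight_gate(notional_usdt: float, total_equity: float, max_weight: float = 0.6) -> bool:
    """Return True when the position's share of total equity stays under the cap."""
    if total_equity <= 0:
        # No measurable equity: refuse rather than divide by zero.
        return False
    concentration = notional_usdt / total_equity
    return concentration < max_weight
```

With the test's `max_position_weight = 0.01` and a 9999 USDT position against roughly 10000 USDT of equity, the gate rejects further buys, which is why `BTCUSDT` must be absent from the buy bucket.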