refactor: simplify CLI to data layer for AI-assisted trading

Transform CoinHunter from an over-engineered auto-trading system into a
lightweight data-layer CLI paired with the coinbuddy AI Skill.

Key changes:
- Remove non-core commands: backtest, strategy, opportunity dataset/evaluate/optimize
- Add scan: rule-based market screening (zero token cost)
- Add analyze: multi-timeframe technical analysis for AI consumption
- Add watch: lightweight portfolio anomaly monitoring (zero token cost)
- Remove services: backtest, dataset, evaluation, research, strategy
- Add analyze_service with RSI, key levels, alerts, and AI-friendly summaries
- Add watch_portfolio with drawdown/spike/concentration/technical triggers
- Simplify config: remove research/dataset settings, add watch thresholds
- Update TUI rendering for analyze and watch outputs
- Update tests and CLAUDE.md for new architecture

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-27 16:35:33 +08:00
parent e4b2239bcd
commit 76c4129c8d
18 changed files with 600 additions and 3142 deletions

View File

@@ -1,129 +0,0 @@
"""Tests for backtest_service."""
from __future__ import annotations
import json
import tempfile
import unittest
from pathlib import Path
from typing import Any
from coinhunter.services import backtest_service
class BacktestServiceTestCase(unittest.TestCase):
def _klines(self, closes: list[float], start_ms: int = 0, volumes: list[float] | None = None) -> list[list[float]]:
volumes = volumes or [1.0] * len(closes)
return [
[start_ms + i * 3600000, c * 0.98, c * 1.02, c * 0.97, c, v, 0.0, c * v, 100, 0.0, 0.0, 0.0]
for i, (c, v) in enumerate(zip(closes, volumes))
]
def _config(self) -> dict[str, Any]:
return {
"opportunity": {
"entry_threshold": 1.5,
"watch_threshold": 0.6,
"min_trigger_score": 0.45,
"min_setup_score": 0.35,
"overlap_penalty": 0.6,
"top_n": 10,
"scan_limit": 50,
"kline_limit": 48,
"weights": {},
"model_weights": {},
},
"portfolio": {
"add_threshold": 1.5,
"hold_threshold": 0.6,
"trim_threshold": 0.2,
"exit_threshold": -0.2,
"max_position_weight": 0.6,
"max_positions": 5,
},
"signal": {
"lookback_interval": "1h",
},
"market": {
"default_quote": "USDT",
},
"trading": {
"commission_pct": 0.001,
},
}
def _make_dataset(self, closes_by_symbol: dict[str, list[float]], start_iso: str = "2025-12-28T00:00:00Z", sim_start_iso: str = "2025-12-30T00:00:00Z", sim_end_iso: str = "2026-01-01T00:00:00Z") -> Path:
from datetime import datetime, timezone
start_ms = int(datetime.fromisoformat(start_iso.replace("Z", "+00:00")).timestamp() * 1000)
klines: dict[str, dict[str, list[list[float]]]] = {}
for symbol, closes in closes_by_symbol.items():
klines[symbol] = {"1h": self._klines(closes, start_ms=start_ms)}
dataset = {
"metadata": {
"created_at": "2026-01-01T00:00:00Z",
"quote": "USDT",
"symbols": list(closes_by_symbol.keys()),
"plan": {
"intervals": ["1h"],
"kline_limit": 48,
"reference_days": 2.0,
"simulate_days": 1.0,
"run_days": 1.0,
"total_days": 4.0,
"start": start_iso,
"simulation_start": sim_start_iso,
"simulation_end": sim_end_iso,
"end": sim_end_iso,
},
"external_history": {"provider": "disabled", "status": "disabled"},
},
"klines": klines,
}
fp = tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False)
json.dump(dataset, fp)
fp.close()
return Path(fp.name)
def test_run_backtest_produces_summary(self) -> None:
config = self._config()
closes = list(range(20, 92))
path = self._make_dataset({"BTCUSDT": closes})
try:
result = backtest_service.run_backtest(config, dataset_path=str(path), initial_cash=10000.0)
self.assertIn("summary", result)
self.assertIn("trades", result)
self.assertIn("equity_curve", result)
self.assertIn("parameters", result)
summary = result["summary"]
self.assertIn("initial_cash", summary)
self.assertIn("final_equity", summary)
self.assertIn("total_return_pct", summary)
self.assertIn("max_drawdown_pct", summary)
self.assertIn("win_rate", summary)
finally:
path.unlink()
def test_run_backtest_missing_simulation_dates_raises(self) -> None:
config = self._config()
path = self._make_dataset({"BTCUSDT": list(range(20, 92))}, sim_start_iso="", sim_end_iso="")
try:
with self.assertRaises(ValueError):
backtest_service.run_backtest(config, dataset_path=str(path))
finally:
path.unlink()
def test_run_backtest_tracks_equity_curve(self) -> None:
config = self._config()
# Need ~72 candles to cover 2025-12-28 through 2026-01-01 (warmup + simulation)
closes = list(range(20, 92))
path = self._make_dataset({"BTCUSDT": closes})
try:
result = backtest_service.run_backtest(config, dataset_path=str(path), initial_cash=10000.0)
self.assertTrue(len(result["equity_curve"]) > 0)
first = result["equity_curve"][0]
self.assertIn("time", first)
self.assertIn("equity", first)
self.assertIn("cash", first)
self.assertIn("positions_count", first)
finally:
path.unlink()

View File

@@ -10,7 +10,7 @@ from coinhunter import cli
class CLITestCase(unittest.TestCase):
def test_help_includes_v2_commands(self):
def test_help_includes_core_commands(self):
parser = cli.build_parser()
help_text = parser.format_help()
self.assertIn("init", help_text)
@@ -18,7 +18,9 @@ class CLITestCase(unittest.TestCase):
self.assertIn("buy", help_text)
self.assertIn("sell", help_text)
self.assertIn("portfolio", help_text)
self.assertIn("opportunity", help_text)
self.assertIn("scan", help_text)
self.assertIn("analyze", help_text)
self.assertIn("watch", help_text)
self.assertIn("--doc", help_text)
def test_init_dispatches(self):
@@ -150,11 +152,11 @@ class CLITestCase(unittest.TestCase):
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT")
def test_opportunity_dispatches(self):
def test_scan_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}}
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 5}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
@@ -167,10 +169,52 @@ class CLITestCase(unittest.TestCase):
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["opportunity", "-s", "BTCUSDT", "ETHUSDT"])
result = cli.main(["scan", "-s", "BTCUSDT", "ETHUSDT"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT")
def test_analyze_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.analyze_service,
"analyze_symbols",
return_value={"analyses": [{"symbol": "BTCUSDT", "summary": "test"}]},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["analyze", "BTCUSDT", "ETHUSDT"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["analyses"][0]["symbol"], "BTCUSDT")
def test_watch_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "watch": {}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.portfolio_service,
"watch_portfolio",
return_value={"watch_results": [{"symbol": "BTCUSDT", "status": "healthy"}], "summary": "1 healthy", "need_review_count": 0, "healthy_count": 1},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["watch"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["watch_results"][0]["symbol"], "BTCUSDT")
def test_catlog_dispatches(self):
captured = {}
with (
@@ -248,215 +292,3 @@ class CLITestCase(unittest.TestCase):
content = __import__("pathlib").Path(tmp_path).read_text()
self.assertIn("BINANCE_API_SECRET=test_secret_value", content)
__import__("os").unlink(tmp_path)
def test_opportunity_dataset_dispatches_without_private_client(self):
captured = {}
config = {"market": {"default_quote": "USDT"}, "opportunity": {}}
with (
patch.object(cli, "load_config", return_value=config),
patch.object(cli, "_load_spot_client", side_effect=AssertionError("dataset should use public data")),
patch.object(
cli.opportunity_dataset_service,
"collect_opportunity_dataset",
return_value={"path": "/tmp/dataset.json", "symbols": ["BTCUSDT"]},
) as collect_mock,
patch.object(
cli,
"print_output",
side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}),
),
):
result = cli.main(
["opportunity", "dataset", "--symbols", "BTCUSDT", "--simulate-days", "3", "--run-days", "7", "--agent"]
)
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["path"], "/tmp/dataset.json")
self.assertTrue(captured["agent"])
collect_mock.assert_called_once_with(
config,
symbols=["BTCUSDT"],
simulate_days=3.0,
run_days=7.0,
output_path=None,
)
def test_opportunity_evaluate_dispatches_without_private_client(self):
captured = {}
config = {"market": {"default_quote": "USDT"}, "opportunity": {}}
with (
patch.object(cli, "load_config", return_value=config),
patch.object(cli, "_load_spot_client", side_effect=AssertionError("evaluate should use dataset only")),
patch.object(
cli.opportunity_evaluation_service,
"evaluate_opportunity_dataset",
return_value={"summary": {"count": 1, "correct": 1}},
) as evaluate_mock,
patch.object(
cli,
"print_output",
side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}),
),
):
result = cli.main(
[
"opportunity",
"evaluate",
"/tmp/dataset.json",
"--horizon-hours",
"6",
"--take-profit-pct",
"2",
"--stop-loss-pct",
"1.5",
"--setup-target-pct",
"1",
"--lookback",
"24",
"--top-n",
"3",
"--examples",
"5",
"--agent",
]
)
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["summary"]["correct"], 1)
self.assertTrue(captured["agent"])
evaluate_mock.assert_called_once_with(
config,
dataset_path="/tmp/dataset.json",
horizon_hours=6.0,
take_profit=0.02,
stop_loss=0.015,
setup_target=0.01,
lookback=24,
top_n=3,
max_examples=5,
)
def test_strategy_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.strategy_service,
"generate_trade_signals",
return_value={"buy": [{"symbol": "BTCUSDT", "score": 0.82}], "sell": [], "hold": []},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["strategy", "-s", "BTCUSDT"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["buy"][0]["symbol"], "BTCUSDT")
def test_backtest_dispatches_without_private_client(self):
captured = {}
config = {"market": {"default_quote": "USDT"}, "opportunity": {}}
with (
patch.object(cli, "load_config", return_value=config),
patch.object(cli, "_load_spot_client", side_effect=AssertionError("backtest should use dataset only")),
patch.object(
cli.backtest_service,
"run_backtest",
return_value={"summary": {"total_return_pct": 5.0, "win_rate": 0.6}, "trades": []},
) as backtest_mock,
patch.object(
cli,
"print_output",
side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}),
),
):
result = cli.main(
[
"backtest",
"/tmp/dataset.json",
"--initial-cash",
"5000",
"--max-positions",
"3",
"--position-size-pct",
"20",
"--commission-pct",
"0.1",
"--lookback",
"12",
"--agent",
]
)
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["summary"]["total_return_pct"], 5.0)
self.assertTrue(captured["agent"])
backtest_mock.assert_called_once_with(
config,
dataset_path="/tmp/dataset.json",
initial_cash=5000.0,
max_positions=3,
position_size_pct=0.2,
commission_pct=0.001,
lookback=12,
decision_interval_minutes=None,
)
def test_opportunity_optimize_dispatches_without_private_client(self):
captured = {}
config = {"market": {"default_quote": "USDT"}, "opportunity": {}}
with (
patch.object(cli, "load_config", return_value=config),
patch.object(cli, "_load_spot_client", side_effect=AssertionError("optimize should use dataset only")),
patch.object(
cli.opportunity_evaluation_service,
"optimize_opportunity_model",
return_value={"best": {"summary": {"accuracy": 0.7}}},
) as optimize_mock,
patch.object(
cli,
"print_output",
side_effect=lambda payload, **kwargs: captured.update({"payload": payload, "agent": kwargs["agent"]}),
),
):
result = cli.main(
[
"opportunity",
"optimize",
"/tmp/dataset.json",
"--horizon-hours",
"6",
"--take-profit-pct",
"2",
"--stop-loss-pct",
"1.5",
"--setup-target-pct",
"1",
"--lookback",
"24",
"--top-n",
"3",
"--passes",
"1",
"--agent",
]
)
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["best"]["summary"]["accuracy"], 0.7)
self.assertTrue(captured["agent"])
optimize_mock.assert_called_once_with(
config,
dataset_path="/tmp/dataset.json",
horizon_hours=6.0,
take_profit=0.02,
stop_loss=0.015,
setup_target=0.01,
lookback=24,
top_n=3,
passes=1,
)

View File

@@ -1,280 +0,0 @@
"""Opportunity dataset collection tests."""
from __future__ import annotations
import json
import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from coinhunter.services import (
opportunity_dataset_service,
opportunity_evaluation_service,
)
class OpportunityDatasetServiceTestCase(unittest.TestCase):
def test_default_plan_uses_widest_scan_reference_window(self):
config = {"opportunity": {"lookback_intervals": ["1h", "4h", "1d"]}}
plan = opportunity_dataset_service.build_dataset_plan(
config,
now=datetime(2026, 4, 21, tzinfo=timezone.utc),
)
self.assertEqual(plan.kline_limit, 48)
self.assertEqual(plan.reference_days, 48.0)
self.assertEqual(plan.simulate_days, 7.0)
self.assertEqual(plan.run_days, 7.0)
self.assertEqual(plan.total_days, 62.0)
def test_collect_dataset_writes_klines_and_probe_metadata(self):
config = {
"binance": {"spot_base_url": "https://api.binance.test"},
"market": {"default_quote": "USDT"},
"opportunity": {
"lookback_intervals": ["1d"],
"kline_limit": 2,
"simulate_days": 1,
"run_days": 1,
"auto_research": True,
"research_provider": "coingecko",
},
}
def fake_http_get(url, headers, timeout):
query = opportunity_dataset_service.parse_query(url)
interval_seconds = 86400
start = int(query["startTime"])
end = int(query["endTime"])
rows = []
cursor = start
index = 0
while cursor <= end:
close = 100 + index
rows.append([cursor, close - 1, close + 1, close - 2, close, 10, cursor + interval_seconds * 1000 - 1, close * 10])
cursor += interval_seconds * 1000
index += 1
return rows
def fake_http_status(url, headers, timeout):
return 200, "{}"
with tempfile.TemporaryDirectory() as tmpdir:
output = Path(tmpdir) / "dataset.json"
payload = opportunity_dataset_service.collect_opportunity_dataset(
config,
symbols=["BTCUSDT"],
output_path=str(output),
http_get=fake_http_get,
http_status=fake_http_status,
now=datetime(2026, 4, 21, tzinfo=timezone.utc),
)
dataset = json.loads(output.read_text(encoding="utf-8"))
self.assertEqual(payload["plan"]["reference_days"], 2.0)
self.assertEqual(payload["plan"]["total_days"], 4.0)
self.assertEqual(payload["external_history"]["status"], "available")
self.assertEqual(payload["counts"]["BTCUSDT"]["1d"], 5)
self.assertEqual(len(dataset["klines"]["BTCUSDT"]["1d"]), 5)
class OpportunityEvaluationServiceTestCase(unittest.TestCase):
def _rows(self, closes):
start = int(datetime(2026, 4, 20, tzinfo=timezone.utc).timestamp() * 1000)
rows = []
for index, close in enumerate(closes):
open_time = start + index * 60 * 60 * 1000
rows.append(
[
open_time,
close * 0.995,
close * 1.01,
close * 0.995,
close,
100 + index * 10,
open_time + 60 * 60 * 1000 - 1,
close * (100 + index * 10),
]
)
return rows
def test_evaluate_dataset_counts_walk_forward_accuracy(self):
good = [
100,
105,
98,
106,
99,
107,
100,
106,
101,
105,
102,
104,
102.5,
103,
102.8,
103.2,
103.0,
103.4,
103.1,
103.6,
103.3,
103.8,
104.2,
106,
108.5,
109,
]
weak = [
100,
99,
98,
97,
96,
95,
94,
93,
92,
91,
90,
89,
88,
87,
86,
85,
84,
83,
82,
81,
80,
79,
78,
77,
76,
75,
]
good_rows = self._rows(good)
weak_rows = self._rows(weak)
simulation_start = datetime.fromtimestamp(good_rows[23][0] / 1000, tz=timezone.utc)
simulation_end = datetime.fromtimestamp(good_rows[24][0] / 1000, tz=timezone.utc)
dataset = {
"metadata": {
"symbols": ["GOODUSDT", "WEAKUSDT"],
"plan": {
"intervals": ["1h"],
"simulate_days": 1 / 12,
"simulation_start": simulation_start.isoformat().replace("+00:00", "Z"),
"simulation_end": simulation_end.isoformat().replace("+00:00", "Z"),
},
},
"klines": {
"GOODUSDT": {"1h": good_rows},
"WEAKUSDT": {"1h": weak_rows},
},
}
config = {
"signal": {"lookback_interval": "1h"},
"opportunity": {
"top_n": 2,
"min_quote_volume": 0.0,
"entry_threshold": 1.5,
"watch_threshold": 0.6,
"min_trigger_score": 0.45,
"min_setup_score": 0.35,
},
}
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "dataset.json"
path.write_text(json.dumps(dataset), encoding="utf-8")
result = opportunity_evaluation_service.evaluate_opportunity_dataset(
config,
dataset_path=str(path),
take_profit=0.02,
stop_loss=0.015,
setup_target=0.01,
max_examples=2,
)
self.assertEqual(result["summary"]["count"], 2)
self.assertEqual(result["summary"]["correct"], 2)
self.assertEqual(result["summary"]["accuracy"], 1.0)
self.assertEqual(result["by_action"]["entry"]["correct"], 1)
self.assertEqual(result["trade_simulation"]["wins"], 1)
def test_optimize_model_reports_recommended_weights(self):
rows = self._rows(
[
100,
105,
98,
106,
99,
107,
100,
106,
101,
105,
102,
104,
102.5,
103,
102.8,
103.2,
103.0,
103.4,
103.1,
103.6,
103.3,
103.8,
104.2,
106,
108.5,
109,
]
)
simulation_start = datetime.fromtimestamp(rows[23][0] / 1000, tz=timezone.utc)
simulation_end = datetime.fromtimestamp(rows[24][0] / 1000, tz=timezone.utc)
dataset = {
"metadata": {
"symbols": ["GOODUSDT"],
"plan": {
"intervals": ["1h"],
"simulate_days": 1 / 12,
"simulation_start": simulation_start.isoformat().replace("+00:00", "Z"),
"simulation_end": simulation_end.isoformat().replace("+00:00", "Z"),
},
},
"klines": {"GOODUSDT": {"1h": rows}},
}
config = {
"signal": {"lookback_interval": "1h"},
"opportunity": {
"top_n": 1,
"min_quote_volume": 0.0,
"entry_threshold": 1.5,
"watch_threshold": 0.6,
"min_trigger_score": 0.45,
"min_setup_score": 0.35,
},
}
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "dataset.json"
path.write_text(json.dumps(dataset), encoding="utf-8")
result = opportunity_evaluation_service.optimize_opportunity_model(
config,
dataset_path=str(path),
passes=1,
take_profit=0.02,
stop_loss=0.015,
setup_target=0.01,
)
self.assertIn("baseline", result)
self.assertIn("best", result)
self.assertIn("opportunity.model_weights.trigger", result["recommended_config"])
self.assertEqual(result["search"]["optimized"], "model_weights_only")

View File

@@ -1,90 +0,0 @@
"""Opportunity historical evaluation tests."""
from __future__ import annotations
import json
import tempfile
import unittest
from pathlib import Path
from coinhunter.services import opportunity_evaluation_service
def _rows(start_ms: int, closes: list[float]) -> list[list[float]]:
rows = []
for index, close in enumerate(closes):
open_time = start_ms + index * 3_600_000
volume = 1_000 + index * 10
rows.append(
[
float(open_time),
close * 0.99,
close * 1.02,
close * 0.98,
close,
float(volume),
float(open_time + 3_599_999),
close * volume,
]
)
return rows
class OpportunityEvaluationServiceTestCase(unittest.TestCase):
def test_evaluate_opportunity_dataset_scores_historical_samples(self):
start_ms = 1_767_225_600_000
dataset = {
"metadata": {
"plan": {
"intervals": ["1h"],
"simulation_start": "2026-01-01T04:00:00Z",
"simulation_end": "2026-01-01T07:00:00Z",
"simulate_days": 1,
}
},
"klines": {
"GOODUSDT": {"1h": _rows(start_ms, [100, 101, 102, 103, 104, 106, 108, 109, 110])},
"BADUSDT": {"1h": _rows(start_ms, [100, 99, 98, 97, 96, 95, 94, 93, 92])},
},
}
config = {
"market": {"default_quote": "USDT"},
"opportunity": {
"entry_threshold": 1.5,
"watch_threshold": 0.6,
"evaluation_horizon_hours": 2.0,
"evaluation_take_profit_pct": 1.0,
"evaluation_stop_loss_pct": 2.0,
"evaluation_setup_target_pct": 0.5,
"evaluation_lookback": 4,
"top_n": 2,
},
}
with tempfile.TemporaryDirectory() as tmp_dir:
dataset_path = Path(tmp_dir) / "opportunity-dataset.json"
dataset_path.write_text(json.dumps(dataset), encoding="utf-8")
payload = opportunity_evaluation_service.evaluate_opportunity_dataset(
config,
dataset_path=str(dataset_path),
horizon_hours=2.0,
take_profit=0.01,
stop_loss=0.02,
setup_target=0.005,
lookback=4,
top_n=2,
max_examples=3,
)
self.assertEqual(payload["summary"]["symbols"], ["BADUSDT", "GOODUSDT"])
self.assertEqual(payload["summary"]["interval"], "1h")
self.assertGreater(payload["summary"]["count"], 0)
self.assertIn("by_action", payload)
self.assertIn("trade_simulation", payload)
self.assertEqual(payload["rules"]["research_mode"], "disabled: dataset has no point-in-time research snapshots")
self.assertLessEqual(len(payload["examples"]), 3)
if __name__ == "__main__":
unittest.main()

View File

@@ -8,7 +8,6 @@ from unittest.mock import patch
from coinhunter.services import (
opportunity_service,
portfolio_service,
research_service,
signal_service,
)
@@ -258,37 +257,6 @@ class OpportunityServiceTestCase(unittest.TestCase):
"entry_threshold": 1.5,
"watch_threshold": 0.6,
"overlap_penalty": 0.6,
"auto_research": False,
"research_provider": "coingecko",
"research_timeout_seconds": 4.0,
"risk_limits": {
"min_liquidity": 0.0,
"max_overextension": 0.08,
"max_downside_risk": 0.3,
"max_unlock_risk": 0.75,
"max_regulatory_risk": 0.75,
"min_quality_for_add": 0.0,
},
"weights": {
"trend": 1.0,
"momentum": 1.0,
"breakout": 0.8,
"pullback": 0.4,
"volume": 0.7,
"liquidity": 0.3,
"trend_alignment": 0.8,
"fundamental": 0.8,
"tokenomics": 0.7,
"catalyst": 0.5,
"adoption": 0.4,
"smart_money": 0.3,
"volatility_penalty": 0.5,
"overextension_penalty": 0.7,
"downside_penalty": 0.5,
"unlock_penalty": 0.8,
"regulatory_penalty": 0.4,
"position_concentration_penalty": 0.6,
},
},
"portfolio": {
"add_threshold": 1.5,
@@ -351,40 +319,6 @@ class OpportunityServiceTestCase(unittest.TestCase):
self.assertEqual(score, 0.0)
self.assertEqual(metrics["trend"], 0.0)
def test_scan_uses_automatic_external_research(self):
config = self.config | {
"opportunity": self.config["opportunity"]
| {
"auto_research": True,
"top_n": 2,
}
}
with (
patch.object(opportunity_service, "audit_event", return_value=None),
patch.object(
opportunity_service,
"get_external_research",
return_value={
"SOLUSDT": {
"fundamental": 0.9,
"tokenomics": 0.8,
"catalyst": 0.9,
"adoption": 0.8,
"smart_money": 0.7,
"unlock_risk": 0.1,
"regulatory_risk": 0.1,
"research_confidence": 0.9,
}
},
) as research_mock,
):
payload = opportunity_service.scan_opportunities(config, spot_client=FakeSpotClient())
research_mock.assert_called_once()
sol = next(item for item in payload["recommendations"] if item["symbol"] == "SOLUSDT")
self.assertEqual(sol["metrics"]["fundamental"], 0.9)
self.assertEqual(sol["metrics"]["research_confidence"], 0.9)
def test_weak_setup_and_trigger_becomes_avoid(self):
metrics = {
"extension_penalty": 0.0,
@@ -409,28 +343,18 @@ class OpportunityServiceTestCase(unittest.TestCase):
self.assertIn("setup, trigger, or overall quality is too weak", reasons[0])
self.assertEqual(confidence, 50)
class ResearchServiceTestCase(unittest.TestCase):
def test_coingecko_market_data_becomes_research_signals(self):
signals = research_service._coingecko_market_to_signals(
{
"id": "solana",
"symbol": "sol",
"market_cap": 80_000_000_000,
"fully_diluted_valuation": 95_000_000_000,
"total_volume": 5_000_000_000,
"market_cap_rank": 6,
"circulating_supply": 550_000_000,
"total_supply": 600_000_000,
"max_supply": None,
"price_change_percentage_7d_in_currency": 12.0,
"price_change_percentage_30d_in_currency": 35.0,
"price_change_percentage_200d_in_currency": 80.0,
},
is_trending=True,
)
self.assertGreater(signals["fundamental"], 0.6)
self.assertGreater(signals["tokenomics"], 0.8)
self.assertGreater(signals["catalyst"], 0.6)
self.assertLess(signals["unlock_risk"], 0.2)
def test_watch_flags_anomalies(self):
config = self.config | {
"watch": {
"alert_drawdown_1h_pct": -5.0,
"alert_drawdown_24h_pct": -10.0,
"alert_spike_1h_pct": 8.0,
"max_position_weight": 0.5,
}
}
with patch.object(portfolio_service, "audit_event", return_value=None):
payload = portfolio_service.watch_portfolio(config, spot_client=FakeSpotClient())
# FakeSpotClient BTC is +5% 24h, ETH is +3% — both should be healthy
self.assertGreaterEqual(payload["healthy_count"], 1)
for result in payload["watch_results"]:
self.assertIn(result["status"], {"healthy", "need_review"})

View File

@@ -1,100 +0,0 @@
"""Tests for strategy_service."""
from __future__ import annotations
import unittest
from typing import Any
from unittest import mock
from unittest.mock import MagicMock
from coinhunter.services import strategy_service
class StrategyServiceTestCase(unittest.TestCase):
def _klines(self, closes: list[float], volumes: list[float] | None = None) -> list[list[float]]:
volumes = volumes or [1.0] * len(closes)
return [
[i * 3600000.0, c * 0.98, c * 1.02, c * 0.97, c, v, 0.0, c * v, 100, 0.0, 0.0, 0.0]
for i, (c, v) in enumerate(zip(closes, volumes))
]
def _config(self) -> dict[str, Any]:
return {
"opportunity": {
"entry_threshold": 1.5,
"watch_threshold": 0.6,
"min_trigger_score": 0.45,
"min_setup_score": 0.35,
"overlap_penalty": 0.6,
"top_n": 10,
"scan_limit": 50,
"kline_limit": 48,
"weights": {},
"model_weights": {},
},
"portfolio": {
"add_threshold": 1.5,
"hold_threshold": 0.6,
"trim_threshold": 0.2,
"exit_threshold": -0.2,
"max_position_weight": 0.6,
},
"signal": {
"lookback_interval": "1h",
},
"market": {
"default_quote": "USDT",
},
}
def test_generate_signals_from_klines_buy_when_entry_and_not_held(self) -> None:
config = self._config()
closes = list(range(20, 40))
klines = {"BTCUSDT": self._klines(closes)}
result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=[])
self.assertIn("buy", result)
self.assertIn("sell", result)
self.assertIn("hold", result)
def test_generate_signals_from_klines_sell_when_exit_signal(self) -> None:
config = self._config()
closes = list(range(40, 20, -1))
klines = {"BTCUSDT": self._klines(closes)}
held = [{"symbol": "BTCUSDT", "notional_usdt": 1000.0}]
result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=held)
symbols = [s["symbol"] for s in result["sell"]]
self.assertIn("BTCUSDT", symbols)
def test_generate_signals_respects_max_position_weight(self) -> None:
config = self._config()
config["portfolio"]["max_position_weight"] = 0.01
closes = list(range(20, 40))
klines = {"BTCUSDT": self._klines(closes)}
held = [{"symbol": "BTCUSDT", "notional_usdt": 9999.0}]
result = strategy_service.generate_signals_from_klines(config, klines_by_symbol=klines, held_positions=held)
buy_symbols = [s["symbol"] for s in result["buy"]]
self.assertNotIn("BTCUSDT", buy_symbols)
@mock.patch("coinhunter.services.portfolio_service.audit_event")
@mock.patch("coinhunter.services.opportunity_service.audit_event")
def test_generate_trade_signals_dispatches_to_services(self, mock_audit_opp, mock_audit_pf) -> None:
mock_client = MagicMock()
mock_client.klines.return_value = self._klines(list(range(20, 44)))
mock_client.ticker_stats.return_value = [
{
"symbol": "BTCUSDT",
"lastPrice": "30.0",
"priceChangePercent": "5.0",
"quoteVolume": "1000000",
"highPrice": "31.0",
"lowPrice": "29.0",
}
]
mock_client.account.return_value = {"balances": [{"asset": "BTC", "free": "0.5", "locked": "0.0"}]}
mock_client.exchange_info.return_value = {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}]}
config = self._config()
result = strategy_service.generate_trade_signals(config, spot_client=mock_client)
self.assertIn("buy", result)
self.assertIn("sell", result)
self.assertIn("hold", result)