Files
coinhunter-cli/tests/test_opportunity_dataset_service.py
TacitLab 003212de99 refactor: simplify opportunity actions to entry/watch/avoid with confidence
- Remove dead scoring code (_score_candidate, _action_for, etc.) and
  align action decisions directly with score_opportunity_signal metrics.
- Reduce action surface from trigger/setup/chase/skip to entry/watch/avoid.
- Add confidence field (0..100) mapped from edge_score.
- Update evaluate/optimize ground-truth mapping and tests.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 01:08:34 +08:00

281 lines
8.8 KiB
Python

"""Opportunity dataset collection tests."""
from __future__ import annotations
import json
import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path
from coinhunter.services import (
opportunity_dataset_service,
opportunity_evaluation_service,
)
class OpportunityDatasetServiceTestCase(unittest.TestCase):
    """Dataset planning and collection behaviour of the opportunity service."""

    def test_default_plan_uses_widest_scan_reference_window(self):
        # The widest lookback interval ("1d") should drive the plan sizing.
        config = {"opportunity": {"lookback_intervals": ["1h", "4h", "1d"]}}
        plan = opportunity_dataset_service.build_dataset_plan(
            config,
            now=datetime(2026, 4, 21, tzinfo=timezone.utc),
        )
        self.assertEqual(plan.kline_limit, 48)
        self.assertEqual(plan.reference_days, 48.0)
        self.assertEqual(plan.simulate_days, 7.0)
        self.assertEqual(plan.run_days, 7.0)
        self.assertEqual(plan.total_days, 62.0)

    def test_collect_dataset_writes_klines_and_probe_metadata(self):
        config = {
            "binance": {"spot_base_url": "https://api.binance.test"},
            "market": {"default_quote": "USDT"},
            "opportunity": {
                "lookback_intervals": ["1d"],
                "kline_limit": 2,
                "simulate_days": 1,
                "run_days": 1,
                "auto_research": True,
                "research_provider": "coingecko",
            },
        }

        def fake_http_get(url, headers, timeout):
            # Synthesize one daily candle per step inside [startTime, endTime],
            # with the close drifting up by 1 per candle.
            params = opportunity_dataset_service.parse_query(url)
            step_ms = 86400 * 1000
            window_end = int(params["endTime"])
            candles = []
            ts = int(params["startTime"])
            i = 0
            while ts <= window_end:
                px = 100 + i
                candles.append(
                    [ts, px - 1, px + 1, px - 2, px, 10, ts + step_ms - 1, px * 10]
                )
                ts += step_ms
                i += 1
            return candles

        def fake_http_status(url, headers, timeout):
            # External research probe always reports success with an empty body.
            return 200, "{}"

        with tempfile.TemporaryDirectory() as tmpdir:
            output = Path(tmpdir) / "dataset.json"
            payload = opportunity_dataset_service.collect_opportunity_dataset(
                config,
                symbols=["BTCUSDT"],
                output_path=str(output),
                http_get=fake_http_get,
                http_status=fake_http_status,
                now=datetime(2026, 4, 21, tzinfo=timezone.utc),
            )
            dataset = json.loads(output.read_text(encoding="utf-8"))
            self.assertEqual(payload["plan"]["reference_days"], 2.0)
            self.assertEqual(payload["plan"]["total_days"], 4.0)
            self.assertEqual(payload["external_history"]["status"], "available")
            self.assertEqual(payload["counts"]["BTCUSDT"]["1d"], 5)
            self.assertEqual(len(dataset["klines"]["BTCUSDT"]["1d"]), 5)
class OpportunityEvaluationServiceTestCase(unittest.TestCase):
    """Walk-forward evaluation and model-weight optimisation on synthetic data."""

    # Oscillating-then-breakout close series shared by both tests: choppy
    # upward swings, a tightening range, then a final push to new highs.
    _BREAKOUT_CLOSES = [
        100, 105, 98, 106, 99, 107, 100, 106, 101, 105, 102, 104,
        102.5, 103, 102.8, 103.2, 103.0, 103.4, 103.1, 103.6,
        103.3, 103.8, 104.2, 106, 108.5, 109,
    ]

    def _rows(self, closes):
        """Expand close prices into synthetic hourly 8-column kline rows.

        Column layout mirrors the raw kline arrays the services consume
        (open time, open, high, low, close, volume, close time, quote
        volume — presumably Binance-style; verify against the service).
        """
        base = int(datetime(2026, 4, 20, tzinfo=timezone.utc).timestamp() * 1000)
        hour_ms = 60 * 60 * 1000
        return [
            [
                base + i * hour_ms,
                price * 0.995,
                price * 1.01,
                price * 0.995,
                price,
                100 + i * 10,
                base + i * hour_ms + hour_ms - 1,
                price * (100 + i * 10),
            ]
            for i, price in enumerate(closes)
        ]

    def test_evaluate_dataset_counts_walk_forward_accuracy(self):
        good_rows = self._rows(self._BREAKOUT_CLOSES)
        # Strictly falling closes 100 -> 75: should never qualify as an entry.
        weak_rows = self._rows(list(range(100, 74, -1)))
        # Simulation window spans exactly the candle at index 23 -> 24.
        sim_start = datetime.fromtimestamp(good_rows[23][0] / 1000, tz=timezone.utc)
        sim_end = datetime.fromtimestamp(good_rows[24][0] / 1000, tz=timezone.utc)
        dataset = {
            "metadata": {
                "symbols": ["GOODUSDT", "WEAKUSDT"],
                "plan": {
                    "intervals": ["1h"],
                    "simulate_days": 1 / 12,
                    "simulation_start": sim_start.isoformat().replace("+00:00", "Z"),
                    "simulation_end": sim_end.isoformat().replace("+00:00", "Z"),
                },
            },
            "klines": {
                "GOODUSDT": {"1h": good_rows},
                "WEAKUSDT": {"1h": weak_rows},
            },
        }
        config = {
            "signal": {"lookback_interval": "1h"},
            "opportunity": {
                "top_n": 2,
                "min_quote_volume": 0.0,
                "entry_threshold": 1.5,
                "watch_threshold": 0.6,
                "min_trigger_score": 0.45,
                "min_setup_score": 0.35,
            },
        }
        with tempfile.TemporaryDirectory() as tmpdir:
            dataset_file = Path(tmpdir) / "dataset.json"
            dataset_file.write_text(json.dumps(dataset), encoding="utf-8")
            result = opportunity_evaluation_service.evaluate_opportunity_dataset(
                config,
                dataset_path=str(dataset_file),
                take_profit=0.02,
                stop_loss=0.015,
                setup_target=0.01,
                max_examples=2,
            )
            self.assertEqual(result["summary"]["count"], 2)
            self.assertEqual(result["summary"]["correct"], 2)
            self.assertEqual(result["summary"]["accuracy"], 1.0)
            self.assertEqual(result["by_action"]["entry"]["correct"], 1)
            self.assertEqual(result["trade_simulation"]["wins"], 1)

    def test_optimize_model_reports_recommended_weights(self):
        rows = self._rows(self._BREAKOUT_CLOSES)
        sim_start = datetime.fromtimestamp(rows[23][0] / 1000, tz=timezone.utc)
        sim_end = datetime.fromtimestamp(rows[24][0] / 1000, tz=timezone.utc)
        dataset = {
            "metadata": {
                "symbols": ["GOODUSDT"],
                "plan": {
                    "intervals": ["1h"],
                    "simulate_days": 1 / 12,
                    "simulation_start": sim_start.isoformat().replace("+00:00", "Z"),
                    "simulation_end": sim_end.isoformat().replace("+00:00", "Z"),
                },
            },
            "klines": {"GOODUSDT": {"1h": rows}},
        }
        config = {
            "signal": {"lookback_interval": "1h"},
            "opportunity": {
                "top_n": 1,
                "min_quote_volume": 0.0,
                "entry_threshold": 1.5,
                "watch_threshold": 0.6,
                "min_trigger_score": 0.45,
                "min_setup_score": 0.35,
            },
        }
        with tempfile.TemporaryDirectory() as tmpdir:
            dataset_file = Path(tmpdir) / "dataset.json"
            dataset_file.write_text(json.dumps(dataset), encoding="utf-8")
            result = opportunity_evaluation_service.optimize_opportunity_model(
                config,
                dataset_path=str(dataset_file),
                passes=1,
                take_profit=0.02,
                stop_loss=0.015,
                setup_target=0.01,
            )
            self.assertIn("baseline", result)
            self.assertIn("best", result)
            self.assertIn("opportunity.model_weights.trigger", result["recommended_config"])
            self.assertEqual(result["search"]["optimized"], "model_weights_only")