refactor: address high-priority debt and publish to PyPI

- Fix TOCTOU race conditions by wrapping read-modify-write cycles
  under single-file locks in execution_state, portfolio_service,
  precheck_state, state_manager, and precheck_service.
- Add missing test coverage (96 tests total):
  - test_review_service.py (15 tests)
  - test_check_api.py (6 tests)
  - test_external_gate.py main branches (+10 tests)
  - test_trade_execution.py new commands (+8 tests)
- Unify all agent-consumed JSON messages to English.
- Config-ize hardcoded values (volume filter, schema_version) via
  get_user_config with sensible defaults.
- Add 1-hour TTL to exchange cache with force_new override.
- Add ruff and mypy to dev dependencies; fix all type errors.
- Add __all__ declarations to 11 service modules.
- Sync README with new commands, config tuning docs, and PyPI badge.
- Publish package as coinhunter==1.0.0 on PyPI with MIT license.
- Deprecate coinhunter-cli==1.0.1 with runtime warning.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-16 01:21:27 +08:00
parent 01bb54dee5
commit 62c40a9776
53 changed files with 2338 additions and 671 deletions

0
tests/__init__.py Normal file
View File

70
tests/test_check_api.py Normal file
View File

@@ -0,0 +1,70 @@
"""Tests for check_api command."""
import json
from unittest.mock import MagicMock, patch
from coinhunter.commands import check_api
class TestMain:
def test_missing_api_key(self, monkeypatch, capsys):
monkeypatch.setenv("BINANCE_API_KEY", "")
monkeypatch.setenv("BINANCE_API_SECRET", "secret")
rc = check_api.main()
assert rc == 1
out = json.loads(capsys.readouterr().out)
assert out["ok"] is False
assert "BINANCE_API_KEY" in out["error"]
def test_missing_api_secret(self, monkeypatch, capsys):
monkeypatch.setenv("BINANCE_API_KEY", "key")
monkeypatch.setenv("BINANCE_API_SECRET", "")
rc = check_api.main()
assert rc == 1
out = json.loads(capsys.readouterr().out)
assert out["ok"] is False
assert "BINANCE_API_SECRET" in out["error"]
def test_placholder_api_key(self, monkeypatch, capsys):
monkeypatch.setenv("BINANCE_API_KEY", "your_api_key")
monkeypatch.setenv("BINANCE_API_SECRET", "secret")
rc = check_api.main()
assert rc == 1
def test_balance_fetch_failure(self, monkeypatch, capsys):
monkeypatch.setenv("BINANCE_API_KEY", "key")
monkeypatch.setenv("BINANCE_API_SECRET", "secret")
mock_ex = MagicMock()
mock_ex.fetch_balance.side_effect = Exception("Network error")
with patch("coinhunter.commands.check_api.ccxt.binance", return_value=mock_ex):
rc = check_api.main()
assert rc == 1
out = json.loads(capsys.readouterr().out)
assert "Failed to connect" in out["error"]
def test_success_with_spot_trading(self, monkeypatch, capsys):
monkeypatch.setenv("BINANCE_API_KEY", "key")
monkeypatch.setenv("BINANCE_API_SECRET", "secret")
mock_ex = MagicMock()
mock_ex.fetch_balance.return_value = {"USDT": 100.0}
mock_ex.sapi_get_account_api_restrictions.return_value = {"enableSpotTrading": True}
with patch("coinhunter.commands.check_api.ccxt.binance", return_value=mock_ex):
rc = check_api.main()
assert rc == 0
out = json.loads(capsys.readouterr().out)
assert out["ok"] is True
assert out["read_permission"] is True
assert out["spot_trading_enabled"] is True
def test_success_restrictions_query_fails(self, monkeypatch, capsys):
monkeypatch.setenv("BINANCE_API_KEY", "key")
monkeypatch.setenv("BINANCE_API_SECRET", "secret")
mock_ex = MagicMock()
mock_ex.fetch_balance.return_value = {"USDT": 100.0}
mock_ex.sapi_get_account_api_restrictions.side_effect = Exception("no permission")
with patch("coinhunter.commands.check_api.ccxt.binance", return_value=mock_ex):
rc = check_api.main()
assert rc == 0
out = json.loads(capsys.readouterr().out)
assert out["spot_trading_enabled"] is None
assert "may be null" in out["note"]

58
tests/test_cli.py Normal file
View File

@@ -0,0 +1,58 @@
"""Tests for CLI routing and parser behavior."""
import pytest
from coinhunter.cli import ALIASES, MODULE_MAP, build_parser, run_python_module
class TestAliases:
def test_all_aliases_resolve_to_canonical(self):
for alias, canonical in ALIASES.items():
assert canonical in MODULE_MAP, f"alias {alias!r} points to missing canonical {canonical!r}"
def test_no_alias_is_itself_an_alias_loop(self):
for alias in ALIASES:
assert alias not in ALIASES.values() or alias in MODULE_MAP
class TestModuleMap:
def test_all_modules_exist(self):
import importlib
for command, module_name in MODULE_MAP.items():
full = f"coinhunter.{module_name}"
mod = importlib.import_module(full)
assert hasattr(mod, "main"), f"{full} missing main()"
class TestBuildParser:
def test_help_includes_commands(self):
parser = build_parser()
help_text = parser.format_help()
assert "coinhunter diag" in help_text
assert "coinhunter exec" in help_text
def test_parses_command_and_args(self):
parser = build_parser()
ns = parser.parse_args(["exec", "bal"])
assert ns.command == "exec"
assert "bal" in ns.args
def test_version_action_exits(self):
parser = build_parser()
with pytest.raises(SystemExit) as exc:
parser.parse_args(["--version"])
assert exc.value.code == 0
class TestRunPythonModule:
def test_runs_module_main_and_returns_int(self):
result = run_python_module("commands.paths", [], "coinhunter paths")
assert result == 0
def test_mutates_sys_argv(self):
import sys
original = sys.argv[:]
run_python_module("commands.paths", ["--help"], "coinhunter paths")
assert sys.argv == original

View File

@@ -0,0 +1,98 @@
"""Tests for exchange helpers and caching."""
import pytest
from coinhunter.services import exchange_service as es
class TestNormSymbol:
def test_already_normalized(self):
assert es.norm_symbol("BTC/USDT") == "BTC/USDT"
def test_raw_symbol(self):
assert es.norm_symbol("BTCUSDT") == "BTC/USDT"
def test_lowercase(self):
assert es.norm_symbol("btcusdt") == "BTC/USDT"
def test_dash_separator(self):
assert es.norm_symbol("BTC-USDT") == "BTC/USDT"
def test_underscore_separator(self):
assert es.norm_symbol("BTC_USDT") == "BTC/USDT"
def test_unsupported_symbol(self):
with pytest.raises(ValueError):
es.norm_symbol("BTC")
class TestStorageSymbol:
def test_converts_to_flat(self):
assert es.storage_symbol("BTC/USDT") == "BTCUSDT"
def test_raw_input(self):
assert es.storage_symbol("ETHUSDT") == "ETHUSDT"
class TestFloorToStep:
def test_no_step(self):
assert es.floor_to_step(1.234, 0) == 1.234
def test_floors_to_step(self):
assert es.floor_to_step(1.234, 0.1) == pytest.approx(1.2)
def test_exact_multiple(self):
assert es.floor_to_step(12, 1) == 12
class TestGetExchangeCaching:
def test_returns_cached_instance(self, monkeypatch):
monkeypatch.setenv("BINANCE_API_KEY", "test_key")
monkeypatch.setenv("BINANCE_API_SECRET", "test_secret")
class FakeEx:
def load_markets(self):
pass
# Reset cache and patch ccxt.binance
original_cache = es._exchange_cache
original_cached_at = es._exchange_cached_at
try:
es._exchange_cache = None
es._exchange_cached_at = None
monkeypatch.setattr(es.ccxt, "binance", lambda *a, **kw: FakeEx())
first = es.get_exchange()
second = es.get_exchange()
assert first is second
third = es.get_exchange(force_new=True)
assert third is not first
finally:
es._exchange_cache = original_cache
es._exchange_cached_at = original_cached_at
def test_cache_expires_after_ttl(self, monkeypatch):
monkeypatch.setenv("BINANCE_API_KEY", "test_key")
monkeypatch.setenv("BINANCE_API_SECRET", "test_secret")
class FakeEx:
def load_markets(self):
pass
original_cache = es._exchange_cache
original_cached_at = es._exchange_cached_at
try:
es._exchange_cache = None
es._exchange_cached_at = None
monkeypatch.setattr(es.ccxt, "binance", lambda *a, **kw: FakeEx())
monkeypatch.setattr(es, "load_env", lambda: None)
first = es.get_exchange()
# Simulate time passing beyond TTL
es._exchange_cached_at -= es.CACHE_TTL_SECONDS + 1
second = es.get_exchange()
assert first is not second
finally:
es._exchange_cache = original_cache
es._exchange_cached_at = original_cached_at

203
tests/test_external_gate.py Normal file
View File

@@ -0,0 +1,203 @@
"""Tests for external_gate configurable hook."""
import json
from unittest.mock import MagicMock, patch
from coinhunter.commands import external_gate
class TestResolveTriggerCommand:
def test_missing_config_means_disabled(self, tmp_path, monkeypatch):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
# no config file exists
cmd = external_gate._resolve_trigger_command(paths)
assert cmd is None
def test_uses_explicit_list_from_config(self, tmp_path, monkeypatch):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.config_file.write_text(json.dumps({
"external_gate": {"trigger_command": ["my-scheduler", "run", "job-123"]}
}), encoding="utf-8")
cmd = external_gate._resolve_trigger_command(paths)
assert cmd == ["my-scheduler", "run", "job-123"]
def test_null_means_disabled(self, tmp_path, monkeypatch):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.config_file.write_text(json.dumps({
"external_gate": {"trigger_command": None}
}), encoding="utf-8")
cmd = external_gate._resolve_trigger_command(paths)
assert cmd is None
def test_empty_list_means_disabled(self, tmp_path, monkeypatch):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.config_file.write_text(json.dumps({
"external_gate": {"trigger_command": []}
}), encoding="utf-8")
cmd = external_gate._resolve_trigger_command(paths)
assert cmd is None
def test_string_gets_wrapped(self, tmp_path, monkeypatch):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.config_file.write_text(json.dumps({
"external_gate": {"trigger_command": "my-script.sh"}
}), encoding="utf-8")
cmd = external_gate._resolve_trigger_command(paths)
assert cmd == ["my-script.sh"]
def test_unexpected_type_returns_none_and_warns(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.config_file.write_text(json.dumps({
"external_gate": {"trigger_command": 42}
}), encoding="utf-8")
cmd = external_gate._resolve_trigger_command(paths)
assert cmd is None
class TestMain:
def test_already_running(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.state_dir.mkdir(parents=True, exist_ok=True)
# Acquire the lock in this process so main() sees it as busy
import fcntl
with open(paths.external_gate_lock, "w", encoding="utf-8") as f:
fcntl.flock(f.fileno(), fcntl.LOCK_EX)
rc = external_gate.main()
assert rc == 0
out = json.loads(capsys.readouterr().out)
assert out["reason"] == "already_running"
def test_precheck_failure(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
fake_result = MagicMock(returncode=1, stdout="err", stderr="")
with patch("coinhunter.commands.external_gate.run_cmd", return_value=fake_result):
rc = external_gate.main()
assert rc == 1
out = json.loads(capsys.readouterr().out)
assert out["reason"] == "precheck_failed"
def test_precheck_parse_error(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
fake_result = MagicMock(returncode=0, stdout="not-json", stderr="")
with patch("coinhunter.commands.external_gate.run_cmd", return_value=fake_result):
rc = external_gate.main()
assert rc == 1
out = json.loads(capsys.readouterr().out)
assert out["reason"] == "precheck_parse_error"
def test_precheck_not_ok(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
fake_result = MagicMock(returncode=0, stdout=json.dumps({"ok": False}), stderr="")
with patch("coinhunter.commands.external_gate.run_cmd", return_value=fake_result):
rc = external_gate.main()
assert rc == 1
out = json.loads(capsys.readouterr().out)
assert out["reason"] == "precheck_not_ok"
def test_no_trigger(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
fake_result = MagicMock(returncode=0, stdout=json.dumps({"ok": True, "should_analyze": False}), stderr="")
with patch("coinhunter.commands.external_gate.run_cmd", return_value=fake_result):
rc = external_gate.main()
assert rc == 0
out = json.loads(capsys.readouterr().out)
assert out["reason"] == "no_trigger"
def test_already_queued(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
fake_result = MagicMock(
returncode=0,
stdout=json.dumps({"ok": True, "should_analyze": True, "run_requested": True, "run_requested_at": "2024-01-01T00:00:00Z"}),
stderr="",
)
with patch("coinhunter.commands.external_gate.run_cmd", return_value=fake_result):
rc = external_gate.main()
assert rc == 0
out = json.loads(capsys.readouterr().out)
assert out["reason"] == "already_queued"
def test_trigger_disabled(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.config_file.write_text(json.dumps({"external_gate": {"trigger_command": None}}), encoding="utf-8")
# First call is precheck, second is mark-run-requested
responses = [
MagicMock(returncode=0, stdout=json.dumps({"ok": True, "should_analyze": True}), stderr=""),
MagicMock(returncode=0, stdout=json.dumps({"ok": True}), stderr=""),
]
with patch("coinhunter.commands.external_gate.run_cmd", side_effect=responses):
rc = external_gate.main()
assert rc == 0
out = json.loads(capsys.readouterr().out)
assert out["reason"] == "trigger_disabled"
def test_trigger_success(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.config_file.write_text(json.dumps({"external_gate": {"trigger_command": ["echo", "ok"]}}), encoding="utf-8")
responses = [
MagicMock(returncode=0, stdout=json.dumps({"ok": True, "should_analyze": True, "reasons": ["price-move"]}), stderr=""),
MagicMock(returncode=0, stdout=json.dumps({"ok": True}), stderr=""),
MagicMock(returncode=0, stdout="triggered", stderr=""),
]
with patch("coinhunter.commands.external_gate.run_cmd", side_effect=responses):
rc = external_gate.main()
assert rc == 0
out = json.loads(capsys.readouterr().out)
assert out["triggered"] is True
assert out["reason"] == "price-move"
def test_trigger_command_failure(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.config_file.write_text(json.dumps({"external_gate": {"trigger_command": ["false"]}}), encoding="utf-8")
responses = [
MagicMock(returncode=0, stdout=json.dumps({"ok": True, "should_analyze": True}), stderr=""),
MagicMock(returncode=0, stdout=json.dumps({"ok": True}), stderr=""),
MagicMock(returncode=1, stdout="", stderr="fail"),
]
with patch("coinhunter.commands.external_gate.run_cmd", side_effect=responses):
rc = external_gate.main()
assert rc == 1
out = json.loads(capsys.readouterr().out)
assert out["reason"] == "trigger_failed"
def test_mark_run_requested_failure(self, tmp_path, monkeypatch, capsys):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = external_gate._paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.config_file.write_text(json.dumps({"external_gate": {"trigger_command": ["echo", "ok"]}}), encoding="utf-8")
responses = [
MagicMock(returncode=0, stdout=json.dumps({"ok": True, "should_analyze": True}), stderr=""),
MagicMock(returncode=1, stdout="err", stderr=""),
]
with patch("coinhunter.commands.external_gate.run_cmd", side_effect=responses):
rc = external_gate.main()
assert rc == 1
out = json.loads(capsys.readouterr().out)
assert out["reason"] == "mark_failed"

View File

@@ -0,0 +1,194 @@
"""Tests for review_service."""
import json
from pathlib import Path
from unittest.mock import patch
from coinhunter.services import review_service as rs
class TestAnalyzeTrade:
def test_buy_good_when_price_up(self):
ex = None
trade = {"symbol": "BTC/USDT", "price": 50000.0, "action": "BUY"}
with patch.object(rs, "fetch_current_price", return_value=52000.0):
result = rs.analyze_trade(trade, ex)
assert result["action"] == "BUY"
assert result["pnl_estimate_pct"] == 4.0
assert result["outcome_assessment"] == "good"
def test_buy_bad_when_price_down(self):
trade = {"symbol": "BTC/USDT", "price": 50000.0, "action": "BUY"}
with patch.object(rs, "fetch_current_price", return_value=48000.0):
result = rs.analyze_trade(trade, None)
assert result["pnl_estimate_pct"] == -4.0
assert result["outcome_assessment"] == "bad"
def test_sell_all_missed_when_price_up(self):
trade = {"symbol": "BTC/USDT", "price": 50000.0, "action": "SELL_ALL"}
with patch.object(rs, "fetch_current_price", return_value=53000.0):
result = rs.analyze_trade(trade, None)
assert result["pnl_estimate_pct"] == -6.0
assert result["outcome_assessment"] == "missed"
def test_neutral_when_small_move(self):
trade = {"symbol": "BTC/USDT", "price": 50000.0, "action": "BUY"}
with patch.object(rs, "fetch_current_price", return_value=50100.0):
result = rs.analyze_trade(trade, None)
assert result["outcome_assessment"] == "neutral"
def test_none_when_no_price(self):
trade = {"symbol": "BTC/USDT", "action": "BUY"}
result = rs.analyze_trade(trade, None)
assert result["pnl_estimate_pct"] is None
assert result["outcome_assessment"] == "neutral"
class TestAnalyzeHoldPasses:
def test_finds_passed_opportunities_that_rise(self):
decisions = [
{
"timestamp": "2024-01-01T00:00:00Z",
"decision": "HOLD",
"analysis": {"opportunities_evaluated": [{"symbol": "ETH/USDT", "verdict": "PASS"}]},
"market_snapshot": {"ETHUSDT": {"lastPrice": 100.0}},
}
]
with patch.object(rs, "fetch_current_price", return_value=110.0):
result = rs.analyze_hold_passes(decisions, None)
assert len(result) == 1
assert result[0]["symbol"] == "ETH/USDT"
assert result[0]["change_pct"] == 10.0
def test_ignores_non_pass_verdicts(self):
decisions = [
{
"decision": "HOLD",
"analysis": {"opportunities_evaluated": [{"symbol": "ETH/USDT", "verdict": "BUY"}]},
"market_snapshot": {"ETHUSDT": {"lastPrice": 100.0}},
}
]
with patch.object(rs, "fetch_current_price", return_value=110.0):
result = rs.analyze_hold_passes(decisions, None)
assert result == []
def test_ignores_small_moves(self):
decisions = [
{
"decision": "HOLD",
"analysis": {"opportunities_evaluated": [{"symbol": "ETH/USDT", "verdict": "PASS"}]},
"market_snapshot": {"ETHUSDT": {"lastPrice": 100.0}},
}
]
with patch.object(rs, "fetch_current_price", return_value=102.0):
result = rs.analyze_hold_passes(decisions, None)
assert result == []
class TestAnalyzeCashMisses:
def test_finds_cash_sit_misses(self):
decisions = [
{
"timestamp": "2024-01-01T00:00:00Z",
"balances": {"USDT": 100.0},
"market_snapshot": {"BTCUSDT": {"lastPrice": 50000.0}},
}
]
with patch.object(rs, "fetch_current_price", return_value=54000.0):
result = rs.analyze_cash_misses(decisions, None)
assert len(result) == 1
assert result[0]["symbol"] == "BTCUSDT"
assert result[0]["change_pct"] == 8.0
def test_requires_mostly_usdt(self):
decisions = [
{
"balances": {"USDT": 10.0, "BTC": 90.0},
"market_snapshot": {"ETHUSDT": {"lastPrice": 100.0}},
}
]
with patch.object(rs, "fetch_current_price", return_value=200.0):
result = rs.analyze_cash_misses(decisions, None)
assert result == []
def test_dedupes_by_best_change(self):
decisions = [
{
"timestamp": "2024-01-01T00:00:00Z",
"balances": {"USDT": 100.0},
"market_snapshot": {"BTCUSDT": {"lastPrice": 50000.0}},
},
{
"timestamp": "2024-01-01T01:00:00Z",
"balances": {"USDT": 100.0},
"market_snapshot": {"BTCUSDT": {"lastPrice": 52000.0}},
},
]
with patch.object(rs, "fetch_current_price", return_value=56000.0):
result = rs.analyze_cash_misses(decisions, None)
assert len(result) == 1
# best change is from 50000 -> 56000 = 12%
assert result[0]["change_pct"] == 12.0
class TestGenerateReview:
def test_empty_period(self, monkeypatch):
monkeypatch.setattr(rs, "get_logs_last_n_hours", lambda log_type, hours: [])
review = rs.generate_review(1)
assert review["total_decisions"] == 0
assert review["total_trades"] == 0
assert any("No decisions or trades" in i for i in review["insights"])
def test_with_trades_and_decisions(self, monkeypatch):
trades = [
{"symbol": "BTC/USDT", "price": 50000.0, "action": "BUY", "timestamp": "2024-01-01T00:00:00Z"}
]
decisions = [
{
"timestamp": "2024-01-01T00:00:00Z",
"decision": "HOLD",
"analysis": {"opportunities_evaluated": [{"symbol": "ETH/USDT", "verdict": "PASS"}]},
"market_snapshot": {"ETHUSDT": {"lastPrice": 100.0}},
}
]
errors = [{"message": "oops"}]
def _logs(log_type, hours):
return {"decisions": decisions, "trades": trades, "errors": errors}.get(log_type, [])
monkeypatch.setattr(rs, "get_logs_last_n_hours", _logs)
monkeypatch.setattr(rs, "fetch_current_price", lambda ex, sym: 110.0 if "ETH" in sym else 52000.0)
review = rs.generate_review(1)
assert review["total_trades"] == 1
assert review["total_decisions"] == 1
assert review["stats"]["good_decisions"] == 1
assert review["stats"]["missed_hold_passes"] == 1
assert any("execution/system errors" in i for i in review["insights"])
class TestSaveReview:
def test_writes_file(self, tmp_path, monkeypatch):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
review = {"review_timestamp": "2024-01-01T00:00:00+08:00", "review_period_hours": 1}
path = rs.save_review(review)
saved = json.loads(Path(path).read_text(encoding="utf-8"))
assert saved["review_period_hours"] == 1
class TestPrintReview:
def test_outputs_report(self, capsys):
review = {
"review_timestamp": "2024-01-01T00:00:00+08:00",
"review_period_hours": 1,
"total_decisions": 2,
"total_trades": 1,
"total_errors": 0,
"stats": {"good_decisions": 1, "neutral_decisions": 0, "bad_decisions": 0, "missed_opportunities": 0},
"insights": ["Insight A"],
"recommendations": ["Rec B"],
}
rs.print_review(review)
captured = capsys.readouterr()
assert "Coin Hunter Review Report" in captured.out
assert "Insight A" in captured.out
assert "Rec B" in captured.out

63
tests/test_runtime.py Normal file
View File

@@ -0,0 +1,63 @@
"""Tests for runtime path resolution."""
from pathlib import Path
import pytest
from coinhunter.runtime import RuntimePaths, ensure_runtime_dirs, get_runtime_paths, mask_secret
class TestGetRuntimePaths:
def test_defaults_point_to_home_dot_coinhunter(self):
paths = get_runtime_paths()
assert paths.root == Path.home() / ".coinhunter"
def test_respects_coinhunter_home_env(self, monkeypatch):
monkeypatch.setenv("COINHUNTER_HOME", "~/custom_ch")
paths = get_runtime_paths()
assert paths.root == Path.home() / "custom_ch"
def test_respects_hermes_home_env(self, monkeypatch):
monkeypatch.setenv("HERMES_HOME", "~/custom_hermes")
paths = get_runtime_paths()
assert paths.hermes_home == Path.home() / "custom_hermes"
assert paths.env_file == Path.home() / "custom_hermes" / ".env"
def test_returns_frozen_dataclass(self):
paths = get_runtime_paths()
assert isinstance(paths, RuntimePaths)
with pytest.raises(AttributeError):
paths.root = Path("/tmp")
def test_as_dict_returns_strings(self):
paths = get_runtime_paths()
d = paths.as_dict()
assert isinstance(d, dict)
assert all(isinstance(v, str) for v in d.values())
assert "root" in d
class TestEnsureRuntimeDirs:
def test_creates_directories(self, tmp_path, monkeypatch):
monkeypatch.setenv("COINHUNTER_HOME", str(tmp_path))
paths = get_runtime_paths()
returned = ensure_runtime_dirs(paths)
assert returned.root.exists()
assert returned.state_dir.exists()
assert returned.logs_dir.exists()
assert returned.cache_dir.exists()
assert returned.reviews_dir.exists()
class TestMaskSecret:
def test_empty_string(self):
assert mask_secret("") == ""
def test_short_value(self):
assert mask_secret("ab") == "**"
def test_masks_all_but_tail(self):
assert mask_secret("supersecret", tail=4) == "*******cret"
def test_none_returns_empty(self):
assert mask_secret(None) == ""

View File

@@ -0,0 +1,98 @@
"""Tests for smart executor deduplication and routing."""
from coinhunter.services import exchange_service
from coinhunter.services import smart_executor_service as ses
class FakeArgs:
def __init__(self, command="buy", symbol="BTCUSDT", amount_usdt=50, dry_run=False,
decision_id=None, analysis=None, reasoning=None, order_id=None,
from_symbol=None, to_symbol=None):
self.command = command
self.symbol = symbol
self.amount_usdt = amount_usdt
self.dry_run = dry_run
self.decision_id = decision_id
self.analysis = analysis
self.reasoning = reasoning
self.order_id = order_id
self.from_symbol = from_symbol
self.to_symbol = to_symbol
class TestDeduplication:
def test_skips_duplicate_mutating_action(self, monkeypatch, capsys):
monkeypatch.setattr(ses, "parse_cli_args", lambda argv: (FakeArgs(command="buy"), ["BTCUSDT", "50"]))
monkeypatch.setattr(ses, "get_execution_state", lambda did: {"status": "success"})
get_exchange_calls = []
monkeypatch.setattr(exchange_service, "get_exchange", lambda: get_exchange_calls.append(1) or "ex")
result = ses.run(["buy", "BTCUSDT", "50"])
assert result == 0
assert get_exchange_calls == []
captured = capsys.readouterr()
assert "already executed successfully" in captured.err
def test_allows_duplicate_read_only_action(self, monkeypatch):
monkeypatch.setattr(ses, "parse_cli_args", lambda argv: (FakeArgs(command="balances"), ["balances"]))
monkeypatch.setattr(ses, "get_execution_state", lambda did: {"status": "success"})
monkeypatch.setattr(ses, "command_balances", lambda ex: None)
get_exchange_calls = []
monkeypatch.setattr(exchange_service, "get_exchange", lambda: get_exchange_calls.append(1) or "ex")
result = ses.run(["bal"])
assert result == 0
assert len(get_exchange_calls) == 1
def test_allows_retry_after_failure(self, monkeypatch):
monkeypatch.setattr(ses, "parse_cli_args", lambda argv: (FakeArgs(command="buy"), ["BTCUSDT", "50"]))
monkeypatch.setattr(ses, "get_execution_state", lambda did: {"status": "failed"})
monkeypatch.setattr(ses, "record_execution_state", lambda did, state: None)
monkeypatch.setattr(ses, "build_decision_context", lambda ex, action, tail, did: {})
monkeypatch.setattr(ses, "action_buy", lambda *a, **k: {"id": "123"})
monkeypatch.setattr(ses, "print_json", lambda d: None)
get_exchange_calls = []
monkeypatch.setattr(exchange_service, "get_exchange", lambda: get_exchange_calls.append(1) or "ex")
result = ses.run(["buy", "BTCUSDT", "50"])
assert result == 0
assert len(get_exchange_calls) == 1
class TestReadOnlyRouting:
def test_routes_balances(self, monkeypatch):
monkeypatch.setattr(ses, "parse_cli_args", lambda argv: (FakeArgs(command="balances"), ["balances"]))
monkeypatch.setattr(ses, "get_execution_state", lambda did: None)
routed = []
monkeypatch.setattr(ses, "command_balances", lambda ex: routed.append("balances"))
monkeypatch.setattr(exchange_service, "get_exchange", lambda: "ex")
result = ses.run(["balances"])
assert result == 0
assert routed == ["balances"]
def test_routes_orders(self, monkeypatch):
monkeypatch.setattr(ses, "parse_cli_args", lambda argv: (FakeArgs(command="orders"), ["orders"]))
monkeypatch.setattr(ses, "get_execution_state", lambda did: None)
routed = []
monkeypatch.setattr(ses, "command_orders", lambda ex: routed.append("orders"))
monkeypatch.setattr(exchange_service, "get_exchange", lambda: "ex")
result = ses.run(["orders"])
assert result == 0
assert routed == ["orders"]
def test_routes_order_status(self, monkeypatch):
monkeypatch.setattr(ses, "parse_cli_args", lambda argv: (
FakeArgs(command="order-status", symbol="BTCUSDT", order_id="123"),
["order-status", "BTCUSDT", "123"]
))
monkeypatch.setattr(ses, "get_execution_state", lambda did: None)
routed = []
monkeypatch.setattr(ses, "command_order_status", lambda ex, sym, oid: routed.append((sym, oid)))
monkeypatch.setattr(exchange_service, "get_exchange", lambda: "ex")
result = ses.run(["order-status", "BTCUSDT", "123"])
assert result == 0
assert routed == [("BTCUSDT", "123")]

View File

@@ -0,0 +1,99 @@
"""Tests for state_manager precheck utilities."""
from datetime import timedelta, timezone
from coinhunter.services.state_manager import (
clear_run_request_fields,
sanitize_state_for_stale_triggers,
update_state_after_observation,
)
from coinhunter.services.time_utils import utc_iso, utc_now
class TestClearRunRequestFields:
def test_removes_run_fields(self):
state = {"run_requested_at": utc_iso(), "run_request_note": "test"}
clear_run_request_fields(state)
assert "run_requested_at" not in state
assert "run_request_note" not in state
class TestSanitizeStateForStaleTriggers:
def test_no_changes_when_clean(self):
state = {"pending_trigger": False}
result = sanitize_state_for_stale_triggers(state)
assert result["pending_trigger"] is False
assert result["_stale_recovery_notes"] == []
def test_clears_completed_run_request(self):
state = {
"pending_trigger": True,
"run_requested_at": utc_iso(),
"last_deep_analysis_at": utc_iso(),
}
result = sanitize_state_for_stale_triggers(state)
assert result["pending_trigger"] is False
assert "run_requested_at" not in result
assert any("completed run_requested" in note for note in result["_stale_recovery_notes"])
def test_clears_stale_run_request(self):
old = (utc_now() - timedelta(minutes=60)).replace(tzinfo=timezone.utc).isoformat()
state = {
"pending_trigger": False,
"run_requested_at": old,
}
result = sanitize_state_for_stale_triggers(state)
assert "run_requested_at" not in result
assert any("stale run_requested" in note for note in result["_stale_recovery_notes"])
def test_recovers_stale_pending_trigger(self):
old = (utc_now() - timedelta(minutes=60)).replace(tzinfo=timezone.utc).isoformat()
state = {
"pending_trigger": True,
"last_triggered_at": old,
}
result = sanitize_state_for_stale_triggers(state)
assert result["pending_trigger"] is False
assert any("stale pending_trigger" in note for note in result["_stale_recovery_notes"])
class TestUpdateStateAfterObservation:
def test_updates_last_observed_fields(self):
state = {}
snapshot = {
"generated_at": "2024-01-01T00:00:00Z",
"snapshot_hash": "abc",
"positions_hash": "pos123",
"candidates_hash": "can456",
"portfolio_value_usdt": 100.0,
"market_regime": "neutral",
"positions": [],
"top_candidates": [],
}
analysis = {"should_analyze": False, "details": [], "adaptive_profile": {}}
result = update_state_after_observation(state, snapshot, analysis)
assert result["last_observed_at"] == snapshot["generated_at"]
assert result["last_snapshot_hash"] == "abc"
def test_sets_pending_trigger_when_should_analyze(self):
state = {}
snapshot = {
"generated_at": "2024-01-01T00:00:00Z",
"snapshot_hash": "abc",
"positions_hash": "pos123",
"candidates_hash": "can456",
"portfolio_value_usdt": 100.0,
"market_regime": "neutral",
"positions": [],
"top_candidates": [],
}
analysis = {
"should_analyze": True,
"details": ["price move"],
"adaptive_profile": {},
"hard_reasons": ["moon"],
"signal_delta": 1.5,
}
result = update_state_after_observation(state, snapshot, analysis)
assert result["pending_trigger"] is True
assert result["pending_reasons"] == ["price move"]

View File

@@ -0,0 +1,160 @@
"""Tests for trade execution dry-run paths."""
import pytest
from coinhunter.services import trade_execution as te
from coinhunter.services.trade_common import set_dry_run
class TestMarketSellDryRun:
def test_returns_dry_run_order(self, monkeypatch):
set_dry_run(True)
monkeypatch.setattr(
te, "prepare_sell_quantity",
lambda ex, sym, qty: ("BTC/USDT", 0.5, 60000.0, 30000.0)
)
order = te.market_sell(None, "BTCUSDT", 0.5, "dec-123")
assert order["id"] == "dry-sell-dec-123"
assert order["symbol"] == "BTC/USDT"
assert order["amount"] == 0.5
assert order["status"] == "closed"
set_dry_run(False)
class TestMarketBuyDryRun:
def test_returns_dry_run_order(self, monkeypatch):
set_dry_run(True)
monkeypatch.setattr(
te, "prepare_buy_quantity",
lambda ex, sym, amt: ("ETH/USDT", 1.0, 3000.0, 3000.0)
)
order = te.market_buy(None, "ETHUSDT", 100, "dec-456")
assert order["id"] == "dry-buy-dec-456"
assert order["symbol"] == "ETH/USDT"
assert order["amount"] == 1.0
assert order["status"] == "closed"
set_dry_run(False)
class TestActionSellAll:
def test_raises_when_balance_zero(self, monkeypatch):
monkeypatch.setattr(te, "fetch_balances", lambda ex: {"BTC": 0})
with pytest.raises(RuntimeError, match="balance is zero"):
te.action_sell_all(None, "BTCUSDT", "dec-789", {})
def test_dry_run_does_not_reconcile(self, monkeypatch):
set_dry_run(True)
monkeypatch.setattr(te, "fetch_balances", lambda ex: {"BTC": 0.5})
monkeypatch.setattr(
te, "market_sell",
lambda ex, sym, qty, did: {"id": "dry-1", "amount": qty, "price": 60000.0, "cost": 30000.0, "status": "closed"}
)
monkeypatch.setattr(te, "reconcile_positions_with_exchange", lambda ex, hint=None: ([], {}))
monkeypatch.setattr(te, "log_trade", lambda *a, **k: None)
monkeypatch.setattr(te, "log_decision", lambda *a, **k: None)
result = te.action_sell_all(None, "BTCUSDT", "dec-789", {"analysis": "test"})
assert result["id"] == "dry-1"
set_dry_run(False)
class TestActionBuy:
def test_raises_when_insufficient_usdt(self, monkeypatch):
monkeypatch.setattr(te, "fetch_balances", lambda ex: {"USDT": 10.0})
with pytest.raises(RuntimeError, match="Insufficient USDT"):
te.action_buy(None, "BTCUSDT", 50, "dec-abc", {})
def test_dry_run_skips_save(self, monkeypatch):
set_dry_run(True)
monkeypatch.setattr(te, "fetch_balances", lambda ex: {"USDT": 100.0})
monkeypatch.setattr(
te, "market_buy",
lambda ex, sym, amt, did: {"id": "dry-2", "amount": 0.01, "price": 50000.0, "cost": 500.0, "status": "closed"}
)
monkeypatch.setattr(te, "load_positions", lambda: [])
monkeypatch.setattr(te, "upsert_position", lambda positions, pos: positions.append(pos))
monkeypatch.setattr(te, "reconcile_positions_with_exchange", lambda ex, hint=None: ([], {}))
monkeypatch.setattr(te, "log_trade", lambda *a, **k: None)
monkeypatch.setattr(te, "log_decision", lambda *a, **k: None)
result = te.action_buy(None, "BTCUSDT", 50, "dec-abc", {})
assert result["id"] == "dry-2"
set_dry_run(False)
class TestCommandBalances:
def test_returns_balances(self, monkeypatch, capsys):
monkeypatch.setattr(te, "fetch_balances", lambda ex: {"USDT": 100.0, "BTC": 0.5})
result = te.command_balances(None)
assert result == {"USDT": 100.0, "BTC": 0.5}
captured = capsys.readouterr()
assert '"USDT": 100.0' in captured.out
class TestCommandStatus:
def test_returns_snapshot(self, monkeypatch, capsys):
monkeypatch.setattr(te, "fetch_balances", lambda ex: {"USDT": 100.0})
monkeypatch.setattr(te, "load_positions", lambda: [])
monkeypatch.setattr(te, "build_market_snapshot", lambda ex: {"BTC/USDT": 50000.0})
result = te.command_status(None)
assert result["balances"]["USDT"] == 100.0
captured = capsys.readouterr()
assert '"balances"' in captured.out
class TestCommandOrders:
def test_returns_orders(self, monkeypatch, capsys):
orders = [{"id": "1", "symbol": "BTC/USDT"}]
monkeypatch.setattr(te, "print_json", lambda d: None)
ex = type("Ex", (), {"fetch_open_orders": lambda self: orders})()
result = te.command_orders(ex)
assert result == orders
class TestCommandOrderStatus:
def test_returns_order(self, monkeypatch, capsys):
order = {"id": "123", "status": "open"}
monkeypatch.setattr(te, "print_json", lambda d: None)
ex = type("Ex", (), {"fetch_order": lambda self, oid, sym: order})()
result = te.command_order_status(ex, "BTCUSDT", "123")
assert result == order
class TestCommandCancel:
def test_dry_run_returns_early(self, monkeypatch):
set_dry_run(True)
monkeypatch.setattr(te, "log", lambda msg: None)
result = te.command_cancel(None, "BTCUSDT", "123")
assert result["dry_run"] is True
set_dry_run(False)
def test_cancel_by_order_id(self, monkeypatch):
set_dry_run(False)
monkeypatch.setattr(te, "print_json", lambda d: None)
ex = type("Ex", (), {"cancel_order": lambda self, oid, sym: {"id": oid, "status": "canceled"}})()
result = te.command_cancel(ex, "BTCUSDT", "123")
assert result["status"] == "canceled"
def test_cancel_latest_when_no_order_id(self, monkeypatch):
set_dry_run(False)
monkeypatch.setattr(te, "print_json", lambda d: None)
ex = type("Ex", (), {
"fetch_open_orders": lambda self, sym: [{"id": "999"}, {"id": "888"}],
"cancel_order": lambda self, oid, sym: {"id": oid, "status": "canceled"},
})()
result = te.command_cancel(ex, "BTCUSDT", None)
assert result["id"] == "888"
def test_cancel_raises_when_no_open_orders(self, monkeypatch):
set_dry_run(False)
ex = type("Ex", (), {"fetch_open_orders": lambda self, sym: []})()
with pytest.raises(RuntimeError, match="No open orders"):
te.command_cancel(ex, "BTCUSDT", None)
class TestPrintJson:
def test_outputs_sorted_json(self, capsys):
te.print_json({"b": 2, "a": 1})
captured = capsys.readouterr()
assert '"a": 1' in captured.out
assert '"b": 2' in captured.out