feat: add catlog command, agent flag reorder, and TUI polish

- Add `coinhunter catlog` with limit/offset pagination for audit logs
- Optimize audit log reading with deque to avoid loading all history
- Allow `-a/--agent` flag after subcommands
- Fix upgrade spinner artifact and empty line issues
- Render audit log TUI as timeline with low-saturation event colors
- Convert audit timestamps to local timezone in TUI
- Remove futures-related capabilities
- Add conda environment.yml for development
- Bump version to 2.0.9 and update README

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-04-17 16:42:47 +08:00
parent 6923013694
commit f528575aa8
15 changed files with 339 additions and 92 deletions

View File

@@ -88,6 +88,11 @@ coinhunter opportunity portfolio
coinhunter opportunity scan coinhunter opportunity scan
coinhunter opportunity scan --symbols BTCUSDT ETHUSDT SOLUSDT coinhunter opportunity scan --symbols BTCUSDT ETHUSDT SOLUSDT
# Audit log
coinhunter catlog
coinhunter catlog -n 20
coinhunter catlog -n 10 -o 10
# Self-upgrade # Self-upgrade
coinhunter upgrade coinhunter upgrade
@@ -127,6 +132,8 @@ Events include:
- `opportunity_portfolio_generated` - `opportunity_portfolio_generated`
- `opportunity_scan_generated` - `opportunity_scan_generated`
Use `coinhunter catlog` to read recent entries in the terminal. It aggregates across all days and supports pagination with `-n/--limit` and `-o/--offset`.
## Development ## Development
Clone the repo and install in editable mode: Clone the repo and install in editable mode:
@@ -137,6 +144,13 @@ cd coinhunter-cli
pip install -e ".[dev]" pip install -e ".[dev]"
``` ```
Or use the provided Conda environment:
```bash
conda env create -f environment.yml
conda activate coinhunter
```
Run quality checks: Run quality checks:
```bash ```bash

9
environment.yml Normal file
View File

@@ -0,0 +1,9 @@
name: coinhunter
channels:
- defaults
- conda-forge
dependencies:
- python>=3.10
- pip
- pip:
- -e ".[dev]"

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "coinhunter" name = "coinhunter"
version = "2.0.8" version = "2.0.9"
description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution." description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution."
readme = "README.md" readme = "README.md"
license = {text = "MIT"} license = {text = "MIT"}

View File

@@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
from collections import deque
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -37,3 +38,33 @@ def audit_event(event: str, payload: dict[str, Any], paths: RuntimePaths | None
with _audit_path(paths).open("a", encoding="utf-8") as handle: with _audit_path(paths).open("a", encoding="utf-8") as handle:
handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n") handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n")
return entry return entry
def read_audit_log(paths: RuntimePaths | None = None, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
paths = ensure_runtime_dirs(paths or get_runtime_paths())
logs_dir = _resolve_audit_dir(paths)
if not logs_dir.exists():
return []
audit_files = sorted(logs_dir.glob("audit_*.jsonl"), reverse=True)
needed = offset + limit
chunks: list[list[dict[str, Any]]] = []
total = 0
for audit_file in audit_files:
remaining = needed - total
if remaining <= 0:
break
entries: list[dict[str, Any]] = []
with audit_file.open("r", encoding="utf-8") as handle:
entries = list(deque((json.loads(line) for line in handle if line.strip()), maxlen=remaining))
if entries:
chunks.append(entries)
total += len(entries)
if not chunks:
return []
all_entries: list[dict[str, Any]] = []
for chunk in reversed(chunks):
all_entries.extend(chunk)
start = -(offset + limit) if (offset + limit) <= len(all_entries) else -len(all_entries)
if offset == 0:
return all_entries[start:]
return all_entries[start:-offset]

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any
from requests.exceptions import RequestException, SSLError from requests.exceptions import RequestException, SSLError # type: ignore[import-untyped]
class SpotBinanceClient: class SpotBinanceClient:

View File

@@ -7,6 +7,7 @@ import sys
from typing import Any from typing import Any
from . import __version__ from . import __version__
from .audit import read_audit_log
from .binance.spot_client import SpotBinanceClient from .binance.spot_client import SpotBinanceClient
from .config import ensure_init_files, get_binance_credentials, load_config from .config import ensure_init_files, get_binance_credentials, load_config
from .runtime import get_runtime_paths, install_shell_completion, print_output, self_upgrade, with_spinner from .runtime import get_runtime_paths, install_shell_completion, print_output, self_upgrade, with_spinner
@@ -78,7 +79,9 @@ def build_parser() -> argparse.ArgumentParser:
sub.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)") sub.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)")
sub.add_argument("-q", "--qty", type=float, help="Base asset quantity") sub.add_argument("-q", "--qty", type=float, help="Base asset quantity")
sub.add_argument("-Q", "--quote", type=float, help="Quote asset amount (buy market only)") sub.add_argument("-Q", "--quote", type=float, help="Quote asset amount (buy market only)")
sub.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)") sub.add_argument(
"-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)"
)
sub.add_argument("-p", "--price", type=float, help="Limit price") sub.add_argument("-p", "--price", type=float, help="Limit price")
sub.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending") sub.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending")
@@ -90,6 +93,12 @@ def build_parser() -> argparse.ArgumentParser:
subparsers.add_parser("upgrade", help="Upgrade coinhunter to the latest version") subparsers.add_parser("upgrade", help="Upgrade coinhunter to the latest version")
catlog_parser = subparsers.add_parser("catlog", help="Read recent audit log entries")
catlog_parser.add_argument("-n", "--limit", type=int, default=10, help="Number of entries (default: 10)")
catlog_parser.add_argument(
"-o", "--offset", type=int, default=0, help="Skip the most recent N entries (default: 0)"
)
completion_parser = subparsers.add_parser("completion", help="Generate shell completion script") completion_parser = subparsers.add_parser("completion", help="Generate shell completion script")
completion_parser.add_argument("shell", choices=["bash", "zsh"], help="Target shell") completion_parser.add_argument("shell", choices=["bash", "zsh"], help="Target shell")
@@ -145,24 +154,18 @@ def main(argv: list[str] | None = None) -> int:
spot_client = _load_spot_client(config) spot_client = _load_spot_client(config)
if args.account_command == "overview": if args.account_command == "overview":
with with_spinner("Fetching account overview...", enabled=not args.agent): with with_spinner("Fetching account overview...", enabled=not args.agent):
print_output( result = account_service.get_overview(config, spot_client=spot_client)
account_service.get_overview(config, spot_client=spot_client), print_output(result, agent=args.agent)
agent=args.agent,
)
return 0 return 0
if args.account_command == "balances": if args.account_command == "balances":
with with_spinner("Fetching balances...", enabled=not args.agent): with with_spinner("Fetching balances...", enabled=not args.agent):
print_output( result = account_service.get_balances(config, spot_client=spot_client)
account_service.get_balances(config, spot_client=spot_client), print_output(result, agent=args.agent)
agent=args.agent,
)
return 0 return 0
if args.account_command == "positions": if args.account_command == "positions":
with with_spinner("Fetching positions...", enabled=not args.agent): with with_spinner("Fetching positions...", enabled=not args.agent):
print_output( result = account_service.get_positions(config, spot_client=spot_client)
account_service.get_positions(config, spot_client=spot_client), print_output(result, agent=args.agent)
agent=args.agent,
)
return 0 return 0
parser.error("account requires one of: overview, balances, positions") parser.error("account requires one of: overview, balances, positions")
@@ -170,56 +173,68 @@ def main(argv: list[str] | None = None) -> int:
spot_client = _load_spot_client(config) spot_client = _load_spot_client(config)
if args.market_command == "tickers": if args.market_command == "tickers":
with with_spinner("Fetching tickers...", enabled=not args.agent): with with_spinner("Fetching tickers...", enabled=not args.agent):
print_output(market_service.get_tickers(config, args.symbols, spot_client=spot_client), agent=args.agent) result = market_service.get_tickers(config, args.symbols, spot_client=spot_client)
print_output(result, agent=args.agent)
return 0 return 0
if args.market_command == "klines": if args.market_command == "klines":
with with_spinner("Fetching klines...", enabled=not args.agent): with with_spinner("Fetching klines...", enabled=not args.agent):
print_output( result = market_service.get_klines(
market_service.get_klines( config,
config, args.symbols,
args.symbols, interval=args.interval,
interval=args.interval, limit=args.limit,
limit=args.limit, spot_client=spot_client,
spot_client=spot_client,
),
agent=args.agent,
) )
print_output(result, agent=args.agent)
return 0 return 0
parser.error("market requires one of: tickers, klines") parser.error("market requires one of: tickers, klines")
if args.command == "trade": if args.command == "trade":
spot_client = _load_spot_client(config) spot_client = _load_spot_client(config)
with with_spinner("Placing order...", enabled=not args.agent): with with_spinner("Placing order...", enabled=not args.agent):
print_output( result = trade_service.execute_spot_trade(
trade_service.execute_spot_trade( config,
config, side=args.trade_action,
side=args.trade_action, symbol=args.symbol,
symbol=args.symbol, qty=args.qty,
qty=args.qty, quote=args.quote,
quote=args.quote, order_type=args.type,
order_type=args.type, price=args.price,
price=args.price, dry_run=True if args.dry_run else None,
dry_run=True if args.dry_run else None, spot_client=spot_client,
spot_client=spot_client,
),
agent=args.agent,
) )
print_output(result, agent=args.agent)
return 0 return 0
if args.command == "opportunity": if args.command == "opportunity":
spot_client = _load_spot_client(config) spot_client = _load_spot_client(config)
if args.opportunity_command == "portfolio": if args.opportunity_command == "portfolio":
with with_spinner("Analyzing portfolio...", enabled=not args.agent): with with_spinner("Analyzing portfolio...", enabled=not args.agent):
print_output(opportunity_service.analyze_portfolio(config, spot_client=spot_client), agent=args.agent) result = opportunity_service.analyze_portfolio(config, spot_client=spot_client)
print_output(result, agent=args.agent)
return 0 return 0
if args.opportunity_command == "scan": if args.opportunity_command == "scan":
with with_spinner("Scanning opportunities...", enabled=not args.agent): with with_spinner("Scanning opportunities...", enabled=not args.agent):
print_output(opportunity_service.scan_opportunities(config, spot_client=spot_client, symbols=args.symbols), agent=args.agent) result = opportunity_service.scan_opportunities(
config, spot_client=spot_client, symbols=args.symbols
)
print_output(result, agent=args.agent)
return 0 return 0
parser.error("opportunity requires `portfolio` or `scan`") parser.error("opportunity requires `portfolio` or `scan`")
if args.command == "upgrade": if args.command == "upgrade":
print_output(self_upgrade(), agent=args.agent) with with_spinner("Upgrading coinhunter...", enabled=not args.agent):
result = self_upgrade()
print_output(result, agent=args.agent)
return 0
if args.command == "catlog":
with with_spinner("Reading audit logs...", enabled=not args.agent):
entries = read_audit_log(limit=args.limit, offset=args.offset)
print_output(
{"entries": entries, "limit": args.limit, "offset": args.offset, "total": len(entries)},
agent=args.agent,
)
return 0 return 0
parser.error(f"Unsupported command {args.command}") parser.error(f"Unsupported command {args.command}")

View File

@@ -126,6 +126,24 @@ def _fmt_number(value: Any) -> str:
return str(value) return str(value)
def _fmt_local_ts(ts: str) -> str:
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return ts
def _event_color(event: str) -> str:
if "failed" in event or "error" in event:
return f"{_DIM}{_RED}"
if event.startswith("trade"):
return f"{_DIM}{_GREEN}"
if event.startswith("opportunity"):
return f"{_DIM}{_YELLOW}"
return _DIM
def _is_large_dataset(payload: Any, threshold: int = 8) -> bool: def _is_large_dataset(payload: Any, threshold: int = 8) -> bool:
if isinstance(payload, dict): if isinstance(payload, dict):
for value in payload.values(): for value in payload.values():
@@ -281,7 +299,9 @@ def _render_tui(payload: Any) -> None:
if "klines" in payload: if "klines" in payload:
rows = payload["klines"] rows = payload["klines"]
print(f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}") print(
f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}"
)
display_rows = rows[:10] display_rows = rows[:10]
table_rows = [] table_rows = []
for r in display_rows: for r in display_rows:
@@ -325,8 +345,12 @@ def _render_tui(payload: Any) -> None:
for i, r in enumerate(rows, 1): for i, r in enumerate(rows, 1):
score = r.get("score", 0) score = r.get("score", 0)
action = r.get("action", "") action = r.get("action", "")
action_color = _GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN action_color = (
print(f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}") _GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN
)
print(
f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}"
)
for reason in r.get("reasons", []): for reason in r.get("reasons", []):
print(f" · {reason}") print(f" · {reason}")
metrics = r.get("metrics", {}) metrics = r.get("metrics", {})
@@ -340,9 +364,9 @@ def _render_tui(payload: Any) -> None:
stdout = payload.get("stdout", "") stdout = payload.get("stdout", "")
stderr = payload.get("stderr", "") stderr = payload.get("stderr", "")
if rc == 0: if rc == 0:
print(f"\n{_GREEN}{_RESET} Update completed") print(f"{_GREEN}{_RESET} Update completed")
else: else:
print(f"\n{_RED}{_RESET} Update failed (exit code {rc})") print(f"{_RED}{_RESET} Update failed (exit code {rc})")
if stdout: if stdout:
for line in stdout.strip().splitlines(): for line in stdout.strip().splitlines():
print(f" {line}") print(f" {line}")
@@ -352,6 +376,29 @@ def _render_tui(payload: Any) -> None:
print(f" {line}") print(f" {line}")
return return
if "entries" in payload:
rows = payload["entries"]
print(f"\n{_BOLD}{_CYAN} AUDIT LOG {_RESET}")
if not rows:
print(" (no audit entries)")
return
for r in rows:
ts = _fmt_local_ts(r.get("timestamp", ""))
event = r.get("event", "")
detail_parts: list[str] = []
for key in ("symbol", "side", "qty", "quote_amount", "order_type", "status", "dry_run", "error"):
val = r.get(key)
if val is not None:
detail_parts.append(f"{key}={val}")
if not detail_parts:
for key, val in r.items():
if key not in ("timestamp", "event") and not isinstance(val, (dict, list)):
detail_parts.append(f"{key}={val}")
print(f"\n {_DIM}{ts}{_RESET} {_event_color(event)}{event}{_RESET}")
if detail_parts:
print(f" {' '.join(detail_parts)}")
return
if "created_or_updated" in payload: if "created_or_updated" in payload:
print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}") print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}")
print(f" Root: {payload.get('root', '')}") print(f" Root: {payload.get('root', '')}")
@@ -485,7 +532,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
rc_path = _zshrc_path() rc_path = _zshrc_path()
fpath_line = "fpath+=(~/.zsh/completions)" fpath_line = "fpath+=(~/.zsh/completions)"
if not _rc_contains(rc_path, fpath_line): if not _rc_contains(rc_path, fpath_line):
rc_path.write_text(fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n", encoding="utf-8") rc_path.write_text(
fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n",
encoding="utf-8",
)
hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'" hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'"
else: else:
hint = "Run 'compinit' or restart your terminal to activate completions" hint = "Run 'compinit' or restart your terminal to activate completions"
@@ -497,7 +547,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
rc_path = _bashrc_path() rc_path = _bashrc_path()
source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"' source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"'
if not _rc_contains(rc_path, source_line): if not _rc_contains(rc_path, source_line):
rc_path.write_text(source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n", encoding="utf-8") rc_path.write_text(
source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n",
encoding="utf-8",
)
hint = "Added bash completion source line to ~/.bashrc; restart your terminal" hint = "Added bash completion source line to ~/.bashrc; restart your terminal"
else: else:
hint = "Restart your terminal or source ~/.bashrc to activate completions" hint = "Restart your terminal or source ~/.bashrc to activate completions"

View File

@@ -57,7 +57,9 @@ def get_tickers(config: dict[str, Any], symbols: list[str], *, spot_client: Any)
TickerView( TickerView(
symbol=normalize_symbol(ticker["symbol"]), symbol=normalize_symbol(ticker["symbol"]),
last_price=float(ticker.get("lastPrice") or ticker.get("last_price") or 0.0), last_price=float(ticker.get("lastPrice") or ticker.get("last_price") or 0.0),
price_change_pct=float(ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0), price_change_pct=float(
ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0
),
quote_volume=float(ticker.get("quoteVolume") or ticker.get("quote_volume") or 0.0), quote_volume=float(ticker.get("quoteVolume") or ticker.get("quote_volume") or 0.0),
) )
) )

View File

@@ -26,7 +26,9 @@ def _safe_pct(new: float, old: float) -> float:
return (new - old) / old return (new - old) / old
def _score_candidate(closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float) -> tuple[float, dict[str, float]]: def _score_candidate(
closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float
) -> tuple[float, dict[str, float]]:
if len(closes) < 2 or not volumes: if len(closes) < 2 or not volumes:
return 0.0, { return 0.0, {
"trend": 0.0, "trend": 0.0,
@@ -158,10 +160,7 @@ def scan_opportunities(
top_n = int(opportunity_config.get("top_n", 10)) top_n = int(opportunity_config.get("top_n", 10))
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper() quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
held_positions = get_positions(config, spot_client=spot_client)["positions"] held_positions = get_positions(config, spot_client=spot_client)["positions"]
concentration_map = { concentration_map = {normalize_symbol(item["symbol"]): float(item["notional_usdt"]) for item in held_positions}
normalize_symbol(item["symbol"]): float(item["notional_usdt"])
for item in held_positions
}
total_held = sum(concentration_map.values()) or 1.0 total_held = sum(concentration_map.values()) or 1.0
universe = get_scan_universe(config, spot_client=spot_client, symbols=symbols)[:scan_limit] universe = get_scan_universe(config, spot_client=spot_client, symbols=symbols)[:scan_limit]

View File

@@ -40,7 +40,9 @@ def _default_dry_run(config: dict[str, Any], dry_run: bool | None) -> bool:
return bool(config.get("trading", {}).get("dry_run_default", False)) return bool(config.get("trading", {}).get("dry_run_default", False))
def _trade_log_payload(intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None) -> dict[str, Any]: def _trade_log_payload(
intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None
) -> dict[str, Any]:
return { return {
"market_type": intent.market_type, "market_type": intent.market_type,
"symbol": intent.symbol, "symbol": intent.symbol,
@@ -125,7 +127,9 @@ def execute_spot_trade(
response_payload=response, response_payload=response,
) )
) )
audit_event("trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response}) audit_event(
"trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response}
)
return {"trade": result} return {"trade": result}
try: try:
@@ -146,5 +150,7 @@ def execute_spot_trade(
response_payload=response, response_payload=response,
) )
) )
audit_event("trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response}) audit_event(
"trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response}
)
return {"trade": result} return {"trade": result}

View File

@@ -28,9 +28,30 @@ class FakeSpotClient:
def ticker_24h(self, symbols=None): def ticker_24h(self, symbols=None):
rows = [ rows = [
{"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "4.5", "quoteVolume": "10000000", "highPrice": "61000", "lowPrice": "58000"}, {
{"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3.0", "quoteVolume": "8000000", "highPrice": "3050", "lowPrice": "2900"}, "symbol": "BTCUSDT",
{"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1.0", "quoteVolume": "200", "highPrice": "0.11", "lowPrice": "0.09"}, "lastPrice": "60000",
"priceChangePercent": "4.5",
"quoteVolume": "10000000",
"highPrice": "61000",
"lowPrice": "58000",
},
{
"symbol": "ETHUSDT",
"lastPrice": "3000",
"priceChangePercent": "3.0",
"quoteVolume": "8000000",
"highPrice": "3050",
"lowPrice": "2900",
},
{
"symbol": "DOGEUSDT",
"lastPrice": "0.1",
"priceChangePercent": "1.0",
"quoteVolume": "200",
"highPrice": "0.11",
"lowPrice": "0.09",
},
] ]
if not symbols: if not symbols:
return rows return rows
@@ -38,7 +59,13 @@ class FakeSpotClient:
return [row for row in rows if row["symbol"] in wanted] return [row for row in rows if row["symbol"] in wanted]
def exchange_info(self): def exchange_info(self):
return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "BREAK"}]} return {
"symbols": [
{"symbol": "BTCUSDT", "status": "TRADING"},
{"symbol": "ETHUSDT", "status": "TRADING"},
{"symbol": "DOGEUSDT", "status": "BREAK"},
]
}
class AccountMarketServicesTestCase(unittest.TestCase): class AccountMarketServicesTestCase(unittest.TestCase):

View File

@@ -19,10 +19,16 @@ class CLITestCase(unittest.TestCase):
def test_init_dispatches(self): def test_init_dispatches(self):
captured = {} captured = {}
with patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}), patch.object( with (
cli, "install_shell_completion", return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"} patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}),
), patch.object( patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) cli,
"install_shell_completion",
return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
): ):
result = cli.main(["init", "--force"]) result = cli.main(["init", "--force"])
self.assertEqual(result, 0) self.assertEqual(result, 0)
@@ -42,9 +48,29 @@ class CLITestCase(unittest.TestCase):
def test_upgrade_dispatches(self): def test_upgrade_dispatches(self):
captured = {} captured = {}
with patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}), patch.object( with (
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
): ):
result = cli.main(["upgrade"]) result = cli.main(["upgrade"])
self.assertEqual(result, 0) self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["returncode"], 0) self.assertEqual(captured["payload"]["returncode"], 0)
def test_catlog_dispatches(self):
captured = {}
with (
patch.object(
cli, "read_audit_log", return_value=[{"timestamp": "2026-04-17T12:00:00Z", "event": "test_event"}]
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["catlog", "-n", "5", "-o", "10"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["limit"], 5)
self.assertEqual(captured["payload"]["offset"], 10)
self.assertIn("entries", captured["payload"])
self.assertEqual(captured["payload"]["total"], 1)

View File

@@ -14,7 +14,10 @@ from coinhunter.runtime import get_runtime_paths
class ConfigRuntimeTestCase(unittest.TestCase): class ConfigRuntimeTestCase(unittest.TestCase):
def test_init_files_created_in_coinhunter_home(self): def test_init_files_created_in_coinhunter_home(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False): with (
tempfile.TemporaryDirectory() as tmp_dir,
patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False),
):
paths = get_runtime_paths() paths = get_runtime_paths()
payload = ensure_init_files(paths) payload = ensure_init_files(paths)
self.assertTrue(paths.config_file.exists()) self.assertTrue(paths.config_file.exists())
@@ -23,10 +26,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
self.assertEqual(payload["root"], str(paths.root)) self.assertEqual(payload["root"], str(paths.root))
def test_load_config_and_env(self): def test_load_config_and_env(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( with (
os.environ, tempfile.TemporaryDirectory() as tmp_dir,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, patch.dict(
clear=False, os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
clear=False,
),
): ):
paths = get_runtime_paths() paths = get_runtime_paths()
ensure_init_files(paths) ensure_init_files(paths)
@@ -40,10 +46,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
self.assertEqual(os.environ["BINANCE_API_SECRET"], "def") self.assertEqual(os.environ["BINANCE_API_SECRET"], "def")
def test_env_file_overrides_existing_environment(self): def test_env_file_overrides_existing_environment(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( with (
os.environ, tempfile.TemporaryDirectory() as tmp_dir,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home"), "BINANCE_API_KEY": "old_key"}, patch.dict(
clear=False, os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home"), "BINANCE_API_KEY": "old_key"},
clear=False,
),
): ):
paths = get_runtime_paths() paths = get_runtime_paths()
ensure_init_files(paths) ensure_init_files(paths)
@@ -55,10 +64,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
self.assertEqual(os.environ["BINANCE_API_SECRET"], "new_secret") self.assertEqual(os.environ["BINANCE_API_SECRET"], "new_secret")
def test_missing_credentials_raise(self): def test_missing_credentials_raise(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( with (
os.environ, tempfile.TemporaryDirectory() as tmp_dir,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, patch.dict(
clear=False, os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
clear=False,
),
): ):
os.environ.pop("BINANCE_API_KEY", None) os.environ.pop("BINANCE_API_KEY", None)
os.environ.pop("BINANCE_API_SECRET", None) os.environ.pop("BINANCE_API_SECRET", None)
@@ -68,10 +80,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
get_binance_credentials(paths) get_binance_credentials(paths)
def test_permission_error_is_explained(self): def test_permission_error_is_explained(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( with (
os.environ, tempfile.TemporaryDirectory() as tmp_dir,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, patch.dict(
clear=False, os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
clear=False,
),
): ):
paths = get_runtime_paths() paths = get_runtime_paths()
with patch("coinhunter.config.ensure_runtime_dirs", side_effect=PermissionError("no write access")): with patch("coinhunter.config.ensure_runtime_dirs", side_effect=PermissionError("no write access")):

View File

@@ -29,17 +29,52 @@ class FakeSpotClient:
def ticker_24h(self, symbols=None): def ticker_24h(self, symbols=None):
rows = { rows = {
"BTCUSDT": {"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "5", "quoteVolume": "9000000", "highPrice": "60200", "lowPrice": "55000"}, "BTCUSDT": {
"ETHUSDT": {"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3", "quoteVolume": "8000000", "highPrice": "3100", "lowPrice": "2800"}, "symbol": "BTCUSDT",
"SOLUSDT": {"symbol": "SOLUSDT", "lastPrice": "150", "priceChangePercent": "8", "quoteVolume": "10000000", "highPrice": "152", "lowPrice": "130"}, "lastPrice": "60000",
"DOGEUSDT": {"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1", "quoteVolume": "100", "highPrice": "0.11", "lowPrice": "0.09"}, "priceChangePercent": "5",
"quoteVolume": "9000000",
"highPrice": "60200",
"lowPrice": "55000",
},
"ETHUSDT": {
"symbol": "ETHUSDT",
"lastPrice": "3000",
"priceChangePercent": "3",
"quoteVolume": "8000000",
"highPrice": "3100",
"lowPrice": "2800",
},
"SOLUSDT": {
"symbol": "SOLUSDT",
"lastPrice": "150",
"priceChangePercent": "8",
"quoteVolume": "10000000",
"highPrice": "152",
"lowPrice": "130",
},
"DOGEUSDT": {
"symbol": "DOGEUSDT",
"lastPrice": "0.1",
"priceChangePercent": "1",
"quoteVolume": "100",
"highPrice": "0.11",
"lowPrice": "0.09",
},
} }
if not symbols: if not symbols:
return list(rows.values()) return list(rows.values())
return [rows[symbol] for symbol in symbols] return [rows[symbol] for symbol in symbols]
def exchange_info(self): def exchange_info(self):
return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "SOLUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "TRADING"}]} return {
"symbols": [
{"symbol": "BTCUSDT", "status": "TRADING"},
{"symbol": "ETHUSDT", "status": "TRADING"},
{"symbol": "SOLUSDT", "status": "TRADING"},
{"symbol": "DOGEUSDT", "status": "TRADING"},
]
}
def klines(self, symbol, interval, limit): def klines(self, symbol, interval, limit):
curves = { curves = {
@@ -50,7 +85,18 @@ class FakeSpotClient:
}[symbol] }[symbol]
rows = [] rows = []
for index, close in enumerate(curves[-limit:]): for index, close in enumerate(curves[-limit:]):
rows.append([index, close * 0.98, close * 1.01, close * 0.97, close, 100 + index * 10, index + 1, close * (100 + index * 10)]) rows.append(
[
index,
close * 0.98,
close * 1.01,
close * 0.97,
close,
100 + index * 10,
index + 1,
close * (100 + index * 10),
]
)
return rows return rows
@@ -85,7 +131,9 @@ class OpportunityServiceTestCase(unittest.TestCase):
def test_scan_is_deterministic(self): def test_scan_is_deterministic(self):
with patch.object(opportunity_service, "audit_event", return_value=None): with patch.object(opportunity_service, "audit_event", return_value=None):
payload = opportunity_service.scan_opportunities(self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()) payload = opportunity_service.scan_opportunities(
self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()
)
self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"]) self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"])
def test_score_candidate_handles_empty_klines(self): def test_score_candidate_handles_empty_klines(self):

View File

@@ -20,7 +20,9 @@ class FakeSpotClient:
class TradeServiceTestCase(unittest.TestCase): class TradeServiceTestCase(unittest.TestCase):
def test_spot_market_buy_dry_run_does_not_call_client(self): def test_spot_market_buy_dry_run_does_not_call_client(self):
events = [] events = []
with patch.object(trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload))): with patch.object(
trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload))
):
client = FakeSpotClient() client = FakeSpotClient()
payload = trade_service.execute_spot_trade( payload = trade_service.execute_spot_trade(
{"trading": {"dry_run_default": False}}, {"trading": {"dry_run_default": False}},