From f528575aa86bfaee1ec60a4182425b104ac0f72a Mon Sep 17 00:00:00 2001 From: Tacit Lab Date: Fri, 17 Apr 2026 16:42:47 +0800 Subject: [PATCH] feat: add catlog command, agent flag reorder, and TUI polish - Add `coinhunter catlog` with limit/offset pagination for audit logs - Optimize audit log reading with deque to avoid loading all history - Allow `-a/--agent` flag after subcommands - Fix upgrade spinner artifact and empty line issues - Render audit log TUI as timeline with low-saturation event colors - Convert audit timestamps to local timezone in TUI - Remove futures-related capabilities - Add conda environment.yml for development - Bump version to 2.0.9 and update README Co-Authored-By: Claude Opus 4.7 --- README.md | 14 +++ environment.yml | 9 ++ pyproject.toml | 2 +- src/coinhunter/audit.py | 31 +++++++ src/coinhunter/binance/spot_client.py | 2 +- src/coinhunter/cli.py | 93 +++++++++++-------- src/coinhunter/runtime.py | 67 +++++++++++-- src/coinhunter/services/market_service.py | 4 +- .../services/opportunity_service.py | 9 +- src/coinhunter/services/trade_service.py | 12 ++- tests/test_account_market_services.py | 35 ++++++- tests/test_cli.py | 38 ++++++-- tests/test_config_runtime.py | 49 ++++++---- tests/test_opportunity_service.py | 62 +++++++++++-- tests/test_trade_service.py | 4 +- 15 files changed, 339 insertions(+), 92 deletions(-) create mode 100644 environment.yml diff --git a/README.md b/README.md index a01aaa0..f26a8e2 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,11 @@ coinhunter opportunity portfolio coinhunter opportunity scan coinhunter opportunity scan --symbols BTCUSDT ETHUSDT SOLUSDT +# Audit log +coinhunter catlog +coinhunter catlog -n 20 +coinhunter catlog -n 10 -o 10 + # Self-upgrade coinhunter upgrade @@ -127,6 +132,8 @@ Events include: - `opportunity_portfolio_generated` - `opportunity_scan_generated` +Use `coinhunter catlog` to read recent entries in the terminal. It aggregates across all days and supports pagination with `-n/--limit` and `-o/--offset`. + ## Development Clone the repo and install in editable mode: @@ -137,6 +144,13 @@ cd coinhunter-cli pip install -e ".[dev]" ``` +Or use the provided Conda environment: + +```bash +conda env create -f environment.yml +conda activate coinhunter +``` + Run quality checks: ```bash diff --git a/environment.yml b/environment.yml new file mode 100644 index 0000000..5d820fe --- /dev/null +++ b/environment.yml @@ -0,0 +1,9 @@ +name: coinhunter +channels: + - defaults + - conda-forge +dependencies: + - python>=3.10 + - pip + - pip: + - -e ".[dev]" diff --git a/pyproject.toml b/pyproject.toml index 487c22e..e4e0e77 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "coinhunter" -version = "2.0.8" +version = "2.0.9" description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution." readme = "README.md" license = {text = "MIT"} diff --git a/src/coinhunter/audit.py b/src/coinhunter/audit.py index 8d7ee32..a1c8f02 100644 --- a/src/coinhunter/audit.py +++ b/src/coinhunter/audit.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +from collections import deque from datetime import datetime, timezone from pathlib import Path from typing import Any @@ -37,3 +38,33 @@ def audit_event(event: str, payload: dict[str, Any], paths: RuntimePaths | None with _audit_path(paths).open("a", encoding="utf-8") as handle: handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n") return entry + + +def read_audit_log(paths: RuntimePaths | None = None, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]: + paths = ensure_runtime_dirs(paths or get_runtime_paths()) + logs_dir = _resolve_audit_dir(paths) + if not logs_dir.exists(): + return [] + audit_files = sorted(logs_dir.glob("audit_*.jsonl"), reverse=True) + needed = offset + limit + chunks: list[list[dict[str, Any]]] = [] + total = 0 + for audit_file in audit_files: + remaining = needed - total + if remaining <= 0: + break + entries: list[dict[str, Any]] = [] + with audit_file.open("r", encoding="utf-8") as handle: + entries = list(deque((json.loads(line) for line in handle if line.strip()), maxlen=remaining)) + if entries: + chunks.append(entries) + total += len(entries) + if not chunks: + return [] + all_entries: list[dict[str, Any]] = [] + for chunk in reversed(chunks): + all_entries.extend(chunk) + start = -(offset + limit) if (offset + limit) <= len(all_entries) else -len(all_entries) + if offset == 0: + return all_entries[start:] + return all_entries[start:-offset] diff --git a/src/coinhunter/binance/spot_client.py b/src/coinhunter/binance/spot_client.py index b3e0029..def05eb 100644 --- a/src/coinhunter/binance/spot_client.py +++ b/src/coinhunter/binance/spot_client.py @@ -5,7 +5,7 @@ from __future__ import annotations from collections.abc import Callable from typing import Any -from requests.exceptions import RequestException, SSLError +from requests.exceptions import RequestException, SSLError # type: ignore[import-untyped] class SpotBinanceClient: diff --git a/src/coinhunter/cli.py b/src/coinhunter/cli.py index 60bbe75..0c4e2da 100644 --- a/src/coinhunter/cli.py +++ b/src/coinhunter/cli.py @@ -7,6 +7,7 @@ import sys from typing import Any from . import __version__ +from .audit import read_audit_log from .binance.spot_client import SpotBinanceClient from .config import ensure_init_files, get_binance_credentials, load_config from .runtime import get_runtime_paths, install_shell_completion, print_output, self_upgrade, with_spinner @@ -78,7 +79,9 @@ def build_parser() -> argparse.ArgumentParser: sub.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)") sub.add_argument("-q", "--qty", type=float, help="Base asset quantity") sub.add_argument("-Q", "--quote", type=float, help="Quote asset amount (buy market only)") - sub.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)") + sub.add_argument( + "-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)" + ) sub.add_argument("-p", "--price", type=float, help="Limit price") sub.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending") @@ -90,6 +93,12 @@ def build_parser() -> argparse.ArgumentParser: subparsers.add_parser("upgrade", help="Upgrade coinhunter to the latest version") + catlog_parser = subparsers.add_parser("catlog", help="Read recent audit log entries") + catlog_parser.add_argument("-n", "--limit", type=int, default=10, help="Number of entries (default: 10)") + catlog_parser.add_argument( + "-o", "--offset", type=int, default=0, help="Skip the most recent N entries (default: 0)" + ) + completion_parser = subparsers.add_parser("completion", help="Generate shell completion script") completion_parser.add_argument("shell", choices=["bash", "zsh"], help="Target shell") @@ -145,24 +154,18 @@ def main(argv: list[str] | None = None) -> int: spot_client = _load_spot_client(config) if args.account_command == "overview": with with_spinner("Fetching account overview...", enabled=not args.agent): - print_output( - account_service.get_overview(config, spot_client=spot_client), - agent=args.agent, - ) + result = account_service.get_overview(config, spot_client=spot_client) + print_output(result, agent=args.agent) return 0 if args.account_command == "balances": with with_spinner("Fetching balances...", enabled=not args.agent): - print_output( - account_service.get_balances(config, spot_client=spot_client), - agent=args.agent, - ) + result = account_service.get_balances(config, spot_client=spot_client) + print_output(result, agent=args.agent) return 0 if args.account_command == "positions": with with_spinner("Fetching positions...", enabled=not args.agent): - print_output( - account_service.get_positions(config, spot_client=spot_client), - agent=args.agent, - ) + result = account_service.get_positions(config, spot_client=spot_client) + print_output(result, agent=args.agent) return 0 parser.error("account requires one of: overview, balances, positions") @@ -170,56 +173,68 @@ def main(argv: list[str] | None = None) -> int: spot_client = _load_spot_client(config) if args.market_command == "tickers": with with_spinner("Fetching tickers...", enabled=not args.agent): - print_output(market_service.get_tickers(config, args.symbols, spot_client=spot_client), agent=args.agent) + result = market_service.get_tickers(config, args.symbols, spot_client=spot_client) + print_output(result, agent=args.agent) return 0 if args.market_command == "klines": with with_spinner("Fetching klines...", enabled=not args.agent): - print_output( - market_service.get_klines( - config, - args.symbols, - interval=args.interval, - limit=args.limit, - spot_client=spot_client, - ), - agent=args.agent, + result = market_service.get_klines( + config, + args.symbols, + interval=args.interval, + limit=args.limit, + spot_client=spot_client, ) + print_output(result, agent=args.agent) return 0 parser.error("market requires one of: tickers, klines") if args.command == "trade": spot_client = _load_spot_client(config) with with_spinner("Placing order...", enabled=not args.agent): - print_output( - trade_service.execute_spot_trade( - config, - side=args.trade_action, - symbol=args.symbol, - qty=args.qty, - quote=args.quote, - order_type=args.type, - price=args.price, - dry_run=True if args.dry_run else None, - spot_client=spot_client, - ), - agent=args.agent, + result = trade_service.execute_spot_trade( + config, + side=args.trade_action, + symbol=args.symbol, + qty=args.qty, + quote=args.quote, + order_type=args.type, + price=args.price, + dry_run=True if args.dry_run else None, + spot_client=spot_client, ) + print_output(result, agent=args.agent) return 0 if args.command == "opportunity": spot_client = _load_spot_client(config) if args.opportunity_command == "portfolio": with with_spinner("Analyzing portfolio...", enabled=not args.agent): - print_output(opportunity_service.analyze_portfolio(config, spot_client=spot_client), agent=args.agent) + result = opportunity_service.analyze_portfolio(config, spot_client=spot_client) + print_output(result, agent=args.agent) return 0 if args.opportunity_command == "scan": with with_spinner("Scanning opportunities...", enabled=not args.agent): - print_output(opportunity_service.scan_opportunities(config, spot_client=spot_client, symbols=args.symbols), agent=args.agent) + result = opportunity_service.scan_opportunities( + config, spot_client=spot_client, symbols=args.symbols + ) + print_output(result, agent=args.agent) return 0 parser.error("opportunity requires `portfolio` or `scan`") if args.command == "upgrade": - print_output(self_upgrade(), agent=args.agent) + with with_spinner("Upgrading coinhunter...", enabled=not args.agent): + result = self_upgrade() + print_output(result, agent=args.agent) + return 0 + + if args.command == "catlog": + with with_spinner("Reading audit logs...", enabled=not args.agent): + entries = read_audit_log(limit=args.limit, offset=args.offset) + print_output( + {"entries": entries, "limit": args.limit, "offset": args.offset, "total": len(entries)}, + agent=args.agent, + ) return 0 parser.error(f"Unsupported command {args.command}") diff --git a/src/coinhunter/runtime.py b/src/coinhunter/runtime.py index 6772e9f..4c7bd8b 100644 --- a/src/coinhunter/runtime.py +++ b/src/coinhunter/runtime.py @@ -126,6 +126,24 @@ def _fmt_number(value: Any) -> str: return str(value) +def _fmt_local_ts(ts: str) -> str: + try: + dt = datetime.fromisoformat(ts.replace("Z", "+00:00")) + return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S") + except Exception: + return ts + + +def _event_color(event: str) -> str: + if "failed" in event or "error" in event: + return f"{_DIM}{_RED}" + if event.startswith("trade"): + return f"{_DIM}{_GREEN}" + if event.startswith("opportunity"): + return f"{_DIM}{_YELLOW}" + return _DIM + + def _is_large_dataset(payload: Any, threshold: int = 8) -> bool: if isinstance(payload, dict): for value in payload.values(): @@ -281,7 +299,9 @@ def _render_tui(payload: Any) -> None: if "klines" in payload: rows = payload["klines"] - print(f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}") + print( + f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}" + ) display_rows = rows[:10] table_rows = [] for r in display_rows: @@ -325,8 +345,12 @@ def _render_tui(payload: Any) -> None: for i, r in enumerate(rows, 1): score = r.get("score", 0) action = r.get("action", "") - action_color = _GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN - print(f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}") + action_color = ( + _GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN + ) + print( + f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}" + ) for reason in r.get("reasons", []): print(f" · {reason}") metrics = r.get("metrics", {}) @@ -340,9 +364,9 @@ def _render_tui(payload: Any) -> None: stdout = payload.get("stdout", "") stderr = payload.get("stderr", "") if rc == 0: - print(f"\n{_GREEN}✓{_RESET} Update completed") + print(f"{_GREEN}✓{_RESET} Update completed") else: - print(f"\n{_RED}✗{_RESET} Update failed (exit code {rc})") + print(f"{_RED}✗{_RESET} Update failed (exit code {rc})") if stdout: for line in stdout.strip().splitlines(): print(f" {line}") @@ -352,6 +376,29 @@ def _render_tui(payload: Any) -> None: print(f" {line}") return + if "entries" in payload: + rows = payload["entries"] + print(f"\n{_BOLD}{_CYAN} AUDIT LOG {_RESET}") + if not rows: + print(" (no audit entries)") + return + for r in rows: + ts = _fmt_local_ts(r.get("timestamp", "")) + event = r.get("event", "") + detail_parts: list[str] = [] + for key in ("symbol", "side", "qty", "quote_amount", "order_type", "status", "dry_run", "error"): + val = r.get(key) + if val is not None: + detail_parts.append(f"{key}={val}") + if not detail_parts: + for key, val in r.items(): + if key not in ("timestamp", "event") and not isinstance(val, (dict, list)): + detail_parts.append(f"{key}={val}") + print(f"\n {_DIM}{ts}{_RESET} {_event_color(event)}{event}{_RESET}") + if detail_parts: + print(f" {' '.join(detail_parts)}") + return + if "created_or_updated" in payload: print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}") print(f" Root: {payload.get('root', '')}") @@ -485,7 +532,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]: rc_path = _zshrc_path() fpath_line = "fpath+=(~/.zsh/completions)" if not _rc_contains(rc_path, fpath_line): - rc_path.write_text(fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n", encoding="utf-8") + rc_path.write_text( + fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n", + encoding="utf-8", + ) hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'" else: hint = "Run 'compinit' or restart your terminal to activate completions" @@ -497,7 +547,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]: rc_path = _bashrc_path() source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"' if not _rc_contains(rc_path, source_line): - rc_path.write_text(source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n", encoding="utf-8") + rc_path.write_text( + source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n", + encoding="utf-8", + ) hint = "Added bash completion source line to ~/.bashrc; restart your terminal" else: hint = "Restart your terminal or source ~/.bashrc to activate completions" diff --git a/src/coinhunter/services/market_service.py b/src/coinhunter/services/market_service.py index 84c0384..2485ead 100644 --- a/src/coinhunter/services/market_service.py +++ b/src/coinhunter/services/market_service.py @@ -57,7 +57,9 @@ def get_tickers(config: dict[str, Any], symbols: list[str], *, spot_client: Any) TickerView( symbol=normalize_symbol(ticker["symbol"]), last_price=float(ticker.get("lastPrice") or ticker.get("last_price") or 0.0), - price_change_pct=float(ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0), + price_change_pct=float( + ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0 + ), quote_volume=float(ticker.get("quoteVolume") or ticker.get("quote_volume") or 0.0), ) ) diff --git a/src/coinhunter/services/opportunity_service.py b/src/coinhunter/services/opportunity_service.py index 70afbc3..9511aa5 100644 --- a/src/coinhunter/services/opportunity_service.py +++ b/src/coinhunter/services/opportunity_service.py @@ -26,7 +26,9 @@ def _safe_pct(new: float, old: float) -> float: return (new - old) / old -def _score_candidate(closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float) -> tuple[float, dict[str, float]]: +def _score_candidate( + closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float +) -> tuple[float, dict[str, float]]: if len(closes) < 2 or not volumes: return 0.0, { "trend": 0.0, @@ -158,10 +160,7 @@ def scan_opportunities( top_n = int(opportunity_config.get("top_n", 10)) quote = str(config.get("market", {}).get("default_quote", "USDT")).upper() held_positions = get_positions(config, spot_client=spot_client)["positions"] - concentration_map = { - normalize_symbol(item["symbol"]): float(item["notional_usdt"]) - for item in held_positions - } + concentration_map = {normalize_symbol(item["symbol"]): float(item["notional_usdt"]) for item in held_positions} total_held = sum(concentration_map.values()) or 1.0 universe = get_scan_universe(config, spot_client=spot_client, symbols=symbols)[:scan_limit] diff --git a/src/coinhunter/services/trade_service.py b/src/coinhunter/services/trade_service.py index 4dd36cb..224e6ad 100644 --- a/src/coinhunter/services/trade_service.py +++ b/src/coinhunter/services/trade_service.py @@ -40,7 +40,9 @@ def _default_dry_run(config: dict[str, Any], dry_run: bool | None) -> bool: return bool(config.get("trading", {}).get("dry_run_default", False)) -def _trade_log_payload(intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None) -> dict[str, Any]: +def _trade_log_payload( + intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None +) -> dict[str, Any]: return { "market_type": intent.market_type, "symbol": intent.symbol, @@ -125,7 +127,9 @@ def execute_spot_trade( response_payload=response, ) ) - audit_event("trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response}) + audit_event( + "trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response} + ) return {"trade": result} try: @@ -146,5 +150,7 @@ def execute_spot_trade( response_payload=response, ) ) - audit_event("trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response}) + audit_event( + "trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response} + ) return {"trade": result} diff --git a/tests/test_account_market_services.py b/tests/test_account_market_services.py index 9aaa602..a774a3f 100644 --- a/tests/test_account_market_services.py +++ b/tests/test_account_market_services.py @@ -28,9 +28,30 @@ class FakeSpotClient: def ticker_24h(self, symbols=None): rows = [ - {"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "4.5", "quoteVolume": "10000000", "highPrice": "61000", "lowPrice": "58000"}, - {"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3.0", "quoteVolume": "8000000", "highPrice": "3050", "lowPrice": "2900"}, - {"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1.0", "quoteVolume": "200", "highPrice": "0.11", "lowPrice": "0.09"}, + { + "symbol": "BTCUSDT", + "lastPrice": "60000", + "priceChangePercent": "4.5", + "quoteVolume": "10000000", + "highPrice": "61000", + "lowPrice": "58000", + }, + { + "symbol": "ETHUSDT", + "lastPrice": "3000", + "priceChangePercent": "3.0", + "quoteVolume": "8000000", + "highPrice": "3050", + "lowPrice": "2900", + }, + { + "symbol": "DOGEUSDT", + "lastPrice": "0.1", + "priceChangePercent": "1.0", + "quoteVolume": "200", + "highPrice": "0.11", + "lowPrice": "0.09", + }, ] if not symbols: return rows @@ -38,7 +59,13 @@ class FakeSpotClient: return [row for row in rows if row["symbol"] in wanted] def exchange_info(self): - return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "BREAK"}]} + return { + "symbols": [ + {"symbol": "BTCUSDT", "status": "TRADING"}, + {"symbol": "ETHUSDT", "status": "TRADING"}, + {"symbol": "DOGEUSDT", "status": "BREAK"}, + ] + } class AccountMarketServicesTestCase(unittest.TestCase): diff --git a/tests/test_cli.py b/tests/test_cli.py index e073b45..b795d04 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -19,10 +19,16 @@ class CLITestCase(unittest.TestCase): def test_init_dispatches(self): captured = {} - with patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}), patch.object( - cli, "install_shell_completion", return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"} - ), patch.object( - cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) + with ( + patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}), + patch.object( + cli, + "install_shell_completion", + return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"}, + ), + patch.object( + cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) + ), ): result = cli.main(["init", "--force"]) self.assertEqual(result, 0) @@ -42,9 +48,29 @@ class CLITestCase(unittest.TestCase): def test_upgrade_dispatches(self): captured = {} - with patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}), patch.object( - cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) + with ( + patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}), + patch.object( + cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) + ), ): result = cli.main(["upgrade"]) self.assertEqual(result, 0) self.assertEqual(captured["payload"]["returncode"], 0) + + def test_catlog_dispatches(self): + captured = {} + with ( + patch.object( + cli, "read_audit_log", return_value=[{"timestamp": "2026-04-17T12:00:00Z", "event": "test_event"}] + ), + patch.object( + cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) + ), + ): + result = cli.main(["catlog", "-n", "5", "-o", "10"]) + self.assertEqual(result, 0) + self.assertEqual(captured["payload"]["limit"], 5) + self.assertEqual(captured["payload"]["offset"], 10) + self.assertIn("entries", captured["payload"]) + self.assertEqual(captured["payload"]["total"], 1) diff --git a/tests/test_config_runtime.py b/tests/test_config_runtime.py index 37bfd9f..86fe778 100644 --- a/tests/test_config_runtime.py +++ b/tests/test_config_runtime.py @@ -14,7 +14,10 @@ from coinhunter.runtime import get_runtime_paths class ConfigRuntimeTestCase(unittest.TestCase): def test_init_files_created_in_coinhunter_home(self): - with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False): + with ( + tempfile.TemporaryDirectory() as tmp_dir, + patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False), + ): paths = get_runtime_paths() payload = ensure_init_files(paths) self.assertTrue(paths.config_file.exists()) @@ -23,10 +26,13 @@ class ConfigRuntimeTestCase(unittest.TestCase): self.assertEqual(payload["root"], str(paths.root)) def test_load_config_and_env(self): - with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( - os.environ, - {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, - clear=False, + with ( + tempfile.TemporaryDirectory() as tmp_dir, + patch.dict( + os.environ, + {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, + clear=False, + ), ): paths = get_runtime_paths() ensure_init_files(paths) @@ -40,10 +46,13 @@ class ConfigRuntimeTestCase(unittest.TestCase): self.assertEqual(os.environ["BINANCE_API_SECRET"], "def") def test_env_file_overrides_existing_environment(self): - with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( - os.environ, - {"COINHUNTER_HOME": str(Path(tmp_dir) / "home"), "BINANCE_API_KEY": "old_key"}, - clear=False, + with ( + tempfile.TemporaryDirectory() as tmp_dir, + patch.dict( + os.environ, + {"COINHUNTER_HOME": str(Path(tmp_dir) / "home"), "BINANCE_API_KEY": "old_key"}, + clear=False, + ), ): paths = get_runtime_paths() ensure_init_files(paths) @@ -55,10 +64,13 @@ class ConfigRuntimeTestCase(unittest.TestCase): self.assertEqual(os.environ["BINANCE_API_SECRET"], "new_secret") def test_missing_credentials_raise(self): - with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( - os.environ, - {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, - clear=False, + with ( + tempfile.TemporaryDirectory() as tmp_dir, + patch.dict( + os.environ, + {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, + clear=False, + ), ): os.environ.pop("BINANCE_API_KEY", None) os.environ.pop("BINANCE_API_SECRET", None) @@ -68,10 +80,13 @@ class ConfigRuntimeTestCase(unittest.TestCase): get_binance_credentials(paths) def test_permission_error_is_explained(self): - with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( - os.environ, - {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, - clear=False, + with ( + tempfile.TemporaryDirectory() as tmp_dir, + patch.dict( + os.environ, + {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, + clear=False, + ), ): paths = get_runtime_paths() with patch("coinhunter.config.ensure_runtime_dirs", side_effect=PermissionError("no write access")): diff --git a/tests/test_opportunity_service.py b/tests/test_opportunity_service.py index 938f6e9..355e4b2 100644 --- a/tests/test_opportunity_service.py +++ b/tests/test_opportunity_service.py @@ -29,17 +29,52 @@ class FakeSpotClient: def ticker_24h(self, symbols=None): rows = { - "BTCUSDT": {"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "5", "quoteVolume": "9000000", "highPrice": "60200", "lowPrice": "55000"}, - "ETHUSDT": {"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3", "quoteVolume": "8000000", "highPrice": "3100", "lowPrice": "2800"}, - "SOLUSDT": {"symbol": "SOLUSDT", "lastPrice": "150", "priceChangePercent": "8", "quoteVolume": "10000000", "highPrice": "152", "lowPrice": "130"}, - "DOGEUSDT": {"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1", "quoteVolume": "100", "highPrice": "0.11", "lowPrice": "0.09"}, + "BTCUSDT": { + "symbol": "BTCUSDT", + "lastPrice": "60000", + "priceChangePercent": "5", + "quoteVolume": "9000000", + "highPrice": "60200", + "lowPrice": "55000", + }, + "ETHUSDT": { + "symbol": "ETHUSDT", + "lastPrice": "3000", + "priceChangePercent": "3", + "quoteVolume": "8000000", + "highPrice": "3100", + "lowPrice": "2800", + }, + "SOLUSDT": { + "symbol": "SOLUSDT", + "lastPrice": "150", + "priceChangePercent": "8", + "quoteVolume": "10000000", + "highPrice": "152", + "lowPrice": "130", + }, + "DOGEUSDT": { + "symbol": "DOGEUSDT", + "lastPrice": "0.1", + "priceChangePercent": "1", + "quoteVolume": "100", + "highPrice": "0.11", + "lowPrice": "0.09", + }, } if not symbols: return list(rows.values()) return [rows[symbol] for symbol in symbols] def exchange_info(self): - return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "SOLUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "TRADING"}]} + return { + "symbols": [ + {"symbol": "BTCUSDT", "status": "TRADING"}, + {"symbol": "ETHUSDT", "status": "TRADING"}, + {"symbol": "SOLUSDT", "status": "TRADING"}, + {"symbol": "DOGEUSDT", "status": "TRADING"}, + ] + } def klines(self, symbol, interval, limit): curves = { @@ -50,7 +85,18 @@ class FakeSpotClient: }[symbol] rows = [] for index, close in enumerate(curves[-limit:]): - rows.append([index, close * 0.98, close * 1.01, close * 0.97, close, 100 + index * 10, index + 1, close * (100 + index * 10)]) + rows.append( + [ + index, + close * 0.98, + close * 1.01, + close * 0.97, + close, + 100 + index * 10, + index + 1, + close * (100 + index * 10), + ] + ) return rows @@ -85,7 +131,9 @@ class OpportunityServiceTestCase(unittest.TestCase): def test_scan_is_deterministic(self): with patch.object(opportunity_service, "audit_event", return_value=None): - payload = opportunity_service.scan_opportunities(self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()) + payload = opportunity_service.scan_opportunities( + self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient() + ) self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"]) def test_score_candidate_handles_empty_klines(self): diff --git a/tests/test_trade_service.py b/tests/test_trade_service.py index f971e91..dc2c53b 100644 --- a/tests/test_trade_service.py +++ b/tests/test_trade_service.py @@ -20,7 +20,9 @@ class FakeSpotClient: class TradeServiceTestCase(unittest.TestCase): def test_spot_market_buy_dry_run_does_not_call_client(self): events = [] - with patch.object(trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload))): + with patch.object( + trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload)) + ): client = FakeSpotClient() payload = trade_service.execute_spot_trade( {"trading": {"dry_run_default": False}},