Compare commits

...

6 Commits

Author SHA1 Message Date
3855477155 refactor: flatten account command to a single balances view
Remove overview/balances/positions subcommands in favor of one
`account` command that returns all balances with an `is_dust` flag.
Add descriptions to every parser and expose -a/--agent and --doc
on all leaf commands for better help discoverability.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 18:19:19 +08:00
d629c25232 fix: resolve merge conflicts and lint issues
- Merge origin/main changes (flattened buy/sell commands, --doc flag, aliases)
- Fix spinner placement for buy/sell commands
- Fix duplicate alias key 'p' in canonical subcommands
- Remove unused mypy ignore comments in spot_client.py
- Fix nested with statements in tests

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:59:53 +08:00
4602583760 Merge remote-tracking branch 'origin/main' into main 2026-04-17 16:57:40 +08:00
ca0625b199 chore: bump version to 2.1.1
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:51:55 +08:00
a0e01ca56f chore: bump version to 2.1.0
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:44:58 +08:00
f528575aa8 feat: add catlog command, agent flag reorder, and TUI polish
- Add `coinhunter catlog` with limit/offset pagination for audit logs
- Optimize audit log reading with deque to avoid loading all history
- Allow `-a/--agent` flag after subcommands
- Fix upgrade spinner artifact and empty line issues
- Render audit log TUI as timeline with low-saturation event colors
- Convert audit timestamps to local timezone in TUI
- Remove futures-related capabilities
- Add conda environment.yml for development
- Bump version to 2.0.9 and update README

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:42:47 +08:00
16 changed files with 509 additions and 290 deletions

View File

@@ -85,11 +85,9 @@ coin market klines --doc
```bash ```bash
# Account (aliases: a, acc) # Account (aliases: a, acc)
coinhunter account overview coinhunter account
coinhunter account overview --agent coinhunter account --agent
coin a ov coin a
coin acc bal
coin a pos
# Market (aliases: m) # Market (aliases: m)
coinhunter market tickers BTCUSDT ETH/USDT sol-usdt coinhunter market tickers BTCUSDT ETH/USDT sol-usdt
@@ -110,6 +108,11 @@ coinhunter opportunity scan --symbols BTCUSDT ETHUSDT SOLUSDT
coin opp pf coin opp pf
coin o scan -s BTCUSDT ETHUSDT coin o scan -s BTCUSDT ETHUSDT
# Audit log
coinhunter catlog
coinhunter catlog -n 20
coinhunter catlog -n 10 -o 10
# Self-upgrade # Self-upgrade
coinhunter upgrade coinhunter upgrade
coin upgrade coin upgrade
@@ -150,6 +153,8 @@ Events include:
- `opportunity_portfolio_generated` - `opportunity_portfolio_generated`
- `opportunity_scan_generated` - `opportunity_scan_generated`
Use `coinhunter catlog` to read recent entries in the terminal. It aggregates across all days and supports pagination with `-n/--limit` and `-o/--offset`.
## Development ## Development
Clone the repo and install in editable mode: Clone the repo and install in editable mode:
@@ -160,6 +165,13 @@ cd coinhunter-cli
pip install -e ".[dev]" pip install -e ".[dev]"
``` ```
Or use the provided Conda environment:
```bash
conda env create -f environment.yml
conda activate coinhunter
```
Run quality checks: Run quality checks:
```bash ```bash

9
environment.yml Normal file
View File

@@ -0,0 +1,9 @@
name: coinhunter
channels:
- defaults
- conda-forge
dependencies:
- python>=3.10
- pip
- pip:
- -e ".[dev]"

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "coinhunter" name = "coinhunter"
version = "2.1.0" version = "2.1.1"
description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution." description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution."
readme = "README.md" readme = "README.md"
license = {text = "MIT"} license = {text = "MIT"}
@@ -14,9 +14,6 @@ dependencies = [
"shtab>=1.7.0", "shtab>=1.7.0",
"tomli>=2.0.1; python_version < '3.11'", "tomli>=2.0.1; python_version < '3.11'",
] ]
authors = [
{name = "Tacit Lab", email = "ouyangcarlos@gmail.com"}
]
[project.optional-dependencies] [project.optional-dependencies]
dev = [ dev = [
@@ -37,13 +34,10 @@ where = ["src"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = ["tests"] testpaths = ["tests"]
addopts = "-v"
[tool.ruff]
target-version = "py310"
line-length = 120
[tool.ruff.lint] [tool.ruff.lint]
select = ["E", "F", "I", "UP", "W"] select = ["E", "F", "I", "W", "UP", "B", "C4", "SIM"]
ignore = ["E501"] ignore = ["E501"]
[tool.ruff.lint.pydocstyle] [tool.ruff.lint.pydocstyle]
@@ -52,7 +46,5 @@ convention = "google"
[tool.mypy] [tool.mypy]
python_version = "3.10" python_version = "3.10"
warn_return_any = true warn_return_any = true
warn_unused_configs = true warn_unused_ignores = true
disallow_untyped_defs = true
ignore_missing_imports = true ignore_missing_imports = true
exclude = [".venv", "build"]

View File

@@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
from collections import deque
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -37,3 +38,33 @@ def audit_event(event: str, payload: dict[str, Any], paths: RuntimePaths | None
with _audit_path(paths).open("a", encoding="utf-8") as handle: with _audit_path(paths).open("a", encoding="utf-8") as handle:
handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n") handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n")
return entry return entry
def read_audit_log(paths: RuntimePaths | None = None, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
paths = ensure_runtime_dirs(paths or get_runtime_paths())
logs_dir = _resolve_audit_dir(paths)
if not logs_dir.exists():
return []
audit_files = sorted(logs_dir.glob("audit_*.jsonl"), reverse=True)
needed = offset + limit
chunks: list[list[dict[str, Any]]] = []
total = 0
for audit_file in audit_files:
remaining = needed - total
if remaining <= 0:
break
entries: list[dict[str, Any]] = []
with audit_file.open("r", encoding="utf-8") as handle:
entries = list(deque((json.loads(line) for line in handle if line.strip()), maxlen=remaining))
if entries:
chunks.append(entries)
total += len(entries)
if not chunks:
return []
all_entries: list[dict[str, Any]] = []
for chunk in reversed(chunks):
all_entries.extend(chunk)
start = -(offset + limit) if (offset + limit) <= len(all_entries) else -len(all_entries)
if offset == 0:
return all_entries[start:]
return all_entries[start:-offset]

View File

@@ -5,7 +5,10 @@ from __future__ import annotations
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any
from requests.exceptions import RequestException, SSLError from requests.exceptions import (
RequestException,
SSLError,
)
class SpotBinanceClient: class SpotBinanceClient:
@@ -56,7 +59,7 @@ class SpotBinanceClient:
response = self._call("24h ticker", self._client.ticker_24hr, symbol=symbols[0]) response = self._call("24h ticker", self._client.ticker_24hr, symbol=symbols[0])
else: else:
response = self._call("24h ticker", self._client.ticker_24hr, symbols=symbols) response = self._call("24h ticker", self._client.ticker_24hr, symbols=symbols)
return response if isinstance(response, list) else [response] # type: ignore[no-any-return] return response if isinstance(response, list) else [response]
def ticker_price(self, symbols: list[str] | None = None) -> list[dict[str, Any]]: def ticker_price(self, symbols: list[str] | None = None) -> list[dict[str, Any]]:
if not symbols: if not symbols:
@@ -65,7 +68,7 @@ class SpotBinanceClient:
response = self._call("ticker price", self._client.ticker_price, symbol=symbols[0]) response = self._call("ticker price", self._client.ticker_price, symbol=symbols[0])
else: else:
response = self._call("ticker price", self._client.ticker_price, symbols=symbols) response = self._call("ticker price", self._client.ticker_price, symbols=symbols)
return response if isinstance(response, list) else [response] # type: ignore[no-any-return] return response if isinstance(response, list) else [response]
def klines(self, symbol: str, interval: str, limit: int) -> list[list[Any]]: def klines(self, symbol: str, interval: str, limit: int) -> list[list[Any]]:
return self._call("klines", self._client.klines, symbol=symbol, interval=interval, limit=limit) # type: ignore[no-any-return] return self._call("klines", self._client.klines, symbol=symbol, interval=interval, limit=limit) # type: ignore[no-any-return]

View File

@@ -7,15 +7,27 @@ import sys
from typing import Any from typing import Any
from . import __version__ from . import __version__
from .audit import read_audit_log
from .binance.spot_client import SpotBinanceClient from .binance.spot_client import SpotBinanceClient
from .config import ensure_init_files, get_binance_credentials, load_config from .config import ensure_init_files, get_binance_credentials, load_config
from .runtime import get_runtime_paths, install_shell_completion, print_output, self_upgrade, with_spinner from .runtime import (
from .services import account_service, market_service, opportunity_service, trade_service get_runtime_paths,
install_shell_completion,
print_output,
self_upgrade,
with_spinner,
)
from .services import (
account_service,
market_service,
opportunity_service,
trade_service,
)
EPILOG = """\ EPILOG = """\
examples: examples:
coin init coin init
coin acc ov coin account
coin m tk BTCUSDT ETHUSDT coin m tk BTCUSDT ETHUSDT
coin m k BTCUSDT -i 1h -l 50 coin m k BTCUSDT -i 1h -l 50
coin buy BTCUSDT -Q 100 -d coin buy BTCUSDT -Q 100 -d
@@ -37,38 +49,20 @@ Fields:
files_created list of generated files files_created list of generated files
completion shell completion installation status completion shell completion installation status
""", """,
"account/overview": """\ "account": """\
Output: JSON Output: JSON
{ {
"total_btc": 1.234, "balances": [
"total_usdt": 50000.0, {"asset": "BTC", "free": 0.5, "locked": 0.1, "total": 0.6, "notional_usdt": 30000.0, "is_dust": false}
"assets": [{"asset": "BTC", "free": 0.5, "locked": 0.1}] ]
} }
Fields: Fields:
total_btc total equity denominated in BTC asset asset symbol
total_usdt total equity denominated in USDT free available balance
assets list of non-zero balances locked frozen/locked balance
""", total free + locked
"account/balances": """\ notional_usdt estimated value in USDT
Output: JSON array is_dust true if value is below dust threshold
[
{"asset": "BTC", "free": 0.5, "locked": 0.1, "total": 0.6}
]
Fields:
asset asset symbol
free available balance
locked frozen/locked balance
total free + locked
""",
"account/positions": """\
Output: JSON array
[
{"symbol": "BTCUSDT", "positionAmt": 0.01, "entryPrice": 90000.0}
]
Fields:
symbol trading pair
positionAmt quantity held (positive long, negative short)
entryPrice average entry price
""", """,
"market/tickers": """\ "market/tickers": """\
Output: JSON object keyed by normalized symbol Output: JSON object keyed by normalized symbol
@@ -202,7 +196,7 @@ def _load_spot_client(config: dict[str, Any], *, client: Any | None = None) -> S
def build_parser() -> argparse.ArgumentParser: def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
prog="coinhunter", prog="coinhunter",
description="CoinHunter V2 Binance-first trading CLI", description="Binance-first trading CLI for account balances, market data, trade execution, and opportunity scanning.",
epilog=EPILOG, epilog=EPILOG,
formatter_class=argparse.RawDescriptionHelpFormatter, formatter_class=argparse.RawDescriptionHelpFormatter,
) )
@@ -211,58 +205,105 @@ def build_parser() -> argparse.ArgumentParser:
parser.add_argument("--doc", action="store_true", help="Show output schema and field descriptions for the command") parser.add_argument("--doc", action="store_true", help="Show output schema and field descriptions for the command")
subparsers = parser.add_subparsers(dest="command") subparsers = parser.add_subparsers(dest="command")
init_parser = subparsers.add_parser("init", help="Generate config.toml, .env, and log directory") def _add_global_flags(p: argparse.ArgumentParser) -> None:
p.add_argument("-a", "--agent", action="store_true", help="Output in agent-friendly format (JSON or compact)")
p.add_argument("--doc", action="store_true", help="Show output schema and field descriptions for the command")
init_parser = subparsers.add_parser(
"init", help="Generate config.toml, .env, and log directory",
description="Initialize the CoinHunter runtime directory with config.toml, .env, and shell completions.",
)
init_parser.add_argument("-f", "--force", action="store_true", help="Overwrite existing files") init_parser.add_argument("-f", "--force", action="store_true", help="Overwrite existing files")
_add_global_flags(init_parser)
account_parser = subparsers.add_parser("account", aliases=["acc", "a"], help="Account overview, balances, and positions") account_parser = subparsers.add_parser(
account_subparsers = account_parser.add_subparsers(dest="account_command") "account", aliases=["acc", "a"], help="List asset balances and notional values",
account_commands_help = { description="List all non-zero spot balances with free/locked totals, notional USDT value, and dust flag.",
"overview": "Total equity and summary", )
"balances": "List asset balances", _add_global_flags(account_parser)
"positions": "List open positions",
}
account_aliases = {
"overview": ["ov"],
"balances": ["bal", "b"],
"positions": ["pos", "p"],
}
for name in ("overview", "balances", "positions"):
account_subparsers.add_parser(name, aliases=account_aliases[name], help=account_commands_help[name])
market_parser = subparsers.add_parser("market", aliases=["m"], help="Batch market queries") market_parser = subparsers.add_parser(
"market", aliases=["m"], help="Batch market queries",
description="Query market data: 24h tickers and OHLCV klines for one or more trading pairs.",
)
market_subparsers = market_parser.add_subparsers(dest="market_command") market_subparsers = market_parser.add_subparsers(dest="market_command")
tickers_parser = market_subparsers.add_parser("tickers", aliases=["tk", "t"], help="Fetch 24h ticker data") tickers_parser = market_subparsers.add_parser(
"tickers", aliases=["tk", "t"], help="Fetch 24h ticker data",
description="Fetch 24h ticker statistics (last price, change %, volume) for one or more symbols.",
)
tickers_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to query (e.g. BTCUSDT ETH/USDT)") tickers_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to query (e.g. BTCUSDT ETH/USDT)")
klines_parser = market_subparsers.add_parser("klines", aliases=["k"], help="Fetch OHLCV klines") _add_global_flags(tickers_parser)
klines_parser = market_subparsers.add_parser(
"klines", aliases=["k"], help="Fetch OHLCV klines",
description="Fetch OHLCV candlestick data for one or more symbols.",
)
klines_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to query") klines_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to query")
klines_parser.add_argument("-i", "--interval", default="1h", help="Kline interval (default: 1h)") klines_parser.add_argument("-i", "--interval", default="1h", help="Kline interval (default: 1h)")
klines_parser.add_argument("-l", "--limit", type=int, default=100, help="Number of candles (default: 100)") klines_parser.add_argument("-l", "--limit", type=int, default=100, help="Number of candles (default: 100)")
_add_global_flags(klines_parser)
buy_parser = subparsers.add_parser("buy", aliases=["b"], help="Buy base asset") buy_parser = subparsers.add_parser(
"buy", aliases=["b"], help="Buy base asset",
description="Place a spot BUY order. Market buys require --quote. Limit buys require --qty and --price.",
)
buy_parser.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)") buy_parser.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)")
buy_parser.add_argument("-q", "--qty", type=float, help="Base asset quantity (limit orders)") buy_parser.add_argument("-q", "--qty", type=float, help="Base asset quantity (limit orders)")
buy_parser.add_argument("-Q", "--quote", type=float, help="Quote asset amount (market buy only)") buy_parser.add_argument("-Q", "--quote", type=float, help="Quote asset amount (market buy only)")
buy_parser.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)") buy_parser.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)")
buy_parser.add_argument("-p", "--price", type=float, help="Limit price") buy_parser.add_argument("-p", "--price", type=float, help="Limit price")
buy_parser.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending") buy_parser.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending")
_add_global_flags(buy_parser)
sell_parser = subparsers.add_parser("sell", aliases=["s"], help="Sell base asset") sell_parser = subparsers.add_parser(
"sell", aliases=["s"], help="Sell base asset",
description="Place a spot SELL order. Requires --qty. Limit sells also require --price.",
)
sell_parser.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)") sell_parser.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)")
sell_parser.add_argument("-q", "--qty", type=float, help="Base asset quantity") sell_parser.add_argument("-q", "--qty", type=float, help="Base asset quantity")
sell_parser.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)") sell_parser.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)")
sell_parser.add_argument("-p", "--price", type=float, help="Limit price") sell_parser.add_argument("-p", "--price", type=float, help="Limit price")
sell_parser.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending") sell_parser.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending")
_add_global_flags(sell_parser)
opportunity_parser = subparsers.add_parser("opportunity", aliases=["opp", "o"], help="Portfolio analysis and market scanning") opportunity_parser = subparsers.add_parser(
"opportunity", aliases=["opp", "o"], help="Portfolio analysis and market scanning",
description="Analyze your portfolio and scan the market for trading opportunities.",
)
opportunity_subparsers = opportunity_parser.add_subparsers(dest="opportunity_command") opportunity_subparsers = opportunity_parser.add_subparsers(dest="opportunity_command")
opportunity_subparsers.add_parser("portfolio", aliases=["pf", "p"], help="Score current holdings") portfolio_parser = opportunity_subparsers.add_parser(
scan_parser = opportunity_subparsers.add_parser("scan", help="Scan market for opportunities") "portfolio", aliases=["pf", "p"], help="Score current holdings",
description="Score your current spot holdings and generate add/hold/trim/exit recommendations.",
)
_add_global_flags(portfolio_parser)
scan_parser = opportunity_subparsers.add_parser(
"scan", help="Scan market for opportunities",
description="Scan the market for trading opportunities and return the top-N candidates with signals.",
)
scan_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols") scan_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols")
_add_global_flags(scan_parser)
subparsers.add_parser("upgrade", help="Upgrade coinhunter to the latest version") upgrade_parser = subparsers.add_parser(
"upgrade", help="Upgrade coinhunter to the latest version",
description="Upgrade the coinhunter package using pipx (preferred) or pip.",
)
_add_global_flags(upgrade_parser)
completion_parser = subparsers.add_parser("completion", help="Generate shell completion script") catlog_parser = subparsers.add_parser(
"catlog", help="Read recent audit log entries",
description="Read recent audit log entries across all days with optional limit and offset pagination.",
)
catlog_parser.add_argument("-n", "--limit", type=int, default=10, help="Number of entries (default: 10)")
catlog_parser.add_argument(
"-o", "--offset", type=int, default=0, help="Skip the most recent N entries (default: 0)"
)
_add_global_flags(catlog_parser)
completion_parser = subparsers.add_parser(
"completion", help="Generate shell completion script",
description="Generate a shell completion script for bash or zsh.",
)
completion_parser.add_argument("shell", choices=["bash", "zsh"], help="Target shell") completion_parser.add_argument("shell", choices=["bash", "zsh"], help="Target shell")
_add_global_flags(completion_parser)
return parser return parser
@@ -278,19 +319,13 @@ _CANONICAL_COMMANDS = {
} }
_CANONICAL_SUBCOMMANDS = { _CANONICAL_SUBCOMMANDS = {
"ov": "overview",
"bal": "balances",
"b": "balances",
"pos": "positions",
"p": "positions",
"tk": "tickers", "tk": "tickers",
"t": "tickers", "t": "tickers",
"k": "klines", "k": "klines",
"pf": "portfolio", "pf": "portfolio",
"p": "portfolio",
} }
_COMMANDS_WITH_SUBCOMMANDS = {"account", "market", "opportunity"} _COMMANDS_WITH_SUBCOMMANDS = {"market", "opportunity"}
def _get_doc_key(argv: list[str]) -> str | None: def _get_doc_key(argv: list[str]) -> str | None:
@@ -374,102 +409,94 @@ def main(argv: list[str] | None = None) -> int:
if args.command == "account": if args.command == "account":
spot_client = _load_spot_client(config) spot_client = _load_spot_client(config)
if args.account_command == "overview": with with_spinner("Fetching balances...", enabled=not args.agent):
with with_spinner("Fetching account overview...", enabled=not args.agent): result = account_service.get_balances(config, spot_client=spot_client)
print_output( print_output(result, agent=args.agent)
account_service.get_overview(config, spot_client=spot_client), return 0
agent=args.agent,
)
return 0
if args.account_command == "balances":
with with_spinner("Fetching balances...", enabled=not args.agent):
print_output(
account_service.get_balances(config, spot_client=spot_client),
agent=args.agent,
)
return 0
if args.account_command == "positions":
with with_spinner("Fetching positions...", enabled=not args.agent):
print_output(
account_service.get_positions(config, spot_client=spot_client),
agent=args.agent,
)
return 0
parser.error("account requires one of: overview, balances, positions")
if args.command == "market": if args.command == "market":
spot_client = _load_spot_client(config) spot_client = _load_spot_client(config)
if args.market_command == "tickers": if args.market_command == "tickers":
with with_spinner("Fetching tickers...", enabled=not args.agent): with with_spinner("Fetching tickers...", enabled=not args.agent):
print_output(market_service.get_tickers(config, args.symbols, spot_client=spot_client), agent=args.agent) result = market_service.get_tickers(config, args.symbols, spot_client=spot_client)
print_output(result, agent=args.agent)
return 0 return 0
if args.market_command == "klines": if args.market_command == "klines":
with with_spinner("Fetching klines...", enabled=not args.agent): with with_spinner("Fetching klines...", enabled=not args.agent):
print_output( result = market_service.get_klines(
market_service.get_klines( config,
config, args.symbols,
args.symbols, interval=args.interval,
interval=args.interval, limit=args.limit,
limit=args.limit, spot_client=spot_client,
spot_client=spot_client,
),
agent=args.agent,
) )
print_output(result, agent=args.agent)
return 0 return 0
parser.error("market requires one of: tickers, klines") parser.error("market requires one of: tickers, klines")
if args.command == "buy": if args.command == "buy":
spot_client = _load_spot_client(config) spot_client = _load_spot_client(config)
with with_spinner("Placing order...", enabled=not args.agent): with with_spinner("Placing order...", enabled=not args.agent):
print_output( result = trade_service.execute_spot_trade(
trade_service.execute_spot_trade( config,
config, side="buy",
side="buy", symbol=args.symbol,
symbol=args.symbol, qty=args.qty,
qty=args.qty, quote=args.quote,
quote=args.quote, order_type=args.type,
order_type=args.type, price=args.price,
price=args.price, dry_run=True if args.dry_run else None,
dry_run=True if args.dry_run else None, spot_client=spot_client,
spot_client=spot_client,
),
agent=args.agent,
) )
print_output(result, agent=args.agent)
return 0 return 0
if args.command == "sell": if args.command == "sell":
spot_client = _load_spot_client(config) spot_client = _load_spot_client(config)
with with_spinner("Placing order...", enabled=not args.agent): with with_spinner("Placing order...", enabled=not args.agent):
print_output( result = trade_service.execute_spot_trade(
trade_service.execute_spot_trade( config,
config, side="sell",
side="sell", symbol=args.symbol,
symbol=args.symbol, qty=args.qty,
qty=args.qty, quote=None,
quote=None, order_type=args.type,
order_type=args.type, price=args.price,
price=args.price, dry_run=True if args.dry_run else None,
dry_run=True if args.dry_run else None, spot_client=spot_client,
spot_client=spot_client,
),
agent=args.agent,
) )
print_output(result, agent=args.agent)
return 0 return 0
if args.command == "opportunity": if args.command == "opportunity":
spot_client = _load_spot_client(config) spot_client = _load_spot_client(config)
if args.opportunity_command == "portfolio": if args.opportunity_command == "portfolio":
with with_spinner("Analyzing portfolio...", enabled=not args.agent): with with_spinner("Analyzing portfolio...", enabled=not args.agent):
print_output(opportunity_service.analyze_portfolio(config, spot_client=spot_client), agent=args.agent) result = opportunity_service.analyze_portfolio(config, spot_client=spot_client)
print_output(result, agent=args.agent)
return 0 return 0
if args.opportunity_command == "scan": if args.opportunity_command == "scan":
with with_spinner("Scanning opportunities...", enabled=not args.agent): with with_spinner("Scanning opportunities...", enabled=not args.agent):
print_output(opportunity_service.scan_opportunities(config, spot_client=spot_client, symbols=args.symbols), agent=args.agent) result = opportunity_service.scan_opportunities(
config, spot_client=spot_client, symbols=args.symbols
)
print_output(result, agent=args.agent)
return 0 return 0
parser.error("opportunity requires `portfolio` or `scan`") parser.error("opportunity requires `portfolio` or `scan`")
if args.command == "upgrade": if args.command == "upgrade":
print_output(self_upgrade(), agent=args.agent) with with_spinner("Upgrading coinhunter...", enabled=not args.agent):
result = self_upgrade()
print_output(result, agent=args.agent)
return 0
if args.command == "catlog":
with with_spinner("Reading audit logs...", enabled=not args.agent):
entries = read_audit_log(limit=args.limit, offset=args.offset)
print_output(
{"entries": entries, "limit": args.limit, "offset": args.offset, "total": len(entries)},
agent=args.agent,
)
return 0 return 0
parser.error(f"Unsupported command {args.command}") parser.error(f"Unsupported command {args.command}")

View File

@@ -126,6 +126,24 @@ def _fmt_number(value: Any) -> str:
return str(value) return str(value)
def _fmt_local_ts(ts: str) -> str:
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return ts
def _event_color(event: str) -> str:
if "failed" in event or "error" in event:
return f"{_DIM}{_RED}"
if event.startswith("trade"):
return f"{_DIM}{_GREEN}"
if event.startswith("opportunity"):
return f"{_DIM}{_YELLOW}"
return _DIM
def _is_large_dataset(payload: Any, threshold: int = 8) -> bool: def _is_large_dataset(payload: Any, threshold: int = 8) -> bool:
if isinstance(payload, dict): if isinstance(payload, dict):
for value in payload.values(): for value in payload.values():
@@ -195,39 +213,27 @@ def _render_tui(payload: Any) -> None:
print(str(payload)) print(str(payload))
return return
if "overview" in payload:
overview = payload.get("overview", {})
print(f"\n{_BOLD}{_CYAN} ACCOUNT OVERVIEW {_RESET}")
print(f" Total Equity: {_GREEN}{_fmt_number(overview.get('total_equity_usdt', 0))} USDT{_RESET}")
print(f" Spot Assets: {_fmt_number(overview.get('spot_asset_count', 0))}")
print(f" Positions: {_fmt_number(overview.get('spot_position_count', 0))}")
if payload.get("balances"):
print()
_render_tui({"balances": payload["balances"]})
if payload.get("positions"):
print()
_render_tui({"positions": payload["positions"]})
return
if "balances" in payload: if "balances" in payload:
rows = payload["balances"] rows = payload["balances"]
table_rows: list[list[str]] = [] table_rows: list[list[str]] = []
for r in rows: for r in rows:
is_dust = r.get("is_dust", False)
dust_label = f"{_DIM}dust{_RESET}" if is_dust else ""
table_rows.append( table_rows.append(
[ [
r.get("market_type", ""),
r.get("asset", ""), r.get("asset", ""),
_fmt_number(r.get("free", 0)), _fmt_number(r.get("free", 0)),
_fmt_number(r.get("locked", 0)), _fmt_number(r.get("locked", 0)),
_fmt_number(r.get("total", 0)), _fmt_number(r.get("total", 0)),
_fmt_number(r.get("notional_usdt", 0)), _fmt_number(r.get("notional_usdt", 0)),
dust_label,
] ]
) )
_print_box_table( _print_box_table(
"BALANCES", "BALANCES",
["Market", "Asset", "Free", "Locked", "Total", "Notional (USDT)"], ["Asset", "Free", "Locked", "Total", "Notional (USDT)", ""],
table_rows, table_rows,
aligns=["left", "left", "right", "right", "right", "right"], aligns=["left", "right", "right", "right", "right", "left"],
) )
return return
@@ -281,7 +287,9 @@ def _render_tui(payload: Any) -> None:
if "klines" in payload: if "klines" in payload:
rows = payload["klines"] rows = payload["klines"]
print(f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}") print(
f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}"
)
display_rows = rows[:10] display_rows = rows[:10]
table_rows = [] table_rows = []
for r in display_rows: for r in display_rows:
@@ -325,8 +333,12 @@ def _render_tui(payload: Any) -> None:
for i, r in enumerate(rows, 1): for i, r in enumerate(rows, 1):
score = r.get("score", 0) score = r.get("score", 0)
action = r.get("action", "") action = r.get("action", "")
action_color = _GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN action_color = (
print(f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}") _GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN
)
print(
f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}"
)
for reason in r.get("reasons", []): for reason in r.get("reasons", []):
print(f" · {reason}") print(f" · {reason}")
metrics = r.get("metrics", {}) metrics = r.get("metrics", {})
@@ -340,9 +352,9 @@ def _render_tui(payload: Any) -> None:
stdout = payload.get("stdout", "") stdout = payload.get("stdout", "")
stderr = payload.get("stderr", "") stderr = payload.get("stderr", "")
if rc == 0: if rc == 0:
print(f"\n{_GREEN}{_RESET} Update completed") print(f"{_GREEN}{_RESET} Update completed")
else: else:
print(f"\n{_RED}{_RESET} Update failed (exit code {rc})") print(f"{_RED}{_RESET} Update failed (exit code {rc})")
if stdout: if stdout:
for line in stdout.strip().splitlines(): for line in stdout.strip().splitlines():
print(f" {line}") print(f" {line}")
@@ -352,6 +364,29 @@ def _render_tui(payload: Any) -> None:
print(f" {line}") print(f" {line}")
return return
if "entries" in payload:
rows = payload["entries"]
print(f"\n{_BOLD}{_CYAN} AUDIT LOG {_RESET}")
if not rows:
print(" (no audit entries)")
return
for r in rows:
ts = _fmt_local_ts(r.get("timestamp", ""))
event = r.get("event", "")
detail_parts: list[str] = []
for key in ("symbol", "side", "qty", "quote_amount", "order_type", "status", "dry_run", "error"):
val = r.get(key)
if val is not None:
detail_parts.append(f"{key}={val}")
if not detail_parts:
for key, val in r.items():
if key not in ("timestamp", "event") and not isinstance(val, (dict, list)):
detail_parts.append(f"{key}={val}")
print(f"\n {_DIM}{ts}{_RESET} {_event_color(event)}{event}{_RESET}")
if detail_parts:
print(f" {' '.join(detail_parts)}")
return
if "created_or_updated" in payload: if "created_or_updated" in payload:
print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}") print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}")
print(f" Root: {payload.get('root', '')}") print(f" Root: {payload.get('root', '')}")
@@ -485,7 +520,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
rc_path = _zshrc_path() rc_path = _zshrc_path()
fpath_line = "fpath+=(~/.zsh/completions)" fpath_line = "fpath+=(~/.zsh/completions)"
if not _rc_contains(rc_path, fpath_line): if not _rc_contains(rc_path, fpath_line):
rc_path.write_text(fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n", encoding="utf-8") rc_path.write_text(
fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n",
encoding="utf-8",
)
hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'" hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'"
else: else:
hint = "Run 'compinit' or restart your terminal to activate completions" hint = "Run 'compinit' or restart your terminal to activate completions"
@@ -497,7 +535,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
rc_path = _bashrc_path() rc_path = _bashrc_path()
source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"' source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"'
if not _rc_contains(rc_path, source_line): if not _rc_contains(rc_path, source_line):
rc_path.write_text(source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n", encoding="utf-8") rc_path.write_text(
source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n",
encoding="utf-8",
)
hint = "Added bash completion source line to ~/.bashrc; restart your terminal" hint = "Added bash completion source line to ~/.bashrc; restart your terminal"
else: else:
hint = "Restart your terminal or source ~/.bashrc to activate completions" hint = "Restart your terminal or source ~/.bashrc to activate completions"

View File

@@ -13,6 +13,7 @@ class AssetBalance:
locked: float locked: float
total: float total: float
notional_usdt: float notional_usdt: float
is_dust: bool
@dataclass @dataclass
@@ -59,6 +60,7 @@ def get_balances(
spot_client: Any, spot_client: Any,
) -> dict[str, Any]: ) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper() quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
rows: list[dict[str, Any]] = [] rows: list[dict[str, Any]] = []
balances, _, price_map = _spot_account_data(spot_client, quote) balances, _, price_map = _spot_account_data(spot_client, quote)
for item in balances: for item in balances:
@@ -68,6 +70,7 @@ def get_balances(
if total <= 0: if total <= 0:
continue continue
asset = item["asset"] asset = item["asset"]
notional = total * price_map.get(asset, 0.0)
rows.append( rows.append(
asdict( asdict(
AssetBalance( AssetBalance(
@@ -75,7 +78,8 @@ def get_balances(
free=free, free=free,
locked=locked, locked=locked,
total=total, total=total,
notional_usdt=total * price_map.get(asset, 0.0), notional_usdt=notional,
is_dust=notional < dust,
) )
) )
) )
@@ -113,60 +117,3 @@ def get_positions(
) )
) )
return {"positions": rows} return {"positions": rows}
def get_overview(
config: dict[str, Any],
*,
spot_client: Any,
) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
balances: list[dict[str, Any]] = []
positions: list[dict[str, Any]] = []
spot_balances, _, price_map = _spot_account_data(spot_client, quote)
for item in spot_balances:
free = float(item.get("free", 0.0))
locked = float(item.get("locked", 0.0))
total = free + locked
if total <= 0:
continue
asset = item["asset"]
balances.append(
asdict(
AssetBalance(
asset=asset,
free=free,
locked=locked,
total=total,
notional_usdt=total * price_map.get(asset, 0.0),
)
)
)
mark_price = price_map.get(asset, 1.0 if asset == quote else 0.0)
notional = total * mark_price
if notional >= dust:
positions.append(
asdict(
PositionView(
symbol=quote if asset == quote else f"{asset}{quote}",
quantity=total,
entry_price=None,
mark_price=mark_price,
notional_usdt=notional,
side="LONG",
)
)
)
spot_equity = sum(item["notional_usdt"] for item in balances)
overview = asdict(
AccountOverview(
total_equity_usdt=spot_equity,
spot_equity_usdt=spot_equity,
spot_asset_count=len(balances),
spot_position_count=len(positions),
)
)
return {"overview": overview, "balances": balances, "positions": positions}

View File

@@ -57,7 +57,9 @@ def get_tickers(config: dict[str, Any], symbols: list[str], *, spot_client: Any)
TickerView( TickerView(
symbol=normalize_symbol(ticker["symbol"]), symbol=normalize_symbol(ticker["symbol"]),
last_price=float(ticker.get("lastPrice") or ticker.get("last_price") or 0.0), last_price=float(ticker.get("lastPrice") or ticker.get("last_price") or 0.0),
price_change_pct=float(ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0), price_change_pct=float(
ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0
),
quote_volume=float(ticker.get("quoteVolume") or ticker.get("quote_volume") or 0.0), quote_volume=float(ticker.get("quoteVolume") or ticker.get("quote_volume") or 0.0),
) )
) )

View File

@@ -26,7 +26,9 @@ def _safe_pct(new: float, old: float) -> float:
return (new - old) / old return (new - old) / old
def _score_candidate(closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float) -> tuple[float, dict[str, float]]: def _score_candidate(
closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float
) -> tuple[float, dict[str, float]]:
if len(closes) < 2 or not volumes: if len(closes) < 2 or not volumes:
return 0.0, { return 0.0, {
"trend": 0.0, "trend": 0.0,
@@ -158,10 +160,7 @@ def scan_opportunities(
top_n = int(opportunity_config.get("top_n", 10)) top_n = int(opportunity_config.get("top_n", 10))
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper() quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
held_positions = get_positions(config, spot_client=spot_client)["positions"] held_positions = get_positions(config, spot_client=spot_client)["positions"]
concentration_map = { concentration_map = {normalize_symbol(item["symbol"]): float(item["notional_usdt"]) for item in held_positions}
normalize_symbol(item["symbol"]): float(item["notional_usdt"])
for item in held_positions
}
total_held = sum(concentration_map.values()) or 1.0 total_held = sum(concentration_map.values()) or 1.0
universe = get_scan_universe(config, spot_client=spot_client, symbols=symbols)[:scan_limit] universe = get_scan_universe(config, spot_client=spot_client, symbols=symbols)[:scan_limit]

View File

@@ -40,7 +40,9 @@ def _default_dry_run(config: dict[str, Any], dry_run: bool | None) -> bool:
return bool(config.get("trading", {}).get("dry_run_default", False)) return bool(config.get("trading", {}).get("dry_run_default", False))
def _trade_log_payload(intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None) -> dict[str, Any]: def _trade_log_payload(
intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None
) -> dict[str, Any]:
return { return {
"market_type": intent.market_type, "market_type": intent.market_type,
"symbol": intent.symbol, "symbol": intent.symbol,
@@ -125,7 +127,9 @@ def execute_spot_trade(
response_payload=response, response_payload=response,
) )
) )
audit_event("trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response}) audit_event(
"trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response}
)
return {"trade": result} return {"trade": result}
try: try:
@@ -146,5 +150,7 @@ def execute_spot_trade(
response_payload=response, response_payload=response,
) )
) )
audit_event("trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response}) audit_event(
"trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response}
)
return {"trade": result} return {"trade": result}

View File

@@ -28,9 +28,30 @@ class FakeSpotClient:
def ticker_24h(self, symbols=None): def ticker_24h(self, symbols=None):
rows = [ rows = [
{"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "4.5", "quoteVolume": "10000000", "highPrice": "61000", "lowPrice": "58000"}, {
{"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3.0", "quoteVolume": "8000000", "highPrice": "3050", "lowPrice": "2900"}, "symbol": "BTCUSDT",
{"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1.0", "quoteVolume": "200", "highPrice": "0.11", "lowPrice": "0.09"}, "lastPrice": "60000",
"priceChangePercent": "4.5",
"quoteVolume": "10000000",
"highPrice": "61000",
"lowPrice": "58000",
},
{
"symbol": "ETHUSDT",
"lastPrice": "3000",
"priceChangePercent": "3.0",
"quoteVolume": "8000000",
"highPrice": "3050",
"lowPrice": "2900",
},
{
"symbol": "DOGEUSDT",
"lastPrice": "0.1",
"priceChangePercent": "1.0",
"quoteVolume": "200",
"highPrice": "0.11",
"lowPrice": "0.09",
},
] ]
if not symbols: if not symbols:
return rows return rows
@@ -38,23 +59,29 @@ class FakeSpotClient:
return [row for row in rows if row["symbol"] in wanted] return [row for row in rows if row["symbol"] in wanted]
def exchange_info(self): def exchange_info(self):
return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "BREAK"}]} return {
"symbols": [
{"symbol": "BTCUSDT", "status": "TRADING"},
{"symbol": "ETHUSDT", "status": "TRADING"},
{"symbol": "DOGEUSDT", "status": "BREAK"},
]
}
class AccountMarketServicesTestCase(unittest.TestCase): class AccountMarketServicesTestCase(unittest.TestCase):
def test_account_overview_and_dust_filter(self): def test_get_balances_with_dust_flag(self):
config = { config = {
"market": {"default_quote": "USDT"}, "market": {"default_quote": "USDT"},
"trading": {"dust_usdt_threshold": 10.0}, "trading": {"dust_usdt_threshold": 10.0},
} }
payload = account_service.get_overview( payload = account_service.get_balances(
config, config,
spot_client=FakeSpotClient(), spot_client=FakeSpotClient(),
) )
self.assertEqual(payload["overview"]["total_equity_usdt"], 720.1) balances = {item["asset"]: item for item in payload["balances"]}
symbols = {item["symbol"] for item in payload["positions"]} self.assertFalse(balances["USDT"]["is_dust"])
self.assertNotIn("DOGEUSDT", symbols) self.assertFalse(balances["BTC"]["is_dust"])
self.assertIn("BTCUSDT", symbols) self.assertTrue(balances["DOGE"]["is_dust"])
def test_market_tickers_and_scan_universe(self): def test_market_tickers_and_scan_universe(self):
config = { config = {

View File

@@ -22,10 +22,16 @@ class CLITestCase(unittest.TestCase):
def test_init_dispatches(self): def test_init_dispatches(self):
captured = {} captured = {}
with patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}), patch.object( with (
cli, "install_shell_completion", return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"} patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}),
), patch.object( patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) cli,
"install_shell_completion",
return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
): ):
result = cli.main(["init", "--force"]) result = cli.main(["init", "--force"])
self.assertEqual(result, 0) self.assertEqual(result, 0)
@@ -85,11 +91,50 @@ class CLITestCase(unittest.TestCase):
self.assertIn("lastPrice", output) self.assertIn("lastPrice", output)
self.assertIn("BTCUSDT", output) self.assertIn("BTCUSDT", output)
def test_account_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "trading": {"dust_usdt_threshold": 10.0}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.account_service, "get_balances", return_value={"balances": [{"asset": "BTC", "is_dust": False}]}
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["account"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["balances"][0]["asset"], "BTC")
def test_upgrade_dispatches(self): def test_upgrade_dispatches(self):
captured = {} captured = {}
with patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}), patch.object( with (
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload) patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
): ):
result = cli.main(["upgrade"]) result = cli.main(["upgrade"])
self.assertEqual(result, 0) self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["returncode"], 0) self.assertEqual(captured["payload"]["returncode"], 0)
def test_catlog_dispatches(self):
captured = {}
with (
patch.object(
cli, "read_audit_log", return_value=[{"timestamp": "2026-04-17T12:00:00Z", "event": "test_event"}]
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["catlog", "-n", "5", "-o", "10"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["limit"], 5)
self.assertEqual(captured["payload"]["offset"], 10)
self.assertIn("entries", captured["payload"])
self.assertEqual(captured["payload"]["total"], 1)

View File

@@ -8,13 +8,21 @@ import unittest
from pathlib import Path from pathlib import Path
from unittest.mock import patch from unittest.mock import patch
from coinhunter.config import ensure_init_files, get_binance_credentials, load_config, load_env_file from coinhunter.config import (
ensure_init_files,
get_binance_credentials,
load_config,
load_env_file,
)
from coinhunter.runtime import get_runtime_paths from coinhunter.runtime import get_runtime_paths
class ConfigRuntimeTestCase(unittest.TestCase): class ConfigRuntimeTestCase(unittest.TestCase):
def test_init_files_created_in_coinhunter_home(self): def test_init_files_created_in_coinhunter_home(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False): with (
tempfile.TemporaryDirectory() as tmp_dir,
patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False),
):
paths = get_runtime_paths() paths = get_runtime_paths()
payload = ensure_init_files(paths) payload = ensure_init_files(paths)
self.assertTrue(paths.config_file.exists()) self.assertTrue(paths.config_file.exists())
@@ -23,10 +31,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
self.assertEqual(payload["root"], str(paths.root)) self.assertEqual(payload["root"], str(paths.root))
def test_load_config_and_env(self): def test_load_config_and_env(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( with (
os.environ, tempfile.TemporaryDirectory() as tmp_dir,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, patch.dict(
clear=False, os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
clear=False,
),
): ):
paths = get_runtime_paths() paths = get_runtime_paths()
ensure_init_files(paths) ensure_init_files(paths)
@@ -40,10 +51,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
self.assertEqual(os.environ["BINANCE_API_SECRET"], "def") self.assertEqual(os.environ["BINANCE_API_SECRET"], "def")
def test_env_file_overrides_existing_environment(self): def test_env_file_overrides_existing_environment(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( with (
os.environ, tempfile.TemporaryDirectory() as tmp_dir,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home"), "BINANCE_API_KEY": "old_key"}, patch.dict(
clear=False, os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home"), "BINANCE_API_KEY": "old_key"},
clear=False,
),
): ):
paths = get_runtime_paths() paths = get_runtime_paths()
ensure_init_files(paths) ensure_init_files(paths)
@@ -55,10 +69,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
self.assertEqual(os.environ["BINANCE_API_SECRET"], "new_secret") self.assertEqual(os.environ["BINANCE_API_SECRET"], "new_secret")
def test_missing_credentials_raise(self): def test_missing_credentials_raise(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( with (
os.environ, tempfile.TemporaryDirectory() as tmp_dir,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, patch.dict(
clear=False, os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
clear=False,
),
): ):
os.environ.pop("BINANCE_API_KEY", None) os.environ.pop("BINANCE_API_KEY", None)
os.environ.pop("BINANCE_API_SECRET", None) os.environ.pop("BINANCE_API_SECRET", None)
@@ -68,12 +85,17 @@ class ConfigRuntimeTestCase(unittest.TestCase):
get_binance_credentials(paths) get_binance_credentials(paths)
def test_permission_error_is_explained(self): def test_permission_error_is_explained(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict( with (
os.environ, tempfile.TemporaryDirectory() as tmp_dir,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, patch.dict(
clear=False, os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
clear=False,
),
): ):
paths = get_runtime_paths() paths = get_runtime_paths()
with patch("coinhunter.config.ensure_runtime_dirs", side_effect=PermissionError("no write access")): with (
with self.assertRaisesRegex(RuntimeError, "Set COINHUNTER_HOME to a writable directory"): patch("coinhunter.config.ensure_runtime_dirs", side_effect=PermissionError("no write access")),
ensure_init_files(paths) self.assertRaisesRegex(RuntimeError, "Set COINHUNTER_HOME to a writable directory"),
):
ensure_init_files(paths)

View File

@@ -29,17 +29,52 @@ class FakeSpotClient:
def ticker_24h(self, symbols=None): def ticker_24h(self, symbols=None):
rows = { rows = {
"BTCUSDT": {"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "5", "quoteVolume": "9000000", "highPrice": "60200", "lowPrice": "55000"}, "BTCUSDT": {
"ETHUSDT": {"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3", "quoteVolume": "8000000", "highPrice": "3100", "lowPrice": "2800"}, "symbol": "BTCUSDT",
"SOLUSDT": {"symbol": "SOLUSDT", "lastPrice": "150", "priceChangePercent": "8", "quoteVolume": "10000000", "highPrice": "152", "lowPrice": "130"}, "lastPrice": "60000",
"DOGEUSDT": {"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1", "quoteVolume": "100", "highPrice": "0.11", "lowPrice": "0.09"}, "priceChangePercent": "5",
"quoteVolume": "9000000",
"highPrice": "60200",
"lowPrice": "55000",
},
"ETHUSDT": {
"symbol": "ETHUSDT",
"lastPrice": "3000",
"priceChangePercent": "3",
"quoteVolume": "8000000",
"highPrice": "3100",
"lowPrice": "2800",
},
"SOLUSDT": {
"symbol": "SOLUSDT",
"lastPrice": "150",
"priceChangePercent": "8",
"quoteVolume": "10000000",
"highPrice": "152",
"lowPrice": "130",
},
"DOGEUSDT": {
"symbol": "DOGEUSDT",
"lastPrice": "0.1",
"priceChangePercent": "1",
"quoteVolume": "100",
"highPrice": "0.11",
"lowPrice": "0.09",
},
} }
if not symbols: if not symbols:
return list(rows.values()) return list(rows.values())
return [rows[symbol] for symbol in symbols] return [rows[symbol] for symbol in symbols]
def exchange_info(self): def exchange_info(self):
return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "SOLUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "TRADING"}]} return {
"symbols": [
{"symbol": "BTCUSDT", "status": "TRADING"},
{"symbol": "ETHUSDT", "status": "TRADING"},
{"symbol": "SOLUSDT", "status": "TRADING"},
{"symbol": "DOGEUSDT", "status": "TRADING"},
]
}
def klines(self, symbol, interval, limit): def klines(self, symbol, interval, limit):
curves = { curves = {
@@ -50,7 +85,18 @@ class FakeSpotClient:
}[symbol] }[symbol]
rows = [] rows = []
for index, close in enumerate(curves[-limit:]): for index, close in enumerate(curves[-limit:]):
rows.append([index, close * 0.98, close * 1.01, close * 0.97, close, 100 + index * 10, index + 1, close * (100 + index * 10)]) rows.append(
[
index,
close * 0.98,
close * 1.01,
close * 0.97,
close,
100 + index * 10,
index + 1,
close * (100 + index * 10),
]
)
return rows return rows
@@ -85,7 +131,9 @@ class OpportunityServiceTestCase(unittest.TestCase):
def test_scan_is_deterministic(self): def test_scan_is_deterministic(self):
with patch.object(opportunity_service, "audit_event", return_value=None): with patch.object(opportunity_service, "audit_event", return_value=None):
payload = opportunity_service.scan_opportunities(self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()) payload = opportunity_service.scan_opportunities(
self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()
)
self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"]) self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"])
def test_score_candidate_handles_empty_klines(self): def test_score_candidate_handles_empty_klines(self):

View File

@@ -20,7 +20,9 @@ class FakeSpotClient:
class TradeServiceTestCase(unittest.TestCase): class TradeServiceTestCase(unittest.TestCase):
def test_spot_market_buy_dry_run_does_not_call_client(self): def test_spot_market_buy_dry_run_does_not_call_client(self):
events = [] events = []
with patch.object(trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload))): with patch.object(
trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload))
):
client = FakeSpotClient() client = FakeSpotClient()
payload = trade_service.execute_spot_trade( payload = trade_service.execute_spot_trade(
{"trading": {"dry_run_default": False}}, {"trading": {"dry_run_default": False}},
@@ -55,9 +57,11 @@ class TradeServiceTestCase(unittest.TestCase):
self.assertEqual(client.calls[0]["timeInForce"], "GTC") self.assertEqual(client.calls[0]["timeInForce"], "GTC")
def test_spot_market_buy_requires_quote(self): def test_spot_market_buy_requires_quote(self):
with patch.object(trade_service, "audit_event", return_value=None): with (
with self.assertRaisesRegex(RuntimeError, "requires --quote"): patch.object(trade_service, "audit_event", return_value=None),
trade_service.execute_spot_trade( self.assertRaisesRegex(RuntimeError, "requires --quote"),
):
trade_service.execute_spot_trade(
{"trading": {"dry_run_default": False}}, {"trading": {"dry_run_default": False}},
side="buy", side="buy",
symbol="BTCUSDT", symbol="BTCUSDT",
@@ -70,9 +74,11 @@ class TradeServiceTestCase(unittest.TestCase):
) )
def test_spot_market_buy_rejects_qty(self): def test_spot_market_buy_rejects_qty(self):
with patch.object(trade_service, "audit_event", return_value=None): with (
with self.assertRaisesRegex(RuntimeError, "accepts --quote only"): patch.object(trade_service, "audit_event", return_value=None),
trade_service.execute_spot_trade( self.assertRaisesRegex(RuntimeError, "accepts --quote only"),
):
trade_service.execute_spot_trade(
{"trading": {"dry_run_default": False}}, {"trading": {"dry_run_default": False}},
side="buy", side="buy",
symbol="BTCUSDT", symbol="BTCUSDT",
@@ -85,9 +91,11 @@ class TradeServiceTestCase(unittest.TestCase):
) )
def test_spot_market_sell_rejects_quote(self): def test_spot_market_sell_rejects_quote(self):
with patch.object(trade_service, "audit_event", return_value=None): with (
with self.assertRaisesRegex(RuntimeError, "accepts --qty only"): patch.object(trade_service, "audit_event", return_value=None),
trade_service.execute_spot_trade( self.assertRaisesRegex(RuntimeError, "accepts --qty only"),
):
trade_service.execute_spot_trade(
{"trading": {"dry_run_default": False}}, {"trading": {"dry_run_default": False}},
side="sell", side="sell",
symbol="BTCUSDT", symbol="BTCUSDT",