Compare commits

..

6 Commits

Author SHA1 Message Date
3855477155 refactor: flatten account command to a single balances view
Remove overview/balances/positions subcommands in favor of one
`account` command that returns all balances with an `is_dust` flag.
Add descriptions to every parser and expose -a/--agent and --doc
on all leaf commands for better help discoverability.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 18:19:19 +08:00
d629c25232 fix: resolve merge conflicts and lint issues
- Merge origin/main changes (flattened buy/sell commands, --doc flag, aliases)
- Fix spinner placement for buy/sell commands
- Fix duplicate alias key 'p' in canonical subcommands
- Remove unused mypy ignore comments in spot_client.py
- Fix nested with statements in tests

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:59:53 +08:00
4602583760 Merge remote-tracking branch 'origin/main' into main 2026-04-17 16:57:40 +08:00
ca0625b199 chore: bump version to 2.1.1
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:51:55 +08:00
a0e01ca56f chore: bump version to 2.1.0
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:44:58 +08:00
f528575aa8 feat: add catlog command, agent flag reorder, and TUI polish
- Add `coinhunter catlog` with limit/offset pagination for audit logs
- Optimize audit log reading with deque to avoid loading all history
- Allow `-a/--agent` flag after subcommands
- Fix upgrade spinner artifact and empty line issues
- Render audit log TUI as timeline with low-saturation event colors
- Convert audit timestamps to local timezone in TUI
- Remove futures-related capabilities
- Add conda environment.yml for development
- Bump version to 2.0.9 and update README

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:42:47 +08:00
16 changed files with 509 additions and 290 deletions

View File

@@ -85,11 +85,9 @@ coin market klines --doc
```bash
# Account (aliases: a, acc)
coinhunter account overview
coinhunter account overview --agent
coin a ov
coin acc bal
coin a pos
coinhunter account
coinhunter account --agent
coin a
# Market (aliases: m)
coinhunter market tickers BTCUSDT ETH/USDT sol-usdt
@@ -110,6 +108,11 @@ coinhunter opportunity scan --symbols BTCUSDT ETHUSDT SOLUSDT
coin opp pf
coin o scan -s BTCUSDT ETHUSDT
# Audit log
coinhunter catlog
coinhunter catlog -n 20
coinhunter catlog -n 10 -o 10
# Self-upgrade
coinhunter upgrade
coin upgrade
@@ -150,6 +153,8 @@ Events include:
- `opportunity_portfolio_generated`
- `opportunity_scan_generated`
Use `coinhunter catlog` to read recent entries in the terminal. It aggregates across all days and supports pagination with `-n/--limit` and `-o/--offset`.
## Development
Clone the repo and install in editable mode:
@@ -160,6 +165,13 @@ cd coinhunter-cli
pip install -e ".[dev]"
```
Or use the provided Conda environment:
```bash
conda env create -f environment.yml
conda activate coinhunter
```
Run quality checks:
```bash

9
environment.yml Normal file
View File

@@ -0,0 +1,9 @@
name: coinhunter
channels:
- defaults
- conda-forge
dependencies:
- python>=3.10
- pip
- pip:
- -e ".[dev]"

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project]
name = "coinhunter"
version = "2.1.0"
version = "2.1.1"
description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution."
readme = "README.md"
license = {text = "MIT"}
@@ -14,9 +14,6 @@ dependencies = [
"shtab>=1.7.0",
"tomli>=2.0.1; python_version < '3.11'",
]
authors = [
{name = "Tacit Lab", email = "ouyangcarlos@gmail.com"}
]
[project.optional-dependencies]
dev = [
@@ -37,13 +34,10 @@ where = ["src"]
[tool.pytest.ini_options]
testpaths = ["tests"]
[tool.ruff]
target-version = "py310"
line-length = 120
addopts = "-v"
[tool.ruff.lint]
select = ["E", "F", "I", "UP", "W"]
select = ["E", "F", "I", "W", "UP", "B", "C4", "SIM"]
ignore = ["E501"]
[tool.ruff.lint.pydocstyle]
@@ -52,7 +46,5 @@ convention = "google"
[tool.mypy]
python_version = "3.10"
warn_return_any = true
warn_unused_configs = true
disallow_untyped_defs = true
warn_unused_ignores = true
ignore_missing_imports = true
exclude = [".venv", "build"]

View File

@@ -3,6 +3,7 @@
from __future__ import annotations
import json
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
@@ -37,3 +38,33 @@ def audit_event(event: str, payload: dict[str, Any], paths: RuntimePaths | None
with _audit_path(paths).open("a", encoding="utf-8") as handle:
handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n")
return entry
def read_audit_log(paths: RuntimePaths | None = None, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
paths = ensure_runtime_dirs(paths or get_runtime_paths())
logs_dir = _resolve_audit_dir(paths)
if not logs_dir.exists():
return []
audit_files = sorted(logs_dir.glob("audit_*.jsonl"), reverse=True)
needed = offset + limit
chunks: list[list[dict[str, Any]]] = []
total = 0
for audit_file in audit_files:
remaining = needed - total
if remaining <= 0:
break
entries: list[dict[str, Any]] = []
with audit_file.open("r", encoding="utf-8") as handle:
entries = list(deque((json.loads(line) for line in handle if line.strip()), maxlen=remaining))
if entries:
chunks.append(entries)
total += len(entries)
if not chunks:
return []
all_entries: list[dict[str, Any]] = []
for chunk in reversed(chunks):
all_entries.extend(chunk)
start = -(offset + limit) if (offset + limit) <= len(all_entries) else -len(all_entries)
if offset == 0:
return all_entries[start:]
return all_entries[start:-offset]

View File

@@ -5,7 +5,10 @@ from __future__ import annotations
from collections.abc import Callable
from typing import Any
from requests.exceptions import RequestException, SSLError
from requests.exceptions import (
RequestException,
SSLError,
)
class SpotBinanceClient:
@@ -56,7 +59,7 @@ class SpotBinanceClient:
response = self._call("24h ticker", self._client.ticker_24hr, symbol=symbols[0])
else:
response = self._call("24h ticker", self._client.ticker_24hr, symbols=symbols)
return response if isinstance(response, list) else [response] # type: ignore[no-any-return]
return response if isinstance(response, list) else [response]
def ticker_price(self, symbols: list[str] | None = None) -> list[dict[str, Any]]:
if not symbols:
@@ -65,7 +68,7 @@ class SpotBinanceClient:
response = self._call("ticker price", self._client.ticker_price, symbol=symbols[0])
else:
response = self._call("ticker price", self._client.ticker_price, symbols=symbols)
return response if isinstance(response, list) else [response] # type: ignore[no-any-return]
return response if isinstance(response, list) else [response]
def klines(self, symbol: str, interval: str, limit: int) -> list[list[Any]]:
return self._call("klines", self._client.klines, symbol=symbol, interval=interval, limit=limit) # type: ignore[no-any-return]

View File

@@ -7,15 +7,27 @@ import sys
from typing import Any
from . import __version__
from .audit import read_audit_log
from .binance.spot_client import SpotBinanceClient
from .config import ensure_init_files, get_binance_credentials, load_config
from .runtime import get_runtime_paths, install_shell_completion, print_output, self_upgrade, with_spinner
from .services import account_service, market_service, opportunity_service, trade_service
from .runtime import (
get_runtime_paths,
install_shell_completion,
print_output,
self_upgrade,
with_spinner,
)
from .services import (
account_service,
market_service,
opportunity_service,
trade_service,
)
EPILOG = """\
examples:
coin init
coin acc ov
coin account
coin m tk BTCUSDT ETHUSDT
coin m k BTCUSDT -i 1h -l 50
coin buy BTCUSDT -Q 100 -d
@@ -37,38 +49,20 @@ Fields:
files_created list of generated files
completion shell completion installation status
""",
"account/overview": """\
"account": """\
Output: JSON
{
"total_btc": 1.234,
"total_usdt": 50000.0,
"assets": [{"asset": "BTC", "free": 0.5, "locked": 0.1}]
"balances": [
{"asset": "BTC", "free": 0.5, "locked": 0.1, "total": 0.6, "notional_usdt": 30000.0, "is_dust": false}
]
}
Fields:
total_btc total equity denominated in BTC
total_usdt total equity denominated in USDT
assets list of non-zero balances
""",
"account/balances": """\
Output: JSON array
[
{"asset": "BTC", "free": 0.5, "locked": 0.1, "total": 0.6}
]
Fields:
asset asset symbol
free available balance
locked frozen/locked balance
total free + locked
""",
"account/positions": """\
Output: JSON array
[
{"symbol": "BTCUSDT", "positionAmt": 0.01, "entryPrice": 90000.0}
]
Fields:
symbol trading pair
positionAmt quantity held (positive long, negative short)
entryPrice average entry price
notional_usdt estimated value in USDT
is_dust true if value is below dust threshold
""",
"market/tickers": """\
Output: JSON object keyed by normalized symbol
@@ -202,7 +196,7 @@ def _load_spot_client(config: dict[str, Any], *, client: Any | None = None) -> S
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="coinhunter",
description="CoinHunter V2 Binance-first trading CLI",
description="Binance-first trading CLI for account balances, market data, trade execution, and opportunity scanning.",
epilog=EPILOG,
formatter_class=argparse.RawDescriptionHelpFormatter,
)
@@ -211,58 +205,105 @@ def build_parser() -> argparse.ArgumentParser:
parser.add_argument("--doc", action="store_true", help="Show output schema and field descriptions for the command")
subparsers = parser.add_subparsers(dest="command")
init_parser = subparsers.add_parser("init", help="Generate config.toml, .env, and log directory")
def _add_global_flags(p: argparse.ArgumentParser) -> None:
p.add_argument("-a", "--agent", action="store_true", help="Output in agent-friendly format (JSON or compact)")
p.add_argument("--doc", action="store_true", help="Show output schema and field descriptions for the command")
init_parser = subparsers.add_parser(
"init", help="Generate config.toml, .env, and log directory",
description="Initialize the CoinHunter runtime directory with config.toml, .env, and shell completions.",
)
init_parser.add_argument("-f", "--force", action="store_true", help="Overwrite existing files")
_add_global_flags(init_parser)
account_parser = subparsers.add_parser("account", aliases=["acc", "a"], help="Account overview, balances, and positions")
account_subparsers = account_parser.add_subparsers(dest="account_command")
account_commands_help = {
"overview": "Total equity and summary",
"balances": "List asset balances",
"positions": "List open positions",
}
account_aliases = {
"overview": ["ov"],
"balances": ["bal", "b"],
"positions": ["pos", "p"],
}
for name in ("overview", "balances", "positions"):
account_subparsers.add_parser(name, aliases=account_aliases[name], help=account_commands_help[name])
account_parser = subparsers.add_parser(
"account", aliases=["acc", "a"], help="List asset balances and notional values",
description="List all non-zero spot balances with free/locked totals, notional USDT value, and dust flag.",
)
_add_global_flags(account_parser)
market_parser = subparsers.add_parser("market", aliases=["m"], help="Batch market queries")
market_parser = subparsers.add_parser(
"market", aliases=["m"], help="Batch market queries",
description="Query market data: 24h tickers and OHLCV klines for one or more trading pairs.",
)
market_subparsers = market_parser.add_subparsers(dest="market_command")
tickers_parser = market_subparsers.add_parser("tickers", aliases=["tk", "t"], help="Fetch 24h ticker data")
tickers_parser = market_subparsers.add_parser(
"tickers", aliases=["tk", "t"], help="Fetch 24h ticker data",
description="Fetch 24h ticker statistics (last price, change %, volume) for one or more symbols.",
)
tickers_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to query (e.g. BTCUSDT ETH/USDT)")
klines_parser = market_subparsers.add_parser("klines", aliases=["k"], help="Fetch OHLCV klines")
_add_global_flags(tickers_parser)
klines_parser = market_subparsers.add_parser(
"klines", aliases=["k"], help="Fetch OHLCV klines",
description="Fetch OHLCV candlestick data for one or more symbols.",
)
klines_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to query")
klines_parser.add_argument("-i", "--interval", default="1h", help="Kline interval (default: 1h)")
klines_parser.add_argument("-l", "--limit", type=int, default=100, help="Number of candles (default: 100)")
_add_global_flags(klines_parser)
buy_parser = subparsers.add_parser("buy", aliases=["b"], help="Buy base asset")
buy_parser = subparsers.add_parser(
"buy", aliases=["b"], help="Buy base asset",
description="Place a spot BUY order. Market buys require --quote. Limit buys require --qty and --price.",
)
buy_parser.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)")
buy_parser.add_argument("-q", "--qty", type=float, help="Base asset quantity (limit orders)")
buy_parser.add_argument("-Q", "--quote", type=float, help="Quote asset amount (market buy only)")
buy_parser.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)")
buy_parser.add_argument("-p", "--price", type=float, help="Limit price")
buy_parser.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending")
_add_global_flags(buy_parser)
sell_parser = subparsers.add_parser("sell", aliases=["s"], help="Sell base asset")
sell_parser = subparsers.add_parser(
"sell", aliases=["s"], help="Sell base asset",
description="Place a spot SELL order. Requires --qty. Limit sells also require --price.",
)
sell_parser.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)")
sell_parser.add_argument("-q", "--qty", type=float, help="Base asset quantity")
sell_parser.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)")
sell_parser.add_argument("-p", "--price", type=float, help="Limit price")
sell_parser.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending")
_add_global_flags(sell_parser)
opportunity_parser = subparsers.add_parser("opportunity", aliases=["opp", "o"], help="Portfolio analysis and market scanning")
opportunity_parser = subparsers.add_parser(
"opportunity", aliases=["opp", "o"], help="Portfolio analysis and market scanning",
description="Analyze your portfolio and scan the market for trading opportunities.",
)
opportunity_subparsers = opportunity_parser.add_subparsers(dest="opportunity_command")
opportunity_subparsers.add_parser("portfolio", aliases=["pf", "p"], help="Score current holdings")
scan_parser = opportunity_subparsers.add_parser("scan", help="Scan market for opportunities")
portfolio_parser = opportunity_subparsers.add_parser(
"portfolio", aliases=["pf", "p"], help="Score current holdings",
description="Score your current spot holdings and generate add/hold/trim/exit recommendations.",
)
_add_global_flags(portfolio_parser)
scan_parser = opportunity_subparsers.add_parser(
"scan", help="Scan market for opportunities",
description="Scan the market for trading opportunities and return the top-N candidates with signals.",
)
scan_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols")
_add_global_flags(scan_parser)
subparsers.add_parser("upgrade", help="Upgrade coinhunter to the latest version")
upgrade_parser = subparsers.add_parser(
"upgrade", help="Upgrade coinhunter to the latest version",
description="Upgrade the coinhunter package using pipx (preferred) or pip.",
)
_add_global_flags(upgrade_parser)
completion_parser = subparsers.add_parser("completion", help="Generate shell completion script")
catlog_parser = subparsers.add_parser(
"catlog", help="Read recent audit log entries",
description="Read recent audit log entries across all days with optional limit and offset pagination.",
)
catlog_parser.add_argument("-n", "--limit", type=int, default=10, help="Number of entries (default: 10)")
catlog_parser.add_argument(
"-o", "--offset", type=int, default=0, help="Skip the most recent N entries (default: 0)"
)
_add_global_flags(catlog_parser)
completion_parser = subparsers.add_parser(
"completion", help="Generate shell completion script",
description="Generate a shell completion script for bash or zsh.",
)
completion_parser.add_argument("shell", choices=["bash", "zsh"], help="Target shell")
_add_global_flags(completion_parser)
return parser
@@ -278,19 +319,13 @@ _CANONICAL_COMMANDS = {
}
_CANONICAL_SUBCOMMANDS = {
"ov": "overview",
"bal": "balances",
"b": "balances",
"pos": "positions",
"p": "positions",
"tk": "tickers",
"t": "tickers",
"k": "klines",
"pf": "portfolio",
"p": "portfolio",
}
_COMMANDS_WITH_SUBCOMMANDS = {"account", "market", "opportunity"}
_COMMANDS_WITH_SUBCOMMANDS = {"market", "opportunity"}
def _get_doc_key(argv: list[str]) -> str | None:
@@ -374,55 +409,35 @@ def main(argv: list[str] | None = None) -> int:
if args.command == "account":
spot_client = _load_spot_client(config)
if args.account_command == "overview":
with with_spinner("Fetching account overview...", enabled=not args.agent):
print_output(
account_service.get_overview(config, spot_client=spot_client),
agent=args.agent,
)
return 0
if args.account_command == "balances":
with with_spinner("Fetching balances...", enabled=not args.agent):
print_output(
account_service.get_balances(config, spot_client=spot_client),
agent=args.agent,
)
result = account_service.get_balances(config, spot_client=spot_client)
print_output(result, agent=args.agent)
return 0
if args.account_command == "positions":
with with_spinner("Fetching positions...", enabled=not args.agent):
print_output(
account_service.get_positions(config, spot_client=spot_client),
agent=args.agent,
)
return 0
parser.error("account requires one of: overview, balances, positions")
if args.command == "market":
spot_client = _load_spot_client(config)
if args.market_command == "tickers":
with with_spinner("Fetching tickers...", enabled=not args.agent):
print_output(market_service.get_tickers(config, args.symbols, spot_client=spot_client), agent=args.agent)
result = market_service.get_tickers(config, args.symbols, spot_client=spot_client)
print_output(result, agent=args.agent)
return 0
if args.market_command == "klines":
with with_spinner("Fetching klines...", enabled=not args.agent):
print_output(
market_service.get_klines(
result = market_service.get_klines(
config,
args.symbols,
interval=args.interval,
limit=args.limit,
spot_client=spot_client,
),
agent=args.agent,
)
print_output(result, agent=args.agent)
return 0
parser.error("market requires one of: tickers, klines")
if args.command == "buy":
spot_client = _load_spot_client(config)
with with_spinner("Placing order...", enabled=not args.agent):
print_output(
trade_service.execute_spot_trade(
result = trade_service.execute_spot_trade(
config,
side="buy",
symbol=args.symbol,
@@ -432,16 +447,14 @@ def main(argv: list[str] | None = None) -> int:
price=args.price,
dry_run=True if args.dry_run else None,
spot_client=spot_client,
),
agent=args.agent,
)
print_output(result, agent=args.agent)
return 0
if args.command == "sell":
spot_client = _load_spot_client(config)
with with_spinner("Placing order...", enabled=not args.agent):
print_output(
trade_service.execute_spot_trade(
result = trade_service.execute_spot_trade(
config,
side="sell",
symbol=args.symbol,
@@ -451,25 +464,39 @@ def main(argv: list[str] | None = None) -> int:
price=args.price,
dry_run=True if args.dry_run else None,
spot_client=spot_client,
),
agent=args.agent,
)
print_output(result, agent=args.agent)
return 0
if args.command == "opportunity":
spot_client = _load_spot_client(config)
if args.opportunity_command == "portfolio":
with with_spinner("Analyzing portfolio...", enabled=not args.agent):
print_output(opportunity_service.analyze_portfolio(config, spot_client=spot_client), agent=args.agent)
result = opportunity_service.analyze_portfolio(config, spot_client=spot_client)
print_output(result, agent=args.agent)
return 0
if args.opportunity_command == "scan":
with with_spinner("Scanning opportunities...", enabled=not args.agent):
print_output(opportunity_service.scan_opportunities(config, spot_client=spot_client, symbols=args.symbols), agent=args.agent)
result = opportunity_service.scan_opportunities(
config, spot_client=spot_client, symbols=args.symbols
)
print_output(result, agent=args.agent)
return 0
parser.error("opportunity requires `portfolio` or `scan`")
if args.command == "upgrade":
print_output(self_upgrade(), agent=args.agent)
with with_spinner("Upgrading coinhunter...", enabled=not args.agent):
result = self_upgrade()
print_output(result, agent=args.agent)
return 0
if args.command == "catlog":
with with_spinner("Reading audit logs...", enabled=not args.agent):
entries = read_audit_log(limit=args.limit, offset=args.offset)
print_output(
{"entries": entries, "limit": args.limit, "offset": args.offset, "total": len(entries)},
agent=args.agent,
)
return 0
parser.error(f"Unsupported command {args.command}")

View File

@@ -126,6 +126,24 @@ def _fmt_number(value: Any) -> str:
return str(value)
def _fmt_local_ts(ts: str) -> str:
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return ts
def _event_color(event: str) -> str:
if "failed" in event or "error" in event:
return f"{_DIM}{_RED}"
if event.startswith("trade"):
return f"{_DIM}{_GREEN}"
if event.startswith("opportunity"):
return f"{_DIM}{_YELLOW}"
return _DIM
def _is_large_dataset(payload: Any, threshold: int = 8) -> bool:
if isinstance(payload, dict):
for value in payload.values():
@@ -195,39 +213,27 @@ def _render_tui(payload: Any) -> None:
print(str(payload))
return
if "overview" in payload:
overview = payload.get("overview", {})
print(f"\n{_BOLD}{_CYAN} ACCOUNT OVERVIEW {_RESET}")
print(f" Total Equity: {_GREEN}{_fmt_number(overview.get('total_equity_usdt', 0))} USDT{_RESET}")
print(f" Spot Assets: {_fmt_number(overview.get('spot_asset_count', 0))}")
print(f" Positions: {_fmt_number(overview.get('spot_position_count', 0))}")
if payload.get("balances"):
print()
_render_tui({"balances": payload["balances"]})
if payload.get("positions"):
print()
_render_tui({"positions": payload["positions"]})
return
if "balances" in payload:
rows = payload["balances"]
table_rows: list[list[str]] = []
for r in rows:
is_dust = r.get("is_dust", False)
dust_label = f"{_DIM}dust{_RESET}" if is_dust else ""
table_rows.append(
[
r.get("market_type", ""),
r.get("asset", ""),
_fmt_number(r.get("free", 0)),
_fmt_number(r.get("locked", 0)),
_fmt_number(r.get("total", 0)),
_fmt_number(r.get("notional_usdt", 0)),
dust_label,
]
)
_print_box_table(
"BALANCES",
["Market", "Asset", "Free", "Locked", "Total", "Notional (USDT)"],
["Asset", "Free", "Locked", "Total", "Notional (USDT)", ""],
table_rows,
aligns=["left", "left", "right", "right", "right", "right"],
aligns=["left", "right", "right", "right", "right", "left"],
)
return
@@ -281,7 +287,9 @@ def _render_tui(payload: Any) -> None:
if "klines" in payload:
rows = payload["klines"]
print(f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}")
print(
f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}"
)
display_rows = rows[:10]
table_rows = []
for r in display_rows:
@@ -325,8 +333,12 @@ def _render_tui(payload: Any) -> None:
for i, r in enumerate(rows, 1):
score = r.get("score", 0)
action = r.get("action", "")
action_color = _GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN
print(f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}")
action_color = (
_GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN
)
print(
f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}"
)
for reason in r.get("reasons", []):
print(f" · {reason}")
metrics = r.get("metrics", {})
@@ -340,9 +352,9 @@ def _render_tui(payload: Any) -> None:
stdout = payload.get("stdout", "")
stderr = payload.get("stderr", "")
if rc == 0:
print(f"\n{_GREEN}{_RESET} Update completed")
print(f"{_GREEN}{_RESET} Update completed")
else:
print(f"\n{_RED}{_RESET} Update failed (exit code {rc})")
print(f"{_RED}{_RESET} Update failed (exit code {rc})")
if stdout:
for line in stdout.strip().splitlines():
print(f" {line}")
@@ -352,6 +364,29 @@ def _render_tui(payload: Any) -> None:
print(f" {line}")
return
if "entries" in payload:
rows = payload["entries"]
print(f"\n{_BOLD}{_CYAN} AUDIT LOG {_RESET}")
if not rows:
print(" (no audit entries)")
return
for r in rows:
ts = _fmt_local_ts(r.get("timestamp", ""))
event = r.get("event", "")
detail_parts: list[str] = []
for key in ("symbol", "side", "qty", "quote_amount", "order_type", "status", "dry_run", "error"):
val = r.get(key)
if val is not None:
detail_parts.append(f"{key}={val}")
if not detail_parts:
for key, val in r.items():
if key not in ("timestamp", "event") and not isinstance(val, (dict, list)):
detail_parts.append(f"{key}={val}")
print(f"\n {_DIM}{ts}{_RESET} {_event_color(event)}{event}{_RESET}")
if detail_parts:
print(f" {' '.join(detail_parts)}")
return
if "created_or_updated" in payload:
print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}")
print(f" Root: {payload.get('root', '')}")
@@ -485,7 +520,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
rc_path = _zshrc_path()
fpath_line = "fpath+=(~/.zsh/completions)"
if not _rc_contains(rc_path, fpath_line):
rc_path.write_text(fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n", encoding="utf-8")
rc_path.write_text(
fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n",
encoding="utf-8",
)
hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'"
else:
hint = "Run 'compinit' or restart your terminal to activate completions"
@@ -497,7 +535,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
rc_path = _bashrc_path()
source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"'
if not _rc_contains(rc_path, source_line):
rc_path.write_text(source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n", encoding="utf-8")
rc_path.write_text(
source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n",
encoding="utf-8",
)
hint = "Added bash completion source line to ~/.bashrc; restart your terminal"
else:
hint = "Restart your terminal or source ~/.bashrc to activate completions"

View File

@@ -13,6 +13,7 @@ class AssetBalance:
locked: float
total: float
notional_usdt: float
is_dust: bool
@dataclass
@@ -59,6 +60,7 @@ def get_balances(
spot_client: Any,
) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
rows: list[dict[str, Any]] = []
balances, _, price_map = _spot_account_data(spot_client, quote)
for item in balances:
@@ -68,6 +70,7 @@ def get_balances(
if total <= 0:
continue
asset = item["asset"]
notional = total * price_map.get(asset, 0.0)
rows.append(
asdict(
AssetBalance(
@@ -75,7 +78,8 @@ def get_balances(
free=free,
locked=locked,
total=total,
notional_usdt=total * price_map.get(asset, 0.0),
notional_usdt=notional,
is_dust=notional < dust,
)
)
)
@@ -113,60 +117,3 @@ def get_positions(
)
)
return {"positions": rows}
def get_overview(
config: dict[str, Any],
*,
spot_client: Any,
) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
balances: list[dict[str, Any]] = []
positions: list[dict[str, Any]] = []
spot_balances, _, price_map = _spot_account_data(spot_client, quote)
for item in spot_balances:
free = float(item.get("free", 0.0))
locked = float(item.get("locked", 0.0))
total = free + locked
if total <= 0:
continue
asset = item["asset"]
balances.append(
asdict(
AssetBalance(
asset=asset,
free=free,
locked=locked,
total=total,
notional_usdt=total * price_map.get(asset, 0.0),
)
)
)
mark_price = price_map.get(asset, 1.0 if asset == quote else 0.0)
notional = total * mark_price
if notional >= dust:
positions.append(
asdict(
PositionView(
symbol=quote if asset == quote else f"{asset}{quote}",
quantity=total,
entry_price=None,
mark_price=mark_price,
notional_usdt=notional,
side="LONG",
)
)
)
spot_equity = sum(item["notional_usdt"] for item in balances)
overview = asdict(
AccountOverview(
total_equity_usdt=spot_equity,
spot_equity_usdt=spot_equity,
spot_asset_count=len(balances),
spot_position_count=len(positions),
)
)
return {"overview": overview, "balances": balances, "positions": positions}

View File

@@ -57,7 +57,9 @@ def get_tickers(config: dict[str, Any], symbols: list[str], *, spot_client: Any)
TickerView(
symbol=normalize_symbol(ticker["symbol"]),
last_price=float(ticker.get("lastPrice") or ticker.get("last_price") or 0.0),
price_change_pct=float(ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0),
price_change_pct=float(
ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0
),
quote_volume=float(ticker.get("quoteVolume") or ticker.get("quote_volume") or 0.0),
)
)

View File

@@ -26,7 +26,9 @@ def _safe_pct(new: float, old: float) -> float:
return (new - old) / old
def _score_candidate(closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float) -> tuple[float, dict[str, float]]:
def _score_candidate(
closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float
) -> tuple[float, dict[str, float]]:
if len(closes) < 2 or not volumes:
return 0.0, {
"trend": 0.0,
@@ -158,10 +160,7 @@ def scan_opportunities(
top_n = int(opportunity_config.get("top_n", 10))
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
held_positions = get_positions(config, spot_client=spot_client)["positions"]
concentration_map = {
normalize_symbol(item["symbol"]): float(item["notional_usdt"])
for item in held_positions
}
concentration_map = {normalize_symbol(item["symbol"]): float(item["notional_usdt"]) for item in held_positions}
total_held = sum(concentration_map.values()) or 1.0
universe = get_scan_universe(config, spot_client=spot_client, symbols=symbols)[:scan_limit]

View File

@@ -40,7 +40,9 @@ def _default_dry_run(config: dict[str, Any], dry_run: bool | None) -> bool:
return bool(config.get("trading", {}).get("dry_run_default", False))
def _trade_log_payload(intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None) -> dict[str, Any]:
def _trade_log_payload(
intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None
) -> dict[str, Any]:
return {
"market_type": intent.market_type,
"symbol": intent.symbol,
@@ -125,7 +127,9 @@ def execute_spot_trade(
response_payload=response,
)
)
audit_event("trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response})
audit_event(
"trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response}
)
return {"trade": result}
try:
@@ -146,5 +150,7 @@ def execute_spot_trade(
response_payload=response,
)
)
audit_event("trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response})
audit_event(
"trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response}
)
return {"trade": result}

View File

@@ -28,9 +28,30 @@ class FakeSpotClient:
def ticker_24h(self, symbols=None):
rows = [
{"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "4.5", "quoteVolume": "10000000", "highPrice": "61000", "lowPrice": "58000"},
{"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3.0", "quoteVolume": "8000000", "highPrice": "3050", "lowPrice": "2900"},
{"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1.0", "quoteVolume": "200", "highPrice": "0.11", "lowPrice": "0.09"},
{
"symbol": "BTCUSDT",
"lastPrice": "60000",
"priceChangePercent": "4.5",
"quoteVolume": "10000000",
"highPrice": "61000",
"lowPrice": "58000",
},
{
"symbol": "ETHUSDT",
"lastPrice": "3000",
"priceChangePercent": "3.0",
"quoteVolume": "8000000",
"highPrice": "3050",
"lowPrice": "2900",
},
{
"symbol": "DOGEUSDT",
"lastPrice": "0.1",
"priceChangePercent": "1.0",
"quoteVolume": "200",
"highPrice": "0.11",
"lowPrice": "0.09",
},
]
if not symbols:
return rows
@@ -38,23 +59,29 @@ class FakeSpotClient:
return [row for row in rows if row["symbol"] in wanted]
def exchange_info(self):
return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "BREAK"}]}
return {
"symbols": [
{"symbol": "BTCUSDT", "status": "TRADING"},
{"symbol": "ETHUSDT", "status": "TRADING"},
{"symbol": "DOGEUSDT", "status": "BREAK"},
]
}
class AccountMarketServicesTestCase(unittest.TestCase):
def test_account_overview_and_dust_filter(self):
def test_get_balances_with_dust_flag(self):
config = {
"market": {"default_quote": "USDT"},
"trading": {"dust_usdt_threshold": 10.0},
}
payload = account_service.get_overview(
payload = account_service.get_balances(
config,
spot_client=FakeSpotClient(),
)
self.assertEqual(payload["overview"]["total_equity_usdt"], 720.1)
symbols = {item["symbol"] for item in payload["positions"]}
self.assertNotIn("DOGEUSDT", symbols)
self.assertIn("BTCUSDT", symbols)
balances = {item["asset"]: item for item in payload["balances"]}
self.assertFalse(balances["USDT"]["is_dust"])
self.assertFalse(balances["BTC"]["is_dust"])
self.assertTrue(balances["DOGE"]["is_dust"])
def test_market_tickers_and_scan_universe(self):
config = {

View File

@@ -22,10 +22,16 @@ class CLITestCase(unittest.TestCase):
def test_init_dispatches(self):
captured = {}
with patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}), patch.object(
cli, "install_shell_completion", return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"}
), patch.object(
with (
patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}),
patch.object(
cli,
"install_shell_completion",
return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["init", "--force"])
self.assertEqual(result, 0)
@@ -85,11 +91,50 @@ class CLITestCase(unittest.TestCase):
self.assertIn("lastPrice", output)
self.assertIn("BTCUSDT", output)
def test_account_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "trading": {"dust_usdt_threshold": 10.0}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.account_service, "get_balances", return_value={"balances": [{"asset": "BTC", "is_dust": False}]}
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["account"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["balances"][0]["asset"], "BTC")
def test_upgrade_dispatches(self):
captured = {}
with patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}), patch.object(
with (
patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["upgrade"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["returncode"], 0)
def test_catlog_dispatches(self):
captured = {}
with (
patch.object(
cli, "read_audit_log", return_value=[{"timestamp": "2026-04-17T12:00:00Z", "event": "test_event"}]
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["catlog", "-n", "5", "-o", "10"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["limit"], 5)
self.assertEqual(captured["payload"]["offset"], 10)
self.assertIn("entries", captured["payload"])
self.assertEqual(captured["payload"]["total"], 1)

View File

@@ -8,13 +8,21 @@ import unittest
from pathlib import Path
from unittest.mock import patch
from coinhunter.config import ensure_init_files, get_binance_credentials, load_config, load_env_file
from coinhunter.config import (
ensure_init_files,
get_binance_credentials,
load_config,
load_env_file,
)
from coinhunter.runtime import get_runtime_paths
class ConfigRuntimeTestCase(unittest.TestCase):
def test_init_files_created_in_coinhunter_home(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False):
with (
tempfile.TemporaryDirectory() as tmp_dir,
patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False),
):
paths = get_runtime_paths()
payload = ensure_init_files(paths)
self.assertTrue(paths.config_file.exists())
@@ -23,10 +31,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
self.assertEqual(payload["root"], str(paths.root))
def test_load_config_and_env(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(
with (
tempfile.TemporaryDirectory() as tmp_dir,
patch.dict(
os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
clear=False,
),
):
paths = get_runtime_paths()
ensure_init_files(paths)
@@ -40,10 +51,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
self.assertEqual(os.environ["BINANCE_API_SECRET"], "def")
def test_env_file_overrides_existing_environment(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(
with (
tempfile.TemporaryDirectory() as tmp_dir,
patch.dict(
os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home"), "BINANCE_API_KEY": "old_key"},
clear=False,
),
):
paths = get_runtime_paths()
ensure_init_files(paths)
@@ -55,10 +69,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
self.assertEqual(os.environ["BINANCE_API_SECRET"], "new_secret")
def test_missing_credentials_raise(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(
with (
tempfile.TemporaryDirectory() as tmp_dir,
patch.dict(
os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
clear=False,
),
):
os.environ.pop("BINANCE_API_KEY", None)
os.environ.pop("BINANCE_API_SECRET", None)
@@ -68,12 +85,17 @@ class ConfigRuntimeTestCase(unittest.TestCase):
get_binance_credentials(paths)
def test_permission_error_is_explained(self):
with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(
with (
tempfile.TemporaryDirectory() as tmp_dir,
patch.dict(
os.environ,
{"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
clear=False,
),
):
paths = get_runtime_paths()
with patch("coinhunter.config.ensure_runtime_dirs", side_effect=PermissionError("no write access")):
with self.assertRaisesRegex(RuntimeError, "Set COINHUNTER_HOME to a writable directory"):
with (
patch("coinhunter.config.ensure_runtime_dirs", side_effect=PermissionError("no write access")),
self.assertRaisesRegex(RuntimeError, "Set COINHUNTER_HOME to a writable directory"),
):
ensure_init_files(paths)

View File

@@ -29,17 +29,52 @@ class FakeSpotClient:
def ticker_24h(self, symbols=None):
rows = {
"BTCUSDT": {"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "5", "quoteVolume": "9000000", "highPrice": "60200", "lowPrice": "55000"},
"ETHUSDT": {"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3", "quoteVolume": "8000000", "highPrice": "3100", "lowPrice": "2800"},
"SOLUSDT": {"symbol": "SOLUSDT", "lastPrice": "150", "priceChangePercent": "8", "quoteVolume": "10000000", "highPrice": "152", "lowPrice": "130"},
"DOGEUSDT": {"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1", "quoteVolume": "100", "highPrice": "0.11", "lowPrice": "0.09"},
"BTCUSDT": {
"symbol": "BTCUSDT",
"lastPrice": "60000",
"priceChangePercent": "5",
"quoteVolume": "9000000",
"highPrice": "60200",
"lowPrice": "55000",
},
"ETHUSDT": {
"symbol": "ETHUSDT",
"lastPrice": "3000",
"priceChangePercent": "3",
"quoteVolume": "8000000",
"highPrice": "3100",
"lowPrice": "2800",
},
"SOLUSDT": {
"symbol": "SOLUSDT",
"lastPrice": "150",
"priceChangePercent": "8",
"quoteVolume": "10000000",
"highPrice": "152",
"lowPrice": "130",
},
"DOGEUSDT": {
"symbol": "DOGEUSDT",
"lastPrice": "0.1",
"priceChangePercent": "1",
"quoteVolume": "100",
"highPrice": "0.11",
"lowPrice": "0.09",
},
}
if not symbols:
return list(rows.values())
return [rows[symbol] for symbol in symbols]
def exchange_info(self):
return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "SOLUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "TRADING"}]}
return {
"symbols": [
{"symbol": "BTCUSDT", "status": "TRADING"},
{"symbol": "ETHUSDT", "status": "TRADING"},
{"symbol": "SOLUSDT", "status": "TRADING"},
{"symbol": "DOGEUSDT", "status": "TRADING"},
]
}
def klines(self, symbol, interval, limit):
curves = {
@@ -50,7 +85,18 @@ class FakeSpotClient:
}[symbol]
rows = []
for index, close in enumerate(curves[-limit:]):
rows.append([index, close * 0.98, close * 1.01, close * 0.97, close, 100 + index * 10, index + 1, close * (100 + index * 10)])
rows.append(
[
index,
close * 0.98,
close * 1.01,
close * 0.97,
close,
100 + index * 10,
index + 1,
close * (100 + index * 10),
]
)
return rows
@@ -85,7 +131,9 @@ class OpportunityServiceTestCase(unittest.TestCase):
def test_scan_is_deterministic(self):
with patch.object(opportunity_service, "audit_event", return_value=None):
payload = opportunity_service.scan_opportunities(self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient())
payload = opportunity_service.scan_opportunities(
self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()
)
self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"])
def test_score_candidate_handles_empty_klines(self):

View File

@@ -20,7 +20,9 @@ class FakeSpotClient:
class TradeServiceTestCase(unittest.TestCase):
def test_spot_market_buy_dry_run_does_not_call_client(self):
events = []
with patch.object(trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload))):
with patch.object(
trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload))
):
client = FakeSpotClient()
payload = trade_service.execute_spot_trade(
{"trading": {"dry_run_default": False}},
@@ -55,8 +57,10 @@ class TradeServiceTestCase(unittest.TestCase):
self.assertEqual(client.calls[0]["timeInForce"], "GTC")
def test_spot_market_buy_requires_quote(self):
with patch.object(trade_service, "audit_event", return_value=None):
with self.assertRaisesRegex(RuntimeError, "requires --quote"):
with (
patch.object(trade_service, "audit_event", return_value=None),
self.assertRaisesRegex(RuntimeError, "requires --quote"),
):
trade_service.execute_spot_trade(
{"trading": {"dry_run_default": False}},
side="buy",
@@ -70,8 +74,10 @@ class TradeServiceTestCase(unittest.TestCase):
)
def test_spot_market_buy_rejects_qty(self):
with patch.object(trade_service, "audit_event", return_value=None):
with self.assertRaisesRegex(RuntimeError, "accepts --quote only"):
with (
patch.object(trade_service, "audit_event", return_value=None),
self.assertRaisesRegex(RuntimeError, "accepts --quote only"),
):
trade_service.execute_spot_trade(
{"trading": {"dry_run_default": False}},
side="buy",
@@ -85,8 +91,10 @@ class TradeServiceTestCase(unittest.TestCase):
)
def test_spot_market_sell_rejects_quote(self):
with patch.object(trade_service, "audit_event", return_value=None):
with self.assertRaisesRegex(RuntimeError, "accepts --qty only"):
with (
patch.object(trade_service, "audit_event", return_value=None),
self.assertRaisesRegex(RuntimeError, "accepts --qty only"),
):
trade_service.execute_spot_trade(
{"trading": {"dry_run_default": False}},
side="sell",