491 lines
18 KiB
Python
491 lines
18 KiB
Python
"""CoinHunter V2 CLI."""
|
||
|
||
from __future__ import annotations
|
||
|
||
import argparse
|
||
import sys
|
||
from typing import Any
|
||
|
||
from . import __version__
|
||
from .audit import read_audit_log
|
||
from .binance.spot_client import SpotBinanceClient
|
||
from .config import ensure_init_files, get_binance_credentials, load_config
|
||
from .runtime import get_runtime_paths, install_shell_completion, print_output, self_upgrade, with_spinner
|
||
from .services import account_service, market_service, opportunity_service, trade_service
|
||
|
||
EPILOG = """\
|
||
examples:
|
||
coin init
|
||
coin acc ov
|
||
coin m tk BTCUSDT ETHUSDT
|
||
coin m k BTCUSDT -i 1h -l 50
|
||
coin buy BTCUSDT -Q 100 -d
|
||
coin sell BTCUSDT --qty 0.01 --type limit --price 90000
|
||
coin opp scan -s BTCUSDT ETHUSDT
|
||
coin upgrade
|
||
"""
|
||
|
||
COMMAND_DOCS: dict[str, str] = {
|
||
"init": """\
|
||
Output: JSON
|
||
{
|
||
"root": "~/.coinhunter",
|
||
"files_created": ["config.toml", ".env"],
|
||
"completion": {"shell": "zsh", "installed": true}
|
||
}
|
||
Fields:
|
||
root – runtime directory path
|
||
files_created – list of generated files
|
||
completion – shell completion installation status
|
||
""",
|
||
"account/overview": """\
|
||
Output: JSON
|
||
{
|
||
"total_btc": 1.234,
|
||
"total_usdt": 50000.0,
|
||
"assets": [{"asset": "BTC", "free": 0.5, "locked": 0.1}]
|
||
}
|
||
Fields:
|
||
total_btc – total equity denominated in BTC
|
||
total_usdt – total equity denominated in USDT
|
||
assets – list of non-zero balances
|
||
""",
|
||
"account/balances": """\
|
||
Output: JSON array
|
||
[
|
||
{"asset": "BTC", "free": 0.5, "locked": 0.1, "total": 0.6}
|
||
]
|
||
Fields:
|
||
asset – asset symbol
|
||
free – available balance
|
||
locked – frozen/locked balance
|
||
total – free + locked
|
||
""",
|
||
"account/positions": """\
|
||
Output: JSON array
|
||
[
|
||
{"symbol": "BTCUSDT", "positionAmt": 0.01, "entryPrice": 90000.0}
|
||
]
|
||
Fields:
|
||
symbol – trading pair
|
||
positionAmt – quantity held (positive long, negative short)
|
||
entryPrice – average entry price
|
||
""",
|
||
"market/tickers": """\
|
||
Output: JSON object keyed by normalized symbol
|
||
{
|
||
"BTCUSDT": {"lastPrice": "70000.00", "priceChangePercent": "2.5", "volume": "12345.67"}
|
||
}
|
||
Fields:
|
||
lastPrice – latest traded price
|
||
priceChangePercent – 24h change %
|
||
volume – 24h base volume
|
||
""",
|
||
"market/klines": """\
|
||
Output: JSON object keyed by symbol, value is array of OHLCV candles
|
||
{
|
||
"BTCUSDT": [
|
||
{"open_time": 1713000000000, "open": 69000.0, "high": 69500.0, "low": 68800.0, "close": 69200.0, "volume": 100.5}
|
||
]
|
||
}
|
||
Fields per candle:
|
||
open_time – candle open timestamp (ms)
|
||
open/high/low/close – OHLC prices
|
||
volume – traded base volume
|
||
""",
|
||
"buy": """\
|
||
Output: JSON
|
||
{
|
||
"trade": {
|
||
"market_type": "spot",
|
||
"symbol": "BTCUSDT",
|
||
"side": "BUY",
|
||
"order_type": "MARKET",
|
||
"status": "DRY_RUN",
|
||
"dry_run": true,
|
||
"request_payload": {...},
|
||
"response_payload": {...}
|
||
}
|
||
}
|
||
Fields:
|
||
market_type – "spot"
|
||
side – "BUY"
|
||
order_type – MARKET or LIMIT
|
||
status – order status from exchange (or DRY_RUN)
|
||
dry_run – whether simulated
|
||
request_payload – normalized order sent to Binance
|
||
response_payload – raw exchange response
|
||
""",
|
||
"sell": """\
|
||
Output: JSON
|
||
{
|
||
"trade": {
|
||
"market_type": "spot",
|
||
"symbol": "BTCUSDT",
|
||
"side": "SELL",
|
||
"order_type": "LIMIT",
|
||
"status": "FILLED",
|
||
"dry_run": false,
|
||
"request_payload": {...},
|
||
"response_payload": {...}
|
||
}
|
||
}
|
||
Fields:
|
||
market_type – "spot"
|
||
side – "SELL"
|
||
order_type – MARKET or LIMIT
|
||
status – order status from exchange (or DRY_RUN)
|
||
dry_run – whether simulated
|
||
request_payload – normalized order sent to Binance
|
||
response_payload – raw exchange response
|
||
""",
|
||
"opportunity/portfolio": """\
|
||
Output: JSON
|
||
{
|
||
"scores": [
|
||
{"asset": "BTC", "score": 0.75, "metrics": {"volatility": 0.02, "trend": 0.01}}
|
||
]
|
||
}
|
||
Fields:
|
||
asset – scored asset
|
||
score – composite opportunity score (0-1)
|
||
metrics – breakdown of contributing signals
|
||
""",
|
||
"opportunity/scan": """\
|
||
Output: JSON
|
||
{
|
||
"opportunities": [
|
||
{"symbol": "ETHUSDT", "score": 0.82, "signals": ["momentum", "volume_spike"]}
|
||
]
|
||
}
|
||
Fields:
|
||
symbol – trading pair scanned
|
||
score – opportunity score (0-1)
|
||
signals – list of triggered signal names
|
||
""",
|
||
"upgrade": """\
|
||
Output: JSON
|
||
{
|
||
"command": "pip install --upgrade coinhunter",
|
||
"returncode": 0,
|
||
"stdout": "...",
|
||
"stderr": ""
|
||
}
|
||
Fields:
|
||
command – shell command executed
|
||
returncode – process exit code (0 = success)
|
||
stdout – command standard output
|
||
stderr – command standard error
|
||
""",
|
||
"completion": """\
|
||
Output: shell script text (not JSON)
|
||
# bash/zsh completion script for coinhunter
|
||
...
|
||
Fields:
|
||
(raw shell script suitable for sourcing)
|
||
""",
|
||
}
|
||
|
||
|
||
|
||
def _load_spot_client(config: dict[str, Any], *, client: Any | None = None) -> SpotBinanceClient:
|
||
credentials = get_binance_credentials()
|
||
binance_config = config["binance"]
|
||
return SpotBinanceClient(
|
||
api_key=credentials["api_key"],
|
||
api_secret=credentials["api_secret"],
|
||
base_url=binance_config["spot_base_url"],
|
||
recv_window=int(binance_config["recv_window"]),
|
||
client=client,
|
||
)
|
||
|
||
|
||
def build_parser() -> argparse.ArgumentParser:
|
||
parser = argparse.ArgumentParser(
|
||
prog="coinhunter",
|
||
description="CoinHunter V2 Binance-first trading CLI",
|
||
epilog=EPILOG,
|
||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||
)
|
||
parser.add_argument("-v", "--version", action="version", version=__version__)
|
||
parser.add_argument("-a", "--agent", action="store_true", help="Output in agent-friendly format (JSON or compact)")
|
||
parser.add_argument("--doc", action="store_true", help="Show output schema and field descriptions for the command")
|
||
subparsers = parser.add_subparsers(dest="command")
|
||
|
||
init_parser = subparsers.add_parser("init", help="Generate config.toml, .env, and log directory")
|
||
init_parser.add_argument("-f", "--force", action="store_true", help="Overwrite existing files")
|
||
|
||
account_parser = subparsers.add_parser("account", aliases=["acc", "a"], help="Account overview, balances, and positions")
|
||
account_subparsers = account_parser.add_subparsers(dest="account_command")
|
||
account_commands_help = {
|
||
"overview": "Total equity and summary",
|
||
"balances": "List asset balances",
|
||
"positions": "List open positions",
|
||
}
|
||
account_aliases = {
|
||
"overview": ["ov"],
|
||
"balances": ["bal", "b"],
|
||
"positions": ["pos", "p"],
|
||
}
|
||
for name in ("overview", "balances", "positions"):
|
||
account_subparsers.add_parser(name, aliases=account_aliases[name], help=account_commands_help[name])
|
||
|
||
market_parser = subparsers.add_parser("market", aliases=["m"], help="Batch market queries")
|
||
market_subparsers = market_parser.add_subparsers(dest="market_command")
|
||
tickers_parser = market_subparsers.add_parser("tickers", aliases=["tk", "t"], help="Fetch 24h ticker data")
|
||
tickers_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to query (e.g. BTCUSDT ETH/USDT)")
|
||
klines_parser = market_subparsers.add_parser("klines", aliases=["k"], help="Fetch OHLCV klines")
|
||
klines_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to query")
|
||
klines_parser.add_argument("-i", "--interval", default="1h", help="Kline interval (default: 1h)")
|
||
klines_parser.add_argument("-l", "--limit", type=int, default=100, help="Number of candles (default: 100)")
|
||
|
||
buy_parser = subparsers.add_parser("buy", aliases=["b"], help="Buy base asset")
|
||
buy_parser.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)")
|
||
buy_parser.add_argument("-q", "--qty", type=float, help="Base asset quantity (limit orders)")
|
||
buy_parser.add_argument("-Q", "--quote", type=float, help="Quote asset amount (market buy only)")
|
||
buy_parser.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)")
|
||
buy_parser.add_argument("-p", "--price", type=float, help="Limit price")
|
||
buy_parser.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending")
|
||
|
||
sell_parser = subparsers.add_parser("sell", aliases=["s"], help="Sell base asset")
|
||
sell_parser.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)")
|
||
sell_parser.add_argument("-q", "--qty", type=float, help="Base asset quantity")
|
||
sell_parser.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)")
|
||
sell_parser.add_argument("-p", "--price", type=float, help="Limit price")
|
||
sell_parser.add_argument("-d", "--dry-run", action="store_true", help="Simulate without sending")
|
||
|
||
opportunity_parser = subparsers.add_parser("opportunity", aliases=["opp", "o"], help="Portfolio analysis and market scanning")
|
||
opportunity_subparsers = opportunity_parser.add_subparsers(dest="opportunity_command")
|
||
opportunity_subparsers.add_parser("portfolio", aliases=["pf", "p"], help="Score current holdings")
|
||
scan_parser = opportunity_subparsers.add_parser("scan", help="Scan market for opportunities")
|
||
scan_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols")
|
||
|
||
subparsers.add_parser("upgrade", help="Upgrade coinhunter to the latest version")
|
||
|
||
catlog_parser = subparsers.add_parser("catlog", help="Read recent audit log entries")
|
||
catlog_parser.add_argument("-n", "--limit", type=int, default=10, help="Number of entries (default: 10)")
|
||
catlog_parser.add_argument(
|
||
"-o", "--offset", type=int, default=0, help="Skip the most recent N entries (default: 0)"
|
||
)
|
||
|
||
completion_parser = subparsers.add_parser("completion", help="Generate shell completion script")
|
||
completion_parser.add_argument("shell", choices=["bash", "zsh"], help="Target shell")
|
||
|
||
return parser
|
||
|
||
|
||
_CANONICAL_COMMANDS = {
|
||
"b": "buy",
|
||
"s": "sell",
|
||
"acc": "account",
|
||
"a": "account",
|
||
"m": "market",
|
||
"opp": "opportunity",
|
||
"o": "opportunity",
|
||
}
|
||
|
||
_CANONICAL_SUBCOMMANDS = {
|
||
"ov": "overview",
|
||
"bal": "balances",
|
||
"b": "balances",
|
||
"pos": "positions",
|
||
"p": "positions",
|
||
"tk": "tickers",
|
||
"t": "tickers",
|
||
"k": "klines",
|
||
"pf": "portfolio",
|
||
"p": "portfolio",
|
||
}
|
||
|
||
_COMMANDS_WITH_SUBCOMMANDS = {"account", "market", "opportunity"}
|
||
|
||
|
||
def _get_doc_key(argv: list[str]) -> str | None:
|
||
"""Infer command/subcommand from argv for --doc lookup."""
|
||
tokens = [a for a in argv if a != "--doc" and not a.startswith("-")]
|
||
if not tokens:
|
||
return None
|
||
cmd = _CANONICAL_COMMANDS.get(tokens[0], tokens[0])
|
||
if cmd in _COMMANDS_WITH_SUBCOMMANDS and len(tokens) > 1:
|
||
sub = _CANONICAL_SUBCOMMANDS.get(tokens[1], tokens[1])
|
||
return f"{cmd}/{sub}"
|
||
return cmd
|
||
|
||
|
||
def _reorder_flag(argv: list[str], flag: str, short_flag: str | None = None) -> list[str]:
|
||
"""Move a global flag from after subcommands to before them so argparse can parse it."""
|
||
flags = {flag}
|
||
if short_flag:
|
||
flags.add(short_flag)
|
||
subcommand_idx: int | None = None
|
||
for i, arg in enumerate(argv):
|
||
if not arg.startswith("-"):
|
||
subcommand_idx = i
|
||
break
|
||
if subcommand_idx is None:
|
||
return argv
|
||
new_argv: list[str] = []
|
||
present = False
|
||
for i, arg in enumerate(argv):
|
||
if i >= subcommand_idx and arg in flags:
|
||
present = True
|
||
continue
|
||
new_argv.append(arg)
|
||
if present:
|
||
new_argv.insert(subcommand_idx, flag)
|
||
return new_argv
|
||
|
||
|
||
def main(argv: list[str] | None = None) -> int:
|
||
raw_argv = argv if argv is not None else sys.argv[1:]
|
||
|
||
if "--doc" in raw_argv:
|
||
doc_key = _get_doc_key(raw_argv)
|
||
if doc_key is None:
|
||
print("Available docs: " + ", ".join(sorted(COMMAND_DOCS.keys())))
|
||
return 0
|
||
doc = COMMAND_DOCS.get(doc_key, f"No documentation available for {doc_key}.")
|
||
print(doc)
|
||
return 0
|
||
|
||
parser = build_parser()
|
||
raw_argv = _reorder_flag(raw_argv, "--agent", "-a")
|
||
args = parser.parse_args(raw_argv)
|
||
|
||
# Normalize aliases to canonical command names
|
||
if args.command:
|
||
args.command = _CANONICAL_COMMANDS.get(args.command, args.command)
|
||
for attr in ("account_command", "market_command", "opportunity_command"):
|
||
val = getattr(args, attr, None)
|
||
if val:
|
||
setattr(args, attr, _CANONICAL_SUBCOMMANDS.get(val, val))
|
||
|
||
try:
|
||
if not args.command:
|
||
parser.print_help()
|
||
return 0
|
||
|
||
if args.command == "init":
|
||
init_result = ensure_init_files(get_runtime_paths(), force=args.force)
|
||
init_result["completion"] = install_shell_completion(parser)
|
||
print_output(init_result, agent=args.agent)
|
||
return 0
|
||
|
||
if args.command == "completion":
|
||
import shtab
|
||
|
||
print(shtab.complete(parser, shell=args.shell, preamble=""))
|
||
return 0
|
||
|
||
config = load_config()
|
||
|
||
if args.command == "account":
|
||
spot_client = _load_spot_client(config)
|
||
if args.account_command == "overview":
|
||
with with_spinner("Fetching account overview...", enabled=not args.agent):
|
||
result = account_service.get_overview(config, spot_client=spot_client)
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
if args.account_command == "balances":
|
||
with with_spinner("Fetching balances...", enabled=not args.agent):
|
||
result = account_service.get_balances(config, spot_client=spot_client)
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
if args.account_command == "positions":
|
||
with with_spinner("Fetching positions...", enabled=not args.agent):
|
||
result = account_service.get_positions(config, spot_client=spot_client)
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
parser.error("account requires one of: overview, balances, positions")
|
||
|
||
if args.command == "market":
|
||
spot_client = _load_spot_client(config)
|
||
if args.market_command == "tickers":
|
||
with with_spinner("Fetching tickers...", enabled=not args.agent):
|
||
result = market_service.get_tickers(config, args.symbols, spot_client=spot_client)
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
if args.market_command == "klines":
|
||
with with_spinner("Fetching klines...", enabled=not args.agent):
|
||
result = market_service.get_klines(
|
||
config,
|
||
args.symbols,
|
||
interval=args.interval,
|
||
limit=args.limit,
|
||
spot_client=spot_client,
|
||
)
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
parser.error("market requires one of: tickers, klines")
|
||
|
||
if args.command == "buy":
|
||
spot_client = _load_spot_client(config)
|
||
with with_spinner("Placing order...", enabled=not args.agent):
|
||
result = trade_service.execute_spot_trade(
|
||
config,
|
||
side="buy",
|
||
symbol=args.symbol,
|
||
qty=args.qty,
|
||
quote=args.quote,
|
||
order_type=args.type,
|
||
price=args.price,
|
||
dry_run=True if args.dry_run else None,
|
||
spot_client=spot_client,
|
||
)
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
|
||
if args.command == "sell":
|
||
spot_client = _load_spot_client(config)
|
||
with with_spinner("Placing order...", enabled=not args.agent):
|
||
result = trade_service.execute_spot_trade(
|
||
config,
|
||
side="sell",
|
||
symbol=args.symbol,
|
||
qty=args.qty,
|
||
quote=None,
|
||
order_type=args.type,
|
||
price=args.price,
|
||
dry_run=True if args.dry_run else None,
|
||
spot_client=spot_client,
|
||
)
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
|
||
if args.command == "opportunity":
|
||
spot_client = _load_spot_client(config)
|
||
if args.opportunity_command == "portfolio":
|
||
with with_spinner("Analyzing portfolio...", enabled=not args.agent):
|
||
result = opportunity_service.analyze_portfolio(config, spot_client=spot_client)
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
if args.opportunity_command == "scan":
|
||
with with_spinner("Scanning opportunities...", enabled=not args.agent):
|
||
result = opportunity_service.scan_opportunities(
|
||
config, spot_client=spot_client, symbols=args.symbols
|
||
)
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
parser.error("opportunity requires `portfolio` or `scan`")
|
||
|
||
if args.command == "upgrade":
|
||
with with_spinner("Upgrading coinhunter...", enabled=not args.agent):
|
||
result = self_upgrade()
|
||
print_output(result, agent=args.agent)
|
||
return 0
|
||
|
||
if args.command == "catlog":
|
||
with with_spinner("Reading audit logs...", enabled=not args.agent):
|
||
entries = read_audit_log(limit=args.limit, offset=args.offset)
|
||
print_output(
|
||
{"entries": entries, "limit": args.limit, "offset": args.offset, "total": len(entries)},
|
||
agent=args.agent,
|
||
)
|
||
return 0
|
||
|
||
parser.error(f"Unsupported command {args.command}")
|
||
return 2
|
||
except Exception as exc:
|
||
print(f"error: {exc}", file=sys.stderr)
|
||
return 1
|