feat: add Braille spinner, shell completions, and TUI polish

- Add with_spinner context manager with cyan Braille animation for human mode.
- Wrap all query/execution commands in cli.py with loading spinners.
- Integrate shtab: auto-install shell completions during init for zsh/bash.
- Add `completion` subcommand for manual script generation.
- Fix stale output_format default in DEFAULT_CONFIG (json → tui).
- Add help descriptions to all second-level subcommands.
- Version 2.0.4.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-16 19:11:40 +08:00
parent b857ea33f3
commit 536425e8ea
6 changed files with 257 additions and 79 deletions

View File

@@ -53,6 +53,8 @@ This creates:
- `~/.coinhunter/.env`
- `~/.coinhunter/logs/`
If you are using **zsh** or **bash**, `init` will also generate and install shell completion scripts automatically, and update your rc file (`~/.zshrc` or `~/.bashrc`) if needed.
`config.toml` stores runtime and strategy settings. `.env` stores:
```bash
@@ -93,6 +95,10 @@ coinhunter opportunity scan --symbols BTCUSDT ETHUSDT SOLUSDT
# Self-upgrade
coinhunter upgrade
# Shell completion (manual)
coinhunter completion zsh > ~/.zsh/completions/_coinhunter
coinhunter completion bash > ~/.local/share/bash-completion/completions/coinhunter
```
`upgrade` will try `pipx upgrade coinhunter` first, and fall back to `pip install --upgrade coinhunter` if pipx is not available.

View File

@@ -12,6 +12,7 @@ requires-python = ">=3.10"
dependencies = [
"binance-connector>=3.9.0",
"binance-futures-connector>=4.1.0",
"shtab>=1.7.0",
"tomli>=2.0.1; python_version < '3.11'",
]
authors = [

View File

@@ -10,7 +10,7 @@ from . import __version__
from .binance.spot_client import SpotBinanceClient
from .binance.um_futures_client import UMFuturesClient
from .config import ensure_init_files, get_binance_credentials, load_config
from .runtime import get_runtime_paths, print_output, self_upgrade
from .runtime import get_runtime_paths, install_shell_completion, print_output, self_upgrade, with_spinner
from .services import account_service, market_service, opportunity_service, trade_service
EPILOG = """\
@@ -72,8 +72,13 @@ def build_parser() -> argparse.ArgumentParser:
account_parser = subparsers.add_parser("account", help="Account overview, balances, and positions")
account_subparsers = account_parser.add_subparsers(dest="account_command")
account_commands_help = {
"overview": "Total equity and summary across markets",
"balances": "List asset balances",
"positions": "List open positions",
}
for name in ("overview", "balances", "positions"):
sub = account_subparsers.add_parser(name)
sub = account_subparsers.add_parser(name, help=account_commands_help[name])
sub.add_argument("-s", "--spot", action="store_true", help="Include spot market")
sub.add_argument("-f", "--futures", action="store_true", help="Include futures market")
@@ -91,8 +96,9 @@ def build_parser() -> argparse.ArgumentParser:
spot_parser = trade_subparsers.add_parser("spot", help="Spot market orders")
spot_subparsers = spot_parser.add_subparsers(dest="trade_action")
spot_side_help = {"buy": "Buy base asset with quote quantity", "sell": "Sell base asset quantity"}
for side in ("buy", "sell"):
sub = spot_subparsers.add_parser(side)
sub = spot_subparsers.add_parser(side, help=spot_side_help[side])
sub.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)")
sub.add_argument("-q", "--qty", type=float, help="Base asset quantity")
sub.add_argument("-Q", "--quote", type=float, help="Quote asset amount (buy market only)")
@@ -102,8 +108,9 @@ def build_parser() -> argparse.ArgumentParser:
futures_parser = trade_subparsers.add_parser("futures", help="USDT-M futures orders")
futures_subparsers = futures_parser.add_subparsers(dest="trade_action")
futures_side_help = {"buy": "Open or add to a LONG position", "sell": "Open or add to a SHORT position"}
for side in ("buy", "sell"):
sub = futures_subparsers.add_parser(side)
sub = futures_subparsers.add_parser(side, help=futures_side_help[side])
sub.add_argument("symbol", metavar="SYM", help="Trading pair (e.g. BTCUSDT)")
sub.add_argument("-q", "--qty", type=float, required=True, help="Contract quantity")
sub.add_argument("-t", "--type", choices=["market", "limit"], default="market", help="Order type (default: market)")
@@ -122,6 +129,9 @@ def build_parser() -> argparse.ArgumentParser:
subparsers.add_parser("upgrade", help="Upgrade coinhunter to the latest version")
completion_parser = subparsers.add_parser("completion", help="Generate shell completion script")
completion_parser.add_argument("shell", choices=["bash", "zsh"], help="Target shell")
return parser
@@ -134,7 +144,15 @@ def main(argv: list[str] | None = None) -> int:
return 0
if args.command == "init":
print_output(ensure_init_files(get_runtime_paths(), force=args.force), agent=args.agent)
init_result = ensure_init_files(get_runtime_paths(), force=args.force)
init_result["completion"] = install_shell_completion(parser)
print_output(init_result, agent=args.agent)
return 0
if args.command == "completion":
import shtab
print(shtab.complete(parser, shell=args.shell, preamble=""))
return 0
config = load_config()
@@ -144,117 +162,127 @@ def main(argv: list[str] | None = None) -> int:
spot_client = _load_spot_client(config) if include_spot else None
futures_client = _load_futures_client(config) if include_futures else None
if args.account_command == "overview":
print_output(
account_service.get_overview(
config,
include_spot=include_spot,
include_futures=include_futures,
spot_client=spot_client,
futures_client=futures_client,
),
agent=args.agent,
)
with with_spinner("Fetching account overview...", enabled=not args.agent):
print_output(
account_service.get_overview(
config,
include_spot=include_spot,
include_futures=include_futures,
spot_client=spot_client,
futures_client=futures_client,
),
agent=args.agent,
)
return 0
if args.account_command == "balances":
print_output(
account_service.get_balances(
config,
include_spot=include_spot,
include_futures=include_futures,
spot_client=spot_client,
futures_client=futures_client,
),
agent=args.agent,
)
with with_spinner("Fetching balances...", enabled=not args.agent):
print_output(
account_service.get_balances(
config,
include_spot=include_spot,
include_futures=include_futures,
spot_client=spot_client,
futures_client=futures_client,
),
agent=args.agent,
)
return 0
if args.account_command == "positions":
print_output(
account_service.get_positions(
config,
include_spot=include_spot,
include_futures=include_futures,
spot_client=spot_client,
futures_client=futures_client,
),
agent=args.agent,
)
with with_spinner("Fetching positions...", enabled=not args.agent):
print_output(
account_service.get_positions(
config,
include_spot=include_spot,
include_futures=include_futures,
spot_client=spot_client,
futures_client=futures_client,
),
agent=args.agent,
)
return 0
parser.error("account requires one of: overview, balances, positions")
if args.command == "market":
spot_client = _load_spot_client(config)
if args.market_command == "tickers":
print_output(market_service.get_tickers(config, args.symbols, spot_client=spot_client), agent=args.agent)
with with_spinner("Fetching tickers...", enabled=not args.agent):
print_output(market_service.get_tickers(config, args.symbols, spot_client=spot_client), agent=args.agent)
return 0
if args.market_command == "klines":
print_output(
market_service.get_klines(
config,
args.symbols,
interval=args.interval,
limit=args.limit,
spot_client=spot_client,
),
agent=args.agent,
)
with with_spinner("Fetching klines...", enabled=not args.agent):
print_output(
market_service.get_klines(
config,
args.symbols,
interval=args.interval,
limit=args.limit,
spot_client=spot_client,
),
agent=args.agent,
)
return 0
parser.error("market requires one of: tickers, klines")
if args.command == "trade":
if args.trade_market == "spot":
spot_client = _load_spot_client(config)
print_output(
trade_service.execute_spot_trade(
config,
side=args.trade_action,
symbol=args.symbol,
qty=args.qty,
quote=args.quote,
order_type=args.type,
price=args.price,
dry_run=True if args.dry_run else None,
spot_client=spot_client,
),
agent=args.agent,
)
with with_spinner("Placing spot order...", enabled=not args.agent):
print_output(
trade_service.execute_spot_trade(
config,
side=args.trade_action,
symbol=args.symbol,
qty=args.qty,
quote=args.quote,
order_type=args.type,
price=args.price,
dry_run=True if args.dry_run else None,
spot_client=spot_client,
),
agent=args.agent,
)
return 0
if args.trade_market == "futures":
futures_client = _load_futures_client(config)
if args.trade_action == "close":
with with_spinner("Closing futures position...", enabled=not args.agent):
print_output(
trade_service.close_futures_position(
config,
symbol=args.symbol,
dry_run=True if args.dry_run else None,
futures_client=futures_client,
),
agent=args.agent,
)
return 0
with with_spinner("Placing futures order...", enabled=not args.agent):
print_output(
trade_service.close_futures_position(
trade_service.execute_futures_trade(
config,
side=args.trade_action,
symbol=args.symbol,
qty=args.qty,
order_type=args.type,
price=args.price,
reduce_only=args.reduce_only,
dry_run=True if args.dry_run else None,
futures_client=futures_client,
),
agent=args.agent,
)
return 0
print_output(
trade_service.execute_futures_trade(
config,
side=args.trade_action,
symbol=args.symbol,
qty=args.qty,
order_type=args.type,
price=args.price,
reduce_only=args.reduce_only,
dry_run=True if args.dry_run else None,
futures_client=futures_client,
),
agent=args.agent,
)
return 0
parser.error("trade requires `spot` or `futures`")
if args.command == "opportunity":
spot_client = _load_spot_client(config)
if args.opportunity_command == "portfolio":
print_output(opportunity_service.analyze_portfolio(config, spot_client=spot_client), agent=args.agent)
with with_spinner("Analyzing portfolio...", enabled=not args.agent):
print_output(opportunity_service.analyze_portfolio(config, spot_client=spot_client), agent=args.agent)
return 0
if args.opportunity_command == "scan":
print_output(opportunity_service.scan_opportunities(config, spot_client=spot_client, symbols=args.symbols), agent=args.agent)
with with_spinner("Scanning opportunities...", enabled=not args.agent):
print_output(opportunity_service.scan_opportunities(config, spot_client=spot_client, symbols=args.symbols), agent=args.agent)
return 0
parser.error("opportunity requires `portfolio` or `scan`")

View File

@@ -17,7 +17,7 @@ except ModuleNotFoundError: # pragma: no cover
DEFAULT_CONFIG = """[runtime]
timezone = "Asia/Shanghai"
log_dir = "logs"
output_format = "json"
output_format = "tui"
[binance]
spot_base_url = "https://api.binance.com"

View File

@@ -2,6 +2,7 @@
from __future__ import annotations
import argparse
import csv
import io
import json
@@ -10,11 +11,19 @@ import re
import shutil
import subprocess
import sys
import threading
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import asdict, dataclass, is_dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Any
try:
import shtab
except Exception: # pragma: no cover
shtab = None # type: ignore[assignment]
@dataclass(frozen=True)
class RuntimePaths:
@@ -345,6 +354,26 @@ def _render_tui(payload: Any) -> None:
print(f" {line}")
return
if "created_or_updated" in payload:
print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}")
print(f" Root: {payload.get('root', '')}")
print(f" Config: {payload.get('config_file', '')}")
print(f" Env: {payload.get('env_file', '')}")
print(f" Logs: {payload.get('logs_dir', '')}")
files = payload.get("created_or_updated", [])
if files:
action = "overwritten" if payload.get("force") else "created"
print(f" Files {action}: {', '.join(files)}")
comp = payload.get("completion", {})
if comp.get("installed"):
print(f"\n {_GREEN}{_RESET} Shell completions installed for {comp.get('shell', '')}")
print(f" Path: {comp.get('path', '')}")
if comp.get("hint"):
print(f" Hint: {comp.get('hint', '')}")
elif comp.get("reason"):
print(f"\n Shell completions: {comp.get('reason', '')}")
return
# Generic fallback for single-list payloads
if len(payload) == 1:
key, value = next(iter(payload.items()))
@@ -370,3 +399,114 @@ def print_output(payload: Any, *, agent: bool = False) -> None:
print_json(payload)
else:
_render_tui(payload)
# ---------------------------------------------------------------------------
# Spinner / loading animation
# ---------------------------------------------------------------------------
_SPINNER_FRAMES = ["", "", "", "", "", "", "", "", "", ""]
class _SpinnerThread(threading.Thread):
def __init__(self, message: str, interval: float = 0.08) -> None:
super().__init__(daemon=True)
self.message = message
self.interval = interval
self._stop_event = threading.Event()
def run(self) -> None:
i = 0
while not self._stop_event.is_set():
frame = _SPINNER_FRAMES[i % len(_SPINNER_FRAMES)]
sys.stdout.write(f"\r{_CYAN}{frame}{_RESET} {self.message} ")
sys.stdout.flush()
self._stop_event.wait(self.interval)
i += 1
def stop(self) -> None:
self._stop_event.set()
self.join()
sys.stdout.write("\r\033[K")
sys.stdout.flush()
@contextmanager
def with_spinner(message: str, *, enabled: bool = True) -> Iterator[None]:
if not enabled or not sys.stdout.isatty():
yield
return
spinner = _SpinnerThread(message)
spinner.start()
try:
yield
finally:
spinner.stop()
def _detect_shell() -> str:
shell = os.getenv("SHELL", "")
if "zsh" in shell:
return "zsh"
if "bash" in shell:
return "bash"
return ""
def _zshrc_path() -> Path:
return Path.home() / ".zshrc"
def _bashrc_path() -> Path:
return Path.home() / ".bashrc"
def _rc_contains(rc_path: Path, snippet: str) -> bool:
if not rc_path.exists():
return False
return snippet in rc_path.read_text(encoding="utf-8")
def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
if shtab is None:
return {"shell": None, "installed": False, "reason": "shtab is not installed"}
shell = _detect_shell()
if not shell:
return {"shell": None, "installed": False, "reason": "unable to detect shell from $SHELL"}
script = shtab.complete(parser, shell=shell, preamble="")
installed_path: Path | None = None
hint: str | None = None
if shell == "zsh":
comp_dir = Path.home() / ".zsh" / "completions"
comp_dir.mkdir(parents=True, exist_ok=True)
installed_path = comp_dir / "_coinhunter"
installed_path.write_text(script, encoding="utf-8")
rc_path = _zshrc_path()
fpath_line = "fpath+=(~/.zsh/completions)"
if not _rc_contains(rc_path, fpath_line):
rc_path.write_text(fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n", encoding="utf-8")
hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'"
else:
hint = "Run 'compinit' or restart your terminal to activate completions"
elif shell == "bash":
comp_dir = Path.home() / ".local" / "share" / "bash-completion" / "completions"
comp_dir.mkdir(parents=True, exist_ok=True)
installed_path = comp_dir / "coinhunter"
installed_path.write_text(script, encoding="utf-8")
rc_path = _bashrc_path()
source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"'
if not _rc_contains(rc_path, source_line):
rc_path.write_text(source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n", encoding="utf-8")
hint = "Added bash completion source line to ~/.bashrc; restart your terminal"
else:
hint = "Restart your terminal or source ~/.bashrc to activate completions"
return {
"shell": shell,
"installed": True,
"path": str(installed_path) if installed_path else None,
"hint": hint,
}

View File

@@ -20,11 +20,14 @@ class CLITestCase(unittest.TestCase):
def test_init_dispatches(self):
captured = {}
with patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}), patch.object(
cli, "install_shell_completion", return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"}
), patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
):
result = cli.main(["init", "--force"])
self.assertEqual(result, 0)
self.assertTrue(captured["payload"]["force"])
self.assertIn("completion", captured["payload"])
def test_old_command_is_rejected(self):
with self.assertRaises(SystemExit):