"""Runtime helpers for CoinHunter V2.""" from __future__ import annotations import argparse import csv import io import json import os import re import shutil import subprocess import sys import threading from collections.abc import Iterator from contextlib import contextmanager from dataclasses import asdict, dataclass, is_dataclass from datetime import date, datetime from pathlib import Path from typing import Any try: import shtab except Exception: # pragma: no cover shtab = None # type: ignore[assignment] @dataclass(frozen=True) class RuntimePaths: root: Path config_file: Path env_file: Path logs_dir: Path def as_dict(self) -> dict[str, str]: return {key: str(value) for key, value in asdict(self).items()} def get_runtime_paths() -> RuntimePaths: root = Path(os.getenv("COINHUNTER_HOME", "~/.coinhunter")).expanduser() return RuntimePaths( root=root, config_file=root / "config.toml", env_file=root / ".env", logs_dir=root / "logs", ) def ensure_runtime_dirs(paths: RuntimePaths | None = None) -> RuntimePaths: paths = paths or get_runtime_paths() paths.root.mkdir(parents=True, exist_ok=True) paths.logs_dir.mkdir(parents=True, exist_ok=True) return paths def json_default(value: Any) -> Any: if is_dataclass(value) and not isinstance(value, type): return asdict(value) if isinstance(value, (datetime, date)): return value.isoformat() if isinstance(value, Path): return str(value) raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable") def print_json(payload: Any) -> None: print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True, default=json_default)) def self_upgrade() -> dict[str, Any]: if shutil.which("pipx"): cmd = ["pipx", "upgrade", "coinhunter"] else: cmd = [sys.executable, "-m", "pip", "install", "--upgrade", "coinhunter"] result = subprocess.run(cmd, capture_output=True, text=True) return { "command": " ".join(cmd), "returncode": result.returncode, "stdout": result.stdout.strip(), "stderr": result.stderr.strip(), } # --------------------------------------------------------------------------- # TUI / Agent output helpers # --------------------------------------------------------------------------- _ANSI_RE = re.compile(r"\033\[[0-9;]*m") _BOLD = "\033[1m" _RESET = "\033[0m" _CYAN = "\033[36m" _GREEN = "\033[32m" _YELLOW = "\033[33m" _RED = "\033[31m" _DIM = "\033[2m" def _strip_ansi(text: str) -> str: return _ANSI_RE.sub("", text) def _color(text: str, color: str) -> str: return f"{color}{text}{_RESET}" def _cell_width(text: str) -> int: return len(_strip_ansi(text)) def _pad(text: str, width: int, align: str = "left") -> str: pad = width - _cell_width(text) if align == "right": return " " * pad + text return text + " " * pad def _fmt_number(value: Any) -> str: if value is None: return "—" if isinstance(value, bool): return "true" if value else "false" if isinstance(value, (int, float)): s = f"{value:,.4f}" s = s.rstrip("0").rstrip(".") return s return str(value) def _is_large_dataset(payload: Any, threshold: int = 8) -> bool: if isinstance(payload, dict): for value in payload.values(): if isinstance(value, list) and len(value) > threshold: return True return False def _print_compact(payload: dict[str, Any]) -> None: target_key = None target_rows: list[Any] = [] for key, value in payload.items(): if isinstance(value, list) and len(value) > len(target_rows): target_key = key target_rows = value if target_rows and isinstance(target_rows[0], dict): headers = list(target_rows[0].keys()) output = io.StringIO() writer = csv.writer(output, delimiter="|", lineterminator="\n") writer.writerow(headers) for row in target_rows: writer.writerow([str(row.get(h, "")) for h in headers]) print(f"mode=compact|source={target_key}") print(output.getvalue().strip()) else: for key, value in payload.items(): print(f"{key}={value}") def _h_line(widths: list[int], left: str, mid: str, right: str) -> str: parts = ["─" * (w + 2) for w in widths] return left + mid.join(parts) + right def _print_box_table( title: str, headers: list[str], rows: list[list[str]], aligns: list[str] | None = None, ) -> None: if not rows: print(f"{_BOLD}{_CYAN}{title}{_RESET}") print(" (empty)") return aligns = aligns or ["left"] * len(headers) col_widths = [_cell_width(h) for h in headers] for row in rows: for i, cell in enumerate(row): col_widths[i] = max(col_widths[i], _cell_width(cell)) if title: print(f"{_BOLD}{_CYAN}{title}{_RESET}") print(_h_line(col_widths, "┌", "┬", "┐")) header_cells = [_pad(headers[i], col_widths[i], aligns[i]) for i in range(len(headers))] print("│ " + " │ ".join(header_cells) + " │") print(_h_line(col_widths, "├", "┼", "┤")) for row in rows: cells = [_pad(row[i], col_widths[i], aligns[i]) for i in range(len(row))] print("│ " + " │ ".join(cells) + " │") print(_h_line(col_widths, "└", "┴", "┘")) def _render_tui(payload: Any) -> None: if not isinstance(payload, dict): print(str(payload)) return if "overview" in payload: overview = payload.get("overview", {}) print(f"\n{_BOLD}{_CYAN} ACCOUNT OVERVIEW {_RESET}") print(f" Total Equity: {_GREEN}{_fmt_number(overview.get('total_equity_usdt', 0))} USDT{_RESET}") print(f" Spot Assets: {_fmt_number(overview.get('spot_asset_count', 0))}") print(f" Positions: {_fmt_number(overview.get('spot_position_count', 0))}") if payload.get("balances"): print() _render_tui({"balances": payload["balances"]}) if payload.get("positions"): print() _render_tui({"positions": payload["positions"]}) return if "balances" in payload: rows = payload["balances"] table_rows: list[list[str]] = [] for r in rows: table_rows.append( [ r.get("market_type", ""), r.get("asset", ""), _fmt_number(r.get("free", 0)), _fmt_number(r.get("locked", 0)), _fmt_number(r.get("total", 0)), _fmt_number(r.get("notional_usdt", 0)), ] ) _print_box_table( "BALANCES", ["Market", "Asset", "Free", "Locked", "Total", "Notional (USDT)"], table_rows, aligns=["left", "left", "right", "right", "right", "right"], ) return if "positions" in payload: rows = payload["positions"] table_rows = [] for r in rows: entry = _fmt_number(r.get("entry_price")) if r.get("entry_price") is not None else "—" pnl = _fmt_number(r.get("unrealized_pnl")) if r.get("unrealized_pnl") is not None else "—" table_rows.append( [ r.get("market_type", ""), r.get("symbol", ""), r.get("side", ""), _fmt_number(r.get("quantity", 0)), entry, _fmt_number(r.get("mark_price", 0)), _fmt_number(r.get("notional_usdt", 0)), pnl, ] ) _print_box_table( "POSITIONS", ["Market", "Symbol", "Side", "Qty", "Entry", "Mark", "Notional", "PnL"], table_rows, aligns=["left", "left", "left", "right", "right", "right", "right", "right"], ) return if "tickers" in payload: rows = payload["tickers"] table_rows = [] for r in rows: pct = r.get("price_change_pct", 0) pct_str = _color(f"{pct:+.2f}%", _GREEN if pct >= 0 else _RED) table_rows.append( [ r.get("symbol", ""), _fmt_number(r.get("last_price", 0)), pct_str, _fmt_number(r.get("quote_volume", 0)), ] ) _print_box_table( "24H TICKERS", ["Symbol", "Last Price", "Change %", "Quote Volume"], table_rows, aligns=["left", "right", "right", "right"], ) return if "klines" in payload: rows = payload["klines"] print(f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}") display_rows = rows[:10] table_rows = [] for r in display_rows: table_rows.append( [ r.get("symbol", ""), str(r.get("open_time", ""))[:10], _fmt_number(r.get("open", 0)), _fmt_number(r.get("high", 0)), _fmt_number(r.get("low", 0)), _fmt_number(r.get("close", 0)), _fmt_number(r.get("volume", 0)), ] ) _print_box_table( "", ["Symbol", "Time", "Open", "High", "Low", "Close", "Vol"], table_rows, aligns=["left", "left", "right", "right", "right", "right", "right"], ) if len(rows) > 10: print(f" {_DIM}... and {len(rows) - 10} more rows{_RESET}") return if "trade" in payload: t = payload["trade"] status = t.get("status", "UNKNOWN") status_color = _GREEN if status == "FILLED" else _YELLOW if status == "DRY_RUN" else _CYAN print(f"\n{_BOLD}{_CYAN} TRADE RESULT {_RESET}") print(f" Market: {t.get('market_type', '').upper()}") print(f" Symbol: {t.get('symbol', '')}") print(f" Side: {t.get('side', '')}") print(f" Type: {t.get('order_type', '')}") print(f" Status: {_color(status, status_color)}") print(f" Dry Run: {_fmt_number(t.get('dry_run', False))}") return if "recommendations" in payload: rows = payload["recommendations"] print(f"\n{_BOLD}{_CYAN} RECOMMENDATIONS {_RESET} count={len(rows)}") for i, r in enumerate(rows, 1): score = r.get("score", 0) action = r.get("action", "") action_color = _GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN print(f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}") for reason in r.get("reasons", []): print(f" · {reason}") metrics = r.get("metrics", {}) if metrics: metric_str = " ".join(f"{k}={v}" for k, v in metrics.items()) print(f" {_DIM}{metric_str}{_RESET}") return if "command" in payload and "returncode" in payload: rc = payload.get("returncode", 0) stdout = payload.get("stdout", "") stderr = payload.get("stderr", "") if rc == 0: print(f"\n{_GREEN}✓{_RESET} Update completed") else: print(f"\n{_RED}✗{_RESET} Update failed (exit code {rc})") if stdout: for line in stdout.strip().splitlines(): print(f" {line}") if rc != 0 and stderr: print(f" {_YELLOW}Details:{_RESET}") for line in stderr.strip().splitlines(): print(f" {line}") return if "created_or_updated" in payload: print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}") print(f" Root: {payload.get('root', '')}") print(f" Config: {payload.get('config_file', '')}") print(f" Env: {payload.get('env_file', '')}") print(f" Logs: {payload.get('logs_dir', '')}") files = payload.get("created_or_updated", []) if files: action = "overwritten" if payload.get("force") else "created" print(f" Files {action}: {', '.join(files)}") comp = payload.get("completion", {}) if comp.get("installed"): print(f"\n {_GREEN}✓{_RESET} Shell completions installed for {comp.get('shell', '')}") print(f" Path: {comp.get('path', '')}") if comp.get("hint"): print(f" Hint: {comp.get('hint', '')}") elif comp.get("reason"): print(f"\n Shell completions: {comp.get('reason', '')}") return # Generic fallback for single-list payloads if len(payload) == 1: key, value = next(iter(payload.items())) if isinstance(value, list) and value and isinstance(value[0], dict): _render_tui({key: value}) return # Simple key-value fallback for key, value in payload.items(): if isinstance(value, str) and "\n" in value: print(f" {key}:") for line in value.strip().splitlines(): print(f" {line}") else: print(f" {key}: {value}") def print_output(payload: Any, *, agent: bool = False) -> None: if agent: if _is_large_dataset(payload): _print_compact(payload) else: print_json(payload) else: _render_tui(payload) # --------------------------------------------------------------------------- # Spinner / loading animation # --------------------------------------------------------------------------- _SPINNER_FRAMES = ["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"] class _SpinnerThread(threading.Thread): def __init__(self, message: str, interval: float = 0.08) -> None: super().__init__(daemon=True) self.message = message self.interval = interval self._stop_event = threading.Event() def run(self) -> None: i = 0 while not self._stop_event.is_set(): frame = _SPINNER_FRAMES[i % len(_SPINNER_FRAMES)] sys.stdout.write(f"\r{_CYAN}{frame}{_RESET} {self.message} ") sys.stdout.flush() self._stop_event.wait(self.interval) i += 1 def stop(self) -> None: self._stop_event.set() self.join() sys.stdout.write("\r\033[K") sys.stdout.flush() @contextmanager def with_spinner(message: str, *, enabled: bool = True) -> Iterator[None]: if not enabled or not sys.stdout.isatty(): yield return spinner = _SpinnerThread(message) spinner.start() try: yield finally: spinner.stop() def _detect_shell() -> str: shell = os.getenv("SHELL", "") if "zsh" in shell: return "zsh" if "bash" in shell: return "bash" return "" def _zshrc_path() -> Path: return Path.home() / ".zshrc" def _bashrc_path() -> Path: return Path.home() / ".bashrc" def _rc_contains(rc_path: Path, snippet: str) -> bool: if not rc_path.exists(): return False return snippet in rc_path.read_text(encoding="utf-8") def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]: if shtab is None: return {"shell": None, "installed": False, "reason": "shtab is not installed"} shell = _detect_shell() if not shell: return {"shell": None, "installed": False, "reason": "unable to detect shell from $SHELL"} script = shtab.complete(parser, shell=shell, preamble="") installed_path: Path | None = None hint: str | None = None if shell == "zsh": comp_dir = Path.home() / ".zsh" / "completions" comp_dir.mkdir(parents=True, exist_ok=True) installed_path = comp_dir / "_coinhunter" installed_path.write_text(script, encoding="utf-8") rc_path = _zshrc_path() fpath_line = "fpath+=(~/.zsh/completions)" if not _rc_contains(rc_path, fpath_line): rc_path.write_text(fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n", encoding="utf-8") hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'" else: hint = "Run 'compinit' or restart your terminal to activate completions" elif shell == "bash": comp_dir = Path.home() / ".local" / "share" / "bash-completion" / "completions" comp_dir.mkdir(parents=True, exist_ok=True) installed_path = comp_dir / "coinhunter" installed_path.write_text(script, encoding="utf-8") rc_path = _bashrc_path() source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"' if not _rc_contains(rc_path, source_line): rc_path.write_text(source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n", encoding="utf-8") hint = "Added bash completion source line to ~/.bashrc; restart your terminal" else: hint = "Restart your terminal or source ~/.bashrc to activate completions" return { "shell": shell, "installed": True, "path": str(installed_path) if installed_path else None, "hint": hint, }