Files
coinhunter-cli/src/coinhunter/runtime.py
Tacit Lab 4312b16288 feat: configurable ticker window for market stats (1h, 4h, 1d)
- Replace hardcoded ticker_24h with ticker_stats supporting configurable window
- Add -w/--window flag to `market tickers` (choices: 1h, 4h, 1d, default 1d)
- Update TUI title and JSON output to include window field
- Keep opportunity/pf service on 1d default
- Sync tests and doc comments

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 11:11:11 +08:00

556 lines
19 KiB
Python

"""Runtime helpers for CoinHunter V2."""
from __future__ import annotations
import argparse
import csv
import io
import json
import os
import re
import shutil
import subprocess
import sys
import threading
from collections.abc import Iterator
from contextlib import contextmanager
from dataclasses import asdict, dataclass, is_dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Any
try:
import shtab
except Exception: # pragma: no cover
shtab = None # type: ignore[assignment]
@dataclass(frozen=True)
class RuntimePaths:
root: Path
config_file: Path
env_file: Path
logs_dir: Path
def as_dict(self) -> dict[str, str]:
return {key: str(value) for key, value in asdict(self).items()}
def get_runtime_paths() -> RuntimePaths:
root = Path(os.getenv("COINHUNTER_HOME", "~/.coinhunter")).expanduser()
return RuntimePaths(
root=root,
config_file=root / "config.toml",
env_file=root / ".env",
logs_dir=root / "logs",
)
def ensure_runtime_dirs(paths: RuntimePaths | None = None) -> RuntimePaths:
paths = paths or get_runtime_paths()
paths.root.mkdir(parents=True, exist_ok=True)
paths.logs_dir.mkdir(parents=True, exist_ok=True)
return paths
def json_default(value: Any) -> Any:
if is_dataclass(value) and not isinstance(value, type):
return asdict(value)
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, Path):
return str(value)
raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
def print_json(payload: Any) -> None:
print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True, default=json_default))
def self_upgrade() -> dict[str, Any]:
if shutil.which("pipx"):
cmd = ["pipx", "upgrade", "coinhunter"]
else:
cmd = [sys.executable, "-m", "pip", "install", "--upgrade", "coinhunter"]
result = subprocess.run(cmd, capture_output=True, text=True)
return {
"command": " ".join(cmd),
"returncode": result.returncode,
"stdout": result.stdout.strip(),
"stderr": result.stderr.strip(),
}
# ---------------------------------------------------------------------------
# TUI / Agent output helpers
# ---------------------------------------------------------------------------
_ANSI_RE = re.compile(r"\033\[[0-9;]*m")
_BOLD = "\033[1m"
_RESET = "\033[0m"
_CYAN = "\033[36m"
_GREEN = "\033[32m"
_YELLOW = "\033[33m"
_RED = "\033[31m"
_DIM = "\033[2m"
def _strip_ansi(text: str) -> str:
return _ANSI_RE.sub("", text)
def _color(text: str, color: str) -> str:
return f"{color}{text}{_RESET}"
def _cell_width(text: str) -> int:
return len(_strip_ansi(text))
def _pad(text: str, width: int, align: str = "left") -> str:
pad = width - _cell_width(text)
if align == "right":
return " " * pad + text
return text + " " * pad
def _fmt_number(value: Any) -> str:
if value is None:
return ""
if isinstance(value, bool):
return "true" if value else "false"
if isinstance(value, (int, float)):
s = f"{value:,.4f}"
s = s.rstrip("0").rstrip(".")
return s
return str(value)
def _fmt_local_ts(ts: str) -> str:
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return ts
def _event_color(event: str) -> str:
if "failed" in event or "error" in event:
return f"{_DIM}{_RED}"
if event.startswith("trade"):
return f"{_DIM}{_GREEN}"
if event.startswith("opportunity"):
return f"{_DIM}{_YELLOW}"
return _DIM
def _is_large_dataset(payload: Any, threshold: int = 8) -> bool:
if isinstance(payload, dict):
for value in payload.values():
if isinstance(value, list) and len(value) > threshold:
return True
return False
def _print_compact(payload: dict[str, Any]) -> None:
target_key = None
target_rows: list[Any] = []
for key, value in payload.items():
if isinstance(value, list) and len(value) > len(target_rows):
target_key = key
target_rows = value
if target_rows and isinstance(target_rows[0], dict):
headers = list(target_rows[0].keys())
output = io.StringIO()
writer = csv.writer(output, delimiter="|", lineterminator="\n")
writer.writerow(headers)
for row in target_rows:
writer.writerow([str(row.get(h, "")) for h in headers])
print(f"mode=compact|source={target_key}")
print(output.getvalue().strip())
else:
for key, value in payload.items():
print(f"{key}={value}")
def _h_line(widths: list[int], left: str, mid: str, right: str) -> str:
parts = ["" * (w + 2) for w in widths]
return left + mid.join(parts) + right
def _print_box_table(
title: str,
headers: list[str],
rows: list[list[str]],
aligns: list[str] | None = None,
) -> None:
if not rows:
print(f"{_BOLD}{_CYAN}{title}{_RESET}")
print(" (empty)")
return
aligns = aligns or ["left"] * len(headers)
col_widths = [_cell_width(h) for h in headers]
for row in rows:
for i, cell in enumerate(row):
col_widths[i] = max(col_widths[i], _cell_width(cell))
if title:
print(f"{_BOLD}{_CYAN}{title}{_RESET}")
print(_h_line(col_widths, "", "", ""))
header_cells = [_pad(headers[i], col_widths[i], aligns[i]) for i in range(len(headers))]
print("" + "".join(header_cells) + "")
print(_h_line(col_widths, "", "", ""))
for row in rows:
cells = [_pad(row[i], col_widths[i], aligns[i]) for i in range(len(row))]
print("" + "".join(cells) + "")
print(_h_line(col_widths, "", "", ""))
def _render_tui(payload: Any) -> None:
if not isinstance(payload, dict):
print(str(payload))
return
if "balances" in payload:
rows = payload["balances"]
table_rows: list[list[str]] = []
for r in rows:
is_dust = r.get("is_dust", False)
dust_label = f"{_DIM}dust{_RESET}" if is_dust else ""
table_rows.append(
[
r.get("asset", ""),
_fmt_number(r.get("free", 0)),
_fmt_number(r.get("locked", 0)),
_fmt_number(r.get("total", 0)),
_fmt_number(r.get("notional_usdt", 0)),
dust_label,
]
)
_print_box_table(
"BALANCES",
["Asset", "Free", "Locked", "Total", "Notional (USDT)", ""],
table_rows,
aligns=["left", "right", "right", "right", "right", "left"],
)
return
if "positions" in payload:
rows = payload["positions"]
table_rows = []
for r in rows:
entry = _fmt_number(r.get("entry_price")) if r.get("entry_price") is not None else ""
pnl = _fmt_number(r.get("unrealized_pnl")) if r.get("unrealized_pnl") is not None else ""
table_rows.append(
[
r.get("market_type", ""),
r.get("symbol", ""),
r.get("side", ""),
_fmt_number(r.get("quantity", 0)),
entry,
_fmt_number(r.get("mark_price", 0)),
_fmt_number(r.get("notional_usdt", 0)),
pnl,
]
)
_print_box_table(
"POSITIONS",
["Market", "Symbol", "Side", "Qty", "Entry", "Mark", "Notional", "PnL"],
table_rows,
aligns=["left", "left", "left", "right", "right", "right", "right", "right"],
)
return
if "tickers" in payload:
rows = payload["tickers"]
table_rows = []
for r in rows:
pct = r.get("price_change_pct", 0)
pct_str = _color(f"{pct:+.2f}%", _GREEN if pct >= 0 else _RED)
table_rows.append(
[
r.get("symbol", ""),
_fmt_number(r.get("last_price", 0)),
pct_str,
_fmt_number(r.get("quote_volume", 0)),
]
)
_print_box_table(
f"TICKERS window={payload.get('window', '1d')}",
["Symbol", "Last Price", "Change %", "Quote Volume"],
table_rows,
aligns=["left", "right", "right", "right"],
)
return
if "klines" in payload:
rows = payload["klines"]
print(
f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}"
)
display_rows = rows[:10]
table_rows = []
for r in display_rows:
table_rows.append(
[
r.get("symbol", ""),
str(r.get("open_time", ""))[:10],
_fmt_number(r.get("open", 0)),
_fmt_number(r.get("high", 0)),
_fmt_number(r.get("low", 0)),
_fmt_number(r.get("close", 0)),
_fmt_number(r.get("volume", 0)),
]
)
_print_box_table(
"",
["Symbol", "Time", "Open", "High", "Low", "Close", "Vol"],
table_rows,
aligns=["left", "left", "right", "right", "right", "right", "right"],
)
if len(rows) > 10:
print(f" {_DIM}... and {len(rows) - 10} more rows{_RESET}")
return
if "trade" in payload:
t = payload["trade"]
status = t.get("status", "UNKNOWN")
status_color = _GREEN if status == "FILLED" else _YELLOW if status == "DRY_RUN" else _CYAN
print(f"\n{_BOLD}{_CYAN} TRADE RESULT {_RESET}")
print(f" Market: {t.get('market_type', '').upper()}")
print(f" Symbol: {t.get('symbol', '')}")
print(f" Side: {t.get('side', '')}")
print(f" Type: {t.get('order_type', '')}")
print(f" Status: {_color(status, status_color)}")
print(f" Dry Run: {_fmt_number(t.get('dry_run', False))}")
return
if "recommendations" in payload:
rows = payload["recommendations"]
print(f"\n{_BOLD}{_CYAN} RECOMMENDATIONS {_RESET} count={len(rows)}")
for i, r in enumerate(rows, 1):
score = r.get("score", 0)
action = r.get("action", "")
action_color = (
_GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN
)
print(
f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}"
)
for reason in r.get("reasons", []):
print(f" · {reason}")
metrics = r.get("metrics", {})
if metrics:
metric_str = " ".join(f"{k}={v}" for k, v in metrics.items())
print(f" {_DIM}{metric_str}{_RESET}")
return
if "command" in payload and "returncode" in payload:
rc = payload.get("returncode", 0)
stdout = payload.get("stdout", "")
stderr = payload.get("stderr", "")
if rc == 0:
print(f"{_GREEN}{_RESET} Update completed")
else:
print(f"{_RED}{_RESET} Update failed (exit code {rc})")
if stdout:
for line in stdout.strip().splitlines():
print(f" {line}")
if rc != 0 and stderr:
print(f" {_YELLOW}Details:{_RESET}")
for line in stderr.strip().splitlines():
print(f" {line}")
return
if "entries" in payload:
rows = payload["entries"]
print(f"\n{_BOLD}{_CYAN} AUDIT LOG {_RESET}")
if not rows:
print(" (no audit entries)")
return
for r in rows:
ts = _fmt_local_ts(r.get("timestamp", ""))
event = r.get("event", "")
detail_parts: list[str] = []
for key in ("symbol", "side", "qty", "quote_amount", "order_type", "status", "dry_run", "error"):
val = r.get(key)
if val is not None:
detail_parts.append(f"{key}={val}")
if not detail_parts:
for key, val in r.items():
if key not in ("timestamp", "event") and not isinstance(val, (dict, list)):
detail_parts.append(f"{key}={val}")
print(f"\n {_DIM}{ts}{_RESET} {_event_color(event)}{event}{_RESET}")
if detail_parts:
print(f" {' '.join(detail_parts)}")
return
if "created_or_updated" in payload:
print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}")
print(f" Root: {payload.get('root', '')}")
print(f" Config: {payload.get('config_file', '')}")
print(f" Env: {payload.get('env_file', '')}")
print(f" Logs: {payload.get('logs_dir', '')}")
files = payload.get("created_or_updated", [])
if files:
action = "overwritten" if payload.get("force") else "created"
print(f" Files {action}: {', '.join(files)}")
comp = payload.get("completion", {})
if comp.get("installed"):
print(f"\n {_GREEN}{_RESET} Shell completions installed for {comp.get('shell', '')}")
print(f" Path: {comp.get('path', '')}")
if comp.get("hint"):
print(f" Hint: {comp.get('hint', '')}")
elif comp.get("reason"):
print(f"\n Shell completions: {comp.get('reason', '')}")
return
# Generic fallback for single-list payloads
if len(payload) == 1:
key, value = next(iter(payload.items()))
if isinstance(value, list) and value and isinstance(value[0], dict):
_render_tui({key: value})
return
# Simple key-value fallback
for key, value in payload.items():
if isinstance(value, str) and "\n" in value:
print(f" {key}:")
for line in value.strip().splitlines():
print(f" {line}")
else:
print(f" {key}: {value}")
def print_output(payload: Any, *, agent: bool = False) -> None:
if agent:
print_json(payload)
else:
_render_tui(payload)
# ---------------------------------------------------------------------------
# Spinner / loading animation
# ---------------------------------------------------------------------------
_SPINNER_FRAMES = ["", "", "", "", "", "", "", "", "", ""]
class _SpinnerThread(threading.Thread):
def __init__(self, message: str, interval: float = 0.08) -> None:
super().__init__(daemon=True)
self.message = message
self.interval = interval
self._stop_event = threading.Event()
def run(self) -> None:
i = 0
while not self._stop_event.is_set():
frame = _SPINNER_FRAMES[i % len(_SPINNER_FRAMES)]
sys.stdout.write(f"\r{_CYAN}{frame}{_RESET} {self.message} ")
sys.stdout.flush()
self._stop_event.wait(self.interval)
i += 1
def stop(self) -> None:
self._stop_event.set()
self.join()
sys.stdout.write("\r\033[K")
sys.stdout.flush()
@contextmanager
def with_spinner(message: str, *, enabled: bool = True) -> Iterator[None]:
if not enabled or not sys.stdout.isatty():
yield
return
spinner = _SpinnerThread(message)
spinner.start()
try:
yield
finally:
spinner.stop()
def _detect_shell() -> str:
shell = os.getenv("SHELL", "")
if "zsh" in shell:
return "zsh"
if "bash" in shell:
return "bash"
return ""
def _zshrc_path() -> Path:
return Path.home() / ".zshrc"
def _bashrc_path() -> Path:
return Path.home() / ".bashrc"
def _rc_contains(rc_path: Path, snippet: str) -> bool:
if not rc_path.exists():
return False
return snippet in rc_path.read_text(encoding="utf-8")
def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
if shtab is None:
return {"shell": None, "installed": False, "reason": "shtab is not installed"}
shell = _detect_shell()
if not shell:
return {"shell": None, "installed": False, "reason": "unable to detect shell from $SHELL"}
script = shtab.complete(parser, shell=shell, preamble="")
# Also register completion for the "coinhunter" alias
prog = parser.prog.replace("-", "_")
func = f"_shtab_{prog}"
if shell == "bash":
script += f"\ncomplete -o filenames -F {func} coinhunter\n"
elif shell == "zsh":
script += f"\ncompdef {func} coinhunter\n"
installed_path: Path | None = None
hint: str | None = None
if shell == "zsh":
comp_dir = Path.home() / ".zsh" / "completions"
comp_dir.mkdir(parents=True, exist_ok=True)
installed_path = comp_dir / "_coinhunter"
installed_path.write_text(script, encoding="utf-8")
rc_path = _zshrc_path()
fpath_line = "fpath+=(~/.zsh/completions)"
if not _rc_contains(rc_path, fpath_line):
rc_path.write_text(
fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n",
encoding="utf-8",
)
hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'"
else:
hint = "Run 'compinit' or restart your terminal to activate completions"
elif shell == "bash":
comp_dir = Path.home() / ".local" / "share" / "bash-completion" / "completions"
comp_dir.mkdir(parents=True, exist_ok=True)
installed_path = comp_dir / "coinhunter"
installed_path.write_text(script, encoding="utf-8")
rc_path = _bashrc_path()
source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"'
if not _rc_contains(rc_path, source_line):
rc_path.write_text(
source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n",
encoding="utf-8",
)
hint = "Added bash completion source line to ~/.bashrc; restart your terminal"
else:
hint = "Restart your terminal or source ~/.bashrc to activate completions"
return {
"shell": shell,
"installed": True,
"path": str(installed_path) if installed_path else None,
"hint": hint,
}