feat: bootstrap coinhunter cli package

This commit is contained in:
2026-04-15 16:40:56 +08:00
commit 7586685d5f
17 changed files with 2894 additions and 0 deletions

107
src/coinhunter/logger.py Executable file
View File

@@ -0,0 +1,107 @@
#!/usr/bin/env python3
"""Coin Hunter structured logger.

Appends JSON-lines event records under ``~/.coinhunter/logs``, one file
per event type per calendar day (file names like ``trades_20240101.jsonl``).
All timestamps and daily file boundaries use UTC+8 (Beijing time).
"""
import json
import traceback
from datetime import datetime, timezone, timedelta
from pathlib import Path
# Root directory for Coin Hunter state; log files live in the subdirectory below.
BASE_DIR = Path.home() / ".coinhunter"
LOG_DIR = BASE_DIR / "logs"
# Stamped into every entry; bump when the payload shape changes.
SCHEMA_VERSION = 2
# UTC+8 — used for both entry timestamps and daily file names.
CST = timezone(timedelta(hours=8))
def bj_now():
    """Return the current time as a timezone-aware datetime in UTC+8."""
    return datetime.now(tz=CST)
def ensure_dir():
    """Create the log directory (including parents) if it is missing."""
    LOG_DIR.mkdir(exist_ok=True, parents=True)
def _append_jsonl(prefix: str, payload: dict):
    """Serialize *payload* as one JSON line and append it to today's file.

    The target file is ``<prefix>_<YYYYMMDD>.jsonl`` under LOG_DIR, keyed
    on the current Beijing-time date.
    """
    ensure_dir()
    today = bj_now().strftime("%Y%m%d")
    target = LOG_DIR / f"{prefix}_{today}.jsonl"
    line = json.dumps(payload, ensure_ascii=False)
    with open(target, "a", encoding="utf-8") as fh:
        fh.write(line + "\n")
def log_event(prefix: str, payload: dict):
    """Persist *payload* to the daily log for *prefix* and return the stored entry.

    The stored record carries ``schema_version`` and a Beijing-time
    ``timestamp``; keys in *payload* override those stamps if present.
    """
    record = {"schema_version": SCHEMA_VERSION, "timestamp": bj_now().isoformat()}
    record.update(payload)
    _append_jsonl(prefix, record)
    return record
def log_decision(data: dict):
    """Record a trading-decision event in the daily ``decisions`` log."""
    return log_event("decisions", data)
def log_trade(action: str, symbol: str, qty: float | None = None,
              amount_usdt: float | None = None, price: float | None = None,
              note: str = "", **extra):
    """Record a trade event in the daily ``trades`` log.

    Args:
        action: Trade action name (e.g. the order side or operation).
        symbol: Trading-pair symbol the action applies to.
        qty: Quantity traded, if known.
        amount_usdt: Notional amount in USDT, if known.
        price: Execution price, if known.
        note: Free-form human note attached to the entry.
        **extra: Additional fields merged into the logged payload
            (may override the keys above).

    Returns:
        The full entry dict as persisted (with schema/timestamp stamps).
    """
    # Annotations fixed from `float = None` to `float | None`: the
    # defaults are None, so the optional type is the accurate contract.
    payload = {
        "action": action,
        "symbol": symbol,
        "qty": qty,
        "amount_usdt": amount_usdt,
        "price": price,
        "note": note,
        **extra,
    }
    return log_event("trades", payload)
def log_snapshot(market_data: dict, note: str = "", **extra):
    """Record a market-data snapshot in the daily ``snapshots`` log."""
    payload = {"market_data": market_data, "note": note}
    payload.update(extra)
    return log_event("snapshots", payload)
def log_error(where: str, error: Exception | str, **extra):
    """Record an error event in the daily ``errors`` log.

    Args:
        where: Identifier of the code location/context that failed.
        error: The caught exception, or a plain error message string.
        **extra: Additional fields merged into the logged payload.

    Returns:
        The full entry dict as persisted (with schema/timestamp stamps).
    """
    is_exc = isinstance(error, Exception)
    payload = {
        "where": where,
        "error_type": error.__class__.__name__ if is_exc else "Error",
        "error": str(error),
        # format_exception(error) renders *error*'s own traceback (3.10+
        # single-argument form). The previous format_exc() only worked when
        # called inside the `except` block handling that same exception;
        # elsewhere it logged "NoneType: None" or an unrelated traceback.
        "traceback": "".join(traceback.format_exception(error)) if is_exc else None,
        **extra,
    }
    return log_event("errors", payload)
def get_logs_by_date(log_type: str, date_str: str = None) -> list:
    """Return all parseable entries from one day's log file.

    Args:
        log_type: Log file prefix (e.g. "trades", "errors").
        date_str: Day as ``YYYYMMDD``; defaults to today (Beijing time).

    Returns:
        List of decoded entry dicts; empty if the file does not exist.
        Blank and malformed lines are skipped silently.
    """
    if date_str is None:
        date_str = bj_now().strftime("%Y%m%d")
    path = LOG_DIR / f"{log_type}_{date_str}.jsonl"
    if not path.exists():
        return []
    results = []
    with open(path, "r", encoding="utf-8") as fh:
        for raw in fh:
            raw = raw.strip()
            if not raw:
                continue
            try:
                results.append(json.loads(raw))
            except json.JSONDecodeError:
                pass
    return results
def get_logs_last_n_hours(log_type: str, n_hours: int = 1) -> list:
    """Return all entries newer than *n_hours* ago, sorted by timestamp.

    Args:
        log_type: Log file prefix (e.g. "trades", "errors").
        n_hours: Lookback window in hours.

    Returns:
        Entries (dicts) with a parseable ``timestamp`` at or after the
        cutoff, ascending by timestamp. Entries whose timestamp cannot
        be parsed are skipped.
    """
    now = bj_now()
    cutoff = now - timedelta(hours=n_hours)
    # Scan every daily file that could contain qualifying entries. The
    # previous version hard-coded today and yesterday only, silently
    # dropping data whenever the window reached past yesterday's file.
    days_back = (now.date() - cutoff.date()).days
    entries = []
    for offset in range(0, -(days_back + 1), -1):
        date_str = (now + timedelta(days=offset)).strftime("%Y%m%d")
        for entry in get_logs_by_date(log_type, date_str):
            try:
                ts = datetime.fromisoformat(entry["timestamp"])
            except Exception:
                continue
            if ts >= cutoff:
                entries.append(entry)
    entries.sort(key=lambda e: e.get("timestamp", ""))
    return entries