#!/usr/bin/env python3
"""Coin Hunter structured logger.

Records are appended as JSON Lines to per-day files named
``<prefix>_<YYYYMMDD>.jsonl`` under ``LOG_DIR``.  Timestamps and file dates
are both taken from ``bj_now()``, i.e. the fixed UTC+8 offset ``CST``.
"""
import json
import sys
import traceback
from datetime import datetime, timezone, timedelta

from .runtime import get_runtime_paths

# Directory that receives every log file (used as a path object: ``mkdir``, ``/``).
LOG_DIR = get_runtime_paths().logs_dir
# Written into every record as "schema_version".
SCHEMA_VERSION = 2

# Fixed UTC+8 offset used for all timestamps and for the date in file names.
CST = timezone(timedelta(hours=8))


def bj_now():
    """Return the current time as a timezone-aware datetime in ``CST`` (UTC+8)."""
    return datetime.now(CST)


def ensure_dir():
    """Create ``LOG_DIR`` (including missing parents) if it does not exist."""
    LOG_DIR.mkdir(parents=True, exist_ok=True)


def _append_jsonl(prefix: str, payload: dict):
    """Append *payload* as one JSON line to today's ``<prefix>_<date>.jsonl`` file.

    Non-ASCII characters are written as-is (``ensure_ascii=False``); the file
    is opened in append mode with UTF-8 encoding.
    """
    ensure_dir()
    date_str = bj_now().strftime("%Y%m%d")
    log_file = LOG_DIR / f"{prefix}_{date_str}.jsonl"
    with open(log_file, "a", encoding="utf-8") as f:
        f.write(json.dumps(payload, ensure_ascii=False) + "\n")


def log_event(prefix: str, payload: dict):
    """Write *payload* to the *prefix* log and return the stored record.

    The record starts with ``schema_version`` and ``timestamp`` (ISO 8601,
    UTC+8).  *payload* is merged last, so a payload key with either of those
    names replaces the generated value.
    """
    entry = {
        "schema_version": SCHEMA_VERSION,
        "timestamp": bj_now().isoformat(),
        **payload,
    }
    _append_jsonl(prefix, entry)
    return entry


def log_decision(data: dict):
    """Log *data* to the ``decisions`` log and return the stored record."""
    return log_event("decisions", data)


def log_trade(
    action: str,
    symbol: str,
    qty: float | None = None,
    amount_usdt: float | None = None,
    price: float | None = None,
    note: str = "",
    **extra,
):
    """Log a trade to the ``trades`` log and return the stored record.

    Additional keyword arguments are stored alongside the named fields; an
    extra key that matches a named field replaces it.
    """
    payload = {
        "action": action,
        "symbol": symbol,
        "qty": qty,
        "amount_usdt": amount_usdt,
        "price": price,
        "note": note,
        **extra,
    }
    return log_event("trades", payload)


def log_snapshot(market_data: dict, note: str = "", **extra):
    """Log *market_data* to the ``snapshots`` log and return the stored record."""
    return log_event("snapshots", {"market_data": market_data, "note": note, **extra})


def _format_error_traceback(error: Exception) -> str:
    """Return the formatted traceback that belongs to *error*."""
    if error.__traceback__ is None and sys.exc_info()[1] is not None:
        # *error* was never raised (e.g. built inside a handler to describe
        # the failure), so the exception being handled is the best context.
        return traceback.format_exc()
    # Use the exception's own traceback so the result is correct even when
    # log_error() is called after the ``except`` block has finished.
    return "".join(
        traceback.format_exception(type(error), error, error.__traceback__)
    )


def log_error(where: str, error: Exception | str, **extra):
    """Log an error to the ``errors`` log and return the stored record.

    Args:
        where: Caller-supplied label for the location of the failure.
        error: An exception instance or a plain message.  For exceptions the
            class name and formatted traceback are recorded; for any other
            value ``error_type`` is ``"Error"`` and ``traceback`` is ``None``.
        **extra: Additional fields stored in the record.
    """
    if isinstance(error, Exception):
        error_type = error.__class__.__name__
        tb_text = _format_error_traceback(error)
    else:
        error_type = "Error"
        tb_text = None
    payload = {
        "where": where,
        "error_type": error_type,
        "error": str(error),
        "traceback": tb_text,
        **extra,
    }
    return log_event("errors", payload)


def get_logs_by_date(log_type: str, date_str: str | None = None) -> list:
    """Return all parsed records from the *log_type* file for one day.

    Args:
        log_type: File prefix, e.g. ``"trades"``.
        date_str: Day in ``YYYYMMDD`` form; defaults to today in UTC+8.

    Returns:
        The decoded JSON values in file order.  A missing file yields an
        empty list; blank lines and lines that are not valid JSON are skipped.
    """
    if date_str is None:
        date_str = bj_now().strftime("%Y%m%d")
    log_file = LOG_DIR / f"{log_type}_{date_str}.jsonl"
    if not log_file.exists():
        return []
    entries = []
    with open(log_file, "r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                entries.append(json.loads(line))
            except json.JSONDecodeError:
                # A corrupt line must not hide the rest of the file.
                continue
    return entries


def get_logs_last_n_hours(log_type: str, n_hours: int = 1) -> list:
    """Return *log_type* records from the last *n_hours*, oldest first.

    Every daily file from the cutoff's date up to today is read, so windows
    longer than a day are covered in full.  Records without a parseable
    ``timestamp`` are skipped; a timestamp without an offset is interpreted
    as UTC+8.
    """
    now = bj_now()
    cutoff = now - timedelta(hours=n_hours)
    today = now.date()
    # Never scan fewer files than "yesterday and today", whatever the window.
    day = min(cutoff.date(), today - timedelta(days=1))

    recent = []
    while day <= today:
        for entry in get_logs_by_date(log_type, day.strftime("%Y%m%d")):
            try:
                ts = datetime.fromisoformat(entry["timestamp"])
            except (KeyError, TypeError, ValueError):
                continue
            if ts.tzinfo is None:
                # Naive and aware datetimes cannot be compared.
                ts = ts.replace(tzinfo=CST)
            if ts >= cutoff:
                recent.append((ts, entry))
        day += timedelta(days=1)

    # Sort on the parsed instant so mixed UTC offsets still order correctly.
    recent.sort(key=lambda pair: pair[0])
    return [entry for _, entry in recent]