Files
coinhunter-cli/src/coinhunter/audit.py
Tacit Lab f528575aa8 feat: add catlog command, agent flag reorder, and TUI polish
- Add `coinhunter catlog` with limit/offset pagination for audit logs
- Optimize audit log reading with deque to avoid loading all history
- Allow `-a/--agent` flag after subcommands
- Fix upgrade spinner artifact and empty line issues
- Render audit log TUI as timeline with low-saturation event colors
- Convert audit timestamps to local timezone in TUI
- Remove futures-related capabilities
- Add conda environment.yml for development
- Bump version to 2.0.9 and update README

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:42:47 +08:00

71 lines
2.5 KiB
Python

"""Audit logging for CoinHunter V2."""
from __future__ import annotations
import json
from collections import deque
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from .config import load_config, resolve_log_dir
from .runtime import RuntimePaths, ensure_runtime_dirs, get_runtime_paths, json_default
# Maps str(paths.root) -> resolved audit log directory, so the config is only
# loaded once per runtime root. Nothing in this module evicts entries.
_audit_dir_cache: dict[str, Path] = {}


def _resolve_audit_dir(paths: RuntimePaths) -> Path:
    """Return the audit log directory for *paths*, memoized per runtime root.

    On the first call for a given ``paths.root`` the config is loaded and the
    directory resolved through ``resolve_log_dir``; later calls with the same
    root return the cached result without touching the config again.
    """
    cache_key = str(paths.root)
    if cache_key in _audit_dir_cache:
        return _audit_dir_cache[cache_key]

    log_dir = resolve_log_dir(load_config(paths), paths)
    _audit_dir_cache[cache_key] = log_dir
    return log_dir
def _audit_path(paths: RuntimePaths | None = None) -> Path:
    """Return the path of the audit log file for the current UTC day.

    The log directory is created if it does not exist yet; the file itself is
    not created here. When *paths* is omitted (or falsy), the runtime paths
    come from ``get_runtime_paths()``.
    """
    runtime_paths = ensure_runtime_dirs(paths or get_runtime_paths())
    audit_dir = _resolve_audit_dir(runtime_paths)
    audit_dir.mkdir(parents=True, exist_ok=True)

    # One file per UTC calendar day, e.g. audit_20260417.jsonl.
    day_stamp = datetime.now(timezone.utc).strftime("%Y%m%d")
    return audit_dir / f"audit_{day_stamp}.jsonl"
def audit_event(event: str, payload: dict[str, Any], paths: RuntimePaths | None = None) -> dict[str, Any]:
    """Append one audit record to today's log file and return it.

    The record is a flat JSON object holding a UTC ISO-8601 ``timestamp``,
    the ``event`` name, and every key of *payload*. Payload keys are merged
    last, so a payload key named ``timestamp`` or ``event`` replaces the
    generated value.

    Args:
        event: Name stored under the ``event`` key.
        payload: Extra fields merged into the record.
        paths: Runtime paths used to locate the log file; forwarded to
            ``_audit_path``.

    Returns:
        The record as a dict (the object that was serialized, not the JSON
        text).
    """
    recorded_at = datetime.now(timezone.utc).isoformat()
    entry = {"timestamp": recorded_at, "event": event, **payload}

    log_file = _audit_path(paths)
    with log_file.open("a", encoding="utf-8") as handle:
        # Values json cannot encode natively are handed to json_default.
        serialized = json.dumps(entry, ensure_ascii=False, default=json_default)
        # JSON Lines: exactly one record per line.
        handle.write(f"{serialized}\n")
    return entry
def read_audit_log(paths: RuntimePaths | None = None, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]:
    """Return one page of the most recent audit entries, oldest entry first.

    Daily ``audit_*.jsonl`` files are scanned newest-first. For each file only
    the trailing non-blank lines still needed for the page are kept and
    decoded, so lines outside the requested window are never JSON-parsed.

    Args:
        paths: Runtime paths used to resolve the log directory; when omitted,
            ``get_runtime_paths()`` supplies them.
        limit: Maximum number of entries to return. Values below 1 yield an
            empty list.
        offset: Number of most recent entries to skip before the page starts.
            Negative values are treated as 0.

    Returns:
        Up to ``limit`` decoded entries, ordered older file first and in line
        order within a file. Empty when the log directory does not exist or
        no entries fall inside the requested range.

    Raises:
        json.JSONDecodeError: If a line selected for the window is not valid
            JSON.
    """
    paths = ensure_runtime_dirs(paths or get_runtime_paths())
    logs_dir = _resolve_audit_dir(paths)
    if not logs_dir.exists():
        return []

    # Negative values would otherwise leak into the slice arithmetic below
    # and select the wrong entries.
    offset = max(offset, 0)
    limit = max(limit, 0)
    if limit == 0:
        return []
    needed = offset + limit

    # File names embed a YYYYMMDD date, so reverse lexical order is
    # newest-day-first.
    audit_files = sorted(logs_dir.glob("audit_*.jsonl"), reverse=True)

    newest_first_chunks: list[list[dict[str, Any]]] = []
    collected = 0
    for audit_file in audit_files:
        remaining = needed - collected
        if remaining <= 0:
            break
        with audit_file.open("r", encoding="utf-8") as handle:
            # Keep just the raw tail of the file; decoding is deferred so
            # lines that fall out of the bounded deque are never parsed.
            tail = deque((line for line in handle if line.strip()), maxlen=remaining)
        if tail:
            newest_first_chunks.append([json.loads(line) for line in tail])
            collected += len(tail)

    # Stitch the per-file tails back together, oldest file first.
    window: list[dict[str, Any]] = []
    for chunk in reversed(newest_first_chunks):
        window.extend(chunk)

    # `window` holds at most `needed` entries, so dropping the `offset` newest
    # ones leaves exactly the requested page (possibly shorter, or empty).
    return window[: max(len(window) - offset, 0)]