refactor: rewrite to CoinHunter V2 flat architecture

Replace the V1 commands/services split with a flat, direct architecture:
- cli.py dispatches directly to service functions
- New services: account, market, trade, opportunity
- Thin Binance wrappers: spot_client, um_futures_client
- Add audit logging, runtime paths, and TOML config
- Remove legacy V1 code: commands/, precheck, review engine, smart executor
- Add ruff + mypy toolchain and fix edge cases in trade params

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-04-16 17:22:29 +08:00
parent 3819e35a7b
commit 52cd76a750
78 changed files with 2023 additions and 5407 deletions

View File

@@ -1,127 +1,52 @@
"""Runtime paths and environment helpers for CoinHunter CLI."""
"""Runtime helpers for CoinHunter V2."""
from __future__ import annotations
import json
import os
import shutil
from dataclasses import asdict, dataclass
from dataclasses import asdict, dataclass, is_dataclass
from datetime import date, datetime
from pathlib import Path
from typing import Any
@dataclass(frozen=True)
class RuntimePaths:
root: Path
cache_dir: Path
state_dir: Path
logs_dir: Path
reviews_dir: Path
config_file: Path
positions_file: Path
accounts_file: Path
executions_file: Path
watchlist_file: Path
notes_file: Path
positions_lock: Path
executions_lock: Path
precheck_state_file: Path
precheck_state_lock: Path
external_gate_lock: Path
logrotate_config: Path
logrotate_status: Path
hermes_home: Path
env_file: Path
hermes_bin: Path
logs_dir: Path
def as_dict(self) -> dict[str, str]:
return {key: str(value) for key, value in asdict(self).items()}
def _default_coinhunter_home() -> Path:
raw = os.getenv("COINHUNTER_HOME")
return Path(raw).expanduser() if raw else Path.home() / ".coinhunter"
def _default_hermes_home() -> Path:
raw = os.getenv("HERMES_HOME")
return Path(raw).expanduser() if raw else Path.home() / ".hermes"
def get_runtime_paths() -> RuntimePaths:
root = _default_coinhunter_home()
hermes_home = _default_hermes_home()
state_dir = root / "state"
root = Path(os.getenv("COINHUNTER_HOME", "~/.coinhunter")).expanduser()
return RuntimePaths(
root=root,
cache_dir=root / "cache",
state_dir=state_dir,
config_file=root / "config.toml",
env_file=root / ".env",
logs_dir=root / "logs",
reviews_dir=root / "reviews",
config_file=root / "config.json",
positions_file=root / "positions.json",
accounts_file=root / "accounts.json",
executions_file=root / "executions.json",
watchlist_file=root / "watchlist.json",
notes_file=root / "notes.json",
positions_lock=root / "positions.lock",
executions_lock=root / "executions.lock",
precheck_state_file=state_dir / "precheck_state.json",
precheck_state_lock=state_dir / "precheck_state.lock",
external_gate_lock=state_dir / "external_gate.lock",
logrotate_config=root / "logrotate_external_gate.conf",
logrotate_status=state_dir / "logrotate_external_gate.status",
hermes_home=hermes_home,
env_file=Path(os.getenv("COINHUNTER_ENV_FILE", str(hermes_home / ".env"))).expanduser(),
hermes_bin=Path(os.getenv("HERMES_BIN", str(Path.home() / ".local" / "bin" / "hermes"))).expanduser(),
)
def ensure_runtime_dirs(paths: RuntimePaths | None = None) -> RuntimePaths:
paths = paths or get_runtime_paths()
for directory in (paths.root, paths.cache_dir, paths.state_dir, paths.logs_dir, paths.reviews_dir):
directory.mkdir(parents=True, exist_ok=True)
paths.root.mkdir(parents=True, exist_ok=True)
paths.logs_dir.mkdir(parents=True, exist_ok=True)
return paths
def load_env_file(paths: RuntimePaths | None = None) -> Path:
paths = paths or get_runtime_paths()
if paths.env_file.exists():
for line in paths.env_file.read_text(encoding="utf-8").splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, value = line.split("=", 1)
os.environ.setdefault(key.strip(), value.strip())
return paths.env_file
def json_default(value: Any) -> Any:
if is_dataclass(value) and not isinstance(value, type):
return asdict(value)
if isinstance(value, (datetime, date)):
return value.isoformat()
if isinstance(value, Path):
return str(value)
raise TypeError(f"Object of type {type(value).__name__} is not JSON serializable")
def resolve_hermes_executable(paths: RuntimePaths | None = None) -> str:
paths = paths or get_runtime_paths()
discovered = shutil.which("hermes")
if discovered:
return discovered
return str(paths.hermes_bin)
def mask_secret(value: str | None, *, tail: int = 4) -> str:
if not value:
return ""
if len(value) <= tail:
return "*" * len(value)
return "*" * max(4, len(value) - tail) + value[-tail:]
def get_user_config(key: str, default=None):
"""Read a dotted key from the user config file."""
paths = get_runtime_paths()
try:
config = json.loads(paths.config_file.read_text(encoding="utf-8"))
except Exception:
return default
for part in key.split("."):
if isinstance(config, dict):
config = config.get(part)
if config is None:
return default
else:
return default
return config if config is not None else default
def print_json(payload: Any) -> None:
print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True, default=json_default))