feat: add runtime diagnostics and path management
This commit is contained in:
107
src/coinhunter/runtime.py
Normal file
107
src/coinhunter/runtime.py
Normal file
@@ -0,0 +1,107 @@
|
||||
"""Runtime paths and environment helpers for CoinHunter CLI."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import shutil
|
||||
from dataclasses import asdict, dataclass
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RuntimePaths:
|
||||
root: Path
|
||||
cache_dir: Path
|
||||
state_dir: Path
|
||||
logs_dir: Path
|
||||
reviews_dir: Path
|
||||
config_file: Path
|
||||
positions_file: Path
|
||||
accounts_file: Path
|
||||
executions_file: Path
|
||||
watchlist_file: Path
|
||||
notes_file: Path
|
||||
positions_lock: Path
|
||||
executions_lock: Path
|
||||
precheck_state_file: Path
|
||||
external_gate_lock: Path
|
||||
logrotate_config: Path
|
||||
logrotate_status: Path
|
||||
hermes_home: Path
|
||||
env_file: Path
|
||||
hermes_bin: Path
|
||||
|
||||
def as_dict(self) -> dict[str, str]:
|
||||
return {key: str(value) for key, value in asdict(self).items()}
|
||||
|
||||
|
||||
def _default_coinhunter_home() -> Path:
|
||||
raw = os.getenv("COINHUNTER_HOME")
|
||||
return Path(raw).expanduser() if raw else Path.home() / ".coinhunter"
|
||||
|
||||
|
||||
def _default_hermes_home() -> Path:
|
||||
raw = os.getenv("HERMES_HOME")
|
||||
return Path(raw).expanduser() if raw else Path.home() / ".hermes"
|
||||
|
||||
|
||||
def get_runtime_paths() -> RuntimePaths:
|
||||
root = _default_coinhunter_home()
|
||||
hermes_home = _default_hermes_home()
|
||||
state_dir = root / "state"
|
||||
return RuntimePaths(
|
||||
root=root,
|
||||
cache_dir=root / "cache",
|
||||
state_dir=state_dir,
|
||||
logs_dir=root / "logs",
|
||||
reviews_dir=root / "reviews",
|
||||
config_file=root / "config.json",
|
||||
positions_file=root / "positions.json",
|
||||
accounts_file=root / "accounts.json",
|
||||
executions_file=root / "executions.json",
|
||||
watchlist_file=root / "watchlist.json",
|
||||
notes_file=root / "notes.json",
|
||||
positions_lock=root / "positions.lock",
|
||||
executions_lock=root / "executions.lock",
|
||||
precheck_state_file=state_dir / "precheck_state.json",
|
||||
external_gate_lock=state_dir / "external_gate.lock",
|
||||
logrotate_config=root / "logrotate_external_gate.conf",
|
||||
logrotate_status=state_dir / "logrotate_external_gate.status",
|
||||
hermes_home=hermes_home,
|
||||
env_file=Path(os.getenv("COINHUNTER_ENV_FILE", str(hermes_home / ".env"))).expanduser(),
|
||||
hermes_bin=Path(os.getenv("HERMES_BIN", str(Path.home() / ".local" / "bin" / "hermes"))).expanduser(),
|
||||
)
|
||||
|
||||
|
||||
def ensure_runtime_dirs(paths: RuntimePaths | None = None) -> RuntimePaths:
|
||||
paths = paths or get_runtime_paths()
|
||||
for directory in (paths.root, paths.cache_dir, paths.state_dir, paths.logs_dir, paths.reviews_dir):
|
||||
directory.mkdir(parents=True, exist_ok=True)
|
||||
return paths
|
||||
|
||||
|
||||
def load_env_file(paths: RuntimePaths | None = None) -> Path:
|
||||
paths = paths or get_runtime_paths()
|
||||
if paths.env_file.exists():
|
||||
for line in paths.env_file.read_text(encoding="utf-8").splitlines():
|
||||
line = line.strip()
|
||||
if line and not line.startswith("#") and "=" in line:
|
||||
key, value = line.split("=", 1)
|
||||
os.environ.setdefault(key.strip(), value.strip())
|
||||
return paths.env_file
|
||||
|
||||
|
||||
def resolve_hermes_executable(paths: RuntimePaths | None = None) -> str:
|
||||
paths = paths or get_runtime_paths()
|
||||
discovered = shutil.which("hermes")
|
||||
if discovered:
|
||||
return discovered
|
||||
return str(paths.hermes_bin)
|
||||
|
||||
|
||||
def mask_secret(value: str | None, *, tail: int = 4) -> str:
|
||||
if not value:
|
||||
return ""
|
||||
if len(value) <= tail:
|
||||
return "*" * len(value)
|
||||
return "*" * max(4, len(value) - tail) + value[-tail:]
|
||||
Reference in New Issue
Block a user