"""Configuration and secret loading for CoinHunter V2.""" from __future__ import annotations import os from pathlib import Path from typing import Any from .runtime import RuntimePaths, ensure_runtime_dirs, get_runtime_paths try: import tomllib except ModuleNotFoundError: # pragma: no cover import tomli as tomllib try: import tomli_w except ModuleNotFoundError: # pragma: no cover tomli_w = None # type: ignore[assignment] DEFAULT_CONFIG = """[runtime] timezone = "Asia/Shanghai" log_dir = "logs" output_format = "tui" [binance] spot_base_url = "https://api.binance.com" recv_window = 5000 [market] default_quote = "USDT" universe_allowlist = [] universe_denylist = [] [trading] spot_enabled = true dry_run_default = false dust_usdt_threshold = 10.0 [signal] lookback_interval = "1h" trend = 1.0 momentum = 1.0 breakout = 0.8 volume = 0.7 volatility_penalty = 0.5 [opportunity] min_quote_volume = 1000000.0 top_n = 10 scan_limit = 50 ignore_dust = true entry_threshold = 1.5 watch_threshold = 0.6 overlap_penalty = 0.6 [portfolio] add_threshold = 1.5 hold_threshold = 0.6 trim_threshold = 0.2 exit_threshold = -0.2 max_position_weight = 0.6 """ DEFAULT_ENV = "BINANCE_API_KEY=\nBINANCE_API_SECRET=\n" def _permission_denied_message(paths: RuntimePaths, exc: PermissionError) -> RuntimeError: return RuntimeError( "Unable to initialize CoinHunter runtime files because the target directory is not writable: " f"{paths.root}. Set COINHUNTER_HOME to a writable directory or rerun with permissions that can write there. " f"Original error: {exc}" ) def ensure_init_files(paths: RuntimePaths | None = None, *, force: bool = False) -> dict[str, Any]: paths = paths or get_runtime_paths() try: ensure_runtime_dirs(paths) except PermissionError as exc: raise _permission_denied_message(paths, exc) from exc created: list[str] = [] updated: list[str] = [] for path, content in ((paths.config_file, DEFAULT_CONFIG), (paths.env_file, DEFAULT_ENV)): if force or not path.exists(): try: path.write_text(content, encoding="utf-8") except PermissionError as exc: raise _permission_denied_message(paths, exc) from exc (updated if force and path.exists() else created).append(str(path)) return { "root": str(paths.root), "config_file": str(paths.config_file), "env_file": str(paths.env_file), "logs_dir": str(paths.logs_dir), "created_or_updated": created + updated, "force": force, } def load_config(paths: RuntimePaths | None = None) -> dict[str, Any]: paths = paths or get_runtime_paths() if not paths.config_file.exists(): raise RuntimeError(f"Missing config file at {paths.config_file}. Run `coinhunter init` first.") return tomllib.loads(paths.config_file.read_text(encoding="utf-8")) # type: ignore[no-any-return] def load_env_file(paths: RuntimePaths | None = None) -> dict[str, str]: paths = paths or get_runtime_paths() loaded: dict[str, str] = {} if not paths.env_file.exists(): return loaded for raw_line in paths.env_file.read_text(encoding="utf-8").splitlines(): line = raw_line.strip() if not line or line.startswith("#") or "=" not in line: continue key, value = line.split("=", 1) key = key.strip() value = value.strip() loaded[key] = value os.environ[key] = value return loaded def get_binance_credentials(paths: RuntimePaths | None = None) -> dict[str, str]: load_env_file(paths) api_key = os.getenv("BINANCE_API_KEY", "").strip() api_secret = os.getenv("BINANCE_API_SECRET", "").strip() if not api_key or not api_secret: runtime_paths = paths or get_runtime_paths() raise RuntimeError( "Missing BINANCE_API_KEY or BINANCE_API_SECRET. " f"Populate {runtime_paths.env_file} or export them in the environment." ) return {"api_key": api_key, "api_secret": api_secret} def resolve_log_dir(config: dict[str, Any], paths: RuntimePaths | None = None) -> Path: paths = paths or get_runtime_paths() raw = config.get("runtime", {}).get("log_dir", "logs") value = Path(raw).expanduser() return value if value.is_absolute() else paths.root / value def get_config_value(config: dict[str, Any], key_path: str) -> Any: keys = key_path.split(".") node = config for key in keys: if not isinstance(node, dict) or key not in node: return None node = node[key] return node def set_config_value(config_file: Path, key_path: str, value: Any) -> None: if tomli_w is None: raise RuntimeError("tomli-w is not installed. Run `pip install tomli-w`.") if not config_file.exists(): raise RuntimeError(f"Config file not found: {config_file}") config = tomllib.loads(config_file.read_text(encoding="utf-8")) keys = key_path.split(".") node = config for key in keys[:-1]: if key not in node: node[key] = {} node = node[key] # Coerce type from existing value when possible existing = node.get(keys[-1]) if isinstance(existing, bool) and isinstance(value, str): value = value.lower() in ("true", "1", "yes", "on") elif isinstance(existing, (int, float)) and isinstance(value, str): try: value = type(existing)(value) except (ValueError, TypeError) as exc: raise RuntimeError( f"Cannot set {key_path} to {value!r}: expected {type(existing).__name__}, got {value!r}" ) from exc elif isinstance(existing, list) and isinstance(value, str): value = [item.strip() for item in value.split(",") if item.strip()] node[keys[-1]] = value config_file.write_text(tomli_w.dumps(config), encoding="utf-8") def get_env_value(paths: RuntimePaths | None = None, key: str = "") -> str: paths = paths or get_runtime_paths() if not paths.env_file.exists(): return "" env_data = load_env_file(paths) return env_data.get(key, "") def set_env_value(paths: RuntimePaths | None = None, key: str = "", value: str = "") -> None: paths = paths or get_runtime_paths() if not paths.env_file.exists(): raise RuntimeError(f"Env file not found: {paths.env_file}. Run `coin init` first.") lines = paths.env_file.read_text(encoding="utf-8").splitlines() found = False for i, line in enumerate(lines): stripped = line.strip() if stripped.startswith(f"{key}=") or stripped.startswith(f"{key} ="): lines[i] = f"{key}={value}" found = True break if not found: lines.append(f"{key}={value}") paths.env_file.write_text("\n".join(lines) + "\n", encoding="utf-8") os.environ[key] = value