Compare commits

11 Commits

Author SHA1 Message Date
69f447f538 chore: release v3.0.0
- Bump version to 3.0.0 in pyproject.toml
- Update README with What's New section and new command examples
  (--window for tickers, --dry-run for catlog)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 17:07:49 +08:00
1da08415f1 feat: split portfolio and opportunity decision models 2026-04-20 16:13:57 +08:00
4312b16288 feat: configurable ticker window for market stats (1h, 4h, 1d)
- Replace hardcoded ticker_24h with ticker_stats supporting configurable window
- Add -w/--window flag to `market tickers` (choices: 1h, 4h, 1d, default 1d)
- Update TUI title and JSON output to include window field
- Keep opportunity/pf service on 1d default
- Sync tests and doc comments

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 11:11:11 +08:00
cf26a3dd3a feat: split audit logs into live/dryrun subdirs, add catlog --dry-run, list all kline intervals
- Write live trades to logs/live/ and dry-run trades to logs/dryrun/
- Add -d/--dry-run flag to catlog to read dry-run audit logs
- List all 16 Binance kline interval options in --help and docs

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 10:27:22 +08:00
e37993c8b5 feat: flatten opportunity commands, add config management, fix completions
- Flatten opportunity into top-level portfolio and opportunity commands
- Add interactive config get/set/key/secret with type coercion
- Rewrite --doc to show TUI vs JSON schema per command
- Unify agent mode output to JSON only
- Make init prompt for API key/secret interactively
- Fix coin tab completion alias binding
- Fix set_config_value reading from wrong path
- Fail loudly on invalid numeric config values

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 08:43:30 +08:00
3855477155 refactor: flatten account command to a single balances view
Remove overview/balances/positions subcommands in favor of one
`account` command that returns all balances with an `is_dust` flag.
Add descriptions to every parser and expose -a/--agent and --doc
on all leaf commands for better help discoverability.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 18:19:19 +08:00
d629c25232 fix: resolve merge conflicts and lint issues
- Merge origin/main changes (flattened buy/sell commands, --doc flag, aliases)
- Fix spinner placement for buy/sell commands
- Fix duplicate alias key 'p' in canonical subcommands
- Remove unused mypy ignore comments in spot_client.py
- Fix nested with statements in tests

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:59:53 +08:00
4602583760 Merge remote-tracking branch 'origin/main' into main 2026-04-17 16:57:40 +08:00
ca0625b199 chore: bump version to 2.1.1
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:51:55 +08:00
a0e01ca56f chore: bump version to 2.1.0
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:44:58 +08:00
f528575aa8 feat: add catlog command, agent flag reorder, and TUI polish
- Add `coinhunter catlog` with limit/offset pagination for audit logs
- Optimize audit log reading with deque to avoid loading all history
- Allow `-a/--agent` flag after subcommands
- Fix upgrade spinner artifact and empty line issues
- Render audit log TUI as timeline with low-saturation event colors
- Convert audit timestamps to local timezone in TUI
- Remove futures-related capabilities
- Add conda environment.yml for development
- Bump version to 2.0.9 and update README

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 16:42:47 +08:00
20 changed files with 1764 additions and 609 deletions


@@ -20,8 +20,10 @@ CoinHunter V2 is a Binance-first crypto trading CLI with a flat, direct architec
- **`src/coinhunter/services/`** — Contains all domain logic: - **`src/coinhunter/services/`** — Contains all domain logic:
- `account_service.py` — balances, positions, overview - `account_service.py` — balances, positions, overview
- `market_service.py` — tickers, klines, scan universe, symbol normalization - `market_service.py` — tickers, klines, scan universe, symbol normalization
- `signal_service.py` — shared market signal scoring used by scan and portfolio analysis
- `portfolio_service.py` — held-position review and add/hold/trim/exit recommendations
- `trade_service.py` — spot and USDT-M futures order execution - `trade_service.py` — spot and USDT-M futures order execution
- `opportunity_service.py` — portfolio recommendations and market scanning - `opportunity_service.py` — market scanning and entry/watch/skip recommendations
- **`src/coinhunter/binance/`** — Thin wrappers around official Binance connectors: - **`src/coinhunter/binance/`** — Thin wrappers around official Binance connectors:
- `spot_client.py` wraps `binance.spot.Spot` - `spot_client.py` wraps `binance.spot.Spot`
- `um_futures_client.py` wraps `binance.um_futures.UMFutures` - `um_futures_client.py` wraps `binance.um_futures.UMFutures`
@@ -34,7 +36,7 @@ CoinHunter V2 is a Binance-first crypto trading CLI with a flat, direct architec
User data lives in `~/.coinhunter/` by default (override with `COINHUNTER_HOME`): User data lives in `~/.coinhunter/` by default (override with `COINHUNTER_HOME`):
- `config.toml` — runtime, binance, trading, and opportunity settings - `config.toml` — runtime, binance, trading, signal, opportunity, and portfolio settings
- `.env` — `BINANCE_API_KEY` and `BINANCE_API_SECRET` - `.env` — `BINANCE_API_KEY` and `BINANCE_API_SECRET`
- `logs/audit_YYYYMMDD.jsonl` — structured audit log - `logs/audit_YYYYMMDD.jsonl` — structured audit log


@@ -19,6 +19,14 @@
--- ---
## What's New in 3.0
- **Split decision models** — portfolio (add/hold/trim/exit) and opportunity (enter/watch/skip) now use independent scoring logic.
- **Configurable ticker windows** — `market tickers` supports `--window 1h`, `4h`, or `1d`.
- **Live / dry-run audit logs** — audit logs are written to separate subdirectories; use `catlog --dry-run` to review simulations.
- **Flattened commands** — `account`, `opportunity`, and `config` are now top-level for fewer keystrokes.
- **Runtime config management** — `config get`, `config set`, and `config key/secret` let you edit settings without touching files manually.
## Install ## Install
For end users, install from PyPI with [pipx](https://pipx.pypa.io/) (recommended) to avoid polluting your system Python: For end users, install from PyPI with [pipx](https://pipx.pypa.io/) (recommended) to avoid polluting your system Python:
@@ -61,6 +69,8 @@ This creates:
If you are using **zsh** or **bash**, `init` will also generate and install shell completion scripts automatically, and update your rc file (`~/.zshrc` or `~/.bashrc`) if needed. If you are using **zsh** or **bash**, `init` will also generate and install shell completion scripts automatically, and update your rc file (`~/.zshrc` or `~/.bashrc`) if needed.
`init` interactively prompts for your Binance API key and secret if they are missing. Use `--no-prompt` to skip this.
`config.toml` stores runtime and strategy settings. `.env` stores: `config.toml` stores runtime and strategy settings. `.env` stores:
```bash ```bash
@@ -68,6 +78,12 @@ BINANCE_API_KEY=
BINANCE_API_SECRET= BINANCE_API_SECRET=
``` ```
Strategy settings are split into three blocks:
- `[signal]` for shared market-signal weights and lookback interval
- `[opportunity]` for scan thresholds, liquidity filters, and top-N output
- `[portfolio]` for add/hold/trim/exit thresholds and max position weight
Override the default home directory with `COINHUNTER_HOME`. Override the default home directory with `COINHUNTER_HOME`.
## Commands ## Commands
@@ -85,16 +101,14 @@ coin market klines --doc
```bash ```bash
# Account (aliases: a, acc) # Account (aliases: a, acc)
coinhunter account overview coinhunter account
coinhunter account overview --agent coinhunter account --agent
coin a ov coin a
coin acc bal
coin a pos
# Market (aliases: m) # Market (aliases: m)
coinhunter market tickers BTCUSDT ETH/USDT sol-usdt coinhunter market tickers BTCUSDT ETH/USDT sol-usdt --window 1h
coinhunter market klines BTCUSDT ETHUSDT --interval 1h --limit 50 coinhunter market klines BTCUSDT ETHUSDT --interval 1h --limit 50
coin m tk BTCUSDT ETHUSDT coin m tk BTCUSDT ETHUSDT -w 1d
coin m k BTCUSDT -i 1h -l 50 coin m k BTCUSDT -i 1h -l 50
# Trade (buy / sell are now top-level commands) # Trade (buy / sell are now top-level commands)
@@ -103,12 +117,34 @@ coinhunter sell BTCUSDT --qty 0.01 --type limit --price 90000
coin b BTCUSDT -Q 100 -d coin b BTCUSDT -Q 100 -d
coin s BTCUSDT -q 0.01 -t limit -p 90000 coin s BTCUSDT -q 0.01 -t limit -p 90000
# Opportunities (aliases: opp, o) # Portfolio (aliases: pf, p)
coinhunter opportunity portfolio coinhunter portfolio
coinhunter opportunity scan coinhunter portfolio --agent
coinhunter opportunity scan --symbols BTCUSDT ETHUSDT SOLUSDT coin pf
coin opp pf
coin o scan -s BTCUSDT ETHUSDT # Opportunity scanning (aliases: o)
coinhunter opportunity
coinhunter opportunity --symbols BTCUSDT ETHUSDT SOLUSDT
coin o -s BTCUSDT ETHUSDT
# Audit log
coinhunter catlog
coinhunter catlog -n 20
coinhunter catlog -n 10 -o 10
coinhunter catlog --dry-run
# Configuration management (aliases: cfg, c)
coinhunter config get # show all config
coinhunter config get binance.recv_window
coinhunter config set opportunity.top_n 20
coinhunter config set signal.lookback_interval 4h
coinhunter config set portfolio.max_position_weight 0.25
coinhunter config set trading.dry_run_default true
coinhunter config set market.universe_allowlist BTCUSDT,ETHUSDT
coinhunter config key YOUR_API_KEY # or omit value to prompt interactively
coinhunter config secret YOUR_SECRET # or omit value to prompt interactively
coin c get opportunity.top_n
coin c set trading.dry_run_default false
# Self-upgrade # Self-upgrade
coinhunter upgrade coinhunter upgrade
@@ -129,7 +165,7 @@ CoinHunter V2 uses a flat, direct architecture:
|-------|----------------|-----------| |-------|----------------|-----------|
| **CLI** | Single entrypoint, argument parsing | `cli.py` | | **CLI** | Single entrypoint, argument parsing | `cli.py` |
| **Binance** | Thin API wrappers with unified error handling | `binance/spot_client.py` | | **Binance** | Thin API wrappers with unified error handling | `binance/spot_client.py` |
| **Services** | Domain logic | `services/account_service.py`, `services/market_service.py`, `services/trade_service.py`, `services/opportunity_service.py` | | **Services** | Domain logic | `services/account_service.py`, `services/market_service.py`, `services/signal_service.py`, `services/opportunity_service.py`, `services/portfolio_service.py`, `services/trade_service.py` |
| **Config** | TOML config, `.env` secrets, path resolution | `config.py` | | **Config** | TOML config, `.env` secrets, path resolution | `config.py` |
| **Runtime** | Paths, TUI/JSON/compact output | `runtime.py` | | **Runtime** | Paths, TUI/JSON/compact output | `runtime.py` |
| **Audit** | Structured JSONL logging | `audit.py` | | **Audit** | Structured JSONL logging | `audit.py` |
@@ -150,6 +186,8 @@ Events include:
- `opportunity_portfolio_generated` - `opportunity_portfolio_generated`
- `opportunity_scan_generated` - `opportunity_scan_generated`
Use `coinhunter catlog` to read recent entries in the terminal. It aggregates across all days and supports pagination with `-n/--limit` and `-o/--offset`.
## Development ## Development
Clone the repo and install in editable mode: Clone the repo and install in editable mode:
@@ -160,6 +198,13 @@ cd coinhunter-cli
pip install -e ".[dev]" pip install -e ".[dev]"
``` ```
Or use the provided Conda environment:
```bash
conda env create -f environment.yml
conda activate coinhunter
```
Run quality checks: Run quality checks:
```bash ```bash

environment.yml (new file, +9 lines)

@@ -0,0 +1,9 @@
name: coinhunter
channels:
- defaults
- conda-forge
dependencies:
- python>=3.10
- pip
- pip:
- -e ".[dev]"


@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "coinhunter" name = "coinhunter"
version = "2.1.0" version = "3.0.0"
description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution." description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution."
readme = "README.md" readme = "README.md"
license = {text = "MIT"} license = {text = "MIT"}
@@ -13,9 +13,7 @@ dependencies = [
"binance-connector>=3.9.0", "binance-connector>=3.9.0",
"shtab>=1.7.0", "shtab>=1.7.0",
"tomli>=2.0.1; python_version < '3.11'", "tomli>=2.0.1; python_version < '3.11'",
] "tomli-w>=1.0.0",
authors = [
{name = "Tacit Lab", email = "ouyangcarlos@gmail.com"}
] ]
[project.optional-dependencies] [project.optional-dependencies]
@@ -37,13 +35,10 @@ where = ["src"]
[tool.pytest.ini_options] [tool.pytest.ini_options]
testpaths = ["tests"] testpaths = ["tests"]
addopts = "-v"
[tool.ruff]
target-version = "py310"
line-length = 120
[tool.ruff.lint] [tool.ruff.lint]
select = ["E", "F", "I", "UP", "W"] select = ["E", "F", "I", "W", "UP", "B", "C4", "SIM"]
ignore = ["E501"] ignore = ["E501"]
[tool.ruff.lint.pydocstyle] [tool.ruff.lint.pydocstyle]
@@ -52,7 +47,5 @@ convention = "google"
[tool.mypy] [tool.mypy]
python_version = "3.10" python_version = "3.10"
warn_return_any = true warn_return_any = true
warn_unused_configs = true warn_unused_ignores = true
disallow_untyped_defs = true
ignore_missing_imports = true ignore_missing_imports = true
exclude = [".venv", "build"]


@@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import json import json
from collections import deque
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@@ -21,19 +22,57 @@ def _resolve_audit_dir(paths: RuntimePaths) -> Path:
return _audit_dir_cache[key] return _audit_dir_cache[key]
def _audit_path(paths: RuntimePaths | None = None) -> Path: def _audit_path(paths: RuntimePaths | None = None, *, dry_run: bool = False) -> Path:
paths = ensure_runtime_dirs(paths or get_runtime_paths()) paths = ensure_runtime_dirs(paths or get_runtime_paths())
logs_dir = _resolve_audit_dir(paths) logs_dir = _resolve_audit_dir(paths)
logs_dir.mkdir(parents=True, exist_ok=True) subdir = logs_dir / ("dryrun" if dry_run else "live")
return logs_dir / f"audit_{datetime.now(timezone.utc).strftime('%Y%m%d')}.jsonl" subdir.mkdir(parents=True, exist_ok=True)
return subdir / f"audit_{datetime.now(timezone.utc).strftime('%Y%m%d')}.jsonl"
def audit_event(event: str, payload: dict[str, Any], paths: RuntimePaths | None = None) -> dict[str, Any]: def audit_event(
event: str, payload: dict[str, Any], paths: RuntimePaths | None = None, *, dry_run: bool = False
) -> dict[str, Any]:
entry = { entry = {
"timestamp": datetime.now(timezone.utc).isoformat(), "timestamp": datetime.now(timezone.utc).isoformat(),
"event": event, "event": event,
**payload, **payload,
} }
with _audit_path(paths).open("a", encoding="utf-8") as handle: with _audit_path(paths, dry_run=dry_run).open("a", encoding="utf-8") as handle:
handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n") handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n")
return entry return entry
def read_audit_log(
paths: RuntimePaths | None = None, limit: int = 10, offset: int = 0, *, dry_run: bool = False
) -> list[dict[str, Any]]:
paths = ensure_runtime_dirs(paths or get_runtime_paths())
logs_dir = _resolve_audit_dir(paths)
if not logs_dir.exists():
return []
subdir = logs_dir / ("dryrun" if dry_run else "live")
if not subdir.exists():
return []
audit_files = sorted(subdir.glob("audit_*.jsonl"), reverse=True)
needed = offset + limit
chunks: list[list[dict[str, Any]]] = []
total = 0
for audit_file in audit_files:
remaining = needed - total
if remaining <= 0:
break
entries: list[dict[str, Any]] = []
with audit_file.open("r", encoding="utf-8") as handle:
entries = list(deque((json.loads(line) for line in handle if line.strip()), maxlen=remaining))
if entries:
chunks.append(entries)
total += len(entries)
if not chunks:
return []
all_entries: list[dict[str, Any]] = []
for chunk in reversed(chunks):
all_entries.extend(chunk)
start = -(offset + limit) if (offset + limit) <= len(all_entries) else -len(all_entries)
if offset == 0:
return all_entries[start:]
return all_entries[start:-offset]
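The `read_audit_log` change above reads the newest audit files first and keeps only the last `remaining` records per file with a bounded `deque`, so memory stays proportional to `offset + limit` rather than to total history. A minimal, self-contained sketch of that tail-read technique (the `tail_jsonl` helper name is ours, not from the diff):

```python
import json
from collections import deque
from pathlib import Path


def tail_jsonl(path: Path, max_entries: int) -> list[dict]:
    """Return only the last max_entries JSONL records from a file.

    A deque with maxlen discards older items as the generator fills it,
    so the whole file is never held in memory at once.
    """
    with path.open("r", encoding="utf-8") as handle:
        return list(
            deque((json.loads(line) for line in handle if line.strip()), maxlen=max_entries)
        )
```

With per-file tails collected newest-file-first, the final `all_entries[start:-offset]` slice in the diff then carves out the requested page.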


@@ -5,7 +5,10 @@ from __future__ import annotations
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any
from requests.exceptions import RequestException, SSLError from requests.exceptions import (
RequestException,
SSLError,
)
class SpotBinanceClient: class SpotBinanceClient:
@@ -49,14 +52,15 @@ class SpotBinanceClient:
kwargs["symbol"] = symbol kwargs["symbol"] = symbol
return self._call("exchange info", self._client.exchange_info, **kwargs) # type: ignore[no-any-return] return self._call("exchange info", self._client.exchange_info, **kwargs) # type: ignore[no-any-return]
def ticker_24h(self, symbols: list[str] | None = None) -> list[dict[str, Any]]: def ticker_stats(self, symbols: list[str] | None = None, *, window: str = "1d") -> list[dict[str, Any]]:
if not symbols: kwargs: dict[str, Any] = {"windowSize": window}
response = self._call("24h ticker", self._client.ticker_24hr) if symbols:
elif len(symbols) == 1: if len(symbols) == 1:
response = self._call("24h ticker", self._client.ticker_24hr, symbol=symbols[0]) kwargs["symbol"] = symbols[0]
else: else:
response = self._call("24h ticker", self._client.ticker_24hr, symbols=symbols) kwargs["symbols"] = symbols
return response if isinstance(response, list) else [response] # type: ignore[no-any-return] response = self._call("ticker stats", self._client.ticker, **kwargs)
return response if isinstance(response, list) else [response]
def ticker_price(self, symbols: list[str] | None = None) -> list[dict[str, Any]]: def ticker_price(self, symbols: list[str] | None = None) -> list[dict[str, Any]]:
if not symbols: if not symbols:
@@ -65,7 +69,7 @@ class SpotBinanceClient:
response = self._call("ticker price", self._client.ticker_price, symbol=symbols[0]) response = self._call("ticker price", self._client.ticker_price, symbol=symbols[0])
else: else:
response = self._call("ticker price", self._client.ticker_price, symbols=symbols) response = self._call("ticker price", self._client.ticker_price, symbols=symbols)
return response if isinstance(response, list) else [response] # type: ignore[no-any-return] return response if isinstance(response, list) else [response]
def klines(self, symbol: str, interval: str, limit: int) -> list[list[Any]]: def klines(self, symbol: str, interval: str, limit: int) -> list[list[Any]]:
return self._call("klines", self._client.klines, symbol=symbol, interval=interval, limit=limit) # type: ignore[no-any-return] return self._call("klines", self._client.klines, symbol=symbol, interval=interval, limit=limit) # type: ignore[no-any-return]

File diff suppressed because it is too large


@@ -13,6 +13,11 @@ try:
except ModuleNotFoundError: # pragma: no cover except ModuleNotFoundError: # pragma: no cover
import tomli as tomllib import tomli as tomllib
try:
import tomli_w
except ModuleNotFoundError: # pragma: no cover
tomli_w = None # type: ignore[assignment]
DEFAULT_CONFIG = """[runtime] DEFAULT_CONFIG = """[runtime]
timezone = "Asia/Shanghai" timezone = "Asia/Shanghai"
@@ -33,20 +38,29 @@ spot_enabled = true
dry_run_default = false dry_run_default = false
dust_usdt_threshold = 10.0 dust_usdt_threshold = 10.0
[opportunity] [signal]
min_quote_volume = 1000000.0 lookback_interval = "1h"
top_n = 10
scan_limit = 50
ignore_dust = true
lookback_intervals = ["1h", "4h", "1d"]
[opportunity.weights]
trend = 1.0 trend = 1.0
momentum = 1.0 momentum = 1.0
breakout = 0.8 breakout = 0.8
volume = 0.7 volume = 0.7
volatility_penalty = 0.5 volatility_penalty = 0.5
position_concentration_penalty = 0.6
[opportunity]
min_quote_volume = 1000000.0
top_n = 10
scan_limit = 50
ignore_dust = true
entry_threshold = 1.5
watch_threshold = 0.6
overlap_penalty = 0.6
[portfolio]
add_threshold = 1.5
hold_threshold = 0.6
trim_threshold = 0.2
exit_threshold = -0.2
max_position_weight = 0.6
""" """
DEFAULT_ENV = "BINANCE_API_KEY=\nBINANCE_API_SECRET=\n" DEFAULT_ENV = "BINANCE_API_KEY=\nBINANCE_API_SECRET=\n"
@@ -128,3 +142,72 @@ def resolve_log_dir(config: dict[str, Any], paths: RuntimePaths | None = None) -
raw = config.get("runtime", {}).get("log_dir", "logs") raw = config.get("runtime", {}).get("log_dir", "logs")
value = Path(raw).expanduser() value = Path(raw).expanduser()
return value if value.is_absolute() else paths.root / value return value if value.is_absolute() else paths.root / value
def get_config_value(config: dict[str, Any], key_path: str) -> Any:
keys = key_path.split(".")
node = config
for key in keys:
if not isinstance(node, dict) or key not in node:
return None
node = node[key]
return node
def set_config_value(config_file: Path, key_path: str, value: Any) -> None:
if tomli_w is None:
raise RuntimeError("tomli-w is not installed. Run `pip install tomli-w`.")
if not config_file.exists():
raise RuntimeError(f"Config file not found: {config_file}")
config = tomllib.loads(config_file.read_text(encoding="utf-8"))
keys = key_path.split(".")
node = config
for key in keys[:-1]:
if key not in node:
node[key] = {}
node = node[key]
# Coerce type from existing value when possible
existing = node.get(keys[-1])
if isinstance(existing, bool) and isinstance(value, str):
value = value.lower() in ("true", "1", "yes", "on")
elif isinstance(existing, (int, float)) and isinstance(value, str):
try:
value = type(existing)(value)
except (ValueError, TypeError) as exc:
raise RuntimeError(
f"Cannot set {key_path} to {value!r}: expected {type(existing).__name__}, got {value!r}"
) from exc
elif isinstance(existing, list) and isinstance(value, str):
value = [item.strip() for item in value.split(",") if item.strip()]
node[keys[-1]] = value
config_file.write_text(tomli_w.dumps(config), encoding="utf-8")
def get_env_value(paths: RuntimePaths | None = None, key: str = "") -> str:
paths = paths or get_runtime_paths()
if not paths.env_file.exists():
return ""
env_data = load_env_file(paths)
return env_data.get(key, "")
def set_env_value(paths: RuntimePaths | None = None, key: str = "", value: str = "") -> None:
paths = paths or get_runtime_paths()
if not paths.env_file.exists():
raise RuntimeError(f"Env file not found: {paths.env_file}. Run `coin init` first.")
lines = paths.env_file.read_text(encoding="utf-8").splitlines()
found = False
for i, line in enumerate(lines):
stripped = line.strip()
if stripped.startswith(f"{key}=") or stripped.startswith(f"{key} ="):
lines[i] = f"{key}={value}"
found = True
break
if not found:
lines.append(f"{key}={value}")
paths.env_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
os.environ[key] = value
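The type-coercion branch in `set_config_value` above uses the existing TOML value's type to interpret the incoming CLI string. A self-contained sketch of that rule (the `coerce_like` helper name is ours); note that `bool` must be checked before `int`/`float` because `bool` is a subclass of `int` in Python:

```python
from typing import Any


def coerce_like(existing: Any, value: str) -> Any:
    """Coerce a CLI string to the type of the existing config value."""
    if isinstance(existing, bool):  # must precede the int/float check
        return value.lower() in ("true", "1", "yes", "on")
    if isinstance(existing, (int, float)):
        # Raises ValueError on bad input, which the caller surfaces loudly
        return type(existing)(value)
    if isinstance(existing, list):
        return [item.strip() for item in value.split(",") if item.strip()]
    return value
```

This is why `config set trading.dry_run_default true` stores a real boolean and `config set opportunity.top_n 20` stores an integer, while `config set signal.lookback_interval 4h` stays a plain string.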


@@ -126,6 +126,24 @@ def _fmt_number(value: Any) -> str:
return str(value) return str(value)
def _fmt_local_ts(ts: str) -> str:
try:
dt = datetime.fromisoformat(ts.replace("Z", "+00:00"))
return dt.astimezone().strftime("%Y-%m-%d %H:%M:%S")
except Exception:
return ts
def _event_color(event: str) -> str:
if "failed" in event or "error" in event:
return f"{_DIM}{_RED}"
if event.startswith("trade"):
return f"{_DIM}{_GREEN}"
if event.startswith("opportunity"):
return f"{_DIM}{_YELLOW}"
return _DIM
def _is_large_dataset(payload: Any, threshold: int = 8) -> bool: def _is_large_dataset(payload: Any, threshold: int = 8) -> bool:
if isinstance(payload, dict): if isinstance(payload, dict):
for value in payload.values(): for value in payload.values():
@@ -195,39 +213,27 @@ def _render_tui(payload: Any) -> None:
print(str(payload)) print(str(payload))
return return
if "overview" in payload:
overview = payload.get("overview", {})
print(f"\n{_BOLD}{_CYAN} ACCOUNT OVERVIEW {_RESET}")
print(f" Total Equity: {_GREEN}{_fmt_number(overview.get('total_equity_usdt', 0))} USDT{_RESET}")
print(f" Spot Assets: {_fmt_number(overview.get('spot_asset_count', 0))}")
print(f" Positions: {_fmt_number(overview.get('spot_position_count', 0))}")
if payload.get("balances"):
print()
_render_tui({"balances": payload["balances"]})
if payload.get("positions"):
print()
_render_tui({"positions": payload["positions"]})
return
if "balances" in payload: if "balances" in payload:
rows = payload["balances"] rows = payload["balances"]
table_rows: list[list[str]] = [] table_rows: list[list[str]] = []
for r in rows: for r in rows:
is_dust = r.get("is_dust", False)
dust_label = f"{_DIM}dust{_RESET}" if is_dust else ""
table_rows.append( table_rows.append(
[ [
r.get("market_type", ""),
r.get("asset", ""), r.get("asset", ""),
_fmt_number(r.get("free", 0)), _fmt_number(r.get("free", 0)),
_fmt_number(r.get("locked", 0)), _fmt_number(r.get("locked", 0)),
_fmt_number(r.get("total", 0)), _fmt_number(r.get("total", 0)),
_fmt_number(r.get("notional_usdt", 0)), _fmt_number(r.get("notional_usdt", 0)),
dust_label,
] ]
) )
_print_box_table( _print_box_table(
"BALANCES", "BALANCES",
["Market", "Asset", "Free", "Locked", "Total", "Notional (USDT)"], ["Asset", "Free", "Locked", "Total", "Notional (USDT)", ""],
table_rows, table_rows,
aligns=["left", "left", "right", "right", "right", "right"], aligns=["left", "right", "right", "right", "right", "left"],
) )
return return
@@ -272,7 +278,7 @@ def _render_tui(payload: Any) -> None:
] ]
) )
_print_box_table( _print_box_table(
"24H TICKERS", f"TICKERS window={payload.get('window', '1d')}",
["Symbol", "Last Price", "Change %", "Quote Volume"], ["Symbol", "Last Price", "Change %", "Quote Volume"],
table_rows, table_rows,
aligns=["left", "right", "right", "right"], aligns=["left", "right", "right", "right"],
@@ -281,7 +287,9 @@ def _render_tui(payload: Any) -> None:
if "klines" in payload: if "klines" in payload:
rows = payload["klines"] rows = payload["klines"]
print(f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}") print(
f"\n{_BOLD}{_CYAN} KLINES {_RESET} interval={payload.get('interval')} limit={payload.get('limit')} count={len(rows)}"
)
display_rows = rows[:10] display_rows = rows[:10]
table_rows = [] table_rows = []
for r in display_rows: for r in display_rows:
@@ -325,8 +333,18 @@ def _render_tui(payload: Any) -> None:
for i, r in enumerate(rows, 1): for i, r in enumerate(rows, 1):
score = r.get("score", 0) score = r.get("score", 0)
action = r.get("action", "") action = r.get("action", "")
action_color = _GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN action_color = (
print(f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}") _GREEN
if action in {"add", "enter"}
else _YELLOW
if action in {"hold", "watch", "review"}
else _RED
if action in {"exit", "skip", "trim"}
else _CYAN
)
print(
f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}"
)
for reason in r.get("reasons", []): for reason in r.get("reasons", []):
print(f" · {reason}") print(f" · {reason}")
metrics = r.get("metrics", {}) metrics = r.get("metrics", {})
@@ -340,9 +358,9 @@ def _render_tui(payload: Any) -> None:
stdout = payload.get("stdout", "") stdout = payload.get("stdout", "")
stderr = payload.get("stderr", "") stderr = payload.get("stderr", "")
if rc == 0: if rc == 0:
print(f"\n{_GREEN}{_RESET} Update completed") print(f"{_GREEN}{_RESET} Update completed")
else: else:
print(f"\n{_RED}{_RESET} Update failed (exit code {rc})") print(f"{_RED}{_RESET} Update failed (exit code {rc})")
if stdout: if stdout:
for line in stdout.strip().splitlines(): for line in stdout.strip().splitlines():
print(f" {line}") print(f" {line}")
@@ -352,6 +370,29 @@ def _render_tui(payload: Any) -> None:
print(f" {line}") print(f" {line}")
return return
if "entries" in payload:
rows = payload["entries"]
print(f"\n{_BOLD}{_CYAN} AUDIT LOG {_RESET}")
if not rows:
print(" (no audit entries)")
return
for r in rows:
ts = _fmt_local_ts(r.get("timestamp", ""))
event = r.get("event", "")
detail_parts: list[str] = []
for key in ("symbol", "side", "qty", "quote_amount", "order_type", "status", "dry_run", "error"):
val = r.get(key)
if val is not None:
detail_parts.append(f"{key}={val}")
if not detail_parts:
for key, val in r.items():
if key not in ("timestamp", "event") and not isinstance(val, (dict, list)):
detail_parts.append(f"{key}={val}")
print(f"\n {_DIM}{ts}{_RESET} {_event_color(event)}{event}{_RESET}")
if detail_parts:
print(f" {' '.join(detail_parts)}")
return
if "created_or_updated" in payload: if "created_or_updated" in payload:
print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}") print(f"\n{_BOLD}{_CYAN} INITIALIZED {_RESET}")
print(f" Root: {payload.get('root', '')}") print(f" Root: {payload.get('root', '')}")
@@ -391,10 +432,7 @@ def _render_tui(payload: Any) -> None:
def print_output(payload: Any, *, agent: bool = False) -> None: def print_output(payload: Any, *, agent: bool = False) -> None:
if agent: if agent:
if _is_large_dataset(payload): print_json(payload)
_print_compact(payload)
else:
print_json(payload)
else: else:
_render_tui(payload) _render_tui(payload)
@@ -474,6 +512,13 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
return {"shell": None, "installed": False, "reason": "unable to detect shell from $SHELL"} return {"shell": None, "installed": False, "reason": "unable to detect shell from $SHELL"}
script = shtab.complete(parser, shell=shell, preamble="") script = shtab.complete(parser, shell=shell, preamble="")
# Also register completion for the "coinhunter" alias
prog = parser.prog.replace("-", "_")
func = f"_shtab_{prog}"
if shell == "bash":
script += f"\ncomplete -o filenames -F {func} coinhunter\n"
elif shell == "zsh":
script += f"\ncompdef {func} coinhunter\n"
    installed_path: Path | None = None
    hint: str | None = None
@@ -485,7 +530,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
        rc_path = _zshrc_path()
        fpath_line = "fpath+=(~/.zsh/completions)"
        if not _rc_contains(rc_path, fpath_line):
-           rc_path.write_text(fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n", encoding="utf-8")
+           rc_path.write_text(
+               fpath_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else fpath_line + "\n",
+               encoding="utf-8",
+           )
            hint = "Added fpath+=(~/.zsh/completions) to ~/.zshrc; restart your terminal or run 'compinit'"
        else:
            hint = "Run 'compinit' or restart your terminal to activate completions"
@@ -497,7 +545,10 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
        rc_path = _bashrc_path()
        source_line = '[[ -r "~/.local/share/bash-completion/completions/coinhunter" ]] && . "~/.local/share/bash-completion/completions/coinhunter"'
        if not _rc_contains(rc_path, source_line):
-           rc_path.write_text(source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n", encoding="utf-8")
+           rc_path.write_text(
+               source_line + "\n" + rc_path.read_text(encoding="utf-8") if rc_path.exists() else source_line + "\n",
+               encoding="utf-8",
+           )
            hint = "Added bash completion source line to ~/.bashrc; restart your terminal"
        else:
            hint = "Restart your terminal or source ~/.bashrc to activate completions"

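The rc-file update above is a prepend-if-absent pattern: the completion line is written to the top of the rc file only when `_rc_contains` does not find it, which keeps repeated `init` runs idempotent. A standalone sketch of that behavior (the helper name and the temporary path are illustrative, not from the codebase):

```python
import tempfile
from pathlib import Path

def prepend_line_once(rc_path: Path, line: str) -> bool:
    """Prepend `line` unless the rc file already contains it; return True if written."""
    existing = rc_path.read_text(encoding="utf-8") if rc_path.exists() else ""
    if line in existing.splitlines():
        return False
    rc_path.write_text(line + "\n" + existing, encoding="utf-8")
    return True

with tempfile.TemporaryDirectory() as tmp:
    rc = Path(tmp) / ".zshrc"
    first = prepend_line_once(rc, "fpath+=(~/.zsh/completions)")   # file absent: writes
    second = prepend_line_once(rc, "fpath+=(~/.zsh/completions)")  # already present: no-op
    content = rc.read_text(encoding="utf-8")
print(first, second)  # True False
```

Running it twice writes the line exactly once, matching the hint text ("Added ... to ~/.zshrc" versus "Run 'compinit' ...") in the diff above.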
View File

@@ -13,6 +13,7 @@ class AssetBalance:
    locked: float
    total: float
    notional_usdt: float
+   is_dust: bool
@dataclass
@@ -59,6 +60,7 @@ def get_balances(
    spot_client: Any,
) -> dict[str, Any]:
    quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
+   dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
    rows: list[dict[str, Any]] = []
    balances, _, price_map = _spot_account_data(spot_client, quote)
    for item in balances:
@@ -68,6 +70,7 @@ def get_balances(
        if total <= 0:
            continue
        asset = item["asset"]
+       notional = total * price_map.get(asset, 0.0)
        rows.append(
            asdict(
                AssetBalance(
@@ -75,7 +78,8 @@ def get_balances(
                    free=free,
                    locked=locked,
                    total=total,
-                   notional_usdt=total * price_map.get(asset, 0.0),
+                   notional_usdt=notional,
+                   is_dust=notional < dust,
                )
            )
        )
@@ -113,60 +117,3 @@ def get_positions(
            )
        )
    return {"positions": rows}
-def get_overview(
-    config: dict[str, Any],
-    *,
-    spot_client: Any,
-) -> dict[str, Any]:
-    quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
-    dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
-    balances: list[dict[str, Any]] = []
-    positions: list[dict[str, Any]] = []
-    spot_balances, _, price_map = _spot_account_data(spot_client, quote)
-    for item in spot_balances:
-        free = float(item.get("free", 0.0))
-        locked = float(item.get("locked", 0.0))
-        total = free + locked
-        if total <= 0:
-            continue
-        asset = item["asset"]
-        balances.append(
-            asdict(
-                AssetBalance(
-                    asset=asset,
-                    free=free,
-                    locked=locked,
-                    total=total,
-                    notional_usdt=total * price_map.get(asset, 0.0),
-                )
-            )
-        )
-        mark_price = price_map.get(asset, 1.0 if asset == quote else 0.0)
-        notional = total * mark_price
-        if notional >= dust:
-            positions.append(
-                asdict(
-                    PositionView(
-                        symbol=quote if asset == quote else f"{asset}{quote}",
-                        quantity=total,
-                        entry_price=None,
-                        mark_price=mark_price,
-                        notional_usdt=notional,
-                        side="LONG",
-                    )
-                )
-            )
-    spot_equity = sum(item["notional_usdt"] for item in balances)
-    overview = asdict(
-        AccountOverview(
-            total_equity_usdt=spot_equity,
-            spot_equity_usdt=spot_equity,
-            spot_asset_count=len(balances),
-            spot_position_count=len(positions),
-        )
-    )
-    return {"overview": overview, "balances": balances, "positions": positions}

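The `is_dust` flag added above is a straight comparison of a balance's quote-notional value against `trading.dust_usdt_threshold`. A minimal sketch of the same computation, with made-up balances and prices:

```python
# Illustrative config, prices, and balances (not real data).
config = {"trading": {"dust_usdt_threshold": 10.0}}
price_map = {"BTC": 60000.0, "DOGE": 0.1}
balances = [
    {"asset": "BTC", "free": 0.01, "locked": 0.0},
    {"asset": "DOGE", "free": 50.0, "locked": 0.0},
]

dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
rows = []
for item in balances:
    total = float(item["free"]) + float(item["locked"])
    # Notional value in the quote currency; missing prices count as 0.
    notional = total * price_map.get(item["asset"], 0.0)
    rows.append({"asset": item["asset"], "notional_usdt": notional, "is_dust": notional < dust})
print(rows)
```

With a 10 USDT threshold, the 600 USDT BTC balance is kept and the 5 USDT DOGE balance is flagged as dust, mirroring the updated test below.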
View File

@@ -48,21 +48,23 @@ class KlineView:
    quote_volume: float
-def get_tickers(config: dict[str, Any], symbols: list[str], *, spot_client: Any) -> dict[str, Any]:
+def get_tickers(config: dict[str, Any], symbols: list[str], *, spot_client: Any, window: str = "1d") -> dict[str, Any]:
    normalized = normalize_symbols(symbols)
    rows = []
-   for ticker in spot_client.ticker_24h(normalized):
+   for ticker in spot_client.ticker_stats(normalized, window=window):
        rows.append(
            asdict(
                TickerView(
                    symbol=normalize_symbol(ticker["symbol"]),
                    last_price=float(ticker.get("lastPrice") or ticker.get("last_price") or 0.0),
-                   price_change_pct=float(ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0),
+                   price_change_pct=float(
+                       ticker.get("priceChangePercent") or ticker.get("price_change_percent") or 0.0
+                   ),
                    quote_volume=float(ticker.get("quoteVolume") or ticker.get("quote_volume") or 0.0),
                )
            )
        )
-   return {"tickers": rows}
+   return {"tickers": rows, "window": window}
def get_klines(
@@ -101,6 +103,7 @@ def get_scan_universe(
    *,
    spot_client: Any,
    symbols: list[str] | None = None,
+   window: str = "1d",
) -> list[dict[str, Any]]:
    market_config = config.get("market", {})
    opportunity_config = config.get("opportunity", {})
@@ -114,7 +117,7 @@ def get_scan_universe(
    status_map = {normalize_symbol(item["symbol"]): item.get("status", "") for item in exchange_info.get("symbols", [])}
    rows: list[dict[str, Any]] = []
-   for ticker in spot_client.ticker_24h(list(requested) if requested else None):
+   for ticker in spot_client.ticker_stats(list(requested) if requested else None, window=window):
        symbol = normalize_symbol(ticker["symbol"])
        if not symbol.endswith(quote):
            continue

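The new `window` parameter is threaded from `get_tickers` down into the client call and echoed back in the payload, so TUI and JSON output can show which window the stats cover. A rough sketch of that flow with a stub client modeled on the tests' `FakeSpotClient` (the stub and the trimmed `get_tickers_sketch` helper are illustrative, not the real implementations):

```python
class StubClient:
    """Records the window it was asked for and returns one canned ticker row."""
    def ticker_stats(self, symbols=None, *, window="1d"):
        self.last_window = window
        return [{"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "4.5", "quoteVolume": "10000000"}]

def get_tickers_sketch(client, symbols, window="1d"):
    # Simplified version of get_tickers: parse rows, echo the window back.
    rows = [
        {
            "symbol": t["symbol"],
            "last_price": float(t.get("lastPrice") or 0.0),
            "price_change_pct": float(t.get("priceChangePercent") or 0.0),
        }
        for t in client.ticker_stats(symbols, window=window)
    ]
    return {"tickers": rows, "window": window}

client = StubClient()
payload = get_tickers_sketch(client, ["BTCUSDT"], window="4h")
print(payload["window"], client.last_window)  # 4h 4h
```

The same `-w/--window` value a user passes on the CLI ends up both in the upstream request and in the response payload, which is what the changed `return {"tickers": rows, "window": window}` line above provides.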
View File

@@ -1,14 +1,14 @@
-"""Opportunity analysis services."""
+"""Opportunity scanning services."""
from __future__ import annotations
from dataclasses import asdict, dataclass
-from statistics import mean
from typing import Any
from ..audit import audit_event
from .account_service import get_positions
from .market_service import base_asset, get_scan_universe, normalize_symbol
+from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
@dataclass
@@ -20,130 +20,25 @@ class OpportunityRecommendation:
    metrics: dict[str, float]
-def _safe_pct(new: float, old: float) -> float:
-    if old == 0:
-        return 0.0
-    return (new - old) / old
-def _score_candidate(closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float) -> tuple[float, dict[str, float]]:
-    if len(closes) < 2 or not volumes:
-        return 0.0, {
-            "trend": 0.0,
-            "momentum": 0.0,
-            "breakout": 0.0,
-            "volume_confirmation": 1.0,
-            "volatility": 0.0,
-            "concentration": round(concentration, 4),
-        }
-    current = closes[-1]
-    sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
-    sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
-    trend = 1.0 if current >= sma_short >= sma_long else -1.0 if current < sma_short < sma_long else 0.0
-    momentum = (
-        _safe_pct(closes[-1], closes[-2]) * 0.5
-        + (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0)
-        + float(ticker.get("price_change_pct", 0.0)) / 100.0 * 0.2
-    )
-    recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes)
-    breakout = 1.0 - max((recent_high - current) / recent_high, 0.0)
-    avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
-    volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
-    volume_score = min(max(volume_confirmation - 1.0, -1.0), 2.0)
-    volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0
-    score = (
-        weights.get("trend", 1.0) * trend
-        + weights.get("momentum", 1.0) * momentum
-        + weights.get("breakout", 0.8) * breakout
-        + weights.get("volume", 0.7) * volume_score
-        - weights.get("volatility_penalty", 0.5) * volatility
-        - weights.get("position_concentration_penalty", 0.6) * concentration
-    )
-    metrics = {
-        "trend": round(trend, 4),
-        "momentum": round(momentum, 4),
-        "breakout": round(breakout, 4),
-        "volume_confirmation": round(volume_confirmation, 4),
-        "volatility": round(volatility, 4),
-        "concentration": round(concentration, 4),
-    }
-    return score, metrics
-def _action_for(score: float, concentration: float) -> tuple[str, list[str]]:
-    reasons: list[str] = []
-    if concentration >= 0.5 and score < 0.4:
-        reasons.append("position concentration is high")
-        return "trim", reasons
-    if score >= 1.5:
-        reasons.append("trend, momentum, and breakout are aligned")
-        return "add", reasons
-    if score >= 0.6:
-        reasons.append("trend remains constructive")
-        return "hold", reasons
-    if score <= -0.2:
-        reasons.append("momentum and structure have weakened")
-        return "exit", reasons
-    reasons.append("signal is mixed and needs confirmation")
-    return "observe", reasons
-def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str, Any]:
-    quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
-    weights = config.get("opportunity", {}).get("weights", {})
-    positions = get_positions(config, spot_client=spot_client)["positions"]
-    positions = [item for item in positions if item["symbol"] != quote]
-    total_notional = sum(item["notional_usdt"] for item in positions) or 1.0
-    recommendations = []
-    for position in positions:
-        symbol = normalize_symbol(position["symbol"])
-        klines = spot_client.klines(symbol=symbol, interval="1h", limit=24)
-        closes = [float(item[4]) for item in klines]
-        volumes = [float(item[5]) for item in klines]
-        tickers = spot_client.ticker_24h([symbol])
-        ticker = tickers[0] if tickers else {"priceChangePercent": "0"}
-        concentration = position["notional_usdt"] / total_notional
-        score, metrics = _score_candidate(
-            closes,
-            volumes,
-            {
-                "price_change_pct": float(ticker.get("priceChangePercent") or 0.0),
-            },
-            weights,
-            concentration,
-        )
-        action, reasons = _action_for(score, concentration)
-        recommendations.append(
-            asdict(
-                OpportunityRecommendation(
-                    symbol=symbol,
-                    action=action,
-                    score=round(score, 4),
-                    reasons=reasons,
-                    metrics=metrics,
-                )
-            )
-        )
-    payload = {"recommendations": sorted(recommendations, key=lambda item: item["score"], reverse=True)}
-    audit_event(
-        "opportunity_portfolio_generated",
-        {
-            "market_type": "spot",
-            "symbol": None,
-            "side": None,
-            "qty": None,
-            "quote_amount": None,
-            "order_type": None,
-            "dry_run": True,
-            "request_payload": {"mode": "portfolio"},
-            "response_payload": payload,
-            "status": "generated",
-            "error": None,
-        },
-    )
-    return payload
+def _opportunity_thresholds(config: dict[str, Any]) -> dict[str, float]:
+    opportunity_config = config.get("opportunity", {})
+    return {
+        "entry_threshold": float(opportunity_config.get("entry_threshold", 1.5)),
+        "watch_threshold": float(opportunity_config.get("watch_threshold", 0.6)),
+        "overlap_penalty": float(opportunity_config.get("overlap_penalty", 0.6)),
+    }
+def _action_for_opportunity(score: float, thresholds: dict[str, float]) -> tuple[str, list[str]]:
+    reasons: list[str] = []
+    if score >= thresholds["entry_threshold"]:
+        reasons.append("trend, momentum, and breakout are aligned for a fresh entry")
+        return "enter", reasons
+    if score >= thresholds["watch_threshold"]:
+        reasons.append("market structure is constructive but still needs confirmation")
+        return "watch", reasons
+    reasons.append("edge is too weak for a new entry")
+    return "skip", reasons
def scan_opportunities(
@@ -153,29 +48,33 @@ def scan_opportunities(
    symbols: list[str] | None = None,
) -> dict[str, Any]:
    opportunity_config = config.get("opportunity", {})
-   weights = opportunity_config.get("weights", {})
+   signal_weights = get_signal_weights(config)
+   interval = get_signal_interval(config)
+   thresholds = _opportunity_thresholds(config)
    scan_limit = int(opportunity_config.get("scan_limit", 50))
    top_n = int(opportunity_config.get("top_n", 10))
    quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
    held_positions = get_positions(config, spot_client=spot_client)["positions"]
-   concentration_map = {
-       normalize_symbol(item["symbol"]): float(item["notional_usdt"])
-       for item in held_positions
-   }
+   concentration_map = {normalize_symbol(item["symbol"]): float(item["notional_usdt"]) for item in held_positions}
    total_held = sum(concentration_map.values()) or 1.0
    universe = get_scan_universe(config, spot_client=spot_client, symbols=symbols)[:scan_limit]
    recommendations = []
    for ticker in universe:
        symbol = normalize_symbol(ticker["symbol"])
-       klines = spot_client.klines(symbol=symbol, interval="1h", limit=24)
+       klines = spot_client.klines(symbol=symbol, interval=interval, limit=24)
        closes = [float(item[4]) for item in klines]
        volumes = [float(item[5]) for item in klines]
        concentration = concentration_map.get(symbol, 0.0) / total_held
-       score, metrics = _score_candidate(closes, volumes, ticker, weights, concentration)
-       action, reasons = _action_for(score, concentration)
+       signal_score, metrics = score_market_signal(closes, volumes, ticker, signal_weights)
+       score = signal_score - thresholds["overlap_penalty"] * concentration
+       action, reasons = _action_for_opportunity(score, thresholds)
+       metrics["signal_score"] = round(signal_score, 4)
+       metrics["position_weight"] = round(concentration, 4)
        if symbol.endswith(quote):
            reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters")
+       if concentration > 0:
+           reasons.append("symbol is already held, so the opportunity score is discounted for overlap")
        recommendations.append(
            asdict(
                OpportunityRecommendation(

View File

@@ -0,0 +1,109 @@
+"""Portfolio analysis and position management signals."""
+from __future__ import annotations
+from dataclasses import asdict, dataclass
+from typing import Any
+from ..audit import audit_event
+from .account_service import get_positions
+from .market_service import normalize_symbol
+from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
+@dataclass
+class PortfolioRecommendation:
+    symbol: str
+    action: str
+    score: float
+    reasons: list[str]
+    metrics: dict[str, float]
+def _portfolio_thresholds(config: dict[str, Any]) -> dict[str, float]:
+    portfolio_config = config.get("portfolio", {})
+    return {
+        "add_threshold": float(portfolio_config.get("add_threshold", 1.5)),
+        "hold_threshold": float(portfolio_config.get("hold_threshold", 0.6)),
+        "trim_threshold": float(portfolio_config.get("trim_threshold", 0.2)),
+        "exit_threshold": float(portfolio_config.get("exit_threshold", -0.2)),
+        "max_position_weight": float(portfolio_config.get("max_position_weight", 0.6)),
+    }
+def _action_for_position(score: float, concentration: float, thresholds: dict[str, float]) -> tuple[str, list[str]]:
+    reasons: list[str] = []
+    max_weight = thresholds["max_position_weight"]
+    if concentration >= max_weight and score < thresholds["hold_threshold"]:
+        reasons.append("position weight is above the portfolio risk budget")
+        return "trim", reasons
+    if score >= thresholds["add_threshold"] and concentration < max_weight:
+        reasons.append("market signal is strong and position still has room")
+        return "add", reasons
+    if score >= thresholds["hold_threshold"]:
+        reasons.append("market structure remains supportive for holding")
+        return "hold", reasons
+    if score <= thresholds["exit_threshold"]:
+        reasons.append("market signal has weakened enough to justify an exit review")
+        return "exit", reasons
+    if score <= thresholds["trim_threshold"]:
+        reasons.append("edge has faded and the position should be reduced")
+        return "trim", reasons
+    reasons.append("signal is mixed and the position needs review")
+    return "review", reasons
+def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str, Any]:
+    quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
+    signal_weights = get_signal_weights(config)
+    interval = get_signal_interval(config)
+    thresholds = _portfolio_thresholds(config)
+    positions = get_positions(config, spot_client=spot_client)["positions"]
+    positions = [item for item in positions if item["symbol"] != quote]
+    total_notional = sum(item["notional_usdt"] for item in positions) or 1.0
+    recommendations = []
+    for position in positions:
+        symbol = normalize_symbol(position["symbol"])
+        klines = spot_client.klines(symbol=symbol, interval=interval, limit=24)
+        closes = [float(item[4]) for item in klines]
+        volumes = [float(item[5]) for item in klines]
+        tickers = spot_client.ticker_stats([symbol], window="1d")
+        ticker = tickers[0] if tickers else {"priceChangePercent": "0"}
+        concentration = position["notional_usdt"] / total_notional
+        score, metrics = score_market_signal(
+            closes,
+            volumes,
+            {"price_change_pct": float(ticker.get("priceChangePercent") or 0.0)},
+            signal_weights,
+        )
+        action, reasons = _action_for_position(score, concentration, thresholds)
+        metrics["position_weight"] = round(concentration, 4)
+        recommendations.append(
+            asdict(
+                PortfolioRecommendation(
+                    symbol=symbol,
+                    action=action,
+                    score=round(score, 4),
+                    reasons=reasons,
+                    metrics=metrics,
+                )
+            )
+        )
+    payload = {"recommendations": sorted(recommendations, key=lambda item: item["score"], reverse=True)}
+    audit_event(
+        "opportunity_portfolio_generated",
+        {
+            "market_type": "spot",
+            "symbol": None,
+            "side": None,
+            "qty": None,
+            "quote_amount": None,
+            "order_type": None,
+            "dry_run": True,
+            "request_payload": {"mode": "portfolio"},
+            "response_payload": payload,
+            "status": "generated",
+            "error": None,
+        },
+    )
+    return payload

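The portfolio thresholds above form a small decision table over (score, position weight). A self-contained copy of the `_action_for_position` branching (reasons dropped for brevity), exercised against the default threshold values from `_portfolio_thresholds`:

```python
def action_for_position(score, weight, t):
    # Same branch order as _action_for_position above, without the reason strings.
    if weight >= t["max_position_weight"] and score < t["hold_threshold"]:
        return "trim"   # oversized position with a weak signal
    if score >= t["add_threshold"] and weight < t["max_position_weight"]:
        return "add"    # strong signal, room left in the risk budget
    if score >= t["hold_threshold"]:
        return "hold"
    if score <= t["exit_threshold"]:
        return "exit"
    if score <= t["trim_threshold"]:
        return "trim"
    return "review"

defaults = {
    "add_threshold": 1.5,
    "hold_threshold": 0.6,
    "trim_threshold": 0.2,
    "exit_threshold": -0.2,
    "max_position_weight": 0.6,
}
cases = [(1.8, 0.1), (0.8, 0.1), (0.4, 0.1), (0.1, 0.1), (-0.5, 0.1), (0.4, 0.7)]
actions = [action_for_position(s, w, defaults) for s, w in cases]
print(actions)  # ['add', 'hold', 'review', 'trim', 'exit', 'trim']
```

Note the last case: an oversized position (weight 0.7) is trimmed even with a middling score, because the concentration check runs before the score ladder.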
View File

@@ -0,0 +1,78 @@
+"""Shared market signal scoring."""
+from __future__ import annotations
+from statistics import mean
+from typing import Any
+def _safe_pct(new: float, old: float) -> float:
+    if old == 0:
+        return 0.0
+    return (new - old) / old
+def get_signal_weights(config: dict[str, Any]) -> dict[str, float]:
+    signal_config = config.get("signal", {})
+    return {
+        "trend": float(signal_config.get("trend", 1.0)),
+        "momentum": float(signal_config.get("momentum", 1.0)),
+        "breakout": float(signal_config.get("breakout", 0.8)),
+        "volume": float(signal_config.get("volume", 0.7)),
+        "volatility_penalty": float(signal_config.get("volatility_penalty", 0.5)),
+    }
+def get_signal_interval(config: dict[str, Any]) -> str:
+    signal_config = config.get("signal", {})
+    if signal_config.get("lookback_interval"):
+        return str(signal_config["lookback_interval"])
+    return "1h"
+def score_market_signal(
+    closes: list[float],
+    volumes: list[float],
+    ticker: dict[str, Any],
+    weights: dict[str, float],
+) -> tuple[float, dict[str, float]]:
+    if len(closes) < 2 or not volumes:
+        return 0.0, {
+            "trend": 0.0,
+            "momentum": 0.0,
+            "breakout": 0.0,
+            "volume_confirmation": 1.0,
+            "volatility": 0.0,
+        }
+    current = closes[-1]
+    sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
+    sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
+    trend = 1.0 if current >= sma_short >= sma_long else -1.0 if current < sma_short < sma_long else 0.0
+    momentum = (
+        _safe_pct(closes[-1], closes[-2]) * 0.5
+        + (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0)
+        + float(ticker.get("price_change_pct", 0.0)) / 100.0 * 0.2
+    )
+    recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes)
+    breakout = 1.0 - max((recent_high - current) / recent_high, 0.0)
+    avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
+    volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
+    volume_score = min(max(volume_confirmation - 1.0, -1.0), 2.0)
+    volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0
+    score = (
+        weights.get("trend", 1.0) * trend
+        + weights.get("momentum", 1.0) * momentum
+        + weights.get("breakout", 0.8) * breakout
+        + weights.get("volume", 0.7) * volume_score
+        - weights.get("volatility_penalty", 0.5) * volatility
+    )
+    metrics = {
+        "trend": round(trend, 4),
+        "momentum": round(momentum, 4),
+        "breakout": round(breakout, 4),
+        "volume_confirmation": round(volume_confirmation, 4),
+        "volatility": round(volatility, 4),
+    }
+    return score, metrics

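As a sanity check, the scoring above can be run on synthetic data. This is a trimmed, self-contained copy of the `score_market_signal` logic (happy path only, metrics reduced to three keys); the uptrend series and the 1.5x volume spike are made up for illustration:

```python
from statistics import mean

def _safe_pct(new, old):
    return 0.0 if old == 0 else (new - old) / old

def score_market_signal(closes, volumes, ticker, weights):
    # Verbatim scoring logic from signal_service.py above, happy path only.
    current = closes[-1]
    sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
    sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
    trend = 1.0 if current >= sma_short >= sma_long else -1.0 if current < sma_short < sma_long else 0.0
    momentum = (
        _safe_pct(closes[-1], closes[-2]) * 0.5
        + (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0)
        + float(ticker.get("price_change_pct", 0.0)) / 100.0 * 0.2
    )
    recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes)
    breakout = 1.0 - max((recent_high - current) / recent_high, 0.0)
    avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
    volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
    volume_score = min(max(volume_confirmation - 1.0, -1.0), 2.0)
    volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0
    score = (
        weights.get("trend", 1.0) * trend
        + weights.get("momentum", 1.0) * momentum
        + weights.get("breakout", 0.8) * breakout
        + weights.get("volume", 0.7) * volume_score
        - weights.get("volatility_penalty", 0.5) * volatility
    )
    return score, {"trend": trend, "breakout": breakout, "volume_confirmation": volume_confirmation}

closes = [100.0 + i for i in range(24)]  # steady uptrend, new high on the last bar
volumes = [1000.0] * 23 + [1500.0]       # 1.5x volume on the final bar
score, metrics = score_market_signal(closes, volumes, {"price_change_pct": 5.0}, {})
print(round(score, 4), metrics)
```

A clean uptrend at its high with confirming volume scores around 2.1 with the default weights, comfortably above the default `entry_threshold` of 1.5, which is the regime the `enter` action is meant to capture.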
View File

@@ -40,7 +40,9 @@ def _default_dry_run(config: dict[str, Any], dry_run: bool | None) -> bool:
    return bool(config.get("trading", {}).get("dry_run_default", False))
-def _trade_log_payload(intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None) -> dict[str, Any]:
+def _trade_log_payload(
+    intent: TradeIntent, payload: dict[str, Any], *, status: str, error: str | None = None
+) -> dict[str, Any]:
    return {
        "market_type": intent.market_type,
        "symbol": intent.symbol,
@@ -110,7 +112,7 @@ def execute_spot_trade(
        dry_run=is_dry_run,
    )
-   audit_event("trade_submitted", _trade_log_payload(intent, payload, status="submitted"))
+   audit_event("trade_submitted", _trade_log_payload(intent, payload, status="submitted"), dry_run=intent.dry_run)
    if is_dry_run:
        response = {"dry_run": True, "status": "DRY_RUN", "request": payload}
        result = asdict(
@@ -125,13 +127,15 @@ def execute_spot_trade(
                response_payload=response,
            )
        )
-       audit_event("trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response})
+       audit_event(
+           "trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response}, dry_run=intent.dry_run
+       )
        return {"trade": result}
    try:
        response = spot_client.new_order(**payload)
    except Exception as exc:
-       audit_event("trade_failed", _trade_log_payload(intent, payload, status="failed", error=str(exc)))
+       audit_event("trade_failed", _trade_log_payload(intent, payload, status="failed", error=str(exc)), dry_run=intent.dry_run)
        raise RuntimeError(f"Spot order failed: {exc}") from exc
    result = asdict(
@@ -146,5 +150,8 @@ def execute_spot_trade(
            response_payload=response,
        )
    )
-   audit_event("trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response})
+   audit_event(
+       "trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response},
+       dry_run=intent.dry_run,
+   )
    return {"trade": result}

View File

@@ -26,11 +26,32 @@ class FakeSpotClient:
            return list(prices.values())
        return [prices[symbol] for symbol in symbols]
-   def ticker_24h(self, symbols=None):
+   def ticker_stats(self, symbols=None, *, window="1d"):
        rows = [
-           {"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "4.5", "quoteVolume": "10000000", "highPrice": "61000", "lowPrice": "58000"},
-           {"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3.0", "quoteVolume": "8000000", "highPrice": "3050", "lowPrice": "2900"},
-           {"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1.0", "quoteVolume": "200", "highPrice": "0.11", "lowPrice": "0.09"},
+           {
+               "symbol": "BTCUSDT",
+               "lastPrice": "60000",
+               "priceChangePercent": "4.5",
+               "quoteVolume": "10000000",
+               "highPrice": "61000",
+               "lowPrice": "58000",
+           },
+           {
+               "symbol": "ETHUSDT",
+               "lastPrice": "3000",
+               "priceChangePercent": "3.0",
+               "quoteVolume": "8000000",
+               "highPrice": "3050",
+               "lowPrice": "2900",
+           },
+           {
+               "symbol": "DOGEUSDT",
+               "lastPrice": "0.1",
+               "priceChangePercent": "1.0",
+               "quoteVolume": "200",
+               "highPrice": "0.11",
+               "lowPrice": "0.09",
+           },
        ]
        if not symbols:
            return rows
@@ -38,23 +59,29 @@ class FakeSpotClient:
        return [row for row in rows if row["symbol"] in wanted]
    def exchange_info(self):
-       return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "BREAK"}]}
+       return {
+           "symbols": [
+               {"symbol": "BTCUSDT", "status": "TRADING"},
+               {"symbol": "ETHUSDT", "status": "TRADING"},
+               {"symbol": "DOGEUSDT", "status": "BREAK"},
+           ]
+       }
class AccountMarketServicesTestCase(unittest.TestCase):
-   def test_account_overview_and_dust_filter(self):
+   def test_get_balances_with_dust_flag(self):
        config = {
            "market": {"default_quote": "USDT"},
            "trading": {"dust_usdt_threshold": 10.0},
        }
-       payload = account_service.get_overview(
+       payload = account_service.get_balances(
            config,
            spot_client=FakeSpotClient(),
        )
-       self.assertEqual(payload["overview"]["total_equity_usdt"], 720.1)
-       symbols = {item["symbol"] for item in payload["positions"]}
-       self.assertNotIn("DOGEUSDT", symbols)
-       self.assertIn("BTCUSDT", symbols)
+       balances = {item["asset"]: item for item in payload["balances"]}
+       self.assertFalse(balances["USDT"]["is_dust"])
+       self.assertFalse(balances["BTC"]["is_dust"])
+       self.assertTrue(balances["DOGE"]["is_dust"])
    def test_market_tickers_and_scan_universe(self):
        config = {

View File

@@ -17,15 +17,22 @@ class CLITestCase(unittest.TestCase):
        self.assertIn("account", help_text)
        self.assertIn("buy", help_text)
        self.assertIn("sell", help_text)
+       self.assertIn("portfolio", help_text)
        self.assertIn("opportunity", help_text)
        self.assertIn("--doc", help_text)
    def test_init_dispatches(self):
        captured = {}
-       with patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}), patch.object(
-           cli, "install_shell_completion", return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"}
-       ), patch.object(
-           cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
-       ):
+       with (
+           patch.object(cli, "ensure_init_files", return_value={"force": True, "root": "/tmp/ch"}),
+           patch.object(
+               cli,
+               "install_shell_completion",
+               return_value={"shell": "zsh", "installed": True, "path": "/tmp/ch/_coinhunter"},
+           ),
+           patch.object(
+               cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
+           ),
+       ):
            result = cli.main(["init", "--force"])
        self.assertEqual(result, 0)
@@ -73,23 +80,171 @@ class CLITestCase(unittest.TestCase):
        self.assertEqual(result, 0)
        self.assertEqual(captured["payload"]["trade"]["status"], "DRY_RUN")
-   def test_doc_flag_prints_documentation(self):
-       import io
-       from unittest.mock import patch
+   def test_doc_flag_prints_tui_documentation(self):
        stdout = io.StringIO()
        with patch("sys.stdout", stdout):
            result = cli.main(["market", "tickers", "--doc"])
        self.assertEqual(result, 0)
        output = stdout.getvalue()
-       self.assertIn("lastPrice", output)
+       self.assertIn("TUI Output", output)
+       self.assertIn("Last Price", output)
        self.assertIn("BTCUSDT", output)
+   def test_doc_flag_prints_json_documentation(self):
+       stdout = io.StringIO()
+       with patch("sys.stdout", stdout):
+           result = cli.main(["market", "tickers", "--doc", "--agent"])
+       self.assertEqual(result, 0)
+       output = stdout.getvalue()
+       self.assertIn("JSON Output", output)
+       self.assertIn("last_price", output)
+       self.assertIn("BTCUSDT", output)
+   def test_account_dispatches(self):
+       captured = {}
+       with (
+           patch.object(
+               cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "trading": {"dust_usdt_threshold": 10.0}}
+           ),
+           patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
+           patch.object(cli, "SpotBinanceClient"),
+           patch.object(
+               cli.account_service, "get_balances", return_value={"balances": [{"asset": "BTC", "is_dust": False}]}
+           ),
+           patch.object(
+               cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
+           ),
+       ):
+           result = cli.main(["account"])
+       self.assertEqual(result, 0)
+       self.assertEqual(captured["payload"]["balances"][0]["asset"], "BTC")
    def test_upgrade_dispatches(self):
        captured = {}
-       with patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}), patch.object(
-           cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
-       ):
+       with (
+           patch.object(cli, "self_upgrade", return_value={"command": "pipx upgrade coinhunter", "returncode": 0}),
+           patch.object(
+               cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
+           ),
+       ):
            result = cli.main(["upgrade"])
        self.assertEqual(result, 0)
        self.assertEqual(captured["payload"]["returncode"], 0)
+   def test_portfolio_dispatches(self):
+       captured = {}
+       with (
+           patch.object(
+               cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}}
+           ),
+           patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
+           patch.object(cli, "SpotBinanceClient"),
+           patch.object(
+               cli.portfolio_service, "analyze_portfolio", return_value={"recommendations": [{"symbol": "BTCUSDT", "score": 0.75}]}
+           ),
+           patch.object(
+               cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
+           ),
+       ):
+           result = cli.main(["portfolio"])
+       self.assertEqual(result, 0)
+       self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT")
+   def test_opportunity_dispatches(self):
+       captured = {}
+       with (
+           patch.object(
+               cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}}
+           ),
+           patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
+           patch.object(cli, "SpotBinanceClient"),
+           patch.object(
+               cli.opportunity_service,
+               "scan_opportunities",
+               return_value={"recommendations": [{"symbol": "BTCUSDT", "score": 0.82}]},
+           ),
+           patch.object(
+               cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
+           ),
+       ):
+           result = cli.main(["opportunity", "-s", "BTCUSDT", "ETHUSDT"])
+       self.assertEqual(result, 0)
+       self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT")
+   def test_catlog_dispatches(self):
+       captured = {}
+       with (
+           patch.object(
+               cli, "read_audit_log", return_value=[{"timestamp": "2026-04-17T12:00:00Z", "event": "test_event"}]
+           ),
+           patch.object(
+               cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
+           ),
+       ):
+           result = cli.main(["catlog", "-n", "5", "-o", "10"])
+       self.assertEqual(result, 0)
+       self.assertEqual(captured["payload"]["limit"], 5)
+       self.assertEqual(captured["payload"]["offset"], 10)
+       self.assertIn("entries", captured["payload"])
+       self.assertEqual(captured["payload"]["total"], 1)
+   def test_config_get_dispatches(self):
+       captured = {}
+       with (
+           patch.object(cli, "load_config", return_value={"binance": {"recv_window": 5000}}),
+           patch.object(
+               cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
+           ),
+       ):
+           result = cli.main(["config", "get", "binance.recv_window"])
+       self.assertEqual(result, 0)
+       self.assertEqual(captured["payload"]["binance.recv_window"], 5000)
+   def test_config_set_dispatches(self):
+       import tempfile
+       with tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False) as f:
+           f.write('[binance]\nrecv_window = 5000\n')
+           tmp_path = f.name
+       with patch.object(cli, "get_runtime_paths") as mock_paths:
+           mock_paths.return_value.config_file = __import__("pathlib").Path(tmp_path)
+           result = cli.main(["config", "set", "binance.recv_window", "10000"])
+       self.assertEqual(result, 0)
+       # Verify the file was updated
+       content = __import__("pathlib").Path(tmp_path).read_text()
+       self.assertIn("recv_window = 10000", content)
+       __import__("os").unlink(tmp_path)
+   def test_config_key_dispatches(self):
+       import tempfile
+       with tempfile.NamedTemporaryFile(mode="w", suffix=".env", delete=False) as f:
+           f.write("BINANCE_API_KEY=\n")
+           tmp_path = f.name
+       with patch.object(cli, "get_runtime_paths") as mock_paths:
+           mock_paths.return_value.env_file = __import__("pathlib").Path(tmp_path)
+           result = cli.main(["config", "key", "test_key_value"])
+       self.assertEqual(result, 0)
+       content = __import__("pathlib").Path(tmp_path).read_text()
+       self.assertIn("BINANCE_API_KEY=test_key_value", content)
+       __import__("os").unlink(tmp_path)
+   def test_config_secret_dispatches(self):
+       import tempfile
+       with tempfile.NamedTemporaryFile(mode="w", suffix=".env", delete=False) as f:
+           f.write("BINANCE_API_SECRET=\n")
+           tmp_path = f.name
+       with patch.object(cli, "get_runtime_paths") as mock_paths:
+           mock_paths.return_value.env_file = __import__("pathlib").Path(tmp_path)
+           result = cli.main(["config", "secret", "test_secret_value"])
+       self.assertEqual(result, 0)
+       content = __import__("pathlib").Path(tmp_path).read_text()
+       self.assertIn("BINANCE_API_SECRET=test_secret_value", content)
+       __import__("os").unlink(tmp_path)


@@ -8,13 +8,21 @@ import unittest
 from pathlib import Path
 from unittest.mock import patch

-from coinhunter.config import ensure_init_files, get_binance_credentials, load_config, load_env_file
+from coinhunter.config import (
+    ensure_init_files,
+    get_binance_credentials,
+    load_config,
+    load_env_file,
+)
 from coinhunter.runtime import get_runtime_paths


 class ConfigRuntimeTestCase(unittest.TestCase):
     def test_init_files_created_in_coinhunter_home(self):
-        with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False):
+        with (
+            tempfile.TemporaryDirectory() as tmp_dir,
+            patch.dict(os.environ, {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")}, clear=False),
+        ):
             paths = get_runtime_paths()
             payload = ensure_init_files(paths)
             self.assertTrue(paths.config_file.exists())
@@ -23,10 +31,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
         self.assertEqual(payload["root"], str(paths.root))

     def test_load_config_and_env(self):
-        with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(
-            os.environ,
-            {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
-            clear=False,
-        ):
+        with (
+            tempfile.TemporaryDirectory() as tmp_dir,
+            patch.dict(
+                os.environ,
+                {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
+                clear=False,
+            ),
+        ):
             paths = get_runtime_paths()
             ensure_init_files(paths)
@@ -40,10 +51,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
         self.assertEqual(os.environ["BINANCE_API_SECRET"], "def")

     def test_env_file_overrides_existing_environment(self):
-        with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(
-            os.environ,
-            {"COINHUNTER_HOME": str(Path(tmp_dir) / "home"), "BINANCE_API_KEY": "old_key"},
-            clear=False,
-        ):
+        with (
+            tempfile.TemporaryDirectory() as tmp_dir,
+            patch.dict(
+                os.environ,
+                {"COINHUNTER_HOME": str(Path(tmp_dir) / "home"), "BINANCE_API_KEY": "old_key"},
+                clear=False,
+            ),
+        ):
             paths = get_runtime_paths()
             ensure_init_files(paths)
@@ -55,10 +69,13 @@ class ConfigRuntimeTestCase(unittest.TestCase):
         self.assertEqual(os.environ["BINANCE_API_SECRET"], "new_secret")

     def test_missing_credentials_raise(self):
-        with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(
-            os.environ,
-            {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
-            clear=False,
-        ):
+        with (
+            tempfile.TemporaryDirectory() as tmp_dir,
+            patch.dict(
+                os.environ,
+                {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
+                clear=False,
+            ),
+        ):
             os.environ.pop("BINANCE_API_KEY", None)
             os.environ.pop("BINANCE_API_SECRET", None)
@@ -68,12 +85,17 @@ class ConfigRuntimeTestCase(unittest.TestCase):
                 get_binance_credentials(paths)

     def test_permission_error_is_explained(self):
-        with tempfile.TemporaryDirectory() as tmp_dir, patch.dict(
-            os.environ,
-            {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
-            clear=False,
-        ):
+        with (
+            tempfile.TemporaryDirectory() as tmp_dir,
+            patch.dict(
+                os.environ,
+                {"COINHUNTER_HOME": str(Path(tmp_dir) / "home")},
+                clear=False,
+            ),
+        ):
             paths = get_runtime_paths()
-            with patch("coinhunter.config.ensure_runtime_dirs", side_effect=PermissionError("no write access")):
-                with self.assertRaisesRegex(RuntimeError, "Set COINHUNTER_HOME to a writable directory"):
-                    ensure_init_files(paths)
+            with (
+                patch("coinhunter.config.ensure_runtime_dirs", side_effect=PermissionError("no write access")),
+                self.assertRaisesRegex(RuntimeError, "Set COINHUNTER_HOME to a writable directory"),
+            ):
+                ensure_init_files(paths)


@@ -1,11 +1,11 @@
-"""Opportunity service tests."""
+"""Signal, opportunity, and portfolio service tests."""

 from __future__ import annotations

 import unittest
 from unittest.mock import patch

-from coinhunter.services import opportunity_service
+from coinhunter.services import opportunity_service, portfolio_service, signal_service


 class FakeSpotClient:
@@ -27,19 +27,54 @@ class FakeSpotClient:
         }
         return [mapping[symbol] for symbol in symbols]

-    def ticker_24h(self, symbols=None):
+    def ticker_stats(self, symbols=None, *, window="1d"):
         rows = {
-            "BTCUSDT": {"symbol": "BTCUSDT", "lastPrice": "60000", "priceChangePercent": "5", "quoteVolume": "9000000", "highPrice": "60200", "lowPrice": "55000"},
-            "ETHUSDT": {"symbol": "ETHUSDT", "lastPrice": "3000", "priceChangePercent": "3", "quoteVolume": "8000000", "highPrice": "3100", "lowPrice": "2800"},
-            "SOLUSDT": {"symbol": "SOLUSDT", "lastPrice": "150", "priceChangePercent": "8", "quoteVolume": "10000000", "highPrice": "152", "lowPrice": "130"},
-            "DOGEUSDT": {"symbol": "DOGEUSDT", "lastPrice": "0.1", "priceChangePercent": "1", "quoteVolume": "100", "highPrice": "0.11", "lowPrice": "0.09"},
+            "BTCUSDT": {
+                "symbol": "BTCUSDT",
+                "lastPrice": "60000",
+                "priceChangePercent": "5",
+                "quoteVolume": "9000000",
+                "highPrice": "60200",
+                "lowPrice": "55000",
+            },
+            "ETHUSDT": {
+                "symbol": "ETHUSDT",
+                "lastPrice": "3000",
+                "priceChangePercent": "3",
+                "quoteVolume": "8000000",
+                "highPrice": "3100",
+                "lowPrice": "2800",
+            },
+            "SOLUSDT": {
+                "symbol": "SOLUSDT",
+                "lastPrice": "150",
+                "priceChangePercent": "8",
+                "quoteVolume": "10000000",
+                "highPrice": "152",
+                "lowPrice": "130",
+            },
+            "DOGEUSDT": {
+                "symbol": "DOGEUSDT",
+                "lastPrice": "0.1",
+                "priceChangePercent": "1",
+                "quoteVolume": "100",
+                "highPrice": "0.11",
+                "lowPrice": "0.09",
+            },
         }
         if not symbols:
             return list(rows.values())
         return [rows[symbol] for symbol in symbols]

     def exchange_info(self):
-        return {"symbols": [{"symbol": "BTCUSDT", "status": "TRADING"}, {"symbol": "ETHUSDT", "status": "TRADING"}, {"symbol": "SOLUSDT", "status": "TRADING"}, {"symbol": "DOGEUSDT", "status": "TRADING"}]}
+        return {
+            "symbols": [
+                {"symbol": "BTCUSDT", "status": "TRADING"},
+                {"symbol": "ETHUSDT", "status": "TRADING"},
+                {"symbol": "SOLUSDT", "status": "TRADING"},
+                {"symbol": "DOGEUSDT", "status": "TRADING"},
+            ]
+        }

     def klines(self, symbol, interval, limit):
         curves = {
@@ -50,7 +85,18 @@ class FakeSpotClient:
         }[symbol]
         rows = []
         for index, close in enumerate(curves[-limit:]):
-            rows.append([index, close * 0.98, close * 1.01, close * 0.97, close, 100 + index * 10, index + 1, close * (100 + index * 10)])
+            rows.append(
+                [
+                    index,
+                    close * 0.98,
+                    close * 1.01,
+                    close * 0.97,
+                    close,
+                    100 + index * 10,
+                    index + 1,
+                    close * (100 + index * 10),
+                ]
+            )
         return rows
@@ -59,36 +105,51 @@ class OpportunityServiceTestCase(unittest.TestCase):
         self.config = {
             "market": {"default_quote": "USDT", "universe_allowlist": [], "universe_denylist": []},
             "trading": {"dust_usdt_threshold": 10.0},
+            "signal": {
+                "lookback_interval": "1h",
+                "trend": 1.0,
+                "momentum": 1.0,
+                "breakout": 0.8,
+                "volume": 0.7,
+                "volatility_penalty": 0.5,
+            },
             "opportunity": {
                 "scan_limit": 10,
                 "top_n": 5,
                 "min_quote_volume": 1000.0,
-                "weights": {
-                    "trend": 1.0,
-                    "momentum": 1.0,
-                    "breakout": 0.8,
-                    "volume": 0.7,
-                    "volatility_penalty": 0.5,
-                    "position_concentration_penalty": 0.6,
-                },
+                "entry_threshold": 1.5,
+                "watch_threshold": 0.6,
+                "overlap_penalty": 0.6,
+            },
+            "portfolio": {
+                "add_threshold": 1.5,
+                "hold_threshold": 0.6,
+                "trim_threshold": 0.2,
+                "exit_threshold": -0.2,
+                "max_position_weight": 0.6,
             },
         }

     def test_portfolio_analysis_ignores_dust_and_emits_recommendations(self):
         events = []
-        with patch.object(opportunity_service, "audit_event", side_effect=lambda event, payload: events.append(event)):
-            payload = opportunity_service.analyze_portfolio(self.config, spot_client=FakeSpotClient())
+        with patch.object(portfolio_service, "audit_event", side_effect=lambda event, payload, **kwargs: events.append(event)):
+            payload = portfolio_service.analyze_portfolio(self.config, spot_client=FakeSpotClient())
         symbols = [item["symbol"] for item in payload["recommendations"]]
         self.assertNotIn("DOGEUSDT", symbols)
         self.assertEqual(symbols, ["BTCUSDT", "ETHUSDT"])
+        self.assertEqual(payload["recommendations"][0]["action"], "add")
+        self.assertEqual(payload["recommendations"][1]["action"], "hold")
         self.assertEqual(events, ["opportunity_portfolio_generated"])

     def test_scan_is_deterministic(self):
         with patch.object(opportunity_service, "audit_event", return_value=None):
-            payload = opportunity_service.scan_opportunities(self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient())
+            payload = opportunity_service.scan_opportunities(
+                self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()
+            )
         self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"])
+        self.assertEqual([item["action"] for item in payload["recommendations"]], ["enter", "enter"])

-    def test_score_candidate_handles_empty_klines(self):
-        score, metrics = opportunity_service._score_candidate([], [], {"price_change_pct": 1.0}, {}, 0.0)
+    def test_signal_score_handles_empty_klines(self):
+        score, metrics = signal_service.score_market_signal([], [], {"price_change_pct": 1.0}, {})
         self.assertEqual(score, 0.0)
         self.assertEqual(metrics["trend"], 0.0)


@@ -20,7 +20,9 @@ class FakeSpotClient:
 class TradeServiceTestCase(unittest.TestCase):
     def test_spot_market_buy_dry_run_does_not_call_client(self):
         events = []
-        with patch.object(trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload))):
+        with patch.object(
+            trade_service, "audit_event", side_effect=lambda event, payload, **kwargs: events.append((event, payload))
+        ):
             client = FakeSpotClient()
             payload = trade_service.execute_spot_trade(
                 {"trading": {"dry_run_default": False}},
@@ -55,9 +57,11 @@ class TradeServiceTestCase(unittest.TestCase):
         self.assertEqual(client.calls[0]["timeInForce"], "GTC")

     def test_spot_market_buy_requires_quote(self):
-        with patch.object(trade_service, "audit_event", return_value=None):
-            with self.assertRaisesRegex(RuntimeError, "requires --quote"):
-                trade_service.execute_spot_trade(
+        with (
+            patch.object(trade_service, "audit_event", return_value=None),
+            self.assertRaisesRegex(RuntimeError, "requires --quote"),
+        ):
+            trade_service.execute_spot_trade(
                 {"trading": {"dry_run_default": False}},
                 side="buy",
                 symbol="BTCUSDT",
@@ -70,9 +74,11 @@ class TradeServiceTestCase(unittest.TestCase):
             )

     def test_spot_market_buy_rejects_qty(self):
-        with patch.object(trade_service, "audit_event", return_value=None):
-            with self.assertRaisesRegex(RuntimeError, "accepts --quote only"):
-                trade_service.execute_spot_trade(
+        with (
+            patch.object(trade_service, "audit_event", return_value=None),
+            self.assertRaisesRegex(RuntimeError, "accepts --quote only"),
+        ):
+            trade_service.execute_spot_trade(
                 {"trading": {"dry_run_default": False}},
                 side="buy",
                 symbol="BTCUSDT",
@@ -85,9 +91,11 @@ class TradeServiceTestCase(unittest.TestCase):
             )

     def test_spot_market_sell_rejects_quote(self):
-        with patch.object(trade_service, "audit_event", return_value=None):
-            with self.assertRaisesRegex(RuntimeError, "accepts --qty only"):
-                trade_service.execute_spot_trade(
+        with (
+            patch.object(trade_service, "audit_event", return_value=None),
+            self.assertRaisesRegex(RuntimeError, "accepts --qty only"),
+        ):
+            trade_service.execute_spot_trade(
                 {"trading": {"dry_run_default": False}},
                 side="sell",
                 symbol="BTCUSDT",