Compare commits

6 Commits

Author SHA1 Message Date
69f447f538 chore: release v3.0.0
- Bump version to 3.0.0 in pyproject.toml
- Update README with What's New section and new command examples
  (--window for tickers, --dry-run for catlog)

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 17:07:49 +08:00
1da08415f1 feat: split portfolio and opportunity decision models 2026-04-20 16:13:57 +08:00
4312b16288 feat: configurable ticker window for market stats (1h, 4h, 1d)
- Replace hardcoded ticker_24h with ticker_stats supporting configurable window
- Add -w/--window flag to `market tickers` (choices: 1h, 4h, 1d, default 1d)
- Update TUI title and JSON output to include window field
- Keep opportunity/pf service on 1d default
- Sync tests and doc comments

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 11:11:11 +08:00
cf26a3dd3a feat: split audit logs into live/dryrun subdirs, add catlog --dry-run, list all kline intervals
- Write live trades to logs/live/ and dry-run trades to logs/dryrun/
- Add -d/--dry-run flag to catlog to read dry-run audit logs
- List all 16 Binance kline interval options in --help and docs

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 10:27:22 +08:00
e37993c8b5 feat: flatten opportunity commands, add config management, fix completions
- Flatten opportunity into top-level portfolio and opportunity commands
- Add interactive config get/set/key/secret with type coercion
- Rewrite --doc to show TUI vs JSON schema per command
- Unify agent mode output to JSON only
- Make init prompt for API key/secret interactively
- Fix coin tab completion alias binding
- Fix set_config_value reading from wrong path
- Fail loudly on invalid numeric config values

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 08:43:30 +08:00
3855477155 refactor: flatten account command to a single balances view
Remove overview/balances/positions subcommands in favor of one
`account` command that returns all balances with an `is_dust` flag.
Add descriptions to every parser and expose -a/--agent and --doc
on all leaf commands for better help discoverability.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-17 18:19:19 +08:00
18 changed files with 1398 additions and 504 deletions

View File

@@ -20,8 +20,10 @@ CoinHunter V2 is a Binance-first crypto trading CLI with a flat, direct architec
- **`src/coinhunter/services/`** — Contains all domain logic: - **`src/coinhunter/services/`** — Contains all domain logic:
- `account_service.py` — balances, positions, overview - `account_service.py` — balances, positions, overview
- `market_service.py` — tickers, klines, scan universe, symbol normalization - `market_service.py` — tickers, klines, scan universe, symbol normalization
- `signal_service.py` — shared market signal scoring used by scan and portfolio analysis
- `portfolio_service.py` — held-position review and add/hold/trim/exit recommendations
- `trade_service.py` — spot and USDT-M futures order execution - `trade_service.py` — spot and USDT-M futures order execution
- `opportunity_service.py`portfolio recommendations and market scanning - `opportunity_service.py`market scanning and entry/watch/skip recommendations
- **`src/coinhunter/binance/`** — Thin wrappers around official Binance connectors: - **`src/coinhunter/binance/`** — Thin wrappers around official Binance connectors:
- `spot_client.py` wraps `binance.spot.Spot` - `spot_client.py` wraps `binance.spot.Spot`
- `um_futures_client.py` wraps `binance.um_futures.UMFutures` - `um_futures_client.py` wraps `binance.um_futures.UMFutures`
@@ -34,7 +36,7 @@ CoinHunter V2 is a Binance-first crypto trading CLI with a flat, direct architec
User data lives in `~/.coinhunter/` by default (override with `COINHUNTER_HOME`): User data lives in `~/.coinhunter/` by default (override with `COINHUNTER_HOME`):
- `config.toml` — runtime, binance, trading, and opportunity settings - `config.toml` — runtime, binance, trading, signal, opportunity, and portfolio settings
- `.env``BINANCE_API_KEY` and `BINANCE_API_SECRET` - `.env``BINANCE_API_KEY` and `BINANCE_API_SECRET`
- `logs/audit_YYYYMMDD.jsonl` — structured audit log - `logs/audit_YYYYMMDD.jsonl` — structured audit log

View File

@@ -19,6 +19,14 @@
--- ---
## What's New in 3.0
- **Split decision models** — portfolio (add/hold/trim/exit) and opportunity (enter/watch/skip) now use independent scoring logic.
- **Configurable ticker windows** — `market tickers` supports `--window 1h`, `4h`, or `1d`.
- **Live / dry-run audit logs** — audit logs are written to separate subdirectories; use `catlog --dry-run` to review simulations.
- **Flattened commands** — `account`, `opportunity`, and `config` are now top-level for fewer keystrokes.
- **Runtime config management** — `config get`, `config set`, and `config key/secret` let you edit settings without touching files manually.
## Install ## Install
For end users, install from PyPI with [pipx](https://pipx.pypa.io/) (recommended) to avoid polluting your system Python: For end users, install from PyPI with [pipx](https://pipx.pypa.io/) (recommended) to avoid polluting your system Python:
@@ -61,6 +69,8 @@ This creates:
If you are using **zsh** or **bash**, `init` will also generate and install shell completion scripts automatically, and update your rc file (`~/.zshrc` or `~/.bashrc`) if needed. If you are using **zsh** or **bash**, `init` will also generate and install shell completion scripts automatically, and update your rc file (`~/.zshrc` or `~/.bashrc`) if needed.
`init` interactively prompts for your Binance API key and secret if they are missing. Use `--no-prompt` to skip this.
`config.toml` stores runtime and strategy settings. `.env` stores: `config.toml` stores runtime and strategy settings. `.env` stores:
```bash ```bash
@@ -68,6 +78,12 @@ BINANCE_API_KEY=
BINANCE_API_SECRET= BINANCE_API_SECRET=
``` ```
Strategy settings are split into three blocks:
- `[signal]` for shared market-signal weights and lookback interval
- `[opportunity]` for scan thresholds, liquidity filters, and top-N output
- `[portfolio]` for add/hold/trim/exit thresholds and max position weight
Override the default home directory with `COINHUNTER_HOME`. Override the default home directory with `COINHUNTER_HOME`.
## Commands ## Commands
@@ -85,16 +101,14 @@ coin market klines --doc
```bash ```bash
# Account (aliases: a, acc) # Account (aliases: a, acc)
coinhunter account overview coinhunter account
coinhunter account overview --agent coinhunter account --agent
coin a ov coin a
coin acc bal
coin a pos
# Market (aliases: m) # Market (aliases: m)
coinhunter market tickers BTCUSDT ETH/USDT sol-usdt coinhunter market tickers BTCUSDT ETH/USDT sol-usdt --window 1h
coinhunter market klines BTCUSDT ETHUSDT --interval 1h --limit 50 coinhunter market klines BTCUSDT ETHUSDT --interval 1h --limit 50
coin m tk BTCUSDT ETHUSDT coin m tk BTCUSDT ETHUSDT -w 1d
coin m k BTCUSDT -i 1h -l 50 coin m k BTCUSDT -i 1h -l 50
# Trade (buy / sell are now top-level commands) # Trade (buy / sell are now top-level commands)
@@ -103,17 +117,34 @@ coinhunter sell BTCUSDT --qty 0.01 --type limit --price 90000
coin b BTCUSDT -Q 100 -d coin b BTCUSDT -Q 100 -d
coin s BTCUSDT -q 0.01 -t limit -p 90000 coin s BTCUSDT -q 0.01 -t limit -p 90000
# Opportunities (aliases: opp, o) # Portfolio (aliases: pf, p)
coinhunter opportunity portfolio coinhunter portfolio
coinhunter opportunity scan coinhunter portfolio --agent
coinhunter opportunity scan --symbols BTCUSDT ETHUSDT SOLUSDT coin pf
coin opp pf
coin o scan -s BTCUSDT ETHUSDT # Opportunity scanning (aliases: o)
coinhunter opportunity
coinhunter opportunity --symbols BTCUSDT ETHUSDT SOLUSDT
coin o -s BTCUSDT ETHUSDT
# Audit log # Audit log
coinhunter catlog coinhunter catlog
coinhunter catlog -n 20 coinhunter catlog -n 20
coinhunter catlog -n 10 -o 10 coinhunter catlog -n 10 -o 10
coinhunter catlog --dry-run
# Configuration management (aliases: cfg, c)
coinhunter config get # show all config
coinhunter config get binance.recv_window
coinhunter config set opportunity.top_n 20
coinhunter config set signal.lookback_interval 4h
coinhunter config set portfolio.max_position_weight 0.25
coinhunter config set trading.dry_run_default true
coinhunter config set market.universe_allowlist BTCUSDT,ETHUSDT
coinhunter config key YOUR_API_KEY # or omit value to prompt interactively
coinhunter config secret YOUR_SECRET # or omit value to prompt interactively
coin c get opportunity.top_n
coin c set trading.dry_run_default false
# Self-upgrade # Self-upgrade
coinhunter upgrade coinhunter upgrade
@@ -134,7 +165,7 @@ CoinHunter V2 uses a flat, direct architecture:
|-------|----------------|-----------| |-------|----------------|-----------|
| **CLI** | Single entrypoint, argument parsing | `cli.py` | | **CLI** | Single entrypoint, argument parsing | `cli.py` |
| **Binance** | Thin API wrappers with unified error handling | `binance/spot_client.py` | | **Binance** | Thin API wrappers with unified error handling | `binance/spot_client.py` |
| **Services** | Domain logic | `services/account_service.py`, `services/market_service.py`, `services/trade_service.py`, `services/opportunity_service.py` | | **Services** | Domain logic | `services/account_service.py`, `services/market_service.py`, `services/signal_service.py`, `services/opportunity_service.py`, `services/portfolio_service.py`, `services/trade_service.py` |
| **Config** | TOML config, `.env` secrets, path resolution | `config.py` | | **Config** | TOML config, `.env` secrets, path resolution | `config.py` |
| **Runtime** | Paths, TUI/JSON/compact output | `runtime.py` | | **Runtime** | Paths, TUI/JSON/compact output | `runtime.py` |
| **Audit** | Structured JSONL logging | `audit.py` | | **Audit** | Structured JSONL logging | `audit.py` |

View File

@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
[project] [project]
name = "coinhunter" name = "coinhunter"
version = "2.1.1" version = "3.0.0"
description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution." description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution."
readme = "README.md" readme = "README.md"
license = {text = "MIT"} license = {text = "MIT"}
@@ -13,6 +13,7 @@ dependencies = [
"binance-connector>=3.9.0", "binance-connector>=3.9.0",
"shtab>=1.7.0", "shtab>=1.7.0",
"tomli>=2.0.1; python_version < '3.11'", "tomli>=2.0.1; python_version < '3.11'",
"tomli-w>=1.0.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -22,30 +22,38 @@ def _resolve_audit_dir(paths: RuntimePaths) -> Path:
return _audit_dir_cache[key] return _audit_dir_cache[key]
def _audit_path(paths: RuntimePaths | None = None) -> Path: def _audit_path(paths: RuntimePaths | None = None, *, dry_run: bool = False) -> Path:
paths = ensure_runtime_dirs(paths or get_runtime_paths()) paths = ensure_runtime_dirs(paths or get_runtime_paths())
logs_dir = _resolve_audit_dir(paths) logs_dir = _resolve_audit_dir(paths)
logs_dir.mkdir(parents=True, exist_ok=True) subdir = logs_dir / ("dryrun" if dry_run else "live")
return logs_dir / f"audit_{datetime.now(timezone.utc).strftime('%Y%m%d')}.jsonl" subdir.mkdir(parents=True, exist_ok=True)
return subdir / f"audit_{datetime.now(timezone.utc).strftime('%Y%m%d')}.jsonl"
def audit_event(event: str, payload: dict[str, Any], paths: RuntimePaths | None = None) -> dict[str, Any]: def audit_event(
event: str, payload: dict[str, Any], paths: RuntimePaths | None = None, *, dry_run: bool = False
) -> dict[str, Any]:
entry = { entry = {
"timestamp": datetime.now(timezone.utc).isoformat(), "timestamp": datetime.now(timezone.utc).isoformat(),
"event": event, "event": event,
**payload, **payload,
} }
with _audit_path(paths).open("a", encoding="utf-8") as handle: with _audit_path(paths, dry_run=dry_run).open("a", encoding="utf-8") as handle:
handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n") handle.write(json.dumps(entry, ensure_ascii=False, default=json_default) + "\n")
return entry return entry
def read_audit_log(paths: RuntimePaths | None = None, limit: int = 10, offset: int = 0) -> list[dict[str, Any]]: def read_audit_log(
paths: RuntimePaths | None = None, limit: int = 10, offset: int = 0, *, dry_run: bool = False
) -> list[dict[str, Any]]:
paths = ensure_runtime_dirs(paths or get_runtime_paths()) paths = ensure_runtime_dirs(paths or get_runtime_paths())
logs_dir = _resolve_audit_dir(paths) logs_dir = _resolve_audit_dir(paths)
if not logs_dir.exists(): if not logs_dir.exists():
return [] return []
audit_files = sorted(logs_dir.glob("audit_*.jsonl"), reverse=True) subdir = logs_dir / ("dryrun" if dry_run else "live")
if not subdir.exists():
return []
audit_files = sorted(subdir.glob("audit_*.jsonl"), reverse=True)
needed = offset + limit needed = offset + limit
chunks: list[list[dict[str, Any]]] = [] chunks: list[list[dict[str, Any]]] = []
total = 0 total = 0

View File

@@ -5,7 +5,7 @@ from __future__ import annotations
from collections.abc import Callable from collections.abc import Callable
from typing import Any from typing import Any
from requests.exceptions import ( # type: ignore[import-untyped] from requests.exceptions import (
RequestException, RequestException,
SSLError, SSLError,
) )
@@ -52,13 +52,14 @@ class SpotBinanceClient:
kwargs["symbol"] = symbol kwargs["symbol"] = symbol
return self._call("exchange info", self._client.exchange_info, **kwargs) # type: ignore[no-any-return] return self._call("exchange info", self._client.exchange_info, **kwargs) # type: ignore[no-any-return]
def ticker_24h(self, symbols: list[str] | None = None) -> list[dict[str, Any]]: def ticker_stats(self, symbols: list[str] | None = None, *, window: str = "1d") -> list[dict[str, Any]]:
if not symbols: kwargs: dict[str, Any] = {"windowSize": window}
response = self._call("24h ticker", self._client.ticker_24hr) if symbols:
elif len(symbols) == 1: if len(symbols) == 1:
response = self._call("24h ticker", self._client.ticker_24hr, symbol=symbols[0]) kwargs["symbol"] = symbols[0]
else: else:
response = self._call("24h ticker", self._client.ticker_24hr, symbols=symbols) kwargs["symbols"] = symbols
response = self._call("ticker stats", self._client.ticker, **kwargs)
return response if isinstance(response, list) else [response] return response if isinstance(response, list) else [response]
def ticker_price(self, symbols: list[str] | None = None) -> list[dict[str, Any]]: def ticker_price(self, symbols: list[str] | None = None) -> list[dict[str, Any]]:

File diff suppressed because it is too large Load Diff

View File

@@ -13,6 +13,11 @@ try:
except ModuleNotFoundError: # pragma: no cover except ModuleNotFoundError: # pragma: no cover
import tomli as tomllib import tomli as tomllib
try:
import tomli_w
except ModuleNotFoundError: # pragma: no cover
tomli_w = None # type: ignore[assignment]
DEFAULT_CONFIG = """[runtime] DEFAULT_CONFIG = """[runtime]
timezone = "Asia/Shanghai" timezone = "Asia/Shanghai"
@@ -33,20 +38,29 @@ spot_enabled = true
dry_run_default = false dry_run_default = false
dust_usdt_threshold = 10.0 dust_usdt_threshold = 10.0
[opportunity] [signal]
min_quote_volume = 1000000.0 lookback_interval = "1h"
top_n = 10
scan_limit = 50
ignore_dust = true
lookback_intervals = ["1h", "4h", "1d"]
[opportunity.weights]
trend = 1.0 trend = 1.0
momentum = 1.0 momentum = 1.0
breakout = 0.8 breakout = 0.8
volume = 0.7 volume = 0.7
volatility_penalty = 0.5 volatility_penalty = 0.5
position_concentration_penalty = 0.6
[opportunity]
min_quote_volume = 1000000.0
top_n = 10
scan_limit = 50
ignore_dust = true
entry_threshold = 1.5
watch_threshold = 0.6
overlap_penalty = 0.6
[portfolio]
add_threshold = 1.5
hold_threshold = 0.6
trim_threshold = 0.2
exit_threshold = -0.2
max_position_weight = 0.6
""" """
DEFAULT_ENV = "BINANCE_API_KEY=\nBINANCE_API_SECRET=\n" DEFAULT_ENV = "BINANCE_API_KEY=\nBINANCE_API_SECRET=\n"
@@ -128,3 +142,72 @@ def resolve_log_dir(config: dict[str, Any], paths: RuntimePaths | None = None) -
raw = config.get("runtime", {}).get("log_dir", "logs") raw = config.get("runtime", {}).get("log_dir", "logs")
value = Path(raw).expanduser() value = Path(raw).expanduser()
return value if value.is_absolute() else paths.root / value return value if value.is_absolute() else paths.root / value
def get_config_value(config: dict[str, Any], key_path: str) -> Any:
keys = key_path.split(".")
node = config
for key in keys:
if not isinstance(node, dict) or key not in node:
return None
node = node[key]
return node
def set_config_value(config_file: Path, key_path: str, value: Any) -> None:
if tomli_w is None:
raise RuntimeError("tomli-w is not installed. Run `pip install tomli-w`.")
if not config_file.exists():
raise RuntimeError(f"Config file not found: {config_file}")
config = tomllib.loads(config_file.read_text(encoding="utf-8"))
keys = key_path.split(".")
node = config
for key in keys[:-1]:
if key not in node:
node[key] = {}
node = node[key]
# Coerce type from existing value when possible
existing = node.get(keys[-1])
if isinstance(existing, bool) and isinstance(value, str):
value = value.lower() in ("true", "1", "yes", "on")
elif isinstance(existing, (int, float)) and isinstance(value, str):
try:
value = type(existing)(value)
except (ValueError, TypeError) as exc:
raise RuntimeError(
f"Cannot set {key_path} to {value!r}: expected {type(existing).__name__}, got {value!r}"
) from exc
elif isinstance(existing, list) and isinstance(value, str):
value = [item.strip() for item in value.split(",") if item.strip()]
node[keys[-1]] = value
config_file.write_text(tomli_w.dumps(config), encoding="utf-8")
def get_env_value(paths: RuntimePaths | None = None, key: str = "") -> str:
paths = paths or get_runtime_paths()
if not paths.env_file.exists():
return ""
env_data = load_env_file(paths)
return env_data.get(key, "")
def set_env_value(paths: RuntimePaths | None = None, key: str = "", value: str = "") -> None:
paths = paths or get_runtime_paths()
if not paths.env_file.exists():
raise RuntimeError(f"Env file not found: {paths.env_file}. Run `coin init` first.")
lines = paths.env_file.read_text(encoding="utf-8").splitlines()
found = False
for i, line in enumerate(lines):
stripped = line.strip()
if stripped.startswith(f"{key}=") or stripped.startswith(f"{key} ="):
lines[i] = f"{key}={value}"
found = True
break
if not found:
lines.append(f"{key}={value}")
paths.env_file.write_text("\n".join(lines) + "\n", encoding="utf-8")
os.environ[key] = value

View File

@@ -213,39 +213,27 @@ def _render_tui(payload: Any) -> None:
print(str(payload)) print(str(payload))
return return
if "overview" in payload:
overview = payload.get("overview", {})
print(f"\n{_BOLD}{_CYAN} ACCOUNT OVERVIEW {_RESET}")
print(f" Total Equity: {_GREEN}{_fmt_number(overview.get('total_equity_usdt', 0))} USDT{_RESET}")
print(f" Spot Assets: {_fmt_number(overview.get('spot_asset_count', 0))}")
print(f" Positions: {_fmt_number(overview.get('spot_position_count', 0))}")
if payload.get("balances"):
print()
_render_tui({"balances": payload["balances"]})
if payload.get("positions"):
print()
_render_tui({"positions": payload["positions"]})
return
if "balances" in payload: if "balances" in payload:
rows = payload["balances"] rows = payload["balances"]
table_rows: list[list[str]] = [] table_rows: list[list[str]] = []
for r in rows: for r in rows:
is_dust = r.get("is_dust", False)
dust_label = f"{_DIM}dust{_RESET}" if is_dust else ""
table_rows.append( table_rows.append(
[ [
r.get("market_type", ""),
r.get("asset", ""), r.get("asset", ""),
_fmt_number(r.get("free", 0)), _fmt_number(r.get("free", 0)),
_fmt_number(r.get("locked", 0)), _fmt_number(r.get("locked", 0)),
_fmt_number(r.get("total", 0)), _fmt_number(r.get("total", 0)),
_fmt_number(r.get("notional_usdt", 0)), _fmt_number(r.get("notional_usdt", 0)),
dust_label,
] ]
) )
_print_box_table( _print_box_table(
"BALANCES", "BALANCES",
["Market", "Asset", "Free", "Locked", "Total", "Notional (USDT)"], ["Asset", "Free", "Locked", "Total", "Notional (USDT)", ""],
table_rows, table_rows,
aligns=["left", "left", "right", "right", "right", "right"], aligns=["left", "right", "right", "right", "right", "left"],
) )
return return
@@ -290,7 +278,7 @@ def _render_tui(payload: Any) -> None:
] ]
) )
_print_box_table( _print_box_table(
"24H TICKERS", f"TICKERS window={payload.get('window', '1d')}",
["Symbol", "Last Price", "Change %", "Quote Volume"], ["Symbol", "Last Price", "Change %", "Quote Volume"],
table_rows, table_rows,
aligns=["left", "right", "right", "right"], aligns=["left", "right", "right", "right"],
@@ -346,7 +334,13 @@ def _render_tui(payload: Any) -> None:
score = r.get("score", 0) score = r.get("score", 0)
action = r.get("action", "") action = r.get("action", "")
action_color = ( action_color = (
_GREEN if action == "add" else _YELLOW if action == "hold" else _RED if action == "exit" else _CYAN _GREEN
if action in {"add", "enter"}
else _YELLOW
if action in {"hold", "watch", "review"}
else _RED
if action in {"exit", "skip", "trim"}
else _CYAN
) )
print( print(
f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}" f" {i}. {_BOLD}{r.get('symbol', '')}{_RESET} action={_color(action, action_color)} score={score:.4f}"
@@ -438,10 +432,7 @@ def _render_tui(payload: Any) -> None:
def print_output(payload: Any, *, agent: bool = False) -> None: def print_output(payload: Any, *, agent: bool = False) -> None:
if agent: if agent:
if _is_large_dataset(payload): print_json(payload)
_print_compact(payload)
else:
print_json(payload)
else: else:
_render_tui(payload) _render_tui(payload)
@@ -521,6 +512,13 @@ def install_shell_completion(parser: argparse.ArgumentParser) -> dict[str, Any]:
return {"shell": None, "installed": False, "reason": "unable to detect shell from $SHELL"} return {"shell": None, "installed": False, "reason": "unable to detect shell from $SHELL"}
script = shtab.complete(parser, shell=shell, preamble="") script = shtab.complete(parser, shell=shell, preamble="")
# Also register completion for the "coinhunter" alias
prog = parser.prog.replace("-", "_")
func = f"_shtab_{prog}"
if shell == "bash":
script += f"\ncomplete -o filenames -F {func} coinhunter\n"
elif shell == "zsh":
script += f"\ncompdef {func} coinhunter\n"
installed_path: Path | None = None installed_path: Path | None = None
hint: str | None = None hint: str | None = None

View File

@@ -13,6 +13,7 @@ class AssetBalance:
locked: float locked: float
total: float total: float
notional_usdt: float notional_usdt: float
is_dust: bool
@dataclass @dataclass
@@ -59,6 +60,7 @@ def get_balances(
spot_client: Any, spot_client: Any,
) -> dict[str, Any]: ) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper() quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
rows: list[dict[str, Any]] = [] rows: list[dict[str, Any]] = []
balances, _, price_map = _spot_account_data(spot_client, quote) balances, _, price_map = _spot_account_data(spot_client, quote)
for item in balances: for item in balances:
@@ -68,6 +70,7 @@ def get_balances(
if total <= 0: if total <= 0:
continue continue
asset = item["asset"] asset = item["asset"]
notional = total * price_map.get(asset, 0.0)
rows.append( rows.append(
asdict( asdict(
AssetBalance( AssetBalance(
@@ -75,7 +78,8 @@ def get_balances(
free=free, free=free,
locked=locked, locked=locked,
total=total, total=total,
notional_usdt=total * price_map.get(asset, 0.0), notional_usdt=notional,
is_dust=notional < dust,
) )
) )
) )
@@ -113,60 +117,3 @@ def get_positions(
) )
) )
return {"positions": rows} return {"positions": rows}
def get_overview(
config: dict[str, Any],
*,
spot_client: Any,
) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
balances: list[dict[str, Any]] = []
positions: list[dict[str, Any]] = []
spot_balances, _, price_map = _spot_account_data(spot_client, quote)
for item in spot_balances:
free = float(item.get("free", 0.0))
locked = float(item.get("locked", 0.0))
total = free + locked
if total <= 0:
continue
asset = item["asset"]
balances.append(
asdict(
AssetBalance(
asset=asset,
free=free,
locked=locked,
total=total,
notional_usdt=total * price_map.get(asset, 0.0),
)
)
)
mark_price = price_map.get(asset, 1.0 if asset == quote else 0.0)
notional = total * mark_price
if notional >= dust:
positions.append(
asdict(
PositionView(
symbol=quote if asset == quote else f"{asset}{quote}",
quantity=total,
entry_price=None,
mark_price=mark_price,
notional_usdt=notional,
side="LONG",
)
)
)
spot_equity = sum(item["notional_usdt"] for item in balances)
overview = asdict(
AccountOverview(
total_equity_usdt=spot_equity,
spot_equity_usdt=spot_equity,
spot_asset_count=len(balances),
spot_position_count=len(positions),
)
)
return {"overview": overview, "balances": balances, "positions": positions}

View File

@@ -48,10 +48,10 @@ class KlineView:
quote_volume: float quote_volume: float
def get_tickers(config: dict[str, Any], symbols: list[str], *, spot_client: Any) -> dict[str, Any]: def get_tickers(config: dict[str, Any], symbols: list[str], *, spot_client: Any, window: str = "1d") -> dict[str, Any]:
normalized = normalize_symbols(symbols) normalized = normalize_symbols(symbols)
rows = [] rows = []
for ticker in spot_client.ticker_24h(normalized): for ticker in spot_client.ticker_stats(normalized, window=window):
rows.append( rows.append(
asdict( asdict(
TickerView( TickerView(
@@ -64,7 +64,7 @@ def get_tickers(config: dict[str, Any], symbols: list[str], *, spot_client: Any)
) )
) )
) )
return {"tickers": rows} return {"tickers": rows, "window": window}
def get_klines( def get_klines(
@@ -103,6 +103,7 @@ def get_scan_universe(
*, *,
spot_client: Any, spot_client: Any,
symbols: list[str] | None = None, symbols: list[str] | None = None,
window: str = "1d",
) -> list[dict[str, Any]]: ) -> list[dict[str, Any]]:
market_config = config.get("market", {}) market_config = config.get("market", {})
opportunity_config = config.get("opportunity", {}) opportunity_config = config.get("opportunity", {})
@@ -116,7 +117,7 @@ def get_scan_universe(
status_map = {normalize_symbol(item["symbol"]): item.get("status", "") for item in exchange_info.get("symbols", [])} status_map = {normalize_symbol(item["symbol"]): item.get("status", "") for item in exchange_info.get("symbols", [])}
rows: list[dict[str, Any]] = [] rows: list[dict[str, Any]] = []
for ticker in spot_client.ticker_24h(list(requested) if requested else None): for ticker in spot_client.ticker_stats(list(requested) if requested else None, window=window):
symbol = normalize_symbol(ticker["symbol"]) symbol = normalize_symbol(ticker["symbol"])
if not symbol.endswith(quote): if not symbol.endswith(quote):
continue continue

View File

@@ -1,14 +1,14 @@
"""Opportunity analysis services.""" """Opportunity scanning services."""
from __future__ import annotations from __future__ import annotations
from dataclasses import asdict, dataclass from dataclasses import asdict, dataclass
from statistics import mean
from typing import Any from typing import Any
from ..audit import audit_event from ..audit import audit_event
from .account_service import get_positions from .account_service import get_positions
from .market_service import base_asset, get_scan_universe, normalize_symbol from .market_service import base_asset, get_scan_universe, normalize_symbol
from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
@dataclass @dataclass
@@ -20,132 +20,25 @@ class OpportunityRecommendation:
metrics: dict[str, float] metrics: dict[str, float]
def _safe_pct(new: float, old: float) -> float: def _opportunity_thresholds(config: dict[str, Any]) -> dict[str, float]:
if old == 0: opportunity_config = config.get("opportunity", {})
return 0.0 return {
return (new - old) / old "entry_threshold": float(opportunity_config.get("entry_threshold", 1.5)),
"watch_threshold": float(opportunity_config.get("watch_threshold", 0.6)),
"overlap_penalty": float(opportunity_config.get("overlap_penalty", 0.6)),
def _score_candidate(
closes: list[float], volumes: list[float], ticker: dict[str, Any], weights: dict[str, float], concentration: float
) -> tuple[float, dict[str, float]]:
if len(closes) < 2 or not volumes:
return 0.0, {
"trend": 0.0,
"momentum": 0.0,
"breakout": 0.0,
"volume_confirmation": 1.0,
"volatility": 0.0,
"concentration": round(concentration, 4),
}
current = closes[-1]
sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
trend = 1.0 if current >= sma_short >= sma_long else -1.0 if current < sma_short < sma_long else 0.0
momentum = (
_safe_pct(closes[-1], closes[-2]) * 0.5
+ (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0)
+ float(ticker.get("price_change_pct", 0.0)) / 100.0 * 0.2
)
recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes)
breakout = 1.0 - max((recent_high - current) / recent_high, 0.0)
avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
volume_score = min(max(volume_confirmation - 1.0, -1.0), 2.0)
volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0
score = (
weights.get("trend", 1.0) * trend
+ weights.get("momentum", 1.0) * momentum
+ weights.get("breakout", 0.8) * breakout
+ weights.get("volume", 0.7) * volume_score
- weights.get("volatility_penalty", 0.5) * volatility
- weights.get("position_concentration_penalty", 0.6) * concentration
)
metrics = {
"trend": round(trend, 4),
"momentum": round(momentum, 4),
"breakout": round(breakout, 4),
"volume_confirmation": round(volume_confirmation, 4),
"volatility": round(volatility, 4),
"concentration": round(concentration, 4),
} }
return score, metrics
def _action_for(score: float, concentration: float) -> tuple[str, list[str]]: def _action_for_opportunity(score: float, thresholds: dict[str, float]) -> tuple[str, list[str]]:
reasons: list[str] = [] reasons: list[str] = []
if concentration >= 0.5 and score < 0.4: if score >= thresholds["entry_threshold"]:
reasons.append("position concentration is high") reasons.append("trend, momentum, and breakout are aligned for a fresh entry")
return "trim", reasons return "enter", reasons
if score >= 1.5: if score >= thresholds["watch_threshold"]:
reasons.append("trend, momentum, and breakout are aligned") reasons.append("market structure is constructive but still needs confirmation")
return "add", reasons return "watch", reasons
if score >= 0.6: reasons.append("edge is too weak for a new entry")
reasons.append("trend remains constructive") return "skip", reasons
return "hold", reasons
if score <= -0.2:
reasons.append("momentum and structure have weakened")
return "exit", reasons
reasons.append("signal is mixed and needs confirmation")
return "observe", reasons
def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
weights = config.get("opportunity", {}).get("weights", {})
positions = get_positions(config, spot_client=spot_client)["positions"]
positions = [item for item in positions if item["symbol"] != quote]
total_notional = sum(item["notional_usdt"] for item in positions) or 1.0
recommendations = []
for position in positions:
symbol = normalize_symbol(position["symbol"])
klines = spot_client.klines(symbol=symbol, interval="1h", limit=24)
closes = [float(item[4]) for item in klines]
volumes = [float(item[5]) for item in klines]
tickers = spot_client.ticker_24h([symbol])
ticker = tickers[0] if tickers else {"priceChangePercent": "0"}
concentration = position["notional_usdt"] / total_notional
score, metrics = _score_candidate(
closes,
volumes,
{
"price_change_pct": float(ticker.get("priceChangePercent") or 0.0),
},
weights,
concentration,
)
action, reasons = _action_for(score, concentration)
recommendations.append(
asdict(
OpportunityRecommendation(
symbol=symbol,
action=action,
score=round(score, 4),
reasons=reasons,
metrics=metrics,
)
)
)
payload = {"recommendations": sorted(recommendations, key=lambda item: item["score"], reverse=True)}
audit_event(
"opportunity_portfolio_generated",
{
"market_type": "spot",
"symbol": None,
"side": None,
"qty": None,
"quote_amount": None,
"order_type": None,
"dry_run": True,
"request_payload": {"mode": "portfolio"},
"response_payload": payload,
"status": "generated",
"error": None,
},
)
return payload
def scan_opportunities( def scan_opportunities(
@@ -155,7 +48,9 @@ def scan_opportunities(
symbols: list[str] | None = None, symbols: list[str] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
opportunity_config = config.get("opportunity", {}) opportunity_config = config.get("opportunity", {})
weights = opportunity_config.get("weights", {}) signal_weights = get_signal_weights(config)
interval = get_signal_interval(config)
thresholds = _opportunity_thresholds(config)
scan_limit = int(opportunity_config.get("scan_limit", 50)) scan_limit = int(opportunity_config.get("scan_limit", 50))
top_n = int(opportunity_config.get("top_n", 10)) top_n = int(opportunity_config.get("top_n", 10))
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper() quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
@@ -167,14 +62,19 @@ def scan_opportunities(
recommendations = [] recommendations = []
for ticker in universe: for ticker in universe:
symbol = normalize_symbol(ticker["symbol"]) symbol = normalize_symbol(ticker["symbol"])
klines = spot_client.klines(symbol=symbol, interval="1h", limit=24) klines = spot_client.klines(symbol=symbol, interval=interval, limit=24)
closes = [float(item[4]) for item in klines] closes = [float(item[4]) for item in klines]
volumes = [float(item[5]) for item in klines] volumes = [float(item[5]) for item in klines]
concentration = concentration_map.get(symbol, 0.0) / total_held concentration = concentration_map.get(symbol, 0.0) / total_held
score, metrics = _score_candidate(closes, volumes, ticker, weights, concentration) signal_score, metrics = score_market_signal(closes, volumes, ticker, signal_weights)
action, reasons = _action_for(score, concentration) score = signal_score - thresholds["overlap_penalty"] * concentration
action, reasons = _action_for_opportunity(score, thresholds)
metrics["signal_score"] = round(signal_score, 4)
metrics["position_weight"] = round(concentration, 4)
if symbol.endswith(quote): if symbol.endswith(quote):
reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters") reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters")
if concentration > 0:
reasons.append("symbol is already held, so the opportunity score is discounted for overlap")
recommendations.append( recommendations.append(
asdict( asdict(
OpportunityRecommendation( OpportunityRecommendation(

View File

@@ -0,0 +1,109 @@
"""Portfolio analysis and position management signals."""
from __future__ import annotations
from dataclasses import asdict, dataclass
from typing import Any
from ..audit import audit_event
from .account_service import get_positions
from .market_service import normalize_symbol
from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
@dataclass
class PortfolioRecommendation:
symbol: str
action: str
score: float
reasons: list[str]
metrics: dict[str, float]
def _portfolio_thresholds(config: dict[str, Any]) -> dict[str, float]:
portfolio_config = config.get("portfolio", {})
return {
"add_threshold": float(portfolio_config.get("add_threshold", 1.5)),
"hold_threshold": float(portfolio_config.get("hold_threshold", 0.6)),
"trim_threshold": float(portfolio_config.get("trim_threshold", 0.2)),
"exit_threshold": float(portfolio_config.get("exit_threshold", -0.2)),
"max_position_weight": float(portfolio_config.get("max_position_weight", 0.6)),
}
def _action_for_position(score: float, concentration: float, thresholds: dict[str, float]) -> tuple[str, list[str]]:
reasons: list[str] = []
max_weight = thresholds["max_position_weight"]
if concentration >= max_weight and score < thresholds["hold_threshold"]:
reasons.append("position weight is above the portfolio risk budget")
return "trim", reasons
if score >= thresholds["add_threshold"] and concentration < max_weight:
reasons.append("market signal is strong and position still has room")
return "add", reasons
if score >= thresholds["hold_threshold"]:
reasons.append("market structure remains supportive for holding")
return "hold", reasons
if score <= thresholds["exit_threshold"]:
reasons.append("market signal has weakened enough to justify an exit review")
return "exit", reasons
if score <= thresholds["trim_threshold"]:
reasons.append("edge has faded and the position should be reduced")
return "trim", reasons
reasons.append("signal is mixed and the position needs review")
return "review", reasons
def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
signal_weights = get_signal_weights(config)
interval = get_signal_interval(config)
thresholds = _portfolio_thresholds(config)
positions = get_positions(config, spot_client=spot_client)["positions"]
positions = [item for item in positions if item["symbol"] != quote]
total_notional = sum(item["notional_usdt"] for item in positions) or 1.0
recommendations = []
for position in positions:
symbol = normalize_symbol(position["symbol"])
klines = spot_client.klines(symbol=symbol, interval=interval, limit=24)
closes = [float(item[4]) for item in klines]
volumes = [float(item[5]) for item in klines]
tickers = spot_client.ticker_stats([symbol], window="1d")
ticker = tickers[0] if tickers else {"priceChangePercent": "0"}
concentration = position["notional_usdt"] / total_notional
score, metrics = score_market_signal(
closes,
volumes,
{"price_change_pct": float(ticker.get("priceChangePercent") or 0.0)},
signal_weights,
)
action, reasons = _action_for_position(score, concentration, thresholds)
metrics["position_weight"] = round(concentration, 4)
recommendations.append(
asdict(
PortfolioRecommendation(
symbol=symbol,
action=action,
score=round(score, 4),
reasons=reasons,
metrics=metrics,
)
)
)
payload = {"recommendations": sorted(recommendations, key=lambda item: item["score"], reverse=True)}
audit_event(
"opportunity_portfolio_generated",
{
"market_type": "spot",
"symbol": None,
"side": None,
"qty": None,
"quote_amount": None,
"order_type": None,
"dry_run": True,
"request_payload": {"mode": "portfolio"},
"response_payload": payload,
"status": "generated",
"error": None,
},
)
return payload

View File

@@ -0,0 +1,78 @@
"""Shared market signal scoring."""
from __future__ import annotations
from statistics import mean
from typing import Any
def _safe_pct(new: float, old: float) -> float:
if old == 0:
return 0.0
return (new - old) / old
def get_signal_weights(config: dict[str, Any]) -> dict[str, float]:
signal_config = config.get("signal", {})
return {
"trend": float(signal_config.get("trend", 1.0)),
"momentum": float(signal_config.get("momentum", 1.0)),
"breakout": float(signal_config.get("breakout", 0.8)),
"volume": float(signal_config.get("volume", 0.7)),
"volatility_penalty": float(signal_config.get("volatility_penalty", 0.5)),
}
def get_signal_interval(config: dict[str, Any]) -> str:
signal_config = config.get("signal", {})
if signal_config.get("lookback_interval"):
return str(signal_config["lookback_interval"])
return "1h"
def score_market_signal(
closes: list[float],
volumes: list[float],
ticker: dict[str, Any],
weights: dict[str, float],
) -> tuple[float, dict[str, float]]:
if len(closes) < 2 or not volumes:
return 0.0, {
"trend": 0.0,
"momentum": 0.0,
"breakout": 0.0,
"volume_confirmation": 1.0,
"volatility": 0.0,
}
current = closes[-1]
sma_short = mean(closes[-5:]) if len(closes) >= 5 else current
sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
trend = 1.0 if current >= sma_short >= sma_long else -1.0 if current < sma_short < sma_long else 0.0
momentum = (
_safe_pct(closes[-1], closes[-2]) * 0.5
+ (_safe_pct(closes[-1], closes[-5]) * 0.3 if len(closes) >= 5 else 0.0)
+ float(ticker.get("price_change_pct", 0.0)) / 100.0 * 0.2
)
recent_high = max(closes[-20:]) if len(closes) >= 20 else max(closes)
breakout = 1.0 - max((recent_high - current) / recent_high, 0.0)
avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
volume_score = min(max(volume_confirmation - 1.0, -1.0), 2.0)
volatility = (max(closes[-10:]) - min(closes[-10:])) / current if len(closes) >= 10 and current else 0.0
score = (
weights.get("trend", 1.0) * trend
+ weights.get("momentum", 1.0) * momentum
+ weights.get("breakout", 0.8) * breakout
+ weights.get("volume", 0.7) * volume_score
- weights.get("volatility_penalty", 0.5) * volatility
)
metrics = {
"trend": round(trend, 4),
"momentum": round(momentum, 4),
"breakout": round(breakout, 4),
"volume_confirmation": round(volume_confirmation, 4),
"volatility": round(volatility, 4),
}
return score, metrics

View File

@@ -112,7 +112,7 @@ def execute_spot_trade(
dry_run=is_dry_run, dry_run=is_dry_run,
) )
audit_event("trade_submitted", _trade_log_payload(intent, payload, status="submitted")) audit_event("trade_submitted", _trade_log_payload(intent, payload, status="submitted"), dry_run=intent.dry_run)
if is_dry_run: if is_dry_run:
response = {"dry_run": True, "status": "DRY_RUN", "request": payload} response = {"dry_run": True, "status": "DRY_RUN", "request": payload}
result = asdict( result = asdict(
@@ -128,14 +128,14 @@ def execute_spot_trade(
) )
) )
audit_event( audit_event(
"trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response} "trade_filled", {**_trade_log_payload(intent, payload, status="DRY_RUN"), "response_payload": response}, dry_run=intent.dry_run
) )
return {"trade": result} return {"trade": result}
try: try:
response = spot_client.new_order(**payload) response = spot_client.new_order(**payload)
except Exception as exc: except Exception as exc:
audit_event("trade_failed", _trade_log_payload(intent, payload, status="failed", error=str(exc))) audit_event("trade_failed", _trade_log_payload(intent, payload, status="failed", error=str(exc)), dry_run=intent.dry_run)
raise RuntimeError(f"Spot order failed: {exc}") from exc raise RuntimeError(f"Spot order failed: {exc}") from exc
result = asdict( result = asdict(
@@ -151,6 +151,7 @@ def execute_spot_trade(
) )
) )
audit_event( audit_event(
"trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response} "trade_filled", {**_trade_log_payload(intent, payload, status=result["status"]), "response_payload": response},
dry_run=intent.dry_run,
) )
return {"trade": result} return {"trade": result}

View File

@@ -26,7 +26,7 @@ class FakeSpotClient:
return list(prices.values()) return list(prices.values())
return [prices[symbol] for symbol in symbols] return [prices[symbol] for symbol in symbols]
def ticker_24h(self, symbols=None): def ticker_stats(self, symbols=None, *, window="1d"):
rows = [ rows = [
{ {
"symbol": "BTCUSDT", "symbol": "BTCUSDT",
@@ -69,19 +69,19 @@ class FakeSpotClient:
class AccountMarketServicesTestCase(unittest.TestCase): class AccountMarketServicesTestCase(unittest.TestCase):
def test_account_overview_and_dust_filter(self): def test_get_balances_with_dust_flag(self):
config = { config = {
"market": {"default_quote": "USDT"}, "market": {"default_quote": "USDT"},
"trading": {"dust_usdt_threshold": 10.0}, "trading": {"dust_usdt_threshold": 10.0},
} }
payload = account_service.get_overview( payload = account_service.get_balances(
config, config,
spot_client=FakeSpotClient(), spot_client=FakeSpotClient(),
) )
self.assertEqual(payload["overview"]["total_equity_usdt"], 720.1) balances = {item["asset"]: item for item in payload["balances"]}
symbols = {item["symbol"] for item in payload["positions"]} self.assertFalse(balances["USDT"]["is_dust"])
self.assertNotIn("DOGEUSDT", symbols) self.assertFalse(balances["BTC"]["is_dust"])
self.assertIn("BTCUSDT", symbols) self.assertTrue(balances["DOGE"]["is_dust"])
def test_market_tickers_and_scan_universe(self): def test_market_tickers_and_scan_universe(self):
config = { config = {

View File

@@ -17,6 +17,7 @@ class CLITestCase(unittest.TestCase):
self.assertIn("account", help_text) self.assertIn("account", help_text)
self.assertIn("buy", help_text) self.assertIn("buy", help_text)
self.assertIn("sell", help_text) self.assertIn("sell", help_text)
self.assertIn("portfolio", help_text)
self.assertIn("opportunity", help_text) self.assertIn("opportunity", help_text)
self.assertIn("--doc", help_text) self.assertIn("--doc", help_text)
@@ -79,18 +80,45 @@ class CLITestCase(unittest.TestCase):
self.assertEqual(result, 0) self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["trade"]["status"], "DRY_RUN") self.assertEqual(captured["payload"]["trade"]["status"], "DRY_RUN")
def test_doc_flag_prints_documentation(self): def test_doc_flag_prints_tui_documentation(self):
import io
from unittest.mock import patch
stdout = io.StringIO() stdout = io.StringIO()
with patch("sys.stdout", stdout): with patch("sys.stdout", stdout):
result = cli.main(["market", "tickers", "--doc"]) result = cli.main(["market", "tickers", "--doc"])
self.assertEqual(result, 0) self.assertEqual(result, 0)
output = stdout.getvalue() output = stdout.getvalue()
self.assertIn("lastPrice", output) self.assertIn("TUI Output", output)
self.assertIn("Last Price", output)
self.assertIn("BTCUSDT", output) self.assertIn("BTCUSDT", output)
def test_doc_flag_prints_json_documentation(self):
stdout = io.StringIO()
with patch("sys.stdout", stdout):
result = cli.main(["market", "tickers", "--doc", "--agent"])
self.assertEqual(result, 0)
output = stdout.getvalue()
self.assertIn("JSON Output", output)
self.assertIn("last_price", output)
self.assertIn("BTCUSDT", output)
def test_account_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "trading": {"dust_usdt_threshold": 10.0}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.account_service, "get_balances", return_value={"balances": [{"asset": "BTC", "is_dust": False}]}
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["account"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["balances"][0]["asset"], "BTC")
def test_upgrade_dispatches(self): def test_upgrade_dispatches(self):
captured = {} captured = {}
with ( with (
@@ -103,6 +131,46 @@ class CLITestCase(unittest.TestCase):
self.assertEqual(result, 0) self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["returncode"], 0) self.assertEqual(captured["payload"]["returncode"], 0)
def test_portfolio_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.portfolio_service, "analyze_portfolio", return_value={"recommendations": [{"symbol": "BTCUSDT", "score": 0.75}]}
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["portfolio"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT")
def test_opportunity_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.opportunity_service,
"scan_opportunities",
return_value={"recommendations": [{"symbol": "BTCUSDT", "score": 0.82}]},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["opportunity", "-s", "BTCUSDT", "ETHUSDT"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT")
def test_catlog_dispatches(self): def test_catlog_dispatches(self):
captured = {} captured = {}
with ( with (
@@ -119,3 +187,64 @@ class CLITestCase(unittest.TestCase):
self.assertEqual(captured["payload"]["offset"], 10) self.assertEqual(captured["payload"]["offset"], 10)
self.assertIn("entries", captured["payload"]) self.assertIn("entries", captured["payload"])
self.assertEqual(captured["payload"]["total"], 1) self.assertEqual(captured["payload"]["total"], 1)
def test_config_get_dispatches(self):
captured = {}
with (
patch.object(cli, "load_config", return_value={"binance": {"recv_window": 5000}}),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["config", "get", "binance.recv_window"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["binance.recv_window"], 5000)
def test_config_set_dispatches(self):
import tempfile
with tempfile.NamedTemporaryFile(mode="w", suffix=".toml", delete=False) as f:
f.write('[binance]\nrecv_window = 5000\n')
tmp_path = f.name
with patch.object(cli, "get_runtime_paths") as mock_paths:
mock_paths.return_value.config_file = __import__("pathlib").Path(tmp_path)
result = cli.main(["config", "set", "binance.recv_window", "10000"])
self.assertEqual(result, 0)
# Verify the file was updated
content = __import__("pathlib").Path(tmp_path).read_text()
self.assertIn("recv_window = 10000", content)
__import__("os").unlink(tmp_path)
def test_config_key_dispatches(self):
import tempfile
with tempfile.NamedTemporaryFile(mode="w", suffix=".env", delete=False) as f:
f.write("BINANCE_API_KEY=\n")
tmp_path = f.name
with patch.object(cli, "get_runtime_paths") as mock_paths:
mock_paths.return_value.env_file = __import__("pathlib").Path(tmp_path)
result = cli.main(["config", "key", "test_key_value"])
self.assertEqual(result, 0)
content = __import__("pathlib").Path(tmp_path).read_text()
self.assertIn("BINANCE_API_KEY=test_key_value", content)
__import__("os").unlink(tmp_path)
def test_config_secret_dispatches(self):
import tempfile
with tempfile.NamedTemporaryFile(mode="w", suffix=".env", delete=False) as f:
f.write("BINANCE_API_SECRET=\n")
tmp_path = f.name
with patch.object(cli, "get_runtime_paths") as mock_paths:
mock_paths.return_value.env_file = __import__("pathlib").Path(tmp_path)
result = cli.main(["config", "secret", "test_secret_value"])
self.assertEqual(result, 0)
content = __import__("pathlib").Path(tmp_path).read_text()
self.assertIn("BINANCE_API_SECRET=test_secret_value", content)
__import__("os").unlink(tmp_path)

View File

@@ -1,11 +1,11 @@
"""Opportunity service tests.""" """Signal, opportunity, and portfolio service tests."""
from __future__ import annotations from __future__ import annotations
import unittest import unittest
from unittest.mock import patch from unittest.mock import patch
from coinhunter.services import opportunity_service from coinhunter.services import opportunity_service, portfolio_service, signal_service
class FakeSpotClient: class FakeSpotClient:
@@ -27,7 +27,7 @@ class FakeSpotClient:
} }
return [mapping[symbol] for symbol in symbols] return [mapping[symbol] for symbol in symbols]
def ticker_24h(self, symbols=None): def ticker_stats(self, symbols=None, *, window="1d"):
rows = { rows = {
"BTCUSDT": { "BTCUSDT": {
"symbol": "BTCUSDT", "symbol": "BTCUSDT",
@@ -105,28 +105,40 @@ class OpportunityServiceTestCase(unittest.TestCase):
self.config = { self.config = {
"market": {"default_quote": "USDT", "universe_allowlist": [], "universe_denylist": []}, "market": {"default_quote": "USDT", "universe_allowlist": [], "universe_denylist": []},
"trading": {"dust_usdt_threshold": 10.0}, "trading": {"dust_usdt_threshold": 10.0},
"signal": {
"lookback_interval": "1h",
"trend": 1.0,
"momentum": 1.0,
"breakout": 0.8,
"volume": 0.7,
"volatility_penalty": 0.5,
},
"opportunity": { "opportunity": {
"scan_limit": 10, "scan_limit": 10,
"top_n": 5, "top_n": 5,
"min_quote_volume": 1000.0, "min_quote_volume": 1000.0,
"weights": { "entry_threshold": 1.5,
"trend": 1.0, "watch_threshold": 0.6,
"momentum": 1.0, "overlap_penalty": 0.6,
"breakout": 0.8, },
"volume": 0.7, "portfolio": {
"volatility_penalty": 0.5, "add_threshold": 1.5,
"position_concentration_penalty": 0.6, "hold_threshold": 0.6,
}, "trim_threshold": 0.2,
"exit_threshold": -0.2,
"max_position_weight": 0.6,
}, },
} }
def test_portfolio_analysis_ignores_dust_and_emits_recommendations(self): def test_portfolio_analysis_ignores_dust_and_emits_recommendations(self):
events = [] events = []
with patch.object(opportunity_service, "audit_event", side_effect=lambda event, payload: events.append(event)): with patch.object(portfolio_service, "audit_event", side_effect=lambda event, payload, **kwargs: events.append(event)):
payload = opportunity_service.analyze_portfolio(self.config, spot_client=FakeSpotClient()) payload = portfolio_service.analyze_portfolio(self.config, spot_client=FakeSpotClient())
symbols = [item["symbol"] for item in payload["recommendations"]] symbols = [item["symbol"] for item in payload["recommendations"]]
self.assertNotIn("DOGEUSDT", symbols) self.assertNotIn("DOGEUSDT", symbols)
self.assertEqual(symbols, ["BTCUSDT", "ETHUSDT"]) self.assertEqual(symbols, ["BTCUSDT", "ETHUSDT"])
self.assertEqual(payload["recommendations"][0]["action"], "add")
self.assertEqual(payload["recommendations"][1]["action"], "hold")
self.assertEqual(events, ["opportunity_portfolio_generated"]) self.assertEqual(events, ["opportunity_portfolio_generated"])
def test_scan_is_deterministic(self): def test_scan_is_deterministic(self):
@@ -135,8 +147,9 @@ class OpportunityServiceTestCase(unittest.TestCase):
self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient() self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()
) )
self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"]) self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"])
self.assertEqual([item["action"] for item in payload["recommendations"]], ["enter", "enter"])
def test_score_candidate_handles_empty_klines(self): def test_signal_score_handles_empty_klines(self):
score, metrics = opportunity_service._score_candidate([], [], {"price_change_pct": 1.0}, {}, 0.0) score, metrics = signal_service.score_market_signal([], [], {"price_change_pct": 1.0}, {})
self.assertEqual(score, 0.0) self.assertEqual(score, 0.0)
self.assertEqual(metrics["trend"], 0.0) self.assertEqual(metrics["trend"], 0.0)

View File

@@ -21,7 +21,7 @@ class TradeServiceTestCase(unittest.TestCase):
def test_spot_market_buy_dry_run_does_not_call_client(self): def test_spot_market_buy_dry_run_does_not_call_client(self):
events = [] events = []
with patch.object( with patch.object(
trade_service, "audit_event", side_effect=lambda event, payload: events.append((event, payload)) trade_service, "audit_event", side_effect=lambda event, payload, **kwargs: events.append((event, payload))
): ):
client = FakeSpotClient() client = FakeSpotClient()
payload = trade_service.execute_spot_trade( payload = trade_service.execute_spot_trade(