10 Commits
v3.0.0 ... main

Author SHA1 Message Date
76c4129c8d refactor: simplify CLI to data layer for AI-assisted trading
Transform CoinHunter from an over-engineered auto-trading system into a
lightweight data-layer CLI paired with the coinbuddy AI Skill.

Key changes:
- Remove non-core commands: backtest, strategy, opportunity dataset/evaluate/optimize
- Add scan: rule-based market screening (zero token cost)
- Add analyze: multi-timeframe technical analysis for AI consumption
- Add watch: lightweight portfolio anomaly monitoring (zero token cost)
- Remove services: backtest, dataset, evaluation, research, strategy
- Add analyze_service with RSI, key levels, alerts, and AI-friendly summaries
- Add watch_portfolio with drawdown/spike/concentration/technical triggers
- Simplify config: remove research/dataset settings, add watch thresholds
- Update TUI rendering for analyze and watch outputs
- Update tests and CLAUDE.md for new architecture

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 16:35:33 +08:00
Carlos Ouyang
e4b2239bcd feat: add strategy and backtest services
- strategy_service.py combines opportunity + portfolio signals into
  unified buy/sell/hold recommendations
- backtest_service.py runs walk-forward backtests on historical datasets
  with virtual cash and positions
- CLI adds `strategy` and `backtest` commands with `--decision-interval`
  and other tuning parameters
- Add tests for both services and CLI dispatch
- Update CLAUDE.md with new architecture docs
- Optimize model weights via opportunity optimizer

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-27 13:21:35 +08:00
Carlos Ouyang
10b314aa2b feat: add opportunity historical evaluation 2026-04-22 14:25:22 +08:00
003212de99 refactor: simplify opportunity actions to entry/watch/avoid with confidence
- Remove dead scoring code (_score_candidate, _action_for, etc.) and
  align action decisions directly with score_opportunity_signal metrics.
- Reduce action surface from trigger/setup/chase/skip to entry/watch/avoid.
- Add confidence field (0..100) mapped from edge_score.
- Update evaluate/optimize ground-truth mapping and tests.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-22 01:08:34 +08:00
d3408dabba docs: add repository contributor guide 2026-04-22 00:29:16 +08:00
076a5f1b1c feat: add opportunity evaluation optimizer 2026-04-22 00:29:02 +08:00
Carlos Ouyang
436bef4814 Add opportunity dataset collection 2026-04-21 20:02:54 +08:00
50402e4aa7 Refactor opportunity scoring model 2026-04-21 11:25:38 +08:00
4761067c30 Fix opportunity ignore_dust handling 2026-04-21 10:44:00 +08:00
a9f6cf4c46 fix: use rolling_window_ticker for symbol-specific queries, expand window choices
- Replace removed Spot.ticker() with rolling_window_ticker for symbol-specific
ticker stats (compatible with binance-connector>=3.12.0).
- Fall back to ticker_24hr for full-market scans where rolling_window_ticker
requires symbols.
- Expand --window choices from [1h,4h,1d] to full Binance rolling window set:
1m,2m,5m,15m,30m,1h,2h,4h,6h,8h,12h,1d,2d,3d,5d,7d,15d,30d.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-04-20 23:18:28 +08:00
16 changed files with 1290 additions and 122 deletions

AGENTS.md Normal file
View File

@@ -0,0 +1,68 @@
# Repository Guidelines
## Project Structure & Module Organization
CoinHunter is a Python CLI package using a `src/` layout. Application code lives in `src/coinhunter/`.
- `src/coinhunter/cli.py` defines CLI parsing and command dispatch for `coinhunter` and `coin`.
- `src/coinhunter/binance/` contains thin Binance Spot client wrappers.
- `src/coinhunter/services/` contains domain logic for account, market, trade, portfolio, opportunity, dataset, research, and evaluation flows.
- `src/coinhunter/config.py`, `runtime.py`, and `audit.py` handle runtime config, output, completions, upgrade flow, and logs.
- `tests/` contains pytest/unittest coverage by service area.
- `dist/` contains built release artifacts; do not edit these manually.
## Build, Test, and Development Commands
Install locally with development tools:
```bash
python -m pip install -e '.[dev]'
```
Run the CLI from the working tree:
```bash
coinhunter --help
coin opportunity -s BTCUSDT ETHUSDT --agent
```
Quality checks:
```bash
pytest tests/ # run the full test suite
ruff check src tests # lint and import ordering
mypy src # static type checks
```
## Coding Style & Naming Conventions
Use Python 3.10+ syntax and 4-space indentation. Keep modules small and service-oriented; prefer adding logic under `src/coinhunter/services/` and keeping `cli.py` focused on argument parsing and dispatch.
Use `snake_case` for functions, variables, and modules. Use `PascalCase` for classes and dataclasses. Preserve existing payload key naming conventions such as `notional_usdt`, `quote_volume`, and `opportunity_score`.
Ruff enforces `E`, `F`, `I`, `W`, `UP`, `B`, `C4`, and `SIM`; line length `E501` is ignored.
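A hypothetical sketch of those conventions side by side (the `OpportunityRow` class and `to_payload` helper are illustrative examples, not real project code):

```python
from dataclasses import asdict, dataclass


@dataclass
class OpportunityRow:  # PascalCase for classes and dataclasses
    symbol: str
    notional_usdt: float       # preserve existing payload key names verbatim
    quote_volume: float
    opportunity_score: float


def to_payload(row: OpportunityRow) -> dict:  # snake_case for functions
    # Dataclass field names double as payload keys, so naming stays consistent.
    return asdict(row)
```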
## Testing Guidelines
Tests use `pytest` with `unittest.TestCase`. Add tests near the changed behavior:
- CLI dispatch: `tests/test_cli.py`
- Config/runtime: `tests/test_config_runtime.py`
- Opportunity logic: `tests/test_opportunity_service.py`
- Dataset/evaluation flows: `tests/test_opportunity_dataset_service.py`
Name tests as `test_<behavior>`. Prefer fake clients and injected HTTP functions over live network calls. Run `pytest tests/` before submitting changes.
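A minimal sketch of the fake-client pattern (both `FakeSpotClient` and the `list_prices` service it exercises are illustrative stand-ins, not real coinhunter code):

```python
import unittest


class FakeSpotClient:
    """Stands in for the real spot client; records calls instead of hitting Binance."""

    def __init__(self, prices: dict[str, float]):
        self._prices = prices
        self.calls: list[list[str]] = []

    def ticker_price(self, symbols: list[str]) -> list[dict]:
        self.calls.append(symbols)
        return [{"symbol": s, "price": str(self._prices[s])} for s in symbols]


def list_prices(symbols: list[str], *, spot_client) -> dict[str, float]:
    """Toy service function using the keyword client-injection convention."""
    return {r["symbol"]: float(r["price"]) for r in spot_client.ticker_price(symbols)}


class ListPricesTest(unittest.TestCase):
    def test_uses_injected_fake(self):
        fake = FakeSpotClient({"BTCUSDT": 70000.0})
        self.assertEqual(list_prices(["BTCUSDT"], spot_client=fake), {"BTCUSDT": 70000.0})
        self.assertEqual(fake.calls, [["BTCUSDT"]])
```

Run such a file with `pytest tests/` like any other test; no network access or API keys are required.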
## Commit & Pull Request Guidelines
Recent history uses short imperative subjects, often Conventional Commit prefixes:
- `feat: configurable ticker window for market stats`
- `fix: use rolling_window_ticker for symbol-specific queries`
- `refactor: flatten account command to a single balances view`
Keep commits focused and describe user-visible behavior. Pull requests should include a concise summary, validation commands run, and any config or CLI changes. Link issues when applicable. For CLI output changes, include before/after examples or JSON snippets.
## Security & Configuration Tips
Never commit Binance API keys, secrets, runtime logs, or local `~/.coinhunter` files. Runtime secrets belong in `~/.coinhunter/.env`; configuration belongs in `~/.coinhunter/config.toml`. Use `COINHUNTER_HOME` for isolated test runs.
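The `COINHUNTER_HOME` override makes isolated runs easy to script; a minimal sketch (the file layout mimics what `coinhunter init` is described to create — the template contents here are placeholders, not the real generated files):

```python
import os
import tempfile
from pathlib import Path

# Point the CLI at a throwaway home so a test run never touches ~/.coinhunter.
with tempfile.TemporaryDirectory() as tmp:
    os.environ["COINHUNTER_HOME"] = tmp
    home = Path(tmp)
    # Mimic the files `coinhunter init` generates (placeholder contents).
    (home / "config.toml").write_text("[runtime]\n", encoding="utf-8")
    (home / ".env").write_text("BINANCE_API_KEY=\nBINANCE_API_SECRET=\n", encoding="utf-8")
    created = sorted(p.name for p in home.iterdir())
```

Any subprocess launched inside the block inherits the isolated home through the environment, so the real `~/.coinhunter` stays untouched.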

View File

@@ -4,7 +4,7 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Development commands
- **Install (dev):** `pip install -e ".[dev]"`
- **Install (dev):** `pip install -e ".[dev]"` or `conda env create -f environment.yml && conda activate coinhunter`
- **Run CLI locally:** `python -m coinhunter --help`
- **Run tests:** `pytest` or `python -m pytest tests/`
- **Run single test file:** `pytest tests/test_cli.py -v`
@@ -14,31 +14,39 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
## Architecture
CoinHunter V2 is a Binance-first crypto trading CLI with a flat, direct architecture:
CoinHunter is a **lightweight data-layer CLI** designed to pair with the `coinbuddy` AI Skill for crypto trading on Binance. The philosophy is **layered screening**: the CLI handles cheap rule-based filtering and monitoring, while the AI Skill handles expensive deep analysis on a small set of curated candidates.
- **`src/coinhunter/cli.py`** — Single entrypoint (`main()`). Uses `argparse` to parse commands and directly dispatches to service functions. There is no separate `commands/` adapter layer.
- **`src/coinhunter/services/`** — Contains all domain logic:
- `account_service.py` — balances, positions, overview
### CLI layer (data + execution)
- **`src/coinhunter/cli.py`** — Single entrypoint (`main()`). Uses `argparse` to parse commands and directly dispatches to service functions. Core commands: `init`, `config`, `account`, `market`, `buy`, `sell`, `portfolio`, `scan`, `analyze`, `watch`, `upgrade`, `catlog`, `completion`.
- **`src/coinhunter/services/`** — Domain logic:
- `account_service.py` — balances, positions
- `market_service.py` — tickers, klines, scan universe, symbol normalization
- `signal_service.py` — shared market signal scoring used by scan and portfolio analysis
- `portfolio_service.py` — held-position review and add/hold/trim/exit recommendations
- `trade_service.py` — spot and USDT-M futures order execution
- `opportunity_service.py` — market scanning and entry/watch/skip recommendations
- **`src/coinhunter/binance/`** — Thin wrappers around official Binance connectors:
- `spot_client.py` wraps `binance.spot.Spot`
- `um_futures_client.py` wraps `binance.um_futures.UMFutures`
Both normalize request errors into `RuntimeError` and handle single/multi-symbol ticker responses.
- `signal_service.py` — shared market signal scoring (rule-based, zero token cost)
- `portfolio_service.py` — held-position review (`analyze_portfolio`) and lightweight anomaly monitoring (`watch_portfolio`)
- `trade_service.py` — spot order execution only
- `opportunity_service.py` — market scanning (`scan_opportunities`) returning top-N candidates
- `analyze_service.py` — multi-timeframe deep technical analysis for AI consumption
- **`src/coinhunter/binance/spot_client.py`** — Thin wrapper around `binance.spot.Spot`. Normalizes request errors into `RuntimeError`.
- **`src/coinhunter/config.py`** — `load_config()`, `get_binance_credentials()`, `ensure_init_files()`.
- **`src/coinhunter/runtime.py`** — `RuntimePaths`, `get_runtime_paths()`, `print_json()`.
- **`src/coinhunter/runtime.py`** — `RuntimePaths`, `get_runtime_paths()`, `print_json()`, TUI rendering.
- **`src/coinhunter/audit.py`** — Writes JSONL audit events to dated files.
### AI layer (decision)
- **`coinbuddy` Skill** — Lives at `~/.claude/skills/coinbuddy/SKILL.md`. Governs how the AI interacts with the CLI:
  - **Discovery flow:** `scan` → `analyze` → AI synthesis → user confirm → `trade`
- **Portfolio flow:** `watch` → flag NEED_REVIEW → `analyze` → AI synthesis → user confirm → `trade`
- The Skill always uses `--agent` for structured JSON consumption.
## Runtime and environment
User data lives in `~/.coinhunter/` by default (override with `COINHUNTER_HOME`):
- `config.toml` — runtime, binance, trading, signal, opportunity, and portfolio settings
- `config.toml` — runtime, binance, trading, signal, opportunity, portfolio, and watch settings
- `.env` — `BINANCE_API_KEY` and `BINANCE_API_SECRET`
- `logs/audit_YYYYMMDD.jsonl` — structured audit log
- `logs/dry-run/audit_YYYYMMDD.jsonl` — dry-run audit log
Run `coinhunter init` to generate the config and env templates.
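Since the audit log is plain JSONL, the dated files can be tailed with a few lines. This is a generic sketch — `tail_jsonl` and the demo event fields are illustrative, not the `catlog` implementation or the real audit schema:

```python
import json
import tempfile
from pathlib import Path


def tail_jsonl(path: Path, limit: int = 10) -> list[dict]:
    """Return the last `limit` JSON objects from a JSONL file."""
    lines = path.read_text(encoding="utf-8").splitlines()
    return [json.loads(line) for line in lines[-limit:] if line.strip()]


# Demo against a synthetic audit file with made-up field names.
with tempfile.TemporaryDirectory() as tmp:
    demo = Path(tmp) / "audit_20260427.jsonl"
    demo.write_text(
        '{"event": "order", "symbol": "BTCUSDT"}\n'
        '{"event": "order", "symbol": "ETHUSDT"}\n',
        encoding="utf-8",
    )
    latest = tail_jsonl(demo, limit=1)
```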
@@ -46,19 +54,29 @@ Run `coinhunter init` to generate the config and env templates.
- **Symbol normalization:** `market_service.normalize_symbol()` strips `/`, `-`, `_`, and uppercases the symbol. CLI inputs like `ETH/USDT`, `eth-usdt`, and `ETHUSDT` are all normalized to `ETHUSDT`.
- **Dry-run behavior:** Trade commands support `--dry-run`. If omitted, the default falls back to `trading.dry_run_default` in `config.toml`.
- **Client injection:** Service functions accept `spot_client` / `futures_client` as keyword arguments. This enables easy unit testing with mocks.
- **Error handling:** Binance client wrappers catch `requests.exceptions.SSLError` and `RequestException` and re-raise as human-readable `RuntimeError`. The CLI catches all exceptions in `main()` and prints `error: {message}` to stderr with exit code 1.
- **Client injection:** Service functions accept `spot_client` as a keyword argument for easy unit testing with mocks.
- **Error handling:** `spot_client.py` catches `requests.exceptions.SSLError` and `RequestException` and re-raises as human-readable `RuntimeError`. The CLI catches all exceptions in `main()` and prints `error: {message}` to stderr with exit code 1.
- **Ticker API fallback:** `spot_client.ticker_stats()` uses `rolling_window_ticker` for symbol-specific queries and `ticker_24hr` for full-market scans (no symbols).
- **Output modes:** All commands support `--agent` for JSON output and `--doc` to print the command's output schema.
- **Watch rules:** `portfolio_service.watch_portfolio()` monitors held positions for anomalies (1h/24h drawdowns, spikes, concentration risk, technical score deterioration). This is pure rule-based and costs zero tokens.
- **Analyze design:** `analyze_service.analyze_symbols()` fetches multi-timeframe klines (1h, 4h, 1d) and produces an AI-friendly output with `summary`, `timeframes`, `key_levels`, `alerts`, and `signal_score`. It is designed for LLM consumption.
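The normalization rule reduces to a few lines; this sketch is reconstructed from the description above, not copied from `market_service`:

```python
def normalize_symbol(raw: str) -> str:
    """Strip common separators and uppercase, per the documented rule."""
    for sep in ("/", "-", "_"):
        raw = raw.replace(sep, "")
    return raw.upper()
```

So `ETH/USDT`, `eth-usdt`, and `ETHUSDT` all collapse to the same canonical `ETHUSDT`.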
## CLI command reference
| Command | Purpose | Token cost |
|---------|---------|-----------|
| `coin scan` | Rule-based market scan, returns top-N candidates | 0 |
| `coin analyze <sym>` | Multi-timeframe deep technical analysis | 0 |
| `coin watch` | Portfolio anomaly monitoring | 0 |
| `coin portfolio` | Full portfolio scoring | 0 |
| `coin account` | Balances | 0 |
| `coin buy/sell` | Trade execution | 0 |
## Testing
Tests live in `tests/` and use `unittest.TestCase` with `unittest.mock.patch`. The test suite covers:
- `test_cli.py` — parser smoke tests and dispatch behavior
- `test_config_runtime.py` — config loading, env parsing, path resolution
- `test_account_market_services.py` — balance/position/ticker/klines logic with mocked clients
- `test_trade_service.py` — spot and futures trade execution paths
- `test_opportunity_service.py` — portfolio and scan scoring logic
Tests live in `tests/` and use `unittest.TestCase` with `unittest.mock.patch`. The test suite covers CLI parser smoke tests, config loading, service logic with mocked clients, and trade execution paths.
## Notes
- `AGENTS.md` in this repo is stale and describes a prior V1 architecture (commands/, smart executor, precheck, review engine). Do not rely on it.
- Removed in the V2 simplification: `backtest`, `strategy`, `opportunity dataset/evaluate/optimize`, `research_service` (CoinGecko). These were over-engineered for the AI-assisted trading flow and have been archived out of the core codebase.

View File

@@ -19,9 +19,15 @@
---
## What's New in 3.0.1
- **Fix ticker API compatibility** — `rolling_window_ticker` replaces the removed `ticker` method in `binance-connector>=3.12.0`.
- **Expand ticker window choices** — `market tickers --window` now supports `1m`, `2m`, `5m`, `15m`, `30m`, `1h`, `2h`, `4h`, `6h`, `8h`, `12h`, `1d`, `2d`, `3d`, `5d`, `7d`, `15d`, `30d`.
- **Smart API fallback** — full-market scan (no symbols) falls back to 24h ticker; symbol-specific queries use rolling window.
## What's New in 3.0
- **Split decision models** — portfolio (add/hold/trim/exit) and opportunity (enter/watch/skip) now use independent scoring logic.
- **Split decision models** — portfolio (add/hold/trim/exit) and opportunity (trigger/setup/chase/skip) now use independent scoring logic.
- **Configurable ticker windows** — `market tickers` supports `--window 1h`, `4h`, or `1d`.
- **Live / dry-run audit logs** — audit logs are written to separate subdirectories; use `catlog --dry-run` to review simulations.
- **Flattened commands** — `account`, `opportunity`, and `config` are now top-level for fewer keystrokes.

View File

@@ -4,13 +4,14 @@ build-backend = "setuptools.build_meta"
[project]
name = "coinhunter"
version = "3.0.0"
version = "3.0.1"
description = "Binance-first trading CLI for balances, market data, opportunity scanning, and execution."
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10"
dependencies = [
"binance-connector>=3.9.0",
"requests>=2.31.0",
"shtab>=1.7.0",
"tomli>=2.0.1; python_version < '3.11'",
"tomli-w>=1.0.0",
@@ -21,6 +22,7 @@ dev = [
"pytest>=8.0",
"ruff>=0.5.0",
"mypy>=1.10.0",
"types-requests>=2.31.0",
]
[project.scripts]

View File

@@ -53,13 +53,15 @@ class SpotBinanceClient:
return self._call("exchange info", self._client.exchange_info, **kwargs) # type: ignore[no-any-return]
def ticker_stats(self, symbols: list[str] | None = None, *, window: str = "1d") -> list[dict[str, Any]]:
kwargs: dict[str, Any] = {"windowSize": window}
if symbols:
kwargs: dict[str, Any] = {"windowSize": window}
if len(symbols) == 1:
kwargs["symbol"] = symbols[0]
else:
kwargs["symbols"] = symbols
response = self._call("ticker stats", self._client.ticker, **kwargs)
response = self._call("ticker stats", self._client.rolling_window_ticker, **kwargs)
else:
response = self._call("ticker stats", self._client.ticker_24hr)
return response if isinstance(response, list) else [response]
def ticker_price(self, symbols: list[str] | None = None) -> list[dict[str, Any]]:

View File

@@ -1,4 +1,4 @@
"""CoinHunter V2 CLI."""
"""CoinHunter V2 CLI — lightweight data layer for AI-assisted crypto trading."""
from __future__ import annotations
@@ -26,6 +26,7 @@ from .runtime import (
)
from .services import (
account_service,
analyze_service,
market_service,
opportunity_service,
portfolio_service,
@@ -40,7 +41,11 @@ examples:
coin m k BTCUSDT -i 1h -l 50
coin buy BTCUSDT -Q 100 -d
coin sell BTCUSDT --qty 0.01 --type limit --price 90000
coin opportunity -s BTCUSDT ETHUSDT
coin scan
coin scan -s BTCUSDT ETHUSDT
coin analyze BTCUSDT ETHUSDT
coin portfolio
coin watch
coin upgrade
"""
@@ -157,7 +162,7 @@ Fields:
last_price latest traded price (float)
price_change_pct change % over the selected window (float, e.g. 2.5 = +2.5%)
quote_volume quote volume over the selected window (float)
window statistics window (enum: 1h, 4h, 1d)
window statistics window (enum: 1m, 2m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 2d, 3d, 5d, 7d, 15d, 30d)
""",
"json": """\
JSON Output:
@@ -172,7 +177,7 @@ Fields:
last_price latest traded price (float)
price_change_pct change % over the selected window (float, e.g. 2.5 = +2.5%)
quote_volume quote volume over the selected window (float)
window statistics window (enum: 1h, 4h, 1d)
window statistics window (enum: 1m, 2m, 5m, 15m, 30m, 1h, 2h, 4h, 6h, 8h, 12h, 1d, 2d, 3d, 5d, 7d, 15d, 30d)
""",
},
"market/klines": {
@@ -398,54 +403,177 @@ Fields:
position_weight position weight in portfolio (float, 0-1)
""",
},
"opportunity": {
"scan": {
"tui": """\
TUI Output:
RECOMMENDATIONS count=5
1. ETHUSDT action=enter score=0.8200
· trend, momentum, and breakout are aligned for a fresh entry
· base asset ETH passed liquidity and tradability filters
trend=1.0 momentum=0.03 breakout=0.9 volume_confirmation=1.5 volatility=0.02 signal_score=0.82 position_weight=0.0
2. BTCUSDT action=watch score=0.6000
· market structure is constructive but still needs confirmation
· base asset BTC passed liquidity and tradability filters
1. ETHUSDT action=entry confidence=74 score=1.7200
· fresh breakout trigger with clean setup and manageable extension
setup_score=0.74 trigger_score=0.61 liquidity_score=1.0 extension_penalty=0.0 opportunity_score=1.72 position_weight=0.0
2. BTCUSDT action=watch confidence=52 score=0.7800
· setup is constructive but the trigger is not clean enough yet
· symbol is already held, so the opportunity score is discounted for overlap
trend=1.0 momentum=0.01 breakout=0.6 volume_confirmation=1.1 volatility=0.01 signal_score=0.78 position_weight=0.3
setup_score=0.68 trigger_score=0.25 liquidity_score=1.0 extension_penalty=0.1 opportunity_score=0.96 position_weight=0.3
JSON Output:
{
"recommendations": [
{"symbol": "ETHUSDT", "action": "enter", "score": 0.82,
"reasons": ["trend, momentum, and breakout are aligned for a fresh entry", "base asset ETH passed liquidity and tradability filters"],
"metrics": {"trend": 1.0, "momentum": 0.03, "breakout": 0.9, "volume_confirmation": 1.5, "volatility": 0.02, "signal_score": 0.82, "position_weight": 0.0}}
{"symbol": "ETHUSDT", "action": "entry", "confidence": 74, "score": 1.72,
"reasons": ["fresh breakout trigger with clean setup and manageable extension"],
"metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}}
]
}
Fields:
symbol trading pair (e.g. "ETHUSDT")
action enum: "enter" | "watch" | "skip"
score opportunity score after overlap/risk discounts
reasons list of human-readable explanations (includes liquidity filter note for scan)
action enum: "entry" | "watch" | "avoid"
confidence 0..100 confidence index derived from edge_score
score opportunity score after extension and overlap/risk discounts
reasons list of human-readable explanations
metrics scoring breakdown
signal_score raw shared market signal score before overlap discount
setup_score compression, higher-lows, and breakout-proximity quality
trigger_score fresh-breakout, volume, and controlled-momentum quality
liquidity_score relative quote-volume quality after liquidity filters
extension_penalty overextension/chase risk from run-up and MA distance
opportunity_score raw opportunity score before overlap discount
position_weight current portfolio overlap in the same symbol
""",
"json": """\
JSON Output:
{
"recommendations": [
{"symbol": "ETHUSDT", "action": "enter", "score": 0.82,
"reasons": ["trend, momentum, and breakout are aligned for a fresh entry", "base asset ETH passed liquidity and tradability filters"],
"metrics": {"trend": 1.0, "momentum": 0.03, "breakout": 0.9, "volume_confirmation": 1.5, "volatility": 0.02, "signal_score": 0.82, "position_weight": 0.0}}
{"symbol": "ETHUSDT", "action": "entry", "confidence": 74, "score": 1.72,
"reasons": ["fresh breakout trigger with clean setup and manageable extension"],
"metrics": {"setup_score": 0.74, "trigger_score": 0.61, "liquidity_score": 1.0, "extension_penalty": 0.0, "opportunity_score": 1.72, "position_weight": 0.0}}
]
}
Fields:
symbol trading pair (e.g. "ETHUSDT")
action enum: "enter" | "watch" | "skip"
score opportunity score after overlap/risk discounts
reasons list of human-readable explanations (includes liquidity filter note for scan)
action enum: "entry" | "watch" | "avoid"
confidence 0..100 confidence index derived from edge_score
score opportunity score after extension and overlap/risk discounts
reasons list of human-readable explanations
metrics scoring breakdown
signal_score raw shared market signal score before overlap discount
setup_score compression, higher-lows, and breakout-proximity quality
trigger_score fresh-breakout, volume, and controlled-momentum quality
liquidity_score relative quote-volume quality after liquidity filters
extension_penalty overextension/chase risk from run-up and MA distance
opportunity_score raw opportunity score before overlap discount
position_weight current portfolio overlap in the same symbol
""",
},
"analyze": {
"tui": """\
TUI Output:
ANALYSIS count=2
BTCUSDT at 70,000.00 (+2.50% 24h). 1h trend: uptrend, 4h: uptrend, 1d: sideways. 1h RSI 65.0. No significant alerts.
1h:uptrend RSI=65.0 | 4h:uptrend RSI=58.0 | 1d:sideways RSI=52.0
S=[68000.0, 65000.0] R=[71000.0, 73000.0]
JSON Output:
{
"analyses": [
{
"symbol": "BTCUSDT",
"summary": "BTCUSDT at 70000.00 (+2.50% 24h)...",
"price": {"current": 70000.0, "change_24h_pct": 2.5, "high_24h": 71000.0, "low_24h": 68000.0, "volume_24h": 123456789.0},
"timeframes": {"1h": {"trend": "uptrend", "sma20": 69000.0, "rsi": 65.0, "volatility_pct": 1.2, "volume_ratio": 1.3}, ...},
"key_levels": {"support": [68000.0, 65000.0], "resistance": [71000.0, 73000.0], "recent_high": 71000.0, "recent_low": 68000.0},
"alerts": [],
"signal_score": 0.75,
"signal_metrics": {"trend": 1.0, "momentum": 0.02, ...}
}
]
}
Fields:
symbol trading pair
summary human-readable one-line technical summary
price current price, 24h change, high/low, volume
timeframes 1h/4h/1d trend, sma20, rsi, volatility, volume_ratio
key_levels support, resistance, recent_high, recent_low
alerts list of technical alerts (e.g. RSI overbought, near support)
signal_score portfolio-style signal score
signal_metrics raw scoring breakdown
""",
"json": """\
JSON Output:
{
"analyses": [
{
"symbol": "BTCUSDT",
"summary": "BTCUSDT at 70000.00 (+2.50% 24h)...",
"price": {"current": 70000.0, "change_24h_pct": 2.5, "high_24h": 71000.0, "low_24h": 68000.0, "volume_24h": 123456789.0},
"timeframes": {"1h": {"trend": "uptrend", "sma20": 69000.0, "rsi": 65.0, "volatility_pct": 1.2, "volume_ratio": 1.3}, ...},
"key_levels": {"support": [68000.0, 65000.0], "resistance": [71000.0, 73000.0], "recent_high": 71000.0, "recent_low": 68000.0},
"alerts": [],
"signal_score": 0.75,
"signal_metrics": {"trend": 1.0, "momentum": 0.02, ...}
}
]
}
Fields:
symbol trading pair
summary human-readable one-line technical summary
price current price, 24h change, high/low, volume
timeframes 1h/4h/1d trend, sma20, rsi, volatility, volume_ratio
key_levels support, resistance, recent_high, recent_low
alerts list of technical alerts (e.g. RSI overbought, near support)
signal_score portfolio-style signal score
signal_metrics raw scoring breakdown
""",
},
"watch": {
"tui": """\
TUI Output:
PORTFOLIO WATCH 2 position(s) need review, 1 healthy
⚠ NEED_REVIEW ETHUSDT
· 1h drop -8.00% (alert threshold -5.0%)
· 24h drop -12.00% (alert threshold -10.0%)
· position weight 60.0% exceeds max 50.0%
· technical score -0.30 below exit threshold -0.20
✓ HEALTHY BTCUSDT weight=30.0%
JSON Output:
{
"watch_results": [
{"symbol": "ETHUSDT", "status": "need_review", "reasons": ["1h drop -8.00%..."], "metrics": {...}},
{"symbol": "BTCUSDT", "status": "healthy", "reasons": [], "metrics": {"position_weight": 0.3, ...}}
],
"summary": "2 position(s) need review, 1 healthy",
"need_review_count": 2,
"healthy_count": 1
}
Fields:
watch_results per-position watch status
symbol trading pair
status "need_review" | "healthy"
reasons list of triggered alert reasons
metrics position_weight, signal_score, price_change_24h_pct, volatility, trend
summary human-readable summary string
need_review_count number of positions flagged for review
healthy_count number of positions with no alerts
""",
"json": """\
JSON Output:
{
"watch_results": [
{"symbol": "ETHUSDT", "status": "need_review", "reasons": ["1h drop -8.00%..."], "metrics": {...}},
{"symbol": "BTCUSDT", "status": "healthy", "reasons": [], "metrics": {"position_weight": 0.3, ...}}
],
"summary": "2 position(s) need review, 1 healthy",
"need_review_count": 2,
"healthy_count": 1
}
Fields:
watch_results per-position watch status
symbol trading pair
status "need_review" | "healthy"
reasons list of triggered alert reasons
metrics position_weight, signal_score, price_change_24h_pct, volatility, trend
summary human-readable summary string
need_review_count number of positions flagged for review
healthy_count number of positions with no alerts
""",
},
"upgrade": {
@@ -545,17 +673,17 @@ Fields:
TUI Output:
CONFIG
binance.recv_window = 5000
opportunity.top_n = 10
opportunity.top_n = 5
JSON Output:
{"binance.recv_window": 5000, "opportunity.top_n": 10}
{"binance.recv_window": 5000, "opportunity.top_n": 5}
Fields:
key dot-notation config key (e.g. "binance.recv_window")
value current value (type depends on key: bool, int, float, list, str)
""",
"json": """\
JSON Output:
{"binance.recv_window": 5000, "opportunity.top_n": 10}
{"binance.recv_window": 5000, "opportunity.top_n": 5}
Fields:
key dot-notation config key (e.g. "binance.recv_window")
value current value (type depends on key: bool, int, float, list, str)
@@ -736,8 +864,10 @@ def build_parser() -> argparse.ArgumentParser:
)
tickers_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to query (e.g. BTCUSDT ETH/USDT)")
tickers_parser.add_argument(
"-w", "--window", choices=["1h", "4h", "1d"], default="1d",
help="Statistics window: 1h, 4h, 1d (default: 1d)",
"-w", "--window",
choices=["1m", "2m", "5m", "15m", "30m", "1h", "2h", "4h", "6h", "8h", "12h", "1d", "2d", "3d", "5d", "7d", "15d", "30d"],
default="1d",
help="Rolling statistics window (default: 1d)",
)
_add_global_flags(tickers_parser)
klines_parser = market_subparsers.add_parser(
@@ -781,12 +911,28 @@ def build_parser() -> argparse.ArgumentParser:
)
_add_global_flags(portfolio_parser)
opportunity_parser = subparsers.add_parser(
"opportunity", aliases=["o"], help="Scan market for opportunities",
description="Scan the market for trading opportunities and return the top-N candidates with signals.",
scan_parser = subparsers.add_parser(
"scan", aliases=["sc"], help="Scan market for top-N opportunities",
description="Rule-based market scan that returns the top-N candidates. Zero token cost. "
"Use `analyze` for deep-dive on selected symbols.",
)
opportunity_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols")
_add_global_flags(opportunity_parser)
scan_parser.add_argument("-s", "--symbols", nargs="*", metavar="SYM", help="Restrict scan to specific symbols")
_add_global_flags(scan_parser)
analyze_parser = subparsers.add_parser(
"analyze", aliases=["an"], help="Detailed technical analysis for one or more symbols",
description="Multi-timeframe technical analysis (1h/4h/1d) with key levels, alerts, and signal scores. "
"Designed for AI consumption — use with --agent for structured JSON output.",
)
analyze_parser.add_argument("symbols", nargs="+", metavar="SYM", help="Symbols to analyze (e.g. BTCUSDT ETHUSDT)")
_add_global_flags(analyze_parser)
watch_parser = subparsers.add_parser(
"watch", aliases=["w"], help="Lightweight portfolio monitoring",
description="Monitor held positions for anomalies (drawdowns, spikes, concentration risk, technical deterioration). "
"Returns NEED_REVIEW or HEALTHY for each position. Zero token cost.",
)
_add_global_flags(watch_parser)
upgrade_parser = subparsers.add_parser(
"upgrade", help="Upgrade coinhunter to the latest version",
@@ -823,9 +969,11 @@ _CANONICAL_COMMANDS = {
"m": "market",
"pf": "portfolio",
"p": "portfolio",
"o": "opportunity",
"cfg": "config",
"c": "config",
"sc": "scan",
"an": "analyze",
"w": "watch",
}
_CANONICAL_SUBCOMMANDS = {
@@ -845,7 +993,9 @@ def _get_doc_key(argv: list[str]) -> str | None:
cmd = _CANONICAL_COMMANDS.get(tokens[0], tokens[0])
if cmd in _COMMANDS_WITH_SUBCOMMANDS and len(tokens) > 1:
sub = _CANONICAL_SUBCOMMANDS.get(tokens[1], tokens[1])
return f"{cmd}/{sub}"
sub_key = f"{cmd}/{sub}"
if sub_key in COMMAND_DOCS:
return sub_key
return cmd
@@ -893,11 +1043,12 @@ def main(argv: list[str] | None = None) -> int:
parser = build_parser()
raw_argv = _reorder_flag(raw_argv, "--agent", "-a")
args = parser.parse_args(raw_argv)
args.agent = bool(getattr(args, "agent", False) or "--agent" in raw_argv or "-a" in raw_argv)
# Normalize aliases to canonical command names
if args.command:
args.command = _CANONICAL_COMMANDS.get(args.command, args.command)
for attr in ("account_command", "market_command", "config_command"):
for attr in ("market_command", "config_command"):
val = getattr(args, attr, None)
if val:
setattr(args, attr, _CANONICAL_SUBCOMMANDS.get(val, val))
@@ -989,6 +1140,21 @@ def main(argv: list[str] | None = None) -> int:
print(shtab.complete(parser, shell=args.shell, preamble=""))
return 0
if args.command == "upgrade":
with with_spinner("Upgrading coinhunter...", enabled=not args.agent):
result = self_upgrade()
print_output(result, agent=args.agent)
return 0
if args.command == "catlog":
with with_spinner("Reading audit logs...", enabled=not args.agent):
entries = read_audit_log(limit=args.limit, offset=args.offset, dry_run=args.dry_run)
print_output(
{"entries": entries, "limit": args.limit, "offset": args.offset, "total": len(entries), "dry_run": args.dry_run},
agent=args.agent,
)
return 0
config = load_config()
if args.command == "account":
@@ -1061,7 +1227,7 @@ def main(argv: list[str] | None = None) -> int:
print_output(result, agent=args.agent)
return 0
if args.command == "opportunity":
if args.command == "scan":
spot_client = _load_spot_client(config)
with with_spinner("Scanning opportunities...", enabled=not args.agent):
result = opportunity_service.scan_opportunities(
@@ -1070,19 +1236,20 @@ def main(argv: list[str] | None = None) -> int:
print_output(result, agent=args.agent)
return 0
if args.command == "upgrade":
with with_spinner("Upgrading coinhunter...", enabled=not args.agent):
result = self_upgrade()
if args.command == "analyze":
spot_client = _load_spot_client(config)
with with_spinner("Analyzing symbols...", enabled=not args.agent):
result = analyze_service.analyze_symbols(
config, spot_client=spot_client, symbols=args.symbols
)
print_output(result, agent=args.agent)
return 0
if args.command == "catlog":
with with_spinner("Reading audit logs...", enabled=not args.agent):
entries = read_audit_log(limit=args.limit, offset=args.offset, dry_run=args.dry_run)
print_output(
{"entries": entries, "limit": args.limit, "offset": args.offset, "total": len(entries), "dry_run": args.dry_run},
agent=args.agent,
)
if args.command == "watch":
spot_client = _load_spot_client(config)
with with_spinner("Watching portfolio...", enabled=not args.agent):
result = portfolio_service.watch_portfolio(config, spot_client=spot_client)
print_output(result, agent=args.agent)
return 0
parser.error(f"Unsupported command {args.command}")

View File

@@ -38,6 +38,33 @@ spot_enabled = true
dry_run_default = false
dust_usdt_threshold = 10.0
[opportunity]
min_quote_volume = 1000000.0
top_n = 5
scan_limit = 50
ignore_dust = true
entry_threshold = 1.5
watch_threshold = 0.6
min_trigger_score = 0.45
min_setup_score = 0.35
overlap_penalty = 0.6
lookback_intervals = ["1h", "4h", "1d"]
[opportunity.model_weights]
trend = 0.1406
compression = 0.1688
breakout_proximity = 0.0875
higher_lows = 0.15
range_position = 0.45
fresh_breakout = 0.2
volume = 0.525
momentum = 0.1562
setup = 1.875
trigger = 1.875
liquidity = 0.3
volatility_penalty = 0.8
extension_penalty = 0.45
[signal]
lookback_interval = "1h"
trend = 1.0
@@ -46,21 +73,18 @@ breakout = 0.8
volume = 0.7
volatility_penalty = 0.5
[opportunity]
min_quote_volume = 1000000.0
top_n = 10
scan_limit = 50
ignore_dust = true
entry_threshold = 1.5
watch_threshold = 0.6
overlap_penalty = 0.6
[portfolio]
add_threshold = 1.5
hold_threshold = 0.6
trim_threshold = 0.2
exit_threshold = -0.2
max_position_weight = 0.6
[watch]
alert_drawdown_1h_pct = -5.0
alert_drawdown_24h_pct = -10.0
alert_spike_1h_pct = 8.0
max_position_weight = 0.5
"""
DEFAULT_ENV = "BINANCE_API_KEY=\nBINANCE_API_SECRET=\n"

View File

@@ -335,11 +335,11 @@ def _render_tui(payload: Any) -> None:
action = r.get("action", "")
action_color = (
_GREEN
if action in {"add", "enter"}
if action in {"add", "trigger"}
else _YELLOW
if action in {"hold", "watch", "review"}
if action in {"hold", "setup", "review"}
else _RED
if action in {"exit", "skip", "trim"}
if action in {"chase", "exit", "skip", "trim"}
else _CYAN
)
print(
@@ -353,6 +353,59 @@ def _render_tui(payload: Any) -> None:
print(f" {_DIM}{metric_str}{_RESET}")
return
if "analyses" in payload:
rows = payload["analyses"]
print(f"\n{_BOLD}{_CYAN} ANALYSIS {_RESET} count={len(rows)}")
for r in rows:
symbol = r.get("symbol", "")
price = r.get("price", {})
current = price.get("current", 0)
change = price.get("change_24h_pct", 0)
change_color = _GREEN if change >= 0 else _RED
print(f"\n {_BOLD}{symbol}{_RESET} {current:,.2f} {_color(f'{change:+.2f}%', change_color)}")
print(f" {r.get('summary', '')}")
alerts = r.get("alerts", [])
if alerts:
for alert in alerts:
print(f" {_YELLOW}! {_RESET}{alert}")
timeframes = r.get("timeframes", {})
if timeframes:
tf_parts = []
for tf_name, tf_data in timeframes.items():
trend = tf_data.get("trend", "?")
rsi = tf_data.get("rsi")
rsi_str = f" RSI={rsi:.1f}" if rsi is not None else ""
tf_parts.append(f"{tf_name}:{trend}{rsi_str}")
print(f" {_DIM}{' | '.join(tf_parts)}{_RESET}")
levels = r.get("key_levels", {})
if levels:
sup = levels.get("support", [])
res = levels.get("resistance", [])
if sup or res:
print(f" S={sup} R={res}")
return
if "watch_results" in payload:
rows = payload["watch_results"]
summary = payload.get("summary", "")
print(f"\n{_BOLD}{_CYAN} PORTFOLIO WATCH {_RESET} {summary}")
for r in rows:
status = r.get("status", "")
symbol = r.get("symbol", "")
if status == "need_review":
print(f"\n {_YELLOW}⚠ NEED_REVIEW{_RESET} {_BOLD}{symbol}{_RESET}")
for reason in r.get("reasons", []):
print(f" · {reason}")
metrics = r.get("metrics", {})
if metrics:
metric_str = " ".join(f"{k}={v}" for k, v in metrics.items())
print(f" {_DIM}{metric_str}{_RESET}")
else:
metrics = r.get("metrics", {})
weight = metrics.get("position_weight", 0)
print(f" {_GREEN}✓ HEALTHY{_RESET} {symbol} weight={weight:.2%}")
return
if "command" in payload and "returncode" in payload:
rc = payload.get("returncode", 0)
stdout = payload.get("stdout", "")

View File

@@ -90,6 +90,7 @@ def get_positions(
config: dict[str, Any],
*,
spot_client: Any,
ignore_dust: bool = True,
) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
dust = float(config.get("trading", {}).get("dust_usdt_threshold", 0.0))
@@ -102,7 +103,7 @@ def get_positions(
asset = item["asset"]
mark_price = price_map.get(asset, 1.0 if asset == quote else 0.0)
notional = quantity * mark_price
if notional < dust:
if ignore_dust and notional < dust:
continue
rows.append(
asdict(

View File

@@ -0,0 +1,201 @@
"""Detailed symbol analysis for AI consumption."""
from __future__ import annotations
from statistics import mean
from typing import Any
from .market_service import normalize_symbol
from .signal_service import score_portfolio_signal
def _clamp(value: float, low: float, high: float) -> float:
return max(low, min(value, high))
def _safe_pct(new: float, old: float) -> float:
if old == 0:
return 0.0
return (new - old) / old
def _rsi(closes: list[float], period: int = 14) -> float | None:
if len(closes) < period + 1:
return None
gains = []
losses = []
for i in range(1, period + 1):
delta = closes[-i] - closes[-i - 1]
if delta > 0:
gains.append(delta)
losses.append(0.0)
else:
gains.append(0.0)
losses.append(abs(delta))
avg_gain = mean(gains) if gains else 0.0
avg_loss = mean(losses) if losses else 0.0
if avg_loss == 0:
return 100.0
rs = avg_gain / avg_loss
return 100.0 - (100.0 / (1.0 + rs))
def _analyze_timeframe(klines: list[list[Any]]) -> dict[str, Any]:
if not klines:
return {"trend": "unknown", "sma20": None, "rsi": None, "volatility_pct": None}
closes = [float(item[4]) for item in klines]
volumes = [float(item[5]) for item in klines]
current = closes[-1]
sma20 = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
trend = (
"uptrend"
if current >= sma20 * 1.02
else "downtrend"
if current <= sma20 * 0.98
else "sideways"
)
rsi_val = _rsi(closes)
if len(closes) >= 10 and current:
volatility = (max(closes[-10:]) - min(closes[-10:])) / current * 100
else:
volatility = None
avg_volume = mean(volumes[:-1]) if len(volumes) > 1 else volumes[-1]
volume_ratio = volumes[-1] / avg_volume if avg_volume else 1.0
return {
"trend": trend,
"sma20": round(sma20, 4) if sma20 else None,
"rsi": round(rsi_val, 2) if rsi_val is not None else None,
"volatility_pct": round(volatility, 4) if volatility is not None else None,
"volume_ratio": round(volume_ratio, 4),
}
def _key_levels(klines: list[list[Any]]) -> dict[str, Any]:
if not klines:
return {"support": [], "resistance": [], "recent_high": None, "recent_low": None}
closes = [float(item[4]) for item in klines]
highs = [float(item[2]) for item in klines]
lows = [float(item[3]) for item in klines]
recent_high = max(highs[-20:]) if len(highs) >= 20 else max(highs)
recent_low = min(lows[-20:]) if len(lows) >= 20 else min(lows)
# Simple support/resistance: recent local extremes
support = sorted({round(min(lows[-10:]), 2), round(recent_low, 2)})
resistance = sorted({round(max(highs[-10:]), 2), round(recent_high, 2)})
return {
"support": support,
"resistance": resistance,
"recent_high": round(recent_high, 2),
"recent_low": round(recent_low, 2),
}
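The support/resistance heuristic above is just recent local extremes over 10- and 20-bar windows. A standalone sketch over pre-extracted high/low series (the hypothetical data and helper name are illustrative, not from the codebase):

```python
def key_levels_from_series(highs, lows):
    # Support = min of last 10 lows plus the 20-bar recent low;
    # resistance = max of last 10 highs plus the 20-bar recent high.
    recent_high = max(highs[-20:]) if len(highs) >= 20 else max(highs)
    recent_low = min(lows[-20:]) if len(lows) >= 20 else min(lows)
    support = sorted({round(min(lows[-10:]), 2), round(recent_low, 2)})
    resistance = sorted({round(max(highs[-10:]), 2), round(recent_high, 2)})
    return support, resistance

# Overall extremes fall outside the last 10 bars, so two levels survive each side.
highs = [20.0, 19.0, 12.0, 13.0, 12.0, 11.0, 10.0, 11.0, 12.0, 14.0, 13.0, 12.0]
lows = [5.0, 6.0, 11.0, 12.0, 11.0, 10.0, 9.0, 10.0, 11.0, 13.0, 12.0, 11.0]
support, resistance = key_levels_from_series(highs, lows)
print(support, resistance)  # [5.0, 9.0] [14.0, 20.0]
```

When the short-window extreme coincides with the 20-bar extreme, the `set` collapses them to a single level.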
def _generate_alerts(
ticker: dict[str, Any],
tf_1h: dict[str, Any],
tf_4h: dict[str, Any],
tf_1d: dict[str, Any],
levels: dict[str, Any],
current_price: float,
) -> list[str]:
alerts: list[str] = []
change_24h = float(ticker.get("price_change_pct", 0.0))
if abs(change_24h) >= 10:
alerts.append(f"24h price change is extreme ({change_24h:+.2f}%)")
elif abs(change_24h) >= 5:
alerts.append(f"24h price change is significant ({change_24h:+.2f}%)")
rsi_1h = tf_1h.get("rsi")
if rsi_1h is not None:
if rsi_1h >= 70:
alerts.append(f"1h RSI is overbought ({rsi_1h:.1f})")
elif rsi_1h <= 30:
alerts.append(f"1h RSI is oversold ({rsi_1h:.1f})")
for level in levels.get("resistance", []):
if level > 0 and abs(current_price - level) / level < 0.02:
alerts.append(f"price is near resistance ({level:,.2f})")
for level in levels.get("support", []):
if level > 0 and abs(current_price - level) / level < 0.02:
alerts.append(f"price is near support ({level:,.2f})")
if tf_1h.get("trend") != tf_4h.get("trend"):
alerts.append(f"timeframe divergence: 1h={tf_1h['trend']} vs 4h={tf_4h['trend']}")
vol_ratio = tf_1h.get("volume_ratio", 1.0)
if vol_ratio >= 2.0:
alerts.append(f"volume spike detected ({vol_ratio:.2f}x average)")
return alerts
def analyze_symbols(
config: dict[str, Any],
*,
spot_client: Any,
symbols: list[str],
) -> dict[str, Any]:
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
analyses = []
for raw_symbol in symbols:
symbol = normalize_symbol(raw_symbol)
# Fetch multi-timeframe klines
klines_1h = spot_client.klines(symbol=symbol, interval="1h", limit=72)
klines_4h = spot_client.klines(symbol=symbol, interval="4h", limit=42)
klines_1d = spot_client.klines(symbol=symbol, interval="1d", limit=30)
tickers = spot_client.ticker_stats([symbol], window="1d")
ticker = tickers[0] if tickers else {"priceChangePercent": "0", "lastPrice": "0", "quoteVolume": "0"}
current_price = float(ticker.get("lastPrice") or ticker.get("last_price") or 0.0)
change_24h = float(ticker.get("priceChangePercent") or ticker.get("price_change_pct") or 0.0)
volume_24h = float(ticker.get("quoteVolume") or ticker.get("quote_volume") or 0.0)
tf_1h = _analyze_timeframe(klines_1h)
tf_4h = _analyze_timeframe(klines_4h)
tf_1d = _analyze_timeframe(klines_1d)
levels = _key_levels(klines_1h)
alerts = _generate_alerts(ticker, tf_1h, tf_4h, tf_1d, levels, current_price)
# Portfolio-style signal for context
closes_1h = [float(item[4]) for item in klines_1h]
volumes_1h = [float(item[5]) for item in klines_1h]
signal_score, signal_metrics = score_portfolio_signal(
closes_1h,
volumes_1h,
{"price_change_pct": change_24h},
{"trend": 1.0, "momentum": 1.0, "breakout": 0.8, "volume": 0.7, "volatility_penalty": 0.5},
)
# Build human-readable summary for AI
summary_parts = [
f"{symbol} at {current_price:,.2f} ({change_24h:+.2f}% 24h).",
f"1h trend: {tf_1h['trend']}, 4h: {tf_4h['trend']}, 1d: {tf_1d['trend']}.",
]
if tf_1h["rsi"] is not None:
summary_parts.append(f"1h RSI {tf_1h['rsi']:.1f}.")
if alerts:
summary_parts.append(f"Alerts: {'; '.join(alerts)}.")
else:
summary_parts.append("No significant alerts.")
analyses.append({
"symbol": symbol,
"summary": " ".join(summary_parts),
"price": {
"current": round(current_price, 4),
"change_24h_pct": round(change_24h, 4),
"high_24h": float(ticker.get("highPrice") or 0.0),
"low_24h": float(ticker.get("lowPrice") or 0.0),
"volume_24h": round(volume_24h, 4),
},
"timeframes": {
"1h": tf_1h,
"4h": tf_4h,
"1d": tf_1d,
},
"key_levels": levels,
"alerts": alerts,
"signal_score": round(signal_score, 4),
"signal_metrics": signal_metrics,
})
return {"analyses": analyses}

View File

@@ -8,7 +8,7 @@ from typing import Any
from ..audit import audit_event
from .account_service import get_positions
from .market_service import base_asset, get_scan_universe, normalize_symbol
from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
from .signal_service import get_signal_interval, score_opportunity_signal
@dataclass
@@ -16,6 +16,7 @@ class OpportunityRecommendation:
symbol: str
action: str
score: float
confidence: int
reasons: list[str]
metrics: dict[str, float]
@@ -25,20 +26,64 @@ def _opportunity_thresholds(config: dict[str, Any]) -> dict[str, float]:
return {
"entry_threshold": float(opportunity_config.get("entry_threshold", 1.5)),
"watch_threshold": float(opportunity_config.get("watch_threshold", 0.6)),
"min_trigger_score": float(opportunity_config.get("min_trigger_score", 0.45)),
"min_setup_score": float(opportunity_config.get("min_setup_score", 0.35)),
"overlap_penalty": float(opportunity_config.get("overlap_penalty", 0.6)),
}
def _action_for_opportunity(score: float, thresholds: dict[str, float]) -> tuple[str, list[str]]:
def _clamp(value: float, low: float, high: float) -> float:
return min(max(value, low), high)
def _series_from_klines(klines: list[list[Any]]) -> tuple[list[float], list[float]]:
return [float(item[4]) for item in klines], [float(item[5]) for item in klines]
def _confidence_from_edge(edge_score: float) -> int:
return int(_clamp((edge_score + 1.0) / 2.0, 0.0, 1.0) * 100)
def _action_for_opportunity(score: float, metrics: dict[str, float], thresholds: dict[str, float]) -> tuple[str, list[str], int]:
reasons: list[str] = []
if score >= thresholds["entry_threshold"]:
reasons.append("trend, momentum, and breakout are aligned for a fresh entry")
return "enter", reasons
if score >= thresholds["watch_threshold"]:
reasons.append("market structure is constructive but still needs confirmation")
return "watch", reasons
reasons.append("edge is too weak for a new entry")
return "skip", reasons
extension_penalty = metrics.get("extension_penalty", 0.0)
recent_runup = metrics.get("recent_runup", 0.0)
breakout_pct = metrics.get("breakout_pct", 0.0)
setup_score = metrics.get("setup_score", 0.0)
trigger_score = metrics.get("trigger_score", 0.0)
edge_score = metrics.get("edge_score", 0.0)
min_trigger_score = thresholds["min_trigger_score"]
min_setup_score = thresholds["min_setup_score"]
confidence = _confidence_from_edge(edge_score)
# Avoid: overextended or clearly negative edge — do not enter
if extension_penalty >= 1.0 and (recent_runup >= 0.10 or breakout_pct >= 0.03):
reasons.append("price is already extended, chasing here is risky")
return "avoid", reasons, confidence
if edge_score < -0.2:
reasons.append("overall signal quality is poor")
return "avoid", reasons, confidence
# Entry: high-confidence breakout — setup + trigger + not overextended
if (
edge_score >= 0.3
and trigger_score >= min_trigger_score
and setup_score >= min_setup_score
and extension_penalty < 0.5
):
reasons.append("fresh breakout trigger with clean setup and manageable extension")
return "entry", reasons, confidence
# Watch: constructive but not clean enough
if edge_score >= 0.0 and setup_score >= min_setup_score:
reasons.append("setup is constructive but the trigger is not clean enough yet")
return "watch", reasons, confidence
# Default avoid
reasons.append("setup, trigger, or overall quality is too weak")
return "avoid", reasons, confidence
def scan_opportunities(
@@ -48,13 +93,13 @@ def scan_opportunities(
symbols: list[str] | None = None,
) -> dict[str, Any]:
opportunity_config = config.get("opportunity", {})
signal_weights = get_signal_weights(config)
ignore_dust = bool(opportunity_config.get("ignore_dust", True))
interval = get_signal_interval(config)
thresholds = _opportunity_thresholds(config)
scan_limit = int(opportunity_config.get("scan_limit", 50))
top_n = int(opportunity_config.get("top_n", 10))
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
held_positions = get_positions(config, spot_client=spot_client)["positions"]
held_positions = get_positions(config, spot_client=spot_client, ignore_dust=ignore_dust)["positions"]
concentration_map = {normalize_symbol(item["symbol"]): float(item["notional_usdt"]) for item in held_positions}
total_held = sum(concentration_map.values()) or 1.0
@@ -63,14 +108,13 @@ def scan_opportunities(
for ticker in universe:
symbol = normalize_symbol(ticker["symbol"])
klines = spot_client.klines(symbol=symbol, interval=interval, limit=24)
closes = [float(item[4]) for item in klines]
volumes = [float(item[5]) for item in klines]
closes, volumes = _series_from_klines(klines)
concentration = concentration_map.get(symbol, 0.0) / total_held
signal_score, metrics = score_market_signal(closes, volumes, ticker, signal_weights)
score = signal_score - thresholds["overlap_penalty"] * concentration
action, reasons = _action_for_opportunity(score, thresholds)
metrics["signal_score"] = round(signal_score, 4)
opportunity_score, metrics = score_opportunity_signal(closes, volumes, ticker, opportunity_config)
score = opportunity_score - thresholds["overlap_penalty"] * concentration
metrics["opportunity_score"] = round(opportunity_score, 4)
metrics["position_weight"] = round(concentration, 4)
action, reasons, confidence = _action_for_opportunity(score, metrics, thresholds)
if symbol.endswith(quote):
reasons.append(f"base asset {base_asset(symbol, quote)} passed liquidity and tradability filters")
if concentration > 0:
@@ -81,6 +125,7 @@ def scan_opportunities(
symbol=symbol,
action=action,
score=round(score, 4),
confidence=confidence,
reasons=reasons,
metrics=metrics,
)

View File

@@ -8,7 +8,11 @@ from typing import Any
from ..audit import audit_event
from .account_service import get_positions
from .market_service import normalize_symbol
from .signal_service import get_signal_interval, get_signal_weights, score_market_signal
from .signal_service import (
get_signal_interval,
get_signal_weights,
score_portfolio_signal,
)
@dataclass
@@ -20,6 +24,14 @@ class PortfolioRecommendation:
metrics: dict[str, float]
@dataclass
class WatchResult:
symbol: str
status: str
reasons: list[str]
metrics: dict[str, float]
def _portfolio_thresholds(config: dict[str, Any]) -> dict[str, float]:
portfolio_config = config.get("portfolio", {})
return {
@@ -70,7 +82,7 @@ def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str,
tickers = spot_client.ticker_stats([symbol], window="1d")
ticker = tickers[0] if tickers else {"priceChangePercent": "0"}
concentration = position["notional_usdt"] / total_notional
score, metrics = score_market_signal(
score, metrics = score_portfolio_signal(
closes,
volumes,
{"price_change_pct": float(ticker.get("priceChangePercent") or 0.0)},
@@ -107,3 +119,98 @@ def analyze_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str,
},
)
return payload
def watch_portfolio(config: dict[str, Any], *, spot_client: Any) -> dict[str, Any]:
"""Lightweight portfolio monitoring. Returns NEED_REVIEW or HEALTHY for each position.
Zero-token-cost rule-based screening. AI should only deep-analyze NEED_REVIEW items.
"""
quote = str(config.get("market", {}).get("default_quote", "USDT")).upper()
watch_config = config.get("watch", {})
alert_drawdown_1h_pct = float(watch_config.get("alert_drawdown_1h_pct", -5.0))
alert_drawdown_24h_pct = float(watch_config.get("alert_drawdown_24h_pct", -10.0))
alert_spike_1h_pct = float(watch_config.get("alert_spike_1h_pct", 8.0))
max_position_weight = float(watch_config.get("max_position_weight", 0.5))
exit_threshold = float(config.get("portfolio", {}).get("exit_threshold", -0.2))
signal_weights = get_signal_weights(config)
interval = get_signal_interval(config)
positions = get_positions(config, spot_client=spot_client)["positions"]
positions = [item for item in positions if item["symbol"] != quote]
total_notional = sum(item["notional_usdt"] for item in positions) or 1.0
watch_results = []
need_review_count = 0
for position in positions:
symbol = normalize_symbol(position["symbol"])
klines = spot_client.klines(symbol=symbol, interval=interval, limit=24)
closes = [float(item[4]) for item in klines]
volumes = [float(item[5]) for item in klines]
tickers = spot_client.ticker_stats([symbol], window="1d")
ticker = tickers[0] if tickers else {"priceChangePercent": "0"}
price_change_24h = float(ticker.get("priceChangePercent") or 0.0)
concentration = position["notional_usdt"] / total_notional
score, metrics = score_portfolio_signal(
closes,
volumes,
{"price_change_pct": price_change_24h},
signal_weights,
)
reasons: list[str] = []
# Rule 1: 1h price crash
if len(closes) >= 2:
price_change_1h = (closes[-1] - closes[-2]) / closes[-2] * 100 if closes[-2] != 0 else 0.0
if price_change_1h <= alert_drawdown_1h_pct:
reasons.append(f"1h drop {price_change_1h:.2f}% (alert threshold {alert_drawdown_1h_pct:.1f}%)")
if price_change_1h >= alert_spike_1h_pct:
reasons.append(f"1h spike +{price_change_1h:.2f}% (alert threshold {alert_spike_1h_pct:.1f}%)")
# Rule 2: 24h price crash
if price_change_24h <= alert_drawdown_24h_pct:
reasons.append(f"24h drop {price_change_24h:.2f}% (alert threshold {alert_drawdown_24h_pct:.1f}%)")
# Rule 3: Concentration risk
if concentration >= max_position_weight:
reasons.append(f"position weight {concentration:.1%} exceeds max {max_position_weight:.1%}")
# Rule 4: Technical deterioration
if score <= exit_threshold:
reasons.append(f"technical score {score:.2f} below exit threshold {exit_threshold:.2f}")
if reasons:
status = "need_review"
need_review_count += 1
else:
status = "healthy"
watch_results.append(
asdict(
WatchResult(
symbol=symbol,
status=status,
reasons=reasons,
metrics={
"position_weight": round(concentration, 4),
"signal_score": round(score, 4),
"price_change_24h_pct": round(price_change_24h, 4),
"volatility": metrics.get("volatility", 0.0),
"trend": metrics.get("trend", 0.0),
},
)
)
)
healthy_count = len(watch_results) - need_review_count
summary = f"{need_review_count} position(s) need review, {healthy_count} healthy"
if need_review_count == 0:
summary = "All positions healthy — no action needed"
return {
"watch_results": watch_results,
"summary": summary,
"need_review_count": need_review_count,
"healthy_count": healthy_count,
}
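The four watch rules above reduce to independent threshold checks; any non-empty reasons list marks a position `need_review`. A hypothetical standalone mirror of the rule pipeline (not the service function itself, and the threshold defaults are the shipped ones):

```python
def watch_reasons(closes, price_change_24h, concentration, score,
                  *, drawdown_1h=-5.0, drawdown_24h=-10.0, spike_1h=8.0,
                  max_weight=0.5, exit_threshold=-0.2):
    reasons = []
    # Rule 1: 1h crash or spike (last close vs previous close).
    if len(closes) >= 2 and closes[-2] != 0:
        change_1h = (closes[-1] - closes[-2]) / closes[-2] * 100
        if change_1h <= drawdown_1h:
            reasons.append(f"1h drop {change_1h:.2f}%")
        if change_1h >= spike_1h:
            reasons.append(f"1h spike +{change_1h:.2f}%")
    # Rule 2: 24h drawdown.
    if price_change_24h <= drawdown_24h:
        reasons.append(f"24h drop {price_change_24h:.2f}%")
    # Rule 3: concentration risk.
    if concentration >= max_weight:
        reasons.append(f"weight {concentration:.1%} over max")
    # Rule 4: technical deterioration.
    if score <= exit_threshold:
        reasons.append(f"score {score:.2f} below exit")
    return reasons

print(watch_reasons([100.0, 90.0], -12.0, 0.6, -0.3))  # all four rules fire
print(watch_reasons([100.0, 101.0], 2.0, 0.1, 0.5))    # [] -> healthy
```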

View File

@@ -1,17 +1,67 @@
"""Shared market signal scoring."""
"""Market signal scoring primitives and domain-specific models."""
from __future__ import annotations
from math import log10
from statistics import mean
from typing import Any
def _clamp(value: float, low: float, high: float) -> float:
return max(low, min(value, high))
def _safe_pct(new: float, old: float) -> float:
if old == 0:
return 0.0
return (new - old) / old
def _range_pct(values: list[float], denominator: float) -> float:
if not values or denominator == 0:
return 0.0
return (max(values) - min(values)) / denominator
_DEFAULT_OPPORTUNITY_MODEL_WEIGHTS = {
"trend": 0.1406,
"compression": 0.1688,
"breakout_proximity": 0.0875,
"higher_lows": 0.15,
"range_position": 0.45,
"fresh_breakout": 0.2,
"volume": 0.525,
"momentum": 0.1562,
"setup": 1.875,
"trigger": 1.875,
"liquidity": 0.3,
"volatility_penalty": 0.8,
"extension_penalty": 0.45,
}
def get_opportunity_model_weights(opportunity_config: dict[str, Any]) -> dict[str, float]:
configured = opportunity_config.get("model_weights", {})
return {
key: float(configured.get(key, default))
for key, default in _DEFAULT_OPPORTUNITY_MODEL_WEIGHTS.items()
}
def _weighted_quality(values: dict[str, float], weights: dict[str, float]) -> float:
weighted_sum = 0.0
total_weight = 0.0
for key, value in values.items():
weight = max(float(weights.get(key, 0.0)), 0.0)
if weight == 0:
continue
weighted_sum += weight * value
total_weight += weight
if total_weight == 0:
return 0.0
return _clamp(weighted_sum / total_weight, -1.0, 1.0)
def get_signal_weights(config: dict[str, Any]) -> dict[str, float]:
signal_config = config.get("signal", {})
return {
@@ -35,6 +85,15 @@ def score_market_signal(
volumes: list[float],
ticker: dict[str, Any],
weights: dict[str, float],
) -> tuple[float, dict[str, float]]:
return score_portfolio_signal(closes, volumes, ticker, weights)
def score_portfolio_signal(
closes: list[float],
volumes: list[float],
ticker: dict[str, Any],
weights: dict[str, float],
) -> tuple[float, dict[str, float]]:
if len(closes) < 2 or not volumes:
return 0.0, {
@@ -76,3 +135,158 @@ def score_market_signal(
"volatility": round(volatility, 4),
}
return score, metrics
def score_opportunity_signal(
closes: list[float],
volumes: list[float],
ticker: dict[str, Any],
opportunity_config: dict[str, Any],
) -> tuple[float, dict[str, float]]:
model_weights = get_opportunity_model_weights(opportunity_config)
if len(closes) < 6 or len(volumes) < 2:
return 0.0, {
"setup_score": 0.0,
"trigger_score": 0.0,
"liquidity_score": 0.0,
"edge_score": 0.0,
"setup_quality": 0.0,
"trigger_quality": 0.0,
"liquidity_quality": 0.0,
"risk_quality": 0.0,
"extension_penalty": 0.0,
"breakout_pct": 0.0,
"recent_runup": 0.0,
"volume_confirmation": 1.0,
"volatility": 0.0,
}
current = closes[-1]
sma_short = mean(closes[-5:])
sma_long = mean(closes[-20:]) if len(closes) >= 20 else mean(closes)
if current >= sma_short >= sma_long:
trend_quality = 1.0
elif current < sma_short < sma_long:
trend_quality = -1.0
else:
trend_quality = 0.0
prior_closes = closes[:-1]
prev_high = max(prior_closes[-20:]) if prior_closes else current
recent_low = min(closes[-20:])
range_width = prev_high - recent_low
range_position = _clamp((current - recent_low) / range_width, 0.0, 1.2) if range_width else 0.0
range_position_quality = 2.0 * _clamp(1.0 - abs(range_position - 0.62) / 0.62, 0.0, 1.0) - 1.0
breakout_pct = _safe_pct(current, prev_high)
recent_range = _range_pct(closes[-6:], current)
prior_window = closes[-20:-6] if len(closes) >= 20 else closes[:-6]
prior_range = _range_pct(prior_window, current) if prior_window else recent_range
compression = _clamp(1.0 - (recent_range / prior_range), -1.0, 1.0) if prior_range else 0.0
recent_low_window = min(closes[-5:])
prior_low_window = min(closes[-10:-5]) if len(closes) >= 10 else min(closes[:-5])
higher_lows = 1.0 if recent_low_window > prior_low_window else -1.0
breakout_proximity = _clamp(1.0 - abs(breakout_pct) / 0.03, 0.0, 1.0)
breakout_proximity_quality = 2.0 * breakout_proximity - 1.0
setup_quality = _weighted_quality(
{
"trend": trend_quality,
"compression": compression,
"breakout_proximity": breakout_proximity_quality,
"higher_lows": higher_lows,
"range_position": range_position_quality,
},
model_weights,
)
setup_score = _clamp((setup_quality + 1.0) / 2.0, 0.0, 1.0)
avg_volume = mean(volumes[:-1])
volume_confirmation = volumes[-1] / avg_volume if avg_volume else 1.0
volume_score = _clamp((volume_confirmation - 1.0) / 1.5, -1.0, 1.0)
momentum_3 = _safe_pct(closes[-1], closes[-4])
if momentum_3 <= 0:
controlled_momentum = _clamp(momentum_3 / 0.05, -1.0, 0.0)
elif momentum_3 <= 0.05:
controlled_momentum = momentum_3 / 0.05
elif momentum_3 <= 0.12:
controlled_momentum = 1.0 - ((momentum_3 - 0.05) / 0.07) * 0.5
else:
controlled_momentum = -0.2
fresh_breakout = _clamp(1.0 - abs(breakout_pct) / 0.025, 0.0, 1.0)
fresh_breakout_quality = 2.0 * fresh_breakout - 1.0
trigger_quality = _weighted_quality(
{
"fresh_breakout": fresh_breakout_quality,
"volume": volume_score,
"momentum": controlled_momentum,
},
model_weights,
)
trigger_score = _clamp((trigger_quality + 1.0) / 2.0, 0.0, 1.0)
extension_from_short = _safe_pct(current, sma_short)
recent_runup = _safe_pct(current, closes[-6])
extension_penalty = (
_clamp((extension_from_short - 0.025) / 0.075, 0.0, 1.0)
+ _clamp((recent_runup - 0.08) / 0.12, 0.0, 1.0)
+ _clamp((float(ticker.get("price_change_pct", 0.0)) / 100.0 - 0.12) / 0.18, 0.0, 1.0)
)
volatility = _range_pct(closes[-10:], current)
min_quote_volume = float(opportunity_config.get("min_quote_volume", 0.0))
quote_volume = float(ticker.get("quote_volume") or ticker.get("quoteVolume") or 0.0)
if min_quote_volume > 0 and quote_volume > 0:
liquidity_score = _clamp(log10(max(quote_volume / min_quote_volume, 1.0)) / 2.0, 0.0, 1.0)
else:
liquidity_score = 1.0
liquidity_quality = 2.0 * liquidity_score - 1.0
volatility_quality = 1.0 - 2.0 * _clamp(volatility / 0.12, 0.0, 1.0)
extension_quality = 1.0 - 2.0 * _clamp(extension_penalty / 2.0, 0.0, 1.0)
risk_quality = _weighted_quality(
{
"volatility_penalty": volatility_quality,
"extension_penalty": extension_quality,
},
model_weights,
)
edge_score = _weighted_quality(
{
"setup": setup_quality,
"trigger": trigger_quality,
"liquidity": liquidity_quality,
"trend": trend_quality,
"range_position": range_position_quality,
"volatility_penalty": volatility_quality,
"extension_penalty": extension_quality,
},
model_weights,
)
score = 1.0 + edge_score
metrics = {
"setup_score": round(setup_score, 4),
"trigger_score": round(trigger_score, 4),
"liquidity_score": round(liquidity_score, 4),
"edge_score": round(edge_score, 4),
"setup_quality": round(setup_quality, 4),
"trigger_quality": round(trigger_quality, 4),
"liquidity_quality": round(liquidity_quality, 4),
"risk_quality": round(risk_quality, 4),
"trend_quality": round(trend_quality, 4),
"range_position_quality": round(range_position_quality, 4),
"breakout_proximity_quality": round(breakout_proximity_quality, 4),
"volume_quality": round(volume_score, 4),
"momentum_quality": round(controlled_momentum, 4),
"extension_quality": round(extension_quality, 4),
"volatility_quality": round(volatility_quality, 4),
"extension_penalty": round(extension_penalty, 4),
"compression": round(compression, 4),
"range_position": round(range_position, 4),
"breakout_pct": round(breakout_pct, 4),
"recent_runup": round(recent_runup, 4),
"volume_confirmation": round(volume_confirmation, 4),
"volatility": round(volatility, 4),
"sma_short_distance": round(extension_from_short, 4),
"sma_long_distance": round(_safe_pct(current, sma_long), 4),
}
return score, metrics
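Two rescalings tie the scoring pieces together: qualities in [-1, 1] map to unit-interval scores via `(q + 1) / 2`, and the final score is `1.0 + edge_score`, so it lives in [0, 2] and the default `entry_threshold` of 1.5 demands an edge_score of at least 0.5 before the overlap penalty is subtracted. A sketch of both mappings (helper names here are illustrative):

```python
def _clamp(value, low, high):
    return max(low, min(value, high))

def quality_to_unit(quality):
    # setup_score / trigger_score rescale: quality in [-1, 1] -> [0, 1].
    return _clamp((quality + 1.0) / 2.0, 0.0, 1.0)

def final_score(edge_score):
    # score = 1.0 + edge_score: edge in [-1, 1] -> score in [0, 2].
    return 1.0 + edge_score

print(quality_to_unit(0.0))  # neutral quality -> 0.5
print(final_score(0.5))      # 1.5, the default entry_threshold
```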

View File

@@ -93,3 +93,14 @@ class AccountMarketServicesTestCase(unittest.TestCase):
universe = market_service.get_scan_universe(config, spot_client=FakeSpotClient())
self.assertEqual([item["symbol"] for item in universe], ["BTCUSDT", "ETHUSDT"])
def test_get_positions_can_include_dust(self):
config = {
"market": {"default_quote": "USDT"},
"trading": {"dust_usdt_threshold": 10.0},
}
ignored = account_service.get_positions(config, spot_client=FakeSpotClient())
included = account_service.get_positions(config, spot_client=FakeSpotClient(), ignore_dust=False)
self.assertEqual([item["symbol"] for item in ignored["positions"]], ["USDT", "BTCUSDT"])
self.assertEqual([item["symbol"] for item in included["positions"]], ["USDT", "BTCUSDT", "DOGEUSDT"])

View File

@@ -10,7 +10,7 @@ from coinhunter import cli
class CLITestCase(unittest.TestCase):
def test_help_includes_v2_commands(self):
def test_help_includes_core_commands(self):
parser = cli.build_parser()
help_text = parser.format_help()
self.assertIn("init", help_text)
@@ -18,7 +18,9 @@ class CLITestCase(unittest.TestCase):
self.assertIn("buy", help_text)
self.assertIn("sell", help_text)
self.assertIn("portfolio", help_text)
self.assertIn("opportunity", help_text)
self.assertIn("scan", help_text)
self.assertIn("analyze", help_text)
self.assertIn("watch", help_text)
self.assertIn("--doc", help_text)
def test_init_dispatches(self):
@@ -150,11 +152,11 @@ class CLITestCase(unittest.TestCase):
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT")
def test_opportunity_dispatches(self):
def test_scan_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 10}}
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "opportunity": {"top_n": 5}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
@@ -167,10 +169,52 @@ class CLITestCase(unittest.TestCase):
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["opportunity", "-s", "BTCUSDT", "ETHUSDT"])
result = cli.main(["scan", "-s", "BTCUSDT", "ETHUSDT"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["recommendations"][0]["symbol"], "BTCUSDT")
def test_analyze_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.analyze_service,
"analyze_symbols",
return_value={"analyses": [{"symbol": "BTCUSDT", "summary": "test"}]},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["analyze", "BTCUSDT", "ETHUSDT"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["analyses"][0]["symbol"], "BTCUSDT")
def test_watch_dispatches(self):
captured = {}
with (
patch.object(
cli, "load_config", return_value={"binance": {"spot_base_url": "https://test", "recv_window": 5000}, "market": {"default_quote": "USDT"}, "watch": {}}
),
patch.object(cli, "get_binance_credentials", return_value={"api_key": "k", "api_secret": "s"}),
patch.object(cli, "SpotBinanceClient"),
patch.object(
cli.portfolio_service,
"watch_portfolio",
return_value={"watch_results": [{"symbol": "BTCUSDT", "status": "healthy"}], "summary": "1 healthy", "need_review_count": 0, "healthy_count": 1},
),
patch.object(
cli, "print_output", side_effect=lambda payload, **kwargs: captured.setdefault("payload", payload)
),
):
result = cli.main(["watch"])
self.assertEqual(result, 0)
self.assertEqual(captured["payload"]["watch_results"][0]["symbol"], "BTCUSDT")
def test_catlog_dispatches(self):
captured = {}
with (


@@ -5,7 +5,11 @@ from __future__ import annotations
import unittest
from unittest.mock import patch
-from coinhunter.services import opportunity_service, portfolio_service, signal_service
+from coinhunter.services import (
+opportunity_service,
+portfolio_service,
+signal_service,
+)
class FakeSpotClient:
@@ -100,6 +104,139 @@ class FakeSpotClient:
return rows
class DustOverlapSpotClient(FakeSpotClient):
def account_info(self):
return {"balances": [{"asset": "XRP", "free": "5", "locked": "0"}]}
def ticker_price(self, symbols=None):
mapping = {"XRPUSDT": {"symbol": "XRPUSDT", "price": "1.5"}}
return [mapping[symbol] for symbol in symbols]
def ticker_stats(self, symbols=None, *, window="1d"):
rows = {
"XRPUSDT": {
"symbol": "XRPUSDT",
"lastPrice": "1.5",
"priceChangePercent": "10",
"quoteVolume": "5000000",
"highPrice": "1.52",
"lowPrice": "1.2",
}
}
if not symbols:
return list(rows.values())
return [rows[symbol] for symbol in symbols]
def exchange_info(self):
return {"symbols": [{"symbol": "XRPUSDT", "status": "TRADING"}]}
def klines(self, symbol, interval, limit):
rows = []
setup_curve = [
1.4151,
1.4858,
1.3868,
1.5,
1.4009,
1.5142,
1.4151,
1.5,
1.4292,
1.4858,
1.4434,
1.4717,
1.4505,
1.4575,
1.4547,
1.4604,
1.4575,
1.4632,
1.4599,
1.466,
1.4618,
1.4698,
1.4745,
1.5,
]
for index, close in enumerate(setup_curve[-limit:]):
rows.append([index, close * 0.98, close * 1.01, close * 0.97, close, 100 + index * 10, index + 1, close * 100])
return rows
class OpportunityPatternSpotClient:
def account_info(self):
return {"balances": [{"asset": "USDT", "free": "100", "locked": "0"}]}
def ticker_price(self, symbols=None):
return []
def ticker_stats(self, symbols=None, *, window="1d"):
rows = {
"SETUPUSDT": {
"symbol": "SETUPUSDT",
"lastPrice": "106",
"priceChangePercent": "4",
"quoteVolume": "10000000",
"highPrice": "107",
"lowPrice": "98",
},
"CHASEUSDT": {
"symbol": "CHASEUSDT",
"lastPrice": "150",
"priceChangePercent": "18",
"quoteVolume": "9000000",
"highPrice": "152",
"lowPrice": "120",
},
}
if not symbols:
return list(rows.values())
return [rows[symbol] for symbol in symbols]
def exchange_info(self):
return {
"symbols": [
{"symbol": "SETUPUSDT", "status": "TRADING"},
{"symbol": "CHASEUSDT", "status": "TRADING"},
]
}
def klines(self, symbol, interval, limit):
curves = {
"SETUPUSDT": [
100,
105,
98,
106,
99,
107,
100,
106,
101,
105,
102,
104,
102.5,
103,
102.8,
103.2,
103.0,
103.4,
103.1,
103.6,
103.3,
103.8,
104.2,
106,
],
"CHASEUSDT": [120, 125, 130, 135, 140, 145, 150],
}[symbol]
rows = []
for index, close in enumerate(curves[-limit:]):
rows.append([index, close * 0.98, close * 1.01, close * 0.97, close, 100 + index * 20, index + 1, close * 100])
return rows
class OpportunityServiceTestCase(unittest.TestCase):
def setUp(self):
self.config = {
@@ -144,12 +281,80 @@ class OpportunityServiceTestCase(unittest.TestCase):
def test_scan_is_deterministic(self):
with patch.object(opportunity_service, "audit_event", return_value=None):
payload = opportunity_service.scan_opportunities(
-self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}}, spot_client=FakeSpotClient()
+self.config | {"opportunity": self.config["opportunity"] | {"top_n": 2}},
+spot_client=OpportunityPatternSpotClient(),
)
-self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SOLUSDT", "BTCUSDT"])
-self.assertEqual([item["action"] for item in payload["recommendations"]], ["enter", "enter"])
+self.assertEqual([item["symbol"] for item in payload["recommendations"]], ["SETUPUSDT", "CHASEUSDT"])
+self.assertEqual([item["action"] for item in payload["recommendations"]], ["entry", "avoid"])
self.assertGreater(payload["recommendations"][0]["metrics"]["setup_score"], 0.6)
self.assertGreater(payload["recommendations"][1]["metrics"]["extension_penalty"], 1.0)
def test_scan_respects_ignore_dust_for_overlap_penalty(self):
client = DustOverlapSpotClient()
base_config = self.config | {
"opportunity": self.config["opportunity"] | {
"top_n": 1,
"ignore_dust": True,
"overlap_penalty": 2.0,
}
}
with patch.object(opportunity_service, "audit_event", return_value=None):
ignored = opportunity_service.scan_opportunities(base_config, spot_client=client, symbols=["XRPUSDT"])
included = opportunity_service.scan_opportunities(
base_config | {"opportunity": base_config["opportunity"] | {"ignore_dust": False}},
spot_client=client,
symbols=["XRPUSDT"],
)
ignored_rec = ignored["recommendations"][0]
included_rec = included["recommendations"][0]
self.assertEqual(ignored_rec["action"], "entry")
self.assertEqual(ignored_rec["metrics"]["position_weight"], 0.0)
self.assertEqual(included_rec["action"], "entry")
self.assertEqual(included_rec["metrics"]["position_weight"], 1.0)
self.assertLess(included_rec["score"], ignored_rec["score"])
def test_signal_score_handles_empty_klines(self):
score, metrics = signal_service.score_market_signal([], [], {"price_change_pct": 1.0}, {})
self.assertEqual(score, 0.0)
self.assertEqual(metrics["trend"], 0.0)
def test_weak_setup_and_trigger_becomes_avoid(self):
metrics = {
"extension_penalty": 0.0,
"recent_runup": 0.0,
"breakout_pct": -0.01,
"setup_score": 0.12,
"trigger_score": 0.18,
"edge_score": 0.0,
}
action, reasons, confidence = opportunity_service._action_for_opportunity(
2.5,
metrics,
{
"entry_threshold": 1.5,
"watch_threshold": 0.6,
"min_trigger_score": 0.45,
"min_setup_score": 0.35,
},
)
self.assertEqual(action, "avoid")
self.assertIn("setup, trigger, or overall quality is too weak", reasons[0])
self.assertEqual(confidence, 50)
def test_watch_flags_anomalies(self):
config = self.config | {
"watch": {
"alert_drawdown_1h_pct": -5.0,
"alert_drawdown_24h_pct": -10.0,
"alert_spike_1h_pct": 8.0,
"max_position_weight": 0.5,
}
}
with patch.object(portfolio_service, "audit_event", return_value=None):
payload = portfolio_service.watch_portfolio(config, spot_client=FakeSpotClient())
# FakeSpotClient BTC is +5% over 24h and ETH is +3%, so both should be healthy
self.assertGreaterEqual(payload["healthy_count"], 1)
for result in payload["watch_results"]:
self.assertIn(result["status"], {"healthy", "need_review"})
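For readers following `test_watch_flags_anomalies` above, a minimal sketch of the kind of per-position anomaly check it exercises may help. This is an illustration only, not `portfolio_service.watch_portfolio` itself: the function name `classify_position` and its signature are invented here, and only the threshold keys are taken from the test's `watch` config.

```python
def classify_position(change_1h_pct, change_24h_pct, weight, thresholds):
    """Classify one position as "healthy" or "need_review".

    Sketch only: mirrors the threshold keys used in
    test_watch_flags_anomalies; the real watch_portfolio may differ.
    """
    reasons = []
    # Drawdown triggers fire when the change falls at or below a negative threshold.
    if change_1h_pct <= thresholds.get("alert_drawdown_1h_pct", -5.0):
        reasons.append("1h drawdown")
    if change_24h_pct <= thresholds.get("alert_drawdown_24h_pct", -10.0):
        reasons.append("24h drawdown")
    # A sharp upward move can also warrant review (spike trigger).
    if change_1h_pct >= thresholds.get("alert_spike_1h_pct", 8.0):
        reasons.append("1h spike")
    # Concentration trigger: the position dominates the portfolio.
    if weight > thresholds.get("max_position_weight", 0.5):
        reasons.append("concentration")
    return ("need_review" if reasons else "healthy"), reasons


# A position up 5% over 24h with a 30% weight trips no trigger:
status, reasons = classify_position(0.5, 5.0, 0.3, {})
```

Under this sketch, the fake client's +5% / +3% movers land in "healthy", matching the assertion that `healthy_count` is at least 1.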