refactor: rewrite Coin Hunter as short-term trading framework with auto-trading architecture

This commit is contained in:
2026-04-16 03:03:25 +08:00
parent 73a3cb6952
commit a4ba9bb64e
5 changed files with 716 additions and 256 deletions

277
scripts/auto_trader.py Normal file
View File

@@ -0,0 +1,277 @@
#!/usr/bin/env python3
"""
Coin Hunter Auto Trader Template
全自动妖币猎人 + 币安执行器
运行前请在 ~/.hermes/.env 配置:
BINANCE_API_KEY=你的API_KEY
BINANCE_API_SECRET=你的API_SECRET
首次运行必须先用 DRY_RUN=true 测试逻辑!
"""
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
import ccxt
# ============== 配置 ==============
COINS_DIR = Path.home() / ".coinhunter"
POSITIONS_FILE = COINS_DIR / "positions.json"
ENV_FILE = Path.home() / ".hermes" / ".env"
# 风控参数
DRY_RUN = os.getenv("DRY_RUN", "true").lower() == "true" # 默认测试模式
MAX_POSITIONS = 2 # 最大同时持仓数
# 资金配置(根据总资产动态计算)
CAPITAL_ALLOCATION_PCT = 0.95 # 用总资产95%玩这个策略留95%缓冲给手续费和滑点)
MIN_POSITION_USDT = 50 # 单次最小下单金额(避免过小)
MIN_VOLUME_24H = 1_000_000 # 最小24h成交额 ($)
MIN_PRICE_CHANGE_24H = 0.05 # 最小涨幅 5%
MAX_PRICE = 1.0 # 只玩低价币meme特征
STOP_LOSS_PCT = -0.07 # 止损 -7%
TAKE_PROFIT_1_PCT = 0.15 # 止盈1 +15%
TAKE_PROFIT_2_PCT = 0.30 # 止盈2 +30%
BLACKLIST = {"USDC", "BUSD", "TUSD", "FDUSD", "USTC", "PAXG", "XRP", "ETH", "BTC"}
# ============== 工具函数 ==============
def log(msg: str):
print(f"[{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC] {msg}")
def load_positions() -> list:
if POSITIONS_FILE.exists():
return json.loads(POSITIONS_FILE.read_text(encoding="utf-8")).get("positions", [])
return []
def save_positions(positions: list):
COINS_DIR.mkdir(parents=True, exist_ok=True)
POSITIONS_FILE.write_text(json.dumps({"positions": positions}, indent=2, ensure_ascii=False), encoding="utf-8")
def load_env():
if ENV_FILE.exists():
for line in ENV_FILE.read_text(encoding="utf-8").splitlines():
line = line.strip()
if line and not line.startswith("#") and "=" in line:
key, val = line.split("=", 1)
os.environ.setdefault(key.strip(), val.strip())
def calculate_position_size(total_usdt: float, available_usdt: float, open_slots: int) -> float:
"""
根据总资产动态计算每次下单金额。
逻辑:先确定策略总上限,再按剩余开仓位均分。
"""
strategy_cap = total_usdt * CAPITAL_ALLOCATION_PCT
used_in_strategy = max(0, strategy_cap - available_usdt)
remaining_strategy_cap = max(0, strategy_cap - used_in_strategy)
if open_slots <= 0 or remaining_strategy_cap < MIN_POSITION_USDT:
return 0
size = remaining_strategy_cap / open_slots
size = min(size, available_usdt)
size = max(0, round(size, 2))
return size if size >= MIN_POSITION_USDT else 0
# ============== 币安客户端 ==============
class BinanceTrader:
def __init__(self):
api_key = os.getenv("BINANCE_API_KEY")
secret = os.getenv("BINANCE_API_SECRET")
if not api_key or not secret:
raise RuntimeError("缺少 BINANCE_API_KEY 或 BINANCE_API_SECRET请配置 ~/.hermes/.env")
self.exchange = ccxt.binance({
"apiKey": api_key,
"secret": secret,
"options": {"defaultType": "spot"},
"enableRateLimit": True,
})
self.exchange.load_markets()
def get_balance(self, asset: str = "USDT") -> float:
bal = self.exchange.fetch_balance()["free"].get(asset, 0)
return float(bal)
def fetch_tickers(self) -> dict:
return self.exchange.fetch_tickers()
def create_market_buy_order(self, symbol: str, amount_usdt: float):
if DRY_RUN:
log(f"[DRY RUN] 模拟买入 {symbol},金额 ${amount_usdt}")
return {"id": "dry-run-buy", "price": None, "amount": amount_usdt}
ticker = self.exchange.fetch_ticker(symbol)
price = float(ticker["last"])
qty = amount_usdt / price
order = self.exchange.create_market_buy_order(symbol, qty)
log(f"✅ 买入 {symbol} | 数量 {qty:.4f} | 价格 ~${price}")
return order
def create_market_sell_order(self, symbol: str, qty: float):
if DRY_RUN:
log(f"[DRY RUN] 模拟卖出 {symbol},数量 {qty}")
return {"id": "dry-run-sell"}
order = self.exchange.create_market_sell_order(symbol, qty)
log(f"✅ 卖出 {symbol} | 数量 {qty:.4f}")
return order
# ============== 选币引擎 ==============
class CoinPicker:
def __init__(self, exchange: ccxt.binance):
self.exchange = exchange
def scan(self) -> list:
tickers = self.exchange.fetch_tickers()
candidates = []
for symbol, t in tickers.items():
if not symbol.endswith("/USDT"):
continue
base = symbol.replace("/USDT", "")
if base in BLACKLIST:
continue
price = float(t["last"] or 0)
change = float(t.get("percentage", 0)) / 100
volume = float(t.get("quoteVolume", 0))
if price <= 0 or price > MAX_PRICE:
continue
if volume < MIN_VOLUME_24H:
continue
if change < MIN_PRICE_CHANGE_24H:
continue
score = change * (volume / MIN_VOLUME_24H)
candidates.append({
"symbol": symbol,
"base": base,
"price": price,
"change_24h": change,
"volume_24h": volume,
"score": score,
})
candidates.sort(key=lambda x: x["score"], reverse=True)
return candidates[:5]
# ============== 主控制器 ==============
def run_cycle():
load_env()
trader = BinanceTrader()
picker = CoinPicker(trader.exchange)
positions = load_positions()
log(f"当前持仓数: {len(positions)} | 最大允许: {MAX_POSITIONS} | DRY_RUN={DRY_RUN}")
# 1. 检查现有持仓(止盈止损)
tickers = trader.fetch_tickers()
new_positions = []
for pos in positions:
sym = pos["symbol"]
qty = float(pos["quantity"])
cost = float(pos["avg_cost"])
# ccxt tickers 使用 slash 格式,如 PENGU/USDT
sym_ccxt = sym.replace("USDT", "/USDT") if "/" not in sym else sym
ticker = tickers.get(sym_ccxt)
if not ticker:
new_positions.append(pos)
continue
price = float(ticker["last"])
pnl_pct = (price - cost) / cost
log(f"监控 {sym} | 现价 ${price:.8f} | 成本 ${cost:.8f} | 盈亏 {pnl_pct:+.2%}")
action = None
if pnl_pct <= STOP_LOSS_PCT:
action = "STOP_LOSS"
elif pnl_pct >= TAKE_PROFIT_2_PCT:
action = "TAKE_PROFIT_2"
elif pnl_pct >= TAKE_PROFIT_1_PCT:
sold_pct = float(pos.get("take_profit_1_sold_pct", 0))
if sold_pct == 0:
action = "TAKE_PROFIT_1"
if action == "STOP_LOSS":
trader.create_market_sell_order(sym, qty)
log(f"🛑 {sym} 触发止损,全部清仓")
continue
if action == "TAKE_PROFIT_1":
sell_qty = qty * 0.5
trader.create_market_sell_order(sym, sell_qty)
pos["quantity"] = qty - sell_qty
pos["take_profit_1_sold_pct"] = 50
pos["updated_at"] = datetime.now(timezone.utc).isoformat()
log(f"🎯 {sym} 触发止盈1卖出50%,剩余 {pos['quantity']:.4f}")
new_positions.append(pos)
continue
if action == "TAKE_PROFIT_2":
trader.create_market_sell_order(sym, float(pos["quantity"]))
log(f"🚀 {sym} 触发止盈2全部清仓")
continue
new_positions.append(pos)
# 2. 开新仓
if len(new_positions) < MAX_POSITIONS:
candidates = picker.scan()
held_bases = {p["base_asset"] for p in new_positions}
total_usdt = trader.get_balance("USDT")
available_usdt = total_usdt
open_slots = MAX_POSITIONS - len(new_positions)
position_size = calculate_position_size(total_usdt, available_usdt, open_slots)
log(f"总资产 USDT: ${total_usdt:.2f} | 策略上限({CAPITAL_ALLOCATION_PCT:.0%}): ${total_usdt*CAPITAL_ALLOCATION_PCT:.2f} | 每仓建议金额: ${position_size:.2f}")
for cand in candidates:
if len(new_positions) >= MAX_POSITIONS:
break
base = cand["base"]
if base in held_bases:
continue
if position_size <= 0:
log("策略资金已用完或余额不足,停止开新仓")
break
symbol = cand["symbol"]
order = trader.create_market_buy_order(symbol, position_size)
avg_price = float(order.get("price") or cand["price"])
qty = position_size / avg_price if avg_price else 0
new_positions.append({
"account_id": "binance-main",
"symbol": symbol.replace("/", ""),
"base_asset": base,
"quote_asset": "USDT",
"market_type": "spot",
"quantity": qty,
"avg_cost": avg_price,
"opened_at": datetime.now(timezone.utc).isoformat(),
"updated_at": datetime.now(timezone.utc).isoformat(),
"note": "Auto-trader entry",
})
held_bases.add(base)
available_usdt -= position_size
position_size = calculate_position_size(total_usdt, available_usdt, MAX_POSITIONS - len(new_positions))
log(f"📈 新开仓 {symbol} | 买入价 ${avg_price:.8f} | 数量 {qty:.2f}")
save_positions(new_positions)
log("周期结束,持仓已保存")
if __name__ == "__main__":
try:
run_cycle()
except Exception as e:
log(f"❌ 错误: {e}")
sys.exit(1)