#!/usr/bin/env python3 """ Coin Hunter Auto Trader Template 全自动妖币猎人 + 币安执行器 运行前请在 ~/.hermes/.env 配置: BINANCE_API_KEY=你的API_KEY BINANCE_API_SECRET=你的API_SECRET 首次运行必须先用 DRY_RUN=true 测试逻辑! """ import json import os import sys from datetime import datetime, timezone from pathlib import Path import ccxt # ============== 配置 ============== COINS_DIR = Path.home() / ".coinhunter" POSITIONS_FILE = COINS_DIR / "positions.json" ENV_FILE = Path.home() / ".hermes" / ".env" # 风控参数 DRY_RUN = os.getenv("DRY_RUN", "true").lower() == "true" # 默认测试模式 MAX_POSITIONS = 2 # 最大同时持仓数 # 资金配置(根据总资产动态计算) CAPITAL_ALLOCATION_PCT = 0.95 # 用总资产95%玩这个策略(留95%缓冲给手续费和滑点) MIN_POSITION_USDT = 50 # 单次最小下单金额(避免过小) MIN_VOLUME_24H = 1_000_000 # 最小24h成交额 ($) MIN_PRICE_CHANGE_24H = 0.05 # 最小涨幅 5% MAX_PRICE = 1.0 # 只玩低价币(meme特征) STOP_LOSS_PCT = -0.07 # 止损 -7% TAKE_PROFIT_1_PCT = 0.15 # 止盈1 +15% TAKE_PROFIT_2_PCT = 0.30 # 止盈2 +30% BLACKLIST = {"USDC", "BUSD", "TUSD", "FDUSD", "USTC", "PAXG", "XRP", "ETH", "BTC"} # ============== 工具函数 ============== def log(msg: str): print(f"[{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')} UTC] {msg}") def load_positions() -> list: if POSITIONS_FILE.exists(): return json.loads(POSITIONS_FILE.read_text(encoding="utf-8")).get("positions", []) return [] def save_positions(positions: list): COINS_DIR.mkdir(parents=True, exist_ok=True) POSITIONS_FILE.write_text(json.dumps({"positions": positions}, indent=2, ensure_ascii=False), encoding="utf-8") def load_env(): if ENV_FILE.exists(): for line in ENV_FILE.read_text(encoding="utf-8").splitlines(): line = line.strip() if line and not line.startswith("#") and "=" in line: key, val = line.split("=", 1) os.environ.setdefault(key.strip(), val.strip()) def calculate_position_size(total_usdt: float, available_usdt: float, open_slots: int) -> float: """ 根据总资产动态计算每次下单金额。 逻辑:先确定策略总上限,再按剩余开仓位均分。 """ strategy_cap = total_usdt * CAPITAL_ALLOCATION_PCT used_in_strategy = max(0, strategy_cap - available_usdt) remaining_strategy_cap = max(0, strategy_cap - used_in_strategy) if open_slots <= 0 or remaining_strategy_cap < MIN_POSITION_USDT: return 0 size = remaining_strategy_cap / open_slots size = min(size, available_usdt) size = max(0, round(size, 2)) return size if size >= MIN_POSITION_USDT else 0 # ============== 币安客户端 ============== class BinanceTrader: def __init__(self): api_key = os.getenv("BINANCE_API_KEY") secret = os.getenv("BINANCE_API_SECRET") if not api_key or not secret: raise RuntimeError("缺少 BINANCE_API_KEY 或 BINANCE_API_SECRET,请配置 ~/.hermes/.env") self.exchange = ccxt.binance({ "apiKey": api_key, "secret": secret, "options": {"defaultType": "spot"}, "enableRateLimit": True, }) self.exchange.load_markets() def get_balance(self, asset: str = "USDT") -> float: bal = self.exchange.fetch_balance()["free"].get(asset, 0) return float(bal) def fetch_tickers(self) -> dict: return self.exchange.fetch_tickers() def create_market_buy_order(self, symbol: str, amount_usdt: float): if DRY_RUN: log(f"[DRY RUN] 模拟买入 {symbol},金额 ${amount_usdt}") return {"id": "dry-run-buy", "price": None, "amount": amount_usdt} ticker = self.exchange.fetch_ticker(symbol) price = float(ticker["last"]) qty = amount_usdt / price order = self.exchange.create_market_buy_order(symbol, qty) log(f"✅ 买入 {symbol} | 数量 {qty:.4f} | 价格 ~${price}") return order def create_market_sell_order(self, symbol: str, qty: float): if DRY_RUN: log(f"[DRY RUN] 模拟卖出 {symbol},数量 {qty}") return {"id": "dry-run-sell"} order = self.exchange.create_market_sell_order(symbol, qty) log(f"✅ 卖出 {symbol} | 数量 {qty:.4f}") return order # ============== 选币引擎 ============== class CoinPicker: def __init__(self, exchange: ccxt.binance): self.exchange = exchange def scan(self) -> list: tickers = self.exchange.fetch_tickers() candidates = [] for symbol, t in tickers.items(): if not symbol.endswith("/USDT"): continue base = symbol.replace("/USDT", "") if base in BLACKLIST: continue price = float(t["last"] or 0) change = float(t.get("percentage", 0)) / 100 volume = float(t.get("quoteVolume", 0)) if price <= 0 or price > MAX_PRICE: continue if volume < MIN_VOLUME_24H: continue if change < MIN_PRICE_CHANGE_24H: continue score = change * (volume / MIN_VOLUME_24H) candidates.append({ "symbol": symbol, "base": base, "price": price, "change_24h": change, "volume_24h": volume, "score": score, }) candidates.sort(key=lambda x: x["score"], reverse=True) return candidates[:5] # ============== 主控制器 ============== def run_cycle(): load_env() trader = BinanceTrader() picker = CoinPicker(trader.exchange) positions = load_positions() log(f"当前持仓数: {len(positions)} | 最大允许: {MAX_POSITIONS} | DRY_RUN={DRY_RUN}") # 1. 检查现有持仓(止盈止损) tickers = trader.fetch_tickers() new_positions = [] for pos in positions: sym = pos["symbol"] qty = float(pos["quantity"]) cost = float(pos["avg_cost"]) # ccxt tickers 使用 slash 格式,如 PENGU/USDT sym_ccxt = sym.replace("USDT", "/USDT") if "/" not in sym else sym ticker = tickers.get(sym_ccxt) if not ticker: new_positions.append(pos) continue price = float(ticker["last"]) pnl_pct = (price - cost) / cost log(f"监控 {sym} | 现价 ${price:.8f} | 成本 ${cost:.8f} | 盈亏 {pnl_pct:+.2%}") action = None if pnl_pct <= STOP_LOSS_PCT: action = "STOP_LOSS" elif pnl_pct >= TAKE_PROFIT_2_PCT: action = "TAKE_PROFIT_2" elif pnl_pct >= TAKE_PROFIT_1_PCT: sold_pct = float(pos.get("take_profit_1_sold_pct", 0)) if sold_pct == 0: action = "TAKE_PROFIT_1" if action == "STOP_LOSS": trader.create_market_sell_order(sym, qty) log(f"🛑 {sym} 触发止损,全部清仓") continue if action == "TAKE_PROFIT_1": sell_qty = qty * 0.5 trader.create_market_sell_order(sym, sell_qty) pos["quantity"] = qty - sell_qty pos["take_profit_1_sold_pct"] = 50 pos["updated_at"] = datetime.now(timezone.utc).isoformat() log(f"🎯 {sym} 触发止盈1,卖出50%,剩余 {pos['quantity']:.4f}") new_positions.append(pos) continue if action == "TAKE_PROFIT_2": trader.create_market_sell_order(sym, float(pos["quantity"])) log(f"🚀 {sym} 触发止盈2,全部清仓") continue new_positions.append(pos) # 2. 开新仓 if len(new_positions) < MAX_POSITIONS: candidates = picker.scan() held_bases = {p["base_asset"] for p in new_positions} total_usdt = trader.get_balance("USDT") available_usdt = total_usdt open_slots = MAX_POSITIONS - len(new_positions) position_size = calculate_position_size(total_usdt, available_usdt, open_slots) log(f"总资产 USDT: ${total_usdt:.2f} | 策略上限({CAPITAL_ALLOCATION_PCT:.0%}): ${total_usdt*CAPITAL_ALLOCATION_PCT:.2f} | 每仓建议金额: ${position_size:.2f}") for cand in candidates: if len(new_positions) >= MAX_POSITIONS: break base = cand["base"] if base in held_bases: continue if position_size <= 0: log("策略资金已用完或余额不足,停止开新仓") break symbol = cand["symbol"] order = trader.create_market_buy_order(symbol, position_size) avg_price = float(order.get("price") or cand["price"]) qty = position_size / avg_price if avg_price else 0 new_positions.append({ "account_id": "binance-main", "symbol": symbol.replace("/", ""), "base_asset": base, "quote_asset": "USDT", "market_type": "spot", "quantity": qty, "avg_cost": avg_price, "opened_at": datetime.now(timezone.utc).isoformat(), "updated_at": datetime.now(timezone.utc).isoformat(), "note": "Auto-trader entry", }) held_bases.add(base) available_usdt -= position_size position_size = calculate_position_size(total_usdt, available_usdt, MAX_POSITIONS - len(new_positions)) log(f"📈 新开仓 {symbol} | 买入价 ${avg_price:.8f} | 数量 {qty:.2f}") save_positions(new_positions) log("周期结束,持仓已保存") if __name__ == "__main__": try: run_cycle() except Exception as e: log(f"❌ 错误: {e}") sys.exit(1)