Files
coinhunter/scripts/auto_trader.py

278 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Coin Hunter Auto Trader Template
全自动妖币猎人 + 币安执行器
运行前请在 ~/.hermes/.env 配置:
BINANCE_API_KEY=你的API_KEY
BINANCE_API_SECRET=你的API_SECRET
首次运行必须先用 DRY_RUN=true 测试逻辑!
"""
import json
import os
import sys
from datetime import datetime, timezone
from pathlib import Path
import ccxt
# ============== Configuration ==============
# Local state directory, the persisted positions file, and the credentials file.
COINS_DIR = Path.home().joinpath(".coinhunter")
POSITIONS_FILE = COINS_DIR.joinpath("positions.json")
ENV_FILE = Path.home().joinpath(".hermes", ".env")

# --- Risk controls ---
# Dry-run (simulation) mode is the default; it is switched off only when the
# DRY_RUN environment variable holds something other than "true"
# (case-insensitive). The value is evaluated once, when this module is imported.
DRY_RUN = os.environ.get("DRY_RUN", "true").lower() == "true"
MAX_POSITIONS = 2  # maximum number of positions held at the same time

# --- Capital allocation (sized dynamically from the balance at run time) ---
# Fraction of the balance the strategy may deploy; the remaining 5% is meant
# as a buffer for fees and slippage.
CAPITAL_ALLOCATION_PCT = 0.95
MIN_POSITION_USDT = 50  # smallest single order in USDT (avoids tiny orders)
MIN_VOLUME_24H = 1_000_000  # minimum 24h traded volume ($)
MIN_PRICE_CHANGE_24H = 0.05  # minimum 24h gain: 5%
MAX_PRICE = 1.0  # only low-priced coins (meme-coin trait)
STOP_LOSS_PCT = -0.07  # stop-loss at -7%
TAKE_PROFIT_1_PCT = 0.15  # first take-profit level: +15%
TAKE_PROFIT_2_PCT = 0.30  # second take-profit level: +30%

# Base assets that the coin scanner always skips.
BLACKLIST = {
    "USDC",
    "BUSD",
    "TUSD",
    "FDUSD",
    "USTC",
    "PAXG",
    "XRP",
    "ETH",
    "BTC",
}
# ============== 工具函数 ==============
def log(msg: str):
    """Print ``msg`` to stdout, prefixed with the current UTC timestamp."""
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S")
    print(f"[{stamp} UTC] {msg}")
def load_positions() -> list:
    """Return the saved positions, or an empty list when nothing is stored.

    Reads POSITIONS_FILE as UTF-8 JSON and returns the value under its
    "positions" key (an empty list if that key is absent).
    """
    if not POSITIONS_FILE.exists():
        return []
    payload = json.loads(POSITIONS_FILE.read_text(encoding="utf-8"))
    return payload.get("positions", [])
def save_positions(positions: list):
    """Write ``positions`` to POSITIONS_FILE as ``{"positions": [...]}`` JSON.

    The state directory is created first if it does not exist yet.
    """
    COINS_DIR.mkdir(parents=True, exist_ok=True)
    document = {"positions": positions}
    # ensure_ascii=False keeps non-ASCII text human-readable in the file.
    serialized = json.dumps(document, indent=2, ensure_ascii=False)
    POSITIONS_FILE.write_text(serialized, encoding="utf-8")
def load_env():
    """Load ``KEY=VALUE`` pairs from ENV_FILE into ``os.environ``.

    Blank lines, lines starting with ``#`` and lines without ``=`` are
    ignored. Variables that are already set in the environment are left
    untouched. Does nothing when the file does not exist.
    """
    if not ENV_FILE.exists():
        return
    for raw_line in ENV_FILE.read_text(encoding="utf-8").splitlines():
        entry = raw_line.strip()
        if not entry or entry.startswith("#") or "=" not in entry:
            continue
        # Split on the first "=" only, so values may themselves contain "=".
        name, _, value = entry.partition("=")
        os.environ.setdefault(name.strip(), value.strip())
def calculate_position_size(
    total_usdt: float,
    available_usdt: float,
    open_slots: int,
    *,
    allocation_pct=None,
    min_position_usdt=None,
) -> float:
    """Compute the USDT amount to spend on the next position.

    The strategy may deploy at most ``total_usdt * allocation_pct``. Whatever
    part of that cap has not been spent yet is split evenly across the
    remaining open slots.

    Args:
        total_usdt: USDT balance at the start of the cycle.
        available_usdt: USDT still unspent (``total_usdt`` minus what this
            cycle has already committed).
        open_slots: Number of positions that may still be opened.
        allocation_pct: Fraction of ``total_usdt`` the strategy may use.
            Defaults to the module-level ``CAPITAL_ALLOCATION_PCT``.
        min_position_usdt: Smallest acceptable order size. Defaults to the
            module-level ``MIN_POSITION_USDT``.

    Returns:
        The order size rounded to 2 decimals, or ``0`` when no slot is open
        or the resulting size would be below the minimum.
    """
    if allocation_pct is None:
        allocation_pct = CAPITAL_ALLOCATION_PCT
    if min_position_usdt is None:
        min_position_usdt = MIN_POSITION_USDT

    strategy_cap = total_usdt * allocation_pct
    # Capital already deployed is what has left the balance. Deriving it from
    # the cap instead (cap - available) made the remaining budget equal to
    # min(cap, available), so the last slot consumed the reserve buffer too.
    already_deployed = max(0, total_usdt - available_usdt)
    remaining_strategy_cap = max(0, strategy_cap - already_deployed)
    if open_slots <= 0 or remaining_strategy_cap < min_position_usdt:
        return 0

    # Never size an order above what is actually available.
    size = min(remaining_strategy_cap / open_slots, available_usdt)
    size = max(0, round(size, 2))
    return size if size >= min_position_usdt else 0
# ============== 币安客户端 ==============
class BinanceTrader:
    """Small wrapper around a ccxt Binance spot client.

    Order methods honour the module-level DRY_RUN flag: in dry-run mode they
    only log the intended action and return a placeholder dict instead of
    sending an order.
    """

    def __init__(self):
        """Build the ccxt client from environment credentials and load markets.

        Raises:
            RuntimeError: If BINANCE_API_KEY or BINANCE_API_SECRET is missing
                or empty in the environment.
        """
        api_key = os.getenv("BINANCE_API_KEY")
        secret = os.getenv("BINANCE_API_SECRET")
        if not api_key or not secret:
            raise RuntimeError("缺少 BINANCE_API_KEY 或 BINANCE_API_SECRET请配置 ~/.hermes/.env")
        self.exchange = ccxt.binance({
            "apiKey": api_key,
            "secret": secret,
            "options": {"defaultType": "spot"},
            "enableRateLimit": True,
        })
        self.exchange.load_markets()

    def get_balance(self, asset: str = "USDT") -> float:
        """Return the free balance of ``asset``; 0.0 when absent or ``None``."""
        free = self.exchange.fetch_balance()["free"]
        # `.get(asset, 0)` still yields None when the key exists with a None
        # value, and float(None) raises; `or 0` covers both cases.
        return float(free.get(asset) or 0)

    def fetch_tickers(self) -> dict:
        """Return the exchange's ticker mapping as provided by ccxt."""
        return self.exchange.fetch_tickers()

    def create_market_buy_order(self, symbol: str, amount_usdt: float):
        """Buy ``symbol`` at market for roughly ``amount_usdt`` of quote currency.

        The quantity is derived from the latest traded price, so the realised
        cost can differ from ``amount_usdt``.

        Returns:
            The exchange's order response, or a placeholder dict in dry-run
            mode (its "amount" holds the USDT amount, not a coin quantity).

        Raises:
            RuntimeError: If no positive last price is available for ``symbol``.
        """
        if DRY_RUN:
            log(f"[DRY RUN] 模拟买入 {symbol},金额 ${amount_usdt}")
            return {"id": "dry-run-buy", "price": None, "amount": amount_usdt}
        ticker = self.exchange.fetch_ticker(symbol)
        price = float(ticker.get("last") or 0)
        if price <= 0:
            # Fail with a clear message instead of a TypeError/ZeroDivisionError.
            raise RuntimeError(f"无法获取 {symbol} 的有效最新价格,取消买入")
        qty = amount_usdt / price
        order = self.exchange.create_market_buy_order(symbol, qty)
        log(f"✅ 买入 {symbol} | 数量 {qty:.4f} | 价格 ~${price}")
        return order

    def create_market_sell_order(self, symbol: str, qty: float):
        """Sell ``qty`` of ``symbol`` at market.

        Returns:
            The exchange's order response, or a placeholder dict in dry-run mode.
        """
        if DRY_RUN:
            log(f"[DRY RUN] 模拟卖出 {symbol},数量 {qty}")
            return {"id": "dry-run-sell"}
        order = self.exchange.create_market_sell_order(symbol, qty)
        log(f"✅ 卖出 {symbol} | 数量 {qty:.4f}")
        return order
# ============== 选币引擎 ==============
class CoinPicker:
    """Scans exchange tickers for low-priced, high-momentum USDT pairs."""

    def __init__(self, exchange: ccxt.binance):
        """Store the ccxt exchange client used to fetch tickers."""
        self.exchange = exchange

    def scan(self) -> list:
        """Return up to five candidate coins, best score first.

        A ticker qualifies when its symbol ends in ``/USDT``, its base asset
        is not in BLACKLIST, its last price is in ``(0, MAX_PRICE]``, its
        quote volume is at least MIN_VOLUME_24H and its percentage change is
        at least MIN_PRICE_CHANGE_24H. Missing or ``None`` ticker fields are
        treated as 0, which makes the ticker fail the filters instead of
        aborting the whole scan.

        Returns:
            A list of dicts with keys ``symbol``, ``base``, ``price``,
            ``change_24h`` (as a fraction), ``volume_24h`` and ``score``.
        """
        quote_suffix = "/USDT"
        tickers = self.exchange.fetch_tickers()
        candidates = []
        for symbol, t in tickers.items():
            if not symbol.endswith(quote_suffix):
                continue
            # Strip only the trailing quote part to get the base asset.
            base = symbol[: -len(quote_suffix)]
            if base in BLACKLIST:
                continue

            # A field can be present with a None value, in which case
            # `.get(key, 0)` returns None; `or 0` handles that as well.
            price = float(t.get("last") or 0)
            change = float(t.get("percentage") or 0) / 100
            volume = float(t.get("quoteVolume") or 0)

            if price <= 0 or price > MAX_PRICE:
                continue
            if volume < MIN_VOLUME_24H:
                continue
            if change < MIN_PRICE_CHANGE_24H:
                continue

            # Momentum weighted by how many multiples of the minimum volume traded.
            score = change * (volume / MIN_VOLUME_24H)
            candidates.append({
                "symbol": symbol,
                "base": base,
                "price": price,
                "change_24h": change,
                "volume_24h": volume,
                "score": score,
            })
        candidates.sort(key=lambda x: x["score"], reverse=True)
        return candidates[:5]
# ============== 主控制器 ==============
def _to_ccxt_symbol(sym: str) -> str:
    """Convert a stored symbol such as ``PENGUUSDT`` to the ``PENGU/USDT`` form.

    Symbols that already contain a slash, or that do not end in ``USDT``, are
    returned unchanged. Only the trailing ``USDT`` is split off.
    """
    if "/" in sym or not sym.endswith("USDT"):
        return sym
    return f"{sym[:-len('USDT')]}/USDT"


def run_cycle():
    """Run one trading cycle: manage open positions, then open new ones.

    Steps:
      1. Load credentials and persisted positions.
      2. For each position, compare the latest price with its average cost
         and apply stop-loss (sell all), take-profit 2 (sell all) or
         take-profit 1 (sell half, once).
      3. While slots are free, buy the best-scored candidates that are not
         already held, sizing each order with calculate_position_size().
      4. Persist the resulting position list.

    NOTE(review): positions are saved only at the very end. If an exception
    is raised after an order was sent, the file is not updated and the next
    cycle starts from the old state.
    """
    load_env()
    trader = BinanceTrader()
    picker = CoinPicker(trader.exchange)
    positions = load_positions()
    log(f"当前持仓数: {len(positions)} | 最大允许: {MAX_POSITIONS} | DRY_RUN={DRY_RUN}")

    # 1. Check existing positions (take-profit / stop-loss).
    tickers = trader.fetch_tickers()
    new_positions = []
    for pos in positions:
        sym = pos["symbol"]
        qty = float(pos["quantity"])
        cost = float(pos["avg_cost"])
        # Positions are stored without a slash; ticker keys use PENGU/USDT.
        sym_ccxt = _to_ccxt_symbol(sym)
        ticker = tickers.get(sym_ccxt)
        if not ticker:
            # No market data this cycle: keep the position untouched.
            new_positions.append(pos)
            continue
        last = ticker.get("last")
        if not last or cost <= 0:
            # PnL cannot be computed (no price, or a zero/negative cost would
            # divide by zero); keep the position rather than crash the cycle.
            log(f"⚠️ {sym} 缺少有效价格或成本,本轮跳过止盈止损检查")
            new_positions.append(pos)
            continue
        price = float(last)
        pnl_pct = (price - cost) / cost
        log(f"监控 {sym} | 现价 ${price:.8f} | 成本 ${cost:.8f} | 盈亏 {pnl_pct:+.2%}")

        # Orders are sent with the slash-format symbol, the same form used
        # for the ticker lookup above.
        if pnl_pct <= STOP_LOSS_PCT:
            trader.create_market_sell_order(sym_ccxt, qty)
            log(f"🛑 {sym} 触发止损,全部清仓")
            continue
        if pnl_pct >= TAKE_PROFIT_2_PCT:
            trader.create_market_sell_order(sym_ccxt, qty)
            log(f"🚀 {sym} 触发止盈2全部清仓")
            continue
        if pnl_pct >= TAKE_PROFIT_1_PCT:
            sold_pct = float(pos.get("take_profit_1_sold_pct", 0))
            if sold_pct == 0:
                # First take-profit fires once: sell half and remember it.
                sell_qty = qty * 0.5
                trader.create_market_sell_order(sym_ccxt, sell_qty)
                pos["quantity"] = qty - sell_qty
                pos["take_profit_1_sold_pct"] = 50
                pos["updated_at"] = datetime.now(timezone.utc).isoformat()
                log(f"🎯 {sym} 触发止盈1卖出50%,剩余 {pos['quantity']:.4f}")
                new_positions.append(pos)
                continue
        new_positions.append(pos)

    # 2. Open new positions while slots are free.
    if len(new_positions) < MAX_POSITIONS:
        candidates = picker.scan()
        held_bases = {p["base_asset"] for p in new_positions}
        total_usdt = trader.get_balance("USDT")
        available_usdt = total_usdt
        open_slots = MAX_POSITIONS - len(new_positions)
        position_size = calculate_position_size(total_usdt, available_usdt, open_slots)
        log(f"总资产 USDT: ${total_usdt:.2f} | 策略上限({CAPITAL_ALLOCATION_PCT:.0%}): ${total_usdt*CAPITAL_ALLOCATION_PCT:.2f} | 每仓建议金额: ${position_size:.2f}")
        for cand in candidates:
            if len(new_positions) >= MAX_POSITIONS:
                break
            base = cand["base"]
            if base in held_bases:
                continue
            if position_size <= 0:
                log("策略资金已用完或余额不足,停止开新仓")
                break
            symbol = cand["symbol"]
            order = trader.create_market_buy_order(symbol, position_size)
            # Prefer the fill data reported in the order response; fall back
            # to the scan price and a computed quantity when it is absent
            # (e.g. the dry-run placeholder carries neither field).
            avg_price = float(order.get("average") or order.get("price") or cand["price"])
            filled = order.get("filled")
            if filled:
                qty = float(filled)
            else:
                qty = position_size / avg_price if avg_price else 0
            opened_at = datetime.now(timezone.utc).isoformat()
            new_positions.append({
                "account_id": "binance-main",
                "symbol": symbol.replace("/", ""),
                "base_asset": base,
                "quote_asset": "USDT",
                "market_type": "spot",
                "quantity": qty,
                "avg_cost": avg_price,
                "opened_at": opened_at,
                "updated_at": opened_at,
                "note": "Auto-trader entry",
            })
            held_bases.add(base)
            available_usdt -= position_size
            position_size = calculate_position_size(total_usdt, available_usdt, MAX_POSITIONS - len(new_positions))
            log(f"📈 新开仓 {symbol} | 买入价 ${avg_price:.8f} | 数量 {qty:.2f}")

    save_positions(new_positions)
    log("周期结束,持仓已保存")
if __name__ == "__main__":
    # Script entry point: run a single cycle; on any failure, log the error
    # and exit with status 1.
    try:
        run_cycle()
    except Exception as exc:
        log(f"❌ 错误: {exc}")
        sys.exit(1)