feat: add multi-market analysis and sqlite-backed reporting

This commit is contained in:
root
2026-03-31 01:03:03 +08:00
parent dbdaca6f11
commit 5e467494e6
5 changed files with 1281 additions and 432 deletions

View File

@@ -15,12 +15,37 @@ import sys
import json
import argparse
import time
import hashlib
import urllib.request
import urllib.error
from datetime import datetime, timedelta
from pathlib import Path
try:
from db import (
ANALYSIS_CACHE_TTL_SECONDS,
clear_analysis_cache,
get_cached_analysis,
get_kline_df,
get_latest_kline_date,
init_db,
set_cached_analysis,
upsert_kline_df,
upsert_watchlist_item,
)
except ImportError:
sys.path.insert(0, str(Path(__file__).resolve().parent))
from db import (
ANALYSIS_CACHE_TTL_SECONDS,
clear_analysis_cache,
get_cached_analysis,
get_kline_df,
get_latest_kline_date,
init_db,
set_cached_analysis,
upsert_kline_df,
upsert_watchlist_item,
)
try:
import numpy as np
except ImportError:
@@ -38,90 +63,98 @@ except ImportError:
# 缓存与重试机制
# ─────────────────────────────────────────────
DATA_DIR = Path.home() / ".stockbuddy"
CACHE_DIR = DATA_DIR / "cache"
CACHE_TTL_SECONDS = 600 # 缓存有效期 10 分钟
LEGACY_CACHE_DIR = Path.home() / ".stock_buddy_cache"
MAX_RETRIES = 3
RETRY_BASE_DELAY = 2
def _cache_key(code: str, period: str) -> str:
"""生成缓存文件名"""
key = f"{code}_{period}"
return hashlib.md5(key.encode()).hexdigest() + ".json"
def _read_cache(code: str, period: str) -> dict | None:
"""读取缓存"""
cache_file = CACHE_DIR / _cache_key(code, period)
if not cache_file.exists():
legacy_cache_file = LEGACY_CACHE_DIR / _cache_key(code, period)
if legacy_cache_file.exists():
try:
DATA_DIR.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(parents=True, exist_ok=True)
cache_file.write_text(
legacy_cache_file.read_text(encoding="utf-8"),
encoding="utf-8",
)
except OSError:
cache_file = legacy_cache_file
if not cache_file.exists():
return None
try:
with open(cache_file, "r", encoding="utf-8") as f:
cached = json.load(f)
cached_time = datetime.fromisoformat(cached.get("analysis_time", ""))
if (datetime.now() - cached_time).total_seconds() < CACHE_TTL_SECONDS:
cached["_from_cache"] = True
return cached
except (json.JSONDecodeError, ValueError, KeyError):
pass
return None
def _write_cache(code: str, period: str, data: dict):
"""写入缓存"""
DATA_DIR.mkdir(parents=True, exist_ok=True)
CACHE_DIR.mkdir(parents=True, exist_ok=True)
cache_file = CACHE_DIR / _cache_key(code, period)
try:
with open(cache_file, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2, default=str)
except OSError:
pass
ANALYSIS_CACHE_TTL = ANALYSIS_CACHE_TTL_SECONDS
# ─────────────────────────────────────────────
# 腾讯财经数据获取
# ─────────────────────────────────────────────
def normalize_hk_code(code: str) -> tuple[str, str]:
"""标准化港股代码,返回 (原始数字代码, 带.HK后缀代码)"""
code = code.strip().upper().replace(".HK", "")
digits = code.lstrip("0")
if digits.isdigit():
numeric_code = code.zfill(4)
return numeric_code, numeric_code + ".HK"
return code, code + ".HK"
def normalize_stock_code(code: str) -> dict:
"""标准化股票代码,支持港股/A股/美股。"""
raw = code.strip().upper()
if raw.endswith('.HK'):
digits = raw[:-3].lstrip('0') or '0'
return {
'market': 'HK',
'code': digits.zfill(4) + '.HK',
'tencent_symbol': 'hk' + digits.zfill(5),
'exchange': 'HKEX',
}
if raw.startswith(('SH', 'SZ')) and len(raw) == 8 and raw[2:].isdigit():
market = raw[:2]
return {
'market': market,
'code': raw,
'tencent_symbol': raw.lower(),
'exchange': 'SSE' if market == 'SH' else 'SZSE',
}
if raw.endswith('.US'):
symbol = raw[:-3]
return {
'market': 'US',
'code': symbol,
'tencent_symbol': 'us' + symbol,
'exchange': 'US',
}
if raw.startswith('US.'):
symbol = raw[3:]
return {
'market': 'US',
'code': symbol,
'tencent_symbol': 'us' + symbol,
'exchange': 'US',
}
if raw.isdigit():
if len(raw) <= 5:
digits = raw.lstrip('0') or '0'
return {
'market': 'HK',
'code': digits.zfill(4) + '.HK',
'tencent_symbol': 'hk' + digits.zfill(5),
'exchange': 'HKEX',
}
if len(raw) == 6:
market = 'SH' if raw.startswith(('5', '6', '9')) else 'SZ'
return {
'market': market,
'code': market + raw,
'tencent_symbol': (market + raw).lower(),
'exchange': 'SSE' if market == 'SH' else 'SZSE',
}
symbol = raw.replace('.', '').replace('-', '')
return {
'market': 'US',
'code': symbol,
'tencent_symbol': 'us' + symbol,
'exchange': 'US',
}
def fetch_tencent_quote(code: str) -> dict:
"""获取腾讯财经实时行情"""
numeric_code, full_code = normalize_hk_code(code)
url = f"http://qt.gtimg.cn/q=hk{numeric_code}"
stock = normalize_stock_code(code)
symbol = stock['tencent_symbol']
url = f"http://qt.gtimg.cn/q={symbol}"
for attempt in range(MAX_RETRIES):
try:
req = urllib.request.Request(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': 'https://gu.qq.com/',
})
with urllib.request.urlopen(req, timeout=10) as response:
data = response.read().decode("gb2312", errors="ignore")
return _parse_tencent_quote(data, numeric_code)
data = response.read().decode('gb2312', errors='ignore')
return _parse_tencent_quote(data, symbol, stock)
except urllib.error.URLError as e:
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BASE_DELAY * (attempt + 1))
@@ -130,9 +163,9 @@ def fetch_tencent_quote(code: str) -> dict:
return {}
def _parse_tencent_quote(data: str, code: str) -> dict:
def _parse_tencent_quote(data: str, symbol: str, stock: dict) -> dict:
"""解析腾讯财经实时行情响应"""
var_name = f"v_hk{code}"
var_name = f"v_{symbol}"
for line in data.strip().split(";"):
line = line.strip()
if not line or var_name not in line:
@@ -158,40 +191,53 @@ def _parse_tencent_quote(data: str, code: str) -> dict:
# 0:市场 1:名称 2:代码 3:现价 4:昨收 5:今开 6:成交量
# 30:时间戳 31:涨跌额 32:涨跌幅 33:最高 34:最低
# 39:市盈率 47:市净率 37:总市值 48:52周高 49:52周低
market = stock['market']
currency = 'HKD' if market == 'HK' else ('CNY' if market in ('SH', 'SZ') else safe_str(35, 'USD') or 'USD')
pb_idx = 47 if market in ('HK', 'US') else 46
market_cap_idx = 37 if market == 'HK' else (57 if market in ('SH', 'SZ') else 44)
high_52_idx = 48 if market in ('HK', 'US') else 41
low_52_idx = 49 if market in ('HK', 'US') else 42
return {
"name": values[1],
"code": values[2],
"price": safe_float(3),
"prev_close": safe_float(4),
"open": safe_float(5),
"volume": safe_float(6),
"high": safe_float(33),
"low": safe_float(34),
"change_amount": safe_float(31),
"change_pct": safe_float(32),
"timestamp": safe_str(30),
"pe": safe_float(39) if len(values) > 39 else None,
"pb": safe_float(47) if len(values) > 47 else None,
"market_cap": safe_str(37),
"52w_high": safe_float(48) if len(values) > 48 else None,
"52w_low": safe_float(49) if len(values) > 49 else None,
'name': values[1],
'code': stock['code'],
'market': market,
'exchange': stock.get('exchange'),
'tencent_symbol': symbol,
'price': safe_float(3),
'prev_close': safe_float(4),
'open': safe_float(5),
'volume': safe_float(6),
'high': safe_float(33),
'low': safe_float(34),
'change_amount': safe_float(31),
'change_pct': safe_float(32),
'timestamp': safe_str(30),
'currency': currency,
'pe': safe_float(39) if len(values) > 39 else None,
'pb': safe_float(pb_idx) if len(values) > pb_idx else None,
'market_cap': safe_str(market_cap_idx),
'52w_high': safe_float(high_52_idx) if len(values) > high_52_idx else None,
'52w_low': safe_float(low_52_idx) if len(values) > low_52_idx else None,
'raw_code': safe_str(2),
}
return {}
def fetch_tencent_kline(code: str, days: int = 120) -> pd.DataFrame:
"""获取腾讯财经K线数据"""
numeric_code, full_code = normalize_hk_code(code)
url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param=hk{numeric_code},day,,,{days},qfq"
stock = normalize_stock_code(code)
symbol = stock['tencent_symbol']
url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={symbol},day,,,{days},qfq"
for attempt in range(MAX_RETRIES):
try:
req = urllib.request.Request(url, headers={
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Referer': 'https://gu.qq.com/',
})
with urllib.request.urlopen(req, timeout=15) as response:
data = json.loads(response.read().decode("utf-8"))
return _parse_tencent_kline(data, numeric_code)
data = json.loads(response.read().decode('utf-8'))
return _parse_tencent_kline(data, symbol)
except (urllib.error.URLError, json.JSONDecodeError) as e:
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BASE_DELAY * (attempt + 1))
@@ -200,33 +246,78 @@ def fetch_tencent_kline(code: str, days: int = 120) -> pd.DataFrame:
return pd.DataFrame()
def _parse_tencent_kline(data: dict, code: str) -> pd.DataFrame:
def _parse_tencent_kline(data: dict, symbol: str) -> pd.DataFrame:
"""解析腾讯财经K线数据"""
key = f"hk{code}"
if data.get("code") != 0 or not data.get("data") or key not in data["data"]:
if data.get('code') != 0 or not data.get('data') or symbol not in data['data']:
return pd.DataFrame()
day_data = data["data"][key].get("day", [])
symbol_data = data['data'][symbol]
day_data = symbol_data.get('day') or symbol_data.get('qfqday') or symbol_data.get('hfqday') or []
if not day_data:
return pd.DataFrame()
# 格式: [日期, 开盘价, 收盘价, 最低价, 最高价, 成交量]
records = []
for item in day_data:
if len(item) >= 6:
records.append({
"Date": item[0],
"Open": float(item[1]),
"Close": float(item[2]),
"Low": float(item[3]),
"High": float(item[4]),
"Volume": float(item[5]),
'Date': item[0],
'Open': float(item[1]),
'Close': float(item[2]),
'Low': float(item[3]),
'High': float(item[4]),
'Volume': float(item[5]),
})
df = pd.DataFrame(records)
if not df.empty:
df["Date"] = pd.to_datetime(df["Date"])
df.set_index("Date", inplace=True)
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)
return df
def fetch_us_kline_yahoo(symbol: str, period: str = '6mo') -> pd.DataFrame:
range_map = {
'1mo': '1mo',
'3mo': '3mo',
'6mo': '6mo',
'1y': '1y',
'2y': '2y',
'5y': '5y',
}
url = f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}?range={range_map.get(period, '6mo')}&interval=1d&includePrePost=false"
req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
with urllib.request.urlopen(req, timeout=20) as response:
data = json.loads(response.read().decode('utf-8'))
result = data.get('chart', {}).get('result', [])
if not result:
return pd.DataFrame()
result = result[0]
timestamps = result.get('timestamp') or []
quote = (result.get('indicators', {}).get('quote') or [{}])[0]
opens = quote.get('open') or []
highs = quote.get('high') or []
lows = quote.get('low') or []
closes = quote.get('close') or []
volumes = quote.get('volume') or []
records = []
for i, ts in enumerate(timestamps):
if i >= len(opens) or opens[i] is None or closes[i] is None or highs[i] is None or lows[i] is None:
continue
records.append({
'Date': datetime.fromtimestamp(ts).strftime('%Y-%m-%d'),
'Open': float(opens[i]),
'Close': float(closes[i]),
'Low': float(lows[i]),
'High': float(highs[i]),
'Volume': float(volumes[i] or 0),
})
df = pd.DataFrame(records)
if not df.empty:
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)
return df
@@ -243,6 +334,42 @@ def period_to_days(period: str) -> int:
return mapping.get(period, 180)
def min_kline_points(required_days: int) -> int:
return 20 if required_days <= 30 else 30
def refresh_kline_cache(code: str, required_days: int, period: str = '6mo') -> pd.DataFrame:
"""使用 SQLite 保存日线数据,并按需增量刷新。"""
stock = normalize_stock_code(code)
buffer_days = 30
latest_date = get_latest_kline_date(code)
fetch_days = max(required_days + buffer_days, 60)
if latest_date:
latest_dt = datetime.strptime(latest_date, "%Y-%m-%d")
missing_days = max((datetime.now() - latest_dt).days, 0)
if missing_days <= 2:
fetch_days = min(fetch_days, 60)
else:
fetch_days = max(missing_days + buffer_days, 60)
fetched = fetch_tencent_kline(code, fetch_days)
if stock['market'] == 'US' and len(fetched) <= 2:
fetched = fetch_us_kline_yahoo(stock['code'], period)
if not fetched.empty:
upsert_kline_df(code, fetched, source='yahoo' if stock['market'] == 'US' and len(fetched) > 2 else 'tencent')
hist = get_kline_df(code, required_days + buffer_days)
if len(hist) < min_kline_points(required_days):
fallback = fetch_tencent_kline(code, required_days + buffer_days)
if stock['market'] == 'US' and len(fallback) <= 2:
fallback = fetch_us_kline_yahoo(stock['code'], period)
if not fallback.empty:
upsert_kline_df(code, fallback, source='yahoo' if stock['market'] == 'US' and len(fallback) > 2 else 'tencent')
hist = get_kline_df(code, required_days + buffer_days)
return hist
# ─────────────────────────────────────────────
# 技术指标计算 (保持不变)
# ─────────────────────────────────────────────
@@ -454,30 +581,21 @@ def calc_ma_trend(close: pd.Series) -> dict:
def get_fundamentals(quote: dict) -> dict:
"""基于实时行情数据的基本面分析"""
result = {}
# 估值指标 (腾讯提供的)
pe = quote.get("pe")
pb = quote.get("pb")
result["PE"] = round(pe, 2) if pe else None
result["PB"] = round(pb, 2) if pb else None
result["PS"] = None # 腾讯不提供
# 市值
result["market_cap"] = quote.get("market_cap", "")
# 52周价格区间
result["52w_high"] = quote.get("52w_high")
result["52w_low"] = quote.get("52w_low")
# 公司信息
result["company_name"] = quote.get("name", "未知")
result["sector"] = "港股"
result["industry"] = "港股"
result["currency"] = "HKD"
# 基本面信号
result["fundamental_signal"] = _fundamental_signal(result)
pe = quote.get('pe')
pb = quote.get('pb')
result['PE'] = round(pe, 2) if pe else None
result['PB'] = round(pb, 2) if pb else None
result['PS'] = None
result['market_cap'] = quote.get('market_cap', '')
result['52w_high'] = quote.get('52w_high')
result['52w_low'] = quote.get('52w_low')
result['company_name'] = quote.get('name', '未知')
result['sector'] = quote.get('market', '未知市场')
result['industry'] = quote.get('exchange') or quote.get('market', '未知')
result['currency'] = quote.get('currency', 'N/A')
result['market'] = quote.get('market', 'N/A')
result['fundamental_signal'] = _fundamental_signal(result)
return result
@@ -523,125 +641,229 @@ def _fundamental_signal(data: dict) -> str:
# 综合评分与建议
# ─────────────────────────────────────────────
def generate_recommendation(technical: dict, fundamental: dict, current_price: float) -> dict:
"""综合技术面和基本面给出操作建议"""
score = 0
signals = []
MARKET_PROFILES = {
"HK": {"technical": 0.62, "fundamental": 0.38, "risk_penalty": 1.0},
"SH": {"technical": 0.58, "fundamental": 0.42, "risk_penalty": 0.9},
"SZ": {"technical": 0.60, "fundamental": 0.40, "risk_penalty": 1.0},
"US": {"technical": 0.55, "fundamental": 0.45, "risk_penalty": 0.85},
}
# 技术面评分
def clamp(value: float, low: float, high: float) -> float:
return max(low, min(high, value))
def detect_market_regime(hist: pd.DataFrame, technical: dict, quote: dict) -> dict:
close = hist["Close"]
ma20 = close.rolling(20).mean().iloc[-1] if len(close) >= 20 else close.iloc[-1]
ma60 = close.rolling(60).mean().iloc[-1] if len(close) >= 60 else ma20
current = close.iloc[-1]
rsi12 = technical.get("rsi", {}).get("RSI12", technical.get("rsi", {}).get("RSI6", 50))
high_52w = quote.get("52w_high")
low_52w = quote.get("52w_low")
pos_52w = None
if high_52w and low_52w and high_52w != low_52w:
pos_52w = (current - low_52w) / (high_52w - low_52w)
if current > ma20 > ma60 and rsi12 >= 55:
regime = "趋势延续"
elif rsi12 <= 35 and technical.get("kdj", {}).get("J", 50) < 20:
regime = "超跌反弹"
elif pos_52w is not None and pos_52w > 0.85 and rsi12 >= 68:
regime = "高位风险"
elif abs(current / ma20 - 1) < 0.03 and 40 <= rsi12 <= 60:
regime = "区间震荡"
else:
regime = "估值修复/等待确认"
return {"regime": regime, "position_52w": round(pos_52w, 4) if pos_52w is not None else None}
def compute_layer_scores(hist: pd.DataFrame, technical: dict, fundamental: dict, quote: dict) -> dict:
close = hist["Close"]
current = close.iloc[-1]
ret_5 = (current / close.iloc[-6] - 1) if len(close) > 5 else 0
ret_20 = (current / close.iloc[-21] - 1) if len(close) > 20 else ret_5
ma = technical.get("ma_trend", {})
above = ma.get("price_above_ma_count", "0/1").split("/")
above_ratio = (int(above[0]) / max(int(above[1]), 1)) if len(above) == 2 else 0
macd_sig = technical.get("macd", {}).get("signal", "")
if "买入" in macd_sig or "金叉" in macd_sig:
score += 2
signals.append(f"MACD: {macd_sig}")
elif "卖出" in macd_sig or "死叉" in macd_sig:
score -= 2
signals.append(f"MACD: {macd_sig}")
elif "多头" in macd_sig:
score += 1
signals.append(f"MACD: {macd_sig}")
elif "空头" in macd_sig:
score -= 1
signals.append(f"MACD: {macd_sig}")
rsi_sig = technical.get("rsi", {}).get("signal", "")
if "超卖" in rsi_sig:
score += 2
signals.append(f"RSI: {rsi_sig}")
elif "超买" in rsi_sig:
score -= 2
signals.append(f"RSI: {rsi_sig}")
kdj_sig = technical.get("kdj", {}).get("signal", "")
if "买入" in kdj_sig or "金叉" in kdj_sig:
score += 1
signals.append(f"KDJ: {kdj_sig}")
elif "卖出" in kdj_sig or "死叉" in kdj_sig:
score -= 1
signals.append(f"KDJ: {kdj_sig}")
rsi = technical.get("rsi", {}).get("RSI12", technical.get("rsi", {}).get("RSI6", 50))
kdj_j = technical.get("kdj", {}).get("J", 50)
volume_ratio = technical.get("volume", {}).get("volume_ratio", 1)
boll_sig = technical.get("bollinger", {}).get("signal", "")
if "超卖" in boll_sig or "下轨" in boll_sig:
score += 1
signals.append(f"布林带: {boll_sig}")
elif "超买" in boll_sig or "上轨" in boll_sig:
score -= 1
signals.append(f"布林带: {boll_sig}")
ma_sig = technical.get("ma_trend", {}).get("trend_signal", "")
if "多头" in ma_sig or "强势" in ma_sig:
score += 2
signals.append(f"均线: {ma_sig}")
elif "空头" in ma_sig or "弱势" in ma_sig:
score -= 2
signals.append(f"均线: {ma_sig}")
elif "偏多" in ma_sig:
score += 1
elif "偏空" in ma_sig:
score -= 1
vol_sig = technical.get("volume", {}).get("signal", "")
if "放量上涨" in vol_sig:
score += 1
signals.append(f"成交量: {vol_sig}")
elif "放量下跌" in vol_sig:
score -= 1
signals.append(f"成交量: {vol_sig}")
# 基本面评分
fund_sig = fundamental.get("fundamental_signal", "")
if "优秀" in fund_sig:
score += 2
signals.append(f"基本面: {fund_sig}")
elif "良好" in fund_sig:
score += 1
signals.append(f"基本面: {fund_sig}")
elif "较差" in fund_sig:
score -= 2
signals.append(f"基本面: {fund_sig}")
# 52周位置
pe = fundamental.get("PE")
pb = fundamental.get("PB")
high_52w = fundamental.get("52w_high")
low_52w = fundamental.get("52w_low")
pos_52w = 0.5
if high_52w and low_52w and high_52w != low_52w:
position = (current_price - low_52w) / (high_52w - low_52w)
if position < 0.2:
score += 1
signals.append(f"52周位置: {position:.0%} (接近低点)")
elif position > 0.9:
score -= 1
signals.append(f"52周位置: {position:.0%} (接近高点)")
else:
signals.append(f"52周位置: {position:.0%}")
pos_52w = clamp((quote.get("price", current) - low_52w) / (high_52w - low_52w), 0, 1)
# 映射到操作建议
if score >= 5:
action = "强烈买入"
action_en = "STRONG_BUY"
color = "🟢🟢"
elif score >= 2:
action = "买入"
action_en = "BUY"
color = "🟢"
elif score >= -1:
action = "持有/观望"
action_en = "HOLD"
color = "🟡"
elif score >= -4:
action = "卖出"
action_en = "SELL"
color = "🔴"
trend = (ret_20 * 100 * 0.6) + (above_ratio - 0.5) * 8
if "多头" in macd_sig or "金叉" in macd_sig:
trend += 1.5
elif "空头" in macd_sig or "死叉" in macd_sig:
trend -= 1.5
momentum = ret_5 * 100 * 0.8
momentum += 1.2 if volume_ratio > 1.5 and ret_5 > 0 else 0
momentum -= 1.2 if volume_ratio > 1.5 and ret_5 < 0 else 0
momentum += 0.8 if "金叉" in technical.get("kdj", {}).get("signal", "") else 0
momentum -= 0.8 if "死叉" in technical.get("kdj", {}).get("signal", "") else 0
risk = 0.0
if rsi > 75:
risk -= 2.2
elif rsi < 28:
risk += 1.0
if kdj_j > 100:
risk -= 1.2
elif kdj_j < 0:
risk += 0.8
if pos_52w > 0.88:
risk -= 1.2
elif pos_52w < 0.18:
risk += 0.8
if "突破上轨" in boll_sig:
risk -= 0.8
elif "突破下轨" in boll_sig:
risk += 0.6
valuation = 0.0
if pe is not None:
if 0 < pe < 15:
valuation += 2.0
elif pe < 25:
valuation += 1.0
elif pe > 40:
valuation -= 1.5
if pb is not None:
if 0 < pb < 1:
valuation += 1.0
elif pb > 6:
valuation -= 1.0
relative_strength = clamp(ret_20 * 100 / 4, -3, 3)
volume_structure = clamp((volume_ratio - 1.0) * 2, -2.5, 2.5)
return {
"trend": round(clamp(trend, -5, 5), 2),
"momentum": round(clamp(momentum, -5, 5), 2),
"risk": round(clamp(risk, -5, 5), 2),
"valuation": round(clamp(valuation, -5, 5), 2),
"relative_strength": round(relative_strength, 2),
"volume_structure": round(volume_structure, 2),
}
def evaluate_signal_quality(layer_scores: dict) -> dict:
positives = sum(1 for v in layer_scores.values() if v > 0.8)
negatives = sum(1 for v in layer_scores.values() if v < -0.8)
dispersion = max(layer_scores.values()) - min(layer_scores.values())
agreement = abs(positives - negatives)
confidence = 40 + agreement * 8 - min(dispersion * 2.5, 18)
confidence = int(clamp(confidence, 18, 92))
if confidence >= 72:
level = ""
elif confidence >= 55:
level = ""
else:
action = "强烈卖出"
action_en = "STRONG_SELL"
color = "🔴🔴"
level = ""
return {"score": confidence, "level": level, "positives": positives, "negatives": negatives}
def backtest_current_signal(hist: pd.DataFrame, period: str) -> dict:
horizons = [5, 10, 20]
closes = hist["Close"].reset_index(drop=True)
if len(closes) < 45:
return {"samples": 0, "message": "历史样本不足"}
current_ret20 = (closes.iloc[-1] / closes.iloc[-21] - 1) if len(closes) > 20 else 0
current_ret5 = (closes.iloc[-1] / closes.iloc[-6] - 1) if len(closes) > 5 else 0
matched = []
for i in range(25, len(closes) - 20):
r20 = closes.iloc[i] / closes.iloc[i-20] - 1
r5 = closes.iloc[i] / closes.iloc[i-5] - 1
if abs(r20 - current_ret20) < 0.06 and abs(r5 - current_ret5) < 0.04:
matched.append(i)
if len(matched) < 5:
return {"samples": len(matched), "message": "相似信号样本不足"}
perf = {"samples": len(matched)}
all_forward = []
for h in horizons:
vals = []
for i in matched:
if i + h < len(closes):
vals.append(closes.iloc[i + h] / closes.iloc[i] - 1)
if vals:
perf[f"forward_{h}d_avg_pct"] = round(sum(vals) / len(vals) * 100, 2)
perf[f"forward_{h}d_win_rate"] = round(sum(1 for x in vals if x > 0) / len(vals) * 100, 2)
all_forward.extend(vals)
if all_forward:
perf["max_drawdown_proxy_pct"] = round(min(all_forward) * 100, 2)
perf["period"] = period
return perf
def decide_action_type(regime: str, total_score: float, confidence: dict) -> tuple[str, str]:
if total_score >= 4.5 and confidence["score"] >= 70:
return "强烈买入", "趋势型买入" if regime == "趋势延续" else "高置信度买入"
if total_score >= 2:
if regime == "超跌反弹":
return "买入", "超跌博弈型买入"
return "买入", "趋势跟随型买入"
if total_score <= -4.5 and confidence["score"] >= 70:
return "强烈卖出", "风险规避型卖出"
if total_score <= -2:
return "卖出", "止盈/止损型卖出"
return "持有/观望", "等待确认"
def generate_recommendation(technical: dict, fundamental: dict, current_price: float, hist: pd.DataFrame, quote: dict) -> dict:
market = quote.get("market", "HK")
profile = MARKET_PROFILES.get(market, MARKET_PROFILES["HK"])
regime = detect_market_regime(hist, technical, quote)
layer_scores = compute_layer_scores(hist, technical, fundamental, quote)
confidence = evaluate_signal_quality(layer_scores)
technical_bucket = (
layer_scores["trend"] * 0.35 +
layer_scores["momentum"] * 0.25 +
layer_scores["relative_strength"] * 0.20 +
layer_scores["volume_structure"] * 0.20
)
fundamental_bucket = layer_scores["valuation"]
risk_bucket = layer_scores["risk"] * profile["risk_penalty"]
total_score = technical_bucket * profile["technical"] + fundamental_bucket * profile["fundamental"] + risk_bucket
total_score = round(clamp(total_score, -8, 8), 2)
action, action_type = decide_action_type(regime["regime"], total_score, confidence)
icon_map = {"强烈买入": "🟢🟢", "买入": "🟢", "持有/观望": "🟡", "卖出": "🔴", "强烈卖出": "🔴🔴"}
en_map = {"强烈买入": "STRONG_BUY", "买入": "BUY", "持有/观望": "HOLD", "卖出": "SELL", "强烈卖出": "STRONG_SELL"}
icon = icon_map[action]
key_signals = [
f"市场场景: {regime['regime']}",
f"趋势层: {layer_scores['trend']}",
f"动量层: {layer_scores['momentum']}",
f"风险层: {layer_scores['risk']}",
f"估值层: {layer_scores['valuation']}",
f"置信度: {confidence['level']}({confidence['score']})",
]
return {
"action": action,
"action_en": action_en,
"score": score,
"icon": color,
"key_signals": signals,
"summary": f"{color} {action} (综合评分: {score})",
"action_en": en_map[action],
"action_type": action_type,
"score": total_score,
"icon": icon,
"market_profile": market,
"regime": regime,
"layer_scores": layer_scores,
"confidence": confidence,
"key_signals": key_signals,
"summary": f"{icon} {action} / {action_type} (综合评分: {total_score})",
}
@@ -650,65 +872,77 @@ def generate_recommendation(technical: dict, fundamental: dict, current_price: f
# ─────────────────────────────────────────────
def analyze_stock(code: str, period: str = "6mo", use_cache: bool = True) -> dict:
"""对单只股进行完整分析"""
numeric_code, full_code = normalize_hk_code(code)
"""对单只股进行完整分析"""
init_db()
stock = normalize_stock_code(code)
full_code = stock['code']
if use_cache:
cached = _read_cache(full_code, period)
cached = get_cached_analysis(full_code, period)
if cached:
print(f"📦 使用缓存数据 ({full_code}),缓存有效期 {CACHE_TTL_SECONDS}s", file=sys.stderr)
print(f"📦 使用缓存数据 ({full_code}),缓存有效期 {ANALYSIS_CACHE_TTL}s", file=sys.stderr)
return cached
result = {"code": full_code, "analysis_time": datetime.now().isoformat(), "error": None}
result = {"code": full_code, "market": stock['market'], "analysis_time": datetime.now().isoformat(), "error": None}
try:
# 1. 获取实时行情
quote = fetch_tencent_quote(numeric_code)
quote = fetch_tencent_quote(full_code)
if not quote or not quote.get("price"):
result["error"] = f"无法获取 {full_code} 的实时行情"
return result
upsert_watchlist_item(
code=full_code,
market=quote.get('market', stock['market']),
tencent_symbol=quote.get('tencent_symbol', stock['tencent_symbol']),
name=quote.get('name'),
exchange=quote.get('exchange', stock.get('exchange')),
currency=quote.get('currency'),
last_price=quote.get('price'),
pe=quote.get('pe'),
pb=quote.get('pb'),
market_cap=quote.get('market_cap'),
week52_high=quote.get('52w_high'),
week52_low=quote.get('52w_low'),
quote_time=quote.get('timestamp'),
meta=quote,
)
current_price = quote["price"]
result["current_price"] = current_price
result["price_date"] = quote.get("timestamp", "")
result["price_change"] = quote.get("change_amount")
result["price_change_pct"] = quote.get("change_pct")
# 2. 获取K线数据
days = period_to_days(period)
hist = fetch_tencent_kline(numeric_code, days)
if hist.empty or len(hist) < 30:
hist = refresh_kline_cache(full_code, days, period)
if hist.empty or len(hist) < min_kline_points(days):
result["error"] = f"无法获取 {full_code} 的历史K线数据 (仅获得 {len(hist)} 条)"
return result
result["data_points"] = len(hist)
result["data_points"] = len(hist)
close = hist["Close"]
high = hist["High"]
low = hist["Low"]
volume = hist["Volume"]
# 3. 技术分析
technical = {}
technical["ma_trend"] = calc_ma_trend(close)
technical["macd"] = calc_macd(close)
technical["rsi"] = calc_rsi(close)
technical["kdj"] = calc_kdj(high, low, close)
technical["bollinger"] = calc_bollinger(close)
technical["volume"] = calc_volume_analysis(volume, close)
technical = {
"ma_trend": calc_ma_trend(close),
"macd": calc_macd(close),
"rsi": calc_rsi(close),
"kdj": calc_kdj(high, low, close),
"bollinger": calc_bollinger(close),
"volume": calc_volume_analysis(volume, close),
}
result["technical"] = technical
# 4. 基本面分析
fundamental = get_fundamentals(quote)
result["fundamental"] = fundamental
result["recommendation"] = generate_recommendation(technical, fundamental, current_price, hist, quote)
result["signal_validation"] = backtest_current_signal(hist, period)
# 5. 综合建议
result["recommendation"] = generate_recommendation(technical, fundamental, current_price)
# 6. 写入缓存
if result.get("error") is None:
_write_cache(full_code, period, result)
set_cached_analysis(full_code, period, result)
except Exception as e:
result["error"] = f"分析过程出错: {str(e)}"
@@ -717,8 +951,8 @@ def analyze_stock(code: str, period: str = "6mo", use_cache: bool = True) -> dic
def main():
parser = argparse.ArgumentParser(description="港股分析工具 (腾讯财经数据源)")
parser.add_argument("code", help="港股代码 (如 0700.HK, 00700, 腾讯)")
parser = argparse.ArgumentParser(description="多市场股票分析工具 (腾讯财经/Yahoo 数据源)")
parser.add_argument("code", help="股票代码,如 0700.HK / 600519 / SH600519 / AAPL")
parser.add_argument("--period", default="6mo", help="数据周期 (1mo/3mo/6mo/1y/2y/5y)")
parser.add_argument("--output", help="输出JSON文件路径")
parser.add_argument("--no-cache", action="store_true", help="跳过缓存,强制重新请求数据")
@@ -726,22 +960,15 @@ def main():
args = parser.parse_args()
if args.clear_cache:
import shutil
cleared = False
for path in (CACHE_DIR, LEGACY_CACHE_DIR):
if path.exists():
shutil.rmtree(path)
cleared = True
cleared = clear_analysis_cache()
if cleared:
print("缓存已清除")
print(f"✅ 已清除 {cleared} 条分析缓存")
else:
print(" 无缓存可清除")
return
result = analyze_stock(args.code, args.period, use_cache=not args.no_cache)
output = json.dumps(result, ensure_ascii=False, indent=2, default=str)
if args.output:
with open(args.output, "w", encoding="utf-8") as f:
f.write(output)