Files
stockbuddy/scripts/analyze_stock.py

982 lines
35 KiB
Python
Executable File
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
港股分析脚本 - 使用腾讯财经数据源进行技术面+基本面分析
用法:
python3 analyze_stock.py <股票代码> [--period <周期>] [--output <输出文件>]
示例:
python3 analyze_stock.py 0700.HK
python3 analyze_stock.py 0700.HK --period 6mo --output report.json
python3 analyze_stock.py 9988.HK --period 1y
"""
import sys
import json
import argparse
import time
import urllib.request
import urllib.error
from datetime import datetime, timedelta
from pathlib import Path
try:
from db import (
ANALYSIS_CACHE_TTL_SECONDS,
clear_analysis_cache,
get_cached_analysis,
get_kline_df,
get_latest_kline_date,
init_db,
set_cached_analysis,
upsert_kline_df,
upsert_watchlist_item,
)
except ImportError:
sys.path.insert(0, str(Path(__file__).resolve().parent))
from db import (
ANALYSIS_CACHE_TTL_SECONDS,
clear_analysis_cache,
get_cached_analysis,
get_kline_df,
get_latest_kline_date,
init_db,
set_cached_analysis,
upsert_kline_df,
upsert_watchlist_item,
)
try:
import numpy as np
except ImportError:
print("ERROR: numpy 未安装。请运行: pip3 install numpy", file=sys.stderr)
sys.exit(1)
try:
import pandas as pd
except ImportError:
print("ERROR: pandas 未安装。请运行: pip3 install pandas", file=sys.stderr)
sys.exit(1)
# ─────────────────────────────────────────────
# Cache and retry settings
# ─────────────────────────────────────────────
# Number of attempts the Tencent fetch helpers make before giving up.
MAX_RETRIES = 3
# Base back-off in seconds; the fetch helpers sleep RETRY_BASE_DELAY * (attempt + 1)
# between failed attempts.
RETRY_BASE_DELAY = 2
# Local alias of the TTL exported by the db module; it is shown in the cache-hit message.
ANALYSIS_CACHE_TTL = ANALYSIS_CACHE_TTL_SECONDS
# ─────────────────────────────────────────────
# Tencent Finance data fetching
# ─────────────────────────────────────────────
def normalize_stock_code(code: str) -> dict:
    """Normalize a user-supplied code for Hong Kong, mainland China or US stocks.

    The input is stripped and upper-cased, then matched in this order:

    * ``<digits>.HK``                        -> Hong Kong
    * ``SH``/``SZ`` followed by six digits   -> Shanghai / Shenzhen
    * ``<ticker>.US`` or ``US.<ticker>``     -> United States
    * bare digits, at most five characters   -> Hong Kong
    * bare digits, exactly six characters    -> Shanghai when the first digit is
      5, 6 or 9, otherwise Shenzhen
    * anything else                          -> United States, with ``.`` and
      ``-`` removed from the ticker

    Returns:
        A dict with the keys ``market``, ``code``, ``tencent_symbol`` and
        ``exchange``.
    """
    def entry(market, normalized, tencent_symbol, exchange):
        return {
            'market': market,
            'code': normalized,
            'tencent_symbol': tencent_symbol,
            'exchange': exchange,
        }

    def hong_kong(number_text):
        # Display code is padded to 4 digits, the Tencent symbol to 5.
        significant = number_text.lstrip('0') or '0'
        return entry('HK', significant.zfill(4) + '.HK', 'hk' + significant.zfill(5), 'HKEX')

    def mainland(prefixed):
        board = prefixed[:2]
        return entry(board, prefixed, prefixed.lower(), 'SSE' if board == 'SH' else 'SZSE')

    def united_states(ticker):
        return entry('US', ticker, 'us' + ticker, 'US')

    text = code.strip().upper()

    if text.endswith('.HK'):
        return hong_kong(text[:-3])
    if len(text) == 8 and text.startswith(('SH', 'SZ')) and text[2:].isdigit():
        return mainland(text)
    if text.endswith('.US'):
        return united_states(text[:-3])
    if text.startswith('US.'):
        return united_states(text[3:])
    if text.isdigit():
        if len(text) <= 5:
            return hong_kong(text)
        if len(text) == 6:
            board = 'SH' if text[0] in ('5', '6', '9') else 'SZ'
            return mainland(board + text)
    # Everything else (including digit strings of seven or more characters)
    # is treated as a US ticker.
    return united_states(text.replace('.', '').replace('-', ''))
def fetch_tencent_quote(code: str) -> dict:
    """Fetch the real-time quote for ``code`` from Tencent Finance.

    The request is attempted up to ``MAX_RETRIES`` times, sleeping
    ``RETRY_BASE_DELAY * attempt_number`` seconds between failed attempts.

    Args:
        code: Any code form accepted by ``normalize_stock_code``.

    Returns:
        The dict produced by ``_parse_tencent_quote`` (empty when the response
        does not contain the symbol), or ``{}`` if no attempt was made.

    Raises:
        Exception: when the last attempt fails with a network error; the
            original error is attached as ``__cause__``.
    """
    stock = normalize_stock_code(code)
    symbol = stock['tencent_symbol']
    url = f"http://qt.gtimg.cn/q={symbol}"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'https://gu.qq.com/',
    }
    for attempt in range(MAX_RETRIES):
        try:
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, timeout=10) as response:
                # GBK is a superset of GB2312, so characters outside GB2312
                # are no longer silently dropped from the decoded payload.
                data = response.read().decode('gbk', errors='ignore')
        except OSError as e:
            # URLError is an OSError subclass; catching OSError also retries
            # socket timeouts / connection resets raised while reading the body.
            if attempt >= MAX_RETRIES - 1:
                raise Exception(f"获取实时行情失败: {e}") from e
            time.sleep(RETRY_BASE_DELAY * (attempt + 1))
        else:
            return _parse_tencent_quote(data, symbol, stock)
    return {}
def _parse_tencent_quote(data: str, symbol: str, stock: dict) -> dict:
"""解析腾讯财经实时行情响应"""
var_name = f"v_{symbol}"
for line in data.strip().split(";"):
line = line.strip()
if not line or var_name not in line:
continue
# 提取引号内的内容
parts = line.split('"')
if len(parts) < 2:
continue
values = parts[1].split("~")
if len(values) < 35: # 至少需要35个字段
continue
def safe_float(idx: int, default: float = 0.0) -> float:
try:
return float(values[idx]) if values[idx] else default
except (ValueError, IndexError):
return default
def safe_str(idx: int, default: str = "") -> str:
return values[idx] if idx < len(values) else default
# 字段映射 (根据腾讯财经API实际数据)
# 0:市场 1:名称 2:代码 3:现价 4:昨收 5:今开 6:成交量
# 30:时间戳 31:涨跌额 32:涨跌幅 33:最高 34:最低
# 39:市盈率 47:市净率 37:总市值 48:52周高 49:52周低
market = stock['market']
currency = 'HKD' if market == 'HK' else ('CNY' if market in ('SH', 'SZ') else safe_str(35, 'USD') or 'USD')
pb_idx = 47 if market in ('HK', 'US') else 46
market_cap_idx = 37 if market == 'HK' else (57 if market in ('SH', 'SZ') else 44)
high_52_idx = 48 if market in ('HK', 'US') else 41
low_52_idx = 49 if market in ('HK', 'US') else 42
return {
'name': values[1],
'code': stock['code'],
'market': market,
'exchange': stock.get('exchange'),
'tencent_symbol': symbol,
'price': safe_float(3),
'prev_close': safe_float(4),
'open': safe_float(5),
'volume': safe_float(6),
'high': safe_float(33),
'low': safe_float(34),
'change_amount': safe_float(31),
'change_pct': safe_float(32),
'timestamp': safe_str(30),
'currency': currency,
'pe': safe_float(39) if len(values) > 39 else None,
'pb': safe_float(pb_idx) if len(values) > pb_idx else None,
'market_cap': safe_str(market_cap_idx),
'52w_high': safe_float(high_52_idx) if len(values) > high_52_idx else None,
'52w_low': safe_float(low_52_idx) if len(values) > low_52_idx else None,
'raw_code': safe_str(2),
}
return {}
def fetch_tencent_kline(code: str, days: int = 120) -> pd.DataFrame:
    """Fetch forward-adjusted daily K-line bars for ``code`` from Tencent Finance.

    The request is attempted up to ``MAX_RETRIES`` times, sleeping
    ``RETRY_BASE_DELAY * attempt_number`` seconds between failed attempts.

    Args:
        code: Any code form accepted by ``normalize_stock_code``.
        days: Number of daily bars requested from the endpoint.

    Returns:
        The DataFrame produced by ``_parse_tencent_kline`` (possibly empty), or
        an empty DataFrame if no attempt was made.

    Raises:
        Exception: when the last attempt fails with a network or decoding
            error; the original error is attached as ``__cause__``.
    """
    stock = normalize_stock_code(code)
    symbol = stock['tencent_symbol']
    url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={symbol},day,,,{days},qfq"
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
        'Referer': 'https://gu.qq.com/',
    }
    for attempt in range(MAX_RETRIES):
        try:
            req = urllib.request.Request(url, headers=headers)
            with urllib.request.urlopen(req, timeout=15) as response:
                data = json.loads(response.read().decode('utf-8'))
        except (OSError, json.JSONDecodeError, UnicodeDecodeError) as e:
            # URLError is an OSError subclass; catching OSError also retries
            # socket timeouts / connection resets raised while reading the body.
            # A truncated or garbled body is retried as well.
            if attempt >= MAX_RETRIES - 1:
                raise Exception(f"获取K线数据失败: {e}") from e
            time.sleep(RETRY_BASE_DELAY * (attempt + 1))
        else:
            return _parse_tencent_kline(data, symbol)
    return pd.DataFrame()
def _parse_tencent_kline(data: dict, symbol: str) -> pd.DataFrame:
"""解析腾讯财经K线数据"""
if data.get('code') != 0 or not data.get('data') or symbol not in data['data']:
return pd.DataFrame()
symbol_data = data['data'][symbol]
day_data = symbol_data.get('day') or symbol_data.get('qfqday') or symbol_data.get('hfqday') or []
if not day_data:
return pd.DataFrame()
records = []
for item in day_data:
if len(item) >= 6:
records.append({
'Date': item[0],
'Open': float(item[1]),
'Close': float(item[2]),
'Low': float(item[3]),
'High': float(item[4]),
'Volume': float(item[5]),
})
df = pd.DataFrame(records)
if not df.empty:
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)
return df
def fetch_us_kline_yahoo(symbol: str, period: str = '6mo') -> pd.DataFrame:
    """Fetch daily bars for a US ticker from the Yahoo Finance chart endpoint.

    Args:
        symbol: Ticker as stored in the normalized ``code`` field.
        period: One of ``1mo``/``3mo``/``6mo``/``1y``/``2y``/``5y``; any other
            value falls back to ``6mo``.

    Returns:
        A DataFrame indexed by ``Date`` with the columns ``Open``, ``Close``,
        ``Low``, ``High`` and ``Volume``; empty when the response carries no
        result. Bars with a missing open/high/low/close are skipped and a
        missing volume is recorded as 0.

    Raises:
        urllib.error.URLError / OSError / json.JSONDecodeError: network and
            decoding failures are not handled here and propagate to the caller.
    """
    # Local import: the module header only imports datetime and timedelta.
    from datetime import timezone

    supported_ranges = ('1mo', '3mo', '6mo', '1y', '2y', '5y')
    chart_range = period if period in supported_ranges else '6mo'
    url = f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}?range={chart_range}&interval=1d&includePrePost=false"
    req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'})
    with urllib.request.urlopen(req, timeout=20) as response:
        data = json.loads(response.read().decode('utf-8'))

    # Either "chart" or "result" may be null in the payload; treat both as
    # "no data" instead of crashing.
    results = (data.get('chart') or {}).get('result') or []
    if not results:
        return pd.DataFrame()

    chart = results[0]
    timestamps = chart.get('timestamp') or []
    quote = (chart.get('indicators', {}).get('quote') or [{}])[0]
    price_columns = [quote.get(name) or [] for name in ('open', 'close', 'low', 'high')]
    volumes = quote.get('volume') or []

    def at(series, position):
        # The per-field arrays are not guaranteed to be equally long.
        return series[position] if position < len(series) else None

    records = []
    for i, ts in enumerate(timestamps):
        open_, close_, low_, high_ = (at(column, i) for column in price_columns)
        if open_ is None or close_ is None or low_ is None or high_ is None:
            continue
        records.append({
            # Convert in UTC so the bar date does not depend on the timezone of
            # the machine running the script.
            # NOTE(review): assumes the bar timestamps fall on the trading date
            # in UTC, as they do for US regular trading hours — confirm.
            'Date': datetime.fromtimestamp(ts, tz=timezone.utc).strftime('%Y-%m-%d'),
            'Open': float(open_),
            'Close': float(close_),
            'Low': float(low_),
            'High': float(high_),
            'Volume': float(at(volumes, i) or 0),
        })

    df = pd.DataFrame(records)
    if not df.empty:
        df['Date'] = pd.to_datetime(df['Date'])
        df.set_index('Date', inplace=True)
    return df
def period_to_days(period: str) -> int:
    """Translate a period label into a number of daily bars.

    Unknown labels fall back to 180, the value used for ``6mo``.
    """
    known_periods = (
        ("1mo", 30),
        ("3mo", 90),
        ("6mo", 180),
        ("1y", 250),
        ("2y", 500),
        ("5y", 1250),
    )
    return dict(known_periods).get(period, 180)
def min_kline_points(required_days: int) -> int:
    """Return the minimum number of bars needed to analyse ``required_days``.

    Requests of up to 30 days need 20 bars; anything longer needs 30.
    """
    if required_days <= 30:
        return 20
    return 30
def _fetch_kline_with_source(code: str, stock: dict, days: int, period: str) -> tuple:
    """Fetch bars from Tencent, falling back to Yahoo for thin US data; return (frame, source)."""
    frame = fetch_tencent_kline(code, days)
    if stock['market'] == 'US' and len(frame) <= 2:
        return fetch_us_kline_yahoo(stock['code'], period), 'yahoo'
    return frame, 'tencent'


def refresh_kline_cache(code: str, required_days: int, period: str = '6mo') -> pd.DataFrame:
    """Keep daily bars in the SQLite store and refresh them incrementally.

    The number of bars requested depends on how stale the stored data is:
    when the newest stored bar is at most two days old only 60 bars are
    fetched, otherwise enough to cover the gap plus a 30-day buffer. If the
    store still holds fewer bars than ``min_kline_points(required_days)``
    afterwards, the full window is fetched once more.

    Args:
        code: Stock code used as the key in the store.
        required_days: Number of bars the analysis needs.
        period: Period label, only used for the Yahoo fallback on US tickers.

    Returns:
        The stored bars as returned by ``get_kline_df`` for
        ``required_days + 30`` days.
    """
    stock = normalize_stock_code(code)
    buffer_days = 30
    window_days = required_days + buffer_days

    fetch_days = max(window_days, 60)
    latest_date = get_latest_kline_date(code)
    if latest_date:
        latest_dt = datetime.strptime(latest_date, "%Y-%m-%d")
        missing_days = max((datetime.now() - latest_dt).days, 0)
        if missing_days <= 2:
            fetch_days = min(fetch_days, 60)
        else:
            fetch_days = max(missing_days + buffer_days, 60)

    # The source label is recorded from where the data actually came from;
    # previously any US frame with more than two rows was labelled 'yahoo',
    # even when Tencent had supplied it.
    fetched, source = _fetch_kline_with_source(code, stock, fetch_days, period)
    if not fetched.empty:
        upsert_kline_df(code, fetched, source=source)

    hist = get_kline_df(code, window_days)
    if len(hist) < min_kline_points(required_days):
        # The incremental fetch was not enough; pull the whole window.
        fallback, source = _fetch_kline_with_source(code, stock, window_days, period)
        if not fallback.empty:
            upsert_kline_df(code, fallback, source=source)
            hist = get_kline_df(code, window_days)
    return hist
# ─────────────────────────────────────────────
# Technical indicator calculations (unchanged)
# ─────────────────────────────────────────────
def calc_ma(close: pd.Series, windows: list[int] = None) -> dict:
    """Return the latest simple moving average for each window.

    Args:
        close: Closing prices, oldest first.
        windows: Window lengths; defaults to 5, 10, 20, 60, 120 and 250.

    Returns:
        A dict mapping ``MA<window>`` to the latest average rounded to three
        decimals. Windows longer than the series are left out.
    """
    spans = [5, 10, 20, 60, 120, 250] if windows is None else windows
    return {
        f"MA{span}": round(close.rolling(window=span).mean().iloc[-1], 3)
        for span in spans
        if len(close) >= span
    }
def calc_ema(close: pd.Series, span: int) -> pd.Series:
    """Return the exponential moving average of ``close`` for the given span.

    The recursive (``adjust=False``) form is used, so the first output equals
    the first input.
    """
    smoother = close.ewm(span=span, adjust=False)
    return smoother.mean()
def calc_macd(close: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> dict:
    """Compute the latest MACD values and the derived signal label.

    Args:
        close: Closing prices, oldest first.
        fast: Span of the fast EMA.
        slow: Span of the slow EMA.
        signal: Span of the EMA applied to DIF to obtain DEA.

    Returns:
        A dict with ``DIF``, ``DEA`` and ``MACD`` (twice the DIF-DEA gap), each
        rounded to four decimals, plus the ``signal`` text from ``_macd_signal``.
    """
    dif = calc_ema(close, fast) - calc_ema(close, slow)
    dea = dif.ewm(span=signal, adjust=False).mean()
    histogram = 2 * (dif - dea)

    latest = {
        label: round(series.iloc[-1], 4)
        for label, series in (("DIF", dif), ("DEA", dea), ("MACD", histogram))
    }
    latest["signal"] = _macd_signal(dif, dea, histogram)
    return latest
def _macd_signal(dif: pd.Series, dea: pd.Series, macd_hist: pd.Series) -> str:
"""MACD信号判断"""
if len(dif) < 3:
return "中性"
if dif.iloc[-1] > dea.iloc[-1] and dif.iloc[-2] <= dea.iloc[-2]:
return "金叉-买入信号"
if dif.iloc[-1] < dea.iloc[-1] and dif.iloc[-2] >= dea.iloc[-2]:
return "死叉-卖出信号"
if dif.iloc[-1] > 0 and dea.iloc[-1] > 0:
if macd_hist.iloc[-1] > macd_hist.iloc[-2]:
return "多头增强"
return "多头区域"
if dif.iloc[-1] < 0 and dea.iloc[-1] < 0:
if macd_hist.iloc[-1] < macd_hist.iloc[-2]:
return "空头增强"
return "空头区域"
return "中性"
def calc_rsi(close: pd.Series, periods: list[int] = None) -> dict:
    """Compute the latest RSI for each period plus an overall signal label.

    RSI uses the simple average of gains and losses over the last ``p`` price
    changes. A window without any loss now yields 100 (or 50 when the price
    did not move at all); previously it produced NaN, which silently disabled
    the overbought signal during uninterrupted rallies.

    Args:
        close: Closing prices, oldest first.
        periods: RSI look-back lengths; defaults to 6, 12 and 24.

    Returns:
        A dict with ``RSI<p>`` (rounded to two decimals) for every period that
        has at least ``p + 1`` prices, and ``signal`` derived from RSI12
        (falling back to RSI6, then to a neutral 50).
    """
    if periods is None:
        periods = [6, 12, 24]
    result = {}
    delta = close.diff()
    for p in periods:
        if len(close) < p + 1:
            continue
        recent = delta.iloc[-p:]
        # skipna=False keeps missing prices visible as NaN, as the rolling
        # mean did before.
        avg_gain = recent.clip(lower=0).mean(skipna=False)
        avg_loss = (-recent.clip(upper=0)).mean(skipna=False)
        if avg_loss == 0:
            # No losses in the window: RS is infinite, so RSI saturates at 100;
            # a completely flat window is treated as neutral.
            rsi = 100.0 if avg_gain > 0 else 50.0
        else:
            rsi = 100 - (100 / (1 + avg_gain / avg_loss))
        result[f"RSI{p}"] = round(rsi, 2)

    rsi_main = result.get("RSI12", result.get("RSI6", 50))
    if rsi_main > 80:
        result["signal"] = "严重超买-卖出信号"
    elif rsi_main > 70:
        result["signal"] = "超买-注意风险"
    elif rsi_main < 20:
        result["signal"] = "严重超卖-买入信号"
    elif rsi_main < 30:
        result["signal"] = "超卖-关注买入"
    else:
        result["signal"] = "中性"
    return result
def calc_kdj(high: pd.Series, low: pd.Series, close: pd.Series, n: int = 9) -> dict:
    """Compute the latest KDJ values and a signal label.

    RSV is the position of the close inside the ``n``-bar high/low range
    (0-100). K and D start at 50 on bar ``n - 1`` and are then smoothed
    recursively with weights 2/3 (previous value) and 1/3 (new input);
    J is ``3K - 2D``.

    Args:
        high: High prices, oldest first.
        low: Low prices, aligned with ``high``.
        close: Closing prices, aligned with ``high``.
        n: Look-back length of the RSV range.

    Returns:
        A dict with ``K``, ``D`` and ``J`` rounded to two decimals and a
        ``signal`` text; neutral placeholders with "数据不足" when fewer than
        ``n`` closes are available.
    """
    size = len(close)
    if size < n:
        return {"K": 50, "D": 50, "J": 50, "signal": "数据不足"}

    floor = low.rolling(window=n).min()
    ceiling = high.rolling(window=n).max()
    # A zero-width range would divide by zero; it is turned into NaN instead.
    rsv = ((close - floor) / (ceiling - floor).replace(0, np.nan) * 100).to_numpy()

    k_line = np.full(size, np.nan)
    d_line = np.full(size, np.nan)
    k_line[n - 1] = 50
    d_line[n - 1] = 50
    for pos in range(n, size):
        k_line[pos] = 2 / 3 * k_line[pos - 1] + 1 / 3 * rsv[pos]
        d_line[pos] = 2 / 3 * d_line[pos - 1] + 1 / 3 * k_line[pos]
    j_line = 3 * k_line - 2 * d_line

    k_val = round(k_line[-1], 2)
    d_val = round(d_line[-1], 2)
    j_val = round(j_line[-1], 2)

    # Crossovers take priority over the J-based overbought/oversold zones.
    if k_val > d_val and k_line[-2] <= d_line[-2]:
        signal = "金叉-买入信号"
    elif k_val < d_val and k_line[-2] >= d_line[-2]:
        signal = "死叉-卖出信号"
    elif j_val > 100:
        signal = "超买区域"
    elif j_val < 0:
        signal = "超卖区域"
    else:
        signal = "中性"
    return {"K": k_val, "D": d_val, "J": j_val, "signal": signal}
def calc_bollinger(close: pd.Series, window: int = 20, num_std: float = 2) -> dict:
    """Compute the latest Bollinger Band levels and a position label.

    Args:
        close: Closing prices, oldest first.
        window: Length of the moving average / standard deviation window.
        num_std: Band half-width in standard deviations.

    Returns:
        A dict with ``upper``, ``middle`` and ``lower`` (three decimals),
        ``bandwidth_pct`` (band width relative to the middle band, computed
        from the rounded levels) and ``signal``; only
        ``{"signal": "数据不足"}`` when the series is shorter than ``window``.
    """
    if len(close) < window:
        return {"signal": "数据不足"}

    roller = close.rolling(window=window)
    center = roller.mean()
    offset = num_std * roller.std()

    upper_val = round((center + offset).iloc[-1], 3)
    lower_val = round((center - offset).iloc[-1], 3)
    mid_val = round(center.iloc[-1], 3)
    bandwidth = round((upper_val - lower_val) / mid_val * 100, 2)

    last_price = close.iloc[-1]
    if last_price > upper_val:
        signal = "突破上轨-超买"
    elif last_price < lower_val:
        signal = "突破下轨-超卖"
    else:
        signal = "中轨上方-偏强" if last_price > mid_val else "中轨下方-偏弱"

    return {
        "upper": upper_val,
        "middle": mid_val,
        "lower": lower_val,
        "bandwidth_pct": bandwidth,
        "signal": signal,
    }
def calc_volume_analysis(volume: pd.Series, close: pd.Series) -> dict:
    """Compare the latest volume with its recent averages.

    Args:
        volume: Traded volumes, oldest first.
        close: Closing prices aligned with ``volume``.

    Returns:
        A dict with the latest volume, its 5- and 20-bar averages (as ints),
        ``volume_ratio`` (latest volume over the 5-bar average, two decimals,
        0 when that average is not positive) and a ``signal`` combining the
        ratio with the direction of the last price change. Only
        ``{"signal": "数据不足"}`` when fewer than 20 volumes are available.
    """
    if len(volume) < 20:
        return {"signal": "数据不足"}

    avg_5 = volume.rolling(5).mean().iloc[-1]
    avg_20 = volume.rolling(20).mean().iloc[-1]
    latest = volume.iloc[-1]
    vol_ratio = round(latest / avg_5, 2) if avg_5 > 0 else 0

    price_change = close.iloc[-1] - close.iloc[-2]
    rising = price_change > 0
    falling = price_change < 0
    heavy = vol_ratio > 2
    light = vol_ratio < 0.5

    if heavy and rising:
        signal = "放量上涨-强势"
    elif heavy and falling:
        signal = "放量下跌-弱势"
    elif light and rising:
        signal = "缩量上涨-动力不足"
    elif light and falling:
        signal = "缩量下跌-抛压减轻"
    else:
        signal = "中性"

    return {
        "current_volume": int(latest),
        "avg_5d_volume": int(avg_5),
        "avg_20d_volume": int(avg_20),
        "volume_ratio": vol_ratio,
        "signal": signal,
    }
def calc_ma_trend(close: pd.Series) -> dict:
    """Summarise where the latest close sits relative to its moving averages.

    The 5/10/20/60-bar averages are taken from ``calc_ma`` and the number of
    averages below the latest close decides the trend label. When no average
    can be computed (fewer than five closes) the label is now "数据不足";
    previously that case was reported as "空头排列-弱势".

    Args:
        close: Closing prices, oldest first; must not be empty.

    Returns:
        The moving averages plus ``trend_signal`` and ``price_above_ma_count``
        formatted as ``"<above>/<total>"``.
    """
    mas = calc_ma(close, [5, 10, 20, 60])
    current = close.iloc[-1]
    above_count = sum(1 for v in mas.values() if current > v)
    total = len(mas)
    if total == 0:
        signal = "数据不足"
    elif above_count == total:
        signal = "多头排列-强势"
    elif above_count == 0:
        signal = "空头排列-弱势"
    elif above_count >= total * 0.7:
        signal = "偏多"
    elif above_count <= total * 0.3:
        signal = "偏空"
    else:
        signal = "震荡"
    return {**mas, "trend_signal": signal, "price_above_ma_count": f"{above_count}/{total}"}
# ─────────────────────────────────────────────
# Fundamental analysis (based on Tencent data)
# ─────────────────────────────────────────────
def get_fundamentals(quote: dict) -> dict:
    """Derive a small fundamentals summary from a real-time quote dict.

    Args:
        quote: Dict as produced by ``_parse_tencent_quote``.

    Returns:
        A dict with valuation ratios (``PE``/``PB`` rounded to two decimals,
        ``None`` when missing or zero; ``PS`` is always ``None``), market cap,
        52-week range, descriptive fields and ``fundamental_signal``.
    """
    def rounded_or_none(value):
        return round(value, 2) if value else None

    market = quote.get('market', '未知')
    summary = {
        'PE': rounded_or_none(quote.get('pe')),
        'PB': rounded_or_none(quote.get('pb')),
        'PS': None,
        'market_cap': quote.get('market_cap', ''),
        '52w_high': quote.get('52w_high'),
        '52w_low': quote.get('52w_low'),
        'company_name': quote.get('name', '未知'),
        # The quote carries no sector/industry data, so market and exchange
        # are used as stand-ins.
        'sector': quote.get('market', '未知市场'),
        'industry': quote.get('exchange') or market,
        'currency': quote.get('currency', 'N/A'),
        'market': quote.get('market', 'N/A'),
    }
    summary['fundamental_signal'] = _fundamental_signal(summary)
    return summary
def _fundamental_signal(data: dict) -> str:
"""基本面信号判断 (简化版)"""
score = 0
reasons = []
pe = data.get("PE")
if pe is not None and pe > 0:
if pe < 15:
score += 2
reasons.append(f"PE低估值({pe})")
elif pe < 25:
score += 1
reasons.append(f"PE合理({pe})")
elif pe > 40:
score -= 1
reasons.append(f"PE偏高({pe})")
pb = data.get("PB")
if pb is not None:
if pb < 1:
score += 1
reasons.append(f"PB破净({pb})")
elif pb > 5:
score -= 1
reasons.append(f"PB偏高({pb})")
if score >= 3:
signal = "基本面优秀"
elif score >= 1:
signal = "基本面良好"
elif score >= 0:
signal = "基本面一般"
else:
signal = "基本面较差"
return f"{signal} ({'; '.join(reasons[:3])})" if reasons else signal
# ─────────────────────────────────────────────
# Composite scoring and recommendation
# ─────────────────────────────────────────────
# Per-market weighting used by generate_recommendation:
#   technical    - weight applied to the blended technical bucket
#   fundamental  - weight applied to the valuation layer
#   risk_penalty - multiplier applied to the risk layer before it is added
# Markets missing from this table fall back to the "HK" profile.
MARKET_PROFILES = {
"HK": {"technical": 0.62, "fundamental": 0.38, "risk_penalty": 1.0},
"SH": {"technical": 0.58, "fundamental": 0.42, "risk_penalty": 0.9},
"SZ": {"technical": 0.60, "fundamental": 0.40, "risk_penalty": 1.0},
"US": {"technical": 0.55, "fundamental": 0.45, "risk_penalty": 0.85},
}
def clamp(value: float, low: float, high: float) -> float:
    """Limit ``value`` to the range ``[low, high]``.

    The upper bound is applied first and the lower bound last, so ``low``
    wins if the bounds are given in the wrong order.
    """
    capped = min(high, value)
    return max(low, capped)
def detect_market_regime(hist: pd.DataFrame, technical: dict, quote: dict) -> dict:
    """Classify the current market regime of a stock.

    Args:
        hist: Price history with a ``Close`` column, oldest first.
        technical: Indicator dict; ``rsi`` (RSI12, else RSI6) and ``kdj``
            (J) are read, both defaulting to 50.
        quote: Quote dict; ``52w_high`` / ``52w_low`` are read when present.

    Returns:
        A dict with ``regime`` (one of the labels below) and ``position_52w``,
        the close's position inside the 52-week range rounded to four
        decimals, or ``None`` when the range is unavailable.
    """
    close = hist["Close"]
    current = close.iloc[-1]
    # With too little history the averages degrade gracefully:
    # MA20 -> latest close, MA60 -> MA20.
    ma20 = close.rolling(20).mean().iloc[-1] if len(close) >= 20 else close.iloc[-1]
    ma60 = close.rolling(60).mean().iloc[-1] if len(close) >= 60 else ma20

    rsi_block = technical.get("rsi", {})
    rsi12 = rsi_block.get("RSI12", rsi_block.get("RSI6", 50))

    high_52w = quote.get("52w_high")
    low_52w = quote.get("52w_low")
    if high_52w and low_52w and high_52w != low_52w:
        pos_52w = (current - low_52w) / (high_52w - low_52w)
    else:
        pos_52w = None

    def classify():
        if current > ma20 > ma60 and rsi12 >= 55:
            return "趋势延续"
        if rsi12 <= 35 and technical.get("kdj", {}).get("J", 50) < 20:
            return "超跌反弹"
        if pos_52w is not None and pos_52w > 0.85 and rsi12 >= 68:
            return "高位风险"
        if abs(current / ma20 - 1) < 0.03 and 40 <= rsi12 <= 60:
            return "区间震荡"
        return "估值修复/等待确认"

    return {
        "regime": classify(),
        "position_52w": None if pos_52w is None else round(pos_52w, 4),
    }
def compute_layer_scores(hist: pd.DataFrame, technical: dict, fundamental: dict, quote: dict) -> dict:
    """Score the stock on six independent layers.

    Layers: ``trend`` (20-bar return, share of moving averages below price,
    MACD state), ``momentum`` (5-bar return, volume confirmation, KDJ cross),
    ``risk`` (RSI, KDJ-J, 52-week position and Bollinger extremes; negative
    means elevated risk), ``valuation`` (PE/PB bands), ``relative_strength``
    and ``volume_structure``.

    A non-positive PE is now ignored. Previously a negative PE fell through
    to the ``pe < 25`` branch and was rewarded with +1, which also disagreed
    with ``_fundamental_signal``.

    Args:
        hist: Price history with a ``Close`` column, oldest first.
        technical: Indicator dict (``ma_trend``, ``macd``, ``rsi``, ``kdj``,
            ``volume``, ``bollinger``); missing entries use neutral defaults.
        fundamental: Dict with optional ``PE``, ``PB``, ``52w_high``, ``52w_low``.
        quote: Quote dict; ``price`` is preferred over the last close for the
            52-week position.

    Returns:
        A dict of the six layer scores rounded to two decimals; the first four
        are limited to [-5, 5], ``relative_strength`` to [-3, 3] and
        ``volume_structure`` to [-2.5, 2.5].
    """
    close = hist["Close"]
    current = close.iloc[-1]
    ret_5 = (current / close.iloc[-6] - 1) if len(close) > 5 else 0
    ret_20 = (current / close.iloc[-21] - 1) if len(close) > 20 else ret_5

    ma = technical.get("ma_trend", {})
    # "price_above_ma_count" is formatted as "<above>/<total>".
    above = ma.get("price_above_ma_count", "0/1").split("/")
    above_ratio = (int(above[0]) / max(int(above[1]), 1)) if len(above) == 2 else 0
    macd_sig = technical.get("macd", {}).get("signal", "")
    rsi = technical.get("rsi", {}).get("RSI12", technical.get("rsi", {}).get("RSI6", 50))
    kdj = technical.get("kdj", {})
    kdj_j = kdj.get("J", 50)
    kdj_sig = kdj.get("signal", "")
    volume_ratio = technical.get("volume", {}).get("volume_ratio", 1)
    boll_sig = technical.get("bollinger", {}).get("signal", "")

    pe = fundamental.get("PE")
    pb = fundamental.get("PB")
    high_52w = fundamental.get("52w_high")
    low_52w = fundamental.get("52w_low")
    pos_52w = 0.5  # neutral when the 52-week range is unknown
    if high_52w and low_52w and high_52w != low_52w:
        pos_52w = clamp((quote.get("price", current) - low_52w) / (high_52w - low_52w), 0, 1)

    # Trend layer
    trend = (ret_20 * 100 * 0.6) + (above_ratio - 0.5) * 8
    if "多头" in macd_sig or "金叉" in macd_sig:
        trend += 1.5
    elif "空头" in macd_sig or "死叉" in macd_sig:
        trend -= 1.5

    # Momentum layer: heavy volume amplifies the direction of the 5-bar move.
    momentum = ret_5 * 100 * 0.8
    if volume_ratio > 1.5:
        if ret_5 > 0:
            momentum += 1.2
        elif ret_5 < 0:
            momentum -= 1.2
    if "金叉" in kdj_sig:
        momentum += 0.8
    if "死叉" in kdj_sig:
        momentum -= 0.8

    # Risk layer: overbought conditions subtract, oversold conditions add.
    risk = 0.0
    if rsi > 75:
        risk -= 2.2
    elif rsi < 28:
        risk += 1.0
    if kdj_j > 100:
        risk -= 1.2
    elif kdj_j < 0:
        risk += 0.8
    if pos_52w > 0.88:
        risk -= 1.2
    elif pos_52w < 0.18:
        risk += 0.8
    if "突破上轨" in boll_sig:
        risk -= 0.8
    elif "突破下轨" in boll_sig:
        risk += 0.6

    # Valuation layer: only positive ratios carry information.
    valuation = 0.0
    if pe is not None and pe > 0:
        if pe < 15:
            valuation += 2.0
        elif pe < 25:
            valuation += 1.0
        elif pe > 40:
            valuation -= 1.5
    if pb is not None and pb > 0:
        if pb < 1:
            valuation += 1.0
        elif pb > 6:
            valuation -= 1.0

    relative_strength = clamp(ret_20 * 100 / 4, -3, 3)
    volume_structure = clamp((volume_ratio - 1.0) * 2, -2.5, 2.5)
    return {
        "trend": round(clamp(trend, -5, 5), 2),
        "momentum": round(clamp(momentum, -5, 5), 2),
        "risk": round(clamp(risk, -5, 5), 2),
        "valuation": round(clamp(valuation, -5, 5), 2),
        "relative_strength": round(relative_strength, 2),
        "volume_structure": round(volume_structure, 2),
    }
def evaluate_signal_quality(layer_scores: dict) -> dict:
    """Estimate how much the individual layer scores agree with each other.

    Confidence starts at 40, gains 8 points for every layer of net agreement
    (layers above 0.8 minus layers below -0.8, in absolute terms) and loses up
    to 18 points for dispersion between the best and worst layer. The result
    is limited to the range 18-92.

    The three level labels were empty strings, which left the "置信度" line
    of the recommendation without a level; they are now "高" (>= 72),
    "中" (>= 55) and "低". An empty ``layer_scores`` no longer raises.

    Args:
        layer_scores: Mapping of layer name to numeric score.

    Returns:
        A dict with ``score``, ``level``, ``positives`` and ``negatives``.
    """
    values = list(layer_scores.values())
    positives = sum(1 for v in values if v > 0.8)
    negatives = sum(1 for v in values if v < -0.8)
    dispersion = (max(values) - min(values)) if values else 0
    agreement = abs(positives - negatives)

    confidence = 40 + agreement * 8 - min(dispersion * 2.5, 18)
    confidence = int(max(18, min(92, confidence)))

    if confidence >= 72:
        level = "高"
    elif confidence >= 55:
        level = "中"
    else:
        level = "低"
    return {"score": confidence, "level": level, "positives": positives, "negatives": negatives}
def backtest_current_signal(hist: pd.DataFrame, period: str) -> dict:
    """Look up how similar past setups performed over the following days.

    A past bar counts as similar when its 20-bar return is within 0.06 and its
    5-bar return within 0.04 of today's values. Only bars with at least 20
    bars of history after them are considered.

    Args:
        hist: Price history with a ``Close`` column, oldest first.
        period: Period label, echoed back in the result.

    Returns:
        With fewer than 45 closes or fewer than 5 matches: ``samples`` and a
        ``message``. Otherwise ``samples``, the average forward return and win
        rate (both in percent) for 5, 10 and 20 bars, the worst forward return
        as ``max_drawdown_proxy_pct`` and ``period``.
    """
    prices = hist["Close"].to_numpy()
    count = len(prices)
    if count < 45:
        return {"samples": 0, "message": "历史样本不足"}

    def trailing_return(end, span):
        return prices[end] / prices[end - span] - 1

    # At least 45 closes exist here, so both look-backs are always available.
    today_ret20 = trailing_return(-1, 20)
    today_ret5 = trailing_return(-1, 5)

    matched = [
        i
        for i in range(25, count - 20)
        if abs(trailing_return(i, 20) - today_ret20) < 0.06
        and abs(trailing_return(i, 5) - today_ret5) < 0.04
    ]
    if len(matched) < 5:
        return {"samples": len(matched), "message": "相似信号样本不足"}

    perf = {"samples": len(matched)}
    all_forward = []
    for horizon in (5, 10, 20):
        outcomes = [
            prices[i + horizon] / prices[i] - 1
            for i in matched
            if i + horizon < count
        ]
        if not outcomes:
            continue
        winners = sum(1 for outcome in outcomes if outcome > 0)
        perf[f"forward_{horizon}d_avg_pct"] = round(sum(outcomes) / len(outcomes) * 100, 2)
        perf[f"forward_{horizon}d_win_rate"] = round(winners / len(outcomes) * 100, 2)
        all_forward.extend(outcomes)

    if all_forward:
        perf["max_drawdown_proxy_pct"] = round(min(all_forward) * 100, 2)
    perf["period"] = period
    return perf
def decide_action_type(regime: str, total_score: float, confidence: dict) -> tuple[str, str]:
    """Map the composite score to an action and a more specific action type.

    The "strong" actions need an absolute score of at least 4.5 together with
    a confidence score of at least 70; plain buy/sell need an absolute score
    of at least 2. Everything else is hold/watch.

    Args:
        regime: Regime label from ``detect_market_regime``.
        total_score: Composite score.
        confidence: Dict with a ``score`` entry; only consulted for scores
            beyond +/-4.5.

    Returns:
        ``(action, action_type)``.
    """
    if total_score >= 4.5 and confidence["score"] >= 70:
        flavour = "趋势型买入" if regime == "趋势延续" else "高置信度买入"
        return "强烈买入", flavour
    if total_score >= 2:
        flavour = "超跌博弈型买入" if regime == "超跌反弹" else "趋势跟随型买入"
        return "买入", flavour
    if total_score <= -4.5 and confidence["score"] >= 70:
        return "强烈卖出", "风险规避型卖出"
    if total_score <= -2:
        return "卖出", "止盈/止损型卖出"
    return "持有/观望", "等待确认"
def generate_recommendation(technical: dict, fundamental: dict, current_price: float, hist: pd.DataFrame, quote: dict) -> dict:
    """Combine the layer scores into a final action with supporting detail.

    The technical layers are blended (trend 35%, momentum 25%, relative
    strength 20%, volume structure 20%), weighted together with the valuation
    layer using the market's profile from ``MARKET_PROFILES`` (falling back to
    the HK profile), and the scaled risk layer is added on top. The total is
    limited to [-8, 8].

    Args:
        technical: Indicator dict.
        fundamental: Fundamentals dict.
        current_price: Accepted for interface compatibility; not used here.
        hist: Price history with a ``Close`` column.
        quote: Quote dict; ``market`` selects the weighting profile.

    Returns:
        A dict with the action (Chinese and English), action type, score,
        icon, market profile, regime, layer scores, confidence, key signals
        and a one-line summary.
    """
    labels = {
        "强烈买入": ("🟢🟢", "STRONG_BUY"),
        "买入": ("🟢", "BUY"),
        "持有/观望": ("🟡", "HOLD"),
        "卖出": ("🔴", "SELL"),
        "强烈卖出": ("🔴🔴", "STRONG_SELL"),
    }

    market = quote.get("market", "HK")
    profile = MARKET_PROFILES.get(market, MARKET_PROFILES["HK"])
    regime = detect_market_regime(hist, technical, quote)
    layer_scores = compute_layer_scores(hist, technical, fundamental, quote)
    confidence = evaluate_signal_quality(layer_scores)

    technical_bucket = (
        layer_scores["trend"] * 0.35
        + layer_scores["momentum"] * 0.25
        + layer_scores["relative_strength"] * 0.20
        + layer_scores["volume_structure"] * 0.20
    )
    fundamental_bucket = layer_scores["valuation"]
    risk_bucket = layer_scores["risk"] * profile["risk_penalty"]
    weighted = (
        technical_bucket * profile["technical"]
        + fundamental_bucket * profile["fundamental"]
        + risk_bucket
    )
    total_score = round(clamp(weighted, -8, 8), 2)

    action, action_type = decide_action_type(regime["regime"], total_score, confidence)
    icon, action_en = labels[action]

    key_signals = [
        f"市场场景: {regime['regime']}",
        f"趋势层: {layer_scores['trend']}",
        f"动量层: {layer_scores['momentum']}",
        f"风险层: {layer_scores['risk']}",
        f"估值层: {layer_scores['valuation']}",
        f"置信度: {confidence['level']}({confidence['score']})",
    ]
    return {
        "action": action,
        "action_en": action_en,
        "action_type": action_type,
        "score": total_score,
        "icon": icon,
        "market_profile": market,
        "regime": regime,
        "layer_scores": layer_scores,
        "confidence": confidence,
        "key_signals": key_signals,
        "summary": f"{icon} {action} / {action_type} (综合评分: {total_score})",
    }
# ─────────────────────────────────────────────
# Main workflow
# ─────────────────────────────────────────────
def analyze_stock(code: str, period: str = "6mo", use_cache: bool = True) -> dict:
    """Run the complete analysis pipeline for a single stock.

    Steps: initialise the database, serve a cached analysis when allowed,
    fetch the real-time quote, record the stock in the watchlist, refresh the
    stored K-line history, compute the technical indicators, fundamentals,
    recommendation and signal back-test, then cache the result.

    Args:
        code: Stock code in any form accepted by ``normalize_stock_code``.
        period: History period label (see ``period_to_days``).
        use_cache: When False the cached analysis is ignored.

    Returns:
        The analysis dict. Failures never raise: they are reported through the
        ``error`` entry, which is ``None`` on success.
    """
    init_db()
    stock = normalize_stock_code(code)
    full_code = stock['code']

    cached = get_cached_analysis(full_code, period) if use_cache else None
    if cached:
        print(f"📦 使用缓存数据 ({full_code}),缓存有效期 {ANALYSIS_CACHE_TTL}s", file=sys.stderr)
        return cached

    result = {
        "code": full_code,
        "market": stock['market'],
        "analysis_time": datetime.now().isoformat(),
        "error": None,
    }
    try:
        quote = fetch_tencent_quote(full_code)
        if not (quote and quote.get("price")):
            result["error"] = f"无法获取 {full_code} 的实时行情"
            return result

        upsert_watchlist_item(
            code=full_code,
            market=quote.get('market', stock['market']),
            tencent_symbol=quote.get('tencent_symbol', stock['tencent_symbol']),
            name=quote.get('name'),
            exchange=quote.get('exchange', stock.get('exchange')),
            currency=quote.get('currency'),
            last_price=quote.get('price'),
            pe=quote.get('pe'),
            pb=quote.get('pb'),
            market_cap=quote.get('market_cap'),
            week52_high=quote.get('52w_high'),
            week52_low=quote.get('52w_low'),
            quote_time=quote.get('timestamp'),
            meta=quote,
        )

        current_price = quote["price"]
        result.update({
            "current_price": current_price,
            "price_date": quote.get("timestamp", ""),
            "price_change": quote.get("change_amount"),
            "price_change_pct": quote.get("change_pct"),
        })

        days = period_to_days(period)
        hist = refresh_kline_cache(full_code, days, period)
        if hist.empty or len(hist) < min_kline_points(days):
            result["error"] = f"无法获取 {full_code} 的历史K线数据 (仅获得 {len(hist)} 条)"
            return result
        result["data_points"] = len(hist)

        close, high, low, volume = (hist[column] for column in ("Close", "High", "Low", "Volume"))
        technical = {
            "ma_trend": calc_ma_trend(close),
            "macd": calc_macd(close),
            "rsi": calc_rsi(close),
            "kdj": calc_kdj(high, low, close),
            "bollinger": calc_bollinger(close),
            "volume": calc_volume_analysis(volume, close),
        }
        result["technical"] = technical

        fundamental = get_fundamentals(quote)
        result["fundamental"] = fundamental
        result["recommendation"] = generate_recommendation(technical, fundamental, current_price, hist, quote)
        result["signal_validation"] = backtest_current_signal(hist, period)

        if result.get("error") is None:
            set_cached_analysis(full_code, period, result)
    except Exception as e:
        # Top-level boundary: any failure is reported inside the result
        # instead of propagating to the caller.
        result["error"] = f"分析过程出错: {str(e)}"
    return result
def main():
    """Command-line entry point: parse arguments, run the analysis, emit JSON.

    ``--clear-cache`` clears the analysis cache and exits without analysing.
    Otherwise the result is written to ``--output`` when given, or printed to
    standard output.
    """
    parser = argparse.ArgumentParser(description="多市场股票分析工具 (腾讯财经/Yahoo 数据源)")
    parser.add_argument("code", help="股票代码,如 0700.HK / 600519 / SH600519 / AAPL")
    parser.add_argument("--period", default="6mo", help="数据周期 (1mo/3mo/6mo/1y/2y/5y)")
    parser.add_argument("--output", help="输出JSON文件路径")
    parser.add_argument("--no-cache", action="store_true", help="跳过缓存,强制重新请求数据")
    parser.add_argument("--clear-cache", action="store_true", help="清除所有缓存后退出")
    args = parser.parse_args()

    if args.clear_cache:
        cleared = clear_analysis_cache()
        print(f"✅ 已清除 {cleared} 条分析缓存" if cleared else " 无缓存可清除")
        return

    result = analyze_stock(args.code, args.period, use_cache=not args.no_cache)
    # default=str stringifies anything json cannot encode natively.
    output = json.dumps(result, ensure_ascii=False, indent=2, default=str)

    if not args.output:
        print(output)
        return
    with open(args.output, "w", encoding="utf-8") as f:
        f.write(output)
    print(f"分析结果已保存至 {args.output}")


if __name__ == "__main__":
    main()