#!/usr/bin/env python3 """ 港股分析脚本 - 使用腾讯财经数据源进行技术面+基本面分析 用法: python3 analyze_stock.py <股票代码> [--period <周期>] [--output <输出文件>] 示例: python3 analyze_stock.py 0700.HK python3 analyze_stock.py 0700.HK --period 6mo --output report.json python3 analyze_stock.py 9988.HK --period 1y """ import sys import json import argparse import time import urllib.request import urllib.error from datetime import datetime, timedelta from pathlib import Path try: from db import ( ANALYSIS_CACHE_TTL_SECONDS, clear_analysis_cache, get_cached_analysis, get_kline_df, get_latest_kline_date, init_db, set_cached_analysis, upsert_kline_df, upsert_watchlist_item, ) except ImportError: sys.path.insert(0, str(Path(__file__).resolve().parent)) from db import ( ANALYSIS_CACHE_TTL_SECONDS, clear_analysis_cache, get_cached_analysis, get_kline_df, get_latest_kline_date, init_db, set_cached_analysis, upsert_kline_df, upsert_watchlist_item, ) try: import numpy as np except ImportError: print("ERROR: numpy 未安装。请运行: pip3 install numpy", file=sys.stderr) sys.exit(1) try: import pandas as pd except ImportError: print("ERROR: pandas 未安装。请运行: pip3 install pandas", file=sys.stderr) sys.exit(1) # ───────────────────────────────────────────── # 缓存与重试机制 # ───────────────────────────────────────────── MAX_RETRIES = 3 RETRY_BASE_DELAY = 2 ANALYSIS_CACHE_TTL = ANALYSIS_CACHE_TTL_SECONDS # ───────────────────────────────────────────── # 腾讯财经数据获取 # ───────────────────────────────────────────── def normalize_stock_code(code: str) -> dict: """标准化股票代码,支持港股/A股/美股。""" raw = code.strip().upper() if raw.endswith('.HK'): digits = raw[:-3].lstrip('0') or '0' return { 'market': 'HK', 'code': digits.zfill(4) + '.HK', 'tencent_symbol': 'hk' + digits.zfill(5), 'exchange': 'HKEX', } if raw.startswith(('SH', 'SZ')) and len(raw) == 8 and raw[2:].isdigit(): market = raw[:2] return { 'market': market, 'code': raw, 'tencent_symbol': raw.lower(), 'exchange': 'SSE' if market == 'SH' else 'SZSE', } if raw.endswith('.US'): symbol = raw[:-3] return { 'market': 'US', 'code': symbol, 'tencent_symbol': 'us' + symbol, 'exchange': 'US', } if raw.startswith('US.'): symbol = raw[3:] return { 'market': 'US', 'code': symbol, 'tencent_symbol': 'us' + symbol, 'exchange': 'US', } if raw.isdigit(): if len(raw) <= 5: digits = raw.lstrip('0') or '0' return { 'market': 'HK', 'code': digits.zfill(4) + '.HK', 'tencent_symbol': 'hk' + digits.zfill(5), 'exchange': 'HKEX', } if len(raw) == 6: market = 'SH' if raw.startswith(('5', '6', '9')) else 'SZ' return { 'market': market, 'code': market + raw, 'tencent_symbol': (market + raw).lower(), 'exchange': 'SSE' if market == 'SH' else 'SZSE', } symbol = raw.replace('.', '').replace('-', '') return { 'market': 'US', 'code': symbol, 'tencent_symbol': 'us' + symbol, 'exchange': 'US', } def fetch_tencent_quote(code: str) -> dict: """获取腾讯财经实时行情""" stock = normalize_stock_code(code) symbol = stock['tencent_symbol'] url = f"http://qt.gtimg.cn/q={symbol}" for attempt in range(MAX_RETRIES): try: req = urllib.request.Request(url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Referer': 'https://gu.qq.com/', }) with urllib.request.urlopen(req, timeout=10) as response: data = response.read().decode('gb2312', errors='ignore') return _parse_tencent_quote(data, symbol, stock) except urllib.error.URLError as e: if attempt < MAX_RETRIES - 1: time.sleep(RETRY_BASE_DELAY * (attempt + 1)) else: raise Exception(f"获取实时行情失败: {e}") return {} def _parse_tencent_quote(data: str, symbol: str, stock: dict) -> dict: """解析腾讯财经实时行情响应""" var_name = f"v_{symbol}" for line in data.strip().split(";"): line = line.strip() if not line or var_name not in line: continue # 提取引号内的内容 parts = line.split('"') if len(parts) < 2: continue values = parts[1].split("~") if len(values) < 35: # 至少需要35个字段 continue def safe_float(idx: int, default: float = 0.0) -> float: try: return float(values[idx]) if values[idx] else default except (ValueError, IndexError): return default def safe_str(idx: int, default: str = "") -> str: return values[idx] if idx < len(values) else default # 字段映射 (根据腾讯财经API实际数据) # 0:市场 1:名称 2:代码 3:现价 4:昨收 5:今开 6:成交量 # 30:时间戳 31:涨跌额 32:涨跌幅 33:最高 34:最低 # 39:市盈率 47:市净率 37:总市值 48:52周高 49:52周低 market = stock['market'] currency = 'HKD' if market == 'HK' else ('CNY' if market in ('SH', 'SZ') else safe_str(35, 'USD') or 'USD') pb_idx = 47 if market in ('HK', 'US') else 46 market_cap_idx = 37 if market == 'HK' else (57 if market in ('SH', 'SZ') else 44) high_52_idx = 48 if market in ('HK', 'US') else 41 low_52_idx = 49 if market in ('HK', 'US') else 42 return { 'name': values[1], 'code': stock['code'], 'market': market, 'exchange': stock.get('exchange'), 'tencent_symbol': symbol, 'price': safe_float(3), 'prev_close': safe_float(4), 'open': safe_float(5), 'volume': safe_float(6), 'high': safe_float(33), 'low': safe_float(34), 'change_amount': safe_float(31), 'change_pct': safe_float(32), 'timestamp': safe_str(30), 'currency': currency, 'pe': safe_float(39) if len(values) > 39 else None, 'pb': safe_float(pb_idx) if len(values) > pb_idx else None, 'market_cap': safe_str(market_cap_idx), '52w_high': safe_float(high_52_idx) if len(values) > high_52_idx else None, '52w_low': safe_float(low_52_idx) if len(values) > low_52_idx else None, 'raw_code': safe_str(2), } return {} def fetch_tencent_kline(code: str, days: int = 120) -> pd.DataFrame: """获取腾讯财经K线数据""" stock = normalize_stock_code(code) symbol = stock['tencent_symbol'] url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param={symbol},day,,,{days},qfq" for attempt in range(MAX_RETRIES): try: req = urllib.request.Request(url, headers={ 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36', 'Referer': 'https://gu.qq.com/', }) with urllib.request.urlopen(req, timeout=15) as response: data = json.loads(response.read().decode('utf-8')) return _parse_tencent_kline(data, symbol) except (urllib.error.URLError, json.JSONDecodeError) as e: if attempt < MAX_RETRIES - 1: time.sleep(RETRY_BASE_DELAY * (attempt + 1)) else: raise Exception(f"获取K线数据失败: {e}") return pd.DataFrame() def _parse_tencent_kline(data: dict, symbol: str) -> pd.DataFrame: """解析腾讯财经K线数据""" if data.get('code') != 0 or not data.get('data') or symbol not in data['data']: return pd.DataFrame() symbol_data = data['data'][symbol] day_data = symbol_data.get('day') or symbol_data.get('qfqday') or symbol_data.get('hfqday') or [] if not day_data: return pd.DataFrame() records = [] for item in day_data: if len(item) >= 6: records.append({ 'Date': item[0], 'Open': float(item[1]), 'Close': float(item[2]), 'Low': float(item[3]), 'High': float(item[4]), 'Volume': float(item[5]), }) df = pd.DataFrame(records) if not df.empty: df['Date'] = pd.to_datetime(df['Date']) df.set_index('Date', inplace=True) return df def fetch_us_kline_yahoo(symbol: str, period: str = '6mo') -> pd.DataFrame: range_map = { '1mo': '1mo', '3mo': '3mo', '6mo': '6mo', '1y': '1y', '2y': '2y', '5y': '5y', } url = f"https://query1.finance.yahoo.com/v8/finance/chart/{symbol}?range={range_map.get(period, '6mo')}&interval=1d&includePrePost=false" req = urllib.request.Request(url, headers={'User-Agent': 'Mozilla/5.0'}) with urllib.request.urlopen(req, timeout=20) as response: data = json.loads(response.read().decode('utf-8')) result = data.get('chart', {}).get('result', []) if not result: return pd.DataFrame() result = result[0] timestamps = result.get('timestamp') or [] quote = (result.get('indicators', {}).get('quote') or [{}])[0] opens = quote.get('open') or [] highs = quote.get('high') or [] lows = quote.get('low') or [] closes = quote.get('close') or [] volumes = quote.get('volume') or [] records = [] for i, ts in enumerate(timestamps): if i >= len(opens) or opens[i] is None or closes[i] is None or highs[i] is None or lows[i] is None: continue records.append({ 'Date': datetime.fromtimestamp(ts).strftime('%Y-%m-%d'), 'Open': float(opens[i]), 'Close': float(closes[i]), 'Low': float(lows[i]), 'High': float(highs[i]), 'Volume': float(volumes[i] or 0), }) df = pd.DataFrame(records) if not df.empty: df['Date'] = pd.to_datetime(df['Date']) df.set_index('Date', inplace=True) return df def period_to_days(period: str) -> int: """将周期字符串转换为天数""" mapping = { "1mo": 30, "3mo": 90, "6mo": 180, "1y": 250, "2y": 500, "5y": 1250, } return mapping.get(period, 180) def min_kline_points(required_days: int) -> int: return 20 if required_days <= 30 else 30 def refresh_kline_cache(code: str, required_days: int, period: str = '6mo') -> pd.DataFrame: """使用 SQLite 保存日线数据,并按需增量刷新。""" stock = normalize_stock_code(code) buffer_days = 30 latest_date = get_latest_kline_date(code) fetch_days = max(required_days + buffer_days, 60) if latest_date: latest_dt = datetime.strptime(latest_date, "%Y-%m-%d") missing_days = max((datetime.now() - latest_dt).days, 0) if missing_days <= 2: fetch_days = min(fetch_days, 60) else: fetch_days = max(missing_days + buffer_days, 60) fetched = fetch_tencent_kline(code, fetch_days) if stock['market'] == 'US' and len(fetched) <= 2: fetched = fetch_us_kline_yahoo(stock['code'], period) if not fetched.empty: upsert_kline_df(code, fetched, source='yahoo' if stock['market'] == 'US' and len(fetched) > 2 else 'tencent') hist = get_kline_df(code, required_days + buffer_days) if len(hist) < min_kline_points(required_days): fallback = fetch_tencent_kline(code, required_days + buffer_days) if stock['market'] == 'US' and len(fallback) <= 2: fallback = fetch_us_kline_yahoo(stock['code'], period) if not fallback.empty: upsert_kline_df(code, fallback, source='yahoo' if stock['market'] == 'US' and len(fallback) > 2 else 'tencent') hist = get_kline_df(code, required_days + buffer_days) return hist # ───────────────────────────────────────────── # 技术指标计算 (保持不变) # ───────────────────────────────────────────── def calc_ma(close: pd.Series, windows: list[int] = None) -> dict: """计算多周期移动平均线""" if windows is None: windows = [5, 10, 20, 60, 120, 250] result = {} for w in windows: if len(close) >= w: ma = close.rolling(window=w).mean() result[f"MA{w}"] = round(ma.iloc[-1], 3) return result def calc_ema(close: pd.Series, span: int) -> pd.Series: """计算指数移动平均线""" return close.ewm(span=span, adjust=False).mean() def calc_macd(close: pd.Series, fast: int = 12, slow: int = 26, signal: int = 9) -> dict: """计算MACD指标""" ema_fast = calc_ema(close, fast) ema_slow = calc_ema(close, slow) dif = ema_fast - ema_slow dea = dif.ewm(span=signal, adjust=False).mean() macd_hist = 2 * (dif - dea) return { "DIF": round(dif.iloc[-1], 4), "DEA": round(dea.iloc[-1], 4), "MACD": round(macd_hist.iloc[-1], 4), "signal": _macd_signal(dif, dea, macd_hist), } def _macd_signal(dif: pd.Series, dea: pd.Series, macd_hist: pd.Series) -> str: """MACD信号判断""" if len(dif) < 3: return "中性" if dif.iloc[-1] > dea.iloc[-1] and dif.iloc[-2] <= dea.iloc[-2]: return "金叉-买入信号" if dif.iloc[-1] < dea.iloc[-1] and dif.iloc[-2] >= dea.iloc[-2]: return "死叉-卖出信号" if dif.iloc[-1] > 0 and dea.iloc[-1] > 0: if macd_hist.iloc[-1] > macd_hist.iloc[-2]: return "多头增强" return "多头区域" if dif.iloc[-1] < 0 and dea.iloc[-1] < 0: if macd_hist.iloc[-1] < macd_hist.iloc[-2]: return "空头增强" return "空头区域" return "中性" def calc_rsi(close: pd.Series, periods: list[int] = None) -> dict: """计算RSI指标""" if periods is None: periods = [6, 12, 24] result = {} delta = close.diff() for p in periods: if len(close) < p + 1: continue gain = delta.clip(lower=0).rolling(window=p).mean() loss = (-delta.clip(upper=0)).rolling(window=p).mean() rs = gain / loss.replace(0, np.nan) rsi = 100 - (100 / (1 + rs)) val = round(rsi.iloc[-1], 2) result[f"RSI{p}"] = val rsi_main = result.get("RSI12", result.get("RSI6", 50)) if rsi_main > 80: result["signal"] = "严重超买-卖出信号" elif rsi_main > 70: result["signal"] = "超买-注意风险" elif rsi_main < 20: result["signal"] = "严重超卖-买入信号" elif rsi_main < 30: result["signal"] = "超卖-关注买入" else: result["signal"] = "中性" return result def calc_kdj(high: pd.Series, low: pd.Series, close: pd.Series, n: int = 9) -> dict: """计算KDJ指标""" if len(close) < n: return {"K": 50, "D": 50, "J": 50, "signal": "数据不足"} lowest_low = low.rolling(window=n).min() highest_high = high.rolling(window=n).max() rsv = (close - lowest_low) / (highest_high - lowest_low).replace(0, np.nan) * 100 k = pd.Series(index=close.index, dtype=float) d = pd.Series(index=close.index, dtype=float) k.iloc[n - 1] = 50 d.iloc[n - 1] = 50 for i in range(n, len(close)): k.iloc[i] = 2 / 3 * k.iloc[i - 1] + 1 / 3 * rsv.iloc[i] d.iloc[i] = 2 / 3 * d.iloc[i - 1] + 1 / 3 * k.iloc[i] j = 3 * k - 2 * d k_val = round(k.iloc[-1], 2) d_val = round(d.iloc[-1], 2) j_val = round(j.iloc[-1], 2) signal = "中性" if k_val > d_val and k.iloc[-2] <= d.iloc[-2]: signal = "金叉-买入信号" elif k_val < d_val and k.iloc[-2] >= d.iloc[-2]: signal = "死叉-卖出信号" elif j_val > 100: signal = "超买区域" elif j_val < 0: signal = "超卖区域" return {"K": k_val, "D": d_val, "J": j_val, "signal": signal} def calc_bollinger(close: pd.Series, window: int = 20, num_std: float = 2) -> dict: """计算布林带""" if len(close) < window: return {"signal": "数据不足"} ma = close.rolling(window=window).mean() std = close.rolling(window=window).std() upper = ma + num_std * std lower = ma - num_std * std current = close.iloc[-1] upper_val = round(upper.iloc[-1], 3) lower_val = round(lower.iloc[-1], 3) mid_val = round(ma.iloc[-1], 3) bandwidth = round((upper_val - lower_val) / mid_val * 100, 2) signal = "中性" if current > upper_val: signal = "突破上轨-超买" elif current < lower_val: signal = "突破下轨-超卖" elif current > mid_val: signal = "中轨上方-偏强" else: signal = "中轨下方-偏弱" return { "upper": upper_val, "middle": mid_val, "lower": lower_val, "bandwidth_pct": bandwidth, "signal": signal, } def calc_volume_analysis(volume: pd.Series, close: pd.Series) -> dict: """成交量分析""" if len(volume) < 20: return {"signal": "数据不足"} avg_5 = volume.rolling(5).mean().iloc[-1] avg_20 = volume.rolling(20).mean().iloc[-1] current = volume.iloc[-1] vol_ratio = round(current / avg_5, 2) if avg_5 > 0 else 0 price_change = close.iloc[-1] - close.iloc[-2] signal = "中性" if vol_ratio > 2 and price_change > 0: signal = "放量上涨-强势" elif vol_ratio > 2 and price_change < 0: signal = "放量下跌-弱势" elif vol_ratio < 0.5 and price_change > 0: signal = "缩量上涨-动力不足" elif vol_ratio < 0.5 and price_change < 0: signal = "缩量下跌-抛压减轻" return { "current_volume": int(current), "avg_5d_volume": int(avg_5), "avg_20d_volume": int(avg_20), "volume_ratio": vol_ratio, "signal": signal, } def calc_ma_trend(close: pd.Series) -> dict: """均线趋势分析""" mas = calc_ma(close, [5, 10, 20, 60]) current = close.iloc[-1] above_count = sum(1 for v in mas.values() if current > v) total = len(mas) if above_count == total and total > 0: signal = "多头排列-强势" elif above_count == 0: signal = "空头排列-弱势" elif above_count >= total * 0.7: signal = "偏多" elif above_count <= total * 0.3: signal = "偏空" else: signal = "震荡" return {**mas, "trend_signal": signal, "price_above_ma_count": f"{above_count}/{total}"} # ───────────────────────────────────────────── # 基本面分析 (基于腾讯数据) # ───────────────────────────────────────────── def get_fundamentals(quote: dict) -> dict: """基于实时行情数据的基本面分析""" result = {} pe = quote.get('pe') pb = quote.get('pb') result['PE'] = round(pe, 2) if pe else None result['PB'] = round(pb, 2) if pb else None result['PS'] = None result['market_cap'] = quote.get('market_cap', '') result['52w_high'] = quote.get('52w_high') result['52w_low'] = quote.get('52w_low') result['company_name'] = quote.get('name', '未知') result['sector'] = quote.get('market', '未知市场') result['industry'] = quote.get('exchange') or quote.get('market', '未知') result['currency'] = quote.get('currency', 'N/A') result['market'] = quote.get('market', 'N/A') result['fundamental_signal'] = _fundamental_signal(result) return result def _fundamental_signal(data: dict) -> str: """基本面信号判断 (简化版)""" score = 0 reasons = [] pe = data.get("PE") if pe is not None and pe > 0: if pe < 15: score += 2 reasons.append(f"PE低估值({pe})") elif pe < 25: score += 1 reasons.append(f"PE合理({pe})") elif pe > 40: score -= 1 reasons.append(f"PE偏高({pe})") pb = data.get("PB") if pb is not None: if pb < 1: score += 1 reasons.append(f"PB破净({pb})") elif pb > 5: score -= 1 reasons.append(f"PB偏高({pb})") if score >= 3: signal = "基本面优秀" elif score >= 1: signal = "基本面良好" elif score >= 0: signal = "基本面一般" else: signal = "基本面较差" return f"{signal} ({'; '.join(reasons[:3])})" if reasons else signal # ───────────────────────────────────────────── # 综合评分与建议 # ───────────────────────────────────────────── MARKET_PROFILES = { "HK": {"technical": 0.62, "fundamental": 0.38, "risk_penalty": 1.0}, "SH": {"technical": 0.58, "fundamental": 0.42, "risk_penalty": 0.9}, "SZ": {"technical": 0.60, "fundamental": 0.40, "risk_penalty": 1.0}, "US": {"technical": 0.55, "fundamental": 0.45, "risk_penalty": 0.85}, } def clamp(value: float, low: float, high: float) -> float: return max(low, min(high, value)) def detect_market_regime(hist: pd.DataFrame, technical: dict, quote: dict) -> dict: close = hist["Close"] ma20 = close.rolling(20).mean().iloc[-1] if len(close) >= 20 else close.iloc[-1] ma60 = close.rolling(60).mean().iloc[-1] if len(close) >= 60 else ma20 current = close.iloc[-1] rsi12 = technical.get("rsi", {}).get("RSI12", technical.get("rsi", {}).get("RSI6", 50)) high_52w = quote.get("52w_high") low_52w = quote.get("52w_low") pos_52w = None if high_52w and low_52w and high_52w != low_52w: pos_52w = (current - low_52w) / (high_52w - low_52w) if current > ma20 > ma60 and rsi12 >= 55: regime = "趋势延续" elif rsi12 <= 35 and technical.get("kdj", {}).get("J", 50) < 20: regime = "超跌反弹" elif pos_52w is not None and pos_52w > 0.85 and rsi12 >= 68: regime = "高位风险" elif abs(current / ma20 - 1) < 0.03 and 40 <= rsi12 <= 60: regime = "区间震荡" else: regime = "估值修复/等待确认" return {"regime": regime, "position_52w": round(pos_52w, 4) if pos_52w is not None else None} def compute_layer_scores(hist: pd.DataFrame, technical: dict, fundamental: dict, quote: dict) -> dict: close = hist["Close"] current = close.iloc[-1] ret_5 = (current / close.iloc[-6] - 1) if len(close) > 5 else 0 ret_20 = (current / close.iloc[-21] - 1) if len(close) > 20 else ret_5 ma = technical.get("ma_trend", {}) above = ma.get("price_above_ma_count", "0/1").split("/") above_ratio = (int(above[0]) / max(int(above[1]), 1)) if len(above) == 2 else 0 macd_sig = technical.get("macd", {}).get("signal", "") rsi = technical.get("rsi", {}).get("RSI12", technical.get("rsi", {}).get("RSI6", 50)) kdj_j = technical.get("kdj", {}).get("J", 50) volume_ratio = technical.get("volume", {}).get("volume_ratio", 1) boll_sig = technical.get("bollinger", {}).get("signal", "") pe = fundamental.get("PE") pb = fundamental.get("PB") high_52w = fundamental.get("52w_high") low_52w = fundamental.get("52w_low") pos_52w = 0.5 if high_52w and low_52w and high_52w != low_52w: pos_52w = clamp((quote.get("price", current) - low_52w) / (high_52w - low_52w), 0, 1) trend = (ret_20 * 100 * 0.6) + (above_ratio - 0.5) * 8 if "多头" in macd_sig or "金叉" in macd_sig: trend += 1.5 elif "空头" in macd_sig or "死叉" in macd_sig: trend -= 1.5 momentum = ret_5 * 100 * 0.8 momentum += 1.2 if volume_ratio > 1.5 and ret_5 > 0 else 0 momentum -= 1.2 if volume_ratio > 1.5 and ret_5 < 0 else 0 momentum += 0.8 if "金叉" in technical.get("kdj", {}).get("signal", "") else 0 momentum -= 0.8 if "死叉" in technical.get("kdj", {}).get("signal", "") else 0 risk = 0.0 if rsi > 75: risk -= 2.2 elif rsi < 28: risk += 1.0 if kdj_j > 100: risk -= 1.2 elif kdj_j < 0: risk += 0.8 if pos_52w > 0.88: risk -= 1.2 elif pos_52w < 0.18: risk += 0.8 if "突破上轨" in boll_sig: risk -= 0.8 elif "突破下轨" in boll_sig: risk += 0.6 valuation = 0.0 if pe is not None: if 0 < pe < 15: valuation += 2.0 elif pe < 25: valuation += 1.0 elif pe > 40: valuation -= 1.5 if pb is not None: if 0 < pb < 1: valuation += 1.0 elif pb > 6: valuation -= 1.0 relative_strength = clamp(ret_20 * 100 / 4, -3, 3) volume_structure = clamp((volume_ratio - 1.0) * 2, -2.5, 2.5) return { "trend": round(clamp(trend, -5, 5), 2), "momentum": round(clamp(momentum, -5, 5), 2), "risk": round(clamp(risk, -5, 5), 2), "valuation": round(clamp(valuation, -5, 5), 2), "relative_strength": round(relative_strength, 2), "volume_structure": round(volume_structure, 2), } def evaluate_signal_quality(layer_scores: dict) -> dict: positives = sum(1 for v in layer_scores.values() if v > 0.8) negatives = sum(1 for v in layer_scores.values() if v < -0.8) dispersion = max(layer_scores.values()) - min(layer_scores.values()) agreement = abs(positives - negatives) confidence = 40 + agreement * 8 - min(dispersion * 2.5, 18) confidence = int(clamp(confidence, 18, 92)) if confidence >= 72: level = "高" elif confidence >= 55: level = "中" else: level = "低" return {"score": confidence, "level": level, "positives": positives, "negatives": negatives} def backtest_current_signal(hist: pd.DataFrame, period: str) -> dict: horizons = [5, 10, 20] closes = hist["Close"].reset_index(drop=True) if len(closes) < 45: return {"samples": 0, "message": "历史样本不足"} current_ret20 = (closes.iloc[-1] / closes.iloc[-21] - 1) if len(closes) > 20 else 0 current_ret5 = (closes.iloc[-1] / closes.iloc[-6] - 1) if len(closes) > 5 else 0 matched = [] for i in range(25, len(closes) - 20): r20 = closes.iloc[i] / closes.iloc[i-20] - 1 r5 = closes.iloc[i] / closes.iloc[i-5] - 1 if abs(r20 - current_ret20) < 0.06 and abs(r5 - current_ret5) < 0.04: matched.append(i) if len(matched) < 5: return {"samples": len(matched), "message": "相似信号样本不足"} perf = {"samples": len(matched)} all_forward = [] for h in horizons: vals = [] for i in matched: if i + h < len(closes): vals.append(closes.iloc[i + h] / closes.iloc[i] - 1) if vals: perf[f"forward_{h}d_avg_pct"] = round(sum(vals) / len(vals) * 100, 2) perf[f"forward_{h}d_win_rate"] = round(sum(1 for x in vals if x > 0) / len(vals) * 100, 2) all_forward.extend(vals) if all_forward: perf["max_drawdown_proxy_pct"] = round(min(all_forward) * 100, 2) perf["period"] = period return perf def decide_action_type(regime: str, total_score: float, confidence: dict) -> tuple[str, str]: if total_score >= 4.5 and confidence["score"] >= 70: return "强烈买入", "趋势型买入" if regime == "趋势延续" else "高置信度买入" if total_score >= 2: if regime == "超跌反弹": return "买入", "超跌博弈型买入" return "买入", "趋势跟随型买入" if total_score <= -4.5 and confidence["score"] >= 70: return "强烈卖出", "风险规避型卖出" if total_score <= -2: return "卖出", "止盈/止损型卖出" return "持有/观望", "等待确认" def generate_recommendation(technical: dict, fundamental: dict, current_price: float, hist: pd.DataFrame, quote: dict) -> dict: market = quote.get("market", "HK") profile = MARKET_PROFILES.get(market, MARKET_PROFILES["HK"]) regime = detect_market_regime(hist, technical, quote) layer_scores = compute_layer_scores(hist, technical, fundamental, quote) confidence = evaluate_signal_quality(layer_scores) technical_bucket = ( layer_scores["trend"] * 0.35 + layer_scores["momentum"] * 0.25 + layer_scores["relative_strength"] * 0.20 + layer_scores["volume_structure"] * 0.20 ) fundamental_bucket = layer_scores["valuation"] risk_bucket = layer_scores["risk"] * profile["risk_penalty"] total_score = technical_bucket * profile["technical"] + fundamental_bucket * profile["fundamental"] + risk_bucket total_score = round(clamp(total_score, -8, 8), 2) action, action_type = decide_action_type(regime["regime"], total_score, confidence) icon_map = {"强烈买入": "🟢🟢", "买入": "🟢", "持有/观望": "🟡", "卖出": "🔴", "强烈卖出": "🔴🔴"} en_map = {"强烈买入": "STRONG_BUY", "买入": "BUY", "持有/观望": "HOLD", "卖出": "SELL", "强烈卖出": "STRONG_SELL"} icon = icon_map[action] key_signals = [ f"市场场景: {regime['regime']}", f"趋势层: {layer_scores['trend']}", f"动量层: {layer_scores['momentum']}", f"风险层: {layer_scores['risk']}", f"估值层: {layer_scores['valuation']}", f"置信度: {confidence['level']}({confidence['score']})", ] return { "action": action, "action_en": en_map[action], "action_type": action_type, "score": total_score, "icon": icon, "market_profile": market, "regime": regime, "layer_scores": layer_scores, "confidence": confidence, "key_signals": key_signals, "summary": f"{icon} {action} / {action_type} (综合评分: {total_score})", } # ───────────────────────────────────────────── # 主流程 # ───────────────────────────────────────────── def analyze_stock(code: str, period: str = "6mo", use_cache: bool = True) -> dict: """对单只股票进行完整分析""" init_db() stock = normalize_stock_code(code) full_code = stock['code'] if use_cache: cached = get_cached_analysis(full_code, period) if cached: print(f"📦 使用缓存数据 ({full_code}),缓存有效期 {ANALYSIS_CACHE_TTL}s", file=sys.stderr) return cached result = {"code": full_code, "market": stock['market'], "analysis_time": datetime.now().isoformat(), "error": None} try: quote = fetch_tencent_quote(full_code) if not quote or not quote.get("price"): result["error"] = f"无法获取 {full_code} 的实时行情" return result upsert_watchlist_item( code=full_code, market=quote.get('market', stock['market']), tencent_symbol=quote.get('tencent_symbol', stock['tencent_symbol']), name=quote.get('name'), exchange=quote.get('exchange', stock.get('exchange')), currency=quote.get('currency'), last_price=quote.get('price'), pe=quote.get('pe'), pb=quote.get('pb'), market_cap=quote.get('market_cap'), week52_high=quote.get('52w_high'), week52_low=quote.get('52w_low'), quote_time=quote.get('timestamp'), meta=quote, ) current_price = quote["price"] result["current_price"] = current_price result["price_date"] = quote.get("timestamp", "") result["price_change"] = quote.get("change_amount") result["price_change_pct"] = quote.get("change_pct") days = period_to_days(period) hist = refresh_kline_cache(full_code, days, period) if hist.empty or len(hist) < min_kline_points(days): result["error"] = f"无法获取 {full_code} 的历史K线数据 (仅获得 {len(hist)} 条)" return result result["data_points"] = len(hist) close = hist["Close"] high = hist["High"] low = hist["Low"] volume = hist["Volume"] technical = { "ma_trend": calc_ma_trend(close), "macd": calc_macd(close), "rsi": calc_rsi(close), "kdj": calc_kdj(high, low, close), "bollinger": calc_bollinger(close), "volume": calc_volume_analysis(volume, close), } result["technical"] = technical fundamental = get_fundamentals(quote) result["fundamental"] = fundamental result["recommendation"] = generate_recommendation(technical, fundamental, current_price, hist, quote) result["signal_validation"] = backtest_current_signal(hist, period) if result.get("error") is None: set_cached_analysis(full_code, period, result) except Exception as e: result["error"] = f"分析过程出错: {str(e)}" return result def main(): parser = argparse.ArgumentParser(description="多市场股票分析工具 (腾讯财经/Yahoo 数据源)") parser.add_argument("code", help="股票代码,如 0700.HK / 600519 / SH600519 / AAPL") parser.add_argument("--period", default="6mo", help="数据周期 (1mo/3mo/6mo/1y/2y/5y)") parser.add_argument("--output", help="输出JSON文件路径") parser.add_argument("--no-cache", action="store_true", help="跳过缓存,强制重新请求数据") parser.add_argument("--clear-cache", action="store_true", help="清除所有缓存后退出") args = parser.parse_args() if args.clear_cache: cleared = clear_analysis_cache() if cleared: print(f"✅ 已清除 {cleared} 条分析缓存") else: print("ℹ️ 无缓存可清除") return result = analyze_stock(args.code, args.period, use_cache=not args.no_cache) output = json.dumps(result, ensure_ascii=False, indent=2, default=str) if args.output: with open(args.output, "w", encoding="utf-8") as f: f.write(output) print(f"分析结果已保存至 {args.output}") else: print(output) if __name__ == "__main__": main()