diff --git a/scripts/analyze_stock.py b/scripts/analyze_stock.py index efc6efa..bb551fd 100755 --- a/scripts/analyze_stock.py +++ b/scripts/analyze_stock.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 """ -港股分析脚本 - 获取港股数据并进行技术面+基本面分析,给出操作建议。 +港股分析脚本 - 使用腾讯财经数据源进行技术面+基本面分析 用法: python3 analyze_stock.py <股票代码> [--period <周期>] [--output <输出文件>] @@ -9,9 +9,6 @@ python3 analyze_stock.py 0700.HK python3 analyze_stock.py 0700.HK --period 6mo --output report.json python3 analyze_stock.py 9988.HK --period 1y - -股票代码格式: 数字.HK (如 0700.HK 腾讯控股, 9988.HK 阿里巴巴) -周期选项: 1mo, 3mo, 6mo, 1y, 2y, 5y (默认 6mo) """ import sys @@ -19,15 +16,11 @@ import json import argparse import time import hashlib +import urllib.request +import urllib.error from datetime import datetime, timedelta from pathlib import Path -try: - import yfinance as yf -except ImportError: - print("ERROR: yfinance 未安装。请运行: pip3 install yfinance", file=sys.stderr) - sys.exit(1) - try: import numpy as np except ImportError: @@ -42,13 +35,13 @@ except ImportError: # ───────────────────────────────────────────── -# 缓存与重试机制(解决 Yahoo Finance 限频问题) +# 缓存与重试机制 # ───────────────────────────────────────────── CACHE_DIR = Path.home() / ".stock_buddy_cache" -CACHE_TTL_SECONDS = 600 # 缓存有效期 10 分钟(同一股票短时间内不重复请求) -MAX_RETRIES = 4 # 最大重试次数 -RETRY_BASE_DELAY = 5 # 重试基础延迟(秒),指数退避: 5s, 10s, 20s, 40s +CACHE_TTL_SECONDS = 600 # 缓存有效期 10 分钟 +MAX_RETRIES = 3 +RETRY_BASE_DELAY = 2 def _cache_key(code: str, period: str) -> str: @@ -58,7 +51,7 @@ def _cache_key(code: str, period: str) -> str: def _read_cache(code: str, period: str) -> dict | None: - """读取缓存,若未过期则返回缓存数据""" + """读取缓存""" cache_file = CACHE_DIR / _cache_key(code, period) if not cache_file.exists(): return None @@ -82,37 +75,159 @@ def _write_cache(code: str, period: str, data: dict): with open(cache_file, "w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2, default=str) except OSError: - pass # 缓存写入失败不影响主流程 - - -def _retry_request(func, *args, max_retries=MAX_RETRIES, **kwargs): - """ - 带指数退避的重试包装器。 - 捕获 Yahoo Finance 限频错误并自动重试。 - """ - last_error = None - for attempt in range(max_retries): - try: - return func(*args, **kwargs) - except Exception as e: - last_error = e - error_msg = str(e).lower() - # 仅对限频/网络类错误重试 - is_rate_limit = any(kw in error_msg for kw in [ - "rate limit", "too many requests", "429", "throttl", - "connection", "timeout", "timed out", - ]) - if not is_rate_limit: - raise # 非限频错误直接抛出 - if attempt < max_retries - 1: - delay = RETRY_BASE_DELAY * (2 ** attempt) # 3s, 6s, 12s - print(f"⏳ 请求被限频,{delay}秒后第{attempt+2}次重试...", file=sys.stderr) - time.sleep(delay) - raise last_error + pass # ───────────────────────────────────────────── -# 技术指标计算 +# 腾讯财经数据获取 +# ───────────────────────────────────────────── + +def normalize_hk_code(code: str) -> tuple[str, str]: + """标准化港股代码,返回 (原始数字代码, 带.HK后缀代码)""" + code = code.strip().upper().replace(".HK", "") + digits = code.lstrip("0") + if digits.isdigit(): + numeric_code = code.zfill(4) + return numeric_code, numeric_code + ".HK" + return code, code + ".HK" + + +def fetch_tencent_quote(code: str) -> dict: + """获取腾讯财经实时行情""" + numeric_code, full_code = normalize_hk_code(code) + url = f"http://qt.gtimg.cn/q=hk{numeric_code}" + + for attempt in range(MAX_RETRIES): + try: + req = urllib.request.Request(url, headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + }) + with urllib.request.urlopen(req, timeout=10) as response: + data = response.read().decode("gb2312", errors="ignore") + return _parse_tencent_quote(data, numeric_code) + except urllib.error.URLError as e: + if attempt < MAX_RETRIES - 1: + time.sleep(RETRY_BASE_DELAY * (attempt + 1)) + else: + raise Exception(f"获取实时行情失败: {e}") + return {} + + +def _parse_tencent_quote(data: str, code: str) -> dict: + """解析腾讯财经实时行情响应""" + var_name = f"v_hk{code}" + for line in data.strip().split(";"): + line = line.strip() + if not line or var_name not in line: + continue + # 提取引号内的内容 + parts = line.split('"') + if len(parts) < 2: + continue + values = parts[1].split("~") + if len(values) < 35: # 至少需要35个字段 + continue + + def safe_float(idx: int, default: float = 0.0) -> float: + try: + return float(values[idx]) if values[idx] else default + except (ValueError, IndexError): + return default + + def safe_str(idx: int, default: str = "") -> str: + return values[idx] if idx < len(values) else default + + # 字段映射 (根据腾讯财经API实际数据) + # 0:市场 1:名称 2:代码 3:现价 4:昨收 5:今开 6:成交量 + # 30:时间戳 31:涨跌额 32:涨跌幅 33:最高 34:最低 + # 39:市盈率 47:市净率 37:总市值 48:52周高 49:52周低 + return { + "name": values[1], + "code": values[2], + "price": safe_float(3), + "prev_close": safe_float(4), + "open": safe_float(5), + "volume": safe_float(6), + "high": safe_float(33), + "low": safe_float(34), + "change_amount": safe_float(31), + "change_pct": safe_float(32), + "timestamp": safe_str(30), + "pe": safe_float(39) if len(values) > 39 else None, + "pb": safe_float(47) if len(values) > 47 else None, + "market_cap": safe_str(37), + "52w_high": safe_float(48) if len(values) > 48 else None, + "52w_low": safe_float(49) if len(values) > 49 else None, + } + return {} + + +def fetch_tencent_kline(code: str, days: int = 120) -> pd.DataFrame: + """获取腾讯财经K线数据""" + numeric_code, full_code = normalize_hk_code(code) + url = f"https://web.ifzq.gtimg.cn/appstock/app/fqkline/get?param=hk{numeric_code},day,,,{days},qfq" + + for attempt in range(MAX_RETRIES): + try: + req = urllib.request.Request(url, headers={ + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" + }) + with urllib.request.urlopen(req, timeout=15) as response: + data = json.loads(response.read().decode("utf-8")) + return _parse_tencent_kline(data, numeric_code) + except (urllib.error.URLError, json.JSONDecodeError) as e: + if attempt < MAX_RETRIES - 1: + time.sleep(RETRY_BASE_DELAY * (attempt + 1)) + else: + raise Exception(f"获取K线数据失败: {e}") + return pd.DataFrame() + + +def _parse_tencent_kline(data: dict, code: str) -> pd.DataFrame: + """解析腾讯财经K线数据""" + key = f"hk{code}" + if data.get("code") != 0 or not data.get("data") or key not in data["data"]: + return pd.DataFrame() + + day_data = data["data"][key].get("day", []) + if not day_data: + return pd.DataFrame() + + # 格式: [日期, 开盘价, 收盘价, 最低价, 最高价, 成交量] + records = [] + for item in day_data: + if len(item) >= 6: + records.append({ + "Date": item[0], + "Open": float(item[1]), + "Close": float(item[2]), + "Low": float(item[3]), + "High": float(item[4]), + "Volume": float(item[5]), + }) + + df = pd.DataFrame(records) + if not df.empty: + df["Date"] = pd.to_datetime(df["Date"]) + df.set_index("Date", inplace=True) + return df + + +def period_to_days(period: str) -> int: + """将周期字符串转换为天数""" + mapping = { + "1mo": 30, + "3mo": 90, + "6mo": 180, + "1y": 250, + "2y": 500, + "5y": 1250, + } + return mapping.get(period, 180) + + +# ───────────────────────────────────────────── +# 技术指标计算 (保持不变) # ───────────────────────────────────────────── def calc_ma(close: pd.Series, windows: list[int] = None) -> dict: @@ -152,18 +267,14 @@ def _macd_signal(dif: pd.Series, dea: pd.Series, macd_hist: pd.Series) -> str: """MACD信号判断""" if len(dif) < 3: return "中性" - # 金叉:DIF上穿DEA if dif.iloc[-1] > dea.iloc[-1] and dif.iloc[-2] <= dea.iloc[-2]: return "金叉-买入信号" - # 死叉:DIF下穿DEA if dif.iloc[-1] < dea.iloc[-1] and dif.iloc[-2] >= dea.iloc[-2]: return "死叉-卖出信号" - # 零轴上方 if dif.iloc[-1] > 0 and dea.iloc[-1] > 0: if macd_hist.iloc[-1] > macd_hist.iloc[-2]: return "多头增强" return "多头区域" - # 零轴下方 if dif.iloc[-1] < 0 and dea.iloc[-1] < 0: if macd_hist.iloc[-1] < macd_hist.iloc[-2]: return "空头增强" @@ -186,7 +297,6 @@ def calc_rsi(close: pd.Series, periods: list[int] = None) -> dict: rsi = 100 - (100 / (1 + rs)) val = round(rsi.iloc[-1], 2) result[f"RSI{p}"] = val - # 综合信号 rsi_main = result.get("RSI12", result.get("RSI6", 50)) if rsi_main > 80: result["signal"] = "严重超买-卖出信号" @@ -321,135 +431,64 @@ def calc_ma_trend(close: pd.Series) -> dict: # ───────────────────────────────────────────── -# 基本面分析 +# 基本面分析 (基于腾讯数据) # ───────────────────────────────────────────── -def get_fundamentals(ticker: yf.Ticker) -> dict: - """获取基本面数据""" - info = ticker.info +def get_fundamentals(quote: dict) -> dict: + """基于实时行情数据的基本面分析""" result = {} - - # 估值指标 - pe = info.get("trailingPE") or info.get("forwardPE") - pb = info.get("priceToBook") - ps = info.get("priceToSalesTrailing12Months") + + # 估值指标 (腾讯提供的) + pe = quote.get("pe") + pb = quote.get("pb") result["PE"] = round(pe, 2) if pe else None result["PB"] = round(pb, 2) if pb else None - result["PS"] = round(ps, 2) if ps else None - - # 股息 (yfinance 有时返回异常值,限制在合理范围) - div_yield = info.get("dividendYield") - if div_yield is not None and 0 < div_yield < 1: - result["dividend_yield_pct"] = round(div_yield * 100, 2) - elif div_yield is not None and div_yield >= 1: - # 可能已经是百分比形式 - result["dividend_yield_pct"] = round(div_yield, 2) if div_yield < 30 else None - else: - result["dividend_yield_pct"] = None - + result["PS"] = None # 腾讯不提供 + # 市值 - market_cap = info.get("marketCap") - if market_cap: - if market_cap >= 1e12: - result["market_cap"] = f"{market_cap/1e12:.2f} 万亿" - elif market_cap >= 1e8: - result["market_cap"] = f"{market_cap/1e8:.2f} 亿" - else: - result["market_cap"] = f"{market_cap:,.0f}" - else: - result["market_cap"] = None - - # 盈利能力 - result["profit_margin_pct"] = round(info.get("profitMargins", 0) * 100, 2) if info.get("profitMargins") else None - result["roe_pct"] = round(info.get("returnOnEquity", 0) * 100, 2) if info.get("returnOnEquity") else None - result["roa_pct"] = round(info.get("returnOnAssets", 0) * 100, 2) if info.get("returnOnAssets") else None - - # 增长指标 - result["revenue_growth_pct"] = round(info.get("revenueGrowth", 0) * 100, 2) if info.get("revenueGrowth") else None - result["earnings_growth_pct"] = round(info.get("earningsGrowth", 0) * 100, 2) if info.get("earningsGrowth") else None - - # 负债 - result["debt_to_equity"] = round(info.get("debtToEquity", 0), 2) if info.get("debtToEquity") else None - + result["market_cap"] = quote.get("market_cap", "") + # 52周价格区间 - result["52w_high"] = info.get("fiftyTwoWeekHigh") - result["52w_low"] = info.get("fiftyTwoWeekLow") - result["50d_avg"] = info.get("fiftyDayAverage") - result["200d_avg"] = info.get("twoHundredDayAverage") - + result["52w_high"] = quote.get("52w_high") + result["52w_low"] = quote.get("52w_low") + # 公司信息 - result["company_name"] = info.get("longName") or info.get("shortName", "未知") - result["sector"] = info.get("sector", "未知") - result["industry"] = info.get("industry", "未知") - result["currency"] = info.get("currency", "HKD") - - # 基本面评分 + result["company_name"] = quote.get("name", "未知") + result["sector"] = "港股" + result["industry"] = "港股" + result["currency"] = "HKD" + + # 基本面信号 result["fundamental_signal"] = _fundamental_signal(result) - + return result def _fundamental_signal(data: dict) -> str: - """基本面信号判断""" + """基本面信号判断 (简化版)""" score = 0 reasons = [] - # PE 评估 pe = data.get("PE") - if pe is not None: - if pe < 0: - score -= 1 - reasons.append("PE为负(亏损)") - elif pe < 15: + if pe is not None and pe > 0: + if pe < 15: score += 2 - reasons.append("PE低估值") + reasons.append(f"PE低估值({pe})") elif pe < 25: score += 1 - reasons.append("PE合理") + reasons.append(f"PE合理({pe})") elif pe > 40: score -= 1 - reasons.append("PE高估") + reasons.append(f"PE偏高({pe})") - # PB 评估 pb = data.get("PB") if pb is not None: if pb < 1: score += 1 - reasons.append("PB破净") + reasons.append(f"PB破净({pb})") elif pb > 5: score -= 1 - - # 股息率 - div = data.get("dividend_yield_pct") - if div is not None and div > 3: - score += 1 - reasons.append(f"高股息{div}%") - - # ROE - roe = data.get("roe_pct") - if roe is not None: - if roe > 15: - score += 1 - reasons.append("ROE优秀") - elif roe < 5: - score -= 1 - - # 增长 - rev_growth = data.get("revenue_growth_pct") - if rev_growth is not None and rev_growth > 10: - score += 1 - reasons.append("收入增长良好") - - earnings_growth = data.get("earnings_growth_pct") - if earnings_growth is not None and earnings_growth > 15: - score += 1 - reasons.append("利润增长强劲") - - # 负债 - de = data.get("debt_to_equity") - if de is not None and de > 200: - score -= 1 - reasons.append("负债率偏高") + reasons.append(f"PB偏高({pb})") if score >= 3: signal = "基本面优秀" @@ -460,7 +499,7 @@ def _fundamental_signal(data: dict) -> str: else: signal = "基本面较差" - return f"{signal} ({'; '.join(reasons[:4])})" if reasons else signal + return f"{signal} ({'; '.join(reasons[:3])})" if reasons else signal # ───────────────────────────────────────────── @@ -469,10 +508,10 @@ def _fundamental_signal(data: dict) -> str: def generate_recommendation(technical: dict, fundamental: dict, current_price: float) -> dict: """综合技术面和基本面给出操作建议""" - score = 0 # 范围大约 -10 到 +10 + score = 0 signals = [] - # ── 技术面评分 ── + # 技术面评分 macd_sig = technical.get("macd", {}).get("signal", "") if "买入" in macd_sig or "金叉" in macd_sig: score += 2 @@ -531,7 +570,7 @@ def generate_recommendation(technical: dict, fundamental: dict, current_price: f score -= 1 signals.append(f"成交量: {vol_sig}") - # ── 基本面评分 ── + # 基本面评分 fund_sig = fundamental.get("fundamental_signal", "") if "优秀" in fund_sig: score += 2 @@ -557,7 +596,7 @@ def generate_recommendation(technical: dict, fundamental: dict, current_price: f else: signals.append(f"52周位置: {position:.0%}") - # ── 映射到操作建议 ── + # 映射到操作建议 if score >= 5: action = "强烈买入" action_en = "STRONG_BUY" @@ -593,82 +632,47 @@ def generate_recommendation(technical: dict, fundamental: dict, current_price: f # 主流程 # ───────────────────────────────────────────── -def normalize_hk_code(code: str) -> str: - """标准化港股代码""" - code = code.strip().upper() - if not code.endswith(".HK"): - # 尝试补全 - digits = code.lstrip("0") - if digits.isdigit(): - code = code.zfill(4) + ".HK" - return code - - def analyze_stock(code: str, period: str = "6mo", use_cache: bool = True) -> dict: - """对单只港股进行完整分析(内置缓存 + 自动重试)""" - code = normalize_hk_code(code) - - # 1. 尝试读取缓存 + """对单只港股进行完整分析""" + numeric_code, full_code = normalize_hk_code(code) + if use_cache: - cached = _read_cache(code, period) + cached = _read_cache(full_code, period) if cached: - print(f"📦 使用缓存数据 ({code}),缓存有效期 {CACHE_TTL_SECONDS}s", file=sys.stderr) + print(f"📦 使用缓存数据 ({full_code}),缓存有效期 {CACHE_TTL_SECONDS}s", file=sys.stderr) return cached - result = {"code": code, "analysis_time": datetime.now().isoformat(), "error": None} + result = {"code": full_code, "analysis_time": datetime.now().isoformat(), "error": None} try: - ticker = yf.Ticker(code) - - # 2. 带重试的数据获取(限频时可能返回空数据或抛异常) - hist = None - for attempt in range(MAX_RETRIES): - try: - hist = ticker.history(period=period) - if hist is not None and not hist.empty: - break # 成功获取数据 - # 空数据可能是限频导致,重试 - if attempt < MAX_RETRIES - 1: - delay = RETRY_BASE_DELAY * (2 ** attempt) - print(f"⏳ 数据为空(可能限频),{delay}秒后第{attempt+2}次重试...", file=sys.stderr) - time.sleep(delay) - ticker = yf.Ticker(code) # 重新创建 ticker 对象 - except Exception as e: - error_msg = str(e).lower() - is_retriable = any(kw in error_msg for kw in [ - "rate limit", "too many requests", "429", "throttl", - "connection", "timeout", "timed out", - ]) - if not is_retriable or attempt >= MAX_RETRIES - 1: - raise - delay = RETRY_BASE_DELAY * (2 ** attempt) - print(f"⏳ 请求被限频,{delay}秒后第{attempt+2}次重试...", file=sys.stderr) - time.sleep(delay) - ticker = yf.Ticker(code) - - if hist is None or hist.empty: - result["error"] = f"无法获取 {code} 的历史数据,请检查股票代码是否正确" + # 1. 获取实时行情 + quote = fetch_tencent_quote(numeric_code) + if not quote or not quote.get("price"): + result["error"] = f"无法获取 {full_code} 的实时行情" return result + + current_price = quote["price"] + result["current_price"] = current_price + result["price_date"] = quote.get("timestamp", "") + result["price_change"] = quote.get("change_amount") + result["price_change_pct"] = quote.get("change_pct") + + # 2. 获取K线数据 + days = period_to_days(period) + hist = fetch_tencent_kline(numeric_code, days) + + if hist.empty or len(hist) < 30: + result["error"] = f"无法获取 {full_code} 的历史K线数据 (仅获得 {len(hist)} 条)" + return result + + result["data_points"] = len(hist) close = hist["Close"] high = hist["High"] low = hist["Low"] volume = hist["Volume"] - current_price = round(close.iloc[-1], 3) - result["current_price"] = current_price - result["price_date"] = str(hist.index[-1].date()) - result["data_points"] = len(hist) - - # 价格变动 - if len(close) > 1: - prev_close = close.iloc[-2] - change = current_price - prev_close - change_pct = change / prev_close * 100 - result["price_change"] = round(change, 3) - result["price_change_pct"] = round(change_pct, 2) - - # 技术分析 + # 3. 技术分析 technical = {} technical["ma_trend"] = calc_ma_trend(close) technical["macd"] = calc_macd(close) @@ -678,20 +682,16 @@ def analyze_stock(code: str, period: str = "6mo", use_cache: bool = True) -> dic technical["volume"] = calc_volume_analysis(volume, close) result["technical"] = technical - # 3. 带重试的基本面数据获取 - try: - fundamental = _retry_request(get_fundamentals, ticker) - result["fundamental"] = fundamental - except Exception as e: - result["fundamental"] = {"error": str(e), "fundamental_signal": "数据获取失败"} - fundamental = result["fundamental"] + # 4. 基本面分析 + fundamental = get_fundamentals(quote) + result["fundamental"] = fundamental - # 综合建议 + # 5. 综合建议 result["recommendation"] = generate_recommendation(technical, fundamental, current_price) - # 4. 写入缓存(仅成功分析时) + # 6. 写入缓存 if result.get("error") is None: - _write_cache(code, period, result) + _write_cache(full_code, period, result) except Exception as e: result["error"] = f"分析过程出错: {str(e)}" @@ -700,15 +700,14 @@ def analyze_stock(code: str, period: str = "6mo", use_cache: bool = True) -> dic def main(): - parser = argparse.ArgumentParser(description="港股分析工具") - parser.add_argument("code", help="港股代码 (如 0700.HK)") + parser = argparse.ArgumentParser(description="港股分析工具 (腾讯财经数据源)") + parser.add_argument("code", help="港股代码 (如 0700.HK, 00700, 腾讯)") parser.add_argument("--period", default="6mo", help="数据周期 (1mo/3mo/6mo/1y/2y/5y)") parser.add_argument("--output", help="输出JSON文件路径") parser.add_argument("--no-cache", action="store_true", help="跳过缓存,强制重新请求数据") parser.add_argument("--clear-cache", action="store_true", help="清除所有缓存后退出") args = parser.parse_args() - # 清除缓存 if args.clear_cache: import shutil if CACHE_DIR.exists():