add resilient CN market fallbacks and source metadata

This commit is contained in:
root
2026-04-01 00:12:12 +08:00
parent da951705bc
commit 2d8aae25de
4 changed files with 388 additions and 9 deletions

View File

@@ -133,3 +133,4 @@ bash {{SKILL_DIR}}/scripts/install_deps.sh
| `scripts/install_deps.sh` | Python 依赖安装脚本 |
| `references/technical_indicators.md` | 技术指标详解和评分标准 |
| `references/output_templates.md` | 分析报告输出模板 |
| `references/data-source-roadmap.md` | 数据源升级路线图:主源 / fallback / 事件层规划;仅在需要扩展数据源或接入事件信息时读取 |

View File

@@ -0,0 +1,206 @@
# StockBuddy 数据源升级方案Phase 1
## 目标
在不显著增加复杂度的前提下,提升 StockBuddy 对 A 股、港股、美股的:
- 数据可用性
- 数据源容错能力
- 报告可解释性
- 后续接入事件层的扩展性
当前原则:
- 先做 **行情/基础面多源容错**
- 再做 **事件提示层**
- 暂不把新闻/社媒直接并入总评分
---
## 已实测可用的免费数据源
### 行情 / K 线 / 基础字段
1. **腾讯财经**
- 适用A 股、港股;美股实时快照也可用
- 实测quote 可用K 线可用
- 用途:主行情源
2. **Yahoo Finance**
- 适用:美股
- 实测chart 接口可用
- 用途:美股 K 线主源 / fallback
3. **东方财富部分行情接口**
- 适用A 股
- 实测JSON 行情接口可用
- 用途A 股补充行情源 / fallback
### 公告 / 事件层
1. **SEC**
- 适用:美股
- 实测JSON submissions 可用
- 用途:美股公告与硬信息源
2. **HKEX 披露网页**
- 适用:港股
- 实测:搜索页可访问
- 用途:港股公告层(需后续结构化适配)
3. **巨潮资讯**
- 适用A 股
- 实测:主页可访问;查询接口需单独适配
- 用途A 股公告层
---
## 不建议当前作为主源的来源
1. **新浪财经**
- 实测出现 403
- 不适合作为稳定主源
2. **雪球 / 股吧 / 其他社区舆情**
- 可访问性和结构稳定性不如行情源
- 更适合后续做热度/风险雷达,不适合先进入主链路
---
## Phase 1 架构
### 1. 行情主链路
#### A 股 / 港股
- 主源:腾讯财经
- A 股 fallback东方财富
- 输出:
- 最新价
- 涨跌幅
- K 线
- PE / PB / 市值 / 52 周高低点(能取则取)
#### 美股
- 实时快照:腾讯财经可保留尝试
- K 线主源Yahoo Finance
- fallback后续可考虑 Finnhub / Twelve Data若未来接受 API key
### 2. 事件层(先只做扩展位,不进总分)
- A 股:巨潮资讯 / 交易所公告
- 港股HKEX
- 美股SEC
输出方式:
- 报告新增 “近期关键事件” 小节
- 默认只摘要 1-3 条
- 只做说明,不直接改变买卖评级
### 3. 社媒层(暂缓)
- 国内:雪球 / 股吧
- 海外X / Reddit / Stocktwits
定位:
- 仅做热度、风险、情绪极端提示
- 不纳入核心评分
---
## 建议的代码改造顺序
### Step 1行情源抽象
将当前 `analyze_stock.py` 中的数据获取逻辑拆成可替换函数层:
- `fetch_quote_primary()`
- `fetch_quote_fallback()`
- `fetch_kline_primary()`
- `fetch_kline_fallback()`
建议后续拆到独立模块,例如:
- `scripts/providers/tencent.py`
- `scripts/providers/eastmoney.py`
- `scripts/providers/yahoo.py`
- `scripts/providers/sec.py`(后续事件层)
### Step 2统一数据模型
无论来源是什么,最终统一成内部字段:
- `price`
- `change_pct`
- `volume`
- `pe`
- `pb`
- `market_cap`
- `52w_high`
- `52w_low`
- `currency`
- `source`
并允许部分字段缺失,避免某一源缺字段时整个分析失败。
### Step 3报告补充数据来源
在最终输出中增加轻量数据源标记,例如:
- 行情源:腾讯财经
- K线源腾讯财经 / Yahoo Finance
- 事件源SEC / HKEX / 巨潮(未来)
用途:
- 方便排查数据异常
- 提高报告透明度
### Step 4事件层接入
在行情分析后,按市场类型追加事件抓取:
- A 股 -> 巨潮
- 港股 -> HKEX
- 美股 -> SEC
若事件抓取失败:
- 不影响主分析
- 仅在报告中省略事件小节或标注“暂无近期关键事件”
---
## 报告层建议
默认模板继续保持简洁,但可新增一个可选区块:
### 🧾 数据来源
- 行情:{quote_source}
- K线{kline_source}
- 事件:{event_source 或 暂无}
### 📰 近期关键事件(后续接入)
- {事件1}
- {事件2}
- {事件3}
要求:
- 事件层不抢主体位置
- 不让报告变成资讯聚合页
- 保持 Telegram 等 IM 场景的短消息可读性
---
## Phase 1 成功标准
满足以下条件即可认为第一阶段完成:
1. A 股分析:腾讯失败时可自动尝试东方财富补位
2. 美股分析Yahoo K 线链路保持稳定
3. 输出中可看到实际使用的数据源
4. 单一来源失败不导致整份分析直接不可用
5. 事件层接口预留清晰,但暂不强耦合评分逻辑
---
## 后续 Phase 2 / 3
### Phase 2事件层
- 接入 SEC / HKEX / 巨潮
- 报告增加“近期关键事件”摘要
### Phase 3舆情雷达
- 接入雪球 / X / Reddit 等热度监测
- 只做“情绪过热 / 异常传播 / 风险提示”
- 不直接进入买卖总分

View File

@@ -12,6 +12,11 @@
- **分析时间**{时间}
- **数据周期**{周期}
### 🧾 数据来源
- **行情**{行情源}
- **K线**{K线源}
- **事件**{事件源}
---
### {建议图标} 操作建议:{操作建议}
@@ -116,6 +121,7 @@
## 模板使用说明
- 所有 `{占位符}` 根据脚本返回的 JSON 数据填充。
- `{行情源}` / `{K线源}` 使用 `data_sources` 中的实际来源;若事件层尚未接入,`{事件源}``暂无`
- 最终输出必须是标准 Markdown 正文,不要放进 ``` 代码块。
- 优先使用短段落、项目符号、卡片式结构;除非用户明确要求,否则尽量不要使用宽表格。
- Telegram 等 IM 场景下,优先保证手机端可读性,避免一行承载过多字段。

View File

@@ -163,6 +163,97 @@ def fetch_tencent_quote(code: str) -> dict:
return {}
def fetch_eastmoney_quote(code: str) -> dict:
"""获取东方财富实时行情(当前仅作为 A 股 fallback"""
stock = normalize_stock_code(code)
if stock['market'] not in ('SH', 'SZ') or len(stock['code']) != 8:
return {}
raw_code = stock['code'][2:]
secid_prefix = '1' if stock['market'] == 'SH' else '0'
fields = 'f43,f44,f45,f46,f57,f58,f60,f116,f164,f167,f168,f169,f170'
url = f"https://push2.eastmoney.com/api/qt/stock/get?secid={secid_prefix}.{raw_code}&fields={fields}"
def scaled(value, digits=100):
if value in (None, ''):
return None
try:
return round(float(value) / digits, 2)
except (TypeError, ValueError):
return None
for attempt in range(MAX_RETRIES):
try:
req = urllib.request.Request(url, headers={
'User-Agent': 'Mozilla/5.0',
'Referer': 'https://quote.eastmoney.com/',
})
with urllib.request.urlopen(req, timeout=10) as response:
payload = json.loads(response.read().decode('utf-8'))
data = payload.get('data') or {}
price = scaled(data.get('f43'))
if not price:
return {}
change_amount = scaled(data.get('f169'))
change_pct = scaled(data.get('f170'))
return {
'name': data.get('f58') or stock['code'],
'code': stock['code'],
'market': stock['market'],
'exchange': stock.get('exchange'),
'tencent_symbol': stock['tencent_symbol'],
'price': price,
'prev_close': scaled(data.get('f60')),
'open': scaled(data.get('f46')),
'volume': None,
'high': scaled(data.get('f44')),
'low': scaled(data.get('f45')),
'change_amount': change_amount,
'change_pct': change_pct,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'currency': 'CNY',
'pe': scaled(data.get('f164')),
'pb': scaled(data.get('f167')),
'market_cap': data.get('f116'),
'52w_high': None,
'52w_low': None,
'raw_code': data.get('f57') or raw_code,
'quote_source': 'eastmoney',
}
except (urllib.error.URLError, json.JSONDecodeError) as e:
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BASE_DELAY * (attempt + 1))
else:
raise Exception(f"获取东方财富行情失败: {e}")
return {}
def fetch_quote_with_fallback(code: str) -> dict:
"""先取主源A 股主源失败时自动尝试东方财富。"""
stock = normalize_stock_code(code)
errors = []
try:
quote = fetch_tencent_quote(code)
if quote and quote.get('price'):
quote.setdefault('quote_source', 'tencent')
return quote
except Exception as e:
errors.append(f"tencent={e}")
if stock['market'] in ('SH', 'SZ'):
try:
quote = fetch_eastmoney_quote(code)
if quote and quote.get('price'):
return quote
except Exception as e:
errors.append(f"eastmoney={e}")
if errors:
raise Exception('; '.join(errors))
return {}
def _parse_tencent_quote(data: str, symbol: str, stock: dict) -> dict:
"""解析腾讯财经实时行情响应"""
var_name = f"v_{symbol}"
@@ -275,6 +366,63 @@ def _parse_tencent_kline(data: dict, symbol: str) -> pd.DataFrame:
return df
def fetch_eastmoney_kline(code: str, days: int = 120) -> pd.DataFrame:
"""获取东方财富日线(当前仅作为 A 股 fallback"""
stock = normalize_stock_code(code)
if stock['market'] not in ('SH', 'SZ') or len(stock['code']) != 8:
return pd.DataFrame()
raw_code = stock['code'][2:]
secid_prefix = '1' if stock['market'] == 'SH' else '0'
end = datetime.now().strftime('%Y%m%d')
# 多抓一些,避免交易日折算不足
start = (datetime.now() - timedelta(days=max(days * 2, 180))).strftime('%Y%m%d')
url = (
"https://push2his.eastmoney.com/api/qt/stock/kline/get"
f"?secid={secid_prefix}.{raw_code}"
"&fields1=f1,f2,f3,f4,f5,f6"
"&fields2=f51,f52,f53,f54,f55,f56,f57,f58"
"&klt=101&fqt=1"
f"&beg={start}&end={end}"
)
for attempt in range(MAX_RETRIES):
try:
req = urllib.request.Request(url, headers={
'User-Agent': 'Mozilla/5.0',
'Referer': 'https://quote.eastmoney.com/',
})
with urllib.request.urlopen(req, timeout=20) as response:
payload = json.loads(response.read().decode('utf-8'))
klines = ((payload.get('data') or {}).get('klines')) or []
if not klines:
return pd.DataFrame()
records = []
for item in klines:
parts = item.split(',')
if len(parts) < 6:
continue
records.append({
'Date': parts[0],
'Open': float(parts[1]),
'Close': float(parts[2]),
'High': float(parts[3]),
'Low': float(parts[4]),
'Volume': float(parts[5]),
})
df = pd.DataFrame(records)
if not df.empty:
df['Date'] = pd.to_datetime(df['Date'])
df.set_index('Date', inplace=True)
return df
except (urllib.error.URLError, json.JSONDecodeError, ValueError) as e:
if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_BASE_DELAY * (attempt + 1))
else:
raise Exception(f"获取东方财富K线失败: {e}")
return pd.DataFrame()
def fetch_us_kline_yahoo(symbol: str, period: str = '6mo') -> pd.DataFrame:
range_map = {
'1mo': '1mo',
@@ -338,12 +486,13 @@ def min_kline_points(required_days: int) -> int:
return 20 if required_days <= 30 else 30
def refresh_kline_cache(code: str, required_days: int, period: str = '6mo') -> pd.DataFrame:
"""使用 SQLite 保存日线数据,并按需增量刷新。"""
def refresh_kline_cache(code: str, required_days: int, period: str = '6mo') -> tuple[pd.DataFrame, str]:
"""使用 SQLite 保存日线数据,并按需增量刷新。返回 (hist, source)。"""
stock = normalize_stock_code(code)
buffer_days = 30
latest_date = get_latest_kline_date(code)
fetch_days = max(required_days + buffer_days, 60)
source_used = 'tencent'
if latest_date:
latest_dt = datetime.strptime(latest_date, "%Y-%m-%d")
@@ -354,20 +503,31 @@ def refresh_kline_cache(code: str, required_days: int, period: str = '6mo') -> p
fetch_days = max(missing_days + buffer_days, 60)
fetched = fetch_tencent_kline(code, fetch_days)
if stock['market'] == 'US' and len(fetched) <= 2:
if stock['market'] in ('SH', 'SZ') and len(fetched) <= 2:
fetched = fetch_eastmoney_kline(code, fetch_days)
source_used = 'eastmoney' if not fetched.empty else source_used
elif stock['market'] == 'US' and len(fetched) <= 2:
fetched = fetch_us_kline_yahoo(stock['code'], period)
source_used = 'yahoo' if not fetched.empty else source_used
if not fetched.empty:
upsert_kline_df(code, fetched, source='yahoo' if stock['market'] == 'US' and len(fetched) > 2 else 'tencent')
upsert_kline_df(code, fetched, source=source_used)
hist = get_kline_df(code, required_days + buffer_days)
if len(hist) < min_kline_points(required_days):
fallback = fetch_tencent_kline(code, required_days + buffer_days)
if stock['market'] == 'US' and len(fallback) <= 2:
fallback_source = 'tencent'
if stock['market'] in ('SH', 'SZ') and len(fallback) <= 2:
fallback = fetch_eastmoney_kline(code, required_days + buffer_days)
fallback_source = 'eastmoney' if not fallback.empty else fallback_source
elif stock['market'] == 'US' and len(fallback) <= 2:
fallback = fetch_us_kline_yahoo(stock['code'], period)
fallback_source = 'yahoo' if not fallback.empty else fallback_source
if not fallback.empty:
upsert_kline_df(code, fallback, source='yahoo' if stock['market'] == 'US' and len(fallback) > 2 else 'tencent')
upsert_kline_df(code, fallback, source=fallback_source)
hist = get_kline_df(code, required_days + buffer_days)
return hist
source_used = fallback_source
return hist, source_used
# ─────────────────────────────────────────────
@@ -886,11 +1046,16 @@ def analyze_stock(code: str, period: str = "6mo", use_cache: bool = True) -> dic
result = {"code": full_code, "market": stock['market'], "analysis_time": datetime.now().isoformat(), "error": None}
try:
quote = fetch_tencent_quote(full_code)
quote = fetch_quote_with_fallback(full_code)
if not quote or not quote.get("price"):
result["error"] = f"无法获取 {full_code} 的实时行情"
return result
result["data_sources"] = {
"quote": quote.get("quote_source", "tencent"),
"kline": None,
}
upsert_watchlist_item(
code=full_code,
market=quote.get('market', stock['market']),
@@ -915,7 +1080,8 @@ def analyze_stock(code: str, period: str = "6mo", use_cache: bool = True) -> dic
result["price_change_pct"] = quote.get("change_pct")
days = period_to_days(period)
hist = refresh_kline_cache(full_code, days, period)
hist, kline_source = refresh_kline_cache(full_code, days, period)
result["data_sources"]["kline"] = kline_source
if hist.empty or len(hist) < min_kline_points(days):
result["error"] = f"无法获取 {full_code} 的历史K线数据 (仅获得 {len(hist)} 条)"
return result