Files
stockbuddy/scripts/portfolio_manager.py
2026-04-02 01:26:10 +08:00

466 lines
19 KiB
Python
Executable File

#!/usr/bin/env python3
"""
多市场股票持仓/关注池管理工具 - 基于 SQLite 管理关注股票、持仓并批量分析。
用法:
python3 portfolio_manager.py list
python3 portfolio_manager.py add <代码> --price <买入价> --shares <数量> [--date <日期>] [--note <备注>] [--account <账户名或ID>]
python3 portfolio_manager.py remove <代码>
python3 portfolio_manager.py update <代码> [--price <价格>] [--shares <数量>] [--note <备注>] [--account <账户名或ID>]
python3 portfolio_manager.py analyze [--output <输出文件>]
python3 portfolio_manager.py account-list
python3 portfolio_manager.py account-upsert <账户名> [--market <市场>] [--currency <币种>] [--cash <总现金>] [--available-cash <可用现金>] [--note <备注>]
python3 portfolio_manager.py rule-set <代码> [--lot-size <每手股数>] [--tick-size <最小价位>] [--odd-lot]
python3 portfolio_manager.py watch-list
python3 portfolio_manager.py watch-add <代码>
python3 portfolio_manager.py watch-remove <代码>
数据默认保存在: ~/.stockbuddy/stockbuddy.db
"""
import sys
import json
import argparse
import os
import time
from collections import defaultdict
from datetime import datetime
try:
from db import (
DB_PATH,
get_account,
get_watchlist_item,
init_db,
list_accounts as db_list_accounts,
list_positions as db_list_positions,
list_watchlist as db_list_watchlist,
remove_position as db_remove_position,
set_watch_status,
update_position_fields,
upsert_account,
upsert_position,
upsert_stock_rule,
upsert_watchlist_item,
)
from analyze_stock import fetch_tencent_quote, normalize_stock_code, analyze_stock
except ImportError:
script_dir = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, script_dir)
from db import (
DB_PATH,
get_account,
get_watchlist_item,
init_db,
list_accounts as db_list_accounts,
list_positions as db_list_positions,
list_watchlist as db_list_watchlist,
remove_position as db_remove_position,
set_watch_status,
update_position_fields,
upsert_account,
upsert_position,
upsert_stock_rule,
upsert_watchlist_item,
)
from analyze_stock import fetch_tencent_quote, normalize_stock_code, analyze_stock
def normalize_code(code: str) -> str:
return normalize_stock_code(code)["code"]
def resolve_account(account_ref: str | None):
if not account_ref:
return None
account = get_account(account_ref)
if not account:
raise ValueError(f"账户不存在: {account_ref}")
return account
def ensure_watch_item(code: str, watched: bool = False) -> dict:
stock = normalize_stock_code(code)
quote = fetch_tencent_quote(stock["code"])
name = quote.get("name") if quote else None
return upsert_watchlist_item(
code=stock["code"],
market=stock["market"],
tencent_symbol=stock["tencent_symbol"],
name=name,
exchange=quote.get("exchange", stock.get("exchange")) if quote else stock.get("exchange"),
currency=quote.get("currency") if quote else None,
last_price=quote.get("price") if quote else None,
pe=quote.get("pe") if quote else None,
pb=quote.get("pb") if quote else None,
market_cap=quote.get("market_cap") if quote else None,
week52_high=quote.get("52w_high") if quote else None,
week52_low=quote.get("52w_low") if quote else None,
quote_time=quote.get("timestamp") if quote else None,
is_watched=watched,
meta=quote or stock,
)
def derive_execution_constraints(position: dict, current_price: float | None = None) -> dict:
shares = int(position.get("shares") or 0)
lot_size = position.get("lot_size")
allows_odd_lot = bool(position.get("allows_odd_lot") or False)
if lot_size is None or lot_size <= 0:
whole_lots = None
remainder = None
can_partial_sell = None
sellable_min_unit = 1 if allows_odd_lot else None
else:
whole_lots = shares // lot_size
remainder = shares % lot_size
can_partial_sell = allows_odd_lot or whole_lots >= 2 or remainder > 0
sellable_min_unit = 1 if allows_odd_lot else lot_size
estimated_cash_if_sell_all = round(shares * current_price, 2) if current_price is not None else None
return {
"lot_size": lot_size,
"allows_odd_lot": allows_odd_lot,
"sellable_min_unit": sellable_min_unit,
"whole_lots": whole_lots,
"odd_lot_remainder": remainder,
"can_partial_sell": can_partial_sell,
"estimated_cash_if_sell_all": estimated_cash_if_sell_all,
}
def derive_position_snapshot(position: dict, analysis: dict) -> dict:
current_price = analysis.get("current_price")
buy_price = position.get("buy_price")
shares = int(position.get("shares") or 0)
cost = round((buy_price or 0) * shares, 2)
market_value = round((current_price or 0) * shares, 2) if current_price is not None else None
pnl = round((current_price - buy_price) * shares, 2) if current_price is not None and buy_price is not None else None
pnl_pct = round((current_price - buy_price) / buy_price * 100, 2) if current_price is not None and buy_price not in (None, 0) else None
execution = derive_execution_constraints(position, current_price)
return {
"buy_price": buy_price,
"shares": shares,
"buy_date": position.get("buy_date"),
"cost": cost,
"market_value": market_value,
"pnl": pnl,
"pnl_pct": pnl_pct,
"note": position.get("note", ""),
"currency": position.get("currency"),
"market": position.get("market"),
"account": {
"id": position.get("account_id"),
"name": position.get("account_name"),
"market": position.get("account_market"),
"currency": position.get("account_currency"),
"cash_balance": position.get("account_cash_balance"),
"available_cash": position.get("account_available_cash"),
},
"execution_constraints": execution,
}
# ─────────────────────────────────────────────
# 持仓管理
# ─────────────────────────────────────────────
def list_positions():
init_db()
positions = db_list_positions()
if not positions:
print(json.dumps({"message": "持仓为空", "positions": []}, ensure_ascii=False, indent=2))
return
print(json.dumps({
"total_positions": len(positions),
"positions": positions,
"portfolio_db": str(DB_PATH),
"updated_at": datetime.now().isoformat(),
}, ensure_ascii=False, indent=2))
def add_position(code: str, price: float, shares: int, date: str = None, note: str = "", account_ref: str = None):
init_db()
normalized = normalize_stock_code(code)
existing = next((p for p in db_list_positions() if p["code"] == normalized["code"]), None)
if existing:
print(json.dumps({"error": f"{normalized['code']} 已在持仓中,请使用 update 命令更新"}, ensure_ascii=False))
return
account = resolve_account(account_ref)
watch = ensure_watch_item(normalized["code"], watched=True)
position = upsert_position(
code=normalized["code"],
market=normalized["market"],
tencent_symbol=normalized["tencent_symbol"],
buy_price=price,
shares=shares,
buy_date=date or datetime.now().strftime("%Y-%m-%d"),
note=note,
account_id=account.get("id") if account else None,
name=watch.get("name"),
currency=watch.get("currency"),
meta=json.loads(watch["meta_json"]) if watch.get("meta_json") else None,
)
print(json.dumps({"message": f"已添加 {normalized['code']}", "position": position}, ensure_ascii=False, indent=2))
def remove_position(code: str):
init_db()
normalized_code = normalize_code(code)
removed = db_remove_position(normalized_code)
if not removed:
print(json.dumps({"error": f"{normalized_code} 不在持仓中"}, ensure_ascii=False))
return
print(json.dumps({"message": f"已移除 {normalized_code}"}, ensure_ascii=False, indent=2))
def update_position(code: str, price: float = None, shares: int = None, note: str = None, account_ref: str = None):
init_db()
normalized_code = normalize_code(code)
account = resolve_account(account_ref) if account_ref else None
position = update_position_fields(normalized_code, price=price, shares=shares, note=note, account_id=account.get("id") if account else None)
if not position:
print(json.dumps({"error": f"{normalized_code} 不在持仓中"}, ensure_ascii=False))
return
print(json.dumps({"message": f"已更新 {normalized_code}", "position": position}, ensure_ascii=False, indent=2))
# ─────────────────────────────────────────────
# 账户与交易规则管理
# ─────────────────────────────────────────────
def list_accounts():
init_db()
accounts = db_list_accounts()
print(json.dumps({
"total_accounts": len(accounts),
"accounts": accounts,
"portfolio_db": str(DB_PATH),
"updated_at": datetime.now().isoformat(),
}, ensure_ascii=False, indent=2))
def save_account(name: str, market: str = None, currency: str = None, cash: float = None, available_cash: float = None, note: str = ""):
init_db()
account = upsert_account(
name=name,
market=market,
currency=currency,
cash_balance=cash,
available_cash=available_cash,
note=note,
)
print(json.dumps({"message": f"已保存账户 {name}", "account": account}, ensure_ascii=False, indent=2))
def set_rule(code: str, lot_size: int = None, tick_size: float = None, odd_lot: bool = False):
init_db()
normalized_code = normalize_code(code)
ensure_watch_item(normalized_code, watched=False)
rule = upsert_stock_rule(code=normalized_code, lot_size=lot_size, tick_size=tick_size, allows_odd_lot=odd_lot)
print(json.dumps({"message": f"已设置 {normalized_code} 的交易规则", "rule": rule}, ensure_ascii=False, indent=2))
# ─────────────────────────────────────────────
# 关注池管理
# ─────────────────────────────────────────────
def list_watchlist():
init_db()
items = db_list_watchlist(only_watched=True)
print(json.dumps({
"total_watchlist": len(items),
"watchlist": items,
"portfolio_db": str(DB_PATH),
"updated_at": datetime.now().isoformat(),
}, ensure_ascii=False, indent=2))
def add_watch(code: str):
init_db()
watch = ensure_watch_item(code, watched=True)
print(json.dumps({"message": f"已关注 {watch['code']}", "watch": watch}, ensure_ascii=False, indent=2))
def remove_watch(code: str):
init_db()
normalized_code = normalize_code(code)
watch = set_watch_status(normalized_code, False)
if not watch:
print(json.dumps({"error": f"{normalized_code} 不在关注池中"}, ensure_ascii=False))
return
print(json.dumps({"message": f"已取消关注 {normalized_code}", "watch": watch}, ensure_ascii=False, indent=2))
# ─────────────────────────────────────────────
# 批量分析
# ─────────────────────────────────────────────
def analyze_portfolio(output_file: str = None):
init_db()
positions = db_list_positions()
if not positions:
print(json.dumps({"message": "持仓为空,无法分析"}, ensure_ascii=False, indent=2))
return
results = []
account_totals = defaultdict(lambda: {"cost": 0.0, "market_value": 0.0})
market_currency_totals = defaultdict(lambda: {"cost": 0.0, "market_value": 0.0})
for i, pos in enumerate(positions):
code = pos["code"]
print(f"正在分析 {code} ({i+1}/{len(positions)})...", file=sys.stderr)
analysis = analyze_stock(code)
analysis["portfolio_info"] = derive_position_snapshot(pos, analysis)
results.append(analysis)
snapshot = analysis["portfolio_info"]
market_value = snapshot.get("market_value") or 0.0
cost = snapshot.get("cost") or 0.0
account_name = snapshot.get("account", {}).get("name") or "未分配账户"
account_totals[account_name]["cost"] += cost
account_totals[account_name]["market_value"] += market_value
mc_key = f"{snapshot.get('market') or 'UNKNOWN'}:{snapshot.get('currency') or 'UNKNOWN'}"
market_currency_totals[mc_key]["cost"] += cost
market_currency_totals[mc_key]["market_value"] += market_value
if i < len(positions) - 1 and not analysis.get("_from_cache"):
time.sleep(2)
total_cost = sum(r.get("portfolio_info", {}).get("cost", 0) or 0 for r in results)
total_value = sum(r.get("portfolio_info", {}).get("market_value", 0) or 0 for r in results)
total_pnl = total_value - total_cost
for analysis in results:
snapshot = analysis.get("portfolio_info", {})
market_value = snapshot.get("market_value") or 0.0
account = snapshot.get("account") or {}
account_name = account.get("name") or "未分配账户"
account_total_value = account_totals[account_name]["market_value"]
snapshot["position_weight_of_portfolio_pct"] = round(market_value / total_value * 100, 2) if total_value > 0 else None
snapshot["position_weight_of_account_pct"] = round(market_value / account_total_value * 100, 2) if account_total_value > 0 else None
summary = {
"analysis_time": datetime.now().isoformat(),
"total_positions": len(results),
"total_cost": round(total_cost, 2),
"total_market_value": round(total_value, 2),
"total_pnl": round(total_pnl, 2),
"total_pnl_pct": round(total_pnl / total_cost * 100, 2) if total_cost > 0 else 0,
"accounts": [
{
"name": name,
"total_cost": round(v["cost"], 2),
"total_market_value": round(v["market_value"], 2),
"total_pnl": round(v["market_value"] - v["cost"], 2),
}
for name, v in sorted(account_totals.items())
],
"market_currency_breakdown": [
{
"market_currency": key,
"total_cost": round(v["cost"], 2),
"total_market_value": round(v["market_value"], 2),
"total_pnl": round(v["market_value"] - v["cost"], 2),
}
for key, v in sorted(market_currency_totals.items())
],
"positions": results,
}
output = json.dumps(summary, ensure_ascii=False, indent=2, default=str)
if output_file:
with open(output_file, "w", encoding="utf-8") as f:
f.write(output)
print(f"分析结果已保存至 {output_file}", file=sys.stderr)
print(output)
def main():
parser = argparse.ArgumentParser(description="多市场股票持仓/关注池管理工具")
subparsers = parser.add_subparsers(dest="command", help="子命令")
subparsers.add_parser("list", help="列出所有持仓")
subparsers.add_parser("account-list", help="列出账户")
add_parser = subparsers.add_parser("add", help="添加持仓")
add_parser.add_argument("code", help="股票代码")
add_parser.add_argument("--price", type=float, required=True, help="买入价格")
add_parser.add_argument("--shares", type=int, required=True, help="持有数量")
add_parser.add_argument("--date", help="买入日期 (YYYY-MM-DD)")
add_parser.add_argument("--note", default="", help="备注")
add_parser.add_argument("--account", help="账户名或账户ID")
rm_parser = subparsers.add_parser("remove", help="移除持仓")
rm_parser.add_argument("code", help="股票代码")
up_parser = subparsers.add_parser("update", help="更新持仓")
up_parser.add_argument("code", help="股票代码")
up_parser.add_argument("--price", type=float, help="买入价格")
up_parser.add_argument("--shares", type=int, help="持有数量")
up_parser.add_argument("--note", help="备注")
up_parser.add_argument("--account", help="账户名或账户ID")
analyze_parser = subparsers.add_parser("analyze", help="批量分析持仓")
analyze_parser.add_argument("--output", help="输出JSON文件")
account_parser = subparsers.add_parser("account-upsert", help="新增或更新账户")
account_parser.add_argument("name", help="账户名")
account_parser.add_argument("--market", help="市场,例如 HK/CN/US")
account_parser.add_argument("--currency", help="币种,例如 HKD/CNY/USD")
account_parser.add_argument("--cash", type=float, help="账户总现金")
account_parser.add_argument("--available-cash", type=float, help="账户可用现金")
account_parser.add_argument("--note", default="", help="备注")
rule_parser = subparsers.add_parser("rule-set", help="设置股票交易规则")
rule_parser.add_argument("code", help="股票代码")
rule_parser.add_argument("--lot-size", type=int, help="一手股数")
rule_parser.add_argument("--tick-size", type=float, help="最小价位变动")
rule_parser.add_argument("--odd-lot", action="store_true", help="允许碎股")
subparsers.add_parser("watch-list", help="列出关注池")
watch_add_parser = subparsers.add_parser("watch-add", help="添加关注股票")
watch_add_parser.add_argument("code", help="股票代码")
watch_remove_parser = subparsers.add_parser("watch-remove", help="取消关注股票")
watch_remove_parser.add_argument("code", help="股票代码")
args = parser.parse_args()
try:
if args.command == "list":
list_positions()
elif args.command == "add":
add_position(args.code, args.price, args.shares, args.date, args.note, args.account)
elif args.command == "remove":
remove_position(args.code)
elif args.command == "update":
update_position(args.code, args.price, args.shares, args.note, args.account)
elif args.command == "analyze":
analyze_portfolio(args.output)
elif args.command == "account-list":
list_accounts()
elif args.command == "account-upsert":
save_account(args.name, args.market, args.currency, args.cash, args.available_cash, args.note)
elif args.command == "rule-set":
set_rule(args.code, args.lot_size, args.tick_size, args.odd_lot)
elif args.command == "watch-list":
list_watchlist()
elif args.command == "watch-add":
add_watch(args.code)
elif args.command == "watch-remove":
remove_watch(args.code)
else:
parser.print_help()
except ValueError as e:
print(json.dumps({"error": str(e)}, ensure_ascii=False, indent=2))
sys.exit(1)
if __name__ == "__main__":
main()