主题
Tushare 数据集成层 — 设计 spec
Date: 2026-05-08 Status: Draft v2(与 01 v2 同步:依赖 Stage 1 两年回填;连接管理 / 视图 / 时区约定从 01 §6 引入) Owner: liang Related: docs/planning/01-data-layer.md(数据层 — 仓库化 / 采集),本 spec 与其平行:
01-data-layer.md | 存什么 / 怎么落仓 —— Tushare 接口清单、调度、Raw + Normalized schema |
2026-05-08-tushare-integration-design.md (本 spec) | agent 怎么拿到 对的 / 新的 / 归一化的 数据 —— Provider 抽象、Composer、cache 策略、响应口径 |
1. 失败模式与事故清单
源自 ~/.hermes/profiles/stock-research-agent/sessions/ 真实用户反馈(2026-04-30 ~ 2026-05-08):
| # | 日期 | 股票 | Agent 报 | 实际 | 失败模式 | Root cause |
|---|---|---|---|---|---|---|
| 1 | 2026-05-07 15:38 | 永鼎股份 (600105) | 43.68 | 47.52 (-8.1%) | Freshness | daily_cache.TTL=24h + 收盘后无刷新;agent 拿到 5/6 的 close |
| 2 | 2026-05-06 15:31 | 璞泰来 (603659) | ~38.62 | 37.17 (+3.9%) | Source 错配 | 从 ETF 持仓表读了"价格"字段,语义不是 OHLC |
| 3 | 2026-05-06 10:23 | 大位科技 (600589) | 10.07 | 10.83 (-7.0%) | Freshness | 返回 4/30 的 close —— 6 天前 |
| 4 | 2026-05-08 10:05 | 浙文互联 (300582) | 5,362,159 万元 | ~735 万元 | 单位混乱 | akshare 成交额 单位是元,被当成万元用,差 ~70 倍 |
| 5 | 2026-05-08 10:05 | 浙文互联 (300582) | KeyError | — | Schema 脆 | akshare 没返回 昨收 列,消费侧硬取该字段崩溃 |
用户原话(USER.md 已经记录的原则):
股票数据抓取原则:必须交叉验证多数据源,确保数据时效性和准确性
→ 5 起事故全部源于集成层工程选择,与 Tushare 数据本身无关。
2. 设计原则
- 强类型契约。 所有跨 provider 边界的对象(Quote / Fundamentals / FreshnessReport)是 frozen dataclass;不允许传 dict 越界。
- 强 provenance。 每个返回值必带
as_of、freshness_seconds、source,调用方拿到任何数字时都能判断它"什么时候"、"从哪里"。 - 强失败。 不静默回落。Provider 抛 typed exception;Composer 显式选择降级路径并记录降级。
- 强 capability。 Provider 必须声明能力(
Capability枚举),Composer 只把请求路由到声明了相应能力的 provider。 - 强 canonical 单位。 单位锁死在响应 schema;provider 实现负责把上游单位转成 canonical,错则在 provider 内部失败,不污染下游。
- 强 market-state 感知。 Cache 行为、source 选择、降级策略都依赖
MarketState;状态判定走本地trade_cal表,不调远程接口。
3. 为什么不是"shorten TTL"了事
事故 1 / 3(freshness)单靠 cache TTL 缩短就能修。但:
- 事故 2(source 错配)需要 capability 声明 + 路由约束
- 事故 4(单位)需要 canonical schema + provider 边界归一化
- 事故 5(schema 脆)需要 typed dataclass 出入边界
- 复权(USER.md 偏好 QFQ)要求响应里同时给 raw / qfq / hfq
→ Provider 抽象一次解决全部 5 个失败模式 + USER.md 的复权要求 + 未来加 WarehouseProvider 的钩子。值得。
4. DataProvider 抽象
4.1 模块布局
core/core/data/
├── providers/
│ ├── __init__.py # ProviderRegistry 注册入口
│ ├── base.py # ABC + 类型契约
│ ├── tushare_daily.py # TushareDailyProvider(包装 core/data/tushare.py)
│ ├── akshare_spot.py # AkshareSpotProvider(包装 core/data/akshare.py)
│ └── warehouse.py # WarehouseProvider(W6 才落地,依赖 01 §3 的仓库)
├── tushare.py # 不动 —— HTTP wrapper
├── akshare.py # 不动
├── _market.py # 删掉 / 折叠到 providers/_market_state.py
└── schemas.py # 删掉 get_price / get_fundamentals 实现,由 Composer 替代4.2 类型契约(base.py)
python
from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum
class Capability(Enum):
"""provider 能力声明。Composer 按 capability 路由请求。"""
DAILY_QUOTE_HISTORICAL = "daily_quote_historical" # 任意 trade_date 的 close
DAILY_QUOTE_LATEST = "daily_quote_latest" # 最近交易日 close(不指定 trade_date)
INTRADAY_QUOTE = "intraday_quote" # 盘中实时报价(仅市场开盘)
FUNDAMENTALS_QUARTERLY = "fundamentals_quarterly"
ADJ_FACTOR = "adj_factor"
INDEX_DAILY = "index_daily"
class MarketState(Enum):
OPEN = "open" # 9:30-11:30 / 13:00-15:00 of trading day
CLOSING = "closing" # 15:00-15:05 of trading day(数据沉降中,禁止 cache)
CLOSED_FINAL = "closed_final" # 15:05+ of trading day OR non-trading day
PRE_OPEN = "pre_open" # < 9:30 of next trading day
@dataclass(frozen=True)
class Quote:
"""A 股日线 quote 的 canonical 表示。
单位约定(锁死,禁止 provider 越界):
- close_raw / open_raw / high_raw / low_raw : 元(4 位小数)
- volume : 手
- amount : 千元
- adj_factor / adj_factor_latest : 无量纲
复权(不在 dataclass 字段,是 property):
- close_qfq = close_raw * adj_factor / adj_factor_latest
- close_hfq = close_raw * adj_factor
"""
ts_code: str
trade_date: str # YYYY-MM-DD
close_raw: float
open_raw: float | None
high_raw: float | None
low_raw: float | None
volume: int | None
amount: float | None
adj_factor: float | None
adj_factor_latest: float | None # 用于 qfq 计算
as_of: datetime # UTC, provider 拿到值的时刻
freshness_seconds: int # (now - as_of).total_seconds()
source: str # provider id, e.g. "tushare-daily"
@property
def close_qfq(self) -> float | None:
if self.adj_factor is None or self.adj_factor_latest is None:
return None
return round(self.close_raw * self.adj_factor / self.adj_factor_latest, 4)
@property
def close_hfq(self) -> float | None:
if self.adj_factor is None:
return None
return round(self.close_raw * self.adj_factor, 4)
@dataclass(frozen=True)
class FreshnessReport:
provider_id: str
last_success: datetime | None
last_failure: datetime | None
error_msg: str | None
rows_today: int
@dataclass(frozen=True)
class HealthStatus:
provider_id: str
healthy: bool
error: str | None
response_time_ms: float | None
# Typed exceptions —— Provider 边界用这些;Composer 必须区分。
class ProviderError(Exception):
"""Provider 调用基类异常。"""
class ProviderUnavailable(ProviderError):
"""Provider 暂时不可用(网络 / 上游 5xx)。Composer 应降级到下一 provider。"""
class ProviderQuotaExceeded(ProviderError):
"""超过频次 / 总量上限。Composer 应降级,并在告警中记录。"""
class CapabilityNotSupported(ProviderError):
"""Provider 没声明所需 capability。这是配置错误,不应被 Composer 静默吞下。"""
class CanonicalUnitViolation(ProviderError):
"""Provider 实现 bug:返回值越过 canonical 单位约定。Critical。"""
class DataProvider(ABC):
"""所有数据源的抽象基类。
Provider 实现负责:
1. 跟上游说话(HTTP / SQL / 本地表)
2. 把结果转成 Quote / FreshnessReport
3. 单位归一化(违反 → CanonicalUnitViolation)
4. 错误归一化(→ ProviderError 子类)
"""
id: str # 类属性,唯一
capabilities: set[Capability] # 类属性,声明
canonical_unit_check: bool = True # 是否对返回值跑单位校验(dev/test = True)
@abstractmethod
def get_quote(self, code: str, *, trade_date: str | None = None) -> Quote:
"""主接口。
- trade_date=None:返回最近一次 close(要求 DAILY_QUOTE_LATEST 或 INTRADAY_QUOTE)
- trade_date=YYYYMMDD:返回该日 close(要求 DAILY_QUOTE_HISTORICAL)
"""
@abstractmethod
def freshness(self) -> FreshnessReport: ...
@abstractmethod
def health(self) -> HealthStatus: ...4.3 三个具体实现
4.3.1 TushareDailyProvider
python
class TushareDailyProvider(DataProvider):
id = "tushare-daily"
capabilities = {
Capability.DAILY_QUOTE_HISTORICAL,
Capability.DAILY_QUOTE_LATEST,
Capability.ADJ_FACTOR,
}
def __init__(self, token: str): ...
def get_quote(self, code, *, trade_date=None):
# 1. tushare.call(api_name="daily", ...) → 拿 close_raw / open / high / low / volume / amount
# 2. tushare.call(api_name="adj_factor", ...) → 拿 adj_factor 当日值 + 最新值
# 3. 单位归一化:volume 已经是手;amount 单位转成千元(Tushare amount 原本就是千元)
# 4. 包成 Quote 返回调用 Tushare 两次(daily + adj_factor)。Composer 层做合并 cache 让两次合一次。
4.3.2 AkshareSpotProvider
python
class AkshareSpotProvider(DataProvider):
id = "akshare-spot"
capabilities = {Capability.INTRADAY_QUOTE} # 仅市场开盘时
def get_quote(self, code, *, trade_date=None):
if trade_date is not None:
raise CapabilityNotSupported(
"akshare-spot 只支持 trade_date=None(实时盘口)"
)
# 1. akshare 实时全市场快照 → 取该 code
# 2. 单位严格转换:amount 从元转到千元;volume 从股转到手
# 任何转换失败 → CanonicalUnitViolation
# 3. 包成 Quote (close_raw=current_price, adj_factor=None)
# 4. as_of = 现在时刻;freshness_seconds = ~5事故 4(akshare 成交额单位)就在第 2 步必须 catch。Provider 边界是单位污染的最后防线。
4.3.3 WarehouseProvider(W6 落地,依赖 01-data-layer.md T1)
python
class WarehouseProvider(DataProvider):
id = "warehouse"
capabilities = {
Capability.DAILY_QUOTE_HISTORICAL,
Capability.DAILY_QUOTE_LATEST,
Capability.ADJ_FACTOR,
Capability.FUNDAMENTALS_QUARTERLY,
Capability.INDEX_DAILY,
}
def __init__(self, db_path: str): ...
def get_quote(self, code, *, trade_date=None):
# SELECT ... FROM normalized_daily JOIN normalized_adj_factor ...
# 命中 → Quote
# 未命中 → ProviderUnavailable(让 Composer 降到 Tushare)4.4 ProviderRegistry
python
class ProviderRegistry:
"""单例式注册中心。启动时由 src/service/main.py 注册所有 provider。"""
_providers: dict[str, DataProvider] = {}
@classmethod
def register(cls, provider: DataProvider) -> None:
if not provider.id:
raise ValueError("provider.id 不能为空")
if provider.id in cls._providers:
raise ValueError(f"provider {provider.id!r} 重复注册")
if not provider.capabilities:
raise ValueError(f"provider {provider.id!r} 必须声明至少一个 capability")
cls._providers[provider.id] = provider
@classmethod
def get(cls, provider_id: str) -> DataProvider: ...
@classmethod
def by_capability(cls, cap: Capability) -> list[DataProvider]: ...
@classmethod
def all(cls) -> list[DataProvider]: ...注册在 FastAPI lifespan 里:
python
# src/service/main.py
async def lifespan(app):
ProviderRegistry.register(TushareDailyProvider(token=settings.tushare_token))
ProviderRegistry.register(AkshareSpotProvider())
if settings.warehouse_enabled:
ProviderRegistry.register(WarehouseProvider(db_path=settings.warehouse_path))
yield5. Market State(基于 trade_cal)
5.1 数据依赖
需要 01-data-layer.md §3.6 一次性任务:先把 trade_cal 表落到 ~/twilight/data/warehouse.duckdb,覆盖 today - 30d ~ today + 365d。
这是本 spec 的硬依赖。 W1 第一步就是种下 trade_cal,不能等 01 整个 T1 落地。
5.2 状态判定
python
class MarketStateResolver:
"""通过本地 trade_cal 表 + 当前时刻判定 MarketState。"""
def __init__(self, db_path: str): ...
def resolve(self, now: datetime | None = None) -> MarketState:
now = now or datetime.now(ZoneInfo("Asia/Shanghai"))
today_is_open = self._is_trading_day(now.date())
t = now.time()
if not today_is_open:
return MarketState.CLOSED_FINAL
if t < time(9, 30):
return MarketState.PRE_OPEN
if (time(9, 30) <= t < time(11, 30)) or (time(13, 0) <= t < time(15, 0)):
return MarketState.OPEN
if time(15, 0) <= t < time(15, 5):
return MarketState.CLOSING
return MarketState.CLOSED_FINAL # 15:05+Resolver 启动时把 trade_cal 整表读进内存(~250 行/年),resolve() 是纯计算。
5.3 时间精度
中国 A 股时区是 Asia/Shanghai (UTC+8, no DST)。所有 MarketState 判定走该时区;Quote.as_of 走 UTC。两套时区混用是 bug。
6. Composer / Router
6.1 路由策略
python
class QuoteComposer:
"""根据 (capability, market_state) 选 provider 链,按链顺序尝试。"""
# 路由策略表:(请求语义, market_state) → 优先级 provider id 列表
ROUTES: dict[tuple[str, MarketState], list[str]] = {
# 历史 close(指定 trade_date)—— 永远走 warehouse → tushare
("historical", MarketState.OPEN): ["warehouse", "tushare-daily"],
("historical", MarketState.CLOSING): ["warehouse", "tushare-daily"],
("historical", MarketState.CLOSED_FINAL): ["warehouse", "tushare-daily"],
("historical", MarketState.PRE_OPEN): ["warehouse", "tushare-daily"],
# 最新 close(trade_date=None)开盘期间 —— 实时盘口优先
("latest", MarketState.OPEN): ["akshare-spot", "warehouse", "tushare-daily"],
# CLOSING 期 —— 数据沉降,禁止任何 cache 命中;只走最权威源
("latest", MarketState.CLOSING): ["tushare-daily"],
# 收盘后 —— warehouse 应该已经刷新;akshare 禁用
("latest", MarketState.CLOSED_FINAL): ["warehouse", "tushare-daily"],
("latest", MarketState.PRE_OPEN): ["warehouse", "tushare-daily"],
}
def get_quote(self, code: str, *, trade_date: str | None = None) -> Quote:
intent = "historical" if trade_date else "latest"
state = self.market.resolve()
chain = self.ROUTES[(intent, state)]
last_error: ProviderError | None = None
for provider_id in chain:
try:
provider = self.registry.get(provider_id)
except KeyError:
continue # provider 未注册(warehouse 在 W5 之前)
try:
quote = provider.get_quote(code, trade_date=trade_date)
self._audit(code, trade_date, intent, state, chain, provider_id, ok=True)
return quote
except (ProviderUnavailable, ProviderQuotaExceeded) as e:
last_error = e
continue # 降级
except (CapabilityNotSupported, CanonicalUnitViolation):
# bug —— 不降级,直接抛
raise
self._audit(code, trade_date, intent, state, chain, None, ok=False, error=last_error)
raise ProviderUnavailable(
f"all providers exhausted for {code} (intent={intent}, state={state.value}); "
f"last_error={last_error!r}"
)6.2 失败显式
CapabilityNotSupported/CanonicalUnitViolation永远不降级 —— 这是 bug,必须暴露ProviderUnavailable/ProviderQuotaExceeded才降级- 全链失败抛
ProviderUnavailable,不返回任何"看起来像 quote"的对象 _audit()写到provider_audit_log表(每次调用一行)
6.3 不做的事
- ❌ 跨源自动对账(多 provider 都成功 → 比较 → 选边)。监测告警在 §8 做,不自动选。
- ❌ Provider 内部 retry(轮一次失败就抛 ProviderUnavailable,让 Composer 降级而不是 provider 内黑盒重试)
7. 市场感知 Cache TTL
7.1 现行问题
现行 daily_cache.TTL = 24h 是与市场状态脱钩的死值。事故 1/3 都是它造成的。
7.2 新策略
Cache 层在 Composer 之上,按 (code, trade_date) cache Quote 对象。TTL 由 MarketState + trade_date 联合决定:
| trade_date | MarketState | TTL 行为 |
|---|---|---|
| 任意(指定历史日) | 任意 | 永久 cache(历史 trade_date 的 close 永远 immutable) |
| None | OPEN | 60 秒过期(盘中波动大) |
| None | CLOSING | 不写 cache 也不读 cache(15:00-15:05 数据沉降中,所有调用都直接打到 provider 链) |
| None | CLOSED_FINAL | 有效至次个交易日 9:30(今日收盘价已定) |
| None | PRE_OPEN | 有效至今日 9:30(昨日 close 在开盘前一直有效) |
7.3 实现
python
class QuoteCache:
def get(self, code: str, trade_date: str | None, market_state: MarketState) -> Quote | None: ...
def put(self, quote: Quote, market_state: MarketState) -> None: ...
def invalidate_market_open(self) -> None:
"""每天 9:30 调用,清掉 trade_date=None 的所有 entry。"""cache backend:DuckDB 本地表 quote_cache(与仓库同 warehouse.duckdb 文件,复用 01 §6.6.2 的 WarehousePool),替代现行 daily_cache。schema 新增 cached_at + expires_at 列,expires_at IS NULL 表示永久。
7.4 现有 daily_cache 表迁移
W1 不动现有 daily_cache;W5 上线 QuoteCache 后跑 schema 迁移把老表数据 backfill 进来;W6 删老表。
8. 可观测性
8.1 三张表
sql
-- 每次 Composer.get_quote 调用一行
CREATE TABLE provider_audit_log (
id BIGINT PRIMARY KEY,
ts TIMESTAMP NOT NULL, -- UTC
code VARCHAR,
trade_date VARCHAR,
intent VARCHAR, -- 'historical' | 'latest'
market_state VARCHAR,
chain VARCHAR, -- comma-joined provider ids attempted
served_by VARCHAR, -- final provider that succeeded, or NULL
freshness_secs INTEGER, -- as_of 距 ts 的秒数
latency_ms INTEGER, -- 整个 Composer 调用耗时
error VARCHAR -- NULL on success
);
-- 每个 provider 的实时 freshness 状态(in-memory + 周期性持久化)
CREATE TABLE provider_freshness (
provider_id VARCHAR PRIMARY KEY,
last_success TIMESTAMP,
last_failure TIMESTAMP,
error_msg VARCHAR,
rows_today INTEGER DEFAULT 0,
updated_at TIMESTAMP NOT NULL
);
-- 跨源 diff 监测(多 provider 同时返回成功时记录差异)
CREATE TABLE provider_diff_log (
id BIGINT PRIMARY KEY,
ts TIMESTAMP NOT NULL,
code VARCHAR,
trade_date VARCHAR,
provider_a VARCHAR, -- e.g. 'warehouse'
value_a DOUBLE,
provider_b VARCHAR, -- e.g. 'tushare-daily'
value_b DOUBLE,
diff_pct DOUBLE, -- (a - b) / b * 100
severity VARCHAR -- 'info' | 'warning' | 'critical'
);8.2 三个 endpoint
GET /quality/freshness
→ [{provider_id, last_success, last_failure, error_msg, rows_today}, ...]
GET /quality/audit?since=2026-05-08T00:00&limit=100
→ 最近 N 次 Composer 调用记录
GET /quality/diff?severity=warning&since=...
→ 跨源不一致历史8.3 跨源 diff 触发条件
- 仅在 OPEN 期,且
latest请求成功路由到 ≥ 2 provider 时才记录 - diff_pct > 1% → severity=warning,写入 + 写入 stderr log
- diff_pct > 5% → severity=critical,写入 + 写入 stderr log + 异步 webhook 告警(webhook URL 是 P1.1 范畴,本 spec 留 hook)
9. 响应 schema 升级(breaking change)
9.1 现行 /price 响应
json
{
"value": 47.52,
"metric": "close",
"code": "600105.SH",
"as_of": "2026-05-07",
"cite": {
"kind": "tool",
"source": "tushare",
"table": "daily",
"fetched_at": "2026-05-07T07:30:00Z",
"tool_call_id": "tc_abc123",
"served_by": "tushare"
}
}9.2 新 /price 响应
json
{
"ts_code": "600105.SH",
"trade_date": "2026-05-07",
"close_raw": 47.52,
"close_qfq": 46.12,
"close_hfq": 49.80,
"open_raw": 45.30,
"high_raw": 47.90,
"low_raw": 45.10,
"volume": 1283000,
"amount": 5917430,
"adj_factor": 1.234,
"adj_factor_latest": 1.272,
"as_of": "2026-05-07T07:30:00Z",
"freshness_seconds": 480,
"source": "warehouse",
"source_chain": ["warehouse"],
"market_state": "closed_final",
"units": {
"price": "CNY",
"volume": "lots_100shares",
"amount": "thousand_CNY",
"share_count": "ten_thousand_shares",
"market_cap": "ten_thousand_CNY"
},
"cite": {
"kind": "tool",
"tool_call_id": "tc_abc123",
"fetched_at": "2026-05-07T07:30:00Z",
"source_chain": ["warehouse"]
}
}9.3 Breaking change 影响面
| 客户端 | 影响 | 修复 |
|---|---|---|
core/skill/scripts/fetch_price.py | 解析的 value 字段没了 | 改读 close_qfq(默认);提供 --raw 选项读 close_raw |
Hermes profile stock-research-agent 里调 fetch_price 的脚本 | 同上 | fetch_price.py 改完即修 |
/whoami / /healthz | 无 | — |
/fundamentals 路由 | 不受本 spec 影响 | 留待后续 spec |
W4 同步发布:backend 升级 + fetch_price.py 升级。不做 v1/v2 双轨。
10. 迁移路线(按周)
W1 — 骨架 + trade_cal 种子
- [ ]
core/data/providers/base.py:Capability / MarketState / Quote / 三个 Report / 五种 ProviderError / DataProvider ABC - [ ]
core/data/providers/__init__.py:ProviderRegistry - [ ] 种 trade_cal 表:一次性脚本
scripts/seed_trade_cal.py,从 Tushare 拉today-30d ~ today+365d,写到~/twilight/data/warehouse.duckdb的normalized_trade_cal表 - [ ]
core/data/providers/_market_state.py:MarketStateResolver - [ ] 单元测试:Quote dataclass / Capability 校验 / MarketStateResolver 边界(9:29:59 / 11:30:00 / 15:00:00 / 15:04:59 / 15:05:00 / 周末 / 春节)
W2 — Provider 包装(行为不变)
- [ ]
TushareDailyProvider:包装现有core/data/tushare.py,加 adj_factor 调用与单位校验 - [ ]
AkshareSpotProvider:包装现有core/data/akshare.py,关键:单位校验严格化(成交额转千元,volume 转手) - [ ] Provider conformance 测试:每个 provider 跑同一组用例(已知 code 返回合理值 / 错误归一化 / 单位不越界)
W3 — Composer + Cache
- [ ]
core/data/composer.py:QuoteComposer + ROUTES 表 - [ ]
core/data/quote_cache.py:QuoteCache(DuckDB backend) - [ ] Composer 在 lifespan 注入;FastAPI app.state.composer = ...
- [ ] 集成测试:5 起事故的 regression case 全部 pass
W4 — /price 切流(breaking)
- [ ]
src/service/routes/price.py:调 Composer,响应 schema 升级 - [ ]
core/skill/scripts/fetch_price.py:解析新 schema,默认 close_qfq - [ ]
core/core/data/schemas.py的get_price函数:标记 deprecated,内部转调 Composer - [ ] Hermes profile 里 cron 脚本 review 一遍,确保没人在直接读
value字段
W5 — 可观测性
- [ ] 三张表的 schema 落库
- [ ] Composer._audit / FreshnessTracker / DiffMonitor 实现
- [ ]
/quality/freshness/quality/audit/quality/diff三个 endpoint - [ ] 现有
daily_cache数据 backfill 进 QuoteCache,老表保留只读
W6 — WarehouseProvider(依赖 01 W1 + W2)
- [ ] 等 01 plan 的 W1 (T1 落仓 + Stage 1 两年回填) + W2 (
/price切流 +v_daily_full视图) 完成 - [ ]
WarehouseProvider实现 + 注册(读WarehousePool,from §6.6.2 of 01) - [ ] 删老的
daily_cache表(与 01 的老 cache 同周下线) - [ ] 删
core/core/data/_market.py(被 MarketStateResolver 替代)
依赖关系明示: 02 spec 的 W1-W5 完全独立于 01;只有 W6 需要 01 的仓库就位。01 W1-W2(约 1-2 周)跑完,02 W6 即可上线,整体两条线并行最快 ~6 周收尾。
11. 显式不做(Out of Scope)
- ❌ 多租户配额(profile manager 范畴;Phase 2 §B)
- ❌ LLM provider 抽象(Hermes 自己做的事)
- ❌ 实时 / 分钟数据(独立付费权限)
- ❌ 跨源自动选边(只做监测告警,不自动)
- ❌ 替换 akshare(先包装收口;后续可选换源)
- ❌ Webhook 告警的具体 URL / 通道(只留 hook,配置后置)
- ❌ Composer 的 sync → async 改造(FastAPI 不强制;存量代码同步即可)
- ❌ TDX (通达信) 跨源验证 —— 独立 spec,W7+ 启动(见 01-data-layer.md §8.6)
12. 开放问题
- akshare 单位的现实情况。 事故 4 显示 akshare
成交额是元,但 akshare 的同一接口对 ETF / 港股 / 美股可能是不同单位。AkshareSpotProvider 实现时要枚举所有 code 类型还是保守只支持 A 股 6 位代码?倾向后者。 - adj_factor_latest 的获取。 Tushare
adj_factor接口返回某 trade_date 的因子,"最新"需要再调一次(trade_date=最新交易日)。是 provider 内部维护一份 daily-refresh 的 cache 还是每次调用都打两次?倾向 in-memory cache + 每天 9:30 失效。 - Cache 命中时的 freshness_seconds 计算。 Cache 里存的是 Quote,Quote 自带
as_of。返回时重算freshness_seconds = (now - as_of).total_seconds(),还是直接返回 cache 时的快照值?倾向重算(让"现在 freshness 多老"准确)。 - AkshareSpotProvider 的 freshness。 akshare 实时盘口理论上 < 5s,但底层是爬新浪/腾讯财经,没有可靠 timestamp。强制
as_of=now、freshness_seconds=0还是设个保守的freshness_seconds=15?倾向 15s 保守值。 - CLOSING 期(15:00-15:05)的体验。 该期 cache 全 miss + provider 链短(只 tushare),可能短暂 P99 飙升。可以加一个 tight loop "每 30s 主动刷一次 latest" 让 cache 暖起来,但是工程上不优雅。先放这条,看 audit log 是否真有 P99 问题。
- CapabilityNotSupported 的 strictness。 Provider 没声明 capability 但 Composer 还是路由过来 = 配置 bug。直接 raise 还是 log + 跳到下一个 provider?倾向 raise(配置 bug 必须暴露)。
- trade_cal 数据源依赖。 本 spec W1 第一步靠 Tushare 一次性拉 trade_cal —— 但 Tushare 的 trade_cal 接口本身存在 §1 提到的 "120 文档说要 2000" 的不一致风险(虽然探针证实可调)。如果哪天突然不让调,要不要内置一份硬编码的节假日表 fallback?暂不做,flag 风险。
13. 与 01-data-layer.md 的一致性检查
| 项 | 01 spec | 本 spec | 状态 |
|---|---|---|---|
| 价格单位 | 元(DOUBLE) §6.2 | 元(Quote.close_raw) | ✅ |
| 成交量单位 | 手(BIGINT)§6.2 | 手(Quote.volume)§4.2 | ✅ |
| 成交额单位 | 千元(与 Tushare daily.amount 对齐)§6.2 | 千元(Quote.amount) | ✅ |
| 总市值单位 | 万元(保留原单位)§6.5 / §14 Q3 | units 字段透传 §9.2 | ✅ |
| 总股本单位 | 万股 §6.5 | units 字段透传 | ✅ |
| 复权策略 | 存原始因子,业务逻辑后置 §6.3 | Quote.close_qfq / close_hfq property §4.2 | ✅ |
| trade_date 类型 | DATE §6.2 | trade_date: str (YYYY-MM-DD) §4.2 | ✅ 边界转换在 Provider |
| ingested_at 时区 | UTC naive §6.5.6 | as_of: UTC aware §4.2 | ✅ 边界转换 replace(tzinfo=UTC) |
| trade_cal 落仓 | §3.6 + §12 W1 种子 | W1 第一步种子 §10 | ✅(依赖明确) |
| WarehouseProvider 表来源 | normalized_daily / normalized_adj_factor / normalized_daily_basic §3.1 | 同 §4.3.3 | ✅ |
视图 v_daily_full | §6.6.1 | WarehouseProvider 优先读视图 §4.3.3 | ✅ |
| 连接管理 | WarehousePool (read) + WarehouseWriter §6.6.2 | WarehouseProvider 用 pool 的 read | ✅ |
| 现有 daily_cache 命运 | 切流后 W6 下线 §14 Q5 | W6 删 §10 | ✅ |
| 回填 Stage | Stage 1 = 2 年 §8.1 | W6 依赖 Stage 1 完成 §10 | ✅ |
| TDX 兜底 | §8.6 列入 W7+ 独立 spec | §11 Out of scope(同步约定) | ✅ |
任何 01 后续修改 → 本 spec 同步刷新。