Skip to content

Tushare 数据集成层 — 设计 spec

Date: 2026-05-08 Status: Draft v2(与 01 v2 同步:依赖 Stage 1 两年回填;连接管理 / 视图 / 时区约定从 01 §6 引入) Owner: liang Related: docs/planning/01-data-layer.md(数据层 — 仓库化 / 采集),本 spec 与其平行:

01-data-layer.md存什么 / 怎么落仓 —— Tushare 接口清单、调度、Raw + Normalized schema
2026-05-08-tushare-integration-design.md (本 spec)agent 怎么拿到 对的 / 新的 / 归一化的 数据 —— Provider 抽象、Composer、cache 策略、响应口径

1. 失败模式与事故清单

源自 ~/.hermes/profiles/stock-research-agent/sessions/ 真实用户反馈(2026-04-30 ~ 2026-05-08):

#日期股票Agent 报实际失败模式Root cause
12026-05-07 15:38永鼎股份 (600105)43.6847.52 (-8.1%)Freshnessdaily_cache.TTL=24h + 收盘后无刷新;agent 拿到 5/6 的 close
22026-05-06 15:31璞泰来 (603659)~38.6237.17 (+3.9%)Source 错配从 ETF 持仓表读了"价格"字段,语义不是 OHLC
32026-05-06 10:23大位科技 (600589)10.0710.83 (-7.0%)Freshness返回 4/30 的 close —— 6 天前
42026-05-08 10:05浙文互联 (300582)5,362,159 万元~735 万元单位混乱akshare 成交额 单位是,被当成万元用,差 ~70 倍
52026-05-08 10:05浙文互联 (300582)KeyErrorSchema 脆akshare 没返回 昨收 列,消费侧硬取该字段崩溃

用户原话(USER.md 已经记录的原则):

股票数据抓取原则:必须交叉验证多数据源,确保数据时效性和准确性

→ 5 起事故全部源于集成层工程选择,与 Tushare 数据本身无关。

2. 设计原则

  1. 强类型契约。 所有跨 provider 边界的对象(Quote / Fundamentals / FreshnessReport)是 frozen dataclass;不允许传 dict 越界。
  2. 强 provenance。 每个返回值必带 as_offreshness_secondssource,调用方拿到任何数字时都能判断它"什么时候"、"从哪里"。
  3. 强失败。 不静默回落。Provider 抛 typed exception;Composer 显式选择降级路径并记录降级。
  4. 强 capability。 Provider 必须声明能力(Capability 枚举),Composer 只把请求路由到声明了相应能力的 provider。
  5. 强 canonical 单位。 单位锁死在响应 schema;provider 实现负责把上游单位转成 canonical,错则在 provider 内部失败,不污染下游。
  6. 强 market-state 感知。 Cache 行为、source 选择、降级策略都依赖 MarketState;状态判定走本地 trade_cal 表,不调远程接口。

3. 为什么不是"shorten TTL"了事

事故 1 / 3(freshness)单靠 cache TTL 缩短就能修。但:

  • 事故 2(source 错配)需要 capability 声明 + 路由约束
  • 事故 4(单位)需要 canonical schema + provider 边界归一化
  • 事故 5(schema 脆)需要 typed dataclass 出入边界
  • 复权(USER.md 偏好 QFQ)要求响应里同时给 raw / qfq / hfq

→ Provider 抽象一次解决全部 5 个失败模式 + USER.md 的复权要求 + 未来加 WarehouseProvider 的钩子。值得。

4. DataProvider 抽象

4.1 模块布局

core/core/data/
├── providers/
│   ├── __init__.py        # ProviderRegistry 注册入口
│   ├── base.py            # ABC + 类型契约
│   ├── tushare_daily.py   # TushareDailyProvider(包装 core/data/tushare.py)
│   ├── akshare_spot.py    # AkshareSpotProvider(包装 core/data/akshare.py)
│   └── warehouse.py       # WarehouseProvider(W6 才落地,依赖 01 §3 的仓库)
├── tushare.py             # 不动 —— HTTP wrapper
├── akshare.py             # 不动
├── _market.py             # 删掉 / 折叠到 providers/_market_state.py
└── schemas.py             # 删掉 get_price / get_fundamentals 实现,由 Composer 替代

4.2 类型契约(base.py)

python
from __future__ import annotations

from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime
from enum import Enum


class Capability(Enum):
    """provider 能力声明。Composer 按 capability 路由请求。"""
    DAILY_QUOTE_HISTORICAL = "daily_quote_historical"  # 任意 trade_date 的 close
    DAILY_QUOTE_LATEST     = "daily_quote_latest"      # 最近交易日 close(不指定 trade_date)
    INTRADAY_QUOTE         = "intraday_quote"          # 盘中实时报价(仅市场开盘)
    FUNDAMENTALS_QUARTERLY = "fundamentals_quarterly"
    ADJ_FACTOR             = "adj_factor"
    INDEX_DAILY            = "index_daily"


class MarketState(Enum):
    OPEN          = "open"           # 9:30-11:30 / 13:00-15:00 of trading day
    CLOSING       = "closing"        # 15:00-15:05 of trading day(数据沉降中,禁止 cache)
    CLOSED_FINAL  = "closed_final"   # 15:05+ of trading day OR non-trading day
    PRE_OPEN      = "pre_open"       # < 9:30 of next trading day


@dataclass(frozen=True)
class Quote:
    """A 股日线 quote 的 canonical 表示。

    单位约定(锁死,禁止 provider 越界):
      - close_raw / open_raw / high_raw / low_raw  : 元(4 位小数)
      - volume                                     : 手
      - amount                                     : 千元
      - adj_factor / adj_factor_latest             : 无量纲

    复权(不在 dataclass 字段,是 property):
      - close_qfq = close_raw * adj_factor / adj_factor_latest
      - close_hfq = close_raw * adj_factor
    """
    ts_code:           str
    trade_date:        str  # YYYY-MM-DD
    close_raw:         float
    open_raw:          float | None
    high_raw:          float | None
    low_raw:           float | None
    volume:            int | None
    amount:            float | None
    adj_factor:        float | None
    adj_factor_latest: float | None  # 用于 qfq 计算
    as_of:             datetime       # UTC, provider 拿到值的时刻
    freshness_seconds: int            # (now - as_of).total_seconds()
    source:            str            # provider id, e.g. "tushare-daily"

    @property
    def close_qfq(self) -> float | None:
        if self.adj_factor is None or self.adj_factor_latest is None:
            return None
        return round(self.close_raw * self.adj_factor / self.adj_factor_latest, 4)

    @property
    def close_hfq(self) -> float | None:
        if self.adj_factor is None:
            return None
        return round(self.close_raw * self.adj_factor, 4)


@dataclass(frozen=True)
class FreshnessReport:
    provider_id:    str
    last_success:   datetime | None
    last_failure:   datetime | None
    error_msg:      str | None
    rows_today:     int


@dataclass(frozen=True)
class HealthStatus:
    provider_id:      str
    healthy:          bool
    error:            str | None
    response_time_ms: float | None


# Typed exceptions —— Provider 边界用这些;Composer 必须区分。

class ProviderError(Exception):
    """Provider 调用基类异常。"""

class ProviderUnavailable(ProviderError):
    """Provider 暂时不可用(网络 / 上游 5xx)。Composer 应降级到下一 provider。"""

class ProviderQuotaExceeded(ProviderError):
    """超过频次 / 总量上限。Composer 应降级,并在告警中记录。"""

class CapabilityNotSupported(ProviderError):
    """Provider 没声明所需 capability。这是配置错误,不应被 Composer 静默吞下。"""

class CanonicalUnitViolation(ProviderError):
    """Provider 实现 bug:返回值越过 canonical 单位约定。Critical。"""


class DataProvider(ABC):
    """所有数据源的抽象基类。

    Provider 实现负责:
      1. 跟上游说话(HTTP / SQL / 本地表)
      2. 把结果转成 Quote / FreshnessReport
      3. 单位归一化(违反 → CanonicalUnitViolation)
      4. 错误归一化(→ ProviderError 子类)
    """

    id: str                        # 类属性,唯一
    capabilities: set[Capability]  # 类属性,声明
    canonical_unit_check: bool = True  # 是否对返回值跑单位校验(dev/test = True)

    @abstractmethod
    def get_quote(self, code: str, *, trade_date: str | None = None) -> Quote:
        """主接口。

        - trade_date=None:返回最近一次 close(要求 DAILY_QUOTE_LATEST 或 INTRADAY_QUOTE)
        - trade_date=YYYYMMDD:返回该日 close(要求 DAILY_QUOTE_HISTORICAL)
        """

    @abstractmethod
    def freshness(self) -> FreshnessReport: ...

    @abstractmethod
    def health(self) -> HealthStatus: ...

4.3 三个具体实现

4.3.1 TushareDailyProvider

python
class TushareDailyProvider(DataProvider):
    id = "tushare-daily"
    capabilities = {
        Capability.DAILY_QUOTE_HISTORICAL,
        Capability.DAILY_QUOTE_LATEST,
        Capability.ADJ_FACTOR,
    }

    def __init__(self, token: str): ...

    def get_quote(self, code, *, trade_date=None):
        # 1. tushare.call(api_name="daily", ...) → 拿 close_raw / open / high / low / volume / amount
        # 2. tushare.call(api_name="adj_factor", ...) → 拿 adj_factor 当日值 + 最新值
        # 3. 单位归一化:volume 已经是手;amount 单位转成千元(Tushare amount 原本就是千元)
        # 4. 包成 Quote 返回

调用 Tushare 两次(daily + adj_factor)。Composer 层做合并 cache 让两次合一次。

4.3.2 AkshareSpotProvider

python
class AkshareSpotProvider(DataProvider):
    id = "akshare-spot"
    capabilities = {Capability.INTRADAY_QUOTE}  # 仅市场开盘时

    def get_quote(self, code, *, trade_date=None):
        if trade_date is not None:
            raise CapabilityNotSupported(
                "akshare-spot 只支持 trade_date=None(实时盘口)"
            )
        # 1. akshare 实时全市场快照 → 取该 code
        # 2. 单位严格转换:amount 从元转到千元;volume 从股转到手
        #    任何转换失败 → CanonicalUnitViolation
        # 3. 包成 Quote (close_raw=current_price, adj_factor=None)
        # 4. as_of = 现在时刻;freshness_seconds = ~5

事故 4(akshare 成交额单位)就在第 2 步必须 catch。Provider 边界是单位污染的最后防线。

4.3.3 WarehouseProvider(W6 落地,依赖 01-data-layer.md T1)

python
class WarehouseProvider(DataProvider):
    id = "warehouse"
    capabilities = {
        Capability.DAILY_QUOTE_HISTORICAL,
        Capability.DAILY_QUOTE_LATEST,
        Capability.ADJ_FACTOR,
        Capability.FUNDAMENTALS_QUARTERLY,
        Capability.INDEX_DAILY,
    }

    def __init__(self, db_path: str): ...

    def get_quote(self, code, *, trade_date=None):
        # SELECT ... FROM normalized_daily JOIN normalized_adj_factor ...
        # 命中 → Quote
        # 未命中 → ProviderUnavailable(让 Composer 降到 Tushare)

4.4 ProviderRegistry

python
class ProviderRegistry:
    """单例式注册中心。启动时由 src/service/main.py 注册所有 provider。"""

    _providers: dict[str, DataProvider] = {}

    @classmethod
    def register(cls, provider: DataProvider) -> None:
        if not provider.id:
            raise ValueError("provider.id 不能为空")
        if provider.id in cls._providers:
            raise ValueError(f"provider {provider.id!r} 重复注册")
        if not provider.capabilities:
            raise ValueError(f"provider {provider.id!r} 必须声明至少一个 capability")
        cls._providers[provider.id] = provider

    @classmethod
    def get(cls, provider_id: str) -> DataProvider: ...

    @classmethod
    def by_capability(cls, cap: Capability) -> list[DataProvider]: ...

    @classmethod
    def all(cls) -> list[DataProvider]: ...

注册在 FastAPI lifespan 里:

python
# src/service/main.py
async def lifespan(app):
    ProviderRegistry.register(TushareDailyProvider(token=settings.tushare_token))
    ProviderRegistry.register(AkshareSpotProvider())
    if settings.warehouse_enabled:
        ProviderRegistry.register(WarehouseProvider(db_path=settings.warehouse_path))
    yield

5. Market State(基于 trade_cal)

5.1 数据依赖

需要 01-data-layer.md §3.6 一次性任务:先把 trade_cal 表落到 ~/twilight/data/warehouse.duckdb,覆盖 today - 30d ~ today + 365d

这是本 spec 的硬依赖。 W1 第一步就是种下 trade_cal,不能等 01 整个 T1 落地。

5.2 状态判定

python
class MarketStateResolver:
    """通过本地 trade_cal 表 + 当前时刻判定 MarketState。"""

    def __init__(self, db_path: str): ...

    def resolve(self, now: datetime | None = None) -> MarketState:
        now = now or datetime.now(ZoneInfo("Asia/Shanghai"))
        today_is_open = self._is_trading_day(now.date())
        t = now.time()

        if not today_is_open:
            return MarketState.CLOSED_FINAL

        if t < time(9, 30):
            return MarketState.PRE_OPEN
        if (time(9, 30) <= t < time(11, 30)) or (time(13, 0) <= t < time(15, 0)):
            return MarketState.OPEN
        if time(15, 0) <= t < time(15, 5):
            return MarketState.CLOSING
        return MarketState.CLOSED_FINAL  # 15:05+

Resolver 启动时把 trade_cal 整表读进内存(~250 行/年),resolve() 是纯计算。

5.3 时间精度

中国 A 股时区是 Asia/Shanghai (UTC+8, no DST)。所有 MarketState 判定走该时区;Quote.as_of 走 UTC。两套时区混用是 bug。

6. Composer / Router

6.1 路由策略

python
class QuoteComposer:
    """根据 (capability, market_state) 选 provider 链,按链顺序尝试。"""

    # 路由策略表:(请求语义, market_state) → 优先级 provider id 列表
    ROUTES: dict[tuple[str, MarketState], list[str]] = {
        # 历史 close(指定 trade_date)—— 永远走 warehouse → tushare
        ("historical", MarketState.OPEN):         ["warehouse", "tushare-daily"],
        ("historical", MarketState.CLOSING):      ["warehouse", "tushare-daily"],
        ("historical", MarketState.CLOSED_FINAL): ["warehouse", "tushare-daily"],
        ("historical", MarketState.PRE_OPEN):     ["warehouse", "tushare-daily"],

        # 最新 close(trade_date=None)开盘期间 —— 实时盘口优先
        ("latest", MarketState.OPEN):             ["akshare-spot", "warehouse", "tushare-daily"],
        # CLOSING 期 —— 数据沉降,禁止任何 cache 命中;只走最权威源
        ("latest", MarketState.CLOSING):          ["tushare-daily"],
        # 收盘后 —— warehouse 应该已经刷新;akshare 禁用
        ("latest", MarketState.CLOSED_FINAL):     ["warehouse", "tushare-daily"],
        ("latest", MarketState.PRE_OPEN):         ["warehouse", "tushare-daily"],
    }

    def get_quote(self, code: str, *, trade_date: str | None = None) -> Quote:
        intent = "historical" if trade_date else "latest"
        state = self.market.resolve()
        chain = self.ROUTES[(intent, state)]

        last_error: ProviderError | None = None
        for provider_id in chain:
            try:
                provider = self.registry.get(provider_id)
            except KeyError:
                continue  # provider 未注册(warehouse 在 W5 之前)
            try:
                quote = provider.get_quote(code, trade_date=trade_date)
                self._audit(code, trade_date, intent, state, chain, provider_id, ok=True)
                return quote
            except (ProviderUnavailable, ProviderQuotaExceeded) as e:
                last_error = e
                continue  # 降级
            except (CapabilityNotSupported, CanonicalUnitViolation):
                # bug —— 不降级,直接抛
                raise

        self._audit(code, trade_date, intent, state, chain, None, ok=False, error=last_error)
        raise ProviderUnavailable(
            f"all providers exhausted for {code} (intent={intent}, state={state.value}); "
            f"last_error={last_error!r}"
        )

6.2 失败显式

  • CapabilityNotSupported / CanonicalUnitViolation 永远不降级 —— 这是 bug,必须暴露
  • ProviderUnavailable / ProviderQuotaExceeded 才降级
  • 全链失败抛 ProviderUnavailable返回任何"看起来像 quote"的对象
  • _audit() 写到 provider_audit_log 表(每次调用一行)

6.3 不做的事

  • ❌ 跨源自动对账(多 provider 都成功 → 比较 → 选边)。监测告警在 §8 做,不自动选。
  • ❌ Provider 内部 retry(轮一次失败就抛 ProviderUnavailable,让 Composer 降级而不是 provider 内黑盒重试)

7. 市场感知 Cache TTL

7.1 现行问题

现行 daily_cache.TTL = 24h与市场状态脱钩的死值。事故 1/3 都是它造成的。

7.2 新策略

Cache 层在 Composer 之上,按 (code, trade_date) cache Quote 对象。TTL 由 MarketState + trade_date 联合决定:

trade_dateMarketStateTTL 行为
任意(指定历史日)任意永久 cache(历史 trade_date 的 close 永远 immutable)
NoneOPEN60 秒过期(盘中波动大)
NoneCLOSING不写 cache 也不读 cache(15:00-15:05 数据沉降中,所有调用都直接打到 provider 链)
NoneCLOSED_FINAL有效至次个交易日 9:30(今日收盘价已定)
NonePRE_OPEN有效至今日 9:30(昨日 close 在开盘前一直有效)

7.3 实现

python
class QuoteCache:
    def get(self, code: str, trade_date: str | None, market_state: MarketState) -> Quote | None: ...
    def put(self, quote: Quote, market_state: MarketState) -> None: ...
    def invalidate_market_open(self) -> None:
        """每天 9:30 调用,清掉 trade_date=None 的所有 entry。"""

cache backend:DuckDB 本地表 quote_cache(与仓库同 warehouse.duckdb 文件,复用 01 §6.6.2 的 WarehousePool),替代现行 daily_cache。schema 新增 cached_at + expires_at 列,expires_at IS NULL 表示永久。

7.4 现有 daily_cache 表迁移

W1 不动现有 daily_cache;W5 上线 QuoteCache 后跑 schema 迁移把老表数据 backfill 进来;W6 删老表。

8. 可观测性

8.1 三张表

sql
-- 每次 Composer.get_quote 调用一行
CREATE TABLE provider_audit_log (
  id              BIGINT PRIMARY KEY,
  ts              TIMESTAMP NOT NULL,        -- UTC
  code            VARCHAR,
  trade_date      VARCHAR,
  intent          VARCHAR,                   -- 'historical' | 'latest'
  market_state    VARCHAR,
  chain           VARCHAR,                   -- comma-joined provider ids attempted
  served_by       VARCHAR,                   -- final provider that succeeded, or NULL
  freshness_secs  INTEGER,                   -- as_of 距 ts 的秒数
  latency_ms      INTEGER,                   -- 整个 Composer 调用耗时
  error           VARCHAR                    -- NULL on success
);

-- 每个 provider 的实时 freshness 状态(in-memory + 周期性持久化)
CREATE TABLE provider_freshness (
  provider_id     VARCHAR PRIMARY KEY,
  last_success    TIMESTAMP,
  last_failure    TIMESTAMP,
  error_msg       VARCHAR,
  rows_today      INTEGER DEFAULT 0,
  updated_at      TIMESTAMP NOT NULL
);

-- 跨源 diff 监测(多 provider 同时返回成功时记录差异)
CREATE TABLE provider_diff_log (
  id              BIGINT PRIMARY KEY,
  ts              TIMESTAMP NOT NULL,
  code            VARCHAR,
  trade_date      VARCHAR,
  provider_a      VARCHAR,                   -- e.g. 'warehouse'
  value_a         DOUBLE,
  provider_b      VARCHAR,                   -- e.g. 'tushare-daily'
  value_b         DOUBLE,
  diff_pct        DOUBLE,                    -- (a - b) / b * 100
  severity        VARCHAR                    -- 'info' | 'warning' | 'critical'
);

8.2 三个 endpoint

GET /quality/freshness
  → [{provider_id, last_success, last_failure, error_msg, rows_today}, ...]

GET /quality/audit?since=2026-05-08T00:00&limit=100
  → 最近 N 次 Composer 调用记录

GET /quality/diff?severity=warning&since=...
  → 跨源不一致历史

8.3 跨源 diff 触发条件

  • 仅在 OPEN 期,且 latest 请求成功路由到 ≥ 2 provider 时才记录
  • diff_pct > 1% → severity=warning,写入 + 写入 stderr log
  • diff_pct > 5% → severity=critical,写入 + 写入 stderr log + 异步 webhook 告警(webhook URL 是 P1.1 范畴,本 spec 留 hook)

9. 响应 schema 升级(breaking change)

9.1 现行 /price 响应

json
{
  "value": 47.52,
  "metric": "close",
  "code": "600105.SH",
  "as_of": "2026-05-07",
  "cite": {
    "kind": "tool",
    "source": "tushare",
    "table": "daily",
    "fetched_at": "2026-05-07T07:30:00Z",
    "tool_call_id": "tc_abc123",
    "served_by": "tushare"
  }
}

9.2 新 /price 响应

json
{
  "ts_code": "600105.SH",
  "trade_date": "2026-05-07",
  "close_raw": 47.52,
  "close_qfq": 46.12,
  "close_hfq": 49.80,
  "open_raw": 45.30,
  "high_raw": 47.90,
  "low_raw": 45.10,
  "volume": 1283000,
  "amount": 5917430,
  "adj_factor": 1.234,
  "adj_factor_latest": 1.272,
  "as_of": "2026-05-07T07:30:00Z",
  "freshness_seconds": 480,
  "source": "warehouse",
  "source_chain": ["warehouse"],
  "market_state": "closed_final",
  "units": {
    "price": "CNY",
    "volume": "lots_100shares",
    "amount": "thousand_CNY",
    "share_count": "ten_thousand_shares",
    "market_cap": "ten_thousand_CNY"
  },
  "cite": {
    "kind": "tool",
    "tool_call_id": "tc_abc123",
    "fetched_at": "2026-05-07T07:30:00Z",
    "source_chain": ["warehouse"]
  }
}

9.3 Breaking change 影响面

客户端影响修复
core/skill/scripts/fetch_price.py解析的 value 字段没了改读 close_qfq(默认);提供 --raw 选项读 close_raw
Hermes profile stock-research-agent 里调 fetch_price 的脚本同上fetch_price.py 改完即修
/whoami / /healthz
/fundamentals 路由不受本 spec 影响留待后续 spec

W4 同步发布:backend 升级 + fetch_price.py 升级。不做 v1/v2 双轨

10. 迁移路线(按周)

W1 — 骨架 + trade_cal 种子

  • [ ] core/data/providers/base.py:Capability / MarketState / Quote / 三个 Report / 五种 ProviderError / DataProvider ABC
  • [ ] core/data/providers/__init__.py:ProviderRegistry
  • [ ] 种 trade_cal 表:一次性脚本 scripts/seed_trade_cal.py,从 Tushare 拉 today-30d ~ today+365d,写到 ~/twilight/data/warehouse.duckdbnormalized_trade_cal
  • [ ] core/data/providers/_market_state.py:MarketStateResolver
  • [ ] 单元测试:Quote dataclass / Capability 校验 / MarketStateResolver 边界(9:29:59 / 11:30:00 / 15:00:00 / 15:04:59 / 15:05:00 / 周末 / 春节)

W2 — Provider 包装(行为不变)

  • [ ] TushareDailyProvider:包装现有 core/data/tushare.py,加 adj_factor 调用与单位校验
  • [ ] AkshareSpotProvider:包装现有 core/data/akshare.py关键:单位校验严格化(成交额转千元,volume 转手)
  • [ ] Provider conformance 测试:每个 provider 跑同一组用例(已知 code 返回合理值 / 错误归一化 / 单位不越界)

W3 — Composer + Cache

  • [ ] core/data/composer.py:QuoteComposer + ROUTES 表
  • [ ] core/data/quote_cache.py:QuoteCache(DuckDB backend)
  • [ ] Composer 在 lifespan 注入;FastAPI app.state.composer = ...
  • [ ] 集成测试:5 起事故的 regression case 全部 pass

W4 — /price 切流(breaking)

  • [ ] src/service/routes/price.py:调 Composer,响应 schema 升级
  • [ ] core/skill/scripts/fetch_price.py:解析新 schema,默认 close_qfq
  • [ ] core/core/data/schemas.pyget_price 函数:标记 deprecated,内部转调 Composer
  • [ ] Hermes profile 里 cron 脚本 review 一遍,确保没人在直接读 value 字段

W5 — 可观测性

  • [ ] 三张表的 schema 落库
  • [ ] Composer._audit / FreshnessTracker / DiffMonitor 实现
  • [ ] /quality/freshness /quality/audit /quality/diff 三个 endpoint
  • [ ] 现有 daily_cache 数据 backfill 进 QuoteCache,老表保留只读

W6 — WarehouseProvider(依赖 01 W1 + W2)

  • [ ] 等 01 plan 的 W1 (T1 落仓 + Stage 1 两年回填) + W2 (/price 切流 + v_daily_full 视图) 完成
  • [ ] WarehouseProvider 实现 + 注册(读 WarehousePool,from §6.6.2 of 01)
  • [ ] 删老的 daily_cache 表(与 01 的老 cache 同周下线)
  • [ ] 删 core/core/data/_market.py(被 MarketStateResolver 替代)

依赖关系明示: 02 spec 的 W1-W5 完全独立于 01;只有 W6 需要 01 的仓库就位。01 W1-W2(约 1-2 周)跑完,02 W6 即可上线,整体两条线并行最快 ~6 周收尾。

11. 显式不做(Out of Scope)

  • ❌ 多租户配额(profile manager 范畴;Phase 2 §B)
  • ❌ LLM provider 抽象(Hermes 自己做的事)
  • ❌ 实时 / 分钟数据(独立付费权限)
  • ❌ 跨源自动选边(只做监测告警,不自动)
  • ❌ 替换 akshare(先包装收口;后续可选换源)
  • ❌ Webhook 告警的具体 URL / 通道(只留 hook,配置后置)
  • ❌ Composer 的 sync → async 改造(FastAPI 不强制;存量代码同步即可)
  • ❌ TDX (通达信) 跨源验证 —— 独立 spec,W7+ 启动(见 01-data-layer.md §8.6)

12. 开放问题

  1. akshare 单位的现实情况。 事故 4 显示 akshare 成交额 是元,但 akshare 的同一接口对 ETF / 港股 / 美股可能是不同单位。AkshareSpotProvider 实现时要枚举所有 code 类型还是保守只支持 A 股 6 位代码?倾向后者。
  2. adj_factor_latest 的获取。 Tushare adj_factor 接口返回某 trade_date 的因子,"最新"需要再调一次(trade_date=最新交易日)。是 provider 内部维护一份 daily-refresh 的 cache 还是每次调用都打两次?倾向 in-memory cache + 每天 9:30 失效。
  3. Cache 命中时的 freshness_seconds 计算。 Cache 里存的是 Quote,Quote 自带 as_of。返回时重算 freshness_seconds = (now - as_of).total_seconds(),还是直接返回 cache 时的快照值?倾向重算(让"现在 freshness 多老"准确)。
  4. AkshareSpotProvider 的 freshness。 akshare 实时盘口理论上 < 5s,但底层是爬新浪/腾讯财经,没有可靠 timestamp。强制 as_of=nowfreshness_seconds=0 还是设个保守的 freshness_seconds=15?倾向 15s 保守值。
  5. CLOSING 期(15:00-15:05)的体验。 该期 cache 全 miss + provider 链短(只 tushare),可能短暂 P99 飙升。可以加一个 tight loop "每 30s 主动刷一次 latest" 让 cache 暖起来,但是工程上不优雅。先放这条,看 audit log 是否真有 P99 问题。
  6. CapabilityNotSupported 的 strictness。 Provider 没声明 capability 但 Composer 还是路由过来 = 配置 bug。直接 raise 还是 log + 跳到下一个 provider?倾向 raise(配置 bug 必须暴露)。
  7. trade_cal 数据源依赖。 本 spec W1 第一步靠 Tushare 一次性拉 trade_cal —— 但 Tushare 的 trade_cal 接口本身存在 §1 提到的 "120 文档说要 2000" 的不一致风险(虽然探针证实可调)。如果哪天突然不让调,要不要内置一份硬编码的节假日表 fallback?暂不做,flag 风险。

13. 与 01-data-layer.md 的一致性检查

01 spec本 spec状态
价格单位元(DOUBLE) §6.2元(Quote.close_raw)
成交量单位手(BIGINT)§6.2手(Quote.volume)§4.2
成交额单位千元(与 Tushare daily.amount 对齐)§6.2千元(Quote.amount)
总市值单位万元(保留原单位)§6.5 / §14 Q3units 字段透传 §9.2
总股本单位万股 §6.5units 字段透传
复权策略存原始因子,业务逻辑后置 §6.3Quote.close_qfq / close_hfq property §4.2
trade_date 类型DATE §6.2trade_date: str (YYYY-MM-DD) §4.2✅ 边界转换在 Provider
ingested_at 时区UTC naive §6.5.6as_of: UTC aware §4.2✅ 边界转换 replace(tzinfo=UTC)
trade_cal 落仓§3.6 + §12 W1 种子W1 第一步种子 §10✅(依赖明确)
WarehouseProvider 表来源normalized_daily / normalized_adj_factor / normalized_daily_basic §3.1同 §4.3.3
视图 v_daily_full§6.6.1WarehouseProvider 优先读视图 §4.3.3
连接管理WarehousePool (read) + WarehouseWriter §6.6.2WarehouseProvider 用 pool 的 read
现有 daily_cache 命运切流后 W6 下线 §14 Q5W6 删 §10
回填 StageStage 1 = 2 年 §8.1W6 依赖 Stage 1 完成 §10
TDX 兜底§8.6 列入 W7+ 独立 spec§11 Out of scope(同步约定)

任何 01 后续修改 → 本 spec 同步刷新。

团队内部文档