Topic
Lifespan Integration: W2 ↔ W3 handoff spec
Date: 2026-05-10
Status: Draft (forward-looking; describes the final form after merging in-flight PRs)
Owner: liang
Sibling docs:
| Doc | Covers |
|---|---|
| `01-data-layer.md` §6.6.2 | WarehousePool + WarehouseWriter design |
| `01-data-layer.md` §7 | APScheduler ingest schedule |
| `2026-05-08-tushare-integration-design.md` §4 | DataProvider ABC + Registry |
| `2026-05-08-tushare-integration-design.md` §6 | QuoteComposer + ROUTES |
| `2026-05-08-tushare-integration-design.md` §7 | QuoteCache market-aware TTL |
| `2026-05-08-tushare-integration-design.md` §8 | AuditLog 3 tables + observability |
| `2026-05-08-deployment-toolset-rollout.md` §7 W2 (T2.7) | "lifespan injects ProviderRegistry / Composer" |
1. Background
The v0.2.0 baseline src/service/main.py lifespan does one thing:
```python
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
    init_token_db(get_settings().bearer_db_path)
    yield
```
After v0.3.0 W2 + W3 wiring, lifespan needs to manage 8 components plus their dependency ordering. Two parallel work streams are converging:
User PR #38 (W3) introduces:
- `WarehouseWriter` instantiation in lifespan
- `Scheduler` (APScheduler) for ingest jobs
- `/warehouse/*` routes
- `/admin/ingest/backfill/*` admin endpoints

Cron T2.7 (W2.7) will introduce:
- `WarehousePool` for read paths
- `MarketStateResolver` (already exists as a class on main since #30)
- `AuditLog` (PR #41) with `webhook_url` (PR #46)
- `QuoteComposer` (#34 on main) wired with `audit_log` (#42) + `freshness sync` (#43)
- `QuoteCache` (PR #39) with calendar-aware TTL (#44)
- `ProviderRegistry` populated with `TushareDailyProvider` (#32) + `AkshareSpotProvider` (#33)
- `/price` route refactored to use `QuoteComposer` (replaces direct `core.data.schemas.get_price`)
- `/quality/*` endpoints (W5, separately)
Without coordination, both streams could:
- Re-instantiate the same DuckDB writer differently → DuckDB locking conflicts
- Disagree on whether `AuditLog` lives in `app.state` or as a global
- Read settings from different env-var prefixes
- Skip wiring AuditLog into Composer (defeating freshness sync)
This spec is the forward-looking integration document — when both streams land, lifespan should look like the design here.
2. Goals
- Single source of truth for lifespan startup order
- Pin shared resource decisions (which components share a DuckDB connection / writer)
- Define settings flow: env var → `WarehouseSettings` → component constructor
- Document failure modes during startup (what blocks vs what's optional)
- Order-of-merge guidance — which merge order produces a clean integration
3. Component dependency graph
```
             ┌────────────────────┐
             │ WarehouseSettings  │
             │ (pydantic-settings)│
             └─────────┬──────────┘
                       │ (env / YAML)
     ┌─────────────────┼───────────────┬─────────────────────┐
     │                 │               │                     │
     ▼                 ▼               ▼                     ▼
┌──────────┐    ┌──────────────┐    ┌─────────────┐    ┌────────────┐
│ Pool     │    │ Writer       │    │ AuditLog    │    │ Resolver   │
│ (read)   │    │ (write+lock) │    │ + webhook   │    │ (trade_cal)│
└────┬─────┘    └──────┬───────┘    └──┬──────────┘    └─────┬──────┘
     │                 │               │                     │
     │                 ├────────────┐  │                     │
     │                 ▼            ▼  ▼                     ▼
     │            ┌─────────┐   ┌──────────┐        ┌───────────────────┐
     │            │ Cache   │   │ Composer │←───────┤ ProviderRegistry  │
     │            │ (TTL)   │   │ (ROUTES) │        │ + Tushare/Akshare │
     │            └────┬────┘   └────┬─────┘        └───────────────────┘
     │                 │             │
     ▼                 ▼             ▼
┌─────────────┐   ┌───────────────────────┐
│ /warehouse/*│   │ /price  /fundamentals │
│ (read)      │   └───────────────────────┘
└─────────────┘

 ┌──────────────┐
 │ Scheduler    │
 │ (APScheduler)│ ← uses Writer + AuditLog
 └──────┬───────┘
        │
        ▼
┌────────────────┐
│ /admin/ingest/*│
└────────────────┘
```
Shared vs per-instance decisions
| Component | Sharing | Rationale |
|---|---|---|
| `WarehousePool` (read) | Singleton | 4-conn bounded pool; multiple readers serialize via queue |
| `WarehouseWriter` (write) | Singleton | DuckDB single-writer constraint; threading.Lock inside |
| `AuditLog` | Singleton | Multi-writer safe within process; DB-level isolation |
| `MarketStateResolver` | Singleton | In-memory frozenset of trading days; reload on demand |
| `QuoteCache` | Singleton | Reuses DuckDB connections; market-aware TTL state |
| `QuoteComposer` | Singleton | Stateless apart from injected deps |
| `ProviderRegistry` | Class-level singleton | Already a classmethod registry (no instance) |
| `Scheduler` | Singleton | APScheduler one per process |
All singletons live on app.state.<name>. Routes acquire them via FastAPI dependency injection or direct app.state.X access.
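To make the Pool and Writer rows concrete, here is a minimal sketch of a bounded read pool and a lock-guarded single writer. It is not the actual `WarehousePool` / `WarehouseWriter` code: `ReadPool` and `SingleWriter` are hypothetical names, and the standard-library `sqlite3` module stands in for DuckDB so the example runs without extra dependencies.

```python
import queue
import sqlite3
import threading
from contextlib import contextmanager


class ReadPool:
    """Bounded pool: at most `size` connections; extra readers wait on a queue."""

    def __init__(self, path, size=4):
        self._conns = queue.Queue(maxsize=size)
        for _ in range(size):
            # check_same_thread=False lets a pooled connection move between threads.
            self._conns.put(sqlite3.connect(path, check_same_thread=False))

    @contextmanager
    def acquire(self, timeout=5.0):
        # Blocks while every connection is in use; raises queue.Empty on timeout.
        conn = self._conns.get(timeout=timeout)
        try:
            yield conn
        finally:
            self._conns.put(conn)

    def close(self):
        while not self._conns.empty():
            self._conns.get_nowait().close()


class SingleWriter:
    """One write connection per process; a lock serializes all writes."""

    def __init__(self, path):
        self._conn = sqlite3.connect(path, check_same_thread=False)
        self._lock = threading.Lock()

    def execute(self, sql, params=()):
        with self._lock:
            self._conn.execute(sql, params)
            self._conn.commit()

    def close(self):
        self._conn.close()
```

Because both objects are created once and shared, the scheduler and request handlers would all go through the same lock, which is the property the table relies on.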
4. Final lifespan code (target form)
```python
# src/service/main.py: final form after W2.7 + W3 ingest both merge
from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI

from core.data.audit_log import AuditLog
from core.data.composer import QuoteComposer
from core.data.providers import ProviderRegistry
from core.data.providers._market_state import MarketStateResolver
from core.data.providers.akshare_spot import AkshareSpotProvider
from core.data.providers.tushare_daily import TushareDailyProvider
from core.data.quote_cache import QuoteCache
from service.auth import Principal, init_token_db, require_bearer
from service.config import get_settings
from service.db import WarehousePool, WarehouseWriter
from service.routes import (
    admin as admin_routes,
    fundamentals as fundamentals_routes,
    price as price_routes,
    quality as quality_routes,
    warehouse as warehouse_routes,
)
from service.scheduler import Scheduler


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    settings = get_settings()

    # Phase 1: bearer token store (pre-existing v0.2.0; do NOT move).
    init_token_db(settings.bearer_db_path)

    # Phase 2: DuckDB warehouse pool + writer (W1.3).
    app.state.warehouse_pool = WarehousePool(
        settings.warehouse_path,
        size=settings.duckdb_read_pool_size,
        memory_limit=settings.duckdb_read_memory_limit,
        threads=settings.duckdb_read_threads,
    )
    app.state.warehouse_writer = WarehouseWriter(
        settings.warehouse_path,
        memory_limit=settings.duckdb_write_memory_limit,
        threads=settings.duckdb_write_threads,
    )

    # Phase 3: market state from local trade_cal (W1.2).
    app.state.market_state = MarketStateResolver(
        settings.warehouse_path,
        exchange=settings.market_exchange,
    )

    # Phase 4: audit log (with optional webhook for critical diffs).
    app.state.audit_log = AuditLog(
        settings.warehouse_path,
        webhook_url=settings.alert_webhook_url,
    )

    # Phase 5: quote cache (market-aware TTL, calendar-aware via Resolver).
    app.state.quote_cache = QuoteCache(
        settings.warehouse_path,
        market_state=app.state.market_state,
    )

    # Phase 6: provider registry; register concrete providers.
    ProviderRegistry.register(
        TushareDailyProvider(settings.tushare_token.get_secret_value())
    )
    ProviderRegistry.register(AkshareSpotProvider())
    # WarehouseProvider lands in W6.

    # Phase 7: Composer (orchestrator).
    app.state.composer = QuoteComposer(
        market_state=app.state.market_state,
        audit_log=app.state.audit_log,
    )

    # Phase 8: ingest scheduler (W3, from user PR #38).
    if settings.scheduler_enabled:
        app.state.scheduler = Scheduler(
            writer=app.state.warehouse_writer,
            audit_log=app.state.audit_log,
            market_state=app.state.market_state,
            settings=settings,
        )
        app.state.scheduler.start()

    yield

    # Teardown: reverse order.
    if hasattr(app.state, "scheduler"):
        app.state.scheduler.stop()
    app.state.warehouse_pool.close()
    app.state.warehouse_writer.close()
    ProviderRegistry.clear()


def create_app() -> FastAPI:
    app = FastAPI(title="twilight-drive", version="0.3.0", lifespan=lifespan)

    @app.get("/healthz")
    def healthz() -> dict[str, str]:
        return {"status": "ok"}

    @app.get("/whoami")
    def whoami(principal: Principal = Depends(require_bearer)) -> dict[str, str]:
        return {"user_id": principal.user_id, "plan": principal.plan}

    app.include_router(price_routes.router)
    app.include_router(fundamentals_routes.router)
    app.include_router(warehouse_routes.router)  # W3 (PR #38)
    app.include_router(admin_routes.router)  # W3 (PR #38)
    app.include_router(quality_routes.router)  # W5
    return app


app = create_app()
```
Phase ordering rationale
| Phase | Why this position |
|---|---|
| 1 (token DB) | Inherited from v0.2.0; routes' require_bearer depends on it; must be earliest |
| 2 (Pool/Writer) | All later DuckDB consumers (Resolver, AuditLog, Cache) reuse the same warehouse.duckdb file |
| 3 (Resolver) | Reads normalized_trade_cal from the warehouse (Phase 2's file) |
| 4 (AuditLog) | Writes provider_* tables to the warehouse |
| 5 (Cache) | Needs market_state from Phase 3 |
| 6 (Registry) | Concrete providers wrap upstream APIs (Tushare/Akshare); no DB dep |
| 7 (Composer) | Needs registry (Phase 6) + market_state (Phase 3) + audit_log (Phase 4) |
| 8 (Scheduler) | Needs writer (Phase 2) + audit_log (Phase 4) + market_state (Phase 3) |
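
The ordering in the table above can be checked mechanically. The sketch below encodes each phase's stated dependencies and verifies that the lifespan order never starts a phase before something it needs. The short phase labels are invented for this example, and "must be earliest" for the token DB is a placement rule rather than a dependency edge, so it is not modeled.

```python
from graphlib import TopologicalSorter

# Each phase maps to the phases it depends on, per the rationale table.
PHASE_DEPS = {
    "token_db": set(),
    "pool_writer": set(),
    "resolver": {"pool_writer"},
    "audit_log": {"pool_writer"},
    "cache": {"resolver"},
    "registry": set(),
    "composer": {"registry", "resolver", "audit_log"},
    "scheduler": {"pool_writer", "audit_log", "resolver"},
}

# The order used in the lifespan (phases 1 to 8).
LIFESPAN_ORDER = [
    "token_db", "pool_writer", "resolver", "audit_log",
    "cache", "registry", "composer", "scheduler",
]


def violations(order, deps):
    """Return (phase, dependency) pairs where the dependency starts too late."""
    position = {name: i for i, name in enumerate(order)}
    return [
        (phase, dep)
        for phase, needed in deps.items()
        for dep in needed
        if position[dep] > position[phase]
    ]


# static_order() raises graphlib.CycleError if the dependencies are circular.
list(TopologicalSorter(PHASE_DEPS).static_order())
print(violations(LIFESPAN_ORDER, PHASE_DEPS))  # prints []
```

A check like this could run as a unit test so that a later reordering of phases fails fast instead of surfacing as a startup error.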
5. Settings schema (target)
service.config.WarehouseSettings extends the v0.2.0 baseline:
```python
from pathlib import Path

from pydantic import SecretStr
from pydantic_settings import BaseSettings, SettingsConfigDict


class WarehouseSettings(BaseSettings):
    # v0.2.0 baseline (do NOT rename)
    bearer_db_path: Path
    tushare_token: SecretStr

    # W1.3 connection management (PR #26 already on main)
    warehouse_path: Path = Path("/data/warehouse.duckdb")
    duckdb_read_pool_size: int = 4
    duckdb_read_memory_limit: str = "256MB"
    duckdb_read_threads: int = 1
    duckdb_write_memory_limit: str = "512MB"
    duckdb_write_threads: int = 2

    # W1.2 market state (PR #30 already on main)
    market_exchange: str = "SSE"

    # W2 alt5 webhook (PR #46)
    alert_webhook_url: str | None = None

    # W3 scheduler (user PR #38)
    scheduler_enabled: bool = True
    backfill_auto_start: bool = False

    model_config = SettingsConfigDict(
        env_prefix="TWILIGHT_",
        env_file=".env",
    )
```
Env var map
| Env var | Purpose |
|---|---|
| `TWILIGHT_BEARER_DB_PATH` | bearer SQLite (v0.2.0) |
| `TWILIGHT_TUSHARE_TOKEN` | Tushare Pro token |
| `TWILIGHT_WAREHOUSE_PATH` | DuckDB warehouse file |
| `TWILIGHT_DUCKDB_READ_POOL_SIZE` | int default 4 |
| `TWILIGHT_DUCKDB_READ_MEMORY_LIMIT` | "256MB" |
| `TWILIGHT_MARKET_EXCHANGE` | "SSE" |
| `TWILIGHT_ALERT_WEBHOOK_URL` | optional; empty → no webhook |
| `TWILIGHT_SCHEDULER_ENABLED` | "true"/"false" |
| `TWILIGHT_BACKFILL_AUTO_START` | "true"/"false" |
YAML override: ${WAREHOUSE_DB_PATH}/config/runtime.yaml (matches deployment-toolset-rollout.md §5).
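The env var → settings → constructor flow can be illustrated without pydantic. The following is a simplified stand-in, assuming only the `TWILIGHT_` prefix and the defaults from the table above; `DemoSettings` and `load_settings` are hypothetical names, and the real `WarehouseSettings` would get this behavior from pydantic-settings rather than hand-written parsing.

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

PREFIX = "TWILIGHT_"


def _parse_bool(raw):
    value = raw.strip().lower()
    if value in {"true", "1", "yes"}:
        return True
    if value in {"false", "0", "no"}:
        return False
    raise ValueError(f"not a boolean: {raw!r}")


@dataclass(frozen=True)
class DemoSettings:
    warehouse_path: Path
    duckdb_read_pool_size: int
    alert_webhook_url: Optional[str]
    scheduler_enabled: bool


def load_settings(env):
    """Build settings from a mapping such as os.environ."""

    def get(name, default):
        # Field name -> prefixed, upper-cased env var name.
        return env.get(PREFIX + name.upper(), default)

    webhook = get("alert_webhook_url", "")
    return DemoSettings(
        warehouse_path=Path(get("warehouse_path", "/data/warehouse.duckdb")),
        duckdb_read_pool_size=int(get("duckdb_read_pool_size", "4")),
        alert_webhook_url=webhook or None,  # empty string means "no webhook"
        scheduler_enabled=_parse_bool(get("scheduler_enabled", "true")),
    )


settings = load_settings({"TWILIGHT_SCHEDULER_ENABLED": "false"})
print(settings.scheduler_enabled, settings.duckdb_read_pool_size)  # prints False 4
```

Passing the environment in as a mapping (instead of reading `os.environ` directly) keeps the loader easy to exercise in tests.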
6. Failure modes during startup
What blocks startup vs what's tolerated:
| Failure | Phase | Behavior |
|---|---|---|
| Bearer DB unwriteable | 1 | Crash — auth is required for non-public routes |
| Warehouse DuckDB unwriteable | 2 | Crash — every later phase needs this file |
| Memory limit too high for host | 2 | DuckDB raises at connect time → crash |
| `normalized_trade_cal` missing rows | 3 | Resolver loads empty set; `is_trading_day` returns False everywhere; MarketState permanently CLOSED_FINAL. Composer routes still work; cache TTL fallback to "tomorrow 9:30". Continue with warning log. |
| Webhook URL unreachable | 4 | Just stored; not validated. First record_diff(critical) fires + swallows fail. Continue. |
| `TUSHARE_TOKEN` missing | 6 | `TushareDailyProvider("")` raises ValueError → crash. (Alternative: skip registration; but then Composer chains have no provider; better to fail loudly.) |
| akshare import fails | 6 | AkshareSpotProvider() constructor — currently no upstream call at construction (akshare is imported lazily inside get_spot_price). Continue — the provider registers; first INTRADAY_QUOTE call surfaces the import error as ProviderUnavailable. |
| Scheduler can't start | 8 | Log + skip if scheduler_enabled=true. Backend continues serving reads. |
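
The crash-versus-continue split in the table above could be expressed as a small phase runner. This is an illustrative sketch, not the planned implementation: `run_phases` and the phase names in the usage lines are hypothetical.

```python
import logging

log = logging.getLogger("lifespan")


def run_phases(phases):
    """Run (name, start, required) phases in order.

    A failing required phase re-raises and aborts startup; a failing optional
    phase is logged and skipped. Returns the names of phases that started.
    """
    started = []
    for name, start, required in phases:
        try:
            start()
        except Exception:
            if required:
                log.critical("phase %s failed; aborting startup", name)
                raise
            log.warning("phase %s failed; continuing without it", name, exc_info=True)
            continue
        started.append(name)
    return started


def start_ok():
    pass


def start_broken():
    raise RuntimeError("scheduler cannot start")


# The warehouse is required; the scheduler is optional, so startup continues.
print(run_phases([("warehouse", start_ok, True), ("scheduler", start_broken, False)]))
# prints ['warehouse']
```

Keeping the required/optional flag next to each phase puts the policy in one place, which makes it easier to compare against the table during review.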
7. Route → component access pattern
| Route | Reads from |
|---|---|
| `/healthz` | nothing (lifespan-independent) |
| `/whoami` | auth.token_db (Phase 1) |
| `/price` | app.state.composer (Phase 7) |
| `/fundamentals` | app.state.composer (Phase 7) (or direct provider for now until W6) |
| `/warehouse/daily` | app.state.warehouse_pool (Phase 2) |
| `/warehouse/fundamentals` | app.state.warehouse_pool (Phase 2) |
| `/admin/ingest/backfill/*` | app.state.scheduler (Phase 8) |
| `/quality/freshness` | app.state.audit_log.freshness_snapshot() (Phase 4) |
| `/quality/audit` | app.state.audit_log.recent_audit() |
| `/quality/diff` | app.state.audit_log.recent_diffs() |
Use FastAPI Depends(get_composer) style for testability:
```python
from fastapi import Request

from core.data.composer import QuoteComposer


def get_composer(request: Request) -> QuoteComposer:
    return request.app.state.composer
```
8. Order-of-merge guidance
Suggested merge sequence to minimize integration friction:
```
PHASE A: independent W2 alt + #38 (any order):
  Merge #41  W2 alt AuditLog
  Merge #46  W2 alt5 webhook (depends on #41 → auto-rebase to main)
  Merge #39  W2.4 QuoteCache
  Merge #44  W2 alt4 trade_cal-aware TTL (depends on #39 → auto-rebase)
  Merge #38  user W3 ingest scheduler

PHASE B: Composer chain rebases auto-clean after #41:
  Merge #42  W2 alt2 Composer ↔ AuditLog
  Merge #43  W2 alt3 freshness sync (depends on #42)
  Merge #40  W2.5+W2.6 conformance + regression (depends on #39)

PHASE C: T2.7 lifespan:
  Now main has all W2 + W3 components → cron writes T2.7 PR
  T2.7 wires the lifespan exactly as §4 describes
  No conflicts: just additive imports + lifespan body

PHASE D: W4+ ahead:
  /price route refactor to use Composer
  /fundamentals route refactor (W5)
  Hermes profile migration (skill scripts already use Service mode)
```
The order of PHASE A and PHASE B can be flipped (everything is mergeable independently once #41 lands first). PHASE C is the integration phase the spec describes.
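The "depends on" notes in the merge sequence can be turned into a quick sanity check for any proposed order. This is a sketch under stated assumptions: the edge from #42 to #41 is inferred from "rebases auto-clean after #41" rather than stated as a hard dependency, and `check_merge_order` is a hypothetical helper.

```python
# "X depends on Y" edges taken from the merge sequence (the #42 -> #41 edge is inferred).
DEPENDS_ON = {
    46: {41},
    44: {39},
    42: {41},
    43: {42},
    40: {39},
}


def check_merge_order(order):
    """Return one message per PR merged before something it depends on."""
    seen = set()
    problems = []
    for pr in order:
        for dep in sorted(DEPENDS_ON.get(pr, set()) - seen):
            problems.append(f"#{pr} merged before #{dep}")
        seen.add(pr)
    return problems


suggested = [41, 46, 39, 44, 38, 42, 43, 40]
print(check_merge_order(suggested))  # prints []
print(check_merge_order([42, 41]))   # prints ['#42 merged before #41']
```

Any variant of the sequence that keeps each PR after its dependencies passes the same check, which matches the note that the two phases can be reordered.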
9. Open questions
1. Should the `/price` route refactor land before or after T2.7?
   - Before: `/price` keeps the old `core.data.schemas.get_price` path until Composer is ready; T2.7 then changes the route to use Composer. Two PRs.
   - After: T2.7 wires lifespan AND refactors `/price` in one PR. Bigger diff but atomic.
   - Recommendation: after (atomic). Diff is ~150 lines including tests.
2. Where does `WarehouseProvider` (W6) get instantiated? Phase 6, conditionally on `settings.warehouse_provider_enabled`. Add to settings when W6 lands.
3. Scheduler vs request handler shared writer. Both use `app.state.warehouse_writer`. It is locked internally, so this is fine. Document this explicitly so no one adds a second writer instance.
4. Test pattern for lifespan. TestClient triggers lifespan; need fixtures for tmp `warehouse.duckdb`, mocked `tushare.call`, etc. Spec for that is W2.7's PR responsibility.
5. Reload of trade_cal. `MarketStateResolver` reloads on demand. Should the daily ingest job (W3) call `app.state.market_state.reload()` after updating `normalized_trade_cal`? Yes: add to user PR #38's daily batch end-of-job hook (or T2.7 adds this).
6. Webhook URL validation. Should AuditLog validate the URL at construction (e.g., reject non-https://)? Currently no validation; relies on operator. Add later if mis-configurations cause issues.
7. Graceful shutdown. Lifespan teardown calls `pool.close()` and `writer.close()`. Need to ensure no in-flight requests hold a connection from the pool. FastAPI's lifespan runs after all requests drain; this should be safe.
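For the trade_cal reload question, the reload-on-demand behavior can be sketched as follows. `TradingCalendar` is a hypothetical stand-in for `MarketStateResolver`, the loader callable replaces the real query against `normalized_trade_cal`, and the dates are sample values only.

```python
from datetime import date


class TradingCalendar:
    """Holds trading days in a frozenset; reload() swaps in a fresh set."""

    def __init__(self, loader):
        self._loader = loader  # callable returning an iterable of dates
        self._days = frozenset()
        self.reload()

    def reload(self):
        # Build the new set first, then rebind in a single assignment so
        # readers never observe a half-loaded calendar.
        self._days = frozenset(self._loader())
        return len(self._days)

    def is_trading_day(self, day):
        return day in self._days


rows = [date(2026, 5, 8)]                 # stand-in for the calendar table
calendar = TradingCalendar(lambda: list(rows))

rows.append(date(2026, 5, 11))            # the ingest job adds a new day
print(calendar.is_trading_day(date(2026, 5, 11)))  # prints False (stale set)
calendar.reload()                         # end-of-job hook
print(calendar.is_trading_day(date(2026, 5, 11)))  # prints True
```

The stale read before `reload()` is the reason an end-of-job hook is worth adding: without it, the in-memory set only changes on restart or on an explicit reload.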
10. Out of scope
- Implementation of any code (this is forward-looking docs only)
- Changes to existing v0.2.0 routes (`/price`, `/fundamentals`); those are T2.7's responsibility
- W5 `/quality/*` HTTP endpoint internals (separate spec)
- W6 `WarehouseProvider` registration logic (separate phase)
- Multi-tenant lifespan (Phase 2 §B)
11. Acceptance criteria for T2.7 PR
When the cron writes T2.7, it should produce a src/service/main.py that:
- [ ] Matches the §4 lifespan structure (8 phases, in order)
- [ ] All settings from §5 are read via `get_settings()`
- [ ] Failure modes per §6 are documented in code comments / log messages
- [ ] Existing v0.2.0 tests for `/whoami`, `/healthz`, `/price` still pass (with `/price` refactored to use Composer, which gives same envelope shape via spec §9; see open Q #1 about ordering)
- [ ] No new env vars without `TWILIGHT_` prefix
- [ ] Routes from #38 (`warehouse_routes`, `admin_routes`) included
- [ ] Tests cover lifespan startup/teardown happy path + at least 2 failure modes (missing token, missing trade_cal)
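As a starting point for the startup/teardown tests, the lifespan shape can be exercised without FastAPI at all. The sketch below uses a `SimpleNamespace` in place of the app and a hypothetical `FakeResource` in place of the real Pool, Writer, and Scheduler; it also wraps teardown in `try/finally`, which the §4 listing does not do.

```python
import asyncio
from contextlib import asynccontextmanager
from types import SimpleNamespace


class FakeResource:
    """Stand-in for Pool / Writer / Scheduler; records lifecycle events."""

    def __init__(self, name, events):
        self.name = name
        self.events = events
        events.append(f"start {name}")

    def close(self):
        self.events.append(f"stop {self.name}")


@asynccontextmanager
async def demo_lifespan(app, events, scheduler_enabled=True):
    app.state.pool = FakeResource("pool", events)
    app.state.writer = FakeResource("writer", events)
    if scheduler_enabled:
        app.state.scheduler = FakeResource("scheduler", events)
    try:
        yield
    finally:
        # Same teardown order as the target lifespan: scheduler, pool, writer.
        if hasattr(app.state, "scheduler"):
            app.state.scheduler.close()
        app.state.pool.close()
        app.state.writer.close()


async def run_once(scheduler_enabled):
    events = []
    app = SimpleNamespace(state=SimpleNamespace())
    async with demo_lifespan(app, events, scheduler_enabled):
        events.append("serving")
    return events


print(asyncio.run(run_once(scheduler_enabled=False)))
# prints ['start pool', 'start writer', 'serving', 'stop pool', 'stop writer']
```

A real test would swap the fakes for the actual components pointed at a temporary warehouse file, but the recorded event list gives a cheap way to assert ordering.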
12. Cross-references
- `01-data-layer.md` §6.6.2 (Pool/Writer)
- `01-data-layer.md` §7 (scheduler)
- `02-tushare-integration-design.md` §4 (DataProvider ABC)
- `02-tushare-integration-design.md` §6 (Composer ROUTES)
- `02-tushare-integration-design.md` §7 (Cache TTL)
- `02-tushare-integration-design.md` §8 (AuditLog tables)
- `02-tushare-integration-design.md` §11 (webhook hook, implemented in PR #46)
- `2026-05-08-deployment-toolset-rollout.md` §5 (settings layering)
- `2026-05-08-deployment-toolset-rollout.md` §7 W2 T2.7 (the actual implementation)
In-flight PRs this spec assumes will merge:
| PR | Title | What it adds |
|---|---|---|
| #38 | W3 T1 warehouse ingest, scheduler, routes | Phase 8 + warehouse routes |
| #39 | W2.4 QuoteCache | Phase 5 |
| #40 | W2.5+W2.6 conformance + regression | tests only |
| #41 | W2 alt AuditLog | Phase 4 (basic) |
| #42 | W2 alt2 Composer ↔ AuditLog | Composer accepts audit_log |
| #43 | W2 alt3 freshness sync | Composer auto-syncs on each call |
| #44 | W2 alt4 trade_cal-aware TTL | Cache uses Resolver's next_trading_day |
| #45 | W7+ TDX cross-source spec | docs only |
| #46 | W2 alt5 webhook hook | Phase 4 (with webhook) |