Skip to content

Lifespan Integration — W2 ↔ W3 衔接 spec

Date: 2026-05-10 Status: Draft (forward-looking; describes the final form after merging in-flight PRs) Owner: liang Sibling docs:

01-data-layer.md §6.6.2WarehousePool + WarehouseWriter design
01-data-layer.md §7APScheduler ingest schedule
2026-05-08-tushare-integration-design.md §4DataProvider ABC + Registry
2026-05-08-tushare-integration-design.md §6QuoteComposer + ROUTES
2026-05-08-tushare-integration-design.md §7QuoteCache market-aware TTL
2026-05-08-tushare-integration-design.md §8AuditLog 3 tables + observability
2026-05-08-deployment-toolset-rollout.md §7 W2 (T2.7)"lifespan 注入 ProviderRegistry / Composer"

1. Background

The v0.2.0 baseline src/service/main.py lifespan does one thing:

python
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncIterator[None]:
    init_token_db(get_settings().bearer_db_path)
    yield

After v0.3.0 W2 + W3 wiring, lifespan needs to manage 8 components plus their dependency ordering. Two parallel work streams are converging:

  • User PR #38 (W3) introduces:

    • WarehouseWriter instantiation in lifespan
    • Scheduler (APScheduler) for ingest jobs
    • /warehouse/* routes
    • /admin/ingest/backfill/* admin endpoints
  • Cron T2.7 (W2.7) will introduce:

    • WarehousePool for read paths
    • MarketStateResolver (already exists as a class on main since #30)
    • AuditLog (PR #41) with webhook_url (PR #46)
    • QuoteComposer (#34 on main) wired with audit_log (#42) + freshness sync (#43)
    • QuoteCache (PR #39) with calendar-aware TTL (#44)
    • ProviderRegistry populated with TushareDailyProvider (#32) + AkshareSpotProvider (#33)
    • /price route refactored to use QuoteComposer (replaces direct core.data.schemas.get_price)
    • /quality/* endpoints (W5, separately)

Without coordination, both streams could:

  • Re-instantiate the same DuckDB writer differently → DuckDB locking conflicts
  • Disagree on whether AuditLog lives in app.state or as a global
  • Read settings from different env-var prefixes
  • Skip wiring AuditLog into Composer (defeating freshness sync)

This spec is the forward-looking integration document — when both streams land, lifespan should look like the design here.


2. Goals

  1. Single source of truth for lifespan startup order
  2. Pin shared resource decisions (which components share a DuckDB connection / writer)
  3. Define settings flow — env var → WarehouseSettings → component constructor
  4. Document failure modes during startup (what blocks vs what's optional)
  5. Order-of-merge guidance — which merge order produces a clean integration

3. Component dependency graph

                            ┌─────────────────┐
                            │ WarehouseSettings│
                            │ (pydantic-settings)│
                            └─────┬───────────┘
                                  │ (env / YAML)
              ┌───────────────────┼───────────────────┬─────────────────┐
              │                   │                   │                 │
              ▼                   ▼                   ▼                 ▼
        ┌──────────┐       ┌──────────────┐    ┌─────────────┐   ┌────────────┐
        │ Pool     │       │ Writer       │    │ AuditLog    │   │ Resolver   │
        │ (read)   │       │ (write+lock) │    │ + webhook   │   │ (trade_cal)│
        └────┬─────┘       └──────┬───────┘    └──┬──────────┘   └─────┬──────┘
             │                    │               │                    │
             │                    └───┬─────────┐ │                    │
             │                        │         │ │                    │
             │                        ▼         ▼ ▼                    ▼
             │                  ┌─────────┐  ┌──────────┐        ┌──────────────┐
             │                  │ Cache   │  │ Composer │←───────┤  ProviderRegistry
             │                  │ (TTL)   │  │ (ROUTES) │        │  + Tushare/Akshare
             │                  └────┬────┘  └────┬─────┘        └──────────────┘
             │                       │            │
             │                       │            │
             ▼                       ▼            ▼
       ┌─────────────┐         ┌───────────────────────┐
       │ /warehouse/*│         │  /price  /fundamentals│
       │ (read)      │         └───────────────────────┘
       └─────────────┘

                    ┌──────────────┐
                    │ Scheduler    │
                    │ (APScheduler)│   ← uses Writer + AuditLog
                    └──────┬───────┘


                    ┌────────────────┐
                    │ /admin/ingest/*│
                    └────────────────┘

Shared vs per-instance decisions

ComponentSharingRationale
WarehousePool (read)Singleton4-conn bounded pool; multiple readers serialize via queue
WarehouseWriter (write)SingletonDuckDB single-writer constraint; threading.Lock inside
AuditLogSingletonMulti-writer safe within process; DB-level isolation
MarketStateResolverSingletonIn-memory frozenset of trading days; reload on demand
QuoteCacheSingletonReuses DuckDB connections; market-aware TTL state
QuoteComposerSingletonStateless apart from injected deps
ProviderRegistryClass-level singletonAlready a classmethod registry (no instance)
SchedulerSingletonAPScheduler one per process

All singletons live on app.state.<name>. Routes acquire them via FastAPI dependency injection or direct app.state.X access.


4. Final lifespan code (target form)

python
# src/service/main.py — final form after W2.7 + W3 ingest both merge

from __future__ import annotations

from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI

from core.data.audit_log import AuditLog
from core.data.composer import QuoteComposer
from core.data.providers import ProviderRegistry
from core.data.providers._market_state import MarketStateResolver
from core.data.providers.akshare_spot import AkshareSpotProvider
from core.data.providers.tushare_daily import TushareDailyProvider
from core.data.quote_cache import QuoteCache

from service.auth import Principal, init_token_db, require_bearer
from service.config import get_settings
from service.db import WarehousePool, WarehouseWriter
from service.routes import (
    admin as admin_routes,
    fundamentals as fundamentals_routes,
    price as price_routes,
    quality as quality_routes,
    warehouse as warehouse_routes,
)
from service.scheduler import Scheduler


@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncIterator[None]:
    settings = get_settings()

    # Phase 1 — bearer token store (pre-existing v0.2.0; do NOT move).
    init_token_db(settings.bearer_db_path)

    # Phase 2 — DuckDB warehouse pool + writer (W1.3).
    app.state.warehouse_pool = WarehousePool(
        settings.warehouse_path,
        size=settings.duckdb_read_pool_size,
        memory_limit=settings.duckdb_read_memory_limit,
        threads=settings.duckdb_read_threads,
    )
    app.state.warehouse_writer = WarehouseWriter(
        settings.warehouse_path,
        memory_limit=settings.duckdb_write_memory_limit,
        threads=settings.duckdb_write_threads,
    )

    # Phase 3 — Market state from local trade_cal (W1.2).
    app.state.market_state = MarketStateResolver(
        settings.warehouse_path,
        exchange=settings.market_exchange,
    )

    # Phase 4 — Audit log (with optional webhook for critical diffs).
    app.state.audit_log = AuditLog(
        settings.warehouse_path,
        webhook_url=settings.alert_webhook_url,
    )

    # Phase 5 — Quote cache (market-aware TTL, calendar-aware via Resolver).
    app.state.quote_cache = QuoteCache(
        settings.warehouse_path,
        market_state=app.state.market_state,
    )

    # Phase 6 — Provider registry — register concrete providers.
    ProviderRegistry.register(
        TushareDailyProvider(settings.tushare_token.get_secret_value())
    )
    ProviderRegistry.register(AkshareSpotProvider())
    # WarehouseProvider lands in W6.

    # Phase 7 — Composer (orchestrator).
    app.state.composer = QuoteComposer(
        market_state=app.state.market_state,
        audit_log=app.state.audit_log,
    )

    # Phase 8 — Ingest scheduler (W3, from user PR #38).
    if settings.scheduler_enabled:
        app.state.scheduler = Scheduler(
            writer=app.state.warehouse_writer,
            audit_log=app.state.audit_log,
            market_state=app.state.market_state,
            settings=settings,
        )
        app.state.scheduler.start()

    yield

    # Teardown — reverse order.
    if hasattr(app.state, "scheduler"):
        app.state.scheduler.stop()
    app.state.warehouse_pool.close()
    app.state.warehouse_writer.close()
    ProviderRegistry.clear()


def create_app() -> FastAPI:
    app = FastAPI(title="twilight-drive", version="0.3.0", lifespan=lifespan)

    @app.get("/healthz")
    def healthz() -> dict[str, str]:
        return {"status": "ok"}

    @app.get("/whoami")
    def whoami(principal: Principal = Depends(require_bearer)) -> dict[str, str]:
        return {"user_id": principal.user_id, "plan": principal.plan}

    app.include_router(price_routes.router)
    app.include_router(fundamentals_routes.router)
    app.include_router(warehouse_routes.router)  # W3 (PR #38)
    app.include_router(admin_routes.router)      # W3 (PR #38)
    app.include_router(quality_routes.router)    # W5

    return app


app = create_app()

Phase ordering rationale

PhaseWhy this position
1 (token DB)Inherited from v0.2.0; routes' require_bearer depends on it; must be earliest
2 (Pool/Writer)All later DuckDB consumers (Resolver, AuditLog, Cache) reuse the same warehouse.duckdb file
3 (Resolver)Reads normalized_trade_cal from the warehouse (Phase 2's file)
4 (AuditLog)Writes provider_* tables to the warehouse
5 (Cache)Needs market_state from Phase 3
6 (Registry)Concrete providers wrap upstream APIs (Tushare/Akshare); no DB dep
7 (Composer)Needs registry (Phase 6) + market_state (Phase 3) + audit_log (Phase 4)
8 (Scheduler)Needs writer (Phase 2) + audit_log (Phase 4) + market_state (Phase 3)

5. Settings schema (target)

service.config.WarehouseSettings extends the v0.2.0 baseline:

python
class WarehouseSettings(BaseSettings):
    # v0.2.0 baseline (do NOT rename)
    bearer_db_path: Path
    tushare_token: SecretStr

    # W1.3 connection management (PR #26 already on main)
    warehouse_path: Path = Path("/data/warehouse.duckdb")
    duckdb_read_pool_size: int = 4
    duckdb_read_memory_limit: str = "256MB"
    duckdb_read_threads: int = 1
    duckdb_write_memory_limit: str = "512MB"
    duckdb_write_threads: int = 2

    # W1.2 market state (PR #30 already on main)
    market_exchange: str = "SSE"

    # W2 alt5 webhook (PR #46)
    alert_webhook_url: str | None = None

    # W3 scheduler (user PR #38)
    scheduler_enabled: bool = True
    backfill_auto_start: bool = False

    model_config = SettingsConfigDict(
        env_prefix="TWILIGHT_",
        env_file=".env",
    )

Env var map

Env varPurpose
TWILIGHT_BEARER_DB_PATHbearer SQLite (v0.2.0)
TWILIGHT_TUSHARE_TOKENTushare Pro token
TWILIGHT_WAREHOUSE_PATHDuckDB warehouse file
TWILIGHT_DUCKDB_READ_POOL_SIZEint default 4
TWILIGHT_DUCKDB_READ_MEMORY_LIMIT"256MB"
TWILIGHT_MARKET_EXCHANGE"SSE"
TWILIGHT_ALERT_WEBHOOK_URLoptional; empty → no webhook
TWILIGHT_SCHEDULER_ENABLED"true"/"false"
TWILIGHT_BACKFILL_AUTO_START"true"/"false"

YAML override: ${WAREHOUSE_DB_PATH}/config/runtime.yaml (matches deployment-toolset-rollout.md §5).


6. Failure modes during startup

What blocks startup vs what's tolerated:

FailurePhaseBehavior
Bearer DB unwriteable1Crash — auth is required for non-public routes
Warehouse DuckDB unwriteable2Crash — every later phase needs this file
Memory limit too high for host2DuckDB raises at connect time → crash
normalized_trade_cal missing rows3Resolver loads empty set; is_trading_day returns False everywhere; MarketState permanently CLOSED_FINAL. Composer routes still work; cache TTL fallback to "tomorrow 9:30". Continue with warning log.
Webhook URL unreachable4Just stored; not validated. First record_diff(critical) fires + swallows fail. Continue.
TUSHARE_TOKEN missing6TushareDailyProvider("") raises ValueError → crash. (Alternative: skip registration; but then Composer chains have no provider; better to fail loudly.)
akshare import fails6AkshareSpotProvider() constructor — currently no upstream call at construction (akshare is imported lazily inside get_spot_price). Continue — the provider registers; first INTRADAY_QUOTE call surfaces the import error as ProviderUnavailable.
Scheduler can't start8Log + skip if scheduler_enabled=true. Backend continues serving reads.

7. Route → component access pattern

RouteReads from
/healthznothing (lifespan-independent)
/whoamiauth.token_db (Phase 1)
/priceapp.state.composer (Phase 7)
/fundamentalsapp.state.composer (Phase 7) (or direct provider for now until W6)
/warehouse/dailyapp.state.warehouse_pool (Phase 2)
/warehouse/fundamentalsapp.state.warehouse_pool (Phase 2)
/admin/ingest/backfill/*app.state.scheduler (Phase 8)
/quality/freshnessapp.state.audit_log.freshness_snapshot() (Phase 4)
/quality/auditapp.state.audit_log.recent_audit()
/quality/diffapp.state.audit_log.recent_diffs()

Use FastAPI Depends(get_composer) style for testability:

python
def get_composer(request: Request) -> QuoteComposer:
    return request.app.state.composer

8. Order-of-merge guidance

Suggested merge sequence to minimize integration friction:

PHASE A — independent W2 alt + #38 (any order):
  Merge #41  W2 alt AuditLog
  Merge #46  W2 alt5 webhook (depends on #41 → auto-rebase to main)
  Merge #39  W2.4 QuoteCache
  Merge #44  W2 alt4 trade_cal-aware TTL (depends on #39 → auto-rebase)
  Merge #38  user W3 ingest scheduler

PHASE B — Composer chain rebases auto-clean after #41:
  Merge #42  W2 alt2 Composer ↔ AuditLog
  Merge #43  W2 alt3 freshness sync (depends on #42)
  Merge #40  W2.5+W2.6 conformance + regression (depends on #39)

PHASE C — T2.7 lifespan:
  Now main has all W2 + W3 components → cron writes T2.7 PR
  T2.7 wires the lifespan exactly as §4 describes
  No conflicts: just additive imports + lifespan body

PHASE D — W4+ ahead:
  /price route refactor to use Composer
  /fundamentals route refactor (W5)
  Hermes profile migration (skill scripts already use Service mode)

PHASE A and B order can be flipped (everything's mergeable independently once #41 lands first). PHASE C is the integration phase the spec describes.


9. Open questions

  1. Should /price route refactor land before or after T2.7?

    • Before: /price keeps the old core.data.schemas.get_price path until Composer is ready; T2.7 then changes the route to use Composer. Two PRs.
    • After: T2.7 wires lifespan AND refactors /price in one PR. Bigger diff but atomic.
    • Recommendation: after (atomic). Diff is ~150 lines including tests.
  2. Where does WarehouseProvider (W6) get instantiated? Phase 6, conditionally on settings.warehouse_provider_enabled. Add to settings when W6 lands.

  3. Scheduler vs request handler shared writer. Both use app.state.warehouse_writer. Locked internally — fine. Document this explicitly so no one adds a second writer instance.

  4. Test pattern for lifespan. TestClient triggers lifespan; need fixtures for tmp warehouse.duckdb, mocked tushare.call, etc. Spec for that is W2.7's PR responsibility.

  5. Reload of trade_cal. MarketStateResolver reloads on demand. Should the daily ingest job (W3) call app.state.market_state.reload() after updating normalized_trade_cal? Yes — add to user PR #38's daily batch end-of-job hook (or T2.7 adds this).

  6. Webhook URL validation. Should AuditLog validate the URL at construction (e.g., reject non-https://)? Currently no validation; relies on operator. Add later if mis-configurations cause issues.

  7. Graceful shutdown. Lifespan teardown calls pool.close() and writer.close(). Need to ensure no in-flight requests hold a connection from the pool. FastAPI's lifespan runs after all requests drain; this should be safe.


10. Out of scope

  • Implementation of any code (this is forward-looking docs only)
  • Changes to existing v0.2.0 routes (/price, /fundamentals) — those are T2.7's responsibility
  • W5 /quality/* HTTP endpoint internals (separate spec)
  • W6 WarehouseProvider registration logic (separate phase)
  • Multi-tenant lifespan (Phase 2 §B)

11. Acceptance criteria for T2.7 PR

When the cron writes T2.7, it should produce a src/service/main.py that:

  • [ ] Matches the §4 lifespan structure (8 phases, in order)
  • [ ] All settings from §5 are read via get_settings()
  • [ ] Failure modes per §6 are documented in code comments / log messages
  • [ ] Existing v0.2.0 tests for /whoami, /healthz, /price still pass (with /price refactored to use Composer, which gives same envelope shape via spec §9 — see open Q #1 about ordering)
  • [ ] No new env vars without TWILIGHT_ prefix
  • [ ] Routes from #38 (warehouse_routes, admin_routes) included
  • [ ] Tests cover lifespan startup/teardown happy path + at least 2 failure modes (missing token, missing trade_cal)

12. Cross-references

  • 01-data-layer.md §6.6.2 (Pool/Writer)
  • 01-data-layer.md §7 (scheduler)
  • 02-tushare-integration-design.md §4 (DataProvider ABC)
  • 02-tushare-integration-design.md §6 (Composer ROUTES)
  • 02-tushare-integration-design.md §7 (Cache TTL)
  • 02-tushare-integration-design.md §8 (AuditLog tables)
  • 02-tushare-integration-design.md §11 (webhook hook — implemented in PR #46)
  • 2026-05-08-deployment-toolset-rollout.md §5 (settings layering)
  • 2026-05-08-deployment-toolset-rollout.md §7 W2 T2.7 (the actual implementation)

In-flight PRs this spec assumes will merge:

PRTitleWhat it adds
#38W3 T1 warehouse ingest, scheduler, routesPhase 8 + warehouse routes
#39W2.4 QuoteCachePhase 5
#40W2.5+W2.6 conformance + regressiontests only
#41W2 alt AuditLogPhase 4 (basic)
#42W2 alt2 Composer ↔ AuditLogComposer accepts audit_log
#43W2 alt3 freshness syncComposer auto-syncs on each call
#44W2 alt4 trade_cal-aware TTLCache uses Resolver's next_trading_day
#45W7+ TDX cross-source specdocs only
#46W2 alt5 webhook hookPhase 4 (with webhook)

团队内部文档