
Phase A+B: Data Pipeline Expansion Design

Status: Draft
Date: 2026-05-14
Author: liang


Overview

Phase A: Deploy existing T2a/T2b code and fix operational issues (logging, DuckDB read access).

Phase B: Add T3 financial statement tables (6 tables) and an 8-hour refresh cycle.


Phase A: T2a/T2b Deployment + Operational Fixes

A1: Merge PR #64 (T2a/T2b code)

Current state: Code on branch feat/v0.4.0-t2a-t2b-warehouse, commit 3cb53ef, not merged to main.

Contents:

  • 6 new normalized tables: normalized_weekly, normalized_monthly, normalized_index_daily, normalized_index_dailybasic, normalized_index_basic, normalized_index_weight
  • 6 corresponding raw tables
  • Ingest modules: weekly.py, monthly.py, index_daily.py (4 classes)
  • Scheduler: 4 new jobs (weekly_refresh Mon 03:00, monthly_refresh_price 1st 02:30, index_daily daily 19:00, index_monthly 1st 03:00)
  • MCP proxy expanded: 5→11 warehouse-backed methods
  • Scripts: backfill_all.py, incremental_update.py, check_ingest.sh
  • Retry: 3-attempt exponential backoff (10s→20s→40s) in base.py
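
The retry behaviour listed above can be sketched as follows. This is a minimal illustration, not the code in base.py: the function name `call_with_backoff`, the injectable `sleep` parameter, and the reading of "3-attempt" as three retries after the first call (sleeping 10 s, 20 s, then 40 s) are all assumptions.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_backoff(
    fn: Callable[[], T],
    delays: tuple[float, ...] = (10.0, 20.0, 40.0),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn; after each failure wait the next delay, then try again."""
    for delay in delays:
        try:
            return fn()
        except Exception:
            # Assumption: the real module likely catches a narrower error type.
            sleep(delay)
    # Last attempt: any exception now propagates to the caller.
    return fn()
```

Passing `sleep` as a parameter keeps the helper testable without real waiting.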

Action: Merge to main via PR.

A2: Fix Python Logging to Docker stdout

Problem: logger.info() calls in scheduler modules don't appear in docker logs. Only uvicorn access logs are visible.

Root cause: src/service/main.py configures logger objects but never calls logging.basicConfig() to route to stdout.

Fix: In lifespan startup, after creating logger instances, call:

python
import logging
import sys

# Attach a stdout handler to the root logger once so module loggers reach docker logs.
root = logging.getLogger()
if not root.hasHandlers():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
    root.addHandler(handler)
    root.setLevel(logging.INFO)

A3: Build, Push, Deploy Image

  1. Merge PR → main
  2. docker build -f deploy/Dockerfile . -t ghcr.io/lacatfly/twilight-drive:v0.4.0-t2
  3. Push to GHCR (using keychain entry ghcr.io-twilight-drive)
  4. SSH to ECS: docker pull, then docker compose up -d (docker compose restart reuses the existing containers and would not pick up the new image)

A4: Run T2a/T2b Backfill

bash
docker exec twilight-backend python scripts/data/backfill_all.py --tiers t2a
  • weekly: ~22 min (all stocks, full history)
  • monthly: ~22 min
  • index_daily + index_dailybasic + index_basic + index_weight: ~1 min

The backfill script physically re-sorts the tables after completion; no manual step is needed.

A5: MCP Direct DuckDB Read

Current state: MCP proxy (mcp/tushare_mcp/proxy.py) already has DuckDB-first logic with _WAREHOUSE_METHODS set containing 11 entries. Volume mount ../../data:/data already configured.

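What to verify: whether a read_only=True connection from the MCP process can coexist with the backend's read-write connection. DuckDB's documented concurrency model allows either a single read-write process or multiple read-only processes on the same database file, not both at once, so the read-only open can fail with a lock error while the backend holds the file open for writing.

No code changes are expected if the check passes. After deploy, verify that MCP can read all 11 tables while the backend is running; a lock error means the DuckDB-first path needs a design change.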
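
The post-deploy check can be scripted. The sketch below assumes the `duckdb` Python package is installed in the MCP container; the helper names and the idea of passing the connect function as a parameter are illustrative. Only the six tables named in A1 are listed, because the five tables served before PR #64 are not named in this document.

```python
from typing import Callable, Sequence

# Table names from A1. Append the five pre-existing warehouse tables,
# which this document does not list.
T2_TABLES = [
    "normalized_weekly",
    "normalized_monthly",
    "normalized_index_daily",
    "normalized_index_dailybasic",
    "normalized_index_basic",
    "normalized_index_weight",
]


def default_connect(db_path: str):
    """Open the warehouse read-only; duckdb is imported lazily."""
    import duckdb

    return duckdb.connect(database=db_path, read_only=True)


def check_tables(
    db_path: str,
    tables: Sequence[str] = T2_TABLES,
    connect: Callable = default_connect,
) -> dict[str, int]:
    """Return {table: row_count}. Raises if the file cannot be opened
    (for example on a lock conflict) or if a table is missing."""
    con = connect(db_path)
    try:
        return {
            t: con.execute(f"SELECT count(*) FROM {t}").fetchone()[0]
            for t in tables
        }
    finally:
        con.close()
```

Run it from the MCP container while the backend is up, passing the warehouse file path under the /data mount; an exception at open time is the lock conflict this step is meant to detect.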


Phase B: T3 Financial Statement Tables

B1: Schema — 6 New Tables

The six statement tables and the disclosure-date trigger table below all follow the existing pattern: primary-key upsert plus source, ingested_at, and raw_id metadata columns.
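
The upsert pattern can be illustrated as follows. This is a minimal sketch that uses Python's built-in sqlite3 as a stand-in for DuckDB (both accept INSERT ... ON CONFLICT ... DO UPDATE). The column types, the single `revenue` field, and the sample values are assumptions for illustration, not the project's actual DDL.

```python
import sqlite3

DDL = """
CREATE TABLE normalized_income (
    ts_code     TEXT NOT NULL,
    end_date    TEXT NOT NULL,
    report_type TEXT NOT NULL,
    revenue     REAL,      -- one representative field out of ~30
    source      TEXT,      -- metadata columns shared by all tables
    ingested_at TEXT,
    raw_id      INTEGER,
    PRIMARY KEY (ts_code, end_date, report_type)
)
"""

UPSERT = """
INSERT INTO normalized_income
    (ts_code, end_date, report_type, revenue, source, ingested_at, raw_id)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT (ts_code, end_date, report_type) DO UPDATE SET
    revenue     = excluded.revenue,
    source      = excluded.source,
    ingested_at = excluded.ingested_at,
    raw_id      = excluded.raw_id
"""


def demo() -> list[tuple]:
    """Insert the same primary key twice; the second row overwrites the first."""
    con = sqlite3.connect(":memory:")
    con.execute(DDL)
    con.execute(UPSERT, ("600000.SH", "20231231", "1", 100.0,
                         "tushare", "2026-05-14T02:00:00", 1))
    con.execute(UPSERT, ("600000.SH", "20231231", "1", 120.0,
                         "tushare", "2026-05-14T10:00:00", 2))
    return con.execute(
        "SELECT revenue, raw_id FROM normalized_income"
    ).fetchall()
```

Re-ingesting a restated report therefore replaces the earlier row instead of creating a duplicate, which is what makes the 8-hour refresh safe to repeat.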

normalized_income

  • PK: ts_code + end_date + report_type
  • Fields: ~30 fields from Tushare income API (revenue, cost, profit, tax, etc.)
  • Report types: 1=consolidated statement (Tushare default), 2=single-quarter consolidated, 4=adjusted consolidated

normalized_balancesheet

  • PK: ts_code + end_date + report_type
  • Fields: ~60 fields (assets, liabilities, equity breakdowns)

normalized_cashflow

  • PK: ts_code + end_date + report_type
  • Fields: ~30 fields (operating/investing/financing cash flows)

normalized_fina_indicator

  • PK: ts_code + end_date
  • Fields: ~50 fields (profitability, solvency, efficiency ratios)

normalized_forecast

  • PK: ts_code + end_date + ann_date
  • Fields: performance forecasts with prediction ranges

normalized_express

  • PK: ts_code + end_date
  • Fields: performance express reports

normalized_disclosure_date (scheduler trigger table)

  • PK: ts_code + end_date
  • Fields: appointment_date, actual_date (for scheduling T3 ingestion)

The corresponding raw tables (raw_income, raw_balancesheet, raw_disclosure_date, etc.) are added to 01_raw_tables.sql.

B2: Ingest Modules

New files in src/service/ingest/:

  • financial.py — 6 classes: IncomeIngest, BalanceSheetIngest, CashflowIngest, FinaIndicatorIngest, ForecastIngest, ExpressIngest

Fetch pattern: Each class takes (ts_code, period) parameters, calls the corresponding Tushare API, and upserts the result into its normalized table.

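Rate limiting: time.sleep(0.1) between calls, targeting the 500 calls/min limit. A 0.1 s sleep alone caps throughput at 600 calls/min, so staying under 500/min depends on per-call latency; a minimum interval of 0.12 s (60 s / 500) would enforce the limit regardless.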
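
A limiter that enforces a minimum spacing between calls, rather than a fixed sleep, could look like the sketch below. The class name `MinIntervalLimiter` and the injectable `clock` and `sleep` parameters are assumptions for illustration and are not part of the existing ingest code.

```python
import time
from typing import Callable, Optional


class MinIntervalLimiter:
    """Block so that successive calls are at least `interval` seconds apart."""

    def __init__(
        self,
        max_per_minute: int = 500,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], None] = time.sleep,
    ) -> None:
        self.interval = 60.0 / max_per_minute  # 0.12 s for 500 calls/min
        self._clock = clock
        self._sleep = sleep
        self._last: Optional[float] = None

    def wait(self) -> None:
        """Sleep only for the part of the interval not already elapsed."""
        now = self._clock()
        if self._last is not None:
            remaining = self.interval - (now - self._last)
            if remaining > 0:
                self._sleep(remaining)
                now = self._clock()
        self._last = now
```

Calling `limiter.wait()` before each Tushare request accounts for the time the previous request already took, so slow calls are not penalised with an extra sleep.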

B3: Scheduler — 8-Hour Refresh Cycle

Job: t3_financial_refresh — runs every 8 hours (02:00, 10:00, 18:00)

Logic:

  1. Call disclosure_date API to get stocks with new disclosures in last 8 hours
  2. If no disclosures found → skip silently (non-window period)
  3. For each stock → fetch all 6 T3 tables, upsert
  4. Log count of stocks processed
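
The four steps above can be sketched as a single function. The callables `fetch_new_disclosures` and `ingest` are hypothetical stand-ins for the disclosure_date lookup and the six ingest classes in financial.py; their signatures are assumptions.

```python
from typing import Callable, Iterable

T3_TABLES = (
    "income", "balancesheet", "cashflow",
    "fina_indicator", "forecast", "express",
)


def run_t3_refresh(
    fetch_new_disclosures: Callable[[], Iterable[tuple[str, str]]],
    ingest: Callable[[str, str, str], None],
    log: Callable[[str], None] = print,
) -> int:
    """Run one refresh pass and return the number of stocks processed."""
    # Step 1: (ts_code, period) pairs with a new disclosure.
    disclosed = list(fetch_new_disclosures())
    # Step 2: outside a disclosure window there is nothing to do; skip silently.
    if not disclosed:
        return 0
    # Step 3: fetch and upsert all six T3 tables for each stock.
    for ts_code, period in disclosed:
        for table in T3_TABLES:
            ingest(table, ts_code, period)
    # Step 4: log how many stocks were handled.
    log(f"t3_financial_refresh: processed {len(disclosed)} stocks")
    return len(disclosed)
```

Keeping the job body free of scheduler and database details makes it possible to exercise the skip path and the fan-out to six tables in a unit test.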

Fallback job: t3_quarterly_fullscan runs at 02:00 on the report disclosure deadlines (4/30 for Q1, 8/31 for the half-year report, 10/31 for Q3, and the following 4/30 for the annual report). It scans all stocks to catch missed disclosures.
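
One way to gate the full scan on those dates is a simple calendar check inside a daily 02:00 job. The helper name below is hypothetical, and the scheduler wiring is left out because the document does not describe it.

```python
from datetime import date

# (month, day) pairs from the list above. "Next 4/30" falls on the same
# calendar day as the first entry, so three distinct dates remain.
FULLSCAN_DAYS = {(4, 30), (8, 31), (10, 31)}


def is_fullscan_day(today: date) -> bool:
    """Return True when the full-scan fallback should run today."""
    return (today.month, today.day) in FULLSCAN_DAYS
```

A daily job would call `is_fullscan_day(date.today())` and return immediately on every other day.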

B4: Backfill — Historical Financial Data

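backfill_all.py is extended with a --tiers t3 flag.

Estimated time: ~5-6 hours worst case (all 6 tables × 5500 stocks × 4 quarters × 8 years, sequential). This figure needs rechecking: the product is about 1.06 million (table, stock, quarter) combinations, which at one call each and 500 calls/min would take roughly 35 hours, so the estimate only holds if each call returns several periods.

With the disclosure_date filter, only stocks that actually reported are fetched, reducing calls by ~70%.

The backfill_progress table tracks (ts_code, period, table) tuples so that an interrupted run can resume.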
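
The arithmetic behind the estimate and the resume logic can both be checked with a short script. This sketch assumes one API call per (table, stock, quarter) combination, which may not match the real fetch granularity, and models backfill_progress as an in-memory set of completed tuples; both function names are hypothetical.

```python
from itertools import product
from typing import Iterable, Iterator


def estimate_hours(
    tables: int = 6,
    stocks: int = 5500,
    quarters_per_year: int = 4,
    years: int = 8,
    calls_per_minute: int = 500,
    skipped_fraction: float = 0.0,
) -> float:
    """Sequential duration in hours, assuming one call per combination."""
    calls = tables * stocks * quarters_per_year * years * (1.0 - skipped_fraction)
    return calls / calls_per_minute / 60.0


def pending_work(
    ts_codes: Iterable[str],
    periods: Iterable[str],
    tables: Iterable[str],
    done: set[tuple[str, str, str]],
) -> Iterator[tuple[str, str, str]]:
    """Yield (ts_code, period, table) tuples not yet recorded as done."""
    for key in product(ts_codes, periods, tables):
        if key not in done:
            yield key
```

On resume, the backfill would load the completed tuples from backfill_progress into `done` and iterate only over what `pending_work` yields, recording each tuple after its upsert succeeds.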

B5: MCP Proxy Expansion

Add 6 entries to _WAREHOUSE_METHODS:

| Tushare API | DuckDB Query | Notes |
|---|---|---|
| income | SELECT * FROM normalized_income WHERE ts_code=? AND end_date=? | report_type filter optional |
| balancesheet | SELECT * FROM normalized_balancesheet WHERE ts_code=? AND end_date=? | |
| cashflow | SELECT * FROM normalized_cashflow WHERE ts_code=? AND end_date=? | |
| fina_indicator | SELECT * FROM normalized_fina_indicator WHERE ts_code=? AND end_date=? | |
| forecast | SELECT * FROM normalized_forecast WHERE ts_code=? AND end_date=? | |
| express | SELECT * FROM normalized_express WHERE ts_code=? AND end_date=? | |

All six fall back to the live HTTP API on a warehouse miss.
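
The mapping above could be expressed as a lookup plus a small query builder. This is a sketch only: the names `T3_WAREHOUSE_TABLES` and `build_query` are hypothetical and do not reflect how proxy.py stores _WAREHOUSE_METHODS. Applying the optional report_type filter to all three statement tables is an assumption based on their primary keys in B1.

```python
from typing import Optional

# Tushare API name -> normalized table, as listed in the table above.
T3_WAREHOUSE_TABLES = {
    "income": "normalized_income",
    "balancesheet": "normalized_balancesheet",
    "cashflow": "normalized_cashflow",
    "fina_indicator": "normalized_fina_indicator",
    "forecast": "normalized_forecast",
    "express": "normalized_express",
}

# Tables whose primary key includes report_type (per B1).
_HAS_REPORT_TYPE = {"income", "balancesheet", "cashflow"}


def build_query(
    api: str,
    ts_code: str,
    end_date: str,
    report_type: Optional[str] = None,
) -> tuple[str, list[str]]:
    """Return (sql, params) for a warehouse lookup.

    Raises KeyError for an API that is not warehouse-backed, which the
    caller can treat as a signal to use the HTTP API instead.
    """
    table = T3_WAREHOUSE_TABLES[api]
    sql = f"SELECT * FROM {table} WHERE ts_code=? AND end_date=?"
    params = [ts_code, end_date]
    if report_type is not None and api in _HAS_REPORT_TYPE:
        sql += " AND report_type=?"
        params.append(report_type)
    return sql, params
```

Table names come from a fixed dictionary and user-supplied values are passed only as bound parameters, so the f-string does not open an injection path.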

B6: Schema Files

  • src/service/schema/01_raw_tables.sql — add 6 new raw_* tables
  • src/service/schema/02_normalized_tables.sql — add 6 new normalized_* tables
  • Update 03_views.sql if needed (no view changes expected for T3)

Dependencies

Phase A:
  A1 (merge) → A3 (deploy) → A4 (backfill T2) → A5 (verify MCP)
  A2 (logging fix) → included in A3 build

Phase B:
  B1 (schema) → B2 (ingest) → B3 (scheduler) → B4 (backfill) → B5 (MCP)

Files Changed

Phase A:

  • src/service/main.py — logging fix
  • scripts/data/backfill_all.py — minor tweaks if needed
  • deploy/compose.yml — no changes (volume mount already correct)

Phase B:

  • src/service/schema/01_raw_tables.sql — +6 tables
  • src/service/schema/02_normalized_tables.sql — +6 tables
  • src/service/ingest/financial.py — new file, 6 classes
  • src/service/ingest/__init__.py — export new classes
  • src/service/scheduler.py — +2 jobs (8h refresh, quarterly fullscan)
  • mcp/tushare_mcp/proxy.py — +6 warehouse methods
  • scripts/data/backfill_all.py — add T3 tier support
  • src/service/main.py — lifespan scheduler registration for new jobs

Risks

  1. DuckDB single-writer constraint — backfill runs inside container with backend stopped (or scheduler paused) to avoid write conflicts
  2. Tushare rate limits — T3 backfill is sequential, long-running. Add --resume flag to continue from last checkpoint
  3. Disk space — 6 new tables, 8 years data, ~5500 stocks. Estimate ~2-3GB additional warehouse growth. Current 40GB disk has 19GB free — sufficient

Internal team document