Phase A+B: Data Pipeline Expansion Design
Status: Draft
Date: 2026-05-14
Author: liang
Overview
- Phase A: Deploy existing T2a/T2b code + fix operational issues (logging, DuckDB read access)
- Phase B: Add T3 financial statement tables (6 tables) + 8-hour refresh cycle
Phase A: T2a/T2b Deployment + Operational Fixes
A1: Merge PR #64 (T2a/T2b code)
Current state: Code on branch feat/v0.4.0-t2a-t2b-warehouse, commit 3cb53ef, not merged to main.
Contents:
- 6 new normalized tables: normalized_weekly, normalized_monthly, normalized_index_daily, normalized_index_dailybasic, normalized_index_basic, normalized_index_weight
- 6 corresponding raw tables
- Ingest modules: weekly.py, monthly.py, index_daily.py (4 classes)
- Scheduler: 4 new jobs (weekly_refresh Mon 03:00, monthly_refresh_price 1st 02:30, index_daily daily 19:00, index_monthly 1st 03:00)
- MCP proxy expanded: 5→11 warehouse-backed methods
- Scripts: backfill_all.py, incremental_update.py, check_ingest.sh
- Retry: 3-attempt exponential backoff (10s→20s→40s) in base.py
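The retry policy in base.py is summarized here only by its delays. A minimal sketch of that pattern, assuming three retries after the initial call (three listed waits imply four attempts) and that any exception is retryable. call_with_retry and its parameters are hypothetical names, not base.py's actual API:

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    retries: int = 3,
    base_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(); after each failure wait base_delay * 2**n seconds and retry.

    With the defaults this makes up to 4 attempts, waiting 10 s, 20 s, 40 s.
    """
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == retries:
                raise  # retries exhausted: surface the last error
            sleep(base_delay * 2 ** attempt)
    raise ValueError("retries must be >= 0")
```

Injecting sleep keeps the helper testable without real waits; in production the default time.sleep is used.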
Action: Merge to main via PR.
A2: Fix Python Logging to Docker stdout
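Problem: logger.info() calls in the scheduler modules do not appear in docker logs; only uvicorn access logs are visible.
Root cause: src/service/main.py creates logger objects but never attaches a handler to the root logger (e.g., via logging.basicConfig()). With no handler configured, Python falls back to its last-resort handler, which emits only WARNING and above to stderr, so INFO records are dropped.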
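Fix: in the lifespan startup, after creating the logger instances, attach a stdout handler to the root logger:
```python
import logging
import sys

# Attach a stdout handler to the root logger once, so INFO records from
# scheduler modules reach `docker logs` alongside uvicorn's access log.
root = logging.getLogger()
if not root.hasHandlers():
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s"))
    root.addHandler(handler)
    root.setLevel(logging.INFO)
```
A3: Build, Push, Deploy Image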
- Merge PR → main
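- docker build -f deploy/Dockerfile . -t ghcr.io/lacatfly/twilight-drive:v0.4.0-t2
- Push to GHCR (using keychain entry ghcr.io-twilight-drive)
- SSH to ECS: docker pull, then docker compose up -d (docker compose restart only restarts the existing container and does not switch it to the newly pulled image)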
A4: Run T2a/T2b Backfill
```bash
docker exec twilight-backend python scripts/data/backfill_all.py --tiers t2a
```
- weekly: ~22 min (all stocks, full history)
- monthly: ~22 min
- index_daily + index_dailybasic + index_basic + index_weight: ~1 min
Physical sort after completion (backfill script handles this).
A5: MCP Direct DuckDB Read
Current state: MCP proxy (mcp/tushare_mcp/proxy.py) already has DuckDB-first logic with _WAREHOUSE_METHODS set containing 11 entries. Volume mount ../../data:/data already configured.
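What to verify: whether the read_only=True connection conflicts with the backend's exclusive write lock. Per the DuckDB concurrency documentation, multiple processes can open the same database file at once only if all of them use read-only mode; while another process (here, the backend) holds a read-write connection, a read-only open fails with a lock error.
No code changes are needed if the check passes. After deploy, verify that MCP can read all 11 tables while the backend is running; if the read-only open fails, the DuckDB-first path cannot serve requests and a different access strategy is needed.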
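A minimal post-deploy check, assuming the MCP side opens the warehouse with duckdb.connect(path, read_only=True). The path /data/warehouse.duckdb and the helper verify_tables are hypothetical. It runs COUNT(*) against each table and lets any error propagate, so a lock conflict or a missing table fails loudly instead of silently falling back:

```python
from typing import Any, Dict, Iterable

# The six T2 warehouse tables named in A1. The existing five MCP-backed
# tables are not named in this document, so callers append them.
T2_TABLES = [
    "normalized_weekly",
    "normalized_monthly",
    "normalized_index_daily",
    "normalized_index_dailybasic",
    "normalized_index_basic",
    "normalized_index_weight",
]


def verify_tables(conn: Any, tables: Iterable[str]) -> Dict[str, int]:
    """Return {table: row_count}; a missing table or lock error propagates."""
    counts: Dict[str, int] = {}
    for table in tables:
        # Table names come from a fixed internal list, never from user input.
        counts[table] = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    return counts


# Usage (hypothetical path), run inside the MCP container:
#   import duckdb
#   conn = duckdb.connect("/data/warehouse.duckdb", read_only=True)
#   print(verify_tables(conn, T2_TABLES))
```

The function only needs a DB-API-style execute().fetchone(), so it can be exercised against SQLite in tests and DuckDB in production.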
Phase B: T3 Financial Statement Tables
B1: Schema — 6 New Tables
All follow existing pattern: PK upsert + source, ingested_at, raw_id metadata columns.
normalized_income
- PK: ts_code + end_date + report_type
- Fields: ~30 fields from the Tushare income API (revenue, cost, profit, tax, etc.)
- Report types: 1 = consolidated (latest), 2 = single-quarter consolidated, 4 = adjusted consolidated
normalized_balancesheet
- PK: ts_code + end_date + report_type
- Fields: ~60 fields (assets, liabilities, equity breakdowns)
normalized_cashflow
- PK: ts_code + end_date + report_type
- Fields: ~30 fields (operating/investing/financing cash flows)
normalized_fina_indicator
- PK: ts_code + end_date
- Fields: ~50 fields (profitability, solvency, efficiency ratios)
normalized_forecast
- PK: ts_code + end_date + ann_date
- Fields: performance forecasts with prediction ranges
normalized_express
- PK: ts_code + end_date
- Fields: performance express reports
normalized_disclosure_date (scheduler trigger table)
- PK: ts_code + end_date
- Fields: appointment_date, actual_date (for scheduling T3 ingestion)
Corresponding raw_income, raw_balancesheet, raw_disclosure_date, etc. added to 01_raw_tables.sql.
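All of these tables share the PK-upsert pattern. A minimal sketch of a generic upsert built on INSERT ... ON CONFLICT ... DO UPDATE, which both DuckDB and SQLite (3.24+) accept when the target table declares that primary key. upsert_rows is a hypothetical helper; the metadata columns (source, ingested_at, raw_id) are treated as ordinary non-key columns that are overwritten on conflict:

```python
from typing import Any, Mapping, Sequence


def upsert_rows(conn: Any, table: str, pk: Sequence[str],
                rows: Sequence[Mapping[str, Any]]) -> int:
    """Insert rows into table; on a PK conflict overwrite the non-key columns."""
    if not rows:
        return 0
    cols = list(rows[0].keys())  # all rows are assumed to share these keys
    non_pk = [c for c in cols if c not in pk]
    action = ("DO UPDATE SET " + ", ".join(f"{c} = excluded.{c}" for c in non_pk)
              if non_pk else "DO NOTHING")
    # Table and column names come from internal constants, not user input.
    sql = (f"INSERT INTO {table} ({', '.join(cols)}) "
           f"VALUES ({', '.join('?' for _ in cols)}) "
           f"ON CONFLICT ({', '.join(pk)}) {action}")
    conn.executemany(sql, [tuple(r[c] for c in cols) for r in rows])
    return len(rows)
```

Re-ingesting the same (ts_code, end_date, report_type) therefore replaces the row instead of duplicating it, which is what makes repeated 8-hour refreshes safe.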
B2: Ingest Modules
New files in src/service/ingest/:
financial.py: 6 classes: IncomeIngest, BalanceSheetIngest, CashflowIngest, FinaIndicatorIngest, ForecastIngest, ExpressIngest
Fetch pattern: Each class takes (ts_code, period) params, calls corresponding Tushare API, upserts into normalized table.
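Rate limiting: staying under 500 calls/min requires at least 60 / 500 = 0.12 s between call starts. A fixed time.sleep(0.1) between calls meets this only when each request itself takes at least 0.02 s, so calls should be paced by start time rather than by a fixed sleep.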
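A minimal sketch of pacing by call start time; MinIntervalLimiter is a hypothetical name. At 500 calls/min it enforces 60 / 500 = 0.12 s between call starts, and it sleeps only for whatever part of that interval the previous request did not already use:

```python
import time
from typing import Callable


class MinIntervalLimiter:
    """Keep call starts at least 60 / calls_per_minute seconds apart."""

    def __init__(self, calls_per_minute: int,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self.interval = 60.0 / calls_per_minute
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = float("-inf")  # first call never waits

    def wait(self) -> None:
        """Block until the next call may start, then reserve the slot after it."""
        now = self._clock()
        if now < self._next_allowed:
            self._sleep(self._next_allowed - now)
            now = self._next_allowed
        self._next_allowed = now + self.interval
```

Call limiter.wait() immediately before each Tushare request. The sketch is single-threaded; concurrent ingest threads would need a lock around wait().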
B3: Scheduler — 8-Hour Refresh Cycle
Job: t3_financial_refresh — runs every 8 hours (02:00, 10:00, 18:00)
Logic:
- Call the disclosure_date API to get stocks with new disclosures in the last 8 hours
- If no disclosures are found → skip silently (non-window period)
- For each stock → fetch all 6 T3 tables, upsert
- Log the count of stocks processed
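A minimal sketch of the job body, with the Tushare and warehouse calls injected as functions so the control flow stands on its own. find_new_disclosures and ingest are hypothetical. How the 8-hour window is derived is also an assumption: disclosure_date reports calendar dates (YYYYMMDD), not timestamps, so a real implementation would likely compare actual_date with the previous run's date. If scheduler.py uses APScheduler (not stated here), the three daily runs could be registered with CronTrigger(hour="2,10,18").

```python
import logging
from typing import Callable, Iterable, Tuple

log = logging.getLogger(__name__)

T3_TABLES = ("income", "balancesheet", "cashflow", "fina_indicator", "forecast", "express")


def t3_financial_refresh(
    find_new_disclosures: Callable[[], Iterable[Tuple[str, str]]],
    ingest: Callable[[str, str, str], None],
) -> int:
    """Refresh all T3 tables for (ts_code, period) pairs with new disclosures."""
    pairs = sorted(set(find_new_disclosures()))  # de-duplicate repeated rows
    if not pairs:
        return 0  # outside a disclosure window: skip silently
    for ts_code, period in pairs:
        for table in T3_TABLES:
            ingest(table, ts_code, period)
    stocks = len({ts_code for ts_code, _ in pairs})
    log.info("t3_financial_refresh: processed %d stocks (%d periods)", stocks, len(pairs))
    return stocks
```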
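Fallback job: t3_quarterly_fullscan runs at 02:00 on the periodic-report disclosure deadlines (4/30 for annual and Q1 reports, 8/31 for half-year reports, 10/31 for Q3 reports, then 4/30 again the following year). It scans all stocks to catch missed disclosures.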
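The full-scan dates are exactly the last days of April, August and October. A small sketch that computes them with the standard calendar module; is_fullscan_day and next_fullscan are hypothetical helpers. If the scheduler is APScheduler (an assumption), the same schedule should be expressible as CronTrigger(month="4,8,10", day="last", hour=2).

```python
import calendar
import datetime as dt

# Disclosure deadlines: annual and Q1 reports (4/30),
# half-year reports (8/31), Q3 reports (10/31).
DEADLINE_MONTHS = (4, 8, 10)


def _last_day(year: int, month: int) -> dt.date:
    return dt.date(year, month, calendar.monthrange(year, month)[1])


def is_fullscan_day(day: dt.date) -> bool:
    """True on 4/30, 8/31 and 10/31."""
    return day.month in DEADLINE_MONTHS and day == _last_day(day.year, day.month)


def next_fullscan(after: dt.date) -> dt.date:
    """First full-scan date strictly after `after`."""
    for year in (after.year, after.year + 1):
        for month in DEADLINE_MONTHS:
            candidate = _last_day(year, month)
            if candidate > after:
                return candidate
    raise AssertionError("unreachable: next year's 4/30 is always later")
```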
B4: Backfill — Historical Financial Data
backfill_all.py extended with --tiers t3 flag:
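Estimated time: ~5-6 hours worst case (all 6 tables × 5500 stocks × 4 quarters × 8 years, sequential). With the disclosure_date filter, only stocks that actually reported are fetched, reducing calls by ~70%. Note that one call per (ts_code, period, table), as in B2, gives 6 × 5500 × 4 × 8 = 1,056,000 calls, about 2,112 minutes (~35 hours) at 500 calls/min, or ~317,000 calls (~10.6 hours) after the ~70% reduction; the ~5-6 hour figure therefore assumes fewer, larger calls or a higher rate limit. The backfill_progress table tracks (ts_code, period, table) tuples so an interrupted run can resume.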
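A minimal sketch of resuming from backfill_progress, assuming an autocommit connection (DuckDB's default) so each checkpoint row is durable as soon as it is written. The column is named table_name here because TABLE is a reserved word in SQL; that name, ensure_progress_table, run_backfill and the fetch_and_upsert callback are hypothetical:

```python
from typing import Any, Callable, Iterable, Tuple

WorkItem = Tuple[str, str, str]  # (ts_code, period, table_name)


def ensure_progress_table(conn: Any) -> None:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS backfill_progress ("
        "ts_code VARCHAR, period VARCHAR, table_name VARCHAR, "
        "PRIMARY KEY (ts_code, period, table_name))"
    )


def run_backfill(conn: Any, work: Iterable[WorkItem],
                 fetch_and_upsert: Callable[[str, str, str], None]) -> int:
    """Process work items not yet checkpointed; return how many were processed."""
    done = {tuple(r) for r in conn.execute(
        "SELECT ts_code, period, table_name FROM backfill_progress").fetchall()}
    processed = 0
    for item in work:
        if item in done:
            continue  # finished in an earlier run
        fetch_and_upsert(*item)
        # Checkpoint only after the upsert succeeds; a crash in between
        # just repeats one idempotent upsert on the next run.
        conn.execute("INSERT INTO backfill_progress VALUES (?, ?, ?)", item)
        done.add(item)
        processed += 1
    return processed
```

Any exception from fetch_and_upsert stops the run with every earlier item already checkpointed, so rerunning the same work list picks up where it stopped.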
B5: MCP Proxy Expansion
Add 6 entries to _WAREHOUSE_METHODS:
| Tushare API | DuckDB Query | Notes |
|---|---|---|
| income | SELECT * FROM normalized_income WHERE ts_code=? AND end_date=? | report_type filter optional |
| balancesheet | SELECT * FROM normalized_balancesheet WHERE ts_code=? AND end_date=? | |
| cashflow | SELECT * FROM normalized_cashflow WHERE ts_code=? AND end_date=? | |
| fina_indicator | SELECT * FROM normalized_fina_indicator WHERE ts_code=? AND end_date=? | |
| forecast | SELECT * FROM normalized_forecast WHERE ts_code=? AND end_date=? | |
| express | SELECT * FROM normalized_express WHERE ts_code=? AND end_date=? | |
All with live HTTP API fallback if warehouse miss.
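A minimal sketch of the warehouse-first lookup with live fallback for the six new methods. T3_WAREHOUSE_SQL and query are hypothetical stand-ins for the proxy's _WAREHOUSE_METHODS handling. One assumption worth checking: in Tushare's financial-statement APIs the report period is selected with the period parameter, while start_date/end_date bound the announcement date, so this sketch binds the request's period to the end_date column.

```python
from typing import Any, Callable, Dict, List, Mapping, Optional

T3_APIS = ("income", "balancesheet", "cashflow", "fina_indicator", "forecast", "express")

# Hypothetical mirror of the six new _WAREHOUSE_METHODS entries.
T3_WAREHOUSE_SQL: Dict[str, str] = {
    api: f"SELECT * FROM normalized_{api} WHERE ts_code = ? AND end_date = ?"
    for api in T3_APIS
}


def query(method: str, params: Mapping[str, str], conn: Optional[Any],
          live_call: Callable[[str, Mapping[str, str]], List[Any]]) -> Dict[str, Any]:
    """Serve from the warehouse when possible, else fall back to the live API."""
    sql = T3_WAREHOUSE_SQL.get(method)
    if sql is not None and conn is not None:
        try:
            # The report period arrives as `period` and is stored in end_date.
            rows = conn.execute(sql, [params["ts_code"], params["period"]]).fetchall()
        except Exception:
            rows = []  # missing params, missing table or a locked file: treat as a miss
        if rows:
            return {"source": "warehouse", "rows": rows}
    return {"source": "live", "rows": live_call(method, params)}
```

Treating any warehouse error as a miss keeps MCP answering even when the DuckDB file cannot be opened, at the cost of spending live API quota.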
B6: Schema Files
- src/service/schema/01_raw_tables.sql: add 6 new raw_* tables
- src/service/schema/02_normalized_tables.sql: add 6 new normalized_* tables
- Update 03_views.sql if needed (no view changes expected for T3)
Dependencies
Phase A:
A1 (merge) → A3 (deploy) → A4 (backfill T2) → A5 (verify MCP)
A2 (logging fix) → included in A3 build
Phase B:
B1 (schema) → B2 (ingest) → B3 (scheduler) → B4 (backfill) → B5 (MCP)
Files Changed
Phase A:
- src/service/main.py: logging fix
- scripts/data/backfill_all.py: minor tweaks if needed
- deploy/compose.yml: no changes (volume mount already correct)
Phase B:
- src/service/schema/01_raw_tables.sql: +6 tables
- src/service/schema/02_normalized_tables.sql: +6 tables
- src/service/ingest/financial.py: new file, 6 classes
- src/service/ingest/__init__.py: export new classes
- src/service/scheduler.py: +2 jobs (8h refresh, quarterly fullscan)
- mcp/tushare_mcp/proxy.py: +6 warehouse methods
- scripts/data/backfill_all.py: add T3 tier support
- src/service/main.py: lifespan scheduler registration for new jobs
Risks
- DuckDB single-writer constraint: the backfill runs inside the container with the backend stopped (or the scheduler paused) to avoid write conflicts. DuckDB's file lock is held per process, so pausing the scheduler is only sufficient if the backend also closes its DuckDB connection.
- Tushare rate limits: the T3 backfill is sequential and long-running. Add a --resume flag to continue from the last checkpoint.
- Disk space: 6 new tables, 8 years of data, ~5500 stocks. Estimated ~2-3 GB additional warehouse growth; the current 40 GB disk has 19 GB free, which is sufficient.