Task 0 — Assumption verification results
Supplement to 2026-05-18-warehouse-quality-system-v3.md. Read before executing Task 1+.
Verified on: 2026-05-18, local dev environment, DuckDB 1.5.2, Python 3.11.
A0.1 — WarehouseWriter.upsert behavior ✅
File: src/service/db.py:172-207
```python
def upsert(self, table, columns, rows, conflict_cols):
    ...
    if updates:  # there ARE non-conflict columns
        sql = f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) " \
              f"ON CONFLICT {conflict_clause} DO UPDATE SET {updates}"
    else:  # all columns are conflict columns
        sql = f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) " \
              f"ON CONFLICT {conflict_clause} DO NOTHING"
```

Result: DO UPDATE SET col = excluded.col for non-conflict columns. Falls back to DO NOTHING only if every column is a conflict column (rare). For all normalized tables (which have update_flag, source, ingested_at, raw_id as non-conflict columns), re-running the same ingest will refresh row values. Revised Tushare data (update_flag flip) propagates.
Implication for v3: Row-level idempotency confirmed. ingest_entity re-runs on same entity_id are safe.
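The re-run behavior described above can be reproduced in isolation. The following is a minimal sketch, not the code in src/service/db.py: build_upsert_sql is a hypothetical reconstruction of the elided part of upsert, the demo_daily table and its close column are invented for illustration, and the standard-library sqlite3 module stands in for DuckDB because it accepts the same ON CONFLICT ... DO UPDATE SET col = excluded.col form.

```python
import sqlite3


def build_upsert_sql(table, columns, conflict_cols):
    """Build an INSERT ... ON CONFLICT statement (hypothetical reconstruction)."""
    col_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    conflict_clause = "(" + ", ".join(conflict_cols) + ")"
    # Only non-conflict columns are refreshed when the key already exists.
    updates = ", ".join(
        f"{c} = excluded.{c}" for c in columns if c not in conflict_cols
    )
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT {conflict_clause} {action}"
    )


def demo_rerun():
    """Ingest the same key twice; the second call carries revised values."""
    conn = sqlite3.connect(":memory:")  # stand-in engine, not DuckDB
    conn.execute(
        "CREATE TABLE demo_daily ("
        "ts_code TEXT, trade_date TEXT, source TEXT, close REAL, update_flag TEXT, "
        "PRIMARY KEY (ts_code, trade_date, source))"
    )
    columns = ["ts_code", "trade_date", "source", "close", "update_flag"]
    sql = build_upsert_sql("demo_daily", columns, ["ts_code", "trade_date", "source"])
    conn.execute(sql, ("000001.SZ", "2026-05-18", "tushare", 10.0, "0"))
    conn.execute(sql, ("000001.SZ", "2026-05-18", "tushare", 10.5, "1"))
    return conn.execute("SELECT close, update_flag FROM demo_daily").fetchall()
```

Calling demo_rerun() leaves a single row holding the revised values, which is the row-level idempotency the result above relies on.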
A0.2 — DuckDB read-only attach across processes ❌ FAIL
Test (cross-process while writer holds lock):
```
SAME-process read_only FAIL: ConnectionException Connection Error: Can't open
a connection to same database file with a different configuration than
existing connections
CROSS-process read_only FAIL: IOException IO Error: Could not set lock on
file "...duckdb": Conflicting lock is held in [other python pid]
```

Result: DuckDB enforces an OS-level file lock. While twilight-data (writer) is running, no second connection can open the file, even in read_only=True mode: inside the writer's own process the attempt fails on the configuration mismatch, and from a separate process it fails on the file lock. The WarehousePool docstring (src/service/db.py:36-48) already documents this limitation, but only for the same-process case; cross-process access is equally blocked.
Implication for v3: WarehouseReader(read_only=True) as designed in Task 8 is not viable. Must amend to one of:
1. Snapshot copy (recommended): scripts/data/snapshot_warehouse.sh runs during a brief writer-pause window (or relies on DuckDB's EXPORT DATABASE mechanism), and WarehouseReader opens the snapshot.
2. HTTP read endpoint on twilight-data: monitor calls FastAPI routes that share the existing read pool. Heavier integration but no copy cost.
3. Pause-window read: monitor runs only when twilight-data is stopped (defeats the "read-only daemon" purpose).
Decision for v3 Phase 3+: option (1). Snapshot is consistent (atomic copy), monitor runs at any time, no live coupling. Cost: extra disk + brief writer block during copy. Add a Task in Phase 3.
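As a rough illustration of the snapshot option, the sketch below copies a database file to a temporary name and then renames it into place, so a reader opening the snapshot path never sees a half-written file. snapshot_file and the paths used with it are hypothetical; the sketch assumes the writer is paused (or has checkpointed) for the duration of the copy and that the temporary file and the destination are on the same filesystem. It does not cover the EXPORT DATABASE variant.

```python
import os
import shutil
import tempfile
from pathlib import Path


def snapshot_file(src: Path, dst: Path) -> Path:
    """Copy src to dst via a temp file so dst is replaced in one step."""
    dst.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file next to dst so the final rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=dst.parent, suffix=".tmp")
    os.close(fd)
    try:
        shutil.copy2(src, tmp_name)
        os.replace(tmp_name, dst)  # atomic rename on the same filesystem
    except BaseException:
        # Clean up the partial copy if anything went wrong before the rename.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise
    return dst
```

The writer block mentioned under "Cost" corresponds to the duration of the shutil.copy2 call; the rename itself is near-instant.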
A0.3 — ThreadPoolExecutor + shared WarehouseWriter ✅
File: src/service/db.py:120-207
WarehouseWriter.__init__ sets self._lock = threading.Lock(). Every method that uses the connection (execute, executemany, query, upsert, insert_raw, close) wraps the underlying self._conn.execute(...) in with self._lock:. The cursor() method is the only path that does not lock, but it returns an independent cursor object, and the code comment notes that DuckDB serializes execute calls on cursors of one connection via its own mutex.
Result: A ThreadPoolExecutor with 2 workers sharing one WarehouseWriter instance is safe for writes. The concurrency benefit is bounded by lock contention, but correctness holds. The current _run_t3_sequential(workers=2) is not a data-race bug.
Implication for v3: run_backfill_for_spec(spec, workers=N) keeps existing semantics. No need to drop to workers=1. Document the throughput-vs-correctness trade-off in a code comment.
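The locking pattern can be illustrated with a small self-contained sketch. LockedWriter is a hypothetical stand-in for WarehouseWriter, and sqlite3 replaces DuckDB so the example runs with the standard library alone; only the shape (one shared connection, one threading.Lock around every statement, a two-worker ThreadPoolExecutor) mirrors what is described above.

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor


class LockedWriter:
    """Hypothetical stand-in for WarehouseWriter: one connection, one lock."""

    def __init__(self):
        # check_same_thread=False lets worker threads use the connection;
        # the lock below is what keeps that safe.
        self._conn = sqlite3.connect(":memory:", check_same_thread=False)
        self._lock = threading.Lock()
        self._conn.execute("CREATE TABLE t (entity_id INTEGER PRIMARY KEY)")

    def execute(self, sql, params=()):
        with self._lock:  # serialize every statement on the shared connection
            return self._conn.execute(sql, params).fetchall()


def run_workers(n_entities=200, workers=2):
    """Insert n_entities rows from a thread pool and return the row count."""
    writer = LockedWriter()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() waits for all tasks and re-raises any worker exception.
        list(pool.map(
            lambda i: writer.execute("INSERT INTO t VALUES (?)", (i,)),
            range(n_entities),
        ))
    return writer.execute("SELECT COUNT(*) FROM t")[0][0]
```

Because each statement holds the lock for its full duration, extra workers only help when time is spent outside the writer (for example, on network fetches), which is the throughput-vs-correctness trade-off noted above.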
A0.4/A0.5 — Existing ingest classes inventory
Full enumeration for Task 2 registration:
| File | Class | api_name | normalized_table | conflict_columns | grain | entity_type |
|---|---|---|---|---|---|---|
| daily.py | DailyIngest | daily | normalized_daily | (ts_code, trade_date, source) | per_date | trade_date |
| daily.py | DailyBasicIngest | daily_basic | normalized_daily_basic | (ts_code, trade_date, source) | per_date | trade_date |
| daily.py | AdjFactorIngest | adj_factor | normalized_adj_factor | (ts_code, trade_date, source) | per_date | trade_date |
| weekly.py | WeeklyIngest | weekly | normalized_weekly | (ts_code, trade_date, source) | per_entity | stock |
| weekly.py | MonthlyIngest | monthly | normalized_monthly | (ts_code, trade_date, source) | per_entity | stock |
| index_daily.py | IndexDailyIngest | index_daily | normalized_index_daily | (ts_code, trade_date, source) | per_entity | index |
| index_daily.py | IndexDailybasicIngest | index_dailybasic | normalized_index_dailybasic | (ts_code, trade_date, source) | per_entity | index |
| index_daily.py | IndexBasicIngest | index_basic | normalized_index_basic | tbd | once | — |
| index_daily.py | IndexWeightIngest | index_weight | normalized_index_weight | tbd | per_entity | index |
| financial.py | IncomeIngest | income | normalized_income | (ts_code, end_date, report_type, source) | per_entity | stock |
| financial.py | BalanceSheetIngest | balancesheet | normalized_balancesheet | (ts_code, end_date, report_type, source) | per_entity | stock |
| financial.py | CashflowIngest | cashflow | normalized_cashflow | (ts_code, end_date, report_type, source) | per_entity | stock |
| financial.py | FinaIndicatorIngest | fina_indicator | normalized_fina_indicator | (ts_code, end_date, source) | per_entity | stock |
| financial.py | ForecastIngest | forecast | normalized_forecast | (ts_code, end_date, ann_date, source) | per_entity | stock |
| financial.py | ExpressIngest | express | normalized_express | (ts_code, end_date, source) | per_entity | stock |
| trade_cal.py | TradeCalIngest | trade_cal | normalized_trade_cal | (exchange, cal_date, source) | per_date_year | year |
| stock_basic.py | StockBasicIngest | stock_basic | normalized_stock_basic | (ts_code, list_status, source) | once | — |
Implication for v3: 17 datasets to register in Task 2 (the table above lists 17 ingest classes; the plan assumed ~14). Add trade_cal and stock_basic registrations explicitly.
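To make the registration step concrete, here is one possible shape for the registry, with fields taken from the table columns above. The plan names a DatasetSpec abstraction, but its actual fields are not shown in this note, so the dataclass layout, the REGISTRY dict, and the register helper are all assumptions made for illustration.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class DatasetSpec:
    """Hypothetical spec; field names mirror the inventory table columns."""
    api_name: str
    normalized_table: str
    conflict_columns: Tuple[str, ...]
    grain: str                  # "per_date", "per_entity", "per_date_year", "once"
    entity_type: Optional[str]  # None for "once" datasets


REGISTRY: Dict[str, DatasetSpec] = {}


def register(spec: DatasetSpec) -> DatasetSpec:
    """Add a spec to the registry, rejecting duplicate api_name values."""
    if spec.api_name in REGISTRY:
        raise ValueError(f"duplicate registration: {spec.api_name}")
    REGISTRY[spec.api_name] = spec
    return spec


# Three rows from the table, including the two called out explicitly.
register(DatasetSpec("daily", "normalized_daily",
                     ("ts_code", "trade_date", "source"), "per_date", "trade_date"))
register(DatasetSpec("trade_cal", "normalized_trade_cal",
                     ("exchange", "cal_date", "source"), "per_date_year", "year"))
register(DatasetSpec("stock_basic", "normalized_stock_basic",
                     ("ts_code", "list_status", "source"), "once", None))
```

Rejecting duplicates at registration time would surface a copy-paste mistake in the table-to-code transcription early rather than at ingest time.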
A0.6 — Schema directory location 📌
v3 plan referenced src/warehouse/sql/. Actual location: src/service/schema/.
Files:
- 01_raw_tables.sql
- 02_normalized_tables.sql
- 03_metadata_tables.sql (already contains backfill_progress, quality_log, source_freshness)
- 04_views.sql
- 05_indexes.sql
Implication for v3 Task 4: New quality tables go in src/service/schema/06_quality.sql (not src/warehouse/sql/05_quality.sql as plan states). Plan file path references need amendment everywhere.
A0.7 — backfill_progress schema mismatch 🚨
Schema (src/service/schema/03_metadata_tables.sql:28-37):
```sql
CREATE TABLE IF NOT EXISTS backfill_progress (
    api_name   VARCHAR NOT NULL,
    trade_date DATE NOT NULL,
    status     VARCHAR NOT NULL,
    raw_id     BIGINT,
    attempts   INTEGER NOT NULL DEFAULT 0,
    last_error VARCHAR,
    updated_at TIMESTAMP NOT NULL,
    PRIMARY KEY (api_name, trade_date)
);
```

PK is composite (api_name, trade_date), and trade_date is NOT NULL.
Script (scripts/data/backfill_all.py:117-128), SQL embedded in the Python source:

```sql
INSERT INTO backfill_progress (api_name, status, attempts, last_error, updated_at)
VALUES (?, ?, COALESCE((SELECT attempts FROM backfill_progress WHERE api_name=?), 0) + 1, ?, ?)
ON CONFLICT (api_name) DO UPDATE SET ...
```

The INSERT does not supply trade_date, which violates the NOT NULL constraint. The ON CONFLICT (api_name) clause names only one of the two PK columns, whereas DuckDB requires the conflict target to match a unique or primary key constraint exactly. Against the schema above, this statement should therefore be rejected.
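Both problems can be reproduced without touching the warehouse. The sketch below uses sqlite3 as a stand-in engine, so the exception classes and messages are SQLite's rather than DuckDB's; it only shows that each statement is rejected against a table with this key layout. The table is trimmed to the relevant columns, and the t3:income marker name is a made-up example.

```python
import sqlite3

DDL = """
CREATE TABLE backfill_progress (
    api_name   VARCHAR NOT NULL,
    trade_date DATE NOT NULL,
    status     VARCHAR NOT NULL,
    attempts   INTEGER NOT NULL DEFAULT 0,
    updated_at TIMESTAMP NOT NULL,
    PRIMARY KEY (api_name, trade_date)
)
"""


def try_sql(conn, sql, params):
    """Return the exception class name raised by sql, or 'ok'."""
    try:
        conn.execute(sql, params)
        return "ok"
    except sqlite3.Error as exc:
        return type(exc).__name__


def check_marker_insert():
    conn = sqlite3.connect(":memory:")  # stand-in engine, not DuckDB
    conn.execute(DDL)
    params = ("t3:income", "done", "2026-05-18 00:00:00")  # hypothetical marker
    insert = (
        "INSERT INTO backfill_progress (api_name, status, updated_at) "
        "VALUES (?, ?, ?)"
    )
    # 1) trade_date omitted: NOT NULL column with no default.
    missing_date = try_sql(conn, insert, params)
    # 2) Conflict target names only part of the composite primary key.
    partial_target = try_sql(
        conn,
        insert + " ON CONFLICT (api_name) DO UPDATE SET status = excluded.status",
        params,
    )
    return missing_date, partial_target
```

In SQLite both statements are rejected. If the ECS database accepts the script's INSERT, that would point toward a schema that differs from 03_metadata_tables.sql.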
Live behavior in production is one of:
- ECS DB schema has been altered to drop trade_date NOT NULL or change the PK
- Inserts have been silently failing for T3 markers (which would explain the 600 phantom t3:* rows: maybe they never actually persisted, and "done" was being judged from row presence in normalized tables; but _is_done queries backfill_progress, so this doesn't add up)
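Need to verify on ECS. Per A0.2, the read-only open below is expected to fail while the writer process holds the file lock, so it may need to run during a writer pause or against a copy of the file:

```bash
docker exec twilight-data python -c "
import duckdb
c = duckdb.connect('/data/warehouse.duckdb', read_only=True)
print(c.execute('DESCRIBE backfill_progress').fetchdf())
print(c.execute(\"SELECT COUNT(*) FROM backfill_progress WHERE api_name LIKE 't3:%'\").fetchall())
"
```

Implication for v3: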
- Task 3 (progress key migration) must first reconcile the schema before migrating keys.
- Likely outcome: alter backfill_progress to drop trade_date (or make it nullable) and switch PK to just api_name. Add this as Task 3a: schema reconciliation, before key migration.
- Alternative: keep schema, but make script supply a sentinel trade_date (e.g. 1970-01-01) for non-date-grain markers. Ugly.
- Best fix: dual-key support, with api_name (TEXT, the canonical generic key) as standalone PK; trade_date becomes a separate column or is moved to a different table. Cleanest schema. Pick this if migration risk is acceptable.
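One way the "likely outcome" could look as a migration is sketched below: rebuild the table with api_name as the only key and copy one row per api_name. This is an illustration under stated assumptions, not the planned Task 3a script. reconcile_backfill_progress is a hypothetical name, sqlite3 stands in for DuckDB, the column set is trimmed, and the rule "keep the most recently updated row per api_name" is an assumption the real migration would need to confirm.

```python
import sqlite3


def reconcile_backfill_progress(conn):
    """Rebuild backfill_progress with api_name as the only key column."""
    conn.execute("""
        CREATE TABLE backfill_progress_new (
            api_name   VARCHAR PRIMARY KEY,
            status     VARCHAR NOT NULL,
            attempts   INTEGER NOT NULL DEFAULT 0,
            updated_at TIMESTAMP NOT NULL
        )
    """)
    # Assumption: when several dates exist for one api_name, keep the newest row.
    conn.execute("""
        INSERT INTO backfill_progress_new (api_name, status, attempts, updated_at)
        SELECT api_name, status, attempts, updated_at
        FROM (
            SELECT *,
                   ROW_NUMBER() OVER (
                       PARTITION BY api_name
                       ORDER BY updated_at DESC, trade_date DESC
                   ) AS rn
            FROM backfill_progress
        )
        WHERE rn = 1
    """)
    # Swap the rebuilt table into place under the original name.
    conn.execute("DROP TABLE backfill_progress")
    conn.execute("ALTER TABLE backfill_progress_new RENAME TO backfill_progress")
    conn.commit()
```

After the rebuild, ON CONFLICT (api_name) matches the primary key, so the script's existing upsert form would become valid without changes on the script side.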
v3 plan amendments required
- Task 8 (WarehouseReader): change from "read-only attach" to "snapshot-based reader". Add a Task in Phase 3 for the snapshot job.
- Task 4 schema file: change src/warehouse/sql/05_quality.sql → src/service/schema/06_quality.sql.
- Task 2 registry: add trade_cal + stock_basic registrations (17 datasets per the A0.4/A0.5 table, not 14).
- New Task 3a (backfill_progress schema reconciliation): drop trade_date from the PK and switch to api_name as the standalone PK. Migration script handles existing rows.
- Concurrent workers documented: current workers=2 is correct under the threading.Lock; no need to downgrade to workers=1 in Task 6.
These amendments are non-blocking for Task 1 (DatasetSpec abstraction is pure code, no schema dependency). Tasks 3/3a/4 will pick them up.