
Task 0 — Assumption verification results

Supplement to 2026-05-18-warehouse-quality-system-v3.md. Read before executing Task 1+.

Verified on: 2026-05-18, local dev environment, DuckDB 1.5.2, Python 3.11.


A0.1 — WarehouseWriter.upsert behavior ✅

File: src/service/db.py:172-207

python
def upsert(self, table, columns, rows, conflict_cols):
    ...
    if updates:  # there ARE non-conflict columns
        sql = f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) " \
              f"ON CONFLICT {conflict_clause} DO UPDATE SET {updates}"
    else:        # all columns are conflict columns
        sql = f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) " \
              f"ON CONFLICT {conflict_clause} DO NOTHING"

Result: DO UPDATE SET col = excluded.col for non-conflict columns. Falls back to DO NOTHING only if every column is a conflict column (rare). For all normalized tables (which have update_flag, source, ingested_at, raw_id as non-conflict columns), re-running the same ingest will refresh row values. Revised Tushare data (update_flag flip) propagates.

Implication for v3: Row-level idempotency confirmed. ingest_entity re-runs on same entity_id are safe.
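
The branch logic above can be exercised end to end with a small stand-in. The sketch below is an illustration, not the code in src/service/db.py: build_upsert_sql and demo_rerun are hypothetical names, the close column and sample values are invented for the demo, and Python's built-in sqlite3 (SQLite 3.24 or newer) is swapped in for DuckDB only because it accepts the same ON CONFLICT ... DO UPDATE SET col = excluded.col form without an extra dependency.

```python
import sqlite3


def build_upsert_sql(table, columns, conflict_cols):
    """Build an upsert statement shaped like the excerpt above (hypothetical helper)."""
    col_list = ", ".join(columns)
    placeholders = ", ".join("?" for _ in columns)
    conflict_clause = "(" + ", ".join(conflict_cols) + ")"
    # Only non-conflict columns are refreshed on a key collision.
    updates = ", ".join(
        f"{c} = excluded.{c}" for c in columns if c not in conflict_cols
    )
    action = f"DO UPDATE SET {updates}" if updates else "DO NOTHING"
    return (
        f"INSERT INTO {table} ({col_list}) VALUES ({placeholders}) "
        f"ON CONFLICT {conflict_clause} {action}"
    )


def demo_rerun():
    """Ingest the same key twice; the second run carries revised values."""
    conn = sqlite3.connect(":memory:")  # stand-in for the DuckDB warehouse
    conn.execute(
        "CREATE TABLE normalized_daily ("
        "ts_code TEXT, trade_date TEXT, source TEXT, "
        "close REAL, update_flag TEXT, "
        "PRIMARY KEY (ts_code, trade_date, source))"
    )
    columns = ["ts_code", "trade_date", "source", "close", "update_flag"]
    sql = build_upsert_sql(
        "normalized_daily", columns, ["ts_code", "trade_date", "source"]
    )
    conn.execute(sql, ("000001.SZ", "2026-05-15", "tushare", 10.0, "0"))
    conn.execute(sql, ("000001.SZ", "2026-05-15", "tushare", 10.5, "1"))
    rows = conn.execute(
        "SELECT close, update_flag FROM normalized_daily"
    ).fetchall()
    conn.close()
    return rows
```

Calling demo_rerun() leaves a single row holding the values from the second run, which is the row-level idempotency the implication relies on.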


A0.2 — DuckDB read-only attach across processes ❌ FAIL

Test (same-process and cross-process, while the writer holds the lock):

SAME-process read_only FAIL: ConnectionException Connection Error: Can't open
  a connection to same database file with a different configuration than
  existing connections

CROSS-process read_only FAIL: IOException IO Error: Could not set lock on
  file "...duckdb": Conflicting lock is held in [other python pid]

Result: DuckDB enforces an OS-level file lock. While twilight-data (the writer) is running, no other connection can open the file, even with read_only=True. This holds both inside the writer's own Python process, where the open fails with a configuration mismatch, and from a separate process, where it fails on the file lock. The WarehousePool docstring (src/service/db.py:36-48) already documents the same-process limitation; the cross-process case is equally blocked.

Implication for v3: WarehouseReader(read_only=True) as designed in Task 8 is not viable. Must amend to one of:

  1. Snapshot copy (recommended) — scripts/data/snapshot_warehouse.sh runs during a brief writer-pause window (or relies on DuckDB's EXPORT DATABASE mechanism), and WarehouseReader opens the snapshot.
  2. HTTP read endpoint on twilight-data — monitor calls FastAPI routes that share the existing read pool. Heavier integration but no copy cost.
  3. Pause-window read — monitor runs only when twilight-data is stopped (defeats "read-only daemon" purpose).

Decision for v3 Phase 3+: option (1). Snapshot is consistent (atomic copy), monitor runs at any time, no live coupling. Cost: extra disk + brief writer block during copy. Add a Task in Phase 3.
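
As a rough illustration of option (1), the sketch below copies the database file to a temporary name inside the snapshot directory and then swaps it into place with os.replace, so a reader never observes a half-written snapshot. It is not the content of scripts/data/snapshot_warehouse.sh: snapshot_file and every path are hypothetical, and it assumes the writer is paused for the duration of the copy so the source file is not changing underneath it.

```python
import os
import shutil
import tempfile
from pathlib import Path


def snapshot_file(live_path, snapshot_path):
    """Copy live_path to snapshot_path so readers see the old or the new file, never a partial one."""
    live_path = Path(live_path)
    snapshot_path = Path(snapshot_path)
    snapshot_path.parent.mkdir(parents=True, exist_ok=True)

    # Write to a temp file in the SAME directory: os.replace is only
    # atomic when source and destination are on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=snapshot_path.parent, suffix=".tmp")
    os.close(fd)
    try:
        shutil.copy2(live_path, tmp_name)
        os.replace(tmp_name, snapshot_path)
    except BaseException:
        # Do not leave a stale temp file behind if the copy fails.
        if os.path.exists(tmp_name):
            os.remove(tmp_name)
        raise
    return snapshot_path
```

A snapshot-based WarehouseReader would then open the returned path instead of the live file, which removes the lock conflict at the cost of the extra disk space noted above.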


A0.3 — ThreadPoolExecutor + shared WarehouseWriter

File: src/service/db.py:120-207

WarehouseWriter.__init__ sets self._lock = threading.Lock(). Every method that touches the connection (execute, executemany, query, upsert, insert_raw, close) wraps the underlying self._conn.execute(...) call in with self._lock:. The cursor() method is the only path that does not take the lock, but it returns an independent cursor object, and its comment notes that DuckDB serializes execute calls on cursors of one connection via its own mutex.

Result: A ThreadPoolExecutor with 2 workers sharing one WarehouseWriter instance is safe for writes. The concurrency benefit is bounded by lock contention, but correctness holds. The current _run_t3_sequential(workers=2) is not a data-race bug.

Implication for v3: run_backfill_for_spec(spec, workers=N) keeps existing semantics. No need to drop to workers=1. Document the throughput-vs-correctness trade-off in a code comment.
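
The locking pattern described in this section can be sketched in isolation. LockedWriter and run_workers below are hypothetical names, not the real WarehouseWriter, and sqlite3 with check_same_thread=False stands in for the shared DuckDB connection. The point is only that a single threading.Lock around every call keeps two pool workers from interleaving statements on one connection.

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor


class LockedWriter:
    """One shared connection, with every call serialized by a lock (illustrative only)."""

    def __init__(self):
        # check_same_thread=False lets worker threads reuse the connection;
        # the lock below is what makes that safe.
        self._conn = sqlite3.connect(":memory:", check_same_thread=False)
        self._lock = threading.Lock()
        self.execute("CREATE TABLE t (worker INTEGER, n INTEGER)")

    def execute(self, sql, params=()):
        with self._lock:
            return self._conn.execute(sql, params).fetchall()


def run_workers(workers=2, rows_per_worker=200):
    writer = LockedWriter()

    def job(worker_id):
        for n in range(rows_per_worker):
            writer.execute("INSERT INTO t VALUES (?, ?)", (worker_id, n))

    with ThreadPoolExecutor(max_workers=workers) as pool:
        # list() forces evaluation so any worker exception is re-raised here.
        list(pool.map(job, range(workers)))
    return writer.execute("SELECT COUNT(*) FROM t")[0][0]
```

Because each statement holds the lock for its full duration, adding workers mostly helps when time is spent outside the writer (for example in API calls), which is the throughput-vs-correctness trade-off worth noting in the code comment.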


A0.4/A0.5 — Existing ingest classes inventory

Full enumeration for Task 2 registration:

| File | Class | api_name | normalized_table | conflict_columns | grain | entity_type |
|---|---|---|---|---|---|---|
| daily.py | DailyIngest | daily | normalized_daily | (ts_code, trade_date, source) | per_date | trade_date |
| daily.py | DailyBasicIngest | daily_basic | normalized_daily_basic | (ts_code, trade_date, source) | per_date | trade_date |
| daily.py | AdjFactorIngest | adj_factor | normalized_adj_factor | (ts_code, trade_date, source) | per_date | trade_date |
| weekly.py | WeeklyIngest | weekly | normalized_weekly | (ts_code, trade_date, source) | per_entity | stock |
| weekly.py | MonthlyIngest | monthly | normalized_monthly | (ts_code, trade_date, source) | per_entity | stock |
| index_daily.py | IndexDailyIngest | index_daily | normalized_index_daily | (ts_code, trade_date, source) | per_entity | index |
| index_daily.py | IndexDailybasicIngest | index_dailybasic | normalized_index_dailybasic | (ts_code, trade_date, source) | per_entity | index |
| index_daily.py | IndexBasicIngest | index_basic | normalized_index_basic | tbd | once | |
| index_daily.py | IndexWeightIngest | index_weight | normalized_index_weight | tbd | per_entity | index |
| financial.py | IncomeIngest | income | normalized_income | (ts_code, end_date, report_type, source) | per_entity | stock |
| financial.py | BalanceSheetIngest | balancesheet | normalized_balancesheet | (ts_code, end_date, report_type, source) | per_entity | stock |
| financial.py | CashflowIngest | cashflow | normalized_cashflow | (ts_code, end_date, report_type, source) | per_entity | stock |
| financial.py | FinaIndicatorIngest | fina_indicator | normalized_fina_indicator | (ts_code, end_date, source) | per_entity | stock |
| financial.py | ForecastIngest | forecast | normalized_forecast | (ts_code, end_date, ann_date, source) | per_entity | stock |
| financial.py | ExpressIngest | express | normalized_express | (ts_code, end_date, source) | per_entity | stock |
| trade_cal.py | TradeCalIngest | trade_cal | normalized_trade_cal | (exchange, cal_date, source) | per_date_year | year |
| stock_basic.py | StockBasicIngest | stock_basic | normalized_stock_basic | (ts_code, list_status, source) | once | |

Implication for v3: 16 datasets to register in Task 2 (was assumed ~14). Add trade_cal and stock_basic registrations explicitly.
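
To make the registration step concrete, here is a minimal sketch of how a few inventory rows might be expressed as specs. The field names mirror the table columns, but the DatasetSpec shape, build_registry, and the choice of None for "tbd" or empty cells are assumptions for illustration; the actual Task 1/Task 2 design may differ.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass(frozen=True)
class DatasetSpec:
    """Hypothetical spec; fields follow the inventory table columns."""
    api_name: str
    normalized_table: str
    conflict_columns: Optional[Tuple[str, ...]]  # None while still "tbd"
    grain: str
    entity_type: Optional[str]  # None for grain == "once"


def build_registry(specs):
    """Index specs by api_name, rejecting duplicate registrations."""
    registry = {}
    for spec in specs:
        if spec.api_name in registry:
            raise ValueError(f"duplicate api_name: {spec.api_name}")
        registry[spec.api_name] = spec
    return registry


# A few rows copied from the inventory table above.
SAMPLE_SPECS = [
    DatasetSpec("daily", "normalized_daily",
                ("ts_code", "trade_date", "source"), "per_date", "trade_date"),
    DatasetSpec("income", "normalized_income",
                ("ts_code", "end_date", "report_type", "source"),
                "per_entity", "stock"),
    DatasetSpec("trade_cal", "normalized_trade_cal",
                ("exchange", "cal_date", "source"), "per_date_year", "year"),
    DatasetSpec("index_basic", "normalized_index_basic", None, "once", None),
]

REGISTRY = build_registry(SAMPLE_SPECS)
```

Keeping conflict_columns as None for the "tbd" rows makes the unresolved entries easy to find before Task 2 is considered complete.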


A0.6 — Schema directory location 📌

v3 plan referenced src/warehouse/sql/. Actual location: src/service/schema/.

Files:

  • 01_raw_tables.sql
  • 02_normalized_tables.sql
  • 03_metadata_tables.sql — already contains backfill_progress, quality_log, source_freshness
  • 04_views.sql
  • 05_indexes.sql

Implication for v3 Task 4: New quality tables go in src/service/schema/06_quality.sql (not src/warehouse/sql/05_quality.sql as the plan states). Plan file path references need amendment everywhere.


A0.7 — backfill_progress schema mismatch 🚨

Schema (src/service/schema/03_metadata_tables.sql:28-37):

sql
CREATE TABLE IF NOT EXISTS backfill_progress (
  api_name    VARCHAR      NOT NULL,
  trade_date  DATE         NOT NULL,
  status      VARCHAR      NOT NULL,
  raw_id      BIGINT,
  attempts    INTEGER      NOT NULL DEFAULT 0,
  last_error  VARCHAR,
  updated_at  TIMESTAMP    NOT NULL,
  PRIMARY KEY (api_name, trade_date)
);

PK is composite (api_name, trade_date), and trade_date is NOT NULL.

Script (scripts/data/backfill_all.py:117-128):

python
INSERT INTO backfill_progress (api_name, status, attempts, last_error, updated_at)
VALUES (?, ?, COALESCE((SELECT attempts FROM backfill_progress WHERE api_name=?), 0) + 1, ?, ?)
ON CONFLICT (api_name) DO UPDATE SET ...

The INSERT does not supply trade_date, which violates the NOT NULL constraint. The ON CONFLICT (api_name) clause names only one of the two PK columns, but DuckDB requires the conflict target to match a unique or primary key constraint exactly.
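
Both failure modes can be reproduced locally with a trimmed copy of the schema. The sketch below uses Python's built-in sqlite3 as a stand-in, which applies the same two rules (a NOT NULL column must be supplied, and an ON CONFLICT target must match a declared key). DuckDB's exception types and messages will differ, and try_statement is a hypothetical helper.

```python
import sqlite3

# Trimmed copy of the backfill_progress schema, in SQLite types.
SCHEMA = """
CREATE TABLE backfill_progress (
  api_name    TEXT    NOT NULL,
  trade_date  TEXT    NOT NULL,
  status      TEXT    NOT NULL,
  attempts    INTEGER NOT NULL DEFAULT 0,
  updated_at  TEXT    NOT NULL,
  PRIMARY KEY (api_name, trade_date)
)
"""


def try_statement(sql, params):
    """Run one statement on a fresh database; return 'ok' or the error class name."""
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    try:
        conn.execute(sql, params)
        return "ok"
    except sqlite3.Error as exc:
        return type(exc).__name__
    finally:
        conn.close()


def partial_conflict_target():
    # Conflict target names only api_name, which is not a declared key.
    return try_statement(
        "INSERT INTO backfill_progress (api_name, status, updated_at) "
        "VALUES (?, ?, ?) "
        "ON CONFLICT (api_name) DO UPDATE SET status = excluded.status",
        ("t3:income", "done", "2026-05-18"),
    )


def missing_trade_date():
    # No conflict clause, but trade_date is still omitted.
    return try_statement(
        "INSERT INTO backfill_progress (api_name, status, updated_at) "
        "VALUES (?, ?, ?)",
        ("t3:income", "done", "2026-05-18"),
    )
```

In the stand-in, neither statement succeeds, which matches the reading that the script as written cannot work against the checked-in schema.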

Live behavior in production is one of:

  • The ECS DB schema has been altered to drop the NOT NULL on trade_date or to change the PK.
  • Inserts have been silently failing for T3 markers. That might explain the 600 phantom t3:* rows if they never actually persisted and "done" was being judged from row presence in the normalized tables. However, _is_done queries backfill_progress, so this explanation does not add up.

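Need to verify on ECS. Per A0.2, a read-only open from a separate process fails while the writer holds the file lock, so run this while the writer is not holding the lock, or point it at a copy of the file: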

bash
docker exec twilight-data python -c "
import duckdb
c = duckdb.connect('/data/warehouse.duckdb', read_only=True)
print(c.execute('DESCRIBE backfill_progress').fetchdf())
print(c.execute(\"SELECT COUNT(*) FROM backfill_progress WHERE api_name LIKE 't3:%'\").fetchall())
"

Implication for v3:

  • Task 3 (progress key migration) must first reconcile the schema before migrating keys.
  • Likely outcome: alter backfill_progress to drop trade_date (or make it nullable) and switch PK to just api_name. Add this as Task 3a: schema reconciliation, before key migration.
  • Alternative: keep schema, but make script supply a sentinel trade_date (e.g. 1970-01-01) for non-date-grain markers. Ugly.
  • Best fix: dual-key support, with api_name (TEXT, the canonical generic key) as the standalone PK and trade_date either kept as a separate column or moved to a different table. This is the cleanest schema. Pick this if the migration risk is acceptable.
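
For comparison, the sentinel alternative from the list above can be sketched as follows. This is an assumption-heavy illustration, not the script in scripts/data/backfill_all.py: mark_progress and SENTINEL_DATE are hypothetical names, the schema is a trimmed copy, and sqlite3 again stands in for DuckDB. The conflict target names both PK columns, and attempts is incremented in the DO UPDATE branch instead of through a subquery.

```python
import sqlite3

SENTINEL_DATE = "1970-01-01"  # placeholder trade_date for non-date-grain markers

SCHEMA = """
CREATE TABLE backfill_progress (
  api_name    TEXT    NOT NULL,
  trade_date  TEXT    NOT NULL,
  status      TEXT    NOT NULL,
  attempts    INTEGER NOT NULL DEFAULT 0,
  last_error  TEXT,
  updated_at  TEXT    NOT NULL,
  PRIMARY KEY (api_name, trade_date)
)
"""


def mark_progress(conn, api_name, status, now, last_error=None):
    """Upsert a marker row keyed on (api_name, sentinel trade_date)."""
    conn.execute(
        "INSERT INTO backfill_progress "
        "(api_name, trade_date, status, attempts, last_error, updated_at) "
        "VALUES (?, ?, ?, 1, ?, ?) "
        "ON CONFLICT (api_name, trade_date) DO UPDATE SET "
        "status = excluded.status, "
        "attempts = backfill_progress.attempts + 1, "
        "last_error = excluded.last_error, "
        "updated_at = excluded.updated_at",
        (api_name, SENTINEL_DATE, status, last_error, now),
    )


def demo():
    conn = sqlite3.connect(":memory:")
    conn.execute(SCHEMA)
    mark_progress(conn, "t3:income", "failed", "2026-05-18T01:00:00", "timeout")
    mark_progress(conn, "t3:income", "done", "2026-05-18T02:00:00")
    rows = conn.execute(
        "SELECT api_name, trade_date, status, attempts, last_error "
        "FROM backfill_progress"
    ).fetchall()
    conn.close()
    return rows
```

The sketch keeps the existing schema intact, but every non-date marker carries a meaningless date, which is the ugliness the list item refers to.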

v3 plan amendments required

  1. Task 8 (WarehouseReader) — change from "read-only attach" to "snapshot-based reader". Add Task in Phase 3 for snapshot job.
  2. Task 4 schema file — change src/warehouse/sql/05_quality.sql to src/service/schema/06_quality.sql.
  3. Task 2 registry — add trade_cal + stock_basic registrations (16 datasets, not 14).
  4. New Task 3a: backfill_progress schema reconciliation — drop trade_date from the PK and switch to api_name as the standalone PK. The migration script handles existing rows.
  5. Concurrent workers documented — current workers=2 is correct under the threading.Lock; no need to downgrade to workers=1 in Task 6.

These amendments are non-blocking for Task 1 (DatasetSpec abstraction is pure code, no schema dependency). Tasks 3/3a/4 will pick them up.

Internal team document