
Design: Warehouse Ingest Self-Healing

Date: 2026-05-13
Status: Draft
Owner: liang

Problem

The APScheduler job daily_batch runs once a day at 18:30. A single HTTP timeout therefore leaves a permanent data gap for that day: there is no retry, no gap-fill and no host-level alerting. In addition, /quality/status returns raw timestamps without any health status.

Design

Layer 1: Per-Job Retry (src/service/ingest/base.py)

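IngestJob._fetch() gets exponential backoff: after the first attempt it retries up to 3 times, waiting 10s → 20s → 40s (at most 4 calls in total). It retries only transient failures (HTTP errors, timeouts). It does not retry Tushare auth errors, which will not fix themselves, or HTTP 429 rate-limit responses, which need a far longer backoff than this loop provides. Daily data lost to either is recovered by the next run's gap-fill (Layer 2); APScheduler's misfire_grace_time only lets a missed run start late and does not re-run a job that raised.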

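```python
import logging
import time

logger = logging.getLogger(__name__)

# Errors worth retrying. Assumption: tushare_call raises the built-in
# TimeoutError / ConnectionError for transport failures; extend this tuple to
# whatever it actually raises for retryable HTTP errors. Auth errors and
# HTTP 429 must stay out of it so they propagate without retry.
TRANSIENT_ERRORS = (TimeoutError, ConnectionError)


class IngestJob:
    MAX_RETRIES = 3              # retries after the first attempt (4 calls max)
    RETRY_DELAYS = [10, 20, 40]  # seconds to wait before retry 1, 2, 3

    def _fetch(self, trade_date):
        params = self._build_params(trade_date)
        last_err = None
        # Attempt 0 is the initial call; attempts 1..MAX_RETRIES are retries.
        for attempt in range(self.MAX_RETRIES + 1):
            try:
                return tushare_call(  # project's existing Tushare wrapper
                    api_name=self.api_name,
                    params=params,
                    fields=self.fields,
                    token=self.token,
                )
            except TRANSIENT_ERRORS as e:
                last_err = e
                if attempt < self.MAX_RETRIES:
                    delay = self.RETRY_DELAYS[attempt]
                    logger.warning(
                        "%s attempt %d failed, retrying in %ds: %s",
                        self.api_name, attempt + 1, delay, e,
                    )
                    time.sleep(delay)
        raise RuntimeError(
            f"{self.api_name} failed after {self.MAX_RETRIES} retries: {last_err}"
        ) from last_err
```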

TradeCalIngest._fetch(year) overrides the base method and StockBasicIngest._fetch_status(status) is a separate fetch path, so neither inherits this loop; each subclass gets the same retry logic in its own method.
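Rather than copying the loop into three methods, they could share one helper. This is a sketch under assumptions: call_with_retry is a hypothetical name, TimeoutError/ConnectionError stand in for whatever tushare_call raises on transport failures, and the sleep function is injectable so tests do not have to wait 70s.

```python
import logging
import time
from typing import Callable, Sequence, Tuple, Type, TypeVar

logger = logging.getLogger(__name__)
T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    label: str,
    delays: Sequence[float] = (10, 20, 40),
    transient: Tuple[Type[BaseException], ...] = (TimeoutError, ConnectionError),
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying transient errors after each delay in `delays`.

    Makes at most 1 + len(delays) calls. Exceptions not listed in
    `transient` (e.g. auth errors, HTTP 429) propagate on the first failure.
    """
    retries = 0
    while True:
        try:
            return fn()
        except transient as e:
            if retries >= len(delays):
                # Retry budget exhausted: surface the last transient error.
                raise RuntimeError(
                    f"{label} failed after {len(delays)} retries: {e}"
                ) from e
            delay = delays[retries]
            retries += 1
            logger.warning(
                "%s attempt %d failed, retrying in %ss: %s",
                label, retries, delay, e,
            )
            sleep(delay)
```

Each method would then wrap its existing tushare_call(...) in a lambda, e.g. call_with_retry(lambda: tushare_call(...), label=self.api_name), keeping the backoff schedule and the transient/non-transient split in one place.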

Layer 2: Gap-Fill (src/service/scheduler.py)

_daily_batch() calls _backfill_gaps() before running today's normal ingest. It looks for SSE trading days in the last 7 calendar days (before today) that have no rows in normalized_daily, and backfills at most 3 of them per run so that a first boot or a long outage does not trigger a runaway backfill.
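A sketch of the _backfill_gaps() control flow, with the gap query and the per-day ingest passed in as callables so it runs without a database. backfill_gaps, find_gaps and run_jobs are hypothetical names. Catching errors per gap day is an assumption, not stated in the design: it keeps one bad day from aborting the remaining gaps or today's normal ingest, while the jobs still record the failure in source_freshness.

```python
import datetime as dt
import logging
from typing import Callable, List

logger = logging.getLogger(__name__)

LOOKBACK_DAYS = 7  # look-back window from the design
MAX_GAPS = 3       # per-run cap from the design


def backfill_gaps(
    today: dt.date,
    find_gaps: Callable[[dt.date, dt.date, int], List[dt.date]],
    run_jobs: Callable[[dt.date], None],
) -> List[dt.date]:
    """Backfill up to MAX_GAPS missing trading days before today's ingest.

    find_gaps(cutoff, today, limit) stands in for the gap-detection query;
    run_jobs(day) stands in for the normal per-day ingest jobs.
    Returns the gap days that were attempted.
    """
    cutoff = today - dt.timedelta(days=LOOKBACK_DAYS)
    gaps = find_gaps(cutoff, today, MAX_GAPS)[:MAX_GAPS]  # enforce the cap here too
    for day in gaps:
        try:
            run_jobs(day)
        except Exception:
            # Assumption: isolate failures so later gaps and today's
            # normal ingest still run.
            logger.exception("gap-fill for %s failed", day)
    return gaps
```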

Gap detection SQL:

```sql
SELECT c.cal_date
FROM normalized_trade_cal c
LEFT JOIN normalized_daily d ON c.cal_date = d.trade_date
WHERE c.exchange = 'SSE'
  AND c.is_open = 1
  AND c.cal_date >= CAST(? AS DATE)  -- cutoff date (today - 7 days)
  AND c.cal_date <  CAST(? AS DATE)  -- today: excludes today and future calendar days
  AND d.ts_code IS NULL              -- no daily rows for this date
GROUP BY c.cal_date
ORDER BY c.cal_date DESC
LIMIT ?                              -- max_gaps
```
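To sanity-check the anti-join, the query can be run against a toy calendar in an in-memory sqlite3 database. This is an adaptation, not the production DuckDB query: sqlite has no DATE type, so dates are ISO-8601 strings compared as text and the CAST calls are dropped. The toy calendar includes today (2026-05-13) and a future open day; because the trade calendar is loaded a whole year at a time, the upper bound on cal_date is what keeps those days from coming back as the newest "gaps".

```python
import sqlite3

# Anti-join adapted for sqlite3: ISO date strings instead of CAST(? AS DATE).
GAP_SQL = """
SELECT c.cal_date
FROM normalized_trade_cal c
LEFT JOIN normalized_daily d ON c.cal_date = d.trade_date
WHERE c.exchange = 'SSE'
  AND c.is_open = 1
  AND c.cal_date >= ?          -- cutoff
  AND c.cal_date < ?           -- today (excluded)
  AND d.ts_code IS NULL        -- no daily rows for this date
GROUP BY c.cal_date
ORDER BY c.cal_date DESC
LIMIT ?
"""


def find_gaps(conn, cutoff, today, max_gaps):
    return [row[0] for row in conn.execute(GAP_SQL, (cutoff, today, max_gaps))]


conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE normalized_trade_cal (exchange TEXT, cal_date TEXT, is_open INTEGER);
CREATE TABLE normalized_daily (ts_code TEXT, trade_date TEXT,
                               PRIMARY KEY (ts_code, trade_date));
""")
# Toy data: 2026-05-14 is a future open day already present in the calendar.
open_days = ["2026-05-07", "2026-05-08", "2026-05-11",
             "2026-05-12", "2026-05-13", "2026-05-14"]
conn.executemany("INSERT INTO normalized_trade_cal VALUES ('SSE', ?, 1)",
                 [(d,) for d in open_days])
conn.executemany("INSERT INTO normalized_daily VALUES (?, ?)",
                 [("000001.SZ", "2026-05-07"), ("600000.SH", "2026-05-07"),
                  ("000001.SZ", "2026-05-11")])

print(find_gaps(conn, "2026-05-06", "2026-05-13", 3))  # → ['2026-05-12', '2026-05-08']
```

Dropping the `c.cal_date < ?` line from this sketch returns ['2026-05-14', '2026-05-13', '2026-05-12'] instead, and the real gap on 2026-05-08 falls outside the LIMIT.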

Each gap day runs the same ingest jobs as a normal day. Because rows are upserted on the primary key, re-running a day is idempotent. Failures are recorded in source_freshness exactly as in a normal run.
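The idempotency argument rests on the upsert being keyed on the primary key. A minimal illustration with stdlib sqlite3; the (ts_code, trade_date) key and the close column are assumptions about normalized_daily, and the jobs' actual write statement is not part of this design. DuckDB accepts the same INSERT ... ON CONFLICT (...) DO UPDATE SET col = EXCLUDED.col form.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical subset of normalized_daily; PK columns are an assumption.
conn.execute("""
CREATE TABLE normalized_daily (
    ts_code TEXT, trade_date TEXT, close REAL,
    PRIMARY KEY (ts_code, trade_date)
)""")

# Insert new rows; on a PK collision, overwrite the value instead of duplicating.
UPSERT = """
INSERT INTO normalized_daily (ts_code, trade_date, close) VALUES (?, ?, ?)
ON CONFLICT (ts_code, trade_date) DO UPDATE SET close = excluded.close
"""

rows = [("000001.SZ", "2026-05-12", 10.5), ("600000.SH", "2026-05-12", 8.2)]
for _ in range(2):  # e.g. gap-fill and a later normal run load the same day
    conn.executemany(UPSERT, rows)

print(conn.execute("SELECT COUNT(*) FROM normalized_daily").fetchone()[0])  # → 2
```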

Layer 3: Host Watchdog (scripts/check_ingest.sh)

A shell script deployed on the ECS host and run by cron at 20:00, Monday to Friday. It connects to DuckDB inside the container and checks that:

  1. The latest normalized_daily date is within 3 days of today.
  2. No source_freshness entry has last_failure > last_success (i.e., no source's most recent attempt failed).
  3. The row count for the latest trading day is at least 4000 (warning threshold).

The script exits 0 (OK) when all checks pass and 1 (ALERT) otherwise. Output is appended to /home/twilight/logs/ingest_check.log; email or Slack alerting can be wired to the exit code later.
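The three checks and the exit-code rule reduce to a small decision function. The design specifies a shell script; this is only a Python sketch of the rules so they can be unit-tested, with the query results passed in as arguments (how the real script fetches them is not shown). Two assumptions: a failure with no recorded success also fails check 2, matching the "error" rule in Layer 4, and a low row count raises ALERT like the other checks.

```python
import datetime as dt
from typing import List, Optional, Sequence, Tuple

MAX_LAG_DAYS = 3  # check 1: latest daily date within 3 days of today
MIN_ROWS = 4000   # check 3: warning threshold for the latest trading day

# (source, last_success, last_failure) as read from source_freshness
FreshnessRow = Tuple[str, Optional[dt.datetime], Optional[dt.datetime]]


def evaluate(
    today: dt.date,
    latest_daily: Optional[dt.date],
    freshness: Sequence[FreshnessRow],
    latest_row_count: int,
) -> Tuple[int, List[str]]:
    """Return (exit_code, problems): 0 = OK, 1 = ALERT."""
    problems: List[str] = []
    # Check 1: data is not more than MAX_LAG_DAYS behind.
    if latest_daily is None or (today - latest_daily).days > MAX_LAG_DAYS:
        problems.append(f"latest normalized_daily date is {latest_daily}")
    # Check 2: no source whose most recent attempt failed.
    # Assumption: a failure with no success at all also counts.
    for source, last_ok, last_fail in freshness:
        if last_fail is not None and (last_ok is None or last_fail > last_ok):
            problems.append(f"{source}: last attempt failed at {last_fail}")
    # Check 3: enough rows. Assumption: treated as ALERT like the others.
    if latest_row_count < MIN_ROWS:
        problems.append(f"only {latest_row_count} rows for latest trading day")
    return (1 if problems else 0), problems
```

The script would log each problem line and exit with the returned code, which keeps the later email/Slack hook a pure function of the exit status.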

Layer 4: /quality/status Health Field

The existing endpoint returns last_success, last_failure and rows_today for each source. Add a computed status field per source:

| Condition | Status |
|---|---|
| last_success within 48h and no later failure | "healthy" |
| last_success 48h-7d ago and no later failure | "stale" |
| last_failure > last_success, or a failure with last_success IS NULL | "error" |
| Never ingested (both NULL) | "never_ingested" |
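A sketch of the status computation for one source. freshness_status is a hypothetical name, and the timestamps are assumed to be comparable datetimes (all naive local time, or all timezone-aware). The table does not say what happens when last_success is older than 7 days with no later failure; the sketch assumes "stale" and says so in a comment.

```python
import datetime as dt
from typing import Optional

HEALTHY_WINDOW = dt.timedelta(hours=48)


def freshness_status(
    last_success: Optional[dt.datetime],
    last_failure: Optional[dt.datetime],
    now: dt.datetime,
) -> str:
    """Map one source_freshness row to a health label."""
    if last_success is None and last_failure is None:
        return "never_ingested"
    # Most recent attempt failed, or it has only ever failed.
    if last_success is None or (last_failure is not None and last_failure > last_success):
        return "error"
    if now - last_success <= HEALTHY_WINDOW:
        return "healthy"
    # The design defines "stale" as 48h-7d and is silent on > 7d;
    # assumption: anything older than 48h without a later failure is "stale".
    return "stale"
```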

File Changes

| File | Change |
|---|---|
| src/service/ingest/base.py | Add retry loop to _fetch() |
| src/service/ingest/stock_basic.py | Add retry to _fetch_status() |
| src/service/ingest/trade_cal.py | Add retry to _fetch() |
| src/service/scheduler.py | Add _backfill_gaps(), call from _daily_batch() |
| src/service/routes/warehouse.py | Add status field to freshness response |
| src/service/main.py | Add try/except around scheduler.start() |
| scripts/check_ingest.sh | New: host watchdog script |

Testing

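  1. Unit: mock the Tushare call to fail twice and then succeed → _fetch() returns the data after two retries
  2. Unit: insert an open trading day into normalized_trade_cal with no matching normalized_daily rows → gap-fill selects and ingests it
  3. Integration: make Tushare unreachable (e.g., block outbound access from the container), run daily_batch → verify 3 retries and that source_freshness.last_failure is set
  4. Integration: delete one day's rows from normalized_daily, run the scheduler's daily batch → gap-fill restores them
  5. Manual: run check_ingest.sh on the ECS host after deployment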
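Test 1 could look like the following. fetch_with_retry is a stand-in with the same 3-retry, 10s/20s/40s schedule as _fetch(), so the block runs on its own; a real test would patch the project's tushare_call instead, whose module path this design does not give. Patching time.sleep keeps the test from actually waiting.

```python
import time
import unittest
from unittest import mock


def fetch_with_retry(call, delays=(10, 20, 40)):
    """Stand-in for IngestJob._fetch: at most 1 + len(delays) calls."""
    for attempt in range(len(delays) + 1):
        try:
            return call()
        except TimeoutError as e:
            if attempt == len(delays):
                raise RuntimeError(f"failed after {len(delays)} retries") from e
            time.sleep(delays[attempt])


class RetryTest(unittest.TestCase):
    @mock.patch("time.sleep")  # skip the real backoff waits
    def test_fails_twice_then_succeeds(self, fake_sleep):
        call = mock.Mock(side_effect=[TimeoutError(), TimeoutError(), ["row"]])
        self.assertEqual(fetch_with_retry(call), ["row"])
        self.assertEqual(call.call_count, 3)
        self.assertEqual(fake_sleep.call_args_list, [mock.call(10), mock.call(20)])

    @mock.patch("time.sleep")
    def test_gives_up_after_three_retries(self, fake_sleep):
        call = mock.Mock(side_effect=TimeoutError())
        with self.assertRaises(RuntimeError):
            fetch_with_retry(call)
        self.assertEqual(call.call_count, 4)
        self.assertEqual(fake_sleep.call_args_list,
                         [mock.call(10), mock.call(20), mock.call(40)])
```

Run it with `python -m unittest` against the module that contains it.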

Risks

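  • Gap-fill on the first boot after the initial backfill: _backfill_gaps handles at most 3 gaps per run and writes through the normalized_daily PK upsert, so it is safe even when triggered against an empty table.
  • Retry delays add wall time: up to 3 retries with 10 + 20 + 40 = 70s of backoff per failing API, on top of the time the failed requests themselves take. The daily batch calls 3 APIs, so today's ingest adds at most 3 × 70s = 210s of sleep; if the same outage also hits 3 backfilled gap days, which run the same jobs, the worst case is 4 × 210s = 840s (14 min). That still finishes well before the 20:00 watchdog run. The 2h misfire_grace_time does not bound this: it only limits how late a missed run may start.
  • DuckDB single writer: gap-fill uses the same writer connection and thread as the normal ingest, so there is no write concurrency.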

Internal team document