
Twilight Drive Phase 2 — Infrastructure Hardening Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Ship three infrastructure foundations: (A) centralized data warehouse with 4-layer DuckDB model, (B) git-backed profile replication with ownership, (C) New-API + SearXNG gateway for LLM key pooling and web search.

Source spec: docs/planning/superpowers/specs/2026-05-07-twilight-drive-phase2.md


Initiative A: Centralized Data Warehouse

Task A1: DuckDB 4-Layer Schemas

Files:

  • Create: twilight-drive/src/data/warehouse/schemas.py

  • [ ] Step 1: Define all 4 layers + quality + freshness tables

python
# src/data/warehouse/schemas.py
"""DuckDB schema definitions for the 4-layer data warehouse."""

DUCKDB_SCHEMAS = {
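    # Layer 0: Raw (append-only JSON blobs)
    # DuckDB has no AUTO_INCREMENT; ids are drawn from a sequence instead.
    "seq_raw_tushare_daily": "CREATE SEQUENCE IF NOT EXISTS seq_raw_tushare_daily",
    "raw_tushare_daily": """
        CREATE TABLE IF NOT EXISTS raw_tushare_daily (
            id BIGINT PRIMARY KEY DEFAULT nextval('seq_raw_tushare_daily'),
            code VARCHAR,
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            response_json JSON,
            source VARCHAR DEFAULT 'tushare'
        )
    """,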
    
    # Layer 1: Normalized (unified schema)
    "normalized_daily": """
        CREATE TABLE IF NOT EXISTS normalized_daily (
            code VARCHAR NOT NULL,
            trade_date DATE NOT NULL,
            open DOUBLE,
            high DOUBLE,
            low DOUBLE,
            close DOUBLE,
            volume BIGINT,
            amount DOUBLE,
            source VARCHAR,
            ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (code, trade_date, source)
        )
    """,
    
    # Layer 2: Curated (business logic applied)
    "curated_daily": """
        CREATE TABLE IF NOT EXISTS curated_daily (
            code VARCHAR NOT NULL,
            trade_date DATE NOT NULL,
            open DOUBLE,
            high DOUBLE,
            low DOUBLE,
            close DOUBLE,
            volume BIGINT,
            amount DOUBLE,
            adj_factor DOUBLE,
            PRIMARY KEY (code, trade_date)
        )
    """,
    
    # Layer 3: Feature (derived metrics)
    "feature_daily": """
        CREATE TABLE IF NOT EXISTS feature_daily (
            code VARCHAR NOT NULL,
            trade_date DATE NOT NULL,
            ma_5 DOUBLE,
            ma_10 DOUBLE,
            ma_20 DOUBLE,
            ma_60 DOUBLE,
            pe_ttm DOUBLE,
            pb DOUBLE,
            momentum_5d DOUBLE,
            momentum_20d DOUBLE,
            volatility_20d DOUBLE,
            PRIMARY KEY (code, trade_date)
        )
    """,
    
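    # Quality log
    # DuckDB has no AUTO_INCREMENT; ids are drawn from a sequence instead.
    "seq_quality_log": "CREATE SEQUENCE IF NOT EXISTS seq_quality_log",
    "quality_log": """
        CREATE TABLE IF NOT EXISTS quality_log (
            id BIGINT PRIMARY KEY DEFAULT nextval('seq_quality_log'),
            check_name VARCHAR,
            code VARCHAR,
            trade_date DATE,
            severity VARCHAR,
            message VARCHAR,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """,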
    
    # Source freshness
    "source_freshness": """
        CREATE TABLE IF NOT EXISTS source_freshness (
            source VARCHAR PRIMARY KEY,
            last_success TIMESTAMP,
            last_failure TIMESTAMP,
            error_message VARCHAR,
            row_count_today INT DEFAULT 0
        )
    """,
}


def init_all(conn) -> None:
    """Create all tables if not exists."""
    for name, ddl in DUCKDB_SCHEMAS.items():
        conn.execute(ddl)
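
To illustrate how a Layer 0 blob might map onto the Layer 1 columns, here is a minimal sketch. The input keys (`ts_code`, `trade_date` as `YYYYMMDD`, `vol`) are assumed from Tushare's daily endpoint and should be checked against the real payload. `normalize_tushare_record` is a hypothetical helper that is not part of the plan, and the sample values are made up.

```python
import json
from datetime import datetime


def normalize_tushare_record(response_json: str) -> dict:
    """Map one raw Tushare daily record onto the normalized_daily columns.

    Hypothetical helper: the input keys are assumed, not confirmed by the plan.
    """
    raw = json.loads(response_json)
    return {
        "code": raw["ts_code"],
        "trade_date": datetime.strptime(raw["trade_date"], "%Y%m%d").date(),
        "open": float(raw["open"]),
        "high": float(raw["high"]),
        "low": float(raw["low"]),
        "close": float(raw["close"]),
        # Unit conversion (if any is needed) is deliberately left out of this sketch.
        "volume": int(raw["vol"]),
        "amount": float(raw["amount"]),
        "source": "tushare",
    }


# Illustrative blob as it might sit in raw_tushare_daily.response_json.
sample = json.dumps({
    "ts_code": "600519.SH", "trade_date": "20260430",
    "open": 1800, "high": 1860, "low": 1790, "close": 1850.0,
    "vol": 100000, "amount": 185000000,
})
row = normalize_tushare_record(sample)
print(row["code"], row["trade_date"], row["close"])
```

Keeping the mapping in one pure function means the raw layer can stay append-only while the normalization rules change independently.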

Task A2: Ingestion Scheduler

Files:

  • Create: twilight-drive/src/data/ingest/scheduler.py

  • Create: twilight-drive/src/data/ingest/tushare.py

  • [ ] Step 1: Create the APScheduler setup

python
# src/data/ingest/scheduler.py
"""APScheduler-based daily batch ingestion."""

from apscheduler.schedulers.background import BackgroundScheduler
import structlog

logger = structlog.get_logger()

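# Cron hours below are written in UTC, so pin the scheduler to UTC
# instead of relying on the host's local timezone.
scheduler = BackgroundScheduler(timezone="UTC")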


def setup_scheduler(warehouse):
    """Register all scheduled jobs."""
    
    # Daily Tushare ingest (after market close 15:00 CST = 07:00 UTC)
    scheduler.add_job(
        ingest_tushare_daily,
        'cron',
        hour=7, minute=30,
        args=[warehouse],
        id='tushare_daily',
        replace_existing=True,
    )
    
    # Quality checks (after ingest)
    scheduler.add_job(
        run_quality_checks,
        'cron',
        hour=8, minute=0,
        args=[warehouse],
        id='quality_checks',
        replace_existing=True,
    )
    
    # Freshness check (every 6 hours)
    scheduler.add_job(
        check_source_freshness,
        'interval',
        hours=6,
        args=[warehouse],
        id='freshness_check',
        replace_existing=True,
    )
    
    scheduler.start()
    logger.info("scheduler_started")


def ingest_tushare_daily(warehouse):
    """Pull all 5000 stocks from Tushare daily (1 API call).

    Kept synchronous: BackgroundScheduler runs jobs in a thread pool and
    would never await a coroutine function.
    """
    # TODO: Implement Tushare daily batch pull
    logger.info("ingest_tushare_daily_starting")


def run_quality_checks(warehouse):
    """Run all data quality checks."""
    # TODO: Implement
    pass


def check_source_freshness(warehouse):
    """Check if all sources are fresh (<24h)."""
    # TODO: Implement
    pass
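
The cron hours above are only correct if the scheduler evaluates them in UTC. The conversion can be sanity-checked with a fixed UTC+8 offset for China Standard Time, which observes no daylight saving. `to_utc_hour_minute` is a hypothetical helper used here for illustration only.

```python
from datetime import datetime, time, timedelta, timezone

CST = timezone(timedelta(hours=8), name="CST")


def to_utc_hour_minute(local: time, tz: timezone = CST) -> tuple[int, int]:
    """Convert a wall-clock time in tz to the (hour, minute) for a UTC cron trigger."""
    # The date is arbitrary: a fixed-offset zone converts the same way every day.
    local_dt = datetime(2026, 1, 1, local.hour, local.minute, tzinfo=tz)
    utc_dt = local_dt.astimezone(timezone.utc)
    return utc_dt.hour, utc_dt.minute


print(to_utc_hour_minute(time(15, 0)))   # market close -> (7, 0)
print(to_utc_hour_minute(time(15, 30)))  # ingest slot  -> (7, 30)
```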

Task A3: Data Quality Checks

Files:

  • Create: twilight-drive/src/data/quality/checks.py

  • [ ] Step 1: Implement quality check framework

python
# src/data/quality/checks.py
"""Data quality checks for the warehouse."""

import structlog

logger = structlog.get_logger()


class QualityCheck:
    def __init__(self, conn):
        self.conn = conn

    def log(self, check_name, code, trade_date, severity, message):
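        # Pass trade_date through unchanged: str(None) would become the
        # literal 'None', which cannot be cast to DATE.
        self.conn.execute(
            "INSERT INTO quality_log (check_name, code, trade_date, severity, message) "
            "VALUES (?, ?, ?, ?, ?)",
            [check_name, code, trade_date, severity, message],
        )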
        logger.info(
            "quality_check",
            check_name=check_name,
            code=code,
            severity=severity,
            message=message,
        )

    def cross_source_agreement(self, code, trade_date, threshold=0.0001):
        """Check Tushare vs pytdx3 close for same (code, date)."""
        row = self.conn.execute(
            """
            SELECT 
                MAX(CASE WHEN source = 'tushare' THEN close END) as tushare_close,
                MAX(CASE WHEN source = 'pytdx3' THEN close END) as pytdx3_close
            FROM normalized_daily
            WHERE code = ? AND trade_date = ?
            """,
            [code, str(trade_date)],
        ).fetchone()
        
        t_close, p_close = row
        if t_close is None or p_close is None:
            return  # Not both sources available
        
        diff = abs(t_close - p_close) / t_close
        if diff > threshold:
            self.log("cross_source", code, trade_date, "warning",
                     f"Tushare={t_close}, pytdx3={p_close}, diff={diff:.4%}")
        return diff

    def anomaly_detection(self, threshold=0.20):
        """Flag single-day close-to-close changes above threshold (default 20%)."""
        # LAG() is a window function and cannot be filtered in the WHERE
        # clause of the same query, so it is computed in a subquery first.
        rows = self.conn.execute(
            """
            SELECT code, trade_date, close, prev_close
            FROM (
                SELECT code, trade_date, close,
                       LAG(close) OVER (PARTITION BY code ORDER BY trade_date) AS prev_close
                FROM curated_daily
            ) AS with_prev
            WHERE prev_close IS NOT NULL
              AND prev_close <> 0
              AND ABS(close - prev_close) / prev_close > ?
            """,
            [threshold],
        ).fetchall()

        for code, trade_date, close, prev_close in rows:
            diff = abs(close - prev_close) / prev_close
            self.log("anomaly", code, trade_date, "error",
                     f"Price change {diff:.1%}: {prev_close} -> {close}")

    def freshness_check(self, source, max_age_hours=24):
        """Check if source has data within max_age_hours."""
        from datetime import datetime, timezone

        row = self.conn.execute(
            "SELECT last_success FROM source_freshness WHERE source = ?",
            [source],
        ).fetchone()

        # Covers both a missing row and a row whose last_success is NULL.
        if row is None or row[0] is None:
            self.log("freshness", None, None, "error", f"No success recorded for {source}")
            return False

        last = row[0]
        # DuckDB TIMESTAMP values come back as naive datetimes. This assumes
        # they were stored in UTC so they can be compared with an aware "now".
        if last.tzinfo is None:
            last = last.replace(tzinfo=timezone.utc)
        age = (datetime.now(timezone.utc) - last).total_seconds() / 3600
        if age > max_age_hours:
            self.log("freshness", None, None, "warning",
                     f"{source} data is {age:.1f}h old (threshold: {max_age_hours}h)")
            return False
        return True

Task A4: Warehouse API Routes

Files:

  • Create: twilight-drive/src/service/routes/warehouse.py

  • [ ] Step 1: Create warehouse read endpoints

python
# src/service/routes/warehouse.py
"""Warehouse and quality read endpoints: /warehouse/* and /quality/*."""

from fastapi import APIRouter, Query
import duckdb

router = APIRouter()


@router.get("/warehouse/daily")
async def get_warehouse_daily(
    code: str = Query(...),
    start: str = Query(..., description="Start date YYYY-MM-DD"),
    end: str = Query(..., description="End date YYYY-MM-DD"),
    layer: str = Query("curated", description="Data layer: raw|normalized|curated|feature"),
):
    """Return daily data from the specified warehouse layer."""
    # TODO: Read from the requested DuckDB layer
    return {"rows": [], "source": layer, "code": code}


@router.get("/warehouse/features")
async def get_warehouse_features(
    code: str = Query(...),
    date: str = Query(..., description="Date YYYY-MM-DD"),
):
    """Return feature metrics for a stock on a date."""
    # TODO: Read from DuckDB feature layer
    return {"code": code, "date": date, "features": {}}


@router.get("/quality/status")
async def get_quality_status():
    """Return data quality status."""
    # TODO: Return latest quality check results
    return {"sources": {}, "last_checks": []}


@router.get("/quality/logs")
async def get_quality_logs(
    severity: str | None = Query(None, description="Filter by severity"),
):
    """Return quality check logs."""
    # TODO: Query quality_log table
    return []
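
Before the TODOs above are filled in, the `layer`, `start`, and `end` parameters need validating. A minimal sketch follows; the table names come from `DUCKDB_SCHEMAS`, but the layer-to-table mapping and the `resolve_query` helper are assumptions rather than part of the plan.

```python
from datetime import date

# Table names taken from DUCKDB_SCHEMAS; the mapping itself is an assumption.
# Note that raw_tushare_daily has no trade_date column, so the raw layer
# would need its own filter (for example on fetched_at).
LAYER_TABLES = {
    "raw": "raw_tushare_daily",
    "normalized": "normalized_daily",
    "curated": "curated_daily",
    "feature": "feature_daily",
}


def resolve_query(layer: str, start: str, end: str) -> tuple[str, date, date]:
    """Validate the layer name and date range before any SQL is built."""
    if layer not in LAYER_TABLES:
        raise ValueError(f"unknown layer: {layer!r}")
    start_date = date.fromisoformat(start)  # raises ValueError on bad input
    end_date = date.fromisoformat(end)
    if start_date > end_date:
        raise ValueError("start must not be after end")
    return LAYER_TABLES[layer], start_date, end_date


table, start_date, end_date = resolve_query("curated", "2026-01-01", "2026-04-30")
print(table, start_date, end_date)
```

Whitelisting the table name matters because table names cannot be bound as SQL parameters, whereas the dates and the stock code can be passed as `?` placeholders.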

Task A5: Tests (Initiative A)

  • [ ] Step 1: Write schema + ingest tests
python
# tests/test_warehouse.py
import pytest
import duckdb
from twilight_drive.data.warehouse.schemas import init_all
from twilight_drive.data.quality.checks import QualityCheck


class TestWarehouseSchemas:
    def test_init_all_tables(self):
        conn = duckdb.connect(":memory:")
        init_all(conn)
        # Verify tables exist
        tables = conn.execute("SHOW TABLES").fetchall()
        table_names = [t[0] for t in tables]
        assert "raw_tushare_daily" in table_names
        assert "normalized_daily" in table_names
        assert "curated_daily" in table_names
        assert "feature_daily" in table_names
        assert "quality_log" in table_names
        assert "source_freshness" in table_names


class TestQualityChecks:
    def test_cross_source_agreement_within_threshold(self):
        conn = duckdb.connect(":memory:")
        init_all(conn)
        # Insert test data
        conn.execute("INSERT INTO normalized_daily VALUES ('600519.SH', '2026-04-30', 1800, 1860, 1790, 1850.00, 100000, 185000000, 'tushare', CURRENT_TIMESTAMP)")
        conn.execute("INSERT INTO normalized_daily VALUES ('600519.SH', '2026-04-30', 1800, 1860, 1790, 1850.10, 100000, 185000000, 'pytdx3', CURRENT_TIMESTAMP)")
        
        qc = QualityCheck(conn)
        diff = qc.cross_source_agreement("600519.SH", "2026-04-30")
        assert diff is not None
        assert diff < 0.0001  # < 0.01%

    def test_cross_source_disagreement_logs_warning(self):
        conn = duckdb.connect(":memory:")
        init_all(conn)
        conn.execute("INSERT INTO normalized_daily VALUES ('600519.SH', '2026-04-30', 1800, 1860, 1790, 1850.00, 100000, 185000000, 'tushare', CURRENT_TIMESTAMP)")
        conn.execute("INSERT INTO normalized_daily VALUES ('600519.SH', '2026-04-30', 1800, 1860, 1790, 1900.00, 100000, 190000000, 'pytdx3', CURRENT_TIMESTAMP)")
        
        qc = QualityCheck(conn)
        diff = qc.cross_source_agreement("600519.SH", "2026-04-30")
        assert diff > 0.0001  # > 0.01%
        
        # Verify warning logged
        log = conn.execute(
            "SELECT severity, message FROM quality_log WHERE check_name = 'cross_source'"
        ).fetchone()
        assert log[0] == "warning"

    def test_anomaly_detection_flags_20pct_change(self):
        conn = duckdb.connect(":memory:")
        init_all(conn)
        conn.execute("INSERT INTO curated_daily VALUES ('600519.SH', '2026-04-29', 1000, 1050, 990, 1000.00, 50000, 50000000, 1.0)")
        conn.execute("INSERT INTO curated_daily VALUES ('600519.SH', '2026-04-30', 800, 850, 790, 780.00, 80000, 62400000, 1.0)")
        
        qc = QualityCheck(conn)
        qc.anomaly_detection(threshold=0.20)
        
        log = conn.execute(
            "SELECT severity FROM quality_log WHERE check_name = 'anomaly'"
        ).fetchone()
        assert log[0] == "error"
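
The fixture values in these tests sit on known sides of each threshold. The arithmetic can be checked in isolation with a small helper; `relative_diff` is a hypothetical name that mirrors the expression used inside `QualityCheck`.

```python
def relative_diff(reference: float, other: float) -> float:
    """Relative difference of other against reference, as used for close prices."""
    return abs(reference - other) / reference


THRESHOLD = 0.0001  # 0.01%, the default in cross_source_agreement

within = relative_diff(1850.00, 1850.10)   # agreement test: 0.10 / 1850
beyond = relative_diff(1850.00, 1900.00)   # disagreement test: 50 / 1850
jump = relative_diff(1000.00, 780.00)      # anomaly test: 220 / 1000

print(f"{within:.6f} {beyond:.6f} {jump:.2f}")  # 0.000054 0.027027 0.22
```

The agreement case lands at roughly 0.0054%, under the 0.01% threshold, while the anomaly case is a 22% move, just over the 20% limit.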

Initiative B: Profile Replication & Ownership

Task B1: Bare Git Repo Setup

  • [ ] Step 1: Initialize the profile registry
bash
# On Vultr (central registry)
mkdir -p /var/lib/hermes-profiles
cd /var/lib/hermes-profiles
git init --bare

# On Mac Mini (working copy)
git clone vultr:/var/lib/hermes-profiles ~/hermes-profiles-registry
cd ~/hermes-profiles-registry

# Create initial structure
mkdir -p profiles
touch profiles/.gitkeep
touch OWNERS
git add -A
git commit -m "init: profile registry structure"
# Name the local branch main regardless of the init.defaultBranch setting
git branch -M main
git push -u origin main

Task B2: Profile Manager CLI

Files:

  • Create: hermes-profile-manager/pyproject.toml

  • Create: hermes-profile-manager/cli.py

  • [ ] Step 1: Create the CLI entry point

python
# hermes-profile-manager/cli.py
"""hermes-profile-manager CLI."""

import typer
from hermes_profile_manager import claim, push_pull, ownership, templates, sync

app = typer.Typer(help="Manage Hermes profiles with git-backed storage")

# --- Profile commands ---
profile_app = typer.Typer()

@profile_app.command("claim")
def claim_cmd(name: str):
    """Claim ownership of a profile."""
    # Named claim_cmd so it does not shadow the imported `claim` module.
    claim.claim_profile(name)

@profile_app.command()
def push():
    """Push local profile changes to registry."""
    push_pull.push()

@profile_app.command()
def pull():
    """Pull latest profile configs from registry."""
    push_pull.pull()

@profile_app.command("sync")
def sync_cmd():
    """Pull → merge → push (full sync)."""
    # Named sync_cmd so it does not shadow the imported `sync` module.
    sync.sync_all()

@profile_app.command("list")
def list_profiles():
    """List all profiles and ownership status."""
    ownership.list_profiles()

@profile_app.command()
def show(name: str):
    """Show profile diff vs last commit."""
    push_pull.show_diff(name)

app.add_typer(profile_app, name="profile")

# --- Template commands ---
template_app = typer.Typer()

@template_app.command("list")
def list_templates():
    """List available templates."""
    templates.list_templates()

@template_app.command()
def create(name: str):
    """Create template from current profile."""
    templates.create_template(name)

@template_app.command()
def apply(template: str, target: str):
    """Clone template into new profile."""
    templates.apply_template(template, target)

app.add_typer(template_app, name="template")

# --- Ownership commands ---
ownership_app = typer.Typer()

@ownership_app.command("list")
def list_owners():
    """Show ownership registry."""
    ownership.list_owners()

@ownership_app.command()
def transfer(profile: str, new_owner: str):
    """Transfer profile ownership."""
    ownership.transfer(profile, new_owner)

app.add_typer(ownership_app, name="ownership")

if __name__ == "__main__":
    app()

Task B3: Ownership Model

Files:

  • Create: hermes-profile-manager/ownership.py

  • [ ] Step 1: Implement ownership tracking

python
# hermes-profile-manager/ownership.py
"""Profile ownership registry."""

import subprocess
from pathlib import Path
from datetime import date

OWNERS_FILE = "OWNERS"


def list_profiles():
    """List all profiles with ownership."""
    registry = Path(__file__).resolve().parent.parent
    owners = (registry / OWNERS_FILE).read_text()
    print(owners)


def claim_profile(name: str, owner_id: str, verified_by: str):
    """Claim ownership of a profile."""
    registry = Path(__file__).resolve().parent.parent
    owners_file = registry / OWNERS_FILE
    
    # Check if already owned
    for line in owners_file.read_text().splitlines():
        if line.startswith(f"{name} |"):
            parts = [p.strip() for p in line.split("|")]
            if parts[1] != owner_id:
                print(f"Error: {name} is already owned by {parts[1]}")
                return
            print(f"{name} already owned by {owner_id}")
            return
    
    # Append ownership entry
    with owners_file.open("a") as f:
        f.write(f"{name} | {owner_id} | {verified_by} | {date.today()}\n")
    
    # Git commit
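    # check=True stops at the first failing git step instead of continuing silently
    subprocess.run(["git", "add", OWNERS_FILE], cwd=registry, check=True)
    subprocess.run(["git", "commit", "-m", f"claim: {name} -> {owner_id}"], cwd=registry, check=True)
    subprocess.run(["git", "push"], cwd=registry, check=True)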
    
    print(f"Profile {name} claimed by {owner_id}")
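
The `OWNERS` line format written above (`name | owner_id | verified_by | date`) can be read back into a lookup table. This is a minimal sketch: `parse_owners` is a hypothetical helper, skipping `#` comment lines is an assumed convention, and the ids in the sample are made up.

```python
def parse_owners(text: str) -> dict[str, dict[str, str]]:
    """Parse 'name | owner | verified_by | date' lines into a dict keyed by profile."""
    owners = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks; '#' comments are an assumed convention
        parts = [p.strip() for p in line.split("|")]
        if len(parts) != 4:
            raise ValueError(f"malformed OWNERS line: {line!r}")
        name, owner_id, verified_by, claimed_on = parts
        owners[name] = {
            "owner_id": owner_id,
            "verified_by": verified_by,
            "claimed_on": claimed_on,
        }
    return owners


# Illustrative content; the owner and verifier ids are made up.
sample = "stock-research-agent | alice | bob | 2026-05-07\n"
owners = parse_owners(sample)
print(owners["stock-research-agent"]["owner_id"])  # alice
```

Looking a profile up by exact key also gives `list` and `transfer` style commands one shared parser instead of repeated string matching on raw lines.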

Task B4: Template System

Files:

  • Create: hermes-profile-manager/templates.py

  • Create: profiles/template-stock-research-pro/config.yaml.template

  • [ ] Step 1: Create the template structure

yaml
# profiles/template-stock-research-pro/config.yaml.template
model:
  provider: openai_compatible
  base_url: http://localhost:3000/v1
  api_key: "{{SILICONFLOW_API_KEY}}"
  model: Qwen/Qwen3.5-27B
  fallbacks:
    - provider: openai_compatible
      base_url: https://openrouter.ai/api/v1
      api_key: "{{OPENROUTER_API_KEY}}"
      model: qwen/qwen-2.5-72b-instruct

platforms:
  weixin:
    enabled: true
    account: "{{WEIXIN_ACCOUNT}}"
    home_channel: "{{WEIXIN_HOME_CHANNEL}}"
    personality: helpful

skills:
  stock-research:
    enabled: true
    mode: service
    service_url: "{{TWILIGHT_SERVICE_URL}}"
    api_token: "{{TWILIGHT_API_TOKEN}}"

cron:
  enabled: true
  schedule_file: crontab.json
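
The `{{NAME}}` placeholders in this template have to be filled when a template is applied to a new profile. One way that substitution could work is sketched below; `render_template` is a hypothetical function, since the plan does not show the contents of `templates.py`, and the values are illustrative.

```python
import re

PLACEHOLDER = re.compile(r"\{\{([A-Z0-9_]+)\}\}")


def render_template(text: str, values: dict[str, str]) -> str:
    """Replace every {{NAME}} placeholder, failing loudly if a value is missing."""
    missing = sorted({m for m in PLACEHOLDER.findall(text) if m not in values})
    if missing:
        raise KeyError(f"missing template values: {', '.join(missing)}")
    return PLACEHOLDER.sub(lambda m: values[m.group(1)], text)


template = 'account: "{{WEIXIN_ACCOUNT}}"\nservice_url: "{{TWILIGHT_SERVICE_URL}}"\n'
rendered = render_template(template, {
    "WEIXIN_ACCOUNT": "demo-account",                 # illustrative value
    "TWILIGHT_SERVICE_URL": "http://localhost:8000",  # illustrative value
})
print(rendered)
```

Failing on a missing value keeps a half-rendered config with literal `{{...}}` strings from being committed to the registry. Values are inserted verbatim, so anything containing quotes would still need YAML escaping.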

Initiative C: Token Gateway — New-API + SearXNG

Task C1: Docker Compose for Gateway

Files:

  • Create: twilight-drive/docker-compose.gateway.yml

  • [ ] Step 1: Create docker compose file

yaml
version: "3.8"

services:
  new-api:
    image: calciumion/new-api:latest
    container_name: new-api
    restart: always
    ports:
      - "3000:3000"
    environment:
      - SQL_DSN=sqlite:///data/new-api.db
      - REDIS_CONN_STRING=redis://redis:6379
      - SYNC_FREQUENCY=60
    volumes:
      - /var/lib/twilight/new-api:/data
    depends_on:
      - redis

  searxng:
    image: searxng/searxng:latest
    container_name: searxng
    restart: always
    ports:
      - "8080:8080"
    environment:
      - SEARXNG_BASE_URL=http://localhost:8080/
      - SEARXNG_SECRET=${SEARXNG_SECRET:-changeme}
      - UWSGI_WORKERS=4
      - UWSGI_THREADS=4
    volumes:
      - /var/lib/twilight/searxng:/etc/searxng:rw
    cap_drop:
      - ALL
    cap_add:
      - CHOWN
      - SETGID
      - SETUID

  redis:
    image: redis:7-alpine
    container_name: searxng-redis
    restart: always
    command: redis-server --save 30 5 --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes:
      - /var/lib/twilight/redis:/data

Task C2: SearXNG Configuration

Files:

  • Create: twilight-drive/searxng/settings.yml

  • [ ] Step 1: Configure SearXNG engines

yaml
# searxng/settings.yml
use_default_settings: true

general:
  debug: false
  instance_name: "Twilight Search"

search:
  safe_search: 0
  autocomplete: "baidu"
  default_lang: "zh-CN"
  formats:
    - html
    - json

engines:
  # Chinese engines
  - name: baidu
    engine: baidu
    shortcut: bd
    disabled: false
  
  - name: sogou
    engine: sogou
    shortcut: sg
    disabled: false
  
  - name: 360search
    engine: 360search
    shortcut: so
    disabled: false

  # Global engines
  - name: google
    engine: google
    shortcut: g
    disabled: false
  
  - name: duckduckgo
    engine: duckduckgo
    shortcut: ddg
    disabled: false
  
  - name: bing
    engine: bing
    shortcut: b
    disabled: false

  # Disable unnecessary engines
  - name: wikipedia
    disabled: true
  - name: wikidata
    disabled: true

server:
  port: 8080
  bind_address: "0.0.0.0"
  secret_key: "${SEARXNG_SECRET}"
  limiter: true
  image_proxy: false

ui:
  static_use_hash: true

outgoing:
  request_timeout: 5.0
  max_request_timeout: 10.0
  useragent_suffix: ""
  pool_connections: 100
  pool_maxsize: 20

Task C3: Migrate Hermes Profiles to New-API

  • [ ] Step 1: Update stock-research-agent config
yaml
# ~/.hermes/profiles/stock-research-agent/config.yaml (modified)
model:
  provider: openai_compatible
  base_url: http://localhost:3000/v1   # New-API endpoint
  api_key: new-api-shared-key          # Single proxy key for all profiles
  model: Qwen/Qwen3.5-27B
  • [ ] Step 2: Update main profile config
yaml
# ~/.hermes/config.yaml (modified)
model:
  provider: openai_compatible
  base_url: http://localhost:3000/v1
  api_key: new-api-shared-key
  model: qwen3.6-plus

Task C4: Update Search Route to Use SearXNG

Files:

  • Modify: twilight-drive/src/service/routes/search.py

  • [ ] Step 1: Update search route to use SearXNG

python
# src/service/routes/search.py (modified)
from datetime import datetime, timezone

import httpx

SEARXNG_URL = "http://localhost:8080/search"


async def searxng_search(query: str, max_results: int) -> dict:
    async with httpx.AsyncClient(timeout=10.0) as client:
        resp = await client.get(
            SEARXNG_URL,
            # SearXNG has no result-count parameter; the list is trimmed
            # to max_results when the response is mapped below.
            params={
                "q": query,
                "format": "json",
                "language": "zh-CN",
            },
        )
        resp.raise_for_status()
        data = resp.json()
    
    return {
        "results": [
            {
                "title": r.get("title", ""),
                "url": r.get("url", ""),
                "snippet": r.get("content", ""),
                "fetched_at": datetime.now(timezone.utc).isoformat(),
                "cite": {
                    "kind": "tool",
                    "source": r.get("engine", "searxng"),
                    "served_by": "twilight-drive-backend",
                    "original_url": r.get("url", ""),
                },
            }
            for r in data.get("results", [])[:max_results]
        ],
        "source": "searxng",
    }
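
The mapping half of `searxng_search` can be unit-tested without a running SearXNG instance if it is pulled into a pure function. In this sketch `to_cited_results` is a hypothetical name, and the payload is invented to carry only the keys the route reads.

```python
from datetime import datetime, timezone


def to_cited_results(data: dict, max_results: int) -> list[dict]:
    """Trim a SearXNG JSON payload and attach the citation envelope."""
    fetched_at = datetime.now(timezone.utc).isoformat()
    return [
        {
            "title": r.get("title", ""),
            "url": r.get("url", ""),
            "snippet": r.get("content", ""),
            "fetched_at": fetched_at,
            "cite": {
                "kind": "tool",
                "source": r.get("engine", "searxng"),
                "served_by": "twilight-drive-backend",
                "original_url": r.get("url", ""),
            },
        }
        for r in data.get("results", [])[:max_results]
    ]


# Made-up payload with the keys the route reads; real responses carry more fields.
payload = {
    "results": [
        {"title": "A", "url": "https://example.com/a", "content": "first", "engine": "baidu"},
        {"title": "B", "url": "https://example.com/b", "content": "second"},
        {"title": "C", "url": "https://example.com/c", "content": "third", "engine": "bing"},
    ]
}
results = to_cited_results(payload, max_results=2)
print([r["cite"]["source"] for r in results])  # ['baidu', 'searxng']
```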

Combined Test Plan (Phase 2)

  • [ ] Step 1: Write all Phase 2 tests (see spec §Test Plan for each initiative)

Deployment Checklist (Phase 2)

  • [ ] Step 1: Deploy Initiative A (Weeks 6-7)
bash
cd twilight-drive
# Run daily ingest manually to verify
uv run python -c "from twilight_drive.data.ingest.scheduler import ingest_tushare_daily; ..."
# Run quality checks
uv run python -c "from twilight_drive.data.quality.checks import QualityCheck; ..."
# Verify warehouse API
curl "http://localhost:8000/warehouse/daily?code=600519.SH&start=2026-01-01&end=2026-04-30"
  • [ ] Step 2: Deploy Initiative B (Week 8)
bash
# Set up bare repo on Vultr
ssh vultr "mkdir -p /var/lib/hermes-profiles && cd /var/lib/hermes-profiles && git init --bare"

# Clone on Mac Mini
git clone vultr:/var/lib/hermes-profiles ~/hermes-profiles-registry

# Migrate existing profiles
cp -r ~/.hermes/profiles/* ~/hermes-profiles-registry/profiles/
cd ~/hermes-profiles-registry && git add -A && git commit -m "migrate profiles" && git push

# Install CLI
cd hermes-profile-manager && pip install -e .

# Test
hermes-profile-manager profile list
hermes-profile-manager profile claim stock-research-agent
  • [ ] Step 3: Deploy Initiative C (Week 9)
bash
# Deploy New-API + SearXNG
cd twilight-drive
docker compose -f docker-compose.gateway.yml up -d

# Verify New-API
curl http://localhost:3000/v1/chat/completions -H "Authorization: Bearer new-api-shared-key" \
  -H "Content-Type: application/json" \
  -d '{"model": "Qwen/Qwen3.5-27B", "messages": [{"role": "user", "content": "hi"}]}'

# Verify SearXNG
curl "http://localhost:8080/search?q=600519+业绩&format=json&language=zh-CN"

# Migrate Hermes profiles (update base_url in config.yaml)
# Restart gateways
systemctl --user restart ai.hermes.gateway
systemctl --user restart ai.hermes.stock-research.gateway
  • [ ] Step 4: Full integration test (Week 10)
bash
# Test end-to-end: user query → SearXNG search → New-API LLM → cited response
curl -H "Authorization: Bearer $TOKEN" \
  "http://localhost:8000/api/search?query=600519+最新业绩"

Out of Scope (Explicitly Deferred)

  • Real-time data streaming (P3)
  • Multi-tenant Kubernetes orchestration
  • Per-user dashboards (P2 content scope)
  • Research report PDF ingestion (separate P2 spec)
  • Stripe integration (P2 payment scope)

Internal team document