主题
Twilight Drive Phase 2 — Infrastructure Hardening Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
Goal: Ship three infrastructure foundations: (A) centralized data warehouse with 4-layer DuckDB model, (B) git-backed profile replication with ownership, (C) New-API + SearXNG gateway for LLM key pooling and web search.
Source spec: docs/planning/superpowers/specs/2026-05-07-twilight-drive-phase2.md
Initiative A: Centralized Data Warehouse
Task A1: DuckDB 4-Layer Schemas
Files:
Create:
twilight-drive/src/data/warehouse/schemas.py
- [ ] Step 1: Define all 4 layers + quality + freshness tables
python
# src/data/warehouse/schemas.py
"""DuckDB schema definitions for the 4-layer data warehouse.

DuckDB does not support ``AUTO_INCREMENT``; surrogate ``id`` columns instead
default to ``nextval()`` of a sequence. ``init_all`` creates every sequence
before any table so those defaults resolve.
"""

# Sequences backing the surrogate ``id`` keys of the append-only tables.
DUCKDB_SEQUENCES = {
    "raw_tushare_daily_id_seq": (
        "CREATE SEQUENCE IF NOT EXISTS raw_tushare_daily_id_seq START 1"
    ),
    "quality_log_id_seq": (
        "CREATE SEQUENCE IF NOT EXISTS quality_log_id_seq START 1"
    ),
}

# Table name -> CREATE TABLE statement.
DUCKDB_SCHEMAS = {
    # Layer 0: Raw (append-only JSON blobs)
    "raw_tushare_daily": """
        CREATE TABLE IF NOT EXISTS raw_tushare_daily (
            id BIGINT DEFAULT nextval('raw_tushare_daily_id_seq') PRIMARY KEY,
            code VARCHAR,
            fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            response_json JSON,
            source VARCHAR DEFAULT 'tushare'
        )
    """,
    # Layer 1: Normalized (unified schema)
    "normalized_daily": """
        CREATE TABLE IF NOT EXISTS normalized_daily (
            code VARCHAR NOT NULL,
            trade_date DATE NOT NULL,
            open DOUBLE,
            high DOUBLE,
            low DOUBLE,
            close DOUBLE,
            volume BIGINT,
            amount DOUBLE,
            source VARCHAR,
            ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            PRIMARY KEY (code, trade_date, source)
        )
    """,
    # Layer 2: Curated (business logic applied)
    "curated_daily": """
        CREATE TABLE IF NOT EXISTS curated_daily (
            code VARCHAR NOT NULL,
            trade_date DATE NOT NULL,
            open DOUBLE,
            high DOUBLE,
            low DOUBLE,
            close DOUBLE,
            volume BIGINT,
            amount DOUBLE,
            adj_factor DOUBLE,
            PRIMARY KEY (code, trade_date)
        )
    """,
    # Layer 3: Feature (derived metrics)
    "feature_daily": """
        CREATE TABLE IF NOT EXISTS feature_daily (
            code VARCHAR NOT NULL,
            trade_date DATE NOT NULL,
            ma_5 DOUBLE,
            ma_10 DOUBLE,
            ma_20 DOUBLE,
            ma_60 DOUBLE,
            pe_ttm DOUBLE,
            pb DOUBLE,
            momentum_5d DOUBLE,
            momentum_20d DOUBLE,
            volatility_20d DOUBLE,
            PRIMARY KEY (code, trade_date)
        )
    """,
    # Quality log
    "quality_log": """
        CREATE TABLE IF NOT EXISTS quality_log (
            id BIGINT DEFAULT nextval('quality_log_id_seq') PRIMARY KEY,
            check_name VARCHAR,
            code VARCHAR,
            trade_date DATE,
            severity VARCHAR,
            message VARCHAR,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """,
    # Source freshness
    "source_freshness": """
        CREATE TABLE IF NOT EXISTS source_freshness (
            source VARCHAR PRIMARY KEY,
            last_success TIMESTAMP,
            last_failure TIMESTAMP,
            error_message VARCHAR,
            row_count_today INT DEFAULT 0
        )
    """,
}


def init_all(conn) -> None:
    """Create all warehouse sequences and tables that do not exist yet.

    Every statement uses ``IF NOT EXISTS``, so calling this repeatedly is safe.

    Args:
        conn: An open DuckDB connection (only ``conn.execute(sql)`` is used).
    """
    # Sequences first: the tables' DEFAULT nextval(...) clauses reference them.
    for ddl in DUCKDB_SEQUENCES.values():
        conn.execute(ddl)
    for ddl in DUCKDB_SCHEMAS.values():
        conn.execute(ddl)
# Task A2: Ingestion Scheduler
Files:
Create:
twilight-drive/src/data/ingest/scheduler.py
Create:
twilight-drive/src/data/ingest/tushare.py
- [ ] Step 1: Create the APScheduler setup
python
# src/data/ingest/scheduler.py
"""APScheduler-based daily batch ingestion."""
from apscheduler.schedulers.background import BackgroundScheduler
import structlog
# Module-level structlog logger shared by the scheduler setup and the job stubs.
logger = structlog.get_logger()
# Single process-wide scheduler; setup_scheduler() registers the jobs and starts it.
# No timezone is passed here, so cron fields use the scheduler's default zone
# unless a job sets its own.
# NOTE(review): BackgroundScheduler's default executor is a thread pool that calls
# job functions directly, so an `async def` job is not awaited — confirm jobs are sync
# or wrapped.
scheduler = BackgroundScheduler()
def _as_sync_job(func):
    """Return a plain callable the thread-pool scheduler can run directly.

    Calling an ``async def`` function from a worker thread only creates a
    coroutine that is never awaited, so the job would silently do nothing.
    Coroutine functions are wrapped to run to completion on a fresh event loop;
    regular functions are returned unchanged.
    """
    import asyncio
    import functools
    import inspect

    if not inspect.iscoroutinefunction(func):
        return func

    @functools.wraps(func)
    def runner(*args, **kwargs):
        return asyncio.run(func(*args, **kwargs))

    return runner


def setup_scheduler(warehouse):
    """Register all scheduled jobs and start the module-level scheduler.

    Cron jobs are pinned to UTC explicitly. Without ``timezone`` the hour/minute
    fields would be read in the scheduler's default (host-local) zone. On a host
    set to Asia/Shanghai, "07:30" would then fire before the market opens.

    Safe to call more than once: jobs are replaced by id, and the scheduler is
    only started when it is not already running.

    Args:
        warehouse: Passed as the single positional argument to every job.
    """
    # Daily Tushare ingest (after market close 15:00 CST = 07:00 UTC)
    scheduler.add_job(
        _as_sync_job(ingest_tushare_daily),
        "cron",
        hour=7,
        minute=30,
        timezone="UTC",
        args=[warehouse],
        id="tushare_daily",
        replace_existing=True,
    )
    # Quality checks (after ingest)
    scheduler.add_job(
        _as_sync_job(run_quality_checks),
        "cron",
        hour=8,
        minute=0,
        timezone="UTC",
        args=[warehouse],
        id="quality_checks",
        replace_existing=True,
    )
    # Freshness check (every 6 hours)
    scheduler.add_job(
        _as_sync_job(check_source_freshness),
        "interval",
        hours=6,
        args=[warehouse],
        id="freshness_check",
        replace_existing=True,
    )
    # start() on an already-running scheduler raises, so guard re-entry.
    if not scheduler.running:
        scheduler.start()
        logger.info("scheduler_started")
async def ingest_tushare_daily(warehouse):
    """Fetch the daily bars for all ~5000 stocks from Tushare in one API call.

    Not implemented yet: it only emits the start event and returns None.
    """
    logger.info("ingest_tushare_daily_starting")
    # TODO: Implement Tushare daily batch pull
def run_quality_checks(warehouse):
    """Execute every data-quality check against the warehouse.

    Placeholder: not implemented yet, returns None.
    """
    # TODO: Implement
def check_source_freshness(warehouse):
    """Verify that every data source was refreshed within the last 24 hours.

    Placeholder: not implemented yet, returns None.
    """
    # TODO: Implement
# Task A3: Data Quality Checks
Files:
Create:
twilight-drive/src/data/quality/checks.py
- [ ] Step 1: Implement quality check framework
python
# src/data/quality/checks.py
"""Data quality checks for the warehouse."""
import structlog
logger = structlog.get_logger()
class QualityCheck:
    """Run data-quality checks against the warehouse and record findings.

    Every finding is appended to the ``quality_log`` table and mirrored to the
    structured logger.

    Args:
        conn: Open warehouse connection. Only ``execute(sql, params)`` and
            ``fetchone()``/``fetchall()`` on its result are used.
    """

    def __init__(self, conn):
        self.conn = conn

    def log(self, check_name, code, trade_date, severity, message):
        """Append one finding to quality_log and emit a ``quality_check`` event.

        Args:
            check_name: Name of the check that produced the finding.
            code: Security code, or None for source-level findings.
            trade_date: Trading date (date or ISO string), or None.
            severity: Severity label, e.g. ``"warning"`` or ``"error"``.
            message: Human-readable description.
        """
        # str(None) would be the text 'None', which the DATE column rejects;
        # source-level findings must store SQL NULL instead.
        date_param = None if trade_date is None else str(trade_date)
        self.conn.execute(
            "INSERT INTO quality_log (check_name, code, trade_date, severity, message) "
            "VALUES (?, ?, ?, ?, ?)",
            [check_name, code, date_param, severity, message],
        )
        logger.info(
            "quality_check",
            check_name=check_name,
            code=code,
            severity=severity,
            message=message,
        )

    def cross_source_agreement(self, code, trade_date, threshold=0.0001):
        """Compare the Tushare and pytdx3 close for one (code, trade_date).

        Logs a warning when the relative difference exceeds ``threshold``.

        Returns:
            ``|tushare - pytdx3| / tushare``, or None when either source has no
            row. Also None when the Tushare close is 0; that case is logged as
            an error because the relative difference is undefined.
        """
        t_close, p_close = self.conn.execute(
            """
            SELECT
                MAX(CASE WHEN source = 'tushare' THEN close END) AS tushare_close,
                MAX(CASE WHEN source = 'pytdx3' THEN close END) AS pytdx3_close
            FROM normalized_daily
            WHERE code = ? AND trade_date = ?
            """,
            [code, str(trade_date)],
        ).fetchone()
        if t_close is None or p_close is None:
            return None  # Not both sources available
        if t_close == 0:
            self.log("cross_source", code, trade_date, "error",
                     f"Tushare close is 0 (pytdx3={p_close})")
            return None
        diff = abs(t_close - p_close) / t_close
        if diff > threshold:
            self.log("cross_source", code, trade_date, "warning",
                     f"Tushare={t_close}, pytdx3={p_close}, diff={diff:.4%}")
        return diff

    def anomaly_detection(self, threshold=0.20):
        """Flag day-over-day close changes larger than ``threshold`` (default 20%).

        Scans all of ``curated_daily`` and logs each flagged row as an error.

        Returns:
            The number of rows flagged.
        """
        # LAG() must be computed in a subquery: window functions are evaluated
        # after WHERE, so filtering on prev_close in the same SELECT is invalid.
        # Rows with a zero previous close are skipped to avoid dividing by zero.
        rows = self.conn.execute(
            """
            SELECT code, trade_date, close, prev_close
            FROM (
                SELECT code, trade_date, close,
                       LAG(close) OVER (PARTITION BY code ORDER BY trade_date) AS prev_close
                FROM curated_daily
            ) AS with_prev
            WHERE prev_close IS NOT NULL
              AND prev_close <> 0
              AND ABS(close - prev_close) / prev_close > ?
            ORDER BY code, trade_date
            """,
            [threshold],
        ).fetchall()
        for code, trade_date, close, prev_close in rows:
            diff = abs(close - prev_close) / prev_close
            self.log("anomaly", code, trade_date, "error",
                     f"Price change {diff:.1%}: {prev_close} → {close}")
        return len(rows)

    def freshness_check(self, source, max_age_hours=24):
        """Return True if ``source`` had a successful load within ``max_age_hours``.

        Returns False and logs an error when no success is recorded. Returns
        False and logs a warning when the last success is older than the limit.
        """
        from datetime import datetime, timezone

        row = self.conn.execute(
            "SELECT last_success FROM source_freshness WHERE source = ?",
            [source],
        ).fetchone()
        # A row may exist with last_success NULL (only failures recorded yet).
        last = None if row is None else row[0]
        if last is None:
            self.log("freshness", None, None, "error", f"No success recorded for {source}")
            return False
        if last.tzinfo is None:
            # A plain TIMESTAMP column yields a naive datetime. Subtracting it
            # from the aware "now" below would raise TypeError.
            # NOTE(review): assumes last_success is written in UTC — confirm.
            last = last.replace(tzinfo=timezone.utc)
        age = (datetime.now(timezone.utc) - last).total_seconds() / 3600
        if age > max_age_hours:
            self.log("freshness", None, None, "warning",
                     f"{source} data is {age:.1f}h old (threshold: {max_age_hours}h)")
            return False
        return True
# Task A4: Warehouse API Routes
Files:
Create:
twilight-drive/src/service/routes/warehouse.py
- [ ] Step 1: Create warehouse read endpoints
python
# src/service/routes/warehouse.py
"""Warehouse read endpoints: /warehouse/* and /quality/*.

All handlers are stubs that return empty payloads until the DuckDB reads are
implemented.
"""
import datetime as dt
from typing import Literal, Optional

from fastapi import APIRouter, HTTPException, Query
import duckdb  # NOTE: not used yet; needed once the TODO reads are implemented

router = APIRouter()

# Layers accepted by /warehouse/daily. Declaring them as a Literal makes
# FastAPI reject unknown values with a 422 instead of echoing them back.
WarehouseLayer = Literal["raw", "normalized", "curated", "feature"]


@router.get("/warehouse/daily")
async def get_warehouse_daily(
    code: str = Query(...),
    start: dt.date = Query(..., description="Start date YYYY-MM-DD"),
    end: dt.date = Query(..., description="End date YYYY-MM-DD"),
    layer: WarehouseLayer = Query("curated", description="Data layer: raw|normalized|curated|feature"),
):
    """Return daily rows for ``code`` in [start, end] from the requested layer.

    Dates are parsed and validated by FastAPI; malformed dates or an
    inverted range yield a 422.
    """
    if start > end:
        raise HTTPException(status_code=422, detail="start must be on or before end")
    # TODO: Read from the requested DuckDB layer
    return {"rows": [], "source": layer, "code": code}


@router.get("/warehouse/features")
async def get_warehouse_features(
    code: str = Query(...),
    date: dt.date = Query(..., description="Date YYYY-MM-DD"),
):
    """Return feature metrics for a stock on a date (serialized as YYYY-MM-DD)."""
    # TODO: Read from DuckDB feature layer
    return {"code": code, "date": date, "features": {}}


@router.get("/quality/status")
async def get_quality_status():
    """Return data quality status."""
    # TODO: Return latest quality check results
    return {"sources": {}, "last_checks": []}


@router.get("/quality/logs")
async def get_quality_logs(
    severity: Optional[str] = Query(None, description="Filter by severity"),
):
    """Return quality check logs, optionally filtered by severity."""
    # TODO: Query quality_log table
    return []
# Task A5: Tests (Initiative A)
- [ ] Step 1: Write schema + ingest tests
python
# tests/test_warehouse.py
import pytest
import duckdb
from twilight_drive.data.warehouse.schemas import init_all
from twilight_drive.data.quality.checks import QualityCheck

# NOTE(review): the modules are created under src/data/...; confirm the package
# is importable as twilight_drive.data in this project's layout.


@pytest.fixture
def conn():
    """In-memory DuckDB connection with every warehouse table created.

    Teardown closes the connection even when the test fails.
    """
    connection = duckdb.connect(":memory:")
    init_all(connection)
    yield connection
    connection.close()


class TestWarehouseSchemas:
    def test_init_all_tables(self, conn):
        table_names = {row[0] for row in conn.execute("SHOW TABLES").fetchall()}
        assert {
            "raw_tushare_daily",
            "normalized_daily",
            "curated_daily",
            "feature_daily",
            "quality_log",
            "source_freshness",
        } <= table_names

    def test_init_all_is_idempotent(self, conn):
        # The fixture already ran init_all once; a second run must not fail.
        init_all(conn)


class TestQualityChecks:
    def test_cross_source_agreement_within_threshold(self, conn):
        conn.execute("INSERT INTO normalized_daily VALUES ('600519.SH', '2026-04-30', 1800, 1860, 1790, 1850.00, 100000, 185000000, 'tushare', CURRENT_TIMESTAMP)")
        conn.execute("INSERT INTO normalized_daily VALUES ('600519.SH', '2026-04-30', 1800, 1860, 1790, 1850.10, 100000, 185000000, 'pytdx3', CURRENT_TIMESTAMP)")
        qc = QualityCheck(conn)
        diff = qc.cross_source_agreement("600519.SH", "2026-04-30")
        assert diff is not None
        assert diff < 0.0001  # < 0.01%

    def test_cross_source_disagreement_logs_warning(self, conn):
        conn.execute("INSERT INTO normalized_daily VALUES ('600519.SH', '2026-04-30', 1800, 1860, 1790, 1850.00, 100000, 185000000, 'tushare', CURRENT_TIMESTAMP)")
        conn.execute("INSERT INTO normalized_daily VALUES ('600519.SH', '2026-04-30', 1800, 1860, 1790, 1900.00, 100000, 190000000, 'pytdx3', CURRENT_TIMESTAMP)")
        qc = QualityCheck(conn)
        diff = qc.cross_source_agreement("600519.SH", "2026-04-30")
        assert diff > 0.0001  # > 0.01%
        # Verify warning logged
        severity, message = conn.execute(
            "SELECT severity, message FROM quality_log WHERE check_name = 'cross_source'"
        ).fetchone()
        assert severity == "warning"
        assert "pytdx3=" in message

    def test_anomaly_detection_flags_20pct_change(self, conn):
        conn.execute("INSERT INTO curated_daily VALUES ('600519.SH', '2026-04-29', 1000, 1050, 990, 1000.00, 50000, 50000000, 1.0)")
        conn.execute("INSERT INTO curated_daily VALUES ('600519.SH', '2026-04-30', 800, 850, 790, 780.00, 80000, 62400000, 1.0)")
        qc = QualityCheck(conn)
        qc.anomaly_detection(threshold=0.20)
        rows = conn.execute(
            "SELECT severity FROM quality_log WHERE check_name = 'anomaly'"
        ).fetchall()
        assert rows == [("error",)]

    def test_freshness_without_success_logs_error(self, conn):
        qc = QualityCheck(conn)
        assert qc.freshness_check("tushare") is False
        # Source-level findings must store NULL code/date, not the text 'None'.
        log = conn.execute(
            "SELECT severity, code, trade_date FROM quality_log WHERE check_name = 'freshness'"
        ).fetchone()
        assert log == ("error", None, None)
# Initiative B: Profile Replication & Ownership
Task B1: Bare Git Repo Setup
- [ ] Step 1: Initialize the profile registry
bash
# On Vultr (central registry)
mkdir -p /var/lib/hermes-profiles
cd /var/lib/hermes-profiles
# Pin HEAD to `main` so it matches the branch pushed below; git otherwise uses
# init.defaultBranch, which defaults to `master` (flag needs git >= 2.28).
git init --bare --initial-branch=main
# On Mac Mini (working copy)
git clone vultr:/var/lib/hermes-profiles ~/hermes-profiles-registry
cd ~/hermes-profiles-registry
# Create initial structure
mkdir -p profiles
touch profiles/.gitkeep
touch OWNERS
git add -A
git commit -m "init: profile registry structure"
# The first branch of an empty clone follows the local init.defaultBranch
# (often `master`); rename it so `main` exists, and set the upstream so later
# plain `git pull` / `git push` calls (used by the CLI) work.
git branch -M main
git push -u origin main
Task B2: Profile Manager CLI
Files:
Create:
hermes-profile-manager/pyproject.toml
Create:
hermes-profile-manager/cli.py
- [ ] Step 1: Create the CLI entry point
python
# hermes-profile-manager/cli.py
"""hermes-profile-manager CLI."""
import typer
from hermes_profile_manager import claim, push_pull, ownership, templates, sync

app = typer.Typer(help="Manage Hermes profiles with git-backed storage")

# --- Profile commands ---
profile_app = typer.Typer()


# `claim` and `sync` are also the names of imported modules. Defining
# `def claim(...)` / `def sync(...)` would rebind those names to the command
# functions, so `claim.claim_profile(...)` / `sync.sync_all()` would raise
# AttributeError. The functions get a `_cmd` suffix; the CLI names stay the same.
@profile_app.command("claim")
def claim_cmd(name: str):
    """Claim ownership of a profile."""
    claim.claim_profile(name)


@profile_app.command()
def push():
    """Push local profile changes to registry."""
    push_pull.push()


@profile_app.command()
def pull():
    """Pull latest profile configs from registry."""
    push_pull.pull()


@profile_app.command("sync")
def sync_cmd():
    """Pull → merge → push (full sync)."""
    sync.sync_all()


@profile_app.command("list")
def list_profiles():
    """List all profiles and ownership status."""
    ownership.list_profiles()


@profile_app.command()
def show(name: str):
    """Show profile diff vs last commit."""
    push_pull.show_diff(name)


app.add_typer(profile_app, name="profile")

# --- Template commands ---
template_app = typer.Typer()


@template_app.command("list")
def list_templates():
    """List available templates."""
    templates.list_templates()


@template_app.command()
def create(name: str):
    """Create template from current profile."""
    templates.create_template(name)


@template_app.command()
def apply(template: str, target: str):
    """Clone template into new profile."""
    templates.apply_template(template, target)


app.add_typer(template_app, name="template")

# --- Ownership commands ---
# NOTE(review): ownership.list_owners / ownership.transfer are not part of the
# ownership.py shown in Task B3 — add them there before shipping these commands.
ownership_app = typer.Typer()


@ownership_app.command("list")
def list_owners():
    """Show ownership registry."""
    ownership.list_owners()


@ownership_app.command()
def transfer(profile: str, new_owner: str):
    """Transfer profile ownership."""
    ownership.transfer(profile, new_owner)


app.add_typer(ownership_app, name="ownership")

if __name__ == "__main__":
    app()
# Task B3: Ownership Model
Files:
Create:
hermes-profile-manager/ownership.py
- [ ] Step 1: Implement ownership tracking
python
# hermes-profile-manager/ownership.py
"""Profile ownership registry.

The registry is a git working copy containing an ``OWNERS`` file with one line
per claimed profile: ``<profile> | <owner_id> | <verified_by> | <YYYY-MM-DD>``.
"""
import os
import subprocess
import sys
from pathlib import Path
from datetime import date

OWNERS_FILE = "OWNERS"
# Environment variable that points at the registry working copy (e.g.
# ~/hermes-profiles-registry). Without it, the directory two levels above this
# module is used, as before.
REGISTRY_ENV = "HERMES_PROFILES_REGISTRY"


def _registry_root() -> Path:
    """Return the directory that holds the registry's OWNERS file."""
    override = os.environ.get(REGISTRY_ENV)
    if override:
        return Path(override).expanduser()
    return Path(__file__).resolve().parent.parent


def _git(registry: Path, *args: str) -> None:
    """Run ``git <args>`` in the registry, raising CalledProcessError on failure."""
    subprocess.run(["git", *args], cwd=registry, check=True)


def list_profiles():
    """Print the OWNERS registry verbatim, or a note if nothing is claimed yet."""
    owners_file = _registry_root() / OWNERS_FILE
    if not owners_file.exists():
        print("(no profiles claimed yet)")
        return
    print(owners_file.read_text(encoding="utf-8"))


def claim_profile(name: str, owner_id: str, verified_by: str):
    """Claim ownership of a profile and publish the claim to the registry remote.

    The working copy is fast-forwarded first so the "already owned" check sees
    claims pushed from other machines. Every git step must succeed; a failed
    pull, commit or push is raised rather than silently ignored.

    Args:
        name: Profile name (first OWNERS column).
        owner_id: Identity of the claiming owner.
        verified_by: Who verified the claim.

    Returns:
        True if ``owner_id`` owns the profile afterwards (new claim or already
        theirs). False if another owner already holds it.

    Raises:
        ValueError: if ``name`` or ``owner_id`` is empty, or any field contains
            ``|`` or a line break (these would corrupt the OWNERS format).
        subprocess.CalledProcessError: if ``git pull``, ``commit`` or ``push`` fails.
    """
    if not name.strip() or not owner_id.strip():
        raise ValueError("profile name and owner_id must not be empty")
    for label, value in (("name", name), ("owner_id", owner_id), ("verified_by", verified_by)):
        if "|" in value or "\n" in value or "\r" in value:
            raise ValueError(f"{label} must not contain '|' or line breaks: {value!r}")

    registry = _registry_root()
    owners_file = registry / OWNERS_FILE
    _git(registry, "pull", "--ff-only")

    existing = owners_file.read_text(encoding="utf-8") if owners_file.exists() else ""
    # Check if already owned
    for line in existing.splitlines():
        if not line.startswith(f"{name} |"):
            continue
        current_owner = [p.strip() for p in line.split("|")][1]
        if current_owner != owner_id:
            print(f"Error: {name} is already owned by {current_owner}", file=sys.stderr)
            return False
        print(f"{name} already owned by {owner_id}")
        return True

    # Append ownership entry, first terminating a last line that lacks "\n"
    # so the new entry does not get glued onto it.
    with owners_file.open("a", encoding="utf-8") as f:
        if existing and not existing.endswith("\n"):
            f.write("\n")
        f.write(f"{name} | {owner_id} | {verified_by} | {date.today()}\n")

    _git(registry, "add", OWNERS_FILE)
    _git(registry, "commit", "-m", f"claim: {name} → {owner_id}")
    _git(registry, "push")
    print(f"Profile {name} claimed by {owner_id}")
    return True
# Task B4: Template System
Files:
Create:
hermes-profile-manager/templates.py
Create:
profiles/template-stock-research-pro/config.yaml.template
- [ ] Step 1: Create the template structure
yaml
# profiles/template-stock-research-pro/config.yaml.template
model:
provider: openai_compatible
# Primary route is the local New-API gateway (Initiative C). It authenticates
# with a token issued by New-API; upstream provider keys (e.g. SiliconFlow) are
# configured as New-API channels, not sent by the client.
base_url: http://localhost:3000/v1
api_key: "{{NEW_API_KEY}}"
model: Qwen/Qwen3.5-27B
fallbacks:
# Direct OpenRouter fallback for when the gateway is down.
# OpenRouter's API base is https://openrouter.ai/api/v1 (no "api." subdomain).
- provider: openai_compatible
base_url: https://openrouter.ai/api/v1
api_key: "{{OPENROUTER_API_KEY}}"
model: qwen/qwen-2.5-72b-instruct
platforms:
weixin:
enabled: true
account: "{{WEIXIN_ACCOUNT}}"
home_channel: "{{WEIXIN_HOME_CHANNEL}}"
personality: helpful
skills:
stock-research:
enabled: true
mode: service
service_url: "{{TWILIGHT_SERVICE_URL}}"
api_token: "{{TWILIGHT_API_TOKEN}}"
cron:
enabled: true
schedule_file: crontab.json
Initiative C: Token Gateway — New-API + SearXNG
Task C1: Docker Compose for Gateway
Files:
Create:
twilight-drive/docker-compose.gateway.yml
- [ ] Step 1: Create docker compose file
yaml
# (The top-level `version` key is obsolete in Compose v2 and only triggers a warning.)
services:
  new-api:
    image: calciumion/new-api:latest
    container_name: new-api
    restart: always
    ports:
      # Loopback only: the Hermes profiles call http://localhost:3000. Publishing
      # on all interfaces would expose the admin UI and the pooled keys.
      - "127.0.0.1:3000:3000"
    environment:
      # SQL_DSN is meant for MySQL/PostgreSQL DSNs; a "sqlite://" value is not a
      # SQLite DSN. Leaving SQL_DSN unset makes New-API use SQLite in its /data
      # working directory, which is persisted by the volume below.
      - REDIS_CONN_STRING=redis://redis:6379
      - SYNC_FREQUENCY=60
    volumes:
      - /var/lib/twilight/new-api:/data
    depends_on:
      - redis
  searxng:
    image: searxng/searxng:latest
    container_name: searxng
    restart: always
    ports:
      # Loopback only: an unauthenticated public SearXNG is an open search proxy.
      - "127.0.0.1:8080:8080"
    environment:
      - SEARXNG_BASE_URL=http://localhost:8080/
      # Fail fast instead of running with a well-known default secret.
      - SEARXNG_SECRET=${SEARXNG_SECRET:?set SEARXNG_SECRET (e.g. in .env)}
      - UWSGI_WORKERS=4
      - UWSGI_THREADS=4
    volumes:
      # Mount the repo's searxng/ directory (relative to this file) so the
      # settings.yml from Task C2 is the one actually used.
      - ./searxng:/etc/searxng:rw
    cap_drop:
      - ALL
    cap_add:
      - CHOWN
      - SETGID
      - SETUID
  redis:
    image: redis:7-alpine
    # Used by New-API (REDIS_CONN_STRING above), not only by SearXNG.
    container_name: gateway-redis
    restart: always
    command: redis-server --save 30 5 --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes:
      - /var/lib/twilight/redis:/data
Task C2: SearXNG Configuration
Files:
Create:
twilight-drive/searxng/settings.yml
- [ ] Step 1: Configure SearXNG engines
yaml
# searxng/settings.yml
use_default_settings: true
general:
  debug: false
  instance_name: "Twilight Search"
search:
  safe_search: 0
  autocomplete: "baidu"
  default_lang: "zh-CN"
  formats:
    - html
    - json  # required: the backend queries with format=json
engines:
  # Chinese engines
  - name: baidu
    engine: baidu
    shortcut: bd
    disabled: false
  - name: sogou
    engine: sogou
    shortcut: sg
    disabled: false
  - name: 360search
    engine: 360search
    shortcut: so
    disabled: false
  # Global engines
  - name: google
    engine: google
    shortcut: g
    disabled: false
  - name: duckduckgo
    engine: duckduckgo
    shortcut: ddg
    disabled: false
  - name: bing
    engine: bing
    shortcut: b
    disabled: false
  # Disable unnecessary engines
  - name: wikipedia
    disabled: true
  - name: wikidata
    disabled: true
server:
  port: 8080
  bind_address: "0.0.0.0"
  # No secret_key here: settings.yml does not expand ${...}, so the literal
  # text would become the secret. The SEARXNG_SECRET environment variable
  # (set in docker-compose.gateway.yml) overrides server.secret_key.
  # Private, backend-only instance (published on loopback). The bot-detection
  # limiter needs a Redis/Valkey connection, and none is configured here. It can
  # also throttle or block the header-less httpx requests made by the Twilight
  # backend.
  limiter: false
  image_proxy: false
ui:
  static_use_hash: true
outgoing:
  request_timeout: 5.0
  max_request_timeout: 10.0
  useragent_suffix: ""
  pool_connections: 100
  pool_maxsize: 20
Task C3: Migrate Hermes Profiles to New-API
- [ ] Step 1: Update stock-research-agent config
yaml
# ~/.hermes/profiles/stock-research-agent/config.yaml (modified)
model:
provider: openai_compatible
base_url: http://localhost:3000/v1 # New-API endpoint
# Token issued in the New-API console (provider keys live in New-API channels).
# Profiles are copied into the git registry in Initiative B — do not commit a
# real token there.
api_key: new-api-shared-key # Single proxy key for all profiles
model: Qwen/Qwen3.5-27B
- [ ] Step 2: Update main profile config
yaml
# ~/.hermes/config.yaml (modified)
model:
provider: openai_compatible
base_url: http://localhost:3000/v1 # New-API endpoint
# Same New-API token as the stock-research-agent profile.
api_key: new-api-shared-key
model: qwen3.6-plus
Task C4: Wire SearXNG into Twilight Search
Files:
Modify:
twilight-drive/src/service/routes/search.py
- [ ] Step 1: Update search route to use SearXNG
python
# src/service/routes/search.py (modified)
SEARXNG_URL = "http://localhost:8080/search"
async def searxng_search(query: str, max_results: int) -> dict:
async with httpx.AsyncClient(timeout=10.0) as client:
resp = await client.get(
SEARXNG_URL,
params={
"q": query,
"format": "json",
"language": "zh-CN",
"max_results": max_results,
},
)
resp.raise_for_status()
data = resp.json()
return {
"results": [
{
"title": r.get("title", ""),
"url": r.get("url", ""),
"snippet": r.get("content", ""),
"fetched_at": datetime.now(timezone.utc).isoformat(),
"cite": {
"kind": "tool",
"source": r.get("engine", "searxng"),
"served_by": "twilight-drive-backend",
"original_url": r.get("url", ""),
},
}
for r in data.get("results", [])[:max_results]
],
"source": "searxng",
}Combined Test Plan (Phase 2)
- [ ] Step 1: Write all Phase 2 tests (see spec §Test Plan for each initiative)
Deployment Checklist (Phase 2)
- [ ] Step 1: Deploy Initiative A (Weeks 6-7)
bash
cd twilight-drive
# Run daily ingest manually to verify.
# `...` is a placeholder for the warehouse setup and the call; ingest_tushare_daily
# is `async def`, so run it with asyncio.run(...).
uv run python -c "from twilight_drive.data.ingest.scheduler import ingest_tushare_daily; ..."
# Run quality checks (`...` is a placeholder as above)
uv run python -c "from twilight_drive.data.quality.checks import QualityCheck; ..."
# Verify warehouse API. Quote the URL: an unquoted `&` would background curl
# and drop the start/end parameters.
curl "http://localhost:8000/warehouse/daily?code=600519.SH&start=2026-01-01&end=2026-04-30"
- [ ] Step 2: Deploy Initiative B (Week 8)
bash
# Set up bare repo on Vultr (skip if already done in Task B1)
ssh vultr "mkdir -p /var/lib/hermes-profiles && cd /var/lib/hermes-profiles && git init --bare --initial-branch=main"
# Clone on Mac Mini (skip if ~/hermes-profiles-registry already exists from Task B1)
git clone vultr:/var/lib/hermes-profiles ~/hermes-profiles-registry
# Migrate existing profiles.
# NOTE(review): everything copied here is pushed to the registry. Profile configs
# may contain API keys, so exclude secret files (adjust the list to where the
# profiles keep them) and review `git status` before committing.
rsync -a --exclude '.env' ~/.hermes/profiles/ ~/hermes-profiles-registry/profiles/
# Subshell, so this `cd` does not change the directory for the commands below;
# `branch -M` + `push -u` guarantee the branch is `main` and tracks origin.
(cd ~/hermes-profiles-registry && git add -A && git commit -m "migrate profiles" && git branch -M main && git push -u origin main)
# Install CLI (path relative to the directory these steps started in)
cd hermes-profile-manager && pip install -e .
# Test
hermes-profile-manager profile list
hermes-profile-manager profile claim stock-research-agent
- [ ] Step 3: Deploy Initiative C (Week 9)
bash
# Deploy New-API + SearXNG
cd twilight-drive
docker compose -f docker-compose.gateway.yml up -d
# Verify New-API. NEW_API_KEY = the New-API token that the profiles use as api_key
# (Task C3); the model must be enabled on a New-API channel.
curl http://localhost:3000/v1/chat/completions -H "Authorization: Bearer $NEW_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{"model": "Qwen/Qwen3.5-27B", "messages": [{"role": "user", "content": "hi"}]}'
# Verify SearXNG (let curl URL-encode the Chinese query)
curl --get "http://localhost:8080/search" \
  --data-urlencode "q=600519 业绩" \
  --data-urlencode "format=json" \
  --data-urlencode "language=zh-CN"
# Migrate Hermes profiles (update base_url in config.yaml)
# Restart gateways
systemctl --user restart ai.hermes.gateway
systemctl --user restart ai.hermes.stock-research.gateway
# NOTE(review): these service names look like launchd labels. On the macOS Mac Mini
# systemctl does not exist; use e.g.
#   launchctl kickstart -k "gui/$(id -u)/ai.hermes.gateway"
- [ ] Step 4: Full integration test (Week 10)
bash
# Test end-to-end: user query → SearXNG search → New-API LLM → cited response
# ($TOKEN: presumably the Twilight service API token — set it before running)
# --get + --data-urlencode builds a properly percent-encoded query string for
# the non-ASCII query instead of sending raw UTF-8 bytes in the URL.
curl --get -H "Authorization: Bearer $TOKEN" \
  "http://localhost:8000/api/search" \
  --data-urlencode "query=600519 最新业绩"
Out of Scope (Explicitly Deferred)
- Real-time data streaming (P3)
- Multi-tenant Kubernetes orchestration
- Per-user dashboards (P2 content scope)
- Research report PDF ingestion (separate P2 spec)
- Stripe integration (P2 payment scope)