主题
Twilight Drive Phase 1 — Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (
- [ ]) syntax for tracking.
Goal: Ship a paid A-share research bot on WeChat in four milestones: P1.0 (FastAPI + DuckDB + auth), P1.1 (payment + provisioning), P1.2 (multi-source data), P1.3 (web search proxy).
Architecture: FastAPI on Vultr with DuckDB cache, bearer token auth, WeChat Pay for payment, Python provisioner for Hermes profile cloning, Vue SPA for landing page.
Source spec: docs/planning/superpowers/specs/2026-05-07-twilight-drive-phase1.md
Task 1: Project Scaffold for P1.0 Service
Files:
Create:
twilight-drive/src/service/__init__.pyCreate:
twilight-drive/src/service/main.pyCreate:
twilight-drive/src/service/config.pyCreate:
twilight-drive/src/service/auth.pyCreate:
twilight-drive/pyproject.toml(modify existing)[ ] Step 1: Create the FastAPI app skeleton
python
# src/service/main.py
"""Twilight Drive P1.0 — FastAPI backend for stock research data."""
from contextlib import asynccontextmanager
import structlog
from fastapi import FastAPI
from twilight_drive.service.config import Settings
from twilight_drive.service.routes import healthz, price, fundamentals
from twilight_drive.service.auth import AuthMiddleware
logger = structlog.get_logger()
def create_app() -> FastAPI:
settings = Settings()
app = FastAPI(
title="Twilight Drive API",
version="0.2.0",
docs_url=None, # no swagger in prod
redoc_url=None,
)
# Middleware: bearer token auth
app.add_middleware(AuthMiddleware, settings=settings)
# Routes
app.include_router(healthz.router)
app.include_router(price.router, prefix="/api")
app.include_router(fundamentals.router, prefix="/api")
return app
app = create_app()- [ ] Step 2: Create settings module
python
# src/service/config.py
from pathlib import Path
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""Environment-driven config for the service."""
# Serving
host: str = "0.0.0.0"
port: int = 8000
version: str = "0.2.0"
# Tushare
tushare_token: str
# DuckDB
duckdb_path: Path = Path("/var/lib/twilight/cache.duckdb")
# Auth (SQLite for P1.0, upgrade to Postgres in P1.1)
auth_db_path: Path = Path("/var/lib/twilight/auth.db")
model_config = {"env_prefix": "TWILIGHT_", "env_file": ".env"}- [ ] Step 3: Create bearer token auth middleware
python
# src/service/auth.py
"""Bearer token authentication middleware.
Tokens are stored as sha256 hashes in a SQLite database.
Middleware checks Authorization header, validates token,
sets request.state.user_id for downstream handlers.
"""
import hashlib
import sqlite3
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse
import structlog
logger = structlog.get_logger()
class AuthMiddleware(BaseHTTPMiddleware):
def __init__(self, app, settings):
super().__init__(app)
self.settings = settings
self._init_db()
def _init_db(self):
"""Create auth table if not exists."""
conn = sqlite3.connect(self.settings.auth_db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS api_keys (
key_hash TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
scope TEXT DEFAULT 'data:read',
revoked_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
conn.close()
async def dispatch(self, request: Request, call_next):
# Skip auth for healthz
if request.url.path == "/healthz":
return await call_next(request)
auth_header = request.headers.get("authorization")
if not auth_header or not auth_header.startswith("Bearer "):
return JSONResponse(status_code=401, content={"detail": "Missing bearer token"})
token = auth_header.split(" ", 1)[1]
key_hash = hashlib.sha256(token.encode()).hexdigest()
conn = sqlite3.connect(self.settings.auth_db_path)
cursor = conn.execute(
"SELECT user_id, scope, revoked_at FROM api_keys WHERE key_hash = ?",
(key_hash,),
)
row = cursor.fetchone()
conn.close()
if row is None:
return JSONResponse(status_code=401, content={"detail": "Invalid token"})
user_id, scope, revoked_at = row
if revoked_at is not None:
return JSONResponse(status_code=403, content={"detail": "Token revoked"})
request.state.user_id = user_id
request.state.scope = scope
logger.debug("auth_success", user_id=user_id, path=request.url.path)
return await call_next(request)- [ ] Step 4: Run the smoke test
bash
cd twilight-drive
uv run uvicorn src.service.main:app --host 0.0.0.0 --port 8000
curl http://localhost:8000/healthz
# Expected: {"ok": true, "version": "0.2.0"}
curl http://localhost:8000/api/price?code=600519.SH
# Expected: 401 {"detail": "Missing bearer token"}- [ ] Step 5: Commit
bash
git add src/service/ pyproject.toml
git commit -m "feat(p1.0): scaffold FastAPI service with bearer token auth"Task 2: DuckDB Cache + /price Route
Files:
Create:
twilight-drive/src/service/cache.pyCreate:
twilight-drive/src/service/routes/__init__.pyCreate:
twilight-drive/src/service/routes/price.py[ ] Step 1: Create DuckDB cache layer
python
# src/service/cache.py
"""DuckDB cache for daily price data.
Schema:
daily_cache(code, trade_date, close, fetched_at)
Cache key: (code, trade_date)
TTL: 24 hours (86400 seconds)
"""
import duckdb
from datetime import datetime, timezone
from pathlib import Path
class PriceCache:
def __init__(self, db_path: Path):
db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = duckdb.connect(str(db_path))
self._init_table()
def _init_table(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS daily_cache (
code VARCHAR,
trade_date VARCHAR,
close DOUBLE,
fetched_at TIMESTAMP,
PRIMARY KEY (code, trade_date)
)
""")
def get(self, code: str, trade_date: str) -> dict | None:
"""Return cached row or None."""
result = self.conn.execute(
"SELECT code, trade_date, close, fetched_at FROM daily_cache "
"WHERE code = ? AND trade_date = ?",
[code, trade_date],
).fetchone()
if result is None:
return None
return {
"code": result[0],
"trade_date": result[1],
"close": result[2],
"fetched_at": result[3].isoformat(),
}
def put(self, code: str, trade_date: str, close: float):
"""Insert or replace cache entry."""
self.conn.execute(
"INSERT OR REPLACE INTO daily_cache (code, trade_date, close, fetched_at) "
"VALUES (?, ?, ?, ?)",
[code, trade_date, close, datetime.now(timezone.utc)],
)
def is_fresh(self, code: str, trade_date: str, max_age_seconds: int = 86400) -> bool:
"""Check if cached entry is within TTL."""
result = self.conn.execute(
"SELECT fetched_at FROM daily_cache WHERE code = ? AND trade_date = ?",
[code, trade_date],
).fetchone()
if result is None:
return False
fetched = result[0]
age = (datetime.now(timezone.utc) - fetched).total_seconds()
return age < max_age_seconds- [ ] Step 2: Create
/priceroute with cache → Tushare fallback
python
# src/service/routes/price.py
"""GET /api/price — return closing price with cite envelope."""
from datetime import datetime, timezone
import httpx
import structlog
from fastapi import APIRouter, Query, Request
from pydantic import BaseModel
from twilight_drive.service.cache import PriceCache
from twilight_drive.service.config import Settings
logger = structlog.get_logger()
router = APIRouter()
TUSHARE_API = "https://tushare.pro/api"
class Cite(BaseModel):
kind: str = "tool"
source: str
served_by: str
served_version: str
table: str
fetched_at: str
cache_age_seconds: int
tool_call_id: str
class PriceResponse(BaseModel):
value: float
metric: str
code: str
as_of: str
cite: Cite
# Module-level cache singleton (initialized on first route access)
_cache: PriceCache | None = None
def get_cache(settings: Settings) -> PriceCache:
global _cache
if _cache is None:
_cache = PriceCache(settings.duckdb_path)
return _cache
def _generate_tool_call_id() -> str:
import secrets
return f"tc_{secrets.token_hex(8)}"
@router.get("/price", response_model=PriceResponse)
async def get_price(
request: Request,
code: str = Query(..., description="Stock code, e.g. 600519.SH"),
trade_date: str = Query(None, description="Trade date YYYYMMDD. Defaults to latest."),
):
settings: Settings = request.app.state.settings
cache = get_cache(settings)
# Resolve trade_date to today if not provided
if trade_date is None:
trade_date = datetime.now(timezone.utc).strftime("%Y%m%d")
# Check cache first
cached = cache.get(code, trade_date)
if cached and cache.is_fresh(code, trade_date):
age = int(
(datetime.now(timezone.utc) - datetime.fromisoformat(cached["fetched_at"])).total_seconds()
)
logger.info("price_cache_hit", code=code, trade_date=trade_date)
return PriceResponse(
value=cached["close"],
metric="close",
code=code,
as_of=trade_date,
cite=Cite(
source="tushare",
served_by="twilight-drive-backend",
served_version=settings.version,
table="daily_cache",
fetched_at=cached["fetched_at"],
cache_age_seconds=age,
tool_call_id=_generate_tool_call_id(),
),
)
# Cache miss → fetch from Tushare
logger.info("price_cache_miss", code=code, trade_date=trade_date)
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.post(
TUSHARE_API,
json={
"api_name": "daily",
"token": settings.tushare_token,
"params": {"ts_code": code, "trade_date": trade_date},
},
)
resp.raise_for_status()
data = resp.json()
# Extract close from Tushare response
# Tushare daily response: {data: {items: [[date, open, high, low, close, ...]], fields: [...]}}
items = data.get("data", {}).get("items", [])
if not items:
from fastapi import HTTPException
raise HTTPException(status_code=404, detail=f"No data for {code} on {trade_date}")
fields = data["data"]["fields"]
close_idx = fields.index("close")
close = items[0][close_idx]
# Write to cache
cache.put(code, trade_date, close)
fetched_at = datetime.now(timezone.utc).isoformat()
return PriceResponse(
value=close,
metric="close",
code=code,
as_of=trade_date,
cite=Cite(
source="tushare",
served_by="twilight-drive-backend",
served_version=settings.version,
table="daily",
fetched_at=fetched_at,
cache_age_seconds=0,
tool_call_id=_generate_tool_call_id(),
),
)- [ ] Step 3: Write tests
python
# tests/test_service_price.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, MagicMock
from twilight_drive.service.main import app
class TestPriceCacheHit:
def test_cache_miss_fetches_tushare(self):
"""First request → Tushare called, response has cite envelope."""
mock_response = MagicMock()
mock_response.json.return_value = {
"data": {
"items": [["20260430", 1800.0, 1860.0, 1790.0, 1850.25, 100000, 185000000.0]],
"fields": ["trade_date", "open", "high", "low", "close", "vol", "amount"],
}
}
mock_response.raise_for_status = MagicMock()
with patch("httpx.AsyncClient") as mock_client:
mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
client = TestClient(app)
resp = client.get("/api/price?code=600519.SH&trade_date=20260430",
headers={"Authorization": "Bearer test-token"})
assert resp.status_code == 200
data = resp.json()
assert data["value"] == 1850.25
assert data["cite"]["kind"] == "tool"
assert data["cite"]["source"] == "tushare"
assert data["cite"]["served_by"] == "twilight-drive-backend"
def test_no_auth_returns_401(self):
client = TestClient(app)
resp = client.get("/api/price?code=600519.SH")
assert resp.status_code == 401
def test_invalid_auth_returns_401(self):
client = TestClient(app)
resp = client.get("/api/price?code=600519.SH",
headers={"Authorization": "Bearer wrong-token"})
assert resp.status_code == 401
def test_no_data_returns_404(self):
"""Tushare returns empty items → 404."""
mock_response = MagicMock()
mock_response.json.return_value = {"data": {"items": [], "fields": []}}
mock_response.raise_for_status = MagicMock()
with patch("httpx.AsyncClient") as mock_client:
mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
client = TestClient(app)
resp = client.get("/api/price?code=999999.SH&trade_date=20260430",
headers={"Authorization": "Bearer test-token"})
assert resp.status_code == 404- [ ] Step 4: Run tests
bash
cd twilight-drive
uv run pytest tests/test_service_price.py -v- [ ] Step 5: Commit
bash
git add src/service/cache.py src/service/routes/price.py tests/test_service_price.py
git commit -m "feat(p1.0): DuckDB cache + /price route with Tushare fallback and cite envelope"Task 3: /fundamentals Route
Files:
Create:
twilight-drive/src/service/routes/fundamentals.py[ ] Step 1: Create the route
python
# src/service/routes/fundamentals.py
"""GET /api/fundamentals — return P/E, ROE, margins with cite envelopes."""
from datetime import datetime, timezone
import httpx
import structlog
from fastapi import APIRouter, Query, Request, HTTPException
from pydantic import BaseModel
from twilight_drive.service.config import Settings
logger = structlog.get_logger()
router = APIRouter()
TUSHARE_API = "https://tushare.pro/api"
class Claim(BaseModel):
metric: str
value: float
cite: dict
class FundamentalsResponse(BaseModel):
code: str
as_of: str
claims: list[Claim]
def _generate_tool_call_id() -> str:
import secrets
return f"tc_{secrets.token_hex(8)}"
@router.get("/fundamentals", response_model=FundamentalsResponse)
async def get_fundamentals(
request: Request,
code: str = Query(...),
period: str = Query(None, description="Report period YYYYMMDD. Defaults to latest."),
):
settings: Settings = request.app.state.settings
if period is None:
period = datetime.now(timezone.utc).strftime("%Y%m%d")
async with httpx.AsyncClient(timeout=15.0) as client:
resp = await client.post(
TUSHARE_API,
json={
"api_name": "fina_indicator",
"token": settings.tushare_token,
"params": {"ts_code": code, "period": period},
},
)
resp.raise_for_status()
data = resp.json()
items = data.get("data", {}).get("items", [])
if not items:
raise HTTPException(status_code=404, detail=f"No fundamentals for {code} in {period}")
fields = data["data"]["fields"]
row = items[0]
# Extract key metrics
metrics = {}
for metric_name in ["pe", "roe", "grossprofit_margin", "netprofit_margin"]:
if metric_name in fields:
idx = fields.index(metric_name)
val = row[idx]
if val is not None:
metrics[metric_name] = float(val)
fetched_at = datetime.now(timezone.utc).isoformat()
return FundamentalsResponse(
code=code,
as_of=period,
claims=[
Claim(
metric=name,
value=value,
cite={
"kind": "tool",
"source": "tushare",
"served_by": "twilight-drive-backend",
"served_version": settings.version,
"table": "fina_indicator",
"fetched_at": fetched_at,
"cache_age_seconds": 0,
"tool_call_id": _generate_tool_call_id(),
},
)
for name, value in metrics.items()
],
)- [ ] Step 2: Write tests + run + commit
(Same pattern as Task 2 — mock Tushare response, verify schema + cite envelope.)
Task 4: Admin CLI for Token Issuance
Files:
Create:
twilight-drive/scripts/issue-token.pyCreate:
twilight-drive/scripts/deploy-vultr.sh[ ] Step 1: Write the token issuance script
python
#!/usr/bin/env python3
"""Issue a bearer token for an alpha user.
Usage:
uv run python scripts/issue-token.py --user alpha-001
uv run python scripts/issue-token.py --user alpha-001 --revoke
"""
import argparse
import hashlib
import secrets
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
AUTH_DB = Path("/var/lib/twilight/auth.db")
def ensure_db():
AUTH_DB.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(AUTH_DB))
conn.execute("""
CREATE TABLE IF NOT EXISTS api_keys (
key_hash TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
scope TEXT DEFAULT 'data:read',
revoked_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
conn.commit()
return conn
def issue_token(user_id: str) -> str:
raw = secrets.token_urlsafe(32)
key_hash = hashlib.sha256(raw.encode()).hexdigest()
conn = ensure_db()
conn.execute(
"INSERT OR REPLACE INTO api_keys (key_hash, user_id) VALUES (?, ?)",
(key_hash, user_id),
)
conn.commit()
conn.close()
print(f"Token issued for {user_id}:")
print(f" TWILIGHT_API_TOKEN={raw}")
print(f" (hash: {key_hash[:16]}...)")
print(f" This token will never be shown again. Store it securely.")
return raw
def revoke_token(user_id: str):
conn = ensure_db()
conn.execute(
"UPDATE api_keys SET revoked_at = ? WHERE user_id = ?",
(datetime.now(timezone.utc).isoformat(), user_id),
)
conn.commit()
conn.close()
print(f"Token revoked for {user_id}")
def main():
parser = argparse.ArgumentParser(description="Issue/revoke Twilight API tokens")
parser.add_argument("--user", required=True, help="User ID (e.g. alpha-001)")
parser.add_argument("--revoke", action="store_true", help="Revoke instead of issue")
args = parser.parse_args()
if args.revoke:
revoke_token(args.user)
else:
issue_token(args.user)
if __name__ == "__main__":
main()- [ ] Step 2: Test locally
bash
uv run python scripts/issue-token.py --user alpha-001
# Copy the token
curl -H "Authorization: Bearer <token>" http://localhost:8000/api/price?code=600519.SH&trade_date=20260430
# Should return 200 with price data
uv run python scripts/issue-token.py --user alpha-001 --revoke
curl -H "Authorization: Bearer <token>" http://localhost:8000/api/price?code=600519.SH&trade_date=20260430
# Should return 403- [ ] Step 3: Write deploy script
bash
#!/usr/bin/env bash
# deploy-vultr.sh — one-command deploy to Vultr
set -euo pipefail
VULTR_HOST="${VULTR_HOST:-root@vultr-ip}"
echo "Building and deploying to Vultr..."
ssh "$VULTR_HOST" bash -s <<'EOF'
cd /opt/twilight-drive
git pull origin main
systemctl restart twilight-service
sleep 2
curl -s http://localhost:8000/healthz
EOF
echo "Deploy complete."Task 5: P1.1 — Landing Page + WeChat Pay
Files:
Create:
twilight-drive/src/landing/index.html(Vue SPA, single file)Create:
twilight-drive/src/service/routes/payment.pyCreate:
twilight-drive/src/provisioner/main.py[ ] Step 1: Create landing page
(Single Vue.js SPA, Chinese language, two pricing cards — Pro selectable, Lite disabled.)
- [ ] Step 2: Create payment webhook handler
python
# src/service/routes/payment.py
"""POST /api/payments/webhook — WeChat Pay callback."""
from fastapi import APIRouter, Request, HTTPException
import structlog
logger = structlog.get_logger()
router = APIRouter()
@router.post("/payments/webhook")
async def wechat_pay_webhook(request: Request):
"""Receive WeChat Pay payment confirmation.
1. Verify webhook signature
2. Mark payment as paid
3. Set user.paid_until = now() + 30 days
4. Trigger provisioner (async)
"""
# TODO: Implement WeChat Pay signature verification
# For alpha: accept all webhooks, log for audit
body = await request.json()
logger.info("payment_webhook", body=body)
# TODO: Call provisioner asynchronously
# await provisioner.provision_user(user_id=..., plan="pro")
return {"code": "SUCCESS", "message": "OK"}- [ ] Step 3: Provisioner stub
python
# src/provisioner/main.py
"""Provisioner service: payment webhook → Hermes profile → QR code."""
import asyncio
import secrets
import hashlib
import structlog
from uuid import UUID
logger = structlog.get_logger()
async def provision_user(user_id: UUID, plan: str = "pro") -> dict:
"""Full provisioning flow (P1.1)."""
# 1. Generate API key
raw_key = secrets.token_urlsafe(32)
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
# 2. Store hash in DB
# await db.store_api_key(key_hash, user_id)
logger.info("provisioning", user_id=str(user_id), plan=plan)
# 3. Clone profile from template
# profile_name = f"user-{user_id.hex[:8]}"
# await hermes.clone(profile_name, template="stock-research-pro")
# 4. Plant secrets
# await hermes.plant_secret(profile_name, "TWILIGHT_API_TOKEN", raw_key)
# 5. Start gateway
# await hermes.start_gateway(profile_name)
# 6. Generate QR
# qr_url = await hermes.generate_qr(profile_name)
return {"status": "provisioning", "user_id": str(user_id)}
if __name__ == "__main__":
asyncio.run(provision_user(UUID(int=1), "pro"))Task 6: P1.2 — Multi-source (pytdx3 backfill)
Files:
Create:
twilight-drive/scripts/backfill_pytdx3.py[ ] Step 1: Write backfill script
python
#!/usr/bin/env python3
"""Backfill historical daily data via pytdx3 (free TDX socket).
Usage:
uv run python scripts/backfill_pytdx3.py --years 5 --top 300
"""
import argparse
import duckdb
from datetime import datetime
def backfill(top_n: int = 300, years: int = 5, duckdb_path: str = "/var/lib/twilight/cache.duckdb"):
"""Fetch N years of daily data for top N stocks via pytdx3."""
# TODO: Implement pytdx3 connection
# from pytdx3 import TDXClient
# client = TDXClient()
# codes = client.get_top_stocks(top_n)
# for code in codes:
# rows = client.get_daily_history(code, years=years)
# duckdb.insert("daily", rows, on_conflict="ignore")
print(f"Backfill: {top_n} stocks, {years} years → {duckdb_path}")
print("TODO: implement pytdx3 adapter")
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--years", type=int, default=5)
parser.add_argument("--top", type=int, default=300)
parser.add_argument("--duckdb", default="/var/lib/twilight/cache.duckdb")
args = parser.parse_args()
backfill(args.top, args.years, args.duckdb)
if __name__ == "__main__":
main()Task 7: P1.3 — Web Search Proxy
Files:
Create:
twilight-drive/src/service/routes/search.py[ ] Step 1: Create search route with DashScope → Google fallback
python
# src/service/routes/search.py
"""GET /api/search — web search proxy with fallback."""
from fastapi import APIRouter, Query, Request
import httpx
import structlog
logger = structlog.get_logger()
router = APIRouter()
DASHSCOPE_SEARCH = "https://dashscope.aliyuncs.com/api/v1/services/web-search/search"
GOOGLE_CSE = "https://www.googleapis.com/customsearch/v1"
@router.get("/search")
async def search(
request: Request,
query: str = Query(...),
max_results: int = Query(10),
):
settings = request.app.state.settings
# Try DashScope first
try:
return await dashscope_search(query, max_results)
except (Exception,) as e:
logger.warning("dashscope_search_failed", error=str(e))
# Fallback to Google
return await google_search(query, max_results)
async def dashscope_search(query: str, max_results: int) -> dict:
# TODO: Implement DashScope web_search
return {"results": [], "source": "dashscope"}
async def google_search(query: str, max_results: int) -> dict:
# TODO: Implement Google Custom Search
return {"results": [], "source": "google"}Task 8: Deployment + Verification
- [ ] Step 1: Set up Vultr instance
bash
# SSH into Vultr, install dependencies
ssh root@vultr-ip
apt update && apt install -y python3.11 python3-pip
pip install uv
mkdir -p /var/lib/twilight
git clone <repo> /opt/twilight-drive
cd /opt/twilight-drive && uv sync- [ ] Step 2: Configure systemd
bash
# /etc/systemd/system/twilight-service.service
[Unit]
Description=Twilight Drive API
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/opt/twilight-drive
EnvironmentFile=/etc/twilight/env
ExecStart=/opt/twilight-drive/.venv/bin/uvicorn twilight_drive.service.main:app --host 0.0.0.0 --port 8000
Restart=always
[Install]
WantedBy=multi-user.targetbash
# /etc/twilight/env
TUSHARE_TOKEN=your-tushare-token
TWILIGHT_TUSHARE_TOKEN=your-tushare-token- [ ] Step 3: Run full test suite
bash
cd twilight-drive
uv run pytest -q --tb=short
uv run ruff check src/ tests/- [ ] Step 4: Deploy and smoke test
bash
./scripts/deploy-vultr.sh
TOKEN=$(uv run python scripts/issue-token.py --user smoke-test 2>&1 | grep TWILIGHT_API_TOKEN | cut -d= -f2)
curl -H "Authorization: Bearer $TOKEN" http://vultr-ip:8000/healthz
curl -H "Authorization: Bearer $TOKEN" "http://vultr-ip:8000/api/price?code=600519.SH&trade_date=20260430"Out of Scope (Explicitly Deferred)
- Lite tier enforcement (P2)
- Postgres for users (P1.1 uses SQLite, upgrade later)
- Research report PDF ingestion (P2)
- Real-time intraday data (P3)
- Cross-source factor generation (P2)