Skip to content

Twilight Drive Phase 1 — Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Ship a paid A-share research bot on WeChat in four milestones: P1.0 (FastAPI + DuckDB + auth), P1.1 (payment + provisioning), P1.2 (multi-source data), P1.3 (web search proxy).

Architecture: FastAPI on Vultr with DuckDB cache, bearer token auth, WeChat Pay for payment, Python provisioner for Hermes profile cloning, Vue SPA for landing page.

Source spec: docs/planning/superpowers/specs/2026-05-07-twilight-drive-phase1.md


Task 1: Project Scaffold for P1.0 Service

Files:

  • Create: twilight-drive/src/service/__init__.py

  • Create: twilight-drive/src/service/main.py

  • Create: twilight-drive/src/service/config.py

  • Create: twilight-drive/src/service/auth.py

  • Create: twilight-drive/pyproject.toml (modify existing)

  • [ ] Step 1: Create the FastAPI app skeleton

python
# src/service/main.py
"""Twilight Drive P1.0 — FastAPI backend for stock research data."""

from contextlib import asynccontextmanager

import structlog
from fastapi import FastAPI

from twilight_drive.service.config import Settings
from twilight_drive.service.routes import healthz, price, fundamentals
from twilight_drive.service.auth import AuthMiddleware

logger = structlog.get_logger()


def create_app() -> FastAPI:
    settings = Settings()

    app = FastAPI(
        title="Twilight Drive API",
        version="0.2.0",
        docs_url=None,  # no swagger in prod
        redoc_url=None,
    )

    # Middleware: bearer token auth
    app.add_middleware(AuthMiddleware, settings=settings)

    # Routes
    app.include_router(healthz.router)
    app.include_router(price.router, prefix="/api")
    app.include_router(fundamentals.router, prefix="/api")

    return app


app = create_app()
  • [ ] Step 2: Create settings module
python
# src/service/config.py
from pathlib import Path
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Environment-driven config for the service."""

    # Serving
    host: str = "0.0.0.0"
    port: int = 8000
    version: str = "0.2.0"

    # Tushare
    tushare_token: str

    # DuckDB
    duckdb_path: Path = Path("/var/lib/twilight/cache.duckdb")

    # Auth (SQLite for P1.0, upgrade to Postgres in P1.1)
    auth_db_path: Path = Path("/var/lib/twilight/auth.db")

    model_config = {"env_prefix": "TWILIGHT_", "env_file": ".env"}
  • [ ] Step 3: Create bearer token auth middleware
python
# src/service/auth.py
"""Bearer token authentication middleware.

Tokens are stored as sha256 hashes in a SQLite database.
Middleware checks Authorization header, validates token,
sets request.state.user_id for downstream handlers.
"""

import hashlib
import sqlite3
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import JSONResponse

import structlog

logger = structlog.get_logger()


class AuthMiddleware(BaseHTTPMiddleware):
    def __init__(self, app, settings):
        super().__init__(app)
        self.settings = settings
        self._init_db()

    def _init_db(self):
        """Create auth table if not exists."""
        conn = sqlite3.connect(self.settings.auth_db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS api_keys (
                key_hash TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                scope TEXT DEFAULT 'data:read',
                revoked_at TIMESTAMP,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
        conn.commit()
        conn.close()

    async def dispatch(self, request: Request, call_next):
        # Skip auth for healthz
        if request.url.path == "/healthz":
            return await call_next(request)

        auth_header = request.headers.get("authorization")
        if not auth_header or not auth_header.startswith("Bearer "):
            return JSONResponse(status_code=401, content={"detail": "Missing bearer token"})

        token = auth_header.split(" ", 1)[1]
        key_hash = hashlib.sha256(token.encode()).hexdigest()

        conn = sqlite3.connect(self.settings.auth_db_path)
        cursor = conn.execute(
            "SELECT user_id, scope, revoked_at FROM api_keys WHERE key_hash = ?",
            (key_hash,),
        )
        row = cursor.fetchone()
        conn.close()

        if row is None:
            return JSONResponse(status_code=401, content={"detail": "Invalid token"})

        user_id, scope, revoked_at = row
        if revoked_at is not None:
            return JSONResponse(status_code=403, content={"detail": "Token revoked"})

        request.state.user_id = user_id
        request.state.scope = scope
        logger.debug("auth_success", user_id=user_id, path=request.url.path)

        return await call_next(request)
  • [ ] Step 4: Run the smoke test
bash
cd twilight-drive
uv run uvicorn src.service.main:app --host 0.0.0.0 --port 8000
curl http://localhost:8000/healthz
# Expected: {"ok": true, "version": "0.2.0"}
curl http://localhost:8000/api/price?code=600519.SH
# Expected: 401 {"detail": "Missing bearer token"}
  • [ ] Step 5: Commit
bash
git add src/service/ pyproject.toml
git commit -m "feat(p1.0): scaffold FastAPI service with bearer token auth"

Task 2: DuckDB Cache + /price Route

Files:

  • Create: twilight-drive/src/service/cache.py

  • Create: twilight-drive/src/service/routes/__init__.py

  • Create: twilight-drive/src/service/routes/price.py

  • [ ] Step 1: Create DuckDB cache layer

python
# src/service/cache.py
"""DuckDB cache for daily price data.

Schema:
  daily_cache(code, trade_date, close, fetched_at)
Cache key: (code, trade_date)
TTL: 24 hours (86400 seconds)
"""

import duckdb
from datetime import datetime, timezone
from pathlib import Path


class PriceCache:
    def __init__(self, db_path: Path):
        db_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn = duckdb.connect(str(db_path))
        self._init_table()

    def _init_table(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS daily_cache (
                code VARCHAR,
                trade_date VARCHAR,
                close DOUBLE,
                fetched_at TIMESTAMP,
                PRIMARY KEY (code, trade_date)
            )
        """)

    def get(self, code: str, trade_date: str) -> dict | None:
        """Return cached row or None."""
        result = self.conn.execute(
            "SELECT code, trade_date, close, fetched_at FROM daily_cache "
            "WHERE code = ? AND trade_date = ?",
            [code, trade_date],
        ).fetchone()
        if result is None:
            return None
        return {
            "code": result[0],
            "trade_date": result[1],
            "close": result[2],
            "fetched_at": result[3].isoformat(),
        }

    def put(self, code: str, trade_date: str, close: float):
        """Insert or replace cache entry."""
        self.conn.execute(
            "INSERT OR REPLACE INTO daily_cache (code, trade_date, close, fetched_at) "
            "VALUES (?, ?, ?, ?)",
            [code, trade_date, close, datetime.now(timezone.utc)],
        )

    def is_fresh(self, code: str, trade_date: str, max_age_seconds: int = 86400) -> bool:
        """Check if cached entry is within TTL."""
        result = self.conn.execute(
            "SELECT fetched_at FROM daily_cache WHERE code = ? AND trade_date = ?",
            [code, trade_date],
        ).fetchone()
        if result is None:
            return False
        fetched = result[0]
        age = (datetime.now(timezone.utc) - fetched).total_seconds()
        return age < max_age_seconds
  • [ ] Step 2: Create /price route with cache → Tushare fallback
python
# src/service/routes/price.py
"""GET /api/price — return closing price with cite envelope."""

from datetime import datetime, timezone

import httpx
import structlog
from fastapi import APIRouter, Query, Request
from pydantic import BaseModel

from twilight_drive.service.cache import PriceCache
from twilight_drive.service.config import Settings

logger = structlog.get_logger()
router = APIRouter()

TUSHARE_API = "https://tushare.pro/api"


class Cite(BaseModel):
    kind: str = "tool"
    source: str
    served_by: str
    served_version: str
    table: str
    fetched_at: str
    cache_age_seconds: int
    tool_call_id: str


class PriceResponse(BaseModel):
    value: float
    metric: str
    code: str
    as_of: str
    cite: Cite


# Module-level cache singleton (initialized on first route access)
_cache: PriceCache | None = None


def get_cache(settings: Settings) -> PriceCache:
    global _cache
    if _cache is None:
        _cache = PriceCache(settings.duckdb_path)
    return _cache


def _generate_tool_call_id() -> str:
    import secrets
    return f"tc_{secrets.token_hex(8)}"


@router.get("/price", response_model=PriceResponse)
async def get_price(
    request: Request,
    code: str = Query(..., description="Stock code, e.g. 600519.SH"),
    trade_date: str = Query(None, description="Trade date YYYYMMDD. Defaults to latest."),
):
    settings: Settings = request.app.state.settings
    cache = get_cache(settings)

    # Resolve trade_date to today if not provided
    if trade_date is None:
        trade_date = datetime.now(timezone.utc).strftime("%Y%m%d")

    # Check cache first
    cached = cache.get(code, trade_date)
    if cached and cache.is_fresh(code, trade_date):
        age = int(
            (datetime.now(timezone.utc) - datetime.fromisoformat(cached["fetched_at"])).total_seconds()
        )
        logger.info("price_cache_hit", code=code, trade_date=trade_date)
        return PriceResponse(
            value=cached["close"],
            metric="close",
            code=code,
            as_of=trade_date,
            cite=Cite(
                source="tushare",
                served_by="twilight-drive-backend",
                served_version=settings.version,
                table="daily_cache",
                fetched_at=cached["fetched_at"],
                cache_age_seconds=age,
                tool_call_id=_generate_tool_call_id(),
            ),
        )

    # Cache miss → fetch from Tushare
    logger.info("price_cache_miss", code=code, trade_date=trade_date)
    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.post(
            TUSHARE_API,
            json={
                "api_name": "daily",
                "token": settings.tushare_token,
                "params": {"ts_code": code, "trade_date": trade_date},
            },
        )
        resp.raise_for_status()
        data = resp.json()

    # Extract close from Tushare response
    # Tushare daily response: {data: {items: [[date, open, high, low, close, ...]], fields: [...]}}
    items = data.get("data", {}).get("items", [])
    if not items:
        from fastapi import HTTPException
        raise HTTPException(status_code=404, detail=f"No data for {code} on {trade_date}")

    fields = data["data"]["fields"]
    close_idx = fields.index("close")
    close = items[0][close_idx]

    # Write to cache
    cache.put(code, trade_date, close)

    fetched_at = datetime.now(timezone.utc).isoformat()
    return PriceResponse(
        value=close,
        metric="close",
        code=code,
        as_of=trade_date,
        cite=Cite(
            source="tushare",
            served_by="twilight-drive-backend",
            served_version=settings.version,
            table="daily",
            fetched_at=fetched_at,
            cache_age_seconds=0,
            tool_call_id=_generate_tool_call_id(),
        ),
    )
  • [ ] Step 3: Write tests
python
# tests/test_service_price.py
import pytest
from fastapi.testclient import TestClient
from unittest.mock import patch, MagicMock

from twilight_drive.service.main import app


class TestPriceCacheHit:
    def test_cache_miss_fetches_tushare(self):
        """First request → Tushare called, response has cite envelope."""
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "data": {
                "items": [["20260430", 1800.0, 1860.0, 1790.0, 1850.25, 100000, 185000000.0]],
                "fields": ["trade_date", "open", "high", "low", "close", "vol", "amount"],
            }
        }
        mock_response.raise_for_status = MagicMock()

        with patch("httpx.AsyncClient") as mock_client:
            mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
            client = TestClient(app)
            resp = client.get("/api/price?code=600519.SH&trade_date=20260430",
                              headers={"Authorization": "Bearer test-token"})

        assert resp.status_code == 200
        data = resp.json()
        assert data["value"] == 1850.25
        assert data["cite"]["kind"] == "tool"
        assert data["cite"]["source"] == "tushare"
        assert data["cite"]["served_by"] == "twilight-drive-backend"

    def test_no_auth_returns_401(self):
        client = TestClient(app)
        resp = client.get("/api/price?code=600519.SH")
        assert resp.status_code == 401

    def test_invalid_auth_returns_401(self):
        client = TestClient(app)
        resp = client.get("/api/price?code=600519.SH",
                          headers={"Authorization": "Bearer wrong-token"})
        assert resp.status_code == 401

    def test_no_data_returns_404(self):
        """Tushare returns empty items → 404."""
        mock_response = MagicMock()
        mock_response.json.return_value = {"data": {"items": [], "fields": []}}
        mock_response.raise_for_status = MagicMock()

        with patch("httpx.AsyncClient") as mock_client:
            mock_client.return_value.__aenter__.return_value.post.return_value = mock_response
            client = TestClient(app)
            resp = client.get("/api/price?code=999999.SH&trade_date=20260430",
                              headers={"Authorization": "Bearer test-token"})

        assert resp.status_code == 404
  • [ ] Step 4: Run tests
bash
cd twilight-drive
uv run pytest tests/test_service_price.py -v
  • [ ] Step 5: Commit
bash
git add src/service/cache.py src/service/routes/price.py tests/test_service_price.py
git commit -m "feat(p1.0): DuckDB cache + /price route with Tushare fallback and cite envelope"

Task 3: /fundamentals Route

Files:

  • Create: twilight-drive/src/service/routes/fundamentals.py

  • [ ] Step 1: Create the route

python
# src/service/routes/fundamentals.py
"""GET /api/fundamentals — return P/E, ROE, margins with cite envelopes."""

from datetime import datetime, timezone

import httpx
import structlog
from fastapi import APIRouter, Query, Request, HTTPException
from pydantic import BaseModel

from twilight_drive.service.config import Settings

logger = structlog.get_logger()
router = APIRouter()

TUSHARE_API = "https://tushare.pro/api"


class Claim(BaseModel):
    metric: str
    value: float
    cite: dict


class FundamentalsResponse(BaseModel):
    code: str
    as_of: str
    claims: list[Claim]


def _generate_tool_call_id() -> str:
    import secrets
    return f"tc_{secrets.token_hex(8)}"


@router.get("/fundamentals", response_model=FundamentalsResponse)
async def get_fundamentals(
    request: Request,
    code: str = Query(...),
    period: str = Query(None, description="Report period YYYYMMDD. Defaults to latest."),
):
    settings: Settings = request.app.state.settings

    if period is None:
        period = datetime.now(timezone.utc).strftime("%Y%m%d")

    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.post(
            TUSHARE_API,
            json={
                "api_name": "fina_indicator",
                "token": settings.tushare_token,
                "params": {"ts_code": code, "period": period},
            },
        )
        resp.raise_for_status()
        data = resp.json()

    items = data.get("data", {}).get("items", [])
    if not items:
        raise HTTPException(status_code=404, detail=f"No fundamentals for {code} in {period}")

    fields = data["data"]["fields"]
    row = items[0]

    # Extract key metrics
    metrics = {}
    for metric_name in ["pe", "roe", "grossprofit_margin", "netprofit_margin"]:
        if metric_name in fields:
            idx = fields.index(metric_name)
            val = row[idx]
            if val is not None:
                metrics[metric_name] = float(val)

    fetched_at = datetime.now(timezone.utc).isoformat()
    return FundamentalsResponse(
        code=code,
        as_of=period,
        claims=[
            Claim(
                metric=name,
                value=value,
                cite={
                    "kind": "tool",
                    "source": "tushare",
                    "served_by": "twilight-drive-backend",
                    "served_version": settings.version,
                    "table": "fina_indicator",
                    "fetched_at": fetched_at,
                    "cache_age_seconds": 0,
                    "tool_call_id": _generate_tool_call_id(),
                },
            )
            for name, value in metrics.items()
        ],
    )
  • [ ] Step 2: Write tests + run + commit

(Same pattern as Task 2 — mock Tushare response, verify schema + cite envelope.)


Task 4: Admin CLI for Token Issuance

Files:

  • Create: twilight-drive/scripts/issue-token.py

  • Create: twilight-drive/scripts/deploy-vultr.sh

  • [ ] Step 1: Write the token issuance script

python
#!/usr/bin/env python3
"""Issue a bearer token for an alpha user.

Usage:
    uv run python scripts/issue-token.py --user alpha-001
    uv run python scripts/issue-token.py --user alpha-001 --revoke
"""

import argparse
import hashlib
import secrets
import sqlite3
from datetime import datetime, timezone
from pathlib import Path

AUTH_DB = Path("/var/lib/twilight/auth.db")


def ensure_db():
    AUTH_DB.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(AUTH_DB))
    conn.execute("""
        CREATE TABLE IF NOT EXISTS api_keys (
            key_hash TEXT PRIMARY KEY,
            user_id TEXT NOT NULL,
            scope TEXT DEFAULT 'data:read',
            revoked_at TIMESTAMP,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )
    """)
    conn.commit()
    return conn


def issue_token(user_id: str) -> str:
    raw = secrets.token_urlsafe(32)
    key_hash = hashlib.sha256(raw.encode()).hexdigest()

    conn = ensure_db()
    conn.execute(
        "INSERT OR REPLACE INTO api_keys (key_hash, user_id) VALUES (?, ?)",
        (key_hash, user_id),
    )
    conn.commit()
    conn.close()

    print(f"Token issued for {user_id}:")
    print(f"  TWILIGHT_API_TOKEN={raw}")
    print(f"  (hash: {key_hash[:16]}...)")
    print(f"  This token will never be shown again. Store it securely.")
    return raw


def revoke_token(user_id: str):
    conn = ensure_db()
    conn.execute(
        "UPDATE api_keys SET revoked_at = ? WHERE user_id = ?",
        (datetime.now(timezone.utc).isoformat(), user_id),
    )
    conn.commit()
    conn.close()
    print(f"Token revoked for {user_id}")


def main():
    parser = argparse.ArgumentParser(description="Issue/revoke Twilight API tokens")
    parser.add_argument("--user", required=True, help="User ID (e.g. alpha-001)")
    parser.add_argument("--revoke", action="store_true", help="Revoke instead of issue")
    args = parser.parse_args()

    if args.revoke:
        revoke_token(args.user)
    else:
        issue_token(args.user)


if __name__ == "__main__":
    main()
  • [ ] Step 2: Test locally
bash
uv run python scripts/issue-token.py --user alpha-001
# Copy the token
curl -H "Authorization: Bearer <token>" http://localhost:8000/api/price?code=600519.SH&trade_date=20260430
# Should return 200 with price data

uv run python scripts/issue-token.py --user alpha-001 --revoke
curl -H "Authorization: Bearer <token>" http://localhost:8000/api/price?code=600519.SH&trade_date=20260430
# Should return 403
  • [ ] Step 3: Write deploy script
bash
#!/usr/bin/env bash
# deploy-vultr.sh — one-command deploy to Vultr
set -euo pipefail

VULTR_HOST="${VULTR_HOST:-root@vultr-ip}"

echo "Building and deploying to Vultr..."
ssh "$VULTR_HOST" bash -s <<'EOF'
  cd /opt/twilight-drive
  git pull origin main
  systemctl restart twilight-service
  sleep 2
  curl -s http://localhost:8000/healthz
EOF

echo "Deploy complete."

Task 5: P1.1 — Landing Page + WeChat Pay

Files:

  • Create: twilight-drive/src/landing/index.html (Vue SPA, single file)

  • Create: twilight-drive/src/service/routes/payment.py

  • Create: twilight-drive/src/provisioner/main.py

  • [ ] Step 1: Create landing page

(Single Vue.js SPA, Chinese language, two pricing cards — Pro selectable, Lite disabled.)

  • [ ] Step 2: Create payment webhook handler
python
# src/service/routes/payment.py
"""POST /api/payments/webhook — WeChat Pay callback."""

from fastapi import APIRouter, Request, HTTPException
import structlog

logger = structlog.get_logger()
router = APIRouter()


@router.post("/payments/webhook")
async def wechat_pay_webhook(request: Request):
    """Receive WeChat Pay payment confirmation.
    
    1. Verify webhook signature
    2. Mark payment as paid
    3. Set user.paid_until = now() + 30 days
    4. Trigger provisioner (async)
    """
    # TODO: Implement WeChat Pay signature verification
    # For alpha: accept all webhooks, log for audit
    body = await request.json()
    logger.info("payment_webhook", body=body)
    
    # TODO: Call provisioner asynchronously
    # await provisioner.provision_user(user_id=..., plan="pro")
    
    return {"code": "SUCCESS", "message": "OK"}
  • [ ] Step 3: Provisioner stub
python
# src/provisioner/main.py
"""Provisioner service: payment webhook → Hermes profile → QR code."""

import asyncio
import secrets
import hashlib
import structlog
from uuid import UUID

logger = structlog.get_logger()


async def provision_user(user_id: UUID, plan: str = "pro") -> dict:
    """Full provisioning flow (P1.1)."""
    # 1. Generate API key
    raw_key = secrets.token_urlsafe(32)
    key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
    
    # 2. Store hash in DB
    # await db.store_api_key(key_hash, user_id)
    logger.info("provisioning", user_id=str(user_id), plan=plan)
    
    # 3. Clone profile from template
    # profile_name = f"user-{user_id.hex[:8]}"
    # await hermes.clone(profile_name, template="stock-research-pro")
    
    # 4. Plant secrets
    # await hermes.plant_secret(profile_name, "TWILIGHT_API_TOKEN", raw_key)
    
    # 5. Start gateway
    # await hermes.start_gateway(profile_name)
    
    # 6. Generate QR
    # qr_url = await hermes.generate_qr(profile_name)
    
    return {"status": "provisioning", "user_id": str(user_id)}


if __name__ == "__main__":
    asyncio.run(provision_user(UUID(int=1), "pro"))

Task 6: P1.2 — Multi-source (pytdx3 backfill)

Files:

  • Create: twilight-drive/scripts/backfill_pytdx3.py

  • [ ] Step 1: Write backfill script

python
#!/usr/bin/env python3
"""Backfill historical daily data via pytdx3 (free TDX socket).

Usage:
    uv run python scripts/backfill_pytdx3.py --years 5 --top 300
"""

import argparse
import duckdb
from datetime import datetime


def backfill(top_n: int = 300, years: int = 5, duckdb_path: str = "/var/lib/twilight/cache.duckdb"):
    """Fetch N years of daily data for top N stocks via pytdx3."""
    # TODO: Implement pytdx3 connection
    # from pytdx3 import TDXClient
    # client = TDXClient()
    # codes = client.get_top_stocks(top_n)
    # for code in codes:
    #     rows = client.get_daily_history(code, years=years)
    #     duckdb.insert("daily", rows, on_conflict="ignore")
    
    print(f"Backfill: {top_n} stocks, {years} years → {duckdb_path}")
    print("TODO: implement pytdx3 adapter")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--years", type=int, default=5)
    parser.add_argument("--top", type=int, default=300)
    parser.add_argument("--duckdb", default="/var/lib/twilight/cache.duckdb")
    args = parser.parse_args()
    backfill(args.top, args.years, args.duckdb)


if __name__ == "__main__":
    main()

Task 7: P1.3 — Web Search Proxy

Files:

  • Create: twilight-drive/src/service/routes/search.py

  • [ ] Step 1: Create search route with DashScope → Google fallback

python
# src/service/routes/search.py
"""GET /api/search — web search proxy with fallback."""

from fastapi import APIRouter, Query, Request
import httpx
import structlog

logger = structlog.get_logger()
router = APIRouter()

DASHSCOPE_SEARCH = "https://dashscope.aliyuncs.com/api/v1/services/web-search/search"
GOOGLE_CSE = "https://www.googleapis.com/customsearch/v1"


@router.get("/search")
async def search(
    request: Request,
    query: str = Query(...),
    max_results: int = Query(10),
):
    settings = request.app.state.settings
    
    # Try DashScope first
    try:
        return await dashscope_search(query, max_results)
    except (Exception,) as e:
        logger.warning("dashscope_search_failed", error=str(e))
        # Fallback to Google
        return await google_search(query, max_results)


async def dashscope_search(query: str, max_results: int) -> dict:
    # TODO: Implement DashScope web_search
    return {"results": [], "source": "dashscope"}


async def google_search(query: str, max_results: int) -> dict:
    # TODO: Implement Google Custom Search
    return {"results": [], "source": "google"}

Task 8: Deployment + Verification

  • [ ] Step 1: Set up Vultr instance
bash
# SSH into Vultr, install dependencies
ssh root@vultr-ip
apt update && apt install -y python3.11 python3-pip
pip install uv
mkdir -p /var/lib/twilight
git clone <repo> /opt/twilight-drive
cd /opt/twilight-drive && uv sync
  • [ ] Step 2: Configure systemd
bash
# /etc/systemd/system/twilight-service.service
[Unit]
Description=Twilight Drive API
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/opt/twilight-drive
EnvironmentFile=/etc/twilight/env
ExecStart=/opt/twilight-drive/.venv/bin/uvicorn twilight_drive.service.main:app --host 0.0.0.0 --port 8000
Restart=always

[Install]
WantedBy=multi-user.target
bash
# /etc/twilight/env
TUSHARE_TOKEN=your-tushare-token
TWILIGHT_TUSHARE_TOKEN=your-tushare-token
  • [ ] Step 3: Run full test suite
bash
cd twilight-drive
uv run pytest -q --tb=short
uv run ruff check src/ tests/
  • [ ] Step 4: Deploy and smoke test
bash
./scripts/deploy-vultr.sh
TOKEN=$(uv run python scripts/issue-token.py --user smoke-test 2>&1 | grep TWILIGHT_API_TOKEN | cut -d= -f2)
curl -H "Authorization: Bearer $TOKEN" http://vultr-ip:8000/healthz
curl -H "Authorization: Bearer $TOKEN" "http://vultr-ip:8000/api/price?code=600519.SH&trade_date=20260430"

Out of Scope (Explicitly Deferred)

  • Lite tier enforcement (P2)
  • Postgres for users (P1.1 uses SQLite, upgrade later)
  • Research report PDF ingestion (P2)
  • Real-time intraday data (P3)
  • Cross-source factor generation (P2)

团队内部文档