This chapter explains why we chose Python FastAPI for PIF AI's backend, the practical trade-offs of SQLAlchemy async, why we use both Alembic and inline migrations, how the Worker collaborates with the Web process, and our testing-pyramid and CI strategy.
_run_migrations (idempotent SQL for fast iteration)

| Candidate | Strengths | Weaknesses | PIF AI Fit |
|---|---|---|---|
| FastAPI | Async native, Pydantic validation, OpenAPI auto-gen, AI ecosystem | Less "batteries included" than Django | ✅ Chosen |
| Django + DRF | Mature, full-stack, admin | Late async support, heavy framework | ❌ |
| Flask | Simple, flexible | Async via plugins, weak typing | ❌ |
| Node.js (NestJS) | Unified language | Weaker AI/ML ecosystem | ❌ |
Deciding factors:
/docs auto-generated; frontend team doesn't hand-write API docs

backend/
├── app/
│   ├── main.py # FastAPI app + lifespan
│   ├── api/v1/ # Routes (products, pif, sa_review, ...)
│   ├── core/ # config, database, security, email
│   ├── models/ # SQLAlchemy ORM
│   ├── schemas/ # Pydantic request/response
│   ├── services/ # Business logic (rag_client, sa_workflow, ...)
│   ├── ai/ # AI engine (toxicology, document_parser, ...)
│   └── mcp_servers/ # MCP server impl (tfda, echa)
├── tests/ # pytest
├── migrations/ # Alembic
├── alembic.ini
└── requirements.txt
SQLAlchemy 2.0 introduces Mapped[T] annotations, enabling IDE/mypy inference:
# app/models/product.py (excerpt)
class Product(Base):
    __tablename__ = "products"
    id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), primary_key=True, default=uuid.uuid4
    )
    org_id: Mapped[uuid.UUID] = mapped_column(
        UUID(as_uuid=True), ForeignKey("organizations.id")
    )
    name: Mapped[str] = mapped_column(String(500), nullable=False)
    rag_kb_id: Mapped[str | None] = mapped_column(String(100), index=True)
    # ...
    organization = relationship("Organization", back_populates="products")
All queries are async:
async def get_product_for_org(product_id, org_id, db: AsyncSession):
    result = await db.execute(
        select(Product).where(
            Product.id == product_id,
            Product.org_id == org_id,  # ACL gate: explicit WHERE
        )
    )
    return result.scalar_one_or_none()
Every data-access function takes org_id and hard-filters on it in the WHERE clause. This is the application-layer line of defense for Scheme C+ (see §10). Code pattern:
# Correct: ACL gate
product = await get_product_for_org(product_id, current_user.org_id, db)
# Wrong: direct lookup (bypasses ACL)
# product = await db.get(Product, product_id) # NEVER
Code review enforces that all DB access goes through get_*_for_org functions.
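Part of this review rule can also be checked mechanically. The following is a minimal sketch, not something the chapter describes as part of PIF AI's tooling: it uses Python's standard `ast` module to flag calls shaped like `db.get(...)`. The function name `find_direct_gets` and the assumption that sessions are bound to variables named `db` or `session` are hypothetical.

```python
import ast


def find_direct_gets(source: str, session_names=("db", "session")) -> list[int]:
    """Return the line numbers of calls that look like `db.get(Model, pk)`.

    Heuristic only: it matches on the variable name, so a dict that happens
    to be called `db` would be flagged too.
    """
    hits = []
    for node in ast.walk(ast.parse(source)):
        if (
            isinstance(node, ast.Call)
            and isinstance(node.func, ast.Attribute)
            and node.func.attr == "get"
            and isinstance(node.func.value, ast.Name)
            and node.func.value.id in session_names
        ):
            hits.append(node.lineno)
    return sorted(hits)


# Hypothetical route body: one gated lookup, one direct lookup.
sample = '''
async def handler(product_id, current_user, db):
    ok = await get_product_for_org(product_id, current_user.org_id, db)
    bad = await db.get(Product, product_id)
    return ok, bad
'''

violations = find_direct_gets(sample)
print(violations)
```

A check like this could run in CI next to the linters, but it only complements human review: it cannot tell whether a hand-written `select(...)` includes the `org_id` filter.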
During Phase 1 rapid iteration, the schema changes frequently. Pure Alembic has pain points:
alembic revision --autogenerate
alembic upgrade head

Track A: Alembic (formal migrations)
Used for large schema changes (new tables, index rebuilds). Stored in migrations/versions/, reviewed in PR.
Track B: Inline _run_migrations (idempotent SQL)
Used for small changes (add column, add index, data back-fill), written as ALTER ... IF NOT EXISTS or UPDATE ... WHERE statements. It runs on every FastAPI startup:
# app/main.py (excerpt)
@asynccontextmanager
async def lifespan(app: FastAPI):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
        await _run_migrations(conn)
    yield
    await engine.dispose()


async def _run_migrations(conn) -> None:
    """Idempotent schema migrations for evolving an existing DB."""
    from sqlalchemy import text
    stmts = [
        "ALTER TABLE users ADD COLUMN IF NOT EXISTS totp_secret TEXT",
        "ALTER TABLE products ADD COLUMN IF NOT EXISTS rag_kb_id VARCHAR(100)",
        "CREATE INDEX IF NOT EXISTS idx_products_rag_kb_id ON products(rag_kb_id)",
        # ...
    ]
    for stmt in stmts:
        try:
            await conn.execute(text(stmt))
        except Exception:
            pass  # Already in target state: ignore
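To make the "safe to run on every startup" property concrete, here is a small sketch using the standard-library `sqlite3` module as a stand-in for PostgreSQL. It is an illustration under stated assumptions, not the project's code: SQLite has no `ADD COLUMN IF NOT EXISTS`, so the hypothetical helper `add_column_if_missing` checks `PRAGMA table_info` first, while `CREATE INDEX IF NOT EXISTS` works as in the excerpt above.

```python
import sqlite3


def add_column_if_missing(conn, table: str, column: str, ddl_type: str) -> None:
    """SQLite substitute for PostgreSQL's ADD COLUMN IF NOT EXISTS."""
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column not in existing:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {ddl_type}")


def run_migrations(conn) -> None:
    """Every step is a no-op when the schema is already in the target state."""
    add_column_if_missing(conn, "products", "rag_kb_id", "VARCHAR(100)")
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_products_rag_kb_id ON products(rag_kb_id)"
    )


def schema_snapshot(conn):
    """All schema objects and their DDL, in a stable order."""
    return conn.execute("SELECT name, sql FROM sqlite_master ORDER BY name").fetchall()


def column_names(conn, table: str) -> list[str]:
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id TEXT PRIMARY KEY, name TEXT NOT NULL)")

run_migrations(conn)                  # first startup: applies both changes
schema_first = schema_snapshot(conn)
run_migrations(conn)                  # second startup: changes nothing
schema_second = schema_snapshot(conn)
```

Comparing `schema_first` with `schema_second` shows that the second run leaves the schema untouched, which is the property the inline track relies on.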
Pros:
_run_migrations → PR is clear
alembic upgrade
create_all, handles transformations ORM cannot express (CHECK fixes, conditional indexes)

Cons:
Rule of thumb:
Long-running operations do not fit HTTP request lifecycles:
| Operation | Time | Needs Worker |
|---|---|---|
| Formulation AI extraction | 10–30s | ✅ |
| Toxicology batch query | 30s–2min | ✅ |
| PIF PDF generation | 30s–1min | ✅ |
| SA assessment draft generation | 15–45s | ✅ |
| Generic DB CRUD | < 100ms | ❌ |
Redis is the queue backend. Web pushes tasks; Worker consumes:
sequenceDiagram
participant W as Web (FastAPI)
participant R as Redis Queue
participant K as Worker
participant D as PostgreSQL
W->>D: Create task record (status=pending)
W->>R: XADD queue:pif_generate
W-->>Client: 202 Accepted (task_id)
K->>R: XREAD queue:pif_generate
R-->>K: task payload
K->>D: UPDATE status=processing
K->>K: Execute long-running task
alt Success
K->>D: UPDATE status=done, result=...
else Failure
K->>D: UPDATE status=failed, error=...
K->>R: XADD queue:retry (exponential backoff)
end
Client->>W: GET /tasks/:id (polling)
W->>D: SELECT task
W-->>Client: status + result
Figure 6.1: Web enqueues and returns 202. Worker consumes independently and updates DB status. Failure uses exponential backoff (1s, 2s, 4s, 8s, …, max 300s); three consecutive failures → marked failed.
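The flow in Figure 6.1 can be sketched in plain Python. This is an in-memory simulation under stated assumptions, not the project's Worker: a `dict` stands in for the PostgreSQL task table, `queue.Queue` stands in for the Redis stream, and the names `enqueue`, `worker_step`, and `backoff_delay` are hypothetical. The sketch also assumes a task returns to `pending` between retries and is marked `failed` only after the third attempt.

```python
import queue
import uuid
from typing import Callable, Optional

MAX_ATTEMPTS = 3           # three consecutive failures -> failed
BACKOFF_CAP_SECONDS = 300  # "max 300s" from the figure caption

tasks: dict = {}                          # stand-in for the task table
task_queue: queue.Queue = queue.Queue()   # stand-in for the Redis stream


def backoff_delay(attempt: int) -> int:
    """Delay after failed attempt number `attempt` (1-based): 1, 2, 4, ... capped."""
    return min(2 ** (attempt - 1), BACKOFF_CAP_SECONDS)


def enqueue(payload: dict) -> str:
    """Web side: record the task, queue it, and return the id sent with the 202."""
    task_id = str(uuid.uuid4())
    tasks[task_id] = {"status": "pending", "payload": payload,
                      "attempts": 0, "result": None, "error": None}
    task_queue.put(task_id)
    return task_id


def worker_step(handler: Callable[[dict], object]) -> Optional[int]:
    """Worker side: process one task; return the retry delay if it was re-queued."""
    task_id = task_queue.get_nowait()
    task = tasks[task_id]
    task["status"] = "processing"
    task["attempts"] += 1
    try:
        task["result"] = handler(task["payload"])
        task["status"] = "done"
        return None
    except Exception as exc:
        task["error"] = str(exc)
        if task["attempts"] >= MAX_ATTEMPTS:
            task["status"] = "failed"
            return None
        task["status"] = "pending"
        task_queue.put(task_id)  # a real worker would wait `delay` seconds first
        return backoff_delay(task["attempts"])


ok_id = enqueue({"n": 2})
worker_step(lambda payload: payload["n"] * 2)
print(tasks[ok_id]["status"], tasks[ok_id]["result"])
```

Polling `GET /tasks/:id` corresponds to reading `tasks[task_id]` here: the client only ever sees the stored status and never talks to the Worker directly.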
Web and Worker scale independently:
# docker-compose.yml (excerpt)
services:
  backend:
    build: ./backend
    command: uvicorn app.main:app --host 0.0.0.0
    deploy:
      replicas: 3  # K8s: HPA on CPU
  worker:
    build: ./backend
    command: python -m app.worker
    deploy:
      replicas: 5  # K8s: KEDA on queue depth
Web scales on req/sec; Worker scales on queue depth. They do not interfere.
      /\
     /  \       E2E (Playwright, Phase 2)
    /----\
   /      \     Integration (pytest + real DB)
  /--------\
 /          \   Unit (pytest + MockTransport)
/____________\
External APIs (Claude, PubChem, central RAG) are replaced with httpx.MockTransport, so no real network calls are made:
# tests/test_rag_client.py (excerpt)
@pytest.mark.asyncio
async def test_create_kb_sends_correct_headers_and_payload(configured_rag):
    captured = {}
    def handler(request: httpx.Request) -> httpx.Response:
        captured["headers"] = dict(request.headers)
        return httpx.Response(
            201,
            json={"status": "success", "data": {"id": "kb_new_xyz"}},
        )
    _install_mock_transport(handler)
    kb = await RagClient.create_knowledge_base(
        org_id=uuid.uuid4(), product_id=uuid.uuid4()
    )
    assert kb.id == "kb_new_xyz"
    assert captured["headers"]["x-rag-api-key"] == "test-key-abc"
    assert captured["headers"]["x-tenant-id"] == "11111111-..."
This pattern keeps unit tests fully offline, which is ideal for CI. The 16 RagClient unit tests complete in 1.09 seconds on a local Docker container.[^1]
tests/conftest.py creates a pifai_test database at session start and runs Base.metadata.create_all. Each test function runs in its own transaction, which is rolled back at the end for isolation:
@pytest.fixture(scope="session", autouse=True)
def _create_test_db():
    # Connect to main DB → DROP pifai_test → CREATE pifai_test
    ...
    yield
    # teardown
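The per-test rollback described above is not shown in the excerpt. The sketch below illustrates the idea with the standard-library `sqlite3` module instead of PostgreSQL and SQLAlchemy; the context manager `rolled_back` is a hypothetical stand-in for a function-scoped fixture, so treat it as a demonstration of the isolation technique rather than the project's conftest.

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def rolled_back(conn: sqlite3.Connection):
    """Yield the connection, then undo everything the test wrote."""
    try:
        yield conn
    finally:
        conn.rollback()


def count_products(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]


# Session-level setup: create the schema once and commit it.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT)")
conn.commit()

# One "test": its insert is visible inside the transaction only.
with rolled_back(conn) as db:
    db.execute("INSERT INTO products (name) VALUES ('lotion')")
    inside = count_products(db)

after = count_products(conn)
print(inside, after)
```

The schema survives because it was committed before the test started, while the row inserted by the test disappears, so the next test sees a clean table without the database being re-created.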
Integration tests validate the full FastAPI + DB + ACL chain, e.g., "can user A access user B's products?"
.github/workflows/ci.yml (planned):
jobs:
  test:
    services:
      postgres: { image: pgvector/pgvector:pg16 }
      redis: { image: redis:7-alpine }
    steps:
      - run: pytest -q --cov=app --cov-report=xml
  lint:
    steps:
      - run: ruff check .
      - run: mypy app/
| Version | Date | Summary |
|---|---|---|
| v0.1 | 2026-04-19 | First draft. FastAPI, SQLAlchemy async, dual migration, Worker, pytest |
© 2026 Baiyuan Tech. Licensed under CC BY-NC 4.0.
[^1]: Measured: docker exec pif-backend-1 python -m pytest tests/test_rag_client.py -q completed 16 tests in 1.09s on 2026-04-19 (MacBook M2, Docker Desktop).