FastAPI完全ガイド — 高速Python APIフレームワーク・型安全・非同期・本番運用


FastAPIはPythonのWebフレームワークの中で最も急成長しているプロジェクトの一つだ。GitHub上のスター数は急速に増加し、Netflix・Uber・Microsoftといった大企業が本番環境で採用している。なぜこれほどまでに支持されるのか。その答えは「速度」「型安全性」「開発体験」の三位一体にある。

この記事では、FastAPIの基礎から本番運用まで、実際のコード例を交えながら体系的に解説する。読み終えた後には、FastAPIで本格的なAPIサーバーを構築できるようになっているはずだ。


1. FastAPIとは — Flask・Djangoとの比較

誕生の背景

FastAPIは2018年にSebastián Ramírez(tiangolo)が開発した。Pythonの型ヒント(Type Hints)を最大限に活用し、OpenAPI仕様の自動生成・Pydanticによるデータ検証・Starletteによる非同期処理を組み合わせた革新的なフレームワークだ。

パフォーマンス比較

TechEmpower Framework Benchmarksによると、FastAPIはFlaskと比較して約3〜10倍高速だ。Djangoとの比較では更に大きな差が開く場合もある。

フレームワークリクエスト/秒(目安)非同期サポート型チェックOpenAPI自動生成
FastAPI~30,000ネイティブPydantic v2自動
Flask~8,000別途設定なし手動
Django REST~5,000別途設定なし手動
Express(Node.js)~25,000ネイティブ手動(TS)手動

この速度はStarlette(ASGIフレームワーク)とuvicorn(ASGIサーバー)の組み合わせによって実現されている。

FastAPIが選ばれる理由

1. 型ヒントによる自動バリデーション
2. 対話型APIドキュメント(Swagger UI / ReDoc)の自動生成
3. async/awaitによるネイティブ非同期処理
4. Pydantic v2による高速なデータ検証
5. Python 3.10+の最新機能をフル活用
6. 本番実績(Netflix・Uber・Microsoft)

2. プロジェクト設定

推奨プロジェクト構造

myapi/
├── pyproject.toml
├── .env
├── .env.example
├── Dockerfile
├── docker-compose.yml
├── alembic/
│   ├── alembic.ini
│   └── versions/
└── app/
    ├── __init__.py
    ├── main.py
    ├── config.py
    ├── database.py
    ├── models/
    │   ├── __init__.py
    │   └── user.py
    ├── schemas/
    │   ├── __init__.py
    │   └── user.py
    ├── routers/
    │   ├── __init__.py
    │   ├── users.py
    │   └── auth.py
    ├── dependencies/
    │   ├── __init__.py
    │   └── auth.py
    └── services/
        ├── __init__.py
        └── user_service.py

pyproject.toml

[project]
name = "myapi"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "fastapi[standard]>=0.115.0",
    "uvicorn[standard]>=0.32.0",
    "pydantic>=2.10.0",
    "pydantic-settings>=2.7.0",
    "sqlalchemy[asyncio]>=2.0.0",
    "asyncpg>=0.30.0",
    "alembic>=1.14.0",
    "python-jose[cryptography]>=3.3.0",
    "passlib[bcrypt]>=1.7.4",
    "python-multipart>=0.0.20",
    "httpx>=0.28.0",
    "redis>=5.2.0",
    "celery>=5.4.0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.3.0",
    "pytest-asyncio>=0.25.0",
    "anyio>=4.8.0",
    "ruff>=0.9.0",
    "mypy>=1.14.0",
]

[tool.ruff]
line-length = 88
target-version = "py311"

[tool.mypy]
strict = true
plugins = ["pydantic.mypy"]

[tool.pytest.ini_options]
asyncio_mode = "auto"

環境変数設定(config.py)

# app/config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from functools import lru_cache


class Settings(BaseSettings):
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=False,
    )

    # アプリ設定
    app_name: str = "MyAPI"
    app_version: str = "1.0.0"
    debug: bool = False
    allowed_hosts: list[str] = ["*"]

    # データベース
    database_url: str = "postgresql+asyncpg://user:pass@localhost/mydb"

    # 認証
    secret_key: str
    algorithm: str = "HS256"
    access_token_expire_minutes: int = 30
    refresh_token_expire_days: int = 7

    # Redis
    redis_url: str = "redis://localhost:6379/0"

    # CORS
    cors_origins: list[str] = ["http://localhost:3000"]


@lru_cache
def get_settings() -> Settings:
    return Settings()

エントリーポイント(main.py)

# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware

from app.config import get_settings
from app.database import engine, Base
from app.routers import users, auth, items
from app.middleware import RateLimitMiddleware, LoggingMiddleware

settings = get_settings()


@asynccontextmanager
async def lifespan(app: FastAPI):
    # 起動時処理
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    print("Database tables created.")
    yield
    # シャットダウン時処理
    await engine.dispose()
    print("Database connection closed.")


app = FastAPI(
    title=settings.app_name,
    version=settings.app_version,
    description="FastAPI完全ガイドのサンプルAPI",
    docs_url="/docs",
    redoc_url="/redoc",
    openapi_url="/openapi.json",
    lifespan=lifespan,
)

# ミドルウェア(後入れ先出し順で実行)
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
app.add_middleware(TrustedHostMiddleware, allowed_hosts=settings.allowed_hosts)
app.add_middleware(RateLimitMiddleware, max_requests=100, window_seconds=60)
app.add_middleware(LoggingMiddleware)

# ルーター登録
app.include_router(auth.router, prefix="/api/v1/auth", tags=["認証"])
app.include_router(users.router, prefix="/api/v1/users", tags=["ユーザー"])
app.include_router(items.router, prefix="/api/v1/items", tags=["アイテム"])


@app.get("/health", tags=["ヘルスチェック"])
async def health_check():
    return {"status": "ok", "version": settings.app_version}

3. Path Operation — ルーティングの基礎

HTTPメソッドとデコレータ

# app/routers/items.py
from fastapi import APIRouter, HTTPException, Query, Path, status
from typing import Annotated

from app.schemas.item import ItemCreate, ItemUpdate, ItemResponse, ItemListResponse

router = APIRouter()


# GETエンドポイント(一覧取得)
@router.get(
    "/",
    response_model=ItemListResponse,
    summary="アイテム一覧取得",
    description="ページネーション対応のアイテム一覧を返す。",
)
async def list_items(
    page: Annotated[int, Query(ge=1, description="ページ番号")] = 1,
    per_page: Annotated[int, Query(ge=1, le=100, description="1ページあたりの件数")] = 20,
    search: Annotated[str | None, Query(description="検索キーワード")] = None,
):
    # 実装省略(後のSQLAlchemy節で詳述)
    return {"items": [], "total": 0, "page": page, "per_page": per_page}


# GETエンドポイント(単件取得)
@router.get(
    "/{item_id}",
    response_model=ItemResponse,
    summary="アイテム詳細取得",
    responses={
        404: {"description": "アイテムが見つからない"},
    },
)
async def get_item(
    item_id: Annotated[int, Path(ge=1, description="アイテムID")],
):
    # 存在確認
    item = None  # DBから取得する処理
    if not item:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Item {item_id} not found",
        )
    return item


# POSTエンドポイント(作成)
@router.post(
    "/",
    response_model=ItemResponse,
    status_code=status.HTTP_201_CREATED,
    summary="アイテム作成",
)
async def create_item(item_data: ItemCreate):
    # DB保存処理
    return item_data


# PATCHエンドポイント(部分更新)
@router.patch("/{item_id}", response_model=ItemResponse)
async def update_item(
    item_id: Annotated[int, Path(ge=1)],
    item_data: ItemUpdate,
):
    return {"id": item_id, **item_data.model_dump(exclude_unset=True)}


# DELETEエンドポイント
@router.delete(
    "/{item_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="アイテム削除",
)
async def delete_item(item_id: Annotated[int, Path(ge=1)]):
    # 削除処理
    pass

レスポンスモデルの高度な使い方

from fastapi import APIRouter
from fastapi.responses import JSONResponse, StreamingResponse, FileResponse
import json

router = APIRouter()

# カスタムレスポンスヘッダー
@router.get("/download/{file_id}")
async def download_file(file_id: int):
    return FileResponse(
        path=f"/tmp/files/{file_id}.pdf",
        filename="document.pdf",
        media_type="application/pdf",
    )

# ストリーミングレスポンス
@router.get("/stream")
async def stream_data():
    async def generate():
        for i in range(10):
            yield f"data: {json.dumps({'chunk': i})}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")

# 動的レスポンスモデル選択
@router.get(
    "/users/{user_id}",
    response_model=UserPublicResponse | UserPrivateResponse,
)
async def get_user_data(user_id: int, include_private: bool = False):
    user = await fetch_user(user_id)
    if include_private:
        return UserPrivateResponse.model_validate(user)
    return UserPublicResponse.model_validate(user)

4. Pydantic v2 — データバリデーションの要

BaseModelの基本

# app/schemas/user.py
from pydantic import (
    BaseModel,
    Field,
    EmailStr,
    field_validator,
    model_validator,
    ConfigDict,
    computed_field,
)
from datetime import datetime
from typing import Annotated


# カスタム型エイリアス
PositiveInt = Annotated[int, Field(gt=0)]
NonEmptyStr = Annotated[str, Field(min_length=1, max_length=255)]


class UserBase(BaseModel):
    model_config = ConfigDict(
        from_attributes=True,      # ORMオブジェクトから変換可能
        populate_by_name=True,     # フィールド名エイリアスで投入可能
        str_strip_whitespace=True, # 文字列の前後空白を自動除去
        validate_default=True,     # デフォルト値もバリデーション対象
    )

    username: NonEmptyStr = Field(
        ...,
        pattern=r"^[a-zA-Z0-9_-]+$",
        description="英数字・アンダースコア・ハイフンのみ使用可",
        examples=["john_doe", "alice-123"],
    )
    email: EmailStr = Field(..., description="メールアドレス")
    full_name: str | None = Field(None, max_length=100)
    age: PositiveInt | None = Field(None, le=150, description="年齢(1〜150)")


class UserCreate(UserBase):
    password: str = Field(..., min_length=8, max_length=128)
    password_confirm: str

    @field_validator("username")
    @classmethod
    def username_must_not_be_reserved(cls, v: str) -> str:
        reserved = {"admin", "root", "system", "api"}
        if v.lower() in reserved:
            raise ValueError(f"'{v}' は予約済みユーザー名です")
        return v

    @model_validator(mode="after")
    def passwords_must_match(self) -> "UserCreate":
        if self.password != self.password_confirm:
            raise ValueError("パスワードが一致しません")
        return self


class UserUpdate(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    full_name: str | None = None
    age: PositiveInt | None = None

    # 更新時はすべてのフィールドをオプションにする
    model_config = ConfigDict(
        from_attributes=True,
        extra="forbid",  # 未定義フィールドは拒否
    )


class UserResponse(UserBase):
    id: int
    is_active: bool = True
    created_at: datetime
    updated_at: datetime

    # 計算フィールド(DBに保存しない)
    @computed_field
    @property
    def display_name(self) -> str:
        return self.full_name or self.username


class UserListResponse(BaseModel):
    users: list[UserResponse]
    total: int
    page: int
    per_page: int
    has_next: bool

    @computed_field
    @property
    def total_pages(self) -> int:
        return -(-self.total // self.per_page)  # ceil division

複雑なネスト構造

from pydantic import BaseModel, Field
from enum import Enum
from decimal import Decimal


class Currency(str, Enum):
    JPY = "JPY"
    USD = "USD"
    EUR = "EUR"


class Address(BaseModel):
    postal_code: str = Field(..., pattern=r"^\d{3}-\d{4}$")
    prefecture: str
    city: str
    street: str
    building: str | None = None


class Price(BaseModel):
    amount: Decimal = Field(..., decimal_places=2, ge=0)
    currency: Currency = Currency.JPY

    def to_jpy(self) -> Decimal:
        rates = {Currency.JPY: 1, Currency.USD: 150, Currency.EUR: 162}
        return self.amount * rates[self.currency]


class OrderCreate(BaseModel):
    items: list[dict[str, int]] = Field(..., min_length=1)
    shipping_address: Address
    price: Price
    notes: str | None = Field(None, max_length=1000)

    @model_validator(mode="after")
    def validate_items_not_empty(self) -> "OrderCreate":
        if not any(qty > 0 for item in self.items for qty in item.values()):
            raise ValueError("注文数量は1以上である必要があります")
        return self

5. 依存性注入(Dependency Injection)

Dependsの基本

FastAPIの依存性注入システムは非常に強力だ。認証・DB接続・ページネーション等の共通処理を再利用可能な形でまとめられる。

# app/dependencies/common.py
from fastapi import Depends, Query
from dataclasses import dataclass
from typing import Annotated


@dataclass
class Pagination:
    page: int
    per_page: int
    offset: int

    @property
    def limit(self) -> int:
        return self.per_page


async def get_pagination(
    page: Annotated[int, Query(ge=1)] = 1,
    per_page: Annotated[int, Query(ge=1, le=100)] = 20,
) -> Pagination:
    offset = (page - 1) * per_page
    return Pagination(page=page, per_page=per_page, offset=offset)


# 使い方
@router.get("/items")
async def list_items(pagination: Annotated[Pagination, Depends(get_pagination)]):
    return {
        "page": pagination.page,
        "per_page": pagination.per_page,
        "offset": pagination.offset,
    }

依存性チェーン(依存の依存)

# app/dependencies/auth.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated

from app.database import get_db
from app.services.auth_service import decode_access_token
from app.services.user_service import get_user_by_id
from app.models.user import User

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: Annotated[AsyncSession, Depends(get_db)],
) -> User:
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="認証情報が無効です",
        headers={"WWW-Authenticate": "Bearer"},
    )
    payload = decode_access_token(token)
    if not payload:
        raise credentials_exception

    user_id: int | None = payload.get("sub")
    if user_id is None:
        raise credentials_exception

    user = await get_user_by_id(db, user_id)
    if not user:
        raise credentials_exception
    return user


async def get_active_user(
    current_user: Annotated[User, Depends(get_current_user)],
) -> User:
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="アカウントが無効です",
        )
    return current_user


async def get_admin_user(
    current_user: Annotated[User, Depends(get_active_user)],
) -> User:
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="管理者権限が必要です",
        )
    return current_user


# 型エイリアスで簡潔に
CurrentUser = Annotated[User, Depends(get_active_user)]
AdminUser = Annotated[User, Depends(get_admin_user)]
DB = Annotated[AsyncSession, Depends(get_db)]


# ルーターで使用
@router.get("/profile")
async def get_profile(current_user: CurrentUser, db: DB):
    return current_user


@router.delete("/users/{user_id}")
async def delete_user(user_id: int, admin: AdminUser, db: DB):
    # 管理者のみアクセス可能
    pass

クラスベースの依存性

from fastapi import Depends, Request
from typing import Annotated


class PermissionChecker:
    def __init__(self, required_permissions: list[str]):
        self.required_permissions = required_permissions

    async def __call__(
        self,
        current_user: Annotated[User, Depends(get_current_user)],
    ) -> User:
        user_permissions = set(current_user.permissions)
        required = set(self.required_permissions)
        if not required.issubset(user_permissions):
            missing = required - user_permissions
            raise HTTPException(
                status_code=403,
                detail=f"権限が不足しています: {', '.join(missing)}",
            )
        return current_user


# 使い方
@router.post("/articles")
async def create_article(
    data: ArticleCreate,
    user: Annotated[User, Depends(PermissionChecker(["articles:write"]))],
):
    pass

@router.delete("/articles/{id}")
async def delete_article(
    article_id: int,
    user: Annotated[User, Depends(PermissionChecker(["articles:write", "articles:delete"]))],
):
    pass

6. 認証 — JWT・OAuth2・パスワードハッシュ

パスワードハッシュ

# app/services/auth_service.py
from passlib.context import CryptContext
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone
from typing import Any

from app.config import get_settings

settings = get_settings()

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def hash_password(plain_password: str) -> str:
    return pwd_context.hash(plain_password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)


def create_access_token(data: dict[str, Any]) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(
        minutes=settings.access_token_expire_minutes
    )
    to_encode["exp"] = expire
    to_encode["type"] = "access"
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)


def create_refresh_token(data: dict[str, Any]) -> str:
    to_encode = data.copy()
    expire = datetime.now(timezone.utc) + timedelta(
        days=settings.refresh_token_expire_days
    )
    to_encode["exp"] = expire
    to_encode["type"] = "refresh"
    return jwt.encode(to_encode, settings.secret_key, algorithm=settings.algorithm)


def decode_access_token(token: str) -> dict[str, Any] | None:
    try:
        payload = jwt.decode(
            token,
            settings.secret_key,
            algorithms=[settings.algorithm],
        )
        if payload.get("type") != "access":
            return None
        return payload
    except JWTError:
        return None

認証ルーター

# app/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated

from app.database import get_db
from app.schemas.auth import Token, TokenRefresh, UserRegister
from app.services.auth_service import (
    verify_password,
    create_access_token,
    create_refresh_token,
    decode_access_token,
)
from app.services.user_service import get_user_by_email, create_user

router = APIRouter()


@router.post("/register", status_code=status.HTTP_201_CREATED)
async def register(
    user_data: UserRegister,
    db: Annotated[AsyncSession, Depends(get_db)],
):
    existing = await get_user_by_email(db, user_data.email)
    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="このメールアドレスは既に登録されています",
        )
    user = await create_user(db, user_data)
    return {"message": "登録完了", "user_id": user.id}


@router.post("/token", response_model=Token)
async def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Annotated[AsyncSession, Depends(get_db)],
):
    user = await get_user_by_email(db, form_data.username)
    if not user or not verify_password(form_data.password, user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="メールアドレスまたはパスワードが正しくありません",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = create_access_token({"sub": str(user.id), "email": user.email})
    refresh_token = create_refresh_token({"sub": str(user.id)})

    return Token(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
    )


@router.post("/refresh", response_model=Token)
async def refresh_token(
    token_data: TokenRefresh,
    db: Annotated[AsyncSession, Depends(get_db)],
):
    from app.services.auth_service import decode_access_token as decode_token
    from jose import jwt, JWTError
    from app.config import get_settings

    settings = get_settings()
    try:
        payload = jwt.decode(
            token_data.refresh_token,
            settings.secret_key,
            algorithms=[settings.algorithm],
        )
        if payload.get("type") != "refresh":
            raise HTTPException(status_code=401, detail="無効なリフレッシュトークン")
        user_id = int(payload["sub"])
    except (JWTError, KeyError, ValueError):
        raise HTTPException(status_code=401, detail="無効なリフレッシュトークン")

    from app.services.user_service import get_user_by_id
    user = await get_user_by_id(db, user_id)
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="ユーザーが見つかりません")

    access_token = create_access_token({"sub": str(user.id), "email": user.email})
    new_refresh_token = create_refresh_token({"sub": str(user.id)})

    return Token(
        access_token=access_token,
        refresh_token=new_refresh_token,
        token_type="bearer",
    )

7. 非同期処理 — async/await・httpx

非同期の基本

FastAPIはASGI(Asynchronous Server Gateway Interface)ベースなので、async/awaitをネイティブにサポートしている。

import asyncio
import httpx
from fastapi import APIRouter

router = APIRouter()


# 非同期エンドポイント
@router.get("/async-demo")
async def async_demo():
    # 並列実行で速度向上
    result1, result2, result3 = await asyncio.gather(
        fetch_external_api("https://api.example.com/data1"),
        fetch_database_data(),
        fetch_cache_data(),
    )
    return {"data1": result1, "data2": result2, "cached": result3}


async def fetch_external_api(url: str) -> dict:
    async with httpx.AsyncClient(timeout=10.0) as client:
        response = await client.get(url)
        response.raise_for_status()
        return response.json()


# httpx クライアントの再利用(依存性注入パターン)
from functools import lru_cache

@lru_cache
def get_http_client() -> httpx.AsyncClient:
    return httpx.AsyncClient(
        timeout=httpx.Timeout(connect=5.0, read=10.0, write=5.0, pool=5.0),
        limits=httpx.Limits(max_connections=100, max_keepalive_connections=20),
        headers={"User-Agent": "MyAPI/1.0"},
    )


@router.get("/weather/{city}")
async def get_weather(
    city: str,
    client: httpx.AsyncClient = Depends(get_http_client),
):
    response = await client.get(
        f"https://api.openweathermap.org/data/2.5/weather",
        params={"q": city, "appid": "YOUR_API_KEY", "lang": "ja"},
    )
    return response.json()

非同期タスクのタイムアウト制御

import asyncio
from fastapi import HTTPException


async def with_timeout(coro, timeout_seconds: float = 5.0):
    try:
        return await asyncio.wait_for(coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        raise HTTPException(
            status_code=504,
            detail=f"処理がタイムアウトしました({timeout_seconds}秒)",
        )


@router.get("/slow-endpoint")
async def slow_endpoint():
    result = await with_timeout(slow_operation(), timeout_seconds=3.0)
    return result

8. SQLAlchemy 2.0 — AsyncSession・ORM・マイグレーション

データベース設定

# app/database.py
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker,
)
from sqlalchemy.orm import DeclarativeBase
from typing import AsyncGenerator

from app.config import get_settings

settings = get_settings()

engine = create_async_engine(
    settings.database_url,
    pool_size=10,
    max_overflow=20,
    pool_pre_ping=True,  # 接続が生きているか確認
    pool_recycle=3600,   # 1時間でコネクションを再作成
    echo=settings.debug, # デバッグ時にSQLをログ出力
)

AsyncSessionLocal = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


class Base(DeclarativeBase):
    pass


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

ORMモデル定義

# app/models/user.py
from sqlalchemy import String, Boolean, Integer, DateTime, ForeignKey, Text, Index
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime

from app.database import Base


class User(Base):
    __tablename__ = "users"

    # 複合インデックス
    __table_args__ = (
        Index("ix_users_email_active", "email", "is_active"),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    username: Mapped[str] = mapped_column(String(50), unique=True, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255))
    full_name: Mapped[str | None] = mapped_column(String(100))
    is_active: Mapped[bool] = mapped_column(Boolean, default=True)
    is_admin: Mapped[bool] = mapped_column(Boolean, default=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )

    # リレーション
    items: Mapped[list["Item"]] = relationship(
        "Item", back_populates="owner", lazy="noload"
    )


class Item(Base):
    __tablename__ = "items"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, index=True)
    title: Mapped[str] = mapped_column(String(200))
    description: Mapped[str | None] = mapped_column(Text)
    price: Mapped[int] = mapped_column(Integer, default=0)
    owner_id: Mapped[int] = mapped_column(ForeignKey("users.id", ondelete="CASCADE"))
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )

    owner: Mapped["User"] = relationship("User", back_populates="items")

CRUDサービス

# app/services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, update, delete
from sqlalchemy.orm import selectinload

from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.services.auth_service import hash_password


async def get_user_by_id(db: AsyncSession, user_id: int) -> User | None:
    result = await db.execute(select(User).where(User.id == user_id))
    return result.scalar_one_or_none()


async def get_user_by_email(db: AsyncSession, email: str) -> User | None:
    result = await db.execute(select(User).where(User.email == email))
    return result.scalar_one_or_none()


async def get_users(
    db: AsyncSession,
    offset: int = 0,
    limit: int = 20,
    search: str | None = None,
) -> tuple[list[User], int]:
    query = select(User)
    count_query = select(func.count()).select_from(User)

    if search:
        like_pattern = f"%{search}%"
        filter_condition = (
            User.username.ilike(like_pattern) |
            User.email.ilike(like_pattern) |
            User.full_name.ilike(like_pattern)
        )
        query = query.where(filter_condition)
        count_query = count_query.where(filter_condition)

    total = (await db.execute(count_query)).scalar_one()
    users = (
        await db.execute(
            query.offset(offset).limit(limit).order_by(User.created_at.desc())
        )
    ).scalars().all()

    return list(users), total


async def create_user(db: AsyncSession, user_data: UserCreate) -> User:
    user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=hash_password(user_data.password),
        full_name=user_data.full_name,
    )
    db.add(user)
    await db.flush()  # commitせずにIDを取得
    await db.refresh(user)
    return user


async def update_user(
    db: AsyncSession,
    user_id: int,
    update_data: UserUpdate,
) -> User | None:
    update_dict = update_data.model_dump(exclude_unset=True)
    if not update_dict:
        return await get_user_by_id(db, user_id)

    await db.execute(
        update(User).where(User.id == user_id).values(**update_dict)
    )
    return await get_user_by_id(db, user_id)


# ページネーション付きリレーション取得
async def get_user_with_items(db: AsyncSession, user_id: int) -> User | None:
    result = await db.execute(
        select(User)
        .options(selectinload(User.items))
        .where(User.id == user_id)
    )
    return result.scalar_one_or_none()

Alembicマイグレーション

# Alembic初期化
alembic init alembic

# マイグレーションファイル生成(自動検出)
alembic revision --autogenerate -m "create_users_table"

# マイグレーション実行
alembic upgrade head

# 1つ前に戻す
alembic downgrade -1

# マイグレーション履歴確認
alembic history --verbose
# alembic/env.py の非同期対応
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
from app.database import Base
from app.models import user, item  # モデルを全てインポート

config = context.config
fileConfig(config.config_file_name)
target_metadata = Base.metadata


def do_run_migrations(connection: Connection) -> None:
    context.configure(connection=connection, target_metadata=target_metadata)
    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )
    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)
    await connectable.dispose()

9. ミドルウェア — CORS・レート制限・ロギング

カスタムミドルウェア

# app/middleware.py
import time
import uuid
import logging
from collections import defaultdict
from fastapi import Request, Response
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.types import ASGIApp

logger = logging.getLogger(__name__)


class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = str(uuid.uuid4())[:8]
        start_time = time.perf_counter()

        # リクエストID をヘッダーに付与
        request.state.request_id = request_id

        logger.info(
            "Request started",
            extra={
                "request_id": request_id,
                "method": request.method,
                "path": request.url.path,
                "client": request.client.host if request.client else "unknown",
            },
        )

        try:
            response = await call_next(request)
        except Exception as exc:
            logger.error(
                "Request failed",
                extra={"request_id": request_id, "error": str(exc)},
                exc_info=True,
            )
            raise

        duration_ms = (time.perf_counter() - start_time) * 1000
        response.headers["X-Request-ID"] = request_id
        response.headers["X-Process-Time"] = f"{duration_ms:.2f}ms"

        logger.info(
            "Request completed",
            extra={
                "request_id": request_id,
                "status_code": response.status_code,
                "duration_ms": round(duration_ms, 2),
            },
        )
        return response


class RateLimitMiddleware(BaseHTTPMiddleware):
    def __init__(
        self,
        app: ASGIApp,
        max_requests: int = 100,
        window_seconds: int = 60,
    ):
        super().__init__(app)
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._requests: dict[str, list[float]] = defaultdict(list)

    async def dispatch(self, request: Request, call_next):
        client_ip = request.client.host if request.client else "unknown"
        now = time.time()
        window_start = now - self.window_seconds

        # ウィンドウ外のリクエストを削除
        self._requests[client_ip] = [
            t for t in self._requests[client_ip] if t > window_start
        ]

        if len(self._requests[client_ip]) >= self.max_requests:
            retry_after = int(
                self.window_seconds - (now - self._requests[client_ip][0])
            )
            return Response(
                content='{"detail": "レート制限に達しました"}',
                status_code=429,
                headers={
                    "Content-Type": "application/json",
                    "Retry-After": str(retry_after),
                    "X-RateLimit-Limit": str(self.max_requests),
                    "X-RateLimit-Remaining": "0",
                },
            )

        self._requests[client_ip].append(now)
        remaining = self.max_requests - len(self._requests[client_ip])

        response = await call_next(request)
        response.headers["X-RateLimit-Limit"] = str(self.max_requests)
        response.headers["X-RateLimit-Remaining"] = str(remaining)
        return response

10. バックグラウンドタスク

FastAPI BackgroundTasks

# app/routers/notifications.py
from fastapi import APIRouter, BackgroundTasks, Depends
from app.services.email_service import send_email, send_welcome_email

router = APIRouter()


async def send_notification_email(email: str, subject: str, body: str):
    """バックグラウンドで実行されるメール送信タスク"""
    try:
        await send_email(email, subject, body)
        print(f"Email sent to {email}")
    except Exception as e:
        print(f"Failed to send email to {email}: {e}")
        # エラーをログに記録(例外は再スローしない)


@router.post("/users/register")
async def register_user(
    user_data: UserCreate,
    background_tasks: BackgroundTasks,
    db: DB,
):
    user = await create_user(db, user_data)

    # レスポンスを返した後にバックグラウンドで実行
    background_tasks.add_task(
        send_notification_email,
        email=user.email,
        subject="ご登録ありがとうございます",
        body=f"{user.username}様、登録が完了しました。",
    )
    background_tasks.add_task(
        send_welcome_email,
        user_id=user.id,
    )

    return {"message": "登録完了。確認メールを送信しました。", "user_id": user.id}

Celery統合(本番向け非同期タスク)

# app/celery_app.py
from celery import Celery
from app.config import get_settings

settings = get_settings()

celery_app = Celery(
    "myapi",
    broker=settings.redis_url,
    backend=settings.redis_url,
    include=["app.tasks"],
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="Asia/Tokyo",
    enable_utc=True,
    task_track_started=True,
    task_time_limit=300,  # 5分でタイムアウト
    task_soft_time_limit=240,
    worker_prefetch_multiplier=1,
    task_acks_late=True,
)


# app/tasks.py
from app.celery_app import celery_app
import asyncio


@celery_app.task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,
    name="send_bulk_email",
)
def send_bulk_email(self, email_list: list[str], subject: str, body: str):
    try:
        # 同期コンテキストから非同期関数を呼び出す
        asyncio.run(_send_bulk_email_async(email_list, subject, body))
    except Exception as exc:
        raise self.retry(exc=exc)


async def _send_bulk_email_async(emails: list[str], subject: str, body: str):
    import httpx
    async with httpx.AsyncClient() as client:
        tasks = [
            client.post(
                "https://api.sendgrid.com/v3/mail/send",
                json={"to": email, "subject": subject, "body": body},
            )
            for email in emails
        ]
        await asyncio.gather(*tasks)


# FastAPIエンドポイントからCeleryタスクを起動
@router.post("/campaigns/send")
async def send_campaign(campaign_data: CampaignCreate, admin: AdminUser):
    task = send_bulk_email.delay(
        email_list=campaign_data.recipient_emails,
        subject=campaign_data.subject,
        body=campaign_data.body,
    )
    return {"task_id": task.id, "status": "queued"}


@router.get("/campaigns/tasks/{task_id}")
async def get_task_status(task_id: str):
    from app.celery_app import celery_app
    task = celery_app.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status": task.status,
        "result": task.result if task.ready() else None,
    }

11. WebSocket

リアルタイム通信の実装

# app/routers/websocket.py
from fastapi import APIRouter, WebSocket, WebSocketDisconnect, Depends
from typing import Any
import json
import asyncio

router = APIRouter()


class ConnectionManager:
    def __init__(self):
        # room_id -> {user_id -> WebSocket}
        self.active_connections: dict[str, dict[str, WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room_id: str, user_id: str):
        await websocket.accept()
        if room_id not in self.active_connections:
            self.active_connections[room_id] = {}
        self.active_connections[room_id][user_id] = websocket

    def disconnect(self, room_id: str, user_id: str):
        if room_id in self.active_connections:
            self.active_connections[room_id].pop(user_id, None)
            if not self.active_connections[room_id]:
                del self.active_connections[room_id]

    async def send_to_user(self, user_id: str, room_id: str, message: dict):
        ws = self.active_connections.get(room_id, {}).get(user_id)
        if ws:
            await ws.send_json(message)

    async def broadcast_to_room(
        self,
        room_id: str,
        message: dict,
        exclude_user_id: str | None = None,
    ):
        connections = self.active_connections.get(room_id, {})
        disconnected = []
        for user_id, ws in connections.items():
            if user_id == exclude_user_id:
                continue
            try:
                await ws.send_json(message)
            except Exception:
                disconnected.append(user_id)
        for user_id in disconnected:
            self.disconnect(room_id, user_id)


manager = ConnectionManager()


@router.websocket("/ws/chat/{room_id}")
async def websocket_chat(
    websocket: WebSocket,
    room_id: str,
    token: str,  # クエリパラメータで認証トークン受け取り
):
    # トークン検証
    payload = decode_access_token(token)
    if not payload:
        await websocket.close(code=4001, reason="認証失敗")
        return

    user_id = str(payload["sub"])
    await manager.connect(websocket, room_id, user_id)

    # 入室通知
    await manager.broadcast_to_room(
        room_id,
        {"type": "join", "user_id": user_id, "message": f"{user_id}が入室しました"},
        exclude_user_id=user_id,
    )

    try:
        while True:
            data = await websocket.receive_json()
            message_type = data.get("type", "message")

            if message_type == "message":
                await manager.broadcast_to_room(
                    room_id,
                    {
                        "type": "message",
                        "user_id": user_id,
                        "content": data.get("content", ""),
                        "timestamp": asyncio.get_event_loop().time(),
                    },
                )
            elif message_type == "ping":
                await websocket.send_json({"type": "pong"})

    except WebSocketDisconnect:
        manager.disconnect(room_id, user_id)
        await manager.broadcast_to_room(
            room_id,
            {"type": "leave", "user_id": user_id, "message": f"{user_id}が退室しました"},
        )

12. テスト — pytest + httpx AsyncClient

テスト設定

# tests/conftest.py
import pytest
import asyncio
from typing import AsyncGenerator
from httpx import AsyncClient, ASGITransport
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    AsyncSession,
    async_sessionmaker,
)

from app.main import app
from app.database import Base, get_db
from app.config import get_settings

settings = get_settings()

# テスト用インメモリDB
TEST_DATABASE_URL = "sqlite+aiosqlite:///:memory:"

test_engine = create_async_engine(TEST_DATABASE_URL, echo=False)
TestSessionLocal = async_sessionmaker(test_engine, class_=AsyncSession)


@pytest.fixture(scope="session")
def event_loop():
    loop = asyncio.new_event_loop()
    yield loop
    loop.close()


@pytest.fixture(autouse=True)
async def setup_db():
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    async with test_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)


@pytest.fixture
async def db_session() -> AsyncGenerator[AsyncSession, None]:
    async with TestSessionLocal() as session:
        yield session


@pytest.fixture
async def client(db_session: AsyncSession) -> AsyncGenerator[AsyncClient, None]:
    async def override_get_db():
        yield db_session

    app.dependency_overrides[get_db] = override_get_db

    async with AsyncClient(
        transport=ASGITransport(app=app),
        base_url="http://test",
    ) as ac:
        yield ac

    app.dependency_overrides.clear()


@pytest.fixture
async def auth_headers(client: AsyncClient) -> dict[str, str]:
    """認証済みヘッダーを返すフィクスチャ"""
    # テストユーザー作成
    await client.post("/api/v1/auth/register", json={
        "username": "testuser",
        "email": "test@example.com",
        "password": "TestPass123!",
        "password_confirm": "TestPass123!",
    })
    # ログイン
    response = await client.post("/api/v1/auth/token", data={
        "username": "test@example.com",
        "password": "TestPass123!",
    })
    token = response.json()["access_token"]
    return {"Authorization": f"Bearer {token}"}

テストケース

# tests/test_users.py
import pytest
from httpx import AsyncClient


@pytest.mark.asyncio
class TestUserEndpoints:
    async def test_register_success(self, client: AsyncClient):
        response = await client.post("/api/v1/auth/register", json={
            "username": "newuser",
            "email": "new@example.com",
            "password": "SecurePass123!",
            "password_confirm": "SecurePass123!",
        })
        assert response.status_code == 201
        data = response.json()
        assert "user_id" in data
        assert data["message"] == "登録完了"

    async def test_register_duplicate_email(self, client: AsyncClient):
        payload = {
            "username": "user1",
            "email": "dup@example.com",
            "password": "Pass123!",
            "password_confirm": "Pass123!",
        }
        await client.post("/api/v1/auth/register", json=payload)
        payload["username"] = "user2"
        response = await client.post("/api/v1/auth/register", json=payload)
        assert response.status_code == 409

    async def test_login_success(self, client: AsyncClient):
        # ユーザー作成
        await client.post("/api/v1/auth/register", json={
            "username": "logintest",
            "email": "login@example.com",
            "password": "TestPass123!",
            "password_confirm": "TestPass123!",
        })
        # ログイン
        response = await client.post("/api/v1/auth/token", data={
            "username": "login@example.com",
            "password": "TestPass123!",
        })
        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert data["token_type"] == "bearer"

    async def test_get_profile_unauthorized(self, client: AsyncClient):
        response = await client.get("/api/v1/users/profile")
        assert response.status_code == 401

    async def test_get_profile_authorized(
        self, client: AsyncClient, auth_headers: dict
    ):
        response = await client.get(
            "/api/v1/users/profile",
            headers=auth_headers,
        )
        assert response.status_code == 200
        data = response.json()
        assert "username" in data
        assert "email" in data

    async def test_update_profile(
        self, client: AsyncClient, auth_headers: dict
    ):
        response = await client.patch(
            "/api/v1/users/me",
            json={"full_name": "Test User"},
            headers=auth_headers,
        )
        assert response.status_code == 200
        assert response.json()["full_name"] == "Test User"

    async def test_pagination(self, client: AsyncClient, auth_headers: dict):
        response = await client.get(
            "/api/v1/users/?page=1&per_page=10",
            headers=auth_headers,
        )
        assert response.status_code == 200
        data = response.json()
        assert "users" in data
        assert "total" in data
        assert data["per_page"] == 10


# tests/test_items.py
@pytest.mark.asyncio
class TestItemCRUD:
    async def test_create_item(
        self, client: AsyncClient, auth_headers: dict
    ):
        response = await client.post(
            "/api/v1/items/",
            json={"title": "テストアイテム", "price": 1000},
            headers=auth_headers,
        )
        assert response.status_code == 201
        data = response.json()
        assert data["title"] == "テストアイテム"
        assert data["price"] == 1000

    async def test_get_item_not_found(self, client: AsyncClient):
        response = await client.get("/api/v1/items/99999")
        assert response.status_code == 404
        assert "not found" in response.json()["detail"].lower()

13. Dockerデプロイ — マルチステージビルド

Dockerfile(マルチステージ)

# Dockerfile

# ---- ビルドステージ ----
FROM python:3.12-slim AS builder

WORKDIR /build

# システム依存パッケージ
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    libpq-dev \
    && rm -rf /var/lib/apt/lists/*

# uv(高速パッケージマネージャ)を使用
COPY --from=ghcr.io/astral-sh/uv:latest /uv /usr/local/bin/uv

COPY pyproject.toml .
RUN uv sync --no-dev --frozen

# ---- 本番ステージ ----
FROM python:3.12-slim AS production

WORKDIR /app

# セキュリティ: 非rootユーザーで実行
RUN addgroup --system --gid 1001 appgroup && \
    adduser --system --uid 1001 --gid 1001 --no-create-home appuser

# ランタイム依存のみインストール
RUN apt-get update && apt-get install -y --no-install-recommends \
    libpq5 \
    curl \
    && rm -rf /var/lib/apt/lists/*

# ビルドステージから仮想環境をコピー
COPY --from=builder /build/.venv /app/.venv

# アプリコードをコピー
COPY --chown=appuser:appgroup app/ /app/app/
COPY --chown=appuser:appgroup alembic/ /app/alembic/
COPY --chown=appuser:appgroup alembic.ini /app/

ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONPATH="/app"
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

USER appuser

# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=10s --start-period=30s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

EXPOSE 8000

# Gunicorn + uvicorn workers
CMD ["gunicorn", "app.main:app", \
     "--worker-class", "uvicorn.workers.UvicornWorker", \
     "--workers", "4", \
     "--bind", "0.0.0.0:8000", \
     "--timeout", "60", \
     "--keepalive", "5", \
     "--access-logfile", "-", \
     "--error-logfile", "-", \
     "--log-level", "info"]

docker-compose.yml(本番向け)

version: "3.9"

services:
  api:
    build:
      context: .
      target: production
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql+asyncpg://postgres:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
      - REDIS_URL=redis://redis:6379/0
      - SECRET_KEY=${SECRET_KEY}
      - DEBUG=false
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    restart: unless-stopped
    networks:
      - app-network

  db:
    image: postgres:16-alpine
    environment:
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: ${POSTGRES_DB}
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - app-network

  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru
    volumes:
      - redis_data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    networks:
      - app-network

  celery-worker:
    build:
      context: .
      target: production
    command: celery -A app.celery_app worker --loglevel=info --concurrency=4
    environment:
      - DATABASE_URL=postgresql+asyncpg://postgres:${POSTGRES_PASSWORD}@db:5432/${POSTGRES_DB}
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - db
      - redis
    restart: unless-stopped
    networks:
      - app-network

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
      - ./certs:/etc/ssl/certs:ro
    depends_on:
      - api
    networks:
      - app-network

volumes:
  postgres_data:
  redis_data:

networks:
  app-network:
    driver: bridge

Nginx設定

# nginx.conf
upstream api_backend {
    server api:8000;
    keepalive 32;
}

server {
    listen 80;
    server_name api.example.com;
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl http2;
    server_name api.example.com;

    ssl_certificate /etc/ssl/certs/fullchain.pem;
    ssl_certificate_key /etc/ssl/certs/privkey.pem;

    # セキュリティヘッダー
    add_header X-Frame-Options DENY;
    add_header X-Content-Type-Options nosniff;
    add_header X-XSS-Protection "1; mode=block";
    add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;

    location / {
        proxy_pass http://api_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
        proxy_read_timeout 60s;
        proxy_buffering off;
    }

    location /ws/ {
        proxy_pass http://api_backend;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "Upgrade";
        proxy_read_timeout 3600s;
    }
}

本番運用のベストプラクティス

エラーハンドリング(グローバル)

# app/exception_handlers.py
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
import logging

logger = logging.getLogger(__name__)


async def validation_exception_handler(
    request: Request, exc: RequestValidationError
) -> JSONResponse:
    errors = []
    for error in exc.errors():
        errors.append({
            "field": " -> ".join(str(loc) for loc in error["loc"]),
            "message": error["msg"],
            "type": error["type"],
        })
    return JSONResponse(
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
        content={"detail": "入力値が不正です", "errors": errors},
    )


async def unhandled_exception_handler(
    request: Request, exc: Exception
) -> JSONResponse:
    logger.error(
        "Unhandled exception",
        exc_info=True,
        extra={"path": str(request.url), "method": request.method},
    )
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content={"detail": "サーバー内部エラーが発生しました"},
    )


# main.pyに追加
from fastapi.exceptions import RequestValidationError
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(Exception, unhandled_exception_handler)

パフォーマンスチューニング

# キャッシュ(Redis)
from redis.asyncio import Redis
import json
import hashlib


class CacheService:
    def __init__(self, redis: Redis, default_ttl: int = 300):
        self.redis = redis
        self.default_ttl = default_ttl

    def _make_key(self, prefix: str, **kwargs) -> str:
        params = json.dumps(kwargs, sort_keys=True)
        hash_val = hashlib.md5(params.encode()).hexdigest()[:8]
        return f"{prefix}:{hash_val}"

    async def get(self, key: str):
        value = await self.redis.get(key)
        if value:
            return json.loads(value)
        return None

    async def set(self, key: str, value, ttl: int | None = None):
        await self.redis.setex(
            key,
            ttl or self.default_ttl,
            json.dumps(value, default=str),
        )

    async def delete(self, key: str):
        await self.redis.delete(key)

    async def invalidate_pattern(self, pattern: str):
        keys = await self.redis.keys(pattern)
        if keys:
            await self.redis.delete(*keys)


# デコレータパターン
def cached(prefix: str, ttl: int = 300):
    def decorator(func):
        async def wrapper(*args, **kwargs):
            cache_key = f"{prefix}:{hash(str(args) + str(kwargs))}"
            # キャッシュ確認・設定の処理
            return await func(*args, **kwargs)
        return wrapper
    return decorator

DevToolBoxでAPIレスポンスを検証する

FastAPIの開発中、エンドポイントが返すJSONレスポンスの構造を確認したい場面は多い。Swagger UI(/docs)でテストできるが、レスポンスJSONをさらに整形・検証・スキーマ確認したい時には外部ツールが便利だ。

DevToolBox はブラウザ上で動作する開発者向けツールセットだ。JSONフォーマッター・バリデーター・スキーマジェネレーターをはじめ、Base64エンコード/デコード・JWT解析・正規表現テスターなど、APIデバッグに必要なツールが一か所にまとまっている。

特にFastAPIの開発で役立つ用途:

  • JSONフォーマッター: curlhttpxで取得したAPIレスポンスを整形して可読性を上げる
  • JSON Schema Validator: Pydanticモデルが期待するスキーマとレスポンスの構造が一致しているか検証
  • JWT Decoder: create_access_tokenで生成したトークンのペイロードを即座にデコードして確認
  • Base64ツール: バイナリデータのエンコードを検証する

インストール不要でブラウザからすぐ使えるため、ローカル開発環境の構築と並行して活用できる。


まとめ

FastAPIは「速度・型安全・開発体験」を高い次元で両立させた現代的なPython APIフレームワークだ。この記事で取り上げたポイントを振り返る。

機能要点
基礎設定pyproject.toml + pydantic-settings で型安全な環境変数管理
ルーティングAnnotated + Path/Query/Body で宣言的な型バリデーション
Pydantic v2model_config・field_validator・computed_field で強力なスキーマ定義
依存性注入Depends チェーンで認証・DB・ページネーションを再利用
認証JWT + OAuth2 + passlib で本番品質の認証基盤
非同期asyncio.gather で並列処理・httpx で非同期HTTPクライアント
SQLAlchemyAsyncSession + mapped_column で型安全なORM
テストAsyncClient + fixture でE2Eテストを自動化
デプロイマルチステージDockerビルド + Gunicorn + Nginx

FastAPIのドキュメントは非常に充実しており(fastapi.tiangolo.com)、公式チュートリアルと組み合わせてこの記事を活用することで、本番グレードのAPIを短期間で構築できるはずだ。

型ヒントを徹底することで、IDEのサポートが最大限に活かせ、実行時エラーを設計段階で潰せる。それがFastAPIの最大の価値だ。


関連記事: SQLAlchemy 2.0完全ガイド / Dockerコンテナ本番運用 / JWT認証完全解説


スキルアップ・キャリアアップのおすすめリソース

FastAPIのスキルはバックエンド・AI API開発の両面で高く評価される。次のキャリアステップに役立ててほしい。

転職・キャリアアップ

  • レバテックキャリア — ITエンジニア専門の転職エージェント。PythonバックエンドエンジニアやAI APIの開発案件が豊富。年収600万円以上の求人多数。無料相談可能。
  • Findy — GitHubのPython・FastAPIプロジェクトが評価対象。スカウト型でリモート求人が充実。バックエンド・MLエンジニアへの転職に人気。

オンライン学習

  • Udemy — FastAPI・Pydantic v2・SQLAlchemyを組み合わせた実践的なAPIサーバー構築コースが充実。Docker・AWS連携まで含めた本番対応のコースも多い。セール時は大幅割引。

関連記事