FastAPIの認証・セキュリティ実装 – JWT認証とOAuth2の完全ガイド

FastAPIの認証・セキュリティ実装 – JWT認証とOAuth2の完全ガイド

Webアプリケーションにおいて認証・セキュリティは最も重要な要素の一つです。この記事では、FastAPIでJWT認証、OAuth2、セキュリティヘッダーの実装方法を詳しく解説します。

認証の基礎知識

認証方式の種類

  1. セッションベース認証: サーバー側でセッション情報を保持
  2. トークンベース認証: JWT(JSON Web Token)を使用
  3. OAuth2: 外部サービス(Google、GitHub等)による認証
  4. API Key認証: 固定のAPIキーによる認証

JWT(JSON Web Token)とは?

JWTは以下の3つの部分から構成されます:

  • Header: 暗号化アルゴリズムとトークンタイプ
  • Payload: ユーザー情報やクレーム
  • Signature: トークンの署名
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c

環境構築

必要なパッケージのインストール

# JWT認証関連パッケージ
pip install fastapi uvicorn[standard] python-jose[cryptography] passlib[bcrypt] python-multipart

# OAuth2サポート用
pip install authlib httpx

# セキュリティ強化用
pip install python-multipart email-validator

パスワード認証の実装

セキュリティユーティリティ

# app/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
import os

# パスワードハッシュ化設定
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2設定
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# JWT設定
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here")  # 本番環境では必ず環境変数から読み込み
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

def verify_password(plain_password: str, hashed_password: str) -> bool:
    """パスワードの検証"""
    return pwd_context.verify(plain_password, hashed_password)

def get_password_hash(password: str) -> str:
    """パスワードのハッシュ化"""
    return pwd_context.hash(password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    """アクセストークンの作成"""
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)

    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

def verify_token(token: str, credentials_exception):
    """トークンの検証"""
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        return username
    except JWTError:
        raise credentials_exception

認証スキーマ

# app/auth_schemas.py
from pydantic import BaseModel
from typing import Optional

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

class UserLogin(BaseModel):
    username: str
    password: str

class UserRegister(BaseModel):
    username: str
    email: str
    password: str
    full_name: Optional[str] = None

class UserInDB(BaseModel):
    id: int
    username: str
    email: str
    full_name: Optional[str] = None
    hashed_password: str
    is_active: bool = True
    is_admin: bool = False
    created_at: datetime

class CurrentUser(BaseModel):
    id: int
    username: str
    email: str
    full_name: Optional[str] = None
    is_active: bool
    is_admin: bool

    class Config:
        orm_mode = True

認証関連のCRUD操作

# app/auth_crud.py
from sqlalchemy.orm import Session
from typing import Optional
from . import models
from .security import get_password_hash, verify_password

class AuthCRUD:
    @staticmethod
    def get_user_by_username(db: Session, username: str) -> Optional[models.User]:
        return db.query(models.User).filter(models.User.username == username).first()

    @staticmethod
    def get_user_by_email(db: Session, email: str) -> Optional[models.User]:
        return db.query(models.User).filter(models.User.email == email).first()

    @staticmethod
    def authenticate_user(db: Session, username: str, password: str) -> Optional[models.User]:
        user = AuthCRUD.get_user_by_username(db, username)
        if not user:
            return None
        if not verify_password(password, user.hashed_password):
            return None
        return user

    @staticmethod
    def create_user(db: Session, username: str, email: str, password: str, full_name: str = None) -> models.User:
        hashed_password = get_password_hash(password)
        db_user = models.User(
            username=username,
            email=email,
            hashed_password=hashed_password,
            full_name=full_name
        )
        db.add(db_user)
        db.commit()
        db.refresh(db_user)
        return db_user

    @staticmethod
    def update_user_last_login(db: Session, user_id: int):
        user = db.query(models.User).filter(models.User.id == user_id).first()
        if user:
            user.last_login = datetime.utcnow()
            db.commit()

依存性注入による認証

# app/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from . import auth_crud, models
from .database import get_db
from .security import verify_token

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="認証情報を確認できませんでした",
        headers={"WWW-Authenticate": "Bearer"},
    )

    username = verify_token(token, credentials_exception)
    user = auth_crud.AuthCRUD.get_user_by_username(db, username=username)
    if user is None:
        raise credentials_exception
    return user

async def get_current_active_user(current_user: models.User = Depends(get_current_user)):
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="非アクティブなユーザーです"
        )
    return current_user

async def get_current_admin_user(current_user: models.User = Depends(get_current_active_user)):
    if not current_user.is_admin:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="管理者権限が必要です"
        )
    return current_user

認証エンドポイントの実装

認証API

# app/auth_routes.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from . import auth_crud, auth_schemas, models
from .database import get_db
from .security import ACCESS_TOKEN_EXPIRE_MINUTES, create_access_token
from .dependencies import get_current_active_user

router = APIRouter(prefix="/auth", tags=["認証"])

@router.post("/register", response_model=auth_schemas.CurrentUser, status_code=status.HTTP_201_CREATED)
async def register_user(user_data: auth_schemas.UserRegister, db: Session = Depends(get_db)):
    # 既存ユーザーチェック
    existing_user = auth_crud.AuthCRUD.get_user_by_username(db, user_data.username)
    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="このユーザー名は既に使用されています"
        )

    existing_email = auth_crud.AuthCRUD.get_user_by_email(db, user_data.email)
    if existing_email:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="このメールアドレスは既に登録されています"
        )

    # ユーザー作成
    user = auth_crud.AuthCRUD.create_user(
        db=db,
        username=user_data.username,
        email=user_data.email,
        password=user_data.password,
        full_name=user_data.full_name
    )

    return user

@router.post("/token", response_model=auth_schemas.Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: Session = Depends(get_db)
):
    user = auth_crud.AuthCRUD.authenticate_user(db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="ユーザー名またはパスワードが正しくありません",
            headers={"WWW-Authenticate": "Bearer"},
        )

    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="非アクティブなユーザーです"
        )

    # トークン作成
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )

    # ログイン時刻更新
    auth_crud.AuthCRUD.update_user_last_login(db, user.id)

    return {"access_token": access_token, "token_type": "bearer"}

@router.get("/me", response_model=auth_schemas.CurrentUser)
async def read_users_me(current_user: models.User = Depends(get_current_active_user)):
    return current_user

@router.post("/refresh-token", response_model=auth_schemas.Token)
async def refresh_access_token(current_user: models.User = Depends(get_current_active_user)):
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": current_user.username}, expires_delta=access_token_expires
    )
    return {"access_token": access_token, "token_type": "bearer"}

OAuth2実装(Google認証)

OAuth2設定

# app/oauth2.py
import os
from authlib.integrations.starlette_client import OAuth
from fastapi import Request
import httpx

# OAuth2設定
oauth = OAuth()

oauth.register(
    name='google',
    client_id=os.getenv('GOOGLE_CLIENT_ID'),
    client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
    server_metadata_url='https://accounts.google.com/.well-known/openid_configuration',
    client_kwargs={
        'scope': 'openid email profile'
    }
)

class GoogleOAuth:
    @staticmethod
    async def get_authorization_url(request: Request):
        redirect_uri = request.url_for('auth_google_callback')
        return await oauth.google.authorize_redirect(request, redirect_uri)

    @staticmethod
    async def get_user_info(request: Request):
        token = await oauth.google.authorize_access_token(request)
        user_info = token.get('userinfo')
        return user_info

# GitHub OAuth設定例
oauth.register(
    name='github',
    client_id=os.getenv('GITHUB_CLIENT_ID'),
    client_secret=os.getenv('GITHUB_CLIENT_SECRET'),
    access_token_url='https://github.com/login/oauth/access_token',
    access_token_params=None,
    authorize_url='https://github.com/login/oauth/authorize',
    authorize_params=None,
    api_base_url='https://api.github.com/',
    client_kwargs={'scope': 'user:email'},
)

OAuth2エンドポイント

# app/oauth2_routes.py
from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from . import auth_crud, models
from .database import get_db
from .oauth2 import GoogleOAuth
from .security import create_access_token

router = APIRouter(prefix="/oauth2", tags=["OAuth2認証"])

@router.get("/google")
async def google_login(request: Request):
    return await GoogleOAuth.get_authorization_url(request)

@router.get("/google/callback")
async def auth_google_callback(request: Request, db: Session = Depends(get_db)):
    try:
        user_info = await GoogleOAuth.get_user_info(request)

        if not user_info:
            raise HTTPException(status_code=400, detail="Google認証に失敗しました")

        # ユーザー情報の取得
        email = user_info.get('email')
        name = user_info.get('name')
        google_id = user_info.get('sub')

        # 既存ユーザーのチェック
        user = auth_crud.AuthCRUD.get_user_by_email(db, email)

        if not user:
            # 新しいユーザーの作成
            username = email.split('@')[0]  # メールアドレスからユーザー名を生成

            # ユーザー名の重複チェック
            existing_username = auth_crud.AuthCRUD.get_user_by_username(db, username)
            if existing_username:
                username = f"{username}_{google_id[:8]}"  # ユニークなユーザー名を生成

            user = models.User(
                username=username,
                email=email,
                full_name=name,
                hashed_password="",  # OAuth2ユーザーはパスワード不要
                is_active=True,
                oauth_provider="google",
                oauth_id=google_id
            )
            db.add(user)
            db.commit()
            db.refresh(user)

        # JWTトークンの作成
        access_token = create_access_token(data={"sub": user.username})

        # フロントエンドにリダイレクト(トークンをクエリパラメータに含める)
        return RedirectResponse(
            url=f"/dashboard?token={access_token}",
            status_code=302
        )

    except Exception as e:
        raise HTTPException(status_code=400, detail=f"認証エラー: {str(e)}")

セキュリティヘッダーとCORS設定

セキュリティミドルウェア

# app/security_middleware.py
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.sessions import SessionMiddleware
import os

def configure_security(app: FastAPI):
    # CORS設定
    app.add_middleware(
        CORSMiddleware,
        allow_origins=os.getenv("ALLOWED_ORIGINS", "http://localhost:3000").split(","),
        allow_credentials=True,
        allow_methods=["GET", "POST", "PUT", "DELETE"],
        allow_headers=["*"],
    )

    # 信頼できるホストの設定
    app.add_middleware(
        TrustedHostMiddleware,
        allowed_hosts=os.getenv("ALLOWED_HOSTS", "localhost,127.0.0.1").split(",")
    )

    # セッションミドルウェア(OAuth2用)
    app.add_middleware(
        SessionMiddleware,
        secret_key=os.getenv("SESSION_SECRET_KEY", "your-session-secret-key")
    )

    # セキュリティヘッダーミドルウェア
    @app.middleware("http")
    async def add_security_headers(request: Request, call_next):
        response: Response = await call_next(request)

        # セキュリティヘッダーの追加
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
        response.headers["Content-Security-Policy"] = "default-src 'self'"
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"

        return response

レート制限の実装

レート制限ミドルウェア

# app/rate_limit.py
import time
from collections import defaultdict
from fastapi import HTTPException, Request, status
from typing import Dict

class RateLimiter:
    def __init__(self, max_requests: int = 100, window_seconds: int = 3600):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests: Dict[str, list] = defaultdict(list)

    def is_allowed(self, identifier: str) -> bool:
        now = time.time()
        # 古いリクエストを削除
        self.requests[identifier] = [
            req_time for req_time in self.requests[identifier]
            if now - req_time < self.window_seconds
        ]

        # リクエスト数チェック
        if len(self.requests[identifier]) >= self.max_requests:
            return False

        # 新しいリクエストを記録
        self.requests[identifier].append(now)
        return True

# グローバルレート制限インスタンス
rate_limiter = RateLimiter(max_requests=100, window_seconds=3600)

async def rate_limit_dependency(request: Request):
    client_ip = request.client.host
    if not rate_limiter.is_allowed(client_ip):
        raise HTTPException(
            status_code=status.HTTP_429_TOO_MANY_REQUESTS,
            detail="リクエスト回数の上限に達しました。しばらく待ってからお試しください。"
        )

ロールベースアクセス制御

ロール管理

# app/roles.py
from enum import Enum
from typing import List
from fastapi import Depends, HTTPException, status
from . import models
from .dependencies import get_current_active_user

class UserRole(str, Enum):
    USER = "user"
    MODERATOR = "moderator"
    ADMIN = "admin"

class Permission(str, Enum):
    READ_POSTS = "read_posts"
    WRITE_POSTS = "write_posts"
    DELETE_POSTS = "delete_posts"
    MANAGE_USERS = "manage_users"
    ADMIN_ACCESS = "admin_access"

# ロールと権限のマッピング
ROLE_PERMISSIONS = {
    UserRole.USER: [Permission.READ_POSTS, Permission.WRITE_POSTS],
    UserRole.MODERATOR: [
        Permission.READ_POSTS, 
        Permission.WRITE_POSTS, 
        Permission.DELETE_POSTS
    ],
    UserRole.ADMIN: [
        Permission.READ_POSTS,
        Permission.WRITE_POSTS,
        Permission.DELETE_POSTS,
        Permission.MANAGE_USERS,
        Permission.ADMIN_ACCESS
    ]
}

def require_permissions(required_permissions: List[Permission]):
    def permission_checker(current_user: models.User = Depends(get_current_active_user)):
        user_role = UserRole(current_user.role) if hasattr(current_user, 'role') else UserRole.USER
        user_permissions = ROLE_PERMISSIONS.get(user_role, [])

        for permission in required_permissions:
            if permission not in user_permissions:
                raise HTTPException(
                    status_code=status.HTTP_403_FORBIDDEN,
                    detail=f"権限が不足しています: {permission.value}"
                )
        return current_user
    return permission_checker

# 使用例
def require_admin():
    return require_permissions([Permission.ADMIN_ACCESS])

def require_post_management():
    return require_permissions([Permission.DELETE_POSTS])

実践的なセキュリティ実装例

完全な認証付きAPIの例

# app/secure_main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from . import models, schemas
from .database import get_db
from .dependencies import get_current_active_user
from .security_middleware import configure_security
from .rate_limit import rate_limit_dependency
from .roles import require_admin, require_post_management

app = FastAPI(title="セキュアなブログAPI", version="1.0.0")

# セキュリティ設定を適用
configure_security(app)

# 認証が必要なエンドポイント
@app.get("/secure/posts", dependencies=[Depends(rate_limit_dependency)])
async def get_secure_posts(
    current_user: models.User = Depends(get_current_active_user),
    db: Session = Depends(get_db)
):
    return {"message": f"こんにちは、{current_user.username}さん", "posts": []}

# 管理者権限が必要なエンドポイント
@app.delete("/admin/users/{user_id}")
async def delete_user_admin(
    user_id: int,
    admin_user: models.User = Depends(require_admin()),
    db: Session = Depends(get_db)
):
    # ユーザー削除ロジック
    return {"message": f"ユーザー{user_id}が削除されました"}

# 投稿管理権限が必要なエンドポイント
@app.delete("/posts/{post_id}")
async def delete_post(
    post_id: int,
    authorized_user: models.User = Depends(require_post_management()),
    db: Session = Depends(get_db)
):
    # 投稿削除ロジック
    return {"message": f"投稿{post_id}が削除されました"}

セキュリティのベストプラクティス

1. 環境変数の管理

# .env ファイル
SECRET_KEY=your-very-secret-key-here-with-at-least-32-characters
DATABASE_URL=postgresql://user:password@localhost/dbname
GOOGLE_CLIENT_ID=your-google-client-id
GOOGLE_CLIENT_SECRET=your-google-client-secret
ALLOWED_ORIGINS=http://localhost:3000,https://yourdomain.com
ALLOWED_HOSTS=localhost,127.0.0.1,yourdomain.com

2. パスワードポリシーの実装

import re
from typing import List

def validate_password(password: str) -> List[str]:
    errors = []

    if len(password) < 8:
        errors.append("パスワードは8文字以上である必要があります")

    if not re.search(r"[A-Z]", password):
        errors.append("パスワードには大文字を含める必要があります")

    if not re.search(r"[a-z]", password):
        errors.append("パスワードには小文字を含める必要があります")

    if not re.search(r"d", password):
        errors.append("パスワードには数字を含める必要があります")

    if not re.search(r"[!@#$%^&*()_+-=[]{};':"\|,.<>/?]", password):
        errors.append("パスワードには特殊文字を含める必要があります")

    return errors

3. ログとモニタリング

import logging
from datetime import datetime

# セキュリティログの設定
security_logger = logging.getLogger("security")
security_logger.setLevel(logging.INFO)

async def log_security_event(event_type: str, user_id: int = None, details: str = ""):
    security_logger.info(f"{datetime.utcnow()} - {event_type} - User: {user_id} - {details}")

# 使用例
await log_security_event("LOGIN_SUCCESS", user_id=user.id, details=f"IP: {request.client.host}")
await log_security_event("LOGIN_FAILED", details=f"Username: {form_data.username}, IP: {request.client.host}")

まとめ

FastAPIでの認証・セキュリティ実装では以下が重要です:

  1. JWT認証: トークンベースの認証でステートレスなAPI
  2. OAuth2統合: 外部プロバイダーによる認証の実装
  3. ロールベースアクセス制御: 細かい権限管理
  4. セキュリティヘッダー: XSS、CSRF等の攻撃対策
  5. レート制限: DoS攻撃やブルートフォース攻撃の防止

次回は、FastAPIのテスト手法について詳しく解説します。ユニットテスト、統合テスト、認証付きエンドポイントのテスト方法などを学んでいきましょう。

参考リンク