USB C ケーブル 純正 1M 2本セット PD対応 60W急速充電 USBタイプc ケーブル USB-C & USBC データ転送 高耐久性 断線防止 映像出力不可 Type-c to Type-c for iPhone 16/15 Pro/Plus/Pro Max、for MacBook Pro/Air/IPad Pro/AirなどTypec機種対応
¥899 (2025-06-30 08:12 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)Anker PowerPort III 3-Port 65W Pod (USB PD 充電器 USB-A & USB-C 3ポート)【独自技術Anker GaN II採用 / PD対応 / PPS規格対応 / PSE技術基準適合 / 折りたたみ式プラグ】MacBook PD対応Windows PC iPad iPhone Galaxy Android スマートフォン ノートPC 各種 その他機器対応(ブラック)
¥5,990 (2025-06-30 08:12 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)AHS Synthesizer V Studio 2 Pro スターターパック
¥20,000 (2025-06-30 08:12 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)本好きの下剋上 ハンネローレの貴族院五年生2
¥1,320 (2025-06-30 08:12 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)目次
FastAPIの認証・セキュリティ実装 – JWT認証とOAuth2の完全ガイド
Webアプリケーションにおいて認証・セキュリティは最も重要な要素の一つです。この記事では、FastAPIでJWT認証、OAuth2、セキュリティヘッダーの実装方法を詳しく解説します。
認証の基礎知識
認証方式の種類
- セッションベース認証: サーバー側でセッション情報を保持
- トークンベース認証: JWT(JSON Web Token)を使用
- OAuth2: 外部サービス(Google、GitHub等)による認証
- API Key認証: 固定のAPIキーによる認証
JWT(JSON Web Token)とは?
JWTは以下の3つの部分から構成されます:
- Header: 暗号化アルゴリズムとトークンタイプ
- Payload: ユーザー情報やクレーム
- Signature: トークンの署名
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
環境構築
必要なパッケージのインストール
# JWT認証関連パッケージ
pip install fastapi uvicorn[standard] python-jose[cryptography] passlib[bcrypt] python-multipart
# OAuth2サポート用
pip install authlib httpx
# セキュリティ強化用
pip install python-multipart email-validator
パスワード認証の実装
セキュリティユーティリティ
# app/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
import os
# パスワードハッシュ化設定
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2設定
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# JWT設定
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here") # 本番環境では必ず環境変数から読み込み
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""パスワードの検証"""
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
"""パスワードのハッシュ化"""
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""アクセストークンの作成"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def verify_token(token: str, credentials_exception):
"""トークンの検証"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
return username
except JWTError:
raise credentials_exception
認証スキーマ
# app/auth_schemas.py
from pydantic import BaseModel
from typing import Optional
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class UserLogin(BaseModel):
username: str
password: str
class UserRegister(BaseModel):
username: str
email: str
password: str
full_name: Optional[str] = None
class UserInDB(BaseModel):
id: int
username: str
email: str
full_name: Optional[str] = None
hashed_password: str
is_active: bool = True
is_admin: bool = False
created_at: datetime
class CurrentUser(BaseModel):
id: int
username: str
email: str
full_name: Optional[str] = None
is_active: bool
is_admin: bool
class Config:
orm_mode = True
認証関連のCRUD操作
# app/auth_crud.py
from sqlalchemy.orm import Session
from typing import Optional
from . import models
from .security import get_password_hash, verify_password
class AuthCRUD:
@staticmethod
def get_user_by_username(db: Session, username: str) -> Optional[models.User]:
return db.query(models.User).filter(models.User.username == username).first()
@staticmethod
def get_user_by_email(db: Session, email: str) -> Optional[models.User]:
return db.query(models.User).filter(models.User.email == email).first()
@staticmethod
def authenticate_user(db: Session, username: str, password: str) -> Optional[models.User]:
user = AuthCRUD.get_user_by_username(db, username)
if not user:
return None
if not verify_password(password, user.hashed_password):
return None
return user
@staticmethod
def create_user(db: Session, username: str, email: str, password: str, full_name: str = None) -> models.User:
hashed_password = get_password_hash(password)
db_user = models.User(
username=username,
email=email,
hashed_password=hashed_password,
full_name=full_name
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@staticmethod
def update_user_last_login(db: Session, user_id: int):
user = db.query(models.User).filter(models.User.id == user_id).first()
if user:
user.last_login = datetime.utcnow()
db.commit()
依存性注入による認証
# app/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from . import auth_crud, models
from .database import get_db
from .security import verify_token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="認証情報を確認できませんでした",
headers={"WWW-Authenticate": "Bearer"},
)
username = verify_token(token, credentials_exception)
user = auth_crud.AuthCRUD.get_user_by_username(db, username=username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(current_user: models.User = Depends(get_current_user)):
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="非アクティブなユーザーです"
)
return current_user
async def get_current_admin_user(current_user: models.User = Depends(get_current_active_user)):
if not current_user.is_admin:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="管理者権限が必要です"
)
return current_user
認証エンドポイントの実装
認証API
# app/auth_routes.py
from datetime import timedelta
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from . import auth_crud, auth_schemas, models
from .database import get_db
from .security import ACCESS_TOKEN_EXPIRE_MINUTES, create_access_token
from .dependencies import get_current_active_user
router = APIRouter(prefix="/auth", tags=["認証"])
@router.post("/register", response_model=auth_schemas.CurrentUser, status_code=status.HTTP_201_CREATED)
async def register_user(user_data: auth_schemas.UserRegister, db: Session = Depends(get_db)):
# 既存ユーザーチェック
existing_user = auth_crud.AuthCRUD.get_user_by_username(db, user_data.username)
if existing_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="このユーザー名は既に使用されています"
)
existing_email = auth_crud.AuthCRUD.get_user_by_email(db, user_data.email)
if existing_email:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="このメールアドレスは既に登録されています"
)
# ユーザー作成
user = auth_crud.AuthCRUD.create_user(
db=db,
username=user_data.username,
email=user_data.email,
password=user_data.password,
full_name=user_data.full_name
)
return user
@router.post("/token", response_model=auth_schemas.Token)
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
db: Session = Depends(get_db)
):
user = auth_crud.AuthCRUD.authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="ユーザー名またはパスワードが正しくありません",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="非アクティブなユーザーです"
)
# トークン作成
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
# ログイン時刻更新
auth_crud.AuthCRUD.update_user_last_login(db, user.id)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/me", response_model=auth_schemas.CurrentUser)
async def read_users_me(current_user: models.User = Depends(get_current_active_user)):
return current_user
@router.post("/refresh-token", response_model=auth_schemas.Token)
async def refresh_access_token(current_user: models.User = Depends(get_current_active_user)):
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": current_user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
OAuth2実装(Google認証)
OAuth2設定
# app/oauth2.py
import os
from authlib.integrations.starlette_client import OAuth
from fastapi import Request
import httpx
# OAuth2設定
oauth = OAuth()
oauth.register(
name='google',
client_id=os.getenv('GOOGLE_CLIENT_ID'),
client_secret=os.getenv('GOOGLE_CLIENT_SECRET'),
server_metadata_url='https://accounts.google.com/.well-known/openid_configuration',
client_kwargs={
'scope': 'openid email profile'
}
)
class GoogleOAuth:
@staticmethod
async def get_authorization_url(request: Request):
redirect_uri = request.url_for('auth_google_callback')
return await oauth.google.authorize_redirect(request, redirect_uri)
@staticmethod
async def get_user_info(request: Request):
token = await oauth.google.authorize_access_token(request)
user_info = token.get('userinfo')
return user_info
# GitHub OAuth設定例
oauth.register(
name='github',
client_id=os.getenv('GITHUB_CLIENT_ID'),
client_secret=os.getenv('GITHUB_CLIENT_SECRET'),
access_token_url='https://github.com/login/oauth/access_token',
access_token_params=None,
authorize_url='https://github.com/login/oauth/authorize',
authorize_params=None,
api_base_url='https://api.github.com/',
client_kwargs={'scope': 'user:email'},
)
OAuth2エンドポイント
# app/oauth2_routes.py
from fastapi import APIRouter, Request, HTTPException, Depends
from fastapi.responses import RedirectResponse
from sqlalchemy.orm import Session
from . import auth_crud, models
from .database import get_db
from .oauth2 import GoogleOAuth
from .security import create_access_token
router = APIRouter(prefix="/oauth2", tags=["OAuth2認証"])
@router.get("/google")
async def google_login(request: Request):
return await GoogleOAuth.get_authorization_url(request)
@router.get("/google/callback")
async def auth_google_callback(request: Request, db: Session = Depends(get_db)):
try:
user_info = await GoogleOAuth.get_user_info(request)
if not user_info:
raise HTTPException(status_code=400, detail="Google認証に失敗しました")
# ユーザー情報の取得
email = user_info.get('email')
name = user_info.get('name')
google_id = user_info.get('sub')
# 既存ユーザーのチェック
user = auth_crud.AuthCRUD.get_user_by_email(db, email)
if not user:
# 新しいユーザーの作成
username = email.split('@')[0] # メールアドレスからユーザー名を生成
# ユーザー名の重複チェック
existing_username = auth_crud.AuthCRUD.get_user_by_username(db, username)
if existing_username:
username = f"{username}_{google_id[:8]}" # ユニークなユーザー名を生成
user = models.User(
username=username,
email=email,
full_name=name,
hashed_password="", # OAuth2ユーザーはパスワード不要
is_active=True,
oauth_provider="google",
oauth_id=google_id
)
db.add(user)
db.commit()
db.refresh(user)
# JWTトークンの作成
access_token = create_access_token(data={"sub": user.username})
# フロントエンドにリダイレクト(トークンをクエリパラメータに含める)
return RedirectResponse(
url=f"/dashboard?token={access_token}",
status_code=302
)
except Exception as e:
raise HTTPException(status_code=400, detail=f"認証エラー: {str(e)}")
セキュリティヘッダーとCORS設定
セキュリティミドルウェア
# app/security_middleware.py
from fastapi import FastAPI, Request, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.sessions import SessionMiddleware
import os
def configure_security(app: FastAPI):
# CORS設定
app.add_middleware(
CORSMiddleware,
allow_origins=os.getenv("ALLOWED_ORIGINS", "http://localhost:3000").split(","),
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
# 信頼できるホストの設定
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=os.getenv("ALLOWED_HOSTS", "localhost,127.0.0.1").split(",")
)
# セッションミドルウェア(OAuth2用)
app.add_middleware(
SessionMiddleware,
secret_key=os.getenv("SESSION_SECRET_KEY", "your-session-secret-key")
)
# セキュリティヘッダーミドルウェア
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response: Response = await call_next(request)
# セキュリティヘッダーの追加
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = "default-src 'self'"
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
return response
レート制限の実装
レート制限ミドルウェア
# app/rate_limit.py
import time
from collections import defaultdict
from fastapi import HTTPException, Request, status
from typing import Dict
class RateLimiter:
def __init__(self, max_requests: int = 100, window_seconds: int = 3600):
self.max_requests = max_requests
self.window_seconds = window_seconds
self.requests: Dict[str, list] = defaultdict(list)
def is_allowed(self, identifier: str) -> bool:
now = time.time()
# 古いリクエストを削除
self.requests[identifier] = [
req_time for req_time in self.requests[identifier]
if now - req_time < self.window_seconds
]
# リクエスト数チェック
if len(self.requests[identifier]) >= self.max_requests:
return False
# 新しいリクエストを記録
self.requests[identifier].append(now)
return True
# グローバルレート制限インスタンス
rate_limiter = RateLimiter(max_requests=100, window_seconds=3600)
async def rate_limit_dependency(request: Request):
client_ip = request.client.host
if not rate_limiter.is_allowed(client_ip):
raise HTTPException(
status_code=status.HTTP_429_TOO_MANY_REQUESTS,
detail="リクエスト回数の上限に達しました。しばらく待ってからお試しください。"
)
ロールベースアクセス制御
ロール管理
# app/roles.py
from enum import Enum
from typing import List
from fastapi import Depends, HTTPException, status
from . import models
from .dependencies import get_current_active_user
class UserRole(str, Enum):
USER = "user"
MODERATOR = "moderator"
ADMIN = "admin"
class Permission(str, Enum):
READ_POSTS = "read_posts"
WRITE_POSTS = "write_posts"
DELETE_POSTS = "delete_posts"
MANAGE_USERS = "manage_users"
ADMIN_ACCESS = "admin_access"
# ロールと権限のマッピング
ROLE_PERMISSIONS = {
UserRole.USER: [Permission.READ_POSTS, Permission.WRITE_POSTS],
UserRole.MODERATOR: [
Permission.READ_POSTS,
Permission.WRITE_POSTS,
Permission.DELETE_POSTS
],
UserRole.ADMIN: [
Permission.READ_POSTS,
Permission.WRITE_POSTS,
Permission.DELETE_POSTS,
Permission.MANAGE_USERS,
Permission.ADMIN_ACCESS
]
}
def require_permissions(required_permissions: List[Permission]):
def permission_checker(current_user: models.User = Depends(get_current_active_user)):
user_role = UserRole(current_user.role) if hasattr(current_user, 'role') else UserRole.USER
user_permissions = ROLE_PERMISSIONS.get(user_role, [])
for permission in required_permissions:
if permission not in user_permissions:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"権限が不足しています: {permission.value}"
)
return current_user
return permission_checker
# 使用例
def require_admin():
return require_permissions([Permission.ADMIN_ACCESS])
def require_post_management():
return require_permissions([Permission.DELETE_POSTS])
実践的なセキュリティ実装例
完全な認証付きAPIの例
# app/secure_main.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from . import models, schemas
from .database import get_db
from .dependencies import get_current_active_user
from .security_middleware import configure_security
from .rate_limit import rate_limit_dependency
from .roles import require_admin, require_post_management
app = FastAPI(title="セキュアなブログAPI", version="1.0.0")
# セキュリティ設定を適用
configure_security(app)
# 認証が必要なエンドポイント
@app.get("/secure/posts", dependencies=[Depends(rate_limit_dependency)])
async def get_secure_posts(
current_user: models.User = Depends(get_current_active_user),
db: Session = Depends(get_db)
):
return {"message": f"こんにちは、{current_user.username}さん", "posts": []}
# 管理者権限が必要なエンドポイント
@app.delete("/admin/users/{user_id}")
async def delete_user_admin(
user_id: int,
admin_user: models.User = Depends(require_admin()),
db: Session = Depends(get_db)
):
# ユーザー削除ロジック
return {"message": f"ユーザー{user_id}が削除されました"}
# 投稿管理権限が必要なエンドポイント
@app.delete("/posts/{post_id}")
async def delete_post(
post_id: int,
authorized_user: models.User = Depends(require_post_management()),
db: Session = Depends(get_db)
):
# 投稿削除ロジック
return {"message": f"投稿{post_id}が削除されました"}
セキュリティのベストプラクティス
1. 環境変数の管理
# .env ファイル
SECRET_KEY=your-very-secret-key-here-with-at-least-32-characters
DATABASE_URL=postgresql://user:password@localhost/dbname
GOOGLE_CLIENT_ID=your-google-client-id
GOOGLE_CLIENT_SECRET=your-google-client-secret
ALLOWED_ORIGINS=http://localhost:3000,https://yourdomain.com
ALLOWED_HOSTS=localhost,127.0.0.1,yourdomain.com
2. パスワードポリシーの実装
import re
from typing import List
def validate_password(password: str) -> List[str]:
errors = []
if len(password) < 8:
errors.append("パスワードは8文字以上である必要があります")
if not re.search(r"[A-Z]", password):
errors.append("パスワードには大文字を含める必要があります")
if not re.search(r"[a-z]", password):
errors.append("パスワードには小文字を含める必要があります")
if not re.search(r"d", password):
errors.append("パスワードには数字を含める必要があります")
if not re.search(r"[!@#$%^&*()_+-=[]{};':"\|,.<>/?]", password):
errors.append("パスワードには特殊文字を含める必要があります")
return errors
3. ログとモニタリング
import logging
from datetime import datetime
# セキュリティログの設定
security_logger = logging.getLogger("security")
security_logger.setLevel(logging.INFO)
async def log_security_event(event_type: str, user_id: int = None, details: str = ""):
security_logger.info(f"{datetime.utcnow()} - {event_type} - User: {user_id} - {details}")
# 使用例
await log_security_event("LOGIN_SUCCESS", user_id=user.id, details=f"IP: {request.client.host}")
await log_security_event("LOGIN_FAILED", details=f"Username: {form_data.username}, IP: {request.client.host}")
まとめ
FastAPIでの認証・セキュリティ実装では以下が重要です:
- JWT認証: トークンベースの認証でステートレスなAPI
- OAuth2統合: 外部プロバイダーによる認証の実装
- ロールベースアクセス制御: 細かい権限管理
- セキュリティヘッダー: XSS、CSRF等の攻撃対策
- レート制限: DoS攻撃やブルートフォース攻撃の防止
次回は、FastAPIのテスト手法について詳しく解説します。ユニットテスト、統合テスト、認証付きエンドポイントのテスト方法などを学んでいきましょう。