コンテナ本番運用とセキュリティ強化ガイド:エンタープライズ級の実践手法

約90分で読めます by ぽんたぬき
コンテナ本番運用とセキュリティ強化ガイド:エンタープライズ級の実践手法

コンテナ本番運用とセキュリティ強化ガイド:エンタープライズ級の実践手法

開発環境でのコンテナ化に成功したら、次の重要なステップは本番環境での安全で効率的な運用です。本番環境では、開発時とは全く異なるセキュリティ要件、可用性要件、コンプライアンス要件が求められます。

この記事では、エンタープライズ環境で求められるコンテナセキュリティのベストプラクティス、継続的な監視・運用手法、そして実際の脅威に対する具体的な対策を詳しく解説します。金融、医療、政府機関などの高セキュリティ環境でも適用できる実践的な手法を習得できます。

エンタープライズコンテナセキュリティの基本原則

ゼロトラストアーキテクチャの実装

基本方針:

  • すべてのコンテナを信頼しない
  • 最小権限の原則を徹底
  • 継続的な監視と検証
  • 多層防御の実装

実装レベル:

# セキュリティ強化されたdocker-compose.yml
services:
  web:
    image: myapp:latest
    
    # セキュリティ設定
    read_only: true  # ファイルシステム読み取り専用
    tmpfs:
      - /tmp:noexec,nosuid,size=100m
      - /var/run:noexec,nosuid,size=50m
    
    # 権限制限
    cap_drop:
      - ALL  # すべての権限を削除
    cap_add:
      - NET_BIND_SERVICE  # 必要最小限のみ追加
    
    security_opt:
      - no-new-privileges:true  # 特権昇格防止
      - apparmor:myapp-profile  # AppArmor プロファイル
    
    # ユーザー制限
    user: "1000:1000"  # 非rootユーザー
    
    # リソース制限
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
        reservations:
          memory: 256M
          cpus: '0.25'
    
    # ネットワーク分離
    networks:
      - frontend
      - backend
    
    # 環境変数(シークレット管理)
    environment:
      - DATABASE_URL_FILE=/run/secrets/db_url
      - API_KEY_FILE=/run/secrets/api_key
    
    secrets:
      - db_url
      - api_key
    
    # ヘルスチェック
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

# ネットワーク分離
networks:
  frontend:
    driver: bridge
    internal: false
  backend:
    driver: bridge
    internal: true  # 外部アクセス遮断

# シークレット管理
secrets:
  db_url:
    external: true
  api_key:
    external: true

セキュアなDockerfile設計

セキュリティ重視のマルチステージビルド:

# セキュア本番用Dockerfile
#==============================================================================
# セキュリティスキャン付きベースイメージ
#==============================================================================
FROM python:3.11-slim@sha256:12345abcdef as base

# セキュリティアップデート
RUN apt-get update && apt-get upgrade -y \
    && apt-get install -y --no-install-recommends \
        # 必要最小限のパッケージ
        ca-certificates \
        curl \
        gnupg \
    && rm -rf /var/lib/apt/lists/* \
    && apt-get clean

# セキュリティ設定
RUN echo 'umask 077' >> /etc/profile

#==============================================================================
# ビルドステージ(特権操作を分離)
#==============================================================================
FROM base as builder

# ビルド用パッケージ(本番には含まれない)
RUN apt-get update && apt-get install -y --no-install-recommends \
    build-essential \
    && rm -rf /var/lib/apt/lists/*

WORKDIR /build

# 依存関係のビルド
COPY requirements.txt .
RUN pip install --user --no-cache-dir --upgrade pip setuptools \
    && pip install --user --no-cache-dir -r requirements.txt

# セキュリティスキャン(ビルド時)
RUN pip install --user safety \
    && /root/.local/bin/safety check \
    && pip uninstall -y safety

#==============================================================================
# 本番実行ステージ
#==============================================================================
FROM base as production

# 非特権ユーザーの作成
RUN groupadd -r -g 1000 appgroup \
    && useradd -r -u 1000 -g appgroup -d /app -s /sbin/nologin \
        -c "Application User" appuser

# アプリケーションディレクトリ設定
WORKDIR /app
RUN chown appuser:appgroup /app

# ビルドステージから依存関係のみコピー
COPY --from=builder --chown=appuser:appgroup /root/.local /home/appuser/.local

# アプリケーションコードコピー
COPY --chown=appuser:appgroup src/ ./src/
COPY --chown=appuser:appgroup scripts/entrypoint.sh ./

# 実行権限設定(最小限)
RUN chmod 755 entrypoint.sh \
    && chmod -R 755 src/ \
    && find . -name "*.py" -exec chmod 644 {} \;

# 環境変数設定
ENV PATH="/home/appuser/.local/bin:$PATH"
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app

# セキュリティラベル
LABEL security.scan.date="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
LABEL security.base.image="python:3.11-slim"
LABEL security.maintainer="security@company.com"

# 非rootユーザーに切り替え
USER appuser

# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:8000/health || exit 1

# エントリーポイント
ENTRYPOINT ["./entrypoint.sh"]
CMD ["python", "src/main.py"]

セキュリティ強化エントリーポイント:

#!/bin/bash
# scripts/entrypoint.sh - セキュリティ強化エントリーポイント

set -euo pipefail

# セキュリティチェック関数
security_check() {
    echo "🔒 Security validation starting..."
    
    # 権限チェック
    if [ "$(id -u)" -eq 0 ]; then
        echo "❌ Running as root is not allowed"
        exit 1
    fi
    
    # 必要なシークレットファイル確認
    local required_secrets=("DATABASE_URL_FILE" "API_KEY_FILE")
    for secret_var in "${required_secrets[@]}"; do
        local secret_file="${!secret_var:-}"
        if [ -z "$secret_file" ] || [ ! -f "$secret_file" ]; then
            echo "❌ Required secret not found: $secret_var"
            exit 1
        fi
        
        # ファイル権限チェック
        local perms=$(stat -c "%a" "$secret_file")
        if [ "$perms" != "400" ] && [ "$perms" != "600" ]; then
            echo "❌ Insecure permissions on secret file: $secret_file ($perms)"
            exit 1
        fi
    done
    
    # 環境変数サニタイゼーション
    unset HISTFILE
    unset HISTSIZE
    export HISTFILE=/dev/null
    
    echo "✅ Security validation passed"
}

# シークレット読み込み関数
load_secrets() {
    echo "🔑 Loading secrets..."
    
    if [ -f "${DATABASE_URL_FILE:-}" ]; then
        export DATABASE_URL=$(cat "$DATABASE_URL_FILE")
        echo "✅ Database URL loaded"
    fi
    
    if [ -f "${API_KEY_FILE:-}" ]; then
        export API_KEY=$(cat "$API_KEY_FILE")
        echo "✅ API key loaded"
    fi
}

# ヘルスチェック関数
health_check() {
    local max_attempts=30
    local attempt=1
    
    echo "🏥 Waiting for application to be ready..."
    
    while [ $attempt -le $max_attempts ]; do
        if curl -f -s http://localhost:8000/health > /dev/null 2>&1; then
            echo "✅ Application is healthy"
            return 0
        fi
        
        echo "⏳ Health check attempt $attempt/$max_attempts..."
        sleep 2
        ((attempt++))
    done
    
    echo "❌ Application failed to become healthy"
    return 1
}

# メイン実行
main() {
    echo "🚀 Application starting..."
    echo "User: $(whoami) ($(id))"
    echo "Working directory: $(pwd)"
    echo "Environment: ${ENVIRONMENT:-production}"
    
    # セキュリティチェック実行
    security_check
    
    # シークレット読み込み
    load_secrets
    
    # アプリケーション起動
    echo "🎯 Starting application: $*"
    exec "$@" &
    
    # プロセスID記録
    echo $! > /tmp/app.pid
    
    # ヘルスチェック実行
    if ! health_check; then
        echo "❌ Application startup failed"
        exit 1
    fi
    
    # シグナルハンドリング
    wait
}

# シグナルハンドラー
cleanup() {
    echo "🛑 Shutting down gracefully..."
    if [ -f /tmp/app.pid ]; then
        kill -TERM $(cat /tmp/app.pid) 2>/dev/null || true
        wait $(cat /tmp/app.pid) 2>/dev/null || true
    fi
    exit 0
}

trap cleanup SIGTERM SIGINT

# メイン実行
main "$@"

脆弱性管理と継続的セキュリティ

自動化セキュリティスキャン

CI/CDパイプラインでのセキュリティ統合:

# .github/workflows/security-scan.yml
name: Security Scan Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * *'  # 毎日実行

jobs:
  #============================================================================
  # 依存関係脆弱性スキャン
  #============================================================================
  dependency-scan:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    
    - name: Setup Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'
    
    - name: Install dependencies
      run: |
        pip install safety pip-audit
        pip install -r requirements.txt
    
    - name: Run Safety check
      run: safety check --json --output safety-report.json
    
    - name: Run pip-audit
      run: pip-audit --format=json --output=pip-audit-report.json
    
    - name: Upload security reports
      uses: actions/upload-artifact@v3
      with:
        name: dependency-scan-reports
        path: |
          safety-report.json
          pip-audit-report.json

  #============================================================================
  # コンテナイメージ脆弱性スキャン
  #============================================================================
  container-scan:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    
    - name: Build container image
      run: docker build -t myapp:scan .
    
    - name: Run Trivy vulnerability scanner
      uses: aquasecurity/trivy-action@master
      with:
        image-ref: 'myapp:scan'
        format: 'sarif'
        output: 'trivy-results.sarif'
        severity: 'CRITICAL,HIGH,MEDIUM'
    
    - name: Run Grype vulnerability scanner
      run: |
        curl -sSfL https://raw.githubusercontent.com/anchore/grype/main/install.sh | sh -s -- -b /usr/local/bin
        grype myapp:scan -o json > grype-results.json
    
    - name: Upload Trivy scan results to GitHub Security tab
      uses: github/codeql-action/upload-sarif@v2
      with:
        sarif_file: 'trivy-results.sarif'
    
    - name: Upload vulnerability reports
      uses: actions/upload-artifact@v3
      with:
        name: container-scan-reports
        path: |
          trivy-results.sarif
          grype-results.json

  #============================================================================
  # コード品質・セキュリティスキャン
  #============================================================================
  code-scan:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    
    - name: Setup Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'
    
    - name: Install security tools
      run: |
        pip install bandit[toml] semgrep
    
    - name: Run Bandit security linter
      run: |
        bandit -r src/ -f json -o bandit-report.json
    
    - name: Run Semgrep security analysis
      run: |
        semgrep --config=auto --json --output=semgrep-report.json src/
    
    - name: Upload code scan reports
      uses: actions/upload-artifact@v3
      with:
        name: code-scan-reports
        path: |
          bandit-report.json
          semgrep-report.json

  #============================================================================
  # セキュリティポリシー検証
  #============================================================================
  policy-check:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4
    
    - name: Build container image
      run: docker build -t myapp:policy .
    
    - name: Install Conftest
      run: |
        wget https://github.com/open-policy-agent/conftest/releases/download/v0.46.0/conftest_0.46.0_Linux_x86_64.tar.gz
        tar xzf conftest_0.46.0_Linux_x86_64.tar.gz
        sudo mv conftest /usr/local/bin
    
    - name: Run security policy tests
      run: |
        # Dockerfileポリシーチェック
        conftest test --policy security-policies/ Dockerfile
        
        # docker-compose.ymlポリシーチェック
        conftest test --policy security-policies/ docker-compose.yml

  #============================================================================
  # セキュリティレポート集約
  #============================================================================
  security-report:
    needs: [dependency-scan, container-scan, code-scan, policy-check]
    runs-on: ubuntu-latest
    if: always()
    
    steps:
    - name: Download all reports
      uses: actions/download-artifact@v3
    
    - name: Generate security summary
      run: |
        python3 scripts/generate-security-summary.py \
          --dependency-scan dependency-scan-reports/ \
          --container-scan container-scan-reports/ \
          --code-scan code-scan-reports/ \
          --output security-summary.html
    
    - name: Upload security summary
      uses: actions/upload-artifact@v3
      with:
        name: security-summary
        path: security-summary.html

セキュリティポリシー定義

OPA (Open Policy Agent) セキュリティポリシー:

# security-policies/docker-security.rego
package docker.security

# ルートユーザー実行禁止
deny[msg] {
    input.User == "root"
    msg := "Container must not run as root user"
}

deny[msg] {
    input.User == "0"
    msg := "Container must not run as UID 0"
}

# 特権モード禁止
deny[msg] {
    input.Privileged == true
    msg := "Privileged containers are not allowed"
}

# 読み取り専用ファイルシステム強制
deny[msg] {
    input.ReadonlyRootfs != true
    msg := "Container filesystem must be read-only"
}

# 不要な権限削除確認
required_dropped_caps := [
    "AUDIT_WRITE",
    "CHOWN", 
    "DAC_OVERRIDE",
    "FOWNER",
    "FSETID",
    "KILL",
    "MKNOD",
    "NET_BIND_SERVICE",
    "NET_RAW",
    "SETFCAP",
    "SETGID",
    "SETPCAP",
    "SETUID",
    "SYS_CHROOT"
]

deny[msg] {
    missing := required_dropped_caps[_]
    not missing in input.CapDrop
    msg := sprintf("Must drop capability: %v", [missing])
}

# リソース制限必須
deny[msg] {
    not input.Memory
    msg := "Memory limit must be specified"
}

deny[msg] {
    not input.CpuShares
    msg := "CPU limit must be specified"
}
# security-policies/compose-security.rego
package compose.security

# ネットワークセキュリティ
deny[msg] {
    service := input.services[_]
    service.network_mode == "host"
    msg := "Host network mode is not allowed"
}

# ボリュームセキュリティ
deny[msg] {
    service := input.services[_]
    volume := service.volumes[_]
    contains(volume, ":/:")
    not contains(volume, ":ro")
    msg := sprintf("Volume mount should be read-only: %v", [volume])
}

# シークレット管理
deny[msg] {
    service := input.services[_]
    env := service.environment[_]
    contains(env, "PASSWORD=")
    msg := "Passwords must not be set as environment variables"
}

deny[msg] {
    service := input.services[_]
    env := service.environment[_]
    contains(env, "SECRET=")
    msg := "Secrets must not be set as environment variables"
}

監視・ログ・アラート戦略

包括的監視システム

Prometheus + Grafana + AlertManager 統合:

# monitoring/docker-compose.monitoring.yml
services:
  #============================================================================
  # Prometheus - メトリクス収集
  #============================================================================
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    restart: unless-stopped
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus/rules:/etc/prometheus/rules
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=30d'
      - '--web.enable-lifecycle'
      - '--web.enable-admin-api'

  #============================================================================
  # AlertManager - アラート管理
  #============================================================================
  alertmanager:
    image: prom/alertmanager:latest
    container_name: alertmanager
    restart: unless-stopped
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager/alertmanager.yml:/etc/alertmanager/alertmanager.yml
      - alertmanager_data:/alertmanager
    command:
      - '--config.file=/etc/alertmanager/alertmanager.yml'
      - '--storage.path=/alertmanager'

  #============================================================================
  # Grafana - 可視化ダッシュボード
  #============================================================================
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    restart: unless-stopped
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD_FILE=/run/secrets/grafana_admin_password
      - GF_SECURITY_DISABLE_GRAVATAR=true
      - GF_USERS_ALLOW_SIGN_UP=false
      - GF_SECURITY_COOKIE_SECURE=true
      - GF_SECURITY_STRICT_TRANSPORT_SECURITY=true
    volumes:
      - ./grafana/dashboards:/var/lib/grafana/dashboards
      - ./grafana/provisioning:/etc/grafana/provisioning
      - grafana_data:/var/lib/grafana
    secrets:
      - grafana_admin_password

  #============================================================================
  # cAdvisor - コンテナメトリクス
  #============================================================================
  cadvisor:
    image: gcr.io/cadvisor/cadvisor:latest
    container_name: cadvisor
    restart: unless-stopped
    ports:
      - "8080:8080"
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:ro
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro
      - /dev/disk/:/dev/disk:ro
    privileged: true
    devices:
      - /dev/kmsg

  #============================================================================
  # Node Exporter - システムメトリクス
  #============================================================================
  node_exporter:
    image: prom/node-exporter:latest
    container_name: node_exporter
    restart: unless-stopped
    ports:
      - "9100:9100"
    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    command:
      - '--path.procfs=/host/proc'
      - '--path.rootfs=/rootfs'
      - '--path.sysfs=/host/sys'
      - '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)'

  #============================================================================
  # Loki - ログ集約
  #============================================================================
  loki:
    image: grafana/loki:latest
    container_name: loki
    restart: unless-stopped
    ports:
      - "3100:3100"
    volumes:
      - ./loki/loki.yml:/etc/loki/local-config.yaml
      - loki_data:/loki
    command: -config.file=/etc/loki/local-config.yaml

  #============================================================================
  # Promtail - ログ収集
  #============================================================================
  promtail:
    image: grafana/promtail:latest
    container_name: promtail
    restart: unless-stopped
    volumes:
      - ./promtail/promtail.yml:/etc/promtail/config.yml
      - /var/log:/var/log:ro
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
    command: -config.file=/etc/promtail/config.yml

volumes:
  prometheus_data:
  alertmanager_data:
  grafana_data:
  loki_data:

secrets:
  grafana_admin_password:
    external: true

セキュリティ特化アラートルール:

# prometheus/rules/security-alerts.yml
groups:
- name: container_security
  rules:
  # 高CPU使用率(DDoS攻撃など)
  - alert: HighCPUUsage
    expr: rate(container_cpu_usage_seconds_total[5m]) * 100 > 80
    for: 2m
    labels:
      severity: warning
      category: security
    annotations:
      summary: "High CPU usage detected on {{ $labels.name }}"
      description: "Container {{ $labels.name }} CPU usage is above 80% for more than 2 minutes"

  # 異常なメモリ使用(メモリリーク、攻撃)
  - alert: HighMemoryUsage
    expr: (container_memory_usage_bytes / container_spec_memory_limit_bytes) * 100 > 90
    for: 5m
    labels:
      severity: critical
      category: security
    annotations:
      summary: "High memory usage detected on {{ $labels.name }}"
      description: "Container {{ $labels.name }} memory usage is above 90% for more than 5 minutes"

  # 異常なネットワークトラフィック
  - alert: HighNetworkTraffic
    expr: rate(container_network_receive_bytes_total[5m]) > 100000000  # 100MB/s
    for: 1m
    labels:
      severity: warning
      category: security
    annotations:
      summary: "High network traffic detected on {{ $labels.name }}"
      description: "Container {{ $labels.name }} receiving more than 100MB/s"

  # コンテナ再起動頻発(攻撃による異常終了)
  - alert: ContainerRestartLoop
    expr: rate(container_last_seen[5m]) > 0.1
    for: 2m
    labels:
      severity: critical
      category: security
    annotations:
      summary: "Container restart loop detected"
      description: "Container {{ $labels.name }} is restarting frequently"

  # ファイルシステム書き込み検出(読み取り専用FS違反)
  - alert: FilesystemWriteAttempt
    expr: increase(container_fs_writes_total[5m]) > 0
    for: 0m
    labels:
      severity: critical
      category: security
    annotations:
      summary: "Unauthorized filesystem write detected"
      description: "Write attempt detected on read-only filesystem in {{ $labels.name }}"

- name: application_security  
  rules:
  # 異常なHTTPエラー率(攻撃パターン)
  - alert: HighHTTPErrorRate
    expr: rate(http_requests_total{status=~"4..|5.."}[5m]) / rate(http_requests_total[5m]) > 0.1
    for: 2m
    labels:
      severity: warning
      category: security
    annotations:
      summary: "High HTTP error rate detected"
      description: "HTTP error rate is above 10% for more than 2 minutes"

  # 認証失敗の頻発
  - alert: HighAuthFailureRate
    expr: rate(auth_failures_total[5m]) > 5
    for: 1m
    labels:
      severity: critical
      category: security
    annotations:
      summary: "High authentication failure rate"
      description: "More than 5 authentication failures per second"

  # データベース接続エラー
  - alert: DatabaseConnectionFailure
    expr: up{job="database"} == 0
    for: 30s
    labels:
      severity: critical
      category: availability
    annotations:
      summary: "Database connection failure"
      description: "Cannot connect to database"

セキュリティ専用ログ分析

セキュリティイベント検出スクリプト:

#!/usr/bin/env python3
# scripts/security-log-analyzer.py

import json
import re
import sys
import argparse
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import geoip2.database
import requests

class SecurityLogAnalyzer:
    def __init__(self, geoip_db_path="/opt/GeoLite2-City.mmdb"):
        self.geoip_reader = None
        try:
            self.geoip_reader = geoip2.database.Reader(geoip_db_path)
        except:
            print("Warning: GeoIP database not available")
        
        # 脅威パターン定義
        self.threat_patterns = {
            'sql_injection': [
                r"(?i)(union.*select|select.*from.*where)",
                r"(?i)(drop.*table|delete.*from)",
                r"(?i)('.*or.*'=')",
            ],
            'xss_attempt': [
                r"(?i)(<script|javascript:|on\w+\s*=)",
                r"(?i)(alert\s*\(|confirm\s*\()",
            ],
            'path_traversal': [
                r"\.\.\/|\.\.\\",
                r"(?i)(\/etc\/passwd|\/etc\/shadow)",
            ],
            'brute_force': [
                r"(?i)(failed.*login|authentication.*failed)",
                r"(?i)(invalid.*password|wrong.*credentials)",
            ],
            'reconnaissance': [
                r"(?i)(robots\.txt|\.well-known)",
                r"(?i)(wp-admin|phpmyadmin)",
                r"(?i)(scan|probe|enum)",
            ]
        }
    
    def analyze_logs(self, log_file_path):
        """ログファイルを分析してセキュリティ脅威を検出"""
        
        security_events = []
        ip_stats = Counter()
        user_agent_stats = Counter()
        attack_types = Counter()
        
        with open(log_file_path, 'r') as f:
            for line_num, line in enumerate(f, 1):
                try:
                    # JSON形式のログをパース
                    if line.strip().startswith('{'):
                        log_entry = json.loads(line)
                    else:
                        # 標準的なApache/Nginxログ形式をパース
                        log_entry = self.parse_access_log(line)
                    
                    # セキュリティ脅威分析
                    threats = self.detect_threats(log_entry)
                    
                    if threats:
                        event = {
                            'timestamp': log_entry.get('timestamp'),
                            'ip': log_entry.get('remote_addr'),
                            'user_agent': log_entry.get('user_agent'),
                            'request': log_entry.get('request'),
                            'status': log_entry.get('status'),
                            'threats': threats,
                            'line_number': line_num
                        }
                        
                        # GeoIP情報追加
                        if self.geoip_reader and event['ip']:
                            geo_info = self.get_geo_info(event['ip'])
                            event['geo'] = geo_info
                        
                        security_events.append(event)
                        
                        # 統計更新
                        ip_stats[event['ip']] += 1
                        user_agent_stats[event['user_agent']] += 1
                        for threat in threats:
                            attack_types[threat] += 1
                
                except Exception as e:
                    print(f"Error processing line {line_num}: {e}")
                    continue
        
        return {
            'events': security_events,
            'statistics': {
                'top_attacking_ips': ip_stats.most_common(10),
                'top_user_agents': user_agent_stats.most_common(10),
                'attack_types': attack_types.most_common(),
                'total_events': len(security_events)
            }
        }
    
    def detect_threats(self, log_entry):
        """ログエントリから脅威パターンを検出"""
        threats = []
        request = log_entry.get('request', '')
        user_agent = log_entry.get('user_agent', '')
        
        for threat_type, patterns in self.threat_patterns.items():
            for pattern in patterns:
                if re.search(pattern, request) or re.search(pattern, user_agent):
                    threats.append(threat_type)
                    break
        
        return threats
    
    def parse_access_log(self, line):
        """標準的なアクセスログ形式をパース"""
        # Nginx/Apache combined log format
        pattern = r'(\S+) \S+ \S+ \[(.*?)\] "(.*?)" (\d+) \S+ "(.*?)" "(.*?)"'
        match = re.match(pattern, line)
        
        if match:
            return {
                'remote_addr': match.group(1),
                'timestamp': match.group(2),
                'request': match.group(3),
                'status': int(match.group(4)),
                'referer': match.group(5),
                'user_agent': match.group(6)
            }
        return {}
    
    def get_geo_info(self, ip):
        """IP アドレスの地理的情報を取得"""
        try:
            response = self.geoip_reader.city(ip)
            return {
                'country': response.country.name,
                'city': response.city.name,
                'latitude': float(response.location.latitude),
                'longitude': float(response.location.longitude)
            }
        except:
            return None
    
    def check_threat_intelligence(self, ip):
        """脅威インテリジェンスDBで IP をチェック"""
        # VirusTotal API を使用した例
        api_key = "YOUR_VT_API_KEY"
        url = f"https://www.virustotal.com/vtapi/v2/ip-address/report"
        params = {'apikey': api_key, 'ip': ip}
        
        try:
            response = requests.get(url, params=params, timeout=5)
            data = response.json()
            
            if data.get('response_code') == 1:
                return {
                    'malicious': data.get('positives', 0) > 0,
                    'detections': data.get('positives', 0),
                    'total_scans': data.get('total', 0)
                }
        except:
            pass
        
        return None
    
    def generate_report(self, analysis_result, output_format='json'):
        """分析結果のレポート生成"""
        
        report = {
            'analysis_timestamp': datetime.now().isoformat(),
            'summary': {
                'total_security_events': analysis_result['statistics']['total_events'],
                'unique_attacking_ips': len(analysis_result['statistics']['top_attacking_ips']),
                'most_common_attack': analysis_result['statistics']['attack_types'][0] if analysis_result['statistics']['attack_types'] else None
            },
            'events': analysis_result['events'][:100],  # Top 100 events
            'statistics': analysis_result['statistics']
        }
        
        if output_format == 'json':
            return json.dumps(report, indent=2, default=str)
        elif output_format == 'html':
            return self.generate_html_report(report)
        
        return report

    def generate_html_report(self, report):
        """HTML形式のセキュリティレポート生成"""
        html = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <title>Security Analysis Report</title>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; }}
                .header {{ background: #f44336; color: white; padding: 20px; }}
                .summary {{ background: #ffeb3b; padding: 15px; margin: 10px 0; }}
                .event {{ background: #f5f5f5; margin: 10px 0; padding: 10px; border-left: 4px solid #f44336; }}
                .stats {{ display: flex; justify-content: space-between; }}
                .stat-box {{ background: white; padding: 15px; margin: 10px; border: 1px solid #ddd; }}
            </style>
        </head>
        <body>
            <div class="header">
                <h1>🔒 Security Analysis Report</h1>
                <p>Generated: {report['analysis_timestamp']}</p>
            </div>
            
            <div class="summary">
                <h2>📊 Summary</h2>
                <div class="stats">
                    <div class="stat-box">
                        <h3>{report['summary']['total_security_events']}</h3>
                        <p>Security Events</p>
                    </div>
                    <div class="stat-box">
                        <h3>{report['summary']['unique_attacking_ips']}</h3>
                        <p>Unique Attacking IPs</p>
                    </div>
                    <div class="stat-box">
                        <h3>{report['summary'].get('most_common_attack', 'N/A')}</h3>
                        <p>Most Common Attack</p>
                    </div>
                </div>
            </div>
            
            <div class="events">
                <h2>🚨 Security Events</h2>
        """
        
        for event in report['events'][:20]:  # Top 20 events
            html += f"""
                <div class="event">
                    <strong>IP:</strong> {event.get('ip', 'Unknown')} |
                    <strong>Time:</strong> {event.get('timestamp', 'Unknown')} |
                    <strong>Threats:</strong> {', '.join(event.get('threats', []))}
                    <br>
                    <strong>Request:</strong> <code>{event.get('request', '')}</code>
                </div>
            """
        
        html += """
            </div>
        </body>
        </html>
        """
        
        return html

def main():
    parser = argparse.ArgumentParser(description='Security Log Analyzer')
    parser.add_argument('log_file', help='Path to log file')
    parser.add_argument('--output', '-o', default='report.json', help='Output file')
    parser.add_argument('--format', '-f', choices=['json', 'html'], default='json', help='Output format')
    parser.add_argument('--geoip-db', help='Path to GeoIP database')
    
    args = parser.parse_args()
    
    analyzer = SecurityLogAnalyzer(args.geoip_db) if args.geoip_db else SecurityLogAnalyzer()
    
    print(f"🔍 Analyzing security logs: {args.log_file}")
    analysis_result = analyzer.analyze_logs(args.log_file)
    
    print(f"📊 Found {analysis_result['statistics']['total_events']} security events")
    
    report = analyzer.generate_report(analysis_result, args.format)
    
    with open(args.output, 'w') as f:
        f.write(report)
    
    print(f"📄 Report saved to: {args.output}")

if __name__ == "__main__":
    main()

インシデント対応とフォレンジック

自動化インシデント対応

セキュリティインシデント検出・対応システム:

#!/usr/bin/env python3
# scripts/incident-response.py

import docker
import json
import subprocess
import time
from datetime import datetime
from typing import Dict, List, Any
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart

class IncidentResponseSystem:
    def __init__(self, config_file="incident-config.json"):
        self.docker_client = docker.from_env()
        self.config = self.load_config(config_file)
        self.incident_log = []
    
    def load_config(self, config_file):
        """設定ファイル読み込み"""
        try:
            with open(config_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return self.get_default_config()
    
    def get_default_config(self):
        """デフォルト設定"""
        return {
            "thresholds": {
                "cpu_usage": 90,
                "memory_usage": 95,
                "disk_usage": 85,
                "network_connections": 1000
            },
            "response_actions": {
                "isolate_container": True,
                "capture_logs": True,
                "create_snapshot": True,
                "notify_admins": True
            },
            "notification": {
                "email": {
                    "enabled": True,
                    "smtp_server": "smtp.company.com",
                    "smtp_port": 587,
                    "recipients": ["security@company.com"]
                },
                "slack": {
                    "enabled": False,
                    "webhook_url": ""
                }
            }
        }
    
    def monitor_containers(self):
        """コンテナ監視とインシデント検出"""
        print("🔍 Starting container monitoring...")
        
        while True:
            try:
                containers = self.docker_client.containers.list()
                
                for container in containers:
                    # メトリクス取得
                    stats = container.stats(stream=False)
                    
                    # インシデント検出
                    incidents = self.detect_incidents(container, stats)
                    
                    for incident in incidents:
                        self.handle_incident(container, incident)
                
                time.sleep(30)  # 30秒間隔で監視
                
            except KeyboardInterrupt:
                print("👋 Monitoring stopped")
                break
            except Exception as e:
                print(f"❌ Monitoring error: {e}")
                time.sleep(60)
    
    def detect_incidents(self, container, stats) -> List[Dict]:
        """インシデント検出ロジック"""
        incidents = []
        
        # CPU使用率チェック
        cpu_usage = self.calculate_cpu_usage(stats)
        if cpu_usage > self.config["thresholds"]["cpu_usage"]:
            incidents.append({
                "type": "high_cpu_usage",
                "severity": "critical",
                "value": cpu_usage,
                "threshold": self.config["thresholds"]["cpu_usage"],
                "message": f"CPU usage {cpu_usage:.1f}% exceeds threshold"
            })
        
        # メモリ使用率チェック
        memory_usage = self.calculate_memory_usage(stats)
        if memory_usage > self.config["thresholds"]["memory_usage"]:
            incidents.append({
                "type": "high_memory_usage",
                "severity": "critical",
                "value": memory_usage,
                "threshold": self.config["thresholds"]["memory_usage"],
                "message": f"Memory usage {memory_usage:.1f}% exceeds threshold"
            })
        
        # ネットワーク接続数チェック
        network_connections = self.count_network_connections(container)
        if network_connections > self.config["thresholds"]["network_connections"]:
            incidents.append({
                "type": "high_network_connections",
                "severity": "warning",
                "value": network_connections,
                "threshold": self.config["thresholds"]["network_connections"],
                "message": f"Network connections {network_connections} exceeds threshold"
            })
        
        # 異常プロセス検出
        suspicious_processes = self.detect_suspicious_processes(container)
        if suspicious_processes:
            incidents.append({
                "type": "suspicious_processes",
                "severity": "critical",
                "value": suspicious_processes,
                "message": f"Suspicious processes detected: {suspicious_processes}"
            })
        
        return incidents
    
    def handle_incident(self, container, incident):
        """インシデント対応処理"""
        incident_id = f"INC-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{container.short_id}"
        
        incident_record = {
            "id": incident_id,
            "timestamp": datetime.now().isoformat(),
            "container_id": container.id,
            "container_name": container.name,
            "incident": incident,
            "actions_taken": []
        }
        
        print(f"🚨 INCIDENT DETECTED: {incident_id}")
        print(f"   Container: {container.name}")
        print(f"   Type: {incident['type']}")
        print(f"   Severity: {incident['severity']}")
        print(f"   Message: {incident['message']}")
        
        # 対応アクション実行
        if self.config["response_actions"]["capture_logs"]:
            self.capture_container_logs(container, incident_record)
        
        if self.config["response_actions"]["create_snapshot"]:
            self.create_container_snapshot(container, incident_record)
        
        if incident['severity'] == 'critical':
            if self.config["response_actions"]["isolate_container"]:
                self.isolate_container(container, incident_record)
        
        if self.config["response_actions"]["notify_admins"]:
            self.send_notifications(incident_record)
        
        # インシデントログに記録
        self.incident_log.append(incident_record)
        self.save_incident_log()
    
    def capture_container_logs(self, container, incident_record):
        """コンテナログキャプチャ"""
        try:
            logs = container.logs(tail=1000).decode('utf-8')
            
            log_file = f"/var/log/security/incidents/{incident_record['id']}_logs.txt"
            with open(log_file, 'w') as f:
                f.write(logs)
            
            incident_record["actions_taken"].append(f"Logs captured: {log_file}")
            print(f"   ✅ Logs captured: {log_file}")
            
        except Exception as e:
            print(f"   ❌ Failed to capture logs: {e}")
    
    def create_container_snapshot(self, container, incident_record):
        """コンテナスナップショット作成"""
        try:
            snapshot_name = f"incident-{incident_record['id']}"
            
            # コンテナをイメージとしてコミット
            container.commit(repository=snapshot_name, tag="forensic")
            
            incident_record["actions_taken"].append(f"Snapshot created: {snapshot_name}")
            print(f"   ✅ Snapshot created: {snapshot_name}")
            
        except Exception as e:
            print(f"   ❌ Failed to create snapshot: {e}")
    
    def isolate_container(self, container, incident_record):
        """コンテナ隔離"""
        try:
            # ネットワークから切断
            networks = container.attrs['NetworkSettings']['Networks']
            for network_name in networks:
                network = self.docker_client.networks.get(network_name)
                network.disconnect(container)
            
            # 隔離用ネットワークに接続
            isolation_network = self.get_or_create_isolation_network()
            isolation_network.connect(container)
            
            incident_record["actions_taken"].append("Container isolated from production networks")
            print(f"   ✅ Container isolated")
            
        except Exception as e:
            print(f"   ❌ Failed to isolate container: {e}")
    
    def get_or_create_isolation_network(self):
        """隔離用ネットワーク取得・作成"""
        try:
            return self.docker_client.networks.get("isolation")
        except docker.errors.NotFound:
            return self.docker_client.networks.create(
                "isolation",
                driver="bridge",
                internal=True,  # 外部アクセス不可
                labels={"purpose": "security_isolation"}
            )
    
    def send_notifications(self, incident_record):
        """通知送信"""
        if self.config["notification"]["email"]["enabled"]:
            self.send_email_notification(incident_record)
        
        if self.config["notification"]["slack"]["enabled"]:
            self.send_slack_notification(incident_record)
    
    def send_email_notification(self, incident_record):
        """メール通知"""
        try:
            msg = MIMEMultipart()
            msg['From'] = "security-system@company.com"
            msg['To'] = ", ".join(self.config["notification"]["email"]["recipients"])
            msg['Subject'] = f"🚨 Security Incident: {incident_record['id']}"
            
            body = f"""
Security Incident Detected

Incident ID: {incident_record['id']}
Timestamp: {incident_record['timestamp']}
Container: {incident_record['container_name']} ({incident_record['container_id'][:12]})

Incident Details:
Type: {incident_record['incident']['type']}
Severity: {incident_record['incident']['severity']}
Message: {incident_record['incident']['message']}

Actions Taken:
{chr(10).join(f"- {action}" for action in incident_record['actions_taken'])}

Please investigate immediately.
            """
            
            msg.attach(MIMEText(body, 'plain'))
            
            server = smtplib.SMTP(
                self.config["notification"]["email"]["smtp_server"],
                self.config["notification"]["email"]["smtp_port"]
            )
            server.starttls()
            server.send_message(msg)
            server.quit()
            
            print("   ✅ Email notification sent")
            
        except Exception as e:
            print(f"   ❌ Failed to send email notification: {e}")
    
    def calculate_cpu_usage(self, stats):
        """CPU使用率計算"""
        cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - stats['precpu_stats']['cpu_usage']['total_usage']
        system_delta = stats['cpu_stats']['system_cpu_usage'] - stats['precpu_stats']['system_cpu_usage']
        cpu_count = len(stats['cpu_stats']['cpu_usage']['percpu_usage'])
        
        if system_delta > 0:
            return (cpu_delta / system_delta) * cpu_count * 100
        return 0
    
    def calculate_memory_usage(self, stats):
        """メモリ使用率計算"""
        memory_usage = stats['memory_stats']['usage']
        memory_limit = stats['memory_stats']['limit']
        return (memory_usage / memory_limit) * 100
    
    def count_network_connections(self, container):
        """ネットワーク接続数カウント"""
        try:
            result = container.exec_run("netstat -an | wc -l")
            return int(result.output.decode().strip())
        except:
            return 0
    
    def detect_suspicious_processes(self, container):
        """疑わしいプロセス検出"""
        suspicious = []
        
        try:
            result = container.exec_run("ps aux")
            processes = result.output.decode().split('\n')
            
            # 疑わしいプロセス名
            suspicious_names = [
                'nc', 'netcat', 'ncat',  # ネットワークツール
                'nmap', 'masscan',       # ポートスキャナー
                'wget', 'curl',          # 外部通信(コンテキスト依存)
                'python -c', 'perl -e', 'ruby -e',  # ワンライナー実行
                '/tmp/', '/var/tmp/',    # 一時ディレクトリからの実行
            ]
            
            for process in processes:
                for sus_name in suspicious_names:
                    if sus_name in process.lower():
                        suspicious.append(process.strip())
                        
        except Exception as e:
            print(f"Error detecting processes: {e}")
        
        return suspicious
    
    def save_incident_log(self):
        """インシデントログ保存"""
        with open('/var/log/security/incident_log.json', 'w') as f:
            json.dump(self.incident_log, f, indent=2, default=str)

def main():
    print("🔒 Security Incident Response System Starting...")
    
    response_system = IncidentResponseSystem()
    response_system.monitor_containers()

if __name__ == "__main__":
    main()

この記事では、エンタープライズ級のコンテナセキュリティと本番運用について詳しく解説しました。記事が長くなったため、この続きは記事4で残りの内容(コンプライアンス、バックアップ・災害復旧、運用自動化など)を扱うことにします。

現在のところ、以下の重要なセキュリティ要素をカバーしています:

  1. ゼロトラストアーキテクチャ - 基本的なセキュリティ原則
  2. セキュアなコンテナ設計 - Dockerfileとcompose設定
  3. 脆弱性管理 - 自動スキャンとポリシー検証
  4. 監視・ログ・アラート - 包括的な監視システム
  5. インシデント対応 - 自動化された脅威対応

次の記事4「チーム開発でのコンテナ環境標準化戦略」で、組織レベルでの運用効率化を扱う予定です。

関連記事

Webスクレイピング入門:株価・仮想通貨価格を取得してみよう
データ分析

Webスクレイピング入門:株価・仮想通貨価格を取得してみよう

Webスクレイピング入門:株価・仮想通貨価格を取得してみよう 投資やトレードをしていると、リアルタイムの価格情報が欲しくなりますよね。この記事では、Pythonを使って株価や仮想通貨価格を自動取得する方法を、初心者でもわかりやすく解説します。 Webスクレイピングとは? 基本概念...

VSCode設定ガイド:プロレベルの開発効率を実現するカスタマイズ完全版
データ分析

VSCode設定ガイド:プロレベルの開発効率を実現するカスタマイズ完全版

VSCode設定ガイド:プロレベルの開発効率を実現するカスタマイズ完全版 Visual Studio Code(VSCode)は、軽量でありながら強力な機能を持つ無料のコードエディタです。適切な設定とカスタマイズにより、プロフェッショナルな開発環境を構築できます。この記事では、初心者から上級者まで役...

CCXTを使って仮想通貨のトレードをしてみる(第4回)
データ分析

CCXTを使って仮想通貨のトレードをしてみる(第4回)

はじめに 注意: 仮想通貨取引には大きなリスクが伴います。必ず余剰資金で行い、税務・法務についても最新の情報を確認し、必要に応じて専門家の助言を受けてください。 第3回/useccxtpython3では、CCXTを使った仮想通貨の自動取引ボットのリスク管理とバックテストについて解説しました。今回は仮...

コメント

0/2000