コンテナ本番運用とセキュリティ強化ガイド:エンタープライズ級の実践手法

コンテナ本番運用とセキュリティ強化ガイド:エンタープライズ級の実践手法

開発環境でのコンテナ化に成功したら、次の重要なステップは本番環境での安全で効率的な運用です。本番環境では、開発時とは全く異なるセキュリティ要件、可用性要件、コンプライアンス要件が求められます。

この記事では、エンタープライズ環境で求められるコンテナセキュリティのベストプラクティス、継続的な監視・運用手法、そして実際の脅威に対する具体的な対策を詳しく解説します。金融、医療、政府機関などの高セキュリティ環境でも適用できる実践的な手法を習得できます。

エンタープライズコンテナセキュリティの基本原則

ゼロトラストアーキテクチャの実装

基本方針:

  • すべてのコンテナを信頼しない
  • 最小権限の原則を徹底
  • 継続的な監視と検証
  • 多層防御の実装

実装レベル:

# セキュリティ強化されたdocker-compose.yml
services:
  web:
    image: myapp:latest

    # セキュリティ設定
    read_only: true  # ファイルシステム読み取り専用
    tmpfs:
      - /tmp:noexec,nosuid,size=100m
      - /var/run:noexec,nosuid,size=50m

    # 権限制限
    cap_drop:
      - ALL  # すべての権限を削除
    cap_add:
      - NET_BIND_SERVICE  # 必要最小限のみ追加

    security_opt:
      - no-new-privileges:true  # 特権昇格防止
      - apparmor:myapp-profile  # AppArmor プロファイル

    # ユーザー制限
    user: "1000:1000"  # 非rootユーザー

    # リソース制限
    deploy:
      resources:
        limits:
          memory: 512M
          cpus: '0.5'
        reservations:
          memory: 256M
          cpus: '0.25'

    # ネットワーク分離
    networks:
      - frontend
      - backend

    # 環境変数(シークレット管理)
    environment:
      - DATABASE_URL_FILE=/run/secrets/db_url
      - API_KEY_FILE=/run/secrets/api_key

    secrets:
      - db_url
      - api_key

    # ヘルスチェック
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8000/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s

# ネットワーク分離
networks:
  frontend:
    driver: bridge
    internal: false
  backend:
    driver: bridge
    internal: true  # 外部アクセス遮断

# シークレット管理
secrets:
  db_url:
    external: true
  api_key:
    external: true

セキュアなDockerfile設計

セキュリティ重視のマルチステージビルド:

# セキュア本番用Dockerfile
#==============================================================================
# セキュリティスキャン付きベースイメージ
#==============================================================================
FROM python:3.11-slim@sha256:12345abcdef as base

# セキュリティアップデート
RUN apt-get update && apt-get upgrade -y 
    && apt-get install -y --no-install-recommends 
        # 必要最小限のパッケージ
        ca-certificates 
        curl 
        gnupg 
    && rm -rf /var/lib/apt/lists/* 
    && apt-get clean

# セキュリティ設定
RUN echo 'umask 077' >> /etc/profile

#==============================================================================
# ビルドステージ(特権操作を分離)
#==============================================================================
FROM base as builder

# ビルド用パッケージ(本番には含まれない)
RUN apt-get update && apt-get install -y --no-install-recommends 
    build-essential 
    && rm -rf /var/lib/apt/lists/*

WORKDIR /build

# 依存関係のビルド
COPY requirements.txt .
RUN pip install --user --no-cache-dir --upgrade pip setuptools 
    && pip install --user --no-cache-dir -r requirements.txt

# セキュリティスキャン(ビルド時)
RUN pip install --user safety 
    && /root/.local/bin/safety check 
    && pip uninstall -y safety

#==============================================================================
# 本番実行ステージ
#==============================================================================
FROM base as production

# 非特権ユーザーの作成
RUN groupadd -r -g 1000 appgroup 
    && useradd -r -u 1000 -g appgroup -d /app -s /sbin/nologin 
        -c "Application User" appuser

# アプリケーションディレクトリ設定
WORKDIR /app
RUN chown appuser:appgroup /app

# ビルドステージから依存関係のみコピー
COPY --from=builder --chown=appuser:appgroup /root/.local /home/appuser/.local

# アプリケーションコードコピー
COPY --chown=appuser:appgroup src/ ./src/
COPY --chown=appuser:appgroup scripts/entrypoint.sh ./

# 実行権限設定(最小限)
RUN chmod 755 entrypoint.sh 
    && chmod -R 755 src/ 
    && find . -name "*.py" -exec chmod 644 {} ;

# 環境変数設定
ENV PATH="/home/appuser/.local/bin:$PATH"
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1
ENV PYTHONPATH=/app

# セキュリティラベル
LABEL security.scan.date="$(date -u +%Y-%m-%dT%H:%M:%SZ)"
LABEL security.base.image="python:3.11-slim"
LABEL security.maintainer="security@company.com"

# 非rootユーザーに切り替え
USER appuser

# ヘルスチェック
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 
    CMD curl -f http://localhost:8000/health || exit 1

# エントリーポイント
ENTRYPOINT ["./entrypoint.sh"]
CMD ["python", "src/main.py"]

セキュリティ強化エントリーポイント:

#!/bin/bash
# scripts/entrypoint.sh - セキュリティ強化エントリーポイント

set -euo pipefail

# セキュリティチェック関数
security_check() {
    echo "🔒 Security validation starting..."

    # 権限チェック
    if [ "$(id -u)" -eq 0 ]; then
        echo "❌ Running as root is not allowed"
        exit 1
    fi

    # 必要なシークレットファイル確認
    local required_secrets=("DATABASE_URL_FILE" "API_KEY_FILE")
    for secret_var in "${required_secrets[@]}"; do
        local secret_file="${!secret_var:-}"
        if [ -z "$secret_file" ] || [ ! -f "$secret_file" ]; then
            echo "❌ Required secret not found: $secret_var"
            exit 1
        fi

        # ファイル権限チェック
        local perms=$(stat -c "%a" "$secret_file")
        if [ "$perms" != "400" ] && [ "$perms" != "600" ]; then
            echo "❌ Insecure permissions on secret file: $secret_file ($perms)"
            exit 1
        fi
    done

    # 環境変数サニタイゼーション
    unset HISTFILE
    unset HISTSIZE
    export HISTFILE=/dev/null

    echo "✅ Security validation passed"
}

# シークレット読み込み関数
load_secrets() {
    echo "🔑 Loading secrets..."

    if [ -f "${DATABASE_URL_FILE:-}" ]; then
        export DATABASE_URL=$(cat "$DATABASE_URL_FILE")
        echo "✅ Database URL loaded"
    fi

    if [ -f "${API_KEY_FILE:-}" ]; then
        export API_KEY=$(cat "$API_KEY_FILE")
        echo "✅ API key loaded"
    fi
}

# ヘルスチェック関数
health_check() {
    local max_attempts=30
    local attempt=1

    echo "🏥 Waiting for application to be ready..."

    while [ $attempt -le $max_attempts ]; do
        if curl -f -s http://localhost:8000/health > /dev/null 2>&1; then
            echo "✅ Application is healthy"
            return 0
        fi

        echo "⏳ Health check attempt $attempt/$max_attempts..."
        sleep 2
        ((attempt++))
    done

    echo "❌ Application failed to become healthy"
    return 1
}

# メイン実行
main() {
    echo "🚀 Application starting..."
    echo "User: $(whoami) ($(id))"
    echo "Working directory: $(pwd)"
    echo "Environment: ${ENVIRONMENT:-production}"

    # セキュリティチェック実行
    security_check

    # シークレット読み込み
    load_secrets

    # アプリケーション起動
    echo "🎯 Starting application: $*"
    exec "$@" &

    # プロセスID記録
    echo $! > /tmp/app.pid

    # ヘルスチェック実行
    if ! health_check; then
        echo "❌ Application startup failed"
        exit 1
    fi

    # シグナルハンドリング
    wait
}

# シグナルハンドラー
cleanup() {
    echo "🛑 Shutting down gracefully..."
    if [ -f /tmp/app.pid ]; then
        kill -TERM $(cat /tmp/app.pid) 2>/dev/null || true
        wait $(cat /tmp/app.pid) 2>/dev/null || true
    fi
    exit 0
}

trap cleanup SIGTERM SIGINT

# メイン実行
main "$@"

脆弱性管理と継続的セキュリティ

自動化セキュリティスキャン

CI/CDパイプラインでのセキュリティ統合:

# .github/workflows/security-scan.yml
name: Security Scan Pipeline

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
  schedule:
    - cron: '0 2 * * *'  # 毎日実行

jobs:
  #============================================================================
  # 依存関係脆弱性スキャン
  #============================================================================
  dependency-scan:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Setup Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'

    - name: Install dependencies
      run: |
        pip install safety pip-audit
        pip install -r requirements.txt

    - name: Run Safety check
      run: safety check --json --output safety-report.json

    - name: Run pip-audit
      run: pip-audit --format=json --output=pip-audit-report.json

    - name: Upload security reports
      uses: actions/upload-artifact@v3
      with:
        name: dependency-scan-reports
        path: |
          safety-report.json
          pip-audit-report.json

  #============================================================================
  # コンテナイメージ脆弱性スキャン
  #============================================================================
  container-scan:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Build container image
      run: docker build -t myapp:scan .

    - name: Run Trivy vulnerability scanner
      uses: aquasecurity/trivy-action@master
      with:
        image-ref: 'myapp:scan'
        format: 'sarif'
        output: 'trivy-results.sarif'
        severity: 'CRITICAL,HIGH,MEDIUM'

    - name: Run Grype vulnerability scanner
      run: |
        curl -sSfL https://raw.githubusercontent.com/anchore/grype/main/install.sh | sh -s -- -b /usr/local/bin
        grype myapp:scan -o json > grype-results.json

    - name: Upload Trivy scan results to GitHub Security tab
      uses: github/codeql-action/upload-sarif@v2
      with:
        sarif_file: 'trivy-results.sarif'

    - name: Upload vulnerability reports
      uses: actions/upload-artifact@v3
      with:
        name: container-scan-reports
        path: |
          trivy-results.sarif
          grype-results.json

  #============================================================================
  # コード品質・セキュリティスキャン
  #============================================================================
  code-scan:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Setup Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.11'

    - name: Install security tools
      run: |
        pip install bandit[toml] semgrep

    - name: Run Bandit security linter
      run: |
        bandit -r src/ -f json -o bandit-report.json

    - name: Run Semgrep security analysis
      run: |
        semgrep --config=auto --json --output=semgrep-report.json src/

    - name: Upload code scan reports
      uses: actions/upload-artifact@v3
      with:
        name: code-scan-reports
        path: |
          bandit-report.json
          semgrep-report.json

  #============================================================================
  # セキュリティポリシー検証
  #============================================================================
  policy-check:
    runs-on: ubuntu-latest
    steps:
    - uses: actions/checkout@v4

    - name: Build container image
      run: docker build -t myapp:policy .

    - name: Install Conftest
      run: |
        wget https://github.com/open-policy-agent/conftest/releases/download/v0.46.0/conftest_0.46.0_Linux_x86_64.tar.gz
        tar xzf conftest_0.46.0_Linux_x86_64.tar.gz
        sudo mv conftest /usr/local/bin

    - name: Run security policy tests
      run: |
        # Dockerfileポリシーチェック
        conftest test --policy security-policies/ Dockerfile

        # docker-compose.ymlポリシーチェック
        conftest test --policy security-policies/ docker-compose.yml

  #============================================================================
  # セキュリティレポート集約
  #============================================================================
  security-report:
    needs: [dependency-scan, container-scan, code-scan, policy-check]
    runs-on: ubuntu-latest
    if: always()

    steps:
    - name: Download all reports
      uses: actions/download-artifact@v3

    - name: Generate security summary
      run: |
        python3 scripts/generate-security-summary.py 
          --dependency-scan dependency-scan-reports/ 
          --container-scan container-scan-reports/ 
          --code-scan code-scan-reports/ 
          --output security-summary.html

    - name: Upload security summary
      uses: actions/upload-artifact@v3
      with:
        name: security-summary
        path: security-summary.html

セキュリティポリシー定義

OPA (Open Policy Agent) セキュリティポリシー:

# security-policies/docker-security.rego
package docker.security

# ルートユーザー実行禁止
deny[msg] {
    input.User == "root"
    msg := "Container must not run as root user"
}

deny[msg] {
    input.User == "0"
    msg := "Container must not run as UID 0"
}

# 特権モード禁止
deny[msg] {
    input.Privileged == true
    msg := "Privileged containers are not allowed"
}

# 読み取り専用ファイルシステム強制
deny[msg] {
    input.ReadonlyRootfs != true
    msg := "Container filesystem must be read-only"
}

# 不要な権限削除確認
required_dropped_caps := [
    "AUDIT_WRITE",
    "CHOWN", 
    "DAC_OVERRIDE",
    "FOWNER",
    "FSETID",
    "KILL",
    "MKNOD",
    "NET_BIND_SERVICE",
    "NET_RAW",
    "SETFCAP",
    "SETGID",
    "SETPCAP",
    "SETUID",
    "SYS_CHROOT"
]

deny[msg] {
    missing := required_dropped_caps[_]
    not missing in input.CapDrop
    msg := sprintf("Must drop capability: %v", [missing])
}

# リソース制限必須
deny[msg] {
    not input.Memory
    msg := "Memory limit must be specified"
}

deny[msg] {
    not input.CpuShares
    msg := "CPU limit must be specified"
}
# security-policies/compose-security.rego
package compose.security

# ネットワークセキュリティ
deny[msg] {
    service := input.services[_]
    service.network_mode == "host"
    msg := "Host network mode is not allowed"
}

# ボリュームセキュリティ
deny[msg] {
    service := input.services[_]
    volume := service.volumes[_]
    contains(volume, ":/:")
    not contains(volume, ":ro")
    msg := sprintf("Volume mount should be read-only: %v", [volume])
}

# シークレット管理
deny[msg] {
    service := input.services[_]
    env := service.environment[_]
    contains(env, "PASSWORD=")
    msg := "Passwords must not be set as environment variables"
}

deny[msg] {
    service := input.services[_]
    env := service.environment[_]
    contains(env, "SECRET=")
    msg := "Secrets must not be set as environment variables"
}

監視・ログ・アラート戦略

包括的監視システム

Prometheus + Grafana + AlertManager 統合:

# monitoring/docker-compose.monitoring.yml
services:
  #============================================================================
  # Prometheus - メトリクス収集
  #============================================================================
  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    restart: unless-stopped
    ports:
      - "9090:9090"
    volumes:
      - ./prometheus/prometheus.yml:/etc/prometheus/prometheus.yml
      - ./prometheus/rules:/etc/prometheus/rules
      - prometheus_data:/prometheus
    command:
      - '--config.file=/etc/prometheus/prometheus.yml'
      - '--storage.tsdb.path=/prometheus'
      - '--web.console.libraries=/etc/prometheus/console_libraries'
      - '--web.console.templates=/etc/prometheus/consoles'
      - '--storage.tsdb.retention.time=30d'
      - '--web.enable-lifecycle'
      - '--web.enable-admin-api'

  #============================================================================
  # AlertManager - アラート管理
  #============================================================================
  alertmanager:
    image: prom/alertmanager:latest
    container_name: alertmanager
    restart: unless-stopped
    ports:
      - "9093:9093"
    volumes:
      - ./alertmanager/alertmanager.yml:/etc/alertmanager/alertmanager.yml
      - alertmanager_data:/alertmanager
    command:
      - '--config.file=/etc/alertmanager/alertmanager.yml'
      - '--storage.path=/alertmanager'

  #============================================================================
  # Grafana - 可視化ダッシュボード
  #============================================================================
  grafana:
    image: grafana/grafana:latest
    container_name: grafana
    restart: unless-stopped
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD_FILE=/run/secrets/grafana_admin_password
      - GF_SECURITY_DISABLE_GRAVATAR=true
      - GF_USERS_ALLOW_SIGN_UP=false
      - GF_SECURITY_COOKIE_SECURE=true
      - GF_SECURITY_STRICT_TRANSPORT_SECURITY=true
    volumes:
      - ./grafana/dashboards:/var/lib/grafana/dashboards
      - ./grafana/provisioning:/etc/grafana/provisioning
      - grafana_data:/var/lib/grafana
    secrets:
      - grafana_admin_password

  #============================================================================
  # cAdvisor - コンテナメトリクス
  #============================================================================
  cadvisor:
    image: gcr.io/cadvisor/cadvisor:latest
    container_name: cadvisor
    restart: unless-stopped
    ports:
      - "8080:8080"
    volumes:
      - /:/rootfs:ro
      - /var/run:/var/run:ro
      - /sys:/sys:ro
      - /var/lib/docker/:/var/lib/docker:ro
      - /dev/disk/:/dev/disk:ro
    privileged: true
    devices:
      - /dev/kmsg

  #============================================================================
  # Node Exporter - システムメトリクス
  #============================================================================
  node_exporter:
    image: prom/node-exporter:latest
    container_name: node_exporter
    restart: unless-stopped
    ports:
      - "9100:9100"
    volumes:
      - /proc:/host/proc:ro
      - /sys:/host/sys:ro
      - /:/rootfs:ro
    command:
      - '--path.procfs=/host/proc'
      - '--path.rootfs=/rootfs'
      - '--path.sysfs=/host/sys'
      - '--collector.filesystem.mount-points-exclude=^/(sys|proc|dev|host|etc)($$|/)'

  #============================================================================
  # Loki - ログ集約
  #============================================================================
  loki:
    image: grafana/loki:latest
    container_name: loki
    restart: unless-stopped
    ports:
      - "3100:3100"
    volumes:
      - ./loki/loki.yml:/etc/loki/local-config.yaml
      - loki_data:/loki
    command: -config.file=/etc/loki/local-config.yaml

  #============================================================================
  # Promtail - ログ収集
  #============================================================================
  promtail:
    image: grafana/promtail:latest
    container_name: promtail
    restart: unless-stopped
    volumes:
      - ./promtail/promtail.yml:/etc/promtail/config.yml
      - /var/log:/var/log:ro
      - /var/lib/docker/containers:/var/lib/docker/containers:ro
    command: -config.file=/etc/promtail/config.yml

volumes:
  prometheus_data:
  alertmanager_data:
  grafana_data:
  loki_data:

secrets:
  grafana_admin_password:
    external: true

セキュリティ特化アラートルール:

# prometheus/rules/security-alerts.yml
groups:
- name: container_security
  rules:
  # 高CPU使用率(DDoS攻撃など)
  - alert: HighCPUUsage
    expr: rate(container_cpu_usage_seconds_total[5m]) * 100 > 80
    for: 2m
    labels:
      severity: warning
      category: security
    annotations:
      summary: "High CPU usage detected on {{ $labels.name }}"
      description: "Container {{ $labels.name }} CPU usage is above 80% for more than 2 minutes"

  # 異常なメモリ使用(メモリリーク、攻撃)
  - alert: HighMemoryUsage
    expr: (container_memory_usage_bytes / container_spec_memory_limit_bytes) * 100 > 90
    for: 5m
    labels:
      severity: critical
      category: security
    annotations:
      summary: "High memory usage detected on {{ $labels.name }}"
      description: "Container {{ $labels.name }} memory usage is above 90% for more than 5 minutes"

  # 異常なネットワークトラフィック
  - alert: HighNetworkTraffic
    expr: rate(container_network_receive_bytes_total[5m]) > 100000000  # 100MB/s
    for: 1m
    labels:
      severity: warning
      category: security
    annotations:
      summary: "High network traffic detected on {{ $labels.name }}"
      description: "Container {{ $labels.name }} receiving more than 100MB/s"

  # コンテナ再起動頻発(攻撃による異常終了)
  - alert: ContainerRestartLoop
    expr: rate(container_last_seen[5m]) > 0.1
    for: 2m
    labels:
      severity: critical
      category: security
    annotations:
      summary: "Container restart loop detected"
      description: "Container {{ $labels.name }} is restarting frequently"

  # ファイルシステム書き込み検出(読み取り専用FS違反)
  - alert: FilesystemWriteAttempt
    expr: increase(container_fs_writes_total[5m]) > 0
    for: 0m
    labels:
      severity: critical
      category: security
    annotations:
      summary: "Unauthorized filesystem write detected"
      description: "Write attempt detected on read-only filesystem in {{ $labels.name }}"

- name: application_security  
  rules:
  # 異常なHTTPエラー率(攻撃パターン)
  - alert: HighHTTPErrorRate
    expr: rate(http_requests_total{status=~"4..|5.."}[5m]) / rate(http_requests_total[5m]) > 0.1
    for: 2m
    labels:
      severity: warning
      category: security
    annotations:
      summary: "High HTTP error rate detected"
      description: "HTTP error rate is above 10% for more than 2 minutes"

  # 認証失敗の頻発
  - alert: HighAuthFailureRate
    expr: rate(auth_failures_total[5m]) > 5
    for: 1m
    labels:
      severity: critical
      category: security
    annotations:
      summary: "High authentication failure rate"
      description: "More than 5 authentication failures per second"

  # データベース接続エラー
  - alert: DatabaseConnectionFailure
    expr: up{job="database"} == 0
    for: 30s
    labels:
      severity: critical
      category: availability
    annotations:
      summary: "Database connection failure"
      description: "Cannot connect to database"

セキュリティ専用ログ分析

セキュリティイベント検出スクリプト:

#!/usr/bin/env python3
# scripts/security-log-analyzer.py
import json
import re
import sys
import argparse
from datetime import datetime, timedelta
from collections import defaultdict, Counter
import geoip2.database
import requests
class SecurityLogAnalyzer:
def __init__(self, geoip_db_path="/opt/GeoLite2-City.mmdb"):
self.geoip_reader = None
try:
self.geoip_reader = geoip2.database.Reader(geoip_db_path)
except:
print("Warning: GeoIP database not available")
# 脅威パターン定義
self.threat_patterns = {
'sql_injection': [
r"(?i)(union.*select|select.*from.*where)",
r"(?i)(drop.*table|delete.*from)",
r"(?i)('.*or.*'=')",
],
'xss_attempt': [
r"(?i)(<script|javascript:|onw+s*=)",
r"(?i)(alerts*(|confirms*()",
],
'path_traversal': [
r"../|..\",
r"(?i)(/etc/passwd|/etc/shadow)",
],
'brute_force': [
r"(?i)(failed.*login|authentication.*failed)",
r"(?i)(invalid.*password|wrong.*credentials)",
],
'reconnaissance': [
r"(?i)(robots.txt|.well-known)",
r"(?i)(wp-admin|phpmyadmin)",
r"(?i)(scan|probe|enum)",
]
}
def analyze_logs(self, log_file_path):
"""ログファイルを分析してセキュリティ脅威を検出"""
security_events = []
ip_stats = Counter()
user_agent_stats = Counter()
attack_types = Counter()
with open(log_file_path, 'r') as f:
for line_num, line in enumerate(f, 1):
try:
# JSON形式のログをパース
if line.strip().startswith('{'):
log_entry = json.loads(line)
else:
# 標準的なApache/Nginxログ形式をパース
log_entry = self.parse_access_log(line)
# セキュリティ脅威分析
threats = self.detect_threats(log_entry)
if threats:
event = {
'timestamp': log_entry.get('timestamp'),
'ip': log_entry.get('remote_addr'),
'user_agent': log_entry.get('user_agent'),
'request': log_entry.get('request'),
'status': log_entry.get('status'),
'threats': threats,
'line_number': line_num
}
# GeoIP情報追加
if self.geoip_reader and event['ip']:
geo_info = self.get_geo_info(event['ip'])
event['geo'] = geo_info
security_events.append(event)
# 統計更新
ip_stats[event['ip']] += 1
user_agent_stats[event['user_agent']] += 1
for threat in threats:
attack_types[threat] += 1
except Exception as e:
print(f"Error processing line {line_num}: {e}")
continue
return {
'events': security_events,
'statistics': {
'top_attacking_ips': ip_stats.most_common(10),
'top_user_agents': user_agent_stats.most_common(10),
'attack_types': attack_types.most_common(),
'total_events': len(security_events)
}
}
def detect_threats(self, log_entry):
"""ログエントリから脅威パターンを検出"""
threats = []
request = log_entry.get('request', '')
user_agent = log_entry.get('user_agent', '')
for threat_type, patterns in self.threat_patterns.items():
for pattern in patterns:
if re.search(pattern, request) or re.search(pattern, user_agent):
threats.append(threat_type)
break
return threats
def parse_access_log(self, line):
"""標準的なアクセスログ形式をパース"""
# Nginx/Apache combined log format
pattern = r'(S+) S+ S+ [(.*?)] "(.*?)" (d+) S+ "(.*?)" "(.*?)"'
match = re.match(pattern, line)
if match:
return {
'remote_addr': match.group(1),
'timestamp': match.group(2),
'request': match.group(3),
'status': int(match.group(4)),
'referer': match.group(5),
'user_agent': match.group(6)
}
return {}
def get_geo_info(self, ip):
"""IP アドレスの地理的情報を取得"""
try:
response = self.geoip_reader.city(ip)
return {
'country': response.country.name,
'city': response.city.name,
'latitude': float(response.location.latitude),
'longitude': float(response.location.longitude)
}
except:
return None
def check_threat_intelligence(self, ip):
"""脅威インテリジェンスDBで IP をチェック"""
# VirusTotal API を使用した例
api_key = "YOUR_VT_API_KEY"
url = f"https://www.virustotal.com/vtapi/v2/ip-address/report"
params = {'apikey': api_key, 'ip': ip}
try:
response = requests.get(url, params=params, timeout=5)
data = response.json()
if data.get('response_code') == 1:
return {
'malicious': data.get('positives', 0) > 0,
'detections': data.get('positives', 0),
'total_scans': data.get('total', 0)
}
except:
pass
return None
def generate_report(self, analysis_result, output_format='json'):
"""分析結果のレポート生成"""
report = {
'analysis_timestamp': datetime.now().isoformat(),
'summary': {
'total_security_events': analysis_result['statistics']['total_events'],
'unique_attacking_ips': len(analysis_result['statistics']['top_attacking_ips']),
'most_common_attack': analysis_result['statistics']['attack_types'][0] if analysis_result['statistics']['attack_types'] else None
},
'events': analysis_result['events'][:100],  # Top 100 events
'statistics': analysis_result['statistics']
}
if output_format == 'json':
return json.dumps(report, indent=2, default=str)
elif output_format == 'html':
return self.generate_html_report(report)
return report
def generate_html_report(self, report):
"""HTML形式のセキュリティレポート生成"""
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>Security Analysis Report</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 20px; }}
.header {{ background: #f44336; color: white; padding: 20px; }}
.summary {{ background: #ffeb3b; padding: 15px; margin: 10px 0; }}
.event {{ background: #f5f5f5; margin: 10px 0; padding: 10px; border-left: 4px solid #f44336; }}
.stats {{ display: flex; justify-content: space-between; }}
.stat-box {{ background: white; padding: 15px; margin: 10px; border: 1px solid #ddd; }}
</style>
</head>
<body>
<div class="header">
<h1>🔒 Security Analysis Report</h1>
<p>Generated: {report['analysis_timestamp']}</p>
</div>
<div class="summary">
<h2>📊 Summary</h2>
<div class="stats">
<div class="stat-box">
<h3>{report['summary']['total_security_events']}</h3>
<p>Security Events</p>
</div>
<div class="stat-box">
<h3>{report['summary']['unique_attacking_ips']}</h3>
<p>Unique Attacking IPs</p>
</div>
<div class="stat-box">
<h3>{report['summary'].get('most_common_attack', 'N/A')}</h3>
<p>Most Common Attack</p>
</div>
</div>
</div>
<div class="events">
<h2>🚨 Security Events</h2>
"""
for event in report['events'][:20]:  # Top 20 events
html += f"""
<div class="event">
<strong>IP:</strong> {event.get('ip', 'Unknown')} |
<strong>Time:</strong> {event.get('timestamp', 'Unknown')} |
<strong>Threats:</strong> {', '.join(event.get('threats', []))}
<br>
<strong>Request:</strong> <code>{event.get('request', '')}</code>
</div>
"""
html += """
</div>
</body>
</html>
"""
return html
def main():
parser = argparse.ArgumentParser(description='Security Log Analyzer')
parser.add_argument('log_file', help='Path to log file')
parser.add_argument('--output', '-o', default='report.json', help='Output file')
parser.add_argument('--format', '-f', choices=['json', 'html'], default='json', help='Output format')
parser.add_argument('--geoip-db', help='Path to GeoIP database')
args = parser.parse_args()
analyzer = SecurityLogAnalyzer(args.geoip_db) if args.geoip_db else SecurityLogAnalyzer()
print(f"🔍 Analyzing security logs: {args.log_file}")
analysis_result = analyzer.analyze_logs(args.log_file)
print(f"📊 Found {analysis_result['statistics']['total_events']} security events")
report = analyzer.generate_report(analysis_result, args.format)
with open(args.output, 'w') as f:
f.write(report)
print(f"📄 Report saved to: {args.output}")
if __name__ == "__main__":
main()

インシデント対応とフォレンジック

自動化インシデント対応

セキュリティインシデント検出・対応システム:

#!/usr/bin/env python3
# scripts/incident-response.py
import docker
import json
import subprocess
import time
from datetime import datetime
from typing import Dict, List, Any
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
class IncidentResponseSystem:
def __init__(self, config_file="incident-config.json"):
self.docker_client = docker.from_env()
self.config = self.load_config(config_file)
self.incident_log = []
def load_config(self, config_file):
"""設定ファイル読み込み"""
try:
with open(config_file, 'r') as f:
return json.load(f)
except FileNotFoundError:
return self.get_default_config()
def get_default_config(self):
"""デフォルト設定"""
return {
"thresholds": {
"cpu_usage": 90,
"memory_usage": 95,
"disk_usage": 85,
"network_connections": 1000
},
"response_actions": {
"isolate_container": True,
"capture_logs": True,
"create_snapshot": True,
"notify_admins": True
},
"notification": {
"email": {
"enabled": True,
"smtp_server": "smtp.company.com",
"smtp_port": 587,
"recipients": ["security@company.com"]
},
"slack": {
"enabled": False,
"webhook_url": ""
}
}
}
def monitor_containers(self):
"""コンテナ監視とインシデント検出"""
print("🔍 Starting container monitoring...")
while True:
try:
containers = self.docker_client.containers.list()
for container in containers:
# メトリクス取得
stats = container.stats(stream=False)
# インシデント検出
incidents = self.detect_incidents(container, stats)
for incident in incidents:
self.handle_incident(container, incident)
time.sleep(30)  # 30秒間隔で監視
except KeyboardInterrupt:
print("👋 Monitoring stopped")
break
except Exception as e:
print(f"❌ Monitoring error: {e}")
time.sleep(60)
def detect_incidents(self, container, stats) -> List[Dict]:
"""インシデント検出ロジック"""
incidents = []
# CPU使用率チェック
cpu_usage = self.calculate_cpu_usage(stats)
if cpu_usage > self.config["thresholds"]["cpu_usage"]:
incidents.append({
"type": "high_cpu_usage",
"severity": "critical",
"value": cpu_usage,
"threshold": self.config["thresholds"]["cpu_usage"],
"message": f"CPU usage {cpu_usage:.1f}% exceeds threshold"
})
# メモリ使用率チェック
memory_usage = self.calculate_memory_usage(stats)
if memory_usage > self.config["thresholds"]["memory_usage"]:
incidents.append({
"type": "high_memory_usage",
"severity": "critical",
"value": memory_usage,
"threshold": self.config["thresholds"]["memory_usage"],
"message": f"Memory usage {memory_usage:.1f}% exceeds threshold"
})
# ネットワーク接続数チェック
network_connections = self.count_network_connections(container)
if network_connections > self.config["thresholds"]["network_connections"]:
incidents.append({
"type": "high_network_connections",
"severity": "warning",
"value": network_connections,
"threshold": self.config["thresholds"]["network_connections"],
"message": f"Network connections {network_connections} exceeds threshold"
})
# 異常プロセス検出
suspicious_processes = self.detect_suspicious_processes(container)
if suspicious_processes:
incidents.append({
"type": "suspicious_processes",
"severity": "critical",
"value": suspicious_processes,
"message": f"Suspicious processes detected: {suspicious_processes}"
})
return incidents
def handle_incident(self, container, incident):
"""インシデント対応処理"""
incident_id = f"INC-{datetime.now().strftime('%Y%m%d-%H%M%S')}-{container.short_id}"
incident_record = {
"id": incident_id,
"timestamp": datetime.now().isoformat(),
"container_id": container.id,
"container_name": container.name,
"incident": incident,
"actions_taken": []
}
print(f"🚨 INCIDENT DETECTED: {incident_id}")
print(f"   Container: {container.name}")
print(f"   Type: {incident['type']}")
print(f"   Severity: {incident['severity']}")
print(f"   Message: {incident['message']}")
# 対応アクション実行
if self.config["response_actions"]["capture_logs"]:
self.capture_container_logs(container, incident_record)
if self.config["response_actions"]["create_snapshot"]:
self.create_container_snapshot(container, incident_record)
if incident['severity'] == 'critical':
if self.config["response_actions"]["isolate_container"]:
self.isolate_container(container, incident_record)
if self.config["response_actions"]["notify_admins"]:
self.send_notifications(incident_record)
# インシデントログに記録
self.incident_log.append(incident_record)
self.save_incident_log()
def capture_container_logs(self, container, incident_record):
"""コンテナログキャプチャ"""
try:
logs = container.logs(tail=1000).decode('utf-8')
log_file = f"/var/log/security/incidents/{incident_record['id']}_logs.txt"
with open(log_file, 'w') as f:
f.write(logs)
incident_record["actions_taken"].append(f"Logs captured: {log_file}")
print(f"   ✅ Logs captured: {log_file}")
except Exception as e:
print(f"   ❌ Failed to capture logs: {e}")
def create_container_snapshot(self, container, incident_record):
"""コンテナスナップショット作成"""
try:
snapshot_name = f"incident-{incident_record['id']}"
# コンテナをイメージとしてコミット
container.commit(repository=snapshot_name, tag="forensic")
incident_record["actions_taken"].append(f"Snapshot created: {snapshot_name}")
print(f"   ✅ Snapshot created: {snapshot_name}")
except Exception as e:
print(f"   ❌ Failed to create snapshot: {e}")
def isolate_container(self, container, incident_record):
"""コンテナ隔離"""
try:
# ネットワークから切断
networks = container.attrs['NetworkSettings']['Networks']
for network_name in networks:
network = self.docker_client.networks.get(network_name)
network.disconnect(container)
# 隔離用ネットワークに接続
isolation_network = self.get_or_create_isolation_network()
isolation_network.connect(container)
incident_record["actions_taken"].append("Container isolated from production networks")
print(f"   ✅ Container isolated")
except Exception as e:
print(f"   ❌ Failed to isolate container: {e}")
def get_or_create_isolation_network(self):
"""隔離用ネットワーク取得・作成"""
try:
return self.docker_client.networks.get("isolation")
except docker.errors.NotFound:
return self.docker_client.networks.create(
"isolation",
driver="bridge",
internal=True,  # 外部アクセス不可
labels={"purpose": "security_isolation"}
)
def send_notifications(self, incident_record):
"""通知送信"""
if self.config["notification"]["email"]["enabled"]:
self.send_email_notification(incident_record)
if self.config["notification"]["slack"]["enabled"]:
self.send_slack_notification(incident_record)
def send_email_notification(self, incident_record):
"""メール通知"""
try:
msg = MIMEMultipart()
msg['From'] = "security-system@company.com"
msg['To'] = ", ".join(self.config["notification"]["email"]["recipients"])
msg['Subject'] = f"🚨 Security Incident: {incident_record['id']}"
body = f"""
Security Incident Detected
Incident ID: {incident_record['id']}
Timestamp: {incident_record['timestamp']}
Container: {incident_record['container_name']} ({incident_record['container_id'][:12]})
Incident Details:
Type: {incident_record['incident']['type']}
Severity: {incident_record['incident']['severity']}
Message: {incident_record['incident']['message']}
Actions Taken:
{chr(10).join(f"- {action}" for action in incident_record['actions_taken'])}
Please investigate immediately.
"""
msg.attach(MIMEText(body, 'plain'))
server = smtplib.SMTP(
self.config["notification"]["email"]["smtp_server"],
self.config["notification"]["email"]["smtp_port"]
)
server.starttls()
server.send_message(msg)
server.quit()
print("   ✅ Email notification sent")
except Exception as e:
print(f"   ❌ Failed to send email notification: {e}")
def calculate_cpu_usage(self, stats):
"""CPU使用率計算"""
cpu_delta = stats['cpu_stats']['cpu_usage']['total_usage'] - stats['precpu_stats']['cpu_usage']['total_usage']
system_delta = stats['cpu_stats']['system_cpu_usage'] - stats['precpu_stats']['system_cpu_usage']
cpu_count = len(stats['cpu_stats']['cpu_usage']['percpu_usage'])
if system_delta > 0:
return (cpu_delta / system_delta) * cpu_count * 100
return 0
def calculate_memory_usage(self, stats):
"""メモリ使用率計算"""
memory_usage = stats['memory_stats']['usage']
memory_limit = stats['memory_stats']['limit']
return (memory_usage / memory_limit) * 100
def count_network_connections(self, container):
"""ネットワーク接続数カウント"""
try:
result = container.exec_run("netstat -an | wc -l")
return int(result.output.decode().strip())
except:
return 0
def detect_suspicious_processes(self, container):
"""疑わしいプロセス検出"""
suspicious = []
try:
result = container.exec_run("ps aux")
processes = result.output.decode().split('n')
# 疑わしいプロセス名
suspicious_names = [
'nc', 'netcat', 'ncat',  # ネットワークツール
'nmap', 'masscan',       # ポートスキャナー
'wget', 'curl',          # 外部通信(コンテキスト依存)
'python -c', 'perl -e', 'ruby -e',  # ワンライナー実行
'/tmp/', '/var/tmp/',    # 一時ディレクトリからの実行
]
for process in processes:
for sus_name in suspicious_names:
if sus_name in process.lower():
suspicious.append(process.strip())
except Exception as e:
print(f"Error detecting processes: {e}")
return suspicious
def save_incident_log(self):
"""インシデントログ保存"""
with open('/var/log/security/incident_log.json', 'w') as f:
json.dump(self.incident_log, f, indent=2, default=str)
def main():
print("🔒 Security Incident Response System Starting...")
response_system = IncidentResponseSystem()
response_system.monitor_containers()
if __name__ == "__main__":
main()

この記事では、エンタープライズ級のコンテナセキュリティと本番運用について詳しく解説しました。記事が長くなったため、この続きは記事4で残りの内容(コンプライアンス、バックアップ・災害復旧、運用自動化など)を扱うことにします。

現在のところ、以下の重要なセキュリティ要素をカバーしています:

  1. ゼロトラストアーキテクチャ – 基本的なセキュリティ原則
  2. セキュアなコンテナ設計 – Dockerfileとcompose設定
  3. 脆弱性管理 – 自動スキャンとポリシー検証
  4. 監視・ログ・アラート – 包括的な監視システム
  5. インシデント対応 – 自動化された脅威対応

次の記事4「チーム開発でのコンテナ環境標準化戦略」で、組織レベルでの運用効率化を扱う予定です。

コメントする