高性能データ分析入門:Pandas最適化と大規模データ処理の実践

約107分で読めます by ぽんたぬき
高性能データ分析入門:Pandas最適化と大規模データ処理の実践

高性能データ分析入門:Pandas最適化と大規模データ処理の実践

Pandasの基本操作をマスターした後、次のステップはパフォーマンス最適化大規模データ処理です。本記事では、実際のプロダクション環境で使える高度な技術を解説します。

なぜパフォーマンス最適化が重要か

現実的な課題

メモリ不足: 数GB〜数TBのデータ処理時のRAM制限 処理時間: バッチ処理やリアルタイム分析での応答時間要件 コスト効率: クラウド環境でのリソース使用量最適化 スケーラビリティ: データ量増加に対する処理能力の拡張性

パフォーマンスの指標

import pandas as pd
import numpy as np
import time
import psutil
import gc
from memory_profiler import profile
from functools import wraps

class PerformanceProfiler:
    """
    パフォーマンス計測ユーティリティクラス
    
    デコレータパターンを使用して、関数の実行時間とメモリ使用量を
    非侵入的に計測します。プロダクション環境での性能監視や
    最適化効果の定量評価に活用できます。
    
    主な機能:
    - 実行時間の精密測定(マイクロ秒レベル)
    - メモリ使用量の追跡(RSS基準)
    - DataFrameの詳細プロファイリング
    
    使用例:
    @profiler.measure_time
    @profiler.measure_memory
    def your_data_processing_function():
        # データ処理のコード
        pass
    """
    
    @staticmethod
    def measure_time(func):
        """
        実行時間測定デコレータ
        
        time.perf_counter()を使用した高精度な時間測定を行います。
        この関数はモノトニックカウンタを使用するため、システム時刻の
        調整やスリープ状態の影響を受けません。
        
        測定精度: ナノ秒レベル(環境依存)
        オーバーヘッド: 数マイクロ秒程度
        
        適用場面:
        - 処理時間の比較検証
        - ボトルネック特定
        - 最適化効果の定量評価
        - SLA監視
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 高精度タイマーで開始時刻を記録
            start_time = time.perf_counter()
            
            # 実際の関数を実行
            result = func(*args, **kwargs)
            
            # 終了時刻を記録し、実行時間を計算
            end_time = time.perf_counter()
            execution_time = end_time - start_time
            
            # 結果を見やすい形式で出力
            print(f"⏱ {func.__name__}: {execution_time:.4f}秒")
            return result
        return wrapper
    
    @staticmethod
    def measure_memory(func):
        """
        メモリ使用量測定デコレータ
        
        psutilライブラリを使用してプロセスのメモリ使用量を監視します。
        RSS(Resident Set Size)を基準とし、実際に物理メモリに
        ロードされているデータ量を測定します。
        
        測定対象: プロセス全体のメモリ使用量
        測定単位: MB(メガバイト)
        測定タイミング: 関数実行前後の差分
        
        注意点:
        - 他のスレッド・プロセスの影響は含まない
        - ガベージコレクションのタイミングに依存
        - メモリプールの影響で実際の削減量と異なる場合がある
        """
        @wraps(func)
        def wrapper(*args, **kwargs):
            # ガベージコレクションを実行して正確な測定を行う
            gc.collect()
            
            # 現在のプロセスのメモリ使用量を取得
            process = psutil.Process()
            memory_before = process.memory_info().rss / 1024 / 1024  # Bytes -> MB
            
            # 実際の関数を実行
            result = func(*args, **kwargs)
            
            # 実行後のメモリ使用量を取得
            memory_after = process.memory_info().rss / 1024 / 1024  # Bytes -> MB
            memory_diff = memory_after - memory_before
            
            # メモリ変化量と総使用量を表示
            print(f"🧠 {func.__name__}: {memory_diff:+.2f}MB ({memory_after:.2f}MB total)")
            return result
        return wrapper
    
    @staticmethod
    def profile_dataframe(df, name="DataFrame"):
        """
        DataFrameの詳細プロファイリング
        
        DataFrameのメモリ使用量、データ型分布、欠損値などの
        重要な統計情報を表示します。データ読み込み後の状況確認や
        最適化前後の比較に活用できます。
        
        提供情報:
        - 形状(行数×列数)
        - 真のメモリ使用量(deep=True で文字列も含む)
        - データ型の分布(最適化可能性の判断)
        - 欠損値の統計(データ品質の評価)
        
        活用方法:
        - データ読み込み後の初期確認
        - 型最適化前後の効果測定
        - メモリ使用量の継続監視
        - データ品質の定期チェック
        """
        print(f"\n📊 {name} プロファイル:")
        print(f"   形状: {df.shape}")
        
        # deep=True で文字列やオブジェクト型の実際のメモリ使用量も計算
        total_memory = df.memory_usage(deep=True).sum() / 1024 / 1024
        print(f"   メモリ使用量: {total_memory:.2f}MB")
        
        # データ型の分布を表示(最適化の余地を判断)
        print(f"   データ型分布:")
        dtype_counts = df.dtypes.value_counts()
        for dtype, count in dtype_counts.items():
            print(f"     {dtype}: {count} 列")
        
        # 欠損値の統計(データ品質の評価)
        null_counts = df.isnull().sum()
        if null_counts.sum() > 0:
            print(f"   欠損値: {null_counts.sum()} 件")
            # 欠損率が高い列を特定
            high_null_cols = null_counts[null_counts > len(df) * 0.1]
            if len(high_null_cols) > 0:
                print(f"   高欠損率列 (>10%): {list(high_null_cols.index)}")

# 使用例の準備
profiler = PerformanceProfiler()

メモリ効率最適化

この章では、メモリ使用量を劇的に削減する技術を学びます。以下のクラスとメソッドを実装・活用します:

主要コンポーネント:

  • PerformanceProfiler: パフォーマンス測定とプロファイリング
  • DataTypeOptimizer: データ型の自動最適化エンジン
    • optimize_numeric_columns(): 数値型の最適化(int64→int8等)
    • optimize_categorical_columns(): 文字列のカテゴリ型変換
    • compare_memory_usage(): 最適化効果の定量評価

期待する成果:

  • メモリ使用量 30-70% 削減
  • データ処理速度の向上
  • 大容量データの安全な取り扱い

データ型の最適化

class DataTypeOptimizer:
    """
    データ型自動最適化エンジン
    
    メモリ効率を大幅に改善するため、データの実際の値範囲に基づいて
    最適なデータ型を自動選択します。通常30-70%のメモリ削減が可能です。
    
    最適化戦略:
    1. 数値データの範囲ベース型選択
    2. 文字列データのカテゴリ型変換  
    3. 精度保持を前提とした安全な変換
    
    期待効果:
    - メモリ使用量の大幅削減
    - 一部操作の高速化(groupby等)
    - CPUキャッシュ効率の向上
    """
    
    @staticmethod
    def optimize_numeric_columns(df):
        """
        数値列の型最適化
        
        データの実際の値範囲を分析し、精度を保ちながら
        最小限のメモリで表現可能なデータ型を選択します。
        
        最適化ルール:
        
        【整数型の最適化 (int64 → より小さな型)】
        - uint8:  0 ≤ value ≤ 255 (1バイト)
        - uint16: 0 ≤ value ≤ 65,535 (2バイト)  
        - uint32: 0 ≤ value ≤ 4,294,967,295 (4バイト)
        - int8:   -128 ≤ value ≤ 127 (1バイト)
        - int16:  -32,768 ≤ value ≤ 32,767 (2バイト)
        - int32:  -2,147,483,648 ≤ value ≤ 2,147,483,647 (4バイト)
        
        【浮動小数点型の最適化 (float64 → float32)】
        - 範囲チェック: np.finfo(np.float32)の範囲内
        - 精度チェック: np.allclose()による許容誤差内確認
        - メモリ半減効果(8バイト → 4バイト)
        
        適用条件:
        - データの範囲が小さい型で表現可能
        - 精度損失が許容範囲内(相対誤差1e-5以下)
        - 業務要件との整合性
        """
        optimized_df = df.copy()
        
        # 数値列のみを対象として処理
        for col in df.select_dtypes(include=['int64', 'float64']).columns:
            col_data = df[col]
            
            # 整数列の最適化処理
            if col_data.dtype == 'int64':
                col_min, col_max = col_data.min(), col_data.max()
                
                if col_min >= 0:  # 非負整数の場合、unsigned型を検討
                    if col_max <= 255:
                        optimized_df[col] = col_data.astype('uint8')  # 1バイト
                        print(f"✅ {col}: int64 → uint8 (範囲: 0-{col_max})")
                    elif col_max <= 65535:
                        optimized_df[col] = col_data.astype('uint16')  # 2バイト
                        print(f"✅ {col}: int64 → uint16 (範囲: 0-{col_max})")
                    elif col_max <= 4294967295:
                        optimized_df[col] = col_data.astype('uint32')  # 4バイト
                        print(f"✅ {col}: int64 → uint32 (範囲: 0-{col_max})")
                
                else:  # 負の値を含む場合、signed型を使用
                    if col_min >= -128 and col_max <= 127:
                        optimized_df[col] = col_data.astype('int8')  # 1バイト
                        print(f"✅ {col}: int64 → int8 (範囲: {col_min}-{col_max})")
                    elif col_min >= -32768 and col_max <= 32767:
                        optimized_df[col] = col_data.astype('int16')  # 2バイト
                        print(f"✅ {col}: int64 → int16 (範囲: {col_min}-{col_max})")
                    elif col_min >= -2147483648 and col_max <= 2147483647:
                        optimized_df[col] = col_data.astype('int32')  # 4バイト
                        print(f"✅ {col}: int64 → int32 (範囲: {col_min}-{col_max})")
            
            # 浮動小数点列の最適化処理
            elif col_data.dtype == 'float64':
                # float32の表現可能範囲内かチェック
                float32_info = np.finfo(np.float32)
                if (col_data.min() >= float32_info.min and 
                    col_data.max() <= float32_info.max):
                    
                    # 精度チェック: float32変換後の誤差を確認
                    float32_data = col_data.astype('float32')
                    
                    # 相対誤差1e-5以下なら精度が保たれていると判断
                    if np.allclose(col_data, float32_data, rtol=1e-5, equal_nan=True):
                        optimized_df[col] = float32_data
                        print(f"✅ {col}: float64 → float32 (精度保持)")
                    else:
                        print(f"⚠ {col}: float32変換で精度損失あり、変換をスキップ")
        
        return optimized_df
    
    @staticmethod
    def optimize_categorical_columns(df, threshold=0.5):
        """
        文字列列のカテゴリ型最適化
        
        繰り返しの多い文字列データをカテゴリ型に変換することで
        メモリ使用量を大幅に削減します。
        
        変換判定基準:
        - ユニーク率 ≤ threshold (デフォルト50%)
        - 文字列型 (object) のカラム
        - 重複パターンが多いデータ
        
        メモリ削減メカニズム:
        - 文字列の重複排除(カテゴリ辞書で管理)
        - 整数コードによる内部表現
        - 文字列コピーの削減
        
        追加利点:
        - groupby操作の高速化
        - メモリアクセスの局所性向上
        - 一部分析処理の最適化
        
        例: 100万行で10種類の文字列 → 90%以上のメモリ削減
        """
        optimized_df = df.copy()
        
        # オブジェクト型(主に文字列)の列を処理
        for col in df.select_dtypes(include=['object']).columns:
            unique_count = df[col].nunique()
            total_count = len(df)
            unique_ratio = unique_count / total_count
            
            # ユニーク率が閾値以下の場合はカテゴリ型に変換
            # 閾値が低いほど重複が多く、カテゴリ型のメリットが大きい
            if unique_ratio <= threshold:
                original_memory = df[col].memory_usage(deep=True)
                
                # カテゴリ型に変換
                optimized_df[col] = df[col].astype('category')
                
                # 変換後のメモリ使用量を計算
                new_memory = optimized_df[col].memory_usage(deep=True)
                reduction = (original_memory - new_memory) / original_memory * 100
                
                print(f"✅ {col}: カテゴリ型に変換")
                print(f"   ユニーク率: {unique_ratio:.2%} ({unique_count}/{total_count})")
                print(f"   メモリ削減: {reduction:.1f}% ({original_memory/1024/1024:.1f}MB → {new_memory/1024/1024:.1f}MB)")
            else:
                print(f"⚠ {col}: ユニーク率が高いため変換をスキップ ({unique_ratio:.2%})")
        
        return optimized_df
    
    @staticmethod
    def compare_memory_usage(df_original, df_optimized):
        """
        最適化効果の定量評価
        
        最適化前後のメモリ使用量を詳細に比較し、
        削減効果を定量的に評価します。
        
        計算内容:
        - 最適化前後のメモリ使用量(MB単位)
        - 削減率の百分率表示
        - 絶対削減量(MB単位)
        - 列別の削減効果(詳細分析)
        
        評価指標:
        - memory_usage(deep=True): 真のメモリ使用量
        - 削減率 = (削減量 / 元サイズ) × 100
        - ROI計算の基礎データ
        
        活用方法:
        - 最適化効果の可視化
        - 処理時間とのトレードオフ評価
        - 継続的なパフォーマンス監視
        """
        # 詳細なメモリ使用量を計算(文字列も含む)
        memory_before = df_original.memory_usage(deep=True).sum() / 1024 / 1024
        memory_after = df_optimized.memory_usage(deep=True).sum() / 1024 / 1024
        
        # 削減量と削減率を計算
        memory_reduction = memory_before - memory_after
        reduction_percentage = (memory_reduction / memory_before) * 100
        
        print(f"\n📈 メモリ使用量最適化結果:")
        print(f"   最適化前: {memory_before:.2f}MB")
        print(f"   最適化後: {memory_after:.2f}MB")
        print(f"   削減量: {memory_reduction:.2f}MB")
        print(f"   削減率: {reduction_percentage:.1f}%")
        
        # 列別の詳細分析
        print(f"\n📊 列別メモリ使用量の変化:")
        original_memory = df_original.memory_usage(deep=True)
        optimized_memory = df_optimized.memory_usage(deep=True)
        
        for col in df_original.columns:
            if col in df_optimized.columns:
                before = original_memory[col] / 1024 / 1024
                after = optimized_memory[col] / 1024 / 1024
                if before != after:
                    col_reduction = (before - after) / before * 100
                    print(f"   {col}: {before:.2f}MB → {after:.2f}MB ({col_reduction:.1f}% 削減)")
        
        return memory_after, reduction_percentage

# データ型最適化の実践例
@profiler.measure_time
@profiler.measure_memory
def demonstrate_dtype_optimization():
    """データ型最適化のデモンストレーション"""
    
    # サンプルデータ生成
    np.random.seed(42)
    n_rows = 1_000_000
    
    data = {
        'user_id': np.random.randint(1, 100000, n_rows),  # int64 → uint32可能
        'age': np.random.randint(18, 80, n_rows),         # int64 → uint8可能
        'score': np.random.random(n_rows) * 100,         # float64 → float32可能
        'category': np.random.choice(['A', 'B', 'C', 'D'], n_rows),  # object → category可能
        'product': np.random.choice([f'Product_{i}' for i in range(1000)], n_rows)
    }
    
    df_original = pd.DataFrame(data)
    print("📊 元データの情報:")
    profiler.profile_dataframe(df_original, "Original")
    
    # 最適化実行
    optimizer = DataTypeOptimizer()
    df_optimized = optimizer.optimize_numeric_columns(df_original)
    df_optimized = optimizer.optimize_categorical_columns(df_optimized)
    
    print("\n📊 最適化後データの情報:")
    profiler.profile_dataframe(df_optimized, "Optimized")
    
    # 比較結果
    optimizer.compare_memory_usage(df_original, df_optimized)
    
    return df_optimized

# 実行
optimized_data = demonstrate_dtype_optimization()

チャンクベース処理

大容量ファイルをメモリ制約下で安全に処理する技術を実装します:

実装するクラス・メソッド:

  • ChunkProcessor: チャンクベース処理エンジン
    • process_large_file(): 分割ファイル処理とメモリ監視
    • aggregate_chunks(): Map-Reduce式の分散集計
  • 進行状況の可視化とエラー回復機能

解決する課題:

  • RAM < データサイズ の状況での処理
  • 長時間バッチ処理の安定性
  • メモリリークの防止
class ChunkProcessor:
    """
    チャンクベース大容量データ処理エンジン
    
    メモリ制約下でのスケーラブルなデータ処理を実現します。
    RAMサイズを超える大容量ファイルも安全に処理可能です。
    
    設計思想:
    - メモリ制約の克服(データサイズ > RAM でも処理可能)
    - 進行状況の可視化(長時間処理の監視)
    - エラー時の部分結果保持(障害耐性)
    - スケーラブルな処理パターン(分散処理への拡張性)
    
    適用場面:
    - RAM < データサイズ の状況
    - ETL処理パイプライン
    - バッチ処理システム
    - ストリーミング風処理
    
    パフォーマンス特性:
    - メモリ使用量: O(chunk_size) で一定
    - 処理時間: O(total_size / chunk_size) の線形性
    - 障害耐性: チャンク単位での回復可能
    """
    
    def __init__(self, chunk_size=10000):
        """
        チャンクプロセッサの初期化
        
        chunk_size の選択指針:
        - 小さすぎ(<1000): I/Oオーバーヘッド増大、処理効率低下
        - 大きすぎ(>100000): メモリ不足リスク、進行状況の粒度低下
        - 推奨範囲: 10,000-100,000行
        
        調整要素:
        - 列数(多いほど小さく)
        - データ型(文字列が多いほど小さく)
        - 利用可能メモリ(少ないほど小さく)
        - 処理関数の複雑さ(重いほど小さく)
        
        メモリ計算例:
        chunk_size=50000, 20列, float64 → 約8MB/チャンク
        """
        self.chunk_size = chunk_size
        self.results = []  # 部分結果を保存(デバッグ用)
    
    @profiler.measure_time
    def process_large_file(self, file_path, processing_func):
        """
        大容量CSVファイルをチャンク単位で処理
        
        処理フロー:
        1. ファイルをchunk_size行ずつ読み込み
        2. 各チャンクに処理関数を適用
        3. メモリ使用量を定期監視
        4. 結果を順次蓄積
        5. 最終的な結合処理
        
        メモリ管理戦略:
        - チャンク完了後の即座解放(del, gc.collect)
        - 10チャンクごとのメモリ監視
        - 必要に応じたガベージコレクション
        - メモリリークの早期検出
        
        エラー処理:
        - チャンク単位での例外捕捉
        - 部分結果の保持(処理済み分は保存)
        - 詳細なエラー情報提供
        - リトライ可能な設計
        
        進行状況監視:
        - チャンク番号の表示
        - 処理行数の表示
        - メモリ使用量の監視
        - 推定完了時間(将来拡張)
        """
        chunk_results = []
        total_rows_processed = 0
        
        try:
            # pd.read_csv の chunksize パラメータで分割読み込み
            for i, chunk in enumerate(pd.read_csv(file_path, chunksize=self.chunk_size)):
                print(f"🔄 チャンク {i+1} 処理中... ({len(chunk)} 行)")
                
                # 各チャンクに処理関数を適用
                # processing_func は DataFrame を受け取り DataFrame を返すと仮定
                processed_chunk = processing_func(chunk)
                chunk_results.append(processed_chunk)
                
                total_rows_processed += len(chunk)
                
                # メモリ使用量の定期監視(10チャンクごと)
                if i % 10 == 0:
                    current_memory = psutil.Process().memory_info().rss / 1024 / 1024
                    print(f"   メモリ使用量: {current_memory:.2f}MB")
                    print(f"   累計処理行数: {total_rows_processed:,} 行")
                    
                    # メモリ使用量が異常に増加している場合の警告
                    # 実際の実装では閾値を設定して自動対処
                    if i > 0 and current_memory > 1000:  # 1GB を超えた場合
                        print(f"⚠ メモリ使用量が高くなっています: {current_memory:.2f}MB")
            
            # 全チャンクの結果を結合
            # ignore_index=True で連続したインデックスを生成
            final_result = pd.concat(chunk_results, ignore_index=True)
            
            print(f"✅ 処理完了: {len(chunk_results)} チャンク, 最終データ {len(final_result):,} 行")
            print(f"   総処理行数: {total_rows_processed:,} 行")
            
            return final_result
            
        except Exception as e:
            print(f"❌ エラー発生: {str(e)}")
            print(f"   処理済みチャンク数: {len(chunk_results)}")
            print(f"   部分結果の行数: {sum(len(cr) for cr in chunk_results) if chunk_results else 0}")
            
            # 部分結果がある場合は保存して返却
            if chunk_results:
                partial_result = pd.concat(chunk_results, ignore_index=True)
                print(f"   部分結果を返却: {len(partial_result)} 行")
                return partial_result
            
            return None
    
    @staticmethod
    def aggregate_chunks(file_path, agg_functions, group_by_cols, chunk_size=50000):
        """
        チャンク単位での集計処理
        
        Map-Reduce パラダイムを適用した分散集計アルゴリズム:
        
        【Map段階】各チャンクで部分集計を実行
        1. ファイルをチャンク単位で読み込み
        2. チャンク内でgroupbyを実行
        3. 集計関数を適用して部分結果を生成
        
        【Reduce段階】部分結果を統合して最終集計
        1. 全ての部分結果を結合
        2. 同一グループの結果をマージ
        3. 集計関数の特性に応じた再集計
        
        数学的基盤:
        - 結合法則: (a + b) + c = a + (b + c)
        - 分散計算理論における分割統治法
        - 集計関数の合成可能性
        
        対応集計関数:
        - sum: 線形加算 → sum(部分sum)
        - count: カウント加算 → sum(部分count)
        - mean: 加重平均 → sum(部分sum)/sum(部分count)
        - min/max: 比較演算 → min/max(部分結果)
        
        制約事項:
        - 非結合的集計(中央値、分位数等)は非対応
        - メモリ内での中間結果保持が必要
        - グループ数が多い場合のメモリ制約
        
        例: 1億行のデータを1万行ずつ処理 → 1万回の部分集計
        """
        partial_results = []
        chunk_count = 0
        
        print(f"📊 チャンク単位集計処理開始 (chunk_size: {chunk_size:,})")
        
        # 各チャンクで部分集計を実行(Map段階)
        for chunk in pd.read_csv(file_path, chunksize=chunk_size):
            chunk_count += 1
            
            # チャンク内でグループ化と集計を実行
            chunk_agg = chunk.groupby(group_by_cols).agg(agg_functions)
            partial_results.append(chunk_agg)
            
            # 進行状況の表示
            if chunk_count % 100 == 0:
                print(f"   処理済みチャンク: {chunk_count}")
        
        print(f"📈 Map段階完了: {chunk_count} チャンク処理")
        
        # 部分集計結果を統合(Reduce段階)
        print(f"🔄 Reduce段階: 部分結果の統合中...")
        combined = pd.concat(partial_results)
        
        # 同一グループの結果を再集計
        # 集計関数の特性に応じた適切な統合処理
        final_agg_funcs = {}
        for col, funcs in agg_functions.items():
            if isinstance(funcs, list):
                # 複数集計関数の場合
                col_funcs = {}
                for func in funcs:
                    if func in ['sum', 'count']:
                        col_funcs[func] = 'sum'  # 部分結果の合計
                    elif func == 'mean':
                        # 平均は sum/count で再計算が必要(簡略化)
                        col_funcs[func] = 'mean'
                    elif func in ['min', 'max']:
                        col_funcs[func] = func  # min/max を再適用
                final_agg_funcs[col] = col_funcs
            else:
                # 単一集計関数の場合
                if funcs in ['sum', 'count']:
                    final_agg_funcs[col] = 'sum'
                elif funcs == 'mean':
                    final_agg_funcs[col] = 'mean'
                elif funcs in ['min', 'max']:
                    final_agg_funcs[col] = funcs
        
        # 最終集計の実行
        final_agg = combined.groupby(level=0).agg(final_agg_funcs)
        
        print(f"✅ 集計完了: {len(final_agg)} グループ")
        
        return final_agg

# サンプルデータの作成と処理例
def create_sample_large_file():
    """大容量サンプルファイルの作成"""
    n_rows = 500_000
    np.random.seed(42)
    
    data = {
        'timestamp': pd.date_range('2023-01-01', periods=n_rows, freq='1min'),
        'user_id': np.random.randint(1, 10000, n_rows),
        'product_id': np.random.randint(1, 1000, n_rows),
        'price': np.random.uniform(100, 5000, n_rows),
        'quantity': np.random.randint(1, 10, n_rows),
        'region': np.random.choice(['Asia', 'Europe', 'Americas'], n_rows)
    }
    
    df = pd.DataFrame(data)
    df['revenue'] = df['price'] * df['quantity']
    
    # CSVファイルとして保存
    sample_file = 'large_sales_data.csv'
    df.to_csv(sample_file, index=False)
    print(f"📁 サンプルファイル作成: {sample_file} ({len(df)} 行)")
    
    return sample_file

def revenue_analysis_function(chunk):
    """チャンクごとの売上分析処理"""
    # 売上計算
    chunk['revenue'] = chunk['price'] * chunk['quantity']
    
    # 地域別集計
    region_summary = chunk.groupby('region').agg({
        'revenue': 'sum',
        'quantity': 'sum',
        'user_id': 'nunique'
    }).reset_index()
    
    return region_summary

# チャンク処理の実行例
sample_file = create_sample_large_file()
processor = ChunkProcessor(chunk_size=25000)

print("\n🔄 チャンク処理による大容量データ分析:")
result = processor.process_large_file(sample_file, revenue_analysis_function)

if result is not None:
    print("\n📊 地域別集計結果:")
    final_summary = result.groupby('region').agg({
        'revenue': 'sum',
        'quantity': 'sum', 
        'user_id': 'sum'
    }).round(2)
    print(final_summary)

高速化テクニック

この章では、計算処理を10-100倍高速化する技術を習得します:

実装する最適化技術:

  • VectorizationOptimizer: ベクトル化最適化クラス
    • compare_vectorization_vs_loop(): ループ vs ベクトル化の性能比較
    • optimize_string_operations(): 文字列処理の高速化
    • numpy_integration_examples(): NumPy統合による高速化

技術要素:

  • SIMD命令の活用
  • ループオーバーヘッドの削減
  • メモリアクセスパターンの最適化

ベクトル化とNumPy活用

class VectorizationOptimizer:
    """ベクトル化による高速化テクニック"""
    
    @staticmethod
    @profiler.measure_time
    def compare_vectorization_vs_loop(df, iterations=3):
        """ベクトル化とループの性能比較"""
        
        print("🚀 ベクトル化 vs ループ処理の比較:")
        
        # サンプルデータ準備
        df_test = df.copy()
        
        # 方法1: ループ処理(避けるべき)
        def loop_method(data):
            result = []
            for _, row in data.iterrows():
                if row['price'] > 1000:
                    result.append(row['price'] * 1.1)  # 10%値上げ
                else:
                    result.append(row['price'] * 1.05)  # 5%値上げ
            return result
        
        # 方法2: ベクトル化(推奨)
        def vectorized_method(data):
            return np.where(data['price'] > 1000, 
                          data['price'] * 1.1, 
                          data['price'] * 1.05)
        
        # 方法3: eval()を使った高速化
        def eval_method(data):
            return data.eval('price * (1.1 if price > 1000 else 1.05)')
        
        # 実行時間計測
        print("\n⏱ 実行時間比較(小さなデータセット):")
        small_df = df_test.head(1000)
        
        start = time.perf_counter()
        for _ in range(iterations):
            result1 = loop_method(small_df)
        loop_time = (time.perf_counter() - start) / iterations
        
        start = time.perf_counter()
        for _ in range(iterations):
            result2 = vectorized_method(small_df)
        vectorized_time = (time.perf_counter() - start) / iterations
        
        print(f"   ループ処理: {loop_time:.4f}秒")
        print(f"   ベクトル化: {vectorized_time:.4f}秒")
        print(f"   高速化率: {loop_time/vectorized_time:.1f}倍")
        
        return result2
    
    @staticmethod
    def optimize_string_operations(df):
        """文字列操作の最適化"""
        print("\n📝 文字列操作の最適化:")
        
        # サンプル文字列データ
        df_str = df.copy()
        df_str['product_name'] = [f'Product_{i%1000:03d}' for i in range(len(df))]
        
        # 方法1: 通常のstring操作
        @profiler.measure_time
        def regular_string_ops():
            result = df_str['product_name'].str.contains('Product_0')
            return result
        
        # 方法2: 正規表現の事前コンパイル
        @profiler.measure_time
        def optimized_string_ops():
            import re
            pattern = re.compile(r'Product_0')
            result = df_str['product_name'].str.contains(pattern)
            return result
        
        # 方法3: カテゴリ型での処理
        @profiler.measure_time
        def categorical_string_ops():
            df_cat = df_str.copy()
            df_cat['product_name'] = df_cat['product_name'].astype('category')
            result = df_cat['product_name'].str.contains('Product_0')
            return result
        
        print("   通常の文字列操作:")
        result1 = regular_string_ops()
        
        print("   正規表現コンパイル済み:")
        result2 = optimized_string_ops()
        
        print("   カテゴリ型での処理:")
        result3 = categorical_string_ops()
        
        return result1
    
    @staticmethod
    def numpy_integration_examples(df):
        """NumPy統合による高速化例"""
        print("\n🔢 NumPy統合による高速化:")
        
        # NumPy配列として取得
        prices = df['price'].values
        quantities = df['quantity'].values
        
        # 統計計算の高速化
        @profiler.measure_time
        def pandas_stats():
            mean_price = df['price'].mean()
            std_price = df['price'].std()
            percentiles = df['price'].quantile([0.25, 0.5, 0.75])
            return mean_price, std_price, percentiles
        
        @profiler.measure_time
        def numpy_stats():
            mean_price = np.mean(prices)
            std_price = np.std(prices)
            percentiles = np.percentile(prices, [25, 50, 75])
            return mean_price, std_price, percentiles
        
        print("   Pandas統計計算:")
        pandas_result = pandas_stats()
        
        print("   NumPy統計計算:")
        numpy_result = numpy_stats()
        
        # 複雑な計算の例
        @profiler.measure_time
        def complex_calculation():
            """複雑な売上計算の例"""
            # 価格帯別の重み付き計算
            weights = np.where(prices > 1000, 1.2, 
                      np.where(prices > 500, 1.1, 1.0))
            
            weighted_revenue = prices * quantities * weights
            
            # 統計量の一括計算
            stats = {
                'total_revenue': np.sum(weighted_revenue),
                'avg_revenue': np.mean(weighted_revenue),
                'revenue_std': np.std(weighted_revenue),
                'high_value_count': np.sum(weighted_revenue > 5000)
            }
            
            return stats
        
        print("   複雑な計算(NumPy活用):")
        complex_result = complex_calculation()
        
        for key, value in complex_result.items():
            print(f"     {key}: {value:.2f}")
        
        return complex_result

# 最適化技術の実行例
if 'optimized_data' in locals():
    optimizer = VectorizationOptimizer()
    
    # ベクトル化の比較
    optimizer.compare_vectorization_vs_loop(optimized_data)
    
    # 文字列操作の最適化
    optimizer.optimize_string_operations(optimized_data)
    
    # NumPy統合の例
    optimizer.numpy_integration_examples(optimized_data)

並列処理による高速化

CPU リソースを最大限活用した並列処理技術を実装します:

実装する並列処理システム:

  • ParallelProcessor: マルチプロセシング処理エンジン
    • parallel_apply(): DataFrame の並列 apply 処理
    • parallel_groupby(): グループ操作の並列化
    • CPU コア数の自動検出と最適化

活用場面:

  • CPU集約的な重い計算処理
  • 大量データの変換・集計
  • バッチ処理の高速化

マルチプロセシングの活用

import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor, as_completed
import dask.dataframe as dd
from functools import partial

class ParallelProcessor:
    """並列処理による高速化"""
    
    def __init__(self, n_cores=None):
        self.n_cores = n_cores or mp.cpu_count()
        print(f"💻 利用可能CPU数: {mp.cpu_count()}, 使用CPU数: {self.n_cores}")
    
    @profiler.measure_time
    def parallel_apply(self, df, func, column=None):
        """DataFrameの並列apply処理"""
        
        # データを分割
        chunk_size = len(df) // self.n_cores
        chunks = [df.iloc[i:i + chunk_size] for i in range(0, len(df), chunk_size)]
        
        # 並列処理実行
        with ProcessPoolExecutor(max_workers=self.n_cores) as executor:
            futures = [executor.submit(func, chunk) for chunk in chunks]
            results = [future.result() for future in as_completed(futures)]
        
        # 結果をマージ
        final_result = pd.concat(results, ignore_index=True)
        return final_result
    
    @profiler.measure_time  
    def parallel_groupby(self, df, group_cols, agg_funcs):
        """グループ操作の並列化"""
        
        def process_group_chunk(chunk):
            return chunk.groupby(group_cols).agg(agg_funcs)
        
        # データ分割
        chunks = np.array_split(df, self.n_cores)
        
        # 各チャンクで部分集計
        with ProcessPoolExecutor(max_workers=self.n_cores) as executor:
            partial_results = list(executor.map(process_group_chunk, chunks))
        
        # 部分結果を統合
        combined = pd.concat(partial_results)
        final_result = combined.groupby(level=0).agg(agg_funcs)
        
        return final_result
    
    @staticmethod
    def demonstrate_multiprocessing():
        """マルチプロセシング処理の実例"""
        
        def heavy_computation(chunk):
            """重い計算処理のシミュレーション"""
            # 複雑な統計計算
            chunk = chunk.copy()
            chunk['price_rank'] = chunk['price'].rank(method='dense')
            chunk['rolling_mean'] = chunk['price'].rolling(window=min(100, len(chunk))).mean()
            chunk['price_zscore'] = (chunk['price'] - chunk['price'].mean()) / chunk['price'].std()
            
            # 条件別カテゴリ生成
            chunk['price_category'] = pd.cut(chunk['price'], 
                                           bins=[0, 500, 1000, 2000, float('inf')],
                                           labels=['Low', 'Medium', 'High', 'Premium'])
            
            return chunk
        
        # サンプルデータ生成
        np.random.seed(42)
        large_df = pd.DataFrame({
            'price': np.random.uniform(100, 3000, 100000),
            'quantity': np.random.randint(1, 20, 100000),
            'category': np.random.choice(['A', 'B', 'C'], 100000)
        })
        
        processor = ParallelProcessor(n_cores=4)
        
        print("\n🔄 並列処理デモンストレーション:")
        
        # シングルプロセス処理
        print("   シングルプロセス処理:")
        start_time = time.perf_counter()
        single_result = heavy_computation(large_df)
        single_time = time.perf_counter() - start_time
        print(f"   実行時間: {single_time:.4f}秒")
        
        # 並列処理
        print("   並列処理:")
        parallel_result = processor.parallel_apply(large_df, heavy_computation)
        
        print(f"   高速化率: {single_time / (time.perf_counter() - start_time):.2f}倍")
        
        return parallel_result

# 並列処理の実行
parallel_demo_result = ParallelProcessor.demonstrate_multiprocessing()

Daskによるスケーラブル処理

分散処理による大規模データ分析システムを構築します:

Dask 実装コンポーネント:

  • DaskProcessor: 分散データ処理エンジン
    • setup_cluster(): Dask クラスタの構築と管理
    • pandas_vs_dask_comparison(): 性能比較とベンチマーク
    • large_scale_processing_example(): 数百万行データの処理例

技術特徴:

  • 遅延評価によるクエリ最適化
  • タスクグラフベースの実行計画
  • メモリを超える大容量データ対応

Dask DataFrame入門

try:
    import dask
    import dask.dataframe as dd
    from dask.distributed import Client
    import dask.array as da
    
    class DaskProcessor:
        """Daskによる大規模データ処理"""
        
        def __init__(self):
            self.client = None
        
        def setup_cluster(self, n_workers=4):
            """Daskクラスタのセットアップ"""
            self.client = Client(processes=True, n_workers=n_workers, threads_per_worker=2)
            print(f"🌐 Daskクラスタ開始: {self.client}")
            return self.client
        
        @profiler.measure_time
        def pandas_vs_dask_comparison(self, file_path):
            """PandasとDaskの性能比較"""
            
            print("\n📊 Pandas vs Dask 性能比較:")
            
            # Pandas処理
            print("   Pandas処理:")
            start_time = time.perf_counter()
            
            pandas_df = pd.read_csv(file_path)
            pandas_result = (pandas_df
                           .groupby('region')
                           .agg({'revenue': ['sum', 'mean', 'count']})
                           .compute() if hasattr(pandas_df.groupby('region').agg({'revenue': ['sum', 'mean', 'count']}), 'compute') 
                           else pandas_df.groupby('region').agg({'revenue': ['sum', 'mean', 'count']})
                           )
            
            pandas_time = time.perf_counter() - start_time
            print(f"   Pandas実行時間: {pandas_time:.4f}秒")
            
            # Dask処理
            print("   Dask処理:")
            start_time = time.perf_counter()
            
            dask_df = dd.read_csv(file_path)
            dask_result = (dask_df
                          .groupby('region')
                          .agg({'revenue': ['sum', 'mean', 'count']})
                          .compute())
            
            dask_time = time.perf_counter() - start_time
            print(f"   Dask実行時間: {dask_time:.4f}秒")
            
            print(f"   結果比較: {'一致' if pandas_result.equals(dask_result) else '不一致'}")
            
            return pandas_result, dask_result
        
        def large_scale_processing_example(self):
            """大規模データ処理の例"""
            
            print("\n🚀 大規模データ処理デモ:")
            
            # 大容量データの生成(Daskで)
            print("   大容量データ生成中...")
            
            # Dask Arrayで大容量データ生成
            n_rows = 5_000_000  # 500万行
            
            dates = dd.from_pandas(
                pd.date_range('2023-01-01', periods=n_rows, freq='1H'), 
                npartitions=50
            )
            
            # ランダムデータ生成
            np.random.seed(42)
            dask_df = dd.from_pandas(pd.DataFrame({
                'timestamp': pd.date_range('2023-01-01', periods=n_rows, freq='1H'),
                'user_id': np.random.randint(1, 100000, n_rows),
                'product_id': np.random.randint(1, 10000, n_rows), 
                'price': np.random.uniform(10, 1000, n_rows),
                'quantity': np.random.randint(1, 5, n_rows)
            }), npartitions=50)
            
            print(f"   データサイズ: {len(dask_df)} 行")
            
            # 複雑な分析処理
            @profiler.measure_time
            def complex_analysis():
                # 売上計算
                dask_df_with_revenue = dask_df.assign(
                    revenue=dask_df['price'] * dask_df['quantity']
                )
                
                # 時間別、商品別集計
                daily_stats = (dask_df_with_revenue
                              .set_index('timestamp')
                              .resample('D')
                              .agg({
                                  'revenue': ['sum', 'mean'],
                                  'quantity': 'sum',
                                  'user_id': 'nunique'
                              })
                              .compute())
                
                # 商品別売上ランキング
                product_ranking = (dask_df_with_revenue
                                 .groupby('product_id')
                                 .agg({'revenue': 'sum', 'quantity': 'sum'})
                                 .nlargest(10, 'revenue')
                                 .compute())
                
                return daily_stats, product_ranking
            
            daily_stats, product_ranking = complex_analysis()
            
            print("   📈 日別統計(先頭5日):")
            print(daily_stats.head())
            
            print("\n   🏆 売上TOP10商品:")
            print(product_ranking)
            
            return daily_stats, product_ranking
        
        def cleanup(self):
            """リソースのクリーンアップ"""
            if self.client:
                self.client.close()
                print("🧹 Daskクラスタをクローズしました")

    # Dask処理の実行例
    if 'sample_file' in locals():
        dask_processor = DaskProcessor()
        
        # クラスタセットアップ
        dask_processor.setup_cluster()
        
        try:
            # 性能比較
            pandas_result, dask_result = dask_processor.pandas_vs_dask_comparison(sample_file)
            
            # 大規模処理デモ
            daily_stats, product_ranking = dask_processor.large_scale_processing_example()
            
        finally:
            # クリーンアップ
            dask_processor.cleanup()

except ImportError:
    print("⚠ Daskがインストールされていません。pip install dask でインストールしてください。")

Polarsによる高性能データ処理

Rust ベースの次世代高性能データ処理ライブラリを活用します:

Polars 実装システム:

  • PolarsProcessor: 高性能データ処理エンジン
    • pandas_vs_polars_comparison(): Pandas との性能比較
    • polars_advanced_operations(): 高度なクエリ操作
    • polars_lazy_evaluation(): 遅延評価による最適化

性能特性:

  • Pandas 比 2-10倍の高速化
  • Apache Arrow フォーマット活用
  • SIMD命令による自動並列化

Polars入門

try:
    import polars as pl
    
    class PolarsProcessor:
        """Polarsによる高性能データ処理"""
        
        @staticmethod
        @profiler.measure_time
        def pandas_vs_polars_comparison(df_pandas):
            """PandasとPolarsの性能比較"""
            
            print("\n⚡ Pandas vs Polars 性能比較:")
            
            # Pandasでの処理
            print("   Pandas処理:")
            start_time = time.perf_counter()
            
            pandas_result = (df_pandas
                           .groupby('region')
                           .agg({
                               'price': ['mean', 'std'],
                               'quantity': 'sum',
                               'revenue': 'sum'
                           }))
            
            pandas_time = time.perf_counter() - start_time
            print(f"   Pandas実行時間: {pandas_time:.4f}秒")
            
            # PolarsのDataFrameに変換
            df_polars = pl.from_pandas(df_pandas)
            
            # Polarsでの処理
            print("   Polars処理:")
            start_time = time.perf_counter()
            
            polars_result = (df_polars
                           .group_by('region')
                           .agg([
                               pl.col('price').mean().alias('price_mean'),
                               pl.col('price').std().alias('price_std'),
                               pl.col('quantity').sum().alias('quantity_sum'),
                               pl.col('revenue').sum().alias('revenue_sum')
                           ]))
            
            polars_time = time.perf_counter() - start_time
            print(f"   Polars実行時間: {polars_time:.4f}秒")
            print(f"   高速化率: {pandas_time/polars_time:.2f}倍")
            
            return polars_result
        
        @staticmethod
        def polars_advanced_operations():
            """Polarsの高度な操作例"""
            
            print("\n🔧 Polars高度な操作:")
            
            # サンプルデータ生成
            n_rows = 1_000_000
            np.random.seed(42)
            
            df = pl.DataFrame({
                'timestamp': pl.date_range(
                    start=pl.datetime(2023, 1, 1),
                    end=pl.datetime(2023, 12, 31),
                    interval='1h'
                )[:n_rows],
                'user_id': np.random.randint(1, 10000, n_rows),
                'product_category': np.random.choice(['Electronics', 'Clothing', 'Books'], n_rows),
                'price': np.random.uniform(10, 500, n_rows),
                'quantity': np.random.randint(1, 10, n_rows)
            })
            
            # 複雑なクエリの例
            @profiler.measure_time
            def complex_polars_query():
                result = (df
                         .with_columns([
                             (pl.col('price') * pl.col('quantity')).alias('revenue'),
                             pl.col('timestamp').dt.month().alias('month'),
                             pl.col('timestamp').dt.hour().alias('hour')
                         ])
                         .filter(pl.col('revenue') > 50)
                         .group_by(['product_category', 'month'])
                         .agg([
                             pl.col('revenue').sum().alias('total_revenue'),
                             pl.col('revenue').mean().alias('avg_revenue'),
                             pl.col('user_id').n_unique().alias('unique_users'),
                             pl.col('quantity').sum().alias('total_quantity'),
                             pl.col('hour').mode().first().alias('peak_hour')
                         ])
                         .sort(['product_category', 'total_revenue'], descending=[False, True]))
                
                return result
            
            result = complex_polars_query()
            print("   📊 カテゴリ・月別分析結果(Top 10):")
            print(result.head(10))
            
            return result
        
        @staticmethod
        def polars_lazy_evaluation():
            """Polarsの遅延評価によるクエリ最適化"""
            
            print("\n⚡ Polars遅延評価による最適化:")
            
            # サンプルデータ
            df = pl.DataFrame({
                'id': range(1000000),
                'value': np.random.randn(1000000),
                'category': np.random.choice(['A', 'B', 'C', 'D'], 1000000),
                'date': pl.date_range(
                    start=pl.datetime(2023, 1, 1),
                    end=pl.datetime(2023, 12, 31),
                    interval='8h'
                )[:1000000]
            })
            
            # 即座実行(Eager)
            @profiler.measure_time
            def eager_execution():
                result = (df
                         .filter(pl.col('value') > 0)
                         .with_columns(pl.col('date').dt.month().alias('month'))
                         .group_by(['category', 'month'])
                         .agg(pl.col('value').mean())
                         .sort('value', descending=True))
                return result
            
            # 遅延評価(Lazy)
            @profiler.measure_time  
            def lazy_execution():
                result = (df.lazy()
                         .filter(pl.col('value') > 0)
                         .with_columns(pl.col('date').dt.month().alias('month'))
                         .group_by(['category', 'month'])
                         .agg(pl.col('value').mean())
                         .sort('value', descending=True)
                         .collect())
                return result
            
            print("   即座実行:")
            eager_result = eager_execution()
            
            print("   遅延評価:")
            lazy_result = lazy_execution()
            
            print(f"   結果の一致: {'✅' if eager_result.equals(lazy_result) else '❌'}")
            
            return lazy_result

    # Polars処理の実行例
    if 'optimized_data' in locals():
        polars_processor = PolarsProcessor()
        
        # 性能比較
        polars_result = polars_processor.pandas_vs_polars_comparison(optimized_data)
        
        # 高度な操作
        advanced_result = polars_processor.polars_advanced_operations()
        
        # 遅延評価
        lazy_result = polars_processor.polars_lazy_evaluation()

except ImportError:
    print("⚠ Polarsがインストールされていません。pip install polars でインストールしてください。")

プロダクション環境での実践

本格的な運用環境で使える堅牢なデータ処理システムを構築します:

プロダクション対応コンポーネント:

  • ProductionDataProcessor: 企業レベルデータ処理基盤
    • performance_monitoring(): 包括的パフォーマンス監視
    • robust_data_loading(): 堅牢なデータ読み込み
    • data_quality_check(): 自動データ品質検証
    • safe_data_processing(): 安全な処理パイプライン

運用機能:

  • 24/7稼働対応の監視・アラート
  • 障害時の自動回復とログ
  • データ品質の継続的監視

モニタリングとプロファイリング

import logging
import sys
from contextlib import contextmanager
import traceback

class ProductionDataProcessor:
    """プロダクション環境でのデータ処理基盤"""
    
    def __init__(self, log_level=logging.INFO):
        self.setup_logging(log_level)
        self.performance_metrics = {}
    
    def setup_logging(self, log_level):
        """ログ設定"""
        logging.basicConfig(
            level=log_level,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.StreamHandler(sys.stdout),
                logging.FileHandler('data_processing.log')
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    @contextmanager
    def performance_monitoring(self, operation_name):
        """パフォーマンス監視コンテキストマネージャ"""
        start_time = time.perf_counter()
        start_memory = psutil.Process().memory_info().rss / 1024 / 1024
        
        self.logger.info(f"🚀 {operation_name} 開始")
        
        try:
            yield
            
            end_time = time.perf_counter()
            end_memory = psutil.Process().memory_info().rss / 1024 / 1024
            
            execution_time = end_time - start_time
            memory_diff = end_memory - start_memory
            
            self.performance_metrics[operation_name] = {
                'execution_time': execution_time,
                'memory_change': memory_diff,
                'peak_memory': end_memory
            }
            
            self.logger.info(f"✅ {operation_name} 完了 - "
                           f"実行時間: {execution_time:.4f}秒, "
                           f"メモリ変化: {memory_diff:+.2f}MB")
            
        except Exception as e:
            self.logger.error(f"❌ {operation_name} エラー: {str(e)}")
            self.logger.error(traceback.format_exc())
            raise
    
    def robust_data_loading(self, file_path, **kwargs):
        """堅牢なデータ読み込み"""
        
        with self.performance_monitoring("データ読み込み"):
            try:
                # ファイル存在確認
                if not os.path.exists(file_path):
                    raise FileNotFoundError(f"ファイルが見つかりません: {file_path}")
                
                # ファイルサイズ確認
                file_size = os.path.getsize(file_path) / 1024 / 1024  # MB
                self.logger.info(f"📁 ファイルサイズ: {file_size:.2f}MB")
                
                # メモリ使用量に応じた読み込み戦略
                available_memory = psutil.virtual_memory().available / 1024 / 1024  # MB
                
                if file_size > available_memory * 0.5:  # 利用可能メモリの50%以上
                    self.logger.warning("⚠ 大容量ファイル検出 - チャンク読み込みを推奨")
                    
                    chunk_size = kwargs.get('chunksize', 50000)
                    df_chunks = []
                    
                    for i, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size)):
                        df_chunks.append(chunk)
                        if i % 10 == 0:
                            self.logger.info(f"   チャンク {i+1} 読み込み完了")
                    
                    df = pd.concat(df_chunks, ignore_index=True)
                    del df_chunks  # メモリ解放
                    
                else:
                    df = pd.read_csv(file_path, **kwargs)
                
                self.logger.info(f"📊 データ読み込み完了: {df.shape}")
                return df
                
            except Exception as e:
                self.logger.error(f"データ読み込みエラー: {e}")
                raise
    
    def data_quality_check(self, df, name="データ"):
        """データ品質チェック"""
        
        with self.performance_monitoring("データ品質チェック"):
            quality_report = {
                'row_count': len(df),
                'column_count': len(df.columns),
                'memory_usage_mb': df.memory_usage(deep=True).sum() / 1024 / 1024,
                'null_counts': df.isnull().sum().to_dict(),
                'duplicate_rows': df.duplicated().sum(),
                'data_types': df.dtypes.to_dict()
            }
            
            # 品質問題の検出
            issues = []
            
            # 欠損値チェック
            high_null_cols = [col for col, count in quality_report['null_counts'].items() 
                            if count > len(df) * 0.1]  # 10%以上欠損
            if high_null_cols:
                issues.append(f"高欠損率列: {high_null_cols}")
            
            # 重複行チェック
            if quality_report['duplicate_rows'] > 0:
                issues.append(f"重複行: {quality_report['duplicate_rows']} 件")
            
            # データ型チェック
            object_cols = [col for col, dtype in quality_report['data_types'].items() 
                          if dtype == 'object']
            if object_cols:
                issues.append(f"最適化可能な文字列列: {len(object_cols)} 列")
            
            # レポート出力
            self.logger.info(f"📋 {name} 品質レポート:")
            self.logger.info(f"   行数: {quality_report['row_count']:,}")
            self.logger.info(f"   列数: {quality_report['column_count']}")
            self.logger.info(f"   メモリ使用量: {quality_report['memory_usage_mb']:.2f}MB")
            
            if issues:
                self.logger.warning("⚠ 品質問題:")
                for issue in issues:
                    self.logger.warning(f"   - {issue}")
            else:
                self.logger.info("✅ データ品質: 良好")
            
            return quality_report, issues
    
    def safe_data_processing(self, df, processing_steps):
        """安全なデータ処理パイプライン"""
        
        result_df = df.copy()
        
        for i, (step_name, step_func) in enumerate(processing_steps):
            with self.performance_monitoring(f"ステップ{i+1}: {step_name}"):
                try:
                    # 処理前の状態保存
                    before_shape = result_df.shape
                    
                    # 処理実行
                    result_df = step_func(result_df)
                    
                    # 処理後の検証
                    after_shape = result_df.shape
                    
                    self.logger.info(f"   データ変化: {before_shape}{after_shape}")
                    
                    # 異常な変化の検出
                    if after_shape[0] < before_shape[0] * 0.5:  # 行数が50%以上減少
                        self.logger.warning(f"⚠ 大量データ削除検出: {before_shape[0] - after_shape[0]} 行削除")
                    
                except Exception as e:
                    self.logger.error(f"ステップ{i+1}でエラー: {e}")
                    raise
        
        return result_df
    
    def generate_performance_report(self):
        """パフォーマンスレポート生成"""
        
        if not self.performance_metrics:
            self.logger.info("📊 パフォーマンス情報なし")
            return
        
        self.logger.info("📈 パフォーマンスレポート:")
        total_time = sum(metrics['execution_time'] for metrics in self.performance_metrics.values())
        
        for operation, metrics in self.performance_metrics.items():
            percentage = metrics['execution_time'] / total_time * 100
            self.logger.info(
                f"   {operation}: {metrics['execution_time']:.4f}秒 "
                f"({percentage:.1f}%), メモリ: {metrics['memory_change']:+.2f}MB"
            )
        
        self.logger.info(f"   総実行時間: {total_time:.4f}秒")

# プロダクション処理の実行例
def production_pipeline_example():
    """プロダクション環境でのデータ処理パイプライン例"""
    
    processor = ProductionDataProcessor()
    
    # 処理ステップの定義
    def step1_data_cleaning(df):
        """データクリーニング"""
        return df.dropna().drop_duplicates()
    
    def step2_feature_engineering(df):
        """特徴量エンジニアリング"""
        df = df.copy()
        if 'price' in df.columns and 'quantity' in df.columns:
            df['revenue'] = df['price'] * df['quantity']
        return df
    
    def step3_optimization(df):
        """データ型最適化"""
        optimizer = DataTypeOptimizer()
        df = optimizer.optimize_numeric_columns(df)
        df = optimizer.optimize_categorical_columns(df)
        return df
    
    processing_steps = [
        ("データクリーニング", step1_data_cleaning),
        ("特徴量エンジニアリング", step2_feature_engineering),  
        ("データ型最適化", step3_optimization)
    ]
    
    try:
        # データ読み込み
        if 'sample_file' in locals():
            df = processor.robust_data_loading(sample_file)
            
            # 品質チェック
            quality_report, issues = processor.data_quality_check(df, "元データ")
            
            # 安全な処理実行
            processed_df = processor.safe_data_processing(df, processing_steps)
            
            # 最終品質チェック
            final_quality, final_issues = processor.data_quality_check(processed_df, "処理後データ")
            
            # パフォーマンスレポート
            processor.generate_performance_report()
            
            return processed_df
            
    except Exception as e:
        processor.logger.error(f"パイプライン実行エラー: {e}")
        raise

# プロダクション処理の実行
if 'sample_file' in locals():
    final_result = production_pipeline_example()

まとめと次のステップ

パフォーマンス最適化のベストプラクティス

メモリ効率:

  • 適切なデータ型選択(int32, category等)
  • 不要データの早期削除(del, gc.collect())
  • チャンク処理による大容量ファイル対応

処理速度向上:

  • ベクトル化操作の活用
  • NumPy統合による高速計算
  • 並列処理による CPU リソース活用

スケーラビリティ:

  • Daskによる分散処理
  • Polarsによる高性能処理
  • 遅延評価の活用

プロダクション対応:

  • 堅牢なエラーハンドリング
  • 包括的なログ機能
  • パフォーマンス監視

技術選択の指針

データサイズ 推奨技術 理由
< 1GB Pandas最適化 シンプルで十分な性能
1-10GB Polars、最適化Pandas メモリ効率と高速処理
10-100GB Dask、チャンク処理 分散処理とスケーラビリティ
> 100GB Spark、クラウドサービス 専用基盤が必要

継続的改善のポイント

  1. 定期的なプロファイリング: ボトルネック特定
  2. メモリ使用量監視: リソース効率の測定
  3. 新技術の評価: Polars、DuckDB等の検討
  4. 並列化の最適化: CPU・メモリバランス

次のレベルへ

  • 機械学習パイプライン: scikit-learn、MLOps統合
  • ストリーミング処理: Kafka、Apache Beam
  • クラウドネイティブ: AWS、GCP、Azureサービス
  • 可視化の高度化: Plotly、Streamlit、Dash

高性能データ分析をマスターすることで、大規模なビジネス課題に対応できる技術力が身につきます。まずは自身のプロジェクトで小さく始めて、徐々にスケールアップしていくことをお勧めします。

関連記事

高度な株価スクレイピング 第3部:アービトラージ検出とリアルタイムダッシュボード
仮想通貨

高度な株価スクレイピング 第3部:アービトラージ検出とリアルタイムダッシュボード

高度な株価スクレイピング 第3部:アービトラージ検出とリアルタイムダッシュボード シリーズ最終回では、アービトラージ(裁定取引)機会の自動検出と、すべてのデータを統合するリアルタイムダッシュボードを構築します。取引所間の価格差を利用した利益機会を見逃さないシステムを実装しましょう。 🎯...

高度な株価スクレイピング 第2部:機械学習による価格予測システムの実装
仮想通貨

高度な株価スクレイピング 第2部:機械学習による価格予測システムの実装

高度な株価スクレイピング 第2部:機械学習による価格予測システムの実装 第1部で構築したリアルタイムデータ収集システムを活用し、機械学習による価格予測システムを実装します。LSTMニューラルネットワークを使用して、高精度な価格予測を実現しましょう。 🎯 この記事で構築するもの 🚀...

高度な株価スクレイピング 第1部:リアルタイム監視システムの設計と実装
仮想通貨

高度な株価スクレイピング 第1部:リアルタイム監視システムの設計と実装

高度な株価スクレイピング 第1部:リアルタイム監視システムの設計と実装 基本的な株価取得から一歩進んで、プロトレーダーレベルのリアルタイム監視システムを構築しましょう。この第1部では、WebSocketを活用した高頻度データ取得とスケーラブルなアーキテクチャの設計を詳しく解説します。 🎯...

コンテナ開発環境パフォーマンス最適化実践:高速化テクニックの完全ガイド
仮想通貨

コンテナ開発環境パフォーマンス最適化実践:高速化テクニックの完全ガイド

コンテナ開発環境パフォーマンス最適化実践:高速化テクニックの完全ガイド コンテナ化開発環境の基本的な構築ができたら、次に重要なのはパフォーマンスの最適化です。「ビルドに時間がかかりすぎる」「コンテナの起動が遅い」「ファイル同期が重い」といった問題は、開発効率を大幅に低下させます。...

コメント

0/2000