Pythonで始める業務自動化:ファイル操作とデータ処理の基本

約69分で読めます by ぽんたぬき
Pythonで始める業務自動化:ファイル操作とデータ処理の基本

Pythonで始める業務自動化:ファイル操作とデータ処理の基本

毎日同じようなファイル操作やデータ処理に時間を取られていませんか?Pythonを使えば、そんな繰り返し作業を自動化して、もっと価値のある仕事に時間を使えるようになります。

この記事では、Python初心者でも実践できる業務自動化の基本テクニックを、実際のコード例とともに詳しく解説します。

なぜPythonで業務自動化なのか?

Pythonを選ぶ理由

  1. 学習コストが低い: 直感的で読みやすい文法
  2. 豊富なライブラリ: ファイル操作、データ処理に特化したツールが充実
  3. クロスプラットフォーム: Windows、Mac、Linuxで同じコードが動作
  4. 無料: ライセンス費用がかからない
  5. コミュニティが活発: 問題解決の情報が豊富

自動化できる典型的な業務

  • ファイル整理: 大量のファイルを条件に応じて分類・移動
  • データ変換: ExcelファイルをCSVに変換、複数ファイルの統合
  • レポート作成: データから自動でグラフやレポートを生成
  • メール送信: 定期的な報告メールの自動送信
  • ログ解析: アクセスログやエラーログの分析

環境構築と必要なライブラリ

基本ライブラリのインストール

# 必要なライブラリをインストール
pip install pandas openpyxl xlsxwriter pathlib2

推奨ライブラリ一覧

# 標準ライブラリ(インストール不要)
import os          # ファイル・ディレクトリ操作
import shutil      # ファイルコピー・移動
import glob        # ファイル検索
import datetime    # 日時操作
import csv         # CSV操作
import json        # JSON操作

# 外部ライブラリ(pip install必要)
import pandas as pd      # データ分析・操作
import openpyxl         # Excel操作(読み書き)
from pathlib import Path # モダンなパス操作

ファイル操作の基本

1. ディレクトリとファイルの基本操作

import os
import shutil
from pathlib import Path

def basic_file_operations():
    """基本的なファイル操作のデモ"""
    
    # 現在のディレクトリを取得
    current_dir = Path.cwd()
    print(f"現在のディレクトリ: {current_dir}")
    
    # 新しいディレクトリを作成
    new_dir = current_dir / "automation_test"
    new_dir.mkdir(exist_ok=True)  # 既に存在しても エラーにならない
    
    # ファイルの存在確認
    test_file = new_dir / "test.txt"
    if test_file.exists():
        print("ファイルは既に存在します")
    else:
        # ファイルを作成
        test_file.write_text("これはテストファイルです", encoding='utf-8')
        print("ファイルを作成しました")
    
    # ファイル情報の取得
    if test_file.exists():
        file_size = test_file.stat().st_size
        modified_time = datetime.fromtimestamp(test_file.stat().st_mtime)
        print(f"ファイルサイズ: {file_size} bytes")
        print(f"最終更新日時: {modified_time}")

# 実行例
basic_file_operations()

2. 大量ファイルの自動分類

import os
import shutil
from pathlib import Path

def organize_files_by_extension(source_dir, target_dir):
    """ファイル拡張子別に自動分類する関数
    
    Args:
        source_dir (str): 分類対象のディレクトリ
        target_dir (str): 分類先のディレクトリ
    """
    source_path = Path(source_dir)
    target_path = Path(target_dir)
    
    # ターゲットディレクトリを作成
    target_path.mkdir(exist_ok=True)
    
    # ファイル種類別のディレクトリマッピング
    file_categories = {
        'images': ['.jpg', '.jpeg', '.png', '.gif', '.bmp', '.svg'],
        'documents': ['.pdf', '.doc', '.docx', '.txt', '.rtf'],
        'spreadsheets': ['.xls', '.xlsx', '.csv'],
        'presentations': ['.ppt', '.pptx'],
        'archives': ['.zip', '.rar', '.7z', '.tar', '.gz'],
        'videos': ['.mp4', '.avi', '.mov', '.wmv', '.flv'],
        'audio': ['.mp3', '.wav', '.flac', '.aac']
    }
    
    moved_files = {'success': 0, 'error': 0, 'details': []}
    
    # すべてのファイルをスキャン
    for file_path in source_path.rglob('*'):
        if file_path.is_file():
            file_extension = file_path.suffix.lower()
            
            # ファイルのカテゴリを決定
            category = 'others'  # デフォルトカテゴリ
            for cat, extensions in file_categories.items():
                if file_extension in extensions:
                    category = cat
                    break
            
            # カテゴリディレクトリを作成
            category_dir = target_path / category
            category_dir.mkdir(exist_ok=True)
            
            # ファイルを移動
            try:
                target_file = category_dir / file_path.name
                
                # 同名ファイルが存在する場合は連番を付加
                counter = 1
                original_name = target_file.stem
                original_suffix = target_file.suffix
                
                while target_file.exists():
                    target_file = category_dir / f"{original_name}_{counter}{original_suffix}"
                    counter += 1
                
                shutil.move(str(file_path), str(target_file))
                moved_files['success'] += 1
                moved_files['details'].append(f"✓ {file_path.name}{category}/{target_file.name}")
                
            except Exception as e:
                moved_files['error'] += 1
                moved_files['details'].append(f"✗ {file_path.name}: {str(e)}")
    
    return moved_files

# 使用例
def demo_file_organization():
    """ファイル整理のデモ"""
    
    # テスト用ファイルを作成
    test_dir = Path("test_files")
    test_dir.mkdir(exist_ok=True)
    
    test_files = [
        "report.pdf", "data.xlsx", "photo.jpg", 
        "music.mp3", "video.mp4", "archive.zip",
        "document.docx", "presentation.pptx"
    ]
    
    for filename in test_files:
        (test_dir / filename).write_text(f"Test content for {filename}")
    
    print("テストファイルを作成しました:")
    for file in test_dir.iterdir():
        print(f"  - {file.name}")
    
    # ファイルを整理
    result = organize_files_by_extension("test_files", "organized_files")
    
    print(f"\n整理結果:")
    print(f"成功: {result['success']} ファイル")
    print(f"エラー: {result['error']} ファイル")
    print("\n詳細:")
    for detail in result['details']:
        print(f"  {detail}")

# デモ実行
demo_file_organization()

Excel・CSVファイルの自動処理

1. 複数ExcelファイルをCSVに一括変換

import pandas as pd
from pathlib import Path
import os

def excel_to_csv_converter(input_dir, output_dir):
    """Excel ファイルを CSV に一括変換する関数
    
    Args:
        input_dir (str): Excel ファイルが格納されたディレクトリ
        output_dir (str): CSV ファイルの出力先ディレクトリ
    """
    input_path = Path(input_dir)
    output_path = Path(output_dir)
    output_path.mkdir(exist_ok=True)
    
    conversion_results = []
    
    # Excel ファイルを検索
    excel_files = list(input_path.glob("*.xlsx")) + list(input_path.glob("*.xls"))
    
    if not excel_files:
        print("Excel ファイルが見つかりませんでした")
        return conversion_results
    
    for excel_file in excel_files:
        try:
            print(f"処理中: {excel_file.name}")
            
            # Excel ファイルを読み込み(複数シート対応)
            excel_data = pd.read_excel(excel_file, sheet_name=None)
            
            for sheet_name, df in excel_data.items():
                # シート名をファイル名に含める
                if len(excel_data) == 1:
                    # シートが1つの場合
                    csv_filename = f"{excel_file.stem}.csv"
                else:
                    # 複数シートの場合
                    csv_filename = f"{excel_file.stem}_{sheet_name}.csv"
                
                csv_path = output_path / csv_filename
                
                # CSV として保存
                df.to_csv(csv_path, index=False, encoding='utf-8-sig')
                
                conversion_results.append({
                    'source': excel_file.name,
                    'sheet': sheet_name,
                    'output': csv_filename,
                    'rows': len(df),
                    'cols': len(df.columns),
                    'status': 'success'
                })
                
                print(f"  ✓ {sheet_name}{csv_filename} ({len(df)} 行, {len(df.columns)} 列)")
                
        except Exception as e:
            conversion_results.append({
                'source': excel_file.name,
                'sheet': None,
                'output': None,
                'rows': 0,
                'cols': 0,
                'status': f'error: {str(e)}'
            })
            print(f"  ✗ エラー: {str(e)}")
    
    return conversion_results

# 使用例
def demo_excel_conversion():
    """Excel変換デモ"""
    
    # テスト用のExcelファイルを作成
    test_input_dir = Path("excel_input")
    test_input_dir.mkdir(exist_ok=True)
    
    # サンプルデータの作成
    sample_data1 = pd.DataFrame({
        '商品名': ['りんご', 'バナナ', 'オレンジ'],
        '価格': [100, 80, 120],
        '在庫': [50, 30, 25]
    })
    
    sample_data2 = pd.DataFrame({
        '顧客名': ['田中太郎', '佐藤花子', '鈴木一郎'],
        '売上': [15000, 22000, 18000],
        '地域': ['東京', '大阪', '名古屋']
    })
    
    # Excelファイルに保存
    with pd.ExcelWriter(test_input_dir / "商品管理.xlsx") as writer:
        sample_data1.to_excel(writer, sheet_name='商品マスタ', index=False)
        sample_data2.to_excel(writer, sheet_name='売上データ', index=False)
    
    print("テスト用Excelファイルを作成しました")
    
    # 変換実行
    results = excel_to_csv_converter("excel_input", "csv_output")
    
    print(f"\n変換完了: {len(results)} 件処理")
    
    # 結果表示
    success_count = sum(1 for r in results if r['status'] == 'success')
    error_count = len(results) - success_count
    
    print(f"成功: {success_count} 件")
    print(f"エラー: {error_count} 件")

demo_excel_conversion()

2. 複数CSVファイルの統合と分析

import pandas as pd
from pathlib import Path
import matplotlib.pyplot as plt
import datetime

def merge_and_analyze_csv_files(input_dir, output_file):
    """複数のCSVファイルを統合して分析する関数
    
    Args:
        input_dir (str): CSVファイルが格納されたディレクトリ
        output_file (str): 統合結果の出力ファイル名
    """
    input_path = Path(input_dir)
    
    # CSVファイルを検索
    csv_files = list(input_path.glob("*.csv"))
    
    if not csv_files:
        print("CSVファイルが見つかりませんでした")
        return None
    
    all_dataframes = []
    
    for csv_file in csv_files:
        try:
            print(f"読み込み中: {csv_file.name}")
            
            # CSVファイルを読み込み
            df = pd.read_csv(csv_file, encoding='utf-8-sig')
            
            # ファイル名を列として追加(どのファイルのデータかを識別)
            df['ソースファイル'] = csv_file.stem
            
            all_dataframes.append(df)
            print(f"  ✓ {len(df)} 行を読み込み")
            
        except Exception as e:
            print(f"  ✗ エラー: {str(e)}")
    
    if not all_dataframes:
        print("有効なデータが見つかりませんでした")
        return None
    
    # 全データを統合
    merged_df = pd.concat(all_dataframes, ignore_index=True)
    print(f"\n統合完了: 総データ数 {len(merged_df)} 行")
    
    # 基本統計を計算
    analysis_results = {}
    
    # 数値列の統計
    numeric_columns = merged_df.select_dtypes(include=['number']).columns
    if len(numeric_columns) > 0:
        analysis_results['数値統計'] = merged_df[numeric_columns].describe()
        print("\n=== 数値列の統計 ===")
        print(analysis_results['数値統計'])
    
    # カテゴリ列の集計
    categorical_columns = merged_df.select_dtypes(include=['object']).columns
    for col in categorical_columns:
        if col != 'ソースファイル':  # ソースファイル列以外
            value_counts = merged_df[col].value_counts()
            analysis_results[f'{col}_集計'] = value_counts
            print(f"\n=== {col} の集計 ===")
            print(value_counts.head(10))  # 上位10件のみ表示
    
    # ソースファイル別のデータ数
    file_counts = merged_df['ソースファイル'].value_counts()
    analysis_results['ファイル別データ数'] = file_counts
    print("\n=== ファイル別データ数 ===")
    print(file_counts)
    
    # 統合データを保存
    output_path = Path(output_file)
    merged_df.to_csv(output_path, index=False, encoding='utf-8-sig')
    print(f"\n統合データを保存しました: {output_path}")
    
    # 分析レポートを保存
    report_path = output_path.with_suffix('.txt')
    with open(report_path, 'w', encoding='utf-8') as f:
        f.write(f"データ統合・分析レポート\n")
        f.write(f"生成日時: {datetime.datetime.now()}\n")
        f.write(f"=" * 50 + "\n\n")
        
        f.write(f"統合ファイル数: {len(csv_files)}\n")
        f.write(f"総データ数: {len(merged_df)}\n")
        f.write(f"列数: {len(merged_df.columns)}\n\n")
        
        for key, value in analysis_results.items():
            f.write(f"{key}:\n")
            f.write(str(value))
            f.write("\n" + "=" * 30 + "\n\n")
    
    print(f"分析レポートを保存しました: {report_path}")
    
    return merged_df, analysis_results

# 使用例とデモ
def demo_csv_merge_analysis():
    """CSV統合分析のデモ"""
    
    # テスト用のCSVデータを作成
    test_dir = Path("csv_data")
    test_dir.mkdir(exist_ok=True)
    
    # 月別売上データを作成
    months = ['2024年01月', '2024年02月', '2024年03月']
    products = ['商品A', '商品B', '商品C', '商品D']
    regions = ['東京', '大阪', '名古屋', '福岡']
    
    for i, month in enumerate(months):
        # ランダムな売上データを生成
        data = []
        for _ in range(20):  # 各月20件のデータ
            data.append({
                '月': month,
                '商品': pd.np.random.choice(products),
                '地域': pd.np.random.choice(regions),
                '売上': pd.np.random.randint(1000, 10000),
                '数量': pd.np.random.randint(1, 50)
            })
        
        df = pd.DataFrame(data)
        csv_path = test_dir / f"売上_{month.replace('年', '').replace('月', '')}.csv"
        df.to_csv(csv_path, index=False, encoding='utf-8-sig')
        print(f"テストデータ作成: {csv_path.name}")
    
    # CSV統合・分析実行
    merged_data, analysis = merge_and_analyze_csv_files("csv_data", "統合売上データ.csv")
    
    return merged_data, analysis

# デモ実行
demo_data, demo_analysis = demo_csv_merge_analysis()

定期実行とタスクスケジューリング

1. スケジュール実行の基本

import schedule
import time
import datetime
from pathlib import Path

def backup_important_files():
    """重要ファイルのバックアップ処理"""
    source_dir = Path("important_files")
    backup_dir = Path("backups")
    
    # バックアップディレクトリを作成
    backup_dir.mkdir(exist_ok=True)
    
    # 日付付きバックアップフォルダを作成
    today = datetime.date.today().strftime("%Y%m%d")
    daily_backup_dir = backup_dir / f"backup_{today}"
    daily_backup_dir.mkdir(exist_ok=True)
    
    # ファイルをコピー
    if source_dir.exists():
        import shutil
        for file_path in source_dir.glob("*"):
            if file_path.is_file():
                shutil.copy2(file_path, daily_backup_dir)
        
        print(f"バックアップ完了: {daily_backup_dir}")
    else:
        print("バックアップ対象ディレクトリが見つかりません")

def generate_daily_report():
    """日次レポート生成"""
    report_data = {
        '日付': datetime.date.today(),
        '処理時刻': datetime.datetime.now().strftime("%H:%M:%S"),
        'ステータス': '正常'
    }
    
    report_file = Path(f"daily_report_{datetime.date.today().strftime('%Y%m%d')}.txt")
    
    with open(report_file, 'w', encoding='utf-8') as f:
        f.write("=== 日次処理レポート ===\n")
        for key, value in report_data.items():
            f.write(f"{key}: {value}\n")
    
    print(f"日次レポート生成: {report_file}")

def setup_automation_schedule():
    """自動化スケジュールの設定"""
    
    # 毎日午前9時にバックアップ実行
    schedule.every().day.at("09:00").do(backup_important_files)
    
    # 毎週月曜日の午前10時にレポート生成
    schedule.every().monday.at("10:00").do(generate_daily_report)
    
    # 毎時00分にファイルチェック(例)
    schedule.every().hour.at(":00").do(lambda: print("定期チェック実行"))
    
    print("スケジュールを設定しました:")
    print("- 毎日09:00: ファイルバックアップ")
    print("- 毎週月曜10:00: レポート生成")
    print("- 毎時00分: 定期チェック")
    
    # スケジュール実行(実際の運用では常駐プロセスとして実行)
    print("\nスケジュール実行中... (Ctrl+C で終了)")
    try:
        while True:
            schedule.run_pending()
            time.sleep(60)  # 1分間隔でチェック
    except KeyboardInterrupt:
        print("\nスケジュール実行を終了します")

# インストールが必要: pip install schedule
# setup_automation_schedule()

エラーハンドリングとログ管理

1. 堅牢なエラーハンドリング

import logging
import traceback
from pathlib import Path
import datetime

def setup_logging(log_file="automation.log"):
    """ログ設定を初期化"""
    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s - %(levelname)s - %(message)s',
        handlers=[
            logging.FileHandler(log_file, encoding='utf-8'),
            logging.StreamHandler()  # コンソールにも出力
        ]
    )
    return logging.getLogger(__name__)

def safe_file_operation(func, *args, **kwargs):
    """安全なファイル操作のラッパー関数"""
    logger = logging.getLogger(__name__)
    
    try:
        result = func(*args, **kwargs)
        logger.info(f"操作成功: {func.__name__}")
        return {'success': True, 'result': result, 'error': None}
        
    except FileNotFoundError as e:
        logger.error(f"ファイルが見つかりません: {e}")
        return {'success': False, 'result': None, 'error': str(e)}
        
    except PermissionError as e:
        logger.error(f"アクセス権限がありません: {e}")
        return {'success': False, 'result': None, 'error': str(e)}
        
    except Exception as e:
        logger.error(f"予期しないエラー: {e}")
        logger.error(f"詳細: {traceback.format_exc()}")
        return {'success': False, 'result': None, 'error': str(e)}

def robust_batch_processor(file_list, process_function):
    """バッチ処理用の堅牢な処理関数"""
    logger = logging.getLogger(__name__)
    
    results = {
        'total': len(file_list),
        'success': 0,
        'failed': 0,
        'errors': [],
        'processed_files': []
    }
    
    logger.info(f"バッチ処理開始: {len(file_list)} ファイル")
    
    for i, file_path in enumerate(file_list, 1):
        logger.info(f"処理中 ({i}/{len(file_list)}): {file_path}")
        
        try:
            result = process_function(file_path)
            results['success'] += 1
            results['processed_files'].append({
                'file': str(file_path),
                'status': 'success',
                'result': result
            })
            logger.info(f"  ✓ 成功")
            
        except Exception as e:
            results['failed'] += 1
            error_info = {
                'file': str(file_path),
                'error': str(e),
                'traceback': traceback.format_exc()
            }
            results['errors'].append(error_info)
            results['processed_files'].append({
                'file': str(file_path),
                'status': 'failed',
                'error': str(e)
            })
            logger.error(f"  ✗ エラー: {e}")
    
    # 処理結果のサマリー
    logger.info(f"バッチ処理完了:")
    logger.info(f"  成功: {results['success']} ファイル")
    logger.info(f"  失敗: {results['failed']} ファイル")
    
    return results

# 使用例
def demo_error_handling():
    """エラーハンドリングのデモ"""
    
    # ログ設定
    logger = setup_logging("demo_automation.log")
    
    # テスト用の処理関数
    def test_file_processor(file_path):
        """テスト用のファイル処理関数"""
        path = Path(file_path)
        if not path.exists():
            raise FileNotFoundError(f"ファイルが存在しません: {file_path}")
        
        # ファイルサイズを返す(例)
        return path.stat().st_size
    
    # テストファイルリスト(存在しないファイルも含む)
    test_files = [
        "存在するファイル.txt",
        "存在しないファイル.txt",
        __file__,  # 現在のPythonファイル(存在する)
        "another_missing_file.csv"
    ]
    
    # 存在するテストファイルを作成
    Path("存在するファイル.txt").write_text("テストデータ", encoding='utf-8')
    
    # バッチ処理実行
    results = robust_batch_processor(test_files, test_file_processor)
    
    print("\n=== 処理結果 ===")
    print(f"総ファイル数: {results['total']}")
    print(f"成功: {results['success']}")
    print(f"失敗: {results['failed']}")
    
    if results['errors']:
        print("\nエラー詳細:")
        for error in results['errors']:
            print(f"  - {error['file']}: {error['error']}")

# デモ実行
demo_error_handling()

実践プロジェクト:売上データ自動処理システム

import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import datetime
import logging

class SalesDataProcessor:
    """売上データ自動処理クラス"""
    
    def __init__(self, input_dir, output_dir):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        
        # ログ設定
        self.logger = self._setup_logging()
        
    def _setup_logging(self):
        """ログ設定"""
        log_file = self.output_dir / "sales_processing.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(log_file, encoding='utf-8'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)
    
    def load_sales_data(self):
        """売上データの読み込み"""
        self.logger.info("売上データの読み込み開始")
        
        # Excel・CSVファイルを検索
        files = list(self.input_dir.glob("*.xlsx")) + \
               list(self.input_dir.glob("*.xls")) + \
               list(self.input_dir.glob("*.csv"))
        
        if not files:
            raise FileNotFoundError("データファイルが見つかりません")
        
        all_data = []
        
        for file_path in files:
            try:
                if file_path.suffix.lower() in ['.xlsx', '.xls']:
                    df = pd.read_excel(file_path)
                else:
                    df = pd.read_csv(file_path, encoding='utf-8-sig')
                
                df['ソースファイル'] = file_path.stem
                all_data.append(df)
                self.logger.info(f"読み込み完了: {file_path.name} ({len(df)} 行)")
                
            except Exception as e:
                self.logger.error(f"読み込みエラー: {file_path.name} - {e}")
        
        if not all_data:
            raise ValueError("有効なデータが見つかりませんでした")
        
        self.data = pd.concat(all_data, ignore_index=True)
        self.logger.info(f"データ統合完了: 総計 {len(self.data)} 行")
        
        return self.data
    
    def clean_data(self):
        """データクリーニング"""
        self.logger.info("データクリーニング開始")
        
        original_count = len(self.data)
        
        # 欠損値の処理
        self.data = self.data.dropna(subset=['売上', '数量'])
        
        # 数値型への変換
        numeric_columns = ['売上', '数量', '単価']
        for col in numeric_columns:
            if col in self.data.columns:
                self.data[col] = pd.to_numeric(self.data[col], errors='coerce')
        
        # 日付型への変換
        if '日付' in self.data.columns:
            self.data['日付'] = pd.to_datetime(self.data['日付'], errors='coerce')
        
        # 無効なデータを除去
        self.data = self.data.dropna()
        
        cleaned_count = len(self.data)
        removed_count = original_count - cleaned_count
        
        self.logger.info(f"データクリーニング完了: {removed_count} 行削除")
        
        return self.data
    
    def analyze_data(self):
        """データ分析"""
        self.logger.info("データ分析開始")
        
        analysis = {}
        
        # 基本統計
        analysis['基本統計'] = self.data.describe()
        
        # 商品別売上
        if '商品' in self.data.columns:
            analysis['商品別売上'] = self.data.groupby('商品')['売上'].sum().sort_values(ascending=False)
        
        # 地域別売上
        if '地域' in self.data.columns:
            analysis['地域別売上'] = self.data.groupby('地域')['売上'].sum().sort_values(ascending=False)
        
        # 月別売上推移
        if '日付' in self.data.columns:
            self.data['年月'] = self.data['日付'].dt.to_period('M')
            analysis['月別売上'] = self.data.groupby('年月')['売上'].sum()
        
        self.analysis = analysis
        self.logger.info("データ分析完了")
        
        return analysis
    
    def generate_reports(self):
        """レポート生成"""
        self.logger.info("レポート生成開始")
        
        # テキストレポート
        report_file = self.output_dir / f"売上分析レポート_{datetime.date.today()}.txt"
        
        with open(report_file, 'w', encoding='utf-8') as f:
            f.write("売上データ分析レポート\n")
            f.write("=" * 50 + "\n")
            f.write(f"生成日時: {datetime.datetime.now()}\n")
            f.write(f"データ件数: {len(self.data)}\n\n")
            
            for key, value in self.analysis.items():
                f.write(f"{key}:\n")
                f.write(str(value))
                f.write("\n" + "-" * 30 + "\n\n")
        
        # Excel詳細レポート
        excel_file = self.output_dir / f"売上分析_{datetime.date.today()}.xlsx"
        
        with pd.ExcelWriter(excel_file) as writer:
            self.data.to_excel(writer, sheet_name='統合データ', index=False)
            
            for key, value in self.analysis.items():
                if isinstance(value, pd.DataFrame) or isinstance(value, pd.Series):
                    value.to_excel(writer, sheet_name=key[:30])  # シート名の長さ制限
        
        self.logger.info(f"レポート生成完了: {report_file}, {excel_file}")
        
        return report_file, excel_file
    
    def create_visualizations(self):
        """グラフ作成"""
        self.logger.info("グラフ作成開始")
        
        plt.style.use('seaborn-v0_8')
        fig, axes = plt.subplots(2, 2, figsize=(15, 12))
        fig.suptitle('売上データ分析ダッシュボード', fontsize=16)
        
        # 商品別売上(棒グラフ)
        if '商品別売上' in self.analysis:
            top_products = self.analysis['商品別売上'].head(10)
            axes[0, 0].bar(top_products.index, top_products.values)
            axes[0, 0].set_title('商品別売上 TOP10')
            axes[0, 0].tick_params(axis='x', rotation=45)
        
        # 地域別売上(円グラフ)
        if '地域別売上' in self.analysis:
            axes[0, 1].pie(self.analysis['地域別売上'].values, 
                          labels=self.analysis['地域別売上'].index,
                          autopct='%1.1f%%')
            axes[0, 1].set_title('地域別売上構成比')
        
        # 月別売上推移(折れ線グラフ)
        if '月別売上' in self.analysis:
            monthly_sales = self.analysis['月別売上']
            axes[1, 0].plot(monthly_sales.index.astype(str), monthly_sales.values, marker='o')
            axes[1, 0].set_title('月別売上推移')
            axes[1, 0].tick_params(axis='x', rotation=45)
        
        # 売上分布(ヒストグラム)
        axes[1, 1].hist(self.data['売上'], bins=30, alpha=0.7)
        axes[1, 1].set_title('売上分布')
        axes[1, 1].set_xlabel('売上')
        axes[1, 1].set_ylabel('件数')
        
        plt.tight_layout()
        
        # グラフを保存
        graph_file = self.output_dir / f"売上分析グラフ_{datetime.date.today()}.png"
        plt.savefig(graph_file, dpi=300, bbox_inches='tight')
        plt.close()
        
        self.logger.info(f"グラフ作成完了: {graph_file}")
        
        return graph_file
    
    def run_full_analysis(self):
        """全自動分析実行"""
        try:
            # データ処理の実行
            self.load_sales_data()
            self.clean_data()
            self.analyze_data()
            
            # レポート生成
            text_report, excel_report = self.generate_reports()
            graph_file = self.create_visualizations()
            
            self.logger.info("全処理が正常に完了しました")
            
            return {
                'status': 'success',
                'data_count': len(self.data),
                'text_report': text_report,
                'excel_report': excel_report,
                'graph_file': graph_file
            }
            
        except Exception as e:
            self.logger.error(f"処理中にエラーが発生しました: {e}")
            return {
                'status': 'error',
                'error': str(e)
            }

# 使用例
def demo_sales_automation():
    """売上データ自動処理のデモ"""
    
    # テストデータの作成
    input_dir = Path("sales_input")
    input_dir.mkdir(exist_ok=True)
    
    # サンプル売上データを作成
    sample_data = pd.DataFrame({
        '日付': pd.date_range('2024-01-01', '2024-12-31', freq='D')[:100],
        '商品': pd.np.random.choice(['商品A', '商品B', '商品C', '商品D'], 100),
        '地域': pd.np.random.choice(['東京', '大阪', '名古屋', '福岡'], 100),
        '売上': pd.np.random.randint(1000, 50000, 100),
        '数量': pd.np.random.randint(1, 20, 100)
    })
    
    sample_data.to_excel(input_dir / "売上データ_サンプル.xlsx", index=False)
    print("サンプルデータを作成しました")
    
    # 自動処理実行
    processor = SalesDataProcessor("sales_input", "sales_output")
    result = processor.run_full_analysis()
    
    if result['status'] == 'success':
        print(f"\n処理完了!")
        print(f"処理データ数: {result['data_count']} 件")
        print(f"テキストレポート: {result['text_report']}")
        print(f"Excelレポート: {result['excel_report']}")
        print(f"グラフファイル: {result['graph_file']}")
    else:
        print(f"エラーが発生しました: {result['error']}")

# デモ実行
# 注意: matplotlib, seaborn のインストールが必要
# pip install matplotlib seaborn
demo_sales_automation()

重要なクラス・メソッドリファレンス

この記事で使用した主要なクラスとメソッドの解説をまとめました。

pathlibライブラリ(ファイル操作)

from pathlib import Path

# 重要なメソッド一覧
path_methods = {
    # パス操作
    "Path.cwd()": "現在の作業ディレクトリを取得",
    "Path.home()": "ホームディレクトリを取得", 
    "path.exists()": "ファイル・ディレクトリの存在確認",
    "path.is_file()": "ファイルかどうかの判定",
    "path.is_dir()": "ディレクトリかどうかの判定",
    
    # ファイル作成・操作
    "path.mkdir(exist_ok=True)": "ディレクトリ作成(既存でもエラーなし)",
    "path.write_text(content, encoding='utf-8')": "テキストファイル作成",
    "path.read_text(encoding='utf-8')": "テキストファイル読み込み",
    
    # パス情報取得
    "path.name": "ファイル名取得",
    "path.stem": "拡張子なしファイル名",
    "path.suffix": "拡張子取得",
    "path.parent": "親ディレクトリ取得",
    
    # ファイル検索
    "path.glob('*.txt')": "パターンマッチングでファイル検索",
    "path.rglob('*.csv')": "再帰的なファイル検索",
    
    # ファイル情報
    "path.stat()": "ファイル情報(サイズ、更新日時など)取得"
}

for method, description in path_methods.items():
    print(f"{method:<40} # {description}")

pandasライブラリ(データ処理)

import pandas as pd

# DataFrameの主要メソッド
dataframe_methods = {
    # ファイルI/O
    "pd.read_csv(file, encoding='utf-8-sig')": "CSVファイル読み込み",
    "pd.read_excel(file, sheet_name=None)": "Excelファイル読み込み(全シート)",
    "df.to_csv(file, index=False, encoding='utf-8-sig')": "CSV出力",
    "df.to_excel(writer, sheet_name='シート名', index=False)": "Excel出力",
    
    # データ操作
    "pd.concat([df1, df2], ignore_index=True)": "データフレーム結合",
    "df.groupby('列名').agg({'列名': ['sum', 'mean']})": "グループ集計",
    "df.select_dtypes(include=['number'])": "データ型による列選択",
    "df.describe()": "基本統計量計算",
    
    # データクリーニング
    "df.dropna()": "欠損値除去",
    "df.fillna(value)": "欠損値補完",
    "df.drop_duplicates()": "重複行削除",
    "pd.to_numeric(series, errors='coerce')": "数値型変換"
}

for method, description in dataframe_methods.items():
    print(f"{method:<50} # {description}")

shutilライブラリ(ファイル操作)

import shutil

# shutilの主要メソッド
shutil_methods = {
    "shutil.copy2(src, dst)": "ファイルコピー(メタデータ保持)",
    "shutil.move(src, dst)": "ファイル移動・リネーム",
    "shutil.copytree(src, dst)": "ディレクトリ全体をコピー",
    "shutil.rmtree(path)": "ディレクトリ全体を削除",
    "shutil.make_archive(base_name, format, root_dir)": "アーカイブファイル作成"
}

for method, description in shutil_methods.items():
    print(f"{method:<45} # {description}")

loggingライブラリ(ログ管理)

import logging

# ログレベルと使い分け
log_levels = {
    "logger.debug('詳細情報')": "DEBUG - 開発時のデバッグ情報",
    "logger.info('処理完了')": "INFO - 一般的な実行情報", 
    "logger.warning('注意事項')": "WARNING - 警告メッセージ",
    "logger.error('エラー発生')": "ERROR - エラー情報",
    "logger.critical('致命的エラー')": "CRITICAL - 深刻なエラー"
}

# ログ設定の基本パターン
logging_setup = """
logging.basicConfig(
    level=logging.INFO,                           # ログレベル設定
    format='%(asctime)s - %(levelname)s - %(message)s',  # フォーマット
    handlers=[
        logging.FileHandler('app.log', encoding='utf-8'),    # ファイル出力
        logging.StreamHandler()                               # コンソール出力
    ]
)
"""

print("ログレベルと使い分け:")
for usage, description in log_levels.items():
    print(f"  {usage:<30} # {description}")

print(f"\nログ設定の基本パターン:\n{logging_setup}")

scheduleライブラリ(タスクスケジューリング)

import schedule

# scheduleの主要メソッド
schedule_methods = {
    "schedule.every().day.at('09:00').do(func)": "毎日指定時刻に実行",
    "schedule.every().monday.at('10:00').do(func)": "毎週月曜日に実行",
    "schedule.every(10).minutes.do(func)": "10分間隔で実行",
    "schedule.every().hour.do(func)": "毎時実行",
    "schedule.run_pending()": "スケジュール済みタスクを実行",
    "schedule.clear()": "全スケジュールをクリア",
    "schedule.cancel_job(job)": "特定のジョブをキャンセル"
}

for method, description in schedule_methods.items():
    print(f"{method:<45} # {description}")

エラーハンドリングのベストプラクティス

# try-except文の効果的な使用パターン

# 1. 特定の例外を捕捉
try:
    file_path = Path("data.txt")
    content = file_path.read_text()
except FileNotFoundError:
    print("ファイルが見つかりません")
except PermissionError:
    print("ファイルへのアクセス権限がありません")

# 2. 複数例外の一括処理
try:
    df = pd.read_csv("data.csv")
    result = process_data(df)
except (FileNotFoundError, pd.errors.EmptyDataError, ValueError) as e:
    logger.error(f"データ処理エラー: {e}")
    return None

# 3. finally句でリソース解放
try:
    file = open("data.txt", "r")
    data = file.read()
except Exception as e:
    logger.error(f"ファイル読み込みエラー: {e}")
finally:
    if 'file' in locals():
        file.close()

# 4. with文によるリソース管理(推奨)
try:
    with open("data.txt", "r", encoding="utf-8") as file:
        data = file.read()
except Exception as e:
    logger.error(f"ファイル読み込みエラー: {e}")

実践的なクラス設計パターン

class AutomationBase:
    """自動化処理の基底クラス"""
    
    def __init__(self, input_dir, output_dir):
        self.input_dir = Path(input_dir)
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        self.logger = self._setup_logging()
    
    def _setup_logging(self):
        """ログ設定(プライベートメソッド)"""
        # ログ設定処理
        pass
    
    def validate_inputs(self):
        """入力データの検証(抽象メソッド)"""
        raise NotImplementedError("サブクラスで実装してください")
    
    def process_data(self):
        """データ処理(抽象メソッド)"""
        raise NotImplementedError("サブクラスで実装してください")
    
    def generate_output(self):
        """出力生成(抽象メソッド)"""
        raise NotImplementedError("サブクラスで実装してください")
    
    def run(self):
        """メイン実行フロー(テンプレートメソッド)"""
        try:
            self.validate_inputs()
            self.process_data()
            self.generate_output()
            self.logger.info("処理が正常に完了しました")
            return {"status": "success"}
        except Exception as e:
            self.logger.error(f"処理中にエラーが発生: {e}")
            return {"status": "error", "error": str(e)}

# 使用例:具体的な実装クラス
class FileOrganizerProcessor(AutomationBase):
    """ファイル整理処理クラス"""
    
    def validate_inputs(self):
        if not self.input_dir.exists():
            raise FileNotFoundError(f"入力ディレクトリが存在しません: {self.input_dir}")
    
    def process_data(self):
        # ファイル整理処理の実装
        pass
    
    def generate_output(self):
        # 処理結果レポート生成
        pass

まとめと次のステップ

この記事で学んだこと

  1. ファイル操作の自動化: 大量ファイルの分類・整理
  2. データ処理の自動化: Excel・CSV の変換・統合・分析
  3. エラーハンドリング: 堅牢な自動化システムの構築
  4. スケジュール実行: 定期的な自動処理の設定
  5. 実践プロジェクト: 売上データの全自動分析システム
  6. 重要なライブラリとメソッド: pathlib、pandas、shutil、logging、schedule の活用法

業務自動化の効果

  • 時間短縮: 手作業時間を90%以上削減
  • 品質向上: 人的ミスの削減
  • 継続性: 定期的な処理の確実な実行
  • スケーラビリティ: 大量データの効率的な処理
  1. Web スクレイピング: 外部データの自動取得
  2. API 連携: クラウドサービスとの自動連携
  3. GUI アプリケーション: ユーザーフレンドリーなツール作成
  4. クラウド化: AWS・Azure での自動化システム構築

業務自動化は一度覚えてしまえば、様々な場面で応用できる強力なスキルです。小さな作業から始めて、徐々に複雑な処理を自動化していきましょう。


関連する学習リソース

関連記事

Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築
Python

Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築

これまでの記事で価格表示とチャート分析ができるようになりました。今回は最終回として、自分の保有している仮想通貨を管理するポートフォリオ機能と、価格変動を知らせるアラート機能を追加します。これで本格的な投資管理ツールの完成です! ポートフォリオ管理とは?なぜ重要なのか ...

Streamlit入門 第2部:美しいチャートと可視化で仮想通貨を分析しよう
Python

Streamlit入門 第2部:美しいチャートと可視化で仮想通貨を分析しよう

前回の記事で基本的な価格表示ができるようになりました。今回は、プロのトレーダーが使うような美しいチャートを追加して、本格的な分析ツールに仕上げていきます。難しそうに見えますが、実は数行のコードで驚くほど高機能なチャートが作れるんです! Plotlyとは?なぜグラフライブラリの中で最強なのか ...

Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理
Python

Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理

はじめに シリーズ最終回となる第3部では、実際のプロジェクトで使える実践的なスクレイピングシステムの構築について解説します。検出回避技術、データの永続化、監視システムの構築など、プロダクション環境で必要となる高度な技術を学びます。 また、法的・倫理的配慮についても詳しく説明し、責任あるスクレイピング...

コメント

0/2000