Python Webスクレイピング 2025 第2部:非同期処理と高度なデータ抽出

約61分で読めます by ぽんたぬき
Python Webスクレイピング 2025 第2部:非同期処理と高度なデータ抽出

はじめに

第1部ではPlaywrightの基本的な使い方と環境構築について学びました。第2部では、並行処理によるパフォーマンス向上高度なデータ抽出テクニックについて詳しく解説します。

大規模なスクレイピングプロジェクトでは、単一ページずつの処理では時間がかかりすぎるため、複数のページを同時に処理する並行処理が必須です。また、複雑なWebサイトから確実にデータを抽出するための高度な技術も重要になります。

⚡ 非同期処理によるパフォーマンス向上

並行処理の基本概念

従来の同期処理では、1つのページの処理が完了するまで次のページに進めません。しかし、非同期処理を使用することで、複数のページを同時並行で処理できます。

# 同期処理の例(遅い)
def sync_scraping(urls):
    results = []
    for url in urls:  # 1つずつ順番に処理
        result = scrape_page(url)  # この処理中は他が待機
        results.append(result)
    return results

# 非同期処理の例(高速)
async def async_scraping(urls):
    # 全てのタスクを同時に開始
    tasks = [scrape_page(url) for url in urls]
    # 全てのタスクの完了を待つ
    results = await asyncio.gather(*tasks)
    return results

制御された並行スクレイピング

無制限の並行処理はサーバーに負荷をかけるため、セマフォを使用して同時実行数を制限します。

class AsyncBatchScraper:
    """
    バッチ処理による高速スクレイピング
    
    セマフォを使用して同時実行数を制限
    サーバー負荷とパフォーマンスのバランスを取る
    """
    
    def __init__(self, max_concurrent: int = 5):
        """
        初期化メソッド
        
        Args:
            max_concurrent: 最大同時実行数
                           - 多すぎるとサーバー負荷増大
                           - 少なすぎるとパフォーマンス低下
        """
        self.max_concurrent = max_concurrent
        
        # セマフォ: 同時実行数を制限するための仕組み
        # 指定した数だけのスレッドが同時に実行可能
        self.semaphore = asyncio.Semaphore(max_concurrent)
    
    async def scrape_urls_concurrently(self, urls: List[str]) -> List[Dict]:
        """
        複数URLを並行処理でスクレイピング
        
        Args:
            urls: スクレイピング対象のURLリスト
            
        Returns:
            スクレイピング結果のリスト
        """
        
        async def scrape_single_url(url: str) -> Dict:
            """
            単一URLのスクレイピング(セマフォ制御付き)
            
            この内部関数は外部のsemaphoreを使用して
            同時実行数を制限する
            """
            async with self.semaphore:  # セマフォによる実行数制限
                # 設定オブジェクトの作成
                config = ScrapingConfig(headless=True, timeout=15000)
                
                try:
                    # ModernWebScraperを使用してページを取得
                    async with ModernWebScraper(config) as scraper:
                        result = await scraper.scrape_page(url)
                        
                        # レート制限(1秒待機)
                        # 各リクエスト間に適切な間隔を設ける
                        await asyncio.sleep(1)
                        return result
                        
                except Exception as e:
                    # 個別のエラーは結果として記録
                    # 他のURLの処理は継続
                    return {
                        'url': url,
                        'error': str(e),
                        'status': 'error',
                        'timestamp': datetime.now()
                    }
        
        # メイン処理の開始
        logger.info(f"Starting concurrent scraping of {len(urls)} URLs")
        
        # 全URLに対してタスクを作成
        tasks = [scrape_single_url(url) for url in urls]
        
        # asyncio.gather()で全タスクを並行実行
        # return_exceptions=Trueで例外も結果として取得
        results = await asyncio.gather(*tasks, return_exceptions=True)
        
        # 例外処理とフォーマット統一
        processed_results = []
        for result in results:
            if isinstance(result, Exception):
                # 例外が発生した場合のフォーマット
                processed_results.append({
                    'error': str(result),
                    'status': 'exception',
                    'timestamp': datetime.now()
                })
            else:
                processed_results.append(result)
        
        return processed_results

# 使用例の詳細解説
async def demo_concurrent_scraping():
    """
    並行スクレイピングのデモ
    
    同期処理と非同期処理のパフォーマンス比較も含む
    """
    
    # テスト用URL(httpbin.orgのサービスを使用)
    test_urls = [
        "https://httpbin.org/delay/1",  # 1秒遅延してレスポンス
        "https://httpbin.org/delay/2",  # 2秒遅延してレスポンス
        "https://httpbin.org/json",     # JSON形式レスポンス
        "https://httpbin.org/html",     # HTML形式レスポンス
        "https://httpbin.org/xml"       # XML形式レスポンス
    ]
    
    # バッチスクレイパーのインスタンス作成
    # max_concurrent=3で同時実行数を3に制限
    scraper = AsyncBatchScraper(max_concurrent=3)
    
    # 実行時間の計測開始
    start_time = time.time()
    
    # 並行処理実行
    results = await scraper.scrape_urls_concurrently(test_urls)
    
    # 実行時間の計算
    elapsed_time = time.time() - start_time
    
    # 結果の分析と表示
    successful_count = sum(1 for r in results if r.get('status') == 'success')
    
    print(f"並行処理完了: {elapsed_time:.2f}秒")
    print(f"成功: {successful_count} / {len(results)}")
    print(f"平均処理時間: {elapsed_time/len(test_urls):.2f}秒/URL")
    
    # 個別結果の詳細表示
    for i, result in enumerate(results):
        status = "✅" if result.get('status') == 'success' else "❌"
        url = result.get('url', 'Unknown')
        print(f"{status} {i+1}. {url}")
    
    return results

# 実行例
# results = await demo_concurrent_scraping()

進歩的なバッチ処理(進捗表示付き)

class ProgressiveBatchScraper(AsyncBatchScraper):
    """
    進捗表示機能付きバッチスクレイパー
    
    大量のURLを処理する際の進捗状況を可視化
    """
    
    def __init__(self, max_concurrent: int = 5, batch_size: int = 10):
        """
        初期化メソッド
        
        Args:
            max_concurrent: 最大同時実行数
            batch_size: バッチサイズ(進捗表示の単位)
        """
        super().__init__(max_concurrent)
        self.batch_size = batch_size
        self.completed_count = 0
        self.total_count = 0
    
    async def scrape_with_progress(self, urls: List[str]) -> List[Dict]:
        """
        進捗表示付きスクレイピング
        
        大量のURLを小分けして処理し、
        進捗状況をリアルタイムで表示
        """
        self.total_count = len(urls)
        self.completed_count = 0
        all_results = []
        
        # URLリストをバッチサイズで分割
        for i in range(0, len(urls), self.batch_size):
            # バッチの作成
            batch_urls = urls[i:i + self.batch_size]
            batch_num = i // self.batch_size + 1
            total_batches = (len(urls) + self.batch_size - 1) // self.batch_size
            
            print(f"バッチ {batch_num}/{total_batches} 処理中... ({len(batch_urls)}件)")
            
            # バッチの並行処理実行
            batch_results = await self.scrape_urls_concurrently(batch_urls)
            all_results.extend(batch_results)
            
            # 進捗状況の更新
            self.completed_count += len(batch_urls)
            progress = (self.completed_count / self.total_count) * 100
            
            print(f"進捗: {self.completed_count}/{self.total_count} ({progress:.1f}%)")
            
            # バッチ間の休憩(サーバー負荷軽減)
            if i + self.batch_size < len(urls):
                print("バッチ間休憩中...")
                await asyncio.sleep(2)
        
        return all_results
    
    def get_performance_stats(self, results: List[Dict], elapsed_time: float) -> Dict:
        """
        パフォーマンス統計の計算
        
        Args:
            results: スクレイピング結果
            elapsed_time: 実行時間
            
        Returns:
            パフォーマンス統計
        """
        successful = sum(1 for r in results if r.get('status') == 'success')
        failed = len(results) - successful
        
        return {
            'total_urls': len(results),
            'successful': successful,
            'failed': failed,
            'success_rate': (successful / len(results)) * 100 if results else 0,
            'total_time': elapsed_time,
            'avg_time_per_url': elapsed_time / len(results) if results else 0,
            'urls_per_second': len(results) / elapsed_time if elapsed_time > 0 else 0
        }

# 使用例
async def demo_progressive_scraping():
    """
    進捗表示付きスクレイピングのデモ
    """
    
    # 大量のテストURLを生成
    base_urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml"
    ]
    
    # クエリパラメータを付けて重複を避ける
    test_urls = []
    for i in range(15):  # 15個のURLを生成
        base_url = base_urls[i % len(base_urls)]
        test_urls.append(f"{base_url}?test={i}")
    
    # 進捗表示付きスクレイパーを作成
    scraper = ProgressiveBatchScraper(max_concurrent=3, batch_size=5)
    
    # 実行時間計測
    start_time = time.time()
    results = await scraper.scrape_with_progress(test_urls)
    elapsed_time = time.time() - start_time
    
    # パフォーマンス統計の表示
    stats = scraper.get_performance_stats(results, elapsed_time)
    
    print(f"\n=== パフォーマンス統計 ===")
    print(f"総URL数: {stats['total_urls']}")
    print(f"成功数: {stats['successful']}")
    print(f"失敗数: {stats['failed']}")
    print(f"成功率: {stats['success_rate']:.1f}%")
    print(f"総実行時間: {stats['total_time']:.2f}秒")
    print(f"平均処理時間: {stats['avg_time_per_url']:.2f}秒/URL")
    print(f"処理速度: {stats['urls_per_second']:.1f} URLs/秒")
    
    return results, stats

# 実行例
# progressive_results = await demo_progressive_scraping()

🔍 高度なデータ抽出テクニック

構造化データ抽出システム

実際のWebサイトから確実にデータを抽出するには、複数の抽出方法を組み合わせる必要があります。

class AdvancedDataExtractor:
    """
    高度なデータ抽出クラス
    
    複数の抽出手法を組み合わせて
    堅牢なデータ抽出を実現
    """
    
    def __init__(self):
        """
        初期化メソッド
        
        よく使用される抽出パターンを事前定義
        """
        # 正規表現パターンライブラリ
        self.extraction_patterns = {
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'phone': r'(\+?\d{1,3}[-.]?)?\(?\d{3}\)?[-.]?\d{3}[-.]?\d{4}',
            'price': r'[¥$€£]\s*[\d,]+\.?\d*|[\d,]+\.?\d*\s*[¥$€£]',
            'date': r'\d{4}[-/]\d{1,2}[-/]\d{1,2}|\d{1,2}[-/]\d{1,2}[-/]\d{4}',
            'url': r'https?://(?:[-\w.])+(?:[:\d]+)?(?:/(?:[\w/_.])*(?:\?(?:[\w&=%.])*)?(?:#(?:[\w.])*)?)?'
        }
        
        # よく使用されるCSSセレクターパターン
        self.common_selectors = {
            'title': ['h1', '.title', '.headline', '.article-title', '.post-title'],
            'price': ['.price', '.amount', '.cost', '.value', '.price-current'],
            'description': ['.description', '.summary', '.content', '.article-content'],
            'rating': ['.rating', '.score', '.stars', '.review-score'],
            'date': ['.date', '.timestamp', '.published', '.post-date']
        }
    
    async def extract_structured_data(self, html_content: str, target_schema: Dict) -> Dict:
        """
        構造化データ抽出
        
        定義されたスキーマに基づいて
        HTMLから構造化データを抽出
        
        Args:
            html_content: HTML文字列
            target_schema: 抽出スキーマの定義
            
        Returns:
            抽出されたデータの辞書
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        extracted_data = {}
        
        # スキーマの各フィールドを処理
        for field, config in target_schema.items():
            try:
                # 抽出タイプに応じて処理を分岐
                if config['type'] == 'css_selector':
                    # CSSセレクターによる抽出
                    elements = soup.select(config['selector'])
                    
                    if config.get('multiple', False):
                        # 複数要素の場合はリストで返す
                        extracted_data[field] = [elem.get_text(strip=True) for elem in elements]
                    else:
                        # 単一要素の場合は最初のマッチを使用
                        extracted_data[field] = elements[0].get_text(strip=True) if elements else None
                
                elif config['type'] == 'regex':
                    # 正規表現による抽出
                    import re
                    pattern = self.extraction_patterns.get(config['pattern'], config['pattern'])
                    matches = re.findall(pattern, html_content)
                    
                    if config.get('multiple', False):
                        extracted_data[field] = matches
                    else:
                        extracted_data[field] = matches[0] if matches else None
                
                elif config['type'] == 'json_ld':
                    # JSON-LD(構造化データ)による抽出
                    json_ld_scripts = soup.find_all('script', type='application/ld+json')
                    
                    for script in json_ld_scripts:
                        try:
                            data = json.loads(script.string)
                            
                            # ネストしたキーの取得をサポート
                            keys = config['key'].split('.')
                            value = data
                            
                            for key in keys:
                                if isinstance(value, dict) and key in value:
                                    value = value[key]
                                else:
                                    value = None
                                    break
                            
                            if value is not None:
                                extracted_data[field] = value
                                break
                                
                        except json.JSONDecodeError:
                            continue
                
                elif config['type'] == 'auto_detect':
                    # 自動検出機能
                    extracted_data[field] = await self.auto_detect_field(soup, field)
                
                elif config['type'] == 'attribute':
                    # HTML属性による抽出
                    elements = soup.select(config['selector'])
                    if elements:
                        attr_value = elements[0].get(config['attribute'])
                        extracted_data[field] = attr_value
                    else:
                        extracted_data[field] = None
                
            except Exception as e:
                logger.error(f"Error extracting {field}: {e}")
                extracted_data[field] = None
        
        return extracted_data
    
    async def auto_detect_field(self, soup: BeautifulSoup, field_type: str) -> Optional[str]:
        """
        フィールド自動検出
        
        一般的なCSSセレクターのパターンを試行して
        該当するフィールドを自動検出
        
        Args:
            soup: BeautifulSoupオブジェクト
            field_type: フィールドタイプ(title, price等)
            
        Returns:
            検出されたテキスト(見つからない場合はNone)
        """
        if field_type in self.common_selectors:
            # 一般的なセレクターパターンを順次試行
            for selector in self.common_selectors[field_type]:
                element = soup.select_one(selector)
                if element:
                    return element.get_text(strip=True)
        
        return None
    
    def extract_with_confidence(self, html_content: str, extraction_rules: List[Dict]) -> Dict:
        """
        信頼度付きデータ抽出
        
        複数の抽出方法を試行し、
        最も信頼度の高い結果を選択
        
        Args:
            html_content: HTML文字列
            extraction_rules: 抽出ルールのリスト
            
        Returns:
            信頼度付きの抽出結果
        """
        soup = BeautifulSoup(html_content, 'html.parser')
        results = {}
        
        # 各フィールドの抽出ルールを処理
        for rule in extraction_rules:
            field_name = rule['field']
            methods = rule['methods']  # 複数の抽出方法
            
            field_results = []
            
            # 各抽出方法を試行
            for method in methods:
                try:
                    if method['type'] == 'css':
                        elements = soup.select(method['selector'])
                        if elements:
                            value = elements[0].get_text(strip=True)
                            field_results.append({
                                'value': value,
                                'confidence': method.get('confidence', 0.5),
                                'method': 'css_selector'
                            })
                    
                    elif method['type'] == 'regex':
                        import re
                        pattern = method['pattern']
                        matches = re.findall(pattern, html_content)
                        if matches:
                            field_results.append({
                                'value': matches[0],
                                'confidence': method.get('confidence', 0.7),
                                'method': 'regex'
                            })
                    
                    elif method['type'] == 'xpath':
                        # XPathによる抽出(より複雑な選択が可能)
                        try:
                            from lxml import etree, html as lxml_html
                            tree = lxml_html.fromstring(html_content)
                            elements = tree.xpath(method['xpath'])
                            
                            if elements:
                                if hasattr(elements[0], 'text'):
                                    value = elements[0].text or ''
                                else:
                                    value = str(elements[0])
                                
                                field_results.append({
                                    'value': value.strip(),
                                    'confidence': method.get('confidence', 0.8),
                                    'method': 'xpath'
                                })
                        except ImportError:
                            logger.warning("lxml not installed, skipping XPath method")
                        except Exception as e:
                            logger.debug(f"XPath extraction failed: {e}")
                    
                except Exception as e:
                    logger.debug(f"Extraction method failed: {e}")
                    continue
            
            # 最高信頼度の結果を選択
            if field_results:
                best_result = max(field_results, key=lambda x: x['confidence'])
                results[field_name] = {
                    'value': best_result['value'],
                    'confidence': best_result['confidence'],
                    'method': best_result['method'],
                    'alternatives': [r for r in field_results if r != best_result]
                }
            else:
                results[field_name] = {
                    'value': None,
                    'confidence': 0.0,
                    'method': 'none',
                    'alternatives': []
                }
        
        return results

# 使用例の詳細解説
async def demo_advanced_extraction():
    """
    高度な抽出のデモ
    
    実際のECサイト風HTMLからの商品情報抽出を模擬
    """
    
    # より複雑なサンプルHTML(ECサイト風)
    sample_html = """
    <html>
        <head>
            <title>商品詳細 - スマートフォン XYZ Pro</title>
            <meta property="og:title" content="スマートフォン XYZ Pro">
            <meta property="og:price:amount" content="128000">
        </head>
        <body>
            <div class="product-container">
                <div class="breadcrumb">
                    <a href="/">ホーム</a> > <a href="/electronics">家電</a> > スマートフォン
                </div>
                
                <div class="product-info">
                    <h1 class="product-title">スマートフォン XYZ Pro</h1>
                    <div class="price-section">
                        <span class="current-price">¥128,000</span>
                        <span class="original-price">¥148,000</span>
                        <span class="discount">13%OFF</span>
                    </div>
                    
                    <div class="rating-section">
                        <div class="stars" data-rating="4.2">★★★★☆</div>
                        <span class="rating-text">(4.2/5)</span>
                        <a href="#reviews">178件のレビュー</a>
                    </div>
                    
                    <div class="description">
                        <p>5G対応の最新フラッグシップモデル。高性能カメラとバッテリー長持ち。</p>
                    </div>
                    
                    <div class="specifications">
                        <ul>
                            <li>ディスプレイ: 6.7インチ有機EL</li>
                            <li>ストレージ: 256GB</li>
                            <li>カメラ: トリプルレンズ</li>
                            <li>バッテリー: 4500mAh</li>
                        </ul>
                    </div>
                    
                    <div class="availability">
                        <span class="stock-status in-stock">在庫あり</span>
                        <span class="shipping">明日お届け</span>
                    </div>
                </div>
            </div>
            
            <!-- JSON-LD構造化データ -->
            <script type="application/ld+json">
            {
                "@context": "http://schema.org/",
                "@type": "Product",
                "name": "スマートフォン XYZ Pro",
                "image": "https://example.com/phone.jpg",
                "description": "5G対応の最新フラッグシップモデル",
                "sku": "XYZ-PRO-256",
                "brand": {
                    "@type": "Brand",
                    "name": "XYZ Corporation"
                },
                "offers": {
                    "@type": "Offer",
                    "url": "https://example.com/phone-xyz-pro",
                    "priceCurrency": "JPY",
                    "price": "128000",
                    "itemCondition": "http://schema.org/NewCondition",
                    "availability": "http://schema.org/InStock"
                },
                "aggregateRating": {
                    "@type": "AggregateRating",
                    "ratingValue": "4.2",
                    "reviewCount": "178"
                }
            }
            </script>
        </body>
    </html>
    """
    
    extractor = AdvancedDataExtractor()
    
    # 構造化抽出スキーマの定義
    extraction_schema = {
        'product_name': {
            'type': 'css_selector',
            'selector': '.product-title',
            'multiple': False
        },
        'current_price': {
            'type': 'css_selector',
            'selector': '.current-price',
            'multiple': False
        },
        'rating_value': {
            'type': 'attribute',
            'selector': '.stars',
            'attribute': 'data-rating'
        },
        'specifications': {
            'type': 'css_selector',
            'selector': '.specifications li',
            'multiple': True
        },
        'structured_data': {
            'type': 'json_ld',
            'key': 'offers.price'
        },
        'stock_status': {
            'type': 'auto_detect'
        }
    }
    
    # 信頼度付き抽出ルール
    confidence_rules = [
        {
            'field': 'product_price',
            'methods': [
                {
                    'type': 'css',
                    'selector': '.current-price',
                    'confidence': 0.9
                },
                {
                    'type': 'regex',
                    'pattern': r'¥[\d,]+',
                    'confidence': 0.7
                },
                {
                    'type': 'css',
                    'selector': '.price-section .price',
                    'confidence': 0.6
                }
            ]
        },
        {
            'field': 'product_rating',
            'methods': [
                {
                    'type': 'xpath',
                    'xpath': '//*[@data-rating]/@data-rating',
                    'confidence': 0.9
                },
                {
                    'type': 'regex',
                    'pattern': r'(\d+\.\d+)/5',
                    'confidence': 0.8
                },
                {
                    'type': 'css',
                    'selector': '.rating-text',
                    'confidence': 0.7
                }
            ]
        }
    ]
    
    # 構造化データ抽出の実行
    print("=== 構造化データ抽出 ===")
    structured_result = await extractor.extract_structured_data(sample_html, extraction_schema)
    
    for key, value in structured_result.items():
        print(f"{key}: {value}")
    
    # 信頼度付き抽出の実行
    print("\n=== 信頼度付き抽出 ===")
    confidence_result = extractor.extract_with_confidence(sample_html, confidence_rules)
    
    for key, result in confidence_result.items():
        print(f"{key}:")
        print(f"  値: {result['value']}")
        print(f"  信頼度: {result['confidence']:.1f}")
        print(f"  抽出方法: {result['method']}")
        if result['alternatives']:
            print(f"  代替案数: {len(result['alternatives'])}")
    
    return structured_result, confidence_result

# 実行例
# extraction_results = await demo_advanced_extraction()

🛡️ エラーハンドリングとリトライ機能

指数バックオフによるリトライシステム

class RobustScraper:
    """
    堅牢なスクレイピングクラス
    
    エラー処理とリトライ機能を備えた
    プロダクション品質のスクレイパー
    """
    
    def __init__(self, config: ScrapingConfig = None):
        self.config = config or ScrapingConfig()
        self.error_stats = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'retry_attempts': 0,
            'error_types': {}
        }
    
    async def scrape_with_retry(self, 
                              url: str, 
                              max_retries: int = 3,
                              backoff_factor: float = 2.0,
                              initial_delay: float = 1.0) -> Dict:
        """
        リトライ機能付きスクレイピング
        
        指数バックオフアルゴリズムを使用して
        一時的なエラーに対して自動的にリトライ
        
        Args:
            url: スクレイピング対象URL
            max_retries: 最大リトライ回数
            backoff_factor: 遅延時間の倍率
            initial_delay: 初期遅延時間(秒)
            
        Returns:
            スクレイピング結果
        """
        self.error_stats['total_requests'] += 1
        
        for attempt in range(max_retries + 1):
            try:
                async with ModernWebScraper(self.config) as scraper:
                    result = await scraper.scrape_page(url)
                    
                    if result['status'] == 'success':
                        self.error_stats['successful_requests'] += 1
                        return result
                    else:
                        raise Exception(f"Scraping failed: {result.get('error', 'Unknown error')}")
                        
            except Exception as e:
                error_type = type(e).__name__
                self.error_stats['error_types'][error_type] = \
                    self.error_stats['error_types'].get(error_type, 0) + 1
                
                # 最後の試行の場合はエラーを記録して終了
                if attempt == max_retries:
                    self.error_stats['failed_requests'] += 1
                    logger.error(f"Final attempt failed for {url}: {e}")
                    return {
                        'url': url,
                        'error': str(e),
                        'attempts': attempt + 1,
                        'status': 'failed_after_retries',
                        'timestamp': datetime.now()
                    }
                
                # リトライ時の遅延計算(指数バックオフ)
                delay = initial_delay * (backoff_factor ** attempt)
                
                # ジッター(ランダム要素)を追加して同時リトライを回避
                import random
                jitter = delay * random.uniform(0.1, 0.5)
                total_delay = delay + jitter
                
                self.error_stats['retry_attempts'] += 1
                logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying in {total_delay:.1f}s...")
                
                await asyncio.sleep(total_delay)
        
        # ここに到達することはないが、安全のため
        return {
            'url': url,
            'error': 'Unexpected retry loop exit',
            'status': 'error',
            'timestamp': datetime.now()
        }
    
    def get_error_statistics(self) -> Dict:
        """
        エラー統計の取得
        
        Returns:
            エラー統計情報
        """
        success_rate = 0
        if self.error_stats['total_requests'] > 0:
            success_rate = (self.error_stats['successful_requests'] / 
                          self.error_stats['total_requests']) * 100
        
        return {
            **self.error_stats,
            'success_rate': success_rate,
            'average_retries_per_request': (
                self.error_stats['retry_attempts'] / 
                max(self.error_stats['total_requests'], 1)
            )
        }

# 使用例
async def demo_robust_scraping():
    """
    堅牢なスクレイピングのデモ
    
    エラーが発生しやすいURLも含めて
    リトライ機能をテスト
    """
    
    scraper = RobustScraper()
    
    # 成功・失敗が混在するテストURL
    test_urls = [
        "https://httpbin.org/status/200",    # 成功
        "https://httpbin.org/status/500",    # サーバーエラー(リトライ対象)
        "https://httpbin.org/delay/10",      # タイムアウト(リトライ対象)
        "https://httpbin.org/json",          # 成功
        "https://invalid-url-for-test.example",  # DNS解決エラー
    ]
    
    results = []
    
    for url in test_urls:
        print(f"\nスクレイピング開始: {url}")
        result = await scraper.scrape_with_retry(
            url,
            max_retries=2,
            backoff_factor=2.0,
            initial_delay=1.0
        )
        
        status_icon = "✅" if result['status'] == 'success' else "❌"
        print(f"{status_icon} 結果: {result['status']}")
        
        results.append(result)
    
    # エラー統計の表示
    stats = scraper.get_error_statistics()
    print(f"\n=== エラー統計 ===")
    print(f"総リクエスト数: {stats['total_requests']}")
    print(f"成功数: {stats['successful_requests']}")
    print(f"失敗数: {stats['failed_requests']}")
    print(f"成功率: {stats['success_rate']:.1f}%")
    print(f"総リトライ回数: {stats['retry_attempts']}")
    print(f"平均リトライ数: {stats['average_retries_per_request']:.1f}")
    print(f"エラータイプ: {stats['error_types']}")
    
    return results, stats

# 実行例
# robust_results = await demo_robust_scraping()

📋 第2部のまとめ

学習した高度なテクニック

  1. 並行処理の実装: asyncio.gather()とセマフォによる制御
  2. バッチ処理: 大量URLの効率的な処理と進捗表示
  3. 高度なデータ抽出: 複数手法の組み合わせと信頼度評価
  4. エラーハンドリング: 指数バックオフとリトライ機構

パフォーマンス向上のポイント

  • 適切な同時実行数: サーバー負荷とスピードのバランス
  • バッチ分割: メモリ使用量の制御
  • エラー処理: 一部の失敗が全体に影響しない設計
  • 統計情報: パフォーマンスの可視化と改善点の特定

第3部では、以下の実践的なトピックについて解説します:


免責事項: 本記事の内容は教育目的のものです。実際のスクレイピング実行時は、対象サイトの利用規約を遵守し、適切な許可を得てから実施してください。

関連記事

Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築
Python

Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築

これまでの記事で価格表示とチャート分析ができるようになりました。今回は最終回として、自分の保有している仮想通貨を管理するポートフォリオ機能と、価格変動を知らせるアラート機能を追加します。これで本格的な投資管理ツールの完成です! ポートフォリオ管理とは?なぜ重要なのか ...

Streamlit入門 第2部:美しいチャートと可視化で仮想通貨を分析しよう
Python

Streamlit入門 第2部:美しいチャートと可視化で仮想通貨を分析しよう

前回の記事で基本的な価格表示ができるようになりました。今回は、プロのトレーダーが使うような美しいチャートを追加して、本格的な分析ツールに仕上げていきます。難しそうに見えますが、実は数行のコードで驚くほど高機能なチャートが作れるんです! Plotlyとは?なぜグラフライブラリの中で最強なのか ...

Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理
Python

Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理

はじめに シリーズ最終回となる第3部では、実際のプロジェクトで使える実践的なスクレイピングシステムの構築について解説します。検出回避技術、データの永続化、監視システムの構築など、プロダクション環境で必要となる高度な技術を学びます。 また、法的・倫理的配慮についても詳しく説明し、責任あるスクレイピング...

コメント

0/2000