Python Webスクレイピング 2025 第2部:非同期処理と高度なデータ抽出

はじめに

第1部ではPlaywrightの基本的な使い方と環境構築について学びました。第2部では、並行処理によるパフォーマンス向上高度なデータ抽出テクニックについて詳しく解説します。

大規模なスクレイピングプロジェクトでは、単一ページずつの処理では時間がかかりすぎるため、複数のページを同時に処理する並行処理が必須です。また、複雑なWebサイトから確実にデータを抽出するための高度な技術も重要になります。

⚡ 非同期処理によるパフォーマンス向上

並行処理の基本概念

従来の同期処理では、1つのページの処理が完了するまで次のページに進めません。しかし、非同期処理を使用することで、複数のページを同時並行で処理できます。

# 同期処理の例(遅い)
def sync_scraping(urls):
    results = []
    for url in urls:  # 1つずつ順番に処理
        result = scrape_page(url)  # この処理中は他が待機
        results.append(result)
    return results

# 非同期処理の例(高速)
async def async_scraping(urls):
    # 全てのタスクを同時に開始
    tasks = [scrape_page(url) for url in urls]
    # 全てのタスクの完了を待つ
    results = await asyncio.gather(*tasks)
    return results

制御された並行スクレイピング

無制限の並行処理はサーバーに負荷をかけるため、セマフォを使用して同時実行数を制限します。

class AsyncBatchScraper:
    """
    バッチ処理による高速スクレイピング

    セマフォを使用して同時実行数を制限
    サーバー負荷とパフォーマンスのバランスを取る
    """

    def __init__(self, max_concurrent: int = 5):
        """
        初期化メソッド

        Args:
            max_concurrent: 最大同時実行数
                           - 多すぎるとサーバー負荷増大
                           - 少なすぎるとパフォーマンス低下
        """
        self.max_concurrent = max_concurrent

        # セマフォ: 同時実行数を制限するための仕組み
        # 指定した数だけのスレッドが同時に実行可能
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def scrape_urls_concurrently(self, urls: List[str]) -> List[Dict]:
        """
        複数URLを並行処理でスクレイピング

        Args:
            urls: スクレイピング対象のURLリスト

        Returns:
            スクレイピング結果のリスト
        """

        async def scrape_single_url(url: str) -> Dict:
            """
            単一URLのスクレイピング(セマフォ制御付き)

            この内部関数は外部のsemaphoreを使用して
            同時実行数を制限する
            """
            async with self.semaphore:  # セマフォによる実行数制限
                # 設定オブジェクトの作成
                config = ScrapingConfig(headless=True, timeout=15000)

                try:
                    # ModernWebScraperを使用してページを取得
                    async with ModernWebScraper(config) as scraper:
                        result = await scraper.scrape_page(url)

                        # レート制限(1秒待機)
                        # 各リクエスト間に適切な間隔を設ける
                        await asyncio.sleep(1)
                        return result

                except Exception as e:
                    # 個別のエラーは結果として記録
                    # 他のURLの処理は継続
                    return {
                        'url': url,
                        'error': str(e),
                        'status': 'error',
                        'timestamp': datetime.now()
                    }

        # メイン処理の開始
        logger.info(f"Starting concurrent scraping of {len(urls)} URLs")

        # 全URLに対してタスクを作成
        tasks = [scrape_single_url(url) for url in urls]

        # asyncio.gather()で全タスクを並行実行
        # return_exceptions=Trueで例外も結果として取得
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # 例外処理とフォーマット統一
        processed_results = []
        for result in results:
            if isinstance(result, Exception):
                # 例外が発生した場合のフォーマット
                processed_results.append({
                    'error': str(result),
                    'status': 'exception',
                    'timestamp': datetime.now()
                })
            else:
                processed_results.append(result)

        return processed_results

# 使用例の詳細解説
async def demo_concurrent_scraping():
    """
    並行スクレイピングのデモ

    同期処理と非同期処理のパフォーマンス比較も含む
    """

    # テスト用URL(httpbin.orgのサービスを使用)
    test_urls = [
        "https://httpbin.org/delay/1",  # 1秒遅延してレスポンス
        "https://httpbin.org/delay/2",  # 2秒遅延してレスポンス
        "https://httpbin.org/json",     # JSON形式レスポンス
        "https://httpbin.org/html",     # HTML形式レスポンス
        "https://httpbin.org/xml"       # XML形式レスポンス
    ]

    # バッチスクレイパーのインスタンス作成
    # max_concurrent=3で同時実行数を3に制限
    scraper = AsyncBatchScraper(max_concurrent=3)

    # 実行時間の計測開始
    start_time = time.time()

    # 並行処理実行
    results = await scraper.scrape_urls_concurrently(test_urls)

    # 実行時間の計算
    elapsed_time = time.time() - start_time

    # 結果の分析と表示
    successful_count = sum(1 for r in results if r.get('status') == 'success')

    print(f"並行処理完了: {elapsed_time:.2f}秒")
    print(f"成功: {successful_count} / {len(results)}")
    print(f"平均処理時間: {elapsed_time/len(test_urls):.2f}秒/URL")

    # 個別結果の詳細表示
    for i, result in enumerate(results):
        status = "✅" if result.get('status') == 'success' else "❌"
        url = result.get('url', 'Unknown')
        print(f"{status} {i+1}. {url}")

    return results

# 実行例
# results = await demo_concurrent_scraping()

進歩的なバッチ処理(進捗表示付き)

class ProgressiveBatchScraper(AsyncBatchScraper):
    """
    進捗表示機能付きバッチスクレイパー

    大量のURLを処理する際の進捗状況を可視化
    """

    def __init__(self, max_concurrent: int = 5, batch_size: int = 10):
        """
        初期化メソッド

        Args:
            max_concurrent: 最大同時実行数
            batch_size: バッチサイズ(進捗表示の単位)
        """
        super().__init__(max_concurrent)
        self.batch_size = batch_size
        self.completed_count = 0
        self.total_count = 0

    async def scrape_with_progress(self, urls: List[str]) -> List[Dict]:
        """
        進捗表示付きスクレイピング

        大量のURLを小分けして処理し、
        進捗状況をリアルタイムで表示
        """
        self.total_count = len(urls)
        self.completed_count = 0
        all_results = []

        # URLリストをバッチサイズで分割
        for i in range(0, len(urls), self.batch_size):
            # バッチの作成
            batch_urls = urls[i:i + self.batch_size]
            batch_num = i // self.batch_size + 1
            total_batches = (len(urls) + self.batch_size - 1) // self.batch_size

            print(f"バッチ {batch_num}/{total_batches} 処理中... ({len(batch_urls)}件)")

            # バッチの並行処理実行
            batch_results = await self.scrape_urls_concurrently(batch_urls)
            all_results.extend(batch_results)

            # 進捗状況の更新
            self.completed_count += len(batch_urls)
            progress = (self.completed_count / self.total_count) * 100

            print(f"進捗: {self.completed_count}/{self.total_count} ({progress:.1f}%)")

            # バッチ間の休憩(サーバー負荷軽減)
            if i + self.batch_size < len(urls):
                print("バッチ間休憩中...")
                await asyncio.sleep(2)

        return all_results

    def get_performance_stats(self, results: List[Dict], elapsed_time: float) -> Dict:
        """
        パフォーマンス統計の計算

        Args:
            results: スクレイピング結果
            elapsed_time: 実行時間

        Returns:
            パフォーマンス統計
        """
        successful = sum(1 for r in results if r.get('status') == 'success')
        failed = len(results) - successful

        return {
            'total_urls': len(results),
            'successful': successful,
            'failed': failed,
            'success_rate': (successful / len(results)) * 100 if results else 0,
            'total_time': elapsed_time,
            'avg_time_per_url': elapsed_time / len(results) if results else 0,
            'urls_per_second': len(results) / elapsed_time if elapsed_time > 0 else 0
        }

# 使用例
async def demo_progressive_scraping():
    """
    進捗表示付きスクレイピングのデモ
    """

    # 大量のテストURLを生成
    base_urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml"
    ]

    # クエリパラメータを付けて重複を避ける
    test_urls = []
    for i in range(15):  # 15個のURLを生成
        base_url = base_urls[i % len(base_urls)]
        test_urls.append(f"{base_url}?test={i}")

    # 進捗表示付きスクレイパーを作成
    scraper = ProgressiveBatchScraper(max_concurrent=3, batch_size=5)

    # 実行時間計測
    start_time = time.time()
    results = await scraper.scrape_with_progress(test_urls)
    elapsed_time = time.time() - start_time

    # パフォーマンス統計の表示
    stats = scraper.get_performance_stats(results, elapsed_time)

    print(f"n=== パフォーマンス統計 ===")
    print(f"総URL数: {stats['total_urls']}")
    print(f"成功数: {stats['successful']}")
    print(f"失敗数: {stats['failed']}")
    print(f"成功率: {stats['success_rate']:.1f}%")
    print(f"総実行時間: {stats['total_time']:.2f}秒")
    print(f"平均処理時間: {stats['avg_time_per_url']:.2f}秒/URL")
    print(f"処理速度: {stats['urls_per_second']:.1f} URLs/秒")

    return results, stats

# 実行例
# progressive_results = await demo_progressive_scraping()

🔍 高度なデータ抽出テクニック

構造化データ抽出システム

実際のWebサイトから確実にデータを抽出するには、複数の抽出方法を組み合わせる必要があります。

class AdvancedDataExtractor:
"""
高度なデータ抽出クラス
複数の抽出手法を組み合わせて
堅牢なデータ抽出を実現
"""
def __init__(self):
"""
初期化メソッド
よく使用される抽出パターンを事前定義
"""
# 正規表現パターンライブラリ
self.extraction_patterns = {
'email': r'b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+.[A-Z|a-z]{2,}b',
'phone': r'(+?d{1,3}[-.]?)?(?d{3})?[-.]?d{3}[-.]?d{4}',
'price': r'[¥$€£]s*[d,]+.?d*|[d,]+.?d*s*[¥$€£]',
'date': r'd{4}[-/]d{1,2}[-/]d{1,2}|d{1,2}[-/]d{1,2}[-/]d{4}',
'url': r'https?://(?:[-w.])+(?:[:d]+)?(?:/(?:[w/_.])*(?:?(?:[w&=%.])*)?(?:#(?:[w.])*)?)?'
}
# よく使用されるCSSセレクターパターン
self.common_selectors = {
'title': ['h1', '.title', '.headline', '.article-title', '.post-title'],
'price': ['.price', '.amount', '.cost', '.value', '.price-current'],
'description': ['.description', '.summary', '.content', '.article-content'],
'rating': ['.rating', '.score', '.stars', '.review-score'],
'date': ['.date', '.timestamp', '.published', '.post-date']
}
async def extract_structured_data(self, html_content: str, target_schema: Dict) -> Dict:
"""
構造化データ抽出
定義されたスキーマに基づいて
HTMLから構造化データを抽出
Args:
html_content: HTML文字列
target_schema: 抽出スキーマの定義
Returns:
抽出されたデータの辞書
"""
soup = BeautifulSoup(html_content, 'html.parser')
extracted_data = {}
# スキーマの各フィールドを処理
for field, config in target_schema.items():
try:
# 抽出タイプに応じて処理を分岐
if config['type'] == 'css_selector':
# CSSセレクターによる抽出
elements = soup.select(config['selector'])
if config.get('multiple', False):
# 複数要素の場合はリストで返す
extracted_data[field] = [elem.get_text(strip=True) for elem in elements]
else:
# 単一要素の場合は最初のマッチを使用
extracted_data[field] = elements[0].get_text(strip=True) if elements else None
elif config['type'] == 'regex':
# 正規表現による抽出
import re
pattern = self.extraction_patterns.get(config['pattern'], config['pattern'])
matches = re.findall(pattern, html_content)
if config.get('multiple', False):
extracted_data[field] = matches
else:
extracted_data[field] = matches[0] if matches else None
elif config['type'] == 'json_ld':
# JSON-LD(構造化データ)による抽出
json_ld_scripts = soup.find_all('script', type='application/ld+json')
for script in json_ld_scripts:
try:
data = json.loads(script.string)
# ネストしたキーの取得をサポート
keys = config['key'].split('.')
value = data
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
value = None
break
if value is not None:
extracted_data[field] = value
break
except json.JSONDecodeError:
continue
elif config['type'] == 'auto_detect':
# 自動検出機能
extracted_data[field] = await self.auto_detect_field(soup, field)
elif config['type'] == 'attribute':
# HTML属性による抽出
elements = soup.select(config['selector'])
if elements:
attr_value = elements[0].get(config['attribute'])
extracted_data[field] = attr_value
else:
extracted_data[field] = None
except Exception as e:
logger.error(f"Error extracting {field}: {e}")
extracted_data[field] = None
return extracted_data
async def auto_detect_field(self, soup: BeautifulSoup, field_type: str) -> Optional[str]:
"""
フィールド自動検出
一般的なCSSセレクターのパターンを試行して
該当するフィールドを自動検出
Args:
soup: BeautifulSoupオブジェクト
field_type: フィールドタイプ(title, price等)
Returns:
検出されたテキスト(見つからない場合はNone)
"""
if field_type in self.common_selectors:
# 一般的なセレクターパターンを順次試行
for selector in self.common_selectors[field_type]:
element = soup.select_one(selector)
if element:
return element.get_text(strip=True)
return None
def extract_with_confidence(self, html_content: str, extraction_rules: List[Dict]) -> Dict:
"""
信頼度付きデータ抽出
複数の抽出方法を試行し、
最も信頼度の高い結果を選択
Args:
html_content: HTML文字列
extraction_rules: 抽出ルールのリスト
Returns:
信頼度付きの抽出結果
"""
soup = BeautifulSoup(html_content, 'html.parser')
results = {}
# 各フィールドの抽出ルールを処理
for rule in extraction_rules:
field_name = rule['field']
methods = rule['methods']  # 複数の抽出方法
field_results = []
# 各抽出方法を試行
for method in methods:
try:
if method['type'] == 'css':
elements = soup.select(method['selector'])
if elements:
value = elements[0].get_text(strip=True)
field_results.append({
'value': value,
'confidence': method.get('confidence', 0.5),
'method': 'css_selector'
})
elif method['type'] == 'regex':
import re
pattern = method['pattern']
matches = re.findall(pattern, html_content)
if matches:
field_results.append({
'value': matches[0],
'confidence': method.get('confidence', 0.7),
'method': 'regex'
})
elif method['type'] == 'xpath':
# XPathによる抽出(より複雑な選択が可能)
try:
from lxml import etree, html as lxml_html
tree = lxml_html.fromstring(html_content)
elements = tree.xpath(method['xpath'])
if elements:
if hasattr(elements[0], 'text'):
value = elements[0].text or ''
else:
value = str(elements[0])
field_results.append({
'value': value.strip(),
'confidence': method.get('confidence', 0.8),
'method': 'xpath'
})
except ImportError:
logger.warning("lxml not installed, skipping XPath method")
except Exception as e:
logger.debug(f"XPath extraction failed: {e}")
except Exception as e:
logger.debug(f"Extraction method failed: {e}")
continue
# 最高信頼度の結果を選択
if field_results:
best_result = max(field_results, key=lambda x: x['confidence'])
results[field_name] = {
'value': best_result['value'],
'confidence': best_result['confidence'],
'method': best_result['method'],
'alternatives': [r for r in field_results if r != best_result]
}
else:
results[field_name] = {
'value': None,
'confidence': 0.0,
'method': 'none',
'alternatives': []
}
return results
# 使用例の詳細解説
async def demo_advanced_extraction():
"""
高度な抽出のデモ
実際のECサイト風HTMLからの商品情報抽出を模擬
"""
# より複雑なサンプルHTML(ECサイト風)
sample_html = """
<html>
<head>
<title>商品詳細 - スマートフォン XYZ Pro</title>
<meta property="og:title" content="スマートフォン XYZ Pro">
<meta property="og:price:amount" content="128000">
</head>
<body>
<div class="product-container">
<div class="breadcrumb">
<a href="/">ホーム</a> > <a href="/electronics">家電</a> > スマートフォン
</div>
<div class="product-info">
<h1 class="product-title">スマートフォン XYZ Pro</h1>
<div class="price-section">
<span class="current-price">¥128,000</span>
<span class="original-price">¥148,000</span>
<span class="discount">13%OFF</span>
</div>
<div class="rating-section">
<div class="stars" data-rating="4.2">★★★★☆</div>
<span class="rating-text">(4.2/5)</span>
<a href="#reviews">178件のレビュー</a>
</div>
<div class="description">
<p>5G対応の最新フラッグシップモデル。高性能カメラとバッテリー長持ち。</p>
</div>
<div class="specifications">
<ul>
<li>ディスプレイ: 6.7インチ有機EL</li>
<li>ストレージ: 256GB</li>
<li>カメラ: トリプルレンズ</li>
<li>バッテリー: 4500mAh</li>
</ul>
</div>
<div class="availability">
<span class="stock-status in-stock">在庫あり</span>
<span class="shipping">明日お届け</span>
</div>
</div>
</div>
<!-- JSON-LD構造化データ -->
<script type="application/ld+json">
{
"@context": "http://schema.org/",
"@type": "Product",
"name": "スマートフォン XYZ Pro",
"image": "https://example.com/phone.jpg",
"description": "5G対応の最新フラッグシップモデル",
"sku": "XYZ-PRO-256",
"brand": {
"@type": "Brand",
"name": "XYZ Corporation"
},
"offers": {
"@type": "Offer",
"url": "https://example.com/phone-xyz-pro",
"priceCurrency": "JPY",
"price": "128000",
"itemCondition": "http://schema.org/NewCondition",
"availability": "http://schema.org/InStock"
},
"aggregateRating": {
"@type": "AggregateRating",
"ratingValue": "4.2",
"reviewCount": "178"
}
}
</script>
</body>
</html>
"""
extractor = AdvancedDataExtractor()
# 構造化抽出スキーマの定義
extraction_schema = {
'product_name': {
'type': 'css_selector',
'selector': '.product-title',
'multiple': False
},
'current_price': {
'type': 'css_selector',
'selector': '.current-price',
'multiple': False
},
'rating_value': {
'type': 'attribute',
'selector': '.stars',
'attribute': 'data-rating'
},
'specifications': {
'type': 'css_selector',
'selector': '.specifications li',
'multiple': True
},
'structured_data': {
'type': 'json_ld',
'key': 'offers.price'
},
'stock_status': {
'type': 'auto_detect'
}
}
# 信頼度付き抽出ルール
confidence_rules = [
{
'field': 'product_price',
'methods': [
{
'type': 'css',
'selector': '.current-price',
'confidence': 0.9
},
{
'type': 'regex',
'pattern': r'¥[d,]+',
'confidence': 0.7
},
{
'type': 'css',
'selector': '.price-section .price',
'confidence': 0.6
}
]
},
{
'field': 'product_rating',
'methods': [
{
'type': 'xpath',
'xpath': '//*[@data-rating]/@data-rating',
'confidence': 0.9
},
{
'type': 'regex',
'pattern': r'(d+.d+)/5',
'confidence': 0.8
},
{
'type': 'css',
'selector': '.rating-text',
'confidence': 0.7
}
]
}
]
# 構造化データ抽出の実行
print("=== 構造化データ抽出 ===")
structured_result = await extractor.extract_structured_data(sample_html, extraction_schema)
for key, value in structured_result.items():
print(f"{key}: {value}")
# 信頼度付き抽出の実行
print("n=== 信頼度付き抽出 ===")
confidence_result = extractor.extract_with_confidence(sample_html, confidence_rules)
for key, result in confidence_result.items():
print(f"{key}:")
print(f"  値: {result['value']}")
print(f"  信頼度: {result['confidence']:.1f}")
print(f"  抽出方法: {result['method']}")
if result['alternatives']:
print(f"  代替案数: {len(result['alternatives'])}")
return structured_result, confidence_result
# 実行例
# extraction_results = await demo_advanced_extraction()

🛡️ エラーハンドリングとリトライ機能

指数バックオフによるリトライシステム

class RobustScraper:
"""
堅牢なスクレイピングクラス
エラー処理とリトライ機能を備えた
プロダクション品質のスクレイパー
"""
def __init__(self, config: ScrapingConfig = None):
self.config = config or ScrapingConfig()
self.error_stats = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'retry_attempts': 0,
'error_types': {}
}
async def scrape_with_retry(self, 
url: str, 
max_retries: int = 3,
backoff_factor: float = 2.0,
initial_delay: float = 1.0) -> Dict:
"""
リトライ機能付きスクレイピング
指数バックオフアルゴリズムを使用して
一時的なエラーに対して自動的にリトライ
Args:
url: スクレイピング対象URL
max_retries: 最大リトライ回数
backoff_factor: 遅延時間の倍率
initial_delay: 初期遅延時間(秒)
Returns:
スクレイピング結果
"""
self.error_stats['total_requests'] += 1
for attempt in range(max_retries + 1):
try:
async with ModernWebScraper(self.config) as scraper:
result = await scraper.scrape_page(url)
if result['status'] == 'success':
self.error_stats['successful_requests'] += 1
return result
else:
raise Exception(f"Scraping failed: {result.get('error', 'Unknown error')}")
except Exception as e:
error_type = type(e).__name__
self.error_stats['error_types'][error_type] = 
self.error_stats['error_types'].get(error_type, 0) + 1
# 最後の試行の場合はエラーを記録して終了
if attempt == max_retries:
self.error_stats['failed_requests'] += 1
logger.error(f"Final attempt failed for {url}: {e}")
return {
'url': url,
'error': str(e),
'attempts': attempt + 1,
'status': 'failed_after_retries',
'timestamp': datetime.now()
}
# リトライ時の遅延計算(指数バックオフ)
delay = initial_delay * (backoff_factor ** attempt)
# ジッター(ランダム要素)を追加して同時リトライを回避
import random
jitter = delay * random.uniform(0.1, 0.5)
total_delay = delay + jitter
self.error_stats['retry_attempts'] += 1
logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying in {total_delay:.1f}s...")
await asyncio.sleep(total_delay)
# ここに到達することはないが、安全のため
return {
'url': url,
'error': 'Unexpected retry loop exit',
'status': 'error',
'timestamp': datetime.now()
}
def get_error_statistics(self) -> Dict:
"""
エラー統計の取得
Returns:
エラー統計情報
"""
success_rate = 0
if self.error_stats['total_requests'] > 0:
success_rate = (self.error_stats['successful_requests'] / 
self.error_stats['total_requests']) * 100
return {
**self.error_stats,
'success_rate': success_rate,
'average_retries_per_request': (
self.error_stats['retry_attempts'] / 
max(self.error_stats['total_requests'], 1)
)
}
# 使用例
async def demo_robust_scraping():
"""
堅牢なスクレイピングのデモ
エラーが発生しやすいURLも含めて
リトライ機能をテスト
"""
scraper = RobustScraper()
# 成功・失敗が混在するテストURL
test_urls = [
"https://httpbin.org/status/200",    # 成功
"https://httpbin.org/status/500",    # サーバーエラー(リトライ対象)
"https://httpbin.org/delay/10",      # タイムアウト(リトライ対象)
"https://httpbin.org/json",          # 成功
"https://invalid-url-for-test.example",  # DNS解決エラー
]
results = []
for url in test_urls:
print(f"nスクレイピング開始: {url}")
result = await scraper.scrape_with_retry(
url,
max_retries=2,
backoff_factor=2.0,
initial_delay=1.0
)
status_icon = "✅" if result['status'] == 'success' else "❌"
print(f"{status_icon} 結果: {result['status']}")
results.append(result)
# エラー統計の表示
stats = scraper.get_error_statistics()
print(f"n=== エラー統計 ===")
print(f"総リクエスト数: {stats['total_requests']}")
print(f"成功数: {stats['successful_requests']}")
print(f"失敗数: {stats['failed_requests']}")
print(f"成功率: {stats['success_rate']:.1f}%")
print(f"総リトライ回数: {stats['retry_attempts']}")
print(f"平均リトライ数: {stats['average_retries_per_request']:.1f}")
print(f"エラータイプ: {stats['error_types']}")
return results, stats
# 実行例
# robust_results = await demo_robust_scraping()

📋 第2部のまとめ

学習した高度なテクニック

  1. 並行処理の実装: asyncio.gather()とセマフォによる制御
  2. バッチ処理: 大量URLの効率的な処理と進捗表示
  3. 高度なデータ抽出: 複数手法の組み合わせと信頼度評価
  4. エラーハンドリング: 指数バックオフとリトライ機構

パフォーマンス向上のポイント

  • 適切な同時実行数: サーバー負荷とスピードのバランス
  • バッチ分割: メモリ使用量の制御
  • エラー処理: 一部の失敗が全体に影響しない設計
  • 統計情報: パフォーマンスの可視化と改善点の特定

次のステップ

第3部では、以下の実践的なトピックについて解説します:

  • 検出回避技術: プロキシとUser-Agent管理
  • データ永続化: データベースとファイル出力
  • 監視システム: 実用的なスクレイピングプロジェクト
  • 法的・倫理的配慮: robots.txt対応とコンプライアンス

関連記事


免責事項: 本記事の内容は教育目的のものです。実際のスクレイピング実行時は、対象サイトの利用規約を遵守し、適切な許可を得てから実施してください。

コメントする