GetNavi 2025年9月・10月号[雑誌]
¥529 (2025-09-05 23:11 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)タイプc ケーブル 2M/2本セットPD対応 60W急速充電(USB A&USB C ケーブル) iPhone16 充電器 Type-c アイフォン16 15/いphone16 15 pro Max,iPad Pro/Air/mini6,MacBookPro,Samsung Galaxy S25/S24/S23/S22/S21/S20/S10,Pixel等Type-c機種対応-グレー
¥989 (2025-09-05 23:11 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)日経エンタテインメント! 2025年10月号増刊【表紙:BMSG】
¥890 (2025-09-05 23:11 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)デネット 動画 ダウンロード 保存 Android
(2025-09-05 23:11 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)目次
はじめに
第1部ではPlaywrightの基本的な使い方と環境構築について学びました。第2部では、並行処理によるパフォーマンス向上と高度なデータ抽出テクニックについて詳しく解説します。
大規模なスクレイピングプロジェクトでは、単一ページずつの処理では時間がかかりすぎるため、複数のページを同時に処理する並行処理が必須です。また、複雑なWebサイトから確実にデータを抽出するための高度な技術も重要になります。
⚡ 非同期処理によるパフォーマンス向上
並行処理の基本概念
従来の同期処理では、1つのページの処理が完了するまで次のページに進めません。しかし、非同期処理を使用することで、複数のページを同時並行で処理できます。
# 同期処理の例(遅い)
def sync_scraping(urls):
results = []
for url in urls: # 1つずつ順番に処理
result = scrape_page(url) # この処理中は他が待機
results.append(result)
return results
# 非同期処理の例(高速)
async def async_scraping(urls):
# 全てのタスクを同時に開始
tasks = [scrape_page(url) for url in urls]
# 全てのタスクの完了を待つ
results = await asyncio.gather(*tasks)
return results
制御された並行スクレイピング
無制限の並行処理はサーバーに負荷をかけるため、セマフォを使用して同時実行数を制限します。
class AsyncBatchScraper:
"""
バッチ処理による高速スクレイピング
セマフォを使用して同時実行数を制限
サーバー負荷とパフォーマンスのバランスを取る
"""
def __init__(self, max_concurrent: int = 5):
"""
初期化メソッド
Args:
max_concurrent: 最大同時実行数
- 多すぎるとサーバー負荷増大
- 少なすぎるとパフォーマンス低下
"""
self.max_concurrent = max_concurrent
# セマフォ: 同時実行数を制限するための仕組み
# 指定した数だけのスレッドが同時に実行可能
self.semaphore = asyncio.Semaphore(max_concurrent)
async def scrape_urls_concurrently(self, urls: List[str]) -> List[Dict]:
"""
複数URLを並行処理でスクレイピング
Args:
urls: スクレイピング対象のURLリスト
Returns:
スクレイピング結果のリスト
"""
async def scrape_single_url(url: str) -> Dict:
"""
単一URLのスクレイピング(セマフォ制御付き)
この内部関数は外部のsemaphoreを使用して
同時実行数を制限する
"""
async with self.semaphore: # セマフォによる実行数制限
# 設定オブジェクトの作成
config = ScrapingConfig(headless=True, timeout=15000)
try:
# ModernWebScraperを使用してページを取得
async with ModernWebScraper(config) as scraper:
result = await scraper.scrape_page(url)
# レート制限(1秒待機)
# 各リクエスト間に適切な間隔を設ける
await asyncio.sleep(1)
return result
except Exception as e:
# 個別のエラーは結果として記録
# 他のURLの処理は継続
return {
'url': url,
'error': str(e),
'status': 'error',
'timestamp': datetime.now()
}
# メイン処理の開始
logger.info(f"Starting concurrent scraping of {len(urls)} URLs")
# 全URLに対してタスクを作成
tasks = [scrape_single_url(url) for url in urls]
# asyncio.gather()で全タスクを並行実行
# return_exceptions=Trueで例外も結果として取得
results = await asyncio.gather(*tasks, return_exceptions=True)
# 例外処理とフォーマット統一
processed_results = []
for result in results:
if isinstance(result, Exception):
# 例外が発生した場合のフォーマット
processed_results.append({
'error': str(result),
'status': 'exception',
'timestamp': datetime.now()
})
else:
processed_results.append(result)
return processed_results
# 使用例の詳細解説
async def demo_concurrent_scraping():
"""
並行スクレイピングのデモ
同期処理と非同期処理のパフォーマンス比較も含む
"""
# テスト用URL(httpbin.orgのサービスを使用)
test_urls = [
"https://httpbin.org/delay/1", # 1秒遅延してレスポンス
"https://httpbin.org/delay/2", # 2秒遅延してレスポンス
"https://httpbin.org/json", # JSON形式レスポンス
"https://httpbin.org/html", # HTML形式レスポンス
"https://httpbin.org/xml" # XML形式レスポンス
]
# バッチスクレイパーのインスタンス作成
# max_concurrent=3で同時実行数を3に制限
scraper = AsyncBatchScraper(max_concurrent=3)
# 実行時間の計測開始
start_time = time.time()
# 並行処理実行
results = await scraper.scrape_urls_concurrently(test_urls)
# 実行時間の計算
elapsed_time = time.time() - start_time
# 結果の分析と表示
successful_count = sum(1 for r in results if r.get('status') == 'success')
print(f"並行処理完了: {elapsed_time:.2f}秒")
print(f"成功: {successful_count} / {len(results)}")
print(f"平均処理時間: {elapsed_time/len(test_urls):.2f}秒/URL")
# 個別結果の詳細表示
for i, result in enumerate(results):
status = "✅" if result.get('status') == 'success' else "❌"
url = result.get('url', 'Unknown')
print(f"{status} {i+1}. {url}")
return results
# 実行例
# results = await demo_concurrent_scraping()
進歩的なバッチ処理(進捗表示付き)
class ProgressiveBatchScraper(AsyncBatchScraper):
"""
進捗表示機能付きバッチスクレイパー
大量のURLを処理する際の進捗状況を可視化
"""
def __init__(self, max_concurrent: int = 5, batch_size: int = 10):
"""
初期化メソッド
Args:
max_concurrent: 最大同時実行数
batch_size: バッチサイズ(進捗表示の単位)
"""
super().__init__(max_concurrent)
self.batch_size = batch_size
self.completed_count = 0
self.total_count = 0
async def scrape_with_progress(self, urls: List[str]) -> List[Dict]:
"""
進捗表示付きスクレイピング
大量のURLを小分けして処理し、
進捗状況をリアルタイムで表示
"""
self.total_count = len(urls)
self.completed_count = 0
all_results = []
# URLリストをバッチサイズで分割
for i in range(0, len(urls), self.batch_size):
# バッチの作成
batch_urls = urls[i:i + self.batch_size]
batch_num = i // self.batch_size + 1
total_batches = (len(urls) + self.batch_size - 1) // self.batch_size
print(f"バッチ {batch_num}/{total_batches} 処理中... ({len(batch_urls)}件)")
# バッチの並行処理実行
batch_results = await self.scrape_urls_concurrently(batch_urls)
all_results.extend(batch_results)
# 進捗状況の更新
self.completed_count += len(batch_urls)
progress = (self.completed_count / self.total_count) * 100
print(f"進捗: {self.completed_count}/{self.total_count} ({progress:.1f}%)")
# バッチ間の休憩(サーバー負荷軽減)
if i + self.batch_size < len(urls):
print("バッチ間休憩中...")
await asyncio.sleep(2)
return all_results
def get_performance_stats(self, results: List[Dict], elapsed_time: float) -> Dict:
"""
パフォーマンス統計の計算
Args:
results: スクレイピング結果
elapsed_time: 実行時間
Returns:
パフォーマンス統計
"""
successful = sum(1 for r in results if r.get('status') == 'success')
failed = len(results) - successful
return {
'total_urls': len(results),
'successful': successful,
'failed': failed,
'success_rate': (successful / len(results)) * 100 if results else 0,
'total_time': elapsed_time,
'avg_time_per_url': elapsed_time / len(results) if results else 0,
'urls_per_second': len(results) / elapsed_time if elapsed_time > 0 else 0
}
# 使用例
async def demo_progressive_scraping():
"""
進捗表示付きスクレイピングのデモ
"""
# 大量のテストURLを生成
base_urls = [
"https://httpbin.org/json",
"https://httpbin.org/html",
"https://httpbin.org/xml"
]
# クエリパラメータを付けて重複を避ける
test_urls = []
for i in range(15): # 15個のURLを生成
base_url = base_urls[i % len(base_urls)]
test_urls.append(f"{base_url}?test={i}")
# 進捗表示付きスクレイパーを作成
scraper = ProgressiveBatchScraper(max_concurrent=3, batch_size=5)
# 実行時間計測
start_time = time.time()
results = await scraper.scrape_with_progress(test_urls)
elapsed_time = time.time() - start_time
# パフォーマンス統計の表示
stats = scraper.get_performance_stats(results, elapsed_time)
print(f"n=== パフォーマンス統計 ===")
print(f"総URL数: {stats['total_urls']}")
print(f"成功数: {stats['successful']}")
print(f"失敗数: {stats['failed']}")
print(f"成功率: {stats['success_rate']:.1f}%")
print(f"総実行時間: {stats['total_time']:.2f}秒")
print(f"平均処理時間: {stats['avg_time_per_url']:.2f}秒/URL")
print(f"処理速度: {stats['urls_per_second']:.1f} URLs/秒")
return results, stats
# 実行例
# progressive_results = await demo_progressive_scraping()
🔍 高度なデータ抽出テクニック
構造化データ抽出システム
実際のWebサイトから確実にデータを抽出するには、複数の抽出方法を組み合わせる必要があります。
class AdvancedDataExtractor:
"""
高度なデータ抽出クラス
複数の抽出手法を組み合わせて
堅牢なデータ抽出を実現
"""
def __init__(self):
"""
初期化メソッド
よく使用される抽出パターンを事前定義
"""
# 正規表現パターンライブラリ
self.extraction_patterns = {
'email': r'b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+.[A-Z|a-z]{2,}b',
'phone': r'(+?d{1,3}[-.]?)?(?d{3})?[-.]?d{3}[-.]?d{4}',
'price': r'[¥$€£]s*[d,]+.?d*|[d,]+.?d*s*[¥$€£]',
'date': r'd{4}[-/]d{1,2}[-/]d{1,2}|d{1,2}[-/]d{1,2}[-/]d{4}',
'url': r'https?://(?:[-w.])+(?:[:d]+)?(?:/(?:[w/_.])*(?:?(?:[w&=%.])*)?(?:#(?:[w.])*)?)?'
}
# よく使用されるCSSセレクターパターン
self.common_selectors = {
'title': ['h1', '.title', '.headline', '.article-title', '.post-title'],
'price': ['.price', '.amount', '.cost', '.value', '.price-current'],
'description': ['.description', '.summary', '.content', '.article-content'],
'rating': ['.rating', '.score', '.stars', '.review-score'],
'date': ['.date', '.timestamp', '.published', '.post-date']
}
async def extract_structured_data(self, html_content: str, target_schema: Dict) -> Dict:
"""
構造化データ抽出
定義されたスキーマに基づいて
HTMLから構造化データを抽出
Args:
html_content: HTML文字列
target_schema: 抽出スキーマの定義
Returns:
抽出されたデータの辞書
"""
soup = BeautifulSoup(html_content, 'html.parser')
extracted_data = {}
# スキーマの各フィールドを処理
for field, config in target_schema.items():
try:
# 抽出タイプに応じて処理を分岐
if config['type'] == 'css_selector':
# CSSセレクターによる抽出
elements = soup.select(config['selector'])
if config.get('multiple', False):
# 複数要素の場合はリストで返す
extracted_data[field] = [elem.get_text(strip=True) for elem in elements]
else:
# 単一要素の場合は最初のマッチを使用
extracted_data[field] = elements[0].get_text(strip=True) if elements else None
elif config['type'] == 'regex':
# 正規表現による抽出
import re
pattern = self.extraction_patterns.get(config['pattern'], config['pattern'])
matches = re.findall(pattern, html_content)
if config.get('multiple', False):
extracted_data[field] = matches
else:
extracted_data[field] = matches[0] if matches else None
elif config['type'] == 'json_ld':
# JSON-LD(構造化データ)による抽出
json_ld_scripts = soup.find_all('script', type='application/ld+json')
for script in json_ld_scripts:
try:
data = json.loads(script.string)
# ネストしたキーの取得をサポート
keys = config['key'].split('.')
value = data
for key in keys:
if isinstance(value, dict) and key in value:
value = value[key]
else:
value = None
break
if value is not None:
extracted_data[field] = value
break
except json.JSONDecodeError:
continue
elif config['type'] == 'auto_detect':
# 自動検出機能
extracted_data[field] = await self.auto_detect_field(soup, field)
elif config['type'] == 'attribute':
# HTML属性による抽出
elements = soup.select(config['selector'])
if elements:
attr_value = elements[0].get(config['attribute'])
extracted_data[field] = attr_value
else:
extracted_data[field] = None
except Exception as e:
logger.error(f"Error extracting {field}: {e}")
extracted_data[field] = None
return extracted_data
async def auto_detect_field(self, soup: BeautifulSoup, field_type: str) -> Optional[str]:
"""
フィールド自動検出
一般的なCSSセレクターのパターンを試行して
該当するフィールドを自動検出
Args:
soup: BeautifulSoupオブジェクト
field_type: フィールドタイプ(title, price等)
Returns:
検出されたテキスト(見つからない場合はNone)
"""
if field_type in self.common_selectors:
# 一般的なセレクターパターンを順次試行
for selector in self.common_selectors[field_type]:
element = soup.select_one(selector)
if element:
return element.get_text(strip=True)
return None
def extract_with_confidence(self, html_content: str, extraction_rules: List[Dict]) -> Dict:
"""
信頼度付きデータ抽出
複数の抽出方法を試行し、
最も信頼度の高い結果を選択
Args:
html_content: HTML文字列
extraction_rules: 抽出ルールのリスト
Returns:
信頼度付きの抽出結果
"""
soup = BeautifulSoup(html_content, 'html.parser')
results = {}
# 各フィールドの抽出ルールを処理
for rule in extraction_rules:
field_name = rule['field']
methods = rule['methods'] # 複数の抽出方法
field_results = []
# 各抽出方法を試行
for method in methods:
try:
if method['type'] == 'css':
elements = soup.select(method['selector'])
if elements:
value = elements[0].get_text(strip=True)
field_results.append({
'value': value,
'confidence': method.get('confidence', 0.5),
'method': 'css_selector'
})
elif method['type'] == 'regex':
import re
pattern = method['pattern']
matches = re.findall(pattern, html_content)
if matches:
field_results.append({
'value': matches[0],
'confidence': method.get('confidence', 0.7),
'method': 'regex'
})
elif method['type'] == 'xpath':
# XPathによる抽出(より複雑な選択が可能)
try:
from lxml import etree, html as lxml_html
tree = lxml_html.fromstring(html_content)
elements = tree.xpath(method['xpath'])
if elements:
if hasattr(elements[0], 'text'):
value = elements[0].text or ''
else:
value = str(elements[0])
field_results.append({
'value': value.strip(),
'confidence': method.get('confidence', 0.8),
'method': 'xpath'
})
except ImportError:
logger.warning("lxml not installed, skipping XPath method")
except Exception as e:
logger.debug(f"XPath extraction failed: {e}")
except Exception as e:
logger.debug(f"Extraction method failed: {e}")
continue
# 最高信頼度の結果を選択
if field_results:
best_result = max(field_results, key=lambda x: x['confidence'])
results[field_name] = {
'value': best_result['value'],
'confidence': best_result['confidence'],
'method': best_result['method'],
'alternatives': [r for r in field_results if r != best_result]
}
else:
results[field_name] = {
'value': None,
'confidence': 0.0,
'method': 'none',
'alternatives': []
}
return results
# 使用例の詳細解説
async def demo_advanced_extraction():
"""
高度な抽出のデモ
実際のECサイト風HTMLからの商品情報抽出を模擬
"""
# より複雑なサンプルHTML(ECサイト風)
sample_html = """
<html>
<head>
<title>商品詳細 - スマートフォン XYZ Pro</title>
<meta property="og:title" content="スマートフォン XYZ Pro">
<meta property="og:price:amount" content="128000">
</head>
<body>
<div class="product-container">
<div class="breadcrumb">
<a href="/">ホーム</a> > <a href="/electronics">家電</a> > スマートフォン
</div>
<div class="product-info">
<h1 class="product-title">スマートフォン XYZ Pro</h1>
<div class="price-section">
<span class="current-price">¥128,000</span>
<span class="original-price">¥148,000</span>
<span class="discount">13%OFF</span>
</div>
<div class="rating-section">
<div class="stars" data-rating="4.2">★★★★☆</div>
<span class="rating-text">(4.2/5)</span>
<a href="#reviews">178件のレビュー</a>
</div>
<div class="description">
<p>5G対応の最新フラッグシップモデル。高性能カメラとバッテリー長持ち。</p>
</div>
<div class="specifications">
<ul>
<li>ディスプレイ: 6.7インチ有機EL</li>
<li>ストレージ: 256GB</li>
<li>カメラ: トリプルレンズ</li>
<li>バッテリー: 4500mAh</li>
</ul>
</div>
<div class="availability">
<span class="stock-status in-stock">在庫あり</span>
<span class="shipping">明日お届け</span>
</div>
</div>
</div>
<!-- JSON-LD構造化データ -->
<script type="application/ld+json">
{
"@context": "http://schema.org/",
"@type": "Product",
"name": "スマートフォン XYZ Pro",
"image": "https://example.com/phone.jpg",
"description": "5G対応の最新フラッグシップモデル",
"sku": "XYZ-PRO-256",
"brand": {
"@type": "Brand",
"name": "XYZ Corporation"
},
"offers": {
"@type": "Offer",
"url": "https://example.com/phone-xyz-pro",
"priceCurrency": "JPY",
"price": "128000",
"itemCondition": "http://schema.org/NewCondition",
"availability": "http://schema.org/InStock"
},
"aggregateRating": {
"@type": "AggregateRating",
"ratingValue": "4.2",
"reviewCount": "178"
}
}
</script>
</body>
</html>
"""
extractor = AdvancedDataExtractor()
# 構造化抽出スキーマの定義
extraction_schema = {
'product_name': {
'type': 'css_selector',
'selector': '.product-title',
'multiple': False
},
'current_price': {
'type': 'css_selector',
'selector': '.current-price',
'multiple': False
},
'rating_value': {
'type': 'attribute',
'selector': '.stars',
'attribute': 'data-rating'
},
'specifications': {
'type': 'css_selector',
'selector': '.specifications li',
'multiple': True
},
'structured_data': {
'type': 'json_ld',
'key': 'offers.price'
},
'stock_status': {
'type': 'auto_detect'
}
}
# 信頼度付き抽出ルール
confidence_rules = [
{
'field': 'product_price',
'methods': [
{
'type': 'css',
'selector': '.current-price',
'confidence': 0.9
},
{
'type': 'regex',
'pattern': r'¥[d,]+',
'confidence': 0.7
},
{
'type': 'css',
'selector': '.price-section .price',
'confidence': 0.6
}
]
},
{
'field': 'product_rating',
'methods': [
{
'type': 'xpath',
'xpath': '//*[@data-rating]/@data-rating',
'confidence': 0.9
},
{
'type': 'regex',
'pattern': r'(d+.d+)/5',
'confidence': 0.8
},
{
'type': 'css',
'selector': '.rating-text',
'confidence': 0.7
}
]
}
]
# 構造化データ抽出の実行
print("=== 構造化データ抽出 ===")
structured_result = await extractor.extract_structured_data(sample_html, extraction_schema)
for key, value in structured_result.items():
print(f"{key}: {value}")
# 信頼度付き抽出の実行
print("n=== 信頼度付き抽出 ===")
confidence_result = extractor.extract_with_confidence(sample_html, confidence_rules)
for key, result in confidence_result.items():
print(f"{key}:")
print(f" 値: {result['value']}")
print(f" 信頼度: {result['confidence']:.1f}")
print(f" 抽出方法: {result['method']}")
if result['alternatives']:
print(f" 代替案数: {len(result['alternatives'])}")
return structured_result, confidence_result
# 実行例
# extraction_results = await demo_advanced_extraction()
🛡️ エラーハンドリングとリトライ機能
指数バックオフによるリトライシステム
class RobustScraper:
"""
堅牢なスクレイピングクラス
エラー処理とリトライ機能を備えた
プロダクション品質のスクレイパー
"""
def __init__(self, config: ScrapingConfig = None):
self.config = config or ScrapingConfig()
self.error_stats = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'retry_attempts': 0,
'error_types': {}
}
async def scrape_with_retry(self,
url: str,
max_retries: int = 3,
backoff_factor: float = 2.0,
initial_delay: float = 1.0) -> Dict:
"""
リトライ機能付きスクレイピング
指数バックオフアルゴリズムを使用して
一時的なエラーに対して自動的にリトライ
Args:
url: スクレイピング対象URL
max_retries: 最大リトライ回数
backoff_factor: 遅延時間の倍率
initial_delay: 初期遅延時間(秒)
Returns:
スクレイピング結果
"""
self.error_stats['total_requests'] += 1
for attempt in range(max_retries + 1):
try:
async with ModernWebScraper(self.config) as scraper:
result = await scraper.scrape_page(url)
if result['status'] == 'success':
self.error_stats['successful_requests'] += 1
return result
else:
raise Exception(f"Scraping failed: {result.get('error', 'Unknown error')}")
except Exception as e:
error_type = type(e).__name__
self.error_stats['error_types'][error_type] =
self.error_stats['error_types'].get(error_type, 0) + 1
# 最後の試行の場合はエラーを記録して終了
if attempt == max_retries:
self.error_stats['failed_requests'] += 1
logger.error(f"Final attempt failed for {url}: {e}")
return {
'url': url,
'error': str(e),
'attempts': attempt + 1,
'status': 'failed_after_retries',
'timestamp': datetime.now()
}
# リトライ時の遅延計算(指数バックオフ)
delay = initial_delay * (backoff_factor ** attempt)
# ジッター(ランダム要素)を追加して同時リトライを回避
import random
jitter = delay * random.uniform(0.1, 0.5)
total_delay = delay + jitter
self.error_stats['retry_attempts'] += 1
logger.warning(f"Attempt {attempt + 1} failed for {url}: {e}. Retrying in {total_delay:.1f}s...")
await asyncio.sleep(total_delay)
# ここに到達することはないが、安全のため
return {
'url': url,
'error': 'Unexpected retry loop exit',
'status': 'error',
'timestamp': datetime.now()
}
def get_error_statistics(self) -> Dict:
"""
エラー統計の取得
Returns:
エラー統計情報
"""
success_rate = 0
if self.error_stats['total_requests'] > 0:
success_rate = (self.error_stats['successful_requests'] /
self.error_stats['total_requests']) * 100
return {
**self.error_stats,
'success_rate': success_rate,
'average_retries_per_request': (
self.error_stats['retry_attempts'] /
max(self.error_stats['total_requests'], 1)
)
}
# 使用例
async def demo_robust_scraping():
"""
堅牢なスクレイピングのデモ
エラーが発生しやすいURLも含めて
リトライ機能をテスト
"""
scraper = RobustScraper()
# 成功・失敗が混在するテストURL
test_urls = [
"https://httpbin.org/status/200", # 成功
"https://httpbin.org/status/500", # サーバーエラー(リトライ対象)
"https://httpbin.org/delay/10", # タイムアウト(リトライ対象)
"https://httpbin.org/json", # 成功
"https://invalid-url-for-test.example", # DNS解決エラー
]
results = []
for url in test_urls:
print(f"nスクレイピング開始: {url}")
result = await scraper.scrape_with_retry(
url,
max_retries=2,
backoff_factor=2.0,
initial_delay=1.0
)
status_icon = "✅" if result['status'] == 'success' else "❌"
print(f"{status_icon} 結果: {result['status']}")
results.append(result)
# エラー統計の表示
stats = scraper.get_error_statistics()
print(f"n=== エラー統計 ===")
print(f"総リクエスト数: {stats['total_requests']}")
print(f"成功数: {stats['successful_requests']}")
print(f"失敗数: {stats['failed_requests']}")
print(f"成功率: {stats['success_rate']:.1f}%")
print(f"総リトライ回数: {stats['retry_attempts']}")
print(f"平均リトライ数: {stats['average_retries_per_request']:.1f}")
print(f"エラータイプ: {stats['error_types']}")
return results, stats
# 実行例
# robust_results = await demo_robust_scraping()
📋 第2部のまとめ
学習した高度なテクニック
- 並行処理の実装:
asyncio.gather()
とセマフォによる制御 - バッチ処理: 大量URLの効率的な処理と進捗表示
- 高度なデータ抽出: 複数手法の組み合わせと信頼度評価
- エラーハンドリング: 指数バックオフとリトライ機構
パフォーマンス向上のポイント
- 適切な同時実行数: サーバー負荷とスピードのバランス
- バッチ分割: メモリ使用量の制御
- エラー処理: 一部の失敗が全体に影響しない設計
- 統計情報: パフォーマンスの可視化と改善点の特定
次のステップ
第3部では、以下の実践的なトピックについて解説します:
- 検出回避技術: プロキシとUser-Agent管理
- データ永続化: データベースとファイル出力
- 監視システム: 実用的なスクレイピングプロジェクト
- 法的・倫理的配慮: robots.txt対応とコンプライアンス
関連記事
- Python Webスクレイピング 2025 第1部:Playwright入門と環境構築
- Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理(近日公開)
- 高度な株価スクレイピング 第2部:機械学習による価格予測システム
免責事項: 本記事の内容は教育目的のものです。実際のスクレイピング実行時は、対象サイトの利用規約を遵守し、適切な許可を得てから実施してください。