Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理

はじめに

シリーズ最終回となる第3部では、実際のプロジェクトで使える実践的なスクレイピングシステムの構築について解説します。検出回避技術、データの永続化、監視システムの構築など、プロダクション環境で必要となる高度な技術を学びます。

また、法的・倫理的配慮についても詳しく説明し、責任あるスクレイピングの実現方法を示します。

🛡️ 検出回避技術とセキュリティ対策

プロキシローテーションとUser-Agent管理

大規模なスクレイピングでは、IPアドレスの制限やbot検出を回避するための技術が重要です。

class AntiDetectionScraper(ModernWebScraper):
"""
検出回避機能付きスクレーパー
プロキシローテーション、User-Agent管理、
人間らしい行動パターンの模倣を実装
"""
def __init__(self, config: ScrapingConfig = None):
super().__init__(config)
# 複数のUser-Agentを準備(実際のブラウザからの抜粋)
self.user_agents = [
# Chrome on Windows
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
# Chrome on macOS
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
# Firefox on Windows
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
# Safari on macOS
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15",
# Edge on Windows
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0"
]
# プロキシリスト(実際の使用時は有効なプロキシを設定)
self.proxy_list = [
# 例: {"host": "proxy1.example.com", "port": 8080, "username": "user1", "password": "pass1"}
]
# リクエスト間隔の設定
self.min_delay = 2.0  # 最小遅延(秒)
self.max_delay = 5.0  # 最大遅延(秒)
# セッション管理
self.current_proxy_index = 0
self.requests_with_current_proxy = 0
self.max_requests_per_proxy = 50  # プロキシあたりの最大リクエスト数
def get_random_user_agent(self) -> str:
"""
ランダムなUser-Agentを取得
Returns:
ランダムに選択されたUser-Agent文字列
"""
import random
return random.choice(self.user_agents)
def get_current_proxy(self) -> Optional[Dict[str, str]]:
"""
現在使用するプロキシを取得
ローテーション機能付き
一定回数使用後に次のプロキシに切り替え
Returns:
プロキシ設定の辞書(None = プロキシなし)
"""
if not self.proxy_list:
return None
# リクエスト数制限チェック
if self.requests_with_current_proxy >= self.max_requests_per_proxy:
# 次のプロキシに切り替え
self.current_proxy_index = (self.current_proxy_index + 1) % len(self.proxy_list)
self.requests_with_current_proxy = 0
logger.info(f"Switching to proxy {self.current_proxy_index + 1}/{len(self.proxy_list)}")
proxy = self.proxy_list[self.current_proxy_index]
self.requests_with_current_proxy += 1
return {
'server': f"http://{proxy['host']}:{proxy['port']}",
'username': proxy.get('username'),
'password': proxy.get('password')
}
async def stealth_scrape(self, url: str, extra_stealth: bool = True) -> Dict:
"""
ステルスモードでのスクレイピング
人間らしい行動パターンを模倣して
bot検出を回避
Args:
url: スクレイピング対象URL
extra_stealth: 追加のステルス機能を有効にするか
Returns:
スクレイピング結果
"""
# ランダム遅延(人間らしいアクセス間隔)
import random
delay = random.uniform(self.min_delay, self.max_delay)
await asyncio.sleep(delay)
# 動的にUser-Agentを設定
user_agent = self.get_random_user_agent()
proxy = self.get_current_proxy()
# 設定を更新
config = ScrapingConfig(
headless=True,
timeout=30000,
user_agent=user_agent,
proxy=proxy
)
async with ModernWebScraper(config) as scraper:
page = await scraper.context.new_page()
try:
if extra_stealth:
# ブラウザ検出対策のJavaScriptを実行
await page.add_init_script("""
// webdriverプロパティを隠す
Object.defineProperty(navigator, 'webdriver', {
get: () => false,
});
// Chrome runtime 偽装
window.chrome = {
runtime: {},
loadTimes: function() {},
csi: function() {},
app: {}
};
// プラグイン情報の偽装
Object.defineProperty(navigator, 'plugins', {
get: () => [1, 2, 3, 4, 5],
});
// 言語設定の偽装
Object.defineProperty(navigator, 'languages', {
get: () => ['ja-JP', 'ja', 'en-US', 'en'],
});
// WebGL情報の偽装
const getParameter = WebGLRenderingContext.getParameter;
WebGLRenderingContext.prototype.getParameter = function(parameter) {
if (parameter === 37445) {
return 'Intel Open Source Technology Center';
}
if (parameter === 37446) {
return 'Mesa DRI Intel(R) Ivybridge Mobile ';
}
return getParameter(parameter);
};
""")
# ページ読み込み
logger.info(f"Stealth loading: {url}")
await page.goto(url, wait_until="domcontentloaded")
if extra_stealth:
# 人間らしいスクロール動作
await self.simulate_human_behavior(page)
# データ取得
title = await page.title()
html_content = await page.content()
# ブラウザ情報の取得(検証用)
browser_info = await page.evaluate("""
() => {
return {
userAgent: navigator.userAgent,
platform: navigator.platform,
language: navigator.language,
cookieEnabled: navigator.cookieEnabled,
onLine: navigator.onLine,
webdriver: navigator.webdriver
}
}
""")
return {
'url': url,
'title': title,
'html_content': html_content,
'browser_info': browser_info,
'proxy_used': proxy is not None,
'user_agent_used': user_agent,
'stealth_mode': True,
'timestamp': datetime.now(),
'status': 'success'
}
except Exception as e:
logger.error(f"Stealth scraping error for {url}: {e}")
return {
'url': url,
'error': str(e),
'stealth_mode': True,
'timestamp': datetime.now(),
'status': 'error'
}
finally:
await page.close()
async def simulate_human_behavior(self, page):
"""
人間らしい行動の模擬
ランダムなスクロール、マウス移動、
一時停止などを実行
Args:
page: Playwrightページオブジェクト
"""
import random
# ページの高さを取得
page_height = await page.evaluate("document.body.scrollHeight")
viewport_height = await page.evaluate("window.innerHeight")
if page_height > viewport_height:
# スクロール動作の模擬
scroll_steps = random.randint(3, 7)
scroll_distance = page_height // scroll_steps
for i in range(scroll_steps):
# ランダムな距離をスクロール
scroll_to = min((i + 1) * scroll_distance + random.randint(-50, 50), page_height)
await page.evaluate(f"window.scrollTo(0, {scroll_to})")
# 人間らしい停止時間
pause_time = random.uniform(0.5, 2.0)
await asyncio.sleep(pause_time)
# ランダムなマウス移動(オプション)
try:
x = random.randint(100, 800)
y = random.randint(100, 600)
await page.mouse.move(x, y)
except Exception:
pass  # マウス操作が失敗しても続行
# 最終的な待機
await asyncio.sleep(random.uniform(1.0, 3.0))
# 使用例
async def demo_anti_detection():
"""
検出回避スクレイピングのデモ
User-Agent検証サイトを使って
ステルス機能をテスト
"""
# 検出回避スクレイパーを初期化
scraper = AntiDetectionScraper()
# テスト用URL(User-Agentやプロキシ情報を確認できるサービス)
test_urls = [
"https://httpbin.org/user-agent",     # User-Agent確認
"https://httpbin.org/headers",        # ヘッダー情報確認
"https://httpbin.org/ip"              # IPアドレス確認
]
results = []
for url in test_urls:
print(f"nステルススクレイピング: {url}")
result = await scraper.stealth_scrape(url, extra_stealth=True)
if result['status'] == 'success':
print(f"✅ 成功: {result['title'][:50]}...")
print(f"   User-Agent: {result['user_agent_used'][:60]}...")
print(f"   プロキシ使用: {result['proxy_used']}")
print(f"   ブラウザ検出: webdriver={result['browser_info'].get('webdriver', 'undefined')}")
else:
print(f"❌ エラー: {result['error']}")
results.append(result)
return results
# 実行例
# stealth_results = await demo_anti_detection()

📊 データ管理と永続化

高度なデータパイプライン

スクレイピングしたデータを効率的に管理・保存するためのシステムを構築します。

class DataPipeline:
"""
スクレイピングデータパイプライン
データの収集、変換、保存、エクスポートを
一元的に管理するシステム
"""
def __init__(self, output_dir: str = "scraped_data", db_type: str = "sqlite"):
"""
初期化メソッド
Args:
output_dir: 出力ディレクトリ
db_type: データベースタイプ(sqlite, postgresql等)
"""
self.output_dir = Path(output_dir)
self.output_dir.mkdir(exist_ok=True)
self.db_type = db_type
# SQLiteデータベースの設定
if db_type == "sqlite":
self.db_path = self.output_dir / "scraping_data.db"
self.init_sqlite_database()
# データ変換設定
self.data_transformers = {}
# メタデータ管理
self.session_metadata = {
'created_at': datetime.now(),
'total_pages_processed': 0,
'total_data_points': 0,
'data_sources': set()
}
def init_sqlite_database(self):
"""
SQLiteデータベースの初期化
必要なテーブルを作成し、
インデックスを設定
"""
import sqlite3
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# メインのページデータテーブル
cursor.execute("""
CREATE TABLE IF NOT EXISTS scraped_pages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
url TEXT NOT NULL,
title TEXT,
content_hash TEXT UNIQUE,
scraped_at TIMESTAMP,
status TEXT,
source_domain TEXT,
metadata TEXT,
raw_html TEXT
)
""")
# 抽出されたデータテーブル
cursor.execute("""
CREATE TABLE IF NOT EXISTS extracted_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
page_id INTEGER,
data_type TEXT,
field_name TEXT,
field_value TEXT,
confidence REAL,
extraction_method TEXT,
extracted_at TIMESTAMP,
FOREIGN KEY (page_id) REFERENCES scraped_pages (id)
)
""")
# スクレイピングセッションテーブル
cursor.execute("""
CREATE TABLE IF NOT EXISTS scraping_sessions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_name TEXT,
start_time TIMESTAMP,
end_time TIMESTAMP,
total_urls INTEGER,
successful_urls INTEGER,
failed_urls INTEGER,
configuration TEXT
)
""")
# パフォーマンス最適化のためのインデックス
cursor.execute("CREATE INDEX IF NOT EXISTS idx_url ON scraped_pages(url)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_content_hash ON scraped_pages(content_hash)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_scraped_at ON scraped_pages(scraped_at)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_source_domain ON scraped_pages(source_domain)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_data_type ON extracted_data(data_type)")
conn.commit()
conn.close()
logger.info("Database initialized successfully")
def save_scraped_data(self, scraped_results: List[Dict], session_name: str = None) -> int:
"""
スクレイピング結果をデータベースに保存
Args:
scraped_results: スクレイピング結果のリスト
session_name: セッション名(統計管理用)
Returns:
保存された件数
"""
import sqlite3
import hashlib
from urllib.parse import urlparse
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
saved_count = 0
for result in scraped_results:
if result.get('status') != 'success':
continue
try:
# コンテンツハッシュ生成(重複検出用)
html_content = result.get('html_content', '')
content_hash = hashlib.md5(html_content.encode('utf-8')).hexdigest()
# ドメイン抽出
url = result.get('url', '')
domain = urlparse(url).netloc
# 重複チェック
cursor.execute(
"SELECT id FROM scraped_pages WHERE content_hash = ?",
(content_hash,)
)
existing = cursor.fetchone()
if not existing:
# メタデータの準備
metadata = {
'js_data': result.get('js_data', {}),
'browser_info': result.get('browser_info', {}),
'extraction_timestamp': result.get('timestamp', datetime.now()).isoformat(),
'session_name': session_name
}
# ページデータの挿入
cursor.execute("""
INSERT INTO scraped_pages 
(url, title, content_hash, scraped_at, status, source_domain, metadata, raw_html)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (
url,
result.get('title'),
content_hash,
result.get('timestamp', datetime.now()),
result.get('status'),
domain,
json.dumps(metadata, ensure_ascii=False),
html_content
))
saved_count += 1
# メタデータ更新
self.session_metadata['total_pages_processed'] += 1
self.session_metadata['data_sources'].add(domain)
logger.debug(f"Saved new data for: {url}")
else:
logger.debug(f"Duplicate content skipped: {url}")
except Exception as e:
logger.error(f"Error saving data for {result.get('url', 'unknown')}: {e}")
conn.commit()
conn.close()
logger.info(f"Saved {saved_count} new pages to database")
return saved_count
def save_extracted_data(self, page_url: str, extracted_data: Dict, extraction_method: str = "unknown"):
"""
抽出されたデータを保存
Args:
page_url: ページURL
extracted_data: 抽出されたデータ
extraction_method: 抽出方法
"""
import sqlite3
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
try:
# ページIDを取得
cursor.execute("SELECT id FROM scraped_pages WHERE url = ? ORDER BY scraped_at DESC LIMIT 1", (page_url,))
page_result = cursor.fetchone()
if not page_result:
logger.warning(f"Page not found in database: {page_url}")
return
page_id = page_result[0]
# 抽出データの保存
data_count = 0
for field_name, field_value in extracted_data.items():
if field_value is not None:
# 信頼度情報の処理
confidence = 1.0
value = field_value
if isinstance(field_value, dict) and 'value' in field_value:
# 信頼度付きデータの場合
confidence = field_value.get('confidence', 1.0)
value = field_value['value']
extraction_method = field_value.get('method', extraction_method)
# リストの場合はJSONで保存
if isinstance(value, (list, dict)):
value = json.dumps(value, ensure_ascii=False)
cursor.execute("""
INSERT INTO extracted_data 
(page_id, data_type, field_name, field_value, confidence, extraction_method, extracted_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (
page_id,
type(field_value).__name__,
field_name,
str(value),
confidence,
extraction_method,
datetime.now()
))
data_count += 1
self.session_metadata['total_data_points'] += data_count
conn.commit()
logger.debug(f"Saved {data_count} extracted data points for {page_url}")
except Exception as e:
logger.error(f"Error saving extracted data: {e}")
finally:
conn.close()
def export_to_formats(self, query: str = None, export_formats: List[str] = None) -> Dict[str, str]:
"""
複数形式でのデータエクスポート
Args:
query: カスタムSQLクエリ
export_formats: エクスポート形式のリスト
Returns:
エクスポートされたファイルのパス
"""
import sqlite3
if export_formats is None:
export_formats = ['csv', 'json', 'excel']
conn = sqlite3.connect(self.db_path)
# データ取得
if query:
df = pd.read_sql_query(query, conn)
else:
# デフォルトクエリ(結合データ)
default_query = """
SELECT 
p.url,
p.title,
p.source_domain,
p.scraped_at,
e.field_name,
e.field_value,
e.confidence,
e.extraction_method
FROM scraped_pages p
LEFT JOIN extracted_data e ON p.id = e.page_id
ORDER BY p.scraped_at DESC
"""
df = pd.read_sql_query(default_query, conn)
conn.close()
if df.empty:
return {"error": "No data to export"}
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
export_files = {}
# CSV形式
if 'csv' in export_formats:
csv_file = self.output_dir / f"export_{timestamp}.csv"
df.to_csv(csv_file, index=False, encoding='utf-8-sig')
export_files['csv'] = str(csv_file)
# JSON形式
if 'json' in export_formats:
json_file = self.output_dir / f"export_{timestamp}.json"
df.to_json(json_file, orient='records', ensure_ascii=False, indent=2, date_format='iso')
export_files['json'] = str(json_file)
# Excel形式(オプション)
if 'excel' in export_formats:
try:
excel_file = self.output_dir / f"export_{timestamp}.xlsx"
with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
# メインデータ
df.to_excel(writer, sheet_name='ScrapedData', index=False)
# 統計情報
stats_df = self.get_statistics_dataframe()
stats_df.to_excel(writer, sheet_name='Statistics', index=False)
export_files['excel'] = str(excel_file)
except ImportError:
logger.warning("openpyxl not installed, skipping Excel export")
logger.info(f"Data exported to {len(export_files)} formats")
return export_files
def get_statistics_dataframe(self) -> pd.DataFrame:
"""
統計情報をDataFrame形式で取得
Returns:
統計情報のDataFrame
"""
import sqlite3
conn = sqlite3.connect(self.db_path)
stats_data = []
# 基本統計
cursor = conn.cursor()
# 総ページ数
cursor.execute("SELECT COUNT(*) FROM scraped_pages")
total_pages = cursor.fetchone()[0]
stats_data.append({"Metric": "Total Pages", "Value": total_pages})
# 成功ページ数
cursor.execute("SELECT COUNT(*) FROM scraped_pages WHERE status = 'success'")
successful_pages = cursor.fetchone()[0]
stats_data.append({"Metric": "Successful Pages", "Value": successful_pages})
# ユニークドメイン数
cursor.execute("SELECT COUNT(DISTINCT source_domain) FROM scraped_pages")
unique_domains = cursor.fetchone()[0]
stats_data.append({"Metric": "Unique Domains", "Value": unique_domains})
# 抽出データポイント数
cursor.execute("SELECT COUNT(*) FROM extracted_data")
data_points = cursor.fetchone()[0]
stats_data.append({"Metric": "Extracted Data Points", "Value": data_points})
# 成功率
if total_pages > 0:
success_rate = (successful_pages / total_pages) * 100
stats_data.append({"Metric": "Success Rate (%)", "Value": f"{success_rate:.1f}"})
# 日別統計
cursor.execute("""
SELECT DATE(scraped_at) as date, COUNT(*) as count 
FROM scraped_pages 
GROUP BY DATE(scraped_at) 
ORDER BY date DESC 
LIMIT 7
""")
daily_stats = cursor.fetchall()
for date, count in daily_stats:
stats_data.append({"Metric": f"Pages on {date}", "Value": count})
conn.close()
return pd.DataFrame(stats_data)
# 使用例
async def demo_data_pipeline():
"""
データパイプラインのデモ
スクレイピング→保存→エクスポートの
完全なワークフローを実行
"""
# データパイプライン初期化
pipeline = DataPipeline("demo_output")
# サンプルスクレイピングの実行
print("🔄 サンプルデータ収集中...")
# 複数のサイトからデータを収集
sample_urls = [
"https://httpbin.org/json",
"https://httpbin.org/html",
"https://httpbin.org/xml"
]
scraper = AsyncBatchScraper(max_concurrent=2)
scraped_results = await scraper.scrape_urls_concurrently(sample_urls)
# データベースに保存
print("💾 データベースに保存中...")
saved_count = pipeline.save_scraped_data(scraped_results, session_name="demo_session")
# 抽出データの追加(デモ用)
for result in scraped_results:
if result.get('status') == 'success':
# サンプル抽出データ
extracted_data = {
'content_type': 'web_page',
'content_length': len(result.get('html_content', '')),
'has_title': bool(result.get('title')),
'extraction_timestamp': datetime.now().isoformat()
}
pipeline.save_extracted_data(
result['url'], 
extracted_data, 
extraction_method="demo_extraction"
)
# エクスポート実行
print("📤 データエクスポート中...")
export_files = pipeline.export_to_formats(
export_formats=['csv', 'json']
)
# 結果表示
print(f"n✅ 処理完了!")
print(f"   保存ページ数: {saved_count}")
print("n📁 エクスポートファイル:")
for format_type, file_path in export_files.items():
print(f"   {format_type.upper()}: {file_path}")
# 統計情報表示
stats_df = pipeline.get_statistics_dataframe()
print(f"n📊 統計情報:")
for _, row in stats_df.head(10).iterrows():
print(f"   {row['Metric']}: {row['Value']}")
return pipeline, export_files
# 実行例
# pipeline_result = await demo_data_pipeline()

⚖️ 法的・倫理的考慮事項とコンプライアンス

robots.txt対応とコンプライアンスチェッカー

class ScrapingComplianceChecker:
"""
スクレイピングコンプライアンスチェッカー
robots.txt解析、利用規約チェック、
アクセス頻度制限などの法的・倫理的
配慮を自動化
"""
def __init__(self):
self.robots_cache = {}      # robots.txtのキャッシュ
self.compliance_cache = {}  # コンプライアンス結果のキャッシュ
self.request_history = {}   # ドメイン別リクエスト履歴
async def check_robots_txt(self, url: str, user_agent: str = "*") -> Dict:
"""
robots.txtのチェック
指定されたURLがrobots.txtで
アクセス許可されているかを確認
Args:
url: チェック対象のURL
user_agent: User-Agent名(* = 全て)
Returns:
robots.txt解析結果
"""
from urllib.parse import urljoin, urlparse
import aiohttp
domain = f"{urlparse(url).scheme}://{urlparse(url).netloc}"
robots_url = urljoin(domain, '/robots.txt')
# キャッシュチェック
cache_key = f"{robots_url}:{user_agent}"
if cache_key in self.robots_cache:
return self.robots_cache[cache_key]
try:
async with aiohttp.ClientSession() as session:
async with session.get(robots_url, timeout=10) as response:
if response.status == 200:
robots_content = await response.text()
result = self._parse_robots_txt(robots_content, url, user_agent)
else:
# robots.txtが存在しない場合
result = {
'robots_exists': False,
'is_allowed': True,  # デフォルトで許可
'crawl_delay': None,
'robots_url': robots_url,
'status_code': response.status
}
except Exception as e:
# ネットワークエラー等の場合
result = {
'robots_exists': False,
'is_allowed': True,  # エラー時はデフォルト許可
'error': str(e),
'robots_url': robots_url
}
# キャッシュに保存(1時間有効)
result['cached_at'] = datetime.now()
self.robots_cache[cache_key] = result
return result
def _parse_robots_txt(self, robots_content: str, target_url: str, user_agent: str) -> Dict:
"""
robots.txtの内容を解析
Args:
robots_content: robots.txtの内容
target_url: 対象URL
user_agent: User-Agent
Returns:
解析結果
"""
from urllib.parse import urlparse
lines = robots_content.split('n')
rules = {
'allowed': [],
'disallowed': [],
'crawl_delay': None,
'sitemap': []
}
current_agents = []
target_path = urlparse(target_url).path
for line in lines:
line = line.strip()
# コメントを除去
if '#' in line:
line = line[:line.index('#')].strip()
if not line:
continue
# User-agent指定
if line.lower().startswith('user-agent:'):
agent = line.split(':', 1)[1].strip()
current_agents = [agent]
# 対象User-Agentかチェック
elif any(agent == '*' or agent.lower() == user_agent.lower() 
for agent in current_agents):
if line.lower().startswith('disallow:'):
path = line.split(':', 1)[1].strip()
if path:
rules['disallowed'].append(path)
elif line.lower().startswith('allow:'):
path = line.split(':', 1)[1].strip()
if path:
rules['allowed'].append(path)
elif line.lower().startswith('crawl-delay:'):
try:
delay = float(line.split(':', 1)[1].strip())
rules['crawl_delay'] = delay
except ValueError:
pass
# サイトマップ情報
elif line.lower().startswith('sitemap:'):
sitemap = line.split(':', 1)[1].strip()
rules['sitemap'].append(sitemap)
# URLパスの許可チェック
is_allowed = self._check_path_allowed(target_path, rules)
return {
'robots_exists': True,
'is_allowed': is_allowed,
'crawl_delay': rules['crawl_delay'],
'rules': rules,
'target_path': target_path
}
def _check_path_allowed(self, path: str, rules: Dict) -> bool:
"""
パスがrobots.txtで許可されているかチェック
Args:
path: チェック対象のパス
rules: robots.txtルール
Returns:
許可されている場合True
"""
# Disallowルールをチェック
for disallowed in rules['disallowed']:
if disallowed == '/':
# 全てのパスが禁止
return False
elif path.startswith(disallowed):
# このパスは禁止されている
# ただし、Allowルールでオーバーライドされる可能性あり
# より具体的なAllowルールがあるかチェック
for allowed in rules['allowed']:
if path.startswith(allowed) and len(allowed) > len(disallowed):
return True
return False
# 明示的に許可されているかチェック
for allowed in rules['allowed']:
if path.startswith(allowed):
return True
# デフォルトは許可
return True
def generate_compliance_report(self, url: str, robots_result: Dict, 
custom_checks: List[str] = None) -> Dict:
"""
包括的なコンプライアンスレポート生成
Args:
url: 対象URL
robots_result: robots.txtチェック結果
custom_checks: 追加チェック項目
Returns:
コンプライアンスレポート
"""
from urllib.parse import urlparse
recommendations = []
risk_level = "low"
compliance_score = 100  # 100点満点
# robots.txtチェック結果の評価
if not robots_result.get('is_allowed', True):
recommendations.append({
'type': 'critical',
'message': "⚠️ robots.txtでアクセスが禁止されています。スクレイピングを避けてください。",
'action': "別のデータソースを検討するか、サイト運営者に許可を求めてください。"
})
risk_level = "high"
compliance_score -= 50
# クロール遅延の確認
if robots_result.get('crawl_delay'):
delay = robots_result['crawl_delay']
if delay > 10:
recommendations.append({
'type': 'warning',
'message': f"⏱️ robots.txtで{delay}秒の長いクロール間隔が指定されています。",
'action': f"最低{delay}秒の間隔を空けてアクセスしてください。"
})
risk_level = "medium" if risk_level == "low" else risk_level
compliance_score -= 20
else:
recommendations.append({
'type': 'info',
'message': f"⏱️ robots.txtで{delay}秒のクロール間隔が指定されています。",
'action': f"{delay}秒の間隔を遵守してアクセスしてください。"
})
compliance_score -= 5
# ドメイン別の一般的な推奨事項
domain = urlparse(url).netloc
# 大手サイトの特別な配慮
high_traffic_domains = [
'amazon.com', 'google.com', 'facebook.com', 'twitter.com', 
'youtube.com', 'wikipedia.org', 'instagram.com'
]
if any(htd in domain for htd in high_traffic_domains):
recommendations.append({
'type': 'warning',
'message': f"🏢 {domain}は高トラフィックサイトです。",
'action': "より長い間隔(5秒以上)でのアクセスを推奨します。"
})
compliance_score -= 10
# 基本的な推奨事項
basic_recommendations = [
{
'type': 'info',
'message': "📋 対象サイトの利用規約を必ず確認してください。",
'action': "利用規約ページを読み、スクレイピングの制限事項を確認してください。"
},
{
'type': 'info',
'message': "🤖 適切で識別可能なUser-Agentを設定してください。",
'action': "連絡先情報を含むUser-Agentの使用を検討してください。"
},
{
'type': 'info',
'message': "⏳ サーバー負荷を考慮して適切な間隔を空けてアクセスしてください。",
'action': "最低1秒、可能であれば2-3秒の間隔を推奨します。"
},
{
'type': 'info',
'message': "💾 個人情報や機密情報の取得は避けてください。",
'action': "公開されている情報のみを対象とし、取得目的を明確にしてください。"
}
]
recommendations.extend(basic_recommendations)
# カスタムチェック項目の追加
if custom_checks:
for check in custom_checks:
recommendations.append({
'type': 'custom',
'message': f"🔍 カスタムチェック: {check}",
'action': "プロジェクト固有の要件を確認してください。"
})
return {
'url': url,
'domain': domain,
'risk_level': risk_level,
'compliance_score': max(compliance_score, 0),  # 最低0点
'robots_compliant': robots_result.get('is_allowed', True),
'suggested_delay': max(robots_result.get('crawl_delay', 1), 1),
'recommendations': recommendations,
'robots_details': robots_result,
'timestamp': datetime.now(),
'checked_by': 'ScrapingComplianceChecker v1.0'
}
async def batch_compliance_check(self, urls: List[str]) -> Dict[str, Dict]:
"""
複数URLの一括コンプライアンスチェック
Args:
urls: チェック対象URLのリスト
Returns:
URL別のコンプライアンス結果
"""
results = {}
for url in urls:
print(f"コンプライアンスチェック中: {url}")
# robots.txtチェック
robots_result = await self.check_robots_txt(url)
# コンプライアンスレポート生成
compliance_report = self.generate_compliance_report(url, robots_result)
results[url] = compliance_report
# チェック間隔
await asyncio.sleep(0.5)
return results
# 使用例
async def demo_compliance_check():
"""
コンプライアンスチェックのデモ
"""
checker = ScrapingComplianceChecker()
# テスト対象URL
test_urls = [
"https://httpbin.org/html",
"https://example.com",
"https://github.com/robots.txt"  # robots.txtが存在するサイト
]
# 一括コンプライアンスチェック
print("🔍 コンプライアンスチェック開始n")
compliance_results = await checker.batch_compliance_check(test_urls)
# 結果の表示
for url, report in compliance_results.items():
print(f"{'='*80}")
print(f"📊 {url}")
print(f"{'='*80}")
print(f"リスクレベル: {report['risk_level'].upper()}")
print(f"コンプライアンススコア: {report['compliance_score']}/100")
print(f"robots.txt準拠: {'✅' if report['robots_compliant'] else '❌'}")
print(f"推奨アクセス間隔: {report['suggested_delay']}秒")
print(f"n📝 推奨事項 ({len(report['recommendations'])}件):")
for i, rec in enumerate(report['recommendations'][:5], 1):  # 最初の5件のみ表示
icon = "🚨" if rec['type'] == 'critical' else "⚠️" if rec['type'] == 'warning' else "ℹ️"
print(f"  {i}. {icon} {rec['message']}")
print(f"     → {rec['action']}")
print()  # 空行
return compliance_results
# 実行例
# compliance_results = await demo_compliance_check()

🎯 実践プロジェクト:ニュース監視システム

最後に、これまで学んだ技術を統合した実用的なプロジェクトを構築します。

class NewsMonitoringSystem:
"""
総合ニュース監視システム
複数のニュースサイトから記事を自動収集し、
分析・分類・アラート機能を提供
"""
def __init__(self, config_file: str = "news_monitor_config.json"):
self.config = self.load_config(config_file)
self.pipeline = DataPipeline("news_monitoring")
self.compliance_checker = ScrapingComplianceChecker()
self.extractor = AdvancedDataExtractor()
self.anti_detection_scraper = AntiDetectionScraper()
# 監視状態
self.monitoring_active = False
self.last_check_time = {}
# アラート機能
self.alert_handlers = []
def load_config(self, config_file: str) -> Dict:
"""設定ファイル読み込み"""
try:
with open(config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
# デフォルト設定
default_config = {
"news_sources": [
{
"name": "Tech News Site",
"base_url": "https://example-tech-news.com",
"rss_url": "https://example-tech-news.com/rss",
"selectors": {
"article_links": ".article-link",
"title": ".article-title",
"content": ".article-content",
"author": ".author",
"date": ".publish-date"
},
"enabled": True,
"check_interval_minutes": 30
}
],
"monitoring": {
"max_articles_per_check": 20,
"content_min_length": 200,
"keywords": ["AI", "Python", "technology", "startup"],
"categories": ["technology", "business", "science"]
},
"alerts": {
"keyword_alerts": True,
"new_source_alerts": True,
"error_alerts": True,
"daily_summary": True
},
"compliance": {
"respect_robots_txt": True,
"min_delay_seconds": 2,
"max_requests_per_hour": 100
}
}
with open(config_file, 'w', encoding='utf-8') as f:
json.dump(default_config, f, indent=2, ensure_ascii=False)
return default_config
async def start_monitoring(self):
"""監視開始"""
self.monitoring_active = True
logger.info("ニュース監視システム開始")
while self.monitoring_active:
try:
# 各ニュースソースをチェック
for source in self.config['news_sources']:
if not source.get('enabled', True):
continue
# チェック間隔の確認
if self._should_check_source(source):
await self._monitor_news_source(source)
# 次のチェックまで待機
await asyncio.sleep(60)  # 1分間隔でチェック
except KeyboardInterrupt:
logger.info("監視停止要求を受信")
break
except Exception as e:
logger.error(f"監視ループでエラー: {e}")
await asyncio.sleep(300)  # エラー時は5分待機
self.monitoring_active = False
logger.info("ニュース監視システム停止")
def _should_check_source(self, source: Dict) -> bool:
"""ソースをチェックすべきかどうか判定"""
source_name = source['name']
check_interval = source.get('check_interval_minutes', 30) * 60  # 秒に変換
if source_name not in self.last_check_time:
return True
elapsed = time.time() - self.last_check_time[source_name]
return elapsed >= check_interval
async def _monitor_news_source(self, source: Dict):
"""単一ニュースソースの監視"""
source_name = source['name']
logger.info(f"チェック開始: {source_name}")
try:
# コンプライアンスチェック
if self.config['compliance']['respect_robots_txt']:
robots_result = await self.compliance_checker.check_robots_txt(source['base_url'])
if not robots_result.get('is_allowed', True):
logger.warning(f"{source_name}: robots.txtで禁止されています")
return
# 推奨遅延の適用
suggested_delay = robots_result.get('crawl_delay', 
self.config['compliance']['min_delay_seconds'])
await asyncio.sleep(suggested_delay)
# 記事収集
articles = await self._collect_articles(source)
if articles:
# データベースに保存
await self._save_articles(articles, source_name)
# キーワードアラートのチェック
await self._check_keyword_alerts(articles)
logger.info(f"{source_name}: {len(articles)} 件の記事を処理")
# 最終チェック時刻の更新
self.last_check_time[source_name] = time.time()
except Exception as e:
logger.error(f"{source_name} の監視でエラー: {e}")
# エラーアラート
if self.config['alerts']['error_alerts']:
await self._send_error_alert(source_name, str(e))
async def _collect_articles(self, source: Dict) -> List[Dict]:
"""記事の収集"""
articles = []
# RSS がある場合は RSS を優先
if source.get('rss_url'):
articles.extend(await self._collect_from_rss(source))
# HTMLスクレイピング
if source.get('base_url') and source.get('selectors'):
articles.extend(await self._collect_from_html(source))
# 重複除去
seen_urls = set()
unique_articles = []
for article in articles:
if article['url'] not in seen_urls:
seen_urls.add(article['url'])
unique_articles.append(article)
return unique_articles[:self.config['monitoring']['max_articles_per_check']]
async def _collect_from_rss(self, source: Dict) -> List[Dict]:
"""RSS フィードからの記事収集"""
try:
import feedparser
import aiohttp
async with aiohttp.ClientSession() as session:
async with session.get(source['rss_url']) as response:
rss_content = await response.text()
feed = feedparser.parse(rss_content)
articles = []
for entry in feed.entries:
article = {
'title': getattr(entry, 'title', ''),
'url': getattr(entry, 'link', ''),
'description': getattr(entry, 'summary', ''),
'author': getattr(entry, 'author', ''),
'published': getattr(entry, 'published', ''),
'source': source['name'],
'collection_method': 'rss',
'collected_at': datetime.now()
}
articles.append(article)
return articles
except ImportError:
logger.warning("feedparser not installed, skipping RSS collection")
return []
except Exception as e:
logger.error(f"RSS収集エラー: {e}")
return []
async def _collect_from_html(self, source: Dict) -> List[Dict]:
"""HTMLスクレイピングによる記事収集"""
try:
# ステルスモードでページを取得
result = await self.anti_detection_scraper.stealth_scrape(
source['base_url'], 
extra_stealth=True
)
if result['status'] != 'success':
return []
# 記事リンクの抽出
soup = BeautifulSoup(result['html_content'], 'html.parser')
article_links = []
link_selector = source['selectors'].get('article_links', 'a')
for link_element in soup.select(link_selector):
href = link_element.get('href')
if href:
# 相対URLを絶対URLに変換
if href.startswith('/'):
href = source['base_url'].rstrip('/') + href
elif not href.startswith('http'):
href = source['base_url'].rstrip('/') + '/' + href
article_links.append(href)
# 各記事の詳細を取得
articles = []
for link in article_links[:10]:  # 最新10記事に制限
article_detail = await self._scrape_article_detail(link, source)
if article_detail:
articles.append(article_detail)
# 記事間の適切な間隔
await asyncio.sleep(2)
return articles
except Exception as e:
logger.error(f"HTMLスクレイピングエラー: {e}")
return []
async def _scrape_article_detail(self, url: str, source: Dict) -> Optional[Dict]:
"""個別記事の詳細スクレイピング"""
try:
result = await self.anti_detection_scraper.stealth_scrape(url)
if result['status'] != 'success':
return None
# 記事詳細の抽出
soup = BeautifulSoup(result['html_content'], 'html.parser')
title_elem = soup.select_one(source['selectors'].get('title', 'h1'))
content_elem = soup.select_one(source['selectors'].get('content', 'article'))
author_elem = soup.select_one(source['selectors'].get('author', '.author'))
date_elem = soup.select_one(source['selectors'].get('date', '.date'))
# テキスト抽出
title = title_elem.get_text(strip=True) if title_elem else ''
content = content_elem.get_text(strip=True) if content_elem else ''
author = author_elem.get_text(strip=True) if author_elem else ''
date_str = date_elem.get_text(strip=True) if date_elem else ''
# 最小文字数チェック
min_length = self.config['monitoring']['content_min_length']
if len(content) < min_length:
return None
return {
'title': title,
'url': url,
'content': content,
'author': author,
'published': date_str,
'source': source['name'],
'collection_method': 'html_scraping',
'collected_at': datetime.now(),
'content_length': len(content)
}
except Exception as e:
logger.error(f"記事詳細取得エラー ({url}): {e}")
return None
async def _save_articles(self, articles: List[Dict], source_name: str):
"""記事をデータベースに保存"""
# スクレイピング結果形式に変換
scraped_results = []
for article in articles:
scraped_results.append({
'url': article['url'],
'title': article['title'],
'html_content': article.get('content', ''),
'timestamp': article['collected_at'],
'status': 'success'
})
# データベースに保存
saved_count = self.pipeline.save_scraped_data(
scraped_results, 
session_name=f"news_monitoring_{source_name}"
)
# 抽出データも保存
for article in articles:
extracted_data = {
'article_title': article['title'],
'author': article.get('author', ''),
'published_date': article.get('published', ''),
'content_length': article.get('content_length', 0),
'source_name': source_name,
'collection_method': article.get('collection_method', '')
}
self.pipeline.save_extracted_data(
article['url'],
extracted_data,
'news_extraction'
)
async def _check_keyword_alerts(self, articles: List[Dict]):
"""キーワードアラートのチェック"""
if not self.config['alerts']['keyword_alerts']:
return
keywords = self.config['monitoring']['keywords']
matched_articles = []
for article in articles:
content_text = f"{article['title']} {article.get('content', '')}"
for keyword in keywords:
if keyword.lower() in content_text.lower():
matched_articles.append({
'article': article,
'matched_keyword': keyword
})
break
if matched_articles:
await self._send_keyword_alert(matched_articles)
async def _send_keyword_alert(self, matched_articles: List[Dict]):
"""キーワードマッチアラートの送信"""
logger.info(f"キーワードアラート: {len(matched_articles)} 件の記事がマッチ")
# 実際の実装では、メール送信やSlack通知等を行う
for match in matched_articles:
article = match['article']
keyword = match['matched_keyword']
print(f"🚨 キーワードアラート: '{keyword}'")
print(f"   記事: {article['title']}")
print(f"   URL: {article['url']}")
print(f"   ソース: {article['source']}")
async def _send_error_alert(self, source_name: str, error_message: str):
"""エラーアラートの送信"""
logger.warning(f"エラーアラート: {source_name} - {error_message}")
# 実際の実装では、管理者への通知を行う
print(f"⚠️ エラーアラート")
print(f"   ソース: {source_name}")
print(f"   エラー: {error_message}")
print(f"   時刻: {datetime.now()}")
async def generate_daily_summary(self) -> Dict:
"""日次サマリーの生成"""
# 今日のデータを取得
today = datetime.now().date()
# データベースから統計情報を取得
stats = self.pipeline.get_statistics_dataframe()
summary = {
'date': today.isoformat(),
'total_articles_collected': 0,
'sources_active': len([s for s in self.config['news_sources'] if s.get('enabled', True)]),
'keyword_matches': 0,
'errors': 0,
'top_sources': [],
'top_keywords': []
}
# より詳細な統計は実際のデータベースクエリで取得
# ここでは簡略化
return summary
# 使用例
async def demo_news_monitoring():
"""
ニュース監視システムのデモ
実際の運用を模擬したテスト実行
"""
# システム初期化
monitor = NewsMonitoringSystem()
print("📰 ニュース監視システム デモ開始")
print("=" * 60)
# 設定情報表示
print(f"監視対象ソース数: {len(monitor.config['news_sources'])}")
print(f"キーワード: {', '.join(monitor.config['monitoring']['keywords'])}")
print(f"コンプライアンス: robots.txt遵守={monitor.config['compliance']['respect_robots_txt']}")
# 単発実行(デモ用)
print(f"n🔍 単発チェック実行中...")
for source in monitor.config['news_sources']:
if source.get('enabled', True):
await monitor._monitor_news_source(source)
# 日次サマリー生成
print(f"n📊 日次サマリー生成中...")
summary = await monitor.generate_daily_summary()
print(f"日次サマリー:")
for key, value in summary.items():
print(f"  {key}: {value}")
# データエクスポート
print(f"n📤 データエクスポート実行中...")
export_files = monitor.pipeline.export_to_formats(['json'])
for format_type, file_path in export_files.items():
print(f"  {format_type.upper()}: {file_path}")
print(f"n✅ デモ完了")
return monitor, summary
# 実行例
# news_monitor_demo = await demo_news_monitoring()
# 実際の監視開始(バックグラウンド実行)
# monitor = NewsMonitoringSystem()
# await monitor.start_monitoring()  # Ctrl+Cで停止

🎉 シリーズ総まとめ

学習した技術の総合

全3部を通じて、以下の包括的なスクレイピング技術を習得しました:

第1部:基礎技術

  • Playwrightによる動的サイト対応
  • 非同期プログラミングの基本
  • データ抽出の基礎

第2部:高度な技術

  • 並行処理によるパフォーマンス向上
  • エラーハンドリングとリトライ機構
  • 構造化データ抽出

第3部:実践的応用

  • 検出回避技術とセキュリティ対策
  • データ永続化と管理システム
  • 法的・倫理的配慮の実装

プロダクション運用のポイント

  1. スケーラビリティ: 大量データへの対応
  2. 堅牢性: エラー処理とリカバリ機能
  3. コンプライアンス: 法的・倫理的配慮
  4. 監視: システムの健全性チェック
  5. 保守性: 設定管理とログ出力

次のステップと応用

  • クラウド展開: AWS/GCPでの運用
  • 機械学習統合: 収集データの自動分析
  • API化: スクレイピング結果のサービス化
  • リアルタイム処理: WebSocketとストリーミング

責任あるスクレイピングの実現

技術的な実装だけでなく、法的・倫理的配慮を組み込んだシステム設計が重要です。本シリーズで学んだ技術を使って、サーバー負荷を考慮し、利用規約を遵守した責任あるスクレイピングを実践しましょう。

関連記事


免責事項: 本記事の内容は教育目的のものです。実際のスクレイピング実行時は、対象サイトの利用規約を遵守し、適切な許可を得てから実施してください。

コメントする