Web自動化の新時代:Browser-useとPlaywrightで実現するAI駆動型スクレイピング完全ガイド
Web自動化の新時代:Browser-useとPlaywrightで実現するAI駆動型スクレイピング完全ガイド
皆さん、こんにちは!ぽんたぬきです。
40代になって、つくづく感じることがあります。それは「時間の価値」です。家族との時間、本業での成果、そして副収入を得るための活動...すべてを効率化したい、そんな思いが日々強くなっています。
「同じ作業を毎日繰り返すのはもう嫌だ」 「もっとスマートに情報収集や作業を自動化したい」 「AIの力を借りて、収入につながるスキルを身につけたい」
そんな思いを抱いている方、いらっしゃいませんか?
今回は、私が最近注目している「Browser-use」と「Playwright」を組み合わせた、AI駆動型のWeb自動化について詳しく解説します。実際に私が試行錯誤を重ねて身につけた知識と、失敗談も含めて、実践的な内容をお届けします。
この技術をマスターすれば、データ収集作業の効率化はもちろん、クライアントワークでの差別化や、新しい収入源の開拓も可能になります。一緒に最新のWeb自動化技術を学んでいきましょう!
なぜ今、AI駆動型Web自動化なのか?
従来のスクレイピングの限界
私がWeb自動化に取り組み始めたのは約5年前。当時はBeautifulSoupやSeleniumを使った従来型のスクレイピングが主流でした。しかし、実際に運用してみると、いくつもの壁にぶつかりました。
従来手法の課題:
- 動的コンテンツへの対応が困難
- サイト構造の変更に脆弱
- 複雑な操作フローの実装が煩雑
- 人間らしい操作の再現が難しい
- メンテナンスコストが高い
「せっかく作ったスクリプトが、サイトの仕様変更で動かなくなった」という経験、皆さんもお持ちではないでしょうか?
AI時代の新しいアプローチ
2024年に入って、状況は劇的に変わりました。大規模言語モデル(LLM)の進化により、Webページの理解と操作を「人間のように」行えるツールが登場したのです。
AI駆動型自動化のメリット:
- 自然言語での指示が可能
- 動的な判断と適応
- 複雑なUIパターンへの対応
- エラー回復能力の向上
- 保守性の大幅な改善
Browser-useとは何か?
Browser-useの基本概念
Browser-useは、大規模言語モデル(LLM)を活用してWebブラウザを自動操作するためのPythonライブラリです。従来のスクレイピングツールとは根本的に異なるアプローチを採用しています。
従来のアプローチ:
HTML解析 → 要素特定 → 操作実行
Browser-useのアプローチ:
自然言語指示 → AI理解 → 適応的操作
技術的な特徴
1. マルチモーダルAI統合
- 視覚情報とテキスト情報を同時処理
- スクリーンショットベースの要素認識
- コンテキスト理解による適応的操作
2. 自然言語インターフェース
- 人間らしい指示での操作が可能
- 複雑な条件分岐の自動処理
- エラー時の自動リトライ機能
3. 学習機能
- 過去の操作履歴から学習
- サイト固有のパターン認識
- 効率的な操作ルートの最適化
実装例:基本的な使い方
from browser_use import Agent
# エージェントの初期化
agent = Agent(
task="Amazon で 'ワイヤレスイヤホン' を検索し、価格順に並び替えて上位5件の商品情報を取得",
llm=openai_llm # OpenAI GPT-4などのLLMを指定
)
# タスクの実行
result = agent.run()
print(result)このシンプルなコードだけで、複雑な検索・並び替え・データ抽出が自動実行されます。従来なら数百行必要だった処理が、わずか数行で実現できるのです。
Playwrightとの連携メリット
Playwrightの強み
Playwrightは、Microsoft が開発したブラウザ自動化ライブラリです。Browser-useと組み合わせることで、相乗効果が生まれます。
Playwrightの特徴:
- 高速で安定したブラウザ操作
- 複数ブラウザエンジン対応(Chromium、Firefox、Safari)
- 優れたネットワーク制御機能
- 堅牢なテスト機能
連携による相乗効果
1. 安定性の向上
from browser_use import Agent
from playwright.async_api import async_playwright
async def enhanced_automation():
async with async_playwright() as p:
browser = await p.chromium.launch(headless=False)
context = await browser.new_context(
viewport={'width': 1920, 'height': 1080},
user_agent='Mozilla/5.0 (Windows NT 10.0; Win64; x64)...'
)
agent = Agent(
task="競合他社の価格情報を定期的に収集",
llm=openai_llm,
browser_context=context
)
result = await agent.run()
await browser.close()
return result2. 高度なネットワーク制御
# リクエストの監視とフィルタリング
await context.route("**/*", lambda route: (
route.continue_() if "api" in route.request.url
else route.abort()
))3. エラーハンドリングの強化
try:
result = await agent.run()
except PlaywrightTimeoutError:
# 自動リトライロジック
await page.reload()
result = await agent.run()実践プロジェクト1:ECサイト価格監視システム
プロジェクト概要
競合他社の商品価格を自動監視し、価格変動をSlackに通知するシステムを構築してみましょう。これは実際に私が副業クライアント向けに開発したシステムの簡易版です。
システム設計
アーキテクチャ:
Scheduler → Browser-use Agent → Data Processing → Notification
技術スタック:
- Browser-use + Playwright(データ収集)
- SQLite(データ保存)
- Slack API(通知)
- APScheduler(定期実行)
実装ステップ
ステップ1:基本的なデータ収集エージェント
import asyncio
from browser_use import Agent
from playwright.async_api import async_playwright
import json
from datetime import datetime
class PriceMonitorAgent:
def __init__(self, llm):
self.llm = llm
async def collect_prices(self, products):
async with async_playwright() as p:
browser = await p.chromium.launch(
headless=True,
args=['--no-sandbox', '--disable-dev-shm-usage']
)
context = await browser.new_context(
viewport={'width': 1920, 'height': 1080}
)
results = []
for product in products:
agent = Agent(
task=f"""
{product['site']}で「{product['keyword']}」を検索し、
以下の情報を取得してください:
1. 商品名
2. 価格
3. 商品URL
4. 在庫状況
結果はJSON形式で返してください。
""",
llm=self.llm,
browser_context=context
)
try:
result = await agent.run()
price_data = {
'product_id': product['id'],
'timestamp': datetime.now().isoformat(),
'data': result
}
results.append(price_data)
except Exception as e:
print(f"Error collecting data for {product['keyword']}: {e}")
await browser.close()
return resultsステップ2:データ保存とトレンド分析
import sqlite3
from typing import List, Dict
import pandas as pd
class PriceDatabase:
def __init__(self, db_path="prices.db"):
self.db_path = db_path
self.init_db()
def init_db(self):
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute("""
CREATE TABLE IF NOT EXISTS price_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
product_id TEXT,
timestamp TEXT,
price REAL,
product_name TEXT,
url TEXT,
in_stock BOOLEAN
)
""")
conn.commit()
conn.close()
def save_prices(self, price_data: List[Dict]):
conn = sqlite3.connect(self.db_path)
for data in price_data:
try:
parsed_data = json.loads(data['data'])
cursor = conn.cursor()
cursor.execute("""
INSERT INTO price_history
(product_id, timestamp, price, product_name, url, in_stock)
VALUES (?, ?, ?, ?, ?, ?)
""", (
data['product_id'],
data['timestamp'],
parsed_data.get('price', 0),
parsed_data.get('product_name', ''),
parsed_data.get('url', ''),
parsed_data.get('in_stock', False)
))
except Exception as e:
print(f"Error saving price data: {e}")
conn.commit()
conn.close()
def get_price_changes(self, threshold=0.1):
"""価格変動が閾値を超えた商品を取得"""
conn = sqlite3.connect(self.db_path)
query = """
WITH latest_prices AS (
SELECT product_id, price, timestamp,
ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY timestamp DESC) as rn
FROM price_history
),
previous_prices AS (
SELECT product_id, price as prev_price,
ROW_NUMBER() OVER (PARTITION BY product_id ORDER BY timestamp DESC) as rn
FROM price_history
)
SELECT l.product_id, l.price, p.prev_price,
(l.price - p.prev_price) / p.prev_price as change_rate
FROM latest_prices l
JOIN previous_prices p ON l.product_id = p.product_id
WHERE l.rn = 1 AND p.rn = 2
AND ABS((l.price - p.prev_price) / p.prev_price) > ?
"""
df = pd.read_sql_query(query, conn, params=[threshold])
conn.close()
return df.to_dict('records')ステップ3:通知システム
import requests
from typing import List, Dict
class SlackNotifier:
def __init__(self, webhook_url):
self.webhook_url = webhook_url
def send_price_alert(self, changes: List[Dict]):
if not changes:
return
message = {
"text": "価格変動アラート",
"attachments": []
}
for change in changes:
color = "good" if change['change_rate'] < 0 else "danger"
attachment = {
"color": color,
"fields": [
{
"title": f"商品ID: {change['product_id']}",
"value": f"前回価格: ¥{change['prev_price']:,.0f}\n現在価格: ¥{change['price']:,.0f}\n変動率: {change['change_rate']*100:.1f}%",
"short": True
}
]
}
message["attachments"].append(attachment)
response = requests.post(self.webhook_url, json=message)
return response.status_code == 200ステップ4:統合とスケジューリング
from apscheduler.schedulers.asyncio import AsyncIOScheduler
import asyncio
class PriceMonitoringSystem:
def __init__(self, llm, slack_webhook):
self.agent = PriceMonitorAgent(llm)
self.db = PriceDatabase()
self.notifier = SlackNotifier(slack_webhook)
self.scheduler = AsyncIOScheduler()
async def run_monitoring_cycle(self):
"""監視サイクルの実行"""
products = [
{
'id': 'laptop_001',
'keyword': 'MacBook Pro 14インチ',
'site': 'https://amazon.co.jp'
},
{
'id': 'laptop_002',
'keyword': 'MacBook Pro 14インチ',
'site': 'https://kakaku.com'
}
]
try:
# データ収集
price_data = await self.agent.collect_prices(products)
# データ保存
self.db.save_prices(price_data)
# 価格変動チェック
changes = self.db.get_price_changes(threshold=0.05) # 5%以上の変動
# 通知送信
if changes:
self.notifier.send_price_alert(changes)
except Exception as e:
print(f"Monitoring cycle error: {e}")
def start_monitoring(self):
"""定期監視の開始"""
self.scheduler.add_job(
self.run_monitoring_cycle,
'interval',
hours=6, # 6時間ごと
id='price_monitoring'
)
self.scheduler.start()
try:
asyncio.get_event_loop().run_forever()
except KeyboardInterrupt:
print("Monitoring stopped")
self.scheduler.shutdown()
# 使用例
if __name__ == "__main__":
from openai import AsyncOpenAI
llm = AsyncOpenAI(api_key="your-api-key")
slack_webhook = "your-slack-webhook-url"
system = PriceMonitoringSystem(llm, slack_webhook)
system.start_monitoring()運用での学び
実際にこのシステムを3ヶ月間運用して得た知見をご紹介します:
成功のポイント:
- エラーハンドリングの重要性:Webサイトの構造変更は頻繁に発生するため、堅牢なエラー処理が必須
- レート制限の考慮:過度な頻度でのアクセスはブロックの原因となるため、適切な間隔設定が重要
- データの正規化:同じ商品でも表記が異なる場合があるため、データクリーニングロジックが必要
改善点:
- プロキシローテーション機能の追加
- より高度な価格トレンド分析
- 複数通知チャネルへの対応
実践プロジェクト2:求人情報自動収集システム
プロジェクト背景
40代のエンジニアとして、常に市場価値を把握しておくことは重要です。また、副業案件の情報収集も効率化したいところです。そこで、複数の求人サイトから条件に合致する案件を自動収集するシステムを構築しました。
システム設計
要件定義:
- 複数の求人サイトから情報収集
- 条件フィルタリング(技術スタック、年収、リモート可否等)
- 重複排除と優先度付け
- 週次レポート生成
実装詳細
ステップ1:求人データ収集エージェント
import asyncio
from browser_use import Agent
from dataclasses import dataclass
from typing import List, Optional
from datetime import datetime
@dataclass
class JobSearchCriteria:
keywords: List[str]
location: Optional[str] = None
min_salary: Optional[int] = None
remote_ok: bool = False
experience_level: Optional[str] = None
@dataclass
class JobPosting:
title: str
company: str
salary: Optional[str]
location: str
description: str
url: str
posted_date: str
technologies: List[str]
remote_option: bool
source_site: str
class JobCollectionAgent:
def __init__(self, llm):
self.llm = llm
async def search_jobs(self, site_config, criteria: JobSearchCriteria) -> List[JobPosting]:
"""指定サイトから求人情報を収集"""
async with async_playwright() as p:
browser = await p.chromium.launch(headless=True)
context = await browser.new_context()
task_prompt = self._build_search_prompt(site_config, criteria)
agent = Agent(
task=task_prompt,
llm=self.llm,
browser_context=context
)
try:
result = await agent.run()
jobs = self._parse_job_results(result, site_config['name'])
return jobs
except Exception as e:
print(f"Error collecting jobs from {site_config['name']}: {e}")
return []
finally:
await browser.close()
def _build_search_prompt(self, site_config, criteria):
"""検索プロンプトの構築"""
keywords_str = ' OR '.join(criteria.keywords)
prompt = f"""
{site_config['url']}にアクセスし、以下の条件で求人を検索してください:
検索キーワード: {keywords_str}
場所: {criteria.location or '指定なし'}
最低年収: {criteria.min_salary or '指定なし'}万円
リモート勤務: {'必須' if criteria.remote_ok else '問わない'}
各求人について以下の情報を取得してください:
1. 職種・ポジション名
2. 会社名
3. 年収・給与情報
4. 勤務地
5. 求人詳細(スキル要件、業務内容)
6. 求人URL
7. 投稿日
8. 使用技術・プログラミング言語
9. リモートワーク可否
結果は以下のJSON配列形式で返してください:
[
{{
"title": "職種名",
"company": "会社名",
"salary": "年収情報",
"location": "勤務地",
"description": "詳細",
"url": "求人URL",
"posted_date": "投稿日",
"technologies": ["技術1", "技術2"],
"remote_option": true/false
}}
]
最大20件まで収集してください。
"""
return prompt
def _parse_job_results(self, result_json, source_site):
"""取得結果をJobPostingオブジェクトに変換"""
jobs = []
try:
job_data = json.loads(result_json)
for job_info in job_data:
job = JobPosting(
title=job_info.get('title', ''),
company=job_info.get('company', ''),
salary=job_info.get('salary'),
location=job_info.get('location', ''),
description=job_info.get('description', ''),
url=job_info.get('url', ''),
posted_date=job_info.get('posted_date', ''),
technologies=job_info.get('technologies', []),
remote_option=job_info.get('remote_option', False),
source_site=source_site
)
jobs.append(job)
except json.JSONDecodeError as e:
print(f"Failed to parse job results: {e}")
return jobsステップ2:データ分析とフィルタリング
import pandas as pd
import re
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
class JobAnalyzer:
def __init__(self):
self.vectorizer = TfidfVectorizer(stop_words='english', max_features=1000)
def remove_duplicates(self, jobs: List[JobPosting]) -> List[JobPosting]:
"""重複求人の除去"""
if not jobs:
return jobs
# 求人内容のベクトル化
descriptions = [job.description for job in jobs]
try:
tfidf_matrix = self.vectorizer.fit_transform(descriptions)
similarity_matrix = cosine_similarity(tfidf_matrix)
# 類似度が高い求人をグループ化
threshold = 0.8
unique_jobs = []
processed = set()
for i in range(len(jobs)):
if i in processed:
continue
unique_jobs.append(jobs[i])
# 類似する求人を特定
similar_indices = []
for j in range(i + 1, len(jobs)):
if similarity_matrix[i][j] > threshold:
similar_indices.append(j)
processed.add(j)
processed.add(i)
return unique_jobs
except Exception as e:
print(f"Error in duplicate removal: {e}")
return jobs
def extract_salary_info(self, salary_text: str) -> dict:
"""年収情報の抽出と正規化"""
if not salary_text:
return {'min_salary': None, 'max_salary': None, 'currency': None}
# 年収パターンの検出
patterns = [
r'(\d{3,4})万円?~(\d{3,4})万円?', # 400万円~600万円
r'(\d{3,4})~(\d{3,4})万円?', # 400~600万円
r'年収(\d{3,4})万円?以上', # 年収400万円以上
r'(\d{3,4})万円?以上', # 400万円以上
]
for pattern in patterns:
match = re.search(pattern, salary_text)
if match:
groups = match.groups()
if len(groups) == 2:
return {
'min_salary': int(groups[0]) * 10000,
'max_salary': int(groups[1]) * 10000,
'currency': 'JPY'
}
elif len(groups) == 1:
return {
'min_salary': int(groups[0]) * 10000,
'max_salary': None,
'currency': 'JPY'
}
return {'min_salary': None, 'max_salary': None, 'currency': None}
def score_jobs(self, jobs: List[JobPosting], criteria: JobSearchCriteria) -> List[tuple]:
"""求人のスコアリング"""
scored_jobs = []
for job in jobs:
score = 0
# キーワードマッチング
job_text = f"{job.title} {job.description}".lower()
keyword_matches = sum(1 for keyword in criteria.keywords
if keyword.lower() in job_text)
score += keyword_matches * 10
# 年収評価
salary_info = self.extract_salary_info(job.salary or '')
if salary_info['min_salary'] and criteria.min_salary:
if salary_info['min_salary'] >= criteria.min_salary * 10000:
score += 20
# リモートワーク
if criteria.remote_ok and job.remote_option:
score += 15
# 技術スタック
tech_matches = sum(1 for tech in job.technologies
for keyword in criteria.keywords
if keyword.lower() in tech.lower())
score += tech_matches * 5
scored_jobs.append((job, score))
# スコア順でソート
scored_jobs.sort(key=lambda x: x[1], reverse=True)
return scored_jobsステップ3:レポート生成
import matplotlib.pyplot as plt
import seaborn as sns
from jinja2 import Template
import base64
from io import BytesIO
class JobReportGenerator:
def __init__(self):
self.template = self._load_template()
def generate_weekly_report(self, jobs: List[JobPosting], criteria: JobSearchCriteria):
"""週次レポートの生成"""
analyzer = JobAnalyzer()
# データ分析
unique_jobs = analyzer.remove_duplicates(jobs)
scored_jobs = analyzer.score_jobs(unique_jobs, criteria)
# 統計情報の生成
stats = self._generate_statistics(unique_jobs)
# グラフの生成
charts = self._generate_charts(unique_jobs)
# トップ求人の抽出
top_jobs = [job for job, score in scored_jobs[:10]]
# HTMLレポートの生成
report_data = {
'stats': stats,
'charts': charts,
'top_jobs': top_jobs,
'criteria': criteria,
'generated_date': datetime.now().strftime('%Y年%m月%d日')
}
html_report = self.template.render(**report_data)
# ファイル保存
report_filename = f"job_report_{datetime.now().strftime('%Y%m%d')}.html"
with open(report_filename, 'w', encoding='utf-8') as f:
f.write(html_report)
return report_filename
def _generate_statistics(self, jobs: List[JobPosting]):
"""統計情報の生成"""
if not jobs:
return {}
analyzer = JobAnalyzer()
# 年収統計
salaries = []
for job in jobs:
salary_info = analyzer.extract_salary_info(job.salary or '')
if salary_info['min_salary']:
salaries.append(salary_info['min_salary'] / 10000)
# 技術統計
all_technologies = []
for job in jobs:
all_technologies.extend(job.technologies)
tech_counts = pd.Series(all_technologies).value_counts().head(10)
# 地域統計
location_counts = pd.Series([job.location for job in jobs]).value_counts().head(10)
return {
'total_jobs': len(jobs),
'avg_salary': int(np.mean(salaries)) if salaries else None,
'median_salary': int(np.median(salaries)) if salaries else None,
'top_technologies': tech_counts.to_dict(),
'top_locations': location_counts.to_dict(),
'remote_ratio': sum(1 for job in jobs if job.remote_option) / len(jobs) * 100
}
def _generate_charts(self, jobs: List[JobPosting]):
"""グラフの生成"""
charts = {}
# 年収分布グラフ
analyzer = JobAnalyzer()
salaries = []
for job in jobs:
salary_info = analyzer.extract_salary_info(job.salary or '')
if salary_info['min_salary']:
salaries.append(salary_info['min_salary'] / 10000)
if salaries:
plt.figure(figsize=(10, 6))
plt.hist(salaries, bins=20, alpha=0.7, color='skyblue')
plt.title('年収分布')
plt.xlabel('年収 (万円)')
plt.ylabel('求人数')
buffer = BytesIO()
plt.savefig(buffer, format='png')
buffer.seek(0)
charts['salary_distribution'] = base64.b64encode(buffer.getvalue()).decode()
plt.close()
# 技術トレンドグラフ
all_technologies = []
for job in jobs:
all_technologies.extend(job.technologies)
tech_counts = pd.Series(all_technologies).value_counts().head(10)
if not tech_counts.empty:
plt.figure(figsize=(12, 8))
tech_counts.plot(kind='barh', color='lightcoral')
plt.title('人気技術ランキング')
plt.xlabel('求人数')
buffer = BytesIO()
plt.savefig(buffer, format='png', bbox_inches='tight')
buffer.seek(0)
charts['technology_ranking'] = base64.b64encode(buffer.getvalue()).decode()
plt.close()
return charts
def _load_template(self):
"""HTMLテンプレートの読み込み"""
template_str = """
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>求人情報週次レポート</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.header { background-color: #f0f0f0; padding: 20px; text-align: center; }
.stats { display: flex; justify-content: space-around; margin: 20px 0; }
.stat-box { background-color: #e8f4fd; padding: 15px; border-radius: 5px; text-align: center; }
.job-card { border: 1px solid #ddd; margin: 10px 0; padding: 15px; border-radius: 5px; }
.chart { text-align: center; margin: 20px 0; }
</style>
</head>
<body>
<div class="header">
<h1>求人情報週次レポート</h1>
<p>生成日: {{ generated_date }}</p>
</div>
<div class="stats">
<div class="stat-box">
<h3>{{ stats.total_jobs }}</h3>
<p>総求人数</p>
</div>
<div class="stat-box">
<h3>{{ stats.avg_salary or 'N/A' }}万円</h3>
<p>平均年収</p>
</div>
<div class="stat-box">
<h3>{{ "{:.1f}".format(stats.remote_ratio) }}%</h3>
<p>リモート可</p>
</div>
</div>
{% if charts.salary_distribution %}
<div class="chart">
<h2>年収分布</h2>
<img src="data:image/png;base64,{{ charts.salary_distribution }}" alt="年収分布">
</div>
{% endif %}
{% if charts.technology_ranking %}
<div class="chart">
<h2>人気技術ランキング</h2>
<img src="data:image/png;base64,{{ charts.technology_ranking }}" alt="技術ランキング">
</div>
{% endif %}
<h2>注目求人 TOP 10</h2>
{% for job in top_jobs %}
<div class="job-card">
<h3>{{ job.title }}</h3>
<p><strong>会社:</strong> {{ job.company }}</p>
<p><strong>年収:</strong> {{ job.salary or 'N/A' }}</p>
<p><strong>勤務地:</strong> {{ job.location }}</p>
<p><strong>技術:</strong> {{ job.technologies | join(', ') }}</p>
<p><strong>リモート:</strong> {{ '可' if job.remote_option else '不可' }}</p>
<p><a href="{{ job.url }}" target="_blank">求人詳細を見る</a></p>
</div>
{% endfor %}
</body>
</html>
"""
return Template(template_str)ステップ4:統合システム
class JobMonitoringSystem:
def __init__(self, llm, notification_config):
self.collector = JobCollectionAgent(llm)
self.analyzer = JobAnalyzer()
self.reporter = JobReportGenerator()
self.notification_config = notification_config
async def run_weekly_collection(self, criteria: JobSearchCriteria):
"""週次の求人収集とレポート生成"""
# 複数サイトから求人収集
site_configs = [
{'name': 'Indeed', 'url': 'https://jp.indeed.com'},
{'name': 'Green', 'url': 'https://www.green-japan.com'},
{'name': 'Wantedly', 'url': 'https://www.wantedly.com'}
]
all_jobs = []
for site_config in site_configs:
try:
jobs = await self.collector.search_jobs(site_config, criteria)
all_jobs.extend(jobs)
print(f"Collected {len(jobs)} jobs from {site_config['name']}")
except Exception as e:
print(f"Failed to collect from {site_config['name']}: {e}")
# 重複除去
unique_jobs = self.analyzer.remove_duplicates(all_jobs)
print(f"Unique jobs after deduplication: {len(unique_jobs)}")
# レポート生成
report_file = self.reporter.generate_weekly_report(unique_jobs, criteria)
print(f"Report generated: {report_file}")
# 通知送信
await self._send_notification(report_file, len(unique_jobs))
return report_file
async def _send_notification(self, report_file, job_count):
"""レポート完成通知"""
if self.notification_config.get('slack_webhook'):
message = {
"text": f"週次求人レポートが完成しました\n求人数: {job_count}件\nレポート: {report_file}"
}
requests.post(self.notification_config['slack_webhook'], json=message)
# 使用例
async def main():
from openai import AsyncOpenAI
llm = AsyncOpenAI(api_key="your-api-key")
notification_config = {
'slack_webhook': 'your-slack-webhook-url'
}
criteria = JobSearchCriteria(
keywords=['Python', 'Django', 'FastAPI', 'データサイエンス'],
location='東京',
min_salary=600, # 600万円以上
remote_ok=True
)
system = JobMonitoringSystem(llm, notification_config)
report_file = await system.run_weekly_collection(criteria)
print(f"Report saved as: {report_file}")
if __name__ == "__main__":
asyncio.run(main())パフォーマンス最適化のベストプラクティス
1. 並列処理の活用
import asyncio
from concurrent.futures import ThreadPoolExecutor
class OptimizedAgent:
def __init__(self, llm, max_concurrent=3):
self.llm = llm
self.semaphore = asyncio.Semaphore(max_concurrent)
async def process_multiple_sites(self, tasks):
"""複数サイトの並列処理"""
async def process_with_semaphore(task):
async with self.semaphore:
return await self.process_single_task(task)
results = await asyncio.gather(
*[process_with_semaphore(task) for task in tasks],
return_exceptions=True
)
# エラーハンドリング
successful_results = []
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"Task {i} failed: {result}")
else:
successful_results.append(result)
return successful_results2. キャッシング戦略
import hashlib
import pickle
from pathlib import Path
from datetime import datetime, timedelta
class CacheManager:
def __init__(self, cache_dir=".cache"):
self.cache_dir = Path(cache_dir)
self.cache_dir.mkdir(exist_ok=True)
def get_cache_key(self, task, site_url):
"""キャッシュキーの生成"""
content = f"{task}_{site_url}_{datetime.now().date()}"
return hashlib.md5(content.encode()).hexdigest()
def get_cached_result(self, cache_key, max_age_hours=6):
"""キャッシュからの結果取得"""
cache_file = self.cache_dir / f"{cache_key}.pkl"
if not cache_file.exists():
return None
# ファイルの更新時刻をチェック
file_time = datetime.fromtimestamp(cache_file.stat().st_mtime)
if datetime.now() - file_time > timedelta(hours=max_age_hours):
cache_file.unlink() # 期限切れキャッシュを削除
return None
try:
with open(cache_file, 'rb') as f:
return pickle.load(f)
except Exception:
return None
def save_to_cache(self, cache_key, data):
"""結果のキャッシュ保存"""
cache_file = self.cache_dir / f"{cache_key}.pkl"
try:
with open(cache_file, 'wb') as f:
pickle.dump(data, f)
except Exception as e:
print(f"Failed to save cache: {e}")3. リソース管理
class ResourceManagedAgent:
def __init__(self, llm, max_memory_mb=1024):
self.llm = llm
self.max_memory_mb = max_memory_mb
async def process_with_resource_monitoring(self, tasks):
"""リソース監視付きの処理"""
import psutil
initial_memory = psutil.Process().memory_info().rss / 1024 / 1024
for i, task in enumerate(tasks):
# メモリ使用量チェック
current_memory = psutil.Process().memory_info().rss / 1024 / 1024
memory_used = current_memory - initial_memory
if memory_used > self.max_memory_mb:
print(f"Memory limit exceeded: {memory_used:.1f}MB")
# ガベージコレクション強制実行
import gc
gc.collect()
# それでもメモリが多い場合は処理を中断
current_memory = psutil.Process().memory_info().rss / 1024 / 1024
if current_memory - initial_memory > self.max_memory_mb:
print("Stopping due to memory constraints")
break
result = await self.process_task(task)
yield resultエラーハンドリングと復旧戦略
1. 段階的リトライ機能
import random
from typing import Callable, Any
class RetryHandler:
def __init__(self, max_retries=3, base_delay=1):
self.max_retries = max_retries
self.base_delay = base_delay
async def retry_with_backoff(self, func: Callable, *args, **kwargs) -> Any:
"""指数バックオフによるリトライ"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return await func(*args, **kwargs)
except Exception as e:
last_exception = e
if attempt < self.max_retries:
# 指数バックオフ + ジッター
delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"Attempt {attempt + 1} failed: {e}. Retrying in {delay:.2f}s")
await asyncio.sleep(delay)
else:
print(f"All {self.max_retries + 1} attempts failed")
raise last_exception2. 回路ブレーカーパターン
from enum import Enum
from datetime import datetime, timedelta
class CircuitState(Enum):
CLOSED = "closed"
OPEN = "open"
HALF_OPEN = "half_open"
class CircuitBreaker:
def __init__(self, failure_threshold=5, reset_timeout=60):
self.failure_threshold = failure_threshold
self.reset_timeout = reset_timeout
self.failure_count = 0
self.last_failure_time = None
self.state = CircuitState.CLOSED
async def call(self, func, *args, **kwargs):
"""回路ブレーカー経由での関数呼び出し"""
if self.state == CircuitState.OPEN:
if self._should_attempt_reset():
self.state = CircuitState.HALF_OPEN
else:
raise Exception("Circuit breaker is OPEN")
try:
result = await func(*args, **kwargs)
self._on_success()
return result
except Exception as e:
self._on_failure()
raise e
def _should_attempt_reset(self):
"""リセット試行判定"""
return (self.last_failure_time and
datetime.now() - self.last_failure_time > timedelta(seconds=self.reset_timeout))
def _on_success(self):
"""成功時の処理"""
self.failure_count = 0
self.state = CircuitState.CLOSED
def _on_failure(self):
"""失敗時の処理"""
self.failure_count += 1
self.last_failure_time = datetime.now()
if self.failure_count >= self.failure_threshold:
self.state = CircuitState.OPENセキュリティとコンプライアンス
1. レート制限とマナー
import asyncio
from datetime import datetime, timedelta
from collections import defaultdict
class RateLimiter:
def __init__(self):
self.requests = defaultdict(list)
self.limits = {
'default': {'requests': 10, 'window': 60}, # 10req/min
'aggressive': {'requests': 30, 'window': 300} # 30req/5min
}
async def acquire(self, domain, limit_type='default'):
"""レート制限の取得"""
now = datetime.now()
limit_config = self.limits[limit_type]
window = timedelta(seconds=limit_config['window'])
# 古いリクエスト履歴を削除
self.requests[domain] = [
req_time for req_time in self.requests[domain]
if now - req_time < window
]
# 制限チェック
if len(self.requests[domain]) >= limit_config['requests']:
oldest_request = min(self.requests[domain])
wait_time = (oldest_request + window - now).total_seconds()
if wait_time > 0:
await asyncio.sleep(wait_time)
# リクエストを記録
self.requests[domain].append(now)2. ユーザーエージェント管理
import random
class UserAgentManager:
def __init__(self):
self.user_agents = [
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
]
def get_random_user_agent(self):
"""ランダムなユーザーエージェントを取得"""
return random.choice(self.user_agents)
async def create_context_with_rotation(self, playwright):
"""ユーザーエージェントローテーション付きコンテキスト作成"""
browser = await playwright.chromium.launch()
context = await browser.new_context(
user_agent=self.get_random_user_agent(),
viewport={'width': 1920, 'height': 1080},
extra_http_headers={
'Accept-Language': 'ja-JP,ja;q=0.9,en;q=0.8'
}
)
return context運用監視とログ管理
1. 構造化ログシステム
import logging
import json
from datetime import datetime
class StructuredLogger:
def __init__(self, name):
self.logger = logging.getLogger(name)
self.logger.setLevel(logging.INFO)
# JSON形式のフォーマッター
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# ファイルハンドラー
file_handler = logging.FileHandler('automation.log')
file_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
def log_task_start(self, task_id, task_type, details):
"""タスク開始ログ"""
log_data = {
'event': 'task_start',
'task_id': task_id,
'task_type': task_type,
'details': details,
'timestamp': datetime.now().isoformat()
}
self.logger.info(json.dumps(log_data, ensure_ascii=False))
def log_task_end(self, task_id, success, results=None, error=None):
"""タスク完了ログ"""
log_data = {
'event': 'task_end',
'task_id': task_id,
'success': success,
'timestamp': datetime.now().isoformat()
}
if results:
log_data['results'] = results
if error:
log_data['error'] = str(error)
self.logger.info(json.dumps(log_data, ensure_ascii=False))2. メトリクス収集
import time
from dataclasses import dataclass
from typing import Dict, List
@dataclass
class TaskMetrics:
task_id: str
start_time: float
end_time: float
success: bool
error_type: str = None
data_count: int = 0
class MetricsCollector:
def __init__(self):
self.metrics: List[TaskMetrics] = []
def start_task(self, task_id: str) -> float:
"""タスク開始時刻の記録"""
return time.time()
def end_task(self, task_id: str, start_time: float, success: bool,
error_type: str = None, data_count: int = 0):
"""タスク完了メトリクスの記録"""
end_time = time.time()
metrics = TaskMetrics(
task_id=task_id,
start_time=start_time,
end_time=end_time,
success=success,
error_type=error_type,
data_count=data_count
)
self.metrics.append(metrics)
def get_performance_summary(self) -> Dict:
"""パフォーマンス要約の取得"""
if not self.metrics:
return {}
successful_tasks = [m for m in self.metrics if m.success]
failed_tasks = [m for m in self.metrics if not m.success]
execution_times = [m.end_time - m.start_time for m in successful_tasks]
return {
'total_tasks': len(self.metrics),
'successful_tasks': len(successful_tasks),
'failed_tasks': len(failed_tasks),
'success_rate': len(successful_tasks) / len(self.metrics) * 100,
'avg_execution_time': sum(execution_times) / len(execution_times) if execution_times else 0,
'total_data_collected': sum(m.data_count for m in successful_tasks)
}実際の収益化事例と学び
副業での活用実績
私がこれまでに Browser-use と Playwright を活用して実際に収益化した事例をご紹介します:
1. 不動産価格監視システム
- クライアント:不動産投資会社
- 内容:競合物件の価格動向自動監視
- 期間:6ヶ月継続契約
- 学び:データの正確性とリアルタイム性が最重要
2. 求人情報分析レポート
- クライアント:人材派遣会社
- 内容:IT業界の給与トレンド分析
- 頻度:四半期ごと
- 学び:分析結果の可視化とビジネスインサイトが価値
3. ECサイト在庫監視
- クライアント:小規模輸入業者
- 内容:海外サプライヤーの在庫状況監視
- 期間:1年継続中
- 学び:アラート機能の精度が契約継続のカギ
価格設定の考え方
40代エンジニアとして、適切な価格設定は重要です:
価格設定の基準:
- 開発工数:時給5,000円~10,000円ベース
- 継続的価値:月額料金の設定
- データ量:処理データ量に応じた従量課金
- 複雑性:技術的難易度による調整
まとめ:中年エンジニアの新しい武器
技術革新がもたらすチャンス
Browser-use と Playwright を組み合わせたAI駆動型Web自動化は、私たち中年エンジニアにとって大きなチャンスです。
従来の経験 × 最新技術 = 差別化
私たちの持つ豊富な実務経験と、最新のAI技術を組み合わせることで、若いエンジニアにはない価値を提供できます。
今後の展望
技術的進化の方向性:
- より高度な自然言語理解
- 複雑なワークフローの自動化
- リアルタイム適応機能の向上
- セキュリティ機能の強化
市場機会:
- 中小企業のデジタル化支援
- 業務プロセスの自動化コンサルティング
- データ収集・分析サービス
- カスタムツール開発
最後に:一緒に学び続けましょう
40代になって感じるのは、技術の変化への適応力の重要性です。Browser-use のようなツールは、従来の知識を活かしながら新しい価値を生み出せる、まさに私たちの世代にぴったりな技術だと思います。
失敗を恐れず、継続的に学習し、実践していくことで、必ず新しい収入源を築けると確信しています。
同じ道を歩む仲間として、ぜひ一緒にこの技術をマスターし、充実したエンジニアライフを送りましょう!
Web自動化の世界は、AI技術の進歩によって劇的に変化しています。Browser-use と Playwright の組み合わせは、私たち中年エンジニアにとって新しい可能性を開く強力なツールです。
重要なのは、完璧を求めすぎず、小さく始めて継続的に改善していくことです。実際に手を動かして試行錯誤を重ねることで、必ず実用的なスキルが身につきます。
この技術をマスターすることで、本業での差別化、副業での収入増加、そして何より新しい技術を学ぶ楽しさを感じられるはずです。
一緒に最新のWeb自動化技術を身につけ、エンジニアとしての新しいステージを目指しましょう!
今すぐ始められるアクションプラン
-
開発環境の構築(今週中)
- Python 3.8以上のインストール
- Browser-use とPlaywright のセットアップ
- OpenAI API キーの取得
-
小さなプロジェクトから開始(来週から)
- 単一サイトでの簡単なデータ収集
- 基本的なエラーハンドリングの実装
- ログ機能の追加
-
スキルアップとコミュニティ参加(継続的に)
- GitHub での事例研究
- 技術ブログでの情報収集
- エンジニアコミュニティでの知見共有
質問や相談があれば、ぜひコメント欄やSNSでお気軽にお声かけください。一緒に学習を進めていきましょう!