Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理

約110分で読めます by ぽんたぬき
Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理

はじめに

シリーズ最終回となる第3部では、実際のプロジェクトで使える実践的なスクレイピングシステムの構築について解説します。検出回避技術、データの永続化、監視システムの構築など、プロダクション環境で必要となる高度な技術を学びます。

また、法的・倫理的配慮についても詳しく説明し、責任あるスクレイピングの実現方法を示します。

🛡️ 検出回避技術とセキュリティ対策

プロキシローテーションとUser-Agent管理

大規模なスクレイピングでは、IPアドレスの制限やbot検出を回避するための技術が重要です。

class AntiDetectionScraper(ModernWebScraper):
    """
    検出回避機能付きスクレーパー
    
    プロキシローテーション、User-Agent管理、
    人間らしい行動パターンの模倣を実装
    """
    
    def __init__(self, config: ScrapingConfig = None):
        super().__init__(config)
        
        # 複数のUser-Agentを準備(実際のブラウザからの抜粋)
        self.user_agents = [
            # Chrome on Windows
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            # Chrome on macOS
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
            # Firefox on Windows
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:121.0) Gecko/20100101 Firefox/121.0",
            # Safari on macOS
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.1 Safari/605.1.15",
            # Edge on Windows
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36 Edg/120.0.0.0"
        ]
        
        # プロキシリスト(実際の使用時は有効なプロキシを設定)
        self.proxy_list = [
            # 例: {"host": "proxy1.example.com", "port": 8080, "username": "user1", "password": "pass1"}
        ]
        
        # リクエスト間隔の設定
        self.min_delay = 2.0  # 最小遅延(秒)
        self.max_delay = 5.0  # 最大遅延(秒)
        
        # セッション管理
        self.current_proxy_index = 0
        self.requests_with_current_proxy = 0
        self.max_requests_per_proxy = 50  # プロキシあたりの最大リクエスト数
    
    def get_random_user_agent(self) -> str:
        """
        ランダムなUser-Agentを取得
        
        Returns:
            ランダムに選択されたUser-Agent文字列
        """
        import random
        return random.choice(self.user_agents)
    
    def get_current_proxy(self) -> Optional[Dict[str, str]]:
        """
        現在使用するプロキシを取得
        
        ローテーション機能付き
        一定回数使用後に次のプロキシに切り替え
        
        Returns:
            プロキシ設定の辞書(None = プロキシなし)
        """
        if not self.proxy_list:
            return None
        
        # リクエスト数制限チェック
        if self.requests_with_current_proxy >= self.max_requests_per_proxy:
            # 次のプロキシに切り替え
            self.current_proxy_index = (self.current_proxy_index + 1) % len(self.proxy_list)
            self.requests_with_current_proxy = 0
            logger.info(f"Switching to proxy {self.current_proxy_index + 1}/{len(self.proxy_list)}")
        
        proxy = self.proxy_list[self.current_proxy_index]
        self.requests_with_current_proxy += 1
        
        return {
            'server': f"http://{proxy['host']}:{proxy['port']}",
            'username': proxy.get('username'),
            'password': proxy.get('password')
        }
    
    async def stealth_scrape(self, url: str, extra_stealth: bool = True) -> Dict:
        """
        ステルスモードでのスクレイピング
        
        人間らしい行動パターンを模倣して
        bot検出を回避
        
        Args:
            url: スクレイピング対象URL
            extra_stealth: 追加のステルス機能を有効にするか
            
        Returns:
            スクレイピング結果
        """
        # ランダム遅延(人間らしいアクセス間隔)
        import random
        delay = random.uniform(self.min_delay, self.max_delay)
        await asyncio.sleep(delay)
        
        # 動的にUser-Agentを設定
        user_agent = self.get_random_user_agent()
        proxy = self.get_current_proxy()
        
        # 設定を更新
        config = ScrapingConfig(
            headless=True,
            timeout=30000,
            user_agent=user_agent,
            proxy=proxy
        )
        
        async with ModernWebScraper(config) as scraper:
            page = await scraper.context.new_page()
            
            try:
                if extra_stealth:
                    # ブラウザ検出対策のJavaScriptを実行
                    await page.add_init_script("""
                        // webdriverプロパティを隠す
                        Object.defineProperty(navigator, 'webdriver', {
                            get: () => false,
                        });
                        
                        // Chrome runtime 偽装
                        window.chrome = {
                            runtime: {},
                            loadTimes: function() {},
                            csi: function() {},
                            app: {}
                        };
                        
                        // プラグイン情報の偽装
                        Object.defineProperty(navigator, 'plugins', {
                            get: () => [1, 2, 3, 4, 5],
                        });
                        
                        // 言語設定の偽装
                        Object.defineProperty(navigator, 'languages', {
                            get: () => ['ja-JP', 'ja', 'en-US', 'en'],
                        });
                        
                        // WebGL情報の偽装
                        const getParameter = WebGLRenderingContext.getParameter;
                        WebGLRenderingContext.prototype.getParameter = function(parameter) {
                            if (parameter === 37445) {
                                return 'Intel Open Source Technology Center';
                            }
                            if (parameter === 37446) {
                                return 'Mesa DRI Intel(R) Ivybridge Mobile ';
                            }
                            return getParameter(parameter);
                        };
                    """)
                
                # ページ読み込み
                logger.info(f"Stealth loading: {url}")
                await page.goto(url, wait_until="domcontentloaded")
                
                if extra_stealth:
                    # 人間らしいスクロール動作
                    await self.simulate_human_behavior(page)
                
                # データ取得
                title = await page.title()
                html_content = await page.content()
                
                # ブラウザ情報の取得(検証用)
                browser_info = await page.evaluate("""
                    () => {
                        return {
                            userAgent: navigator.userAgent,
                            platform: navigator.platform,
                            language: navigator.language,
                            cookieEnabled: navigator.cookieEnabled,
                            onLine: navigator.onLine,
                            webdriver: navigator.webdriver
                        }
                    }
                """)
                
                return {
                    'url': url,
                    'title': title,
                    'html_content': html_content,
                    'browser_info': browser_info,
                    'proxy_used': proxy is not None,
                    'user_agent_used': user_agent,
                    'stealth_mode': True,
                    'timestamp': datetime.now(),
                    'status': 'success'
                }
                
            except Exception as e:
                logger.error(f"Stealth scraping error for {url}: {e}")
                return {
                    'url': url,
                    'error': str(e),
                    'stealth_mode': True,
                    'timestamp': datetime.now(),
                    'status': 'error'
                }
            finally:
                await page.close()
    
    async def simulate_human_behavior(self, page):
        """
        人間らしい行動の模擬
        
        ランダムなスクロール、マウス移動、
        一時停止などを実行
        
        Args:
            page: Playwrightページオブジェクト
        """
        import random
        
        # ページの高さを取得
        page_height = await page.evaluate("document.body.scrollHeight")
        viewport_height = await page.evaluate("window.innerHeight")
        
        if page_height > viewport_height:
            # スクロール動作の模擬
            scroll_steps = random.randint(3, 7)
            scroll_distance = page_height // scroll_steps
            
            for i in range(scroll_steps):
                # ランダムな距離をスクロール
                scroll_to = min((i + 1) * scroll_distance + random.randint(-50, 50), page_height)
                
                await page.evaluate(f"window.scrollTo(0, {scroll_to})")
                
                # 人間らしい停止時間
                pause_time = random.uniform(0.5, 2.0)
                await asyncio.sleep(pause_time)
        
        # ランダムなマウス移動(オプション)
        try:
            x = random.randint(100, 800)
            y = random.randint(100, 600)
            await page.mouse.move(x, y)
        except Exception:
            pass  # マウス操作が失敗しても続行
        
        # 最終的な待機
        await asyncio.sleep(random.uniform(1.0, 3.0))

# 使用例
async def demo_anti_detection():
    """
    検出回避スクレイピングのデモ
    
    User-Agent検証サイトを使って
    ステルス機能をテスト
    """
    
    # 検出回避スクレイパーを初期化
    scraper = AntiDetectionScraper()
    
    # テスト用URL(User-Agentやプロキシ情報を確認できるサービス)
    test_urls = [
        "https://httpbin.org/user-agent",     # User-Agent確認
        "https://httpbin.org/headers",        # ヘッダー情報確認
        "https://httpbin.org/ip"              # IPアドレス確認
    ]
    
    results = []
    
    for url in test_urls:
        print(f"\nステルススクレイピング: {url}")
        
        result = await scraper.stealth_scrape(url, extra_stealth=True)
        
        if result['status'] == 'success':
            print(f"✅ 成功: {result['title'][:50]}...")
            print(f"   User-Agent: {result['user_agent_used'][:60]}...")
            print(f"   プロキシ使用: {result['proxy_used']}")
            print(f"   ブラウザ検出: webdriver={result['browser_info'].get('webdriver', 'undefined')}")
        else:
            print(f"❌ エラー: {result['error']}")
        
        results.append(result)
    
    return results

# 実行例
# stealth_results = await demo_anti_detection()

📊 データ管理と永続化

高度なデータパイプライン

スクレイピングしたデータを効率的に管理・保存するためのシステムを構築します。

class DataPipeline:
    """
    スクレイピングデータパイプライン
    
    データの収集、変換、保存、エクスポートを
    一元的に管理するシステム
    """
    
    def __init__(self, output_dir: str = "scraped_data", db_type: str = "sqlite"):
        """
        初期化メソッド
        
        Args:
            output_dir: 出力ディレクトリ
            db_type: データベースタイプ(sqlite, postgresql等)
        """
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(exist_ok=True)
        
        self.db_type = db_type
        
        # SQLiteデータベースの設定
        if db_type == "sqlite":
            self.db_path = self.output_dir / "scraping_data.db"
            self.init_sqlite_database()
        
        # データ変換設定
        self.data_transformers = {}
        
        # メタデータ管理
        self.session_metadata = {
            'created_at': datetime.now(),
            'total_pages_processed': 0,
            'total_data_points': 0,
            'data_sources': set()
        }
    
    def init_sqlite_database(self):
        """
        SQLiteデータベースの初期化
        
        必要なテーブルを作成し、
        インデックスを設定
        """
        import sqlite3
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # メインのページデータテーブル
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS scraped_pages (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                url TEXT NOT NULL,
                title TEXT,
                content_hash TEXT UNIQUE,
                scraped_at TIMESTAMP,
                status TEXT,
                source_domain TEXT,
                metadata TEXT,
                raw_html TEXT
            )
        """)
        
        # 抽出されたデータテーブル
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS extracted_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                page_id INTEGER,
                data_type TEXT,
                field_name TEXT,
                field_value TEXT,
                confidence REAL,
                extraction_method TEXT,
                extracted_at TIMESTAMP,
                FOREIGN KEY (page_id) REFERENCES scraped_pages (id)
            )
        """)
        
        # スクレイピングセッションテーブル
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS scraping_sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                session_name TEXT,
                start_time TIMESTAMP,
                end_time TIMESTAMP,
                total_urls INTEGER,
                successful_urls INTEGER,
                failed_urls INTEGER,
                configuration TEXT
            )
        """)
        
        # パフォーマンス最適化のためのインデックス
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_url ON scraped_pages(url)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_content_hash ON scraped_pages(content_hash)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_scraped_at ON scraped_pages(scraped_at)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_source_domain ON scraped_pages(source_domain)")
        cursor.execute("CREATE INDEX IF NOT EXISTS idx_data_type ON extracted_data(data_type)")
        
        conn.commit()
        conn.close()
        
        logger.info("Database initialized successfully")
    
    def save_scraped_data(self, scraped_results: List[Dict], session_name: str = None) -> int:
        """
        スクレイピング結果をデータベースに保存
        
        Args:
            scraped_results: スクレイピング結果のリスト
            session_name: セッション名(統計管理用)
            
        Returns:
            保存された件数
        """
        import sqlite3
        import hashlib
        from urllib.parse import urlparse
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        saved_count = 0
        
        for result in scraped_results:
            if result.get('status') != 'success':
                continue
            
            try:
                # コンテンツハッシュ生成(重複検出用)
                html_content = result.get('html_content', '')
                content_hash = hashlib.md5(html_content.encode('utf-8')).hexdigest()
                
                # ドメイン抽出
                url = result.get('url', '')
                domain = urlparse(url).netloc
                
                # 重複チェック
                cursor.execute(
                    "SELECT id FROM scraped_pages WHERE content_hash = ?",
                    (content_hash,)
                )
                existing = cursor.fetchone()
                
                if not existing:
                    # メタデータの準備
                    metadata = {
                        'js_data': result.get('js_data', {}),
                        'browser_info': result.get('browser_info', {}),
                        'extraction_timestamp': result.get('timestamp', datetime.now()).isoformat(),
                        'session_name': session_name
                    }
                    
                    # ページデータの挿入
                    cursor.execute("""
                        INSERT INTO scraped_pages 
                        (url, title, content_hash, scraped_at, status, source_domain, metadata, raw_html)
                        VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                    """, (
                        url,
                        result.get('title'),
                        content_hash,
                        result.get('timestamp', datetime.now()),
                        result.get('status'),
                        domain,
                        json.dumps(metadata, ensure_ascii=False),
                        html_content
                    ))
                    
                    saved_count += 1
                    
                    # メタデータ更新
                    self.session_metadata['total_pages_processed'] += 1
                    self.session_metadata['data_sources'].add(domain)
                    
                    logger.debug(f"Saved new data for: {url}")
                else:
                    logger.debug(f"Duplicate content skipped: {url}")
                    
            except Exception as e:
                logger.error(f"Error saving data for {result.get('url', 'unknown')}: {e}")
        
        conn.commit()
        conn.close()
        
        logger.info(f"Saved {saved_count} new pages to database")
        return saved_count
    
    def save_extracted_data(self, page_url: str, extracted_data: Dict, extraction_method: str = "unknown"):
        """
        抽出されたデータを保存
        
        Args:
            page_url: ページURL
            extracted_data: 抽出されたデータ
            extraction_method: 抽出方法
        """
        import sqlite3
        
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        try:
            # ページIDを取得
            cursor.execute("SELECT id FROM scraped_pages WHERE url = ? ORDER BY scraped_at DESC LIMIT 1", (page_url,))
            page_result = cursor.fetchone()
            
            if not page_result:
                logger.warning(f"Page not found in database: {page_url}")
                return
            
            page_id = page_result[0]
            
            # 抽出データの保存
            data_count = 0
            for field_name, field_value in extracted_data.items():
                if field_value is not None:
                    # 信頼度情報の処理
                    confidence = 1.0
                    value = field_value
                    
                    if isinstance(field_value, dict) and 'value' in field_value:
                        # 信頼度付きデータの場合
                        confidence = field_value.get('confidence', 1.0)
                        value = field_value['value']
                        extraction_method = field_value.get('method', extraction_method)
                    
                    # リストの場合はJSONで保存
                    if isinstance(value, (list, dict)):
                        value = json.dumps(value, ensure_ascii=False)
                    
                    cursor.execute("""
                        INSERT INTO extracted_data 
                        (page_id, data_type, field_name, field_value, confidence, extraction_method, extracted_at)
                        VALUES (?, ?, ?, ?, ?, ?, ?)
                    """, (
                        page_id,
                        type(field_value).__name__,
                        field_name,
                        str(value),
                        confidence,
                        extraction_method,
                        datetime.now()
                    ))
                    
                    data_count += 1
            
            self.session_metadata['total_data_points'] += data_count
            conn.commit()
            
            logger.debug(f"Saved {data_count} extracted data points for {page_url}")
            
        except Exception as e:
            logger.error(f"Error saving extracted data: {e}")
        finally:
            conn.close()
    
    def export_to_formats(self, query: str = None, export_formats: List[str] = None) -> Dict[str, str]:
        """
        複数形式でのデータエクスポート
        
        Args:
            query: カスタムSQLクエリ
            export_formats: エクスポート形式のリスト
            
        Returns:
            エクスポートされたファイルのパス
        """
        import sqlite3
        
        if export_formats is None:
            export_formats = ['csv', 'json', 'excel']
        
        conn = sqlite3.connect(self.db_path)
        
        # データ取得
        if query:
            df = pd.read_sql_query(query, conn)
        else:
            # デフォルトクエリ(結合データ)
            default_query = """
                SELECT 
                    p.url,
                    p.title,
                    p.source_domain,
                    p.scraped_at,
                    e.field_name,
                    e.field_value,
                    e.confidence,
                    e.extraction_method
                FROM scraped_pages p
                LEFT JOIN extracted_data e ON p.id = e.page_id
                ORDER BY p.scraped_at DESC
            """
            df = pd.read_sql_query(default_query, conn)
        
        conn.close()
        
        if df.empty:
            return {"error": "No data to export"}
        
        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        export_files = {}
        
        # CSV形式
        if 'csv' in export_formats:
            csv_file = self.output_dir / f"export_{timestamp}.csv"
            df.to_csv(csv_file, index=False, encoding='utf-8-sig')
            export_files['csv'] = str(csv_file)
        
        # JSON形式
        if 'json' in export_formats:
            json_file = self.output_dir / f"export_{timestamp}.json"
            df.to_json(json_file, orient='records', ensure_ascii=False, indent=2, date_format='iso')
            export_files['json'] = str(json_file)
        
        # Excel形式(オプション)
        if 'excel' in export_formats:
            try:
                excel_file = self.output_dir / f"export_{timestamp}.xlsx"
                
                with pd.ExcelWriter(excel_file, engine='openpyxl') as writer:
                    # メインデータ
                    df.to_excel(writer, sheet_name='ScrapedData', index=False)
                    
                    # 統計情報
                    stats_df = self.get_statistics_dataframe()
                    stats_df.to_excel(writer, sheet_name='Statistics', index=False)
                
                export_files['excel'] = str(excel_file)
            except ImportError:
                logger.warning("openpyxl not installed, skipping Excel export")
        
        logger.info(f"Data exported to {len(export_files)} formats")
        return export_files
    
    def get_statistics_dataframe(self) -> pd.DataFrame:
        """
        統計情報をDataFrame形式で取得
        
        Returns:
            統計情報のDataFrame
        """
        import sqlite3
        
        conn = sqlite3.connect(self.db_path)
        
        stats_data = []
        
        # 基本統計
        cursor = conn.cursor()
        
        # 総ページ数
        cursor.execute("SELECT COUNT(*) FROM scraped_pages")
        total_pages = cursor.fetchone()[0]
        stats_data.append({"Metric": "Total Pages", "Value": total_pages})
        
        # 成功ページ数
        cursor.execute("SELECT COUNT(*) FROM scraped_pages WHERE status = 'success'")
        successful_pages = cursor.fetchone()[0]
        stats_data.append({"Metric": "Successful Pages", "Value": successful_pages})
        
        # ユニークドメイン数
        cursor.execute("SELECT COUNT(DISTINCT source_domain) FROM scraped_pages")
        unique_domains = cursor.fetchone()[0]
        stats_data.append({"Metric": "Unique Domains", "Value": unique_domains})
        
        # 抽出データポイント数
        cursor.execute("SELECT COUNT(*) FROM extracted_data")
        data_points = cursor.fetchone()[0]
        stats_data.append({"Metric": "Extracted Data Points", "Value": data_points})
        
        # 成功率
        if total_pages > 0:
            success_rate = (successful_pages / total_pages) * 100
            stats_data.append({"Metric": "Success Rate (%)", "Value": f"{success_rate:.1f}"})
        
        # 日別統計
        cursor.execute("""
            SELECT DATE(scraped_at) as date, COUNT(*) as count 
            FROM scraped_pages 
            GROUP BY DATE(scraped_at) 
            ORDER BY date DESC 
            LIMIT 7
        """)
        
        daily_stats = cursor.fetchall()
        for date, count in daily_stats:
            stats_data.append({"Metric": f"Pages on {date}", "Value": count})
        
        conn.close()
        
        return pd.DataFrame(stats_data)

# 使用例
async def demo_data_pipeline():
    """
    データパイプラインのデモ
    
    スクレイピング→保存→エクスポートの
    完全なワークフローを実行
    """
    
    # データパイプライン初期化
    pipeline = DataPipeline("demo_output")
    
    # サンプルスクレイピングの実行
    print("🔄 サンプルデータ収集中...")
    
    # 複数のサイトからデータを収集
    sample_urls = [
        "https://httpbin.org/json",
        "https://httpbin.org/html",
        "https://httpbin.org/xml"
    ]
    
    scraper = AsyncBatchScraper(max_concurrent=2)
    scraped_results = await scraper.scrape_urls_concurrently(sample_urls)
    
    # データベースに保存
    print("💾 データベースに保存中...")
    saved_count = pipeline.save_scraped_data(scraped_results, session_name="demo_session")
    
    # 抽出データの追加(デモ用)
    for result in scraped_results:
        if result.get('status') == 'success':
            # サンプル抽出データ
            extracted_data = {
                'content_type': 'web_page',
                'content_length': len(result.get('html_content', '')),
                'has_title': bool(result.get('title')),
                'extraction_timestamp': datetime.now().isoformat()
            }
            
            pipeline.save_extracted_data(
                result['url'], 
                extracted_data, 
                extraction_method="demo_extraction"
            )
    
    # エクスポート実行
    print("📤 データエクスポート中...")
    export_files = pipeline.export_to_formats(
        export_formats=['csv', 'json']
    )
    
    # 結果表示
    print(f"\n✅ 処理完了!")
    print(f"   保存ページ数: {saved_count}")
    
    print("\n📁 エクスポートファイル:")
    for format_type, file_path in export_files.items():
        print(f"   {format_type.upper()}: {file_path}")
    
    # 統計情報表示
    stats_df = pipeline.get_statistics_dataframe()
    print(f"\n📊 統計情報:")
    for _, row in stats_df.head(10).iterrows():
        print(f"   {row['Metric']}: {row['Value']}")
    
    return pipeline, export_files

# 実行例
# pipeline_result = await demo_data_pipeline()

⚖️ 法的・倫理的考慮事項とコンプライアンス

robots.txt対応とコンプライアンスチェッカー

class ScrapingComplianceChecker:
    """
    スクレイピングコンプライアンスチェッカー
    
    robots.txt解析、利用規約チェック、
    アクセス頻度制限などの法的・倫理的
    配慮を自動化
    """
    
    def __init__(self):
        self.robots_cache = {}      # robots.txtのキャッシュ
        self.compliance_cache = {}  # コンプライアンス結果のキャッシュ
        self.request_history = {}   # ドメイン別リクエスト履歴
    
    async def check_robots_txt(self, url: str, user_agent: str = "*") -> Dict:
        """
        robots.txtのチェック
        
        指定されたURLがrobots.txtで
        アクセス許可されているかを確認
        
        Args:
            url: チェック対象のURL
            user_agent: User-Agent名(* = 全て)
            
        Returns:
            robots.txt解析結果
        """
        from urllib.parse import urljoin, urlparse
        import aiohttp
        
        domain = f"{urlparse(url).scheme}://{urlparse(url).netloc}"
        robots_url = urljoin(domain, '/robots.txt')
        
        # キャッシュチェック
        cache_key = f"{robots_url}:{user_agent}"
        if cache_key in self.robots_cache:
            return self.robots_cache[cache_key]
        
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(robots_url, timeout=10) as response:
                    if response.status == 200:
                        robots_content = await response.text()
                        result = self._parse_robots_txt(robots_content, url, user_agent)
                    else:
                        # robots.txtが存在しない場合
                        result = {
                            'robots_exists': False,
                            'is_allowed': True,  # デフォルトで許可
                            'crawl_delay': None,
                            'robots_url': robots_url,
                            'status_code': response.status
                        }
        
        except Exception as e:
            # ネットワークエラー等の場合
            result = {
                'robots_exists': False,
                'is_allowed': True,  # エラー時はデフォルト許可
                'error': str(e),
                'robots_url': robots_url
            }
        
        # キャッシュに保存(1時間有効)
        result['cached_at'] = datetime.now()
        self.robots_cache[cache_key] = result
        
        return result
    
    def _parse_robots_txt(self, robots_content: str, target_url: str, user_agent: str) -> Dict:
        """
        robots.txtの内容を解析
        
        Args:
            robots_content: robots.txtの内容
            target_url: 対象URL
            user_agent: User-Agent
            
        Returns:
            解析結果
        """
        from urllib.parse import urlparse
        
        lines = robots_content.split('\n')
        rules = {
            'allowed': [],
            'disallowed': [],
            'crawl_delay': None,
            'sitemap': []
        }
        
        current_agents = []
        target_path = urlparse(target_url).path
        
        for line in lines:
            line = line.strip()
            
            # コメントを除去
            if '#' in line:
                line = line[:line.index('#')].strip()
            
            if not line:
                continue
            
            # User-agent指定
            if line.lower().startswith('user-agent:'):
                agent = line.split(':', 1)[1].strip()
                current_agents = [agent]
            
            # 対象User-Agentかチェック
            elif any(agent == '*' or agent.lower() == user_agent.lower() 
                    for agent in current_agents):
                
                if line.lower().startswith('disallow:'):
                    path = line.split(':', 1)[1].strip()
                    if path:
                        rules['disallowed'].append(path)
                
                elif line.lower().startswith('allow:'):
                    path = line.split(':', 1)[1].strip()
                    if path:
                        rules['allowed'].append(path)
                
                elif line.lower().startswith('crawl-delay:'):
                    try:
                        delay = float(line.split(':', 1)[1].strip())
                        rules['crawl_delay'] = delay
                    except ValueError:
                        pass
            
            # サイトマップ情報
            elif line.lower().startswith('sitemap:'):
                sitemap = line.split(':', 1)[1].strip()
                rules['sitemap'].append(sitemap)
        
        # URLパスの許可チェック
        is_allowed = self._check_path_allowed(target_path, rules)
        
        return {
            'robots_exists': True,
            'is_allowed': is_allowed,
            'crawl_delay': rules['crawl_delay'],
            'rules': rules,
            'target_path': target_path
        }
    
    def _check_path_allowed(self, path: str, rules: Dict) -> bool:
        """
        パスがrobots.txtで許可されているかチェック
        
        Args:
            path: チェック対象のパス
            rules: robots.txtルール
            
        Returns:
            許可されている場合True
        """
        # Disallowルールをチェック
        for disallowed in rules['disallowed']:
            if disallowed == '/':
                # 全てのパスが禁止
                return False
            elif path.startswith(disallowed):
                # このパスは禁止されている
                # ただし、Allowルールでオーバーライドされる可能性あり
                
                # より具体的なAllowルールがあるかチェック
                for allowed in rules['allowed']:
                    if path.startswith(allowed) and len(allowed) > len(disallowed):
                        return True
                
                return False
        
        # 明示的に許可されているかチェック
        for allowed in rules['allowed']:
            if path.startswith(allowed):
                return True
        
        # デフォルトは許可
        return True
    
    def generate_compliance_report(self, url: str, robots_result: Dict, 
                                 custom_checks: List[str] = None) -> Dict:
        """
        包括的なコンプライアンスレポート生成
        
        Args:
            url: 対象URL
            robots_result: robots.txtチェック結果
            custom_checks: 追加チェック項目
            
        Returns:
            コンプライアンスレポート
        """
        from urllib.parse import urlparse
        
        recommendations = []
        risk_level = "low"
        compliance_score = 100  # 100点満点
        
        # robots.txtチェック結果の評価
        if not robots_result.get('is_allowed', True):
            recommendations.append({
                'type': 'critical',
                'message': "⚠️ robots.txtでアクセスが禁止されています。スクレイピングを避けてください。",
                'action': "別のデータソースを検討するか、サイト運営者に許可を求めてください。"
            })
            risk_level = "high"
            compliance_score -= 50
        
        # クロール遅延の確認
        if robots_result.get('crawl_delay'):
            delay = robots_result['crawl_delay']
            if delay > 10:
                recommendations.append({
                    'type': 'warning',
                    'message': f"⏱️ robots.txtで{delay}秒の長いクロール間隔が指定されています。",
                    'action': f"最低{delay}秒の間隔を空けてアクセスしてください。"
                })
                risk_level = "medium" if risk_level == "low" else risk_level
                compliance_score -= 20
            else:
                recommendations.append({
                    'type': 'info',
                    'message': f"⏱️ robots.txtで{delay}秒のクロール間隔が指定されています。",
                    'action': f"{delay}秒の間隔を遵守してアクセスしてください。"
                })
                compliance_score -= 5
        
        # ドメイン別の一般的な推奨事項
        domain = urlparse(url).netloc
        
        # 大手サイトの特別な配慮
        high_traffic_domains = [
            'amazon.com', 'google.com', 'facebook.com', 'twitter.com', 
            'youtube.com', 'wikipedia.org', 'instagram.com'
        ]
        
        if any(htd in domain for htd in high_traffic_domains):
            recommendations.append({
                'type': 'warning',
                'message': f"🏢 {domain}は高トラフィックサイトです。",
                'action': "より長い間隔(5秒以上)でのアクセスを推奨します。"
            })
            compliance_score -= 10
        
        # 基本的な推奨事項
        basic_recommendations = [
            {
                'type': 'info',
                'message': "📋 対象サイトの利用規約を必ず確認してください。",
                'action': "利用規約ページを読み、スクレイピングの制限事項を確認してください。"
            },
            {
                'type': 'info',
                'message': "🤖 適切で識別可能なUser-Agentを設定してください。",
                'action': "連絡先情報を含むUser-Agentの使用を検討してください。"
            },
            {
                'type': 'info',
                'message': "⏳ サーバー負荷を考慮して適切な間隔を空けてアクセスしてください。",
                'action': "最低1秒、可能であれば2-3秒の間隔を推奨します。"
            },
            {
                'type': 'info',
                'message': "💾 個人情報や機密情報の取得は避けてください。",
                'action': "公開されている情報のみを対象とし、取得目的を明確にしてください。"
            }
        ]
        
        recommendations.extend(basic_recommendations)
        
        # カスタムチェック項目の追加
        if custom_checks:
            for check in custom_checks:
                recommendations.append({
                    'type': 'custom',
                    'message': f"🔍 カスタムチェック: {check}",
                    'action': "プロジェクト固有の要件を確認してください。"
                })
        
        return {
            'url': url,
            'domain': domain,
            'risk_level': risk_level,
            'compliance_score': max(compliance_score, 0),  # 最低0点
            'robots_compliant': robots_result.get('is_allowed', True),
            'suggested_delay': max(robots_result.get('crawl_delay', 1), 1),
            'recommendations': recommendations,
            'robots_details': robots_result,
            'timestamp': datetime.now(),
            'checked_by': 'ScrapingComplianceChecker v1.0'
        }
    
    async def batch_compliance_check(self, urls: List[str]) -> Dict[str, Dict]:
        """
        複数URLの一括コンプライアンスチェック
        
        Args:
            urls: チェック対象URLのリスト
            
        Returns:
            URL別のコンプライアンス結果
        """
        results = {}
        
        for url in urls:
            print(f"コンプライアンスチェック中: {url}")
            
            # robots.txtチェック
            robots_result = await self.check_robots_txt(url)
            
            # コンプライアンスレポート生成
            compliance_report = self.generate_compliance_report(url, robots_result)
            
            results[url] = compliance_report
            
            # チェック間隔
            await asyncio.sleep(0.5)
        
        return results

# 使用例
async def demo_compliance_check():
    """
    コンプライアンスチェックのデモ
    """
    
    checker = ScrapingComplianceChecker()
    
    # テスト対象URL
    test_urls = [
        "https://httpbin.org/html",
        "https://example.com",
        "https://github.com/robots.txt"  # robots.txtが存在するサイト
    ]
    
    # 一括コンプライアンスチェック
    print("🔍 コンプライアンスチェック開始\n")
    
    compliance_results = await checker.batch_compliance_check(test_urls)
    
    # 結果の表示
    for url, report in compliance_results.items():
        print(f"{'='*80}")
        print(f"📊 {url}")
        print(f"{'='*80}")
        print(f"リスクレベル: {report['risk_level'].upper()}")
        print(f"コンプライアンススコア: {report['compliance_score']}/100")
        print(f"robots.txt準拠: {'✅' if report['robots_compliant'] else '❌'}")
        print(f"推奨アクセス間隔: {report['suggested_delay']}秒")
        
        print(f"\n📝 推奨事項 ({len(report['recommendations'])}件):")
        for i, rec in enumerate(report['recommendations'][:5], 1):  # 最初の5件のみ表示
            icon = "🚨" if rec['type'] == 'critical' else "⚠️" if rec['type'] == 'warning' else "ℹ️"
            print(f"  {i}. {icon} {rec['message']}")
            print(f"     → {rec['action']}")
        
        print()  # 空行
    
    return compliance_results

# 実行例
# compliance_results = await demo_compliance_check()

🎯 実践プロジェクト:ニュース監視システム

最後に、これまで学んだ技術を統合した実用的なプロジェクトを構築します。

class NewsMonitoringSystem:
    """
    総合ニュース監視システム
    
    複数のニュースサイトから記事を自動収集し、
    分析・分類・アラート機能を提供
    """
    
    def __init__(self, config_file: str = "news_monitor_config.json"):
        self.config = self.load_config(config_file)
        self.pipeline = DataPipeline("news_monitoring")
        self.compliance_checker = ScrapingComplianceChecker()
        self.extractor = AdvancedDataExtractor()
        self.anti_detection_scraper = AntiDetectionScraper()
        
        # 監視状態
        self.monitoring_active = False
        self.last_check_time = {}
        
        # アラート機能
        self.alert_handlers = []
    
    def load_config(self, config_file: str) -> Dict:
        """設定ファイル読み込み"""
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            # デフォルト設定
            default_config = {
                "news_sources": [
                    {
                        "name": "Tech News Site",
                        "base_url": "https://example-tech-news.com",
                        "rss_url": "https://example-tech-news.com/rss",
                        "selectors": {
                            "article_links": ".article-link",
                            "title": ".article-title",
                            "content": ".article-content",
                            "author": ".author",
                            "date": ".publish-date"
                        },
                        "enabled": True,
                        "check_interval_minutes": 30
                    }
                ],
                "monitoring": {
                    "max_articles_per_check": 20,
                    "content_min_length": 200,
                    "keywords": ["AI", "Python", "technology", "startup"],
                    "categories": ["technology", "business", "science"]
                },
                "alerts": {
                    "keyword_alerts": True,
                    "new_source_alerts": True,
                    "error_alerts": True,
                    "daily_summary": True
                },
                "compliance": {
                    "respect_robots_txt": True,
                    "min_delay_seconds": 2,
                    "max_requests_per_hour": 100
                }
            }
            
            with open(config_file, 'w', encoding='utf-8') as f:
                json.dump(default_config, f, indent=2, ensure_ascii=False)
            
            return default_config
    
    async def start_monitoring(self):
        """監視開始"""
        self.monitoring_active = True
        logger.info("ニュース監視システム開始")
        
        while self.monitoring_active:
            try:
                # 各ニュースソースをチェック
                for source in self.config['news_sources']:
                    if not source.get('enabled', True):
                        continue
                    
                    # チェック間隔の確認
                    if self._should_check_source(source):
                        await self._monitor_news_source(source)
                
                # 次のチェックまで待機
                await asyncio.sleep(60)  # 1分間隔でチェック
                
            except KeyboardInterrupt:
                logger.info("監視停止要求を受信")
                break
            except Exception as e:
                logger.error(f"監視ループでエラー: {e}")
                await asyncio.sleep(300)  # エラー時は5分待機
        
        self.monitoring_active = False
        logger.info("ニュース監視システム停止")
    
    def _should_check_source(self, source: Dict) -> bool:
        """ソースをチェックすべきかどうか判定"""
        source_name = source['name']
        check_interval = source.get('check_interval_minutes', 30) * 60  # 秒に変換
        
        if source_name not in self.last_check_time:
            return True
        
        elapsed = time.time() - self.last_check_time[source_name]
        return elapsed >= check_interval
    
    async def _monitor_news_source(self, source: Dict):
        """単一ニュースソースの監視"""
        source_name = source['name']
        logger.info(f"チェック開始: {source_name}")
        
        try:
            # コンプライアンスチェック
            if self.config['compliance']['respect_robots_txt']:
                robots_result = await self.compliance_checker.check_robots_txt(source['base_url'])
                
                if not robots_result.get('is_allowed', True):
                    logger.warning(f"{source_name}: robots.txtで禁止されています")
                    return
                
                # 推奨遅延の適用
                suggested_delay = robots_result.get('crawl_delay', 
                                                  self.config['compliance']['min_delay_seconds'])
                await asyncio.sleep(suggested_delay)
            
            # 記事収集
            articles = await self._collect_articles(source)
            
            if articles:
                # データベースに保存
                await self._save_articles(articles, source_name)
                
                # キーワードアラートのチェック
                await self._check_keyword_alerts(articles)
                
                logger.info(f"{source_name}: {len(articles)} 件の記事を処理")
            
            # 最終チェック時刻の更新
            self.last_check_time[source_name] = time.time()
            
        except Exception as e:
            logger.error(f"{source_name} の監視でエラー: {e}")
            
            # エラーアラート
            if self.config['alerts']['error_alerts']:
                await self._send_error_alert(source_name, str(e))
    
    async def _collect_articles(self, source: Dict) -> List[Dict]:
        """記事の収集"""
        articles = []
        
        # RSS がある場合は RSS を優先
        if source.get('rss_url'):
            articles.extend(await self._collect_from_rss(source))
        
        # HTMLスクレイピング
        if source.get('base_url') and source.get('selectors'):
            articles.extend(await self._collect_from_html(source))
        
        # 重複除去
        seen_urls = set()
        unique_articles = []
        
        for article in articles:
            if article['url'] not in seen_urls:
                seen_urls.add(article['url'])
                unique_articles.append(article)
        
        return unique_articles[:self.config['monitoring']['max_articles_per_check']]
    
    async def _collect_from_rss(self, source: Dict) -> List[Dict]:
        """RSS フィードからの記事収集"""
        try:
            import feedparser
            import aiohttp
            
            async with aiohttp.ClientSession() as session:
                async with session.get(source['rss_url']) as response:
                    rss_content = await response.text()
            
            feed = feedparser.parse(rss_content)
            articles = []
            
            for entry in feed.entries:
                article = {
                    'title': getattr(entry, 'title', ''),
                    'url': getattr(entry, 'link', ''),
                    'description': getattr(entry, 'summary', ''),
                    'author': getattr(entry, 'author', ''),
                    'published': getattr(entry, 'published', ''),
                    'source': source['name'],
                    'collection_method': 'rss',
                    'collected_at': datetime.now()
                }
                articles.append(article)
            
            return articles
            
        except ImportError:
            logger.warning("feedparser not installed, skipping RSS collection")
            return []
        except Exception as e:
            logger.error(f"RSS収集エラー: {e}")
            return []
    
    async def _collect_from_html(self, source: Dict) -> List[Dict]:
        """HTMLスクレイピングによる記事収集"""
        try:
            # ステルスモードでページを取得
            result = await self.anti_detection_scraper.stealth_scrape(
                source['base_url'], 
                extra_stealth=True
            )
            
            if result['status'] != 'success':
                return []
            
            # 記事リンクの抽出
            soup = BeautifulSoup(result['html_content'], 'html.parser')
            
            article_links = []
            link_selector = source['selectors'].get('article_links', 'a')
            
            for link_element in soup.select(link_selector):
                href = link_element.get('href')
                if href:
                    # 相対URLを絶対URLに変換
                    if href.startswith('/'):
                        href = source['base_url'].rstrip('/') + href
                    elif not href.startswith('http'):
                        href = source['base_url'].rstrip('/') + '/' + href
                    
                    article_links.append(href)
            
            # 各記事の詳細を取得
            articles = []
            for link in article_links[:10]:  # 最新10記事に制限
                article_detail = await self._scrape_article_detail(link, source)
                if article_detail:
                    articles.append(article_detail)
                
                # 記事間の適切な間隔
                await asyncio.sleep(2)
            
            return articles
            
        except Exception as e:
            logger.error(f"HTMLスクレイピングエラー: {e}")
            return []
    
    async def _scrape_article_detail(self, url: str, source: Dict) -> Optional[Dict]:
        """個別記事の詳細スクレイピング"""
        try:
            result = await self.anti_detection_scraper.stealth_scrape(url)
            
            if result['status'] != 'success':
                return None
            
            # 記事詳細の抽出
            soup = BeautifulSoup(result['html_content'], 'html.parser')
            
            title_elem = soup.select_one(source['selectors'].get('title', 'h1'))
            content_elem = soup.select_one(source['selectors'].get('content', 'article'))
            author_elem = soup.select_one(source['selectors'].get('author', '.author'))
            date_elem = soup.select_one(source['selectors'].get('date', '.date'))
            
            # テキスト抽出
            title = title_elem.get_text(strip=True) if title_elem else ''
            content = content_elem.get_text(strip=True) if content_elem else ''
            author = author_elem.get_text(strip=True) if author_elem else ''
            date_str = date_elem.get_text(strip=True) if date_elem else ''
            
            # 最小文字数チェック
            min_length = self.config['monitoring']['content_min_length']
            if len(content) < min_length:
                return None
            
            return {
                'title': title,
                'url': url,
                'content': content,
                'author': author,
                'published': date_str,
                'source': source['name'],
                'collection_method': 'html_scraping',
                'collected_at': datetime.now(),
                'content_length': len(content)
            }
            
        except Exception as e:
            logger.error(f"記事詳細取得エラー ({url}): {e}")
            return None
    
    async def _save_articles(self, articles: List[Dict], source_name: str):
        """記事をデータベースに保存"""
        # スクレイピング結果形式に変換
        scraped_results = []
        
        for article in articles:
            scraped_results.append({
                'url': article['url'],
                'title': article['title'],
                'html_content': article.get('content', ''),
                'timestamp': article['collected_at'],
                'status': 'success'
            })
        
        # データベースに保存
        saved_count = self.pipeline.save_scraped_data(
            scraped_results, 
            session_name=f"news_monitoring_{source_name}"
        )
        
        # 抽出データも保存
        for article in articles:
            extracted_data = {
                'article_title': article['title'],
                'author': article.get('author', ''),
                'published_date': article.get('published', ''),
                'content_length': article.get('content_length', 0),
                'source_name': source_name,
                'collection_method': article.get('collection_method', '')
            }
            
            self.pipeline.save_extracted_data(
                article['url'],
                extracted_data,
                'news_extraction'
            )
    
    async def _check_keyword_alerts(self, articles: List[Dict]):
        """キーワードアラートのチェック"""
        if not self.config['alerts']['keyword_alerts']:
            return
        
        keywords = self.config['monitoring']['keywords']
        matched_articles = []
        
        for article in articles:
            content_text = f"{article['title']} {article.get('content', '')}"
            
            for keyword in keywords:
                if keyword.lower() in content_text.lower():
                    matched_articles.append({
                        'article': article,
                        'matched_keyword': keyword
                    })
                    break
        
        if matched_articles:
            await self._send_keyword_alert(matched_articles)
    
    async def _send_keyword_alert(self, matched_articles: List[Dict]):
        """キーワードマッチアラートの送信"""
        logger.info(f"キーワードアラート: {len(matched_articles)} 件の記事がマッチ")
        
        # 実際の実装では、メール送信やSlack通知等を行う
        for match in matched_articles:
            article = match['article']
            keyword = match['matched_keyword']
            
            print(f"🚨 キーワードアラート: '{keyword}'")
            print(f"   記事: {article['title']}")
            print(f"   URL: {article['url']}")
            print(f"   ソース: {article['source']}")
    
    async def _send_error_alert(self, source_name: str, error_message: str):
        """エラーアラートの送信"""
        logger.warning(f"エラーアラート: {source_name} - {error_message}")
        
        # 実際の実装では、管理者への通知を行う
        print(f"⚠️ エラーアラート")
        print(f"   ソース: {source_name}")
        print(f"   エラー: {error_message}")
        print(f"   時刻: {datetime.now()}")
    
    async def generate_daily_summary(self) -> Dict:
        """日次サマリーの生成"""
        # 今日のデータを取得
        today = datetime.now().date()
        
        # データベースから統計情報を取得
        stats = self.pipeline.get_statistics_dataframe()
        
        summary = {
            'date': today.isoformat(),
            'total_articles_collected': 0,
            'sources_active': len([s for s in self.config['news_sources'] if s.get('enabled', True)]),
            'keyword_matches': 0,
            'errors': 0,
            'top_sources': [],
            'top_keywords': []
        }
        
        # より詳細な統計は実際のデータベースクエリで取得
        # ここでは簡略化
        
        return summary

# 使用例
async def demo_news_monitoring():
    """
    ニュース監視システムのデモ
    
    実際の運用を模擬したテスト実行
    """
    
    # システム初期化
    monitor = NewsMonitoringSystem()
    
    print("📰 ニュース監視システム デモ開始")
    print("=" * 60)
    
    # 設定情報表示
    print(f"監視対象ソース数: {len(monitor.config['news_sources'])}")
    print(f"キーワード: {', '.join(monitor.config['monitoring']['keywords'])}")
    print(f"コンプライアンス: robots.txt遵守={monitor.config['compliance']['respect_robots_txt']}")
    
    # 単発実行(デモ用)
    print(f"\n🔍 単発チェック実行中...")
    
    for source in monitor.config['news_sources']:
        if source.get('enabled', True):
            await monitor._monitor_news_source(source)
    
    # 日次サマリー生成
    print(f"\n📊 日次サマリー生成中...")
    summary = await monitor.generate_daily_summary()
    
    print(f"日次サマリー:")
    for key, value in summary.items():
        print(f"  {key}: {value}")
    
    # データエクスポート
    print(f"\n📤 データエクスポート実行中...")
    export_files = monitor.pipeline.export_to_formats(['json'])
    
    for format_type, file_path in export_files.items():
        print(f"  {format_type.upper()}: {file_path}")
    
    print(f"\n✅ デモ完了")
    
    return monitor, summary

# 実行例
# news_monitor_demo = await demo_news_monitoring()

# 実際の監視開始(バックグラウンド実行)
# monitor = NewsMonitoringSystem()
# await monitor.start_monitoring()  # Ctrl+Cで停止

🎉 シリーズ総まとめ

学習した技術の総合

全3部を通じて、以下の包括的なスクレイピング技術を習得しました:

第1部:基礎技術

  • Playwrightによる動的サイト対応
  • 非同期プログラミングの基本
  • データ抽出の基礎

第2部:高度な技術

  • 並行処理によるパフォーマンス向上
  • エラーハンドリングとリトライ機構
  • 構造化データ抽出

第3部:実践的応用

  • 検出回避技術とセキュリティ対策
  • データ永続化と管理システム
  • 法的・倫理的配慮の実装

プロダクション運用のポイント

  1. スケーラビリティ: 大量データへの対応
  2. 堅牢性: エラー処理とリカバリ機能
  3. コンプライアンス: 法的・倫理的配慮
  4. 監視: システムの健全性チェック
  5. 保守性: 設定管理とログ出力
  • クラウド展開: AWS/GCPでの運用
  • 機械学習統合: 収集データの自動分析
  • API化: スクレイピング結果のサービス化
  • リアルタイム処理: WebSocketとストリーミング

責任あるスクレイピングの実現

技術的な実装だけでなく、法的・倫理的配慮を組み込んだシステム設計が重要です。本シリーズで学んだ技術を使って、サーバー負荷を考慮し、利用規約を遵守した責任あるスクレイピングを実践しましょう。


免責事項: 本記事の内容は教育目的のものです。実際のスクレイピング実行時は、対象サイトの利用規約を遵守し、適切な許可を得てから実施してください。

関連記事

Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築
Python

Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築

これまでの記事で価格表示とチャート分析ができるようになりました。今回は最終回として、自分の保有している仮想通貨を管理するポートフォリオ機能と、価格変動を知らせるアラート機能を追加します。これで本格的な投資管理ツールの完成です! ポートフォリオ管理とは?なぜ重要なのか ...

Streamlit入門 第2部:美しいチャートと可視化で仮想通貨を分析しよう
Python

Streamlit入門 第2部:美しいチャートと可視化で仮想通貨を分析しよう

前回の記事で基本的な価格表示ができるようになりました。今回は、プロのトレーダーが使うような美しいチャートを追加して、本格的な分析ツールに仕上げていきます。難しそうに見えますが、実は数行のコードで驚くほど高機能なチャートが作れるんです! Plotlyとは?なぜグラフライブラリの中で最強なのか ...

Python Webスクレイピング 2025 第2部:非同期処理と高度なデータ抽出
Python

Python Webスクレイピング 2025 第2部:非同期処理と高度なデータ抽出

はじめに 第1部ではPlaywrightの基本的な使い方と環境構築について学びました。第2部では、並行処理によるパフォーマンス向上と高度なデータ抽出テクニックについて詳しく解説します。 大規模なスクレイピングプロジェクトでは、単一ページずつの処理では時間がかかりすぎるため、複数のページを同時に処理す...

コメント

0/2000