Webスクレイピング入門:株価・仮想通貨価格を取得してみよう

約89分で読めます by ぽんたぬき
Webスクレイピング入門:株価・仮想通貨価格を取得してみよう

Webスクレイピング入門:株価・仮想通貨価格を取得してみよう

投資やトレードをしていると、リアルタイムの価格情報が欲しくなりますよね。この記事では、Pythonを使って株価や仮想通貨価格を自動取得する方法を、初心者でもわかりやすく解説します。

Webスクレイピングとは?

基本概念

Webスクレイピングとは、Webサイトから自動的にデータを抽出する技術です。人間がブラウザで行うデータ収集作業を、プログラムが代わりに実行します。

スクレイピングの方法

  1. HTMLの直接解析: BeautifulSoupなどでHTMLを解析
  2. API利用: 公式APIが提供されている場合
  3. ブラウザ自動化: Seleniumでブラウザを操作

注意事項とマナー

# スクレイピングの基本マナー
scraping_etiquette = {
    "robots.txt": "必ず確認する",
    "アクセス頻度": "適切な間隔を空ける(1秒以上推奨)",
    "利用規約": "サイトの利用規約を遵守",
    "サーバー負荷": "過度なアクセスは避ける",
    "個人情報": "個人情報は取得しない",
    "商用利用": "利用目的を明確にする"
}

for key, value in scraping_etiquette.items():
    print(f"{key}: {value}")

環境構築

必要なライブラリのインストール

# 基本ライブラリ
pip install requests beautifulsoup4 lxml

# データ分析用
pip install pandas numpy matplotlib

# 仮想通貨API用
pip install ccxt

# より高度なスクレイピング用(必要に応じて)
pip install selenium webdriver-manager

基本的なインポート

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import json
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

# 警告を非表示
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

株価データの取得

1. Yahoo Finance から株価を取得

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time

class StockPriceScraper:
    """株価データ取得クラス"""
    
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
    
    def get_yahoo_finance_stock(self, symbol):
        """Yahoo Finance から株価を取得
        
        Args:
            symbol (str): 株式シンボル(例: "7203.T" for トヨタ)
        
        Returns:
            dict: 株価情報
        """
        try:
            url = f"https://finance.yahoo.com/quote/{symbol}"
            response = self.session.get(url)
            response.raise_for_status()
            
            soup = BeautifulSoup(response.content, 'html.parser')
            
            # 現在価格を取得
            price_element = soup.find('fin-streamer', {'data-field': 'regularMarketPrice'})
            current_price = price_element.text if price_element else "N/A"
            
            # 変動額を取得
            change_element = soup.find('fin-streamer', {'data-field': 'regularMarketChange'})
            change = change_element.text if change_element else "N/A"
            
            # 変動率を取得
            change_percent_element = soup.find('fin-streamer', {'data-field': 'regularMarketChangePercent'})
            change_percent = change_percent_element.text if change_percent_element else "N/A"
            
            # 企業名を取得
            title_element = soup.find('h1', {'data-reactid': '7'})
            company_name = title_element.text.split('(')[0].strip() if title_element else symbol
            
            return {
                'symbol': symbol,
                'company_name': company_name,
                'current_price': current_price,
                'change': change,
                'change_percent': change_percent,
                'timestamp': datetime.now(),
                'source': 'Yahoo Finance'
            }
            
        except Exception as e:
            print(f"エラー: {symbol} の取得に失敗 - {e}")
            return None
    
    def get_multiple_stocks(self, symbols, delay=1):
        """複数の株式データを取得
        
        Args:
            symbols (list): 株式シンボルのリスト
            delay (int): リクエスト間隔(秒)
        
        Returns:
            list: 株価データのリスト
        """
        stocks_data = []
        
        for symbol in symbols:
            print(f"取得中: {symbol}")
            stock_data = self.get_yahoo_finance_stock(symbol)
            
            if stock_data:
                stocks_data.append(stock_data)
                print(f"  ✓ {stock_data['company_name']}: {stock_data['current_price']}")
            else:
                print(f"  ✗ {symbol}: 取得失敗")
            
            time.sleep(delay)  # サーバー負荷軽減
        
        return stocks_data

# 使用例
def demo_stock_scraping():
    """株価スクレイピングのデモ"""
    
    scraper = StockPriceScraper()
    
    # 人気の日本株式シンボル
    japanese_stocks = [
        "7203.T",   # トヨタ自動車
        "9984.T",   # ソフトバンクグループ
        "6758.T",   # ソニーグループ
        "9432.T",   # NTT
        "8306.T"    # 三菱UFJフィナンシャル・グループ
    ]
    
    # 米国株式シンボル
    us_stocks = [
        "AAPL",     # Apple
        "GOOGL",    # Alphabet (Google)
        "MSFT",     # Microsoft
        "TSLA",     # Tesla
        "AMZN"      # Amazon
    ]
    
    print("=== 日本株価取得 ===")
    jp_data = scraper.get_multiple_stocks(japanese_stocks)
    
    print("\n=== 米国株価取得 ===")
    us_data = scraper.get_multiple_stocks(us_stocks)
    
    # データフレームに変換
    all_data = jp_data + us_data
    if all_data:
        df = pd.DataFrame(all_data)
        print("\n=== 取得結果 ===")
        print(df[['symbol', 'company_name', 'current_price', 'change_percent']])
        
        # CSVファイルに保存
        filename = f"stock_prices_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        print(f"\nデータを保存しました: {filename}")
    
    return all_data

# デモ実行
stock_data = demo_stock_scraping()

2. Alpha Vantage API を使用した高精度な株価取得

import requests
import pandas as pd
from datetime import datetime

class AlphaVantageAPI:
    """Alpha Vantage API クライアント"""
    
    def __init__(self, api_key=None):
        self.api_key = api_key or "demo"  # 無料API Key要取得
        self.base_url = "https://www.alphavantage.co/query"
    
    def get_daily_stock_data(self, symbol, outputsize="compact"):
        """日次株価データを取得
        
        Args:
            symbol (str): 株式シンボル
            outputsize (str): "compact" (最新100日) or "full" (全期間)
        
        Returns:
            pd.DataFrame: 株価データ
        """
        params = {
            'function': 'TIME_SERIES_DAILY',
            'symbol': symbol,
            'outputsize': outputsize,
            'apikey': self.api_key
        }
        
        try:
            response = requests.get(self.base_url, params=params)
            data = response.json()
            
            if 'Time Series (Daily)' in data:
                time_series = data['Time Series (Daily)']
                
                # DataFrameに変換
                df = pd.DataFrame.from_dict(time_series, orient='index')
                df.index = pd.to_datetime(df.index)
                df = df.sort_index()
                
                # 列名を変更
                df.columns = ['Open', 'High', 'Low', 'Close', 'Volume']
                df = df.astype(float)
                
                # 追加情報
                df['Symbol'] = symbol
                
                return df
            else:
                print(f"データ取得失敗: {data.get('Error Message', 'Unknown error')}")
                return None
                
        except Exception as e:
            print(f"API リクエストエラー: {e}")
            return None
    
    def get_intraday_data(self, symbol, interval="5min"):
        """分足データを取得
        
        Args:
            symbol (str): 株式シンボル
            interval (str): "1min", "5min", "15min", "30min", "60min"
        
        Returns:
            pd.DataFrame: 分足データ
        """
        params = {
            'function': 'TIME_SERIES_INTRADAY',
            'symbol': symbol,
            'interval': interval,
            'apikey': self.api_key
        }
        
        try:
            response = requests.get(self.base_url, params=params)
            data = response.json()
            
            time_series_key = f'Time Series ({interval})'
            
            if time_series_key in data:
                time_series = data[time_series_key]
                
                df = pd.DataFrame.from_dict(time_series, orient='index')
                df.index = pd.to_datetime(df.index)
                df = df.sort_index()
                
                df.columns = ['Open', 'High', 'Low', 'Close', 'Volume']
                df = df.astype(float)
                df['Symbol'] = symbol
                
                return df
            else:
                print(f"分足データ取得失敗: {data}")
                return None
                
        except Exception as e:
            print(f"分足データ取得エラー: {e}")
            return None

# 使用例
def demo_alphavantage_api():
    """Alpha Vantage API のデモ"""
    
    api = AlphaVantageAPI()  # 本番では API Key を設定
    
    # 日次データ取得
    print("日次データ取得中...")
    daily_data = api.get_daily_stock_data("AAPL")
    
    if daily_data is not None:
        print("最新5日の株価:")
        print(daily_data.tail())
        
        # 簡単な分析
        latest_price = daily_data['Close'].iloc[-1]
        prev_price = daily_data['Close'].iloc[-2]
        change_percent = ((latest_price - prev_price) / prev_price) * 100
        
        print(f"\n現在価格: ${latest_price:.2f}")
        print(f"前日比: {change_percent:+.2f}%")
    
    return daily_data

# API Key が必要なため、デモはコメントアウト
# alphavantage_data = demo_alphavantage_api()

仮想通貨価格の取得

1. CoinGecko API を使用した仮想通貨価格取得

import requests
import pandas as pd
from datetime import datetime
import time

class CryptoPriceScraper:
    """仮想通貨価格取得クラス"""
    
    def __init__(self):
        self.base_url = "https://api.coingecko.com/api/v3"
        self.session = requests.Session()
    
    def get_current_prices(self, coin_ids, vs_currencies="jpy,usd"):
        """現在の仮想通貨価格を取得
        
        Args:
            coin_ids (list): コインIDのリスト
            vs_currencies (str): 比較通貨("jpy,usd"など)
        
        Returns:
            dict: 価格データ
        """
        try:
            ids_str = ",".join(coin_ids)
            url = f"{self.base_url}/simple/price"
            
            params = {
                'ids': ids_str,
                'vs_currencies': vs_currencies,
                'include_24hr_change': 'true',
                'include_24hr_vol': 'true',
                'include_last_updated_at': 'true'
            }
            
            response = self.session.get(url, params=params)
            response.raise_for_status()
            
            data = response.json()
            
            # データを整形
            formatted_data = []
            for coin_id, coin_data in data.items():
                for currency in vs_currencies.split(','):
                    price_key = currency
                    change_key = f"{currency}_24h_change"
                    vol_key = f"{currency}_24h_vol"
                    
                    if price_key in coin_data:
                        formatted_data.append({
                            'coin_id': coin_id,
                            'currency': currency.upper(),
                            'price': coin_data[price_key],
                            'change_24h': coin_data.get(change_key, 0),
                            'volume_24h': coin_data.get(vol_key, 0),
                            'last_updated': datetime.fromtimestamp(coin_data.get('last_updated_at', 0)),
                            'timestamp': datetime.now()
                        })
            
            return formatted_data
            
        except Exception as e:
            print(f"価格取得エラー: {e}")
            return []
    
    def get_market_data(self, coin_id):
        """詳細な市場データを取得
        
        Args:
            coin_id (str): コインID
        
        Returns:
            dict: 市場データ
        """
        try:
            url = f"{self.base_url}/coins/{coin_id}"
            
            params = {
                'localization': 'false',
                'tickers': 'false',
                'community_data': 'false',
                'developer_data': 'false'
            }
            
            response = self.session.get(url, params=params)
            response.raise_for_status()
            
            data = response.json()
            
            market_data = data.get('market_data', {})
            
            return {
                'coin_id': coin_id,
                'name': data.get('name'),
                'symbol': data.get('symbol', '').upper(),
                'current_price_jpy': market_data.get('current_price', {}).get('jpy'),
                'current_price_usd': market_data.get('current_price', {}).get('usd'),
                'market_cap_jpy': market_data.get('market_cap', {}).get('jpy'),
                'market_cap_usd': market_data.get('market_cap', {}).get('usd'),
                'market_cap_rank': market_data.get('market_cap_rank'),
                'total_volume_jpy': market_data.get('total_volume', {}).get('jpy'),
                'price_change_24h': market_data.get('price_change_24h'),
                'price_change_percentage_24h': market_data.get('price_change_percentage_24h'),
                'circulating_supply': market_data.get('circulating_supply'),
                'total_supply': market_data.get('total_supply'),
                'ath_jpy': market_data.get('ath', {}).get('jpy'),
                'ath_date': market_data.get('ath_date', {}).get('jpy'),
                'atl_jpy': market_data.get('atl', {}).get('jpy'),
                'atl_date': market_data.get('atl_date', {}).get('jpy'),
                'last_updated': market_data.get('last_updated')
            }
            
        except Exception as e:
            print(f"市場データ取得エラー: {e}")
            return None
    
    def get_historical_data(self, coin_id, days=30, vs_currency="jpy"):
        """過去の価格データを取得
        
        Args:
            coin_id (str): コインID
            days (int): 取得日数
            vs_currency (str): 比較通貨
        
        Returns:
            pd.DataFrame: 価格履歴データ
        """
        try:
            url = f"{self.base_url}/coins/{coin_id}/market_chart"
            
            params = {
                'vs_currency': vs_currency,
                'days': days,
                'interval': 'daily' if days > 90 else 'hourly'
            }
            
            response = self.session.get(url, params=params)
            response.raise_for_status()
            
            data = response.json()
            
            # データを DataFrame に変換
            prices = data.get('prices', [])
            volumes = data.get('total_volumes', [])
            market_caps = data.get('market_caps', [])
            
            df_data = []
            for i, (timestamp, price) in enumerate(prices):
                row = {
                    'timestamp': datetime.fromtimestamp(timestamp / 1000),
                    'price': price,
                    'volume': volumes[i][1] if i < len(volumes) else 0,
                    'market_cap': market_caps[i][1] if i < len(market_caps) else 0
                }
                df_data.append(row)
            
            df = pd.DataFrame(df_data)
            df.set_index('timestamp', inplace=True)
            
            return df
            
        except Exception as e:
            print(f"履歴データ取得エラー: {e}")
            return None

# 使用例
def demo_crypto_scraping():
    """仮想通貨価格取得のデモ"""
    
    scraper = CryptoPriceScraper()
    
    # 人気の仮想通貨
    popular_coins = [
        'bitcoin',
        'ethereum',
        'binancecoin',
        'cardano',
        'solana',
        'polkadot',
        'dogecoin',
        'shiba-inu'
    ]
    
    print("=== 仮想通貨価格取得 ===")
    
    # 現在価格を取得
    current_prices = scraper.get_current_prices(popular_coins)
    
    if current_prices:
        df_prices = pd.DataFrame(current_prices)
        
        # JPY価格のみ表示
        jpy_prices = df_prices[df_prices['currency'] == 'JPY']
        
        print("現在の価格(円):")
        for _, row in jpy_prices.iterrows():
            coin_name = row['coin_id'].replace('-', ' ').title()
            price = f"{row['price']:,.0f}" if row['price'] > 1 else f"{row['price']:.6f}"
            change = row['change_24h']
            change_symbol = "📈" if change > 0 else "📉" if change < 0 else "➡️"
            
            print(f"  {coin_name}: ¥{price} ({change:+.2f}%) {change_symbol}")
        
        # データを保存
        filename = f"crypto_prices_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df_prices.to_csv(filename, index=False, encoding='utf-8-sig')
        print(f"\nデータを保存しました: {filename}")
    
    # ビットコインの詳細データ
    print("\n=== ビットコイン詳細情報 ===")
    btc_market_data = scraper.get_market_data('bitcoin')
    
    if btc_market_data:
        print(f"名前: {btc_market_data['name']} ({btc_market_data['symbol']})")
        print(f"現在価格: ¥{btc_market_data['current_price_jpy']:,.0f}")
        print(f"時価総額ランキング: {btc_market_data['market_cap_rank']}位")
        print(f"24時間変動: {btc_market_data['price_change_percentage_24h']:+.2f}%")
        print(f"過去最高値: ¥{btc_market_data['ath_jpy']:,.0f}")
    
    # 過去30日の価格推移
    print("\n=== ビットコイン価格推移(過去30日)===")
    btc_history = scraper.get_historical_data('bitcoin', days=30)
    
    if btc_history is not None:
        print("最新5日の価格:")
        print(btc_history.tail())
        
        # 簡単な統計
        max_price = btc_history['price'].max()
        min_price = btc_history['price'].min()
        avg_price = btc_history['price'].mean()
        
        print(f"\n30日間の統計:")
        print(f"最高値: ¥{max_price:,.0f}")
        print(f"最安値: ¥{min_price:,.0f}")
        print(f"平均値: ¥{avg_price:,.0f}")
    
    return current_prices, btc_market_data, btc_history

# デモ実行
crypto_data = demo_crypto_scraping()

2. CCXT ライブラリを使用した取引所データ取得

import ccxt
import pandas as pd
from datetime import datetime
import time

class ExchangeDataCollector:
    """取引所データ収集クラス"""
    
    def __init__(self):
        self.exchanges = {
            'binance': ccxt.binance(),
            'coincheck': ccxt.coincheck(),
            'bitflyer': ccxt.bitflyer()
        }
    
    def get_exchange_prices(self, symbol='BTC/JPY'):
        """複数取引所の価格を比較
        
        Args:
            symbol (str): 取引ペア
        
        Returns:
            list: 取引所別価格データ
        """
        prices_data = []
        
        for exchange_name, exchange in self.exchanges.items():
            try:
                print(f"{exchange_name} から {symbol} の価格を取得中...")
                
                # Tickerデータを取得
                ticker = exchange.fetch_ticker(symbol)
                
                price_data = {
                    'exchange': exchange_name,
                    'symbol': symbol,
                    'last_price': ticker['last'],
                    'bid': ticker['bid'],
                    'ask': ticker['ask'],
                    'spread': ticker['ask'] - ticker['bid'] if ticker['ask'] and ticker['bid'] else 0,
                    'spread_percent': ((ticker['ask'] - ticker['bid']) / ticker['last']) * 100 if ticker['ask'] and ticker['bid'] and ticker['last'] else 0,
                    'volume': ticker['baseVolume'],
                    'high_24h': ticker['high'],
                    'low_24h': ticker['low'],
                    'change_24h': ticker['change'],
                    'change_percent_24h': ticker['percentage'],
                    'timestamp': datetime.fromtimestamp(ticker['timestamp'] / 1000) if ticker['timestamp'] else datetime.now()
                }
                
                prices_data.append(price_data)
                print(f"  ✓ {exchange_name}: ¥{price_data['last_price']:,.0f}")
                
                time.sleep(0.5)  # レート制限対策
                
            except Exception as e:
                print(f"  ✗ {exchange_name} エラー: {e}")
        
        return prices_data
    
    def get_orderbook(self, exchange_name, symbol='BTC/JPY', limit=10):
        """板情報を取得
        
        Args:
            exchange_name (str): 取引所名
            symbol (str): 取引ペア
            limit (int): 取得する板の深さ
        
        Returns:
            dict: 板情報
        """
        try:
            exchange = self.exchanges[exchange_name]
            orderbook = exchange.fetch_order_book(symbol, limit)
            
            return {
                'exchange': exchange_name,
                'symbol': symbol,
                'bids': orderbook['bids'][:limit],  # 買い注文
                'asks': orderbook['asks'][:limit],  # 売り注文
                'timestamp': datetime.fromtimestamp(orderbook['timestamp'] / 1000) if orderbook['timestamp'] else datetime.now()
            }
            
        except Exception as e:
            print(f"板情報取得エラー ({exchange_name}): {e}")
            return None
    
    def get_ohlcv_data(self, exchange_name, symbol='BTC/JPY', timeframe='1h', limit=100):
        """OHLCV(ローソク足)データを取得
        
        Args:
            exchange_name (str): 取引所名
            symbol (str): 取引ペア
            timeframe (str): 時間軸 ('1m', '5m', '1h', '1d' など)
            limit (int): 取得するデータ数
        
        Returns:
            pd.DataFrame: OHLCVデータ
        """
        try:
            exchange = self.exchanges[exchange_name]
            
            # OHLCVデータを取得
            ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
            
            # DataFrameに変換
            df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
            df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
            df.set_index('timestamp', inplace=True)
            
            # 追加情報
            df['exchange'] = exchange_name
            df['symbol'] = symbol
            df['timeframe'] = timeframe
            
            return df
            
        except Exception as e:
            print(f"OHLCVデータ取得エラー ({exchange_name}): {e}")
            return None

# 使用例
def demo_exchange_data():
    """取引所データ取得のデモ"""
    
    collector = ExchangeDataCollector()
    
    # 複数取引所の価格比較
    print("=== 取引所価格比較 ===")
    btc_prices = collector.get_exchange_prices('BTC/JPY')
    
    if btc_prices:
        df_prices = pd.DataFrame(btc_prices)
        
        print("\n価格比較:")
        for _, row in df_prices.iterrows():
            print(f"{row['exchange']:>10}: ¥{row['last_price']:>10,.0f} (スプレッド: {row['spread_percent']:.2f}%)")
        
        # 最安値・最高値
        min_price_row = df_prices.loc[df_prices['last_price'].idxmin()]
        max_price_row = df_prices.loc[df_prices['last_price'].idxmax()]
        
        print(f"\n最安値: {min_price_row['exchange']} - ¥{min_price_row['last_price']:,.0f}")
        print(f"最高値: {max_price_row['exchange']} - ¥{max_price_row['last_price']:,.0f}")
        
        price_diff = max_price_row['last_price'] - min_price_row['last_price']
        arbitrage_opportunity = (price_diff / min_price_row['last_price']) * 100
        
        print(f"価格差: ¥{price_diff:,.0f} ({arbitrage_opportunity:.2f}%)")
    
    # 板情報の取得
    print("\n=== 板情報 ===")
    orderbook = collector.get_orderbook('bitflyer', 'BTC/JPY', limit=5)
    
    if orderbook:
        print(f"{orderbook['exchange']} の板情報:")
        print("売り注文 (Ask):")
        for price, amount in orderbook['asks']:
            print(f"  ¥{price:>10,.0f} x {amount:>8.4f} BTC")
        
        print("買い注文 (Bid):")
        for price, amount in orderbook['bids']:
            print(f"  ¥{price:>10,.0f} x {amount:>8.4f} BTC")
    
    # OHLCVデータの取得
    print("\n=== ローソク足データ ===")
    ohlcv_data = collector.get_ohlcv_data('bitflyer', 'BTC/JPY', '1h', 24)
    
    if ohlcv_data is not None:
        print("過去24時間の時間足データ:")
        print(ohlcv_data.tail())
        
        # 簡単な分析
        highest = ohlcv_data['high'].max()
        lowest = ohlcv_data['low'].min()
        latest_close = ohlcv_data['close'].iloc[-1]
        
        print(f"\n24時間の統計:")
        print(f"最高値: ¥{highest:,.0f}")
        print(f"最安値: ¥{lowest:,.0f}")
        print(f"現在値: ¥{latest_close:,.0f}")
    
    return btc_prices, orderbook, ohlcv_data

# デモ実行
# 注意: 取引所のAPIアクセスが必要
# exchange_data = demo_exchange_data()

データの可視化と分析

1. 価格チャートの作成

import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import pandas as pd

class PriceVisualizer:
    """価格データ可視化クラス"""
    
    def __init__(self):
        plt.style.use('seaborn-v0_8')
        plt.rcParams['font.family'] = 'DejaVu Sans'
        sns.set_palette("husl")
    
    def plot_price_comparison(self, price_data, title="価格比較"):
        """複数取引所の価格比較チャート
        
        Args:
            price_data (list): 価格データのリスト
            title (str): チャートタイトル
        """
        if not price_data:
            print("表示するデータがありません")
            return
        
        df = pd.DataFrame(price_data)
        
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
        
        # 価格比較(棒グラフ)
        ax1.bar(df['exchange'], df['last_price'], color=sns.color_palette("viridis", len(df)))
        ax1.set_title(f'{title} - 現在価格')
        ax1.set_ylabel('価格 (JPY)')
        ax1.tick_params(axis='x', rotation=45)
        
        # 価格を上に表示
        for i, v in enumerate(df['last_price']):
            ax1.text(i, v + max(df['last_price']) * 0.01, f{v:,.0f}', 
                    ha='center', va='bottom', fontweight='bold')
        
        # スプレッド比較
        if 'spread_percent' in df.columns:
            ax2.bar(df['exchange'], df['spread_percent'], color=sns.color_palette("plasma", len(df)))
            ax2.set_title('スプレッド比較')
            ax2.set_ylabel('スプレッド (%)')
            ax2.tick_params(axis='x', rotation=45)
            
            # スプレッドを上に表示
            for i, v in enumerate(df['spread_percent']):
                ax2.text(i, v + max(df['spread_percent']) * 0.05, f'{v:.2f}%', 
                        ha='center', va='bottom', fontweight='bold')
        
        plt.tight_layout()
        plt.show()
    
    def plot_price_history(self, df, title="価格推移"):
        """価格履歴チャート
        
        Args:
            df (pd.DataFrame): 価格履歴データ
            title (str): チャートタイトル
        """
        if df is None or df.empty:
            print("表示するデータがありません")
            return
        
        fig, axes = plt.subplots(2, 2, figsize=(15, 10))
        fig.suptitle(title, fontsize=16)
        
        # 価格推移(折れ線グラフ)
        axes[0, 0].plot(df.index, df['price'], linewidth=2, color='blue')
        axes[0, 0].set_title('価格推移')
        axes[0, 0].set_ylabel('価格 (JPY)')
        axes[0, 0].grid(True, alpha=0.3)
        
        # 出来高
        axes[0, 1].bar(df.index, df['volume'], alpha=0.7, color='green')
        axes[0, 1].set_title('出来高')
        axes[0, 1].set_ylabel('出来高')
        
        # 価格分布(ヒストグラム)
        axes[1, 0].hist(df['price'], bins=30, alpha=0.7, color='orange')
        axes[1, 0].set_title('価格分布')
        axes[1, 0].set_xlabel('価格 (JPY)')
        axes[1, 0].set_ylabel('頻度')
        
        # 移動平均
        df['MA7'] = df['price'].rolling(window=7).mean()
        df['MA30'] = df['price'].rolling(window=30).mean()
        
        axes[1, 1].plot(df.index, df['price'], label='価格', alpha=0.7)
        axes[1, 1].plot(df.index, df['MA7'], label='7日移動平均', linewidth=2)
        axes[1, 1].plot(df.index, df['MA30'], label='30日移動平均', linewidth=2)
        axes[1, 1].set_title('移動平均線')
        axes[1, 1].set_ylabel('価格 (JPY)')
        axes[1, 1].legend()
        axes[1, 1].grid(True, alpha=0.3)
        
        plt.tight_layout()
        plt.show()
    
    def plot_candlestick(self, df, title="ローソク足チャート"):
        """ローソク足チャート(簡易版)
        
        Args:
            df (pd.DataFrame): OHLCVデータ
            title (str): チャートタイトル
        """
        if df is None or df.empty:
            print("表示するデータがありません")
            return
        
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10), height_ratios=[3, 1])
        
        # ローソク足(簡易版)
        for i, (timestamp, row) in enumerate(df.iterrows()):
            color = 'red' if row['close'] >= row['open'] else 'blue'
            
            # 高値・安値の線
            ax1.plot([i, i], [row['low'], row['high']], color='black', linewidth=1)
            
            # 始値・終値の箱
            box_height = abs(row['close'] - row['open'])
            box_bottom = min(row['open'], row['close'])
            
            ax1.bar(i, box_height, bottom=box_bottom, width=0.6, 
                   color=color, alpha=0.7, edgecolor='black')
        
        ax1.set_title(title)
        ax1.set_ylabel('価格 (JPY)')
        ax1.grid(True, alpha=0.3)
        
        # 出来高
        ax2.bar(range(len(df)), df['volume'], alpha=0.7, color='green')
        ax2.set_title('出来高')
        ax2.set_ylabel('出来高')
        ax2.set_xlabel('時間')
        
        plt.tight_layout()
        plt.show()

# 使用例とデモ
def demo_visualization():
    """データ可視化のデモ"""
    
    visualizer = PriceVisualizer()
    
    # サンプルデータの作成
    sample_exchange_data = [
        {'exchange': 'bitFlyer', 'last_price': 5000000, 'spread_percent': 0.05},
        {'exchange': 'Coincheck', 'last_price': 5005000, 'spread_percent': 0.08},
        {'exchange': 'Binance', 'last_price': 4998000, 'spread_percent': 0.03}
    ]
    
    # 価格比較チャート
    print("価格比較チャートを表示...")
    visualizer.plot_price_comparison(sample_exchange_data, "BTC/JPY 取引所比較")
    
    # 価格履歴のサンプルデータ
    dates = pd.date_range(start='2024-01-01', end='2024-01-30', freq='D')
    np.random.seed(42)
    
    sample_history = pd.DataFrame({
        'price': 5000000 + np.random.randn(30).cumsum() * 50000,
        'volume': np.random.randint(100, 1000, 30),
        'market_cap': np.random.randint(1000000, 10000000, 30)
    }, index=dates)
    
    # 価格履歴チャート
    print("価格履歴チャートを表示...")
    visualizer.plot_price_history(sample_history, "BTC価格履歴(過去30日)")

# デモ実行
# 注意: matplotlib, seaborn のインストールが必要
# demo_visualization()

自動監視システムの構築

1. 価格アラートシステム

import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import json
import threading

class PriceAlertSystem:
    """価格アラートシステム"""
    
    def __init__(self, config_file="alert_config.json"):
        self.config_file = config_file
        self.config = self.load_config()
        self.running = False
        self.price_history = {}
    
    def load_config(self):
        """設定ファイルを読み込み"""
        try:
            with open(self.config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            # デフォルト設定
            default_config = {
                "alerts": [
                    {
                        "symbol": "BTC/JPY",
                        "exchange": "bitflyer",
                        "upper_threshold": 6000000,
                        "lower_threshold": 4000000,
                        "enabled": True
                    }
                ],
                "email": {
                    "enabled": False,
                    "smtp_server": "smtp.gmail.com",
                    "smtp_port": 587,
                    "username": "",
                    "password": "",
                    "to_email": ""
                },
                "check_interval": 60
            }
            self.save_config(default_config)
            return default_config
    
    def save_config(self, config):
        """設定ファイルを保存"""
        with open(self.config_file, 'w', encoding='utf-8') as f:
            json.dump(config, f, indent=2, ensure_ascii=False)
    
    def add_alert(self, symbol, exchange, upper_threshold=None, lower_threshold=None):
        """アラートを追加"""
        alert = {
            "symbol": symbol,
            "exchange": exchange,
            "upper_threshold": upper_threshold,
            "lower_threshold": lower_threshold,
            "enabled": True
        }
        
        self.config["alerts"].append(alert)
        self.save_config(self.config)
        print(f"アラートを追加: {symbol} @ {exchange}")
    
    def send_email_alert(self, subject, message):
        """メールアラートを送信"""
        if not self.config["email"]["enabled"]:
            return False
        
        try:
            smtp_server = self.config["email"]["smtp_server"]
            smtp_port = self.config["email"]["smtp_port"]
            username = self.config["email"]["username"]
            password = self.config["email"]["password"]
            to_email = self.config["email"]["to_email"]
            
            msg = MIMEMultipart()
            msg['From'] = username
            msg['To'] = to_email
            msg['Subject'] = subject
            
            msg.attach(MIMEText(message, 'plain', 'utf-8'))
            
            server = smtplib.SMTP(smtp_server, smtp_port)
            server.starttls()
            server.login(username, password)
            text = msg.as_string()
            server.sendmail(username, to_email, text)
            server.quit()
            
            return True
            
        except Exception as e:
            print(f"メール送信エラー: {e}")
            return False
    
    def check_alerts(self):
        """アラートをチェック"""
        from .crypto_scraper import ExchangeDataCollector  # 前で定義したクラス
        
        collector = ExchangeDataCollector()
        
        for alert in self.config["alerts"]:
            if not alert["enabled"]:
                continue
            
            try:
                # 価格を取得
                prices = collector.get_exchange_prices(alert["symbol"])
                
                exchange_price = None
                for price_data in prices:
                    if price_data["exchange"] == alert["exchange"]:
                        exchange_price = price_data["last_price"]
                        break
                
                if exchange_price is None:
                    continue
                
                # 価格履歴に記録
                key = f"{alert['exchange']}_{alert['symbol']}"
                if key not in self.price_history:
                    self.price_history[key] = []
                
                self.price_history[key].append({
                    'timestamp': datetime.now(),
                    'price': exchange_price
                })
                
                # 古いデータを削除(24時間以上前)
                cutoff_time = datetime.now() - timedelta(hours=24)
                self.price_history[key] = [
                    entry for entry in self.price_history[key] 
                    if entry['timestamp'] > cutoff_time
                ]
                
                # アラート条件をチェック
                alert_triggered = False
                alert_message = ""
                
                if alert["upper_threshold"] and exchange_price > alert["upper_threshold"]:
                    alert_triggered = True
                    alert_message = f"{alert['symbol']} が上限価格を突破しました!\n" \
                                  f"現在価格: ¥{exchange_price:,.0f}\n" \
                                  f"上限設定: ¥{alert['upper_threshold']:,.0f}\n" \
                                  f"取引所: {alert['exchange']}"
                
                elif alert["lower_threshold"] and exchange_price < alert["lower_threshold"]:
                    alert_triggered = True
                    alert_message = f"{alert['symbol']} が下限価格を下回りました!\n" \
                                  f"現在価格: ¥{exchange_price:,.0f}\n" \
                                  f"下限設定: ¥{alert['lower_threshold']:,.0f}\n" \
                                  f"取引所: {alert['exchange']}"
                
                if alert_triggered:
                    print(f"🚨 アラート発生: {alert_message}")
                    
                    # メールアラート送信
                    subject = f"価格アラート: {alert['symbol']}"
                    self.send_email_alert(subject, alert_message)
                
            except Exception as e:
                print(f"アラートチェックエラー: {e}")
    
    def start_monitoring(self):
        """監視を開始"""
        self.running = True
        print("価格監視を開始しました...")
        print(f"チェック間隔: {self.config['check_interval']} 秒")
        
        while self.running:
            try:
                self.check_alerts()
                time.sleep(self.config["check_interval"])
            except KeyboardInterrupt:
                break
            except Exception as e:
                print(f"監視エラー: {e}")
                time.sleep(5)  # エラー時は5秒待機
    
    def stop_monitoring(self):
        """監視を停止"""
        self.running = False
        print("価格監視を停止しました")

# 使用例
def demo_alert_system():
    """アラートシステムのデモ"""
    
    alert_system = PriceAlertSystem()
    
    # アラートを追加
    alert_system.add_alert("BTC/JPY", "bitflyer", 
                          upper_threshold=6000000, 
                          lower_threshold=4000000)
    
    print("アラートシステムのデモ(10回チェック)")
    
    # 10回だけチェック
    for i in range(10):
        print(f"チェック {i+1}/10")
        alert_system.check_alerts()
        time.sleep(5)
    
    print("デモ完了")

# デモ実行
# demo_alert_system()

スクレイピングのベストプラクティス

1. エラーハンドリングとリトライ機能

import requests
from time import sleep
import random
from functools import wraps

def retry_on_failure(max_retries=3, delay=1, backoff=2):
    """リトライデコレータ"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            while retries < max_retries:
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    retries += 1
                    if retries == max_retries:
                        print(f"最大リトライ回数に達しました: {e}")
                        raise
                    
                    wait_time = delay * (backoff ** (retries - 1))
                    jitter = random.uniform(0.1, 0.5)  # ランダムな遅延を追加
                    sleep(wait_time + jitter)
                    print(f"リトライ {retries}/{max_retries} (待機: {wait_time:.1f}秒)")
            
            return None
        return wrapper
    return decorator

class RobustScraper:
    """堅牢なスクレイピングクラス"""
    
    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
    
    @retry_on_failure(max_retries=3, delay=2, backoff=2)
    def safe_request(self, url, **kwargs):
        """安全なHTTPリクエスト"""
        response = self.session.get(url, timeout=10, **kwargs)
        response.raise_for_status()
        return response
    
    def scrape_with_rate_limit(self, urls, delay_range=(1, 3)):
        """レート制限付きスクレイピング"""
        results = []
        
        for i, url in enumerate(urls):
            try:
                print(f"処理中 ({i+1}/{len(urls)}): {url}")
                
                response = self.safe_request(url)
                results.append({
                    'url': url,
                    'status': 'success',
                    'content': response.text,
                    'timestamp': datetime.now()
                })
                
                # ランダムな遅延
                if i < len(urls) - 1:  # 最後以外
                    delay = random.uniform(*delay_range)
                    print(f"  待機中: {delay:.1f}秒")
                    sleep(delay)
                
            except Exception as e:
                results.append({
                    'url': url,
                    'status': 'error',
                    'error': str(e),
                    'timestamp': datetime.now()
                })
                print(f"  エラー: {e}")
        
        return results

2. データの永続化と管理

import sqlite3
import json
from datetime import datetime

class PriceDataManager:
    """価格データ管理クラス"""
    
    def __init__(self, db_file="price_data.db"):
        self.db_file = db_file
        self.init_database()
    
    def init_database(self):
        """データベースを初期化"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        # 価格データテーブル
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS price_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                exchange TEXT NOT NULL,
                price REAL NOT NULL,
                volume REAL,
                timestamp DATETIME NOT NULL,
                source TEXT,
                raw_data TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # インデックス作成
        cursor.execute('''
            CREATE INDEX IF NOT EXISTS idx_symbol_exchange_timestamp 
            ON price_data(symbol, exchange, timestamp)
        ''')
        
        conn.commit()
        conn.close()
    
    def save_price_data(self, data_list):
        """価格データを保存"""
        conn = sqlite3.connect(self.db_file)
        cursor = conn.cursor()
        
        for data in data_list:
            cursor.execute('''
                INSERT INTO price_data 
                (symbol, exchange, price, volume, timestamp, source, raw_data)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            ''', (
                data.get('symbol'),
                data.get('exchange'),
                data.get('price'),
                data.get('volume'),
                data.get('timestamp'),
                data.get('source'),
                json.dumps(data, ensure_ascii=False, default=str)
            ))
        
        conn.commit()
        conn.close()
        print(f"{len(data_list)} 件のデータを保存しました")
    
    def get_price_history(self, symbol, exchange=None, hours=24):
        """価格履歴を取得"""
        conn = sqlite3.connect(self.db_file)
        
        query = '''
            SELECT * FROM price_data 
            WHERE symbol = ? 
            AND timestamp >= datetime('now', '-{} hours')
        '''.format(hours)
        
        params = [symbol]
        
        if exchange:
            query += ' AND exchange = ?'
            params.append(exchange)
        
        query += ' ORDER BY timestamp'
        
        df = pd.read_sql_query(query, conn, params=params)
        conn.close()
        
        return df
    
    def get_latest_prices(self, symbol):
        """最新価格を取得"""
        conn = sqlite3.connect(self.db_file)
        
        query = '''
            SELECT * FROM price_data 
            WHERE symbol = ? 
            AND timestamp = (
                SELECT MAX(timestamp) FROM price_data 
                WHERE symbol = ? AND exchange = price_data.exchange
            )
        '''
        
        df = pd.read_sql_query(query, conn, params=[symbol, symbol])
        conn.close()
        
        return df

# 使用例
def demo_data_management():
    """データ管理のデモ"""
    
    manager = PriceDataManager()
    
    # サンプルデータ
    sample_data = [
        {
            'symbol': 'BTC/JPY',
            'exchange': 'bitflyer',
            'price': 5000000,
            'volume': 100,
            'timestamp': datetime.now(),
            'source': 'demo'
        }
    ]
    
    # データ保存
    manager.save_price_data(sample_data)
    
    # 履歴取得
    history = manager.get_price_history('BTC/JPY')
    print(f"履歴データ: {len(history)} 件")
    
    # 最新価格取得
    latest = manager.get_latest_prices('BTC/JPY')
    print(f"最新価格: {len(latest)} 件")

# demo_data_management()

まとめと次のステップ

この記事で学んだこと

  1. Webスクレイピングの基本: requests、BeautifulSoupの使用方法
  2. 株価データ取得: Yahoo Finance、Alpha Vantage APIの活用
  3. 仮想通貨価格取得: CoinGecko API、CCXTライブラリの活用
  4. データ可視化: matplotlib、seabornによるチャート作成
  5. 自動監視システム: 価格アラート、メール通知の実装
  6. 堅牢な設計: エラーハンドリング、リトライ機能、データ永続化

スクレイピングの注意点

  • 法的・倫理的配慮: robots.txt、利用規約の遵守
  • サーバー負荷: 適切なアクセス間隔の維持
  • データ品質: エラーハンドリングとデータ検証
  • セキュリティ: APIキーの安全な管理
  1. 機械学習との組み合わせ: 価格予測モデルの構築
  2. リアルタイム処理: WebSocketを使った即時データ取得
  3. クラウド化: AWS、GCPでの自動実行環境構築
  4. 高度な分析: テクニカル分析指標の実装

Webスクレイピングは強力なデータ収集手法ですが、責任を持って適切に活用することが重要です。まずは小規模なプロジェクトから始めて、徐々にスキルを向上させていきましょう。


関連リンク

関連記事

VSCode設定ガイド:プロレベルの開発効率を実現するカスタマイズ完全版
データ分析

VSCode設定ガイド:プロレベルの開発効率を実現するカスタマイズ完全版

VSCode設定ガイド:プロレベルの開発効率を実現するカスタマイズ完全版 Visual Studio Code(VSCode)は、軽量でありながら強力な機能を持つ無料のコードエディタです。適切な設定とカスタマイズにより、プロフェッショナルな開発環境を構築できます。この記事では、初心者から上級者まで役...

CCXTを使って仮想通貨のトレードをしてみる(第4回)
データ分析

CCXTを使って仮想通貨のトレードをしてみる(第4回)

はじめに 注意: 仮想通貨取引には大きなリスクが伴います。必ず余剰資金で行い、税務・法務についても最新の情報を確認し、必要に応じて専門家の助言を受けてください。 第3回/useccxtpython3では、CCXTを使った仮想通貨の自動取引ボットのリスク管理とバックテストについて解説しました。今回は仮...

CCXTを使って仮想通貨のトレードをしてみる(第3回)
データ分析

CCXTを使って仮想通貨のトレードをしてみる(第3回)

はじめに 第2回/useccxtpython2では、CCXTを使った基本的な取引の流れと、基本的な取引戦略について解説しました。今回の第3回ではその続きとして、トレーディングボット開発において非常に重要なリスク管理とバックテストについて詳しく解説します。これらの要素は、長期的な成功を収めるために欠か...

コメント

0/2000