Webスクレイピング入門:株価・仮想通貨価格を取得してみよう

Webスクレイピング入門:株価・仮想通貨価格を取得してみよう

投資やトレードをしていると、リアルタイムの価格情報が欲しくなりますよね。この記事では、Pythonを使って株価や仮想通貨価格を自動取得する方法を、初心者でもわかりやすく解説します。

Webスクレイピングとは?

基本概念

Webスクレイピングとは、Webサイトから自動的にデータを抽出する技術です。人間がブラウザで行うデータ収集作業を、プログラムが代わりに実行します。

スクレイピングの方法

  1. HTMLの直接解析: BeautifulSoupなどでHTMLを解析
  2. API利用: 公式APIが提供されている場合
  3. ブラウザ自動化: Seleniumでブラウザを操作

注意事項とマナー

# スクレイピングの基本マナー
scraping_etiquette = {
    "robots.txt": "必ず確認する",
    "アクセス頻度": "適切な間隔を空ける(1秒以上推奨)",
    "利用規約": "サイトの利用規約を遵守",
    "サーバー負荷": "過度なアクセスは避ける",
    "個人情報": "個人情報は取得しない",
    "商用利用": "利用目的を明確にする"
}

for key, value in scraping_etiquette.items():
    print(f"{key}: {value}")

環境構築

必要なライブラリのインストール

# 基本ライブラリ
pip install requests beautifulsoup4 lxml

# データ分析用
pip install pandas numpy matplotlib

# 仮想通貨API用
pip install ccxt

# より高度なスクレイピング用(必要に応じて)
pip install selenium webdriver-manager

基本的なインポート

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import json
from datetime import datetime, timedelta
import matplotlib.pyplot as plt

# 警告を非表示
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

株価データの取得

1. Yahoo Finance から株価を取得

import requests
from bs4 import BeautifulSoup
import pandas as pd
import time

class StockPriceScraper:
    """株価データ取得クラス"""

    def __init__(self):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })

    def get_yahoo_finance_stock(self, symbol):
        """Yahoo Finance から株価を取得

        Args:
            symbol (str): 株式シンボル(例: "7203.T" for トヨタ)

        Returns:
            dict: 株価情報
        """
        try:
            url = f"https://finance.yahoo.com/quote/{symbol}"
            response = self.session.get(url)
            response.raise_for_status()

            soup = BeautifulSoup(response.content, 'html.parser')

            # 現在価格を取得
            price_element = soup.find('fin-streamer', {'data-field': 'regularMarketPrice'})
            current_price = price_element.text if price_element else "N/A"

            # 変動額を取得
            change_element = soup.find('fin-streamer', {'data-field': 'regularMarketChange'})
            change = change_element.text if change_element else "N/A"

            # 変動率を取得
            change_percent_element = soup.find('fin-streamer', {'data-field': 'regularMarketChangePercent'})
            change_percent = change_percent_element.text if change_percent_element else "N/A"

            # 企業名を取得
            title_element = soup.find('h1', {'data-reactid': '7'})
            company_name = title_element.text.split('(')[0].strip() if title_element else symbol

            return {
                'symbol': symbol,
                'company_name': company_name,
                'current_price': current_price,
                'change': change,
                'change_percent': change_percent,
                'timestamp': datetime.now(),
                'source': 'Yahoo Finance'
            }

        except Exception as e:
            print(f"エラー: {symbol} の取得に失敗 - {e}")
            return None

    def get_multiple_stocks(self, symbols, delay=1):
        """複数の株式データを取得

        Args:
            symbols (list): 株式シンボルのリスト
            delay (int): リクエスト間隔(秒)

        Returns:
            list: 株価データのリスト
        """
        stocks_data = []

        for symbol in symbols:
            print(f"取得中: {symbol}")
            stock_data = self.get_yahoo_finance_stock(symbol)

            if stock_data:
                stocks_data.append(stock_data)
                print(f"  ✓ {stock_data['company_name']}: {stock_data['current_price']}")
            else:
                print(f"  ✗ {symbol}: 取得失敗")

            time.sleep(delay)  # サーバー負荷軽減

        return stocks_data

# 使用例
def demo_stock_scraping():
    """株価スクレイピングのデモ"""

    scraper = StockPriceScraper()

    # 人気の日本株式シンボル
    japanese_stocks = [
        "7203.T",   # トヨタ自動車
        "9984.T",   # ソフトバンクグループ
        "6758.T",   # ソニーグループ
        "9432.T",   # NTT
        "8306.T"    # 三菱UFJフィナンシャル・グループ
    ]

    # 米国株式シンボル
    us_stocks = [
        "AAPL",     # Apple
        "GOOGL",    # Alphabet (Google)
        "MSFT",     # Microsoft
        "TSLA",     # Tesla
        "AMZN"      # Amazon
    ]

    print("=== 日本株価取得 ===")
    jp_data = scraper.get_multiple_stocks(japanese_stocks)

    print("n=== 米国株価取得 ===")
    us_data = scraper.get_multiple_stocks(us_stocks)

    # データフレームに変換
    all_data = jp_data + us_data
    if all_data:
        df = pd.DataFrame(all_data)
        print("n=== 取得結果 ===")
        print(df[['symbol', 'company_name', 'current_price', 'change_percent']])

        # CSVファイルに保存
        filename = f"stock_prices_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        print(f"nデータを保存しました: {filename}")

    return all_data

# デモ実行
stock_data = demo_stock_scraping()

2. Alpha Vantage API を使用した高精度な株価取得

import requests
import pandas as pd
from datetime import datetime

class AlphaVantageAPI:
    """Alpha Vantage API クライアント"""

    def __init__(self, api_key=None):
        self.api_key = api_key or "demo"  # 無料API Key要取得
        self.base_url = "https://www.alphavantage.co/query"

    def get_daily_stock_data(self, symbol, outputsize="compact"):
        """日次株価データを取得

        Args:
            symbol (str): 株式シンボル
            outputsize (str): "compact" (最新100日) or "full" (全期間)

        Returns:
            pd.DataFrame: 株価データ
        """
        params = {
            'function': 'TIME_SERIES_DAILY',
            'symbol': symbol,
            'outputsize': outputsize,
            'apikey': self.api_key
        }

        try:
            response = requests.get(self.base_url, params=params)
            data = response.json()

            if 'Time Series (Daily)' in data:
                time_series = data['Time Series (Daily)']

                # DataFrameに変換
                df = pd.DataFrame.from_dict(time_series, orient='index')
                df.index = pd.to_datetime(df.index)
                df = df.sort_index()

                # 列名を変更
                df.columns = ['Open', 'High', 'Low', 'Close', 'Volume']
                df = df.astype(float)

                # 追加情報
                df['Symbol'] = symbol

                return df
            else:
                print(f"データ取得失敗: {data.get('Error Message', 'Unknown error')}")
                return None

        except Exception as e:
            print(f"API リクエストエラー: {e}")
            return None

    def get_intraday_data(self, symbol, interval="5min"):
        """分足データを取得

        Args:
            symbol (str): 株式シンボル
            interval (str): "1min", "5min", "15min", "30min", "60min"

        Returns:
            pd.DataFrame: 分足データ
        """
        params = {
            'function': 'TIME_SERIES_INTRADAY',
            'symbol': symbol,
            'interval': interval,
            'apikey': self.api_key
        }

        try:
            response = requests.get(self.base_url, params=params)
            data = response.json()

            time_series_key = f'Time Series ({interval})'

            if time_series_key in data:
                time_series = data[time_series_key]

                df = pd.DataFrame.from_dict(time_series, orient='index')
                df.index = pd.to_datetime(df.index)
                df = df.sort_index()

                df.columns = ['Open', 'High', 'Low', 'Close', 'Volume']
                df = df.astype(float)
                df['Symbol'] = symbol

                return df
            else:
                print(f"分足データ取得失敗: {data}")
                return None

        except Exception as e:
            print(f"分足データ取得エラー: {e}")
            return None

# 使用例
def demo_alphavantage_api():
    """Alpha Vantage API のデモ"""

    api = AlphaVantageAPI()  # 本番では API Key を設定

    # 日次データ取得
    print("日次データ取得中...")
    daily_data = api.get_daily_stock_data("AAPL")

    if daily_data is not None:
        print("最新5日の株価:")
        print(daily_data.tail())

        # 簡単な分析
        latest_price = daily_data['Close'].iloc[-1]
        prev_price = daily_data['Close'].iloc[-2]
        change_percent = ((latest_price - prev_price) / prev_price) * 100

        print(f"n現在価格: ${latest_price:.2f}")
        print(f"前日比: {change_percent:+.2f}%")

    return daily_data

# API Key が必要なため、デモはコメントアウト
# alphavantage_data = demo_alphavantage_api()

仮想通貨価格の取得

1. CoinGecko API を使用した仮想通貨価格取得

import requests
import pandas as pd
from datetime import datetime
import time
class CryptoPriceScraper:
"""仮想通貨価格取得クラス"""
def __init__(self):
self.base_url = "https://api.coingecko.com/api/v3"
self.session = requests.Session()
def get_current_prices(self, coin_ids, vs_currencies="jpy,usd"):
"""現在の仮想通貨価格を取得
Args:
coin_ids (list): コインIDのリスト
vs_currencies (str): 比較通貨("jpy,usd"など)
Returns:
dict: 価格データ
"""
try:
ids_str = ",".join(coin_ids)
url = f"{self.base_url}/simple/price"
params = {
'ids': ids_str,
'vs_currencies': vs_currencies,
'include_24hr_change': 'true',
'include_24hr_vol': 'true',
'include_last_updated_at': 'true'
}
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
# データを整形
formatted_data = []
for coin_id, coin_data in data.items():
for currency in vs_currencies.split(','):
price_key = currency
change_key = f"{currency}_24h_change"
vol_key = f"{currency}_24h_vol"
if price_key in coin_data:
formatted_data.append({
'coin_id': coin_id,
'currency': currency.upper(),
'price': coin_data[price_key],
'change_24h': coin_data.get(change_key, 0),
'volume_24h': coin_data.get(vol_key, 0),
'last_updated': datetime.fromtimestamp(coin_data.get('last_updated_at', 0)),
'timestamp': datetime.now()
})
return formatted_data
except Exception as e:
print(f"価格取得エラー: {e}")
return []
def get_market_data(self, coin_id):
"""詳細な市場データを取得
Args:
coin_id (str): コインID
Returns:
dict: 市場データ
"""
try:
url = f"{self.base_url}/coins/{coin_id}"
params = {
'localization': 'false',
'tickers': 'false',
'community_data': 'false',
'developer_data': 'false'
}
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
market_data = data.get('market_data', {})
return {
'coin_id': coin_id,
'name': data.get('name'),
'symbol': data.get('symbol', '').upper(),
'current_price_jpy': market_data.get('current_price', {}).get('jpy'),
'current_price_usd': market_data.get('current_price', {}).get('usd'),
'market_cap_jpy': market_data.get('market_cap', {}).get('jpy'),
'market_cap_usd': market_data.get('market_cap', {}).get('usd'),
'market_cap_rank': market_data.get('market_cap_rank'),
'total_volume_jpy': market_data.get('total_volume', {}).get('jpy'),
'price_change_24h': market_data.get('price_change_24h'),
'price_change_percentage_24h': market_data.get('price_change_percentage_24h'),
'circulating_supply': market_data.get('circulating_supply'),
'total_supply': market_data.get('total_supply'),
'ath_jpy': market_data.get('ath', {}).get('jpy'),
'ath_date': market_data.get('ath_date', {}).get('jpy'),
'atl_jpy': market_data.get('atl', {}).get('jpy'),
'atl_date': market_data.get('atl_date', {}).get('jpy'),
'last_updated': market_data.get('last_updated')
}
except Exception as e:
print(f"市場データ取得エラー: {e}")
return None
def get_historical_data(self, coin_id, days=30, vs_currency="jpy"):
"""過去の価格データを取得
Args:
coin_id (str): コインID
days (int): 取得日数
vs_currency (str): 比較通貨
Returns:
pd.DataFrame: 価格履歴データ
"""
try:
url = f"{self.base_url}/coins/{coin_id}/market_chart"
params = {
'vs_currency': vs_currency,
'days': days,
'interval': 'daily' if days > 90 else 'hourly'
}
response = self.session.get(url, params=params)
response.raise_for_status()
data = response.json()
# データを DataFrame に変換
prices = data.get('prices', [])
volumes = data.get('total_volumes', [])
market_caps = data.get('market_caps', [])
df_data = []
for i, (timestamp, price) in enumerate(prices):
row = {
'timestamp': datetime.fromtimestamp(timestamp / 1000),
'price': price,
'volume': volumes[i][1] if i < len(volumes) else 0,
'market_cap': market_caps[i][1] if i < len(market_caps) else 0
}
df_data.append(row)
df = pd.DataFrame(df_data)
df.set_index('timestamp', inplace=True)
return df
except Exception as e:
print(f"履歴データ取得エラー: {e}")
return None
# 使用例
def demo_crypto_scraping():
"""仮想通貨価格取得のデモ"""
scraper = CryptoPriceScraper()
# 人気の仮想通貨
popular_coins = [
'bitcoin',
'ethereum',
'binancecoin',
'cardano',
'solana',
'polkadot',
'dogecoin',
'shiba-inu'
]
print("=== 仮想通貨価格取得 ===")
# 現在価格を取得
current_prices = scraper.get_current_prices(popular_coins)
if current_prices:
df_prices = pd.DataFrame(current_prices)
# JPY価格のみ表示
jpy_prices = df_prices[df_prices['currency'] == 'JPY']
print("現在の価格(円):")
for _, row in jpy_prices.iterrows():
coin_name = row['coin_id'].replace('-', ' ').title()
price = f"{row['price']:,.0f}" if row['price'] > 1 else f"{row['price']:.6f}"
change = row['change_24h']
change_symbol = "📈" if change > 0 else "📉" if change < 0 else "➡️"
print(f"  {coin_name}: ¥{price} ({change:+.2f}%) {change_symbol}")
# データを保存
filename = f"crypto_prices_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
df_prices.to_csv(filename, index=False, encoding='utf-8-sig')
print(f"nデータを保存しました: {filename}")
# ビットコインの詳細データ
print("n=== ビットコイン詳細情報 ===")
btc_market_data = scraper.get_market_data('bitcoin')
if btc_market_data:
print(f"名前: {btc_market_data['name']} ({btc_market_data['symbol']})")
print(f"現在価格: ¥{btc_market_data['current_price_jpy']:,.0f}")
print(f"時価総額ランキング: {btc_market_data['market_cap_rank']}位")
print(f"24時間変動: {btc_market_data['price_change_percentage_24h']:+.2f}%")
print(f"過去最高値: ¥{btc_market_data['ath_jpy']:,.0f}")
# 過去30日の価格推移
print("n=== ビットコイン価格推移(過去30日)===")
btc_history = scraper.get_historical_data('bitcoin', days=30)
if btc_history is not None:
print("最新5日の価格:")
print(btc_history.tail())
# 簡単な統計
max_price = btc_history['price'].max()
min_price = btc_history['price'].min()
avg_price = btc_history['price'].mean()
print(f"n30日間の統計:")
print(f"最高値: ¥{max_price:,.0f}")
print(f"最安値: ¥{min_price:,.0f}")
print(f"平均値: ¥{avg_price:,.0f}")
return current_prices, btc_market_data, btc_history
# デモ実行
crypto_data = demo_crypto_scraping()

2. CCXT ライブラリを使用した取引所データ取得

import ccxt
import pandas as pd
from datetime import datetime
import time
class ExchangeDataCollector:
"""取引所データ収集クラス"""
def __init__(self):
self.exchanges = {
'binance': ccxt.binance(),
'coincheck': ccxt.coincheck(),
'bitflyer': ccxt.bitflyer()
}
def get_exchange_prices(self, symbol='BTC/JPY'):
"""複数取引所の価格を比較
Args:
symbol (str): 取引ペア
Returns:
list: 取引所別価格データ
"""
prices_data = []
for exchange_name, exchange in self.exchanges.items():
try:
print(f"{exchange_name} から {symbol} の価格を取得中...")
# Tickerデータを取得
ticker = exchange.fetch_ticker(symbol)
price_data = {
'exchange': exchange_name,
'symbol': symbol,
'last_price': ticker['last'],
'bid': ticker['bid'],
'ask': ticker['ask'],
'spread': ticker['ask'] - ticker['bid'] if ticker['ask'] and ticker['bid'] else 0,
'spread_percent': ((ticker['ask'] - ticker['bid']) / ticker['last']) * 100 if ticker['ask'] and ticker['bid'] and ticker['last'] else 0,
'volume': ticker['baseVolume'],
'high_24h': ticker['high'],
'low_24h': ticker['low'],
'change_24h': ticker['change'],
'change_percent_24h': ticker['percentage'],
'timestamp': datetime.fromtimestamp(ticker['timestamp'] / 1000) if ticker['timestamp'] else datetime.now()
}
prices_data.append(price_data)
print(f"  ✓ {exchange_name}: ¥{price_data['last_price']:,.0f}")
time.sleep(0.5)  # レート制限対策
except Exception as e:
print(f"  ✗ {exchange_name} エラー: {e}")
return prices_data
def get_orderbook(self, exchange_name, symbol='BTC/JPY', limit=10):
"""板情報を取得
Args:
exchange_name (str): 取引所名
symbol (str): 取引ペア
limit (int): 取得する板の深さ
Returns:
dict: 板情報
"""
try:
exchange = self.exchanges[exchange_name]
orderbook = exchange.fetch_order_book(symbol, limit)
return {
'exchange': exchange_name,
'symbol': symbol,
'bids': orderbook['bids'][:limit],  # 買い注文
'asks': orderbook['asks'][:limit],  # 売り注文
'timestamp': datetime.fromtimestamp(orderbook['timestamp'] / 1000) if orderbook['timestamp'] else datetime.now()
}
except Exception as e:
print(f"板情報取得エラー ({exchange_name}): {e}")
return None
def get_ohlcv_data(self, exchange_name, symbol='BTC/JPY', timeframe='1h', limit=100):
"""OHLCV(ローソク足)データを取得
Args:
exchange_name (str): 取引所名
symbol (str): 取引ペア
timeframe (str): 時間軸 ('1m', '5m', '1h', '1d' など)
limit (int): 取得するデータ数
Returns:
pd.DataFrame: OHLCVデータ
"""
try:
exchange = self.exchanges[exchange_name]
# OHLCVデータを取得
ohlcv = exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
# DataFrameに変換
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
# 追加情報
df['exchange'] = exchange_name
df['symbol'] = symbol
df['timeframe'] = timeframe
return df
except Exception as e:
print(f"OHLCVデータ取得エラー ({exchange_name}): {e}")
return None
# 使用例
def demo_exchange_data():
"""取引所データ取得のデモ"""
collector = ExchangeDataCollector()
# 複数取引所の価格比較
print("=== 取引所価格比較 ===")
btc_prices = collector.get_exchange_prices('BTC/JPY')
if btc_prices:
df_prices = pd.DataFrame(btc_prices)
print("n価格比較:")
for _, row in df_prices.iterrows():
print(f"{row['exchange']:>10}: ¥{row['last_price']:>10,.0f} (スプレッド: {row['spread_percent']:.2f}%)")
# 最安値・最高値
min_price_row = df_prices.loc[df_prices['last_price'].idxmin()]
max_price_row = df_prices.loc[df_prices['last_price'].idxmax()]
print(f"n最安値: {min_price_row['exchange']} - ¥{min_price_row['last_price']:,.0f}")
print(f"最高値: {max_price_row['exchange']} - ¥{max_price_row['last_price']:,.0f}")
price_diff = max_price_row['last_price'] - min_price_row['last_price']
arbitrage_opportunity = (price_diff / min_price_row['last_price']) * 100
print(f"価格差: ¥{price_diff:,.0f} ({arbitrage_opportunity:.2f}%)")
# 板情報の取得
print("n=== 板情報 ===")
orderbook = collector.get_orderbook('bitflyer', 'BTC/JPY', limit=5)
if orderbook:
print(f"{orderbook['exchange']} の板情報:")
print("売り注文 (Ask):")
for price, amount in orderbook['asks']:
print(f"  ¥{price:>10,.0f} x {amount:>8.4f} BTC")
print("買い注文 (Bid):")
for price, amount in orderbook['bids']:
print(f"  ¥{price:>10,.0f} x {amount:>8.4f} BTC")
# OHLCVデータの取得
print("n=== ローソク足データ ===")
ohlcv_data = collector.get_ohlcv_data('bitflyer', 'BTC/JPY', '1h', 24)
if ohlcv_data is not None:
print("過去24時間の時間足データ:")
print(ohlcv_data.tail())
# 簡単な分析
highest = ohlcv_data['high'].max()
lowest = ohlcv_data['low'].min()
latest_close = ohlcv_data['close'].iloc[-1]
print(f"n24時間の統計:")
print(f"最高値: ¥{highest:,.0f}")
print(f"最安値: ¥{lowest:,.0f}")
print(f"現在値: ¥{latest_close:,.0f}")
return btc_prices, orderbook, ohlcv_data
# デモ実行
# 注意: 取引所のAPIアクセスが必要
# exchange_data = demo_exchange_data()

データの可視化と分析

1. 価格チャートの作成

import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import pandas as pd
class PriceVisualizer:
"""価格データ可視化クラス"""
def __init__(self):
plt.style.use('seaborn-v0_8')
plt.rcParams['font.family'] = 'DejaVu Sans'
sns.set_palette("husl")
def plot_price_comparison(self, price_data, title="価格比較"):
"""複数取引所の価格比較チャート
Args:
price_data (list): 価格データのリスト
title (str): チャートタイトル
"""
if not price_data:
print("表示するデータがありません")
return
df = pd.DataFrame(price_data)
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
# 価格比較(棒グラフ)
ax1.bar(df['exchange'], df['last_price'], color=sns.color_palette("viridis", len(df)))
ax1.set_title(f'{title} - 現在価格')
ax1.set_ylabel('価格 (JPY)')
ax1.tick_params(axis='x', rotation=45)
# 価格を上に表示
for i, v in enumerate(df['last_price']):
ax1.text(i, v + max(df['last_price']) * 0.01, f'¥{v:,.0f}', 
ha='center', va='bottom', fontweight='bold')
# スプレッド比較
if 'spread_percent' in df.columns:
ax2.bar(df['exchange'], df['spread_percent'], color=sns.color_palette("plasma", len(df)))
ax2.set_title('スプレッド比較')
ax2.set_ylabel('スプレッド (%)')
ax2.tick_params(axis='x', rotation=45)
# スプレッドを上に表示
for i, v in enumerate(df['spread_percent']):
ax2.text(i, v + max(df['spread_percent']) * 0.05, f'{v:.2f}%', 
ha='center', va='bottom', fontweight='bold')
plt.tight_layout()
plt.show()
def plot_price_history(self, df, title="価格推移"):
"""価格履歴チャート
Args:
df (pd.DataFrame): 価格履歴データ
title (str): チャートタイトル
"""
if df is None or df.empty:
print("表示するデータがありません")
return
fig, axes = plt.subplots(2, 2, figsize=(15, 10))
fig.suptitle(title, fontsize=16)
# 価格推移(折れ線グラフ)
axes[0, 0].plot(df.index, df['price'], linewidth=2, color='blue')
axes[0, 0].set_title('価格推移')
axes[0, 0].set_ylabel('価格 (JPY)')
axes[0, 0].grid(True, alpha=0.3)
# 出来高
axes[0, 1].bar(df.index, df['volume'], alpha=0.7, color='green')
axes[0, 1].set_title('出来高')
axes[0, 1].set_ylabel('出来高')
# 価格分布(ヒストグラム)
axes[1, 0].hist(df['price'], bins=30, alpha=0.7, color='orange')
axes[1, 0].set_title('価格分布')
axes[1, 0].set_xlabel('価格 (JPY)')
axes[1, 0].set_ylabel('頻度')
# 移動平均
df['MA7'] = df['price'].rolling(window=7).mean()
df['MA30'] = df['price'].rolling(window=30).mean()
axes[1, 1].plot(df.index, df['price'], label='価格', alpha=0.7)
axes[1, 1].plot(df.index, df['MA7'], label='7日移動平均', linewidth=2)
axes[1, 1].plot(df.index, df['MA30'], label='30日移動平均', linewidth=2)
axes[1, 1].set_title('移動平均線')
axes[1, 1].set_ylabel('価格 (JPY)')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
def plot_candlestick(self, df, title="ローソク足チャート"):
"""ローソク足チャート(簡易版)
Args:
df (pd.DataFrame): OHLCVデータ
title (str): チャートタイトル
"""
if df is None or df.empty:
print("表示するデータがありません")
return
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(15, 10), height_ratios=[3, 1])
# ローソク足(簡易版)
for i, (timestamp, row) in enumerate(df.iterrows()):
color = 'red' if row['close'] >= row['open'] else 'blue'
# 高値・安値の線
ax1.plot([i, i], [row['low'], row['high']], color='black', linewidth=1)
# 始値・終値の箱
box_height = abs(row['close'] - row['open'])
box_bottom = min(row['open'], row['close'])
ax1.bar(i, box_height, bottom=box_bottom, width=0.6, 
color=color, alpha=0.7, edgecolor='black')
ax1.set_title(title)
ax1.set_ylabel('価格 (JPY)')
ax1.grid(True, alpha=0.3)
# 出来高
ax2.bar(range(len(df)), df['volume'], alpha=0.7, color='green')
ax2.set_title('出来高')
ax2.set_ylabel('出来高')
ax2.set_xlabel('時間')
plt.tight_layout()
plt.show()
# 使用例とデモ
def demo_visualization():
"""データ可視化のデモ"""
visualizer = PriceVisualizer()
# サンプルデータの作成
sample_exchange_data = [
{'exchange': 'bitFlyer', 'last_price': 5000000, 'spread_percent': 0.05},
{'exchange': 'Coincheck', 'last_price': 5005000, 'spread_percent': 0.08},
{'exchange': 'Binance', 'last_price': 4998000, 'spread_percent': 0.03}
]
# 価格比較チャート
print("価格比較チャートを表示...")
visualizer.plot_price_comparison(sample_exchange_data, "BTC/JPY 取引所比較")
# 価格履歴のサンプルデータ
dates = pd.date_range(start='2024-01-01', end='2024-01-30', freq='D')
np.random.seed(42)
sample_history = pd.DataFrame({
'price': 5000000 + np.random.randn(30).cumsum() * 50000,
'volume': np.random.randint(100, 1000, 30),
'market_cap': np.random.randint(1000000, 10000000, 30)
}, index=dates)
# 価格履歴チャート
print("価格履歴チャートを表示...")
visualizer.plot_price_history(sample_history, "BTC価格履歴(過去30日)")
# デモ実行
# 注意: matplotlib, seaborn のインストールが必要
# demo_visualization()

自動監視システムの構築

1. 価格アラートシステム

import time
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from datetime import datetime
import json
import threading
class PriceAlertSystem:
"""価格アラートシステム"""
def __init__(self, config_file="alert_config.json"):
self.config_file = config_file
self.config = self.load_config()
self.running = False
self.price_history = {}
def load_config(self):
"""設定ファイルを読み込み"""
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
return json.load(f)
except FileNotFoundError:
# デフォルト設定
default_config = {
"alerts": [
{
"symbol": "BTC/JPY",
"exchange": "bitflyer",
"upper_threshold": 6000000,
"lower_threshold": 4000000,
"enabled": True
}
],
"email": {
"enabled": False,
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "",
"password": "",
"to_email": ""
},
"check_interval": 60
}
self.save_config(default_config)
return default_config
def save_config(self, config):
"""設定ファイルを保存"""
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config, f, indent=2, ensure_ascii=False)
def add_alert(self, symbol, exchange, upper_threshold=None, lower_threshold=None):
"""アラートを追加"""
alert = {
"symbol": symbol,
"exchange": exchange,
"upper_threshold": upper_threshold,
"lower_threshold": lower_threshold,
"enabled": True
}
self.config["alerts"].append(alert)
self.save_config(self.config)
print(f"アラートを追加: {symbol} @ {exchange}")
def send_email_alert(self, subject, message):
"""メールアラートを送信"""
if not self.config["email"]["enabled"]:
return False
try:
smtp_server = self.config["email"]["smtp_server"]
smtp_port = self.config["email"]["smtp_port"]
username = self.config["email"]["username"]
password = self.config["email"]["password"]
to_email = self.config["email"]["to_email"]
msg = MIMEMultipart()
msg['From'] = username
msg['To'] = to_email
msg['Subject'] = subject
msg.attach(MIMEText(message, 'plain', 'utf-8'))
server = smtplib.SMTP(smtp_server, smtp_port)
server.starttls()
server.login(username, password)
text = msg.as_string()
server.sendmail(username, to_email, text)
server.quit()
return True
except Exception as e:
print(f"メール送信エラー: {e}")
return False
def check_alerts(self):
"""アラートをチェック"""
from .crypto_scraper import ExchangeDataCollector  # 前で定義したクラス
collector = ExchangeDataCollector()
for alert in self.config["alerts"]:
if not alert["enabled"]:
continue
try:
# 価格を取得
prices = collector.get_exchange_prices(alert["symbol"])
exchange_price = None
for price_data in prices:
if price_data["exchange"] == alert["exchange"]:
exchange_price = price_data["last_price"]
break
if exchange_price is None:
continue
# 価格履歴に記録
key = f"{alert['exchange']}_{alert['symbol']}"
if key not in self.price_history:
self.price_history[key] = []
self.price_history[key].append({
'timestamp': datetime.now(),
'price': exchange_price
})
# 古いデータを削除(24時間以上前)
cutoff_time = datetime.now() - timedelta(hours=24)
self.price_history[key] = [
entry for entry in self.price_history[key] 
if entry['timestamp'] > cutoff_time
]
# アラート条件をチェック
alert_triggered = False
alert_message = ""
if alert["upper_threshold"] and exchange_price > alert["upper_threshold"]:
alert_triggered = True
alert_message = f"{alert['symbol']} が上限価格を突破しました!n" 
f"現在価格: ¥{exchange_price:,.0f}n" 
f"上限設定: ¥{alert['upper_threshold']:,.0f}n" 
f"取引所: {alert['exchange']}"
elif alert["lower_threshold"] and exchange_price < alert["lower_threshold"]:
alert_triggered = True
alert_message = f"{alert['symbol']} が下限価格を下回りました!n" 
f"現在価格: ¥{exchange_price:,.0f}n" 
f"下限設定: ¥{alert['lower_threshold']:,.0f}n" 
f"取引所: {alert['exchange']}"
if alert_triggered:
print(f"🚨 アラート発生: {alert_message}")
# メールアラート送信
subject = f"価格アラート: {alert['symbol']}"
self.send_email_alert(subject, alert_message)
except Exception as e:
print(f"アラートチェックエラー: {e}")
def start_monitoring(self):
"""監視を開始"""
self.running = True
print("価格監視を開始しました...")
print(f"チェック間隔: {self.config['check_interval']} 秒")
while self.running:
try:
self.check_alerts()
time.sleep(self.config["check_interval"])
except KeyboardInterrupt:
break
except Exception as e:
print(f"監視エラー: {e}")
time.sleep(5)  # エラー時は5秒待機
def stop_monitoring(self):
"""監視を停止"""
self.running = False
print("価格監視を停止しました")
# 使用例
def demo_alert_system():
"""アラートシステムのデモ"""
alert_system = PriceAlertSystem()
# アラートを追加
alert_system.add_alert("BTC/JPY", "bitflyer", 
upper_threshold=6000000, 
lower_threshold=4000000)
print("アラートシステムのデモ(10回チェック)")
# 10回だけチェック
for i in range(10):
print(f"チェック {i+1}/10")
alert_system.check_alerts()
time.sleep(5)
print("デモ完了")
# デモ実行
# demo_alert_system()

スクレイピングのベストプラクティス

1. エラーハンドリングとリトライ機能

import requests
from time import sleep
import random
from functools import wraps
def retry_on_failure(max_retries=3, delay=1, backoff=2):
"""リトライデコレータ"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
while retries < max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
retries += 1
if retries == max_retries:
print(f"最大リトライ回数に達しました: {e}")
raise
wait_time = delay * (backoff ** (retries - 1))
jitter = random.uniform(0.1, 0.5)  # ランダムな遅延を追加
sleep(wait_time + jitter)
print(f"リトライ {retries}/{max_retries} (待機: {wait_time:.1f}秒)")
return None
return wrapper
return decorator
class RobustScraper:
"""堅牢なスクレイピングクラス"""
def __init__(self):
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
})
@retry_on_failure(max_retries=3, delay=2, backoff=2)
def safe_request(self, url, **kwargs):
"""安全なHTTPリクエスト"""
response = self.session.get(url, timeout=10, **kwargs)
response.raise_for_status()
return response
def scrape_with_rate_limit(self, urls, delay_range=(1, 3)):
"""レート制限付きスクレイピング"""
results = []
for i, url in enumerate(urls):
try:
print(f"処理中 ({i+1}/{len(urls)}): {url}")
response = self.safe_request(url)
results.append({
'url': url,
'status': 'success',
'content': response.text,
'timestamp': datetime.now()
})
# ランダムな遅延
if i < len(urls) - 1:  # 最後以外
delay = random.uniform(*delay_range)
print(f"  待機中: {delay:.1f}秒")
sleep(delay)
except Exception as e:
results.append({
'url': url,
'status': 'error',
'error': str(e),
'timestamp': datetime.now()
})
print(f"  エラー: {e}")
return results

2. データの永続化と管理

import sqlite3
import json
from datetime import datetime
class PriceDataManager:
"""価格データ管理クラス"""
def __init__(self, db_file="price_data.db"):
self.db_file = db_file
self.init_database()
def init_database(self):
"""データベースを初期化"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
# 価格データテーブル
cursor.execute('''
CREATE TABLE IF NOT EXISTS price_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
exchange TEXT NOT NULL,
price REAL NOT NULL,
volume REAL,
timestamp DATETIME NOT NULL,
source TEXT,
raw_data TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
''')
# インデックス作成
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_symbol_exchange_timestamp 
ON price_data(symbol, exchange, timestamp)
''')
conn.commit()
conn.close()
def save_price_data(self, data_list):
"""価格データを保存"""
conn = sqlite3.connect(self.db_file)
cursor = conn.cursor()
for data in data_list:
cursor.execute('''
INSERT INTO price_data 
(symbol, exchange, price, volume, timestamp, source, raw_data)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
data.get('symbol'),
data.get('exchange'),
data.get('price'),
data.get('volume'),
data.get('timestamp'),
data.get('source'),
json.dumps(data, ensure_ascii=False, default=str)
))
conn.commit()
conn.close()
print(f"{len(data_list)} 件のデータを保存しました")
def get_price_history(self, symbol, exchange=None, hours=24):
"""価格履歴を取得"""
conn = sqlite3.connect(self.db_file)
query = '''
SELECT * FROM price_data 
WHERE symbol = ? 
AND timestamp >= datetime('now', '-{} hours')
'''.format(hours)
params = [symbol]
if exchange:
query += ' AND exchange = ?'
params.append(exchange)
query += ' ORDER BY timestamp'
df = pd.read_sql_query(query, conn, params=params)
conn.close()
return df
def get_latest_prices(self, symbol):
"""最新価格を取得"""
conn = sqlite3.connect(self.db_file)
query = '''
SELECT * FROM price_data 
WHERE symbol = ? 
AND timestamp = (
SELECT MAX(timestamp) FROM price_data 
WHERE symbol = ? AND exchange = price_data.exchange
)
'''
df = pd.read_sql_query(query, conn, params=[symbol, symbol])
conn.close()
return df
# 使用例
def demo_data_management():
"""データ管理のデモ"""
manager = PriceDataManager()
# サンプルデータ
sample_data = [
{
'symbol': 'BTC/JPY',
'exchange': 'bitflyer',
'price': 5000000,
'volume': 100,
'timestamp': datetime.now(),
'source': 'demo'
}
]
# データ保存
manager.save_price_data(sample_data)
# 履歴取得
history = manager.get_price_history('BTC/JPY')
print(f"履歴データ: {len(history)} 件")
# 最新価格取得
latest = manager.get_latest_prices('BTC/JPY')
print(f"最新価格: {len(latest)} 件")
# demo_data_management()

まとめと次のステップ

この記事で学んだこと

  1. Webスクレイピングの基本: requests、BeautifulSoupの使用方法
  2. 株価データ取得: Yahoo Finance、Alpha Vantage APIの活用
  3. 仮想通貨価格取得: CoinGecko API、CCXTライブラリの活用
  4. データ可視化: matplotlib、seabornによるチャート作成
  5. 自動監視システム: 価格アラート、メール通知の実装
  6. 堅牢な設計: エラーハンドリング、リトライ機能、データ永続化

スクレイピングの注意点

  • 法的・倫理的配慮: robots.txt、利用規約の遵守
  • サーバー負荷: 適切なアクセス間隔の維持
  • データ品質: エラーハンドリングとデータ検証
  • セキュリティ: APIキーの安全な管理

次のステップ

  1. 機械学習との組み合わせ: 価格予測モデルの構築
  2. リアルタイム処理: WebSocketを使った即時データ取得
  3. クラウド化: AWS、GCPでの自動実行環境構築
  4. 高度な分析: テクニカル分析指標の実装

関連記事

Webスクレイピングは強力なデータ収集手法ですが、責任を持って適切に活用することが重要です。まずは小規模なプロジェクトから始めて、徐々にスキルを向上させていきましょう。

コメントする