Python非同期処理入門 第2部:CCXTで複数取引所から価格を高速取得する方法

前回の記事で非同期処理の基本を学びました。今回は、実際に仮想通貨の価格データを複数の取引所から同時に取得する方法を解説します。CCXTライブラリの非同期版を使って、実用的なプログラムを作っていきましょう。

CCXTとは?

CCXT(CryptoCurrency eXchange Trading Library)は、100以上の仮想通貨取引所に対応した統一的なAPIライブラリです。各取引所のAPIの違いを吸収してくれるので、同じコードで複数の取引所にアクセスできます。

インストール方法

pip install ccxt

同期版と非同期版の違い

CCXTには2つのバージョンがあります:

# 同期版(通常のインポート)
import ccxt

# 非同期版(async_supportを使う)
import ccxt.async_support as ccxt

実際にコードで違いを見てみましょう。

同期版:1つずつ価格を取得(遅い)

import ccxt
import time

def get_btc_price_sync():
    """3つの取引所から順番に価格を取得"""
    exchanges = {
        'binance': ccxt.binance(),
        'coinbase': ccxt.coinbase(),
        'kraken': ccxt.kraken()
    }

    prices = {}
    start_time = time.time()

    for name, exchange in exchanges.items():
        try:
            print(f"{name}から価格を取得中...")
            ticker = exchange.fetch_ticker('BTC/USDT')
            prices[name] = ticker['last']
            print(f"{name}: ${ticker['last']:,.2f}")
        except Exception as e:
            print(f"{name}: エラー - {e}")
            prices[name] = None

    elapsed_time = time.time() - start_time
    print(f"n処理時間: {elapsed_time:.2f}秒")

    return prices

# 実行
prices = get_btc_price_sync()

非同期版:同時に価格を取得(速い)

import asyncio
import ccxt.async_support as ccxt
import time

async def get_price_from_exchange(exchange_name, symbol):
    """1つの取引所から価格を取得"""
    # 取引所オブジェクトを作成
    exchange_class = getattr(ccxt, exchange_name)
    exchange = exchange_class()

    try:
        print(f"{exchange_name}から価格を取得中...")
        ticker = await exchange.fetch_ticker(symbol)
        price = ticker['last']
        print(f"{exchange_name}: ${price:,.2f}")
        return exchange_name, price
    except Exception as e:
        print(f"{exchange_name}: エラー - {e}")
        return exchange_name, None
    finally:
        # 重要:必ず接続を閉じる
        await exchange.close()

async def get_btc_price_async():
    """3つの取引所から同時に価格を取得"""
    start_time = time.time()

    # 同時に実行するタスクを作成
    tasks = [
        get_price_from_exchange('binance', 'BTC/USDT'),
        get_price_from_exchange('coinbase', 'BTC/USD'),
        get_price_from_exchange('kraken', 'BTC/USD')
    ]

    # 全てのタスクを同時実行
    results = await asyncio.gather(*tasks)

    elapsed_time = time.time() - start_time
    print(f"n処理時間: {elapsed_time:.2f}秒")

    # 結果を辞書に変換
    prices = dict(results)
    return prices

# 実行
prices = asyncio.run(get_btc_price_async())

初心者向け:コードの詳しい解説

1. 非同期関数の定義

async def get_price_from_exchange(exchange_name, symbol):

async defで始まる関数は非同期関数です。この中でawaitを使えます。

2. 取引所オブジェクトの作成

exchange_class = getattr(ccxt, exchange_name)
exchange = exchange_class()

getattrは文字列から属性を取得する関数です。'binance'という文字列からccxt.binanceを取得しています。

3. 非同期でAPIを呼び出す

ticker = await exchange.fetch_ticker(symbol)

awaitをつけることで、APIの応答を待っている間に他の処理を実行できます。

4. 必ず接続を閉じる

finally:
    await exchange.close()

finallyブロックは、エラーが発生しても必ず実行されます。接続を閉じ忘れるとメモリリークの原因になります。

エラーハンドリングを追加した実用版

import asyncio
import ccxt.async_support as ccxt
from datetime import datetime

async def fetch_price_safely(exchange_name, symbol, timeout=10):
    """タイムアウトとリトライ機能付きの価格取得"""
    exchange = None

    for attempt in range(3):  # 最大3回リトライ
        try:
            # 取引所オブジェクトを作成
            exchange_class = getattr(ccxt, exchange_name)
            exchange = exchange_class({
                'timeout': timeout * 1000,  # ミリ秒単位
                'enableRateLimit': True     # レート制限を有効化
            })

            # タイムアウト付きで価格を取得
            ticker = await asyncio.wait_for(
                exchange.fetch_ticker(symbol),
                timeout=timeout
            )

            return {
                'exchange': exchange_name,
                'symbol': symbol,
                'price': ticker['last'],
                'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                'success': True
            }

        except asyncio.TimeoutError:
            print(f"{exchange_name}: タイムアウト (試行 {attempt + 1}/3)")
        except Exception as e:
            print(f"{exchange_name}: エラー - {e} (試行 {attempt + 1}/3)")
        finally:
            if exchange:
                await exchange.close()

        # リトライ前に少し待つ
        if attempt < 2:
            await asyncio.sleep(1)

    # 全ての試行が失敗
    return {
        'exchange': exchange_name,
        'symbol': symbol,
        'price': None,
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'success': False
    }

async def compare_prices(symbol='BTC/USDT'):
    """複数取引所の価格を比較"""
    exchanges = ['binance', 'bybit', 'okx', 'kucoin', 'gateio']

    print(f"n{symbol}の価格を取得中...n")

    # 全ての取引所から同時に価格を取得
    tasks = [
        fetch_price_safely(exchange, symbol)
        for exchange in exchanges
    ]
    results = await asyncio.gather(*tasks)

    # 成功した結果のみフィルタリング
    valid_results = [r for r in results if r['success']]

    if not valid_results:
        print("価格を取得できませんでした")
        return

    # 価格でソート
    valid_results.sort(key=lambda x: x['price'])

    print(f"n{'取引所':<10} {'価格':>12} {'時刻'}")
    print("-" * 40)

    for result in valid_results:
        print(f"{result['exchange']:<10} "
              f"${result['price']:>11,.2f} "
              f"{result['timestamp']}")

    # 最安値と最高値の差を計算
    min_price = valid_results[0]['price']
    max_price = valid_results[-1]['price']
    difference = max_price - min_price
    percentage = (difference / min_price) * 100

    print(f"n価格差: ${difference:,.2f} ({percentage:.2f}%)")
    print(f"最安: {valid_results[0]['exchange']} (${min_price:,.2f})")
    print(f"最高: {valid_results[-1]['exchange']} (${max_price:,.2f})")

# 実行
asyncio.run(compare_prices())

パフォーマンス比較:実測結果

実際にどれくらい速くなるか測定してみましょう:

import asyncio
import time
import ccxt
import ccxt.async_support as ccxt_async

async def performance_test():
    """同期と非同期のパフォーマンス比較"""
    exchanges = ['binance', 'coinbase', 'kraken']
    symbol = 'BTC/USD'

    # 同期版の測定
    print("同期版テスト中...")
    sync_start = time.time()

    for exchange_name in exchanges:
        try:
            exchange = getattr(ccxt, exchange_name)()
            exchange.fetch_ticker(symbol)
        except:
            pass

    sync_time = time.time() - sync_start

    # 非同期版の測定
    print("非同期版テスト中...")
    async_start = time.time()

    tasks = []
    for exchange_name in exchanges:
        exchange = getattr(ccxt_async, exchange_name)()
        task = exchange.fetch_ticker(symbol)
        tasks.append((exchange, task))

    results = await asyncio.gather(
        *[task for _, task in tasks],
        return_exceptions=True
    )

    # 接続を閉じる
    for exchange, _ in tasks:
        await exchange.close()

    async_time = time.time() - async_start

    print(f"n結果:")
    print(f"同期版: {sync_time:.2f}秒")
    print(f"非同期版: {async_time:.2f}秒")
    print(f"速度向上: {sync_time / async_time:.1f}倍")

# 実行
asyncio.run(performance_test())

よくあるトラブルと解決法

1. 「Session is closed」エラー

# ❌ 間違い:closeを忘れている
async def bad_example():
    exchange = ccxt.binance()
    ticker = await exchange.fetch_ticker('BTC/USDT')
    # await exchange.close() を忘れている!

# ✅ 正解:必ずcloseする
async def good_example():
    exchange = ccxt.binance()
    try:
        ticker = await exchange.fetch_ticker('BTC/USDT')
    finally:
        await exchange.close()

2. レート制限エラー

# ✅ レート制限を有効にする
exchange = ccxt.binance({
    'enableRateLimit': True,  # 自動的にリクエスト間隔を調整
    'rateLimit': 1200        # ミリ秒単位(1.2秒)
})

まとめ

非同期処理を使うことで:

  1. 処理時間を大幅に短縮(3倍以上高速化)
  2. 複数の取引所を効率的に監視
  3. リアルタイムアプリケーションの構築が可能

ただし、以下の点に注意:

  • 必ずawait exchange.close()で接続を閉じる
  • エラーハンドリングを適切に行う
  • レート制限を守る

次回の第3部では、この知識を使って実用的な価格監視ツールを作成します。

練習問題

以下の機能を追加してみましょう:

  1. 指定した閾値を超えたらアラートを出す
  2. 価格データをCSVファイルに保存する
  3. 複数の通貨ペアを同時に監視する

次回予告

第3部:実用的な価格監視ツール作成では、リアルタイムで価格を監視し、条件に応じてアラートを出すツールを作成します。

関連記事

コメントする