Python非同期処理入門 第2部:CCXTで複数取引所から価格を高速取得する方法
前回の記事で非同期処理の基本を学びました。今回は、実際に仮想通貨の価格データを複数の取引所から同時に取得する方法を解説します。CCXTライブラリの非同期版を使って、実用的なプログラムを作っていきましょう。
CCXTとは?
CCXT(CryptoCurrency eXchange Trading Library)は、100以上の仮想通貨取引所に対応した統一的なAPIライブラリです。各取引所のAPIの違いを吸収してくれるので、同じコードで複数の取引所にアクセスできます。
インストール方法
pip install ccxt同期版と非同期版の違い
CCXTには2つのバージョンがあります:
# 同期版(通常のインポート)
import ccxt
# 非同期版(async_supportを使う)
import ccxt.async_support as ccxt実際にコードで違いを見てみましょう。
同期版:1つずつ価格を取得(遅い)
import ccxt
import time
def get_btc_price_sync():
"""3つの取引所から順番に価格を取得"""
exchanges = {
'binance': ccxt.binance(),
'coinbase': ccxt.coinbase(),
'kraken': ccxt.kraken()
}
prices = {}
start_time = time.time()
for name, exchange in exchanges.items():
try:
print(f"{name}から価格を取得中...")
ticker = exchange.fetch_ticker('BTC/USDT')
prices[name] = ticker['last']
print(f"{name}: ${ticker['last']:,.2f}")
except Exception as e:
print(f"{name}: エラー - {e}")
prices[name] = None
elapsed_time = time.time() - start_time
print(f"\n処理時間: {elapsed_time:.2f}秒")
return prices
# 実行
prices = get_btc_price_sync()非同期版:同時に価格を取得(速い)
import asyncio
import ccxt.async_support as ccxt
import time
async def get_price_from_exchange(exchange_name, symbol):
"""1つの取引所から価格を取得"""
# 取引所オブジェクトを作成
exchange_class = getattr(ccxt, exchange_name)
exchange = exchange_class()
try:
print(f"{exchange_name}から価格を取得中...")
ticker = await exchange.fetch_ticker(symbol)
price = ticker['last']
print(f"{exchange_name}: ${price:,.2f}")
return exchange_name, price
except Exception as e:
print(f"{exchange_name}: エラー - {e}")
return exchange_name, None
finally:
# 重要:必ず接続を閉じる
await exchange.close()
async def get_btc_price_async():
"""3つの取引所から同時に価格を取得"""
start_time = time.time()
# 同時に実行するタスクを作成
tasks = [
get_price_from_exchange('binance', 'BTC/USDT'),
get_price_from_exchange('coinbase', 'BTC/USD'),
get_price_from_exchange('kraken', 'BTC/USD')
]
# 全てのタスクを同時実行
results = await asyncio.gather(*tasks)
elapsed_time = time.time() - start_time
print(f"\n処理時間: {elapsed_time:.2f}秒")
# 結果を辞書に変換
prices = dict(results)
return prices
# 実行
prices = asyncio.run(get_btc_price_async())初心者向け:コードの詳しい解説
1. 非同期関数の定義
async def get_price_from_exchange(exchange_name, symbol):async defで始まる関数は非同期関数です。この中でawaitを使えます。
2. 取引所オブジェクトの作成
exchange_class = getattr(ccxt, exchange_name)
exchange = exchange_class()getattrは文字列から属性を取得する関数です。'binance'という文字列からccxt.binanceを取得しています。
3. 非同期でAPIを呼び出す
ticker = await exchange.fetch_ticker(symbol)awaitをつけることで、APIの応答を待っている間に他の処理を実行できます。
4. 必ず接続を閉じる
finally:
await exchange.close()finallyブロックは、エラーが発生しても必ず実行されます。接続を閉じ忘れるとメモリリークの原因になります。
エラーハンドリングを追加した実用版
import asyncio
import ccxt.async_support as ccxt
from datetime import datetime
async def fetch_price_safely(exchange_name, symbol, timeout=10):
"""タイムアウトとリトライ機能付きの価格取得"""
exchange = None
for attempt in range(3): # 最大3回リトライ
try:
# 取引所オブジェクトを作成
exchange_class = getattr(ccxt, exchange_name)
exchange = exchange_class({
'timeout': timeout * 1000, # ミリ秒単位
'enableRateLimit': True # レート制限を有効化
})
# タイムアウト付きで価格を取得
ticker = await asyncio.wait_for(
exchange.fetch_ticker(symbol),
timeout=timeout
)
return {
'exchange': exchange_name,
'symbol': symbol,
'price': ticker['last'],
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'success': True
}
except asyncio.TimeoutError:
print(f"{exchange_name}: タイムアウト (試行 {attempt + 1}/3)")
except Exception as e:
print(f"{exchange_name}: エラー - {e} (試行 {attempt + 1}/3)")
finally:
if exchange:
await exchange.close()
# リトライ前に少し待つ
if attempt < 2:
await asyncio.sleep(1)
# 全ての試行が失敗
return {
'exchange': exchange_name,
'symbol': symbol,
'price': None,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'success': False
}
async def compare_prices(symbol='BTC/USDT'):
"""複数取引所の価格を比較"""
exchanges = ['binance', 'bybit', 'okx', 'kucoin', 'gateio']
print(f"\n{symbol}の価格を取得中...\n")
# 全ての取引所から同時に価格を取得
tasks = [
fetch_price_safely(exchange, symbol)
for exchange in exchanges
]
results = await asyncio.gather(*tasks)
# 成功した結果のみフィルタリング
valid_results = [r for r in results if r['success']]
if not valid_results:
print("価格を取得できませんでした")
return
# 価格でソート
valid_results.sort(key=lambda x: x['price'])
print(f"\n{'取引所':<10} {'価格':>12} {'時刻'}")
print("-" * 40)
for result in valid_results:
print(f"{result['exchange']:<10} "
f"${result['price']:>11,.2f} "
f"{result['timestamp']}")
# 最安値と最高値の差を計算
min_price = valid_results[0]['price']
max_price = valid_results[-1]['price']
difference = max_price - min_price
percentage = (difference / min_price) * 100
print(f"\n価格差: ${difference:,.2f} ({percentage:.2f}%)")
print(f"最安: {valid_results[0]['exchange']} (${min_price:,.2f})")
print(f"最高: {valid_results[-1]['exchange']} (${max_price:,.2f})")
# 実行
asyncio.run(compare_prices())パフォーマンス比較:実測結果
実際にどれくらい速くなるか測定してみましょう:
import asyncio
import time
import ccxt
import ccxt.async_support as ccxt_async
async def performance_test():
"""同期と非同期のパフォーマンス比較"""
exchanges = ['binance', 'coinbase', 'kraken']
symbol = 'BTC/USD'
# 同期版の測定
print("同期版テスト中...")
sync_start = time.time()
for exchange_name in exchanges:
try:
exchange = getattr(ccxt, exchange_name)()
exchange.fetch_ticker(symbol)
except:
pass
sync_time = time.time() - sync_start
# 非同期版の測定
print("非同期版テスト中...")
async_start = time.time()
tasks = []
for exchange_name in exchanges:
exchange = getattr(ccxt_async, exchange_name)()
task = exchange.fetch_ticker(symbol)
tasks.append((exchange, task))
results = await asyncio.gather(
*[task for _, task in tasks],
return_exceptions=True
)
# 接続を閉じる
for exchange, _ in tasks:
await exchange.close()
async_time = time.time() - async_start
print(f"\n結果:")
print(f"同期版: {sync_time:.2f}秒")
print(f"非同期版: {async_time:.2f}秒")
print(f"速度向上: {sync_time / async_time:.1f}倍")
# 実行
asyncio.run(performance_test())よくあるトラブルと解決法
1. 「Session is closed」エラー
# ❌ 間違い:closeを忘れている
async def bad_example():
exchange = ccxt.binance()
ticker = await exchange.fetch_ticker('BTC/USDT')
# await exchange.close() を忘れている!
# ✅ 正解:必ずcloseする
async def good_example():
exchange = ccxt.binance()
try:
ticker = await exchange.fetch_ticker('BTC/USDT')
finally:
await exchange.close()2. レート制限エラー
# ✅ レート制限を有効にする
exchange = ccxt.binance({
'enableRateLimit': True, # 自動的にリクエスト間隔を調整
'rateLimit': 1200 # ミリ秒単位(1.2秒)
})まとめ
非同期処理を使うことで:
- 処理時間を大幅に短縮(3倍以上高速化)
- 複数の取引所を効率的に監視
- リアルタイムアプリケーションの構築が可能
ただし、以下の点に注意:
- 必ず
await exchange.close()で接続を閉じる - エラーハンドリングを適切に行う
- レート制限を守る
次回の第3部では、この知識を使って実用的な価格監視ツールを作成します。
練習問題
以下の機能を追加してみましょう:
- 指定した閾値を超えたらアラートを出す
- 価格データをCSVファイルに保存する
- 複数の通貨ペアを同時に監視する
次回予告
第3部:実用的な価格監視ツール作成では、リアルタイムで価格を監視し、条件に応じてアラートを出すツールを作成します。
関連記事
Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築
これまでの記事で価格表示とチャート分析ができるようになりました。今回は最終回として、自分の保有している仮想通貨を管理するポートフォリオ機能と、価格変動を知らせるアラート機能を追加します。これで本格的な投資管理ツールの完成です! ポートフォリオ管理とは?なぜ重要なのか ...
Streamlit入門 第2部:美しいチャートと可視化で仮想通貨を分析しよう
前回の記事で基本的な価格表示ができるようになりました。今回は、プロのトレーダーが使うような美しいチャートを追加して、本格的な分析ツールに仕上げていきます。難しそうに見えますが、実は数行のコードで驚くほど高機能なチャートが作れるんです! Plotlyとは?なぜグラフライブラリの中で最強なのか ...
Streamlit入門 第1部:初心者でも30分で作れる仮想通貨価格表示アプリ
「PythonでWebアプリを作りたいけど、HTMLやJavaScriptは難しそう...」と思っていませんか?Streamlitなら、Pythonだけで簡単にWebアプリが作れます!今回は、仮想通貨の価格を表示するアプリを作りながら、Streamlitの基本を学びましょう。 ...
Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理
はじめに シリーズ最終回となる第3部では、実際のプロジェクトで使える実践的なスクレイピングシステムの構築について解説します。検出回避技術、データの永続化、監視システムの構築など、プロダクション環境で必要となる高度な技術を学びます。 また、法的・倫理的配慮についても詳しく説明し、責任あるスクレイピング...