人渣反派自救系統[分冊版59] 人渣反派自救系統[分冊版] (Pleiades Press)
¥99 (2025-07-17 16:40 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)【Windows11 Pro】 DSPライセンス日本語版(インストールUSB付)
¥10,980 (2025-07-17 16:40 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)DocuWorks 10 アップグレード ライセンス認証版 基本パッケージ / 1ライセンス
¥16,000 (2025-07-17 16:40 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)前回の記事で非同期処理の基本を学びました。今回は、実際に仮想通貨の価格データを複数の取引所から同時に取得する方法を解説します。CCXTライブラリの非同期版を使って、実用的なプログラムを作っていきましょう。
目次
CCXTとは?
CCXT(CryptoCurrency eXchange Trading Library)は、100以上の仮想通貨取引所に対応した統一的なAPIライブラリです。各取引所のAPIの違いを吸収してくれるので、同じコードで複数の取引所にアクセスできます。
インストール方法
pip install ccxt
同期版と非同期版の違い
CCXTには2つのバージョンがあります:
# 同期版(通常のインポート)
import ccxt
# 非同期版(async_supportを使う)
import ccxt.async_support as ccxt
実際にコードで違いを見てみましょう。
同期版:1つずつ価格を取得(遅い)
import ccxt
import time
def get_btc_price_sync():
"""3つの取引所から順番に価格を取得"""
exchanges = {
'binance': ccxt.binance(),
'coinbase': ccxt.coinbase(),
'kraken': ccxt.kraken()
}
prices = {}
start_time = time.time()
for name, exchange in exchanges.items():
try:
print(f"{name}から価格を取得中...")
ticker = exchange.fetch_ticker('BTC/USDT')
prices[name] = ticker['last']
print(f"{name}: ${ticker['last']:,.2f}")
except Exception as e:
print(f"{name}: エラー - {e}")
prices[name] = None
elapsed_time = time.time() - start_time
print(f"n処理時間: {elapsed_time:.2f}秒")
return prices
# 実行
prices = get_btc_price_sync()
非同期版:同時に価格を取得(速い)
import asyncio
import ccxt.async_support as ccxt
import time
async def get_price_from_exchange(exchange_name, symbol):
"""1つの取引所から価格を取得"""
# 取引所オブジェクトを作成
exchange_class = getattr(ccxt, exchange_name)
exchange = exchange_class()
try:
print(f"{exchange_name}から価格を取得中...")
ticker = await exchange.fetch_ticker(symbol)
price = ticker['last']
print(f"{exchange_name}: ${price:,.2f}")
return exchange_name, price
except Exception as e:
print(f"{exchange_name}: エラー - {e}")
return exchange_name, None
finally:
# 重要:必ず接続を閉じる
await exchange.close()
async def get_btc_price_async():
"""3つの取引所から同時に価格を取得"""
start_time = time.time()
# 同時に実行するタスクを作成
tasks = [
get_price_from_exchange('binance', 'BTC/USDT'),
get_price_from_exchange('coinbase', 'BTC/USD'),
get_price_from_exchange('kraken', 'BTC/USD')
]
# 全てのタスクを同時実行
results = await asyncio.gather(*tasks)
elapsed_time = time.time() - start_time
print(f"n処理時間: {elapsed_time:.2f}秒")
# 結果を辞書に変換
prices = dict(results)
return prices
# 実行
prices = asyncio.run(get_btc_price_async())
初心者向け:コードの詳しい解説
1. 非同期関数の定義
async def get_price_from_exchange(exchange_name, symbol):
async def
で始まる関数は非同期関数です。この中でawait
を使えます。
2. 取引所オブジェクトの作成
exchange_class = getattr(ccxt, exchange_name)
exchange = exchange_class()
getattr
は文字列から属性を取得する関数です。'binance'
という文字列からccxt.binance
を取得しています。
3. 非同期でAPIを呼び出す
ticker = await exchange.fetch_ticker(symbol)
await
をつけることで、APIの応答を待っている間に他の処理を実行できます。
4. 必ず接続を閉じる
finally:
await exchange.close()
finally
ブロックは、エラーが発生しても必ず実行されます。接続を閉じ忘れるとメモリリークの原因になります。
エラーハンドリングを追加した実用版
import asyncio
import ccxt.async_support as ccxt
from datetime import datetime
async def fetch_price_safely(exchange_name, symbol, timeout=10):
"""タイムアウトとリトライ機能付きの価格取得"""
exchange = None
for attempt in range(3): # 最大3回リトライ
try:
# 取引所オブジェクトを作成
exchange_class = getattr(ccxt, exchange_name)
exchange = exchange_class({
'timeout': timeout * 1000, # ミリ秒単位
'enableRateLimit': True # レート制限を有効化
})
# タイムアウト付きで価格を取得
ticker = await asyncio.wait_for(
exchange.fetch_ticker(symbol),
timeout=timeout
)
return {
'exchange': exchange_name,
'symbol': symbol,
'price': ticker['last'],
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'success': True
}
except asyncio.TimeoutError:
print(f"{exchange_name}: タイムアウト (試行 {attempt + 1}/3)")
except Exception as e:
print(f"{exchange_name}: エラー - {e} (試行 {attempt + 1}/3)")
finally:
if exchange:
await exchange.close()
# リトライ前に少し待つ
if attempt < 2:
await asyncio.sleep(1)
# 全ての試行が失敗
return {
'exchange': exchange_name,
'symbol': symbol,
'price': None,
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'success': False
}
async def compare_prices(symbol='BTC/USDT'):
"""複数取引所の価格を比較"""
exchanges = ['binance', 'bybit', 'okx', 'kucoin', 'gateio']
print(f"n{symbol}の価格を取得中...n")
# 全ての取引所から同時に価格を取得
tasks = [
fetch_price_safely(exchange, symbol)
for exchange in exchanges
]
results = await asyncio.gather(*tasks)
# 成功した結果のみフィルタリング
valid_results = [r for r in results if r['success']]
if not valid_results:
print("価格を取得できませんでした")
return
# 価格でソート
valid_results.sort(key=lambda x: x['price'])
print(f"n{'取引所':<10} {'価格':>12} {'時刻'}")
print("-" * 40)
for result in valid_results:
print(f"{result['exchange']:<10} "
f"${result['price']:>11,.2f} "
f"{result['timestamp']}")
# 最安値と最高値の差を計算
min_price = valid_results[0]['price']
max_price = valid_results[-1]['price']
difference = max_price - min_price
percentage = (difference / min_price) * 100
print(f"n価格差: ${difference:,.2f} ({percentage:.2f}%)")
print(f"最安: {valid_results[0]['exchange']} (${min_price:,.2f})")
print(f"最高: {valid_results[-1]['exchange']} (${max_price:,.2f})")
# 実行
asyncio.run(compare_prices())
パフォーマンス比較:実測結果
実際にどれくらい速くなるか測定してみましょう:
import asyncio
import time
import ccxt
import ccxt.async_support as ccxt_async
async def performance_test():
"""同期と非同期のパフォーマンス比較"""
exchanges = ['binance', 'coinbase', 'kraken']
symbol = 'BTC/USD'
# 同期版の測定
print("同期版テスト中...")
sync_start = time.time()
for exchange_name in exchanges:
try:
exchange = getattr(ccxt, exchange_name)()
exchange.fetch_ticker(symbol)
except:
pass
sync_time = time.time() - sync_start
# 非同期版の測定
print("非同期版テスト中...")
async_start = time.time()
tasks = []
for exchange_name in exchanges:
exchange = getattr(ccxt_async, exchange_name)()
task = exchange.fetch_ticker(symbol)
tasks.append((exchange, task))
results = await asyncio.gather(
*[task for _, task in tasks],
return_exceptions=True
)
# 接続を閉じる
for exchange, _ in tasks:
await exchange.close()
async_time = time.time() - async_start
print(f"n結果:")
print(f"同期版: {sync_time:.2f}秒")
print(f"非同期版: {async_time:.2f}秒")
print(f"速度向上: {sync_time / async_time:.1f}倍")
# 実行
asyncio.run(performance_test())
よくあるトラブルと解決法
1. 「Session is closed」エラー
# ❌ 間違い:closeを忘れている
async def bad_example():
exchange = ccxt.binance()
ticker = await exchange.fetch_ticker('BTC/USDT')
# await exchange.close() を忘れている!
# ✅ 正解:必ずcloseする
async def good_example():
exchange = ccxt.binance()
try:
ticker = await exchange.fetch_ticker('BTC/USDT')
finally:
await exchange.close()
2. レート制限エラー
# ✅ レート制限を有効にする
exchange = ccxt.binance({
'enableRateLimit': True, # 自動的にリクエスト間隔を調整
'rateLimit': 1200 # ミリ秒単位(1.2秒)
})
まとめ
非同期処理を使うことで:
- 処理時間を大幅に短縮(3倍以上高速化)
- 複数の取引所を効率的に監視
- リアルタイムアプリケーションの構築が可能
ただし、以下の点に注意:
- 必ず
await exchange.close()
で接続を閉じる - エラーハンドリングを適切に行う
- レート制限を守る
次回の第3部では、この知識を使って実用的な価格監視ツールを作成します。
練習問題
以下の機能を追加してみましょう:
- 指定した閾値を超えたらアラートを出す
- 価格データをCSVファイルに保存する
- 複数の通貨ペアを同時に監視する
次回予告
第3部:実用的な価格監視ツール作成では、リアルタイムで価格を監視し、条件に応じてアラートを出すツールを作成します。