CCXTとFastAPIを使ったOHLCVデータ取得APIの構築ガイド

はじめに

CCXTで作る仮想通貨自動取引ボットでは、仮想通貨の自動取引ボットを構築しましたが、このようなボットの開発や、取引戦略のバックテスト等を行っていると、取引所へのAPIアクセス制限が気になってきます。
今回はそんな制限を気にせず、思う存分にOHLCVデータを取得できるように、CCXTとFastAPIを使ったOHLCVデータ取得APIの構築方法を解説します。

概要

今回構築するシステムの主な機能:

  • データ取得: CCXTを使用して取引所からリアルタイムでOHLCVデータを取得
  • データ保存: データベース(SQLite)での効率的なデータ管理
  • 自動更新: バックグラウンドタスクによる定期的なデータ更新
  • REST API: FastAPIによる高性能なWebAPI提供
  • 柔軟なリクエスト: 様々なパラメータでのデータ取得に対応

システム構成

使用技術

  • CCXT: 暗号通貨取引所のAPIを統一的に扱うライブラリ
  • FastAPI: 高性能なPython Webフレームワーク
  • SQLite: 軽量なリレーショナルデータベース
  • Pydantic: データバリデーションとシリアライゼーション
  • Uvicorn: 高性能なASGIサーバー

アーキテクチャ図

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│   取引所API      │────│  CCXT Fetcher   │────│  SQLite DB      │
│  (Binance等)    │    │                 │    │                 │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                │
                                │
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  クライアント    │────│   FastAPI       │────│ Background      │
│                 │    │   REST API      │    │ Scheduler       │
└─────────────────┘    └─────────────────┘    └─────────────────┘

コード詳細解説

以下に、CCXTとFastAPIを使用したOHLCVデータ取得APIの実装コードを抜粋して解説します。全体のコードは最下部にあります。

1. データベース管理クラス

OHLCVデータを格納するデータベースへの接続と管理を行うクラスを作成します。SQLiteを使用し、データの重複を防ぐためにUNIQUE制約を設けます。

class OHLCVDatabase:
    def __init__(self, db_name: str = DB_NAME):
        self.db_name = db_name
        self.init_db()

    def init_db(self):
        """データベースとテーブルを初期化"""
        conn = sqlite3.connect(self.db_name)
        cursor = conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS ohlcv_data (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                symbol TEXT NOT NULL,
                timeframe TEXT NOT NULL,
                timestamp INTEGER NOT NULL,
                open REAL NOT NULL,
                high REAL NOT NULL,
                low REAL NOT NULL,
                close REAL NOT NULL,
                volume REAL NOT NULL,
                datetime TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(symbol, timeframe, timestamp)
            )
        ''')

ポイント

  • UNIQUE制約により重複データを防止
  • インデックスによる高速検索
  • タイムスタンプとISO形式の日時を両方保存

2. データ取得クラス

データの取得と保存を行うクラスを作成します。CCXTライブラリを使用して取引所からOHLCVデータを取得し、SQLiteデータベースに保存します。

class ExchangeDataFetcher:
    def __init__(self, exchange_name: str = 'bybit'):
        self.exchange_name = exchange_name
        self.exchange = getattr(ccxt, exchange_name)({
            'rateLimit': 1200,
            'enableRateLimit': True,
        })
        self.db = OHLCVDatabase()

    async def fetch_and_store_ohlcv(self, symbol: str, timeframe: str = '1h', 
                                   limit: int = 100):
        """取引所からOHLCVデータを取得してDBに保存"""
        try:
            ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)

            if ohlcv:
                inserted_count = self.db.insert_ohlcv_data(symbol, timeframe, ohlcv)
                logger.info(f"Inserted {inserted_count} records for {symbol} {timeframe}")
                return inserted_count

ポイント

  • レート制限を有効にして取引所のAPI制限に対応
  • エラーハンドリングによる堅牢性の確保
  • 非同期処理による効率的なデータ取得

3. バックグラウンドスケジューラー

定期的にデータを取得するためのバックグラウンドタスクを実装します。FastAPIのバックグラウンドタスク機能を使用して、指定した時間間隔でデータを更新します。

class BackgroundScheduler:
    def __init__(self):
        self.fetcher = ExchangeDataFetcher()
        self.is_running = False
        self.symbols = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT']
        self.timeframes = ['1h', '4h', '1d']

    async def periodic_fetch(self):
        """定期的なデータ取得タスク"""
        while self.is_running:
            try:
                for symbol in self.symbols:
                    for timeframe in self.timeframes:
                        await self.fetcher.fetch_and_store_ohlcv(symbol, timeframe)
                        await asyncio.sleep(2)  # レート制限対策

                await asyncio.sleep(3600)  # 1時間ごとに実行

ポイント

  • 複数のシンボルと時間足に対応
  • API制限を考慮した適切な間隔設定
  • エラー時の自動復旧機能

4. FastAPI エンドポイント

APIエンドポイントを定義し、クライアントからのリクエストに応じてOHLCVデータを返す機能を実装します。Pydanticモデルを使用して、リクエストとレスポンスの型安全性を確保します。

@app.get("/ohlcv/{symbol}", response_model=OHLCVResponse)
async def get_ohlcv(
    symbol: str,
    timeframe: str = "1h",
    limit: Optional[int] = 100,
    since: Optional[int] = None
):
    """OHLCVデータを取得"""
    try:
        db = OHLCVDatabase()
        ohlcv_data = db.get_ohlcv_data(symbol, timeframe, limit, since)

        if not ohlcv_data:
            raise HTTPException(
                status_code=404, 
                detail=f"No data found for {symbol} with timeframe {timeframe}"
            )

        return OHLCVResponse(
            symbol=symbol,
            timeframe=timeframe,
            data=ohlcv_data,
            count=len(ohlcv_data)
        )

ポイント

  • Pydanticモデルによる型安全性
  • 適切なHTTPステータスコードの返却
  • 柔軟なクエリパラメータ対応

完全なコード

import asyncio
import sqlite3
from datetime import datetime, timedelta
from typing import List, Optional
import ccxt
from fastapi import FastAPI, HTTPException, BackgroundTasks
from pydantic import BaseModel
import pandas as pd
from contextlib import asynccontextmanager
import uvicorn
import logging
# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# データベース設定
DB_NAME = "ohlcv_data.db"
# Pydanticモデル
class OHLCVData(BaseModel):
timestamp: int
open: float
high: float
low: float
close: float
volume: float
datetime: str
class OHLCVResponse(BaseModel):
symbol: str
timeframe: str
data: List[OHLCVData]
count: int
# データベースクラス
class OHLCVDatabase:
def __init__(self, db_name: str = DB_NAME):
self.db_name = db_name
self.init_db()
def init_db(self):
"""データベースとテーブルを初期化"""
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
cursor.execute('''
CREATE TABLE IF NOT EXISTS ohlcv_data (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
timeframe TEXT NOT NULL,
timestamp INTEGER NOT NULL,
open REAL NOT NULL,
high REAL NOT NULL,
low REAL NOT NULL,
close REAL NOT NULL,
volume REAL NOT NULL,
datetime TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(symbol, timeframe, timestamp)
)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_symbol_timeframe_timestamp 
ON ohlcv_data(symbol, timeframe, timestamp)
''')
conn.commit()
conn.close()
def insert_ohlcv_data(self, symbol: str, timeframe: str, ohlcv_list: List[List]):
"""OHLCVデータをデータベースに挿入"""
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
data_to_insert = []
for ohlcv in ohlcv_list:
timestamp, open_price, high, low, close, volume = ohlcv
dt = datetime.fromtimestamp(timestamp / 1000).isoformat()
data_to_insert.append((
symbol, timeframe, timestamp, open_price, high, low, close, volume, dt
))
cursor.executemany('''
INSERT OR REPLACE INTO ohlcv_data 
(symbol, timeframe, timestamp, open, high, low, close, volume, datetime)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
''', data_to_insert)
conn.commit()
conn.close()
return len(data_to_insert)
def get_ohlcv_data(self, symbol: str, timeframe: str, limit: Optional[int] = None, 
since: Optional[int] = None) -> List[OHLCVData]:
"""データベースからOHLCVデータを取得"""
conn = sqlite3.connect(self.db_name)
cursor = conn.cursor()
query = '''
SELECT timestamp, open, high, low, close, volume, datetime
FROM ohlcv_data
WHERE symbol = ? AND timeframe = ?
'''
params = [symbol, timeframe]
if since:
query += ' AND timestamp >= ?'
params.append(since)
query += ' ORDER BY timestamp DESC'
if limit:
query += ' LIMIT ?'
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
conn.close()
return [
OHLCVData(
timestamp=row[0],
open=row[1],
high=row[2],
low=row[3],
close=row[4],
volume=row[5],
datetime=row[6]
)
for row in rows
]
# 取引所データ取得クラス
class ExchangeDataFetcher:
def __init__(self, exchange_name: str = 'binance'):
self.exchange_name = exchange_name
self.exchange = getattr(ccxt, exchange_name)({
'rateLimit': 1200,
'enableRateLimit': True,
})
self.db = OHLCVDatabase()
async def fetch_and_store_ohlcv(self, symbol: str, timeframe: str = '1h', 
limit: int = 100):
"""取引所からOHLCVデータを取得してDBに保存"""
try:
# CCXTを使用してOHLCVデータを取得
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
if ohlcv:
# データベースに保存
inserted_count = self.db.insert_ohlcv_data(symbol, timeframe, ohlcv)
logger.info(f"Inserted {inserted_count} records for {symbol} {timeframe}")
return inserted_count
else:
logger.warning(f"No data received for {symbol} {timeframe}")
return 0
except Exception as e:
logger.error(f"Error fetching data for {symbol} {timeframe}: {str(e)}")
raise e
# バックグラウンドタスク管理
class BackgroundScheduler:
def __init__(self):
self.fetcher = ExchangeDataFetcher()
self.is_running = False
self.symbols = ['BTC/USDT', 'ETH/USDT', 'BNB/USDT']  # デフォルトシンボル
self.timeframes = ['1h', '4h', '1d']  # デフォルト時間足
async def periodic_fetch(self):
"""定期的なデータ取得タスク"""
while self.is_running:
try:
for symbol in self.symbols:
for timeframe in self.timeframes:
await self.fetcher.fetch_and_store_ohlcv(symbol, timeframe)
await asyncio.sleep(2)  # レート制限対策
# 1時間ごとに実行
await asyncio.sleep(3600)
except Exception as e:
logger.error(f"Error in periodic fetch: {str(e)}")
await asyncio.sleep(300)  # エラー時は5分待機
def start(self):
"""バックグラウンドタスクを開始"""
self.is_running = True
asyncio.create_task(self.periodic_fetch())
def stop(self):
"""バックグラウンドタスクを停止"""
self.is_running = False
# バックグラウンドスケジューラーのインスタンス
scheduler = BackgroundScheduler()
# FastAPIアプリケーション
@asynccontextmanager
async def lifespan(app: FastAPI):
# 起動時の処理
logger.info("Starting OHLCV API server...")
scheduler.start()
yield
# 終了時の処理
logger.info("Shutting down OHLCV API server...")
scheduler.stop()
app = FastAPI(
title="OHLCV Data API",
description="CCXTを使用したOHLCVデータ取得API",
version="1.0.0",
lifespan=lifespan
)
# APIエンドポイント
@app.get("/")
async def root():
return {"message": "OHLCV Data API is running"}
@app.get("/ohlcv/{symbol}", response_model=OHLCVResponse)
async def get_ohlcv(
symbol: str,
timeframe: str = "1h",
limit: Optional[int] = 100,
since: Optional[int] = None
):
"""
OHLCVデータを取得
- **symbol**: 取引ペア (例: BTC/USDT)
- **timeframe**: 時間足 (1m, 5m, 15m, 1h, 4h, 1d など)
- **limit**: 取得する件数の上限
- **since**: 指定したタイムスタンプ以降のデータを取得
"""
try:
db = OHLCVDatabase()
ohlcv_data = db.get_ohlcv_data(symbol, timeframe, limit, since)
if not ohlcv_data:
raise HTTPException(
status_code=404, 
detail=f"No data found for {symbol} with timeframe {timeframe}"
)
return OHLCVResponse(
symbol=symbol,
timeframe=timeframe,
data=ohlcv_data,
count=len(ohlcv_data)
)
except Exception as e:
logger.error(f"Error getting OHLCV data: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/fetch/{symbol}")
async def manual_fetch(symbol: str, timeframe: str = "1h", limit: int = 100):
"""
手動でOHLCVデータを取得してDBに保存
- **symbol**: 取引ペア (例: BTC/USDT)
- **timeframe**: 時間足
- **limit**: 取得する件数
"""
try:
fetcher = ExchangeDataFetcher()
inserted_count = await fetcher.fetch_and_store_ohlcv(symbol, timeframe, limit)
return {
"message": f"Successfully fetched and stored {inserted_count} records",
"symbol": symbol,
"timeframe": timeframe,
"inserted_count": inserted_count
}
except Exception as e:
logger.error(f"Error in manual fetch: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/symbols")
async def get_available_symbols():
"""利用可能なシンボル一覧を取得"""
try:
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
cursor.execute('SELECT DISTINCT symbol FROM ohlcv_data ORDER BY symbol')
symbols = [row[0] for row in cursor.fetchall()]
conn.close()
return {"symbols": symbols}
except Exception as e:
logger.error(f"Error getting symbols: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/timeframes/{symbol}")
async def get_available_timeframes(symbol: str):
"""指定したシンボルで利用可能な時間足を取得"""
try:
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
cursor.execute(
'SELECT DISTINCT timeframe FROM ohlcv_data WHERE symbol = ? ORDER BY timeframe',
(symbol,)
)
timeframes = [row[0] for row in cursor.fetchall()]
conn.close()
if not timeframes:
raise HTTPException(
status_code=404,
detail=f"No timeframes found for symbol {symbol}"
)
return {"symbol": symbol, "timeframes": timeframes}
except Exception as e:
logger.error(f"Error getting timeframes: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/status")
async def get_status():
"""システムの状態を取得"""
try:
conn = sqlite3.connect(DB_NAME)
cursor = conn.cursor()
cursor.execute('SELECT COUNT(*) FROM ohlcv_data')
total_records = cursor.fetchone()[0]
cursor.execute('''
SELECT symbol, timeframe, COUNT(*) as count, 
MAX(datetime) as latest_data
FROM ohlcv_data 
GROUP BY symbol, timeframe
ORDER BY symbol, timeframe
''')
data_summary = cursor.fetchall()
conn.close()
return {
"total_records": total_records,
"scheduler_running": scheduler.is_running,
"data_summary": [
{
"symbol": row[0],
"timeframe": row[1],
"count": row[2],
"latest_data": row[3]
}
for row in data_summary
]
}
except Exception as e:
logger.error(f"Error getting status: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
if __name__ == "__main__":
# サーバーを起動
uvicorn.run(
"main:app",
host="0.0.0.0",
port=8000,
reload=True,
log_level="info"
)

使用方法

Pythonの開発環境に関しては、Pythonの環境構築で解説しているように、pyenvPoetryを使用していることを前提としています。

1. 環境構築

まず、pyenvでPythonのバージョンをインストールし、Poetryでプロジェクトをセットアップします:

# Pythonのインストール(例: 3.11.5)
pyenv install 3.11.5
pyenv local 3.11.5
# Poetryプロジェクトの初期化
poetry init --no-interaction
# 必要な依存関係を追加
poetry add fastapi uvicorn ccxt pydantic pandas

2. サーバー起動

poetry run python main.py

サーバーが起動すると、以下のURLでアクセス可能になります:

  • API: http://localhost:8000
  • ドキュメント: http://localhost:8000/docs

3. API使用例

OHLCVデータの取得

curl "http://localhost:8000/ohlcv/BTC/USDT?timeframe=1h&limit=50"

手動でのデータ取得

curl -X POST "http://localhost:8000/fetch/ETH/USDT?timeframe=4h&limit=100"

システム状態の確認

curl "http://localhost:8000/status"

改善点の候補

このAPIは基本的な機能を提供していますが、以下のような改善点についても検討の余地があります。

1. パフォーマンス関連

  • データベースの最適化

    • PostgreSQLやMySQLへの移行によるスケーラビリティ向上
    • 読み取り専用レプリカの導入
    • パーティショニングによる大量データの効率的な管理
  • キャッシュ機能の追加

    • Redisを使用したメモリキャッシュ
    • よく使用されるデータの高速アクセス
    • キャッシュ無効化戦略の実装

2. セキュリティ強化

  • 認証・認可機能

    • JWTトークンベースの認証
    • APIキーによるアクセス制御
  • データ保護

    • HTTPS通信の強制
    • 入力値検証の強化

3. 監視・運用性

  • 監視機能

    • Prometheusメトリクス出力
    • ヘルスチェックエンドポイントの強化
    • アラート機能の追加
  • ログ機能

    • 構造化ログの導入
    • ログレベルの動的変更
    • 外部ログ管理システムとの連携

4. 機能拡張

  • 複数取引所対応

    • 設定による取引所の切り替え
    • アービトラージ検出機能
    • 取引所間のデータ統合
  • データ分析機能

    • テクニカル指標の計算
    • リアルタイムチャート機能
    • 統計データの提供

5. インフラ・デプロイ

  • コンテナ化

    • Dockerによる環境統一
    • Kubernetes対応
    • CI/CDパイプラインの構築
  • 設定管理

    • 環境変数による設定外部化

6. エラーハンドリングとリトライ

# 改善例: 指数バックオフによるリトライ機能
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
class ImprovedExchangeDataFetcher(ExchangeDataFetcher):
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
async def fetch_and_store_ohlcv_with_retry(self, symbol: str, timeframe: str = '1h', 
limit: int = 100):
return await self.fetch_and_store_ohlcv(symbol, timeframe, limit)

7. 設定の外部化

# 改善例: 環境変数による設定管理
import os
from pydantic import BaseSettings
class Settings(BaseSettings):
database_url: str = os.getenv("DATABASE_URL", "sqlite:///ohlcv_data.db")
exchange_name: str = os.getenv("EXCHANGE_NAME", "binance")
api_key: str = os.getenv("API_KEY", "")
api_secret: str = os.getenv("API_SECRET", "")
fetch_interval: int = int(os.getenv("FETCH_INTERVAL", "3600"))
class Config:
env_file = ".env"
settings = Settings()

まとめ

本記事では、CCXTとFastAPIを使用したOHLCVデータ取得APIの構築方法を詳しく解説しました。このAPIには以下の特徴があります:

  • 効率的なデータ管理: 自動更新機能による常に最新のデータ提供
  • 高性能なAPI: FastAPIによる高速なレスポンス
  • 拡張可能性: モジュラー設計による機能追加の容易さ
  • 実用性: 実際のプロダクションで使用可能な堅牢性

提案した改善点を段階的に実装することで、より高性能で信頼性の高めることができます。ぜひ実践してみて、仮想通貨ボットライフを充実させましょう。


関連記事: