CCXTで作る仮想通貨自動取引ボット

はじめに

本記事は、過去の連載記事「CCXTを使って仮想通貨のトレードをしてみる」シリーズ(第1回~第4回)で解説した内容をもとに、実際に動作する自動取引ボットの全体像と実装例をまとめたものです。

  • 第1回では、CCXTの概要や環境構築、市場データの取得方法を解説しました。
  • 第2回では、実際の注文処理や基本的な取引戦略(移動平均・RSI等)の実装例を紹介しました。
  • 第3回では、リスク管理やバックテスト、パフォーマンス評価の考え方と実装例を解説しました。
  • 第4回では、税務・法的な注意点、運用監視やアラート機能、パフォーマンス分析の概要をまとめました。

これらの知見を活かし、PythonとCCXTで本格的な自動売買ボットを構築した事例を紹介します。

この記事ではPythonの環境構築がされている前提で解説を進めていきます。環境構築の方法については、Pythonの環境構築を参照してください。

ボットの特徴

  • CCXTによるマルチ取引所対応:CoincheckやBybitなど、主要な取引所APIに簡単接続
  • 戦略の差し替えが容易:移動平均クロスやRSIなど、戦略クラスを切り替えて検証可能
  • リスク管理機能:ストップロス・テイクプロフィット、ドローダウン監視などを標準搭載
  • トレード記録・ログ:全取引を自動記録し、運用状況を可視化

システム構成

  • bot.py:ボット本体。戦略・リスク管理・記録・ロギングを統合し、定期的に自動売買を実行
  • exchange_client.py:CCXTラッパー。取引所ごとのAPI差異を吸収
  • strategy.py:売買戦略クラス(例:移動平均クロス戦略)
  • risk_manager.py:リスク管理(ストップロス・テイクプロフィット等)
  • trade_recorder.py:トレード履歴の保存
  • logger.py:運用ログの記録
  • config.py/.env:APIキーや取引ペア等の設定

主要な処理の流れ

  1. 設定の読み込み

    • .envconfig.pyでAPIキー・取引ペア等を設定します。
  2. 取引所クライアントの初期化

    • 例:
      from exchange_client import ExchangeClient
      exchange = ExchangeClient('bybit', api_key=API_KEY, api_secret=API_SECRET)
  3. 戦略クラスの選択

    • 例:
      from strategy import MovingAverageStrategy
      strategy = MovingAverageStrategy(short_window=5, long_window=20)
  4. ボット本体の起動

    • 例:
      from bot import TradingBot
      bot = TradingBot(exchange=exchange, strategy=strategy)
      bot.run()
  5. 自動売買の実行

    • ボットは定期的に市場データを取得し、戦略に従って売買シグナルを判定。
    • シグナルに応じて成行注文を発注し、リスク管理条件(ストップロス・テイクプロフィット)で自動決済。
    • 取引履歴やログはファイルに自動保存されます。

コア処理のサンプル

移動平均クロス戦略の例

class MovingAverageStrategy(BaseStrategy):
    def analyze(self, ohlcv):
        df = pd.DataFrame(ohlcv, columns=['timestamp','open','high','low','close','volume'])
        df['ma_short'] = df['close'].rolling(window=self.short_window).mean()
        df['ma_long'] = df['close'].rolling(window=self.long_window).mean()
        # ...シグナル判定処理...

リスク管理(ストップロス・テイクプロフィット)

class RiskManager:
    def check_exit(self, current_price):
        if self.position == 'long':
            if current_price <= self.stop_loss:
                return 'stop_loss'
            elif current_price >= self.take_profit:
                return 'take_profit'
        return None

取引記録の保存

class TradeRecorder:
    def record_order(self, order, side, price, amount, result):
        record = {
            'timestamp': datetime.now().isoformat(),
            'side': side,
            'price': price,
            'amount': amount,
            'order_id': order.get('id'),
            'status': order.get('status'),
            'result': result
        }
        self.records.append(record)
        self.save_records()

ソースコード全体とファイルごとの詳細解説

ここからは、実際に動作する自動取引ボットの全ソースファイルを掲載し、各ファイルの役割や設計意図・ポイントを詳しく解説します。


1. bot.py(ボット本体)

# ボット本体
import time
from config import EXCHANGE_NAME, API_KEY, API_SECRET, SYMBOL, TRADE_AMOUNT, INTERVAL, STOP_LOSS_PCT, TAKE_PROFIT_PCT
from exchange_client import ExchangeClient
from strategy import BaseStrategy
from risk_manager import RiskManager
from trade_recorder import TradeRecorder
from logger import TradingLogger

class TradingBot:
    """
    自動売買ボットのメインクラス。
    戦略、リスク管理、取引記録、ロギングを統合して取引を実行する。
    """
    def __init__(self, exchange, strategy: BaseStrategy):
        self.exchange = exchange if exchange is not None else ExchangeClient()
        self.strategy = strategy
        self.risk_manager = RiskManager(STOP_LOSS_PCT, TAKE_PROFIT_PCT)
        self.recorder = TradeRecorder()
        self.logger = TradingLogger()
        self.position = None

    def run(self):
        self.logger.info('Bot started')
        while True:
            try:
                ohlcv = self.exchange.fetch_ohlcv(SYMBOL, '1h', 100)
                signal, price = self.strategy.analyze(ohlcv)
                balance = self.exchange.fetch_balance()
                current_price = price
                self.logger.info(f'Current Price: {current_price}, Signal: {signal}, Balance: {balance}')

                # エグジット判定
                if self.position == 'long':
                    exit_signal = self.risk_manager.check_exit(current_price)
                    if exit_signal:
                        order = self.exchange.create_market_order(SYMBOL, 'sell', TRADE_AMOUNT)
                        self.recorder.record_order(order, 'sell', current_price, TRADE_AMOUNT, exit_signal)
                        self.logger.info(f'Exit {exit_signal}: Sell {TRADE_AMOUNT} at {current_price}')
                        self.position = None
                        continue

                # エントリー判定
                if signal == 'buy' and self.position is None:
                    order = self.exchange.create_market_order(SYMBOL, 'buy', TRADE_AMOUNT)
                    self.risk_manager.set_position('long', current_price)
                    self.recorder.record_order(order, 'buy', current_price, TRADE_AMOUNT, 'entry')
                    self.logger.info(f'Entry: Buy {TRADE_AMOUNT} at {current_price}')
                    self.position = 'long'

            except Exception as e:
                self.logger.error(f'Error: {e}')
            time.sleep(INTERVAL)

if __name__ == '__main__':
    exchange = ExchangeClient(EXCHANGE_NAME, api_key=API_KEY, api_secret=API_SECRET, enable_rate_limit=True)
    from strategy import MovingAverageStrategy
    strategy=MovingAverageStrategy(short_window=5, long_window=20)
    bot = TradingBot(exchange=exchange, strategy=strategy)
    bot.run()

詳細解説:

  • ボットのエントリーポイントであり、全体の制御を担います。
  • TradingBotクラスは、取引所クライアント・戦略・リスク管理・記録・ロギングを集約。
  • run()メソッドで、以下の処理ををループ実行します。
    • 市場データの取得
    • 戦略によるシグナル判定
    • エントリー/エグジットの自動発注
    • 取引記録・ログ出力
  • 例外処理でエラーもロギングし、安定運用を意識した設計です。
  • if __name__ == '__main__': で設定・戦略を組み合わせて起動でき、他の戦略や取引所にも柔軟に対応できます。実行時のオプション引数として指定できるようにするのもいいかもしれません。

2. config.py(設定ファイル)

# CoinCheck APIキーなどの設定
import os

EXCHANGE_NAME = os.getenv('EXCHANGE_NAME')  # 使用する取引所名
API_KEY = os.getenv('API_KEY')
API_SECRET = os.getenv('API_SECRET')
SYMBOL = os.getenv('SYMBOL')  # 取引する通貨ペア(例: 'BTC/JPY'))
TRADE_AMOUNT = os.getenv('TRADE_AMOUNT')  # 取引数量
INTERVAL = 300  # 監視間隔(秒)

# リスク管理
STOP_LOSS_PCT = 3.0  # ストップロス(%)
TAKE_PROFIT_PCT = 5.0  # テイクプロフィット(%)

詳細解説:

  • APIキーや取引ペア、取引数量、監視間隔、リスク管理パラメータなどを一元管理。
  • .envファイルや環境変数から値を取得することで、ソースコードに秘密情報を直接書かずに済みます。
  • 複数取引所や複数戦略の切り替えも、このファイルの値を変更するだけで対応可能です。

3. exchange_client.py(CCXTラッパー)

# CoinCheck/CCXTラッパー
import ccxt

class ExchangeClient:
    """
    任意のCCXT対応取引所のラッパークラス。
    主要な取引所操作をメソッドとして提供する。
    """
    def __init__(self, exchange_name, api_key, api_secret, enable_rate_limit=True, **kwargs):
        exchange_class = getattr(ccxt, exchange_name)
        self.exchange = exchange_class({
            'apiKey': api_key,
            'secret': api_secret,
            'enableRateLimit': enable_rate_limit,
            **kwargs
        })

    def fetch_balance(self):
        return self.exchange.fetch_balance()

    def fetch_ohlcv(self, symbol, timeframe='1h', limit=100):
        return self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)

    def create_market_order(self, symbol, side, amount):
        if side == 'buy':
            return self.exchange.create_market_buy_order(symbol, amount)
        else:
            return self.exchange.create_market_sell_order(symbol, amount)

    def fetch_open_orders(self, symbol=None):
        return self.exchange.fetch_open_orders(symbol) if symbol else self.exchange.fetch_open_orders()

    def cancel_order(self, order_id, symbol):
        return self.exchange.cancel_order(order_id, symbol)

    def fetch_my_trades(self, symbol, limit=10):
        return self.exchange.fetch_my_trades(symbol, limit=limit)

詳細解説:

  • CCXTの取引所クラスをラップし、主要なAPI操作(残高取得・OHLCV取得・注文・キャンセル・約定履歴など)をメソッド化。
  • 取引所ごとの細かな違い(パラメータやAPI名)を吸収し、ボット本体からは統一的なインターフェースで操作できます。
  • 新しい取引所への対応やAPI拡張も、このクラスを拡張するだけで済みます。

4. strategy.py(売買戦略クラス)

# シンプルな移動平均クロス戦略
import pandas as pd
import numpy as np
from abc import ABC, abstractmethod

class BaseStrategy(ABC):
    @abstractmethod
    def analyze(self, ohlcv):
        pass

class MovingAverageStrategy(BaseStrategy):
    def __init__(self, short_window, long_window):
        self.short_window = short_window
        self.long_window = long_window
        self.position = None

    def analyze(self, ohlcv):
        df = pd.DataFrame(ohlcv, columns=['timestamp','open','high','low','close','volume'])
        df['ma_short'] = df['close'].rolling(window=self.short_window).mean()
        df['ma_long'] = df['close'].rolling(window=self.long_window).mean()
        df['signal'] = 0
        df.loc[self.short_window:, 'signal'] = np.where(
            df['ma_short'][self.short_window:] > df['ma_long'][self.short_window:], 1, 0
        )
        df['position'] = df['signal'].diff()
        latest = df.iloc[-1]
        prev = df.iloc[-2]
        if prev['signal'] == 0 and latest['signal'] == 1:
            return 'buy', latest['close']
        elif prev['signal'] == 1 and latest['signal'] == 0:
            return 'sell', latest['close']
        else:
            return None, latest['close']

詳細解説:

  • BaseStrategyは抽象基底クラスで、戦略ごとにanalyzeメソッドを実装。
  • MovingAverageStrategyは短期・長期移動平均のクロスでシグナルを判定。
  • DataFrameでテクニカル指標を計算し、直近のクロスで’buy’/’sell’/Noneを返します。
  • 他の戦略(RSI, 複合戦略等)も同じインターフェースで追加可能です。

5. risk_manager.py(リスク管理)

# リスク管理(ストップロス・テイクプロフィット)
class RiskManager:
    def __init__(self, stop_loss_pct, take_profit_pct):
        self.stop_loss_pct = stop_loss_pct
        self.take_profit_pct = take_profit_pct
        self.entry_price = None
        self.position = None

    def set_position(self, side, entry_price):
        self.position = side
        self.entry_price = entry_price
        self.stop_loss = self.calculate_stop_loss(entry_price, side)
        self.take_profit = self.calculate_take_profit(entry_price, side)

    def calculate_stop_loss(self, entry_price, side):
        if side == 'long':
            return entry_price * (1 - self.stop_loss_pct / 100)
        else:
            return entry_price * (1 + self.stop_loss_pct / 100)

    def calculate_take_profit(self, entry_price, side):
        if side == 'long':
            return entry_price * (1 + self.take_profit_pct / 100)
        else:
            return entry_price * (1 - self.take_profit_pct / 100)

    def check_exit(self, current_price):
        if self.position == 'long':
            if current_price <= self.stop_loss:
                return 'stop_loss'
            elif current_price >= self.take_profit:
                return 'take_profit'
        return None

詳細解説:

  • エントリー時にストップロス・テイクプロフィットの価格を自動計算。
  • check_exitで現在価格がリスク閾値に到達したか判定し、エグジットシグナルを返します。
  • これにより、感情に左右されない機械的な損切り・利確が可能です。
  • パラメータを変更することで、リスク許容度を柔軟に調整できます。

6. trade_recorder.py(トレード記録)

# トレード記録の保存
import json
from datetime import datetime

class TradeRecorder:
    def __init__(self, record_file="trade_records.json"):
        self.record_file = record_file
        self.records = self.load_records()

    def load_records(self):
        try:
            with open(self.record_file, 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return []

    def save_records(self):
        with open(self.record_file, 'w') as f:
            json.dump(self.records, f, indent=2, ensure_ascii=False)

    def record_order(self, order, side, price, amount, result):
        record = {
            'timestamp': datetime.now().isoformat(),
            'side': side,
            'price': price,
            'amount': amount,
            'order_id': order.get('id'),
            'status': order.get('status'),
            'result': result
        }
        self.records.append(record)
        self.save_records()

詳細解説:

  • すべての注文・約定をJSONファイルに記録。
  • 記録内容には、タイムスタンプ・売買区分・価格・数量・注文ID・ステータス・結果(entry/exit等)を含みます。
  • 記録は自動で追記・保存され、後からパフォーマンス分析や税務処理にも活用できます。
  • ファイルベースなので、Excelや他ツールでの集計も容易です。

7. logger.py(ロギング)

# ログ管理
import logging
import sys
from logging.handlers import TimedRotatingFileHandler

class TradingLogger:
    def __init__(self, log_file="trading.log"):
        self.logger = logging.getLogger('TradingBot')
        self.logger.setLevel(logging.INFO)
        formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        file_handler = TimedRotatingFileHandler(log_file, when='midnight', interval=1, backupCount=31, encoding='utf-8')
        file_handler.setFormatter(formatter)
        console_handler = logging.StreamHandler(sys.stdout)
        console_handler.setFormatter(formatter)
        if not self.logger.handlers:
            self.logger.addHandler(file_handler)
            self.logger.addHandler(console_handler)

    def debug(self, msg):
        self.logger.debug(msg)
    def info(self, msg):
        self.logger.info(msg)
    def warning(self, msg):
        self.logger.warning(msg)
    def error(self, msg):
        self.logger.error(msg)
    def critical(self, msg):
        self.logger.critical(msg)

詳細解説:

  • ファイル・コンソール両方にINFOレベル以上のログを出力。
  • 日次ローテーションでログファイルが肥大化せず、長期運用にも安心。
  • ログには時刻・レベル・メッセージが記録され、運用監視やトラブル時の原因特定に役立ちます。
  • ログ出力の粒度(debug/info/warning/error/critical)も柔軟に制御可能です。

8. requirements.txt(依存パッケージ)

ccxt
pandas
numpy

詳細解説:

  • CCXT(取引所API)、pandas/numpy(データ処理・テクニカル指標計算)を利用。
  • pip install -r requirements.txtで一括インストールでき、環境構築も簡単です。

注意事項

  • 実際の資金で運用する前に、必ず少額やテスト環境で十分に検証してください。
  • 法的・税務上の要件を必ず確認し、自己責任で運用してください。

総括

本記事では、CCXTとPythonを活用した仮想通貨自動取引ボットの全体像と実装例を、ファイルごとに詳細に解説しました。各ファイルの設計意図や拡張ポイントを理解し、必要に応じてカスタマイズすることで、より高度な自動売買システムの構築も可能です。ぜひ本記事やサンプルコードを参考に、あなた自身の戦略やアイデアを形にしてみてください。


関連記事: