TensorTradeを使ったトレーディングボットの作成

はじめに

CCXTで作る仮想通貨自動取引ボットの記事では、CCXTライブラリを使用して仮想通貨の自動取引ボットを構築しました。今回は、TensorTradeライブラリを使用して、強化学習ベースの自動取引ボットを作成する方法について詳しく解説します。

TensorTradeとは

TensorTradeは、機械学習を使った取引戦略の開発とバックテストを行うためのPythonライブラリです。OpenAI Gymの取引版として設計されており、強化学習アルゴリズムを簡単に取引環境に適用できます。

主な特徴

  • 強化学習対応: DQN、PPO、A3Cなど様々なアルゴリズムをサポート
  • 柔軟な環境設定: カスタマイズ可能な取引環境
  • リアルタイム取引: 実際の取引所との連携も可能
  • 豊富なテクニカル指標: 組み込み指標とカスタム指標をサポート

ボットの全体構造

今回作成するボットは以下の主要コンポーネントで構成されています:

  1. データ管理システム: 価格データの取得と前処理
  2. 特徴量エンジニアリング: テクニカル指標の計算
  3. 取引環境: TensorTradeによる取引シミュレーション
  4. 強化学習エージェント: DQNベースの意思決定システム
  5. 学習・評価システム: モデルの訓練とパフォーマンス評価

実装の詳細解説

1. データ管理とテクニカル指標の計算

def load_data(self, symbol='BTCUSD', timeframe='1h', days=30):
    """価格データを読み込み"""
    # 実際の実装では取引所APIから取得
    end_date = datetime.now()
    start_date = end_date - timedelta(days=days)

    # 時系列データを生成
    date_range = pd.date_range(start=start_date, end=end_date, freq='H')

    # ランダムウォークでBTC価格をシミュレート
    np.random.seed(42)
    price = 50000
    prices = []

    for _ in range(len(date_range)):
        change = np.random.normal(0, 0.02)  # 2%の標準偏差
        price *= (1 + change)
        prices.append(price)

    # OHLCV データフレームを作成
    df = pd.DataFrame({
        'datetime': date_range,
        'open': prices,
        'high': [p * (1 + abs(np.random.normal(0, 0.01))) for p in prices],
        'low': [p * (1 - abs(np.random.normal(0, 0.01))) for p in prices],
        'close': prices,
        'volume': np.random.randint(100, 1000, len(prices))
    })

    return df

このメソッドでは、実際の取引データを模擬するサンプルデータを生成しています。実際の運用では、取引所のAPIを使用してリアルタイムデータを取得します。CCXTで作る仮想通貨自動取引ボット第1回を参考にしてみて下さい。

2. テクニカル指標の計算

テクニカル指標は、価格の動きを数値化した指標で、取引の意思決定に重要な役割を果たします。

def create_features(self):
    """テクニカル指標を追加"""
    df = self.data.copy()

    # 移動平均線
    df['sma_10'] = df['close'].rolling(window=10).mean()
    df['sma_20'] = df['close'].rolling(window=20).mean()
    df['ema_12'] = df['close'].ewm(span=12).mean()
    df['ema_26'] = df['close'].ewm(span=26).mean()

    # MACD(移動平均収束拡散)
    df['macd'] = df['ema_12'] - df['ema_26']
    df['macd_signal'] = df['macd'].ewm(span=9).mean()
    df['macd_histogram'] = df['macd'] - df['macd_signal']

    # RSI(相対力指数)
    delta = df['close'].diff()
    gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
    loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
    rs = gain / loss
    df['rsi'] = 100 - (100 / (1 + rs))

    # ボリンジャーバンド
    df['bb_middle'] = df['close'].rolling(window=20).mean()
    bb_std = df['close'].rolling(window=20).std()
    df['bb_upper'] = df['bb_middle'] + (bb_std * 2)
    df['bb_lower'] = df['bb_middle'] - (bb_std * 2)

    return df

計算している主要な指標:

  • 移動平均線(SMA/EMA): トレンドの方向性を示す
  • MACD: モメンタムの変化を捉える
  • RSI: 買われすぎ・売られすぎの状態を判定(0-100の範囲)
  • ボリンジャーバンド: 価格の変動範囲を示す

3. TensorTrade環境の設定

def setup_environment(self):
    """TensorTrade環境をセットアップ"""
    # データストリームを作成
    price_stream = Stream.source(self.data['close'].values, dtype="float").rename("USD-BTC")

    # 特徴量ストリームを作成
    feature_streams = []
    feature_columns = ['sma_10', 'sma_20', 'rsi', 'macd', 'macd_signal', 
                      'bb_upper', 'bb_lower', 'volatility', 'price_change']

    for col in feature_columns:
        if col in self.data.columns:
            stream = Stream.source(self.data[col].values, dtype="float").rename(col)
            feature_streams.append(stream)

    # データフィードを構築
    feed = DataFeed([price_stream] + feature_streams)

    # 取引所とポートフォリオを設定
    exchange = Exchange("simulated", service=execute_order)(price_stream)

    usd_wallet = Wallet(exchange, self.initial_balance * USD)
    btc_wallet = Wallet(exchange, 0 * BTC)
    portfolio = Portfolio(USD, [usd_wallet, btc_wallet])

    # 環境を作成
    self.env = default.create(
        feed=feed,
        portfolio=portfolio,
        action_scheme="managed-risk",
        reward_scheme="simple",
        window_size=20,
        max_allowed_loss=0.1
    )

この部分では、TensorTradeの強化学習環境を構築しています。重要なポイント:

  • データフィード: 価格データと特徴量を統合
  • 取引所: シミュレーション取引環境
  • ポートフォリオ: USD/BTC ウォレットを管理
  • アクションスキーム: 「managed-risk」でリスク管理機能を有効化

4. 強化学習エージェント

class SimpleDQNAgent:
    """簡単なDQNエージェント"""
    def __init__(self, state_size, action_size):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = []
        self.epsilon = 1.0  # exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = 0.001

        # Q-テーブル(簡単な実装)
        self.q_table = {}

    def act(self, state):
        """アクションを選択"""
        if np.random.random() <= self.epsilon:
            return np.random.choice(self.action_size)

        # 状態を文字列に変換
        state_key = str(np.round(state, 2))

        if state_key not in self.q_table:
            self.q_table[state_key] = np.zeros(self.action_size)

        return np.argmax(self.q_table[state_key])

    def replay(self, batch_size):
        """経験リプレイで学習"""
        if len(self.memory) < batch_size:
            return

        batch = np.random.choice(len(self.memory), batch_size, replace=False)

        for i in batch:
            state, action, reward, next_state, done = self.memory[i]

            state_key = str(np.round(state, 2))
            next_state_key = str(np.round(next_state, 2))

            if state_key not in self.q_table:
                self.q_table[state_key] = np.zeros(self.action_size)
            if next_state_key not in self.q_table:
                self.q_table[next_state_key] = np.zeros(self.action_size)

            target = reward
            if not done:
                target = reward + 0.95 * np.amax(self.q_table[next_state_key])

            self.q_table[state_key][action] = target

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

DQNエージェントの主要機能:

  • ε-greedy戦略: 探索と活用のバランス
  • 経験リプレイ: 過去の経験から学習
  • Q値更新: ベルマン方程式による価値関数の更新

5. 学習プロセス

def train(self, episodes=100):
    """エージェントを訓練"""
    scores = []

    for episode in range(episodes):
        state = self.env.reset()
        total_reward = 0
        done = False
        steps = 0

        while not done and steps < 1000:
            # エージェントがアクションを選択
            action = self.agent.act(state)

            # 環境でアクションを実行
            next_state, reward, done, _ = self.env.step(action)

            # エージェントに経験を記憶させる
            self.agent.remember(state, action, reward, next_state, done)

            state = next_state
            total_reward += reward
            steps += 1

            # 定期的に学習
            if len(self.agent.memory) > 32:
                self.agent.replay(32)

        scores.append(total_reward)

        # 10エピソードごとに進捗表示
        if (episode + 1) % 10 == 0:
            avg_score = np.mean(scores[-10:])
            print(f"Episode {episode + 1}/{episodes}, Average Score: {avg_score:.2f}")

    return scores

学習ループでは以下を実行:

  1. 環境をリセット
  2. エージェントが状態を観察してアクションを選択
  3. 環境でアクションを実行し、報酬を取得
  4. 経験を記憶し、定期的に学習を実行

パフォーマンスの評価

def plot_results(self, scores):
    """結果をプロット"""
    plt.figure(figsize=(12, 8))

    # 学習曲線
    plt.subplot(2, 2, 1)
    plt.plot(scores)
    plt.title('Training Scores')
    plt.xlabel('Episode')
    plt.ylabel('Score')

    # 移動平均
    plt.subplot(2, 2, 2)
    window = min(10, len(scores))
    moving_avg = pd.Series(scores).rolling(window=window).mean()
    plt.plot(moving_avg)
    plt.title(f'Moving Average (window={window})')

    # 価格データとテクニカル指標も同時に表示
    plt.tight_layout()
    plt.show()

完全なコード

以下が今回作成したTensorTrade自動取引ボットの完全なコードです:

import numpy as np
import pandas as pd
import tensortrade.env.default as default
from tensortrade.env.default.actions import TensorTradeActionScheme
from tensortrade.env.default.rewards import SimpleProfit
from tensortrade.env.default.observers import TensorTradeObserver
from tensortrade.data.cdd import CryptoDataDownload
from tensortrade.feed.core import Stream, DataFeed
from tensortrade.oms.exchanges import Exchange
from tensortrade.oms.services.execution.simulated import execute_order
from tensortrade.oms.instruments import USD, BTC
from tensortrade.oms.wallets import Wallet, Portfolio
from tensortrade.agents import DQNAgent
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
class TensorTradeBot:
def __init__(self, initial_balance=10000):
self.initial_balance = initial_balance
self.exchange = None
self.portfolio = None
self.env = None
self.agent = None
self.data = None
def load_data(self, symbol='BTCUSD', timeframe='1h', days=30):
"""価格データを読み込み"""
print(f"Loading data for {symbol}...")
# サンプルデータを生成(実際の実装では取引所APIから取得)
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# 時系列データを生成
date_range = pd.date_range(start=start_date, end=end_date, freq='H')
# BTC価格のようなデータを生成(ランダムウォーク)
np.random.seed(42)
price = 50000  # 初期価格
prices = []
for _ in range(len(date_range)):
change = np.random.normal(0, 0.02)  # 2%の標準偏差
price *= (1 + change)
prices.append(price)
# OHLCV データを作成
df = pd.DataFrame({
'datetime': date_range,
'open': prices,
'high': [p * (1 + abs(np.random.normal(0, 0.01))) for p in prices],
'low': [p * (1 - abs(np.random.normal(0, 0.01))) for p in prices],
'close': prices,
'volume': np.random.randint(100, 1000, len(prices))
})
df.set_index('datetime', inplace=True)
self.data = df
print(f"Loaded {len(df)} data points")
return df
def create_features(self):
"""テクニカル指標を追加"""
if self.data is None:
raise ValueError("Data not loaded. Call load_data() first.")
df = self.data.copy()
# 移動平均
df['sma_10'] = df['close'].rolling(window=10).mean()
df['sma_20'] = df['close'].rolling(window=20).mean()
df['ema_12'] = df['close'].ewm(span=12).mean()
df['ema_26'] = df['close'].ewm(span=26).mean()
# MACD
df['macd'] = df['ema_12'] - df['ema_26']
df['macd_signal'] = df['macd'].ewm(span=9).mean()
df['macd_histogram'] = df['macd'] - df['macd_signal']
# RSI
delta = df['close'].diff()
gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
rs = gain / loss
df['rsi'] = 100 - (100 / (1 + rs))
# ボリンジャーバンド
df['bb_middle'] = df['close'].rolling(window=20).mean()
bb_std = df['close'].rolling(window=20).std()
df['bb_upper'] = df['bb_middle'] + (bb_std * 2)
df['bb_lower'] = df['bb_middle'] - (bb_std * 2)
# ボラティリティ
df['volatility'] = df['close'].pct_change().rolling(window=20).std()
# 価格変化率
df['price_change'] = df['close'].pct_change()
df['price_change_5'] = df['close'].pct_change(periods=5)
# NaNを削除
df = df.dropna()
self.data = df
print(f"Created features. Data shape: {df.shape}")
return df
def setup_environment(self):
"""TensorTrade環境をセットアップ"""
if self.data is None:
raise ValueError("Data not loaded. Call load_data() and create_features() first.")
# データフィードを作成
price_stream = Stream.source(self.data['close'].values, dtype="float").rename("USD-BTC")
# 特徴量ストリームを作成
feature_streams = []
feature_columns = ['sma_10', 'sma_20', 'rsi', 'macd', 'macd_signal', 
'bb_upper', 'bb_lower', 'volatility', 'price_change']
for col in feature_columns:
if col in self.data.columns:
stream = Stream.source(self.data[col].values, dtype="float").rename(col)
feature_streams.append(stream)
# データフィードを構築
feed = DataFeed([price_stream] + feature_streams)
# 取引所とポートフォリオを設定
exchange = Exchange("simulated", service=execute_order)(
price_stream
)
# ウォレットとポートフォリオ
usd_wallet = Wallet(exchange, self.initial_balance * USD)
btc_wallet = Wallet(exchange, 0 * BTC)
portfolio = Portfolio(USD, [usd_wallet, btc_wallet])
# 環境を作成
self.env = default.create(
feed=feed,
portfolio=portfolio,
action_scheme="managed-risk",
reward_scheme="simple",
window_size=20,
max_allowed_loss=0.1
)
self.exchange = exchange
self.portfolio = portfolio
print("Environment setup completed")
def create_agent(self):
"""DQNエージェントを作成"""
if self.env is None:
raise ValueError("Environment not setup. Call setup_environment() first.")
# 状態とアクションの次元を取得
state_size = self.env.observation_space.shape[0]
action_size = self.env.action_space.n
print(f"State size: {state_size}, Action size: {action_size}")
# DQNエージェントを作成(簡単な実装)
self.agent = SimpleDQNAgent(state_size, action_size)
print("Agent created")
def train(self, episodes=100):
"""エージェントを訓練"""
if self.agent is None:
raise ValueError("Agent not created. Call create_agent() first.")
print(f"Training for {episodes} episodes...")
scores = []
for episode in range(episodes):
state = self.env.reset()
total_reward = 0
done = False
steps = 0
while not done and steps < 1000:  # 最大ステップ数を制限
# エージェントがアクションを選択
action = self.agent.act(state)
# 環境でアクションを実行
next_state, reward, done, _ = self.env.step(action)
# エージェントに経験を記憶させる
self.agent.remember(state, action, reward, next_state, done)
state = next_state
total_reward += reward
steps += 1
# 定期的に学習
if len(self.agent.memory) > 32:
self.agent.replay(32)
scores.append(total_reward)
# 進捗表示
if (episode + 1) % 10 == 0:
avg_score = np.mean(scores[-10:])
print(f"Episode {episode + 1}/{episodes}, Average Score: {avg_score:.2f}, Steps: {steps}")
return scores
def test(self, episodes=10):
"""訓練されたエージェントをテスト"""
if self.agent is None:
raise ValueError("Agent not created or trained.")
print(f"Testing for {episodes} episodes...")
test_scores = []
# テスト時はexplorationを無効化
original_epsilon = self.agent.epsilon
self.agent.epsilon = 0.01  # 最小限のexploration
for episode in range(episodes):
state = self.env.reset()
total_reward = 0
done = False
steps = 0
while not done and steps < 1000:
action = self.agent.act(state)
state, reward, done, _ = self.env.step(action)
total_reward += reward
steps += 1
test_scores.append(total_reward)
print(f"Test Episode {episode + 1}: Score = {total_reward:.2f}, Steps = {steps}")
# epsilonを元に戻す
self.agent.epsilon = original_epsilon
avg_test_score = np.mean(test_scores)
print(f"Average Test Score: {avg_test_score:.2f}")
return test_scores
def plot_results(self, scores):
"""結果をプロット"""
plt.figure(figsize=(12, 8))
# 学習曲線
plt.subplot(2, 2, 1)
plt.plot(scores)
plt.title('Training Scores')
plt.xlabel('Episode')
plt.ylabel('Score')
# 移動平均
plt.subplot(2, 2, 2)
window = min(10, len(scores))
moving_avg = pd.Series(scores).rolling(window=window).mean()
plt.plot(moving_avg)
plt.title(f'Moving Average (window={window})')
plt.xlabel('Episode')
plt.ylabel('Average Score')
# 価格データ
plt.subplot(2, 2, 3)
plt.plot(self.data.index, self.data['close'])
plt.title('Price Data')
plt.xlabel('Time')
plt.ylabel('Price')
plt.xticks(rotation=45)
# テクニカル指標
plt.subplot(2, 2, 4)
plt.plot(self.data.index, self.data['rsi'])
plt.axhline(y=70, color='r', linestyle='--', alpha=0.7)
plt.axhline(y=30, color='g', linestyle='--', alpha=0.7)
plt.title('RSI')
plt.xlabel('Time')
plt.ylabel('RSI')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()
class SimpleDQNAgent:
"""簡単なDQNエージェント"""
def __init__(self, state_size, action_size):
self.state_size = state_size
self.action_size = action_size
self.memory = []
self.epsilon = 1.0  # exploration rate
self.epsilon_min = 0.01
self.epsilon_decay = 0.995
self.learning_rate = 0.001
# Q-テーブル(簡単な実装)
self.q_table = {}
def remember(self, state, action, reward, next_state, done):
"""経験を記憶"""
self.memory.append((state, action, reward, next_state, done))
if len(self.memory) > 2000:  # メモリサイズを制限
self.memory.pop(0)
def act(self, state):
"""アクションを選択"""
if np.random.random() <= self.epsilon:
return np.random.choice(self.action_size)
# 状態を文字列に変換(簡単なハッシュ)
state_key = str(np.round(state, 2))
if state_key not in self.q_table:
self.q_table[state_key] = np.zeros(self.action_size)
return np.argmax(self.q_table[state_key])
def replay(self, batch_size):
"""経験リプレイで学習"""
if len(self.memory) < batch_size:
return
batch = np.random.choice(len(self.memory), batch_size, replace=False)
for i in batch:
state, action, reward, next_state, done = self.memory[i]
state_key = str(np.round(state, 2))
next_state_key = str(np.round(next_state, 2))
if state_key not in self.q_table:
self.q_table[state_key] = np.zeros(self.action_size)
if next_state_key not in self.q_table:
self.q_table[next_state_key] = np.zeros(self.action_size)
target = reward
if not done:
target = reward + 0.95 * np.amax(self.q_table[next_state_key])
self.q_table[state_key][action] = target
if self.epsilon > self.epsilon_min:
self.epsilon *= self.epsilon_decay
# 使用例
def main():
# ボットを作成
bot = TensorTradeBot(initial_balance=10000)
# データを読み込み
bot.load_data(symbol='BTCUSD', days=60)
# 特徴量を作成
bot.create_features()
# 環境をセットアップ
bot.setup_environment()
# エージェントを作成
bot.create_agent()
# 訓練
print("Starting training...")
scores = bot.train(episodes=50)
# テスト
print("Starting testing...")
test_scores = bot.test(episodes=5)
# 結果をプロット
bot.plot_results(scores)
return bot
if __name__ == "__main__":
bot = main()

実際の運用に向けた改善点

1. データソースの改善

# 実際の取引所APIを使用
import ccxt
def load_real_data(self, exchange_name='binance', symbol='BTC/USDT'):
exchange = getattr(ccxt, exchange_name)()
ohlcv = exchange.fetch_ohlcv(symbol, '1h', limit=1000)
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['datetime'] = pd.to_datetime(df['timestamp'], unit='ms')
return df

2. より高度な特徴量

def create_advanced_features(self):
# フラクタル次元
# 出来高加重平均価格(VWAP)
# オンバランスボリューム(OBV)
# ストキャスティクス
# ウィリアムズ%R
pass

3. リスク管理の強化

def add_risk_management(self):
# ストップロス
# テイクプロフィット
# ポジションサイジング
# 最大ドローダウン制御
pass

4. より高度なニューラルネットワーク

import tensorflow as tf
class AdvancedDQNAgent:
def __init__(self, state_size, action_size):
self.model = tf.keras.Sequential([
tf.keras.layers.Dense(128, activation='relu', input_shape=(state_size,)),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(64, activation='relu'),
tf.keras.layers.Dropout(0.2),
tf.keras.layers.Dense(32, activation='relu'),
tf.keras.layers.Dense(action_size, activation='linear')
])
self.model.compile(optimizer='adam', loss='mse')

まとめ

TensorTradeを使用した自動取引ボットの構築について詳しく解説しました。このボットは以下の要素を組み合わせています:

実装したコンポーネント

  1. データ管理: 価格データの取得と前処理
  2. 特徴量エンジニアリング: テクニカル指標の計算
  3. 環境設定: TensorTradeによる取引シミュレーション
  4. 強化学習: DQNベースの意思決定アルゴリズム
  5. 評価システム: パフォーマンスの可視化と分析

技術的なポイント

  • 強化学習の適用: 価格予測ではなく、行動価値を学習
  • リアルタイム性: ストリーミングデータに対応
  • リスク管理: ポートフォリオレベルでの損失制御
  • 拡張性: カスタム指標や戦略の追加が容易

実際の運用時の注意点

リスク管理

class RiskManager:
def __init__(self, max_drawdown=0.15, max_position_size=0.5):
self.max_drawdown = max_drawdown
self.max_position_size = max_position_size
self.peak_portfolio_value = 0
def check_drawdown(self, current_value):
if current_value > self.peak_portfolio_value:
self.peak_portfolio_value = current_value
drawdown = (self.peak_portfolio_value - current_value) / self.peak_portfolio_value
return drawdown < self.max_drawdown
def calculate_position_size(self, portfolio_value, signal_strength):
base_size = portfolio_value * self.max_position_size
return base_size * signal_strength

バックテストの重要性

def comprehensive_backtest(self, start_date, end_date):
"""包括的なバックテスト"""
results = {
'total_return': 0,
'sharpe_ratio': 0,
'max_drawdown': 0,
'win_rate': 0,
'profit_factor': 0
}
# 期間外データでのテスト
# ウォークフォワード最適化
# モンテカルロシミュレーション
return results

データ品質の確保

def validate_data(self, df):
"""データ品質チェック"""
checks = {
'missing_values': df.isnull().sum().sum() == 0,
'duplicate_timestamps': df.index.duplicated().sum() == 0,
'price_anomalies': self.detect_price_anomalies(df),
'volume_consistency': self.check_volume_consistency(df)
}
return all(checks.values()), checks
def detect_price_anomalies(self, df):
"""価格の異常値を検出"""
price_changes = df['close'].pct_change()
# 3σを超える変化を異常とみなす
threshold = 3 * price_changes.std()
anomalies = abs(price_changes) > threshold
return anomalies.sum() < len(df) * 0.01  # 1%未満なら正常

パフォーマンス最適化

計算効率の改善

import numba
@numba.jit(nopython=True)
def fast_sma(prices, window):
"""高速移動平均計算"""
result = np.empty_like(prices)
result[:window-1] = np.nan
for i in range(window-1, len(prices)):
result[i] = np.mean(prices[i-window+1:i+1])
return result
def vectorized_features(self, df):
"""ベクトル化された特徴量計算"""
# NumPyのベクトル化操作を活用
df['returns'] = np.log(df['close'] / df['close'].shift(1))
df['volatility'] = df['returns'].rolling(20).std() * np.sqrt(252)
# 複数の移動平均を一度に計算
for window in [5, 10, 20, 50]:
df[f'sma_{window}'] = df['close'].rolling(window).mean()
return df

メモリ効率の改善

def optimize_memory(self, df):
"""メモリ使用量を最適化"""
# データ型を最適化
for col in df.select_dtypes(include=['float64']).columns:
df[col] = pd.to_numeric(df[col], downcast='float')
for col in df.select_dtypes(include=['int64']).columns:
df[col] = pd.to_numeric(df[col], downcast='integer')
# 不要な列を削除
essential_columns = ['open', 'high', 'low', 'close', 'volume'] + self.feature_columns
df = df[essential_columns]
return df

実運用への展開

リアルタイムデータ処理

import asyncio
import websocket
class RealTimeDataProcessor:
def __init__(self, bot):
self.bot = bot
self.data_buffer = []
async def process_tick_data(self, tick):
"""ティックデータをリアルタイム処理"""
self.data_buffer.append(tick)
# 十分なデータが蓄積されたら予測実行
if len(self.data_buffer) >= self.bot.window_size:
features = self.bot.create_features_realtime(self.data_buffer)
action = self.bot.agent.act(features)
# 取引実行
if action != 0:  # 0 = hold
await self.execute_trade(action, tick['price'])
async def execute_trade(self, action, price):
"""実際の取引実行"""
# 取引所APIを通じて注文実行
# リスク管理チェック
# ログ記録
pass

モニタリングとアラート

class TradingMonitor:
def __init__(self, bot):
self.bot = bot
self.alerts = []
def check_system_health(self):
"""システムヘルスチェック"""
checks = {
'data_freshness': self.check_data_freshness(),
'model_performance': self.check_model_performance(),
'risk_metrics': self.check_risk_metrics(),
'connection_status': self.check_connections()
}
for check, status in checks.items():
if not status:
self.send_alert(f"Warning: {check} failed")
return checks
def send_alert(self, message):
"""アラート送信(メール、Slack等)"""
# アラート送信ロジック
print(f"ALERT: {message}")
self.alerts.append({
'timestamp': datetime.now(),
'message': message
})

セキュリティ考慮事項

APIキーの安全管理

import os
from cryptography.fernet import Fernet
class SecureConfig:
def __init__(self):
self.cipher = Fernet(os.environ.get('ENCRYPTION_KEY'))
def encrypt_api_key(self, api_key):
"""APIキーを暗号化"""
return self.cipher.encrypt(api_key.encode())
def decrypt_api_key(self, encrypted_key):
"""APIキーを復号化"""
return self.cipher.decrypt(encrypted_key).decode()
def load_secure_config(self):
"""設定を安全に読み込み"""
return {
'api_key': self.decrypt_api_key(os.environ.get('ENCRYPTED_API_KEY')),
'api_secret': self.decrypt_api_key(os.environ.get('ENCRYPTED_API_SECRET')),
'max_position_size': float(os.environ.get('MAX_POSITION_SIZE', '0.1'))
}

今後の発展方向

1. マルチアセット対応

class MultiAssetBot(TensorTradeBot):
def __init__(self, assets=['BTC/USDT', 'ETH/USDT', 'ADA/USDT']):
super().__init__()
self.assets = assets
self.portfolios = {}
def create_multi_asset_environment(self):
"""マルチアセット環境を構築"""
# 複数通貨ペアの同時取引
# ポートフォリオ最適化
# 相関分析
pass

2. 高頻度取引対応

class HighFrequencyBot(TensorTradeBot):
def __init__(self):
super().__init__()
self.latency_monitor = LatencyMonitor()
def ultra_fast_prediction(self, market_data):
"""超高速予測(マイクロ秒レベル)"""
# C++での計算部分
# GPU加速
# 専用ハードウェア対応
pass

3. AI技術の統合

class AIEnhancedBot(TensorTradeBot):
def __init__(self):
super().__init__()
self.sentiment_analyzer = SentimentAnalyzer()
self.news_processor = NewsProcessor()
def incorporate_alternative_data(self):
"""オルタナティブデータの活用"""
# ニュース感情分析
# SNS動向分析
# マクロ経済指標
# 衛星データ
pass

法的・規制面の考慮

自動取引システムを運用する際は、以下の法的要件を確認してください:

  1. 金融商品取引法の遵守
  2. 税務申告の適切な処理
  3. データ保護規制への対応
  4. 取引所の利用規約確認
  5. アルゴリズム取引の報告義務

結論

TensorTradeを使用した自動取引ボットは、強化学習の力を活用して市場の複雑なパターンを学習し、データドリブンな取引決定を行うことができます。ただし、実際の運用には慎重なリスク管理、継続的なモニタリング、そして規制要件への対応が不可欠です。

今回紹介したコードは教育目的の基本実装ですが、これをベースに本格的な取引システムを構築することが可能です。重要なのは、段階的に機能を追加し、十分なテストを行いながら開発を進めることです。

最終的なアドバイス: 実際の資金での運用前には、必ず十分なバックテストと紙取引でのテストを行い、リスクを理解した上で小額から始めることをお勧めします。


関連記事: