Python非同期処理入門 第3部:実用的な仮想通貨価格監視ツールを作ろう

これまでの記事で学んだ非同期処理の知識を活かして、実際に使える仮想通貨価格監視ツールを作成します。価格の変動をリアルタイムで監視し、設定した条件でアラートを出す機能も実装します。

作成するツールの機能

今回作るツールには以下の機能があります:

  1. 複数取引所の価格を同時監視
  2. 価格変動アラート(上昇・下落時に通知)
  3. 価格差アービトラージ検出(取引所間の価格差を検出)
  4. データの自動保存(CSV形式で記録)

基本的な価格監視クラス

まずは、シンプルな価格監視クラスから始めましょう:

import asyncio
import ccxt.async_support as ccxt
from datetime import datetime
import csv
import os

class CryptoPriceMonitor:
    """仮想通貨価格監視ツール"""

    def __init__(self, exchanges, symbols, check_interval=10):
        """
        初期化
        exchanges: 監視する取引所のリスト
        symbols: 監視する通貨ペアのリスト
        check_interval: チェック間隔(秒)
        """
        self.exchanges = exchanges
        self.symbols = symbols
        self.check_interval = check_interval
        self.running = False
        self.price_history = {}

    async def fetch_price(self, exchange_name, symbol):
        """1つの取引所から価格を取得"""
        exchange = None
        try:
            # 取引所オブジェクトを作成
            exchange_class = getattr(ccxt, exchange_name)
            exchange = exchange_class({'enableRateLimit': True})

            # 価格を取得
            ticker = await exchange.fetch_ticker(symbol)

            return {
                'exchange': exchange_name,
                'symbol': symbol,
                'price': ticker['last'],
                'volume': ticker['volume'],
                'timestamp': datetime.now()
            }

        except Exception as e:
            print(f"エラー {exchange_name}-{symbol}: {e}")
            return None

        finally:
            if exchange:
                await exchange.close()

    async def check_all_prices(self):
        """全ての価格をチェック"""
        tasks = []

        # 全ての組み合わせでタスクを作成
        for exchange in self.exchanges:
            for symbol in self.symbols:
                task = self.fetch_price(exchange, symbol)
                tasks.append(task)

        # 全てのタスクを同時実行
        results = await asyncio.gather(*tasks)

        # Noneを除外して返す
        return [r for r in results if r is not None]

    async def start_monitoring(self):
        """監視を開始"""
        self.running = True
        print("価格監視を開始しました...")
        print("停止するにはCtrl+Cを押してくださいn")

        while self.running:
            # 価格を取得
            prices = await self.check_all_prices()

            # 結果を表示
            self.display_prices(prices)

            # 次のチェックまで待機
            await asyncio.sleep(self.check_interval)

    def display_prices(self, prices):
        """価格を表示"""
        print(f"n========== {datetime.now().strftime('%H:%M:%S')} ==========")

        # シンボルごとに整理
        by_symbol = {}
        for price_data in prices:
            symbol = price_data['symbol']
            if symbol not in by_symbol:
                by_symbol[symbol] = []
            by_symbol[symbol].append(price_data)

        # 表示
        for symbol, price_list in by_symbol.items():
            print(f"n{symbol}:")
            for data in sorted(price_list, key=lambda x: x['price']):
                print(f"  {data['exchange']:<10} ${data['price']:>10,.2f}")

    def stop(self):
        """監視を停止"""
        self.running = False

# 使用例
async def main():
    monitor = CryptoPriceMonitor(
        exchanges=['binance', 'coinbase', 'kraken'],
        symbols=['BTC/USDT', 'ETH/USDT'],
        check_interval=15
    )

    try:
        await monitor.start_monitoring()
    except KeyboardInterrupt:
        print("n監視を終了しました")

# 実行
asyncio.run(main())

アラート機能の追加

価格が設定した閾値を超えたらアラートを出す機能を追加します:

class PriceAlert:
    """価格アラート設定"""
    def __init__(self, symbol, condition, threshold, message):
        self.symbol = symbol
        self.condition = condition  # 'above' または 'below'
        self.threshold = threshold
        self.message = message
        self.triggered = False

class AdvancedPriceMonitor(CryptoPriceMonitor):
    """アラート機能付き価格監視ツール"""

    def __init__(self, exchanges, symbols, check_interval=10):
        super().__init__(exchanges, symbols, check_interval)
        self.alerts = []
        self.last_prices = {}

    def add_alert(self, symbol, condition, threshold, message=None):
        """アラートを追加"""
        if message is None:
            message = f"{symbol}が${threshold:,.2f}を{'上回り' if condition == 'above' else '下回り'}ました!"

        alert = PriceAlert(symbol, condition, threshold, message)
        self.alerts.append(alert)
        print(f"アラート設定: {message}")

    async def check_alerts(self, prices):
        """アラート条件をチェック"""
        for price_data in prices:
            symbol = price_data['symbol']
            current_price = price_data['price']

            for alert in self.alerts:
                if alert.symbol != symbol or alert.triggered:
                    continue

                # 条件をチェック
                if (alert.condition == 'above' and current_price > alert.threshold) or 
                   (alert.condition == 'below' and current_price < alert.threshold):

                    # アラートを発火
                    print(f"n🚨 アラート: {alert.message}")
                    print(f"   {price_data['exchange']}: ${current_price:,.2f}")
                    alert.triggered = True

    async def check_price_changes(self, prices):
        """価格変動をチェック"""
        for price_data in prices:
            key = f"{price_data['exchange']}-{price_data['symbol']}"
            current_price = price_data['price']

            if key in self.last_prices:
                last_price = self.last_prices[key]
                change = current_price - last_price
                change_pct = (change / last_price) * 100

                # 大きな変動を検出(1%以上)
                if abs(change_pct) >= 1.0:
                    emoji = "📈" if change > 0 else "📉"
                    print(f"n{emoji} 価格変動: {price_data['exchange']} {price_data['symbol']}")
                    print(f"   ${last_price:,.2f} → ${current_price:,.2f} ({change_pct:+.2f}%)")

            self.last_prices[key] = current_price

    async def start_monitoring(self):
        """監視を開始(アラート機能付き)"""
        self.running = True
        print("価格監視を開始しました(アラート機能付き)...")
        print("停止するにはCtrl+Cを押してくださいn")

        while self.running:
            prices = await self.check_all_prices()

            # 通常の価格表示
            self.display_prices(prices)

            # アラートチェック
            await self.check_alerts(prices)

            # 価格変動チェック
            await self.check_price_changes(prices)

            await asyncio.sleep(self.check_interval)

# 使用例
async def main_with_alerts():
    monitor = AdvancedPriceMonitor(
        exchanges=['binance', 'coinbase'],
        symbols=['BTC/USDT'],
        check_interval=10
    )

    # アラートを設定
    monitor.add_alert('BTC/USDT', 'above', 45000)
    monitor.add_alert('BTC/USDT', 'below', 40000)

    try:
        await monitor.start_monitoring()
    except KeyboardInterrupt:
        print("n監視を終了しました")

# 実行
asyncio.run(main_with_alerts())

データ保存機能の実装

監視したデータをCSVファイルに保存する機能を追加します:

class DataSavingMonitor(AdvancedPriceMonitor):
    """データ保存機能付き監視ツール"""

    def __init__(self, exchanges, symbols, check_interval=10, save_dir="price_data"):
        super().__init__(exchanges, symbols, check_interval)
        self.save_dir = save_dir
        self.ensure_save_dir()

    def ensure_save_dir(self):
        """保存ディレクトリを作成"""
        if not os.path.exists(self.save_dir):
            os.makedirs(self.save_dir)
            print(f"データ保存ディレクトリを作成: {self.save_dir}")

    def save_to_csv(self, prices):
        """価格データをCSVに保存"""
        # 日付ごとにファイルを分ける
        today = datetime.now().strftime('%Y%m%d')

        for symbol in self.symbols:
            # シンボルごとのファイル名
            filename = f"{self.save_dir}/{symbol.replace('/', '_')}_{today}.csv"

            # ファイルが存在しない場合はヘッダーを書く
            write_header = not os.path.exists(filename)

            # データを書き込み
            with open(filename, 'a', newline='', encoding='utf-8') as f:
                writer = csv.writer(f)

                if write_header:
                    writer.writerow(['timestamp', 'exchange', 'price', 'volume'])

                # 該当するデータを抽出して保存
                for price_data in prices:
                    if price_data['symbol'] == symbol:
                        writer.writerow([
                            price_data['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
                            price_data['exchange'],
                            price_data['price'],
                            price_data['volume']
                        ])

    async def check_arbitrage(self, prices):
        """アービトラージ機会を検出"""
        # シンボルごとに価格を整理
        by_symbol = {}
        for price_data in prices:
            symbol = price_data['symbol']
            if symbol not in by_symbol:
                by_symbol[symbol] = []
            by_symbol[symbol].append(price_data)

        # 各シンボルで価格差をチェック
        for symbol, price_list in by_symbol.items():
            if len(price_list) < 2:
                continue

            # 最安値と最高値を見つける
            sorted_prices = sorted(price_list, key=lambda x: x['price'])
            min_price = sorted_prices[0]
            max_price = sorted_prices[-1]

            # 価格差を計算
            diff = max_price['price'] - min_price['price']
            diff_pct = (diff / min_price['price']) * 100

            # 1%以上の差があればアービトラージ機会として表示
            if diff_pct >= 1.0:
                print(f"n💰 アービトラージ機会検出: {symbol}")
                print(f"   買い: {min_price['exchange']} @ ${min_price['price']:,.2f}")
                print(f"   売り: {max_price['exchange']} @ ${max_price['price']:,.2f}")
                print(f"   利益: ${diff:,.2f} ({diff_pct:.2f}%)")

    async def start_monitoring(self):
        """監視を開始(データ保存機能付き)"""
        self.running = True
        print("価格監視を開始しました(データ保存機能付き)...")
        print(f"データは {self.save_dir} に保存されます")
        print("停止するにはCtrl+Cを押してくださいn")

        while self.running:
            prices = await self.check_all_prices()

            # 価格表示
            self.display_prices(prices)

            # データ保存
            self.save_to_csv(prices)

            # アラートチェック
            await self.check_alerts(prices)

            # アービトラージチェック
            await self.check_arbitrage(prices)

            await asyncio.sleep(self.check_interval)

# 完全版の使用例
async def main_complete():
    monitor = DataSavingMonitor(
        exchanges=['binance', 'coinbase', 'kraken'],
        symbols=['BTC/USDT', 'ETH/USDT'],
        check_interval=30,
        save_dir="crypto_prices"
    )

    # アラート設定
    monitor.add_alert('BTC/USDT', 'above', 45000, "ビットコインが$45,000を突破!")
    monitor.add_alert('ETH/USDT', 'above', 3000, "イーサリアムが$3,000を突破!")

    try:
        await monitor.start_monitoring()
    except KeyboardInterrupt:
        print("n監視を終了しました")
        print(f"保存されたデータは {monitor.save_dir} で確認できます")

# 実行
asyncio.run(main_complete())

使い方とカスタマイズ

基本的な使い方

# 1. 簡単な監視
monitor = CryptoPriceMonitor(
    exchanges=['binance'],
    symbols=['BTC/USDT'],
    check_interval=60  # 1分ごと
)

# 2. 複数取引所・複数通貨
monitor = DataSavingMonitor(
    exchanges=['binance', 'coinbase', 'kraken', 'bitfinex'],
    symbols=['BTC/USDT', 'ETH/USDT', 'XRP/USDT'],
    check_interval=30
)

# 3. アラート設定
monitor.add_alert('BTC/USDT', 'above', 50000)
monitor.add_alert('BTC/USDT', 'below', 30000)

カスタマイズのアイデア

  1. LINE通知の追加(LINE Notify APIを使用)
  2. Webダッシュボード(StreamlitやFlaskで可視化)
  3. データベース保存(SQLiteやPostgreSQL)
  4. テクニカル指標の計算(移動平均線など)

まとめ

この3部作で学んだこと:

  1. 第1部:非同期処理の基本概念
  2. 第2部:CCXTでの実践的な使い方
  3. 第3部:実用的なツールの作成

非同期処理を使うことで、複数の取引所からリアルタイムでデータを取得し、効率的に処理できるようになりました。

このツールをベースに、自分のニーズに合わせてカスタマイズしてみてください!

関連記事

コメントする