【Windows11 Pro】 DSPライセンス日本語版(インストールUSB付)
¥10,980 (2025-07-17 16:40 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)かんたん印刷位置合わせ2 DL版 [ダウンロード]
¥2,118 (2025-07-17 16:40 GMT +09:00 時点 - 詳細はこちら価格および発送可能時期は表示された日付/時刻の時点のものであり、変更される場合があります。本商品の購入においては、購入の時点で当該の Amazon サイトに表示されている価格および発送可能時期の情報が適用されます。)これまでの記事で学んだ非同期処理の知識を活かして、実際に使える仮想通貨価格監視ツールを作成します。価格の変動をリアルタイムで監視し、設定した条件でアラートを出す機能も実装します。
作成するツールの機能
今回作るツールには以下の機能があります:
- 複数取引所の価格を同時監視
- 価格変動アラート(上昇・下落時に通知)
- 価格差アービトラージ検出(取引所間の価格差を検出)
- データの自動保存(CSV形式で記録)
基本的な価格監視クラス
まずは、シンプルな価格監視クラスから始めましょう:
import asyncio
import ccxt.async_support as ccxt
from datetime import datetime
import csv
import os
class CryptoPriceMonitor:
"""仮想通貨価格監視ツール"""
def __init__(self, exchanges, symbols, check_interval=10):
"""
初期化
exchanges: 監視する取引所のリスト
symbols: 監視する通貨ペアのリスト
check_interval: チェック間隔(秒)
"""
self.exchanges = exchanges
self.symbols = symbols
self.check_interval = check_interval
self.running = False
self.price_history = {}
async def fetch_price(self, exchange_name, symbol):
"""1つの取引所から価格を取得"""
exchange = None
try:
# 取引所オブジェクトを作成
exchange_class = getattr(ccxt, exchange_name)
exchange = exchange_class({'enableRateLimit': True})
# 価格を取得
ticker = await exchange.fetch_ticker(symbol)
return {
'exchange': exchange_name,
'symbol': symbol,
'price': ticker['last'],
'volume': ticker['volume'],
'timestamp': datetime.now()
}
except Exception as e:
print(f"エラー {exchange_name}-{symbol}: {e}")
return None
finally:
if exchange:
await exchange.close()
async def check_all_prices(self):
"""全ての価格をチェック"""
tasks = []
# 全ての組み合わせでタスクを作成
for exchange in self.exchanges:
for symbol in self.symbols:
task = self.fetch_price(exchange, symbol)
tasks.append(task)
# 全てのタスクを同時実行
results = await asyncio.gather(*tasks)
# Noneを除外して返す
return [r for r in results if r is not None]
async def start_monitoring(self):
"""監視を開始"""
self.running = True
print("価格監視を開始しました...")
print("停止するにはCtrl+Cを押してくださいn")
while self.running:
# 価格を取得
prices = await self.check_all_prices()
# 結果を表示
self.display_prices(prices)
# 次のチェックまで待機
await asyncio.sleep(self.check_interval)
def display_prices(self, prices):
"""価格を表示"""
print(f"n========== {datetime.now().strftime('%H:%M:%S')} ==========")
# シンボルごとに整理
by_symbol = {}
for price_data in prices:
symbol = price_data['symbol']
if symbol not in by_symbol:
by_symbol[symbol] = []
by_symbol[symbol].append(price_data)
# 表示
for symbol, price_list in by_symbol.items():
print(f"n{symbol}:")
for data in sorted(price_list, key=lambda x: x['price']):
print(f" {data['exchange']:<10} ${data['price']:>10,.2f}")
def stop(self):
"""監視を停止"""
self.running = False
# 使用例
async def main():
monitor = CryptoPriceMonitor(
exchanges=['binance', 'coinbase', 'kraken'],
symbols=['BTC/USDT', 'ETH/USDT'],
check_interval=15
)
try:
await monitor.start_monitoring()
except KeyboardInterrupt:
print("n監視を終了しました")
# 実行
asyncio.run(main())
アラート機能の追加
価格が設定した閾値を超えたらアラートを出す機能を追加します:
class PriceAlert:
"""価格アラート設定"""
def __init__(self, symbol, condition, threshold, message):
self.symbol = symbol
self.condition = condition # 'above' または 'below'
self.threshold = threshold
self.message = message
self.triggered = False
class AdvancedPriceMonitor(CryptoPriceMonitor):
"""アラート機能付き価格監視ツール"""
def __init__(self, exchanges, symbols, check_interval=10):
super().__init__(exchanges, symbols, check_interval)
self.alerts = []
self.last_prices = {}
def add_alert(self, symbol, condition, threshold, message=None):
"""アラートを追加"""
if message is None:
message = f"{symbol}が${threshold:,.2f}を{'上回り' if condition == 'above' else '下回り'}ました!"
alert = PriceAlert(symbol, condition, threshold, message)
self.alerts.append(alert)
print(f"アラート設定: {message}")
async def check_alerts(self, prices):
"""アラート条件をチェック"""
for price_data in prices:
symbol = price_data['symbol']
current_price = price_data['price']
for alert in self.alerts:
if alert.symbol != symbol or alert.triggered:
continue
# 条件をチェック
if (alert.condition == 'above' and current_price > alert.threshold) or
(alert.condition == 'below' and current_price < alert.threshold):
# アラートを発火
print(f"n🚨 アラート: {alert.message}")
print(f" {price_data['exchange']}: ${current_price:,.2f}")
alert.triggered = True
async def check_price_changes(self, prices):
"""価格変動をチェック"""
for price_data in prices:
key = f"{price_data['exchange']}-{price_data['symbol']}"
current_price = price_data['price']
if key in self.last_prices:
last_price = self.last_prices[key]
change = current_price - last_price
change_pct = (change / last_price) * 100
# 大きな変動を検出(1%以上)
if abs(change_pct) >= 1.0:
emoji = "📈" if change > 0 else "📉"
print(f"n{emoji} 価格変動: {price_data['exchange']} {price_data['symbol']}")
print(f" ${last_price:,.2f} → ${current_price:,.2f} ({change_pct:+.2f}%)")
self.last_prices[key] = current_price
async def start_monitoring(self):
"""監視を開始(アラート機能付き)"""
self.running = True
print("価格監視を開始しました(アラート機能付き)...")
print("停止するにはCtrl+Cを押してくださいn")
while self.running:
prices = await self.check_all_prices()
# 通常の価格表示
self.display_prices(prices)
# アラートチェック
await self.check_alerts(prices)
# 価格変動チェック
await self.check_price_changes(prices)
await asyncio.sleep(self.check_interval)
# 使用例
async def main_with_alerts():
monitor = AdvancedPriceMonitor(
exchanges=['binance', 'coinbase'],
symbols=['BTC/USDT'],
check_interval=10
)
# アラートを設定
monitor.add_alert('BTC/USDT', 'above', 45000)
monitor.add_alert('BTC/USDT', 'below', 40000)
try:
await monitor.start_monitoring()
except KeyboardInterrupt:
print("n監視を終了しました")
# 実行
asyncio.run(main_with_alerts())
データ保存機能の実装
監視したデータをCSVファイルに保存する機能を追加します:
class DataSavingMonitor(AdvancedPriceMonitor):
"""データ保存機能付き監視ツール"""
def __init__(self, exchanges, symbols, check_interval=10, save_dir="price_data"):
super().__init__(exchanges, symbols, check_interval)
self.save_dir = save_dir
self.ensure_save_dir()
def ensure_save_dir(self):
"""保存ディレクトリを作成"""
if not os.path.exists(self.save_dir):
os.makedirs(self.save_dir)
print(f"データ保存ディレクトリを作成: {self.save_dir}")
def save_to_csv(self, prices):
"""価格データをCSVに保存"""
# 日付ごとにファイルを分ける
today = datetime.now().strftime('%Y%m%d')
for symbol in self.symbols:
# シンボルごとのファイル名
filename = f"{self.save_dir}/{symbol.replace('/', '_')}_{today}.csv"
# ファイルが存在しない場合はヘッダーを書く
write_header = not os.path.exists(filename)
# データを書き込み
with open(filename, 'a', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
if write_header:
writer.writerow(['timestamp', 'exchange', 'price', 'volume'])
# 該当するデータを抽出して保存
for price_data in prices:
if price_data['symbol'] == symbol:
writer.writerow([
price_data['timestamp'].strftime('%Y-%m-%d %H:%M:%S'),
price_data['exchange'],
price_data['price'],
price_data['volume']
])
async def check_arbitrage(self, prices):
"""アービトラージ機会を検出"""
# シンボルごとに価格を整理
by_symbol = {}
for price_data in prices:
symbol = price_data['symbol']
if symbol not in by_symbol:
by_symbol[symbol] = []
by_symbol[symbol].append(price_data)
# 各シンボルで価格差をチェック
for symbol, price_list in by_symbol.items():
if len(price_list) < 2:
continue
# 最安値と最高値を見つける
sorted_prices = sorted(price_list, key=lambda x: x['price'])
min_price = sorted_prices[0]
max_price = sorted_prices[-1]
# 価格差を計算
diff = max_price['price'] - min_price['price']
diff_pct = (diff / min_price['price']) * 100
# 1%以上の差があればアービトラージ機会として表示
if diff_pct >= 1.0:
print(f"n💰 アービトラージ機会検出: {symbol}")
print(f" 買い: {min_price['exchange']} @ ${min_price['price']:,.2f}")
print(f" 売り: {max_price['exchange']} @ ${max_price['price']:,.2f}")
print(f" 利益: ${diff:,.2f} ({diff_pct:.2f}%)")
async def start_monitoring(self):
"""監視を開始(データ保存機能付き)"""
self.running = True
print("価格監視を開始しました(データ保存機能付き)...")
print(f"データは {self.save_dir} に保存されます")
print("停止するにはCtrl+Cを押してくださいn")
while self.running:
prices = await self.check_all_prices()
# 価格表示
self.display_prices(prices)
# データ保存
self.save_to_csv(prices)
# アラートチェック
await self.check_alerts(prices)
# アービトラージチェック
await self.check_arbitrage(prices)
await asyncio.sleep(self.check_interval)
# 完全版の使用例
async def main_complete():
monitor = DataSavingMonitor(
exchanges=['binance', 'coinbase', 'kraken'],
symbols=['BTC/USDT', 'ETH/USDT'],
check_interval=30,
save_dir="crypto_prices"
)
# アラート設定
monitor.add_alert('BTC/USDT', 'above', 45000, "ビットコインが$45,000を突破!")
monitor.add_alert('ETH/USDT', 'above', 3000, "イーサリアムが$3,000を突破!")
try:
await monitor.start_monitoring()
except KeyboardInterrupt:
print("n監視を終了しました")
print(f"保存されたデータは {monitor.save_dir} で確認できます")
# 実行
asyncio.run(main_complete())
使い方とカスタマイズ
基本的な使い方
# 1. 簡単な監視
monitor = CryptoPriceMonitor(
exchanges=['binance'],
symbols=['BTC/USDT'],
check_interval=60 # 1分ごと
)
# 2. 複数取引所・複数通貨
monitor = DataSavingMonitor(
exchanges=['binance', 'coinbase', 'kraken', 'bitfinex'],
symbols=['BTC/USDT', 'ETH/USDT', 'XRP/USDT'],
check_interval=30
)
# 3. アラート設定
monitor.add_alert('BTC/USDT', 'above', 50000)
monitor.add_alert('BTC/USDT', 'below', 30000)
カスタマイズのアイデア
- LINE通知の追加(LINE Notify APIを使用)
- Webダッシュボード(StreamlitやFlaskで可視化)
- データベース保存(SQLiteやPostgreSQL)
- テクニカル指標の計算(移動平均線など)
まとめ
この3部作で学んだこと:
- 第1部:非同期処理の基本概念
- 第2部:CCXTでの実践的な使い方
- 第3部:実用的なツールの作成
非同期処理を使うことで、複数の取引所からリアルタイムでデータを取得し、効率的に処理できるようになりました。
このツールをベースに、自分のニーズに合わせてカスタマイズしてみてください!