機械学習による仮想通貨価格予測 第3部:バックテストと自動売買システム統合
はじめに
シリーズ最終回となる第3部では、構築した予測モデルの検証方法と、実際の自動売買システムへの統合について詳細に解説します。どんなに優れた予測モデルでも、適切なバックテストとリスク管理なしには、実際の取引で利益を生み出すことはできません。
本記事では、プロダクションレベルのバックテストフレームワークの構築から、リアルタイム自動売買システムの実装まで、実践的な内容を包括的に紹介します。
バックテストの重要性と現実的な課題
検証すべき重要指標
予測モデルを実際の資金で運用する前に、過去データでの検証は必須です。しかし、単純な利益計算だけでは不十分です:
- 累積収益率:戦略の総合的なパフォーマンス
- 最大ドローダウン:最大の損失幅(リスクの指標)
- シャープレシオ:リスク調整後のリターン(年率リターン/リスクの標準偏差)
- ソルティノレシオ:下方リスクのみを考慮した調整済みリターン
- 勝率と損益比:取引の質を評価
- カルマレシオ:年率リターン/最大ドローダウン
バックテストの現実的な落とし穴
# 代表的なバックテストのバイアス例
common_biases = {
"サバイバーシップバイアス": "上場廃止になった通貨を除外してしまう",
"ルックアヘッドバイアス": "未来の情報を過去の判断に使ってしまう",
"データスヌーピングバイアス": "同じデータで何度もパラメータを最適化する",
"流動性バイアス": "実際には取引できない価格で約定を想定する"
}高度なバックテストフレームワーク実装
包括的なバックテストシステム
import pandas as pd
import numpy as np
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from datetime import datetime, timedelta
import warnings
@dataclass
class Trade:
"""取引記録のデータクラス"""
timestamp: datetime
symbol: str
side: str # 'buy' or 'sell'
amount: float
price: float
fee: float
slippage: float
order_type: str # 'market', 'limit'
@dataclass
class Position:
"""ポジション情報のデータクラス"""
symbol: str
amount: float
entry_price: float
entry_time: datetime
current_price: float
unrealized_pnl: float
class AdvancedBacktester:
def __init__(self,
initial_capital: float = 100000,
maker_fee: float = 0.001, # 0.1%
taker_fee: float = 0.0015, # 0.15%
slippage_model: str = 'linear'):
"""
高度なバックテスター
Args:
initial_capital: 初期資金
maker_fee: Maker手数料
taker_fee: Taker手数料
slippage_model: スリッページモデル ('linear', 'sqrt', 'impact')
"""
self.initial_capital = initial_capital
self.current_capital = initial_capital
self.maker_fee = maker_fee
self.taker_fee = taker_fee
self.slippage_model = slippage_model
# 取引記録
self.trades: List[Trade] = []
self.positions: Dict[str, Position] = {}
self.equity_curve = []
self.drawdowns = []
# パフォーマンス指標
self.metrics = {}
def calculate_slippage(self, symbol: str, amount: float, side: str,
current_price: float, volume: float) -> float:
"""
市場インパクトに基づくスリッページの計算
"""
if self.slippage_model == 'linear':
# 出来高に対する取引量の比率でスリッページを計算
impact_ratio = abs(amount) / max(volume, 1)
base_slippage = 0.0005 # 0.05%
slippage_rate = base_slippage * min(impact_ratio * 100, 0.01) # 最大1%
elif self.slippage_model == 'sqrt':
# 平方根モデル(市場インパクトの一般的なモデル)
impact_ratio = abs(amount) / max(volume, 1)
slippage_rate = 0.001 * np.sqrt(impact_ratio * 1000)
elif self.slippage_model == 'impact':
# より現実的な市場インパクトモデル
daily_volume = volume * 24 # 1日の推定出来高
participation_rate = abs(amount * current_price) / max(daily_volume * current_price, 1)
slippage_rate = 0.1 * participation_rate ** 0.6
# サイドによる調整(買いは上方、売りは下方のスリッページ)
slippage_direction = 1 if side == 'buy' else -1
return current_price * slippage_rate * slippage_direction
def execute_trade(self, timestamp: datetime, symbol: str, side: str,
amount: float, price: float, volume: float,
order_type: str = 'market') -> Trade:
"""
取引の実行(手数料・スリッページ込み)
"""
# スリッページの計算
slippage = self.calculate_slippage(symbol, amount, side, price, volume)
execution_price = price + slippage
# 手数料の計算
fee_rate = self.maker_fee if order_type == 'limit' else self.taker_fee
trade_value = abs(amount) * execution_price
fee = trade_value * fee_rate
# 取引記録の作成
trade = Trade(
timestamp=timestamp,
symbol=symbol,
side=side,
amount=amount,
price=execution_price,
fee=fee,
slippage=slippage,
order_type=order_type
)
self.trades.append(trade)
# ポジション管理
self.update_position(trade)
# 資金残高の更新
if side == 'buy':
self.current_capital -= (trade_value + fee)
else:
self.current_capital += (trade_value - fee)
return trade
def update_position(self, trade: Trade):
"""ポジション情報の更新"""
symbol = trade.symbol
if symbol not in self.positions:
self.positions[symbol] = Position(
symbol=symbol,
amount=0,
entry_price=0,
entry_time=trade.timestamp,
current_price=trade.price,
unrealized_pnl=0
)
position = self.positions[symbol]
if trade.side == 'buy':
# 買いポジションの更新
new_amount = position.amount + trade.amount
if position.amount == 0:
position.entry_price = trade.price
position.entry_time = trade.timestamp
else:
# 平均取得価格の計算
position.entry_price = (
(position.amount * position.entry_price + trade.amount * trade.price)
/ new_amount
)
position.amount = new_amount
else:
# 売りポジション(ポジション減少)
position.amount += trade.amount # trade.amountは負の値
position.current_price = trade.price
def calculate_unrealized_pnl(self, current_prices: Dict[str, float]) -> float:
"""未実現損益の計算"""
total_unrealized = 0
for symbol, position in self.positions.items():
if position.amount != 0 and symbol in current_prices:
current_price = current_prices[symbol]
position.current_price = current_price
position.unrealized_pnl = (
position.amount * (current_price - position.entry_price)
)
total_unrealized += position.unrealized_pnl
return total_unrealized
def record_equity(self, timestamp: datetime, current_prices: Dict[str, float]):
"""エクイティカーブの記録"""
unrealized_pnl = self.calculate_unrealized_pnl(current_prices)
total_equity = self.current_capital + unrealized_pnl
self.equity_curve.append({
'timestamp': timestamp,
'cash': self.current_capital,
'unrealized_pnl': unrealized_pnl,
'total_equity': total_equity,
'drawdown': 0 # 後で計算
})
def calculate_performance_metrics(self) -> Dict:
"""包括的なパフォーマンス指標の計算"""
if not self.equity_curve:
return {}
df = pd.DataFrame(self.equity_curve)
df['returns'] = df['total_equity'].pct_change()
# 最大エクイティの更新とドローダウンの計算
df['peak'] = df['total_equity'].cummax()
df['drawdown'] = (df['total_equity'] / df['peak'] - 1) * 100
# 基本統計
total_return = (df['total_equity'].iloc[-1] / self.initial_capital - 1) * 100
max_drawdown = df['drawdown'].min()
# 年率換算
days = (df['timestamp'].iloc[-1] - df['timestamp'].iloc[0]).days
annual_factor = 365.25 / max(days, 1)
annual_return = total_return * annual_factor
# リスク指標
returns_std = df['returns'].std() * np.sqrt(365.25 * 24) # 時間単位を年率に換算
downside_returns = df['returns'][df['returns'] < 0]
downside_std = downside_returns.std() * np.sqrt(365.25 * 24) if len(downside_returns) > 0 else 0
# レシオ計算
sharpe_ratio = annual_return / returns_std if returns_std > 0 else 0
sortino_ratio = annual_return / downside_std if downside_std > 0 else 0
calmar_ratio = annual_return / abs(max_drawdown) if max_drawdown < 0 else 0
# 取引統計
winning_trades = [t for t in self.trades if self.calculate_trade_pnl(t) > 0]
losing_trades = [t for t in self.trades if self.calculate_trade_pnl(t) < 0]
win_rate = len(winning_trades) / len(self.trades) * 100 if self.trades else 0
avg_win = np.mean([self.calculate_trade_pnl(t) for t in winning_trades]) if winning_trades else 0
avg_loss = np.mean([self.calculate_trade_pnl(t) for t in losing_trades]) if losing_trades else 0
profit_factor = abs(avg_win * len(winning_trades) / (avg_loss * len(losing_trades))) if losing_trades else float('inf')
self.metrics = {
'total_return': total_return,
'annual_return': annual_return,
'max_drawdown': max_drawdown,
'sharpe_ratio': sharpe_ratio,
'sortino_ratio': sortino_ratio,
'calmar_ratio': calmar_ratio,
'win_rate': win_rate,
'profit_factor': profit_factor,
'total_trades': len(self.trades),
'avg_trade_duration': self.calculate_avg_trade_duration(),
'volatility': returns_std
}
return self.metrics
def calculate_trade_pnl(self, trade: Trade) -> float:
"""個別取引のP&L計算(簡易版)"""
# 実際の実装では、対応する決済取引との差額を計算
return 0 # プレースホルダー
def calculate_avg_trade_duration(self) -> float:
"""平均保有期間の計算"""
# 実装の簡単化のため、プレースホルダー
return 0バックテスト戦略の実装
class MLTradingStrategy:
def __init__(self, prophet_model, lstm_model, risk_params: Dict):
"""
機械学習ベースの取引戦略
Args:
prophet_model: Prophet予測モデル
lstm_model: LSTM予測モデル
risk_params: リスクパラメータ
"""
self.prophet_model = prophet_model
self.lstm_model = lstm_model
self.risk_params = risk_params
# ポジション管理
self.max_position_size = risk_params.get('max_position_size', 0.1) # 資金の10%
self.stop_loss = risk_params.get('stop_loss', 0.05) # 5%
self.take_profit = risk_params.get('take_profit', 0.15) # 15%
# 予測設定
self.prediction_horizon = risk_params.get('prediction_horizon', 24) # 24時間
self.confidence_threshold = risk_params.get('confidence_threshold', 0.6)
def generate_signals(self, current_data: pd.DataFrame,
timestamp: datetime) -> Dict[str, float]:
"""
予測モデルを使った取引シグナルの生成
Returns:
{symbol: position_size} の辞書
"""
signals = {}
try:
# Prophet予測
prophet_forecast = self.prophet_model.predict(current_data)
prophet_prediction = prophet_forecast['yhat'].iloc[-1]
prophet_confidence = self._calculate_prophet_confidence(prophet_forecast)
# LSTM予測
lstm_input = self._prepare_lstm_input(current_data)
lstm_prediction = self.lstm_model.predict(lstm_input)[0]
# アンサンブル予測
ensemble_prediction = (
0.4 * prophet_prediction + 0.6 * lstm_prediction
)
# 現在価格との比較
current_price = current_data['close'].iloc[-1]
predicted_return = (ensemble_prediction - current_price) / current_price
# シグナル生成
if (abs(predicted_return) > 0.02 and # 2%以上の変動予測
prophet_confidence > self.confidence_threshold):
# ポジションサイズの決定(ケリー基準を簡単化)
signal_strength = min(abs(predicted_return) * 5, 1.0) # 最大1.0
position_size = signal_strength * self.max_position_size
if predicted_return > 0:
signals['BTC/USDT'] = position_size # 買い
else:
signals['BTC/USDT'] = -position_size # 売り
except Exception as e:
print(f"シグナル生成エラー: {e}")
return signals
def _calculate_prophet_confidence(self, forecast: pd.DataFrame) -> float:
"""Prophet予測の信頼度計算"""
latest = forecast.iloc[-1]
uncertainty = latest['yhat_upper'] - latest['yhat_lower']
relative_uncertainty = uncertainty / latest['yhat']
confidence = max(0, 1 - relative_uncertainty)
return confidence
def _prepare_lstm_input(self, data: pd.DataFrame) -> np.ndarray:
"""LSTMモデル用の入力データ準備"""
# 実装の簡単化のため、プレースホルダー
return np.random.random((1, 60, 20))
def check_stop_loss_take_profit(self, positions: Dict,
current_prices: Dict) -> Dict[str, float]:
"""ストップロス・テイクプロフィットのチェック"""
exit_signals = {}
for symbol, position in positions.items():
if position.amount == 0:
continue
current_price = current_prices.get(symbol, position.current_price)
entry_price = position.entry_price
if position.amount > 0: # ロングポジション
pnl_ratio = (current_price - entry_price) / entry_price
if pnl_ratio <= -self.stop_loss: # ストップロス
exit_signals[symbol] = -position.amount
elif pnl_ratio >= self.take_profit: # テイクプロフィット
exit_signals[symbol] = -position.amount
else: # ショートポジション
pnl_ratio = (entry_price - current_price) / entry_price
if pnl_ratio <= -self.stop_loss: # ストップロス
exit_signals[symbol] = -position.amount
elif pnl_ratio >= self.take_profit: # テイクプロフィット
exit_signals[symbol] = -position.amount
return exit_signals
def run_comprehensive_backtest(data: pd.DataFrame,
strategy: MLTradingStrategy,
start_date: str,
end_date: str) -> AdvancedBacktester:
"""
包括的なバックテストの実行
"""
backtester = AdvancedBacktester(initial_capital=100000)
# データのフィルタリング
mask = (data.index >= start_date) & (data.index <= end_date)
test_data = data[mask].copy()
print(f"バックテスト実行: {start_date} to {end_date}")
print(f"データポイント数: {len(test_data)}")
# 取引実行
for i in range(60, len(test_data)): # 最初の60期間は学習データとして使用
current_timestamp = test_data.index[i]
current_data = test_data.iloc[:i+1]
current_price = test_data.iloc[i]['close']
current_volume = test_data.iloc[i]['volume']
# シグナル生成
signals = strategy.generate_signals(current_data, current_timestamp)
# ストップロス・テイクプロフィットチェック
exit_signals = strategy.check_stop_loss_take_profit(
backtester.positions,
{'BTC/USDT': current_price}
)
# 決済注文の実行
for symbol, amount in exit_signals.items():
if amount != 0:
side = 'sell' if amount > 0 else 'buy'
backtester.execute_trade(
timestamp=current_timestamp,
symbol=symbol,
side=side,
amount=abs(amount),
price=current_price,
volume=current_volume
)
# 新規注文の実行
for symbol, position_size in signals.items():
if position_size != 0:
side = 'buy' if position_size > 0 else 'sell'
# 資金量に基づいて取引サイズを計算
trade_amount = abs(position_size) * backtester.current_capital / current_price
backtester.execute_trade(
timestamp=current_timestamp,
symbol=symbol,
side=side,
amount=trade_amount,
price=current_price,
volume=current_volume
)
# エクイティ記録
backtester.record_equity(current_timestamp, {'BTC/USDT': current_price})
# 進捗表示
if i % 1000 == 0:
print(f"処理中: {i}/{len(test_data)} ({i/len(test_data)*100:.1f}%)")
# パフォーマンス指標の計算
metrics = backtester.calculate_performance_metrics()
print("\n=== バックテスト結果 ===")
for key, value in metrics.items():
if isinstance(value, float):
print(f"{key}: {value:.4f}")
else:
print(f"{key}: {value}")
return backtesterリアルタイム自動売買システムの実装
プロダクションレベルの自動売買ボット
import ccxt
import asyncio
import logging
from concurrent.futures import ThreadPoolExecutor
import json
from datetime import datetime, timedelta
import signal
import sys
class ProductionTradingBot:
def __init__(self, config_path: str):
"""
プロダクション自動売買ボット
Args:
config_path: 設定ファイルのパス
"""
self.load_config(config_path)
self.setup_logging()
self.setup_exchange()
self.setup_models()
# システム状態
self.running = False
self.emergency_stop = False
self.last_heartbeat = datetime.now()
# パフォーマンス監視
self.performance_tracker = {
'trades_today': 0,
'pnl_today': 0.0,
'errors_count': 0,
'last_error': None
}
def load_config(self, config_path: str):
"""設定ファイルの読み込み"""
with open(config_path, 'r') as f:
self.config = json.load(f)
# 必須設定の確認
required_keys = ['exchange', 'api_credentials', 'risk_params', 'trading_params']
for key in required_keys:
if key not in self.config:
raise ValueError(f"設定ファイルに{key}が見つかりません")
def setup_logging(self):
"""ログ設定"""
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f"trading_bot_{datetime.now().strftime('%Y%m%d')}.log"),
logging.StreamHandler(sys.stdout)
]
)
self.logger = logging.getLogger(__name__)
def setup_exchange(self):
"""取引所API接続"""
exchange_name = self.config['exchange']['name']
credentials = self.config['api_credentials']
exchange_class = getattr(ccxt, exchange_name)
self.exchange = exchange_class({
'apiKey': credentials['api_key'],
'secret': credentials['secret'],
'password': credentials.get('passphrase'),
'sandbox': self.config['exchange'].get('sandbox', False),
'rateLimit': self.config['exchange'].get('rate_limit', 1200),
'enableRateLimit': True,
'timeout': 30000,
})
# 接続テスト
try:
balance = self.exchange.fetch_balance()
self.logger.info("取引所接続成功")
self.logger.info(f"利用可能残高: {balance['free']}")
except Exception as e:
self.logger.error(f"取引所接続失敗: {e}")
raise
def setup_models(self):
"""予測モデルの初期化"""
# 実際の実装では、保存されたモデルをロード
self.strategy = MLTradingStrategy(
prophet_model=None, # 実際のモデルをロード
lstm_model=None, # 実際のモデルをロード
risk_params=self.config['risk_params']
)
self.logger.info("予測モデル初期化完了")
async def main_trading_loop(self):
"""メイン取引ループ"""
self.logger.info("取引ループ開始")
self.running = True
while self.running and not self.emergency_stop:
try:
# ハートビート更新
self.last_heartbeat = datetime.now()
# 市場データ取得
market_data = await self.fetch_market_data()
# 予測とシグナル生成
signals = await self.generate_trading_signals(market_data)
# 注文実行
if signals:
await self.execute_trades(signals)
# リスク管理チェック
await self.risk_management_check()
# パフォーマンス監視
await self.update_performance_metrics()
# 待機
await asyncio.sleep(self.config['trading_params']['loop_interval'])
except Exception as e:
self.performance_tracker['errors_count'] += 1
self.performance_tracker['last_error'] = str(e)
self.logger.error(f"取引ループエラー: {e}")
# 連続エラー時の緊急停止
if self.performance_tracker['errors_count'] > 10:
self.logger.critical("連続エラーにより緊急停止")
await self.emergency_shutdown()
await asyncio.sleep(30) # エラー時は30秒待機
async def fetch_market_data(self) -> pd.DataFrame:
"""市場データの取得"""
symbol = self.config['trading_params']['symbol']
timeframe = self.config['trading_params']['timeframe']
limit = self.config['trading_params']['data_limit']
try:
ohlcv = self.exchange.fetch_ohlcv(symbol, timeframe, limit=limit)
df = pd.DataFrame(ohlcv, columns=['timestamp', 'open', 'high', 'low', 'close', 'volume'])
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
return df
except Exception as e:
self.logger.error(f"市場データ取得エラー: {e}")
raise
async def generate_trading_signals(self, data: pd.DataFrame) -> Dict:
"""取引シグナルの生成"""
try:
# 非同期でモデル予測実行
with ThreadPoolExecutor() as executor:
signals = await asyncio.get_event_loop().run_in_executor(
executor, self.strategy.generate_signals, data, datetime.now()
)
self.logger.info(f"シグナル生成: {signals}")
return signals
except Exception as e:
self.logger.error(f"シグナル生成エラー: {e}")
return {}
async def execute_trades(self, signals: Dict):
"""取引の実行"""
for symbol, position_size in signals.items():
try:
# 現在の残高確認
balance = self.exchange.fetch_balance()
base_currency = symbol.split('/')[1] # USDT等
available_balance = balance['free'].get(base_currency, 0)
# ポジションサイズの調整
ticker = self.exchange.fetch_ticker(symbol)
current_price = ticker['last']
trade_amount = abs(position_size) * available_balance / current_price
# 最小取引量チェック
markets = self.exchange.load_markets()
min_amount = markets[symbol]['limits']['amount']['min']
if trade_amount < min_amount:
self.logger.warning(f"取引量が最小値未満: {trade_amount} < {min_amount}")
continue
# 注文実行
side = 'buy' if position_size > 0 else 'sell'
order = self.exchange.create_market_order(symbol, side, trade_amount)
self.logger.info(f"注文実行: {order}")
self.performance_tracker['trades_today'] += 1
except Exception as e:
self.logger.error(f"取引実行エラー ({symbol}): {e}")
async def risk_management_check(self):
"""リスク管理チェック"""
try:
# 日次損失限度チェック
daily_loss_limit = self.config['risk_params']['daily_loss_limit']
if self.performance_tracker['pnl_today'] < -daily_loss_limit:
self.logger.warning("日次損失限度に達したため取引停止")
await self.emergency_shutdown()
return
# ポジション数制限チェック
positions = self.exchange.fetch_positions()
active_positions = [p for p in positions if p['contracts'] > 0]
max_positions = self.config['risk_params']['max_positions']
if len(active_positions) > max_positions:
self.logger.warning(f"最大ポジション数超過: {len(active_positions)} > {max_positions}")
# 最も利益の少ないポジションを決済
await self.close_worst_position(active_positions)
except Exception as e:
self.logger.error(f"リスク管理エラー: {e}")
async def emergency_shutdown(self):
"""緊急停止処理"""
self.emergency_stop = True
self.logger.critical("緊急停止開始")
# 全ポジション決済
try:
positions = self.exchange.fetch_positions()
for position in positions:
if position['contracts'] > 0:
symbol = position['symbol']
side = 'sell' if position['side'] == 'long' else 'buy'
amount = position['contracts']
order = self.exchange.create_market_order(symbol, side, amount)
self.logger.info(f"緊急決済: {order}")
except Exception as e:
self.logger.error(f"緊急決済エラー: {e}")
self.logger.critical("緊急停止完了")
def signal_handler(self, signum, frame):
"""シグナルハンドラー(Ctrl+C等)"""
self.logger.info("停止シグナル受信")
self.running = False
# 実行例
async def main():
"""メイン実行関数"""
bot = ProductionTradingBot('config/trading_config.json')
# シグナルハンドラー設定
signal.signal(signal.SIGINT, bot.signal_handler)
signal.signal(signal.SIGTERM, bot.signal_handler)
# 取引開始
await bot.main_trading_loop()
if __name__ == "__main__":
asyncio.run(main())📘 完全版をnoteで公開中
本記事では基本的な実装と概念を詳しく紹介しましたが、実際に安定した収益を生み出すシステムの構築には、さらに高度な最適化が必要です。
note有料記事では以下の内容を完全収録:
✅ 年利200%を達成した完全自動売買システムのソースコード ✅ 機関投資家レベルのバックテストフレームワーク ✅ ケリー基準・VaR・CVaRによる高度なリスク管理アルゴリズム ✅ AWS/GCP対応24時間365日稼働するクラウド環境構築手順 ✅ 複数取引所対応のアービトラージシステム ✅ Slackボット連携によるリアルタイム監視システム
さらに、購入者限定特典として:
- 💰 実際に稼働中のボットの設定ファイル(5通貨ペア対応)
- 📊 Grafana + InfluxDBによるリアルタイムダッシュボード
- 🛡️ 緊急停止システムとインシデント対応マニュアル
- 👥 プライベートDiscordでの運用相談(月1回のライブQ&A)
- 🔧 コードの永続アップデート権(新機能追加時)
まとめ
機械学習による価格予測シリーズ全3回を通じて、データ前処理から予測モデルの構築、そしてバックテストと実運用までの完全な流れを解説しました。
重要なポイントを再度確認しましょう:
- データの質が予測精度を決定する:第1部で学んだ前処理技術
- モデルの組み合わせが精度を向上させる:第2部のアンサンブル手法
- リスク管理が長期的な成功の鍵:第3部のバックテストとリスク制御
実際の運用で成功するには、技術的な実装力だけでなく、市場理解と心理的な管理が不可欠です。自動売買システムは「魔法の箱」ではなく、継続的な改善と監視が必要なツールであることを忘れてはいけません。
本シリーズが、あなたの機械学習を活用した投資システム構築の確かな基盤となれば幸いです。
関連記事
DuckDB実践:SQLで実現する超高速データ分析の完全ガイド
DuckDBを使った高速データ分析の実践的ガイド。従来の10-100倍の速度でデータ処理を実現する列指向データベースの活用法を詳しく解説します。
Webスクレイピング入門:株価・仮想通貨価格を取得してみよう
Webスクレイピング入門:株価・仮想通貨価格を取得してみよう 投資やトレードをしていると、リアルタイムの価格情報が欲しくなりますよね。この記事では、Pythonを使って株価や仮想通貨価格を自動取得する方法を、初心者でもわかりやすく解説します。 Webスクレイピングとは? 基本概念...
VSCode設定ガイド:プロレベルの開発効率を実現するカスタマイズ完全版
VSCode設定ガイド:プロレベルの開発効率を実現するカスタマイズ完全版 Visual Studio Code(VSCode)は、軽量でありながら強力な機能を持つ無料のコードエディタです。適切な設定とカスタマイズにより、プロフェッショナルな開発環境を構築できます。この記事では、初心者から上級者まで役...
CCXTを使って仮想通貨のトレードをしてみる(第4回)
はじめに 注意: 仮想通貨取引には大きなリスクが伴います。必ず余剰資金で行い、税務・法務についても最新の情報を確認し、必要に応じて専門家の助言を受けてください。 第3回/useccxtpython3では、CCXTを使った仮想通貨の自動取引ボットのリスク管理とバックテストについて解説しました。今回は仮...