Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築

約40分で読めます by ぽんたぬき
Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築

これまでの記事で価格表示とチャート分析ができるようになりました。今回は最終回として、自分の保有している仮想通貨を管理するポートフォリオ機能と、価格変動を知らせるアラート機能を追加します。これで本格的な投資管理ツールの完成です!

ポートフォリオ管理とは?なぜ重要なのか

ポートフォリオ管理の基本概念

ポートフォリオとは、投資している全ての資産をまとめて管理することです。

管理すべき情報:

  • どの通貨をどれだけ保有しているか
  • 現在の評価額はいくらか
  • 購入時からどれだけ利益/損失が出ているか
  • 全体のバランス(分散投資できているか)

手動管理の大変さ

従来の手動管理:

📝 メモ帳やExcelで管理
├─ BTC: 0.1枚、購入価格40,000ドル
├─ ETH: 2枚、購入価格2,500ドル  
└─ XRP: 1000枚、購入価格0.5ドル

💻 毎回手動で計算
├─ 現在価格を調べる
├─ 評価額を計算する
└─ 損益を計算する

これをStreamlitで自動化します!

セッション状態によるデータ管理

Streamlitでデータを保持する方法を学びましょう:

# session_state_basics.py
import streamlit as st

st.title("🔄 セッション状態の基本")

# セッション状態の初期化
if 'counter' not in st.session_state:
    st.session_state.counter = 0

if 'user_name' not in st.session_state:
    st.session_state.user_name = ""

# カウンター例
st.subheader("カウンター例")
st.write(f"現在のカウント: {st.session_state.counter}")

col1, col2, col3 = st.columns(3)

with col1:
    if st.button("➕ 増加"):
        st.session_state.counter += 1

with col2:
    if st.button("➖ 減少"):
        st.session_state.counter -= 1

with col3:
    if st.button("🔄 リセット"):
        st.session_state.counter = 0

# ユーザー名の保存例
st.subheader("ユーザー名保存例")

user_name = st.text_input(
    "ユーザー名を入力", 
    value=st.session_state.user_name
)

if st.button("名前を保存"):
    st.session_state.user_name = user_name
    st.success(f"ようこそ、{user_name}さん!")

if st.session_state.user_name:
    st.write(f"保存されている名前: **{st.session_state.user_name}**")

この仕組みを使ってポートフォリオデータを管理します。

基本的なポートフォリオ機能

保有通貨を追加・削除できる基本機能を作成:

# portfolio_basic.py
import streamlit as st
import ccxt
import pandas as pd
from datetime import datetime

st.title("💼 ポートフォリオ管理")

# セッション状態の初期化
if 'portfolio' not in st.session_state:
    st.session_state.portfolio = {}

if 'total_invested' not in st.session_state:
    st.session_state.total_invested = 0

@st.cache_data(ttl=60)
def get_current_prices():
    """現在の価格を取得"""
    exchange = ccxt.binance()
    symbols = ['BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'ADA/USDT', 
               'DOT/USDT', 'LINK/USDT', 'UNI/USDT', 'MATIC/USDT']
    
    prices = {}
    for symbol in symbols:
        try:
            ticker = exchange.fetch_ticker(symbol)
            currency = symbol.split('/')[0]  # 'BTC/USDT' → 'BTC'
            prices[currency] = ticker['last']
        except:
            continue
    
    return prices

# サイドバーで通貨追加
st.sidebar.header("➕ 通貨を追加")

available_currencies = ['BTC', 'ETH', 'XRP', 'ADA', 'DOT', 'LINK', 'UNI', 'MATIC']

selected_currency = st.sidebar.selectbox(
    "通貨を選択",
    available_currencies
)

amount = st.sidebar.number_input(
    f"{selected_currency} の保有量",
    min_value=0.0,
    value=0.0,
    step=0.0001,
    format="%.6f"
)

purchase_price = st.sidebar.number_input(
    f"{selected_currency} の購入価格 ($)",
    min_value=0.0,
    value=0.0,
    step=0.01
)

if st.sidebar.button("💰 ポートフォリオに追加"):
    if amount > 0 and purchase_price > 0:
        # 既存の保有量に追加
        if selected_currency in st.session_state.portfolio:
            existing = st.session_state.portfolio[selected_currency]
            # 加重平均で購入価格を計算
            total_amount = existing['amount'] + amount
            avg_price = (
                (existing['amount'] * existing['purchase_price']) + 
                (amount * purchase_price)
            ) / total_amount
            
            st.session_state.portfolio[selected_currency] = {
                'amount': total_amount,
                'purchase_price': avg_price
            }
        else:
            st.session_state.portfolio[selected_currency] = {
                'amount': amount,
                'purchase_price': purchase_price
            }
        
        st.sidebar.success(f"✅ {selected_currency} を追加しました")
    else:
        st.sidebar.error("金額と価格を正しく入力してください")

# 現在価格取得
current_prices = get_current_prices()

# ポートフォリオ表示
if st.session_state.portfolio:
    st.subheader("📊 現在のポートフォリオ")
    
    portfolio_data = []
    total_value = 0
    total_cost = 0
    
    for currency, holding in st.session_state.portfolio.items():
        if currency in current_prices:
            current_price = current_prices[currency]
            amount = holding['amount']
            purchase_price = holding['purchase_price']
            
            current_value = amount * current_price
            cost_basis = amount * purchase_price
            profit_loss = current_value - cost_basis
            profit_loss_pct = (profit_loss / cost_basis) * 100 if cost_basis > 0 else 0
            
            total_value += current_value
            total_cost += cost_basis
            
            portfolio_data.append({
                '通貨': currency,
                '保有量': amount,
                '購入価格': purchase_price,
                '現在価格': current_price,
                '投資額': cost_basis,
                '評価額': current_value,
                '損益': profit_loss,
                '損益率': profit_loss_pct
            })
    
    # 概要メトリクス
    total_pnl = total_value - total_cost
    total_pnl_pct = (total_pnl / total_cost) * 100 if total_cost > 0 else 0
    
    col1, col2, col3, col4 = st.columns(4)
    
    with col1:
        st.metric("投資総額", f"${total_cost:,.2f}")
    
    with col2:
        st.metric("現在評価額", f"${total_value:,.2f}")
    
    with col3:
        st.metric(
            "総損益", 
            f"${total_pnl:+,.2f}",
            delta=f"{total_pnl_pct:+.2f}%"
        )
    
    with col4:
        profit_ratio = len([p for p in portfolio_data if p['損益'] > 0])
        st.metric("利益銘柄数", f"{profit_ratio}/{len(portfolio_data)}")
    
    # 詳細テーブル
    if portfolio_data:
        df = pd.DataFrame(portfolio_data)
        
        # スタイリング
        styled_df = df.style.format({
            '保有量': '{:.6f}',
            '購入価格': '${:,.2f}',
            '現在価格': '${:,.2f}',
            '投資額': '${:,.2f}',
            '評価額': '${:,.2f}',
            '損益': '${:+,.2f}',
            '損益率': '{:+.2f}%'
        }).apply(
            lambda x: ['background-color: lightgreen' if v > 0 
                      else 'background-color: lightcoral' if v < 0 
                      else '' for v in x], 
            subset=['損益']
        )
        
        st.dataframe(styled_df, use_container_width=True)
        
        # 個別削除ボタン
        st.subheader("🗑️ 通貨削除")
        
        cols = st.columns(len(portfolio_data))
        for i, (currency, _) in enumerate(st.session_state.portfolio.items()):
            with cols[i % len(cols)]:
                if st.button(f"❌ {currency}削除", key=f"delete_{currency}"):
                    del st.session_state.portfolio[currency]
                    st.experimental_rerun()

else:
    st.info("📝 サイドバーから通貨を追加してポートフォリオを作成しましょう")

# 全削除ボタン
if st.session_state.portfolio:
    if st.button("🗑️ ポートフォリオを全削除"):
        st.session_state.portfolio = {}
        st.experimental_rerun()

高度なアラート機能

価格変動やポートフォリオの状況に応じてアラートを出す機能:

# alert_system.py
import streamlit as st
import ccxt
import pandas as pd
from datetime import datetime, timedelta

st.title("🚨 アラートシステム")

# セッション状態の初期化
if 'alerts' not in st.session_state:
    st.session_state.alerts = []

if 'alert_history' not in st.session_state:
    st.session_state.alert_history = []

@st.cache_data(ttl=60)
def get_prices_for_alerts():
    """アラート用の価格データを取得"""
    exchange = ccxt.binance()
    symbols = ['BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'ADA/USDT']
    
    prices = {}
    for symbol in symbols:
        try:
            ticker = exchange.fetch_ticker(symbol)
            prices[symbol] = {
                'price': ticker['last'],
                'change_24h': ticker['percentage']
            }
        except:
            continue
    
    return prices

# アラート設定
st.subheader("⚙️ アラート設定")

with st.expander("新しいアラートを作成"):
    col1, col2, col3 = st.columns(3)
    
    with col1:
        alert_type = st.selectbox(
            "アラートタイプ",
            ["価格アラート", "変動率アラート", "ポートフォリオアラート"]
        )
    
    if alert_type == "価格アラート":
        with col2:
            alert_symbol = st.selectbox(
                "通貨",
                ['BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'ADA/USDT']
            )
        
        with col3:
            condition = st.selectbox("条件", ["以上", "以下"])
        
        target_price = st.number_input(
            "目標価格 ($)",
            min_value=0.0,
            value=50000.0 if alert_symbol == 'BTC/USDT' else 3000.0
        )
        
        if st.button("価格アラートを追加"):
            alert = {
                'type': 'price',
                'symbol': alert_symbol,
                'condition': condition,
                'target_price': target_price,
                'created_at': datetime.now(),
                'triggered': False
            }
            st.session_state.alerts.append(alert)
            st.success("価格アラートを追加しました")
    
    elif alert_type == "変動率アラート":
        with col2:
            alert_symbol = st.selectbox(
                "通貨",
                ['BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'ADA/USDT'],
                key="change_symbol"
            )
        
        with col3:
            change_threshold = st.number_input(
                "変動率閾値 (%)",
                min_value=0.0,
                max_value=100.0,
                value=5.0
            )
        
        if st.button("変動率アラートを追加"):
            alert = {
                'type': 'change',
                'symbol': alert_symbol,
                'threshold': change_threshold,
                'created_at': datetime.now(),
                'triggered': False
            }
            st.session_state.alerts.append(alert)
            st.success("変動率アラートを追加しました")

# アラートチェック実行
prices = get_prices_for_alerts()
triggered_alerts = []

for i, alert in enumerate(st.session_state.alerts):
    if alert['triggered']:
        continue
        
    if alert['type'] == 'price':
        symbol = alert['symbol']
        if symbol in prices:
            current_price = prices[symbol]['price']
            
            if (alert['condition'] == '以上' and current_price >= alert['target_price']) or \
               (alert['condition'] == '以下' and current_price <= alert['target_price']):
                
                triggered_alerts.append({
                    'message': f"🚨 {symbol} が ${alert['target_price']:,.2f} {alert['condition']}になりました!",
                    'details': f"現在価格: ${current_price:,.2f}",
                    'alert_index': i
                })
                st.session_state.alerts[i]['triggered'] = True
                
                # 履歴に追加
                st.session_state.alert_history.append({
                    'timestamp': datetime.now(),
                    'message': f"{symbol} 価格アラート発火",
                    'details': f"${current_price:,.2f} ({alert['condition']} ${alert['target_price']:,.2f})"
                })
    
    elif alert['type'] == 'change':
        symbol = alert['symbol']
        if symbol in prices:
            change_24h = abs(prices[symbol]['change_24h'])
            
            if change_24h >= alert['threshold']:
                triggered_alerts.append({
                    'message': f"📊 {symbol} の24h変動率が {alert['threshold']}%を超えました!",
                    'details': f"現在の変動率: {prices[symbol]['change_24h']:+.2f}%",
                    'alert_index': i
                })
                st.session_state.alerts[i]['triggered'] = True
                
                # 履歴に追加
                st.session_state.alert_history.append({
                    'timestamp': datetime.now(),
                    'message': f"{symbol} 変動率アラート発火",
                    'details': f"{prices[symbol]['change_24h']:+.2f}% (閾値: {alert['threshold']}%)"
                })

# トリガーされたアラートを表示
if triggered_alerts:
    st.subheader("🔔 発火したアラート")
    for alert in triggered_alerts:
        st.error(f"{alert['message']}\n{alert['details']}")

# 現在のアラート一覧
st.subheader("📋 設定中のアラート")

if st.session_state.alerts:
    active_alerts = []
    
    for i, alert in enumerate(st.session_state.alerts):
        status = "✅ 発火済み" if alert['triggered'] else "⏰ 待機中"
        
        if alert['type'] == 'price':
            description = f"{alert['symbol']} が ${alert['target_price']:,.2f} {alert['condition']}"
        elif alert['type'] == 'change':
            description = f"{alert['symbol']} の24h変動率が {alert['threshold']}%以上"
        
        active_alerts.append({
            'ID': i,
            'タイプ': alert['type'],
            '条件': description,
            'ステータス': status,
            '作成日時': alert['created_at'].strftime('%m/%d %H:%M')
        })
    
    df_alerts = pd.DataFrame(active_alerts)
    st.dataframe(df_alerts, use_container_width=True)
    
    # アラート削除
    st.subheader("🗑️ アラート削除")
    
    delete_id = st.selectbox(
        "削除するアラートID",
        [i for i in range(len(st.session_state.alerts))]
    )
    
    col1, col2 = st.columns(2)
    
    with col1:
        if st.button("選択したアラートを削除"):
            if 0 <= delete_id < len(st.session_state.alerts):
                st.session_state.alerts.pop(delete_id)
                st.success("アラートを削除しました")
                st.experimental_rerun()
    
    with col2:
        if st.button("全てのアラートを削除"):
            st.session_state.alerts = []
            st.success("全てのアラートを削除しました")
            st.experimental_rerun()

else:
    st.info("アラートが設定されていません")

# アラート履歴
if st.session_state.alert_history:
    st.subheader("📜 アラート履歴")
    
    history_data = []
    for item in st.session_state.alert_history[-10:]:  # 最新10件
        history_data.append({
            '発火時刻': item['timestamp'].strftime('%m/%d %H:%M:%S'),
            'アラート': item['message'],
            '詳細': item['details']
        })
    
    df_history = pd.DataFrame(history_data)
    st.dataframe(df_history, use_container_width=True)

データ永続化とエクスポート機能

ポートフォリオデータを保存・読み込みできる機能:

# data_persistence.py
import streamlit as st
import json
import pandas as pd
from datetime import datetime
import io

st.title("💾 データ管理")

# JSONでのエクスポート/インポート
st.subheader("📤 ポートフォリオのエクスポート/インポート")

if 'portfolio' in st.session_state and st.session_state.portfolio:
    # エクスポート
    col1, col2 = st.columns(2)
    
    with col1:
        if st.button("📁 JSONでエクスポート"):
            export_data = {
                'portfolio': st.session_state.portfolio,
                'export_date': datetime.now().isoformat(),
                'version': '1.0'
            }
            
            json_str = json.dumps(export_data, indent=2, ensure_ascii=False)
            
            st.download_button(
                label="💾 ダウンロード",
                data=json_str,
                file_name=f"portfolio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
                mime="application/json"
            )
    
    with col2:
        if st.button("📊 CSVでエクスポート"):
            # 現在の評価額込みでCSVエクスポート
            portfolio_data = []
            
            for currency, holding in st.session_state.portfolio.items():
                portfolio_data.append({
                    '通貨': currency,
                    '保有量': holding['amount'],
                    '購入価格': holding['purchase_price'],
                    '投資額': holding['amount'] * holding['purchase_price']
                })
            
            df = pd.DataFrame(portfolio_data)
            csv_buffer = io.StringIO()
            df.to_csv(csv_buffer, index=False, encoding='utf-8-sig')
            
            st.download_button(
                label="📊 CSVダウンロード",
                data=csv_buffer.getvalue(),
                file_name=f"portfolio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
                mime="text/csv"
            )

# インポート
st.subheader("📥 ポートフォリオのインポート")

uploaded_file = st.file_uploader(
    "JSONファイルを選択",
    type=['json'],
    help="以前エクスポートしたポートフォリオファイルを選択してください"
)

if uploaded_file is not None:
    try:
        import_data = json.load(uploaded_file)
        
        if 'portfolio' in import_data:
            st.write("**インポート内容プレビュー:**")
            
            preview_data = []
            for currency, holding in import_data['portfolio'].items():
                preview_data.append({
                    '通貨': currency,
                    '保有量': holding['amount'],
                    '購入価格': f"${holding['purchase_price']:.2f}"
                })
            
            st.dataframe(pd.DataFrame(preview_data))
            
            col1, col2 = st.columns(2)
            
            with col1:
                if st.button("🔄 上書きインポート"):
                    st.session_state.portfolio = import_data['portfolio']
                    st.success("ポートフォリオをインポートしました")
                    st.experimental_rerun()
            
            with col2:
                if st.button("➕ 追加インポート"):
                    for currency, holding in import_data['portfolio'].items():
                        if currency in st.session_state.portfolio:
                            # 既存の保有量に追加
                            existing = st.session_state.portfolio[currency]
                            total_amount = existing['amount'] + holding['amount']
                            avg_price = (
                                (existing['amount'] * existing['purchase_price']) + 
                                (holding['amount'] * holding['purchase_price'])
                            ) / total_amount
                            
                            st.session_state.portfolio[currency] = {
                                'amount': total_amount,
                                'purchase_price': avg_price
                            }
                        else:
                            st.session_state.portfolio[currency] = holding
                    
                    st.success("ポートフォリオに追加しました")
                    st.experimental_rerun()
    
    except Exception as e:
        st.error(f"ファイルの読み込みに失敗しました: {e}")

# 設定のリセット
st.subheader("🗑️ データのリセット")

col1, col2, col3 = st.columns(3)

with col1:
    if st.button("ポートフォリオをリセット"):
        st.session_state.portfolio = {}
        st.success("ポートフォリオをリセットしました")

with col2:
    if st.button("アラートをリセット"):
        if 'alerts' in st.session_state:
            st.session_state.alerts = []
        st.success("アラートをリセットしました")

with col3:
    if st.button("全データをリセット"):
        for key in list(st.session_state.keys()):
            del st.session_state[key]
        st.success("全データをリセットしました")
        st.experimental_rerun()

完成版:統合ダッシュボード

全ての機能を統合した最終版を作成:

# complete_dashboard.py
import streamlit as st
import ccxt
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
import json

# ページ設定
st.set_page_config(
    page_title="仮想通貨投資ダッシュボード",
    page_icon="💰",
    layout="wide"
)

# サイドバーでページ選択
st.sidebar.title("🏠 ナビゲーション")
page = st.sidebar.selectbox(
    "ページを選択",
    ["📊 ダッシュボード", "💼 ポートフォリオ", "🚨 アラート", "💾 データ管理"]
)

# メインタイトル
st.title("💰 仮想通貨投資ダッシュボード")
st.markdown("---")

# 各ページの実装
if page == "📊 ダッシュボード":
    # 価格表示とチャート(第1部、第2部の内容)
    st.header("価格情報")
    # 実装内容...

elif page == "💼 ポートフォリオ":
    # ポートフォリオ管理機能
    st.header("ポートフォリオ管理")
    # 実装内容...

elif page == "🚨 アラート":
    # アラート機能
    st.header("アラートシステム")
    # 実装内容...

elif page == "💾 データ管理":
    # データのエクスポート/インポート
    st.header("データ管理")
    # 実装内容...

# フッター
st.markdown("---")
st.caption("Streamlit仮想通貨ダッシュボード v1.0")

まとめ

この3部作で作成した機能:

🎯 実装した全機能

  1. 価格表示:リアルタイム価格取得・表示
  2. チャート分析:ローソク足・移動平均・出来高
  3. ポートフォリオ管理:保有通貨・損益計算
  4. アラート機能:価格・変動率通知
  5. データ管理:エクスポート・インポート

🚀 今後の拡張アイデア

  • 自動取引機能:条件に応じた自動売買
  • バックテスト機能:過去データでの戦略検証
  • ニュース連携:価格に影響するニュース表示
  • SNS連携:TwitterやDiscordでアラート通知

この完全なダッシュボードを使って、効率的な仮想通貨投資を始めましょう!

関連記事

Streamlit入門 第2部:美しいチャートと可視化で仮想通貨を分析しよう
Python

Streamlit入門 第2部:美しいチャートと可視化で仮想通貨を分析しよう

前回の記事で基本的な価格表示ができるようになりました。今回は、プロのトレーダーが使うような美しいチャートを追加して、本格的な分析ツールに仕上げていきます。難しそうに見えますが、実は数行のコードで驚くほど高機能なチャートが作れるんです! Plotlyとは?なぜグラフライブラリの中で最強なのか ...

Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理
Python

Python Webスクレイピング 2025 第3部:実践プロジェクトとデータ管理

はじめに シリーズ最終回となる第3部では、実際のプロジェクトで使える実践的なスクレイピングシステムの構築について解説します。検出回避技術、データの永続化、監視システムの構築など、プロダクション環境で必要となる高度な技術を学びます。 また、法的・倫理的配慮についても詳しく説明し、責任あるスクレイピング...

Python Webスクレイピング 2025 第2部:非同期処理と高度なデータ抽出
Python

Python Webスクレイピング 2025 第2部:非同期処理と高度なデータ抽出

はじめに 第1部ではPlaywrightの基本的な使い方と環境構築について学びました。第2部では、並行処理によるパフォーマンス向上と高度なデータ抽出テクニックについて詳しく解説します。 大規模なスクレイピングプロジェクトでは、単一ページずつの処理では時間がかかりすぎるため、複数のページを同時に処理す...

コメント

0/2000