Streamlit入門 第3部:ポートフォリオ管理とアラート機能で完全な投資ツールを構築

これまでの記事で価格表示とチャート分析ができるようになりました。今回は最終回として、自分の保有している仮想通貨を管理するポートフォリオ機能と、価格変動を知らせるアラート機能を追加します。これで本格的な投資管理ツールの完成です!

ポートフォリオ管理とは?なぜ重要なのか

ポートフォリオ管理の基本概念

ポートフォリオとは、投資している全ての資産をまとめて管理することです。

管理すべき情報:

  • どの通貨をどれだけ保有しているか
  • 現在の評価額はいくらか
  • 購入時からどれだけ利益/損失が出ているか
  • 全体のバランス(分散投資できているか)

手動管理の大変さ

従来の手動管理:

📝 メモ帳やExcelで管理
├─ BTC: 0.1枚、購入価格40,000ドル
├─ ETH: 2枚、購入価格2,500ドル  
└─ XRP: 1000枚、購入価格0.5ドル

💻 毎回手動で計算
├─ 現在価格を調べる
├─ 評価額を計算する
└─ 損益を計算する

これをStreamlitで自動化します!

セッション状態によるデータ管理

Streamlitでデータを保持する方法を学びましょう:

# session_state_basics.py
import streamlit as st

st.title("🔄 セッション状態の基本")

# セッション状態の初期化
if 'counter' not in st.session_state:
    st.session_state.counter = 0

if 'user_name' not in st.session_state:
    st.session_state.user_name = ""

# カウンター例
st.subheader("カウンター例")
st.write(f"現在のカウント: {st.session_state.counter}")

col1, col2, col3 = st.columns(3)

with col1:
    if st.button("➕ 増加"):
        st.session_state.counter += 1

with col2:
    if st.button("➖ 減少"):
        st.session_state.counter -= 1

with col3:
    if st.button("🔄 リセット"):
        st.session_state.counter = 0

# ユーザー名の保存例
st.subheader("ユーザー名保存例")

user_name = st.text_input(
    "ユーザー名を入力", 
    value=st.session_state.user_name
)

if st.button("名前を保存"):
    st.session_state.user_name = user_name
    st.success(f"ようこそ、{user_name}さん!")

if st.session_state.user_name:
    st.write(f"保存されている名前: **{st.session_state.user_name}**")

この仕組みを使ってポートフォリオデータを管理します。

基本的なポートフォリオ機能

保有通貨を追加・削除できる基本機能を作成:

# portfolio_basic.py
import streamlit as st
import ccxt
import pandas as pd
from datetime import datetime

st.title("💼 ポートフォリオ管理")

# セッション状態の初期化
if 'portfolio' not in st.session_state:
    st.session_state.portfolio = {}

if 'total_invested' not in st.session_state:
    st.session_state.total_invested = 0

@st.cache_data(ttl=60)
def get_current_prices():
    """現在の価格を取得"""
    exchange = ccxt.binance()
    symbols = ['BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'ADA/USDT', 
               'DOT/USDT', 'LINK/USDT', 'UNI/USDT', 'MATIC/USDT']

    prices = {}
    for symbol in symbols:
        try:
            ticker = exchange.fetch_ticker(symbol)
            currency = symbol.split('/')[0]  # 'BTC/USDT' → 'BTC'
            prices[currency] = ticker['last']
        except:
            continue

    return prices

# サイドバーで通貨追加
st.sidebar.header("➕ 通貨を追加")

available_currencies = ['BTC', 'ETH', 'XRP', 'ADA', 'DOT', 'LINK', 'UNI', 'MATIC']

selected_currency = st.sidebar.selectbox(
    "通貨を選択",
    available_currencies
)

amount = st.sidebar.number_input(
    f"{selected_currency} の保有量",
    min_value=0.0,
    value=0.0,
    step=0.0001,
    format="%.6f"
)

purchase_price = st.sidebar.number_input(
    f"{selected_currency} の購入価格 ($)",
    min_value=0.0,
    value=0.0,
    step=0.01
)

if st.sidebar.button("💰 ポートフォリオに追加"):
    if amount > 0 and purchase_price > 0:
        # 既存の保有量に追加
        if selected_currency in st.session_state.portfolio:
            existing = st.session_state.portfolio[selected_currency]
            # 加重平均で購入価格を計算
            total_amount = existing['amount'] + amount
            avg_price = (
                (existing['amount'] * existing['purchase_price']) + 
                (amount * purchase_price)
            ) / total_amount

            st.session_state.portfolio[selected_currency] = {
                'amount': total_amount,
                'purchase_price': avg_price
            }
        else:
            st.session_state.portfolio[selected_currency] = {
                'amount': amount,
                'purchase_price': purchase_price
            }

        st.sidebar.success(f"✅ {selected_currency} を追加しました")
    else:
        st.sidebar.error("金額と価格を正しく入力してください")

# 現在価格取得
current_prices = get_current_prices()

# ポートフォリオ表示
if st.session_state.portfolio:
    st.subheader("📊 現在のポートフォリオ")

    portfolio_data = []
    total_value = 0
    total_cost = 0

    for currency, holding in st.session_state.portfolio.items():
        if currency in current_prices:
            current_price = current_prices[currency]
            amount = holding['amount']
            purchase_price = holding['purchase_price']

            current_value = amount * current_price
            cost_basis = amount * purchase_price
            profit_loss = current_value - cost_basis
            profit_loss_pct = (profit_loss / cost_basis) * 100 if cost_basis > 0 else 0

            total_value += current_value
            total_cost += cost_basis

            portfolio_data.append({
                '通貨': currency,
                '保有量': amount,
                '購入価格': purchase_price,
                '現在価格': current_price,
                '投資額': cost_basis,
                '評価額': current_value,
                '損益': profit_loss,
                '損益率': profit_loss_pct
            })

    # 概要メトリクス
    total_pnl = total_value - total_cost
    total_pnl_pct = (total_pnl / total_cost) * 100 if total_cost > 0 else 0

    col1, col2, col3, col4 = st.columns(4)

    with col1:
        st.metric("投資総額", f"${total_cost:,.2f}")

    with col2:
        st.metric("現在評価額", f"${total_value:,.2f}")

    with col3:
        st.metric(
            "総損益", 
            f"${total_pnl:+,.2f}",
            delta=f"{total_pnl_pct:+.2f}%"
        )

    with col4:
        profit_ratio = len([p for p in portfolio_data if p['損益'] > 0])
        st.metric("利益銘柄数", f"{profit_ratio}/{len(portfolio_data)}")

    # 詳細テーブル
    if portfolio_data:
        df = pd.DataFrame(portfolio_data)

        # スタイリング
        styled_df = df.style.format({
            '保有量': '{:.6f}',
            '購入価格': '${:,.2f}',
            '現在価格': '${:,.2f}',
            '投資額': '${:,.2f}',
            '評価額': '${:,.2f}',
            '損益': '${:+,.2f}',
            '損益率': '{:+.2f}%'
        }).apply(
            lambda x: ['background-color: lightgreen' if v > 0 
                      else 'background-color: lightcoral' if v < 0 
                      else '' for v in x], 
            subset=['損益']
        )

        st.dataframe(styled_df, use_container_width=True)

        # 個別削除ボタン
        st.subheader("🗑️ 通貨削除")

        cols = st.columns(len(portfolio_data))
        for i, (currency, _) in enumerate(st.session_state.portfolio.items()):
            with cols[i % len(cols)]:
                if st.button(f"❌ {currency}削除", key=f"delete_{currency}"):
                    del st.session_state.portfolio[currency]
                    st.experimental_rerun()

else:
    st.info("📝 サイドバーから通貨を追加してポートフォリオを作成しましょう")

# 全削除ボタン
if st.session_state.portfolio:
    if st.button("🗑️ ポートフォリオを全削除"):
        st.session_state.portfolio = {}
        st.experimental_rerun()

高度なアラート機能

価格変動やポートフォリオの状況に応じてアラートを出す機能:

# alert_system.py
import streamlit as st
import ccxt
import pandas as pd
from datetime import datetime, timedelta
st.title("🚨 アラートシステム")
# セッション状態の初期化
if 'alerts' not in st.session_state:
st.session_state.alerts = []
if 'alert_history' not in st.session_state:
st.session_state.alert_history = []
@st.cache_data(ttl=60)
def get_prices_for_alerts():
"""アラート用の価格データを取得"""
exchange = ccxt.binance()
symbols = ['BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'ADA/USDT']
prices = {}
for symbol in symbols:
try:
ticker = exchange.fetch_ticker(symbol)
prices[symbol] = {
'price': ticker['last'],
'change_24h': ticker['percentage']
}
except:
continue
return prices
# アラート設定
st.subheader("⚙️ アラート設定")
with st.expander("新しいアラートを作成"):
col1, col2, col3 = st.columns(3)
with col1:
alert_type = st.selectbox(
"アラートタイプ",
["価格アラート", "変動率アラート", "ポートフォリオアラート"]
)
if alert_type == "価格アラート":
with col2:
alert_symbol = st.selectbox(
"通貨",
['BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'ADA/USDT']
)
with col3:
condition = st.selectbox("条件", ["以上", "以下"])
target_price = st.number_input(
"目標価格 ($)",
min_value=0.0,
value=50000.0 if alert_symbol == 'BTC/USDT' else 3000.0
)
if st.button("価格アラートを追加"):
alert = {
'type': 'price',
'symbol': alert_symbol,
'condition': condition,
'target_price': target_price,
'created_at': datetime.now(),
'triggered': False
}
st.session_state.alerts.append(alert)
st.success("価格アラートを追加しました")
elif alert_type == "変動率アラート":
with col2:
alert_symbol = st.selectbox(
"通貨",
['BTC/USDT', 'ETH/USDT', 'XRP/USDT', 'ADA/USDT'],
key="change_symbol"
)
with col3:
change_threshold = st.number_input(
"変動率閾値 (%)",
min_value=0.0,
max_value=100.0,
value=5.0
)
if st.button("変動率アラートを追加"):
alert = {
'type': 'change',
'symbol': alert_symbol,
'threshold': change_threshold,
'created_at': datetime.now(),
'triggered': False
}
st.session_state.alerts.append(alert)
st.success("変動率アラートを追加しました")
# アラートチェック実行
prices = get_prices_for_alerts()
triggered_alerts = []
for i, alert in enumerate(st.session_state.alerts):
if alert['triggered']:
continue
if alert['type'] == 'price':
symbol = alert['symbol']
if symbol in prices:
current_price = prices[symbol]['price']
if (alert['condition'] == '以上' and current_price >= alert['target_price']) or 
(alert['condition'] == '以下' and current_price <= alert['target_price']):
triggered_alerts.append({
'message': f"🚨 {symbol} が ${alert['target_price']:,.2f} {alert['condition']}になりました!",
'details': f"現在価格: ${current_price:,.2f}",
'alert_index': i
})
st.session_state.alerts[i]['triggered'] = True
# 履歴に追加
st.session_state.alert_history.append({
'timestamp': datetime.now(),
'message': f"{symbol} 価格アラート発火",
'details': f"${current_price:,.2f} ({alert['condition']} ${alert['target_price']:,.2f})"
})
elif alert['type'] == 'change':
symbol = alert['symbol']
if symbol in prices:
change_24h = abs(prices[symbol]['change_24h'])
if change_24h >= alert['threshold']:
triggered_alerts.append({
'message': f"📊 {symbol} の24h変動率が {alert['threshold']}%を超えました!",
'details': f"現在の変動率: {prices[symbol]['change_24h']:+.2f}%",
'alert_index': i
})
st.session_state.alerts[i]['triggered'] = True
# 履歴に追加
st.session_state.alert_history.append({
'timestamp': datetime.now(),
'message': f"{symbol} 変動率アラート発火",
'details': f"{prices[symbol]['change_24h']:+.2f}% (閾値: {alert['threshold']}%)"
})
# トリガーされたアラートを表示
if triggered_alerts:
st.subheader("🔔 発火したアラート")
for alert in triggered_alerts:
st.error(f"{alert['message']}n{alert['details']}")
# 現在のアラート一覧
st.subheader("📋 設定中のアラート")
if st.session_state.alerts:
active_alerts = []
for i, alert in enumerate(st.session_state.alerts):
status = "✅ 発火済み" if alert['triggered'] else "⏰ 待機中"
if alert['type'] == 'price':
description = f"{alert['symbol']} が ${alert['target_price']:,.2f} {alert['condition']}"
elif alert['type'] == 'change':
description = f"{alert['symbol']} の24h変動率が {alert['threshold']}%以上"
active_alerts.append({
'ID': i,
'タイプ': alert['type'],
'条件': description,
'ステータス': status,
'作成日時': alert['created_at'].strftime('%m/%d %H:%M')
})
df_alerts = pd.DataFrame(active_alerts)
st.dataframe(df_alerts, use_container_width=True)
# アラート削除
st.subheader("🗑️ アラート削除")
delete_id = st.selectbox(
"削除するアラートID",
[i for i in range(len(st.session_state.alerts))]
)
col1, col2 = st.columns(2)
with col1:
if st.button("選択したアラートを削除"):
if 0 <= delete_id < len(st.session_state.alerts):
st.session_state.alerts.pop(delete_id)
st.success("アラートを削除しました")
st.experimental_rerun()
with col2:
if st.button("全てのアラートを削除"):
st.session_state.alerts = []
st.success("全てのアラートを削除しました")
st.experimental_rerun()
else:
st.info("アラートが設定されていません")
# アラート履歴
if st.session_state.alert_history:
st.subheader("📜 アラート履歴")
history_data = []
for item in st.session_state.alert_history[-10:]:  # 最新10件
history_data.append({
'発火時刻': item['timestamp'].strftime('%m/%d %H:%M:%S'),
'アラート': item['message'],
'詳細': item['details']
})
df_history = pd.DataFrame(history_data)
st.dataframe(df_history, use_container_width=True)

データ永続化とエクスポート機能

ポートフォリオデータを保存・読み込みできる機能:

# data_persistence.py
import streamlit as st
import json
import pandas as pd
from datetime import datetime
import io
st.title("💾 データ管理")
# JSONでのエクスポート/インポート
st.subheader("📤 ポートフォリオのエクスポート/インポート")
if 'portfolio' in st.session_state and st.session_state.portfolio:
# エクスポート
col1, col2 = st.columns(2)
with col1:
if st.button("📁 JSONでエクスポート"):
export_data = {
'portfolio': st.session_state.portfolio,
'export_date': datetime.now().isoformat(),
'version': '1.0'
}
json_str = json.dumps(export_data, indent=2, ensure_ascii=False)
st.download_button(
label="💾 ダウンロード",
data=json_str,
file_name=f"portfolio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json",
mime="application/json"
)
with col2:
if st.button("📊 CSVでエクスポート"):
# 現在の評価額込みでCSVエクスポート
portfolio_data = []
for currency, holding in st.session_state.portfolio.items():
portfolio_data.append({
'通貨': currency,
'保有量': holding['amount'],
'購入価格': holding['purchase_price'],
'投資額': holding['amount'] * holding['purchase_price']
})
df = pd.DataFrame(portfolio_data)
csv_buffer = io.StringIO()
df.to_csv(csv_buffer, index=False, encoding='utf-8-sig')
st.download_button(
label="📊 CSVダウンロード",
data=csv_buffer.getvalue(),
file_name=f"portfolio_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv",
mime="text/csv"
)
# インポート
st.subheader("📥 ポートフォリオのインポート")
uploaded_file = st.file_uploader(
"JSONファイルを選択",
type=['json'],
help="以前エクスポートしたポートフォリオファイルを選択してください"
)
if uploaded_file is not None:
try:
import_data = json.load(uploaded_file)
if 'portfolio' in import_data:
st.write("**インポート内容プレビュー:**")
preview_data = []
for currency, holding in import_data['portfolio'].items():
preview_data.append({
'通貨': currency,
'保有量': holding['amount'],
'購入価格': f"${holding['purchase_price']:.2f}"
})
st.dataframe(pd.DataFrame(preview_data))
col1, col2 = st.columns(2)
with col1:
if st.button("🔄 上書きインポート"):
st.session_state.portfolio = import_data['portfolio']
st.success("ポートフォリオをインポートしました")
st.experimental_rerun()
with col2:
if st.button("➕ 追加インポート"):
for currency, holding in import_data['portfolio'].items():
if currency in st.session_state.portfolio:
# 既存の保有量に追加
existing = st.session_state.portfolio[currency]
total_amount = existing['amount'] + holding['amount']
avg_price = (
(existing['amount'] * existing['purchase_price']) + 
(holding['amount'] * holding['purchase_price'])
) / total_amount
st.session_state.portfolio[currency] = {
'amount': total_amount,
'purchase_price': avg_price
}
else:
st.session_state.portfolio[currency] = holding
st.success("ポートフォリオに追加しました")
st.experimental_rerun()
except Exception as e:
st.error(f"ファイルの読み込みに失敗しました: {e}")
# 設定のリセット
st.subheader("🗑️ データのリセット")
col1, col2, col3 = st.columns(3)
with col1:
if st.button("ポートフォリオをリセット"):
st.session_state.portfolio = {}
st.success("ポートフォリオをリセットしました")
with col2:
if st.button("アラートをリセット"):
if 'alerts' in st.session_state:
st.session_state.alerts = []
st.success("アラートをリセットしました")
with col3:
if st.button("全データをリセット"):
for key in list(st.session_state.keys()):
del st.session_state[key]
st.success("全データをリセットしました")
st.experimental_rerun()

完成版:統合ダッシュボード

全ての機能を統合した最終版を作成:

# complete_dashboard.py
import streamlit as st
import ccxt
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from datetime import datetime
import json
# ページ設定
st.set_page_config(
page_title="仮想通貨投資ダッシュボード",
page_icon="💰",
layout="wide"
)
# サイドバーでページ選択
st.sidebar.title("🏠 ナビゲーション")
page = st.sidebar.selectbox(
"ページを選択",
["📊 ダッシュボード", "💼 ポートフォリオ", "🚨 アラート", "💾 データ管理"]
)
# メインタイトル
st.title("💰 仮想通貨投資ダッシュボード")
st.markdown("---")
# 各ページの実装
if page == "📊 ダッシュボード":
# 価格表示とチャート(第1部、第2部の内容)
st.header("価格情報")
# 実装内容...
elif page == "💼 ポートフォリオ":
# ポートフォリオ管理機能
st.header("ポートフォリオ管理")
# 実装内容...
elif page == "🚨 アラート":
# アラート機能
st.header("アラートシステム")
# 実装内容...
elif page == "💾 データ管理":
# データのエクスポート/インポート
st.header("データ管理")
# 実装内容...
# フッター
st.markdown("---")
st.caption("Streamlit仮想通貨ダッシュボード v1.0")

まとめ

この3部作で作成した機能:

🎯 実装した全機能

  1. 価格表示:リアルタイム価格取得・表示
  2. チャート分析:ローソク足・移動平均・出来高
  3. ポートフォリオ管理:保有通貨・損益計算
  4. アラート機能:価格・変動率通知
  5. データ管理:エクスポート・インポート

🚀 今後の拡張アイデア

  • 自動取引機能:条件に応じた自動売買
  • バックテスト機能:過去データでの戦略検証
  • ニュース連携:価格に影響するニュース表示
  • SNS連携:TwitterやDiscordでアラート通知

この完全なダッシュボードを使って、効率的な仮想通貨投資を始めましょう!

関連記事

コメントする