LSTMは、時系列データの長期依存性を捉えられる特殊なリカレントニューラルネットワーク(RNN)であり、従来のARIMAや指数平滑法では難しい非線形かつ複雑なパターンの予測に強みがあります。
本記事では、ディープラーニングの代表的な時系列解析手法であるLSTM(Long Short-Term Memory)について、基礎知識からPythonでの実装方法、実際の時系列データを用いた予測・異常検知・補正の具体例まで詳しく解説します。
時系列データ分析全般に関する情報、他の手法について知りたい方は、「【Python実践】時系列データ分析で未来予測・異常検知・補正に挑戦!」をご一読ください。

LSTM(Long Short-Term Memory)とは
STMは、リカレントニューラルネットワーク(RNN)の一種で、時系列データや系列データの長期依存関係を学習できる強力なディープラーニングモデルです。通常のRNNが苦手とする「長期間の情報を保持する」問題を、独自のゲート機構によって解決しています。
LSTMは複雑なパターンや非線形な変動も捉えることができ、時系列予測や異常検知、補正など多様なタスクに適用されています。ただし、モデル構築や学習には比較的多くの計算リソースとデータが必要で、パラメータの調整も難しい点があります。
得意な分野
✅ 長期依存性の学習:過去の長期間にわたる影響を考慮できるため、複雑なトレンドやパターンの予測に強い。
✅ 非線形なデータ:複雑で非線形な時系列の関係性を柔軟にモデル化可能。
✅ 多変量時系列:複数の関連する時系列データを同時に扱い、高度な予測や異常検知ができる。
✅ 自動特徴抽出:手動で特徴量を設計しなくても、モデルが重要なパターンを自動的に学習する。
不得意な分野
❌ 少量データ:大量の学習データがないと過学習や性能低下のリスクが高い。
❌ モデルの解釈性:ブラックボックス的な性質が強く、結果の説明や解釈が難しい。
❌ 計算コスト:トレーニングに時間と計算資源がかかるため、リアルタイム処理には不向きな場合もある。
❌ パラメータ調整:ハイパーパラメータが多く、適切な設定には専門知識が必要。
このような特徴を踏まえ、LSTMは複雑で長期的な時系列解析に非常に有効ですが、使いこなすには十分な学習データと計算リソース、そして知識が求められます。
準備
本記事で紹介しているプログラムを実行する場合、こちらに掲載しているプログラムを実行し、ダミーデータを作成の上、各プログラムの read_csv()
のファイル参照パスを適宜変更してください。
また、下記のコマンドで必要なモジュールをインストールしてください。
pip install pandas numpy matplotlib scikit-learn tensorflow keras
LSTMによる未来予測

LSTMは、過去の情報を記憶し、活用できるディープラーニングモデルです。このため、複雑な時系列パターンや非線形な変動を学習し、高精度な予測が可能です。
どれくらい過去の情報(期間)を記憶させるかは、sequence_length
で指定します。今回の例では 15日を指定したため、グラフ上の過去部分の予測が2025/1/15 から開始しています。それより前は15日分のデータが揃っていないので、予測できないのです。
sequence_length
の値は、予測タスクの性質、データの特性、そして利用可能な計算リソースで変わりますが、データに日次、週次、月次などの明確な周期性がある場合、その周期の長さやその倍数を指定します。
また、過去の特定のイベント(例:プロモーション、祝日、機器のメンテナンス)が現在の値に影響を与えている場合は、その影響期間をカバーできる程度の値を指定します。
サンプルデータを使った結果では、過去の測定値と予測値の間に大きな乖離があり、特に変動に対して十分に追従できていないことが見て取れます。また、未来の予測も信頼性に乏しい結果となっています。
この予測精度不足の原因として、sequence_length
に指定した15日という値が、データの持つ複雑なパターンや長期的な変動(例:数ヶ月スケールのトレンドや、もし存在するなら季節性)を捉えるには不十分である可能性が考えられます。LSTMは過去の情報を活用できますが、その「窓」であるsequence_length
が短すぎると、モデルが学習できる文脈が限られてしまい、結果として変動の大きいデータに対する予測が平坦化しやすくなります。
このように、LSTMがある程度のデータ件数、特に学習したい周期性や長期的なトレンドを複数回含むような期間のデータ(例えば、年間の季節性を学習したい場合、最低でも数年分)が無いと、モデルの持つ能力を十分に引き出し、高精度な予測を得ることは困難です。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import warnings
# TensorFlowの警告を非表示にする
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # INFOとWARNINGメッセージを非表示にする
warnings.filterwarnings("ignore") # その他の警告メッセージを非表示にする
# 日本語フォントの設定 (お使いの環境に合わせて適宜変更してください)
try:
rcParams['font.family'] = 'Meiryo' # Windowsの場合
# rcParams['font.family'] = 'AppleGothic' # Macの場合
# rcParams['font.family'] = 'DejaVu Sans' # フォールバック(日本語表示はされない可能性あり)
except Exception:
print("警告: 日本語フォントの設定に失敗しました。グラフの日本語が正しく表示されない可能性があります。")
pass
# --- データのシーケンス変換関数 ---
def create_sequences(data, sequence_length):
"""
時系列データをLSTM入力用のシーケンスに変換する。
例: [1,2,3,4,5] -> sequence_length=3 -> X=[[1,2,3]], y=[4]
"""
X, y = [], []
for i in range(len(data) - sequence_length):
X.append(data[i:(i + sequence_length)])
y.append(data[i + sequence_length])
return np.array(X), np.array(y)
def lstm_future_forecasting(df: pd.DataFrame, time_col: str, value_col: str,
sequence_length: int = 10,
epochs: int = 50, batch_size: int = 32,
validation_split: float = 0.1,
forecast_steps: int = 30): # 新しい引数: 未来の予測ステップ数
"""
LSTMモデルを使用して時系列データの未来予測を行います。
Args:
df (pd.DataFrame): 処理するデータフレーム。
time_col (str): タイムスタンプの列名。
value_col (str): 時系列データの値の列名。
sequence_length (int, optional): LSTMに入力するシーケンスの長さ(過去何点を見るか)。デフォルトは10。
epochs (int, optional): 学習エポック数。デフォルトは50。
batch_size (int, optional): バッチサイズ。デフォルトは32。
validation_split (float, optional): 学習データの検証用分割比率。デフォルトは0.1。
forecast_steps (int, optional): 未来に予測するステップ数。デフォルトは30。
Returns:
pd.DataFrame: 'normalized_value', 'lstm_past_prediction', 'lstm_future_prediction' 列が追加されたデータフレーム。
"""
df_processed = df.set_index(time_col).copy()
data = df_processed[value_col].values.reshape(-1, 1) # LSTMは2D配列を要求するため
# --- 1. データ正規化 ---
# MinMaxScalerを使ってデータを0-1の範囲に正規化
scaler = MinMaxScaler(feature_range=(0, 1))
normalized_data = scaler.fit_transform(data)
# df_processed['normalized_value'] は今回は直接使わないが、デバッグ用に残すことも可能
# --- 2. シーケンスデータの作成 ---
X, y = create_sequences(normalized_data, sequence_length)
# LSTMの入力形状: (サンプル数, シーケンス長, 特徴量数)
# ここでは単一の時系列なので特徴量数は1
n_features = 1
X = X.reshape(X.shape[0], X.shape[1], n_features)
# --- 3. LSTMモデルの構築 ---
print("--- LSTMモデルの構築 ---")
model = Sequential([
LSTM(units=50, activation='relu', input_shape=(sequence_length, n_features), return_sequences=False),
# return_sequences=Falseは、次の層にシーケンスではなく最後の出力だけを渡す
Dropout(0.2), # ドロップアウトで過学習を抑制
Dense(units=1) # 次の1点を予測する
])
model.compile(optimizer='adam', loss='mse') # 回帰問題なので損失関数はMSE (平均二乗誤差)
model.summary()
# --- 4. モデルの学習 ---
print("--- LSTMモデルの学習 ---")
# EarlyStopping: 検証ロスが改善しなくなったら学習を停止
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=0)
history = model.fit(X, y,
epochs=epochs,
batch_size=batch_size,
validation_split=validation_split,
callbacks=[early_stopping],
verbose=1) # verbose=1 で学習の進捗を表示
# --- 5. 過去データに対する予測 (学習データに対する予測) ---
past_predictions_normalized = model.predict(X)
past_predictions = scaler.inverse_transform(past_predictions_normalized)
# 予測結果をデータフレームに追加
df_processed['lstm_past_prediction'] = np.nan # 初期化
# 予測はsequence_lengthだけ遅れて始まる
df_processed['lstm_past_prediction'].iloc[sequence_length:] = past_predictions.flatten()
# --- 6. 未来予測 ---
print(f"\n--- {forecast_steps}ステップの未来予測を開始 ---")
future_predictions_normalized = []
# 未来予測のための最後の既知のシーケンス
current_sequence = normalized_data[-sequence_length:].reshape(1, sequence_length, n_features)
for _ in range(forecast_steps):
next_prediction_normalized = model.predict(current_sequence, verbose=0)[0]
future_predictions_normalized.append(next_prediction_normalized)
# シーケンスを更新: 古いデータを捨て、新しい予測値を追加
current_sequence = np.append(current_sequence[:, 1:, :], next_prediction_normalized.reshape(1, 1, n_features), axis=1)
# 未来予測を元のスケールに戻す
future_predictions = scaler.inverse_transform(np.array(future_predictions_normalized).reshape(-1, 1)).flatten()
# 未来のタイムスタンプを生成
last_timestamp = df_processed.index[-1]
# 仮にデータが日次であると仮定(pd.Timedelta('1 day'))
future_timestamps = pd.date_range(start=last_timestamp + pd.Timedelta('1 day'), periods=forecast_steps, freq='D')
# 未来予測の結果を格納するDataFrameを作成
df_future = pd.DataFrame(index=future_timestamps, data=future_predictions, columns=['lstm_future_prediction'])
# 元のデータフレームと未来予測を結合(NaNを適切に扱うためにconcatはせず、別々にプロット)
# プロットのために、元のdf_processedに未来予測の値を結合せず、分離して扱います
# --- 結果の可視化 ---
plt.figure(figsize=(14, 7))
# 過去の測定値 (学習用)
plt.plot(df_processed.index, df_processed[value_col], label='過去の測定値 (学習用)', color='blue', alpha=0.9)
# 過去のLSTM予測 (学習データに対する予測)
# 予測はsequence_lengthだけ遅れて始まるため、その部分だけプロット
plt.plot(df_processed.index[sequence_length:], df_processed['lstm_past_prediction'].dropna(),
label='LSTM予測 (学習データ)', color='green', linestyle='--', alpha=0.8)
# 未来のLSTM予測
plt.plot(df_future.index, df_future['lstm_future_prediction'],
label='LSTM未来予測', color='red', linestyle='-.', alpha=0.8)
# 予測開始点を示す縦の点線
plt.axvline(x=last_timestamp, color='gray', linestyle=':', label='予測開始時点')
plt.title('LSTMによる未来予測')
plt.xlabel('日付')
plt.ylabel('測定値')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
# 結果として、元のdf_processedと未来予測のDataFrameを返すことも可能
# ここではプロットに必要なデータだけを準備し、元のdf_processedは過去の予測を持つ
# 必要であれば、df_processedとdf_futureを結合したものを返すこともできます
return df_processed, df_future
# --- メインの実行部分 ---
if __name__ == "__main__":
# CSVを読み込んで関数を実行
df = pd.read_csv("dummy_timeseries.csv", parse_dates=["timestamp"])
# LSTMモデルのパラメータ
# sequence_length: 過去何日間のデータから予測するか。
# epochs: モデルを学習する回数。データ量と複雑性による。
# batch_size: 一度に学習するデータの数。
# forecast_steps: 未来に何日分予測するか。
past_df, future_df = lstm_future_forecasting(df,
time_col="timestamp",
value_col="value",
sequence_length=15, # 例: 過去15日間のデータを見る
epochs=100, # 学習回数を増やす (EarlyStoppingで調整される)
batch_size=16, # バッチサイズを小さくする
forecast_steps=14) # 未来14日分を予測
if past_df is not None:
print("\n--- 関数実行結果 (過去の予測値含む) ---")
print(past_df.tail())
if future_df is not None:
print("\n--- 未来予測結果 ---")
print(future_df.head())
print(future_df.tail())
--- 14ステップの未来予測を開始 ---
--- 関数実行結果 (過去の予測値含む) ---
value lstm_past_prediction
timestamp
2025-04-06 42.682425 47.723091
2025-04-07 51.480601 46.894085
2025-04-08 51.305276 46.661629
2025-04-09 50.025567 46.414650
2025-04-10 48.827064 46.241364
--- 未来予測結果 ---
lstm_future_prediction
2025-04-11 46.055107
2025-04-12 45.875980
2025-04-13 45.649559
2025-04-14 45.451019
2025-04-15 45.292248
lstm_future_prediction
2025-04-20 44.520313
2025-04-21 44.411457
2025-04-22 44.347607
2025-04-23 44.224552
2025-04-24 44.113136
LSTMによる異常検知

LSTMは、学習結果を元に予測した値と測定値との差=予測誤差(再構築誤差)が大きい場合に異常と判断します。
従来の統計モデル(ARIMA, ES, Prophet)と比較して、LSTMは長期的な依存関係をより詳細に学習でき、非線形な変化にも柔軟に対応可能です(例:SARIMAよりも複雑な変動に対応)。しかし、その計算負荷の高さから、リアルタイムでの異常検知には不向きな場合があります。
LSTMは、事前に学習した正常なデータパターンに基づいて予測モデルを構築します。このため、学習データに異常値が混入していると、モデルが異常を正常と誤認識するリスクがあるため、クリーンな学習データが不可欠です。
また、閾値の設定には、予測誤差の統計的特性(平均、標準偏差)に基づく方法のほか、ヒストグラム分析、パーセンタイル、または教師なし学習アルゴリズム(例:One-Class SVM, Isolation Forestなどと組み合わせて、異常スコアの分布から自動的に閾値を決定する方法)を用いることもあります。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import warnings
# TensorFlowの警告を非表示にする
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # INFOとWARNINGメッセージを非表示にする
warnings.filterwarnings("ignore") # その他の警告メッセージを非表示にする
# 日本語フォントの設定 (お使いの環境に合わせて適宜変更してください)
rcParams['font.family'] = 'Meiryo' # Windowsの場合
# rcParams['font.family'] = 'AppleGothic' # Macの場合
# rcParams['font.family'] = 'DejaVu Sans' # デフォルト、日本語対応しない場合がある
# --- データのシーケンス変換関数 ---
def create_sequences(data, sequence_length):
"""
時系列データをLSTM入力用のシーケンスに変換する。
例: [1,2,3,4,5] -> sequence_length=3 -> X=[[1,2,3]], y=[4]
"""
X, y = [], []
for i in range(len(data) - sequence_length):
X.append(data[i:(i + sequence_length)])
y.append(data[i + sequence_length])
return np.array(X), np.array(y)
def lstm_anomaly_detection(df: pd.DataFrame, time_col: str, value_col: str,
sequence_length: int = 10,
epochs: int = 50, batch_size: int = 32,
validation_split: float = 0.1,
threshold_sigma: float = 3.0):
"""
LSTMモデルを使用して時系列データの異常検知を行います。
予測誤差(再構築誤差)の閾値に基づいて異常を検知します。
Args:
df (pd.DataFrame): 処理するデータフレーム。
time_col (str): タイムスタンプの列名。
value_col (str): 時系列データの値の列名。
sequence_length (int, optional): LSTMに入力するシーケンスの長さ(過去何点を見るか)。デフォルトは10。
epochs (int, optional): 学習エポック数。デフォルトは50。
batch_size (int, optional): バッチサイズ。デフォルトは32。
validation_split (float, optional): 学習データの検証用分割比率。デフォルトは0.1。
threshold_sigma (float, optional): 異常値を判断するための予測誤差の標準偏差の倍数。デフォルトは3.0。
Returns:
pd.DataFrame: 'normalized_value', 'lstm_prediction', 'prediction_error', 'is_anomaly' 列が追加されたデータフレーム。
"""
df_processed = df.set_index(time_col).copy()
data = df_processed[value_col].values.reshape(-1, 1) # LSTMは2D配列を要求するため
# --- 1. データ正規化 ---
# MinMaxScalerを使ってデータを0-1の範囲に正規化
scaler = MinMaxScaler(feature_range=(0, 1))
normalized_data = scaler.fit_transform(data)
df_processed['normalized_value'] = normalized_data # 後でプロットするために保存
# --- 2. シーケンスデータの作成 ---
X, y = create_sequences(normalized_data, sequence_length)
# LSTMの入力形状: (サンプル数, シーケンス長, 特徴量数)
# ここでは単一の時系列なので特徴量数は1
n_features = 1
X = X.reshape(X.shape[0], X.shape[1], n_features)
# --- 3. LSTMモデルの構築 ---
print("--- LSTMモデルの構築 ---")
model = Sequential([
LSTM(units=50, activation='relu', input_shape=(sequence_length, n_features), return_sequences=False),
# return_sequences=Falseは、次の層にシーケンスではなく最後の出力だけを渡す
Dropout(0.2), # ドロップアウトで過学習を抑制
Dense(units=1) # 次の1点を予測する
])
model.compile(optimizer='adam', loss='mse') # 回帰問題なので損失関数はMSE (平均二乗誤差)
model.summary()
# --- 4. モデルの学習 ---
print("--- LSTMモデルの学習 ---")
# EarlyStopping: 検証ロスが改善しなくなったら学習を停止
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True)
history = model.fit(X, y,
epochs=epochs,
batch_size=batch_size,
validation_split=validation_split,
callbacks=[early_stopping],
verbose=1) # verbose=1 で学習の進捗を表示
# --- 5. 予測と予測誤差の計算 ---
# 学習データ全体に対する予測
# 予測はシーケンス長だけ遅れて始まる
predictions_normalized = model.predict(X)
# 予測を元のスケールに戻す
predictions = scaler.inverse_transform(predictions_normalized)
# 予測誤差を計算 (元のスケールで)
# 予測誤差のインデックスを調整
df_processed['lstm_prediction'] = np.nan # 初期化
df_processed['lstm_prediction'][sequence_length:] = predictions.flatten()
# 予測誤差 (実際の値 - 予測値)
# NaN値が含まれるため、計算はNaNを無視しないとできない
df_processed['prediction_error'] = df_processed[value_col] - df_processed['lstm_prediction']
# --- 6. 異常検知 (予測誤差に基づく) ---
# 学習データの誤差の分布を基準にする
# NaN値を除外して誤差の統計を計算
train_errors = df_processed['prediction_error'].dropna()
error_mean = train_errors.mean()
error_std = train_errors.std()
# 閾値設定 (例: 平均から threshold_sigma 標準偏差以上離れている場合)
df_processed['is_anomaly'] = (df_processed['prediction_error'] > error_mean + threshold_sigma * error_std) | \
(df_processed['prediction_error'] < error_mean - threshold_sigma * error_std)
# シーケンス長より前のデータはNaNになるので、異常検知の対象外とする
df_processed['is_anomaly'] = df_processed['is_anomaly'].fillna(False)
# 検出された異常値
anomalies = df_processed[df_processed['is_anomaly']]
if not anomalies.empty:
print("\n--- 検出された異常値 ---")
print(anomalies[[value_col, 'lstm_prediction', 'prediction_error', 'is_anomaly']])
else:
print("\n--- 異常値は検出されませんでした ---")
# --- 結果の可視化 (グラフを縦に並べる) ---
fig, axes = plt.subplots(nrows=2, ncols=1, figsize=(14, 10), sharex=True)
# 1つ目のサブプロット: 測定値とLSTM予測
axes[0].plot(df_processed.index, df_processed[value_col], label='測定値', color='blue', alpha=0.7)
axes[0].plot(df_processed.index, df_processed['lstm_prediction'], label='LSTM予測', color='orange', linestyle='--')
if not anomalies.empty:
axes[0].scatter(anomalies.index, anomalies[value_col], color='red', s=100, zorder=5, label='異常値 (LSTM誤差に基づく)')
axes[0].set_title('LSTMモデルによる時系列異常検知 (測定値と予測)')
axes[0].set_ylabel('測定値')
axes[0].legend()
axes[0].grid(True)
# 2つ目のサブプロット: 予測誤差と閾値
axes[1].plot(df_processed.index, df_processed['prediction_error'], label='予測誤差', color='purple', alpha=0.7)
axes[1].axhline(error_mean, color='gray', linestyle='--', label='誤差平均')
axes[1].axhline(error_mean + threshold_sigma * error_std, color='orange', linestyle='--', label=f'+{threshold_sigma}σ閾値')
axes[1].axhline(error_mean - threshold_sigma * error_std, color='orange', linestyle='--', label=f'-{threshold_sigma}σ閾値')
if not anomalies.empty:
axes[1].scatter(anomalies.index, anomalies['prediction_error'], color='red', s=100, zorder=5, label='異常誤差')
axes[1].set_title('LSTMモデル予測誤差と異常検知閾値')
axes[1].set_xlabel('日付')
axes[1].set_ylabel('予測誤差')
axes[1].legend()
axes[1].grid(True)
plt.tight_layout()
plt.show()
return df_processed
# --- メインの実行部分 ---
if __name__ == "__main__":
# CSVを読み込んで関数を実行
df = pd.read_csv("dummy_timeseries.csv", parse_dates=["timestamp"])
# LSTMモデルのパラメータ
# sequence_length: 過去何日間のデータから予測するか。
# epochs: モデルを学習する回数。データ量と複雑性による。
# batch_size: 一度に学習するデータの数。
# threshold_sigma: 何σを異常とするか。
result_df = lstm_anomaly_detection(df,
time_col="timestamp",
value_col="value",
sequence_length=15, # 例: 過去15日間のデータを見る
epochs=100, # 学習回数を増やす (EarlyStoppingで調整される)
batch_size=16, # バッチサイズを小さくする
threshold_sigma=3.0)
if result_df is not None:
print("\n--- 関数実行結果 (異常値検出フラグ含む) ---")
print(result_df.tail())
--- 検出された異常値 ---
value lstm_prediction prediction_error is_anomaly
timestamp
2025-03-23 71.785563 48.471062 23.314501 True
2025-03-24 77.389470 49.677299 27.712171 True
LSTMによるデータ補正・クリーニング

LSTMで異常と判断されたデータを、予測値(最も可能性の高い正常な値)で置き換えることで、データ補正・クリーニングを行います。これは、LSTMが正常なデータパターンを学習しているためです。
逆に、モデルが適切に正常パターンを学習できていない場合や、異常が長期にわたる場合は、補正の精度が低下する可能性があります。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from matplotlib import rcParams
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import warnings
# TensorFlowの警告を非表示にする
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # INFOとWARNINGメッセージを非表示にする
warnings.filterwarnings("ignore") # その他の警告メッセージを非表示にする
# 日本語フォントの設定 (お使いの環境に合わせて適宜変更してください)
try:
rcParams['font.family'] = 'Meiryo' # Windowsの場合
# rcParams['font.family'] = 'AppleGothic' # Macの場合
# rcParams['font.family'] = 'DejaVu Sans' # フォールバック(日本語表示はされない可能性あり)
except Exception:
print("警告: 日本語フォントの設定に失敗しました。グラフの日本語が正しく表示されない可能性があります。")
pass
# --- データのシーケンス変換関数 ---
def create_sequences(data, sequence_length):
"""
時系列データをLSTM入力用のシーケンスに変換する。
例: [1,2,3,4,5] -> sequence_length=3 -> X=[[1,2,3]], y=[4]
"""
X, y = [], []
for i in range(len(data) - sequence_length):
X.append(data[i:(i + sequence_length)])
y.append(data[i + sequence_length])
return np.array(X), np.array(y)
def lstm_anomaly_detection_and_correction(df: pd.DataFrame, time_col: str, value_col: str,
sequence_length: int = 10,
epochs: int = 50, batch_size: int = 32,
validation_split: float = 0.1,
confidence_level_sigma: float = 3.0): # 信頼水準(標準偏差の倍数)
"""
LSTMモデルを使用して時系列データの異常検知と補正を行います。
予測誤差の閾値に基づいて異常を検知し、予測値で補正します。
Args:
df (pd.DataFrame): 処理するデータフレーム。
time_col (str): タイムスタンプの列名。
value_col (str): 時系列データの値の列名。
sequence_length (int, optional): LSTMに入力するシーケンスの長さ(過去何点を見るか)。デフォルトは10。
epochs (int, optional): 学習エポック数。デフォルトは50。
batch_size (int, optional): バッチサイズ。デフォルトは32。
validation_split (float, optional): 学習データの検証用分割比率。デフォルトは0.1。
confidence_level_sigma (float, optional): 異常値を判断するための予測誤差の標準偏差の倍数。デフォルトは3.0 (約99.7%信頼区間)。
Returns:
pd.DataFrame: 'normalized_value', 'lstm_prediction', 'prediction_error', 'is_anomaly', 'corrected_value' 列が追加されたデータフレーム。
"""
df_processed = df.set_index(time_col).copy()
data = df_processed[value_col].values.reshape(-1, 1) # LSTMは2D配列を要求するため
# --- 1. データ正規化 ---
scaler = MinMaxScaler(feature_range=(0, 1))
normalized_data = scaler.fit_transform(data)
df_processed['normalized_value'] = normalized_data
# --- 2. シーケンスデータの作成 ---
X, y = create_sequences(normalized_data, sequence_length)
n_features = 1
X = X.reshape(X.shape[0], X.shape[1], n_features)
# --- 3. LSTMモデルの構築 ---
print("--- LSTMモデルの構築 ---")
model = Sequential([
LSTM(units=50, activation='relu', input_shape=(sequence_length, n_features), return_sequences=False),
Dropout(0.2),
Dense(units=1)
])
model.compile(optimizer='adam', loss='mse')
model.summary()
# --- 4. モデルの学習 ---
print("--- LSTMモデルの学習 ---")
early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=0)
history = model.fit(X, y,
epochs=epochs,
batch_size=batch_size,
validation_split=validation_split,
callbacks=[early_stopping],
verbose=1)
# --- 5. 予測と予測誤差の計算 ---
predictions_normalized = model.predict(X)
predictions = scaler.inverse_transform(predictions_normalized)
df_processed['lstm_prediction'] = np.nan
df_processed['lstm_prediction'].iloc[sequence_length:] = predictions.flatten()
# 予測誤差 (実際の値 - 予測値)
df_processed['prediction_error'] = df_processed[value_col] - df_processed['lstm_prediction']
# --- 6. 異常検知と信頼区間の計算 ---
# 学習データの誤差の分布を基準にする
train_errors = df_processed['prediction_error'].dropna()
error_mean = train_errors.mean()
error_std = train_errors.std()
# 信頼区間の上下限を計算
upper_bound = error_mean + confidence_level_sigma * error_std
lower_bound = error_mean - confidence_level_sigma * error_std
df_processed['upper_bound'] = df_processed['lstm_prediction'] + upper_bound
df_processed['lower_bound'] = df_processed['lstm_prediction'] + lower_bound
# 異常値の判定
# 予測誤差が信頼区間外にある場合、または実際の値が信頼区間外にある場合を異常とする
# ここでは、予測値からの誤差が閾値を超える場合を異常とする (画像に合わせて)
df_processed['is_anomaly'] = (df_processed['prediction_error'] > upper_bound) | \
(df_processed['prediction_error'] < lower_bound)
# シーケンス長より前のデータはNaNになるので、異常検知の対象外とする
df_processed['is_anomaly'] = df_processed['is_anomaly'].fillna(False)
# --- 7. 異常値の補正 ---
df_processed['corrected_value'] = df_processed[value_col].copy()
# 異常と判断された点をLSTMの予測値で補正
df_processed.loc[df_processed['is_anomaly'], 'corrected_value'] = \
df_processed.loc[df_processed['is_anomaly'], 'lstm_prediction']
# 検出された異常値と補正値
anomalies = df_processed[df_processed['is_anomaly']]
if not anomalies.empty:
print("\n--- 検出された異常値と補正結果 ---")
print(anomalies[[value_col, 'lstm_prediction', 'prediction_error', 'is_anomaly', 'corrected_value']])
else:
print("\n--- 異常値は検出されませんでした ---")
# --- 結果の可視化 ---
plt.figure(figsize=(14, 7))
# 元データ
plt.plot(df_processed.index, df_processed[value_col], label='元データ', color='blue', alpha=0.9)
# LSTM予測値
plt.plot(df_processed.index[sequence_length:], df_processed['lstm_prediction'].dropna(),
label='LSTM予測値', color='green', linestyle='--', alpha=0.8)
# 99%予測区間(信頼区間)
# 予測区間は予測値を中心に上下に広がる
# df_processed['upper_bound']とdf_processed['lower_bound']はNaNが含まれるのでdropna()
plt.fill_between(df_processed.index[sequence_length:],
df_processed['lower_bound'].dropna(),
df_processed['upper_bound'].dropna(),
color='lightgray', alpha=0.5, label=f'{int(confidence_level_sigma*100/3*99.7)/100:.0f}% 予測区間') # 3sigmaで約99.7%
# 異常値 (補正前)
# is_anomalyがTrueで、かつ値がNaNでないものだけをプロット
anomalies_to_plot = df_processed[df_processed['is_anomaly'] & df_processed[value_col].notna()]
if not anomalies_to_plot.empty:
plt.scatter(anomalies_to_plot.index, anomalies_to_plot[value_col],
color='red', s=50, zorder=5, label='異常値 (補正前)', marker='o')
# 補正値
corrected_points = df_processed[df_processed['is_anomaly'] & df_processed['corrected_value'].notna()]
if not corrected_points.empty:
plt.scatter(corrected_points.index, corrected_points['corrected_value'],
color="limegreen", label="補正値", edgecolors="darkgreen", linewidths=1.5)
plt.title('LSTMによる異常検知と補正')
plt.xlabel('日付')
plt.ylabel('測定値')
plt.legend()
plt.grid(True)
plt.tight_layout()
plt.show()
return df_processed
# --- メインの実行部分 ---
if __name__ == "__main__":
# CSVを読み込んで関数を実行
df = pd.read_csv("dummy_timeseries.csv", parse_dates=["timestamp"])
# LSTMモデルのパラメータ
# sequence_length: 過去何日間のデータから予測するか。
# epochs: モデルを学習する回数。データ量と複雑性による。
# batch_size: 一度に学習するデータの数。
# confidence_level_sigma: 何σを異常と判断するかの基準。3.0で約99.7%信頼区間。
result_df = lstm_anomaly_detection_and_correction(df,
time_col="timestamp",
value_col="value",
sequence_length=15, # 例: 過去15日間のデータを見る
epochs=100, # 学習回数を増やす (EarlyStoppingで調整される)
batch_size=16, # バッチサイズを小さくする
confidence_level_sigma=3.0) # 3σで異常と判断
if result_df is not None:
print("\n--- 関数実行結果 (異常値検出・補正フラグ含む) ---")
print(result_df.tail())
--- 検出された異常値と補正結果 ---
value lstm_prediction prediction_error is_anomaly corrected_value
timestamp
2025-03-23 71.785563 46.811966 24.973597 True 46.811966
2025-03-24 77.389470 47.761429 29.628041 True 47.761429
--- 関数実行結果 (異常値検出・補正フラグ含む) ---
value normalized_value lstm_prediction prediction_error upper_bound lower_bound is_anomaly corrected_value
timestamp
2025-04-06 42.682425 0.142786 47.957775 -5.275350 71.629384 32.177871 False 42.682425
2025-04-07 51.480601 0.360088 47.125462 4.355140 70.797071 31.345557 False 51.480601
2025-04-08 51.305276 0.355758 46.848446 4.456830 70.520055 31.068541 False 51.305276
2025-04-09 50.025567 0.324151 46.572323 3.453244 70.243932 30.792418 False 50.025567
2025-04-10 48.827064 0.294550 46.376949 2.450115 70.048558 30.597045 False 48.827064
まとめ
本記事では、LSTMの基本的な仕組みからモデル構築・学習の方法、そしてPythonを用いた実践的な時系列データへの応用例まで解説しました。
LSTMは長期的な時系列の依存関係を効果的に捉えることができ、複雑で非線形なデータの予測や異常検知、データ補正に非常に有用な手法です。
TensorFlowやPyTorchを使うことで、実務に即したモデルを比較的容易に構築でき、多くの業界で幅広く応用されています。
これまでの統計的手法や単純な機械学習モデルでは難しかった課題に挑戦するために、ぜひLSTMを活用してみてください。
コメント