MENU

【Python実践】Isolation Forestで異常検知を行う(コピペで使えるサンプルコード付き)

Isolation Forest(アイソレーションフォレスト)は、機械学習の異常検知手法の一つで、特に大量のデータから異常なデータポイントを効率的に検出できることで注目されています。

このアルゴリズムは、データの「孤立度」を基に異常を検知するユニークなアプローチを取っています。

製造業においても、Isolation Forestは品質管理や設備保全の異常を早期に発見する手段として活用されており、従来の手法では見逃されやすい異常を見つけることが可能です。

この記事では、Isolation Forestの概要から製造業での応用、具体的な実装方法まで、詳しく解説します。

目次

Isolation Forestとは?

IsolationForestの概念図

Isolation Forestは、異常検知のためのアルゴリズムであり、データの孤立度を利用して異常を特定します。

Isolation Forestは決定木を多数生成し、各データポイントがどれだけ簡単に他のデータから分離できるか(孤立できるか)を評価します。

一般的に、異常なデータポイントは少数派であり、通常のデータと比べて早く孤立されるため、短い経路で検出されます。

より詳しい説明は、株式会社 zero to oneIsolation Forestをわかりやすく!異常検知アルゴリズムの基礎と応用」が参考になります。

メリット

  • 高効率・高速処理:大規模なデータセットでも迅速に処理できる。
  • 前処理が少ない:従来の異常検知手法と比べ、データの前処理がほとんど必要ない。
  • ノンパラメトリック:分布に依存せず、汎用性が高い。
  • スケーラビリティ:大量のデータにも適応しやすく、異常を効果的に見つけることができる。

デメリット

  • 解釈性の低さ:異常の原因が特定しにくいことがある。
  • 高次元データに弱い:次元が多いデータに対しては精度が低下する可能性がある。
  • 外れ値の種類による影響:外れ値が大きく異なる場合、その検出精度に影響が出ることがある。

Isolation Forestに適したデータ

Isolation Forestは、異常が通常のデータに比べて少数派であり、その孤立度が高いデータに適しています。製造業で用いられるセンサーデータや、金融業界での不正検知データなど、異常が稀に発生するデータに効果的です。

Isolation Forestの製造業における用途

Isolation Forest は、製造業において異常検知のために非常に効果的な手法です。特に、機器の故障予測、品質管理、生産効率の向上などの領域で広く応用されています。以下に、具体的な用途を解説します。

  • 機器の故障予測
    製造業では、機械の故障が生産に大きな影響を与えます。Isolation Forestを使うことで、センサーから得られる機器の動作データを解析し、通常とは異なるパターンを検出して早期に故障を予測できます。例えば、振動や温度、電流などのデータに対してIsolation Forestを適用し、異常な値をリアルタイムで検知することで、計画的なメンテナンスを行うことができます。

  • 品質管理
    製品の品質を確保するためには、製造プロセスでの異常を検出することが重要です。Isolation Forestは、製造ラインのセンサーデータや製品の特性データを監視し、異常な製品やプロセスの逸脱を検知します。これにより、製造段階での異常を迅速にキャッチし、欠陥品が市場に出回る前に対処できます。

  • 生産プロセスの最適化
    製造プロセスの最適化では、通常の動作範囲から逸脱するデータを検出することで、非効率な部分を見つけ出し、改善の糸口を探すことができます。例えば、生産速度、温度、圧力などのパラメータに対してIsolation Forestを適用し、異常値を見つけることで、プロセスの無駄やエネルギーの浪費を最小化できます。

  • サプライチェーンの異常検知
    製造業におけるサプライチェーン管理において、物流データや在庫データなどを監視し、異常な供給遅延や在庫過剰を検出することで、サプライチェーンの効率を高めることができます。これにより、供給不足や過剰在庫を未然に防ぎ、コストの削減に貢献します。

  • 不良品の早期検知
    製造プロセスで発生する不良品は、量産体制に大きなコスト負担を強いることがあります。Isolation Forest は、通常の製品の特性を学習し、そこから逸脱するデータを迅速に検出します。これにより、不良品の発生を早期に発見し、対策を講じることで、生産コストの低減が可能です。

Isolation Forestの実装方法

モジュールのインストール

Isolation Forestを実装するには、Pythonとscikit-learnライブラリが必要です。以下のコマンドで必要なライブラリをインストールできます。

pip install scikit-learn numpy pandas

学習と異常検知

  • このサンプルプログラムでは、①学習データ生成 ②学習 ③テストデータ(異常データ)投入 ④判定結果の出力
    という一連の処理を行っています。
  • 実データは正規分布ではない場合が多いため、学習データは3つの分布をあえて混在させています。
  • Isolation Forestに渡すデータは2次元以上のデータでないと処理できないため、np.random.randn(100, 2) を使って2次元データを100個分作成しています。
  • 異常検知したいデータが1次元の場合は、reshape(-1, 1) を使って2次元してからfit() の引数に指定する必要があります。
  • 後ほど紹介する自作クラスでは、1次元データをそのまま引数に指定できるよう考慮されています。
import numpy as np
from sklearn.ensemble import IsolationForest

# 学習データ(100個の値を持つ2次元の正常データ)の作成
np.random.seed(42)
X_train = 0.3 * np.random.randn(100, 2)
X_train = np.r_[X_train + 2, X_train - 2,X_train]

# モデルの作成と学習
model = IsolationForest(contamination=0.05, random_state=42).fit(X_train)

# 出来上がったモデルに学習データを投入し、判定させてみる
y_pred_train = model.predict(X_train)
print("正常データの判定結果:",y_pred_train.tolist())

# テストデータを生成(異常と判定されるよう、振幅を-4~4で生成)
X_test = np.random.uniform(low=-4, high=4, size=(20, 2))

# モデルに異常データを投入し、判定させてみる
y_pred_test = model.predict(X_test)
print("異常データの判定結果:",y_pred_test.tolist())

実行結果は次の通りです。
Isolation Forest は正常データを1,異常データを-1 として判定結果を返します。
「正常データの判定結果」は学習を投入した時の結果、「異常データの判定結果」はテストデータを投入した時の結果です。
正常データだけを使って学習していますが、学習した分布が全ての正常データを網羅できないため、いくつかのデータは異常判定されています(ハイパーパラメータの調整が必要)。

常データの判定結果: [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, -1, 1, 1, -1, 1, 1, 1, 1, -1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 1, -1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, -1, 1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, -1, -1, 1, 1, 1, 1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, -1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
異常データの判定結果: [-1, 1, -1, -1, -1, 1, -1, 1, -1, -1, -1, 1, -1, -1, -1, -1, -1, -1, -1, -1]

可視化

上記のグラフは、学習データ(正常)とテストデータ(異常)を散布図としてプロットし、さらに異常スコアで境界線(等高線)を描画したものです。
先ほどのプログラムの末尾に下記のコードを丸ごと挿入してもらえればグラフが描画されます。関数化しているので、コピペしてお使いいただけます。

import matplotlib.pyplot as plt
from matplotlib import rcParams
rcParams['font.family'] = 'Meiryo'

def plot_isolationforest_results(model, X_train=None, y_pred_train=None, X_test=None, y_pred_test=None):
    """
    2次元データのIsolation Forestの結果をプロットする。
    """
    plt.figure(figsize=(8, 6))

    if X_train is not None and y_pred_train is not None and X_train.ndim > 1:
        plt.scatter(X_train[y_pred_train == 1][:, 0], X_train[y_pred_train == 1][:, 1],
                    c='lightblue', edgecolors='k', s=70, label="正常データ")
        plt.scatter(X_train[y_pred_train == -1][:, 0], X_train[y_pred_train == -1][:, 1],
                    c='orange', edgecolors='k', s=70, label="異常データ(学習データ)")

    if X_test is not None and y_pred_test is not None and X_test.ndim > 1:
        plt.scatter(X_test[y_pred_test == 1][:, 0], X_test[y_pred_test == 1][:, 1],
                    c='green', edgecolors='k', s=70, label="正常データ(テストデータ)")
        plt.scatter(X_test[y_pred_test == -1][:, 0], X_test[y_pred_test == -1][:, 1],
                    c='red', edgecolors='k', s=70, label="異常データ(テストデータ)")

    xx, yy = np.meshgrid(np.linspace(X_train[:, 0].min() - 1, X_train[:, 0].max() + 1, 500),
                         np.linspace(X_train[:, 1].min() - 1, X_train[:, 1].max() + 1, 500))
    x_range = np.c_[xx.ravel(), yy.ravel()]

    Z = model.decision_function(x_range)
    Z = Z.reshape(xx.shape)

    plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='black')
    plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap=plt.cm.Blues_r, alpha=0.3)

    plt.legend()
    plt.title("Isolation Forest 異常検知と決定境界", fontsize=16)
    plt.xlabel("特徴量 1", fontsize=12)
    plt.ylabel("特徴量 2", fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.show()

# プロット関数を実行
plot_isolationforest_results(model,X_train,y_pred_train,X_test,y_pred_test)

パラメータ調整とモデルの精度向上

Isolation Forestの主なパラメータには、次の項目があります。これらのパラメータを調整することで、モデルの精度を向上させることが可能です。特に、異常データの割合に応じて contamination を適切に設定することが重要です。

パラメータ名説明調整のポイント
n_estimators決定木の数多いほど精度向上に繋がるが、計算コストが増加。一般的に100~1000程度が使用される。
max_samples各決定木を構築するためにサンプリングするデータの割合auto' (デフォルト): データセットのサイズに応じて自動的に決定。数値を指定することも可能。
contaminationデータセットにおける異常データの割合を推定するパラメータ異常データの割合を事前にある程度把握している場合に有効。
max_features各決定木で分割に使用する特徴量の最大数を比率で指定0~1の間で指定。
1:全ての特徴量を使用 0.5:特徴量の半分を使用
random_state乱数生成器のシードモデルの再現性を確保するために設定する。

ハイパーパラメータの調整には、クロスバリデーションを活用するのが効果的です。例えば、GridSerchCV を用いて最適なパラメータを探索できます。

from sklearn.ensemble import IsolationForest
from sklearn.model_selection import GridSearchCV

# ハイパーパラメータの探索範囲を設定
param_grid = {
    'contamination': [0.01, 0.05, 0.1],
    'n_estimators': [100, 200, 300],
    'max_samples': ['auto', 0.5, 1.0],
    'max_features': [1.0,0.0.5,0.1]
}

# GridSearchCVのインスタンスを作成
grid_search = GridSearchCV(IsolationForest(), param_grid, cv=5)

# グリッドサーチを実行
grid_search.fit(X_train)

# 最適なパラメータを表示
print("Best parameters:", grid_search.best_params_)

ハイパーパラメータを調整しても異常検知の結果に満足できない場合は、特徴量エンジニアリングを行います。
例えば、正規化、標準化、分布の調整、標準偏差、分散、移動平均などの演算、他の特徴量との相関係数の利用などが考えられます。
また、OneClassSVM、Local Outlier Factor (LOF)、AutoEncoderなど、別のアルゴリズムに切り替えることも検討すべきです。

Isolation Forestが簡単に使える自作クラス

Isolation Forestを簡単に使えるようにするために自作クラスとグラフ描画関数を作成しました。
このクラスのfit() メソッドは、1次元データが渡された場合、内部でreshape(-1, 1)で2次元に拡張しているため、問題なく処理できます。

モデルを作成するには

IsolationForestUtilクラスのインスタンス生成時、引数にデータを格納したDataFrameを渡します。そして、fit メソッドにモデル作成で使いたいカラム名と、ハイパーパラメータを指定します。
read_csvメソッドを使うと、CSVファイルを読み込んでインスタンス変数(self.df)に保持します。
完成したモデルは save_modelメソッドで pickle ファイルとして保存できます。

import numpy as np
import pandas as pd
from isolation_forest import IsolationForestUtil,plot_isolationforest_results

# 学習データの生成
np.random.seed(42)
X_train_data = 0.5 * np.random.randn(100, 2)
X_train_data = np.r_[X_train_data + 1, X_train_data - 1, X_train_data]
df_train = pd.DataFrame(X_train_data, columns=['feature1', 'feature2'])

# インスタンスを生成
iso_for = IsolationForestUtil(df_train)

# モデルを学習
iso_for.fit(['feature1', 'feature2'], contamination=0.05, random_state=42)

# 学習データで異常検知を実施
y_pred_train = iso_for.predict(['feature1', 'feature2'])
print("Train predictions:", y_pred_train)

# モデルの保存
iso_for.save_model('isolation_forest_model.pkl')

# グラフ描画
plot_isolationforest_results(iso_for.model,X_train=df_train[['feature1', 'feature2']].values, y_pred_train=y_pred_train)

新しいデータで異常検知するには

IsolationForestUtilのインスタンス生成時にモデルファイルのパスを指定します。load_modelメソッドで後からモデルファイルを読み込むことも可能です。
predictメソッドにカラムを指定し、df引数にDataFrameを指定することで、新たなデータで異常検知が行えます。

import numpy as np
import pandas as pd
from isolation_forest import IsolationForestUtil,plot_isolationforest_results

# 学習データの生成
np.random.seed(42)
X_train_data = 0.5 * np.random.randn(100, 2)
X_train_data = np.r_[X_train_data + 1, X_train_data - 1, X_train_data]
df_train = pd.DataFrame(X_train_data, columns=['feature1', 'feature2'])

# インスタンスを生成
iso_for = IsolationForestUtil(df_train)

# モデルを学習
iso_for.fit(['feature1', 'feature2'], contamination=0.05, random_state=42)

# 学習データで異常検知を実施
y_pred_train = iso_for.predict(['feature1', 'feature2'])
print("Train predictions:", y_pred_train)

# テストデータを生成
np.random.seed(20)
X_test_data = np.random.uniform(low=-4, high=4, size=(20, 2))
df_test = pd.DataFrame(X_test_data, columns=['feature1', 'feature2'])

# テストデータで異常検知を実施
y_pred_test = iso_for.predict(['feature1', 'feature2'],df=df_test)
print("Test predictions:", y_pred_test)

# 学習データとテストデータの検知結果を合わせてグラフ描画
plot_isolationforest_results(iso_for.model,df_train[['feature1', 'feature2']].values, y_pred_train,df_test[['feature1', 'feature2']].values, y_pred_test)

モデル作成時にテストデータによる評価を行うには

基本的には前述の処理を続けて行えばよいのですが、少しだけ処理を簡素化しています。

import numpy as np
import pandas as pd
from isolation_forest import IsolationForestUtil,plot_isolationforest_results

# 学習データの生成
np.random.seed(42)
X_train_data = 0.5 * np.random.randn(100, 2)
X_train_data = np.r_[X_train_data + 1, X_train_data - 1, X_train_data]
df_train = pd.DataFrame(X_train_data, columns=['feature1', 'feature2'])

# インスタンスを生成
iso_for = IsolationForestUtil(df_train)

# モデルを学習
iso_for.fit(['feature1', 'feature2'], contamination=0.05, random_state=42)

# 学習データで異常検知を実施
y_pred_train = iso_for.predict(['feature1', 'feature2'])
print("Train predictions:", y_pred_train)

# テストデータを生成
np.random.seed(20)
X_test_data = np.random.uniform(low=-4, high=4, size=(20, 2))
df_test = pd.DataFrame(X_test_data, columns=['feature1', 'feature2'])

# テストデータで異常検知を実施
y_pred_test = iso_for.predict(['feature1', 'feature2'],df=df_test)
print("Test predictions:", y_pred_test)

# 学習データとテストデータの検知結果を合わせてグラフ描画
plot_isolationforest_results(iso_for.model,df_train[['feature1', 'feature2']].values, y_pred_train,df_test[['feature1', 'feature2']].values, y_pred_test)

1次元の異常検知をするには

IsolationForestUtilクラスの使い方は前述と全く同じですが、グラフ描画は plot_ocsvm_results_1d を使用します。

import numpy as np
import pandas as pd
from isolation_forest  import IsolationForestUtil, plot_isolationforest_results_1d

# 学習データの生成
np.random.seed(42)
X_train_data = 0.5 * np.random.randn(100).reshape(-1, 1)
# 1次元データを2次元配列に変換
df_train = pd.DataFrame(X_train_data, columns=['feature1'])

# インスタンスを生成
iso_for = IsolationForestUtil(df_train)

# モデルを学習
iso_for.fit(['feature1'],contamination=0.05, random_state=42)

# 学習データで異常検知を実施
y_pred_train = iso_for.predict(['feature1'])
print("Train predictions:", y_pred_train)

# テストデータを生成
np.random.seed(20)
X_test_data = np.random.uniform(low=-4, high=4, size=(20,)).reshape(-1, 1)
df_test = pd.DataFrame(X_test_data, columns=['feature1'])

# テストデータで異常検知を実施
y_pred_test = iso_for.predict(['feature1'], df=df_test)
print("Test predictions:", y_pred_test)

# 1次元用の関数を使ってグラフ描画
plot_isolationforest_results_1d(
    iso_for.model,  # 学習したモデルを直接使用
    df_train[['feature1']].values,
    y_pred_train,
    df_test[['feature1']].values,
    y_pred_test
)

IsolationForestUtilクラスとグラフ描画関数リファレンス

One-Class SVMのグラフ描画関数説明
plot_isolationforest_results_1d(
model,
X_train=None,
y_pred_train=None,
X_test=None,
y_pred_test=None
)
1次元データのIsolationForestの結果をプロットする。学習データとテストデータを表示し、決定境界を描画する。
plot_isolationforest_results(
model,
X_train=None,
y_pred_train=None,
X_test=None,
y_pred_test=None
)
2次元データのIsolationForestの結果をプロットする。学習データとテストデータを表示し、決定境界を描画する。
IsolationForestUtilのメソッド説明
__init__(df=None, model_path=None)クラスの初期化メソッド。データフレームとモデルパスを受け取る。
fit(column, nu=0.1, gamma="auto", kernel="rbf")IsolationForestモデルを作成・学習するメソッド。指定したカラムを使用。
predict(column, df=None)学習したモデルを使用して予測を行うメソッド。指定したカラムを使用。
read_csv(file_name, encoding="shift-jis")CSVファイルからデータを読み込むメソッド。
save_model(model_path)学習したモデルをファイルに保存するメソッド。
load_model(model_path=None)モデルをファイルから読み込むメソッド。

IsolationForestUtilクラスとグラフ描画関数のソースコード

import numpy as np
import pandas as pd
from sklearn.ensemble import IsolationForest
import pickle
from sklearn.model_selection import GridSearchCV
import matplotlib.pyplot as plt
from matplotlib import rcParams
rcParams['font.family'] = 'Meiryo'

def plot_isolationforest_results_1d(model, X_train=None, y_pred_train=None, X_test=None, y_pred_test=None):
    """
    1次元データのIsolation Forestの結果をプロットする。

    学習データとテストデータを表示し、正常データと異常と検出されたデータを色分けしてプロットする。
    また、決定境界を描画する。

    Parameters:
    model : IsolationForestモデル
        学習済みのIsolationForestモデル。
    X_train : array-like, shape (n_samples,), optional
        学習データの特徴量。1次元データを想定。
    y_pred_train : array-like, shape (n_samples,), optional
        学習データに対する予測結果。1(正常)または-1(異常)。
    X_test : array-like, shape (n_samples,), optional
        テストデータの特徴量。1次元データを想定。
    y_pred_test : array-like, shape (n_samples,), optional
        テストデータに対する予測結果。1(正常)または-1(異常)。
    """
    plt.figure(figsize=(8, 6))

    # 学習データのプロット
    if X_train is not None and y_pred_train is not None:
        plt.scatter(X_train[y_pred_train == 1], np.zeros_like(X_train[y_pred_train == 1]),
                    c='lightblue', edgecolors='k', s=70, label="正常データ")
        plt.scatter(X_train[y_pred_train == -1], np.zeros_like(X_train[y_pred_train == -1]),
                    c='orange', edgecolors='k', s=70, label="異常データ(学習データ)")

    # テストデータのプロット
    if X_test is not None and y_pred_test is not None:
        plt.scatter(X_test[y_pred_test == 1], np.zeros_like(X_test[y_pred_test == 1]),
                    c='green', edgecolors='k', s=70, label="正常データ(テストデータ)")
        plt.scatter(X_test[y_pred_test == -1], np.zeros_like(X_test[y_pred_test == -1]),
                    c='red', edgecolors='k', s=70, label="異常データ(テストデータ)")

    plt.axhline(0, color='black', linewidth=2)

    # 決定境界を描画
    if X_train is not None or X_test is not None:
        x_range = np.linspace(X_train.min() - 1, X_train.max() + 1, 500).reshape(-1, 1) if X_train is not None else \
                  np.linspace(X_test.min() - 1, X_test.max() + 1, 500).reshape(-1, 1)

        Z = model.decision_function(x_range)
        plt.plot(x_range, Z, color='black', linewidth=2, label='決定境界')

    # グラフの設定
    plt.title("Isolation Forest 異常検知(1次元)", fontsize=16)
    plt.xlabel("特徴量", fontsize=12)
    plt.ylabel("値", fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.legend()
    plt.show()


def plot_isolationforest_results(model, X_train=None, y_pred_train=None, X_test=None, y_pred_test=None):
    """
    2次元データのIsolationForestの結果をプロットする。

    学習データとテストデータを表示し、正常データと異常と検出されたデータを色分けしてプロットする。
    また、決定境界を描画する。

    Parameters:
    model : IsolationForestモデル
        学習済みのIsolationForestモデル
    X_train : array-like, shape (n_samples, 2), optional
        学習データの特徴量。2次元データを想定。
    y_pred_train : array-like, shape (n_samples,), optional
        学習データに対する予測結果。1(正常)または-1(異常)。
    X_test : array-like, shape (n_samples, 2), optional
        テストデータの特徴量。2次元データを想定。
    y_pred_test : array-like, shape (n_samples,), optional
        テストデータに対する予測結果。1(正常)または-1(異常)。
    """
    plt.figure(figsize=(8, 6))

    # 学習データのプロット
    if X_train is not None and y_pred_train is not None and X_train.ndim > 1:
        plt.scatter(X_train[y_pred_train == 1][:, 0], X_train[y_pred_train == 1][:, 1],
                    c='lightblue', edgecolors='k', s=70, label="正常データ")
        plt.scatter(X_train[y_pred_train == -1][:, 0], X_train[y_pred_train == -1][:, 1],
                    c='orange', edgecolors='k', s=70, label="異常データ(学習データ)")

    # テストデータのプロット
    if X_test is not None and y_pred_test is not None and X_test.ndim > 1:
        plt.scatter(X_test[y_pred_test == 1][:, 0], X_test[y_pred_test == 1][:, 1],
                    c='green', edgecolors='k', s=70, label="正常データ(テストデータ)")
        plt.scatter(X_test[y_pred_test == -1][:, 0], X_test[y_pred_test == -1][:, 1],
                    c='red', edgecolors='k', s=70, label="異常データ(テストデータ)")

    # 決定境界の描画
    if X_train is not None and X_train.ndim > 1:
        xx, yy = np.meshgrid(np.linspace(X_train[:, 0].min() - 1, X_train[:, 0].max() + 1, 500),
                             np.linspace(X_train[:, 1].min() - 1, X_train[:, 1].max() + 1, 500))
        x_range = np.c_[xx.ravel(), yy.ravel()]

        Z = model.decision_function(x_range)
        Z = Z.reshape(xx.shape)

        plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='black')
        plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap=plt.cm.Blues_r, alpha=0.3)
    elif X_test is not None and X_test.ndim > 1:
        xx, yy = np.meshgrid(np.linspace(X_test[:, 0].min() - 1, X_test[:, 0].max() + 1, 500),
                             np.linspace(X_test[:, 1].min() - 1, X_test[:, 1].max() + 1, 500))
        x_range = np.c_[xx.ravel(), yy.ravel()]

        Z = model.decision_function(x_range)
        Z = Z.reshape(xx.shape)

        plt.contour(xx, yy, Z, levels=[0], linewidths=2, colors='black')
        plt.contourf(xx, yy, Z, levels=np.linspace(Z.min(), 0, 7), cmap=plt.cm.Blues_r, alpha=0.3)

    plt.legend()
    plt.title("Isolation Forest 異常検知と決定境界", fontsize=16)
    plt.xlabel("特徴量 1", fontsize=12)
    plt.ylabel("特徴量 2", fontsize=12)
    plt.grid(True, linestyle='--', alpha=0.7)
    plt.show()

class IsolationForestUtil:
    def __init__(self, df=None, model_path=None):
        """
        IsolationForestModelクラスの初期化メソッド
        """
        self.df = None if df is None else df.copy()
        self.model = None if model_path is None else self.load_model(model_path)
        self.pred = []

    def fit(self, column, n_estimators=100, max_samples="auto",max_features=1.0,contamination="auto", random_state=None):
        """
        Isolation Forestモデルの作成と学習
        """

        # 指定されたカラムのデータを抽出
        if isinstance(column, list) and len(column) > 1:
            X = self.df[column].values
        else:
            X = self.df[column].values.reshape(-1, 1)  # 複数カラムのデータを2次元配列に変換

        self.model = IsolationForest(n_estimators=n_estimators, max_samples=max_samples,max_features=max_features,
                                     contamination=contamination, random_state=random_state).fit(X)
        return self.model

    def predict(self, column, df=None):
        """
        学習したモデルを使用して予測を行う
        """
        if df is not None:
            self.df = df

        # 指定されたカラムのデータを抽出
        if isinstance(column, list) and len(column) > 1:
            X = self.df[column].values
        else:
            X = self.df[column].values.reshape(-1, 1)  # 複数カラムのデータを2次元配列に変換

        self.pred = self.model.predict(X)
        return self.pred

    def read_csv(self, file_name, encoding="shift-jis"):
        self.df = pd.read_csv(file_name, encoding=encoding)

    def save_model(self, model_path):
        self.model_path = model_path
        with open(model_path, 'wb') as f:
            pickle.dump(self.model, f)

    def load_model(self, model_path=None):
        model_path = model_path if model_path else self.model_path
        with open(model_path, 'rb') as f:
            self.model = pickle.load(f)
        return self.model

まとめ

Isolation Forestは、異常検知に非常に効果的なアルゴリズムであり、特に製造業において設備や品質管理の自動化で活用されています。

高速かつ高精度な異常検知が可能なため、多くの企業で導入が進んでいます。

この記事が、皆さんの異常検知に役立てていただければ幸いです。

よかったらシェアしてね!
  • URLをコピーしました!
  • URLをコピーしました!

この記事を書いた人

コメント

コメントする

目次