cdxy.me
Footprints on Cyber Security.

Exponential Weighted Moving Average(EWMA)

  • EWMA用于平滑:1) 计算效率高 2) 时间较近的观测值权重较大,更好地反应时序变化趋势。
  • weights (1-alpha)**(n-1), (1-alpha)**(n-2), …, 1-alpha, 1.
import pandas as pd
df = pd.read_csv('../STOCK_HMM/company_data/AAPL.csv') # AAPL现货数据
df.head(1)
date open high low close volume adj_close
0 2009-01-02 85.88 91.04 85.16 90.75 186503800 12.23
# window = 60

# df_plot = pd.DataFrame()
# df_plot['close'] = df['close'][0:1000]
# df_plot['MA60'] = df_plot['close'].rolling(window=window).mean() # 移动平均
# df_plot['EWMA60'] = df_plot['close'].ewm(span=window,ignore_na=True).mean() # 指数加权平均
# df_plot.plot(figsize=(16,4))

png

Anomaly Detection in Time Series Data Using LSTMs and Automatic Thresholding

流程概述

  1. 滑动窗口+LSTM学习时序行为,并给出下一步预测。(LSTM特性适用于与时间序列、非线性数据流记忆)
  2. 求预测值与真实观测值的误差。(量化异常程度)
  3. 对误差使用EWMA平滑。(抑制基于LSTM的预测中经常出现的误差峰值)
  4. 采用动态阈值方法判定该误差是否为异常。(快速、不需要高斯分布假设)

总体是无监督的过程,适用于难以取得标注样本的单通道时序异常检测。

动态误差阈值

Step 1:误差和平滑

每一步 t 产生一个预测值 ŷ (t),计算预测误差为 e(t)=|y(t)−ŷ (t)|,其中 y(t)=x(t+1)i,其中 i 对应于真实值的维数,将每一个 e(t) 附加到一个一维误差向量上,构成误差集合:

e=[e(t−h),...,e(t−ls),...,e(t−1),e(t)]

然后通过EWMA平滑构成e_s

Step 2:对误差序列动态计算阈值

  • 第一个公式是异常检测中常见的评估方法,如3-sigma方法即是设定Z=3。这里可以将阈值e通过z=[2,10]计算出一批备选。
  • 然后通过第二个公式找到右式最大时的e作为最终阈值。此中分子以"阈值滤出异常后的平滑误差均值和标准差下降最大的百分比"表达异常点的影响,分母以"滤除的异常值数量、滤除的异常序列数量"作为惩罚项防止过度贪心。

png

实验

Load

import csv
reader = csv.DictReader(open("labeled_anomalies.csv", "rU"))
/root/anaconda3/lib/python3.7/site-packages/ipykernel_launcher.py:1: DeprecationWarning: 'U' mode is deprecated
  """Entry point for launching an IPython kernel.
import pandas as pd
df = pd.read_csv("labeled_anomalies.csv")
df.head()
chan_id spacecraft anomaly_sequences class num_values
0 P-1 SMAP [[1899, 2099], [4286, 4594], [3289, 3529]] [contextual, contextual, contextual] 8505
1 S-1 SMAP [[5050, 5497]] [point] 7331
2 E-1 SMAP [[4750, 4780], [5360, 5836]] [contextual, contextual] 8516
3 E-2 SMAP [[5348, 6745]] [point] 8532
4 E-3 SMAP [[4844, 8124]] [point] 8307

Use channel P-1 for demo

CHIN_ID = 'P-1'
import numpy as np
import os

def load_train_data(chan_id):
    return np.load(os.path.join("data", "train", chan_id + ".npy"))    

def load_test_data(chan_id):
    return np.load(os.path.join("data", "test", chan_id + ".npy"))    

train_data = load_train_data(CHIN_ID)
test_data = load_test_data(CHIN_ID)
class config:
    # num previous timesteps provided to model to predict future values
    l_s = 250

    # number of steps ahead to predict
    n_predictions = 10

    # for reload data 
    use_id = "2018-05-19_15.00.10"
def shape_data(arr, train=True):

    # 滑动窗口采样
    data = [] 
    for i in range(len(arr) - config.l_s - config.n_predictions):
        data.append(arr[i:i + config.l_s + config.n_predictions])
    data = np.array(data) 

    assert len(data.shape) == 3

    if train == True:
        np.random.shuffle(data)

    # 每个窗口切分出X和y
    X = data[:,:-config.n_predictions,:]
    y = data[:,-config.n_predictions:,0] #telemetry value is at position 0

    return X,y
train_X,train_y = shape_data(train_data,train=True)
test_X,test_y = shape_data(test_data,train=False)
train_X.shape
(2612, 250, 25)

Train LSTM model

from keras.models import Sequential, load_model
from keras.callbacks import History, EarlyStopping, Callback
from keras.layers.recurrent import LSTM
from keras.layers.core import Dense, Activation, Dropout
import numpy as np
import os

model = Sequential()
model.add(LSTM(80,input_shape=(None, train_X.shape[2]),return_sequences=True))
model.add(Dropout(0.3))
model.add(LSTM(80,return_sequences=False))
model.add(Dropout(0.3))
model.add(Dense(10)) 
model.add(Activation("linear"))
Using TensorFlow backend.


WARNING:tensorflow:From /root/anaconda3/lib/python3.7/site-packages/keras/backend/tensorflow_backend.py:66: The name tf.get_default_graph is deprecated. Please use tf.compat.v1.get_default_graph instead.

WARNING:tensorflow:From /root/anaconda3/lib/python3.7/site-packages/keras/backend/tensorflow_backend.py:541: The name tf.placeholder is deprecated. Please use tf.compat.v1.placeholder instead.

WARNING:tensorflow:From /root/anaconda3/lib/python3.7/site-packages/keras/backend/tensorflow_backend.py:4432: The name tf.random_uniform is deprecated. Please use tf.random.uniform instead.

WARNING:tensorflow:From /root/anaconda3/lib/python3.7/site-packages/keras/backend/tensorflow_backend.py:148: The name tf.placeholder_with_default is deprecated. Please use tf.compat.v1.placeholder_with_default instead.

WARNING:tensorflow:From /root/anaconda3/lib/python3.7/site-packages/keras/backend/tensorflow_backend.py:3733: calling dropout (from tensorflow.python.ops.nn_ops) with keep_prob is deprecated and will be removed in a future version.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
model.compile(loss='mse', optimizer='adam') 
model.summary()
WARNING:tensorflow:From /root/anaconda3/lib/python3.7/site-packages/keras/optimizers.py:793: The name tf.train.Optimizer is deprecated. Please use tf.compat.v1.train.Optimizer instead.

Model: "sequential_1"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
=================================================================
lstm_1 (LSTM)                (None, None, 80)          33920     
_________________________________________________________________
dropout_1 (Dropout)          (None, None, 80)          0         
_________________________________________________________________
lstm_2 (LSTM)                (None, 80)                51520     
_________________________________________________________________
dropout_2 (Dropout)          (None, 80)                0         
_________________________________________________________________
dense_1 (Dense)              (None, 10)                810       
_________________________________________________________________
activation_1 (Activation)    (None, 10)                0         
=================================================================
Total params: 86,250
Trainable params: 86,250
Non-trainable params: 0
_________________________________________________________________
# early stop condition.
cbs = [History(), EarlyStopping(monitor='val_loss', min_delta=0.0003, verbose=0)]
# train model
model.fit(train_X, train_y, 
          batch_size=64, 
          epochs=35, 
          validation_split=0.2, 
          callbacks=cbs,
          verbose=True)
Train on 2089 samples, validate on 523 samples
Epoch 1/35
2089/2089 [==============================] - 16s 8ms/step - loss: 0.1118 - val_loss: 0.0911
Epoch 2/35
2089/2089 [==============================] - 16s 8ms/step - loss: 0.1051 - val_loss: 0.0905
Epoch 3/35
2089/2089 [==============================] - 16s 8ms/step - loss: 0.1007 - val_loss: 0.0854
Epoch 4/35
2089/2089 [==============================] - 16s 8ms/step - loss: 0.0950 - val_loss: 0.0806
Epoch 5/35
2089/2089 [==============================] - 16s 8ms/step - loss: 0.0899 - val_loss: 0.0805





<keras.callbacks.History at 0x7f1970220048>
model.save(CHIN_ID + ".h5")
# model = load_model(CHIN_ID + ".h5")
WARNING:tensorflow:From /root/anaconda3/lib/python3.7/site-packages/keras/backend/tensorflow_backend.py:190: The name tf.get_default_session is deprecated. Please use tf.compat.v1.get_default_session instead.

WARNING:tensorflow:From /root/anaconda3/lib/python3.7/site-packages/tensorflow/python/ops/math_grad.py:1250: add_dispatch_support.<locals>.wrapper (from tensorflow.python.ops.array_ops) is deprecated and will be removed in a future version.
Instructions for updating:
Use tf.where in 2.0, which has the same broadcast rule as np.where

predict y_hat

y_hat = np.array([])
# num previous timesteps provided to model to predict future values
l_s = 250 

# number of values to evaluate in each batch
batch_size = 70 

# number of steps ahead to predict
n_predictions = 10
num_batches = int((test_y.shape[0] - l_s) / batch_size)
for i in range(1, num_batches+2):
    prior_idx = (i-1) * batch_size
    idx = i * batch_size
    if i == num_batches+1:
        idx = test_y.shape[0] #remaining values won't necessarily equal batch size

    X_test_period = test_X[prior_idx:idx]

    y_hat_period = model.predict(X_test_period)

    # map predictions n steps ahead to their corresponding timestep
    # TODO: vectorize
    final_y_hat = []
    for t in range(len(y_hat_period)+n_predictions):
        y_hat_t = []
        for j in range(n_predictions):
            if t - j >= 0 and t-j < len(y_hat_period):
                y_hat_t.append(y_hat_period[t-j][j])
        if t < len(y_hat_period):
            if y_hat_t.count(0) == len(y_hat_t):
                final_y_hat.append(0)
            else:
                final_y_hat.append(y_hat_t[0]) # first prediction


    y_hat_period = np.array(final_y_hat).reshape(len(final_y_hat),1)
    y_hat = np.append(y_hat, y_hat_period)

y_hat = np.reshape(y_hat, (y_hat.size,))
y_hat
array([-0.50184548, -0.44260275, -0.25634772, ..., -0.26641721,
       -0.10017672,  0.07764894])
np.save(CHIN_ID + ".y_hat.npy",y_hat)

Calc Predict Error

print(y_hat.shape)
print(test_y.shape)
(8245,)
(8245, 10)
# bare error
e = [abs(y_h-y_t[0]) for y_h,y_t in zip(y_hat, test_y)]
# pd.DataFrame(e).plot()

png

Calc Smoothed Error

# number of trailing batches to use in error calculation
window_size = 30

# determines window size used in EWMA smoothing (percentage of total values for channel)
smoothing_perc = 0.05

smoothing_window = int(batch_size * window_size * smoothing_perc)
smoothing_window
105
np.mean(e)
0.18694956205081267
smoothing_window
105
# EWMA
e_s = list(pd.DataFrame(e).ewm(span=smoothing_window).mean().values.flatten())
np.mean(e_s)
0.1868574605984681
l_s
250
# for values at beginning < sequence length, just use avg
e_s[:l_s] = [np.mean(e_s[:l_s*2])]*l_s 
# _df = pd.DataFrame()
# _df['error'] = e
# _df['smoothed_error'] = e_s
# _df.plot(figsize=(16,4))

png

np.save(CHIN_ID + ".e_s.npy",e_s)
# anom["normalized_error"] = np.mean(e) / np.ptp(y_test)
# logger.info("normalized prediction error: %s" %anom["normalized_error"])

Process Errors

window_size = 30
batch_size = 70

i_anom = [] # anomaly indices
num_windows = int((test_y.shape[0] - (batch_size*window_size)) / batch_size)
# decrease the historical error window size (h) if number of test values is limited
while num_windows < 0:
    window_size -= 1
    if window_size <= 0:
        window_size = 1
    num_windows = int((test_y.shape[0] - (batch_size*window_size)) / batch_size)
def find_epsilon(e_s, error_buffer, sd_lim=12.0):

    e_s = np.array(e_s)
    mean = np.mean(e_s)
    sd = np.std(e_s)

    max_s = 0
    sd_threshold = sd_lim # default if no winner or too many anomalous ranges

    for z in np.arange(2.5, sd_lim, 0.5):
        epsilon = mean + (sd*z)
        pruned_e_s, pruned_i, i_anom  = [], [], []

        for i,e in enumerate(e_s):
            if e < epsilon:
                pruned_e_s.append(e)
                pruned_i.append(i)
            if e > epsilon:
                for j in range(0, error_buffer):
                    if not i + j in i_anom and not i + j >= len(e_s):
                        i_anom.append(i + j)
                    if not i - j in i_anom and not i - j < 0:
                        i_anom.append(i - j)

        if len(i_anom) > 0:
            # preliminarily group anomalous indices into continuous sequences (# sequences needed for scoring)
            i_anom = sorted(list(set(i_anom)))
            groups = [list(group) for group in mit.consecutive_groups(i_anom)]
            E_seq = [(g[0], g[-1]) for g in groups if not g[0] == g[-1]]

            perc_removed = 1.0 - (float(len(pruned_e_s)) / float(len(e_s)))
            mean_perc_decrease = (mean - np.mean(pruned_e_s)) / mean
            sd_perc_decrease = (sd - np.std(pruned_e_s)) / sd
            s = (mean_perc_decrease + sd_perc_decrease) / (len(E_seq)**2 + len(i_anom))

            # sanity checks
            if s >= max_s and len(E_seq) <= 5 and len(i_anom) < (len(e_s)*0.5):
                sd_threshold = z
                max_s = s

    return sd_threshold #multiply by sd to get epsilon
'''
Find anomalous sequences of smoothed error values that are above error threshold (epsilon). Both
    smoothed errors and the inverse of the smoothed errors are evaluated - large dips in errors often
    also indicate anomlies.
'''
def get_anomalies(e_s, y_test, z, window, i_anom_full, len_y_test):

    perc_high, perc_low = np.percentile(y_test, [95,5])
    inter_range = perc_high - perc_low

    mean = np.mean(e_s)
    std = np.std(e_s)
    chan_std = np.std(y_test)

    e_s_inv = [mean + (mean-e) for e in e_s] #flip it around the mean
    z_inv = find_epsilon(e_s_inv, error_buffer)

    epsilon = mean + (float(z)*std)
    epsilon_inv = mean + (float(z_inv)*std)

    # find sequences of anomalies greater than epsilon
    E_seq, i_anom, non_anom_max = compare_to_epsilon(e_s, epsilon, len_y_test,
        inter_range, chan_std, std, error_buffer, window, i_anom_full)

    # find sequences of anomalies using inverted error values (lower than normal errors are also anomalous)
    E_seq_inv, i_anom_inv, inv_non_anom_max = compare_to_epsilon(e_s_inv, epsilon_inv, 
        len_y_test, inter_range, chan_std, std, error_buffer, window, i_anom_full)

    if len(E_seq) > 0:
        i_anom = prune_anoms(E_seq, e_s, non_anom_max, i_anom)

    if len(E_seq_inv) > 0:
        i_anom_inv = prune_anoms(E_seq_inv, e_s_inv, inv_non_anom_max, i_anom_inv)

    i_anom = list(set(i_anom + i_anom_inv))

    return i_anom
def compare_to_epsilon(e_s, epsilon, len_y_test, inter_range, chan_std, 
    std, error_buffer, window, i_anom_full):
    '''Compare smoothed error values to epsilon (error threshold) and group consecutive errors together into 
    sequences. 

    Args:
        e_s (list): smoothed errors between y_test and y_hat values
        epsilon (float): Threshold for errors above which an error is considered anomalous
        len_y_test (int): number of timesteps t in test data
        inter_range (tuple of floats): range between 5th and 95 percentile values of error values
        chan_std (float): standard deviation on test values
        std (float): standard deviation of smoothed errors
        error_buffer (int): number of values surrounding anomalous errors to be included in anomalous sequence
        window (int): Count of number of error windows that have been processed
        i_anom_full (list): list of all previously identified anomalies in test set

    Returns:
        E_seq (list of tuples): contains start and end indices of anomalous ranges
        i_anom (list): indices of errors that are part of an anomlous sequnce
        non_anom_max (float): highest smoothed error value below epsilon
    '''

    i_anom = []
    E_seq = []
    non_anom_max = 0

    # Don't consider anything in window because scale of errors too small compared to scale of values   
    if not (std > (.05*chan_std) or max(e_s) > (.05 * inter_range)) or not max(e_s) > 0.05:    
        return E_seq, i_anom, non_anom_max

    # ignore initial error values until enough history for smoothing, prediction, comparisons
    num_to_ignore = l_s * 2
    # if y_test is small, ignore fewer
    if len_y_test < 2500: 
        num_to_ignore = l_s
    if len_y_test < 1800: 
        num_to_ignore = 0

    for x in range(0, len(e_s)):

        anom = True
        if not e_s[x] > epsilon or not e_s[x] > 0.05 * inter_range:
            anom = False

        if anom:
            for b in range(0, error_buffer):
                if not x + b in i_anom and not x + b >= len(e_s) and ((x + b) >= len(e_s) - batch_size or window == 0):
                    if not (window == 0 and x + b < num_to_ignore):
                        i_anom.append(x + b)
                # only considering new batch of values added to window, not full window
                if not x - b in i_anom and ( (x - b) >= len(e_s) - batch_size or window == 0):
                    if not (window == 0 and x - b < num_to_ignore):
                        i_anom.append(x - b)

    # capture max of values below the threshold that weren't previously identified as anomalies 
    # (used in filtering process)
    for x in range(0, len(e_s)):
        adjusted_x = x + window*batch_size
        if e_s[x] > non_anom_max and not adjusted_x in i_anom_full and not x in i_anom:
            non_anom_max = e_s[x]

    # group anomalous indices into continuous sequences
    i_anom = sorted(list(set(i_anom)))
    groups = [list(group) for group in mit.consecutive_groups(i_anom)]
    E_seq = [(g[0], g[-1]) for g in groups if not g[0] == g[-1]]

    return E_seq, i_anom, non_anom_max
def prune_anoms(E_seq, e_s, non_anom_max, i_anom):
    '''Remove anomalies that don't meet minimum separation from the next closest anomaly or error value

    Args:
        E_seq (list of lists): contains start and end indices of anomalous ranges
        e_s (list): smoothed errors between y_test and y_hat values
        non_anom_max (float): highest smoothed error value below epsilon
        i_anom (list): indices of errors that are part of an anomlous sequnce

    Returns:
        i_pruned (list): remaining indices of errors that are part of an anomlous sequnces 
            after pruning procedure  
    '''

    E_seq_max, e_s_max = [], []
    for e_seq in E_seq:
        if len(e_s[e_seq[0]:e_seq[1]]) > 0:
            E_seq_max.append(max(e_s[e_seq[0]:e_seq[1]]))
            e_s_max.append(max(e_s[e_seq[0]:e_seq[1]]))
    e_s_max.sort(reverse=True)

    if non_anom_max and non_anom_max > 0:
        e_s_max.append(non_anom_max) # for comparing the last actual anomaly to next highest below epsilon

    i_to_remove = []

    # minimum percent decrease between max errors in anomalous sequences (used for pruning)
    p = 0.13

    for i in range(0,len(e_s_max)):
        if i+1 < len(e_s_max): 
            if (e_s_max[i] - e_s_max[i+1]) / e_s_max[i] < p:
                i_to_remove.append(E_seq_max.index(e_s_max[i]))
                # p += 0.03 # increase minimum separation by this amount for each step further from max error
            else:
                i_to_remove = []
    for idx in sorted(i_to_remove, reverse=True):    
        del E_seq[idx]



    i_pruned = []
    for i in i_anom:
        keep_anomaly_idx = False

        for e_seq in E_seq:
            if i >= e_seq[0] and i <= e_seq[1]:
                keep_anomaly_idx = True

        if keep_anomaly_idx == True:
            i_pruned.append(i)

    return i_pruned
# Identify anomalies for each new batch of values
import more_itertools as mit

for i in range(1, num_windows+2):
    prior_idx = (i-1) * (batch_size)
    idx = (window_size * batch_size) + ((i-1) * batch_size)

    if i == num_windows+1:
        idx = test_y.shape[0]

    window_e_s = e_s[prior_idx:idx]
    window_y_test = test_y[prior_idx:idx]

    error_buffer = 100

    epsilon = find_epsilon(window_e_s, error_buffer)
    window_anom_indices = get_anomalies(window_e_s, window_y_test, epsilon, i-1, i_anom, len(test_y))

    # update indices to reflect true indices in full set of values (not just window)
    i_anom.extend([i_a + (i-1)*batch_size for i_a in window_anom_indices])
# group anomalous indices into continuous sequences
i_anom = sorted(list(set(i_anom)))
groups = [list(group) for group in mit.consecutive_groups(i_anom)]
E_seq = [(g[0], g[-1]) for g in groups if not g[0] == g[-1]]
E_seq
[(4270, 4479)]
# calc anomaly scores based on max distance from epsilon for each sequence
anom_scores = []
for e_seq in E_seq:
    score = max([abs(e_s[x] - epsilon) / (np.mean(e_s) + np.std(e_s)) for x in range(e_seq[0], e_seq[1])])
    anom_scores.append(score)
anom_scores
[11.603910939266248]

Evaluate

def evaluate_sequences(E_seq, anom):
    '''Compare identified anomalous sequences with labeled anomalous sequences

    Args:
        E_seq (list of lists): contains start and end indices of anomalous ranges
        anom (dict): contains anomaly information for a given input stream

    Returns:
        anom (dict): with updated anomaly information (whether identified, scores, etc.)
    '''

    anom["false_positives"] = 0
    anom["false_negatives"] = 0
    anom["true_positives"] = 0
    anom["fp_sequences"] = []
    anom["tp_sequences"] = []
    anom["num_anoms"] = len(anom["anomaly_sequences"])   

    E_seq_test = eval(anom["anomaly_sequences"])

    if len(E_seq) > 0:

        matched_E_seq_test = []

        for e_seq in E_seq:

            valid = False

            for i, a in enumerate(E_seq_test):

                if (e_seq[0] >= a[0] and e_seq[0] <= a[1]) or (e_seq[1] >= a[0] and e_seq[1] <= a[1]) or\
                    (e_seq[0] <= a[0] and e_seq[1] >= a[1]) or (a[0] <= e_seq[0] and a[1] >= e_seq[1]):

                    anom["tp_sequences"].append(e_seq)

                    valid = True

                    if i not in matched_E_seq_test:
                        anom["true_positives"] += 1
                        matched_E_seq_test.append(i)

            if valid == False:
                anom["false_positives"] += 1
                anom["fp_sequences"].append([e_seq[0], e_seq[1]])

        anom["false_negatives"] += (len(E_seq_test) - len(matched_E_seq_test))

    else:
        anom["false_negatives"] += len(E_seq_test)

    return anom
anom = dict()
anom["normalized_error"] = np.mean(e) / np.ptp(test_y)
anom['scores'] = anom_scores
df[df['chan_id']=='P-1'].anomaly_sequences
0    [[1899, 2099], [4286, 4594], [3289, 3529]]
Name: anomaly_sequences, dtype: object
anom['anomaly_sequences'] = '[[1899, 2099], [4286, 4594], [3289, 3529]]'
anom
{'normalized_error': 0.09347478102540632,
 'scores': [11.603910939266248],
 'anomaly_sequences': '[[1899, 2099], [4286, 4594], [3289, 3529]]'}
evaluate_sequences(E_seq,anom)
{'normalized_error': 0.09347478102540632,
 'scores': [11.603910939266248],
 'anomaly_sequences': '[[1899, 2099], [4286, 4594], [3289, 3529]]',
 'false_positives': 0,
 'false_negatives': 2,
 'true_positives': 1,
 'fp_sequences': [],
 'tp_sequences': [(4270, 4479)],
 'num_anoms': 42}

Ref