# Source code for anko.anomaly_detector

# -*- coding: utf-8 -*- 
import numpy as np
from . import stats_util
import copy
# TODO: add params and returns type in func entry

class AnomalyDetector:
    r"""Detect anomalous points in a series by fitting candidate ansatz models
    and thresholding the (optionally z-normalized) fitting residual.

    Attributes:
        apply_policies (dict): Policies for AnomalyDetector to follow.
            scaleless_t (bool, default True): If True, use
                ``numpy.arange(1, len(t)+1)`` as the time axis for the fitting.
            boxcox (bool, default False): If True, perform log-boxcox
                transformation before carrying out the normal test. This
                results in higher chances of selecting the normal-distribution
                method.
            z_normalization (bool, default True): If True, apply z-score
                normalization to the fitting residual. Strongly advised, so the
                threshold values in ``thres_params`` can be defined scalelessly.
            info_criterion (str, default 'AIC'): Information criterion for
                selecting fitting ansatzs; allowed fields are 'AIC' or 'BIC'.
            abs_residual (bool, default False): If True, return the absolute
                value of the residual.
            full_return (bool, default False): If True, return a named dict for
                the fitting parameters, else return them in plain array order.
                (NOTE: the in-code default is False.)
            min_sample_size (int, default 10): Minimum number of samples; fewer
                raise ValueError.
        thres_params (dict): Threshold values for selecting anomalous data
            (``p_normality``, ``normal_err``, ``normal_std_width``,
            ``normal_std_err``, ``linregress_std_err``, ``linregress_res``,
            ``step_func_err``, ``step_func_res``, ``exp_decay_err``,
            ``exp_decay_res``, ``skewness``, ``min_res``). ``*_err`` entries
            bound the tolerated fitting error (ConvergenceError otherwise);
            ``*_res`` entries bound the residual beyond which points are
            regarded as anomalous; ``min_res`` masks small residuals to zero
            before z-normalization.
        error_code (dict): Mapping from string codes to the human-readable
            messages attached to ``CheckResult.extra_info``.
        models (dict): Models that can be considered by AnomalyDetector:

            Gaussian distribution
                .. math:: f(x) = a \exp\left(-\frac{(x-x_0)^2}{2\sigma^2}\right)
            Linear regression
                .. math:: f(x) = intercept + slope \times x
            Step function (``stats_util.general_sgn``)
                .. math:: f(x) = a\ (x < x_0),\ \tfrac{a+b}{2}\ (x = x_0),\ b\ (x > x_0)
            Exponential decay
                .. math:: f(x) = a\exp(-\alpha x)

            ``half_gaussian`` is in development and unavailable for now.
        check_failed (bool): True until a check passes.
    """

    def __init__(self, t, series):
        """
        Args:
            t (array_like or None): Time axis. If None, a scaleless axis
                ``numpy.arange(1, len(series)+1)`` is used instead.
            series (array_like): Data values to be checked.

        Raises:
            ValueError: If the sample size is below
                ``apply_policies['min_sample_size']``, if ``t`` and ``series``
                differ in length, or if the information criterion is invalid.
        """
        self.apply_policies = {
            "scaleless_t": True,
            "boxcox": False,
            "z_normalization": True,
            "info_criterion": 'AIC',
            "abs_residual": False,
            "full_return": False,
            "min_sample_size": 10
        }
        if isinstance(t, list):
            t = np.array(t)
        if isinstance(series, list):
            series = np.array(series)
        if t is None:
            # Fix: t=None previously crashed later on len(t) and left
            # self._clone_t as None; fall back to the scaleless axis instead.
            self.apply_policies["scaleless_t"] = True
            t = np.arange(1, len(series) + 1)
        if self.apply_policies["scaleless_t"]:
            self.t = np.arange(1, len(series) + 1)
        else:
            self.t = t
        self.series = series
        # Pristine copies: self.series may be boxcox-transformed during checks,
        # and anomalous timestamps are always reported on the original axis.
        self._clone_t = copy.deepcopy(t)
        self._clone_series = copy.deepcopy(series)
        self.check_failed = True
        self.thres_params = {
            "p_normality": 5e-3,
            "normal_err": 75,
            "normal_std_width": 1.5,
            "normal_std_err": 1e+1,
            "linregress_std_err": 1e+1,
            "linregress_res": 2,
            "step_func_err": 1e+1,
            "step_func_res": 2.5,
            "exp_decay_err": 1e+1,
            "exp_decay_res": 2,
            "skewness": 20,
            "min_res": 10
        }
        self.error_code = {
            "0": "Check passed.",
            "-1": "ConvergenceError: Gaussian fitting may not converge, std_err > std_err_th.",
            "-2": "Warning: Normal distribution may have skewed, skewness > skewness_th.",
            "-3": "ConvergenceError: General sign function fitting may not converge, perr > perr_th.",
            "-4": "ConvergenceError: Exponential fitting may not converge, perr > perr_th.",
            "-5": "ConvergenceError: Linear ansatz fitting may not converge, perr > perr_th.",
            "-6": "Warning: Rawdata might be oscillating, data flips sign repeatedly over mean.",
            "-7": "Info: AnomalyDetector is using boxcox method.",
            "-8": "Info: AnomalyDetector is using z normalization.",
            "-9": "Info: There are more than %d discontinuous points detected."
        }
        self.models = {
            "gaussian": True,
            "half_gaussian": False,
            "linear_regression": True,
            "step_func": True,
            "exp_decay": True
        }
        if len(series) < self.apply_policies["min_sample_size"]:
            raise ValueError("number of samples {} are less than apply_policies['min_sample_size'] = {}".format(len(series), self.apply_policies["min_sample_size"]))
        if len(t) != len(series):
            raise ValueError("shape {} does not match with shape {}.".format(len(t), len(series)))
        if self.apply_policies["info_criterion"] not in ["AIC", "BIC"]:
            raise ValueError("Information criterion can only be 'AIC' or 'BIC'.")

    def _build_stats_data(self):
        """Select the best-fitting model and collect its parameters.

        Returns:
            dict: With keys ``model`` (str), ``popt`` and ``perr``
            (fit parameters and their errors).
        """
        statsdata, ref, IC_score = {}, {}, {}
        proceed = False
        try:
            normality = stats_util.normaltest(self.series)
        except ValueError:
            normality = [np.inf, np.inf]
        if normality[1] >= self.thres_params["p_normality"] and np.isfinite(normality[1]) and self.models["gaussian"]:
            if self.apply_policies["boxcox"]:
                self.series = stats_util.boxcox(self.series, lmbda=0)
            try:
                statsdata["model"] = 'gaussian'
                statsdata["popt"], statsdata["perr"] = stats_util.gaussian_fit(self.series)
            except Exception:  # narrowed from a bare except; fit may fail in many ways
                pass
            if "popt" in statsdata:
                err_score = np.sum(np.square(statsdata["perr"][1:]))
                if err_score > self.thres_params["normal_err"]:
                    # Gaussian fit did not converge well enough; try the others.
                    proceed = True
            else:
                # Fix: a failed gaussian fit used to return an incomplete
                # statsdata (no popt/perr) and crash check(); fall back to the
                # remaining models instead.
                proceed = True
        else:
            proceed = True
        if proceed:
            if self.apply_policies["boxcox"]:
                # Restore the untransformed series before fitting other models.
                self.series = copy.deepcopy(self._clone_series)
            for model_id, run_token in self.models.items():
                if model_id == 'gaussian' or model_id == 'half_gaussian':
                    continue
                if run_token is True:
                    ref[model_id] = {}
                    IC_score[model_id], ref[model_id]["popt"], ref[model_id]["perr"] = self._fitting_model(model_id)
            best_model = min(IC_score.items(), key=lambda x: x[1])
            if "linear_regression" in IC_score.keys():
                # Prefer the simpler linear ansatz when its score is close.
                if np.isclose(best_model[1], IC_score["linear_regression"], atol=10, rtol=1e-2):
                    best_model = "linear_regression"
                else:
                    best_model = best_model[0]
            else:
                best_model = best_model[0]
            statsdata["popt"], statsdata["perr"] = ref[best_model]["popt"], ref[best_model]["perr"]
            if best_model == 'step_func':
                # Direction of the step: popt = (a, b, x0), b > a means increase.
                if ref[best_model]["popt"][1] - ref[best_model]["popt"][0] > 0:
                    statsdata["model"] = "increase_step_func"
                else:
                    statsdata["model"] = "decrease_step_func"
            else:
                statsdata["model"] = best_model
        return statsdata

    def _fitting_model(self, model_id: str):
        """Fit one ansatz and score it with the configured information criterion.

        Args:
            model_id (str): 'linear_regression', 'step_func' or 'exp_decay'.

        Returns:
            tuple: (IC_score, popt, perr). An unconverged curve fit yields
            infinite popt/perr so its IC score disqualifies the model.
        """
        if model_id == 'linear_regression':
            r_sq, intercept, slope, p_value, std_err = stats_util.linear_regression(self.t, self.series)
            linregress_y_pred = np.polyval([slope, intercept], self.t)
            if self.apply_policies["info_criterion"] == 'AIC':
                IC_score = stats_util.AIC_score(self.series, linregress_y_pred, 2)
            elif self.apply_policies["info_criterion"] == 'BIC':
                IC_score = stats_util.BIC_score(self.series, linregress_y_pred, 2)
            popt, perr = np.array([intercept, slope]), std_err
        elif model_id == 'step_func':
            try:
                popt, perr = stats_util.general_sgn_fit(self.t, self.series)
                y_pred = stats_util.general_sgn(self.t, *popt.tolist())
            except RuntimeError:
                popt = perr = np.inf * np.ones(3)
                y_pred = np.inf * np.ones(len(self.series))
            if self.apply_policies["info_criterion"] == 'AIC':
                IC_score = stats_util.AIC_score(self.series, y_pred, len(popt))
            elif self.apply_policies["info_criterion"] == 'BIC':
                IC_score = stats_util.BIC_score(self.series, y_pred, len(popt))
        elif model_id == 'exp_decay':
            try:
                popt, perr = stats_util.exp_decay_fit(self.t, self.series)
                y_pred = stats_util.exp_decay(self.t, *popt.tolist())
            except RuntimeError:
                popt = perr = np.inf * np.ones(2)
                y_pred = np.inf * np.ones(len(self.series))
            if self.apply_policies["info_criterion"] == 'AIC':
                IC_score = stats_util.AIC_score(self.series, y_pred, len(popt))
            elif self.apply_policies["info_criterion"] == 'BIC':
                IC_score = stats_util.BIC_score(self.series, y_pred, len(popt))
        return IC_score, popt, perr
[docs] def check(self) -> object: """ Returns: CheckResult: check_result (CheckResult): """ statsdata = self._build_stats_data() model_id = statsdata["model"] anomalous_t, anomalous_data, res, msgs = [], [], [], [] if model_id == 'gaussian' or model_id == 'flat_histo': if statsdata["perr"][2] > self.thres_params["normal_std_err"]: msgs.append(self.error_code["-1"]) # Get anomalous data norm = np.std(self.series) mean_centered_series = self.series - np.mean(self.series) mean_centered_series[np.where(abs(mean_centered_series) < self.thres_params["min_res"])] = 0 z_normalized_series = mean_centered_series / norm anomalous_idx = abs(z_normalized_series) > self.thres_params["normal_std_width"] if np.count_nonzero(anomalous_idx) > 0: anomalous_data = self.series[anomalous_idx] anomalous_t = self._clone_t[anomalous_idx] res = z_normalized_series[anomalous_idx] histo_x, histo_y = stats_util.get_histogram(self.series) if abs(stats_util.skew(histo_y)) > self.thres_params["skewness"]: msgs.append(self.error_code["-2"]) elif model_id == "increase_step_func": err_score = np.sum(np.square(statsdata["perr"])) if err_score > self.thres_params["step_func_err"]: msgs.append(self.error_code["-3"]) res = stats_util.fitting_residual(self.t, self.series, stats_util.general_sgn, statsdata["popt"], mask_min=self.thres_params["min_res"], standardized=self.apply_policies["z_normalization"]) anomalous_t = self._clone_t[abs(res) > self.thres_params["step_func_res"]] anomalous_data = self.series[abs(res) > self.thres_params["step_func_res"]] res = res[abs(res) > self.thres_params["step_func_res"]] elif model_id == "decrease_step_func": err_score = np.sum(np.square(statsdata["perr"])) if err_score > self.thres_params["step_func_err"]: res = stats_util.fitting_residual(self.t, self.series, stats_util.general_sgn, statsdata["popt"], mask_min=self.thres_params["min_res"], standardized=self.apply_policies["z_normalization"]) anomalous_t = self._clone_t[abs(res) > self.thres_params["step_func_res"]] 
anomalous_data = self.series[abs(res) > self.thres_params["step_func_res"]] res = res[abs(res) > self.thres_params["step_func_res"]] msgs.append(self.error_code["-3"]) else: anomalous_idx = np.where(self.t > statsdata["popt"][2])[0] if len(anomalous_idx) != 0 and (statsdata["popt"][0]-statsdata["popt"][1]) > self.thres_params["min_res"]: anomalous_t = self._clone_t[anomalous_idx] anomalous_data = self.series[anomalous_idx] res = (statsdata["popt"][1]-statsdata["popt"][0]) * np.ones(len(anomalous_idx)) # ============================================================================= # elif model_id == 'three_stair': # err_score = np.sum(np.square(statsdata[key]["perr"])) # if err_score > self.thres_params["step_func_err"]: # msgs.append(self.dyError.getErrorText(16)) # t = np.arange(1, len(statsdata[key]["series"])+1) # res = stats_util.fitting_residual(t, statsdata[key]["series"], stats_util.three_stair_sgn, statsdata[key]["popt"]) # anomalous_data = statsdata[key]["series"][res > self.thres_params["step_func_res"]] # ============================================================================= elif model_id == 'exp_decay': err_score = np.sum(np.square(statsdata["perr"])) if err_score > self.thres_params["exp_decay_err"]: msgs.append(self.error_code["-4"]) res = stats_util.fitting_residual(self.t, self.series, stats_util.exp_decay, statsdata["popt"], mask_min=self.thres_params["min_res"], standardized=self.apply_policies["z_normalization"]) anomalous_t = self._clone_t[abs(res) > self.thres_params["exp_decay_res"]] anomalous_data = self.series[abs(res) > self.thres_params["exp_decay_res"]] res = res[abs(res) > self.thres_params["exp_decay_res"]] elif model_id == 'linear_regression': if statsdata["perr"] > self.thres_params["linregress_std_err"]: msgs.append(self.error_code["-5"]) func = lambda x, a, b: a + b*x res = stats_util.fitting_residual(self.t, self.series, func, statsdata["popt"], mask_min=self.thres_params["min_res"], 
standardized=self.apply_policies["z_normalization"]) anomalous_t = self._clone_t[abs(res) > self.thres_params["linregress_res"]] anomalous_data = self.series[abs(res) > self.thres_params["linregress_res"]] res = res[abs(res) > self.thres_params["linregress_res"]] # Extra info if stats_util.is_oscillating(self.series): msgs.append(self.error_code["-6"]) if self.apply_policies["boxcox"]: msgs.append(self.error_code["-7"]) if self.apply_policies["z_normalization"]: msgs.append(self.error_code["-8"]) discontinuity = len(stats_util.discontinuous_idx(self.series)) if discontinuity > 0: msgs.append(self.error_code["-9"] %discontinuity) if len(anomalous_data) == 0: self.check_failed = False msgs.append(self.error_code["0"]) if self.apply_policies["abs_residual"]: res = abs(res) if self.apply_policies["full_return"]: statsdata["popt"] = self._popt_dictionize(model_id, statsdata["popt"]) for key, value in statsdata.items(): if isinstance(value, np.ndarray): statsdata[key] = value.tolist() check_result = CheckResult( model=statsdata["model"], popt=statsdata["popt"], perr=statsdata["perr"], anomalous_data=list(zip(anomalous_t, anomalous_data)), residual=res, extra_info=msgs ) return check_result
def _popt_dictionize(self, model_id, popt): if model_id == 'gaussian': popt_type = np.dtype([('a',float), ('mean',float), ('std',float)]) elif model_id == 'linear_regression': popt_type = np.dtype([('intercept',float), ('slope',float)]) elif model_id == 'step_func': popt_type = np.dtype([('a',float), ('b',float), ('x0',float)]) elif model_id == 'exp_decay': popt_type = np.dtype([('a',float), ('alpha',float)]) struc_popt = popt.view(dtype=popt_type) return [dict(zip(struc_popt.dtype.names,i)) for i in struc_popt][0]
class CheckResult(dict):
    """Dictionary whose entries are also reachable as attributes.

    ``result.model`` is equivalent to ``result['model']``; missing keys raise
    AttributeError so the object plays well with hasattr/getattr. The repr
    lists entries one per line, right-aligned on the key names.
    """

    # Attribute assignment/deletion go straight to the underlying dict.
    __setattr__ = dict.__setitem__
    __delattr__ = dict.__delitem__

    def __getattr__(self, name):
        if name in self:
            return self[name]
        raise AttributeError(name)

    def __repr__(self):
        if not self.keys():
            return self.__class__.__name__ + "()"
        width = max(len(key) for key in self.keys()) + 1
        return '\n'.join(
            key.rjust(width) + ': ' + repr(value)
            for key, value in sorted(self.items())
        )

    def __dir__(self):
        # Expose the keys for tab completion.
        return list(self.keys())
class AnomalousData(CheckResult):
    """Attribute-accessible dict reserved for anomalous-data payloads."""

    def __init__(self):
        super().__init__()