-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathoutlier_detector.py
75 lines (49 loc) · 1.88 KB
/
outlier_detector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
import numpy as np
from bico import BICO
class OutlierDetector:
    """Threshold-based outlier detector layered on top of a clustering model.

    A sample is labelled 1 (outlier) when the value ``transform`` reports for
    the cluster that ``predict`` assigned it to is at least ``thresh``;
    otherwise it is labelled 0.

    The wrapped ``clustering_model`` must provide ``fit(X)``,
    ``partial_fit(X)``, ``predict(X)`` (one cluster index per row) and
    ``transform(X)`` (a 2-D array indexed as ``[row, cluster]``; its entries
    are treated as distances here).

    Parameters
    ----------
    clustering_model : object
        The clustering model described above.
    thresh : float, default 50
        Values greater than or equal to this mark a sample as an outlier.
    n_update : int, default 100
        Number of incremental batches after which ``partial_fit`` switches
        from ``clustering_model.partial_fit`` to ``clustering_model.fit``.
    remove_outliers : bool, default False
        When true, rows flagged as outliers by the most recent ``predict``
        call are dropped before the model is updated.
    pretrain_size : int, default 100
        Batches with at least this many rows are treated as the pretraining
        set and are fitted from scratch without counting as a batch.
    """

    def __init__(self, clustering_model, thresh=50, n_update=100,
                 remove_outliers=False, pretrain_size=100):
        self.clustering_model = clustering_model
        self.thresh = thresh
        self.n_update = n_update
        self.remove_outliers = remove_outliers
        self.pretrain_size = pretrain_size
        # Number of incremental (non-pretraining) batches seen so far.
        self.batch_counter = 0
        # Labels from the latest predict(); only kept when remove_outliers.
        self.last_prediction = None

    def _training_rows(self, X):
        """Return the rows of ``X`` that should be used to update the model."""
        if not self.remove_outliers or self.last_prediction is None:
            # Nothing to filter with yet: train on the whole batch instead of
            # indexing with a meaningless ``None == 0`` mask.
            return X
        keep = np.asarray(self.last_prediction) == 0
        if keep.ndim != 1 or keep.shape[0] != len(X):
            # The stored labels belong to a different batch; a stale mask
            # cannot be applied, so fall back to the full batch.
            return X
        return np.asarray(X)[keep]

    def partial_fit(self, X, y=None, classes=None):
        """Update the clustering model with one batch and return ``self``.

        ``y`` and ``classes`` are accepted for interface compatibility and
        are ignored.
        """
        # Heuristic detection of the pretraining phase: a large batch is
        # fitted from scratch, unfiltered, and does not advance the counter.
        if len(X) >= self.pretrain_size:
            self.clustering_model.fit(X)
            return self

        self.batch_counter += 1
        batch = self._training_rows(X)
        if self.batch_counter < self.n_update:
            self.clustering_model.partial_fit(batch)
        else:
            # NOTE(review): batch_counter is never reset, so every batch from
            # the n_update-th onwards triggers a full fit on that batch alone
            # - confirm this is intended.
            self.clustering_model.fit(batch)
        return self

    def predict(self, X):
        """Return an integer array with 1 for outliers and 0 for inliers."""
        assigned = self.clustering_model.predict(X)
        dist = self.clustering_model.transform(X)
        # Pick, for every row, the value belonging to its assigned cluster.
        own_dist = dist[np.arange(len(dist)), assigned]
        outlier_pred = (own_dist >= self.thresh).astype(int)
        if self.remove_outliers:
            # Keep a private copy so callers mutating the returned array do
            # not alter the mask used by the next partial_fit().
            self.last_prediction = outlier_pred.copy()
        return outlier_pred
class ConstantClassifier:
    """Baseline detector that never learns and labels every sample 0."""

    def __init__(self):
        pass

    def partial_fit(self, X, y=None, classes=None):
        """Ignore the batch entirely and return ``self``."""
        return self

    def predict(self, X):
        """Return a float array of zeros with one entry per row of ``X``."""
        # Only the leading (sample) axis matters, so any array-like with at
        # least one dimension is accepted, not just 2-D ndarrays.
        n_samples = np.shape(X)[0]
        return np.zeros(n_samples)