PI.py
# Copyright (c) 2018, Raul Astudillo Marban
import numpy as np
from GPyOpt.acquisitions.base import AcquisitionBase
from GPyOpt.core.task.cost import constant_cost_withGradients
from scipy.special import erfc

class PI(AcquisitionBase):
"""
Expected improvement acquisition function
:param model: GPyOpt class of model
:param space: GPyOpt class of domain
:param optimizer: optimizer of the acquisition. Should be a GPyOpt optimizer
:param cost_withGradients: function
:param jitter: positive value to make the acquisition more explorative.
"""
    analytical_gradient_prediction = True

    def __init__(self, model, space, optimizer=None, cost_withGradients=None, utility=None):
        self.optimizer = optimizer
        self.utility = utility
        super(PI, self).__init__(model, space, optimizer, cost_withGradients=cost_withGradients)
        if cost_withGradients is None:
            self.cost_withGradients = constant_cost_withGradients
        else:
            print('PI acquisition does not make sense with cost. Cost set to constant.')
            self.cost_withGradients = constant_cost_withGradients
        self.use_full_support = self.utility.parameter_dist.use_full_support
        self.n_hyps_samples = min(10, self.model.number_of_hyps_samples())
        self.jitter = 1e-6  # Positive value that makes the acquisition more explorative.

    def _compute_acq(self, X):
        """
        Computes the Probability of Improvement at X, averaged over the
        utility-parameter distribution: a weighted sum over the full support
        when available, otherwise a Monte Carlo average over sampled parameters.
        """
        if self.use_full_support:
            self.utility_params_samples = self.utility.parameter_dist.support
            self.utility_param_dist = np.atleast_1d(self.utility.parameter_dist.prob_dist)
        else:
            self.utility_params_samples = self.utility.parameter_dist.sample(10)
        X = np.atleast_2d(X)
        marginal_acqX = self._marginal_acq(X, self.utility_params_samples)
        if self.use_full_support:
            # Weighted average over the full support of the parameter distribution.
            acqX = np.matmul(marginal_acqX, self.utility_param_dist)
        else:
            # Monte Carlo average over the sampled utility parameters.
            acqX = np.sum(marginal_acqX, axis=1) / len(self.utility_params_samples)
        acqX = np.reshape(acqX, (X.shape[0], 1))
        return acqX

    def _compute_acq_withGradients(self, X):
        """
        Computes the Probability of Improvement at X and its gradient.
        """
        if self.use_full_support:
            self.utility_params_samples = self.utility.parameter_dist.support
            self.utility_param_dist = np.atleast_1d(self.utility.parameter_dist.prob_dist)
        else:
            self.utility_params_samples = self.utility.parameter_dist.sample(3)
        X = np.atleast_2d(X)
        marginal_acqX, marginal_dacq_dX = self._marginal_acq_with_gradient(X, self.utility_params_samples)
        if self.use_full_support:
            acqX = np.matmul(marginal_acqX, self.utility_param_dist)
            dacq_dX = np.tensordot(marginal_dacq_dX, self.utility_param_dist, 1)
        else:
            acqX = np.sum(marginal_acqX, axis=1) / len(self.utility_params_samples)
            dacq_dX = np.sum(marginal_dacq_dX, axis=2) / len(self.utility_params_samples)
        acqX = np.reshape(acqX, (X.shape[0], 1))
        dacq_dX = np.reshape(dacq_dX, X.shape)
        return acqX, dacq_dX

    def _marginal_acq(self, X, utility_params_samples):
        """
        Probability of Improvement at X for each utility-parameter sample,
        averaged over the GP hyperparameter samples.
        """
        L = len(utility_params_samples)
        marginal_acqX = np.zeros((X.shape[0], L))
        n_h = self.n_hyps_samples  # Number of GP hyperparameter samples.
        for h in range(n_h):
            self.model.set_hyperparameters(h)
            meanX, varX = self.model.predict(X)
            marginal_best_so_far = self._marginal_best_so_far(utility_params_samples)
            for l in range(L):
                current_best = marginal_best_so_far[l]
                for i in range(X.shape[0]):
                    # Mean and standard deviation of the utility-weighted posterior.
                    mu = np.dot(utility_params_samples[l], meanX[:, i])
                    sigma = np.sqrt(np.dot(np.square(utility_params_samples[l]), varX[:, i]))
                    Phi = self._get_quantiles(current_best, mu, sigma)[1]
                    marginal_acqX[i, l] += Phi
        marginal_acqX = marginal_acqX / n_h
        return marginal_acqX

    def _marginal_acq_with_gradient(self, X, utility_params_samples):
        """
        Probability of Improvement at X and its gradient for each
        utility-parameter sample, averaged over the GP hyperparameter samples.
        """
        L = len(utility_params_samples)
        marginal_acqX = np.zeros((X.shape[0], L))
        marginal_dacq_dX = np.zeros((X.shape[0], X.shape[1], L))
        n_h = self.n_hyps_samples  # Number of GP hyperparameter samples.
        for h in range(n_h):
            self.model.set_hyperparameters(h)
            meanX, varX = self.model.predict(X)
            dmean_dX = self.model.posterior_mean_gradient(X)
            dvar_dX = self.model.posterior_variance_gradient(X)
            marginal_best_so_far = self._marginal_best_so_far(utility_params_samples)
            for l in range(L):
                current_best = marginal_best_so_far[l]
                for i in range(X.shape[0]):
                    mu = np.dot(utility_params_samples[l], meanX[:, i])
                    sigma = np.sqrt(np.dot(np.square(utility_params_samples[l]), varX[:, i]))
                    phi, Phi, u = self._get_quantiles(current_best, mu, sigma)
                    marginal_acqX[i, l] += Phi
                    # Chain rule: dPI/dX = phi(u) * (dmu/dX - u * dsigma/dX) / sigma.
                    dmu_dX = np.squeeze(utility_params_samples[l] * dmean_dX[:, i, :])
                    dsigma_dX = np.squeeze((0.5 * np.square(utility_params_samples[l]) * dvar_dX[:, i, :]) / sigma)
                    marginal_dacq_dX[i, :, l] += (phi / sigma) * (dmu_dX - u * dsigma_dX)
        marginal_acqX = marginal_acqX / n_h
        marginal_dacq_dX = marginal_dacq_dX / n_h
        return marginal_acqX, marginal_dacq_dX
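
    # Sanity-check sketch (not in the original file): the analytical gradient
    # above can be compared against a forward finite difference. This only
    # makes sense when use_full_support is True, since otherwise the utility
    # parameters are resampled on every call and the acquisition is stochastic.
    # The `acquisition` instance and one-dimensional input are hypothetical.
    #
    #   eps = 1e-6
    #   x0 = np.atleast_2d(0.5)
    #   a0, grad = acquisition._compute_acq_withGradients(x0)
    #   a1 = acquisition._compute_acq(x0 + eps)
    #   assert np.allclose((a1 - a0) / eps, grad, atol=1e-3)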

    def _marginal_best_so_far(self, utility_params_samples):
        """
        Best utility-weighted posterior mean over the evaluated points,
        for each utility-parameter sample.
        """
        L = len(utility_params_samples)
        marginal_best = np.empty(L)
        muX_eval = self.model.posterior_mean_at_evaluated_points()
        for l in range(L):
            marginal_best[l] = np.amax(utility_params_samples[l] * muX_eval)
        return marginal_best

    def _get_utility_parameters_samples(self, n_samples=None):
        if n_samples is None:
            samples = self.utility.parameter_dist.support
        else:
            samples = self.utility.parameter_dist.sample(n_samples)
        return samples

    def _get_quantiles(self, fmax, m, s):
        """
        Quantiles of the Gaussian distribution used to compute the acquisition function values.

        :param fmax: current maximum of the utility-weighted posterior mean.
        :param m: vector of means.
        :param s: vector of standard deviations.
        """
        # Guard against numerically zero standard deviations.
        if isinstance(s, np.ndarray):
            s[s < 1e-10] = 1e-10
        elif s < 1e-10:
            s = 1e-10
        u = (m - (fmax + self.jitter)) / s
        phi = np.exp(-0.5 * u ** 2) / np.sqrt(2 * np.pi)  # Standard normal pdf at u.
        Phi = 0.5 * erfc(-u / np.sqrt(2))  # Standard normal cdf at u: the Probability of Improvement.
        return (phi, Phi, u)
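
# Minimal usage sketch (not part of the original file), assuming a model
# wrapper from this repository (exposing number_of_hyps_samples,
# set_hyperparameters, posterior_mean_gradient, etc.) and a utility object
# with a parameter_dist attribute. The construction of `model` and `utility`
# is omitted, and the names below are illustrative only.
#
#   from GPyOpt.core.task.space import Design_space
#   from GPyOpt.optimization.acquisition_optimizer import AcquisitionOptimizer
#
#   space = Design_space([{'name': 'x', 'type': 'continuous', 'domain': (0, 1)}])
#   acq_optimizer = AcquisitionOptimizer(space)
#   acquisition = PI(model, space, optimizer=acq_optimizer, utility=utility)
#   acq_values = acquisition._compute_acq(np.array([[0.25], [0.75]]))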