This repository has been archived by the owner on Oct 26, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathevolution.py
413 lines (311 loc) · 11.8 KB
/
evolution.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
import numpy as np
import warnings
from rlagents.functions.decay import DecayBase, FixedDecay
class EvolutionaryBase(object):
    """Abstract interface for evolutionary strategies.

    Subclasses must implement ``next_generation`` (build the next batch of
    models from the current batch and its scores) and ``export`` (return a
    serialisable description of the strategy's configuration).
    """

    def next_generation(self, batch, results):
        """Produce the next batch of models from ``batch`` and ``results``."""
        raise NotImplementedError

    def export(self):
        """Return a dict describing this strategy's settings."""
        raise NotImplementedError
class DefaultEvolution(EvolutionaryBase):
    """No-op strategy: hands every batch back unchanged."""

    def next_generation(self, batch, results):
        """Return ``batch`` untouched; ``results`` is ignored."""
        return batch

    def export(self):
        """Return a dict identifying this strategy."""
        return {"Type": "Default"}
class CrossEntropy(EvolutionaryBase):
    """
    Cross Entropy evolution selects the top x% of samples in a batch
    and generates a new batch from the mean/std of those samples
    """
    def __init__(self, elite=0.2):
        """
        :param elite: float
            Percentage of samples selected to generate next batch.
            Must be between 0 and 1
        """
        self.elite = elite

    @property
    def elite(self):
        return self._elite

    @elite.setter
    def elite(self, e):
        # Validate on assignment so a bad fraction fails fast.
        if e < 0 or e > 1:
            raise ValueError("Elite must be between 0 and 1 inclusive")
        self._elite = e

    def export(self):
        """Return a serialisable description of this strategy."""
        return {"Type": "CrossEntropy",
                "Elite": self.elite}

    def next_generation(self, batch, results):
        """
        Selects the top x% samples from the batch and generates
        a new batch from the mean/std of the those samples

        :param batch: list of models (must support export_values/import_values)
        :param results: list of rewards for each model
        :return: list of models
        """
        # BUG FIX: keep at least one elite sample. int() truncation could
        # give elite_n == 0 for small batches, and argsort(results)[-0:]
        # is the same as [0:] — the WHOLE batch would be treated as elite,
        # silently removing all selection pressure.
        elite_n = max(1, int(len(batch) * self.elite))

        batch_vals = [b.export_values() for b in batch]

        # Select the elite_n best scoring samples (argsort is ascending,
        # so the highest rewards sit at the end).
        best = np.array([batch_vals[b] for b in np.argsort(results)[-elite_n:]])

        # Fit a per-parameter normal distribution to the elite samples and
        # resample every model in the batch from it.
        mean = best.mean(axis=0)
        std = best.std(axis=0)

        for i in range(len(batch)):
            batch[i].import_values(np.random.normal(mean, std))

        return batch
class GeneticAlgorithm(EvolutionaryBase):
    """
    Genetic Algorithm works by selecting two parent samples, with better performing samples
    having increased likelihood of being selected, and then swapping model values, along with a
    (normally) small chance of that value being mutated.
    """
    def __init__(self, crossover=0.3, mutation_rate=0.1, mutation_amount=0.02, scaling=1):
        """
        :param crossover: float
            Likelihood of switching parents to get the 'genes' from
            Range (0, 1)
        :param mutation_rate: float
            Likelihood of the current gene being mutated
            Range (0, 1)
        :param mutation_amount: float
            Percent to generate the standard deviation of the mutation from
            Range (0, 3)
        :param scaling: float
            How much results are scaled so high scores appear a lot better than low scores
            1 means no scaling applied
            Range (0, 5)
        """
        self.crossover = crossover
        self.mutation_rate = mutation_rate
        self.mutation_amount = mutation_amount
        self.scaling = scaling

    def export(self):
        """Return a serialisable description of this strategy."""
        return {"Type": "GeneticAlgorithm",
                "Cross Over": self.crossover,
                "Mutation Rate": self.mutation_rate,
                "Mutation Amount": self.mutation_amount,
                "Scaling": self.scaling}

    @property
    def crossover(self):
        return self._crossover

    @crossover.setter
    def crossover(self, c):
        if c < 0 or c > 1:
            raise ValueError("Crossover must be between 0 and 1 inclusive")
        self._crossover = c

    @property
    def mutation_rate(self):
        return self._mutation_rate

    @mutation_rate.setter
    def mutation_rate(self, mr):
        # Note: exclusive bounds — 0 and 1 themselves are rejected.
        if mr <= 0 or mr >= 1:
            raise ValueError("Mutation Rate must be between 0 and 1")
        self._mutation_rate = mr

    @property
    def mutation_amount(self):
        return self._mutation_amount

    @mutation_amount.setter
    def mutation_amount(self, ma):
        if ma <= 0 or ma > 3:
            raise ValueError("Mutation Amount must be between 0 and 3")
        self._mutation_amount = ma

    @property
    def scaling(self):
        return self._scaling

    @scaling.setter
    def scaling(self, s):
        if s < 0 or s > 5:
            raise ValueError("Scaling must be between 0 and 5 inclusive")
        self._scaling = s

    def _scale_results(self, results):
        """
        Normalises results within range [0, 1] and applies scaling

        :param results: list of floats
        :return: list of floats
        """
        result_max = max(results)  # Stops numbers growing too large to handle
        result_min = min(results) - 0.001  # Stops result ranges that cover 0 causing issues with odd numbers scalings)
        return [(r - result_min) ** self.scaling / (result_max - result_min) ** self.scaling for r in results]

    def next_generation(self, batch, results):
        """
        Breeds a full replacement generation via roulette selection.

        :param batch: list of x models
        :param results: list of x floats
        :return: list of x models
        """
        results = self._scale_results(results)

        gen_size = len(batch)
        batch_vals = [b.export_values() for b in batch]

        next_gen = []
        for i in range(gen_size):
            father = self._roulette(results)
            mother = self._roulette(results)

            child = self._create_child(batch_vals[father], batch_vals[mother])

            next_gen.append(child)

        for i in range(len(batch)):
            batch[i].import_values(np.array(next_gen[i]))

        return batch

    def _create_child(self, father, mother):
        """
        Selected genes from both the father and mother with some mutation

        :param father: list of x floats
        :param mother: list of x floats
        :return: list of x floats
        """
        current_sel = 1  # 1 -> copy from father, -1 -> copy from mother
        child = []

        for j in range(len(father)):
            # Flip which parent we are copying from with probability `crossover`.
            if np.random.uniform() <= self.crossover:
                current_sel *= -1

            if current_sel == 1:
                child.append(father[j])
            else:
                child.append(mother[j])

            # Mutate the gene just copied with probability `mutation_rate`.
            if np.random.uniform() < self.mutation_rate:
                child[j] = np.random.normal(child[j], abs(child[j] * self.mutation_amount))

        return child

    @staticmethod
    def _roulette(results):
        """
        Select an index with probability proportional to its score

        :param results: list of non-negative floats
        :return: int
            Index of parent to choose
        """
        selection = np.random.uniform() * sum(results)

        cum_sum = 0
        for i, score in enumerate(results):
            cum_sum += score
            if cum_sum >= selection:
                return i

        # BUG FIX: floating-point accumulation can leave cum_sum fractionally
        # below `selection` after the final element, in which case the loop
        # fell through and returned None — the caller then crashed on
        # batch_vals[None]. Fall back to the last index instead.
        return len(results) - 1
class HillClimbing(EvolutionaryBase):
    """
    Hill Climbing keeps the single best sample seen so far and builds every
    new batch by perturbing that best sample.
    """

    def __init__(self, spread=None, accept_equal=False):
        """
        :param spread: functions.DecayBase
            Used to determine standard deviation of new best sample
        :param accept_equal: boolean
            Whether equal best will replace current best
        """
        self.accept_equal = accept_equal
        self.spread = spread

        self.best_weights = None
        self.best_score = None

    def export(self):
        """Return a serialisable description of this strategy."""
        return {"Type": "Hill Climbing",
                "Accept Equal": self.accept_equal,
                "Spread": self.spread.export()}

    @property
    def spread(self):
        return self._spread

    @spread.setter
    def spread(self, s):
        # None selects a sensible fixed default.
        decay = FixedDecay(0.05, 0, 0) if s is None else s
        if not isinstance(decay, DecayBase):
            raise TypeError("Spread not a valid DecayBase")
        self._spread = decay

    def is_new_best(self, best):
        """Return True when ``best`` should replace the stored best score."""
        if self.best_weights is None:
            return True
        if best > self.best_score:
            return True
        return best == self.best_score and self.accept_equal

    def next_generation(self, batch, results):
        """
        Finds the best sample, updates the stored best if it qualifies,
        then rebuilds the batch by sampling around the stored best.

        :param batch: list of Models
        :param results: list of floats
        :return: list of Models
        """
        top = np.argmax(results)
        if self.is_new_best(results[top]):
            self.best_weights = batch[top].export_values().copy()
            self.best_score = results[top]

        for model in batch:
            # Std dev scales with the weight itself, floored at 0.001 so a
            # zero weight can still move.
            child = [np.random.normal(w, max(abs(w * self.spread.value), 0.001))
                     for w in self.best_weights]
            model.import_values(np.array(child).copy())

        self.spread.update()

        return batch
class SimulatedAnnealing(EvolutionaryBase):
    """
    Simulated Annealing evolves a single sample by modifying one of its
    values and accepting the updated sample with a probability that shrinks
    as the temperature decays. While the temperature is high a worse sample
    may be accepted; that likelihood reduces over time.

    This is based on the process of hardening metal.
    """

    def __init__(self, temperature=None, shift=None):
        """
        :param temperature: functions.Decay
            Influences likelihood of new sample being accepted
        :param shift: functions.Decay
            Amount to alter the randomly selected value by
        """
        self.temperature = temperature
        self.shift = shift

        self.testing_vals = None
        self.testing_result = 0

    def export(self):
        """Return a serialisable description of this strategy."""
        return {"Type": "Simulated Annealing",
                "Temperature": self.temperature.export(),
                "Shift": self.shift.export()}

    @property
    def temperature(self):
        return self._temperature

    @temperature.setter
    def temperature(self, t):
        value = FixedDecay(10, decay=0.997, minimum=0.1) if t is None else t
        if not isinstance(value, DecayBase):
            raise TypeError("Temperature must be of type DecayBase")
        self._temperature = value

    @property
    def shift(self):
        return self._shift

    @shift.setter
    def shift(self, s):
        value = FixedDecay(1, decay=0.995, minimum=0.01) if s is None else s
        if not isinstance(value, DecayBase):
            raise TypeError("Shift must be of type DecayBase")
        self._shift = value

    @property
    def testing_vals(self):
        # Hand back a copy so callers cannot mutate the stored candidate.
        return None if self._testing_vals is None else self._testing_vals.copy()

    @testing_vals.setter
    def testing_vals(self, t):
        # Store a copy for the same reason.
        self._testing_vals = None if t is None else t.copy()

    def next_generation(self, batch, results):
        if len(batch) > 1:
            warnings.warn("Simulated Annealing should use batch size of 1")

        candidate_vals = batch[0].export_values()
        candidate_result = results[0]

        # First generation: adopt the incoming sample as the baseline.
        if self.testing_vals is None:
            self.testing_vals = candidate_vals
            self.testing_result = candidate_result

        # Improvements give a probability > 1 (always accepted); regressions
        # are accepted with a chance that falls as the temperature cools.
        acceptance = np.e ** ((candidate_result - self.testing_result) / self.temperature.value)
        if np.random.uniform() < acceptance:
            self.testing_vals = candidate_vals
            self.testing_result = candidate_result

        batch[0].import_values(self.__create_next_test(self.testing_vals))

        self.temperature.update()
        self.shift.update()

        return batch

    def __create_next_test(self, batch_vals):
        # Perturb one randomly chosen value, scaled by the current shift.
        choice = np.random.randint(len(batch_vals))
        batch_vals[choice] = np.random.normal(batch_vals[choice], abs(batch_vals[choice] * self.shift.value))
        return batch_vals