-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathwann_train.py
298 lines (236 loc) · 10.1 KB
/
wann_train.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
import os
import sys
import time
import math
import argparse
import subprocess
import numpy as np
np.set_printoptions(precision=2, linewidth=160)
# MPI
from mpi4py import MPI
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
from wann_src import * # WANN evolution
from domain import * # Task environments
# -- Run NeuroEvolution (NE) -------------------------------------------- -- #
def master():
    """Run the main WANN optimization loop on the master (rank 0) process.

    Each generation: ask the Wann object for a new population, evaluate it on
    the worker processes via batchMpiEval, tell Wann the resulting fitness and
    log the generation's statistics. After the last generation the final data
    and population are saved. Workers are always told to shut down at the end,
    even if an exception interrupts the run.

    Raises:
      ValueError - if hyp['maxGen'] < 1 (there would be no generation to save)
    """
    global fileName, hyp
    try:
        if hyp['maxGen'] < 1:
            raise ValueError("hyp['maxGen'] must be at least 1, got %r"
                             % (hyp['maxGen'],))
        data = DataGatherer(fileName, hyp)  # Initializes data object for logging
        wann = Wann(hyp)

        # Main EA optimization loop
        for gen in range(hyp['maxGen']):
            pop = wann.ask()             # Get newly evolved population of individuals
            reward = batchMpiEval(pop)   # Send the population to evaluate and return fitness
            wann.tell(reward)            # Update WANN object with the fitness values
            data = gatherData(data, wann, gen, hyp)  # Save data of current generation
            print(gen, '\t - \t', data.display())

        # Clean up and data gathering at end of run
        data = gatherData(data, wann, gen, hyp, savePop=True)
        data.save()
        data.savePop(wann.pop, fileName)
    finally:
        # Workers sit in a blocking comm.recv loop (see slave()); without this
        # an exception above would leave every worker process hanging forever.
        stopAllWorkers()
def gatherData(data, wann, gen, hyp, savePop=False):
    """Record one generation of run statistics and optionally pickle the population.

    Args:
      data    - (DataGatherer) - run data collected so far
      wann    - (Wann)         - evolution container; its `.pop` (list of
                                 individuals) and `.species` are read
      gen     - (int)          - index of the current generation
      hyp     - (dict)         - hyperparameters; every hyp['save_mod']-th
                                 generation the best individual is re-checked
                                 and a checkpoint is written
      savePop - (bool)         - only when this is exactly True, also pickle
                                 `wann.pop` to 'log/<fileName>_pop.obj'

    Return:
      data    - (DataGatherer) - the updated run data
    """
    data.gatherData(wann.pop, wann.species)

    isCheckpoint = gen % hyp['save_mod'] == 0
    if isCheckpoint:
        # Re-test a suspected new best individual before writing to disk
        data = checkBest(data)
        data.save(gen)

    if savePop is not True:
        return data

    # Dump a sample population to play with in notebooks
    popPath = 'log/' + fileName + '_pop.obj'
    import pickle
    with open(popPath, 'wb') as popFile:
        pickle.dump(wann.pop, popFile)
    return data
def checkBest(data):
    """Checks whether a better performing individual holds up over many trials.

    When `data.newBest` is exactly True, the latest best individual is evaluated
    max(hyp['bestReps'], nWorker - 1) times, each copy with its own randomly
    drawn seed. If the mean fitness beats the previous best, that mean is
    recorded as the individual's fitness. Otherwise the improvement is treated
    as luck: the last hyp['save_mod'] entries of data.best and data.fit_top are
    overwritten with the entry hyp['save_mod'] positions back, and
    data.newBest is cleared.

    Args:
      data - (DataGatherer) - collected run data
    Return:
      data - (DataGatherer) - collected run data with best individual updated

    * This is a bit hacky, but is only for data gathering, and not optimization
    """
    global hyp, nWorker
    if data.newBest is not True:
        return data

    bestReps = max(hyp['bestReps'], nWorker - 1)
    rep = np.tile(data.best[-1], bestReps)
    fitVector = batchMpiEval(rep, sameSeedForEachIndividual=False)
    trueFit = np.mean(fitVector)

    # With no earlier best to compare against, the averaged fitness stands.
    if len(data.best) < 2 or trueFit > data.best[-2].fitness:  # Actually better!
        data.best[-1].fitness = trueFit
        data.fit_top[-1] = trueFit
        data.bestFitVec = fitVector
    else:  # Just lucky!
        prev = hyp['save_mod']
        data.best[-prev:] = data.best[-prev]
        data.fit_top[-prev:] = data.fit_top[-prev]
        data.newBest = False
    return data
# -- Parallelization ----------------------------------------------------- -- #
def batchMpiEval(pop, sameSeedForEachIndividual=True):
    """Sends population to workers for evaluation one batch at a time.

    Individuals are dealt out in batches of (nWorker - 1). Individual j of a
    batch goes to worker rank j + 1, because rank 0 is this master. Workers left
    idle in a short final batch are sent a length of 0 on tag 1, which slave()
    treats as "nothing to do". Results are then collected from the busy workers.

    Args:
      pop - [Ind] - list of individuals
        .wMat - (np_array) - weight matrix of network
                [N X N]
        .aVec - (np_array) - activation function of each node
                [N X 1]
    Optional:
      sameSeedForEachIndividual - (bool) - use same seed for each individual?
    Return:
      reward - (np_array) - fitness values of each individual
               [len(pop) X hyp['alg_nVals']]
    Raises:
      RuntimeError - if there is no worker process (nWorker < 2)

    Todo:
      * Asynchronous evaluation instead of batches
    """
    global nWorker, hyp
    nSlave = nWorker - 1  # rank 0 is the master; ranks 1..nSlave evaluate
    if nSlave < 1:
        raise RuntimeError('batchMpiEval needs at least one worker process '
                           '(run with --num_worker >= 1)')
    nJobs = len(pop)
    nVals = hyp['alg_nVals']

    # Either one seed shared by the whole population, or one seed per individual
    if sameSeedForEachIndividual is False:
        seed = np.random.randint(1000, size=nJobs)
    else:
        seed = np.random.randint(1000)

    reward = np.empty((nJobs, nVals), dtype=np.float64)
    for start in range(0, nJobs, nSlave):
        stop = min(start + nSlave, nJobs)

        # Send one batch of individuals (one to each worker if there is one)
        for iWork in range(nSlave):
            dest = iWork + 1
            i = start + iWork
            if i >= nJobs:
                # Idle this batch: a length of 0 is a no-op for slave(). It must
                # go out on tag=1, the tag slave() receives lengths on; a tag-0
                # message would never be matched and would pile up unreceived.
                comm.send(0, dest=dest, tag=1)
                continue
            # slave() receives into float64 ('d') buffers, so send float64 data;
            # an int array (aVec may hold integer activation ids) would otherwise
            # be reinterpreted bit-for-bit on the receiving side.
            wVec = np.asarray(pop[i].wMat, dtype=np.float64).flatten()
            aVec = np.asarray(pop[i].aVec, dtype=np.float64).flatten()
            comm.send(wVec.shape[0], dest=dest, tag=1)
            comm.Send(wVec, dest=dest, tag=2)
            comm.send(aVec.shape[0], dest=dest, tag=3)
            comm.Send(aVec, dest=dest, tag=4)
            if sameSeedForEachIndividual is False:
                comm.send(seed.item(i), dest=dest, tag=5)
            else:
                comm.send(seed, dest=dest, tag=5)

        # Get fitness values back for that batch (only from busy workers)
        for i in range(start, stop):
            workResult = np.empty(nVals, dtype='d')
            comm.Recv(workResult, source=i - start + 1)
            reward[i, :] = workResult
    return reward
def slave():
    """Evaluation loop run by every worker process (rank > 0).

    Blocks waiting for the master, evaluates each network it is sent and
    returns the fitness, until a negative length arrives.

    Messages received from the master (rank 0), by tag:
      1: n_wVec - (int)      - length of the weight vector; > 0 means a network
                               follows, 0 means nothing to do this round,
                               < 0 means shut down
      2: wVec   - (np_array) - weight matrix as a flattened vector [1 X N**2]
      3: n_aVec - (int)      - length of the activation vector (N)
      4: aVec   - (np_array) - activation function of each node [1 X N]
                               (see applyAct in ann.py)
      5: seed   - (int)      - random seed (for consistency across workers)

    Sent back to the master:
      result - fitness from task.getDistFitness; the master receives it into
               a float64 buffer of hyp['alg_nVals'] values
    """
    global hyp
    task = Task(games[hyp['task']], nReps=hyp['alg_nReps'])  # Initialize task

    while True:
        n_wVec = comm.recv(source=0, tag=1)  # length of the incoming weights

        if n_wVec < 0:  # End signal received
            print('Worker # ', rank, ' shutting down.')
            break
        if n_wVec == 0:  # No individual assigned this round
            continue

        weights = np.empty(n_wVec, dtype='d')
        comm.Recv(weights, source=0, tag=2)
        n_aVec = comm.recv(source=0, tag=3)
        activations = np.empty(n_aVec, dtype='d')
        comm.Recv(activations, source=0, tag=4)
        seed = comm.recv(source=0, tag=5)

        fitness = task.getDistFitness(weights, activations, hyp,
                                      nVals=hyp['alg_nVals'], seed=seed)
        comm.Send(fitness, dest=0)
def stopAllWorkers():
    """Tell every worker process (ranks 1 .. nWorker-1) to leave its loop.

    Sends -1 on tag 1; slave() breaks out of its receive loop on any
    negative length.
    """
    global nWorker
    workerRanks = range(1, nWorker)
    print('stopping workers')
    for dest in workerRanks:
        comm.send(-1, dest=dest, tag=1)
def mpi_fork(n):
    """Re-launches the current script with workers
    Returns "parent" for original parent, "child" for MPI children
    (from https://github.com/garymcintire/mpi_util/)

    When n > 1 and the IN_MPI environment variable is unset, the script is
    re-run under `mpiexec -np n` with IN_MPI=1 (so the children do not fork
    again) and single-threaded MKL/OpenMP. Whenever "child" is returned, the
    module globals nWorker and rank are set from MPI.COMM_WORLD, since
    batchMpiEval, checkBest, stopAllWorkers and main read them.
    """
    global nWorker, rank
    if n <= 1:
        nWorker = comm.Get_size()
        rank = comm.Get_rank()
        return "child"

    if os.getenv("IN_MPI") is None:
        env = os.environ.copy()
        env.update(
            MKL_NUM_THREADS="1",
            OMP_NUM_THREADS="1",
            IN_MPI="1"
        )
        # TL: 'mpiexec' instead of 'mpirun' for compatibility with Microsoft MPI
        cmd = ["mpiexec", "-np", str(n), sys.executable, "-u"] + sys.argv
        print(cmd)  # print exactly the command that is executed
        subprocess.check_call(cmd, env=env)
        return "parent"

    nWorker = comm.Get_size()
    rank = comm.Get_rank()
    return "child"
# -- Input Parsing ------------------------------------------------------- -- #
def main(argv):
    """Handles command line input, launches optimization or evaluation script
    depending on MPI rank.

    Args:
      argv - (argparse.Namespace) - parsed command line; .outPrefix, .default
             and .hyperparam are read

    Rank 0 runs master() (the optimization loop); every other rank runs
    slave() (network evaluation).
    """
    global fileName, hyp
    fileName = argv.outPrefix
    hyp_default = argv.default
    hyp_adjust = argv.hyperparam

    # Load the default file, then apply the task file via updateHyp
    # (presumably an overlay of its keys — see wann_src).
    hyp = loadHyp(pFileName=hyp_default)
    updateHyp(hyp, hyp_adjust)

    # Launch main thread and workers
    if rank == 0:
        master()  # Main optimization process
    else:
        slave()   # Evaluation process
if __name__ == "__main__":
    # Parse input and launch, e.g.:
    #   python wann_train.py -p p/sparse_mountain_car_conti.json -o smcc
    parser = argparse.ArgumentParser(description='Evolve NEAT networks')
    parser.add_argument('-d', '--default', type=str,
                        help='default hyperparameter file', default='p/default_wan.json')
    parser.add_argument('-p', '--hyperparam', type=str,
                        help='hyperparameter file', default='p/sparse_mountain_car.json')
    parser.add_argument('-o', '--outPrefix', type=str,
                        help='file name for result output', default='test')
    parser.add_argument('-n', '--num_worker', type=int,
                        help='number of cores to use', default=4)
    args = parser.parse_args()

    # Rank 0 only coordinates; batchMpiEval needs at least one evaluating worker
    if args.num_worker < 1:
        parser.error('--num_worker must be at least 1')

    # Use MPI if parallel: the original process re-launches this script under
    # mpiexec with num_worker + 1 ranks (one master plus the workers), then
    # exits immediately via os._exit without normal interpreter cleanup.
    if "parent" == mpi_fork(args.num_worker + 1):
        os._exit(0)
    main(args)