forked from CAS-LRJ/ExpPAC
genetic_testing.py
import os
import logging
import random
import yaml
from src.converter import Converter
import numpy as np
import pickle
from src.executer import Executer
import argparse
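# Converter and Executer are project-local modules. From their use below, Converter
# appears to map a normalized parameter vector back to concrete scenario/weather
# configs, and Executer appears to run the scenario (via scenario_runner against
# CARLA, given the srunner_xml and port arguments) and return a fitness value.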
def genetic_mate(parent1, parent2, mutation_p, bounds):
    offspring = np.zeros_like(parent1)
    for i in range(len(parent1)):
        prob = random.random()
        if prob <= mutation_p:
            ## Gene Mutation: resample this gene uniformly within its bounds
            offspring[i] = random.random() * (bounds[1][i] - bounds[0][i]) + bounds[0][i]
        else:
            ## Uniform crossover: copy the gene from either parent with equal probability
            prob = random.random()
            if prob <= 0.5:
                offspring[i] = parent1[i]
            else:
                offspring[i] = parent2[i]
    return offspring
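# Illustrative behaviour of genetic_mate (values are made up): with
# bounds = np.array([[0.0, 0.0], [1.0, 1.0]]), mutation_p = 0.2 and parents
# [0.2, 0.8] and [0.6, 0.4], each gene of the offspring is copied from either
# parent with equal chance, or resampled uniformly from its bound with
# probability 0.2.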
def genetic_testing(config_file):
    ## Logging Module Initialization
    logfile = os.path.basename(config_file).replace('.yaml', '_genetic.log')
    logfile = os.path.join('logs', logfile)
    # Remove all handlers associated with the root logger object.
    for handler in logging.root.handlers[:]:
        logging.root.removeHandler(handler)
    logging.basicConfig(filename=logfile, format='%(asctime)s %(levelname)s:%(message)s', datefmt='%m/%d/%Y %I:%M:%S %p', level=logging.DEBUG)
    logging.info('Task Start.... Loading Configs...')
    root_cwd = os.getcwd()
    with open(config_file, 'r') as f:
        settings_all = yaml.safe_load(f)
    ## Load Sub Configs
    settings_config = settings_all['config']
    ## Load Metainfo
    settings_meta = settings_all['metainfo']
    meta_splits = settings_meta['splits']
    scenario_name = settings_meta['scenario_name']
    scenario_id = settings_meta['scenario_id']
    divided_dimension = settings_meta['divided_dimension']
    with open(settings_config['scenario_config'], 'r') as f:
        scenario_config = yaml.safe_load(f)
    with open(settings_config['weather_config'], 'r') as f:
        weather_config = yaml.safe_load(f)
    example_json = os.path.join(root_cwd, settings_config['example_json'])
    ## Load Simulation Details
    simulation_config = settings_all['simulation']
    srunner_xml = os.path.join(root_cwd, simulation_config['srunner_xml'])
    carla_port = simulation_config['port']
    carla_traffic_port = simulation_config['traffic_port']
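    # Illustrative shape of the YAML config consumed above (the keys are taken from
    # this script; the values are placeholders, not from the original repo):
    #
    #   config:
    #     scenario_config: configs/scenario/pedestrian_crossing.yaml
    #     weather_config: configs/weather/default.yaml
    #     example_json: configs/example.json
    #   metainfo:
    #     splits: 5                      # binary split index used below (illustrative)
    #     scenario_name: PedestrianCrossing
    #     scenario_id: 1
    #     divided_dimension: [pedestrian_speed, cloudiness]
    #   simulation:
    #     srunner_xml: srunner/examples/PedestrianCrossing.xml
    #     port: 2000
    #     traffic_port: 8000
    #   data:
    #     running_result_rootdir: results/running
    #     analyse_result_rootdir: results/analyse
    #     save_rootdir: results/save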
    ## Get the parameter order and the converter
    parameter_order = []
    for item in scenario_config:
        if isinstance(scenario_config[item], list):
            parameter_order.append(item)
    for item in weather_config:
        if isinstance(weather_config[item], list):
            parameter_order.append(item)
    parameter_order = sorted(parameter_order)
    converter = Converter(scenario_config, weather_config, parameter_order)
    ## Calculate Current Parameter Bounds
    split_branches = bin(meta_splits)[3:]
    bounds = np.array([np.zeros(len(parameter_order)), np.ones(len(parameter_order))])
    for parameter_name, split_branch in zip(divided_dimension, split_branches):
        parameter_pos = parameter_order.index(parameter_name)
        if split_branch == '0':
            ## Left Branch
            bounds[1][parameter_pos] = (bounds[0][parameter_pos] + bounds[1][parameter_pos]) / 2
        else:
            ## Right Branch
            bounds[0][parameter_pos] = (bounds[0][parameter_pos] + bounds[1][parameter_pos]) / 2
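    # Example: with meta_splits = 5, bin(5) = '0b101' and split_branches = '01',
    # so the first entry of divided_dimension keeps its lower half (left branch,
    # bounds [0, 0.5]) and the second keeps its upper half (right branch,
    # bounds [0.5, 1]) in the normalized parameter space.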
    ## Get Root Dirs
    running_result_rootdir = settings_all['data']['running_result_rootdir']
    analyse_result_rootdir = settings_all['data']['analyse_result_rootdir']
    save_rootdir = settings_all['data']['save_rootdir']
    running_result_rootdir = os.path.join(root_cwd, running_result_rootdir)
    analyse_result_rootdir = os.path.join(root_cwd, analyse_result_rootdir)
    save_rootdir = os.path.join(root_cwd, save_rootdir)
    ## Main Procedure
    verification_analyse_dir = os.path.join(analyse_result_rootdir, scenario_name+'_'+str(scenario_id))
    verification_analyse_dir = os.path.join(verification_analyse_dir, str(meta_splits))
    current_analyse_dir = os.path.join(analyse_result_rootdir, scenario_name+'_'+str(scenario_id))
    current_analyse_dir = os.path.join(current_analyse_dir, str(meta_splits))
    current_analyse_dir = os.path.join(current_analyse_dir, 'Genetic')
    current_sim_dir = os.path.join(running_result_rootdir, scenario_name+'_'+str(scenario_id))
    current_sim_dir = os.path.join(current_sim_dir, str(meta_splits))
    current_sim_dir = os.path.join(current_sim_dir, 'Genetic')
    os.makedirs(verification_analyse_dir, exist_ok=True)
    os.makedirs(current_analyse_dir, exist_ok=True)
    os.makedirs(current_sim_dir, exist_ok=True)
    logging.info('Config Loaded...')
    ## Load Initial Population (init.pickle, init_pre.pickle)
    population = []
    fitness = []
    init_pickle_file = os.path.join(verification_analyse_dir, 'init.pickle')
    if os.path.exists(init_pickle_file):
        with open(init_pickle_file, 'rb') as f:
            init_data = pickle.load(f)
        population += init_data['data']
        fitness += init_data['value']
    init_pre_pickle_file = os.path.join(verification_analyse_dir, 'init_pre.pickle')
    if os.path.exists(init_pre_pickle_file):
        with open(init_pre_pickle_file, 'rb') as f:
            init_data_pre = pickle.load(f)
        population += init_data_pre['data']
        fitness += init_data_pre['value']
    if len(population) == 0:
        logging.warning('The initial population is empty. Please run the main framework first.')
        ## Future: Implement Initial Sampling if no init pickles found.
        return  # nothing to evolve from; avoid crashing on an empty population below
    population = np.array(population)
    fitness = np.array(fitness)
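    # Note (inferred from the code below): lower fitness is treated as better
    # throughout, since elites are taken from the front of the ascending sort
    # and the best value is reported as fitness.min().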
    ## Genetic Algorithm
    ## Mutation Prob: 0.2
    ## Population Size: 100
    ## Save 10% Elites..
    ## Use Top 50% to produce Offspring..
    ## Roulette Selection with Min-max Fitness Normalization
    ## 100 Iterations At Most...
    ## Future: Use argparse Arguments to set params..
    ## To-Do
    elite_percent = 0.1
    parent_percent = 0.5
    population_size = 100
    mutation_p = 0.2
    # genetic_samples_dir = os.path.join(current_sim_dir, 'genetic')
    genetic_json_dir = os.path.join(current_sim_dir, 'json')
    genetic_log_dir = os.path.join(current_sim_dir, 'log')
    for iter in range(100):
        current_population_savefile = os.path.join(current_analyse_dir, 'iter_%d.pickle' % iter)
        if os.path.exists(current_population_savefile):
            logging.info('Population of iteration %d found. Skipping..' % iter)
            with open(current_population_savefile, 'rb') as f:
                data_iter = pickle.load(f)
            population = np.array(data_iter['data'])
            fitness = np.array(data_iter['value'])
        else:
            logging.info('Genetic Algorithm Iteration %d Start...' % iter)
            genetic_json_savepath = os.path.join(genetic_json_dir, 'iter'+str(iter)+'_%d.json')
            executer = Executer(srunner_xml, example_json, genetic_json_savepath, genetic_log_dir, carla_port, carla_traffic_port)
            population_index = fitness.argsort()
            population = population[population_index]
            fitness = fitness[population_index]
            next_population = []
            next_fitness = []
            ## Elitism: Save {elite_percent}% Fittest Population
            elite_size = int(population_size * elite_percent)
            next_population.extend(population[:elite_size].tolist())
            next_fitness.extend(fitness[:elite_size].tolist())
            ## Offspring Generation
            ## Use Top {parent_percent}% to produce offspring..
            offspring_size = population_size - elite_size
            parent_size = int(len(population) * parent_percent)
            parents = population[:parent_size]
            parents_fitness = fitness[:parent_size]
            parents_prob = (parents_fitness.max() - parents_fitness) / (parents_fitness.max() - parents_fitness).sum()
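            # Example: parents_fitness = [1.0, 2.0, 3.0] gives max-minus-fitness
            # weights [2.0, 1.0, 0.0] and selection probabilities [2/3, 1/3, 0],
            # i.e. lower-fitness (better) parents are sampled more often.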
            for id in range(offspring_size):
                parent1 = parents[np.random.choice(len(parents), 1, p=parents_prob).item()]
                parent2 = parents[np.random.choice(len(parents), 1, p=parents_prob).item()]
                offspring = np.array(genetic_mate(parent1, parent2, mutation_p, bounds))
                logging.debug('Offspring %d: %s' % (id, str(offspring)))
                scenario_config, weather_config = converter.convert(offspring)
                executer.execute(scenario_config, weather_config, id)
                value = executer.analyse(id)
                logging.debug('Offspring Fitness %d: %.3f' % (id, value))
                next_population.append(offspring.tolist())
                next_fitness.append(value)
            logging.info('Genetic Algorithm Iteration %d Ends..' % iter)
            data_iter = dict()
            data_iter['data'] = next_population
            data_iter['value'] = next_fitness
            with open(current_population_savefile, 'wb') as f:
                pickle.dump(data_iter, f)
            population = np.array(next_population)
            fitness = np.array(next_fitness)
            logging.info('Best Population Fitness: %.3f' % fitness.min())
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Genetic Testing for Autonomous Driving Systems')
    parser.add_argument('--case', type=str, default='PedestrianCrossing_1_1')
    args = parser.parse_args()
    genetic_testing(os.path.join('configs', args.case+'.yaml'))
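# Example invocation (illustrative; assumes a running CARLA server on the
# configured port and a matching configs/PedestrianCrossing_1_1.yaml):
#   python genetic_testing.py --case PedestrianCrossing_1_1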