-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsensor_interface.py
250 lines (194 loc) · 8.78 KB
/
sensor_interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
import copy
import logging
import numpy as np
import os
import time
from threading import Thread
from queue import Queue
from queue import Empty
import carla
from srunner.scenariomanager.carla_data_provider import CarlaDataProvider
from srunner.scenariomanager.timer import GameTime
def threaded(fn):
def wrapper(*args, **kwargs):
thread = Thread(target=fn, args=args, kwargs=kwargs)
thread.setDaemon(True)
thread.start()
return thread
return wrapper
class SensorConfigurationInvalid(Exception):
"""
Exceptions thrown when the sensors used by the agent are not allowed for that specific submissions
"""
def __init__(self, message):
super(SensorConfigurationInvalid, self).__init__(message)
class SensorReceivedNoData(Exception):
"""
Exceptions thrown when the sensors used by the agent take too long to receive data
"""
def __init__(self, message):
super(SensorReceivedNoData, self).__init__(message)
class GenericMeasurement(object):
def __init__(self, data, frame):
self.data = data
self.frame = frame
class BaseReader(object):
def __init__(self, vehicle, reading_frequency=1.0):
self._vehicle = vehicle
self._reading_frequency = reading_frequency
self._callback = None
self._run_ps = True
self.run()
def __call__(self):
pass
@threaded
def run(self):
first_time = True
latest_time = GameTime.get_time()
while self._run_ps:
if self._callback is not None:
current_time = GameTime.get_time()
# Second part forces the sensors to send data at the first tick, regardless of frequency
if current_time - latest_time > (1 / self._reading_frequency) \
or (first_time and GameTime.get_frame() != 0):
self._callback(GenericMeasurement(self.__call__(), GameTime.get_frame()))
latest_time = GameTime.get_time()
first_time = False
else:
time.sleep(0.001)
def listen(self, callback):
# Tell that this function receives what the producer does.
self._callback = callback
def stop(self):
self._run_ps = False
def destroy(self):
self._run_ps = False
class SpeedometerReader(BaseReader):
"""
Sensor to measure the speed of the vehicle.
"""
MAX_CONNECTION_ATTEMPTS = 10
def _get_forward_speed(self, transform=None, velocity=None):
""" Convert the vehicle transform directly to forward speed """
if not velocity:
velocity = self._vehicle.get_velocity()
if not transform:
transform = self._vehicle.get_transform()
vel_np = np.array([velocity.x, velocity.y, velocity.z])
pitch = np.deg2rad(transform.rotation.pitch)
yaw = np.deg2rad(transform.rotation.yaw)
orientation = np.array([np.cos(pitch) * np.cos(yaw), np.cos(pitch) * np.sin(yaw), np.sin(pitch)])
speed = np.dot(vel_np, orientation)
return speed
def __call__(self):
""" We convert the vehicle physics information into a convenient dictionary """
# protect this access against timeout
attempts = 0
while attempts < self.MAX_CONNECTION_ATTEMPTS:
try:
velocity = self._vehicle.get_velocity()
transform = self._vehicle.get_transform()
break
except Exception:
attempts += 1
time.sleep(0.2)
continue
return {'speed': self._get_forward_speed(transform=transform, velocity=velocity)}
class OpenDriveMapReader(BaseReader):
def __call__(self):
return {'opendrive': CarlaDataProvider.get_map().to_opendrive()}
class CallBack(object):
def __init__(self, tag, sensor_type, sensor, data_provider):
self._tag = tag
self._data_provider = data_provider
self._data_provider.register_sensor(tag, sensor_type, sensor)
def __call__(self, data):
if isinstance(data, carla.libcarla.Image):
self._parse_image_cb(data, self._tag)
elif isinstance(data, carla.libcarla.LidarMeasurement):
self._parse_lidar_cb(data, self._tag)
elif isinstance(data, carla.libcarla.SemanticLidarMeasurement):
self._parse_semantic_lidar_cb(data, self._tag)
elif isinstance(data, carla.libcarla.RadarMeasurement):
self._parse_radar_cb(data, self._tag)
elif isinstance(data, carla.libcarla.GnssMeasurement):
self._parse_gnss_cb(data, self._tag)
elif isinstance(data, carla.libcarla.IMUMeasurement):
self._parse_imu_cb(data, self._tag)
elif isinstance(data, GenericMeasurement):
self._parse_pseudosensor(data, self._tag)
else:
logging.error('No callback method for this sensor.')
# Parsing CARLA physical Sensors
def _parse_image_cb(self, image, tag):
array = np.frombuffer(image.raw_data, dtype=np.dtype("uint8"))
array = copy.deepcopy(array)
array = np.reshape(array, (image.height, image.width, 4))
self._data_provider.update_sensor(tag, array, image.frame)
def _parse_lidar_cb(self, lidar_data, tag):
points = np.frombuffer(lidar_data.raw_data, dtype=np.dtype('f4'))
points = copy.deepcopy(points)
points = np.reshape(points, (int(points.shape[0] / 4), 4))
self._data_provider.update_sensor(tag, points, lidar_data.frame)
def _parse_semantic_lidar_cb(self, semantic_lidar_data, tag):
points = np.frombuffer(semantic_lidar_data.raw_data, dtype=np.dtype('f4'))
points = copy.deepcopy(points)
points = np.reshape(points, (int(points.shape[0] / 6), 6))
self._data_provider.update_sensor(tag, points, semantic_lidar_data.frame)
def _parse_radar_cb(self, radar_data, tag):
# [depth, azimuth, altitute, velocity]
points = np.frombuffer(radar_data.raw_data, dtype=np.dtype('f4'))
points = copy.deepcopy(points)
points = np.reshape(points, (int(points.shape[0] / 4), 4))
points = np.flip(points, 1)
self._data_provider.update_sensor(tag, points, radar_data.frame)
def _parse_gnss_cb(self, gnss_data, tag):
array = np.array([gnss_data.latitude,
gnss_data.longitude,
gnss_data.altitude], dtype=np.float64)
self._data_provider.update_sensor(tag, array, gnss_data.frame)
def _parse_imu_cb(self, imu_data, tag):
array = np.array([imu_data.accelerometer.x,
imu_data.accelerometer.y,
imu_data.accelerometer.z,
imu_data.gyroscope.x,
imu_data.gyroscope.y,
imu_data.gyroscope.z,
imu_data.compass,
], dtype=np.float64)
self._data_provider.update_sensor(tag, array, imu_data.frame)
def _parse_pseudosensor(self, package, tag):
self._data_provider.update_sensor(tag, package.data, package.frame)
class SensorInterface(object):
def __init__(self):
self._sensors_objects = {}
self._data_buffers = {}
self._new_data_buffers = Queue()
self._queue_timeout = 10 # default: 10
# Only sensor that doesn't get the data on tick, needs special treatment
self._opendrive_tag = None
def register_sensor(self, tag, sensor_type, sensor):
if tag in self._sensors_objects:
raise SensorConfigurationInvalid("Duplicated sensor tag [{}]".format(tag))
self._sensors_objects[tag] = sensor
if sensor_type == 'sensor.opendrive_map':
self._opendrive_tag = tag
def update_sensor(self, tag, data, timestamp):
# print("Updating {} - {}".format(tag, timestamp))
if tag not in self._sensors_objects:
raise SensorConfigurationInvalid("The sensor with tag [{}] has not been created!".format(tag))
self._new_data_buffers.put((tag, timestamp, data))
def get_data(self):
try:
data_dict = {}
while len(data_dict.keys()) < len(self._sensors_objects.keys()):
# Don't wait for the opendrive sensor
if self._opendrive_tag and self._opendrive_tag not in data_dict.keys() \
and len(self._sensors_objects.keys()) == len(data_dict.keys()) + 1:
# print("Ignoring opendrive sensor")
break
sensor_data = self._new_data_buffers.get(True, self._queue_timeout)
data_dict[sensor_data[0]] = ((sensor_data[1], sensor_data[2]))
except Empty:
raise SensorReceivedNoData("A sensor took too long to send their data")
return data_dict