-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathopenxc_thread.py
166 lines (133 loc) · 6.07 KB
/
openxc_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import bluetooth
import threading
import datetime
import random
import time
import json
# THE MODEL OF THE OPENXC THAT IS GOING TO BE CONNECTED TO THIS DEVICE
DEVICE_NAME = "OpenXC-VI-57E3"
class OpenXCDataThread(threading.Thread):
engine_speed = None
vehicle_speed = None
transmission_gear_position = None
accelerator_pedal_position = None
steering_wheel_angle = None
brake_pedal_status = None
fuel_consumed = None
fuel_level = None
high_beam_status = None
ignition_status = None
latitude = None
longitude = None
odometer = None
parking_brake_status = None
torque_at_transmission = None
message = "Starting up..."
def __init__(self):
threading.Thread.__init__(self)
self.engine_speed = list()
self.vehicle_speed = list()
self.transmission_gear_position = list()
self.accelerator_pedal_position = list()
self.steering_wheel_angle = list()
self.brake_pedal_status = list()
self.fuel_consumed = list()
self.fuel_level = list()
self.high_beam_status = list()
self.ignition_status = list()
self.latitude = list()
self.longitude = list()
self.odometer = list()
self.parking_brake_status = list()
self.torque_at_transmission = list()
def run(self):
target_address = None
self.message = "Searching for a paired device called \"" + DEVICE_NAME + "\"."
nearby_devices = bluetooth.discover_devices()
for bdaddr in nearby_devices:
if DEVICE_NAME == bluetooth.lookup_name(bdaddr):
target_address = bdaddr
break
else:
print(bluetooth.lookup_name(bdaddr))
target_found = True
if target_address is not None:
self.message = "Found target bluetooth device with address " + target_address
else:
self.message = "Could not find target bluetooth device nearby"
if target_found:
port = 1
connected = False
try_count = 1
while not connected:
self.message = "Trying to connect, try "+str(try_count)
try_count += 1
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM)
try:
sock.connect((target_address, port))
connected = True
except IOError:
print("Failed to connect")
self.message = "Successfully accessed device, waiting for data."
while True:
data = ''
char = ''
while len(char) == 0 or chr(char[0]) != '}':
char = sock.recv(1)
if len(char) > 0 and char[0] != 0:
data += chr(char[0])
decoded = False
try:
jdata = json.loads(data)
decoded = True
except:
print("Error decoding data. Does not appear to be JSON format.")
if decoded:
if jdata['name'] == 'engine_speed':
self.engine_speed.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'vehicle_speed':
self.vehicle_speed.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'transmission_gear_position':
self.transmission_gear_position.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'accelerator_pedal_position':
self.accelerator_pedal_position.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'steering_wheel_angle':
self.steering_wheel_angle.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'brake_pedal_status':
self.brake_pedal_status.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'fuel_consumed':
self.fuel_consumed.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'fuel_level ':
self.fuel_level.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'fuel_consumed':
self.fuel_consumed.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'high_beam_status':
self.high_beam_status.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'ignition_status':
self.ignition_status.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'latitude':
self.latitude.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'longitude':
self.longitude.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'odometer':
self.odometer.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'parking_brake_status':
self.parking_brake_status.append(
(time.time() * 1000, jdata['value']))
elif jdata['name'] == 'torque_at_transmission':
self.torque_at_transmission.append(
(time.time() * 1000, jdata['value']))