-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathserial_connection.py
45 lines (32 loc) · 1.02 KB
/
serial_connection.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
from serial import Serial
from threading import Thread
SERIAL_PORT = "/dev/tty.HC-06-DevB"
SENSOR = 'sensor'
MOTOR = 'motor'
class SerialWrapper():
def __init__(self):
self.serial_connection = Serial(SERIAL_PORT, 9600, timeout=30)
def write(self, var):
if self.serial_connection:
self.serial_connection.write(bytes(var, 'utf-8'))
def readline(self):
if self.serial_connection:
line = self.serial_connection.readline()
return line.decode('utf-8').strip() if line.strip() != b'' else None
return None
def close(self):
if self.serial_connection:
self.serial_connection.close()
ser = SerialWrapper()
class InputThread(Thread):
def __init__(self):
super(InputThread, self).__init__()
self.stopped = False
def run(self):
while(not self.stopped):
message = ser.readline()
if message:
print(message)
inputThread = InputThread()
inputThread.start()
inputThread.join()