-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathsensor.py
70 lines (63 loc) · 1.91 KB
/
sensor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
import serial
import threading
import time
import datetime
import pandas as pd
import os
# Global configuration
PORT = None            # serial port name; get_port() fills this in on first use
BAUDRATE = 9600        # passed to serial.Serial by read_sensor_data()
DELAY = 1              # seconds slept between retries / CSV writes
CSV_FILE = 'data.csv'  # file that to_csv() appends rows to

# Fields whose initial value is the placeholder string '0.00'. The order here
# is the order in which they appear in data_dict.
_MEASUREMENT_KEYS = (
    'temperature',
    'pressure',
    'humidity',
    'pm1',
    'pm2',
    'pm10',
    'gas',
    'tvoc',
    'eco2',
)

# Shared record of the most recent values. All values are kept as strings;
# read_sensor_data() overwrites entries in place as lines arrive.
data_dict = {
    'exception': '',
    'location': ' No Location',
    'date': '',
    **dict.fromkeys(_MEASUREMENT_KEYS, '0.00'),
}
def get_port():
    """Return the receiver's serial port name, asking the user once if unset.

    The answer is stored in the module-level ``PORT`` so later calls return
    it without prompting again.
    """
    global PORT
    if PORT is not None:
        return PORT
    PORT = input("What port is your receiver: ")
    return PORT
def read_sensor_data():
    """Continuously read ``key:value`` lines from the serial port into ``data_dict``.

    Opens the port returned by ``get_port()`` at ``BAUDRATE`` with a one
    second read timeout and loops forever:

    * A line that splits on ``':'`` into exactly two parts is treated as
      ``key:value``. If ``key`` is already a field of ``data_dict`` the value
      is stored (as a string) and ``data_dict['date']`` is set to the current
      local time; unknown keys are ignored.
    * Any other non-empty line is reported on stdout.
    * Empty reads (the timeout expired with no data) are skipped silently.

    When the port cannot be opened or fails mid-read, the error text is
    stored in ``data_dict['exception']``, the timestamp is refreshed, and the
    connection is retried after ``DELAY`` seconds.

    This function never returns.
    """
    port = get_port()
    while True:
        try:
            with serial.Serial(port=port, baudrate=BAUDRATE, timeout=1) as ser:
                while True:
                    # errors='replace': corrupted bytes on the serial line must
                    # not raise UnicodeDecodeError, which is not caught below
                    # and would terminate the reader.
                    raw_data = ser.readline().decode('utf-8', errors='replace').strip()
                    if not raw_data:
                        # readline() yields b'' when the timeout expires with
                        # nothing received; that is idle time, not bad data.
                        continue
                    data = raw_data.split(':')
                    if len(data) != 2:
                        print("Unexpected data format: " + raw_data)
                        continue
                    key, value = data
                    if key in data_dict:
                        data_dict[key] = value
                        data_dict['date'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        except serial.SerialException as e:
            # Record the failure for readers of data_dict, then back off
            # before trying to reopen the port.
            data_dict['exception'] = str(e)
            data_dict['date'] = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            time.sleep(DELAY)
def to_csv():
    """Append the current ``data_dict`` to ``CSV_FILE`` as one row, forever.

    Each pass builds a single-row DataFrame from ``data_dict`` and appends it
    to ``CSV_FILE``; the header row is written only when the file does not
    exist yet. The loop then sleeps ``DELAY`` seconds. Any exception is
    printed and the loop carries on after the same delay.

    This function never returns.
    """
    while True:
        try:
            row_frame = pd.DataFrame([data_dict])
            # Only the very first write (file absent) gets the column names.
            needs_header = not os.path.isfile(CSV_FILE)
            row_frame.to_csv(
                CSV_FILE,
                mode='a',
                header=needs_header,
                index=False,
            )
            time.sleep(DELAY)
        except Exception as err:
            print(f"Error in to_csv function: {err}")
            time.sleep(DELAY)
# Runs at import time: resolve the serial port (prompting if it is not set
# yet), then start the serial reader in a daemon thread.
PORT = get_port()
_reader_thread = threading.Thread(daemon=True, target=read_sensor_data)
_reader_thread.start()
# The CSV writer thread is currently disabled:
#threading.Thread(target=to_csv, daemon=True).start()