-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain_laundry.py
executable file
·118 lines (97 loc) · 3.24 KB
/
main_laundry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
""" Read washer and dryer CT sensor values. """
import time
from machine import ADC
from micropython import const
import xbee
# XBee device object; wait_association() uses it to issue the 'AI' AT command.
xb = xbee.XBee()
# Number of ADC readings summed into each transmitted value.
# Changed at runtime by a received 'Sum_count:<int>' packet.
Sum_count = 2
# Seconds spent polling for a received packet after each transmission,
# i.e. the delay between transmitting summed ADC readings.
# Changed at runtime by a received 'Transmit_sleep_s:<float>' packet.
Transmit_sleep_s = 2
# Seconds to sleep between consecutive ADC readings.
# Changed at runtime by a received 'Readings_sleep_s:<float>' packet.
Readings_sleep_s = .01
# Default ADC pin name. main() replaces it with the contents of /flash/adc
# when that file can be opened; it also labels every transmitted message.
ADC_pin = 'D1'
def wait_association():
    """Block until the XBee reports it is associated with a network.

    Polls the 'AI' AT command through the module-level ``xb`` object and
    treats a result of 0 as "associated".  While any other value is
    returned, a status line is printed and the check is retried after
    250 ms.
    """
    while xb.atcmd('AI') != 0:
        print('Not associated')
        time.sleep(0.25)
    print('Associated')
def process_packet(payload):
    """Apply a settings update received in an rx_data packet.

    The payload must have the form ``name:value`` (exactly one ':'),
    e.g. ``Transmit_sleep_s:10``.  Recognised names and value types:

    * ``Transmit_sleep_s`` -- float
    * ``Sum_count``        -- int
    * ``Readings_sleep_s`` -- float

    Payloads that do not split into exactly two fields, or whose name is
    not recognised, are silently ignored.  A recognised name whose value
    cannot be converted is reported with ``print`` and the current
    setting is left untouched, so one malformed packet does not abort
    the sampling loop.

    payload -- decoded packet payload (str).
    """
    global Sum_count
    global Transmit_sleep_s
    global Readings_sleep_s

    fields = payload.split(':')
    if len(fields) != 2:
        return

    name, value = fields
    try:
        if name == 'Transmit_sleep_s':
            Transmit_sleep_s = float(value)
        elif name == 'Sum_count':
            Sum_count = int(value)
        elif name == 'Readings_sleep_s':
            Readings_sleep_s = float(value)
    except ValueError:
        # The conversion failed before the assignment, so the previous
        # setting is still in effect.
        print('Ignoring invalid value for {}: {}'.format(name, value))
def wait_receive(Transmit_sleep_s):
    """Poll for a non-broadcast rx_data packet until a timeout expires.

    ``xbee.receive()`` is called once per polling interval (125 ms).
    The first packet that carries a 'broadcast' key with a false value
    is returned immediately; broadcast packets, and packets without
    that key, are skipped.

    Transmit_sleep_s -- seconds to keep polling before giving up.  This
                        parameter shadows the module-level setting of
                        the same name.

    Returns the received packet, or None if the timeout elapsed first.
    """
    delay_ms = const(125)
    timeout_ms = 1000 * Transmit_sleep_s
    started = time.ticks_ms()

    while True:
        packet = xbee.receive()
        # Ignore broadcast: only a packet explicitly flagged as
        # non-broadcast ends the wait.
        if packet and 'broadcast' in packet and not packet['broadcast']:
            return packet

        # No usable packet yet; give up once the timeout has elapsed.
        if time.ticks_diff(time.ticks_ms(), started) >= timeout_ms:
            return None

        time.sleep_ms(delay_ms)
def transmit_data(val):
    """Send one summed ADC value to the coordinator.

    The message is the text ``<ADC_pin>=<val>`` terminated by a carriage
    return, encoded to bytes and sent to ``xbee.ADDR_COORDINATOR``.

    val -- value to report (main() passes the summed ADC readings).
    """
    payload = '{}={}\r'.format(ADC_pin, val).encode()
    xbee.transmit(xbee.ADDR_COORDINATOR, payload)
def main():
    """Read and transmit ADC values and process received packets.

    Start-up: the ADC pin name is read from ``/flash/adc``.  If the file
    cannot be opened, or contains only whitespace, the current value of
    the module-level ``ADC_pin`` is used instead.  (An empty name used
    to overwrite ``ADC_pin`` and was then passed to ``ADC()``.)

    Loop (never returns normally):
      1. Sum ``Sum_count`` ADC readings, sleeping ``Readings_sleep_s``
         seconds after each one.
      2. Print the sum and transmit it to the coordinator.
      3. Poll for a received packet for up to ``Transmit_sleep_s``
         seconds; if one arrives with a payload, decode it as UTF-8,
         print it and hand it to ``process_packet``.

    The settings are module globals re-read on every pass, so changes
    made by ``process_packet`` take effect on the next iteration.
    Exceptions are not handled here; they propagate to the caller.
    """
    global ADC_pin

    try:
        with open('/flash/adc') as f:
            # Read ADC pin name.
            pin_name = f.read().strip()
    except OSError:
        pin_name = ''

    if pin_name:
        ADC_pin = pin_name
        print("Using ADC pin {}".format(ADC_pin))
    else:
        # Missing or empty file: keep the pin name already configured.
        print("Using default ADC pin {}".format(ADC_pin))

    adc = ADC(ADC_pin)

    while True:
        # Take N readings and sum
        adc_sum = 0
        for _ in range(Sum_count):
            adc_sum += adc.read()
            time.sleep(Readings_sleep_s)
        print("adc=" + str(adc_sum))

        # Transmit readings to coordinator.
        transmit_data(adc_sum)

        # Delay while polling for received packet.
        packet = wait_receive(Transmit_sleep_s)
        if packet and 'payload' in packet:
            payload = packet['payload'].decode('UTF-8')
            print(payload)
            process_packet(payload)
if __name__ == '__main__':
    # Supervisor loop: main() only exits by raising, so any failure is
    # printed and, after a one-second pause, the script waits for
    # network association again and restarts the processing loop.
    while True:
        try:
            wait_association()
            main()
        except Exception as exc:
            print(exc)
            time.sleep(1)