-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathio_service.py
executable file
·332 lines (256 loc) · 8.08 KB
/
io_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
#!/usr/bin/python3
# -*- coding: utf-8 -*-
import json
import os
import time
import threading
import sys
if sys.version_info[0] < 3:
import Queue as queue
else:
import queue
import paho.mqtt.client as mqtt
import gpio_next as gpio
class LED1x4(object):
    """Driver for a row of four LEDs on GPIO pins 64, 65, 66 and 67.

    Patterns are 4-bit integers where bit ``i`` addresses ``leds[i]``.
    ``value`` writes the bitwise inverse of the pattern to the pins and the
    pins are opened with a default of 1, so a set pattern bit results in a 0
    being written (presumably the LEDs are wired active-low -- confirm
    against the hardware).

    Jobs are handed over with ``call`` and executed one at a time by a daemon
    worker thread.  The open-ended animations (``repeat``, ``step``,
    ``loop``, ``blink``) keep running until another job is waiting in the
    queue.
    """

    # GPIO outputs for the four LEDs.  Kept at class level so the pins are
    # opened only once, even if several LED1x4 objects are created.
    leds = []

    def __init__(self):
        """Open the LED pins (first instance only) and start the worker."""
        if not self.leds:
            # extend() mutates the shared class-level list in place.
            self.leds.extend(
                gpio.Output(pin, default_value=1) for pin in (64, 65, 66, 67))

        self.queue = queue.Queue()
        # Set ``daemon`` as an attribute rather than a constructor keyword so
        # this also works on Python 2, which the ``Queue`` import shim at the
        # top of the file caters for.
        worker = threading.Thread(target=self._run)
        worker.daemon = True
        worker.start()

    def on_press(self):
        """Write 0 to LEDs 1 and 2, then 50 ms later to LEDs 3 and 0."""
        self.leds[1].write(0)
        self.leds[2].write(0)
        time.sleep(0.05)
        self.leds[3].write(0)
        self.leds[0].write(0)

    def on_release(self):
        """Write 1 to LEDs 3 and 0, then 50 ms later to LEDs 1 and 2."""
        self.leds[3].write(1)
        self.leds[0].write(1)
        time.sleep(0.05)
        self.leds[1].write(1)
        self.leds[2].write(1)

    def on_wakeup(self):
        """Show 0b1000, 0b1100, 0b1110, 0b1111 at 0.5 s intervals."""
        pattern = 0b1111000
        for _ in range(4):
            # Only the low four bits reach the LEDs (see ``raw``).
            self.value(pattern)
            pattern >>= 1
            time.sleep(0.5)

    def on_listen(self):
        """Show the all-bits-set pattern 0xF."""
        self.value(0xF)

    def on_wait(self):
        """Alternate between 0b1010 and its inverse until a new job arrives."""
        self.repeat(0b1010)

    def on_finish(self):
        """Show the empty pattern 0x0."""
        self.value(0x0)

    def same(self, value):
        """Write the same raw level ``value`` to every LED pin."""
        for led in self.leds:
            led.write(value)

    def raw(self, value):
        """Write bit ``i`` of ``value`` directly to ``leds[i]``."""
        for i, led in enumerate(self.leds):
            led.write((value >> i) & 1)

    def value(self, value):
        """Show ``value`` as a pattern: pins receive the inverted bits."""
        self.raw(~value)

    def mask(self, value, mask):
        """Write ``1 - value`` to each LED whose bit is set in ``mask``.

        LEDs whose bit is clear in ``mask`` are left untouched.
        """
        level = 1 - value
        for i, led in enumerate(self.leds):
            if (mask >> i) & 1:
                led.write(level)

    def repeat(self, value):
        """Alternate ``value`` and ``~value`` every 0.5 s until a job is queued."""
        pattern = value
        while self.queue.empty():
            self.value(pattern)
            pattern = ~pattern
            time.sleep(0.5)

        # NOTE(review): the original indentation was lost; this reset is
        # assumed to run once after the loop ends, not on every iteration.
        self.value(0x0)

    def step(self):
        """Move a single set bit 0 -> 1 -> 2 -> 3 -> 0 ... every 0.5 s."""
        index = 0
        while self.queue.empty():
            self.value(1 << index)
            index = (index + 1) & 0x3
            time.sleep(0.5)

    def loop(self):
        """Bounce a single set bit back and forth between bit 0 and bit 3."""
        delta = 1
        index = 0
        while self.queue.empty():
            self.value(1 << index)
            if index == 0:
                delta = 1
            elif index == 3:
                delta = -1
            index += delta
            time.sleep(0.5)

    def wipe(self):
        """Show 0b0111, 0b0011, 0b0001, 0b0000 at 0.5 s intervals."""
        pattern = 0b0111
        for _ in range(4):
            self.value(pattern)
            pattern >>= 1
            time.sleep(0.5)

    def blink(self, mask=0xF):
        """Toggle the LEDs selected by ``mask`` every 0.5 s until a job is queued."""
        phase = 0
        while self.queue.empty():
            self.mask(phase, mask)
            phase = 1 - phase
            time.sleep(0.5)

        # NOTE(review): the original indentation was lost; this clean-up is
        # assumed to run once after the loop.  It fires when the last phase
        # written was ``mask(0, ...)`` and then writes 1 to all four pins,
        # including those outside ``mask``.
        if phase:
            for led in self.leds:
                led.write(1)

    def call(self, func):
        """Queue a job; ``func`` must be a ``(callable, args, kwargs)`` tuple."""
        self.queue.put(func)

    def _run(self):
        """Worker loop: execute queued ``(callable, args, kwargs)`` jobs forever."""
        import traceback

        while True:
            job = self.queue.get()
            try:
                func, args, kargs = job
                func(*args, **kargs)
            except Exception:
                # This is the only consumer of the queue: if it died, every
                # later LED request would pile up unserved.  Report the
                # failure and keep going.
                traceback.print_exc()
class LEDAgent(object):
    """Proxy that turns method calls into jobs on the shared ``LED1x4`` queue.

    Looking up a callable attribute returns a wrapper which, when invoked,
    queues ``(method, args, kwargs)`` via ``LED1x4.call`` and returns ``None``
    immediately; the method itself runs later on the LED worker thread.
    Non-callable attributes are returned as-is.
    """

    # The single LED1x4 instance shared by all agents; created by the first
    # LEDAgent that is constructed.
    leds = None

    def __init__(self):
        """Create the shared ``LED1x4`` on first use."""
        if self.leds is None:
            LEDAgent.leds = LED1x4()

    def __getattr__(self, attr):
        """Resolve ``attr`` on the shared LED object.

        Raises:
            AttributeError: if the LED object has no such attribute.  (The
                previous behaviour of returning ``None`` made ``hasattr()``
                always true and deferred typos to an unrelated
                "'NoneType' object is not callable" error.)
        """
        missing = object()
        target = getattr(self.leds, attr, missing)
        if target is missing:
            raise AttributeError(
                '%r object has no attribute %r' % (type(self).__name__, attr))
        if not callable(target):
            return target

        def func_warp(*args, **kargs):
            # Fire-and-forget: the job runs on the LED worker thread.
            self.leds.call((target, args, kargs))

        return func_warp
class Amplifier(object):
    """Audio amplifier controlled through two GPIO outputs: power and mute."""

    def __init__(self, power=2, mute=3):
        """Open the two control lines.

        Args:
            power: pin number passed to ``gpio.Output`` for the power line.
            mute: pin number passed to ``gpio.Output`` for the mute line.
        """
        self.power, self.mute = gpio.Output(power), gpio.Output(mute)

    def on(self):
        """Write 1 to the power line, then to the mute line."""
        # The original author noted this shell equivalent:
        #   sunxi-pio -m PA2=1 && sunxi-pio -m PA3=1
        for line in (self.power, self.mute):
            line.write(1)

    def off(self):
        """Write 0 to the mute line, then to the power line."""
        for line in (self.mute, self.power):
            line.write(0)
class HeyWifiService(object):
    """Controls the ``hey_wifi.service`` systemd unit through ``systemctl``."""

    def __init__(self):
        """Start with ``state`` 0."""
        # Overwritten by on_message with the integer payload received on
        # '/voicen/hey_wifi'; read back when computing LED masks.
        self.state = 0

    def is_active(self):
        """Return True when ``systemctl is-active`` exits with status 0."""
        status = os.system('systemctl is-active hey_wifi.service')
        return status == 0

    def start(self):
        """Start the unit; returns the ``os.system`` status of the command."""
        status = os.system('systemctl start hey_wifi.service')
        return status

    def stop(self):
        """Stop the unit; returns the ``os.system`` status of the command."""
        status = os.system('systemctl stop hey_wifi.service')
        return status
# Module-level singletons used by both button_task and the MQTT callbacks.
# Creating them has side effects: GPIO outputs are opened and the LED worker
# thread is started as soon as this module is executed.
leds = LEDAgent()
amplifier = Amplifier()
hey_wifi_service = HeyWifiService()
def button_task(mqttc):
    """Watch the button on GPIO 203 forever and react to presses.

    Every button event is published to '/voicen/touch'.  For each press the
    hold time is measured from the event timestamps:

    * held for 4 or more: toggle hey_wifi.service (start it when inactive,
      stop it when active);
    * shorter press while hey_wifi is active: restore the hey_wifi LED
      pattern derived from ``hey_wifi_service.state``;
    * otherwise: play the release animation.

    ``button.wait()`` is expected to return an ``(event, timestamp)`` pair,
    with event 1 meaning "pressed", or ``None`` when a timed wait expires.
    The argument to ``wait`` and the timestamps are presumably in seconds --
    confirm against the gpio_next documentation.

    NOTE(review): the original indentation was lost in extraction.  The
    nesting below is the most plausible reading: both ``continue`` statements
    belong to the ``dt >= 4`` / ``elif`` branches, and the final
    ``leds.on_release()`` is only reached after a handled press.

    Args:
        mqttc: connected MQTT client used for publishing.
    """
    button = gpio.Input(203)

    # If the button is already down at start-up, swallow that press.
    result = button.wait(0)
    if result and result[0] == 1:
        print('wait for button release')
        button.wait()

    while True:
        result = button.wait()
        _publish_touch(mqttc, result)
        if not result or result[0] != 1:
            continue

        leds.on_press()
        pressed_at = result[1]
        hey_wifi_active = hey_wifi_service.is_active()

        # Poll for the release up to four times, advancing the LED progress
        # display each time the wait expires without an event.
        for i in range(0, 4):
            result = button.wait(1)
            if result is not None:
                break
            if not hey_wifi_active:
                leds.blink((2**(1 + i)) - 1)   # masks 0b0001 .. 0b1111
            else:
                leds.value(0xF << (i + 1))     # low bits 0b1110 .. 0b0000

        if result is None:
            # Still held after the progress display: block until released.
            result = button.wait()
        _publish_touch(mqttc, result)

        if not result:
            # No event to take a timestamp from.  Previously this crashed the
            # thread on ``result[1]``; restore the LEDs and keep listening.
            leds.on_release()
            continue

        dt = result[1] - pressed_at
        print(dt)
        if dt >= 4:
            if not hey_wifi_active:
                hey_wifi_service.start()
            else:
                hey_wifi_service.stop()
            continue
        elif hey_wifi_active:
            mask = 0xF >> hey_wifi_service.state
            leds.mask(1, ~mask)
            leds.blink(mask)
            continue

        leds.on_release()


def _publish_touch(mqttc, result):
    """Best-effort publish of one button event to '/voicen/touch'."""
    if not result:
        return
    try:
        payload = json.dumps({'event': result[0], 'timestamp': result[1]})
        mqttc.publish('/voicen/touch', payload)
    except Exception as err:
        # Deliberately broad: a broker or serialisation problem must never
        # stop button handling, but it should at least be visible in the log.
        print('failed to publish touch event: ' + str(err))
def str2int(s):
    """Parse an integer written in decimal, hex, binary or octal notation.

    Surrounding whitespace, an optional sign and upper- or lower-case
    ``0x`` / ``0b`` / ``0o`` prefixes are accepted.  Unprefixed text is read
    as base 10, so leading zeros such as ``'010'`` still yield 10.

    Args:
        s: text to parse.

    Returns:
        The parsed ``int``.

    Raises:
        ValueError: if ``s`` is not a valid integer literal.
    """
    text = s.strip()
    # Look at the two characters after any sign to find the base.
    prefix = text.lstrip('+-')[:2].lower()
    base = {'0x': 16, '0b': 2, '0o': 8}.get(prefix, 10)
    return int(text, base)
def on_message(mqttc, obj, msg):
    """MQTT message callback: dispatch on ``msg.topic``.

    Topics handled:

    * '/voicen/amp'        -- payload b'1' switches the amplifier on,
                              anything else switches it off.
    * '/voicen/leds/value' -- payload is an integer pattern (see str2int).
    * '/voicen/leds/mode'  -- payload names one of the LED animations.
    * '/voicen/hey_wifi'   -- payload is a non-negative integer state; it is
                              stored and shown on the LEDs.
    * '/voicen/pcm/open'   -- payload is a pid; amplifier on 0.1 s later if
                              any pid is still registered.
    * '/voicen/pcm/close'  -- payload is a pid; amplifier off 0.1 s later if
                              no pid is registered any more.

    Malformed payloads (not valid text, not an integer, negative hey_wifi
    state) are reported and ignored instead of raising out of the callback,
    where an exception could terminate the MQTT network loop.

    Args:
        mqttc: the MQTT client; ``mqttc.pcm_apps`` is the list of open pids.
        obj: user data supplied by the client (unused).
        msg: received message with ``topic`` and ``payload`` attributes.
    """
    topic = msg.topic
    print(topic)

    if topic == '/voicen/amp':
        if msg.payload == b'1':
            print('amp on')
            amplifier.on()
        else:
            print('amp off')
            amplifier.off()

    elif topic == '/voicen/leds/value':
        pattern = _parse_payload(msg, str2int)
        if pattern is not None:
            leds.value(pattern)

    elif topic == '/voicen/leds/mode':
        mode = _parse_payload(msg)
        if mode in _LED_MODES:
            getattr(leds, mode)()

    elif topic == '/voicen/hey_wifi':
        state = _parse_payload(msg, int)
        if state is None:
            return
        if state < 0:
            # A negative shift count would raise here and again later in
            # button_task, which reads the stored state.
            print('ignoring negative hey_wifi state: ' + str(state))
            return
        hey_wifi_service.state = state
        if state:
            mask = 0xF >> state
            leds.mask(1, ~mask)
            leds.blink(mask)
        else:
            leds.value(0x0)

    elif topic == '/voicen/pcm/open':
        pid = _parse_payload(msg, int)
        if pid is None:
            return
        mqttc.pcm_apps.append(pid)

        def amp_on():
            # Re-check after the delay: the stream may already be closed.
            if mqttc.pcm_apps:
                print('amp on')
                amplifier.on()

        threading.Timer(0.1, amp_on).start()

    elif topic == '/voicen/pcm/close':
        pid = _parse_payload(msg, int)
        if pid is None:
            return
        if pid in mqttc.pcm_apps:
            mqttc.pcm_apps.remove(pid)

        # NOTE(review): the original indentation was lost; the delayed
        # switch-off is assumed to be scheduled whether or not the pid was
        # registered, mirroring the 'open' branch.
        def amp_off():
            # Re-check after the delay: another stream may have opened.
            if not mqttc.pcm_apps:
                print('amp off')
                amplifier.off()

        threading.Timer(0.1, amp_off).start()


# LED animations that may be requested over '/voicen/leds/mode'.
_LED_MODES = ('on_press', 'on_release', 'on_wakeup', 'on_listen', 'on_wait', 'on_finish',
              'step', 'loop', 'wipe', 'blink')


def _parse_payload(msg, convert=None):
    """Decode ``msg.payload`` as text and optionally convert it.

    Returns the decoded text (or ``convert(text)``), or ``None`` after
    printing a diagnostic when decoding or conversion raises ``ValueError``.
    """
    try:
        text = msg.payload.decode()
        return text if convert is None else convert(text)
    except ValueError as err:
        # UnicodeDecodeError is a subclass of ValueError, so this covers both
        # undecodable bytes and non-numeric text.
        print('ignoring malformed payload on ' + str(msg.topic) + ': '
              + repr(msg.payload) + ' (' + str(err) + ')')
        return None
def on_connect(mqttc, obj, flags, rc):
    """MQTT connect callback: log the result code and subscribe at QoS 0.

    Args:
        mqttc: the client that connected.
        obj: user data supplied by the client (unused).
        flags: connection flags from the broker (unused).
        rc: connection result code, printed as-is.
    """
    print('connected - rc: ' + str(rc))
    topics = (
        '/voicen/leds/#',
        '/voicen/amp',
        '/voicen/hey_wifi',
        '/voicen/pcm/#',
    )
    for topic in topics:
        mqttc.subscribe(topic, 0)
def on_publish(mqttc, obj, mid):
    """MQTT publish callback: print the id of the published message."""
    line = "mid: " + str(mid)
    print(line)
def on_subscribe(mqttc, obj, mid, granted_qos):
    """MQTT subscribe callback: print the message id and the granted QoS."""
    pieces = ["Subscribed: ", str(mid), " ", str(granted_qos)]
    print("".join(pieces))
# Service start-up: create the MQTT client, wire the callbacks, connect to the
# local broker, start the button thread and then block in the network loop.
#
# If you want to use a specific client id, use
# mqttc = mqtt.Client("client-id")
# but note that the client id must be unique on the broker. Leaving the client
# id parameter empty will generate a random id for you.
# NOTE(review): paho-mqtt 2.x changed the Client() constructor and callback
# signatures -- confirm which paho-mqtt version is installed on the device.
mqttc = mqtt.Client()
# Callbacks are assigned before connecting so no event is missed.
mqttc.on_message = on_message
mqttc.on_connect = on_connect
mqttc.on_publish = on_publish
mqttc.on_subscribe = on_subscribe
# Broker on this machine, port 1883; the third argument is paho's keepalive.
mqttc.connect('localhost', 1883, 60)
# Pids registered via '/voicen/pcm/open' and removed via '/voicen/pcm/close';
# on_message reads and updates this list on the client object.
mqttc.pcm_apps = []
# The button watcher runs as a daemon thread so it cannot keep the process alive.
threading.Thread(target=button_task, args=(mqttc,), daemon=True).start()
# Blocks here, processing network traffic and dispatching the callbacks.
mqttc.loop_forever()