-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathcount.py
130 lines (105 loc) · 3.58 KB
/
count.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#!/usr/bin/env python3
"""
Polyglot v3 node server Example 1
Copyright (C) 2021 Robert Paauwe
MIT License
"""
import udi_interface
import sys
import time
LOGGER = udi_interface.LOGGER
Custom = udi_interface.Custom
polyglot = None
Parameters = None
n_queue = []
count = 0
'''
TestNode is the device class. Our simple counter device
holds two values, the count and the count multiplied by a user defined
multiplier. These get updated at every shortPoll interval
'''
class TestNode(udi_interface.Node):
    """Simple counter device node.

    Declares three drivers: ``ST`` (node status, initially 1), ``GV0``
    (the running count) and ``GV1`` (the count multiplied by the user
    defined multiplier). The values are pushed by the module-level
    ``poll()`` handler, not by this class.
    """

    # Node definition id (presumably must match the nodedef in the profile
    # files -- verify against the profile).
    id = 'test'

    # Initial driver values reported for a freshly created node.
    drivers = [
        {'driver': 'ST', 'value': 1, 'uom': 2},
        {'driver': 'GV0', 'value': 0, 'uom': 56},
        {'driver': 'GV1', 'value': 0, 'uom': 56},
    ]

    def noop(self, command=None):
        """Handle the DISCOVER command by logging that it is unsupported.

        Args:
            command: The command payload supplied by the interface when it
                dispatches the handler. It is ignored. The parameter is
                optional so that a direct ``node.noop()`` call keeps
                working.
        """
        # NOTE(review): udi_interface is understood to invoke command
        # handlers as handler(node, command); without this parameter the
        # dispatch would fail with a TypeError. Confirm against Node.runCmd.
        LOGGER.info('Discover not implemented')

    # Maps command names received from the ISY to their handlers.
    commands = {'DISCOVER': noop}
'''
node_queue() and wait_for_node_event() create a simple way to wait
for a node to be created. The addNode() API call is asynchronous and
will return before the node is fully created. Using this, we can wait
until it is fully created before we try to use it.
'''
def node_queue(data):
    """Record that a node has finished being added.

    Args:
        data: Event payload; must contain an ``'address'`` key. The
            address is appended to the module-level ``n_queue`` list so
            that ``wait_for_node_event()`` can observe it.
    """
    address = data['address']
    n_queue.append(address)
def wait_for_node_event():
    """Block until a node-added event has been queued, then consume it.

    Checks the module-level ``n_queue`` list every 0.1 seconds and, once
    it is non-empty, removes the most recently appended entry.
    """
    # An empty list is falsy, so this spins until node_queue() adds an entry.
    while not n_queue:
        time.sleep(0.1)
    n_queue.pop()
'''
Read the user entered custom parameters. In this case, it is just
the 'multiplier' value. Save the parameters in the global 'Parameters'
'''
def parameterHandler(params):
    """Store the user-entered custom parameters.

    Args:
        params: The custom parameter data delivered with the event; it is
            handed unchanged to ``Parameters.load()``.
    """
    # The module-level ``Parameters`` object is only read here (never
    # rebound), so no ``global`` declaration is required.
    Parameters.load(params)
'''
This is where the real work happens. When we get a shortPoll, increment the
count, report the current count in GV0 and the current count multiplied by
the user defined value in GV1. Then display a notice on the dashboard.
'''
def poll(polltype):
    """Advance the counter and publish it on every short poll.

    On a short poll the module-level ``count`` is incremented, written to
    driver ``GV0``, and ``count * multiplier`` is written to ``GV1``. A
    notice with the current count is then posted under the ``'count'``
    key of ``polyglot.Notices``. Nothing happens for other poll types or
    while the ``'my_address'`` node cannot be found.

    Args:
        polltype: Poll type identifier from the interface; the handler
            acts only when it contains ``'shortPoll'``.
    """
    global count

    if 'shortPoll' not in polltype:
        return

    raw_multiplier = Parameters['multiplier']
    if raw_multiplier is None:
        mult = 1
    else:
        try:
            mult = int(raw_multiplier)
        except (TypeError, ValueError):
            # The multiplier is free-form user input; a typo must not
            # raise out of the poll handler on every interval.
            LOGGER.error(
                'Invalid multiplier %r, falling back to 1', raw_multiplier
            )
            mult = 1

    node = polyglot.getNode('my_address')
    if node is None:
        return

    count += 1
    node.setDriver('GV0', count, True, True)
    node.setDriver('GV1', count * mult, True, True)

    # be fancy and display a notice on the polyglot dashboard
    polyglot.Notices['count'] = 'Current count is {}'.format(count)
'''
When we are told to stop, we update the node's status to False. Since
we don't have a 'controller', we have to do this ourselves.
'''
def stop():
    """Set ``ST`` to 0 on every known node, then stop the interface."""
    # getNodes() is indexed by key in the original code, so it is treated
    # as a mapping; only the node objects themselves are needed here.
    for node in polyglot.getNodes().values():
        node.setDriver('ST', 0, True, True)
    polyglot.stop()
if __name__ == "__main__":
    try:
        polyglot = udi_interface.Interface([])
        polyglot.start()

        Parameters = Custom(polyglot, 'customparams')

        # Subscribe to the events we want. The table is ordered so the
        # handlers are registered in the same sequence as before.
        for event_id, handler in (
            (polyglot.CUSTOMPARAMS, parameterHandler),
            (polyglot.ADDNODEDONE, node_queue),
            (polyglot.STOP, stop),
            (polyglot.POLL, poll),
        ):
            polyglot.subscribe(event_id, handler)

        # Start running
        polyglot.ready()
        polyglot.setCustomParamsDoc()
        polyglot.updateProfile()

        # Create the device node. A real node server might discover its
        # device(s) and create nodes from what it finds; here we simply
        # create our single node and wait for the add to complete.
        node = TestNode(polyglot, 'my_address', 'my_address', 'Counter')
        polyglot.addNode(node)
        wait_for_node_event()

        # Just sit and wait for events
        polyglot.runForever()
    except (KeyboardInterrupt, SystemExit):
        sys.exit(0)