-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathmessagelogger.py
163 lines (138 loc) · 4.02 KB
/
messagelogger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
"""
Example script to watch MQTT messages
"""
import json
import base64
import argparse
import getpass
import time
from pyephember.pyephember import EphEmber, PointIndex
def ts_print(*stuff):
    """
    print with a UTC timestamp prefix

    The timestamp is formatted as "DD Mon YYYY HH:MM:SS" from
    time.gmtime().  The time part is spelled out as %H:%M:%S rather
    than %T, because %T is not among the directives Python documents
    for strftime and is not available on every platform.

    Returns whatever print() returns (None).
    """
    stamp = time.strftime("%d %b %Y %H:%M:%S", time.gmtime())
    return print(stamp, *stuff)
def process_point_data(pstr):
    """
    Parse base64-encoded pointData into a dictionary

    The decoded bytes are read as consecutive records, each laid out as
        0, index, datatype, value bytes...
    where datatype is 1, 2, 4, 5 (see API.md) and selects how many
    value bytes follow (1, 2, 2 and 4 respectively).

    Keys are the indices
    Values are (index_name, datatype, dotted_bytes, integer_value)
      index_name    - PointIndex(index).name, or 'UNKNOWN' when
                      PointIndex(index) raises ValueError
      dotted_bytes  - the value bytes in decimal, joined by '.'
      integer_value - the value bytes read as a big-endian unsigned
                      integer

    A later record with the same index replaces an earlier one.
    An incomplete record at the end of the data is ignored.

    Raises RuntimeError if a record does not start with a zero byte,
    or if its datatype is not one of the known ones.
    """
    lengths = {1: 1, 2: 2, 4: 2, 5: 4}
    header_size = 3  # leading zero byte, index, datatype

    raw = base64.b64decode(pstr)
    total = len(raw)
    parsed = {}
    pos = 0

    while pos < total:
        # Explicit check instead of assert: input comes from the wire and
        # assert statements are removed when Python runs with -O.
        if raw[pos] != 0:
            raise RuntimeError(
                "Expected 0 at start of record, got: {}".format(raw[pos])
            )

        if total - pos < header_size:
            # Header cut short: nothing more can be parsed.
            break

        index = raw[pos + 1]
        datatype = raw[pos + 2]
        if datatype not in lengths:
            raise RuntimeError("Unknown datatype: {}".format(datatype))

        value_start = pos + header_size
        value_end = value_start + lengths[datatype]
        value = raw[value_start:value_end]
        if len(value) < lengths[datatype]:
            # Value cut short: drop the partial record.
            break

        # Resolve the name once per complete record.
        try:
            index_lookup = PointIndex(index).name
        except ValueError:
            index_lookup = 'UNKNOWN'

        parsed[index] = (
            index_lookup,
            datatype,
            ".".join(str(a_byte) for a_byte in value),
            int.from_bytes(value, "big"),
        )
        pos = value_end

    return parsed
# pylint: disable=unused-argument
def on_log(client, userdata, level, buf):
    """
    MQTT log callback

    Prints userdata, level and buf as one comma-separated,
    timestamped line, but only when args.show_log is set.
    """
    if not args.show_log:
        return
    fields = ",".join(map(str, (userdata, level, buf)))
    ts_print("log: ({})".format(fields))
def on_connect(client, userdata, flags, result_code):
    """
    MQTT connect callback

    Reports the result code, then subscribes the client to
    POINTDATA_TOPIC_UPLOAD with a second argument of 0
    (presumably the QoS level - confirm against the client API).
    """
    ts_print("Connected with result code:", result_code)
    topic, qos = POINTDATA_TOPIC_UPLOAD, 0
    client.subscribe(topic, qos)
def on_subscribe(client, userdata, mid, granted_qos):
    """
    MQTT subscribe callback

    Prints userdata, mid and granted_qos as one comma-separated,
    timestamped line.
    """
    details = ",".join(str(item) for item in (userdata, mid, granted_qos))
    ts_print("Subscribed with data: ({})".format(details))
def on_message(client, userdata, message):
    """
    MQTT message callback

    Decodes the payload as UTF-8 (dropping trailing NUL characters),
    optionally prints it raw when args.show_raw_messages is set, then
    parses it as JSON and prints the decoded pointData if the message
    carries any under data.pointData.
    """
    text = message.payload.decode("utf-8").rstrip('\0')
    if args.show_raw_messages:
        ts_print("raw message:", text)

    decoded = json.loads(text)
    if 'data' not in decoded:
        return
    body = decoded['data']
    if 'pointData' not in body:
        return
    ts_print('pointData:', process_point_data(body['pointData']))
# Command-line interface.  Parsed at import time because the MQTT
# callbacks read the resulting module-level `args`.
parser = argparse.ArgumentParser(
    prog="ember-watch",
    description='Watch EPH Ember MQTT "upload/pointdata" messages',
)

# Account credentials; an empty password means "ask interactively".
parser.add_argument(
    "--email",
    type=str,
    required=True,
    help="Email Address for your account",
)
parser.add_argument(
    "--password",
    type=str,
    default="",
    help="Password for your account",
)

# Boolean output switches, registered in the same order as before so the
# generated --help text is unchanged.
for _flag, _help in (
        ("--show-log", "Show MQTT client log"),
        ("--show-raw-messages", "Show undecoded messages"),
):
    parser.add_argument(_flag, action="store_true", help=_help)

args = parser.parse_args()
##################################################
# start
# Use the password from the command line, or prompt for it when none
# (or an empty one) was supplied.
password = args.password or getpass.getpass()

t = EphEmber(args.email, password)

# Fetch the home details once and reuse them for both topic components,
# instead of calling get_home_details() separately for each.
home_details = t.get_home_details()['homes']
POINTDATA_TOPIC_UPLOAD = "/".join([
    home_details['productId'],
    home_details['uid'],
    "upload/pointdata",
])

# Register the callbacks defined above and start the messenger.
# The return value is kept in `st` but not used below.
st = t.messenger.start(
    callbacks={
        'on_connect': on_connect,
        'on_log': on_log,
        'on_message': on_message,
        'on_subscribe': on_subscribe,
    }
)

# Drive the client until the process is interrupted.
# NOTE(review): the result of loop() is ignored; if the client reports
# a lost connection by returning immediately, this would spin without
# reconnecting - confirm against the MQTT client's documentation.
while True:
    t.messenger.client.loop()