This repository has been archived by the owner on Sep 11, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathdso138mini.py
111 lines (84 loc) · 2.88 KB
/
dso138mini.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
#!/usr/bin/env python
import json
import logging
import threading
import time
from usblib import device_from_fd
from usblib import shell_usbdevice
from usblib import CP210xSerial
# Module-level logger, named after this module ("__main__" when run as a script).
LOGGER = logging.getLogger(__name__)
# ----------------------------------------------------------------------------
def main(fd, debug=False):
    """Grab scope dumps from the USB device behind *fd* and save them as JSON.

    The device is wrapped in a ``CP210xSerial`` at 115200 baud, everything
    ``grab_data`` collects is written to ``dso138mini.grab.json`` in the
    current working directory, and the serial connection is always closed.

    Args:
        fd: File descriptor of the USB device, handed to ``device_from_fd``.
        debug: If true, run ``shell_usbdevice`` on the device and print the
            device object before grabbing.

    Raises:
        AssertionError: If the device is not vendor 0x10C4 / product 0xEA60.
    """
    device = device_from_fd(fd)

    if debug:
        shell_usbdevice(fd, device)

        print("\n", "#" * 40, "\n")
        print(device)

    # Explicit raise instead of ``assert``: an assert statement is removed by
    # ``python -O`` and the script would then drive an arbitrary USB device.
    # AssertionError is kept so the exception type seen by callers is unchanged.
    if not (device.idVendor == 0x10C4 and device.idProduct == 0xEA60):
        raise AssertionError(
            "unexpected USB device (idVendor=%r, idProduct=%r); "
            "expected idVendor=0x10C4, idProduct=0xEA60"
            % (device.idVendor, device.idProduct)
        )

    ser = CP210xSerial(device, baudRate=115200)
    try:
        ser.open(_async=True)

        data = grab_data(ser)
        fn = "dso138mini.grab.json"
        with open(fn, "w") as fp:
            json.dump(data, fp, indent=2)
    finally:
        # Runs even if open() or grabbing failed.
        ser.close()
def _read_dump(ser, meta_lines, sample_lines):
    """Read and parse one dump; raise ValueError on a missing or bad line."""
    meta, rows = {}, []
    for i in range(meta_lines + sample_lines):
        # NOTE(review): the arguments are presumably (terminator, size limit,
        # timeout in seconds) -- confirm against usblib.
        raw = ser.read_until(b"\n", -1, 1.0)
        if not raw:
            # Nothing came back (e.g. None or b""); treat the dump as broken
            # instead of failing later on decode()/split().
            raise ValueError("no data for line %d" % i)

        # UnicodeDecodeError is a ValueError subclass, so undecodable bytes
        # are reported through the same exception type.
        line = raw.decode("utf-8").rstrip()
        fields = [field.strip() for field in line.split(",")]

        expected = 2 if i < meta_lines else 3
        if len(fields) != expected:
            raise ValueError(
                "line %d: expected %d fields, got %r" % (i, expected, line)
            )

        if i < meta_lines:
            key, value = fields
            meta[key] = value
        else:
            # The leading index column is not stored.
            _idx, x, y = fields
            rows.append((int(x), float(y)))

    return {"meta": meta, "data": rows}


def grab_data(ser, meta_lines=19, sample_lines=1024):
    """Collect the initial header text and all dumps until Ctrl-C is pressed.

    First a single ``ser.read(16 * 1024, 30.0)`` is issued; any data returned
    is decoded and stored line by line as the header. Afterwards the function
    loops: it waits until ``ser._buf_in`` is non-empty and then parses one
    dump consisting of ``meta_lines`` lines of ``key, value`` followed by
    ``sample_lines`` lines of ``index, x, y``.

    A dump with an empty, undecodable or malformed line is logged and skipped,
    so the dumps collected so far are not lost. Input that was not consumed
    stays in the buffer and parsing starts over at the next line.

    Args:
        ser: Opened serial connection. It must provide ``read``,
            ``read_until`` and a ``_buf_in`` buffer whose ``changed``
            attribute is used as a context manager with a ``wait(timeout)``
            method (looks like a ``threading.Condition`` -- TODO confirm).
        meta_lines: Number of ``key, value`` lines at the start of a dump.
        sample_lines: Number of ``index, x, y`` lines that follow.

    Returns:
        ``{"header": list of str or None, "transfers": list of dumps}`` where
        each dump is ``{"meta": dict, "data": list of (int, float)}``.
    """
    delay = 5000.0 / 1000.0  # seconds between progress dots while waiting

    header = None
    transfers = []

    LOGGER.info("Trying to grab header for 30 sec ...")
    try:
        data = ser.read(16 * 1024, 30.0)
        if data:
            # errors="replace": stray non-UTF-8 bytes must not abort the
            # whole session before any dump has been read.
            text = data.decode("utf-8", errors="replace").strip()
            header = text.splitlines()
    except KeyboardInterrupt:
        # Ctrl-C here only skips the header wait.
        pass

    LOGGER.info("Waiting for dumps ...")
    while True:
        try:
            print(" Waiting", end="", flush=True)
            while not ser._buf_in:
                with ser._buf_in.changed:
                    ser._buf_in.changed.wait(delay)
                print(".", end="", flush=True)
            print()

            record = _read_dump(ser, meta_lines, sample_lines)
        except ValueError as exc:
            LOGGER.warning("Skipping incomplete or malformed dump: %s", exc)
            continue
        except KeyboardInterrupt:
            # Ctrl-C ends grabbing; a partially read dump is dropped.
            break

        transfers.append(record)
        LOGGER.info("Got record.")

    return {"header": header, "transfers": transfers}
if __name__ == "__main__":
    # Script entry point; expects the USB file descriptor number as the first
    # command line argument (see https://wiki.termux.com/wiki/Termux-usb).
    import sys

    logging.basicConfig(
        level=logging.INFO, format="[%(levelname).1s] %(name)s: %(message)s"
    )
    # Per-logger levels on top of the INFO default set above.
    logging.getLogger(__name__).setLevel(logging.DEBUG)
    logging.getLogger("usblib").setLevel(logging.DEBUG)
    logging.getLogger("usblib.RXTX").setLevel(logging.INFO)
    logging.getLogger("usb").setLevel(logging.DEBUG)

    LOGGER.debug("args: %s", sys.argv)

    # grab fd number from args
    #   (from termux wrapper)
    try:
        fd = int(sys.argv[1])
    except (IndexError, ValueError):
        # Missing or non-numeric argument: exit with a usage hint instead of
        # an unhandled traceback.
        sys.exit("usage: %s FD   (FD: USB device file descriptor number)" % sys.argv[0])

    main(fd)