-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrn2483.py
146 lines (121 loc) · 4.24 KB
/
rn2483.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
from dataclasses import dataclass
from serial import Serial
from typing import Self
import json
BAUDRATE: int = 57600
@dataclass
class Parameters:
"""RN2483 configuration parameters."""
modulation: str = "lora"
frequency: int = 433050000
txpwr: int = 15
spread_factor: int = 7
coding_rate: int = 8 # 5 - 8 (becomes denominator of the coding rate fraction)
bandwidth: int = 500
crc: bool = True
iqi: bool = False
sync: int = 67 # In decimal
preamble: int = 6
@classmethod
def from_json(cls, path: str) -> Self:
with open(path, "r") as file:
config = json.load(file)
return cls(
modulation=config["modulation"],
frequency=config["frequency"],
txpwr=config["txpwr"],
spread_factor=config["spread_factor"],
coding_rate=config["coding_rate"],
bandwidth=config["bandwidth"],
crc=config["crc"],
iqi=config["iqi"],
sync=config["sync"],
preamble=config["preamble"],
)
class Radio:
"""Represents the RN2483 radio module."""
def __init__(self, port: Serial, params: Parameters):
self.port = port
self.params = params
def _write(self, data: str) -> None:
"""Write data to the radio."""
cmd = f"{data}\r\n"
self.port.flush()
self.port.write(cmd.encode("utf-8"))
def _wait_for_ok(self) -> bool:
"""Returns true for okay, false otherwise."""
line = self.port.readline()
return "ok" in str(line)
def _mac_pause(self) -> bool:
"""Pause the MAC layer."""
self._write("mac pause")
line = self.port.readline()
return "4294967245" in str(line)
def receive(self) -> str:
if not self._mac_pause():
return ""
self._write("radio rx 0")
if not self._wait_for_ok():
return ""
message = self.port.readline().decode("ascii")[8:].strip()
if len(message) == 0:
return ""
message = bytes.fromhex(message).decode("ascii")
return message
def transmit(self, data: str) -> bool:
if not self._mac_pause():
return False
# Encode data to string of hex TODO
data_str = ""
for b in data.encode("ascii"):
data_str += hex(b)[2:]
self._write(f"radio tx {data_str}")
if not self._wait_for_ok():
return False
# Wait for response of successful TX
line = ""
while "radio_tx_ok" not in line:
line = str(self.port.readline())
return True
def configure(self) -> bool:
"""Configure the radio parameters."""
self._write(f"radio set mod {self.params.modulation}")
if not self._wait_for_ok():
return False
self._write(f"radio set freq {self.params.frequency}")
if not self._wait_for_ok():
return False
self._write(f"radio set pwr {self.params.txpwr}")
if not self._wait_for_ok():
return False
self._write(f"radio set sf sf{self.params.spread_factor}")
if not self._wait_for_ok():
return False
self._write(f"radio set cr 4/{self.params.coding_rate}")
if not self._wait_for_ok():
return False
self._write(f"radio set bw {self.params.bandwidth}")
if not self._wait_for_ok():
return False
self._write(f"radio set crc {'on' if self.params.crc else 'off'}")
if not self._wait_for_ok():
return False
self._write(f"radio set iqi {'on' if self.params.iqi else 'off'}")
if not self._wait_for_ok():
return False
self._write(f"radio set sync {str(hex(self.params.sync))[2:]}")
if not self._wait_for_ok():
return False
self._write(f"radio set prlen {self.params.preamble}")
if not self._wait_for_ok():
return False
return True
def snr(self) -> str:
self._write("radio get snr")
return str(self.port.readline())
@classmethod
def from_port(cls, port: str, params: Parameters) -> Self:
return cls(
port=Serial(port=port, baudrate=BAUDRATE),
params=params,
)