Skip to content

Commit

Permalink
read serial data in chunk to prevent memory error
Browse files Browse the repository at this point in the history
  • Loading branch information
mchilli committed Jun 28, 2024
1 parent 4b17762 commit 20845d5
Showing 1 changed file with 96 additions and 65 deletions.
161 changes: 96 additions & 65 deletions circuitpython/code.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,22 @@

supervisor.runtime.autoreload = False

SETTINGSFILE = "settings.json" # The file in which the settings are saved
MACROFILE = "macros.json" # The file in which the macros are saved
# The file in which the settings are saved
SETTINGSFILE = "settings.json"
# The file in which the macros are saved
MACROFILE = "macros.json"

SETTINGS = {
"sleeptime": 2, # Time in seconds until the display turns off
"keyboardlayout": "us", # Supported keyboard layouts: br, cz, da, de, es, fr, hu, it, po, sw, tr, uk, us
"useunicodefont": False, # Use a unicode bitmap font, which will increas the initial load time!
"fliprotation": False, # Flips the rotation of the device by 180 degrees
"brightness": 0.1 # Set the LCD and LED Brightness
# Time in seconds until the display turns off
"sleeptime": 2,
# Supported keyboard layouts: br, cz, da, de, es, fr, hu, it, po, sw, tr, uk, us
"keyboardlayout": "us",
# Use a unicode bitmap font, which will increas the initial load time!
"useunicodefont": False,
# Flips the rotation of the device by 180 degrees
"fliprotation": False,
# Set the LCD and LED Brightness
"brightness": 0.1
}

try:
Expand Down Expand Up @@ -85,9 +92,13 @@

class MacroApp():
""" Main Class """

def __init__(self) -> None:
self.macropad = MacroPad(layout_class=KeyboardLayout, rotation=180 if SETTINGS["fliprotation"] else 0)

self.macropad = MacroPad(
layout_class=KeyboardLayout,
rotation=180 if SETTINGS["fliprotation"] else 0
)

self.macropad.display.auto_refresh = False
self.macropad.display.brightness = SETTINGS["brightness"]
self.macropad.display.root_group = displayio.Group()
Expand All @@ -97,6 +108,7 @@ def __init__(self) -> None:

self.readonly = storage.getmount('/').readonly
self.serial_data = usb_cdc.data
self.serial_buffer = ""
self.serial_last_state = False

self.macroStack = [self._init_macros()]
Expand Down Expand Up @@ -127,16 +139,16 @@ def _init_macros(self) -> list[dict]:
macros = json.load(f)
if isinstance(macros, list):
return {
"label": rootLabel,
"label": rootLabel,
"content": macros,
}
return macros
except OSError:
return {
"label": rootLabel,
"label": rootLabel,
"content": [],
}

def _save_macros(self) -> None:
""" store the macros in the macrofile
"""
Expand All @@ -148,17 +160,19 @@ def _save_macros(self) -> None:

def _init_group_label(self) -> dict[str, Key]:
group_label = Label(
font=load_font("/fonts/6x12.pcf") if SETTINGS["useunicodefont"] else terminalio.FONT,
text="",
padding_top=0,
padding_bottom=0,
padding_left=0,
padding_right=0,
color=0xFFFFFF,
anchored_position=(self.macropad.display.width // 2, self.macropad.display.height - 10),
anchor_point=(0.5, 0.0)
)

font=load_font(
"/fonts/6x12.pcf") if SETTINGS["useunicodefont"] else terminalio.FONT,
text="",
padding_top=0,
padding_bottom=0,
padding_left=0,
padding_right=0,
color=0xFFFFFF,
anchored_position=(self.macropad.display.width // 2,
self.macropad.display.height - 10),
anchor_point=(0.5, 0.0)
)

self.macropad.display.root_group.append(group_label)

return group_label
Expand All @@ -173,19 +187,20 @@ def _init_keys(self) -> list[Key]:

for i in range(self.macropad.keys.key_count):
label = Label(
font=load_font("/fonts/6x12.pcf") if SETTINGS["useunicodefont"] else terminalio.FONT,
text="",
padding_top=0,
padding_bottom=1,
padding_left=4,
padding_right=4,
color=0xFFFFFF,
anchored_position=(
(self.macropad.display.width - 2) / 2 * (i % 3) + 1,
self.macropad.display.height / 5 * (i // 3) + 2),
anchor_point=((i % 3) / 2, 0.0)
)

font=load_font(
"/fonts/6x12.pcf") if SETTINGS["useunicodefont"] else terminalio.FONT,
text="",
padding_top=0,
padding_bottom=1,
padding_left=4,
padding_right=4,
color=0xFFFFFF,
anchored_position=(
(self.macropad.display.width - 2) / 2 * (i % 3) + 1,
self.macropad.display.height / 5 * (i // 3) + 2),
anchor_point=((i % 3) / 2, 0.0)
)

keys.append(Key(self.macropad, i, label))
self.macropad.display.root_group.append(label)

Expand All @@ -198,7 +213,7 @@ def _init_group(self) -> None:

self._update_tab()

def run_macro(self, item:dict, *args) -> None:
def run_macro(self, item: dict, *args) -> None:
""" run the macro, can be:
Float (e.g. 0.25): delay in seconds
String (e.g. "Foo"): corresponding keys pressed & released
Expand All @@ -225,20 +240,23 @@ def run_macro(self, item:dict, *args) -> None:
key_name = key['kc'][1:] if key['kc'][:1] == "-" else key['kc']
key_code = getattr(Keycode, key_name.upper(), None)
if key_code:
if key['kc'][:1] != "-" :
if key['kc'][:1] != "-":
self.macropad.keyboard.press(key_code)
else:
self.macropad.keyboard.release(key_code)
if 'ccc' in key:
control_code = getattr(ConsumerControlCode, key['ccc'].upper(), None)
control_code = getattr(
ConsumerControlCode, key['ccc'].upper(), None)
if control_code:
self.macropad.consumer_control.press(control_code)
self.macropad.consumer_control.release()
if 'tone' in key:
self.macropad.play_tone(key['tone']['frequency'], key['tone']['duration'])
self.macropad.play_tone(
key['tone']['frequency'], key['tone']['duration'])
if 'mse' in key:
if "b" in key["mse"]:
btn = getattr(Mouse, f"{key['mse']['b'].upper()}_BUTTON", None)
btn = getattr(
Mouse, f"{key['mse']['b'].upper()}_BUTTON", None)
if btn:
self.macropad.mouse.click(btn)
self.macropad.mouse.move(
Expand All @@ -249,11 +267,11 @@ def run_macro(self, item:dict, *args) -> None:
method = getattr(System, key['sys'], None)
if method:
method(self)

self.macropad.keyboard.release_all()
self.macropad.mouse.release_all()

def open_group(self, item:dict, *args) -> None:
def open_group(self, item: dict, *args) -> None:
""" open a group
Args:
Expand Down Expand Up @@ -284,16 +302,17 @@ def _update_tab(self) -> None:

for i, item in enumerate(self.macroStack[-1]["content"][:self.macropad.keys.key_count]):
self.keys[i].type = item["type"]
self.keys[i].label = "" if item["type"] == "blank" else item["label"]
self.keys[i].color = (0, 0, 0) if item["type"] == "blank" else item["color"]
self.keys[i].label = "" if item["type"] == "blank" else item["label"]
self.keys[i].color = (
0, 0, 0) if item["type"] == "blank" else item["color"]
self.keys[i].set_func(self._get_key_func(item["type"]), item)

self.group_label.text = self.macroStack[-1]["label"]

for key in self.keys:
key.update_colors()

def _get_key_func(self, type:str) -> function:
def _get_key_func(self, type: str) -> function:
""" get the specific function for the type
Args:
Expand All @@ -313,12 +332,14 @@ def _update_encoder_macros(self) -> None:
""" update the rotary encoder macros defined for opened group
"""
self.encoder.update_encoder_macros(
on_switch = self.macroStack[-1].get("encoder", {}).get("switch"),
on_increased = self.macroStack[-1].get("encoder", {}).get("increased"),
on_decreased = self.macroStack[-1].get("encoder", {}).get("decreased")
on_switch=self.macroStack[-1].get("encoder", {}).get("switch"),
on_increased=self.macroStack[-1].get("encoder",
{}).get("increased"),
on_decreased=self.macroStack[-1].get("encoder",
{}).get("decreased")
)

def _handle_serial_data(self, payload:str) -> dict:
def _handle_serial_data(self, payload: str) -> dict:
""" handle the data comming over the serial connection
Args:
Expand All @@ -341,12 +362,12 @@ def _handle_serial_data(self, payload:str) -> dict:
response['ACK'] = 'settings'
response['CONTENT'] = SETTINGS
return response

elif command == 'set_settings':
if 'content' not in payload.keys():
response['ERR'] = 'No content: %s' % payload
return response

content = payload['content']

if self._save_settings(content):
Expand All @@ -360,54 +381,54 @@ def _handle_serial_data(self, payload:str) -> dict:
response['ACK'] = 'macros'
response['CONTENT'] = self.macroStack[0]
return response

elif command == 'set_macros':
if 'content' not in payload.keys():
response['ERR'] = 'No content: %s' % payload
return response

content = payload['content']
self.macroStack = [content]
self._display_on()
self._init_group()

response['ACK'] = 'Macros received'
return response

elif command == 'save_macros':
if self._save_macros():
response['ACK'] = 'Macros stored'
else:
response['ERR'] = 'Cannot store macros because USB storage is enabled'

return response

elif command == 'enable_usb':
System.enable_usb()

response['ACK'] = 'Enable USB'
return response

elif command == 'soft_reset':
System.soft_reset()

response['ACK'] = 'Softreset'
return response

elif command == 'hard_reset':
System.hard_reset()

response['ACK'] = 'Hardreset'
return response

else:
response['ERR'] = 'Unkown command: %s' % command
return response
except Exception as e:
response['ERR'] = str(e)
return response

def _send_serial_data(self, payload:dict) -> None:
def _send_serial_data(self, payload: dict) -> None:
""" prepare and send data over serial connection
Args:
Expand Down Expand Up @@ -437,12 +458,20 @@ def start(self) -> None:
if self.serial_last_state != self.serial_data.connected:
self.serial_last_state = self.serial_data.connected
if self.serial_data.connected:
self._send_serial_data({'ACK': 'usbenabled', 'CONTENT': self.readonly })
self._send_serial_data(
{'ACK': 'usbenabled', 'CONTENT': self.readonly})

if self.serial_data.connected:
if self.serial_data.in_waiting > 0:
data = self.serial_data.readline()
self._send_serial_data(self._handle_serial_data(data.decode("utf-8").strip()))
while self.serial_data.in_waiting:
chunk = self.serial_data.read(
self.serial_data.in_waiting)
self.serial_buffer += chunk.decode("utf-8")

if self.serial_buffer.endswith("\n"):
self._send_serial_data(
self._handle_serial_data(self.serial_buffer[:-1]))
self.serial_buffer = ""

# get key events, so no inputs will be stored during connection
# self.macropad.keys.events.get()
Expand All @@ -451,7 +480,8 @@ def start(self) -> None:
key_event = self.macropad.keys.events.get()
if key_event:
self._display_on()
self.keys[key_event.key_number].pressed = True if key_event.pressed and not any([key.pressed for key in self.keys]) else False
self.keys[key_event.key_number].pressed = True if key_event.pressed and not any(
[key.pressed for key in self.keys]) else False

if self.encoder.switch and self.encoder.on_switch:
self._display_on()
Expand All @@ -469,5 +499,6 @@ def start(self) -> None:
"content": self.encoder.on_decreased
})


app = MacroApp()
app.start()
app.start()

0 comments on commit 20845d5

Please sign in to comment.