diff --git a/README.md b/README.md index 12ad355..93c4b2d 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,15 @@ Activity logs will be output to a `logs` subdirectory. Logging granularity can b AbletonOSC listens for OSC messages on port **11000**, and sends replies on port **11001**. Replies will be sent to the same IP as the originating message. When querying properties, OSC wildcard patterns can be used; for example, `/live/clip/get/* 0 0` will query all the properties of track 0, clip 0. +When the data to be sent exceeds the maximum size limit of one OSC message, it will be divided into smaller chunks. Each chunk ends with these 4 information + +- chunk index +- total chunks +- message id +- delimiter '#\$#' + +This situation may occur, for example, with commands such as `/live/song/get/track_data` or `/live/clip/get/notes`. + ## Application API
diff --git a/abletonosc/osc_server.py b/abletonosc/osc_server.py index c781f81..a19db37 100644 --- a/abletonosc/osc_server.py +++ b/abletonosc/osc_server.py @@ -1,3 +1,4 @@ +import sys from typing import Tuple, Any, Callable from .constants import OSC_LISTEN_PORT, OSC_RESPONSE_PORT from ..pythonosc.osc_message import OscMessage, ParseError @@ -8,6 +9,7 @@ import socket import logging import traceback +import random class OSCServer: def __init__(self, @@ -40,6 +42,8 @@ def __init__(self, self.logger = logging.getLogger("abletonosc") self.logger.info("Starting OSC server (local %s, response port %d)", str(self._local_addr), self._response_port) + max_chunk_size = self._socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF) + self.logger.info(f"Socket message size limit: {max_chunk_size} bytes") def add_handler(self, address: str, handler: Callable) -> None: """ @@ -71,17 +75,70 @@ def send(self, remote_addr: The remote address to send to, as a 2-tuple (hostname, port). If None, uses the default remote address. """ - msg_builder = OscMessageBuilder(address) - for param in params: - msg_builder.add_arg(param) - - try: - msg = msg_builder.build() - if remote_addr is None: - remote_addr = self._remote_addr - self._socket.sendto(msg.dgram, remote_addr) - except BuildError: - self.logger.error("AbletonOSC: OSC build error: %s" % (traceback.format_exc())) + chunks = [] + max_chunk_size = min(self._socket.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF), 47000) + self.logger.debug(f"Max chunk size {max_chunk_size}") + # calculate total size of params in bytes + total_size = sys.getsizeof(params) + for item in params: + total_size += sys.getsizeof(item) + + if total_size <= max_chunk_size: + # the whole params fit in 1 chunk + chunks = [params] + else: + # Split the data into smaller chunks to avoid the "Message Too Long" error + # If the data is split into chunks, the last 4 pieces of information of a chunk are + # chunk index, total chunks, message id, '#$#' + current_chunk = [] + current_size = 0 + + for item in params: + item_size = sys.getsizeof(item) + + if current_size + item_size <= max_chunk_size: + current_chunk.append(item) + current_size += item_size + else: + chunks.append(tuple(current_chunk)) + current_chunk = [item] + current_size = item_size + + # Add the last chunk if it is not empty + if current_chunk: + chunks.append(tuple(current_chunk)) + + total_chunks = len(chunks) + updated_chunks = [] + self.logger.debug(f"total chunks {total_chunks}") + + if total_chunks > 1: + msg_id = random.randint(1, 127) + self.logger.info(f"Long message, split into {total_chunks} chunks before sending. msg id {msg_id}") + + for index, chunk in enumerate(chunks): + updated_chunk = chunk + (index, total_chunks, msg_id, '#$#') + self.logger.debug(f"updated chunk {updated_chunk}") + + updated_chunks.append(updated_chunk) + + self.logger.debug(f"updated chunks {updated_chunks}") + # Select chunks to iterate based on the total number of chunks + chunks_to_iterate = updated_chunks if total_chunks > 1 else chunks + self.logger.debug(f"chunks to iterate {chunks_to_iterate}") + + for chunk in chunks_to_iterate: + msg_builder = OscMessageBuilder(address) + for param in chunk: + msg_builder.add_arg(param) + + try: + msg = msg_builder.build() + if remote_addr is None: + remote_addr = self._remote_addr + self._socket.sendto(msg.dgram, remote_addr) + except BuildError: + self.logger.error("AbletonOSC: OSC build error: %s" % (traceback.format_exc())) def process(self) -> None: """ diff --git a/client/client.py b/client/client.py index 27f8135..185a762 100644 --- a/client/client.py +++ b/client/client.py @@ -96,20 +96,48 @@ def query(self, address: str, params: tuple = (), timeout: float = TICK_DURATION): - rv = None + rv = () _event = threading.Event() + data_sent_in_chunks = False + msg_id = 0 + n_chunk_received = 0 + total_chunks = 0 def received_response(params): nonlocal rv nonlocal _event - rv = params - _event.set() + nonlocal data_sent_in_chunks + nonlocal msg_id + nonlocal n_chunk_received + nonlocal total_chunks + + rv += tuple(params) + + delimiter = '#$#' + # If response data was sent in chunks due to socket size limit, + # the last 4 pieces of information of a chunk are + # chunk index, total chunks, message id, '#$#' + + if params and params[-1] == delimiter: + data_sent_in_chunks = True + msg_id = params[-2] + total_chunks = params[-3] + chunk_id = params[-4] + n_chunk_received += 1 + + # Assuming that consecutive chunks received within the timeout have the same message ID + # TODO: Handle chunks with different message IDs + if not data_sent_in_chunks or (n_chunk_received == total_chunks): + _event.set() self.add_handler(address, received_response) self.send_message(address, params) _event.wait(timeout) self.remove_handler(address) if not _event.is_set(): + if data_sent_in_chunks: + print(f"Timeout! Recevied {n_chunk_received} / {total_chunks} chunks.") + print("Try lowering max_chunk_size in send() in osc_server.py if problem persists") raise RuntimeError("No response received to query: %s" % address) return rv