Skip to content

Commit

Permalink
Add retry logic to recieving messages
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeFoodPixels committed Sep 28, 2023
1 parent 40480cc commit bea3817
Showing 1 changed file with 28 additions and 10 deletions.
38 changes: 28 additions & 10 deletions custom_components/robovac/tuyalocalapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ class RequestResponseCommandMismatch(TuyaException):
"""The command in the response didn't match the one from the request."""


class ResponseTimeoutException(TuyaException):
"""Did not recieve a response to the request within the timeout"""


class TuyaCipher:
"""Tuya cryptographic helpers."""

Expand Down Expand Up @@ -445,11 +449,11 @@ def __init__(self, command, payload=None, sequence=None, encrypt_for=None):
payload = b""
self.payload = payload
self.command = command
self.original_sequence = sequence
if sequence is None:
# Use millisecond process time as the sequence number. Not ideal,
# but good for one month's continuous connection time though.
sequence = int(time.perf_counter() * 1000) & 0xFFFFFFFF
self.sequence = sequence
self.set_sequence()
else:
self.sequence = sequence
self.encrypt = False
self.device = None
if encrypt_for is not None:
Expand All @@ -465,6 +469,9 @@ def __repr__(self):
"<Device {}>".format(self.device) if self.device else None,
)

def set_sequence(self):
self.sequence = int(time.perf_counter() * 1000) & 0xFFFFFFFF

def hex(self):
return self.bytes().hex()

Expand Down Expand Up @@ -496,21 +503,32 @@ def bytes(self):

__bytes__ = bytes

async def async_send(self, device):
async def async_send(self, device, retries=4):
device._listeners[self.sequence] = asyncio.Semaphore(0)
await device._async_send(self)
try:
await asyncio.wait_for(
device._listeners[self.sequence].acquire(), timeout=device.timeout
)
except:
del device._listeners[self.sequence]
if retries == 0:
raise ResponseTimeoutException(
"Timed out waiting for response to sequence number {}".format(
self.sequence
)
)

_LOGGER.debug(
"Timed out waiting for response to sequence number {}".format(
"Timed out waiting for response to sequence number {}. Retrying".format(
self.sequence
)
)
del device._listeners[self.sequence]
raise

if self.original_sequence is None:
self.set_sequence()

return self.async_send(device, retries - 1)

return device._listeners.pop(self.sequence)

Expand Down Expand Up @@ -734,9 +752,9 @@ async def _async_handle_message(self):
response_data = await self.reader.readuntil(MAGIC_SUFFIX_BYTES)
message = Message.from_bytes(response_data, self.cipher)
except InvalidMessage as e:
_LOGGER.error("Invalid message from {}: {}".format(self, e))
_LOGGER.debug("Invalid message from {}: {}".format(self, e))
except MessageDecodeFailed as e:
_LOGGER.error("Failed to decrypt message from {}".format(self))
_LOGGER.debug("Failed to decrypt message from {}".format(self))
else:
_LOGGER.debug("Received message from {}: {}".format(self, message))
if message.sequence in self._listeners:
Expand Down

0 comments on commit bea3817

Please sign in to comment.