From 331b09a484a4fc7d7780eb7d82d4baa49067c35d Mon Sep 17 00:00:00 2001 From: yozik04 Date: Thu, 6 Jul 2023 01:06:53 +0300 Subject: [PATCH 1/3] Retry ws requests 4 additional times --- tests/test_client.py | 34 ++++++++++++++++++++++- vallox_websocket_api/client.py | 49 +++++++++++++++++++++++++++++----- 2 files changed, 76 insertions(+), 7 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index 878be42..51d484d 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -1,7 +1,9 @@ +import asyncio import binascii -from unittest.mock import AsyncMock +from unittest.mock import AsyncMock, patch import pytest +import websockets from websockets.exceptions import InvalidMessage from vallox_websocket_api.client import Client @@ -110,3 +112,33 @@ async def test_set_new_settable_address_by_address_exception(client: Client, ws) with pytest.raises(ValloxWebsocketException): await client.set_values({"A_CYC_RH_VALUE": 22}) + + assert ws.send.call_count == 5 + + +async def test_connection_closed_ws_exception(client: Client, ws): + ws.recv.side_effect = AsyncMock(side_effect=websockets.ConnectionClosed(None, None)) + + with pytest.raises(ValloxWebsocketException): + await client.fetch_metric("A_CYC_ENABLED") + + assert ws.send.call_count == 5 + + +async def test_ws_timeout_exception(client: Client, ws): + ws.recv.side_effect = AsyncMock(side_effect=asyncio.TimeoutError()) + + with pytest.raises(ValloxWebsocketException): + await client.fetch_metric("A_CYC_ENABLED") + + assert ws.send.call_count == 5 + + +async def test_invalid_ws_url_exception(client: Client): + with patch("websockets.connect") as connect: + connect.side_effect = websockets.InvalidURI("test", "test") + + with pytest.raises(ValloxWebsocketException): + await client.fetch_metric("A_CYC_ENABLED") + + assert connect.call_count == 1 diff --git a/vallox_websocket_api/client.py b/vallox_websocket_api/client.py index 66faa50..9eed897 100644 --- a/vallox_websocket_api/client.py +++ b/vallox_websocket_api/client.py @@ -1,5 +1,6 @@ import asyncio from functools import wraps +import logging import re from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union, cast @@ -20,8 +21,14 @@ WriteMessageRequest, ) +logger = logging.getLogger("vallox").getChild(__name__) + KPageSize = 65536 +WEBSOCKETS_OPEN_TIMEOUT = 1 +WEBSOCKETS_RECV_TIMEOUT = 1 +WEBSOCKET_RETRY_DELAYS = [0.1, 0.2, 0.5, 1] + def calculate_offset(aIndex: int) -> int: offset = 0 @@ -147,10 +154,27 @@ def to_kelvin(value: float) -> int: def _websocket_exception_handler(request_fn: FuncT) -> FuncT: + retry_on_exceptions = ( + websockets.InvalidHandshake, + websockets.InvalidState, + websockets.WebSocketProtocolError, + websockets.ConnectionClosed, + OSError, + asyncio.TimeoutError, + ) + @wraps(request_fn) async def wrapped(*args: Any, **kwargs: Any) -> Any: try: - return await request_fn(*args, **kwargs) + delays = WEBSOCKET_RETRY_DELAYS.copy() + while len(delays) >= 0: + try: + return await request_fn(*args, **kwargs) + except Exception as e: + if isinstance(e, retry_on_exceptions) and len(delays) > 0: + await asyncio.sleep(delays.pop(0)) + else: + raise e except websockets.InvalidHandshake as e: raise ValloxWebsocketException("Websocket handshake failed") from e except websockets.InvalidURI as e: @@ -161,8 +185,12 @@ async def wrapped(*args: Any, **kwargs: Any) -> Any: raise ValloxWebsocketException("Websocket invalid state") from e except websockets.WebSocketProtocolError as e: raise ValloxWebsocketException("Websocket protocol error") from e + except websockets.ConnectionClosed as e: + raise ValloxWebsocketException("Websocket connection closed") from e except OSError as e: raise ValloxWebsocketException("Websocket connection failed") from e + except asyncio.TimeoutError as e: + raise ValloxWebsocketException("Websocket connection timed out") from e return cast(FuncT, wrapped) @@ -234,18 +262,27 @@ def _encode_pair( @_websocket_exception_handler async def _websocket_request(self, payload: bytes) -> bytes: - async with websockets.connect(f"ws://{self.ip_address}/") as ws: + async with websockets.connect( + f"ws://{self.ip_address}/", + open_timeout=WEBSOCKETS_OPEN_TIMEOUT, + logger=logger, + ) as ws: await ws.send(payload) - r: bytes = await ws.recv() - return r + task = asyncio.create_task(ws.recv()) + return await asyncio.wait_for(task, timeout=WEBSOCKETS_RECV_TIMEOUT) @_websocket_exception_handler async def _websocket_request_multiple( self, payload: bytes, read_packets: int ) -> List[bytes]: - async with websockets.connect(f"ws://{self.ip_address}/") as ws: + async with websockets.connect( + f"ws://{self.ip_address}/", + open_timeout=WEBSOCKETS_OPEN_TIMEOUT, + logger=logger, + ) as ws: await ws.send(payload) - return await asyncio.gather(*[ws.recv() for _ in range(0, read_packets)]) + tasks = asyncio.gather(*[ws.recv() for _ in range(0, read_packets)]) + return await asyncio.wait_for(tasks, timeout=WEBSOCKETS_RECV_TIMEOUT) async def fetch_metrics( self, metric_keys: Optional[List[str]] = None From 2d3e2d9aa14afbfc7a5b6198b2788bc2629853f8 Mon Sep 17 00:00:00 2001 From: yozik04 Date: Thu, 6 Jul 2023 01:16:20 +0300 Subject: [PATCH 2/3] One more test and version --- tests/test_client.py | 12 +++++++++++- vallox_websocket_api/__init__.py | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/test_client.py b/tests/test_client.py index 51d484d..d8ba2ef 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -125,7 +125,7 @@ async def test_connection_closed_ws_exception(client: Client, ws): assert ws.send.call_count == 5 -async def test_ws_timeout_exception(client: Client, ws): +async def test_ws_recv_timeout_exception(client: Client, ws): ws.recv.side_effect = AsyncMock(side_effect=asyncio.TimeoutError()) with pytest.raises(ValloxWebsocketException): @@ -142,3 +142,13 @@ async def test_invalid_ws_url_exception(client: Client): await client.fetch_metric("A_CYC_ENABLED") assert connect.call_count == 1 + + +async def test_ws_connection_timeout_exception(client: Client): + with patch("websockets.connect") as connect: + connect.side_effect = asyncio.TimeoutError() + + with pytest.raises(ValloxWebsocketException): + await client.fetch_metric("A_CYC_ENABLED") + + assert connect.call_count == 5 diff --git a/vallox_websocket_api/__init__.py b/vallox_websocket_api/__init__.py index 1bd00fd..dc97849 100644 --- a/vallox_websocket_api/__init__.py +++ b/vallox_websocket_api/__init__.py @@ -24,4 +24,4 @@ "ValloxWebsocketException", ] -__version__ = "3.2.1" +__version__ = "3.3.0" From 387ddf931ab032b941dbed22f0d318b3e93cbc4b Mon Sep 17 00:00:00 2001 From: yozik04 Date: Thu, 6 Jul 2023 19:19:31 +0300 Subject: [PATCH 3/3] Read multiple fixes --- vallox_websocket_api/client.py | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/vallox_websocket_api/client.py b/vallox_websocket_api/client.py index 9eed897..69c3282 100644 --- a/vallox_websocket_api/client.py +++ b/vallox_websocket_api/client.py @@ -153,7 +153,7 @@ def to_kelvin(value: float) -> int: FuncT = TypeVar("FuncT", bound=Callable[..., Any]) -def _websocket_exception_handler(request_fn: FuncT) -> FuncT: +def _websocket_retry_wrapper(request_fn: FuncT) -> FuncT: retry_on_exceptions = ( websockets.InvalidHandshake, websockets.InvalidState, @@ -260,18 +260,10 @@ def _encode_pair( return address, raw_value - @_websocket_exception_handler async def _websocket_request(self, payload: bytes) -> bytes: - async with websockets.connect( - f"ws://{self.ip_address}/", - open_timeout=WEBSOCKETS_OPEN_TIMEOUT, - logger=logger, - ) as ws: - await ws.send(payload) - task = asyncio.create_task(ws.recv()) - return await asyncio.wait_for(task, timeout=WEBSOCKETS_RECV_TIMEOUT) + return (await self._websocket_request_multiple(payload, 1))[0] - @_websocket_exception_handler + @_websocket_retry_wrapper async def _websocket_request_multiple( self, payload: bytes, read_packets: int ) -> List[bytes]: @@ -281,8 +273,13 @@ async def _websocket_request_multiple( logger=logger, ) as ws: await ws.send(payload) - tasks = asyncio.gather(*[ws.recv() for _ in range(0, read_packets)]) - return await asyncio.wait_for(tasks, timeout=WEBSOCKETS_RECV_TIMEOUT) + + async def _get_responses() -> List[bytes]: + return [await ws.recv() for _ in range(0, read_packets)] + + return await asyncio.wait_for( + _get_responses(), timeout=WEBSOCKETS_RECV_TIMEOUT * read_packets + ) async def fetch_metrics( self, metric_keys: Optional[List[str]] = None