diff --git a/VERSION b/VERSION index 6acdb44..26e3379 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.0.3 \ No newline at end of file +2.0.4 \ No newline at end of file diff --git a/rogerthat/app/rogerthat.py b/rogerthat/app/rogerthat.py index 65ae333..bbaca81 100644 --- a/rogerthat/app/rogerthat.py +++ b/rogerthat/app/rogerthat.py @@ -24,14 +24,29 @@ def __init__(self): self._ev_loop = None self._serv_task = None + def ensure_future(self, coro, with_timeout=None, *args, **kwargs): + return safe_ensure_future( + coro, + with_timeout=with_timeout, + loop=self._ev_loop, + *args, + **kwargs + ) + + def call_soon_threadsafe(self, *args, **kwargs): + return self._ev_loop.call_soon_threadsafe(*args, **kwargs) + + async def async_run_in_executor(self, *args, **kwargs): + return await self._ev_loop.run_in_executor(*args, **kwargs) + async def Initialise(self): - logger.info("Initialising database.") + logger.debug("Initialising database.") db_started = await database_init.initialise() if not db_started: await asyncio.sleep(0.1) self.shutdown() return - logger.info("Finished initialising database.") + logger.debug("Finished initialising database.") logger.info(splash_msg) def _signal_handler(self, *_): # noqa: N803 @@ -73,12 +88,13 @@ async def exit_loop(self): def shutdown(self): logger.info("Stopping RogerThat Server.") self.shutdown_event.set() - safe_ensure_future(self.exit_loop(), loop=self._ev_loop) + self.ensure_future(self.exit_loop()) def start_queues(self): self._request_queue = request_processing_queue.get_instance() - logger.info("Starting Broadcast Queues.") + logger.debug("Starting Broadcast Queues.") self._mqtt_queue = mqtt_queue.get_instance() + self._mqtt_queue.start() def start_server(self): logger.info(f"RogerThat v{Config.get_inst().version} starting.") diff --git a/rogerthat/db/database_init.py b/rogerthat/db/database_init.py index 20dca81..0fd0d8c 100644 --- a/rogerthat/db/database_init.py +++ b/rogerthat/db/database_init.py @@ -28,22 +28,22 @@ class database_init(): @classmethod async def create_db(cls): - logger.info("Database does not exist yet, creating.") + logger.warning("Database does not exist yet, creating.") async with db_engine.db().engine_root.connect() as conn: await conn.execute(text(f"CREATE DATABASE {Config.get_inst().database_name}")) return True @classmethod async def create_tables(cls): - logger.info("Creating new or missing db tables.") + logger.debug("Creating new or missing db tables.") async with db_engine.db().engine.begin() as conn: await conn.run_sync(cls._meta.create_all) - logger.info("Done creating tables.") + logger.info("Done creating missing database tables.") return True @classmethod async def initialise(cls): - logger.info("Database init.") + logger.debug("Database init.") try: await cls.create_tables() except Exception: @@ -55,17 +55,17 @@ async def initialise(cls): logger.error("Failed to connect to SQL database. Check host and port.") return None - logger.info("Checking alembic.") + logger.debug("Checking alembic.") try: current_revision = fetch_alembic_revision() except Exception as e: logger.error(f"Alembic exception: {e}") current_revision = None if not current_revision: - logger.info("First time init, stamping revision.") + logger.warning("First time init, stamping revision.") alembic_cmd.stamp(cls._alembic_cfg, "head") else: - logger.info("Running alembic migration.") + logger.warning("Running alembic migration.") alembic_cmd.upgrade(cls._alembic_cfg, "head") return True diff --git a/rogerthat/mqtt/mqtt.py b/rogerthat/mqtt/mqtt.py index e426461..9aaf93d 100644 --- a/rogerthat/mqtt/mqtt.py +++ b/rogerthat/mqtt/mqtt.py @@ -1,10 +1,13 @@ #!/usr/bin/env python +import asyncio +import threading from typing import TYPE_CHECKING from commlib.node import Node from commlib.transports.mqtt import ConnectionParameters as MQTTConnectionParameters +from rogerthat.app.delegate import App from rogerthat.config.config import Config from rogerthat.logging.configure import AsyncioLogger from rogerthat.mqtt.messages import TradingviewMessage @@ -21,6 +24,7 @@ class MQTTPublisher: def __init__(self, topic: str, mqtt_node: Node): + self._initial_connection_completed = False self._node = mqtt_node @@ -29,11 +33,28 @@ def __init__(self, self.publisher = self._node.create_publisher( topic=self._topic, msg_type=TradingviewMessage ) + self.publisher.run() + + @property + def is_ready(self): + return self._initial_connection_completed + + @property + def is_connected(self): + is_connected = self.publisher._transport.is_connected + + if not self._initial_connection_completed and is_connected: + self._initial_connection_completed = True + + return is_connected def broadcast(self, event: "tradingview_event"): logger.debug(f"Broadcasting MQTT event on {self._topic}: {event}") self.publisher.publish(event) + def __del__(self): + self.publisher.stop() + class MQTTGateway(Node): NODE_NAME = "$APP.$UID" @@ -41,6 +62,12 @@ class MQTTGateway(Node): def __init__(self, *args, **kwargs): + self._initial_connection_completed = False + self._health = False + self._gateway_ready = asyncio.Event() + self._stop_event_async = asyncio.Event() + self._restart_heartbeat_event_async = asyncio.Event() + self.mqtt_publisher = None self.HEARTBEAT_URI = f"{Config.get_inst().app_name}/{Config.get_inst().mqtt_instance_name}/hb" @@ -60,9 +87,17 @@ def __init__(self, **kwargs ) + @property + def health(self): + return self._health + + async def async_set_ready(self): + await asyncio.sleep(5) + self._gateway_ready.set() + def get_publisher_for(self, topic: str): if topic not in self._topic_publishers: - logger.info(f"Starting MQTT Publisher for {topic}") + logger.debug(f"Starting MQTT Publisher for {topic}") self._topic_publishers[topic] = MQTTPublisher(topic=topic, mqtt_node=self) return self._topic_publishers[topic] @@ -74,3 +109,98 @@ def _create_mqtt_params_from_conf(self): password=Config.get_inst().mqtt_password, ssl=Config.get_inst().mqtt_ssl ) + + def _start_health_monitoring_loop(self): + if threading.current_thread() != threading.main_thread(): # pragma: no cover + App.get_instance().call_soon_threadsafe(self._start_health_monitoring_loop) + return + self._stop_event_async.clear() + App.get_instance().ensure_future(self._monitor_health_loop()) + + async def _restart_heartbeat(self): + await asyncio.sleep(3) + + if not self._hb_thread._heartbeat_pub._transport.is_connected: + logger.warning("Restarting heartbeat thread.") + self._hb_thread.stop() + await asyncio.sleep(3) + + try: + self._init_heartbeat_thread() + await asyncio.sleep(5) + logger.warning("Heartbeat thread restarted.") + + except Exception: + await asyncio.sleep(5) + + self._restart_heartbeat_event_async.clear() + + def _check_connections(self) -> bool: + connected = True + + # Check heartbeat + if ( + not self._hb_thread or + self._hb_thread.stopped() or + not self._hb_thread._heartbeat_pub._transport.is_connected + ): + if ( + self._initial_connection_completed and + not self._restart_heartbeat_event_async.is_set() and + self._hb_thread and ( + self._hb_thread.stopped() or + not self._hb_thread._heartbeat_pub._transport.is_connected + ) + ): + self._restart_heartbeat_event_async.set() + App.get_instance().ensure_future(self._restart_heartbeat()) + + connected = False + + # Check Publishers + topic_keys = list(self._topic_publishers.keys()) + + for topic in topic_keys: + + p = self._topic_publishers[topic] + + if not p.is_connected: + if p.is_ready and self._initial_connection_completed: + logger.debug(f"Restarting publisher thread on {topic}.") + del self._topic_publishers[topic] + + connected = False + + return connected + + async def _monitor_health_loop(self, period: float = 3.0): + await self._gateway_ready.wait() + logger.debug("Started MQTT health monitoring.") + while not self._stop_event_async.is_set(): + self._health = await App.get_instance().async_run_in_executor( + None, self._check_connections) + + if self._health: + if not self._initial_connection_completed: + self._initial_connection_completed = True + + await asyncio.sleep(period) + else: + if self._initial_connection_completed: + logger.warning("MQTT Health check failed. Services should be restarting.") + + await asyncio.sleep(10.0) + + def _stop_health_monitoring_loop(self): + self._stop_event_async.set() + + def start(self) -> None: + self.run() + self._start_health_monitoring_loop() + + def stop(self): + self._stop_health_monitoring_loop() + super().stop() + + def __del__(self): + self.stop() diff --git a/rogerthat/queues/mqtt_queue.py b/rogerthat/queues/mqtt_queue.py index 200b5d3..69e509c 100644 --- a/rogerthat/queues/mqtt_queue.py +++ b/rogerthat/queues/mqtt_queue.py @@ -27,48 +27,63 @@ def __init__(self): self._failure_msg = "Failed to connect to MQTT Broker!" self._is_ready = False - if Config.get_inst().mqtt_enable: - try: - self._mqtt = MQTTGateway() - self._mqtt.run() - self.start() - except (ConnectionRefusedError, gaierror, OSError) as e: - if self._mqtt is not None: - extra_debug = f" Parameters: {self._mqtt._params}" - else: - extra_debug = "" - logger.error(f"{self._failure_msg} Check host and port! - {e}.{extra_debug}") - except SSLEOFError: - logger.error(f"{self._failure_msg} Using plain HTTP port with SSL enabled!") - except SSLCertVerificationError: - logger.error(f"{self._failure_msg} You need to set up your SSL certificates correctly!") - except Exception as e: - logger.error(e) - raise e - if self._is_ready: - logger.info("MQTT Gateway is ready.") - def _create_queue(self): self._mqtt_queue = asyncio.Queue() def start(self): + if not Config.get_inst().mqtt_enable: + return + + try: + self._mqtt = MQTTGateway() + self._mqtt.start() + self._start_queue_tasks() + except (ConnectionRefusedError, gaierror, OSError) as e: + if self._mqtt is not None: + extra_debug = f" Parameters: {self._mqtt._params}" + else: + extra_debug = "" + logger.error(f"{self._failure_msg} Check host and port! - {e}.{extra_debug}") + except SSLEOFError: + logger.error(f"{self._failure_msg} Using plain HTTP port with SSL enabled!") + except SSLCertVerificationError: + logger.error(f"{self._failure_msg} You need to set up your SSL certificates correctly!") + except Exception as e: + logger.error(e) + raise e + if self._is_ready: + logger.debug("MQTT Queue is ready.") + + def _start_queue_tasks(self): if self._mqtt: self._create_queue() self._mqtt_queue_task = safe_ensure_future( self._listen_for_broadcasts() ) self._is_ready = True + safe_ensure_future(self._mqtt.async_set_ready()) def stop(self): if self._mqtt_queue_task is not None: self._mqtt_queue_task.cancel() self._mqtt_queue_task = None logger.debug("MQTT Queue stopped.") + if self._mqtt: + self._mqtt.stop() async def _listen_for_broadcasts(self): if not self._mqtt: raise Exception("listen_for_broadcasts called but mqtt is not enabled!") while True: + if not self._mqtt.health: + if not self._mqtt_queue.empty(): + logger.warning("MQTT not ready for broadcast, sleeping 5s before retry.") + + await asyncio.sleep(5) + continue + + msg = None + try: msg = await self._mqtt_queue.get() publisher = self._mqtt.get_publisher_for(msg.topic) @@ -79,10 +94,13 @@ async def _listen_for_broadcasts(self): tb = "".join(traceback.TracebackException.from_exception(e).format()) logger.error(f"Error in mqtt_queue: {e}\n{tb}") + if msg and self._mqtt_queue: + self._mqtt_queue.put_nowait(msg) + def broadcast(self, event): if not self._mqtt_queue: - logger.error("Cannot broadcast, MQTT not started!") + logger.error("Cannot broadcast, MQTT Queue not started!") return - if self._mqtt: - logger.debug("Adding event to MQTT queue.") - self._mqtt_queue.put_nowait(event) + + logger.debug("Adding event to MQTT queue.") + self._mqtt_queue.put_nowait(event)