diff --git a/images/child-device/child.dockerfile b/images/child-device/child.dockerfile index 176d163..d6d98cf 100644 --- a/images/child-device/child.dockerfile +++ b/images/child-device/child.dockerfile @@ -1,10 +1,9 @@ -FROM python:3.10 -WORKDIR /usr/src/app -COPY requirements.txt ./ -RUN pip install --no-cache-dir -r requirements.txt +FROM ghcr.io/thin-edge/python-tedge-agent:0.0.1 ENV CONNECTOR_TEDGE_HOST=tedge ENV CONNECTOR_TEDGE_API=http://tedge:8000 -COPY . . -CMD [ "python", "-m", "connector" ] \ No newline at end of file +COPY config/* /data/config/ +COPY tedge-configuration-plugin.json /data/config/ +# Change working directory so the tedge-configuration-plugin file is read by default +WORKDIR /data/config diff --git a/images/child-device/connector/__init__.py b/images/child-device/connector/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/images/child-device/connector/__main__.py b/images/child-device/connector/__main__.py deleted file mode 100644 index cfb6808..0000000 --- a/images/child-device/connector/__main__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""Module entrypoint""" - -from .app import App - -App().run() diff --git a/images/child-device/connector/app.py b/images/child-device/connector/app.py deleted file mode 100644 index 116b36e..0000000 --- a/images/child-device/connector/app.py +++ /dev/null @@ -1,62 +0,0 @@ -"""Connector application""" -import logging -import queue -import threading -import os -import time -import sys -from .config import Config -from .client import TedgeClient -from .management.metrics import collect_metrics - -# Set sensible logging defaults -log = logging.getLogger() -log.setLevel(logging.INFO) -handler = logging.StreamHandler() -handler.setLevel(logging.INFO) -formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") -handler.setFormatter(formatter) -log.addHandler(handler) - - -class App: - """Connection Application""" - - # pylint: disable=too-few-public-methods - def run(self): - """Main application loop""" - # pylint: disable=broad-except - metrics_queue = queue.SimpleQueue() - config = Config() - - file = os.getenv("CONNECTOR_SETTINGS", "./config/connector.ini") - if os.path.exists(file): - config.load_file(file) - config.load_env() - client = TedgeClient(config) - - while True: - try: - client.connect() - client.bootstrap() - client.subscribe() - - metrics_thread = threading.Thread( - target=collect_metrics, args=(client, metrics_queue), daemon=True - ) - metrics_thread.start() - while True: - time.sleep(1) - # client.loop_forever() - except ConnectionRefusedError: - log.info("MQTT broker is not ready yet") - except KeyboardInterrupt: - log.info("Exiting...") - if client: - client.shutdown() - sys.exit(0) - except Exception as ex: - log.info("Unexpected error. %s", ex) - - # Wait before trying again - time.sleep(5) diff --git a/images/child-device/connector/client.py b/images/child-device/connector/client.py deleted file mode 100644 index c85ff6b..0000000 --- a/images/child-device/connector/client.py +++ /dev/null @@ -1,210 +0,0 @@ -"""thin-edge.io client""" -import logging -import json -import time -import os -import threading -from typing import Any, List -from paho.mqtt.client import Client, MQTTMessage -from .config import Config -from .topics import health_topic -from .management.worker import Worker, Job -from .management import configuration, firmware -from .messages import JSONMessage -from .management.operation import OperationStatus - - -log = logging.getLogger(__file__) - - -class TedgeClient: - """Tedge Client - - The tedge client is used to communicate with thin-edge.io via MQTT and HTTP - """ - - def __init__(self, config: Config) -> None: - self.mqtt = None - self.config = config - self._workers: List[Worker] = [] - self.config.device_id = self.get_id() - self._subscriptions = [] - self._connected_once = threading.Event() - - def shutdown(self, worker_timeout: float = 10): - """Shutdown client including any workers in progress - - Args: - worker_timeout(float): Timeout in seconds to wait for - each worker (individually). Defaults to 10. - """ - if self.mqtt and self.mqtt.is_connected(): - self.mqtt.disconnect() - self.mqtt.loop_stop(True) - - # Stop all workers - for worker in self._workers: - worker.join(worker_timeout if worker_timeout and worker_timeout > 0 else 10) - - # Clear workers - self._workers = [] - self.mqtt = None - - def get_id(self): - """Get the id to be used for the connector""" - return ( - os.getenv("CONNECTOR_DEVICE_ID") - or os.getenv("HOSTNAME") - or os.getenv("HOST") - or "tedge_child" - ) - - def connect(self): - """Connect to the thin-edge.io MQTT broker""" - if self.mqtt is not None: - log.info( - "MQTT client already exists. connected=%s", self.mqtt.is_connected() - ) - return - - if self.mqtt is None: - # Don't use a clean session so no messages will go missing - client = Client(self.config.device_id, clean_session=False) - client.reconnect_delay_set(10, 120) - if self.config.device_id: - client.will_set( - health_topic( - topic_id=f"device/{self.config.device_id}/service/connector" - ), - json.dumps({"status": "down"}), - ) - - def _create_on_connect_callback(done): - _done = done - - def on_connect(_client, _userdata, _flags, result_code): - nonlocal _done - if result_code == 0: - log.info("Connected to MQTT Broker!") - _done.set() - else: - log.info("Failed to connect. code=%d", result_code) - - return on_connect - - def on_disconnect(_client: Client, _userdata: Any, result_code: int): - log.info("Client was disconnected. result_code=%d", result_code) - - self._connected_once.clear() - client.on_connect = _create_on_connect_callback(self._connected_once) - client.on_disconnect = on_disconnect - # Enable paho mqtt logs to help with any mqtt connection debugging - client.enable_logger(log) - log.info( - "Trying to connect to the MQTT broker: host=%s:%s, client_id=%s", - self.config.tedge.host, - self.config.tedge.port, - self.config.device_id, - ) - - client.connect(self.config.tedge.host, self.config.tedge.port) - client.loop_start() - - # Only assign client after .connect call (as it can throw an error if the address is not reachable) - self.mqtt = client - - if not self._connected_once.wait(30): - log.warning( - "Failed to connect successfully after 30 seconds. Continuing anyway" - ) - # TODO: Should an exception be thrown, or just let paho do the reconnect eventually - # self.shutdown() - # raise RuntimeError("Failed to connect successfully to MQTT broker") - - def bootstrap(self): - """Register extra services once the mqtt client is up""" - - self.mqtt.publish( - f"te/device/{self.config.device_id}//", - json.dumps( - { - "@type": "child-device", - "name": self.config.device_id, - "type": "python-connector", - } - ), - retain=True, - qos=1, - ) - # wait for registration to be processed - time.sleep(5) - configuration.bootstrap(self.config, self.mqtt) - firmware.bootstrap(self.config, self.mqtt) - - def subscribe(self): - """Subscribe to thin-edge.io child device topics and register - handlers to respond to different operations. - """ - # get config handler - handlers = [ - ( - f"te/device/{self.config.device_id}///cmd/config_snapshot/+", - configuration.on_config_snapshot_request, - ), - ( - f"te/device/{self.config.device_id}///cmd/config_update/+", - configuration.on_config_update_request, - ), - ( - # TODO: Update to te/ topic once c8y-firmware-plugin has been updated - f"tedge/+/commands/req/firmware_update", - firmware.on_firmware_update_request, - ), - ] - for topic, handler in handlers: - log.info("Registering worker. topic=%s", topic) - self.register_worker(topic, handler) - - # Only register that the child device is ready now - # Register health check and bootstrap other plugin settings - log.info( - "Publishing health endpoint. device=%s, service=connector", - self.config.device_id, - ) - self.mqtt.publish( - health_topic(topic_id=f"device/{self.config.device_id}/service/connector"), - json.dumps({"status": "up"}), - ) - - def register_worker(self, topic: str, target: Job, num_threads: int = 1): - """Register a worker to handle requests for a specific MQTT topic - - Args: - topic (str): MQTT topic - target (Any): Job function to execute for the worker - num_threads (int, optional): Number of threads. Defaults to 1. - """ - worker = Worker(target, num_threads=num_threads) - worker.start() - - def add_job(client, _userdata, message: MQTTMessage): - if len(message.payload) == 0: - # Message is being cleared - return - - payload = json.loads(message.payload.decode()) - status = payload.get("status", "") - is_legacy = message.topic.startswith("tedge/") - if not is_legacy and status != OperationStatus.INIT: - return - - log.info("Adding job") - worker.put(self.config, client, JSONMessage(message.topic, payload)) - - self.mqtt.message_callback_add(topic, add_job) - self.mqtt.subscribe(topic, qos=2) - self._workers.append(worker) - - def loop_forever(self): - """Block infinitely""" - self.mqtt.loop_forever() diff --git a/images/child-device/connector/config.py b/images/child-device/connector/config.py deleted file mode 100644 index e8ed1ab..0000000 --- a/images/child-device/connector/config.py +++ /dev/null @@ -1,92 +0,0 @@ -"""Connector configuration -""" -import os -from dataclasses import dataclass, field -from configparser import ConfigParser - - -@dataclass -class Configuration: - """Configuration settings""" - - download_timeout: float = 600.0 - upload_timeout: float = 600.0 - type: str = "tedge-configuration-plugin" - path: str = "./tedge-configuration-plugin.json" - - -@dataclass -class Firmware: - """Firmware settings""" - - download_timeout: float = 600.0 - - -@dataclass -class Tedge: - """Tedge settings""" - - host: str = "localhost" - port: int = 1883 - api: str = "http://localhost:8000" - - -@dataclass -class Metrics: - """Metric settings""" - - interval: float = 5.0 - - -@dataclass -class Config: - """Configuration class to load the connector's config""" - - device_id: str = None - - tedge: Tedge = field(default_factory=Tedge) - configuration: Configuration = field(default_factory=Configuration) - firmware: Firmware = field(default_factory=Firmware) - metrics: Metrics = field(default_factory=Metrics) - - def load_file(self, path: str): - """Load configuration from file - - Args: - path (str): Configuration file to load settings from - """ - config = ConfigParser() - config.read(path, encoding="utf8") - - for section in config.sections(): - if hasattr(self, section): - prop_section = getattr(self, section) - - for option in config.options(section): - if hasattr(prop_section, option): - existing_value = getattr(prop_section, option) - - if isinstance(existing_value, int): - new_value = config.getint(section, option) - elif isinstance(existing_value, float): - new_value = config.getfloat(section, option) - elif isinstance(existing_value, bool): - new_value = config.getboolean(section, option) - else: # default to string - new_value = config.get(section, option) - - setattr(prop_section, option, new_value) - - def load_env(self): - """Load connector configuration from environment variables""" - prefix = "CONNECTOR_" - for key, value in os.environ.items(): - if not key.startswith(prefix) or not value: - continue - - parts = key[len(prefix) :].lower().split("_", maxsplit=1) - if len(parts) == 2: - if hasattr(self, parts[0]): - section = getattr(self, parts[0]) - if hasattr(section, parts[1]): - setattr(section, parts[1], value) diff --git a/images/child-device/connector/management/__init__.py b/images/child-device/connector/management/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/images/child-device/connector/management/configuration.py b/images/child-device/connector/management/configuration.py deleted file mode 100644 index 2cfcda4..0000000 --- a/images/child-device/connector/management/configuration.py +++ /dev/null @@ -1,161 +0,0 @@ -"""Configuration handler to get and set configuration files""" -from dataclasses import dataclass -import logging -import json -import os -import shutil -import tempfile -from pathlib import Path -from typing import Dict, Any -import requests -from paho.mqtt.client import Client -from .operation import OperationFlow, OperationStatus -from ..config import Config -from ..messages import JSONMessage - -log = logging.getLogger(__name__) - - -@dataclass -class ConfigurationOperation: - """Configuration operation""" - - # pylint: disable=too-few-public-methods - status: str = None - type: str = None - path: str = None - remoteUrl: str = None - tedgeUrl: str = None - - @classmethod - def from_payload(cls, config: Config, payload: Dict[str, Any]): - """Convert a payload into a typed operation""" - data = cls() - for key, value in payload.items(): - if hasattr(data, key): - setattr(data, key, value) - - data.type = data.path if not data.type else data.type - if data.type == config.configuration.type: - data.path = config.configuration.path - return data - - -def bootstrap(config: Config, client: Client): - """Bootstrap configuration settings by sending the - available configuration files to thin-edge.io - - Args: - config (Config): Connection configuration - client (Client): MQTT client - """ - if not os.path.exists(config.configuration.path): - log.info( - "Skipping configuration bootstrap as file does not exist. path=%s", - config.configuration.path, - ) - return - - plugin_config = load_plugin_config(config) - - log.info("Registering support for config_snapshot and config_update") - cmd_payload = json.dumps( - { - # TODO: Read from the types of configuration from file - "types": sorted(plugin_config.keys()), - } - ) - client.publish( - f"te/device/{config.device_id}///cmd/config_snapshot", - cmd_payload, - retain=True, - qos=1, - ) - client.publish( - f"te/device/{config.device_id}///cmd/config_update", - cmd_payload, - retain=True, - qos=1, - ) - - -def load_plugin_config(config: Config) -> Dict[str, Any]: - plugin_config = json.loads( - Path(config.configuration.path).read_text(encoding="utf8") - ) - data = { - config.configuration.type: { - "type": config.configuration.type, - "path": config.configuration.path, - }, - } - for item in plugin_config.get("files", []): - data[item["type"]] = item - return data - - -def on_config_snapshot_request(config: Config, client: Client, msg: JSONMessage): - """Get configuration operation handler""" - payload = ConfigurationOperation.from_payload(config, msg.payload) - - if payload.status != OperationStatus.INIT: - return - - plugin_config = load_plugin_config(config) - - with OperationFlow(client, msg.topic, payload): - if payload.type not in plugin_config: - raise RuntimeError(f"Unknown configuration file type. type={payload.type}") - - file_config = plugin_config.get(payload.type) - path = file_config["path"] - - if not os.path.exists(path): - raise FileNotFoundError(f"File was not found. path={path}") - - # Upload the requested file - log.info("Uploading the config file. url=%s, path=%s", payload.tedgeUrl, path) - with open(path, "rb") as file: - response = requests.put( - payload.tedgeUrl, data=file, timeout=config.configuration.upload_timeout - ) - log.info("url=%s, status_code=%d", payload.tedgeUrl, response.status_code) - - -def on_config_update_request(config: Config, client: Client, msg: JSONMessage): - """Set configuration operation handler""" - payload = ConfigurationOperation.from_payload(config, msg.payload) - - if payload.status != OperationStatus.INIT: - return - - plugin_config = load_plugin_config(config) - - with OperationFlow(client, msg.topic, payload): - if payload.type not in plugin_config: - raise RuntimeError(f"Unknown configuration file type. type={payload.type}") - - file_config = plugin_config.get(payload.type) - path = file_config["path"] - - # Download the config file update from tedge - log.info("Downloading configuration. url=%s", payload.remoteUrl) - response = requests.get( - payload.remoteUrl, timeout=config.configuration.download_timeout - ) - log.debug( - "response: %s, status_code=%d", response.content, response.status_code - ) - - with tempfile.NamedTemporaryFile( - prefix=payload.type, delete=False - ) as target_path: - log.info("temp_path=%s", target_path.name) - target_path.write(response.content) - target_path.close() - # Replace the existing config file with the updated file downloaded from tedge - shutil.move(target_path.name, path) - - if payload.type == config.configuration.type: - log.info("Re-reading plugin configuration") - bootstrap(config, client) diff --git a/images/child-device/connector/management/firmware.py b/images/child-device/connector/management/firmware.py deleted file mode 100644 index b5c0ab9..0000000 --- a/images/child-device/connector/management/firmware.py +++ /dev/null @@ -1,129 +0,0 @@ -"""Firmware handler""" -import logging -import json -import tempfile -import time -from dataclasses import dataclass -from datetime import datetime, timedelta -import requests -from paho.mqtt.client import Client -from .operation import OperationFlow -from ..config import Config -from ..messages import JSONMessage -from ..topics import event_topic - - -log = logging.getLogger(__name__) - - -@dataclass -class FirmwareOperation: - """Operation data structure""" - - # pylint: disable=invalid-name - id: str = None - url: str = None - attempt: int = None - name: str = None - version: str = None - url: str = None - sha256: str = None - - @classmethod - def from_payload(cls, payload): - """Convert a payload into a typed operation""" - data = cls() - for key, value in payload.items(): - if hasattr(data, key): - setattr(data, key, value) - return data - - -def bootstrap(config: Config, client: Client): - """Bootstrap firmware operation - - Args: - config (Config): Connection configuration - client (Client): MQTT client - """ - client.publish( - f"te/device/{config.device_id}///cmd/firmware_update", "{}", retain=True, qos=1 - ) - - -def on_firmware_update_request(config: Config, client: Client, msg: JSONMessage): - """Set firmware operation handler""" - # TODO: replace once firmware_update operation has been ported to the v1 API, e.g. te/+/+/+/+/cmd/firmware_update/+ - is_legacy_topic = msg.topic.startswith("tedge/") - if is_legacy_topic and not str(msg.topic.split("/")[1]).endswith( - f":{config.device_id}" - ): - # ignore operation as it is not for us - log.info(f"Ignoring message. device_id=%s, topic=%s", config.device_id, msg.topic) - return - - payload = FirmwareOperation.from_payload(msg.payload) - - topic = msg.topic - if is_legacy_topic: - topic = msg.topic.replace("/req/firmware_update", "/res/firmware_update") - - with OperationFlow(client, topic, payload): - log.info("Downloading firmware. url=%s", payload.url) - with tempfile.NamedTemporaryFile( - prefix=payload.id, delete=False - ) as target_path: - # stream download so it does not have to save everything to memory - with requests.get( - payload.url, stream=True, timeout=config.firmware.download_timeout - ) as req: - req.raise_for_status() - for chunk in req.iter_content(chunk_size=8192): - target_path.write(chunk) - - target_path.close() - log.info("Firmware file downloaded to: %s", target_path.name) - - start_time = time.monotonic() - # Optional: Send an event indicating that the actually - client.publish( - event_topic( - "te", f"device/{config.device_id}//", "firmware_update_start" - ), - json.dumps( - { - "text": f"Applying firmware: {payload.name}={payload.version}", - "name": payload.name, - "version": payload.version, - "time": datetime.now().isoformat() + "Z", - } - ), - ) - - # Add whatever you want to do with the file here! - # Simulate some work by sleeping - work_duration = 10 - while True: - if time.monotonic() - start_time > work_duration: - break - _ = 100 * 100 - # time.sleep(10) - - # Optional: Send an event once the update is done with a duration how long it took - duration = timedelta(seconds=int(time.monotonic() - start_time)) - client.publish( - event_topic( - "te", f"device/{config.device_id}//", "firmware_update_done" - ), - json.dumps( - { - "text": ( - f"Finished applying firmware: {payload.name}={payload.version}" - f", duration={duration}" - ), - "name": payload.name, - "version": payload.version, - "time": datetime.now().isoformat() + "Z", - } - ), - ) diff --git a/images/child-device/connector/management/metrics.py b/images/child-device/connector/management/metrics.py deleted file mode 100644 index 653054c..0000000 --- a/images/child-device/connector/management/metrics.py +++ /dev/null @@ -1,56 +0,0 @@ -"""Metric handler""" -import json -import logging -import queue - -import psutil - -# from paho.mqtt.client import Client -from ..client import TedgeClient - -from ..topics import measurement_topic - -log = logging.getLogger(__name__) - - -def collect_metrics(client: TedgeClient, settings: queue.SimpleQueue): - """Collect metrics about the child device - - The function should be called in a background thread. - - Args: - client (Client): MQTT Client - settings (queue.SimpleQueue): Settings queue, which can be used - to the new interval to the control how often the metrics are - gathered. - """ - timeout = 5 - while True: - # pylint: disable=broad-except - try: - try: - # use a queue to limit how often the collection is run - timeout = settings.get(timeout=timeout) - except queue.Empty: - pass - log.debug("Checking metrics") - disk_root_usage = psutil.disk_usage("/").percent - cpu_usage = psutil.cpu_percent() - client.mqtt.publish( - measurement_topic( - topic_id=f"device/{client.config.device_id}//", - type="resource_usage", - ), - json.dumps( - { - "cpu": { - "percent-used": cpu_usage, - }, - "df-root": { - "percent-used": disk_root_usage, - }, - } - ), - ) - except Exception as ex: - log.warning("Unexpected error. %s", ex, exc_info=True) diff --git a/images/child-device/connector/management/operation.py b/images/child-device/connector/management/operation.py deleted file mode 100644 index 477ff0a..0000000 --- a/images/child-device/connector/management/operation.py +++ /dev/null @@ -1,96 +0,0 @@ -"""Operation utilities""" -import json -import logging -from typing import Any -from paho.mqtt.client import Client - -log = logging.getLogger(__name__) - - -class OperationStatus: - """Operation statuses""" - - # pylint: disable=too-few-public-methods - INIT = "init" - EXECUTING = "executing" - SUCCESSFUL = "successful" - FAILED = "failed" - - -class OperationFlow: - """Operation flow handler. The class is responsible for handling the operation - transitions. - - The class will automatically set the operation to successful if no exception - occur, and set it to failed if any exception occur. The idea it to reliably - transition the operations statuses and not let the user handle all of the - scenarios. - """ - - # pylint: disable=too-many-arguments - def __init__( - self, - client: Client, - topic: str, - payload: Any = None, - skip_executing: bool = False, - exceptions=Exception, - ) -> None: - self.client = client - self.topic = topic - self.request = payload - self._skip_executing = skip_executing - self._exceptions = exceptions - - def executing(self): - """Transition operation to the executing state""" - payload = { - **self.request.__dict__, - "status": OperationStatus.EXECUTING, - } - log.info( - "Setting %s to %s. topic=%s, payload=%s", - self.__class__.__name__, - OperationStatus.EXECUTING, - self.topic, - payload, - ) - self.client.publish(self.topic, json.dumps(payload)) - - def finished(self, status: str, reason: str = None): - """Transition operation to the executing state - - Args: - status (str): Operation status - reason (str, optional): Failure reason - """ - payload = { - **self.request.__dict__, - "status": status, - } - if reason: - payload["reason"] = reason - - log.info( - "Updating %s status. topic=%s, payload=%s", - self.__class__.__name__, - self.topic, - payload, - ) - self.client.publish(self.topic, json.dumps(payload)) - - def __enter__(self): - # Set operation to executing - if not self._skip_executing: - self.executing() - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - # set operation to either successful or failed - if exc_type: - log.warning("Operation failed. %s %s", exc_type, exc_val, exc_info=True) - self.finished(OperationStatus.FAILED, str(exc_val)) - else: - self.finished(OperationStatus.SUCCESSFUL) - - return exc_type is not None and issubclass(exc_type, self._exceptions) diff --git a/images/child-device/connector/management/worker.py b/images/child-device/connector/management/worker.py deleted file mode 100644 index 1b9e930..0000000 --- a/images/child-device/connector/management/worker.py +++ /dev/null @@ -1,82 +0,0 @@ -"""Worker class to process tasks in the background""" -import logging -import queue -import threading -from typing import Any, List, Callable - -from paho.mqtt.client import Client -from ..messages import JSONMessage -from ..config import Config - -Job = Callable[[Config, Client, JSONMessage], None] - - -class Worker: - """Worker thread to process work""" - - def __init__( - self, - target: Job, - num_threads: int = 1, - ) -> None: - """ - Args: - target (worker_func, None]): Callback that will - be called on each job received - num_threads (int, optional): Number of threads to use to - process the jobs. Defaults to 1. - """ - self.queue = queue.SimpleQueue() - self.target = target - self.name = getattr(target, "__name__", "worker") - self._num_threads = num_threads - self._threads: List[threading.Thread] = [] - self._log = logging.getLogger() - - def start(self): - """Start the background worker threads""" - for _ in range(self._num_threads): - thread = threading.Thread(target=self.run, name=self.name) - thread.start() - self._threads.append(thread) - - def join(self, timeout: float = None): - """Send shutdown signal to all workers and wait for them to stop - - Args: - timeout (float, optional): Timeout in seconds. Defaults to None. - """ - # Send shutdown signal (empty message) - for thread in self._threads: - self.queue.put(None) - - # Wait for threads to exit - for thread in self._threads: - thread.join(timeout=timeout) - - self._threads = [] - - def put(self, config: Config, client: Any, message: JSONMessage): - """Add job to queue - - Args: - client (Client): MQTT Client - message (MQTTMessage): MQTT Message - """ - self.queue.put((config, client, message)) - - def run(self): - """Process jobs by reading from the job queue""" - while True: - data = self.queue.get() - if not data: - self._log.info("Shutting down worker thread") - break - - config, client, message = data - self._log.info( - "%s: new message from queue. message=%s", - self.name, - message.payload, - ) - self.target(config, client, message) diff --git a/images/child-device/connector/messages.py b/images/child-device/connector/messages.py deleted file mode 100644 index d0ecda3..0000000 --- a/images/child-device/connector/messages.py +++ /dev/null @@ -1,12 +0,0 @@ -"""Messages""" - -from dataclasses import dataclass -from typing import Dict, Any - - -@dataclass -class JSONMessage: - """JSON Message""" - - topic: str - payload: Dict[str, Any] diff --git a/images/child-device/connector/topics.py b/images/child-device/connector/topics.py deleted file mode 100644 index 155ec1c..0000000 --- a/images/child-device/connector/topics.py +++ /dev/null @@ -1,41 +0,0 @@ -"""MQTT Topics""" - - -def health_topic(root_topic: str = "te", topic_id: str = "device/main//") -> str: - """Health topic for the child device""" - return "/".join( - [ - root_topic, - topic_id, - "status", - "health", - ] - ) - - -def measurement_topic( - root_topic: str = "te", topic_id: str = "device/main//", type: str = "" -) -> str: - """Measurement topic for the child device""" - return "/".join( - [ - root_topic, - topic_id, - "m", - type, - ] - ) - - -def event_topic( - root_topic: str = "te", topic_id: str = "device/main//", type: str = "" -) -> str: - """Event topic for the child device""" - return "/".join( - [ - root_topic, - topic_id, - "e", - type, - ] - ) diff --git a/images/child-device/tedge-configuration-plugin.json b/images/child-device/tedge-configuration-plugin.json index 04a50b4..ac4579c 100644 --- a/images/child-device/tedge-configuration-plugin.json +++ b/images/child-device/tedge-configuration-plugin.json @@ -1,6 +1,6 @@ { "files": [ - { "path": "/usr/src/app/config/connector.ini", "type": "connector"}, - { "path": "/usr/src/app/config/modem.json", "type": "modem"} + { "path": "/data/config/connector.ini", "type": "connector"}, + { "path": "/data/config/modem.json", "type": "modem"} ] }