def tracking_bulk(self, events, token_auth=None):
    """Send several tracking events to Matomo in a single bulk request.

    See https://developer.matomo.org/api-reference/tracking-api#bulk-tracking

    :param events: iterable of dicts, each holding the tracking parameters
        of one event; they are overlaid on ``self.tracking_params``.
    :param token_auth: optional token overriding ``self.token_auth`` for
        this request only.
    :returns: the response object returned by ``requests.post``.
    """
    # Matomo wants every event as the query string that would have been
    # used to track it individually, including the leading '?'.
    # NOTE: this list must not be called ``requests`` -- that would shadow
    # the ``requests`` module used for the POST below.
    encoded_events = [
        f'?{urlencode({**self.tracking_params, **event})}'
        for event in events
    ]

    payload = {'requests': encoded_events}

    # Only include the token when one is configured, so that no JSON null
    # is sent for it.
    token = token_auth or self.token_auth
    if token:
        payload['token_auth'] = token

    # The bulk endpoint reads a JSON object from the request body, so the
    # payload is sent with ``json=`` rather than form-encoded ``data=``.
    return requests.post(self.tracking_url, json=payload)
def matomo_track():
    """Send one batch of queued tracking events to Matomo.

    Takes up to ``MAX_EVENTS_PER_MATOMO_REQUEST`` events off
    ``tracking_queue`` and posts them with ``MatomoAPI.tracking_bulk``.
    Does nothing when the queue is empty. When the
    ``ckanext.matomo.test_mode`` config option is truthy the events are
    only logged, not sent.

    This function is submitted to ``tracking_executor`` and the resulting
    future is not inspected, so failures are logged here before being
    re-raised; otherwise they would disappear silently.
    """
    # Gather events to send. get_nowait() raises Empty once the queue is
    # drained, which ends the loop; a separate empty() check is not needed.
    events = []
    try:
        while len(events) < MAX_EVENTS_PER_MATOMO_REQUEST:
            events.append(tracking_queue.get_nowait())
    except Empty:
        pass  # Queue drained before the batch filled up

    if not events:
        return  # No events to send

    log = logging.getLogger('ckanext.matomo.tracking')

    # NOTE(review): the raw config value is used as a boolean here; if it
    # can arrive as a string such as 'false' it would count as truthy --
    # confirm how this option is declared.
    test_mode = toolkit.config.get('ckanext.matomo.test_mode', False)
    if test_mode:
        log.info("Would send API events to Matomo: %s", events)
        return

    log.info("Sending API events to Matomo: %s", events)
    matomo_url = toolkit.config.get(u'ckanext.matomo.domain')
    matomo_site_id = toolkit.config.get(u'ckanext.matomo.site_id')
    token_auth = toolkit.config.get('ckanext.matomo.token_auth')
    api = MatomoAPI(matomo_url, matomo_site_id, token_auth=token_auth)

    try:
        r = api.tracking_bulk(events)
    except Exception:
        # Worker boundary: log the failure, then re-raise so the future
        # still records the exception.
        log.exception('Failed to post %d tracking events to matomo', len(events))
        raise

    if not r.ok:
        log.warning('Error when posting tracking events to matomo: %s %s', r.status_code, r.reason)
        log.warning('With request: %s', r.url)