Skip to content

Commit f22c6d9

Browse files
committed
Improve performance of datapoint ingestion
Several users reported timeouts logged by Druid Historical/MiddleManager daemons when their traffic is very high (around 1500 msg/s). The exporter was built with a more conservative use case in mind (Wikimedia's), so the default synchronous, single-process make_server() has been sufficient up to now. This patch separates datapoint ingestion (namely traffic coming from Druid daemons towards the exporter) from datapoint processing, using a (thread-safe) queue. It also switches to gevent's WSGIServer implementation, which uses coroutines/greenlets and is able to sustain much more (concurrent) traffic. GH issue: #11 Change-Id: I4b335a1f663957277fe0c443492c4000bbbcac89
1 parent c8b6d1f commit f22c6d9

File tree

3 files changed

+28
-9
lines changed

3 files changed

+28
-9
lines changed

druid_exporter/collector.py

+24-6
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
# limitations under the License.
1515

1616
import logging
17+
import queue
18+
import threading
1719

1820
from collections import defaultdict
1921
from prometheus_client.core import (CounterMetricFamily, GaugeMetricFamily,
@@ -28,6 +30,17 @@ class DruidCollector(object):
2830
'druid_scrape_duration_seconds', 'Druid scrape duration')
2931

3032
def __init__(self, metrics_config):
33+
34+
# The ingestion of the datapoints is separated from their processing,
35+
# to separate concerns and avoid unnecessary slowdowns for Druid
36+
# daemons sending data.
37+
# Only one thread de-queues and processes datapoints; in this way we
38+
# don't really need any special locking to guarantee consistency.
39+
# Since this thread is not I/O bound, it does not seem necessary to
40+
# use a gevent greenlet, but more tests might prove the contrary.
41+
self.datapoints_queue = queue.Queue()
42+
threading.Thread(target=self.process_queued_datapoints).start()
43+
3144
# Datapoints successfully registered
3245
self.datapoints_registered = 0
3346

@@ -230,10 +243,15 @@ def register_datapoint(self, datapoint):
230243
.format(datapoint))
231244
return
232245

233-
metric_name = str(datapoint['metric'])
234-
if self.metrics_config[daemon][metric_name]['type'] == 'histogram':
235-
self.store_histogram(datapoint)
236-
else:
237-
self.store_counter(datapoint)
246+
self.datapoints_queue.put((daemon, datapoint))
247+
248+
def process_queued_datapoints(self):
249+
while True:
250+
(daemon, datapoint) = self.datapoints_queue.get()
251+
metric_name = str(datapoint['metric'])
252+
if self.metrics_config[daemon][metric_name]['type'] == 'histogram':
253+
self.store_histogram(datapoint)
254+
else:
255+
self.store_counter(datapoint)
238256

239-
self.datapoints_registered += 1
257+
self.datapoints_registered += 1

druid_exporter/exporter.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
from druid_exporter import collector
2323
from prometheus_client import generate_latest, make_wsgi_app, REGISTRY
24-
from wsgiref.simple_server import make_server
24+
from gevent.pywsgi import WSGIServer
2525

2626
log = logging.getLogger(__name__)
2727

@@ -149,7 +149,7 @@ def main():
149149
druid_wsgi_app = DruidWSGIApp(args.uri, druid_collector,
150150
prometheus_app, args.encoding)
151151

152-
httpd = make_server(address, int(port), druid_wsgi_app)
152+
httpd = WSGIServer((address, int(port)), druid_wsgi_app)
153153
httpd.serve_forever()
154154

155155

setup.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from setuptools import setup
22

33
setup(name='druid_exporter',
4-
version='0.8',
4+
version='0.9',
55
description='Prometheus exporter for Druid',
66
url='https://github.com/wikimedia/operations-software-druid_exporter',
77
author='Luca Toscano',
@@ -10,6 +10,7 @@
1010
packages=['druid_exporter'],
1111
install_requires=[
1212
'prometheus-client>=0.5.0',
13+
'gevent',
1314
],
1415
entry_points={
1516
'console_scripts': [

0 commit comments

Comments
 (0)