Commit 35e781c

Add support to pull datapoints from Kafka
This change introduces a separate thread that pulls data from a Kafka topic and inserts datapoints into the shared queue that separates ingestion from processing. The idea is to have Druid daemons that need to push hundreds of datapoints/s emit them to Kafka (via the KafkaEmitter), and to collect them at a slower pace in the exporter.

GH issue: #11
Change-Id: Ibc82be5883f20c26b50342d2032381086bcd218a
1 parent 6f03fcd commit 35e781c
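For context, the three new command-line options added in druid_exporter/exporter.py below are how the exporter is pointed at the topic used by the KafkaEmitter. A rough invocation sketch (topic, broker and consumer group names are placeholders, and the pre-existing options such as the listen address and the metrics config are omitted):

    druid_exporter \
        --kafka-topic druid-metrics \
        --kafka-bootstrap-servers kafka1001:9092 kafka1002:9092 \
        --kafka-consumer-group-id druid_exporter

The three options must be provided together: a partial Kafka configuration makes the exporter exit with an argument error, and omitting all of them keeps the previous HTTP-only behaviour.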

4 files changed, +90 -9 lines changed


README.md (+16 -6)

@@ -57,6 +57,14 @@ The druid prometheus exporter accepts HTTP POST data, inspects it and stores/agg
 every supported datapoint into a data structure. It then formats the
 data on the fly to Prometheus metrics when a GET /metrics is requested.
 
+The exporter is also able to pull metrics from a Kafka topic, when Druid is configured to do so:
+
+https://druid.apache.org/docs/latest/development/extensions-contrib/kafka-emitter.html
+
+The Kafka support is optional and should be used when the number of metrics/datapoints emitted
+by Druid is very high (a rule of thumb could be around 1000 datapoints/s). Please check the section
+about performance considerations below for more info.
+
 This exporter is supposed to be run on each host running one or multiple Druid daemons.
 
 ## Supported metrics and labels
@@ -117,14 +125,16 @@ requests are welcome!
 
 In https://github.com/wikimedia/operations-software-druid_exporter/issues/11 some users
 brought up an interesting use case for the exporter, namely handling peaks of 1500 datapoints/s
-sent from Druid brokers. The exporter's code was refactored to be able to scale more
+sent from Druid brokers (one can check the rate of datapoints/s via the
+`druid_exporter_datapoints_registered_total` metric).
+
+The exporter's code was refactored to be able to scale more
 (if you are curious, check [this commit](https://github.com/wikimedia/operations-software-druid_exporter/commit/f22c6d9f8707ae2d274db9b10669b971beed64ab)), but it wasn't enough, since users kept reporting timeouts from Druid daemons while sending
 datapoints to the exporter. There are some recommendations to follow:
 * Try to use a dedicated exporter instance for daemons sending high volumes of datapoints.
 * Try to tune the HTTP emitter's settings via https://druid.apache.org/docs/latest/configuration/index.html#http-emitter-module
 (if the Druid version that you are running supports them).
-On our side, we are working on a solution that should be simple and flexible, namely use the
-[https://druid.apache.org/docs/latest/development/extensions-contrib/kafka-emitter.html](Kafka)
-emitter instead of the HTTP emitter. The idea is to instruct Druid daemons to push datapoints
-to a Kafka topic, and then to point the exporter to it. The code is not ready yet,
-please refer to the aforementioned issue for updates.
+If the above is not enough, the exporter can be configured to also pull datapoints from
+a Kafka topic (see [Kafka](https://druid.apache.org/docs/latest/development/extensions-contrib/kafka-emitter.html)). With this configuration, the exporter will ingest datapoints coming via
+HTTP and Kafka at the same time. An ideal solution is to force Druid daemons emitting too many
+datapoints/s to use the KafkaEmitter, and the others to use the HTTPEmitter.
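To complement the added README paragraph, this is a minimal sketch of the Druid side, assuming the property names documented for the kafka-emitter extension (double check them against the linked page for the Druid version in use; topic and broker values are placeholders):

    # runtime.properties of the daemons that should emit via Kafka
    druid.extensions.loadList=[..., "kafka-emitter"]
    druid.emitter=kafka
    druid.emitter.kafka.bootstrap.servers=kafka1001:9092,kafka1002:9092
    druid.emitter.kafka.metric.topic=druid-metrics

The exporter is then started with matching --kafka-topic and --kafka-bootstrap-servers values.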

druid_exporter/collector.py (+39 -1)

@@ -24,12 +24,17 @@
 
 log = logging.getLogger(__name__)
 
+try:
+    from kafka import KafkaConsumer
+except ImportError:
+    KafkaConsumer = None
+
 
 class DruidCollector(object):
     scrape_duration = Summary(
         'druid_scrape_duration_seconds', 'Druid scrape duration')
 
-    def __init__(self, metrics_config):
+    def __init__(self, metrics_config, kafka_config=None):
 
         # The ingestion of the datapoints is separated from their processing,
         # to separate concerns and avoid unnecessary slowdowns for Druid
@@ -41,6 +46,17 @@ def __init__(self, metrics_config):
         self.datapoints_queue = queue.Queue()
         threading.Thread(target=self.process_queued_datapoints).start()
 
+        # if a Kafka config is provided, create a dedicated thread
+        # that pulls datapoints from a Kafka topic.
+        # The thread will then push datapoints to the same queue that
+        # the HTTP server uses. In this way the exporter allows a mixed
+        # configuration for Druid Brokers between HTTPEmitter and
+        # KafkaEmitter (for daemons emitting too many datapoints/s).
+        if kafka_config and KafkaConsumer:
+            threading.Thread(
+                target=self.pull_datapoints_from_kafka,
+                args=(kafka_config,)).start()
+
         # Datapoints successfully registered
         self.datapoints_registered = 0
 
@@ -255,3 +271,25 @@ def process_queued_datapoints(self):
                 self.store_counter(datapoint)
 
             self.datapoints_registered += 1
+
+    def pull_datapoints_from_kafka(self, kafka_config):
+        consumer = KafkaConsumer(
+            kafka_config['topic'],
+            group_id=kafka_config['group_id'],
+            bootstrap_servers=kafka_config['bootstrap_servers'])
+
+        while True:
+            consumer.poll()
+            for message in consumer:
+                try:
+                    json_message = json.loads(message.value.decode())
+                    log.debug('Datapoint from kafka: %s', json_message)
+                    if type(json_message) == list:
+                        for datapoint in json_message:
+                            self.register_datapoint(datapoint)
+                    else:
+                        self.register_datapoint(json_message)
+                except json.JSONDecodeError:
+                    log.exception("Failed to decode message from Kafka, skipping..")
+                except Exception:
+                    log.exception("Generic exception while pulling datapoints from Kafka")

druid_exporter/exporter.py (+31 -1)

@@ -122,13 +122,42 @@ def main():
                         help='Enable debug logging')
     parser.add_argument('-e', '--encoding', default='utf-8',
                         help='Encoding of the Druid POST JSON data.')
+    kafka_parser = parser.add_argument_group('kafka',
+                                             'Optional configuration for datapoints emitted '
+                                             'to a topic via the Druid Kafka Emitter extension.')
+    kafka_parser.add_argument('-t', '--kafka-topic',
+                              help='Pull datapoints from a given Kafka topic.')
+    kafka_parser.add_argument('-b', '--kafka-bootstrap-servers', nargs='+',
+                              help='Pull datapoints from a given list of Kafka brokers.')
+    kafka_parser.add_argument('-g', '--kafka-consumer-group-id',
+                              help='Pull datapoints from Kafka using this Consumer group id.')
+
     args = parser.parse_args()
 
     if args.debug:
         logging.basicConfig(level=logging.DEBUG)
     else:
         logging.basicConfig(level=logging.WARNING)
 
+    kafka_args = (args.kafka_topic,
+                  args.kafka_bootstrap_servers,
+                  args.kafka_consumer_group_id)
+
+    # Check if a Kafka config is provided
+    if any(kafka_args):
+        if not all(kafka_args):
+            parser.error('Kafka configuration incomplete, '
+                         'please provide a topic, one or more brokers '
+                         'as bootstrap-servers and the consumer group id.')
+        else:
+            kafka_config = {}
+            kafka_config['topic'] = args.kafka_topic
+            kafka_config['bootstrap_servers'] = args.kafka_bootstrap_servers
+            kafka_config['group_id'] = args.kafka_consumer_group_id
+            log.info('Using Kafka config: {}'.format(kafka_config))
+    else:
+        kafka_config = None
+
     collect_metrics_from = []
 
     address, port = args.listen.split(':', 1)
@@ -143,7 +172,8 @@ def main():
     log.info('Checking consistency of metrics config file..')
     check_metrics_config_file_consistency(metrics_config)
 
-    druid_collector = collector.DruidCollector(metrics_config)
+    druid_collector = collector.DruidCollector(
+        metrics_config, kafka_config)
     REGISTRY.register(druid_collector)
     prometheus_app = make_wsgi_app()
     druid_wsgi_app = DruidWSGIApp(args.uri, druid_collector,
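To make the option handling above concrete: --kafka-bootstrap-servers uses nargs='+', so multiple brokers can be passed space-separated and end up as a list. With the placeholder invocation sketched near the top of this page, the dict handed to DruidCollector would be:

    kafka_config = {
        'topic': 'druid-metrics',
        'bootstrap_servers': ['kafka1001:9092', 'kafka1002:9092'],
        'group_id': 'druid_exporter',
    }

Supplying only a subset of the three options makes the parser exit with the 'Kafka configuration incomplete' error instead.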

setup.py (+4 -1)

@@ -1,7 +1,7 @@
 from setuptools import setup
 
 setup(name='druid_exporter',
-      version='0.9',
+      version='0.10',
       description='Prometheus exporter for Druid',
       url='https://github.com/wikimedia/operations-software-druid_exporter',
       author='Luca Toscano',
@@ -12,6 +12,9 @@
           'prometheus-client>=0.5.0',
           'gevent',
       ],
+      extras_require={
+          'kafka': ['kafka-python']
+      },
       entry_points={
           'console_scripts': [
               'druid_exporter = druid_exporter.exporter:main'
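The new extras_require entry keeps the Kafka dependency optional: as shown in collector.py above, the exporter falls back to HTTP-only ingestion when kafka-python cannot be imported. To pull the extra in when installing from a checkout of this repository (standard setuptools extras syntax):

    pip install ".[kafka]"

A plain pip install . keeps the previous dependency set and behaviour.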
