-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathKafkaOperator.py
60 lines (53 loc) · 2.39 KB
/
KafkaOperator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
from ConfigOperator import ConfigOperator
from confluent_kafka import Producer, Consumer, KafkaError, KafkaException
from confluent_kafka.admin import AdminClient
import GlobalConstants
from models.RepoInfo import RepoInfo
class KafkaOperator(object):
def __init__(self, config_path: str) -> None:
self.config = ConfigOperator(config_path=config_path).config
kafka_config = self.config.get("kafka")
self._host = kafka_config.get("host")
self._port = int(kafka_config.get("port"))
self.topicname = kafka_config.get("topic")
self.bootstrap_servers = self._host + ":" + str(self._port)
def sendMsgs(self, producer: Producer, msgs):
for msg in msgs:
producer.produce(topic=self.topicname, value=msg.serialize())
producer.flush()
def isEmpty(self, consumer: Consumer):
task = consumer.poll(timeout=1.0)
if task is None:
return True
if task.error():
if task.error().code() == KafkaError._PARTITION_EOF:
return True
elif task.error().code() == KafkaError.UNKNOWN_TOPIC_OR_PART:
# new topic
return True
return False
def deleteConsumers(self):
a = AdminClient({'bootstrap.servers': self.bootstrap_servers})
groups = a.list_groups()
if len(groups) > 0:
group_ids = [g.id for g in groups]
a.delete_consumer_groups(group_ids=group_ids)
def getConsumer(self, group_id):
groups = self.listConsumers()
for group in groups:
if group_id == group.id:
return Consumer(group)
consumer = Consumer({'bootstrap.servers': self.bootstrap_servers, 'group.id': group_id, 'auto.offset.reset': 'earliest', 'enable.auto.commit': 'false'})
consumer.subscribe([self.topicname])
return consumer
def calRemainedTasks(self, queueName):
channel = self.conn.channel()
queue = channel.queue_declare(queue=queueName, durable=True)
qSize = queue.method.message_count
self.close()
return qSize
if __name__ == "__main__":
kafkaOp = KafkaOperator(config_path=GlobalConstants.CONFIG_PATH)
consumer = Consumer({'bootstrap.servers': kafkaOp.bootstrap_servers, 'group.id': "test", 'auto.offset.reset': 'earliest', 'enable.auto.commit': 'false'})
kafkaOp.deleteConsumers()
print("finish")