forked from rsyslog/rsyslog
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathimkafka_multi_many.sh
executable file
·140 lines (118 loc) · 3.76 KB
/
imkafka_multi_many.sh
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/bin/bash
# added 2018-08-29 by alorbach
# This file is part of the rsyslog project, released under ASL 2.0
echo Init Testbench
. ${srcdir:=.}/diag.sh init
check_command_available kafkacat
# *** ==============================================================================
export TESTMESSAGES=100000
export TESTMESSAGESFULL=100000
# Generate random topic name
export RANDTOPIC=$(tr -dc 'a-zA-Z0-9' < /dev/urandom | fold -w 8 | head -n 1)
# Set EXTRA_EXITCHECK to dump kafka/zookeeperlogfiles on failure only.
export EXTRA_EXITCHECK=dumpkafkalogs
export EXTRA_EXIT=kafkamulti
echo ===============================================================================
echo Check and Stop previous instances of kafka/zookeeper
download_kafka
stop_zookeeper '.dep_wrk1'
stop_zookeeper '.dep_wrk2'
stop_zookeeper '.dep_wrk3'
stop_kafka '.dep_wrk1'
stop_kafka '.dep_wrk2'
stop_kafka '.dep_wrk3'
echo Create kafka/zookeeper instance and $RANDTOPIC topic
start_zookeeper '.dep_wrk1'
start_zookeeper '.dep_wrk2'
start_zookeeper '.dep_wrk3'
start_kafka '.dep_wrk1'
start_kafka '.dep_wrk2'
start_kafka '.dep_wrk3'
# create new topic
create_kafka_topic $RANDTOPIC '.dep_wrk1' '22181'
# --- Create imkafka receiver config
export RSYSLOG_DEBUGLOG="log"
generate_conf
add_conf '
main_queue(queue.timeoutactioncompletion="60000" queue.timeoutshutdown="60000")
module(load="../plugins/imkafka/.libs/imkafka")
/* Polls messages from kafka server!*/
input( type="imkafka"
topic="'$RANDTOPIC'"
broker=["localhost:29092", "localhost:29093", "localhost:29094"]
consumergroup="default1"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
input( type="imkafka"
topic="'$RANDTOPIC'"
broker=["localhost:29092", "localhost:29093", "localhost:29094"]
consumergroup="default2"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
input( type="imkafka"
topic="'$RANDTOPIC'"
broker=["localhost:29092", "localhost:29093", "localhost:29094"]
consumergroup="default3"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
input( type="imkafka"
topic="'$RANDTOPIC'"
broker=["localhost:29092", "localhost:29093", "localhost:29094"]
consumergroup="default4"
confParam=[ "compression.codec=none",
"session.timeout.ms=10000",
"socket.timeout.ms=5000",
"socket.keepalive.enable=true",
"reconnect.backoff.jitter.ms=1000",
"enable.partition.eof=false" ]
)
template(name="outfmt" type="string" string="%msg:F,58:2%\n")
if ($msg contains "msgnum:") then {
action( type="omfile" file=`echo $RSYSLOG_OUT_LOG` template="outfmt" )
}
'
# Start imkafka receiver config
echo Starting receiver instance [imkafka]
startup
# ---
# Measure Starttime
TIMESTART=$(date +%s.%N)
# --- Fill Kafka Server with messages
# Can properly be done in a better way?!
for i in {00000001..00100000}
do
echo " msgnum:$i" >> $RSYSLOG_OUT_LOG.in
done
echo Inject messages into kafka
kafkacat <$RSYSLOG_OUT_LOG.in -P -b localhost:29092 -t $RANDTOPIC
# ---
echo Give imkafka some time to start...
sleep 5
echo Stopping sender instance [omkafka]
shutdown_when_empty
wait_shutdown
# Measure Endtime
TIMEEND=$(date +%s.%N)
TIMEDIFF=$(echo "$TIMEEND - $TIMESTART" | bc)
echo "*** imkafka time to process all data: $TIMEDIFF seconds!"
# Delete topic to remove old traces before
delete_kafka_topic $RANDTOPIC '.dep_wrk1' '22181'
# Do the final sequence check
seq_check 1 $TESTMESSAGESFULL -d
echo success
exit_test