forked from confluentinc/cp-ansible
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathupgrade_kafka_broker.yml
335 lines (292 loc) · 12.1 KB
/
upgrade_kafka_broker.yml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
---
- name: Kafka Broker - Prerequisites
hosts: kafka_broker
gather_facts: false
serial: 1
vars:
kafka_broker_upgrade_start_version: "5.3"
vars_files:
- roles/confluent.kafka_broker/defaults/main.yml
handlers:
- name: restart kafka
include_tasks: roles/confluent.kafka_broker/tasks/restart_and_wait.yml
tasks:
- name: Gather OS Facts
setup:
# Only gathers items in list, filters out the rest
filter: ansible_os_family
gather_subset:
- '!all'
- import_role:
name: confluent.variables
- name: Set confluent_server_enabled Variable
include_tasks: tasks/set_confluent_server_enabled.yml
when: installation_method == 'package'
- name: Load server.properties
slurp:
src: "{{ kafka_broker.config_file }}"
register: slurped_properties
- set_fact:
kafka_broker_rest_proxy_enabled: "{{ True if (slurped_properties.content|b64decode).split('\n') | select('match', '^kafka.rest.enable=true') | list | length == 1 else False }}"
- set_fact:
inter_broker_protocol_version: "{{'2.3' if kafka_broker_upgrade_start_version is match('5.3.*') else
'2.4' if kafka_broker_upgrade_start_version is match('5.4.*') else
'2.5' if kafka_broker_upgrade_start_version is match('5.5.*') else
'2.6' if kafka_broker_upgrade_start_version is match('6.0.*') else
'2.7' }}"
log_message_format_version: "{{'2.3' if kafka_broker_upgrade_start_version is match('5.3.*') else
'2.4' if kafka_broker_upgrade_start_version is match('5.4.*') else
'2.5' if kafka_broker_upgrade_start_version is match('5.5.*') else
'2.6' if kafka_broker_upgrade_start_version is match('6.0.*') else
'2.7' }}"
- name: Set Inter-Broker Protocol Version to Current Version
lineinfile:
path: "{{kafka_broker.config_file}}"
line: "inter.broker.protocol.version={{inter_broker_protocol_version}}"
regexp: inter.broker.protocol.version.*
state: present
- name: Set Log Message Format Version to Current Version
lineinfile:
path: "{{kafka_broker.config_file}}"
line: "log.message.format.version={{log_message_format_version}}"
regexp: log.message.format.version.*
state: present
- name: Make sure Controlled Shutdown Property is not set to False
lineinfile:
path: "{{kafka_broker.config_file}}"
# Default value for controlled.shutdown.enable is true
# Removing the line and restarting the service will set property to true
line: controlled.shutdown.enable=false
state: absent
notify: restart kafka
- name: Create Kafka Broker Client Config
template:
src: roles/confluent.kafka_broker/templates/client.properties.j2
dest: "{{kafka_broker.client_config_file}}"
mode: 0640
owner: "{{kafka_broker_user}}"
group: "{{kafka_broker_group}}"
- meta: flush_handlers
- name: Load override.conf
slurp:
src: "{{ kafka_broker.systemd_override }}"
register: slurped_override
when: installation_method == "archive"
- name: Set kafka_broker_current_version variable - Archive Installer
set_fact:
kafka_broker_current_version: "{{ (slurped_override.content|b64decode) .split('\n') |
select('match', '^ExecStart=' + archive_config_base_path + '/confluent-(.*)/bin/kafka-server-start ' + kafka_broker.config_file) |
list | first | regex_search('[0-9]+(.[0-9]+)+') }}"
when: installation_method == "archive"
- name: Kafka Broker Health Check
include_tasks: roles/confluent.kafka_broker/tasks/health_check.yml
vars:
archive_version: "{{kafka_broker_current_version | default(confluent_package_version)}}"
when: not ansible_check_mode
- name: Kafka Broker Upgrade - Host Ordering
hosts: kafka_broker
gather_facts: false
tasks:
- import_role:
name: confluent.kafka_broker
tasks_from: dynamic_groups.yml
vars:
archive_version: "{{kafka_broker_current_version | default(confluent_package_version)}}"
- name: Set Current Package Version of confluent-server
set_fact:
kafka_broker_current_version: "{{ ansible_facts.packages['confluent-server'][0]['version'] }}"
when:
- confluent_server_enabled|bool
- installation_method == 'package'
- name: Set Current Package Version of confluent-kafka-2.12
set_fact:
kafka_broker_current_version: "{{ ansible_facts.packages['confluent-kafka-2.12'][0]['version'] }}"
when:
- not confluent_server_enabled|bool
- not confluent_kafka_upgraded|bool
- installation_method == 'package'
- name: Set Current Package Version of confluent-kafka
set_fact:
kafka_broker_current_version: "{{ ansible_facts.packages['confluent-kafka'][0]['version'] }}"
when:
- not confluent_server_enabled|bool
- confluent_kafka_upgraded|bool
- installation_method == 'package'
- name: Set Current Package Version of confluent-security
set_fact:
confluent_security_current_version: "{{ ansible_facts.packages['confluent-security'][0]['version'] }}"
when: installation_method == 'package'
- name: Load override.conf
slurp:
src: "{{ kafka_broker.systemd_override }}"
register: slurped_override
when: installation_method == "archive"
- name: Set kafka_broker_current_version variable - Archive Installer
set_fact:
kafka_broker_current_version: "{{ (slurped_override.content|b64decode) .split('\n') |
select('match', '^ExecStart=' + archive_config_base_path + '/confluent-(.*)/bin/kafka-server-start ' + kafka_broker.config_file) |
list | first | regex_search('[0-9]+(.[0-9]+)+') }}"
when: installation_method == "archive"
# On colocated zk/kafka hosts, the kafka package will already be upgraded, need to check confluent-security as well
- name: Set Current Package Version
set_fact:
kafka_broker_current_version: "{{ [kafka_broker_current_version, confluent_security_current_version|default(kafka_broker_current_version)] | min }}"
- debug:
msg: "Current version: {{kafka_broker_current_version}} Upgrade to version: {{confluent_package_version}}"
- name: Add host to Upgrade Non Controller Group
group_by:
key: upgrade_non_controllers
changed_when: false
when:
- inventory_hostname in groups['kafka_broker_non_controller']
- kafka_broker_current_version != confluent_full_package_version
- kafka_broker_current_version != confluent_package_version
- name: Add host to Upgrade Controller Group
group_by:
key: upgrade_controller
changed_when: false
when:
- inventory_hostname in groups['kafka_broker_controller']
- kafka_broker_current_version != confluent_full_package_version
- kafka_broker_current_version != confluent_package_version
- name: Kafka Broker Upgrade
# Putting controller group last here with serial=1 has the controller as the last host to run
hosts: upgrade_non_controllers,upgrade_controller
gather_facts: false
environment: "{{ proxy_env }}"
serial: 1
tags:
- upgrade
tasks:
- import_role:
name: confluent.variables
- name: Create Backup Directory
file:
path: "/tmp/upgrade/{{ kafka_broker_service_name }}"
state: directory
mode: 0640
when: installation_method == 'package'
- set_fact:
timestamp: "{{ lookup('pipe', 'date +%Y%m%d%H%M%S') }}"
when: installation_method == 'package'
- name: Backup Configuration files
copy:
src: "{{ item }}"
remote_src: true
dest: "/tmp/upgrade/{{ kafka_broker_service_name }}/{{ item | basename }}-{{timestamp}}"
loop:
- "{{ kafka_broker.config_file }}"
- "{{ kafka_broker.systemd_override }}"
- "{{ zookeeper.config_file }}"
when: installation_method == "package"
# Files cannot be copied because directory is not created in check mode
ignore_errors: "{{ ansible_check_mode }}"
- name: Stop Service
systemd:
name: "{{ kafka_broker_service_name }}"
state: stopped
- name: Configure Repositories
import_role:
name: confluent.common
vars:
install_java: false
- name: Remove Confluent-Kafka Packages - Red Hat
yum:
name: confluent-kafka-2.12
state: absent
when:
- ansible_os_family == "RedHat"
- not confluent_server_enabled|bool
- installation_method == "package"
# Packages will not be found in check mode because repos not changed
ignore_errors: "{{ ansible_check_mode }}"
- name: Remove Confluent-Kafka Packages - Debian
apt:
name: confluent-kafka-2.12
state: absent
when:
- ansible_os_family == "Debian"
- not confluent_server_enabled|bool
- installation_method == "package"
# Packages will not be found in check mode because repos not changed
ignore_errors: "{{ ansible_check_mode }}"
- name: Install the Packages - Red Hat
yum:
name: "{{item}}{{confluent_package_redhat_suffix}}"
state: latest
update_cache: true
loop: "{{ kafka_broker_packages }}"
when:
- ansible_os_family == "RedHat"
- installation_method == "package"
# Packages will not be found in check mode because repos not changed
ignore_errors: "{{ ansible_check_mode }}"
- name: Install the Packages - Debian
apt:
name: "{{item}}{{confluent_package_debian_suffix}}"
update_cache: true
loop: "{{ kafka_broker_packages }}"
when:
- ansible_os_family == "Debian"
- installation_method == "package"
# Packages will not be found in check mode because repos not changed
ignore_errors: "{{ ansible_check_mode }}"
- name: Put back configuration
copy:
dest: "{{ item }}"
remote_src: true
src: "/tmp/upgrade/{{ kafka_broker_service_name }}/{{ item | basename }}-{{timestamp}}"
loop:
- "{{ kafka_broker.config_file }}"
- "{{ zookeeper.config_file }}"
when: installation_method == "package"
# Files cannot be copied because directory is not created in check mode
ignore_errors: "{{ ansible_check_mode }}"
- name: Disable Embedded Rest Proxy
# Existing server.properties is missing configuration
lineinfile:
path: "{{ kafka_broker.config_file }}"
regexp: 'kafka.rest.enable=*'
line: 'kafka.rest.enable=false'
when: not kafka_broker_rest_proxy_enabled|bool
- name: Restart Service
systemd:
daemon_reload: true
name: "{{ kafka_broker_service_name }}"
state: restarted
- name: Kafka Broker Health Check
include_tasks: roles/confluent.kafka_broker/tasks/health_check.yml
when: not ansible_check_mode
- name: Kafka Broker Upgrade - Set Inter-Broker Protocol to Current Version
hosts: kafka_broker
gather_facts: false
serial: 1
tags:
- inter_broker_protocol_version
vars:
upgrade_to_version: "6.1"
vars_files:
- roles/confluent.kafka_broker/defaults/main.yml
handlers:
- name: restart kafka
include_tasks: roles/confluent.kafka_broker/tasks/restart_and_wait.yml
tasks:
- import_role:
name: confluent.variables
- set_fact:
inter_broker_protocol_version: "{{'2.3' if upgrade_to_version is match('5.3.*') else
'2.4' if upgrade_to_version is match('5.4.*') else
'2.5' if upgrade_to_version is match('5.5.*') else
'2.6' if upgrade_to_version is match('6.0.*') else
'2.7' }}"
- name: Set Inter-Broker Protocol Version to Current Version
lineinfile:
name: "{{kafka_broker.config_file}}"
line: "inter.broker.protocol.version={{inter_broker_protocol_version}}"
regexp: inter.broker.protocol.version.*
state: present
notify: restart kafka
- meta: flush_handlers
- name: Kafka Broker Health Check
include_tasks: roles/confluent.kafka_broker/tasks/health_check.yml
when: not ansible_check_mode