Store Kafka node certificates in separate Secrets
Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com>
katheris committed Jan 6, 2025
1 parent 4239382 commit 478869b
Showing 12 changed files with 230 additions and 184 deletions.
@@ -168,6 +168,7 @@ public static String initContainerClusterRoleBindingName(String cluster, String
*
* @return The name of the corresponding Kafka Secret.
*/
@Deprecated // Kafka server certificates are now kept in per-node Secrets
public static String kafkaSecretName(String clusterName) {
return clusterName + "-kafka-brokers";
}
@@ -132,6 +132,21 @@ public static Map<String, String> buildSecretData(Map<String, CertAndKey> certif
return data;
}

/**
* Constructs a Map containing the provided certificate and private key to be stored in a Kubernetes Secret.
*
* @param certificateName Name used to build the identifiers under which the data is stored in the Secret.
* @param certAndKey Private key and public certificate pair to store in the Secret.
*
* @return Map of certificate identifiers to the Base64-encoded key and certificate
*/
public static Map<String, String> buildSecretData(String certificateName, CertAndKey certAndKey) {
return Map.of(
Ca.SecretEntry.KEY.asKey(certificateName), certAndKey.keyAsBase64String(),
Ca.SecretEntry.CRT.asKey(certificateName), certAndKey.certAsBase64String()
);
}
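
A minimal usage sketch of this new overload (the pod name and the resulting entry names are illustrative assumptions; the exact suffixes come from Ca.SecretEntry, which appears to append ".key" and ".crt"):

    // Hypothetical call for a single node, given an existing CertAndKey instance:
    Map<String, String> data = CertUtils.buildSecretData("my-cluster-kafka-0", certAndKey);
    // Expected entries (assumed naming):
    //   my-cluster-kafka-0.key -> Base64-encoded private key
    //   my-cluster-kafka-0.crt -> Base64-encoded certificate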

private static byte[] decodeFromSecret(Secret secret, String key) {
if (secret.getData().get(key) != null && !secret.getData().get(key).isEmpty()) {
return Util.decodeBytesFromBase64(secret.getData().get(key));
@@ -107,7 +107,6 @@ protected String caName() {
* @param existingCertificates Existing certificates (or null if they do not exist yet)
* @param nodes Nodes that are part of the Cruise Control cluster
* @param isMaintenanceTimeWindowsSatisfied Flag indicating whether we can do maintenance tasks or not
* @param caCertGenerationChanged Flag indicating whether the CA cert generation has changed since the existing certificates were issued
*
* @return Map with CertAndKey object containing the public and private key
*
@@ -118,8 +117,7 @@ protected Map<String, CertAndKey> generateCcCerts(
String clusterName,
Map<String, CertAndKey> existingCertificates,
Set<NodeRef> nodes,
boolean isMaintenanceTimeWindowsSatisfied,
boolean caCertGenerationChanged
boolean isMaintenanceTimeWindowsSatisfied
) throws IOException {
DnsNameGenerator ccDnsGenerator = DnsNameGenerator.of(namespace, CruiseControlResources.serviceName(clusterName));

@@ -143,8 +141,8 @@ protected Map<String, CertAndKey> generateCcCerts(
nodes,
subjectFn,
existingCertificates,
isMaintenanceTimeWindowsSatisfied,
caCertGenerationChanged);
isMaintenanceTimeWindowsSatisfied
);
}

/**
@@ -158,7 +156,6 @@ protected Map<String, CertAndKey> generateCcCerts(
* @param externalBootstrapAddresses List of external bootstrap addresses (used for certificate SANs)
* @param externalAddresses Map with external listener addresses for the different nodes (used for certificate SANs)
* @param isMaintenanceTimeWindowsSatisfied Flag indicating whether we can do maintenance tasks or not
* @param caCertGenerationChanged Flag indicating whether the CA cert generation has changed since the existing certificates were issued
*
* @return Map with CertAndKey objects containing the public and private keys for the different brokers
*
@@ -171,8 +168,7 @@ protected Map<String, CertAndKey> generateBrokerCerts(
Set<NodeRef> nodes,
Set<String> externalBootstrapAddresses,
Map<Integer, Set<String>> externalAddresses,
boolean isMaintenanceTimeWindowsSatisfied,
boolean caCertGenerationChanged
boolean isMaintenanceTimeWindowsSatisfied
) throws IOException {
Function<NodeRef, Subject> subjectFn = node -> {
Subject.Builder subject = new Subject.Builder()
@@ -219,8 +215,8 @@ protected Map<String, CertAndKey> generateBrokerCerts(
nodes,
subjectFn,
existingCertificates,
isMaintenanceTimeWindowsSatisfied,
caCertGenerationChanged);
isMaintenanceTimeWindowsSatisfied
);
}

@Override
@@ -237,7 +233,6 @@ protected String caCertGenerationAnnotation() {
* @param subjectFn Function to generate certificate subject for given node / pod
* @param existingCertificates Existing certificates (or null if they do not exist yet)
* @param isMaintenanceTimeWindowsSatisfied Flag indicating if we are inside a maintenance window or not
* @param caCertGenerationChanged Flag indicating whether the CA cert generation has changed since the existing certificates were issued
*
* @return Returns map with node certificates which can be used to create or update the stored certificates
*
@@ -248,8 +243,7 @@ protected String caCertGenerationAnnotation() {
Set<NodeRef> nodes,
Function<NodeRef, Subject> subjectFn,
Map<String, CertAndKey> existingCertificates,
boolean isMaintenanceTimeWindowsSatisfied,
boolean caCertGenerationChanged
boolean isMaintenanceTimeWindowsSatisfied
) throws IOException {
// Map for storing the certificates => it will be used in the new or updated certificate store. The map is filled in this method and returned at the end.
Map<String, CertAndKey> certs = new HashMap<>();
@@ -269,7 +263,6 @@ protected String caCertGenerationAnnotation() {

if (!this.certRenewed() // No CA renewal is happening
&& certAndKey != null // There is a public cert and private key for this pod
&& !caCertGenerationChanged // The CA cert generation has not changed since the existing certificates were issued
) {
// A certificate for this node already exists, so we will try to reuse it
LOGGER.debugCr(reconciliation, "Certificate for node {} already exists", node);
@@ -434,8 +434,10 @@ public Secret generateCertificatesSecret(String namespace, String clusterName, C
LOGGER.debugCr(reconciliation, "Generating certificates");
try {
Set<NodeRef> nodes = Set.of(new NodeRef(CruiseControl.COMPONENT_TYPE, 0, null, false, false));
ccCerts = clusterCa.generateCcCerts(namespace, clusterName, CertUtils.extractCertsAndKeysFromSecret(existingSecret, nodes),
nodes, isMaintenanceTimeWindowsSatisfied, clusterCa.hasCaCertGenerationChanged(existingSecret));
ccCerts = clusterCa.generateCcCerts(namespace, clusterName,
// Only pass existing certificates if the CA cert generation hasn't changed since they were generated
clusterCa.hasCaCertGenerationChanged(existingSecret) ? Map.of() : CertUtils.extractCertsAndKeysFromSecret(existingSecret, nodes),
nodes, isMaintenanceTimeWindowsSatisfied);
} catch (IOException e) {
LOGGER.warnCr(reconciliation, "Error while generating certificates", e);
}
@@ -1230,37 +1230,54 @@ public List<StrimziPodSet> generatePodSets(boolean isOpenShift,
}

/**
* Generates the private keys for the Kafka brokers (if needed) and the secret with them which contains both the
Generates the private keys for the Kafka nodes (if needed) and the Secrets that contain both the
* public and private keys.
*
* @param clusterCa The CA for cluster certificates
* @param clientsCa The CA for clients certificates
* @param existingSecret The existing secret with Kafka certificates
* @param existingSecrets The existing secrets containing Kafka certificates
* @param externalBootstrapDnsName Map with bootstrap DNS names which should be added to the certificate
* @param externalDnsNames Map with broker DNS names which should be added to the certificate
* @param isMaintenanceTimeWindowsSatisfied Indicates whether we are in a maintenance window or not
*
* @return The generated Secret with broker certificates
* @return The generated Secrets containing Kafka node certificates
*/
public Secret generateCertificatesSecret(ClusterCa clusterCa, ClientsCa clientsCa, Secret existingSecret, Set<String> externalBootstrapDnsName, Map<Integer, Set<String>> externalDnsNames, boolean isMaintenanceTimeWindowsSatisfied) {
public List<Secret> generateCertificatesSecrets(ClusterCa clusterCa, ClientsCa clientsCa, List<Secret> existingSecrets, Set<String> externalBootstrapDnsName, Map<Integer, Set<String>> externalDnsNames, boolean isMaintenanceTimeWindowsSatisfied) {
Map<String, Secret> existingSecretWithName = existingSecrets.stream().collect(Collectors.toMap(secret -> secret.getMetadata().getName(), secret -> secret));
Set<NodeRef> nodes = nodes();
Map<String, CertAndKey> brokerCerts;
Map<String, CertAndKey> existingCerts = new HashMap<>();
for (NodeRef node : nodes) {
String podName = node.podName();
// Reuse the existing certificate if it exists and the CA cert generation hasn't changed since it was generated
if (existingSecretWithName.get(podName) != null) {
if (clusterCa.hasCaCertGenerationChanged(existingSecretWithName.get(podName))) {
LOGGER.debugCr(reconciliation, "Certificate for pod {}/{} has old cert generation", namespace, podName);
} else {
existingCerts.put(podName, CertUtils.keyStoreCertAndKey(existingSecretWithName.get(podName), podName));
}
} else {
LOGGER.debugCr(reconciliation, "No existing certificate found for pod {}/{}", namespace, podName);
}
}

Map<String, CertAndKey> updatedCerts;
try {
brokerCerts = clusterCa.generateBrokerCerts(namespace, cluster, CertUtils.extractCertsAndKeysFromSecret(existingSecret, nodes),
nodes, externalBootstrapDnsName, externalDnsNames, isMaintenanceTimeWindowsSatisfied, clusterCa.hasCaCertGenerationChanged(existingSecret));
updatedCerts = clusterCa.generateBrokerCerts(namespace, cluster, existingCerts,
nodes, externalBootstrapDnsName, externalDnsNames, isMaintenanceTimeWindowsSatisfied);
} catch (IOException e) {
LOGGER.warnCr(reconciliation, "Error while generating certificates", e);
throw new RuntimeException("Failed to prepare Kafka certificates", e);
}

return ModelUtils.createSecret(KafkaResources.kafkaSecretName(cluster), namespace, labels, ownerReference,
CertUtils.buildSecretData(brokerCerts),
Map.ofEntries(
clusterCa.caCertGenerationFullAnnotation(),
clientsCa.caCertGenerationFullAnnotation()
),
emptyMap());
return updatedCerts.entrySet()
.stream()
.map(entry -> ModelUtils.createSecret(entry.getKey(), namespace, labels, ownerReference,
CertUtils.buildSecretData(entry.getKey(), entry.getValue()),
Map.ofEntries(
clusterCa.caCertGenerationFullAnnotation(),
clientsCa.caCertGenerationFullAnnotation()
),
emptyMap()))
.toList();
}
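
As a rough illustration (the variable and cluster names below are assumptions, not part of this change): the method now returns one Secret per node, named after the pod, and that pod name is also what the broker-certs volume mounts further down instead of KafkaResources.kafkaSecretName(cluster):

    // Minimal sketch, assuming the usual reconciler collaborators are in scope:
    List<Secret> nodeCertSecrets = kafkaCluster.generateCertificatesSecrets(
            clusterCa, clientsCa, existingSecrets,
            bootstrapDnsNames, brokerDnsNames, isMaintenanceTimeWindowsSatisfied);
    // Each Secret is named after its pod, e.g. "my-cluster-kafka-0", with entries
    // "my-cluster-kafka-0.crt" and "my-cluster-kafka-0.key" (naming assumed from Ca.SecretEntry).
    nodeCertSecrets.forEach(secret -> System.out.println(secret.getMetadata().getName()));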

/**
Expand Down Expand Up @@ -1354,7 +1371,7 @@ private List<Volume> getNonDataVolumes(boolean isOpenShift, NodeRef node, PodTem

volumeList.add(VolumeUtils.createTempDirVolume(templatePod));
volumeList.add(VolumeUtils.createSecretVolume(CLUSTER_CA_CERTS_VOLUME, AbstractModel.clusterCaCertSecretName(cluster), isOpenShift));
volumeList.add(VolumeUtils.createSecretVolume(BROKER_CERTS_VOLUME, KafkaResources.kafkaSecretName(cluster), isOpenShift));
volumeList.add(VolumeUtils.createSecretVolume(BROKER_CERTS_VOLUME, node.podName(), isOpenShift));
volumeList.add(VolumeUtils.createSecretVolume(CLIENT_CA_CERTS_VOLUME, KafkaResources.clientsCaCertificateSecretName(cluster), isOpenShift));
volumeList.add(VolumeUtils.createConfigMapVolume(LOG_AND_METRICS_CONFIG_VOLUME_NAME, node.podName()));
volumeList.add(VolumeUtils.createEmptyDirVolume("ready-files", "1Ki", "Memory"));
@@ -9,6 +9,7 @@
import io.fabric8.kubernetes.api.model.Node;
import io.fabric8.kubernetes.api.model.PersistentVolumeClaim;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.strimzi.api.kafka.model.common.Condition;
@@ -257,7 +258,7 @@ public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
.compose(i -> scaleDown())
.compose(i -> updateNodePoolStatuses(kafkaStatus))
.compose(i -> listeners())
.compose(i -> certificateSecret(clock))
.compose(i -> certificateSecrets(clock))
.compose(i -> brokerConfigurationConfigMaps())
.compose(i -> jmxSecret())
.compose(i -> podDisruptionBudget())
@@ -731,33 +732,84 @@ protected Future<Void> brokerConfigurationConfigMaps() {
}

/**
* Manages the Secret with the node certificates used by the Kafka brokers.
* Manages the Secrets with the node certificates used by the Kafka nodes.
*
* @param clock The clock for supplying the reconciler with the time instant of each reconciliation cycle.
* That time is used for checking maintenance windows
*
* @return Completes when the Secret was successfully created or updated
* @return Completes when the Secrets were successfully created, deleted or updated
*/
protected Future<Void> certificateSecret(Clock clock) {
return secretOperator.getAsync(reconciliation.namespace(), KafkaResources.kafkaSecretName(reconciliation.name()))
protected Future<Void> certificateSecrets(Clock clock) {
return secretOperator.listAsync(reconciliation.namespace(), kafka.getSelectorLabels().withStrimziComponentType("kafka"))
.compose(existingSecrets -> {
List<Secret> desiredCertSecrets = kafka.generateCertificatesSecrets(clusterCa, clientsCa, existingSecrets,
listenerReconciliationResults.bootstrapDnsNames, listenerReconciliationResults.brokerDnsNames,
Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant()));

List<String> secretsToDelete = new ArrayList<>(existingSecrets.stream().map(secret -> secret.getMetadata().getName()).toList());
secretsToDelete.removeAll(desiredCertSecrets.stream().map(secret -> secret.getMetadata().getName()).toList());
// Don't delete the JMX Secret
secretsToDelete.remove(KafkaResources.kafkaJmxSecretName(reconciliation.name()));

Future<Void> deleteSecrets = deleteOldCertificateSecrets(secretsToDelete);
Future<Void> updateSecrets = updateCertificateSecrets(desiredCertSecrets);
return Future.join(deleteSecrets, updateSecrets);
}).mapEmpty();
}

/**
* Delete old certificate Secrets that are no longer needed.
*
* @param secrets Names of the Secrets to delete.
*
* @return Future that completes when the Secrets have been deleted.
*/
protected Future<Void> deleteOldCertificateSecrets(List<String> secrets) {
List<Future<Void>> deleteFutures = secrets.stream()
.map(secretName -> {
LOGGER.debugCr(reconciliation, "Deleting old Secret {}/{} that is no longer used.", reconciliation.namespace(), secretName);
return secretOperator.deleteAsync(reconciliation, reconciliation.namespace(), secretName, false);
}).toList();

// Remove the old Secret containing all node certificates, if it still exists
@SuppressWarnings("deprecation")
String oldSecretName = KafkaResources.kafkaSecretName(reconciliation.name());
return secretOperator.getAsync(reconciliation.namespace(), oldSecretName)
.compose(oldSecret -> {
return secretOperator
.reconcile(reconciliation, reconciliation.namespace(), KafkaResources.kafkaSecretName(reconciliation.name()),
kafka.generateCertificatesSecret(clusterCa, clientsCa, oldSecret, listenerReconciliationResults.bootstrapDnsNames, listenerReconciliationResults.brokerDnsNames, Util.isMaintenanceTimeWindowsSatisfied(reconciliation, maintenanceWindows, clock.instant())))
if (oldSecret != null) {
LOGGER.debugCr(reconciliation, "Deleting old Secret {}/{} that is no longer needed.", reconciliation.namespace(), oldSecretName);
deleteFutures.add(secretOperator.deleteAsync(reconciliation, reconciliation.namespace(), oldSecretName, false));
}

return Future.join(deleteFutures).mapEmpty();
});
}

/**
* Updates the Secrets with the node certificates used by the Kafka nodes.
*
* @param secrets Secrets to update
*
* @return Future that completes when the Secrets were successfully created or updated
*/
protected Future<Void> updateCertificateSecrets(List<Secret> secrets) {
List<Future<Object>> reconcileFutures = secrets
.stream()
.map(secret -> {
String secretName = secret.getMetadata().getName();
return secretOperator.reconcile(reconciliation, reconciliation.namespace(), secretName, secret)
.compose(patchResult -> {
if (patchResult != null) {
for (NodeRef node : kafka.nodes()) {
kafkaServerCertificateHash.put(
node.nodeId(),
CertUtils.getCertificateThumbprint(patchResult.resource(),
Ca.SecretEntry.CRT.asKey(node.podName())
));
}
kafkaServerCertificateHash.put(
ReconcilerUtils.getPodIndexFromPodName(secretName),
CertUtils.getCertificateThumbprint(patchResult.resource(),
Ca.SecretEntry.CRT.asKey(secretName)
));
}

return Future.succeededFuture();
});
});
}).toList();
return Future.join(reconcileFutures).mapEmpty();
}

/**