Projeto desenvolvido utilizando Java 21 e Spring boot 3 utilizando Kafka com um producer e um consumer seguindo o modelo de Arquitetura Orientada a Evento
- Requisitos
- Configurações Kafka
- Comandos Kafka CLI
- Mirror Maker 2
- Kafka Confluent Replicator
- Uso de multiplos consumers e producers
- Monitoramento
- Escalando aplicações Kubernetes com o KEDA
- Referencias
- Java 21
- Spring 3.3.1
- Gradle 8.8
- Kafka 3.5.0
- Docker
- Kubernetes (opcional)
Antes de executar o projeto Spring, é necessário criar o tópico Kafka. Para isso, devemos antes iniciar o Kafka, onde iniciamos o Zookeper e o kafka com os comandos abaixo:
- Iniciar Zookeper:
./bin/zookeeper-server-start.sh ./config/zookeeper.properties
- Iniciar Kafka:
./bin/kafka-server-start.sh ./config/server.properties
Para criar o tópico kafka, é necessário executar o comando abaixo na pasta do Kafka:
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --topic topico-teste --create --partitions 3 --replication-factor 1
Para acompanhar e validar se as mensagens estão sendo enviadas para o tópico, execute o comando abaixo:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topico-teste --from-beginning
Também é possivel executar o Kafka com o arquivo Docker disponibilizado no repositório na pasta docker/confluent-all-in-one/
. Para subir o Kafka utilizando o Docker, deve executar o comando abaixo:
docker-compose up
Também foi disponibilizado a configuração de subir um ambiente Kafka utilizando o Kubernetes, baseado no ambiente do ambiente Docker. Para subir o ambiente Kafka no Kubernetes, basta acessar a pasta docker/confluent-all-in-one/
e executar o comando abaixo, que irá criar todos os artefatos Kubernetes:
kubectl apply -f kubernetes
Abaixo:
kafka-reassign-partitions.sh --zookeeper <YOUR_ZOOKEEPER> --verify --reassignment-json-file <YOUR_JSON_FILE> 2>&1 | grep "Leader: -1" | awk '{print $2}' | sort | uniq | awk 'BEGIN{printf "{\"topics\": [\n"} {printf " {\n \"topic\": \"%s\"\n },\n", $1} END{print "]}\n"}' | sed '$s/,$//'
kafka-reassign-partitions.sh --zookeeper <YOUR_ZOOKEEPER> --verify --reassignment-json-file <YOUR_JSON_FILE> 2>&1 | grep "Leader: -1" | awk '{print $2}' | sort | uniq | awk 'BEGIN{printf "{\"topics\": [\n"} {printf " {\n \"topic\": \"%s\"\n },\n", $1} END{print "]}\n"}' | sed '$s/,$//' > topics_without_leader.json
Com isso será criado o Broker Kafka, Zookeper, Schema Registry e o Kafka UI da Confluent, onde o gerenciamento será realizado pela interface e pode ser acessado acessando a url Control Center.
É possivel monitorar os recursos do cluster Kubernetes utilizando o Prometheus e o Grafana, para isso. Como foi utilizando o Microk8s para os testes local, execute os comandos abaixo para instalar o Prometheus e o Grafana:
# Via Microk8s
microk8s enable observability
Com isso, será habilitado o prometheus e o Dash do Grafana, que pode ser acessado pela url http://localhost:37349, onde o usuário e senha padrão é admin/prom-operator.
Também é possivel monitorar o Kafka utilizando o Confluent Control Center, que pode ser acessado pela url http://localhost:9021, que monitora os recursos internos do Kafka, como o KSQLDB, Schema Registry, Kafka Connect, Kafka Broker, Kafka Rest Proxy.
Para a escala de aplicações Kubernetes, foi utilizado o KEDA, que é um componente que permite escalar aplicações baseado
em métricas mais complexas, no caso foi utilizado o lag do consumer Kafka. Para isso, foi substituido o artefato HPA (Horizontal Pod Autoscaler)
pelo ScaledObject do KEDA, que pode ser visto no arquivo hpa.yaml
na pasta kubernetes/
nos projetos producer e consumer.
No arquivo contem o exemplo de implementação com HPA e com o ScaledObject do KEDA, onde está escalando com memoria e cpu, e para o consumer inclui
a metrica do lag do consumer.
com o KEDA instado, agora será configurado a configuração de autenticaçao do KEDA com o Kafka, para isso será necessário disponibilizar as credenciais do cluster Kafka.
Neste estudo foi utilizado o AWS Secret Manager para armazenar as credenciais, e o External secret Operator para disponibilizar as credenciais para o Keda
Para isso, siga o procedimento abaixo:
- Instale o KEDA + External Secret Operator com o comando abaixo:
# instalar o KEDA
## Via Microk8s
microk8s enable keda
## Via Helm
helm repo add kedacore https://kedacore.github.io/charts
helm repo update
helm install keda kedacore/keda --namespace keda --create-namespace
# instalar o External Secret Operator
## Via Healm
helm repo add external-secrets https://charts.external-secrets.io
helm install external-secrets \
external-secrets/external-secrets \
-n external-secrets \
--create-namespace
- Com os plugins instalados, devemos configurar as credenciais da AWS para o External Secret Operator. Para isso, ajuste o arquivo
aws-secret.yaml
na pastakubernetes/keda/aws/
do projeto producer e consumer, com as credenciais de um usuario IAM (AWS acess key e AWS secret key id).
apiVersion: v1
kind: Secret
metadata:
name: keda-aws-credential-secret
namespace: external-secrets
type: Opaque
data:
AWS_ACCESS_KEY_ID: <base64-encoded> # echo -n "AWS_ACCES_KEY_ID" | base64
AWS_SECRET_ACCESS_KEY: <base64-encoded> # echo -n "AWS_SECRET_ACCESS_KEY" | base64
- Com as credenciais configuradas, agora iremos ajustar o artefato que irá obter criar o cliente AWS para o External Operator, utilizando a secret criada na etapa 2.
apiVersion: external-secrets.io/v1beta1
kind: ClusterSecretStore
metadata:
name: aws-secretsmanager
spec:
provider:
aws:
service: SecretsManager
region: us-east-1
auth:
secretRef:
accessKeyIDSecretRef:
name: keda-aws-credential-secret #nome dos kubernetes secrets com as credenciais
namespace: external-secrets
key: AWS_ACCESS_KEY_ID # nome do campo da credencial access-key no kubernetes secret
secretAccessKeySecretRef:
name: keda-aws-credential-secret #nome dos kubernetes secrets com as credenciais
namespace: external-secrets
key: AWS_SECRET_ACCESS_KEY # nome do campo da credencial access-key no kubernetes secret
- Agora devemos criar o artefato obter o valor da secret na aws e criar o Kubernetes secret com o valor da secret da AWS.
apiVersion: external-secrets.io/v1beta1
kind: ClusterExternalSecret
metadata:
name: cluster-keda-credentials-external-secret
spec:
namespaceSelector:
# Validação para quais namespace será criada o Kubernetes Secrets
matchLabels:
name: "keda"
externalSecretName: "keda-secret" # nome do Kubernetes Secrets que será criado
externalSecretSpec:
refreshInterval: "1h" # Intervalo que a secret será consultada novamente na AWS
secretStoreRef:
name: aws-secretsmanager
kind: ClusterSecretStore
target:
name: keda-credentials
creationPolicy: Owner
data:
- secretKey: username # nome do campo no Kubernetes Secrets
remoteRef:
key: "k8s/kafka/keda-user" # nome da secret na AWS
property: username # nome do campo na Secret na AWS
- secretKey: password # nome do campo no Kubernetes Secrets
remoteRef:
key: "k8s/kafka/keda-user" # nome da secret na AWS
property: password # nome do campo na Secret na AWS
- Pronto, agora será necessário configurar o KEDA para usar esse Kubernetes Secret para se conectar com o Kafka, primeiro devemos configurar o artefato do KEDA que irá mapear a secret para o uso no Trigger do Keda:
apiVersion: keda.sh/v1alpha1
kind: ClusterTriggerAuthentication
metadata:
name: kafka-trigger-auth-aws-secret-manager
spec:
secretTargetRef:
- parameter: username # nome do campo de autenticação que o KEDA irá usar
name: keda-credentials # nome do Kubernetes secrets
key: username # nome do campo no Kubernetes Secrets
- parameter: password nome do campo de autenticação que o KEDA irá usar
name: keda-credentials # nome do Kubernetes secrets
key: password # nome do campo no Kubernetes Secrets
- Agora basta somente configurar o Trigger do Keda, utilizando o Cluster Trigger Authentication criado na etapa 5:
apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
name: kafka-keda-consumer-trigger
namespace: consumer
spec:
minReplicaCount: 2 # Defina aqui o número mínimo de réplicas desejado
maxReplicaCount: 6 # Defina aqui o número máximo de réplicas desejado
scaleTargetRef:
name: consumer-deployment
pollingInterval: 3
triggers:
# Configuração para configuração de trigger utilizando lag de consumer Kafka
- type: kafka
metadata:
bootstrapServers: broker.kafka.svc.cluster.local:9093
consumerGroup: consumer-teste # Consumer group que será monitorado
topic: topico-teste-avro # Tópico que será monitorado
lagThreshold: "3" # Limite de lag para iniciar a escala
offsetResetPolicy: latest # Política de reset de offset
sasl: plaintext # Configuração de autenticação
# Configuração para utilizar objeto de autenticação do tipo ClusterTriggerAuthentication (que contem valor do AWS Secret Manager)
# Esse artefato disponibiliza os campos "username" e "password" para autenticação
authenticationRef:
name: kafka-trigger-auth-aws-secret-manager
kind: ClusterTriggerAuthentication
# Configuração para configuração de trigger utilizando porcentagem de utilização do CPU
- type: cpu
metricType: Utilization
metadata:
value: "60" #valor é calculado em porcentagem
Após aplicar o artefato de Trigger junto com o Deploy da aplicação, a aplicação passará a escalar a partir do lag do tópico Kafka e do consumo de CPU
Para simular um lag no consumer, altere a variavel ENABLE_MOCK_LAG
para true
no arquivo configMap.yaml
na pasta kubernetes/
do projeto consumer.
Com isso ao produzir as mensagens com o consumer, o consumer irá aguardar 30 segundos antes de consumir a mensagem, simulando um lag.