Skip to content

Commit

Permalink
[8.16](backport #42401) Handle authorization errors in Kafka output (#…
Browse files Browse the repository at this point in the history
…42845)

When there is an authorisation error in the Kafka output, the events
are dropped and an error message is logged.

(cherry picked from commit 5720300)

# Conflicts:
#	libbeat/tests/integration/kafka_test.go

* Update CHANGELOG.next.asciidoc

* run mage check-no-changes on beats

* Fix tests

---------

Co-authored-by: Tiago Queiroz <tiago.queiroz@elastic.co>
  • Loading branch information
mergify[bot] and belimawr authored Mar 5, 2025
1 parent 7cd1aec commit de80ecd
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Support Elastic Agent control protocol chunking {pull}37343[37343]
- Lower logging level to debug when attempting to configure beats with unknown fields from autodiscovered events/environments {pull}37816[37816]
- Set timeout of 1 minute for FQDN requests {pull}37756[37756]
- The Kafka output now drops events when there is an authorisation error {issue}42343[42343] {pull}42401[42401]

*Auditbeat*

Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@ require (
github.com/elastic/go-quark v0.2.0
github.com/elastic/go-sfdc v0.0.0-20241010131323-8e176480d727
github.com/elastic/mito v1.15.0
github.com/elastic/sarama v1.19.0
github.com/elastic/tk-btf v0.1.0
github.com/elastic/toutoumomoma v0.0.0-20240626215117-76e39db18dfb
github.com/foxcpp/go-mockdns v0.0.0-20201212160233-ede2f9158d15
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,8 @@ github.com/elastic/mito v1.15.0 h1:MicOxLSVkgU2Aonbh3i+++66Wl5wvD8y9gALK8PQDYs=
github.com/elastic/mito v1.15.0/go.mod h1:J+wCf4HccW2YoSFmZMGu+d06gN+WmnIlj5ehBqine74=
github.com/elastic/pkcs8 v1.0.0 h1:HhitlUKxhN288kcNcYkjW6/ouvuwJWd9ioxpjnD9jVA=
github.com/elastic/pkcs8 v1.0.0/go.mod h1:ipsZToJfq1MxclVTwpG7U/bgeDtf+0HkUiOxebk95+0=
github.com/elastic/sarama v1.19.0 h1:MKjfcrOgJnTpXZXxdHPCAWkpD/qGB6LVFsnqThggHaM=
github.com/elastic/sarama v1.19.0/go.mod h1:Yu+7tBb5Mkwg3b0VXG1mea/Eyk7ZCIUgWWkkO7ko71A=
github.com/elastic/sarama v1.19.1-0.20220310193331-ebc2b0d8eef3 h1:FzA0/n4iMt8ojGDGRoiFPSHFvvdVIvxOxyLtiFnrLBM=
github.com/elastic/sarama v1.19.1-0.20220310193331-ebc2b0d8eef3/go.mod h1:mdtqvCSg8JOxk8PmpTNGyo6wzd4BMm4QXSfDnTXmgkE=
github.com/elastic/tk-btf v0.1.0 h1:T4rbsnfaRH/MZKSLwZFd3sndt/NexsQb0IXWtMQ9PAA=
Expand Down
28 changes: 28 additions & 0 deletions libbeat/outputs/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,20 @@ type msgRef struct {

var (
// errNoTopicsSelected is the sentinel error used when no topic could be
// selected.
errNoTopicsSelected = errors.New("no topic could be selected")

// authErrors lists the authentication/authorisation errors that cause
// the event to be dropped instead of retried. isAuthError matches
// errors against this list.
authErrors = []error{
sarama.ErrTopicAuthorizationFailed,
sarama.ErrGroupAuthorizationFailed,
sarama.ErrClusterAuthorizationFailed,
// The SASL errors below are believed to be raised while the
// connection is still being established, i.e. before any event is
// published. They are listed here as well, just in case one of them
// surfaces later.
sarama.ErrUnsupportedSASLMechanism,
sarama.ErrIllegalSASLState,
sarama.ErrSASLAuthenticationFailed,
}
)

func newKafkaClient(
Expand Down Expand Up @@ -368,6 +382,10 @@ func (r *msgRef) fail(msg *message, err error) {
len(msg.key)+len(msg.value))
r.client.observer.PermanentErrors(1)

case isAuthError(err):
r.client.log.Errorf("Kafka (topic=%v): authorisation error: %s", msg.topic, err)
r.client.observer.PermanentErrors(1)

case errors.Is(err, breaker.ErrBreakerOpen):
// Add this message to the failed list, but don't overwrite r.err since
// all the breaker error means is "there were a lot of other errors".
Expand Down Expand Up @@ -425,3 +443,13 @@ func (c *client) Test(d testing.Driver) {
}

}

// isAuthError reports whether err matches, according to errors.Is, any of
// the entries listed in authErrors.
func isAuthError(err error) bool {
	for i := range authErrors {
		if !errors.Is(err, authErrors[i]) {
			continue
		}
		// First match wins; no need to look at the remaining entries.
		return true
	}

	return false
}
128 changes: 128 additions & 0 deletions libbeat/tests/integration/kafka_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//go:build integration

package integration

import (
"fmt"
"testing"
"time"

"github.com/elastic/sarama"
)

var (
	// https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/config_test.go#L14-L17
	// The version of MockBroker used when this test was written only supports the lowest protocol version by default.
	// Version incompatibilities will result in message decoding errors between the mock and the beat.
	kafkaVersion = sarama.MinVersion

	// kafkaTopic is the topic used both in the mock broker responses and
	// in the generated Beat configuration.
	kafkaTopic = "test_topic"

	// kafkaCfg is the configuration template for mockbeat. It takes three
	// %s verbs, in this order: the topic, the Kafka protocol version and
	// the broker address.
	//
	// The nesting matters: topic, version, hosts and backoff must be
	// children of output.kafka (and likewise for logging and queue.mem),
	// otherwise they are parsed as unrelated top-level keys.
	kafkaCfg = `
mockbeat:
logging:
  level: debug
  selectors:
    - publisher_pipeline_output
    - kafka
queue.mem:
  events: 4096
  flush.timeout: 0s
output.kafka:
  topic: %s
  version: %s
  hosts:
    - %s
  backoff:
    init: 0.1s
    max: 0.2s
`
)

// TestKafkaOutputCanConnectAndPublish ensures the beat Kafka output can successfully produce messages to Kafka.
// Regression test for https://github.com/elastic/beats/issues/41823 where the Kafka output would
// panic on the first Publish because its Connect method was no longer called.
func TestKafkaOutputCanConnectAndPublish(t *testing.T) {
	// Mock Kafka broker; the setup mirrors
	// https://github.com/elastic/sarama/blob/c7eabfcee7e5bcd7d0071f0ece4d6bec8c33928a/async_producer_test.go#L141.
	broker := sarama.NewMockBroker(t, 1)
	defer broker.Close()

	// Response to the single metadata request: advertise the mock broker
	// as leader of partition 0 of the test topic.
	meta := &sarama.MetadataResponse{}
	meta.AddBroker(broker.Addr(), broker.BrokerID())
	meta.AddTopicPartition(kafkaTopic, 0, broker.BrokerID(), nil, nil, sarama.ErrNoError)

	// Response to the single produce request: success for partition 0.
	ack := &sarama.ProduceResponse{}
	ack.AddTopicPartition(kafkaTopic, 0, sarama.ErrNoError)

	// Queue the responses in the order the requests are expected. If no
	// produce request is ever received the test fails, which guarantees
	// that mockbeat connected and produced a message.
	broker.Returns(meta)
	broker.Returns(ack)

	// Render the configuration against the mock broker and start mockbeat.
	cfg := fmt.Sprintf(kafkaCfg, kafkaTopic, kafkaVersion, broker.Addr())
	beat := NewBeat(t, "mockbeat", "../../libbeat.test")
	beat.WriteConfigFile(cfg)
	beat.Start()

	// The batch-finished log line is the signal that mockbeat received
	// the produce response queued above.
	beat.WaitForLogs(`finished kafka batch`, 10*time.Second, "did not find finished batch log")
}

// TestAuthorisationErrors checks that mockbeat logs an authorisation error
// for each authorisation failure returned by the mock broker on a produce
// request.
func TestAuthorisationErrors(t *testing.T) {
	broker := sarama.NewMockBroker(t, 1)
	defer broker.Close()

	// Response to the single metadata request: advertise the mock broker
	// as leader of partition 0 of the test topic.
	meta := &sarama.MetadataResponse{}
	meta.AddBroker(broker.Addr(), broker.BrokerID())
	meta.AddTopicPartition(kafkaTopic, 0, broker.BrokerID(), nil, nil, sarama.ErrNoError)
	broker.Returns(meta)

	kerrs := []sarama.KError{
		sarama.ErrTopicAuthorizationFailed,
		sarama.ErrGroupAuthorizationFailed,
		sarama.ErrClusterAuthorizationFailed,
	}

	// Queue one produce response per error under test. If fewer produce
	// requests are made, the test fails.
	for i := range kerrs {
		resp := &sarama.ProduceResponse{}
		resp.AddTopicPartition(kafkaTopic, 0, kerrs[i])
		broker.Returns(resp)
	}

	// Render the configuration against the mock broker and start mockbeat.
	cfg := fmt.Sprintf(kafkaCfg, kafkaTopic, kafkaVersion, broker.Addr())
	beat := NewBeat(t, "mockbeat", "../../libbeat.test")
	beat.WriteConfigFile(cfg)
	beat.Start()

	// Every queued error must show up in the mockbeat logs.
	for _, kerr := range kerrs {
		t.Log("waiting for:", kerr)
		wanted := fmt.Sprintf("Kafka (topic=test_topic): authorisation error: %s", kerr)
		beat.WaitForLogs(wanted, 10*time.Second, "did not find error log: %s", kerr)
	}
}

0 comments on commit de80ecd

Please sign in to comment.