Skip to content

Commit

Permalink
feat: enable basic auth on schema registry
Browse files Browse the repository at this point in the history
Signed-off-by: Mehdi Rebiai <m.rebiai@lectra.com>
  • Loading branch information
mrebiai committed Jan 9, 2024
1 parent f5dfa75 commit 03bef0a
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 13 deletions.
5 changes: 4 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,12 @@ RUN sbt dependencyUpdatesReport

FROM eclipse-temurin:17-jre as release
ENV KAFKA_BOOTSTRAP_SERVERS "localhost:9092"
ENV KAFKA_SCHEMA_REGISTRY_URL "http://localhost:8081"
ENV KAFKA_USERNAME ""
ENV KAFKA_PASSWORD ""
ENV KAFKA_SCHEMA_REGISTRY_URL "http://localhost:8081"
ENV KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE ""
ENV KAFKA_SCHEMA_REGISTRY_KEY ""
ENV KAFKA_SCHEMA_REGISTRY_SECRET ""

ENV CONFIG_FILE "application.conf"
# example KAPOEIRA_JAVA_SYSTEM_PROPERTIES="-Dkey1=value1 -Dkey2=value2"
Expand Down
7 changes: 5 additions & 2 deletions README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -147,13 +147,16 @@ Available Docker volumes::
* `/features`: root Kapoeira folder (sub-folders can be defined)
* `/reports`: folder containing Junit generated reports (`kapoeira-report.html`, `kapoeira-report.json`, `kapoeira-report.xml`)
* `/conf`: folder with extra configuration files
* `/var/run/docker.sock` : docker-cli is installed in Kapoeira, so you can set `-v /var/run/docker.sock:/var/run/docker.sock` for calling Docker command into Kapoeira
* `/var/run/docker.sock` : docker-cli is installed in Kapoeira, so you can set `-v /var/run/docker.sock:/var/run/docker.sock` for calling Docker commands into Kapoeira

Available environment variables::
* `KAFKA_BOOTSTRAP_SERVERS`: Kafka broker host & port list; default: `localhost:9092`
* `KAFKA_SCHEMA_REGISTRY_URL`: Schema Registry URL for AVRO/Json Schema contents (on Record Key or Value); default: `http://localhost:8081`
* `KAFKA_USERNAME`: Kafka broker username for configurations requiring user authentication; default: empty string
* `KAFKA_PASSWORD`: Kafka broker password for configurations requiring user authentication; default: empty string
* `KAFKA_SCHEMA_REGISTRY_URL`: Schema Registry URL for AVRO/Json Schema contents (on Record Key or Value); default: `http://localhost:8081`
* `KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE`: Basic authentication mode for Schema Registry, `USER_INFO` for Confluent Cloud Schema Registry; default: empty string (no basic authentication)
* `KAFKA_SCHEMA_REGISTRY_KEY`: Schema Registry API key if Basic authentication is enabled
* `KAFKA_SCHEMA_REGISTRY_SECRET`: Schema Registry API secret if Basic authentication is enabled
* `KAFKA_CONSUMER_GROUP_ID`: Kafka consumer group id for any created consumers; it is only a prefix if `KAPOEIRA_CONSUMER_GROUP_ID_UNIQUE_SUFFIX` is set to `true`; default: `kapoeira`
* `KAPOEIRA_CONSUMER_GROUP_ID_UNIQUE_SUFFIX`: Boolean for adding a unique suffix (`-<hostname>-<uuid>`) for each created consumer; default: `true`
* `KAPOEIRA_THREADS`: number of threads used by Cucumber; default: `8`
Expand Down
5 changes: 4 additions & 1 deletion src/main/resources/application.conf
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
kafka {
bootstrap.servers = ${?KAFKA_BOOTSTRAP_SERVERS}
consumer.group.id = ${?KAFKA_CONSUMER_GROUP_ID}
# schema registry
schema.registry.url = ${?KAFKA_SCHEMA_REGISTRY_URL}
# "USER_INFO" enables basic auth against the Schema Registry (e.g. Confluent Cloud);
# empty/unset means no basic authentication.
basic.auth.credentials.source = ${?KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE}
# NOTE(review): with optional substitutions, an unset env var simply vanishes from the
# concatenation — if only one of the two vars is set this yields e.g. ":secret" or "key:",
# and if both are set the reference.conf fallback is never used. Confirm this is intended.
basic.auth.user.info = ${?KAFKA_SCHEMA_REGISTRY_KEY}":"${?KAFKA_SCHEMA_REGISTRY_SECRET}
}
kapoeira {
consumer.group.id-unique-suffix = ${?KAPOEIRA_CONSUMER_GROUP_ID_UNIQUE_SUFFIX}
Expand Down
3 changes: 3 additions & 0 deletions src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
kafka {
# Fallback defaults; real values are supplied via application.conf / environment variables.
schema.registry.url = ""
# Empty source = basic auth disabled.
basic.auth.credentials.source = ""
# Placeholder "key:secret" keeps the colon-split in Config well-formed when no auth is configured.
basic.auth.user.info = "key:secret"
consumer.group.id = "kapoeira"
producer = {}
}
Expand Down
5 changes: 5 additions & 0 deletions src/main/scala/com/lectra/kapoeira/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ object Config {

// Kafka common config
// Schema Registry endpoint used for AVRO/JSON-Schema (de)serialization.
val KAFKA_SCHEMA_REGISTRY_URL = rootConfig.getString("kafka.schema.registry.url")
// "USER_INFO" enables basic auth on Schema Registry calls; empty string disables it.
val KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE = rootConfig.getString("kafka.basic.auth.credentials.source")
// "key:secret" pair. Split on the FIRST colon only (limit = 2) so a secret that itself
// contains ':' is preserved intact instead of being truncated to its last segment.
private val userInfo = rootConfig.getString("kafka.basic.auth.user.info").split(":", 2).toSeq
val KAFKA_SCHEMA_REGISTRY_BASIC_KEY = userInfo.headOption.getOrElse("")
// lift(1) (not lastOption): a value with no colon yields an empty secret rather than
// silently echoing the key as the secret.
val KAFKA_SCHEMA_REGISTRY_BASIC_SECRET = userInfo.lift(1).getOrElse("")

val kafkaCommonProperties = rootConfig.getConfig("kafka").withoutPath("consumer").withoutPath("producer").entrySet().asScala.map(e => e.getKey -> e.getValue.unwrapped()).toMap

// Kafka consumer config
Expand Down
28 changes: 19 additions & 9 deletions src/main/scala/com/lectra/kapoeira/kafka/KapoeiraProducer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,20 +29,28 @@ import io.confluent.kafka.schemaregistry.json.JsonSchemaUtils
import org.apache.avro.Schema
import org.apache.avro.generic.GenericData
import org.apache.kafka.clients.producer._
import requests.RequestAuth
import zio.{Scope, Task, ZIO}

import java.util.Properties
import scala.util.{Failure, Try}

object KapoeiraProducer extends LazyLogging {

// Basic-auth credentials for direct Schema Registry HTTP calls. Only the USER_INFO
// source (e.g. Confluent Cloud) is supported; any other value means no authentication.
private val requestAuth: RequestAuth =
  if (KAFKA_SCHEMA_REGISTRY_BASIC_AUTH_CREDENTIALS_SOURCE == "USER_INFO")
    (KAFKA_SCHEMA_REGISTRY_BASIC_KEY, KAFKA_SCHEMA_REGISTRY_BASIC_SECRET)
  else
    RequestAuth.Empty

private def serializeJson(subject: SubjectConfig, bytes: Array[Byte]): JsonNode = {
val schemaString =
requests
.get(
s"$KAFKA_SCHEMA_REGISTRY_URL/subjects/${subject.name}/versions/latest/schema"
)
.text()
url = s"$KAFKA_SCHEMA_REGISTRY_URL/subjects/${subject.name}/versions/latest/schema",
auth = requestAuth,
verifySslCerts = false
).text()
val value = new String(bytes)
val mapper = new ObjectMapper()
val schemaJson = mapper.readTree(schemaString)
Expand All @@ -55,9 +63,10 @@ object KapoeiraProducer extends LazyLogging {
val schemaVersions =
requests
.get(
s"$KAFKA_SCHEMA_REGISTRY_URL/subjects/${subject.name}/versions"
)
.text()
url = s"$KAFKA_SCHEMA_REGISTRY_URL/subjects/${subject.name}/versions",
auth = requestAuth,
verifySslCerts = false
).text()
val versions: Array[String] = schemaVersions.replace("[", "").replace("]", "").split(",")

val init: Try[GenericData.Record] = Failure[GenericData.Record](new Exception(s"No schema version found for subject ${subject.name}"))
Expand All @@ -67,9 +76,10 @@ object KapoeiraProducer extends LazyLogging {
val schemaString =
requests
.get(
s"$KAFKA_SCHEMA_REGISTRY_URL/subjects/${subject.name}/versions/$version/schema"
)
.text()
url = s"$KAFKA_SCHEMA_REGISTRY_URL/subjects/${subject.name}/versions/$version/schema",
auth = requestAuth,
verifySslCerts = false
).text()
val parser = new Schema.Parser()
val schema = parser.parse(schemaString)
Try(AvroSchemaUtils
Expand Down

0 comments on commit 03bef0a

Please sign in to comment.