From a6603945d5dd49715f2f1feb0784888dbe15e9d8 Mon Sep 17 00:00:00 2001
From: Erik van Oosten
Date: Sun, 9 Feb 2025 10:43:28 +0100
Subject: [PATCH 1/2] Zio-kafka - Deprecate accessor methods

Bring the zio-kafka quickstart up to date:
- update the docker-compose file to a single KRaft-based Kafka broker
  (Zookeeper is no longer needed)
- remove usages of the deprecated accessor methods (a before/after
  sketch is appended after the last patch)
- simplify the examples
- update the zio-kafka and zio-json dependencies
---
 zio-quickstart-kafka/build.sbt              |  4 +-
 zio-quickstart-kafka/docker-compose.yml     | 33 +++----
 .../quickstart/JsonStreamingKafkaApp.scala  | 94 ++++++++++---------
 .../dev/zio/quickstart/SimpleKafkaApp.scala | 71 ++++++--------
 .../zio/quickstart/StreamingKafkaApp.scala  | 88 ++++++++---------
 5 files changed, 141 insertions(+), 149 deletions(-)

diff --git a/zio-quickstart-kafka/build.sbt b/zio-quickstart-kafka/build.sbt
index 0029e64..e8d68ec 100644
--- a/zio-quickstart-kafka/build.sbt
+++ b/zio-quickstart-kafka/build.sbt
@@ -1,6 +1,6 @@
 scalaVersion := "2.13.13"
 
 libraryDependencies ++= Seq(
-  "dev.zio" %% "zio-kafka" % "2.7.4",
-  "dev.zio" %% "zio-json" % "0.6.2"
+  "dev.zio" %% "zio-kafka" % "2.10.0",
+  "dev.zio" %% "zio-json" % "0.7.16"
 )
diff --git a/zio-quickstart-kafka/docker-compose.yml b/zio-quickstart-kafka/docker-compose.yml
index cdcb806..9e72dc0 100644
--- a/zio-quickstart-kafka/docker-compose.yml
+++ b/zio-quickstart-kafka/docker-compose.yml
@@ -1,23 +1,20 @@
-version: '2'
 services:
-  zookeeper:
-    image: confluentinc/cp-zookeeper:latest
-    environment:
-      ZOOKEEPER_CLIENT_PORT: 2181
-      ZOOKEEPER_TICK_TIME: 2000
-    ports:
-      - 22181:2181
-
-  kafka:
-    image: confluentinc/cp-kafka:latest
-    depends_on:
-      - zookeeper
+  broker:
+    image: apache/kafka:3.9.0
+    container_name: broker
     ports:
-      - 29092:29092
+      - "9092:9092"
     environment:
+      KAFKA_NODE_ID: 1
       KAFKA_BROKER_ID: 1
-      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
-      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
-      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
-      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
+      KAFKA_PROCESS_ROLES: broker,controller
+      KAFKA_LISTENERS: PLAINTEXT://broker:29092,CONTROLLER://broker:29093,PLAINTEXT_HOST://0.0.0.0:9092
+      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
+      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
+      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT,CONTROLLER:PLAINTEXT
+      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@broker:29093
       KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
+      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
+      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
+      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
diff --git a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala
index 72d0ab8..36b528d 100644
--- a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala
+++ b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala
@@ -11,8 +11,10 @@ import zio.stream.ZStream
 import java.time.OffsetDateTime
 import java.util.UUID
 
+/** This is the data we will be sending to Kafka in JSON format. */
 case class Event(uuid: UUID, timestamp: OffsetDateTime, message: String)
 
+/** A zio-json encoder/decoder for [[Event]].
*/ object Event { implicit val encoder: JsonEncoder[Event] = DeriveJsonEncoder.gen[Event] @@ -21,12 +23,10 @@ object Event { DeriveJsonDecoder.gen[Event] } -object KafkaSerde { - val key: Serde[Any, Int] = - Serde.int - - val value: Serde[Any, Event] = - Serde.string.inmapM[Any, Event](s => +/** A zio-kafka serializer/deserializer for [[Event]]. */ +object EventKafkaSerde { + val event: Serde[Any, Event] = + Serde.string.inmapZIO[Any, Event](s => ZIO .fromEither(s.fromJson[Event]) .mapError(e => new RuntimeException(e)) @@ -34,49 +34,55 @@ object KafkaSerde { } object JsonStreamingKafkaApp extends ZIOAppDefault { - private val BOOSTRAP_SERVERS = List("localhost:29092") + private val BOOSTRAP_SERVERS = List("localhost:9092") private val KAFKA_TOPIC = "json-streaming-hello" - private val producer: ZLayer[Any, Throwable, Producer] = - ZLayer.scoped( - Producer.make( - ProducerSettings(BOOSTRAP_SERVERS) - ) - ) - - private val consumer: ZLayer[Any, Throwable, Consumer] = - ZLayer.scoped( - Consumer.make( - ConsumerSettings(BOOSTRAP_SERVERS) - .withGroupId("streaming-kafka-app") - ) - ) + def run: ZIO[Any, Throwable, Unit] = { + val p: ZIO[Any, Throwable, Unit] = + ZIO.scoped { + for { + producer <- Producer.make(ProducerSettings(BOOSTRAP_SERVERS)) + _ <- ZStream + .repeatZIO(Random.nextUUID <*> Clock.currentDateTime) + .schedule(Schedule.spaced(1.second)) + .map { case (uuid, time) => + new ProducerRecord( + KAFKA_TOPIC, + time.getMinute, + Event(uuid, time, "Hello, World!") + ) + } + .via(producer.produceAll(Serde.int, EventKafkaSerde.event)) + .runDrain + } yield () + } - def run = { - val p: ZStream[Producer, Throwable, Nothing] = - ZStream - .repeatZIO(Random.nextUUID <*> Clock.currentDateTime) - .schedule(Schedule.spaced(1.second)) - .map { case (uuid, time) => - new ProducerRecord( - KAFKA_TOPIC, - time.getMinute, - Event(uuid, time, "Hello, World!") + val c: ZIO[Any, Throwable, Unit] = + ZIO.scoped { + for { + consumer <- Consumer.make( + ConsumerSettings(BOOSTRAP_SERVERS).withGroupId("streaming-kafka-app") ) - } - .via(Producer.produceAll(KafkaSerde.key, KafkaSerde.value)) - .drain - - val c: ZStream[Consumer, Throwable, Nothing] = - Consumer - .plainStream(Subscription.topics(KAFKA_TOPIC), Serde.int, Serde.string) - .tap(r => Console.printLine(r.value)) - .map(_.offset) - .aggregateAsync(Consumer.offsetBatches) - .mapZIO(_.commit) - .drain + _ <- consumer + .plainStream( + Subscription.topics(KAFKA_TOPIC), + Serde.int, + EventKafkaSerde.event + ) + .tap { r => + val event: Event = r.value + Console.printLine( + s"Event ${event.uuid} was sent at ${event.timestamp} with message ${event.message}" + ) + } + .map(_.offset) + .aggregateAsync(Consumer.offsetBatches) + .mapZIO(_.commit) + .runDrain + } yield () + } - (p merge c).runDrain.provide(producer, consumer) + p <&> c } } diff --git a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala index 06681f6..955e851 100644 --- a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala +++ b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala @@ -1,6 +1,5 @@ package dev.zio.quickstart -import org.apache.kafka.clients.producer.RecordMetadata import zio._ import zio.kafka.consumer._ import zio.kafka.producer.{Producer, ProducerSettings} @@ -10,53 +9,39 @@ import zio.kafka.serde._ * without using ZIO Streams. 
*/ object SimpleKafkaApp extends ZIOAppDefault { - private val BOOSTRAP_SERVERS = List("localhost:29092") + private val BOOSTRAP_SERVERS = List("localhost:9092") private val KAFKA_TOPIC = "hello" - private def produce( - topic: String, - key: Long, - value: String - ): RIO[Any with Producer, RecordMetadata] = - Producer.produce[Any, Long, String]( - topic = topic, - key = key, - value = value, - keySerializer = Serde.long, - valueSerializer = Serde.string - ) - - private def consumeAndPrintEvents( - groupId: String, - topic: String - ): RIO[Any, Unit] = - Consumer.consumeWith( - settings = ConsumerSettings(BOOSTRAP_SERVERS) - .withGroupId(groupId), - subscription = Subscription.topics(topic), - keyDeserializer = Serde.long, - valueDeserializer = Serde.string - )(record => Console.printLine((record.key(), record.value())).orDie) + def run: ZIO[Scope, Throwable, Unit] = { + for { + c <- Consumer + .consumeWith( + settings = + ConsumerSettings(BOOSTRAP_SERVERS).withGroupId("simple-kafka-app"), + subscription = Subscription.topics(KAFKA_TOPIC), + keyDeserializer = Serde.long, + valueDeserializer = Serde.string + ) { record => + Console.printLine(s"Consumed ${record.key()}, ${record.value()}").orDie + } + .fork - private val producer: ZLayer[Any, Throwable, Producer] = - ZLayer.scoped( - Producer.make( - ProducerSettings(BOOSTRAP_SERVERS) - ) - ) + producer <- Producer.make(ProducerSettings(BOOSTRAP_SERVERS)) + p <- Clock.currentDateTime + .flatMap { time => + producer.produce[Any, Long, String]( + topic = KAFKA_TOPIC, + key = time.getHour.toLong, + value = s"$time -- Hello, World!", + keySerializer = Serde.long, + valueSerializer = Serde.string + ) + } + .schedule(Schedule.spaced(1.second)) + .fork - def run = - for { - c <- consumeAndPrintEvents("simple-kafka-app", KAFKA_TOPIC).fork - p <- - Clock.currentDateTime - .flatMap { time => - produce(KAFKA_TOPIC, time.getHour.toLong, s"$time -- Hello, World!") - } - .schedule(Schedule.spaced(1.second)) - .provide(producer) - .fork _ <- (c <*> p).join } yield () + } } diff --git a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/StreamingKafkaApp.scala b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/StreamingKafkaApp.scala index e202002..7b7b378 100644 --- a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/StreamingKafkaApp.scala +++ b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/StreamingKafkaApp.scala @@ -8,50 +8,54 @@ import zio.kafka.serde._ import zio.stream.ZStream object StreamingKafkaApp extends ZIOAppDefault { - private val BOOSTRAP_SERVERS = List("localhost:29092") + private val BOOSTRAP_SERVERS = List("localhost:9092") private val KAFKA_TOPIC = "streaming-hello" - private val producer: ZLayer[Any, Throwable, Producer] = - ZLayer.scoped( - Producer.make( - ProducerSettings(BOOSTRAP_SERVERS) - ) - ) - - private val consumer: ZLayer[Any, Throwable, Consumer] = - ZLayer.scoped( - Consumer.make( - ConsumerSettings(BOOSTRAP_SERVERS) - .withGroupId("streaming-kafka-app") - ) - ) - - def run = { - val p: ZStream[Producer, Throwable, Nothing] = - ZStream - .repeatZIO(Clock.currentDateTime) - .schedule(Schedule.spaced(1.second)) - .map { time => - new ProducerRecord( - KAFKA_TOPIC, - time.getMinute, - s"$time -- Hello, World!" 
- ) - } - .via(Producer.produceAll(Serde.int, Serde.string)) - .drain - - val c: ZStream[Consumer, Throwable, Nothing] = - Consumer - .plainStream(Subscription.topics(KAFKA_TOPIC), Serde.int, Serde.string) - // do not use `tap` it in prod because it destroys the chunking structure and leads to lower performance - .tap(r => Console.printLine(r.value)) - .map(_.offset) - .aggregateAsync(Consumer.offsetBatches) - .mapZIO(_.commit) - .drain - - (p merge c).runDrain.provide(producer, consumer) + private val producerSettings = ProducerSettings(BOOSTRAP_SERVERS) + private val consumerSettings = + ConsumerSettings(BOOSTRAP_SERVERS).withGroupId("streaming-kafka-app") + + def run: ZIO[Any, Throwable, Unit] = { + val p: ZIO[Any, Throwable, Unit] = + ZIO.scoped { + for { + producer <- Producer.make(producerSettings) + _ <- ZStream + .repeatZIO(Clock.currentDateTime) + .schedule(Schedule.spaced(1.second)) + .map { time => + new ProducerRecord( + KAFKA_TOPIC, + time.getMinute, + s"$time -- Hello, World!" + ) + } + .via(producer.produceAll(Serde.int, Serde.string)) + .runDrain + } yield () + } + + val c: ZIO[Any, Throwable, Unit] = + ZIO.scoped { + for { + consumer <- Consumer.make(consumerSettings) + _ <- consumer + .plainStream( + Subscription.topics(KAFKA_TOPIC), + Serde.int, + Serde.string + ) + // do not use `tap` in prod because it destroys the chunking structure and leads to lower performance + // See https://zio.dev/zio-kafka/serialization-and-deserialization#a-warning-about-mapzio + .tap(r => Console.printLine("Consumed: " + r.value)) + .map(_.offset) + .aggregateAsync(Consumer.offsetBatches) + .mapZIO(_.commit) + .runDrain + } yield () + } + + p <&> c } } From db4b3673a2fc25a045b3ef2d4d94db69953e7241 Mon Sep 17 00:00:00 2001 From: Erik van Oosten Date: Sun, 9 Feb 2025 15:55:14 +0100 Subject: [PATCH 2/2] Fix ci, run formatter --- .github/workflows/ci.yml | 37 ++++++++++++------- build.sbt | 7 ++-- project/plugins.sbt | 12 +++--- .../quickstart/JsonStreamingKafkaApp.scala | 4 +- .../dev/zio/quickstart/SimpleKafkaApp.scala | 4 +- 5 files changed, 38 insertions(+), 26 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3a48a64..12ef079 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,15 +4,18 @@ name: CI env: JDK_JAVA_OPTIONS: -XX:+PrintCommandLineFlags - JVM_OPTS: -XX:+PrintCommandLineFlags 'on': workflow_dispatch: {} release: types: - published push: {} - pull_request: {} - create: {} + pull_request: + branches-ignore: + - gh-pages +concurrency: + group: ${{ github.workflow }}-${{ github.ref == format('refs/heads/{0}', github.event.repository.default_branch) && github.run_id || github.ref }} + cancel-in-progress: true jobs: build: name: Build @@ -20,17 +23,19 @@ jobs: continue-on-error: true steps: - name: Git Checkout - uses: actions/checkout@v3.3.0 + uses: actions/checkout@v4 with: fetch-depth: '0' - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v4 with: - distribution: temurin - java-version: '8' + distribution: corretto + java-version: '17' check-latest: true + - name: Setup SBT + uses: sbt/setup-sbt@v1 - name: Cache Dependencies uses: coursier/cache-action@v6 - name: Check all code compiles @@ -43,17 +48,19 @@ jobs: continue-on-error: false steps: - name: Git Checkout - uses: actions/checkout@v3.3.0 + uses: actions/checkout@v4 with: fetch-depth: '0' - name: Install libuv run: sudo apt-get update && sudo apt-get 
install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v4 with: - distribution: temurin - java-version: '8' + distribution: corretto + java-version: '17' check-latest: true + - name: Setup SBT + uses: sbt/setup-sbt@v1 - name: Cache Dependencies uses: coursier/cache-action@v6 - name: Check if the site workflow is up to date @@ -78,15 +85,17 @@ jobs: - name: Install libuv run: sudo apt-get update && sudo apt-get install -y libuv1-dev - name: Setup Scala - uses: actions/setup-java@v3.10.0 + uses: actions/setup-java@v4 with: - distribution: temurin + distribution: corretto java-version: ${{ matrix.java }} check-latest: true + - name: Setup SBT + uses: sbt/setup-sbt@v1 - name: Cache Dependencies uses: coursier/cache-action@v6 - name: Git Checkout - uses: actions/checkout@v3.3.0 + uses: actions/checkout@v4 with: fetch-depth: '0' - name: Test diff --git a/build.sbt b/build.sbt index 55f0d18..10748b4 100644 --- a/build.sbt +++ b/build.sbt @@ -9,15 +9,14 @@ inThisBuild( ciPostReleaseJobs := Seq.empty, ciCheckWebsiteBuildProcess := Seq.empty, scalaVersion := scala213.value, - ciTargetScalaVersions := makeTargetScalaMap( + ciTargetScalaVersions := targetScalaVersionsFor( `zio-quickstart-encode-decode-json`, `zio-quickstart-sql`, `zio-quickstart-prelude`, `zio-quickstart-restful-webservice` ).value, - ciDefaultTargetJavaVersions := Seq("17"), - semanticdbEnabled := true, - semanticdbVersion := scalafixSemanticdb.revision + ciTargetJavaVersions := Seq("17"), + semanticdbEnabled := true ) ) diff --git a/project/plugins.sbt b/project/plugins.sbt index 732201e..58c036f 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,11 +1,11 @@ -val zioSbtVersion = "0.4.0-alpha.6+15-525bdf8e-SNAPSHOT" +val zioSbtVersion = "0.4.0-alpha.30" -//addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion) -addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion) -addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion) +addSbtPlugin("dev.zio" % "zio-sbt-ecosystem" % zioSbtVersion) +addSbtPlugin("dev.zio" % "zio-sbt-website" % zioSbtVersion) +addSbtPlugin("dev.zio" % "zio-sbt-ci" % zioSbtVersion) -addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.16") +addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.11.1") addSbtPlugin("io.spray" % "sbt-revolver" % "0.10.0") -addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.11.1") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.14.0") resolvers ++= Resolver.sonatypeOssRepos("public") diff --git a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala index 36b528d..1165c33 100644 --- a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala +++ b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/JsonStreamingKafkaApp.scala @@ -61,7 +61,9 @@ object JsonStreamingKafkaApp extends ZIOAppDefault { ZIO.scoped { for { consumer <- Consumer.make( - ConsumerSettings(BOOSTRAP_SERVERS).withGroupId("streaming-kafka-app") + ConsumerSettings(BOOSTRAP_SERVERS).withGroupId( + "streaming-kafka-app" + ) ) _ <- consumer .plainStream( diff --git a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala index 955e851..ecf9fae 100644 --- a/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala +++ 
b/zio-quickstart-kafka/src/main/scala/dev/zio/quickstart/SimpleKafkaApp.scala @@ -22,7 +22,9 @@ object SimpleKafkaApp extends ZIOAppDefault { keyDeserializer = Serde.long, valueDeserializer = Serde.string ) { record => - Console.printLine(s"Consumed ${record.key()}, ${record.value()}").orDie + Console + .printLine(s"Consumed ${record.key()}, ${record.value()}") + .orDie } .fork
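
P.S. The gist of the accessor-method change, as a minimal before/after
sketch. This is illustrative only and not part of the patches themselves;
it assumes zio-kafka 2.10.0, a broker reachable on localhost:9092 (as
started by the new docker-compose file), and a hypothetical topic named
"hello".

  import org.apache.kafka.clients.producer.RecordMetadata
  import zio._
  import zio.kafka.producer.{Producer, ProducerSettings}
  import zio.kafka.serde._

  object AccessorVsInstanceSketch {
    private val settings = ProducerSettings(List("localhost:9092"))

    // Before: the (now deprecated) static accessor method requires a
    // Producer in the ZIO environment, which has to be provided as a layer.
    val before: ZIO[Any, Throwable, RecordMetadata] =
      Producer
        .produce[Any, Long, String](
          topic = "hello",
          key = 1L,
          value = "Hello, World!",
          keySerializer = Serde.long,
          valueSerializer = Serde.string
        )
        .provide(ZLayer.scoped(Producer.make(settings)))

    // After: make the Producer in a scope and call the same method on the
    // instance directly; no environment plumbing is needed.
    val after: ZIO[Any, Throwable, RecordMetadata] =
      ZIO.scoped {
        Producer
          .make(settings)
          .flatMap(
            _.produce[Any, Long, String](
              topic = "hello",
              key = 1L,
              value = "Hello, World!",
              keySerializer = Serde.long,
              valueSerializer = Serde.string
            )
          )
      }
  }

The consuming side follows the same pattern, as patch 1 shows: Consumer.make
replaces the Consumer layer, and streams come from the instance
(consumer.plainStream) instead of the Consumer.plainStream accessor.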