From c81b3dd0ab14a4c7a31f3768d9a145c2cbbb5eff Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Fri, 26 Mar 2021 16:09:01 +0100 Subject: [PATCH 1/4] Simplify constructors --- README.md | 2 +- core/src/main/scala/cr/pulsar/Consumer.scala | 64 ++++------------- core/src/main/scala/cr/pulsar/Producer.scala | 33 +++------ core/src/main/scala/cr/pulsar/Pulsar.scala | 10 +-- core/src/main/scala/cr/pulsar/Reader.scala | 38 ++-------- core/src/main/scala/cr/pulsar/Topic.scala | 69 +++++++++++-------- docs/src/paradox/index.md | 6 +- .../AlwaysIncompatibleSchemaSuite.scala | 6 +- .../cr/pulsar/BackwardCompatSchemaSuite.scala | 6 +- .../src/it/scala/cr/pulsar/NeutronSuite.scala | 28 ++++---- 10 files changed, 96 insertions(+), 166 deletions(-) diff --git a/README.md b/README.md index 320169b5..8bcc8358 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ Check out the [microsite](https://cr-org.github.io/neutron/). ### Pulsar version -At the moment, we target Apache Pulsar 2.6.x. +At the moment, we target Apache Pulsar 2.7.x. ### Development diff --git a/core/src/main/scala/cr/pulsar/Consumer.scala b/core/src/main/scala/cr/pulsar/Consumer.scala index 95b5f1a9..28dd2fef 100644 --- a/core/src/main/scala/cr/pulsar/Consumer.scala +++ b/core/src/main/scala/cr/pulsar/Consumer.scala @@ -73,18 +73,17 @@ object Consumer { private def mkConsumer[F[_]: Concurrent: ContextShift, E: Schema]( client: Pulsar.T, sub: Subscription, - topicType: Either[Topic.Pattern, Topic], + topic: Topic, opts: Options[F, E] ): Resource[F, Consumer[F, E]] = { val acquire = F.delay { val c = client.newConsumer(E.schema) - topicType - .fold( - p => c.topicsPattern(p.url.value.r.pattern), - t => c.topic(t.url.value) - ) - .readCompacted(opts.readCompacted) + val z = topic match { + case s: Topic.Single => c.topic(s.url.value) + case m: Topic.Multi => c.topicsPattern(m.url.value.r.pattern) + } + z.readCompacted(opts.readCompacted) .subscriptionType(sub.`type`.pulsarSubscriptionType) .subscriptionName(sub.name.value) .subscriptionMode(sub.mode.pulsarSubscriptionMode) @@ -125,57 +124,22 @@ object Consumer { } /** - * It creates a [[Consumer]] for a multi-topic subscription. + * It creates a [[Consumer]] with the supplied options, or a default value otherwise. * - * Find out more at [[https://pulsar.apache.org/docs/en/concepts-messaging/#multi-topic-subscriptions]] - * - * Note that this does not create a subscription to any Topic, - * you can use [[Consumer#subscribe]] for this purpose. - */ - def multiTopic[F[_]: Concurrent: ContextShift, E: Schema]( - client: Pulsar.T, - topicPattern: Topic.Pattern, - sub: Subscription - ): Resource[F, Consumer[F, E]] = - mkConsumer(client, sub, topicPattern.asLeft, Options[F, E]()) - - /** - * It creates a [[Consumer]] with default options. - * - * Note that this does not create a subscription to any Topic, - * you can use [[Consumer#subscribe]] for this purpose. - */ - def create[F[_]: Concurrent: ContextShift, E: Schema]( - client: Pulsar.T, - topic: Topic, - sub: Subscription - ): Resource[F, Consumer[F, E]] = - withOptions(client, topic, sub, Options[F, E]()) - - /** - * It creates a [[Consumer]] with default options and the supplied message logger. - */ - def withLogger[F[_]: ContextShift: Concurrent, E: Schema]( - client: Pulsar.T, - topic: Topic, - sub: Subscription, - logger: E => Topic.URL => F[Unit] - ): Resource[F, Consumer[F, E]] = - withOptions(client, topic, sub, Options[F, E]().withLogger(logger)) - - /** - * It creates a [[Consumer]] with the supplied options. + * A [[Topic]] can either be `Single` or `Multi` for multi topic subscriptions. * * Note that this does not create a subscription to any Topic, * you can use [[Consumer#subscribe]] for this purpose. */ - def withOptions[F[_]: Concurrent: ContextShift, E: Schema]( + def make[F[_]: Concurrent: ContextShift, E: Schema]( client: Pulsar.T, topic: Topic, sub: Subscription, - opts: Options[F, E] - ): Resource[F, Consumer[F, E]] = - mkConsumer(client, sub, topic.asRight, opts) + opts: Options[F, E] = null // default value does not work + ): Resource[F, Consumer[F, E]] = { + val _opts = Option(opts).getOrElse(Options[F, E]()) + mkConsumer(client, sub, topic, _opts) + } // Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions. sealed abstract class Options[F[_], E] { diff --git a/core/src/main/scala/cr/pulsar/Producer.scala b/core/src/main/scala/cr/pulsar/Producer.scala index 1085919a..384068b0 100644 --- a/core/src/main/scala/cr/pulsar/Producer.scala +++ b/core/src/main/scala/cr/pulsar/Producer.scala @@ -64,11 +64,13 @@ object Producer { /** * It creates a simple [[Producer]] with the supplied options. */ - def withOptions[F[_]: Concurrent: ContextShift: Parallel, E: Schema]( + def make[F[_]: Concurrent: ContextShift: Parallel, E: Schema]( client: Pulsar.T, - topic: Topic, - opts: Options[F, E] + topic: Topic.Single, + opts: Options[F, E] = null // default value does not work ): Resource[F, Producer[F, E]] = { + val _opts = Option(opts).getOrElse(Options[F, E]()) + def configureBatching( batching: Batching, producerBuilder: ProducerBuilder[E] @@ -90,7 +92,7 @@ object Producer { .make { F.delay( configureBatching( - opts.batching, + _opts.batching, client.newProducer(E.schema).topic(topic.url.value) ).create ) @@ -98,11 +100,11 @@ object Producer { .map { prod => new Producer[F, E] { override def send(msg: E, key: MessageKey): F[MessageId] = - opts.logger(msg)(topic.url) &> F.delay { + _opts.logger(msg)(topic.url) &> F.delay { prod .newMessage() .value(msg) - .withShardKey(opts.shardKey(msg)) + .withShardKey(_opts.shardKey(msg)) .withMessageKey(key) .sendAsync() }.futureLift @@ -116,25 +118,6 @@ object Producer { } } - /** - * It creates a [[Producer]] with default options and the supplied message logger. - */ - def withLogger[F[_]: ContextShift: Parallel: Concurrent, E: Schema]( - client: Pulsar.T, - topic: Topic, - logger: E => Topic.URL => F[Unit] - ): Resource[F, Producer[F, E]] = - withOptions(client, topic, Options[F, E]().withLogger(logger)) - - /** - * It creates a [[Producer]] with default options (no-op logger). - */ - def create[F[_]: ContextShift: Parallel: Concurrent, E: Schema]( - client: Pulsar.T, - topic: Topic - ): Resource[F, Producer[F, E]] = - withOptions(client, topic, Options[F, E]()) - // Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions. sealed abstract class Options[F[_], E] { val batching: Batching diff --git a/core/src/main/scala/cr/pulsar/Pulsar.scala b/core/src/main/scala/cr/pulsar/Pulsar.scala index 8519bdd9..79f89ea5 100644 --- a/core/src/main/scala/cr/pulsar/Pulsar.scala +++ b/core/src/main/scala/cr/pulsar/Pulsar.scala @@ -35,14 +35,10 @@ object Pulsar { * It will be closed once the client is no longer in use or in case of * shutdown of the application that makes use of it. */ - def create[F[_]: Sync]( - url: PulsarURL - ): Resource[F, T] = withOptions(url, Options()) - - def withOptions[F[_]: Sync]( + def make[F[_]: Sync]( url: PulsarURL, - opts: Options - ): Resource[F, Underlying] = + opts: Options = Options() + ): Resource[F, T] = Resource.fromAutoCloseable( F.delay( Underlying.builder diff --git a/core/src/main/scala/cr/pulsar/Reader.scala b/core/src/main/scala/cr/pulsar/Reader.scala index adf017db..0dff5d7a 100644 --- a/core/src/main/scala/cr/pulsar/Reader.scala +++ b/core/src/main/scala/cr/pulsar/Reader.scala @@ -52,7 +52,7 @@ object Reader { private def mkPulsarReader[F[_]: Sync]( client: Pulsar.T, - topic: Topic, + topic: Topic.Single, opts: Options ): Resource[F, JReader[Array[Byte]]] = Resource @@ -129,53 +129,29 @@ object Reader { /** * It creates a [[Reader]] with the supplied [[Options]]. */ - def withOptions[ + def make[ F[_]: Concurrent: ContextShift, E: Inject[*, Array[Byte]] ]( client: Pulsar.T, - topic: Topic, - opts: Options + topic: Topic.Single, + opts: Options = Options() ): Resource[F, Reader[F, E]] = mkPulsarReader[F](client, topic, opts) .map(c => mkPayloadReader(mkMessageReader[F, E](c))) - /** - * It creates a simple [[Reader]]. - */ - def create[ - F[_]: Concurrent: ContextShift, - E: Inject[*, Array[Byte]] - ]( - client: Pulsar.T, - topic: Topic - ): Resource[F, Reader[F, E]] = - withOptions[F, E](client, topic, Options()) - /** * It creates a [[MessageReader]] with the supplied [[Options]]. */ - def messageReaderWithOptions[ - F[_]: Concurrent: ContextShift, - E: Inject[*, Array[Byte]] - ]( - client: Pulsar.T, - topic: Topic, - opts: Options - ): Resource[F, MessageReader[F, E]] = - mkPulsarReader[F](client, topic, opts).map(mkMessageReader[F, E]) - - /** - * It creates a simple [[MessageReader]]. - */ def messageReader[ F[_]: Concurrent: ContextShift, E: Inject[*, Array[Byte]] ]( client: Pulsar.T, - topic: Topic + topic: Topic.Single, + opts: Options = Options() ): Resource[F, MessageReader[F, E]] = - messageReaderWithOptions[F, E](client, topic, Options()) + mkPulsarReader[F](client, topic, opts).map(mkMessageReader[F, E]) // Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions. sealed abstract class Options { diff --git a/core/src/main/scala/cr/pulsar/Topic.scala b/core/src/main/scala/cr/pulsar/Topic.scala index 85e6ce8f..e858ab6f 100644 --- a/core/src/main/scala/cr/pulsar/Topic.scala +++ b/core/src/main/scala/cr/pulsar/Topic.scala @@ -22,10 +22,7 @@ import io.estatico.newtype.macros.newtype import scala.annotation.implicitNotFound import scala.util.matching.Regex -sealed abstract class Topic { - val name: Topic.Name - val url: Topic.URL -} +sealed trait Topic /** * Topic names are URLs that have a well-defined structure: @@ -34,12 +31,27 @@ sealed abstract class Topic { * {persistent|non-persistent}://tenant/namespace/topic * }}} * + * It could be either `Single` for one or `Multi` (taking a regular expression) for + * consuming from multiple topics. + * * Find out more at [[https://pulsar.apache.org/docs/en/concepts-messaging/#topics]] */ object Topic { + sealed abstract class Single extends Topic { + val name: Topic.Name + val url: Topic.URL + } + + sealed abstract class Multi extends Topic { + val url: URL + } + implicit val showTopic: Show[Topic] = - Show[String].contramap(_.url.value) + Show[String].contramap { + case s: Single => s.url.value + case m: Multi => m.url.value + } @newtype case class Name(value: String) @newtype case class NamePattern(value: Regex) @@ -58,10 +70,6 @@ object Topic { private def buildUrl(cfg: Config, name: Name, `type`: Type): URL = URL(s"${`type`.show}://${cfg.tenant.value}/${cfg.namespace.value}/${name.value}") - sealed abstract class Pattern { - val url: URL - } - private def buildRegexUrl(cfg: Config, namePattern: NamePattern, `type`: Type): URL = URL( s"${`type`.show}://${cfg.tenant.value}/${cfg.namespace.value}/${namePattern.value.regex}" @@ -76,24 +84,23 @@ object Topic { sealed trait Pattern extends Info sealed trait Type extends Info - type Mandatory = Empty with Config with Name with Type - type PatternMandatory = Empty with Config with Pattern with Type + type SingleMandatory = Empty with Config with Name with Type + type MultiMandatory = Empty with Config with Pattern with Type } case class TopicBuilder[I <: Info] protected ( - _name: Name = Name(""), - _pattern: NamePattern = NamePattern("".r), + _name: Either[Name, NamePattern] = Name("").asLeft, _config: Config = Config.Builder.default, _type: Type = Type.Persistent ) { def withName(name: Name): TopicBuilder[I with Info.Name] = - this.copy(_name = name) + this.copy(_name = name.asLeft) def withName(name: String): TopicBuilder[I with Info.Name] = withName(Name(name)) def withNamePattern(pattern: NamePattern): TopicBuilder[I with Info.Pattern] = - this.copy(_pattern = pattern) + this.copy(_name = pattern.asRight) def withNamePattern(regex: Regex): TopicBuilder[I with Info.Pattern] = withNamePattern(NamePattern(regex)) @@ -105,29 +112,33 @@ object Topic { this.copy(_type = typ) /** - * It creates a topic. By default, Type=Persistent. + * It creates a topic of type Single. By default, Type=Persistent. */ def build( implicit @implicitNotFound( - "Topic.Name and Config are mandatory. By default Type=Persistent." - ) ev: I =:= Info.Mandatory - ): Topic = - new Topic { - val name = _name - val url = buildUrl(_config, _name, _type) + "Topic.Name or Topic.Pattern, and Config are mandatory. By default Type=Persistent." + ) ev: I =:= Info.SingleMandatory + ): Topic.Single = { + val t = _name.swap.toOption.get + new Single { + val name = t + val url = buildUrl(_config, t, _type) } + } /** - * It creates a topic for a regex pattern. By default, Type=Persistent. + * It creates a topic of type Multi. By default, Type=Persistent. */ - def buildPattern( + def buildMulti( implicit @implicitNotFound( - "Topic.NamePattern and Config are mandatory. By default Type=Persistent." - ) ev: I =:= Info.PatternMandatory - ): Topic.Pattern = - new Topic.Pattern { - val url = buildRegexUrl(_config, _pattern, _type) + "Topic.Pattern, and Config are mandatory. By default Type=Persistent." + ) ev: I =:= Info.MultiMandatory + ): Topic.Multi = { + val m = _name.toOption.get + new Multi { + val url = buildRegexUrl(_config, m, _type) } + } } diff --git a/docs/src/paradox/index.md b/docs/src/paradox/index.md index 3f109bca..2a1f572e 100644 --- a/docs/src/paradox/index.md +++ b/docs/src/paradox/index.md @@ -43,9 +43,9 @@ object Demo extends IOApp { val resources: Resource[IO, (Consumer[IO, String], Producer[IO, String])] = for { - pulsar <- Pulsar.create[IO](config.url) - consumer <- Consumer.create[IO, String](pulsar, topic, subs) - producer <- Producer.create[IO, String](pulsar, topic) + pulsar <- Pulsar.make[IO](config.url) + consumer <- Consumer.make[IO, String](pulsar, topic, subs) + producer <- Producer.make[IO, String](pulsar, topic) } yield (consumer, producer) def run(args: List[String]): IO[ExitCode] = diff --git a/tests/src/it/scala/cr/pulsar/AlwaysIncompatibleSchemaSuite.scala b/tests/src/it/scala/cr/pulsar/AlwaysIncompatibleSchemaSuite.scala index 36fd6af5..98af982a 100644 --- a/tests/src/it/scala/cr/pulsar/AlwaysIncompatibleSchemaSuite.scala +++ b/tests/src/it/scala/cr/pulsar/AlwaysIncompatibleSchemaSuite.scala @@ -17,7 +17,7 @@ object AlwaysIncompatibleSchemaSuite extends IOSuite { .build override type Res = Pulsar.T - override def sharedResource: Resource[IO, Res] = Pulsar.create[IO](cfg.url) + override def sharedResource: Resource[IO, Res] = Pulsar.make[IO](cfg.url) val sub = (s: String) => Subscription.Builder @@ -38,8 +38,8 @@ object AlwaysIncompatibleSchemaSuite extends IOSuite { ) { client => val res: Resource[IO, (Consumer[IO, Event], Producer[IO, Event_V2])] = for { - consumer <- Consumer.create[IO, Event](client, topic, sub("circe")) - producer <- Producer.create[IO, Event_V2](client, topic) + consumer <- Consumer.make[IO, Event](client, topic, sub("circe")) + producer <- Producer.make[IO, Event_V2](client, topic) } yield consumer -> producer res.attempt.use { diff --git a/tests/src/it/scala/cr/pulsar/BackwardCompatSchemaSuite.scala b/tests/src/it/scala/cr/pulsar/BackwardCompatSchemaSuite.scala index fcaa774a..3c07056a 100644 --- a/tests/src/it/scala/cr/pulsar/BackwardCompatSchemaSuite.scala +++ b/tests/src/it/scala/cr/pulsar/BackwardCompatSchemaSuite.scala @@ -20,7 +20,7 @@ object BackwardCompatSchemaSuite extends IOSuite { .build override type Res = Pulsar.T - override def sharedResource: Resource[IO, Res] = Pulsar.create[IO](cfg.url) + override def sharedResource: Resource[IO, Res] = Pulsar.make[IO](cfg.url) val sub = (s: String) => Subscription.Builder @@ -40,8 +40,8 @@ object BackwardCompatSchemaSuite extends IOSuite { client => val res: Resource[IO, (Consumer[IO, Event_V2], Producer[IO, Event])] = for { - consumer <- Consumer.create[IO, Event_V2](client, topic, sub("circe")) - producer <- Producer.create[IO, Event](client, topic) + consumer <- Consumer.make[IO, Event_V2](client, topic, sub("circe")) + producer <- Producer.make[IO, Event](client, topic) } yield consumer -> producer (Ref.of[IO, Int](0), Deferred[IO, Event_V2]).tupled.flatMap { diff --git a/tests/src/it/scala/cr/pulsar/NeutronSuite.scala b/tests/src/it/scala/cr/pulsar/NeutronSuite.scala index 7922c7cc..d2c4d8fd 100644 --- a/tests/src/it/scala/cr/pulsar/NeutronSuite.scala +++ b/tests/src/it/scala/cr/pulsar/NeutronSuite.scala @@ -35,7 +35,7 @@ object NeutronSuite extends IOSuite { val cfg = Config.Builder.default override type Res = Pulsar.T - override def sharedResource: Resource[IO, Res] = Pulsar.create[IO](cfg.url) + override def sharedResource: Resource[IO, Res] = Pulsar.make[IO](cfg.url) val sub = (s: String) => Subscription.Builder @@ -58,8 +58,8 @@ object NeutronSuite extends IOSuite { val res: Resource[IO, (Consumer[IO, Event], Producer[IO, Event])] = for { - consumer <- Consumer.create[IO, Event](client, hpTopic, sub("hp-circe")) - producer <- Producer.create[IO, Event](client, hpTopic) + consumer <- Consumer.make[IO, Event](client, hpTopic, sub("hp-circe")) + producer <- Producer.make[IO, Event](client, hpTopic) } yield consumer -> producer Deferred[IO, Event].flatMap { latch => @@ -94,8 +94,8 @@ object NeutronSuite extends IOSuite { val res: Resource[IO, (Consumer[IO, String], Producer[IO, String])] = for { - consumer <- Consumer.create[IO, String](client, hpTopic, sub("hp-bytes")) - producer <- Producer.create[IO, String](client, hpTopic) + consumer <- Consumer.make[IO, String](client, hpTopic, sub("hp-bytes")) + producer <- Producer.make[IO, String](client, hpTopic) } yield consumer -> producer Deferred[IO, String].flatMap { latch => @@ -129,8 +129,8 @@ object NeutronSuite extends IOSuite { val res: Resource[IO, (Consumer[IO, Event], Producer[IO, String])] = for { - consumer <- Consumer.create[IO, Event](client, dfTopic, sub("decoding-err")) - producer <- Producer.create[IO, String](client, dfTopic) + consumer <- Consumer.make[IO, Event](client, dfTopic, sub("decoding-err")) + producer <- Producer.make[IO, String](client, dfTopic) } yield consumer -> producer Deferred[IO, String].flatMap { latch => @@ -179,9 +179,9 @@ object NeutronSuite extends IOSuite { (Consumer[IO, Event], Consumer[IO, Event], Producer[IO, Event]) ] = for { - c1 <- Consumer.create[IO, Event](client, topic("shared"), makeSub("s1")) - c2 <- Consumer.create[IO, Event](client, topic("shared"), makeSub("s2")) - p1 <- Producer.withOptions(client, topic("shared"), opts) + c1 <- Consumer.make[IO, Event](client, topic("shared"), makeSub("s1")) + c2 <- Consumer.make[IO, Event](client, topic("shared"), makeSub("s2")) + p1 <- Producer.make(client, topic("shared"), opts) } yield (c1, c2, p1) (Ref.of[IO, List[Event]](List.empty), Ref.of[IO, List[Event]](List.empty)).tupled @@ -241,8 +241,8 @@ object NeutronSuite extends IOSuite { val res: Resource[IO, (Consumer[IO, Fruit], Producer[IO, Fruit])] = for { - consumer <- Consumer.create[IO, Fruit](client, vTopic, sub("fruits")) - producer <- Producer.create[IO, Fruit](client, vTopic) + consumer <- Consumer.make[IO, Fruit](client, vTopic, sub("fruits")) + producer <- Producer.make[IO, Fruit](client, vTopic) } yield consumer -> producer res.use(_ => IO.pure(expect(true))) @@ -253,8 +253,8 @@ object NeutronSuite extends IOSuite { val res: Resource[IO, (Consumer[IO, Inner], Producer[IO, Inner])] = for { - consumer <- Consumer.create[IO, Inner](client, vTopic, sub("outer-inner")) - producer <- Producer.create[IO, Inner](client, vTopic) + consumer <- Consumer.make[IO, Inner](client, vTopic, sub("outer-inner")) + producer <- Producer.make[IO, Inner](client, vTopic) } yield consumer -> producer res.use(_ => IO.pure(expect(true))) From 69e12bf8f9342e6aa5d2b83c2d7345bc515edadd Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Fri, 26 Mar 2021 16:11:20 +0100 Subject: [PATCH 2/4] Organize constraints --- circe/src/main/scala/cr/pulsar/schema/circe/package.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/circe/src/main/scala/cr/pulsar/schema/circe/package.scala b/circe/src/main/scala/cr/pulsar/schema/circe/package.scala index 457ccdda..ad388a09 100644 --- a/circe/src/main/scala/cr/pulsar/schema/circe/package.scala +++ b/circe/src/main/scala/cr/pulsar/schema/circe/package.scala @@ -36,7 +36,7 @@ import org.apache.pulsar.client.api.schema.{ package object circe { - implicit def circeInstance[T: ClassTag: Encoder: Decoder]: Schema[T] = + implicit def circeInstance[T: ClassTag: Decoder: Encoder]: Schema[T] = new Schema[T] { def schema: JSchema[T] = { val reader = new SchemaReader[T] { From c72d79bc89dc5dbe118060a6a90a6cbb1d839223 Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Fri, 26 Mar 2021 16:14:09 +0100 Subject: [PATCH 3/4] Organize constraints --- core/src/main/scala/cr/pulsar/Topic.scala | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/cr/pulsar/Topic.scala b/core/src/main/scala/cr/pulsar/Topic.scala index e858ab6f..f5ef0af2 100644 --- a/core/src/main/scala/cr/pulsar/Topic.scala +++ b/core/src/main/scala/cr/pulsar/Topic.scala @@ -116,7 +116,7 @@ object Topic { */ def build( implicit @implicitNotFound( - "Topic.Name or Topic.Pattern, and Config are mandatory. By default Type=Persistent." + "Topic.Name and Config are mandatory. By default Type=Persistent." ) ev: I =:= Info.SingleMandatory ): Topic.Single = { val t = _name.swap.toOption.get @@ -131,14 +131,12 @@ object Topic { */ def buildMulti( implicit @implicitNotFound( - "Topic.Pattern, and Config are mandatory. By default Type=Persistent." + "Topic.Pattern and Config are mandatory. By default Type=Persistent." ) ev: I =:= Info.MultiMandatory - ): Topic.Multi = { - val m = _name.toOption.get + ): Topic.Multi = new Multi { - val url = buildRegexUrl(_config, m, _type) + val url = buildRegexUrl(_config, _name.toOption.get, _type) } - } } From 18be24f94a2d4a608da46b1dde1bdfe6159ed2d5 Mon Sep 17 00:00:00 2001 From: Gabriel Volpe Date: Mon, 29 Mar 2021 11:06:27 +0200 Subject: [PATCH 4/4] Update Reader to use schemas --- core/src/main/scala/cr/pulsar/Reader.scala | 53 ++++++++-------------- 1 file changed, 19 insertions(+), 34 deletions(-) diff --git a/core/src/main/scala/cr/pulsar/Reader.scala b/core/src/main/scala/cr/pulsar/Reader.scala index 0dff5d7a..28677c66 100644 --- a/core/src/main/scala/cr/pulsar/Reader.scala +++ b/core/src/main/scala/cr/pulsar/Reader.scala @@ -16,11 +16,12 @@ package cr.pulsar -import cats._ +import cats.Functor import cats.effect._ import cats.syntax.all._ import cr.pulsar.Reader.Message import cr.pulsar.internal.FutureLift._ +import cr.pulsar.schema.Schema import fs2._ import org.apache.pulsar.client.api.{ MessageId, Reader => JReader } @@ -50,15 +51,16 @@ object Reader { case class DecodingFailure(bytes: Array[Byte]) extends NoStackTrace case class Message[A](id: MessageId, key: MessageKey, payload: A) - private def mkPulsarReader[F[_]: Sync]( + private def mkPulsarReader[F[_]: Sync, E: Schema]( client: Pulsar.T, topic: Topic.Single, opts: Options - ): Resource[F, JReader[Array[Byte]]] = + ): Resource[F, JReader[E]] = Resource .make { F.delay { - client.newReader + client + .newReader(E.schema) .topic(topic.url.value) .startMessageId(opts.startMessageId) .startMessageIdInclusive() @@ -71,31 +73,20 @@ object Reader { private def mkMessageReader[ F[_]: Concurrent: ContextShift, - E: Inject[*, Array[Byte]] - ](c: JReader[Array[Byte]]): MessageReader[F, E] = + E: Schema + ](c: JReader[E]): MessageReader[F, E] = new MessageReader[F, E] { override def read: Stream[F, Message[E]] = Stream.repeatEval( - F.delay(c.readNextAsync()).futureLift.flatMap { m => - val data = m.getData - - E.prj(data) match { - case Some(e) => F.pure(Message(m.getMessageId, MessageKey(m.getKey), e)) - case None => DecodingFailure(data).raiseError[F, Message[E]] - } + F.delay(c.readNextAsync()).futureLift.map { m => + Message(m.getMessageId, MessageKey(m.getKey), m.getValue) } ) override def read1: F[Option[Message[E]]] = F.delay(c.hasMessageAvailableAsync).futureLift.flatMap { hasAvailable => - val readNext = F.delay(c.readNextAsync()).futureLift.flatMap { m => - val data = m.getData - - E.prj(data) match { - case Some(e) => - F.pure(Message(m.getMessageId, MessageKey(m.getKey), e)) - case None => DecodingFailure(data).raiseError[F, Message[E]] - } + val readNext = F.delay(c.readNextAsync()).futureLift.map { m => + Message(m.getMessageId, MessageKey(m.getKey), m.getValue) } Option.when(hasAvailable)(readNext).sequence @@ -104,17 +95,11 @@ object Reader { override def readUntil(timeout: FiniteDuration): F[Option[Message[E]]] = F.delay(c.hasMessageAvailableAsync).futureLift.flatMap { hasAvailable => val readNext = - F.delay(c.readNext(timeout.length.toInt, timeout.unit)).flatMap { m => - Option(m).map(_.getData).traverse { data => - E.prj(data) match { - case Some(e) => - F.pure(Message(m.getMessageId, MessageKey(m.getKey), e)) - case None => DecodingFailure(data).raiseError[F, Message[E]] - } - } + F.delay(c.readNext(timeout.length.toInt, timeout.unit)).map { m => + Message(m.getMessageId, MessageKey(m.getKey), m.getValue) } - Option.when(hasAvailable)(readNext).flatSequence + Option.when(hasAvailable)(readNext).sequence } } @@ -131,13 +116,13 @@ object Reader { */ def make[ F[_]: Concurrent: ContextShift, - E: Inject[*, Array[Byte]] + E: Schema ]( client: Pulsar.T, topic: Topic.Single, opts: Options = Options() ): Resource[F, Reader[F, E]] = - mkPulsarReader[F](client, topic, opts) + mkPulsarReader[F, E](client, topic, opts) .map(c => mkPayloadReader(mkMessageReader[F, E](c))) /** @@ -145,13 +130,13 @@ object Reader { */ def messageReader[ F[_]: Concurrent: ContextShift, - E: Inject[*, Array[Byte]] + E: Schema ]( client: Pulsar.T, topic: Topic.Single, opts: Options = Options() ): Resource[F, MessageReader[F, E]] = - mkPulsarReader[F](client, topic, opts).map(mkMessageReader[F, E]) + mkPulsarReader[F, E](client, topic, opts).map(mkMessageReader[F, E]) // Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions. sealed abstract class Options {