Skip to content

Commit

Permalink
Merge pull request #179 from cr-org/refactor/simplify-constructors
Browse files Browse the repository at this point in the history
Simplify constructors
  • Loading branch information
gvolpe authored Mar 29, 2021
2 parents 9e0d8de + 18be24f commit 92947b1
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 199 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Check out the [microsite](https://cr-org.github.io/neutron/).

### Pulsar version

At the moment, we target Apache Pulsar 2.6.x.
At the moment, we target Apache Pulsar 2.7.x.

### Development

Expand Down
2 changes: 1 addition & 1 deletion circe/src/main/scala/cr/pulsar/schema/circe/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.pulsar.client.api.schema.{

package object circe {

implicit def circeInstance[T: ClassTag: Encoder: Decoder]: Schema[T] =
implicit def circeInstance[T: ClassTag: Decoder: Encoder]: Schema[T] =
new Schema[T] {
def schema: JSchema[T] = {
val reader = new SchemaReader[T] {
Expand Down
64 changes: 14 additions & 50 deletions core/src/main/scala/cr/pulsar/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,17 @@ object Consumer {
private def mkConsumer[F[_]: Concurrent: ContextShift, E: Schema](
client: Pulsar.T,
sub: Subscription,
topicType: Either[Topic.Pattern, Topic],
topic: Topic,
opts: Options[F, E]
): Resource[F, Consumer[F, E]] = {
val acquire =
F.delay {
val c = client.newConsumer(E.schema)
topicType
.fold(
p => c.topicsPattern(p.url.value.r.pattern),
t => c.topic(t.url.value)
)
.readCompacted(opts.readCompacted)
val z = topic match {
case s: Topic.Single => c.topic(s.url.value)
case m: Topic.Multi => c.topicsPattern(m.url.value.r.pattern)
}
z.readCompacted(opts.readCompacted)
.subscriptionType(sub.`type`.pulsarSubscriptionType)
.subscriptionName(sub.name.value)
.subscriptionMode(sub.mode.pulsarSubscriptionMode)
Expand Down Expand Up @@ -125,57 +124,22 @@ object Consumer {
}

/**
* It creates a [[Consumer]] for a multi-topic subscription.
* It creates a [[Consumer]] with the supplied options, or a default value otherwise.
*
* Find out more at [[https://pulsar.apache.org/docs/en/concepts-messaging/#multi-topic-subscriptions]]
*
* Note that this does not create a subscription to any Topic,
* you can use [[Consumer#subscribe]] for this purpose.
*/
def multiTopic[F[_]: Concurrent: ContextShift, E: Schema](
client: Pulsar.T,
topicPattern: Topic.Pattern,
sub: Subscription
): Resource[F, Consumer[F, E]] =
mkConsumer(client, sub, topicPattern.asLeft, Options[F, E]())

/**
* It creates a [[Consumer]] with default options.
*
* Note that this does not create a subscription to any Topic,
* you can use [[Consumer#subscribe]] for this purpose.
*/
def create[F[_]: Concurrent: ContextShift, E: Schema](
client: Pulsar.T,
topic: Topic,
sub: Subscription
): Resource[F, Consumer[F, E]] =
withOptions(client, topic, sub, Options[F, E]())

/**
* It creates a [[Consumer]] with default options and the supplied message logger.
*/
def withLogger[F[_]: ContextShift: Concurrent, E: Schema](
client: Pulsar.T,
topic: Topic,
sub: Subscription,
logger: E => Topic.URL => F[Unit]
): Resource[F, Consumer[F, E]] =
withOptions(client, topic, sub, Options[F, E]().withLogger(logger))

/**
* It creates a [[Consumer]] with the supplied options.
* A [[Topic]] can either be `Single` or `Multi` for multi topic subscriptions.
*
* Note that this does not create a subscription to any Topic,
* you can use [[Consumer#subscribe]] for this purpose.
*/
def withOptions[F[_]: Concurrent: ContextShift, E: Schema](
def make[F[_]: Concurrent: ContextShift, E: Schema](
client: Pulsar.T,
topic: Topic,
sub: Subscription,
opts: Options[F, E]
): Resource[F, Consumer[F, E]] =
mkConsumer(client, sub, topic.asRight, opts)
opts: Options[F, E] = null // default value does not work
): Resource[F, Consumer[F, E]] = {
val _opts = Option(opts).getOrElse(Options[F, E]())
mkConsumer(client, sub, topic, _opts)
}

// Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions.
sealed abstract class Options[F[_], E] {
Expand Down
33 changes: 8 additions & 25 deletions core/src/main/scala/cr/pulsar/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@ object Producer {
/**
* It creates a simple [[Producer]] with the supplied options.
*/
def withOptions[F[_]: Concurrent: ContextShift: Parallel, E: Schema](
def make[F[_]: Concurrent: ContextShift: Parallel, E: Schema](
client: Pulsar.T,
topic: Topic,
opts: Options[F, E]
topic: Topic.Single,
opts: Options[F, E] = null // default value does not work
): Resource[F, Producer[F, E]] = {
val _opts = Option(opts).getOrElse(Options[F, E]())

def configureBatching(
batching: Batching,
producerBuilder: ProducerBuilder[E]
Expand All @@ -90,19 +92,19 @@ object Producer {
.make {
F.delay(
configureBatching(
opts.batching,
_opts.batching,
client.newProducer(E.schema).topic(topic.url.value)
).create
)
}(p => F.delay(p.closeAsync()).futureLift.void)
.map { prod =>
new Producer[F, E] {
override def send(msg: E, key: MessageKey): F[MessageId] =
opts.logger(msg)(topic.url) &> F.delay {
_opts.logger(msg)(topic.url) &> F.delay {
prod
.newMessage()
.value(msg)
.withShardKey(opts.shardKey(msg))
.withShardKey(_opts.shardKey(msg))
.withMessageKey(key)
.sendAsync()
}.futureLift
Expand All @@ -116,25 +118,6 @@ object Producer {
}
}

/**
* It creates a [[Producer]] with default options and the supplied message logger.
*/
def withLogger[F[_]: ContextShift: Parallel: Concurrent, E: Schema](
client: Pulsar.T,
topic: Topic,
logger: E => Topic.URL => F[Unit]
): Resource[F, Producer[F, E]] =
withOptions(client, topic, Options[F, E]().withLogger(logger))

/**
* It creates a [[Producer]] with default options (no-op logger).
*/
def create[F[_]: ContextShift: Parallel: Concurrent, E: Schema](
client: Pulsar.T,
topic: Topic
): Resource[F, Producer[F, E]] =
withOptions(client, topic, Options[F, E]())

// Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions.
sealed abstract class Options[F[_], E] {
val batching: Batching
Expand Down
10 changes: 3 additions & 7 deletions core/src/main/scala/cr/pulsar/Pulsar.scala
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,10 @@ object Pulsar {
* It will be closed once the client is no longer in use or in case of
* shutdown of the application that makes use of it.
*/
def create[F[_]: Sync](
url: PulsarURL
): Resource[F, T] = withOptions(url, Options())

def withOptions[F[_]: Sync](
def make[F[_]: Sync](
url: PulsarURL,
opts: Options
): Resource[F, Underlying] =
opts: Options = Options()
): Resource[F, T] =
Resource.fromAutoCloseable(
F.delay(
Underlying.builder
Expand Down
89 changes: 25 additions & 64 deletions core/src/main/scala/cr/pulsar/Reader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@

package cr.pulsar

import cats._
import cats.Functor
import cats.effect._
import cats.syntax.all._
import cr.pulsar.Reader.Message
import cr.pulsar.internal.FutureLift._
import cr.pulsar.schema.Schema
import fs2._
import org.apache.pulsar.client.api.{ MessageId, Reader => JReader }

Expand Down Expand Up @@ -50,15 +51,16 @@ object Reader {
case class DecodingFailure(bytes: Array[Byte]) extends NoStackTrace
case class Message[A](id: MessageId, key: MessageKey, payload: A)

private def mkPulsarReader[F[_]: Sync](
private def mkPulsarReader[F[_]: Sync, E: Schema](
client: Pulsar.T,
topic: Topic,
topic: Topic.Single,
opts: Options
): Resource[F, JReader[Array[Byte]]] =
): Resource[F, JReader[E]] =
Resource
.make {
F.delay {
client.newReader
client
.newReader(E.schema)
.topic(topic.url.value)
.startMessageId(opts.startMessageId)
.startMessageIdInclusive()
Expand All @@ -71,31 +73,20 @@ object Reader {

private def mkMessageReader[
F[_]: Concurrent: ContextShift,
E: Inject[*, Array[Byte]]
](c: JReader[Array[Byte]]): MessageReader[F, E] =
E: Schema
](c: JReader[E]): MessageReader[F, E] =
new MessageReader[F, E] {
override def read: Stream[F, Message[E]] =
Stream.repeatEval(
F.delay(c.readNextAsync()).futureLift.flatMap { m =>
val data = m.getData

E.prj(data) match {
case Some(e) => F.pure(Message(m.getMessageId, MessageKey(m.getKey), e))
case None => DecodingFailure(data).raiseError[F, Message[E]]
}
F.delay(c.readNextAsync()).futureLift.map { m =>
Message(m.getMessageId, MessageKey(m.getKey), m.getValue)
}
)

override def read1: F[Option[Message[E]]] =
F.delay(c.hasMessageAvailableAsync).futureLift.flatMap { hasAvailable =>
val readNext = F.delay(c.readNextAsync()).futureLift.flatMap { m =>
val data = m.getData

E.prj(data) match {
case Some(e) =>
F.pure(Message(m.getMessageId, MessageKey(m.getKey), e))
case None => DecodingFailure(data).raiseError[F, Message[E]]
}
val readNext = F.delay(c.readNextAsync()).futureLift.map { m =>
Message(m.getMessageId, MessageKey(m.getKey), m.getValue)
}

Option.when(hasAvailable)(readNext).sequence
Expand All @@ -104,17 +95,11 @@ object Reader {
override def readUntil(timeout: FiniteDuration): F[Option[Message[E]]] =
F.delay(c.hasMessageAvailableAsync).futureLift.flatMap { hasAvailable =>
val readNext =
F.delay(c.readNext(timeout.length.toInt, timeout.unit)).flatMap { m =>
Option(m).map(_.getData).traverse { data =>
E.prj(data) match {
case Some(e) =>
F.pure(Message(m.getMessageId, MessageKey(m.getKey), e))
case None => DecodingFailure(data).raiseError[F, Message[E]]
}
}
F.delay(c.readNext(timeout.length.toInt, timeout.unit)).map { m =>
Message(m.getMessageId, MessageKey(m.getKey), m.getValue)
}

Option.when(hasAvailable)(readNext).flatSequence
Option.when(hasAvailable)(readNext).sequence
}
}

Expand All @@ -129,53 +114,29 @@ object Reader {
/**
* It creates a [[Reader]] with the supplied [[Options]].
*/
def withOptions[
def make[
F[_]: Concurrent: ContextShift,
E: Inject[*, Array[Byte]]
E: Schema
](
client: Pulsar.T,
topic: Topic,
opts: Options
topic: Topic.Single,
opts: Options = Options()
): Resource[F, Reader[F, E]] =
mkPulsarReader[F](client, topic, opts)
mkPulsarReader[F, E](client, topic, opts)
.map(c => mkPayloadReader(mkMessageReader[F, E](c)))

/**
* It creates a simple [[Reader]].
*/
def create[
F[_]: Concurrent: ContextShift,
E: Inject[*, Array[Byte]]
](
client: Pulsar.T,
topic: Topic
): Resource[F, Reader[F, E]] =
withOptions[F, E](client, topic, Options())

/**
* It creates a [[MessageReader]] with the supplied [[Options]].
*/
def messageReaderWithOptions[
F[_]: Concurrent: ContextShift,
E: Inject[*, Array[Byte]]
](
client: Pulsar.T,
topic: Topic,
opts: Options
): Resource[F, MessageReader[F, E]] =
mkPulsarReader[F](client, topic, opts).map(mkMessageReader[F, E])

/**
* It creates a simple [[MessageReader]].
*/
def messageReader[
F[_]: Concurrent: ContextShift,
E: Inject[*, Array[Byte]]
E: Schema
](
client: Pulsar.T,
topic: Topic
topic: Topic.Single,
opts: Options = Options()
): Resource[F, MessageReader[F, E]] =
messageReaderWithOptions[F, E](client, topic, Options())
mkPulsarReader[F, E](client, topic, opts).map(mkMessageReader[F, E])

// Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions.
sealed abstract class Options {
Expand Down
Loading

0 comments on commit 92947b1

Please sign in to comment.