Skip to content

Commit

Permalink
Merge pull request #58 from cr-org/simplify-consumer
Browse files Browse the repository at this point in the history
Simplify consumer creation
  • Loading branch information
gvolpe authored Aug 25, 2020
2 parents 112e92b + 1df8085 commit 6e686cb
Show file tree
Hide file tree
Showing 7 changed files with 112 additions and 66 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ jobs:
name: Build
runs-on: ubuntu-18.04
steps:
- uses: "actions/checkout@v2.1.0"
- uses: "actions/checkout@v2.3.2"
- name: "Starting up Pulsar 🐳"
run: docker-compose up -d
- run: |
Expand Down
4 changes: 3 additions & 1 deletion .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ jobs:
name: Publish
runs-on: ubuntu-18.04
steps:
- uses: "actions/checkout@v2.1.0"
- uses: "actions/checkout@v2.3.2"
with:
fetch-depth: 0 # fetch all branches & tags
- name: java 11 setup
uses: "olafurpg/setup-java@v6"
with:
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ lazy val docs = (project in file("docs"))
"org" -> organization.value,
"scala.binary.version" -> s"2.${CrossVersion.partialVersion(scalaVersion.value).get._2}",
"neutron-core" -> s"${(`neutron-core` / name).value}_2.${CrossVersion.partialVersion(scalaVersion.value).get._2}",
"neutron-circe" -> s"${(`neutron-circe` / name).value}_2.${CrossVersion.partialVersion(scalaVersion.value).get._2}",
"neutron-function" -> s"${(`neutron-function` / name).value}_2.${CrossVersion.partialVersion(scalaVersion.value).get._2}",
"version" -> version.value
),
Expand Down
13 changes: 7 additions & 6 deletions core/src/it/scala/cr/pulsar/PulsarSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,10 @@ import cats.implicits._
import cr.pulsar.Producer.MessageKey
import fs2.Stream
import java.util.UUID
import org.apache.pulsar.client.api.SubscriptionInitialPosition

class PulsarSpec extends PulsarSuite {

val subs = Subscription(Subscription.Name("test"), Subscription.Type.Failover)
val spos = SubscriptionInitialPosition.Latest
val topic = Topic(cfg, Topic.Name("test"), Topic.Type.Persistent)
val batch = Producer.Batching.Disabled
val shard = (_: Event) => Producer.MessageKey.Default
Expand All @@ -36,7 +34,7 @@ class PulsarSpec extends PulsarSuite {
test("A message is published and consumed successfully") {
val res: Resource[IO, (Consumer[IO], Producer[IO, Event])] =
for {
consumer <- Consumer.create[IO](client, topic, subs, spos)
consumer <- Consumer.create[IO](client, topic, subs)
producer <- Producer.create[IO, Event](client, topic)
} yield consumer -> producer

Expand Down Expand Up @@ -73,11 +71,14 @@ class PulsarSpec extends PulsarSuite {
val makeSub =
(n: String) => Subscription(Subscription.Name(n), Subscription.Type.KeyShared)

val opts =
Producer.Options[IO, Event]().withShardKey(_.shardKey).withBatching(batch)

val res: Resource[IO, (Consumer[IO], Consumer[IO], Producer[IO, Event])] =
for {
c1 <- Consumer.create[IO](client, topic, makeSub("s1"), spos)
c2 <- Consumer.create[IO](client, topic, makeSub("s2"), spos)
producer <- Producer.withOptions[IO, Event](client, topic, _.shardKey, batch)
c1 <- Consumer.create[IO](client, topic, makeSub("s1"))
c2 <- Consumer.create[IO](client, topic, makeSub("s2"))
producer <- Producer.withOptions(client, topic, opts)
} yield (c1, c2, producer)

(Ref.of[IO, List[Event]](List.empty), Ref.of[IO, List[Event]](List.empty)).tupled
Expand Down
59 changes: 46 additions & 13 deletions core/src/main/scala/cr/pulsar/Consumer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ object Consumer {

private def mkConsumer[F[_]: Concurrent: ContextShift](
client: Pulsar.T,
subs: Subscription,
initial: SubscriptionInitialPosition,
topicType: Either[Topic.Pattern, Topic]
sub: Subscription,
topicType: Either[Topic.Pattern, Topic],
opts: Options
): Resource[F, Consumer[F]] =
Resource
.make {
Expand All @@ -49,9 +49,9 @@ object Consumer {
p => c.topicsPattern(p.url.value.r.pattern),
t => c.topic(t.url.value)
)
.subscriptionType(subs.sType)
.subscriptionName(subs.name)
.subscriptionInitialPosition(initial)
.subscriptionType(sub.sType)
.subscriptionName(sub.name)
.subscriptionInitialPosition(opts.initial)
.subscribeAsync
}.futureLift
}(
Expand Down Expand Up @@ -81,24 +81,36 @@ object Consumer {
def multiTopic[F[_]: Concurrent: ContextShift](
client: Pulsar.T,
topicPattern: Topic.Pattern,
subs: Subscription,
initial: SubscriptionInitialPosition
sub: Subscription
): Resource[F, Consumer[F]] =
mkConsumer[F](client, subs, initial, topicPattern.asLeft)
mkConsumer[F](client, sub, topicPattern.asLeft, Options())

/**
* It creates a simple [[Consumer]].
* It creates a simple [[Consumer]] with default options.
*
* Note that this does not create a subscription to any Topic,
* you can use [[Consumer#subscribe]] for this purpose.
*/
def create[F[_]: Concurrent: ContextShift](
client: Pulsar.T,
topic: Topic,
subs: Subscription,
initial: SubscriptionInitialPosition
sub: Subscription
): Resource[F, Consumer[F]] =
mkConsumer[F](client, subs, initial, topic.asRight)
mkConsumer[F](client, sub, topic.asRight, Options())

/**
* It creates a simple [[Consumer]] with the supplied options.
*
* Note that this does not create a subscription to any Topic,
* you can use [[Consumer#subscribe]] for this purpose.
*/
def withOptions[F[_]: Concurrent: ContextShift](
client: Pulsar.T,
topic: Topic,
sub: Subscription,
opts: Options
): Resource[F, Consumer[F]] =
mkConsumer[F](client, sub, topic.asRight, opts)

/**
* A simple message decoder that uses a [[cats.Inject]] instance
Expand Down Expand Up @@ -139,4 +151,25 @@ object Consumer {
): Pipe[F, Message[Array[Byte]], E] =
loggingMessageDecoder[F, E](c, _ => _ => F.unit)

// Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions.
sealed abstract class Options {
val initial: SubscriptionInitialPosition
def withInitialPosition(initial: SubscriptionInitialPosition): Options
}

/**
* Consumer options such as subscription initial position.
*/
object Options {
private case class OptionsImpl(
initial: SubscriptionInitialPosition
) extends Options {
override def withInitialPosition(
_initial: SubscriptionInitialPosition
): Options =
copy(initial = _initial)
}
def apply(): Options = OptionsImpl(SubscriptionInitialPosition.Latest)
}

}
86 changes: 50 additions & 36 deletions core/src/main/scala/cr/pulsar/Producer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,19 +57,15 @@ object Producer {
}

/**
* It creates a simple [[Producer]].
*
* Produced messages will be logged using the given `logAction`.
* It creates a simple [[Producer]] with the supplied options.
*/
def withLogger[
def withOptions[
F[_]: Concurrent: ContextShift: Parallel,
E: Inject[*, Array[Byte]]
](
client: Pulsar.T,
topic: Topic,
shardKey: E => MessageKey, // only needed for key-shared topics
batching: Batching,
logAction: E => Topic.URL => F[Unit]
opts: Options[F, E]
): Resource[F, Producer[F, E]] = {
def configureBatching(
batching: Batching,
Expand All @@ -92,7 +88,7 @@ object Producer {
.make {
F.delay(
configureBatching(
batching,
opts.batching,
client.newProducer.topic(topic.url.value)
).create
)
Expand All @@ -115,8 +111,8 @@ object Producer {
}

override def send(msg: E): F[MessageId] =
logAction(msg)(topic.url) &> F.delay {
buildMessage(serialise(msg), shardKey(msg)).sendAsync()
opts.logger(msg)(topic.url) &> F.delay {
buildMessage(serialise(msg), opts.shardKey(msg)).sendAsync()
}.futureLift

override def send_(msg: E): F[Unit] = send(msg).void
Expand All @@ -126,44 +122,62 @@ object Producer {
}

/**
* It creates a simple [[Producer]] with a no-op logger and disabled batching.
*
* Produced messages will not be logged.
* It creates a [[Producer]] with default options and the supplied message logger.
*/
def create[
def withLogger[
F[_]: ContextShift: Parallel: Concurrent,
E: Inject[*, Array[Byte]]
](
client: Pulsar.T,
topic: Topic
topic: Topic,
logger: E => Topic.URL => F[Unit]
): Resource[F, Producer[F, E]] =
withLogger[F, E](
client,
topic,
_ => Producer.MessageKey.Default,
Batching.Disabled,
_ => _ => F.unit
)
withOptions(client, topic, Options[F, E]().withLogger(logger))

/**
* It creates a [[Producer]] with a no-op logger.
*
* Produced messages will not be logged.
* It creates a [[Producer]] with default options (no-op logger).
*/
def withOptions[
def create[
F[_]: ContextShift: Parallel: Concurrent,
E: Inject[*, Array[Byte]]
](
client: Pulsar.T,
topic: Topic,
shardKey: E => MessageKey,
batching: Batching
topic: Topic
): Resource[F, Producer[F, E]] =
withLogger[F, E](
client,
topic,
shardKey,
batching,
_ => _ => F.unit
)
withOptions(client, topic, Options[F, E]())

// Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions.
sealed abstract class Options[F[_], E] {
val batching: Batching
val shardKey: E => MessageKey
val logger: E => Topic.URL => F[Unit]
def withBatching(_batching: Batching): Options[F, E]
def withShardKey(_shardKey: E => MessageKey): Options[F, E]
def withLogger(_logger: E => Topic.URL => F[Unit]): Options[F, E]
}

/**
* Producer options such as sharding key, batching, and message logger
*/
object Options {
private case class OptionsImpl[F[_], E](
batching: Batching,
shardKey: E => MessageKey,
logger: E => Topic.URL => F[Unit]
) extends Options[F, E] {
override def withBatching(_batching: Batching): Options[F, E] =
copy(batching = _batching)
override def withShardKey(_shardKey: E => MessageKey): Options[F, E] =
copy(shardKey = _shardKey)
override def withLogger(_logger: E => (Topic.URL => F[Unit])): Options[F, E] =
copy(logger = _logger)
}
def apply[F[_]: Applicative, E](): Options[F, E] =
OptionsImpl[F, E](
Batching.Disabled,
_ => Producer.MessageKey.Default,
_ => _ => F.unit
)
}

}
13 changes: 4 additions & 9 deletions docs/src/paradox/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,23 +19,18 @@ import cats.effect._
import fs2.Stream
import cr.pulsar._
import cr.pulsar.schema.utf8._
import org.apache.pulsar.client.api.SubscriptionInitialPosition
import scala.concurrent.duration._

object Demo extends IOApp {

// Pulsar configuration
val config = Config.default
val topic = Topic(config, Topic.Name("my-topic"), Topic.Type.NonPersistent)

// Consumer details
val subs = Subscription(Subscription.Name("my-sub"), Subscription.Type.Shared)
val initPos = SubscriptionInitialPosition.Latest
val config = Config.default
val topic = Topic(config, Topic.Name("my-topic"), Topic.Type.NonPersistent)
val subs = Subscription(Subscription.Name("my-sub"), Subscription.Type.Shared)

val resources: Resource[IO, (Consumer[IO], Producer[IO, String])] =
for {
client <- Pulsar.create[IO](config.serviceUrl)
consumer <- Consumer.create[IO](client, topic, subs, initPos)
consumer <- Consumer.create[IO](client, topic, subs)
producer <- Producer.create[IO, String](client, topic)
} yield (consumer, producer)

Expand Down

0 comments on commit 6e686cb

Please sign in to comment.