Skip to content

Commit

Permalink
Merge pull request #74 from cr-org/type-level-builders
Browse files Browse the repository at this point in the history
Type-level builders
  • Loading branch information
gvolpe authored Sep 16, 2020
2 parents 7554fd8 + 3ace7bb commit f1bbca0
Show file tree
Hide file tree
Showing 8 changed files with 201 additions and 105 deletions.
2 changes: 2 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import Dependencies._
import Settings._

scalaVersion in ThisBuild := "2.13.2"

lazy val `neutron-core` = (project in file("core"))
.enablePlugins(AutomateHeaderPlugin)
.settings(commonSettings)
Expand Down
28 changes: 22 additions & 6 deletions core/src/it/scala/cr/pulsar/PulsarSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,16 +26,27 @@ import java.util.UUID

class PulsarSpec extends PulsarSuite {

val sub = (s: String) => Subscription(Subscription.Name(s)).withType(Subscription.Type.Failover)
val topic = (s: String) => Topic(Topic.Name(s), cfg)
val sub = (s: String) =>
Subscription.Builder
.withName(Subscription.Name(s))
.withType(Subscription.Type.Failover)
.build

val topic = (s: String) =>
Topic.Builder
.withName(Topic.Name(s))
.withConfig(cfg)
.build

val batch = Producer.Batching.Disabled
val shard = (_: Event) => Producer.MessageKey.Default

withPulsarClient { client =>
test("A message is published and consumed successfully") {
val res: Resource[IO, (Consumer[IO, Event], Producer[IO, Event])] =
for {
consumer <- Consumer.create[IO, Event](client, topic("happy-path"), sub("happy-path"))
consumer <- Consumer
.create[IO, Event](client, topic("happy-path"), sub("happy-path"))
producer <- Producer.create[IO, Event](client, topic("happy-path"))
} yield consumer -> producer

Expand Down Expand Up @@ -68,7 +79,8 @@ class PulsarSpec extends PulsarSuite {
test("A message is published and nack'ed when consumer fails to decode it") {
val res: Resource[IO, (Consumer[IO, Event], Producer[IO, String])] =
for {
consumer <- Consumer.create[IO, Event](client, topic("auto-nack"), sub("auto-nack"))
consumer <- Consumer
.create[IO, Event](client, topic("auto-nack"), sub("auto-nack"))
producer <- Producer.create[IO, String](client, topic("auto-nack"))
} yield consumer -> producer

Expand All @@ -80,7 +92,8 @@ class PulsarSpec extends PulsarSuite {
val consume =
consumer.autoSubscribe
.handleErrorWith {
case Consumer.DecodingFailure(data) => Stream.eval(latch.complete(data))
case Consumer.DecodingFailure(data) =>
Stream.eval(latch.complete(data))
}

val testMessage = "Consumer will fail to decode this message"
Expand All @@ -105,7 +118,10 @@ class PulsarSpec extends PulsarSuite {
) {
val makeSub =
(n: String) =>
Subscription(Subscription.Name(n)).withType(Subscription.Type.KeyShared)
Subscription.Builder
.withName(Subscription.Name(n))
.withType(Subscription.Type.KeyShared)
.build

val opts =
Producer.Options[IO, Event]().withShardKey(_.shardKey).withBatching(batch)
Expand Down
5 changes: 2 additions & 3 deletions core/src/it/scala/cr/pulsar/PulsarSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ abstract class PulsarSuite extends FunSuite {

override def beforeAll(): Unit = {
super.beforeAll()
val (cli, release) =
Pulsar.create[IO](cfg.serviceUrl).allocated.unsafeRunSync()
val (cli, release) = Pulsar.create[IO](cfg.url).allocated.unsafeRunSync()
this.client = cli
this.close = release
latch.complete(()).unsafeRunSync()
Expand Down Expand Up @@ -80,6 +79,6 @@ abstract class PulsarSuite extends FunSuite {
}
}

lazy val cfg = Config.Default
lazy val cfg = Config.Builder.default

}
89 changes: 56 additions & 33 deletions core/src/main/scala/cr/pulsar/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package cr.pulsar

import Config._
import io.estatico.newtype.macros._
import scala.annotation.implicitNotFound

/**
* Basic Pulsar configuration to establish
Expand All @@ -26,47 +27,69 @@ import io.estatico.newtype.macros._
sealed abstract class Config {
val tenant: PulsarTenant
val namespace: PulsarNamespace
val serviceUrl: PulsarURL
def withTenant(_tenant: PulsarTenant): Config
def withNamespace(_namespace: PulsarNamespace): Config
def withURL(_url: PulsarURL): Config
val url: PulsarURL
}

object Config {
@newtype case class PulsarTenant(value: String)
@newtype case class PulsarNamespace(value: String)
@newtype case class PulsarURL(value: String)

private case class ConfigImpl(
tenant: PulsarTenant,
namespace: PulsarNamespace,
serviceUrl: PulsarURL
) extends Config {
def withTenant(_tenant: PulsarTenant): Config =
copy(tenant = _tenant)
def withNamespace(_namespace: PulsarNamespace): Config =
copy(namespace = _namespace)
def withURL(_serviceUrl: PulsarURL): Config =
copy(serviceUrl = _serviceUrl)
/**************** Type-level builder ******************/
sealed trait Info
object Info {
sealed trait Empty extends Info
sealed trait Namespace extends Info
sealed trait Tenant extends Info
sealed trait URL extends Info

type Mandatory = Empty with Namespace with Tenant with URL
}

case class ConfigBuilder[I <: Info] protected (
_tenant: PulsarTenant = PulsarTenant(""),
_namespace: PulsarNamespace = PulsarNamespace(""),
_url: PulsarURL = PulsarURL("")
) {
def withTenant(tenant: PulsarTenant): ConfigBuilder[I with Info.Tenant] =
this.copy(_tenant = tenant)

def withNameSpace(namespace: PulsarNamespace): ConfigBuilder[I with Info.Namespace] =
this.copy(_namespace = namespace)

def withURL(url: PulsarURL): ConfigBuilder[I with Info.URL] =
this.copy(_url = url)

/**
* It creates a new configuration.
*/
def build(
implicit @implicitNotFound(
"Tenant, Namespace and URL are mandatory. To create a default configuration, use Config.Builder.default instead."
) ev: I =:= Info.Mandatory
): Config =
new Config {
val tenant = _tenant
val namespace = _namespace
val url = _url
}

/**
* It creates a new configuration with the following default values:
*
* - tenant: "public"
* - namespace: "default"
* - url: "pulsar://localhost:6650"
*/
def default: Config =
Config.Builder
.withTenant(PulsarTenant("public"))
.withNameSpace(PulsarNamespace("default"))
.withURL(PulsarURL("pulsar://localhost:6650"))
.build

}

def apply(
tenant: PulsarTenant,
namespace: PulsarNamespace,
serviceUrl: PulsarURL
): Config = ConfigImpl(tenant, namespace, serviceUrl)
object Builder extends ConfigBuilder[Info.Empty]()

/**
* It creates a default configuration.
*
* - tenant: "public"
* - namespace: "default"
* - url: "pulsar://localhost:6650"
*/
def Default: Config =
ConfigImpl(
PulsarTenant("public"),
PulsarNamespace("default"),
PulsarURL("pulsar://localhost:6650")
)
}
65 changes: 44 additions & 21 deletions core/src/main/scala/cr/pulsar/Subscription.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@ package cr.pulsar

import io.estatico.newtype.macros.newtype
import org.apache.pulsar.client.api.{ SubscriptionMode, SubscriptionType }
import scala.annotation.implicitNotFound

// Builder-style abstract class instead of case class to allow for bincompat-friendly extension in future versions.
sealed abstract class Subscription {
val name: Subscription.Name
val `type`: Subscription.Type
val mode: Subscription.Mode
def withType(_type: Subscription.Type): Subscription
def withMode(_mode: Subscription.Mode): Subscription
}

/**
Expand Down Expand Up @@ -73,25 +71,50 @@ object Subscription {
}
}

private case class SubscriptionImpl(
name: Subscription.Name,
`type`: Subscription.Type,
mode: Subscription.Mode
) extends Subscription {
def withType(_type: Subscription.Type): Subscription =
copy(`type` = _type)
def withMode(_mode: Subscription.Mode): Subscription =
copy(mode = _mode)
/**************** Type-level builder ******************/
sealed trait Info
object Info {
sealed trait Empty extends Info
sealed trait Name extends Info
sealed trait Mode extends Info
sealed trait Type extends Info

type Mandatory = Empty with Name with Mode with Type
}

case class SubscriptionBuilder[I <: Info] protected (
_name: Name = Name(""),
_type: Type = Type.Exclusive,
_mode: Mode = Mode.Durable
) {
def withName(name: Name): SubscriptionBuilder[I with Info.Name] =
this.copy(_name = Name(s"${name.value}-subscription"))

def withMode(mode: Mode): SubscriptionBuilder[I with Info.Mode] =
this.copy(_mode = mode)

def withType(typ: Type): SubscriptionBuilder[I with Info.Type] =
this.copy(_type = typ)

/**
* It creates a subscription with default configuration.
*
* - type: Exclusive
* - mode: Durable
*/
def build(
implicit @implicitNotFound(
"Subscription.Name is mandatory. By default Type=Exclusive and Mode=Durable."
) ev: I =:= Info.Mandatory
): Subscription =
new Subscription {
val name = _name
val `type` = _type
val mode = _mode
}

}

/**
* It creates a subscription with default configuration.
*
* - type: Exclusive
* - mode: Durable
*/
def apply(name: Name): Subscription =
// Same as Java's defaults: https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java#L62
SubscriptionImpl(Name(s"${name.value}-subscription"), Type.Exclusive, Mode.Durable)
object Builder extends SubscriptionBuilder[Info.Empty with Info.Type with Info.Mode]()

}
Loading

0 comments on commit f1bbca0

Please sign in to comment.