Skip to content

Commit

Permalink
Continuational representation of ConnectionIO (#698)
Browse files Browse the repository at this point in the history
* Continuational representation of ConnectionIO

* fix import

* revert `contextual` (still deprecated)

* def -> val

* rework implicits

* make Txr 2-parametric, deprecate more stuff
  • Loading branch information
catostrophe authored Jul 15, 2021
1 parent 0771ccc commit b6d67ff
Show file tree
Hide file tree
Showing 13 changed files with 237 additions and 119 deletions.

This file was deleted.

This file was deleted.

18 changes: 18 additions & 0 deletions modules/doobie/src/main/scala/tofu/doobie/ConnectionCIO.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package tofu.doobie

import cats.data.Kleisli
import cats.~>
import doobie.ConnectionIO
import tofu.kernel.types.AnyK

object ConnectionCIO {
trait Cont[F[_]] extends (ConnectionIO ~> F)
object Cont {
private val liftConnectionIOToConnectionCIOAny: LiftConnectionIO[ConnectionCIO[AnyK, *]] =
new LiftConnectionIO[ConnectionCIO[AnyK, *]] {
def lift[A](ca: ConnectionIO[A]): ConnectionCIO[AnyK, A] = Kleisli(k => k(ca))
}
@inline final implicit def liftConnectionIOToConnectionCIO[F[_]]: LiftConnectionIO[ConnectionCIO[F, *]] =
liftConnectionIOToConnectionCIOAny.asInstanceOf[LiftConnectionIO[ConnectionCIO[F, *]]]
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import cats.effect.syntax.effect._
import cats.effect.syntax.syncEffect._
import doobie.ConnectionIO
import doobie.free.connection.AsyncConnectionIO
import tofu.HasProvide
import tofu.WithProvide
import tofu.doobie.ConnectionRIO
import tofu.lift.Lift

Expand All @@ -22,7 +22,7 @@ private[instances] trait DoobieInstances {
new LiftToConnectionRIO

final def liftProvideToConnectionRIO[F[_], G[_], R](implicit
HP: HasProvide[G, F, R],
WP: WithProvide[G, F, R],
L: Lift[F, ConnectionIO]
): LiftProvideToConnectionRIO[F, G, R] = new LiftProvideToConnectionRIO
}
Expand All @@ -43,7 +43,7 @@ final class LiftToConnectionRIO[F[_], R](implicit L: Lift[F, ConnectionIO]) exte
def lift[A](fa: F[A]): ConnectionRIO[R, A] = ReaderT.liftF(L.lift(fa))
}

final class LiftProvideToConnectionRIO[F[_], G[_], R](implicit HP: HasProvide[G, F, R], L: Lift[F, ConnectionIO])
final class LiftProvideToConnectionRIO[F[_], G[_], R](implicit WP: WithProvide[G, F, R], L: Lift[F, ConnectionIO])
extends Lift[G, ConnectionRIO[R, *]] {
def lift[A](fa: G[A]): ConnectionRIO[R, A] = ReaderT(ctx => L.lift(HP.runContext(fa)(ctx)))
def lift[A](fa: G[A]): ConnectionRIO[R, A] = ReaderT(ctx => L.lift(WP.runContext(fa)(ctx)))
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package tofu.doobie.instances

import cats.effect.{Effect, IO, SyncEffect}
import doobie.ConnectionIO
import tofu.WithProvide
import tofu.lift.Lift

object implicits extends DoobieImplicits1
Expand All @@ -11,7 +12,7 @@ private[instances] trait DoobieImplicits1 extends DoobieImplicits2 {
liftToConnectionIOViaIO
}

private[instances] trait DoobieImplicits2 extends DoobieImplicitsScalaVersionSpecific {
private[instances] trait DoobieImplicits2 extends DoobieImplicits3 {
@inline final implicit def liftEffectToConnectionIOImplicit[F[_]: Effect]: LiftEffectToConnectionIO[F] =
liftEffectToConnectionIO

Expand All @@ -22,3 +23,10 @@ private[instances] trait DoobieImplicits2 extends DoobieImplicitsScalaVersionSpe
L: Lift[F, ConnectionIO]
): LiftToConnectionRIO[F, R] = liftToConnectionRIO
}

private[instances] trait DoobieImplicits3 {
@inline final implicit def liftProvideToConnectionRIOImplicit[F[_], G[_], R](implicit
WP: WithProvide[G, F, R],
L: Lift[F, ConnectionIO]
): LiftProvideToConnectionRIO[F, G, R] = liftProvideToConnectionRIO
}
5 changes: 4 additions & 1 deletion modules/doobie/src/main/scala/tofu/doobie/package.scala
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package tofu

import _root_.doobie.ConnectionIO
import cats.data.ReaderT
import cats.data.{Kleisli, ReaderT}
import tofu.lift.Lift

package object doobie {

/** A contextual database effect with environment of type R. */
type ConnectionRIO[R, A] = ReaderT[ConnectionIO, R, A]

/** A continuational form of `ConnectionIO` equivalent to `[x] =>> ConnectionIO ~> F => F[x]`. */
type ConnectionCIO[F[_], A] = Kleisli[F, ConnectionCIO.Cont[F], A]

/** A typeclass for lifting `ConnectionIO` into extended database effects such as `ConnectionRIO`. */
type LiftConnectionIO[DB[_]] = Lift[ConnectionIO, DB]
}
125 changes: 81 additions & 44 deletions modules/doobie/src/main/scala/tofu/doobie/transactor/Txr.scala
Original file line number Diff line number Diff line change
@@ -1,32 +1,31 @@
package tofu.doobie
package transactor

import cats.data.ReaderT
import cats.{Monad, ~>}
import cats.data.{Kleisli, ReaderT}
import cats.effect.Resource
import cats.{Defer, Monad, ~>}
import doobie.{ConnectionIO, Transactor}
import fs2.Stream
import tofu.lift.Lift
import tofu.syntax.funk._
import tofu.{BracketThrow, HasContext}
import tofu.syntax.monadic._
import tofu.{BracketThrow, WithContext}

import scala.annotation.nowarn

/** A simple facade for [[doobie.Transactor]] that holds an inner database effect type `DB[_]` and provides
* natural transformations from this effect to the target effect `F[_]`.
*
* The motivation for using this facade instead of `Transactor` is to:
* 1) initialize all its natural transformations early and remove the need for additional constraints
* on `F[_]` later (e.g. `Bracket`);
* 2) be able to use another `DB[_]` effect besides `ConnectionIO` and build a layer of transactional logic
* with it.
*
* N.B. The only extended DB effect provided below is `ConnectionRIO`. There are intentionally no effects
* based on `EitherT` to provide a separate error channel for business-errors. This is due to the (bad) design
* of Cats Effect. Doobie uses `Throwable`-bounded [[cats.effect.Resource]] and [[fs2.Stream]] under the hood
* for applying a [[doobie.util.transactor.Strategy]], and throwing an error through `EitherT`'s left channel
* does not lead to the proper `ExitCase` of a bracket. All errors on the DB layer must be thrown and handled
* via the `Throwable` channel of `ConnectionIO`.
* - initialize all its natural transformations early and remove the need for additional constraints
* on `F[_]` later (e.g. `Bracket`);
*
* - be able to use another `DB[_]` effect besides `ConnectionIO` and build a layer of transactional logic
* with it.
*/
trait Txr[F[_]] {
type DB[_]
trait Txr[F[_], DB0[_]] {
type DB[x] >: DB0[x] <: DB0[x]

/** Interprets `DB` into `F`, applying the transactional strategy. */
def trans: DB ~> F
Expand All @@ -42,46 +41,84 @@ trait Txr[F[_]] {
}

object Txr {
type Aux[F[_], DB0[_]] = Txr[F] { type DB[x] >: DB0[x] <: DB0[x] }
type Plain[F[_]] = Aux[F, ConnectionIO]
type Lifted[F[_]] = Aux[F, ConnectionIO]
type Contextual[F[_], Ctx] = Aux[F, ConnectionRIO[Ctx, *]]
type Plain[F[_]] = Txr[F, ConnectionIO]
type Continuational[F[_]] = Txr[F, ConnectionCIO[F, *]]

def Aux[F[_], DB[_]](implicit ev: Aux[F, DB]): Aux[F, DB] = ev
def Plain[F[_]](implicit ev: Plain[F]): Plain[F] = ev
def Lifted[F[_]](implicit ev: Lifted[F]): Lifted[F] = ev
def Contextual[F[_], Ctx](implicit ev: Contextual[F, Ctx]): Contextual[F, Ctx] = ev
def apply[F[_], DB[_]](implicit ev: Txr[F, DB]): Txr[F, DB] = ev
def Plain[F[_]](implicit ev: Plain[F]): Plain[F] = ev
def Continuational[F[_]](implicit ev: Continuational[F]): Continuational[F] = ev

/** Creates a plain facade that preserves the effect of `Transactor` with `ConnectionIO` as the database effect.
*/
def plain[F[_]: BracketThrow](t: Transactor[F]): Txr.Plain[F] =
new Txr[F] {
type DB[x] = ConnectionIO[x]
new Txr.Plain[F] {
val trans: ConnectionIO ~> F = t.trans
val rawTrans: ConnectionIO ~> F = t.rawTrans

def trans: ConnectionIO ~> F = t.trans
def rawTrans: ConnectionIO ~> F = t.rawTrans
val transP: Stream[ConnectionIO, *] ~> Stream[F, *] = t.transP
val rawTransP: Stream[ConnectionIO, *] ~> Stream[F, *] = t.rawTransP
}

def transP: Stream[ConnectionIO, *] ~> Stream[F, *] = t.transP
def rawTransP: Stream[ConnectionIO, *] ~> Stream[F, *] = t.rawTransP
/** Creates a facade that uses `ConnectionCIO` as the database effect.
*/
def continuational[F[_]: BracketThrow: Defer](t: Transactor[F]): Txr.Continuational[F] =
new Txr.Continuational[F] {
val trans: ConnectionCIO[F, *] ~> F = makeTrans(true)
val rawTrans: ConnectionCIO[F, *] ~> F = makeTrans(false)

val transP: Stream[ConnectionCIO[F, *], *] ~> Stream[F, *] = makeTransP(true)
val rawTransP: Stream[ConnectionCIO[F, *], *] ~> Stream[F, *] = makeTransP(false)

private def interpret(withStrategy: Boolean): Resource[F, ConnectionCIO.Cont[F]] = for {
c <- t.connect(t.kernel)
f = new ConnectionCIO.Cont[F] {
def apply[A](ca: ConnectionIO[A]): F[A] = ca.foldMap(t.interpret).run(c)
}
_ <- withStrategy.when_(t.strategy.resource.mapK(f))
} yield f

private def makeTrans(withStrategy: Boolean): ConnectionCIO[F, *] ~> F =
funK(ccio => interpret(withStrategy).use(ccio.run))

private def makeTransP(withStrategy: Boolean): Stream[ConnectionCIO[F, *], *] ~> Stream[F, *] =
funK(s =>
Stream
.resource(interpret(withStrategy))
.flatMap(fk => s.translate(Kleisli.applyK[F, ConnectionCIO.Cont[F]](fk)))
)
}

@deprecated("Use `Txr[F, DB]` instead", since = "0.10.3")
type Aux[F[_], DB[_]] = Txr[F, DB]
@deprecated("Use `Transactor.mapK` and `Txr.Plain[F]` instead", since = "0.10.3")
type Lifted[F[_]] = Txr[F, ConnectionIO]
@deprecated("Use `Transactor.mapK` and `Txr.Continuational[F]` instead", since = "0.10.3")
type Contextual[F[_], Ctx] = Txr[F, ConnectionRIO[Ctx, *]]

@deprecated("Use `Txr[F, DB]` instead", since = "0.10.3")
def Aux[F[_], DB[_]](implicit ev: Aux[F, DB]): Aux[F, DB] = ev
@deprecated("Use `Transactor.mapK` and `Txr.plain` instead", since = "0.10.3")
def Lifted[F[_]](implicit ev: Lifted[F]): Lifted[F] = ev
@deprecated("Use `Transactor.mapK` and `Txr.continuational` as a better alternative", since = "0.10.3")
def Contextual[F[_], Ctx](implicit ev: Contextual[F, Ctx]): Contextual[F, Ctx] = ev

/** Creates a facade that lifts the effect of `Transactor` from `F[_]` to `G[_]` with `ConnectionIO` as the database
* effect.
*/
@deprecated("Use `Transactor.mapK` and `Txr.plain` instead", since = "0.10.3")
def lifted[G[_]]: LiftedPA[G] = new LiftedPA[G]

@nowarn("cat=deprecation")
private[transactor] final class LiftedPA[G[_]](private val dummy: Boolean = true) extends AnyVal {
def apply[F[_]: BracketThrow](t: Transactor[F])(implicit L: Lift[F, G]): Txr.Lifted[G] =
new Txr[G] {
type DB[x] = ConnectionIO[x]

def trans: ConnectionIO ~> G = liftTrans(t.trans)
def rawTrans: ConnectionIO ~> G = liftTrans(t.rawTrans)
new Txr.Lifted[G] {
val trans: ConnectionIO ~> G = liftTrans(t.trans)
val rawTrans: ConnectionIO ~> G = liftTrans(t.rawTrans)

private def liftTrans(fk: ConnectionIO ~> F): ConnectionIO ~> G = fk andThen L.liftF

def transP: Stream[ConnectionIO, *] ~> Stream[G, *] = liftTransP(t.transP)
def rawTransP: Stream[ConnectionIO, *] ~> Stream[G, *] = liftTransP(t.rawTransP)
val transP: Stream[ConnectionIO, *] ~> Stream[G, *] = liftTransP(t.transP)
val rawTransP: Stream[ConnectionIO, *] ~> Stream[G, *] = liftTransP(t.rawTransP)

private def liftTransP(fk: Stream[ConnectionIO, *] ~> Stream[F, *]): Stream[ConnectionIO, *] ~> Stream[G, *] =
funK(s => fk(s).translate(L.liftF))
Expand All @@ -91,23 +128,23 @@ object Txr {
/** Creates a contextual facade that lifts the effect of `Transactor` from `F[_]` to `G[_]` given `G HasContext R`
* with `ConnectionRIO[R, *]` as the database effect.
*/
@deprecated("Use `Transactor.mapK` and `Txr.continuational` as a better alternative", since = "0.10.3")
def contextual[G[_]]: ContextualPA[G] = new ContextualPA[G]

@nowarn("cat=deprecation")
private[transactor] final class ContextualPA[G[_]](private val dummy: Boolean = true) extends AnyVal {
def apply[F[_]: BracketThrow, R](
t: Transactor[F]
)(implicit L: Lift[F, G], G: Monad[G], C: G HasContext R): Txr.Contextual[G, R] =
new Txr[G] {
type DB[x] = ConnectionRIO[R, x]

def trans: ConnectionRIO[R, *] ~> G = liftTrans(t.trans)
def rawTrans: ConnectionRIO[R, *] ~> G = liftTrans(t.rawTrans)
)(implicit L: Lift[F, G], G: Monad[G], C: G WithContext R): Txr.Contextual[G, R] =
new Txr.Contextual[G, R] {
val trans: ConnectionRIO[R, *] ~> G = liftTrans(t.trans)
val rawTrans: ConnectionRIO[R, *] ~> G = liftTrans(t.rawTrans)

private def liftTrans(fk: ConnectionIO ~> F): ConnectionRIO[R, *] ~> G =
funK(crio => C.askF(ctx => L.lift(fk(crio.run(ctx)))))

def transP: Stream[ConnectionRIO[R, *], *] ~> Stream[G, *] = liftTransPK(t.transPK)
def rawTransP: Stream[ConnectionRIO[R, *], *] ~> Stream[G, *] = liftTransPK(t.rawTransPK)
val transP: Stream[ConnectionRIO[R, *], *] ~> Stream[G, *] = liftTransPK(t.transPK)
val rawTransP: Stream[ConnectionRIO[R, *], *] ~> Stream[G, *] = liftTransPK(t.rawTransPK)

private def liftTransPK(
fk: Stream[ConnectionRIO[R, *], *] ~> Stream[ReaderT[F, R, *], *]
Expand Down
8 changes: 4 additions & 4 deletions modules/doobie/src/main/scala/tofu/syntax/doobie/txr.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ private[doobie] trait TxrSyntax {
}

private[doobie] final class TxrOps[DB[_], A](private val dba: DB[A]) extends AnyVal {
def trans[F[_]](implicit txr: Txr.Aux[F, DB]): F[A] = txr.trans(dba)
def rawTrans[F[_]](implicit txr: Txr.Aux[F, DB]): F[A] = txr.rawTrans(dba)
def trans[F[_]](implicit txr: Txr[F, DB]): F[A] = txr.trans(dba)
def rawTrans[F[_]](implicit txr: Txr[F, DB]): F[A] = txr.rawTrans(dba)
}

private[doobie] final class TxrStreamOps[DB[_], A](private val sdba: Stream[DB, A]) {
def trans[F[_]](implicit txr: Txr.Aux[F, DB]): Stream[F, A] = txr.transP(sdba)
def rawTrans[F[_]](implicit txr: Txr.Aux[F, DB]): Stream[F, A] = txr.rawTransP(sdba)
def trans[F[_]](implicit txr: Txr[F, DB]): Stream[F, A] = txr.transP(sdba)
def rawTrans[F[_]](implicit txr: Txr[F, DB]): Stream[F, A] = txr.rawTransP(sdba)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,23 @@ import zio.interop.catz._

object DoobieInstancesSuite {

def summonImplicitsViaLiftToIO[F[_]: Applicative, R](implicit L: Lift[F, IO]): Unit = {
def summonImplicitsViaLiftToIO[F[_]: Applicative, R](implicit L: Lift[F, IO]): Any = {
Lift[F, ConnectionIO]
Lift[F, ConnectionRIO[R, *]]
Lift[ReaderT[F, R, *], ConnectionRIO[R, *]]
()
}

def summonCatsEffectImplicits[R](): Unit = {
def summonCatsEffectImplicits[R](): Any = {
Lift[SyncIO, ConnectionIO]
Lift[SyncIO, ConnectionRIO[R, *]]
Lift[ReaderT[SyncIO, R, *], ConnectionRIO[R, *]]

Lift[IO, ConnectionIO]
Lift[IO, ConnectionRIO[R, *]]
Lift[ReaderT[IO, R, *], ConnectionRIO[R, *]]

()
}

def summonMonixImplicitsViaScheduler[R](implicit sc: Scheduler): Unit = {
def summonMonixImplicitsViaScheduler[R](implicit sc: Scheduler): Any = {
Lift[Coeval, ConnectionIO]
Lift[Coeval, ConnectionRIO[R, *]]
Lift[ReaderT[Coeval, R, *], ConnectionRIO[R, *]]
Expand All @@ -44,29 +41,29 @@ object DoobieInstancesSuite {
Lift[ReaderT[Task, R, *], ConnectionRIO[R, *]]

Lift[Env[R, *], ConnectionRIO[R, *]]

()
}

def summonMonixImplicitsUnambiguously[R](implicit @unused sc: Scheduler, L: Lift[Task, IO]): Unit = {
def summonMonixImplicitsUnambiguously[R](implicit @unused sc: Scheduler, L: Lift[Task, IO]): Any = {
Lift[Task, ConnectionIO]
Lift[Task, ConnectionRIO[R, *]]
Lift[ReaderT[Task, R, *], ConnectionRIO[R, *]]
()
}

def summonZioImplicits[R](): zio.Task[Unit] =
def summonZioImplicits[R](): zio.Task[Any] =
zio.Task.concurrentEffect.map { implicit CE =>
Lift[zio.Task, ConnectionIO]
Lift[zio.Task, ConnectionRIO[R, *]]
Lift[zio.RIO[R, *], ConnectionRIO[R, *]]
()
}

def summonLiftConnectionIO[R](): Unit = {
def summonLiftConnectionIO[R, F[_]](): Any = {
LiftConnectionIO[ConnectionIO]
LiftConnectionIO[ConnectionRIO[R, *]]
()
LiftConnectionIO[ConnectionCIO[F, *]]
}

def summonLiftToConnectionCIO[F[_]](): Any = {
Lift[F, ConnectionCIO[F, *]]
}

}
Loading

0 comments on commit b6d67ff

Please sign in to comment.