diff --git a/concurrent/src/main/scala/tofu/concurrent/Daemon.scala b/concurrent/src/main/scala/tofu/concurrent/Daemon.scala index 07d68ec04..4fe935fa3 100644 --- a/concurrent/src/main/scala/tofu/concurrent/Daemon.scala +++ b/concurrent/src/main/scala/tofu/concurrent/Daemon.scala @@ -7,7 +7,8 @@ import cats.effect.concurrent.{MVar, TryableDeferred} import cats.effect.syntax.bracket._ import cats.syntax.applicativeError._ import cats.tagless.autoApplyK -import cats.{Apply, FlatMap, Monad} +import cats.{Applicative, Apply, FlatMap, Monad} +import tofu.control.ApplicativeZip import tofu.higherKind.Function2K import tofu.syntax.monadic._ import tofu.syntax.start._ @@ -51,7 +52,9 @@ trait DaemonicInstances { self: Daemonic.type => /** instance making Daemons keeping default underlying behaviour*/ implicit def nativeInstance[F[_]: Start: TryableDeferreds: Bracket[*[_], E], E]: Daemonic[F, E] = - mkInstance[F, E](Function2K[Fiber[F, *], Promise[F, E, *], Daemon[F, E, *]]((fib, promise) => new Daemon.Impl(fib, promise))) + mkInstance[F, E]( + Function2K[Fiber[F, *], Promise[F, E, *], Daemon[F, E, *]]((fib, promise) => new Daemon.Impl(fib, promise)) + ) } @autoApplyK @@ -65,7 +68,7 @@ trait Daemon[F[_], E, A] extends Fiber[F, A] { } /** Probably Infinite processes */ -object Daemon { +object Daemon extends DaemonInstances { private[tofu] class Impl[F[_], E, A](process: Fiber[F, A], end: TryableDeferred[F, Exit[E, A]]) extends Daemon[F, E, A] { @@ -80,7 +83,7 @@ object Daemon { extends Impl[F, Throwable, A](process, end) { override def join: F[A] = exit.flatMap { case Exit.Error(e) => e.raiseError - case Exit.Completed(a) => a.pure + case Exit.Completed(a) => a.pure[F] case Exit.Canceled => new TofuCanceledJoinException[F, A](this).raiseError } } @@ -96,14 +99,24 @@ object Daemon { def repeat[F[_]: Monad: Daemonic[*[_], E], E, A, B](step: F[A]): F[Daemon[F, E, B]] = apply(step.foreverM) + def repeatThrow[F[_]: Monad: DaemonicThrow, A, B](step: F[A]): F[DaemonThrow[F, B]] = repeat(step) + + def repeatTask[F[_]: Monad: DaemonicThrow, A](step: F[A]): F[DaemonTask[F]] = repeat(step) + def iterate[F[_]: Monad: Daemonic[*[_], E], E, A, B](init: A)(step: A => F[A]): F[Daemon[F, E, B]] = apply(init.iterateForeverM(step)) + def iterateThrow[F[_]: Monad: DaemonicThrow, A, B](init: A)(step: A => F[A]): F[DaemonThrow[F, B]] = + iterate(init)(step) + + def iterateTask[F[_]: Monad: DaemonicThrow, A](init: A)(step: A => F[A]): F[DaemonTask[F]] = iterate(init)(step) + def state[F[_]: Monad: Daemonic[*[_], E], E, S, A, B](init: S)(state: StateT[F, S, A]): F[Daemon[F, E, B]] = iterate(init)(state.runS) def resource[F[_]: Monad: Daemonic[*[_], E], E, A](daemon: F[Daemon[F, E, A]]): Resource[F, Daemon[F, E, A]] = Resource.make(daemon)(_.cancel) + } final class Actor[F[_], E, A] private (queue: MVar[F, A], val daemon: Daemon[F, E, Void]) { @@ -163,3 +176,37 @@ object Actor { final class TofuCanceledJoinException[F[_], A] private[tofu] (val daemon: Daemon[F, Throwable, A]) extends InterruptedException("trying to join canceled fiber") + +trait DaemonInstances { + implicit def daemonApplicative[F[_], E](implicit F: Monad[F]): Applicative[Daemon[F, E, *]] = + new ApplicativeZip[Daemon[F, E, *]] { + def pure[A](x: A): Daemon[F, E, A] = new Daemon[F, E, A] { + def join: F[A] = F.pure(x) + def cancel: F[Unit] = F.unit + def poll: F[Option[Exit[E, A]]] = F.pure(Some(Exit.Completed(x))) + def exit: F[Exit[E, A]] = F.pure(Exit.Completed(x)) + } + + override def map[A, B](fa: Daemon[F, E, A])(f: A => B): Daemon[F, E, B] = new Daemon[F, E, B] { + def join: F[B] = fa.join.map(f) + def cancel: F[Unit] = fa.cancel + def poll: F[Option[Exit[E, B]]] = fa.poll.map(_.map(_.map(f))) + def exit: F[Exit[E, B]] = fa.exit.map(_.map(f)) + } + + def zipWith[A, B, C](fa: Daemon[F, E, A], fb: Daemon[F, E, B])(f: (A, B) => C): Daemon[F, E, C] = + new Daemon[F, E, C] { + def join: F[C] = fa.join.map2(fb.join)(f) + def cancel: F[Unit] = fa.cancel *> fb.cancel + def poll: F[Option[Exit[E, C]]] = fa.poll.flatMap { + case fe @ (None | Some(_: Exit.Incomplete[E])) => F.pure(fe.asInstanceOf[Option[Exit[E, C]]]) + case Some(Exit.Completed(a)) => + fb.poll.map { + case fe @ (None | Some(_: Exit.Incomplete[E])) => fe.asInstanceOf[Option[Exit[E, C]]] + case Some(Exit.Completed(b)) => Some(Exit.Completed(f(a, b))) + } + } + def exit: F[Exit[E, C]] = fa.exit.map2(fb.exit)(_.map2(_)(f)) + } + } +} diff --git a/concurrent/src/main/scala/tofu/concurrent/Exit.scala b/concurrent/src/main/scala/tofu/concurrent/Exit.scala index 658d1d0df..981551203 100644 --- a/concurrent/src/main/scala/tofu/concurrent/Exit.scala +++ b/concurrent/src/main/scala/tofu/concurrent/Exit.scala @@ -1,16 +1,22 @@ package tofu.concurrent +import cats.{Applicative, Eval, Traverse} import cats.effect.ExitCase +import tofu.control.ApplicativeZip +import tofu.syntax.monadic._ sealed trait Exit[+E, +A] { def exitCase: ExitCase[E] } object Exit { - case object Canceled extends Exit[Nothing, Nothing] { + + sealed trait Incomplete[+E] extends Exit[E, Nothing] + + case object Canceled extends Incomplete[Nothing] { def exitCase = ExitCase.Canceled } - final case class Error[+E](e: E) extends Exit[E, Nothing] { + final case class Error[+E](e: E) extends Incomplete[E] { def exitCase = ExitCase.Error(e) } final case class Completed[+A](a: A) extends Exit[Nothing, A] { @@ -18,4 +24,44 @@ object Exit { } + private[this] object exitInstanceAny extends Traverse[Exit[Any, *]] with ApplicativeZip[Exit[Any, *]] { + def traverse[G[_], A, B](fa: Exit[Any, A])(f: A => G[B])(implicit G: Applicative[G]): G[Exit[Any, B]] = + fa match { + case Canceled => G.pure(Canceled) + case Error(e) => G.pure(Error(e)) + case Completed(a) => f(a).map(Completed(_)) + } + def foldLeft[A, B](fa: Exit[Any, A], b: B)(f: (B, A) => B): B = + fa match { + case Canceled | Error(_) => b + case Completed(a) => f(b, a) + } + def foldRight[A, B](fa: Exit[Any, A], lb: Eval[B])(f: (A, Eval[B]) => Eval[B]): Eval[B] = + fa match { + case Canceled | Error(_) => lb + case Completed(a) => f(a, lb) + } + override def map[A, B](fa: Exit[Any, A])(f: A => B): Exit[Any, B] = + fa match { + case Canceled => Canceled + case e: Error[Any] => e + case Completed(a) => Completed(f(a)) + } + + def zipWith[A, B, C](fa: Exit[Any, A], fb: Exit[Any, B])(f: (A, B) => C): Exit[Any, C] = + fa match { + case Canceled => Canceled + case err: Error[Any] => err + case Completed(a) => + fb match { + case Canceled => Canceled + case err: Error[Any] => err + case Completed(b) => Completed(f(a, b)) + } + } + def pure[A](x: A): Exit[Any, A] = Completed(x) + } + + implicit def exitInstance[E]: Traverse[Exit[E, *]] with Applicative[Exit[E, *]] = + exitInstanceAny.asInstanceOf[Traverse[Exit[E, *]] with Applicative[Exit[E, *]]] } diff --git a/concurrent/src/main/scala/tofu/concurrent/package.scala b/concurrent/src/main/scala/tofu/concurrent/package.scala index d2077996c..fe38c08f9 100644 --- a/concurrent/src/main/scala/tofu/concurrent/package.scala +++ b/concurrent/src/main/scala/tofu/concurrent/package.scala @@ -15,4 +15,9 @@ package object concurrent { def newSemaphore[F[_]: Semaphores] = MakeSemaphore[F, F] def newVar[F[_]: MVars] = MakeMVar[F, F] def newDeffered[F[_]: Deferreds, A] = MakeDeferred[F, F, A] + + type DaemonThrow[F[_], A] = Daemon[F, Throwable, A] + type DaemonTask[F[_]] = DaemonThrow[F, Unit] + + type DaemonicThrow[F[_]] = Daemonic[F, Throwable] } diff --git a/concurrent/src/test/scala/tofu/concurrent/SyntaxCheck.scala b/concurrent/src/test/scala/tofu/concurrent/SyntaxCheck.scala new file mode 100644 index 000000000..ebcc1e686 --- /dev/null +++ b/concurrent/src/test/scala/tofu/concurrent/SyntaxCheck.scala @@ -0,0 +1,9 @@ +package tofu +package concurrent +import tofu.lift.Lift +import tofu.syntax.lift._ + +object SyntaxCheck { + def liftDaemon[F[_], G[_], E, A](daemon: Daemon[F, E, A])(implicit lift: Lift[F, G]): Daemon[G, E, A] = + daemon.lift2[G] +} diff --git a/core/src/main/scala/tofu/control/ApplicativeZip.scala b/core/src/main/scala/tofu/control/ApplicativeZip.scala new file mode 100644 index 000000000..61c016b50 --- /dev/null +++ b/core/src/main/scala/tofu/control/ApplicativeZip.scala @@ -0,0 +1,14 @@ +package tofu.control +import cats.{Applicative, Apply} + +/** mix-in for Apply instances via `map2`*/ +trait ApplyZip[F[_]] extends Apply[F] { + def zipWith[A, B, C](fa: F[A], fb: F[B])(f: (A, B) => C): F[C] + + def ap[A, B](ff: F[A => B])(fa: F[A]): F[B] = zipWith(ff, fa)(_ apply _) +} + +/** mix-in for Applicative instances via `map2`*/ +trait ApplicativeZip[F[_]] extends ApplyZip[F] with Applicative[F] { + override def map[A, B](fa: F[A])(f: A => B): F[B] = zipWith(fa, unit)((a, _) => f(a)) +} diff --git a/core/src/main/scala/tofu/syntax/lift.scala b/core/src/main/scala/tofu/syntax/lift.scala index 6c158f221..4d80c3548 100644 --- a/core/src/main/scala/tofu/syntax/lift.scala +++ b/core/src/main/scala/tofu/syntax/lift.scala @@ -1,6 +1,6 @@ package tofu.syntax -import cats.Functor +import cats.{Functor, ~>} import cats.effect.concurrent.{Deferred, MVar, Ref, Semaphore} import tofu.lift.{IsoK, Lift, Unlift} import cats.tagless.{FunctorK, InvariantK} @@ -25,10 +25,32 @@ object lift { G.map(unlift.unlift)(backf => semaphore.imapK(unlift.liftF, backf)) } - implicit class CatsFunctorKLiftSyntax[T[_[_]], F[_]](val tf: T[F]) extends AnyVal { + implicit class CatsTaglessLiftSyntax[T[_[_]], F[_]](val tf: T[F]) extends AnyVal { def lift[G[_]](implicit lift: Lift[F, G], fk: FunctorK[T]): T[G] = fk.mapK(tf)(lift.liftF) def ilift[G[_]](implicit lift: IsoK[F, G], fk: InvariantK[T]): T[G] = fk.imapK(tf)(lift.tof)(lift.fromF) def unlift[G[_]](implicit unlift: Unlift[F, G], G: Functor[G], fk: InvariantK[T]): G[T[G]] = G.map(unlift.unlift)(backf => fk.imapK(tf)(unlift.liftF)(backf)) } + + implicit class CatsTagless1LiftSyntax[T[_[_], _], F[_], A](val tf: T[F, A]) extends AnyVal { + def mapK1[G[_]](f: F ~> G)(implicit fk: FunctorK[T[*[_], A]]): T[G, A] = fk.mapK(tf)(f) + def imapK1[G[_]](f: F ~> G)(g: G ~> F)(implicit fk: InvariantK[T[*[_], A]]): T[G, A] = fk.imapK(tf)(f)(g) + + def lift1[G[_]](implicit lift: Lift[F, G], fk: FunctorK[T[*[_], A]]): T[G, A] = fk.mapK(tf)(lift.liftF) + def ilift1[G[_]](implicit lift: IsoK[F, G], fk: InvariantK[T[*[_], A]]): T[G, A] = + fk.imapK(tf)(lift.tof)(lift.fromF) + def unlift1[G[_]](implicit unlift: Unlift[F, G], G: Functor[G], fk: InvariantK[T[*[_], A]]): G[T[G, A]] = + G.map(unlift.unlift)(backf => fk.imapK(tf)(unlift.liftF)(backf)) + } + + implicit class CatsTagless2LiftSyntax[T[_[_], _, _], F[_], A, B](val tf: T[F, A, B]) extends AnyVal { + def mapK2[G[_]](f: F ~> G)(implicit fk: FunctorK[T[*[_], A, B]]): T[G, A, B] = fk.mapK(tf)(f) + def imapK2[G[_]](f: F ~> G)(g: G ~> F)(implicit fk: InvariantK[T[*[_], A, B]]): T[G, A, B] = fk.imapK(tf)(f)(g) + + def lift2[G[_]](implicit lift: Lift[F, G], fk: FunctorK[T[*[_], A, B]]): T[G, A, B] = fk.mapK(tf)(lift.liftF) + def ilift2[G[_]](implicit lift: IsoK[F, G], fk: InvariantK[T[*[_], A, B]]): T[G, A, B] = + fk.imapK(tf)(lift.tof)(lift.fromF) + def unlift2[G[_]](implicit unlift: Unlift[F, G], G: Functor[G], fk: InvariantK[T[*[_], A, B]]): G[T[G, A, B]] = + G.map(unlift.unlift)(backf => fk.imapK(tf)(unlift.liftF)(backf)) + } }