From 76e1eac005a08ee86bf15fecc348bfcfbbe959aa Mon Sep 17 00:00:00 2001 From: Lachlan O'Dea Date: Tue, 15 Feb 2022 20:28:52 +1100 Subject: [PATCH] Re-implement using callbacks. (#188) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Re-implement using callbacks. Advantages: * The ZIO → Monix conversion is now lazy * Interruption/cancellation can be propagated * Executes synchronously where possible Also removed the need for the user to provide a Monix scheduler, instead we create one from the ZIO runtime. * Fix tests. * Apply suggestions from code review Co-authored-by: Dejan Mijić --- README.md | 99 +++++------ build.sbt | 2 +- .../src/main/scala/zio/interop/monix.scala | 23 --- .../scala/zio/interop/monix/package.scala | 109 ++++++++++++ .../test/scala/zio/interop/MonixSpec.scala | 86 ---------- .../zio/interop/monix/MonixTaskSpec.scala | 162 ++++++++++++++++++ 6 files changed, 319 insertions(+), 162 deletions(-) delete mode 100644 interop-monix/shared/src/main/scala/zio/interop/monix.scala create mode 100644 interop-monix/shared/src/main/scala/zio/interop/monix/package.scala delete mode 100644 interop-monix/shared/src/test/scala/zio/interop/MonixSpec.scala create mode 100644 interop-monix/shared/src/test/scala/zio/interop/monix/MonixTaskSpec.scala diff --git a/README.md b/README.md index 5a5ce6e..2914d8c 100644 --- a/README.md +++ b/README.md @@ -1,92 +1,87 @@ -# Interop Monix +# Monix Interoperability for ZIO | Project Stage | CI | Release | Snapshot | Discord | | --- | --- | --- | --- | --- | | [![Project stage][Stage]][Stage-Page] | ![CI][Badge-CI] | [![Release Artifacts][Badge-SonatypeReleases]][Link-SonatypeReleases] | [![Snapshot Artifacts][Badge-SonatypeSnapshots]][Link-SonatypeSnapshots] | [![Discord][Badge-Discord]][Link-Discord] | -## Task conversions +This library provides interoperability between **Monix 3.4** and **ZIO 1 and ZIO 2**. -Interop layer provides the following conversions: +## Tasks -- from `Task[A]` to `UIO[Task[A]]` -- from `Task[A]` to `Task[A]` - -To convert an `IO` value to `Task`, use the following method: +Monix tasks can be converted to ZIO tasks: ```scala -def toTask: UIO[eval.Task[A]] -``` +import zio._ +import zio.interop.monix._ +import monix.eval -To perform conversion in other direction, use the following extension method -available on `IO` companion object: +val monixTask: eval.Task[String] = ??? -```scala -def fromTask[A](task: eval.Task[A])(implicit scheduler: Scheduler): Task[A] +val zioTask: Task[String] = ZIO.fromMonixTask(monixTask) ``` -Note that in order to convert the `Task` to an `IO`, an appropriate `Scheduler` -needs to be available. +The conversion is lazy: the Monix task will only be executed if the returned ZIO task is executed. -### Example +ZIO tasks can be converted to Monix tasks: ```scala -import monix.eval.Task -import monix.execution.Scheduler.Implicits.global -import zio.{ IO, DefaultRuntime } +import zio._ import zio.interop.monix._ +import monix.eval +import monix.execution.Scheduler.Implicits.global -object UnsafeExample extends DefaultRuntime { - def main(args: Array[String]): Unit = { - val io1 = IO.succeed(10) - val t1 = unsafeRun(io1.toTask) - - t1.runToFuture.foreach(r => println(s"IO to task result is $r")) +val zioTask: Task[String] = ??? - val t2 = Task(10) - val io2 = IO.fromTask(t2).map(r => s"Task to IO result is $r") +val createMonixTask: UIO[eval.Task[String]] = zioTask.toMonixTask() - println(unsafeRun(io2)) - } -} +// illustrative, you wouldn't usually do things this way +val monixTask: eval.Task[String] = Runtime.default.unsafeRun(createMonixTask) +val stringResult = monixTask.runSyncUnsafe ``` -## Coeval conversions +The conversion is lazy: the ZIO effect so converted will only be executed if the returned Monix task is executed. -To convert an `IO` value to `Coeval`, use the following method: +Sometimes you need to provide a Monix task in a context where using a ZIO effect is difficult. For example, when an API requires you to provide a function that returns a Monix task. In these situations, the `toMonixTaskUsingRuntime` method can be used: ```scala -def toCoeval: UIO[eval.Coeval[A]] -``` +import zio._ +import zio.interop.monix._ +import monix.eval -To perform conversion in other direction, use the following extension method -available on `IO` companion object: +def monixBasedApi(f: String => eval.Task[Unit]): eval.Task[Unit] = ??? -```scala -def fromCoeval[A](coeval: eval.Coeval[A]): Task[A] +def zioBasedProcessor(s: String): Task[Unit] = ??? + +val zioEffects = for { + zioRuntime <- ZIO.runtime[Any] + monixTask = + _ <- ZIO.fromMonixTask { + monixBasedApi(s => + zioBasedProcessor(s).toMonixTaskUsingRuntime(zioRuntime) + ) + } +} yield () ``` -### Example +Cancellation/Interruption is propagated between the effect systems. Interrupting a ZIO task based on a Monix task will cancel the underlying Monix task and vice-versa. Be aware that ZIO interruption does not return until cancellation effects have completed, whereas Monix cancellation returns as soon as the signal is sent, without waiting for the cancellation effects to complete. -```scala -import monix.eval.Coeval -import zio.{ IO, DefaultRuntime } -import zio.interop.monix._ +## Monix Scheduler -object UnsafeExample extends DefaultRuntime { - def main(args: Array[String]): Unit = { - val io1 = IO.succeed(10) - val c1 = unsafeRun(io1.toCoeval) +Sometimes it is useful to have a Monix `Scheduler` available for interop purposes. The `Runtime#monixScheduler` method will create a scheduler that shares its execution context with the ZIO runtime: - println(s"IO to coeval result is ${c1.value}") +```scala +import zio._ +import zio.interop.monix._ +import monix.execution.Scheduler - val c2 = Coeval(10) - val io2 = IO.fromCoeval(c2).map(r => s"Coeval to IO result is $r") +ZIO.runtime[Any].flatMap { runtime => + implicit val monixScheduler: Scheduler = runtime.monixScheduler() - println(unsafeRun(io2)) - } + // do Monixy things } ``` + [Badge-CI]: https://github.com/zio/interop-monix/workflows/CI/badge.svg [Badge-Discord]: https://img.shields.io/discord/629491597070827530?logo=discord [Badge-SonatypeReleases]: https://img.shields.io/nexus/r/https/oss.sonatype.org/dev.zio/zio-interop-monix_2.12.svg diff --git a/build.sbt b/build.sbt index 9dddc4e..596bea6 100644 --- a/build.sbt +++ b/build.sbt @@ -44,7 +44,7 @@ lazy val interopMonix = crossProject(JSPlatform, JVMPlatform) libraryDependencies ++= Seq( "io.monix" %%% "monix" % "3.4.0", "dev.zio" %%% "zio" % "1.0.13", - "dev.zio" %%% "zio-test" % "1.0.13", + "dev.zio" %%% "zio-test" % "1.0.13" % Test, "dev.zio" %%% "zio-test-sbt" % "1.0.13" % Test ) ) diff --git a/interop-monix/shared/src/main/scala/zio/interop/monix.scala b/interop-monix/shared/src/main/scala/zio/interop/monix.scala deleted file mode 100644 index ba6b870..0000000 --- a/interop-monix/shared/src/main/scala/zio/interop/monix.scala +++ /dev/null @@ -1,23 +0,0 @@ -package zio.interop - -import _root_.monix.eval -import _root_.monix.execution.Scheduler -import zio.{ IO, Task, UIO } - -package object monix { - implicit class IOObjOps(private val obj: IO.type) extends AnyVal { - def fromTask[A](task: eval.Task[A])(implicit scheduler: Scheduler): Task[A] = - Task.fromFuture(_ => task.runToFuture) - - def fromCoeval[A](coeval: eval.Coeval[A]): Task[A] = - Task.fromTry(coeval.runTry()) - } - - implicit class TaskOps[A](private val io: Task[A]) extends AnyVal { - def toTask: UIO[eval.Task[A]] = - io.fold(eval.Task.raiseError, eval.Task.now) - - def toCoeval: UIO[eval.Coeval[A]] = - io.fold(eval.Coeval.raiseError, eval.Coeval.now) - } -} diff --git a/interop-monix/shared/src/main/scala/zio/interop/monix/package.scala b/interop-monix/shared/src/main/scala/zio/interop/monix/package.scala new file mode 100644 index 0000000..36ff114 --- /dev/null +++ b/interop-monix/shared/src/main/scala/zio/interop/monix/package.scala @@ -0,0 +1,109 @@ +package zio +package interop + +import _root_.monix.eval.{ Task => MTask } +import _root_.monix.execution.{ Scheduler => MScheduler, ExecutionModel } + +/** + * Monix interoperability for ZIO. + * + * - Monix tasks can be converted to ZIO tasks via + * `ZIO.fromMonixTask(monixTask)`. + * - ZIO tasks can be converted to Monix tasks via `zioTask.toMonixTask`. + */ +package object monix { + + implicit final class ZIORuntimeOps[R](private val runtime: Runtime[R]) extends AnyVal { + + /** + * Creates a Monix scheduler that shares its execution context with this ZIO + * runtime. + */ + def monixScheduler(executionModel: ExecutionModel = ExecutionModel.Default): MScheduler = + MScheduler(runtime.platform.executor.asEC, executionModel) + + } + + implicit final class ZIOObjOps(private val unused: ZIO.type) extends AnyVal { + + /** + * Converts a Monix task into a ZIO task. + * + * Interrupting the returned effect will cancel the underlying Monix task. + * The conversion is lazy: the Monix task is only executed if the returned + * ZIO task is executed. + * + * If the returned ZIO task is interrupted, the underlying Monix task will be + * cancelled. + * + * @param monixTask + * The Monix task. + * @param executionModel + * The Monix execution model to use for the Monix execution. This only + * need be specified if you want to override the Monix default. + */ + def fromMonixTask[A](monixTask: MTask[A], executionModel: ExecutionModel = ExecutionModel.Default): Task[A] = + Task.runtime.flatMap { zioRuntime => + Task.effectAsyncInterrupt[A] { cb => + implicit val scheduler: MScheduler = zioRuntime.monixScheduler(executionModel) + try + // runSyncStep will try to execute the Monix effects synchronously + // if it fails before hitting an async boundary, the failure will be thrown + monixTask.runSyncStep match { + case Right(result) => + // Monix task ran synchronously and successfully + Right(ZIO.succeedNow(result)) + case Left(asyncMonixTask) => + // Monix task hit an async boundary, so we have to use the callback (cb) + // and return a cancelation effect + val cancelable = asyncMonixTask.runAsync { result => + val zioEffect = ZIO.fromEither(result) + cb(zioEffect) + } + Left(ZIO.succeed(cancelable.cancel())) + } catch { + // Monix task failed during synchronous execution + case e: Throwable => Right(ZIO.fail(e)) + } + } + } + + } + + implicit final class ExtraZioEffectOps[-R, +A](private val effect: ZIO[R, Throwable, A]) extends AnyVal { + + /** + * Converts this ZIO effect into a Monix task. + * + * The conversion is lazy: this effect will only be executed if the returned Monix task + * is executed. + * If the returned Monix task is cancelled, the underlying ZIO effect will + * be interrupted. + */ + def toMonixTask: URIO[R, MTask[A]] = ZIO.runtime[R].map(toMonixTaskUsingRuntime) + + /** + * Converts this ZIO effect into a Monix task using a specified ZIO runtime + * to run the ZIO effect. + * + * This is useful in situations where a Monix task is needed, but executing ZIO effects + * is incovenient. For example, when using a library API that requires you to pass a + * function that returns a Monix task. In such situations, you can acquire the ZIO + * runtime up-front and use this method to create the Monix task. + * + * The conversion is lazy: this effect will only be executed if the returned Monix task + * is executed. + * If the returned Monix task is cancelled, the underlying ZIO effect will + * be interrupted. + */ + def toMonixTaskUsingRuntime(zioRuntime: Runtime[R]): MTask[A] = + MTask.cancelable { cb => + val cancelable = zioRuntime.unsafeRunAsyncCancelable(effect) { exit => + exit.fold(failed => cb.onError(failed.squash), cb.onSuccess) + } + MTask.eval(cancelable(Fiber.Id.None)).void + } + + } + +} diff --git a/interop-monix/shared/src/test/scala/zio/interop/MonixSpec.scala b/interop-monix/shared/src/test/scala/zio/interop/MonixSpec.scala deleted file mode 100644 index 7dbc56a..0000000 --- a/interop-monix/shared/src/test/scala/zio/interop/MonixSpec.scala +++ /dev/null @@ -1,86 +0,0 @@ -package zio.interop - -import _root_.monix.eval -import _root_.monix.execution.Scheduler -import zio.interop.monix._ -import zio.test.Assertion._ -import zio.test._ -import zio._ - -object MonixSpec extends DefaultRunnableSpec { - - implicit val scheduler: Scheduler = Scheduler(runner.platform.executor.asEC) - - def spec = - suite("MonixSpec")( - suite("IO.fromTask")( - testM("return an `IO` that fails if `Task` failed.") { - val error = new Exception - val task = eval.Task.raiseError[Int](error) - val io = IO.fromTask(task).unit - - assertM(io.either)(isLeft(equalTo(error))) - }, - testM("return an `IO` that produces the value from `Task`.") { - val value = 10 - val task = eval.Task(value) - val io = IO.fromTask(task)(scheduler) - - assertM(io.either)(isRight(equalTo(value))) - } - ), - suite("IO.toTask")( - testM("produce a successful `IO` of `Task`.") { - val task = IO.fail(new Exception).toTask - assertM(task.map(_.isInstanceOf[eval.Task[Unit]]))(isTrue) - }, - testM("returns a `Task` that fails if `IO` fails.") { - val error = new Exception - val task = IO.fail(error).toTask - - assertM(task)(equalTo(eval.Task.raiseError(error))) - }, - testM("returns a `Task` that produces the value from `IO`.") { - for { - value <- UIO(10) - task <- UIO(value).toTask - result <- ZIO.fromFuture(_ => task.runToFuture) - } yield assert(result)(equalTo(value)) - } - ), - suite("IO.fromCoeval")( - testM("return an `IO` that fails if `Coeval` failed") { - val error = new Exception - val coeval = eval.Coeval.raiseError[Int](error) - val io = IO.fromCoeval(coeval).unit - - assertM(io.either)(isLeft(equalTo(error))) - }, - testM("return an `IO` that produces the value from `Coeval`.") { - val value = 10 - val coeval = eval.Coeval(value) - val io = IO.fromCoeval(coeval) - - assertM(io.either)(isRight(equalTo(value))) - } - ), - suite("IO.toCoeval")( - testM("produce a successful `IO` of `Coeval`") { - val task = IO.fail(new Exception).toCoeval - assertM(task.map(_.isInstanceOf[eval.Coeval[Unit]]))(isTrue) - }, - testM("returns a `Coeval` that fails if `IO` fails.") { - val error = new Exception - val coeval = IO.fail(error).toCoeval - - assertM(coeval)(equalTo(eval.Coeval.raiseError(error))) - }, - testM("returns a `Coeval` that produces the value from `IO`.") { - val value = 10 - val coeval = UIO(value).toCoeval.map(_.runTry()) - - assertM(coeval)(isSuccess(equalTo(value))) - } - ) - ) -} diff --git a/interop-monix/shared/src/test/scala/zio/interop/monix/MonixTaskSpec.scala b/interop-monix/shared/src/test/scala/zio/interop/monix/MonixTaskSpec.scala new file mode 100644 index 0000000..0198ef5 --- /dev/null +++ b/interop-monix/shared/src/test/scala/zio/interop/monix/MonixTaskSpec.scala @@ -0,0 +1,162 @@ +package zio +package interop.monix + +import duration._ + +import test._ +import Assertion._ +import TestAspect._ + +import _root_.monix.eval.{ Task => MTask } + +object MonixTaskSpec extends DefaultRunnableSpec { + + override def spec = + suite("MonixTaskSpec")( + suite("ZIO.fromMonixTask")( + testM("return a ZIO that fails if Monix task failed") { + val error = new Exception("monix task failed") + val monixTask = MTask.raiseError(error) + val io = ZIO.fromMonixTask(monixTask) + + assertM(io.either)(isLeft(equalTo(error))) + }, + testM("return a ZIO that succeeds if Monix task succeeded") { + val result = "monix task result" + val monixTask = MTask.now(result) + val io = ZIO.fromMonixTask(monixTask) + + assertM(io)(equalTo(result)) + }, + testM("converts a synchronous Monix task to a ZIO task") { + @volatile var testVar = 0 + val monixTask = MTask.eval { + val old = testVar + testVar = old + 1 + old + } + val io = for { + orig <- ZIO.fromMonixTask(monixTask) + current <- UIO(testVar) + } yield ((orig, current)) + assertM(io)(equalTo((0, 1))) + }, + testM("converts an asynchronous Monix task to a ZIO task") { + @volatile var testVar = 0 + val monixTask = MTask.async[Int] { cb => + val old = testVar + testVar = old + 1 + cb.onSuccess(old) + } + val io = for { + orig <- ZIO.fromMonixTask(monixTask) + current <- UIO(testVar) + } yield ((orig, current)) + assertM(io)(equalTo((0, 1))) + }, + testM("the ZIO task fails if a Monix async task fails") { + val error = new Exception("async monix failure") + val monixTask = MTask.async[Int] { cb => + cb.onError(error) + } + val io = ZIO.fromMonixTask(monixTask) + assertM(io.either)(isLeft(equalTo(error))) + }, + testM("lazily executes a converted Monix task") { + @volatile var executed = false + val monixTask = MTask.eval { + executed = true + } + val test = ZIO.effectTotal { + executed = false + } *> { + ZIO.fromMonixTask(monixTask) + UIO(executed) + } + assertM(test)(isFalse) + }, + testM("propagates cancellation from ZIO to Monix") { + @volatile var cancelled = false + @volatile var running = false + val monixTask = MTask.eval { + running = true + } >> MTask.never.doOnCancel { + MTask.eval { + cancelled = true + } + } + for { + _ <- ZIO.effectTotal { + cancelled = false + running = false + } + fiber <- ZIO.fromMonixTask(monixTask).fork + // wait until the monix task is running before interrupting the fiber + _ <- UIO(running).repeatUntil(Predef.identity) + exit <- fiber.interrupt + wasCancelled <- UIO(cancelled) + } yield assertTrue(wasCancelled) && assertTrue(exit.interrupted) + } @@ nonFlaky + ), + suite("ZIO#toMonixTask")( + testM("converts a successful ZIO task to a Monix task") { + @volatile var testVar = 0 + val io = ZIO.effectTotal { + val old = testVar + testVar = old + 1 + old + } + for { + monixTask <- io.toMonixTask + result <- ZIO.fromMonixTask(monixTask) + current <- ZIO.succeed(testVar) + } yield assertTrue(result == 0) && assertTrue(current == result + 1) + }, + testM("converts a failed ZIO task to a failed Monix task") { + val error = new Exception("ZIO operation failed") + val io = ZIO.fail(error) + val test = io.toMonixTask.flatMap[Any, Throwable, Nothing](ZIO.fromMonixTask(_)).either + assertM(test)(isLeft(equalTo(error))) + }, + testM("propagates cancellation from Monix to ZIO") { + @volatile var cancelled = false + @volatile var running = false + val io = ZIO.effectTotal { + running = true + } *> ZIO.never.onInterrupt { + ZIO.effectTotal { + cancelled = true + } + } + val test = for { + monixTask <- io.toMonixTask + _ <- ZIO.fromMonixTask { + for { + fiber <- monixTask.start + _ <- MTask.eval(running).restartUntil(Predef.identity) + _ <- fiber.cancel + } yield () + } + // Monix `fiber.cancel` doesn't wait for all the cancellation effects + // to complete, so `cancelled` might not be true at first + wasCancelled <- UIO(cancelled).repeatUntil(Predef.identity) + } yield wasCancelled + assertM(test)(isTrue) + } @@ jvmOnly @@ timeout(5.seconds), + testM("only executes the ZIO effect if the Monix task is executed") { + @volatile var executed: Boolean = false + val io = ZIO.effectTotal { + executed = true + } + val test = for { + _ <- ZIO.effectTotal { + executed = false + } + _ <- io.toMonixTask + result <- UIO(executed) + } yield result + assertM(test)(isFalse) + } + ) + ) +}