diff --git a/src/main/scala/zio/interop/reactivestreams/Adapters.scala b/src/main/scala/zio/interop/reactivestreams/Adapters.scala index 6eb22ef..7860f9c 100644 --- a/src/main/scala/zio/interop/reactivestreams/Adapters.scala +++ b/src/main/scala/zio/interop/reactivestreams/Adapters.scala @@ -41,8 +41,8 @@ object Adapters { error <- Promise.makeManaged[E, Nothing] subscription = new DemandTrackingSubscription(sub) _ <- ZManaged.succeed(sub.onSubscribe(subscription)) - _ <- error.await.catchAll(t => UIO(sub.onError(t))).toManaged.fork - } yield (error.fail(_).unit, demandUnfoldSink(sub, subscription)) + fiber <- error.await.catchAll(t => UIO(sub.onError(t))).toManaged.fork + } yield (error.fail(_) *> fiber.join, demandUnfoldSink(sub, subscription)) } def publisherToStream[O]( diff --git a/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala b/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala index 95b73a6..04c0115 100644 --- a/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala +++ b/src/test/scala/zio/interop/reactivestreams/SubscriberToSinkSpec.scala @@ -2,14 +2,11 @@ package zio.interop.reactivestreams import org.reactivestreams.tck.TestEnvironment import org.reactivestreams.tck.TestEnvironment.ManualSubscriberWithSubscriptionSupport -import zio.durationInt -import zio.IO -import zio.Task -import zio.UIO -import zio.stream.Stream -import zio.stream.ZStream +import zio.stream.{ Stream, ZStream } import zio.test.Assertion._ +import zio.test.TestAspect.nonFlaky import zio.test._ +import zio.{ IO, Task, UIO, durationInt } import scala.jdk.CollectionConverters._ @@ -58,6 +55,20 @@ object SubscriberToSinkSpec extends DefaultRunnableSpec { } ) }, + test("transports errors 3") { + makeSubscriber.flatMap { probe => + for { + fiber <- probe.underlying + .toSink[Throwable] + .use { case (signalError, sink) => + ZStream.fail(e).run(sink).catchAll(signalError) + } + .fork + _ <- fiber.join + err <- probe.expectError.exit + } yield assert(err)(succeeds(equalTo(e))) + } + } @@ nonFlaky(10), test("transports errors only once") { makeSubscriber.flatMap(probe => probe.underlying