diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/SourceCoordinator.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/SourceCoordinator.scala index 7d63ddb3..f30bca8e 100644 --- a/modules/gcp/src/main/scala/common-streams-extensions/v2/SourceCoordinator.scala +++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/SourceCoordinator.scala @@ -182,13 +182,13 @@ private class SourceCoordinator[F[_]: Async, A] private ( Sync[F].pure(Status.AwaitingConsumer(controller, buffered, lastFetch) -> nextTimeout) } } - case Status.AwaitingConsumer(controller, buffered, _) => + case Status.AwaitingConsumer(_, buffered, _) => for { _ <- Logger[F].info( s"Dropping ${buffered.size} buffered batches from Streaming Pull $channelAffinity. Exceeded ${config.prefetchMax} pre-fetches while attempting to keep the stream alive." ) - _ <- Sync[F].delay(controller.cancel()) + _ <- streamManager.stop now <- Sync[F].realTime // This does IO, which is not ideal inside a `evalModify` block, // but on balance it is ok here because we are handling an edge-case error scenario diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/StreamManager.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/StreamManager.scala index f955050b..b4cad178 100644 --- a/modules/gcp/src/main/scala/common-streams-extensions/v2/StreamManager.scala +++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/StreamManager.scala @@ -17,7 +17,7 @@ import com.google.api.gax.grpc.GrpcCallContext import com.google.api.gax.rpc.{ResponseObserver, StreamController} import com.google.pubsub.v1.{StreamingPullRequest, StreamingPullResponse} import com.google.cloud.pubsub.v1.stub.SubscriberStub -import io.grpc.Status +//import io.grpc.Status import scala.jdk.CollectionConverters._ @@ -95,8 +95,9 @@ private object StreamManager { .build Resource - .make(Sync[F].delay(subStub.streamingPullCallable.splitCall(observer, context))) { stream => - Sync[F].delay(stream.closeSendWithError(Status.CANCELLED.asException)) + .make(Sync[F].delay(subStub.streamingPullCallable.splitCall(observer, context))) { _ => + // Sync[F].delay(stream.closeSendWithError(Status.CANCELLED.asException)) + Sync[F].unit } .evalMap { stream => Sync[F].delay(stream.send(request))