Skip to content

Commit

Permalink
Skip closing streaming pull RPC
Browse files Browse the repository at this point in the history
  • Loading branch information
istreeter committed Oct 13, 2024
1 parent 4262696 commit a1c1e21
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,13 @@ private class SourceCoordinator[F[_]: Async, A] private (
Sync[F].pure(Status.AwaitingConsumer(controller, buffered, lastFetch) -> nextTimeout)
}
}
case Status.AwaitingConsumer(controller, buffered, _) =>
case Status.AwaitingConsumer(_, buffered, _) =>
for {
_ <-
Logger[F].info(
s"Dropping ${buffered.size} buffered batches from Streaming Pull $channelAffinity. Exceeded ${config.prefetchMax} pre-fetches while attempting to keep the stream alive."
)
_ <- Sync[F].delay(controller.cancel())
_ <- streamManager.stop
now <- Sync[F].realTime
// This does IO, which is not ideal inside a `evalModify` block,
// but on balance it is ok here because we are handling an edge-case error scenario
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import com.google.api.gax.grpc.GrpcCallContext
import com.google.api.gax.rpc.{ResponseObserver, StreamController}
import com.google.pubsub.v1.{StreamingPullRequest, StreamingPullResponse}
import com.google.cloud.pubsub.v1.stub.SubscriberStub
import io.grpc.Status
//import io.grpc.Status

import scala.jdk.CollectionConverters._

Expand Down Expand Up @@ -95,8 +95,9 @@ private object StreamManager {
.build

Resource
.make(Sync[F].delay(subStub.streamingPullCallable.splitCall(observer, context))) { stream =>
Sync[F].delay(stream.closeSendWithError(Status.CANCELLED.asException))
.make(Sync[F].delay(subStub.streamingPullCallable.splitCall(observer, context))) { _ =>
// Sync[F].delay(stream.closeSendWithError(Status.CANCELLED.asException))
Sync[F].unit
}
.evalMap { stream =>
Sync[F].delay(stream.send(request))
Expand Down

0 comments on commit a1c1e21

Please sign in to comment.