From d0685a7c226183d87f1ceb2d5965d5bab256d97b Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 10 Oct 2024 13:21:31 +0100 Subject: [PATCH] Pubsub source v2 request zero messages when blocked --- .../v2/PubsubSourceConfigV2.scala | 5 +--- .../v2/PubsubSourceV2.scala | 29 +++++-------------- 2 files changed, 8 insertions(+), 26 deletions(-) diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala index 700e61a3..2bade5e9 100644 --- a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala +++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceConfigV2.scala @@ -24,10 +24,7 @@ case class PubsubSourceConfigV2( minRemainingDeadline: Double, gcpUserAgent: GcpUserAgent, maxPullsPerTransportChannel: Int, - progressTimeout: FiniteDuration, - modackOnProgressTimeout: Boolean, - cancelOnProgressTimeout: Boolean, - consistentClientId: Boolean + progressTimeout: FiniteDuration ) object PubsubSourceConfigV2 { diff --git a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala index 2adef3cc..10ac0403 100644 --- a/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala +++ b/modules/gcp/src/main/scala/common-streams-extensions/v2/PubsubSourceV2.scala @@ -34,7 +34,7 @@ import com.snowplowanalytics.snowplow.pubsub.GcpUserAgent import com.snowplowanalytics.snowplow.sources.SourceAndAck import com.snowplowanalytics.snowplow.sources.internal.{Checkpointer, LowLevelEvents, LowLevelSource} -import scala.concurrent.duration.{Duration, DurationDouble, FiniteDuration} +import scala.concurrent.duration.{DurationDouble, FiniteDuration} import scala.jdk.CollectionConverters._ import java.util.concurrent.{ExecutorService, Executors, LinkedBlockingQueue} @@ -92,7 +92,7 @@ object PubsubSourceV2 { (hotswap, _) <- Stream.resource(Hotswap(resource)) fs2Queue <- Stream.eval(Queue.synchronous[F, SubscriberAction]) _ <- extendDeadlines(config, stub, refStates, channelAffinity).spawn - _ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue, stub, channelAffinity)).repeat.spawn + _ <- Stream.eval(queueToQueue(config, jQueue, fs2Queue)).repeat.spawn lle <- Stream .fromQueueUnterminated(fs2Queue) .through(toLowLevelEvents(config, refStates, hotswap, resource, channelAffinity)) @@ -102,27 +102,12 @@ object PubsubSourceV2 { private def queueToQueue[F[_]: Async]( config: PubsubSourceConfigV2, jQueue: LinkedBlockingQueue[SubscriberAction], - fs2Queue: QueueSink[F, SubscriberAction], - stub: SubscriberStub, - channelAffinity: Int + fs2Queue: QueueSink[F, SubscriberAction] ): F[Unit] = resolveNextAction(jQueue).flatMap { - case action @ SubscriberAction.ProcessRecords(records, controller, _) => - val fallback = if (config.modackOnProgressTimeout) { - val ackIds = records.map(_.getAckId) - if (config.cancelOnProgressTimeout) - Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *> - Sync[F].delay(controller.cancel()) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity) - else - Logger[F].debug(s"Nacking on Pubsub channel $channelAffinity for not making progress") *> - Sync[F].delay(controller.request(1)) *> Utils.modAck(config.subscription, stub, ackIds, Duration.Zero, channelAffinity) - } else { - if (config.cancelOnProgressTimeout) - Logger[F].debug(s"Cancelling Pubsub channel $channelAffinity for not making progress") *> - Sync[F].delay(controller.cancel()) *> fs2Queue.offer(action) - else - fs2Queue.offer(action) - } + case action @ SubscriberAction.ProcessRecords(_, controller, _) => + def fallback: F[Unit] = + Sync[F].delay(controller.request(0)) >> fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback) fs2Queue.offer(action).timeoutTo(config.progressTimeout, fallback) case action: SubscriberAction.SubscriberError => fs2Queue.offer(action) @@ -314,7 +299,7 @@ object PubsubSourceV2 { val request = StreamingPullRequest.newBuilder .setSubscription(config.subscription.show) .setStreamAckDeadlineSeconds(config.durationPerAckExtension.toSeconds.toInt) - .setClientId(if (config.consistentClientId) clientId.toString else UUID.randomUUID.toString) + .setClientId(clientId.toString) .setMaxOutstandingMessages(0) .setMaxOutstandingBytes(0) .build