From cbfd902b77ccdf77482981aceb28d3ebabafdb49 Mon Sep 17 00:00:00 2001 From: Jordan Bull Date: Wed, 16 Jun 2021 16:27:23 -0700 Subject: [PATCH] Sink Connector should publish all outstanding messages on flush --- .../java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java | 2 ++ .../com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java | 3 +++ 2 files changed, 5 insertions(+) diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java index a8cddec3..41f73e5c 100644 --- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java +++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java @@ -334,6 +334,8 @@ private ByteString handleValue(Schema schema, Object value, Map @Override public void flush(Map partitionOffsets) { + // Publish the incomplete batch now instead of waiting for the maxDelayThresholdMs + publisher.publishAllOutstanding(); log.debug("Flushing..."); // Process results of all the outstanding futures specified by each TopicPartition. for (Map.Entry partitionOffset : diff --git a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java index c8679846..8d20c727 100644 --- a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java +++ b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java @@ -354,6 +354,7 @@ public void testFlushWithNoPublishInPut() throws Exception { when(publisher.publish(any(PubsubMessage.class))).thenReturn(goodFuture); task.put(records); task.flush(partitionOffsets); + verify(publisher, times(1)).publishAllOutstanding(); verify(publisher, times(2)).publish(any(PubsubMessage.class)); verify(goodFuture, times(2)).addListener(any(Runnable.class), any(Executor.class)); } @@ -372,6 +373,7 @@ public void testFlushExceptionCase() throws Exception { when(publisher.publish(any(PubsubMessage.class))).thenReturn(badFuture); task.put(records); task.flush(partitionOffsets); + verify(publisher, times(1)).publishAllOutstanding(); verify(publisher, times(1)).publish(any(PubsubMessage.class)); verify(badFuture, times(1)).addListener(any(Runnable.class), any(Executor.class)); } @@ -602,6 +604,7 @@ public void testFlushExceptionThenNoExceptionCase() throws Exception { records = getSampleRecords(); task.put(records); task.flush(partitionOffsets); + verify(publisher, times(2)).publishAllOutstanding(); verify(publisher, times(4)).publish(any(PubsubMessage.class)); verify(badFuture, times(2)).addListener(any(Runnable.class), any(Executor.class)); verify(goodFuture, times(2)).addListener(any(Runnable.class), any(Executor.class));