From 8184636027726f76719395f65530c09a290b3120 Mon Sep 17 00:00:00 2001
From: Jordan Bull
Date: Mon, 27 Dec 2021 12:45:32 -0800
Subject: [PATCH 1/3] PubSubSinkTask.flush should flush tracking for all input
 partitions and only those partitions

---
 .../kafka/sink/CloudPubSubSinkTask.java     | 18 +++-
 .../kafka/sink/CloudPubSubSinkTaskTest.java | 72 ++++++++++++++++---
 2 files changed, 75 insertions(+), 15 deletions(-)

diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
index d3421241..ea25740e 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
@@ -347,6 +347,7 @@ private ByteString handleValue(Schema schema, Object value, Map<String, String>
   @Override
   public void flush(Map<TopicPartition, OffsetAndMetadata> partitionOffsets) {
     log.debug("Flushing...");
+    RuntimeException maybeException = null;
     // Process results of all the outstanding futures specified by each TopicPartition.
     for (Map.Entry<TopicPartition, OffsetAndMetadata> partitionOffset :
         partitionOffsets.entrySet()) {
@@ -356,20 +357,29 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> partitionOffsets) {
       if (outstandingFuturesForTopic == null) {
         continue;
       }
+      int partition = partitionOffset.getKey().partition();
       OutstandingFuturesForPartition outstandingFutures =
-          outstandingFuturesForTopic.get(partitionOffset.getKey().partition());
+          outstandingFuturesForTopic.get(partition);
       if (outstandingFutures == null) {
         continue;
       }
       try {
-        ApiFutures.allAsList(outstandingFutures.futures).get();
+        // Only wait for partition to complete if the clush hasn't already failed.
+        if (maybeException == null) {
+          ApiFutures.allAsList(outstandingFutures.futures).get();
+        }
       } catch (Exception e) {
-        throw new RuntimeException(e);
+        maybeException = new RuntimeException(e);
       } finally {
+        // Always clear tracking for flushed partitions
         outstandingFutures.futures.clear();
+        allOutstandingFutures.remove(partition);
       }
     }
-    allOutstandingFutures.clear();
+    if (maybeException != null) {
+      // If any partitions had an exceptional future, throw after all cleanup
+      throw maybeException;
+    }
   }
 
   /** Publish all the messages in a partition and store the Future's for each publish request. */
diff --git a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
index c8679846..89369782 100644
--- a/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
+++ b/kafka-connector/src/test/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTaskTest.java
@@ -16,6 +16,7 @@
 package com.google.pubsub.kafka.sink;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
 import static org.mockito.Mockito.mock;
@@ -25,6 +26,7 @@
 import static org.mockito.Mockito.when;
 
 import com.google.api.core.ApiFuture;
+import com.google.api.core.SettableApiFuture;
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.protobuf.ByteString;
 import com.google.pubsub.kafka.common.ConnectorUtils;
@@ -281,7 +283,7 @@ public void testNullSchema() {
   public void testPutWherePublishesAreInvoked() {
     props.put(CloudPubSubSinkConnector.MAX_BUFFER_SIZE_CONFIG, CPS_MIN_BATCH_SIZE1);
     task.start(props);
-    List<SinkRecord> records = getSampleRecords();
+    List<SinkRecord> records = getSampleRecords(0);
     task.put(records);
     ArgumentCaptor<PubsubMessage> captor = ArgumentCaptor.forClass(PubsubMessage.class);
     verify(publisher, times(2)).publish(captor.capture());
@@ -349,7 +351,7 @@ public void testFlushWithNoPublishInPut() throws Exception {
     task.start(props);
     Map<TopicPartition, OffsetAndMetadata> partitionOffsets = new HashMap<>();
     partitionOffsets.put(new TopicPartition(KAFKA_TOPIC, 0), null);
-    List<SinkRecord> records = getSampleRecords();
+    List<SinkRecord> records = getSampleRecords(0);
     ApiFuture<String> goodFuture = getSuccessfulPublishFuture();
     when(publisher.publish(any(PubsubMessage.class))).thenReturn(goodFuture);
     task.put(records);
@@ -362,18 +364,66 @@ public void testFlushWithNoPublishInPut() throws Exception {
    * Tests that if a Future that is being processed in flush() failed with an exception, that an
    * exception is thrown.
    */
-  @Test(expected = RuntimeException.class)
+  @Test
   public void testFlushExceptionCase() throws Exception {
     task.start(props);
     Map<TopicPartition, OffsetAndMetadata> partitionOffsets = new HashMap<>();
     partitionOffsets.put(new TopicPartition(KAFKA_TOPIC, 0), null);
-    List<SinkRecord> records = getSampleRecords();
+    List<SinkRecord> records = getSampleRecords(0);
     ApiFuture<String> badFuture = getFailedPublishFuture();
     when(publisher.publish(any(PubsubMessage.class))).thenReturn(badFuture);
     task.put(records);
+    assertThrows(RuntimeException.class, () -> task.flush(partitionOffsets));
+    verify(publisher, times(2)).publish(any(PubsubMessage.class));
+    verify(badFuture, times(2)).addListener(any(Runnable.class), any(Executor.class));
+  }
+
+  /**
+   * Tests that flush clears tracking of all partitions it is invoked for even during exceptions and
+   * propagates an exception.
+   */
+  @Test
+  public void testFlushMultiPartitionExceptionCase() throws Exception {
+    task.start(props);
+    Map<TopicPartition, OffsetAndMetadata> partitionOffsets = new HashMap<>();
+    partitionOffsets.put(new TopicPartition(KAFKA_TOPIC, 0), null);
+    partitionOffsets.put(new TopicPartition(KAFKA_TOPIC, 1), null);
+    List<SinkRecord> recordsP0 = getSampleRecords(0);
+    List<SinkRecord> recordsP1 = getSampleRecords(1);
+    when(publisher.publish(any(PubsubMessage.class))).thenReturn(getFailedPublishFuture());
+    task.put(recordsP0);
+    task.put(recordsP1);
+    assertThrows(RuntimeException.class, () -> task.flush(partitionOffsets));
+    verify(publisher, times(4)).publish(any(PubsubMessage.class));
+    // The second flush should not throw an exception since all tracking was cleared
     task.flush(partitionOffsets);
-    verify(publisher, times(1)).publish(any(PubsubMessage.class));
-    verify(badFuture, times(1)).addListener(any(Runnable.class), any(Executor.class));
+  }
+
+  /**
+   * Tests that flush clears tracking of only the partitions it is invoked for so it cannot be
+   * considered complete for a subsequent flush on another partition that may have failed.
+   */
+  @Test
+  public void testFlushOneOfMultiplePartitionsCase() throws Exception {
+    task.start(props);
+    Map<TopicPartition, OffsetAndMetadata> partitionOffsetsP0 = new HashMap<>();
+    partitionOffsetsP0.put(new TopicPartition(KAFKA_TOPIC, 0), null);
+    Map<TopicPartition, OffsetAndMetadata> partitionOffsetsP1 = new HashMap<>();
+    partitionOffsetsP1.put(new TopicPartition(KAFKA_TOPIC, 1), null);
+    List<SinkRecord> recordsP0 = getSampleRecords(0);
+    List<SinkRecord> recordsP1 = getSampleRecords(1);
+    when(publisher.publish(any(PubsubMessage.class)))
+        .thenReturn(getSuccessfulPublishFuture())
+        .thenReturn(getSuccessfulPublishFuture())
+        .thenReturn(getFailedPublishFuture())
+        .thenReturn(getFailedPublishFuture());
+    task.put(recordsP0);
+    task.put(recordsP1);
+    verify(publisher, times(4)).publish(any(PubsubMessage.class));
+
+    task.flush(partitionOffsetsP0);
+    // The second flush should not complete successfully
+    assertThrows(RuntimeException.class, () -> task.flush(partitionOffsetsP1));
   }
 
   /**
@@ -590,7 +640,7 @@ public void testFlushExceptionThenNoExceptionCase() throws Exception {
     task.start(props);
     Map<TopicPartition, OffsetAndMetadata> partitionOffsets = new HashMap<>();
     partitionOffsets.put(new TopicPartition(KAFKA_TOPIC, 0), null);
-    List<SinkRecord> records = getSampleRecords();
+    List<SinkRecord> records = getSampleRecords(0);
     ApiFuture<String> badFuture = getFailedPublishFuture();
     ApiFuture<String> goodFuture = getSuccessfulPublishFuture();
     when(publisher.publish(any(PubsubMessage.class))).thenReturn(badFuture).thenReturn(badFuture).thenReturn(goodFuture);
@@ -599,7 +649,7 @@ public void testFlushExceptionThenNoExceptionCase() throws Exception {
       task.flush(partitionOffsets);
     } catch (RuntimeException e) {
     }
-    records = getSampleRecords();
+    records = getSampleRecords(0);
     task.put(records);
     task.flush(partitionOffsets);
     verify(publisher, times(4)).publish(any(PubsubMessage.class));
@@ -620,12 +670,12 @@ public void testPublisherShutdownOnStop() throws Exception {
   }
 
   /** Get some sample SinkRecords's to use in the tests. */
-  private List<SinkRecord> getSampleRecords() {
+  private List<SinkRecord> getSampleRecords(int partition) {
     List<SinkRecord> records = new ArrayList<>();
     records.add(
         new SinkRecord(
             KAFKA_TOPIC,
-            0,
+            partition,
             STRING_SCHEMA,
             KAFKA_MESSAGE_KEY1,
             BYTE_STRING_SCHEMA,
@@ -634,7 +684,7 @@ private List<SinkRecord> getSampleRecords() {
     records.add(
         new SinkRecord(
             KAFKA_TOPIC,
-            0,
+            partition,
             STRING_SCHEMA,
             KAFKA_MESSAGE_KEY1,
             BYTE_STRING_SCHEMA,

From b45a8ec6f551106fb04320194ccc4dd65d56defa Mon Sep 17 00:00:00 2001
From: Jordan Bull
Date: Tue, 28 Dec 2021 10:31:37 -0800
Subject: [PATCH 2/3] Update CloudPubSubSinkTask.java

fix typo
---
 .../java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
index ea25740e..87505501 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
@@ -364,7 +364,7 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> partitionOffsets) {
         continue;
       }
       try {
-        // Only wait for partition to complete if the clush hasn't already failed.
+        // Only wait for partition to complete if the flush hasn't already failed.
         if (maybeException == null) {
           ApiFutures.allAsList(outstandingFutures.futures).get();
         }

From 7f2b5daa4d4d39407f3a89d5903dafde3ccb6fda Mon Sep 17 00:00:00 2001
From: Jordan Bull
Date: Thu, 6 Jan 2022 10:54:40 -0800
Subject: [PATCH 3/3] Update CloudPubSubSinkTask.java

Removal of partition map from allOutstandingFutures was not needed
---
 .../java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java | 1 -
 1 file changed, 1 deletion(-)

diff --git a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
index 87505501..fca44973 100644
--- a/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
+++ b/kafka-connector/src/main/java/com/google/pubsub/kafka/sink/CloudPubSubSinkTask.java
@@ -373,7 +373,6 @@ public void flush(Map<TopicPartition, OffsetAndMetadata> partitionOffsets) {
       } finally {
         // Always clear tracking for flushed partitions
         outstandingFutures.futures.clear();
-        allOutstandingFutures.remove(partition);
       }
     }
     if (maybeException != null) {
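
Note: the net effect of the three patches on flush() is easier to see outside diff form. The sketch below is a simplified, standalone model of that control flow, not the connector's actual code: CompletableFuture stands in for ApiFutures.allAsList, the topic/partition tracking map is flattened, and the names FlushModel, outstanding, and partitionsByTopic are invented for illustration. Only the ordering mirrors the series: wait per flushed partition unless a failure was already seen, record the first failure, always clear tracking for exactly the flushed partitions in finally, and rethrow only after the loop completes.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class FlushModel {
  // topic -> partition -> outstanding publish futures (stands in for allOutstandingFutures).
  private final Map<String, Map<Integer, List<CompletableFuture<String>>>> outstanding =
      new HashMap<>();

  public void flush(Map<String, List<Integer>> partitionsByTopic) {
    RuntimeException maybeException = null;
    for (Map.Entry<String, List<Integer>> entry : partitionsByTopic.entrySet()) {
      Map<Integer, List<CompletableFuture<String>>> byPartition = outstanding.get(entry.getKey());
      if (byPartition == null) {
        continue;
      }
      for (int partition : entry.getValue()) {
        List<CompletableFuture<String>> futures = byPartition.get(partition);
        if (futures == null) {
          continue;
        }
        try {
          // Only wait for the partition to complete if the flush hasn't already failed.
          if (maybeException == null) {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
          }
        } catch (Exception e) {
          // Remember the first failure but keep cleaning up the remaining partitions.
          maybeException = new RuntimeException(e);
        } finally {
          // Always clear tracking for the flushed partitions, and only for those, so an
          // unflushed partition's failure is still observed by a later flush.
          futures.clear();
        }
      }
    }
    if (maybeException != null) {
      // Throw only after every flushed partition's tracking has been cleared.
      throw maybeException;
    }
  }
}

As exercised by the patched tests, a second flush over the same partitions after a failed one succeeds, while partitions left out of a flush keep their outstanding futures and still fail a subsequent flush.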