Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSubSinkTask.flush should flush tracking for all input partitions and only those partitions #306

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

jordanbull
Copy link
Contributor

The existing behavior will only clear tracking for the first partition that throws an exception in flush. In cases where the partitions complete successfully, it will also clear tracking for partitions that it was not asked to flush. Both of these are not the expected behavior when implementing the interface. This pull requests fixes these two behaviors and adds tests that catch them in the old code and pass with the changes.

ApiFuture<String> badFuture = getFailedPublishFuture();
when(publisher.publish(any(PubsubMessage.class))).thenReturn(badFuture);
task.put(records);
assertThrows(RuntimeException.class, () -> task.flush(partitionOffsets));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use assertThrows instead of expected so the following verifies are actually reached.

Removal of partition map from allOutstandingFutures was not needed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants