Skip to content

Commit

Permalink
Merge pull request #356 from matt-kwong/flow
Browse files Browse the repository at this point in the history
Add flow control setting to source builder
  • Loading branch information
hannahrogers-google authored Mar 8, 2024
2 parents fa56435 + eb78c59 commit ba417fc
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/
package com.google.pubsub.flink;

import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.pubsub.flink.internal.source.enumerator.PubSubCheckpointSerializer;
import com.google.pubsub.flink.internal.source.enumerator.PubSubSplitEnumerator;
import com.google.pubsub.flink.internal.source.reader.AckTracker;
Expand Down Expand Up @@ -57,6 +59,10 @@ public abstract class PubSubSource<OutputT>

public abstract PubSubDeserializationSchema<OutputT> deserializationSchema();

public abstract Optional<Long> maxOutstandingMessagesCount();

public abstract Optional<Long> maxOutstandingMessagesBytes();

public abstract Optional<Credentials> credentials();

public static <OutputT> Builder<OutputT> builder() {
Expand All @@ -67,6 +73,12 @@ Subscriber createSubscriber(MessageReceiver receiver) {
Subscriber.Builder builder =
Subscriber.newBuilder(
ProjectSubscriptionName.of(projectName(), subscriptionName()).toString(), receiver);
builder.setFlowControlSettings(
FlowControlSettings.newBuilder()
.setMaxOutstandingElementCount(maxOutstandingMessagesCount().or(1000L))
.setMaxOutstandingRequestBytes(
maxOutstandingMessagesBytes().or(100L * 1024L * 1024L)) // 100MB
.build());
if (credentials().isPresent()) {
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials().get()));
}
Expand Down Expand Up @@ -151,8 +163,23 @@ public abstract static class Builder<OutputT> {
public abstract Builder<OutputT> setDeserializationSchema(
PubSubDeserializationSchema<OutputT> deserializationSchema);

public abstract Builder<OutputT> setMaxOutstandingMessagesCount(Long count);

public abstract Builder<OutputT> setMaxOutstandingMessagesBytes(Long bytes);

public abstract Builder<OutputT> setCredentials(Credentials credentials);

public abstract PubSubSource<OutputT> build();
abstract PubSubSource<OutputT> autoBuild();

public final PubSubSource<OutputT> build() {
PubSubSource<OutputT> source = autoBuild();
Preconditions.checkArgument(
source.maxOutstandingMessagesCount().or(1L) > 0,
"maxOutstandingMessagesCount, if set, must be a value greater than 0.");
Preconditions.checkArgument(
source.maxOutstandingMessagesBytes().or(1L) > 0,
"maxOutstandingMessagesBytes, if set, must be a value greater than 0.");
return source;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,44 @@ public void build_invalidSchema() throws Exception {
assertThrows(IllegalStateException.class, builder::build);
}

@Test
public void build_nullMaxOutstandingMessagesCountThrows() throws Exception {
assertThrows(
NullPointerException.class,
() -> PubSubSource.<String>builder().setMaxOutstandingMessagesCount(null));
}

@Test
public void build_negativeMaxOutstandingMessagesCountThrows() throws Exception {
PubSubSource.Builder<String> builder =
PubSubSource.<String>builder()
.setProjectName("project")
.setSubscriptionName("subscription")
.setDeserializationSchema(
PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
.setMaxOutstandingMessagesCount(-1L);
assertThrows(IllegalArgumentException.class, builder::build);
}

@Test
public void build_nullMaxOutstandingMessagesBytesThrows() throws Exception {
assertThrows(
NullPointerException.class,
() -> PubSubSource.<String>builder().setMaxOutstandingMessagesBytes(null));
}

@Test
public void build_negativeMaxOutstandingMessagesBytesThrows() throws Exception {
PubSubSource.Builder<String> builder =
PubSubSource.<String>builder()
.setProjectName("project")
.setSubscriptionName("subscription")
.setDeserializationSchema(
PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
.setMaxOutstandingMessagesBytes(-1L);
assertThrows(IllegalArgumentException.class, builder::build);
}

@Test
public void build_invalidCreds() throws Exception {
assertThrows(
Expand Down

0 comments on commit ba417fc

Please sign in to comment.