Skip to content

Commit

Permalink
Merge pull request #355 from matt-kwong/emu
Browse files Browse the repository at this point in the history
Add PubSubSource integration test using CPS emulator
  • Loading branch information
clmccart authored Mar 14, 2024
2 parents ba417fc + 2d917a1 commit cc44b48
Show file tree
Hide file tree
Showing 6 changed files with 346 additions and 1 deletion.
99 changes: 99 additions & 0 deletions flink-connector/flink-connector-gcp-pubsub-e2e-tests/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>com.google.pubsub.flink</groupId>
<artifactId>flink-connector-parent</artifactId>
<version>0.0.0</version>
</parent>

<artifactId>flink-connector-gcp-pubsub-e2e-tests</artifactId>
<name>Google Cloud Pub/Sub Connector E2E Tests</name>

<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>com.google.pubsub.flink</groupId>
<artifactId>flink-connector-gcp-pubsub</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-pubsub</artifactId>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-core</artifactId>
<exclusions>
<exclusion>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</exclusion>
<exclusion>
<groupId>com.google.http-client</groupId>
<artifactId>google-http-client</artifactId>
</exclusion>
</exclusions>
<!-- Version is pulled from google-cloud-bom (loaded via the libraries-bom) -->
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<!-- Don't publish the E2E tests -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-deploy-plugin</artifactId>
<version>3.1.0</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<groupId>org.sonatype.plugins</groupId>
<artifactId>nexus-staging-maven-plugin</artifactId>
<version>1.6.13</version>
<configuration>
<skipNexusStagingDeployMojo>true</skipNexusStagingDeployMojo>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.pubsub.flink;

import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.NotFoundException;
import com.google.api.gax.rpc.TransportChannel;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.SubscriptionAdminClient;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.PushConfig;
import com.google.pubsub.v1.Subscription;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannelBuilder;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Helper for local E2E testing against a Cloud Pub/Sub emulator. */
public final class PubSubEmulatorHelper {
private static final Logger LOG = LoggerFactory.getLogger(PubSubEmulatorHelper.class);

private static TransportChannel channel;
private static TransportChannelProvider channelProvider;

private static TopicAdminClient topicAdminClient;
private static SubscriptionAdminClient subscriptionAdminClient;

private PubSubEmulatorHelper() {}

public static String getEmulatorEndpoint() throws IllegalStateException {
String hostport = System.getenv("PUBSUB_EMULATOR_HOST");
if (hostport == null) {
throw new IllegalStateException("Environment variable PUBSUB_EMULATOR_HOST not set.");
}
return hostport;
}

public static TransportChannelProvider getTransportChannelProvider()
throws IllegalStateException {
if (channel == null) {
channel =
GrpcTransportChannel.create(
ManagedChannelBuilder.forTarget(getEmulatorEndpoint()).usePlaintext().build());
}
if (channelProvider == null) {
channelProvider = FixedTransportChannelProvider.create(channel);
}
return channelProvider;
}

public static CredentialsProvider getCredentialsProvider() {
return NoCredentialsProvider.create();
}

public static Topic createTopic(TopicName topic) throws IOException {
deleteTopic(topic);
LOG.info("CreateTopic {}", topic);
return getTopicAdminClient().createTopic(topic);
}

public static void deleteTopic(TopicName topic) throws IOException {
try {
getTopicAdminClient().getTopic(topic);
} catch (NotFoundException e) {
return;
}

LOG.info("DeleteTopic {}", topic);
getTopicAdminClient().deleteTopic(topic);
}

public static Subscription createSubscription(SubscriptionName subscription, TopicName topic)
throws IOException {
deleteSubscription(subscription);
LOG.info("CreateSubscription {} on topic {}", subscription, topic);
return getSubscriptionAdminClient()
.createSubscription(
subscription, topic, PushConfig.getDefaultInstance(), /* ackDeadlineSeconds= */ 10);
}

public static void deleteSubscription(SubscriptionName subscription) throws IOException {
try {
getSubscriptionAdminClient().getSubscription(subscription);
} catch (NotFoundException e) {
return;
}

LOG.info("DeleteSubscription {}", subscription);
getSubscriptionAdminClient().deleteSubscription(subscription);
}

public static void publishMessages(TopicName topic, List<String> payloads)
throws ExecutionException, InterruptedException, IOException {
Publisher publisher =
Publisher.newBuilder(topic)
.setChannelProvider(getTransportChannelProvider())
.setCredentialsProvider(getCredentialsProvider())
.build();
for (final String payload : payloads) {
publisher
.publish(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build())
.get();
}
}

private static TopicAdminClient getTopicAdminClient() throws IOException {
if (topicAdminClient == null) {
topicAdminClient =
TopicAdminClient.create(
TopicAdminSettings.newBuilder()
.setTransportChannelProvider(getTransportChannelProvider())
.setCredentialsProvider(getCredentialsProvider())
.build());
}
return topicAdminClient;
}

private static SubscriptionAdminClient getSubscriptionAdminClient() throws IOException {
if (subscriptionAdminClient == null) {
subscriptionAdminClient =
SubscriptionAdminClient.create(
SubscriptionAdminSettings.newBuilder()
.setTransportChannelProvider(getTransportChannelProvider())
.setCredentialsProvider(getCredentialsProvider())
.build());
}
return subscriptionAdminClient;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright 2023 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.pubsub.flink;

import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
import org.junit.Test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/** E2E test for PubSubSource using a local Pub/Sub emulator. */
public class PubSubSourceEmulatorTest extends TestLogger {
private final TopicName topic = TopicName.of("test-project", "test-topic");
private final SubscriptionName subscription =
SubscriptionName.of("test-project", "test-subscription");

@Test
public void testPubSubSource() throws Exception {
PubSubEmulatorHelper.createTopic(topic);
PubSubEmulatorHelper.createSubscription(subscription, topic);
List<String> messageInput = Arrays.asList("msg-1", "msg-2", "msg-3", "msg-4", "msg-5");
PubSubEmulatorHelper.publishMessages(topic, messageInput);

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(100);
env.setParallelism(2);

DataStream<String> stream =
env.fromSource(
PubSubSource.<String>builder()
.setDeserializationSchema(
PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
.setProjectName(subscription.getProject())
.setSubscriptionName(subscription.getSubscription())
.build(),
WatermarkStrategy.noWatermarks(),
"PubSubEmulatorSource");
CloseableIterator<String> iterator = stream.executeAndCollect();
List<String> messageOutput = new ArrayList<>();
while (messageOutput.size() < messageInput.size() && iterator.hasNext()) {
messageOutput.add(iterator.next());
}
iterator.close();

assertEquals(
"Did not receive expected number of messages.", messageInput.size(), messageOutput.size());
for (final String msg : messageInput) {
assertTrue("Missing " + msg, messageOutput.contains(msg));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.auth.Credentials;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsub.v1.MessageReceiver;
Expand All @@ -34,6 +37,7 @@
import com.google.pubsub.flink.internal.source.split.SubscriptionSplitSerializer;
import com.google.pubsub.flink.proto.PubSubEnumeratorCheckpoint;
import com.google.pubsub.v1.ProjectSubscriptionName;
import io.grpc.ManagedChannelBuilder;
import java.util.HashMap;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -82,6 +86,16 @@ Subscriber createSubscriber(MessageReceiver receiver) {
if (credentials().isPresent()) {
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials().get()));
}

// Assume we should connect to the Pub/Sub emulator if PUBSUB_EMULATOR_HOST is set.
String emulatorEndpoint = System.getenv("PUBSUB_EMULATOR_HOST");
if (emulatorEndpoint != null) {
builder.setCredentialsProvider(NoCredentialsProvider.create());
builder.setChannelProvider(
FixedTransportChannelProvider.create(
GrpcTransportChannel.create(
ManagedChannelBuilder.forTarget(emulatorEndpoint).usePlaintext().build())));
}
return builder.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public synchronized Optional<PubsubMessage> pullMessage() throws Throwable {
if (permanentError.isPresent()) {
throw permanentError.get();
}
if (messages.size() == 0) {
if (messages.isEmpty()) {
return Optional.absent();
}
return Optional.of(messages.pop());
Expand Down
1 change: 1 addition & 0 deletions flink-connector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
<modules>
<module>flink-connector-gcp-pubsub</module>
<module>flink-examples-gcp-pubsub</module>
<module>flink-connector-gcp-pubsub-e2e-tests</module>
</modules>
<properties>
<flink.version>1.17.0</flink.version>
Expand Down

0 comments on commit cc44b48

Please sign in to comment.