From a32f0be3ee8742d895fb2b51c8d1242165ed30f1 Mon Sep 17 00:00:00 2001 From: Matt Kwong Date: Thu, 22 Feb 2024 18:49:06 +0000 Subject: [PATCH 1/3] Add initial documentation for Flink connector --- README.md | 4 + flink-connector/README.md | 51 ++++++ .../docs/connectors/datastream/pubsub.md | 158 ++++++++++++++++++ 3 files changed, 213 insertions(+) create mode 100644 flink-connector/README.md create mode 100644 flink-connector/docs/content/docs/connectors/datastream/pubsub.md diff --git a/README.md b/README.md index d9e57531..240f8f6e 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,10 @@ available are: If you're having a problem building with those versions, please reach out to us with your issue or solution. * [Ordering Keys Prober](https://github.com/GoogleCloudPlatform/pubsub/tree/master/ordering-keys-prober): A reference implementation for how to use ordering keys effectively. +* UNDER DEVELOPMENT - + [Flink Connector](https://github.com/GoogleCloudPlatform/pubsub/tree/master/flink-connector): + Send and receive messages from [Apache Flink](https://flink.apache.org/). + The connector is currently available for experimental usage. * DEPRECATED - [Kafka Connector](https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector): Send and receive messages from [Apache Kafka](http://kafka.apache.org). *The connector will have future release from its own [repo](https://github.com/googleapis/java-pubsub-group-kafka-connector/).* diff --git a/flink-connector/README.md b/flink-connector/README.md new file mode 100644 index 00000000..605bf1b9 --- /dev/null +++ b/flink-connector/README.md @@ -0,0 +1,51 @@ +# Apache Flink Google Cloud Pub/Sub Connector (Under Development) + +The official Apache Flink Google Cloud Pub/Sub connector is located at +[https://github.com/apache/flink-connector-gcp-pubsub](https://github.com/apache/flink-connector-gcp-pubsub). + +This repository contains an **unofficial** connector that is **under +development** by the owners of Google Cloud Pub/Sub. The connector is available +to preview, but there are currently no performance or stability guarantees. + +Some motivations behind creating this connector: + +* Create a Google Cloud Pub/Sub source that implements the Source interface + introduced in + [FLIP-27](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface) +* Leverage + [StreamingPull API](https://cloud.google.com/pubsub/docs/pull#streamingpull_api) + to achieve high throughput and low latency in this connector source +* Add support for automatic + [message lease extensions](https://cloud.google.com/pubsub/docs/lease-management) + to enable setting longer checkpointing intervals + +## Apache Flink + +Apache Flink is an open source stream processing framework with powerful stream- +and batch-processing capabilities. + +Learn more about Flink at [https://flink.apache.org/](https://flink.apache.org/) + +## Building from Source + +Prerequisites: + +* Unix-like environment (we use Linux, Mac OS X) +* Git +* Maven (we recommend version 3.8.6) +* Java 11 + +``` +git clone https://github.com/GoogleCloudPlatform/pubsub.git +cd pubsub/flink-connector +mvn clean package -DskipTests +``` + +The resulting jars can be found in the `target` directory of the respective +module. + +## Documentation + +The documentation of Apache Flink is located on the website: +[https://flink.apache.org](https://flink.apache.org) or in the `docs/` +directory. 
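+
+## Quick Example
+
+A minimal sketch of reading from a subscription and printing each message. The
+builder calls mirror the connector documentation; the `fromSource` wiring
+assumes the FLIP-27 `Source` interface that this connector implements, and all
+names are placeholders.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// Pub/Sub source requires checkpointing; messages are acknowledged when a
+// checkpoint completes.
+env.enableCheckpointing(10_000L);
+
+env.fromSource(
+        PubSubSource.builder()
+            .setDeserializationSchema(
+                PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
+            .setProjectName("my-project-name")
+            .setSubscriptionName("my-subscription-name")
+            .build(),
+        WatermarkStrategy.noWatermarks(),
+        "pubsub-source")
+    .print();
+env.execute("pubsub-quickstart");
+```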
diff --git a/flink-connector/docs/content/docs/connectors/datastream/pubsub.md b/flink-connector/docs/content/docs/connectors/datastream/pubsub.md
new file mode 100644
index 00000000..4d32d540
--- /dev/null
+++ b/flink-connector/docs/content/docs/connectors/datastream/pubsub.md
@@ -0,0 +1,158 @@
+# Google Cloud Pub/Sub Connector
+
+This connector provides access to reading data from and writing data to
+[Google Cloud Pub/Sub](https://cloud.google.com/pubsub).
+
+## Pub/Sub Source
+
+Pub/Sub source streams data from a single Google Cloud Pub/Sub subscription. The
+sample below shows the minimal configurations required to build Pub/Sub source.
+
+```java
+PubSubSource.builder()
+    .setDeserializationSchema(PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
+    .setProjectName("my-project-name")
+    .setSubscriptionName("my-subscription-name")
+    .build()
+```
+
+### Subscription
+
+Pub/Sub source only supports streaming messages from
+[pull subscriptions](https://cloud.google.com/pubsub/docs/pull). Push
+subscriptions are not supported.
+
+Pub/Sub source can create parallel readers to the same subscription. Since
+Google Cloud Pub/Sub has no notion of subscription partitions or splits, a
+message can be received by any reader. Google Cloud Pub/Sub automatically load
+balances message delivery across readers.
+
+### Deserialization Schema
+
+`PubSubDeserializationSchema` is required to define how
+[`PubsubMessage`](https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage)
+is deserialized into an output that is suitable for processing.
+
+For convenience, `PubSubDeserializationSchema.dataOnly` can be used if only the
+field `PubsubMessage.data` is required for processing. A sketch of a custom
+schema appears later on this page.
+
+### Boundedness
+
+Pub/Sub source streams unbounded data, only stopping when a Flink job stops or
+fails.
+
+### Checkpointing
+
+Checkpointing is required to use Pub/Sub source. When a checkpoint completes,
+all messages delivered before the last successful checkpoint are acknowledged to
+Google Cloud Pub/Sub. In case of failure, messages delivered after the last
+successful checkpoint are unacknowledged and will automatically be redelivered.
+Note that there is no message delivery state stored in checkpoints, so retained
+checkpoints are not necessary to resume using Pub/Sub source.
+
+### Flow Control
+
+Each Pub/Sub source subtask manages a `StreamingPull` connection to Google Cloud
+Pub/Sub. That is, a Pub/Sub source with parallelism of 20 manages 20 separate
+`StreamingPull` connections to Google Cloud Pub/Sub. The flow control settings
+described in this section are applied to **individual** connections.
+
+Several Pub/Sub source options configure flow control. These options are
+illustrated below:
+
+```java
+PubSubSource.builder()
+    // Allow up to 10,000 message deliveries per checkpoint interval.
+    .setMaxOutstandingMessagesCount(10_000L)
+    // Allow up to 1000 MB in cumulative message size per checkpoint interval.
+    .setMaxOutstandingMessagesBytes(1000L * 1024L * 1024L) // 1000 MB
+```
+
+Google Cloud Pub/Sub servers pause message delivery when a flow control setting
+is exceeded. A message is considered outstanding from when Google
+Cloud Pub/Sub delivers it until the subscriber acknowledges it. Since
+outstanding messages are acknowledged when a checkpoint completes, flow control
+limits for outstanding messages are effectively per-checkpoint interval limits.
+
+### Message Leasing
+
+Pub/Sub source automatically extends the acknowledgement deadline of messages.
+This means a checkpointing interval can be longer than the acknowledgement
+deadline without causing message redelivery. Note that acknowledgement
+deadlines can be extended to at most 1 hour.
+
+### Performance Considerations
+
+- Infrequent checkpointing can cause performance issues, including subscription
+  backlog growth and increased rate of duplicate message delivery.
+- `StreamingPull` connections have a 10 MB/s throughput limit. See
+  [Pub/Sub quotas and limits](https://cloud.google.com/pubsub/quotas) for all
+  restrictions.
+
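+### Example: Custom Deserialization Schema
+
+For cases where `dataOnly` is not enough, a custom schema can also read message
+attributes. The sketch below follows Flink's `DeserializationSchema` naming
+conventions; it is an assumption about the `PubSubDeserializationSchema`
+contract, so check the interface for its exact shape before copying it.
+
+```java
+public class AttributeTaggingSchema implements PubSubDeserializationSchema<String> {
+  @Override
+  public String deserialize(PubsubMessage message) {
+    // Prefix the UTF-8 payload with the "source" attribute, if present.
+    String source = message.getAttributesOrDefault("source", "unknown");
+    return source + ":" + message.getData().toStringUtf8();
+  }
+
+  @Override
+  public TypeInformation<String> getProducedType() {
+    return Types.STRING;
+  }
+}
+```
+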
+### All Options
+
+#### Required Builder Options
+
+<table>
+  <thead>
+    <tr>
+      <th>Builder Method</th>
+      <th>Default Value</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>setSubscriptionName(String subscriptionName)</td>
+      <td>(none)</td>
+      <td>The ID of the subscription from which Pub/Sub source consumes messages.</td>
+    </tr>
+    <tr>
+      <td>setProjectName(String projectName)</td>
+      <td>(none)</td>
+      <td>The ID of the GCP project that owns the subscription from which Pub/Sub source consumes messages.</td>
+    </tr>
+    <tr>
+      <td>setDeserializationSchema(PubSubDeserializationSchema&lt;OutputT&gt; deserializationSchema)</td>
+      <td>(none)</td>
+      <td>How PubsubMessage is deserialized when Pub/Sub source receives a message.</td>
+    </tr>
+  </tbody>
+</table>
+
+#### Optional Builder Options
+
+<table>
+  <thead>
+    <tr>
+      <th>Builder Method</th>
+      <th>Default Value</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>setMaxOutstandingMessagesCount(Long count)</td>
+      <td>1000L</td>
+      <td>The maximum number of messages that can be delivered to a Pub/Sub source subtask in a checkpoint interval.</td>
+    </tr>
+    <tr>
+      <td>setMaxOutstandingMessagesBytes(Long bytes)</td>
+      <td>100L * 1024L * 1024L (100 MB)</td>
+      <td>The maximum number of cumulative bytes that can be delivered to a Pub/Sub source subtask in a checkpoint interval.</td>
+    </tr>
+    <tr>
+      <td>setCredentials(Credentials credentials)</td>
+      <td>(none)</td>
+      <td>The credentials attached to requests sent to Google Cloud Pub/Sub. The identity in the credentials must be authorized to pull messages from the subscription. If not set, then Pub/Sub source attempts to use environment-configured credentials.</td>
+    </tr>
+  </tbody>
+</table>
+
+

From 7250e7e492f0ba4e29b56647adb0b0ef66eff20c Mon Sep 17 00:00:00 2001
From: Matt Kwong
Date: Wed, 6 Mar 2024 19:56:08 +0000
Subject: [PATCH 2/3] Address PR comments

---
 README.md                                |  2 +-
 flink-connector/README.md                | 14 ++++++++++++++
 .../docs/connectors/datastream/pubsub.md | 19 ++++++++++++++++---
 3 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/README.md b/README.md
index 240f8f6e..5067c2b0 100644
--- a/README.md
+++ b/README.md
@@ -14,7 +14,7 @@ available are:
   A reference implementation for how to use ordering keys effectively.
 * UNDER DEVELOPMENT -
   [Flink Connector](https://github.com/GoogleCloudPlatform/pubsub/tree/master/flink-connector):
-  Send and receive messages from [Apache Flink](https://flink.apache.org/).
+  Send and receive messages to/from [Apache Flink](https://flink.apache.org/).
   The connector is currently available for experimental usage.
 * DEPRECATED - [Kafka Connector](https://github.com/GoogleCloudPlatform/pubsub/tree/master/kafka-connector):
   Send and receive messages from [Apache Kafka](http://kafka.apache.org). *The
diff --git a/flink-connector/README.md b/flink-connector/README.md
index 605bf1b9..7ec12fe0 100644
--- a/flink-connector/README.md
+++ b/flink-connector/README.md
@@ -44,6 +44,20 @@ mvn clean package -DskipTests
 The resulting jars can be found in the `target` directory of the respective
 module.
 
+Flink applications built with Maven can include the connector as a dependency in
+their `pom.xml` file by adding:
+
+```xml
+<dependency>
+  <groupId>com.google.pubsub.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub</artifactId>
+  <version>0.0.0</version>
+</dependency>
+```
+
+Learn more about Flink connector packaging
+[here](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/overview/).
+
 ## Documentation
 
 The documentation of Apache Flink is located on the website:
diff --git a/flink-connector/docs/content/docs/connectors/datastream/pubsub.md b/flink-connector/docs/content/docs/connectors/datastream/pubsub.md
index 4d32d540..85225838 100644
--- a/flink-connector/docs/content/docs/connectors/datastream/pubsub.md
+++ b/flink-connector/docs/content/docs/connectors/datastream/pubsub.md
@@ -27,6 +27,21 @@ Google Cloud Pub/Sub has no notion of subscription partitions or splits, a
 message can be received by any reader. Google Cloud Pub/Sub automatically load
 balances message delivery across readers.
 
+### Google Credentials
+
+By default, Pub/Sub source uses
+[Application Default Credentials](https://cloud.google.com/docs/authentication/application-default-credentials)
+for [authentication](https://cloud.google.com/docs/authentication) to Google
+Cloud Pub/Sub. Credentials can be manually set using
+`PubSubSource.builder().setCredentials(...)`, which takes precedence
+over Application Default Credentials.
+
+The authenticating principal must be
+[authorized](https://cloud.google.com/pubsub/docs/access-control) to pull
+messages from the subscription. Authorization is managed through Google
+[IAM](https://cloud.google.com/security/products/iam) and can be configured on
+individual subscriptions.
+
 ### Deserialization Schema
 
 `PubSubDeserializationSchema` is required to define how
@@ -146,13 +161,11 @@ extended to at most 1 hour.
     <tr>
       <td>setCredentials(Credentials credentials)</td>
       <td>(none)</td>
-      <td>The credentials attached to requests sent to Google Cloud Pub/Sub. The identity in the credentials must be authorized to pull messages from the subscription. If not set, then Pub/Sub source attempts to use environment-configured credentials.</td>
+      <td>The credentials attached to requests sent to Google Cloud Pub/Sub. The identity in the credentials must be authorized to pull messages from the subscription. If not set, then Pub/Sub source uses Application Default Credentials.</td>
     </tr>
   </tbody>
 </table>
-
-

From 6b26592ec7c3766e8da04ab49054a090b8c26566 Mon Sep 17 00:00:00 2001
From: Matt Kwong
Date: Thu, 14 Mar 2024 17:09:45 +0000
Subject: [PATCH 3/3] Add sink/it testing doc

---
 flink-connector/README.md                |   2 +-
 .../docs/connectors/datastream/pubsub.md | 201 +++++++++++++++---
 2 files changed, 175 insertions(+), 28 deletions(-)

diff --git a/flink-connector/README.md b/flink-connector/README.md
index 7ec12fe0..0a8c2b29 100644
--- a/flink-connector/README.md
+++ b/flink-connector/README.md
@@ -35,7 +35,7 @@ Prerequisites:
 * Maven (we recommend version 3.8.6)
 * Java 11
 
-```
+```sh
 git clone https://github.com/GoogleCloudPlatform/pubsub.git
 cd pubsub/flink-connector
 mvn clean package -DskipTests
diff --git a/flink-connector/docs/content/docs/connectors/datastream/pubsub.md b/flink-connector/docs/content/docs/connectors/datastream/pubsub.md
index 85225838..04fc7b2e 100644
--- a/flink-connector/docs/content/docs/connectors/datastream/pubsub.md
+++ b/flink-connector/docs/content/docs/connectors/datastream/pubsub.md
@@ -1,8 +1,71 @@
 # Google Cloud Pub/Sub Connector
 
-This connector provides access to reading data from and writing data to 
+This connector provides access to reading data from and writing data to
 [Google Cloud Pub/Sub](https://cloud.google.com/pubsub).
 
+## Usage
+
+This library is currently not published to any repositories. Usage requires
+building it from source and packaging it with your Flink application.
+
+Prerequisites:
+
+* Unix-like environment (we use Linux, Mac OS X)
+* Git
+* Maven (we recommend version 3.8.6)
+* Java 11
+
+```sh
+git clone https://github.com/GoogleCloudPlatform/pubsub.git
+cd pubsub/flink-connector
+mvn clean package -DskipTests
+```
+
+The resulting jars can be found in the `target` directory of the respective
+module.
+
+Flink applications built with Maven can include the connector as a dependency in
+their `pom.xml` file by adding:
+
+```xml
+<dependency>
+  <groupId>com.google.pubsub.flink</groupId>
+  <artifactId>flink-connector-gcp-pubsub</artifactId>
+  <version>0.0.0</version>
+</dependency>
+```
+
+An example Flink application can be found under
+[`flink-connector/flink-examples-gcp-pubsub/`](https://github.com/GoogleCloudPlatform/pubsub/tree/master/flink-connector/flink-examples-gcp-pubsub).
+
+Learn more about Flink connector packaging
+[here](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/configuration/overview/).
+
+## Configuring Access to Google Cloud Pub/Sub
+
+By default, the connector library uses
+[Application Default Credentials](https://cloud.google.com/docs/authentication/application-default-credentials)
+when [authenticating](https://cloud.google.com/docs/authentication) to Google
+Cloud Pub/Sub.
+
+Credentials that are set in the source and sink builders take precedence over
+Application Default Credentials. Example of setting credentials in builders:
+
+```java
+// Set credentials used by Pub/Sub source to pull messages from a subscription.
+PubSubSource.builder().setCredentials(...)
+
+// Set credentials used by Pub/Sub sink to publish messages to a topic.
+PubSubSink.builder().setCredentials(...)
+```
+
+The authenticating principal must be
+[authorized](https://cloud.google.com/pubsub/docs/access-control) to pull
+messages from a subscription when using Pub/Sub source or publish messages to a
+topic when using Pub/Sub sink.
+Authorization is managed through Google
+[IAM](https://cloud.google.com/security/products/iam) and can be configured
+either at the Google Cloud project-level or the Pub/Sub resource-level.
+
 ## Pub/Sub Source
 
 Pub/Sub source streams data from a single Google Cloud Pub/Sub subscription. The
@@ -27,21 +90,6 @@ Google Cloud Pub/Sub has no notion of subscription partitions or splits, a
 message can be received by any reader. Google Cloud Pub/Sub automatically load
 balances message delivery across readers.
 
-### Google Credentials
-
-By default, Pub/Sub source uses
-[Application Default Credentials](https://cloud.google.com/docs/authentication/application-default-credentials)
-for [authentication](https://cloud.google.com/docs/authentication) to Google
-Cloud Pub/Sub. Credentials can be manually set using
-`PubSubSource.builder().setCredentials(...)`, which takes precedence
-over Application Default Credentials.
-
-The authenticating principal must be
-[authorized](https://cloud.google.com/pubsub/docs/access-control) to pull
-messages from the subscription. Authorization is managed through Google
-[IAM](https://cloud.google.com/security/products/iam) and can be configured on
-individual subscriptions.
-
 ### Deserialization Schema
 
 `PubSubDeserializationSchema` is required to define how
@@ -84,10 +132,10 @@ PubSubSource.builder()
 ```
 
 Google Cloud Pub/Sub servers pause message delivery when a flow control setting
-is exceeded. A message is considered outstanding from when Google
-Cloud Pub/Sub delivers it until the subscriber acknowledges it. Since
-outstanding messages are acknowledged when a checkpoint completes, flow control
-limits for outstanding messages are effectively per-checkpoint interval limits.
+is exceeded. A message is considered outstanding from when Google Cloud Pub/Sub
+delivers it until the subscriber acknowledges it. Since outstanding messages are
+acknowledged when a checkpoint completes, flow control limits for outstanding
+messages are effectively per-checkpoint interval limits.
 
 ### Message Leasing
 
@@ -98,11 +146,12 @@ extended to at most 1 hour.
 
 ### Performance Considerations
 
-- Infrequent checkpointing can cause performance issues, including subscription
-  backlog growth and increased rate of duplicate message delivery.
-- `StreamingPull` connections have a 10 MB/s throughput limit. See
-  [Pub/Sub quotas and limits](https://cloud.google.com/pubsub/quotas) for all
-  restrictions.
+- Infrequent checkpointing can cause performance issues, including
+  subscription backlog growth and increased rate of duplicate message
+  delivery.
+- `StreamingPull` connections have a 10 MB/s throughput limit. See
+  [Pub/Sub quotas and limits](https://cloud.google.com/pubsub/quotas) for all
+  restrictions.
 
 
@@ -166,6 +215,104 @@ extended to at most 1 hour.
 </table>
 
+## Pub/Sub Sink
+
+Pub/Sub sink publishes data to a single Google Cloud Pub/Sub topic. The sample
+below shows the minimal configurations required to build Pub/Sub sink.
+
+```java
+PubSubSink.builder()
+    .setSerializationSchema(
+        PubSubSerializationSchema.dataOnly(new SimpleStringSchema()))
+    .setProjectName("my-project-name")
+    .setTopicName("my-topic-name")
+    .build()
+```
+
+### Serialization Schema
+
+`PubSubSerializationSchema` is required to define how incoming data is
+serialized to
+[`PubsubMessage`](https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage).
+
+For convenience, `PubSubSerializationSchema.dataOnly(SerializationSchema
+schema)` can be used if `PubsubMessage.data` is the only field used when
+publishing messages.
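+
+A sketch of a custom schema that fills both `PubsubMessage.data` and a message
+attribute is shown below. The single `serialize` method is an assumption about
+the `PubSubSerializationSchema` contract, and `MyEvent` is a hypothetical
+record type, so check the interface for its exact shape before copying this.
+
+```java
+public class MyEventSchema implements PubSubSerializationSchema<MyEvent> {
+  @Override
+  public PubsubMessage serialize(MyEvent event) {
+    return PubsubMessage.newBuilder()
+        // The payload bytes become PubsubMessage.data.
+        .setData(ByteString.copyFromUtf8(event.payload()))
+        // Attributes let subscribers filter without parsing the payload.
+        .putAttributes("event-type", event.type())
+        .build();
+  }
+}
+```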
+
+### Publish Guarantees
+
+There are currently no guarantees that messages are published. Pub/Sub sink uses
+a fire-and-forget publishing strategy to maximize throughput, at the cost of
+possible data loss.
+
+### All Options
+
+#### Required Builder Options
+
+<table>
+  <thead>
+    <tr>
+      <th>Builder Method</th>
+      <th>Default Value</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>setTopicName(String topicName)</td>
+      <td>(none)</td>
+      <td>The ID of the topic to which Pub/Sub sink publishes messages.</td>
+    </tr>
+    <tr>
+      <td>setProjectName(String projectName)</td>
+      <td>(none)</td>
+      <td>The ID of the GCP project that owns the topic to which Pub/Sub sink publishes messages.</td>
+    </tr>
+    <tr>
+      <td>setSerializationSchema(PubSubSerializationSchema&lt;T&gt; serializationSchema)</td>
+      <td>(none)</td>
+      <td>How incoming data is serialized to PubsubMessage.</td>
+    </tr>
+  </tbody>
+</table>
+
+#### Optional Builder Options
+
+<table>
+  <thead>
+    <tr>
+      <th>Builder Method</th>
+      <th>Default Value</th>
+      <th>Description</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>setCredentials(Credentials credentials)</td>
+      <td>(none)</td>
+      <td>The credentials attached to requests sent to Google Cloud Pub/Sub. The identity in the credentials must be authorized to publish messages to the topic. If not set, then Pub/Sub sink uses Application Default Credentials.</td>
+    </tr>
+  </tbody>
+</table>
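+
+### Example: Attaching Pub/Sub Sink to a Pipeline
+
+The sketch below wires the documented builder options into a job. It assumes
+Pub/Sub sink implements Flink's unified `Sink` interface, so that it can be
+attached with `DataStream#sinkTo`, and that the builder accepts an explicit
+`<String>` type witness; all names are placeholders.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+env.fromElements("event-1", "event-2", "event-3")
+    // Each element's bytes are published as PubsubMessage.data (fire-and-forget).
+    .sinkTo(
+        PubSubSink.<String>builder()
+            .setSerializationSchema(
+                PubSubSerializationSchema.dataOnly(new SimpleStringSchema()))
+            .setProjectName("my-project-name")
+            .setTopicName("my-topic-name")
+            .build());
+
+env.execute("pubsub-sink-example");
+```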
-
+
+## Integration Testing
+
+Instead of reading from and writing to production Google Cloud Pub/Sub,
+integration tests can run against a local instance of the
+[Pub/Sub emulator](https://cloud.google.com/pubsub/docs/emulator). Pub/Sub
+source and sink will automatically try connecting to the emulator if the
+environment variable `PUBSUB_EMULATOR_HOST` is set; a test pipeline sketch
+follows the steps below.
+
+Steps to run tests against the Pub/Sub emulator:
+
+1. Ensure that the
+   [required dependencies](https://cloud.google.com/pubsub/docs/emulator#before-you-begin)
+   and
+   [emulator](https://cloud.google.com/pubsub/docs/emulator#install_the_emulator)
+   are installed.
+2. Start the emulator using the
+   [Google Cloud CLI](https://cloud.google.com/pubsub/docs/emulator#start).
+3. Run the test with the environment variable `PUBSUB_EMULATOR_HOST` set to
+   where the emulator is running. For example, if the emulator is listening on
+   port 8085 and running on the same machine as the test, set
+   `PUBSUB_EMULATOR_HOST=localhost:8085`.
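+
+For illustration, the snippet below is the kind of pipeline such a test can run.
+It is ordinary connector code: nothing emulator-specific appears in it, because
+rerouting is driven entirely by `PUBSUB_EMULATOR_HOST` (assumed to be exported
+before the JVM starts). The project and subscription names are placeholders;
+the emulator accepts any project ID.
+
+```java
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+// A short checkpoint interval keeps acknowledgements prompt in tests.
+env.enableCheckpointing(1_000L);
+
+env.fromSource(
+        PubSubSource.builder()
+            .setDeserializationSchema(
+                PubSubDeserializationSchema.dataOnly(new SimpleStringSchema()))
+            .setProjectName("test-project")
+            .setSubscriptionName("test-subscription")
+            .build(),
+        WatermarkStrategy.noWatermarks(),
+        "pubsub-emulator-source")
+    .print();
+
+// executeAsync() avoids blocking the test thread on an unbounded source.
+env.executeAsync();
+```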