Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#3362] feat(flink-connector): Add the code skeleton for flink-connector #2635

Merged
merged 28 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/backend-integration-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ jobs:
- name: Backend Integration Test
id: integrationTest
run: |
./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs
./gradlew test --rerun-tasks -PskipTests -PtestMode=${{ matrix.test-mode }} -PjdkVersion=${{ matrix.java-version }} -PskipWebITs -P${{ matrix.backend }} -PskipPyClientITs -PskipFlinkITs

- name: Upload integrate tests reports
uses: actions/upload-artifact@v3
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ jobs:
- server/**
- server-common/**
- spark-connector/**
- flink-connector/**
- trino-connector/**
- web/**
- docs/open-api/**
Expand Down
108 changes: 108 additions & 0 deletions .github/workflows/flink-integration-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
name: Flink Integration Test

# Controls when the workflow will run
on:
# Triggers the workflow on push or pull request events, but only for the "main" and "branch-*" branches
push:
branches: [ "main", "branch-*" ]
pull_request:
branches: [ "main", "branch-*" ]

concurrency:
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
cancel-in-progress: true

jobs:
# Detects whether any path relevant to the Flink connector changed and exposes the
# result as the `source_changes` job output.
changes:
  runs-on: ubuntu-latest
  steps:
    - uses: actions/checkout@v3
    - uses: dorny/paths-filter@v2
      id: filter
      with:
        filters: |
          source_changes:
            - .github/**
            - api/**
            - bin/**
            - catalogs/**
            - clients/client-java/**
            - clients/client-java-runtime/**
            - clients/filesystem-hadoop3/**
            - clients/filesystem-hadoop3-runtime/**
            - common/**
            - conf/**
            - core/**
            - dev/**
            - gradle/**
            - meta/**
            - server/**
            - server-common/**
            - flink-connector/**
            - docs/open-api/**
            - build.gradle.kts
            - gradle.properties
            - gradlew
            - settings.gradle.kts
  outputs:
    # 'true' when at least one of the paths listed above was modified.
    source_changes: ${{ steps.filter.outputs.source_changes }}

# Integration test for AMD64 architecture
test-amd64-arch:
needs: changes
if: needs.changes.outputs.source_changes == 'true'
runs-on: ubuntu-latest
timeout-minutes: 30
strategy:
matrix:
architecture: [linux/amd64]
java-version: [ 8, 11, 17 ]
env:
PLATFORM: ${{ matrix.architecture }}
steps:
- uses: actions/checkout@v3

- uses: actions/setup-java@v3
with:
java-version: ${{ matrix.java-version }}
distribution: 'temurin'

- name: Set up QEMU
uses: docker/setup-qemu-action@v2

- name: Check required command
run: |
dev/ci/check_commands.sh

- name: Package Gravitino
run: |
./gradlew build -x test -PjdkVersion=${{ matrix.java-version }}
./gradlew compileDistribution -x test -PjdkVersion=${{ matrix.java-version }}

- name: Setup debug Github Action
if: ${{ contains(github.event.pull_request.labels.*.name, 'debug action') }}
uses: csexton/debugger-action@master

- name: Free up disk space
run: |
dev/ci/util_free_space.sh

- name: Flink Integration Test
id: integrationTest
run: |
./gradlew --rerun-tasks -PskipTests -PtestMode=embedded -PjdkVersion=${{ matrix.java-version }} :flink-connector:test --tests "com.datastrato.gravitino.flink.connector.integration.test.**"
./gradlew --rerun-tasks -PskipTests -PtestMode=deploy -PjdkVersion=${{ matrix.java-version }} :flink-connector:test --tests "com.datastrato.gravitino.flink.connector.integration.test.**"

- name: Upload integrate tests reports
uses: actions/upload-artifact@v3
if: ${{ (failure() && steps.integrationTest.outcome == 'failure') || contains(github.event.pull_request.labels.*.name, 'upload log') }}
with:
name: flink-connector-integrate-test-reports-${{ matrix.java-version }}
path: |
build/reports
flink-connector/build/flink-connector-integration-test.log
flink-connector/build/*.tar
distribution/package/logs/gravitino-server.out
distribution/package/logs/gravitino-server.log
catalogs/**/*.log
catalogs/**/*.tar
5 changes: 3 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ tasks {
subprojects.forEach() {
if (!it.name.startsWith("catalog") &&
!it.name.startsWith("client") && !it.name.startsWith("filesystem") && !it.name.startsWith("spark-connector") && it.name != "trino-connector" &&
it.name != "integration-test" && it.name != "bundled-catalog"
it.name != "integration-test" && it.name != "bundled-catalog" && it.name != "flink-connector"
) {
from(it.configurations.runtimeClasspath)
into("distribution/package/libs")
Expand All @@ -634,7 +634,8 @@ tasks {
!it.name.startsWith("spark-connector") &&
it.name != "trino-connector" &&
it.name != "integration-test" &&
it.name != "bundled-catalog"
it.name != "bundled-catalog" &&
it.name != "flink-connector"
) {
dependsOn("${it.name}:build")
from("${it.name}/build/libs")
Expand Down
131 changes: 131 additions & 0 deletions flink-connector/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
plugins {
`maven-publish`
id("java")
id("idea")
}

repositories {
mavenCentral()
}

val flinkVersion: String = libs.versions.flink.get()
val scalaVersion: String = project.properties["scalaVersion"] as? String ?: extra["defaultScalaVersion"].toString()

dependencies {
implementation(project(":api"))
implementation(project(":common"))
implementation(project(":core"))
implementation(project(":clients:client-java"))
implementation(project(":catalogs:bundled-catalog", configuration = "shadow"))

implementation(libs.bundles.log4j)
implementation(libs.commons.lang3)
implementation(libs.guava)
implementation(libs.httpclient5)
implementation(libs.jackson.databind)
implementation(libs.jackson.annotations)
implementation(libs.jackson.datatype.jdk8)
implementation(libs.jackson.datatype.jsr310)

implementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
implementation("org.apache.flink:flink-table-common:$flinkVersion")
implementation("org.apache.flink:flink-table-api-java:$flinkVersion")

implementation(libs.hive2.exec) {
artifact {
classifier = "core"
}
exclude("com.fasterxml.jackson.core")
exclude("com.google.code.findbugs", "jsr305")
exclude("com.google.protobuf")
exclude("org.apache.avro")
exclude("org.apache.calcite")
exclude("org.apache.calcite.avatica")
exclude("org.apache.curator")
exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
exclude("org.apache.logging.log4j")
exclude("org.apache.zookeeper")
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
exclude("org.openjdk.jol")
exclude("org.pentaho")
exclude("org.slf4j")
}

testAnnotationProcessor(libs.lombok)

testCompileOnly(libs.lombok)
testImplementation(project(":integration-test-common", "testArtifacts"))
testImplementation(project(":server"))
testImplementation(project(":server-common"))
testImplementation(libs.junit.jupiter.api)
testImplementation(libs.junit.jupiter.params)
testImplementation(libs.mockito.core)
testImplementation(libs.sqlite.jdbc)
testImplementation(libs.testcontainers)
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.testcontainers.mysql)

testImplementation(libs.hadoop2.common) {
exclude("*")
}
testImplementation(libs.hadoop2.mapreduce.client.core) {
exclude("*")
}
testImplementation(libs.hive2.common) {
exclude("org.eclipse.jetty.aggregate", "jetty-all")
exclude("org.eclipse.jetty.orbit", "javax.servlet")
}
// Hive metastore for the test classpath, with the transitive dependencies below
// excluded by group (and by module where a second argument is given).
testImplementation(libs.hive2.metastore) {
  exclude("co.cask.tephra")
  exclude("com.github.joshelser")
  exclude("com.google.code.findbugs", "jsr305")
  exclude("com.tdunning", "json")
  exclude("com.zaxxer", "HikariCP")
  // NOTE(review): "io.dropwizard.metricss" looks like a misspelling of
  // "io.dropwizard.metrics"; as written it presumably matches no group. Left unchanged
  // because correcting it would alter the resolved test classpath - confirm the intent
  // before fixing.
  exclude("io.dropwizard.metricss")
  exclude("javax.transaction", "transaction-api")
  exclude("org.apache.avro")
  exclude("org.apache.curator")
  exclude("org.apache.hbase")
  exclude("org.apache.hadoop", "hadoop-yarn-server-resourcemanager")
  exclude("org.apache.logging.log4j")
  exclude("org.apache.parquet", "parquet-hadoop-bundle")
  exclude("org.apache.zookeeper")
  exclude("org.eclipse.jetty.aggregate", "jetty-all")
  exclude("org.eclipse.jetty.orbit", "javax.servlet")
  exclude("org.slf4j")
}
testImplementation("org.apache.flink:flink-table-api-bridge-base:$flinkVersion") {
exclude("commons-cli", "commons-cli")
exclude("commons-io", "commons-io")
exclude("com.google.code.findbugs", "jsr305")
}
testImplementation("org.apache.flink:flink-table-planner_$scalaVersion:$flinkVersion")

testRuntimeOnly(libs.junit.jupiter.engine)
}

tasks.test {
  // -PskipTests: restrict the task to classes under an "integration" path.
  if (project.hasProperty("skipTests")) {
    include("**/integration/**")
  }

  // Either -PskipITs or -PskipFlinkITs turns the integration tests off.
  val integrationTestsDisabled =
    project.hasProperty("skipITs") || project.hasProperty("skipFlinkITs")

  if (!integrationTestsDisabled) {
    // Integration tests run after the jar task of this project.
    dependsOn(tasks.jar)

    // "initIntegrationTest" is read from the project's extra properties and applied
    // to this Test task.
    val init = project.extra.get("initIntegrationTest") as (Test) -> Unit
    init(this)
  } else {
    exclude("**/integration/**")
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/

package com.datastrato.gravitino.flink.connector;

import java.util.Map;
import org.apache.flink.configuration.Configuration;

/**
 * Translates catalog properties between their Flink form and their Gravitino form.
 *
 * <p>Both conversions have pass-through default implementations, so an implementor only needs to
 * override the direction it wants to customize.
 */
public interface PropertiesConverter {

  /** Property-key prefix constant; it is not referenced by the default methods below. */
  String FLINK_PROPERTY_PREFIX = "flink.bypass.";

  /**
   * Builds the Gravitino catalog properties from a Flink configuration.
   *
   * <p>The default implementation returns the result of {@code flinkConf.toMap()} as is.
   *
   * @param flinkConf The configuration provided by Flink.
   * @return properties for the Gravitino connector.
   */
  default Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
    final Map<String, String> gravitinoProperties = flinkConf.toMap();
    return gravitinoProperties;
  }

  /**
   * Builds the Flink catalog properties from Gravitino properties.
   *
   * <p>The default implementation returns the given map itself (no copy is made).
   *
   * @param gravitinoProperties The properties provided by Gravitino.
   * @return properties for the Flink connector.
   */
  default Map<String, String> toFlinkCatalogProperties(Map<String, String> gravitinoProperties) {
    final Map<String, String> flinkProperties = gravitinoProperties;
    return flinkProperties;
  }
}
Loading
Loading