[#6233] feat(flink): flink jdbc catalog #6543

Open · wants to merge 17 commits into base: main
4 changes: 2 additions & 2 deletions flink-connector/flink/build.gradle.kts
@@ -29,7 +29,6 @@ repositories {
var paimonVersion: String = libs.versions.paimon.get()
val flinkVersion: String = libs.versions.flink.get()
val flinkMajorVersion: String = flinkVersion.substringBeforeLast(".")

val icebergVersion: String = libs.versions.iceberg.get()

// Flink only supports Scala 2.12, and all Scala APIs will be removed in a future version.
@@ -56,6 +55,7 @@ dependencies {
compileOnly("org.apache.flink:flink-table-common:$flinkVersion")
compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion")
compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion")
compileOnly(libs.flinkjdbc)

compileOnly(libs.hive2.exec) {
artifact {
@@ -97,7 +97,7 @@ dependencies {
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.metrics.core)

testImplementation(libs.flinkjdbc)
testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
testImplementation("org.apache.flink:flink-table-common:$flinkVersion")
New file: org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.jdbc;

import java.util.Map;
import java.util.Optional;
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;

/**
 * The GravitinoJdbcCatalog class is an implementation of the BaseCatalog class that proxies
 * Flink's JdbcCatalog.
 */
public class GravitinoJdbcCatalog extends BaseCatalog {

private final JdbcCatalog jdbcCatalog;

private final CatalogFactory.Context context;

protected GravitinoJdbcCatalog(
CatalogFactory.Context context,
String defaultDatabase,
PropertiesConverter propertiesConverter,
PartitionConverter partitionConverter) {
super(context.getName(), defaultDatabase, propertiesConverter, partitionConverter);
this.context = context;
JdbcCatalogFactory jdbcCatalogFactory = new JdbcCatalogFactory();
this.jdbcCatalog = (JdbcCatalog) jdbcCatalogFactory.createCatalog(context);
}

@Override
protected AbstractCatalog realCatalog() {
return jdbcCatalog;
}

@Override
public Optional<Factory> getFactory() {
return Optional.of(new JdbcDynamicTableFactory());
}

@Override
public CatalogBaseTable getTable(ObjectPath tablePath)
throws TableNotExistException, CatalogException {
CatalogBaseTable table = super.getTable(tablePath);
Map<String, String> contextOptions = context.getOptions();
Map<String, String> tableOptions = table.getOptions();
tableOptions.remove("engine");
tableOptions.put(
"url",
contextOptions.get(JdbcPropertiesConstants.FLINK_JDBC_URL)
+ "/"
+ tablePath.getDatabaseName());
tableOptions.put("table-name", tablePath.getObjectName());
tableOptions.put("username", contextOptions.get(JdbcPropertiesConstants.FLINK_JDBC_USER));
tableOptions.put("password", contextOptions.get(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD));
return table;
}
}
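
For reference, a minimal sketch of what the getTable override above produces. It assumes a catalog created with the options base-url=jdbc:mysql://localhost:3306, username=root, and password=secret (option names from JdbcPropertiesConstants; values are placeholders), and that `catalog` is an already-constructed GravitinoJdbcCatalog:

    // Hypothetical illustration, not part of the PR: the effect of the
    // option rewriting in getTable.
    CatalogBaseTable table = catalog.getTable(new ObjectPath("db1", "t1"));
    Map<String, String> options = table.getOptions();
    // options now contains:
    //   url        -> jdbc:mysql://localhost:3306/db1  (base-url + "/" + database name)
    //   table-name -> t1
    //   username   -> root
    //   password   -> secret
    // and any "engine" option carried over from Gravitino has been removed.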
New file: org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.jdbc;

import java.util.Collections;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.DefaultPartitionConverter;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;

/**
 * Factory for creating instances of {@link GravitinoJdbcCatalog}. The factory itself is
 * instantiated by Flink through SPI discovery.
 */
public abstract class GravitinoJdbcCatalogFactory implements BaseCatalogFactory {

@Override
public org.apache.flink.table.catalog.Catalog createCatalog(Context context) {
final FactoryUtil.CatalogFactoryHelper helper =
FactoryUtils.createCatalogFactoryHelper(this, context);
String defaultDatabase =
helper.getOptions().get(GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE);
return new GravitinoJdbcCatalog(
context, defaultDatabase, propertiesConverter(), partitionConverter());
}

@Override
public Catalog.Type gravitinoCatalogType() {
return Catalog.Type.RELATIONAL;
}

@Override
public PartitionConverter partitionConverter() {
return DefaultPartitionConverter.INSTANCE;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
Contributor:

Should we place the following properties in requiredOptions?

  public static final String FLINK_JDBC_URL = "base-url";
  public static final String FLINK_JDBC_USER = "username";
  public static final String FLINK_JDBC_PASSWORD = "password";
  public static final String FLINK_JDBC_DEFAULT_DATABASE = "default-database";

Contributor (Author):

They are already in the Flink JdbcCatalogFactory.

Contributor:

So should we reuse the requiredOptions from JdbcCatalogFactory for GravitinoJdbcCatalogFactory?

Contributor (Author):

This would cause these options to be validated twice, both in JdbcCatalogFactory and GravitinoJdbcCatalogFactory. Additionally, since these options are intended for use by JdbcCatalogFactory only, and are not needed within GravitinoJdbcCatalogFactory, I think it's unnecessary to add them to the requiredOptions method of GravitinoJdbcCatalogFactory.

Contributor:

Could you provide the code path where it's validated in JdbcCatalogFactory?

Contributor (Author):

org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory#requiredOptions() (the original comment attached a screenshot of the implementation)

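For reference, the method cited above looks roughly like this in flink-connector-jdbc (paraphrased in place of the screenshot; the option constants come from JdbcCatalogFactoryOptions):

    // org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory (paraphrased)
    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> options = new HashSet<>();
        options.add(JdbcCatalogFactoryOptions.DEFAULT_DATABASE);
        options.add(JdbcCatalogFactoryOptions.USERNAME);
        options.add(JdbcCatalogFactoryOptions.PASSWORD);
        options.add(JdbcCatalogFactoryOptions.BASE_URL);
        return options;
    }
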
return Collections.emptySet();
}

@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}
}
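
Since the javadoc above notes that factories are created through SPI discovery, here is a minimal sketch of the corresponding service registration, assuming the standard Flink Factory SPI mechanism and the two concrete factories added in this PR (the resource path is the conventional one, not shown in this diff):

    # src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
    org.apache.gravitino.flink.connector.jdbc.GravitinoMysqlJdbcCatalogFactory
    org.apache.gravitino.flink.connector.jdbc.GravitinoPostgresJdbcCatalogFactory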
New file: org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.jdbc;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.iceberg.flink.FlinkCatalogFactory;

public class GravitinoJdbcCatalogFactoryOptions {

/** Identifier for the MySQL {@link GravitinoJdbcCatalog}. */
public static final String MYSQL_IDENTIFIER = "gravitino-jdbc-mysql";

/** Identifier for the PostgreSQL {@link GravitinoJdbcCatalog}. */
public static final String POSTGRESQL_IDENTIFIER = "gravitino-jdbc-postgresql";

public static final ConfigOption<String> DEFAULT_DATABASE =
ConfigOptions.key(FlinkCatalogFactory.DEFAULT_DATABASE).stringType().noDefaultValue();
}
New file: org/apache/gravitino/flink/connector/jdbc/GravitinoMysqlJdbcCatalogFactory.java
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.jdbc;

import org.apache.gravitino.flink.connector.PropertiesConverter;

public class GravitinoMysqlJdbcCatalogFactory extends GravitinoJdbcCatalogFactory {

@Override
public String gravitinoCatalogProvider() {
return "jdbc-mysql";
}

@Override
public PropertiesConverter propertiesConverter() {
return MysqlPropertiesConverter.INSTANCE;
}

@Override
public String factoryIdentifier() {
return GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER;
}
}
New file: org/apache/gravitino/flink/connector/jdbc/GravitinoPostgresJdbcCatalogFactory.java
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.jdbc;

import org.apache.gravitino.flink.connector.PropertiesConverter;

public class GravitinoPostgresJdbcCatalogFactory extends GravitinoJdbcCatalogFactory {

@Override
public String gravitinoCatalogProvider() {
return "jdbc-postgresql";
}

@Override
public PropertiesConverter propertiesConverter() {
return PostgresqlPropertiesConverter.INSTANCE;
}

@Override
public String factoryIdentifier() {
return GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER;
}
}
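
A hypothetical usage sketch tying the two factories together: registering a catalog by its factory identifier from a Flink TableEnvironment. The option names follow JdbcPropertiesConstants below, the connection values are placeholders, and whether a deployment creates the catalog this way or through Gravitino's catalog store is outside this diff:

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
    // 'type' selects the factory by its identifier
    // (gravitino-jdbc-mysql or gravitino-jdbc-postgresql).
    tEnv.executeSql(
        "CREATE CATALOG my_mysql WITH (\n"
            + "  'type' = 'gravitino-jdbc-mysql',\n"
            + "  'base-url' = 'jdbc:mysql://localhost:3306',\n"
            + "  'username' = 'root',\n"
            + "  'password' = 'secret',\n"
            + "  'default-database' = 'db1'\n"
            + ")");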
New file: org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.jdbc;

import java.util.HashMap;
import java.util.Map;

public class JdbcPropertiesConstants {

private JdbcPropertiesConstants() {}

public static final String GRAVITINO_JDBC_USER = "jdbc-user";
public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password";
public static final String GRAVITINO_JDBC_URL = "jdbc-url";
public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver";
Contributor:

Does the Flink JDBC connector support setting the JDBC driver?

Contributor (Author):

Yes, here is the documentation about how to create a Flink JdbcCatalog (the original comment attached a screenshot).

Contributor:

It seems the logic for transforming GRAVITINO_JDBC_DRIVER into a Flink-side driver option is missing?

Contributor:

The documentation seems out of date; the JDBC connector does appear to support setting the driver:

   public static final ConfigOption<String> DRIVER =
            ConfigOptions.key("driver")
                    .stringType()
                    .noDefaultValue()
                    .withDescription(
                            "The class name of the JDBC driver to use to connect to this URL. "
                                    + "If not set, it will automatically be derived from the URL.");

https://github.com/apache/flink-connector-jdbc/blob/v3.2/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcConnectorOptions.java#L57-L63

Contributor:

The Flink JDBC catalog doesn't support setting the driver in catalog options, but it does in table options; the configuration above is for tables, not catalogs.

public static final String GRAVITINO_JDBC_DEFAULT_DATABASE = "default-database";

public static final String FLINK_JDBC_URL = "base-url";
public static final String FLINK_JDBC_USER = "username";
public static final String FLINK_JDBC_PASSWORD = "password";
public static final String FLINK_JDBC_DEFAULT_DATABASE = "default-database";

public static Map<String, String> flinkToGravitinoMap = new HashMap<>();
public static Map<String, String> gravitinoToFlinkMap = new HashMap<>();

static {
flinkToGravitinoMap.put(FLINK_JDBC_URL, GRAVITINO_JDBC_URL);
flinkToGravitinoMap.put(FLINK_JDBC_USER, GRAVITINO_JDBC_USER);
flinkToGravitinoMap.put(FLINK_JDBC_PASSWORD, GRAVITINO_JDBC_PASSWORD);
flinkToGravitinoMap.put(FLINK_JDBC_DEFAULT_DATABASE, GRAVITINO_JDBC_DEFAULT_DATABASE);
Contributor:

There is no need to transform between FLINK_JDBC_DEFAULT_DATABASE and GRAVITINO_JDBC_DEFAULT_DATABASE; the Flink properties converter will do it automatically.


gravitinoToFlinkMap.put(GRAVITINO_JDBC_URL, FLINK_JDBC_URL);
gravitinoToFlinkMap.put(GRAVITINO_JDBC_USER, FLINK_JDBC_USER);
gravitinoToFlinkMap.put(GRAVITINO_JDBC_PASSWORD, FLINK_JDBC_PASSWORD);
gravitinoToFlinkMap.put(GRAVITINO_JDBC_DEFAULT_DATABASE, FLINK_JDBC_DEFAULT_DATABASE);
}
}
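
A minimal sketch of how a properties converter might apply these maps when translating Gravitino catalog properties to Flink catalog options. The actual MysqlPropertiesConverter / PostgresqlPropertiesConverter implementations are not part of this diff, so the method name and shape here are hypothetical:

    // Hypothetical helper, not from the PR: renames Gravitino keys
    // (e.g. jdbc-url) to their Flink equivalents (e.g. base-url)
    // using gravitinoToFlinkMap; keys without a mapping are dropped.
    public static Map<String, String> toFlinkCatalogOptions(Map<String, String> gravitinoProps) {
      Map<String, String> flinkOptions = new HashMap<>();
      gravitinoProps.forEach(
          (key, value) -> {
            String flinkKey = JdbcPropertiesConstants.gravitinoToFlinkMap.get(key);
            if (flinkKey != null) {
              flinkOptions.put(flinkKey, value);
            }
          });
      return flinkOptions;
    }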