Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#6233] feat(flink): flink jdbc catalog #6543

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
2 changes: 2 additions & 0 deletions flink-connector/flink/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ dependencies {
compileOnly("org.apache.flink:flink-table-common:$flinkVersion")
compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion")
compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion")
compileOnly(libs.flinkjdbc)

compileOnly(libs.hive2.exec) {
artifact {
Expand Down Expand Up @@ -97,6 +98,7 @@ dependencies {
testImplementation(libs.testcontainers.junit.jupiter)
testImplementation(libs.testcontainers.mysql)
testImplementation(libs.metrics.core)
testImplementation(libs.flinkjdbc)

testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.ObjectPath;

/**
* PropertiesConverter is used to convert properties between Flink properties and Apache Gravitino
Expand Down Expand Up @@ -136,7 +137,8 @@ default Map<String, String> toFlinkDatabaseProperties(Map<String, String> gravit
 * @param gravitinoProperties The table properties provided by Gravitino.
 * @param tablePath The Flink path (database and table name) of the table whose properties are
 *     being converted; converters that need the table identity (e.g. JDBC) use it to derive
 *     path-dependent properties.
 * @return The table properties for the Flink connector.
*/
default Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
default Map<String, String> toFlinkTableProperties(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add tablePath to Java doc and explain why add this parameters?

Map<String, String> gravitinoProperties, ObjectPath tablePath) {
return gravitinoProperties;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector;

import com.google.common.base.Preconditions;
import java.util.Collections;
import java.util.List;
import org.apache.gravitino.rel.expressions.transforms.Transform;

/**
 * A {@link PartitionConverter} for catalog types that have no notion of partition keys.
 *
 * <p>Both conversion directions accept only an absent or empty partition specification and
 * yield an empty result; any non-empty specification is rejected.
 */
public class UnsupportPartitionConverter implements PartitionConverter {

  /** Shared stateless singleton instance. */
  public static final UnsupportPartitionConverter INSTANCE = new UnsupportPartitionConverter();

  private UnsupportPartitionConverter() {}

  @Override
  public List<String> toFlinkPartitionKeys(Transform[] partitions) {
    // Reject any actual partition spec; null / empty means "no partitions", which is fine.
    boolean noPartitions = partitions == null || partitions.length == 0;
    Preconditions.checkArgument(noPartitions, "Partition key conversion is not supported.");
    return Collections.emptyList();
  }

  @Override
  public Transform[] toGravitinoPartitions(List<String> partitionsKey) {
    boolean noPartitionKeys = partitionsKey == null || partitionsKey.isEmpty();
    Preconditions.checkArgument(noPartitionKeys, "Partition key conversion is not supported.");
    return new Transform[0];
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
catalog()
.asTableCatalog()
.loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
return toFlinkTable(table);
return toFlinkTable(table, tablePath);
} catch (NoSuchTableException e) {
throw new TableNotExistException(catalogName(), tablePath, e);
} catch (Exception e) {
Expand Down Expand Up @@ -549,7 +549,7 @@ public void alterPartitionColumnStatistics(
throw new UnsupportedOperationException();
}

protected CatalogBaseTable toFlinkTable(Table table) {
protected CatalogBaseTable toFlinkTable(Table table, ObjectPath tablePath) {
org.apache.flink.table.api.Schema.Builder builder =
org.apache.flink.table.api.Schema.newBuilder();
for (Column column : table.columns()) {
Expand All @@ -561,7 +561,7 @@ protected CatalogBaseTable toFlinkTable(Table table) {
Optional<List<String>> flinkPrimaryKey = getFlinkPrimaryKey(table);
flinkPrimaryKey.ifPresent(builder::primaryKey);
Map<String, String> flinkTableProperties =
propertiesConverter.toFlinkTableProperties(table.properties());
propertiesConverter.toFlinkTableProperties(table.properties(), tablePath);
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.gravitino.flink.connector.catalog;

import java.util.Map;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
Expand All @@ -45,7 +46,7 @@ public interface BaseCatalogFactory extends CatalogFactory {
*
* @return The requested property converter.
*/
PropertiesConverter propertiesConverter();
PropertiesConverter propertiesConverter(Map<String, String> catalogOptions);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add Javadoc and explain what this parameter is and why it was added.


/**
* Define partition converter.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions.IDENTIFIER;

import com.google.common.collect.ImmutableSet;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
Expand Down Expand Up @@ -63,7 +64,7 @@ public Catalog createCatalog(Context context) {
return new GravitinoHiveCatalog(
context.getName(),
helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
propertiesConverter(),
propertiesConverter(context.getOptions()),
partitionConverter(),
hiveConf,
helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
Expand Down Expand Up @@ -113,7 +114,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
* @return The requested property converter.
*/
@Override
public PropertiesConverter propertiesConverter() {
public PropertiesConverter propertiesConverter(Map<String, String> catalogOptions) {
return HivePropertiesConverter.INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.collect.ImmutableMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.gravitino.catalog.hive.HiveConstants;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.hadoop.hive.conf.HiveConf;
Expand All @@ -47,7 +48,8 @@ public String transformPropertyToFlinkCatalog(String configKey) {
}

@Override
public Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
public Map<String, String> toFlinkTableProperties(
Map<String, String> gravitinoProperties, ObjectPath tablePath) {
Map<String, String> properties =
gravitinoProperties.entrySet().stream()
.collect(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.gravitino.flink.connector.iceberg;

import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
Expand All @@ -38,7 +39,7 @@ public Catalog createCatalog(Context context) {
return new GravitinoIcebergCatalog(
context.getName(),
helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE),
propertiesConverter(),
propertiesConverter(context.getOptions()),
partitionConverter(),
context.getOptions());
}
Expand Down Expand Up @@ -84,7 +85,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
* @return
*/
@Override
public PropertiesConverter propertiesConverter() {
public PropertiesConverter propertiesConverter(Map<String, String> catalogOptions) {
return IcebergPropertiesConverter.INSTANCE;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.jdbc;

import java.util.Optional;
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory;
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.Factory;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalog;

/**
 * A Gravitino {@link BaseCatalog} implementation that proxies a Flink {@link JdbcCatalog}.
 *
 * <p>The underlying JDBC catalog is built from the same factory {@code Context}, so all
 * JDBC-specific options (URL, credentials, ...) flow straight through to Flink.
 */
public class GravitinoJdbcCatalog extends BaseCatalog {

  // The wrapped Flink JDBC catalog that performs the actual metadata operations.
  private final JdbcCatalog jdbcCatalog;

  protected GravitinoJdbcCatalog(
      CatalogFactory.Context context,
      String defaultDatabase,
      PropertiesConverter propertiesConverter,
      PartitionConverter partitionConverter) {
    super(context.getName(), defaultDatabase, propertiesConverter, partitionConverter);
    // Delegate construction to Flink's own factory so option parsing/validation stays in Flink.
    this.jdbcCatalog = (JdbcCatalog) new JdbcCatalogFactory().createCatalog(context);
  }

  @Override
  protected AbstractCatalog realCatalog() {
    return jdbcCatalog;
  }

  @Override
  public Optional<Factory> getFactory() {
    return Optional.of(new JdbcDynamicTableFactory());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.jdbc;

import java.util.Collections;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.util.Preconditions;
import org.apache.gravitino.Catalog;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.UnsupportPartitionConverter;
import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory;
import org.apache.gravitino.flink.connector.utils.FactoryUtils;

/**
* Factory for creating instances of {@link GravitinoJdbcCatalog}. It will be created by SPI
* discovery in Flink.
*/
public abstract class GravitinoJdbcCatalogFactory implements BaseCatalogFactory {

@Override
public org.apache.flink.table.catalog.Catalog createCatalog(Context context) {
  // Validate the supplied options against this factory's declared option set.
  FactoryUtil.CatalogFactoryHelper optionsHelper =
      FactoryUtils.createCatalogFactoryHelper(this, context);
  // The default database is mandatory for a JDBC-backed catalog.
  String defaultDatabase =
      optionsHelper.getOptions().get(GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE);
  Preconditions.checkNotNull(
      defaultDatabase,
      GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE.key() + " should not be null.");
  return new GravitinoJdbcCatalog(
      context, defaultDatabase, propertiesConverter(context.getOptions()), partitionConverter());
}

/**
 * All Gravitino JDBC catalogs manage relational (table) metadata.
 *
 * @return {@code Catalog.Type.RELATIONAL}, always.
 */
@Override
public Catalog.Type gravitinoCatalogType() {
return Catalog.Type.RELATIONAL;
}

/**
 * JDBC catalogs created by this factory do not support Flink partition keys, so return the
 * converter that rejects any non-empty partition specification.
 *
 * @return the shared {@code UnsupportPartitionConverter} singleton.
 */
@Override
public PartitionConverter partitionConverter() {
return UnsupportPartitionConverter.INSTANCE;
}

@Override
public Set<ConfigOption<?>> requiredOptions() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we place the following properties in requiredOptions?

  public static final String FLINK_JDBC_URL = "base-url";
  public static final String FLINK_JDBC_USER = "username";
  public static final String FLINK_JDBC_PASSWORD = "password";
  public static final String FLINK_JDBC_DEFAULT_DATABASE = "default-database";

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are already in the flink JdbcCatalogFactory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So should GravitinoJdbcCatalogFactory reuse the requiredOptions from JdbcCatalogFactory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would cause these options to be validated twice, both in JdbcCatalogFactory and GravitinoJdbcCatalogFactory. Additionally, since these options are intended for use by JdbcCatalogFactory only, and are not needed within GravitinoJdbcCatalogFactory, I think it's unnecessary to add them to the requiredOptions method in GravitinoJdbcCatalogFactory.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you provide the code path where it's validated in JdbcCatalogFactory?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory#requiredOptions()
image

return Collections.emptySet();
}

/**
 * No optional options are declared here; per the PR discussion, JDBC connection options
 * (URL, credentials, default database) are declared and validated by the wrapped Flink
 * JdbcCatalogFactory, so re-declaring them would validate them twice.
 *
 * @return an empty set.
 */
@Override
public Set<ConfigOption<?>> optionalOptions() {
return Collections.emptySet();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.gravitino.flink.connector.jdbc;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

/** Constants and config options shared by the Gravitino Flink JDBC catalog factories. */
public class GravitinoJdbcCatalogFactoryOptions {

  /** Factory identifier for the MySQL-backed {@link GravitinoJdbcCatalog}. */
  public static final String MYSQL_IDENTIFIER = "gravitino-jdbc-mysql";

  /** Factory identifier for the PostgreSQL-backed {@link GravitinoJdbcCatalog}. */
  public static final String POSTGRESQL_IDENTIFIER = "gravitino-jdbc-postgresql";

  /** Database used when none is specified; has no default, so callers must supply it. */
  public static final ConfigOption<String> DEFAULT_DATABASE =
      ConfigOptions.key("default-database").stringType().noDefaultValue();

  // Constants holder: prevent instantiation.
  private GravitinoJdbcCatalogFactoryOptions() {}
}
Loading