diff --git a/flink-connector/flink/build.gradle.kts b/flink-connector/flink/build.gradle.kts index 6cbfbfa53b4..34a1cfa5b0b 100644 --- a/flink-connector/flink/build.gradle.kts +++ b/flink-connector/flink/build.gradle.kts @@ -56,6 +56,7 @@ dependencies { compileOnly("org.apache.flink:flink-table-common:$flinkVersion") compileOnly("org.apache.flink:flink-table-api-java:$flinkVersion") compileOnly("org.apache.paimon:paimon-flink-1.18:$paimonVersion") + compileOnly(libs.flinkjdbc) compileOnly(libs.hive2.exec) { artifact { @@ -97,6 +98,7 @@ dependencies { testImplementation(libs.testcontainers.junit.jupiter) testImplementation(libs.testcontainers.mysql) testImplementation(libs.metrics.core) + testImplementation(libs.flinkjdbc) testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion") testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion") diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java index 15d1a12fa3b..5e135f4f448 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.flink.configuration.Configuration; import org.apache.flink.table.catalog.CommonCatalogOptions; +import org.apache.flink.table.catalog.ObjectPath; /** * PropertiesConverter is used to convert properties between Flink properties and Apache Gravitino @@ -136,7 +137,8 @@ default Map toFlinkDatabaseProperties(Map gravit * @param gravitinoProperties The table properties provided by Gravitino. * @return The table properties for the Flink connector. */ - default Map toFlinkTableProperties(Map gravitinoProperties) { + default Map toFlinkTableProperties( + Map gravitinoProperties, ObjectPath tablePath) { return gravitinoProperties; } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/UnsupportPartitionConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/UnsupportPartitionConverter.java new file mode 100644 index 00000000000..753404a2f3a --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/UnsupportPartitionConverter.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector; + +import com.google.common.base.Preconditions; +import java.util.Collections; +import java.util.List; +import org.apache.gravitino.rel.expressions.transforms.Transform; + +/** Suitable for Catalog types that do not support partition keys. */ +public class UnsupportPartitionConverter implements PartitionConverter { + + private UnsupportPartitionConverter() {} + + public static final UnsupportPartitionConverter INSTANCE = new UnsupportPartitionConverter(); + + @Override + public List toFlinkPartitionKeys(Transform[] partitions) { + Preconditions.checkArgument( + partitions == null || partitions.length == 0, "Partition key conversion is not supported."); + return Collections.emptyList(); + } + + @Override + public Transform[] toGravitinoPartitions(List partitionsKey) { + Preconditions.checkArgument( + partitionsKey == null || partitionsKey.isEmpty(), + "Partition key conversion is not supported."); + return new Transform[0]; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java index 9a741eda626..01c05d67fe6 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java @@ -211,7 +211,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath) catalog() .asTableCatalog() .loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName())); - return toFlinkTable(table); + return toFlinkTable(table, tablePath); } catch (NoSuchTableException e) { throw new TableNotExistException(catalogName(), tablePath, e); } catch (Exception e) { @@ -549,7 +549,7 @@ public void alterPartitionColumnStatistics( throw new UnsupportedOperationException(); } - protected CatalogBaseTable toFlinkTable(Table table) { + protected CatalogBaseTable toFlinkTable(Table table, ObjectPath tablePath) { org.apache.flink.table.api.Schema.Builder builder = org.apache.flink.table.api.Schema.newBuilder(); for (Column column : table.columns()) { @@ -561,7 +561,7 @@ protected CatalogBaseTable toFlinkTable(Table table) { Optional> flinkPrimaryKey = getFlinkPrimaryKey(table); flinkPrimaryKey.ifPresent(builder::primaryKey); Map flinkTableProperties = - propertiesConverter.toFlinkTableProperties(table.properties()); + propertiesConverter.toFlinkTableProperties(table.properties(), tablePath); List partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning()); return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties); } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java index 5086b532571..db5150811b8 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java @@ -19,6 +19,7 @@ package org.apache.gravitino.flink.connector.catalog; +import java.util.Map; import org.apache.flink.table.factories.CatalogFactory; import org.apache.gravitino.Catalog; import org.apache.gravitino.flink.connector.PartitionConverter; @@ -45,7 +46,7 @@ public interface BaseCatalogFactory extends CatalogFactory { * * @return The requested property converter. */ - PropertiesConverter propertiesConverter(); + PropertiesConverter propertiesConverter(Map catalogOptions); /** * Define partition converter. diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java index 23607ebb402..3accb1789d2 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java @@ -22,6 +22,7 @@ import static org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions.IDENTIFIER; import com.google.common.collect.ImmutableSet; +import java.util.Map; import java.util.Set; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.Catalog; @@ -63,7 +64,7 @@ public Catalog createCatalog(Context context) { return new GravitinoHiveCatalog( context.getName(), helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE), - propertiesConverter(), + propertiesConverter(context.getOptions()), partitionConverter(), hiveConf, helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION)); @@ -113,7 +114,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { * @return The requested property converter. */ @Override - public PropertiesConverter propertiesConverter() { + public PropertiesConverter propertiesConverter(Map catalogOptions) { return HivePropertiesConverter.INSTANCE; } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java index 20a3e8cf62d..70f89025420 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java @@ -22,6 +22,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Map; import java.util.stream.Collectors; +import org.apache.flink.table.catalog.ObjectPath; import org.apache.gravitino.catalog.hive.HiveConstants; import org.apache.gravitino.flink.connector.PropertiesConverter; import org.apache.hadoop.hive.conf.HiveConf; @@ -47,7 +48,8 @@ public String transformPropertyToFlinkCatalog(String configKey) { } @Override - public Map toFlinkTableProperties(Map gravitinoProperties) { + public Map toFlinkTableProperties( + Map gravitinoProperties, ObjectPath tablePath) { Map properties = gravitinoProperties.entrySet().stream() .collect( diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java index ad0363d9867..8f024e4c33e 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java @@ -19,6 +19,7 @@ package org.apache.gravitino.flink.connector.iceberg; import java.util.Collections; +import java.util.Map; import java.util.Set; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.Catalog; @@ -38,7 +39,7 @@ public Catalog createCatalog(Context context) { return new GravitinoIcebergCatalog( context.getName(), helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE), - propertiesConverter(), + propertiesConverter(context.getOptions()), partitionConverter(), context.getOptions()); } @@ -84,7 +85,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { * @return */ @Override - public PropertiesConverter propertiesConverter() { + public PropertiesConverter propertiesConverter(Map catalogOptions) { return IcebergPropertiesConverter.INSTANCE; } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java new file mode 100644 index 00000000000..39e3c4235d0 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.Optional; +import org.apache.flink.connector.jdbc.catalog.JdbcCatalog; +import org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory; +import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory; +import org.apache.flink.table.catalog.AbstractCatalog; +import org.apache.flink.table.factories.CatalogFactory; +import org.apache.flink.table.factories.Factory; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.PropertiesConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalog; + +/** + * The GravitinoJdbcCatalog class is an implementation of the BaseCatalog class that is used to + * proxy the JdbcCatalog class. + */ +public class GravitinoJdbcCatalog extends BaseCatalog { + + private final JdbcCatalog jdbcCatalog; + + protected GravitinoJdbcCatalog( + CatalogFactory.Context context, + String defaultDatabase, + PropertiesConverter propertiesConverter, + PartitionConverter partitionConverter) { + super(context.getName(), defaultDatabase, propertiesConverter, partitionConverter); + JdbcCatalogFactory jdbcCatalogFactory = new JdbcCatalogFactory(); + this.jdbcCatalog = (JdbcCatalog) jdbcCatalogFactory.createCatalog(context); + } + + @Override + protected AbstractCatalog realCatalog() { + return jdbcCatalog; + } + + @Override + public Optional getFactory() { + return Optional.of(new JdbcDynamicTableFactory()); + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java new file mode 100644 index 00000000000..4239c900cf8 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.Collections; +import java.util.Set; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.table.factories.FactoryUtil; +import org.apache.flink.util.Preconditions; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.flink.connector.PartitionConverter; +import org.apache.gravitino.flink.connector.UnsupportPartitionConverter; +import org.apache.gravitino.flink.connector.catalog.BaseCatalogFactory; +import org.apache.gravitino.flink.connector.utils.FactoryUtils; + +/** + * Factory for creating instances of {@link GravitinoJdbcCatalog}. It will be created by SPI + * discovery in Flink. + */ +public abstract class GravitinoJdbcCatalogFactory implements BaseCatalogFactory { + + @Override + public org.apache.flink.table.catalog.Catalog createCatalog(Context context) { + final FactoryUtil.CatalogFactoryHelper helper = + FactoryUtils.createCatalogFactoryHelper(this, context); + String defaultDatabase = + helper.getOptions().get(GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE); + Preconditions.checkNotNull( + defaultDatabase, + GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE.key() + " should not be null."); + return new GravitinoJdbcCatalog( + context, defaultDatabase, propertiesConverter(context.getOptions()), partitionConverter()); + } + + @Override + public Catalog.Type gravitinoCatalogType() { + return Catalog.Type.RELATIONAL; + } + + @Override + public PartitionConverter partitionConverter() { + return UnsupportPartitionConverter.INSTANCE; + } + + @Override + public Set> requiredOptions() { + return Collections.emptySet(); + } + + @Override + public Set> optionalOptions() { + return Collections.emptySet(); + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java new file mode 100644 index 00000000000..b85775bf9db --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactoryOptions.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; + +public class GravitinoJdbcCatalogFactoryOptions { + + /** Identifier for the {@link GravitinoJdbcCatalog}. */ + public static final String MYSQL_IDENTIFIER = "gravitino-jdbc-mysql"; + + /** Identifier for the {@link GravitinoJdbcCatalog}. */ + public static final String POSTGRESQL_IDENTIFIER = "gravitino-jdbc-postgresql"; + + public static final ConfigOption DEFAULT_DATABASE = + ConfigOptions.key("default-database").stringType().noDefaultValue(); +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoMysqlJdbcCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoMysqlJdbcCatalogFactory.java new file mode 100644 index 00000000000..dfaba03ade5 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoMysqlJdbcCatalogFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.Map; +import org.apache.gravitino.flink.connector.PropertiesConverter; + +public class GravitinoMysqlJdbcCatalogFactory extends GravitinoJdbcCatalogFactory { + + @Override + public String gravitinoCatalogProvider() { + return "jdbc-mysql"; + } + + @Override + public PropertiesConverter propertiesConverter(Map catalogOptions) { + return new MysqlPropertiesConverter(catalogOptions); + } + + @Override + public String factoryIdentifier() { + return GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoPostgresJdbcCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoPostgresJdbcCatalogFactory.java new file mode 100644 index 00000000000..f85abba0d86 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoPostgresJdbcCatalogFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.Map; +import org.apache.gravitino.flink.connector.PropertiesConverter; + +public class GravitinoPostgresJdbcCatalogFactory extends GravitinoJdbcCatalogFactory { + + @Override + public String gravitinoCatalogProvider() { + return "jdbc-postgresql"; + } + + @Override + public PropertiesConverter propertiesConverter(Map catalogOptions) { + return new PostgresqlPropertiesConverter(catalogOptions); + } + + @Override + public String factoryIdentifier() { + return GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java new file mode 100644 index 00000000000..2b685fccd5f --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConstants.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.HashMap; +import java.util.Map; + +public class JdbcPropertiesConstants { + + private JdbcPropertiesConstants() {} + + public static final String GRAVITINO_JDBC_USER = "jdbc-user"; + public static final String GRAVITINO_JDBC_PASSWORD = "jdbc-password"; + public static final String GRAVITINO_JDBC_URL = "jdbc-url"; + public static final String GRAVITINO_JDBC_DRIVER = "jdbc-driver"; + public static final String GRAVITINO_JDBC_DEFAULT_DATABASE = "flink.bypass.default-database"; + + public static final String FLINK_JDBC_URL = "base-url"; + public static final String FLINK_JDBC_USER = "username"; + public static final String FLINK_JDBC_PASSWORD = "password"; + public static final String FLINK_JDBC_DEFAULT_DATABASE = "default-database"; + + public static Map flinkToGravitinoMap = new HashMap<>(); + public static Map gravitinoToFlinkMap = new HashMap<>(); + + static { + flinkToGravitinoMap.put(FLINK_JDBC_URL, GRAVITINO_JDBC_URL); + flinkToGravitinoMap.put(FLINK_JDBC_USER, GRAVITINO_JDBC_USER); + flinkToGravitinoMap.put(FLINK_JDBC_PASSWORD, GRAVITINO_JDBC_PASSWORD); + flinkToGravitinoMap.put(FLINK_JDBC_DEFAULT_DATABASE, GRAVITINO_JDBC_DEFAULT_DATABASE); + + gravitinoToFlinkMap.put(GRAVITINO_JDBC_URL, FLINK_JDBC_URL); + gravitinoToFlinkMap.put(GRAVITINO_JDBC_USER, FLINK_JDBC_USER); + gravitinoToFlinkMap.put(GRAVITINO_JDBC_PASSWORD, FLINK_JDBC_PASSWORD); + gravitinoToFlinkMap.put(GRAVITINO_JDBC_DEFAULT_DATABASE, FLINK_JDBC_DEFAULT_DATABASE); + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java new file mode 100644 index 00000000000..060a84083f7 --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.HashMap; +import java.util.Map; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.table.catalog.ObjectPath; +import org.apache.gravitino.flink.connector.PropertiesConverter; + +public abstract class JdbcPropertiesConverter implements PropertiesConverter { + + private final Map catalogOptions; + + protected JdbcPropertiesConverter(Map catalogOptions) { + this.catalogOptions = catalogOptions; + } + + @Override + public Map toGravitinoCatalogProperties(Configuration flinkConf) { + Map gravitinoCatalogProperties = + PropertiesConverter.super.toGravitinoCatalogProperties(flinkConf); + gravitinoCatalogProperties.put(JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER, driverName()); + return gravitinoCatalogProperties; + } + + @Override + public String transformPropertyToGravitinoCatalog(String configKey) { + return JdbcPropertiesConstants.flinkToGravitinoMap.get(configKey); + } + + @Override + public String transformPropertyToFlinkCatalog(String configKey) { + return JdbcPropertiesConstants.gravitinoToFlinkMap.get(configKey); + } + + @Override + public Map toFlinkTableProperties( + Map gravitinoProperties, ObjectPath tablePath) { + Map tableOptions = new HashMap<>(); + tableOptions.put( + "url", + catalogOptions.get(JdbcPropertiesConstants.FLINK_JDBC_URL) + + "/" + + tablePath.getDatabaseName()); + tableOptions.put("table-name", tablePath.getObjectName()); + tableOptions.put("username", catalogOptions.get(JdbcPropertiesConstants.FLINK_JDBC_USER)); + tableOptions.put("password", catalogOptions.get(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD)); + return tableOptions; + } + + protected abstract String driverName(); + + public Map getCatalogOptions() { + return catalogOptions; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/MysqlPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/MysqlPropertiesConverter.java new file mode 100644 index 00000000000..dee5d98f63c --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/MysqlPropertiesConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.Map; + +public class MysqlPropertiesConverter extends JdbcPropertiesConverter { + + protected MysqlPropertiesConverter(Map catalogOptions) { + super(catalogOptions); + } + + @Override + public String driverName() { + return "com.mysql.jdbc.Driver"; + } + + @Override + public String getFlinkCatalogType() { + return GravitinoJdbcCatalogFactoryOptions.MYSQL_IDENTIFIER; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/PostgresqlPropertiesConverter.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/PostgresqlPropertiesConverter.java new file mode 100644 index 00000000000..00d6d8d81da --- /dev/null +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/PostgresqlPropertiesConverter.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.Map; + +public class PostgresqlPropertiesConverter extends JdbcPropertiesConverter { + + protected PostgresqlPropertiesConverter(Map catalogOptions) { + super(catalogOptions); + } + + @Override + protected String driverName() { + return "org.postgresql.Driver"; + } + + @Override + public String getFlinkCatalogType() { + return GravitinoJdbcCatalogFactoryOptions.POSTGRESQL_IDENTIFIER; + } +} diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java index 8732ade23ed..659670312c1 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java @@ -20,6 +20,7 @@ package org.apache.gravitino.flink.connector.paimon; import java.util.Collections; +import java.util.Map; import java.util.Set; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.table.catalog.Catalog; @@ -43,7 +44,7 @@ public Catalog createCatalog(Context context) { String defaultDatabase = helper.getOptions().get(GravitinoPaimonCatalogFactoryOptions.DEFAULT_DATABASE); return new GravitinoPaimonCatalog( - context, defaultDatabase, propertiesConverter(), partitionConverter()); + context, defaultDatabase, propertiesConverter(context.getOptions()), partitionConverter()); } @Override @@ -72,7 +73,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() { } @Override - public PropertiesConverter propertiesConverter() { + public PropertiesConverter propertiesConverter(Map catalogOptions) { return PaimonPropertiesConverter.INSTANCE; } diff --git a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java index 4c29b7fde3b..986b38be4fd 100644 --- a/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java +++ b/flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/store/GravitinoCatalogStore.java @@ -57,7 +57,7 @@ public void storeCatalog(String catalogName, CatalogDescriptor descriptor) Map gravitino = configuration.toMap(); BaseCatalogFactory catalogFactory = getCatalogFactory(gravitino); Map gravitinoProperties = - catalogFactory.propertiesConverter().toGravitinoCatalogProperties(configuration); + catalogFactory.propertiesConverter(gravitino).toGravitinoCatalogProperties(configuration); gravitinoCatalogManager.createCatalog( catalogName, catalogFactory.gravitinoCatalogType(), @@ -80,7 +80,8 @@ public Optional getCatalog(String catalogName) throws Catalog try { Catalog catalog = gravitinoCatalogManager.getGravitinoCatalogInfo(catalogName); BaseCatalogFactory catalogFactory = getCatalogFactory(catalog.provider()); - PropertiesConverter propertiesConverter = catalogFactory.propertiesConverter(); + PropertiesConverter propertiesConverter = + catalogFactory.propertiesConverter(catalog.properties()); Map flinkCatalogProperties = propertiesConverter.toFlinkCatalogProperties(catalog.properties()); CatalogDescriptor descriptor = diff --git a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory index 45ff2512e73..4f373e167d1 100644 --- a/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory +++ b/flink-connector/flink/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory @@ -21,3 +21,5 @@ org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactory org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactory org.apache.gravitino.flink.connector.paimon.GravitinoPaimonCatalogFactory org.apache.gravitino.flink.connector.iceberg.GravitinoIcebergCatalogFactory +org.apache.gravitino.flink.connector.jdbc.GravitinoMysqlJdbcCatalogFactory +org.apache.gravitino.flink.connector.jdbc.GravitinoPostgresJdbcCatalogFactory diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java index 8ff6f8db7a2..2a687b5c7fd 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java @@ -19,7 +19,6 @@ package org.apache.gravitino.flink.connector.integration.test; -import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns; import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn; import static org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM; import static org.junit.jupiter.api.Assertions.fail; @@ -51,6 +50,7 @@ import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils; import org.apache.gravitino.rel.Column; import org.apache.gravitino.rel.Table; +import org.apache.gravitino.rel.expressions.literals.Literals; import org.apache.gravitino.rel.indexes.Index; import org.apache.gravitino.rel.types.Types; import org.junit.jupiter.api.Assertions; @@ -85,6 +85,22 @@ protected boolean supportsPrimaryKey() { return true; } + protected boolean supportTablePropertiesOperation() { + return true; + } + + protected boolean supportModifyColumnsMultipleByOneSql() { + return true; + } + + protected boolean defaultValueWithNullLiterals() { + return false; + } + + protected String defaultDatabaseName() { + return "default"; + } + @Test public void testCreateSchema() { doWithCatalog( @@ -177,7 +193,7 @@ public void testListSchema() { TestUtils.assertTableResult( sql("SHOW DATABASES"), ResultKind.SUCCESS_WITH_CONTENT, - Row.of("default"), + Row.of(defaultDatabaseName()), Row.of(schema), Row.of(schema2), Row.of(schema3)); @@ -185,7 +201,7 @@ public void testListSchema() { String[] schemas = catalog.asSchemas().listSchemas(); Arrays.sort(schemas); Assertions.assertEquals(4, schemas.length); - Assertions.assertEquals("default", schemas[0]); + Assertions.assertEquals(defaultDatabaseName(), schemas[0]); Assertions.assertEquals(schema, schemas[1]); Assertions.assertEquals(schema2, schemas[2]); Assertions.assertEquals(schema3, schemas[3]); @@ -262,7 +278,9 @@ public void testCreateSimpleTable() { catalog.asTableCatalog().loadTable(NameIdentifier.of(databaseName, tableName)); Assertions.assertNotNull(table); Assertions.assertEquals(comment, table.comment()); - Assertions.assertEquals(value, table.properties().get(key)); + if (supportTablePropertiesOperation()) { + Assertions.assertEquals(value, table.properties().get(key)); + } Column[] columns = new Column[] { Column.of("string_type", Types.StringType.get(), "string_type", true, false, null), @@ -649,8 +667,8 @@ public void testAlterTableDropColumn() { } @Test - @EnabledIf("supportColumnOperation") - public void testAlterColumnTypeAndChangeOrder() { + @EnabledIf("supportModifyColumnsMultipleByOneSql") + public void testAlterColumnTypeAndChangeOrderInOneSql() { String databaseName = "test_alter_table_alter_column_db"; String tableName = "test_alter_table_rename_column"; doWithSchema( @@ -691,6 +709,50 @@ public void testAlterColumnTypeAndChangeOrder() { supportDropCascade()); } + @Test + @EnabledIf("supportColumnOperation") + public void testAlterColumnTypeAndChangeOrder() { + String databaseName = "test_alter_table_alter_column_db"; + String tableName = "test_alter_table_rename_column"; + doWithSchema( + currentCatalog(), + databaseName, + catalog -> { + TableResult result = + sql( + "CREATE TABLE %s " + + "(user_id BIGINT COMMENT 'USER_ID', " + + " order_amount INT COMMENT 'ORDER_AMOUNT')" + + " COMMENT 'test comment'" + + " WITH (" + + "'%s' = '%s')", + tableName, "test key", "test value"); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + result = sql("ALTER TABLE %s MODIFY order_amount INT COMMENT 'new comment2'", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + result = + sql("ALTER TABLE %s MODIFY order_amount BIGINT COMMENT 'new comment2'", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + result = sql("ALTER TABLE %s MODIFY user_id BIGINT COMMENT 'new comment'", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + result = sql("ALTER TABLE %s MODIFY user_id BIGINT AFTER order_amount", tableName); + TestUtils.assertTableResult(result, ResultKind.SUCCESS); + Column[] actual = + catalog + .asTableCatalog() + .loadTable(NameIdentifier.of(databaseName, tableName)) + .columns(); + Column[] expected = + new Column[] { + Column.of("order_amount", Types.LongType.get(), "new comment2"), + Column.of("user_id", Types.LongType.get(), "new comment") + }; + assertColumns(expected, actual); + }, + true, + supportDropCascade()); + } + @Test @EnabledIf("supportTableOperation") public void testRenameTable() { @@ -721,7 +783,7 @@ public void testRenameTable() { } @Test - @EnabledIf("supportTableOperation") + @EnabledIf("supportTablePropertiesOperation") public void testAlterTableProperties() { String databaseName = "test_alter_table_properties_db"; String tableName = "test_alter_table_properties"; @@ -763,4 +825,23 @@ public void testAlterTableProperties() { true, supportDropCascade()); } + + public void assertColumns(Column[] expected, Column[] actual) { + Assertions.assertEquals(expected.length, actual.length); + for (int i = 0; i < expected.length; i++) { + Assertions.assertEquals(expected[i].name(), actual[i].name()); + Assertions.assertEquals(expected[i].comment(), actual[i].comment()); + Assertions.assertEquals( + expected[i].dataType().simpleString(), actual[i].dataType().simpleString()); + if (expected[i].defaultValue().equals(Column.DEFAULT_VALUE_NOT_SET) + && expected[i].nullable() + && defaultValueWithNullLiterals()) { + Assertions.assertEquals(Literals.NULL, actual[i].defaultValue()); + } else { + Assertions.assertEquals(expected[i].defaultValue(), actual[i].defaultValue()); + } + Assertions.assertEquals(expected[i].autoIncrement(), actual[i].autoIncrement()); + Assertions.assertEquals(expected[i].nullable(), actual[i].nullable()); + } + } } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java index 959123f3362..b813a80affb 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java @@ -66,23 +66,29 @@ public abstract class FlinkEnvIT extends BaseIT { private static String gravitinoUri = "http://127.0.0.1:8090"; @BeforeAll - void startUp() { + void startUp() throws Exception { // Start Gravitino server initGravitinoEnv(); initMetalake(); initHiveEnv(); initHdfsEnv(); initFlinkEnv(); + initCatalogEnv(); LOG.info("Startup Flink env successfully, Gravitino uri: {}.", gravitinoUri); } @AfterAll - static void stop() { + void stop() throws Exception { + stopCatalogEnv(); stopFlinkEnv(); stopHdfsEnv(); LOG.info("Stop Flink env successfully."); } + protected void initCatalogEnv() throws Exception {}; + + protected void stopCatalogEnv() throws Exception {} + protected String flinkByPass(String key) { return PropertiesConverter.FLINK_PROPERTY_PREFIX + key; } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java index 3add18211f1..1b9d321370e 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/hive/FlinkHiveCatalogIT.java @@ -18,7 +18,6 @@ */ package org.apache.gravitino.flink.connector.integration.test.hive; -import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns; import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn; import static org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java index f8a3cdf2e17..78537fa080c 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java @@ -19,7 +19,6 @@ package org.apache.gravitino.flink.connector.integration.test.iceberg; -import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns; import static org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn; import static org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM; diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/jdbc/FlinkJdbcMysqlCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/jdbc/FlinkJdbcMysqlCatalogIT.java new file mode 100644 index 00000000000..d75d585b50d --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/jdbc/FlinkJdbcMysqlCatalogIT.java @@ -0,0 +1,170 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.integration.test.jdbc; + +import static org.apache.gravitino.integration.test.util.TestDatabaseName.MYSQL_CATALOG_MYSQL_IT; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.gravitino.Catalog; +import org.apache.gravitino.flink.connector.integration.test.FlinkCommonIT; +import org.apache.gravitino.flink.connector.jdbc.JdbcPropertiesConstants; +import org.apache.gravitino.integration.test.container.ContainerSuite; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag("gravitino-docker-test") +public class FlinkJdbcMysqlCatalogIT extends FlinkCommonIT { + + protected String mysqlUrl; + protected String mysqlUsername; + protected String mysqlPassword; + protected String mysqlDriver; + protected String mysqlDefaultDatabase = MYSQL_CATALOG_MYSQL_IT.name(); + + protected Catalog catalog; + + protected static final String CATALOG_NAME = "test_flink_jdbc_catalog"; + + @Override + protected boolean supportTablePropertiesOperation() { + return false; + } + + @Override + protected String defaultDatabaseName() { + return MYSQL_CATALOG_MYSQL_IT.name(); + } + + @Override + protected boolean supportSchemaOperationWithCommentAndOptions() { + return false; + } + + @Override + protected boolean supportModifyColumnsMultipleByOneSql() { + return false; + } + + @Override + protected Catalog currentCatalog() { + return catalog; + } + + @Override + protected String getProvider() { + return "jdbc-mysql"; + } + + @BeforeAll + void jdbcStartup() { + init(); + } + + @AfterAll + void jdbcStop() { + Preconditions.checkNotNull(metalake); + metalake.dropCatalog(CATALOG_NAME, true); + } + + @Override + protected boolean supportDropCascade() { + return true; + } + + @Override + protected boolean defaultValueWithNullLiterals() { + return true; + } + + private void init() { + Preconditions.checkNotNull(metalake); + catalog = + metalake.createCatalog( + CATALOG_NAME, + org.apache.gravitino.Catalog.Type.RELATIONAL, + getProvider(), + null, + ImmutableMap.of( + JdbcPropertiesConstants.GRAVITINO_JDBC_USER, + mysqlUsername, + JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD, + mysqlPassword, + JdbcPropertiesConstants.GRAVITINO_JDBC_URL, + mysqlUrl, + JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER, + mysqlDriver, + JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE, + mysqlDefaultDatabase)); + } + + @Override + protected void initCatalogEnv() throws Exception { + ContainerSuite containerSuite = ContainerSuite.getInstance(); + containerSuite.startMySQLContainer(MYSQL_CATALOG_MYSQL_IT); + mysqlUrl = containerSuite.getMySQLContainer().getJdbcUrl(); + mysqlUsername = containerSuite.getMySQLContainer().getUsername(); + mysqlPassword = containerSuite.getMySQLContainer().getPassword(); + mysqlDriver = containerSuite.getMySQLContainer().getDriverClassName(MYSQL_CATALOG_MYSQL_IT); + } + + @Override + protected void stopCatalogEnv() throws Exception { + if (null != containerSuite) { + containerSuite.close(); + } + } + + @Test + public void testCreateGravitinoJdbcCatalogUsingSQL() { + tableEnv.useCatalog(DEFAULT_CATALOG); + int numCatalogs = tableEnv.listCatalogs().length; + String catalogName = "gravitino_mysql_jdbc_catalog"; + tableEnv.executeSql( + String.format( + "create catalog %s with (" + + "'type'='gravitino-jdbc-mysql', " + + "'base-url'='%s'," + + "'username'='%s'," + + "'password'='%s'," + + "'default-database'='%s'" + + ")", + catalogName, mysqlUrl, mysqlUsername, mysqlPassword, mysqlDefaultDatabase)); + String[] catalogs = tableEnv.listCatalogs(); + Assertions.assertEquals(numCatalogs + 1, catalogs.length, "Should create a new catalog"); + Assertions.assertTrue(metalake.catalogExists(catalogName)); + org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName); + Map properties = gravitinoCatalog.properties(); + Assertions.assertEquals(mysqlUrl, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_URL)); + Assertions.assertEquals( + mysqlUsername, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_USER)); + Assertions.assertEquals( + mysqlPassword, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_USER)); + Assertions.assertEquals( + mysqlDefaultDatabase, + properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE)); + Assertions.assertEquals( + "com.mysql.jdbc.Driver", properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER)); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java index 66458ba8e74..948e02dd818 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/paimon/FlinkPaimonCatalogIT.java @@ -67,7 +67,7 @@ void setup() { } @AfterAll - static void stop() { + void stop() { Preconditions.checkNotNull(metalake); metalake.dropCatalog(DEFAULT_PAIMON_CATALOG, true); } diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java index 02710bcfb35..66e59ce41d1 100644 --- a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/utils/TestUtils.java @@ -25,7 +25,6 @@ import org.apache.flink.table.api.TableResult; import org.apache.flink.table.types.DataType; import org.apache.flink.types.Row; -import org.apache.gravitino.rel.Column; import org.junit.jupiter.api.Assertions; public class TestUtils { @@ -48,19 +47,6 @@ public static void assertTableResult( } } - public static void assertColumns(Column[] expected, Column[] actual) { - Assertions.assertEquals(expected.length, actual.length); - for (int i = 0; i < expected.length; i++) { - Assertions.assertEquals(expected[i].name(), actual[i].name()); - Assertions.assertEquals(expected[i].comment(), actual[i].comment()); - Assertions.assertEquals( - expected[i].dataType().simpleString(), actual[i].dataType().simpleString()); - Assertions.assertEquals(expected[i].defaultValue(), actual[i].defaultValue()); - Assertions.assertEquals(expected[i].autoIncrement(), actual[i].autoIncrement()); - Assertions.assertEquals(expected[i].nullable(), actual[i].nullable()); - } - } - public static org.apache.flink.table.catalog.Column[] toFlinkPhysicalColumn( List unresolvedPhysicalColumns) { return unresolvedPhysicalColumns.stream() diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/AbstractJdbcPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/AbstractJdbcPropertiesConverter.java new file mode 100644 index 00000000000..75f9db7ae46 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/AbstractJdbcPropertiesConverter.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import com.google.common.collect.ImmutableMap; +import java.util.Map; +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +/** Test for {@link JdbcPropertiesConverter} */ +public abstract class AbstractJdbcPropertiesConverter { + + String username = "testUser"; + String password = "testPassword"; + String url = "testUrl"; + String defaultDatabase = "test"; + + Map catalogProperties = + ImmutableMap.of( + JdbcPropertiesConstants.GRAVITINO_JDBC_USER, + username, + JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD, + password, + JdbcPropertiesConstants.GRAVITINO_JDBC_URL, + url, + JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE, + defaultDatabase); + + protected abstract JdbcPropertiesConverter getConverter(Map catalogOptions); + + @Test + public void testToPaimonFileSystemCatalog() { + + Map properties = + getConverter(catalogProperties).toFlinkCatalogProperties(catalogProperties); + Assertions.assertEquals(username, properties.get(JdbcPropertiesConstants.FLINK_JDBC_USER)); + Assertions.assertEquals(password, properties.get(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD)); + Assertions.assertEquals(url, properties.get(JdbcPropertiesConstants.FLINK_JDBC_URL)); + Assertions.assertEquals( + defaultDatabase, properties.get(JdbcPropertiesConstants.FLINK_JDBC_DEFAULT_DATABASE)); + } + + @Test + public void testToGravitinoCatalogProperties() { + Configuration configuration = + Configuration.fromMap( + ImmutableMap.of( + JdbcPropertiesConstants.FLINK_JDBC_USER, + username, + JdbcPropertiesConstants.FLINK_JDBC_PASSWORD, + password, + JdbcPropertiesConstants.FLINK_JDBC_URL, + url, + JdbcPropertiesConstants.FLINK_JDBC_DEFAULT_DATABASE, + defaultDatabase)); + Map properties = + getConverter(catalogProperties).toGravitinoCatalogProperties(configuration); + + Assertions.assertEquals(username, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_USER)); + Assertions.assertEquals( + password, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD)); + Assertions.assertEquals(url, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_URL)); + Assertions.assertEquals( + defaultDatabase, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE)); + Assertions.assertEquals( + getConverter(catalogProperties).driverName(), + properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER)); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestMysqlPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestMysqlPropertiesConverter.java new file mode 100644 index 00000000000..33a4c171a15 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestMysqlPropertiesConverter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.Map; + +public class TestMysqlPropertiesConverter extends AbstractJdbcPropertiesConverter { + + @Override + protected JdbcPropertiesConverter getConverter(Map catalogOptions) { + return new MysqlPropertiesConverter(catalogOptions); + } +} diff --git a/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestPostgresqlPropertiesConverter.java b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestPostgresqlPropertiesConverter.java new file mode 100644 index 00000000000..4c4a73cd5b1 --- /dev/null +++ b/flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestPostgresqlPropertiesConverter.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.gravitino.flink.connector.jdbc; + +import java.util.Map; + +public class TestPostgresqlPropertiesConverter extends AbstractJdbcPropertiesConverter { + + @Override + protected JdbcPropertiesConverter getConverter(Map catalogOptions) { + return new PostgresqlPropertiesConverter(catalogOptions); + } +} diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 736691c4b52..8ba71d51a77 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -89,6 +89,7 @@ awaitility = "4.2.1" servlet = "3.1.0" jodd = "3.5.2" flink = "1.18.0" +flinkjdbc = "3.2.0-1.18" cglib = "2.2" ranger = "2.4.0" javax-jaxb-api = "2.3.1" @@ -277,6 +278,7 @@ google-auth-http = { group = "com.google.auth", name = "google-auth-library-oaut google-auth-credentials = { group = "com.google.auth", name = "google-auth-library-credentials", version.ref = "google-auth" } aliyun-credentials-sdk = { group='com.aliyun', name='credentials-java', version.ref='aliyun-credentials' } +flinkjdbc = {group='org.apache.flink',name='flink-connector-jdbc', version.ref='flinkjdbc'} [bundles] log4j = ["slf4j-api", "log4j-slf4j2-impl", "log4j-api", "log4j-core", "log4j-12-api", "log4j-layout-template-json"]