Skip to content

Commit deac3d4

Browse files
committed
Optimize the logic of jdbcCatalog's getTable method.
1 parent 9bb4078 commit deac3d4

12 files changed

+51
-45
lines changed

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/PropertiesConverter.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.Map;
2424
import org.apache.flink.configuration.Configuration;
2525
import org.apache.flink.table.catalog.CommonCatalogOptions;
26+
import org.apache.flink.table.catalog.ObjectPath;
2627

2728
/**
2829
* PropertiesConverter is used to convert properties between Flink properties and Apache Gravitino
@@ -136,7 +137,8 @@ default Map<String, String> toFlinkDatabaseProperties(Map<String, String> gravit
136137
* @param gravitinoProperties The table properties provided by Gravitino.
137138
* @return The table properties for the Flink connector.
138139
*/
139-
default Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
140+
default Map<String, String> toFlinkTableProperties(
141+
Map<String, String> gravitinoProperties, ObjectPath tablePath) {
140142
return gravitinoProperties;
141143
}
142144

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,7 @@ public CatalogBaseTable getTable(ObjectPath tablePath)
211211
catalog()
212212
.asTableCatalog()
213213
.loadTable(NameIdentifier.of(tablePath.getDatabaseName(), tablePath.getObjectName()));
214-
return toFlinkTable(table);
214+
return toFlinkTable(table, tablePath);
215215
} catch (NoSuchTableException e) {
216216
throw new TableNotExistException(catalogName(), tablePath, e);
217217
} catch (Exception e) {
@@ -550,7 +550,7 @@ public void alterPartitionColumnStatistics(
550550
throw new UnsupportedOperationException();
551551
}
552552

553-
protected CatalogBaseTable toFlinkTable(Table table) {
553+
protected CatalogBaseTable toFlinkTable(Table table, ObjectPath tablePath) {
554554
org.apache.flink.table.api.Schema.Builder builder =
555555
org.apache.flink.table.api.Schema.newBuilder();
556556
for (Column column : table.columns()) {
@@ -562,7 +562,7 @@ protected CatalogBaseTable toFlinkTable(Table table) {
562562
Optional<List<String>> flinkPrimaryKey = getFlinkPrimaryKey(table);
563563
flinkPrimaryKey.ifPresent(builder::primaryKey);
564564
Map<String, String> flinkTableProperties =
565-
propertiesConverter.toFlinkTableProperties(table.properties());
565+
propertiesConverter.toFlinkTableProperties(table.properties(), tablePath);
566566
List<String> partitionKeys = partitionConverter.toFlinkPartitionKeys(table.partitioning());
567567
return CatalogTable.of(builder.build(), table.comment(), partitionKeys, flinkTableProperties);
568568
}

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ public interface BaseCatalogFactory extends CatalogFactory {
4545
*
4646
* @return The requested property converter.
4747
*/
48-
PropertiesConverter propertiesConverter();
48+
PropertiesConverter propertiesConverter(Context context);
4949

5050
/**
5151
* Define partition converter.

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ public Catalog createCatalog(Context context) {
6363
return new GravitinoHiveCatalog(
6464
context.getName(),
6565
helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
66-
propertiesConverter(),
66+
propertiesConverter(context),
6767
partitionConverter(),
6868
hiveConf,
6969
helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
@@ -113,7 +113,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
113113
* @return The requested property converter.
114114
*/
115115
@Override
116-
public PropertiesConverter propertiesConverter() {
116+
public PropertiesConverter propertiesConverter(Context context) {
117117
return HivePropertiesConverter.INSTANCE;
118118
}
119119

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public Catalog createCatalog(Context context) {
3838
return new GravitinoIcebergCatalog(
3939
context.getName(),
4040
helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE),
41-
propertiesConverter(),
41+
propertiesConverter(context),
4242
partitionConverter(),
4343
context.getOptions());
4444
}
@@ -84,7 +84,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
8484
* @return
8585
*/
8686
@Override
87-
public PropertiesConverter propertiesConverter() {
87+
public PropertiesConverter propertiesConverter(Context context) {
8888
return IcebergPropertiesConverter.INSTANCE;
8989
}
9090

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalog.java

-23
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,11 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22-
import java.util.Map;
2322
import java.util.Optional;
2423
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
2524
import org.apache.flink.connector.jdbc.catalog.factory.JdbcCatalogFactory;
2625
import org.apache.flink.connector.jdbc.table.JdbcDynamicTableFactory;
2726
import org.apache.flink.table.catalog.AbstractCatalog;
28-
import org.apache.flink.table.catalog.CatalogBaseTable;
29-
import org.apache.flink.table.catalog.ObjectPath;
30-
import org.apache.flink.table.catalog.exceptions.CatalogException;
31-
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
3227
import org.apache.flink.table.factories.CatalogFactory;
3328
import org.apache.flink.table.factories.Factory;
3429
import org.apache.gravitino.flink.connector.PartitionConverter;
@@ -65,22 +60,4 @@ protected AbstractCatalog realCatalog() {
6560
public Optional<Factory> getFactory() {
6661
return Optional.of(new JdbcDynamicTableFactory());
6762
}
68-
69-
@Override
70-
public CatalogBaseTable getTable(ObjectPath tablePath)
71-
throws TableNotExistException, CatalogException {
72-
CatalogBaseTable table = super.getTable(tablePath);
73-
Map<String, String> contextOptions = context.getOptions();
74-
Map<String, String> tableOptions = table.getOptions();
75-
tableOptions.remove("engine");
76-
tableOptions.put(
77-
"url",
78-
contextOptions.get(JdbcPropertiesConstants.FLINK_JDBC_URL)
79-
+ "/"
80-
+ tablePath.getDatabaseName());
81-
tableOptions.put("table-name", tablePath.getObjectName());
82-
tableOptions.put("username", contextOptions.get(JdbcPropertiesConstants.FLINK_JDBC_USER));
83-
tableOptions.put("password", contextOptions.get(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD));
84-
return table;
85-
}
8663
}

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoMysqlJdbcCatalogFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ public String gravitinoCatalogProvider() {
2929
}
3030

3131
@Override
32-
public PropertiesConverter propertiesConverter() {
33-
return MysqlPropertiesConverter.INSTANCE;
32+
public PropertiesConverter propertiesConverter(Context context) {
33+
return new MysqlPropertiesConverter(context);
3434
}
3535

3636
@Override

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoPostgresJdbcCatalogFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,8 @@ public String gravitinoCatalogProvider() {
2929
}
3030

3131
@Override
32-
public PropertiesConverter propertiesConverter() {
33-
return PostgresqlPropertiesConverter.INSTANCE;
32+
public PropertiesConverter propertiesConverter(Context context) {
33+
return new PostgresqlPropertiesConverter(context);
3434
}
3535

3636
@Override

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,20 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22+
import java.util.HashMap;
2223
import java.util.Map;
2324
import org.apache.flink.configuration.Configuration;
25+
import org.apache.flink.table.catalog.ObjectPath;
26+
import org.apache.flink.table.factories.CatalogFactory;
2427
import org.apache.gravitino.flink.connector.PropertiesConverter;
2528

2629
public abstract class JdbcPropertiesConverter implements PropertiesConverter {
2730

28-
protected JdbcPropertiesConverter() {}
31+
private final CatalogFactory.Context context;
32+
33+
protected JdbcPropertiesConverter(CatalogFactory.Context context) {
34+
this.context = context;
35+
}
2936

3037
@Override
3138
public Map<String, String> toGravitinoCatalogProperties(Configuration flinkConf) {
@@ -45,5 +52,21 @@ public String transformPropertyToFlinkCatalog(String configKey) {
4552
return JdbcPropertiesConstants.gravitinoToFlinkMap.get(configKey);
4653
}
4754

55+
@Override
56+
public Map<String, String> toFlinkTableProperties(
57+
Map<String, String> gravitinoProperties, ObjectPath tablePath) {
58+
Map<String, String> catalogOptions = context.getOptions();
59+
Map<String, String> tableOptions = new HashMap<>();
60+
tableOptions.put(
61+
"url",
62+
catalogOptions.get(JdbcPropertiesConstants.FLINK_JDBC_URL)
63+
+ "/"
64+
+ tablePath.getDatabaseName());
65+
tableOptions.put("table-name", tablePath.getObjectName());
66+
tableOptions.put("username", catalogOptions.get(JdbcPropertiesConstants.FLINK_JDBC_USER));
67+
tableOptions.put("password", catalogOptions.get(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD));
68+
return tableOptions;
69+
}
70+
4871
protected abstract String driverName();
4972
}

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/MysqlPropertiesConverter.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22-
public class MysqlPropertiesConverter extends JdbcPropertiesConverter {
22+
import org.apache.flink.table.factories.CatalogFactory;
2323

24-
public static final MysqlPropertiesConverter INSTANCE = new MysqlPropertiesConverter();
24+
public class MysqlPropertiesConverter extends JdbcPropertiesConverter {
2525

26-
private MysqlPropertiesConverter() {}
26+
protected MysqlPropertiesConverter(CatalogFactory.Context context) {
27+
super(context);
28+
}
2729

2830
@Override
2931
public String driverName() {

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/PostgresqlPropertiesConverter.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22-
public class PostgresqlPropertiesConverter extends JdbcPropertiesConverter {
22+
import org.apache.flink.table.factories.CatalogFactory;
2323

24-
public static final PostgresqlPropertiesConverter INSTANCE = new PostgresqlPropertiesConverter();
24+
public class PostgresqlPropertiesConverter extends JdbcPropertiesConverter {
2525

26-
private PostgresqlPropertiesConverter() {}
26+
protected PostgresqlPropertiesConverter(CatalogFactory.Context context) {
27+
super(context);
28+
}
2729

2830
@Override
2931
protected String driverName() {

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public Catalog createCatalog(Context context) {
4343
String defaultDatabase =
4444
helper.getOptions().get(GravitinoPaimonCatalogFactoryOptions.DEFAULT_DATABASE);
4545
return new GravitinoPaimonCatalog(
46-
context, defaultDatabase, propertiesConverter(), partitionConverter());
46+
context, defaultDatabase, propertiesConverter(context), partitionConverter());
4747
}
4848

4949
@Override
@@ -72,7 +72,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
7272
}
7373

7474
@Override
75-
public PropertiesConverter propertiesConverter() {
75+
public PropertiesConverter propertiesConverter(Context context) {
7676
return PaimonPropertiesConverter.INSTANCE;
7777
}
7878

0 commit comments

Comments
 (0)