Skip to content

Commit 0e48086

Browse files
committed
Optimize the logic of jdbcCatalog's getTable method.
1 parent deac3d4 commit 0e48086

14 files changed

+61
-42
lines changed

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalogFactory.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.gravitino.flink.connector.catalog;
2121

22+
import java.util.Map;
2223
import org.apache.flink.table.factories.CatalogFactory;
2324
import org.apache.gravitino.Catalog;
2425
import org.apache.gravitino.flink.connector.PartitionConverter;
@@ -45,7 +46,7 @@ public interface BaseCatalogFactory extends CatalogFactory {
4546
*
4647
* @return The requested property converter.
4748
*/
48-
PropertiesConverter propertiesConverter(Context context);
49+
PropertiesConverter propertiesConverter(Map<String, String> catalogOptions);
4950

5051
/**
5152
* Define partition converter.

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/GravitinoHiveCatalogFactory.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static org.apache.gravitino.flink.connector.hive.GravitinoHiveCatalogFactoryOptions.IDENTIFIER;
2323

2424
import com.google.common.collect.ImmutableSet;
25+
import java.util.Map;
2526
import java.util.Set;
2627
import org.apache.flink.configuration.ConfigOption;
2728
import org.apache.flink.table.catalog.Catalog;
@@ -63,7 +64,7 @@ public Catalog createCatalog(Context context) {
6364
return new GravitinoHiveCatalog(
6465
context.getName(),
6566
helper.getOptions().get(HiveCatalogFactoryOptions.DEFAULT_DATABASE),
66-
propertiesConverter(context),
67+
propertiesConverter(context.getOptions()),
6768
partitionConverter(),
6869
hiveConf,
6970
helper.getOptions().get(HiveCatalogFactoryOptions.HIVE_VERSION));
@@ -113,7 +114,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
113114
* @return The requested property converter.
114115
*/
115116
@Override
116-
public PropertiesConverter propertiesConverter(Context context) {
117+
public PropertiesConverter propertiesConverter(Map<String, String> catalogOptions) {
117118
return HivePropertiesConverter.INSTANCE;
118119
}
119120

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/hive/HivePropertiesConverter.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.collect.ImmutableMap;
2323
import java.util.Map;
2424
import java.util.stream.Collectors;
25+
import org.apache.flink.table.catalog.ObjectPath;
2526
import org.apache.gravitino.catalog.hive.HiveConstants;
2627
import org.apache.gravitino.flink.connector.PropertiesConverter;
2728
import org.apache.hadoop.hive.conf.HiveConf;
@@ -47,7 +48,8 @@ public String transformPropertyToFlinkCatalog(String configKey) {
4748
}
4849

4950
@Override
50-
public Map<String, String> toFlinkTableProperties(Map<String, String> gravitinoProperties) {
51+
public Map<String, String> toFlinkTableProperties(
52+
Map<String, String> gravitinoProperties, ObjectPath tablePath) {
5153
Map<String, String> properties =
5254
gravitinoProperties.entrySet().stream()
5355
.collect(

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/iceberg/GravitinoIcebergCatalogFactory.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.gravitino.flink.connector.iceberg;
2020

2121
import java.util.Collections;
22+
import java.util.Map;
2223
import java.util.Set;
2324
import org.apache.flink.configuration.ConfigOption;
2425
import org.apache.flink.table.catalog.Catalog;
@@ -38,7 +39,7 @@ public Catalog createCatalog(Context context) {
3839
return new GravitinoIcebergCatalog(
3940
context.getName(),
4041
helper.getOptions().get(GravitinoIcebergCatalogFactoryOptions.DEFAULT_DATABASE),
41-
propertiesConverter(context),
42+
propertiesConverter(context.getOptions()),
4243
partitionConverter(),
4344
context.getOptions());
4445
}
@@ -84,7 +85,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
8485
* @return
8586
*/
8687
@Override
87-
public PropertiesConverter propertiesConverter(Context context) {
88+
public PropertiesConverter propertiesConverter(Map<String, String> catalogOptions) {
8889
return IcebergPropertiesConverter.INSTANCE;
8990
}
9091

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoJdbcCatalogFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ public org.apache.flink.table.catalog.Catalog createCatalog(Context context) {
4646
defaultDatabase,
4747
GravitinoJdbcCatalogFactoryOptions.DEFAULT_DATABASE.key() + " should not be null.");
4848
return new GravitinoJdbcCatalog(
49-
context, defaultDatabase, propertiesConverter(), partitionConverter());
49+
context, defaultDatabase, propertiesConverter(context.getOptions()), partitionConverter());
5050
}
5151

5252
@Override

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoMysqlJdbcCatalogFactory.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22+
import java.util.Map;
2223
import org.apache.gravitino.flink.connector.PropertiesConverter;
2324

2425
public class GravitinoMysqlJdbcCatalogFactory extends GravitinoJdbcCatalogFactory {
@@ -29,8 +30,8 @@ public String gravitinoCatalogProvider() {
2930
}
3031

3132
@Override
32-
public PropertiesConverter propertiesConverter(Context context) {
33-
return new MysqlPropertiesConverter(context);
33+
public PropertiesConverter propertiesConverter(Map<String, String> catalogOptions) {
34+
return new MysqlPropertiesConverter(catalogOptions);
3435
}
3536

3637
@Override

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/GravitinoPostgresJdbcCatalogFactory.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22+
import java.util.Map;
2223
import org.apache.gravitino.flink.connector.PropertiesConverter;
2324

2425
public class GravitinoPostgresJdbcCatalogFactory extends GravitinoJdbcCatalogFactory {
@@ -29,8 +30,8 @@ public String gravitinoCatalogProvider() {
2930
}
3031

3132
@Override
32-
public PropertiesConverter propertiesConverter(Context context) {
33-
return new PostgresqlPropertiesConverter(context);
33+
public PropertiesConverter propertiesConverter(Map<String, String> catalogOptions) {
34+
return new PostgresqlPropertiesConverter(catalogOptions);
3435
}
3536

3637
@Override

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/JdbcPropertiesConverter.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,14 @@
2323
import java.util.Map;
2424
import org.apache.flink.configuration.Configuration;
2525
import org.apache.flink.table.catalog.ObjectPath;
26-
import org.apache.flink.table.factories.CatalogFactory;
2726
import org.apache.gravitino.flink.connector.PropertiesConverter;
2827

2928
public abstract class JdbcPropertiesConverter implements PropertiesConverter {
3029

31-
private final CatalogFactory.Context context;
30+
private final Map<String, String> catalogOptions;
3231

33-
protected JdbcPropertiesConverter(CatalogFactory.Context context) {
34-
this.context = context;
32+
protected JdbcPropertiesConverter(Map<String, String> catalogOptions) {
33+
this.catalogOptions = catalogOptions;
3534
}
3635

3736
@Override
@@ -55,7 +54,6 @@ public String transformPropertyToFlinkCatalog(String configKey) {
5554
@Override
5655
public Map<String, String> toFlinkTableProperties(
5756
Map<String, String> gravitinoProperties, ObjectPath tablePath) {
58-
Map<String, String> catalogOptions = context.getOptions();
5957
Map<String, String> tableOptions = new HashMap<>();
6058
tableOptions.put(
6159
"url",
@@ -69,4 +67,8 @@ public Map<String, String> toFlinkTableProperties(
6967
}
7068

7169
protected abstract String driverName();
70+
71+
public Map<String, String> getCatalogOptions() {
72+
return catalogOptions;
73+
}
7274
}

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/MysqlPropertiesConverter.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22-
import org.apache.flink.table.factories.CatalogFactory;
22+
import java.util.Map;
2323

2424
public class MysqlPropertiesConverter extends JdbcPropertiesConverter {
2525

26-
protected MysqlPropertiesConverter(CatalogFactory.Context context) {
27-
super(context);
26+
protected MysqlPropertiesConverter(Map<String, String> catalogOptions) {
27+
super(catalogOptions);
2828
}
2929

3030
@Override

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/jdbc/PostgresqlPropertiesConverter.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,12 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22-
import org.apache.flink.table.factories.CatalogFactory;
22+
import java.util.Map;
2323

2424
public class PostgresqlPropertiesConverter extends JdbcPropertiesConverter {
2525

26-
protected PostgresqlPropertiesConverter(CatalogFactory.Context context) {
27-
super(context);
26+
protected PostgresqlPropertiesConverter(Map<String, String> catalogOptions) {
27+
super(catalogOptions);
2828
}
2929

3030
@Override

flink-connector/flink/src/main/java/org/apache/gravitino/flink/connector/paimon/GravitinoPaimonCatalogFactory.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
package org.apache.gravitino.flink.connector.paimon;
2121

2222
import java.util.Collections;
23+
import java.util.Map;
2324
import java.util.Set;
2425
import org.apache.flink.configuration.ConfigOption;
2526
import org.apache.flink.table.catalog.Catalog;
@@ -43,7 +44,7 @@ public Catalog createCatalog(Context context) {
4344
String defaultDatabase =
4445
helper.getOptions().get(GravitinoPaimonCatalogFactoryOptions.DEFAULT_DATABASE);
4546
return new GravitinoPaimonCatalog(
46-
context, defaultDatabase, propertiesConverter(context), partitionConverter());
47+
context, defaultDatabase, propertiesConverter(context.getOptions()), partitionConverter());
4748
}
4849

4950
@Override
@@ -72,7 +73,7 @@ public org.apache.gravitino.Catalog.Type gravitinoCatalogType() {
7273
}
7374

7475
@Override
75-
public PropertiesConverter propertiesConverter(Context context) {
76+
public PropertiesConverter propertiesConverter(Map<String, String> catalogOptions) {
7677
return PaimonPropertiesConverter.INSTANCE;
7778
}
7879

flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/AbstractJdbcPropertiesConverter.java

+19-14
Original file line numberDiff line numberDiff line change
@@ -33,21 +33,24 @@ public abstract class AbstractJdbcPropertiesConverter {
3333
String url = "testUrl";
3434
String defaultDatabase = "test";
3535

36-
protected abstract JdbcPropertiesConverter getConverter();
36+
Map<String, String> catalogProperties =
37+
ImmutableMap.of(
38+
JdbcPropertiesConstants.GRAVITINO_JDBC_USER,
39+
username,
40+
JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD,
41+
password,
42+
JdbcPropertiesConstants.GRAVITINO_JDBC_URL,
43+
url,
44+
JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE,
45+
defaultDatabase);
46+
47+
protected abstract JdbcPropertiesConverter getConverter(Map<String, String> catalogOptions);
3748

3849
@Test
3950
public void testToPaimonFileSystemCatalog() {
40-
Map<String, String> catalogProperties =
41-
ImmutableMap.of(
42-
JdbcPropertiesConstants.GRAVITINO_JDBC_USER,
43-
username,
44-
JdbcPropertiesConstants.GRAVITINO_JDBC_PASSWORD,
45-
password,
46-
JdbcPropertiesConstants.GRAVITINO_JDBC_URL,
47-
url,
48-
JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE,
49-
defaultDatabase);
50-
Map<String, String> properties = getConverter().toFlinkCatalogProperties(catalogProperties);
51+
52+
Map<String, String> properties =
53+
getConverter(catalogProperties).toFlinkCatalogProperties(catalogProperties);
5154
Assertions.assertEquals(username, properties.get(JdbcPropertiesConstants.FLINK_JDBC_USER));
5255
Assertions.assertEquals(password, properties.get(JdbcPropertiesConstants.FLINK_JDBC_PASSWORD));
5356
Assertions.assertEquals(url, properties.get(JdbcPropertiesConstants.FLINK_JDBC_URL));
@@ -68,7 +71,8 @@ public void testToGravitinoCatalogProperties() {
6871
url,
6972
JdbcPropertiesConstants.FLINK_JDBC_DEFAULT_DATABASE,
7073
defaultDatabase));
71-
Map<String, String> properties = getConverter().toGravitinoCatalogProperties(configuration);
74+
Map<String, String> properties =
75+
getConverter(catalogProperties).toGravitinoCatalogProperties(configuration);
7276

7377
Assertions.assertEquals(username, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_USER));
7478
Assertions.assertEquals(
@@ -77,6 +81,7 @@ public void testToGravitinoCatalogProperties() {
7781
Assertions.assertEquals(
7882
defaultDatabase, properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_DEFAULT_DATABASE));
7983
Assertions.assertEquals(
80-
getConverter().driverName(), properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER));
84+
getConverter(catalogProperties).driverName(),
85+
properties.get(JdbcPropertiesConstants.GRAVITINO_JDBC_DRIVER));
8186
}
8287
}

flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestMysqlPropertiesConverter.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22+
import java.util.Map;
23+
2224
public class TestMysqlPropertiesConverter extends AbstractJdbcPropertiesConverter {
2325

2426
@Override
25-
protected JdbcPropertiesConverter getConverter() {
26-
return MysqlPropertiesConverter.INSTANCE;
27+
protected JdbcPropertiesConverter getConverter(Map<String, String> catalogOptions) {
28+
return new MysqlPropertiesConverter(catalogOptions);
2729
}
2830
}

flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/jdbc/TestPostgresqlPropertiesConverter.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,12 @@
1919

2020
package org.apache.gravitino.flink.connector.jdbc;
2121

22+
import java.util.Map;
23+
2224
public class TestPostgresqlPropertiesConverter extends AbstractJdbcPropertiesConverter {
2325

2426
@Override
25-
protected JdbcPropertiesConverter getConverter() {
26-
return PostgresqlPropertiesConverter.INSTANCE;
27+
protected JdbcPropertiesConverter getConverter(Map<String, String> catalogOptions) {
28+
return new PostgresqlPropertiesConverter(catalogOptions);
2729
}
2830
}

0 commit comments

Comments
 (0)