
Commit d8de25b
Parent: 61cfb52

Add tests for REST Catalog to the Flink connector for Iceberg.

6 files changed: +203 -8 lines

flink-connector/flink/build.gradle.kts (+2)

@@ -98,6 +98,8 @@ dependencies {
     testImplementation(libs.testcontainers.mysql)
     testImplementation(libs.metrics.core)
+    testImplementation("org.apache.iceberg:iceberg-core:$icebergVersion")
+    testImplementation("org.apache.iceberg:iceberg-hive-metastore:$icebergVersion")
     testImplementation("org.apache.iceberg:iceberg-flink-runtime-$flinkMajorVersion:$icebergVersion")
     testImplementation("org.apache.flink:flink-connector-hive_$scalaVersion:$flinkVersion")
     testImplementation("org.apache.flink:flink-table-common:$flinkVersion")

flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java (-2)

@@ -77,8 +77,6 @@ protected boolean supportGetSchemaWithoutCommentAndOption() {
     return true;
   }

-  protected abstract String getProvider();
-
   protected abstract boolean supportDropCascade();

   protected boolean supportsPrimaryKey() {

flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkEnvIT.java (+55 -3)

@@ -24,6 +24,7 @@
 import com.google.errorprone.annotations.FormatString;
 import java.io.IOException;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
@@ -37,11 +38,13 @@
 import org.apache.gravitino.Catalog;
 import org.apache.gravitino.client.GravitinoMetalake;
 import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
 import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
 import org.apache.gravitino.flink.connector.store.GravitinoCatalogStoreFactoryOptions;
 import org.apache.gravitino.integration.test.container.ContainerSuite;
 import org.apache.gravitino.integration.test.container.HiveContainer;
 import org.apache.gravitino.integration.test.util.BaseIT;
+import org.apache.gravitino.server.web.JettyServerConfig;
 import org.apache.hadoop.fs.FileSystem;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
@@ -51,6 +54,9 @@
 public abstract class FlinkEnvIT extends BaseIT {
   private static final Logger LOG = LoggerFactory.getLogger(FlinkEnvIT.class);
   private static final ContainerSuite CONTAINER_SUITE = ContainerSuite.getInstance();
+
+  protected static final String icebergRestServiceName = "iceberg-rest";
+
   protected static final String GRAVITINO_METALAKE = "flink";
   protected static final String DEFAULT_CATALOG = "default_catalog";

@@ -65,32 +71,68 @@ public abstract class FlinkEnvIT extends BaseIT {

   private static String gravitinoUri = "http://127.0.0.1:8090";

+  private final String lakeHouseIcebergProvider = "lakehouse-iceberg";
+
+  protected String icebergRestServiceUri;
+
   @BeforeAll
-  void startUp() {
+  void startUp() throws Exception {
+    initHiveEnv();
+    if (lakeHouseIcebergProvider.equalsIgnoreCase(getProvider())) {
+      initIcebergRestServiceEnv();
+    }
     // Start Gravitino server
+    super.startIntegrationTest();
     initGravitinoEnv();
     initMetalake();
-    initHiveEnv();
     initHdfsEnv();
     initFlinkEnv();
     LOG.info("Startup Flink env successfully, Gravitino uri: {}.", gravitinoUri);
   }

   @AfterAll
-  static void stop() {
+  void stop() throws IOException, InterruptedException {
     stopFlinkEnv();
     stopHdfsEnv();
+    super.stopIntegrationTest();
     LOG.info("Stop Flink env successfully.");
   }

   protected String flinkByPass(String key) {
     return PropertiesConverter.FLINK_PROPERTY_PREFIX + key;
   }

+  private void initIcebergRestServiceEnv() {
+    ignoreIcebergRestService = false;
+    Map<String, String> icebergRestServiceConfigs = new HashMap<>();
+    icebergRestServiceConfigs.put(
+        "gravitino."
+            + icebergRestServiceName
+            + "."
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
+        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_HIVE);
+    icebergRestServiceConfigs.put(
+        "gravitino."
+            + icebergRestServiceName
+            + "."
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI,
+        hiveMetastoreUri);
+    icebergRestServiceConfigs.put(
+        "gravitino."
+            + icebergRestServiceName
+            + "."
+            + IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
+        warehouse);
+    registerCustomConfigs(icebergRestServiceConfigs);
+  }
+
   private void initGravitinoEnv() {
     // Gravitino server is already started by AbstractIT, just construct gravitinoUrl
     int gravitinoPort = getGravitinoServerPort();
     gravitinoUri = String.format("http://127.0.0.1:%d", gravitinoPort);
+    if (lakeHouseIcebergProvider.equalsIgnoreCase(getProvider())) {
+      this.icebergRestServiceUri = getIcebergRestServiceUri();
+    }
   }

   private void initMetalake() {
@@ -212,4 +254,14 @@ protected static void clearTableInSchema() {
       TestUtils.assertTableResult(deleteResult, ResultKind.SUCCESS);
     }
   }
+
+  private String getIcebergRestServiceUri() {
+    JettyServerConfig jettyServerConfig =
+        JettyServerConfig.fromConfig(
+            serverConfig, String.format("gravitino.%s.", icebergRestServiceName));
+    return String.format(
+        "http://%s:%d/iceberg/", jettyServerConfig.getHost(), jettyServerConfig.getHttpPort());
+  }
+
+  protected abstract String getProvider();
 }
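With getProvider() promoted from FlinkCommonIT to FlinkEnvIT, the base class can decide before the Gravitino server boots whether to configure the embedded Iceberg REST service; note that initHiveEnv() now runs first so that hiveMetastoreUri and warehouse exist when initIcebergRestServiceEnv() registers the service configs. A minimal sketch of what a concrete suite supplies (a hypothetical subclass for illustration; the real ones are FlinkIcebergHiveCatalogIT and FlinkIcebergRestCatalogIT below):

    // Hypothetical subclass, not part of this commit: returning "lakehouse-iceberg"
    // makes FlinkEnvIT.startUp() call initIcebergRestServiceEnv() before the
    // Gravitino server starts, so the REST endpoint is configured and served.
    public class MyIcebergSuiteIT extends FlinkEnvIT {
      @Override
      protected String getProvider() {
        return "lakehouse-iceberg"; // compared case-insensitively to lakeHouseIcebergProvider
      }
    }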

flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergCatalogIT.java (+5 -3)

@@ -100,7 +100,7 @@ public void testCreateGravitinoIcebergCatalog() {
     // Check the catalog properties.
     org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
     Map<String, String> properties = gravitinoCatalog.properties();
-    Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI));
+    Assertions.assertEquals(getUri(), properties.get(IcebergConstants.URI));

     // Get the created catalog.
     Optional<org.apache.flink.table.catalog.Catalog> catalog = tableEnv.getCatalog(catalogName);
@@ -153,14 +153,14 @@ public void testCreateGravitinoIcebergUsingSQL() {
         catalogName,
         GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
         getCatalogBackend(),
-        hiveMetastoreUri,
+        getUri(),
         warehouse));
     Assertions.assertTrue(metalake.catalogExists(catalogName));

     // Check the properties of the created catalog.
     org.apache.gravitino.Catalog gravitinoCatalog = metalake.loadCatalog(catalogName);
     Map<String, String> properties = gravitinoCatalog.properties();
-    Assertions.assertEquals(hiveMetastoreUri, properties.get(IcebergConstants.URI));
+    Assertions.assertEquals(getUri(), properties.get(IcebergConstants.URI));

     // Get the created catalog.
     Optional<org.apache.flink.table.catalog.Catalog> catalog = tableEnv.getCatalog(catalogName);
@@ -499,4 +499,6 @@ protected boolean supportDropCascade() {
   }

   protected abstract String getCatalogBackend();
+
+  protected abstract String getUri();
 }
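Replacing the hard-coded hiveMetastoreUri with getUri() lets the same test body exercise both backends: the Hive subclass returns the metastore thrift URI, while the REST subclass returns the Gravitino Iceberg REST endpoint. The format string itself sits outside this hunk; a plausible shape of the call, mirroring the five arguments shown above (the option keys 'type', 'catalog-backend', 'uri', and 'warehouse' are assumptions inferred from the property names checked elsewhere in the test, not confirmed by this diff):

    // Sketch only: the real SQL template is not part of this hunk.
    sql(
        "CREATE CATALOG %s WITH ("
            + "'type'='%s',"
            + "'catalog-backend'='%s',"
            + "'uri'='%s',"
            + "'warehouse'='%s')",
        catalogName,
        GravitinoIcebergCatalogFactoryOptions.IDENTIFIER,
        getCatalogBackend(),
        getUri(),
        warehouse);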

flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergHiveCatalogIT.java (+6)

@@ -40,7 +40,13 @@ protected Map<String, String> getCatalogConfigs() {
     return catalogProperties;
   }

+  @Override
   protected String getCatalogBackend() {
     return "hive";
   }
+
+  @Override
+  protected String getUri() {
+    return hiveMetastoreUri;
+  }
 }
flink-connector/flink/src/test/java/org/apache/gravitino/flink/connector/integration/test/iceberg/FlinkIcebergRestCatalogIT.java (new file, +135)

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.gravitino.flink.connector.integration.test.iceberg;

import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.Map;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.types.Row;
import org.apache.gravitino.Schema;
import org.apache.gravitino.flink.connector.iceberg.IcebergPropertiesConstants;
import org.apache.gravitino.flink.connector.integration.test.utils.TestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIf;

@Tag("gravitino-docker-test")
public class FlinkIcebergRestCatalogIT extends FlinkIcebergCatalogIT {

  @Override
  protected Map<String, String> getCatalogConfigs() {
    Map<String, String> catalogProperties = Maps.newHashMap();
    catalogProperties.put(
        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
        IcebergPropertiesConstants.ICEBERG_CATALOG_BACKEND_REST);
    catalogProperties.put(
        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE, warehouse);
    catalogProperties.put(
        IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_URI, icebergRestServiceUri);
    return catalogProperties;
  }

  @Override
  public void testListSchema() {
    doWithCatalog(
        currentCatalog(),
        catalog -> {
          String schema = "test_list_schema";
          String schema2 = "test_list_schema2";
          String schema3 = "test_list_schema3";

          try {
            TestUtils.assertTableResult(
                sql("CREATE DATABASE IF NOT EXISTS %s", schema), ResultKind.SUCCESS);
            TestUtils.assertTableResult(
                sql("CREATE DATABASE IF NOT EXISTS %s", schema2), ResultKind.SUCCESS);
            TestUtils.assertTableResult(
                sql("CREATE DATABASE IF NOT EXISTS %s", schema3), ResultKind.SUCCESS);
            TestUtils.assertTableResult(
                sql("SHOW DATABASES"),
                ResultKind.SUCCESS_WITH_CONTENT,
                Row.of("default"),
                Row.of(schema),
                Row.of(schema2),
                Row.of(schema3));

            String[] schemas = catalog.asSchemas().listSchemas();
            Arrays.sort(schemas);
            Assertions.assertEquals(4, schemas.length);
            Assertions.assertEquals("default", schemas[0]);
            Assertions.assertEquals(schema, schemas[1]);
            Assertions.assertEquals(schema2, schemas[2]);
            Assertions.assertEquals(schema3, schemas[3]);
          } finally {
            catalog.asSchemas().dropSchema(schema, supportDropCascade());
            catalog.asSchemas().dropSchema(schema2, supportDropCascade());
            catalog.asSchemas().dropSchema(schema3, supportDropCascade());
          }
        });
  }

  @Test
  @EnabledIf("supportSchemaOperationWithCommentAndOptions")
  public void testAlterSchemaWithCommentAndOptions() {
    doWithCatalog(
        currentCatalog(),
        catalog -> {
          String schema = "test_alter_schema";
          try {
            TestUtils.assertTableResult(
                sql(
                    "CREATE DATABASE IF NOT EXISTS %s "
                        + "COMMENT 'test comment'"
                        + "WITH ('key1' = 'value1', 'key2'='value2')",
                    schema),
                ResultKind.SUCCESS);

            Schema loadedSchema = catalog.asSchemas().loadSchema(schema);
            Assertions.assertEquals(schema, loadedSchema.name());
            Assertions.assertEquals("test comment", loadedSchema.comment());
            Assertions.assertEquals("value1", loadedSchema.properties().get("key1"));
            Assertions.assertEquals("value2", loadedSchema.properties().get("key2"));

            TestUtils.assertTableResult(
                sql("ALTER DATABASE %s SET ('key1'='new-value', 'key3'='value3')", schema),
                ResultKind.SUCCESS);
            Schema reloadedSchema = catalog.asSchemas().loadSchema(schema);
            Assertions.assertEquals(schema, reloadedSchema.name());
            Assertions.assertEquals("test comment", reloadedSchema.comment());
            Assertions.assertEquals("new-value", reloadedSchema.properties().get("key1"));
            Assertions.assertEquals("value3", reloadedSchema.properties().get("key3"));
          } finally {
            catalog.asSchemas().dropSchema(schema, supportDropCascade());
          }
        });
  }

  @Override
  protected String getCatalogBackend() {
    return "rest";
  }

  @Override
  protected String getUri() {
    return icebergRestServiceUri;
  }
}
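Two details of this suite are worth noting. testListSchema is overridden because the REST service, backed by the Hive metastore, already exposes a built-in "default" database, so the expected listing contains four entries rather than the three schemas the test creates. And testAlterSchemaWithCommentAndOptions verifies that schema comments and properties round-trip through the REST endpoint. To run just this suite, something like the following should work, assuming the Docker-based test environment is available and any tag filters for "gravitino-docker-test" are enabled (the module path is taken from this commit's file layout; the exact task name is an assumption):

    ./gradlew :flink-connector:flink:test --tests "org.apache.gravitino.flink.connector.integration.test.iceberg.FlinkIcebergRestCatalogIT"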
