Skip to content

Commit cb9cdd8

Browse files
authored
[#6035]feat(spark-connector):Support custom catalog backend (#6036)
### What changes were proposed in this pull request? When the catalog backend is `custom`, add the `catalog-impl` instead `type` in properties ### Why are the changes needed? Fix: #6035 ### Does this PR introduce _any_ user-facing change? no ### How was this patch tested? add a new case in `TestIcebergPropertiesConverter`
1 parent 688c1c9 commit cb9cdd8

File tree

16 files changed

+56
-13
lines changed

16 files changed

+56
-13
lines changed

iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/IcebergCatalogBackend.java catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogBackend.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.gravitino.iceberg.common;
19+
package org.apache.gravitino.catalog.lakehouse.iceberg;
2020

2121
public enum IcebergCatalogBackend {
2222
HIVE,

catalogs/catalog-common/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergPropertiesUtils.java

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class IcebergPropertiesUtils {
3737
static {
3838
Map<String, String> map = new HashMap();
3939
map.put(IcebergConstants.CATALOG_BACKEND, IcebergConstants.CATALOG_BACKEND);
40+
map.put(IcebergConstants.CATALOG_BACKEND_IMPL, IcebergConstants.CATALOG_BACKEND_IMPL);
4041
map.put(IcebergConstants.GRAVITINO_JDBC_DRIVER, IcebergConstants.GRAVITINO_JDBC_DRIVER);
4142
map.put(IcebergConstants.GRAVITINO_JDBC_USER, IcebergConstants.ICEBERG_JDBC_USER);
4243
map.put(IcebergConstants.GRAVITINO_JDBC_PASSWORD, IcebergConstants.ICEBERG_JDBC_PASSWORD);

catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogPropertiesMetadata.java

-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030
import java.util.Map;
3131
import org.apache.gravitino.connector.BaseCatalogPropertiesMetadata;
3232
import org.apache.gravitino.connector.PropertyEntry;
33-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
3433
import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
3534
import org.apache.gravitino.iceberg.common.authentication.kerberos.KerberosConfig;
3635
import org.apache.gravitino.storage.AzureProperties;

catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,14 @@
4747
import org.apache.gravitino.SchemaChange;
4848
import org.apache.gravitino.SupportsSchemas;
4949
import org.apache.gravitino.auth.AuthConstants;
50+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
5051
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergSchemaPropertiesMetadata;
5152
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergTable;
5253
import org.apache.gravitino.catalog.lakehouse.iceberg.ops.IcebergCatalogWrapperHelper;
5354
import org.apache.gravitino.client.GravitinoMetalake;
5455
import org.apache.gravitino.exceptions.NoSuchSchemaException;
5556
import org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
5657
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
57-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
5858
import org.apache.gravitino.iceberg.common.IcebergConfig;
5959
import org.apache.gravitino.iceberg.common.utils.IcebergCatalogUtil;
6060
import org.apache.gravitino.integration.test.container.ContainerSuite;

docs/spark-connector/spark-catalog-iceberg.md

+2-1
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,8 @@ Gravitino spark connector will transform below property names which are defined
103103

104104
| Gravitino catalog property name | Spark Iceberg connector configuration | Description | Since Version |
105105
|---------------------------------|---------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------|
106-
| `catalog-backend` | `type` | Catalog backend type | 0.5.0 |
106+
| `catalog-backend` | `type` | Catalog backend type.Supports `hive` or `jdbc` or `rest` or `custom` | 0.5.0 |
107+
| `catalog-backend-impl` | `catalog-impl` | The fully-qualified class name of a custom catalog implementation, only worked if `catalog-backend` is `custom` | 0.8.0-incubating |
107108
| `uri` | `uri` | Catalog backend uri | 0.5.0 |
108109
| `warehouse` | `warehouse` | Catalog backend warehouse | 0.5.0 |
109110
| `jdbc-user` | `jdbc.user` | JDBC user name | 0.5.0 |

iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/ops/IcebergCatalogWrapper.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@
2828
import lombok.Getter;
2929
import lombok.Setter;
3030
import org.apache.commons.lang3.StringUtils;
31-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
31+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
3232
import org.apache.gravitino.iceberg.common.IcebergConfig;
3333
import org.apache.gravitino.iceberg.common.utils.IcebergCatalogUtil;
3434
import org.apache.gravitino.utils.IsolatedClassLoader;

iceberg/iceberg-common/src/main/java/org/apache/gravitino/iceberg/common/utils/IcebergCatalogUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -30,10 +30,10 @@
3030
import java.util.HashMap;
3131
import java.util.Locale;
3232
import java.util.Map;
33+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
3334
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
3435
import org.apache.gravitino.exceptions.ConnectionFailedException;
3536
import org.apache.gravitino.iceberg.common.ClosableHiveCatalog;
36-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
3737
import org.apache.gravitino.iceberg.common.IcebergConfig;
3838
import org.apache.gravitino.iceberg.common.authentication.AuthenticationConfig;
3939
import org.apache.gravitino.iceberg.common.authentication.kerberos.HiveBackendProxy;

iceberg/iceberg-common/src/test/java/org/apache/gravitino/iceberg/common/utils/TestIcebergCatalogUtil.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
import java.util.HashMap;
2323
import java.util.Map;
24+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
2425
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
25-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
2626
import org.apache.gravitino.iceberg.common.IcebergConfig;
2727
import org.apache.iceberg.CatalogProperties;
2828
import org.apache.iceberg.catalog.Catalog;

iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTHiveCatalogIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import java.util.HashMap;
2222
import java.util.Map;
23-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
23+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
2424
import org.apache.gravitino.iceberg.common.IcebergConfig;
2525
import org.apache.gravitino.integration.test.container.ContainerSuite;
2626
import org.apache.gravitino.integration.test.container.HiveContainer;

iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTJdbcCatalogIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,8 @@
2020

2121
import java.util.HashMap;
2222
import java.util.Map;
23+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
2324
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
24-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
2525
import org.apache.gravitino.iceberg.common.IcebergConfig;
2626
import org.apache.gravitino.integration.test.container.ContainerSuite;
2727
import org.apache.gravitino.integration.test.container.HiveContainer;

iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTMemoryCatalogIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import java.util.HashMap;
2222
import java.util.Map;
23-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
23+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
2424
import org.apache.gravitino.iceberg.common.IcebergConfig;
2525

2626
public class IcebergRESTMemoryCatalogIT extends IcebergRESTServiceIT {

iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRESTServiceBaseIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import java.util.stream.IntStream;
3232
import org.apache.commons.io.FileUtils;
3333
import org.apache.commons.lang3.StringUtils;
34-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
34+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
3535
import org.apache.gravitino.iceberg.common.IcebergConfig;
3636
import org.apache.gravitino.iceberg.integration.test.util.IcebergRESTServerManager;
3737
import org.apache.gravitino.integration.test.util.ITUtils;

iceberg/iceberg-rest-server/src/test/java/org/apache/gravitino/iceberg/integration/test/IcebergRestKerberosHiveCatalogIT.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
import java.util.Map;
2626
import java.util.Objects;
2727
import org.apache.commons.io.FileUtils;
28-
import org.apache.gravitino.iceberg.common.IcebergCatalogBackend;
28+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
2929
import org.apache.gravitino.iceberg.common.IcebergConfig;
3030
import org.apache.gravitino.integration.test.container.HiveContainer;
3131
import org.apache.gravitino.integration.test.util.GravitinoITUtils;

spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/IcebergPropertiesConstants.java

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ public class IcebergPropertiesConstants {
2929
public static final String GRAVITINO_ICEBERG_CATALOG_BACKEND = IcebergConstants.CATALOG_BACKEND;
3030

3131
static final String ICEBERG_CATALOG_TYPE = CatalogUtil.ICEBERG_CATALOG_TYPE;
32+
static final String ICEBERG_CATALOG_IMPL = CatalogProperties.CATALOG_IMPL;
3233

3334
public static final String GRAVITINO_ICEBERG_CATALOG_WAREHOUSE = IcebergConstants.WAREHOUSE;
3435

spark-connector/spark-common/src/main/java/org/apache/gravitino/spark/connector/iceberg/IcebergPropertiesConverter.java

+15-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.HashMap;
2424
import java.util.Map;
2525
import org.apache.commons.lang3.StringUtils;
26+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
2627
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
2728
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergPropertiesUtils;
2829
import org.apache.gravitino.spark.connector.PropertiesConverter;
@@ -49,7 +50,20 @@ public Map<String, String> toSparkCatalogProperties(Map<String, String> properti
4950
Preconditions.checkArgument(
5051
StringUtils.isNotBlank(catalogBackend),
5152
String.format("%s should not be empty", IcebergConstants.CATALOG_BACKEND));
52-
all.put(IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, catalogBackend);
53+
if (catalogBackend.equalsIgnoreCase(IcebergCatalogBackend.CUSTOM.name())) {
54+
String catalogBackendImpl = all.remove(IcebergConstants.CATALOG_BACKEND_IMPL);
55+
Preconditions.checkArgument(
56+
StringUtils.isNotBlank(catalogBackendImpl),
57+
String.format(
58+
"%s should not be empty when %s is %s",
59+
IcebergConstants.CATALOG_BACKEND_IMPL,
60+
IcebergConstants.CATALOG_BACKEND,
61+
IcebergCatalogBackend.CUSTOM.name()));
62+
all.put(IcebergPropertiesConstants.ICEBERG_CATALOG_IMPL, catalogBackendImpl);
63+
} else {
64+
all.put(IcebergPropertiesConstants.ICEBERG_CATALOG_TYPE, catalogBackend);
65+
}
66+
5367
all.put(IcebergPropertiesConstants.ICEBERG_CATALOG_CACHE_ENABLED, "FALSE");
5468
return all;
5569
}

spark-connector/spark-common/src/test/java/org/apache/gravitino/spark/connector/iceberg/TestIcebergPropertiesConverter.java

+27
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121

2222
import com.google.common.collect.ImmutableMap;
2323
import java.util.Map;
24+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergCatalogBackend;
25+
import org.apache.gravitino.catalog.lakehouse.iceberg.IcebergConstants;
2426
import org.junit.jupiter.api.Assertions;
2527
import org.junit.jupiter.api.Test;
2628

@@ -113,4 +115,29 @@ void testCatalogPropertiesWithRestBackend() {
113115
"rest-warehouse"),
114116
properties);
115117
}
118+
119+
@Test
120+
void testCatalogPropertiesWithCustomBackend() {
121+
Map<String, String> properties =
122+
icebergPropertiesConverter.toSparkCatalogProperties(
123+
ImmutableMap.of(
124+
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_BACKEND,
125+
IcebergCatalogBackend.CUSTOM.name(),
126+
IcebergConstants.CATALOG_BACKEND_IMPL,
127+
"CustomCatalog",
128+
IcebergPropertiesConstants.GRAVITINO_ICEBERG_CATALOG_WAREHOUSE,
129+
"custom-warehouse",
130+
"key1",
131+
"value1"));
132+
133+
Assertions.assertEquals(
134+
ImmutableMap.of(
135+
IcebergPropertiesConstants.ICEBERG_CATALOG_CACHE_ENABLED,
136+
"FALSE",
137+
IcebergPropertiesConstants.ICEBERG_CATALOG_IMPL,
138+
"CustomCatalog",
139+
IcebergPropertiesConstants.ICEBERG_CATALOG_WAREHOUSE,
140+
"custom-warehouse"),
141+
properties);
142+
}
116143
}

0 commit comments

Comments
 (0)