Skip to content

Commit e6225a0

Browse files
authored
[#6196] feat(iceberg): adjust table distribution if creating table without specifying disribution mode (#6214)
### What changes were proposed in this pull request? Adjust the distribution mode for creating Iceberg table with none distribution. the following is the Spark adjust logic, the flink is similar. ```java private DistributionMode defaultWriteDistributionMode() { if (table.sortOrder().isSorted()) { return RANGE; } else if (table.spec().isPartitioned()) { return HASH; } else { return NONE; } } ``` ### Why are the changes needed? Fix: #6196 ### Does this PR introduce _any_ user-facing change? Yes, add document ### How was this patch tested? add UT and IT
1 parent 1017f3e commit e6225a0

File tree

4 files changed

+117
-92
lines changed

4 files changed

+117
-92
lines changed

catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergCatalogOperations.java

+18
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.gravitino.rel.TableCatalog;
5959
import org.apache.gravitino.rel.TableChange;
6060
import org.apache.gravitino.rel.expressions.distributions.Distribution;
61+
import org.apache.gravitino.rel.expressions.distributions.Distributions;
6162
import org.apache.gravitino.rel.expressions.sorts.SortOrder;
6263
import org.apache.gravitino.rel.expressions.transforms.Transform;
6364
import org.apache.gravitino.rel.indexes.Index;
@@ -513,6 +514,13 @@ public Table createTable(
513514
.build())
514515
.toArray(IcebergColumn[]::new);
515516

517+
// Gravitino NONE distribution means the client side doesn't specify distribution, which is
518+
// not the same as none distribution in Iceberg.
519+
if (Distributions.NONE.equals(distribution)) {
520+
distribution =
521+
getIcebergDefaultDistribution(sortOrders.length > 0, partitioning.length > 0);
522+
}
523+
516524
IcebergTable createdTable =
517525
IcebergTable.builder()
518526
.withName(tableIdent.name())
@@ -588,6 +596,16 @@ public void testConnection(
588596
}
589597
}
590598

599+
private static Distribution getIcebergDefaultDistribution(
600+
boolean isSorted, boolean isPartitioned) {
601+
if (isSorted) {
602+
return Distributions.RANGE;
603+
} else if (isPartitioned) {
604+
return Distributions.HASH;
605+
}
606+
return Distributions.NONE;
607+
}
608+
591609
private static String currentUser() {
592610
return PrincipalUtils.getCurrentUserName();
593611
}

catalogs/catalog-lakehouse-iceberg/src/main/java/org/apache/gravitino/catalog/lakehouse/iceberg/IcebergTable.java

+20-16
Original file line numberDiff line numberDiff line change
@@ -152,21 +152,6 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam
152152
Schema schema = table.schema();
153153
Transform[] partitionSpec = FromIcebergPartitionSpec.fromPartitionSpec(table.spec(), schema);
154154
SortOrder[] sortOrder = FromIcebergSortOrder.fromSortOrder(table.sortOrder());
155-
Distribution distribution = Distributions.NONE;
156-
String distributionName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
157-
if (null != distributionName) {
158-
switch (DistributionMode.fromName(distributionName)) {
159-
case HASH:
160-
distribution = Distributions.HASH;
161-
break;
162-
case RANGE:
163-
distribution = Distributions.RANGE;
164-
break;
165-
default:
166-
// do nothing
167-
break;
168-
}
169-
}
170155
IcebergColumn[] icebergColumns =
171156
schema.columns().stream().map(ConvertUtil::fromNestedField).toArray(IcebergColumn[]::new);
172157
return IcebergTable.builder()
@@ -178,7 +163,7 @@ public static IcebergTable fromIcebergTable(TableMetadata table, String tableNam
178163
.withAuditInfo(AuditInfo.EMPTY)
179164
.withPartitioning(partitionSpec)
180165
.withSortOrders(sortOrder)
181-
.withDistribution(distribution)
166+
.withDistribution(getDistribution(properties))
182167
.build();
183168
}
184169

@@ -236,4 +221,23 @@ protected IcebergTable internalBuild() {
236221
public static Builder builder() {
237222
return new Builder();
238223
}
224+
225+
private static Distribution getDistribution(Map<String, String> properties) {
226+
Distribution distribution = Distributions.NONE;
227+
String distributionName = properties.get(IcebergTablePropertiesMetadata.DISTRIBUTION_MODE);
228+
if (null != distributionName) {
229+
switch (DistributionMode.fromName(distributionName)) {
230+
case HASH:
231+
distribution = Distributions.HASH;
232+
break;
233+
case RANGE:
234+
distribution = Distributions.RANGE;
235+
break;
236+
default:
237+
// do nothing
238+
break;
239+
}
240+
}
241+
return distribution;
242+
}
239243
}

catalogs/catalog-lakehouse-iceberg/src/test/java/org/apache/gravitino/catalog/lakehouse/iceberg/integration/test/CatalogIcebergBaseIT.java

+75-5
Original file line numberDiff line numberDiff line change
@@ -379,6 +379,76 @@ void testCreateTableWithNullComment() {
379379
Assertions.assertNull(loadTable.comment());
380380
}
381381

382+
@Test
383+
void testCreateTableWithNoneDistribution() {
384+
// Create table from Gravitino API
385+
Column[] columns = createColumns();
386+
387+
NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
388+
Distribution distribution = Distributions.NONE;
389+
390+
final SortOrder[] sortOrders =
391+
new SortOrder[] {
392+
SortOrders.of(
393+
NamedReference.field(ICEBERG_COL_NAME2),
394+
SortDirection.DESCENDING,
395+
NullOrdering.NULLS_FIRST)
396+
};
397+
398+
Transform[] partitioning = new Transform[] {Transforms.day(columns[1].name())};
399+
Map<String, String> properties = createProperties();
400+
TableCatalog tableCatalog = catalog.asTableCatalog();
401+
Table tableWithPartitionAndSortorder =
402+
tableCatalog.createTable(
403+
tableIdentifier,
404+
columns,
405+
table_comment,
406+
properties,
407+
partitioning,
408+
distribution,
409+
sortOrders);
410+
Assertions.assertEquals(tableName, tableWithPartitionAndSortorder.name());
411+
Assertions.assertEquals(Distributions.RANGE, tableWithPartitionAndSortorder.distribution());
412+
413+
Table loadTable = tableCatalog.loadTable(tableIdentifier);
414+
Assertions.assertEquals(tableName, loadTable.name());
415+
Assertions.assertEquals(Distributions.RANGE, loadTable.distribution());
416+
tableCatalog.dropTable(tableIdentifier);
417+
418+
Table tableWithPartition =
419+
tableCatalog.createTable(
420+
tableIdentifier,
421+
columns,
422+
table_comment,
423+
properties,
424+
partitioning,
425+
distribution,
426+
new SortOrder[0]);
427+
Assertions.assertEquals(tableName, tableWithPartition.name());
428+
Assertions.assertEquals(Distributions.HASH, tableWithPartition.distribution());
429+
430+
loadTable = tableCatalog.loadTable(tableIdentifier);
431+
Assertions.assertEquals(tableName, loadTable.name());
432+
Assertions.assertEquals(Distributions.HASH, loadTable.distribution());
433+
tableCatalog.dropTable(tableIdentifier);
434+
435+
Table tableWithoutPartitionAndSortOrder =
436+
tableCatalog.createTable(
437+
tableIdentifier,
438+
columns,
439+
table_comment,
440+
properties,
441+
new Transform[0],
442+
distribution,
443+
new SortOrder[0]);
444+
Assertions.assertEquals(tableName, tableWithoutPartitionAndSortOrder.name());
445+
Assertions.assertEquals(Distributions.NONE, tableWithoutPartitionAndSortOrder.distribution());
446+
447+
loadTable = tableCatalog.loadTable(tableIdentifier);
448+
Assertions.assertEquals(tableName, loadTable.name());
449+
Assertions.assertEquals(Distributions.NONE, loadTable.distribution());
450+
}
451+
382452
@Test
383453
void testCreateAndLoadIcebergTable() {
384454
// Create table from Gravitino API
@@ -968,9 +1038,9 @@ public void testTableDistribution() {
9681038
columns,
9691039
table_comment,
9701040
properties,
971-
partitioning,
1041+
new Transform[0],
9721042
distribution,
973-
sortOrders);
1043+
new SortOrder[0]);
9741044

9751045
Table loadTable = tableCatalog.loadTable(tableIdentifier);
9761046

@@ -981,8 +1051,8 @@ public void testTableDistribution() {
9811051
Arrays.asList(columns),
9821052
properties,
9831053
distribution,
984-
sortOrders,
985-
partitioning,
1054+
new SortOrder[0],
1055+
new Transform[0],
9861056
loadTable);
9871057

9881058
Assertions.assertDoesNotThrow(() -> tableCatalog.dropTable(tableIdentifier));
@@ -1179,7 +1249,7 @@ public void testTableSortOrder() {
11791249
Column[] columns = createColumns();
11801250

11811251
NameIdentifier tableIdentifier = NameIdentifier.of(schemaName, tableName);
1182-
Distribution distribution = Distributions.NONE;
1252+
Distribution distribution = Distributions.HASH;
11831253

11841254
final SortOrder[] sortOrders =
11851255
new SortOrder[] {

docs/lakehouse-iceberg-catalog.md

+4-71
Original file line numberDiff line numberDiff line change
@@ -220,79 +220,12 @@ For `bucket` and `truncate`, the first argument must be integer literal, and the
220220

221221
### Table distributions
222222

223-
- Gravitino used by default `NoneDistribution`.
223+
- Support `HashDistribution`, which distribute data by partition key.
224+
- Support `RangeDistribution`, which distribute data by partition key or sort key for a SortOrder table.
225+
- Doesn't support `EvenDistribution`.
224226

225-
<Tabs groupId='language' queryString>
226-
<TabItem value="json" label="JSON">
227-
228-
```json
229-
{
230-
"strategy": "none",
231-
"number": 0,
232-
"expressions": []
233-
}
234-
```
235-
236-
</TabItem>
237-
<TabItem value="java" label="Java">
238-
239-
```java
240-
Distributions.NONE;
241-
```
242-
243-
</TabItem>
244-
</Tabs>
245-
246-
- Support `HashDistribution`, Hash distribute by partition key.
247-
248-
<Tabs groupId='language' queryString>
249-
<TabItem value="json" label="JSON">
250-
251-
```json
252-
{
253-
"strategy": "hash",
254-
"number": 0,
255-
"expressions": []
256-
}
257-
```
258-
</TabItem>
259-
<TabItem value="java" label="Java">
260-
261-
```java
262-
Distributions.HASH;
263-
```
264-
265-
</TabItem>
266-
</Tabs>
267-
268-
- Support `RangeDistribution`, You can pass `range` as values through the API. Range distribute by partition key or sort key if table has an SortOrder.
269-
270-
<Tabs groupId='language' queryString>
271-
<TabItem value="json" label="JSON">
272-
273-
```json
274-
{
275-
"strategy": "range",
276-
"number": 0,
277-
"expressions": []
278-
}
279-
```
280-
281-
</TabItem>
282-
<TabItem value="java" label="Java">
283-
284-
```java
285-
Distributions.RANGE;
286-
```
287-
288-
</TabItem>
289-
</Tabs>
290-
291-
:::info
292-
Iceberg automatically distributes the data according to the partition or table sort order. It is forbidden to specify distribution expressions.
293-
:::
294227
:::info
295-
Apache Iceberg doesn't support Gravitino `EvenDistribution` type.
228+
If you doesn't specify distribution expressions, the table distribution will be adjusted to `RangeDistribution` for a sort order table, to `HashDistribution` for a partition table.
296229
:::
297230

298231
### Table column types

0 commit comments

Comments
 (0)