Skip to content

Commit

Permalink
feat(iceberg): improve concurrency control and resilience (datahub-pr…
Browse files Browse the repository at this point in the history
  • Loading branch information
ksrinath authored and PeteMango committed Mar 3, 2025
1 parent 92372f5 commit 0606ab1
Show file tree
Hide file tree
Showing 12 changed files with 2,111 additions and 513 deletions.
1 change: 1 addition & 0 deletions metadata-service/iceberg-catalog/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ dependencies {
implementation project(':metadata-models')
implementation project(':metadata-utils')
implementation project(':metadata-operation-context')
implementation project(':metadata-io')
implementation project(':metadata-integration:java:datahub-schematron:lib')
implementation 'org.apache.iceberg:iceberg-core:1.6.1'
implementation 'org.apache.iceberg:iceberg-aws:1.6.1'
Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package io.datahubproject.iceberg.catalog;

import static com.linkedin.metadata.Constants.*;
import static com.linkedin.metadata.utils.GenericRecordUtils.serializeAspect;
import static io.datahubproject.iceberg.catalog.Utils.*;

import com.google.common.util.concurrent.Striped;
import com.linkedin.common.AuditStamp;
import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.FabricType;
import com.linkedin.common.Status;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.common.urn.Urn;
import com.linkedin.container.Container;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataplatforminstance.IcebergWarehouseInfo;
import com.linkedin.dataset.DatasetProperties;
import com.linkedin.dataset.IcebergCatalogInfo;
import com.linkedin.entity.EnvelopedAspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.aspect.batch.AspectsBatch;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.metadata.entity.IngestResult;
import com.linkedin.platformresource.PlatformResourceInfo;
import com.linkedin.secret.DataHubSecretValue;
import com.linkedin.util.Pair;
Expand All @@ -24,13 +25,13 @@
import io.datahubproject.metadata.services.SecretService;
import java.net.URISyntaxException;
import java.util.*;
import java.util.concurrent.locks.Lock;
import lombok.Getter;
import lombok.SneakyThrows;
import org.apache.iceberg.CatalogUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.exceptions.*;

@Slf4j
public class DataHubIcebergWarehouse {

public static final String DATASET_ICEBERG_METADATA_ASPECT_NAME = "icebergCatalogInfo";
Expand All @@ -47,11 +48,8 @@ public class DataHubIcebergWarehouse {

@Getter private final String platformInstance;

// TODO: Need to handle locks for deployments with multiple GMS replicas.
private static final Striped<Lock> resourceLocks =
Striped.lazyWeakLock(Runtime.getRuntime().availableProcessors() * 2);

private DataHubIcebergWarehouse(
@VisibleForTesting
DataHubIcebergWarehouse(
String platformInstance,
IcebergWarehouseInfo icebergWarehouse,
EntityService entityService,
Expand Down Expand Up @@ -121,39 +119,96 @@ public String getDataRoot() {
return icebergWarehouse.getDataRoot();
}

@SneakyThrows
public Optional<DatasetUrn> getDatasetUrn(TableIdentifier tableIdentifier) {
Urn resourceUrn = resourceUrn(tableIdentifier);
PlatformResourceInfo platformResourceInfo =
(PlatformResourceInfo)
entityService.getLatestAspect(
operationContext, resourceUrn, PLATFORM_RESOURCE_INFO_ASPECT_NAME);
if (platformResourceInfo == null) {
Optional<PlatformResourceInfo> platformResourceInfo =
getLatestAspectNonRemoved(resourceUrn, PLATFORM_RESOURCE_INFO_ASPECT_NAME);

if (platformResourceInfo.isEmpty()) {
return Optional.empty();
}
try {
return Optional.of(DatasetUrn.createFromString(platformResourceInfo.getPrimaryKey()));
} catch (URISyntaxException e) {
throw new RuntimeException("Invalid dataset urn " + platformResourceInfo.getPrimaryKey(), e);

return Optional.of(DatasetUrn.createFromString(platformResourceInfo.get().getPrimaryKey()));
}

/**
 * Looks up the latest value of {@code aspectName} for {@code urn}, treating an entity whose
 * status aspect is marked removed as if it had no such aspect.
 *
 * <p>The status aspect and the requested aspect are fetched together in one {@code
 * getLatestAspects} call. If any returned {@link Status} reports {@code isRemoved()}, the result
 * is empty even when the requested aspect is present.
 *
 * @param urn entity whose aspect is wanted
 * @param aspectName name of the aspect to return
 * @param <T> type the caller expects; the cast is unchecked, so it must match {@code aspectName}
 * @return the aspect, or empty when nothing was returned for {@code urn} or the entity is removed
 */
private <T extends RecordTemplate> Optional<T> getLatestAspectNonRemoved(
    Urn urn, String aspectName) {
  Map<Urn, List<RecordTemplate>> fetched =
      entityService.getLatestAspects(
          operationContext, Set.of(urn), Set.of(STATUS_ASPECT_NAME, aspectName), false);

  // A null map, a map without this urn, and an empty list all end up as "not found".
  List<RecordTemplate> candidates = fetched == null ? null : fetched.get(urn);
  if (candidates == null) {
    return Optional.empty();
  }

  T found = null;
  for (RecordTemplate candidate : candidates) {
    if (!(candidate instanceof Status)) {
      // Anything that is not the status aspect is taken to be the requested aspect.
      found = (T) candidate;
      continue;
    }
    if (((Status) candidate).isRemoved()) {
      // Removed entity: hide the aspect regardless of where it sat in the list.
      return Optional.empty();
    }
  }

  return Optional.ofNullable(found);
}

public IcebergCatalogInfo getIcebergMetadata(TableIdentifier tableIdentifier) {
/**
 * Enveloped-aspect counterpart of the non-removed lookup: returns the latest {@link
 * EnvelopedAspect} named {@code aspectName} for {@code urn}, unless the entity's status aspect is
 * marked removed.
 *
 * <p>The status aspect is requested in the same {@code getLatestEnvelopedAspects} call and is
 * recognised by its aspect name; its payload is wrapped in a {@link Status} to read the removed
 * flag.
 *
 * @param urn entity whose aspect is wanted
 * @param aspectName name of the aspect to return
 * @return the enveloped aspect, or empty when nothing was returned for {@code urn} or the entity
 *     is removed
 * @throws URISyntaxException declared by this method's signature; presumably propagated from the
 *     entity service call (confirm against {@code EntityService})
 */
private Optional<EnvelopedAspect> getLatestEnvelopedAspectNonRemoved(Urn urn, String aspectName)
    throws URISyntaxException {
  Map<Urn, List<EnvelopedAspect>> fetched =
      entityService.getLatestEnvelopedAspects(
          operationContext, Set.of(urn), Set.of(STATUS_ASPECT_NAME, aspectName), false);

  // A null map, a map without this urn, and an empty list all end up as "not found".
  List<EnvelopedAspect> candidates = fetched == null ? null : fetched.get(urn);
  if (candidates == null) {
    return Optional.empty();
  }

  EnvelopedAspect found = null;
  for (EnvelopedAspect candidate : candidates) {
    boolean isStatusAspect = STATUS_ASPECT_NAME.equals(candidate.getName());
    if (!isStatusAspect) {
      found = candidate;
      continue;
    }
    Status entityStatus = new Status(candidate.getValue().data());
    if (entityStatus.isRemoved()) {
      // Removed entity: hide the aspect regardless of where it sat in the list.
      return Optional.empty();
    }
  }

  return Optional.ofNullable(found);
}

public Optional<IcebergCatalogInfo> getIcebergMetadata(TableIdentifier tableIdentifier) {
Optional<DatasetUrn> datasetUrn = getDatasetUrn(tableIdentifier);
if (datasetUrn.isEmpty()) {
return null;
return Optional.empty();
}

IcebergCatalogInfo icebergMeta =
(IcebergCatalogInfo)
entityService.getLatestAspect(
operationContext, datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);
Optional<IcebergCatalogInfo> icebergMeta =
getLatestAspectNonRemoved(datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);

if (icebergMeta == null) {
throw new IllegalStateException(
if (icebergMeta.isEmpty()) {
// possibly some deletion cleanup is pending; log error & return as if dataset doesn't exist.
log.error(
String.format(
"IcebergMetadata not found for resource %s, dataset %s",
resourceUrn(tableIdentifier), datasetUrn.get()));
}

return icebergMeta;
}

Expand All @@ -165,19 +220,19 @@ public Pair<EnvelopedAspect, DatasetUrn> getIcebergMetadataEnveloped(
}

try {
EnvelopedAspect existingEnveloped =
entityService.getLatestEnvelopedAspect(
operationContext,
DATASET_ENTITY_NAME,
datasetUrn.get(),
DATASET_ICEBERG_METADATA_ASPECT_NAME);
if (existingEnveloped == null) {
throw new IllegalStateException(
Optional<EnvelopedAspect> existingEnveloped =
getLatestEnvelopedAspectNonRemoved(
datasetUrn.get(), DATASET_ICEBERG_METADATA_ASPECT_NAME);
if (existingEnveloped.isEmpty()) {
// possibly some deletion cleanup is pending; log error & return as if dataset doesn't
// exist.
log.error(
String.format(
"IcebergMetadata not found for resource %s, dataset %s",
resourceUrn(tableIdentifier), datasetUrn.get()));
return null;
}
return Pair.of(existingEnveloped, datasetUrn.get());
return Pair.of(existingEnveloped.get(), datasetUrn.get());
} catch (Exception e) {
throw new RuntimeException(
"Error fetching IcebergMetadata aspect for dataset " + datasetUrn.get(), e);
Expand All @@ -186,79 +241,121 @@ public Pair<EnvelopedAspect, DatasetUrn> getIcebergMetadataEnveloped(

public boolean deleteDataset(TableIdentifier tableIdentifier) {
Urn resourceUrn = resourceUrn(tableIdentifier);
if (!entityService.exists(operationContext, resourceUrn)) {
return false;
}

// guard against concurrent modifications that depend on the resource (rename table/view)
Lock lock = resourceLocks.get(resourceUrn);
lock.lock();
try {
if (!entityService.exists(operationContext, resourceUrn)) {
return false;
}
Optional<DatasetUrn> urn = getDatasetUrn(tableIdentifier);
Optional<DatasetUrn> datasetUrn = getDatasetUrn(tableIdentifier);
if (datasetUrn.isEmpty()) {
log.warn("Dataset urn not found for platform resource {}; cleaning up resource", resourceUrn);
entityService.deleteUrn(operationContext, resourceUrn);
urn.ifPresent(x -> entityService.deleteUrn(operationContext, x));
return true;
} finally {
lock.unlock();
return false;
}

IcebergBatch icebergBatch = newIcebergBatch(operationContext);
icebergBatch.softDeleteEntity(resourceUrn, PLATFORM_RESOURCE_ENTITY_NAME);
icebergBatch.softDeleteEntity(datasetUrn.get(), DATASET_ENTITY_NAME);

AspectsBatch aspectsBatch = icebergBatch.asAspectsBatch();
List<IngestResult> ingestResults =
entityService.ingestProposal(operationContext, aspectsBatch, false);

boolean result = true;
for (IngestResult ingestResult : ingestResults) {
if (ingestResult.getResult().isNoOp()) {
result = false;
break;
}
}

entityService.deleteUrn(operationContext, resourceUrn);
entityService.deleteUrn(operationContext, datasetUrn.get());

return result;
}

public DatasetUrn createDataset(
TableIdentifier tableIdentifier, boolean view, AuditStamp auditStamp) {
TableIdentifier tableIdentifier, boolean view, IcebergBatch icebergBatch) {
String datasetName = platformInstance + "." + UUID.randomUUID();
DatasetUrn datasetUrn = new DatasetUrn(platformUrn(), datasetName, fabricType());
createResource(datasetUrn, tableIdentifier, view, auditStamp);

createResource(datasetUrn, tableIdentifier, view, icebergBatch);

return datasetUrn;
}

public DatasetUrn renameDataset(
TableIdentifier fromTableId, TableIdentifier toTableId, boolean view, AuditStamp auditStamp) {
public void renameDataset(TableIdentifier fromTableId, TableIdentifier toTableId, boolean view) {

Optional<DatasetUrn> optDatasetUrn = getDatasetUrn(fromTableId);
if (optDatasetUrn.isEmpty()) {
throw noSuchEntity(view, fromTableId);
}

DatasetUrn datasetUrn = optDatasetUrn.get();

IcebergBatch icebergBatch = newIcebergBatch(operationContext);
icebergBatch.softDeleteEntity(resourceUrn(fromTableId), PLATFORM_RESOURCE_ENTITY_NAME);
createResource(datasetUrn, toTableId, view, icebergBatch);

DatasetProperties datasetProperties =
new DatasetProperties()
.setName(toTableId.name())
.setQualifiedName(fullTableName(platformInstance, toTableId));

// guard against concurrent modifications to the resource (other renames, deletion)
Lock lock = resourceLocks.get(resourceUrn(fromTableId));
lock.lock();
IcebergBatch.EntityBatch datasetBatch =
icebergBatch.updateEntity(datasetUrn, DATASET_ENTITY_NAME);
datasetBatch.aspect(DATASET_PROPERTIES_ASPECT_NAME, datasetProperties);

if (!fromTableId.namespace().equals(toTableId.namespace())) {
Container container =
new Container().setContainer(containerUrn(platformInstance, toTableId.namespace()));
datasetBatch.aspect(CONTAINER_ASPECT_NAME, container);
}

try {
Optional<DatasetUrn> optDatasetUrn = getDatasetUrn(fromTableId);
if (optDatasetUrn.isEmpty()) {
if (view) {
throw new NoSuchViewException(
"No such view %s", fullTableName(platformInstance, fromTableId));
} else {
throw new NoSuchTableException(
"No such table %s", fullTableName(platformInstance, fromTableId));
}
AspectsBatch aspectsBatch = icebergBatch.asAspectsBatch();
entityService.ingestProposal(operationContext, aspectsBatch, false);
} catch (ValidationException e) {
if (!entityService.exists(operationContext, resourceUrn(fromTableId), false)) {
// someone else deleted "fromTable" before we could get through
throw noSuchEntity(view, fromTableId);
}

DatasetUrn datasetUrn = optDatasetUrn.get();
try {
createResource(datasetUrn, toTableId, view, auditStamp);
} catch (ValidationException e) {
if (entityService.exists(operationContext, resourceUrn(toTableId), true)) {
throw new AlreadyExistsException(
"%s already exists: %s",
view ? "View" : "Table", fullTableName(platformInstance, toTableId));
}
entityService.deleteUrn(operationContext, resourceUrn(fromTableId));
return datasetUrn;
} finally {
lock.unlock();
throw new IllegalStateException(
String.format(
"Rename operation failed inexplicably, from %s to %s in warehouse %s",
fromTableId, toTableId, platformInstance));
}

entityService.deleteUrn(operationContext, resourceUrn(fromTableId));
}

/**
 * Builds (but does not throw) the "not found" exception matching the kind of entity.
 *
 * @param view {@code true} to produce a {@link NoSuchViewException}, {@code false} for a {@link
 *     NoSuchTableException}
 * @param tableIdentifier identifier of the missing table or view, reported in the message via
 *     {@code fullTableName} for this warehouse's platform instance
 * @return the exception for the caller to throw
 */
private RuntimeException noSuchEntity(boolean view, TableIdentifier tableIdentifier) {
  String qualifiedName = fullTableName(platformInstance, tableIdentifier);
  if (view) {
    return new NoSuchViewException("No such view %s", qualifiedName);
  }
  return new NoSuchTableException("No such table %s", qualifiedName);
}

private void createResource(
DatasetUrn datasetUrn, TableIdentifier tableIdentifier, boolean view, AuditStamp auditStamp) {
DatasetUrn datasetUrn,
TableIdentifier tableIdentifier,
boolean view,
IcebergBatch icebergBatch) {
PlatformResourceInfo resourceInfo =
new PlatformResourceInfo().setPrimaryKey(datasetUrn.toString());
resourceInfo.setResourceType(view ? "icebergView" : "icebergTable");

MetadataChangeProposal mcp = new MetadataChangeProposal();
mcp.setEntityUrn(resourceUrn(tableIdentifier));
mcp.setEntityType(PLATFORM_RESOURCE_ENTITY_NAME);
mcp.setAspectName(PLATFORM_RESOURCE_INFO_ASPECT_NAME);
mcp.setChangeType(ChangeType.CREATE_ENTITY);
mcp.setAspect(serializeAspect(resourceInfo));

entityService.ingestProposal(operationContext, mcp, auditStamp, false);
icebergBatch.createEntity(
resourceUrn(tableIdentifier),
PLATFORM_RESOURCE_ENTITY_NAME,
PLATFORM_RESOURCE_INFO_ASPECT_NAME,
resourceInfo);
}

private FabricType fabricType() {
Expand All @@ -268,8 +365,15 @@ private FabricType fabricType() {
@SneakyThrows
private Urn resourceUrn(TableIdentifier tableIdentifier) {
return Urn.createFromString(
String.format(
"urn:li:platformResource:%s.%s",
PLATFORM_NAME, CatalogUtil.fullTableName(platformInstance, tableIdentifier)));
String.format("urn:li:platformResource:%s.%s", PLATFORM_NAME, tableName(tableIdentifier)));
}

/**
 * Convenience wrapper that resolves {@code tableIdentifier} to a name via {@code fullTableName},
 * using this warehouse's platform instance.
 *
 * @param tableIdentifier table or view identifier to name
 * @return the name produced by {@code fullTableName}
 */
private String tableName(TableIdentifier tableIdentifier) {
  final String qualifiedName = fullTableName(platformInstance, tableIdentifier);
  return qualifiedName;
}

/**
 * Creates a fresh {@link IcebergBatch} bound to the given operation context.
 *
 * <p>Package-private and marked {@code @VisibleForTesting}; presumably a seam that lets tests
 * substitute the batch (confirm against the test suite).
 *
 * @param operationContext context handed to the new batch
 * @return a new, empty batch
 */
@VisibleForTesting
IcebergBatch newIcebergBatch(OperationContext operationContext) {
  final IcebergBatch batch = new IcebergBatch(operationContext);
  return batch;
}
}
Loading

0 comments on commit 0606ab1

Please sign in to comment.