Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#6566] improvement(core): Add the cache mechanism for metalake and use cache to load in-use information. #6569

Merged
merged 22 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions core/src/main/java/org/apache/gravitino/GravitinoEnv.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,8 @@ public class GravitinoEnv {

private CatalogManager catalogManager;

private MetalakeManager metalakeManager;

private SchemaDispatcher schemaDispatcher;

private TableDispatcher tableDispatcher;
Expand Down Expand Up @@ -390,6 +392,10 @@ public void shutdown() {
eventListenerManager.stop();
}

if (metalakeManager != null) {
metalakeManager.close();
}

LOG.info("Gravitino Environment is shut down.");
}

Expand All @@ -414,10 +420,13 @@ private void initGravitinoServerComponents() {
// create and initialize a random id generator
this.idGenerator = new RandomIdGenerator();

// Tree lock
this.lockManager = new LockManager(config);

// Create and initialize metalake related modules, the operation chain is:
// MetalakeEventDispatcher -> MetalakeNormalizeDispatcher -> MetalakeHookDispatcher ->
// MetalakeManager
MetalakeDispatcher metalakeManager = new MetalakeManager(entityStore, idGenerator);
this.metalakeManager = new MetalakeManager(entityStore, idGenerator);
MetalakeHookDispatcher metalakeHookDispatcher = new MetalakeHookDispatcher(metalakeManager);
MetalakeNormalizeDispatcher metalakeNormalizeDispatcher =
new MetalakeNormalizeDispatcher(metalakeHookDispatcher);
Expand Down Expand Up @@ -498,9 +507,6 @@ private void initGravitinoServerComponents() {
this.auxServiceManager = new AuxiliaryServiceManager();
this.auxServiceManager.serviceInit(config);

// Tree lock
this.lockManager = new LockManager(config);

// Create and initialize Tag related modules
this.tagDispatcher = new TagEventDispatcher(eventBus, new TagManager(idGenerator, entityStore));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ private ModelCatalog asModels() {

private final Config config;

@VisibleForTesting final Cache<NameIdentifier, CatalogWrapper> catalogCache;
@VisibleForTesting static Cache<NameIdentifier, CatalogWrapper> catalogCache;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we make it static? I assume there will be only one CatalogManager, so there should be only one catalogCache, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we make it static?

The method check catalogInUse and metalakeInUse are all static. If we want to use cache for them, we need to change it to static

assume there will be only one CatalogManager, so there should be only one catalogCache, right?

Yes, there will be only one cache and all catalogs shares the same instance, It's not a big problem I think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should named CATALOG_CACHE?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should it be final?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, let me change it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we use static final flag, then Gravitino server configuraoin like gravitino.catalog.cache.evictionIntervalMs should be remove as we can't used it in static code block, Is that acceptable to you?


private final EntityStore store;

Expand All @@ -281,7 +281,7 @@ public CatalogManager(Config config, EntityStore store, IdGenerator idGenerator)
this.idGenerator = idGenerator;

long cacheEvictionIntervalInMs = config.get(Configs.CATALOG_CACHE_EVICTION_INTERVAL_MS);
this.catalogCache =
catalogCache =
Caffeine.newBuilder()
.expireAfterAccess(cacheEvictionIntervalInMs, TimeUnit.MILLISECONDS)
.removalListener(
Expand Down Expand Up @@ -848,8 +848,13 @@ private static boolean catalogInUse(EntityStore store, NameIdentifier ident)

private static boolean getCatalogInUseValue(EntityStore store, NameIdentifier catalogIdent) {
try {
CatalogEntity catalogEntity =
store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class);
CatalogWrapper wrapper = catalogCache.getIfPresent(catalogIdent);
CatalogEntity catalogEntity;
if (wrapper != null) {
catalogEntity = wrapper.catalog.entity();
} else {
catalogEntity = store.get(catalogIdent, EntityType.CATALOG, CatalogEntity.class);
}
return (boolean)
BASIC_CATALOG_PROPERTIES_METADATA.getOrDefault(
catalogEntity.getProperties(), PROPERTY_IN_USE);
Expand Down
186 changes: 121 additions & 65 deletions core/src/main/java/org/apache/gravitino/metalake/MetalakeManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,21 @@

import static org.apache.gravitino.Metalake.PROPERTY_IN_USE;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Scheduler;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.Closeable;
import java.io.IOException;
import java.time.Instant;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.gravitino.Entity.EntityType;
import org.apache.gravitino.EntityAlreadyExistsException;
import org.apache.gravitino.EntityStore;
Expand Down Expand Up @@ -54,7 +62,7 @@
import org.slf4j.LoggerFactory;

/** Manages Metalakes within the Apache Gravitino system. */
public class MetalakeManager implements MetalakeDispatcher {
public class MetalakeManager implements MetalakeDispatcher, Closeable {

private static final String METALAKE_DOES_NOT_EXIST_MSG = "Metalake %s does not exist";

Expand All @@ -64,6 +72,28 @@ public class MetalakeManager implements MetalakeDispatcher {

private final IdGenerator idGenerator;

// Currently, there will be only one MetalakeManager instance in the system. In this case
// we can clear or close the cache when the instance is destroyed.
@VisibleForTesting
static final Cache<NameIdentifier, BaseMetalake> METALAKE_CACHE =
Caffeine.newBuilder()
.expireAfterAccess(24, TimeUnit.HOURS)
.removalListener((k, v, c) -> LOG.info("Closing metalake {}.", k))
.scheduler(
Scheduler.forScheduledExecutorService(
new ScheduledThreadPoolExecutor(
1,
new ThreadFactoryBuilder()
.setDaemon(true)
.setNameFormat("metalake-cleaner-%d")
.build())))
.build();

@Override
public void close() {
METALAKE_CACHE.invalidateAll();
}

/**
* Constructs a MetalakeManager instance.
*
Expand All @@ -73,6 +103,13 @@ public class MetalakeManager implements MetalakeDispatcher {
public MetalakeManager(EntityStore store, IdGenerator idGenerator) {
this.store = store;
this.idGenerator = idGenerator;

// pre-load all metalakes and put them into cache, this is useful when user load schema/table
// directly without list/get metalake first.
BaseMetalake[] metalakes = listMetalakes();
for (BaseMetalake metalake : metalakes) {
METALAKE_CACHE.put(metalake.nameIdentifier(), metalake);
}
}

/**
Expand Down Expand Up @@ -103,10 +140,12 @@ public static void checkMetalake(NameIdentifier ident, EntityStore store)
public static boolean metalakeInUse(EntityStore store, NameIdentifier ident)
throws NoSuchMetalakeException {
try {
BaseMetalake metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class);
BaseMetalake metalake = METALAKE_CACHE.getIfPresent(ident);
if (metalake == null) {
metalake = store.get(ident, EntityType.METALAKE, BaseMetalake.class);
}
return (boolean)
metalake.propertiesMetadata().getOrDefault(metalake.properties(), PROPERTY_IN_USE);

} catch (NoSuchEntityException e) {
LOG.warn("Metalake {} does not exist", ident, e);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
Expand Down Expand Up @@ -149,20 +188,25 @@ public BaseMetalake[] listMetalakes() {
*/
@Override
public BaseMetalake loadMetalake(NameIdentifier ident) throws NoSuchMetalakeException {
try {
BaseMetalake baseMetalake =
TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() -> store.get(ident, EntityType.METALAKE, BaseMetalake.class));
return newMetalakeWithResolvedProperties(baseMetalake);
} catch (NoSuchEntityException e) {
LOG.warn("Metalake {} does not exist", ident, e);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
} catch (IOException ioe) {
LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe);
throw new RuntimeException(ioe);
}
return TreeLockUtils.doWithTreeLock(
ident,
LockType.READ,
() ->
METALAKE_CACHE.get(
ident,
k -> {
try {
BaseMetalake baseMetalake =
store.get(ident, EntityType.METALAKE, BaseMetalake.class);
return newMetalakeWithResolvedProperties(baseMetalake);
} catch (NoSuchEntityException e) {
LOG.warn("Metalake {} does not exist", ident, e);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
} catch (IOException ioe) {
LOG.error("Loading Metalake {} failed due to storage issues", ident, ioe);
throw new RuntimeException(ioe);
}
}));
}

private BaseMetalake newMetalakeWithResolvedProperties(BaseMetalake metalakeEntity) {
Expand Down Expand Up @@ -222,6 +266,7 @@ public BaseMetalake createMetalake(
() -> {
try {
store.put(metalake, false /* overwritten */);
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(metalake));
return metalake;
} catch (EntityAlreadyExistsException | AlreadyExistsException e) {
LOG.warn("Metalake {} already exists", ident, e);
Expand Down Expand Up @@ -253,22 +298,25 @@ public BaseMetalake alterMetalake(NameIdentifier ident, MetalakeChange... change
throw new MetalakeNotInUseException(
"Metalake %s is not in use, please enable it first", ident);
}

return store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);
Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
builder = updateEntity(builder, newProps, changes);

return builder.build();
});

METALAKE_CACHE.invalidate(ident);
BaseMetalake baseMetalake =
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);
Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
builder = updateEntity(builder, newProps, changes);

return builder.build();
});
METALAKE_CACHE.put(
baseMetalake.nameIdentifier(), newMetalakeWithResolvedProperties(baseMetalake));
return baseMetalake;
} catch (NoSuchEntityException ne) {
LOG.warn("Metalake {} does not exist", ident, ne);
throw new NoSuchMetalakeException(METALAKE_DOES_NOT_EXIST_MSG, ident);
Expand Down Expand Up @@ -305,6 +353,8 @@ public boolean dropMetalake(NameIdentifier ident, boolean force)
"Metalake %s is in use, please disable it first or use force option", ident);
}

METALAKE_CACHE.invalidate(ident);

List<CatalogEntity> catalogEntities =
store.list(Namespace.of(ident.name()), CatalogEntity.class, EntityType.CATALOG);
if (!catalogEntities.isEmpty() && !force) {
Expand All @@ -331,22 +381,25 @@ public void enableMetalake(NameIdentifier ident) throws NoSuchMetalakeException
try {
boolean inUse = metalakeInUse(store, ident);
if (!inUse) {
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "true");
builder.withProperties(newProps);

return builder.build();
});
METALAKE_CACHE.invalidate(ident);
BaseMetalake baseMetalake =
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "true");
builder.withProperties(newProps);

return builder.build();
});
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake));
}

return null;
Expand All @@ -365,22 +418,25 @@ public void disableMetalake(NameIdentifier ident) throws NoSuchMetalakeException
try {
boolean inUse = metalakeInUse(store, ident);
if (inUse) {
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "false");
builder.withProperties(newProps);

return builder.build();
});
METALAKE_CACHE.invalidate(ident);
BaseMetalake baseMetalake =
store.update(
ident,
BaseMetalake.class,
EntityType.METALAKE,
metalake -> {
BaseMetalake.Builder builder = newMetalakeBuilder(metalake);

Map<String, String> newProps =
metalake.properties() == null
? Maps.newHashMap()
: Maps.newHashMap(metalake.properties());
newProps.put(PROPERTY_IN_USE, "false");
builder.withProperties(newProps);

return builder.build();
});
METALAKE_CACHE.put(ident, newMetalakeWithResolvedProperties(baseMetalake));
}
return null;
} catch (IOException e) {
Expand Down
Loading