Skip to content

Commit a4190e1

Browse files
authored
[#5739] feat(model-catalog): Implement the model catalog logic (#5848)
### What changes were proposed in this pull request?

This PR adds the model catalog implementation.

### Why are the changes needed?

This is part of the work to support model management in Gravitino.

Fix: #5739

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added UTs to cover the changes.
1 parent 5e9919e commit a4190e1

24 files changed

+1812
-187
lines changed

api/src/main/java/org/apache/gravitino/model/ModelCatalog.java

+12-8
Original file line numberDiff line numberDiff line change
@@ -79,12 +79,11 @@ default boolean modelExists(NameIdentifier ident) {
7979
* @param properties The properties of the model. The properties are optional and can be null or
8080
* empty.
8181
* @return The registered model object.
82+
* @throws NoSuchSchemaException If the schema does not exist.
8283
* @throws ModelAlreadyExistsException If the model is already registered.
8384
*/
84-
default Model registerModel(NameIdentifier ident, String comment, Map<String, String> properties)
85-
throws ModelAlreadyExistsException {
86-
return registerModel(ident, null, new String[0], comment, properties);
87-
}
85+
Model registerModel(NameIdentifier ident, String comment, Map<String, String> properties)
86+
throws NoSuchSchemaException, ModelAlreadyExistsException;
8887

8988
/**
9089
* Register a model in the catalog if the model does not exist, otherwise the {@link
@@ -99,16 +98,22 @@ default Model registerModel(NameIdentifier ident, String comment, Map<String, St
9998
* @param properties The properties of the model. The properties are optional and can be null or
10099
* empty.
101100
* @return The registered model object.
101+
* @throws NoSuchSchemaException If the schema does not exist when registering the model.
102102
* @throws ModelAlreadyExistsException If the model is already registered.
103103
* @throws ModelVersionAliasesAlreadyExistException If the aliases already exist in the model.
104104
*/
105-
Model registerModel(
105+
default Model registerModel(
106106
NameIdentifier ident,
107107
String uri,
108108
String[] aliases,
109109
String comment,
110110
Map<String, String> properties)
111-
throws ModelAlreadyExistsException, ModelVersionAliasesAlreadyExistException;
111+
throws NoSuchSchemaException, ModelAlreadyExistsException,
112+
ModelVersionAliasesAlreadyExistException {
113+
Model model = registerModel(ident, comment, properties);
114+
linkModelVersion(ident, uri, aliases, comment, properties);
115+
return model;
116+
}
112117

113118
/**
114119
* Delete the model from the catalog. If the model does not exist, return false. Otherwise, return
@@ -197,11 +202,10 @@ default boolean modelVersionExists(NameIdentifier ident, String alias) {
197202
* @param comment The comment of the model version. The comment is optional and can be null.
198203
* @param properties The properties of the model version. The properties are optional and can be
199204
* null or empty.
200-
* @return The model version object.
201205
* @throws NoSuchModelException If the model does not exist.
202206
* @throws ModelVersionAliasesAlreadyExistException If the aliases already exist in the model.
203207
*/
204-
ModelVersion linkModelVersion(
208+
void linkModelVersion(
205209
NameIdentifier ident,
206210
String uri,
207211
String[] aliases,

catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/HadoopCatalogOperations.java

+31-112
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,12 @@
4444
import org.apache.gravitino.audit.CallerContext;
4545
import org.apache.gravitino.audit.FilesetAuditConstants;
4646
import org.apache.gravitino.audit.FilesetDataOperation;
47+
import org.apache.gravitino.catalog.ManagedSchemaOperations;
4748
import org.apache.gravitino.catalog.hadoop.fs.FileSystemProvider;
4849
import org.apache.gravitino.catalog.hadoop.fs.FileSystemUtils;
4950
import org.apache.gravitino.connector.CatalogInfo;
5051
import org.apache.gravitino.connector.CatalogOperations;
5152
import org.apache.gravitino.connector.HasPropertyMetadata;
52-
import org.apache.gravitino.connector.SupportsSchemas;
5353
import org.apache.gravitino.exceptions.AlreadyExistsException;
5454
import org.apache.gravitino.exceptions.FilesetAlreadyExistsException;
5555
import org.apache.gravitino.exceptions.GravitinoRuntimeException;
@@ -74,7 +74,8 @@
7474
import org.slf4j.Logger;
7575
import org.slf4j.LoggerFactory;
7676

77-
public class HadoopCatalogOperations implements CatalogOperations, SupportsSchemas, FilesetCatalog {
77+
public class HadoopCatalogOperations extends ManagedSchemaOperations
78+
implements CatalogOperations, FilesetCatalog {
7879
private static final String SCHEMA_DOES_NOT_EXIST_MSG = "Schema %s does not exist";
7980
private static final String FILESET_DOES_NOT_EXIST_MSG = "Fileset %s does not exist";
8081
private static final String SLASH = "/";
@@ -104,7 +105,8 @@ public HadoopCatalogOperations() {
104105
this(GravitinoEnv.getInstance().entityStore());
105106
}
106107

107-
public EntityStore getStore() {
108+
@Override
109+
public EntityStore store() {
108110
return store;
109111
}
110112

@@ -451,19 +453,6 @@ public String getFileLocation(NameIdentifier ident, String subPath)
451453
return fileLocation;
452454
}
453455

454-
@Override
455-
public NameIdentifier[] listSchemas(Namespace namespace) throws NoSuchCatalogException {
456-
try {
457-
List<SchemaEntity> schemas =
458-
store.list(namespace, SchemaEntity.class, Entity.EntityType.SCHEMA);
459-
return schemas.stream()
460-
.map(s -> NameIdentifier.of(namespace, s.name()))
461-
.toArray(NameIdentifier[]::new);
462-
} catch (IOException e) {
463-
throw new RuntimeException("Failed to list schemas under namespace " + namespace, e);
464-
}
465-
}
466-
467456
@Override
468457
public Schema createSchema(NameIdentifier ident, String comment, Map<String, String> properties)
469458
throws NoSuchCatalogException, SchemaAlreadyExistsException {
@@ -496,53 +485,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map<String, Str
496485
}
497486
}
498487

499-
StringIdentifier stringId = StringIdentifier.fromProperties(properties);
500-
Preconditions.checkNotNull(stringId, "Property String identifier should not be null");
501-
502-
SchemaEntity schemaEntity =
503-
SchemaEntity.builder()
504-
.withName(ident.name())
505-
.withId(stringId.id())
506-
.withNamespace(ident.namespace())
507-
.withComment(comment)
508-
.withProperties(properties)
509-
.withAuditInfo(
510-
AuditInfo.builder()
511-
.withCreator(PrincipalUtils.getCurrentPrincipal().getName())
512-
.withCreateTime(Instant.now())
513-
.build())
514-
.build();
515-
try {
516-
store.put(schemaEntity, true /* overwrite */);
517-
} catch (IOException ioe) {
518-
throw new RuntimeException("Failed to create schema " + ident, ioe);
519-
}
520-
521-
return HadoopSchema.builder()
522-
.withName(ident.name())
523-
.withComment(comment)
524-
.withProperties(schemaEntity.properties())
525-
.withAuditInfo(schemaEntity.auditInfo())
526-
.build();
527-
}
528-
529-
@Override
530-
public Schema loadSchema(NameIdentifier ident) throws NoSuchSchemaException {
531-
try {
532-
SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
533-
534-
return HadoopSchema.builder()
535-
.withName(ident.name())
536-
.withComment(schemaEntity.comment())
537-
.withProperties(schemaEntity.properties())
538-
.withAuditInfo(schemaEntity.auditInfo())
539-
.build();
540-
541-
} catch (NoSuchEntityException exception) {
542-
throw new NoSuchSchemaException(exception, SCHEMA_DOES_NOT_EXIST_MSG, ident);
543-
} catch (IOException ioe) {
544-
throw new RuntimeException("Failed to load schema " + ident, ioe);
545-
}
488+
return super.createSchema(ident, comment, properties);
546489
}
547490

548491
@Override
@@ -556,32 +499,7 @@ public Schema alterSchema(NameIdentifier ident, SchemaChange... changes)
556499
throw new RuntimeException("Failed to check if schema " + ident + " exists", ioe);
557500
}
558501

559-
try {
560-
SchemaEntity entity =
561-
store.update(
562-
ident,
563-
SchemaEntity.class,
564-
Entity.EntityType.SCHEMA,
565-
schemaEntity -> updateSchemaEntity(ident, schemaEntity, changes));
566-
567-
return HadoopSchema.builder()
568-
.withName(ident.name())
569-
.withComment(entity.comment())
570-
.withProperties(entity.properties())
571-
.withAuditInfo(entity.auditInfo())
572-
.build();
573-
574-
} catch (IOException ioe) {
575-
throw new RuntimeException("Failed to update schema " + ident, ioe);
576-
} catch (NoSuchEntityException nsee) {
577-
throw new NoSuchSchemaException(nsee, SCHEMA_DOES_NOT_EXIST_MSG, ident);
578-
} catch (AlreadyExistsException aee) {
579-
throw new RuntimeException(
580-
"Schema with the same name "
581-
+ ident.name()
582-
+ " already exists, this is unexpected because schema doesn't support rename",
583-
aee);
584-
}
502+
return super.alterSchema(ident, changes);
585503
}
586504

587505
@Override
@@ -600,6 +518,16 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
600518
throw new NonEmptySchemaException("Schema %s is not empty", ident);
601519
}
602520

521+
SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
522+
Map<String, String> properties =
523+
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
524+
Path schemaPath = getSchemaPath(ident.name(), properties);
525+
526+
boolean dropped = super.dropSchema(ident, cascade);
527+
if (!dropped) {
528+
return false;
529+
}
530+
603531
// Delete all the managed filesets no matter whether the storage location is under the
604532
// schema path or not.
605533
// The reason why we delete the managed fileset's storage location one by one is because we
@@ -635,30 +563,21 @@ public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmpty
635563
}
636564
});
637565

638-
SchemaEntity schemaEntity = store.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
639-
Map<String, String> properties =
640-
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
641-
642566
// Delete the schema path if it exists and is empty.
643-
Path schemaPath = getSchemaPath(ident.name(), properties);
644-
// Nothing to delete if the schema path is not set.
645-
if (schemaPath == null) {
646-
return false;
647-
}
648-
649-
FileSystem fs = getFileSystem(schemaPath, conf);
650-
// Nothing to delete if the schema path does not exist.
651-
if (!fs.exists(schemaPath)) {
652-
return false;
653-
}
654-
655-
FileStatus[] statuses = fs.listStatus(schemaPath);
656-
if (statuses.length == 0) {
657-
if (fs.delete(schemaPath, true)) {
658-
LOG.info("Deleted schema {} location {}", ident, schemaPath);
659-
} else {
660-
LOG.warn("Failed to delete schema {} location {}", ident, schemaPath);
661-
return false;
567+
if (schemaPath != null) {
568+
FileSystem fs = getFileSystem(schemaPath, conf);
569+
if (fs.exists(schemaPath)) {
570+
FileStatus[] statuses = fs.listStatus(schemaPath);
571+
if (statuses.length == 0) {
572+
if (fs.delete(schemaPath, true)) {
573+
LOG.info("Deleted schema {} location {}", ident, schemaPath);
574+
} else {
575+
LOG.warn(
576+
"Failed to delete schema {} because it has files/folders under location {}",
577+
ident,
578+
schemaPath);
579+
}
580+
}
662581
}
663582
}
664583

catalogs/catalog-hadoop/src/main/java/org/apache/gravitino/catalog/hadoop/SecureHadoopCatalogOperations.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ public boolean dropFileset(NameIdentifier ident) {
107107
try {
108108
filesetEntity =
109109
hadoopCatalogOperations
110-
.getStore()
110+
.store()
111111
.get(ident, Entity.EntityType.FILESET, FilesetEntity.class);
112112
} catch (NoSuchEntityException e) {
113113
LOG.warn("Fileset {} does not exist", ident);
@@ -143,9 +143,7 @@ public Schema createSchema(NameIdentifier ident, String comment, Map<String, Str
143143
public boolean dropSchema(NameIdentifier ident, boolean cascade) throws NonEmptySchemaException {
144144
try {
145145
SchemaEntity schemaEntity =
146-
hadoopCatalogOperations
147-
.getStore()
148-
.get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
146+
hadoopCatalogOperations.store().get(ident, Entity.EntityType.SCHEMA, SchemaEntity.class);
149147
Map<String, String> properties =
150148
Optional.ofNullable(schemaEntity.properties()).orElse(Collections.emptyMap());
151149

catalogs/catalog-hadoop/src/test/java/org/apache/gravitino/catalog/hadoop/TestHadoopCatalogOperations.java

+3
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,7 @@ public void testDropSchema() throws IOException {
446446
Assertions.assertFalse(fs.exists(schemaPath));
447447

448448
// Test drop non-empty schema with cascade = false
449+
createSchema(name, comment, catalogPath, null);
449450
Fileset fs1 = createFileset("fs1", name, "comment", Fileset.Type.MANAGED, catalogPath, null);
450451
Path fs1Path = new Path(fs1.storageLocation());
451452

@@ -459,6 +460,7 @@ public void testDropSchema() throws IOException {
459460
Assertions.assertFalse(fs.exists(fs1Path));
460461

461462
// Test drop both managed and external filesets
463+
createSchema(name, comment, catalogPath, null);
462464
Fileset fs2 = createFileset("fs2", name, "comment", Fileset.Type.MANAGED, catalogPath, null);
463465
Path fs2Path = new Path(fs2.storageLocation());
464466

@@ -472,6 +474,7 @@ public void testDropSchema() throws IOException {
472474
Assertions.assertTrue(fs.exists(fs3Path));
473475

474476
// Test drop schema with different storage location
477+
createSchema(name, comment, catalogPath, null);
475478
Path fs4Path = new Path(TEST_ROOT_PATH + "/fs4");
476479
createFileset("fs4", name, "comment", Fileset.Type.MANAGED, catalogPath, fs4Path.toString());
477480
ops.dropSchema(id, true);

catalogs/catalog-model/build.gradle.kts

+8-2
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,17 @@ dependencies {
4040
exclude(group = "*")
4141
}
4242

43-
compileOnly(libs.guava)
44-
43+
implementation(libs.guava)
4544
implementation(libs.slf4j.api)
4645

46+
testImplementation(project(":clients:client-java"))
47+
testImplementation(project(":integration-test-common", "testArtifacts"))
48+
testImplementation(project(":server"))
49+
testImplementation(project(":server-common"))
50+
4751
testImplementation(libs.bundles.log4j)
52+
testImplementation(libs.commons.io)
53+
testImplementation(libs.commons.lang3)
4854
testImplementation(libs.mockito.core)
4955
testImplementation(libs.mockito.inline)
5056
testImplementation(libs.junit.jupiter.api)

catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalog.java catalogs/catalog-model/src/main/java/org/apache/gravitino/catalog/model/ModelCatalogImpl.java

+5-2
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020

2121
import java.util.Map;
2222
import org.apache.gravitino.CatalogProvider;
23+
import org.apache.gravitino.EntityStore;
24+
import org.apache.gravitino.GravitinoEnv;
2325
import org.apache.gravitino.connector.BaseCatalog;
2426
import org.apache.gravitino.connector.CatalogOperations;
2527
import org.apache.gravitino.connector.PropertiesMetadata;
2628
import org.apache.gravitino.connector.capability.Capability;
2729

28-
public class ModelCatalog extends BaseCatalog<ModelCatalog> {
30+
public class ModelCatalogImpl extends BaseCatalog<ModelCatalogImpl> {
2931

3032
private static final ModelCatalogPropertiesMetadata CATALOG_PROPERTIES_META =
3133
new ModelCatalogPropertiesMetadata();
@@ -43,7 +45,8 @@ public String shortName() {
4345

4446
@Override
4547
protected CatalogOperations newOps(Map<String, String> config) {
46-
return null;
48+
EntityStore store = GravitinoEnv.getInstance().entityStore();
49+
return new ModelCatalogOperations(store);
4750
}
4851

4952
@Override

0 commit comments

Comments
 (0)