diff --git a/src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/EntitySchemaProcessor.cs b/src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/EntitySchemaProcessor.cs index ca4b24db2..3d5c1a824 100644 --- a/src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/EntitySchemaProcessor.cs +++ b/src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/EntitySchemaProcessor.cs @@ -68,33 +68,40 @@ using RadixDlt.NetworkGateway.PostgresIntegration.Models; using System.Collections.Generic; using System.Collections.Immutable; +using System.Diagnostics; using System.Linq; using System.Threading.Tasks; using CoreModel = RadixDlt.CoreApiSdk.Model; namespace RadixDlt.NetworkGateway.PostgresIntegration.LedgerExtension; +internal record struct SchemaDefinitionEntryDbLookup(long EntityId, ValueBytes SchemaHash); internal record struct SchemaChangePointerLookup(long EntityId, long StateVersion); internal record SchemaChangePointer { public List Entries { get; } = new(); + + public List DeletedSchemaHashes { get; } = new(); } internal class EntitySchemaProcessor { private readonly ProcessorContext _context; + private readonly byte _networkId; private readonly ChangeTracker _changes = new(); private readonly Dictionary _mostRecentAggregates = new(); + private readonly Dictionary _existingSchemas = new(); private readonly List _aggregatesToAdd = new(); private readonly List _definitionsToAdd = new(); - public EntitySchemaProcessor(ProcessorContext context) + public EntitySchemaProcessor(ProcessorContext context, byte networkId) { _context = context; + _networkId = networkId; } public void VisitUpsert(CoreModel.Substate substateData, ReferencedEntity referencedEntity, long stateVersion) @@ -108,9 +115,24 @@ public void VisitUpsert(CoreModel.Substate substateData, ReferencedEntity refere } } + public void VisitDelete(CoreModel.SubstateId substateId, ReferencedEntity referencedEntity, long stateVersion) + { + if (substateId.SubstateType == CoreModel.SubstateType.SchemaEntry) + { + var keyHex = ((CoreModel.MapSubstateKey)substateId.SubstateKey).KeyHex; + var schemaHash = ScryptoSborUtils.DataToProgrammaticScryptoSborValueBytes(keyHex.ConvertFromHex(), _networkId); + + _changes + .GetOrAdd(new SchemaChangePointerLookup(referencedEntity.DatabaseId, stateVersion), _ => new SchemaChangePointer()) + .DeletedSchemaHashes + .Add(schemaHash.Hex); + } + } + public async Task LoadDependencies() { _mostRecentAggregates.AddRange(await MostRecentSchemaEntryAggregateHistory()); + _existingSchemas.AddRange(await LoadExistingSchemas()); } public void ProcessChanges() @@ -157,6 +179,29 @@ public void ProcessChanges() aggregate.EntryIds.Insert(0, entryDefinition.Id); } + + foreach (var deletedSchemaHash in change.DeletedSchemaHashes) + { + var entryLookup = new SchemaDefinitionEntryDbLookup(lookup.EntityId, deletedSchemaHash.ConvertFromHex()); + + if (_existingSchemas.TryGetValue(entryLookup, out var previousEntry)) + { + var currentPosition = aggregate.EntryIds.IndexOf(previousEntry.Id); + + if (currentPosition != -1) + { + aggregate.EntryIds.RemoveAt(currentPosition); + } + else + { + throw new UnreachableException($"Unexpected situation where SchemaEntryDefinition with EntityId:{entryLookup.EntityId}, SchemaHash:{deletedSchemaHash} got deleted but wasn't found in aggregate table."); + } + } + else + { + throw new UnreachableException($"Unexpected situation where SchemaEntryDefinition with EntityId:{entryLookup.EntityId}, SchemaHash:{deletedSchemaHash} got deleted but wasn't found in gateway database."); + } + } } } @@ -196,6 +241,31 @@ LIMIT 1 e => e.EntityId); } + private async Task> LoadExistingSchemas() + { + var lookupSet = new HashSet(); + + foreach (var (lookup, change) in _changes.AsEnumerable()) + { + foreach (var deletedSchemaHash in change.DeletedSchemaHashes) + { + lookupSet.Add(new SchemaDefinitionEntryDbLookup(lookup.EntityId, deletedSchemaHash.ConvertFromHex())); + } + } + + if (!lookupSet.Unzip(x => x.EntityId, x => (byte[])x.SchemaHash, out var entityIds, out var schemaHashes)) + { + return ImmutableDictionary.Empty; + } + + return await _context.ReadHelper.LoadDependencies( + @$" +SELECT * +FROM schema_entry_definition +WHERE (entity_id, schema_hash) IN (SELECT UNNEST({entityIds}), UNNEST({schemaHashes}));", + e => new SchemaDefinitionEntryDbLookup(e.EntityId, e.SchemaHash)); + } + private Task CopySchemaEntryDefinitions() => _context.WriteHelper.Copy( _definitionsToAdd, "COPY schema_entry_definition (id, from_state_version, entity_id, schema_hash, schema) FROM STDIN (FORMAT BINARY)", diff --git a/src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/PostgresLedgerExtenderService.cs b/src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/PostgresLedgerExtenderService.cs index 6d06cd87e..1930e333c 100644 --- a/src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/PostgresLedgerExtenderService.cs +++ b/src/RadixDlt.NetworkGateway.PostgresIntegration/LedgerExtension/PostgresLedgerExtenderService.cs @@ -808,7 +808,7 @@ private async Task ProcessTransactions(ReadWriteDbContext db var processorContext = new ProcessorContext(sequences, readHelper, writeHelper, token); var entityStateProcessor = new EntityStateProcessor(processorContext, referencedEntities); var entityMetadataProcessor = new EntityMetadataProcessor(processorContext); - var entitySchemaProcessor = new EntitySchemaProcessor(processorContext); + var entitySchemaProcessor = new EntitySchemaProcessor(processorContext, networkConfiguration.Id); var componentMethodRoyaltyProcessor = new ComponentMethodRoyaltyProcessor(processorContext); var entityRoleAssignmentProcessor = new EntityRoleAssignmentProcessor(processorContext); var packageCodeProcessor = new PackageCodeProcessor(processorContext, networkConfiguration.Id); @@ -999,6 +999,7 @@ private async Task ProcessTransactions(ReadWriteDbContext db } packageCodeProcessor.VisitDelete(substateId, referencedEntity, stateVersion); + entitySchemaProcessor.VisitDelete(substateId, referencedEntity, stateVersion); } var transaction = ledgerTransactionsToAdd.Single(x => x.StateVersion == stateVersion);