Skip to content

Commit

Permalink
Allow to delete schemas. (#745)
Browse files Browse the repository at this point in the history
* Allow to delete schemas.
  • Loading branch information
PawelPawelec-RDX authored May 16, 2024
1 parent c6fca15 commit 3b50288
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,33 +68,40 @@
using RadixDlt.NetworkGateway.PostgresIntegration.Models;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using CoreModel = RadixDlt.CoreApiSdk.Model;

namespace RadixDlt.NetworkGateway.PostgresIntegration.LedgerExtension;

internal record struct SchemaDefinitionEntryDbLookup(long EntityId, ValueBytes SchemaHash);
internal record struct SchemaChangePointerLookup(long EntityId, long StateVersion);

internal record SchemaChangePointer
{
public List<CoreModel.SchemaEntrySubstate> Entries { get; } = new();

public List<string> DeletedSchemaHashes { get; } = new();
}

internal class EntitySchemaProcessor
{
private readonly ProcessorContext _context;
private readonly byte _networkId;

private readonly ChangeTracker<SchemaChangePointerLookup, SchemaChangePointer> _changes = new();

private readonly Dictionary<long, SchemaEntryAggregateHistory> _mostRecentAggregates = new();
private readonly Dictionary<SchemaDefinitionEntryDbLookup, SchemaEntryDefinition> _existingSchemas = new();

private readonly List<SchemaEntryAggregateHistory> _aggregatesToAdd = new();
private readonly List<SchemaEntryDefinition> _definitionsToAdd = new();

public EntitySchemaProcessor(ProcessorContext context)
public EntitySchemaProcessor(ProcessorContext context, byte networkId)
{
_context = context;
_networkId = networkId;
}

public void VisitUpsert(CoreModel.Substate substateData, ReferencedEntity referencedEntity, long stateVersion)
Expand All @@ -108,9 +115,24 @@ public void VisitUpsert(CoreModel.Substate substateData, ReferencedEntity refere
}
}

public void VisitDelete(CoreModel.SubstateId substateId, ReferencedEntity referencedEntity, long stateVersion)
{
if (substateId.SubstateType == CoreModel.SubstateType.SchemaEntry)
{
var keyHex = ((CoreModel.MapSubstateKey)substateId.SubstateKey).KeyHex;
var schemaHash = ScryptoSborUtils.DataToProgrammaticScryptoSborValueBytes(keyHex.ConvertFromHex(), _networkId);

_changes
.GetOrAdd(new SchemaChangePointerLookup(referencedEntity.DatabaseId, stateVersion), _ => new SchemaChangePointer())
.DeletedSchemaHashes
.Add(schemaHash.Hex);
}
}

public async Task LoadDependencies()
{
_mostRecentAggregates.AddRange(await MostRecentSchemaEntryAggregateHistory());
_existingSchemas.AddRange(await LoadExistingSchemas());
}

public void ProcessChanges()
Expand Down Expand Up @@ -157,6 +179,29 @@ public void ProcessChanges()

aggregate.EntryIds.Insert(0, entryDefinition.Id);
}

foreach (var deletedSchemaHash in change.DeletedSchemaHashes)
{
var entryLookup = new SchemaDefinitionEntryDbLookup(lookup.EntityId, deletedSchemaHash.ConvertFromHex());

if (_existingSchemas.TryGetValue(entryLookup, out var previousEntry))
{
var currentPosition = aggregate.EntryIds.IndexOf(previousEntry.Id);

if (currentPosition != -1)
{
aggregate.EntryIds.RemoveAt(currentPosition);
}
else
{
throw new UnreachableException($"Unexpected situation where SchemaEntryDefinition with EntityId:{entryLookup.EntityId}, SchemaHash:{deletedSchemaHash} got deleted but wasn't found in aggregate table.");
}
}
else
{
throw new UnreachableException($"Unexpected situation where SchemaEntryDefinition with EntityId:{entryLookup.EntityId}, SchemaHash:{deletedSchemaHash} got deleted but wasn't found in gateway database.");
}
}
}
}

Expand Down Expand Up @@ -196,6 +241,31 @@ LIMIT 1
e => e.EntityId);
}

private async Task<IDictionary<SchemaDefinitionEntryDbLookup, SchemaEntryDefinition>> LoadExistingSchemas()
{
var lookupSet = new HashSet<SchemaDefinitionEntryDbLookup>();

foreach (var (lookup, change) in _changes.AsEnumerable())
{
foreach (var deletedSchemaHash in change.DeletedSchemaHashes)
{
lookupSet.Add(new SchemaDefinitionEntryDbLookup(lookup.EntityId, deletedSchemaHash.ConvertFromHex()));
}
}

if (!lookupSet.Unzip(x => x.EntityId, x => (byte[])x.SchemaHash, out var entityIds, out var schemaHashes))
{
return ImmutableDictionary<SchemaDefinitionEntryDbLookup, SchemaEntryDefinition>.Empty;
}

return await _context.ReadHelper.LoadDependencies<SchemaDefinitionEntryDbLookup, SchemaEntryDefinition>(
@$"
SELECT *
FROM schema_entry_definition
WHERE (entity_id, schema_hash) IN (SELECT UNNEST({entityIds}), UNNEST({schemaHashes}));",
e => new SchemaDefinitionEntryDbLookup(e.EntityId, e.SchemaHash));
}

private Task<int> CopySchemaEntryDefinitions() => _context.WriteHelper.Copy(
_definitionsToAdd,
"COPY schema_entry_definition (id, from_state_version, entity_id, schema_hash, schema) FROM STDIN (FORMAT BINARY)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -808,7 +808,7 @@ private async Task<ExtendLedgerReport> ProcessTransactions(ReadWriteDbContext db
var processorContext = new ProcessorContext(sequences, readHelper, writeHelper, token);
var entityStateProcessor = new EntityStateProcessor(processorContext, referencedEntities);
var entityMetadataProcessor = new EntityMetadataProcessor(processorContext);
var entitySchemaProcessor = new EntitySchemaProcessor(processorContext);
var entitySchemaProcessor = new EntitySchemaProcessor(processorContext, networkConfiguration.Id);
var componentMethodRoyaltyProcessor = new ComponentMethodRoyaltyProcessor(processorContext);
var entityRoleAssignmentProcessor = new EntityRoleAssignmentProcessor(processorContext);
var packageCodeProcessor = new PackageCodeProcessor(processorContext, networkConfiguration.Id);
Expand Down Expand Up @@ -999,6 +999,7 @@ private async Task<ExtendLedgerReport> ProcessTransactions(ReadWriteDbContext db
}

packageCodeProcessor.VisitDelete(substateId, referencedEntity, stateVersion);
entitySchemaProcessor.VisitDelete(substateId, referencedEntity, stateVersion);
}

var transaction = ledgerTransactionsToAdd.Single(x => x.StateVersion == stateVersion);
Expand Down

0 comments on commit 3b50288

Please sign in to comment.