Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add snapshot reproducer #1276

Merged
merged 8 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ source https://api.nuget.org/v3/index.json
// PRODUCTION STUFF
nuget Microsoft.Extensions.Http.Polly 8.0.3

// AWS
nuget AWSSDK.Extensions.NETCore.Setup 3.7.300

// For more healtchecks, look at https://github.com/Xabaril/AspNetCore.Diagnostics.HealthChecks
nuget AspNetCore.HealthChecks.SqlServer 8.0.0
nuget AspNetCore.HealthChecks.NpgSql 8.0.0
Expand Down Expand Up @@ -78,6 +81,7 @@ nuget Be.Vlaanderen.Basisregisters.GrAr.Contracts 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Edit 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Import 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Legacy 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Notifications 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Provenance 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Extracts 21.14.1
nuget Be.Vlaanderen.Basisregisters.GrAr.Oslo 21.14.1
Expand Down
12 changes: 11 additions & 1 deletion paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,18 @@ NUGET
AutoFixture.Xunit2 (4.18.1)
AutoFixture (>= 4.18.1)
xunit.extensibility.core (>= 2.2 < 3.0)
AWSSDK.Core (3.7.302.20)
AWSSDK.Core (3.7.400.36)
AWSSDK.DynamoDBv2 (3.7.301.23)
AWSSDK.Core (>= 3.7.302.20 < 4.0)
AWSSDK.Extensions.NETCore.Setup (3.7.300)
AWSSDK.Core (>= 3.7.300)
Microsoft.Extensions.Configuration.Abstractions (>= 2.0)
Microsoft.Extensions.DependencyInjection.Abstractions (>= 2.0)
Microsoft.Extensions.Logging.Abstractions (>= 2.0)
AWSSDK.S3 (3.7.307)
AWSSDK.Core (>= 3.7.302.20 < 4.0)
AWSSDK.SimpleNotificationService (3.7.301.5)
AWSSDK.Core (>= 3.7.302.18 < 4.0)
AWSSDK.SQS (3.7.300.59)
AWSSDK.Core (>= 3.7.302.20 < 4.0)
Azure.Core (1.38)
Expand Down Expand Up @@ -283,6 +290,9 @@ NUGET
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1)
Be.Vlaanderen.Basisregisters.Utilities.Rfc3339DateTimeOffset (>= 4.0)
Newtonsoft.Json (>= 13.0.3)
Be.Vlaanderen.Basisregisters.GrAr.Notifications (21.14.1)
AWSSDK.SimpleNotificationService (>= 3.7.301.3)
System.Text.Json (>= 8.0.3)
Be.Vlaanderen.Basisregisters.GrAr.Oslo (21.14.1)
Be.Vlaanderen.Basisregisters.AspNetCore.Mvc.Formatters.Json (>= 5.0)
Be.Vlaanderen.Basisregisters.GrAr.Common (21.14.1)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
namespace BuildingRegistry.Producer.Snapshot.Oslo
{
using System;
using System.Collections.Generic;
using System.Linq;
using Be.Vlaanderen.Basisregisters.GrAr.Notifications;
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
using Dapper;
using Microsoft.Extensions.Logging;
using NodaTime;
using Npgsql;

public class BuildingSnapshotReproducer : SnapshotReproducer
{
private readonly string _integrationConnectionString;

public BuildingSnapshotReproducer(
string integrationConnectionString,
IOsloProxy osloProxy,
IProducer producer,
IClock clock,
INotificationService notificationService,
int utcHourToRunWithin,
ILoggerFactory loggerFactory)
: base(osloProxy, producer, clock, notificationService, utcHourToRunWithin, loggerFactory)
{
_integrationConnectionString = integrationConnectionString;
}

protected override List<(int PersistentLocalId, long Position)> GetIdsToProcess(DateTime utcNow)
{
using var connection = new NpgsqlConnection(_integrationConnectionString);

var todayMidnight = utcNow.Date;
var yesterdayMidnight = todayMidnight.AddDays(-1);

var records = connection.Query<BuildingPosition>(
$"""
SELECT building_persistent_local_id, position, version_timestamp
FROM integration_building.building_versions
where version_timestamp >= '{yesterdayMidnight:yyyy-MM-dd}' and version_timestamp < '{todayMidnight:yyyy-MM-dd}'
""");

var duplicateEvents = records
.GroupBy(x => new
{
BuildingPersistentLocalId = x.building_persistent_local_id,
TimeStamp = x.version_timestamp.ToString("yyyyMMddHHmmss") // Format the timestamp to seconds as OSLO API doesn't return the milliseconds of the timestamp
})
.Where(x => x.Count() > 1)
.Select(x =>
{
var latest = x.MaxBy(y => y.position)!;
return (latest.building_persistent_local_id, latest.position);
})
.ToList();

return duplicateEvents;
}

private sealed class BuildingPosition
{
public int building_persistent_local_id { get; init; }
public long position { get; init; }
public DateTimeOffset version_timestamp { get; init; }
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
namespace BuildingRegistry.Producer.Snapshot.Oslo
{
using System;
using System.Collections.Generic;
using System.Linq;
using Be.Vlaanderen.Basisregisters.GrAr.Notifications;
using Be.Vlaanderen.Basisregisters.GrAr.Oslo.SnapshotProducer;
using Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Producer;
using Dapper;
using Microsoft.Extensions.Logging;
using NodaTime;
using Npgsql;

public class BuildingUnitSnapshotReproducer : SnapshotReproducer
{
private readonly string _integrationConnectionString;

public BuildingUnitSnapshotReproducer(
string integrationConnectionString,
IOsloProxy osloProxy,
IProducer producer,
IClock clock,
INotificationService notificationService,
int utcHourToRunWithin,
ILoggerFactory loggerFactory)
: base(osloProxy, producer, clock, notificationService, utcHourToRunWithin, loggerFactory)
{
_integrationConnectionString = integrationConnectionString;
}

protected override List<(int PersistentLocalId, long Position)> GetIdsToProcess(DateTime utcNow)
{
using var connection = new NpgsqlConnection(_integrationConnectionString);

var todayMidnight = utcNow.Date;
var yesterdayMidnight = todayMidnight.AddDays(-1);

var records = connection.Query<BuildingUnitPosition>(
$"""
SELECT building_unit_persistent_local_id, position, version_timestamp
FROM integration_building.building_unit_versions
where
version_timestamp >= '{yesterdayMidnight:yyyy-MM-dd}'
and version_timestamp < '{todayMidnight:yyyy-MM-dd}'
""");

var duplicateEvents = records
.GroupBy(x => new
{
BuildingUnitPersistentLocalId = x.building_unit_persistent_local_id,
TimeStamp = x.version_timestamp.ToString("yyyyMMddHHmmss") // Format the timestamp to seconds as OSLO API doesn't return the milliseconds of the timestamp
})
.Where(x => x.Count() > 1)
.Select(x =>
{
var latest = x.MaxBy(y => y.position)!;
return (latest.building_unit_persistent_local_id, latest.position);
})
.ToList();

return duplicateEvents;
}

private sealed class BuildingUnitPosition
{
public int building_unit_persistent_local_id { get; init; }
public long position { get; init; }
public DateTimeOffset version_timestamp { get; init; }
}
}
}
Loading
Loading