Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configure consumer offset without deploy #1287

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ namespace BuildingRegistry.Consumer.Address

public class ConsumerAddressContext : SqlServerConsumerDbContext<ConsumerAddressContext>, IAddresses
{
public DbSet<AddressConsumerItem> AddressConsumerItems { get; set; }
public DbSet<AddressConsumerItem> AddressConsumerItems => Set<AddressConsumerItem>();
public DbSet<OffsetOverride> OffsetOverrides => Set<OffsetOverride>();

// This needs to be here to please EF
public ConsumerAddressContext()
Expand All @@ -30,6 +31,12 @@ public ConsumerAddressContext(DbContextOptions<ConsumerAddressContext> options)

public override string ProcessedMessagesSchema => Schema.ConsumerAddress;

public OffsetOverride? GetOffsetOverride(string consumerGroupId)
{
return OffsetOverrides
.SingleOrDefault(x => x.ConsumerGroupId == consumerGroupId && x.Configured == false);
}

public AddressData? GetOptional(AddressPersistentLocalId addressPersistentLocalId)
{
var item = AddressConsumerItems
Expand Down
29 changes: 10 additions & 19 deletions src/BuildingRegistry.Consumer.Address/Infrastructure/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public static async Task Main(string[] args)

builder.Register(c =>
{
var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"];
var bootstrapServers = hostContext.Configuration["Kafka:BootstrapServers"]!;
var topic = $"{hostContext.Configuration["AddressTopic"]}" ?? throw new ArgumentException("Configuration has no AddressTopic.");
var suffix = hostContext.Configuration["GroupSuffix"];
var consumerGroupId = $"BuildingRegistry.ConsumerAddress.{topic}{suffix}";
Expand All @@ -113,26 +113,17 @@ public static async Task Main(string[] args)
EventsJsonSerializerSettingsProvider.CreateSerializerSettings());

consumerOptions.ConfigureSaslAuthentication(new SaslAuthentication(
hostContext.Configuration["Kafka:SaslUserName"],
hostContext.Configuration["Kafka:SaslPassword"]));
hostContext.Configuration["Kafka:SaslUserName"]!,
hostContext.Configuration["Kafka:SaslPassword"]!));

var offsetStr = hostContext.Configuration["AddressTopicOffset"];
if (!string.IsNullOrEmpty(offsetStr) && long.TryParse(offsetStr, out var offset))
{
var ignoreDataCheck = hostContext.Configuration.GetValue<bool>("IgnoreAddressTopicOffsetDataCheck", false);

if (!ignoreDataCheck)
{
using var ctx = c.Resolve<ConsumerAddressContext>();
using var ctx = c.Resolve<ConsumerAddressContext>();
var offsetOverride = ctx.GetOffsetOverride(consumerGroupId);

if (ctx.AddressConsumerItems.Any())
{
throw new InvalidOperationException(
$"Cannot set Kafka offset to {offset} because {nameof(ctx.AddressConsumerItems)} has data.");
}
}

consumerOptions.ConfigureOffset(new Offset(offset));
if (offsetOverride is not null)
{
consumerOptions.ConfigureOffset(new Offset(offsetOverride.Offset));
offsetOverride.Configured = true;
ctx.SaveChanges();
}

return consumerOptions;
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
using Microsoft.EntityFrameworkCore.Migrations;

#nullable disable

namespace BuildingRegistry.Consumer.Address.Migrations
{
/// <inheritdoc />
public partial class AddOffsetOverride : Migration
{
/// <inheritdoc />
protected override void Up(MigrationBuilder migrationBuilder)
{
migrationBuilder.CreateTable(
name: "OffsetOverrides",
schema: "BuildingRegistryConsumerAddress",
columns: table => new
{
ConsumerGroupId = table.Column<string>(type: "nvarchar(450)", nullable: false),
Offset = table.Column<long>(type: "bigint", nullable: false),
Configured = table.Column<bool>(type: "bit", nullable: false)
},
constraints: table =>
{
table.PrimaryKey("PK_OffsetOverrides", x => x.ConsumerGroupId);
});
}

/// <inheritdoc />
protected override void Down(MigrationBuilder migrationBuilder)
{
migrationBuilder.DropTable(
name: "OffsetOverrides",
schema: "BuildingRegistryConsumerAddress");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ protected override void BuildModel(ModelBuilder modelBuilder)
{
#pragma warning disable 612, 618
modelBuilder
.HasAnnotation("ProductVersion", "6.0.3")
.HasAnnotation("ProductVersion", "8.0.3")
.HasAnnotation("Relational:MaxIdentifierLength", 128);

SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder, 1L, 1);
SqlServerModelBuilderExtensions.UseIdentityColumns(modelBuilder);

modelBuilder.Entity("Be.Vlaanderen.Basisregisters.MessageHandling.Kafka.Consumer.ProcessedMessage", b =>
{
Expand Down Expand Up @@ -65,6 +65,22 @@ protected override void BuildModel(ModelBuilder modelBuilder)

b.ToTable("Addresses", "BuildingRegistryConsumerAddress");
});

modelBuilder.Entity("BuildingRegistry.Consumer.Address.OffsetOverride", b =>
{
b.Property<string>("ConsumerGroupId")
.HasColumnType("nvarchar(450)");

b.Property<bool>("Configured")
.HasColumnType("bit");

b.Property<long>("Offset")
.HasColumnType("bigint");

b.HasKey("ConsumerGroupId");

b.ToTable("OffsetOverrides", "BuildingRegistryConsumerAddress");
});
#pragma warning restore 612, 618
}
}
Expand Down
24 changes: 24 additions & 0 deletions src/BuildingRegistry.Consumer.Address/OffsetOverride.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
namespace BuildingRegistry.Consumer.Address
{
using BuildingRegistry.Infrastructure;
using Microsoft.EntityFrameworkCore;
using Microsoft.EntityFrameworkCore.Metadata.Builders;

public class OffsetOverride
{
public string ConsumerGroupId { get; set; }
public long Offset { get; set; }
public bool Configured { get; set; }
}

public class OffsetOverrideConfiguration : IEntityTypeConfiguration<OffsetOverride>
{
public const string TableName = "OffsetOverrides";

public void Configure(EntityTypeBuilder<OffsetOverride> builder)
{
builder.ToTable(TableName, Schema.ConsumerAddress)
.HasKey(x => x.ConsumerGroupId);
}
}
}
2 changes: 0 additions & 2 deletions src/BuildingRegistry.Consumer.Address/appsettings.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

"GroupSuffix": "",
"AddressTopic": "dev.address",
"AddressTopicOffset": "",
"IgnoreAddressTopicOffsetDataCheck": false,

"BaseUrl": "https://api.staging-basisregisters.vlaanderen/",

Expand Down
Loading