Skip to content

Commit

Permalink
refactor(producers): use projector endpoints
Browse files Browse the repository at this point in the history
  • Loading branch information
ArneD committed Jan 20, 2025
1 parent 9eee5c6 commit 8e375ac
Show file tree
Hide file tree
Showing 4 changed files with 7 additions and 178 deletions.
2 changes: 1 addition & 1 deletion paket.dependencies
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector.Testing 14.0.0
nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.Testing.Xunit 14.0.0
nuget Be.Vlaanderen.Basisregisters.ProjectionHandling.Syndication 14.0.0

nuget Be.Vlaanderen.Basisregisters.Projector 15.1.0
nuget Be.Vlaanderen.Basisregisters.Projector 15.2.0

nuget Be.Vlaanderen.Basisregisters.Crab 4.0.0

Expand Down
2 changes: 1 addition & 1 deletion paket.lock
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ NUGET
Microsoft.EntityFrameworkCore (>= 8.0.2)
Microsoft.Extensions.Logging (>= 8.0)
xunit (>= 2.7)
Be.Vlaanderen.Basisregisters.Projector (15.1)
Be.Vlaanderen.Basisregisters.Projector (15.2)
Autofac (>= 8.0)
Autofac.Extensions.DependencyInjection (>= 9.0)
Be.Vlaanderen.Basisregisters.ProjectionHandling.Connector (>= 14.0)
Expand Down
132 changes: 1 addition & 131 deletions src/BuildingRegistry.Producer.Snapshot.Oslo/Infrastructure/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,141 +1,11 @@
namespace BuildingRegistry.Producer.Snapshot.Oslo.Infrastructure
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.Mime;
using System.Threading;
using System.Threading.Tasks;
using Be.Vlaanderen.Basisregisters.AspNetCore.Mvc.Formatters.Json;
using Be.Vlaanderen.Basisregisters.Projector.ConnectedProjections;
using Be.Vlaanderen.Basisregisters.Projector.Controllers;
using Be.Vlaanderen.Basisregisters.Projector;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using SqlStreamStore;

/// <summary>
/// Extract to Library when working correctly
/// </summary>
public static class ApplicationBuilderProjectorExtensions
{
public static IApplicationBuilder UseProjectorEndpoints(
this IApplicationBuilder builder,
string baseUrl,
JsonSerializerSettings? jsonSerializerSettings)
{
ArgumentNullException.ThrowIfNull(baseUrl);

builder.UseEndpoints(endpoints =>
{
endpoints.MapGet("/v1/projections", async context => { await GetProjections(builder, context, baseUrl, jsonSerializerSettings); });
endpoints.MapGet("/projections", async context => { await GetProjections(builder, context, baseUrl, jsonSerializerSettings); });

endpoints.MapPost("/projections/start/all", async context => { await StartAll(builder, context); });
endpoints.MapPost("/v1/projections/start/all", async context => { await StartAll(builder, context); });

endpoints.MapPost("/projections/start/{projectionId}", async context
=> await StartProjection(builder, context.Request.RouteValues["projectionId"].ToString(), context));
endpoints.MapPost("/v1/projections/start/{projectionId}", async context
=> await StartProjection(builder, context.Request.RouteValues["projectionId"].ToString(), context));

endpoints.MapPost("/projections/stop/all", async context => { await StopAll(builder, context); });
endpoints.MapPost("/v1/projections/stop/all", async context => { await StopAll(builder, context); });

endpoints.MapPost("/projections/stop/{projectionId}", async context
=> await StopProjection(builder, context.Request.RouteValues["projectionId"].ToString(), context));
endpoints.MapPost("/v1/projections/stop/{projectionId}", async context
=> await StopProjection(builder, context.Request.RouteValues["projectionId"].ToString(), context));
});

return builder;
}

private static async Task StopProjection(IApplicationBuilder app, string? projectionId, HttpContext context)
{
var manager = app.ApplicationServices.GetRequiredService<IConnectedProjectionsManager>();
if (!manager.Exists(projectionId))
{
context.Response.StatusCode = StatusCodes.Status400BadRequest;
await context.Response.WriteAsync("Invalid projection Id.");
return;
}

await manager.Stop(projectionId, CancellationToken.None);
context.Response.StatusCode = StatusCodes.Status202Accepted;
}

private static async Task StopAll(IApplicationBuilder app, HttpContext context)
{
var manager = app.ApplicationServices.GetRequiredService<IConnectedProjectionsManager>();
await manager.Stop(CancellationToken.None);

context.Response.StatusCode = StatusCodes.Status202Accepted;
}

private static async Task StartProjection(IApplicationBuilder app, string? projectionId, HttpContext context)
{
var manager = app.ApplicationServices.GetRequiredService<IConnectedProjectionsManager>();

if (!manager.Exists(projectionId))
{
context.Response.StatusCode = StatusCodes.Status400BadRequest;
await context.Response.WriteAsync("Invalid projection Id.");
return;
}

await manager.Start(projectionId, CancellationToken.None);
context.Response.StatusCode = StatusCodes.Status202Accepted;
}

private static async Task StartAll(IApplicationBuilder app, HttpContext context)
{
var manager = app.ApplicationServices.GetRequiredService<IConnectedProjectionsManager>();
await manager.Start(CancellationToken.None);

context.Response.StatusCode = StatusCodes.Status202Accepted;
}

private static async Task GetProjections(
IApplicationBuilder app,
HttpContext context,
string baseUrl,
JsonSerializerSettings? jsonSerializerSettings = null)
{
var manager = app.ApplicationServices.GetRequiredService<IConnectedProjectionsManager>();
var streamStore = app.ApplicationServices.GetRequiredService<IStreamStore>();

var registeredConnectedProjections = manager
.GetRegisteredProjections()
.ToList();
var projectionStates = await manager.GetProjectionStates(CancellationToken.None);
var responses = registeredConnectedProjections.Aggregate(
new List<ProjectionResponse>(),
(list, projection) =>
{
var projectionState = projectionStates.SingleOrDefault(x => x.Name == projection.Id);
list.Add(new ProjectionResponse(
projection,
projectionState,
baseUrl));
return list;
});

var streamPosition = await streamStore.ReadHeadPosition();

var projectionResponseList = new ProjectionResponseList(responses, baseUrl)
{
StreamPosition = streamPosition
};

var json = JsonConvert.SerializeObject(projectionResponseList, jsonSerializerSettings ?? new JsonSerializerSettings());

context.Response.Headers.ContentType = MediaTypeNames.Application.Json;
await context.Response.WriteAsync(json);
}
}

public sealed class Startup
{
Expand Down
49 changes: 4 additions & 45 deletions src/BuildingRegistry.Producer/Infrastructure/Startup.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
namespace BuildingRegistry.Producer.Infrastructure
{
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Be.Vlaanderen.Basisregisters.AspNetCore.Mvc.Formatters.Json;
using Be.Vlaanderen.Basisregisters.Projector.ConnectedProjections;
using Be.Vlaanderen.Basisregisters.Projector.Controllers;
using Be.Vlaanderen.Basisregisters.Projector;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Newtonsoft.Json;
using SqlStreamStore;

public class Startup
{
Expand All @@ -22,44 +16,9 @@ public void Configure(IApplicationBuilder app)

app.UseHealthChecks("/health");

app.UseEndpoints(endpoints =>
{
endpoints.MapGet("v1/projections", async context =>
{
var configuration = app.ApplicationServices.GetRequiredService<IConfiguration>();
var manager = app.ApplicationServices.GetRequiredService<IConnectedProjectionsManager>();
var streamStore = app.ApplicationServices.GetRequiredService<IStreamStore>();

var baseUri = configuration.GetValue<string>("BaseUrl").TrimEnd('/');

var registeredConnectedProjections = manager
.GetRegisteredProjections()
.ToList();
var projectionStates = await manager.GetProjectionStates(CancellationToken.None);
var responses = registeredConnectedProjections.Aggregate(
new List<ProjectionResponse>(),
(list, projection) =>
{
var projectionState = projectionStates.SingleOrDefault(x => x.Name == projection.Id);
list.Add(new ProjectionResponse(
projection,
projectionState,
baseUri));
return list;
});

var streamPosition = await streamStore.ReadHeadPosition();

var projectionResponseList = new ProjectionResponseList(responses, baseUri)
{
StreamPosition = streamPosition
};

var json = JsonConvert.SerializeObject(projectionResponseList, new JsonSerializerSettings().ConfigureDefaultForApi());

await context.Response.WriteAsync(json);
});
});
var configuration = app.ApplicationServices.GetRequiredService<IConfiguration>();
var baseUri = configuration.GetValue<string>("BaseUrl").TrimEnd('/');
app.UseProjectorEndpoints(baseUri, new JsonSerializerSettings().ConfigureDefaultForApi());
}
}
}

0 comments on commit 8e375ac

Please sign in to comment.