forked from oskardudycz/EventSourcing.NetCore
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEventStoreDBSubscriptionToAll.cs
185 lines (154 loc) · 7.11 KB
/
EventStoreDBSubscriptionToAll.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
using Core.Events;
using Core.EventStoreDB.Events;
using Core.Threading;
using EventStore.Client;
using Grpc.Core;
using Microsoft.Extensions.Logging;
namespace Core.EventStoreDB.Subscriptions;
/// <summary>
/// Settings consumed by <see cref="EventStoreDBSubscriptionToAll"/> when it subscribes to all events.
/// </summary>
public class EventStoreDBSubscriptionToAllOptions
{
    /// <summary>
    /// Identifier of the subscription. It is the key under which the subscription checkpoint
    /// is loaded and stored. Defaults to <c>"default"</c>.
    /// </summary>
    public string SubscriptionId { get; set; } = "default";

    /// <summary>
    /// Filter passed to the subscription. The default filter excludes system events.
    /// </summary>
    public SubscriptionFilterOptions FilterOptions { get; set; } =
        new SubscriptionFilterOptions(EventTypeFilter.ExcludeSystemEvents());

    /// <summary>
    /// Optional callback for configuring <see cref="EventStoreClientOperationOptions"/>.
    /// NOTE(review): nothing in this file reads this property - confirm where it is consumed.
    /// </summary>
    public Action<EventStoreClientOperationOptions>? ConfigureOperation { get; set; }

    /// <summary>
    /// Optional user credentials passed to the subscription call.
    /// </summary>
    public UserCredentials? Credentials { get; set; }

    /// <summary>
    /// Value of the <c>resolveLinkTos</c> argument passed to the subscription call.
    /// Defaults to <c>false</c>.
    /// </summary>
    public bool ResolveLinkTos { get; set; }

    /// <summary>
    /// When <c>true</c> (the default), events that cannot be deserialized are logged and skipped.
    /// When <c>false</c>, the event handler throws an <see cref="InvalidOperationException"/> instead.
    /// </summary>
    public bool IgnoreDeserializationErrors { get; set; } = true;
}
/// <summary>
/// Subscribes to all events in EventStoreDB, publishes every deserialized event to the
/// internal <see cref="IEventBus"/> and stores the commit position of the last handled event
/// as a checkpoint, so the subscription can continue from it next time it is started.
/// When the subscription is dropped, it tries to resubscribe until it succeeds or until
/// cancellation is requested.
/// </summary>
public class EventStoreDBSubscriptionToAll
{
    private readonly IEventBus eventBus;
    private readonly EventStoreClient eventStoreClient;
    private readonly ISubscriptionCheckpointRepository checkpointRepository;
    private readonly ILogger<EventStoreDBSubscriptionToAll> logger;

    // Set by SubscribeToAll; kept so that Resubscribe can repeat the call with the same arguments.
    private EventStoreDBSubscriptionToAllOptions subscriptionOptions = default!;
    private string SubscriptionId => subscriptionOptions.SubscriptionId;
    private readonly object resubscribeLock = new();
    private CancellationToken cancellationToken;

    /// <summary>
    /// Creates the subscription wrapper.
    /// </summary>
    /// <exception cref="ArgumentNullException">Thrown when any of the dependencies is <c>null</c>.</exception>
    public EventStoreDBSubscriptionToAll(
        EventStoreClient eventStoreClient,
        IEventBus eventBus,
        ISubscriptionCheckpointRepository checkpointRepository,
        ILogger<EventStoreDBSubscriptionToAll> logger
    )
    {
        this.eventBus = eventBus ?? throw new ArgumentNullException(nameof(eventBus));
        this.eventStoreClient = eventStoreClient ?? throw new ArgumentNullException(nameof(eventStoreClient));
        this.checkpointRepository =
            checkpointRepository ?? throw new ArgumentNullException(nameof(checkpointRepository));
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
    }

    /// <summary>
    /// Starts the subscription. It begins after the stored checkpoint position,
    /// or from the start when no checkpoint was stored for the subscription id.
    /// </summary>
    /// <param name="subscriptionOptions">Subscription settings; remembered for later resubscription.</param>
    /// <param name="ct">Cancellation token; remembered for later resubscription.</param>
    public async Task SubscribeToAll(EventStoreDBSubscriptionToAllOptions subscriptionOptions, CancellationToken ct)
    {
        // see: https://github.com/dotnet/runtime/issues/36063
        await Task.Yield();

        this.subscriptionOptions = subscriptionOptions;
        cancellationToken = ct;

        logger.LogInformation("Subscription to all '{SubscriptionId}'", subscriptionOptions.SubscriptionId);

        var checkpoint = await checkpointRepository.Load(SubscriptionId, ct);

        // The checkpoint holds the commit position only (see HandleEvent),
        // so the same value is used for both parts of the position.
        await eventStoreClient.SubscribeToAllAsync(
            checkpoint == null ? FromAll.Start : FromAll.After(new Position(checkpoint.Value, checkpoint.Value)),
            HandleEvent,
            subscriptionOptions.ResolveLinkTos,
            HandleDrop,
            subscriptionOptions.FilterOptions,
            subscriptionOptions.Credentials,
            ct
        );

        logger.LogInformation("Subscription to all '{SubscriptionId}' started", SubscriptionId);
    }

    /// <summary>
    /// Handles a single event: skips events without data and checkpoint events,
    /// publishes the deserialized event to the event bus and then stores the checkpoint.
    /// Any exception is logged and rethrown.
    /// </summary>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the event cannot be deserialized and
    /// <see cref="EventStoreDBSubscriptionToAllOptions.IgnoreDeserializationErrors"/> is <c>false</c>.
    /// </exception>
    private async Task HandleEvent(StreamSubscription subscription, ResolvedEvent resolvedEvent,
        CancellationToken ct)
    {
        try
        {
            if (IsEventWithEmptyData(resolvedEvent) || IsCheckpointEvent(resolvedEvent)) return;

            var streamEvent = resolvedEvent.ToEventEnvelope();

            if (streamEvent == null)
            {
                // That can happen if we're sharing database between modules.
                // If we're subscribing to all and not filtering out events from other modules,
                // then we might get events that are from other module and we might not be able to deserialize them.
                // In that case it's safe to ignore deserialization error.
                // You may add more sophisticated logic checking if it should be ignored or not.
                logger.LogWarning("Couldn't deserialize event with id: {EventId}", resolvedEvent.Event.EventId);

                if (!subscriptionOptions.IgnoreDeserializationErrors)
                    throw new InvalidOperationException(
                        $"Unable to deserialize event {resolvedEvent.Event.EventType} with id: {resolvedEvent.Event.EventId}"
                    );

                return;
            }

            // publish event to internal event bus
            await eventBus.Publish(streamEvent, ct);

            // The checkpoint is stored only after the event was published successfully.
            await checkpointRepository.Store(SubscriptionId, resolvedEvent.Event.Position.CommitPosition, ct);
        }
        catch (Exception e)
        {
            // The exception object is attached to the log entry,
            // the same way as it is done in HandleDrop and Resubscribe.
            logger.LogError(e, "Error consuming message: {ExceptionMessage}{ExceptionStackTrace}", e.Message,
                e.StackTrace);
            // if you're fine with dropping some events instead of stopping subscription
            // then you can add some logic if error should be ignored
            throw;
        }
    }

    /// <summary>
    /// Handles the subscription drop: logs it and resubscribes,
    /// unless the drop was caused by a cancelled gRPC call.
    /// </summary>
    private void HandleDrop(StreamSubscription _, SubscriptionDroppedReason reason, Exception? exception)
    {
        logger.LogError(
            exception,
            "Subscription to all '{SubscriptionId}' dropped with '{Reason}'",
            SubscriptionId,
            reason
        );

        if (exception is RpcException { StatusCode: StatusCode.Cancelled })
            return;

        Resubscribe();
    }

    /// <summary>
    /// Blocks the calling thread and retries <see cref="SubscribeToAll"/> until it succeeds
    /// or until cancellation is requested on the remembered token.
    /// </summary>
    private void Resubscribe()
    {
        // You may consider adding a max resubscribe count if you want to fail process
        // instead of retrying until database is up
        while (true)
        {
            // Once cancellation was requested, every attempt fails with a cancellation error,
            // so without this check the loop would keep retrying (and sleeping) forever.
            if (cancellationToken.IsCancellationRequested)
            {
                logger.LogInformation(
                    "Resubscribing to all '{SubscriptionId}' stopped, as cancellation was requested",
                    SubscriptionId);
                return;
            }

            try
            {
                // `lock` releases the monitor only if it was actually taken, unlike
                // Monitor.Enter placed inside `try` with an unconditional Monitor.Exit in `finally`.
                lock (resubscribeLock)
                {
                    // No synchronization context is needed to disable synchronization context.
                    // That enables running asynchronous method not causing deadlocks.
                    // As this is a background process then we don't need to have async context here.
                    using (NoSynchronizationContextScope.Enter())
                    {
                        SubscribeToAll(subscriptionOptions, cancellationToken).Wait(cancellationToken);
                    }
                }

                return;
            }
            catch (Exception exception)
            {
                // A failure caused by cancellation is not worth a warning or a delay;
                // the check at the top of the loop ends the retries.
                if (cancellationToken.IsCancellationRequested)
                    continue;

                logger.LogWarning(exception,
                    "Failed to resubscribe to all '{SubscriptionId}' dropped with '{ExceptionMessage}{ExceptionStackTrace}'",
                    SubscriptionId, exception.Message, exception.StackTrace);
            }

            // Sleep between reconnections to not flood the database or not kill the CPU with infinite loop
            // Randomness added to reduce the chance of multiple subscriptions trying to reconnect at the same time.
            // Random.Shared is used, as instances seeded with the current time may produce the same delay
            // when created at the same moment.
            Thread.Sleep(1000 + Random.Shared.Next(1000));
        }
    }

    /// <summary>
    /// Returns <c>true</c> (and logs it) when the event carries no data.
    /// </summary>
    private bool IsEventWithEmptyData(ResolvedEvent resolvedEvent)
    {
        if (resolvedEvent.Event.Data.Length != 0) return false;

        logger.LogInformation("Event without data received");
        return true;
    }

    /// <summary>
    /// Returns <c>true</c> (and logs it) when the event type is the mapped name of <see cref="CheckpointStored"/>.
    /// </summary>
    private bool IsCheckpointEvent(ResolvedEvent resolvedEvent)
    {
        if (resolvedEvent.Event.EventType != EventTypeMapper.ToName<CheckpointStored>()) return false;

        logger.LogInformation("Checkpoint event - ignoring");
        return true;
    }
}