Skip to content

Commit

Permalink
chore: follow design patterns for publishers
Browse files Browse the repository at this point in the history
  • Loading branch information
philasmar committed Feb 28, 2024
1 parent 1aa4506 commit 9b38376
Show file tree
Hide file tree
Showing 15 changed files with 133 additions and 80 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ public class PublisherController : ControllerBase
}

// Publish the message to SQS using the injected ISQSPublisher, with SQS-specific options
await _sqsPublisher.PublishAsync(message, new SQSOptions
await _sqsPublisher.SendAsync(message, new SQSOptions
{
DelaySeconds = <delay-in-seconds>,
MessageAttributes = <message-attributes>,
Expand Down
2 changes: 1 addition & 1 deletion docs/docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public class PublisherController : ControllerBase
}

// Publish the message to SQS using the injected ISQSPublisher, with SQS-specific options
await _sqsPublisher.PublishAsync(message, new SQSOptions
await _sqsPublisher.SendAsync(message, new SQSOptions
{
DelaySeconds = <delay-in-seconds>,
MessageAttributes = <message-attributes>,
Expand Down
4 changes: 2 additions & 2 deletions sampleapps/PublisherAPI/Controllers/PublisherController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public async Task<IActionResult> PublishOrder([FromBody] OrderInfo message)

return Ok();
}

[HttpPost("fooditem", Name = "Food Item")]
public async Task<IActionResult> PublishFoodItem([FromBody] FoodItem message)
{
Expand Down Expand Up @@ -87,7 +87,7 @@ public async Task<IActionResult> PublishTransaction([FromBody] TransactionInfo t
return BadRequest("The TransactionId cannot be null or empty.");
}

await _sqsPublisher.PublishAsync(transactionInfo, new SQSOptions
await _sqsPublisher.SendAsync(transactionInfo, new SQSOptions
{
MessageGroupId = "group-123"
});
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using AWS.Messaging.Services;

namespace AWS.Messaging.Publishers.EventBridge
{
/// <summary>
/// This interface allows publishing messages from application code to Amazon EventBridge.
/// It exposes the <see cref="PublishAsync{T}(T, EventBridgeOptions?, CancellationToken)"/> method which takes in a user-defined message, and <see cref="EventBridgeOptions"/> to set additonal parameters while publishing messages to EventBridge.
/// Using dependency injection, this interface is available to inject anywhere in the code.
/// </summary>
public interface IEventBridgePublisher
public interface IEventBridgePublisher : IEventPublisher
{
/// <summary>
/// Publishes the application message to SNS.
Expand Down
38 changes: 22 additions & 16 deletions src/AWS.Messaging/Publishers/MessageRoutingPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
using AWS.Messaging.Publishers.EventBridge;
using AWS.Messaging.Publishers.SNS;
using AWS.Messaging.Publishers.SQS;
using AWS.Messaging.Services;
using AWS.Messaging.Telemetry;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
Expand Down Expand Up @@ -47,10 +48,16 @@ public MessageRoutingPublisher(
};

/// <summary>
/// This dictionary serves as a method to cache created instances of <see cref="IMessagePublisher"/>,
/// This dictionary serves as a method to cache created instances of <see cref="ICommandPublisher"/>,
/// to avoid having to create a new instance any time a message is sent.
/// </summary>
private readonly ConcurrentDictionary<Type, ICommandPublisher> _commandPublisherInstances = new();

/// <summary>
/// This dictionary serves as a method to cache created instances of <see cref="IEventPublisher"/>,
/// to avoid having to create a new instance any time a message is published.
/// </summary>
private readonly ConcurrentDictionary<Type, IMessagePublisher> _publisherInstances = new();
private readonly ConcurrentDictionary<Type, IEventPublisher> _eventPublisherInstances = new();

/// <summary>
/// Publishes a user-defined message to an AWS service based on the
Expand Down Expand Up @@ -80,16 +87,23 @@ public async Task PublishAsync<T>(T message, CancellationToken token = default)

if (_publisherTypeMapping.TryGetValue(mapping.PublishTargetType, out var publisherType))
{
if (!typeof(IMessagePublisher).IsAssignableFrom(publisherType))
if (typeof(ICommandPublisher).IsAssignableFrom(publisherType))
{
var publisher = _commandPublisherInstances.GetOrAdd(publisherType, _ => (ICommandPublisher) ActivatorUtilities.CreateInstance(_serviceProvider, publisherType));
await publisher.SendAsync(message, token);
}
else if (typeof(IEventPublisher).IsAssignableFrom(publisherType))
{
var publisher = _eventPublisherInstances.GetOrAdd(publisherType, _ => (IEventPublisher) ActivatorUtilities.CreateInstance(_serviceProvider, publisherType));
await publisher.PublishAsync(message, token);
}
else
{
_logger.LogError("The message publisher corresponding to the type '{PublishTargetType}' is invalid " +
"and does not implement the interface '{InterfaceType}'.", mapping.PublishTargetType, typeof(IMessagePublisher));
"and does not implement the interface '{CommandInterfaceType}' or '{EventInterfaceType}'.", mapping.PublishTargetType, typeof(ICommandPublisher), typeof(IEventPublisher));
throw new InvalidPublisherTypeException($"The message publisher corresponding to the type '{mapping.PublishTargetType}' is invalid " +
$"and does not implement the interface '{typeof(IMessagePublisher)}'.");
$"and does not implement the interface '{typeof(ICommandPublisher)}' or '{typeof(IEventPublisher)}'.");
}

var publisher = GetPublisherInstance(publisherType);
await publisher.PublishAsync(message, token);
}
else
{
Expand All @@ -104,12 +118,4 @@ public async Task PublishAsync<T>(T message, CancellationToken token = default)
}
}
}

private IMessagePublisher GetPublisherInstance(Type publisherType)
{
return _publisherInstances.GetOrAdd(publisherType, x =>
{
return (IMessagePublisher) ActivatorUtilities.CreateInstance(_serviceProvider, publisherType);
});
}
}
4 changes: 3 additions & 1 deletion src/AWS.Messaging/Publishers/SNS/ISNSPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using AWS.Messaging.Services;

namespace AWS.Messaging.Publishers.SNS
{
/// <summary>
/// This interface allows publishing messages from application code to Amazon SNS.
/// It exposes the <see cref="PublishAsync{T}(T, SNSOptions?, CancellationToken)"/> method which takes in a user-defined message, and <see cref="SNSOptions"/> to set additonal parameters while publishing messages to SNS.
/// Using dependency injection, this interface is available to inject anywhere in the code.
/// </summary>
public interface ISNSPublisher
public interface ISNSPublisher : IEventPublisher
{
/// <summary>
/// Publishes the application message to SNS.
Expand Down
12 changes: 7 additions & 5 deletions src/AWS.Messaging/Publishers/SQS/ISQSPublisher.cs
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

using AWS.Messaging.Services;

namespace AWS.Messaging.Publishers.SQS
{
/// <summary>
/// This interface allows publishing messages from application code to Amazon SQS.
/// It exposes the <see cref="PublishAsync{T}(T, SQSOptions?, CancellationToken)"/> method which takes in a user-defined message, and <see cref="SQSOptions"/> to set additonal parameters while publishing messages to SQS.
/// This interface allows sending messages from application code to Amazon SQS.
/// It exposes the <see cref="SendAsync{T}(T, SQSOptions?, CancellationToken)"/> method which takes in a user-defined message, and <see cref="SQSOptions"/> to set additional parameters while sending messages to SQS.
/// Using dependency injection, this interface is available to inject anywhere in the code.
/// </summary>
public interface ISQSPublisher
public interface ISQSPublisher : ICommandPublisher
{
/// <summary>
/// Publishes the application message to SQS.
/// Sends the application message to SQS.
/// </summary>
/// <param name="message">The application message that will be serialized and sent to an SQS queue</param>
/// <param name="sqsOptions">Contains additional parameters that can be set while sending a message to an SQS queue</param>
/// <param name="token">The cancellation token used to cancel the request.</param>
Task PublishAsync<T>(T message, SQSOptions? sqsOptions, CancellationToken token = default);
Task SendAsync<T>(T message, SQSOptions? sqsOptions, CancellationToken token = default);
}
}
20 changes: 10 additions & 10 deletions src/AWS.Messaging/Publishers/SQS/SQSPublisher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@
namespace AWS.Messaging.Publishers.SQS;

/// <summary>
/// The SQS message publisher allows publishing messages to AWS SQS.
/// The SQS message publisher allows sending messages to AWS SQS.
/// </summary>
internal class SQSPublisher : IMessagePublisher, ISQSPublisher
internal class SQSPublisher : ISQSPublisher
{
private readonly IAWSClientProvider _awsClientProvider;
private readonly ILogger<IMessagePublisher> _logger;
private readonly ILogger<ISQSPublisher> _logger;
private readonly IMessageConfiguration _messageConfiguration;
private readonly IEnvelopeSerializer _envelopeSerializer;
private readonly ITelemetryFactory _telemetryFactory;
Expand All @@ -29,7 +29,7 @@ internal class SQSPublisher : IMessagePublisher, ISQSPublisher
/// </summary>
public SQSPublisher(
IAWSClientProvider awsClientProvider,
ILogger<IMessagePublisher> logger,
ILogger<ISQSPublisher> logger,
IMessageConfiguration messageConfiguration,
IEnvelopeSerializer envelopeSerializer,
ITelemetryFactory telemetryFactory)
Expand All @@ -48,9 +48,9 @@ public SQSPublisher(
/// <param name="token">The cancellation token used to cancel the request.</param>
/// <exception cref="InvalidMessageException">If the message is null or invalid.</exception>
/// <exception cref="MissingMessageTypeConfigurationException">If cannot find the publisher configuration for the message type.</exception>
public async Task PublishAsync<T>(T message, CancellationToken token = default)
public async Task SendAsync<T>(T message, CancellationToken token = default)
{
await PublishAsync(message, null, token);
await SendAsync(message, null, token);
}

/// <summary>
Expand All @@ -61,7 +61,7 @@ public async Task PublishAsync<T>(T message, CancellationToken token = default)
/// <param name="token">The cancellation token used to cancel the request.</param>
/// <exception cref="InvalidMessageException">If the message is null or invalid.</exception>
/// <exception cref="MissingMessageTypeConfigurationException">If cannot find the publisher configuration for the message type.</exception>
public async Task PublishAsync<T>(T message, SQSOptions? sqsOptions, CancellationToken token = default)
public async Task SendAsync<T>(T message, SQSOptions? sqsOptions, CancellationToken token = default)
{
using (var trace = _telemetryFactory.Trace("Publish to AWS SQS"))
{
Expand Down Expand Up @@ -127,9 +127,9 @@ private SendMessageRequest CreateSendMessageRequest(string queueUrl, string mess
if (queueUrl.EndsWith(FIFO_SUFFIX) && string.IsNullOrEmpty(sqsOptions?.MessageGroupId))
{
var errorMessage =
$"You are attempting to publish to a FIFO SQS queue but the request does not include a message group ID. " +
$"Please use {nameof(ISQSPublisher)} from the service collection to publish to FIFO queues. " +
$"It exposes a {nameof(PublishAsync)} method that accepts {nameof(SQSOptions)} as a parameter. " +
$"You are attempting to send to a FIFO SQS queue but the request does not include a message group ID. " +
$"Please use {nameof(ISQSPublisher)} from the service collection to send to FIFO queues. " +
$"It exposes a {nameof(SendAsync)} method that accepts {nameof(SQSOptions)} as a parameter. " +
$"A message group ID must be specified via {nameof(SQSOptions.MessageGroupId)}. " +
$"Additionally, {nameof(SQSOptions.MessageDeduplicationId)} must also be specified if content based de-duplication is not enabled on the queue.";

Expand Down
18 changes: 18 additions & 0 deletions src/AWS.Messaging/Services/ICommandPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging.Services;

/// <summary>
/// This interface allows sending messages from application code to recipient-specific Amazon services.
/// It exposes the <see cref="SendAsync{T}(T, CancellationToken)"/> method which takes in a user-defined message to send to a recipient-specific Amazon service.
/// </summary>
public interface ICommandPublisher
{
/// <summary>
/// Sends the application message to a recipient-specific Amazon service.
/// </summary>
/// <param name="message">The application message that will be serialized and sent.</param>
/// <param name="token">The cancellation token used to cancel the request.</param>
Task SendAsync<T>(T message, CancellationToken token = default);
}
18 changes: 18 additions & 0 deletions src/AWS.Messaging/Services/IEventPublisher.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.\r
// SPDX-License-Identifier: Apache-2.0

namespace AWS.Messaging.Services;

/// <summary>
/// This interface allows publishing messages from application code to event-based Amazon services.
/// It exposes the <see cref="PublishAsync{T}(T, CancellationToken)"/> method which takes in a user-defined message to publish to an event-based Amazon service.
/// </summary>
public interface IEventPublisher
{
/// <summary>
/// Publishes the application message to an event-based Amazon service.
/// </summary>
/// <param name="message">The application message that will be serialized and published.</param>
/// <param name="token">The cancellation token used to cancel the request.</param>
Task PublishAsync<T>(T message, CancellationToken token = default);
}
4 changes: 2 additions & 2 deletions test/AWS.Messaging.Benchmarks/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ public static async Task<BenchmarkCollector> RunBenchmarkAsync(string queueUrl,
{
DisplayData(benchmarkCollector.PublishTimes, publishElapsedTime, numberOfMessages, "Publishing");
DisplayData(benchmarkCollector.ReceptionTimes, handlingElapsedTime, numberOfMessages, "Receiving");
}
}

host.Dispose();
return benchmarkCollector;
Expand Down Expand Up @@ -218,7 +218,7 @@ private static async Task<TimeSpan> PublishMessages(ISQSPublisher publisher, IBe
await Parallel.ForEachAsync(Enumerable.Range(0, messageCount), options, async (messageNumber, token) =>
{
var start = stopwatch.Elapsed;
await publisher.PublishAsync(new BenchmarkMessage { SentTime = DateTime.UtcNow }, null, token);
await publisher.SendAsync(new BenchmarkMessage { SentTime = DateTime.UtcNow }, null, token);
var publishDuration = stopwatch.Elapsed - start;

benchmarkCollector.RecordMessagePublish(publishDuration);
Expand Down
2 changes: 1 addition & 1 deletion test/AWS.Messaging.IntegrationTests/FifoSubscriberTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ private async Task PublishTransactions(ISQSPublisher sqsPublisher, int numTransa
transactionInfo.ShouldFail = true;
}

await sqsPublisher.PublishAsync(transactionInfo, new SQSOptions
await sqsPublisher.SendAsync(transactionInfo, new SQSOptions
{
MessageGroupId = userId
});
Expand Down
2 changes: 1 addition & 1 deletion test/AWS.Messaging.IntegrationTests/LambdaTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ public async Task ProcessFifoLambdaEventsAsync_Success(int numberOfGroups, int n

expectedMessagesPerGroup[groupId].Add(transactionInfo);

await _publisher!.PublishAsync(transactionInfo, new SQSOptions
await _publisher!.SendAsync(transactionInfo, new SQSOptions
{
MessageGroupId = groupId
});
Expand Down
Loading

0 comments on commit 9b38376

Please sign in to comment.