-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathMessageBusSender.cs
115 lines (97 loc) · 3.86 KB
/
MessageBusSender.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
using System.Collections.Concurrent;
using System.Text;
using Microsoft.Extensions.Logging;
using Pact.Core.Extensions;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace Pact.RabbitMQ;
/// <summary>
/// Basic handler for sending messages
/// </summary>
public class MessageBusSender : IMessageBusSender
{
public MessageBusSender (ILogger<MessageBusSender> logger, IMessageBusClient client)
{
_logger = logger;
_client = client;
_rpcQueue = _client.Channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(_client.Channel);
consumer.Received += Consumer_Received;
_client.Channel.BasicConsume(consumer: consumer, queue: _rpcQueue, autoAck: true);
_pendingMessages = new ConcurrentDictionary<string, TaskCompletionSource<string>>();
}
private void Consumer_Received(object sender, BasicDeliverEventArgs e)
{
try
{
var correlationId = e.BasicProperties.CorrelationId;
var message = Encoding.UTF8.GetString(e.Body.ToArray());
_logger.LogTrace("Received: {message} with CorrelationId {correlationId}", message, correlationId);
_pendingMessages.TryRemove(correlationId, out var tcs);
tcs?.SetResult(message);
}
catch (Exception exc)
{
_logger.LogError(exc, "Error processing RPC response");
throw;
}
}
private readonly ILogger<MessageBusSender> _logger;
private readonly IMessageBusClient _client;
private readonly ConcurrentDictionary<string, TaskCompletionSource<string>> _pendingMessages;
private readonly string _rpcQueue;
/// <summary>
/// Sends a basic fire and forget message
/// </summary>
/// <param name="item">object to be sent</param>
/// <param name="exchange">the bucket to send it too</param>
/// <param name="key">the message routing key</param>
public void Send(object item, string exchange, string key)
{
try
{
var json = item.ToJson();
var data = Encoding.UTF8.GetBytes(json);
var properties = _client.Channel.CreateBasicProperties();
properties.Persistent = true;
_client.Channel.BasicPublish(exchange, key, true, properties, data);
}
catch (Exception exc)
{
_logger.LogError(exc, "Error Send {key} {exchange} {item}", exchange, key, item);
throw;
}
}
/// <summary>
/// Sends a message and waits for a response
/// </summary>
/// <typeparam name="T">Expected return type</typeparam>
/// <param name="item">object to be sent</param>
/// <param name="exchange">the bucket to send it too</param>
/// <param name="key">the message routing key</param>
/// <returns>instance of T</returns>
public async Task<T> SendRPCAsync<T>(object item, string exchange, string key) where T : class
{
try
{
var tcs = new TaskCompletionSource<string>();
var correlationId = Guid.NewGuid().ToString();
_pendingMessages[correlationId] = tcs;
//Ok call remote server
var props = _client.Channel.CreateBasicProperties();
props.CorrelationId = correlationId;
props.ReplyTo = _rpcQueue;
var messageJson = item.ToJson();
var messageData = Encoding.UTF8.GetBytes(messageJson);
_client.Channel.BasicPublish(exchange, key, props, messageData);
_logger.LogTrace("Sent: {messageData} with CorrelationId {correlationId}", messageData, correlationId);
var rawJson = await tcs.Task;
return rawJson.FromJson<T>();
}
catch (Exception exc)
{
_logger.LogError(exc, "Error Send RPC {key} {exchange} {item}", exchange, key, item);
throw;
}
}
}