From 063f3015d8188c58d9dd9de45e3039226e532bea Mon Sep 17 00:00:00 2001 From: "Geralt Wang(MSFT)" Date: Tue, 18 Aug 2020 12:09:31 +0800 Subject: [PATCH] Update eventHub SDK to version 5.1.0 (#92) Updated frameworks in ASA latency monitoring utility: * Updated .NET Core to 3.1 * Update eventHub SDK to version 5.1.0 * Removed dependency of Newtonsoft, added System.Text.Json dependency. Refactor based on comments. * Remove the ReceiverProcessor type and merge its ProcessEvents method into EventReceiver also split EventReceiver into its own file. * Some changes in read me, and add comments * Remove lock in EventReceiver and some codestyle changing. * Quick mention the condition to run this sample. * updated to GA package * some describing details updated Co-authored-by: Frank Li (Wicresoft North America Ltd) --- .../tools/README.md | 47 ++++- .../tools/eh-asa-perfmon/EventReceiver.cs | 111 ++++++++++++ .../tools/eh-asa-perfmon/Program.cs | 162 ++---------------- .../eh-asa-perfmon/eh-asa-perfmon.csproj | 10 +- 4 files changed, 171 insertions(+), 159 deletions(-) create mode 100644 eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/EventReceiver.cs diff --git a/eventhubs-streamanalytics-eventhubs/tools/README.md b/eventhubs-streamanalytics-eventhubs/tools/README.md index e1c9e564..9259c84d 100644 --- a/eventhubs-streamanalytics-eventhubs/tools/README.md +++ b/eventhubs-streamanalytics-eventhubs/tools/README.md @@ -1,11 +1,50 @@ -# Testing End-To-End Latency +--- +page_type: sample +languages: +- csharp +products: +- azure +- azure-event-hubs +extensions: +- platforms: dotnet +--- -Get the Event Hub connection string created and displayed during the script execution. If you cannot find it, just grab the execution string for the `Listen` Shared Access Policy for the output eventhub. +# Getting started on processing received messages from Event Hubs # +The `eventhubs-streamanalytics-eventhubs` sample demonstrates processing messages from Event Hubs. 
The `EventReceiver` will: + + - Connect to an existing event hub. + - Read events from all partitions using the `EventHubConsumerClient`. + - Process each received message and write the results to a `.csv` file. + +# Running this Sample # +To run this sample: + +If you don't have an Azure subscription, create a [free account] before you begin. + +Go to the [Azure portal] and sign in with your account. To create an event hub, see [create event hub]. + +After creating an event hub within the Event Hubs namespace, you can get the Event Hub-level EventHubConnectionString with the following steps. + 1. In the list of event hubs, select your event hub. + 2. On the Event Hubs Instance page, select Shared Access Policies on the left menu. + 3. Add a policy with appropriate permissions (Listen or Manage). + 4. Click the added policy and copy its connection string. + +Hint: While running this sample, you should have an existing Azure Stream Analytics configuration (a producer that sends events) +that publishes events to this event hub. + +Run the following command. +```bash +git clone https://github.com/Azure-Samples/streaming-at-scale.git +``` From within the `eh-asa-perform` folder run: ```bash -dotnet run -- -c "" +dotnet run -- -c "<EventHubConnectionString>" ``` +The application will read from the event hub and will measure latency (difference between Enqueued and Processed time) for each batch of received messages. -The application will read from the output eventhub and will measure latency (difference from Enqueued and Processed time) for each batch of recevied messages. 
+ +[free account]: https://azure.microsoft.com/free/?WT.mc_id=A261C142F +[Azure portal]: https://portal.azure.com/ +[create event hub]: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create \ No newline at end of file diff --git a/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/EventReceiver.cs b/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/EventReceiver.cs new file mode 100644 index 00000000..e1198cc9 --- /dev/null +++ b/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/EventReceiver.cs @@ -0,0 +1,111 @@ +using Azure.Messaging.EventHubs.Consumer; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + +namespace StreamingAtScale +{ + public class EventReceiver + { + public async Task Receive(string connectionString, CancellationToken cancellationToken) + { + // Here, our consumer will read from the latest position instead of the earliest. As a result, it won't see events that + // have previously been published. + // + // Each partition of an Event Hub represents a potentially infinite stream of events. When a consumer is reading, there is no definitive + // point where it can assess that all events have been read and no more will be available. As a result, when the consumer reaches the end of + // the available events for a partition, it will continue to wait for new events to arrive so that it can surface them to be processed. During this + // time, the iterator will block. + // + // In order to prevent the consumer from waiting forever for events, and blocking other code, there is a method available for developers to + // control this behavior. Signaling the cancellation token passed when reading will cause the consumer to stop waiting and end iteration + // immediately. This is desirable when you have decided that you are done reading and do not wish to continue. 
+
+            await using (var consumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, connectionString))
+            {
+                Console.WriteLine("The application will now start to listen for incoming message.");
+
+                // Each time the consumer looks to read events, we'll ask that it waits only a short time before emitting
+                // an empty event, so that our code has the chance to run without indefinite blocking.
+
+                ReadEventOptions readOptions = new ReadEventOptions
+                {
+                    MaximumWaitTime = TimeSpan.FromMilliseconds(500)
+                };
+
+                using (var csvOutput = File.CreateText("./result.csv"))
+                {
+                    csvOutput.WriteLine("EventCount,BatchCount,BatchFrom,BatchTo,MinLatency,MaxLatency,AvgLatency");
+                    try
+                    {
+                        // The client is safe and intended to be long-lived. Start at the latest position so only newly published events are measured.
+
+                        await foreach (PartitionEvent currentEvent in consumerClient.ReadEventsAsync(startReadingAtEarliestEvent: false, readOptions, cancellationToken))
+                        {
+                            var eventData = currentEvent.Data;
+
+                            // Because publishing and receiving events is asynchronous, the events that were published may not
+                            // be immediately available for our consumer to see, so we'll have to guard against an empty event being sent.
+                            if (eventData == null) continue;
+
+                            int eventCount = 0;
+                            var listTimeSpan = new List<TimeSpan>();
+                            var listDateTime = new List<DateTime>();
+
+                            var eventBody = Encoding.UTF8.GetString(eventData.Body.ToArray()).Split('\n', StringSplitOptions.RemoveEmptyEntries);
+
+                            foreach (var bodyInfo in eventBody)
+                            {
+                                try
+                                {
+                                    var message = JsonSerializer.Deserialize<Dictionary<string, object>>(bodyInfo);
+                                    eventCount++;
+
+                                    var timeCreated = DateTime.Parse(message["createdAt"].ToString());
+                                    var timeIn = DateTime.Parse(message["EventEnqueuedUtcTime"].ToString());
+                                    var timeProcessed = DateTime.Parse(message["EventProcessedUtcTime"].ToString());
+                                    var timeAsaProcessed = DateTime.Parse(message["ASAProcessedUtcTime"].ToString());
+                                    var timeOut = eventData.EnqueuedTime.UtcDateTime.ToLocalTime();
+
+                                    listDateTime.Add(timeIn);
+                                    var elapsed = timeOut - timeIn;
+                                    listTimeSpan.Add(elapsed);
+                                }
+                                catch (Exception ex)
+                                {
+                                    Console.WriteLine("Error while parsing event body.");
+                                    Console.WriteLine("Error:" + ex.Message);
+                                    Console.WriteLine("Message:" + bodyInfo);
+                                }
+                            }
+                            if (listTimeSpan.Count == 0) continue; // Nothing parsed: Min/Max/Average throw on an empty list.
+                            var batchFrom = listDateTime.Min();
+                            var batchTo = listDateTime.Max();
+                            var minLatency = listTimeSpan.Min().TotalMilliseconds;
+                            var maxLatency = listTimeSpan.Max().TotalMilliseconds;
+                            var avgLatency = Math.Round(listTimeSpan.Average(ts => ts.TotalMilliseconds), 0);
+
+                            Console.Write($"Received {eventCount} events.");
+                            Console.Write($"\tBatch (From/To): {batchFrom.ToString("HH:mm:ss.ffffff")}/{batchTo.ToString("HH:mm:ss.ffffff")}");
+                            Console.Write($"\tElapsed msec (Min/Max/Avg): {minLatency}/{maxLatency}/{avgLatency}");
+                            Console.WriteLine();
+
+                            csvOutput.WriteLine($"{eventCount},{eventCount},{batchFrom.ToString("o")},{batchTo.ToString("o")},{minLatency},{maxLatency},{avgLatency}");
+                        }
+                    }
+                    catch (TaskCanceledException)
+                    {
+                        // This is okay because the task was cancelled.
+                    }
+                }
+            }
+            // At this point, our clients have passed their "using" scope and have safely been disposed of. We
+            // have no further obligations.
+ } + } +} diff --git a/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/Program.cs b/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/Program.cs index 510fd03b..3ff18ba7 100644 --- a/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/Program.cs +++ b/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/Program.cs @@ -1,14 +1,8 @@ -using System; +using CommandLine; +using System; +using System.Linq; using System.Threading; using System.Threading.Tasks; -using System.Linq; -using CommandLine; -using Microsoft.Azure.EventHubs; -using System.Collections.Generic; -using Newtonsoft.Json; -using Newtonsoft.Json.Linq; -using System.IO; -using System.Text; namespace StreamingAtScale { @@ -18,135 +12,6 @@ public class Options public string EventHubConnectionString { get; set; } } - class MyPartitionReceiver : IPartitionReceiveHandler - { - private readonly object _lock = new object(); - private readonly string _partitionId = string.Empty; - private readonly StreamWriter _csvOutput = null; - private int _maxBatchSize = 1; - - public int MaxBatchSize { get => _maxBatchSize; set => _maxBatchSize = value; } - - public MyPartitionReceiver(string partitionId, StreamWriter csvOutput) - { - this._partitionId = partitionId; - this._csvOutput = csvOutput; - } - - public Task ProcessErrorAsync(Exception error) - { - Console.WriteLine(error.Message); - return Task.FromException(error); - } - - public Task ProcessEventsAsync(IEnumerable events) - { - int eventCount = 0; - - var listTS = new List(); - var listDT = new List(); - - foreach (var e in events) - { - var eventBody = Encoding.UTF8.GetString(e.Body.Array).Split('\n'); - - foreach (var b in eventBody) - { - try - { - var message = JsonConvert.DeserializeObject(b, new JsonSerializerSettings() { DateParseHandling = DateParseHandling.None } ); - eventCount += 1; - - var timeCreated = DateTime.Parse(message["createdAt"].ToString()); - var timeIn = DateTime.Parse(message["EventEnqueuedUtcTime"].ToString()); - 
var timeProcessed = DateTime.Parse(message["EventProcessedUtcTime"].ToString()); - var timeASAProcessed = DateTime.Parse(message["ASAProcessedUtcTime"].ToString()); - var timeOut = e.SystemProperties.EnqueuedTimeUtc.ToLocalTime(); - - listDT.Add(timeIn); - var elapsed = timeOut - timeIn; - listTS.Add(elapsed); - } - catch (Exception ex) - { - Console.WriteLine("Error while parsing event body."); - Console.WriteLine("Error:" + ex.Message); - Console.WriteLine("Message:" + b); - } - } - } - - var batchFrom = listDT.Min(); - var batchTo = listDT.Min(); - var minLatency = listTS.Min().TotalMilliseconds; - var maxLatency = listTS.Max().TotalMilliseconds; - var avgLatency = Math.Round(listTS.Average(ts => ts.TotalMilliseconds), 0); - - lock(_lock) { - Console.Write($"[{this._partitionId}] Received {eventCount} events in {events.Count()} batch(es)."); - Console.Write($"\tBatch (From/To): {batchFrom.ToString("HH:mm:ss.ffffff")}/{batchTo.ToString("HH:mm:ss.ffffff")}"); - Console.Write($"\tElapsed msec (Min/Max/Avg): {minLatency}/{maxLatency}/{avgLatency}"); - Console.WriteLine(); - - _csvOutput.WriteLine($"{this._partitionId},{events.Count()},{eventCount},{batchFrom.ToString("o")},{batchTo.ToString("o")},{minLatency},{maxLatency},{avgLatency}"); - } - - return Task.CompletedTask; - } - } - - - public class EventReceiver - { - private readonly EventHubClient _client; - private CancellationToken _cancellationToken; - - public EventReceiver(string connectionString, CancellationToken _cancellationToken) - { - this._client = EventHubClient.CreateFromConnectionString(connectionString); - - this._cancellationToken = _cancellationToken; - } - - public async Task Receive() - { - Console.WriteLine("The application will now start to listen for incoming message."); - - var csvOutput = File.CreateText("./result.csv"); - csvOutput.WriteLine("PartitionId,EventCount,BatchCount,BatchFrom,BatchTo,MinLatency,MaxLatency,AvgLatency"); - - var runtimeInfo = await 
_client.GetRuntimeInformationAsync(); - Console.WriteLine("Creating receiver handlers..."); - var utcNow = DateTime.UtcNow; - var receivers = runtimeInfo.PartitionIds - .Select(pid => { - var receiver = _client.CreateReceiver("$Default", pid, EventPosition.FromEnqueuedTime(utcNow)); - Console.WriteLine("Created receiver for partition '{0}'.", pid); - receiver.SetReceiveHandler(new MyPartitionReceiver(pid, csvOutput)); - return receiver; - }) - .ToList(); - - try - { - await Task.Delay(-1, this._cancellationToken); - } - catch (TaskCanceledException) - { - // This is okay because the task was cancelled. :) - } - finally - { - // Clean up nicely. - await Task.WhenAll( - receivers.Select(receiver => receiver.CloseAsync()) - ); - } - - csvOutput.Close(); - } - } - public class Program { static async Task Main(string[] args) @@ -167,24 +32,19 @@ static async Task Main(string[] args) { Console.WriteLine("No parameters passed or environment variables set."); Console.WriteLine("Please use --help to learn how to use the application."); - Console.WriteLine(" dotnet run -- --help"); + Console.WriteLine("dotnet run -- --help"); return; } - var cts = new CancellationTokenSource(); - var receiver = new EventReceiver(options.EventHubConnectionString, cts.Token); - - Task t = receiver.Receive(); - - Console.ReadKey(true); - cts.Cancel(); + // As a preventative measure, we'll specify that cancellation should occur after 60 seconds, + // so that we don't hold indefinitely in the event receiving. 
- Console.WriteLine("Exiting..."); - await t; + using CancellationTokenSource cancellationSource = new CancellationTokenSource(); + cancellationSource.CancelAfter(TimeSpan.FromSeconds(60)); + var receiver = new EventReceiver(); + await receiver.Receive(options.EventHubConnectionString, cancellationSource.Token); - Console.WriteLine("Done."); + Console.WriteLine("Done."); } - } } - diff --git a/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/eh-asa-perfmon.csproj b/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/eh-asa-perfmon.csproj index e9372a0a..85291eb9 100644 --- a/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/eh-asa-perfmon.csproj +++ b/eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/eh-asa-perfmon.csproj @@ -1,14 +1,16 @@ - + Exe - netcoreapp2.1 + netcoreapp3.1 StreamingAtScale + 8.0 - - + + +