Skip to content

Commit

Permalink
Update eventHub SDK to version 5.1.0 (#92)
Browse files Browse the repository at this point in the history
Updated frameworks in ASA latency monitoring utility:

* Updated .NET Core to 3.1

* Update eventHub SDK to version 5.1.0

* Removed dependency of Newtonsoft, added System.Text.Json dependency. Refactor based on comments.

* Remove the ReceiverProcessor type and merge its ProcessEvents method into EventReceiver also split EventReceiver into it's own file.

* Some changes in read me, and add comments

* Remove lock in EventReceiver and some codestyle changing.

* Quick mention the condition to run this sample.

* updated to GA package

* some describing details updated

Co-authored-by: Frank Li (Wicresoft North America Ltd) <v-cheli@microsoft.com>
  • Loading branch information
wantedfast and Frank Li (Wicresoft North America Ltd) authored Aug 18, 2020
1 parent c175b67 commit 063f301
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 159 deletions.
47 changes: 43 additions & 4 deletions eventhubs-streamanalytics-eventhubs/tools/README.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,50 @@
# Testing End-To-End Latency
---
page_type: sample
languages:
- csharp
products:
- azure
- azure-event-hubs
extensions:
- platforms: dotnet
---

Get the Event Hub connection string created and displayed during the script execution. If you cannot find it, just grab the execution string for the `Listen` Shared Access Policy for the output eventhub.
# Getting started on processing received message from Event Hubs #

The `eventhubs-streamanalytics-eventhubs` sample demonstrates processing messages from Event Hubs. The `EventReceiver` will:

- Connect to an existing event hub.
- Read events from all partitions using the `EventHubConsumerClient`.
- Process each received message and output to a `.csv` file.

# Running this Sample #
To run this sample:

If you don't have an Azure subscription, create a [free account] before you begin.

Go to [Azure portal], sign in with your account. You can find how to [create event hub] from here.

After creating an event hub within the Event Hubs namespace, you can get the Event Hub-level EventHubConnectionString with following steps.
1. In the list of event hubs, select your event hub.
2. On the Event Hubs Instance page, select Shared Access Policies on the left menu.
3. Add a policy with appropriate permissions(Listen or Manage).
4. Click on added policy.

Hints:While running this sample, You should have an existing Azure Stream Analytics configuration(a producer sends events)
that publishes events to this event hub.

Run the following command.
```bash
git clone https://github.com/Azure-Samples/streaming-at-scale.git
```
From within the `eh-asa-perform` folder run:

```bash
dotnet run -- -c "<connection string>"
dotnet run -- -c "<EventHubConnectionString>"
```
The application will read from the event hub and will measure latency (difference from Enqueued and Processed time) for each batch of recevied messages.

The application will read from the output eventhub and will measure latency (difference from Enqueued and Processed time) for each batch of recevied messages.
<!-- LINKS -->
[free account]: https://azure.microsoft.com/free/?WT.mc_id=A261C142F
[Azure portal]: https://portal.azure.com/
[create event hub]: https://docs.microsoft.com/en-us/azure/event-hubs/event-hubs-create
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

namespace StreamingAtScale
{
public class EventReceiver
{
public async Task Receive(string connectionString, CancellationToken cancellationToken)
{
// In here, our consumer will read from the latest position instead of the earliest. As a result, it won't see events that
// have previously been published.
//
// Each partition of an Event Hub represents a potentially infinite stream of events. When a consumer is reading, there is no definitive
// point where it can assess that all events have been read and no more will be available. As a result, when the consumer reaches the end of
// the available events for a partition, it will continue to wait for new events to arrive so that it can surface them to be processed. During this
// time, the iterator will block.
//
// In order to prevent the consumer from waiting forever for events, and blocking other code, there is a method available for developers to
// control this behavior. It's signaling the cancellation token passed when reading will cause the consumer to stop waiting and end iteration
// immediately. This is desirable when you have decided that you are done reading and do not wish to continue.

await using (var consumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, connectionString))
{
Console.WriteLine("The application will now start to listen for incoming message.");

// Each time the consumer looks to read events, we'll ask that it waits only a short time before emitting
// an empty event, so that our code has the chance to run without indefinite blocking.

ReadEventOptions readOptions = new ReadEventOptions
{
MaximumWaitTime = TimeSpan.FromMilliseconds(500)
};

using (var csvOutput = File.CreateText("./result.csv"))
{
csvOutput.WriteLine("EventCount,BatchCount,BatchFrom,BatchTo,MinLatency,MaxLatency,AvgLatency");
try
{
// The client is safe and intended to be long-lived.

await foreach (PartitionEvent currentEvent in consumerClient.ReadEventsAsync(readOptions, cancellationToken))
{
var eventData = currentEvent.Data;

// Because publishing and receiving events is asynchronous, the events that published may not
// be immediately available for our consumer to see, so we'll have to guard against an empty event being sent
if (eventData == null) continue;

int eventCount = 0;
var listTimeSpan = new List<TimeSpan>();
var listDateTime = new List<DateTimeOffset>();

var eventBody = Encoding.UTF8.GetString(eventData.Body.ToArray()).Split('\n');

foreach (var bodyInfo in eventBody)
{
try
{
var message = JsonSerializer.Deserialize<Dictionary<string, object>>(bodyInfo);
eventCount++;

var timeCreated = DateTime.Parse(message["createdAt"].ToString());
var timeIn = DateTime.Parse(message["EventEnqueuedUtcTime"].ToString());
var timeProcessed = DateTime.Parse(message["EventProcessedUtcTime"].ToString());
var timeAsaProcessed = DateTime.Parse(message["ASAProcessedUtcTime"].ToString());
var timeOut = eventData.EnqueuedTime.UtcDateTime.ToLocalTime();

listDateTime.Add(timeIn);
var elapsed = timeOut - timeIn;
listTimeSpan.Add(elapsed);
}
catch (Exception ex)
{
Console.WriteLine("Error while parsing event body.");
Console.WriteLine("Error:" + ex.Message);
Console.WriteLine("Message:" + bodyInfo);
}
}

var batchFrom = listDateTime.Min();
var batchTo = listDateTime.Min();
var minLatency = listTimeSpan.Min().TotalMilliseconds;
var maxLatency = listTimeSpan.Max().TotalMilliseconds;
var avgLatency = Math.Round(listTimeSpan.Average(ts => ts.TotalMilliseconds), 0);

Console.Write($"Received {eventCount} events.");
Console.Write($"\tBatch (From/To): {batchFrom.ToString("HH:mm:ss.ffffff")}/{batchTo.ToString("HH:mm:ss.ffffff")}");
Console.Write($"\tElapsed msec (Min/Max/Avg): {minLatency}/{maxLatency}/{avgLatency}");
Console.WriteLine();

csvOutput.WriteLine($"{eventCount},{eventCount},{batchFrom.ToString("o")},{batchTo.ToString("o")},{minLatency},{maxLatency},{avgLatency}");
}
}
catch (TaskCanceledException)
{
// This is okay because the task was cancelled.
}
}
}
// At this point, our clients have passed their "using" scope and have safely been disposed of. We
// have no further obligations.
}
}
}
162 changes: 11 additions & 151 deletions eventhubs-streamanalytics-eventhubs/tools/eh-asa-perfmon/Program.cs
Original file line number Diff line number Diff line change
@@ -1,14 +1,8 @@
using System;
using CommandLine;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;
using CommandLine;
using Microsoft.Azure.EventHubs;
using System.Collections.Generic;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System.IO;
using System.Text;

namespace StreamingAtScale
{
Expand All @@ -18,135 +12,6 @@ public class Options
public string EventHubConnectionString { get; set; }
}

class MyPartitionReceiver : IPartitionReceiveHandler
{
private readonly object _lock = new object();
private readonly string _partitionId = string.Empty;
private readonly StreamWriter _csvOutput = null;
private int _maxBatchSize = 1;

public int MaxBatchSize { get => _maxBatchSize; set => _maxBatchSize = value; }

public MyPartitionReceiver(string partitionId, StreamWriter csvOutput)
{
this._partitionId = partitionId;
this._csvOutput = csvOutput;
}

public Task ProcessErrorAsync(Exception error)
{
Console.WriteLine(error.Message);
return Task.FromException(error);
}

public Task ProcessEventsAsync(IEnumerable<EventData> events)
{
int eventCount = 0;

var listTS = new List<TimeSpan>();
var listDT = new List<DateTimeOffset>();

foreach (var e in events)
{
var eventBody = Encoding.UTF8.GetString(e.Body.Array).Split('\n');

foreach (var b in eventBody)
{
try
{
var message = JsonConvert.DeserializeObject<JObject>(b, new JsonSerializerSettings() { DateParseHandling = DateParseHandling.None } );
eventCount += 1;

var timeCreated = DateTime.Parse(message["createdAt"].ToString());
var timeIn = DateTime.Parse(message["EventEnqueuedUtcTime"].ToString());
var timeProcessed = DateTime.Parse(message["EventProcessedUtcTime"].ToString());
var timeASAProcessed = DateTime.Parse(message["ASAProcessedUtcTime"].ToString());
var timeOut = e.SystemProperties.EnqueuedTimeUtc.ToLocalTime();

listDT.Add(timeIn);
var elapsed = timeOut - timeIn;
listTS.Add(elapsed);
}
catch (Exception ex)
{
Console.WriteLine("Error while parsing event body.");
Console.WriteLine("Error:" + ex.Message);
Console.WriteLine("Message:" + b);
}
}
}

var batchFrom = listDT.Min();
var batchTo = listDT.Min();
var minLatency = listTS.Min().TotalMilliseconds;
var maxLatency = listTS.Max().TotalMilliseconds;
var avgLatency = Math.Round(listTS.Average(ts => ts.TotalMilliseconds), 0);

lock(_lock) {
Console.Write($"[{this._partitionId}] Received {eventCount} events in {events.Count()} batch(es).");
Console.Write($"\tBatch (From/To): {batchFrom.ToString("HH:mm:ss.ffffff")}/{batchTo.ToString("HH:mm:ss.ffffff")}");
Console.Write($"\tElapsed msec (Min/Max/Avg): {minLatency}/{maxLatency}/{avgLatency}");
Console.WriteLine();

_csvOutput.WriteLine($"{this._partitionId},{events.Count()},{eventCount},{batchFrom.ToString("o")},{batchTo.ToString("o")},{minLatency},{maxLatency},{avgLatency}");
}

return Task.CompletedTask;
}
}


public class EventReceiver
{
private readonly EventHubClient _client;
private CancellationToken _cancellationToken;

public EventReceiver(string connectionString, CancellationToken _cancellationToken)
{
this._client = EventHubClient.CreateFromConnectionString(connectionString);

this._cancellationToken = _cancellationToken;
}

public async Task Receive()
{
Console.WriteLine("The application will now start to listen for incoming message.");

var csvOutput = File.CreateText("./result.csv");
csvOutput.WriteLine("PartitionId,EventCount,BatchCount,BatchFrom,BatchTo,MinLatency,MaxLatency,AvgLatency");

var runtimeInfo = await _client.GetRuntimeInformationAsync();
Console.WriteLine("Creating receiver handlers...");
var utcNow = DateTime.UtcNow;
var receivers = runtimeInfo.PartitionIds
.Select(pid => {
var receiver = _client.CreateReceiver("$Default", pid, EventPosition.FromEnqueuedTime(utcNow));
Console.WriteLine("Created receiver for partition '{0}'.", pid);
receiver.SetReceiveHandler(new MyPartitionReceiver(pid, csvOutput));
return receiver;
})
.ToList();

try
{
await Task.Delay(-1, this._cancellationToken);
}
catch (TaskCanceledException)
{
// This is okay because the task was cancelled. :)
}
finally
{
// Clean up nicely.
await Task.WhenAll(
receivers.Select(receiver => receiver.CloseAsync())
);
}

csvOutput.Close();
}
}

public class Program
{
static async Task Main(string[] args)
Expand All @@ -167,24 +32,19 @@ static async Task Main(string[] args)
{
Console.WriteLine("No parameters passed or environment variables set.");
Console.WriteLine("Please use --help to learn how to use the application.");
Console.WriteLine(" dotnet run -- --help");
Console.WriteLine("dotnet run -- --help");
return;
}

var cts = new CancellationTokenSource();
var receiver = new EventReceiver(options.EventHubConnectionString, cts.Token);

Task t = receiver.Receive();

Console.ReadKey(true);
cts.Cancel();
// As a preventative measure, we'll specify that cancellation should occur after 60 seconds,
// so that we don't hold indefinitely in the event receiving.

Console.WriteLine("Exiting...");
await t;
using CancellationTokenSource cancellationSource = new CancellationTokenSource();
cancellationSource.CancelAfter(TimeSpan.FromSeconds(60));
var receiver = new EventReceiver();
await receiver.Receive(options.EventHubConnectionString, cancellationSource.Token);

Console.WriteLine("Done.");
Console.WriteLine("Done.");
}

}
}

Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
<TargetFramework>netcoreapp3.1</TargetFramework>
<RootNamespace>StreamingAtScale</RootNamespace>
<LangVersion>8.0</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="CommandLineParser" Version="2.5.0" />
<PackageReference Include="Microsoft.Azure.EventHubs" Version="3.0.0" />
<PackageReference Include="Azure.Messaging.EventHubs" Version="5.1.0" />
<PackageReference Include="CommandLineParser" Version="2.8.0" />
<PackageReference Include="System.Text.Json" Version="4.7.2" />
</ItemGroup>

</Project>

0 comments on commit 063f301

Please sign in to comment.