Skip to content

Commit

Permalink
Merge azure-storage branch into master
Browse files Browse the repository at this point in the history
  • Loading branch information
cgillum authored Jun 29, 2019
2 parents d65b992 + bae4c4e commit e6824be
Show file tree
Hide file tree
Showing 14 changed files with 239 additions and 49 deletions.
146 changes: 120 additions & 26 deletions Test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,25 @@ public async Task AutoStart(bool enableExtendedSessions)
}
}

[DataTestMethod]
[DataRow(false)]
[DataRow(true)]
public async Task ContinueAsNewThenTimer(bool enableExtendedSessions)
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions))
{
await host.StartAsync();

var client = await host.StartOrchestrationAsync(typeof(Test.Orchestrations.ContinueAsNewThenTimerOrchestration), 0);
var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(30));

Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
Assert.AreEqual("OK", JToken.Parse(status?.Output));

await host.StopAsync();
}
}

[TestMethod]
public async Task PurgeInstanceHistoryForSingleInstanceWithoutLargeMessageBlobs()
{
Expand Down Expand Up @@ -262,11 +281,8 @@ public async Task PurgeInstanceHistoryForSingleInstanceWithLargeMessageBlobs()
IList<OrchestrationState> results = await host.GetAllOrchestrationInstancesAsync();
Assert.AreEqual(1, results.Count);

await ValidateBlobUrlAsync(
host.TaskHub,
instanceId,
results.First(x => x.OrchestrationInstance.InstanceId == instanceId).Output,
Encoding.UTF8.GetByteCount(message));
string result = JToken.Parse(results.First(x => x.OrchestrationInstance.InstanceId == instanceId).Output).ToString();
Assert.AreEqual(message, result);

await client.PurgeInstanceHistory();

Expand Down Expand Up @@ -313,12 +329,8 @@ public async Task PurgeInstanceHistoryForTimePeriodDeleteAll()
Assert.AreEqual("\"Done\"", results.First(x => x.OrchestrationInstance.InstanceId == firstInstanceId).Output);
Assert.AreEqual("\"Done\"", results.First(x => x.OrchestrationInstance.InstanceId == secondInstanceId).Output);
Assert.AreEqual("\"Done\"", results.First(x => x.OrchestrationInstance.InstanceId == thirdInstanceId).Output);

await ValidateBlobUrlAsync(
host.TaskHub,
fourthInstanceId,
results.First(x => x.OrchestrationInstance.InstanceId == fourthInstanceId).Output,
Encoding.UTF8.GetByteCount(message));
string result = JToken.Parse(results.First(x => x.OrchestrationInstance.InstanceId == fourthInstanceId).Output).ToString();
Assert.AreEqual(message, result);

List<HistoryStateEvent> firstHistoryEvents = await client.GetOrchestrationHistoryAsync(firstInstanceId);
Assert.IsTrue(firstHistoryEvents.Count > 0);
Expand Down Expand Up @@ -672,6 +684,7 @@ private async Task<Tuple<string, TestOrchestrationClient>> ValidateCharacterCoun
await host.StartAsync();

string initialMessage = this.GenerateMediumRandomStringPayload().ToString();
string finalMessage = initialMessage;
int counter = initialMessage.Length;
var initialValue = new Tuple<string, int>(initialMessage, counter);
TestOrchestrationClient client =
Expand All @@ -684,22 +697,19 @@ private async Task<Tuple<string, TestOrchestrationClient>> ValidateCharacterCoun

// Perform some operations
await client.RaiseEventAsync("operation", "double");
finalMessage = finalMessage + new string(finalMessage.Reverse().ToArray());
counter *= 2;

// TODO: Sleeping to avoid a race condition where multiple ContinueAsNew messages
// are processed by the same instance at the same time, resulting in a corrupt
// storage failure in DTFx.
await Task.Delay(10000);
await client.RaiseEventAsync("operation", "double");
finalMessage = finalMessage + new string(finalMessage.Reverse().ToArray());
counter *= 2;
await Task.Delay(10000);
await client.RaiseEventAsync("operation", "double");
counter *= 2;
await Task.Delay(10000);
await client.RaiseEventAsync("operation", "double");
counter *= 2;
await Task.Delay(10000);
await client.RaiseEventAsync("operation", "double");
finalMessage = finalMessage + new string(finalMessage.Reverse().ToArray());
counter *= 2;
await Task.Delay(10000);

Expand All @@ -715,11 +725,14 @@ private async Task<Tuple<string, TestOrchestrationClient>> ValidateCharacterCoun
status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));

Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
var result = status?.Output;
var result = JObject.Parse(status?.Output);
Assert.IsNotNull(result);

await ValidateBlobUrlAsync(host.TaskHub, client.InstanceId, result);
await ValidateBlobUrlAsync(host.TaskHub, client.InstanceId, status?.Input);
var input = JObject.Parse(status?.Input);
Assert.AreEqual(finalMessage, input["Item1"].Value<string>());
Assert.AreEqual(finalMessage.Length, input["Item2"].Value<int>());
Assert.AreEqual(finalMessage, result["Item1"].Value<string>());
Assert.AreEqual(counter, result["Item2"].Value<int>());

await host.StopAsync();

Expand Down Expand Up @@ -1265,9 +1278,9 @@ public async Task SmallTextMessagePayloads(bool enableExtendedSessions)
[DataTestMethod]
[DataRow(true)]
[DataRow(false)]
public async Task LargeTextMessagePayloads(bool enableExtendedSessions)
public async Task LargeTextMessagePayloads_BlobUrl(bool enableExtendedSessions)
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions))
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions, fetchLargeMessages: false))
{
await host.StartAsync();

Expand All @@ -1276,6 +1289,11 @@ public async Task LargeTextMessagePayloads(bool enableExtendedSessions)
var status = await client.WaitForCompletionAsync(TimeSpan.FromMinutes(2));

Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
await ValidateBlobUrlAsync(
host.TaskHub,
client.InstanceId,
status?.Input,
Encoding.UTF8.GetByteCount(message));
await ValidateBlobUrlAsync(
host.TaskHub,
client.InstanceId,
Expand All @@ -1286,6 +1304,81 @@ await ValidateBlobUrlAsync(
}
}

/// <summary>
/// End-to-end test which validates that orchestrations with > 60KB text message sizes can run successfully.
/// </summary>
[DataTestMethod]
[DataRow(true)]
[DataRow(false)]
public async Task LargeTextMessagePayloads_FetchLargeMessages(bool enableExtendedSessions)
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions, fetchLargeMessages: true))
{
await host.StartAsync();

string message = this.GenerateMediumRandomStringPayload().ToString();
var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), message);
var status = await client.WaitForCompletionAsync(TimeSpan.FromMinutes(2));

Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
Assert.AreEqual(message, JToken.Parse(status?.Input));
Assert.AreEqual(message, JToken.Parse(status?.Output));

await host.StopAsync();
}
}

/// <summary>
/// End-to-end test which validates that orchestrations with > 60KB text message sizes can run successfully.
/// </summary>
[DataTestMethod]
[DataRow(true)]
[DataRow(false)]
public async Task NonBlobUriPayload_FetchLargeMessages_RetainsOriginalPayload(bool enableExtendedSessions)
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions, fetchLargeMessages: true))
{
await host.StartAsync();

string message = "https://anygivenurl.azurewebsites.net";
var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), message);
var status = await client.WaitForCompletionAsync(TimeSpan.FromMinutes(2));

Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
Assert.AreEqual(message, JToken.Parse(status?.Input));
Assert.AreEqual(message, JToken.Parse(status?.Output));

await host.StopAsync();
}
}

/// <summary>
/// End-to-end test which validates that orchestrations with > 60KB text message sizes can run successfully.
/// </summary>
[DataTestMethod]
[DataRow(true)]
[DataRow(false)]
public async Task LargeTextMessagePayloads_FetchLargeMessages_QueryState(bool enableExtendedSessions)
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions, fetchLargeMessages: true))
{
await host.StartAsync();

string message = this.GenerateMediumRandomStringPayload().ToString();
var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), message);
var status = await client.WaitForCompletionAsync(TimeSpan.FromMinutes(2));

//Ensure that orchestration state querying also retrieves messages
status = (await client.GetStateAsync(status.OrchestrationInstance.InstanceId)).First();

Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);
Assert.AreEqual(message, JToken.Parse(status?.Input));
Assert.AreEqual(message, JToken.Parse(status?.Output));

await host.StopAsync();
}
}

private StringBuilder GenerateMediumRandomStringPayload()
{
// Generate a medium random string payload
Expand Down Expand Up @@ -1327,8 +1420,8 @@ public async Task LargeBinaryByteMessagePayloads(bool enableExtendedSessions)

Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);

// Large message payloads may actually get bigger when stored in blob storage.
await ValidateBlobUrlAsync(host.TaskHub, client.InstanceId, status?.Output, (int)(readBytes.Length * 1.3));
byte[] resultBytes = JObject.Parse(status?.Output).ToObject<byte[]>();
Assert.IsTrue(readBytes.SequenceEqual(resultBytes));

await host.StopAsync();
}
Expand Down Expand Up @@ -1359,7 +1452,8 @@ public async Task LargeBinaryStringMessagePayloads(bool enableExtendedSessions)
Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);

// Large message payloads may actually get bigger when stored in blob storage.
await ValidateBlobUrlAsync(host.TaskHub, client.InstanceId, status?.Output, (int)(readBytes.Length * 1.3));
string result = JToken.Parse(status?.Output).ToString();
Assert.AreEqual(message, result);

await host.StopAsync();
}
Expand Down Expand Up @@ -2014,7 +2108,7 @@ public override async Task<Tuple<string, int>> RunTask(OrchestrationContext cont
{
case "double":
inputData = new Tuple<string, int>(
$"{inputData.Item1}{inputData.Item1.Reverse()}",
$"{inputData.Item1}{new string(inputData.Item1.Reverse().ToArray())}",
inputData.Item2 * 2);
break;
case "end":
Expand Down
4 changes: 3 additions & 1 deletion Test/DurableTask.AzureStorage.Tests/TestHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ static class TestHelpers
{
public static TestOrchestrationHost GetTestOrchestrationHost(
bool enableExtendedSessions,
int extendedSessionTimeoutInSeconds = 30)
int extendedSessionTimeoutInSeconds = 30,
bool fetchLargeMessages = true)
{
string storageConnectionString = GetTestStorageAccountConnectionString();

Expand All @@ -30,6 +31,7 @@ public static TestOrchestrationHost GetTestOrchestrationHost(
TaskHubName = ConfigurationManager.AppSettings.Get("TaskHubName"),
ExtendedSessionsEnabled = enableExtendedSessions,
ExtendedSessionIdleTimeout = TimeSpan.FromSeconds(extendedSessionTimeoutInSeconds),
FetchLargeMessageDataEnabled = fetchLargeMessages,
};

return new TestOrchestrationHost(settings);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1504,6 +1504,11 @@ public async Task<OrchestrationState> WaitForOrchestrationAsync(
}
else
{
if (this.settings.FetchLargeMessageDataEnabled)
{
state.Input = await this.messageManager.FetchLargeMessageIfNecessary(state.Input);
state.Output = await this.messageManager.FetchLargeMessageIfNecessary(state.Output);
}
return state;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ public class AzureStorageOrchestrationServiceSettings
/// </summary>
public bool ExtendedSessionsEnabled { get; set; }

/// <summary>
/// Gets or sets a flag indicating whether to automatically fetch large orchestration input and outputs
/// when it is stored in a compressed blob when retrieving orchestration state.
/// </summary>
public bool FetchLargeMessageDataEnabled { get; set; } = true;

/// <summary>
/// Gets or sets the number of seconds before an idle session times out.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<PropertyGroup>
<TargetFrameworks>netstandard2.0;net451</TargetFrameworks>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<FileVersion>1.6.2</FileVersion>
<FileVersion>1.6.3</FileVersion>
<AssemblyVersion>$(FileVersion)</AssemblyVersion>
<Version>$(FileVersion)</Version>
<IncludeSymbols>true</IncludeSymbols>
Expand Down
19 changes: 19 additions & 0 deletions src/DurableTask.AzureStorage/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ namespace DurableTask.AzureStorage
using System.Text;
using System.Threading.Tasks;
using DurableTask.AzureStorage.Monitoring;
using DurableTask.Core;
using Microsoft.WindowsAzure.Storage.Blob;
using Microsoft.WindowsAzure.Storage.Queue;
using Newtonsoft.Json;
Expand Down Expand Up @@ -109,6 +110,24 @@ public async Task<string> SerializeMessageDataAsync(MessageData messageData)
return JsonConvert.SerializeObject(messageData, this.taskMessageSerializerSettings);
}

/// <summary>
/// If the "message" of an orchestration state is actually a URI retrieves actual message from blob storage.
/// Otherwise returns the message as is.
/// </summary>
/// <param name="message">The message to be fetched if it is a url.</param>
/// <returns>Actual string representation of message.</returns>
public async Task<string> FetchLargeMessageIfNecessary(string message)
{
if (Uri.IsWellFormedUriString(message, UriKind.Absolute))
{
return await this.DownloadAndDecompressAsBytesAsync(new Uri(message));
}
else
{
return message;
}
}

/// <summary>
/// Deserializes the MessageData object
/// </summary>
Expand Down
18 changes: 12 additions & 6 deletions src/DurableTask.AzureStorage/Messaging/OrchestrationSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public OrchestrationSession(

public IList<MessageData> CurrentMessageBatch { get; private set; }

public OrchestrationRuntimeState RuntimeState { get; }
public OrchestrationRuntimeState RuntimeState { get; private set; }

public string ETag { get; set; }

Expand Down Expand Up @@ -117,15 +117,21 @@ public async Task<IList<TaskMessage>> FetchNewOrchestrationMessagesAsync(
return messages;
}

public void UpdateRuntimeState(OrchestrationRuntimeState runtimeState)
{
this.RuntimeState = runtimeState;
this.Instance = runtimeState.OrchestrationInstance;
}

internal bool IsOutOfOrderMessage(MessageData message)
{
if (this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount > 3)
if (this.IsNonexistantInstance() && message.OriginalQueueMessage.DequeueCount > 5)
{
// The first three times a message for a nonexistant instance is dequeued, give the message the benefit
// The first five times a message for a nonexistant instance is dequeued, give the message the benefit
// of the doubt and assume that the instance hasn't had its history table populated yet. After the
// third execution, the most likely scenario is that this is a zombie event for a previous iteration of
// an orchestration that called ContinueAsNew(). Return false to let the zombie message continue on to be
// discarded so that we don't end up processing it indefinitely.
// fifth execution, ~30 seconds have passed and the most likely scenario is that this is a zombie event.
// This means the history table for the message's orchestration no longer exists, either due to an explicit
// PurgeHistory request or due to a ContinueAsNew call cleaning the old execution's history.
return false;
}

Expand Down
2 changes: 1 addition & 1 deletion src/DurableTask.AzureStorage/Messaging/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public SessionBase(string storageAccountName, string taskHubName, OrchestrationI
};
}

public OrchestrationInstance Instance { get; }
public OrchestrationInstance Instance { get; protected set; }

public OperationContext StorageOperationContext { get; }

Expand Down
Loading

0 comments on commit e6824be

Please sign in to comment.