Skip to content

Part 3 Task routing

Gurmit Teotia edited this page Jun 20, 2019 · 12 revisions

In this part of the tutorial, we will execute TranscodeActivity and UploadToS3Activity on the same worker machine where DownloadActivity was executed. We will use Amazon SWF task routing.

Note: This is just one of the way you can use task routing. Amazon SWF document also discusses other possibilities.

We will use the following strategy in transcoding workflow to achieve the task routing:

  • DownloadActivity will be scheduled on the default task list, which will be polled by different worker machines and it can be executed on either of them
  • DownloadActivity in its response sends the task list, specific to this machine, for the workflow to schedule the dependent activities on.
  • The host for TranscodeActivity and UploadToS3Activity will be polling on the machine-specific task list
  • Workflow will schedule the TranscodeActivity and UploadToS3Activity on the machine-specific task list.

Following example of modified workflow clarify it further:

[WorkflowDescription("1.0")]
public class TranscodeWorkflow : WorkflowDescription
{
  public TranscodeWorkflow()
  {
    //DownloadActivity will be scheduled on its default task list
    ScheduleActivity<DownloadActivity>()
        .WithInput(_=> new {DownloadFolder = Input.SessionId});
	 
    //TranscodeActivity will be scheduled on the machine where download activity has executed. 
    ScheduleActivity<TranscodeActivity>().AfterActivity<DownloadFile>()
	.OnTaskList(a=>a.Parent().Result().PollingQueue)
	.WithInput(a=> new { InputFile=a.Parent().Result().DownloadedFile, Format ="MP4});
	
    //TranscodeActivity will be scheduled on the machine where download activity has executed.	
    ScheduleActivity<UploadToS3Activity>().AfterActivity<TranscodeActivity>()
	.OnTaskList(_=>Activity<DownloadActivity>().Result().PollingQueue)
	.WithInput(a=> new { InputFile=a.Parent().Result().TranscodedFile});
	 
    //SendConfirmationActivity can be executed on any worker machine	 
    ScheduleActivity<SendConfirmationActivity>().AfterActivity<UploadToS3Activity>();
  }  
}

Now let us change the DownloadActivity to return polling task list for dependent activity, rest of the activities will remain same:

[ActivityDescription("1.0")]
public class DownloadActivity : Activity
{
 [ActivityMethod]
 public async Task<Response> Execute(Input input)
 {
  //write code to download it from downloadFolder
  Console.WriteLine("Downloading from from S3 bucket {0}",downloadFolder);
  return new Response() {DownloadedPath = "downloaded path", PollingQueue = PollingQueue.Download};
 }
 private class Response
 {
   public string DownloadedFile;
   public string PollingQueue;
 }
 private class Input
 {
   public string DownloadFolder;
 }
}

public class PollingQueue
{
    //In production code this value should remain same even when worker is restarted.
    public static string Download = Guid.NewGuid().ToString();
}

Now we need to change the hosting strategy to support task routing. We will create two ActivityHost, one will host DownloadActivity and SendConfirmation activity and poll on default task list while another one will host TranscodeActivity and UploadToS3Activity and will poll on the machine-specific task list.

var domain = new Domain("learning", RegionEndpoint.EUWest2);
await domain.RegisterAsync(10);

var activities1 = new[]{typeof(DownloadActivity), typeof(SendConfirmationActivity)};
var activities2 = new[]{typeof(TranscodeActivity), typeof(UploadToS3Activity)};


foreach (var a in activities1.Concat(activities2))
    await domain.RegisterActivityAsync(a);

await domain.RegisterWorkflowAsync<TranscodeWorkflow>();

using (var workflowHost = domain.Host(new[] { new TranscodeWorkflow() }))
using (var activityHost1 = domain.Host(activities1))
using (var activityHost2 = domain.Host(activities2))
{
    workflowHost.StartExecution();
    activityHost1.StartExecution(new TaskQueue("dtasklist"));
    activityHost2.StartExecution(new TaskQueue(PollingQueue.Download));
    Console.WriteLine("Press any key to terminate");
    Console.ReadKey();
}
Clone this wiki locally