
Part 2 Schedule activities

Gurmit Teotia edited this page Jun 20, 2019 · 13 revisions

In this part of the tutorial, we will enhance the transcoding workflow to schedule its activities. For this tutorial, we will write the activities in Guflow, but you can write them in any other language or framework.

Let us give the transcode workflow some meaning by scheduling its activities:

[WorkflowDescription("1.0"...)]
public class TranscodeWorkflow : Workflow
{
  public TranscodeWorkflow()
  {
    ScheduleActivity<DownloadActivity>()
        .WithInput(_=> new {DownloadFolder = Input.SessionId});

    ScheduleActivity<TranscodeActivity>().AfterActivity<DownloadActivity>()
        .WithInput(a => new {InputFile = ParentResult(a).DownloadedFile, Format = "MP4"});

    ScheduleActivity<UploadToS3Activity>().AfterActivity<TranscodeActivity>()
        .WithInput(a => new { InputFile = ParentResult(a).TranscodedFile });

    ScheduleActivity<SendConfirmationActivity>().AfterActivity<UploadToS3Activity>();
  }
  private static dynamic ParentResult(IActivityItem a) => a.ParentActivity().Result();
}

Things to note in the above example:

  • Activities are scheduled using the ScheduleActivity API. You can schedule the other supported items (lambda functions, timers and child workflows) with the same ease.
  • Workflow input is read from the Input property of the workflow. Workflow.Input returns a dynamic object, which lets you access JSON-formatted data with ease.

In the above example:

  1. "DownloadActivity" is the startup activity and is scheduled when the workflow starts. It is passed "SessionId" (read from the workflow input) as the download folder.
  2. "TranscodeActivity" is scheduled after "DownloadActivity" completes. Its input is the path of the downloaded file and the transcoding format.
  3. "UploadToS3Activity" is scheduled after "TranscodeActivity" completes, and the path of the transcoded file is passed as its input.
  4. "SendConfirmationActivity" is scheduled after "UploadToS3Activity" completes.
  5. The workflow is completed after "SendConfirmationActivity" completes.

Note: The execution above is strictly serial, but Guflow also allows you to schedule activities (or other items) in parallel workflow branches.
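As a rough sketch of what parallel branches could look like, the workflow below schedules two activities after the same parent, using only the ScheduleActivity and AfterActivity calls shown above. ParallelTranscodeWorkflow and ExtractThumbnailActivity are hypothetical names made up for this illustration, the Guflow.Decider and Guflow.Worker namespaces are assumptions, and DownloadActivity and TranscodeActivity are the ones defined in this tutorial. Inputs and the remaining WorkflowDescription properties are left out for brevity.

```csharp
using System.Threading.Tasks;
using Guflow.Decider; // assumed namespace of Workflow and WorkflowDescription
using Guflow.Worker;  // assumed namespace of Activity and ActivityDescription

// Hypothetical extra activity, used only to form a second branch.
[ActivityDescription("1.0")]
public class ExtractThumbnailActivity : Activity
{
  [ActivityMethod]
  public async Task<string> Execute(string input)
  {
    // simulate thumbnail extraction
    await Task.Delay(10);
    return "done";
  }
}

// Hypothetical workflow: both children name DownloadActivity as their parent,
// so they sit on two separate branches below it.
[WorkflowDescription("1.0")]
public class ParallelTranscodeWorkflow : Workflow
{
  public ParallelTranscodeWorkflow()
  {
    ScheduleActivity<DownloadActivity>();

    // Branch 1: transcode the downloaded file.
    ScheduleActivity<TranscodeActivity>().AfterActivity<DownloadActivity>();

    // Branch 2: extract a thumbnail from the same download.
    ScheduleActivity<ExtractThumbnailActivity>().AfterActivity<DownloadActivity>();
  }
}
```

Under these assumptions, neither branch waits for the other; each only depends on DownloadActivity.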

Let us first look at DownloadActivity. For clarity, the following examples leave out the code that downloads the file from the S3 bucket and transcodes the video:

[ActivityDescription("1.0")]
public class DownloadActivity : Activity
{
  // The activity method can be async.
  [ActivityMethod]
  public async Task<Response> Execute(Input input)
  {
    // Write the code to download the file using input.DownloadFolder here.
    await Task.Delay(10); // simulate download
    var downloadedPath = "local downloaded path"; // placeholder for the real path
    return new Response { DownloadedFile = downloadedPath };
  }
  // Public so that the types are as accessible as the public Execute method.
  public class Response
  {
    public string DownloadedFile;
  }
  public class Input
  {
    public string DownloadFolder;
  }
}

The minimum requirements to create an activity in Guflow are:

  • Derive a class from the Activity class.
  • Decorate it with ActivityDescriptionAttribute.
  • Implement a method decorated with the ActivityMethod attribute.

In ActivityDescriptionAttribute, "Version" is the only required property. By default, the activity's name is derived from its class name, but you can override it by setting the Name property of ActivityDescriptionAttribute. The other properties of ActivityDescriptionAttribute are optional when registering the activity, but their values are required when the activity is scheduled. You can therefore provide them either at registration time or at scheduling time.

Let us define our remaining activities (leaving out the ActivityDescription properties for clarity):

[ActivityDescription("1.0")]
public class TranscodeActivity : Activity
{
  [ActivityMethod]
  public async Task<Response> Execute(Input input)
  {
      //simulate transcode
      await Task.Delay(10);
      return new Response{TranscodedFile ="some file"};
  }
  public class Input
  {
      // Field names match the input passed by TranscodeWorkflow.
      public string InputFile;
      public string Format;
  }
  public class Response
  {
    public string TranscodedFile;
  }
}

[ActivityDescription("1.0")]
public class UploadToS3Activity : Activity
{
  [ActivityMethod]
  public async Task<string> Execute(Input input)
  {
      //simulate upload to s3
      await Task.Delay(10);
      return "done";
  }
  public class Input
  {
      // Field name matches the input passed by TranscodeWorkflow.
      public string InputFile;
  }
}

[ActivityDescription("1.0")]
public class SendConfirmationActivity : Activity
{
  [ActivityMethod]
  public async Task<string> Execute(string input)
  {
      //simulate send email
      await Task.Delay(10);
      return "done";
  }
}

Let us now host the transcode workflow and its related activities:

var domain = new Domain("learning", RegionEndpoint.EUWest2);
await domain.RegisterAsync(10);
var activities = new[]
{
    typeof(DownloadActivity), typeof(TranscodeActivity), typeof(UploadToS3Activity),
    typeof(SendConfirmationActivity)
};
//Register the activities if not already registered.
foreach (var a in activities)
    await domain.RegisterActivityAsync(a);

await domain.RegisterWorkflowAsync<TranscodeWorkflow>();

using (var workflowHost = domain.Host(new[] { new TranscodeWorkflow() }))
using (var activityHost = domain.Host(activities))
{
    workflowHost.StartExecution();
    activityHost.StartExecution(new TaskList("dtasklist"));
    Console.WriteLine("Press any key to terminate");
    Console.ReadKey();
}          

In the above example, the host expects each activity to have a default constructor. If your activities have parameterized constructors, you can take control of activity instance creation, as shown in the following example:

    using (var activityHost = domain.Host(activities, t=>_yourServiceLocator.CreateInstance(t)))
    {
       activityHost.StartExecution(new TaskList("somequeue"));
       Console.WriteLine("Press any key to terminate");
       Console.ReadKey();
    }

The above setup works fine if you have only one worker machine executing all the activities. In reality, you may have multiple worker machines, and in that scenario the setup will fail. For example, if DownloadActivity is executed on machine A and TranscodeActivity is scheduled on machine B, then TranscodeActivity will not find the downloaded file. To solve this problem, TranscodeActivity and UploadToS3Activity should be executed on the same machine that ran DownloadActivity (machine A here), and that is what the next section covers.
