How to use AWS Step Functions to iterate and sequentially process jobs

A Cloud Guru News

The sample project uses the iterator pattern to process a dynamic list of jobs and is freely available on GitHub.

Update: Amazon has since introduced support for dynamic parallelism.

AWS Step Functions is based on the concepts of tasks and state machines, using the JSON-based Amazon States Language to define workflows. In this article, we’re going to explore how a workflow can be set up with AWS Step Functions to iterate over an arbitrary number of complex tasks.

At Marked, we use the iterator pattern with AWS Step Functions to intelligently process audio recordings of meetings. Once the audio files are processed, the participants can then access and search the conversations.

For every meeting, the number of files to be processed depends on a range of factors: the number of people on the call, connection dropouts that result in partial recordings for the affected person, and participants leaving the call and returning at a later point. Regardless, the post-processing workflow should handle all of these scenarios equally well.

TL;DR: Process a list of jobs by always performing the tasks on the first job in the list, marking it as done, and moving it to the end of the array. Repeat until all jobs are completed. The full sample project is available on GitHub.

AWS Step Functions lets you coordinate multiple AWS services into serverless workflows so you can build and update apps quickly. Using Step Functions, you can design and run workflows that stitch together services such as AWS Lambda and Amazon ECS into feature-rich applications. Workflows are made up of a series of steps, with the output of one step acting as input into the next.

Iterator Pattern

The iterator pattern described in the AWS documentation works very well when the step doing the work for each iteration doesn’t perform any specific input and output processing.

But how do we deal with iterations that perform processing and modify the state machine data in place? Let’s imagine a workflow that takes a list of jobs as input on execution.

{
  "jobs": [{
    "name": "First job"
  }, {
    "name": "Second job"
  }]
}

Each job should be processed by two tasks, adding the results to the state machine data — ultimately producing the following output:

{
  "jobs": [{
    "name": "First job",
    "firstResult": "success",
    "secondResult": "success"
  }, {
    "name": "Second job",
    "firstResult": "success",
    "secondResult": "success"
  }]
}

The key aspect is that firstResult and secondResult are set for each individual job as the workflow is being processed.

Note: Amazon generally recommends keeping the state machine data as small as possible and storing larger objects in S3, DynamoDB, or elsewhere, using references within the workflow.

Due to limitations within the Amazon States Language, it’s not possible to use the full JSONPath syntax to select or query the jobs array. Instead, we can carry out the tasks just on the first item in the array, mark the job as done after processing is complete, and move it to the end of the array.

To determine if the workflow has reached the end, each iteration concludes with another step checking if the next job in line — always the first item in the array — has already been marked as done. This leads to the complete workflow definition below.

The workflow runs until all jobs are done
{
  "Comment": "Processes an arbitrary list of jobs.",
  "StartAt": "ProcessFirstPass",
  "States": {
    "ProcessFirstPass": {
      "Type": "Pass",
      "Result": "success",
      "ResultPath": "$.jobs[0].firstResult",
      "Next": "ProcessSecondPass"
    },
    "ProcessSecondPass": {
      "Type": "Pass",
      "Result": "success",
      "ResultPath": "$.jobs[0].secondResult",
      "Next": "MarkAsDone"
    },
    "MarkAsDone": {
      "Type": "Pass",
      "ResultPath": "$.jobs[0].done",
      "Result": true,
      "Next": "MoveToEnd"
    },
    "MoveToEnd": {
      "Type": "Task",
      "Comment": "Moves the currently processed job to the end of the array",
      "InputPath": "$.jobs",
      "ResultPath": "$.jobs",
      "Resource": "${ProcessMoveToEnd.Arn}",
      "Next": "AllDone"
    },
    "AllDone": {
      "Type": "Choice",
      "Choices": [
        {
          "Variable": "$.jobs[0].done",
          "BooleanEquals": true,
          "Next": "Done"
        }
      ],
      "Default": "ProcessFirstPass"
    },
    "Done": {
      "Type": "Pass",
      "End": true
    }
  }
}
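To make the control flow of the definition above easier to follow, the loop can be mimicked in plain Go. This is only a sketch of the logic, not code from the sample project: the `Job` struct and the `runIterator` function are hypothetical names, and the two processing steps simply write "success" the way the Pass states do.

```go
package main

import "fmt"

// Job is a simplified, hypothetical stand-in for one entry of the jobs array.
type Job struct {
	Name         string
	FirstResult  string
	SecondResult string
	Done         bool
}

// runIterator mimics the state machine loop: work on the first job,
// mark it as done, move it to the end, and stop once the job that is
// now first has already been marked as done.
func runIterator(jobs []Job) []Job {
	if len(jobs) == 0 {
		return jobs
	}
	for {
		// ProcessFirstPass, ProcessSecondPass and MarkAsDone
		jobs[0].FirstResult = "success"
		jobs[0].SecondResult = "success"
		jobs[0].Done = true

		// MoveToEnd: same rotation as the Lambda function
		jobs = append(jobs[1:], jobs[0])

		// AllDone: the Choice state only inspects the first item
		if jobs[0].Done {
			return jobs
		}
	}
}

func main() {
	jobs := []Job{{Name: "First job"}, {Name: "Second job"}}
	for _, job := range runIterator(jobs) {
		fmt.Printf("%+v\n", job)
	}
}
```

Because every job is rotated to the end exactly once, the array is back in its original order when the loop stops.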

Both steps ProcessFirstPass and ProcessSecondPass could be a Lambda function or an activity. In this example, a Pass state is sufficient to demonstrate the flow.
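As a rough illustration of what a Lambda-backed version of one of these steps might look like, here is a minimal sketch in Go. It assumes the Task state would pass the first job as input (for example with an InputPath of `$.jobs[0]`) and write the return value to `$.jobs[0].firstResult` via ResultPath. The `Job` struct, the `processFirstPass` name and the validation rule are hypothetical and not taken from the sample project.

```go
package main

import (
	"errors"
	"fmt"
)

// Job mirrors one entry of the jobs array. The fields are assumptions
// based on the example input shown earlier in the article.
type Job struct {
	Name string `json:"name"`
	Done bool   `json:"done"`
}

// processFirstPass is a hypothetical handler for the first processing
// step. The returned string is what Step Functions would store at the
// state's ResultPath.
func processFirstPass(job Job) (string, error) {
	if job.Name == "" {
		return "", errors.New("job has no name")
	}
	// Real work (for example processing an audio file) would go here.
	return "success", nil
}

func main() {
	// When deployed, the handler would be registered with
	// lambda.Start(processFirstPass) from github.com/aws/aws-lambda-go/lambda.
	// Calling it directly keeps this sketch free of external dependencies.
	result, err := processFirstPass(Job{Name: "First job"})
	fmt.Println(result, err)
}
```

Returning only the result value, rather than the whole job, keeps the handler unaware of where in the state machine data its output ends up.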

The step that moves the first item in the array to the end is implemented as a Lambda function. I chose Go, but if you prefer Node.js or Python, you could even provide the source code as inline text in your CloudFormation template.

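package main

import (
  "github.com/aws/aws-lambda-go/lambda"
)

// handler receives the jobs array and returns it with the first
// item moved to the end. An empty list is returned unchanged.
func handler(list []interface{}) ([]interface{}, error) {
  if len(list) == 0 {
    return list, nil
  }
  return append(list[1:], list[0]), nil
}

func main() {
  lambda.Start(handler)
}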

The final piece in the puzzle is the execution of the workflow itself:

aws stepfunctions start-execution \
  --state-machine-arn <STATE_MACHINE_ARN> \
  --input "{\"jobs\": [{\"input\": \"First job\"}, {\"input\": \"Second job\"}]}"

However, this causes a runtime error because the Choice state that checks whether a job is complete requires the done attribute to be set.

{
  "error": "States.Runtime",
  "cause": "An error occurred while executing the state 'AllDone' (entered at the event id #13). Invalid path '$.jobs[0].done': The choice state’s condition path references an invalid value."
}

To start the execution correctly, provide the default value false for all jobs.

aws stepfunctions start-execution \
  --state-machine-arn <STATE_MACHINE_ARN> \
  --input "{\"jobs\": [{\"input\": \"First job\", \"done\": false}, {\"input\": \"Second job\", \"done\": false}]}"
🎉🎉🎉
{
  "startDate": 1534268042.564, 
  "executionArn": "arn:[...]:83a1d843-0a60-45b7-8e6b-3be8077cfd09"
}
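Writing the escaped JSON by hand gets tedious as the list grows, so the execution input could also be generated. The following is a minimal sketch, assuming the same `input` and `done` keys as the command above; the `buildInput` function and the struct names are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// jobInput is one entry of the jobs array in the execution input.
type jobInput struct {
	Input string `json:"input"`
	Done  bool   `json:"done"`
}

// executionInput is the top-level document passed to the state machine.
type executionInput struct {
	Jobs []jobInput `json:"jobs"`
}

// buildInput creates the execution input for the given job names and
// sets done to false on every job, so the Choice state always finds
// the attribute it checks.
func buildInput(names []string) (string, error) {
	payload := executionInput{Jobs: make([]jobInput, 0, len(names))}
	for _, name := range names {
		payload.Jobs = append(payload.Jobs, jobInput{Input: name, Done: false})
	}
	encoded, err := json.Marshal(payload)
	if err != nil {
		return "", err
	}
	return string(encoded), nil
}

func main() {
	input, err := buildInput([]string{"First job", "Second job"})
	if err != nil {
		panic(err)
	}
	fmt.Println(input)
}
```

The resulting string can be passed as the value of the `--input` option.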

Conclusion

This example shows how a dynamic list of jobs can be processed with AWS Step Functions. It’s worth pointing out that the approach outlined in this article does not make use of parallelism; it handles all jobs sequentially. For a full working version, see the sample project available on GitHub.

I’d love to hear your feedback on this approach, or about your own experience with AWS Step Functions. Please drop your comment below or connect with me on Twitter @christianklotz.
