Building scheduled workflows using native integrations and intrinsic functions

Heeki Park
13 min readJan 31, 2024

In my last post, I discussed my approach to building a serverless API using single-table design with DynamoDB and elements of test-driven development. By the end of that post, I had a basic API with functional CRUD capabilities.

In this post, I discuss how I leveraged the serverless API as a building block to compose a scheduled workflow, using both EventBridge Scheduler and Step Functions workflows. I cover how I setup the cron schedule, and then dive deep on how I setup the workflow using the new HTTP invoke integration, SDK integrations, and intrinsic functions.

Components for the scheduled workflow on AWS

The code for this project has been published to this repository.

Scheduling workflows using a cron scheduler

I scheduled the execution of the workflow using a cron schedule with EventBridge Scheduler. I set my schedule to send every weekday at 6am: cron(0 6 ? * MON-FRI *)

The schema of the input payload to the Step Functions workflow contains a lot of custom elements, which are documented in following sections. Those elements need to be injected as part of the input payload from EventBridge. EventBridge Scheduler provides context attributes, like the execution id and execution time, that can be added to the Input payload definition.

With that definition, I end up with the following example input payload:

{
"id": "d7de3b85-3c5b-b567-2676-eba080e718d7",
"time": "2024-01-19T06:00:00Z",
"detail": {
"source_email": "reading@mail.example.com",
"reading_endpoint": "https://api.example.com/reading",
"user_endpoint": "https://api.example.com/user",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"send_type": "template"
}
}

If you instead use EventBridge rules, you could do similar transformation using the InputTransformer.

Using native integrations in workflows

Within the workflow, I used either the new HTTP invoke integration or SDK integrations, along with intrinsic functions for data transformation.

Initial workflow with HTTP and SDK integrations

Using the HTTP invoke state to get the reading of the day

The first step entails getting the reading for the day. I had an API to get the details of a reading by the uid, but not by the date.

Adding API functionality into the GET method
I built that functionality into the GET method with a date query string parameter. Now to get the reading for the day, I can use the following: curl -XGET https://api.example.com/reading?date=2024-01–19

As I built more and more functionality into the GET method with query string parameters, for easier readability, I created an enum that defined the set of actions for that GET method. I then wrote a get_action() method that parses the query string parameters to interpret the desired action and returns a value of the enum. I then used match and case to call the appropriate method in the domain class.

Integrating the API endpoint into a workflow state
With this API endpoint, I then setup a state in the workflow to call that endpoint with the appropriate parameters.

  "GetTodaysReading": {
"Type": "Task",
"Resource": "arn:aws:states:::http:invoke",
"Parameters": {
"ApiEndpoint.$": "$.detail.reading_endpoint",
"Method": "GET",
"Authentication": {
"ConnectionArn": "${Connection}"
},
"Headers": {
"content-type": "application/json"
},
"QueryParameters": {
"date.$": "States.ArrayGetItem(States.StringSplit($.time, 'T'), 0)"
}
}

Extracting values from the event payload
The state is of type Task and uses the http:invoke resource, which is the state for calling third-party endpoints. The ApiEndpoint is the target endpoint, which I pass in via the event payload. To re-iterate from the scheduling section above, an example event payload to invoke the workflow is shown below.

{
"id": "d7de3b85-3c5b-b567-2676-eba080e718d7",
"time": "2024-01-19T06:00:00Z",
"detail": {
"source_email": "reading@mail.example.com",
"reading_endpoint": "https://api.example.com/reading",
"user_endpoint": "https://api.example.com/user",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"send_type": "template"
}
}

The ApiEndpoint value is retrieved using JSONPath from $.detail.reading_endpoint, which is https://api.example.com/reading, and Method is set to GET. Translating the task to a curl command, so far I have the following:

curl -XGET \
https://api.example.com/reading

Configuring authentication for the API endpoint
The Authentication.ConnectionArn parameter references the Amazon Resource Name (ARN) of an EventBridge connection. API connections are used to define the client authentication and authorization parameters for connecting to an API endpoint. I create the connection by setting AuthorizationType as API_KEY, setting Authorization as the header key, and using a secret in Secrets Manager as the header value. I set the value of the secret via configuration parameter to the SAM template. Now I have the following:

curl -XGET \
-H "Authorization: {{secret}}" \
https://api.example.com/reading

Setting additional headers for the API endpoint
The Headers parameter sets any additional headers that are required. I pass along the content-type: application/json header, which is important for API Gateway when performing request validation against your API models. Now I have the following:

curl -XGET \
-H "content-type: application/json" \
-H "Authorization: {{secret}}" \
https://api.example.com/reading

Adding query string parameters
The QueryParameters parameter sets any query string parameters that are required. In this case, I set the date query string parameter. To do that, I use intrinsic functions to extract the date from the event payload. The $.time value has the ISO formatted datetime string: 2024-01-19T06:00:00Z. First I use the States.StringSplit(string, delimiter) function to split the datetime string by the T value. Then I use the States.ArrayGetItem(array, index) function to get the first element from the resultant split string array. And finally, I have the following:

curl -XGET \
-H "content-type: application/json" \
-H "Authorization: {{secret}}" \
https://api.example.com/reading?date=2024-01-19

Putting the state definition and runtime parameters together
If I take the state and inject the values as they would be at runtime, then I have something like this:

  "GetTodaysReading": {
"Type": "Task",
"Resource": "arn:aws:states:::http:invoke",
"Parameters": {
"ApiEndpoint.$": "https://api.example.com/reading",
"Method": "GET",
"Authentication": {
"ConnectionArn": "arn:aws:events:us-east-1:546275881527:connection/Connection-OECcz2FSwCDV/ac31901d-0dec-40b6-a474-f5b1e69213d6"
},
"Headers": {
"content-type": "application/json"
},
"QueryParameters": {
"date.$": "2024-01-19"
}
}

Processing the response and passing data to the next state
When the HTTP invoke completes, the result of the task (elements removed for brevity) is as follows:

{
"Headers": { ... },
"ResponseBody": {
"category": "reading",
"uid": "135ac1a6-a394-4557-b5e9-c709eb508b06",
"description": "...",
"body": "...",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"sent_date": "2024-01-19T00:00:00.000000"
},
"StatusCode": 200,
"StatusText": "OK"
}

Now I need to setup the payload that will be passed to the next state. I want to append the reading data to the input payload, as elements of the input payload are still useful as input for downstream states. To do this, I used the following configuration:

  "ResultSelector": {
"response.$": "$.ResponseBody"
},
"ResultPath": "$.reading",
"OutputPath": "$"

The ResultSelector property allows me to use JSONPath to select what data I want to keep from the task output. The ResultPath property allows me to specify where to put that selected data in the output. The OutputPath property allows me to specify what ultimately gets sent downstream. Using the properties above, I end up with the following payload that gets passed to the next state in the workflow:

{
"id": "d7de3b85-3c5b-b567-2676-eba080e718d7",
"time": "2024-01-19T06:00:00Z",
"detail": {
"source_email": "reading@mail.example.com",
"reading_endpoint": "https://api.example.com/reading",
"user_endpoint": "https://api.example.com/user",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"send_type": "template"
},
"reading": {
"response": {
"category": "reading",
"uid": "135ac1a6-a394-4557-b5e9-c709eb508b06",
"description": "...",
"body": "...",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"sent_date": "2024-01-19T00:00:00.000000"
}
}
}

The OutputPath says the whole payload will be passed along. The ResultPath places the task output under reading. The ResultSelector places the response body under response. If I now want to get the body of the reading, I use the following JSONPath query: $.reading.response.body

This is the payload that gets passed to the next state.

Sending SNS notifications with SDK integrations

With the reading payload and an SNS topic created with subscriptions configured, I can send the reading to the topic using an SDK integration.

  "SendReading": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "${Topic}",
"Subject.$": "States.Format('Daily Reading for {}', States.ArrayGetItem(States.StringSplit($.time, 'T'), 0))",
"Message.$": "$.reading.response.body"
},
"ResultSelector": {
"message_id.$": "$.MessageId",
"request_id.$": "$.SdkResponseMetadata.RequestId"
},
"ResultPath": "$.publish_result",
"OutputPath": "$",
"End": true
}

Here instead of using http:invoke, I use sns:publish. The parameters are configured based on the SNS Publish API parameters. I select the message id and request id and save that to the $.publish_result key.

However, I found that I wanted to customize the look of the email message and switched to using Simple Email Service (SES), as SES has the ability to create email templates using HTML.

Using the HTTP invoke state to get the users for email distribution

When using SNS, I just publish to an SNS topic without any awareness of the downstream subscribers. In other words, it automatically fans out. When using SES, I could also create a distribution list and send to a single email address. Instead I chose to email users individually so that I could see individual delivery statistics and so that they could easily subscribe or unsubscribe.

Before I can start sending out emails, I need to get a list of users (and their email addresses) for that email distribution. The workflow now looks something like this:

Draft workflow when switching to emailing individual users

In the next state, I get the users by the plan, which is the equivalent of making the following request: curl -XGET https://api.example.com/user?plan_id=779b3465-c39b-4aa1-9225-9ed435888be5

Building the API endpoint, setting up the state to invoke the API endpoint, and processing the response is the same as I covered for getting the reading of the day in the prior section. One key difference in this workflow is that the SES SendEmail API is per email rather than to an SNS topic that may have many subscribed email addresses. So I need to add a for loop in my workflow.

Using the Map state to loop through array elements

Enter the Map state! Given the scale of this distribution, inline mode for the Map state is sufficient. The Map state takes an array as input and processes each item of the array individually.

Updated workflow for emailing individual users

Thus I need to get an array of email addresses that I can use in conjunction with the SES SendTemplatedEmail SDK.

Constructing an array from sub-elements in a list of objects
When the HTTP invoke for getting the users by plan completes, the result of the task (elements removed for brevity) is as follows:

{
"Headers": { ... },
"ResponseBody": [
{
"category": "user",
"uid": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"description": "User A",
"email": "user.a@example.com",
"is_subscribed": true,
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"plan_ids": [
"779b3465-c39b-4aa1-9225-9ed435888be5"
]
},
{
"category": "user",
"uid": "0e868c2e-d90c-4d3a-a7c9-29308d8f3d31",
"description": "User B",
"email": "user.b@example.com",
"is_subscribed": true,
"group_ids": [
"84db3228-0174-45bf-8b67-2e9c62b5ecf7"
],
"plan_ids": [
"779b3465-c39b-4aa1-9225-9ed435888be5"
]
},
{
"category": "user",
"uid": "4a880705-d8a0-46b6-ada3-a211d5f344c2",
"description": "User C",
"email": "user.c@example.com",
"is_subscribed": true,
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"plan_ids": [
"779b3465-c39b-4aa1-9225-9ed435888be5"
]
}
],
"StatusCode": 200,
"StatusText": "OK"
}

To get the output that I need, I used the following configuration:

  "ResultSelector": {
"response.$": "$.ResponseBody",
"emails.$": "$.ResponseBody[*].email"
},
"ResultPath": "$.users",
"OutputPath": "$"

With the reading of the day, I kept the entire response body. With the users, I construct an array of just email addresses but also add the entire response body, as I needed more information about the user than just the email address.

Regardless, for emails, I can extract the email element from each object in the array and create an array of just those email addresses. That’s using the following JSONPath: $.ResponseBody[*].email. The [*] syntax allows me to iterate through the array. I personally make extensive use of jq and this is the equivalent of doing the following: | jq '.[].email'

I put the result of the task at $.users so the payload now looks as follows:

{
"id": "d7de3b85-3c5b-b567-2676-eba080e718d7",
"time": "2024-01-19T06:00:00Z",
"detail": {
"source_email": "reading@mail.example.com",
"reading_endpoint": "https://api.example.com/reading",
"user_endpoint": "https://api.example.com/user",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"send_type": "template"
},
"reading": {
"response": { ... }
},
"users": {
"emails": [
"user.a@example.com",
"user.b@example.com",
"user.c@example.com"
],
"response": [
{
"category": "user",
"uid": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"description": "User A",
"email": "user.a@example.com",
"is_subscribed": true,
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"plan_ids": [
"779b3465-c39b-4aa1-9225-9ed435888be5"
]
},
...
]
}

Configuring the Map state to process items
The Map state has two main properties for configuring the input to each Map iteration. The ItemsPath property identifies where in the input payload the array is located. The ItemSelector property defines the schema that is passed to each task.

I use the following configuration, which is quite involved, but I cover each element in detail in the following section.

  "ItemsPath": "$.users.response",
"ItemSelector": {
"source_email.$": "$.detail.source_email",
"data": {
"user_id.$": "$$.Map.Item.Value.uid",
"group_ids.$": "$$.Map.Item.Value.group_ids",
"email.$": "States.Array($$.Map.Item.Value.email)"
},
"template": {
"date.$": "States.ArrayGetItem(States.StringSplit($.time, 'T'), 0)",
"reading.$": "$.reading.response.body",
"complete.$": "States.Format('{}?uid={}&user_id={}', $.detail.reading_endpoint, $.reading.response.uid, $$.Map.Item.Value.uid)",
"unsubscribe.$": "States.Format('{}?unsubscribe={}', $.detail.user_endpoint, $$.Map.Item.Value.uid)"
}
}

Setting up the iterator elements
The ItemsPath property points to $.users.response which is an array of objects that contain the user information: category, uid, description, email, group_ids, plan_ids. The ItemSelector then creates three elements: source_email, data, and template.

The source_email element is used for every loop iteration and statically specifies the “from” email address.

source_email element

The data element is also used for every loop iteration but contains information for each Map iteration. I access that information using $$.Map.Item.Value which is the object with the user information. I then add the key for the particular user information that I need, e.g. $$.Map.Item.Value.uid or $$.Map.Item.Value.email

data element

The template element is used as the TemplateData parameter for the ses:sendTemplatedEmail action. In the email template, I defined four data elements: date (date of the reading), reading (text of the reading), complete (link to the API endpoint that marks a reading complete), and unsubscribe (link to the API endpoint if a user wants to unsubscribe).

template element

For the date, I use intrinsics to split the string and select the first element of that array. For the reading, I get the body of the reading from $.reading.response.body (not depicted here but shown in an earlier example). For complete and unsubscribe, I construct the API endpoint and query string parameters using the endpoints and user uid.

At the end of all this, I have an item for loop iteration that looks as follows:

{
"data": {
"user_id": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"email": [
"user.a@example.com"
]
},
"template": {
"date": "2024-01-19",
"unsubscribe": "https://api.example.com/reading/user?unsubscribe=0b22a47f-0123-424f-aa00-f3b7348a8140",
"reading": "...",
"complete": "https://api.example.com/reading/reading?uid=135ac1a6-a394-4557-b5e9-c709eb508b06&user_id=0b22a47f-0123-424f-aa00-f3b7348a8140"
},
"source_email": "reading@mail.example.com"
}

Sending emails with SDK integrations

With all the required data defined in the InputSelector, I can now configure the parameters for sending a templated email with SES.

  "Parameters": {
"Source.$": "$.source_email",
"Destination": {
"ToAddresses.$": "$.data.email"
},
"Template": "${EmailTemplate}",
"TemplateData.$": "States.JsonToString($.template)"
}

The destination addresses parameters expects an array of email addresses. However, I am only sending to a single email address. To comply with the requirements of the API, I wrapped the single email address in an array using an intrinsic function: States.Array($$.Map.Item.Value.email).

The template is an ARN that is injected via DefinitionSubstitutions in the SAM template.

The template data is the $.template element that I constructed with the ItemSelector and now convert to a JSON string using an intrinsic function.

Cleaning out iterator outputs
At the end of a loop iteration, after an email has been sent, I have the following iteration output:

{
"user_id": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"email": [
"user.a@example.com"
],
"publish_result": {
"message_id": "0100018d5a07831f-0173840f-ab44-49ac-8126-58f11d2ed2ca-000000"
}
}

Using a Pass state with Parameters, I collapse the outputs to a flat JSON object.

  "Parameters": {
"user_id.$": "$.user_id",
"group_id.$": "$.group_ids[0]",
"email.$": "$.email[0]",
"message_id.$": "$.publish_result.message_id"
}

And get a final output that looks like this:

{
"user_id": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"group_id": "8fe42e6c-ea78-4c61-b922-2e5776a87b6b",
"email": "user.a@example.com",
"message_id": "0100018d5a07831f-0173840f-ab44-49ac-8126-58f11d2ed2ca-000000"
}

Setting sent count statistics for the daily reading
After the Map state completes, I end up with an array of the final output with the user_id, group_id, email, and message_id. I take that array of outputs, and send that to an API endpoint to update the sent_count attribute of the reading with a list of each user that was sent the email and a count of how many was sent to each group id. I use this data for stats and achievement tracking later.

Wrapping up the workflow
If all went well with the workflow, I get the following visualization of the workflow execution.

Successful workflow execution

When I was building this workflow, I started with SNS notifications. I then switched to sending emails and refined that with sending templated emails. Now that I’ve landed on using templated emails, I could simplify the workflow and remove MapEmail and SendNotification as options in the workflow. As a result of removing those, I could also remove SendChoice. For now, I left them in to show how to set up those integrations.

Conclusion

I walked through the process of building a scheduled workflow, using public API endpoints for business logic and intrinsic functions for data transformation. I demonstrated how to use SDK integrations by constructing the appropriate input parameters. I gave concrete payload examples to help illustrate exactly how input parameters are passed in and how output results are passed along.

I hope this was of practical help as you build your own scheduled workflows using EventBridge Scheduler and Step Functions.

Stay tuned for the next post, where I plan to write about observability, troubleshooting production issues, and optimizing performance for better end user experience.

Resources

--

--

Heeki Park

Principal Solutions Architect @ AWS. Opinions are my own.