Building scheduled workflows using native integrations and intrinsic functions
In my last post, I discussed my approach to building a serverless API using single-table design with DynamoDB and elements of test-driven development. By the end of that post, I had a basic API with functional CRUD capabilities.
In this post, I discuss how I used the serverless API as a building block to compose a scheduled workflow, using both EventBridge Scheduler and Step Functions. I cover how I set up the cron schedule, and then dive deep into how I set up the workflow using the new HTTP invoke integration, SDK integrations, and intrinsic functions.
The code for this project has been published to this repository.
Scheduling workflows using a cron scheduler
I scheduled the execution of the workflow using a cron schedule with EventBridge Scheduler. I set my schedule to run every weekday at 6 AM: cron(0 6 ? * MON-FRI *)
The schema of the input payload to the Step Functions workflow contains several custom elements, which are documented in the following sections. Those elements need to be injected as part of the input payload from EventBridge Scheduler. EventBridge Scheduler also provides context attributes, like the execution ID and scheduled time, that can be added to the Input payload definition.
With that definition, I end up with the following example input payload:
{
"id": "d7de3b85-3c5b-b567-2676-eba080e718d7",
"time": "2024-01-19T06:00:00Z",
"detail": {
"source_email": "reading@mail.example.com",
"reading_endpoint": "https://api.example.com/reading",
"user_endpoint": "https://api.example.com/user",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"send_type": "template"
}
}
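The Input definition itself isn't reproduced here. As a minimal sketch, assuming id and time are mapped to the <aws.scheduler.execution-id> and <aws.scheduler.scheduled-time> context attributes, the template string could be built as below. The render_input helper is hypothetical: it only mimics, locally, the placeholder substitution that EventBridge Scheduler performs when it invokes the target, so you can check the resulting shape without deploying anything.

```python
import json

# Input template for the Scheduler target. Assumption: "id" and "time" map to
# the execution-id and scheduled-time context attributes; the detail values
# are the example values from this post.
INPUT_TEMPLATE = json.dumps({
    "id": "<aws.scheduler.execution-id>",
    "time": "<aws.scheduler.scheduled-time>",
    "detail": {
        "source_email": "reading@mail.example.com",
        "reading_endpoint": "https://api.example.com/reading",
        "user_endpoint": "https://api.example.com/user",
        "plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
        "send_type": "template",
    },
})


def render_input(template: str, execution_id: str, scheduled_time: str) -> dict:
    """Hypothetical local stand-in for Scheduler's placeholder substitution."""
    rendered = (
        template.replace("<aws.scheduler.execution-id>", execution_id)
        .replace("<aws.scheduler.scheduled-time>", scheduled_time)
    )
    return json.loads(rendered)
```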
If you use EventBridge rules instead, you could do a similar transformation using the InputTransformer.
Using native integrations in workflows
Within the workflow, I used either the new HTTP invoke integration or SDK integrations, along with intrinsic functions for data transformation.
Using the HTTP invoke state to get the reading of the day
The first step is getting the reading for the day. I already had an API to get the details of a reading by its uid, but not by its date.
Adding API functionality into the GET method
I built that functionality into the GET method with a date query string parameter. Now, to get the reading for the day, I can use the following: curl -XGET https://api.example.com/reading?date=2024-01-19
As I built more and more functionality into the GET method with query string parameters, I created an enum that defined the set of actions for that GET method, for easier readability. I then wrote a get_action() method that parses the query string parameters to interpret the desired action and returns a value of the enum. I then used match and case to call the appropriate method in the domain class.
Integrating the API endpoint into a workflow state
With this API endpoint in place, I then set up a state in the workflow to call that endpoint with the appropriate parameters.
"GetTodaysReading": {
"Type": "Task",
"Resource": "arn:aws:states:::http:invoke",
"Parameters": {
"ApiEndpoint.$": "$.detail.reading_endpoint",
"Method": "GET",
"Authentication": {
"ConnectionArn": "${Connection}"
},
"Headers": {
"content-type": "application/json"
},
"QueryParameters": {
"date.$": "States.ArrayGetItem(States.StringSplit($.time, 'T'), 0)"
}
}
}
Extracting values from the event payload
The state is of type Task and uses the http:invoke resource, which is the Step Functions integration for calling third-party HTTPS APIs. The ApiEndpoint is the target endpoint, which I pass in via the event payload. To reiterate from the scheduling section above, an example event payload to invoke the workflow is shown below.
{
"id": "d7de3b85-3c5b-b567-2676-eba080e718d7",
"time": "2024-01-19T06:00:00Z",
"detail": {
"source_email": "reading@mail.example.com",
"reading_endpoint": "https://api.example.com/reading",
"user_endpoint": "https://api.example.com/user",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"send_type": "template"
}
}
The ApiEndpoint value is retrieved using JSONPath from $.detail.reading_endpoint, which is https://api.example.com/reading, and Method is set to GET. Translating the task to a curl command, so far I have the following:
curl -XGET \
https://api.example.com/reading
Configuring authentication for the API endpoint
The Authentication.ConnectionArn parameter references the Amazon Resource Name (ARN) of an EventBridge connection. Connections define the client authentication and authorization parameters for connecting to an API endpoint. I create the connection by setting AuthorizationType to API_KEY, using Authorization as the header key, and using a secret in Secrets Manager as the header value. I set the value of the secret via a parameter in the SAM template. Now I have the following:
curl -XGET \
-H "Authorization: {{secret}}" \
https://api.example.com/reading
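The SAM template for the connection isn't shown here. As a rough equivalent, assuming API key authorization with the key sent in the Authorization header, these are the arguments boto3's events.create_connection call accepts for that setup. The connection name and key value are placeholders, and connection_kwargs is a hypothetical helper that only builds the arguments, so it runs without AWS credentials.

```python
def connection_kwargs(name: str, api_key: str) -> dict:
    """Build keyword arguments for boto3's EventBridge create_connection call.

    Assumes API key authorization, with the key sent as the Authorization header.
    """
    return {
        "Name": name,
        "AuthorizationType": "API_KEY",
        "AuthParameters": {
            "ApiKeyAuthParameters": {
                "ApiKeyName": "Authorization",  # header key
                "ApiKeyValue": api_key,         # header value
            }
        },
    }


# Usage (requires AWS credentials; not executed here):
#   import boto3
#   events = boto3.client("events")
#   response = events.create_connection(**connection_kwargs("reading-api", secret_value))
#   connection_arn = response["ConnectionArn"]
```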
Setting additional headers for the API endpoint
The Headers parameter sets any additional headers that are required. I pass along the content-type: application/json header, which is important for API Gateway when it performs request validation against your API models. Now I have the following:
curl -XGET \
-H "content-type: application/json" \
-H "Authorization: {{secret}}" \
https://api.example.com/reading
Adding query string parameters
The QueryParameters parameter sets any query string parameters that are required. In this case, I set the date query string parameter, using intrinsic functions to extract the date from the event payload. The $.time value holds the ISO 8601 datetime string 2024-01-19T06:00:00Z. First, I use the States.StringSplit(string, delimiter) function to split the datetime string on the T character. Then I use the States.ArrayGetItem(array, index) function to get the first element of the resulting array, 2024-01-19. Finally, I have the following:
curl -XGET \
-H "content-type: application/json" \
-H "Authorization: {{secret}}" \
https://api.example.com/reading?date=2024-01-19
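To double-check the transformation and the resulting request, here is a minimal sketch that emulates the two intrinsics and builds the equivalent request with urllib, without sending it. The string_split and array_get_item helpers are local approximations written for this example, not Step Functions APIs; my understanding is that States.StringSplit treats each character of the delimiter argument as a separator, which the regular-expression character class mirrors, but edge cases may differ from the service.

```python
import re
from urllib.parse import urlencode
from urllib.request import Request


def string_split(value: str, delimiters: str) -> list[str]:
    """Approximate States.StringSplit: each character in delimiters is a separator."""
    return re.split(f"[{re.escape(delimiters)}]", value)


def array_get_item(array: list, index: int):
    """Approximate States.ArrayGetItem."""
    return array[index]


def build_reading_request(event: dict, secret: str) -> Request:
    # date.$ = States.ArrayGetItem(States.StringSplit($.time, 'T'), 0)
    date = array_get_item(string_split(event["time"], "T"), 0)
    url = f"{event['detail']['reading_endpoint']}?{urlencode({'date': date})}"
    # Same method and headers as the curl command above; nothing is sent.
    return Request(url, method="GET", headers={
        "content-type": "application/json",
        "Authorization": secret,
    })
```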
Putting the state definition and runtime parameters together
If I take the state and inject the values as they would be at runtime, then I have something like this (the .$ suffixes are dropped because the values are now literals rather than paths):
"GetTodaysReading": {
"Type": "Task",
"Resource": "arn:aws:states:::http:invoke",
"Parameters": {
"ApiEndpoint": "https://api.example.com/reading",
"Method": "GET",
"Authentication": {
"ConnectionArn": "arn:aws:events:us-east-1:546275881527:connection/Connection-OECcz2FSwCDV/ac31901d-0dec-40b6-a474-f5b1e69213d6"
},
"Headers": {
"content-type": "application/json"
},
"QueryParameters": {
"date": "2024-01-19"
}
}
}
Processing the response and passing data to the next state
When the HTTP invoke completes, the result of the task (elements removed for brevity) is as follows:
{
"Headers": { ... },
"ResponseBody": {
"category": "reading",
"uid": "135ac1a6-a394-4557-b5e9-c709eb508b06",
"description": "...",
"body": "...",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"sent_date": "2024-01-19T00:00:00.000000"
},
"StatusCode": 200,
"StatusText": "OK"
}
Now I need to set up the payload that is passed to the next state. I want to append the reading data to the input payload, as elements of the input payload are still useful as input for downstream states. To do this, I used the following configuration:
"ResultSelector": {
"response.$": "$.ResponseBody"
},
"ResultPath": "$.reading",
"OutputPath": "$"
The ResultSelector property lets me use JSONPath to select which data I want to keep from the task result. The ResultPath property lets me specify where to insert that selected data into the state input to form the output. The OutputPath property lets me specify what ultimately gets sent downstream. Using the properties above, I end up with the following payload that gets passed to the next state in the workflow:
{
"id": "d7de3b85-3c5b-b567-2676-eba080e718d7",
"time": "2024-01-19T06:00:00Z",
"detail": {
"source_email": "reading@mail.example.com",
"reading_endpoint": "https://api.example.com/reading",
"user_endpoint": "https://api.example.com/user",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"send_type": "template"
},
"reading": {
"response": {
"category": "reading",
"uid": "135ac1a6-a394-4557-b5e9-c709eb508b06",
"description": "...",
"body": "...",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"sent_date": "2024-01-19T00:00:00.000000"
}
}
}
The OutputPath says the whole payload is passed along. The ResultPath places the selected task result under reading. The ResultSelector places the response body under response. If I now want to get the body of the reading, I use the following JSONPath query: $.reading.response.body
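To make the three properties concrete, here is a minimal sketch that hard-codes this state's settings and reproduces the same transformation on plain dictionaries. It is not a general JSONPath engine, and apply_result_processing is a hypothetical helper written only for this illustration.

```python
import copy


def apply_result_processing(state_input: dict, task_result: dict) -> dict:
    """Emulate ResultSelector -> ResultPath -> OutputPath for GetTodaysReading.

    Hard-codes the settings from the state definition:
      ResultSelector: {"response.$": "$.ResponseBody"}
      ResultPath:     "$.reading"
      OutputPath:     "$"
    """
    selected = {"response": task_result["ResponseBody"]}  # ResultSelector drops Headers, StatusCode, ...
    output = copy.deepcopy(state_input)                   # ResultPath keeps the original input...
    output["reading"] = selected                          # ...and adds the selection under "reading"
    return output                                         # OutputPath "$" passes everything along
```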
Sending SNS notifications with SDK integrations
With the reading payload and an SNS topic created with subscriptions configured, I can send the reading to the topic using an SDK integration.
"SendReading": {
"Type": "Task",
"Resource": "arn:aws:states:::sns:publish",
"Parameters": {
"TopicArn": "${Topic}",
"Subject.$": "States.Format('Daily Reading for {}', States.ArrayGetItem(States.StringSplit($.time, 'T'), 0))",
"Message.$": "$.reading.response.body"
},
"ResultSelector": {
"message_id.$": "$.MessageId",
"request_id.$": "$.SdkResponseMetadata.RequestId"
},
"ResultPath": "$.publish_result",
"OutputPath": "$",
"End": true
}
Here, instead of using http:invoke, I use sns:publish. The parameters are configured based on the SNS Publish API parameters. I select the message ID and request ID and save them under the $.publish_result key.
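The Subject uses States.Format to interpolate the date. As a rough local approximation for string arguments, the parameters this state produces could be reproduced as below; states_format and sns_publish_params are hypothetical helpers, and any topic ARN you pass in is a placeholder.

```python
def states_format(template: str, *args: str) -> str:
    """Approximate States.Format for string arguments: fill each {} in order."""
    parts = template.split("{}")
    if len(parts) - 1 != len(args):
        raise ValueError("placeholder count does not match argument count")
    out = parts[0]
    for arg, part in zip(args, parts[1:]):
        out += arg + part
    return out


def sns_publish_params(payload: dict, topic_arn: str) -> dict:
    # Subject.$ = States.Format('Daily Reading for {}', <date part of $.time>)
    date = payload["time"].split("T")[0]
    return {
        "TopicArn": topic_arn,
        "Subject": states_format("Daily Reading for {}", date),
        "Message": payload["reading"]["response"]["body"],  # Message.$
    }
```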
However, I found that I wanted to customize the look of the email message, so I switched to Amazon Simple Email Service (SES), which supports HTML email templates.
Using the HTTP invoke state to get the users for email distribution
When using SNS, I just publish to a topic without any awareness of the downstream subscribers; SNS fans the message out automatically. With SES, I could also create a distribution list and send to a single email address. Instead, I chose to email users individually so that I could see per-user delivery statistics and so that users could easily subscribe or unsubscribe.
Before I can start sending out emails, I need to get a list of users (and their email addresses) for that email distribution. The workflow now looks something like this:
In the next state, I get the users by plan, which is equivalent to making the following request: curl -XGET https://api.example.com/user?plan_id=779b3465-c39b-4aa1-9225-9ed435888be5
Building the API endpoint, setting up the state to invoke it, and processing the response are the same as I covered for getting the reading of the day in the prior section. One key difference in this workflow is that the SES SendEmail API sends to the recipients specified in each request, rather than publishing to an SNS topic that may have many subscribed email addresses. So I need to add a for loop to my workflow.
Using the Map state to loop through array elements
Enter the Map state! Given the scale of this distribution, inline mode for the Map state is sufficient. The Map state takes an array as input and processes each item of the array individually.
Thus, I need to get an array of email addresses that I can use with the SES SendTemplatedEmail API.
Constructing an array from sub-elements in a list of objects
When the HTTP invoke for getting the users by plan completes, the result of the task (elements removed for brevity) is as follows:
{
"Headers": { ... },
"ResponseBody": [
{
"category": "user",
"uid": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"description": "User A",
"email": "user.a@example.com",
"is_subscribed": true,
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"plan_ids": [
"779b3465-c39b-4aa1-9225-9ed435888be5"
]
},
{
"category": "user",
"uid": "0e868c2e-d90c-4d3a-a7c9-29308d8f3d31",
"description": "User B",
"email": "user.b@example.com",
"is_subscribed": true,
"group_ids": [
"84db3228-0174-45bf-8b67-2e9c62b5ecf7"
],
"plan_ids": [
"779b3465-c39b-4aa1-9225-9ed435888be5"
]
},
{
"category": "user",
"uid": "4a880705-d8a0-46b6-ada3-a211d5f344c2",
"description": "User C",
"email": "user.c@example.com",
"is_subscribed": true,
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"plan_ids": [
"779b3465-c39b-4aa1-9225-9ed435888be5"
]
}
],
"StatusCode": 200,
"StatusText": "OK"
}
To get the output that I need, I used the following configuration:
"ResultSelector": {
"response.$": "$.ResponseBody",
"emails.$": "$.ResponseBody[*].email"
},
"ResultPath": "$.users",
"OutputPath": "$"
With the reading of the day, I kept the entire response body. With the users, I construct an array of just the email addresses but also keep the entire response body, as I need more information about each user than just the email address.
For emails, I extract the email element from each object in the array to create an array of just those email addresses, using the following JSONPath: $.ResponseBody[*].email. The [*] wildcard selects every element of the array. I make extensive use of jq, and applied to the response body this is the equivalent of: | jq '[.[].email]'
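As a small check of what the wildcard selects, this sketch reproduces the users ResultSelector on a plain dictionary. select_users is a hypothetical helper, not part of the project, and it uses a list comprehension in place of JSONPath.

```python
def select_users(task_result: dict) -> dict:
    """Emulate the users ResultSelector.

    "response.$": "$.ResponseBody"         -> the full list of user objects
    "emails.$":   "$.ResponseBody[*].email" -> just the email of each object
    """
    body = task_result["ResponseBody"]
    return {
        "response": body,
        # Objects without an email key are skipped rather than raising KeyError.
        "emails": [user["email"] for user in body if "email" in user],
    }
```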
I put the result of the task at $.users, so the payload now looks as follows:
{
"id": "d7de3b85-3c5b-b567-2676-eba080e718d7",
"time": "2024-01-19T06:00:00Z",
"detail": {
"source_email": "reading@mail.example.com",
"reading_endpoint": "https://api.example.com/reading",
"user_endpoint": "https://api.example.com/user",
"plan_id": "779b3465-c39b-4aa1-9225-9ed435888be5",
"send_type": "template"
},
"reading": {
"response": { ... }
},
"users": {
"emails": [
"user.a@example.com",
"user.b@example.com",
"user.c@example.com"
],
"response": [
{
"category": "user",
"uid": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"description": "User A",
"email": "user.a@example.com",
"is_subscribed": true,
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"plan_ids": [
"779b3465-c39b-4aa1-9225-9ed435888be5"
]
},
...
]
}
}
Configuring the Map state to process items
The Map state has two main properties for configuring the input to each Map iteration. The ItemsPath property identifies where in the input payload the array is located. The ItemSelector property defines the schema of the input that is passed to each iteration.
I use the following configuration, which is quite involved, but I cover each element in detail in the following section.
"ItemsPath": "$.users.response",
"ItemSelector": {
"source_email.$": "$.detail.source_email",
"data": {
"user_id.$": "$$.Map.Item.Value.uid",
"group_ids.$": "$$.Map.Item.Value.group_ids",
"email.$": "States.Array($$.Map.Item.Value.email)"
},
"template": {
"date.$": "States.ArrayGetItem(States.StringSplit($.time, 'T'), 0)",
"reading.$": "$.reading.response.body",
"complete.$": "States.Format('{}?uid={}&user_id={}', $.detail.reading_endpoint, $.reading.response.uid, $$.Map.Item.Value.uid)",
"unsubscribe.$": "States.Format('{}?unsubscribe={}', $.detail.user_endpoint, $$.Map.Item.Value.uid)"
}
}
Setting up the iterator elements
The ItemsPath property points to $.users.response, which is an array of objects that contain the user information: category, uid, description, email, is_subscribed, group_ids, and plan_ids. The ItemSelector then creates three elements: source_email, data, and template.
The source_email element is the same for every loop iteration and specifies the “from” email address, taken from $.detail.source_email.
The data element is also present in every loop iteration, but its contents are specific to each Map iteration. I access that information using $$.Map.Item.Value, which is the object with the user information. I then add the key for the particular user information that I need, e.g., $$.Map.Item.Value.uid or $$.Map.Item.Value.email.
The template element is used as the TemplateData parameter for the ses:sendTemplatedEmail action. In the email template, I defined four data elements: date (date of the reading), reading (text of the reading), complete (link to the API endpoint that marks a reading complete), and unsubscribe (link to the API endpoint if a user wants to unsubscribe).
For the date, I use intrinsics to split the string and select the first element of that array. For the reading, I get the body of the reading from $.reading.response.body (not depicted here but shown in an earlier example). For complete and unsubscribe, I construct the API endpoint and query string parameters from the endpoints and the user uid using States.Format.
At the end of all this, I have an item for each loop iteration that looks as follows:
{
"data": {
"user_id": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"email": [
"user.a@example.com"
]
},
"template": {
"date": "2024-01-19",
"unsubscribe": "https://api.example.com/user?unsubscribe=0b22a47f-0123-424f-aa00-f3b7348a8140",
"reading": "...",
"complete": "https://api.example.com/reading?uid=135ac1a6-a394-4557-b5e9-c709eb508b06&user_id=0b22a47f-0123-424f-aa00-f3b7348a8140"
},
"source_email": "reading@mail.example.com"
}
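To check the item shape above without running the workflow, here is a minimal sketch that reproduces the ItemsPath and ItemSelector logic in plain Python. build_item and build_items are hypothetical helpers; state_input plays the role of $ (the Map state's input) and user plays the role of $$.Map.Item.Value. The URLs assume the endpoints from the example payload.

```python
def build_item(state_input: dict, user: dict) -> dict:
    """Emulate the Map state's ItemSelector for one element of $.users.response."""
    detail = state_input["detail"]
    reading = state_input["reading"]["response"]
    return {
        "source_email": detail["source_email"],
        "data": {
            "user_id": user["uid"],
            "group_ids": user["group_ids"],
            "email": [user["email"]],  # States.Array(...) wraps the single address
        },
        "template": {
            "date": state_input["time"].split("T")[0],
            "reading": reading["body"],
            # States.Format('{}?uid={}&user_id={}', ...)
            "complete": f"{detail['reading_endpoint']}?uid={reading['uid']}&user_id={user['uid']}",
            # States.Format('{}?unsubscribe={}', ...)
            "unsubscribe": f"{detail['user_endpoint']}?unsubscribe={user['uid']}",
        },
    }


def build_items(state_input: dict) -> list[dict]:
    # ItemsPath "$.users.response": one item per user object.
    return [build_item(state_input, user) for user in state_input["users"]["response"]]
```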
Sending emails with SDK integrations
With all the required data defined in the ItemSelector, I can now configure the parameters for sending a templated email with SES.
"Parameters": {
"Source.$": "$.source_email",
"Destination": {
"ToAddresses.$": "$.data.email"
},
"Template": "${EmailTemplate}",
"TemplateData.$": "States.JsonToString($.template)"
}
The ToAddresses parameter expects an array of email addresses. However, I am only sending to a single email address. To satisfy the API, I wrapped the single email address in an array in the ItemSelector using an intrinsic function: States.Array($$.Map.Item.Value.email).
The template name is injected via DefinitionSubstitutions in the SAM template.
The template data is the $.template element that I constructed with the ItemSelector and now convert to a JSON string using the States.JsonToString intrinsic function.
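As a minimal sketch of the same request outside Step Functions, assuming boto3's SES client (its send_templated_email call takes the same Source, Destination, Template, and TemplateData fields), the parameters can be built from one Map item as follows. send_templated_email_params is a hypothetical helper, and json.dumps stands in for States.JsonToString; the exact whitespace of the serialized string may differ from what Step Functions produces.

```python
import json


def send_templated_email_params(item: dict, template_name: str) -> dict:
    """Build SES SendTemplatedEmail parameters from one Map item.

    TemplateData must be a JSON string, not an object, hence json.dumps.
    """
    return {
        "Source": item["source_email"],
        "Destination": {"ToAddresses": item["data"]["email"]},  # already an array
        "Template": template_name,
        "TemplateData": json.dumps(item["template"]),
    }


# Usage (requires AWS credentials and a verified sender; not executed here):
#   import boto3
#   boto3.client("ses").send_templated_email(**send_templated_email_params(item, "DailyReading"))
```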
Cleaning out iterator outputs
At the end of a loop iteration, after an email has been sent, I have the following iteration output:
{
"user_id": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"group_ids": [
"8fe42e6c-ea78-4c61-b922-2e5776a87b6b"
],
"email": [
"user.a@example.com"
],
"publish_result": {
"message_id": "0100018d5a07831f-0173840f-ab44-49ac-8126-58f11d2ed2ca-000000"
}
}
Using a Pass state with Parameters, I collapse the outputs into a flat JSON object.
"Parameters": {
"user_id.$": "$.user_id",
"group_id.$": "$.group_ids[0]",
"email.$": "$.email[0]",
"message_id.$": "$.publish_result.message_id"
}
This gives a final output that looks like this:
{
"user_id": "0b22a47f-0123-424f-aa00-f3b7348a8140",
"group_id": "8fe42e6c-ea78-4c61-b922-2e5776a87b6b",
"email": "user.a@example.com",
"message_id": "0100018d5a07831f-0173840f-ab44-49ac-8126-58f11d2ed2ca-000000"
}
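The Pass state's Parameters can be mirrored in a few lines of Python to check the flattened shape; flatten_iteration_output is a hypothetical helper written for this illustration.

```python
def flatten_iteration_output(result: dict) -> dict:
    """Emulate the Pass state's Parameters: keep scalars, take first list elements."""
    return {
        "user_id": result["user_id"],                           # "$.user_id"
        "group_id": result["group_ids"][0],                     # "$.group_ids[0]"
        "email": result["email"][0],                            # "$.email[0]"
        "message_id": result["publish_result"]["message_id"],  # "$.publish_result.message_id"
    }
```

Taking group_ids[0] keeps only a user's first group ID. That matches the sample data, where each user belongs to one group, but a user in several groups would only be counted under the first.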
Setting sent count statistics for the daily reading
After the Map state completes, I end up with an array of the final outputs, each with the user_id, group_id, email, and message_id. I send that array to an API endpoint that updates the sent_count attribute of the reading with a list of each user that was sent the email and a count of how many emails were sent to each group ID. I use this data for stats and achievement tracking later.
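The request body for that endpoint isn't shown in the post. Assuming a body that carries the list of user IDs and a per-group count, a minimal sketch of the aggregation could look like this; sent_stats and its field names are hypothetical.

```python
from collections import Counter


def sent_stats(map_output: list[dict]) -> dict:
    """Summarize the Map results: who was sent the email and how many per group."""
    return {
        "user_ids": [row["user_id"] for row in map_output],
        "group_counts": dict(Counter(row["group_id"] for row in map_output)),
    }
```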
Wrapping up the workflow
If all went well with the workflow, I get the following visualization of the workflow execution.
When I was building this workflow, I started with SNS notifications. I then switched to sending emails and refined that with sending templated emails. Now that I’ve landed on using templated emails, I could simplify the workflow and remove MapEmail and SendNotification as options in the workflow. As a result, I could also remove SendChoice. For now, I left them in to show how to set up those integrations.
Conclusion
I walked through the process of building a scheduled workflow, using public API endpoints for business logic and intrinsic functions for data transformation. I demonstrated how to use SDK integrations by constructing the appropriate input parameters. I gave concrete payload examples to help illustrate exactly how input parameters are passed in and how output results are passed along.
I hope this was of practical help as you build your own scheduled workflows using EventBridge Scheduler and Step Functions.
Stay tuned for the next post, where I plan to write about observability, troubleshooting production issues, and optimizing performance for better end user experience.
Resources
- https://docs.aws.amazon.com/scheduler/latest/UserGuide/managing-schedule-context-attributes.html
- https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-events-rule-inputtransformer.html
- https://docs.aws.amazon.com/ses/latest/APIReference/API_SendTemplatedEmail.html
- https://stackoverflow.com/questions/37528301/email-address-is-not-verified-aws-ses
- https://aws.amazon.com/blogs/compute/using-jsonpath-effectively-in-aws-step-functions/