We will discuss how to re-execute failed AWS Step Functions executions in bulk. For a small number of failures, re-execution can be done manually via the AWS Console. However, when dealing with a large volume (hundreds or thousands of failures), we need an automated, programmatic approach to handle the re-execution efficiently.
Challenge
The AWS Console does not offer a way to retrieve Execution ARNs of failed Step Function executions in bulk. Instead, we have to manually inspect each execution to obtain its ARN. To efficiently handle a large number of failures, we need to leverage the AWS API to retrieve this information programmatically. I'll now walk you through, step by step, how to address this challenge using Java.
Steps
1. Let's start by retrieving the Execution ARNs of failed executions. The AWS API provides several execution statuses, including ABORTED, FAILED, PENDING_REDRIVE, RUNNING, SUCCEEDED, and TIMED_OUT. We can use the relevant failure-related status to filter and identify failed executions.
- STEP_MACHINE_ARN_RECON is the ARN of your state machine
- First, the method pages through all executions of that state machine, filtering on ExecutionStatus.FAILED
- For each match, it adds the executionArn and startDate to a list
- The list is then written to a CSV file and uploaded to S3
private void detectFailures() throws InterruptedException {
    List<String[]> failures = new ArrayList<>();
    String nextToken = null;
    do {
        ListExecutionsRequest listExecutionsRequest = new ListExecutionsRequest();
        listExecutionsRequest.setStateMachineArn(STEP_MACHINE_ARN_RECON);
        listExecutionsRequest.setStatusFilter(ExecutionStatus.FAILED);
        if (nextToken != null) {
            listExecutionsRequest.setNextToken(nextToken);
        }
        ListExecutionsResult listExecutionsResult = stepClient.listExecutions(listExecutionsRequest);
        for (ExecutionListItem executionListItem : listExecutionsResult.getExecutions()) {
            failures.add(new String[] {
                    executionListItem.getExecutionArn(),
                    executionListItem.getStartDate().toString() });
        }
        logger.info("failures.size(): " + failures.size());
        nextToken = listExecutionsResult.getNextToken();
        Thread.sleep(1000); // pause between pages to avoid API throttling
    } while (nextToken != null);
    File f = createFile(failures);
    writeToS3(f);
}
private File createFile(List<String[]> list) {
    File outputFile = new File(ROOT_DIR + "failures.csv");
    try (CSVWriter writer = new CSVWriter(new FileWriter(outputFile), CSVWriter.DEFAULT_SEPARATOR,
            CSVWriter.NO_QUOTE_CHARACTER, CSVWriter.DEFAULT_ESCAPE_CHARACTER, CSVWriter.DEFAULT_LINE_END)) {
        // FileWriter creates the file if it does not exist yet
        writer.writeAll(list);
        writer.flush();
        logger.info("Path: " + outputFile.toPath().toAbsolutePath());
    } catch (Exception e) {
        logger.error("Exception while writing CSV: " + e.getMessage(), e);
    }
    return outputFile;
}
private String writeToS3(File file) {
    try {
        PutObjectRequest request = new PutObjectRequest(bucketName, key, file);
        PutObjectResult result = s3Client.putObject(request);
        logger.info("uploaded file to S3: " + result.getETag());
        return result.getETag();
    } catch (Exception e) {
        logger.error("Exception in writeToS3(): Amazon S3 couldn't process the upload", e);
        throw e;
    }
}
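The snippets above assume that stepClient, s3Client, logger, STEP_MACHINE_ARN_RECON, ROOT_DIR, bucketName, and key are fields that have already been initialized. A minimal sketch of that wiring follows; the class name FailureReplay, the region, and every ARN, path, and bucket value are placeholders, not part of the original code.

```java
// Illustrative wiring only; all values below are placeholders.
private static final Logger logger = LoggerFactory.getLogger(FailureReplay.class);

private static final String STEP_MACHINE_ARN_RECON =
        "arn:aws:states:us-east-1:123456789012:stateMachine:MyStateMachine";
private static final String ROOT_DIR = "/tmp/";
private final String bucketName = "my-bucket";
private final String key = "failures.csv";
private final String sleep = "1000"; // delay in ms between retries

// Both clients resolve credentials from the default provider chain.
private final AWSStepFunctions stepClient = AWSStepFunctionsClientBuilder.standard()
        .withRegion(Regions.US_EAST_1)
        .build();
private final AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
        .withRegion(Regions.US_EAST_1)
        .build();
```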
2. We now have a CSV file containing the executionArn and startDate of all failed executions. If needed, you can further filter the data based on the start date, for example to isolate failures within a specific time range. The following code snippet walks through this CSV file and re-executes the failed executions.
public void processFailures() throws Exception {
    int count = 0;
    // The CSV file is stored in S3; provide the bucket name and object key.
    byte[] content = getObjectFromS3(bucketName, key);
    String json = csvToJson(content);
    ObjectMapper objectMapper = new ObjectMapper();
    JsonNode rootNode = objectMapper.readTree(json);
    for (JsonNode element : rootNode) {
        count++;
        String executionArn = element.get(0).asText();
        logger.info("Count: " + count + ", executionArn: " + executionArn);
        retryExecution(executionArn);
        Thread.sleep(Long.parseLong(sleep)); // configurable delay (ms) between retries
    }
    logger.info("Finished replay: " + count);
}
private void retryExecution(String executionArn) {
    DescribeExecutionRequest describeExecutionRequest = new DescribeExecutionRequest()
            .withExecutionArn(executionArn);
    DescribeExecutionResult describeExecutionResult = stepClient.describeExecution(describeExecutionRequest);
    String input = describeExecutionResult.getInput();
    logger.info("Input for execution " + executionArn + ": " + input);
    StartExecutionRequest startExecutionRequest = new StartExecutionRequest();
    startExecutionRequest.setStateMachineArn(STEP_MACHINE_ARN_RECON);
    startExecutionRequest.setInput(input);
    try {
        StartExecutionResult result = stepClient.startExecution(startExecutionRequest);
        logger.info(executionArn + " successfully triggered a retry execution. Result: " + result);
    } catch (Exception e) {
        logger.error("Failed to retry execution " + executionArn + ": " + e.getMessage(), e);
    }
}
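Note that startExecution always creates a brand-new execution with a generated name, so running the replay job twice re-runs the same inputs twice. One way to guard against that is to derive a deterministic name from the original ARN and pass it via startExecutionRequest.setName(...), so that Step Functions rejects an accidental duplicate replay with an ExecutionAlreadyExists error. The helper below is a sketch; retryNameFor and the "-retry1" suffix are illustrative, not part of the original code.

```java
public class RetryNames {

    // Derives a deterministic name for the retry execution from the original
    // execution ARN. Passing this via startExecutionRequest.setName(...) makes
    // Step Functions reject a second replay of the same execution with an
    // ExecutionAlreadyExists error instead of running it again.
    static String retryNameFor(String executionArn) {
        // The original execution name is the last ':'-separated segment of the ARN.
        String original = executionArn.substring(executionArn.lastIndexOf(':') + 1);
        // Execution names are capped at 80 characters; leave room for the suffix.
        String base = original.length() > 70 ? original.substring(0, 70) : original;
        return base + "-retry1";
    }

    public static void main(String[] args) {
        System.out.println(retryNameFor(
                "arn:aws:states:us-east-1:123456789012:execution:MyMachine:run-42"));
        // prints: run-42-retry1
    }
}
```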
private byte[] getObjectFromS3(String bucketName, String key) throws IOException {
    if (s3Client.doesObjectExist(bucketName, key)) {
        S3Object so = s3Client.getObject(bucketName, key);
        byte[] s3Content = IOUtils.toByteArray(so.getObjectContent());
        logger.info("Fetched S3 object: " + bucketName + " " + key);
        return s3Content;
    }
    logger.info("S3 object doesn't exist - " + bucketName + " " + key);
    return null;
}
public String csvToJson(byte[] csvFile) throws IOException, CsvException {
    String csvFileString = new String(csvFile, StandardCharsets.UTF_8);
    try (CSVReader reader = new CSVReader(new StringReader(csvFileString))) {
        List<String[]> csvRows = reader.readAll();
        ObjectMapper objectMapper = new ObjectMapper();
        String json = objectMapper.writeValueAsString(csvRows);
        logger.info("csv converted to json: " + json);
        return json;
    }
}
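Step 2 mentions narrowing the failures by start date. Below is a sketch of such a filter, assuming the second CSV column holds the Date.toString() output written by detectFailures; the DateFilter class is illustrative, not part of the original code, and the zone name in Date.toString() depends on the JVM's default time zone.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Locale;

public class DateFilter {

    // Date.toString() produces strings like "Wed Mar 05 10:30:00 UTC 2025";
    // this pattern parses them back into Date objects.
    private static final String PATTERN = "EEE MMM dd HH:mm:ss zzz yyyy";

    // Keeps only the rows whose start date (second column) falls inside [from, to].
    static List<String[]> filterByStartDate(List<String[]> rows, String from, String to) {
        SimpleDateFormat fmt = new SimpleDateFormat(PATTERN, Locale.ENGLISH);
        try {
            Date fromDate = fmt.parse(from);
            Date toDate = fmt.parse(to);
            List<String[]> filtered = new ArrayList<>();
            for (String[] row : rows) {
                Date start = fmt.parse(row[1]);
                if (!start.before(fromDate) && !start.after(toDate)) {
                    filtered.add(row);
                }
            }
            return filtered;
        } catch (ParseException e) {
            throw new IllegalArgumentException("Unparseable date in CSV", e);
        }
    }
}
```

Applied to the parsed CSV rows before the retry loop, this limits the replay to, say, a single day's failures.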
You will need the following imports for this code:
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder;
import com.amazonaws.services.stepfunctions.model.DescribeExecutionRequest;
import com.amazonaws.services.stepfunctions.model.DescribeExecutionResult;
import com.amazonaws.services.stepfunctions.model.ExecutionListItem;
import com.amazonaws.services.stepfunctions.model.ExecutionStatus;
import com.amazonaws.services.stepfunctions.model.ListExecutionsRequest;
import com.amazonaws.services.stepfunctions.model.ListExecutionsResult;
import com.amazonaws.services.stepfunctions.model.StartExecutionRequest;
import com.amazonaws.services.stepfunctions.model.StartExecutionResult;
import com.amazonaws.util.IOUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.CSVReader;
import com.opencsv.CSVWriter;
import com.opencsv.exceptions.CsvException;