Wednesday, May 28, 2025

How to re-execute failed Step Functions programmatically

Problem
We will discuss how to re-execute failed AWS Step Functions executions in bulk. For a small number of failures, re-execution can be done manually via the AWS Console. However, when dealing with a large volume (hundreds or thousands of failures) we need an automated, programmatic approach to handle the re-execution efficiently.

Challenge
The AWS Console does not offer a way to retrieve Execution ARNs of failed Step Function executions in bulk. Instead, we have to manually inspect each execution to obtain its ARN. To efficiently handle a large number of failures, we need to leverage the AWS API to retrieve this information programmatically. I'll now walk you through, step by step, how to address this challenge using Java.

Steps

1. Let's start by retrieving the Execution ARNs of failed executions. The AWS API provides several execution statuses, including ABORTED, FAILED, PENDING_REDRIVE, RUNNING, SUCCEEDED, and TIMED_OUT. We can use the relevant failure-related status to filter and identify failed executions.

  • STEP_MACHINE_ARN_RECON is the ARN of your state machine.
  • First, the code iterates through all executions, retrieving failures with the status filter ExecutionStatus.FAILED.
  • Each failure's executionArn and startDate are then added to a list.
  • Finally, the list is written to a CSV file and uploaded to S3.


	private void detectFailures() throws InterruptedException {

		List<String[]> failures = new ArrayList<>();
		String nextToken = null;

		do {
			// List executions of the state machine, filtered to FAILED status
			ListExecutionsRequest listExecutionsRequest = new ListExecutionsRequest();
			listExecutionsRequest.setStateMachineArn(STEP_MACHINE_ARN_RECON);
			listExecutionsRequest.setStatusFilter(ExecutionStatus.FAILED);
			if (nextToken != null) {
				listExecutionsRequest.setNextToken(nextToken);
			}

			ListExecutionsResult listExecutionsResult = stepClient.listExecutions(listExecutionsRequest);

			for (ExecutionListItem executionListItem : listExecutionsResult.getExecutions()) {
				String executionArn = executionListItem.getExecutionArn();
				failures.add(new String[] { executionArn, executionListItem.getStartDate().toString() });
			}
			logger.info("failures.size(): " + failures.size());
			nextToken = listExecutionsResult.getNextToken();
			// Pause between pages to stay under the ListExecutions rate limit
			Thread.sleep(1000);

		} while (nextToken != null);

		File f = createFile(failures);
		writeToS3(f);
	}
	
	private File createFile(List<String[]> list) {

		File inputFile = new File(ROOT_DIR + "timedouts.csv");

		// Write each [executionArn, startDate] pair as one unquoted CSV row
		try (CSVWriter writer = new CSVWriter(new FileWriter(inputFile), CSVWriter.DEFAULT_SEPARATOR,
				CSVWriter.NO_QUOTE_CHARACTER, CSVWriter.DEFAULT_ESCAPE_CHARACTER, CSVWriter.DEFAULT_LINE_END)) {

			writer.writeAll(list);
			writer.flush();
			logger.info("Path: " + inputFile.toPath().toAbsolutePath());

		} catch (Exception e) {
			logger.error("Exception: " + e.getMessage(), e);
		}
		return inputFile;
	}

	private String writeToS3(File file) {

		try {
			PutObjectRequest request = new PutObjectRequest(bucketName, key, file);
			PutObjectResult result = s3Client.putObject(request);
			logger.info("Uploaded file to S3: " + result.getETag());
			return result.getETag();

		} catch (Exception e) {
			logger.error("Exception in writeToS3(): Amazon S3 couldn't process the upload", e);
			throw e;
		}
	}
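
These snippets assume already-initialized stepClient and s3Client fields. The following is a minimal sketch of building them with the client builders from the imports listed at the end of this post; the region is illustrative, so substitute your own.

	// Assumed client setup (region is an example; use your own)
	private final AWSStepFunctions stepClient = AWSStepFunctionsClientBuilder.standard()
			.withRegion(Regions.AP_SOUTHEAST_2)
			.build();

	private final AmazonS3 s3Client = AmazonS3ClientBuilder.standard()
			.withRegion(Regions.AP_SOUTHEAST_2)
			.build();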

2. Now we have a CSV file containing the executionArn and startDate of all failed executions. If needed, you can further filter the data based on the start date, for example to isolate failures within a specific time range (see the sketch after processFailures below). Following is the code snippet that goes through this CSV file and re-executes the failed executions.

	public void processFailures() throws Exception {

		int count = 0;
		// The CSV file is stored in an S3 bucket. Provide the bucket name and the key to the file.
		byte[] content = getObjectFromS3(bucketName, key);

		String json = csvToJson(content);

		ObjectMapper objectMapper = new ObjectMapper();
		JsonNode rootNode = objectMapper.readTree(json);

		// Each element is one CSV row: [executionArn, startDate]
		for (JsonNode element : rootNode) {
			count++;
			String executionArn = element.get(0).asText();
			logger.info("Count: " + count + ", executionArn: " + executionArn);
			retryExecution(executionArn);
			// Throttle the replay; 'sleep' is the per-retry delay in milliseconds
			Thread.sleep(Long.valueOf(sleep));
		}

		logger.info("Finished replay......... " + count);
	}
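
If you only want to replay failures from a specific time window, you can filter each CSV row by its start-date column before calling retryExecution. The following is a minimal sketch, assuming the second column holds the default java.util.Date.toString() format written by detectFailures; the helper name and window handling are illustrative, and you would additionally need the java.text.SimpleDateFormat, java.text.ParseException, java.util.Date, and java.util.Locale imports.

	// Hypothetical helper: true if the row's start date falls inside [from, to]
	private boolean withinWindow(JsonNode row, Date from, Date to) throws ParseException {
		// java.util.Date.toString() format, e.g. "Wed May 28 10:15:30 UTC 2025"
		SimpleDateFormat fmt = new SimpleDateFormat("EEE MMM dd HH:mm:ss zzz yyyy", Locale.ENGLISH);
		Date started = fmt.parse(row.get(1).asText());
		return !started.before(from) && !started.after(to);
	}

In processFailures, you would then wrap the retryExecution call in if (withinWindow(element, from, to)).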

	private void retryExecution(String executionArn) {

		// Fetch the original input of the failed execution
		DescribeExecutionRequest describeExecutionRequest = new DescribeExecutionRequest()
				.withExecutionArn(executionArn);
		DescribeExecutionResult describeExecutionResult = stepClient.describeExecution(describeExecutionRequest);

		String input = describeExecutionResult.getInput();
		logger.info("Input for execution " + executionArn + ": " + input);

		// Start a new execution of the same state machine with the same input
		StartExecutionRequest startExecutionRequest = new StartExecutionRequest();
		startExecutionRequest.setStateMachineArn(STEP_MACHINE_ARN_RECON);
		startExecutionRequest.setInput(input);

		try {
			StartExecutionResult result = stepClient.startExecution(startExecutionRequest);
			logger.info(executionArn + " successfully triggered retry execution. Result: " + result);

		} catch (Exception e) {
			logger.error("Failed to retry execution: " + e.getMessage(), e);
		}
	}

	private byte[] getObjectFromS3(String bucketName, String key) throws IOException {

		if (s3Client.doesObjectExist(bucketName, key)) {
			S3Object so = s3Client.getObject(bucketName, key);
			byte[] s3Content = IOUtils.toByteArray(so.getObjectContent());
			logger.info("Fetched S3 object: " + bucketName + " " + key);
			return s3Content;
		}

		logger.info("S3 Object doesn't exist - " + bucketName + " " + key);
		return null;
	}

	public String csvToJson(byte[] csvFile) throws IOException, CsvException {

		String csvFileString = new String(csvFile, StandardCharsets.UTF_8);

		// Read every CSV row, then serialize the rows as a JSON array of arrays
		try (CSVReader reader = new CSVReader(new StringReader(csvFileString))) {
			List<String[]> csvRows = reader.readAll();

			ObjectMapper objectMapper = new ObjectMapper();
			String json = objectMapper.writeValueAsString(csvRows);

			logger.info("csv converted to json: " + json);

			return json;
		}
	}

You will need the following imports for this code.


import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.StringReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.PutObjectResult;
import com.amazonaws.services.s3.model.S3Object;
import com.amazonaws.services.stepfunctions.AWSStepFunctions;
import com.amazonaws.services.stepfunctions.AWSStepFunctionsClientBuilder;
import com.amazonaws.services.stepfunctions.model.DescribeExecutionRequest;
import com.amazonaws.services.stepfunctions.model.DescribeExecutionResult;
import com.amazonaws.services.stepfunctions.model.ExecutionListItem;
import com.amazonaws.services.stepfunctions.model.ExecutionStatus;
import com.amazonaws.services.stepfunctions.model.ListExecutionsRequest;
import com.amazonaws.services.stepfunctions.model.ListExecutionsResult;
import com.amazonaws.services.stepfunctions.model.StartExecutionRequest;
import com.amazonaws.services.stepfunctions.model.StartExecutionResult;
import com.amazonaws.util.IOUtils;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.opencsv.CSVReader;
import com.opencsv.CSVWriter;
import com.opencsv.exceptions.CsvException;

Wednesday, September 14, 2022

DynamoDB query from a Global Secondary Index with AWS SDK 2.x

With the AWS SDK for Java 2.x enhanced DynamoDB client, the 'DynamoDB mapper' from the Java v1 SDK has been redesigned. In the latest version, the POJO must carry the @DynamoDbBean annotation, which identifies the class as a DynamoDB-mappable entity. Let's look at how to query a Global Secondary Index (GSI).

Solution
Example POJO using DynamoDbBean:
       

    import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
    import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;
    import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey;

    @DynamoDbBean
    public class Customer {

        private String identifier;
        private String name;
        private int age;


        @DynamoDbPartitionKey
        public String getIdentifier(){
            return this.identifier;
        }
        public void setIdentifier(String transactionID){
            this.identifier  = transactionID;
        }

        @DynamoDbSecondaryPartitionKey(indexNames = {"name-index"}) 
        public String getName(){
            return name;
        }
        public void setName(String name){
            this.name = name;
        }

        public int getAge() {
            return age;
        }
        public void setAge(int age) {
            this.age = age;
        }

    }

Following is how you can query the GSI using the enhanced DynamoDB client.
       

	public void getCustomersByName(String name) {

		DynamoDbEnhancedClient enhancedClient = DynamoDbEnhancedClient.builder()
		        .dynamoDbClient(ddb)
		        .build();

		// Map the Customer bean onto the "Customer" table
		DynamoDbTable<Customer> customerTable = enhancedClient.table("Customer", TableSchema.fromBean(Customer.class));

		// Obtain a handle to the GSI declared on getName()
		DynamoDbIndex<Customer> secIndex = customerTable.index("name-index");
		AttributeValue attVal = AttributeValue.builder().s(name).build();
		QueryConditional queryConditional = QueryConditional
		        .keyEqualTo(Key.builder().partitionValue(attVal).build());

		// GSI queries return results page by page
		SdkIterable<Page<Customer>> results = secIndex.query(
		        QueryEnhancedRequest.builder()
		                .queryConditional(queryConditional)
		                .build());

		results.forEach(page -> {
			List<Customer> customers = page.items();
			for (Customer c : customers) {
				System.out.println(c.getName());
			}
		});
	}
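
The method above assumes an existing low-level DynamoDbClient named ddb. A minimal sketch of building one follows; the region is illustrative.

	import software.amazon.awssdk.regions.Region;
	import software.amazon.awssdk.services.dynamodb.DynamoDbClient;

	DynamoDbClient ddb = DynamoDbClient.builder()
			.region(Region.AP_SOUTHEAST_2) // example region; use your own
			.build();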

Wednesday, July 27, 2022

Using a dynamic table name when saving an item to a DynamoDB table

If you are using the Java DynamoDBMapper API, it requires the @DynamoDBTable annotation on the POJO. This is a compile-time annotation and requires a table name parameter. How do you change or pass this table name at runtime?

Solution
Looking a bit closer at the DynamoDBMapper API, I found a class called DynamoDBMapperConfig. This class has a builder method which allows us to specify a so-called TableNameOverride that overrides the table name defined in the @DynamoDBTable annotation of a Java class. For example, if you want to override the table name defined in the following class:
       

    @DynamoDBTable(tableName = "temp_name")
    public static class IDRecord {
        private String id;
        private String StartDate;
        private String EndDate;

        @DynamoDBHashKey(attributeName = "ID")
        public String getID() {
            return id;
        }

        public void setID(String id) {
            this.id = id;
        }

        @DynamoDBAttribute(attributeName = "StartDate")
        public String getStartDate() {
            return StartDate;
        }

        public void setStartDate(String StartDate) {
            this.StartDate = StartDate;
        }

        @DynamoDBAttribute(attributeName = "EndDate")
        public String getEndDate() {
            return EndDate;
        }

        public void setEndDate(String EndDate) {
            this.EndDate = EndDate;
        }

        @Override
        public String toString() {
            return "IDRecord [ID=" + id + ", StartDate=" + StartDate  + ", EndDate=" + EndDate+ "]";
        }

    }
When saving the item, do the following. This will ignore the table name defined by the @DynamoDBTable annotation on the IDRecord class and instead use the tableName that we read from an environment variable.
       

private static AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().withRegion("ap-southeast-2").build();
	
private static DynamoDBMapper dynamoDBMapper = new DynamoDBMapper(client);
    
// tableName is resolved at runtime, e.g. read from an environment variable
DynamoDBMapperConfig mapperConfig = DynamoDBMapperConfig.builder().withTableNameOverride(new TableNameOverride(tableName)).build();

dynamoDBMapper.save(iDRecord, mapperConfig);

       
       
Note: Previous versions used the constructor DynamoDBMapperConfig(DynamoDBMapperConfig.TableNameOverride tableNameOverride), which is now deprecated.
 

Tuesday, August 24, 2021

Choosing a language for AWS Serverless

Choosing the language that best suits an AWS serverless application depends largely on your level of comfort and skill with each of the supported language runtimes. However, language selection becomes a key consideration when the application runtime is expected to have strong performance characteristics. We can compare the performance of language runtimes by dividing them into two major categories;
  1. Compiled languages (Java and .NET): these have the largest cold-start overhead, so a container's first invocation is known to be slow, but they show the best performance for subsequent invocations.
  2. Interpreted languages (Node.js and Python): these have very fast initial invocation times compared to the compiled languages, but can't reach the same level of maximum performance.

What is Cold Start?
“Cold start” in the serverless world means that the serverless application is started and initialized to handle a request. Here, the term “serverless application” covers both the application and the container in which the user code runs. Initialization adds extra latency to the execution of the request, since it must complete before the request can be handled.

Fortunately, this initialization doesn’t occur at every request, as almost all serverless platforms are smart enough to reuse containers as much as possible. However, depending on the serverless platform itself, existing containers can be destroyed and new ones created at any time for many internal (resource scheduling/sharing, fixes/patches on the host environment, etc.) or external (new application deploy, configuration change, etc.) reasons.
In the AWS Lambda platform, the following causes trigger new container starts, which result in cold starts:
  • There is no container alive.
  • There are containers alive, but none of them are available because all are busy handling other requests. This happens if your application's use case has very spiky traffic.
  • A new application version was deployed, so new containers must start with the newer version of the application.
  • Configuration (env. variables, memory limit, etc.) was changed, so new containers must start with the new configuration.

How to reduce cold start overhead?
  • Send periodic warmup requests to keep containers alive (see the sketch after this list).
  • Use AWS Lambda Custom Runtime support to bootstrap the runtime yourself; the runtime bootstrapping phase can then be optimized to start faster (for example, at the Java runtime, by tweaking JVM arguments).
  • Use the latest AWS SDKs. For example, AWS Java SDK 2 has 500–750 ms less initialization overhead than AWS Java SDK 1.
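
For the warmup approach, one common pattern is a scheduled event (for example, an EventBridge rule) that pings the function periodically, with the handler short-circuiting those pings. A minimal sketch in Java follows, assuming a self-chosen convention where the warmup event carries a "warmup": true field; the field name is not an AWS API, just a convention.

import java.util.Map;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

public class Handler implements RequestHandler<Map<String, Object>, String> {

    @Override
    public String handleRequest(Map<String, Object> event, Context context) {
        // Short-circuit scheduled warmup pings: this keeps the container warm
        // without running any business logic. "warmup" is our own convention.
        if (Boolean.TRUE.equals(event.get("warmup"))) {
            return "warmed";
        }
        // ... real business logic goes here ...
        return "ok";
    }
}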

Saturday, August 21, 2021

How to use a dynamic DynamoDB table name for a query

My current project requires initializing the names of DynamoDB tables from environment variables at runtime. So far so good, but the problem is that the API requires the @DynamoDBTable annotation on my POJO. This is a compile-time annotation that requires a table name parameter, and so it would restrict me from using the POJO in a dynamic manner.

Solution
Looking a bit closer at the DynamoDBMapper API, I found a class called DynamoDBMapperConfig. This class allows us to specify a so-called TableNameOverride that overrides the table name defined in the @DynamoDBTable annotation of a Java class. You can supply a DynamoDBMapperConfig as a second parameter to several methods in the DynamoDBMapper API. For example, if you want to override the table name defined in the following class:
       

    @DynamoDBTable(tableName = "temp_name")
    public static class IDRecord {
        private String id;
        private String StartDate;
        private String EndDate;

        @DynamoDBHashKey(attributeName = "ID")
        public String getID() {
            return id;
        }

        public void setID(String id) {
            this.id = id;
        }

        @DynamoDBAttribute(attributeName = "StartDate")
        public String getStartDate() {
            return StartDate;
        }

        public void setStartDate(String StartDate) {
            this.StartDate = StartDate;
        }

        @DynamoDBAttribute(attributeName = "EndDate")
        public String getEndDate() {
            return EndDate;
        }

        public void setEndDate(String EndDate) {
            this.EndDate = EndDate;
        }

        @Override
        public String toString() {
            return "IDRecord [ID=" + id + ", StartDate=" + StartDate  + ", EndDate=" + EndDate+ "]";
        }

    }
When querying the table, do the following. This will ignore the table name defined by the @DynamoDBTable annotation on the IDRecord class and instead use the table name that we read from an environment variable.
       

// Override the compile-time table name with the value of the ID_TABLE environment variable
private static DynamoDBMapperConfig dynamoConfig = new DynamoDBMapperConfig(new DynamoDBMapperConfig.TableNameOverride(System.getenv("ID_TABLE")));

QueryResultPage<IDRecord> queryResult = mapper.queryPage(IDRecord.class, queryExpression, dynamoConfig);
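
The snippet above assumes a DynamoDBQueryExpression named queryExpression. A minimal sketch of building one for a hash-key lookup follows; the key value is purely illustrative.

IDRecord keyRecord = new IDRecord();
keyRecord.setID("some-id"); // illustrative key value

// Query by the hash key of the POJO
DynamoDBQueryExpression<IDRecord> queryExpression =
        new DynamoDBQueryExpression<IDRecord>().withHashKeyValues(keyRecord);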


Thursday, December 6, 2018

How to enable HTTP access logs in Red Hat JBoss EAP 7.x

Add the following access-log setting inside <host name="default-host"> under the Undertow subsystem of the profile you are using.
e.g. It would be in one of the following files, depending on the profile you have configured for your application.

  1. standalone-ha.xml: Default profile with clustering capabilities
  2. standalone-full-ha.xml: Full profile with clustering capabilities
  3. standalone-full.xml: Support of Java EE Full-Profile and all server capabilities without clustering
  4. standalone.xml: Support of Java EE Web-Profile plus some extensions like RESTFul Web Services and support for EJB3 remote invocations
 <subsystem xmlns="urn:jboss:domain:undertow:4.0">  
      .....  
      <server name="default-server">  
           <http-listener name="default" socket-binding="http" record-request-start-time="true" redirect-socket="https" enable-http2="true"/>  
           <https-listener name="https" socket-binding="https" record-request-start-time="true" security-realm="ApplicationRealm" enable-http2="true"/>  
           <host name="default-host" alias="localhost">  
                <access-log pattern="%h %l %u %t &quot;%r&quot; %s %b &quot;%{i,Referer}&quot; &quot;%{i,User-Agent}&quot; &quot;%{i,COOKIE}&quot; &quot;%{o,SET-COOKIE}&quot; %S &quot;%I&quot; %T" directory="/accesslog" relative-to="jboss.server.log.dir"/>  
                <filter-ref name="server-header"/>  
                <filter-ref name="x-powered-by-header"/>  
                <http-invoker security-realm="ApplicationRealm"/>  
           </host>  
      </server>  
      .....  
 </subsystem>  


Also note that you need to set the record-request-start-time attribute to true on the listener you are using (http-listener, ajp-listener, or https-listener) so that the response time (%D in milliseconds or %T in seconds) can be logged in the access log.

The access log is written to the JBoss log directory by default. You may specify a custom directory through the directory parameter, and you may add the relative-to parameter to make that custom directory relative to another path.

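If you prefer the management CLI to editing the XML by hand, the same access log can be enabled with jboss-cli. A minimal sketch, assuming the default server, host, and listener names shown in the configuration above:

 /subsystem=undertow/server=default-server/host=default-host/setting=access-log:add(pattern="common", directory="/accesslog", relative-to="jboss.server.log.dir")  
 /subsystem=undertow/server=default-server/http-listener=default:write-attribute(name=record-request-start-time, value=true)  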

Find below more details on the access-log pattern used above:

  • %a - Remote IP address
  • %A - Local IP address
  • %b - Bytes sent, excluding HTTP headers, or '-' if no bytes were sent
  • %B - Bytes sent, excluding HTTP headers
  • %h - Remote host name
  • %H - Request protocol
  • %l - Remote logical username from identd (always returns '-')
  • %m - Request method
  • %p - Local port
  • %q - Query string (excluding the '?' character)
  • %r - First line of the request
  • %s - HTTP status code of the response
  • %t - Date and time, in Common Log Format
  • %u - Remote user that was authenticated
  • %U - Requested URL path
  • %v - Local server name
  • %D - Time taken to process the request, in milliseconds
  • %T - Time taken to process the request, in seconds
  • %I - Current request thread name (can be compared later with stack traces)


In addition, the caller can specify one of the following aliases for commonly utilized patterns:

  • common - %h %l %u %t "%r" %s %b
  • combined - %h %l %u %t "%r" %s %b "%{i,Referer}" "%{i,User-Agent}"

There is also support for writing information from a cookie, an incoming header, or the session.
It is modeled after the Apache syntax:

  • %{i,xxx} for incoming headers
  • %{o,xxx} for outgoing response headers
  • %{c,xxx} for a specific cookie
  • %{r,xxx} xxx is an attribute in the ServletRequest
  • %{s,xxx} xxx is an attribute in the HttpSession




Thursday, December 14, 2017

Profiling Heap Stats of Java Applications - Java Flight Recorder (JFR)

Java Flight Recorder (JFR) is a tool for collecting diagnostic and profiling data about a running Java application. In this post, I am going to explain how to create a detailed profile, including heap and class statistics, of your Java application. As you may already know, JFR is driven via the jcmd utility, which sends diagnostic command requests to the JVM.

jcmd.exe comes by default with the JDK installation; you can find it in the $JAVA_HOME\bin directory. JFR collects low-level, detailed runtime information about Java applications running on top of the JVM. JFR ships with a default profiler that has generic recording templates, which you can find in the following directory of the JDK installation:

$JAVA_HOME\jre\lib\jfr

Using the command below at a command prompt, you can collect general profiling information with the default settings (replace ProcessID with the target JVM's process ID):

jcmd.exe ProcessID JFR.start duration=600s filename=FileName.jfr
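
While a recording is in progress, you can check its status with the JFR.check diagnostic command via the same jcmd utility:

jcmd.exe ProcessID JFR.check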

The above command uses "default.jfc", which is configured to collect a standard set of runtime statistics.

Now let's see how to conduct more detailed profiling by changing these settings. To do this, open the recording that was first collected using the default settings; this will open the "Oracle Java Mission Control" UI. Now select "Window -> Flight Recording Template Manager" from the menu.

Select "Default" and then click "Duplicate". Then select newly created template and click "Edit". You can then make changes highlighted in below image and click OK. This will add a new template "DetailedProfiling" which we can now use for detailed profiling.

Now open the command prompt and execute the command below.

jcmd.exe ProcessID JFR.start settings=C:\Java\jdk1.7.0_80\jre\lib\jfr\DetailedProfiling.jfc duration=600s filename=FileName.jfr

This will start the recording. Once the profiling duration has completed, you can open the recording and see statistics related to Memory, Code, System, etc., which are really useful for finding memory leaks and performance improvements in Java applications.