
Kafka Consumer Error Handling, Retry, and Recovery

This blog post is about Kafka consumer resiliency when working with Apache Kafka and Spring Boot.

As a scenario, let’s assume a Kafka consumer polling events from a package events topic (named packages-received in the code below).

A service class (PackageService) is responsible for storing the consumed events in a database.

Note: In place of the database, this could be a call to an API or a third-party application.


I would like to cover how to handle exceptions at the service level. An exception can arise in the service during validation, while persisting to the database, or while making a call to an API.
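To make these failure modes concrete, here is a minimal sketch in plain Java of a service that can fail in two different ways. The class name PackageServiceSketch, its methods, and the in-memory list standing in for the database are all assumptions made for illustration; they are not taken from the real application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the post's service class. The method names and the
// in-memory list that plays the role of the database are assumptions.
class PackageServiceSketch {

    private final List<String> database = new ArrayList<>();
    private boolean databaseUp = true;

    // Lets a caller simulate an outage of the backing store.
    void setDatabaseUp(boolean databaseUp) {
        this.databaseUp = databaseUp;
    }

    // Validates the event, then "persists" it.
    void persist(String packageId) throws TimeoutException {
        if (packageId == null || packageId.trim().isEmpty()) {
            // Permanent failure: the same input will fail again on every retry.
            throw new IllegalArgumentException("packageId must not be blank");
        }
        if (!databaseUp) {
            // Transient failure: the call may succeed once the store is back.
            throw new TimeoutException("database did not respond");
        }
        database.add(packageId);
    }

    int storedCount() {
        return database.size();
    }
}
```

The split matters for the rest of the post: a validation failure is not worth retrying, while a timeout usually is.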

Kafka Consumer:

To create a consumer listening to a certain topic, we use @KafkaListener(topics = {"packages-received"}) on a method in the Spring Boot application.

Here “packages-received” is the topic to poll messages from.

@KafkaListener(topics = {"packages-received"})
public void packagesListener(ConsumerRecord<String,PackageInfoEvent> packageInfoEvent){

    log.info("Received event to persist packageInfoEvent :{}", packageInfoEvent.value());

}

In general, a Kafka listener gets its properties, such as the group ID and the key and value deserializers specified in the property files, through the “kafkaListenerContainerFactory” bean.

In simple words, the “kafkaListenerContainerFactory” bean is the key to configuring the Kafka listener.

If we need to override the default listener configuration, we create our own “kafkaListenerContainerFactory” bean and set the desired configuration on it.

This is what we are going to leverage to set up error handling, retry, and recovery for the Kafka listener/consumer. The factory methods used for this in the snippets below (setRetryTemplate, setRecoveryCallback, and setErrorHandler) belong to Spring Kafka 2.x; they were deprecated in 2.8 and removed in 3.0.

@Bean
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
    configurer.configure(factory, kafkaConsumerFactory);
    
    return factory;
}

Kafka Retry:

In general, runtime exceptions in the service layer occur when the service you are trying to access (a database or an API) is down or having issues.

These are exceptions that may succeed when the operation is tried again later.

The following code snippet shows how to configure retry with RetryTemplate.

In turn, the RetryTemplate is set up with a retry policy, which specifies the maximum number of attempts and which exceptions should and should not be retried. The maximum of 3 used here counts the first attempt, so a failing event is retried at most twice, and exceptions that are not listed in the map are not retried.

@Configuration
public class ConsumerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);

        factory.setRetryTemplate(retryTemplate());

        return factory;
    }

    private RetryTemplate retryTemplate() {
        RetryTemplate retryTemplate = new RetryTemplate();

        // The retry policy sets the maximum number of attempts and which
        // exceptions are retried and which are not.
        retryTemplate.setRetryPolicy(getSimpleRetryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy getSimpleRetryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

        // The boolean value determines whether the exception should be retried.
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);

        // 3 = maximum attempts (including the first call); true = also check the
        // causes of the thrown exception, since listener exceptions arrive wrapped.
        return new SimpleRetryPolicy(3, exceptionMap, true);
    }

}
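To show the kind of decision a retry policy makes, here is a simplified model in plain Java. It is not Spring Retry's implementation, and edge cases may differ from the library. The class RetryDecisionSketch and its methods are hypothetical names; the sketch only assumes that a policy holds a maximum attempt count plus a map of exception types, and that it looks through the cause chain of the thrown exception.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified, hypothetical model of a retry decision. Not Spring Retry's code.
class RetryDecisionSketch {

    private final int maxAttempts;
    private final Map<Class<? extends Throwable>, Boolean> rules = new LinkedHashMap<>();

    RetryDecisionSketch(int maxAttempts) {
        this.maxAttempts = maxAttempts;
    }

    // Registers whether an exception type (or a subclass of it) is retryable.
    RetryDecisionSketch rule(Class<? extends Throwable> type, boolean retryable) {
        rules.put(type, retryable);
        return this;
    }

    // Walks the exception and its causes; the first matching rule wins.
    // Anything without a matching rule is treated as not retryable.
    boolean isRetryable(Throwable error) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            for (Map.Entry<Class<? extends Throwable>, Boolean> rule : rules.entrySet()) {
                if (rule.getKey().isInstance(current)) {
                    return rule.getValue();
                }
            }
        }
        return false;
    }

    // failedAttempts counts every failed call so far, including the first one.
    boolean canRetry(int failedAttempts, Throwable lastError) {
        return failedAttempts < maxAttempts && isRetryable(lastError);
    }
}
```

With a maximum of 3, canRetry answers true after the first and second failure of a retryable exception and false after the third, which is why the first call counts toward the limit.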

When an event still fails after the retryable exceptions have been retried the maximum number of times, the recovery phase kicks in.

Retry and recovery go hand in hand: once the retries are exhausted, the recovery step checks whether the exception is recoverable and takes the necessary recovery steps, such as putting the event back on a retry topic or saving it to a database to try again later.

If the exception is not recoverable, recovery simply passes it on to the error handler. We will talk about error handling in a minute.
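The retry-then-recover flow described above can be sketched in a few lines of plain Java. This is an illustration of the control flow only, under the assumption that recovery runs once no further retry is allowed; RetryThenRecoverSketch and its execute method are hypothetical names, not part of Spring.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical sketch of the control flow: try, retry, then recover.
class RetryThenRecoverSketch {

    static <T> T execute(Callable<T> work,
                         int maxAttempts,
                         Predicate<Exception> retryable,
                         Function<Exception, T> recovery) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return work.call();          // success ends the loop immediately
            } catch (Exception e) {
                last = e;
                if (!retryable.test(e)) {
                    break;                   // not worth retrying, go to recovery
                }
            }
        }
        // Retries are exhausted (or were never allowed): hand over to recovery.
        // If recovery throws, the exception propagates to the caller, which in
        // the post's setup would be the error handler.
        return recovery.apply(last);
    }
}
```

A call that keeps timing out is attempted three times and then recovered, while a validation failure goes to recovery after a single attempt.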

Kafka Recovery:

There is a handy method, setRecoveryCallback(), on ConcurrentKafkaListenerContainerFactory. It accepts a callback that receives the retry context as its parameter.

Here we get the context after the maximum number of retries has been attempted; it has information about the event and the last exception thrown.

Recovery implementation:

@Configuration
@Slf4j
public class ConsumerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);

        factory.setRetryTemplate(retryTemplate());

        factory.setRecoveryCallback(context -> {

            // The listener's exception arrives wrapped, so inspect its cause.
            if (context.getLastThrowable().getCause() instanceof RecoverableDataAccessException) {

                // Recoverable: run your recovery mechanism here, for example
                // putting the event back on the topic using a Kafka producer.

            } else {

                // Not recoverable: log and throw an exception for the error
                // handler to take care of, keeping the original as the cause.
                throw new RuntimeException(context.getLastThrowable().getMessage(), context.getLastThrowable());
            }

            return null;

        });

        return factory;
    }

    private RetryTemplate retryTemplate() {

        RetryTemplate retryTemplate = new RetryTemplate();

        // The retry policy sets the maximum number of attempts and which
        // exceptions are retried and which are not.

        retryTemplate.setRetryPolicy(getSimpleRetryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy getSimpleRetryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);

        return new SimpleRetryPolicy(3,exceptionMap,true);
    }

}
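The branching inside the recovery callback can be tried out without Kafka. In the plain-Java sketch below, RecoveryRoutingSketch, the RecoverableFailure exception, and the in-memory queue that plays the role of a retry topic are all hypothetical stand-ins; a real callback would publish with a Kafka producer instead.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of the recovery decision. The queue stands in for a
// retry topic and RecoverableFailure for a recoverable data access exception.
class RecoveryRoutingSketch {

    static class RecoverableFailure extends RuntimeException {
        RecoverableFailure(String message) {
            super(message);
        }
    }

    private final Queue<String> retryTopic = new ArrayDeque<>();

    // lastThrowable is the wrapper raised by the listener; the real failure
    // is expected to be its cause, as in the callback shown above.
    void recover(String event, Throwable lastThrowable) {
        if (lastThrowable.getCause() instanceof RecoverableFailure) {
            retryTopic.add(event);   // park the event so it can be tried later
            return;
        }
        // Anything else is escalated, keeping the original exception as cause.
        throw new IllegalStateException("Unrecoverable failure for event " + event, lastThrowable);
    }

    int pendingRetries() {
        return retryTopic.size();
    }
}
```

Keeping the original exception as the cause means whoever handles the escalated exception can still see what actually went wrong.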

Kafka Error handling:

For any exception raised while processing a consumed event, an error is logged by the “LoggingErrorHandler” class in the org.springframework.kafka.listener package of Spring Kafka.

LoggingErrorHandler implements the “ErrorHandler” interface.

We can provide our own error handler by implementing the “ErrorHandler” interface.

 

@Configuration
@Slf4j
public class ConsumerConfig {
    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.setErrorHandler((exception, data) -> {

            /* Here you can do your custom handling. I am just logging, the same as the default error handler does.
               If you only want to log, you need not configure the error handler here; the default handler does it for you.
               Generally, you will persist the failed records to a DB for tracking. */

            // Passing the exception as the last argument also logs its stack trace.
            log.error("Error in process for record {}", data, exception);
        });
        return factory;
    }
}
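For the "persist the failed records for tracking" idea mentioned in the comment above, here is one possible shape in plain Java. The class FailedRecordTrackerSketch, its FailedRecord entries, and the in-memory list used instead of a database table are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical tracker: records what failed and why, so it can be inspected
// or replayed later. A real handler would write to a database table.
class FailedRecordTrackerSketch implements BiConsumer<Exception, String> {

    static final class FailedRecord {
        final String payload;
        final String errorType;
        final String message;

        FailedRecord(String payload, String errorType, String message) {
            this.payload = payload;
            this.errorType = errorType;
            this.message = message;
        }
    }

    private final List<FailedRecord> failures = new ArrayList<>();

    // Same argument order as the error handler lambda: exception first, then data.
    @Override
    public void accept(Exception error, String payload) {
        failures.add(new FailedRecord(
                payload,
                error.getClass().getSimpleName(),
                String.valueOf(error.getMessage())));
    }

    List<FailedRecord> failures() {
        return Collections.unmodifiableList(failures);
    }
}
```

Storing the exception type next to the payload makes it easier to tell apart the records that are worth replaying from those that need a fix first.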

Code snippet with all strategies working together:

package com.pack.events.consumer.config;


import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.boot.autoconfigure.kafka.ConcurrentKafkaListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.dao.RecoverableDataAccessException;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

@Configuration
@Slf4j
public class ConsumerConfig {

    @Bean
    ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);

        factory.setRetryTemplate(retryTemplate());

        factory.setRecoveryCallback(context -> {

            // The listener's exception arrives wrapped, so inspect its cause.
            if (context.getLastThrowable().getCause() instanceof RecoverableDataAccessException) {

                // Recoverable: run your recovery mechanism here, for example
                // putting the event back on the topic using a Kafka producer.

            } else {

                // Not recoverable: log and throw an exception for the error
                // handler to take care of, keeping the original as the cause.
                throw new RuntimeException(context.getLastThrowable().getMessage(), context.getLastThrowable());
            }

            return null;

        });

        factory.setErrorHandler((exception, data) -> {

            /* Here you can do your custom handling. I am just logging, the same as the default error handler does.
               If you only want to log, you need not configure the error handler here; the default handler does it for you.
               Generally, you will persist the failed records to a DB for tracking. */

            // Passing the exception as the last argument also logs its stack trace.
            log.error("Error in process for record {}", data, exception);
        });

        return factory;
    }

    private RetryTemplate retryTemplate() {

        RetryTemplate retryTemplate = new RetryTemplate();

        // The retry policy sets the maximum number of attempts and which
        // exceptions are retried and which are not.
        retryTemplate.setRetryPolicy(getSimpleRetryPolicy());

        return retryTemplate;
    }

    private SimpleRetryPolicy getSimpleRetryPolicy() {
        Map<Class<? extends Throwable>, Boolean> exceptionMap = new HashMap<>();

        // The boolean value determines whether the exception should be retried.
        exceptionMap.put(IllegalArgumentException.class, false);
        exceptionMap.put(TimeoutException.class, true);

        // 3 = maximum attempts (including the first call); true = also check the
        // causes of the thrown exception, since listener exceptions arrive wrapped.
        return new SimpleRetryPolicy(3, exceptionMap, true);
    }

}

Thank you.

About the Author

Technical lead consultant | Tech Enthusiast | Constant Learner
