spring,  kafka,  concurrency,  stream

Concurrency in Spring's StreamListener and Kafka

Another too fast, too furious post. I spent a few hours trying to make my event processor multi-threaded, and it turned out to be so damn easy that I don’t want anyone else to spend more than a few minutes on it. I couldn’t find the answer anywhere on the Internet, so I’m sharing it here.

We are using the Spring Cloud Stream layer to configure our Kafka consumers.

For example, the configuration for a binding named ‘reservations-input’ connected to the Kafka topic ‘reservations-topic’ would look similar to this:

spring.cloud.stream:
  bindings:
    reservations-input:
      content-type: application/json
      destination: reservations-topic
      group: consumer-service-group

And the classes that start processing those events:

@EnableBinding({MessagingConfiguration.ReservationTopic.class})
public class MessagingConfiguration {
    public interface ReservationTopic {

        // Must match the binding name used under spring.cloud.stream.bindings
        String INPUT = "reservations-input";

        @Input(INPUT)
        SubscribableChannel input();
    }
}

@Service
public class ReservationProcessor {
    @StreamListener(MessagingConfiguration.ReservationTopic.INPUT)
    public void handle(@Nonnull Message<ReservationEvent> reservationMessage) {
        // your stuff
    }
}

Easy peasy. The only problem here is concurrency.

If you have used Kafka before, you know that the number of partitions in your topic limits the concurrency: within a consumer group, each partition is read by at most one consumer, so any consumers beyond the partition count sit idle.
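To make that limit concrete, here is a minimal sketch in plain Java. The class and the `assign` helper are hypothetical names of mine, and the round-robin spread is only an illustration, not Kafka's actual assignor. It just shows what happens when there are more consumers than partitions.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionAssignmentSketch {

    // Hypothetical helper: spreads partition ids 0..partitions-1 over the
    // given number of consumers, round-robin. Kafka's real assignors are more
    // elaborate, but the "one consumer per partition" rule is the same.
    public static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            assignment.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            assignment.get(p % consumers).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, 5 consumers: the last two consumers get nothing to do.
        List<List<Integer>> result = assign(3, 5);
        for (int c = 0; c < result.size(); c++) {
            System.out.println("consumer-" + c + " -> " + result.get(c));
        }
    }
}
```

With 3 partitions and 5 consumers, consumers 3 and 4 end up with an empty list, which is why asking for more threads than partitions buys you nothing.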

I don’t know whether (or where) I read it, but I assumed that my application would spawn as many threads/consumers as my topic has partitions. I was wrong. By default, Spring creates only a single-threaded consumer.

Solutions? Run more instances of your application, or configure ConcurrentKafkaListenerContainerFactory so that it can spawn more threads (see https://docs.spring.io/spring-kafka/docs/2.3.x/reference/html/#container-factory).

Option 1: create your own instance of ConcurrentKafkaListenerContainerFactory.

The only hint I found in the documentation or on Stack Overflow was to instantiate a bean of type ConcurrentKafkaListenerContainerFactory.

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        @Nonnull ConsumerFactory<String, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(5);

        return factory;
    }

I am not keen on creating my own beans to configure something that seems this basic. It is easy to accidentally override Spring defaults that I already rely on, and it is more code to maintain…

There has to be a way through configuration.

Option 2: use configuration

Getting back to configuration, whatever we write under spring.cloud.stream.bindings.<channel-name>.consumer ends up in the configuration of the Kafka consumer. So I tried setting the concurrency property there. That is:

spring.cloud.stream:
  bindings:
    reservations-input:
      content-type: application/json
      consumer.concurrency: 3
      destination: reservations-topic
      group: consumer-service-group

Starting our application, we see that we now have 3 consumer threads, each with its own partition assigned.

    December 17th 2019, 14:22:57.274	2019-12-17 13:22:57.274  INFO [consumer-service,,,] 1 --- [container-1-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-1]
	December 17th 2019, 14:22:57.259	2019-12-17 13:22:57.259  INFO [consumer-service,,,] 1 --- [container-2-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-2]
	December 17th 2019, 14:22:57.256	2019-12-17 13:22:57.256  INFO [consumer-service,,,] 1 --- [container-3-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-3]

Profit!
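One caveat once several consumer threads are running: the same listener bean is now called from all of them at once, so any state the handler shares has to be thread-safe. Below is a minimal sketch in plain Java, with no Spring involved. `ConcurrentHandlerSketch`, `handle` and `simulate` are hypothetical stand-ins of mine, and the thread pool only imitates the three consumer threads from the logs above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

public class ConcurrentHandlerSketch {

    // Shared state touched by every consumer thread; a plain long with ++
    // could lose updates here, so an AtomicLong is used instead.
    private final AtomicLong processed = new AtomicLong();

    // Stand-in for the @StreamListener method.
    public void handle(String event) {
        processed.incrementAndGet();
    }

    public long processedCount() {
        return processed.get();
    }

    // Imitates `threads` consumer threads, each delivering `eventsPerThread`
    // events to the same handler instance, and returns the final count.
    public static long simulate(int threads, int eventsPerThread) {
        ConcurrentHandlerSketch handler = new ConcurrentHandlerSketch();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            final int id = t;
            pool.submit(() -> {
                for (int i = 0; i < eventsPerThread; i++) {
                    handler.handle("event-" + id + "-" + i);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("simulation timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handler.processedCount();
    }

    public static void main(String[] args) {
        // 3 threads x 10000 events each
        System.out.println(simulate(3, 10_000)); // prints 30000
    }
}
```

If your handler only works with the incoming message and local variables, there is nothing to worry about. The sketch only matters when the processor keeps counters, caches or other fields of its own.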