Concurrency in Spring's StreamListener and Kafka

Another too fast, too furious post. I have spent a few hours trying to make my event processor multi-threaded, and it’s so damn easy that I don’t want anyone to spend more than a few minutes in this. I couldn’t find it anywhere on Internet, so I share it here.

We are using the Spring Cloud Stream layer to configure our Kafka consumers.

For example, a configuration for a processor named ‘reservations-input’ connected to a Kafka topic ‘reservations-topic’ would be similar to this:

      content-type: application/json
      destination: reservations-topic
      group: consumer-service-group

And your class to start processing those events:

public class MessagingConfiguration {
    public interface ReservationTopic {

        String INPUT = "reservations-channel";

        SubscribableChannel input();

public class ReservationProcessor {
    public void handle(@Nonnull Message<ReservationEvent> reservationMessage) {
        // your stuff

Easy peasy. Only problem here is concurrency.

If you have used Kafka before, you would know that the number of partitions in your topic limits the concurrency. Each partition have 1 single consumer.

I don’t know whether (and where) I read that, but I assumed that my application would generate as many threads/consumers as partitions my topic has. But I was wrong. By default, Spring’s only generates 1-threaded processor.

Solutions? Get more instances of your application or configure ConcurrentKafkaListenerContainerFactory to be able to throw more threads (see https://docs.spring.io/spring-kafka/docs/2.3.x/reference/html/#container-factory).

Option 1: create your own instance of ConcurrentKafkaListenerContainerFactory.

The only hint I found in the documentation or stackoverflow was to instance a bean of type ConcurrentKafkaListenerContainerFactory.

    public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
        @Nonnull ConsumerFactory<String, Object> consumerFactory) {

        ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();

        return factory;

I am not very prone to creating my own beans to configure stuff that seems too obvious. It is easy to overwrite some Spring default values that I am already using, it is more code to maintain…

There has to be a way through configuration.

Option 2: use configuration

Getting back to configuration, what we write under spring.cloud.stream.bindings.channel-name.consumer ends in the configuration of Kafka. Therefore, I tried to configure the property concurrency. That is:

      content-type: application/json
      consumer.concurrency: 3
      destination: reservations-topic
      group: consumer-service-group

Starting our application, we see that we have 3 binders.

    December 17th 2019, 14:22:57.274	2019-12-17 13:22:57.274  INFO [consumer-service,,,] 1 --- [container-1-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-1]
	December 17th 2019, 14:22:57.259	2019-12-17 13:22:57.259  INFO [consumer-service,,,] 1 --- [container-2-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-2]
	December 17th 2019, 14:22:57.256	2019-12-17 13:22:57.256  INFO [consumer-service,,,] 1 --- [container-3-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$1  : partitions assigned: [reservations-topic-3]