Hello everyone. I want to share my experience of testing consumer and producer logic in a standard Spring Boot application. I approached this task several times with different options and ran into various pitfalls that pushed me to keep looking for a better solution. So, after going once more through the accumulated experience of mankind (Stack Overflow), I implemented yet another version, in which I have not yet found the drawbacks of the previous implementations. Either way, I will share how I arrived at it and why the other options did not work for my cases (or seemed worse than the final one).

Disclaimer. I am a proponent of integration tests that run as part of the project build. That is, the Spring context is raised for the unit tests, with all the pros (an environment as close to production as possible) and cons (it starts up for a LONG time and takes a LONG time to describe).

Part 1. Is that a problem at all?

In short: yes.

It all started with admitting that the logic inside the consumer and the producer needs to be tested. What does the most typical consumer look like? Well, I would assume something like this:

@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumer {

    private final UserService userService;
    private final NotUserService notUserService;
    private final UserHandler userHandler;
    private final UserMapper userMapper;
    private final List<String> userCodes;

    @KafkaListener(groupId = "userConsumerGroupId",
            clientIdPrefix = "UserConsumer",
            topics = {"user-topic"},
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(UserInfoRecord userInfoRecord) {
        log.info("All sorts of processing logic here");
        log.info("Saving to the database?");
        log.info("Working with external services?");
        log.info("Calling mappers?");
        log.info("Sending information to Kafka?");
    }
}

The consume() method may hide non-trivial logic, or even trivial logic that you still want to verify before handing it over to the customer.

And what does a typical producer look like? Well, probably something like this:

@Slf4j
@Service
@RequiredArgsConstructor
public class UserProducer {

    private final UserService userService;
    /** If we persist messages before sending, or handle send failures */
    private final KafkaMessageService kafkaMessageService;
    /** If we just send directly, we use KafkaProducer; AVRO is used for the most part */
    private final KafkaProducer<String, SpecificRecordBase> kafkaProducer;
    /** But there are places where JSON is used instead of AVRO */
    private final KafkaProducer<String, String> kafkaJsonProducer;
    private final ApplicationProperties applicationProperties;

    public void sendUserInfoMessage(User user, Set<Integer> managerIds) {
        log.info("Sending logic goes here");
        log.info("Usually it starts with assembling the model to be sent.");
        log.info("But there may also be more complex logic.");
        var record = UserInfoRecord.newBuilder().build();
        log.info("This can be either a direct send to Kafka");
        log.info("or an implementation of the outbox pattern.");
        kafkaMessageService.persistMessageToSend(applicationProperties.userTopic(), record);
    }
}

The outbox pattern may be in use, in which case before sending we simply put the message that needs to be sent into the database. Or we store only those messages that could not be sent for some reason. Or we do not store them in the database at all but send them somewhere else. The essence does not change: we assemble the message and hand it either to the database or directly to Kafka via KafkaProducer. We do not want every integration test to re-check the direct-send logic or the message-persistence logic — that is covered in our Kafka starter, and in the end services we trust that it has been tested and works correctly. But I very much want to check the logic BEFORE the send: how the model is assembled, whether we set the fields correctly, whether we forgot to add some required data to the model, whether we correctly call the calculator that derives the user's height from the number of letters in their name.
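For illustration, here is a minimal sketch of what the outbox-style persist call might look like behind the scenes. The article only relies on the persistMessageToSend signature shown above; OutboxMessageRepository and OutboxMessage are hypothetical names:

import org.apache.avro.specific.SpecificRecordBase;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class KafkaMessageService {

    // Hypothetical JPA repository over an outbox table
    private final OutboxMessageRepository outboxRepository;

    public KafkaMessageService(OutboxMessageRepository outboxRepository) {
        this.outboxRepository = outboxRepository;
    }

    /** Store the message in the outbox table; a scheduler would pick it up and send it later. */
    @Transactional
    public void persistMessageToSend(String topic, SpecificRecordBase record) {
        // A real implementation would serialize the record properly (e.g. Avro bytes)
        outboxRepository.save(new OutboxMessage(topic, record.toString()));
    }
}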

Bottom line: the logic in consumers and producers calls for tests just like the rest of the code.

Part 1.5. Intermediate options

For example, use a shared Kafka test environment, i.e. connect to it directly from the tests. In my opinion, depending on an external service to run your tests is a bad option.

This also covers the option of spinning Kafka up (in a container, say) directly on the build machine (a GitLab Runner in our case). That is already better than the previous paragraph, but such a setup looks hard to maintain.

Part 2. “Just test the logic and ignore the logs”

Yes, the first stage was to test just the logic, even though Kafka connection errors are constantly written to the console while the Spring context is starting:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: INFO  o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Node -1 disconnected.
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: WARN  o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Connection to node -1 (localhost/127.0.0.1:3333) could not be established. Broker may not be available.
[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] requestId: WARN  o.apache.kafka.clients.NetworkClient - [Consumer clientId=Consumer-0, groupId=app-group] Bootstrap broker localhost:3333 (id: -1 rack: null) disconnected

Perhaps this is a perfectly workable option, and these connection errors will not interfere with the tests during the build. But if an application has 5–10 consumers, there is a significant amount of such logging (it can easily be more than half of the total log volume in the tests), and in general such a setup can affect test performance.

Part 3. “Just mock it and that’s it”

The second piece of advice I heard or read was simply to mock the consumers and/or producers. Like this (say we use some BaseTest to configure the context for integration tests):

@SpringJUnitConfig
@SpringBootTest
@EnableConfigurationProperties(ApplicationProperties.class)
public abstract class BaseTest {

    @MockBean(name = "kafkaProducer")
    protected KafkaProducer kafkaProducer;
    @MockBean
    protected UserConsumer userConsumer;
}

This definitely helps avoid the Kafka connection errors, because in this case neither consumers nor producers get configured in the context. They are simply mocked.

And by the way, for the producer this is arguably the best option! By mocking KafkaProducer — the bean that actually sends messages to Kafka — we keep the producer class bean in the context and can easily write a test for it:

class UserProducerTest extends BaseTest {

    @Autowired
    private UserProducer userProducer;

    @Test
    void testUserProducer() {
        log.info("testing the logic");
        userProducer.produce();
    }
}

And how do we write tests for the consumer in this case? Well, for example, like this:

class UserConsumerTest extends BaseTest {

    private final UserConsumer userConsumer = new UserConsumer(<all the dependencies>);

    @Test
    void testUserConsumer() {
        log.info("testing the logic");
        userConsumer.consume();
    }
}

Remember that UserConsumer may have many dependencies on other services and classes? All of them have to be either prepared right here or taken from the context, with only the consumer constructed by hand. Either way it amounts to manual wiring of the class.

Bottom line: mocking the producer is fine; mocking the consumer is questionable, but okay — you just have to initialize the consumers manually (whereas on application startup Spring does it for you, so the tests run in a context, yet under different startup conditions).
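For completeness, a minimal sketch of that manual wiring with Mockito, assuming the dependency list of the UserConsumer shown earlier (buildMessage() is a hypothetical test-data helper):

import static org.mockito.Mockito.mock;

import java.util.List;

import org.junit.jupiter.api.Test;

class UserConsumerManualTest {

    // Every collaborator is mocked (or would have to be fetched from the context) by hand
    private final UserService userService = mock(UserService.class);
    private final NotUserService notUserService = mock(NotUserService.class);
    private final UserHandler userHandler = mock(UserHandler.class);
    private final UserMapper userMapper = mock(UserMapper.class);

    private final UserConsumer userConsumer = new UserConsumer(
            userService, notUserService, userHandler, userMapper, List.of("code1", "code2"));

    @Test
    void testConsume() {
        userConsumer.consume(buildMessage());
    }

    private UserInfoRecord buildMessage() {
        // Same builder as in UserProducer above
        return UserInfoRecord.newBuilder().build();
    }
}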

Part 4. “Just take EmbeddedKafka, that’s what it’s made for!”

Yes, there is another option — raising EmbeddedKafka for the tests. It is easy to configure: we put an annotation on the test:

@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })

And add a dependency to Maven (in my case):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
</dependency>

What are the advantages? With literally one line of configuration we get a simple embedded Kafka in the tests, which lets us check the full flow — for example, verify that the consumer receives exactly what the producer sent.
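A minimal round-trip sketch of that flow (topic name, serializer types and the empty configuration are illustrative assumptions; the producer and consumer here are built from spring-kafka-test utilities rather than the application beans):

import static org.junit.jupiter.api.Assertions.assertEquals;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.EmbeddedKafkaBroker;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

@SpringJUnitConfig(EmbeddedKafkaRoundTripTest.Config.class)
@EmbeddedKafka(partitions = 1, topics = {"user-topic"})
class EmbeddedKafkaRoundTripTest {

    @Configuration
    static class Config { }

    @Autowired
    private EmbeddedKafkaBroker embeddedKafka;

    @Test
    void producedMessageComesBackToConsumer() {
        // Send a message through the embedded broker
        var template = new KafkaTemplate<>(
                new DefaultKafkaProducerFactory<Integer, String>(KafkaTestUtils.producerProps(embeddedKafka)));
        template.send("user-topic", "payload");

        // Read it back with a throwaway consumer
        var consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafka);
        try (var consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps).createConsumer()) {
            embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "user-topic");
            assertEquals("payload", KafkaTestUtils.getSingleRecord(consumer, "user-topic").value());
        }
    }
}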

More about this option and the next one is written, for example, here: https://www.baeldung.com/spring-boot-kafka-testing

I will just name the downside I found: the spring-kafka-test dependency transitively pulls in Scala dependencies:

  • jackson-module-scala_2.13
  • scala-collection-compat_2.13
  • scala-java8-compat_2.13
  • scala-library
  • scala-logging_2.13
  • scala-reflect

One could say, “well, okay, it is only in the tests anyway.” But these libraries affect code execution! For example, I got this error:

ClassCastException: class scala.collection.immutable.$colon$colon cannot be cast to class java.util.List (scala.collection.immutable.$colon$colon is in unnamed module of loader 'app'; java.util.List is in module java.base of loader 'bootstrap')

Of course, we did not use this class — or the scala package in general — in our code. But we do use QueryDSL, and during code generation a substitution occurs. Perhaps it could easily be fixed by fine-tuning the Maven plugin, but I did not want to dig into that.
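One untested workaround sketch would be to exclude the Scala artifacts from the test dependency explicitly (group ids as published on Maven Central; whether QueryDSL generation then behaves correctly is something I have not verified):

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
    <exclusions>
        <exclusion>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.13</artifactId>
        </exclusion>
        <!-- ...and likewise for the remaining scala_2.13 artifacts -->
    </exclusions>
</dependency>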

Part 5. “Well, then testcontainers will help you, run Kafka on it!”

It really will. Recent versions of Spring Boot integrate well with Testcontainers, which makes it very easy to configure containers for tests.

Oh yes, at the beginning I casually mentioned that we use AVRO to describe the structure of messages sent through Kafka. That means we use Schema Registry (SR), so the tests also need to solve the problem of interacting with SR. Again there are several options: mock it with WireMock, use a shared test SR, or spin up a separate one on the GitLab Runner or elsewhere. I tried two of them. Emulating the real SR through WireMock is cumbersome: you have to add stubs for new schemas and keep updating the current ones. At some point we simply set up interaction with the company's centralized test SR. The difficulty there is that the schema must be registered in the SR before the tests run, while generally speaking the tests run BEFORE the projects are released to the test environment.
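As an illustration of the WireMock route, the stub server's address can be fed to the application in the same dynamic-property style used for Kafka below; schema-registry-url is an assumed property name, and the actual schema stubs — the cumbersome part — are left out here:

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;

public interface SchemaRegistryMockTest {

    // WireMock on a random free port, pretending to be the Schema Registry
    WireMockServer SCHEMA_REGISTRY_MOCK = new WireMockServer(WireMockConfiguration.options().dynamicPort());

    @DynamicPropertySource
    static void schemaRegistryProperties(DynamicPropertyRegistry registry) {
        if (!SCHEMA_REGISTRY_MOCK.isRunning()) {
            SCHEMA_REGISTRY_MOCK.start();
        }
        registry.add("schema-registry-url", SCHEMA_REGISTRY_MOCK::baseUrl);
    }
}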

To make it easier to plug Kafka-on-Testcontainers into different services, I created a separate test library with the following dependencies:

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <scope>compile</scope>
    </dependency>
    <!-- Just our library with the models and the AVRO dependency -->
    <dependency>
        <groupId>ru.alfastrah</groupId>
        <artifactId>avro-models</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.wiremock</groupId>
        <artifactId>wiremock-standalone</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.testcontainers</groupId>
        <artifactId>kafka</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-testcontainers</artifactId>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>compile</scope>
    </dependency>

    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-web</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

And the main class in the library looks like this:

@ImportTestcontainers
public interface KafkaBaseTest {

    Logger log = org.slf4j.LoggerFactory.getLogger(KafkaBaseTest.class);
    /** The Kafka container */
    KafkaContainer KAFKA_CONTAINER = new KafkaContainer(KAFKA_IMAGE_NAME);

    /** Dynamic configuration of the Kafka connection properties */
    @DynamicPropertySource
    static void kafkaProperties(DynamicPropertyRegistry registry) {
        if (!KAFKA_CONTAINER.isRunning()) {
            KAFKA_CONTAINER.start();
            var servers = KAFKA_CONTAINER.getBootstrapServers();
            log.info("Kafka container started at {} (the address is put into the bootstrap-servers property)", servers);
            registry.add("bootstrap-servers", () -> servers);
        }
    }
}

All this lets the end service enable Kafka on Testcontainers simply by implementing KafkaBaseTest:

@SpringJUnitConfig
@SpringBootTest
@EnableConfigurationProperties(value = ApplicationProperties.class)
public abstract class BaseTest implements KafkaBaseTest {
}

Cons:

  • the build became longer, since the Kafka container now has to start;
  • we are not checking exactly what we wanted in the first place — not the consumer or producer logic, but the configuration of the interaction with Kafka. That is also important, of course, but still;
  • the test code became quite cumbersome, for example (see the listener-helper sketch after this listing):
class UserProducerTest extends BaseTest {

    @Autowired
    private KafkaSettings kafkaSettings;
    @Autowired
    private ApplicationProperties appProperties;
    @Autowired
    private UserProducer userProducer;
    @Autowired
    private KafkaListenerContainerFactory kafkaListenerContainerFactory;
    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @Test
    void kafkaProducerTest() {
        var counter = new AtomicInteger(0);
        new KafkaListenerCreator<SomeEvent>(kafkaSettings.groupId(), kafkaListenerContainerFactory, kafkaListenerEndpointRegistry)
                .createAndRegisterListener(appProperties.topicName(), "KafkaProducerServiceTestListener", (record) -> {
                    log.info("Message received: {}", record);
                    if ("UserProducerTest".equals(record.getTestData())) {
                        counter.getAndIncrement();
                    }
                });
        AtomicInteger iteration = new AtomicInteger(0);
        Arrays.stream(EventType.values()).forEach(eventType -> {
            var record = createNewRecord(iteration.getAndIncrement());
            userProducer.send(record);
        });
        await().atMost(Duration.ofSeconds(6L)).untilAsserted(() -> {
            assertEquals(EventType.values().length, iteration.get());
            assertEquals(2, counter.get());
        });
    }
}
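KafkaListenerCreator here is our own helper. As a rough idea of what such a helper does, below is a hedged sketch that registers and starts a throwaway listener container for a topic (using a ConsumerFactory instead of the factory/registry pair from the test above; names are illustrative):

import java.util.function.Consumer;

import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

public class ThrowawayListenerFactory<T> {

    private final ConsumerFactory<String, T> consumerFactory;

    public ThrowawayListenerFactory(ConsumerFactory<String, T> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    /** Creates, starts and returns a listener container that feeds every record value to the handler. */
    public KafkaMessageListenerContainer<String, T> listen(String topic, String groupId, Consumer<T> handler) {
        var props = new ContainerProperties(topic);
        props.setGroupId(groupId);
        props.setMessageListener((MessageListener<String, T>) record -> handler.accept(record.value()));
        var container = new KafkaMessageListenerContainer<>(consumerFactory, props);
        container.start();
        return container;
    }
}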

Part 6. If none of the above, then what?

Having collected some difficulties with the option above, I still had not closed the question of testing the logic. I dove once more into the accumulated experience of mankind and found an entertaining thing there, namely the autoStartup parameter of the @KafkaListener annotation (for some reason I had not looked at it right away).

And then we forget almost everything written above and write the following:

// In the tests we mock the beans that produce messages to Kafka
public abstract class BaseTest {

    @MockBean(name = "kafkaProducer")
    protected KafkaProducer kafkaProducer;
    @MockBean(name = "kafkaJsonProducer")
    protected KafkaProducer kafkaJsonProducer;
}

@Slf4j
@Service
@RequiredArgsConstructor
public class UserConsumer {

    private final UserService userService;
    private final NotUserService notUserService;
    private final UserHandler userHandler;
    private final UserMapper userMapper;
    private final List<String> userCodes;

    @KafkaListener(
            // enabled by default, with the option to override it via a property
            autoStartup = "${auto-startup:true}",
            groupId = "userConsumerGroupId",
            clientIdPrefix = "UserConsumer",
            topics = {"user-topic"},
            containerFactory = "kafkaListenerContainerFactory")
    public void consume(UserInfoRecord userInfoRecord) {
        log.info("All sorts of processing logic here");
        log.info("Saving to the database?");
        log.info("Working with external services?");
        log.info("Calling mappers?");
        log.info("Sending information to Kafka?");
    }
}

# in the test application.yml we disable auto startup:
auto-startup: false

What we get as a result: the consumer bean is in the context, but during initialization the consumer does not try to connect to Kafka. At the same time we keep validation at the level of “the containerFactory is configured incorrectly” — such an error will still surface at startup or in the tests.
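And if some individual test does need a live listener after all (say, combined with the Testcontainers setup from Part 5), the container can be started by hand through KafkaListenerEndpointRegistry. A minimal sketch, assuming an id is added to the @KafkaListener annotation (e.g. id = "userConsumer"):

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;

class StartListenerManuallyExample {

    @Autowired
    private KafkaListenerEndpointRegistry registry;

    void startUserConsumer() {
        // Look the container up by the listener id and start it despite auto-startup: false
        var container = registry.getListenerContainer("userConsumer");
        if (container != null && !container.isRunning()) {
            container.start();
        }
    }
}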

And then the consumer test will look like this:

class UserConsumerTest extends BaseTest {

    // Just take the bean from the context and test the consume method
    @Autowired
    private UserConsumer userConsumer;

    @Test
    @DataSet(value = "userConsumer_initial.yml")
    @ExpectedDataSet(value = "userConsumer_expected.yml")
    void testConsume() {
        userConsumer.consume(buildMessage());
    }
}

And here is an example of a producer test:

class UserProducerTest extends BaseTest {

    @Autowired
    private UserProducer userProducer;
    @Autowired
    private ApplicationProperties appProperties;

    @Test
    @DataSet(value = {"UserProducerTest.yml"})
    void testUserProducer() {
        getUsers().forEach(user -> userProducer.sendInfo(user));

        // kafkaMessageService is assumed to be mocked in BaseTest (like the producers),
        // so we can verify the assembled record field by field
        verify(kafkaMessageService, times(4)).persistMessageToSend(eq(appProperties.topicName()), argThat((arg) -> {
            var record = (UserRecord) arg;
            assertEquals(expectedField1, record.getId());
            assertEquals(expectedField2, record.getStatus());
            assertEquals(expectedField3, record.getUid());
            return record.getExpectedSomething() == null;
        }));
    }
}

For now I have settled on this option as the easiest to describe and use; at the same time it lets us check exactly what we wanted in the first place — the logic inside producers and consumers.