How to Set Up Kafka Integration Test – Grape Up


Jul 28, 2022


Do you consider unit testing not a sufficient solution for keeping your application reliable and stable? Are you afraid that somehow, somewhere, a potential bug is hiding in the assumption that unit tests should cover all cases? And is mocking Kafka not enough for your project requirements? If the answer to even one of these questions is 'yes', then welcome to a nice and easy guide on how to set up integration tests for Kafka using TestContainers and Embedded Kafka for Spring!

What is TestContainers?

TestContainers is an open-source Java library specialized in providing everything needed for integration testing against external resources. It means that we are able to mimic an actual database, web server, or even an event bus environment and treat it as a reliable place to test application functionality. All these fancy features are hooked into docker images, defined as containers. Do we need to test the database layer with an actual MongoDB? No worries, there is a test container for that. Nor can we forget about UI tests – a Selenium Container will do anything we actually want.
In our case, we will focus on the Kafka Testcontainer.

What is Embedded Kafka?

As the name suggests, we are going to deal with an in-memory Kafka instance, ready to be used as a normal broker with full functionality. It allows us to work with producers and consumers, as usual, keeping our integration tests lightweight.

Before we start

The concept for our tests is simple – I would like to test the Kafka consumer and producer using two different approaches and check how we can use them in real cases.

Kafka messages are serialized using Avro schemas.

Embedded Kafka – Producer Test

The concept is easy – let's create a simple project with a controller, which invokes a service method to push a Kafka Avro serialized message.
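A minimal sketch of such a controller could look like the following. The endpoint path and the RegisterRequestDto type match the test shown later in the article; the class name and wiring are assumptions for illustration, not the original project's code:

```java
// Hypothetical controller: accepts a registration DTO and delegates to the producer service.
@RestController
public class RegisterRequestController {

    private final ProducerService producerService;

    public RegisterRequestController(ProducerService producerService) {
        this.producerService = producerService;
    }

    // The test later performs a POST to /register-request and expects HTTP 200
    @PostMapping("/register-request")
    public ResponseEntity<Void> register(@Valid @RequestBody RegisterRequestDto request) {
        producerService.register(request);
        return ResponseEntity.ok().build();
    }
}
```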


implementation "org.apache.avro:avro:1.10.1"
implementation 'org.springframework.boot:spring-boot-starter-validation'
implementation 'org.springframework.kafka:spring-kafka'

implementation 'org.projectlombok:lombok:1.18.16'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.kafka:spring-kafka-test'

It is also worth mentioning the great Gradle plugin for Avro. Here is the plugins section:

id 'org.springframework.boot' version '2.6.8'
id 'io.spring.dependency-management' version '1.0.11.RELEASE'
id 'java'
id "com.github.davidmc24.gradle.plugin.avro" version "1.3.0"

The Avro plugin supports schema auto-generation. This is a must-have.


Now let's define the Avro schema:

{
  "namespace": "com.grapeup.myawesome.myawesomeproducer",
  "type": "record",
  "name": "RegisterRequest",
  "fields": [
    {"name": "id", "type": "long"},
    {"name": "address", "type": "string", "avro.java.string": "String"}
  ]
}


Our ProducerService will be focused only on sending messages to Kafka using a template, nothing exciting about that part. The main functionality can be done just using this line:

ListenableFuture<SendResult<String, RegisterRequest>> future = this.kafkaTemplate.send("register-request", kafkaMessage);
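For context, a ProducerService built around that line might be sketched as below. The register method name and the DTO-to-Avro mapping are assumptions; only the kafkaTemplate.send call is taken from the article:

```java
// Hypothetical service: maps the DTO to the Avro-generated RegisterRequest and sends it.
@Service
public class ProducerService {

    private final KafkaTemplate<String, RegisterRequest> kafkaTemplate;

    public ProducerService(KafkaTemplate<String, RegisterRequest> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void register(RegisterRequestDto dto) {
        // Build the Avro record from the incoming DTO (assumed mapping)
        RegisterRequest kafkaMessage = RegisterRequest.newBuilder()
                .setId(dto.getId())
                .setAddress(dto.getAddress())
                .build();
        ListenableFuture<SendResult<String, RegisterRequest>> future =
                this.kafkaTemplate.send("register-request", kafkaMessage);
    }
}
```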

We can't forget about the test properties:

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    consumer:
      group-id: group_id
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroDeserializer
      properties:
        specific.avro.reader: true
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: com.grapeup.myawesome.myawesomeconsumer.common.CustomKafkaAvroSerializer
      properties:
        auto.register.schemas: true

As we see in the mentioned test properties, we declare a custom deserializer/serializer for Kafka messages. It is highly recommended to use Kafka with Avro – don't let JSONs maintain object structure, let's use a civilized mapper and object definition like Avro.


public class CustomKafkaAvroSerializer extends KafkaAvroSerializer {
    public CustomKafkaAvroSerializer() {
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroSerializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

public class CustomKafkaAvroDeserializer extends KafkaAvroDeserializer {
    public CustomKafkaAvroDeserializer() {
        super.schemaRegistry = new MockSchemaRegistryClient();
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client) {
        super(new MockSchemaRegistryClient());
    }

    public CustomKafkaAvroDeserializer(SchemaRegistryClient client, Map<String, ?> props) {
        super(new MockSchemaRegistryClient(), props);
    }
}

And we have everything we need to start writing our test.

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@EmbeddedKafka(partitions = 1, topics = "register-request")
class ProducerControllerTest {

All we need to do is add the @EmbeddedKafka annotation with the mentioned topics and partitions. The Application Context will boot a Kafka broker with the provided configuration just like that. Keep in mind that @TestInstance should be used with special consideration. Lifecycle.PER_CLASS will avoid creating the same objects/context for every test method. Worth checking if tests become too time-consuming.

Consumer<String, RegisterRequest> consumerServiceTest;

@BeforeEach
void setUp() {
    DefaultKafkaConsumerFactory<String, RegisterRequest> consumer =
            new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties());

    consumerServiceTest = consumer.createConsumer();
    consumerServiceTest.subscribe(Collections.singletonList(TOPIC_NAME));
}

Here we can declare the test consumer, based on the Avro schema return type. All Kafka properties are already provided in the .yml file. That consumer will be used to check whether the producer actually pushed a message.

Here is the actual test method:

@Test
void whenValidInput_thenReturns200() throws Exception {
        RegisterRequestDto request = RegisterRequestDto.builder()
                .id(12L)
                .address("tempAddress")
                .build();

        mockMvc.perform(
                post("/register-request")
                        .contentType(MediaType.APPLICATION_JSON)
                        .content(objectMapper.writeValueAsString(request)))
                .andExpect(status().isOk());

        ConsumerRecord<String, RegisterRequest> consumedRegisterRequest =
                KafkaTestUtils.getSingleRecord(consumerServiceTest, TOPIC_NAME);

        RegisterRequest valueReceived = consumedRegisterRequest.value();

        assertEquals(12, valueReceived.getId());
        assertEquals("tempAddress", valueReceived.getAddress());
}

First of all, we use MockMvc to perform an action on our endpoint. That endpoint uses ProducerService to push messages to Kafka. The KafkaConsumer is used to verify that the producer worked as expected. And that's it – we have a fully working test with embedded Kafka.

Test Containers – Consumer Test

TestContainers are nothing else than independent docker images ready to be run. The following test case will be enhanced by a MongoDB image. Why not keep our data in the database right after anything happened in the Kafka flow?

Dependencies are not much different than in the previous example. The following steps are needed for test containers:

testImplementation 'org.testcontainers:junit-jupiter'
testImplementation 'org.testcontainers:kafka'
testImplementation 'org.testcontainers:mongodb'

ext {
    set('testcontainersVersion', "1.17.1")
}

dependencyManagement {
    imports {
        mavenBom "org.testcontainers:testcontainers-bom:${testcontainersVersion}"
    }
}

Let's focus now on the Consumer part. The test case will be simple – one consumer service will be responsible for receiving the Kafka message and storing the parsed payload in the MongoDB collection. All that we need to know about KafkaListeners, for now, is this annotation:

@KafkaListener(topics = "register-request")

By the functionality of the annotation processor, KafkaListenerContainerFactory will be responsible for creating a listener on our method. From this moment our method will react to any upcoming Kafka message with the mentioned topic.
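To make the flow concrete, here is a rough sketch of such a listener persisting to MongoDB. ConsumerService, RegisterRequestDocument, and the repository are invented names for illustration; the article does not show this class, and a real implementation may map the payload differently:

```java
// Hypothetical consumer: receives the Avro message and stores a mapped document in MongoDB.
@Service
public class ConsumerService {

    private final RegisterRequestRepository repository;

    public ConsumerService(RegisterRequestRepository repository) {
        this.repository = repository;
    }

    @KafkaListener(topics = "register-request")
    public void listen(ConsumerRecord<String, RegisterRequest> record) {
        RegisterRequest payload = record.value();
        // Map the Avro payload to a document and persist it for later verification
        repository.save(new RegisterRequestDocument(payload.getId(), payload.getAddress()));
    }
}
```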

Avro serializer and deserializer configs are the same as in the previous test.

Regarding the TestContainer, we should start with the following annotations:

@SpringBootTest
public class AbstractIntegrationTest {

During startup, all configured TestContainers modules will be activated. It means that we will get access to the full operating environment of the selected source. As an example:

@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

public static KafkaContainer kafkaContainer = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"));

static MongoDBContainer mongoDBContainer = new MongoDBContainer("mongo:4.4.2").withExposedPorts(27017);

As a result of booting the test, we can expect two docker containers to start with the provided configuration.

What is really important for the mongo container – it gives us full access to the database using just a simple connection uri. With such a feature, we are able to take a look at the current state of our collections, even during debug mode at prepared breakpoints.
Take a look also at the Ryuk container – it works like an overwatch and checks if our containers have started correctly.

And here is the last part of the configuration:

@DynamicPropertySource
static void dataSourceProperties(DynamicPropertyRegistry registry) {
   registry.add("spring.kafka.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.consumer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.kafka.producer.bootstrap-servers", kafkaContainer::getBootstrapServers);
   registry.add("spring.data.mongodb.uri", mongoDBContainer::getReplicaSetUrl);

   kafkaContainer.start();
   mongoDBContainer.start();

   mongoDBContainer.waitingFor(Wait.forListeningPort());
}

@BeforeTestClass
public void beforeTest() {
    kafkaListenerEndpointRegistry.getListenerContainers().forEach(
            messageListenerContainer -> ContainerTestUtils
                    .waitForAssignment(messageListenerContainer, 1));
}

@AfterAll
static void tearDown() {
    kafkaContainer.stop();
    mongoDBContainer.stop();
}

DynamicPropertySource gives us the option to set all needed environment variables during the test lifecycle. Strongly needed for any config purposes for TestContainers. Also, in beforeTest the kafkaListenerEndpointRegistry waits for every listener to get the expected partitions during container startup.

And the last part of the Kafka test containers journey – the main body of the test:

@Test
public void containerStartsAndPublicPortIsAvailable() throws Exception {
   writeToTopic("register-request", RegisterRequest.newBuilder().setId(123).setAddress("dummyAddress").build());

   //Wait for KafkaListener
   TimeUnit.SECONDS.sleep(5);
   Assertions.assertEquals(1, taxiRepository.findAll().size());
}

private KafkaProducer<String, RegisterRequest> createProducer() {
   return new KafkaProducer<>(kafkaProperties.buildProducerProperties());
}

private void writeToTopic(String topicName, RegisterRequest... registerRequests) {
   try (KafkaProducer<String, RegisterRequest> producer = createProducer()) {
       Arrays.stream(registerRequests)
               .forEach(registerRequest -> {
                   ProducerRecord<String, RegisterRequest> record =
                           new ProducerRecord<>(topicName, registerRequest);
                   producer.send(record);
               });
   }
}

The custom producer is responsible for writing our message to the KafkaBroker. Also, it is recommended to give consumers some time to handle messages properly. As we see, the message was not only consumed by the listener, but also stored in the MongoDB collection.


As we can see, current solutions for integration tests are quite easy to implement and maintain in projects. There is no point in keeping just unit tests and counting on all lines covered as a sign of code/logic quality. Now the question is, should we use an Embedded solution or TestContainers? I suggest first of all focusing on the word "Embedded". As a perfect integration test, we want to get an almost ideal copy of the production environment with all properties/features included. In-memory solutions are good, but mostly not enough for large business projects. Definitely, the advantage of Embedded services is the easy way to implement such tests and maintain configuration, as everything happens in memory.
TestContainers at first sight may look like overkill, but they give us the most important feature, which is a separate environment. We don't even have to rely on existing docker images – if we want, we can use custom ones. This is a huge improvement for potential test scenarios.
What about Jenkins? There is no reason to be afraid of using TestContainers in Jenkins either. I firmly recommend checking the TestContainers documentation on how easily we can set up the configuration for Jenkins agents.
To sum up – if there is no blocker or any unwanted condition for using TestContainers, then don't hesitate. It is always good to keep all services managed and secured with integration test contracts.

