Steps to read Kafka topic messages from Test-consumer in Karate Framework

Priyanka Brahmane
6 min read · Dec 18, 2020

Hello folks! I am back with my second blog post, this time on how to read messages from a Kafka topic with a test consumer in the Karate API automation framework.

On reading the blog title, what is the first thing that comes to your mind? Hmm... let me guess!

What is Apache Kafka ?

Apache Kafka was originally developed at LinkedIn and was open-sourced in early 2011. The platform is now maintained by the Apache Software Foundation and is written in Scala and Java.

Apache Kafka is a distributed streaming platform for building real-time data pipelines and real-time streaming applications.

In simple terms, it is a message bus optimized for high-throughput streams and replays of data, i.e. a publish-subscribe system that delivers persistent messages, in order within a partition, in a scalable way.

The robust message broker allows applications to continually process and re-process stream data.

Routing in this open-source platform is simple: a producer sends each message to a named topic, and an optional message key decides which partition of that topic the message is written to.

How does Apache Kafka work?

Kafka allows you to send messages between applications in distributed systems. The sender can send messages to Kafka, while the recipient gets messages from the stream published by Kafka.

Messages are grouped into topics, which are Kafka's primary abstraction. The sender (producer) sends messages to a specific topic. The recipient (consumer) receives all messages on a specific topic, possibly from many senders. Any message sent to a topic by any sender is delivered to every consumer group that is subscribed to that topic.

Message flow from producer to consumer via topic
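To make the topic idea concrete, here is a toy in-memory model in plain Java. It is only a sketch and does not use the Kafka client: the class `TopicLogSketch` and its methods are hypothetical names, and it assumes a single partition. It shows a topic as an append-only log in which every consumer group keeps its own read position, which is why several groups each see all messages and why a group can replay old ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of one single-partition topic: an append-only log plus one read offset
// per consumer group. This is NOT the Kafka client API; all names are hypothetical.
class TopicLogSketch {
    private final List<String> log = new ArrayList<>();
    private final Map<String, Integer> offsets = new HashMap<>();

    // Producer side: append a message to the end of the log.
    void send(String message) {
        log.add(message);
    }

    // Consumer side: return everything this group has not read yet, in order,
    // and advance the group's offset to the end of the log.
    List<String> poll(String groupId) {
        int from = offsets.getOrDefault(groupId, 0);
        List<String> batch = new ArrayList<>(log.subList(from, log.size()));
        offsets.put(groupId, log.size());
        return batch;
    }

    // Replay: move the group's offset back to the start of the log.
    void seekToBeginning(String groupId) {
        offsets.put(groupId, 0);
    }

    public static void main(String[] args) {
        TopicLogSketch topic = new TopicLogSketch();
        topic.send("order-1");
        topic.send("order-2");
        System.out.println(topic.poll("billing"));        // both messages
        System.out.println(topic.poll("test-consumer"));  // the same two messages again
        topic.send("order-3");
        System.out.println(topic.poll("billing"));        // only the new message
        topic.seekToBeginning("billing");
        System.out.println(topic.poll("billing"));        // all three messages, replayed
    }
}
```

A real broker adds partitions, replication and persistence on top of this, but the per-group offset is the part that matters for a test consumer: using its own group id lets it read a topic without disturbing the application's consumers.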

By now you should have a basic idea of Kafka and how it works.

How can we create a test consumer for reading Kafka topic messages in the Karate API automation framework?

To achieve this, follow the steps below:

Step 1 : Create a Kafka consumer Java class with default properties. The consumer starts listening to the topic as soon as it is created. This class contains the methods needed to read and process the Kafka records available in its internal queue, to close the consumer after use, and so on.

package kafka;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.regex.Pattern;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KarateKafkaConsumer implements Runnable {

    private static Logger logger = LoggerFactory.getLogger(KarateKafkaConsumer.class.getName());

    private KafkaConsumer<Object, Object> kafka;
    private CountDownLatch startupLatch = new CountDownLatch(1);
    private CountDownLatch shutdownLatch = new CountDownLatch(1);
    private boolean partitionsAssigned = false;

    private Pattern keyFilter;   // Java regular expression
    private String valueFilter;  // JsonPath expression

    private BlockingQueue<String> outputList = new LinkedBlockingQueue<>();

    public KarateKafkaConsumer(String kafkaTopic, Map<String, String> consumerProperties) {
        this(kafkaTopic, consumerProperties, null, null);
    }

    public KarateKafkaConsumer(String kafkaTopic) {
        this(kafkaTopic, null, null);
    }

    public KarateKafkaConsumer(
            String kafkaTopic, String keyFilterExpression, String valueFilterExpression) {
        Properties cp = getDefaultProperties();
        setKeyValueFilters(keyFilterExpression, valueFilterExpression);
        create(kafkaTopic, cp);
    }

    public KarateKafkaConsumer(
            String kafkaTopic,
            Map<String, String> consumerProperties,
            String keyFilterExpression,
            String valueFilterExpression) {
        setKeyValueFilters(keyFilterExpression, valueFilterExpression);
        Properties cp = new Properties();
        for (String key : consumerProperties.keySet()) {
            String value = consumerProperties.get(key);
            cp.setProperty(key, value);
        }
        create(kafkaTopic, cp);
    }

    // All constructors eventually call this ....
    private void create(String kafkaTopic, Properties cp) {
        // Create the consumer and subscribe to the topic
        kafka = new KafkaConsumer<Object, Object>(cp);
        kafka.subscribe(Collections.singleton(kafkaTopic));

        // Start the thread which will poll the topic for data.
        Thread t = new Thread(this);
        t.start();

        // Wait until partitions have been assigned to this consumer.
        // Once they have been assigned, the latch is released. Until then we wait ...
        logger.info("Waiting for consumer to be ready..");
        try {
            startupLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("consumer is ready");
    }

    /**
     * Sets the predicate to filter kafka records based on key or/and value
     *
     * @param keyFilterExpression Java regular expression pattern
     * @param valueFilterExpression <a href="https://github.com/json-path/JsonPath">JsonPath</a>
     *     expression
     */
    private void setKeyValueFilters(String keyFilterExpression, String valueFilterExpression) {
        if (!isEmpty(keyFilterExpression)) {
            this.keyFilter = Pattern.compile(keyFilterExpression);
        }
        if (!isEmpty(valueFilterExpression)) {
            this.valueFilter = valueFilterExpression;
        }
    }

    // NOTE: isEmpty() is called above but not defined in the original listing;
    // this local helper is an assumption that keeps the class self-contained.
    private static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    public static Properties getDefaultProperties() {
        // Consumer Configuration
        Properties cp = new Properties();
        cp.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        cp.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cp.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        cp.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
        cp.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Kept from the original; this key is not a known config of the Java KafkaConsumer
        cp.put("consumer.timeout.ms", "5000");
        return cp;
    }

    // NOTE: run() is required by Runnable but is not shown in the original listing.
    // This is a minimal sketch of the polling loop, assuming each record value is
    // queued as a string; the JsonPath value filter is not applied here.
    @Override
    public void run() {
        try {
            while (true) {
                ConsumerRecords<Object, Object> records = kafka.poll(Duration.ofMillis(500));
                signalWhenReady();
                for (ConsumerRecord<Object, Object> record : records) {
                    Object key = record.key();
                    // Skip records whose key does not match the key filter (if one is set)
                    if (keyFilter != null
                            && (key == null || !keyFilter.matcher(key.toString()).find())) {
                        continue;
                    }
                    outputList.put(String.valueOf(record.value()));
                }
            }
        } catch (WakeupException e) {
            // Expected: close() calls kafka.wakeup() to break out of poll()
            logger.info("consumer woken up for shutdown");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            kafka.close();
            shutdownLatch.countDown();
        }
    }

    public void close() {
        logger.info("asking consumer to shutdown ...");
        kafka.wakeup();
        try {
            shutdownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void signalWhenReady() {
        if (!partitionsAssigned) {
            logger.info("checking partition assignment");
            Set<TopicPartition> partitions = kafka.assignment();
            if (partitions.size() > 0) {
                partitionsAssigned = true;
                logger.info("partitions assigned to consumer ...");
                startupLatch.countDown();
            }
        }
    }

    /**
     * @return The next available kafka record in the Queue (head of the queue). If no record is
     *     available, then the call is blocked.
     * @throws InterruptedException - if interrupted while waiting
     */
    public synchronized String take() throws InterruptedException {
        logger.info("take() called");
        return outputList.take(); // wait if necessary for data to become available
    }

    /**
     * @param n The number of records to read
     * @return The next n kafka records from the head of the Queue, as a JSON array string. If
     *     fewer than n records are available, then the call is blocked.
     * @throws InterruptedException - if interrupted while waiting
     */
    public synchronized String take(int n) throws InterruptedException {
        logger.info("take(n) called");
        List<String> list = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            list.add(outputList.take()); // wait if necessary for data to become available
        }
        // We want to return a String that can be interpreted by Karate as a JSON
        String str = list.toString();
        return str;
    }
}
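The threading pattern in the class above can be easier to follow in isolation. The sketch below uses only the Java standard library and no Kafka at all: a background thread stands in for the polling loop, a `CountDownLatch` makes the constructor wait until that thread is ready, and a `BlockingQueue` hands records to the caller of `take()`. The class name `QueueHandoffSketch` and the hard-coded "records" are assumptions made for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-alone sketch of the threading pattern only; the "broker" is a hard-coded
// array of strings, not Kafka. All names here are hypothetical.
class QueueHandoffSketch implements Runnable {
    private final CountDownLatch startupLatch = new CountDownLatch(1);
    private final BlockingQueue<String> outputList = new LinkedBlockingQueue<>();
    private final String[] fakeRecords;

    QueueHandoffSketch(String... fakeRecords) {
        this.fakeRecords = fakeRecords;
        Thread t = new Thread(this);
        t.setDaemon(true);
        t.start();
        try {
            startupLatch.await(); // return only once the worker has signalled it is ready
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for startup", e);
        }
    }

    @Override
    public void run() {
        startupLatch.countDown(); // stands in for "partitions have been assigned"
        for (String record : fakeRecords) {
            outputList.add(record); // unbounded queue, so add() does not block
        }
    }

    // Blocks until the worker thread has queued a record, then returns it.
    String take() {
        try {
            return outputList.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a record", e);
        }
    }

    public static void main(String[] args) {
        QueueHandoffSketch consumer = new QueueHandoffSketch("{\"id\":1}", "{\"id\":2}");
        System.out.println(consumer.take());
        System.out.println(consumer.take());
    }
}
```

Because `take()` blocks, a test that asks for more records than the topic ever receives will hang. In a real test it may be worth using `BlockingQueue.poll(timeout, unit)` instead, so that a missing message fails the test rather than stalling it.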

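Step 2 : Since the test consumer reads the message stream that the producer writes to the topic, we create a JSON deserializer Java class on the consumer side to deserialize the messages obtained from the Kafka topic. (Deserialization is the process of converting serialized bytes back into a copy of the original object.) Note that the default properties in Step 1 configure StringDeserializer; because the class below takes a Class<T> constructor argument, it has to be passed to the KafkaConsumer constructor as an instance rather than configured by class name.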

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.util.Map;

public class JsonDeserializer<T> implements Deserializer<T> {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Class<T> theClass;

    public JsonDeserializer(Class<T> theClass) {
        this.theClass = theClass;
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {}

    @Override
    public T deserialize(String topic, byte[] data) {
        try {
            return this.objectMapper.readValue(data, theClass);
        } catch (IOException e) {
            throw new SerializationException(e);
        }
    }

    @Override
    public void close() {}
}

Step 3 : Create a feature file that calls the Kafka consumer class.

Feature: Kafka test

  Background:
    And def KafkaConsumer = Java.type('kafka.KarateKafkaConsumer')
    And def topic = 'your_kafka_topic_name'

  Scenario: Consume message from Kafka topic by Test-consumer
    # Get the default properties for the consumer
    And def props = KafkaConsumer.getDefaultProperties()
    And def kc = new KafkaConsumer(topic)
    # To read a single message from the topic
    And json kafka_output = kc.take()
    # To read a specified number of messages from the Kafka topic
    And json kafka_output = kc.take(4)
    # Don't forget to close the consumer before matching. Otherwise, if
    # the test fails, you will not be able to close the consumer
    And kc.close()
    # Get required key-value pairs from Kafka topic
    And def kafka_obj = karate.jsonPath(kafka_output, "$..['provider_code','provider_name','provider_count']")
    And match api_expected_pairs == '#(^*kafka_obj)'

We can also define a consumer with customized properties as follows :

And def props = { ... }
And def kc = new KafkaConsumer(topic, props)
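One thing to keep in mind: the constructor from Step 1 that accepts a properties map copies only the supplied entries into a fresh Properties object, so none of the defaults apply and the map has to carry everything the consumer needs (bootstrap servers, deserializers, group id). If you would rather override just a few settings, one possible variation, which is not what the class in Step 1 does, is to overlay the map on the defaults. The sketch below uses the standard Kafka consumer config names as plain strings; the class name `ConsumerPropsSketch` and the group name are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Sketch only: overlays caller-supplied settings on the defaults from Step 1.
// The keys are standard Kafka consumer config names; the override value is made up.
class ConsumerPropsSketch {

    static Properties defaults() {
        Properties cp = new Properties();
        cp.setProperty("bootstrap.servers", "127.0.0.1:9092");
        cp.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        cp.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        cp.setProperty("group.id", "test-consumer");
        cp.setProperty("auto.offset.reset", "earliest");
        return cp;
    }

    // Start from the defaults and let every supplied entry replace the default value.
    static Properties withOverrides(Map<String, String> overrides) {
        Properties cp = defaults();
        if (overrides != null) {
            for (Map.Entry<String, String> e : overrides.entrySet()) {
                cp.setProperty(e.getKey(), e.getValue());
            }
        }
        return cp;
    }

    public static void main(String[] args) {
        Map<String, String> overrides = new HashMap<>();
        overrides.put("group.id", "my-test-group"); // hypothetical group name
        Properties cp = withOverrides(overrides);
        System.out.println(cp.getProperty("group.id"));          // the override
        System.out.println(cp.getProperty("bootstrap.servers")); // still the default
    }
}
```

The null check also avoids the NullPointerException that iterating over a missing map would otherwise cause.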

Step 4 : Run the above feature file using a TestRunner class.

import org.junit.runner.RunWith;
import com.intuit.karate.KarateOptions;
import com.intuit.karate.junit4.Karate;

@RunWith(Karate.class)
@KarateOptions(features = {"classpath:KafkaTopicValidation.feature"}, tags = "~@ignore")
public class KafkaRunner {
}

Thanks for reading!

Was this blog useful to you? I'd be glad to hear your feedback, so don't hold back. If you are interested in automating your backend service APIs using the Karate framework, or have any questions about KarateDSL, Selenium or BDD frameworks, you can follow me on LinkedIn. Happy automation!
