Kafka Producer And Consumer Example

A simple producer/consumer application

The Kafka producer will retrieve user input from the console and send each new line as a message to a Kafka server. The consumer will retrieve messages for a given topic and print them to the console. The producer and consumer components in this case are your own implementations of kafka-console-producer.sh and kafka-console-consumer.sh.

Let’s start by creating a Producer.java class. This client class contains logic to read user input from the console and send that input as a message to the Kafka server.

Import the dependencies from the maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.empeccable.kafka</groupId>
    <artifactId>KafkaAPIClient</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.9.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.12</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.spnotes.kafka.simple.Producer</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

 

The Kafka producer

After filling the Properties class with the necessary configuration properties, we can use it to create an object of KafkaProducer. Whenever we want to send a message to the Kafka server after that, we’ll create an object of ProducerRecordand call the KafkaProducer‘s send() method with that record to send the message. The ProducerRecord takes two parameters: the name of the topic to which message should be published, and the actual message. Don’t forget to call the Producer.close() method when you’re done using the producer:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Scanner;

public class Producer {
 private static Scanner in;
 public static void main(String[] argv)throws Exception {
 if (argv.length != 1) {
 System.err.println("Please specify 1 parameters ");
 System.exit(-1);
 }
 String topicName = argv[0];
 in = new Scanner(System.in);
 System.out.println("Enter message(type exit to quit)");

 //Configure the Producer
 Properties configProperties = new Properties();
 configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
 configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
 configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

 org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
 String line = in.nextLine();
 while(!line.equals("exit")) {
 //TODO: Make sure to use the ProducerRecord constructor that does not take parition Id
 ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName,line);
 producer.send(rec);
 line = in.nextLine();
 }
 in.close();
 producer.close();
 }
}

ConfiguringĀ theĀ messageĀ consumer

Next we’ll create a simple consumer that subscribes to a topic.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;
import java.util.Arrays;
import java.util.Properties;
import java.util.Scanner;

public class Consumer {
    private static Scanner in;

    public static void main(String[] argv)throws Exception{
        if (argv.length != 2) {
            System.err.printf("Usage: %s <topicName> <groupId>\n",
                    Consumer.class.getSimpleName());
            System.exit(-1);
        }
        in = new Scanner(System.in);
        String topicName = argv[0];
        String groupId = argv[1];

        ConsumerThread consumerRunnable = new ConsumerThread(topicName,groupId);
        consumerRunnable.start();
        String line = "";
        while (!line.equals("exit")) {
            line = in.next();
        }
        consumerRunnable.getKafkaConsumer().wakeup();
        System.out.println("Stopping consumer .....");
        consumerRunnable.join();
    }

    private static class ConsumerThread extends Thread{
        private String topicName;
        private String groupId;
        private KafkaConsumer<String,String> kafkaConsumer;

        public ConsumerThread(String topicName, String groupId){
            this.topicName = topicName;
            this.groupId = groupId;
        }
        public void run() {
            Properties configProperties = new Properties();
            configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "simple");

            //Figure out where to start processing messages from
            kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
            kafkaConsumer.subscribe(Arrays.asList(topicName));
            //Start processing messages
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records)
                        System.out.println(record.value());
                }
            }catch(WakeupException ex){
                System.out.println("Exception caught " + ex.getMessage());
            }finally{
                kafkaConsumer.close();
                System.out.println("After closing KafkaConsumer");
            }
        }
        public KafkaConsumer<String,String> getKafkaConsumer(){
           return this.kafkaConsumer;
        }
    }
}

 

Consumer and ConsumerThread

Writing the consumer code in Listing 2 in two parts ensures that we close the Consumer object before exiting. I’ll describe each class in turn. First, ConsumerThread is an inner class that takes a topic name and group name as its arguments. In the run() method it creates a KafkaConsumer object, with appropriate properties. It subscribes to the topic that was passed as an argument in the constructor, by calling the kafkaConsumer.subscribe() method, then polls the Kafka server every 100 milliseconds to check if there are any new messages in the topic. It will iterate through the list of any new messages and print them to the console.

In the Consumer class we create a new object of ConsumerThread and start it in a different thread. The ConsumerThead starts an infinite loop and keeps polling the topic for new messages. Meanwhile in the Consumer class, the main thread waits for a user to enter exit on the console. Once a user enters exit, it calls the KafkaConsumer.wakeup() method, causing the KafkaConsumer to stop polling for new messages and throw a WakeupException. We can then close the KafkaConsumer gracefully, by calling kafkaConsumer‘s close() method.

Run the application

To test this application you can run the code in Listings 1 and 2 from your IDE, or you can follow these steps:

  1. Download the sample code, KafkaAPIClient, by executing the command: git clone https://github.com/dawudr/KafkaAPIClient.git.
  2. Compile the code and create a fat JAR with the command: mvn clean compile assembly:single.
  3. Start the consumer: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer test group1.
  4. Start the producer: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Producer test.
  5. Enter a message in the producer console and check to see whether that message appears in the consumer. Try a few messages.
  6. Type exit in the consumer and producer consoles to close them.
Figure 2:
Figure 2. A Kafka producer/consumer application

 

admin has written 51 articles