Manual offsets in Kafka Consumers Example

The consumer code in Kafka Producer And Consumer Example so far auto-commits offsets every 5 seconds. Now let’s update the consumer to take a third argument that manually controls where offset consumption starts.

If you pass 0 as the last argument, the consumer assumes you want to start from the beginning, so it calls kafkaConsumer.seekToBeginning() for each of its assigned partitions. If you pass -1, it assumes you want to ignore the existing messages and consume only messages published after the consumer restarts; in this case it calls kafkaConsumer.seekToEnd() on each of the partitions. If you pass -2, the consumer leaves the offsets alone, resuming from the last committed offset and committing manually after each poll. Finally, any other value is treated as the offset the consumer should start from; for example, if you pass 5 as the third argument, then on restart the consumer consumes messages starting at offset 5. For this it calls kafkaConsumer.seek(<topicPartition>, <startingOffset>).

Below is the Consumer, updated with the third argument that drives the offset decision.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.util.*;

public class Consumer {
    private static Scanner in;

    public static void main(String[] argv)throws Exception{
        if (argv.length != 3) {
            System.err.printf("Usage: %s <topicName> <groupId> <startingOffset>\n",
                    Consumer.class.getSimpleName());
            System.exit(-1);
        }
        in = new Scanner(System.in);

        String topicName = argv[0];
        String groupId = argv[1];
        final long startingOffset = Long.parseLong(argv[2]);

        ConsumerThread consumerThread = new ConsumerThread(topicName,groupId,startingOffset);
        consumerThread.start();
        String line = "";
        while (!line.equals("exit")) {
            line = in.next();
        }
        consumerThread.getKafkaConsumer().wakeup();
        System.out.println("Stopping consumer .....");
        consumerThread.join();

    }

    private static class ConsumerThread extends Thread{
        private String topicName;
        private String groupId;
        private long startingOffset;
        private KafkaConsumer<String,String> kafkaConsumer;

        public ConsumerThread(String topicName, String groupId, long startingOffset){
            this.topicName = topicName;
            this.groupId = groupId;
            this.startingOffset=startingOffset;
        }
        public void run() {
            Properties configProperties = new Properties();
            configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
            configProperties.put(ConsumerConfig.CLIENT_ID_CONFIG, "offset123");
            configProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
            configProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");

            //Figure out where to start processing messages from
            kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
            kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
                }
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
                    Iterator<TopicPartition> topicPartitionIterator = partitions.iterator();
                    while(topicPartitionIterator.hasNext()){
                        TopicPartition topicPartition = topicPartitionIterator.next();
                        System.out.println("Current offset is " + kafkaConsumer.position(topicPartition) + " committed offset is ->" + kafkaConsumer.committed(topicPartition) );
                        if (startingOffset == -2) {
                            // Leave the position at the last committed offset
                            System.out.println("Leaving it alone");
                        } else if (startingOffset == 0) {
                            System.out.println("Setting offset to beginning");
                            kafkaConsumer.seekToBeginning(topicPartition);
                        } else if (startingOffset == -1) {
                            System.out.println("Setting it to the end");
                            kafkaConsumer.seekToEnd(topicPartition);
                        } else {
                            System.out.println("Resetting offset to " + startingOffset);
                            kafkaConsumer.seek(topicPartition, startingOffset);
                        }
                    }
                }
            });
            //Start processing messages
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                    if(startingOffset == -2)
                        kafkaConsumer.commitSync();
                }
            }catch(WakeupException ex){
                System.out.println("Exception caught " + ex.getMessage());
            }finally{
                kafkaConsumer.close();
                System.out.println("After closing KafkaConsumer");
            }
        }
        public KafkaConsumer<String,String> getKafkaConsumer(){
            return this.kafkaConsumer;
        }
    }
}
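The consumer above either commits everything it has polled (the -2 case) or relies on seeking. If you want finer-grained control, the client also lets you commit an explicit offset per partition via commitSync(Map). Below is a minimal sketch, assuming the same <String, String> consumer as above; the class and method names are hypothetical, not part of the example:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.Collections;
import java.util.List;

public class OffsetCommitSketch {
    // Commit, for every partition in the batch, the offset of the next record to read.
    static void commitProcessedOffsets(KafkaConsumer<String, String> consumer,
                                       ConsumerRecords<String, String> records) {
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            // Kafka interprets the committed offset as the position to resume from,
            // hence the +1: committing the last processed offset itself would
            // replay that record after a restart.
            consumer.commitSync(Collections.singletonMap(
                    partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
}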


Below is the Producer used for publishing messages; it takes the topic name as its only argument.

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.Scanner;

public class Producer {
    private static Scanner in;
    public static void main(String[] argv)throws Exception {
        if (argv.length != 1) {
            System.err.printf("Usage: %s <topicName>\n",
                    Producer.class.getSimpleName());
            System.exit(-1);
        }
        String topicName = argv[0];
        in = new Scanner(System.in);
        System.out.println("Enter message(type exit to quit)");

        //Configure the Producer
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<>(configProperties);
        String line = in.nextLine();
        while(!line.equals("exit")) {
            ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, null, line);
            producer.send(rec, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("Message sent to topic ->" + metadata.topic() +" stored at offset->" + metadata.offset());
                }
            });
            line = in.nextLine();
        }
        in.close();
        producer.close();
    }
}
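The callback above reports metadata asynchronously. If you would rather block until the broker acknowledges each write, send() returns a Future you can wait on. A minimal sketch, assuming the same localhost broker and the part-demo topic from the test command below:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class BlockingSendSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // send() returns a Future<RecordMetadata>; get() blocks until the broker responds.
            RecordMetadata metadata = producer.send(
                    new ProducerRecord<>("part-demo", null, "hello")).get();
            System.out.println("Stored at offset " + metadata.offset());
        }
    }
}

Blocking on every send is simpler to reason about, but it trades away the batching throughput that the asynchronous callback style gives you.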


Once your code is ready you can test it by executing the following command:

java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.offset.Consumer part-demo group1 0

The Kafka client should print all the messages from offset 0 onward, or you can change the value of the last argument to jump around in the topic.
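For example, to skip everything already in the topic and consume only new messages, pass -1; to replay from a specific offset, pass that offset. Both commands below assume the same jar, topic, and consumer class as above:

java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.offset.Consumer part-demo group1 -1

java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.offset.Consumer part-demo group1 5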


The above Consumer takes groupId as its second parameter. Now we’ll use the groupId parameter to implement both queue and topic use cases for the consumer.

  1. Create a topic named group-test with two partitions: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic group-test
  2. Start a producer for publishing messages to the group-test topic you just created: java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer group-test
  3. Start three consumers that listen for messages published to the group-test topic. Use group1 for the value of your group id. This will give you three consumers in group1: java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group1
  4. Start a fourth consumer, but this time change the value of the group id to group2. This will give you three consumers in group1 and a single consumer in group2: java -cp target/KafkaDemo-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.simple.Consumer group-test group2
  5. Return to the producer console and start typing messages. Every new message you publish should appear once in the group2 consumer window and once in one of the three group1 consumer windows, as shown in Figure 3.
[Figure 3: Consumer group output]
