Partitioning in Kafka Example

DefaultPartitioner is good enough for most cases for sending messages to each partition on a round robin basis to balance out the load. But if you want to control which partition your messages are sent to you need to implement a custom partitioner instead. For this example, let’s assume that we have a retail site that consumers can use to order products anywhere in the world. Based on usage, we know that most consumers are in either the United States or India. We want to partition our application to send orders from the US or India to their own respective consumers, while orders from anywhere else will go to a third consumer.

To start, we’ll create a CountryPartitioner that implements the org.apache.kafka.clients.producer.Partitioner interface. We must implement the following methods:

  1. Kafka will call configure() when we initialize the Partitioner class, with a Map of configuration properties. This method initializes functions specific to the application’s business logic, such as connecting to a database. In this case we want a fairly generic partitioner that takes countryName as a property. We can then use configProperties.put("partitions.0","USA")to map the flow of messages to partitions. In the future we can use this format to change which countries get their own partition.
  2. The Producer API calls partition() once for every message. In this case we’ll use it to read the message and parse the name of the country from the message. If the name of the country is in the countryToPartitionMap, it will return partitionId stored in the Map. If not, it will hash the value of the country and use it to calculate which partition it should go to.
  3. We call close() to shut down the partitioner. Using this method ensures that any resources acquired during initialization are cleaned up during shutdown.

Note that when Kafka calls configure(), the Kafka producer will pass all the properties that we’ve configured for the producer to the Partitioner class. It is essential that we read only those properties that start with partitions., parse them to get the partitionId, and store the ID in countryToPartitionMap.

Below is our custom implementation of the Partitioner interface.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountryPartitioner implements Partitioner {
    private static Map<String,Integer> countryToPartitionMap;

    // This method will gets called at the start, you should use it to do one time startup activity
    public void configure(Map<String, ?> configs) {
        System.out.println("Inside CountryPartitioner.configure " + configs);
        countryToPartitionMap = new HashMap<String, Integer>();
        for(Map.Entry<String,?> entry: configs.entrySet()){
            if(entry.getKey().startsWith("partitions.")){
                String keyName = entry.getKey();
                String value = (String)entry.getValue();
                System.out.println( keyName.substring(11));
                int paritionId = Integer.parseInt(keyName.substring(11));
                countryToPartitionMap.put(value,paritionId);
            }
        }
    }

    //This method will get called once for each message
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes,
                         Cluster cluster) {
        List partitions = cluster.availablePartitionsForTopic(topic);
        String valueStr = (String)value;
        String countryName = ((String) value).split(":")[0];
        if(countryToPartitionMap.containsKey(countryName)){
            //If the country is mapped to particular partition return it
            return countryToPartitionMap.get(countryName);
        }else {
            //If no country is mapped to particular partition distribute between remaining partitions
            int noOfPartitions = cluster.topics().size();
            return  value.hashCode()%noOfPartitions + countryToPartitionMap.size() ;
        }
    }

    // This method will get called at the end and gives your partitioner class chance to cleanup
    public void close() {}
}

 

The Producer class in Listing 2 (below) is very similar to our simple producer from Kafka Producer And Consumer Example, with two changes:

  1. We set a config property with a key equal to the value of ProducerConfig.PARTITIONER_CLASS_CONFIG, which matches the fully qualified name of our CountryPartitioner class. We also set countryNameto partitionId, thus mapping the properties that we want to pass to CountryPartitioner.
    configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName());
    configProperties.put("partition.1","USA");
    configProperties.put("partition.2","India");

     

  2. We pass an instance of a class implementing the org.apache.kafka.clients.producer.Callback interface as a second argument to the producer.send() method. The Kafka client will call its onCompletion() method once a message is successfully published, attaching a RecordMetadata object. We’ll be able to use this object to find out which partition a message was sent to, as well as the offset assigned to the published message.
                producer.send(rec, new Callback() {
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset());
    ;
                    }
                });

     

 

Below is our Producer with the custom Partitioner.

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.Scanner;

public class Producer {
    private static Scanner in;
    public static void main(String[] argv)throws Exception {
        if (argv.length != 1) {
            System.err.println("Please specify 1 parameters ");
            System.exit(-1);
        }
        String topicName = argv[0];
        in = new Scanner(System.in);
        System.out.println("Enter message(type exit to quit)");

        //Configure the Producer
        Properties configProperties = new Properties();
        configProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        configProperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.ByteArraySerializer");
        configProperties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");

        configProperties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,CountryPartitioner.class.getCanonicalName());
        configProperties.put("partitions.0","USA");
        configProperties.put("partitions.1","India");

        org.apache.kafka.clients.producer.Producer producer = new KafkaProducer(configProperties);
        String line = in.nextLine();
        while(!line.equals("exit")) {
            ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, line);

            producer.send(rec, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    System.out.println("Message sent to topic ->" + metadata.topic()+ " ,parition->" + metadata.partition() +" stored at offset->" + metadata.offset());
                }
            });
            line = in.nextLine();
        }
        in.close();
        producer.close();
    }
}

 

 

 

We’ll use automatic assignment for the example application. Most of our consumer code will be the same as it was for the simple consumer seen in Part 1. The only difference is that we’ll pass an instance of ConsumerRebalanceListener as a second argument to our KafkaConsumer.subscribe() method. Kafka will call methods of this class every time it either assigns or revokes a partition to this consumer. We’ll override ConsumerRebalanceListener‘s onPartitionsRevoked() and onPartitionsAssigned() methods and print the list of partitions that were assigned or revoked from this subscriber.

Below is our Consumer with the custom Partitioner.

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;
import java.util.Scanner;

public class Consumer {
    private static Scanner in;
    private static boolean stop = false;

    public static void main(String[] argv) throws Exception {
        if (argv.length != 2) {
            System.err.printf("Usage: %s <topicName> <groupId>\n",
                    Consumer.class.getSimpleName());
            System.exit(-1);
        }
        in = new Scanner(System.in);
        String topicName = argv[0];
        String groupId = argv[1];

        ConsumerThread consumerThread = new ConsumerThread(topicName, groupId);
        consumerThread.start();
        String line = "";
        while (!line.equals("exit")) {
            line = in.next();
        }
        consumerThread.getKafkaConsumer().wakeup();
        System.out.println("Stopping consumer .....");
        consumerThread.join();
    }

    private static class ConsumerThread extends Thread {
        private String topicName;
        private String groupId;
        private KafkaConsumer<String, String> kafkaConsumer;

        public ConsumerThread(String topicName, String groupId) {
            this.topicName = topicName;
            this.groupId = groupId;
        }

        public void run() {
            Properties configProperties = new Properties();
            configProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            configProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            configProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

            //Figure out where to start processing messages from
            kafkaConsumer = new KafkaConsumer<String, String>(configProperties);
            kafkaConsumer.subscribe(Arrays.asList(topicName), new ConsumerRebalanceListener() {
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                    System.out.printf("%s topic-partitions are revoked from this consumer\n", Arrays.toString(partitions.toArray()));
                }
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                    System.out.printf("%s topic-partitions are assigned to this consumer\n", Arrays.toString(partitions.toArray()));
                }
            });
            //Start processing messages
            try {
                while (true) {
                    ConsumerRecords<String, String> records = kafkaConsumer.poll(100);
                    for (ConsumerRecord<String, String> record : records)
                        System.out.println(record.value());
                }
            } catch (WakeupException ex) {
                System.out.println("Exception caught " + ex.getMessage());
            } finally {
                kafkaConsumer.close();
                System.out.println("After closing KafkaConsumer");
            }
        }

        public KafkaConsumer<String, String> getKafkaConsumer() {
            return this.kafkaConsumer;
        }
    }
}

Test the application

We’re ready to run and test the current iteration of our producer/consumer application. As you’ve done previously, you can use the code in Listings 1 through 3, or download the complete source code on GitHub.

  1. Compile and create a fat JAR by invoking: mvn compile assembly:single.
  2. Create a topic named part-demo with three partitions and one replication factor: <KAFKA_HOME>bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic part-demo
  3. Start a producer: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Producer part-demo
  4. Start three consumers, then watch the console to see how your partitions are assigned and revoked every time you start a new instance of the consumer: java -cp target/KafkaAPIClient-1.0-SNAPSHOT-jar-with-dependencies.jar com.spnotes.kafka.partition.Consumer part-demo group1
  5. Type some messages into your producer console and verify whether the messages are routed to the correct consumer: USA: First order India: First order USA: Second order France: First order

Figure 2 shows producer/consumer output in the partitioned topic.

Producer/consumer output
Figure 2. Producer/consumer output

Being able to partition a single topic into multiple parts is one essential to Kafka’s scalability. Partitioning lets you scale your messaging infrastructure horizontally while also maintaining order within each partition. Next we’ll look at how Kafka uses message offsets to track and manage complex messaging scenarios.

admin has written 51 articles