Kafka

Apache Kafka is a messaging system built to scale for big data. Similar to Apache ActiveMQ or RabbitMQ, Kafka enables applications built on different platforms to communicate via asynchronous message passing. But Kafka differs from these more traditional messaging systems in key ways:

  • It’s designed to scale horizontally, by adding more commodity servers.
  • It provides much higher throughput for both producer and consumer processes.
  • It can be used to support both batch and real-time use cases.
  • It doesn’t support JMS, Java’s message-oriented middleware API.

Website

  • Apache Kafka

    Apache Kafka: A Distributed Streaming Platform.

    Source: kafka.apache.org/

  • Confluent: Streaming Platform | Apache Kafka for the Enterprise

    Confluent, founded by the creators of Apache Kafka, delivers a complete execution of Kafka for the Enterprise, to help you run your business in real time.

    Source: www.confluent.io/

  • Kafka Connect | Confluent

    Kafka Connect integrates Apache Kafka with other systems and makes it easy to add new systems to your scalable and secure stream data pipelines.

    Source: www.confluent.io/product/connectors/

Getting Started

  1. Visit the Kafka download page to install the most recent version (0.9 as of this writing).
  2. Extract the binaries into a software/kafka folder. For the current version it’s software/kafka_2.11-0.9.0.0.
  3. Change your current directory to point to the new folder.
  4. Start the Zookeeper server by executing the command: bin/zookeeper-server-start.sh config/zookeeper.properties.
  5. Start the Kafka server by executing: bin/kafka-server-start.sh config/server.properties.
  6. Create a test topic that you can use for testing: bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic javaworld.
  7. Start a simple console consumer that can consume messages published to a given topic, such as javaworld: bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic javaworld --from-beginning.
  8. Start up a simple producer console that can publish messages to the test topic: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic javaworld.
  9. Try typing one or two messages into the producer console. Your messages should show in the consumer console.
  • On Windows, use bin\windows\ instead of bin/ and change the script extension to .bat.

Kafka’s architecture

Basic terminology:

  • A producer is a process that can publish a message to a topic.
  • A consumer is a process that can subscribe to one or more topics and consume messages published to those topics.
  • A topic is a category or feed name to which messages are published.
  • A broker is a process running on a single machine.
  • A cluster is a group of brokers working together.

Figure 1. Architecture of a Kafka message system

Kafka’s architecture is very simple, which can result in better performance and throughput in some systems. Every topic in Kafka is like a simple log file. When a producer publishes a message, the Kafka server appends it to the end of the log file for its given topic. The server also assigns an offset, a number used to permanently identify each message. As the number of messages grows, the value of each offset increases; for example, if the producer publishes three messages, the first one might get an offset of 1, the second an offset of 2, and the third an offset of 3.

When the Kafka consumer first starts, it will send a pull request to the server, asking to retrieve any messages for a particular topic with an offset value higher than 0. The server will check the log file for that topic and return the three new messages. The consumer will process the messages, then send a request for messages with an offset higher than 3, and so on.

In Kafka, the client is responsible for remembering the offset count and retrieving messages. The Kafka server doesn’t track or manage message consumption. By default, a Kafka server will keep a message for seven days (configurable via log.retention.hours=168 in server.properties). A background thread in the server checks and deletes messages that are seven days or older. A consumer can access messages as long as they are on the server. It can read a message multiple times, and even read messages in reverse order of receipt. But if the consumer fails to retrieve the message before the seven days are up, it will miss that message.

 

Use Cases

Kafka’s architecture can be extended to integrate with data sources and data ingestion platforms. Kafka Connect is a set of certified connectors provided by Confluent that extends Kafka to communicate with databases and messaging platforms from various vendors.

Confluent Platform

Kafka Connect is a framework included in Apache Kafka that integrates Kafka with other systems. Its purpose is to make it easy to add new systems to your scalable and secure stream data pipelines.

To copy data between Kafka and another system, users instantiate Kafka Connectors for the systems they want to pull data from or push data to. Source Connectors import data from another system (e.g. a relational database into Kafka) and Sink Connectors export data (e.g. the contents of a Kafka topic to an HDFS file).

 

Creating the message producer

We configure the producer by creating an object from the java.util.Properties class and setting its properties. The ProducerConfig class defines all the available properties, but Kafka’s default values are sufficient for most uses. For the default config we only need to set three mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG
  • KEY_SERIALIZER_CLASS_CONFIG
  • VALUE_SERIALIZER_CLASS_CONFIG

BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers) sets a list of host:port pairs used for establishing the initial connections to the Kafka cluster, in the host1:port1,host2:port2,... format. Even if we have more than one broker in our Kafka cluster, we only need to specify the first broker’s host:port. The Kafka client will use this value to make a discovery call on the broker, which will return a list of all the brokers in the cluster. It’s a good idea to specify more than one broker in BOOTSTRAP_SERVERS_CONFIG, so that if the first broker is down the client can try the others.

The Kafka server expects messages in byte[] key, byte[] value format. Rather than converting every key and value, Kafka’s client-side library permits us to use friendlier types like String and int for sending messages. The library will convert these to the appropriate type. For example, the sample app doesn’t have a message-specific key, so we’ll use null for the key. For the value we’ll use a String, which is the data entered by the user on the console.

To configure the message key, we set KEY_SERIALIZER_CLASS_CONFIG to org.apache.kafka.common.serialization.ByteArraySerializer. This works because null doesn’t need to be converted into byte[]. For the message value, we set VALUE_SERIALIZER_CLASS_CONFIG to org.apache.kafka.common.serialization.StringSerializer, because that class knows how to convert a String into a byte[].
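
As a rough sketch of how these settings come together (the broker address and the javaworld topic from the console exercise are illustrative assumptions; the linked example below is the full version):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class SimpleProducer {
        public static void main(String[] args) {
            // The three mandatory producer properties described above
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<byte[], String> producer = new KafkaProducer<>(props);
            // The sample app has no message-specific key, so the key is null
            producer.send(new ProducerRecord<>("javaworld", null, "Hello Kafka"));
            producer.close();
        }
    }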

See Kafka Producer And Consumer Example

 

Configuring the message consumer

Whenever a new message is published to the topic, the consumer will read that message and print it to the console. The consumer code is quite similar to the producer code. We start by creating a java.util.Properties object, setting its consumer-specific properties, and then using it to create a new KafkaConsumer object. The ConsumerConfig class defines all the properties that we can set. There are just four mandatory properties:

  • BOOTSTRAP_SERVERS_CONFIG (bootstrap.servers)
  • KEY_DESERIALIZER_CLASS_CONFIG (key.deserializer)
  • VALUE_DESERIALIZER_CLASS_CONFIG (value.deserializer)
  • GROUP_ID_CONFIG (group.id)

Just as we did for the producer class, we’ll use BOOTSTRAP_SERVERS_CONFIG to configure the host/port pairs for the consumer class. This config lets us establish the initial connections to the Kafka cluster in the host1:port1,host2:port2,... format.

As I previously noted, the Kafka server expects messages in byte[] key and byte[] value formats, and has its own implementation for serializing different types into byte[]. Just as we did with the producer, on the consumer side we’ll have to use a deserializer to convert byte[] back into the appropriate type.

In the case of the example application, we know the producer is using ByteArraySerializer for the key and StringSerializer for the value. On the client side we therefore need to use org.apache.kafka.common.serialization.ByteArrayDeserializer for the key and org.apache.kafka.common.serialization.StringDeserializer for the value. Setting those classes as values for KEY_DESERIALIZER_CLASS_CONFIG and VALUE_DESERIALIZER_CLASS_CONFIG will enable the consumer to deserialize byte[] encoded types sent by the producer.
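
Combining these settings, a bare-bones consumer for the example topic might look like the sketch below. As with the producer sketch, the broker address, group id, and topic name are illustrative assumptions:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class SimpleConsumer {
        public static void main(String[] args) {
            // The four mandatory consumer properties listed above
            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "javaworld-group");

            KafkaConsumer<byte[], String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("javaworld"));

            // Poll for new messages and print each value to the console
            while (true) {
                ConsumerRecords<byte[], String> records = consumer.poll(100);
                for (ConsumerRecord<byte[], String> record : records) {
                    System.out.println(record.value());
                }
            }
        }
    }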

See: Kafka Producer And Consumer Example

 

 

Partitions in Kafka

Topics in Kafka can be subdivided into partitions. For example, while creating a topic named Demo, you might configure it to have three partitions. The server would create three log files, one for each of the Demo partitions. When a producer published a message to the topic, the producer would assign a partition ID to that message. The server would then append the message to the log file for that partition only.

If you then started two consumers, the server might assign partitions 1 and 2 to the first consumer, and partition 3 to the second consumer. Each consumer would read only from its assigned partitions. You can see the Demo topic configured for three partitions in Figure 2.

Figure 2. A partitioned topic in Apache Kafka

To expand the scenario, imagine a Kafka cluster with two brokers, housed on two machines. When you partitioned the Demo topic, you would configure it to have two partitions and two replicas. For this type of configuration, the Kafka server would assign the two partitions to the two brokers in your cluster. Each broker would be the leader for one of the partitions.

When a producer published a message, it would go to the partition leader. The leader would take the message and append it to the log file on the local machine. The second broker would passively replicate that commit log to its own machine. If the partition leader went down, the second broker would become the new leader and start serving client requests. In the same way, when a consumer sent a request to a partition, that request would go first to the partition leader, which would return the requested messages.

Benefits of partitioning

Consider the benefits of partitioning a Kafka-based messaging system:

  1. Scalability: In a system with just one partition, messages published to a topic are stored in a log file, which exists on a single machine. The number of messages for a topic must fit into a single commit log file, and the size of messages stored can never be more than that machine’s disk space. Partitioning a topic lets you scale your system by storing messages on different machines in a cluster. If you wanted to store 30 gigabytes (GB) of messages for the Demo topic, for instance, you could build a Kafka cluster of three machines, each with 10 GB of disk space. Then you would configure the topic to have three partitions.
  2. Server-load balancing: Having multiple partitions lets you spread message requests across brokers. For example, if you had a topic that processed 1 million messages per second, you could divide it into 100 partitions and add 100 brokers to your cluster. Each broker would be the leader for a single partition, responsible for responding to just 10,000 client requests per second.
  3. Consumer-load balancing: Similar to server-load balancing, hosting multiple consumers on different machines lets you spread the consumer load. Let’s say you wanted to consume 1 million messages per second from a topic with 100 partitions. You could create 100 consumers and run them in parallel. The Kafka server would assign one partition to each of the consumers, and each consumer would process 10,000 messages per second in parallel. Since Kafka assigns each partition to only one consumer, within the partition each message would be consumed in order.

 

Two ways to partition

The producer is responsible for deciding what partition a message will go to. The producer has two options for controlling this assignment:

  • Custom partitioner: You can create a class implementing the org.apache.kafka.clients.producer.Partitioner interface. This custom Partitioner will implement the business logic to decide where messages are sent. See: Partitioning in Kafka Example
  • DefaultPartitioner: If you don’t create a custom partitioner class, then by default the org.apache.kafka.clients.producer.internals.DefaultPartitioner class will be used. The default partitioner is good enough for most cases, providing three options (see the sketch after this list):
    1. Manual: When you create a ProducerRecord, use the overloaded constructor new ProducerRecord(topicName, partitionId, messageKey, message) to specify a partition ID.
    2. Hashing (locality sensitive): When you create a ProducerRecord, specify a messageKey by calling new ProducerRecord(topicName, messageKey, message). DefaultPartitioner will use the hash of the key to ensure that all messages for the same key go to the same partition. This is the easiest and most common approach.
    3. Spraying (random load balancing): If you don’t want to control which partition messages go to, simply call new ProducerRecord(topicName, message) to create your ProducerRecord. In this case the partitioner will send messages to all the partitions in round-robin fashion, ensuring a balanced server load.
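
The three options map to three ProducerRecord constructors, roughly as in this sketch. The topic name, key, and payload values here are hypothetical:

    // Manual: explicitly route this record to partition 0 of the Demo topic
    ProducerRecord<String, String> manual =
            new ProducerRecord<>("Demo", 0, "order-42", "message payload");

    // Hashing: the hash of the key decides the partition, so every message
    // with the key "order-42" lands in the same partition
    ProducerRecord<String, String> hashed =
            new ProducerRecord<>("Demo", "order-42", "message payload");

    // Spraying: no key, so the DefaultPartitioner spreads records
    // across partitions in round-robin fashion
    ProducerRecord<String, String> sprayed =
            new ProducerRecord<>("Demo", "message payload");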

Assigning partitions to consumers

The Kafka server guarantees that a partition is assigned to only one consumer, thereby guaranteeing the order of message consumption. You can manually assign a partition or have it assigned automatically.

If your business logic demands more control, then you’ll need to manually assign partitions. In this case you would use KafkaConsumer.assign(<listOfPartitions>) to pass the list of partitions each consumer is interested in to the Kafka server.
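
A minimal sketch of manual assignment, assuming a consumer like the one configured earlier and the illustrative Demo topic:

    import java.util.Arrays;
    import org.apache.kafka.common.TopicPartition;

    // Manually claim partitions 0 and 1 of the Demo topic for this consumer;
    // Kafka will not rebalance manually assigned partitions
    consumer.assign(Arrays.asList(
            new TopicPartition("Demo", 0),
            new TopicPartition("Demo", 1)));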

Having partitions assigned automatically is the default and most common choice. In this case, the Kafka server will assign a partition to each consumer, and will reassign partitions to scale for new consumers.

Say you’re creating a new topic with three partitions. When you start the first consumer for the new topic, Kafka will assign all three partitions to the same consumer. If you then start a second consumer, Kafka will reassign all the partitions, assigning one partition to the first consumer and the remaining two partitions to the second consumer. If you add a third consumer, Kafka will reassign the partitions again, so that each consumer is assigned a single partition. Finally, if you start a fourth and fifth consumer, then three of the consumers will have an assigned partition, but the other two won’t receive any messages. If one of the initial three consumers goes down, Kafka will use the same partitioning logic to reassign that consumer’s partition to one of the additional consumers.

Being able to partition a single topic into multiple parts is essential to Kafka’s scalability. Partitioning lets you scale your messaging infrastructure horizontally while also maintaining order within each partition. Next we’ll look at how Kafka uses message offsets to track and manage complex messaging scenarios.

 

Managing message offsets

Whenever a producer publishes a message, the Kafka server assigns an offset to that message. A consumer is able to control which messages it wants to consume by setting or resetting the message offset. When developing a consumer you have two options for managing the offset: automatic and manual.

Two types of offset

When you start a consumer in the Kafka client, it will read the value of your ConsumerConfig.AUTO_OFFSET_RESET_CONFIG (auto.offset.reset) configuration. If that config is set to earliest then the consumer will start with the smallest offset available for the topic. In its first request to Kafka, the consumer will say: give me all the messages in this partition with an offset greater than the smallest one available. It will also specify a batch size. The Kafka server will return all the matching messages in batches of the specified size.

The consumer keeps track of the offset of the last message it has processed, so it will always request messages with an offset higher than the last offset. This setup works when a consumer is functioning normally, but what happens if the consumer crashes, or you want to stop it for maintenance? In this case you would want the consumer to remember the offset of the last message processed, so that it can start with the first unprocessed message.

To make this possible, Kafka uses two types of offset: The current offset is used to track messages consumed while the consumer is working normally. The committed offset also tracks the last processed message, but the consumer sends that information to the Kafka server for persistent storage.

If the consumer goes down or is taken down for some reason, it can query the Kafka server for the last committed offset and resume message consumption as if no time has been lost. For its part, the Kafka broker stores this information in a topic called __consumer_offsets. This data is replicated to multiple brokers so that the broker won’t ever lose the offsets.

Committing offset data

You have a choice about how often to commit offset data. Committing frequently costs some performance, but if the consumer goes down you will have fewer messages to reprocess. Your other option is to commit less frequently (for better performance), at the cost of reprocessing more messages in case of failure. In either case the consumer has two options for committing the offset:

  1. Auto commits: You can set enable.auto.commit to true and set the auto.commit.interval.ms property with a value in milliseconds. Once you’ve enabled this, the Kafka consumer will commit the offset of the last message received in response to its poll() call. The commit is issued in the background at the configured auto.commit.interval.ms.
  2. Manual commits: You can call a commitSync() or commitAsync() method anytime on the KafkaConsumer. When you issue the call, the consumer will take the offset of the last message received during a poll() and commit that to the Kafka server.
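
A rough sketch of the manual approach, assuming a consumer configured with enable.auto.commit=false and a hypothetical process() method for the application logic:

    ConsumerRecords<byte[], String> records = consumer.poll(100);
    for (ConsumerRecord<byte[], String> record : records) {
        process(record); // hypothetical application logic
    }
    // Synchronously commit the offsets of the messages returned by the last poll()
    consumer.commitSync();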

Three use cases for manual offsets

Let’s consider three use cases where you wouldn’t want to use Kafka’s default offset management infrastructure. Instead, you’ll manually decide what message to start from.

  1. Start from the beginning: In this use case, you are capturing database changes in Kafka. The first record was the full record; thereafter you only get columns whose value has changed (delta of changes). In this case you always need to read all the messages in a topic from the beginning, in order to construct the full state of the record. To solve a scenario like this, you can configure the consumer to read from the beginning by calling the kafkaConsumer.seekToBeginning(topicPartition) method. Remember that by default Kafka will delete messages more than seven days old, so you need to configure log.retention.hours to a higher value for this use case.
  2. Go to the end: Now let’s say you’re building a stock recommendation application by analyzing trades in realtime. The worst case happens and your consumer application goes down. In this case, you can use kafkaConsumer.seekToEnd(topicPartition) to configure the offset to ignore messages posted during the downtime. Instead, the consumer will begin processing trades that happen from the instant it restarts.
  3. Start at a given offset: Finally, say that you just released a new version of the producer in your production environment. After watching it produce a few messages, you realize that it is generating bad messages. You fix the producer and start it again. You don’t want your consumer to consume those bad messages, so you manually set the offset to the first good message produced, by calling kafkaConsumer.seek(topicPartition, startingOffset).
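
The three use cases map onto three seek calls on the 0.9 consumer API, roughly as below. The topic, partition number, and starting offset are hypothetical, and the consumer must already own the partition (via subscribe() or assign()) before seeking:

    TopicPartition partition = new TopicPartition("Demo", 0);

    // Use case 1: replay the topic from the first retained message
    consumer.seekToBeginning(partition);

    // Use case 2: skip anything published while the consumer was down
    consumer.seekToEnd(partition);

    // Use case 3: resume from a known-good offset
    consumer.seek(partition, 42L);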

 

Consumer groups in Kafka

Traditional messaging use cases can be divided into two main types: point-to-point and publish-subscribe. In a point-to-point scenario, one consumer consumes one message. When a message relays a bank transaction, only one consumer should respond by updating the bank account. In a publish-subscribe scenario, multiple consumers will consume a single message but respond differently to it. When a web server goes down, you want the alert to go to consumers programmed to respond in different ways.

Queue refers to a point-to-point scenario, where a message is consumed by only one consumer. Topic refers to a publish-subscribe scenario, where a message is consumed by every consumer. Kafka doesn’t define a separate API for the queue and topic use cases; instead, when you start your consumer you need to specify the ConsumerConfig.GROUP_ID_CONFIG property.

If you use the same GROUP_ID_CONFIG for more than one consumer, Kafka will assume that they are all part of a single group, and it will deliver messages to only one of the consumers. If you start the two consumers with separate group.ids, Kafka will assume that they are not related, so each consumer will get its own copy of the message.
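
A small sketch of the difference, using hypothetical group names. Two consumers that share a group id split the topic’s messages between them (queue semantics), while a consumer with a different group id receives its own copy of every message (publish-subscribe semantics):

    // Consumers A and B join the SAME group: each message goes to only one of them
    Properties consumerA = new Properties();
    consumerA.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");
    Properties consumerB = new Properties();
    consumerB.put(ConsumerConfig.GROUP_ID_CONFIG, "order-processors");

    // Consumer C uses a DIFFERENT group id: it gets its own copy of every message
    Properties consumerC = new Properties();
    consumerC.put(ConsumerConfig.GROUP_ID_CONFIG, "order-auditors");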

 

Conclusion

Early use cases for big data message systems called for batch processing, such as running a nightly ETL process or moving data from the RDBMS to a NoSQL datastore at regular intervals. In the past few years the demand for realtime processing has increased, especially for fraud detection and emergency response systems. Kafka was built for just these types of realtime scenarios.

Kafka is a great open source product but it does have some limitations; for instance you can’t query data from inside a topic before it reaches its destination, or replicate data across multiple geographically distributed clusters. You could combine MapR Streams (a commercial product) with the Kafka API for these and other more complex publish-subscribe scenarios.

 

References