Learning Kafka with Python – consuming data

We now understand how Kafka producers add data to partitions. So let us move on and take a look at consumers – how they operate, how they are configured and how different levels of reliability and delivery guarantees can be achieved.

Consumer groups

In the previous post on producers, we saw that the interaction between a producer and a Kafka broker is rather simple. Basically, producers request metadata to obtain data on partitions and partition leaders and then send records to the partition leader. The Kafka broker does not keep track of a producer's state, and producers can actually come and go without Kafka even noticing (this is a bit different when transactions are used, as in this case the broker needs to keep track of the producer's state as well, but this is beyond the scope of this post).

For consumers, the situation is different. The main reason for this different design is that while a producer itself determines to which partition data is written, a consumer typically lets Kafka make this decision. In this programming model, Kafka distributes the available partitions to the available consumers, trying to balance the load evenly. If a new consumer appears, Kafka will assign partitions to it, and if a consumer goes down, Kafka will re-assign these partitions to one of the remaining consumers. To make this work, Kafka needs to keep track of the state of a consumer (in fact, a consumer is expected to send periodic heartbeats so that Kafka can detect when a consumer goes down, and the state of a consumer group is maintained by a dedicated broker acting as group coordinator).

To better understand how this works, we first have to understand consumer groups. Logically, a consumer group is very similar to an application – it is a logical entity reading data from a topic. If, for instance, you are using Kafka to distribute instrument master data in a securities processing application, there will typically be different application components that need this data – say a trading frontend, a settlement module or a tax processing module. Each of these application components could be set up as a consumer group in Kafka, so that they all obtain records from the instrument master data topic independently, similar to the pub/sub semantics of a traditional messaging system.

To increase scalability and fault tolerance, there can be many consumers inside a consumer group, but Kafka will try to make sure that within a consumer group, every message is delivered to only one consumer.

[Diagram: consumer groups reading independently from a topic]

To achieve this, Kafka will assign the partitions of a topic to the consumers within a consumer group. To make sure that each message is processed only once within a consumer group, each partition is assigned to at most one consumer, but if there are more partitions than consumers, a single consumer can read from more than one partition (so scaling the number of consumers beyond the number of partitions will only lead to idle consumers).

[Diagram: assignment of partitions to consumers within consumer groups]

It is worth mentioning that this is not the only programming model supported by Kafka. Instead of letting Kafka determine the assignment of partitions to consumers, consumers can also assign themselves directly to specific partitions and thus define their own assignment. In this case, consumers need to implement their own mechanisms to detect a change in the number of partitions or to rebalance the load if a consumer goes down. This can, however, be useful if the number of partitions is constant and lost consumers are immediately replaced by some sort of restart mechanism. If you want to take a closer look at how exactly Kafka manages the assignment of partitions to consumers in a group, take a look at this excellent blog post on the Confluent web site or this page on the Confluent Wiki.

Maintaining the offset

The next problem we have to solve is the maintenance of the offset. A traditional messaging system typically makes sure that a message is delivered only once. With Kafka, this task is left to the consumer (this is why some people refer to this model as the “dumb broker – smart consumer” model). In fact, the low-level “Fetch” call of the Kafka protocol expects that a consumer specifies the offset of the record (batch) it wants to read. So the consumer needs to know which offsets it has already processed to make sure that all records are read and that no record is processed twice.

This is not an easy task and is subject to race conditions. Suppose, for instance, that we decide to store the offset in a separate database, and that our processing logic is (in hopefully readable pseudo-code) as follows.

# read the offset that was committed last from the database
offset = db.read_offset()
while True:
  record = read_record(offset)
  process(record)
  # if the consumer dies here, the record will be processed again after a restart
  offset = offset + 1
  db.store_offset(offset)

Now suppose that this consumer fails after processing the record, but before writing the updated offset into the database. When we now restart the consumer, it will read the old offset from the database and process the last record twice. If, conversely, we change the order and commit the new offset before processing the record, we would miss a record if the consumer dies between these two steps.

Instead of persisting the offset yourself, you can also ask Kafka to do this for you. When making use of this option, Kafka will store your offsets in a dedicated internal topic. A consumer can either explicitly commit offsets to this topic, or it can use auto-commit, which simply means that the consumer will automatically commit offsets every few seconds (which, of course, leads to duplicate processing if this interval is, say, five seconds and the consumer dies four seconds after the last commit). In a later post, we will look into transactional writes, which even allow exactly-once delivery as long as no other data stores are involved.

Creating and using a KafkaConsumer

Let us now see how we can create and use a consumer with the Python Kafka API and how the consumer is configured.

First, we need to create a consumer object. When creating a consumer, there are three parameters that we need to provide: the topic from which we want to read data, a list of bootstrap servers, and the ID of the consumer group that the consumer is part of (an arbitrary string, needed at least if we plan to use the automatic assignment of partitions and / or want Kafka to store offsets for us). So a code snippet creating a consumer could be as follows.

import kafka
consumer = kafka.KafkaConsumer("test",
         group_id="my_group",
         bootstrap_servers="broker1:9092")

Once we are done with a consumer, we should always clean up by calling consumer.close() so that the consumer can properly leave the group.

When using SSL to connect to the broker, you will again have to provide additional parameters when building the consumer, as we did for the producer.
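
A minimal sketch could look as follows – note that the port and the locations of the certificate and key files are just placeholders and need to be adapted to your setup.

import kafka

consumer = kafka.KafkaConsumer("test",
         group_id="my_group",
         bootstrap_servers="broker1:9093",    # placeholder for the SSL listener of your broker
         security_protocol="SSL",
         ssl_cafile="ca.crt",                 # CA certificate used to verify the broker
         ssl_certfile="client.crt",           # client certificate
         ssl_keyfile="client.key")            # private key belonging to the client certificate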

As for the producer, a consumer can also be configured with custom deserializers. If, for instance, we use JSON as a serialization format, as we have demonstrated in the previous post on building a producer, we now need to provide a matching deserializer that converts the byte stream back into the target format used by the application. As for the producer, deserializers for keys and payloads can be supplied using the additional configuration parameters key_deserializer and value_deserializer.
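
As an illustration, here is a sketch of a consumer using a JSON deserializer. It assumes that the producer serialized keys as plain UTF-8 strings and values as UTF-8 encoded JSON.

import json
import kafka

consumer = kafka.KafkaConsumer("test",
         group_id="my_group",
         bootstrap_servers="broker1:9092",
         # convert the raw bytes of the key back into a string (assumes every record has a key)
         key_deserializer=lambda key: key.decode("utf-8"),
         # parse the raw bytes of the value as JSON
         value_deserializer=lambda value: json.loads(value.decode("utf-8")))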

As mentioned above, one option to deal with offsets is to leave the bookkeeping to Kafka and to ask Kafka to automatically commit offsets for us. This is in fact the default behavior, and it is controlled by the following parameters (a short example follows the list).

  • enable_auto_commit – this is a boolean flag which tells Kafka whether we want to automatically commit offsets, and defaults to true
  • auto_commit_interval_ms – this specifies the interval at which Kafka will commit offsets. The default is five seconds, which implies that in the worst case, the messages processed during the last five seconds will be consumed twice if your consumer fails shortly before a commit
  • auto_offset_reset – this parameter determines at which offset Kafka should start processing if no valid committed offset can be found. This clearly happens when we start the consumer for the first time, but it can also happen if messages are deleted or lost. If we set this to “earliest”, Kafka will start processing at the first available offset. If we use “latest”, it will start processing at the end of the log, i.e. with the next message that is added to the log. The default is “latest”
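
Putting these parameters together, a consumer using auto-commit with a reduced commit interval and starting at the beginning of the log if no committed offsets are found could – as a sketch, reusing the connection parameters from the example above – be created like this.

import kafka

consumer = kafka.KafkaConsumer("test",
         group_id="my_group",
         bootstrap_servers="broker1:9092",
         enable_auto_commit=True,          # let the consumer commit offsets automatically
         auto_commit_interval_ms=1000,     # commit every second instead of every five seconds
         auto_offset_reset="earliest")     # start at the first available offset if no offset is found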

Before we can read any data, we have to subscribe to a topic. When creating the consumer, we already refer to a topic, and in fact, the consumer will automatically subscribe to this topic. It is also possible to subscribe manually. This is typically done when you want to add a rebalance listener to be informed about changes in the set of assigned partitions. A rebalance listener is any class derived from kafka.ConsumerRebalanceListener which is passed as an argument to the subscribe call. Whenever a partition assignment is made or revoked, Kafka will then call the corresponding method of the listener.

consumer.subscribe(TOPIC, 
    listener=MyConsumerRebalanceListener())

When an application wants to manually store offsets, for instance in a database, it can use this mechanism and / or the method consumer.assignment() to keep track of the partitions assigned to it. Note that, as explained in the source code comments of the listener class, Kafka will first invoke the on_partitions_revoked method of all listeners before calling any of the on_partitions_assigned methods. These handlers are invoked from the polling loop, i.e. only when you pass control to the consumer by reading data from it, not from a separate thread (we will learn more about the exact mechanics of this process in a future post).
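
As an illustration, a minimal listener could look like the sketch below – the print statements are of course just placeholders for real bookkeeping logic, for instance committing or persisting offsets in on_partitions_revoked.

import kafka

class MyConsumerRebalanceListener(kafka.ConsumerRebalanceListener):

  def on_partitions_revoked(self, revoked):
    # called from the polling loop before a rebalance takes effect
    print("Partitions revoked: %s" % revoked)

  def on_partitions_assigned(self, assigned):
    # called from the polling loop once the new assignment has been made
    print("Partitions assigned: %s" % assigned)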

Now let us see how we can actually read data from a topic. The library offers two options to do this. First, we can simply invoke the poll method of the consumer object, which will return a batch of records. Alternatively, and more “pythonic”, we can treat the consumer object as an iterator and simply loop over it to get one record at a time.

Note that some methods of the consumer can block while they are waiting for responses from the server. As consumers should in general make sure not to block outside of the polling loop, it is not advisable to call the consumer's methods from separate threads. My experience is that it can lead to problems if, for instance, a signal handler invokes methods of the consumer to shut it down. Instead, the handler should only set a stop flag, and all methods of the consumer object should be invoked from the polling loop.

stop = False

def signal_handler(signal, frame):
  # only set a flag - do not call any methods of the consumer here
  global stop
  stop = True

while not stop:
  for record in consumer:
    .....
    if stop:
      break
consumer.close()

Let us now discuss different options to commit offsets. We have already seen that the default is auto-commit, which implies that Kafka will commit automatically every 5 seconds. When using this option, we can guarantee that all messages will be read at least once, but need to be prepared to receive messages more than once. If we need full control over the process of committing offsets, we need to disable auto-commit by setting enable_auto_commit to false.

At this point, it is important to remember that the Kafka client requests data from the broker in batches. If we ask the client to commit the offsets, it will commit the entire batch. It therefore does not make sense to commit during every loop iteration of the pseudo-code above, but rather once at the end of each batch. As the iterator interface of the consumer object makes it difficult to determine when a batch has ended, it is easier to use the poll method of the consumer. This method returns a dictionary, where the keys are TopicPartition objects, i.e. named tuples describing a combination of topic and partition, and the values are lists of records.

When using manual commits, we again have several choices. First, we can commit after every record. In this way, we will have at most one duplicate in case of an error, but we create additional overhead and reduce our throughput significantly. Alternatively, we can use a batch size greater than one and commit after each batch. This is more efficient, but if the processing fails in the middle of a batch, we will re-read the first few records of the batch when we restart and thus process some records twice.
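
To make this more concrete, here is a sketch of a polling loop with one commit per batch. It assumes that the consumer has been created with enable_auto_commit=False and that process() and stop are the processing function and the stop flag from the snippets above.

while not stop:
  # poll returns a dictionary mapping TopicPartition named tuples to lists of records
  batch = consumer.poll(timeout_ms=1000)
  for tp, records in batch.items():
    for record in records:
      process(record)
  if batch:
    # commit the offsets of all records returned by the last poll
    consumer.commit()
consumer.close()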

Trying it out

Let us now see how this works in practice. If you have cloned my GitHub repository and installed Kafka as described in my previous post, you are ready to run some examples that are part of the repository and located in the python subdirectory. First let us delete and re-create the topic that we have already used for our producer tests by running the following commands on the lab PC (after changing to the repository root directory)

./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --topic test \
  --delete
./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --topic test \
  --create \
  --partitions 2 \
  --replication-factor 3

Next, we can create 10 messages in this topic by running the producer that we have already used in the previous post.

python3 python/producer.py --create_keys

We can now run our consumer to read the messages that we have just written. To do this, simply enter

python3 python/consumer.py 

Looking at the output, we see that the first attempt to poll triggers a partition reassignment. First, the coordinator will revoke the existing group assignments for all group members. Then it will assign the existing two partitions to our consumer, as this is the only consumer in the group, so that our listener is called. As this is the first read, there are no committed offsets yet, and as we have set auto_offset_reset to “earliest”, we start our read at position zero (the first offset).

We now start to read records from the log. In a second terminal window, we can inspect the currently stored offsets.

./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --group test-group \
  --describe

We should now see that Kafka has assigned our consumer to the two partitions and has recorded the updated offsets. As we have read every record once, the offsets should now be identical to the last read position. If you run this command quickly after starting the consumer, you should even be able to see that the automated commit only takes place after a couple of seconds.

When you now stop the consumer by hitting Ctrl-C and run it again, it will not print any new records, as it will restart at the committed offsets. To re-read our messages, we will have to reset the offsets. There are two ways to do this. You can either run

./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --group test-group \
  --topic test \
  --reset-offsets \
  --to-earliest \
  --execute 

or use our consumer, which has a switch --reset instructing it to only reset the offsets without reading any records. In both cases, we should now be ready for another test. This time, we disable auto-commit and use manual commits.

python3 python/consumer.py --disable_auto_commit

You should now see that the messages are processed once again, and that the offsets will again be committed, though this is triggered by our explicit calls to consumer.commit() this time.

Next, let us try what happens if we do not commit any offsets at all. Our test client supports this by setting the flag --no_commit

python3 python/consumer.py --reset
python3 python/consumer.py --no_commit
python3 python/consumer.py --no_commit

As expected, the second and third invocation both return the full set of data, as the offsets are never committed and the third invocation therefore starts at the same point at which the second invocation started.

Finally, it is instructive to see how several consumers interact. To see this, first reset all offsets again. Then open a second terminal, start the consumer in the first terminal and then start a second consumer in the second window. The output should show you that

  • Initially, the first consumer will start to process both partitions
  • When the second consumer is started, the partitions will be revoked, and the corresponding listener is called for both consumers
  • Then, each of the two partitions will be assigned to one of the two consumers

Both consumers should now wait for data on their respective partition. If you now run the producer again to generate ten additional messages, you should nicely see that both consumers receive messages for their respective partitions in parallel.

This completes our discussion of consumers for the time being. There are a couple of points that we have not yet explored (like the manual assignment of partitions, different options for timeouts, the heartbeat thread which periodically sends a heartbeat to the Kafka group coordinator, or consumers without consumer groups), but most of this is readily accessible in the Kafka documentation. In the next post, we will look at some patterns to read data from a Kafka topic and use it to maintain state in a relational database.
