Learning Kafka with Python – a deep dive into consumers and rebalancing

In the previous posts, we have already used the Python client to implement Kafka consumers. Today, we will take a closer look at the components that make up a consumer and discuss their inner workings and how they communicate with the Kafka cluster.

High level overview of the consumer

Our discussion will be based on the Kafka Python library, which seems to be loosely modeled after the Java consumer which is part of the official Apache Kafka project, so that the underlying principles are the same. These notes are based on version 2.0.1 of the library, the design might of course change in future versions (and has already changed substantially in the past).

Looking at the code, we see that roughly speaking, the consumer consists of three parts – the actual consumer in the package kafka.consumer, the coordinator which is responsible for talking to the group coordinator and assign partitions in the package kafka.coordinator and the network client in the top-level package which is used by other parts of the library as well. Broken down to the level of modules and classes, the following diagram shows the most important components of the consumer and their relations.

KafkaConsumerComponents

Let us start our discussion with the class on the left hand side of the diagram, the subscription state. This class is used to manage the topics and partitions a consumer has subscribed to as well as the positions of the consumer within these partitions. Note that these positions are not the committed offsets, but are the positions maintained locally (and in-memory) by the consumer that are used to determine the offset that the next fetch will use. Initially, there is no valid position for a newly assigned partition, and the partition is considered fetchable only once a position has been determined.

The second class which is used by the consumer is the fetcher. As the name suggests, this class is in charge for actually fetching data and offsets from the leader of a partition (here, offsets does not refer to committed offsets, but to the valid offsets, i.e. the first and last offset of a partition).

Fetching records from the partition leader typically works asynchronously. As an example, let us consider the method send_fetches. As indicated above, a partition is called fetchable if there is a valid position for it, the partition has not been paused and there are no unfetched records already present in the cache. After creating a list of all fetchable partitions, the send_fetches method then figures out the partition leader and assembles a fetch request. These requests are then sent to the respective partition leader using the client object. This operation returns a future, i.e. a handle which can be used to asynchronously track the progress of the fetch operation. Attached to this future, there is a callback operation. When the records are sent from the partition leader to the consumer, the client object will invoke this callback which will then add the returned records to a queue maintained by the fetcher. From there, it is retrieved when a consumer calls the method fetched_records.

It is in this function where the positions are actually updated, so that the position really reflects the records that have been consumed, not those which have been received by the fetcher but are still in the queue. Note that records are skipped if a partition has become unfetchable in the meantime or if the offset does not match the expected value in the original request.

The following diagram shows a simplified view of how records are fetched (some important details are skipped, for instance the deserialization that takes place when fetched records are removed from the queue and handed over to the consumer).

FetchingRecords

Coordinating group membership and partition assignments

Apart from fetching records, a core responsibility of the consumer is to manage the membership in a consumer group and to handle assigned partitions. This is done by the coordinator. The coordinator communicates with the group coordinator (which is one dedicated broker per consumer group) to trigger the addition and removal of group members and to balance partitions between group members. In addition, the coordinator is responsible for managing committed offsets.

Looking at the source code of the coordinator, we can see how the process of adding members to the group and assigning partitions works. This process, commonly referred to as rebalancing, typically starts when a consumer invokes the poll method of the coordinator. When this happens, the coordinator will first check whether it needs to join (or rejoin) the group, for instance because the consumer was just started. If yes, the processing in ensure_active_group will first prepare the join process, for instance by committing all offsets if auto-commit is enabled and calling the revoke method of all registered rebalance listeners (conceptually, when a rebalancing starts, all existing members will loose ownership of previously handled partitions and consequently stop processing records so that the group coordinator can reassign partitions freely – there is an ongoing effort known as cooperative rebalancing with the objective to change this).

We then wait until there are no more in-flight requests to the coordinator, and then send a JoinGroupRequest to the group coordinator. The group coordinator (broker) will wait until all members have handed in their requests (see below for more on the timeline) and then determine one member to be the group leader. As part of the JoinGroupResponse, every consumer will be informed about the newly elected leader. The group leader will then perform the actual assignment of partitions to group members (using a configurable assignor). Then, all group members send another request to the group coordinator, called the SyncGroupRequest. In this request, the group leader will inform the group coordinator about the defined partition assignments, and in the response to this message, the partition assignments will be distributed to all group members.

Once the SyncGroupResponse has been received, the method ensure_active_group will invoke _on_join_complete which will in turn trigger a call of the on_partitions_assigned method of all registered rebalance listeners. Note that at this point, all exceptions raised by the listener are swallowed, so exceptions should be caught and handled inside the listener.

This is all nice if our own consumer joins a group, but what happens if another consumer joins? This is where the heartbeat thread comes into play. This is a thread which is running in the background and periodically sending heartbeat messages to the group coordinator (with a frequency determined by the parameter heartbeat_interval_ms). If a rebalancing has been initiated by another member joining or leaving, the heartbeat response will have an error flag set, so that the consumer learns about the start of the rebalancing process. It then sets a flag, which will be evaluated during the next call of the coordinators poll method, which is in turn invoked from the consumers poll loop. If this flag is set, the coordinator will rejoin the group following the process outlined above.

At this point, timing is vital. If a consumer does not call the poll method for a long period of time, it might miss a rebalancing and will forcefully be removed from the group. This again will lead to errors when the consumer tries to commit offsets, which are difficult to handle and almost inevitably lead to duplicate processing. In general, a consumer should invoke the poll method on a regular basis, and there is again a parameter (max_poll_interval_ms) which determines the maximum allowed time between two subsequent invocations of this method.

Indirectly, this parameter also determines how long the group coordinator will wait for members to join the group (it is sent to the group coordinator as part of the join group request). The following diagram shows the typical sequence of events when a new member joins a group and triggers a rebalancing.

RebalancingProtocolTimeline

The consumers poll loop

After all these preparations, we are now ready to discuss the poll method of the Kafka consumer. In this method (or rather the private method _poll_once), we first use the coordinator and its poll method discussed above to verify that the consumer is part of a group and has partitions assigned and to trigger a rebalancing process if needed. Note that if a rebalancing is needed, this call will block so that it is made sure that we only reach the main part of the consumers poll method after the rebalancing is done.

Next, we will typically have to update all fetch positions. This happens in several steps.

  • call the method reset_offsets_if_needed of the fetcher. This method will check a flag to see if any offsets need to be reset. If yes, it will retrieve the valid offsets and apply the chosen offset reset strategy
  • if there are still partitions which do not have a valid position, we call the method refresh_committed_offsets_if_needed of the coordinator which will fetch the committed offsets from the group coordinator
  • Then, the method update_fetch_positions of the fetcher is invoked which will set the fetch positions of the partitions in question to the committed value

Back in _poll_once, we then check whether the fetcher has any previously obtained records still in its queue. If yes, we immediately return this data (and at the same time initiate a pre-fetch of the next records). Recall that the process of getting these queued records also triggers the update of the position. Then, new fetches are sent, and we poll the client until we either time out or obtain new records which we then return.

Summarizing, the diagram below displays the (slightly simplified) flow of events in case a consumer calls poll (where some calls indicated in the diagram are not made every time, depending on available fetch positions and committed offsets).

PollOnceCaseOne

From what we have said above, it is now clear that a rebalancing listener is always invoked from within the poll method – which also implies that you should not spend too much time in a rebalance listener and not make any blocking calls.

This completes our short summary of the processing inside the Kafka consumer. With this introduction and using the Java library and the rich comments inside the code, you should now be able to dig deeper into the bits and pieces if needed.

Learning Kafka with Python – implementing a database sink

Very often, either the source or the target of a Kafka based message queue is a classical relational database. Consuming data and using it to update a database table sounds straightforward, but poses a few challenges around reliability and delivery semantics. In this post, we look into two options to realize such an architecture.

The challenge

To illustrate the problem we are aiming to solve, let us suppose that we want to build an application that maintains an account balance. There is a front-end which acts as a Kafka producer and which a customer can use to either deposit money in the account or withdraw money. These transactions are then written into a Kafka topic, and a consumer reads from this topic and updates the balance kept in a relational datastore.

DatabaseSink

Thus the messages stored in the Kafka topic contains transactions, i.e. changes in the account balance, while the database table we need to maintain contains the actual balance. This is an example of what is called stream / table duality in the world of streaming – the event stream is the source of truth and reflects changes, the database contains the resulting state of the world after all changes have been applied and can at any time be reconstructed from the stream.

When we want to implement his pattern, the crucial part of our design will be to make sure that every message in the queue leads to only one update of the balance, so that no transaction is missed and no transaction is processed twice.

Pattern 1: using a message ID and de-duplication

The first pattern we could use to make this work is to achieve de-duplication based on a unique message ID, which ideally is an integer that is increased with every message. The consumer could then store the sequence number of the last processed message in the database, and could thus detect duplicates.

In a bit more detail, this would work as follows. When the producer processes an action, it first retrieves a unique message ID. This message ID could be created using a database sequence, or – if the used database does not allow for this – it could read a sequence number from a table, increment it by one and update the table accordingly. It would then add this sequence number as a key to the Kafka message.

The consumer would store the latest processed sequence number in the database. When it reads a mesage from Kafka, it uses this number to check whether the message has already been processed before. If not, it updates balance and sequence number in one transaction and then commits the new offset to Kafka.

DatabaseSinkPatternI

Let us see how duplicates are handled in this pattern. If, for some reason, the topic contains two messages with the same ID, the consumer will process the first message, increase the latest processed sequence number in the database and commit the new balance. It will then read the duplicate, compare its sequence number against the latest value, detect the duplicate and simply ignore it. Thus the consumer is able to do a de-duplication based on the sequence number.

Unfortunately, this simple pattern has one major disadvantage – it does not work. The problem is that the order of messages is not guaranteed across partitions. Suppose, for instance, that the producer creates the following messages:

  • Message 1, partition 0
  • Message 2, partition 1
  • Message 3, partition 0

Now it might happen that the consumer processes the messages in the order 1,3,2. Thus the consumer would, after having processed message 3, set the highest consumed sequence number to “3” in the database. When now processing message two, our simple duplicate detection algorithm would then classify this message as a duplicate. Thus, to make this work, it is vital that we store the highest processed sequence number by partition and not across all partitions. We could even create the sequence number per partition, which would also remove a possible bottleneck as creating the sequence number would otherwise effectively serialize the producers.

Also note that, as we need to maintain a “last processed” sequence number per partition in a database, we also need to maintain this table if we add new partitions to our topic or remove partitions.

Alternatively, if there is a message ID which is not ordered and increasing with each message, the consumer could store all processed message IDs in a separate database table to keep track of the messages that have already been processed (which, depending on the throughput, might require some sort of periodic cleanup to avoid that the table grows too big).

If the consumer fails after committing the changed balance to the database, but before committing the offset to Kafka, the same mechanism will kick in, as long as we commit the new balance and the updated value of the last processed message in one database transaction. Thus, duplicates can be detected, and we can therefore rely on the standard mechanisms Kafka offers to manage the offset – we could even read entire batches from the topic and use auto-commit to let Kafka manage the offset.

Let us now try out this pattern with the code samples from my repository. To be able to run this, you will have to clone my GitHub repository and follow the instructions in my initial post in this series to bring up your local Kafka cluster. Then, make sure that your current working directory is the root directory of the repository and run the following commands which will install the Python package to access a MySQL database and bring up a Docker based installation of MySQL with a prepared database.

pip3 install mysql-connector-python
./db/createDB.sh

This script will start a Docker container kafka-mysql running MySQL, add a user kafka with a default password to it and create a database kafka for which this user has all privileges. Next, let us run a second script which will (re)-create a Kafka topic transactions with two partitions and initialize the database.

./db/reset.sh

Now we run three Python scripts. The first script is the producer that we have already described, which will simply create ten records, each of which describing a transaction. The second script is our consumer. It will

  • subscribe to the transactions topic
  • Read batches of records from the topic
  • for each record, it will (in one transaction!) update the account balance and the last processed sequence number
  • commit offsets in Kafka after processing a batch
  • apply the duplicate detection mechanism outlined above while processing each record

Finally, the third script is a little helper that will (without committing any offsets, so that we can run it over and over again) scan the topic, calculate the expected account balances, retrieve actual account balances from the database and check whether they coincide.

python3 db/producer1.py
python3 db/consumer1.py
python3 db/dump1.py --check

Let us now try to understand how our scripts work if an error occurs. For that purpose, the consumer has a built-in mechanism to simulate random errors between committing to the database and committing offsets to Kafka which is activated using the parameter –error_probability. Let us repeat our test run, but this time we simulate an error with a probability of 20%.

./db/reset.sh && python3 db/producer1.py
python3 db/consumer1.py --error_probability=0.2 --verbose
python3 db/dump1.py --check

You should now see that the consumer processes a couple of messages before our simulated error kicks in, which will make the consumer stop. The check script should detect the difference resulting from the fact that not all messages have been processed. However, when we now restart the script without simulating any errors, we should see that even though there are duplicate records, these records are properly detected and eventually, all records will be processed and the balances will again be correct.

python3 db/consumer1.py --verbose
python3 db/dump1.py --check

Pattern 2: store the consumer offset in the target database

Let us now take a look at an alternative implementation (which is in fact the pattern which you will hit upon first when consulting the Kafka documentation or other sources) – maintaining the offsets in the database altogether. This implementation does not require a sequence number generated by the producer, but is therefore also not able to detect any duplicate messages in case the duplicates originate already in the producer.

The idea behind this pattern is simple. Instead of asking Kafka to maintain offsets for us, the consumer application handles offsets independently and maintains a database table containing offsets and partitions. When a record is processed, the consumer opens a database transaction, updates the account balance and updates the offset in the same transaction. This guarantees that offsets and balances are always in sync.

This sound simple enough, but there are a few subtleties that need to be kept in mind when this is combined with consumer groups, i.e. if Kafka handles partition assignments dynamically. Whenever a partition is assigned to our consumer, we need to make sure that we position the consumer at the latest committed offset before processing any records. Conversely, when an assignment is revoked, we need to commit the current offsets to make sure that they are not lost. This can be implemented using a rebalance listener which is registered when we subscribe to the topic.

DatabaseSinkPatternII

To try this out, run the following commands which will first reset the database and the involved topic, run the producer to create a few messages (this time without the additional sequence number) and then run our new consumer to read the messages and update the database.

./db/reset.sh && python3 db/producer2.py
python3 db/consumer2.py 
python3 db/dump2.py --check

The last command, which again checks the updated database table against the expected values, should again show you that expected and actual values in the database match.

It is instructive to try a few more advanced scenarios. You could, for instance, run the producer and then start a consumer which reads the first set of messages produced by the consumer (use the switch –runtime=3600 to make this consumer run for one hour). Then, start a second consumer in a separate terminal window and observe the rebalancing that occurs. Finally, run the producer again and verify that the partition assignment worked and both consumers are processing the messages in their respective partition. And again, you can simulate errors along the way and see how the consumer behaves.

Learning Kafka with Python – consuming data

We now understand how Kafka producers add data to partitions. So let us move on and take a look at consumers – how they operate, how they are configured and how different levels of reliability and delivery guarantees can be achieved.

Consumer groups

In the previous post on producers, we have seen that the interaction between a producer and a Kafka broker is rather simple. Basically, producers request metadata to obtain data on partitions and leading brokers and then send records to the partition leader. The Kafka broker does not keep track of a producers state, and producers can actually come and go without Kafka even noticing it (this is a bit different when transactions are used, as in this case, the broker needs to keep track of the producers state as well, but this is beyond the scope of this post).

For consumers, the situation is different. The main reason for this different design is that while a producer determines itself to which partition data is written, a consumer typically lets Kafka make this decision. In this programming model, Kafka distributes the available partitions to the available consumers, trying to balance the load evenly. If a new consumer appears, Kafka will assign partitions to it, and if a consumer goes down, Kafka will re-assign these partitions to one of the remaining consumers. To make this work, Kafka needs to keep track of the state of a consumer (in fact, a consumer is expected to send periodic heartbeats so that Kafka can detect when a consumer goes down, and Kafka tracks the state of consumers as part of the ZooKeeper data structures).

To better understand how this works, we first have to understand consumer groups. Logically, a consumer group is very similar to an application – it is a logical entity reading data from a topic. If, for instance, you are using Kafka to distribute instrument master data in a securities processing application, there will typically be different application components that need this data – say a trading frontend, a settlement module or a tax processing module. So each of these application components could be set up as a consumer group in Kafka, so that they all obtain records from the instrument master data topic independently, similar to the pub/sub semantics of a traditional messaging system.

To increase scalability and fault tolerance, there can be many consumers inside a consumer group, but Kafka will try to make sure that within a consumer group, every message is delivered to only one consumer.

ConsumerGroups

To achieve this, Kafka will assign partitions in a topic to consumers within a consumer group. To make sure that each message is only processed once by one consumer group, each partition can be assigned to only one consumer, but if there are more partitions than consumer, a single consumer can read from more than one partition (so scaling the number of consumers beyond the number of partitions will lead to idle consumers).

ConsumerGroupsPartitions

It is worth mentioning that this is not the only programming model supported by Kafka. Instead of letting Kafka determine the assignment of partitions to consumers, consumers can also subscribe directly to a partition and thus define their own assignment. In this case, consumers need to implement their own mechanisms to detect a change in the number of partitions or to rebalance the load if a consumer goes down. This can, however, be useful if the number of partitions is constant and lost consumers are immediately replaced by some sort of restart mechanism. If you want to take a closer look at how exactly Kafka manages the assignment of partitions to consumers in a group, take a look at this excellent blog post on the Confluent web site or this page on the Confluent Wiki.

Maintaining the offset

The next problem we have to solve is the maintenance of the offset. A traditional messaging system typically makes sure that a message is only delivered once. With Kafka, this task is left to the consumer (this is why some people refer to this model as the “dumb broker – smart consumer” model). In fact, the low level API “FETCH” call of the Kafka protocol expects that a consumer specifies the offset of the record (batch) it wants to read. So the consumer needs to know which offsets is has already processed to make sure that all records are read and that no record is processed twice.

This is not an easy task and subject to race conditions. Suppose, for instance, we decide to store the offset in a separate database, and our processing logic is (in a hopefully readable pseudo-code)

offset = db.read_offset()
while true:
  record = read_record(offset)
  process(record)
  offset = offset + 1
  db.store_offset(offset)

Now suppose that this consumer fails after processing the record, but before writing the updated offset into the database. When we now restart the consumer, it will read the old offset from the database and process the last record twice. If, conversely, we change the order and commit the new offset before processing the record, we would miss a record if the consumer dies between these two steps.

Instead of persisting the offset yourself, you can also ask Kafka to do this for you. When making use of this option, Kafka will store your offset in a dedicated topic. A consumer can either explicitly commit the offset to this topic, or can use auto-commit, which simply means that Kafka will automatically commit every few seconds (which, of course, leads to duplicate processing if this interval is, say, 5 seconds and the consumer dies 4 seconds after the last commit). In a later post, we will look into transactional writes, which even allow exactly-once delivery as long as no other data stores are involved.

Creating and using a KafkaConsumer

Let us now see how we can create and use a consumer with the Python Kafka API and how the consumer is configured.

First, we need to create a consumer object. When creating a consumer, there are three parameters that we need to provide: the topic from which we want to read data, the ID of the consumer group that the consumer is part of (which is an arbitrary string), at least if we plan to use the automatic assignment of partitions and / or we want Kafka to store offsets for us, and a list of bootstrap servers. So a code snippet creating a consumer could be as follows.

import kafka
consumer=kafka.KafkaConsumer("test", 
         group_id="my_group",
         bootstrap_servers="broker1:9092")

Once we are done with a consumer. we should always clean up again by calling consumer.close() so that the consumer can properly leave the group.

When using SSL to connect to the broker, you will again have to provide additional parameters when building the consumer, as we have done it for the producer.

As for the producer, a consumer can also be configured with custom deserializers. If, for instance, we use JSON as a serializer, as we have demonstread in the previous post on building a producer, we now need to provide a matching deserializer that converts a byte stream back into the target format used by the application. As for the producer, deserializers for keys and payloads can be supplied using the additional configuration parameters key_deserializer and value_deserializer.

As mentioned above, one option to deal with offsets is to leave the processing to Kafka and to ask Kafka to automatically commit offsets for us. This is in fact the default behavior, and controlled by the following parameters.

  • enable_auto_commit – this is a boolean flag which tells Kafka whether we want to automatically commit offsets, and defaults to true
  • auto_commit_interval_ms – this specifies the interval at which Kafka will commit offsets. The default is five seconds, which implies that in the worst case, the messages processed during the last five seconds will be consumed twice if your consumer fails shortly before a commit
  • auto_offset_reset – this parameter determines from which offset Kafka should start processing if no valid offset can be found. This clearly happens when we start the consumer for the first time, but can also happen if messages are deleted or are lost. If we set this to “earliest”, Kafka will start the processing at the first available offset. If we use “latest”, it will start processing at the end of the log, i.e. with the next message that will be added to the log. The default is “earliest”

Before we can read any data, we have to subscribe to a topic. When creating the consumer, we already refer to a topic, and in fact, the consumer will automatically subscribe to this topic. It is also possible to manually subscribe. This is typically done when you want to add a rebalance listener to be informed about changes in the set of assigned partitions. A rebalance listener is any class derived from kafka.ConsumerRebalanceListener which is passed as argument to the subscribe call. Whenever a partition assignment is made or revoked, Kafka will then call the corresponding method of the listener.

consumer.subscribe(TOPIC, 
    listener=MyConsumerRebalanceListener())

When an application wants to manually store offsets, for instance in a database, it can use this mechanism and / or the method consumer.assignment() to keep track of the records assigned to it. Note that, as explained in the source code comments of the listener class, Kafka will first invoke the on_partitions_revoked method of all listeners before calling any of the on_partitions_assigned methods. These handlers will be invoked from the polling loop, i.e. only when you pass control to the consumer by reading data from it, not in a separate thread (we will learn more about the exact mechanics of this process in a separate future post).

Now let us see how we can actually read data from a topic. The library offers two options to do this. First, we can simply invoke the poll method of the consumer object, which will return a batch of records. Alternatively, and more “pythonish”, we can treat the consumer object as an iterator and simply loop over it to get one record at at time.

Note that some methods of the consumer can block as they are waiting for responses from the server. As in general, consumers should make sure to not block outside of the polling loop, it is not advised to call the consumers methods in separate threads. My experience is that it can lead to problems if a signal handler, for instance, invokes methods of the consumer to shut down the consumer. Instead, it should only set a stop flag, while invoking all methods of the consumer object in the polling loop.

def signal_handler(signal, frame):
  stop = 1

while not stop:
  for record in consumer:
    .....
consumer.close()

Let us now discuss different options to commit offsets. We have already seen that the default is auto-commit, which implies that Kafka will commit automatically every 5 seconds. When using this option, we can guarantee that all messages will be read at least once, but need to be prepared to receive messages more than once. If we need full control over the process of committing offsets, we need to disable auto-commit by setting enable_auto_commit to false.

At this point, it is important to remember that the Kafka client requests data from the broker in batches. If we ask the client to commit the offset, it will commit the entire batch. It therefore does not make sense to commit once during every loop iteration of the pseudo-code above, but once at the end of the batch. As the iterator interface of the consumer object makes it difficult to determine when a batch has ended, it is easier to use the poll method of the consumer. This method returns a dictionary, where the keys are TopicPartition objects, i.e. named tuples describing a combination of topic and partition, and arrays of records.

When using manual commits, we again have several choices. First, we can commit after every record. In this way, we will have at most one duplicate in case of an error, but create an additional overhead and reduce our throughput significantly. Alternatively, we can use a batch size greater than one and commit after each batch. This will be more efficient, but if a the processing fails in the middle of the batch, we will re-read the first few records in the batch when we restart and thus process records twice.

Trying it out

Let us now see how this works in practice. If you have cloned my GitHub repository and installed Kafka as described in my previous post, you are ready to run some examples that are part of the repository and located in the python subdirectory. First let us delete and re-create the topic that we have already used for our producer tests by running the following commands on the lab PC (after changing to the repository root directory)

./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --topic test \
  --delete
./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --topic test \
  --create \
  --partitions 2 \
  --replication-factor 3

Next, we can create 10 messages in this topic by running the producer that we have already used in the previous post.

python3 python/producer.py --create_keys

We can now run our consumer to read the messages that we have just written. To do this, simply enter

python3 python/consumer.py 

Looking at the output, we see that the first attempt to poll triggers a partition reassignment. First, the coordinator will revoke the existing group assignments for all group members. Then it will assign the existing two partitions to our consumer, as this is the only consumer in the group, so that our listener is called. As this is the first read, there are no committed offsets yet, and as we have set auto_offset_reset to “earliest”, we start our read at position zero (the first offset).

We now start to read records from the log. In a second terminal window, we can inspect the currently stored offsets.

./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --group test-group \
  --describe

We should now see that Kafka has assigned our consumer to the two partitions and has recorded the updated offsets. As we have read every record once, the offsets should now be identical to the last read position. If you run this command quickly after starting the consumer, you should even be able to see that the automated commit only takes place after a couple of seconds.

When you now stop the consumer by hitting Ctrl-C and run it again, it will not print any new records, as it will restart at the committed offsets. To re-read our messages, we will have to reset the offsets. There are two ways to do this. You can either run

./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --group test-group \
  --topic test \
  --reset-offsets \
  --to-earliest \
  --execute 

or use our consumer, which has a switch –reset instructing it to only reset the offsets without reading any records. In both cases, we should now be ready for another test. This time, we disable auto-commit and use manual commits.

python3 python/consumer.py --disable_auto_commit

You should now see that the messages are processed once again, and that the offsets will again be committed, though this is triggered by our explicit calls to consumer.commit() this time.

Next, let us try what happens if we do not commit any offsets at all. Our test client supports this by setting the flag –no_commit

python3 python/consumer.py --reset
python3 python/consumer.py --no_commit
python3 python/consumer.py --no_commit

As expected, the second and third invocation both return the full set of data, as the offsets are never committed and the third invocation therefore starts at the same point at which the second invocation started.

Finally, it is instructive to see how several consumer interact. To set this, first reset all offsets again. Then, open a second terminal, start the consumer in the first terminal and then start a second consumer in the second window. The output should show you that

  • Initially, the first consumer will start to process both partitions
  • When the second consumer is started, the partitions will be revoked, and the corresponding listener is called for both consumers
  • Then, each of the two partitions will be assigned to one of the two consumers

Both consumers should now wait for data on their respective partition. If you now run the producer again to generate ten additional messages, you should nicely see that both consumers receive messages for their respective partitions in parallel.

This completes our discussion of consumers for the time being. There are a couple of points that we have not yet explored (like the manual assignment of partitions, different options for timeouts, the heartbeat thread which periodically sends a hearbeat to the Kafka group coordinator or consumers without consumer groups), but most of this is readily accessible in the Kafka documentation. In the next post, we will look at some patterns to read data from a Kafka topic and use it to maintain state in a relational database.

Learning Kafka with Python – producing data

As proud owners of a brand new Kafka installation, we are now ready to explore how applications interact with Kafka. Today, we will look at producers and understand how they write data to Kafka.

Basic design considerations

At first glance, writing data to Kafka sounds easy – connect to a Kafka broker and submit a message. However, there are some basic design considerations that are relevant when building a Kafka producer.

First, we have seen that Kafka stores the data in a topic in multiple partitions, and then each partition has a leader which is responsible for writing messages into the partition. Thus a producer needs to determine to which partition a message should be written and contact the responsible leader for this partition.

Defining the mapping of messages to partitions can be crucial for reliability and scalability of your application. Partitions determine how the application can scale horizontally, and we will learn later that partitions also determine to which extent consumers can scale. In addition, Kafka guarantees message order only within a partition. Specifically, if message A is written to partition before message B is written to the same partition, message A will receive a lower offset than B and will be read first by a (well behaving) consumer. This is no longer true if messages A and B are written to different partitions. Think of partitions as lanes on a highway – there is no guarantee that two cars entering the highway in a certain order but in different lanes will arrive at the destination in the same order.

Often, you will want to use a business entity to partition your data. If you are building a customer facing application, you might want to partition your data by customer group, if you are building a securities processing application the financial instrument might be a good partition criterion, if you are maintaining accounts then the account number might be a good choice and so forth. In other cases, where ordering is not important, you might go for a purely technical criterion.

The next fundamental question we have to figure out is when a message is considered to be successfully written. When the broker has received it? Or when the leader has written the message? Or should we wait until all followers have successfully stored the message? And what if a follower lags behind – should we stop writing messages until the follower has recovered or move on, accepting that we have lost one follower without knowing whether it will recover at a later time?

Kafka does not give a definitive answer to all these questions, but leaves you a choice – put differently, when you create a producer, you can specify its behavior using a variety of options. So let us now see how this is done in Python.

The producer object

Let us now see how a producer can be created using Python. If you have not yet done so, please install the Kafka Python library to be able to run the examples.

pip3 install kafka-python

This series uses version 2.0.1 of the library, if you want to use exactly that version you need to specify that as usual, i.e. run

pip3 install kafka-python==2.0.1

To send messages to Kafka, the first thing we need to do is to create a producer object, i.e. an instance of the class kafka.Producer. The init-method of this class accepts a large number of arguments, but in the most straightforward case, there is exactly one argument bootstrap_servers. This argument is a list of listener URLs, for instance 10.100.0.11:9092, which the producer will use to make an initial connection to a Kafka broker. This list does not need to contain all brokers, in fact one entry will do, but the producer will use this broker to obtain other brokers if needed. It is a good idea to list at least two or three brokers here, in case one broker is temporarily unavailable. So creating a producer could look like this.

import kafka
producer=kafka.KafkaProducer(bootstrap_servers=["broker1:9092", "broker2:9092"])

When started, a Producer will create a separate sender thread which will asynchronously send messages to the brokers. In addition, it will create an internal client which holds the actual connections to the Kafka cluster.

When using an SSL listener, we need a few additional configuration items. Specifically, we need to add the following named parameter when creating a KafkaProducer

  • ssl_cafile – this is the location of a CA certificate that the client will use to verify the certificate presented by the server
  • ssl_certfile – this is the location of the client certificate that the client will in turn present to the server when the server requests a certificate
  • ssl_keyfile – the key matching the client certificate

In order to be bit more flexible when it comes to connecting to different setups, the code examples that we will use in this series read the list of brokers and the SSL configuration from a YAML file config.yaml that the installation script will create in the subdirectory .state of the repository directory. All test scripts accept a parameter –config that you can use to overwrite this default location, in case you want to use your own configuration.

Once we are done using a producer, we should close it using producer.close() to clean up.

Keys and partitions

Once we have a producer in our hands, we can actually start to send messages. This requires only two parameters: the topic (a string) and the payload of the message (a sequence of bytes).

producer.send("test",value=bytes("hello", "utf-8"))

Note that this will create a topic “test” if it does not exist yet, using default values specified in the server configuration (server.properties), so be careful to use this as the default configuration might not be what you want (you can also turn this feature off by setting auto.create.topics.enable to false in server.properties).

Now you might remember from my previous post that a record in Kafka actually consists of a payload and a key. Here, we do not specify a key, so the key will remain empty. But of course, you can define a key for your record by simply adding the named parameter key to the method invocation, like this.

producer.send("test",
        value=bytes("hello", "utf-8"),
        key=bytes("mykey", "utf-8"))

What about partitions? The low-level protocol that a Kafka broker understands expects the client to send a PRODUCE request containing a valid partition ID, so it is up to the client to take this decision. The application programmer can either decide to explictly specify a partition ID (an integer) as an optional parameter to the send method, or let the framework take the decision. In this case, a so-called partitioner is invoked which, based on the value of the key, selects a partition to write to.

An application can set the configuration item partitioner when creating a producer to define a customer partitioner (which is simply a callable object that the producer will invoke). If no partitioner is specified, the default partitioner will be used, which implements the following logic.

  • If no key is given, the default partitioner will simply distribute the messages randomly across the available partitions
  • If a key is provided, a hash value of the key will be computed (using a so-called MurmurHash, which will always be an integer. The value of this hash (more precisely, of its last 31 bits) modulo the number of partitions will then determine the partition to use

The important thing to keep in mind is that if you do not provide a key, your message will end up in a random partition. If you do provide a key, then Kafka will guarantee that messages with the same key will go to the same partition and hence be processed in order.

Serialization

In our examples so far, we have passed a sequence of bytes to the send method, both for the key and the value. This is the format that the low-level protocol expects – at the end of the day, keys and values are sent over the wire as a sequence of bytes, and stored as a sequence of bytes.

In many applications, however, you will want to store more complex data types, like JSON data or even objects. So be able to do this with Kafka, you will have to convert your data into a sequence of bytes when sending the data, a process known as serializing.

When creating the producer, you can specify your own serializers for keys and payloads by adding the named parameters key_serializer and value_serializer when creating the producer. Here, a serializer can either be a function which accepts whatever input format you prefer and returns a sequence of bytes, or an instance of the class kafka.serializer.Serializer which has a serialize method which the framework will invoke.

Suppose for instance you wanted to serialize JSON data. Then, you need to provide a serializer which accepts a JSON object and returns a sequence of bytes. For that purpose, we can use the standard json.dumps method to first produce a string, and then encode the string using e.g. UTF-8 to obtain a sequence of bytes. Thus your serializer would look something like

def serialize(data):
    return bytes(json.dumps(data), "utf-8")

and when creating the producer, the call would be something like

producer=kafka.KafkaProducer(..., 
   value_serializer=serialize, ...)

Choosing a reasonable serializer is an important design choice. As Kafka topics are designed to be durable objects, you need to think about things like versioning when the decoding changes as you release new features, and obviously all components of a system need to use matching serializers and de-serializers to be able to exchange data. Many Kafka projects actually use third-party serializers, like Apache Avro or Google’s protobuf.

Acknowledgements

So far, we have seen how we can send messages using the send method of a KafkaProducer object. But in reality, you of course want to know whether your message was successfully send and stored by Kafka.

This leads us to the question at which point a new record can be considered to be committed to a Kafka cluster, i.e. stored and available for consumers. Before getting into this, however, we first have to understand the notion of an in-sync replica.

Recall that Kafka replication works by designating a leader for a partition and zero, one or more followers which constantly ask the leader for new records in the partition and store them in their own copy of the partition log. As the replicas read the records from the leader, the leaders knows which record has been delivered to which follower. The partition can therefore determine whether a replica is out-of-sync, which happens if a follower fails to retrieve the latest message within a defined time frame, or in-sync.

Having enough in-sync replicas is vital for the reliability. If a leader goes down, Kafka has to elect a new leader from the set of available replicas. Of course, choosing an out-of-sync replica to be the new leader would imply that we promote a replica to the master and thus to our new source of truth that not yet replicated all messages that producers have sent to the leader. Thus, making such a replica the new leader results in a loss of records. In some situations, you might still opt to do so, which Kafka allows you if the parameter unclean.leader.election.enable is set to True in the broker configuration.

InSyncOutOfSync

Now let us come back to the question of when a message sent by a producer will be considered committed. Again, Kafka offers you a choice, governed by the value of the parameter acks of a producer.

  • When acks = 1 (the default), a message will be considered committed once the leader acknowledges the message. Thus Kafka guarantees that a committed message has been added to the leading partition, but not that it has already been written to one or even all replicas. Note that, as the leader might cache the record in memory, this can lead to data loss if the leader goes down after acknowledging the message, but before a follower has copied it
  • When acks = -1 (all), a record will be considered committed only once the leader and in addition all in-sync replicas have acknowledged receipt of the message. As long as you have enough in-sync replicas, this gives you a strong guarantee that the message is available on several nodes and thus data loss has become very unlikely
  • Finally, a value of acks = 0 means that the message will be considered committed once it has been sent over the network, regardless of any acknowledgement from the leader or a follower. This obviously is a very weak guarantee and only reasonable if you have a strong focus on throughtput and can live with a loss of (potentially many) records

When usings acks = all, the number of in-sync replicas is of course vital. To illustrate this, let us assume that you have configured a topic with three replicas, but both followers have become out-of-sync. Now a message will be committed once the leader has written it, meaning that if the leader is lost, the record will be lost as well. To avoid such a scenario, you can set the server property min.insync.replicas. This number determines how many replicas (including the leader) need to be in-sync in order to still accept new messages. Thus if you use, for instance, a topic with a replication factor of three and min.insync.replicas=2 in combinations with acks=-1, then Kafka will guarantee that a message is only reported as committed once the leader and at least one follower have received the record.

Acks

Finally, there is one more parameter that is important for the reliability of a producer – max_in_flight_requests_per_connection. A request (which typically contains more than one record to be written) will be considered as in-flight as long as no result – either an acknowledgement or an error – has been received from the broker. If this parameter (which defaults to 5!) is set to a value greater than one, this implies that the producer will not wait for an acknowledgement before sending the next batch. In combination with retries, this can imply that the order in which messages are added to the log is not identical to the order in which they have been sent, and if the producer goes down, in-flight messages night be lost if the broker is not able to process them. Thus set this to one if you need strong guarantees on at-least-one delivery and ordering.

Retries and error handling

Finally, the last important design decision that you need to take when writing a producer is how to deal with errors.

The send method that we are using delivers messages asynchronously to the actual sender and immediately returns. To figure out whether the record was successfully committed, we therefore cannot simply use its return value, but need a different approach. Therefore, the send method returns a handler which can later be used to retrieve the status of the message, a so called Future, or, more precisely, a subclass called FutureRecordMetadata. Once we have this object, we can call its get method with a timeout in seconds to wait for the request to complete. If the request was successful, this method returns a dictionary containing record metadata, otherwise it raises an exception of type kafka.errors.KafkaError. Alternatively, you can also specify callback functions for successful and failed sends.

Note that the producer also has an option to automatically retry failed messages, which can be configured by setting the parameter retries to a value different from zero. In general, however, you should be careful with this as it might conflict with ordering, see the discussion of in-flight requests above.

Testing our producer

After all this theory, it is now time to test our producer. I assume that you have followed my previous post and installed Kafka in three virtual nodes on your PC. Now navigate to the root of the repository and run the following command to create a test topic.

./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(./python/getBrokerURL.py) \
  --command-config=.state/client_ssl_config.properties \
  --create \
  --topic test \
  --replication-factor 3 \
  --partitions 2 

Let us see what this command is doing. The script that we run, kafka-topics.sh, is part of the standard Kafka admin command line tools that are bundled with the distribution. In the second line, we invoke a little Python script that evaluates the configuration in YAML format which our installation procedure has created to determine the URL of a broker, which we then pass to the script. In the third line, we provide a Java properties file containing the SSL parameters to connect to our secured listener.

The remaining switches instruct the tool to create a new topic called “test” with a replication factor of three and two partitions (which, of course, will fail if you have already created this topic in the previous post).

Next, we will run another tool coming with Kafka – the console consumer. This is a simple consumer that will simply subscribe to a topic and dump all records in this topic to the console. To run it, enter

kafka/bin/kafka-console-consumer.sh   \
   --bootstrap-server $(./python/getBrokerURL.py)   \
   --consumer.config .state/client_ssl_config.properties \
   --from-beginning \
   --topic test 

Now open an additional terminal, navigate to the root of the repository and run the producer.

python3 python/producer.py

This should print the producer configuration used and the number of messages produced, plus timestamps and the number of seconds and microseconds it took to send all messages. In the first terminal window, in which the consumer is running, you should then see this messages flicker by.

Now let us try out a few things. First, let us create 10000 messages with a set of configurations promising the highest throughput (acks=0, fully asynchronous send, no keys provided, five requests in flight).

python3 python/producer.py \
  --messages=10000 \
  --ack=0

On my PC, producing these 10000 messages takes roughly half a second, i.e. our throughput is somewhere around 20.000 messages per seconds, without any tuning (of course the results will heavily depend on the machine on which you are running this). Next, we produce again 10000 messages, but this time, we use very conservative settings (acks=all, only one message in flight, wait for reply after each request, create and store keys and use them to determine the partition).

python3 python/producer.py \
  --messages=10000 \
  --ack=-1 \
  --max_in_flight_requests_per_connection=1 \
  --wait \
  --create_keys

Obviously, this will be much slower. On my PC, this took roughly 17 seconds, i.e. it is slower by a factor of about 25 than the first run. This hopefully illustrates nicely that Kafka leaves you many choices for trade-offs between performance and availability. Use this freedom with care and make sure you understand the consequences that the various settings have, otherwise you might loose data!

Putting it all together – the send method behind the scenes

Having seen a producer in action, it is instructive to take at a short look at the source code of the Python implementation of KafkaProducer, specifically at its send method. First, the partitions of the topic are retrieved, either from cached metadata or by requesting updated metadata from the server. Then, the key and value are serialized, and the partitioner is invoked which determines the partition to which we write the record.

Next, instead of directly sending the record to the broker, it is appended to an internal buffer using an internal helper class called a RecordAccumulator. If the accumulator signals that the buffer is full, the sender thread is triggered which will then actually transmit the entire batch to the Kafka broker. Finally, the future object is returned.

SendingData

This completes our discussion of Kafka producers. In the next post, we will learn how we can consume data from Kafka and how consumers, producers and brokers play together.