Learning Kafka with Python – retries and idempotent writes

In the past few posts, we have discussed approaches to implement at-least-once processing on the consumer side, i.e. mechanisms that make sure that every record in the partition is processed, combined with de-duplication so that no record is processed twice. Today, we will look at a similar problem on the producer side – how can we make sure that every record is written into the partition only once? This sounds easy, but can be tricky if we need to retry a failed message without knowing the exact error that has occurred.

The retry problem

In the sample producer that we have looked at in a previous post, we missed an important point – error handling. The most important error that a reliable producer needs to handle is an error when handing over a new record to the broker.

In general, Kafka differentiates between retriable errors, i.e. transient errors like individual packets being lost on the network, and non-retriable errors, i.e. errors like an invalid authorization for which a retry does not make sense. For most transient errors, the client will – under the hood – automatically attempt a retry if a record could not be sent.

Let us take a short look at the Java producer as an example. When a batch of records has been sent to the broker as a ProduceRequest, the response is handled in the method handleProduceResponse. Here, a decision is made whether an automatic retry should be initiated, in which case the batch of records will simply be added to the queue of batches to be sent again. The logic to decide when a retry should be attempted is contained in the method canRetry, and in the absence of transactions (see the last section of this post), it will decide to retry if the batch has not timed out yet (i.e. was created less than delivery.timeout.ms ago), the error is retriable and the number of allowed retries (set via the parameter retries) has not yet been reached. Examples for retriable exceptions are exceptions due to a low number of in-sync replicas, timeouts, connection failures and so forth.

This is nice, but there is a significant problem when using automated retries. If, for instance, a produce request times out, it might very well be that this is only due to a network issue and in the background, the broker has actually stored the record in the partition log. If we retry, we will simply send the same batch of records again, which could lead to duplicate records in the partition log. As these records will have different offsets, there is no way for a consumer to detect this duplicate. Depending on the type of application, this can be a major issue.

If you wanted to solve this on the application level, you would probably set retries to zero, implement your own retry logic and use a sequence number to allow the consumer to detect duplicates. A similar logic, referred to as idempotent writes, has been added to Kafka with KIP-98, which was implemented in release 0.11.

What are idempotent writes?

Essentially, idempotent writes use a sequence number which is added to each record by the producer to allow the broker to detect duplicates due to automated retries. This sequence number is added to a record shortly before it is sent (more precisely, a batch of records receives a base sequence number, and the sequence number of a record is the base sequence number plus its index in the batch), and if an automated retry is made, the exact same batch with the same sequence number is sent again. The broker keeps track of the highest sequence number received, and will not store any records with a sequence number smaller than or equal to the currently highest processed sequence number.

To allow all followers to maintain this information as well, the sequence number is actually added to the partition log and therefore made available to all followers replicating the partitions, so that this data survives the election of a new partition leader.

In a bit more detail, the implementation is slightly more complicated than this. First, it would imply a high overhead to maintain a globally unique sequence number across all producers and partitions. Instead, the sequence number is maintained per producer and per partition. To make this work, producers will be assigned a unique ID called the producer ID. In fact, when a producer that uses idempotent writes starts, it will send an InitPidRequest to the broker. The broker will then assign a producer ID and return it in the response. The producer stores the producer ID in memory and adds it to all records being sent, so that the broker knows from which producer a record originates. Similar to the sequence number, this information is added to the records in the partition log. Note, however, that neither the producer ID nor the sequence number are passed to a consumer by the consumer API.

How does the broker determine the producer ID to be assigned? This depends on whether idempotent writes are used in combination with transactions. If transactions are used, we will learn in the next post that applications need to define an ID called the transaction ID that is supposed to uniquely identify a producer. In this case, the broker will assign a producer ID to each transaction ID, so that the producer ID is effectively persisted across restarts. If, however, idempotent writes are used stand-alone, the broker uses a ZooKeeper sequence to generate a new producer ID, and if a producer is either restarted or (for instance due to some programming error) sends another InitPidRequest, it will receive a new producer ID. For each partition to which a producer not using transactions writes, the sequence number will start again at zero, so that the sequence number is only unique per partition and producer ID (which is good enough for our purpose).

Another useful feature of idempotent writes is that a Kafka broker is now able to detect record batches arriving in the wrong order. In fact, if a record arrives whose sequence number is higher than the previously seen sequence number plus one, the broker assumes that records got lost in flight or we see an ordering issue due to a retry and raises an error. Thus ordering is now guaranteed even if we allow more than one in-flight batch.

Trying it out

Time again to try all this. Unfortunately, the Kafka Python client that we have used so far does not (yet) support KIP-98. We could of course use a Java or Go client, but to stick to the idea of this little series to use Python, let us alternatively employ the Python client provided by Confluent.

To install this client, use

pip3 install confluent-kafka==1.4.1

Here I am using version 1.4.1 which was the most recent version at the time when this post was written, so you might want to use the same version. Using the package is actually straightforward. Again, we first create a configuration, then a producer and then send records to the broker asynchronously. Compared to the Kafka Python library used so far, there are a few differences which are worth noting.

  • Similar to the Kafka Python library, sends are done asynchronously. However, you do not receive a future when sending, as is the case with the Kafka Python library, but instead define a callback directly
  • To make sure that the callback is invoked, you have to call the poll method of the producer on a regular basis
  • When you are done producing, you have to explicitly call flush to make sure that all buffered messages are sent
  • The configuration parameters of the client follow the Java naming conventions. So the bootstrap servers, for instance, are defined by a configuration parameter called bootstrap.servers instead of bootstrap_servers, and the parameter itself is not a Python list but a comma-separated list passed as a string
  • The base producer class accepts bytes as values and does not invoke a serializer (there is a derived class doing this, but this class is flagged as not yet stable in the API documentation so I decided not to use it)

To turn on idempotent writes, there are a couple of parameters that need to be set in the producer configuration.

  • enable.idempotence needs to be 1 to turn on the feature
  • acks needs to be set to “all”, i.e. -1
  • max.in.flight should be set to one
  • retries needs to be positive (after all, idempotent writes are designed to make automated retries safe)
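
Putting these parameters together, a producer using idempotent writes could be set up roughly as in the following sketch. Broker address, topic name and payload are of course placeholders that you would adapt to your setup.

from confluent_kafka import Producer

def delivery_callback(err, msg):
  # invoked from poll() or flush() once the broker has acknowledged (or rejected) a record
  if err is not None:
    print("Delivery failed: %s" % err)
  else:
    print("Record written to partition %d at offset %d" % (msg.partition(), msg.offset()))

producer = Producer({
  "bootstrap.servers": "broker1:9092",
  "enable.idempotence": True,
  "acks": "all",
  "retries": 5,
  "max.in.flight.requests.per.connection": 1
})

for i in range(10):
  producer.produce("test", value=bytes('{"msg_count": %d}' % i, "utf-8"), callback=delivery_callback)
  producer.poll(0)    # give the client a chance to invoke delivery callbacks

producer.flush()      # block until all buffered records have been delivered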

Using these instructions, it is now straightforward to put together a little test client that uses idempotent writes to a “test” topic. To try this, bring up the Kafka cluster as in the previous posts, create a topic called “test” with three replicas, navigate to the root of the repository and run

python3 python/idempotent_writes.py

You should see a couple of messages showing the configuration used and indicating that ten records have been written. To verify that these records do actually contain a producer ID and a sequence number, we need to dump the log file on one of the brokers.

vagrant ssh broker1
/opt/kafka/kafka_2.13-2.4.1/bin/kafka-dump-log.sh \
  --print-data-log \
  --files /opt/kafka/logs/test-0/00000000000000000000.log

The output should look similar to the following sample output.

Dumping /opt/kafka/logs/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 3001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1589818655781 size: 291 magic: 2 compresscodec: NONE crc: 307611005 isvalid: true
| offset: 0 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 0 headerKeys: [] payload: {"msg_count": 0}
| offset: 1 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 1 headerKeys: [] payload: {"msg_count": 1}
| offset: 2 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 2 headerKeys: [] payload: {"msg_count": 2}
| offset: 3 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 3 headerKeys: [] payload: {"msg_count": 3}
| offset: 4 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 4 headerKeys: [] payload: {"msg_count": 4}
| offset: 5 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 5 headerKeys: [] payload: {"msg_count": 5}
| offset: 6 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 6 headerKeys: [] payload: {"msg_count": 6}
| offset: 7 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 7 headerKeys: [] payload: {"msg_count": 7}
| offset: 8 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 8 headerKeys: [] payload: {"msg_count": 8}
| offset: 9 CreateTime: 1589818655781 keysize: -1 valuesize: 16 sequence: 9 headerKeys: [] payload: {"msg_count": 9}

Here, the third line contains the header of the entire record batch. We see that the batch contains ten records, and we find a producer ID (3001). In each of the records, we also see a sequence number, ranging from 0 to 9.

Transactions

When you read KIP-98, the Kafka improvement proposal with which idempotent writes were introduced, you realize that the main objective of this KIP is not just to provide idempotent writes, but to be able to handle transactions in Kafka. Here, handling transactions does not mean that Kafka somehow acts as a distributed transaction manager, joining transactions of a relational database. It does, however, mean that writes and reads in Kafka are transactional in the sense that a producer can write records within a transaction, and consumers will either see all of the records written as part of this transaction or none of them.

This makes it possible to model scenarios that occur quite often in business applications. Suppose, for instance, you are putting together an application handling security deposits. When you sell securities, you produce one record which will trigger the delivery of the securities to the buyer, and a second record that will trigger the payment that you receive for them. Now suppose that the first record is written, and then something goes wrong, so that the second record cannot be written. Without transactions, the first record would be in the log and consumers would pick it up, so that the security side of the transaction would still be processed. With transactions, you can abort the transaction, and the record triggering the security transfer will not become visible to consumers.

We will not go into details about transactions in this post, but KIP-98 is actually quite readable. I also recommend that you take a look at this well written blog post on the Confluent pages that provides some more background and additional links.

With that, it is time to close this short series on Kafka and Python. I hope I was able to give you a good introduction into the architecture and operations of a Kafka cluster and a good starting point for own projects. Happy hacking!

Learning Kafka with Python – a deep dive into consumers and rebalancing

In the previous posts, we have already used the Python client to implement Kafka consumers. Today, we will take a closer look at the components that make up a consumer and discuss their inner workings and how they communicate with the Kafka cluster.

High level overview of the consumer

Our discussion will be based on the Kafka Python library, which seems to be loosely modeled after the Java consumer which is part of the official Apache Kafka project, so that the underlying principles are the same. These notes are based on version 2.0.1 of the library, the design might of course change in future versions (and has already changed substantially in the past).

Looking at the code, we see that roughly speaking, the consumer consists of three parts – the actual consumer in the package kafka.consumer, the coordinator in the package kafka.coordinator, which is responsible for talking to the group coordinator and for assigning partitions, and the network client in the top-level package which is used by other parts of the library as well. Broken down to the level of modules and classes, the following diagram shows the most important components of the consumer and their relations.

KafkaConsumerComponents

Let us start our discussion with the class on the left hand side of the diagram, the subscription state. This class is used to manage the topics and partitions a consumer has subscribed to as well as the positions of the consumer within these partitions. Note that these positions are not the committed offsets, but are the positions maintained locally (and in-memory) by the consumer that are used to determine the offset that the next fetch will use. Initially, there is no valid position for a newly assigned partition, and the partition is considered fetchable only once a position has been determined.

The second class which is used by the consumer is the fetcher. As the name suggests, this class is in charge of actually fetching data and offsets from the leader of a partition (here, offsets does not refer to committed offsets, but to the valid offsets, i.e. the first and last offset of a partition).

Fetching records from the partition leader typically works asynchronously. As an example, let us consider the method send_fetches. As indicated above, a partition is called fetchable if there is a valid position for it, the partition has not been paused and there are no unfetched records already present in the cache. After creating a list of all fetchable partitions, the send_fetches method then figures out the partition leader and assembles a fetch request. These requests are then sent to the respective partition leader using the client object. This operation returns a future, i.e. a handle which can be used to asynchronously track the progress of the fetch operation. Attached to this future, there is a callback operation. When the records are sent from the partition leader to the consumer, the client object will invoke this callback which will then add the returned records to a queue maintained by the fetcher. From there, it is retrieved when a consumer calls the method fetched_records.

It is in this function where the positions are actually updated, so that the position really reflects the records that have been consumed, not those which have been received by the fetcher but are still in the queue. Note that records are skipped if a partition has become unfetchable in the meantime or if the offset does not match the expected value in the original request.

The following diagram shows a simplified view of how records are fetched (some important details are skipped, for instance the deserialization that takes place when fetched records are removed from the queue and handed over to the consumer).

FetchingRecords

Coordinating group membership and partition assignments

Apart from fetching records, a core responsibility of the consumer is to manage the membership in a consumer group and to handle assigned partitions. This is done by the coordinator. The coordinator communicates with the group coordinator (which is one dedicated broker per consumer group) to trigger the addition and removal of group members and to balance partitions between group members. In addition, the coordinator is responsible for managing committed offsets.

Looking at the source code of the coordinator, we can see how the process of adding members to the group and assigning partitions works. This process, commonly referred to as rebalancing, typically starts when a consumer invokes the poll method of the coordinator. When this happens, the coordinator will first check whether it needs to join (or rejoin) the group, for instance because the consumer was just started. If yes, the processing in ensure_active_group will first prepare the join process, for instance by committing all offsets if auto-commit is enabled and calling the revoke method of all registered rebalance listeners (conceptually, when a rebalancing starts, all existing members will lose ownership of previously handled partitions and consequently stop processing records so that the group coordinator can reassign partitions freely – there is an ongoing effort known as cooperative rebalancing with the objective to change this).

We then wait until there are no more in-flight requests to the coordinator, and then send a JoinGroupRequest to the group coordinator. The group coordinator (broker) will wait until all members have handed in their requests (see below for more on the timeline) and then determine one member to be the group leader. As part of the JoinGroupResponse, every consumer will be informed about the newly elected leader. The group leader will then perform the actual assignment of partitions to group members (using a configurable assignor). Then, all group members send another request to the group coordinator, called the SyncGroupRequest. In this request, the group leader will inform the group coordinator about the defined partition assignments, and in the response to this message, the partition assignments will be distributed to all group members.

Once the SyncGroupResponse has been received, the method ensure_active_group will invoke _on_join_complete which will in turn trigger a call of the on_partitions_assigned method of all registered rebalance listeners. Note that at this point, all exceptions raised by the listener are swallowed, so exceptions should be caught and handled inside the listener.

This is all nice if our own consumer joins a group, but what happens if another consumer joins? This is where the heartbeat thread comes into play. This is a thread which is running in the background and periodically sending heartbeat messages to the group coordinator (with a frequency determined by the parameter heartbeat_interval_ms). If a rebalancing has been initiated by another member joining or leaving, the heartbeat response will have an error flag set, so that the consumer learns about the start of the rebalancing process. It then sets a flag, which will be evaluated during the next call of the coordinator's poll method, which is in turn invoked from the consumer's poll loop. If this flag is set, the coordinator will rejoin the group following the process outlined above.

At this point, timing is vital. If a consumer does not call the poll method for a long period of time, it might miss a rebalancing and will forcefully be removed from the group. This again will lead to errors when the consumer tries to commit offsets, which are difficult to handle and almost inevitably lead to duplicate processing. In general, a consumer should invoke the poll method on a regular basis, and there is again a parameter (max_poll_interval_ms) which determines the maximum allowed time between two subsequent invocations of this method.

Indirectly, this parameter also determines how long the group coordinator will wait for members to join the group (it is sent to the group coordinator as part of the join group request). The following diagram shows the typical sequence of events when a new member joins a group and triggers a rebalancing.

RebalancingProtocolTimeline

The consumer's poll loop

After all these preparations, we are now ready to discuss the poll method of the Kafka consumer. In this method (or rather the private method _poll_once), we first use the coordinator and its poll method discussed above to verify that the consumer is part of a group and has partitions assigned, and to trigger a rebalancing process if needed. Note that if a rebalancing is needed, this call will block, so that the main part of the consumer's poll method is only reached after the rebalancing is done.

Next, we will typically have to update all fetch positions. This happens in several steps.

  • call the method reset_offsets_if_needed of the fetcher. This method will check a flag to see if any offsets need to be reset. If yes, it will retrieve the valid offsets and apply the chosen offset reset strategy
  • if there are still partitions which do not have a valid position, we call the method refresh_committed_offsets_if_needed of the coordinator which will fetch the committed offsets from the group coordinator
  • Then, the method update_fetch_positions of the fetcher is invoked which will set the fetch positions of the partitions in question to the committed value

Back in _poll_once, we then check whether the fetcher has any previously obtained records still in its queue. If yes, we immediately return this data (and at the same time initiate a pre-fetch of the next records). Recall that the process of getting these queued records also triggers the update of the position. Then, new fetches are sent, and we poll the client until we either time out or obtain new records which we then return.

Summarizing, the diagram below displays the (slightly simplified) flow of events in case a consumer calls poll (where some calls indicated in the diagram are not made every time, depending on available fetch positions and committed offsets).

PollOnceCaseOne

From what we have said above, it is now clear that a rebalancing listener is always invoked from within the poll method – which also implies that you should not spend too much time in a rebalance listener and not make any blocking calls.

This completes our short summary of the processing inside the Kafka consumer. With this introduction and using the Java library and the rich comments inside the code, you should now be able to dig deeper into the bits and pieces if needed.

Learning Kafka with Python – implementing a database sink

Very often, either the source or the target of a Kafka based message queue is a classical relational database. Consuming data and using it to update a database table sounds straightforward, but poses a few challenges around reliability and delivery semantics. In this post, we look into two options to realize such an architecture.

The challenge

To illustrate the problem we are aiming to solve, let us suppose that we want to build an application that maintains an account balance. There is a front-end which acts as a Kafka producer and which a customer can use to either deposit money in the account or withdraw money. These transactions are then written into a Kafka topic, and a consumer reads from this topic and updates the balance kept in a relational datastore.

DatabaseSink

Thus the messages stored in the Kafka topic contain transactions, i.e. changes in the account balance, while the database table we need to maintain contains the actual balance. This is an example of what is called stream / table duality in the world of streaming – the event stream is the source of truth and reflects changes, the database contains the resulting state of the world after all changes have been applied and can at any time be reconstructed from the stream.

When we want to implement this pattern, the crucial part of our design will be to make sure that every message in the queue leads to only one update of the balance, so that no transaction is missed and no transaction is processed twice.

Pattern 1: using a message ID and de-duplication

The first pattern we could use to make this work is to achieve de-duplication based on a unique message ID, which ideally is an integer that is increased with every message. The consumer could then store the sequence number of the last processed message in the database, and could thus detect duplicates.

In a bit more detail, this would work as follows. When the producer processes an action, it first retrieves a unique message ID. This message ID could be created using a database sequence, or – if the used database does not allow for this – it could read a sequence number from a table, increment it by one and update the table accordingly. It would then add this sequence number as a key to the Kafka message.

The consumer would store the latest processed sequence number in the database. When it reads a message from Kafka, it uses this number to check whether the message has already been processed before. If not, it updates balance and sequence number in one transaction and then commits the new offset to Kafka.

DatabaseSinkPatternI

Let us see how duplicates are handled in this pattern. If, for some reason, the topic contains two messages with the same ID, the consumer will process the first message, increase the latest processed sequence number in the database and commit the new balance. It will then read the duplicate, compare its sequence number against the latest value, detect the duplicate and simply ignore it. Thus the consumer is able to do a de-duplication based on the sequence number.

Unfortunately, this simple pattern has one major disadvantage – it does not work. The problem is that the order of messages is not guaranteed across partitions. Suppose, for instance, that the producer creates the following messages:

  • Message 1, partition 0
  • Message 2, partition 1
  • Message 3, partition 0

Now it might happen that the consumer processes the messages in the order 1,3,2. Thus the consumer would, after having processed message 3, set the highest consumed sequence number to “3” in the database. When now processing message two, our simple duplicate detection algorithm would then classify this message as a duplicate. Thus, to make this work, it is vital that we store the highest processed sequence number by partition and not across all partitions. We could even create the sequence number per partition, which would also remove a possible bottleneck as creating the sequence number would otherwise effectively serialize the producers.

Also note that, as we need to maintain a “last processed” sequence number per partition in a database, we also need to maintain this table if we add new partitions to our topic or remove partitions.

Alternatively, if there is a message ID which is not ordered and increasing with each message, the consumer could store all processed message IDs in a separate database table to keep track of the messages that have already been processed (which, depending on the throughput, might require some sort of periodic cleanup to avoid that the table grows too big).

If the consumer fails after committing the changed balance to the database, but before committing the offset to Kafka, the same mechanism will kick in, as long as we commit the new balance and the updated value of the last processed message in one database transaction. Thus, duplicates can be detected, and we can therefore rely on the standard mechanisms Kafka offers to manage the offset – we could even read entire batches from the topic and use auto-commit to let Kafka manage the offset.
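
Before turning to the actual scripts from the repository, here is a sketch of what the core of such a consumer could look like. Note that the table layout (a table balance with columns account and amount, and a table last_seq holding the highest processed sequence number per partition), the payload format and the connection parameters are hypothetical and only meant to illustrate the idea – the scripts in the repository differ in the details. The sketch assumes that the producer has added the sequence number as the record key, as described above.

import json
import mysql.connector

db = mysql.connector.connect(host="localhost", user="kafka", password="kafka", database="kafka")

def process(record):
  transaction = json.loads(record.value)
  seq_no = int(record.key)
  cursor = db.cursor()
  # read the highest sequence number processed so far for this partition
  cursor.execute("SELECT seq FROM last_seq WHERE partition_id = %s", (record.partition,))
  row = cursor.fetchone()
  last_seq = row[0] if row is not None else -1
  if seq_no <= last_seq:
    # duplicate - simply ignore the record
    return
  # update balance and last processed sequence number in one database transaction
  cursor.execute("UPDATE balance SET amount = amount + %s WHERE account = %s",
      (transaction["amount"], transaction["account"]))
  cursor.execute("UPDATE last_seq SET seq = %s WHERE partition_id = %s",
      (seq_no, record.partition))
  db.commit()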

Let us now try out this pattern with the code samples from my repository. To be able to run this, you will have to clone my GitHub repository and follow the instructions in my initial post in this series to bring up your local Kafka cluster. Then, make sure that your current working directory is the root directory of the repository and run the following commands which will install the Python package to access a MySQL database and bring up a Docker based installation of MySQL with a prepared database.

pip3 install mysql-connector-python
./db/createDB.sh

This script will start a Docker container kafka-mysql running MySQL, add a user kafka with a default password to it and create a database kafka for which this user has all privileges. Next, let us run a second script which will (re)-create a Kafka topic transactions with two partitions and initialize the database.

./db/reset.sh

Now we run three Python scripts. The first script is the producer that we have already described, which will simply create ten records, each of which describing a transaction. The second script is our consumer. It will

  • subscribe to the transactions topic
  • read batches of records from the topic
  • for each record, update the account balance and the last processed sequence number (in one transaction!)
  • commit offsets in Kafka after processing a batch
  • apply the duplicate detection mechanism outlined above while processing each record

Finally, the third script is a little helper that will (without committing any offsets, so that we can run it over and over again) scan the topic, calculate the expected account balances, retrieve actual account balances from the database and check whether they coincide.

python3 db/producer1.py
python3 db/consumer1.py
python3 db/dump1.py --check

Let us now try to understand how our scripts work if an error occurs. For that purpose, the consumer has a built-in mechanism to simulate random errors between committing to the database and committing offsets to Kafka which is activated using the parameter --error_probability. Let us repeat our test run, but this time we simulate an error with a probability of 20%.

./db/reset.sh && python3 db/producer1.py
python3 db/consumer1.py --error_probability=0.2 --verbose
python3 db/dump1.py --check

You should now see that the consumer processes a couple of messages before our simulated error kicks in, which will make the consumer stop. The check script should detect the difference resulting from the fact that not all messages have been processed. However, when we now restart the script without simulating any errors, we should see that even though there are duplicate records, these records are properly detected and eventually, all records will be processed and the balances will again be correct.

python3 db/consumer1.py --verbose
python3 db/dump1.py --check

Pattern 2: store the consumer offset in the target database

Let us now take a look at an alternative implementation (which is in fact the pattern which you will hit upon first when consulting the Kafka documentation or other sources) – maintaining the offsets in the database altogether. This implementation does not require a sequence number generated by the producer, but is therefore also not able to detect any duplicate messages in case the duplicates originate already in the producer.

The idea behind this pattern is simple. Instead of asking Kafka to maintain offsets for us, the consumer application handles offsets independently and maintains a database table containing offsets and partitions. When a record is processed, the consumer opens a database transaction, updates the account balance and updates the offset in the same transaction. This guarantees that offsets and balances are always in sync.

This sounds simple enough, but there are a few subtleties that need to be kept in mind when this is combined with consumer groups, i.e. if Kafka handles partition assignments dynamically. Whenever a partition is assigned to our consumer, we need to make sure that we position the consumer at the latest committed offset before processing any records. Conversely, when an assignment is revoked, we need to commit the current offsets to make sure that they are not lost. This can be implemented using a rebalance listener which is registered when we subscribe to the topic.
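
A rebalance listener implementing this idea could look roughly like the sketch below. Again, the table and column names (a table offsets with columns topic, partition_id and offset) are hypothetical, and error handling is omitted.

import kafka

class StoreOffsetsRebalanceListener(kafka.ConsumerRebalanceListener):

  def __init__(self, consumer, db):
    self._consumer = consumer
    self._db = db

  def on_partitions_revoked(self, revoked):
    # save the current positions in the database before giving up the partitions
    cursor = self._db.cursor()
    for tp in revoked:
      cursor.execute("REPLACE INTO offsets (topic, partition_id, offset) VALUES (%s, %s, %s)",
          (tp.topic, tp.partition, self._consumer.position(tp)))
    self._db.commit()

  def on_partitions_assigned(self, assigned):
    # position the consumer at the offsets stored in the database
    cursor = self._db.cursor()
    for tp in assigned:
      cursor.execute("SELECT offset FROM offsets WHERE topic = %s AND partition_id = %s",
          (tp.topic, tp.partition))
      row = cursor.fetchone()
      if row is not None:
        self._consumer.seek(tp, row[0])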

DatabaseSinkPatternII

To try this out, run the following commands which will first reset the database and the involved topic, run the producer to create a few messages (this time without the additional sequence number) and then run our new consumer to read the messages and update the database.

./db/reset.sh && python3 db/producer2.py
python3 db/consumer2.py 
python3 db/dump2.py --check

The last command, which again checks the updated database table against the expected values, should again show you that expected and actual values in the database match.

It is instructive to try a few more advanced scenarios. You could, for instance, run the producer and then start a consumer which reads the first set of messages created by the producer (use the switch --runtime=3600 to make this consumer run for one hour). Then, start a second consumer in a separate terminal window and observe the rebalancing that occurs. Finally, run the producer again and verify that the partition assignment worked and both consumers are processing the messages in their respective partition. And again, you can simulate errors along the way and see how the consumer behaves.

Learning Kafka with Python – consuming data

We now understand how Kafka producers add data to partitions. So let us move on and take a look at consumers – how they operate, how they are configured and how different levels of reliability and delivery guarantees can be achieved.

Consumer groups

In the previous post on producers, we have seen that the interaction between a producer and a Kafka broker is rather simple. Basically, producers request metadata to obtain data on partitions and leading brokers and then send records to the partition leader. The Kafka broker does not keep track of a producer's state, and producers can actually come and go without Kafka even noticing it (this is a bit different when transactions are used, as in this case, the broker needs to keep track of the producer's state as well, but this is beyond the scope of this post).

For consumers, the situation is different. The main reason for this different design is that while a producer determines itself to which partition data is written, a consumer typically lets Kafka make this decision. In this programming model, Kafka distributes the available partitions to the available consumers, trying to balance the load evenly. If a new consumer appears, Kafka will assign partitions to it, and if a consumer goes down, Kafka will re-assign these partitions to one of the remaining consumers. To make this work, Kafka needs to keep track of the state of a consumer (in fact, a consumer is expected to send periodic heartbeats so that Kafka can detect when a consumer goes down, and the state of a consumer group is tracked by a dedicated broker, the group coordinator).

To better understand how this works, we first have to understand consumer groups. Logically, a consumer group is very similar to an application – it is a logical entity reading data from a topic. If, for instance, you are using Kafka to distribute instrument master data in a securities processing application, there will typically be different application components that need this data – say a trading frontend, a settlement module or a tax processing module. So each of these application components could be set up as a consumer group in Kafka, so that they all obtain records from the instrument master data topic independently, similar to the pub/sub semantics of a traditional messaging system.

To increase scalability and fault tolerance, there can be many consumers inside a consumer group, but Kafka will try to make sure that within a consumer group, every message is delivered to only one consumer.

ConsumerGroups

To achieve this, Kafka will assign partitions in a topic to consumers within a consumer group. To make sure that each message is only processed once by one consumer group, each partition can be assigned to only one consumer, but if there are more partitions than consumers, a single consumer can read from more than one partition (so scaling the number of consumers beyond the number of partitions will lead to idle consumers).

ConsumerGroupsPartitions

It is worth mentioning that this is not the only programming model supported by Kafka. Instead of letting Kafka determine the assignment of partitions to consumers, consumers can also subscribe directly to a partition and thus define their own assignment. In this case, consumers need to implement their own mechanisms to detect a change in the number of partitions or to rebalance the load if a consumer goes down. This can, however, be useful if the number of partitions is constant and lost consumers are immediately replaced by some sort of restart mechanism. If you want to take a closer look at how exactly Kafka manages the assignment of partitions to consumers in a group, take a look at this excellent blog post on the Confluent web site or this page on the Confluent Wiki.

Maintaining the offset

The next problem we have to solve is the maintenance of the offset. A traditional messaging system typically makes sure that a message is only delivered once. With Kafka, this task is left to the consumer (this is why some people refer to this model as the “dumb broker – smart consumer” model). In fact, the low level API “FETCH” call of the Kafka protocol expects that a consumer specifies the offset of the record (batch) it wants to read. So the consumer needs to know which offsets it has already processed to make sure that all records are read and that no record is processed twice.

This is not an easy task and subject to race conditions. Suppose, for instance, we decide to store the offset in a separate database, and our processing logic is (in a hopefully readable pseudo-code)

offset = db.read_offset()
while True:
  record = read_record(offset)
  process(record)
  offset = offset + 1
  db.store_offset(offset)

Now suppose that this consumer fails after processing the record, but before writing the updated offset into the database. When we now restart the consumer, it will read the old offset from the database and process the last record twice. If, conversely, we change the order and commit the new offset before processing the record, we would miss a record if the consumer dies between these two steps.

Instead of persisting the offset yourself, you can also ask Kafka to do this for you. When making use of this option, Kafka will store your offset in a dedicated topic. A consumer can either explicitly commit the offset to this topic, or can use auto-commit, which simply means that Kafka will automatically commit every few seconds (which, of course, leads to duplicate processing if this interval is, say, 5 seconds and the consumer dies 4 seconds after the last commit). In a later post, we will look into transactional writes, which even allow exactly-once delivery as long as no other data stores are involved.

Creating and using a KafkaConsumer

Let us now see how we can create and use a consumer with the Python Kafka API and how the consumer is configured.

First, we need to create a consumer object. When creating a consumer, there are three parameters that we need to provide: the topic from which we want to read data, the ID of the consumer group that the consumer is part of (an arbitrary string, required at least if we plan to use the automatic assignment of partitions and / or want Kafka to store offsets for us), and a list of bootstrap servers. So a code snippet creating a consumer could be as follows.

import kafka
consumer=kafka.KafkaConsumer("test", 
         group_id="my_group",
         bootstrap_servers="broker1:9092")

Once we are done with a consumer, we should always clean up again by calling consumer.close() so that the consumer can properly leave the group.

When using SSL to connect to the broker, you will again have to provide additional parameters when building the consumer, as we have done it for the producer.

As for the producer, a consumer can also be configured with custom deserializers. If, for instance, we use JSON as a serializer, as we have demonstrated in the previous post on building a producer, we now need to provide a matching deserializer that converts a byte stream back into the target format used by the application. As for the producer, deserializers for keys and payloads can be supplied using the additional configuration parameters key_deserializer and value_deserializer.
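
As an illustration, here is a sketch of a consumer using a JSON deserializer for the payload, assuming that the producer has serialized the values using json.dumps and UTF-8 encoding.

import json
import kafka

def deserialize(data):
  # convert the raw bytes received from the broker back into a Python object
  return json.loads(data.decode("utf-8"))

consumer = kafka.KafkaConsumer("test",
         group_id="my_group",
         bootstrap_servers="broker1:9092",
         value_deserializer=deserialize)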

As mentioned above, one option to deal with offsets is to leave the processing to Kafka and to ask Kafka to automatically commit offsets for us. This is in fact the default behavior, and controlled by the following parameters.

  • enable_auto_commit – this is a boolean flag which tells Kafka whether we want to automatically commit offsets, and defaults to true
  • auto_commit_interval_ms – this specifies the interval at which Kafka will commit offsets. The default is five seconds, which implies that in the worst case, the messages processed during the last five seconds will be consumed twice if your consumer fails shortly before a commit
  • auto_offset_reset – this parameter determines from which offset Kafka should start processing if no valid offset can be found. This clearly happens when we start the consumer for the first time, but can also happen if messages are deleted or are lost. If we set this to “earliest”, Kafka will start the processing at the first available offset. If we use “latest”, it will start processing at the end of the log, i.e. with the next message that will be added to the log. The default is “latest”
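
In code, these parameters are simply passed when creating the consumer, for instance as in this short sketch.

import kafka

consumer = kafka.KafkaConsumer("test",
         group_id="my_group",
         bootstrap_servers="broker1:9092",
         enable_auto_commit=True,
         auto_commit_interval_ms=5000,
         auto_offset_reset="earliest")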

Before we can read any data, we have to subscribe to a topic. When creating the consumer, we already refer to a topic, and in fact, the consumer will automatically subscribe to this topic. It is also possible to manually subscribe. This is typically done when you want to add a rebalance listener to be informed about changes in the set of assigned partitions. A rebalance listener is any class derived from kafka.ConsumerRebalanceListener which is passed as argument to the subscribe call. Whenever a partition assignment is made or revoked, Kafka will then call the corresponding method of the listener.

consumer.subscribe(TOPIC, 
    listener=MyConsumerRebalanceListener())

When an application wants to manually store offsets, for instance in a database, it can use this mechanism and / or the method consumer.assignment() to keep track of the records assigned to it. Note that, as explained in the source code comments of the listener class, Kafka will first invoke the on_partitions_revoked method of all listeners before calling any of the on_partitions_assigned methods. These handlers will be invoked from the polling loop, i.e. only when you pass control to the consumer by reading data from it, not in a separate thread (we will learn more about the exact mechanics of this process in a separate future post).
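
A minimal listener – one possible implementation of the MyConsumerRebalanceListener referenced above – could simply log the partitions being revoked and assigned, as in this sketch.

import kafka

class MyConsumerRebalanceListener(kafka.ConsumerRebalanceListener):

  def on_partitions_revoked(self, revoked):
    print("Partitions %s revoked" % [str(tp) for tp in revoked])

  def on_partitions_assigned(self, assigned):
    print("Partitions %s assigned" % [str(tp) for tp in assigned])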

Now let us see how we can actually read data from a topic. The library offers two options to do this. First, we can simply invoke the poll method of the consumer object, which will return a batch of records. Alternatively, and more “pythonish”, we can treat the consumer object as an iterator and simply loop over it to get one record at a time.

Note that some methods of the consumer can block as they are waiting for responses from the server. As the consumer is not thread-safe and should in general not be invoked from outside the polling loop, it is not advisable to call the consumer's methods from separate threads. My experience is that it can lead to problems if a signal handler, for instance, invokes methods of the consumer to shut down the consumer. Instead, the handler should only set a stop flag, while all methods of the consumer object are invoked in the polling loop.

stop = False

def signal_handler(signal, frame):
  global stop
  stop = True

while not stop:
  for record in consumer:
    .....
consumer.close()

Let us now discuss different options to commit offsets. We have already seen that the default is auto-commit, which implies that Kafka will commit automatically every 5 seconds. When using this option, we can guarantee that all messages will be read at least once, but need to be prepared to receive messages more than once. If we need full control over the process of committing offsets, we need to disable auto-commit by setting enable_auto_commit to false.

At this point, it is important to remember that the Kafka client requests data from the broker in batches. If we ask the client to commit the offset, it will commit the entire batch. It therefore does not make sense to commit once during every loop iteration of the pseudo-code above, but once at the end of the batch. As the iterator interface of the consumer object makes it difficult to determine when a batch has ended, it is easier to use the poll method of the consumer. This method returns a dictionary whose keys are TopicPartition objects, i.e. named tuples describing a combination of topic and partition, and whose values are lists of records.

When using manual commits, we again have several choices. First, we can commit after every record. In this way, we will have at most one duplicate in case of an error, but create an additional overhead and reduce our throughput significantly. Alternatively, we can use a batch size greater than one and commit after each batch. This will be more efficient, but if the processing fails in the middle of the batch, we will re-read the first few records in the batch when we restart and thus process records twice.
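
Sticking to the approach of committing once per batch, a poll loop with manual commits could look like the following sketch, assuming that auto-commit has been disabled when creating the consumer and that stop and process are defined as in the snippets above.

while not stop:
  batch = consumer.poll(timeout_ms=1000)
  # batch is a dictionary mapping TopicPartition named tuples to lists of records
  for tp, records in batch.items():
    for record in records:
      process(record)
  if batch:
    # commit the offsets of the records returned by the last poll
    consumer.commit()
consumer.close()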

Trying it out

Let us now see how this works in practice. If you have cloned my GitHub repository and installed Kafka as described in my previous post, you are ready to run some examples that are part of the repository and located in the python subdirectory. First let us delete and re-create the topic that we have already used for our producer tests by running the following commands on the lab PC (after changing to the repository root directory)

./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --topic test \
  --delete
./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --topic test \
  --create \
  --partitions 2 \
  --replication-factor 3

Next, we can create 10 messages in this topic by running the producer that we have already used in the previous post.

python3 python/producer.py --create_keys

We can now run our consumer to read the messages that we have just written. To do this, simply enter

python3 python/consumer.py 

Looking at the output, we see that the first attempt to poll triggers a partition reassignment. First, the coordinator will revoke the existing group assignments for all group members. Then it will assign the existing two partitions to our consumer, as this is the only consumer in the group, so that our listener is called. As this is the first read, there are no committed offsets yet, and as we have set auto_offset_reset to “earliest”, we start our read at position zero (the first offset).

We now start to read records from the log. In a second terminal window, we can inspect the currently stored offsets.

./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --group test-group \
  --describe

We should now see that Kafka has assigned our consumer to the two partitions and has recorded the updated offsets. As we have read every record once, the offsets should now be identical to the last read position. If you run this command quickly after starting the consumer, you should even be able to see that the automated commit only takes place after a couple of seconds.

When you now stop the consumer by hitting Ctrl-C and run it again, it will not print any new records, as it will restart at the committed offsets. To re-read our messages, we will have to reset the offsets. There are two ways to do this. You can either run

./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --group test-group \
  --topic test \
  --reset-offsets \
  --to-earliest \
  --execute 

or use our consumer, which has a switch --reset instructing it to only reset the offsets without reading any records. In both cases, we should now be ready for another test. This time, we disable auto-commit and use manual commits.

python3 python/consumer.py --disable_auto_commit

You should now see that the messages are processed once again, and that the offsets will again be committed, though this is triggered by our explicit calls to consumer.commit() this time.

Next, let us try what happens if we do not commit any offsets at all. Our test client supports this by setting the flag --no_commit

python3 python/consumer.py --reset
python3 python/consumer.py --no_commit
python3 python/consumer.py --no_commit

As expected, the second and third invocation both return the full set of data, as the offsets are never committed and the third invocation therefore starts at the same point at which the second invocation started.

Finally, it is instructive to see how several consumers interact. To see this, first reset all offsets again. Then, open a second terminal, start the consumer in the first terminal and then start a second consumer in the second window. The output should show you that

  • Initially, the first consumer will start to process both partitions
  • When the second consumer is started, the partitions will be revoked, and the corresponding listener is called for both consumers
  • Then, each of the two partitions will be assigned to one of the two consumers

Both consumers should now wait for data on their respective partition. If you now run the producer again to generate ten additional messages, you should nicely see that both consumers receive messages for their respective partitions in parallel.

This completes our discussion of consumers for the time being. There are a couple of points that we have not yet explored (like the manual assignment of partitions, different options for timeouts, the heartbeat thread which periodically sends a heartbeat to the Kafka group coordinator or consumers without consumer groups), but most of this is readily accessible in the Kafka documentation. In the next post, we will look at some patterns to read data from a Kafka topic and use it to maintain state in a relational database.

Learning Kafka with Python – producing data

As proud owners of a brand new Kafka installation, we are now ready to explore how applications interact with Kafka. Today, we will look at producers and understand how they write data to Kafka.

Basic design considerations

At first glance, writing data to Kafka sounds easy – connect to a Kafka broker and submit a message. However, there are some basic design considerations that are relevant when building a Kafka producer.

First, we have seen that Kafka stores the data in a topic in multiple partitions, and that each partition has a leader which is responsible for writing messages into the partition. Thus a producer needs to determine to which partition a message should be written and contact the responsible leader for this partition.

Defining the mapping of messages to partitions can be crucial for the reliability and scalability of your application. Partitions determine how the application can scale horizontally, and we will learn later that partitions also determine to which extent consumers can scale. In addition, Kafka guarantees message order only within a partition. Specifically, if message A is written to a partition before message B is written to the same partition, message A will receive a lower offset than B and will be read first by a (well behaving) consumer. This is no longer true if messages A and B are written to different partitions. Think of partitions as lanes on a highway – there is no guarantee that two cars entering the highway in a certain order but in different lanes will arrive at the destination in the same order.

Often, you will want to use a business entity to partition your data. If you are building a customer facing application, you might want to partition your data by customer group, if you are building a securities processing application the financial instrument might be a good partition criterion, if you are maintaining accounts then the account number might be a good choice and so forth. In other cases, where ordering is not important, you might go for a purely technical criterion.

The next fundamental question we have to figure out is when a message is considered to be successfully written. When the broker has received it? Or when the leader has written the message? Or should we wait until all followers have successfully stored the message? And what if a follower lags behind – should we stop writing messages until the follower has recovered or move on, accepting that we have lost one follower without knowing whether it will recover at a later time?

Kafka does not give a definitive answer to all these questions, but leaves you a choice – put differently, when you create a producer, you can specify its behavior using a variety of options. So let us now see how this is done in Python.

The producer object

Let us now see how a producer can be created using Python. If you have not yet done so, please install the Kafka Python library to be able to run the examples.

pip3 install kafka-python

This series uses version 2.0.1 of the library; if you want to use exactly that version, you need to specify it as usual, i.e. run

pip3 install kafka-python==2.0.1

To send messages to Kafka, the first thing we need to do is to create a producer object, i.e. an instance of the class kafka.KafkaProducer. The init-method of this class accepts a large number of arguments, but in the most straightforward case, there is exactly one argument bootstrap_servers. This argument is a list of listener URLs, for instance 10.100.0.11:9092, which the producer will use to make an initial connection to a Kafka broker. This list does not need to contain all brokers, in fact one entry will do, but the producer will use this broker to obtain other brokers if needed. It is a good idea to list at least two or three brokers here, in case one broker is temporarily unavailable. So creating a producer could look like this.

import kafka
producer=kafka.KafkaProducer(bootstrap_servers=["broker1:9092", "broker2:9092"])

When started, a Producer will create a separate sender thread which will asynchronously send messages to the brokers. In addition, it will create an internal client which holds the actual connections to the Kafka cluster.

When using an SSL listener, we need a few additional configuration items. Specifically, we need to add the following named parameter when creating a KafkaProducer

  • ssl_cafile – this is the location of a CA certificate that the client will use to verify the certificate presented by the server
  • ssl_certfile – this is the location of the client certificate that the client will in turn present to the server when the server requests a certificate
  • ssl_keyfile – the key matching the client certificate

In order to be a bit more flexible when it comes to connecting to different setups, the code examples that we will use in this series read the list of brokers and the SSL configuration from a YAML file config.yaml that the installation script will create in the subdirectory .state of the repository directory. All test scripts accept a parameter --config that you can use to overwrite this default location, in case you want to use your own configuration.

Once we are done using a producer, we should close it using producer.close() to clean up.

Keys and partitions

Once we have a producer in our hands, we can actually start to send messages. This requires only two parameters: the topic (a string) and the payload of the message (a sequence of bytes).

producer.send("test",value=bytes("hello", "utf-8"))

Note that this will create a topic “test” if it does not exist yet, using default values specified in the server configuration (server.properties), so be careful when using this, as the default configuration might not be what you want (you can also turn this feature off by setting auto.create.topics.enable to false in server.properties).

Now you might remember from my previous post that a record in Kafka actually consists of a payload and a key. Here, we do not specify a key, so the key will remain empty. But of course, you can define a key for your record by simply adding the named parameter key to the method invocation, like this.

producer.send("test",
        value=bytes("hello", "utf-8"),
        key=bytes("mykey", "utf-8"))

What about partitions? The low-level protocol that a Kafka broker understands expects the client to send a PRODUCE request containing a valid partition ID, so it is up to the client to take this decision. The application programmer can either decide to explicitly specify a partition ID (an integer) as an optional parameter to the send method, or let the framework take the decision. In the latter case, a so-called partitioner is invoked which, based on the value of the key, selects a partition to write to.
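
As an illustration, a record could – as a sketch – be directed explicitly to partition 0 like this.

producer.send("test",
        value=bytes("hello", "utf-8"),
        partition=0)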

An application can set the configuration item partitioner when creating a producer to define a custom partitioner (which is simply a callable object that the producer will invoke). If no partitioner is specified, the default partitioner will be used, which implements the following logic.

  • If no key is given, the default partitioner will simply distribute the messages randomly across the available partitions
  • If a key is provided, a hash value of the key will be computed (using a so-called MurmurHash), which will always be an integer. The value of this hash (more precisely, of its last 31 bits) modulo the number of partitions will then determine the partition to use

The important thing to keep in mind is that if you do not provide a key, your message will end up in a random partition. If you do provide a key, then Kafka will guarantee that messages with the same key will go to the same partition and hence be processed in order.
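
To make this a bit more concrete, here is a sketch of a custom partitioner. In kafka-python, the partitioner is called with the already serialized key, the list of all partitions and the list of currently available partitions, and has to return the partition to use. The simple CRC32 based hashing below is only an example and differs from the murmur hash used by the default partitioner.

import zlib
import kafka

def my_partitioner(key_bytes, all_partitions, available_partitions):
    # in this sketch, records without a key all go to the first partition
    if key_bytes is None:
        return all_partitions[0]
    # otherwise, hash the key and take the result modulo the number of partitions
    return all_partitions[zlib.crc32(key_bytes) % len(all_partitions)]

producer = kafka.KafkaProducer(
    bootstrap_servers=["broker1:9092"],
    partitioner=my_partitioner)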

Serialization

In our examples so far, we have passed a sequence of bytes to the send method, both for the key and the value. This is the format that the low-level protocol expects – at the end of the day, keys and values are sent over the wire as a sequence of bytes, and stored as a sequence of bytes.

In many applications, however, you will want to store more complex data types, like JSON data or even objects. To be able to do this with Kafka, you will have to convert your data into a sequence of bytes when sending the data, a process known as serialization.

When creating the producer, you can specify your own serializers for keys and payloads via the named parameters key_serializer and value_serializer. Here, a serializer can either be a function which accepts whatever input format you prefer and returns a sequence of bytes, or an instance of the class kafka.serializer.Serializer which has a serialize method that the framework will invoke.

Suppose for instance you wanted to serialize JSON data. Then, you need to provide a serializer which accepts a JSON object and returns a sequence of bytes. For that purpose, we can use the standard json.dumps method to first produce a string, and then encode the string using e.g. UTF-8 to obtain a sequence of bytes. Thus your serializer would look something like

import json

def serialize(data):
    return bytes(json.dumps(data), "utf-8")

and when creating the producer, the call would be something like

producer=kafka.KafkaProducer(..., 
   value_serializer=serialize, ...)
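
With this serializer in place, you can then – as a sketch – pass a Python dictionary directly to the send method, and the framework will invoke the serializer before the record is put on the wire.

producer.send("test", value={"account": 12345, "amount": 100.0})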

Choosing a reasonable serializer is an important design choice. As Kafka topics are designed to be durable objects, you need to think about things like versioning when the serialization format changes as you release new features, and obviously all components of a system need to use matching serializers and deserializers to be able to exchange data. Many Kafka projects actually use third-party serialization frameworks, like Apache Avro or Google’s protobuf.

Acknowledgements

So far, we have seen how we can send messages using the send method of a KafkaProducer object. But in reality, you of course want to know whether your message was successfully sent and stored by Kafka.

This leads us to the question at which point a new record can be considered to be committed to a Kafka cluster, i.e. stored and available for consumers. Before getting into this, however, we first have to understand the notion of an in-sync replica.

Recall that Kafka replication works by designating a leader for a partition and zero, one or more followers which constantly ask the leader for new records in the partition and store them in their own copy of the partition log. As the replicas read the records from the leader, the leader knows which records have been delivered to which follower. The leader can therefore determine whether a replica is in-sync or out-of-sync, the latter happening if a follower fails to retrieve the latest messages within a defined time frame.

Having enough in-sync replicas is vital for reliability. If a leader goes down, Kafka has to elect a new leader from the set of available replicas. Choosing an out-of-sync replica as the new leader would promote a replica that has not yet replicated all messages sent to the previous leader to the new source of truth, and would thus result in a loss of records. In some situations, you might still opt to allow this, which Kafka does if the parameter unclean.leader.election.enable is set to true in the broker configuration.

[Diagram: in-sync and out-of-sync replicas]

Now let us come back to the question of when a message sent by a producer will be considered committed. Again, Kafka offers you a choice, governed by the value of the parameter acks of a producer.

  • When acks = 1 (the default), a message will be considered committed once the leader acknowledges the message. Thus Kafka guarantees that a committed message has been added to the partition log on the leader, but not that it has already been written to one or even all followers. Note that, as the leader might cache the record in memory, this can lead to data loss if the leader goes down after acknowledging the message, but before a follower has copied it
  • When acks = -1 (all), a record will be considered committed only once the leader and in addition all in-sync replicas have acknowledged receipt of the message. As long as you have enough in-sync replicas, this gives you a strong guarantee that the message is available on several nodes and thus data loss has become very unlikely
  • Finally, a value of acks = 0 means that the message will be considered committed once it has been sent over the network, regardless of any acknowledgement from the leader or a follower. This obviously is a very weak guarantee and only reasonable if you have a strong focus on throughput and can live with a loss of (potentially many) records

When using acks = all, the number of in-sync replicas is of course vital. To illustrate this, let us assume that you have configured a topic with three replicas, but both followers have become out-of-sync. Now a message will be committed once the leader has written it, meaning that if the leader is lost, the record will be lost as well. To avoid such a scenario, you can set the server property min.insync.replicas. This number determines how many replicas (including the leader) need to be in-sync in order to still accept new messages. Thus if you use, for instance, a topic with a replication factor of three and min.insync.replicas=2 in combination with acks=-1, then Kafka will guarantee that a message is only reported as committed once the leader and at least one follower have received the record.
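
If you want to experiment with this, you could, as a sketch, use the admin client contained in the kafka-python library to create a topic with this setting (the topic name below is of course just an example).

import kafka.admin

admin = kafka.admin.KafkaAdminClient(bootstrap_servers=["broker1:9092"])
topic = kafka.admin.NewTopic(
    name="reliable-test",
    num_partitions=2,
    replication_factor=3,
    # require at least two in-sync replicas before a write is considered committed
    topic_configs={"min.insync.replicas": "2"})
admin.create_topics([topic])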

[Diagram: acknowledgement settings]

Finally, there is one more parameter that is important for the reliability of a producer – max_in_flight_requests_per_connection. A request (which typically contains more than one record to be written) will be considered as in-flight as long as no result – either an acknowledgement or an error – has been received from the broker. If this parameter (which defaults to 5!) is set to a value greater than one, this implies that the producer will not wait for an acknowledgement before sending the next batch. In combination with retries, this can imply that the order in which messages are added to the log is not identical to the order in which they have been sent, and if the producer goes down, in-flight messages might be lost if the broker is not able to process them. Thus set this to one if you need strong guarantees on at-least-once delivery and ordering.
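
Putting these parameters together, a producer geared towards reliability rather than throughput could, as a sketch, be created like this.

producer = kafka.KafkaProducer(
    bootstrap_servers=["broker1:9092", "broker2:9092"],
    acks="all",                                # wait for all in-sync replicas to acknowledge
    retries=5,                                 # allow a few automatic retries
    max_in_flight_requests_per_connection=1)   # preserve ordering in the presence of retries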

Retries and error handling

Finally, the last important design decision that you need to take when writing a producer is how to deal with errors.

The send method that we are using hands messages over to the actual sender asynchronously and returns immediately, so it does not directly tell us whether the record was successfully committed. Instead, the send method returns a handle which can later be used to retrieve the status of the message – a so-called Future, or, more precisely, an instance of the subclass FutureRecordMetadata. Once we have this object, we can call its get method with a timeout in seconds to wait for the request to complete. If the request was successful, this method returns the record metadata (containing, among other things, the partition and the offset at which the record was stored), otherwise it raises an exception of type kafka.errors.KafkaError. Alternatively, you can also specify callback functions for successful and failed sends.
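
In code, a simple synchronous send with error handling could look like this sketch. Instead of blocking on get, you could also register callbacks using the add_callback and add_errback methods of the future.

import kafka.errors

future = producer.send("test", value=bytes("hello", "utf-8"))
try:
    # wait at most ten seconds for the broker to acknowledge the record
    metadata = future.get(timeout=10)
    print("Record stored in partition %d at offset %d"
          % (metadata.partition, metadata.offset))
except kafka.errors.KafkaError as error:
    print("Could not send record: %s" % error)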

Note that the producer also has an option to automatically retry failed messages, which can be configured by setting the parameter retries to a value different from zero. In general, however, you should be careful with this as it might conflict with ordering, see the discussion of in-flight requests above.

Testing our producer

After all this theory, it is now time to test our producer. I assume that you have followed my previous post and installed Kafka in three virtual nodes on your PC. Now navigate to the root of the repository and run the following command to create a test topic.

./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(./python/getBrokerURL.py) \
  --command-config=.state/client_ssl_config.properties \
  --create \
  --topic test \
  --replication-factor 3 \
  --partitions 2 

Let us see what this command is doing. The script that we run, kafka-topics.sh, is part of the standard Kafka admin command line tools that are bundled with the distribution. In the second line, we invoke a little Python script that evaluates the configuration in YAML format which our installation procedure has created to determine the URL of a broker, which we then pass to the script. In the third line, we provide a Java properties file containing the SSL parameters to connect to our secured listener.

The remaining switches instruct the tool to create a new topic called “test” with a replication factor of three and two partitions (which, of course, will fail if you have already created this topic in the previous post).

Next, we will run another tool coming with Kafka – the console consumer. This is a simple consumer that subscribes to a topic and dumps all records in this topic to the console. To run it, enter

kafka/bin/kafka-console-consumer.sh   \
   --bootstrap-server $(./python/getBrokerURL.py)   \
   --consumer.config .state/client_ssl_config.properties \
   --from-beginning \
   --topic test 

Now open an additional terminal, navigate to the root of the repository and run the producer.

python3 python/producer.py

This should print the producer configuration used and the number of messages produced, plus timestamps and the number of seconds and microseconds it took to send all messages. In the first terminal window, in which the consumer is running, you should then see these messages flicker by.

Now let us try out a few things. First, let us create 10000 messages with a set of configurations promising the highest throughput (acks=0, fully asynchronous send, no keys provided, five requests in flight).

python3 python/producer.py \
  --messages=10000 \
  --ack=0

On my PC, producing these 10000 messages takes roughly half a second, i.e. our throughput is somewhere around 20,000 messages per second, without any tuning (of course the results will heavily depend on the machine on which you are running this). Next, we again produce 10000 messages, but this time, we use very conservative settings (acks=all, only one message in flight, wait for reply after each request, create and store keys and use them to determine the partition).

python3 python/producer.py \
  --messages=10000 \
  --ack=-1 \
  --max_in_flight_requests_per_connection=1 \
  --wait \
  --create_keys

Obviously, this will be much slower. On my PC, this took roughly 17 seconds, i.e. it is slower than the first run by a factor of about 25. This hopefully illustrates nicely that Kafka leaves you many choices for trade-offs between performance and availability. Use this freedom with care and make sure you understand the consequences that the various settings have, otherwise you might lose data!

Putting it all together – the send method behind the scenes

Having seen a producer in action, it is instructive to take a short look at the source code of the Python implementation of KafkaProducer, specifically at its send method. First, the partitions of the topic are retrieved, either from cached metadata or by requesting updated metadata from the server. Then, the key and value are serialized, and the partitioner is invoked which determines the partition to which we write the record.

Next, instead of directly sending the record to the broker, it is appended to an internal buffer using an internal helper class called a RecordAccumulator. If the accumulator signals that the buffer is full, the sender thread is triggered which will then actually transmit the entire batch to the Kafka broker. Finally, the future object is returned.

[Diagram: sending data behind the scenes]

This completes our discussion of Kafka producers. In the next post, we will learn how we can consume data from Kafka and how consumers, producers and brokers play together.

Learning Kafka with Python – installation

In this post, we will install a Kafka cluster with three nodes on our lab PC, using KVM, Vagrant and Ansible.

The setup

Of course it is possible (and actually easy, see the instructions in the quickstart section of the excellent Apache Kafka documentation) to run Kafka as a single-node cluster on your PC. The standard distribution of Kafka contains all you need for this, even an embedded ZooKeeper and a default configuration that should work out-of-the-box. Some of the more advanced topics that we want to try out in the course of this series, like replication and failover scenarios, do however only make sense if you have a clustered setup.

Fortunately, creating such a setup is comparatively easy using KVM virtual machines and tools like Vagrant and Ansible. If you have never used these tools before – do not worry, I will show you in the last section of this post how to download and run my samples. Still, you might want to take a look at some of the previous posts that I have written to get a basic understanding of Ansible and Vagrant with libvirt.

In our test setup, we will be creating three virtual machines, each sized with two vCPUs and 2 GB of memory. On each of the three nodes, we will be running a ZooKeeper instance and a Kafka broker. Note that in a real-world setup, you would probably use dedicated nodes for the ZooKeeper ensemble, but we co-locate these components to keep the number of involved virtual machines reasonable.

Our machines will be KVM machines running Debian Buster. I have chosen this distribution primarily because it is small (less than 300 MB download for the libvirt vagrant box) and boots up blazingly fast – once the box has initially been downloaded to your machine, creating the machines from scratch takes only a bit less than a minute.

To simulate a more realistic setup, each machine has two network interfaces. One interface is the default interface that Vagrant will attach to a “public” network and that we will use to SSH into the machine. On this interface, we will expose a Kafka listener secured using TLS, and the network will be connected to the internet using NATing. A second interface will connect to a “private” network (which, however, will still be reachable from the lab PC). On this network, we will run an unsecured listener and Kafka will use this interface for inter-broker communication and connecting to the ZooKeeper instances.

[Diagram: test setup with three virtual machines and two networks]

Using the public interface, we will be able to connect to the Kafka brokers via the TLS secured listener and run our sample programs. This setup could easily be migrated to your favored public cloud provider.

Installation steps

The installation is mostly straightforward. First, we set up networking on each machine, i.e. we bring up the interfaces, assign IP addresses and add the host names to /etc/hosts on each node, so that each hostname resolves to the private interface IP address. We then install ZooKeeper from the official Debian packages (which, at the time of writing, is version 3.4.13).

The ZooKeeper configuration can basically be taken over from the packaged configuration, with one change – we need to define the ensemble by listing all ZooKeeper nodes, i.e. by adding the section

server.1=broker1:2888:3888
server.2=broker2:2888:3888
server.3=broker3:2888:3888

to /etc/zookeeper/conf/zoo.cfg. On each node, we also have to create a file called myid in /etc/zookeeper/conf/ (which is symlinked to /var/lib/zookeeper/myid) containing the unique ZooKeeper ID – here we just use the last character of the server name, i.e. “1” for broker1 and so forth.

Once ZooKeeper is up and running, we can install Kafka. First, as we want to use TLS to secure one of the listeners, we need to create a few keys and certificates. Specifically, we need

  • A self-signed CA certificate and a matching key pair
  • A key-pair and matching server certificate for each broker, signed by the CA
  • A client key-pair and a client certificate, signed by the same CA (we could of course use a different CA for the client certificates, but let us keep things simple)

Creating these certificates with OpenSSL is straightforward (if you have never worked with OpenSSL certificates before, you might want to take a look at my previous post on this). We also need to bundle keys and certificates into a key store and a trust store for the server, and similarly a key store and a trust store for the client (where the keystore holds the credentials, i.e. keys and certificates, presented by the server or client, respectively, whereas the trust store holds the CA certificate). For the server, I was able to use a PKCS12 key store created by OpenSSL. For the client, however, this did not work, and I had to use the Java keytool to convert the PKCS12 keystore to the JKS format.

After these preparations, we can now install Kafka. I have used the official tarball from the 2.13-2.4.1 release downloaded from this mirror URL. After unpacking the archive, we first need to adapt the configuration file server.properties. The items we need to change are

  • On each broker, we need to set a broker ID – again, I have used the last character of the hostname for this. Note that this implies that broker ID and ZooKeeper ID are the same, which is a pure coincidence and not required
  • The default configuration contains an unsecured (“PLAINTEXT”) listener on port 9092, to which we add an SSL listener on port 9093, using the IP of the public interface
  • The default configuration places the Kafka logs in /tmp, which is obviously not a good idea for anything but a simple test, as most Linux distributions clean up /tmp when you reboot. So we need to change this to point to a different directory
  • As we are running more than one node, it makes sense to change the replication settings for the internal topics
  • Finally, we need to adapt the ZooKeeper connection string so that all Kafka brokers connect to our ZooKeeper ensemble (and thus form a cluster, note that more or less by definition, a cluster in Kafka is simply a bunch of brokers that all connect to the same ZooKeeper ensemble).

Finally, it makes sense to add a systemd unit so that Kafka comes up again if you restart the machine.

Trying it out

After all this theory, we can now finally try this out. As mentioned above, I have prepared a set of Ansible scripts to set up virtual machines and install ZooKeeper and Kafka along the lines of the procedure described above. To run them, you will first have to install a few packages on your machine. Specifically, you need to install (if you have not done so yet) libvirt, Vagrant and Ansible, and install the libvirt Vagrant plugin. The instructions below work on Ubuntu Bionic; if you use a different Linux distribution, you might have to use slightly different package names and / or install the Vagrant libvirt plugin directly using the instructions here. Also, some of the packages (especially Java, which we only need to be able to use the Java keytool) might already be present on your machine.

sudo apt-get update 
sudo apt-get install \
  libvirt-daemon \
  libvirt-clients \
  python3-pip \
  python3-libvirt \
  virt-manager \
  vagrant \
  vagrant-libvirt \
  git \
  openjdk-8-jdk
sudo adduser $(id -un) libvirt
sudo adduser $(id -un) kvm
pip3 install ansible lxml pyopenssl

Note that when installing Ansible as an individual user as above, Ansible will be installed in ~/.local/bin, so make sure to add this directory to your PATH.

Next, clone my repository, change into the created directory, use virsh and vagrant up to start the network and the virtual machines and then run Ansible to install ZooKeeper and Kafka.

git clone https://github.com/christianb93/kafka.git
cd kafka
virsh net-define kafka-private-network.xml
wget http://mirror.cc.columbia.edu/pub/software/apache/kafka/2.4.1/kafka_2.13-2.4.1.tgz
tar xvf kafka_2.13-2.4.1.tgz
mv kafka_2.13-2.4.1 kafka
vagrant up
ansible-playbook site.yaml

Once the installation completes, it is time to run a few checks. First, let us verify that the ZooKeeper is running correctly on each node. For that purpose, SSH into the first node using vagrant ssh broker1 and run

/usr/share/zookeeper/bin/zkServer.sh status

This should print out the configuration file used by ZooKeeper as well as the mode the node is in (follower or leader).

Now let us see whether Kafka is running on each node. First, of course, you should check the status using systemctl status kafka. Then, we can see whether all brokers have registered themselves with ZooKeeper. To do this, run

sudo /usr/share/zookeeper/bin/zkCli.sh \
  -server broker1:2181 \
  ls /brokers/ids

on any of the broker nodes. You should get a list with the broker ids of the cluster, i.e. “[1,2,3]”. Finally, we can try to create a topic.

/opt/kafka/kafka_2.13-2.4.1/bin/kafka-topics.sh \
  --create \
  --bootstrap-server broker1:9092 \
  --replication-factor 3 \
  --partitions 2 \
  --topic test

Congratulations, you are now the proud owner of a working three-node Kafka cluster. Having this in place, we are now ready to dive deeper into producing Kafka data, which will be the topic of our next post.

Learning Kafka with Python – the basics

More or less by accident, I recently took a closer look at Kafka, trying to understand how it is installed, how it works and how the Python API can be used to write Kafka applications. So I decided it is time to create a series of blog posts on Kafka to document my learnings and to (hopefully) give you a quick start if you plan to get to know Kafka in a bit more detail. Today, we take a first look at Kafka's architecture before describing the installation in the next post.

What is Kafka?

Kafka is a system for the distributed processing of messages and streams which is maintained as open source by the Apache Software Foundation. Initially, Kafka was developed by LinkedIn and open-sourced in 2011. Kafka lets applications store data in streams (comparable to a message queue, more on that later), retrieve data from streams and process data in streams.

From the ground up, Kafka has been designed as a clustered system to achieve scalability and reliability. Kafka stores its data on all nodes in a cluster using a combination of sharding (i.e. distributing data across nodes) and replication (i.e. keeping the same record redundantly on several nodes to avoid data loss if a node goes down). Kafka clusters can reach an impressive size and throughput and can be employed for a variety of use cases (see for instance this post, this post or this post to get an idea). Kafka uses another distributed system – Apache ZooKeeper – to store metadata in a reliable way and to synchronize the work of the various nodes in a cluster.

Topics and partitions

Let us start to take a look at some of the core concepts behind Kafka. First, data in Kafka is organized in entities called topics. Roughly speaking, topics are a bit like message queues. Applications called producers write records into a topic which is then stored by Kafka in a highly fault-tolerant and scalable way. Other applications, called consumers, can read records from a topic and process them.

[Diagram: a topic with producers and consumers]

You might have seen similar diagrams before, at least if you have ever worked with messaging systems like RabbitMQ, IBM MQ Series or ActiveMQ. One major difference between these systems and Kafka is that a topic is designed to be a persistent entity with an essentially unlimited history. Thus, while in other messaging systems, messages are typically removed from a queue when they have been read or expired, Kafka records are kept for a potentially very long time (even though you can of course configure Kafka to remove old records from a topic after some time). This is why the data structure that Kafka uses to store the records of a topic is called a log – conceptually, this is like a log file into which you write records sequentially and which you clean up from time to time, but from which you rarely ever delete records programmatically.

That looks rather simple, but there is more to it – partitions. We have mentioned above that Kafka uses sharding to distribute data across several nodes. To be able to implement this, Kafka somehow needs to split the data in a topic into several entities which can then be placed on different nodes. This entity is called a partition. Physically, a partition is simply a directory on one of the nodes, and the data in a partition is split into several files called segments.

[Diagram: partitions and segments]

Of course, clients (i.e. producers and consumers) do not write directly into these files. Instead, there is a daemon running on each node, called the Kafka broker, which is maintaining the logs on this node. Thus if a producer wants to write into a topic, it will talk to the broker responsible for the logs on the target node (which depends on the partition, as we will see in a later post when we discuss producers in more detail), send the data to the broker, and the broker will store the data by appending it to the log. Similarly, a consumer will ask a broker to retrieve data from a log (with Kafka, consumers pull data, in contrast to some other messaging systems where data is pushed out to a consumer).

Records in a log are always read and written in batches, not as individual records. A batch consists of some metadata, like the number of records in the batch, and a couple of records. Each record again starts with a short header, followed by a record key and the record payload.

It is important to understand that in Kafka, the record key does NOT identify a record uniquely. Actually, the record key can be empty and is mainly used to determine the partition to which a record will be written (again, we will discuss this in more detail in the post on producers). Instead, a record is identified by its offset within a partition. Thus if a consumer wants to read a specific record, it needs to specify the topic, the partition number and the offset of the record within the partition. The offset is simply an integer starting at zero which is increased by one for every new record in the partition and serves as a unique primary key within this partition (i.e. it is not unique across partitions).
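
Even though consumers will only be covered in detail in a later post, here is a short sketch of what reading a record at a specific offset from a specific partition could look like with the Python client (the topic name, partition number and offset are of course just examples).

import kafka

consumer = kafka.KafkaConsumer(bootstrap_servers=["broker1:9092"])
# manually assign partition 0 of the topic "test" and position the consumer at offset 5
partition = kafka.TopicPartition("test", 0)
consumer.assign([partition])
consumer.seek(partition, 5)
# the consumer is iterable and will yield records starting at this offset
record = next(consumer)
print(record.partition, record.offset, record.key, record.value)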

Replication, leaders and controllers

Sharding is one pattern that Kafka uses to distribute records across nodes. Within a topic, each partition will be placed on a different node (unless of course the number of nodes is smaller than the number of partitions, in which case some nodes will hold more than one partition). This allows us to scale a Kafka cluster horizontally and to maintain topics that exceed the storage capacity of a single node.

But what if a node goes down? In this case, all data on that node might be lost (in fact, Kafka heavily uses caching and will not immediately flush to disk when a new record is written, so if a node goes down, you will typically lose data even if your file system survives the crash). To avoid data loss in that case, Kafka replicates partitions across nodes. Specifically, for each partition, Kafka will nominate a partition leader and one or more followers. If, for instance, you configure a topic with a replication factor of three, then each partition will have one leader and two followers. All of those three brokers will maintain a copy of the partition. A producer and a consumer will always talk to the partition leader, and when data is written to a partition, it will be synced to all followers.

So let us assume that we are running a cluster with three Kafka nodes. On each node, there is a broker managing the logs on this node. If we now have a topic with two partitions and a replication factor of three, then each of the two partitions will be stored three times – once by the leader and twice by brokers acting as followers for this partition. This could lead to an assignment of partitions and replicas to nodes as shown below.

[Diagram: replication of partitions across brokers]

If in this situation one of the nodes, say node 1, goes down, then two things will happen. First, Kafka will elect a new leader for partition 0, say node 2. Second, it will ask a new node to create a replica for partition 1, as (even though node 1 was not the leader for partition 1) we now have only one replica for partition 1 left. This process is called partition reassignment.

To support this process, one of the Kafka brokers will be elected as the controller. It is the responsibility of the controller to detect failed brokers and reassign the leadership for the affected partitions.

Producers and consumers

In fact, the process of maintaining replicas is much more complicated than this simple diagram suggests. One of the major challenges is that it can of course happen that a record has been received by the leader and not yet synchronized to all replicas when the leader dies. Will these messages be lost? As often with Kafka, the answer is “it depends on the configuration” and we will learn more about this in one of the upcoming posts.

We have also not yet said anything about the process of consuming from a topic. Of course, in most situations, you will have more than one consumer, either because there is more than one application interested in getting the data or because we want to scale horizontally within an application. Here, the concept of a consumer group comes into play, which roughly is the logical equivalent of an application in Kafka. Within a consumer group, we can have as many consumers as the topic has partitions, and each partition will be read by only one consumer. For consumers, the biggest challenge is to keep track of the messages already read, which is of course essential if we want to implement guarantees like at-least-once or even exactly-once delivery. Kafka offers several mechanisms to deal with this problem, and we will discuss them in depth in a separate post on consumers.

In the next post of this series, we will leave the theory behind for the time being and move on to the installation process, so that you can bring up your own cluster in virtual machines on your PC to start playing with Kafka.