Learning Kafka with Python – retries and idempotent writes

In the past few posts, we have discussed approaches to implement at-least-once processing on the consumer side, i.e. mechanisms that make sure that every record in the partition is processed at least once. Today, we will look at a similar problem on the producer side – how can we make sure that every record is written into the partition only once? This sounds easy, but can be tricky if we need to retry a failed message without knowing the exact error that has occurred.

The retry problem

In the sample producer that we looked at in a previous post, we skipped over an important point – error handling. The most important error that a reliable producer needs to handle is a failure when handing a new record over to the broker.

In general, Kafka differentiates between retriable errors, i.e. transient errors like individual packets being lost on the network, and non-retriable errors, i.e. errors like an invalid authorization for which a retry does not make sense. For most transient errors, the client will – under the hood – automatically attempt a retry if a record could not be sent.

Let us take a short look at the Java producer as an example. When a batch of records has been sent to the broker as a ProduceRequest, the response is handled in the method handleProduceResponse. Here, a decision is made whether an automatic retry should be initiated, in which case the batch of records is simply added back to the queue of batches to be sent. The logic to decide whether a retry should be attempted is contained in the method canRetry, and in the absence of transactions (see the last section of this post), it will decide to retry if the batch has not yet timed out (i.e. was created less than delivery.timeout.ms ago), the error is retriable and the number of allowed retries (set via the parameter retries) has not yet been reached. Examples of retriable exceptions are exceptions due to a low number of in-sync replicas, timeouts, connection failures and so forth.

This is nice, but there is a significant problem when using automated retries. If, for instance, a produce request times out, it might very well be that this is only due to a network issue and in the background, the broker has actually stored the record in the partition log. If we retry, we will simply send the same batch of records again, which could lead to duplicate records in the partition log. As these records will have different offsets, there is no way for a consumer to detect this duplicate. Depending on the type of application, this can be a major issue.

If you wanted to solve this on the application level, you would probably set retries to zero, implement your own retry logic and use a sequence number to allow the consumer to detect duplicates. A similar mechanism, referred to as idempotent writes, has been added to Kafka with KIP-98, which was implemented in release 0.11 in 2017.
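Such an application-level scheme could look roughly like the sketch below. This is for illustration only – the seq field in the payload and the consumer-side bookkeeping are made-up conventions, not part of any Kafka API.

import json

# Producer side: embed our own sequence number in every record
def send_with_seq(producer, topic, data, seq):
    payload = json.dumps({"seq": seq, "data": data}).encode("utf-8")
    producer.send(topic, value=payload)

# Consumer side: skip records whose sequence number we have already processed
last_seen = -1
def process(record):
    global last_seen
    payload = json.loads(record.value)
    if payload["seq"] <= last_seen:
        return                    # duplicate created by a retry - ignore it
    last_seen = payload["seq"]
    # ... actual processing of payload["data"] goes here ...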

What are idempotent writes?

Essentially, idempotent writes use a sequence number which is added to each record by the producer to allow the broker to detect duplicates due to automated retries. This sequence number is added to a record shortly before it is sent (more precisely, a batch of records receives a base sequence number, and the sequence number of a record is the base sequence number plus its index in the batch), and if an automated retry is made, the exact same batch with the same sequence number is sent again. The broker keeps track of the highest sequence number received, and will not store any records with a sequence number smaller than or equal to the currently highest processed sequence number.

To allow all followers to maintain this information as well, the sequence number is actually added to the partition log and therefore made available to all followers replicating the partitions, so that this data survives the election of a new partition leader.

In reality, the implementation is slightly more complicated than this. First, maintaining a globally unique sequence number across all producers and partitions would imply a high overhead. Instead, the sequence number is maintained per producer and per partition. To make this work, each producer is assigned a unique ID called the producer ID. When a producer that uses idempotent writes starts, it sends an InitPidRequest to the broker. The broker then assigns a producer ID and returns it in the response. The producer stores the producer ID in memory and adds it to all records being sent, so that the broker knows from which producer a record originates. Similar to the sequence number, this information is added to the records in the partition log. Note, however, that neither the producer ID nor the sequence number are exposed to a consumer by the consumer API.

How does the broker determine the producer ID to be assigned? This depends on whether idempotent writes are used in combination with transactions. If transactions are used, we will learn in the next post that applications need to define an ID called the transaction ID that is supposed to uniquely identify a producer. In this case, the broker will assign a producer ID to each transaction ID, so that the producer ID is effectively persisted across restarts. If, however, idempotent writes are used stand-alone, the broker uses a ZooKeeper sequence to generate producer IDs, and if a producer is either restarted or (for instance due to some programming error) sends another InitPidRequest, it will receive a new producer ID. For each new partition assigned to a producer not using transactions, the sequence number starts again at zero, so that the sequence number is only unique per partition and producer ID (which is good enough for our purpose).

Another useful feature of idempotent writes is that a Kafka broker is now able to detect record batches arriving in the wrong order. In fact, if a record arrives whose sequence number is higher than the previously seen sequence number plus one, the broker assumes that records got lost in flight or we see an ordering issue due to a retry and raises an error. Thus ordering is now guaranteed even if we allow more than one in-flight batch.
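Conceptually, the broker-side checks per producer ID and partition combine both rules – dropping duplicates and rejecting gaps. The following is a highly simplified Python sketch of this logic, not actual broker code:

# Simplified sketch of the broker-side checks - not actual Kafka code
last_seq = {}  # (producer_id, partition) -> highest sequence number stored

def check_batch(producer_id, partition, base_seq, batch_size):
    last = last_seq.get((producer_id, partition), -1)
    if base_seq <= last:
        return "DUPLICATE"      # retry of a batch we already stored - drop it
    if base_seq > last + 1:
        return "OUT_OF_ORDER"   # gap detected - raise an error to the producer
    last_seq[(producer_id, partition)] = base_seq + batch_size - 1
    return "OK"                 # append the batch to the partition log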

Trying it out

Time again to try all this out. Unfortunately, the Kafka Python client that we have used so far does not (yet) support KIP-98. We could of course use a Java or Go client, but to stick to the idea of this little series to use Python, let us instead use the Python client provided by Confluent.

To install this client, use

pip3 install confluent-kafka==1.4.1

Here I am using version 1.4.1, which was the most recent version at the time this post was written, so you might want to use the same version. Using the package is straightforward. Again, we first create a configuration, then a producer, and then send records to the broker asynchronously. Compared to the Kafka Python library used so far, there are a few differences which are worth noting.

  • Similar to the Kafka Python library, sends are done asynchronously. However, instead of receiving a future when sending, as is the case with the Kafka Python library, you pass a callback directly (see the sketch after this list)
  • To make sure that the callback is invoked, you have to call the poll method of the producer on a regular basis
  • When you are done producing, you have to explicitly call flush to make sure that all buffered messages are sent
  • The configuration parameters of the client follow the Java naming conventions. So the bootstrap servers, for instance, are defined by a configuration parameter called bootstrap.servers instead of bootstrap_servers, and the parameter itself is not a Python list but a comma-separated list passed as a string
  • The base producer class accepts bytes as values and does not invoke a serializer (there is a derived class doing this, but this class is flagged as not yet stable in the API documentation so I decided not to use it)
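To illustrate these points, here is a minimal sketch of how producing with confluent-kafka could look. The broker address and the topic name are assumptions for illustration:

import json
from confluent_kafka import Producer

# Minimal sketch - the broker address is an assumption
producer = Producer({"bootstrap.servers": "broker1:9092"})

def delivery_callback(err, msg):
    # Invoked from within poll() or flush() once the fate of the record is known
    if err is not None:
        print("Delivery failed: %s" % err)
    else:
        print("Delivered to partition %d at offset %d" % (msg.partition(), msg.offset()))

for i in range(10):
    # The base producer expects bytes, so we serialize ourselves
    value = json.dumps({"msg_count": i}).encode("utf-8")
    producer.produce("test", value=value, callback=delivery_callback)
    producer.poll(0)   # serve pending delivery callbacks

producer.flush()       # block until all buffered records have been sent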

To turn on idempotent writes, there are a couple of parameters that need to be set in the producer configuration (summarized in the sketch after this list).

  • enable.idempotence needs to be 1 to turn on the feature
  • acks needs to be set to “all”, i.e. -1
  • max.in.flight should be set to one
  • retries needs to be positive (after all, idempotent writes are designed to make automated retries safe)
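Putting these parameters together, the relevant part of the producer configuration might look like this sketch (the broker addresses are again assumptions):

from confluent_kafka import Producer

config = {
    "bootstrap.servers": "broker1:9092,broker2:9092",
    "enable.idempotence": True,  # turn on idempotent writes
    "acks": "all",               # wait for all in-sync replicas to acknowledge
    "max.in.flight": 1,          # at most one unacknowledged batch at a time
    "retries": 5,                # allow automated retries
}
producer = Producer(config)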

Using these instructions, it is now straightforward to put together a little test client that uses idempotent writes to a “test” topic. To try this, bring up the Kafka cluster as in the previous posts, create a topic called “test” with three replicas, navigate to the root of the repository and run

python3 python/idempotent_writes.py

You should see a couple of messages showing the configuration used and indicating that ten records have been written. To verify that these records do actually contain a producer ID and a sequence number, we need to dump the log file on one of the brokers.

vagrant ssh broker1
/opt/kafka/kafka_2.13-2.4.1/bin/kafka-dump-log.sh \
  --print-data-log \
  --files /opt/kafka/logs/test-0/00000000000000000000.log

The output should look similar to the following sample output.

Dumping /opt/kafka/logs/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 3001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1589818655781 size: 291 magic: 2 compresscodec: NONE crc: 307611005 isvalid: true
| offset: 0 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 0 headerKeys: [] payload: {"msg_count": 0}
| offset: 1 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 1 headerKeys: [] payload: {"msg_count": 1}
| offset: 2 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 2 headerKeys: [] payload: {"msg_count": 2}
| offset: 3 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 3 headerKeys: [] payload: {"msg_count": 3}
| offset: 4 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 4 headerKeys: [] payload: {"msg_count": 4}
| offset: 5 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 5 headerKeys: [] payload: {"msg_count": 5}
| offset: 6 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 6 headerKeys: [] payload: {"msg_count": 6}
| offset: 7 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 7 headerKeys: [] payload: {"msg_count": 7}
| offset: 8 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 8 headerKeys: [] payload: {"msg_count": 8}
| offset: 9 CreateTime: 1589818655781 keysize: -1 valuesize: 16 sequence: 9 headerKeys: [] payload: {"msg_count": 9}

Here, the third line contains the header of the entire record batch. We see that the batch contains ten records, and we find a producer ID (3001). In each of the records, we also see a sequence number, ranging from 0 to 9.

Transactions

When you read KIP-98, the Kafka improvement proposal with which idempotent writes were introduced, you will realize that the main objective of this KIP is not just to provide idempotent writes, but to handle transactions in Kafka. Here, handling transactions does not mean that Kafka somehow acts as a distributed transaction manager, joining transactions with those of a relational database. It does, however, mean that writes and reads in Kafka are transactional in the sense that a producer can write records within a transaction, and consumers will either see all of the records written as part of this transaction or none of them.

This makes it possible to model scenarios that occur quite often in business applications. Suppose, for instance, you are putting together an application handling securities accounts. When you sell securities, you produce one record which will trigger the delivery of the securities to the buyer, and a second record that will trigger the payment that you receive for them. Now suppose that the first record is written, and then something goes wrong, so that the second record cannot be written. Without transactions, the first record would be in the log and consumers would pick it up, so that the security side of the transaction would still be processed. With transactions, you can abort the transaction, and the record triggering the security transfer will never become visible to consumers.
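In terms of the producer API, this scenario could look roughly like the following sketch. It assumes the transactional API that confluent-kafka offers (as of version 1.4); the broker address, transactional ID, topic names and payloads are made up:

from confluent_kafka import Producer, KafkaException

producer = Producer({
    "bootstrap.servers": "broker1:9092",
    "transactional.id": "securities-producer-1",  # uniquely identifies this producer
})
producer.init_transactions()

producer.begin_transaction()
try:
    # Two records that must become visible together or not at all
    producer.produce("delivery", value=b'{"security": "XYZ", "quantity": 100}')
    producer.produce("payment", value=b'{"amount": 5000}')
    producer.commit_transaction()   # both records become visible atomically
except KafkaException:
    producer.abort_transaction()    # consumers will see neither record

Note that on the consumer side, isolation.level needs to be set to read_committed so that records from aborted transactions are filtered out.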

We will not go into details about transactions in this post, but KIP-98 is actually quite readable. I also recommend that you take a look at this well-written blog post on the Confluent pages, which provides some more background and additional links.

With that, it is time to close this short series on Kafka and Python. I hope I was able to give you a good introduction to the architecture and operations of a Kafka cluster and a good starting point for your own projects. Happy hacking!
