Learning Kafka with Python – producing data

As proud owners of a brand new Kafka installation, we are now ready to explore how applications interact with Kafka. Today, we will look at producers and understand how they write data to Kafka.

Basic design considerations

At first glance, writing data to Kafka sounds easy – connect to a Kafka broker and submit a message. However, there are some basic design considerations that are relevant when building a Kafka producer.

First, we have seen that Kafka stores the data in a topic in multiple partitions, and then each partition has a leader which is responsible for writing messages into the partition. Thus a producer needs to determine to which partition a message should be written and contact the responsible leader for this partition.

Defining the mapping of messages to partitions can be crucial for the reliability and scalability of your application. Partitions determine how the application can scale horizontally, and we will learn later that partitions also determine to which extent consumers can scale. In addition, Kafka guarantees message order only within a partition. Specifically, if message A is written to a partition before message B is written to the same partition, message A will receive a lower offset than B and will be read first by a (well behaving) consumer. This is no longer true if messages A and B are written to different partitions. Think of partitions as lanes on a highway – there is no guarantee that two cars entering the highway in a certain order but in different lanes will arrive at the destination in the same order.

Often, you will want to use a business entity to partition your data. If you are building a customer facing application, you might want to partition your data by customer group; if you are building a securities processing application, the financial instrument might be a good partition criterion; if you are maintaining accounts, the account number might be a good choice, and so forth. In other cases, where ordering is not important, you might go for a purely technical criterion.

The next fundamental question we have to figure out is when a message is considered to be successfully written. When the broker has received it? Or when the leader has written the message? Or should we wait until all followers have successfully stored the message? And what if a follower lags behind – should we stop writing messages until the follower has recovered or move on, accepting that we have lost one follower without knowing whether it will recover at a later time?

Kafka does not give a definitive answer to all these questions, but leaves you a choice – put differently, when you create a producer, you can specify its behavior using a variety of options. So let us now see how this is done in Python.

The producer object

Let us now see how a producer can be created using Python. If you have not yet done so, please install the Kafka Python library to be able to run the examples.

pip3 install kafka-python

This series uses version 2.0.1 of the library. If you want to use exactly that version, you need to specify it as usual, i.e. run

pip3 install kafka-python==2.0.1

To send messages to Kafka, the first thing we need to do is to create a producer object, i.e. an instance of the class kafka.KafkaProducer. The init-method of this class accepts a large number of arguments, but in the most straightforward case, there is exactly one argument bootstrap_servers. This argument is a list of listener URLs, for instance 10.100.0.11:9092, which the producer will use to make an initial connection to a Kafka broker. This list does not need to contain all brokers, in fact one entry will do, but the producer will use this broker to discover other brokers if needed. It is a good idea to list at least two or three brokers here, in case one broker is temporarily unavailable. So creating a producer could look like this.

import kafka
producer=kafka.KafkaProducer(bootstrap_servers=["broker1:9092", "broker2:9092"])

When started, a Producer will create a separate sender thread which will asynchronously send messages to the brokers. In addition, it will create an internal client which holds the actual connections to the Kafka cluster.

When using an SSL listener, we need a few additional configuration items. Specifically, we need to add the following named parameters when creating a KafkaProducer:

  • ssl_cafile – this is the location of a CA certificate that the client will use to verify the certificate presented by the server
  • ssl_certfile – this is the location of the client certificate that the client will in turn present to the server when the server requests a certificate
  • ssl_keyfile – the key matching the client certificate

In order to be a bit more flexible when it comes to connecting to different setups, the code examples that we will use in this series read the list of brokers and the SSL configuration from a YAML file config.yaml that the installation script will create in the subdirectory .state of the repository directory. All test scripts accept a parameter --config that you can use to override this default location, in case you want to use your own configuration.
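
To make this a bit more concrete, here is a minimal sketch of how such a configuration file could be read and used to create an SSL-enabled producer. The key names inside the YAML file (brokers, ca, cert, key) are only assumptions for the sake of illustration – the actual config.yaml created by the installation script might be structured differently. Note that kafka-python also expects the parameter security_protocol to be set to "SSL" when connecting to an SSL listener.

import yaml
import kafka

# Read broker list and SSL file locations from the YAML configuration
# (the key names below are assumptions for illustration only)
with open(".state/config.yaml", "r") as f:
    config = yaml.safe_load(f)

producer = kafka.KafkaProducer(
    bootstrap_servers=config["brokers"],   # list of listener URLs
    security_protocol="SSL",               # talk to the SSL listener
    ssl_cafile=config["ca"],               # CA certificate used to verify the broker
    ssl_certfile=config["cert"],           # client certificate presented to the broker
    ssl_keyfile=config["key"])             # key matching the client certificate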

Once we are done using a producer, we should close it using producer.close() to clean up.

Keys and partitions

Once we have a producer in our hands, we can actually start to send messages. This requires only two parameters: the topic (a string) and the payload of the message (a sequence of bytes).

producer.send("test",value=bytes("hello", "utf-8"))

Note that this will create a topic “test” if it does not exist yet, using default values specified in the server configuration (server.properties), so be careful with this, as the defaults might not be what you want (you can also turn this feature off by setting auto.create.topics.enable to false in server.properties).

Now you might remember from my previous post that a record in Kafka actually consists of a payload and a key. Here, we do not specify a key, so the key will remain empty. But of course, you can define a key for your record by simply adding the named parameter key to the method invocation, like this.

producer.send("test",
        value=bytes("hello", "utf-8"),
        key=bytes("mykey", "utf-8"))

What about partitions? The low-level protocol that a Kafka broker understands expects the client to send a PRODUCE request containing a valid partition ID, so it is up to the client to take this decision. The application programmer can either decide to explicitly specify a partition ID (an integer) as an optional parameter to the send method, or let the framework take the decision. In this case, a so-called partitioner is invoked which, based on the value of the key, selects a partition to write to.

An application can set the configuration item partitioner when creating a producer to define a custom partitioner (which is simply a callable object that the producer will invoke). If no partitioner is specified, the default partitioner will be used, which implements the following logic.

  • If no key is given, the default partitioner will simply distribute the messages randomly across the available partitions
  • If a key is provided, a hash value of the key will be computed (using a so-called MurmurHash), which will always be an integer. The value of this hash (more precisely, of its last 31 bits) modulo the number of partitions will then determine the partition to use

The important thing to keep in mind is that if you do not provide a key, your message will end up in a random partition. If you do provide a key, then Kafka will guarantee that messages with the same key will go to the same partition and hence be processed in order.
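
To illustrate both options, here is a short sketch. The partition argument of the send method lets you pick a partition explicitly, while the partitioner callable receives the serialized key together with the lists of all and currently available partitions (this is the signature documented for kafka-python) – the routing logic shown here is of course just an example.

import zlib
import kafka

# Option 1: explicitly choose partition 0 for this record
producer.send("test",
        value=bytes("hello", "utf-8"),
        partition=0)

# Option 2: a custom partitioner - a callable invoked with the serialized key
def my_partitioner(key_bytes, all_partitions, available_partitions):
    # Illustrative logic only: records without a key go to the first partition,
    # all others are distributed using a CRC32 hash of the key
    if key_bytes is None:
        return all_partitions[0]
    return all_partitions[zlib.crc32(key_bytes) % len(all_partitions)]

producer = kafka.KafkaProducer(
    bootstrap_servers=["broker1:9092", "broker2:9092"],
    partitioner=my_partitioner)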

Serialization

In our examples so far, we have passed a sequence of bytes to the send method, both for the key and the value. This is the format that the low-level protocol expects – at the end of the day, keys and values are sent over the wire as a sequence of bytes, and stored as a sequence of bytes.

In many applications, however, you will want to store more complex data types, like JSON data or even objects. To be able to do this with Kafka, you will have to convert your data into a sequence of bytes when sending the data, a process known as serialization.

When creating the producer, you can specify your own serializers for keys and payloads by adding the named parameters key_serializer and value_serializer when creating the producer. Here, a serializer can either be a function which accepts whatever input format you prefer and returns a sequence of bytes, or an instance of the class kafka.serializer.Serializer which has a serialize method which the framework will invoke.

Suppose for instance you wanted to serialize JSON data. Then, you need to provide a serializer which accepts a JSON object and returns a sequence of bytes. For that purpose, we can use the standard json.dumps method to first produce a string, and then encode the string using e.g. UTF-8 to obtain a sequence of bytes. Thus your serializer would look something like

import json

def serialize(data):
    return bytes(json.dumps(data), "utf-8")

and when creating the producer, the call would be something like

producer=kafka.KafkaProducer(..., 
   value_serializer=serialize, ...)
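
With this serializer in place, we can then pass an ordinary Python dictionary as value and let the producer take care of the conversion – a short, made-up example:

# The dictionary will be run through our serialize function before being sent
producer.send("test",
        value={"customer": 42, "status": "active"},
        key=bytes("42", "utf-8"))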

Choosing a reasonable serializer is an important design choice. As Kafka topics are designed to be durable objects, you need to think about things like versioning when the serialization format changes as you release new features, and obviously all components of a system need to use matching serializers and de-serializers to be able to exchange data. Many Kafka projects actually use third-party serialization frameworks, like Apache Avro or Google’s protobuf.

Acknowledgements

So far, we have seen how we can send messages using the send method of a KafkaProducer object. But in reality, you of course want to know whether your message was successfully sent and stored by Kafka.

This leads us to the question at which point a new record can be considered to be committed to a Kafka cluster, i.e. stored and available for consumers. Before getting into this, however, we first have to understand the notion of an in-sync replica.

Recall that Kafka replication works by designating a leader for a partition and zero, one or more followers which constantly ask the leader for new records in the partition and store them in their own copy of the partition log. As the replicas read the records from the leader, the leader knows which record has been delivered to which follower. The leader can therefore determine whether a replica is out-of-sync, which happens if a follower fails to retrieve the latest message within a defined time frame, or in-sync.

Having enough in-sync replicas is vital for reliability. If a leader goes down, Kafka has to elect a new leader from the set of available replicas. Of course, choosing an out-of-sync replica to be the new leader would imply that we promote a replica that has not yet replicated all messages producers have sent to the old leader – and thus make it our new source of truth. Making such a replica the new leader therefore results in a loss of records. In some situations, you might still opt to do so, which Kafka allows if the parameter unclean.leader.election.enable is set to true in the broker configuration.

[Diagram: in-sync and out-of-sync replicas]

Now let us come back to the question of when a message sent by a producer will be considered committed. Again, Kafka offers you a choice, governed by the value of the parameter acks of a producer.

  • When acks = 1 (the default), a message will be considered committed once the leader acknowledges the message. Thus Kafka guarantees that a committed message has been added to the leading partition, but not that it has already been written to one or even all replicas. Note that, as the leader might cache the record in memory, this can lead to data loss if the leader goes down after acknowledging the message, but before a follower has copied it
  • When acks = -1 (all), a record will be considered committed only once the leader and in addition all in-sync replicas have acknowledged receipt of the message. As long as you have enough in-sync replicas, this gives you a strong guarantee that the message is available on several nodes and thus data loss has become very unlikely
  • Finally, a value of acks = 0 means that the message will be considered committed once it has been sent over the network, regardless of any acknowledgement from the leader or a follower. This obviously is a very weak guarantee and only reasonable if you have a strong focus on throughput and can live with a loss of (potentially many) records

When using acks = all, the number of in-sync replicas is of course vital. To illustrate this, let us assume that you have configured a topic with three replicas, but both followers have become out-of-sync. Now a message will be committed once the leader has written it, meaning that if the leader is lost, the record will be lost as well. To avoid such a scenario, you can set the server property min.insync.replicas. This number determines how many replicas (including the leader) need to be in-sync in order to still accept new messages. Thus if you use, for instance, a topic with a replication factor of three and min.insync.replicas=2 in combination with acks=-1, then Kafka will guarantee that a message is only reported as committed once the leader and at least one follower have received the record.

[Diagram: acknowledgement levels]

Finally, there is one more parameter that is important for the reliability of a producer – max_in_flight_requests_per_connection. A request (which typically contains more than one record to be written) is considered in-flight as long as no result – either an acknowledgement or an error – has been received from the broker. If this parameter (which defaults to 5!) is set to a value greater than one, the producer will not wait for an acknowledgement before sending the next batch. In combination with retries, this can imply that the order in which messages are added to the log is not identical to the order in which they have been sent, and if the producer goes down, in-flight messages might be lost if the broker is not able to process them. Thus set this to one if you need strong guarantees on at-least-once delivery and ordering.
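
Putting these settings together, a producer aiming for strong delivery guarantees could be created along the following lines – a sketch only, with placeholder broker names and illustrative values.

# A producer tuned for reliability rather than throughput
producer = kafka.KafkaProducer(
    bootstrap_servers=["broker1:9092", "broker2:9092"],
    acks="all",                                # wait for the leader and all in-sync replicas
    retries=5,                                 # retry failed requests
    max_in_flight_requests_per_connection=1)   # preserve ordering even when retrying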

Retries and error handling

Finally, the last important design decision that you need to take when writing a producer is how to deal with errors.

The send method that we are using delivers messages asynchronously to the actual sender and immediately returns. To figure out whether the record was successfully committed, we therefore cannot simply use its return value, but need a different approach. Therefore, the send method returns a handler which can later be used to retrieve the status of the message, a so-called Future, or, more precisely, a subclass called FutureRecordMetadata. Once we have this object, we can call its get method with a timeout in seconds to wait for the request to complete. If the request was successful, this method returns the record metadata (including the partition and offset at which the record was stored), otherwise it raises an exception of type kafka.errors.KafkaError. Alternatively, you can also specify callback functions for successful and failed sends.
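
Here is a short sketch of both approaches, using the producer created earlier – the topic name and payload are just examples.

import kafka.errors

future = producer.send("test", value=bytes("hello", "utf-8"))
try:
    # Block for up to 10 seconds until the broker has replied
    metadata = future.get(timeout=10)
    print("Stored in partition %d at offset %d" % (metadata.partition, metadata.offset))
except kafka.errors.KafkaError as e:
    print("Sending failed: %s" % e)

# Alternatively, register callbacks and keep sending asynchronously
def on_success(metadata):
    print("Committed at offset %d" % metadata.offset)

def on_error(exc):
    print("Sending failed: %s" % exc)

future = producer.send("test", value=bytes("hello", "utf-8"))
future.add_callback(on_success)
future.add_errback(on_error)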

Note that the producer also has an option to automatically retry failed messages, which can be configured by setting the parameter retries to a value different from zero. In general, however, you should be careful with this as it might conflict with ordering, see the discussion of in-flight requests above.

Testing our producer

After all this theory, it is now time to test our producer. I assume that you have followed my previous post and installed Kafka in three virtual nodes on your PC. Now navigate to the root of the repository and run the following command to create a test topic.

./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(./python/getBrokerURL.py) \
  --command-config=.state/client_ssl_config.properties \
  --create \
  --topic test \
  --replication-factor 3 \
  --partitions 2 

Let us see what this command is doing. The script that we run, kafka-topics.sh, is part of the standard Kafka admin command line tools that are bundled with the distribution. In the second line, we invoke a little Python script that evaluates the configuration in YAML format which our installation procedure has created to determine the URL of a broker, which we then pass to the script. In the third line, we provide a Java properties file containing the SSL parameters to connect to our secured listener.

The remaining switches instruct the tool to create a new topic called “test” with a replication factor of three and two partitions (which, of course, will fail if you have already created this topic in the previous post).

Next, we will run another tool coming with Kafka – the console consumer. This is a simple consumer that will simply subscribe to a topic and dump all records in this topic to the console. To run it, enter

kafka/bin/kafka-console-consumer.sh   \
   --bootstrap-server $(./python/getBrokerURL.py)   \
   --consumer.config .state/client_ssl_config.properties \
   --from-beginning \
   --topic test 

Now open an additional terminal, navigate to the root of the repository and run the producer.

python3 python/producer.py

This should print the producer configuration used and the number of messages produced, plus timestamps and the number of seconds and microseconds it took to send all messages. In the first terminal window, in which the consumer is running, you should then see these messages flicker by.

Now let us try out a few things. First, let us create 10000 messages with a set of configurations promising the highest throughput (acks=0, fully asynchronous send, no keys provided, five requests in flight).

python3 python/producer.py \
  --messages=10000 \
  --ack=0

On my PC, producing these 10000 messages takes roughly half a second, i.e. our throughput is somewhere around 20,000 messages per second, without any tuning (of course the results will heavily depend on the machine on which you are running this). Next, we again produce 10000 messages, but this time, we use very conservative settings (acks=all, only one message in flight, wait for reply after each request, create and store keys and use them to determine the partition).

python3 python/producer.py \
  --messages=10000 \
  --ack=-1 \
  --max_in_flight_requests_per_connection=1 \
  --wait \
  --create_keys

Obviously, this will be much slower. On my PC, this took roughly 17 seconds, i.e. it is slower by a factor of about 25 than the first run. This hopefully illustrates nicely that Kafka leaves you many choices for trade-offs between performance and availability. Use this freedom with care and make sure you understand the consequences that the various settings have, otherwise you might lose data!

Putting it all together – the send method behind the scenes

Having seen a producer in action, it is instructive to take a short look at the source code of the Python implementation of KafkaProducer, specifically at its send method. First, the partitions of the topic are retrieved, either from cached metadata or by requesting updated metadata from the server. Then, the key and value are serialized, and the partitioner is invoked which determines the partition to which we write the record.

Next, instead of directly sending the record to the broker, it is appended to an internal buffer using an internal helper class called a RecordAccumulator. If the accumulator signals that the buffer is full, the sender thread is triggered which will then actually transmit the entire batch to the Kafka broker. Finally, the future object is returned.
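
If you want to experiment with this batching behaviour, the parameters batch_size and linger_ms influence when a batch is considered ready to be sent, and the flush method forces the sender thread to transmit whatever is still sitting in the buffer. The values below are purely illustrative.

# Batching can be influenced when creating the producer
producer = kafka.KafkaProducer(
    bootstrap_servers=["broker1:9092", "broker2:9092"],
    batch_size=16384,    # maximum size of a batch in bytes
    linger_ms=10)        # wait up to 10 ms for more records before sending a batch

producer.send("test", value=bytes("hello", "utf-8"))
# Make sure everything still buffered is actually sent before we continue
producer.flush()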

[Diagram: the send method behind the scenes]

This completes our discussion of Kafka producers. In the next post, we will learn how we can consume data from Kafka and how consumers, producers and brokers play together.
