Learning Kafka with Python – a deep dive into consumers and rebalancing

In the previous posts, we have already used the Python client to implement Kafka consumers. Today, we will take a closer look at the components that make up a consumer and discuss their inner workings and how they communicate with the Kafka cluster.

High level overview of the consumer

Our discussion will be based on the Kafka Python library, which seems to be loosely modeled after the Java consumer that is part of the official Apache Kafka project, so the underlying principles are the same. These notes are based on version 2.0.1 of the library; the design might of course change in future versions (and has already changed substantially in the past).

Looking at the code, we see that, roughly speaking, the consumer consists of three parts – the actual consumer in the package kafka.consumer, the coordinator in the package kafka.coordinator, which is responsible for talking to the group coordinator and assigning partitions, and the network client in the top-level package, which is used by other parts of the library as well. Broken down to the level of modules and classes, the following diagram shows the most important components of the consumer and their relations.

KafkaConsumerComponents

Let us start our discussion with the class on the left hand side of the diagram, the subscription state. This class is used to manage the topics and partitions a consumer has subscribed to as well as the positions of the consumer within these partitions. Note that these positions are not the committed offsets, but are the positions maintained locally (and in-memory) by the consumer that are used to determine the offset that the next fetch will use. Initially, there is no valid position for a newly assigned partition, and the partition is considered fetchable only once a position has been determined.

The second class which is used by the consumer is the fetcher. As the name suggests, this class is in charge of actually fetching data and offsets from the leader of a partition (here, offsets refers not to committed offsets, but to the valid offsets, i.e. the first and last offset of a partition).

Fetching records from the partition leader typically works asynchronously. As an example, let us consider the method send_fetches. As indicated above, a partition is called fetchable if there is a valid position for it, the partition has not been paused and there are no unfetched records already present in the cache. After creating a list of all fetchable partitions, the send_fetches method then figures out the partition leader and assembles a fetch request. These requests are then sent to the respective partition leader using the client object. This operation returns a future, i.e. a handle which can be used to asynchronously track the progress of the fetch operation. Attached to this future, there is a callback operation. When the records are sent from the partition leader to the consumer, the client object will invoke this callback which will then add the returned records to a queue maintained by the fetcher. From there, they are retrieved when a consumer calls the method fetched_records.
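To make this pattern a bit more concrete, here is a schematic sketch – not the actual library code – of the future-plus-callback mechanism, using the internal class kafka.future.Future and replacing the network layer by completing the future manually.

import collections
from kafka.future import Future

# queue of completed fetches, similar in spirit to the queue maintained by the fetcher
completed_fetches = collections.deque()

def handle_fetch_response(response):
    # in the real fetcher, the response is validated against the original request here
    completed_fetches.append(response)

# in the library, a future like this is returned by the network client when a request is sent
future = Future()
future.add_callback(handle_fetch_response)

# once the client receives the response from the broker, it completes the future,
# which invokes our callback and moves the "records" into the queue
future.success(["record 1", "record 2"])
print(list(completed_fetches))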

It is in this function where the positions are actually updated, so that the position really reflects the records that have been consumed, not those which have been received by the fetcher but are still in the queue. Note that records are skipped if a partition has become unfetchable in the meantime or if the offset does not match the expected value in the original request.

The following diagram shows a simplified view of how records are fetched (some important details are skipped, for instance the deserialization that takes place when fetched records are removed from the queue and handed over to the consumer).

FetchingRecords

Coordinating group membership and partition assignments

Apart from fetching records, a core responsibility of the consumer is to manage the membership in a consumer group and to handle assigned partitions. This is done by the coordinator. The coordinator communicates with the group coordinator (which is one dedicated broker per consumer group) to trigger the addition and removal of group members and to balance partitions between group members. In addition, the coordinator is responsible for managing committed offsets.

Looking at the source code of the coordinator, we can see how the process of adding members to the group and assigning partitions works. This process, commonly referred to as rebalancing, typically starts when a consumer invokes the poll method of the coordinator. When this happens, the coordinator will first check whether it needs to join (or rejoin) the group, for instance because the consumer was just started. If yes, the processing in ensure_active_group will first prepare the join process, for instance by committing all offsets if auto-commit is enabled and calling the revoke method of all registered rebalance listeners (conceptually, when a rebalancing starts, all existing members will lose ownership of previously handled partitions and consequently stop processing records so that the group coordinator can reassign partitions freely – there is an ongoing effort known as cooperative rebalancing with the objective to change this).

We then wait until there are no more in-flight requests to the coordinator, and then send a JoinGroupRequest to the group coordinator. The group coordinator (broker) will wait until all members have handed in their requests (see below for more on the timeline) and then determine one member to be the group leader. As part of the JoinGroupResponse, every consumer will be informed about the newly elected leader. The group leader will then perform the actual assignment of partitions to group members (using a configurable assignor). Then, all group members send another request to the group coordinator, called the SyncGroupRequest. In this request, the group leader will inform the group coordinator about the defined partition assignments, and in the response to this message, the partition assignments will be distributed to all group members.

Once the SyncGroupResponse has been received, the method ensure_active_group will invoke _on_join_complete which will in turn trigger a call of the on_partitions_assigned method of all registered rebalance listeners. Note that at this point, all exceptions raised by the listener are swallowed, so exceptions should be caught and handled inside the listener.

This is all nice if our own consumer joins a group, but what happens if another consumer joins? This is where the heartbeat thread comes into play. This is a thread which is running in the background and periodically sending heartbeat messages to the group coordinator (with a frequency determined by the parameter heartbeat_interval_ms). If a rebalancing has been initiated by another member joining or leaving, the heartbeat response will have an error flag set, so that the consumer learns about the start of the rebalancing process. It then sets a flag, which will be evaluated during the next call of the coordinator's poll method, which is in turn invoked from the consumer's poll loop. If this flag is set, the coordinator will rejoin the group following the process outlined above.

At this point, timing is vital. If a consumer does not call the poll method for a long period of time, it might miss a rebalancing and will forcefully be removed from the group. This again will lead to errors when the consumer tries to commit offsets, which are difficult to handle and almost inevitably lead to duplicate processing. In general, a consumer should invoke the poll method on a regular basis, and there is again a parameter (max_poll_interval_ms) which determines the maximum allowed time between two subsequent invocations of this method.
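As an illustration, here is a hedged sketch of how these timing-related parameters can be set when creating a consumer. The values are arbitrary examples, not recommendations, and session_timeout_ms is an additional, closely related parameter that determines after how long without a heartbeat a member is considered dead.

import kafka

consumer = kafka.KafkaConsumer("test",
    group_id="my_group",
    bootstrap_servers="broker1:9092",
    heartbeat_interval_ms=3000,       # interval at which the heartbeat thread sends heartbeats
    session_timeout_ms=10000,         # no heartbeats for this long and the member is evicted
    max_poll_interval_ms=300000)      # maximum allowed time between two calls to poll()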

Indirectly, this parameter also determines how long the group coordinator will wait for members to join the group (it is sent to the group coordinator as part of the join group request). The following diagram shows the typical sequence of events when a new member joins a group and triggers a rebalancing.

RebalancingProtocolTimeline

The consumer's poll loop

After all these preparations, we are now ready to discuss the poll method of the Kafka consumer. In this method (or rather the private method _poll_once), we first use the coordinator and its poll method discussed above to verify that the consumer is part of a group and has partitions assigned and to trigger a rebalancing process if needed. Note that if a rebalancing is needed, this call will block, making sure that we only reach the main part of the consumer's poll method after the rebalancing is done.

Next, we will typically have to update all fetch positions. This happens in several steps.

  • Call the method reset_offsets_if_needed of the fetcher. This method will check a flag to see if any offsets need to be reset. If yes, it will retrieve the valid offsets and apply the chosen offset reset strategy
  • If there are still partitions which do not have a valid position, call the method refresh_committed_offsets_if_needed of the coordinator, which will fetch the committed offsets from the group coordinator
  • Then, the method update_fetch_positions of the fetcher is invoked, which will set the fetch positions of the partitions in question to the committed value

Back in _poll_once, we then check whether the fetcher has any previously obtained records still in its queue. If yes, we immediately return this data (and at the same time initiate a pre-fetch of the next records). Recall that the process of getting these queued records also triggers the update of the position. Then, new fetches are sent, and we poll the client until we either time out or obtain new records which we then return.

Summarizing, the diagram below displays the (slightly simplified) flow of events in case a consumer calls poll (where some calls indicated in the diagram are not made every time, depending on available fetch positions and committed offsets).

PollOnceCaseOne

From what we have said above, it is now clear that a rebalance listener is always invoked from within the poll method – which also implies that you should not spend too much time in a rebalance listener and should not make any blocking calls there.

This completes our short summary of the processing inside the Kafka consumer. With this introduction and using the Java library and the rich comments inside the code, you should now be able to dig deeper into the bits and pieces if needed.

Learning Kafka with Python – implementing a database sink

Very often, either the source or the target of a Kafka based message queue is a classical relational database. Consuming data and using it to update a database table sounds straightforward, but poses a few challenges around reliability and delivery semantics. In this post, we look into two options to realize such an architecture.

The challenge

To illustrate the problem we are aiming to solve, let us suppose that we want to build an application that maintains an account balance. There is a front-end which acts as a Kafka producer and which a customer can use to either deposit money in the account or withdraw money. These transactions are then written into a Kafka topic, and a consumer reads from this topic and updates the balance kept in a relational datastore.

DatabaseSink

Thus the messages stored in the Kafka topic contain transactions, i.e. changes in the account balance, while the database table we need to maintain contains the actual balance. This is an example of what is called stream / table duality in the world of streaming – the event stream is the source of truth and reflects changes, the database contains the resulting state of the world after all changes have been applied and can at any time be reconstructed from the stream.

When we want to implement this pattern, the crucial part of our design will be to make sure that every message in the queue leads to only one update of the balance, so that no transaction is missed and no transaction is processed twice.

Pattern 1: using a message ID and de-duplication

The first pattern we could use to make this work is to achieve de-duplication based on a unique message ID, which ideally is an integer that is increased with every message. The consumer could then store the sequence number of the last processed message in the database, and could thus detect duplicates.

In a bit more detail, this would work as follows. When the producer processes an action, it first retrieves a unique message ID. This message ID could be created using a database sequence, or – if the used database does not allow for this – it could read a sequence number from a table, increment it by one and update the table accordingly. It would then add this sequence number as a key to the Kafka message.

The consumer would store the latest processed sequence number in the database. When it reads a message from Kafka, it uses this number to check whether the message has already been processed before. If not, it updates balance and sequence number in one transaction and then commits the new offset to Kafka.

DatabaseSinkPatternI

Let us see how duplicates are handled in this pattern. If, for some reason, the topic contains two messages with the same ID, the consumer will process the first message, increase the latest processed sequence number in the database and commit the new balance. It will then read the duplicate, compare its sequence number against the latest value, detect the duplicate and simply ignore it. Thus the consumer is able to do a de-duplication based on the sequence number.

Unfortunately, this simple pattern has one major disadvantage – it does not work. The problem is that the order of messages is not guaranteed across partitions. Suppose, for instance, that the producer creates the following messages:

  • Message 1, partition 0
  • Message 2, partition 1
  • Message 3, partition 0

Now it might happen that the consumer processes the messages in the order 1, 3, 2. Thus the consumer would, after having processed message 3, set the highest consumed sequence number to “3” in the database. When it then processes message 2, our simple duplicate detection algorithm would classify this message as a duplicate. Thus, to make this work, it is vital that we store the highest processed sequence number per partition and not across all partitions. We could even create the sequence numbers per partition, which would also remove a possible bottleneck, as creating the sequence number would otherwise effectively serialize the producers.

Also note that, as we need to maintain a “last processed” sequence number per partition in a database, we also need to maintain this table if we add new partitions to our topic or remove partitions.

Alternatively, if there is a message ID which is not ordered and increasing with each message, the consumer could store all processed message IDs in a separate database table to keep track of the messages that have already been processed (which, depending on the throughput, might require some sort of periodic cleanup to avoid that the table grows too big).

If the consumer fails after committing the changed balance to the database, but before committing the offset to Kafka, the same mechanism will kick in, as long as we commit the new balance and the updated value of the last processed message in one database transaction. Thus, duplicates can be detected, and we can therefore rely on the standard mechanisms Kafka offers to manage the offset – we could even read entire batches from the topic and use auto-commit to let Kafka manage the offset.
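To make this a bit more tangible, here is a hedged sketch of what the per-partition de-duplication could look like on the consumer side. The table and column names as well as the decode helper are made up for this illustration and do not necessarily match the scripts in the repository; db is assumed to be an open mysql.connector connection.

def process_record(record, db):
    amount, account = decode(record.value)       # hypothetical deserialization helper
    seq = int(record.key)
    cursor = db.cursor()
    cursor.execute("SELECT last_seq FROM last_processed WHERE kafka_partition = %s",
                   (record.partition,))
    (last_seq,) = cursor.fetchone()
    if seq <= last_seq:
        db.rollback()                            # duplicate - skip without changing anything
        return
    # update balance and last processed sequence number in one database transaction
    cursor.execute("UPDATE balances SET balance = balance + %s WHERE account = %s",
                   (amount, account))
    cursor.execute("UPDATE last_processed SET last_seq = %s WHERE kafka_partition = %s",
                   (seq, record.partition))
    db.commit()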

Let us now try out this pattern with the code samples from my repository. To be able to run this, you will have to clone my GitHub repository and follow the instructions in my initial post in this series to bring up your local Kafka cluster. Then, make sure that your current working directory is the root directory of the repository and run the following commands which will install the Python package to access a MySQL database and bring up a Docker based installation of MySQL with a prepared database.

pip3 install mysql-connector-python
./db/createDB.sh

This script will start a Docker container kafka-mysql running MySQL, add a user kafka with a default password to it and create a database kafka for which this user has all privileges. Next, let us run a second script which will (re)-create a Kafka topic transactions with two partitions and initialize the database.

./db/reset.sh

Now we run three Python scripts. The first script is the producer that we have already described, which will simply create ten records, each of which describing a transaction. The second script is our consumer. It will

  • subscribe to the transactions topic
  • read batches of records from the topic
  • for each record, update (in one transaction!) the account balance and the last processed sequence number
  • commit offsets in Kafka after processing a batch
  • apply the duplicate detection mechanism outlined above while processing each record

Finally, the third script is a little helper that will (without committing any offsets, so that we can run it over and over again) scan the topic, calculate the expected account balances, retrieve actual account balances from the database and check whether they coincide.

python3 db/producer1.py
python3 db/consumer1.py
python3 db/dump1.py --check

Let us now try to understand how our scripts work if an error occurs. For that purpose, the consumer has a built-in mechanism to simulate random errors between committing to the database and committing offsets to Kafka which is activated using the parameter --error_probability. Let us repeat our test run, but this time we simulate an error with a probability of 20%.

./db/reset.sh && python3 db/producer1.py
python3 db/consumer1.py --error_probability=0.2 --verbose
python3 db/dump1.py --check

You should now see that the consumer processes a couple of messages before our simulated error kicks in, which will make the consumer stop. The check script should detect the difference resulting from the fact that not all messages have been processed. However, when we now restart the script without simulating any errors, we should see that even though there are duplicate records, these records are properly detected and eventually, all records will be processed and the balances will again be correct.

python3 db/consumer1.py --verbose
python3 db/dump1.py --check

Pattern 2: store the consumer offset in the target database

Let us now take a look at an alternative implementation (which is in fact the pattern you will hit upon first when consulting the Kafka documentation or other sources) – maintaining the offsets entirely in the target database. This implementation does not require a sequence number generated by the producer, but it is consequently also not able to detect duplicate messages if the duplicates already originate in the producer.

The idea behind this pattern is simple. Instead of asking Kafka to maintain offsets for us, the consumer application handles offsets independently and maintains a database table containing offsets and partitions. When a record is processed, the consumer opens a database transaction, updates the account balance and updates the offset in the same transaction. This guarantees that offsets and balances are always in sync.

This sounds simple enough, but there are a few subtleties that need to be kept in mind when this is combined with consumer groups, i.e. if Kafka handles partition assignments dynamically. Whenever a partition is assigned to our consumer, we need to make sure that we position the consumer at the latest committed offset before processing any records. Conversely, when an assignment is revoked, we need to commit the current offsets to make sure that they are not lost. This can be implemented using a rebalance listener which is registered when we subscribe to the topic.

DatabaseSinkPatternII
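To illustrate the idea, here is a hedged sketch of such a rebalance listener. The helper functions store_offset_in_db and read_offset_from_db are hypothetical and stand in for whatever logic maintains the offset table; the sketch does not claim to match the consumer script in the repository.

import kafka

class DatabaseRebalanceListener(kafka.ConsumerRebalanceListener):

    def __init__(self, consumer, db):
        self._consumer = consumer
        self._db = db

    def on_partitions_revoked(self, revoked):
        # persist the current positions before giving up ownership of the partitions
        # store_offset_in_db is a hypothetical helper (see text)
        for tp in revoked:
            store_offset_in_db(self._db, tp, self._consumer.position(tp))

    def on_partitions_assigned(self, assigned):
        # position the consumer at the offsets stored in the database
        # read_offset_from_db is a hypothetical helper (see text)
        for tp in assigned:
            self._consumer.seek(tp, read_offset_from_db(self._db, tp))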

To try this out, run the following commands which will first reset the database and the involved topic, run the producer to create a few messages (this time without the additional sequence number) and then run our new consumer to read the messages and update the database.

./db/reset.sh && python3 db/producer2.py
python3 db/consumer2.py 
python3 db/dump2.py --check

The last command, which again checks the updated database table against the expected values, should again show you that expected and actual values in the database match.

It is instructive to try a few more advanced scenarios. You could, for instance, run the producer and then start a consumer which reads the first set of messages produced by the producer (use the switch --runtime=3600 to make this consumer run for one hour). Then, start a second consumer in a separate terminal window and observe the rebalancing that occurs. Finally, run the producer again and verify that the partition assignment worked and both consumers are processing the messages in their respective partition. And again, you can simulate errors along the way and see how the consumer behaves.

Learning Kafka with Python – consuming data

We now understand how Kafka producers add data to partitions. So let us move on and take a look at consumers – how they operate, how they are configured and how different levels of reliability and delivery guarantees can be achieved.

Consumer groups

In the previous post on producers, we have seen that the interaction between a producer and a Kafka broker is rather simple. Basically, producers request metadata to obtain data on partitions and leading brokers and then send records to the partition leader. The Kafka broker does not keep track of a producer's state, and producers can actually come and go without Kafka even noticing it (this is a bit different when transactions are used, as in this case, the broker needs to keep track of the producer's state as well, but this is beyond the scope of this post).

For consumers, the situation is different. The main reason for this different design is that while a producer determines itself to which partition data is written, a consumer typically lets Kafka make this decision. In this programming model, Kafka distributes the available partitions to the available consumers, trying to balance the load evenly. If a new consumer appears, Kafka will assign partitions to it, and if a consumer goes down, Kafka will re-assign these partitions to one of the remaining consumers. To make this work, Kafka needs to keep track of the state of a consumer (in fact, a consumer is expected to send periodic heartbeats so that Kafka can detect when a consumer goes down, and a dedicated broker, the group coordinator, keeps track of the state of the consumers in a group).

To better understand how this works, we first have to understand consumer groups. Logically, a consumer group is very similar to an application – it is a logical entity reading data from a topic. If, for instance, you are using Kafka to distribute instrument master data in a securities processing application, there will typically be different application components that need this data – say a trading frontend, a settlement module or a tax processing module. So each of these application components could be set up as a consumer group in Kafka, so that they all obtain records from the instrument master data topic independently, similar to the pub/sub semantics of a traditional messaging system.

To increase scalability and fault tolerance, there can be many consumers inside a consumer group, but Kafka will try to make sure that within a consumer group, every message is delivered to only one consumer.

ConsumerGroups

To achieve this, Kafka will assign partitions in a topic to consumers within a consumer group. To make sure that each message is only processed once by one consumer group, each partition can be assigned to only one consumer, but if there are more partitions than consumers, a single consumer can read from more than one partition (so scaling the number of consumers beyond the number of partitions will lead to idle consumers).

ConsumerGroupsPartitions

It is worth mentioning that this is not the only programming model supported by Kafka. Instead of letting Kafka determine the assignment of partitions to consumers, consumers can also assign themselves directly to specific partitions and thus define their own assignment. In this case, consumers need to implement their own mechanisms to detect a change in the number of partitions or to rebalance the load if a consumer goes down. This can, however, be useful if the number of partitions is constant and lost consumers are immediately replaced by some sort of restart mechanism. If you want to take a closer look at how exactly Kafka manages the assignment of partitions to consumers in a group, take a look at this excellent blog post on the Confluent web site or this page on the Confluent Wiki.
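For completeness, such a manual assignment could look like the following hedged sketch, which bypasses the group management entirely and attaches the consumer to two partitions of a topic; the broker address and topic name are placeholders.

import kafka

consumer = kafka.KafkaConsumer(bootstrap_servers="broker1:9092")
# no consumer group management - we pick the partitions ourselves
consumer.assign([kafka.TopicPartition("test", 0),
                 kafka.TopicPartition("test", 1)])
for record in consumer:
    print(record.partition, record.offset)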

Maintaining the offset

The next problem we have to solve is the maintenance of the offset. A traditional messaging system typically makes sure that a message is only delivered once. With Kafka, this task is left to the consumer (this is why some people refer to this model as the “dumb broker – smart consumer” model). In fact, the low level API “FETCH” call of the Kafka protocol expects that a consumer specifies the offset of the record (batch) it wants to read. So the consumer needs to know which offsets it has already processed to make sure that all records are read and that no record is processed twice.

This is not an easy task and subject to race conditions. Suppose, for instance, we decide to store the offset in a separate database, and our processing logic is (in a hopefully readable pseudo-code)

offset = db.read_offset()
while true:
  record = read_record(offset)
  process(record)
  offset = offset + 1
  db.store_offset(offset)

Now suppose that this consumer fails after processing the record, but before writing the updated offset into the database. When we now restart the consumer, it will read the old offset from the database and process the last record twice. If, conversely, we change the order and commit the new offset before processing the record, we would miss a record if the consumer dies between these two steps.

Instead of persisting the offset yourself, you can also ask Kafka to do this for you. When making use of this option, Kafka will store your offset in a dedicated topic. A consumer can either explicitly commit the offset to this topic, or can use auto-commit, which simply means that Kafka will automatically commit every few seconds (which, of course, leads to duplicate processing if this interval is, say, 5 seconds and the consumer dies 4 seconds after the last commit). In a later post, we will look into transactional writes, which even allow exactly-once delivery as long as no other data stores are involved.

Creating and using a KafkaConsumer

Let us now see how we can create and use a consumer with the Python Kafka API and how the consumer is configured.

First, we need to create a consumer object. When creating a consumer, there are three parameters that we typically provide: the topic from which we want to read data, the ID of the consumer group that the consumer is part of (an arbitrary string, which is needed at least if we plan to use the automatic assignment of partitions and / or want Kafka to store offsets for us), and a list of bootstrap servers. So a code snippet creating a consumer could be as follows.

import kafka
consumer=kafka.KafkaConsumer("test", 
         group_id="my_group",
         bootstrap_servers="broker1:9092")

Once we are done with a consumer, we should always clean up again by calling consumer.close() so that the consumer can properly leave the group.

When using SSL to connect to the broker, you will again have to provide additional parameters when building the consumer, as we have done it for the producer.

As for the producer, a consumer can also be configured with custom deserializers. If, for instance, we use JSON as a serializer, as we have demonstrated in the previous post on building a producer, we now need to provide a matching deserializer that converts a byte stream back into the target format used by the application. As for the producer, deserializers for keys and payloads can be supplied using the additional configuration parameters key_deserializer and value_deserializer.
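Sticking with the JSON example, a matching deserializer could look like this hedged sketch; the broker address and topic name are just placeholders.

import json
import kafka

def deserialize(data):
    # convert the raw bytes back into a Python object
    return json.loads(data.decode("utf-8"))

consumer = kafka.KafkaConsumer("test",
         group_id="my_group",
         bootstrap_servers="broker1:9092",
         value_deserializer=deserialize)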

As mentioned above, one option to deal with offsets is to leave the processing to Kafka and to ask Kafka to automatically commit offsets for us. This is in fact the default behavior, and controlled by the following parameters.

  • enable_auto_commit – this is a boolean flag which tells Kafka whether we want to automatically commit offsets, and defaults to true
  • auto_commit_interval_ms – this specifies the interval at which Kafka will commit offsets. The default is five seconds, which implies that in the worst case, the messages processed during the last five seconds will be consumed twice if your consumer fails shortly before a commit
  • auto_offset_reset – this parameter determines from which offset Kafka should start processing if no valid offset can be found. This clearly happens when we start the consumer for the first time, but can also happen if messages are deleted or are lost. If we set this to “earliest”, Kafka will start the processing at the first available offset. If we use “latest”, it will start processing at the end of the log, i.e. with the next message that will be added to the log. The default is “latest”

Before we can read any data, we have to subscribe to a topic. When creating the consumer, we already refer to a topic, and in fact, the consumer will automatically subscribe to this topic. It is also possible to manually subscribe. This is typically done when you want to add a rebalance listener to be informed about changes in the set of assigned partitions. A rebalance listener is any class derived from kafka.ConsumerRebalanceListener which is passed as argument to the subscribe call. Whenever a partition assignment is made or revoked, Kafka will then call the corresponding method of the listener.
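As a minimal illustration, such a listener could look like the sketch below; the method bodies are placeholders that just print the assignments and do not necessarily match the listener used in the repository's example scripts.

import kafka

class MyConsumerRebalanceListener(kafka.ConsumerRebalanceListener):

    def on_partitions_revoked(self, revoked):
        print("Revoked partitions: %s" % revoked)

    def on_partitions_assigned(self, assigned):
        print("Assigned partitions: %s" % assigned)

The listener is then passed to the subscribe call.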

consumer.subscribe(TOPIC, 
    listener=MyConsumerRebalanceListener())

When an application wants to manually store offsets, for instance in a database, it can use this mechanism and / or the method consumer.assignment() to keep track of the records assigned to it. Note that, as explained in the source code comments of the listener class, Kafka will first invoke the on_partitions_revoked method of all listeners before calling any of the on_partitions_assigned methods. These handlers will be invoked from the polling loop, i.e. only when you pass control to the consumer by reading data from it, not in a separate thread (we will learn more about the exact mechanics of this process in a separate future post).

Now let us see how we can actually read data from a topic. The library offers two options to do this. First, we can simply invoke the poll method of the consumer object, which will return a batch of records. Alternatively, and more “pythonish”, we can treat the consumer object as an iterator and simply loop over it to get one record at a time.

Note that some methods of the consumer can block as they are waiting for responses from the server. As, in general, a consumer should not block outside of the polling loop, it is not advisable to call the consumer's methods from separate threads. My experience is that it can lead to problems if a signal handler, for instance, invokes methods of the consumer to shut down the consumer. Instead, the handler should only set a stop flag, while all methods of the consumer object are invoked from the polling loop.

stop = 0

def signal_handler(signal, frame):
  global stop
  stop = 1

while not stop:
  for record in consumer:
    # process the record here
    if stop:
      break
consumer.close()

Let us now discuss different options to commit offsets. We have already seen that the default is auto-commit, which implies that Kafka will commit automatically every 5 seconds. When using this option, we can guarantee that all messages will be read at least once, but need to be prepared to receive messages more than once. If we need full control over the process of committing offsets, we need to disable auto-commit by setting enable_auto_commit to false.

At this point, it is important to remember that the Kafka client requests data from the broker in batches. If we ask the client to commit the offset, it will commit the entire batch. It therefore does not make sense to commit once during every loop iteration of the pseudo-code above, but once at the end of the batch. As the iterator interface of the consumer object makes it difficult to determine when a batch has ended, it is easier to use the poll method of the consumer. This method returns a dictionary whose keys are TopicPartition objects, i.e. named tuples describing a combination of topic and partition, and whose values are lists of records.

When using manual commits, we again have several choices. First, we can commit after every record. In this way, we will have at most one duplicate in case of an error, but create an additional overhead and reduce our throughput significantly. Alternatively, we can use a batch size greater than one and commit after each batch. This will be more efficient, but if the processing fails in the middle of the batch, we will re-read the first few records in the batch when we restart and thus process records twice.
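A hedged sketch of this batch-wise approach, assuming that auto-commit has been disabled and that process is a hypothetical function handling a single record, could look as follows.

while True:
    batch = consumer.poll(timeout_ms=1000)     # dictionary mapping TopicPartition to records
    for tp, records in batch.items():
        for record in records:
            process(record)                    # hypothetical processing of a single record
    if batch:
        consumer.commit()                      # commit the offsets for the entire batch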

Trying it out

Let us now see how this works in practice. If you have cloned my GitHub repository and installed Kafka as described in my previous post, you are ready to run some examples that are part of the repository and located in the python subdirectory. First let us delete and re-create the topic that we have already used for our producer tests by running the following commands on the lab PC (after changing to the repository root directory)

./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --topic test \
  --delete
./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --topic test \
  --create \
  --partitions 2 \
  --replication-factor 3

Next, we can create 10 messages in this topic by running the producer that we have already used in the previous post.

python3 python/producer.py --create_keys

We can now run our consumer to read the messages that we have just written. To do this, simply enter

python3 python/consumer.py 

Looking at the output, we see that the first attempt to poll triggers a partition reassignment. First, the coordinator will revoke the existing group assignments for all group members. Then it will assign the existing two partitions to our consumer, as this is the only consumer in the group, so that our listener is called. As this is the first read, there are no committed offsets yet, and as we have set auto_offset_reset to “earliest”, we start our read at position zero (the first offset).

We now start to read records from the log. In a second terminal window, we can inspect the currently stored offsets.

./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --group test-group \
  --describe

We should now see that Kafka has assigned our consumer to the two partitions and has recorded the updated offsets. As we have read every record once, the offsets should now be identical to the last read position. If you run this command quickly after starting the consumer, you should even be able to see that the automated commit only takes place after a couple of seconds.

When you now stop the consumer by hitting Ctrl-C and run it again, it will not print any new records, as it will restart at the committed offsets. To re-read our messages, we will have to reset the offsets. There are two ways to do this. You can either run

./kafka/bin/kafka-consumer-groups.sh \
  --bootstrap-server=$(python/getBrokerURL.py) \
  --command-config ./.state/client_ssl_config.properties \
  --group test-group \
  --topic test \
  --reset-offsets \
  --to-earliest \
  --execute 

or use our consumer, which has a switch --reset instructing it to only reset the offsets without reading any records. In both cases, we should now be ready for another test. This time, we disable auto-commit and use manual commits.

python3 python/consumer.py --disable_auto_commit

You should now see that the messages are processed once again, and that the offsets will again be committed, though this is triggered by our explicit calls to consumer.commit() this time.

Next, let us try what happens if we do not commit any offsets at all. Our test client supports this by setting the flag --no_commit

python3 python/consumer.py --reset
python3 python/consumer.py --no_commit
python3 python/consumer.py --no_commit

As expected, the second and third invocation both return the full set of data, as the offsets are never committed and the third invocation therefore starts at the same point at which the second invocation started.

Finally, it is instructive to see how several consumers interact. To set this up, first reset all offsets again. Then, open a second terminal, start the consumer in the first terminal and then start a second consumer in the second window. The output should show you that

  • Initially, the first consumer will start to process both partitions
  • When the second consumer is started, the partitions will be revoked, and the corresponding listener is called for both consumers
  • Then, each of the two partitions will be assigned to one of the two consumers

Both consumers should now wait for data on their respective partition. If you now run the producer again to generate ten additional messages, you should nicely see that both consumers receive messages for their respective partitions in parallel.

This completes our discussion of consumers for the time being. There are a couple of points that we have not yet explored (like the manual assignment of partitions, different options for timeouts, the heartbeat thread which periodically sends a heartbeat to the Kafka group coordinator or consumers without consumer groups), but most of this is readily accessible in the Kafka documentation. In the next post, we will look at some patterns to read data from a Kafka topic and use it to maintain state in a relational database.

Learning Kafka with Python – producing data

As proud owners of a brand new Kafka installation, we are now ready to explore how applications interact with Kafka. Today, we will look at producers and understand how they write data to Kafka.

Basic design considerations

At first glance, writing data to Kafka sounds easy – connect to a Kafka broker and submit a message. However, there are some basic design considerations that are relevant when building a Kafka producer.

First, we have seen that Kafka stores the data in a topic in multiple partitions, and then each partition has a leader which is responsible for writing messages into the partition. Thus a producer needs to determine to which partition a message should be written and contact the responsible leader for this partition.

Defining the mapping of messages to partitions can be crucial for reliability and scalability of your application. Partitions determine how the application can scale horizontally, and we will learn later that partitions also determine to which extent consumers can scale. In addition, Kafka guarantees message order only within a partition. Specifically, if message A is written to a partition before message B is written to the same partition, message A will receive a lower offset than B and will be read first by a (well behaving) consumer. This is no longer true if messages A and B are written to different partitions. Think of partitions as lanes on a highway – there is no guarantee that two cars entering the highway in a certain order but in different lanes will arrive at the destination in the same order.

Often, you will want to use a business entity to partition your data. If you are building a customer facing application, you might want to partition your data by customer group, if you are building a securities processing application the financial instrument might be a good partition criterion, if you are maintaining accounts then the account number might be a good choice and so forth. In other cases, where ordering is not important, you might go for a purely technical criterion.

The next fundamental question we have to figure out is when a message is considered to be successfully written. When the broker has received it? Or when the leader has written the message? Or should we wait until all followers have successfully stored the message? And what if a follower lags behind – should we stop writing messages until the follower has recovered or move on, accepting that we have lost one follower without knowing whether it will recover at a later time?

Kafka does not give a definitive answer to all these questions, but leaves you a choice – put differently, when you create a producer, you can specify its behavior using a variety of options. So let us now see how this is done in Python.

The producer object

Let us now see how a producer can be created using Python. If you have not yet done so, please install the Kafka Python library to be able to run the examples.

pip3 install kafka-python

This series uses version 2.0.1 of the library, if you want to use exactly that version you need to specify that as usual, i.e. run

pip3 install kafka-python==2.0.1

To send messages to Kafka, the first thing we need to do is to create a producer object, i.e. an instance of the class kafka.KafkaProducer. The init-method of this class accepts a large number of arguments, but in the most straightforward case, there is exactly one argument bootstrap_servers. This argument is a list of listener URLs, for instance 10.100.0.11:9092, which the producer will use to make an initial connection to a Kafka broker. This list does not need to contain all brokers, in fact one entry will do, but the producer will use this broker to obtain other brokers if needed. It is a good idea to list at least two or three brokers here, in case one broker is temporarily unavailable. So creating a producer could look like this.

import kafka
producer=kafka.KafkaProducer(bootstrap_servers=["broker1:9092", "broker2:9092"])

When started, a Producer will create a separate sender thread which will asynchronously send messages to the brokers. In addition, it will create an internal client which holds the actual connections to the Kafka cluster.

When using an SSL listener, we need a few additional configuration items. Specifically, we need to add the following named parameters when creating a KafkaProducer (see the sketch after the list below for an example).

  • ssl_cafile – this is the location of a CA certificate that the client will use to verify the certificate presented by the server
  • ssl_certfile – this is the location of the client certificate that the client will in turn present to the server when the server requests a certificate
  • ssl_keyfile – the key matching the client certificate
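Putting this together, creating a producer for an SSL listener could look like the hedged sketch below. The file names and broker address are placeholders, and in addition to the three parameters above, the security_protocol parameter needs to be set to "SSL".

import kafka

producer = kafka.KafkaProducer(
    bootstrap_servers=["broker1:9093"],
    security_protocol="SSL",
    ssl_cafile="ca.crt",          # CA certificate used to verify the server certificate
    ssl_certfile="client.crt",    # client certificate presented to the server
    ssl_keyfile="client.key")     # private key matching the client certificate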

In order to be a bit more flexible when it comes to connecting to different setups, the code examples that we will use in this series read the list of brokers and the SSL configuration from a YAML file config.yaml that the installation script will create in the subdirectory .state of the repository directory. All test scripts accept a parameter --config that you can use to overwrite this default location, in case you want to use your own configuration.

Once we are done using a producer, we should close it using producer.close() to clean up.

Keys and partitions

Once we have a producer in our hands, we can actually start to send messages. This requires only two parameters: the topic (a string) and the payload of the message (a sequence of bytes).

producer.send("test",value=bytes("hello", "utf-8"))

Note that this will create a topic “test” if it does not exist yet, using default values specified in the server configuration (server.properties), so be careful to use this as the default configuration might not be what you want (you can also turn this feature off by setting auto.create.topics.enable to false in server.properties).

Now you might remember from my previous post that a record in Kafka actually consists of a payload and a key. Here, we do not specify a key, so the key will remain empty. But of course, you can define a key for your record by simply adding the named parameter key to the method invocation, like this.

producer.send("test",
        value=bytes("hello", "utf-8"),
        key=bytes("mykey", "utf-8"))

What about partitions? The low-level protocol that a Kafka broker understands expects the client to send a PRODUCE request containing a valid partition ID, so it is up to the client to take this decision. The application programmer can either decide to explicitly specify a partition ID (an integer) as an optional parameter to the send method, or let the framework take the decision. In this case, a so-called partitioner is invoked which, based on the value of the key, selects a partition to write to.

An application can set the configuration item partitioner when creating a producer to define a custom partitioner (which is simply a callable object that the producer will invoke). If no partitioner is specified, the default partitioner will be used, which implements the following logic.

  • If no key is given, the default partitioner will simply distribute the messages randomly across the available partitions
  • If a key is provided, a hash value of the key will be computed (using a so-called murmur hash), which will always be an integer. The value of this hash (more precisely, of its last 31 bits) modulo the number of partitions will then determine the partition to use

The important thing to keep in mind is that if you do not provide a key, your message will end up in a random partition. If you do provide a key, then Kafka will guarantee that messages with the same key will go to the same partition and hence be processed in order.

Serialization

In our examples so far, we have passed a sequence of bytes to the send method, both for the key and the value. This is the format that the low-level protocol expects – at the end of the day, keys and values are sent over the wire as a sequence of bytes, and stored as a sequence of bytes.

In many applications, however, you will want to store more complex data types, like JSON data or even objects. To be able to do this with Kafka, you will have to convert your data into a sequence of bytes when sending the data, a process known as serializing.

When creating the producer, you can specify your own serializers for keys and payloads by adding the named parameters key_serializer and value_serializer when creating the producer. Here, a serializer can either be a function which accepts whatever input format you prefer and returns a sequence of bytes, or an instance of the class kafka.serializer.Serializer which has a serialize method which the framework will invoke.

Suppose for instance you wanted to serialize JSON data. Then, you need to provide a serializer which accepts a JSON object and returns a sequence of bytes. For that purpose, we can use the standard json.dumps method to first produce a string, and then encode the string using e.g. UTF-8 to obtain a sequence of bytes. Thus your serializer would look something like

import json

def serialize(data):
    return bytes(json.dumps(data), "utf-8")

and when creating the producer, the call would be something like

producer=kafka.KafkaProducer(..., 
   value_serializer=serialize, ...)

Choosing a reasonable serializer is an important design choice. As Kafka topics are designed to be durable objects, you need to think about things like versioning when the decoding changes as you release new features, and obviously all components of a system need to use matching serializers and de-serializers to be able to exchange data. Many Kafka projects actually use third-party serializers, like Apache Avro or Google’s protobuf.

Acknowledgements

So far, we have seen how we can send messages using the send method of a KafkaProducer object. But in reality, you of course want to know whether your message was successfully sent and stored by Kafka.

This leads us to the question at which point a new record can be considered to be committed to a Kafka cluster, i.e. stored and available for consumers. Before getting into this, however, we first have to understand the notion of an in-sync replica.

Recall that Kafka replication works by designating a leader for a partition and zero, one or more followers which constantly ask the leader for new records in the partition and store them in their own copy of the partition log. As the replicas read the records from the leader, the leader knows which record has been delivered to which follower. The leader can therefore determine whether a replica is out-of-sync, which happens if a follower fails to retrieve the latest message within a defined time frame, or in-sync.

Having enough in-sync replicas is vital for reliability. If a leader goes down, Kafka has to elect a new leader from the set of available replicas. Of course, choosing an out-of-sync replica as the new leader would mean promoting a replica that has not yet replicated all messages producers have sent to the previous leader, making it the new source of truth. Thus, making such a replica the new leader results in a loss of records. In some situations, you might still opt to do so, which Kafka allows you to do if the parameter unclean.leader.election.enable is set to true in the broker configuration.

InSyncOutOfSync

Now let us come back to the question of when a message sent by a producer will be considered committed. Again, Kafka offers you a choice, governed by the value of the parameter acks of a producer.

  • When acks = 1 (the default), a message will be considered committed once the leader acknowledges the message. Thus Kafka guarantees that a committed message has been added to the leading partition, but not that it has already been written to one or even all replicas. Note that, as the leader might cache the record in memory, this can lead to data loss if the leader goes down after acknowledging the message, but before a follower has copied it
  • When acks = -1 (all), a record will be considered committed only once the leader and in addition all in-sync replicas have acknowledged receipt of the message. As long as you have enough in-sync replicas, this gives you a strong guarantee that the message is available on several nodes and thus data loss has become very unlikely
  • Finally, a value of acks = 0 means that the message will be considered committed once it has been sent over the network, regardless of any acknowledgement from the leader or a follower. This obviously is a very weak guarantee and only reasonable if you have a strong focus on throughput and can live with a loss of (potentially many) records

When using acks = all, the number of in-sync replicas is of course vital. To illustrate this, let us assume that you have configured a topic with three replicas, but both followers have become out-of-sync. Now a message will be committed once the leader has written it, meaning that if the leader is lost, the record will be lost as well. To avoid such a scenario, you can set the server property min.insync.replicas. This number determines how many replicas (including the leader) need to be in-sync in order to still accept new messages. Thus if you use, for instance, a topic with a replication factor of three and min.insync.replicas=2 in combination with acks=-1, then Kafka will guarantee that a message is only reported as committed once the leader and at least one follower have received the record.

Acks

Finally, there is one more parameter that is important for the reliability of a producer – max_in_flight_requests_per_connection. A request (which typically contains more than one record to be written) will be considered as in-flight as long as no result – either an acknowledgement or an error – has been received from the broker. If this parameter (which defaults to 5!) is set to a value greater than one, this implies that the producer will not wait for an acknowledgement before sending the next batch. In combination with retries, this can imply that the order in which messages are added to the log is not identical to the order in which they have been sent, and if the producer goes down, in-flight messages might be lost if the broker is not able to process them. Thus set this to one if you need strong guarantees on at-least-once delivery and ordering.
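As an illustration, a producer tuned for strong delivery guarantees could be created as in the hedged sketch below; the broker addresses are placeholders.

import kafka

producer = kafka.KafkaProducer(
    bootstrap_servers=["broker1:9092", "broker2:9092"],
    acks=-1,                                    # wait for the leader and all in-sync replicas
    max_in_flight_requests_per_connection=1)    # preserve ordering, at most one request in flight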

Retries and error handling

Finally, the last important design decision that you need to take when writing a producer is how to deal with errors.

The send method that we are using delivers messages asynchronously to the actual sender and immediately returns. To figure out whether the record was successfully committed, we therefore cannot simply use its return value, but need a different approach. Therefore, the send method returns a handler which can later be used to retrieve the status of the message, a so-called Future, or, more precisely, a subclass called FutureRecordMetadata. Once we have this object, we can call its get method with a timeout in seconds to wait for the request to complete. If the request was successful, this method returns an object containing the record metadata, otherwise it raises an exception of type kafka.errors.KafkaError. Alternatively, you can also specify callback functions for successful and failed sends.
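For illustration, here is a hedged sketch of how a producer can wait for the result of a send and handle errors; the topic name and payload are placeholders.

import kafka.errors

future = producer.send("test", value=bytes("hello", "utf-8"))
try:
    metadata = future.get(timeout=10)           # block for at most 10 seconds
    print("Written to partition %d at offset %d" % (metadata.partition, metadata.offset))
except kafka.errors.KafkaError as e:
    print("Send failed: %s" % e)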

Note that the producer also has an option to automatically retry failed messages, which can be configured by setting the parameter retries to a value different from zero. In general, however, you should be careful with this as it might conflict with ordering, see the discussion of in-flight requests above.

Testing our producer

After all this theory, it is now time to test our producer. I assume that you have followed my previous post and installed Kafka in three virtual nodes on your PC. Now navigate to the root of the repository and run the following command to create a test topic.

./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(./python/getBrokerURL.py) \
  --command-config=.state/client_ssl_config.properties \
  --create \
  --topic test \
  --replication-factor 3 \
  --partitions 2 

Let us see what this command is doing. The script that we run, kafka-topics.sh, is part of the standard Kafka admin command line tools that are bundled with the distribution. In the second line, we invoke a little Python script that evaluates the configuration in YAML format which our installation procedure has created to determine the URL of a broker, which we then pass to the script. In the third line, we provide a Java properties file containing the SSL parameters to connect to our secured listener.

The remaining switches instruct the tool to create a new topic called “test” with a replication factor of three and two partitions (which, of course, will fail if you have already created this topic in the previous post).

Next, we will run another tool coming with Kafka – the console consumer. This is a simple consumer that will subscribe to a topic and dump all records in this topic to the console. To run it, enter

kafka/bin/kafka-console-consumer.sh   \
   --bootstrap-server $(./python/getBrokerURL.py)   \
   --consumer.config .state/client_ssl_config.properties \
   --from-beginning \
   --topic test 

Now open an additional terminal, navigate to the root of the repository and run the producer.

python3 python/producer.py

This should print the producer configuration used and the number of messages produced, plus timestamps and the number of seconds and microseconds it took to send all messages. In the first terminal window, in which the consumer is running, you should then see these messages flicker by.

Now let us try out a few things. First, let us create 10000 messages with a set of configurations promising the highest throughput (acks=0, fully asynchronous send, no keys provided, five requests in flight).

python3 python/producer.py \
  --messages=10000 \
  --ack=0

On my PC, producing these 10000 messages takes roughly half a second, i.e. our throughput is somewhere around 20,000 messages per second, without any tuning (of course the results will heavily depend on the machine on which you are running this). Next, we produce again 10000 messages, but this time, we use very conservative settings (acks=all, only one message in flight, wait for reply after each request, create and store keys and use them to determine the partition).

python3 python/producer.py \
  --messages=10000 \
  --ack=-1 \
  --max_in_flight_requests_per_connection=1 \
  --wait \
  --create_keys

Obviously, this will be much slower. On my PC, this took roughly 17 seconds, i.e. it is slower by a factor of about 25 than the first run. This hopefully illustrates nicely that Kafka leaves you many choices for trade-offs between performance and availability. Use this freedom with care and make sure you understand the consequences that the various settings have, otherwise you might lose data!

Putting it all together – the send method behind the scenes

Having seen a producer in action, it is instructive to take a short look at the source code of the Python implementation of KafkaProducer, specifically at its send method. First, the partitions of the topic are retrieved, either from cached metadata or by requesting updated metadata from the server. Then, the key and value are serialized, and the partitioner is invoked which determines the partition to which we write the record.

Next, instead of directly sending the record to the broker, it is appended to an internal buffer using an internal helper class called a RecordAccumulator. If the accumulator signals that the buffer is full, the sender thread is triggered which will then actually transmit the entire batch to the Kafka broker. Finally, the future object is returned.
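
The following toy script is not the actual library code, but illustrates the underlying pattern of an in-memory buffer that is drained by a background sender thread; all names and the batch size are made up for this example.

import queue
import threading

# NOT the kafka-python implementation - just a toy illustration of the pattern:
# records are appended to an in-memory batch, and a background "sender" thread
# ships complete batches.
BATCH_SIZE = 3
batch_queue = queue.Queue()
current_batch = []

def sender():
    # background thread: take complete batches from the queue and "transmit" them
    while True:
        batch = batch_queue.get()
        if batch is None:
            break
        print("Sending batch of %d records to the broker" % len(batch))

def send(record):
    # append to the current batch and hand it over to the sender once it is full
    current_batch.append(record)
    if len(current_batch) >= BATCH_SIZE:
        batch_queue.put(current_batch.copy())
        current_batch.clear()

sender_thread = threading.Thread(target=sender)
sender_thread.start()
for i in range(7):
    send("record-%d" % i)
if current_batch:
    batch_queue.put(current_batch.copy())   # flush the incomplete last batch
batch_queue.put(None)                       # tell the sender thread to stop
sender_thread.join()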

SendingData

This completes our discussion of Kafka producers. In the next post, we will learn how we can consume data from Kafka and how consumers, producers and brokers play together.

Learning Kafka with Python – the basics

More or less by accident, I recently took a closer look at Kafka, trying to understand how it is installed, how it works and how the Python API can be used to write Kafka applications with Python. So I decided it is time to create a series of blog posts on Kafka to document my learnings and to (hopefully) give you a quick start if you plan to get to know Kafka in a bit more detail. Today, we take a first look at Kafka's architecture before describing the installation in the next post.

What is Kafka?

Kafka is a system for the distributed processing of messages and streams which is maintained as open source by the Apache Software Foundation. Initially, Kafka was developed by LinkedIn and open-sourced in 2011. Kafka lets applications store data in streams (comparable to a message queue, more on that later), retrieve data from streams and process data in streams.

From the ground up, Kafka has been designed as a clustered system to achieve scalability and reliability. Kafka stores its data on all nodes in a cluster using a combination of sharding (i.e. distributing data across nodes) and replication (i.e. keeping the same record redundantly on several nodes to avoid data loss if a node goes down). Kafka clusters can reach an impressive size and throughput and can be employed for a variety of use cases (see for instance this post, this post or this post to get an idea). Kafka uses another distributed system – Apache ZooKeeper – to store metadata in a reliable way and to synchronize the work of the various nodes in a cluster.

Topics and partitions

Let us start to take a look at some of the core concepts behind Kafka. First, data in Kafka is organized in entities called topics. Roughly speaking, topics are a bit like message queues. Applications called producers write records into a topic which is then stored by Kafka in a highly fault-tolerant and scalable way. Other applications, called consumers, can read records from a topic and process them.

Topic

You might have seen similar diagrams before, at least if you have ever worked with messaging systems like RabbitMQ, IBM MQ Series or ActiveMQ. One major difference between these systems and Kafka is that a topic is designed to be a persistent entity with an essentially unlimited history. Thus, while in other messaging systems, messages are typically removed from a queue when they have been read or expired, Kafka records are kept for a potentially very long time (even though you can of course configure Kafka to remove old records from a topic after some time). This is why the data structure that Kafka uses to store the records of a topic is called a log – conceptually, this is like a log file into which you write records sequentially and which you clean up from time to time, but from which you rarely ever delete records programmatically.
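
Before we look at how topics are stored internally, here is a minimal sketch of this producer/consumer interaction using the kafka-python package, assuming an unsecured broker listening on localhost:9092 and an existing topic called test (we will discuss the Python client in much more detail in later posts).

from kafka import KafkaProducer, KafkaConsumer

# write a record into the topic
producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("test", b"a first record")
producer.flush()

# read all records in the topic, starting from the beginning, and stop
# after five seconds without new data
consumer = KafkaConsumer("test",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest",
                         consumer_timeout_ms=5000)
for record in consumer:
    print(record.value)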

That looks rather simple, but there is more to it – partitions. We have mentioned above that Kafka uses sharding to distribute data across several nodes. To be able to implement this, Kafka somehow needs to split the data in a topic into several entities which can then be placed on different nodes. This entity is called a partition. Physically, a partition is simply a directory on one of the nodes, and the data in a partition is split into several files called segments.

Partitions

Of course, clients (i.e. producers and consumers) do not write directly into these files. Instead, there is a daemon running on each node, called the Kafka broker, which is maintaining the logs on this node. Thus if a producer wants to write into a topic, it will talk to the broker responsible for the logs on the target node (which depends on the partition, as we will see in a later post when we discuss producers in more detail), send the data to the broker, and the broker will store the data by appending it to the log. Similarly, a consumer will ask a broker to retrieve data from a log (with Kafka, consumers pull data, in contrast to some other messaging systems where data is pushed out to a consumer).

Records in a log are always read and written in batches, not as individual records. A batch consists of some metadata, like the number of records in the batch, and a couple of records. Each record again starts with a short header, followed by a record key and the record payload.

It is important to understand that in Kafka, the record key does NOT identify a record uniquely. Actually, the record key can be empty and is mainly used to determine the partition to which a record will be written (again, we will discuss this in more detail in the post on producers). Instead, a record is identified by its offset within a partition. Thus if a consumer wants to read a specific record, it needs to specify the topic, the partition number and the offset of the record within the partition. The offset is simply an integer starting at zero which is increased by one for every new record in the partition and serves as a unique, primary key within this partition (i.e. it is not unique across partitions).
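
As an illustration (again using kafka-python against an assumed unsecured broker on localhost:9092), this is how a consumer would position itself at a specific offset within a specific partition; the sketch assumes that partition 0 of the topic test already contains at least six records.

from kafka import KafkaConsumer, TopicPartition

consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
tp = TopicPartition("test", 0)       # partition 0 of topic "test"
consumer.assign([tp])                # manual assignment, no consumer group logic
consumer.seek(tp, 5)                 # position the consumer at offset 5
record = next(consumer)              # the next record fetched has offset 5
print(record.topic, record.partition, record.offset)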

Replication, leaders and controllers

Sharding is one pattern that Kafka uses to distribute records across nodes. Within a topic, each partition will be placed on a different node (unless of course the number of nodes is smaller than the number of partitions, in which case some nodes will hold more than one partition). This allows us to scale a Kafka cluster horizontally and to maintain topics that exceed the storage capacity of a single node.

But what if a node goes down? In this case, all data on that node might be lost (in fact, Kafka heavily uses caching and will not immediately flush to disk when a new record is written, so if a node goes down, you will typically lose data even if your file system survives the crash). To avoid data loss in that case, Kafka replicates partitions across nodes. Specifically, for each partition, Kafka will nominate a partition leader and one or more followers. If, for instance, you configure a topic with a replication factor of three, then each partition will have one leader and two followers. All of those three brokers will maintain a copy of the partition. A producer and a consumer will always talk to the partition leader, and when data is written to a partition, it will be synced to all followers.

So let us assume that we are running a cluster with three Kafka nodes. On each node, there is a broker managing the logs on this node. If we now have a topic with two partitions and a replication factor of three, then each of the two partitions will be stored three times, once on the leader and twice on the brokers that act as followers for this partition. This could lead to an assignment of partitions and replicas to nodes as shown below.

Replication

If in this situation one of the nodes, say node 1, goes down, then two things will happen. First, Kafka will elect a new leader for partition 0, say node 2. Second, it will ask a new node to create a replica for partition 1, as (even though node 1 was not the leader for partition 1) we now have only one replica for partition 1 left. This process is called partition reassignment.

To support this process, one of the Kafka brokers will be elected as the controller. It is the responsibility of the controller to detect failed brokers and reassign the leadership for the affected partitions.

Producers and consumers

In fact, the process of maintaining replicas is much more complicated than this simple diagram suggests. One of the major challenges is that it can of course happen that a record has been received by the leader and not yet synchronized to all replicas when the leader dies. Will these messages be lost? As often with Kafka, the answer is “it depends on the configuration” and we will learn more about this in one of the upcoming posts.

We have also not yet said anything about the process of consuming from a topic. Of course, in most situations, you will have more than one consumer, either because there is more than one application interested in getting the data or because we want to scale horizontally within an application. Here, the concept of a consumer group comes into play, which roughly is the logical equivalent of an application in Kafka. Within a consumer group, we can have as many consumers as the topic has partitions, and each partition will be read by only one consumer. For consumers, the biggest challenge is to keep track of the messages already read, which is of course essential if we want to implement guarantees like at-least-once or even exactly-once delivery. Kafka offers several mechanisms to deal with this problem, and we will discuss them in depth in a separate post on consumers.
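
As a quick illustration of this concept, here is a minimal sketch of a consumer joining a consumer group with kafka-python (an unsecured broker on localhost:9092 is again assumed); if you start this script twice, Kafka will split the partitions of the topic between the two group members.

from kafka import KafkaConsumer

consumer = KafkaConsumer("test",
                         bootstrap_servers="localhost:9092",
                         group_id="my-application",      # the consumer group
                         auto_offset_reset="earliest",
                         enable_auto_commit=True)        # commit offsets periodically
for record in consumer:
    print("Partition %d, offset %d: %s" % (record.partition, record.offset, record.value))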

In the next post of this series, we will leave the theory behind for the time being and move on to the installation process, so that you can bring up your own cluster in virtual machines on your PC to start playing with Kafka.

OpenStack Nova – deep-dive into the provisioning process

In the last post, we went through the installation process and the high-level architecture of Nova, talking about the Nova API server, the Nova scheduler and the Nova agent. Today, we will make this a bit more tangible by observing how a typical request to provision an instance flows through this architecture.

The use case we are going to consider is the creation of a virtual server, triggered by a POST request to the /servers/ API endpoint. This is a long and complicated process, and we try to focus on the main path through the code without diving into every possible detail. This implies that we will skim over some points very briefly, but the understanding of the overall process should put us in a position to dig into other parts of the code if needed.
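
For reference, such a request could be submitted with a few lines of Python as sketched below; the endpoint URL, the token and the various IDs are placeholders that would normally be obtained from Keystone and from the image and flavor APIs.

import requests

# placeholder values - in a real setup these come from Keystone, Glance and Nova
body = {
    "server": {
        "name": "demo-instance",
        "imageRef": "<image-uuid>",
        "flavorRef": "<flavor-id>",
        "networks": [{"uuid": "<network-uuid>"}]
    }
}
response = requests.post(
    "http://controller:8774/v2.1/servers",
    json=body,
    headers={"X-Auth-Token": "<token obtained from Keystone>"}
)
print(response.status_code, response.json())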

Roughly speaking, the processing of the request will start in the Nova API server which will perform validations and enrichments and populate the database. Then, the request is forwarded to the Nova conductor which will invoke the scheduler and eventually the Nova compute agent on the compute nodes. We will go through each of these phases in a bit more detail in the following sections.

Part I – the Nova API server

Being triggered by an API request, the process of course starts in the Nova API server. We have already seen in the previous post that the request is dispatched to a controller based on a set of hard-wired routes. For the endpoint in question, we find that the request is routed to the method create of the server controller.

This method first assembles some information like the user data which needs to be passed to the instance or the name of the SSH key to be placed in the instance. Then, authorization is carried out by calling the can method on the context (which, behind the scenes, will eventually invoke the Oslo policy rule engine that we have studied in our previous deep dive). Then the request data for networks, block devices and the requested image is processed before we eventually call the create method of the compute API. Finally, we parse the result and use a view builder to assemble a response.

Let us now follow the call into the compute API. Here, all input parameters are validated and normalized, for instance by adding defaults. Then the method _provision_instances is invoked, which builds a request specification and the actual instance object and stores these objects in the database.

At this point, the Nova API server is almost done. We now call the method schedule_and_build_instances of the compute task API. From here, the call will simply be delegated to the corresponding method of the client side of the conductor RPC API which will send a corresponding RPC message to the conductor. At this point, we leave the Nova API server and enter the conductor. The flow through the code up to this point is summarized in the diagram below.

NovaProvisioningPartI

Part II – the conductor

In the last post, we have already seen that RPC calls are accepted by the Nova conductor service and are passed on to the Nova conductor manager. The corresponding method is schedule_and_build_instances.

This method first retrieves the UUIDs of the instances from the request. Then, for each instance, the private method self._schedule_instances is called. Here, the class SchedulerQueryClient is used to submit an RPC call to the scheduler, which is being processed by the scheduler's select_destinations method.

We will not go into the details of the scheduling process here, but simply note that this will in turn make a call to the placement service to retrieve allocation candidates and then calls the scheduler driver to actually select a target host.

Back in the conductor, we check whether the scheduling was successful. If not, the instance is moved into cell0. If yes, we determine the cell in which the selected host lives, update some status information and eventually, at the end of the method, invoke the method build_and_run_instance of the RPC client for the Nova compute service. At this point, we leave the Nova conductor service and the processing continues in the Nova compute service running on the selected host.

InstanceCreationPartII

Part III – the processing on the compute node

We have now reached the Nova compute agent running on the selected compute node, more precisely the method build_and_run_instance of the Nova compute manager. Here we spawn a separate worker thread which runs the private method _do_build_and_run_instance.

This method updates the VM state to BUILDING and calls _build_and_run_instance. Within this method, we first invoke _build_resources which triggers the creation of resources like networks and storage devices, and then move on to the spawn method of the compute driver from nova.virt. Note that this is again a pluggable driver mechanism – in fact the compute driver class is an abstract class, and needs to be implemented by each compute driver.

Now let us see how the processing works in our specific case of the libvirt driver library. First, we create an image for the VM by calling the private method _create_image. Next, we create the XML descriptor for the guest, i.e. we retrieve the required configuration data and turn it into the XML structure that libvirt expects. We then call _create_domain_and_network and finally set a timer to periodically check the state of the instance until the boot process is complete.

In _create_domain_and_network, we plug in the virtual network interfaces, set up the firewall (in our installation, this is the point where we use the No-OP firewall driver as firewall functionality is taken over by Neutron) and then call _create_domain which creates the actual guest (called a domain in libvirt).

This delegates the call to nova.virt.libvirt.Guest.create() and then powers on the guest using the launch method on the newly created guest. Let us take a short look at each of these methods in turn.

In nova.virt.libvirt.Guest.create(), we use the write_instance_config method of the host class to create the libvirt guest without starting it.

In the launch method in nova/virt/libvirt/guest.py, we now call createWithFlags on the domain. This is actually a call into the libvirt library itself and will launch the previously defined guest.

InstanceCreationPartIII

At this point, our newly created instance will start to boot. The timer which we have created earlier will check at periodic intervals whether the boot process is complete and update the status of the instance in the database accordingly.

This completes our short tour through the instance creation process. There are a few points which we have deliberately skipped, for instance the details of the scheduling process, the image creation and image caching on the compute nodes or the network configuration, but the information in this post might be a good starting point for further deep dives.

WSGI, middleware, PasteDeploy and all that

When you are a Python programmer or study open source software written in Python, you will sooner or later be exposed to the WSGI standard and to related concepts like WSGI middleware. In this post, I will give you a short overview of this technology and point you to some additional references.

What is WSGI?

WSGI stands for “Web Server Gateway Interface” and is a standard that defines how Python applications can run inside a web container (“server”), quite similar to Java servlets running in a servlet container. The WSGI standard is defined in PEP 333 (and, for Python3, in PEP 3333) and describes the interface between the application and the server.

In essence, the standard is quite simple. First, an application needs to provide a callable object (that can be a function, an instance of a class with a __call__ method or a method of a class or object) to the server which accepts two arguments. The first argument, traditionally called environ, is a dictionary that plays the role of a request context. The standard defines a set of fields in that object that a server needs to populate, including

Field Description
REQUEST_METHOD The HTTP request method (GET, POST, ...)
HTTP_* Variables corresponding to the various components of the HTTP request header
QUERY_STRING The part of the request string after the ?
wsgi.input A stream from which the request body can be read, using methods like read(), readline() or __iter__
wsgi.errors A stream to which the application can write error logs

The second argument that is passed to the application is actually a function, with the signature

start_response(status, response_headers)

This function returns a stream-like object implementing the write method, which the application can use to write the response body directly (this is, however, not the preferred way – in general, the application should simply return the response data). The argument status is an HTTP status code along with the respective string, like “200 OK”. The response_headers argument is a list of tuples of the form (name, value) which are added to the HTTP header of the response. The idea of this function is to give the server a chance to prepare the HTTP header of the response before the actual response body is written.

In fact, there is a third, optional argument to this function, which is exception information as returned by sys.exc_info; it can be used to ask the server to re-raise an exception caught by the application, and we will ignore it here.

The application function is supposed to return the response data, i.e. the data should go into the HTTP response body. Note that with Python3, this is supposed to be a bytes object, so text needs to be converted to bytes first.

Armed with this information, let us now write our first WSGI application. Of course, we need a WSGI server, and for our tests, we will use a very simple embedded WSGI server that comes as part of the wsgiref module. Here is the code.


from wsgiref.simple_server import make_server

def application(environ, start_response):
    start_response(
        '200 OK',
        [('Content-type', 'text/html')]
    )
    response = "<html><body><p><b>Environment data:</b></p>"
    response += "<table><tr><th>Key</th><th>Value</th></tr>"
    for key, value in environ.items():
        response += "<tr><td>%s</td><td>%s</td></tr>" % ( key, value)
    response = response + "</table></body></html>"
    return [bytes(response, 'utf-8')]

print("Starting up")
httpd = make_server('', 8800, application)
httpd.serve_forever()

Let us see what this application does. First, there is the application function with the signature defined by the standard. We see that we call start_response and then create a response string. The response string contains an HTML table with one entry for each key/value pair in the environ dictionary. Finally, we convert this to a bytes object and return it, wrapped in a list, to the server.

In the main processing, we create a wsgiref.simple_server that points to our application and start it.

To run the example, simply save the above code as wsgi.py (or whatever name you prefer) and run it with

python3 wsgi.py

When you now point your browser to 127.0.0.1:8800, you should see a table containing your environment values (the simple_server includes all currently defined OS level environment variables, so you will have to scroll down to see the WSGI specific parts).

Let us now try something else. Our application actually returns a sequence of bytes objects. The server is supposed to iterate over this sequence and assemble the results to obtain the entire response. Thus the only thing that matters is that our application is something that can be called and returns something that has a method __iter__. Instead of using a function which returns a sequence, we can therefore just as well use a class that has an __iter__ method, as in the example below.


from wsgiref.simple_server import make_server

class Application:

    def __init__(self, environ, start_response):
        self.environ = environ
        self.start_response = start_response

    def __iter__(self):
        self.start_response(
            '200 OK',
            [('Content-type', 'text/html')]
        )
        yield b'Hello!'

httpd = make_server('', 8800, Application)
httpd.serve_forever()

When the server receives a request, it will call the “thing called application”, i.e. it will do something like Application(). This will create a new instance of the application object, i.e. call the __init__ method, which simply stores the parameters for later use. Then, the server will iterate over this object, i.e. call __iter__, where the actual result is assembled and returned.

Finally, we could also pass an instance of a class instead of a class to make_server. This instance then needs a __call__ method so that it can be invoked like a function.
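
Here is a minimal sketch of this third variant.

from wsgiref.simple_server import make_server

class Application:
    def __call__(self, environ, start_response):
        start_response('200 OK', [('Content-type', 'text/plain')])
        return [b'Hello from a callable instance!']

httpd = make_server('', 8800, Application())   # note that we pass an instance, not the class
httpd.serve_forever()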

WSGI middleware

As we have seen, the WSGI specification has two parts. First, it defines how an application should behave (call start_response and return the response data), and second, how a server should behave (call the application), as displayed below.

WSGIInterface

A WSGI middleware is simply a piece of Python code that implements both behaviours – it can act as a server and as an application. This allows middleware components to be chained: the server calls the middleware, the middleware performs whatever action it wishes, for instance manipulating the environ dictionary, and then invokes the application, and the application prepares the actual response.

WSGIMiddleware

Of course, instead of just passing through the start_response function to the application, a middleware could also pass in a different function and then call the original start_response function itself.
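
A minimal sketch of this approach could look as follows – here the middleware wraps start_response to add a custom header (the header name is, of course, just an example) to every response.

from wsgiref.simple_server import make_server

class HeaderMiddleware:

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        # wrap start_response so that a header is added to every response
        def wrapped_start_response(status, response_headers, exc_info=None):
            response_headers.append(('X-Processed-By', 'HeaderMiddleware'))
            return start_response(status, response_headers, exc_info)
        return self.app(environ, wrapped_start_response)

def application(environ, start_response):
    start_response('200 OK', [('Content-type', 'text/plain')])
    return [b'Hello!']

httpd = make_server('', 8800, HeaderMiddleware(application))
httpd.serve_forever()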

A nice feature of middleware is that it can be chained. You could for instance have a middleware which performs authorization, followed by a middleware to rewrite URLs and so forth, until finally the application is invoked. Here is a simple example.


from wsgiref.simple_server import make_server

class Middleware:

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        environ['added_by_middleware'] = 1
        return self.app(environ, start_response)

def application(environ, start_response):
    start_response(
        '200 OK',
        [('Content-type', 'text/html')]
    )
    response = "<html><body><p><b>Environment data:</b></p>"
    response += "<table><tr><th>Key</th><th>Value</th></tr>"
    for key, value in environ.items():
        response += "<tr><td>%s</td><td>%s</td></tr>" % ( key, value)
    response = response + "</table></body></html>"
    return [bytes(response, 'utf-8')]

httpd = make_server('', 8800, Middleware(application))
httpd.serve_forever()

If you run this example as before, you will see that in addition to the environment variables produced by our first example, there is the additional key added_by_middleware which has been added by the middleware. In this example, the full call chain is as follows.

  • Before the server is started, we create an instance of the class Middleware that points to the function application
  • This instance is passed as argument to make_server
  • The server gets the request from the browser
  • The server makes a call on the “thing” supplied with make_server, i.e. the middleware instance, by invoking its __call__ function
  • The __call__ function adds the additional key to the environment and then delegates the request to the function application

Building middleware chains with PasteDeploy

So far, we have chained middleware programmatically, but in real life, it is often much more flexible to do this via a configuration. Enter PasteDeploy, a Python module that allows you to build chains of middleware components from a configuration. To make sure that you have this installed, run

pip3 install PasteDeploy

before proceeding.

PasteDeploy is able to parse configuration files and to dynamically pipe together WSGI applications and WSGI middleware. To understand how this works, let us first consider an example. Suppose that in our working directory, we have the following code, stored in a file wsgi.py


from paste.deploy import loadapp
from wsgiref.simple_server import make_server

#
# This is our application, as usual
#
def application(environ, start_response):
    start_response(
        '200 OK',
        [('Content-type', 'text/html')]
    )
    response = "<html><body><p><b>Environment data:</b></p>"
    response += "<table><tr><th>Key</th><th>Value</th></tr>"
    for key, value in environ.items():
        response += "<tr><td>%s</td><td>%s</td></tr>" % ( key, value)
    response = response + "</table></body></html>"
    return [bytes(response, 'utf-8')]

#
# This is the factory which is invoked by PasteDeploy, passing
# additional configuration data from the INI file
#
def app_factory(global_config, **local_conf):
    return application

#
# This call evaluates the INI file and builds an application
#
wsgi_app = loadapp('config:paste.ini', relative_to=".")
httpd = make_server('', 8800, wsgi_app)
httpd.serve_forever()

In addition, let us create a configuration file paste.ini in the same directory, with the following content.

[app:main]
use = call:wsgi:app_factory

When we now run wsgi.py, we again get the same server as in our first, basic example. But what is happening behind the scenes?

First, we invoke the PasteDeploy API by calling loadapp. This function will evaluate the INI file passed as argument, looking for the different types of objects that PasteDeploy knows. In our case, the section name app:main implies that we want to define an application and that this is the main entry point for our WSGI server. The argument that PasteDeploy expects here is the full path to a factory function (i.e. in our case, the function app_factory in wsgi.py). PasteDeploy will then simply call this factory and return the result of this call as an application. We then start a server using this application as before. Note that PasteDeploy can also pass configuration data from the INI file to the factory.

A second type of basic object in PasteDeploy is the filter. Filters are used to create filtered versions of an application, i.e. the application behind a defined middleware (the filter). In the configuration file, filters are specified in a section starting with the keyword filter, and refer to a filter factory. A filter factory is a callable which is called with the configuration in the INI file as argument and returns a filter. A filter, in turn, is a function which receives an application as an argument and returns a WSGI application wrapping this application. This sounds a bit confusing, so it might be a good idea to look at an example. Our new code looks as follows


from paste.deploy import loadapp
from wsgiref.simple_server import make_server

#
# A middleware that adds a key to the environment
#
class Middleware:

    def __init__(self, app, key="test", value=1):
        self._key = key
        self._value = value
        self._app = app

    def __call__(self, environ, status_response):
        environ[self._key] = self._value
        return self._app(environ, status_response)

#
# This is our application, as usual
#
def application(environ, start_response):
    start_response(
        '200 OK',
        [('Content-type', 'text/html')]
    )
    response = "<html><body><p><b>Environment data:</b></p>"
    response += "<table><tr><th>Key</th><th>Value</th></tr>"
    for key, value in environ.items():
        response += "<tr><td>%s</td><td>%s</td></tr>" % ( key, value)
    response = response + "</table></body></html>"
    return [bytes(response, 'utf-8')]

#
# This is the factory which is invoked by PasteDeploy, passing
# additional configuration data from the INI file
#
def app_factory(global_config, **local_conf):
    return application

#
# A filter factory. A filter factory returns a filter function
#
def filter_factory(global_conf, key):
    # A filter function returns a middleware, wrapping the
    # provided app
    def filter(app):
        return Middleware(app, key)
    return filter

#
# This call evaluates the INI file and builds an application
#
wsgi_app = loadapp('config:paste.ini', relative_to=".")
httpd = make_server('', 8800, wsgi_app)
httpd.serve_forever()

with the following configuration

[app:main]
use = call:wsgi:app_factory
filter-with = filter1 

[filter:filter1]
use = call:wsgi:filter_factory 
key = "abc"

What happens if you run the example? First, PasteDeploy will create an application as before, by calling the app_factory function. Then, it will find the configuration option filter-with that tells the library that we wish to wrap the application. Here, we refer to a filter called filter1 which is defined in the section of the INI file.

When evaluating this section, PasteDeploy will call the provided filter factory filter_factory, passing the additional configuration in the section as parameters. The filter factory returns a function, the filter function. PasteDeploy will now take the application and call the filter function with this application as argument. The return value of this call will then be used as the actual application that is returned by loadapp and started using the simple_server (in fact, PasteDeploy will first call the filter factory, then the app factory and then the filter itself).

Of course, you can apply more than one filter to an application. To make this as easy as possible, PasteDeploy offers a third type of object called pipelines. A pipeline is just a sequence of filters which are applied to an application. The nice thing about pipelines is that they are piped together by PasteDeploy automatically, without any need to write additional factory objects. So our source code remains the same, we only have to change the configuration.

[pipeline:main]
pipeline = filter1 filter2 myapp

[app:myapp]
use = call:wsgi:app_factory

[filter:filter1]
use = call:wsgi:filter_factory 
key = "abc"

[filter:filter2]
use = call:wsgi:filter_factory 
key = "def"

Here, we define a pipeline which will first apply filter1, then filter2 and then finally pass control to our app. These three objects are created by the same calls to factory functions as before, and PasteDeploy will automatically load the pipeline and plumb the objects together. The result will be that once the application is reached, both keys (abc and def) will be present in the request context.

This is now what we want. We can, of course, have filters in different Python modules, and thus completely decoupled. PasteDeploy will then happily plumb together the final WSGI application according to the configuration, and we can easily add middleware components to the pipelines and remove them, without having to change our code.

Finally, there is another approach to configure a pipeline which is also the one described in the documentation. Here, we realize a pipeline as a composite object. This object again corresponds to a factory function with a specific signature. Part of this signature is a loader object which we can use to load the individual filters by name and apply them step by step to the application. A nice example where this is implemented is the configuration of the OpenStack Nova compute service, with the factory being implemented here. And yes, it was an effort to understand this example which eventually made me carry out some research and write this blog post – expect to see a bit more on OpenStack soon on this blog!
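
To give you an idea of what such a composite factory could look like, here is a simplified sketch modeled after this pattern; the function name and the details are made up for illustration, only the factory signature and the loader methods get_app and get_filter follow PasteDeploy's conventions.

def pipeline_composite_factory(loader, global_conf, **local_conf):
    # the pipeline is configured in the INI section,
    # e.g. pipeline = filter1 filter2 myapp
    names = local_conf['pipeline'].split()
    # resolve the application (last entry) by its name in the INI file
    app = loader.get_app(names[-1])
    # apply the filters from right to left so that the first filter is outermost
    for name in reversed(names[:-1]):
        app = loader.get_filter(name)(app)
    return app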

Automating provisioning with Ansible – variables and facts

In the playbooks that we have considered so far, we have used tasks, links to the inventory and modules. In this post, we will add another important feature of Ansible to our toolbox – variables.

Declaring variables

Using variables in Ansible is slightly more complex than you might expect at first glance, mainly due to the fact that variables can be defined at many different points, and the precedence rules are a bit complicated, making errors likely. Ignoring some of the details, here are the essential options that you have to define variables and assign values:

  • You can assign variables in a playbook on the level of a play, which are then valid for all tasks and all hosts within that play
  • Similarly, variables can be defined on the level of an individual task in a playbook
  • You can define variables on the level of hosts or groups of hosts in the inventory file
  • There is a module called set_fact that allows you to define variables and assign values which are then scoped per host and for the remainder of the playbook execution
  • Variables can be defined on the command line when executing a playbook
  • Variables can be bound to a module so that the return values of that module are assigned to that variable within the scope of the respective host
  • Variable definitions can be moved into separate files and be referenced from within the playbook
  • Finally, Ansible will provide some variables and facts

Let us go through these various options using the following playbook as an example.

---
- hosts: all
  become: yes
  # We can define a variable on the level of a play, it is
  # then valid for all hosts to which the play applies
  vars:
    myVar1: "Hello"
  vars_files:
  - vars.yaml
  tasks:
    # We can also set a variable using the set_fact module
    # This will be valid for the respective host until completion
    # of the playbook
  - name: Set variable
    set_fact:
      myVar2: "World"
      myVar5: "{{ ansible_facts['machine_id'] }}"
    # We can register variables with tasks, so that the output of the
    # task will be captured in the variable
  - name: Register variables
    command: "date"
    register:
      myVar3
  - name: Print variables
    # We can also set variables on the task level
    vars:
      myVar4: 123
    debug:
      var: myVar1, myVar2, myVar3['stdout'], myVar4, myVar5, myVar6, myVar7

At the top of this playbook, we see an additional attribute vars on the level of the play. This attribute is a dictionary of key-value pairs that define variables which are valid across all tasks in the play. In our example, this is the variable myVar1.

The same syntax is used for the variable myVar4 on the task level. This variable is then only valid for that specific task.

Directly below the declaration of myVar1, we instruct Ansible to pick up variable definitions from an external file. This file is again in YAML syntax and can define arbitrary key-value pairs. In our example, this file could be as simple as

---
  myVar7: abcd

Separating variable definitions from the rest of the playbook is very useful if you deal with several environments. You could then move all environment-specific variables into separate files so that you can use the same playbook for all environments. You could even turn the name of the file holding the variables into a variable that is then set using a command line switch (see below), which allows you to use different sets of variables for each execution without having to change the playbook.

The variable myVar3 is registered with the module command, meaning that it will capture the output of this module. Note that this output will usually be a complex data structure, i.e. a dictionary. One of the keys in this dictionary, which depends on the module, is typically stdout and captures the output of the command.

For myVar2, we use the module set_fact to define it and assign a value to it. Note that this value will only be valid per host, as myVar5 demonstrates (here we use a fact and Jinja2 syntax – we will discuss this further below).

In the last task of the playbook, we print out the value of all variables using the debug module. If you look at this statement, you will see that we print out a variable – myVar6 – which is not defined anywhere in the playbook. This variable is in fact defined in the inventory. Recall that the inventory for our test setup with Vagrant introduced in the last post looked as follows.

[servers]
192.168.33.10
192.168.33.11

To define the variable myVar6, change this file as follows.

[servers]
192.168.33.10 myVar6=10
192.168.33.11 myVar6=11

Note that behind each host, we have added the variable name along with its value which is specific per host. If you now run this playbook with a command like

export ANSIBLE_HOST_KEY_CHECKING=False
ansible-playbook  \
        -u vagrant \
        --private-key ~/vagrant/vagrant_key \
        -i ~/vagrant/hosts.ini \
         definingVariables.yaml

then the last task will produce an output that contains a list of all variables along with their values. You will see that myVar6 has the value defined in the inventory, that myVar5 is in fact different for each host and that all other variables have the values defined in the playbook.

As mentioned before, it is also possible to define variables using an argument to the ansible-playbook executable. If, for instance, you use the following command to run the playbook

export ANSIBLE_HOST_KEY_CHECKING=False
ansible-playbook  \
        -u vagrant \
        --private-key ~/vagrant/vagrant_key \
        -i ~/vagrant/hosts.ini \
        -e myVar4=789 \
         definingVariables.yaml

then the output will change and the variable myVar4 has the value 789. This is an example of the precedence rule mentioned above – an assignment specified on the command line overwrites all other definitions.

Using facts and special variables

So far we have been using variables that we instantiate and to which we assign values. In addition to these custom variables, Ansible will create and populate a few variables for us.

First, for every machine in the inventory to which Ansible connects, it will create a complex data structure called ansible_facts which represents data that Ansible collects on the machine. To see an example, run the command (assuming again that you use the setup from my last post)

export ANSIBLE_HOST_KEY_CHECKING=False
ansible \
      -u vagrant \
      -i ~/vagrant/hosts.ini \
      --private-key ~/vagrant/vagrant_key  \
      -m setup all 

This will print a JSON representation of the facts that Ansible has gathered. We see that facts include information on the machine like the number of cores and hyperthreads per core, the available memory, the IP addresses, the devices, the machine ID (which we have used in our example above), environment variables and so forth. In addition, we find some information on the user that Ansible is using to connect, the used Python interpreter and the operating system installed.

It is also possible to add custom facts by placing a file with key-value pairs in a special directory on the host. Confusingly, this is called local facts, even though these facts are not defined on the control machine on which Ansible is running but on the host that is provisioned. Specifically, a file in /etc/ansible/facts.d ending with .fact can contain key-value pairs that are interpreted as facts and added to the dictionary ansible_local.

Suppose, for instance, that on one of your hosts, you have created a file called /etc/ansible/facts.d/myfacts.fact with the following content

[test]
testvar=1

If you then run the above command again to gather all facts, then, for that specific host, the output will contain a variable

"ansible_local": {
            "myfacts": {
                "test": {
                    "testvar": "1"
                }
            }

So we see that ansible_local is a dictionary, with the keys being the names of the files in which the facts are stored (without the extension). The value for each of the files is again a dictionary, where the key is the section in the facts file (the one in brackets) and the value is a dictionary with one entry for each of the variables defined in this section (you might want to consult the Wikipedia page on the INI file format).

In addition to facts, Ansible will populate some special variables like the inventory_hostname, or the groups that appear in the inventory file.

Using variables – Jinja2 templates

In the example above, we have used variables in the debug statement to print their value. This, of course, is not a typical usage. In general, you will want to expand a variable to use it at some other point in your playbook. To this end, Ansible uses Jinja2 templates.

Jinja2 is a very powerful templating language and Python library, and we will only be able to touch on some of its features in this post. Essentially, Jinja2 accepts a template and a set of Python variables and then renders the template, substituting special expressions according to the values of the variables. Jinja2 differentiates between expressions which are evaluated and replaced by the values of the variables they refer to, and tags which control the flow of the template processing, so that you can realize things like loops and if-then statements.

Let us start with a very simple and basic example. In the above playbook, we have hard-coded the name of our variable file vars.yaml. As mentioned above, it is sometimes useful to use different variable files, depending on the environment. To see how this can be done, change the start of our playbook as follows.

---
- hosts: all
  become: yes
  # We can define a variable on the level of a play, it is
  # then valid for all hosts to which the play applies
  vars:
    myVar1: "Hello"
  vars_files:
  - "{{ myfile }}"

When you now run the playbook again, the execution will fail and Ansible will complain about an undefined variable. To fix this, we need to define the variable myfile somewhere, say on the command line.

export ANSIBLE_HOST_KEY_CHECKING=False
ansible-playbook  \
        -u vagrant \
        --private-key ~/vagrant/vagrant_key \
        -i ~/vagrant/hosts.ini \
        -e myVar4=789 \
        -e myfile=vars.yaml \
         definingVariables.yaml

What happens is that before executing the playbook, Ansible will run the playbook through the Jinja2 templating engine. The expression {{myfile}} is the most basic example of a Jinja2 expression and evaluates to the value of the variable myfile. So the entire expression gets replaced by vars.yaml and Ansible will read the variables defined there.

Simple variable substitution is probably the most commonly used feature of Jinja2. But Jinja2 can do much more. As an example, let us modify our playbook so that we use a certain value for myVar7 in DEV and a different value in PROD. The beginning of our playbook now looks as follows (everything else is unchanged):

---
- hosts: all
  become: yes
  # We can define a variable on the level of a play, it is
  # then valid for all hosts to which the play applies
  vars:
    myVar1: "Hello"
    myVar7: "{% if myEnv == 'DEV' %} A {% else %} B {% endif %}"

Let us run this again. On the command line, we set the variable myEnv to DEV.

export ANSIBLE_HOST_KEY_CHECKING=False
ansible-playbook  \
        -u vagrant \
        --private-key ~/vagrant/vagrant_key \
        -i ~/vagrant/hosts.ini \
        -e myVar4=789 \
        -e myEnv=DEV \
         definingVariables.yaml

In the output, you will see that the value of the variable is ” A “, as expected. If you use a different value for myEnv, you get ” B “. The characters “{%” instruct Jinja2 to treat everything that follows (until “%}”) as a tag. Tags are comparable to statements in a programming language. Here, we use the if-then-else tag which evaluates to a value depending on a condition.

Jinja2 comes with many tags, and I advise you to browse the documentation of all available control structures. In addition to control structures, Jinja2 also uses filters that can be applied to variables and can be chained.
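
If you want to experiment with filters outside of Ansible, you can feed a template directly into the jinja2 library; the variable name in this short sketch is arbitrary.

import jinja2

# chain two filters: fall back to a default if the variable is undefined, then uppercase
template = jinja2.Template("{{ username | default('anonymous') | upper }}")
print(template.render())                    # prints ANONYMOUS, as username is undefined
print(template.render(username="john"))     # prints JOHN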

To see this in action, we turn to an example which demonstrates a second common use of Jinja2 templates with Ansible apart from using them in playbooks – the template module. This module is very similar to the copy module, but it takes a Jinja2 template on the control machine and does not only copy it to the remote machine, but also evaluates it.

Suppose, for instance, you wanted to dynamically create a web page on the remote machine that reflects some of the machine's characteristics, as captured by the Ansible facts. Then, you could use a template that refers to facts to produce some HTML output. I have created a playbook that demonstrates how this works – this playbook will install NGINX in a Docker container and dynamically create a webpage containing machine details. If you run this playbook with our Vagrant based setup and point your browser to http://192.168.33.10/, you will see a screen similar to the one below, displaying things like the number of CPU cores in the virtual machine or the network interfaces attached to it.

Screenshot from 2019-10-05 18-01-59

I will not go through this in detail, but I advise you to try out the playbook and take a look at the Jinja2 template that it uses. I have added a few comments which, along with the Jinja2 documentation, should give you a good idea how the evaluation works.

To close this post, let us see how we can test Jinja2 templates. Of course, you could simply run Ansible, but this is a bit slow and creates an overhead that you might want to avoid. As Jinja2 is a Python library, there is a much easier approach – you can simply create a small Python script that imports your template, runs the Jinja2 engine and prints the result. First, of course, you need to install the Jinja2 Python module.

pip3 install jinja2

Here is an example of how this might work. We import the template index.html.j2 that we also use for our dynamic webpage displayed above, define some test data, run the engine and print the result.

import jinja2
#
# Create a Jinja2 environment, using the file system loader
# to be able to load templates from the local file system
#
env = jinja2.Environment(
    loader=jinja2.FileSystemLoader('.')
)
#
# Load our template
#
template = env.get_template('index.html.j2')
#
# Prepare the input variables, as Ansible would do it (use ansible -m setup to see
# how this structure looks like, you can even copy the JSON output)
#
groups = {'all': ['127.0.0.1', '192.168.33.44']}
ansible_facts={
        "all_ipv4_addresses": [
            "10.0.2.15",
            "192.168.33.10"
        ],
        "env": {
            "HOME": "/home/vagrant",
        },
        "interfaces": [
            "enp0s8"
        ],
        "enp0s8": {
            "ipv4": {
                "address": "192.168.33.11",
                "broadcast": "192.168.33.255",
                "netmask": "255.255.255.0",
                "network": "192.168.33.0"
            },
            "macaddress": "08:00:27:77:1a:9c",
            "type": "ether"
        },
}
#
# Render the template and print the output
#
print(template.render(groups=groups, ansible_facts=ansible_facts))

An additional feature that Ansible offers on top of the standard Jinja2 templating language and that is sometimes useful are lookups. Lookups allow you to query data from external sources on the control machine (where they are evaluated), like environment variables, the content of a file, and many more. For example, the expression

"{{ lookup('env', 'HOME') }}"

in a playbook or a template will evaluate to the value of the environment variable HOME on the control machine. Lookups are enabled by plugins (the name of the plugin is the first argument to the lookup statement), and Ansible comes with a large number of pre-installed lookup plugins.

We have now discussed Ansible variables in some depth. You might want to read through the corresponding section of the Ansible documentation which contains some more details and links to additional information. In the next post, we will turn our attention back from playbooks to inventories and how to structure and manage them.

Database programming with Python

Most of us probably started to use Python as a scripting language to quickly create working code for e.g. numerical and scientific calculations. But, of course, Python is much more than that. If you intend to use Python for more traditional applications, you will sooner or later need to interface with a database. Today, we will see how the Python DB-API can be used for that purpose.

The Python DB-API

Of course there are many databases that can be used with Python – various modules exist like mysql-connector for MySQL, Psycopg for PostgreSQL or sqlite3 for the embedded database SQLite. Obviously, all these drivers differ slightly, but it turns out that they follow a common design pattern which is known as the Python DB-API and described in PEP-249.

If you know Java, you have probably heard of JDBC, which is a standard to access relational databases from Java applications. The Python DB-API is a bit different – it is not a library, but it is a set of design patterns that drivers are supposed to follow.

When using JDBC, there is, for instance, an interface class called java.sql.Connection. Whatever driver and whatever database you use, this interface will always be the same for compliant drivers. For Python, the situation is a bit different. If you use SQLite, then there will be a class sqlite3.Connection. If you use MySQL, there will be a class with the slightly cryptic name mysql.connector.connection_cext.CMySQLConnection. These are different classes, and they do not inherit from a common superclass or interface. Still, the Python DB-API dictates that these classes all have a common set of methods and attributes to make switching between different databases as easy as possible.

Before we start to look at an example, it is helpful to understand the basic objects of the DB-API class model. First, there is a connection, which, of course, represents a connection to a database. How to initially obtain a connection is specific for the database used, and we will see some examples further below.

When a connection is obtained, it is active or open. It can be closed by executing its close() method, which will render the connection unusable and roll back any uncommitted changes. A connection also has commit() and rollback() methods to support transaction control (however, the specification leaves the implementation some freedom with respect to features like auto-commit and isolation levels, and does not even mandate that implementations support transactions at all). The specification is also not fully clear on how to start transactions; it seems that most drivers automatically start a new transaction if a statement is executed and no transaction is in progress.
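
To make this more concrete, here is a minimal sketch of the typical pattern, using the sqlite3 driver and an in-memory database purely as an example (the same methods exist for any compliant driver; we already use a cursor here, which we will discuss in a moment).

import sqlite3 as dblib

conn = dblib.connect(":memory:")
try:
    cur = conn.cursor()
    cur.execute("CREATE TABLE accounts (name text, balance integer)")
    cur.execute("INSERT INTO accounts VALUES ('alice', 100)")
    conn.commit()          # make the changes permanent
except dblib.Error:
    conn.rollback()        # undo everything since the last commit
finally:
    conn.close()           # also invalidates all cursors created from this connection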

To actually execute statements and fetch results, a cursor is used. Again, it is implementation specific whether this is implemented as a real, server-side cursor or as a local cache within the client. Cursors are created by calling the connection's cursor method, and a cursor is only valid within the context of its connection (i.e. if a connection is closed, all cursors obtained from it will become unusable). Once we have a cursor, we can execute statements on it using either its execute method (which executes a statement with one set of parameters) or its executemany method, which executes a statement several times with different sets of parameters. How exactly parameters are referenced in a statement is implementation specific, and we will see some examples below.

The statement executed via a cursor can be an update or a select statement. In the latter case, we can access its results either via the fetchall() method which returns all rows of the result set as a sequence, or the fetchone() method which returns the next row. The API also defines a method fetchmany() which returns a specified number of rows at a time.

Using Python with SQLite

After this general introduction, let us now see how the API works in practice. For the sake of simplicity, we will first use the embedded database SQLite.

One of the nice things about SQLite3 is that the module that you will need to use it in a Python program – sqlite3 – is part of the Python standard library and therefore part of any standard Python installation. So there is no separate installation needed to use it, and we can start to play with it right away.

To use a database, the first thing we have to do is to establish a connection to it. As mentioned above, the process to do this is specific for the SQLite database. As SQLite identifies a database by the file where the data is stored, we basically only have to provide the file name to the driver to either establish a connection to an existing database or to create a new database (which will happen automatically if the file we refer to does not exist). Here is a code snippet that will establish a connection to a database stored in the file example.db in your home directory.

import sqlite3 as dblib
from pathlib import Path

home = str(Path.home())
db = home + "/example.db"
c = dblib.connect(db)

Now let us create a table. This sounds simple enough, just execute a CREATE TABLE statement. However, this will raise an error if the table already exists. Of course, we could use the IF NOT EXISTS clause to achieve this, but for the sake of demonstration let us choose a different approach.

We first try to drop the table. If the table does not exist, this will raise an exception which we can catch, if the table exists it will be removed. In any case, we can now proceed to create the table. To execute these statements, we need a cursor as explained above which we can get from the newly created connection. So our code looks as follows (of course, in productive code, you would put more effort into making sure that the reason for the exception is what you expect):

cursor = c.cursor()
try:
    cursor.execute('''DROP TABLE books''')
except dblib.Error as err:
    pass
cursor.execute('''CREATE TABLE books
             (author text, title text)''')

Next, we want to insert data into our database. Suppose you wanted to insert a book called “Moby Dick” written by Herman Melville. So we want to execute an SQL statement like

INSERT INTO books
  (author, title)
VALUES
  ('Herman Melville', 'Moby  Dick')
;

Of course, we could simply assemble this SQL statement in Python and pass it to the execute method of our cursor directly. But this is not the recommended way of doing things. Instead, one generally uses SQL host variables. These are variable parts of a statement which are declared inside the statement and, at runtime, are bound to Python variables. This is generally advisable, for two reasons. First, using the naive approach of manually assembling SQL statements and parameter values makes your program more vulnerable to SQL injection. Second, using host variables allows the driver to prepare the statement only once (i.e. to parse and tokenize the statement and prepare an execution plan) even if you insert multiple rows, thus increasing performance.

To use host variables with SQLite, you would execute a statement in which the values you want to insert are replaced by placeholders, using question marks.

INSERT INTO books
  (author, title)
VALUES
  (?, ?)
;

When you call the execute method of a cursor, you pass this SQL string along with the values for the placeholders, and the database driver will then replace the placeholders by their actual values, a process called binding.
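
With SQLite, such a call could look like this (a minimal sketch, reusing the cursor and the books table from above).

# Bind the two placeholders to actual Python values at execution time
cursor.execute("INSERT INTO books (author, title) VALUES (?, ?)",
               ("Herman Melville", "Moby Dick"))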

However, when coding this, there is a subtlety we need to observe. The special characters used for the placeholders are not specified by the DB-API Python standard and might therefore be driver specific. Fortunately, even though the standard does not define the placeholders, it defines a way to obtain them that is the same for all drivers.

Specifically, the standard specifies several placeholder styles, and the style that a given driver uses can be queried by accessing a module-level variable called paramstyle. In our example, we use only two of these styles, called pyformat and qmark (question mark). Pyformat, the style used by e.g. MySQL, uses Python-like placeholders like %s, whereas qmark uses – well, you might have guessed that – question marks, as SQLite does. So to keep our code reusable, we first retrieve the placeholder style, translate this into the actual placeholder (a dictionary comes in handy here) and then assemble our statement.

knownMarkers = {'pyformat' : '%s', 'qmark' : '?'}
marker =  knownMarkers[dblib.paramstyle]

examples = [('Dickens', 'David Copperfield'),
            ('Melville', 'Moby Dick')]

sqlString =  '''INSERT INTO books
                       (author, title)
                 VALUES ''' + "(" + marker + "," + marker + ")"
cursor.executemany(sqlString, examples)
c.commit()

Note that we explicitly commit at the end, as closing the connection would otherwise roll back our changes. We also use the executemany method, which performs several insertions, binding the host variables from a sequence of value tuples.
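
As an aside, the sqlite3 module also allows us to use the connection itself as a context manager (this is a feature of sqlite3, not something the DB-API mandates). The transaction is committed if the block completes and rolled back if an exception is raised; the connection itself stays open. Here is a minimal sketch.

# Commit on success, roll back if an exception escapes the block
# (note that the connection is not closed by this)
with c:
    cursor.execute("INSERT INTO books (author, title) VALUES (?, ?)",
                   ("Tolstoy", "War and Peace"))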

Reading from our database is now very simple. Again, we first need to get a connection and a cursor. Once we have that, we assemble a SELECT statement and execute it. We then use the fetchone or fetchall method to retrieve the result set and can iterate through it as usual.

# Get all rows from table books
statement = "SELECT author, title from books;"
cursor.execute(statement)
rows = cursor.fetchall()

# Iterate through result set
for row in rows:
    print ("Author: " + row[0])
    print ("Title:  " + row[1])

As we can see, the order of the columns in the individual tuples within the result set is as in our SELECT statement. However, I have not found an explicit guarantee for this behaviour in the specification. If we want to make sure that we get the columns right, we can use the cursor.description attribute which the standard mandates and which contains a sequence of tuples, wherein each tuple represents a column in the result and contains a set of attributes like name and type.
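
Here is a short sketch of how the description attribute could be used to access columns by name instead of by position, assuming the cursor and the table from above.

# Re-run the query and build a mapping from column name to value for each
# row, using the first element of every tuple in cursor.description
cursor.execute("SELECT author, title from books;")
columns = [col[0] for col in cursor.description]
for row in cursor.fetchall():
    record = dict(zip(columns, row))
    print("Author: " + record["author"])
    print("Title:  " + record["title"])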

We have now seen how we can use the DB-API to create database tables, to insert rows and to retrieve results. I have assembled the code that we have discussed in two scripts, one to prepare a database and one to read from it, which you can find here.

Using Python with MySQL

To illustrate the usage of the DB-API for different databases, let us now try to do the same thing with MySQL. Of course, there is some more setup involved in this case. First, the driver that we will use – the mysql-connector – is not part of the standard Python library and needs to be installed separately. Of course, this is done using

pip3 install mysql-connector-python

We will also need the MySQL command line client mysql and the admin client mysqladmin. On an Ubuntu system, you can install them using

sudo apt-get install mysql-client

As we intend to run the MySQL server in a docker container, you also need a Docker engine installed on your system. If you do not have that yet, the easiest way might be to install it using snap.

snap install docker

Next, we need to bring up an instance of the MySQL database using docker run, give it some time to come up, create a database called books and grant access rights to a new user that we will use to access our database from within Python. Here is a short script that performs all of these steps and that you can also find here.

# Start docker container
docker run -d --name some-mysql \
           --rm \
           -p 3306:3306 \
           -e MYSQL_ROOT_PASSWORD=my-secret-root-pw \
           -e MYSQL_USER=chr \
           -e MYSQL_PASSWORD=my-secret-pw \
            mysql 

# Give the database some time to come up
mysqladmin --user=root --password=my-secret-root-pw \
           --host='127.0.0.1'  \
            --silent status

while [ $? == 1  ]
do
  sleep 5
  mysqladmin --user=root --password=my-secret-root-pw \
             --host='127.0.0.1'  \
             --silent status
done 

# Create database books and grant rights to user chr
mysqladmin --user=root \
           --password=my-secret-root-pw \
            --host='127.0.0.1'  create books
echo 'grant all on books.* to 'chr';' \
              | mysql --user=root \
                      --password=my-secret-root-pw \
                      --host='127.0.0.1' books

BE CAREFUL: if you have read my post on Docker networking, you will know that using the -p switch implies that we can reach the database from every machine in the local network – so if you are not in a fully trusted environment, you definitely want to change this script and the program that will follow to use real passwords, not the ones used in this post.

Let us now see how we can access our newly created database from Python. Thanks to the DB-API, most of the code that we will have to use is actually the same as in the case of SQLite. Basically, there are two differences. First, the module that we have to import is of course different, but we can again use the alias dblib to be able to use the remainder of the code without changes.

import mysql.connector as dblib

The second change that we need to make is the way in which we obtain the initial database connection, as connecting to a MySQL database requires additional parameters like credentials. Here is the respective statement.

c = dblib.connect(user='chr', password='my-secret-pw',
                              host='127.0.0.1',
                              database='books')

All the remainder of the code can now be taken over from the SQLite case unchanged. On my GitHub page, I have created a separate directory holding the code, and if you compare this code to the code that we used earlier for SQLite, you will see that it is in fact only those two parts of the code that are different.
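
In particular, the little dictionary of placeholder markers introduced earlier now pays off: mysql-connector reports the pyformat placeholder style, so the very same code assembles the INSERT statement with %s placeholders instead of question marks. A quick check (a minimal sketch):

# mysql-connector uses the pyformat style, so the lookup yields '%s'
print(dblib.paramstyle)                # pyformat
print(knownMarkers[dblib.paramstyle])  # %s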

Of course, there is much more that could be said about database programming with Python. In reality, each database behaves differently, and you will have to deal with specifics like auto-commit, generated fields, different types, performance, result set sizes and so forth. However, the basics are the same thanks to the DB-API, and I hope that I could give you a good starting point for further investigations.

Managing traffic with Kubernetes ingress controllers

In one of the previous posts, we have learned how to expose arbitrary ports to the outside world using services and load balancers. However, we also found that this is not very efficient – in the worst case, the number of load balancers we need equals the number of services.

Specifically for HTTP/HTTPS traffic, there is a different option available – an ingress rule.

Ingress rules and ingress controllers

An ingress rule is a Kubernetes resource that defines a kind of routing on the HTTP(S) path level. To illustrate this, assume that you have two services running, call them svcA and svcB. Assume further that both services work with HTTP as the underlying protocol and are listening on port 80. If you expose these services naively as in the previous post, you will need two load balancers, call them LB1 and LB2. Then, to reach svcA, you would use the URL

http://LB1/

and to access svcB, you would use

http://LB2/

The idea of an ingress is to have only one load balancer, with one DNS entry, and to use the path name to route traffic to our services. So with an ingress, we would be able to reach svcA under the URL

http://LB/svcA

and the second service under the URL

http://LB/svcB

This approach has several advantages. First, you only need one load balancer that clients can use for all services. Second, the path name is related to the service that you invoke, which follows best practices and makes coding against your services much easier. Finally, you can easily add new services and remove old ones without any need to change DNS names.

In Kubernetes, two components are required to make this work. First, there are ingress rules. These are Kubernetes resources that contain rules which specify how to route incoming requests based on their path (or even hostname). Second, there are ingress controllers. These are controllers which are not part of the Kubernetes core software, but need to be installed on top. These controllers will work with the ingress rules to manage the actual routing. So before trying this out, we need to install an ingress controller in our cluster.

Installing an ingress controller

On EKS, there are several options for an ingress controller. One possible choice is the nginx ingress controller. This is the controller that we will use for our examples. In addition, AWS has created their own controller called AWS ALB controller that you could use as an alternative – I have not yet looked at this in detail, though.

So let us see how we can install the nginx ingress controller in our cluster. Fortunately, there is a very clear installation instruction which tells us that to run the install, we simply have to execute a number of manifest files, as you would expect for a Kubernetes application. If you have cloned my repository and brought up your cluster using the up.sh script, you are done – this script will set up the controller automatically. If not, here are the commands to do this manually.

$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/mandatory.yaml
$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/provider/aws/service-l4.yaml
$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/provider/aws/patch-configmap-l4.yaml

Let us go through this and see what is going on. The first file (mandatory.yaml) will set up several config maps, a service account and a new role and connect the service account with the newly created role. It then starts a deployment to bring up one instance of the nginx controller itself. You can see this pod running if you do a

kubectl get pods --namespace ingress-nginx

The first AWS specific manifest file service-l4.yaml will establish a Kubernetes service of type LoadBalancer. This will create an elastic load balancer in your AWS VPC. Traffic on the ports 80 and 443 is directed to the nginx controller. You can display this service using

kubectl get svc --namespace ingress-nginx

Finally, the second AWS specific file will update the config map that stores the nginx configuration and set the parameter use-proxy-protocol to True.

To verify that the installation has worked, you can use aws elb describe-load-balancers to verify that a new load balancer has been created and curl the DNS name provided by

kubectl get svc ingress-nginx --namespace ingress-nginx

from your local machine. This will still give you an error, as we have not yet defined any ingress rules, but it shows that the ingress controller is up and running.

Setting up and testing an ingress rule

Having our ingress controller in place, we can now set up ingress rules. To have a toy example at hand, let us first apply a manifest file that will

  • Add a deployment of two instances of the Apache httpd
  • Install a service httpd-service accepting traffic for these two pods on port 8080
  • Add a deployment of two instances of Tomcat
  • Create a service tomcat-service listening on port 8080 and directing traffic to these two pods

You can either download the file here or directly use the URL with kubectl

$ kubectl apply -f https://raw.githubusercontent.com/christianb93/Kubernetes/master/network/ingress-prep.yaml
deployment.apps/httpd created
deployment.apps/tomcat created
service/httpd-service created
service/tomcat-service created

When all pods are up and running, you can again spawn a shell in one of the pods and use the DNS name entries created by Kubernetes to verify that the services are available from within the pods.

$ pod=$(kubectl get pods --output \
  jsonpath="{.items[0].metadata.name}")
$ kubectl exec -it $pod "/bin/bash"
bash-4.4# apk add curl
bash-4.4# curl tomcat-service:8080
bash-4.4# curl httpd-service:8080

Let us now define an ingress rule which directs requests to the path /httpd to our httpd service and correspondingly requests to /tomcat to the Tomcat service. Here is the manifest file for this rule.

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: test-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target : "/"
spec:
  rules:
  - http:
      paths:
      - path: /tomcat
        backend:
          serviceName: tomcat-service
          servicePort: 8080
      - path: /httpd
        backend:
          serviceName: httpd-service
          servicePort: 8080

The first few lines are familiar by now, specifying the API version, the type of resource that we want to create and a name. In the metadata section, we also add an annotation. The nginx ingress controller can be configured using this and similar annotations (see here for a list), and this annotation is required to make the example work.

In the specification section, we now define a set of rules. Our rule is an http rule, which, at the time of writing, is the only supported protocol. This is followed by a list of paths. Each path consists of a selector (“/httpd” and “/tomcat” in our case), followed by the specification of a backend, i.e. a combination of service name and service port, serving requests matching this path.

Next, let us set up the ingress rule. Assuming that you have saved the manifest file above as ingress.yaml, simply run

$ kubectl apply -f ingress.yaml
ingress.extensions/test-ingress created

Now let us try this out. We already know that the ingress controller has created a load balancer for us which serves all ingress rules. So let us get the DNS name of this load balancer from the service specification and then use curl to try out both paths.

$ host=$(kubectl get svc ingress-nginx -n ingress-nginx --output\
  jsonpath="{.status.loadBalancer.ingress[0].hostname}")
$ curl -k https://$host/httpd
$ curl -k https://$host/tomcat

The first curl should give you the standard output of the httpd service, the second one the standard Tomcat welcome page. So our ingress rule works.

Let us try to understand what is happening. The load balancer is listening on the HTTPS port 443 and picking up the traffic coming from there. This traffic is then redirected to a node port that you can read off from the output of aws elb describe-load-balancers; in my case, this was 31135. This node port belongs to the service ingress-nginx that our ingress controller has set up. So the traffic is forwarded to the ingress controller. The ingress controller interprets the rules, determines the target service and forwards the traffic to the target service. Ignoring the fact that the traffic goes through the node port, this gives us the following simplified picture.

Ingress

In fact, this diagram is a bit simplified as (see here) the controller does not actually send the traffic to the service cluster IP, but directly to the endpoints, thus bypassing the kube-proxy mechanism, so that advanced features like session affinity can be applied.

Ingress rules have many additional options that we have not yet discussed. You can define virtual hosts, i.e. routing based on host names, define a default backend for requests that do not match any of the path selectors, use regular expressions in your paths and use TLS secrets to secure your HTTPS entry points. This is described in the Kubernetes networking documentation and the API reference.

Creating ingress rules in Python

To close this post, let us again study how to implement Ingress rules in Python. Again, it is easiest to build up the objects from the bottom to the top. So we start with our backends and the corresponding path objects.

tomcat_backend=client.V1beta1IngressBackend(
          service_name="tomcat-service",
          service_port=8080)
httpd_backend=client.V1beta1IngressBackend(
          service_name="httpd-service",
          service_port=8080)
tomcat_path=client.V1beta1HTTPIngressPath(
          path="/tomcat",
          backend=tomcat_backend)
httpd_path=client.V1beta1HTTPIngressPath(
          path="/httpd",
          backend=httpd_backend)

Having this, we can now define our rule and our specification section.

rule=client.V1beta1IngressRule(
     http=client.V1beta1HTTPIngressRuleValue(
            paths=[tomcat_path, httpd_path]))
spec=client.V1beta1IngressSpec()
spec.rules=[rule]

Finally, we assemble our ingress object. Again, this consists of the metadata (including the annotation) and the specification section.

ingress=client.V1beta1Ingress()
ingress.api_version="extensions/v1beta1"
ingress.kind="Ingress"
metadata=client.V1ObjectMeta()
metadata.name="test-ingress"
metadata.annotations={"nginx.ingress.kubernetes.io/rewrite-target" : "/"}
ingress.metadata=metadata
ingress.spec=spec

We are now ready for our final steps. We again read the configuration, create an API endpoint and submit our creation request. You can find the full script, including comments and all imports, here.

config.load_kube_config()
apiv1beta1=client.ExtensionsV1beta1Api()
apiv1beta1.create_namespaced_ingress(
              namespace="default",
              body=ingress)
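
To verify that the ingress has actually been created, we could of course use kubectl get ingress, but we can also stay within Python and list the ingress resources in the default namespace using the same API object. Here is a minimal sketch, assuming that the script above has been run.

# List all ingress resources in the default namespace and print their names
ingresses = apiv1beta1.list_namespaced_ingress(namespace="default")
for ing in ingresses.items:
    print(ing.metadata.name)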