Learning Kafka with Python – implementing a database sink

Very often, either the source or the target of a Kafka based message queue is a classical relational database. Consuming data and using it to update a database table sounds straightforward, but poses a few challenges around reliability and delivery semantics. In this post, we look into two options to realize such an architecture.

The challenge

To illustrate the problem we are aiming to solve, let us suppose that we want to build an application that maintains an account balance. There is a front-end which acts as a Kafka producer and which a customer can use to either deposit money in the account or withdraw money. These transactions are then written into a Kafka topic, and a consumer reads from this topic and updates the balance kept in a relational datastore.


Thus the messages stored in the Kafka topic contain transactions, i.e. changes in the account balance, while the database table we need to maintain contains the actual balance. This is an example of what is called stream / table duality in the world of streaming – the event stream is the source of truth and reflects changes, while the database contains the resulting state of the world after all changes have been applied and can at any time be reconstructed from the stream.
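To make the duality a bit more tangible, here is a minimal sketch of how the table can be rebuilt from the stream. The record layout (a dictionary with the hypothetical fields account and amount) is an assumption made for this illustration and is not necessarily the format used in the repository.

```python
from collections import defaultdict


def replay(transactions):
    """Rebuild the balance table by applying every change in stream order."""
    balances = defaultdict(int)
    for record in transactions:
        balances[record["account"]] += record["amount"]
    return dict(balances)


# The stream holds changes: deposits are positive, withdrawals negative
stream = [
    {"account": 1, "amount": 100},
    {"account": 2, "amount": 50},
    {"account": 1, "amount": -30},
]
table = replay(stream)
print(table)
```

Replaying the same stream always yields the same table, which is why the stream can serve as the source of truth.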

When we want to implement this pattern, the crucial part of our design will be to make sure that every message in the queue leads to exactly one update of the balance, so that no transaction is missed and no transaction is processed twice.

Pattern 1: using a message ID and de-duplication

The first pattern we could use to make this work is to achieve de-duplication based on a unique message ID, which ideally is an integer that is increased with every message. The consumer could then store the sequence number of the last processed message in the database, and could thus detect duplicates.

In a bit more detail, this would work as follows. When the producer processes an action, it first retrieves a unique message ID. This message ID could be created using a database sequence, or – if the database in use does not support sequences – the producer could read a sequence number from a table, increment it by one and update the table accordingly. It would then add this sequence number as a key to the Kafka message.

The consumer would store the latest processed sequence number in the database. When it reads a message from Kafka, it uses this number to check whether the message has already been processed before. If not, it updates the balance and the sequence number in one transaction and then commits the new offset to Kafka.


Let us see how duplicates are handled in this pattern. If, for some reason, the topic contains two messages with the same ID, the consumer will process the first message, increase the latest processed sequence number in the database and commit the new balance. It will then read the duplicate, compare its sequence number against the latest value, detect the duplicate and simply ignore it. Thus the consumer is able to do a de-duplication based on the sequence number.

Unfortunately, this simple pattern has one major disadvantage – it does not work. The problem is that the order of messages is not guaranteed across partitions. Suppose, for instance, that the producer creates the following messages:

  • Message 1, partition 0
  • Message 2, partition 1
  • Message 3, partition 0

Now it might happen that the consumer processes the messages in the order 1, 3, 2. Thus the consumer would, after having processed message 3, set the highest consumed sequence number to “3” in the database. When it then processes message 2, our simple duplicate detection algorithm would wrongly classify this message as a duplicate. Thus, to make this work, it is vital that we store the highest processed sequence number by partition and not across all partitions. We could even create the sequence number per partition, which would also remove a possible bottleneck, as creating a global sequence number would otherwise effectively serialize the producers.

Also note that, as we need to maintain a “last processed” sequence number per partition in a database, we also need to maintain this table if we add new partitions to our topic or remove partitions.
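The per-partition variant of the duplicate check can be sketched as follows. This is not the code from the repository: it uses an in-memory SQLite database instead of MySQL so that it runs without any setup, and the table and column names (balance, last_seq, part, seq) are made up for the example.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE balance (account INTEGER PRIMARY KEY, amount INTEGER)")
db.execute("CREATE TABLE last_seq (part INTEGER PRIMARY KEY, seq INTEGER)")
db.execute("INSERT INTO balance VALUES (1, 0)")
db.commit()


def process(db, partition, seq, account, amount):
    """Apply one record; return False if it was recognized as a duplicate."""
    row = db.execute(
        "SELECT seq FROM last_seq WHERE part = ?", (partition,)
    ).fetchone()
    if row is not None and seq <= row[0]:
        return False
    with db:  # one transaction: both updates are committed, or neither
        db.execute(
            "UPDATE balance SET amount = amount + ? WHERE account = ?",
            (amount, account),
        )
        db.execute(
            "INSERT OR REPLACE INTO last_seq (part, seq) VALUES (?, ?)",
            (partition, seq),
        )
    return True


# (partition, sequence number): messages 1, 3, 2 as in the example above,
# followed by a redelivery of message 3
records = [(0, 1), (0, 3), (1, 2), (0, 3)]
results = [process(db, part, seq, account=1, amount=10) for part, seq in records]
balance = db.execute("SELECT amount FROM balance WHERE account = 1").fetchone()[0]
print(results, balance)
```

Because the last processed sequence number is tracked per partition, message 2 is accepted even though it arrives after message 3, while the redelivered message 3 is skipped.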

Alternatively, if there is a message ID which is not ordered and increasing with each message, the consumer could store all processed message IDs in a separate database table to keep track of the messages that have already been processed (which, depending on the throughput, might require some sort of periodic cleanup to keep the table from growing too large).
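A sketch of this alternative, again using an in-memory SQLite database and hypothetical table names, could rely on a primary key constraint to reject IDs that have been seen before. This is one possible way to do it, not necessarily the most efficient one.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE balance (account INTEGER PRIMARY KEY, amount INTEGER)")
db.execute("CREATE TABLE processed (message_id TEXT PRIMARY KEY)")
db.execute("INSERT INTO balance VALUES (1, 0)")
db.commit()


def process(db, message_id, account, amount):
    """Apply one record unless its ID has already been stored."""
    try:
        with db:  # rolled back as a whole if the ID already exists
            db.execute(
                "INSERT INTO processed (message_id) VALUES (?)", (message_id,)
            )
            db.execute(
                "UPDATE balance SET amount = amount + ? WHERE account = ?",
                (amount, account),
            )
    except sqlite3.IntegrityError:
        return False
    return True


# Unordered IDs; the third record is a duplicate of the first
results = [process(db, mid, 1, 10) for mid in ("a1", "b7", "a1")]
balance = db.execute("SELECT amount FROM balance WHERE account = 1").fetchone()[0]
print(results, balance)
```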

If the consumer fails after committing the changed balance to the database, but before committing the offset to Kafka, the same mechanism will kick in, as long as we commit the new balance and the updated value of the last processed message in one database transaction. Thus, duplicates can be detected, and we can therefore rely on the standard mechanisms Kafka offers to manage the offset – we could even read entire batches from the topic and use auto-commit to let Kafka manage the offset.

Let us now try out this pattern with the code samples from my repository. To be able to run this, you will have to clone my GitHub repository and follow the instructions in my initial post in this series to bring up your local Kafka cluster. Then, make sure that your current working directory is the root directory of the repository and run the following commands which will install the Python package to access a MySQL database and bring up a Docker based installation of MySQL with a prepared database.

pip3 install mysql-connector-python

This script will start a Docker container kafka-mysql running MySQL, add a user kafka with a default password to it and create a database kafka for which this user has all privileges. Next, let us run a second script which will (re)-create a Kafka topic transactions with two partitions and initialize the database.


Now we run three Python scripts. The first script is the producer that we have already described, which will simply create ten records, each of which describes a transaction. The second script is our consumer. It will

  • subscribe to the transactions topic
  • read batches of records from the topic
  • for each record, update the account balance and the last processed sequence number (in one transaction!)
  • commit offsets in Kafka after processing a batch
  • apply the duplicate detection mechanism outlined above while processing each record

Finally, the third script is a little helper that will (without committing any offsets, so that we can run it over and over again) scan the topic, calculate the expected account balances, retrieve actual account balances from the database and check whether they coincide.

python3 db/producer1.py
python3 db/consumer1.py
python3 db/dump1.py --check

Let us now try to understand how our scripts work if an error occurs. For that purpose, the consumer has a built-in mechanism to simulate random errors between committing to the database and committing offsets to Kafka, which is activated using the parameter --error_probability. Let us repeat our test run, but this time we simulate an error with a probability of 20%.

./db/reset.sh && python3 db/producer1.py
python3 db/consumer1.py --error_probability=0.2 --verbose
python3 db/dump1.py --check

You should now see that the consumer processes a couple of messages before our simulated error kicks in, which will make the consumer stop. The check script should detect the difference resulting from the fact that not all messages have been processed. However, when we now restart the script without simulating any errors, we should see that even though there are duplicate records, these records are properly detected and eventually, all records will be processed and the balances will again be correct.

python3 db/consumer1.py --verbose
python3 db/dump1.py --check

Pattern 2: store the consumer offset in the target database

Let us now take a look at an alternative implementation (which is in fact the pattern which you will hit upon first when consulting the Kafka documentation or other sources) – maintaining the offsets in the database altogether. This implementation does not require a sequence number generated by the producer, but is therefore also not able to detect any duplicate messages in case the duplicates originate already in the producer.

The idea behind this pattern is simple. Instead of asking Kafka to maintain offsets for us, the consumer application handles offsets independently and maintains a database table containing offsets and partitions. When a record is processed, the consumer opens a database transaction, updates the account balance and updates the offset in the same transaction. This guarantees that offsets and balances are always in sync.
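The core of this idea can be sketched in a few lines. As before, an in-memory SQLite database stands in for MySQL, and the table layout is an assumption made for the example. The sketch stores the offset of the next record to read (i.e. the processed offset plus one), which is one possible convention.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE balance (account INTEGER PRIMARY KEY, amount INTEGER)")
db.execute(
    "CREATE TABLE offsets (topic TEXT, part INTEGER, next_offset INTEGER, "
    "PRIMARY KEY (topic, part))"
)
db.execute("INSERT INTO balance VALUES (1, 0)")
db.commit()


def apply_record(db, topic, partition, offset, account, amount):
    """Update balance and offset together, so they can never get out of sync."""
    with db:
        db.execute(
            "UPDATE balance SET amount = amount + ? WHERE account = ?",
            (amount, account),
        )
        db.execute(
            "INSERT OR REPLACE INTO offsets (topic, part, next_offset) "
            "VALUES (?, ?, ?)",
            (topic, partition, offset + 1),
        )


def next_offset(db, topic, partition):
    """Return the position at which a consumer should resume reading."""
    row = db.execute(
        "SELECT next_offset FROM offsets WHERE topic = ? AND part = ?",
        (topic, partition),
    ).fetchone()
    return row[0] if row else 0


apply_record(db, "transactions", 0, offset=0, account=1, amount=10)
apply_record(db, "transactions", 0, offset=1, account=1, amount=5)
resume_at = next_offset(db, "transactions", 0)
print(resume_at)
```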

This sounds simple enough, but there are a few subtleties that need to be kept in mind when this is combined with consumer groups, i.e. if Kafka handles partition assignments dynamically. Whenever a partition is assigned to our consumer, we need to make sure that we position the consumer at the latest committed offset before processing any records. Conversely, when an assignment is revoked, we need to commit the current offsets to make sure that they are not lost. This can be implemented using a rebalance listener which is registered when we subscribe to the topic.
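The following sketch shows the shape such a listener could take. To keep it runnable without a broker, it does not import kafka-python: the listener is a plain class with the two methods that a subclass of kafka.ConsumerRebalanceListener would implement, FakeConsumer is a made-up stand-in that mimics the seek and position methods of a KafkaConsumer, and a dictionary plays the role of the offsets table.

```python
class DatabaseOffsetListener:
    """Duck-typed stand-in for a kafka.ConsumerRebalanceListener subclass."""

    def __init__(self, consumer, offset_store):
        self.consumer = consumer
        self.offset_store = offset_store  # (topic, partition) -> next offset

    def on_partitions_assigned(self, assigned):
        # Position the consumer at the stored offset before reading anything
        for tp in assigned:
            self.consumer.seek(tp, self.offset_store.get(tp, 0))

    def on_partitions_revoked(self, revoked):
        # Save the current position so that the next owner can pick it up
        for tp in revoked:
            self.offset_store[tp] = self.consumer.position(tp)


class FakeConsumer:
    """Hypothetical stub that only tracks positions per partition."""

    def __init__(self):
        self.positions = {}

    def seek(self, tp, offset):
        self.positions[tp] = offset

    def position(self, tp):
        return self.positions[tp]


store = {("transactions", 0): 7}
consumer = FakeConsumer()
listener = DatabaseOffsetListener(consumer, store)

listener.on_partitions_assigned([("transactions", 0), ("transactions", 1)])
consumer.positions[("transactions", 1)] = 3  # pretend three records were read
listener.on_partitions_revoked([("transactions", 1)])
print(consumer.positions, store)
```

With kafka-python, a listener of this kind would be passed to the subscribe method of the consumer via its listener parameter.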


To try this out, run the following commands which will first reset the database and the involved topic, run the producer to create a few messages (this time without the additional sequence number) and then run our new consumer to read the messages and update the database.

./db/reset.sh && python3 db/producer2.py
python3 db/consumer2.py 
python3 db/dump2.py --check

The last command, which again checks the updated database table against the expected values, should again show you that expected and actual values in the database match.

It is instructive to try a few more advanced scenarios. You could, for instance, run the producer and then start a consumer which reads the first set of messages produced by the producer (use the switch --runtime=3600 to make this consumer run for one hour). Then, start a second consumer in a separate terminal window and observe the rebalancing that occurs. Finally, run the producer again and verify that the partition assignment worked and both consumers are processing the messages in their respective partition. And again, you can simulate errors along the way and see how the consumer behaves.

Learning Kafka with Python – producing data

As proud owners of a brand new Kafka installation, we are now ready to explore how applications interact with Kafka. Today, we will look at producers and understand how they write data to Kafka.

Basic design considerations

At first glance, writing data to Kafka sounds easy – connect to a Kafka broker and submit a message. However, there are some basic design considerations that are relevant when building a Kafka producer.

First, we have seen that Kafka stores the data in a topic in multiple partitions, and then each partition has a leader which is responsible for writing messages into the partition. Thus a producer needs to determine to which partition a message should be written and contact the responsible leader for this partition.

Defining the mapping of messages to partitions can be crucial for reliability and scalability of your application. Partitions determine how the application can scale horizontally, and we will learn later that partitions also determine to which extent consumers can scale. In addition, Kafka guarantees message order only within a partition. Specifically, if message A is written to a partition before message B is written to the same partition, message A will receive a lower offset than B and will be read first by a (well behaving) consumer. This is no longer true if messages A and B are written to different partitions. Think of partitions as lanes on a highway – there is no guarantee that two cars entering the highway in a certain order but in different lanes will arrive at the destination in the same order.

Often, you will want to use a business entity to partition your data. If you are building a customer-facing application, you might want to partition your data by customer group; if you are building a securities processing application, the financial instrument might be a good partition criterion; if you are maintaining accounts, the account number might be a good choice; and so forth. In other cases, where ordering is not important, you might go for a purely technical criterion.

The next fundamental question we have to figure out is when a message is considered to be successfully written. When the broker has received it? Or when the leader has written the message? Or should we wait until all followers have successfully stored the message? And what if a follower lags behind – should we stop writing messages until the follower has recovered or move on, accepting that we have lost one follower without knowing whether it will recover at a later time?

Kafka does not give a definitive answer to all these questions, but leaves you a choice – put differently, when you create a producer, you can specify its behavior using a variety of options. So let us now see how this is done in Python.

The producer object

Let us now see how a producer can be created using Python. If you have not yet done so, please install the Kafka Python library to be able to run the examples.

pip3 install kafka-python

This series uses version 2.0.1 of the library. If you want to use exactly that version, you need to specify it as usual, i.e. run

pip3 install kafka-python==2.0.1

To send messages to Kafka, the first thing we need to do is to create a producer object, i.e. an instance of the class kafka.KafkaProducer. The init-method of this class accepts a large number of arguments, but in the most straightforward case, there is exactly one argument, bootstrap_servers. This argument is a list of listener addresses in the form host:port, which the producer will use to make an initial connection to a Kafka broker. This list does not need to contain all brokers. In fact, one entry will do, as the producer will use this broker to discover the other brokers if needed. It is still a good idea to list at least two or three brokers here, in case one broker is temporarily unavailable. So creating a producer could look like this.

import kafka
producer=kafka.KafkaProducer(bootstrap_servers=["broker1:9092", "broker2:9092"])

When started, a Producer will create a separate sender thread which will asynchronously send messages to the brokers. In addition, it will create an internal client which holds the actual connections to the Kafka cluster.

When using an SSL listener, we need a few additional configuration items. Specifically, we need to add the following named parameters when creating a KafkaProducer:

  • ssl_cafile – this is the location of a CA certificate that the client will use to verify the certificate presented by the server
  • ssl_certfile – this is the location of the client certificate that the client will in turn present to the server when the server requests a certificate
  • ssl_keyfile – the key matching the client certificate
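As a sketch, the arguments could be assembled by a small helper like the one below. The helper name producer_arguments and the file names are made up for this example. Note that kafka-python also expects security_protocol to be set to "SSL" for these parameters to take effect, as the default is an unencrypted connection.

```python
def producer_arguments(brokers, ca_file=None, cert_file=None, key_file=None):
    """Build the keyword arguments for kafka.KafkaProducer."""
    args = {"bootstrap_servers": list(brokers)}
    if ca_file is not None:
        args.update(
            security_protocol="SSL",
            ssl_cafile=ca_file,
            ssl_certfile=cert_file,
            ssl_keyfile=key_file,
        )
    return args


# Hypothetical broker addresses and certificate locations
args = producer_arguments(
    ["broker1:9092", "broker2:9092"],
    ca_file="ca.crt",
    cert_file="client.crt",
    key_file="client.key",
)
print(args)

# With kafka-python installed and the brokers reachable, this would become
# producer = kafka.KafkaProducer(**args)
```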

In order to be a bit more flexible when it comes to connecting to different setups, the code examples that we will use in this series read the list of brokers and the SSL configuration from a YAML file config.yaml that the installation script will create in the subdirectory .state of the repository directory. All test scripts accept a parameter --config that you can use to override this default location, in case you want to use your own configuration.

Once we are done using a producer, we should close it using producer.close() to clean up.

Keys and partitions

Once we have a producer in our hands, we can actually start to send messages. This requires only two parameters: the topic (a string) and the payload of the message (a sequence of bytes).

producer.send("test",value=bytes("hello", "utf-8"))

Note that this will create a topic “test” if it does not exist yet, using default values specified in the server configuration (server.properties). Be careful when relying on this, as the default configuration might not be what you want (you can also turn this feature off by setting auto.create.topics.enable to false in server.properties).

Now you might remember from my previous post that a record in Kafka actually consists of a payload and a key. Here, we do not specify a key, so the key will remain empty. But of course, you can define a key for your record by simply adding the named parameter key to the method invocation, like this.

producer.send("test",
        value=bytes("hello", "utf-8"),
        key=bytes("mykey", "utf-8"))

What about partitions? The low-level protocol that a Kafka broker understands expects the client to send a PRODUCE request containing a valid partition ID, so it is up to the client to take this decision. The application programmer can either decide to explicitly specify a partition ID (an integer) as an optional parameter to the send method, or let the framework take the decision. In this case, a so-called partitioner is invoked which, based on the value of the key, selects a partition to write to.

An application can set the configuration item partitioner when creating a producer to define a custom partitioner (which is simply a callable object that the producer will invoke). If no partitioner is specified, the default partitioner will be used, which implements the following logic.

  • If no key is given, the default partitioner will simply distribute the messages randomly across the available partitions
  • If a key is provided, a hash value of the key will be computed (using a so-called MurmurHash), which will always be an integer. The value of this hash (more precisely, of its last 31 bits) modulo the number of partitions will then determine the partition to use

The important thing to keep in mind is that if you do not provide a key, your message will end up in a random partition. If you do provide a key, then Kafka will guarantee that messages with the same key will go to the same partition and hence be processed in order.
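To illustrate the structure of this logic, here is a sketch of a custom partitioner. It follows the calling convention that kafka-python uses for partitioners (serialized key, list of all partitions, list of available partitions), but it deliberately swaps in CRC32 from the standard library for the MurmurHash, so it will not select the same partitions as the default partitioner.

```python
import random
import zlib


def crc_partitioner(key_bytes, all_partitions, available_partitions):
    """Pick a partition: random without a key, hash-based with a key."""
    if key_bytes is None:
        return random.choice(available_partitions or all_partitions)
    # Keep only the lowest 31 bits of the hash, then take it modulo
    # the number of partitions
    index = (zlib.crc32(key_bytes) & 0x7FFFFFFF) % len(all_partitions)
    return all_partitions[index]


partitions = [0, 1]
first = crc_partitioner(b"account-42", partitions, partitions)
second = crc_partitioner(b"account-42", partitions, partitions)
no_key = crc_partitioner(None, partitions, partitions)
print(first, second, no_key)
```

A callable like this could be passed as the partitioner argument when creating the producer. The point to observe is that the same key always maps to the same partition as long as the number of partitions does not change.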


In our examples so far, we have passed a sequence of bytes to the send method, both for the key and the value. This is the format that the low-level protocol expects – at the end of the day, keys and values are sent over the wire as a sequence of bytes, and stored as a sequence of bytes.

In many applications, however, you will want to store more complex data types, like JSON data or even objects. To be able to do this with Kafka, you will have to convert your data into a sequence of bytes when sending the data, a process known as serializing.

You can specify your own serializers for keys and payloads by adding the named parameters key_serializer and value_serializer when creating the producer. Here, a serializer can either be a function which accepts whatever input format you prefer and returns a sequence of bytes, or an instance of the class kafka.serializer.Serializer, which has a serialize method that the framework will invoke.

Suppose for instance you wanted to serialize JSON data. Then, you need to provide a serializer which accepts a JSON object and returns a sequence of bytes. For that purpose, we can use the standard json.dumps method to first produce a string, and then encode the string using e.g. UTF-8 to obtain a sequence of bytes. Thus your serializer would look something like

import json

def serialize(data):
    return bytes(json.dumps(data), "utf-8")

and when creating the producer, the call would be something like

producer=kafka.KafkaProducer(
   value_serializer=serialize, ...)

Choosing a reasonable serializer is an important design choice. As Kafka topics are designed to be durable objects, you need to think about things like versioning when the encoding changes as you release new features, and obviously all components of a system need to use matching serializers and de-serializers to be able to exchange data. Many Kafka projects actually use third-party serializers, like Apache Avro or Google’s protobuf.
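One simple way to prepare for such changes, shown here only as a sketch, is to wrap the payload in an envelope that carries a version number. The envelope layout and the constant SCHEMA_VERSION are assumptions made for this example. Schema-based tools like Avro approach the same problem in a more systematic way.

```python
import json

SCHEMA_VERSION = 2  # hypothetical version of the record format


def serialize(data):
    """Wrap the payload in a versioned envelope and encode it as UTF-8 JSON."""
    envelope = {"version": SCHEMA_VERSION, "payload": data}
    return json.dumps(envelope).encode("utf-8")


def deserialize(raw):
    """Decode a record and refuse formats newer than this code understands."""
    envelope = json.loads(raw.decode("utf-8"))
    if envelope["version"] > SCHEMA_VERSION:
        raise ValueError("record was written with a newer format")
    return envelope["payload"]


raw = serialize({"account": 1, "amount": 100})
restored = deserialize(raw)
print(restored)
```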


So far, we have seen how we can send messages using the send method of a KafkaProducer object. But in reality, you of course want to know whether your message was successfully sent and stored by Kafka.

This leads us to the question at which point a new record can be considered to be committed to a Kafka cluster, i.e. stored and available for consumers. Before getting into this, however, we first have to understand the notion of an in-sync replica.

Recall that Kafka replication works by designating a leader for a partition and zero, one or more followers which constantly ask the leader for new records in the partition and store them in their own copy of the partition log. As the replicas read the records from the leader, the leader knows which record has been delivered to which follower. The leader can therefore determine whether a replica is out-of-sync, which happens if a follower fails to retrieve the latest message within a defined time frame, or in-sync.

Having enough in-sync replicas is vital for reliability. If a leader goes down, Kafka has to elect a new leader from the set of available replicas. Of course, choosing an out-of-sync replica to be the new leader would imply that we promote to leader, and thus to our new source of truth, a replica that has not yet replicated all messages that producers have sent to the old leader. Thus, making such a replica the new leader results in a loss of records. In some situations, you might still opt to do so, which Kafka allows if the parameter unclean.leader.election.enable is set to true in the broker configuration.


Now let us come back to the question of when a message sent by a producer will be considered committed. Again, Kafka offers you a choice, governed by the value of the parameter acks of a producer.

  • When acks = 1 (the default), a message will be considered committed once the leader acknowledges the message. Thus Kafka guarantees that a committed message has been added to the leading partition, but not that it has already been written to one or even all replicas. Note that, as the leader might cache the record in memory, this can lead to data loss if the leader goes down after acknowledging the message, but before a follower has copied it
  • When acks = -1 (all), a record will be considered committed only once the leader and in addition all in-sync replicas have acknowledged receipt of the message. As long as you have enough in-sync replicas, this gives you a strong guarantee that the message is available on several nodes and thus data loss has become very unlikely
  • Finally, a value of acks = 0 means that the message will be considered committed once it has been sent over the network, regardless of any acknowledgement from the leader or a follower. This obviously is a very weak guarantee and only reasonable if you have a strong focus on throughput and can live with a loss of (potentially many) records

When using acks = all, the number of in-sync replicas is of course vital. To illustrate this, let us assume that you have configured a topic with three replicas, but both followers have become out-of-sync. Now a message will be committed once the leader has written it, meaning that if the leader is lost, the record will be lost as well. To avoid such a scenario, you can set the server property min.insync.replicas. This number determines how many replicas (including the leader) need to be in-sync in order to still accept new messages. Thus if you use, for instance, a topic with a replication factor of three and min.insync.replicas=2 in combination with acks=-1, then Kafka will guarantee that a message is only reported as committed once the leader and at least one follower have received the record.
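The interplay of acks, the number of in-sync replicas and min.insync.replicas can be summarized in a toy model. The function below is not part of any Kafka API. It merely encodes the rules described above and returns the number of replicas that are guaranteed to have received a record at the time it is acknowledged, or None if the write is rejected.

```python
def guaranteed_copies(acks, in_sync_replicas, min_insync_replicas=1):
    """Toy model; in_sync_replicas includes the leader."""
    if acks == 0:
        return 0  # no acknowledgement is awaited at all
    if acks == 1:
        return 1  # only the leader has acknowledged
    # acks = -1 (all): the leader refuses the write if too few replicas
    # are in sync, otherwise all in-sync replicas must acknowledge
    if in_sync_replicas < min_insync_replicas:
        return None
    return in_sync_replicas


# Replication factor three, but both followers are out-of-sync
unprotected = guaranteed_copies(-1, in_sync_replicas=1, min_insync_replicas=1)
rejected = guaranteed_copies(-1, in_sync_replicas=1, min_insync_replicas=2)
protected = guaranteed_copies(-1, in_sync_replicas=2, min_insync_replicas=2)
print(unprotected, rejected, protected)
```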


Finally, there is one more parameter that is important for the reliability of a producer – max_in_flight_requests_per_connection. A request (which typically contains more than one record to be written) will be considered as in-flight as long as no result – either an acknowledgement or an error – has been received from the broker. If this parameter (which defaults to 5!) is set to a value greater than one, this implies that the producer will not wait for an acknowledgement before sending the next batch. In combination with retries, this can imply that the order in which messages are added to the log is not identical to the order in which they have been sent, and if the producer goes down, in-flight messages might be lost if the broker is not able to process them. Thus set this to one if you need strong guarantees on at-least-once delivery and ordering.
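The reordering effect can be reproduced with a small simulation. This is a strongly simplified model and not how the producer is actually implemented: it assumes that all requests within the in-flight window are sent before any result is known, and that a failed batch is retried at the front of the queue.

```python
def deliver(batches, max_in_flight, fail_once):
    """Return the order in which batches end up in the log."""
    log = []
    pending = list(batches)
    failed_before = set()
    while pending:
        window, pending = pending[:max_in_flight], pending[max_in_flight:]
        retry = []
        for batch in window:
            if batch in fail_once and batch not in failed_before:
                failed_before.add(batch)
                retry.append(batch)  # first attempt fails, try again later
            else:
                log.append(batch)
        pending = retry + pending
    return log


reordered = deliver(["A", "B", "C"], max_in_flight=2, fail_once={"A"})
ordered = deliver(["A", "B", "C"], max_in_flight=1, fail_once={"A"})
print(reordered, ordered)
```

With two requests in flight, batch B is written while A is still being retried, so A ends up behind B in the log. With only one request in flight, the original order is preserved.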

Retries and error handling

Finally, the last important design decision that you need to take when writing a producer is how to deal with errors.

The send method that we are using delivers messages asynchronously to the actual sender and immediately returns. To figure out whether the record was successfully committed, we therefore cannot expect the method to return the final status directly. Instead, the send method returns a handle which can later be used to retrieve the status of the message, a so-called Future, or, more precisely, an instance of a subclass called FutureRecordMetadata. Once we have this object, we can call its get method with a timeout in seconds to wait for the request to complete. If the request was successful, this method returns a RecordMetadata object (a named tuple holding, among other things, the topic, partition and offset of the record), otherwise it raises an exception of type kafka.errors.KafkaError. Alternatively, you can also specify callback functions for successful and failed sends.
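A sketch of the synchronous variant could look as follows. The function send_and_confirm works with any object that offers a send method returning a future with a get method, which is the interface of KafkaProducer described above. To keep the example runnable without a broker, StubProducer, StubFuture and Metadata are made-up stand-ins, and the sketch catches the generic Exception where real code would catch kafka.errors.KafkaError.

```python
from collections import namedtuple

# Stand-in for the metadata object returned by kafka-python
Metadata = namedtuple("Metadata", ["topic", "partition", "offset"])


class StubFuture:
    """Hypothetical future which either returns metadata or raises."""

    def __init__(self, outcome):
        self.outcome = outcome

    def get(self, timeout=None):
        if isinstance(self.outcome, Exception):
            raise self.outcome
        return self.outcome


class StubProducer:
    """Hypothetical producer that rejects empty payloads."""

    def send(self, topic, value=None):
        if not value:
            return StubFuture(RuntimeError("empty payload rejected"))
        return StubFuture(Metadata(topic, 0, 0))


def send_and_confirm(producer, topic, value, timeout=10):
    """Send one record and block until it is confirmed or has failed."""
    future = producer.send(topic, value=value)
    try:
        metadata = future.get(timeout=timeout)
    except Exception as error:  # with kafka-python: kafka.errors.KafkaError
        print(f"send failed: {error}")
        return None
    return metadata.partition, metadata.offset


confirmed = send_and_confirm(StubProducer(), "test", b"hello")
failed = send_and_confirm(StubProducer(), "test", b"")
print(confirmed, failed)
```

For the asynchronous variant, the future returned by kafka-python offers the methods add_callback and add_errback to register the callbacks.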

Note that the producer also has an option to automatically retry failed messages, which can be configured by setting the parameter retries to a value different from zero. In general, however, you should be careful with this as it might conflict with ordering, see the discussion of in-flight requests above.

Testing our producer

After all this theory, it is now time to test our producer. I assume that you have followed my previous post and installed Kafka in three virtual nodes on your PC. Now navigate to the root of the repository and run the following command to create a test topic.

./kafka/bin/kafka-topics.sh \
  --bootstrap-server=$(./python/getBrokerURL.py) \
  --command-config=.state/client_ssl_config.properties \
  --create \
  --topic test \
  --replication-factor 3 \
  --partitions 2 

Let us see what this command is doing. The script that we run, kafka-topics.sh, is part of the standard Kafka admin command line tools that are bundled with the distribution. In the second line, we invoke a little Python script that evaluates the configuration in YAML format which our installation procedure has created to determine the URL of a broker, which we then pass to the script. In the third line, we provide a Java properties file containing the SSL parameters to connect to our secured listener.

The remaining switches instruct the tool to create a new topic called “test” with a replication factor of three and two partitions (which, of course, will fail if you have already created this topic in the previous post).

Next, we will run another tool coming with Kafka – the console consumer. This is a simple consumer that will simply subscribe to a topic and dump all records in this topic to the console. To run it, enter

kafka/bin/kafka-console-consumer.sh   \
   --bootstrap-server $(./python/getBrokerURL.py)   \
   --consumer.config .state/client_ssl_config.properties \
   --from-beginning \
   --topic test 

Now open an additional terminal, navigate to the root of the repository and run the producer.

python3 python/producer.py

This should print the producer configuration used and the number of messages produced, plus timestamps and the number of seconds and microseconds it took to send all messages. In the first terminal window, in which the consumer is running, you should then see these messages flicker by.

Now let us try out a few things. First, let us create 10000 messages with a set of configurations promising the highest throughput (acks=0, fully asynchronous send, no keys provided, five requests in flight).

python3 python/producer.py \
  --messages=10000 \

On my PC, producing these 10000 messages takes roughly half a second, i.e. our throughput is somewhere around 20,000 messages per second, without any tuning (of course the results will heavily depend on the machine on which you are running this). Next, we again produce 10000 messages, but this time, we use very conservative settings (acks=all, only one message in flight, wait for reply after each request, create and store keys and use them to determine the partition).

python3 python/producer.py \
  --messages=10000 \
  --ack=-1 \
  --max_in_flight_requests_per_connection=1 \
  --wait \

Obviously, this will be much slower. On my PC, this took roughly 17 seconds, i.e. it is slower by a factor of about 25 than the first run. This hopefully illustrates nicely that Kafka leaves you many choices for trade-offs between performance and availability. Use this freedom with care and make sure you understand the consequences that the various settings have, otherwise you might lose data!

Putting it all together – the send method behind the scenes

Having seen a producer in action, it is instructive to take a short look at the source code of the Python implementation of KafkaProducer, specifically at its send method. First, the partitions of the topic are retrieved, either from cached metadata or by requesting updated metadata from the server. Then, the key and value are serialized, and the partitioner is invoked which determines the partition to which we write the record.

Next, instead of directly sending the record to the broker, it is appended to an internal buffer using an internal helper class called a RecordAccumulator. If the accumulator signals that the buffer is full, the sender thread is triggered which will then actually transmit the entire batch to the Kafka broker. Finally, the future object is returned.
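The buffering idea can be illustrated with a much simplified model. The class SimpleAccumulator below is made up for this post and is not the RecordAccumulator of kafka-python. In particular, it measures the batch size in records, whereas the real implementation works with a size in bytes.

```python
from collections import defaultdict


class SimpleAccumulator:
    """Collect records per partition until a batch is complete."""

    def __init__(self, batch_size):
        self.batch_size = batch_size
        self.buffers = defaultdict(list)

    def append(self, partition, record):
        """Buffer a record; return True if the batch for the partition is full."""
        self.buffers[partition].append(record)
        return len(self.buffers[partition]) >= self.batch_size

    def drain(self, partition):
        """Hand over the buffered batch and start a new one."""
        return self.buffers.pop(partition, [])


accumulator = SimpleAccumulator(batch_size=2)
sent = []
for record in (b"a", b"b", b"c"):
    if accumulator.append(0, record):
        # This is the point at which the sender thread would be woken up
        sent.append(accumulator.drain(0))
print(sent, dict(accumulator.buffers))
```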


This completes our discussion of Kafka producers. In the next post, we will learn how we can consume data from Kafka and how consumers, producers and brokers play together.

Learning Kafka with Python – the basics

More or less by accident, I recently took a closer look at Kafka, trying to understand how it is installed, how it works and how the Python API can be used to write Kafka applications with Python. So I decided it's time to create a series of blog posts on Kafka to document my learnings and to (hopefully) give you a quick start if you plan to get to know Kafka in a bit more detail. Today, we take a first look at Kafka's architecture before describing the installation in the next post.

What is Kafka?

Kafka is a system for the distributed processing of messages and streams which is maintained as Open Source by the Apache Software Foundation. Initially, Kafka was developed by LinkedIn and open-sourced in 2011. Kafka lets applications store data in streams (comparable to a message queue, more on that later), retrieve data from streams and process data in streams.

From the ground up, Kafka has been designed as a clustered system to achieve scalability and reliability. Kafka stores its data on all nodes in a cluster using a combination of sharding (i.e. distributing data across nodes) and replication (i.e. keeping the same record redundantly on several nodes to avoid data loss if a node goes down). Kafka clusters can reach an impressive size and throughput and can be employed for a variety of use cases. Kafka uses another distributed system – Apache ZooKeeper – to store metadata in a reliable way and to synchronize the work of the various nodes in a cluster.

Topics and partitions

Let us start to take a look at some of the core concepts behind Kafka. First, data in Kafka is organized in entities called topics. Roughly speaking, topics are a bit like message queues. Applications called producers write records into a topic which is then stored by Kafka in a highly fault-tolerant and scalable way. Other applications, called consumers, can read records from a topic and process them.


You might have seen similar diagrams before, at least if you have ever worked with messaging systems like RabbitMQ, IBM MQ Series or ActiveMQ. One major difference between these systems and Kafka is that a topic is designed to be a persistent entity with an essentially unlimited history. Thus, while in other messaging systems, messages are typically removed from a queue when they have been read or expired, Kafka records are kept for a potentially very long time (even though you can of course configure Kafka to remove old records from a topic after some time). This is why the data structure that Kafka uses to store the records of a topic is called a log – conceptually, this is like a log file into which you write records sequentially and which you clean up from time to time, but from which you rarely ever delete records programmatically.

That looks rather simple, but there is more to it – partitions. We have mentioned above that Kafka uses sharding to distribute data across several nodes. To be able to implement this, Kafka somehow needs to split the data in a topic into several entities which can then be placed on different nodes. This entity is called a partition. Physically, a partition is simply a directory on one of the nodes, and the data in a partition is split into several files called segments.


Of course, clients (i.e. producers and consumers) do not write directly into these files. Instead, there is a daemon running on each node, called the Kafka broker, which maintains the logs on this node. Thus if a producer wants to write into a topic, it will talk to the broker responsible for the logs on the target node (which depends on the partition, as we will see in a later post when we discuss producers in more detail) and send the data to the broker, and the broker will store the data by appending it to the log. Similarly, a consumer will ask a broker to retrieve data from a log (with Kafka, consumers pull data, in contrast to some other messaging systems where data is pushed out to a consumer).

Records in a log are always read and written in batches, not as individual records. A batch consists of some metadata, like the number of records in the batch, and a couple of records. Each record again starts with a short header, followed by a record key and the record payload.

It is important to understand that in Kafka, the record key does NOT identify a record uniquely. Actually, the record key can be empty and is mainly used to determine the partition to which a record will be written (again, we will discuss this in more detail in the post on producers). Instead, a record is identified by its offset within a partition. Thus if a consumer wants to read a specific record, it needs to specify the topic, the partition number and the offset of the record within the partition. The offset is simply an integer starting at zero which is increased by one for every new record in the partition and serves as a unique, primary key within this partition (i.e. it is not unique across partitions).
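
The relation between keys, partitions and offsets can be illustrated with a small in-memory model. This is only a sketch under simplifying assumptions: the class ToyTopic is made up, keys are assumed to be non-empty bytes, and a CRC32 checksum stands in for the hash function that a real Kafka client would use to map a key to a partition.

```python
import zlib


class ToyTopic:
    """Hypothetical in-memory topic: one list (log) per partition."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key, value):
        # The key only selects the partition, it does not identify the record
        partition = zlib.crc32(key) % len(self.partitions)
        log = self.partitions[partition]
        log.append((key, value))
        offset = len(log) - 1
        return partition, offset

    def read(self, partition, offset):
        # A record is addressed by partition and offset, not by its key
        return self.partitions[partition][offset]


topic = ToyTopic(num_partitions=2)
first = topic.append(b"account-1", b"deposit 100")
second = topic.append(b"account-1", b"withdraw 30")
```

Both records carry the same key and therefore end up in the same partition, but they receive consecutive offsets and can be read back individually.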

Replication, leaders and controllers

Sharding is one pattern that Kafka uses to distribute records across nodes. Within a topic, each partition will be placed on a different node (unless of course the number of nodes is smaller than the number of partitions, in which case some nodes will hold more than one partition). This allows us to scale a Kafka cluster horizontally and to maintain topics that exceed the storage capacity of a single node.

But what if a node goes down? In this case, all data on that node might be lost (in fact, Kafka heavily uses caching and will not immediately flush to disk when a new record is written, so if a node goes down, you will typically lose data even if your file system survives the crash). To avoid data loss in that case, Kafka replicates partitions across nodes. Specifically, for each partition, Kafka will nominate a partition leader and one or more followers. If, for instance, you configure a topic with a replication factor of three, then each partition will have one leader and two followers. All three of those brokers will maintain a copy of the partition. A producer and a consumer will always talk to the partition leader, and when data is written to a partition, it will be synced to all followers.

So let us assume that we are running a cluster with three Kafka nodes. On each node, there is a broker managing the logs on this node. If we now have a topic with two partitions and a replication factor of three, then each of the two partitions will be stored three times, once by the leader and twice by the brokers which act as followers for this partition. This could lead to an assignment of partitions and replicas to nodes as shown below.
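
One simple way to arrive at such an assignment is a round-robin placement, sketched below. The function assign_replicas and the node names are made up for this illustration, and the placement strategy is an assumption chosen for simplicity, not a description of how Kafka actually distributes replicas.

```python
def assign_replicas(num_partitions, replication_factor, nodes):
    """Round-robin placement; the first replica in each list acts as leader."""
    if replication_factor > len(nodes):
        raise ValueError("replication factor exceeds number of nodes")
    assignment = {}
    for partition in range(num_partitions):
        assignment[partition] = [
            nodes[(partition + i) % len(nodes)]
            for i in range(replication_factor)
        ]
    return assignment


assignment = assign_replicas(2, 3, ["node1", "node2", "node3"])
```

With two partitions, a replication factor of three and three nodes, every node holds a copy of both partitions, and the leadership is spread across two different nodes.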


If in this situation one of the nodes, say node 1, goes down, then two things will happen. First, Kafka will elect a new leader for partition 0, say node 2. Second, it will ask a new node to create a replica for partition 1, as (even though node 1 was not the leader for partition 1) we now have only one replica for partition 1 left. This process is called partition reassignment.

To support this process, one of the Kafka brokers will be elected as the controller. It is the responsibility of the controller to detect failed brokers and reassign the leadership for the affected partitions.
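
The leadership part of this process can be sketched in a few lines. Again, this is a toy model and not Kafka code: the function handle_broker_failure is hypothetical, it simply promotes the first surviving replica to leader and does not attempt to create new replicas.

```python
def handle_broker_failure(assignment, failed):
    """Return a new assignment with the failed broker removed.

    assignment maps a partition number to its list of replicas, leader first.
    """
    result = {}
    for partition, replicas in assignment.items():
        survivors = [node for node in replicas if node != failed]
        if not survivors:
            raise RuntimeError(f"partition {partition} has no replica left")
        result[partition] = survivors   # the first survivor becomes the leader
    return result


before = {0: ["node1", "node2", "node3"], 1: ["node2", "node3", "node1"]}
after = handle_broker_failure(before, "node1")
```

In this example, node 2 takes over the leadership for partition 0, while the leader of partition 1 is unchanged and only loses one of its followers.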

Producers and consumers

In fact, the process of maintaining replicas is much more complicated than this simple diagram suggests. One of the major challenges is that it can of course happen that a record has been received by the leader and not yet synchronized to all replicas when the leader dies. Will these messages be lost? As often with Kafka, the answer is “it depends on the configuration” and we will learn more about this in one of the upcoming posts.

We have also not yet said anything about the process of consuming from a topic. Of course, in most situations, you will have more than one consumer, either because there is more than one application interested in getting the data or because we want to scale horizontally within an application. Here, the concept of a consumer group comes into play, which roughly is the logical equivalent of an application in Kafka. Within a consumer group, we can have up to as many active consumers as the topic has partitions, and each partition will be read by only one consumer. For consumers, the biggest challenge is to keep track of the messages already read, which is of course essential if we want to implement guarantees like at-least-once or even exactly-once delivery. Kafka offers several mechanisms to deal with this problem, and we will discuss them in depth in a separate post on consumers.
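
To see why the number of partitions limits the number of active consumers in a group, consider the following sketch. The function assign_partitions is made up for this post and uses a plain round-robin strategy, which is an assumption and only one of several conceivable ways to distribute partitions.

```python
def assign_partitions(partitions, consumers):
    """Distribute partitions round-robin; each partition gets one consumer."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        consumer = consumers[index % len(consumers)]
        assignment[consumer].append(partition)
    return assignment


group = assign_partitions([0, 1, 2], ["consumer-a", "consumer-b"])
idle = assign_partitions([0, 1], ["c1", "c2", "c3"])
```

In the first case, one consumer reads from two partitions. In the second case, there are more consumers than partitions, so the third consumer does not get any partition and stays idle.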

In the next post of this series, we will leave the theory behind for the time being and move on to the installation process, so that you can bring up your own cluster in virtual machines on your PC to start playing with Kafka.

OpenStack Nova – deep-dive into the provisioning process

In the last post, we went through the installation process and the high-level architecture of Nova, talking about the Nova API server, the Nova scheduler and the Nova agent. Today, we will make this a bit more tangible by observing how a typical request to provision an instance flows through this architecture.

The use case we are going to consider is the creation of a virtual server, triggered by a POST request to the /servers/ API endpoint. This is a long and complicated process, and we try to focus on the main path through the code without diving into every possible detail. This implies that we will skim over some points very briefly, but the understanding of the overall process should put us in a position to dig into other parts of the code if needed.

Roughly speaking, the processing of the request will start in the Nova API server which will perform validations and enrichments and populate the database. Then, the request is forwarded to the Nova conductor which will invoke the scheduler and eventually the Nova compute agent on the compute nodes. We will go through each of these phases in a bit more detail in the following sections.

Part I – the Nova API server

Being triggered by an API request, the process of course starts in the Nova API server. We have already seen in the previous post that the request is dispatched to a controller based on a set of hard-wired routes. For the endpoint in question, we find that the request is routed to the method create of the server controller.

This method first assembles some information like the user data which needs to be passed to the instance or the name of the SSH key to be placed in the instance. Then, authorization is carried out by calling the can method on the context (which, behind the scenes, will eventually invoke the Oslo policy rule engine that we have studied in our previous deep dive). Then the request data for networks, block devices and the requested image is processed before we eventually call the create method of the compute API. Finally, we parse the result and use a view builder to assemble a response.

Let us now follow the call into the compute API. Here, all input parameters are validated and normalized, for instance by adding defaults. Then the method _provision_instances is invoked, which builds a request specification and the actual instance object and stores these objects in the database.

At this point, the Nova API server is almost done. We now call the method schedule_and_build_instances of the compute task API. From here, the call will simply be delegated to the corresponding method of the client side of the conductor RPC API which will send a corresponding RPC message to the conductor. At this point, we leave the Nova API server and enter the conductor. The flow through the code up to this point is summarized in the diagram below.


Part II – the conductor

In the last post, we have already seen that RPC calls are accepted by the Nova conductor service and are passed on to the Nova conductor manager. The corresponding method is schedule_and_build_instances.

This method first retrieves the UUIDs of the instances from the request. Then, for each instance, the private method self._schedule_instances is called. Here, the class SchedulerQueryClient is used to submit an RPC call to the scheduler, which is processed by the scheduler's select_destinations method.

We will not go into the details of the scheduling process here, but simply note that this will in turn make a call to the placement service to retrieve allocation candidates and then calls the scheduler driver to actually select a target host.

Back in the conductor, we check whether the scheduling was successful. If not, the instance is moved into cell0. If yes, we determine the cell in which the selected host is living, update some status information and eventually, at the end of the method, invoke the method build_and_run_instance of the RPC client for the Nova compute service. At this point, we leave the Nova conductor service and the processing continues in the Nova compute service running on the selected host.
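
Stripped of all details, the control flow in the conductor can be summarized by the following toy function. None of the names below exist in Nova: schedule_and_build, select_host and build are hypothetical stand-ins for the conductor method, the RPC call to the scheduler and the RPC call to the compute service.

```python
def schedule_and_build(instances, select_host, build):
    """Toy version of the conductor flow described above."""
    results = {}
    for instance in instances:
        host = select_host(instance)
        if host is None:
            results[instance] = "cell0"   # scheduling failed
        else:
            build(host, instance)         # hand over to the compute service
            results[instance] = host
    return results


built = []
placement = {"vm-1": "compute1", "vm-2": None}
results = schedule_and_build(
    ["vm-1", "vm-2"],
    select_host=placement.get,
    build=lambda host, instance: built.append((host, instance)),
)
```

Here the first instance is handed over to a compute node, while the second one, for which no host could be found, ends up in cell0.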


Part III – the processing on the compute node

We have now reached the Nova compute agent running on the selected compute node, more precisely the method build_and_run_instance of the Nova compute manager. Here we spawn a separate worker thread which runs the private method _do_build_and_run_instance.

This method updates the VM state to BUILDING and calls _build_and_run_instance. Within this method, we first invoke _build_resources which triggers the creation of resources like networks and storage devices, and then move on to the spawn method of the compute driver from nova.virt. Note that this is again a pluggable driver mechanism – in fact the compute driver class is an abstract class, and needs to be implemented by each compute driver.

Now let us see how the processing works in our specific case of the libvirt driver library. First, we create an image for the VM by calling the private method _create_image. Next, we create the XML descriptor for the guest, i.e. we retrieve the required configuration data and turn it into the XML structure that libvirt expects. Finally, we call _create_domain_and_network and set a timer to periodically check the state of the instance until the boot process is complete.

In _create_domain_and_network, we plug in the virtual network interfaces, set up the firewall (in our installation, this is the point where we use the No-OP firewall driver as firewall functionality is taken over by Neutron) and then call _create_domain which creates the actual guest (called a domain in libvirt).

This delegates the call to nova.virt.libvirt.Guest.create() and then powers on the guest using the launch method on the newly created guest. Let us take a short look at each of these methods in turn.

In nova.virt.libvirt.Guest.create(), we use the write_instance_config method of the host class to create the libvirt guest without starting it.

In the launch method in nova/virt/libvirt/guest.py, we now call createWithFlags on the domain. This is actually a call into the libvirt library itself and will launch the previously defined guest.


At this point, our newly created instance will start to boot. The timer which we have created earlier will check at periodic intervals whether the boot process is complete and update the status of the instance in the database accordingly.

This completes our short tour through the instance creation process. There are a few points which we have deliberately skipped, for instance the details of the scheduling process, the image creation and image caching on the compute nodes or the network configuration, but the information in this post might be a good starting point for further deep dives.

OpenStack Nova – installation and overview

In this post, we will look into Nova, the cloud fabric component of OpenStack. We will see how Nova is installed and go briefly through the individual components and Nova services.


Before getting into the installation process, let us briefly discuss the various components of Nova on the controller and compute nodes.

First, there is the Nova API server which runs on the controller node. The Nova service will register itself as a systemd service with entry point /usr/bin/nova-api. Similar to Glance, invoking this script will bring up a WSGI server which uses PasteDeploy to build a pipeline with the actual Nova API endpoint (an instance of nova.api.openstack.compute.APIRouterV21) being the last element of the pipeline. This component will then distribute incoming API requests to various controllers which are part of the nova.api.openstack.compute module. The routing rules themselves are actually hardcoded in the ROUTE_LIST which is part of the Router class and maps request paths to controller objects and their methods.
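
The idea of a hardcoded routing table can be sketched as follows. This is a simplified illustration and does not reproduce the actual format of the ROUTE_LIST in Nova: the class ServersController, the list ROUTES and the function dispatch are all made up for this example.

```python
class ServersController:
    """Hypothetical controller with two methods."""

    def index(self, request):
        return "list of servers"

    def create(self, request):
        return f"created server {request['name']}"


servers = ServersController()

# Hypothetical route list: (path, {HTTP method: bound controller method})
ROUTES = [
    ("/servers", {"GET": servers.index, "POST": servers.create}),
]


def dispatch(method, path, request):
    # Walk the hardcoded list until path and HTTP method match
    for route_path, handlers in ROUTES:
        if route_path == path and method in handlers:
            return handlers[method](request)
    raise LookupError(f"no route for {method} {path}")


response = dispatch("POST", "/servers", {"name": "demo"})
```

A POST request to /servers is routed to the create method of the controller, while a GET request to the same path ends up in index.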


When you browse the source code, you will find that Nova offers some APIs like the image API or the bare metal API which are simply proxies to other OpenStack services like Glance or Ironic. These APIs are deprecated, but still present for backwards compatibility. Nova also has a network API which, depending on the value of the configuration item use_neutron, will either act as a proxy to Neutron or will present the legacy Nova networking API.

The second Nova component on the controller node is the Nova conductor. The Nova conductor does not expose a REST API, but communicates with the other Nova components via RPC calls (based on RabbitMQ). The conductor is used to handle long-running tasks like building an instance or performing a live migration.

Similar to the Nova API server, the conductor has a tiered architecture. The actual binary which is started by the systemd mechanism creates a so-called service object. In Nova, a service object represents an RPC API endpoint. When a service object is created, it starts up an RPC service that handles the actual communication via RabbitMQ and forwards incoming requests to an associated service manager object.
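
The division of labour between a service object and its manager can be modelled in a few lines of Python. This is a sketch of the pattern only: the classes ToyService and ToyConductorManager and the message format are hypothetical, and the transport via RabbitMQ is left out entirely.

```python
class ToyConductorManager:
    """Hypothetical manager: its methods are the RPC-callable operations."""

    def ping(self, payload):
        return {"pong": payload}


class ToyService:
    """Hypothetical service object forwarding messages to its manager."""

    def __init__(self, manager):
        self.manager = manager

    def handle_message(self, message):
        # Look up the requested method on the manager and invoke it
        method = getattr(self.manager, message["method"])
        return method(**message["args"])


service = ToyService(ToyConductorManager())
reply = service.handle_message({"method": "ping", "args": {"payload": 42}})
```

The service object does not know anything about the operations themselves, it only dispatches incoming messages to the manager by name.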


Again, the mapping between binaries and manager classes is hardcoded and, for the Stein release, is as follows.

  'nova-compute': 'nova.compute.manager.ComputeManager',
  'nova-console': 'nova.console.manager.ConsoleProxyManager',
  'nova-conductor': 'nova.conductor.manager.ConductorManager',
  'nova-metadata': 'nova.api.manager.MetadataManager',
  'nova-scheduler': 'nova.scheduler.manager.SchedulerManager',

Apart from the conductor service, this list contains one more component that runs on the controller node and uses the same mechanism to handle RPC requests (the nova-console binary is deprecated and we use the noVNC proxy instead, see the section below; the nova-compute binary is running on the compute node; and the nova-metadata binary is the old metadata service used with the legacy Nova networking API). This is the Nova scheduler.

The scheduler receives and maintains information on the instances running on the individual hosts and, upon request, uses the Placement API that we have looked at in the previous post to decide where a new instance should be placed. The actual scheduling is carried out by a pluggable instance of the nova.scheduler.Scheduler base class. The default scheduler is the filter scheduler which first applies a set of filters to determine the hosts which are candidates for hosting the instance, and then computes a score using a set of weights to take a final decision. Details on the scheduling algorithm are described here.
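
The two-stage approach of filtering and weighing is easy to demonstrate with a toy example. The code below is not taken from Nova: the function schedule, the filter ram_filter, the weigher ram_weigher and the host dictionaries are assumptions made for this illustration.

```python
def schedule(hosts, request, filters, weighers):
    """Filter hosts, then pick the one with the highest total weight."""
    candidates = [h for h in hosts if all(f(h, request) for f in filters)]
    if not candidates:
        raise RuntimeError("no valid host found")
    return max(candidates, key=lambda h: sum(w(h) for w in weighers))


def ram_filter(host, request):
    # Only hosts with enough free memory are candidates
    return host["free_ram_mb"] >= request["ram_mb"]


def ram_weigher(host):
    # Prefer the host with the most free memory
    return host["free_ram_mb"]


hosts = [
    {"name": "compute1", "free_ram_mb": 2048},
    {"name": "compute2", "free_ram_mb": 8192},
    {"name": "compute3", "free_ram_mb": 512},
]
chosen = schedule(hosts, {"ram_mb": 1024}, [ram_filter], [ram_weigher])
```

In this example, the filter removes compute3 from the list of candidates, and the weigher then prefers compute2 over compute1.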

The last service which we have not yet discussed is Nova compute. One instance of the Nova compute service runs on each compute node. The manager class behind this service is the ComputeManager which itself invokes various APIs like the networking API or the Cinder API to manage the instances on this node. The compute service interacts with the underlying hypervisor via a compute driver. Nova comes with compute drivers for the most commonly used hypervisors, including KVM (via libvirt), VMware, Hyper-V or the Xen hypervisor. In a later post, we will go through the call chain when provisioning a new instance to see how the Nova API, the Nova conductor service, the Nova compute service and the compute driver interact to bring up the machine.

The Nova compute service itself does not have a connection to the database. However, in some cases, the compute service needs to access information stored in the database, for instance when the Nova compute service initializes on a specific host and needs to retrieve a list of instances running on this host from the database. To make this possible, Nova uses remotable objects provided by the Oslo versioned objects library. This library provides decorators like remotable_classmethod to mark methods of a class or an object as remotable. These decorators point to the conductor API (indirection_api within Oslo) and delegate the actual method invocation to a remote copy via an RPC call to the conductor API. In this way, only the conductor needs access to the database and Nova compute offloads all database access to the conductor.
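
The mechanism behind remotable methods can be imitated with a small decorator. The following sketch is a strongly simplified stand-in and not the implementation in Oslo versioned objects: the decorator remotable defined here, the class ToyConductorAPI and the class InstanceList are made up, and the RPC call is replaced by a direct method call on a second copy of the object.

```python
import functools


def remotable(func):
    """Hypothetical decorator: delegate to an indirection API if one is set."""

    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        if self.indirection_api is not None:
            return self.indirection_api.call(self, func.__name__, args, kwargs)
        return func(self, *args, **kwargs)

    return wrapper


class ToyConductorAPI:
    """Stand-in for the RPC client; runs the method on a 'remote' copy."""

    def __init__(self):
        self.calls = []

    def call(self, obj, method_name, args, kwargs):
        self.calls.append(method_name)
        remote_copy = type(obj)(indirection_api=None)
        return getattr(remote_copy, method_name)(*args, **kwargs)


class InstanceList:
    def __init__(self, indirection_api=None):
        self.indirection_api = indirection_api

    @remotable
    def get_by_host(self, host):
        # On the conductor side, this is where the database would be queried
        return [f"instance-on-{host}"]


conductor = ToyConductorAPI()
instances = InstanceList(indirection_api=conductor).get_by_host("compute1")
```

The caller on the compute node simply invokes get_by_host, but the body of the method is only executed on the copy that lives on the conductor side.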

Nova cells

In a large OpenStack installation, access to the instance data stored in the MariaDB database can easily become a bottleneck. To avoid this, OpenStack provides a sharding mechanism for the database known as cells.

The idea behind this is that the set of your compute nodes is partitioned into cells. Every compute node is part of a cell, and in addition to these regular cells, there is a cell called cell0 (which is usually not used and only holds instances which could not be scheduled to a node). The Nova database schema is split into a global part which is stored in a database called the API database and a cell-local part. This cell-local database is different for each cell, so each cell can use a different database running (potentially) on a different host. A similar sharding applies to message queues. When you set up a compute node, the configuration of the database connection and the connection to the RabbitMQ service determine to which cell the node belongs. The compute node will then use this database connection to register itself with the corresponding cell database, and a special script (nova-manage) needs to be run to make these hosts visible in the API database as well so that they can be used by the scheduler.

Cells themselves are stored in a database table cell_mappings in the API database. Here each cell is set up with a dedicated RabbitMQ connection string (called the transport URL) and a DB connection string. Our setup will have two cells – the special cell0 which is always present and a cell1. Therefore, our installation will require three databases.

| Database   | Description        |
| nova_api   | Nova API database  |
| nova_cell0 | Database for cell0 |
| nova       | Database for cell1 |

In a deployment with more than one real cell, each cell will have its own Nova conductor service, in addition to a “super conductor” running across cells, as explained here and in the diagram below which is part of the OpenStack documentation.

The Nova VNC proxy

Usually, you will use SSH to access your instances. However, sometimes, for instance if the SSHD is not coming up properly or the network configuration is broken, it would be very helpful to have a way to connect to the instance directly. For that purpose, OpenStack offers a VNC console access to running instances. Several VNC clients can be used, but the default is to use the noVNC browser based client embedded directly into the Horizon dashboard.

How exactly does this work? First, there is KVM. The KVM hypervisor has the option to export the content of the emulated graphics card of the instance as a VNC server. Obviously, this VNC server is running on the compute node on which the instance is located. The server for the first instance will listen on port 5900, the server for the second instance will listen on port 5901 and so forth. The server_listen configuration option determines the IP address to which the server will bind.

Now theoretically a VNC client like noVNC could connect directly to the VNC server. However, in most setups, the network interfaces of the compute node are not directly reachable from a browser in which the Horizon GUI is running. To solve this, Nova comes with a dedicated proxy for noVNC. This proxy is typically running on the controller node. The IP address and port number on which this proxy is listening can again be configured using the novncproxy_host and novncproxy_port configuration items. The default port is 6080.

When a client like the Horizon dashboard wants to get access to the proxy, it can use the Nova API path /servers/{server_id}/remote-consoles. This call will be forwarded to the Nova compute method get_vnc_console on the compute node. This method will return a URL, consisting of the base URL (which can again be configured using the novncproxy_base_url configuration item), and a token which is stored in the database as well. When the client uses this URL to connect to the proxy, the token is used to verify that the call is authorized.

The following diagram summarizes the process to connect to the VNC console of an instance from a browser running noVNC.


  1. Client uses Nova API /servers/{server_id}/remote-consoles to retrieve the URL of a proxy
  2. Nova API delegates the request to Nova Compute on the compute node
  3. Nova Compute assembles the URL, which points to the proxy, and creates a token, containing the ID of the instance as well as additional information, and the URL including the token is handed out to the client
  4. Client uses the URL to connect to the proxy
  5. The proxy validates the token, extracts the target compute node and port information, establishes the connection to the actual VNC server and starts to service the session
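
The handshake described in these steps can be simulated in plain Python. This is a toy model under several assumptions: the base URL is a made-up example, a dictionary replaces the token table in the database, and the functions get_vnc_console and proxy_connect only mimic the roles of the compute service and the proxy.

```python
import secrets

BASE_URL = "http://controller:6080/vnc_auto.html"  # hypothetical base URL
tokens = {}  # stand-in for the token table in the database


def get_vnc_console(instance_id, host, port):
    """Compute side: create a token and hand out the proxy URL."""
    token = secrets.token_urlsafe(16)
    tokens[token] = {"instance": instance_id, "host": host, "port": port}
    return f"{BASE_URL}?token={token}"


def proxy_connect(url):
    """Proxy side: validate the token and look up the VNC server."""
    token = url.split("token=", 1)[1]
    target = tokens.get(token)
    if target is None:
        raise PermissionError("invalid or expired token")
    return target["host"], target["port"]


url = get_vnc_console("instance-1", "compute1", 5900)
target = proxy_connect(url)
```

The client never learns the address of the VNC server on the compute node. It only sees the URL of the proxy, and the proxy resolves the token to the actual target.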

Installing Nova on the controller node

Armed with the knowledge from the previous discussions, we can now almost guess what steps we need to take in order to install Nova on the controller node.


First, we need to create the Nova databases – the Nova API database (nova_api), the Nova database for cell0 (nova_cell0) and the Nova database for our only real cell cell1 (nova). We also need to create a user which has the necessary grants on these databases.

Next, we create a user in Keystone representing the Nova service, register the Nova API service with Keystone and define endpoints.

We then install the Ubuntu packages corresponding to the four components that we will install on the controller node – the Nova API service, the Nova conductor, the Nova scheduler and the VNC proxy.

Finally, we adapt the configuration file /etc/nova/nova.conf. The first change is easy – we set the value my_ip to the IP of the controller management interface.

We then need to set up the networking part. To enforce the use of Neutron instead of the built-in legacy Nova networking, we set the configuration option use_neutron that we already discussed above to True. We also set the firewall driver to the No-OP driver nova.virt.firewall.NoopFirewallDriver.

The next information we need to provide is the connection information to RabbitMQ and the database. Recall that we need to configure two database connections, one for the API database and one for the database for cell 1 (Nova will automatically append _cell0 to this database name to obtain the database connection for cell 0).
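
The naming convention for the cell0 database can be expressed as a small helper function. This is a sketch of the behaviour described above and not the code used by Nova: the function cell0_connection and the connection string, including its credentials, are hypothetical.

```python
def cell0_connection(cell1_connection):
    """Append _cell0 to the database name of a connection URL."""
    # Keep a query string, if present, behind the modified database name
    base, _, query = cell1_connection.partition("?")
    derived = base + "_cell0"
    return f"{derived}?{query}" if query else derived


main = "mysql+pymysql://nova:secret@controller/nova"  # hypothetical URL
cell0 = cell0_connection(main)
```

For the database nova used in our setup, this yields a connection to the database nova_cell0.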

We also need to provide some information that Nova needs to communicate with other OpenStack services. In the glance section, we need to define the URL to reach the Glance API server. In the neutron section, we need to set up the necessary credentials to connect to Neutron. Here we use a Keystone user neutron which we will set up when installing Neutron in a later post, and we also define some data needed for the metadata proxy that we will discuss in a later post. And finally Nova needs to connect to the Placement service for which we have to provide credentials as well, this time using the placement user created earlier.

To set up the communication with Keystone, we need to set the authorization strategy to Keystone (which will also select the PasteDeploy Pipeline containing the Keystone authtoken middleware) and provide the credentials that the authtoken middleware needs. And finally, we set the path that the Oslo concurrency library will use to create temporary files.

Once all this has been done, we need to prepare the database for use. As with the other services, we need to sync the database schema to the latest version which, in our case, will simply create the database schema from scratch. We also need to establish our cell 1 in the database using the nova-manage utility.

Installing Nova on the compute nodes

Let us now turn to the installation of Nova on the compute nodes. Recall that on the compute nodes, only nova-compute needs to be running. There is no database connection needed, so the only installation step is to install the nova-compute package and to adapt the configuration file.

The configuration file nova.conf on the compute node is very similar to the configuration file on the controller node, with a few differences.

As there is no database connection, we can comment out the DB connection string. In the light of our above discussion of the VNC proxy mechanism, we also need to provide some configuration items for the proxy mechanism.

  • The configuration item server_proxyclient_address is evaluated by the get_vnc_console method of the compute driver and used to return the IP and port number on which the actual VNC server is running and can be reached from the controller node (this is the address to which the proxy will connect)
  • The server_listen configuration item is the IP address to which the KVM VNC server will bind on the compute host and should be reachable via the server_proxyclient_address from the controller node
  • The novncproxy_base_url is the URL which is handed out by the compute node for use by the proxy

Finally, there is a second configuration file nova-compute.conf specific to the compute nodes. This file determines the compute driver used (in our case, we use libvirt) and the virtualization type. With libvirt, we can either use KVM or QEMU. KVM will only work if the CPU supports virtualization (i.e. offers the VT-X extension for Intel or AMD-V for AMD). In our setup, the virtual machines will run on top of another virtual machine (VirtualBox), which only passes these features through for AMD CPUs. We will therefore set the virtualization type to QEMU.
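
If you want to find out whether KVM would be an option on a given Linux machine, you can look for the CPU flags vmx (Intel) or svm (AMD) in /proc/cpuinfo. The helper functions below are a sketch written for this post and are not part of Nova: they take the content of that file as a string and derive a virtualization type from it.

```python
def supports_kvm(cpuinfo_text):
    """Return True if any CPU advertises the vmx (Intel) or svm (AMD) flag."""
    for line in cpuinfo_text.splitlines():
        if line.startswith("flags"):
            flags = line.split(":", 1)[1].split()
            if "vmx" in flags or "svm" in flags:
                return True
    return False


def choose_virt_type(cpuinfo_text):
    # Fall back to plain QEMU emulation if there is no hardware support
    return "kvm" if supports_kvm(cpuinfo_text) else "qemu"


sample = "processor\t: 0\nflags\t\t: fpu vme sse2\n"
virt_type = choose_virt_type(sample)
```

On a real machine, you would pass the output of open("/proc/cpuinfo").read() instead of the sample string used here.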

Finally, after installing Nova on all compute nodes, we need to run the nova-manage tool once more to make these nodes known and move them into the correct cells.

Run and verify the installation

Let us now run the installation and verify that it has succeeded. Here are the commands to bring up the environment and to obtain and execute the needed playbooks.

git clone https://www.github.com/christianb93/openstack-labs
cd openstack-labs/Lab4
vagrant up
ansible-playbook -i hosts.ini site.yaml

This will run a few minutes, depending on the network connection and the resources available on your machine. Once the installation completes, log into the controller and source the admin credentials.

vagrant ssh controller
source admin-openrc

First, we verify that all components are running on the controller. To do this, enter

systemctl | grep "nova"

The output should contain four lines, corresponding to the four services nova-api, nova-conductor, nova-scheduler and nova-novncproxy running on the controller node. Next, let us inspect the Nova database to see which compute services have registered with Nova.

openstack compute service list

The output should be similar to the sample output below, listing the scheduler, the conductor and two compute instances, corresponding to the two compute nodes that our installation has.

| ID | Binary         | Host       | Zone     | Status  | State | Updated At                 |
|  1 | nova-scheduler | controller | internal | enabled | up    | 2019-11-18T08:35:04.000000 |
|  6 | nova-conductor | controller | internal | enabled | up    | 2019-11-18T08:34:56.000000 |
|  7 | nova-compute   | compute1   | nova     | enabled | up    | 2019-11-18T08:34:56.000000 |
|  8 | nova-compute   | compute2   | nova     | enabled | up    | 2019-11-18T08:34:57.000000 |

Finally, the command sudo nova-status upgrade check will run some checks meant to be executed after an update that can be used to further verify the installation.

OpenStack Keystone – a deep-dive into tokens and policies

In the previous post, we have installed Keystone and provided an overview of its functionality. Today, we will dive in detail into a typical authorization handshake and take you through the Keystone source code to see how it works under the hood.

The overall workflow

Let us first take a look at the overall process before we start to dig into details. As an example, we will use the openstack CLI to list all existing projects. To better see what is going on behind the scenes, we run the openstack client with the -vv command line switch which creates a bit more output than usual.

So, log into the controller node and run

source admin-openrc
openstack -vv project list

This will give a rather lengthy output, so let us focus on those lines that signal that a request to the API is made. The first API call is a GET request to the URL


This request will return a list of available API versions, marked with a status. In our case, the result indicates that the stable version is version v3. Next, the client submits a POST request to the URL


If we look up this API endpoint in the Keystone Identity API reference, we find that this method is used to create and return a token. When making this request, the client will use the data provided in the environment variables set by our admin-openrc script to authenticate with Keystone, and Keystone will assemble and return a token.
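To make this more tangible, here is a minimal sketch of how the body of such a token request could be assembled for password authentication with a project scope. The helper build_auth_body and the sample values are made up for this illustration, and the OS_* names are the variables that openrc scripts commonly set, so compare them with your own file before relying on them.

```python
import json


def build_auth_body(env):
    """Build a password-auth request body scoped to a project.

    `env` is any mapping with the usual OS_* settings. This helper is
    hypothetical and not part of the openstack client.
    """
    return {
        "auth": {
            "identity": {
                "methods": ["password"],
                "password": {
                    "user": {
                        "name": env["OS_USERNAME"],
                        "domain": {"name": env["OS_USER_DOMAIN_NAME"]},
                        "password": env["OS_PASSWORD"],
                    }
                },
            },
            # The scope decides for which project the token will be valid
            "scope": {
                "project": {
                    "name": env["OS_PROJECT_NAME"],
                    "domain": {"name": env["OS_PROJECT_DOMAIN_NAME"]},
                }
            },
        }
    }


# Sample values, invented for this sketch
sample_env = {
    "OS_USERNAME": "admin",
    "OS_PASSWORD": "secret",
    "OS_USER_DOMAIN_NAME": "Default",
    "OS_PROJECT_NAME": "admin",
    "OS_PROJECT_DOMAIN_NAME": "Default",
}
body = build_auth_body(sample_env)
print(json.dumps(body, indent=2))
```

The client sends a structure of this kind as JSON in the POST request, and Keystone uses the identity part to authenticate the user and the scope part to decide which roles are relevant.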

The returned data actually has two parts. First, there is the actual Fernet token, which is provided in the HTTP header instead of the HTTP body. Second, there is a token structure which is returned in the response body. This structure contains the user that owns the token, the date when the token expires and the date when the token has been issued, the project for which the token is valid (for a project scoped token) and the roles that the user has for this project. In addition, it contains a service catalog. Here is an example, where I have collapsed the catalog part for better readability.


Finally, at the bottom of the output, we see that the actual API call to get a list of projects is made, using our newly acquired token and the endpoint


So our overall flow looks like this, ignoring some client internal processes like selecting the endpoint (and recovering from failed authorizations, see the last section of this post).


Let us now go through these requests step by step and see how tokens and policies interact.

Creating a token

When we submit the API request to create a token, we end up in the method post in the AuthTokenResource class defined in keystone/api/auth.py. Here we find the code.

          token, include_catalog=include_catalog

The method authenticate_for_token is defined in keystone/api/_shared/authentication.py. Here, we first authenticate the user, using the auth data provided in the request, in our case this is username, password, domain and project as defined in admin-openrc. Then, the actual token generation is triggered by the call


Here we see an additional layer of indirection in action – the ProviderAPIRegistry as defined in keystone/common/provider_api.py. Without getting into details, here is the idea of this approach which is used in a similar way in other OpenStack services.

Keystone itself consists of several components, each of which provides different methods (aka internal APIs). There is, for instance, the code in keystone/identity handling the core identity features, the code in keystone/assignment handling role assignments, the code in keystone/token handling tokens and so forth. Each of these components contains a class typically called Manager which is derived from the base class Manager in keystone/common/manager.py.

When such a class is instantiated, it registers its methods with the static instance ProviderAPIs of the class ProviderAPIRegistry defined in keystone/common/provider_api.py. Technically, registering means that the object is added as an attribute to the ProviderAPIs object. For the token API, for instance, the Manager class in keystone/token/provider.py registers itself using the name token_provider_api, so that it is added to the provider registry object as the attribute token_provider_api. Thus a method XXX of this manager class can now be invoked using

from keystone.common import provider_api

or by

from keystone.common import provider_api
PROVIDERS = provider_api.ProviderAPIs
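The registration idea can be illustrated with a toy version. This is a simplified sketch and not Keystone's actual code: the names ProviderRegistry, REGISTRY, TokenManager and the class attribute api_name are invented for this example, and the real registry does more bookkeeping.

```python
class ProviderRegistry:
    """Toy registry: every registered manager becomes an attribute."""

    def register(self, name, manager):
        if name in vars(self):
            raise RuntimeError(f"{name} is already registered")
        setattr(self, name, manager)


# A single shared instance, playing the role of the static registry object
REGISTRY = ProviderRegistry()


class Manager:
    """Base class: instances register themselves under api_name."""

    api_name = None  # hypothetical attribute, set by subclasses

    def __init__(self):
        REGISTRY.register(self.api_name, self)


class TokenManager(Manager):
    api_name = "token_provider_api"

    def issue_token(self, user_id):
        # Placeholder logic, just to have something to call
        return f"token-for-{user_id}"


# Instantiating the manager is enough to make it reachable via the registry
TokenManager()
issued = REGISTRY.token_provider_api.issue_token("alice")
print(issued)
```

Any other component can now reach the token manager through the shared registry object without importing the manager class itself, which is the point of the indirection.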

This is exactly what happens here, and this is why the above line will actually take us to the method issue_token of the Manager class defined in keystone/token/provider.py. Here, we build an instance of the Token class defined in keystone/models/token_model.py and populate it with the available data. We then populate the field token.id where we put the actual token, i.e. the encoded string that will end up in the HTTP header of future requests. This is done in the line

token_id, issued_at =

which calls the actual token provider, for instance the Fernet provider. For a Fernet token, this will eventually end up in the line


calling the token formatter which will do the low level work of actually creating and encrypting the token. The token ID will then be added to the token data structure, along with the creation time (a process known as minting) before the token is returned up the call chain.

At this point, the token does not yet contain any role information and no service catalog. To enrich the token with this information, it is rendered by calling render_token defined in keystone/common/render_token.py. Here, a dictionary is built and populated with data including information on role, scope and endpoints.

Note that the role information in the token is dynamic. In fact, in the Token class, the property decorator is used to divert access to the roles property to a method call. Here, we receive the scope information and select and return only those roles which are bound to the respective domain or project if the token is domain scoped or project scoped. When we render the token, we access the roles attribute and retrieve the role information from the method bound to it.
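The effect of the property decorator can be shown with a small stand-alone sketch. The class below is a toy stand-in and not Keystone's Token class: the constructor arguments and the ASSIGNMENTS dictionary are assumptions made purely for illustration.

```python
class Token:
    """Toy token model whose roles are computed on access."""

    def __init__(self, user_id, project_id=None, domain_id=None, assignments=None):
        self.user_id = user_id
        self.project_id = project_id
        self.domain_id = domain_id
        # Hypothetical role store: (user, scope type, scope id) -> role names
        self._assignments = assignments or {}

    @property
    def roles(self):
        # Runs on every attribute access, so the result follows the scope
        if self.project_id is not None:
            key = (self.user_id, "project", self.project_id)
        elif self.domain_id is not None:
            key = (self.user_id, "domain", self.domain_id)
        else:
            return []
        return sorted(self._assignments.get(key, set()))


# Made-up role assignments for one user
ASSIGNMENTS = {
    ("alice", "project", "demo"): {"reader"},
    ("alice", "domain", "default"): {"admin", "reader"},
}

project_token = Token("alice", project_id="demo", assignments=ASSIGNMENTS)
domain_token = Token("alice", domain_id="default", assignments=ASSIGNMENTS)
print(project_token.roles)  # ['reader']
print(domain_token.roles)   # ['admin', 'reader']
```

Code that reads token.roles looks like a plain attribute access, but what it gets back depends on the scope of the token at the time of the access.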

Within this method, an additional piece of logic is implemented which is relevant for the later authorization process. Keystone allows an administrator to define a so-called admin project. Any user who authenticates with a token scoped to this special project is called a cloud admin, a special role which can be referenced in policies. When rendering the token, the project to which the token refers (if it is project scoped) is compared to this special project, and if they match, an additional attribute is_admin_project is added to the token dictionary.

Finally, back in the post method, we build the response body from the token structure and add the actual token to the response header in the line

response.headers['X-Subject-Token'] = token.id

Here is a graphical overview of the process as we have discussed it so far.


The key learnings from the code that we can deduce so far are

  • The actual Fernet token contains a minimum of information, like the user for whom the token is issued and – depending on the scope – the IDs of the project or domain to which the token is scoped
  • When a token is requested, the actual Fernet token (the token ID) is returned in the response header, and an enriched version of the token is added in the response body
  • This enrichment is done dynamically using the Keystone database, and the enrichment will only add the roles to the token data that are relevant for the token scope
  • There is a special admin project, and a token scoped to this project implies the cloud administrator role

Using the token to authorize a request

Let us now see what happens when a client uses this token to actually make a request to the API – in our example, this happens when the openstack client makes the actual API call to the endpoint http://controller:5000/v3/projects.

Before this request is actually dispatched to the business logic, it passes through the WSGI middleware. Here, more precisely in the class method AuthContextMiddleware.process_request defined in the file keystone/server/flask/request_processing/middleware/auth_context.py, the token is retrieved from the field X-Auth-Token in the HTTP header of the request (here we also put the marker field is_admin into the context when an admin_token is defined in the configuration and equal to the actual token). Then the process_request method of the superclass is called which invokes fetch_token (of the derived class!). Here, the validate_token method of the token provider is called which performs the actual token validation. Finally, the token is again rendered as above, thereby adding the relevant roles dynamically, and put as token_reference in the request context (this happens in the method fill_context respectively _keystone_specific_values of the middleware class).
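The general shape of such a middleware can be sketched in a few lines of plain WSGI code. This is a toy illustration and not the Keystone implementation: the class AuthContextSketch, the environ key sketch.auth_context and the KNOWN_TOKENS table are all invented for this example.

```python
class AuthContextSketch:
    """Toy WSGI middleware that validates a token before the app runs."""

    def __init__(self, app, validate_token):
        self.app = app
        # Callable mapping a token ID to token data, or None if invalid
        self.validate_token = validate_token

    def __call__(self, environ, start_response):
        # WSGI exposes the HTTP header X-Auth-Token as HTTP_X_AUTH_TOKEN
        token_id = environ.get("HTTP_X_AUTH_TOKEN")
        token_data = self.validate_token(token_id) if token_id else None
        # Store the result where the application can find it later
        environ["sketch.auth_context"] = {"token_reference": token_data}
        return self.app(environ, start_response)


def demo_app(environ, start_response):
    """Minimal application that echoes the roles found in the context."""
    token_data = environ["sketch.auth_context"]["token_reference"] or {}
    start_response("200 OK", [("Content-Type", "text/plain")])
    return [",".join(token_data.get("roles", [])).encode()]


# Made-up token table standing in for the real validation logic
KNOWN_TOKENS = {"abc": {"user": "alice", "roles": ["reader"]}}
wrapped = AuthContextSketch(demo_app, KNOWN_TOKENS.get)


def ignore(status, headers):
    """Dummy start_response callable for the demo calls."""


with_token = wrapped({"HTTP_X_AUTH_TOKEN": "abc"}, ignore)
without_token = wrapped({}, ignore)
print(with_token, without_token)
```

The business logic behind the middleware never has to parse headers itself; it only reads the context that the middleware has prepared.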

At this point, it is instructive to take a closer look at the method that actually selects the relevant roles – the method roles of the token class defined in keystone/models/token_model.py. If you follow the call chain, you will find that, to obtain for instance all project roles, the internal API of the assignment component is used. This API returns the effective roles of the user, i.e. roles that include those roles that the user has due to group membership and roles that are inherited, for instance from the domain-level to the project level or down a tree of subprojects. Effective roles also include implied roles. It is important to understand (and reasonable) that it is the effective roles that enter a token and are therefore evaluated during the authorization process.
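To see how the different sources add up, here is a minimal sketch that computes effective roles from a handful of dictionaries. All data and names (DIRECT, GROUPS, GROUP_ROLES, INHERITED, PROJECT_DOMAIN, IMPLIED) are made up, and the real assignment API handles more cases, for instance trees of subprojects.

```python
# Made-up sample data for one user and one project
DIRECT = {("alice", "demo"): {"member"}}          # roles assigned directly
GROUPS = {"alice": {"ops"}}                        # group membership
GROUP_ROLES = {("ops", "demo"): {"operator"}}      # roles held via a group
INHERITED = {("alice", "default"): {"auditor"}}    # domain roles inherited by projects
PROJECT_DOMAIN = {"demo": "default"}               # project -> owning domain
IMPLIED = {"admin": {"member"}, "member": {"reader"}}  # role -> implied roles


def effective_roles(user, project):
    """Collect direct, group, inherited and implied roles for a project."""
    roles = set(DIRECT.get((user, project), set()))
    for group in GROUPS.get(user, set()):
        roles |= GROUP_ROLES.get((group, project), set())
    roles |= INHERITED.get((user, PROJECT_DOMAIN[project]), set())
    # Expand implied roles until nothing new shows up
    pending = list(roles)
    while pending:
        for implied in IMPLIED.get(pending.pop(), set()):
            if implied not in roles:
                roles.add(implied)
                pending.append(implied)
    return roles


print(sorted(effective_roles("alice", "demo")))
# ['auditor', 'member', 'operator', 'reader']
```

Only one of the four roles in the result was assigned to the user directly on the project, which is why looking at direct assignments alone can be misleading when debugging a policy decision.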

Once the entire chain of middleware has been processed, we finally reach the method _list_projects in keystone/api/projects.py. Close to the start of this method, the enforce_call method of the class RBACEnforcer in keystone/common/rbac_enforcer/enforcer.py is called. When making this call, the action identity:list_projects is passed as a parameter. In addition, a parameter called target is passed, a dictionary which contains some information on the objects to which the API request refers. In our example, as long as we do not specify any filters, this dictionary will be empty. If, however, we specify a domain ID as a filter, it will contain the ID of this domain. As we will see later, this allows us to define policies that allow a user to see projects in a specific domain, but not globally.

The enforce_call method will first make a couple of validations before it checks whether the request context contains the attribute is_admin. If yes, the token validation is skipped and the request is always allowed; this is to support the ADMIN_TOKEN bootstrapping mechanism. Then, close to the bottom of the method, we retrieve the request context, instantiate a new object and call its _enforce method, which essentially delegates the call to the Oslo policy rules engine and its Enforcer class, more precisely to the enforce method of this class.

As input, this method receives the action (identity:list_projects in our case), the target of the action, and the credentials, in the form of the Oslo request context, and the processing of the rules starts.


Again, let us quickly summarize what the key takeaways from this discussion should be – these points actually apply to most other OpenStack services as well.

  • When a request is received, the WSGI middleware is responsible for validating the token, retrieving the additional information like role data and placing it in the request context
  • Again, only those roles are stored in the context which the user has for the scope of the token (i.e. on the project level for a project-scoped token, on the domain level for a domain-scoped token and on the system level for a system-scoped token)
  • The roles in the token are effective roles, i.e. taking inheritance into account
  • The actual check against the policy is done by the Oslo policy rule engine

The Oslo policy rule engine

Before getting into the details of the rule engine, let us quickly summarize what data the rule engine has at its disposal. First, we have seen that it receives the action, which is simply a string, identity:list_projects in our case. Then, it has information on the target, which, generally speaking, is the object on which the action should be performed (this is less relevant in our example, but becomes important when we modify data). Finally, it has the credentials, including the token and role information which was part of the token and is now stored in the request context which the rule engine receives.

The engine will now run this data through all rules which are defined in the policy. Within the engine, a rule (or check) is simply an object with a __call__ method, so that it can be treated and invoked like a function. In the module _checks.py, a few basic checks are defined. There are, for instance, simple checks that always return true or false, and there are checks like AndCheck and OrCheck which can be used to build more complex rules from basic building blocks. And there are other checks like the RoleCheck which checks whether a certain role is present in the credentials, which, as we know from the discussion above, is the case if the token used to authorize contains this role, i.e. if the user who owns the token has this role with respect to the scope of the token.
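The idea of callable check objects that can be combined can be sketched as follows. These classes only borrow the names used above and are a heavily simplified toy version, not the Oslo policy code: in particular, the two-argument call signature (target, creds) is an assumption of this sketch, and the real checks receive additional arguments.

```python
class TrueCheck:
    """A check that always passes."""

    def __call__(self, target, creds):
        return True


class FalseCheck:
    """A check that always fails."""

    def __call__(self, target, creds):
        return False


class RoleCheck:
    """Passes if the given role is part of the credentials."""

    def __init__(self, role):
        self.role = role

    def __call__(self, target, creds):
        return self.role in creds.get("roles", [])


class AndCheck:
    """Passes if all sub-checks pass."""

    def __init__(self, *checks):
        self.checks = checks

    def __call__(self, target, creds):
        return all(check(target, creds) for check in self.checks)


class OrCheck:
    """Passes if at least one sub-check passes."""

    def __init__(self, *checks):
        self.checks = checks

    def __call__(self, target, creds):
        return any(check(target, creds) for check in self.checks)


# A composite rule: admin, or both reader and member
rule = OrCheck(RoleCheck("admin"), AndCheck(RoleCheck("reader"), RoleCheck("member")))

print(rule({}, {"roles": ["admin"]}))             # True
print(rule({}, {"roles": ["reader"]}))            # False
print(rule({}, {"roles": ["reader", "member"]}))  # True
```

Because every check is just a callable, a composite rule is invoked in exactly the same way as a basic one, and the engine does not need to know how deeply a rule is nested.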

Where do the rules come from that are processed? First, note that the parameter rule to the enforce method does, in our case at least, contain a string, namely the action (identity:list_projects). To load the actual rules, the method enforce will first call load_rules which loads rules from a policy file, at which we will take a look in a second. Loading the policy file will create a new instance of the Rules class, which is a container class to hold a set of rules.

After loading all rules, the following line in enforce identifies the actual rule to be processed.

to_check = self.rules[rule]

This looks a bit confusing, but recall that here, rule actually contains the action identity:list_projects, so we look up the rule associated with this action. Finally, the actual rule checking is done by invoking the _check methods of the _checks module.

Let us now take a closer look at the policy files themselves. These files are typically located in the /etc/XXX subdirectory, where XXX is the OpenStack component in question. Sample files are maintained by the OpenStack team. To see an example, let us take a look at the sample policy file for Keystone which was distributed with the Rocky release. Here, we find a line

"identity:list_projects": "rule:cloud_admin or rule:admin_and_matching_domain_id",

This file is in JSON syntax, and this line defines a dictionary entry with the action identity:list_projects and the rule rule:cloud_admin or rule:admin_and_matching_domain_id. The full syntax of the rule is explained nicely here or in the comments at the start of policy.py. In essence, in our example, the rule says that the action is allowed if either the user is a cloud administrator (i.e. an administrator of the special admin project or admin domain which can be configured in the Keystone configuration file) or is an admin for the requested domain.

When I first looked at the policy files in my test installation, however, which uses the Stein release, I was more than confused. Here, the rule for the action identity:list_projects is as follows.

"identity:list_projects": "rule:identity:list_projects"

Here we define a rule called identity:list_projects for the action with the same name, but where is this rule defined?

The answer is that there is a second source of rules, namely software defined rules (which the OpenStack documentation calls policy-in-code) which are registered when the enforcer object is created. This happens in the _enforcer method of the RBACEnforcer when a new enforcer is created. Here we call register_rules which creates a list of rules by calling the function list_rules defined in the keystone/common/policies module which returns a list of software-defined rules, and registers these rules with the Oslo policy enforcer. The rule we are looking for, for instance, is defined in keystone/common/policies/project.py and looks as follows.

        name=base.IDENTITY % 'list_projects',
        scope_types=['system', 'domain'],
        description='List projects.',
        operations=[{'path': '/v3/projects',
                     'method': 'GET'}],

Here we see that the actual rule (in the attribute check_str) has now changed compared to the Rocky release, and allows access if either the user has the reader role on the system level or has the reader role for the requested domain. In addition, there is a deprecated rule for backwards compatibility which is OR’ed with the actual rule. So the rule that really gets evaluated in our case is

(role:reader and system_scope:all) or (role:reader and domain_id:%(target.domain_id)s) or rule:admin_required
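To get a feeling for what this rule means, here is a hand-written Python version of it. This is a sketch under assumptions and not how the rule engine evaluates the expression: the function name may_list_projects and the layout of the creds and target dictionaries are invented, and rule:admin_required is reduced to a plain check for the admin role.

```python
def may_list_projects(creds, target):
    """Hand-coded approximation of the combined list_projects rule."""
    roles = creds.get("roles", [])
    # (role:reader and system_scope:all)
    system_reader = "reader" in roles and creds.get("system_scope") == "all"
    # (role:reader and domain_id:%(target.domain_id)s)
    domain_reader = (
        "reader" in roles
        and creds.get("domain_id") is not None
        and creds.get("domain_id") == target.get("domain_id")
    )
    # rule:admin_required, simplified here to "has the admin role"
    admin_required = "admin" in roles
    return system_reader or domain_reader or admin_required


# Made-up credentials for three kinds of token
system_reader = {"roles": ["reader"], "system_scope": "all"}
domain_reader = {"roles": ["reader"], "domain_id": "default"}
project_reader = {"roles": ["reader"], "project_id": "demo"}

print(may_list_projects(system_reader, {}))                         # True
print(may_list_projects(domain_reader, {"domain_id": "default"}))  # True
print(may_list_projects(domain_reader, {}))                         # False
print(may_list_projects(project_reader, {}))                        # False
```

Note that a reader role on a single project is not enough to pass this rule, whatever the target looks like.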

In our case, asking OpenStack to list all projects, there is a further piece of magic involved. This becomes visible if you try a different user. For instance, we can create a new project demo with a user demo who has the reader role for this project. If you now run the OpenStack client again to get all projects, you will only see those projects for which the user has a role. This is again a bit confusing, because by what we have discussed above, the authorization should fail.

In fact, it does, but the client is smart enough to have a plan B. If you look at the output of the OpenStack CLI with the -vvv flag, you will see that a first request is made to list all projects which fails, as expected. The client then tries a second request, this time using the URL /users//projects to get all projects for that specific user. This call ends up in the method get of the class UserProjectsResource defined in keystone/api/users.py which will list all projects for which a specific user has a role. Here, a call is made with a different action called identity:list_user_projects, and the rule for this action allows access if the user making the request (i.e. the user data from the token) is equal to the target user (i.e. the user ID specified in the request). Thus this final call succeeds.

These examples are hopefully sufficient to demonstrate that policies can be a tricky topic. It is actually very instructive to add debugging output to the involved classes (the Python source code is on the controller node in /usr/lib/python3/dist-packages, do not forget to restart Apache if you have made changes to the code) to print out the various structures and trace the flow through the code. Happy hacking!

Automating provisioning with Ansible – working with inventories

So far, we have used Ansible inventories more or less as a simple list of nodes. But there is much more you can do with inventories – you can assign hosts to groups, build hierarchies of groups, use dynamic inventories and assign variables. In this post, we will look at some of these options.

Groups in inventories

Recall that our simple inventory file (when using the Vagrant setup demonstrated in an earlier post) looks as follows (say this is saved as hosts.ini in the current working directory)


We have two servers and one group called servers. Of course, we could have more than one group. Suppose, for instance, that we change our file as follows.


When running Ansible, we can now specify either the group web or db, for instance

ansible -u vagrant \
        -i hosts.ini \
        --private-key ~/vagrant/vagrant_key \
        -m ping web

then Ansible will only operate on the hosts in the group web. If you want to run Ansible for all hosts, you can still use the group “all”.

Hosts can be in more than one group. For instance, when we change the file as follows


then the host will be in both groups, db and web. If you run Ansible for the db group, it will operate on both hosts; if you run it for the web group, it will operate only on the host that belongs to both groups. Of course, if you use the pseudo-group all, then Ansible will touch this host only once, even though it appears twice in the configuration file.

It is also possible to define groups as the union of other groups. To create a group that contains all servers, we could for instance do something like


Finally, as we have seen in the last post, we can define variables directly in the inventory file. We have already seen this on the level of individual hosts, but it is also possible on the level of groups. Let us take a look at the following example.


Here we define two variables, a and b. The first variable is defined for all servers in the db group. The second variable is defined for all servers in the servers group. We can print out the values of these variables using the debug Ansible module

ansible -i hosts.ini \
        --private-key ~/vagrant/vagrant_key  \
        -u vagrant -m debug -a "var=a,b" all

which will give us the following output

 | SUCCESS => {
    "a,b": "(5, 10)"
}
 | SUCCESS => {
    "a,b": "(5, 10)"
}

To better understand the structure of an inventory file, it is often useful to create a YAML representation of the file, which is better suited to visualize the hierarchical structure. For that purpose, we can use the ansible-inventory command line tool. The command

ansible-inventory -i hosts.ini --list -y

will create the following YAML representation of our inventory.

              a: 5
              b: 10
              a: 5
              b: 10
    ungrouped: {}

which nicely demonstrates that we have effectively built a tree, with the implicit group all at the top, the group servers as the only descendant and the groups db and web as children of this group. In addition, there is a group ungrouped which lists all hosts which are not explicitly assigned to a group. If you omit the -y flag, you will get a JSON representation which lists the variables in a separate dictionary _meta.

Group and host variables in separate files

Ansible also offers you the option to maintain group and host variables in separate files, which can again be useful if you need to deal with different environments. By convention, Ansible will look for variables defined on group level in a directory group_vars that needs to be a subdirectory of the directory in which the inventory file is located. Thus, if your inventory file is called /home/user/somedir/hosts.ini, you would have to create a directory /home/user/somedir/group_vars. Inside this directory, place a YAML file containing a dictionary with key-value pairs which will then be used as variable definitions and whose name is that of the group. In our case, if we wanted to define a variable c for all hosts in the db group, we would create a file group_vars/db.yaml with the following content

  c: 15

We can again check that this has worked by printing the variables a, b and c.

ansible -i hosts.ini \
        --private-key ~/vagrant/vagrant_key  \
        -u vagrant -m debug -a "var=a,b,c" db

Similarly, you could create a directory host_vars and place a file there to define variables for a specific host – again, the file name should match the host name. It is also possible to merge several files – see the corresponding page in the documentation for all the details.

Dynamic inventories

So far, our inventories have been static, i.e. we prepare them before we run Ansible, and they are unchanged while the playbook is running. This is nice in a classical setup where you have a certain set of machines you need to manage, and make changes only if a new machine is physically added to the data center or removed. In a cloud environment, however, the setup tends to be much more dynamic. To deal with this situation, Ansible offers different approaches to manage dynamic inventories that change at runtime.

The first approach is to use inventory scripts. An inventory script is simply a script (an executable, which can be written in any programming language) that creates an inventory in JSON format, similarly to what ansible-inventory does. When you provide such an executable using the -i switch, Ansible will invoke the inventory script and use the output as inventory.

Inventory scripts are invoked by Ansible in two modes. When Ansible needs a full list of all hosts and groups, it will add the switch --list. When Ansible needs details on a specific host, it will pass the switch --host and, in addition, the hostname as an argument.
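To illustrate the two modes, here is a minimal sketch of an inventory script. The group name demo, the host names and the connection data in HOSTVARS are made up; a real script would obtain this information from some API or tool instead of a hard-coded dictionary.

```python
#!/usr/bin/env python3
import json
import sys

# Hypothetical host data, hard-coded for this sketch
HOSTVARS = {
    "boxA": {"ansible_host": "127.0.0.1", "ansible_port": 2222},
    "boxB": {"ansible_host": "127.0.0.1", "ansible_port": 2200},
}


def build_inventory():
    """Return all groups, plus host variables in the _meta section."""
    return {
        "demo": {"hosts": sorted(HOSTVARS)},
        "_meta": {"hostvars": HOSTVARS},
    }


def main(argv):
    """Answer a --list or --host <name> request with a JSON document."""
    if argv[:1] == ["--list"]:
        result = build_inventory()
    elif argv[:1] == ["--host"] and len(argv) > 1:
        result = HOSTVARS.get(argv[1], {})
    else:
        result = {}
    return json.dumps(result, indent=2)


if __name__ == "__main__":
    print(main(sys.argv[1:]))
```

Saved as an executable file, a script of this kind can be passed to Ansible with the -i switch. Since the --list output already carries the variables of all hosts in _meta, Ansible does not need to call the script again with --host for every single host.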

Let us take a look at an example to see how this works. Ansible comes bundled with a large number of inventory scripts. Let us play with the script for Vagrant. After downloading the script to your working directory and installing the required module paramiko using pip, you can run the script as follows.

python3 vagrant.py --list

This will give you an output similar to the following (I have piped this through jq to improve readability)

  "vagrant": [
  "_meta": {
    "hostvars": {
      "boxA": {
        "ansible_user": "vagrant",
        "ansible_host": "",
        "ansible_ssh_private_key_file": "/home/chr/vagrant/vagrant_key",
        "ansible_port": "2222"
      "boxB": {
        "ansible_user": "vagrant",
        "ansible_host": "",
        "ansible_ssh_private_key_file": "/home/chr/vagrant/vagrant_key",
        "ansible_port": "2200"

We see that the script has created a group vagrant with two hosts, using the names in your Vagrantfile. For each host, it has, in addition, declared some variables, like the private key file, the SSH user and the IP address and port to use for SSH.

To use this dynamically created inventory with Ansible, we first have to make the Python script executable, using chmod 700 vagrant.py. Then, we can simply invoke Ansible pointing to the script with the -i switch.

ansible -i vagrant.py -m ping all

Note that we do not have to use the switches --private-key and -u as this information is already present in the inventory.

It is instructive to look at the (rather short) script. Doing this, you will see that behind the scenes, the script simply invokes vagrant status and vagrant ssh-config. This implies, however, that the script will only detect your running instances properly if you execute it – and thus Ansible – in the directory in which your Vagrantfile is living and in which you issued vagrant up to bring up the machines!

In practice, dynamic inventories are mostly used with public cloud providers. Ansible comes with inventory scripts for virtually every cloud provider you can imagine. As an example, let us try out the script for EC2.

First, there is some setup required. Download the inventory script ec2.py and, placing it in the same directory, the configuration file ec2.ini. Then, make the script executable.

A short look at the script will show you that it uses the Boto library to access the AWS API. So you need Boto installed, and you need to make sure that Boto has access to your AWS credentials, for instance because you have a working AWS CLI configuration (see my previous post on using Python with AWS for more details on this). As explained in the comments in the script, you can also use environment variables to provide your AWS credentials (which are stored in ~/.aws/credentials when you have set up the AWS CLI).

Next, bring up some instances on EC2 and then run the script using

./ec2.py --list

Note that the script uses a cache to avoid having to repeat time-consuming API calls. The expiration time is provided as a parameter in the ec2.ini file. The default is 5 minutes, which can be a bit too long when playing around, so I recommend changing this to e.g. 30 seconds.

Even if you have only one or two machines running, the output that the script produces is significantly more complex than the output of the Vagrant dynamic inventory script. The reason for this is that, instead of just listing the hosts, the EC2 script will group the hosts according to certain criteria (that again can be selected in ec2.ini), for instance availability zone, region, AMI, platform, VPC and so forth. This allows you to target for instance all Linux boxes, all machines running in a specific data center and so forth. If you tag your machines, you will also find that the script groups the hosts by tags. This is very useful and allows you to differentiate between different types of machines (e.g. database servers, web servers), or different stages. The script also attaches certain characteristics as variables to each host.

In addition to inventory scripts, there is a second mechanism to dynamically change an inventory – there is actually an Ansible module which maintains the copy of the inventory that Ansible builds in memory at startup (and thus makes changes that are only valid for this run), the add_host module. This is mostly used when we use Ansible to actually bring up hosts.

To demonstrate this, we will use a slightly different setup than we have used so far. Recall that Ansible can work with any host on which Python is installed and which can be accessed via SSH. Thus, instead of spinning up a virtual machine, we can as well bring up a container and use that to simulate a host. To spin up a container, we can use the Ansible module docker_container that we execute on the control machine, i.e. with the pseudo-group localhost, which is present even if the inventory is empty. After we have created the container, we add the container dynamically to the inventory and can then use it for the remainder of the playbook.

To realize this setup, the first thing which we need is a container image with a running SSH daemon. As base image, we can use the latest version of the official Ubuntu image for Ubuntu bionic. I have created a Dockerfile which, based on the Ubuntu image, installs the OpenSSH server and sudo, creates a user ansible, adds the user to the sudoer group and imports a key pair which is assumed to exist in the directory.

Once this image has been built, we can use the following playbook to bring up a container, dynamically add it to the inventory and run ping on it to test that the setup has worked. Note that this requires that the docker Python module is installed on the control host.

  # This is our first play - it will bring up a new Docker container and register it with
  # the inventory
  - name: Bring up a Docker container that we will use as our host and build a dynamic inventory
    hosts: localhost
    tasks:
    # We first use the docker_container module to start our container. This of course assumes that you
    # have built the image node:latest according to the Dockerfile which is distributed along with
    # this script. We use bridge networking, but do not expose port 22
    - name: Run Docker container
      docker_container:
        auto_remove: yes
        detach: yes
        name: myTestNode
        image: node:latest
        network_mode: bridge
        state: started
      register: dockerData
    # As we have chosen not to export the SSH port, we will have to figure out the IP of the container just created. We can
    # extract this from the return value of the docker run command
    - name: Extract IP address from docker dockerData
      set_fact:
        ipAddress:  "{{ dockerData['ansible_facts']['docker_container']['NetworkSettings']['IPAddress'] }}"
    # Now we add the new host to the inventory, as part of a new group docker_nodes
    # This inventory is then valid for the remainder of the playbook execution
    - name: Add new host to inventory
      add_host:
        hostname: myTestNode
        ansible_ssh_host: "{{ ipAddress }}"
        ansible_ssh_user: ansible
        ansible_ssh_private_key_file: "./ansible"
        groups: docker_nodes
  # Our second play. We now ping our host using the newly created inventory
  - name: Ping host
    hosts: docker_nodes
    become: yes
    tasks:
    - name: Ping  host
      ping:

If you want to run this example, you can download all the required files from my GitHub repository, build the required Docker image, generate the key and run the example as follows.

pip install docker
git clone https://www.github.com/christianb93/ansible-samples
cd ansible-samples/partV
ansible-playbook docker.yaml

This is nice, and you might want to play with this to create additional containers for more advanced test setups. When you do this, however, you will soon realize that it would be very beneficial to be able to execute one task several times, i.e. to use loops. Time to look at control structures in Ansible, which we do in the next post.

Database programming with Python

Most of us probably started to use Python as a scripting language to quickly create working code for e.g. numerical and scientific calculations. But, of course, Python is much more than that. If you intend to use Python for more traditional applications, you will sooner or later need to interface with a database. Today, we will see how the Python DB-API can be used for that purpose.

The Python DB-API

Of course there are many databases that can be used with Python – various modules exist like mysql-connector for MySQL, Psycopg for PostgreSQL or sqlite3 for the embedded database SQLite. Obviously, all these drivers differ slightly, but it turns out that they follow a common design pattern which is known as the Python DB-API and described in PEP-249.

If you know Java, you have probably heard of JDBC, which is a standard to access relational databases from Java applications. The Python DB-API is a bit different – it is not a library, but it is a set of design patterns that drivers are supposed to follow.

When using JDBC, there is, for instance, an interface class called java.sql.Connection. Whatever driver and whatever database you use, this interface will always be the same for compliant drivers. For Python, the situation is a bit different. If you use SQLite, then there will be a class sqlite3.Connection. If you use MySQL, there will be a class with the slightly cryptic name mysql.connector.connection_cext.CMySQLConnection. These are different classes, and they do not inherit from a common superclass or interface. Still, the Python DB-API dictates that these classes all have a common set of methods and attributes to make switching between different databases as easy as possible.

Before we start to look at an example, it is helpful to understand the basic objects of the DB-API class model. First, there is a connection, which, of course, represents a connection to a database. How to initially obtain a connection is specific for the database used, and we will see some examples further below.

When a connection is obtained, it is active or open. It can be closed by executing its close() method, which will render the connection unusable and roll back any uncommitted changes. A connection also has the methods commit() and rollback() to support transaction control (however, the specification leaves the implementation some freedom with respect to features like auto-commit and isolation levels, and does not even mandate that implementations support transactions at all). The specification is also not fully clear on how to start transactions. It seems that most drivers automatically start a new transaction if a statement is executed and no transaction is in progress.
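To make the role of commit() and rollback() a bit more tangible, here is a minimal sketch that uses the sqlite3 module from the standard library with an in-memory database. The table accounts and its content are made up for this illustration. With the default settings of this particular driver, the INSERT implicitly opens a transaction, so the first row is discarded by the rollback while the second one survives the commit. Other drivers may behave differently.

```python
import sqlite3

# An in-memory database that disappears when the connection is closed
conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE accounts (owner text, balance integer)")
conn.commit()


def count_rows():
    """Return the number of rows currently visible in the table"""
    cursor.execute("SELECT COUNT(*) FROM accounts")
    return cursor.fetchone()[0]


# This INSERT implicitly starts a transaction which we then roll back
cursor.execute("INSERT INTO accounts VALUES ('alice', 100)")
conn.rollback()
rows_after_rollback = count_rows()

# Same statement again, but this time we commit
cursor.execute("INSERT INTO accounts VALUES ('alice', 100)")
conn.commit()
rows_after_commit = count_rows()

print(rows_after_rollback, rows_after_commit)
conn.close()
```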

To actually execute statements and fetch results, a cursor is used. Again, it is implementation specific whether this is implemented as a real, server-side cursor or as a local cache within the client. Cursors are created by calling the connection’s cursor method, and are only valid within the context of that connection (i.e. if a connection is closed, all cursors obtained from it will become unusable). Once we have a cursor, we can execute statements on it using either its execute method (which executes a statement with one set of parameters) or its executemany method, which executes a statement several times with different sets of parameters. How exactly parameters are referenced in a statement is implementation specific, and we will see some examples below.

The statement executed via a cursor can be an update or a select statement. In the latter case, we can access its results either via the fetchall() method which returns all rows of the result set as a sequence, or the fetchone() method which returns the next row. The API also defines a method fetchmany() which returns a specified number of rows at a time.
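The following sketch shows the three fetch methods side by side. It again assumes the sqlite3 driver and an in-memory database, and the table numbers exists only for this illustration. Note that all three methods consume rows from the same result set, so each call continues where the previous one stopped.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE numbers (n integer)")
# sqlite3 uses question marks as placeholders for parameters
cursor.executemany("INSERT INTO numbers VALUES (?)", [(i,) for i in range(6)])

cursor.execute("SELECT n FROM numbers ORDER BY n")
first = cursor.fetchone()       # the next row as a tuple
batch = cursor.fetchmany(2)     # up to two further rows
rest = cursor.fetchall()        # all remaining rows
exhausted = cursor.fetchone()   # None once the result set is used up

print(first, batch, rest, exhausted)
conn.close()
```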

Using Python with SQLite

After this general introduction, let us now see how the API works in practice. For the sake of simplicity, we will first use the embedded database SQLite.

One of the nice things about SQLite3 is that the module that you will need to use it in a Python program, sqlite3, is part of the Python standard library and therefore part of any Python standard installation. So there is no separate installation needed to use it, and we can start to play with it right away.

To use a database, the first thing we have to do is to establish a connection to it. As mentioned above, the process to do this is specific for the SQLite database. As SQLite identifies a database by the file where the data is stored, we basically only have to provide the file name to the driver to either establish a connection to an existing database or to create a new database (which will happen automatically if the file we refer to does not exist). Here is a code snippet that will establish a connection to a database stored in your home directory in the file example.db.

import sqlite3 as dblib
from pathlib import Path

home = str(Path.home())
db = home + "/example.db"
c = dblib.connect(db)

Now let us create a table. This sounds simple enough, just execute a CREATE TABLE statement. However, this will raise an error if the table already exists. Of course, we could use the IF NOT EXISTS clause to avoid this, but for the sake of demonstration let us choose a different approach.

We first try to drop the table. If the table does not exist, this will raise an exception which we can catch, if the table exists it will be removed. In any case, we can now proceed to create the table. To execute these statements, we need a cursor as explained above which we can get from the newly created connection. So our code looks as follows (of course, in productive code, you would put more effort into making sure that the reason for the exception is what you expect):

cursor = c.cursor()
try:
    cursor.execute('''DROP TABLE books''')
except dblib.Error as err:
    # Most likely the table does not exist yet, so we simply go on
    pass
cursor.execute('''CREATE TABLE books
             (author text, title text)''')

Next, we want to insert data into our database. Suppose you wanted to insert a book called “Moby Dick” written by Herman Melville. So we want to execute an SQL statement like

INSERT INTO books
  (author, title)
VALUES
  ('Herman Melville', 'Moby Dick')

Of course, we could simply assemble this SQL statement in Python and pass it to the execute method of our cursor directly. But this is not the recommended way of doing things. Instead, one generally uses SQL host variables. These are variable parts of a statement which are declared inside the statement and, at runtime, are bound to Python variables. This is generally advisable, for two reasons. First, using the naive approach of manually assembling SQL statements and parameter values makes your program more vulnerable to SQL injection. Second, using host variables allows the driver to prepare the statement only once (i.e. to parse and tokenize the statement and prepare an execution plan) even if you insert multiple rows, thus increasing performance.

To use host variables with SQLite, you would execute a statement in which the values you want to insert are replaced by placeholders, using question marks.

INSERT INTO books
  (author, title)
VALUES
  (?, ?)

When you call the execute method of a cursor, you pass this SQL string along with the values for the placeholders, and the database driver will then replace the placeholders by their actual values, a process called binding.
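Here is a short sketch of binding in action, again assuming the sqlite3 driver and an in-memory database. The author and title are arbitrary sample values, chosen because the author name contains a quote that would break a statement assembled by naive string concatenation. With host variables, the driver takes care of passing the value correctly.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE books (author text, title text)")

# A value containing a quote, which would break a naively assembled statement
author = "Flann O'Brien"
title = "The Third Policeman"

# The driver binds the tuple elements to the two placeholders in order
cursor.execute("INSERT INTO books (author, title) VALUES (?, ?)",
               (author, title))
conn.commit()

# Parameters are always passed as a sequence, even if there is only one
cursor.execute("SELECT title FROM books WHERE author = ?", (author,))
stored = cursor.fetchone()
print(stored)
conn.close()
```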

However, when coding this, there is a subtlety we need to observe. The special characters used for the placeholders are not specified by the DB-API Python standard and might therefore be driver specific. Fortunately, even though the standard does not define the placeholders, it defines a way to obtain them that is the same for all drivers.

Specifically, the standard specifies several placeholder styles, and each driver exposes the style it uses in a module-level variable called paramstyle. In our example, we use only two of these styles, called pyformat and qmark (question mark). Pyformat, the style reported by e.g. the MySQL connector, uses Python-like placeholders like %s, whereas qmark uses – well, you might have guessed that – question marks, as SQLite does. So to keep our code reusable, we first retrieve the placeholder style, translate this into the actual placeholder (a dictionary comes in handy here) and then assemble our statement.

knownMarkers = {'pyformat' : '%s', 'qmark' : '?'}
marker =  knownMarkers[dblib.paramstyle]

examples = [('Dickens', 'David Copperfield'),
            ('Melville', 'Moby Dick')]

sqlString =  '''INSERT INTO books
                       (author, title)
                 VALUES ''' + "(" + marker + "," + marker + ")"
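cursor.executemany(sqlString, examples)
# Commit explicitly, closing the connection would otherwise discard the inserts
c.commit()
c.close()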

Note that we explicitly commit at the end, as closing the connection would otherwise roll back our changes. We also use the executemany method which performs several insertions, binding host variables from a sequence of value tuples.

Reading from our database is now very simple. Again, we first need to get a connection and a cursor. Once we have that, we assemble a SELECT statement and execute it. We then use the fetchone or fetchall method to retrieve the result set and can iterate through the result set as usual.

# Get all rows from table books
statement = "SELECT author, title from books;"
cursor.execute(statement)
rows = cursor.fetchall()

# Iterate through result set
for row in rows:
    print ("Author: " + row[0])
    print ("Title:  " + row[1])

As we can see, the order of the columns in the individual tuples within the result set is as in our SELECT statement. However, I have not found an explicit guarantee for this behaviour in the specification. If we want to make sure that we get the columns right, we can use the cursor.description attribute which the standard mandates and which contains a sequence of tuples, wherein each tuple represents a column in the result and contains a set of attributes like name and type.
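As a minimal sketch of this approach (again assuming sqlite3 and an in-memory database with a single sample row), we can read the column names from cursor.description and turn every row into a dictionary, so that the remaining code no longer depends on the column order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cursor = conn.cursor()
cursor.execute("CREATE TABLE books (author text, title text)")
cursor.execute("INSERT INTO books VALUES (?, ?)", ("Melville", "Moby Dick"))

cursor.execute("SELECT title, author FROM books")
# The first entry of each description tuple is the column name
columns = [entry[0] for entry in cursor.description]
records = [dict(zip(columns, row)) for row in cursor.fetchall()]

print(columns)
print(records)
conn.close()
```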

We have now seen how we can use the DB-API to create database tables, to insert rows and to retrieve results. I have assembled the code that we have discussed in two scripts, one to prepare a database and one to read from it, which you can find here.

Using Python with MySQL

To illustrate the usage of the DB-API for different databases, let us now try to do the same thing with MySQL. Of course, there is some more setup involved in this case. First, the driver that we will use – the mysql-connector – is not part of the standard Python library and needs to be installed separately. Of course, this is done using

pip3 install mysql-connector-python

We will also need the MySQL command line client mysql and the admin client mysqladmin. On an Ubuntu system, you can install them using

sudo apt-get install  mysql-client

As we intend to run the MySQL server in a docker container, you also need a Docker engine installed on your system. If you do not have that yet, the easiest way might be to install it using snap.

snap install docker

Next, we need to bring up an instance of the MySQL database using docker run, give it some time to come up, create a database called books and grant access rights to a new user that we will use to access our database from within Python. Here is a short script that will perform all these steps and that you can also find here.

# Start docker container
docker run -d --name some-mysql \
           --rm \
           -p 3306:3306 \
           -e MYSQL_ROOT_PASSWORD=my-secret-root-pw \
           -e MYSQL_USER=chr \
           -e MYSQL_PASSWORD=my-secret-pw \

# Give the database some time to come up
mysqladmin --user=root --password=my-secret-root-pw \
           --host=''  \
            --silent status

while [ $? == 1  ]
do
  sleep 5
  mysqladmin --user=root --password=my-secret-root-pw \
             --host=''  \
             --silent status
done

# Create database books and grant rights to user chr
mysqladmin --user=root \
           --password=my-secret-root-pw \
            --host=''  create books
echo 'grant all on books.* to 'chr';' \
              | mysql --user=root \
                      --password=my-secret-root-pw \
                      --host='' books

BE CAREFUL: if you have read my post on Docker networking, you will know that using the -p switch implies that we can reach the database from every machine in the local network – so if you are not in a fully trusted environment, you definitely want to change this script and the program that will follow to use real passwords, not the ones used in this post.

Let us now see how we can access our newly created database from Python. Thanks to the DB-API, most of the code that we will have to use is actually the same as in the case of SQLite. Basically, there are two differences. First, the module that we have to import is of course different, but we can again use the alias dblib to be able to use the remainder of the code without changes.

import mysql.connector as dblib

The second change that we need to make is the way we obtain the initial database connection, as connecting to a MySQL database requires additional parameters like credentials. Here is the respective statement.

c = dblib.connect(user='chr', password='my-secret-pw',

All the remainder of the code can now be taken over from the SQLite case unchanged. On my GitHub page, I have created a separate directory holding the code, and if you compare this code to the code that we used earlier for SQLite, you will see that it is in fact only those two parts of the code that are different.

Of course, there is much more that could be said about database programming with Python. In reality, each database behaves differently, and you will have to deal with specifics like auto-commit, generated fields, different types, performance, result set sizes and so forth. However, the basics are the same thanks to the DB-API, and I hope that I could give you a good starting point for further investigations.

Managing traffic with Kubernetes ingress controllers

In one of the previous posts, we have learned how to expose arbitrary ports to the outside world using services and load balancers. However, we also found that this is not very efficient – in the worst case, the number of load balancers we need equals the number of services.

Specifically for HTTP/HTTPS traffic, there is a different option available – an ingress rule.

Ingress rules and ingress controllers

An ingress rule is a Kubernetes resource that defines a kind of routing on the HTTP(S) path level. To illustrate this, assume that you have two services running, call them svcA and svcB. Assume further that both services work with HTTP as the underlying protocol and are listening on port 80. If you expose these services naively as in the previous post, you will need two load balancers, call them LB1 and LB2. Then, to reach svcA, you would use the URL


and to access svcB, you would use


The idea of an ingress is to have only one load balancer, with one DNS entry, and to use the path name to route traffic to our services. So with an ingress, we would be able to reach svcA under the URL


and the second service under the URL


With this approach, you have several advantages. First, you only need one load balancer that clients can use for all services. Second, the path name is related to the service that you invoke, which follows best practises and makes coding against your services much easier. Finally, you can easily add new services and remove old services without a need to change DNS names.

In Kubernetes, two components are required to make this work. First, there are ingress rules. These are Kubernetes resources that contain rules which specify how to route incoming requests based on their path (or even hostname). Second, there are ingress controllers. These are controllers which are not part of the Kubernetes core software, but need to be installed on top. These controllers will work with the ingress rules to manage the actual routing. So before trying this out, we need to install an ingress controller in our cluster.

Installing an ingress controller

On EKS, there are several options for an ingress controller. One possible choice is the nginx ingress controller. This is the controller that we will use for our examples. In addition, AWS has created their own controller called AWS ALB controller that you could use as an alternative – I have not yet looked at this in detail, though.

So let us see how we can install the nginx ingress controller in our cluster. Fortunately, there is a very clear installation instruction which tells us that to run the install, we simply have to execute a number of manifest files, as you would expect for a Kubernetes application. If you have cloned my repository and brought up your cluster using the up.sh script, you are done – this script will set up the controller automatically. If not, here are the commands to do this manually.

$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/mandatory.yaml
$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/provider/aws/service-l4.yaml
$ kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/master/deploy/provider/aws/patch-configmap-l4.yaml

Let us go through this and see what is going on. The first file (mandatory.yaml) will set up several config maps, a service account and a new role and connect the service account with the newly created role. It then starts a deployment to bring up one instance of the nginx controller itself. You can see this pod running if you do a

kubectl get pods --namespace ingress-nginx

The first AWS specific manifest file service-l4.yaml will establish a Kubernetes service of type LoadBalancer. This will create an elastic load balancer in your AWS VPC. Traffic on the ports 80 and 443 is directed to the nginx controller.

kubectl get svc --namespace ingress-nginx

Finally, the second AWS specific file will update the config map that stores the nginx configuration and set the parameter use-proxy-protocol to True.

To verify that the installation has worked, you can use aws elb describe-load-balancers to verify that a new load balancer has been created and curl the DNS name provided by

kubectl get svc ingress-nginx --namespace ingress-nginx

from your local machine. This will still give you an error as we have not yet defined an ingress rule, but it shows that the ingress controller is up and running.

Setting up and testing an ingress rule

Having our ingress controller in place, we can now set up ingress rules. To have a toy example at hand, let us first apply a manifest file that will

  • Add a deployment of two instances of the Apache httpd
  • Install a service httpd-service accepting traffic for these two pods on port 8080
  • Add a deployment of two instances of Tomcat
  • Create a service tomcat-service listening on port 8080 and directing traffic to these two pods

You can either download the file here or directly use the URL with kubectl

$ kubectl apply -f https://raw.githubusercontent.com/christianb93/Kubernetes/master/network/ingress-prep.yaml
deployment.apps/httpd created
deployment.apps/tomcat created
service/httpd-service created
service/tomcat-service created

When all pods are up and running, you can again spawn a shell in one of the pods and use the DNS name entries created by Kubernetes to verify that the services are available from within the pods.

$ pod=$(kubectl get pods --output \
$ kubectl exec -it $pod "/bin/bash"
bash-4.4# apk add curl
bash-4.4# curl tomcat-service:8080
bash-4.4# curl httpd-service:8080

Let us now define an ingress rule which directs requests to the path /httpd to our httpd service and correspondingly requests to /tomcat to the Tomcat service. Here is the manifest file for this rule.

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: test-ingress
  annotations:
    nginx.ingress.kubernetes.io/rewrite-target : "/"
spec:
  rules:
  - http:
      paths:
      - path: /tomcat
        backend:
          serviceName: tomcat-service
          servicePort: 8080
      - path: /httpd
        backend:
          serviceName: httpd-service
          servicePort: 8080

The first few lines are familiar by now, specifying the API version, the type of resource that we want to create and a name. In the metadata section, we also add an annotation. The nginx ingress controller can be configured using this and similar annotations (see here for a list), and this annotation is required to make the example work.

In the specification section, we now define a set of rules. Our rule is an HTTP rule, which, at the time of writing, is the only supported protocol. This is followed by a list of paths. Each path consists of a selector (“/httpd” and “/tomcat” in our case), followed by the specification of a backend, i.e. a combination of service name and service port, serving requests matching this path.
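To illustrate the idea of mapping path selectors to backends, here is a deliberately simplified toy model in Python. This is not how the nginx ingress controller is implemented, and it does not model the rewrite annotation. The function route and the simple prefix matching are assumptions made for this sketch only.

```python
# Toy model of path based routing. This only mirrors the structure of
# the rule and is NOT the matching logic of the nginx ingress controller
rules = [
    {"path": "/tomcat", "serviceName": "tomcat-service", "servicePort": 8080},
    {"path": "/httpd", "serviceName": "httpd-service", "servicePort": 8080},
]


def route(request_path):
    """Return (service name, service port) or None if no path matches"""
    for rule in rules:
        prefix = rule["path"]
        if request_path == prefix or request_path.startswith(prefix + "/"):
            return (rule["serviceName"], rule["servicePort"])
    return None


print(route("/tomcat"))
print(route("/httpd/index.html"))
print(route("/unknown"))
```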

Next set up the ingress rule. Assuming that you have saved the manifest file above as ingress.yaml, simply run

$ kubectl apply -f ingress.yaml
ingress.extensions/test-ingress created

Now let us try this. We already know that the ingress controller has created a load balancer for us which serves all ingress rules. So let us get the name of this load balancer from the service specification and then use curl to try out both paths

$ host=$(kubectl get svc ingress-nginx -n ingress-nginx --output\
$ curl -k https://$host/httpd
$ curl -k https://$host/tomcat

The first curl should give you the standard output of the httpd service, the second one the standard Tomcat welcome page. So our ingress rule works.

Let us try to understand what is happening. The load balancer is listening on the HTTPS port 443 and picking up the traffic coming from there. This traffic is then redirected to a node port that you can read off from the output of aws elb describe-load-balancers (in my case, this was 31135). This node port belongs to the service ingress-nginx that our ingress controller has set up. So the traffic is forwarded to the ingress controller. The ingress controller interprets the rules, determines the target service and forwards the traffic to the target service. Ignoring the fact that the traffic goes through the node port, this gives us the following simplified picture.


In fact, this diagram is a bit simplified as (see here) the controller does not actually send the traffic to the service cluster IP, but directly to the endpoints, thus bypassing the kube-proxy mechanism, so that advanced features like session affinity can be applied.

Ingress rules have many additional options that we have not yet discussed. You can define virtual hosts, i.e. routing based on host names, define a default backend for requests that do not match any of the path selectors, use regular expressions in your paths and use TLS secrets to secure your HTTPS entry points. This is described in the Kubernetes networking documentation and the API reference.

Creating ingress rules in Python

To close this post, let us again study how to implement Ingress rules in Python. Again, it is easiest to build up the objects from the bottom to the top. So we start with our backends and the corresponding path objects.


Having this, we can now define our rule and our specification section.

            paths=[tomcat_path, httpd_path]))

Finally, we assemble our ingress object. Again, this consists of the metadata (including the annotation) and the specification section.

metadata.annotations={"nginx.ingress.kubernetes.io/rewrite-target" : "/"}

We are now ready for our final steps. We again read the configuration, create an API endpoint and submit our creation request. You can find the full script including comments and all imports here


Quantum teleportation

Quantum states are in many ways different from information stored in classical systems – quantum states cannot be cloned and quantum information cannot be erased. However, it turns out that quantum information can be transmitted and replicated by combining a quantum channel and a classical channel – a process known as quantum teleportation.

Bell states

Before we explain the teleportation algorithm, let us quickly recall some definitions. Suppose that we are given a system with two qubits, say A and B. Consider the unitary operator

U = C_{NOT} (H \otimes I)

i.e. the operator that applies a Hadamard operator to qubit A and then a CNOT gate with control qubit A and target qubit B. Let us calculate the action of this operator on the basis  state |0 \rangle |0 \rangle.

U |0 \rangle |0 \rangle = \frac{1}{\sqrt{2}} C_{NOT} (|0 \rangle |0 \rangle + |1 \rangle |0 \rangle) = \frac{1}{\sqrt{2}} (|00 \rangle + |11 \rangle)

It is not difficult to show that this state cannot be written as a product – it is an entangled state. Traditionally, this state is called a Bell state.

Now the operator U is unitary, and it therefore maps a Hilbert space basis to a Hilbert space basis. We can therefore obtain a basis of our two-qubit Hilbert space that consists of entangled states by applying the transformation U to the elements of the computational basis. The resulting states are easily calculated and are as follows (we use the notation from [1]).

Computational basis vector    Bell basis vector
|00 \rangle                   |\beta_{00} \rangle = \frac{1}{\sqrt{2}} (|00 \rangle + |11 \rangle)
|01 \rangle                   |\beta_{01} \rangle = \frac{1}{\sqrt{2}} (|01 \rangle + |10 \rangle)
|10 \rangle                   |\beta_{10} \rangle = \frac{1}{\sqrt{2}} (|00 \rangle - |11 \rangle)
|11 \rangle                   |\beta_{11} \rangle = \frac{1}{\sqrt{2}} (|01 \rangle - |10 \rangle)
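These states can be checked numerically. The following sketch uses only the Python standard library and hand-written 4x4 matrices for H \otimes I and the CNOT gate, with qubit A as the leftmost factor. The helper names apply and U are chosen for this illustration. It applies U to each element of the computational basis and prints the resulting coefficients.

```python
from math import sqrt

s = 1 / sqrt(2)

# Matrices in the computational basis |00>, |01>, |10>, |11>,
# where the first (leftmost) qubit is A and the second is B
H_I = [[s, 0, s, 0],
       [0, s, 0, s],
       [s, 0, -s, 0],
       [0, s, 0, -s]]
CNOT = [[1, 0, 0, 0],
        [0, 1, 0, 0],
        [0, 0, 0, 1],
        [0, 0, 1, 0]]


def apply(matrix, vector):
    """Multiply a 4x4 matrix with a vector of length 4"""
    return [sum(matrix[i][j] * vector[j] for j in range(4)) for i in range(4)]


def U(vector):
    """Apply the Hadamard gate on A first, then the CNOT gate"""
    return apply(CNOT, apply(H_I, vector))


bell = {}
for index, label in enumerate(["00", "01", "10", "11"]):
    basis_vector = [1 if k == index else 0 for k in range(4)]
    bell[label] = U(basis_vector)
    print(label, [round(x, 3) for x in bell[label]])
```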

Of course we can also reverse this process and express the elements of the computational basis in terms of the Bell basis. For later reference, we note that we can write the computational basis as

|00\rangle = \frac{1}{\sqrt{2}} (|\beta_{00}\rangle + |\beta_{10}\rangle)
|01\rangle = \frac{1}{\sqrt{2}} (|\beta_{01}\rangle + |\beta_{11}\rangle)
|10\rangle = \frac{1}{\sqrt{2}} (|\beta_{01}\rangle - |\beta_{11}\rangle)
|11\rangle = \frac{1}{\sqrt{2}} (|\beta_{00}\rangle - |\beta_{10}\rangle)

Quantum teleportation

Let us now turn to the real topic of this post – quantum teleportation. Suppose that Alice is in possession of a qubit that captures some quantum state |\psi \rangle = a |0 \rangle + b|1 \rangle, stored in some sort of quantum device, which could be a superconducting qubit, a trapped ion, a polarized photon or any other physical implementation of a qubit. Bob is operating a similar device. The task is to create a quantum state in Bob’s device which is identical to the state stored in Alice’s device.

To be able to do this, Alice and Bob will need some communication channel. Let us suppose that there is a classical channel that Alice can use to transmit a finite number of bits to Bob. Is that sufficient to transmit the state of the qubit?

Obviously, the answer is no. Alice will not be able to measure both coefficients a and b, and even if she found a way to do this, she would be faced with the challenge of transmitting two complex numbers with arbitrary precision using a finite string of bits.

At the other extreme, if Alice and Bob were able to fully entangle their quantum devices, they would probably be able to transmit the state. They could for instance implement a swap gate to move the state from Alice’s device onto Bob’s device.

So if Alice and Bob are able to perform arbitrary entangled quantum operations on the combined system consisting of Alice’s qubit and Bob’s qubit, a transmission is possible. But what happens if the devices are separated? It turns out that a transmission is still possible based on the classical channel, provided that Alice and Bob had a chance to prepare a common state before Alice prepares |\psi \rangle. In fact, this common state does not depend at all on |\psi \rangle, and at no point during the process is any knowledge of the state |\psi \rangle needed.

The mechanism first described in [2] works as follows. We need three qubits that we denote by A, B and C. The qubit C contains the unknown state |\psi \rangle_C (we use the lower index to denote the qubit in which the state lives). We also assume that Alice is able to perform arbitrary measurements and operations on A and C, and that Bob similarly controls qubit B.

The first step in the algorithm is to prepare an entangled state

|\beta_{00}^{AB} \rangle = \frac{1}{\sqrt{2}} \big[ |0\rangle_A |0 \rangle_B + |1\rangle_A |1 \rangle_B \big]

Here the upper index at \beta indicates the two qubits in which the Bell state vector lives. This is the common state mentioned above, and it can obviously be prepared upfront, without having the state |\psi \rangle_C. Bob and Alice could even prepare an entire repository of such states, ready for being used when the actual transmission should take place.

Now let us look at the state of the combined system consisting of all three qubits.

|\beta_{00}^{AB} \rangle |\psi\rangle_C = \frac{1}{\sqrt{2}} \big[ a |000 \rangle + a |110 \rangle + b |001\rangle + b |111 \rangle \big]

For a reason that will become clear in a second, we will now adapt the tensor product order and choose the new order A-C-B. Then our state is (simply swap the last two qubits)

\frac{1}{\sqrt{2}} \big[ a |000 \rangle + a |101 \rangle + b |010\rangle + b |111 \rangle \big]

Now instead of using the computational basis for our three-qubit system, we could as well use the basis that is given by the Bell basis of the qubits A and C and the computational basis of B, i.e. |\beta^{AC}_{ij} \rangle |k\rangle. Using the table above, we can express our state in this basis. This will give us four terms that contain a and four terms that contain b. The terms that contain a are

\frac{a}{2} \big[ |\beta^{AC}_{00}\rangle |0 \rangle + |\beta^{AC}_{10}\rangle |0 \rangle + |\beta^{AC}_{01}\rangle |1 \rangle - |\beta^{AC}_{11} \rangle |1 \rangle \big]

and the terms that contain b are

\frac{b}{2} \big[ |\beta^{AC}_{01}\rangle |0 \rangle + |\beta^{AC}_{11}\rangle |0 \rangle + |\beta^{AC}_{00}\rangle |1 \rangle - |\beta^{AC}_{10} \rangle |1 \rangle \big]

So far we have not done anything to our state, we have simply re-written the state as an expansion into a different basis. Now Alice conducts a measurement – but she measures A and C in the Bell basis. This will project the state onto one of the basis vectors |\beta_{ij}^{AC}\rangle. Thus there are four different possible outcomes of this measurement, which we can read off from the expansion in terms of the Bell basis above.

Measurement outcome    State of qubit B
\beta_{00}             a|0\rangle + b|1\rangle = |\psi \rangle
\beta_{01}             a|1\rangle + b|0\rangle = X|\psi \rangle
\beta_{10}             a|0\rangle - b|1\rangle = Z|\psi \rangle
\beta_{11}             b|0\rangle - a|1\rangle = -XZ|\psi \rangle

Now Alice sends the outcome of the measurement to Bob using the classical channel. The table above shows that the state in Bob’s qubit B is now the result of a unitary transformation which depends only on the measurement outcome. Thus if Bob receives the measurement outcomes (two bits), he can do a lookup on the table above and apply the inverse transformation on his qubit. If, for instance, he receives the measurement outcome 01, he can apply a Pauli X gate to his qubit and then knows that the state in this qubit will be identical to the original state |\psi \rangle. The teleportation process is complete.
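The argument can be verified with a few lines of plain Python. The sketch below is a hand-rolled calculation, not a full simulator. It assumes an arbitrary example state with a = 0.6 and b = 0.8i, writes down the three-qubit state in the order A-C-B, projects qubits A and C onto each of the four Bell basis vectors and applies the matching correction to what is left in qubit B. The helper names project, X and Z are chosen for this illustration. The overlap with the original state is computed as an absolute value, so that a global phase like the minus sign in the last row of the table does not matter.

```python
from math import sqrt

s = 1 / sqrt(2)

# An arbitrary normalized example state a|0> + b|1>
a, b = 0.6, 0.8j

# Three-qubit state in the order A-C-B, index = 4*A + 2*C + B
state = [0] * 8
state[0b000] = a * s
state[0b101] = a * s
state[0b010] = b * s
state[0b111] = b * s

# Bell basis of qubits A and C (real coefficients), index = 2*A + C
bell = {"00": [s, 0, 0, s], "01": [0, s, s, 0],
        "10": [s, 0, 0, -s], "11": [0, s, -s, 0]}


def project(beta):
    """Project A and C onto beta, return norm and normalized state of B"""
    phi = [sum(beta[ac] * state[2 * ac + k] for ac in range(4))
           for k in range(2)]
    norm = sqrt(sum(abs(x) ** 2 for x in phi))
    return norm, [x / norm for x in phi]


def X(v):
    return [v[1], v[0]]


def Z(v):
    return [v[0], -v[1]]


# Bob's corrections, the inverse of the operator listed in the table
corrections = {"00": lambda v: v, "01": X, "10": Z, "11": lambda v: Z(X(v))}

fidelities = {}
probabilities = {}
for outcome, beta in bell.items():
    norm, bob = project(beta)
    fixed = corrections[outcome](bob)
    # Overlap with the original state, insensitive to a global phase
    overlap = a.conjugate() * fixed[0] + b.conjugate() * fixed[1]
    fidelities[outcome] = abs(overlap)
    probabilities[outcome] = norm ** 2
    print(outcome, "probability", round(norm ** 2, 3),
          "fidelity", round(abs(overlap), 3))
```

Each of the four outcomes occurs with probability 1/4, independent of a and b, which reflects the fact that the measurement itself reveals nothing about the state being teleported.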

Implementing teleportation as a circuit

Let us now build a circuit that models the teleportation procedure. The first part that we need is a circuit that creates a Bell state (or, more generally, transforms the elements of the computational basis into the Bell basis). This is achieved by the circuit below (which again uses the Qiskit convention that the most significant qubit q1 is the leftmost one in the tensor product).


It is obvious how this circuit can be reversed – as both individual gates used are their own inverse, we simply need to reverse their order to obtain a circuit that maps the Bell basis to the computational basis.

Let us now see what the overall teleportation circuit looks like. Here is the circuit (drawn with Qiskit).


Let us see how this relates to the above description. First, the role of the qubits is, from the top to the bottom

C = q[2]
A = q[1]
B = q[0]

The first two gates (the Hadamard and the CNOT gate) act on qubits A and B and create the Bell basis vector \beta_{00} from the initial state. This state is the shared entangled state that Alice and Bob prepare upfront.

The next section of the circuit operates on the qubits A and C and realizes the measurement in the Bell basis. We first use a combination of another CNOT gate and another Hadamard gate to map the Bell basis to the computational basis and then perform a measurement – these steps are equivalent to doing a measurement in the Bell basis directly.

Finally, we have a controlled X gate and a controlled Z gate. As described above, these gates apply the conditional corrections to Bob’s state in qubit B that depend on the outcome of the measurement. As a result, the state of qubit B will be the same as the initial state of qubit C.
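The gate sequence just described can also be traced by hand with a small state vector calculation. The following sketch does not use Qiskit. It is a plain Python model in which the helper functions apply_1q and cnot, as well as the example values a = 0.6 and b = 0.8i, are assumptions made for this illustration. Instead of sampling a measurement, it goes through all four possible outcomes for the qubits C and A, applies the conditional X and Z gates and records the resulting state of qubit B.

```python
from math import sqrt

s = 1 / sqrt(2)
C, A, B = 2, 1, 0   # qubit numbers as in the circuit, q[2] most significant


def apply_1q(state, gate, q):
    """Apply a 2x2 gate to qubit q of an 8-dimensional state vector"""
    new = [0] * 8
    for i in range(8):
        bit = (i >> q) & 1
        for out in range(2):
            j = (i & ~(1 << q)) | (out << q)
            new[j] += gate[out][bit] * state[i]
    return new


def cnot(state, control, target):
    """Flip the target bit of every basis state whose control bit is set"""
    new = [0] * 8
    for i in range(8):
        j = i ^ (1 << target) if (i >> control) & 1 else i
        new[j] = state[i]
    return new


H = [[s, s], [s, -s]]
X = [[0, 1], [1, 0]]
Z = [[1, 0], [0, -1]]

a, b = 0.6, 0.8j          # example state a|0> + b|1> on qubit C
state = [0] * 8
state[0] = a
state[1 << C] = b

state = apply_1q(state, H, A)      # prepare the Bell state on A and B
state = cnot(state, A, B)
state = cnot(state, C, A)          # map the Bell basis of C and A to the
state = apply_1q(state, H, C)      # computational basis

results = {}
for m_c in range(2):
    for m_a in range(2):
        # Keep only the amplitudes compatible with the measurement outcome
        post = [amp if ((i >> C) & 1, (i >> A) & 1) == (m_c, m_a) else 0
                for i, amp in enumerate(state)]
        norm = sqrt(sum(abs(x) ** 2 for x in post))
        post = [x / norm for x in post]
        if m_a:
            post = apply_1q(post, X, B)   # controlled X, triggered by A
        if m_c:
            post = apply_1q(post, Z, B)   # controlled Z, triggered by C
        base = (m_c << C) | (m_a << A)
        results[(m_c, m_a)] = (post[base], post[base | 1])
        print((m_c, m_a), results[(m_c, m_a)])
```

For every outcome, the two amplitudes found in qubit B agree with a and b, in line with the claim that B ends up in the initial state of C.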

How can we test this circuit? One approach could be to add a pre-processing part and a post-processing part to our teleportation circuit. The pre-processing part acts with one or more gates on qubit C to put this qubit into a defined state. We then apply the teleportation circuit. Finally, we apply a post-processing circuit, namely the inverse sequence of gates, to the “target” of the teleportation, i.e. to qubit B. If the teleportation works, the pre- and post-processing steps act on the same logical state and therefore cancel each other. The final state of qubit B will then be the state in which qubit C started before the pre-processing, i.e. |0 \rangle, so a measurement of this qubit will always yield zero. Thus our final test circuit looks as follows.


When we run this on a simulator, the result will be as expected – the value in the classical register c2 after the measurement will always be zero. I have not been able to test this on real hardware as the IBM device does not (yet?) support classical conditional gates, but the result is predictable – we would probably get the same output with some noise. If you want to play with the circuits yourself, you can, as always, find the code in my GitHub repository.
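
For readers who want to reproduce this test without Qiskit, here is a minimal state vector sketch in plain Python. It is not the code from the repository. It assumes that the Bell pair is created with a Hadamard on A followed by a CNOT from A to B, that the X correction is conditioned on the measurement of A and the Z correction on the measurement of C, and it uses a rotation around the y-axis as an arbitrary choice for the pre-processing gate. All function names are made up for this example.

```python
import math
import random

S = 1 / math.sqrt(2)
H = [[S, S], [S, -S]]
X = [[0, 1], [1, 0]]
Z = [[1, 0], [0, -1]]

C, A, B = 2, 1, 0  # qubit roles as in the text: C = q[2], A = q[1], B = q[0]


def apply_1q(state, gate, q):
    """Apply a 2x2 gate to qubit q (bit q of the basis state index)."""
    new = list(state)
    for i in range(len(state)):
        if not (i >> q) & 1:
            j = i | (1 << q)
            new[i] = gate[0][0] * state[i] + gate[0][1] * state[j]
            new[j] = gate[1][0] * state[i] + gate[1][1] * state[j]
    return new


def apply_cx(state, control, target):
    """Apply a CNOT: flip the target bit wherever the control bit is 1."""
    new = list(state)
    for i in range(len(state)):
        if (i >> control) & 1:
            new[i] = state[i ^ (1 << target)]
    return new


def measure(state, q, rng):
    """Measure qubit q, return the outcome and the collapsed, renormalized state."""
    p1 = sum(abs(a) ** 2 for i, a in enumerate(state) if (i >> q) & 1)
    outcome = 1 if rng.random() < p1 else 0
    norm = math.sqrt(p1 if outcome else 1 - p1)
    collapsed = [a / norm if ((i >> q) & 1) == outcome else 0j
                 for i, a in enumerate(state)]
    return outcome, collapsed


def rotation_y(theta):
    """Rotation around the y-axis, used here as pre-processing gate (assumption)."""
    c, s = math.cos(theta / 2), math.sin(theta / 2)
    return [[c, -s], [s, c]]


def run_test_circuit(theta, rng):
    """Prepare C, teleport it to B, undo the preparation on B and measure B."""
    state = [0j] * 8
    state[0] = 1 + 0j                               # all qubits start in |0>
    state = apply_1q(state, rotation_y(theta), C)   # pre-processing on C
    state = apply_1q(state, H, A)                   # Bell pair on A and B
    state = apply_cx(state, A, B)
    state = apply_cx(state, C, A)                   # Bell basis -> computational basis
    state = apply_1q(state, H, C)
    m_c, state = measure(state, C, rng)
    m_a, state = measure(state, A, rng)
    if m_a:                                         # classically conditioned corrections
        state = apply_1q(state, X, B)
    if m_c:
        state = apply_1q(state, Z, B)
    state = apply_1q(state, rotation_y(-theta), B)  # post-processing on B
    m_b, state = measure(state, B, rng)
    return m_c, m_a, m_b


rng = random.Random(1234)
results = [run_test_circuit(0.7, rng) for _ in range(200)]
print(all(m_b == 0 for _, _, m_b in results))
```

The outcomes for C and A vary from run to run, while the measurement of B should yield zero in every run, mirroring what the simulator shows for the classical register c2.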

So this is quantum teleportation – not quite as mysterious as the name suggests. In particular, we do not magically teleport particles and actual matter, but simply quantum information, based on a clever split of the information contained in an unknown state into a classical part, transmitted in two classical bits, and a quantum part that we can prepare upfront. We also do not violate special relativity – to complete the process, we depend on the measurement outcomes, which are transmitted via a classical channel and therefore not faster than the speed of light. It is also important to note that quantum teleportation does not violate the “no-cloning” principle either – the measurements let the original state collapse and therefore we do not end up with two copies of the same state.

Quantum teleportation has already been demonstrated experimentally several times. The first experimental verification was published in [3] in 1997. Since then, the distance between the involved parties “Alice” and “Bob” has been gradually increased. In 2017, a Chinese team reported the teleportation of a single qubit from a ground observatory to a low-Earth-orbit satellite (see [4]).

The term quantum teleportation is also used in the context of quantum error correction for a circuit that uses a measurement on an entangled state to bring one of the involved qubits into a specific state. When using a surface code, for instance, this technique – also called gate teleportation – is used to implement the T gate on the level of logical qubits. We refer to [5] for a more detailed discussion of this pattern.


1. M. Nielsen, I. Chuang, Quantum Computation and Quantum Information, Cambridge University Press, Cambridge 2010
2. C.H. Bennett, G. Brassard, C. Crepeau, R. Jozsa, A. Peres, W.K. Wootters, Teleporting an Unknown Quantum State via Dual Classical and EPR Channels, Phys. Rev. Lett. 70, March 1993
3. D. Bouwmeester, Jian-Wei Pan, K. Mattle, M. Eibl, H. Weinfurter, A. Zeilinger, Experimental quantum teleportation, Nature Vol. 390, Dec. 1997
4. Ji-Gang Ren et al., Ground-to-satellite quantum teleportation, Nature Vol. 549, September 2017
5. D. Gottesman, I. L. Chuang, Quantum Teleportation is a Universal Computational Primitive, arXiv:quant-ph/9908010