Basics of the Ethereum blockchain

Today we will take a closer look at the Ethereum blockchain and discuss its most important structures, namely transactions, blocks and state. I assume that you are familiar with the basics of blockchain technology. If not, I suggest that you read a few of my earlier posts on blocks, transactions and mining. This will be a long post, as we will go a bit deeper than the usual introductions that you might have seen. As a consequence, this post is a bit more theoretical, but we will get to more practical exercises soon, once we have mastered the basics.

A short history of the Ethereum blockchain

As of today, the real identity of Satoshi Nakamoto, the author of the bitcoin white paper, is unknown, even though different people have claimed to be Satoshi over time. The origin of the Ethereum blockchain is far less mysterious. In fact, the Ethereum white paper, which defines the basic structures and ideas of the Ethereum blockchain, was published in 2013 by Vitalik Buterin. Subsequently, its formal specification, known as the yellow paper, which works out the ideas presented in the white paper, was developed in 2014 (the initial commit on GitHub by Gavin Wood is from April 2014). The Ethereum Foundation was established in the same year, and in 2015, the Ethereum network was launched with the creation of the first block, known as the genesis block.

Since then, Ethereum has been under constant development. Changes to the protocol are controlled by a formal process, based on EIPs (Ethereum improvement proposals). Several clients have been developed over time, like Geth in Go, OpenEthereum in Rust or the Hyperledger Besu client in Java.

At the time of writing, Ethereum is transitioning from a proof-of-work (PoW) consensus mechanism to a proof-of-stake (PoS) mechanism as part of the next major version of the protocol, commonly referred to as Ethereum 2.0. With PoS, special nodes called validators take over the process of reaching consensus on the order of transactions by creating and validating new blocks. To become a validator, you have to invest a certain stake of digital currency that you lose if you misbehave. The intention of this change is to reduce the environmental footprint of the mining process, reduce transaction fees and – by supporting sharding – increase scalability. Even though the Beacon chain, which is the foundation for the new PoS approach, is already operational as of August 2021, the final transition will still take some time and is expected to happen at some point late in 2021 or early in 2022. As of today, the Rinkeby test network is already running a proof-of-authority (PoA) consensus algorithm known as clique, see EIP-225, but the final Ethereum 2.0 chain will be based on a protocol known as Casper (see for instance this paper or this paper on the arXiv for more details on this).

Addresses and accounts

On a certain level, the Ethereum blockchain is conceptually very simple – there is, at any point in time, a state, describing among other things the balances of the participants in the network, there are transactions changing the state and there are blocks that group transactions for the purpose of achieving consensus on the order of transactions. And, of course, there are addresses and accounts that represent the participants in the network.

An account represents a (typically human) participant in the Ethereum network. Accounts are not stored in a central place, and there is no such thing as “signing up” for an account. Instead, an account is simply a randomly generated public and private key pair, more precisely an ECDSA key pair with a 32 byte private key and a 64 byte public key (which, as always with ECDSA, can be derived from the private key). Everyone can create an account by simply creating such a key pair and use that account to build and submit transactions to the network. There is no central mechanism that makes sure that the same account is not used by two different actors, but given the length of the key (32 bytes, i.e. 256 bits) this is highly unlikely.

Associated with every account is the address, which is defined as the rightmost 160 bits (i.e. 20 bytes) of the hash value of the public key. Ethereum uses the Keccak-256 hashing algorithm (which is not exactly what NIST has standardized as SHA3-256, but close to it). Again, theoretically two different public keys could produce the same address due to a collision, but in practice this possibility is mostly ignored, and the address is considered to be in a one-to-one relation to the key pair.

Actually, we have been cheating a bit at this point. The relation between accounts and addresses described above is only valid for accounts that are owned by (typically) human actors external to the blockchain. However, we have already learned that Ethereum offers the possibility to store and run smart contracts, i.e. pieces of code that execute on the blockchain. These contracts are represented by addresses as well, but there is no private key behind these addresses (and consequently, a smart contract cannot, by itself, create and sign a transaction). To distinguish these two types of accounts, accounts that hold a key pair are sometimes called externally owned accounts (EOAs), while addresses occupied by a smart contract are called contract accounts.


The Ethereum state is organized as key-value pairs, where the key is the address and the value is again a complex data structure which consists of the following fields.

  • First, there is the nonce. The nonce is a counter that is increased with every transaction that originates from this address (i.e. account), and we will get back to its role when discussing transactions below.
  • Then, there is a balance, which reflects the current amount of Ether (the native currency of the Ethereum blockchain) owned by the account
  • Next, there is a field called code, which, if the account is a contract account, holds the bytecode of the smart contract
  • Finally, there is a field called storage. This is again a set of key-value pairs that a smart contract can use as persistent storage – we will get to this when we learn more about smart contracts.
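To make the four fields a bit more tangible, here is a minimal sketch of the state as a plain Python dictionary. The class name Account, the alias State and the two sample addresses are made up for illustration, and a real client stores this data in a trie in a database rather than in an in-memory dictionary.

```python
from dataclasses import dataclass, field
from typing import Dict


@dataclass
class Account:
    """Illustrative model of the value stored for one address."""
    nonce: int = 0                 # number of transactions sent from this address
    balance: int = 0               # balance in Wei
    code: bytes = b""              # bytecode, empty for an externally owned account
    storage: Dict[int, int] = field(default_factory=dict)  # persistent contract storage

    def is_contract(self) -> bool:
        # An account with non-empty code is a contract account
        return len(self.code) > 0


# The state maps addresses to accounts (hypothetical type alias)
State = Dict[str, Account]

# Two made-up addresses: an EOA and a contract account
eoa_address = "0x" + "11" * 20
contract_address = "0x" + "22" * 20

state: State = {
    eoa_address: Account(nonce=3, balance=5 * 10**18),
    contract_address: Account(code=bytes.fromhex("6000"), storage={0: 42}),
}
```

Note that the only difference between the two account types in this model is whether the code field is empty.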

Technically, the state is not really stored in the blockchain. Instead, the blockchain contains the initial state (in block zero of the chain, i.e. the genesis block), and all transactions. As the state can only be changed as part of a transaction, this is sufficient to reconstruct the state from the blockchain data. This is in fact what a full Ethereum client (the piece of software making up a node – some people including myself find the term client for this a bit confusing, given that we will learn later that most of these clients actually act as a server) does when it is initially started – it gets all blocks from the current blockchain, replays the transactions which are part of the blocks and uses this to reconstruct the state, which is then stored in a database on disk.

Note that there are some clients, the so-called light clients, which do not actually go through this process, but only download block headers and access full clients to be able to read the state if needed. Even though the state is not stored as part of the blocks, each block contains a hash value built from the state. This (and the fact that the state is organized in a data structure called a Merkle Patricia trie) allows even a light client to access and validate the state for a given address.
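The idea that a single hash in the block header is enough to validate an individual entry can be illustrated with a much simpler structure. The sketch below uses a plain binary Merkle tree and SHA-256 from the Python standard library. This is a deliberate simplification and an assumption of mine, as Ethereum actually uses a Merkle Patricia trie and Keccak-256, and all function names are invented for this example.

```python
import hashlib
from typing import List, Tuple

Proof = List[Tuple[bytes, bool]]  # (sibling hash, sibling is on the left)


def h(data: bytes) -> bytes:
    # SHA-256 as a stand-in for Keccak-256 (assumption for this sketch)
    return hashlib.sha256(data).digest()


def merkle_root_and_proof(leaves: List[bytes], index: int) -> Tuple[bytes, Proof]:
    """Build the tree over a non-empty list of leaves and collect the
    sibling hashes needed to prove the leaf at the given index."""
    level = [h(leaf) for leaf in leaves]
    proof: Proof = []
    while len(level) > 1:
        if len(level) % 2 == 1:
            level.append(level[-1])  # duplicate the last node on odd levels
        sibling = index ^ 1
        proof.append((level[sibling], sibling < index))
        # Hash pairs of nodes to obtain the next level
        level = [h(level[i] + level[i + 1]) for i in range(0, len(level), 2)]
        index //= 2
    return level[0], proof


def verify(leaf: bytes, proof: Proof, root: bytes) -> bool:
    """What a light client does: recompute the root from leaf and proof."""
    node = h(leaf)
    for sibling, sibling_is_left in proof:
        node = h(sibling + node) if sibling_is_left else h(node + sibling)
    return node == root


# A full node knows all entries, the light client only knows the root
entries = [b"alice:100", b"bob:250", b"carol:7"]
root, proof = merkle_root_and_proof(entries, 2)
```

The light client only needs the root (from the block header), the entry it is interested in and a proof whose length grows logarithmically with the number of entries.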

Ether and gas

Before proceeding to discuss transactions, it is helpful to understand what Ether and gas are. Ether is nothing but the native digital currency of the Ethereum blockchain, similar to what bitcoin is to the bitcoin blockchain. The smallest amount of Ether that can be transferred is one Wei, and 10^18 Wei is equivalent to one Ether. The balance, for instance, that is stored as part of the state of an account, is the balance in Wei. Another unit that is frequently used is the GWei, which is 10^9 Wei, so that 10^9 GWei is again one Ether.
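As amounts are stored in Wei, it is a good idea to do all calculations with integers and to convert to Ether only for display purposes. Here is a small sketch of the conversions. The helper names are my own, and the Decimal type is used to avoid floating point rounding.

```python
from decimal import Decimal

WEI_PER_GWEI = 10**9
WEI_PER_ETHER = 10**18


def ether_to_wei(ether: str) -> int:
    """Convert an Ether amount given as a decimal string into Wei."""
    return int(Decimal(ether) * WEI_PER_ETHER)


def gwei_to_wei(gwei: int) -> int:
    """Convert an integer amount of GWei into Wei."""
    return gwei * WEI_PER_GWEI


def wei_to_ether(wei: int) -> Decimal:
    """Convert Wei into Ether, returned as an exact decimal."""
    return Decimal(wei) / WEI_PER_ETHER
```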

Ether is the currency used to make payments in the Ethereum blockchain. One of the things you pay for is the processing, i.e. validation and inclusion in a block, of transactions. Similar to the bitcoin network, miners (or, after transitioning to the proof-of-stake algorithm, validators) are rewarded in Ether. The way the fees are calculated, however, is a bit more subtle than with the bitcoin network. The reason behind this is that as part of a transaction, a smart contract might have to be executed, and to avoid DoS attacks, we want the fees to depend on the complexity of that smart contract.

To achieve this, the Ethereum blockchain uses a measurement for the complexity of a transaction called gas. Every transaction consumes a certain amount of gas. A simple transfer, for instance, requires 21000 units of gas. When a smart contract is executed as part of a transaction, every instruction consumes a certain amount of gas as well.

To price gas, and therefore transactions, every transaction contains a gas price. This is the amount of Ether (or Wei) that the participant posting the transaction is willing to pay per unit of gas consumed by the transaction. Miners can use the gas price to select the transactions that are most beneficial for them, so that transactions with a high gas price tend to be mined faster, while transactions with a low gas price can stay pending for a long time – potentially forever. Thus choosing a reasonable gas price is essential for the successful processing of a transaction, and clients typically use a heuristic to propose a gas price for a given transaction.

In addition to the amount of gas consumed by a transaction and the gas price, there is also a gas limit that the originator of a transaction can define. Especially when executing an unknown smart contract, the amount of gas needed can be hard to predict, and therefore the gas limit serves as a safeguard to make sure that a malicious contract cannot consume an unlimited amount of gas. If during the execution of a transaction, the gas limit is exceeded, the transaction is reverted – note, however, that the gas used up to this point is lost. Therefore choosing the gas limit is also vital to ensure proper processing of a transaction.

Let us go through an example to see how this works. Suppose that you run a comparatively complex transaction, like the deployment of a smart contract, that consumes 2,409,371 units of gas. Suppose further that the current gas price is 1 GWei. Then the amount of Ether you would have to pay for this transaction is

2,409,371 x 1 GWei = 2,409,371 x 10^9 Wei = 2.409371 x 10^15 Wei = 0.002409371 Ether

Assuming a price of 2,600 USD per Ether, this would cost you roughly 6.26 USD. In reality, however, the real gas price is even higher (today it was 25 GWei), so this would be more than 150 USD. This is an expensive transaction, and most transactions are cheaper. Still, transaction costs can be significant on the Ethereum mainnet (the example is actually taken from the Rinkeby test network, where of course Ether costs nothing, but just to give you an idea).

If, however, you submit the transaction with a gas limit below 2,409,371, the transaction would not complete and would be reverted, resulting in a loss of the gas consumed so far.
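The calculation above, and the effect of a gas limit that is too low, can be summarized in a few lines of Python. This is only a sketch with made-up function names. In particular, I assume here that a transaction that runs out of gas is charged for its entire gas limit, as all gas available to it has been used up by the time the limit is hit.

```python
from decimal import Decimal
from typing import Tuple

GWEI = 10**9
ETHER = 10**18


def transaction_fee_wei(gas_used: int, gas_price_wei: int) -> int:
    """Fee in Wei for a transaction that consumed gas_used units of gas."""
    return gas_used * gas_price_wei


def execute(gas_needed: int, gas_limit: int, gas_price_wei: int) -> Tuple[bool, int]:
    """Return (success, fee in Wei) for a transaction needing gas_needed units."""
    if gas_needed > gas_limit:
        # Out of gas: state changes are reverted, but the gas is still paid for
        return False, gas_limit * gas_price_wei
    return True, gas_needed * gas_price_wei


def wei_to_ether(wei: int) -> Decimal:
    return Decimal(wei) / ETHER


# The contract deployment from the example above at a gas price of 1 GWei
fee = transaction_fee_wei(2_409_371, 1 * GWEI)
fee_in_ether = wei_to_ether(fee)
```

With a price of 2,600 USD per Ether, multiplying fee_in_ether by 2600 gives the roughly 6.26 USD quoted above.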

To make things a bit more complicated, there is actually a second gas limit – the gas limit per block. This limit is set by the miners and determines an upper limit for the amount of gas that all transactions included in a block can consume. The block gas limit is a field in the block header, and, according to the yellow paper, section 4.3.4 and section 6.2, a node will verify that for each block:

  • the block gas limit of the current block differs from the block gas limit of the parent block by at most roughly 0.1% (1/1024), so that miners can only change the block gas limit within this range with every new block
  • when a transaction is added to a block, the sum of the gas limit of this transaction and the gas used by the transactions already part of the block must not exceed the block gas limit
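Expressed in code, the two checks could look as follows. This is a sketch based on my reading of the yellow paper, where the change of the limit has to be strictly smaller than the parent limit divided by 1024 (rounded down) and the limit must not drop below 5000. The function names are invented.

```python
MIN_BLOCK_GAS_LIMIT = 5000


def valid_block_gas_limit(limit: int, parent_limit: int) -> bool:
    """Check that the gas limit of a block is close enough to that of its parent."""
    max_delta = parent_limit // 1024
    return abs(limit - parent_limit) < max_delta and limit >= MIN_BLOCK_GAS_LIMIT


def fits_into_block(tx_gas_limit: int, gas_used_so_far: int, block_gas_limit: int) -> bool:
    """Check whether a transaction can still be added to a block.

    Note that the gas limit of the new transaction is used here, not the
    gas that it will actually consume, which is not yet known.
    """
    return gas_used_so_far + tx_gas_limit <= block_gas_limit
```

For a parent block with a gas limit of 15,000,000, the limit of the next block can therefore move by at most 14,647 units in either direction.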

At the time of writing, the gas limit of a block on the mainnet is roughly 15 million units of gas, and the utilisation is pretty efficient, meaning that most blocks seem to spend an amount of gas which is only barely below the block gas limit.


Let us now take a closer look at an Ethereum transaction. Here is a diagram that shows the fields that make up a transaction.

Most of these fields should be clear by now. We have already discussed the gas price and the gas limit. The signature actually consists of three values, conventionally called v, r and s. Here, r and s are the components of the signature as in the usual ECDSA algorithm, and v is an additional value called the parity that can be used to unambiguously recover the public key from the signature – this explains why the public key is not part of the transaction. Note that therefore, the signature also implicitly contains the address of the sender of the transaction.

The value is the amount of Ether (in Wei) that is to be transferred from the sender to the recipient of the transaction, which can of course be zero. Finally, there is the nonce, which is a counter that needs to be incremented by one for each transaction that is generated for a given sender. This value is needed at different points during the execution of a transaction (see again the yellow paper, sections 6 and 7):

  • When a transaction is validated, the nonce of the transaction needs to be equal to the nonce stored in the state of the sender's address. When a transaction is executed, the nonce in the sender's state is incremented by one
  • When a smart contract is deployed as part of a transaction, the nonce determines, together with the sender address, the address of the smart contract

A miner will actually queue a transaction if it detects a gap in the nonce. Thus if you submit a transaction with nonce one and a transaction with nonce three, the miner will process transaction one, but will put the transaction with nonce three into a queue, assuming that a transaction with nonce two is still on its way through the network.

Also note that miners typically allow you to replace a transaction that is still pending by sending a new transaction with the same nonce. This is useful if your transaction is stuck because the gas price is too low – you can then send the transaction again, using a higher gas price and the same nonce, and the miner will replace the transaction in its pool of pending transactions with the new version. Of course, this is only possible as long as the transaction has not yet been included in a block. Some wallets also allow you to do the same to cancel a pending transaction – simply send a transaction with the same nonce and value zero. Note, however, that miners will only accept the replacement if the gas price of the new transaction is higher than that of the pending transaction (Geth, for instance, requires a certain minimum increase), so to cancel a transaction, you have to increase the gas price anyway, which also makes it more likely that the replacement is mined, not the original transaction.
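Here is a toy model of a transaction pool that shows both the queueing of transactions behind a nonce gap and the replacement of a pending transaction. All class and method names are hypothetical, and the replacement rule (any strictly higher gas price is accepted) is a simplifying assumption, as real clients apply their own policies.

```python
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass(frozen=True)
class Tx:
    sender: str
    nonce: int
    gas_price: int
    value: int = 0


class TxPool:
    def __init__(self, account_nonces: Dict[str, int]):
        # Current nonce of each account according to the state
        self.account_nonces = dict(account_nonces)
        self.txs: Dict[Tuple[str, int], Tx] = {}

    def add(self, tx: Tx) -> bool:
        """Add a transaction, return False if it is rejected."""
        if tx.nonce < self.account_nonces.get(tx.sender, 0):
            return False  # nonce already used
        key = (tx.sender, tx.nonce)
        old = self.txs.get(key)
        if old is not None and tx.gas_price <= old.gas_price:
            return False  # replacement must pay a higher gas price
        self.txs[key] = tx
        return True

    def executable(self, sender: str) -> List[Tx]:
        """Transactions with consecutive nonces starting at the account nonce."""
        result = []
        nonce = self.account_nonces.get(sender, 0)
        while (sender, nonce) in self.txs:
            result.append(self.txs[(sender, nonce)])
            nonce += 1
        return result

    def queued(self, sender: str) -> List[Tx]:
        """Transactions that wait behind a gap in the nonce sequence."""
        ready = {tx.nonce for tx in self.executable(sender)}
        waiting = [tx for (s, n), tx in self.txs.items() if s == sender and n not in ready]
        return sorted(waiting, key=lambda tx: tx.nonce)
```

If the account nonce is one and you add transactions with nonce one and three, executable returns only the first one, while the second one shows up in queued until a transaction with nonce two arrives.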

Why is the nonce needed? One answer to this is that it avoids a form of malicious behavior known as replay attack. Suppose you create, sign and submit a transaction that transfers 100 ETH to Eve. The transaction is mined, included in a block and added to the chain, and Eve happily receives the 100 ETH. Now Eve is greedy – she takes the transaction from the block (which she can easily do, as the data is public) and submits exactly the same transaction once more – and again, and again – you see where this is going. The nonce avoids this – when Eve tries to submit the transaction again, the nonce of the state will already have increased as a consequence of the first transaction, and miners and validators will reject the second copy of the transaction.
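The following sketch plays through this scenario. It ignores signatures, gas and fees entirely and only models balances and nonces, and the names Account, Tx and apply_transaction are chosen for this example only.

```python
from dataclasses import dataclass
from typing import Dict


@dataclass
class Account:
    nonce: int = 0
    balance: int = 0


@dataclass(frozen=True)
class Tx:
    sender: str
    to: str
    value: int
    nonce: int


def apply_transaction(state: Dict[str, Account], tx: Tx) -> None:
    """Validate a transaction against the state and execute it."""
    sender = state[tx.sender]
    if tx.nonce != sender.nonce:
        raise ValueError("invalid nonce")
    if sender.balance < tx.value:
        raise ValueError("insufficient balance")
    sender.balance -= tx.value
    sender.nonce += 1  # the same transaction can never be valid again
    state.setdefault(tx.to, Account()).balance += tx.value


state = {"you": Account(nonce=0, balance=500)}
tx = Tx(sender="you", to="eve", value=100, nonce=0)

apply_transaction(state, tx)      # first submission succeeds
try:
    apply_transaction(state, tx)  # Eve replays the identical transaction
    replay_accepted = True
except ValueError:
    replay_accepted = False
```

After the first execution, the nonce in the state is one, so the replayed copy with nonce zero is rejected and Eve's balance stays at 100.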

We have not yet touched upon the two extra fields on the right hand side of the diagram. The first field, the data, is relevant for transactions that are ordinary transactions, i.e. transactions targeting an existing smart contract or an EOA. The field can be used to include arbitrary data in the transaction, for instance arguments for the invocation of a smart contract. The second field, called init, is only relevant if a transaction is used to deploy a smart contract. A transaction will serve as deployment when its to field is empty, i.e. does not contain an address at all. In this case, the init field is supposed to contain byte code that will be executed, and the result of this byte code will be stored as new smart contract at the contract address determined by sender and nonce.


After all these preparations, we are now ready to finally discuss blocks. In Ethereum, a block consists of three pieces – the block header, the list of transactions included in the block and a list of headers of other blocks, the so-called ommers (which is a gender-neutral term, sometimes these blocks are called uncle blocks). These components are displayed in the diagram below.

The block header contains the following information (note that the order in the list and the diagram is not exactly the order in which the fields actually appear in the block, see the source code or the yellow paper for a full and more formal description):

  • First, there is a reference to the parent of the block, given by the hash value of the parent block. So the parent cannot be changed without breaking the chain, which is, after all, a characteristic property that you would expect from any blockchain. Formally, this is the Keccak hash of the RLP encoded parent block
  • Next, there is a hash value of the list of ommers that can be used to validate this data
  • The third field is the beneficiary, which is the address to which the mining reward for this block belongs
  • The next field is the hash value of the root of the Merkle-Patricia trie built from the state of all addresses. The presence of this field allows a light client to validate state information without having to download the entire chain
  • Similar to the hash value of the ommers list, the block header also contains a hash value of the trie of transactions included in this block
  • The next field is again the hash value of the root of a trie, this time the trie built from all transaction receipts. A transaction receipt is a data structure that describes the outcome of the process of validating a transaction. Similar to the state, it is not stored in the blockchain, but needs to be calculated by a client by replaying the transactions, and the presence of the root in the block header allows a client to validate that a receipt is not manipulated
  • A special part of a transaction receipt is the set of logs generated by the transaction. We will look at logs and events in a bit more detail when we talk about the Solidity programming language. To make it easier to scan the blockchain for specific log entries, the block header contains a Bloom filter of the log entries which is a special data structure that supports fast searching
  • The block header also contains the block number, i.e. the number of ancestors (the genesis block therefore has block number zero) and a creation timestamp
  • As discussed before, the block header contains the block gas limit along with the total gas used for this block, i.e. by all transactions in the block
  • A miner can use the extra data field to add at most 32 bytes of data to a block
  • Finally, there are the nonce, the mix hash and the difficulty, which are used for the PoW algorithm (Ethereum uses an algorithm called ethash which aims at making the use of ASICs for mining more difficult by using large data structures that need to be manipulated in memory, see also appendix J of the yellow paper for a formal definition)
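To get an idea of how the Bloom filter of log entries mentioned in the list above supports fast searching, here is a small sketch of a filter with 2048 bits in which every item sets three bits. Be aware that this is not the exact construction used by Ethereum. As an assumption for this example, I use SHA-256 from the standard library in place of Keccak-256, and the function names are invented.

```python
import hashlib
from typing import List

BLOOM_BITS = 2048


def _bit_positions(item: bytes) -> List[int]:
    # Derive three bit positions from the first three byte pairs of the hash
    digest = hashlib.sha256(item).digest()
    return [int.from_bytes(digest[i:i + 2], "big") % BLOOM_BITS for i in (0, 2, 4)]


def bloom_add(bloom: int, item: bytes) -> int:
    """Return the filter (an integer used as bit set) with the item added."""
    for pos in _bit_positions(item):
        bloom |= 1 << pos
    return bloom


def bloom_might_contain(bloom: int, item: bytes) -> bool:
    """False means definitely absent, True means possibly present."""
    return all((bloom >> pos) & 1 for pos in _bit_positions(item))


# Build the filter for the (made-up) log entries of a block
block_bloom = 0
for entry in (b"Transfer", b"Approval"):
    block_bloom = bloom_add(block_bloom, entry)
```

A client scanning for a specific log entry can skip every block for which bloom_might_contain returns False, and only needs to inspect the receipts of the remaining blocks, where the match can still turn out to be a false positive.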

As you would expect, a miner that mines a new block is rewarded for this work. The reward consists of two components. First, the miner receives a base reward of currently 2 ETH for each newly mined block, regardless of the transactions contained in the block. Second, the miner receives transaction fees. The mechanism by which this happens is currently being changed by EIP-1559. Previously, a miner received the full transaction fees for all transactions in the block being mined. After implementation of the EIP, the fees will consist of a base fee that is allowed to vary slowly over time, and a priority fee that is the difference of the total transaction fees and the base fee. The miner will only receive the priority fee, and the base fee will be burned. The EIP also adds the base fee as an additional field to the block header.
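The split of the fees after the change can be sketched as follows. The parameter names max_fee_per_gas and max_priority_fee_per_gas follow the terminology of the EIP and do not appear in the text above, and the function name is made up. The sketch ignores how the base fee itself is adjusted from block to block.

```python
from typing import Tuple

GWEI = 10**9


def split_fee(gas_used: int, base_fee_per_gas: int,
              max_fee_per_gas: int, max_priority_fee_per_gas: int) -> Tuple[int, int]:
    """Return (burned, paid to miner) in Wei for one transaction."""
    if max_fee_per_gas < base_fee_per_gas:
        raise ValueError("transaction cannot be included at this base fee")
    # The priority fee is capped by what is left after paying the base fee
    priority_fee_per_gas = min(max_priority_fee_per_gas,
                               max_fee_per_gas - base_fee_per_gas)
    burned = gas_used * base_fee_per_gas
    to_miner = gas_used * priority_fee_per_gas
    return burned, to_miner


# A simple transfer at a base fee of 20 GWei and a tip of 2 GWei
burned, to_miner = split_fee(21_000, 20 * GWEI, 30 * GWEI, 2 * GWEI)
```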

Most of the above should sound familiar – but there is one detail that struck me on first reading, namely the role of the ommers. Why are they needed? The reason for including ommers is that in addition to the block reward that a miner receives for a new block, the miners (i.e. beneficiaries) of an ommer block referenced in a new block will also receive a reward, called the uncle reward.

The motivation behind this (you might want to read this paper for all the glorious details and how this impacts the rewards of miners) is as follows. Suppose you are a miner that is mining a new block, say A. At the same time, a second miner is mining another valid block, called B. Both blocks have the same parent P (the current tip of the canonical chain). Now, as the block mining rate on the Ethereum blockchain is rather high, it can happen and will happen that A and B are found and distributed at roughly the same point in time. This will lead to a short fork, but after some time, the chain stabilizes and only A or B will become part of the canonical chain (the longest chain). Suppose that block B ends up being on the canonical chain – then your mining reward for block A is not part of “the” state any more, and your reward is lost.

However, suppose that you now mine a new block C, with parent B. Then, the stale block A will be an ommer of block C. If you manage to include A in the ommer list of block C, and block C makes it to the chain, you (being the beneficiary of block A) will still receive the uncle reward. The uncle reward is lower than the standard block reward, but at least the reward is not zero. In this sense, the uncle reward is a mechanism that fosters fast mining and rewards miners for producing blocks, even if this block does not make it to the canonical chain. In addition, a miner who includes ommers in a block also receives a small reward as an incentive to also include ommers created by other miners.
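For those who want to see numbers, here is a sketch of the two rewards according to my reading of the yellow paper. There, the reward for the beneficiary of an ommer shrinks by one eighth of the block reward for every block of distance between the ommer and the block that includes it, and the miner of the including block receives an additional 1/32 of the block reward per ommer. Treat the exact formulas and the function names as assumptions of this example.

```python
ETHER = 10**18
BLOCK_REWARD = 2 * ETHER


def ommer_reward(block_number: int, ommer_number: int,
                 block_reward: int = BLOCK_REWARD) -> int:
    """Reward in Wei for the beneficiary of an ommer included in a block."""
    distance = block_number - ommer_number
    if not 1 <= distance <= 6:
        raise ValueError("ommer is too old or not older than the block")
    return (8 - distance) * block_reward // 8


def includer_bonus(num_ommers: int, block_reward: int = BLOCK_REWARD) -> int:
    """Additional reward in Wei for the miner who includes the ommers."""
    return num_ommers * (block_reward // 32)


# The example from above: A and B have the same block number n,
# and C (with block number n + 1) includes A as ommer
n = 100
reward_for_a = ommer_reward(n + 1, n)
bonus_for_c = includer_bonus(1)
```

In this example, the beneficiary of block A would still receive 1.75 ETH instead of the full 2 ETH, and the miner of block C would receive 0.0625 ETH on top of the block reward.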

This closes today's post. Yes, this was a long post, but if you have followed me so far, you should have gained a solid understanding of the basic building blocks of the Ethereum blockchain. Armed with this understanding, we will – in the next post – go ahead and learn more about the mysterious smart contracts that we have already touched upon several times.

The Ethereum blockchain, smart contracts and token

If you have followed my blog for some time, you might know that it started with a few posts on the bitcoin blockchain – about its foundations in elliptic curve cryptography, blocks, mining and transactions. The bitcoin blockchain was established in 2009, and since then, a lot has happened in the blockchain world.

Maybe the most exciting new development are tokens – tradable coins that actually live on top of an existing blockchain. In particular non-fungible tokens are all the rage these days, and allow you to document ownership in a particular, uniquely identifiable asset like a piece of classical or digital art in the blockchain. Everybody who has access to a blockchain can create a token, and, according to Investopedia, more than 200,000 of these tokens already existed by the end of 2019.

Technically, a token is nothing else but an application that uses persistent storage to store the information who owns which token or how many tokens. The point of a token is that this is not simply an ordinary application running in some data center, which might raise the usual concerns about whether you can trust the programmer and the operator, but is a so-called smart contract – an application whose code is stored in the blockchain and which is in a certain sense running on top of the blockchain and uses the blockchain as storage.

Thus, token ownership is stored in the blockchain, and as such, is subject to the usual guarantees in terms of integrity and durability that a blockchain has to offer. As the program code itself is also stored in the blockchain, you can also trust that it is not manipulated after initial deployment, and, as every node can run the code independently, the consensus mechanism of the blockchain also makes a manipulation during program execution at least extremely difficult.

Tokens are an important, but by far not the only application of smart contracts. You could, for instance, implement a smart contract that allows you to cast votes based on blockchain technology – the technology will make sure that every participant can only vote once, that votes are correctly accounted for and cannot be manipulated, and that the entire voting process is documented transparently in the blockchain. Or you can build a smart contract that acts as a deposit for collateral, where the logic implemented in the contract makes sure that the collateral is only released if a certain condition is met. There are broker applications that allow you to trade digital currency without the need for a trusted third party, fully decentralized organisations (DAOs), whose members would, for instance, jointly invest into startups and vote transparently on the usage of funds, and many more applications of smart contracts. There are even games – check out Crypto Kitties, one of the first NFTs that was implemented.

Not every blockchain supports smart contracts. The bitcoin blockchain, for instance, does not (even though there is some scripting built into the validation process). The most popular (and, to my understanding, the first) blockchain that introduced smart contracts is the Ethereum blockchain, which was initially designed in 2013 and launched as a project in 2015. The Hyperledger Fabric blockchain has a similar concept (although smart contracts are technically quite different from what Ethereum does), and the same is true for Corda or EOS.

A couple of weeks back I became curious and wanted to understand how exactly a token works. I started to dive a bit into the Ethereum blockchain, smart contracts, token standards, dApps and Solidity, and, as always, decided to document my findings in a short series on this blog. If you follow along, here is what you will learn.

At the end, we will put everything together, mint our own NFT and build a frontend that will act as a wallet and allow you to list the tokens that you and others own and to trade tokens. As always, I will make the corresponding code available in my GitHub account so that you can get your hands dirty and directly jump into coding.

So let us get started and dive into the Ethereum blockchain – what it is, why it is different from the bitcoin blockchain and how it serves as basis for smart contracts and tokens. Watch out for my next post, which will talk about this.

Asynchronous I/O with Python part III – native coroutines and the event loop

In the previous post, we have seen how iterators and generators can be used in Python to implement coroutines. With this approach, a coroutine is simply a function that contains a yield statement somewhere. This is nice, but makes the code hard to read, as the function signature does not immediately give you a hint whether it is a generator function or not. Newer Python releases introduce a way to natively designate functions as asynchronous functions that behave similarly to coroutines and can be waited for using the new async and await syntax.

Native coroutines in Python

We have seen that coroutines can be implemented in Python based on generators. A coroutine, then, is a generator function which runs until it is suspended using yield. At a later point in time, it can be resumed using send. If you know JavaScript, this will sound familiar – in fact, with ES6, JavaScript has introduced a new syntax to declare generator functions. However, most programmers will probably be more acquainted with the concept of asynchronous functions in JavaScript and the corresponding await and async keywords.

Apparently partially motivated by this example and by the increasing popularity of asynchronous programming models, Python now has a similar concept that was added to the language with PEP-492 which introduces the same keywords into Python as well (as a side note: I find it interesting to see how these two languages have influenced each other over the last couple of years).

In this approach, a coroutine is a function marked with the async keyword. Similar to a generator-based coroutine which runs up to the next yield statement and then suspends, a native coroutine will run up to the next await statement and then suspend execution.

The argument to the await statement needs to be an awaitable object, i.e. one of the following three types:

  • another native coroutine
  • a wrapped generator-based coroutine
  • an object implementing the __await__ method

Let us look at each of these three options in a bit more detail.

Waiting for native coroutines

The easiest option is to use a native coroutine as target for the await statement. Similar to a yield from, this coroutine will then resume execution and run until it hits upon an await statement itself. An example for such a coroutine is asyncio.sleep(), which sleeps for the specified number of seconds. You can define your own native coroutine and await the sleep coroutine to suspend your coroutine until a certain time has passed.

async def coroutine():
    await asyncio.sleep(3)

Similar to yield from, this builds a chain of coroutines that hand over control to each other. A coroutine that has been “awaited” in this way can hand over execution to a second coroutine, which in turn waits for a third coroutine and so forth. Thus await statements in a typical asynchronous flow form a chain.
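Here is a small example of such a chain with three made-up coroutines called outer, middle and inner, each awaiting the next one. The trace list is only there to make the order of execution visible.

```python
import asyncio


async def inner(trace):
    trace.append("inner: start")
    # Suspend here and hand control back to the event loop
    await asyncio.sleep(0.01)
    trace.append("inner: done")
    return 42


async def middle(trace):
    trace.append("middle: start")
    # Awaiting inner runs it until it completes and evaluates to its return value
    result = await inner(trace)
    trace.append("middle: done")
    return result + 1


async def outer():
    trace = []
    value = await middle(trace)
    return value, trace
```

You can run the entire chain with asyncio.run(outer()). Note that the return value of each coroutine becomes the value of the await expression in the coroutine waiting for it.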

Now we have seen that a chain of yield from statements typically ends with a yield statement, returning a value or None. Based on that analogy, one might think that the end of a chain of await statements is an await statement with no argument. This, however, is not allowed and would also not appear to make sense, as, after all, you wait “for something”. But if that does not work, where does the chain end?

Time to look at the source code of the sleep function that we have used in our example above. Here we need to distinguish two different cases. When the argument is zero, we immediately delegate to __sleep0, which is actually very short (we will look at the more general case later).

@types.coroutine
def __sleep0():
    yield

So this is a generator function as we have seen it in the last post, with an additional annotation, which turns it into a generator-based coroutine.
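To see that a bare yield inside such a decorated generator function really is the end of the chain, we can drive a native coroutine manually using send, without any event loop. The names sleep0 and waiter in this sketch are my own, and sleep0 is a stand-in that mimics the private helper in the standard library.

```python
import types


@types.coroutine
def sleep0():
    # A bare yield: hand control back to whoever drives the coroutine
    yield


async def waiter():
    await sleep0()
    return "done"


coro = waiter()

# Run the coroutine until it suspends at the bare yield, which yields None
first = coro.send(None)

# Resume it, it now runs to completion and signals this with StopIteration
try:
    coro.send(None)
    result = None
except StopIteration as exc:
    result = exc.value
```

So the first send brings us down the chain to the yield, which suspends the entire chain, and the second send completes it.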

Generator-based coroutines

PEP-492 emphasizes that native coroutines are different from generator-based coroutines, and also enforces this separation. It is, for instance, an error to execute a yield inside a native coroutine. However, there is some interoperability between these two worlds, provided by the decorator types.coroutine that we have seen in action above.

When we decorate a generator-based coroutine with this decorator, it can be awaited like a native coroutine. The behaviour is very similar to yield from, i.e. if a native coroutine A awaits a generator-based coroutine B and is run via send, then

  • if B yields a value, this value is directly returned to the caller of A.send() as the result of the send invocation
  • at this point, B suspends
  • if we call A.send again, this will resume B (!), and the yield inside B will evaluate to the argument of the send call
  • if B returns (which raises a StopIteration behind the scenes), the return value, i.e. the value carried by the StopIteration, will be visible inside A as the value of the await expression
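
The behaviour described in this list can be reproduced with a few lines. This is only a minimal sketch; the names generator_based and native are made up for illustration.

```python
import types


@types.coroutine
def generator_based():
    # The yielded value travels straight to whoever called send() on the outer coroutine
    received = yield "value from B"
    # The return value becomes the value of the await expression in the caller
    return received * 2


async def native():
    result = await generator_based()
    return result


coro = native()
first = coro.send(None)      # runs until the yield inside generator_based
try:
    coro.send(21)            # resumes generator_based, the yield evaluates to 21
except StopIteration as e:
    final = e.value          # return value of native()
```

Here the first send hands back the value yielded by the generator-based coroutine, and the second send completes both coroutines, so that the StopIteration carries the doubled value.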

Thus in the example of asyncio.sleep(0), generator-based coroutines are the answer to our chicken-and-egg issue and provide the end point for the chain of await statements. If you go back to the code of sleep, however, and look at the more general case, you will find that this case is slightly more difficult, and we will only be able to understand it in the next post once we have discussed the event loop. What you can see, however, is that eventually, we wait for something called a future, so time to talk about this in a bit more detail.

Iterators as future-like objects

Going back to our list of things which can be waited for, we see that by now, we have touched on the first two – native coroutines and generator-based coroutines. A future (and the way it is implemented in Python) is a good example for the third case – objects that implement __await__.

Following the terminology used in PEP-492, any object that has an __await__ method is called a future-like object, and any such object can be the target of an await statement. Note that a native coroutine has an __await__ method as well and is therefore a future-like object, whereas a generator-based coroutine is recognized by a flag that the decorator sets on its code object. The __await__ method is supposed to return an iterator, and when we wait for an object implementing __await__, this iterator will be run until it yields or returns.

To make this more tangible, let us see how we can use this mechanism to implement a simple future. Recall that a future is an object that is a placeholder for a value still to be determined by an asynchronous operation (if you have ever worked with JavaScript, you might have heard of promises, which are a very similar concept). Suppose, for instance, we are building an HTTP library which has a method like fetch to asynchronously fetch some data from a server. This method should return immediately without blocking, even though the request is still ongoing. So it cannot yet return the result of the request. Instead, it can return a future. This future serves as a placeholder for the result which is still to come. A coroutine could use await to wait until the future is resolved, i.e. the result becomes available.

Of course we will not write an HTTP client today, but still, we can implement a simple future-like object which is initially pending and yields control if awaited. We can then set a value on this future (in reality, this would be done by a callback that triggers when the actual HTTP response arrives), and a waiting coroutine could then continue to run to retrieve the value. Here is the code.

class Future:

    def __await__(self):
        if not self._done:
            # Still pending: hand control back to the caller
            yield
        else:
            return self._result

    def __init__(self):
        self._done = False

    def done(self, result):
        self._result = result
        self._done = True

When we initially create such an object, its status will be pending, i.e. the attribute _done will be set to false. Awaiting a future in that state will run the coroutine inside the __await__ method which will immediately yield, so that the control goes back to the caller. If now some other asynchronous task or callback calls done, the result is set and the status is updated. When the coroutine is now resumed, it will return the result.

To trigger this behaviour, we need to create an instance of our Future class and call await on it. Now using await is only possible from within a native coroutine, so let us write one.

async def waiting_coroutine(future):
    data = None
    while data is None:
        data = await future
    return data

Finally, we need to run the whole thing. As for generator-based coroutines, we can use send to advance the coroutine to the next suspension point. So we could do something like this.

future = Future()
coro = waiting_coroutine(future)
# Trigger a first iteration - this will suspend in await
assert(None == coro.send(None))
# Mark the future as done
future.done(25)
# Now the second iteration should complete the coroutine
try:
    coro.send(None)
except StopIteration as e:
    print("Got StopIteration with value %d" % e.value)

Let us see what is happening behind the scenes when this code runs. First, we create the future which will initially be pending. We then make a call to our waiting_coroutine. This will not yet start the process, but just build and return a native coroutine, which we store as coro.

Next, we call send on this coroutine. As for a generator-based coroutine, this will run the coroutine. We reach the point where our coroutine waits for the future. Here, control will be handed over to the coroutine declared in the __await__ method of the future, i.e. this coroutine will be created and run. As _done is not yet set, it will yield control, and our send statement returns with None as result.

Next, we change the state of the future and provide a value, i.e. we resolve the future. When we now call send once more, the coroutine is resumed. It picks up where it left off, i.e. in the loop, and calls await again on the future. This time, this returns a value (25). This value is returned, and thus the coroutine runs to completion. We therefore get a StopIteration which we catch and from which we can retrieve the value.

The event loop

So far, we have seen a few examples of coroutines, but always needed some synchronous code that uses send to advance the coroutine to the next yield. In a real application, we would probably have an entire collection of coroutines, representing various tasks that run asynchronously. We would then need a piece of logic that acts as a scheduler and periodically goes through all coroutines, calls send on them to advance them to the point at which they return control by yielding, and look at the result of the call to determine when the next attempt to resume the coroutine should be made.
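
To get a feeling for what such a scheduler does, here is a toy round-robin version. It is only a sketch under simplifying assumptions: the names pause, worker and run_all are invented for this example, there is no I/O involved, and every coroutine is simply resumed again as soon as it is its turn.

```python
import types
from collections import deque


@types.coroutine
def pause():
    # Hand control back to whoever called send() on the coroutine
    yield


async def worker(name, steps, log):
    for i in range(steps):
        log.append("%s step %d" % (name, i))
        await pause()
    return name


def run_all(coroutines):
    """Toy scheduler: advance each coroutine in turn until all are done."""
    ready = deque(coroutines)
    results = []
    while ready:
        coro = ready.popleft()
        try:
            coro.send(None)
        except StopIteration as e:
            # The coroutine has completed, collect its return value
            results.append(e.value)
        else:
            # The coroutine has yielded, so queue it for another round
            ready.append(coro)
    return results


log = []
results = run_all([worker("A", 2, log), worker("B", 1, log)])
```

The log shows that the steps of the two workers are interleaved, even though only one thread is involved.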

To make this useful in a scenario where we wait for other asynchronous operations, like network traffic or other types of I/O operations, this scheduler would also need to check for pending I/O and to understand which coroutine is waiting for the result of a pending I/O operation. Again, if you know JavaScript, this concept will sound familiar – this is more or less what the event loop built into every browser or the JS engine running in Node.js is doing. Python, however, does not come with a built-in event loop. Instead, you have to select one of the available libraries that implement such a loop, for instance the asyncio library which is distributed with CPython. Using this library, you define tasks which wrap native coroutines, schedule them for execution by the event loop and allow them to wait for e.g. the result of a network request represented by a future. In a nutshell, this is exactly what the asyncio event loop is doing.
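
As a first impression of how this looks with asyncio, here is a minimal sketch that wraps two coroutines into tasks and lets the event loop run them. The worker function and its log are assumptions made for this example, and asyncio.sleep(0) merely stands in for a real I/O operation.

```python
import asyncio


async def worker(name, log):
    log.append(name + " started")
    # sleep(0) hands control back to the event loop for one iteration
    await asyncio.sleep(0)
    log.append(name + " finished")
    return name


async def main(log):
    # Wrapping a coroutine into a task schedules it on the running event loop
    tasks = [asyncio.create_task(worker(name, log)) for name in ("A", "B")]
    # gather waits for all tasks and returns their results in the given order
    return await asyncio.gather(*tasks)


log = []
results = asyncio.run(main(log))
```

Note that we never call send ourselves here, advancing the coroutines is entirely left to the event loop.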


In the next post, we will dig a bit deeper into the asyncio library and the implementation of the event loop.

Asynchronous I/O with Python part II – iterators and generators

As explained in my previous post, historically coroutines in Python have evolved from iterators and generators, and understanding generators is still vital to understanding native coroutines. In this post, we take a short tour through iterators in Python and how generators have traditionally been implemented.

Iterables and iterators

In Python (and in other programming languages), an iterator is an object that returns a sequence of values, one at a time. While in languages like Java, iterators are classes implementing a specific interface, Python iterators are simply classes that have a method __next__ which is supposed to either return the next element from the iterator or raise a StopIteration exception to signal that no further elements exist.

Iterators are typically not created explicitly, but are provided by factory classes called iterables. An iterable is simply a class with a method __iter__ which in turn returns an iterator. Behind the scenes, iterables and iterators are used when you run a for-loop in Python – Python will first invoke the __iter__ of the object to which you refer in the loop to get an iterator and then call the __next__ method of this iterator once for every iteration of the loop. The loop stops when a StopIteration is raised.

This might sound a bit confusing, so let us look at an example. Suppose you wanted to build an object which – like the range object – allows you to loop over all numbers from 0 to a certain limit. You would then first write a class that implements a method __next__ that returns the next value (so it has to remember the last returned value), and then implement an iterable returning an instance of this class.

class SampleIterator:

    def __init__(self, limit):
        self._position = 0
        self._limit = limit

    def __next__(self):
        if self._position < self._limit:
            self._position += 1
            return self._position - 1
        else:
            # No further elements, signal this to the caller
            raise StopIteration

class SampleIterable:

    def __init__(self, limit):
        self._limit = limit

    def __iter__(self):
        return SampleIterator(self._limit)

myIterable = SampleIterable(10)
for i in myIterable:
    print("i = %d" % i)

Often, the same object will implement the __next__ method and the __iter__ method and therefore act as iterable and iterator at the same time.
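
A minimal sketch of such a combined object could look as follows (the class Countdown is invented for this example). The second half spells out by hand what a for-loop does behind the scenes.

```python
class Countdown:
    """Iterable and iterator in one object: __iter__ simply returns self."""

    def __init__(self, start):
        self._current = start

    def __iter__(self):
        return self

    def __next__(self):
        if self._current <= 0:
            raise StopIteration
        self._current -= 1
        return self._current + 1


values = [i for i in Countdown(3)]

# What the for loop does behind the scenes, spelled out by hand
iterator = iter(Countdown(2))           # calls __iter__
manual = []
while True:
    try:
        manual.append(next(iterator))   # calls __next__
    except StopIteration:
        break
```

The price for this convenience is that such an object can only be iterated over once, as the iterable and the iterator share their state.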

Note that the iterator typically needs to maintain a state – it needs to remember the state after the last invocation of __next__ has completed. In our example, this is rather straightforward, but in more complex situations, programmatically managing this state can be tricky. With PEP-255, a new approach was introduced into Python which essentially allows a programmer to ask the Python interpreter to take over this state management – generators.

Generators in Python

The secret sauce behind generators in Python is the yield statement. This statement is a bit like return in that it returns a value and the flow of control to the caller, but with the important difference that the state of the currently executed function is saved by Python and the function can be resumed at a later point in time. A function that uses yield in this way is called a generator function.

Again, it is instructive to look at an example. The following code implements our simple loop using generators.

def my_generator(limit=5):
    _position = 0
    while _position < limit:
        yield _position 
        _position += 1

for i in my_generator(10):
    print("i = %d" % i)

We see that we define a new function my_generator which, at first glance, looks like an ordinary function. When the body of this function runs for the first time, it initializes a local variable holding the current position to zero. We then enter a loop to increase the position until we reach the limit. In each iteration, we then invoke yield to return the current position back to the caller.

In our main program, we first call my_generator() with an argument. As opposed to an ordinary function, this invocation does not execute the function. Instead, it evaluates the argument and builds and returns an object called a generator object. This object is an iterator, i.e. it has a __next__ method. When this method is called for the first time, the execution of our function body starts until it hits the first yield statement. At this point, the execution returns to the caller and whatever we yield is returned by the call to __next__. When now __next__ is invoked again, the Python interpreter will restore the current state of the function and resume its execution after the yield. We increase our internal position, enter the loop again, hit the next yield and so forth. This continues until the limit is reached. Then, the function returns, which is equivalent to raising a StopIteration and signals to the caller that the iterator is exhausted.

Instead of using the for loop, we can also go through the same steps manually to see how this works.

generator = my_generator(5)
while True:
    try:
        value = generator.__next__()
        print("Value: %d" % value)
    except StopIteration:
        # The generator is exhausted
        break

This is already quite close to the programming model of a coroutine – we can start a coroutine, yield control back to the caller and resume execution at a later point in time. However, there are a few points that are still missing and that have been added to Python coroutines with additional PEPs.

Delegation to other coroutines

With PEP-380, the yield from statement was added to Python, which essentially allows a coroutine to delegate execution to another coroutine.

A yield from statement can delegate either to an ordinary iterable or to another generator.
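
Both cases are illustrated by the following short sketch, in which the function names are chosen freely for this example.

```python
def count_up(limit):
    for i in range(limit):
        yield i


def combined():
    # Delegate to an ordinary iterable (a list) ...
    yield from ["a", "b"]
    # ... and then to another generator
    yield from count_up(2)


values = list(combined())
```

The caller of combined does not notice the delegation at all, it simply sees one stream of values.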

What yield from is essentially doing is to retrieve an iterator from its argument and call the __next__ method of this iterator, thus – if the iterable is a generator – running the generator up to the next yield. Whatever this yield returns will then be yielded back to the caller of the generator containing the yield from statement.

When I first looked at this, I was under the impression that if a generator A delegates to generator B by doing yield from B, and B yields a value, control would go back to A, similar to a subroutine call. However, this is not the case. Instead of thinking of a yield from like a call, it is better to think of it like a jump. In fact, when B yields a value, this value will be returned directly to the caller of A. The yield from statement in A only returns when B either returns or raises a StopIteration (which is equivalent), and the return value of B will then be the value of the yield from statement. So you might think of the original caller and A as being connected through a pipe through which yielded values are sent back to the caller, and if A delegates to B, it also hands the end of the pipe over to B where it remains until B returns (i.e. is exhausted in the sense of an iterator).
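
The following sketch makes this visible. The generator names and the log list are assumptions made for this example; the log records the point in time at which A gets control back.

```python
def generator_b():
    yield "b1"
    yield "b2"
    return "result of B"


def generator_a(log):
    # Values yielded by B bypass this line and go straight to our caller
    result = yield from generator_b()
    # We only get here once B has returned
    log.append(result)
    yield "a1"


log = []
gen = generator_a(log)
first = next(gen)               # this value comes from B
log_after_first = list(log)     # A has not yet continued at this point
rest = list(gen)
```

After the first call to next, the log is still empty, which shows that A did not regain control when B yielded. Only once B is exhausted is its return value stored in the log.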


Passing values and exceptions into coroutines

We have seen that when a coroutine executes a yield, control goes back to the caller, i.e. to the code that triggered the coroutine using __next__, and when the coroutine is resumed, its execution continues at the first statement after the yield. Originally, yield was a statement that takes an argument, so that the coroutine can hand data back to the caller, but not the other way round. With PEP-342, this was changed and yield became an expression so that it actually returns a value. This allows the caller to pass a value back into the generator function. The method to do this is called send.

Doing a send is a bit like a __next__, with the difference that send takes an argument and this argument is delivered to the coroutine as result of the yield expression. When a coroutine runs for the first time, i.e. is not resumed at a yield, only send(None) is allowed, which, in general, is equivalent to __next__. Here is a version of our generator that uses this mechanism to be reset.

def my_generator(limit=5):
    _position = 0
    while _position < limit:
        cur = _position
        val = yield cur
        if val is not None:
            # We have been resumed due to a send statement.
            _position = val
            yield val
        else:
            _position += 1

We can now retrieve a few values from the generator using __next__, then use send to set the position to a specific value and then continue to iterate through the generator.

generator = my_generator(20)
assert 0 == generator.__next__()
assert 1 == generator.__next__()
# Reset the position of the generator to 7
generator.send(7)
assert 7 == generator.__next__()
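
The rule that a fresh generator only accepts send(None) can be verified directly. Here is a small sketch, using a generator function echo that is made up for this purpose and simply yields back whatever it receives.

```python
def echo():
    received = None
    while True:
        # The value of the yield expression is whatever the caller sends in
        received = yield received


gen = echo()
try:
    gen.send("too early")        # not allowed, the generator is not yet at a yield
    error = None
except TypeError as e:
    error = e

gen = echo()
first = gen.send(None)           # same as next(gen), runs to the first yield
second = gen.send("hello")       # the yield evaluates to "hello", which is yielded back
third = next(gen)                # next() behaves like send(None)
```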

Instead of passing a value into a coroutine, we can also throw an exception into a coroutine. This is actually quite similar to the process of sending a value – if we send a value into a suspended coroutine, this value becomes visible inside the coroutine as the return value of the yield at which the coroutine is suspended, and if we throw an exception into it, the yield at which the coroutine is suspended will raise this exception. To throw an exception into a coroutine, use the throw method, like

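generator = my_generator(20)
assert 0 == generator.__next__()
# Throw an exception into the suspended generator (the exception type is an arbitrary choice)
generator.throw(Exception("thrown into the generator"))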

If you run this code and look at the resulting stack trace, you will see that in fact, the behavior is exactly as if the yield statement had raised the exception inside the coroutine.

The generator has a choice whether it wants to catch and handle the exception or not. If the generator handles the exception, processing continues as normal, and the value of the next yield will be returned as result of throw(). If, however, the generator decides to not handle the exception or to raise another exception, this exception will be passed through and will show up in the calling code as if it had been raised by throw. So in general, both send and throw calls should be enclosed in a try-block as they might raise exceptions.
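
Both outcomes can be seen in the following sketch. The generator resilient and the choice of ValueError and KeyError are assumptions made for this example: the generator handles a ValueError by resetting its position, but does not handle any other exception.

```python
def resilient():
    position = 0
    while True:
        try:
            yield position
        except ValueError:
            # Handle the exception, the next yielded value is returned by throw()
            position = 100
        else:
            position += 1


gen = resilient()
a = next(gen)
b = next(gen)
c = gen.throw(ValueError("reset"))      # handled inside the generator
try:
    gen.throw(KeyError("unhandled"))    # not handled, comes back to us
    outcome = "no exception"
except KeyError:
    outcome = "KeyError propagated to the caller"
```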

Speaking of exceptions, there are a few exceptions that are specific for generators. We have already seen the StopIteration exception which is raised if an iterator or generator is exhausted. A similar exception is GeneratorExit which can be thrown into a generator to signal that the generator should complete. A generator function should either re-raise this exception or simply return so that its execution stops (since PEP-479, explicitly raising a StopIteration inside a generator is turned into a RuntimeError), and the caller needs to handle the exception. There is even a special method close that can be used to close a coroutine which essentially does exactly this – it throws a GeneratorExit into the coroutine, expects the generator to re-raise it or to return, and handles the resulting exception. If a generator is garbage-collected, the Python interpreter will execute this method.
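
A typical use of this mechanism is cleaning up when a generator is closed. In the following minimal sketch (the generator resource_user and its log are invented for this example), the generator catches the GeneratorExit, does its cleanup and re-raises the exception.

```python
def resource_user(log):
    try:
        while True:
            yield "working"
    except GeneratorExit:
        # Clean up, then let the generator finish by re-raising
        log.append("cleanup")
        raise


log = []
gen = resource_user(log)
next(gen)
gen.close()          # throws GeneratorExit into the generator at the yield
try:
    next(gen)
    exhausted = False
except StopIteration:
    exhausted = True
```

Note that close itself returns normally, and that a closed generator behaves like an exhausted iterator afterwards.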

This completes our discussion of the “old-style” coroutines in Python using generator functions and yielding. In the next post, we will move on to discuss the new syntax for native coroutines introduced with Python 3.5 in 2015.

Asynchronous I/O with Python part I – the basics

Though not really new, a programming model commonly known as asynchronous I/O has been attracting a lot of attention over the last couple of years and even influenced the development of languages like Java, Go or Kotlin. In this and the next few posts, we will take a closer look at this model and how it can be implemented using Python.

What is asynchronous I/O?

The basic ideas of asynchronous I/O are maybe explained best using an example from the world of networking, which is at the same time the area where the approach excels. Suppose you are building a REST gateway that accepts incoming connections and forwards them to a couple of microservices. When a new client connects, you will have to make a connection to a service, send a request, wait for the response and finally deliver the response back to the client.

Doing this, you will most likely have to wait at some points. If, for instance, you build a TCP connection to the target service, this involves a handshake during which you have to wait for network messages from the downstream server. Similarly, when you have established the connection and sent the request, it might take some time for the response to arrive. While this entire process is in progress, you will have to maintain some state, for instance the connection to the client which you need at the end to send the reply back.

If you do all this sequentially, your entire gateway will block while a request is being processed – not a good idea. The traditional way to deal with this problem has been to use threads. Every time a new request comes in, you spawn a thread. While you have to wait for the downstream server, this thread will block, and the scheduler (the OS scheduler if you use OS-level threads or some other mechanism) will suspend the thread, yield the CPU to some other thread and thus allow the gateway to serve other requests in the meantime. When the response from the downstream server arrives, the thread is woken up, and, having saved the state, the processing of the client’s request can be completed.

This approach works, but, depending on the implementation, creating and running threads can create significant overhead. In addition to the state, concurrently managing a large number of threads typically involves a lot of scheduling, locking, handling of concurrent memory access and kernel calls. This is why you might try a different implementation that relies entirely on user-space mechanisms.

You could, for instance, implement some user-space scheduler mechanism. When a connection is being made, you would read the incoming request, send a connection request (a TCP SYN) to the downstream server and then voluntarily return control to the scheduler. The scheduler would then monitor (maybe in a tight polling loop) all currently open network connections to downstream servers. Once the connection is made, it would execute a callback function which triggers the next steps of the processing and send a request to the downstream server. Then, control would be returned to the scheduler which would invoke another callback when the response arrives and so forth.

With this approach, you would still have to store some state, for instance the involved connections, but otherwise the processing would be based on a sequence of individual functions or methods tied together by a central scheduler and a series of callbacks. This is likely to be very efficient, as switching between “threads” only involves an ordinary function call which is much cheaper than a switch between two different threads. In addition, each “thread” would only return control to the scheduler voluntarily, implementing a form of cooperative multitasking, and cannot be preempted at unexpected points. This of course makes synchronization much easier and avoids most if not all locking, which again removes some overhead. Thus such a model is likely to be fast and efficient.

On the downside, without support from the used programming language for such a model, you will easily end up with a complex set of small functions and callbacks, sometimes turning into a phenomenon known as callback hell. To avoid this, more and more programming languages offer a programming model which supports this approach with libraries and language primitives, and so does Python.

Coroutines and futures

The model which we have described is not exactly new and was first described many years ago. In this model, processing takes place in a set of coroutines. Coroutines are subroutines or functions which have the ability to deliberately suspend their own execution – a process known as yielding. This will save the current state of the coroutine and return control to some central scheduler. The scheduler can later resume the execution of the coroutine which will pick up the state and continue to run until it either completes or yields again (yes, this is cooperative multitasking, and this is where the name – cooperative routines – comes from).

Coroutines can also wait for the result of a computation which is not yet available. Such a result is encapsulated in an object called a future. If, for instance, a coroutine sends a query to a downstream server, it would send the HTTP request over the network, create a future representing the reply and then yield and wait for the completion of this future. Thus the scheduler would gain back control and could run other coroutines. At the same time, the scheduler would have to monitor open network connections, and, when the response arrives, complete the future, i.e. provide a value, and reschedule the corresponding coroutine.
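
To illustrate the interplay between a coroutine, a future and the scheduler, here is a toy sketch. Everything in it is made up for illustration: ToyFuture is not a class from any library, the coroutine is a plain generator, and the part of the scheduler is played by a few lines of sequential code.

```python
class ToyFuture:
    """Placeholder for a result that is not yet available (illustrative only)."""

    def __init__(self):
        self.done = False
        self.result = None

    def complete(self, value):
        self.result = value
        self.done = True


def query_service(future, log):
    # Pretend to send a request, then yield the future we are waiting for
    log.append("request sent")
    response = yield future
    log.append("got " + response)


log = []
future = ToyFuture()
coro = query_service(future, log)

# What a scheduler would do:
waiting_for = coro.send(None)      # run until the coroutine yields a future
# ... other coroutines could run here while the response is outstanding ...
waiting_for.complete("HTTP 200")   # the response arrives, complete the future
try:
    coro.send(waiting_for.result)  # reschedule the coroutine with the result
except StopIteration:
    pass
```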


Finally, some additional features would be desirable. To support modularization, it would be nice if coroutines could somehow call each other, i.e. if a coroutine could delegate a part of its work to another coroutine and wait for its completion. We would probably also want to see some model of exception handling. If, for instance, a coroutine has made a request and the response signals an error, we would like the coroutine to learn about this error by being woken up with an exception. And finally, being able to pass data into an already running coroutine could be beneficial. We will later see that the programming model that Python implements for coroutines supports all of these features.

Organisation of this series

Coroutines in Python have a long history – they started as support for iterators, evolved into what is today known as generator-based coroutines and finally turned into the native coroutines that Python supports today. In addition, the asyncio library provides a framework to schedule coroutines and integrate them with asynchronous I/O operations.

Even today, the implementation of coroutines in Python is still internally based on iterators and generators, and therefore it is still helpful to understand these concepts, even if we are mainly interested in the “modern” native coroutines. To reflect this, the remaining posts in this series will cover the following topics.

  • Iterators and generator-based coroutines
  • Native coroutines
  • The main building blocks of the low-level asyncio API – tasks, futures and the event loop
  • Asynchronous I/O and servers
  • Building an asynchronous HTTP server from scratch

To follow the programming examples, you will need a comparatively new version of Python, specifically you will need Python 3.7 or above. In case you have an older version, either get the latest version from the Python download page and build it from source, or (easier) try to get a more recent package for your OS (for Ubuntu, for instance, there is the deadsnakes PPA that you can use for that purpose).

Learning Kafka with Python – retries and idempotent writes

In the past few posts, we have discussed approaches to implement exactly-once processing on the consumer side, i.e. mechanisms that make sure that every record in the partition is only processed once. Today, we will look at a similar problem on the producer side – how can we make sure that every record is written into the partition only once? This sounds easy, but can be tricky if we need to retry failed messages without knowing the exact error that has occurred.

The retry problem

In the sample producer that we have looked at in a previous post, we missed an important point – error handling. The most important error that a reliable producer needs to handle is an error when handing over a new record to the broker.

In general, Kafka differentiates between retriable errors, i.e. transient errors like individual packets being lost on the network, and non-retriable errors, i.e. errors like an invalid authorization for which a retry does not make sense. For most transient errors, the client will – under the hood – automatically attempt a retry if a record could not be sent.

Let us take a short look at the Java producer as an example. When a batch of records has been sent to the broker as a ProduceRequest, the response is handled in the method handleProduceResponse. Here, a decision is made whether an automatic retry should be initiated, in which case the batch of records will simply be added to the queue of batches to be sent again. The logic to decide when a retry should be attempted is contained in the method canRetry, and in the absence of transactions (see the last section of this post), it will decide to retry if the batch has not timed out yet (i.e. has not been created longer ago than the configured delivery timeout), the error is retriable and the number of allowed retries (set via the parameter retries) has not yet been reached. Examples for retriable exceptions are exceptions due to a low number of in-sync replicas, timeouts, connection failures and so forth.

This is nice, but there is a significant problem when using automated retries. If, for instance, a produce request times out, it might very well be that this is only due to a network issue and in the background, the broker has actually stored the record in the partition log. If we retry, we will simply send the same batch of records again, which could lead to duplicate records in the partition log. As these records will have different offsets, there is no way for a consumer to detect this duplicate. Depending on the type of application, this can be a major issue.

If you wanted to solve this on the application level, you would probably set retries to zero, implement your own retry logic and use a sequence number to allow the consumer to detect duplicates. A similar logic referred to as idempotent writes has been added to Kafka with KIP-98 which was implemented in release 0.11 in 2017.

What are idempotent writes?

Essentially, idempotent writes use a sequence number which is added to each record by the producer to allow the broker to detect duplicates due to automated retries. This sequence number is added to a record shortly before it is sent (more precisely, a batch of records receives a base sequence number, and the sequence number of a record is the base sequence number plus its index in the batch), and if an automated retry is made, the exact same batch with the same sequence number is sent again. The broker keeps track of the highest sequence number received, and will not store any records with a sequence number smaller than or equal to the currently highest processed sequence number.
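
The following toy model illustrates the idea. It is a sketch for illustration only and not the actual broker code: the class ToyPartition and its method are invented, and the model simply ignores producer IDs and replication.

```python
class ToyPartition:
    """Illustrative stand-in for a partition leader, not Kafka's actual code."""

    def __init__(self):
        self.log = []
        self.highest_seq = -1

    def append_batch(self, base_seq, records):
        for index, record in enumerate(records):
            # Sequence number of a record = base sequence number + index in the batch
            seq = base_seq + index
            if seq <= self.highest_seq:
                # Already stored, so this is a duplicate caused by a retry
                continue
            self.log.append(record)
            self.highest_seq = seq


partition = ToyPartition()
partition.append_batch(0, ["a", "b", "c"])
# Assume the response got lost, so the producer retries the identical batch
partition.append_batch(0, ["a", "b", "c"])
partition.append_batch(3, ["d"])
```

Even though the first batch is sent twice, every record ends up in the log only once.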

To allow all followers to maintain this information as well, the sequence number is actually added to the partition log and therefore made available to all followers replicating the partitions, so that this data survives the election of a new partition leader.

In a bit more detail, the implementation is slightly more complicated than this. First, it would imply a high overhead to maintain a globally unique sequence number across all producers and partitions. Instead, the sequence number is maintained per producer and per partition. To make this work, producers will be assigned a unique ID called the producer ID. In fact, when a producer that uses idempotent writes starts, it will send an InitPidRequest to the broker. The broker will then assign a producer ID and return it in the response. The producer stores the producer ID in memory and adds it to all records being sent, so that the broker knows from which producer a record originates. Similar to the sequence number, this information is added to the records in the partition log. Note, however, that neither the producer ID nor the sequence number are passed to a consumer by the consumer API.

How does the broker determine the producer ID to be assigned? This depends on whether idempotent writes are used in combination with transactions. If transactions are used, we will learn in the next post that applications need to define an ID called transaction ID that is supposed to uniquely identify a producer. In this case, the broker will assign a producer ID to each transaction ID, so that the producer ID is effectively persisted across restarts. If, however, idempotent writes are used stand-alone, the broker uses a ZooKeeper sequence to assign a sequence number, and if a producer is either restarted or (for instance due to some programming error) sends another InitPidRequest, it will receive a new producer ID. For each new partition assigned to a producer not using transactions, the sequence number will start again at zero, so that the sequence number is only unique per partition and producer ID (which is good enough for our purpose).

Another useful feature of idempotent writes is that a Kafka broker is now able to detect record batches arriving in the wrong order. In fact, if a record arrives whose sequence number is higher than the previously seen sequence number plus one, the broker assumes that records got lost in flight or we see an ordering issue due to a retry and raises an error. Thus ordering is now guaranteed even if we allow more than one in-flight batch.
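
Putting the pieces together, the decision that the broker has to make for an incoming sequence number can be sketched as follows. Again, this is a simplified model under assumptions of my own: ToyBroker and OutOfOrderError are hypothetical names, and the real broker works on batches and keeps more state than this.

```python
class OutOfOrderError(Exception):
    pass


class ToyBroker:
    def __init__(self):
        # Last sequence number seen per (producer ID, partition)
        self.last_seq = {}

    def check(self, producer_id, partition, seq):
        key = (producer_id, partition)
        last = self.last_seq.get(key, -1)
        if seq <= last:
            # Seen before, most likely an automated retry
            return "duplicate"
        if seq > last + 1:
            # There is a gap, records got lost or arrived in the wrong order
            raise OutOfOrderError("expected %d, got %d" % (last + 1, seq))
        self.last_seq[key] = seq
        return "accepted"


broker = ToyBroker()
r1 = broker.check(1, 0, 0)
r2 = broker.check(1, 0, 0)      # same producer, same sequence number
r3 = broker.check(2, 0, 0)      # a different producer starts at zero again
try:
    broker.check(1, 0, 5)
    gap_detected = False
except OutOfOrderError:
    gap_detected = True
```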

Trying it out

Time again to try all this. Unfortunately, the Kafka Python client that we have used so far does not (yet) support KIP-98. We could of course use a Java or Go client, but to stick to the idea of this little series to use Python, let us alternatively employ the Python client provided by Confluent.

To install this client, use

pip3 install confluent-kafka==1.4.1

Here I am using version 1.4.1 which was the most recent version at the time when this post was written, so you might want to use the same version. Using the package is actually straightforward. Again, we first create a configuration, then a producer and then send records to the broker asynchronously. Compared to the Kafka Python library used so far, there are a few differences which are worth being noted.

  • Similar to the Kafka Python library, sends are done asynchronously. However, you do not receive a future when sending, as is the case for the Kafka Python library, but define a callback directly
  • To make sure that the callback is invoked, you have to call the poll method of the producer on a regular basis
  • When you are done producing, you have to explicitly call flush to make sure that all buffered messages are sent
  • The configuration parameters of the client follow the Java naming conventions. So the bootstrap servers, for instance, are defined by a configuration parameter called bootstrap.servers instead of bootstrap_servers, and the parameter itself is not a Python list but a comma-separated list passed as a string
  • The base producer class accepts bytes as values and does not invoke a serializer (there is a derived class doing this, but this class is flagged as not yet stable in the API documentation so I decided not to use it)

To turn on idempotent writes, there are a couple of parameters that need to be set in the producer configuration.

  • enable.idempotence needs to be 1 to turn on the feature
  • acks needs to be set to “all”, i.e. -1
  • should be set to one
  • retries needs to be positive (after all, idempotent writes are designed to make automated retries safe)
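Putting the points from the two lists above together, a producer using idempotent writes could be sketched roughly as follows. This is not the test client from the repository but a minimal illustration; the broker address broker1:9092, the retry count of five and the helper names build_config, on_delivery and send_records are assumptions made for this example, and only the three settings that are named in the list above are set.

```python
import json


def build_config(bootstrap_servers):
    """Producer configuration with idempotent writes switched on."""
    return {
        # A comma-separated string, not a Python list.
        "bootstrap.servers": ",".join(bootstrap_servers),
        "enable.idempotence": 1,
        "acks": "all",
        "retries": 5,  # any positive value; 5 is an arbitrary choice
    }


def on_delivery(err, msg):
    # Invoked from within poll() or flush() once the broker has answered.
    if err is not None:
        print(f"Delivery failed: {err}")


def send_records(producer, topic, count):
    for i in range(count):
        # The base producer expects bytes, so we serialize ourselves.
        payload = json.dumps({"msg_count": i}).encode("utf-8")
        producer.produce(topic, value=payload, callback=on_delivery)
        producer.poll(0)  # serve pending delivery callbacks
    producer.flush()  # wait until all buffered messages are sent


def main():
    # Requires the confluent-kafka package and a reachable cluster.
    import confluent_kafka

    producer = confluent_kafka.Producer(build_config(["broker1:9092"]))
    send_records(producer, "test", 10)
```

Calling main() against a running cluster should write ten records to the topic. The producer is passed into send_records as an argument so that the sending logic can be exercised with a stub object as well.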

Using these instructions, it is now straightforward to put together a little test client that uses idempotent writes to a “test” topic. To try this, bring up the Kafka cluster as in the previous posts, create a topic called “test” with three replicas, navigate to the root of the repository and run

python3 python/

You should see a couple of messages showing the configuration used and indicating that ten records have been written. To verify that these records do actually contain a producer ID and a sequence number, we need to dump the log file on one of the brokers.

vagrant ssh broker1
/opt/kafka/kafka_2.13-2.4.1/bin/ \
  --print-data-log \
  --files /opt/kafka/logs/test-0/00000000000000000000.log

The output should look similar to the following sample output.

Dumping /opt/kafka/logs/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 3001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1589818655781 size: 291 magic: 2 compresscodec: NONE crc: 307611005 isvalid: true
| offset: 0 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 0 headerKeys: [] payload: {"msg_count": 0}
| offset: 1 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 1 headerKeys: [] payload: {"msg_count": 1}
| offset: 2 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 2 headerKeys: [] payload: {"msg_count": 2}
| offset: 3 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 3 headerKeys: [] payload: {"msg_count": 3}
| offset: 4 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 4 headerKeys: [] payload: {"msg_count": 4}
| offset: 5 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 5 headerKeys: [] payload: {"msg_count": 5}
| offset: 6 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 6 headerKeys: [] payload: {"msg_count": 6}
| offset: 7 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 7 headerKeys: [] payload: {"msg_count": 7}
| offset: 8 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 8 headerKeys: [] payload: {"msg_count": 8}
| offset: 9 CreateTime: 1589818655781 keysize: -1 valuesize: 16 sequence: 9 headerKeys: [] payload: {"msg_count": 9}

Here, the third line contains the header of the entire record batch. We see that the batch contains ten records, and we find a producer ID (3001). In each of the records, we also see a sequence number, ranging from 0 to 9.


When you read KIP-98, the Kafka improvement proposal with which idempotent writes were introduced, you realize that the main objective of this KIP is not just to provide idempotent writes, but to be able to handle transactions in Kafka. Here, handling transactions does not mean that Kafka somehow acts as a distributed transaction manager, joining transactions of a relational database. It does, however, mean that writes and reads in Kafka are transactional in the sense that a producer can write records within a transaction, and consumers will either see all of the records written as part of this transaction or none of them.

This makes it possible to model scenarios that occur quite often in business applications. Suppose, for instance, you are putting together an application handling security deposits. When you sell securities, you produce one record which will trigger the delivery of the securities to the buyer, and a second record that will trigger the payment that you receive for them. Now suppose that the first record is written, and then something goes wrong, so that the second record cannot be written. Without transactions, the first record would be in the log and consumers would pick it up, so that the security side of the transaction would still be processed. With transactions, you can abort the transaction, and the record triggering the security transfer will not become visible for consumers.

We will not go into details about transactions in this post, but KIP-98 is actually quite readable. I also recommend that you take a look at this well written blog post on the Confluent pages that provides some more background and additional links.

With that, it is time to close this short series on Kafka and Python. I hope I was able to give you a good introduction to the architecture and operations of a Kafka cluster and a good starting point for your own projects. Happy hacking!

Learning Kafka with Python – a deep dive into consumers and rebalancing

In the previous posts, we have already used the Python client to implement Kafka consumers. Today, we will take a closer look at the components that make up a consumer and discuss their inner workings and how they communicate with the Kafka cluster.

High level overview of the consumer

Our discussion will be based on the Kafka Python library, which seems to be loosely modeled after the Java consumer which is part of the official Apache Kafka project, so that the underlying principles are the same. These notes are based on version 2.0.1 of the library, the design might of course change in future versions (and has already changed substantially in the past).

Looking at the code, we see that roughly speaking, the consumer consists of three parts – the actual consumer in the package kafka.consumer, the coordinator in the package kafka.coordinator, which is responsible for talking to the group coordinator and assigning partitions, and the network client in the top-level package, which is used by other parts of the library as well. Broken down to the level of modules and classes, the following diagram shows the most important components of the consumer and their relations.


Let us start our discussion with the class on the left hand side of the diagram, the subscription state. This class is used to manage the topics and partitions a consumer has subscribed to as well as the positions of the consumer within these partitions. Note that these positions are not the committed offsets, but are the positions maintained locally (and in-memory) by the consumer that are used to determine the offset that the next fetch will use. Initially, there is no valid position for a newly assigned partition, and the partition is considered fetchable only once a position has been determined.

The second class which is used by the consumer is the fetcher. As the name suggests, this class is in charge of actually fetching data and offsets from the leader of a partition (here, offsets does not refer to committed offsets, but to the valid offsets, i.e. the first and last offset of a partition).

Fetching records from the partition leader typically works asynchronously. As an example, let us consider the method send_fetches. As indicated above, a partition is called fetchable if there is a valid position for it, the partition has not been paused and there are no unfetched records already present in the cache. After creating a list of all fetchable partitions, the send_fetches method then figures out the partition leader and assembles a fetch request. These requests are then sent to the respective partition leader using the client object. This operation returns a future, i.e. a handle which can be used to asynchronously track the progress of the fetch operation. Attached to this future, there is a callback operation. When the records are sent from the partition leader to the consumer, the client object will invoke this callback which will then add the returned records to a queue maintained by the fetcher. From there, they are retrieved when a consumer calls the method fetched_records.

It is in this function where the positions are actually updated, so that the position really reflects the records that have been consumed, not those which have been received by the fetcher but are still in the queue. Note that records are skipped if a partition has become unfetchable in the meantime or if the offset does not match the expected value in the original request.
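The interplay between the callback, the queue and the position can be illustrated with a heavily simplified model. The class ToyFetcher below is made up for this purpose and is not the code of the library; it only mimics the behaviour described above, namely that the callback merely queues the records and that the position moves when the records are handed out.

```python
from collections import deque


class ToyFetcher:
    """Simplified model of the fetch queue; not the library's actual code."""

    def __init__(self, positions):
        self.positions = dict(positions)  # partition -> next offset to consume
        self.queue = deque()              # records received but not yet consumed

    def on_fetch_response(self, partition, fetch_offset, records):
        # Plays the role of the callback attached to the future of a fetch.
        self.queue.append((partition, fetch_offset, records))

    def fetched_records(self):
        result = []
        while self.queue:
            partition, fetch_offset, records = self.queue.popleft()
            if self.positions.get(partition) != fetch_offset:
                # Partition no longer fetchable or position has moved: skip.
                continue
            result.extend(records)
            # Only now does the position advance.
            self.positions[partition] = fetch_offset + len(records)
        return result


fetcher = ToyFetcher({0: 10})
fetcher.on_fetch_response(0, 10, ["a", "b"])
position_before = fetcher.positions[0]   # still 10, records are only queued
records = fetcher.fetched_records()
position_after = fetcher.positions[0]    # 12, records have been handed out
```

A response that belongs to an outdated fetch offset is dropped without touching the position, which corresponds to the skipping of records mentioned above.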

The following diagram shows a simplified view of how records are fetched (some important details are skipped, for instance the deserialization that takes place when fetched records are removed from the queue and handed over to the consumer).


Coordinating group membership and partition assignments

Apart from fetching records, a core responsibility of the consumer is to manage the membership in a consumer group and to handle assigned partitions. This is done by the coordinator. The coordinator communicates with the group coordinator (which is one dedicated broker per consumer group) to trigger the addition and removal of group members and to balance partitions between group members. In addition, the coordinator is responsible for managing committed offsets.

Looking at the source code of the coordinator, we can see how the process of adding members to the group and assigning partitions works. This process, commonly referred to as rebalancing, typically starts when a consumer invokes the poll method of the coordinator. When this happens, the coordinator will first check whether it needs to join (or rejoin) the group, for instance because the consumer was just started. If yes, the processing in ensure_active_group will first prepare the join process, for instance by committing all offsets if auto-commit is enabled and calling the revoke method of all registered rebalance listeners (conceptually, when a rebalancing starts, all existing members will lose ownership of previously handled partitions and consequently stop processing records so that the group coordinator can reassign partitions freely – there is an ongoing effort known as cooperative rebalancing with the objective to change this).

We then wait until there are no more in-flight requests to the coordinator, and then send a JoinGroupRequest to the group coordinator. The group coordinator (broker) will wait until all members have handed in their requests (see below for more on the timeline) and then determine one member to be the group leader. As part of the JoinGroupResponse, every consumer will be informed about the newly elected leader. The group leader will then perform the actual assignment of partitions to group members (using a configurable assignor). Then, all group members send another request to the group coordinator, called the SyncGroupRequest. In this request, the group leader will inform the group coordinator about the defined partition assignments, and in the response to this message, the partition assignments will be distributed to all group members.

Once the SyncGroupResponse has been received, the method ensure_active_group will invoke _on_join_complete which will in turn trigger a call of the on_partitions_assigned method of all registered rebalance listeners. Note that at this point, all exceptions raised by the listener are swallowed, so exceptions should be caught and handled inside the listener.

This is all nice if our own consumer joins a group, but what happens if another consumer joins? This is where the heartbeat thread comes into play. This is a thread which is running in the background and periodically sending heartbeat messages to the group coordinator (with a frequency determined by the parameter heartbeat_interval_ms). If a rebalancing has been initiated by another member joining or leaving, the heartbeat response will have an error flag set, so that the consumer learns about the start of the rebalancing process. It then sets a flag, which will be evaluated during the next call of the coordinator's poll method, which is in turn invoked from the consumer's poll loop. If this flag is set, the coordinator will rejoin the group following the process outlined above.

At this point, timing is vital. If a consumer does not call the poll method for a long period of time, it might miss a rebalancing and will forcefully be removed from the group. This again will lead to errors when the consumer tries to commit offsets, which are difficult to handle and almost inevitably lead to duplicate processing. In general, a consumer should invoke the poll method on a regular basis, and there is again a parameter (max_poll_interval_ms) which determines the maximum allowed time between two subsequent invocations of this method.

Indirectly, this parameter also determines how long the group coordinator will wait for members to join the group (it is sent to the group coordinator as part of the join group request). The following diagram shows the typical sequence of events when a new member joins a group and triggers a rebalancing.


The consumer's poll loop

After all these preparations, we are now ready to discuss the poll method of the Kafka consumer. In this method (or rather the private method _poll_once), we first use the coordinator and its poll method discussed above to verify that the consumer is part of a group and has partitions assigned and to trigger a rebalancing process if needed. Note that if a rebalancing is needed, this call will block, which ensures that we only reach the main part of the consumer's poll method after the rebalancing is done.

Next, we will typically have to update all fetch positions. This happens in several steps.

  • First, we call the method reset_offsets_if_needed of the fetcher. This method will check a flag to see if any offsets need to be reset. If yes, it will retrieve the valid offsets and apply the chosen offset reset strategy
  • If there are still partitions which do not have a valid position, we call the method refresh_committed_offsets_if_needed of the coordinator which will fetch the committed offsets from the group coordinator
  • Then, the method update_fetch_positions of the fetcher is invoked which will set the fetch positions of the partitions in question to the committed value

Back in _poll_once, we then check whether the fetcher has any previously obtained records still in its queue. If yes, we immediately return this data (and at the same time initiate a pre-fetch of the next records). Recall that the process of getting these queued records also triggers the update of the position. Then, new fetches are sent, and we poll the client until we either time out or obtain new records which we then return.

Summarizing, the diagram below displays the (slightly simplified) flow of events in case a consumer calls poll (where some calls indicated in the diagram are not made every time, depending on available fetch positions and committed offsets).


From what we have said above, it is now clear that a rebalancing listener is always invoked from within the poll method – which also implies that you should not spend too much time in a rebalance listener and not make any blocking calls.

This completes our short summary of the processing inside the Kafka consumer. With this introduction and using the Java library and the rich comments inside the code, you should now be able to dig deeper into the bits and pieces if needed.

Learning Kafka with Python – implementing a database sink

Very often, either the source or the target of a Kafka based message queue is a classical relational database. Consuming data and using it to update a database table sounds straightforward, but poses a few challenges around reliability and delivery semantics. In this post, we look into two options to realize such an architecture.

The challenge

To illustrate the problem we are aiming to solve, let us suppose that we want to build an application that maintains an account balance. There is a front-end which acts as a Kafka producer and which a customer can use to either deposit money in the account or withdraw money. These transactions are then written into a Kafka topic, and a consumer reads from this topic and updates the balance kept in a relational datastore.


Thus the messages stored in the Kafka topic contain transactions, i.e. changes in the account balance, while the database table we need to maintain contains the actual balance. This is an example of what is called stream / table duality in the world of streaming – the event stream is the source of truth and reflects changes, the database contains the resulting state of the world after all changes have been applied and can at any time be reconstructed from the stream.

When we want to implement this pattern, the crucial part of our design will be to make sure that every message in the queue leads to exactly one update of the balance, so that no transaction is missed and no transaction is processed twice.

Pattern 1: using a message ID and de-duplication

The first pattern we could use to make this work is to achieve de-duplication based on a unique message ID, which ideally is an integer that is increased with every message. The consumer could then store the sequence number of the last processed message in the database, and could thus detect duplicates.

In a bit more detail, this would work as follows. When the producer processes an action, it first retrieves a unique message ID. This message ID could be created using a database sequence, or – if the used database does not allow for this – it could read a sequence number from a table, increment it by one and update the table accordingly. It would then add this sequence number as a key to the Kafka message.

The consumer would store the latest processed sequence number in the database. When it reads a message from Kafka, it uses this number to check whether the message has already been processed before. If not, it updates balance and sequence number in one transaction and then commits the new offset to Kafka.


Let us see how duplicates are handled in this pattern. If, for some reason, the topic contains two messages with the same ID, the consumer will process the first message, increase the latest processed sequence number in the database and commit the new balance. It will then read the duplicate, compare its sequence number against the latest value, detect the duplicate and simply ignore it. Thus the consumer is able to do a de-duplication based on the sequence number.

Unfortunately, this simple pattern has one major disadvantage – it does not work. The problem is that the order of messages is not guaranteed across partitions. Suppose, for instance, that the producer creates the following messages:

  • Message 1, partition 0
  • Message 2, partition 1
  • Message 3, partition 0

Now it might happen that the consumer processes the messages in the order 1,3,2. Thus the consumer would, after having processed message 3, set the highest consumed sequence number to “3” in the database. When now processing message two, our simple duplicate detection algorithm would then classify this message as a duplicate. Thus, to make this work, it is vital that we store the highest processed sequence number by partition and not across all partitions. We could even create the sequence number per partition, which would also remove a possible bottleneck as creating the sequence number would otherwise effectively serialize the producers.
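The difference between the two variants is easy to reproduce in a few lines of Python. The function deduplicate below is a hypothetical helper written for this illustration (it is not part of the repository) and assumes that sequence numbers start at one.

```python
def deduplicate(messages, per_partition):
    """Return the sequence numbers that are accepted as new.

    messages is a list of (sequence_number, partition) tuples in the
    order in which the consumer happens to see them.
    """
    highest = {}  # key -> highest sequence number processed so far
    accepted = []
    for sequence, partition in messages:
        key = partition if per_partition else "all"
        if sequence <= highest.get(key, 0):
            continue  # classified as a duplicate
        highest[key] = sequence
        accepted.append(sequence)
    return accepted


# Messages 1 and 3 are in partition 0, message 2 is in partition 1,
# and the consumer sees them in the order 1, 3, 2.
consumed = [(1, 0), (3, 0), (2, 1)]
global_result = deduplicate(consumed, per_partition=False)    # [1, 3]
partition_result = deduplicate(consumed, per_partition=True)  # [1, 3, 2]
```

With one number across all partitions, message 2 is wrongly dropped. With one number per partition, all three messages are accepted, and a genuine duplicate within a partition is still detected.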

Also note that, as we need to maintain a “last processed” sequence number per partition in a database, we also need to maintain this table if we add new partitions to our topic or remove partitions.

Alternatively, if there is a message ID which is not ordered and increasing with each message, the consumer could store all processed message IDs in a separate database table to keep track of the messages that have already been processed (which, depending on the throughput, might require some sort of periodic cleanup to avoid that the table grows too big).

If the consumer fails after committing the changed balance to the database, but before committing the offset to Kafka, the same mechanism will kick in, as long as we commit the new balance and the updated value of the last processed message in one database transaction. Thus, duplicates can be detected, and we can therefore rely on the standard mechanisms Kafka offers to manage the offset – we could even read entire batches from the topic and use auto-commit to let Kafka manage the offset.

Let us now try out this pattern with the code samples from my repository. To be able to run this, you will have to clone my GitHub repository and follow the instructions in my initial post in this series to bring up your local Kafka cluster. Then, make sure that your current working directory is the root directory of the repository and run the following commands which will install the Python package to access a MySQL database and bring up a Docker based installation of MySQL with a prepared database.

pip3 install mysql-connector-python

This script will start a Docker container kafka-mysql running MySQL, add a user kafka with a default password to it and create a database kafka for which this user has all privileges. Next, let us run a second script which will (re)-create a Kafka topic transactions with two partitions and initialize the database.


Now we run three Python scripts. The first script is the producer that we have already described, which will simply create ten records, each of which describes a transaction. The second script is our consumer. It will

  • subscribe to the transactions topic
  • read batches of records from the topic
  • for each record, it will (in one transaction!) update the account balance and the last processed sequence number
  • commit offsets in Kafka after processing a batch
  • apply the duplicate detection mechanism outlined above while processing each record

Finally, the third script is a little helper that will (without committing any offsets, so that we can run it over and over again) scan the topic, calculate the expected account balances, retrieve actual account balances from the database and check whether they coincide.

python3 db/
python3 db/
python3 db/ --check

Let us now try to understand how our scripts work if an error occurs. For that purpose, the consumer has a built-in mechanism to simulate random errors between committing to the database and committing offsets to Kafka which is activated using the parameter --error_probability. Let us repeat our test run, but this time we simulate an error with a probability of 20%.

./db/ && python3 db/
python3 db/ --error_probability=0.2 --verbose
python3 db/ --check

You should now see that the consumer processes a couple of messages before our simulated error kicks in, which will make the consumer stop. The check script should detect the difference resulting from the fact that not all messages have been processed. However, when we now restart the script without simulating any errors, we should see that even though there are duplicate records, these records are properly detected and eventually, all records will be processed and the balances will again be correct.

python3 db/ --verbose
python3 db/ --check

Pattern 2: store the consumer offset in the target database

Let us now take a look at an alternative implementation (which is in fact the pattern which you will hit upon first when consulting the Kafka documentation or other sources) – maintaining the offsets in the database altogether. This implementation does not require a sequence number generated by the producer, but is therefore also not able to detect any duplicate messages in case the duplicates originate already in the producer.

The idea behind this pattern is simple. Instead of asking Kafka to maintain offsets for us, the consumer application handles offsets independently and maintains a database table containing offsets and partitions. When a record is processed, the consumer opens a database transaction, updates the account balance and updates the offset in the same transaction. This guarantees that offsets and balances are always in sync.

This sounds simple enough, but there are a few subtleties that need to be kept in mind when this is combined with consumer groups, i.e. if Kafka handles partition assignments dynamically. Whenever a partition is assigned to our consumer, we need to make sure that we position the consumer at the latest committed offset before processing any records. Conversely, when an assignment is revoked, we need to commit the current offsets to make sure that they are not lost. This can be implemented using a rebalance listener which is registered when we subscribe to the topic.
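A rough sketch of how these pieces could fit together is shown below. Again, sqlite3 stands in for MySQL so that the example runs without a database server, TopicPartition is a local stand-in for the class of the same name in the Kafka Python library, and the names OffsetStore and SeekToStoredOffset as well as the table layout are assumptions made for this illustration, not the code in the repository.

```python
import sqlite3
from collections import namedtuple

# Stand-in for kafka.TopicPartition so that the sketch runs without a cluster.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


class OffsetStore:
    def __init__(self, conn):
        self.conn = conn
        conn.execute(
            "CREATE TABLE offsets (part INTEGER PRIMARY KEY, next_offset INTEGER)"
        )
        conn.execute("CREATE TABLE balance (id INTEGER PRIMARY KEY, amount INTEGER)")
        conn.execute("INSERT INTO balance VALUES (1, 0)")
        conn.commit()

    def next_offset(self, partition):
        row = self.conn.execute(
            "SELECT next_offset FROM offsets WHERE part = ?", (partition,)
        ).fetchone()
        return row[0] if row else 0

    def apply(self, partition, offset, amount):
        with self.conn:  # balance and offset change in the same transaction
            self.conn.execute(
                "UPDATE balance SET amount = amount + ? WHERE id = 1", (amount,)
            )
            self.conn.execute(
                "INSERT OR REPLACE INTO offsets VALUES (?, ?)", (partition, offset + 1)
            )


class SeekToStoredOffset:
    """Rebalance listener; with the Kafka Python library it would derive
    from kafka.ConsumerRebalanceListener."""

    def __init__(self, consumer, store):
        self.consumer = consumer
        self.store = store

    def on_partitions_assigned(self, assigned):
        for tp in assigned:
            # Continue where the database says we stopped.
            self.consumer.seek(tp, self.store.next_offset(tp.partition))

    def on_partitions_revoked(self, revoked):
        # Nothing is pending in this sketch, as apply() commits per record.
        pass
```

In this sketch every record is committed together with its offset, so there is nothing left to save when partitions are revoked. A consumer that commits in batches would have to write its pending offsets in on_partitions_revoked.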


To try this out, run the following commands which will first reset the database and the involved topic, run the producer to create a few messages (this time without the additional sequence number) and then run our new consumer to read the messages and update the database.

./db/ && python3 db/
python3 db/ 
python3 db/ --check

The last command, which again checks the updated database table against the expected values, should again show you that expected and actual values in the database match.

It is instructive to try a few more advanced scenarios. You could, for instance, run the producer and then start a consumer which reads the first set of messages produced by the producer (use the switch --runtime=3600 to make this consumer run for one hour). Then, start a second consumer in a separate terminal window and observe the rebalancing that occurs. Finally, run the producer again and verify that the partition assignment worked and both consumers are processing the messages in their respective partition. And again, you can simulate errors along the way and see how the consumer behaves.

Learning Kafka with Python – consuming data

We now understand how Kafka producers add data to partitions. So let us move on and take a look at consumers – how they operate, how they are configured and how different levels of reliability and delivery guarantees can be achieved.

Consumer groups

In the previous post on producers, we have seen that the interaction between a producer and a Kafka broker is rather simple. Basically, producers request metadata to obtain data on partitions and leading brokers and then send records to the partition leader. The Kafka broker does not keep track of a producer's state, and producers can actually come and go without Kafka even noticing it (this is a bit different when transactions are used, as in this case, the broker needs to keep track of the producer's state as well, but this is beyond the scope of this post).

For consumers, the situation is different. The main reason for this different design is that while a producer determines itself to which partition data is written, a consumer typically lets Kafka make this decision. In this programming model, Kafka distributes the available partitions to the available consumers, trying to balance the load evenly. If a new consumer appears, Kafka will assign partitions to it, and if a consumer goes down, Kafka will re-assign these partitions to one of the remaining consumers. To make this work, Kafka needs to keep track of the state of a consumer (in fact, a consumer is expected to send periodic heartbeats so that Kafka can detect when a consumer goes down, and Kafka tracks the state of consumers as part of the ZooKeeper data structures).

To better understand how this works, we first have to understand consumer groups. Logically, a consumer group is very similar to an application – it is a logical entity reading data from a topic. If, for instance, you are using Kafka to distribute instrument master data in a securities processing application, there will typically be different application components that need this data – say a trading frontend, a settlement module or a tax processing module. So each of these application components could be set up as a consumer group in Kafka, so that they all obtain records from the instrument master data topic independently, similar to the pub/sub semantics of a traditional messaging system.

To increase scalability and fault tolerance, there can be many consumers inside a consumer group, but Kafka will try to make sure that within a consumer group, every message is delivered to only one consumer.


To achieve this, Kafka will assign partitions in a topic to consumers within a consumer group. To make sure that each message is only processed once by one consumer group, each partition can be assigned to only one consumer, but if there are more partitions than consumers, a single consumer can read from more than one partition (so scaling the number of consumers beyond the number of partitions will lead to idle consumers).
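The effect of this rule can be seen in a small simulation. The function assign_round_robin below is a deliberately naive helper made up for this post and not one of the assignors that Kafka actually ships, but it respects the same constraint that every partition belongs to exactly one consumer of the group.

```python
def assign_round_robin(partitions, consumers):
    """Give every partition to exactly one consumer of the group."""
    assignment = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(partitions):
        consumer = consumers[index % len(consumers)]
        assignment[consumer].append(partition)
    return assignment


# Three partitions, two consumers: one consumer reads two partitions.
two_consumers = assign_round_robin([0, 1, 2], ["c1", "c2"])
# Three partitions, four consumers: the fourth consumer stays idle.
four_consumers = assign_round_robin([0, 1, 2], ["c1", "c2", "c3", "c4"])
```

In the second case, the consumer c4 ends up with an empty list, which is the idle consumer mentioned above.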


It is worth mentioning that this is not the only programming model supported by Kafka. Instead of letting Kafka determine the assignment of partitions to consumers, consumers can also subscribe directly to a partition and thus define their own assignment. In this case, consumers need to implement their own mechanisms to detect a change in the number of partitions or to rebalance the load if a consumer goes down. This can, however, be useful if the number of partitions is constant and lost consumers are immediately replaced by some sort of restart mechanism. If you want to take a closer look at how exactly Kafka manages the assignment of partitions to consumers in a group, take a look at this excellent blog post on the Confluent web site or this page on the Confluent Wiki.

Maintaining the offset

The next problem we have to solve is the maintenance of the offset. A traditional messaging system typically makes sure that a message is only delivered once. With Kafka, this task is left to the consumer (this is why some people refer to this model as the “dumb broker – smart consumer” model). In fact, the low level API “FETCH” call of the Kafka protocol expects that a consumer specifies the offset of the record (batch) it wants to read. So the consumer needs to know which offsets it has already processed to make sure that all records are read and that no record is processed twice.

This is not an easy task and subject to race conditions. Suppose, for instance, we decide to store the offset in a separate database, and our processing logic is (in a hopefully readable pseudo-code)

offset = db.read_offset()
while true:
  record = read_record(offset)
  process_record(record)
  offset = offset + 1
  db.write_offset(offset)

Now suppose that this consumer fails after processing the record, but before writing the updated offset into the database. When we now restart the consumer, it will read the old offset from the database and process the last record twice. If, conversely, we change the order and commit the new offset before processing the record, we would miss a record if the consumer dies between these two steps.
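Both failure scenarios can be played through with a small simulation. The function run below is a hypothetical helper in which a list stands in for the topic and a plain variable for the offset stored in the database; the parameter crash_at marks the offset at which the consumer dies between the two steps.

```python
def run(records, stored_offset, commit_first, crash_at=None):
    """Process records starting at stored_offset.

    Returns a tuple (processed, stored_offset) describing what has been
    processed in this run and which offset is stored at the end.
    """
    processed = []
    offset = stored_offset
    while offset < len(records):
        if commit_first:
            stored_offset = offset + 1           # write the offset first
            if offset == crash_at:
                return processed, stored_offset  # die before processing
            processed.append(records[offset])
        else:
            processed.append(records[offset])    # process the record first
            if offset == crash_at:
                return processed, stored_offset  # die before writing the offset
            stored_offset = offset + 1
        offset += 1
    return processed, stored_offset


records = ["r0", "r1", "r2"]

# Process first, then store the offset: r1 is handled twice after a restart.
seen, stored = run(records, 0, commit_first=False, crash_at=1)
at_least_once = seen + run(records, stored, commit_first=False)[0]

# Store the offset first, then process: r1 is never handled.
seen, stored = run(records, 0, commit_first=True, crash_at=1)
at_most_once = seen + run(records, stored, commit_first=True)[0]
```

The first ordering yields r0, r1, r1, r2, i.e. a duplicate, whereas the second ordering yields r0, r2 and silently loses a record.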

Instead of persisting the offset yourself, you can also ask Kafka to do this for you. When making use of this option, Kafka will store your offset in a dedicated topic. A consumer can either explicitly commit the offset to this topic, or can use auto-commit, which simply means that Kafka will automatically commit every few seconds (which, of course, leads to duplicate processing if this interval is, say, 5 seconds and the consumer dies 4 seconds after the last commit). In a later post, we will look into transactional writes, which even allow exactly-once delivery as long as no other data stores are involved.

Creating and using a KafkaConsumer

Let us now see how we can create and use a consumer with the Python Kafka API and how the consumer is configured.

First, we need to create a consumer object. When creating a consumer, there are three parameters that we need to provide: the topic from which we want to read data, a list of bootstrap servers and the ID of the consumer group that the consumer is part of. The group ID is an arbitrary string and is needed at least if we plan to use the automatic assignment of partitions and / or want Kafka to store offsets for us. So a code snippet creating a consumer could be as follows.

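import kafka
consumer = kafka.KafkaConsumer("test", group_id="test-group", bootstrap_servers=["broker1:9092", "broker2:9092"])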

Once we are done with a consumer, we should always clean up again by calling consumer.close() so that the consumer can properly leave the group.

When using SSL to connect to the broker, you will again have to provide additional parameters when building the consumer, as we have done it for the producer.

As for the producer, a consumer can also be configured with custom deserializers. If, for instance, we use JSON as a serializer, as we have demonstrated in the previous post on building a producer, we now need to provide a matching deserializer that converts a byte stream back into the target format used by the application. As for the producer, deserializers for keys and payloads can be supplied using the additional configuration parameters key_deserializer and value_deserializer.
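As an illustration, a JSON deserializer matching a UTF-8 based JSON serializer could look like the following sketch. The function names serialize and deserialize are chosen for this example only, and the handling of empty payloads is an assumption about what an application might want.

```python
import json


def serialize(data):
    # Convert a JSON-compatible Python object into a sequence of bytes
    return bytes(json.dumps(data), "utf-8")


def deserialize(raw):
    # Convert the bytes received from Kafka back into a Python object;
    # a record without payload is passed through as None (an assumption)
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))
```

The function would then be handed over when the consumer is created, i.e. by adding value_deserializer=deserialize to the arguments of kafka.KafkaConsumer.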

As mentioned above, one option to deal with offsets is to leave the processing to Kafka and to ask Kafka to automatically commit offsets for us. This is in fact the default behavior, and it is controlled by the following parameters.

  • enable_auto_commit – this is a boolean flag which tells Kafka whether we want to automatically commit offsets, and defaults to true
  • auto_commit_interval_ms – this specifies the interval at which Kafka will commit offsets. The default is five seconds, which implies that in the worst case, the messages processed during the last five seconds will be consumed twice if your consumer fails shortly before a commit
  • auto_offset_reset – this parameter determines from which offset Kafka should start processing if no valid offset can be found. This clearly happens when we start the consumer for the first time, but can also happen if messages are deleted or are lost. If we set this to “earliest”, Kafka will start the processing at the first available offset. If we use “latest”, it will start processing at the end of the log, i.e. with the next message that will be added to the log. The default is “latest”
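To make this a bit more tangible, here is how these three settings could be spelled out explicitly. This is a sketch only: the group name and the broker addresses are placeholders, and the values are examples rather than recommendations.

```python
# Keyword arguments for a consumer, collected in a dictionary
consumer_config = {
    "group_id": "test-group",                               # placeholder group name
    "bootstrap_servers": ["broker1:9092", "broker2:9092"],  # placeholder brokers
    "enable_auto_commit": True,        # let Kafka commit offsets for us
    "auto_commit_interval_ms": 5000,   # commit every five seconds
    "auto_offset_reset": "earliest",   # start at the first available offset if none is stored
}

# With kafka-python installed, the dictionary could be passed on like this:
# consumer = kafka.KafkaConsumer("test", **consumer_config)
```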

Before we can read any data, we have to subscribe to a topic. When creating the consumer, we already refer to a topic, and in fact, the consumer will automatically subscribe to this topic. It is also possible to manually subscribe. This is typically done when you want to add a rebalance listener to be informed about changes in the set of assigned partitions. A rebalance listener is any class derived from kafka.ConsumerRebalanceListener which is passed as argument to the subscribe call. Whenever a partition assignment is made or revoked, Kafka will then call the corresponding method of the listener.


When an application wants to manually store offsets, for instance in a database, it can use this mechanism and / or the method consumer.assignment() to keep track of the records assigned to it. Note that, as explained in the source code comments of the listener class, Kafka will first invoke the on_partitions_revoked method of all listeners before calling any of the on_partitions_assigned methods. These handlers will be invoked from the polling loop, i.e. only when you pass control to the consumer by reading data from it, not in a separate thread (we will learn more about the exact mechanics of this process in a separate future post).
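A minimal sketch of such a listener could look as follows. The class name TrackingListener and its attribute assigned are made up for this example, and the fallback in the import only exists so that the snippet can be run without the library being installed.

```python
try:
    from kafka import ConsumerRebalanceListener
except ImportError:
    # Fallback so that the sketch also runs without kafka-python installed
    ConsumerRebalanceListener = object


class TrackingListener(ConsumerRebalanceListener):
    """Keeps track of the partitions currently assigned to this consumer."""

    def __init__(self):
        self.assigned = set()

    def on_partitions_revoked(self, revoked):
        # Invoked first, before any new assignment is made
        self.assigned -= set(revoked)
        print("Revoked:", sorted(revoked))

    def on_partitions_assigned(self, assigned):
        # Invoked once the new assignment is known
        self.assigned |= set(assigned)
        print("Assigned:", sorted(assigned))
```

An instance of this class would be passed to the consumer via consumer.subscribe(["test"], listener=TrackingListener()). An application storing offsets in a database could use on_partitions_revoked to save its current positions.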

Now let us see how we can actually read data from a topic. The library offers two options to do this. First, we can simply invoke the poll method of the consumer object, which will return a batch of records. Alternatively, and more “pythonic”, we can treat the consumer object as an iterator and simply loop over it to get one record at a time.

Note that some methods of the consumer can block as they are waiting for responses from the server. In general, consumers should make sure not to block outside of the polling loop, and it is not advisable to call the consumer's methods from separate threads. My experience is that it can lead to problems if a signal handler, for instance, invokes methods of the consumer to shut down the consumer. Instead, the handler should only set a stop flag, and all methods of the consumer object should be invoked from the polling loop.

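stop = False
def signal_handler(signum, frame):
  global stop  # without this, the assignment below would only create a local variable
  stop = True

while not stop:
  for record in consumer:
    print(record.value)  # process the record here
    if stop:
      break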

Let us now discuss different options to commit offsets. We have already seen that the default is auto-commit, which implies that Kafka will commit automatically every 5 seconds. When using this option, we can guarantee that all messages will be read at least once, but need to be prepared to receive messages more than once. If we need full control over the process of committing offsets, we need to disable auto-commit by setting enable_auto_commit to false.

At this point, it is important to remember that the Kafka client requests data from the broker in batches. If we ask the client to commit the offset, it will commit the entire batch. It therefore does not make sense to commit during every iteration of the loop above, but only once at the end of each batch. As the iterator interface of the consumer object makes it difficult to determine when a batch has ended, it is easier to use the poll method of the consumer. This method returns a dictionary, where the keys are TopicPartition objects, i.e. named tuples describing a combination of topic and partition, and the values are lists of records.

When using manual commits, we again have several choices. First, we can commit after every record. In this way, we will have at most one duplicate in case of an error, but create an additional overhead and reduce our throughput significantly. Alternatively, we can use a batch size greater than one and commit after each batch. This will be more efficient, but if the processing fails in the middle of the batch, we will re-read the first few records in the batch when we restart and thus process records twice.
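The second variant, committing once per batch, could be sketched as follows. The function consume_batches and the callback handle are hypothetical names. The consumer argument is assumed to be a KafkaConsumer created with enable_auto_commit set to False, but any object offering poll and commit will do.

```python
def consume_batches(consumer, handle, max_polls, timeout_ms=1000):
    """Poll a limited number of times and commit once per non-empty batch."""
    for _ in range(max_polls):
        # poll returns a dictionary mapping partitions to lists of records
        batch = consumer.poll(timeout_ms=timeout_ms)
        if not batch:
            continue
        for partition, records in batch.items():
            for record in records:
                handle(partition, record)
        # All records of the batch are processed, so commit the offsets now
        consumer.commit()
```

If handle raises an exception in the middle of a batch, the commit is skipped, and the records of this batch will be delivered again after a restart.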

Trying it out

Let us now see how this works in practice. If you have cloned my GitHub repository and installed Kafka as described in my previous post, you are ready to run some examples that are part of the repository and located in the python subdirectory. First let us delete and re-create the topic that we have already used for our producer tests by running the following commands on the lab PC (after changing to the repository root directory)

./kafka/bin/ \
  --bootstrap-server=$(python/ \
  --command-config ./.state/ \
  --topic test \
./kafka/bin/ \
  --bootstrap-server=$(python/ \
  --command-config ./.state/ \
  --topic test \
  --create \
  --partitions 2 \
  --replication-factor 3

Next, we can create 10 messages in this topic by running the producer that we have already used in the previous post.

python3 python/ --create_keys

We can now run our consumer to read the messages that we have just written. To do this, simply enter

python3 python/ 

Looking at the output, we see that the first attempt to poll triggers a partition reassignment. First, the coordinator will revoke the existing group assignments for all group members. Then it will assign the existing two partitions to our consumer, as this is the only consumer in the group, so that our listener is called. As this is the first read, there are no committed offsets yet, and as we have set auto_offset_reset to “earliest”, we start our read at position zero (the first offset).

We now start to read records from the log. In a second terminal window, we can inspect the currently stored offsets.

./kafka/bin/ \
  --bootstrap-server=$(python/ \
  --command-config ./.state/ \
  --group test-group \

We should now see that Kafka has assigned our consumer to the two partitions and has recorded the updated offsets. As we have read every record once, the offsets should now be identical to the last read position. If you run this command quickly after starting the consumer, you should even be able to see that the automated commit only takes place after a couple of seconds.

When you now stop the consumer by hitting Ctrl-C and run it again, it will not print any new records, as it will restart at the committed offsets. To re-read our messages, we will have to reset the offsets. There are two ways to do this. You can either run

./kafka/bin/ \
  --bootstrap-server=$(python/ \
  --command-config ./.state/ \
  --group test-group \
  --topic test \
  --reset-offsets \
  --to-earliest \

or use our consumer, which has a switch --reset instructing it to only reset the offsets without reading any records. In both cases, we should now be ready for another test. This time, we disable auto-commit and use manual commits.

python3 python/ --disable_auto_commit

You should now see that the messages are processed once again, and that the offsets will again be committed, though this is triggered by our explicit calls to consumer.commit() this time.

Next, let us try what happens if we do not commit any offsets at all. Our test client supports this by setting the flag --no_commit

python3 python/ --reset
python3 python/ --no_commit
python3 python/ --no_commit

As expected, the second and third invocation both return the full set of data, as the offsets are never committed and the third invocation therefore starts at the same point at which the second invocation started.

Finally, it is instructive to see how several consumers interact. To see this, first reset all offsets again. Then, open a second terminal, start the consumer in the first terminal and then start a second consumer in the second window. The output should show you that

  • Initially, the first consumer will start to process both partitions
  • When the second consumer is started, the partitions will be revoked, and the corresponding listener is called for both consumers
  • Then, each of the two partitions will be assigned to one of the two consumers

Both consumers should now wait for data on their respective partition. If you now run the producer again to generate ten additional messages, you should nicely see that both consumers receive messages for their respective partitions in parallel.

This completes our discussion of consumers for the time being. There are a couple of points that we have not yet explored (like the manual assignment of partitions, different options for timeouts, the heartbeat thread which periodically sends a heartbeat to the Kafka group coordinator or consumers without consumer groups), but most of this is readily accessible in the Kafka documentation. In the next post, we will look at some patterns to read data from a Kafka topic and use it to maintain state in a relational database.

Learning Kafka with Python – producing data

As proud owners of a brand new Kafka installation, we are now ready to explore how applications interact with Kafka. Today, we will look at producers and understand how they write data to Kafka.

Basic design considerations

At first glance, writing data to Kafka sounds easy – connect to a Kafka broker and submit a message. However, there are some basic design considerations that are relevant when building a Kafka producer.

First, we have seen that Kafka stores the data in a topic in multiple partitions, and that each partition has a leader which is responsible for writing messages into the partition. Thus a producer needs to determine to which partition a message should be written and contact the responsible leader for this partition.

Defining the mapping of messages to partitions can be crucial for reliability and scalability of your application. Partitions determine how the application can scale horizontally, and we will learn later that partitions also determine to which extent consumers can scale. In addition, Kafka guarantees message order only within a partition. Specifically, if message A is written to a partition before message B is written to the same partition, message A will receive a lower offset than B and will be read first by a (well behaving) consumer. This is no longer true if messages A and B are written to different partitions. Think of partitions as lanes on a highway – there is no guarantee that two cars entering the highway in a certain order but in different lanes will arrive at the destination in the same order.

Often, you will want to use a business entity to partition your data. If you are building a customer-facing application, you might want to partition your data by customer group. If you are building a securities processing application, the financial instrument might be a good partition criterion, and if you are maintaining accounts, the account number might be a good choice. In other cases, where ordering is not important, you might go for a purely technical criterion.

The next fundamental question we have to figure out is when a message is considered to be successfully written. When the broker has received it? Or when the leader has written the message? Or should we wait until all followers have successfully stored the message? And what if a follower lags behind – should we stop writing messages until the follower has recovered or move on, accepting that we have lost one follower without knowing whether it will recover at a later time?

Kafka does not give a definitive answer to all these questions, but leaves you a choice – put differently, when you create a producer, you can specify its behavior using a variety of options. So let us now see how this is done in Python.

The producer object

Let us now see how a producer can be created using Python. If you have not yet done so, please install the Kafka Python library to be able to run the examples.

pip3 install kafka-python

This series uses version 2.0.1 of the library. If you want to use exactly that version, you need to specify it as usual, i.e. run

pip3 install kafka-python==2.0.1

To send messages to Kafka, the first thing we need to do is to create a producer object, i.e. an instance of the class kafka.KafkaProducer. The init-method of this class accepts a large number of arguments, but in the most straightforward case, there is exactly one argument, bootstrap_servers. This argument is a list of listener URLs which the producer will use to make an initial connection to a Kafka broker. This list does not need to contain all brokers. In fact, one entry will do, as the producer will use this broker to discover the other brokers if needed. It is still a good idea to list at least two or three brokers here, in case one broker is temporarily unavailable. So creating a producer could look like this.

import kafka
producer=kafka.KafkaProducer(bootstrap_servers=["broker1:9092", "broker2:9092"])

When started, a Producer will create a separate sender thread which will asynchronously send messages to the brokers. In addition, it will create an internal client which holds the actual connections to the Kafka cluster.

When using an SSL listener, we need a few additional configuration items. Specifically, we need to add the following named parameters when creating a KafkaProducer

  • ssl_cafile – this is the location of a CA certificate that the client will use to verify the certificate presented by the server
  • ssl_certfile – this is the location of the client certificate that the client will in turn present to the server when the server requests a certificate
  • ssl_keyfile – the key matching the client certificate

In order to be a bit more flexible when it comes to connecting to different setups, the code examples that we will use in this series read the list of brokers and the SSL configuration from a YAML file config.yaml that the installation script will create in the subdirectory .state of the repository directory. All test scripts accept a parameter --config that you can use to override this default location, in case you want to use your own configuration.

Once we are done using a producer, we should close it using producer.close() to clean up.

Keys and partitions

Once we have a producer in our hands, we can actually start to send messages. This requires only two parameters: the topic (a string) and the payload of the message (a sequence of bytes).

producer.send("test",value=bytes("hello", "utf-8"))

Note that this will create a topic “test” if it does not exist yet, using default values specified in the server configuration. Be careful when relying on this, as the default configuration might not be what you want (you can also turn this feature off by setting auto.create.topics.enable to false in the server configuration).

Now you might remember from my previous post that a record in Kafka actually consists of a payload and a key. Here, we do not specify a key, so the key will remain empty. But of course, you can define a key for your record by simply adding the named parameter key to the method invocation, like this.

producer.send("test",
        value=bytes("hello", "utf-8"),
        key=bytes("mykey", "utf-8"))

What about partitions? The low-level protocol that a Kafka broker understands expects the client to send a PRODUCE request containing a valid partition ID, so it is up to the client to take this decision. The application programmer can either decide to explicitly specify a partition ID (an integer) as an optional parameter to the send method, or let the framework take the decision. In this case, a so-called partitioner is invoked which, based on the value of the key, selects a partition to write to.

An application can set the configuration item partitioner when creating a producer to define a custom partitioner (which is simply a callable object that the producer will invoke). If no partitioner is specified, the default partitioner will be used, which implements the following logic.

  • If no key is given, the default partitioner will simply distribute the messages randomly across the available partitions
  • If a key is provided, a hash value of the key will be computed (using a so-called MurmurHash), which will always be an integer. The value of this hash (more precisely, of its last 31 bits) modulo the number of partitions will then determine the partition to use

The important thing to keep in mind is that if you do not provide a key, your message will end up in a random partition. If you do provide a key, then Kafka will guarantee that messages with the same key will go to the same partition and hence be processed in order.
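The logic of the default partitioner can be sketched in a few lines. Note that this is not the code of the library: the function name choose_partition is made up, and zlib.crc32 is used as a stand-in for the MurmurHash, so the partitions computed here will differ from those that Kafka would actually choose.

```python
import random
import zlib


def choose_partition(key, partitions):
    """Select a partition for a record, given its key (bytes or None)."""
    if key is None:
        # No key: pick any of the available partitions at random
        return random.choice(partitions)
    # Stand-in hash function, the real partitioner uses a MurmurHash
    hash_value = zlib.crc32(key)
    # Keep the last 31 bits, then take the result modulo the number of partitions
    index = (hash_value & 0x7FFFFFFF) % len(partitions)
    return partitions[index]
```

Calling choose_partition(b"mykey", [0, 1]) repeatedly always yields the same partition, which is exactly the property that preserves the order of messages sharing a key.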


In our examples so far, we have passed a sequence of bytes to the send method, both for the key and the value. This is the format that the low-level protocol expects – at the end of the day, keys and values are sent over the wire as a sequence of bytes, and stored as a sequence of bytes.

In many applications, however, you will want to store more complex data types, like JSON data or even objects. To be able to do this with Kafka, you will have to convert your data into a sequence of bytes when sending the data, a process known as serializing.

When creating the producer, you can specify your own serializers for keys and payloads by adding the named parameters key_serializer and value_serializer when creating the producer. Here, a serializer can either be a function which accepts whatever input format you prefer and returns a sequence of bytes, or an instance of the class kafka.serializer.Serializer which has a serialize method which the framework will invoke.

Suppose for instance you wanted to serialize JSON data. Then, you need to provide a serializer which accepts a JSON object and returns a sequence of bytes. For that purpose, we can use the standard json.dumps method to first produce a string, and then encode the string using e.g. UTF-8 to obtain a sequence of bytes. Thus your serializer would look something like

import json

def serialize(data):
    return bytes(json.dumps(data), "utf-8")

and when creating the producer, the call would be something like

producer = kafka.KafkaProducer(
   value_serializer=serialize, ...)

Choosing a reasonable serializer is an important design choice. As Kafka topics are designed to be durable objects, you need to think about things like versioning when the decoding changes as you release new features, and obviously all components of a system need to use matching serializers and de-serializers to be able to exchange data. Many Kafka projects actually use third-party serializers, like Apache Avro or Google’s protobuf.


So far, we have seen how we can send messages using the send method of a KafkaProducer object. But in reality, you of course want to know whether your message was successfully sent and stored by Kafka.

This leads us to the question at which point a new record can be considered to be committed to a Kafka cluster, i.e. stored and available for consumers. Before getting into this, however, we first have to understand the notion of an in-sync replica.

Recall that Kafka replication works by designating a leader for a partition and zero, one or more followers which constantly ask the leader for new records in the partition and store them in their own copy of the partition log. As the replicas read the records from the leader, the leader knows which record has been delivered to which follower. The leader can therefore determine whether a replica is out-of-sync, which happens if a follower fails to retrieve the latest message within a defined time frame, or in-sync.

Having enough in-sync replicas is vital for reliability. If a leader goes down, Kafka has to elect a new leader from the set of available replicas. Of course, choosing an out-of-sync replica to be the new leader would imply that we promote a replica that has not yet replicated all messages that producers have sent to the old leader to be our new source of truth. Thus, making such a replica the new leader results in a loss of records. In some situations, you might still opt to do so, which Kafka allows if the parameter unclean.leader.election.enable is set to true in the broker configuration.


Now let us come back to the question of when a message sent by a producer will be considered committed. Again, Kafka offers you a choice, governed by the value of the parameter acks of a producer.

  • When acks = 1 (the default), a message will be considered committed once the leader acknowledges the message. Thus Kafka guarantees that a committed message has been added to the leading partition, but not that it has already been written to one or even all replicas. Note that, as the leader might cache the record in memory, this can lead to data loss if the leader goes down after acknowledging the message, but before a follower has copied it
  • When acks = -1 (all), a record will be considered committed only once the leader and in addition all in-sync replicas have acknowledged receipt of the message. As long as you have enough in-sync replicas, this gives you a strong guarantee that the message is available on several nodes and thus data loss has become very unlikely
  • Finally, a value of acks = 0 means that the message will be considered committed once it has been sent over the network, regardless of any acknowledgement from the leader or a follower. This obviously is a very weak guarantee and only reasonable if you have a strong focus on throughput and can live with a loss of (potentially many) records

When using acks = all, the number of in-sync replicas is of course vital. To illustrate this, let us assume that you have configured a topic with three replicas, but both followers have become out-of-sync. Now a message will be committed once the leader has written it, meaning that if the leader is lost, the record will be lost as well. To avoid such a scenario, you can set the server property min.insync.replicas. This number determines how many replicas (including the leader) need to be in-sync in order to still accept new messages. Thus if you use, for instance, a topic with a replication factor of three and min.insync.replicas=2 in combination with acks=-1, then Kafka will guarantee that a message is only reported as committed once the leader and at least one follower have received the record.
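The interplay of acks and min.insync.replicas can be summarized in a small model. This is a deliberately simplified sketch and not broker code. The function name guaranteed_copies is made up, and the model assumes that the leader itself is up and counts as one in-sync replica.

```python
def guaranteed_copies(acks, in_sync_replicas, min_insync_replicas=1):
    """Number of replicas (including the leader) that are guaranteed to hold
    a record at the point in time when it is reported as committed."""
    if acks == 0:
        # Fire and forget, not even the leader has confirmed the record
        return 0
    if acks == 1:
        # Only the leader has acknowledged the record
        return 1
    if acks in (-1, "all"):
        if in_sync_replicas < min_insync_replicas:
            # The write is rejected instead of being accepted with too few copies
            raise RuntimeError("not enough in-sync replicas")
        # The leader and all in-sync followers have acknowledged the record
        return in_sync_replicas
    raise ValueError("acks must be 0, 1, -1 or 'all'")
```

For the scenario above, guaranteed_copies(-1, 1) returns 1, i.e. only the leader holds the record, whereas with min_insync_replicas=2 the same call raises an error and the write is rejected.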


Finally, there is one more parameter that is important for the reliability of a producer – max_in_flight_requests_per_connection. A request (which typically contains more than one record to be written) will be considered as in-flight as long as no result – either an acknowledgement or an error – has been received from the broker. If this parameter (which defaults to 5!) is set to a value greater than one, this implies that the producer will not wait for an acknowledgement before sending the next batch. In combination with retries, this can imply that the order in which messages are added to the log is not identical to the order in which they have been sent, and if the producer goes down, in-flight messages might be lost if the broker is not able to process them. Thus set this to one if you need strong guarantees on at-least-once delivery and ordering.
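To see how retries and several in-flight requests can change the order of records, consider the following toy model. It is a strong simplification made up for this post (the function simulate_log does not mirror the actual sender logic): requests are sent in windows of max_in_flight batches, and a batch whose first attempt fails is sent again with the next window.

```python
from collections import deque


def simulate_log(batches, max_in_flight, fail_once):
    """Return the order in which batches end up in the log.

    fail_once is the set of batches whose first attempt fails."""
    pending = deque(batches)
    already_failed = set()
    log = []
    while pending:
        # Send up to max_in_flight requests without waiting for a reply
        window = [pending.popleft() for _ in range(min(max_in_flight, len(pending)))]
        retries = []
        for batch in window:
            if batch in fail_once and batch not in already_failed:
                already_failed.add(batch)
                retries.append(batch)
            else:
                log.append(batch)
        # Failed batches are retried before any batch that has not been sent yet
        pending.extendleft(reversed(retries))
    return log
```

In this model, simulate_log(["A", "B", "C"], 1, {"A"}) preserves the order, while with two requests in flight batch B overtakes the failed batch A.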

Retries and error handling

Finally, the last important design decision that you need to take when writing a producer is how to deal with errors.

The send method that we are using delivers messages asynchronously to the actual sender and immediately returns. To figure out whether the record was successfully committed, we therefore cannot simply treat its return value as a status, but need a different approach. The send method returns a handle which can later be used to retrieve the status of the message, a so-called Future, or, more precisely, an instance of a subclass called FutureRecordMetadata. Once we have this object, we can call its get method with a timeout in seconds to wait for the request to complete. If the request was successful, this method returns a named tuple containing record metadata, otherwise it raises an exception of type kafka.errors.KafkaError. Alternatively, you can also specify callback functions for successful and failed sends.
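A synchronous send based on this mechanism could be sketched as follows. The helper send_and_confirm is a hypothetical name, and to keep the snippet independent of the library, it catches any exception, whereas real code would rather catch kafka.errors.KafkaError.

```python
def send_and_confirm(producer, topic, value, timeout=10):
    """Send a record and wait until it is acknowledged or has failed."""
    # send returns immediately, handing back a future
    future = producer.send(topic, value=value)
    try:
        # Block until the request has completed or the timeout has expired
        metadata = future.get(timeout=timeout)
    except Exception as error:  # kafka.errors.KafkaError with kafka-python
        return False, error
    return True, metadata
```

Waiting for every single record in this way is safe but slow, as it effectively disables batching, so it is a trade-off between throughput and simple error handling.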

Note that the producer also has an option to automatically retry failed messages, which can be configured by setting the parameter retries to a value different from zero. In general, however, you should be careful with this as it might conflict with ordering, see the discussion of in-flight requests above.

Testing our producer

After all this theory, it is now time to test our producer. I assume that you have followed my previous post and installed Kafka in three virtual nodes on your PC. Now navigate to the root of the repository and run the following command to create a test topic.

./kafka/bin/ \
  --bootstrap-server=$(./python/ \
  --command-config=.state/ \
  --create \
  --topic test \
  --replication-factor 3 \
  --partitions 2 

Let us see what this command is doing. The script that we run is part of the standard Kafka admin command line tools that are bundled with the distribution. In the second line, we invoke a little Python script that evaluates the configuration in YAML format which our installation procedure has created to determine the URL of a broker, which we then pass to the script. In the third line, we provide a Java properties file containing the SSL parameters to connect to our secured listener.

The remaining switches instruct the tool to create a new topic called “test” with a replication factor of three and two partitions (which, of course, will fail if you have already created this topic in the previous post).

Next, we will run another tool coming with Kafka – the console consumer. This is a simple consumer that will simply subscribe to a topic and dump all records in this topic to the console. To run it, enter

kafka/bin/   \
   --bootstrap-server $(./python/   \
   --consumer.config .state/ \
   --from-beginning \
   --topic test 

Now open an additional terminal, navigate to the root of the repository and run the producer.

python3 python/

This should print the producer configuration used and the number of messages produced, plus timestamps and the number of seconds and microseconds it took to send all messages. In the first terminal window, in which the consumer is running, you should then see these messages flicker by.

Now let us try out a few things. First, let us create 10000 messages with a set of configurations promising the highest throughput (acks=0, fully asynchronous send, no keys provided, five requests in flight).

python3 python/ \
  --messages=10000 \

On my PC, producing these 10000 messages takes roughly half a second, i.e. our throughput is somewhere around 20,000 messages per second, without any tuning (of course the results will heavily depend on the machine on which you are running this). Next, we produce again 10000 messages, but this time, we use very conservative settings (acks=all, only one message in flight, wait for reply after each request, create and store keys and use them to determine the partition).

python3 python/ \
  --messages=10000 \
  --ack=-1 \
  --max_in_flight_requests_per_connection=1 \
  --wait \

Obviously, this will be much slower. On my PC, this took roughly 17 seconds, i.e. it is slower by a factor of about 25 than the first run. This hopefully illustrates nicely that Kafka leaves you many choices for trade-offs between performance and availability. Use this freedom with care and make sure you understand the consequences that the various settings have, otherwise you might lose data!

Putting it all together – the send method behind the scenes

Having seen a producer in action, it is instructive to take a short look at the source code of the Python implementation of KafkaProducer, specifically at its send method. First, the partitions of the topic are retrieved, either from cached metadata or by requesting updated metadata from the server. Then, the key and value are serialized, and the partitioner is invoked which determines the partition to which we write the record.

Next, instead of directly sending the record to the broker, it is appended to an internal buffer using an internal helper class called a RecordAccumulator. If the accumulator signals that the buffer is full, the sender thread is triggered which will then actually transmit the entire batch to the Kafka broker. Finally, the future object is returned.
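The idea of buffering records and handing over full batches can be illustrated with a toy accumulator. This is not the RecordAccumulator of the library, which is considerably more involved (it keeps buffers per partition and works in terms of bytes). The class ToyAccumulator and the callback transmit are names made up for this sketch.

```python
class ToyAccumulator:
    """Collects records and hands them over in batches of a fixed size."""

    def __init__(self, batch_size, transmit):
        self.batch_size = batch_size
        self.transmit = transmit  # callable that sends a list of records
        self.buffer = []

    def append(self, record):
        self.buffer.append(record)
        # Once the buffer is full, the entire batch is transmitted
        if len(self.buffer) >= self.batch_size:
            self.flush()

    def flush(self):
        # Send whatever is in the buffer, for instance before closing
        if self.buffer:
            self.transmit(list(self.buffer))
            self.buffer.clear()
```

With a batch size of three, appending seven records triggers two transmissions, and the last record is only sent once flush is called.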


This completes our discussion of Kafka producers. In the next post, we will learn how we can consume data from Kafka and how consumers, producers and brokers play together.