Asynchronous I/O with Python part II – iterators and generators

As explained in my previous post, historically coroutines in Python have evolved from iterators and generators, and understanding generators is still vital to understanding native coroutines. In this post, we take a short tour through iterators in Python and how generators have traditionally been implemented.

Iterables and iterators

In Python (and in other programming languages), an iterator is an object that returns a sequence of values, one at a time. While in languages like Java, iterators are classes implementing a specific interface, Python iterators are simply classes that have a method __next__ which is supposed to either return the next element from the iterator or raise a StopIteration exception to signal that no further elements exist.

Iterators are typically not created explicitly, but are provided by factory classes called iterables. An iterable is simply a class with a method __iter__ which in turn returns an iterator. Behind the scenes, iterables and iterators are used when you run a for-loop in Python – Python will first invoke the __iter__ of the object to which you refer in the loop to get an iterator and then call the __next__ method of this iterator once for every iteration of the loop. The loop stops when a StopIteration is raised.

This might sound a bit confusing, so let us look at an example. Suppose you wanted to build an object which – like the range object – allows you to loop over all numbers from 0 to a certain limit. You would then first write a class that implements a method __next__ that returns the next value (so it has to remember the last returned value), and then implement an iterable returning an instance of this class.

class SampleIterator:

    def __init__(self, limit):
        self._position = 0
        self._limit = limit

    def __next__(self):
        if self._position < self._limit:
            self._position += 1
            return self._position - 1
        else:
            raise StopIteration

class SampleIterable:

    def __init__(self, limit):
        self._limit = limit

    def __iter__(self):
        return SampleIterator(self._limit)


myIterable = SampleIterable(10)
for i in myIterable:
    print("i = %d" % i)

Often, the same object will implement the __next__ method and the __iter__ method and therefore act as iterable and iterator at the same time.

Note that the iterator typically needs to maintain a state – it needs to remember the state after the last invocation of __next__ has completed. In our example, this is rather straightforward, but in more complex siutations, programmatically managing this state can be tricky. With PEP-255, a new approach was introduced into Python which essentially allows a programmer to ask the Python interpreter to take over this state management – generators.

Generators in Python

The secret sauce behind generators in Python is the yield statement. This statement is a bit like return in that it returns a value and the flow of control to the caller, but with the important difference that state of the currently executed function is saved by Python and the function can be resumed at a later point in time. A function that uses yield in this way is called a generator function.

Again, it is instructive to look at an example. The following code implements our simple loop using generators.

def my_generator(limit=5):
    _position = 0
    while _position < limit:
        yield _position 
        _position += 1

for i in my_generator(10):
    print("i = %d" % i)

We see that we define a new function my_generator which, at the first glance, looks like an ordinary function. When we run this function for the first time, it will set a local variable to set its current position to zero. We then enter a loop to increase the position until we reach the limit. In each iteration, we then invoke yield to return the current position back to the caller.

In our main program, we first call my_generator() with an argument. As opposed to an ordinary function, this invocation does not execute the function. Instead, it evaluates the argument and builds and returns an object called a generator object. This object is an iterator, i.e. it has a __next__ method. When this method is called for the first time, the execution of our function body starts until it hits the first yield statement. At this point, the execution returns to the caller and whatever we yield is returned by the call to __next__. When now __next__ is invoked again, the Python interpreter will restore the current state of the function and resume its execution after the yield. We increase our internal position, enter the loop again, hit the next yield and so forth. This continues until the limit is reached. Then, the function returns, which is equivalent to raising a StopIteration and signals to the caller that the iterator is exhausted.

Instead of using the for loop, we can also go through the same steps manually to see how this works.

generator = my_generator(5)
while True:
    try:
        value = generator.__next__()
        print("Value: %d" % value)
    except StopIteration:
        break

This is already quite close to the programming model of a co-routine – we can start a coroutine, yield control back to the caller and resume execution at a later point in time. However, there are a few points that are still missing and that have been added to Python coroutines with additional PEPs.

Delegation to other coroutines

With PEP-380, the yield from statement was added to Python, which essentially allows a coroutine to delegate execution to another coroutine.

A yield from statement can delegate either to an ordinary iterable or to another generator.

What yield from is essentially doing is to retrieve an iterator from its argument and call the __next__ method of this iterator, thus – if the iterable is a generator – running the generator up to the next yield. Whatever this yield returns will then be yielded back to the caller of the generator containing the yield from statement.

When I looked at this first, I initially was under the impression that if a generator A delegates to generator B by doing yield from B, and B yields a value, control would go back to A, similar to a subroutine call. However, this is not the case. Instead of thinking of a yield from like a call, its better to think of it like a jump. In fact, when B yields a value, this value will be returned directly to the caller of A. The yield from statement in A only returns when B either returns or raises a StopIteration (which is equivalent), and the return value of B will then be the value of the yield from statement. So you might think of the original caller and A as being connected through a pipe through which yielded values are sent back to the caller, and if A delegates to B, it also hands the end of the pipe over to B where it remains until B returns (i.e. is exhausted in the sense of an iterator).

YieldFrom

Passing values and exceptions into coroutines

We have seen that when a coroutine executes a yield, control goes back to the caller, i.e. to the code that triggered the coroutine using __next__, and when the coroutine is resumed, its execution continues at the first statement after the yield. Note that yield is a statement and takes an argument, so that the coroutine can hand data back to the caller, but not the other way round. With PEP-342, this was changed and yield became an expression so that it actually returns a value. This allows the caller to pass a value back into the generator function. The statement to do this is called send.

Doing a send is a bit like a __next__, with the difference that send takes an argument and this argument is delivered to the coroutine as result of the yield expression. When a coroutine runs for the first time, i.e. is not resumed at a yield, only send(None) is allowed, which, in general, is equivalent to __next__. Here is a version of our generator that uses this mechanism to be reset.

def my_generator(limit=5):
    _position = 0
    while _position < limit:
        cur = _position
        val = yield cur 
        if val is not None:
            # 
            # We have been resumed due to a send statement. 
            #
            _position = val
            yield val
        else:
            _position += 1

We can now retrieve a few values from the generator using __next__, then use send to set the position to a specific value and then continue to iterate through the generator.

generator = my_generator(20)
assert 0 == generator.__next__()
assert 1 == generator.__next__()
generator.send(7)
assert 7 == generator.__next__()

Instead of passing a value into a coroutine, we can also throw an exception into a coroutine. This actually quite similar to the process of sending a value – if we send a value into a suspended coroutine, this value becomes visible inside the coroutine as the return value of the yield at which the coroutine is suspended, and if we throw an exception into it, the yield at which the coroutine is suspended will raise this exception. To throw an exception into a coroutine, use the throw statement, like

generator = my_generator(20)
assert 0 == generator.__next__()
generator.throw(BaseException())

If you run this code and look at the resulting stack trace, you will see that in fact, the behavior is exactly as if the yield statement had raised the exception inside the coroutine.

The generator has a choice whether it wants to catch and handle the exception or not. If the generator handles the exception, processing continues as normal, and the value of the next yield will be returned as result of throw(). If, however the generator decides to not handle the exception or to raise another exception, this exception will be passed through and will show up in the calling code as if it had been raised by throw. So in general, both send and throw statements should be enclosed in a try-block as they might raise exceptions.

Speaking of exceptions, there are a few exceptions that are specific for generators. We have already seen the StopIteration exception which is thrown if an iterator or generator is exhausted. A similar exception is GeneratorExit which can be thrown into a generator to signal that the generator should complete. A generator function should re-raise this exception or raise a StopIteration so that its execution stops, and the caller needs to handle the exception. There is even a special method close that can be used to close a coroutine which essentially does exactly this – it throws a GeneratorExit into the coroutine and expects the generator to re-raise it or to replace it by a StopIteration exception which is then handled. If a generator is garbage-collected, the Python interpreter will execute this method.

This completes our discussion of the “old-style” coroutines in Python using generator functions and yielding. In the next post, we will move on to discuss the new syntax for native coroutines introduced with Python 3.5 in 2015.

Asynchronous I/O with Python part I – the basics

Though not really new, a programming model commonly known as asynchronous I/O has been attracting a lot of attention over the last couple of years and even influenced the development of languages like Java, Go or Kotlin. In this and the next few posts, we will take a closer look at this model and how it can be implemented using Python.

What is asynchronous I/O?

The basic ideas of asynchronous I/O are maybe explained best using an example from the world of networking, which is at the same time the area where the approach excels. Suppose you are building a REST gateway that accepts incoming connections and forwards them to a couple of microservices. When a new client connects, you will have to make a connection to a service, send a request, wait for the response and finally deliver the response back to the client.

Doing this, you will most likely have to wait at some points. If, for instance, you build a TCP connection to the target service, this involves a handshake during which you have to wait for network messages from the downstream server. Similarly, when you have established the connection and send the request, it might take some time for the response to arrive. While this entire process is n progress, you will have to maintain some state, for instance the connection to the client which you need at the end to send the reply back.

If you do all this sequentially, your entire gateway will block while a request is being processed – not a good idea. The traditional way to deal with this problem has been to use threads. Every time a new request comes in, you spawn a thread. While you have to wait for the downstream server, this thread will block, and the scheduler (the OS scheduler if you use OS-level threads or some other mechanism) will suspend the thread, yield the CPU to some other thread and thus allow the gateway to serve other requests in the meantime. When the response from the downstream server arrives, the thread is woken up, and, having saved the state, the processing of the client’s request can be completed.

This approach works, but, depending on the implementation, creating and running threads can create significant overhead. In addition to the state, concurrently managing a large number of threads typically involves a lot of scheduling, locking, handling of concurrent memory access and kernel calls. This is why you might try a different implementation that entirely uses user-space mechanism.

You could, for instance, implement some user-space scheduler mechanism. When a connection is being made, you would read the incoming request, send a connection request (a TCP SYN) to the downstream server and then voluntarily return control to the scheduler. The scheduler would then monitor (maybe in a tight polling loop) all currently open network connections to downstream servers. Once the connection is made, it would execute a callback function which triggers the next steps of the processing and send a request to the downstream server. Then, control would be returned to the scheduler which would invoke another callback when the response arrives and so forth.

With this approach, you would still have to store some state, for instance the involved connections, but otherwise the processing would be based on a sequence of individual functions or methods tied together by a central scheduler and a series of callbacks. This is likely to be very efficient, as switching between “threads” only involves an ordinary function call which is much cheaper than a switch between two different threads. In addition, each “thread” would only return control to the scheduler voluntarily, implementing a form of cooperative multitasking, and can not be preempted at unexpected points. This of course makes synchronization much easier and avoids most if not all locking, which again removes some overhead. Thus such a model is likely to be fast and efficient.

On the downside, without support from the used programming language for such a model, you will easily end up with a complex set of small functions and callbacks, sometimes turning into a phenomenon known as callback hell. To avoid this, more and more programming languages offer a programming model which supports this approach with libraries and language primitives, and so does Python.

Coroutines and futures

The model which we have described is not exactly new and has been described many years ago. In this model, processing takes place in a set of coroutines. Coroutines are subroutines or functions which have the ability to deliberately suspend their own execution – a process known as yielding. This will save the current state of the coroutine and return control to some central scheduler. The scheduler can later resume the execution of the coroutine which will pick up the state and continue to run until it either completes or yields again (yes, this is cooperative multitasking, and this is where the name – cooperative routines – comes from).

Coroutines can also wait for the result of a computation which is not yet available. Such a result is encapsulated in an object called a future. If, for instance, a coroutine sends a query to a downstream server, it would send the HTTP request over the network, create a future representing the reply and then yield and wait for the completion of this future. Thus the scheduler would gain back control and could run other coroutines. At the same time, the scheduler would have to monitor open network connections, and, when the response arrives, complete the future, i.e. provide a value, and reschedule the corresponding coroutine.

FuturesCoroutinesScheduler

Finally, some additional features would be desirable. To support modularization, it would be nice if coroutines could somehow call each other, i.e. if a coroutine could delegate a part of its work to another coroutine and wait for its completion. We would probably also want to see some model of exception handling. If, for instance, a coroutine has made a request and the response signals an error, we would like to see a way how the coroutine learns about this error by being woken up with an exception. And finally, being able to pass data into an already running coroutines could be beneficial. We will later see that the programming model that Python implements for coroutines supports all of these features.

Organisation of this series

Coroutines in Python have a long history – they started as support for iterators, involved into what is today known as generator-based coroutines and finally turned into the native coroutines that Python supports today. In addition, the asyncio library provides a framework to schedule coroutines and integrate them with asynchronous I/O operations.

Even today, the implementation of coroutines in Python is still internally based on iterators and generators, and therefore it is still helpful to understands these concepts, even if we are mainly interested in the “modern” native coroutines. To reflect this, the remaining posts in this series will cover the following topics.

  • Iterators and generator-based coroutines
  • Native coroutines
  • The main building blocks of the low-level asyncio API – tasks, futures and the event loop
  • Asynchronous I/O and servers
  • Building an asynchronous HTTP server from scratch

To follow the programming examples, you will need a comparatively new version of Python, specifically you will need Python 3.7 or above. In case you have an older version, either get the latest version from the Python download page and build it from source, or (easier) try to get a more recent package for your OS (for Ubuntu, for instance, there is the deadsnake PPA that you can use for that purpose).

Learning Kafka with Python – retries and idempotent writes

In the past few posts, we have discussed approaches to implement at-least-once processing on the consumer side, i.e. mechanisms that make sure that every record in the partition is only processed once. Today, we will look at a similar problem on the producer side – how can we make sure that every record is written into the partition only once? This sounds easy, but can be tricky if we need to retry failed message without knowing the exact error that has occured.

The retry problem

In the sample producer that we have looked at in a previous post, we missed an important point – error handling. The most important error that a reliable producer needs to handle is an error when handing over a new record to the broker.

In general, Kafka differentiates between retriable errors, i.e. transient errors like individual packets being lost on the network, and non-retriable errors, i.e. errors like an invalid authorization for which a retry does not make sense. For most transient errors, the client will – under the hood – automatically attempt a retry if a record could not be sent.

Let us take a short look at the Java producer as an example. When a batch of records has been sent to the broker as a ProduceRequest, the response is handled in the method handleProduceResponse. Here, a decision is made whether an automatic retry should be initiated, in which case the batch of records will simply be added to the queue of batches to be sent again. The logic to decide when a retry should be attempted is contained in the method canretry, and in the absence of transactions (see the last section of this post), it will decide to retry if the batch has not timed-out yet (i.e. has been created more than delivery.timeout.ms before), the error is retriable and the number of allowed retries (set via the parameter retries) has not yet been reached. Examples for retriable exceptions are exceptions due to a low number of in-sync replicas, timeouts, connection failures and so forth.

This is nice, but there is a significant problem when using automated retries. If, for instance, a produce request times out, it might very well be that this is only due to a network issue and in the background, the broker has actually stored the record in the partition log. If we retry, we will simply send the same batch of records again, which could lead to duplicate records in the partition log. As these records will have different offsets, there is no way for a consumer to detect this duplicate. Depending on the type of application, this can be a major issue.

If you wanted to solve this on the application level, you would probably set retries to zero, implement your own retry logic and use a sequence number to allow the consumer to detect duplicates. A similar logic referred to as idempotent writes has been added to Kafka with KIP-98 which was implemented in release 0.11 in 2016.

What are idempotent writes?

Essentially, idempotent writes use a sequence number which is added to each record by the producer to allow the broker to detect duplicates due to automated retries. This sequence number is added to a record shortly before it is sent (more precisely, a batch of records receives a base sequence number, and the sequence number of a record is the base sequence number plus its index in the batch), and if an automated retry is made, the exact same batch with the same sequence number is sent again. The broker keeps track of the highest sequence number received, and will not store any records with a sequence number smaller than or equal to the currently highest processed sequence number.

To allow all followers to maintain this information as well, the sequence number is actually added to the partition log and therefore made available to all followers replicating the partitions, so that this data survives the election of a new partition leader.

In a bit more detail, the implementation is slightly more complicated than this. First, it would imply a high overhead to maintain a globally unique sequence number across all producers and partitions. Instead, the sequence number is maintained per producer and per partition. To make this work, producers will be assigned a unique ID called the producer ID. In fact, when a producer that uses idempotent writes starts, it will send an InitPidRequest to the broker. The broker will then assign a producer ID and return it in the response. The producer stores the producer ID in memory and adds it to all records being sent, so that the broker knows from which producer a record originates. Similar to the sequence number, this information is added to the records in the partition log. Note, however, that neither the producer ID nor the sequence number are passed to a consumer by the consumer API.

How does the broker determine the producer ID to be assigned? This depends on whether idempotent writes are used in combination with transactions. If transactions are used, we will learn in the next post that applications need to define an ID called transaction ID that is supposed to uniquely identify a producer. In this case, the broker will assign a producer ID to each transaction ID, so that the producer ID is effectively persisted across restarts. If, however, idempotent writes are used stand-alone, the broker uses a ZooKeeper sequence to assign a sequence number, and if a producer is either restarted or (for instance due to some programming error) sends another InitPidRequest, it will receive a new producer ID. For each new partition assigned to a producer not using transactions, the sequence number will start again at zero, so that the sequence number is only unique per partition and producer ID (which is good enough for our purpose).

Another useful feature of idempotent writes is that a Kafka broker is now able to detect record batches arriving in the wrong order. In fact, if a record arrives whose sequence number is higher than the previously seen sequence number plus one, the broker assumes that records got lost in flight or we see an ordering issue due to a retry and raises an error. Thus ordering is now guaranteed even if we allow more than one in-flight batch.

Trying it out

Time again to try all this. Unfortunately, the Kafka Python client that we have used so far does not (yet) support KIP-98. We could of course use a Java or Go client, but to stick to the idea of this little series to use Python, let us alternatively employ the Python client provided by Confluent.

To install this client, use

pip3 install confluent-kafka==1.4.1

Here I am using version 1.4.1 which was the most recent version at the time when this post was written, so you might want to use the same version. Using the package is actually straightforward. Again, we first create a configuration, then a producer and then send records to the broker asynchronously. Compared to the Kafka Python library used so far, there are a few differences which are worth being noted.

  • Similar to the Kafka Python library, sends are done asynchronously. However, you do not receive a future when sending as it is the case for the Kafka Python library, but you define a callback directly
  • To make sure that the callback is invoked, you have to call the poll method of the producer on a regular basis
  • When you are done producing, you have to explicitly call flush to make sure that all buffered messages are sent
  • The configuration parameters of the client follow the Java naming conventions. So the bootstrap servers, for instance, are defined by a configuration parameter called bootstrap.servers instead of bootstrap_servers, and the parameter itself is not a Python list but a comma-separated list passed as a string
  • The base producer class accepts bytes as values and does not invoke a serializer (there is a derived class doing this, but this class is flagged as not yet stable in the API documentation so I decided not to use it)

To turn on idempotent writes, there are a couple of parameters that need to be set in the producer configuration.

  • enable.idempotence needs to be 1 to turn on the feature
  • acks needs to be set to “all”, i.e. -1
  • max.in.flight should be set to one
  • retries needs to be positive (after all, idempotent writes are designed to make automated retries safe)

Using these instructions, it is now straightforward to put together a little test client that uses idempotent writes to a “test” topic. To try this, bring up the Kafka cluster as in the previous posts, create a topic called “test” with three replicas, navigate to the root of the repository and run

python3 python/idempotent_writes.py

You should see a couple of messages showing the configuration used and indicating that ten records have been written. To verify that these records do actually contain a producer ID and a sequence number, we need to dump the log file on one of the brokers.

vagrant ssh broker1
/opt/kafka/kafka_2.13-2.4.1/bin/kafka-dump-log.sh \
  --print-data-log \
  --files /opt/kafka/logs/test-0/00000000000000000000.log

The output should look similar to the following sample output.

Dumping /opt/kafka/logs/test-0/00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: 0 lastSequence: 9 producerId: 3001 producerEpoch: 0 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1589818655781 size: 291 magic: 2 compresscodec: NONE crc: 307611005 isvalid: true
| offset: 0 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 0 headerKeys: [] payload: {"msg_count": 0}
| offset: 1 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 1 headerKeys: [] payload: {"msg_count": 1}
| offset: 2 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 2 headerKeys: [] payload: {"msg_count": 2}
| offset: 3 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 3 headerKeys: [] payload: {"msg_count": 3}
| offset: 4 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 4 headerKeys: [] payload: {"msg_count": 4}
| offset: 5 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 5 headerKeys: [] payload: {"msg_count": 5}
| offset: 6 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 6 headerKeys: [] payload: {"msg_count": 6}
| offset: 7 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 7 headerKeys: [] payload: {"msg_count": 7}
| offset: 8 CreateTime: 1589818655780 keysize: -1 valuesize: 16 sequence: 8 headerKeys: [] payload: {"msg_count": 8}
| offset: 9 CreateTime: 1589818655781 keysize: -1 valuesize: 16 sequence: 9 headerKeys: [] payload: {"msg_count": 9}

Here, the third line contains the header of the entire record batch. We see that the batch contains ten records, and we find a producer ID (3001). In each of the records, we also see a sequence number, ranging from 0 to 9.

Transactions

When you read KIP-98, the Kafka improvement proposal with which idempotent writes where introduced, then you realize that the main objective of this KIP is not just to provide idempotent writes, but to be able to handle transactions in Kafka. Here, handling transactions does not mean that Kafka somehow acts as a distributed transaction manager, joining transactions of a relational database. It does, however, mean that writes and reads in Kafka are transactional in the sense that a producer can write records within a transaction, and consumers will either see all of the records written as part of this transaction or none of them.

This makes it possible to model scenarios that occur quite often in business applications. Suppose, for instance, you are putting together an application handling security deposits. When you sell securities, you produce one record which will trigger the delivery of the securities to the buyer, and a second record that will trigger the payment that you receive for them. Now suppose that the first record is written, and them something goes wrong, so that the second record cannot be written. Without transactions, the first record would be in the log and consumers would pick it up, so that the security side of the transaction would still be processed. With transactions, you can abort the transaction, and the record triggering the security transfer will not become visible for consumers.

We will not go into details about transactions in this post, but KIP-98 is actually quite readable. I also recommend that you take a look at this well written blog post on the Confluent pages that provides some more background and additional links.

With that, it is time to close this short series on Kafka and Python. I hope I was able to give you a good introduction into the architecture and operations of a Kafka cluster and a good starting point for own projects. Happy hacking!