Understanding Kubernetes controllers part I – queues and the core controller loop

In previous posts in my series on Kubernetes, we have stumbled across the concept of a controller. Essentially, a controller is a daemon that compares the to-be state of components of a Kubernetes cluster with the as-is state and takes action to reach the to-be state.

A classical example is the replica set controller, which monitors replica sets and pods and is responsible for creating new pods or deleting existing ones if the actual number of replicas is out of sync with the defined value.

In this series, we will perform a deep dive into controllers. Specifically, we will take a tour through the sample controller that is provided by the Kubernetes project and try to understand how this controller works. We will also explain how this relates to custom resource definitions (CRDs) and what steps are needed to implement a custom controller for a given CRD.

Testing the sample controller

Let us now start with our analysis of the sample controller. To follow this analysis, I advise you to download a copy of the sample controller into your Go workspace using go get k8s.io/sample-controller and then to use an editor like Atom that offers plugins to navigate Go code.

To test the controller and to have a starting point for debugging and tracing, let us follow the instructions in the README file located at the root of the repository. Assuming that you have a working kubectl configuration in $HOME/.kube/config, build and start the controller as follows.

$ cd $GOPATH/src/k8s.io/sample-controller
$ go build
$ kubectl create -f artifacts/examples/crd.yaml
$ ./sample-controller --kubeconfig=$HOME/.kube/config

This will create a custom resource definition, specifically a new resource type named “Foo” that we can use like any other resource such as Pods or Deployments. In a separate terminal, we can now create a Foo resource.

$ kubectl create -f artifacts/examples/example-foo.yaml 
$ kubectl get deployments
$ kubectl get pods

You will find that our controller has created one deployment, which in turn brings up one pod running nginx. If you delete the custom resource again using kubectl delete foo example-foo, both the deployment and the pod disappear again. However, if you manually delete the deployment, it is recreated by the controller. So apparently, our controller is able to detect changes to deployments and Foo resources and to reconcile them accordingly. How does this work?

Basically, a controller periodically reconciles the as-is state of the system with the to-be state. For that purpose, several capabilities are required.

  • We need to be able to detect changes to the state of the system. This is done in an event-driven fashion and handled by informers, which subscribe to events and invoke specific handlers, and by listers, which are able to list all resources in a given Kubernetes cluster
  • We need to be able to keep track of the state of the system. This is done using object stores and their indexed variants, called indexers
  • Ideally, we should be able to process larger volumes using multi-threading, coordinated by queues

In this and the next post, we will go through these elements one by one. We start with queues.

Queues and concurrency

Let us start by investigating threads and queues in the Kubernetes client library. The ability to easily create threads (called goroutines) in Go and the support for managing concurrency and locking are among the key differentiators of the Go programming language, and of course the Kubernetes client library makes use of these features.

Essentially, a queue in the Kubernetes client library is something that implements the interface Interface in the package k8s.io/client-go/util/workqueue. That interface contains (among others) the following methods; a sketch of the full interface follows the list.

  • Add adds an object to the queue
  • Get blocks until an item is available in the queue, and then returns the first item in the queue
  • Done marks an item as processed
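
For reference, here is roughly what this interface looks like in client-go (slightly simplified and with comments added; newer versions of the library contain a few additional methods, so treat this as a sketch rather than the authoritative definition).

type Interface interface {
	Add(item interface{})                   // add an item to the queue
	Len() int                               // number of items waiting in the queue
	Get() (item interface{}, shutdown bool) // block until an item is available
	Done(item interface{})                  // mark an item as processed
	ShutDown()                              // shut the queue down
	ShuttingDown() bool                     // true once ShutDown has been called
}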

Internally, the standard implementation of a queue in queue.go uses Go maps. The keys of these maps are arbitrary objects, while the values are just placeholders (empty structs). One of these maps is called the dirty set; it contains all elements that make up the actual queue, i.e. that still need to be processed. The second map is called the processing set; it holds all items that have been retrieved using Get but for which Done has not yet been called. As maps are unordered, there is also a slice which holds the elements in the queue and defines the order of processing. Note that each of the maps can hold a specific object only once, and since an object is only appended to the queue if it is not yet in the dirty set, the queue itself also contains each object at most once.
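
To visualize this, here is a condensed sketch of the data structure (the type names follow queue.go, but the real implementation carries additional fields, for instance for metrics and shutdown handling).

type empty struct{}
type t interface{}
type set map[t]empty // the keys are the items, the values are placeholders

type simplifiedQueue struct {
	queue      []t // holds the items and defines the processing order
	dirty      set // items that still need to be processed
	processing set // items handed out via Get for which Done is pending
}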

If we add something to the queue, it is added to the dirty set and appended to the queue. If we call Get, the first item is retrieved from the queue, removed from the dirty set and added to the processing set. Calling Done will remove the element from the processing set as well, unless Add has been called again on the same object in the meantime; in this case, Done removes the object from the processing set but also appends it to the queue again.

Let us implement a little test program that works with queues. For that purpose, we will establish two threads aka goroutines. The first thread will call Add five times to add something to the queue and then complete. The second thread will sleep for three seconds and then read from the queue in a loop to retrieve the elements. Here are the functions to send and receive.

// Imports used by both functions in this example
import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func fillQueue(queue workqueue.Interface) {
	time.Sleep(time.Second)
	queue.Add("this")
	queue.Add("is")
	queue.Add("a")
	queue.Add("complete")
	queue.Add("sentence")
}

func readFromQueue(queue workqueue.Interface) {
	time.Sleep(3 * time.Second)
	for {
		// Ignore the second return value (the shutdown flag) for now;
		// we will come back to it below
		item, _ := queue.Get()
		fmt.Printf("Got item: %s\n", item)
		queue.Done(item)
	}
}

With these two functions in place, we can now easily create two goroutines that use the same queue to communicate (goroutines, being threads, share the same address space and can therefore communicate using common data structures).

myQueue := workqueue.New()
go fillQueue(myQueue)
go readFromQueue(myQueue)

However, if you run this, you will find that there is a problem. Our main thread completes after creating both worker threads, and this will cause the program to exit and kill both worker threads before the reader has done anything. To avoid this, we need to wait for the reader thread (which, reading from the queue, will in turn wait for the writer thread). One way to do this with Go language primitives is to use channels. So we change our reader function to receive a channel of integer elements

func readFromQueue(queue workqueue.Interface, stop chan int)

and in the main function, we create a channel, pass it to the function and then read from it, which will block until the reader thread sends a confirmation that it is done reading.

stop := make(chan int)
myQueue := workqueue.New()
go fillQueue(myQueue)
go readFromQueue(myQueue, stop)
<-stop

Now, however, there is another problem: how does the reader know that no further items will be written to the queue? Fortunately, queues offer a way to handle this. When a writer is done using the queue, it calls ShutDown on the queue. This changes the queue’s behavior: reads no longer block, and the second return value of Get will be true once the queue is empty. If a reader detects this situation, it can stop its goroutine.
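
Putting this together, the final versions of our two functions could look roughly as follows (a sketch along these lines, not necessarily identical to the linked example).

func fillQueue(queue workqueue.Interface) {
	time.Sleep(time.Second)
	queue.Add("this")
	queue.Add("is")
	queue.Add("a")
	queue.Add("complete")
	queue.Add("sentence")
	// Signal to readers that no further items will arrive
	queue.ShutDown()
}

func readFromQueue(queue workqueue.Interface, stop chan int) {
	time.Sleep(3 * time.Second)
	for {
		item, shutdown := queue.Get()
		if shutdown {
			// The queue has been shut down and is drained
			stop <- 1
			return
		}
		fmt.Printf("Got item: %s\n", item)
		queue.Done(item)
	}
}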

A full example can be found here. Of course, this example is contrived to demonstrate goroutines, queues and channels; it is not the most efficient solution to the problem at hand.

The core controller loop

Armed with our understanding of concurrency and queues, we can now take a first look at the code of the sample controller. The main entry points are the functions handleObject and enqueueFoo. These are the functions invoked by the informer, which we will discuss in one of the next posts, whenever a Foo object or a Deployment is created, updated or deleted.

The function enqueueFoo is called whenever a Foo object is changed (i.e. added, updated or deleted). It simply determines a key for the object and adds that key to the workqueue.
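
Condensed to its essence, enqueueFoo boils down to something like this (a sketch; the real code in controller.go also reports errors via a utility function instead of silently returning).

func (c *Controller) enqueueFoo(obj interface{}) {
	// Derive a key of the form "namespace/name" from the object
	key, err := cache.MetaNamespaceKeyFunc(obj)
	if err != nil {
		return
	}
	c.workqueue.Add(key)
}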

The workqueue is read by worker threads, which are created in the Run function of the controller. This function creates a certain number of goroutines and then listens on a channel called stopCh, just as we did in our simplified example before. This channel is created in main.go and is used to stop all workers and the main thread when a signal is received.
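
Stripped of error handling and cache synchronization, the worker startup in Run follows this pattern (wait refers to the package k8s.io/apimachinery/pkg/util/wait).

// Start the configured number of workers and block until stopCh is closed;
// wait.Until re-runs c.runWorker every second until stopCh signals shutdown
for i := 0; i < threadiness; i++ {
	go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh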

Each worker thread executes the method processNextWorkItem of the controller in a loop. For each item in the queue, this method calls another method, syncHandler, passing the item retrieved from the queue, i.e. the key of the Foo resource. syncHandler then uses a lister to retrieve the current state of the Foo resource. It then retrieves the deployment behind the Foo resource, creates it if it could not be found, and updates the number of replicas if needed.
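
Here the Get/Done pattern from our toy example reappears. A stripped-down sketch of processNextWorkItem could look as follows (the real method additionally uses the rate-limiting features of the workqueue to forget or re-queue items depending on the outcome).

func (c *Controller) processNextWorkItem() bool {
	obj, shutdown := c.workqueue.Get()
	if shutdown {
		return false
	}
	// Mark the item as processed once we are done with it
	defer c.workqueue.Done(obj)
	key, ok := obj.(string)
	if !ok {
		return true
	}
	if err := c.syncHandler(key); err != nil {
		utilruntime.HandleError(err)
	}
	return true
}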

The function handleObject is similar. It is invoked by the informer with the new, updated state of the Deployment object. It then determines the owning Foo resource and simply enqueues that Foo resource. The rest of the processing will then be the same.
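
In essence, handleObject walks the owner reference of the deployment back to its Foo along these lines (a sketch that ignores some cases the real function covers, such as tombstones of deleted objects; foosLister is the controller's lister for Foo resources).

func (c *Controller) handleObject(obj interface{}) {
	object, ok := obj.(metav1.Object)
	if !ok {
		return
	}
	// Follow the owner reference, but only if the owner is a Foo
	if ownerRef := metav1.GetControllerOf(object); ownerRef != nil {
		if ownerRef.Kind != "Foo" {
			return
		}
		foo, err := c.foosLister.Foos(object.GetNamespace()).Get(ownerRef.Name)
		if err != nil {
			return
		}
		c.enqueueFoo(foo)
	}
}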

At this point, two open ends remain. First, we will have to understand how an informer works and how it invokes the functions handleObject and enqueueFoo. Second, we will need to understand what a lister is doing and where the lister and the data it uses come from. This will be the topic of our next post.
