September 22, 2022

Writing Kubernetes Operators with Python

Oz Tiram
Senior Software Engineer
Article - the beauty of Python
This article originally appeared as the cover story in the July 2022 edition of entwickler magazine (in German)

What is an Operator, and why write it with Python?

Kubernetes is the de facto platform for deploying applications in Linux containers. Originally developed by Google to deploy web applications on a cluster of computers, it is now an open source project. The developers of Kubernetes made its API extensible from very early versions, and today Kubernetes can deploy more than just Linux containers: it can deploy virtual machines using KubeVirt, FreeBSD jails, and even whole Kubernetes clusters using the Cluster API.

Very early on, the Kubernetes developers realized that extensibility is key to successful adoption. Early versions let you extend the API by defining a ThirdPartyResource. Version 1.7 introduced its successor, CustomResourceDefinition, and ThirdPartyResource was removed in version 1.8.

While Golang is the dominant language in the Kubernetes ecosystem, nothing stops you from writing components in other languages, as long as they fulfill the API. For example, one can replace runc, written in Go, with crun, which is written in C, as both implement the OCI Container Runtime specifications. Or you can replace the Kubelet with Krustlet, written in Rust, as it fulfills the Kubelet API.

Hopefully, this is enough to convince you that you do not need to know Golang to extend Kubernetes. In this article, we will see how to extend the Kubernetes API and schedule our own workloads based on these newly defined API extensions using Python.

If you are reading this magazine, there is a high chance you already know Python or are at least interested in it. But that is not all: Python is the most popular language according to some surveys, and it is also very popular in introductory programming courses at universities. Hence, using Python to extend Kubernetes is likely to be easy for you and for team members who already know Python but are less likely to know Go.

By now, you know that Kubernetes can schedule more than just containers, and that you can extend the API with your own CustomResourceDefinition. However, you might still wonder: what is an Operator?

An operator is a collection of domain-specific custom resources and a controller program that reacts to changes in the cluster or in these specific resources. For example, an operator can watch certain annotations on Pods or Deployments, and manipulate objects inside or outside the cluster when these annotations are detected. This is how cert-manager and ExternalDNS work. Specifically, when you create an annotation on an Ingress, a chain of actions is triggered inside and outside the cluster. Along the way, a certificate request is sent to Let’s Encrypt, and if it is validated successfully, a new Secret containing a certificate is created and used to secure access to the Ingress with HTTPS. The key message here is that an operator can watch Kubernetes objects, built-in or custom, and act on objects, which can be external or internal to the cluster, bringing them to a desired state (see Fig. 1).

Fig. 1: Writing Kubernetes operators
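To make the idea of bringing objects to a desired state more concrete, here is a minimal sketch of a single reconciliation step in plain Python. It does not talk to a cluster: the function name `reconcile` and the dictionaries standing in for the cluster state are assumptions made purely for illustration.

```python
def reconcile(desired, observed):
    """Return the actions needed to move `observed` towards `desired`.

    Both arguments map an object name to its spec. This is a toy model
    of what a controller decides on every cycle, not a real client call.
    """
    actions = []
    for name, spec in desired.items():
        if name not in observed:
            actions.append(("create", name))
        elif observed[name] != spec:
            actions.append(("update", name))
    for name in observed:
        if name not in desired:
            actions.append(("delete", name))
    return actions


# hypothetical state: what we want versus what currently exists
desired = {"web": {"replicas": 3}, "db": {"replicas": 1}}
observed = {"web": {"replicas": 2}, "cache": {"replicas": 1}}

actions = reconcile(desired, observed)
print(actions)
```

A real controller runs such a step in a loop, or in response to watch events, and performs the actions through the Kubernetes API instead of returning them.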

Getting a Kubernetes cluster running with Minikube

To develop a Kubernetes operator, you will need access to a working cluster. If you do not have one, it’s easy to spawn a cluster using minikube.


	$ minikube start -p --driver docker

😄 [] minikube v1.25.2 on Gentoo 2.8
✨ Using the docker driver based on user configuration
👍 Starting control plane node in cluster
🚜 Pulling base image ...
🔥 Creating docker container (CPUs=2, Memory=2848MB) ...
🐳 Preparing Kubernetes v1.23.3 on Docker 20.10.12 ...
▪  kubelet.housekeeping-interval=5m
▪  Generating certificates and keys ...
▪  Booting up control plane ...
▪  Configuring RBAC rules ...
🔎 Verifying Kubernetes components...
▪  Using image
🌟 Enabled addons: storage-provisioner, default-storageclass
🏄 Done! kubectl is now configured to use "" cluster and "default" namespace by default

Now that your cluster is running, you can verify that it is working properly by running:

	$ kubectl get nodes

NAME   STATUS   ROLES                  AGE     VERSION
       Ready    control-plane,master   6m27s   v1.23.3

Introducing the operator Blackadder

We are going to write a small chaos engineering operator, which will wreak havoc in the cluster. As with all operators, it needs a name. Blackadder is named after the chaotic baron in the BBC sitcom of the same name.

Chaos engineering is the discipline of experimenting on a system in order to build confidence in the system’s capability to withstand turbulent conditions in production. There are some pretty mature operators which can create havoc in Kubernetes. Do not run this operator in a production cluster, unless of course all your applications are already cloud native and chaos resistant. Our operator will randomly kill Pods and write garbage into ConfigMaps. In addition, it will randomly scale Deployments to many replicas.

To write a Kubernetes Operator, we can use the official Python client, an alternative client, or any Python library that can communicate with the kube-api-server via HTTP. For this article, I will be using pykube-ng, which describes itself as a lightweight client library for the Kubernetes API. Personally, I like using it because it feels more Pythonic than the official Python client for Kubernetes.

We start by creating a CustomResourceDefinition:

	$ cat k8s/blackadder-v1alpha1.yml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: chaosagents.<group>  # must be <plural>.<group>
spec:
  group: <group>
  scope: Cluster  # a CRD can also be Namespaced
  names:
    plural: chaosagents
    singular: chaosagent
    kind: ChaosAgent
    shortNames:
    - ca
  versions:
  - name: v1alpha1  # you can serve multiple versions e.g v1beta2 or v1alpha1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              tantrumMode:
                type: boolean
              podTolerance:
                type: integer
    additionalPrinterColumns:
    - name: Tantrum
      type: boolean
      description: Kills Pods randomly
      jsonPath: .spec.tantrumMode
    - name: Tolerance
      type: integer
      description: Total number of Pods to tolerate before randomly killing Pods
      jsonPath: .spec.podTolerance

We apply this CRD manifest:

	$ kubectl apply -f k8s/blackadder-v1alpha1.yml created

We then create an example chaos agent with:

	$ cat k8s/edmund.yml
apiVersion: <group>/v1alpha1
kind: ChaosAgent
metadata:
  name: princeedmund
spec:
  tantrumMode: true
  podTolerance: 10

When we apply this manifest and then view the resource, we see:

	$ kubectl apply -f k8s/edmund.yml created
$ kubectl get
princeedmund true 10

We can add more toggles, switches, and definitions to our CustomResourceDefinition until the output looks like this:

	$ kubectl get
princeedmund true 10 false true 20 30 ["kube-system"]

Note also that a CustomResourceDefinition can validate input in various ways. For example, we can define eagerness as an integer ranging from 1 to 100, so that a manifest with a value of 200 is rejected:

	$ kubectl apply -f k8s/edmund-v1beta1.yml
The ChaosAgent "princeedmund" is invalid: spec.eagerness: Invalid value: 200: spec.eagerness in body should be less than or equal to 100
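The API server performs this check itself, based on the range declared in the schema of the CustomResourceDefinition. As a rough client-side illustration of the same rule, here is a small sketch; the function name `validate_eagerness` and its error wording are my own assumptions and only mimic the message shown above.

```python
def validate_eagerness(spec, minimum=1, maximum=100):
    """Return a list of error messages for the `eagerness` field of a spec."""
    errors = []
    value = spec.get("eagerness")
    if value is None:
        return errors  # the field is treated as optional in this sketch
    # bool is a subclass of int in Python, so reject it explicitly
    if isinstance(value, bool) or not isinstance(value, int):
        errors.append("spec.eagerness must be an integer")
    elif value < minimum:
        errors.append(
            f"spec.eagerness should be greater than or equal to {minimum}")
    elif value > maximum:
        errors.append(
            f"spec.eagerness should be less than or equal to {maximum}")
    return errors


print(validate_eagerness({"eagerness": 200}))
print(validate_eagerness({"eagerness": 50}))
```

Checking manifests like this before applying them can give faster feedback, but the server-side validation remains the authoritative one.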

You can find the complete CustomResourceDefinition and the manifests for creating agents in the source code repository accompanying this article.

Adding the controller logic

We have now created a new resource that is stored in Kubernetes and served by kube-api-server. Hence, we can now create the controller logic. We begin by drafting an algorithm in pseudocode which explains the intended behavior of the chaos agent:

	client = connect_to_kubernetes()

# retrieves our agent configuration from the kube-api-server
chaos_agent = client.get_chaos_agent()
exclude_namespaces = chaos_agent.excluded_namespaces

while True:
    pods = client.list_pods(exclude_namespaces)
    deployments = client.list_deployments(exclude_namespaces)
    configmaps = client.list_configmaps(exclude_namespaces)

    if chaos_agent.tantrum:
        randomly_kill_pods(pods, chaos_agent.tolerance, chaos_agent.eagerness)

    if chaos_agent.cancer:
        randomly_scale_deployments(deployments, chaos_agent.eagerness)

    if chaos_agent.ipsum:
        randomly_write_configmaps(configmaps, chaos_agent.eagerness)


The algorithm is pretty naive, but it has all the basics of a Kubernetes operator. Obviously, if you put it under a magnifying glass, there are many possible improvements. For example, in a cluster with hundreds of ConfigMaps and Pods, each cycle can take a long while to complete, especially if cancer mode, which randomly scales up Deployments, is also active. However, we are not in the business of premature optimization, so we’ll ignore these limitations and continue to the actual implementation in Python.

The first thing we need to do is get a Kubernetes client so that we can communicate with kube-api-server:

	import pykube

# from_env() first tries the in-cluster service account token
# under /run/secrets/ and otherwise falls back to ~/.kube/config
config = pykube.KubeConfig.from_env()
api = pykube.HTTPClient(config)

With the client we just created, it’s easy to list objects stored in the Kubernetes database. First, let’s create a couple of Pods and a Deployment:

	$ kubectl run --image test -n kube-public
$ kubectl run --image test -n default
$ kubectl create deployment my-dep --image=nginx --replicas=3

Then we can list them using the interactive Python console, which is extremely handy for discovering the API while prototyping:

	$ python -m pykube
Pykube v22.7.0, loaded "/home/oznt/.kube/config" with context "".

  Example commands:

    [d.name for d in Deployment.objects(api)]              # get names of deployments in default namespace

    list(DaemonSet.objects(api, namespace='kube-system'))  # list daemonsets in "kube-system"

    Pod.objects(api).get_by_name('mypod').labels           # labels of pod "mypod"

  Use Ctrl-D to exit

>>> [f"{p.namespace}/{p.name}" for p in Pod.objects(api, namespace=pykube.all)
       if p.namespace not in ["kube-system"]]
    ['default/my-dep-84885b44-29vg7', 'default/my-dep-84885b44-l5nkw',
    'default/my-dep-84885b44-p2gcd', 'default/test', 'kube-public/test']

The API is very intuitive! Getting ConfigMaps and Deployments works the same way. We can also filter objects by labels or metadata, which we will see later. However, pykube-ng does not have a predefined class for listing ChaosAgents. Using an object factory, we can create one, as the controller listing below does with pykube.object_factory.
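If you wonder what an object factory does conceptually, the following sketch builds a resource class at runtime with Python’s built-in `type()`. It is not pykube-ng’s implementation: the `APIObject` stand-in, the extra `plural` parameter, and the API group `chaos.example.invalid` are all assumptions made for illustration.

```python
class APIObject:
    """Tiny stand-in for a client library's base resource class."""
    version = None
    kind = None
    endpoint = None

    def __init__(self, obj):
        self.obj = obj

    @property
    def name(self):
        return self.obj["metadata"]["name"]

    @classmethod
    def url(cls):
        # cluster-scoped custom resources are served under
        # /apis/<group>/<version>/<plural>
        return f"/apis/{cls.version}/{cls.endpoint}"


def object_factory(api_version, kind, plural):
    """Create a new resource class at runtime with the built-in type()."""
    attributes = {"version": api_version, "kind": kind, "endpoint": plural}
    return type(kind, (APIObject,), attributes)


# the API group below is hypothetical
ChaosAgent = object_factory(
    "chaos.example.invalid/v1alpha1", "ChaosAgent", "chaosagents")
agent = ChaosAgent({"metadata": {"name": "princeedmund"},
                    "spec": {"tantrumMode": True}})
print(ChaosAgent.__name__, ChaosAgent.url(), agent.name)
```

The point of the sketch is only that a class for a custom resource does not need to be written by hand; it can be derived from the API version and kind.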

We now have all that is needed to create the complete controller of a ChaosAgent:

	import time
import pykube
import munch

config = pykube.KubeConfig.from_env()
api = pykube.HTTPClient(config)
ChaosAgent = pykube.object_factory(api, "", "ChaosAgent")

# retrieves our agent configuration from the kube-api-server;
# objects() returns a query, so we take its first result
agent = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
agent.config = munch.munchify(agent.obj["spec"])

exclude_namespaces = agent.config.excludedNamespaces

# the three chaos functions are implemented later in this article
def randomly_kill_pods(pods, tolerance, eagerness):
    ...

def randomly_scale_deployments(deployments, eagerness):
    ...

def randomly_write_configmaps(configmaps, eagerness):
    ...

while True:
    pods = api.list_pods(exclude_namespaces)
    deployments = api.list_deployments(exclude_namespaces)
    configmaps = api.list_configmaps(exclude_namespaces)

    if agent.config.tantrumMode:
        randomly_kill_pods(pods, agent.config.podTolerance, agent.config.eagerness)
    if agent.config.cancerMode:
        randomly_scale_deployments(deployments, agent.config.eagerness)
    if agent.config.ipsumMode:
        randomly_write_configmaps(configmaps, agent.config.eagerness)


Note that the api instance does not have methods for listing objects with the signatures I used above. There are two ways to filter objects. The first, naive one is the one I already showed:

	>>> [p for p in Pod.objects(api, namespace=pykube.all)
if p.namespace not in exclude_namespaces]

This works and might be easy to read if you are versed in Python and like list comprehensions. However, it sends a lot of data between the kube-api-server and the client, because every Pod is transferred before it is filtered. The better way is to select the objects with field selectors, which are sent to the server so that the filtering happens there.
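As a small illustration of such a selector, the sketch below builds the selector string from a list of namespaces and shows how it would look as a `fieldSelector` query parameter. The helper name `namespace_field_selector` is an assumption of mine, and the sketch only builds strings; it does not send a request.

```python
from urllib.parse import urlencode


def namespace_field_selector(exclude_namespaces):
    """Build a field selector that excludes the given namespaces."""
    return ",".join(f"metadata.namespace!={ns}" for ns in exclude_namespaces)


selector = namespace_field_selector(["kube-system", "chaos-operator"])
print(selector)

# the Kubernetes API accepts the selector as the fieldSelector
# query parameter of a list request
query = urlencode({"fieldSelector": selector})
print(query)
```

Because the server applies the selector, objects in the excluded namespaces are never sent over the wire.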

We can repeat this for Deployments and ConfigMaps, but we should not. Instead, we create a generic function and attach it to the client class as a method. Here we leverage Python’s dynamic nature to modify classes at runtime, which can be fun but also dangerous:

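	import pykube
from pykube import Pod, Deployment, ConfigMap

def list_objects(self, k8s_obj, exclude_namespaces):
    # build a field selector such as
    # "metadata.namespace!=kube-system,metadata.namespace!=default"
    exclude_namespaces = ",".join("metadata.namespace!=" + ns
                                  for ns in exclude_namespaces)
    return list(k8s_obj.objects(self).filter(namespace=pykube.all,
                                             field_selector=exclude_namespaces))

# attach the function to the client class as a method (monkey patching)
pykube.HTTPClient.list_objects = list_objects
api = pykube.HTTPClient(config)

while True:
    pods = api.list_objects(Pod, exclude_namespaces)
    deployments = api.list_objects(Deployment, exclude_namespaces)
    configmaps = api.list_objects(ConfigMap, exclude_namespaces)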

The next steps are implementing the chaos functions. We begin with pod deletion:

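	import random

def randomly_kill_pods(pods, tolerance, eagerness):
    # leave the cluster alone while it has fewer Pods than we tolerate
    if len(pods) < tolerance:
        return

    for p in pods:
        if random.randint(0, 100) < eagerness:
            p.delete()
            print(f"Deleted {p.namespace}/{p.name}")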
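To get a feeling for how aggressive a given eagerness is, we can compute the chance that a single Pod is deleted in one cycle. The sketch below simply counts the outcomes of `random.randint(0, 100)`, which can return 101 different values; the helper name `kill_probability` is an assumption for illustration.

```python
from fractions import Fraction


def kill_probability(eagerness):
    """Chance that random.randint(0, 100) < eagerness holds for one Pod."""
    # randint(0, 100) includes both ends, so there are 101 possible rolls
    hits = sum(1 for roll in range(0, 101) if roll < eagerness)
    return Fraction(hits, 101)


for eagerness in (1, 20, 100):
    p = kill_probability(eagerness)
    print(f"eagerness={eagerness}: {p} (about {float(p):.1%})")
```

Note that even an eagerness of 100 does not delete every Pod, since a roll of exactly 100 is not smaller than 100.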


The next chaos function is deployment scaling:

	import random
import requests

def randomly_scale_deployments(deployments, eagerness):
    for d in deployments:
        if random.randint(0, 100) < eagerness:
            while True:
                try:
                    if d.replicas < 128:
                        d.replicas = min(d.replicas * 2, 128)
                    d.update()
                    print(f"scaled {d.namespace}/{d.name} to {d.replicas}")
                    break
                except (requests.exceptions.HTTPError, pykube.exceptions.HTTPError):
                    print(f"error scaling {d.namespace}/{d.name} to {d.replicas}")
                    # fetch the latest version of the object, then retry
                    d.reload()
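The scaling rule itself, doubling the replicas up to a cap of 128, can be tried out without a cluster. The following sketch isolates it in two helper functions whose names, `next_replicas` and `scaling_path`, are assumptions for illustration.

```python
def next_replicas(current, cap=128):
    """Double the replica count, but never go beyond the cap."""
    if current < cap:
        return min(current * 2, cap)
    return current


def scaling_path(start, cap=128):
    """List the replica counts a Deployment goes through until the cap."""
    path = [start]
    # a Deployment with 0 replicas stays at 0, so stop right away
    while 0 < path[-1] < cap:
        path.append(next_replicas(path[-1], cap))
    return path


print(scaling_path(3))
```

Starting from the three replicas of my-dep, the cap is reached after six successful scaling steps.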

Finally, we implement the function which writes Lorem Ipsum snippets:

	import random
import lorem

def randomly_write_configmaps(configmaps, eagerness):
    for cm in configmaps:
        print(f"Checking {cm.namespace}/{cm.name}")
        # immutable ConfigMaps cannot be changed, so skip them
        if cm.obj.get("immutable"):
            continue

        if random.randint(0, 100) < eagerness:
            for k in cm.obj.get("data", {}):
                cm.obj["data"][k] = lorem.paragraph()
            cm.update()
            print(f"Lorem Ipsum in {cm.namespace}/{cm.name}")

With that, the controller code is complete. You can view the complete code in the code repository. If you are a seasoned Pythonista, you probably think that this code can be tremendously improved, or that iterating over a large set of objects is a perfect use case for concurrent.futures or asyncio. You are probably right! However, these optimizations would distract from the purpose of learning about Kubernetes Operators. Hence, I will not demonstrate them here, and leave them to you as an exercise.

If you have followed along and run the controller, you will see output similar to this:

	$ python
Deleted default/my-dep-84885b44-bjg4t
Deleted default/my-dep-84885b44-ljvdn
Lorem Ipsum in kube-node-lease/kube-root-ca.crt
scaled default/my-dep to 4

This works on the local shell with your admin configuration file, which was created for you by minikube. When you deploy the controller to the cluster you will need to give the controller permissions to list, patch and delete Pod, Deployment and ConfigMap objects.

Before we examine how to create these permissions, we create a Docker image and deploy the controller to the cluster.

The Dockerfile uses a multi-stage build and pipenv to manage dependency installation:

	FROM AS builder
RUN pip install --user pipenv
# Tell pipenv to create venv in the current directory
ENV PIPENV_VENV_IN_PROJECT=1
ADD Pipfile.lock Pipfile /usr/src/
WORKDIR /usr/src
RUN /root/.local/bin/pipenv sync
RUN /usr/src/.venv/bin/python3 -c "import pykube; print(pykube.__version__)"
FROM AS runtime
RUN mkdir -v /usr/src/venv
COPY --from=builder /usr/src/.venv/ /usr/src/venv/
RUN /usr/src/venv/bin/python3 -c "import pykube; print(pykube.__version__)"
WORKDIR /usr/src/
CMD ["./venv/bin/python", "-u", ""]

Then, you need to build the image and push it to a public or private repository:

	$ docker build -t oz123/blackadder:0.1 .
Sending build context to Docker daemon 166.4kB
Step 1/13 : FROM AS builder
3.10: Pulling from library/python
1339eaac5b67: Pull complete
4c78fa1b9799: Pull complete
	$ docker push oz123/blackadder:0.1
The push refers to repository []
2ce87cdce319: Pushed
645d7db6379e: Pushing [==================================================>] 17.26MB
3c924eba81b8: Pushed

After that, create a namespace for the controller to be deployed in:

	$ kubectl create namespace chaos-operator
namespace/chaos-operator created

Before we deploy the controller, we should exclude that namespace from the list of watched namespaces, that is, add chaos-operator to the excludedNamespaces of our ChaosAgent.
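One way to do this is to edit the agent’s manifest and apply it again. As a hedged sketch of the change itself, the following function adds a namespace to the excludedNamespaces list of a manifest held as a Python dictionary; the function name `exclude_namespace` and the sample manifest are assumptions for illustration.

```python
import copy


def exclude_namespace(manifest, namespace):
    """Return a copy of the manifest with `namespace` excluded."""
    updated = copy.deepcopy(manifest)
    spec = updated.setdefault("spec", {})
    excluded = spec.setdefault("excludedNamespaces", [])
    # adding the same namespace twice must not create duplicates
    if namespace not in excluded:
        excluded.append(namespace)
    return updated


# hypothetical manifest of our agent, reduced to the relevant fields
agent = {
    "kind": "ChaosAgent",
    "metadata": {"name": "princeedmund"},
    "spec": {"excludedNamespaces": ["kube-system"]},
}

updated = exclude_namespace(agent, "chaos-operator")
print(updated["spec"]["excludedNamespaces"])
```

The function leaves the original dictionary untouched, so the old and the new state can be compared before anything is sent to the cluster.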

Now, we can create the deployment for the chaos controller:

	$ kubectl create deployment blackadder --image=oz123/blackadder:0.1 --replicas=1 \
   -n chaos-operator
deployment.apps/blackadder created

When you look at the logs of the container, you will see it crashed:

	$ kubectl logs -n chaos-operator blackadder-65bc54f7f9-v56bp
Traceback (most recent call last):
  File "/usr/src/", line 35, in 
    agent = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
  File "/usr/src/venv/lib/python3.10/site-packages/pykube/", line 195, in __iter__
    return iter(self.query_cache["objects"])
  File "/usr/src/venv/lib/python3.10/site-packages/pykube/", line 185, in query_cache
    cache["response"] = self.execute().json()
  File "/usr/src/venv/lib/python3.10/site-packages/pykube/", line 160, in execute
  File "/usr/src/venv/lib/python3.10/site-packages/requests/", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url:

That is because the service account for the namespace has no permissions to list ChaosAgent objects.

To fix this, we need to define a ClusterRole and a ClusterRoleBinding that grant the required permissions to the service account which runs the controller.

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: blackadder
rules:
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["get", "list", "patch"]
- apiGroups: ["<group>"]  # the API group of the ChaosAgent CRD
  resources: ["chaosagents"]
  verbs: ["get", "list"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "delete"]
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "patch"]
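Before deploying, it can help to double check that such rules cover every call the controller makes. The sketch below does this with plain dictionaries. It is a simplification that ignores wildcards and resource names, the helper name `allows` is an assumption, the ChaosAgent rule is left out, and the `secrets` entry is a made-up example of a permission that was not granted.

```python
def allows(rules, api_group, resource, verb):
    """Check whether any rule grants `verb` on `resource` in `api_group`."""
    return any(
        api_group in rule["apiGroups"]
        and resource in rule["resources"]
        and verb in rule["verbs"]
        for rule in rules
    )


# the rules for built-in resources from the ClusterRole above
rules = [
    {"apiGroups": ["apps"], "resources": ["deployments"],
     "verbs": ["get", "list", "patch"]},
    {"apiGroups": [""], "resources": ["pods"],
     "verbs": ["get", "list", "delete"]},
    {"apiGroups": [""], "resources": ["configmaps"],
     "verbs": ["get", "list", "patch"]},
]

# what the controller needs, plus one hypothetical extra request
needed = [
    ("apps", "deployments", "patch"),
    ("", "pods", "delete"),
    ("", "configmaps", "patch"),
    ("", "secrets", "list"),
]

missing = [request for request in needed if not allows(rules, *request)]
print(missing)
```

Anything that ends up in `missing` would produce a 403 error like the one in the traceback above.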

The ClusterRoleBinding is defined with:

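apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: blackadder
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: blackadder
subjects:
- apiGroup: rbac.authorization.k8s.io
  kind: User
  name: system:serviceaccount:chaos-operator:default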

We apply these manifests:

	$ kubectl apply -f k8s/clusterrole.yml created
$ kubectl apply -f k8s/clusterrolebinding.yml created

Once you restart the Pod, you will see it running and doing its job. Note that, in the final version of the controller, the while True loop is moved into a main function, such that the code looks like this:

	# this is Docker image tag oz123/blackadder:0.1.1
def main():
    while True:
        pods = api.list_objects(Pod, exclude_namespaces)
        deployments = api.list_objects(Deployment, exclude_namespaces)
        configmaps = api.list_objects(ConfigMap, exclude_namespaces)
        ...  # call the chaos functions as before


if __name__ == "__main__":
    print("This is the blackadder version 0.1.1")
    print("Ready to start a havoc in your cluster")
    main()

When you watch the controller logs, you will see:

	$ kubectl logs -n chaos-operator blackadder-7695b89559-8q4qp
This is the blackadder version 0.1.1
Ready to start a havoc in your cluster
Checking default/kube-root-ca.crt
Checking kube-node-lease/kube-root-ca.crt
Checking kube-public/cluster-info

With that, we are finished implementing the chaos controller part of the Blackadder Operator.


We have seen that it is easy to create Kubernetes Operators with Python. Creating Operators allows us to extend Kubernetes in ways that fit our needs, and which the original developers of Kubernetes might not have thought of. We can create controller logic for anything that Python can access inside or outside the cluster, and by using a CustomResourceDefinition we can store information in the Kubernetes database, which can be used to configure the Operator or to save data about the objects the controller works on.

I truly hope you enjoyed reading this article and that you are feeling comfortable enough to embark on your own journey to extend Kubernetes with your own Python Operators.
