Published
September 22, 2022

Writing Kubernetes Operators with Python

Oz Tiram
Senior Software Engineer
Article - the beauty of Python
This article originally appeared as the cover story in the July 2022 edition of entwickler magazine (in German)

What is an Operator, and why write it with Python?

Kubernetes is the de facto platform for deploying applications in Linux containers. Originally developed by Google to deploy web applications on a cluster of computers, it is now open source. The Kubernetes developers allowed extending its API from very early versions, and today Kubernetes can deploy more than just Linux containers: virtual machines using KubeVirt, FreeBSD jails, and even whole Kubernetes clusters using the Cluster API.

Very early on, the Kubernetes developers realized that extensibility is key to successful adoption. Early releases offered ThirdPartyResource for this purpose. It was superseded by CustomResourceDefinition, which was introduced in version 1.7 and fully replaced ThirdPartyResource from version 1.8 onward.

While Golang is the dominant language in the Kubernetes ecosystem, nothing stops you from writing components in other languages, as long as they fulfill the API. For example, one can replace runc, written in Go, with crun, which is written in C, as both implement the OCI Container Runtime specifications. Or you can replace the Kubelet with Krustlet, written in Rust, as it fulfills the Kubelet API.

Hopefully, this is enough to convince you that you do not need to know Golang to extend Kubernetes. In this article, we will see how to extend the Kubernetes API and schedule our own workloads based on these newly defined API extensions using Python.

If you are reading this magazine, there is a high chance you already know Python or are at least interested in it. Beyond that, Python is the most popular language according to some surveys, and it is widely used in introductory programming courses at universities. Hence, using Python to extend Kubernetes is likely to be easy for you and for team members who already know Python and are less likely to know Go.

By now, you know that Kubernetes can schedule more than just containers, and that you can extend the API with your own CustomResourceDefinition. However, you might still wonder: what is an operator?

An operator is a collection of domain-specific custom resources and a controller program that reacts to changes in the cluster or in these resources. For example, an operator can watch certain annotations on Pods or Deployments and manipulate objects inside or outside the cluster when it detects them. This is how cert-manager and ExternalDNS work. Specifically, when you add an annotation to an Ingress, a chain of actions is triggered inside and outside the cluster. Along the way, a certificate request is sent to Let's Encrypt, and if it is validated successfully, a new Secret containing the certificate is created and used to secure access to the Ingress with HTTPS. The key message is that an operator can watch Kubernetes objects, built-in or custom, and act on objects, internal or external to the cluster, bringing them to a desired state (see Fig. 1).

Fig. 1: Writing Kubernetes operators
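To make the idea of "bringing objects to a desired state" more concrete, here is a minimal sketch that runs without a cluster. The function name reconcile and the dictionaries desired and observed are hypothetical stand-ins for real Kubernetes objects; a real controller would read both from the kube-api-server.

```python
def reconcile(desired, observed):
    """Return the actions needed to move `observed` towards `desired`.

    Both arguments map an object name to a replica count. They are plain
    dictionaries standing in for real cluster state (an assumption made
    for this illustration).
    """
    actions = []
    for name, replicas in desired.items():
        if name not in observed:
            actions.append(("create", name, replicas))
        elif observed[name] != replicas:
            actions.append(("scale", name, replicas))
    for name in observed:
        if name not in desired:
            actions.append(("delete", name, 0))
    return actions


desired = {"web": 3, "db": 1}
observed = {"web": 2, "cache": 1}
for action in reconcile(desired, observed):
    print(action)
```

A controller repeats this comparison in a loop, or whenever it is notified of a change, and then carries out the resulting actions.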

Getting a Kubernetes cluster running with Minikube

To develop a Kubernetes operator, you will need access to a working cluster. If you do not have one, it’s easy to spawn a cluster using minikube.


 
	$ minikube start -p entwickler.de --driver docker

😄 [entwickler.de] minikube v1.25.2 on Gentoo 2.8
✨ Using the docker driver based on user configuration
👍 Starting control plane node entwickler.de in cluster entwickler.de
🚜 Pulling base image ...
🔥 Creating docker container (CPUs=2, Memory=2848MB) ...
🐳 Preparing Kubernetes v1.23.3 on Docker 20.10.12 ...
▪  kubelet.housekeeping-interval=5m
▪  Generating certificates and keys ...
▪  Booting up control plane ...
▪  Configuring RBAC rules ...
🔎 Verifying Kubernetes components...
▪  Using image gcr.io/k8s-minikube/storage-provisioner:v5
🌟 Enabled addons: storage-provisioner, default-storageclass
🏄 Done! kubectl is now configured to use "entwickler.de" cluster and "default" namespace by default
	

Now that you have your cluster running, you can verify that it is working properly by doing:

 
	$ kubectl get nodes
NAME            STATUS   ROLES                  AGE     VERSION
entwickler.de   Ready    control-plane,master   6m27s   v1.23.3
	

Introducing the operator Blackadder

We are going to write a small chaos engineering operator, which will wreak havoc in the cluster. As with all operators, it needs a name. Blackadder is named after the chaotic baron in the BBC sitcom of the same name.

Chaos engineering is the discipline of experimenting on a system in order to build confidence in the system’s capability to withstand turbulent conditions in production. There are some pretty mature operators that can create havoc in Kubernetes. Do not dare run this operator in a production cluster, unless of course all your applications are already cloud native and chaos resistant. Our operator will randomly kill Pods and write garbage into ConfigMaps. In addition, it will randomly scale Deployments to many replicas.

To write a Kubernetes operator, we can use the official Python client, any alternative client, or any Python library that can communicate with the kube-api-server via HTTP. For this article, I will use pykube-ng, which describes itself as a lightweight client library for the Kubernetes API. Personally, I like it because it feels more Pythonic than the official Python client for Kubernetes.

We start by creating a CustomResourceDefinition:

 
	$ cat k8s/blackadder-v1alpha1.yml
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: chaosagents.blackadder.io
spec:
  group: blackadder.io
  scope: Cluster  # a CRD can also be Namespaced
  names:
    plural: chaosagents
    singular: chaosagent
    kind: ChaosAgent
    shortNames:
    - ca
  versions:
  - name: v1alpha1  # you can serve multiple versions e.g v1beta2 or v1alpha1
    served: true
    storage: true
    schema:
      openAPIV3Schema:
        type: object
        properties:
          spec:
            type: object
            properties:
              tantrumMode:
                type: boolean
              podTolerance:
                type: integer
    additionalPrinterColumns:
    - name: Tantrum
      type: boolean
      description: Kills Pods randomly
      jsonPath: .spec.tantrumMode
    - name: Tolerance
      type: integer
      description: Total number of Pod to tolerate before randomly killing Pods
      jsonPath: .spec.podTolerance
	

We apply this CRD manifest:

 
	$ kubectl apply -f k8s/blackadder-v1alpha1.yml
customresourcedefinition.apiextensions.k8s.io/chaosagents.blackadder.io created
	

We then create an example chaos agent with:

 
	$ cat k8s/edmund.yml
apiVersion: blackadder.io/v1alpha1
kind: ChaosAgent
metadata:
  name: princeedmund
spec:
  tantrumMode: true
  podTolerance: 10
	

When we apply this manifest and then view the resource, we see:

 
	$ kubectl apply -f k8s/edmund.yml
chaosagent.blackadder.io/princeedmund created
$ kubectl get chaosagents.blackadder.io
NAME TANTRUM TOLERANCE
princeedmund true 10
	

We can add more toggles, switches and definitions to our CustomResourceDefinition until it looks like this:

 
	$ kubectl get chaosagents.blackadder.io
NAME TANTRUM TOLERANCE CANCER IPSUM EAGERNESS PAUSE EXCLUDED
princeedmund true 10 false true 20 30 ["kube-system"]
	

Note also that a CustomResourceDefinition can validate input in various ways. For example, we can define eagerness as an integer ranging from 1 to 100:

 
	$ kubectl apply -f k8s/edmund-v1beta1.yml
The ChaosAgent "princeedmund" is invalid: spec.eagerness: Invalid value: 200: spec.eagerness in body should be less than or equal to 100
	

You can find the complete definition of the chaosagents.blackadder.io and the manifests for creating agents in the source code repository accompanying this article.
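The range check comes from the OpenAPI schema inside the CRD. As a rough illustration only, the sketch below writes a hypothetical schema fragment for eagerness as a Python dictionary, using the standard OpenAPI keywords minimum and maximum, and mimics the kind of check the API server performs. The helper validate_integer is an assumption made for this example; it is not part of Kubernetes or pykube-ng, and the real schema is the one in the repository.

```python
# Hypothetical fragment of the openAPIV3Schema for spec.eagerness.
# `minimum` and `maximum` are standard OpenAPI v3 validation keywords.
EAGERNESS_SCHEMA = {"type": "integer", "minimum": 1, "maximum": 100}


def validate_integer(path, value, schema):
    """Return a list of error strings, empty when `value` is valid."""
    # bool is a subclass of int in Python, so reject it explicitly
    if not isinstance(value, int) or isinstance(value, bool):
        return [f"{path}: expected an integer"]
    errors = []
    if "minimum" in schema and value < schema["minimum"]:
        errors.append(
            f"{path}: should be greater than or equal to {schema['minimum']}"
        )
    if "maximum" in schema and value > schema["maximum"]:
        errors.append(
            f"{path}: should be less than or equal to {schema['maximum']}"
        )
    return errors


print(validate_integer("spec.eagerness", 200, EAGERNESS_SCHEMA))
print(validate_integer("spec.eagerness", 20, EAGERNESS_SCHEMA))
```

The benefit of declaring such limits in the CRD is that invalid objects are rejected before the controller ever sees them.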

Adding the controller logic

We have now created a new resource that is stored in Kubernetes and served by kube-api-server. Hence, we can now create the controller logic. We begin by drafting an algorithm in pseudocode that explains the intended behavior of the chaos agent:

 
	client = connect_to_kubernetes()

# retrieves our agent configuration from the kube-api-server
chaos_agent = client.get_chaos_agent()

while True:
    pods = client.list_pods(exclude_namespaces)
    deployments = client.list_deployments(exclude_namespaces)
    configmaps = client.list_configmaps(exclude_namespaces)

    if chaos_agent.tantrum:
        randomly_kill_pods(pods, chaos_agent.tolerance, chaos_agent.eagerness)

    if chaos_agent.cancer:
        randomly_scale_deployments(deployments, chaos_agent.eagerness)

    if chaos_agent.ipsum:
        randomly_write_configmaps(configmaps, chaos_agent.eagerness)

    time.sleep(chaos_agent.pause)
	

The algorithm is pretty naive, but it has all the basics of a Kubernetes operator. If you put it under a magnifying glass, there are a lot of obvious possible improvements. For example, in a cluster with hundreds of ConfigMaps and Pods, each cycle can take a long while to complete, especially if cancer mode, which randomly scales up Deployments, is also active. However, we are not in the business of premature optimization, so we will ignore these limitations and continue to the actual implementation in Python.

The first thing we need is a Kubernetes client, so that we can communicate with kube-api-server:

 
	import pykube
# automatically load the in-cluster token from
# /run/secrets/kubernetes.io/serviceaccount/token when running in a cluster,
# or fall back to ~/.kube/config
config = pykube.KubeConfig.from_env()
api = pykube.HTTPClient(config)
	

With the client we just created, it’s easy to list objects stored in the Kubernetes database. First, let’s create a couple of Pods and a Deployment:

 
	$ kubectl run --image docker.io/nginx test -n kube-public
$ kubectl run --image docker.io/nginx test -n default
$ kubectl create deployment my-dep --image=nginx --replicas=3
	

Then we can list them using the interactive Python console, which is extremely handy for discovering the API while prototyping:

 
	$ python -m pykube
Pykube v22.7.0, loaded "/home/oznt/.kube/config" with context "entwickler.de".

  Example commands:

    [d.name for d in Deployment.objects(api)]              # get names of deployments in default namespace

    list(DaemonSet.objects(api, namespace='kube-system'))  # list daemonsets in "kube-system"

    Pod.objects(api).get_by_name('mypod').labels           # labels of pod "mypod"

  Use Ctrl-D to exit

>>> [f"{p.namespace}/{p.name}" for p in Pod.objects(api, namespace=pykube.all)
       if p.namespace not in ["kube-system"]]
    ['default/my-dep-84885b44-29vg7', 'default/my-dep-84885b44-l5nkw',
    'default/my-dep-84885b44-p2gcd', 'default/test', 'kube-public/test']
	

The API is very intuitive! Getting ConfigMaps and Deployments works the same way. We can also filter objects by labels or metadata, which we will see later. However, pykube-ng does not have a predefined class for ChaosAgents. Using pykube.object_factory, we can create one, as the controller code shows.
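A minimal sketch of the object factory in isolation might look like this. The helper first_object and the main function are hypothetical names introduced for this example, and the API version "blackadder.io/v1beta1" is taken from the controller code; adjust it if your cluster only serves v1alpha1. The pykube import sits inside main so that the helper can be tried without pykube-ng installed or a cluster at hand.

```python
def first_object(kind, api):
    """Return the first object of `kind` known to the API server, or None.

    `kind` is any pykube-style class that exposes `objects(api)`.
    """
    for obj in kind.objects(api):
        return obj
    return None


def main():
    # Imported lazily: only needed when talking to a real cluster.
    import pykube

    api = pykube.HTTPClient(pykube.KubeConfig.from_env())
    # object_factory asks the API server about the kind and builds a
    # class for it at runtime.
    ChaosAgent = pykube.object_factory(api, "blackadder.io/v1beta1", "ChaosAgent")
    agent = first_object(ChaosAgent, api)
    if agent is None:
        print("no ChaosAgent found")
    else:
        print(agent.name, agent.obj["spec"])
```

Calling main() against the minikube cluster should print the name and spec of the agent created earlier, provided the requested API version is served.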

We now have all that is needed to create the complete controller of a ChaosAgent:

 
	import time
import pykube
import munch

config = pykube.KubeConfig.from_env()
api = pykube.HTTPClient(config)
ChaosAgent = pykube.object_factory(api, "blackadder.io/v1beta1", "ChaosAgent")

# retrieves our agent configuration from the kube-api-server
agent = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
agent.config = munch.munchify(agent.obj["spec"])

exclude_namespaces = agent.config.excludedNamespaces

def randomly_kill_pods(pods, tolerance, eagerness):
    pass

def randomly_scale_deployments(deployments, eagerness):
    pass

def randomly_write_configmaps(configmaps, eagerness):
    pass

while True:
    pods = api.list_pods(exclude_namespaces)
    deployments = api.list_deployments(exclude_namespaces)
    configmaps = api.list_configmaps(exclude_namespaces)

    if agent.config.tantrumMode:
        randomly_kill_pods(pods, agent.config.podTolerance, agent.config.eagerness)
    if agent.config.cancerMode:
        randomly_scale_deployments(deployments, agent.config.eagerness)
    if agent.config.ipsumMode:
        randomly_write_configmaps(configmaps, agent.config.eagerness)

    time.sleep(agent.config.pauseDuration)
	

Note that the api instance does not have methods for listing objects with the signatures I used. There are two ways to filter objects. The first, naive one is the list comprehension I already showed:

 
	>>> [p for p in Pod.objects(api, namespace=pykube.all)
     if p.namespace not in exclude_namespaces]
	

This works and might be easy to read if you are versed in Python and like list comprehensions. However, it sends a lot of data between the kube-api-server and the client. The better way is to select the objects with field selectors, which are sent to the server and evaluated there.
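For Pods, such a server-side selection could be sketched as follows. The helper names exclusion_selector and pods_outside are hypothetical; the filter call with namespace and field_selector is the same one used in the generic method further down. The pykube import is placed inside the function so that the selector helper can be tried on its own.

```python
def exclusion_selector(namespaces):
    """Build a field selector that excludes the given namespaces.

    Kubernetes joins field selector terms with commas (logical AND) and
    supports the `!=` operator on `metadata.namespace`.
    """
    return ",".join(f"metadata.namespace!={ns}" for ns in namespaces)


def pods_outside(api, namespaces):
    """List the Pods of all namespaces except the excluded ones."""
    # Imported lazily: only needed when talking to a real cluster.
    import pykube

    query = pykube.Pod.objects(api).filter(
        namespace=pykube.all,
        field_selector=exclusion_selector(namespaces),
    )
    return list(query)


print(exclusion_selector(["kube-system", "chaos-operator"]))
# metadata.namespace!=kube-system,metadata.namespace!=chaos-operator
```

The server now returns only the matching Pods, so the client no longer has to download and discard the rest.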

We can repeat this for Deployments and ConfigMaps, but we should not. Instead, we create a generic method and add it to the client class. Here we leverage Python’s dynamic nature to modify classes at runtime, which can be fun but also dangerous:

 
	from pykube import Pod, Deployment, ConfigMap
...
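def list_objects(self, k8s_obj, exclude_namespaces):
    # build e.g. "metadata.namespace!=kube-system,metadata.namespace!=foo"
    exclude_namespaces = ",".join("metadata.namespace!=" + ns
                                  for ns in exclude_namespaces)
    # use the client instance the method is bound to, not a global
    return list(
            k8s_obj.objects(self).filter(namespace=pykube.all,
                                         field_selector=exclude_namespaces
                                         ))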
...
pykube.HTTPClient.list_objects = list_objects
api = pykube.HTTPClient(config)
...
while True:
    pods = api.list_objects(Pod, exclude_namespaces)
    deployments = api.list_objects(Deployment, exclude_namespaces)
    configmaps = api.list_objects(ConfigMap, exclude_namespaces)
    ...
	

The next steps are implementing the chaos functions. We begin with pod deletion:

 
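	import random

def randomly_kill_pods(pods, tolerance, eagerness):
    # leave the cluster alone while there are fewer Pods than tolerated
    if len(pods) < tolerance:
        return

    for p in pods:
        # eagerness is a percentage: the higher, the more Pods are deleted
        if random.randint(0, 100) < eagerness:
            p.delete()
            print(f"Deleted {p.namespace}/{p.name}")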

	

The next chaos function is deployment scaling:

 
	import requests

def randomly_scale_deployments(deployments, eagerness):
    for d in deployments:
        if random.randint(0, 100) < eagerness:
            # retry until the update succeeds, e.g. after a conflicting write
            while True:
                try:
                    # double the replicas, but never go beyond 128
                    if d.replicas < 128:
                        d.replicas = min(d.replicas * 2, 128)
                    d.update()
                    print(f"scaled {d.namespace}/{d.name} to {d.replicas}")
                    break
                except (requests.exceptions.HTTPError, pykube.exceptions.HTTPError):
                    print(f"error scaling {d.namespace}/{d.name} to {d.replicas}")
                    # fetch the current state from the server and try again
                    d.reload()
                    continue
	

Finally, we implement the function which writes Lorem Ipsum snippets:

 
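	import lorem

def randomly_write_configmaps(configmaps, eagerness):
    for cm in configmaps:
        print(f"Checking {cm.namespace}/{cm.name}")
        # immutable ConfigMaps reject any change, so skip them
        if cm.obj.get("immutable"):
            continue

        if random.randint(0, 100) < eagerness:
            # a ConfigMap may have no data at all
            for k in cm.obj.get("data", {}):
                cm.obj["data"][k] = lorem.paragraph()
            # send the modified object back to the kube-api-server
            cm.update()
            print(f"Lorem Ipsum in {cm.namespace}/{cm.name}")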
	

With that, the controller code is complete. You can view the complete code in controller.py in the code repository. If you are a seasoned Pythonista, you probably think that this code can be tremendously improved, or that iterating over a large set of objects is a perfect use case for concurrent.futures or asyncio. You are probably right! However, these optimizations would obscure the purpose of learning about Kubernetes operators. Hence, I will not demonstrate them here and leave them to you as an exercise.

If you followed until now and tried running controller.py, you will see output similar to this:

 
	$ python controller.py
Deleted default/my-dep-84885b44-bjg4t
Deleted default/my-dep-84885b44-ljvdn
...
Lorem Ipsum in kube-node-lease/kube-root-ca.crt
...
scaled default/my-dep to 4
...
	

This works on the local shell with your admin configuration file, which was created for you by minikube. When you deploy the controller to the cluster you will need to give the controller permissions to list, patch and delete Pod, Deployment and ConfigMap objects.

Before we examine how to create these permissions, we create a Docker image and deploy the controller to the cluster.

The Dockerfile is using a multistage build and pipenv to manage dependency installation:

 
	FROM docker.io/python:3.10 AS builder
RUN pip install --user pipenv
# Tell pipenv to create venv in the current directory
ENV PIPENV_VENV_IN_PROJECT=1
ADD Pipfile.lock Pipfile /usr/src/
WORKDIR /usr/src
RUN /root/.local/bin/pipenv sync
RUN /usr/src/.venv/bin/python3 -c "import pykube; print(pykube.__version__)"
FROM docker.io/python:3.10 AS runtime
RUN mkdir -v /usr/src/venv
COPY --from=builder /usr/src/.venv/ /usr/src/venv/
RUN /usr/src/venv/bin/python3 -c "import pykube; print(pykube.__version__)"
WORKDIR /usr/src/
COPY controller.py .
CMD ["./venv/bin/python", "-u", "controller.py"]
	

Then, you need to build the image and push it to a public or private repository:

 
	$ docker build -t oz123/blackadder:0.1 .
Sending build context to Docker daemon 166.4kB
Step 1/13 : FROM docker.io/python:3.10 AS builder
3.10: Pulling from library/python
1339eaac5b67: Pull complete
4c78fa1b9799: Pull complete
	
 
	$ docker push oz123/blackadder:0.1
The push refers to repository [docker.io/oz123/blackadder]
2ce87cdce319: Pushed
645d7db6379e: Pushing [==================================================>] 17.26MB
3c924eba81b8: Pushed
...
	


After that, create a namespace for the controller to be deployed in:

 
	$ kubectl create namespace chaos-operator
namespace/chaos-operator created
	

Before we deploy the controller, we should add that namespace to the agent's list of excluded namespaces, so that the controller leaves its own namespace alone.
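One way to do this is to edit the agent manifest and apply it again. As an alternative, here is a hedged sketch of how it could be done from Python. The helper names with_excluded_namespace and exclude_operator_namespace are hypothetical, and the field name excludedNamespaces is assumed from the controller code above; agent is expected to be a pykube object such as the ChaosAgent retrieved earlier.

```python
def with_excluded_namespace(spec, namespace):
    """Return a copy of `spec` whose excludedNamespaces contains `namespace`."""
    updated = dict(spec)
    excluded = list(spec.get("excludedNamespaces", []))
    if namespace not in excluded:
        excluded.append(namespace)
    updated["excludedNamespaces"] = excluded
    return updated


def exclude_operator_namespace(agent, namespace="chaos-operator"):
    """Change a pykube ChaosAgent object and send the change to the server."""
    agent.obj["spec"] = with_excluded_namespace(agent.obj["spec"], namespace)
    agent.update()


spec = {"tantrumMode": True, "excludedNamespaces": ["kube-system"]}
print(with_excluded_namespace(spec, "chaos-operator")["excludedNamespaces"])
```

Because the controller reads the excluded namespaces once at start-up, the change has to be in place before the controller Pod starts.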

Now, we can create the deployment for the chaos controller:

 
	$ kubectl create deployment blackadder --image=oz123/blackadder:0.1 --replicas=1 \
   -n chaos-operator
deployment.apps/blackadder created
	

When you look at the logs of the container, you will see it crashed:

 
	$ kubectl logs -n chaos-operator blackadder-65bc54f7f9-v56bp
Traceback (most recent call last):
  File "/usr/src/controller.py", line 35, in <module>
    agent = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
  File "/usr/src/venv/lib/python3.10/site-packages/pykube/query.py", line 195, in __iter__
    return iter(self.query_cache["objects"])
  File "/usr/src/venv/lib/python3.10/site-packages/pykube/query.py", line 185, in query_cache
    cache["response"] = self.execute().json()
  File "/usr/src/venv/lib/python3.10/site-packages/pykube/query.py", line 160, in execute
    r.raise_for_status()
  File "/usr/src/venv/lib/python3.10/site-packages/requests/models.py", line 1021, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://10.96.0.1:443/apis/blackadder.io/v1beta1/chaosagents
	

That is because the default service account of the namespace has no permission to list ChaosAgent objects.

To fix this, we need to define a ClusterRole and a ClusterRoleBinding that grants this role to the service account running the controller.

 
	apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: blackadder
rules:
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["get", "list", "patch"]
- apiGroups: ["blackadder.io"]
  resources: ["chaosagents"]
  verbs: ["get", "list"]
- apiGroups: [""]
  resources: ["pods"]
  verbs: ["get", "list", "delete"]
- apiGroups: [""]
  resources: ["configmaps"]
  verbs: ["get", "list", "patch"]
	

The ClusterRoleBinding is defined with:

 
	apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: blackadder
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: blackadder
subjects:
- apiGroup: rbac.authorization.k8s.io
  kind: User
  name: system:serviceaccount:chaos-operator:default
	

We apply these manifests:

 
	$ kubectl apply -f k8s/clusterrole.yml
clusterrole.rbac.authorization.k8s.io/blackadder created
$ kubectl apply -f k8s/clusterrolebinding.yml
clusterrolebinding.rbac.authorization.k8s.io/blackadder created
	

Once you restart the Pod, you will see it running and doing its job. Note that, in the final version of the controller, the while True loop is moved into a main function, such that the code looks like this:

 
	# this is the image tagged oz123/blackadder:0.1.1
def main():
    while True:
        pods = api.list_objects(Pod, exclude_namespaces)
        deployments = api.list_objects(Deployment, exclude_namespaces)
        configmaps = api.list_objects(ConfigMap, exclude_namespaces)
        ...

if __name__ == "__main__":
    print("This is the blackadder version 0.1.1")
    print("Ready to start a havoc in your cluster")
    main()
	

When you watch the controller logs, you will see:

 
	$ kubectl logs -n chaos-operator blackadder-7695b89559-8q4qp
This is the blackadder version 0.1.1
Ready to start a havoc in your cluster
Checking default/kube-root-ca.crt
Checking kube-node-lease/kube-root-ca.crt
Checking kube-public/cluster-info
...
	

With that, we are finished implementing the chaos controller part of the Blackadder Operator.

Summary

We have seen that it is easy to create Kubernetes operators with Python. Creating operators allows us to extend Kubernetes in ways that fit our needs, and which the original developers of Kubernetes might not have thought of. We can create controller logic for anything that Python can access inside or outside the cluster. By using a CustomResourceDefinition, we can store information in the Kubernetes database, which can be used to configure the operator or to save data about the objects the controller works on.

I truly hope you enjoyed reading this article and that you are feeling comfortable enough to embark on your own journey to extend Kubernetes with your own Python Operators.
