Published 2022-09-22

Writing Kubernetes Operators with Python

Senior Software Engineer @ Spectro Cloud

This article originally appeared as the cover story in the July 2022 edition of entwickler magazine (in German)


What is an Operator, and why write it with Python?

Kubernetes is the de facto platform for deploying applications in Linux containers. Originally developed by Google to deploy web applications on a cluster of computers, it is now open source. The developers of Kubernetes allowed extending its API from very early versions, and today Kubernetes can deploy more than just Linux containers. It can deploy virtual machines using KubeVirt, FreeBSD jails, and even whole Kubernetes clusters using the Cluster API.

Very early on, the Kubernetes developers realized that making Kubernetes extensible is key to successful adoption. Early versions offered ThirdPartyResource for this purpose. It was superseded by CustomResourceDefinition, which was introduced in version 1.7 and has been the only one of the two mechanisms since version 1.8.

While Golang is the dominant language in the Kubernetes ecosystem, nothing stops you from writing components in other languages, as long as they fulfill the API. For example, one can replace runc, written in Go, with crun, which is written in C, as both implement the OCI Container Runtime specifications. Or you can replace the Kubelet with Krustlet, written in Rust, as it fulfills the Kubelet API.

Hopefully, this is enough to convince you that you do not need to know Golang to extend Kubernetes. In this article, we will see how to extend the Kubernetes API and schedule our own workloads based on these newly defined API extensions using Python.

If you are reading this magazine, there is a high chance you already know Python or are at least interested in it. But that is not all: Python is the most popular language according to some surveys, and it is also very popular in introductory programming courses at universities. Hence, using Python to extend Kubernetes is likely to be easy for you and for other team members, who might already know Python and are less likely to know Go.

By now, you already know that Kubernetes can schedule more than just containers, and that you can extend the API with your own CustomResourceDefinition. However, you might still wonder: what is an Operator?

An operator is a collection of domain-specific custom resources and a controller program that reacts to changes in the cluster or in these specific resources. For example, an operator can watch certain annotations on Pods or Deployments and manipulate objects inside or outside the cluster when these annotations are detected. This is how cert-manager or ExternalDNS work, for example. Specifically, when you create an annotation on an Ingress, a chain of actions is triggered inside and outside the cluster. Along the way, a certificate request is sent to Let’s Encrypt, and if it is authenticated successfully, a new Secret containing a certificate is created and used to secure access to the Ingress with HTTPS. The key message here is that an operator can watch Kubernetes objects, built-in or custom, and act on objects, which can be external or internal to the cluster, bringing them to a desired state (see Fig. 1).
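The idea of bringing objects to a desired state can be sketched in a few lines of plain Python. This is only an illustration with hypothetical names (`reconcile`, the `desired` and `observed` dictionaries); a real controller would read both states from the Kubernetes API instead of from local dictionaries.

```python
def reconcile(desired, observed):
    """Return the actions needed to turn the observed state into the desired one.

    Both arguments map an object name to its specification. The names and the
    action tuples are hypothetical and only serve this illustration.
    """
    actions = []
    for name, spec in desired.items():
        if name not in observed:
            actions.append(("create", name))
        elif observed[name] != spec:
            actions.append(("update", name))
    for name in observed:
        if name not in desired:
            actions.append(("delete", name))
    return actions


desired = {"web": {"replicas": 3}, "cache": {"replicas": 1}}
observed = {"web": {"replicas": 2}, "old-job": {"replicas": 1}}

for action, name in reconcile(desired, observed):
    print(action, name)
```

A controller repeats this comparison over and over, so that the cluster converges on the desired state even when something else changes it in between.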

[Fig. 1]

Getting a Kubernetes cluster running with Minikube

To develop a Kubernetes operator, you will need access to a working cluster. If you do not have one, it’s easy to spawn a cluster using minikube.

    $ minikube start -p entwickler.de --driver docker
    😄  [entwickler.de] minikube v1.25.2 on Gentoo 2.8
    ✨  Using the docker driver based on user configuration
    👍  Starting control plane node entwickler.de in cluster entwickler.de
    🚜  Pulling base image ...
    🔥  Creating docker container (CPUs=2, Memory=2848MB) ...
    🐳  Preparing Kubernetes v1.23.3 on Docker 20.10.12 ...
        ▪ kubelet.housekeeping-interval=5m
        ▪ Generating certificates and keys ...
        ▪ Booting up control plane ...
        ▪ Configuring RBAC rules ...
    🔎  Verifying Kubernetes components...
        ▪ Using image gcr.io/k8s-minikube/storage-provisioner:v5
    🌟  Enabled addons: storage-provisioner, default-storageclass
    🏄  Done! kubectl is now configured to use "entwickler.de" cluster and "default" namespace by default

Now that you have your cluster running, you can verify that it is working properly by doing:

    $ kubectl get nodes
    NAME            STATUS   ROLES                  AGE     VERSION
    entwickler.de   Ready    control-plane,master   6m27s   v1.23.3

Introducing the operator Blackadder

We are going to write a small chaos engineering operator, which will wreak havoc in the cluster. As with all operators, it needs a name. Blackadder is named after the chaotic baron in the BBC sitcom of the same name.

Chaos Engineering is the discipline of experimenting on a system in order to build confidence in the system’s capability to withstand turbulent conditions in production. There are some pretty mature operators which can create havoc in Kubernetes. Do not dare to run this operator in a production cluster, unless of course all your applications are already cloud native and chaos resistant. Our operator will randomly kill Pods and write garbage into ConfigMaps. In addition, it will randomly scale Deployments to many replicas.

To write a Kubernetes Operator, we can use the official Python client, an alternative client, or any Python library that can communicate with the kube-api-server via HTTP. For this article, I will be using pykube-ng, which describes itself as a lightweight client library for the Kubernetes API. Personally, I like using it because it feels more Pythonic than the official Python client for Kubernetes.

We start by creating a CustomResourceDefinition:

    $ cat k8s/blackadder-v1alpha1.yml
    apiVersion: apiextensions.k8s.io/v1
    kind: CustomResourceDefinition
    metadata:
      name: chaosagents.blackadder.io
    spec:
      group: blackadder.io
      scope: Cluster # a CRD can also be Namespaced
      names:
        plural: chaosagents
        singular: chaosagent
        kind: ChaosAgent
        shortNames:
          - ca
      versions:
        - name: v1alpha1 # you can serve multiple versions e.g v1beta2 or v1alpha1
          served: true
          storage: true
          schema:
            openAPIV3Schema:
              type: object
              properties:
                spec:
                  type: object
                  properties:
                    tantrumMode:
                      type: boolean
                    podTolerance:
                      type: integer
          additionalPrinterColumns:
            - name: Tantrum
              type: boolean
              description: Kills Pods randomly
              jsonPath: .spec.tantrumMode
            - name: Tolerance
              type: integer
              description: Total number of Pod to tolerate before randomly killing Pods
              jsonPath: .spec.podTolerance

We apply this CRD manifest:

    $ kubectl apply -f k8s/blackadder-v1alpha1.yml
    customresourcedefinition.apiextensions.k8s.io/chaosagents.blackadder.io created

We then create an example chaos agent with:

    $ cat k8s/edmund.yml
    apiVersion: blackadder.io/v1alpha1
    kind: ChaosAgent
    metadata:
      name: princeedmund
    spec:
      tantrumMode: true
      podTolerance: 10

When we apply this manifest and then view it, we see:

    $ kubectl apply -f k8s/edmund.yml
    chaosagent.blackadder.io/princeedmund created
    $ kubectl get chaosagents.blackadder.io
    NAME           TANTRUM   TOLERANCE
    princeedmund   true      10

We can add more toggles, switches and definitions to our CustomResourceDefinition until it looks like this:

    $ kubectl get chaosagents.blackadder.io
    NAME           TANTRUM   TOLERANCE   CANCER   IPSUM   EAGERNESS   PAUSE   EXCLUDED
    princeedmund   true      10          false    true    20          30      ["kube-system"]

Note also that a CustomResourceDefinition can validate its input in various ways. For example, we can define eagerness as an integer ranging from 1 to 100, so that an out-of-range value is rejected:

    $ kubectl apply -f k8s/edmund-v1beta1.yml
    The ChaosAgent "princeedmund" is invalid: spec.eagerness: Invalid value: 200: spec.eagerness in body should be less than or equal to 100
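The range check comes from the schema stored in the CustomResourceDefinition. The following sketch mimics that check in plain Python to make the rule explicit. The schema fragment uses the standard OpenAPI keywords `minimum` and `maximum`; the function `validate_spec` is hypothetical and far simpler than the validation performed by the API server.

```python
# Fragment of an openAPIV3Schema, written as a Python dict for illustration.
SPEC_SCHEMA = {
    "eagerness": {"type": "integer", "minimum": 1, "maximum": 100},
}


def validate_spec(spec, schema=SPEC_SCHEMA):
    """Return a list of error messages for integer fields that are out of range."""
    errors = []
    for field, rules in schema.items():
        if field not in spec:
            continue
        value = spec[field]
        # bool is a subclass of int in Python, so it is rejected explicitly
        if not isinstance(value, int) or isinstance(value, bool):
            errors.append(f"spec.{field}: must be an integer")
        elif value < rules["minimum"]:
            errors.append(
                f"spec.{field}: should be greater than or equal to {rules['minimum']}"
            )
        elif value > rules["maximum"]:
            errors.append(
                f"spec.{field}: should be less than or equal to {rules['maximum']}"
            )
    return errors


print(validate_spec({"eagerness": 200}))
print(validate_spec({"eagerness": 20}))
```

The advantage of declaring the range in the schema is that invalid objects never reach the controller, so the controller code does not need to repeat such checks.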

You can find the complete definition of the chaosagents.blackadder.io and the manifests for creating agents in the source code repository accompanying this article.

Adding the controller logic

We have now created a new resource that is stored in Kubernetes and served by kube-api-server. Hence, we can now create the controller logic. We begin by drafting an algorithm in pseudocode which will explain the intended behavior of the chaos agent:

    client = connect_to_kubernetes()

    # retrieves our agent configuration from the kube-api-server
    chaos_agent = client.get_chaos_agent()

    while True:
        pods = client.list_pods(exclude_namespaces)
        deployments = client.list_deployments(exclude_namespaces)
        configmaps = client.list_configmaps(exclude_namespaces)

        if chaos_agent.tantrum:
            randomly_kill_pods(pods, chaos_agent.tolerance, chaos_agent.eagerness)

        if chaos_agent.cancer:
            randomly_scale_deployments(deployments, chaos_agent.eagerness)

        if chaos_agent.ipsum:
            randomly_write_configmaps(configmaps, chaos_agent.eagerness)

        time.sleep(chaos_agent.pause)

The algorithm is pretty naive, but it’s got all the basics of a Kubernetes operator. If you put it under a magnifying glass, there are a lot of obvious possible improvements. For example, in a cluster with hundreds of ConfigMaps and Pods, each cycle can take a long while to complete, especially if cancer mode, which randomly scales up Deployments, is also active. However, we are not in the business of premature optimization, so we’ll ignore these limitations and continue to the actual implementation in Python.

The first thing we need to do is get a Kubernetes client, so that we can communicate with the kube-api-server:

    import pykube

    # automatically loads the in-cluster token from
    # /run/secrets/kubernetes.io/serviceaccount/token when running in a cluster,
    # or the configuration from ~/.kube/config otherwise
    config = pykube.KubeConfig.from_env()
    api = pykube.HTTPClient(config)
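The lookup order described in the comment can be illustrated with the standard library alone. This is a simplified sketch and not the code of pykube-ng; the function name `detect_config_source` and its return values are made up for the example.

```python
import os
from pathlib import Path

# Path of the service account token mounted into Pods, as named in the comment above.
TOKEN_PATH = Path("/run/secrets/kubernetes.io/serviceaccount/token")
KUBECONFIG_PATH = Path(os.path.expanduser("~/.kube/config"))


def detect_config_source(token_path=TOKEN_PATH, kubeconfig_path=KUBECONFIG_PATH):
    """Report which credentials would be found, checking the cluster token first."""
    if Path(token_path).is_file():
        return "in-cluster"
    if Path(kubeconfig_path).is_file():
        return "kubeconfig"
    return "none"


print(detect_config_source())
```

On a developer machine this typically prints `kubeconfig`, while inside a Pod the mounted token is found first. This is why the same controller code can run unchanged in both places.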

With the client we just created, it’s easy to list objects stored in the Kubernetes database. First, let’s create a couple of Pods and a Deployment:

    $ kubectl run --image docker.io/nginx test -n kube-public
    $ kubectl run --image docker.io/nginx test -n default
    $ kubectl create deployment my-dep --image=nginx --replicas=3

Then we can list them using the interactive Python console, which is extremely handy for discovering the API while prototyping:

    $ python -m pykube
    Pykube v22.7.0, loaded "/home/oznt/.kube/config" with context "entwickler.de".

    Example commands:
      [d.name for d in Deployment.objects(api)]              # get names of deployments in default namespace
      list(DaemonSet.objects(api, namespace='kube-system'))  # list daemonsets in "kube-system"
      Pod.objects(api).get_by_name('mypod').labels           # labels of pod "mypod"

    Use Ctrl-D to exit

    >>> [f"{p.namespace}/{p.name}" for p in Pod.objects(api, namespace=pykube.all) if p.namespace not in ["kube-system"]]
    ['default/my-dep-84885b44-29vg7',
     'default/my-dep-84885b44-l5nkw',
     'default/my-dep-84885b44-p2gcd',
     'default/test',
     'kube-public/test']

The API is very intuitive! Getting ConfigMaps and Deployments works the same way. We can also filter objects by labels or metadata, which we will see later. However, pykube-ng does not have a predefined class for listing ChaosAgents. Using an object factory, we can create such a class:

    >>> ChaosAgent = pykube.object_factory(api, "blackadder.io/v1beta1", "ChaosAgent")
    >>> ChaosAgent.objects(api, namespace=pykube.all)
    <Query of ChaosAgent at 0x7f4997df18d0>
    >>> list(ChaosAgent.objects(api, namespace=pykube.all))
    [<ChaosAgent princeedmund1>]
    >>> # To access the configuration of princeedmund1 we can do
    >>> ca = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
    >>> ca.obj["spec"]
    {'cancerMode': True, 'eagerness': 20, 'excludedNamespaces': ['kube-system'],
     'ipsumMode': True, 'pauseDuration': 30, 'podTolerance': 10, 'tantrumMode': True}
    >>> # Personally, I find attribute access easier than dict keys, hence:
    >>> import munch
    >>> caobj = munch.munchify(ca.obj)
    >>> caobj.spec.tantrumMode
    True
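If you prefer not to add a dependency just for attribute access, the standard library offers a rough equivalent. The sketch below converts a nested dictionary, shaped like the spec shown above, into `types.SimpleNamespace` objects. The helper name `to_namespace` is hypothetical, and munch remains the library used in the rest of this article.

```python
from types import SimpleNamespace


def to_namespace(value):
    """Recursively convert dicts into SimpleNamespace objects.

    Lists are converted element by element; other values are returned as-is.
    """
    if isinstance(value, dict):
        return SimpleNamespace(**{k: to_namespace(v) for k, v in value.items()})
    if isinstance(value, list):
        return [to_namespace(v) for v in value]
    return value


obj = {
    "metadata": {"name": "princeedmund"},
    "spec": {
        "tantrumMode": True,
        "podTolerance": 10,
        "excludedNamespaces": ["kube-system"],
    },
}
agent = to_namespace(obj)
print(agent.spec.tantrumMode, agent.spec.podTolerance, agent.spec.excludedNamespaces)
```

Unlike a munch object, the result no longer supports dictionary-style access, so this only fits code that reads the configuration and never writes it back.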

We now have all that is needed to create the complete controller of a ChaosAgent:

    import time

    import munch
    import pykube

    config = pykube.KubeConfig.from_env()
    api = pykube.HTTPClient(config)

    ChaosAgent = pykube.object_factory(api, "blackadder.io/v1beta1", "ChaosAgent")

    # retrieves our agent configuration from the kube-api-server
    agent = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
    agent.config = munch.munchify(agent.obj["spec"])

    exclude_namespaces = agent.config.excludedNamespaces


    def randomly_kill_pods(pods, tolerance, eagerness):
        pass


    def randomly_scale_deployments(deployments, eagerness):
        pass


    def randomly_write_configmaps(configmaps, eagerness):
        pass


    while True:
        pods = api.list_pods(exclude_namespaces)
        deployments = api.list_deployments(exclude_namespaces)
        configmaps = api.list_configmaps(exclude_namespaces)

        if agent.config.tantrumMode:
            randomly_kill_pods(pods,
                               agent.config.podTolerance,
                               agent.config.eagerness)

        if agent.config.cancerMode:
            randomly_scale_deployments(deployments, agent.config.eagerness)

        if agent.config.ipsumMode:
            randomly_write_configmaps(configmaps, agent.config.eagerness)

        time.sleep(agent.config.pauseDuration)

Note, the api instance does not have a method for listing objects with the signature I wrote. There are two ways we can filter objects. The first, naive one, is like I already showed:

>>> [p for p in Pod.objects(api, namespace=pykube.all) if p.namespace not in exclude_namespaces]

This works and might be easy to read if you are versed in Python and like list comprehensions. However, it sends a lot of data between the kube-api-server and the client, because the filtering happens only after all objects have been transferred. The better way is to select the objects with field selectors, which are evaluated on the server:

>>> list(Pod.objects(api).filter(namespace=pykube.all, field_selector="metadata.namespace!=kube-system,metadata.namespace!=default"))

We can repeat this for Deployments and ConfigMaps, but we should not. Instead, we create a generic method and add it to the client class. We are leveraging Python’s dynamic nature to modify classes at runtime, which can be fun but also dangerous:

    from pykube import Pod, Deployment, ConfigMap
    ...

    def list_objects(self, k8s_obj, exclude_namespaces):
        # builds a selector such as
        # "metadata.namespace!=kube-system,metadata.namespace!=default"
        exclude_namespaces = ",".join("metadata.namespace!=" + ns
                                      for ns in exclude_namespaces)
        return list(
            k8s_obj.objects(self).filter(namespace=pykube.all,
                                         field_selector=exclude_namespaces))
    ...

    pykube.HTTPClient.list_objects = list_objects
    api = pykube.HTTPClient(config)
    ...

    while True:
        pods = api.list_objects(Pod, exclude_namespaces)
        deployments = api.list_objects(Deployment, exclude_namespaces)
        configmaps = api.list_objects(ConfigMap, exclude_namespaces)
        ...
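The selector string is the part of `list_objects` that is easiest to get wrong, and it can be checked without a cluster. A minimal sketch, assuming a helper named `build_field_selector` (not part of pykube-ng):

```python
def build_field_selector(exclude_namespaces):
    """Build a field selector that excludes every namespace in the given list."""
    return ",".join(f"metadata.namespace!={ns}" for ns in exclude_namespaces)


print(build_field_selector(["kube-system", "chaos-operator"]))
```

Keeping the string construction in a plain function like this also offers a less invasive alternative to patching `pykube.HTTPClient`: the result can be passed directly as `field_selector` to `filter()`.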

The next steps are implementing the chaos functions. We begin with pod deletion:

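    import random
    ...

    def randomly_kill_pods(pods, tolerance, eagerness):
        # leave the cluster alone while there are fewer Pods than tolerated
        if len(pods) < tolerance:
            return

        for p in pods:
            # eagerness is roughly the chance, in percent, that a Pod is deleted
            if random.randint(0, 100) < eagerness:
                p.delete()
                print(f"Deleted {p.namespace}/{p.name}")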
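The interplay of `tolerance` and `eagerness` can be explored without a cluster. The sketch below repeats the same decision logic against stand-in objects; `FakePod` and `select_victims` are hypothetical names. Since `random.randint(0, 100)` returns one of 101 values, a Pod is picked with probability `eagerness / 101`.

```python
import random


class FakePod:
    """Stand-in for a Pod object; it only records whether delete() was called."""

    def __init__(self, name):
        self.name = name
        self.deleted = False

    def delete(self):
        self.deleted = True


def select_victims(pods, tolerance, eagerness, rng=random):
    """Apply the same rule as randomly_kill_pods and return the deleted pods."""
    if len(pods) < tolerance:
        return []
    victims = []
    for p in pods:
        if rng.randint(0, 100) < eagerness:
            p.delete()
            victims.append(p)
    return victims


pods = [FakePod(f"pod-{i}") for i in range(1000)]
# A seeded generator makes the run repeatable.
victims = select_victims(pods, tolerance=10, eagerness=20, rng=random.Random(42))
print(f"deleted {len(victims)} of {len(pods)} pods")
```

Passing the random number generator as a parameter is a design choice made for this sketch: it allows repeatable runs, which is handy when writing tests for chaos logic.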

The next chaos function is deployment scaling:

    import requests
    ...

    def randomly_scale_deployments(deployments, eagerness):
        for d in deployments:
            if random.randint(0, 100) < eagerness:
                while True:
                    try:
                        # double the replicas, but never go beyond 128
                        if d.replicas < 128:
                            d.replicas = min(d.replicas * 2, 128)
                        d.update()
                        print(f"scaled {d.namespace}/{d.name} to {d.replicas}")
                        break
                    except (requests.exceptions.HTTPError,
                            pykube.exceptions.HTTPError):
                        print(f"error scaling {d.namespace}/{d.name} to {d.replicas}")
                        # on an HTTP error, reload the object and try again
                        d.reload()
                        continue
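Doubling with a cap reaches the limit quickly. The following sketch lists the replica counts a Deployment passes through when it is picked again and again; `next_replicas` is a hypothetical helper that restates the doubling rule with its limit of 128.

```python
def next_replicas(current, cap=128):
    """Double the replica count without exceeding the cap."""
    if current >= cap:
        return current
    return min(current * 2, cap)


replicas = 3
history = [replicas]
while replicas < 128:
    replicas = next_replicas(replicas)
    history.append(replicas)

print(history)
```

Starting from the three replicas of `my-dep`, the cap is reached after six rounds of scaling.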

Finally, we implement the function which writes Lorem Ipsum snippets:

    import lorem
    ...

    def randomly_write_configmaps(configmaps, eagerness):
        for cm in configmaps:
            print(f"Checking {cm.namespace}/{cm.name}")
            # immutable ConfigMaps cannot be changed
            if cm.obj.get("immutable"):
                continue

            if random.randint(0, 100) < eagerness:
                # a ConfigMap may have no data at all
                for k in (cm.obj.get("data") or {}):
                    cm.obj["data"][k] = lorem.paragraph()
                # send the modified object to the kube-api-server
                cm.update()
                print(f"Lorem Ipsum in {cm.namespace}/{cm.name}")

With that, the controller code is complete. You can view the complete code in controller.py in the code repository. If you are a seasoned Pythonista, you probably think that this code can be tremendously improved, or that iterating over a large set of objects is a perfect use case for concurrent.futures or asyncio. You are probably right! However, these optimizations would distract from the purpose of learning about Kubernetes Operators. Hence, I will not demonstrate them here and leave them to you as an exercise.
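As a starting point for that exercise, here is a minimal sketch using `concurrent.futures.ThreadPoolExecutor` from the standard library. The function `process` is a placeholder for a call such as deleting a Pod or updating a ConfigMap; it and the worker count are assumptions made for the example.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def process(name):
    """Placeholder for a slow API call on one object."""
    time.sleep(0.01)
    return f"processed {name}"


def process_all(names, max_workers=8):
    """Run process() on all names concurrently and return results in input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(process, names))


results = process_all([f"default/pod-{i}" for i in range(20)])
print(results[:3])
```

Whether a single client instance can safely be shared between threads should be checked before replacing the placeholder with real API calls.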

If you followed until now and tried running controller.py, you will see output similar to this:

    $ python controller.py
    Deleted default/my-dep-84885b44-bjg4t
    Deleted default/my-dep-84885b44-ljvdn
    ...
    Lorem Ipsum in kube-node-lease/kube-root-ca.crt
    ...
    scaled default/my-dep to 4
    ...

This works on the local shell with your admin configuration file, which was created for you by minikube. When you deploy the controller to the cluster you will need to give the controller permissions to list, patch and delete Pod, Deployment and ConfigMap objects.

Before we examine how to create these permissions, we create a Docker image and deploy the controller to the cluster.

The Dockerfile uses a multistage build, and pipenv to manage dependency installation:

    FROM docker.io/python:3.10 AS builder

    RUN pip install --user pipenv

    # Tell pipenv to create venv in the current directory
    ENV PIPENV_VENV_IN_PROJECT=1

    ADD Pipfile.lock Pipfile /usr/src/

    WORKDIR /usr/src

    RUN /root/.local/bin/pipenv sync
    RUN /usr/src/.venv/bin/python3 -c "import pykube; print(pykube.__version__)"

    FROM docker.io/python:3.10 AS runtime

    RUN mkdir -v /usr/src/venv

    COPY --from=builder /usr/src/.venv/ /usr/src/venv/

    RUN /usr/src/venv/bin/python3 -c "import pykube; print(pykube.__version__)"

    WORKDIR /usr/src/

    COPY controller.py .

    CMD ["./venv/bin/python", "-u", "controller.py"]

Then, you need to build the image and push it to a public or private repository:

    $ docker build -t oz123/blackadder:0.1 .
    Sending build context to Docker daemon  166.4kB
    Step 1/13 : FROM docker.io/python:3.10 AS builder
    3.10: Pulling from library/python
    1339eaac5b67: Pull complete
    4c78fa1b9799: Pull complete
    $ docker push oz123/blackadder:0.1
    The push refers to repository [docker.io/oz123/blackadder]
    2ce87cdce319: Pushed
    645d7db6379e: Pushing [==================================================>]  17.26MB
    3c924eba81b8: Pushed
    ...

After that, create a namespace for the controller to be deployed in:

    $ kubectl create namespace chaos-operator
    namespace/chaos-operator created

Before we deploy the controller, we should exclude that namespace from the list of watched namespaces:

    $ kubectl patch chaosagents.blackadder.io princeedmund \
        --type merge --patch='{"spec": {"excludedNamespaces": ["kube-system", "chaos-operator"]}}'
    chaosagent.blackadder.io/princeedmund patched

Now, we can create the deployment for the chaos controller:

    $ kubectl create deployment blackadder --image=oz123/blackadder:0.1 --replicas=1 \
        -n chaos-operator
    deployment.apps/blackadder created

When you look at the logs of the container, you will see it crashed:

    $ kubectl logs -n chaos-operator blackadder-65bc54f7f9-v56bp
    Traceback (most recent call last):
      File "/usr/src/controller.py", line 35, in <module>
        agent = list(ChaosAgent.objects(api, namespace=pykube.all))[0]
      File "/usr/src/venv/lib/python3.10/site-packages/pykube/query.py", line 195, in __iter__
        return iter(self.query_cache["objects"])
      File "/usr/src/venv/lib/python3.10/site-packages/pykube/query.py", line 185, in query_cache
        cache["response"] = self.execute().json()
      File "/usr/src/venv/lib/python3.10/site-packages/pykube/query.py", line 160, in execute
        r.raise_for_status()
      File "/usr/src/venv/lib/python3.10/site-packages/requests/models.py", line 1021, in raise_for_status
        raise HTTPError(http_error_msg, response=self)
    requests.exceptions.HTTPError: 403 Client Error: Forbidden for url: https://10.96.0.1:443/apis/blackadder.io/v1beta1/chaosagents

That is because the service account for the namespace has no permissions to list ChaosAgent objects.

To fix this, we need to define a ClusterRole and ClusterRoleBinding and assign them to the user which runs the controller.

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRole
    metadata:
      name: blackadder
    rules:
      - apiGroups: ["apps"]
        resources: ["deployments"]
        verbs: ["get", "list", "patch"]
      - apiGroups: ["blackadder.io"]
        resources: ["chaosagents"]
        verbs: ["get", "list"]
      - apiGroups: [""]
        resources: ["pods"]
        verbs: ["get", "list", "delete"]
      - apiGroups: [""]
        resources: ["configmaps"]
        verbs: ["get", "list", "patch"]

The ClusterRoleBinding is defined with:

    apiVersion: rbac.authorization.k8s.io/v1
    kind: ClusterRoleBinding
    metadata:
      name: blackadder
    roleRef:
      apiGroup: rbac.authorization.k8s.io
      kind: ClusterRole
      name: blackadder
    subjects:
      - apiGroup: rbac.authorization.k8s.io
        kind: User
        name: system:serviceaccount:chaos-operator:default
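The subject name follows the naming scheme Kubernetes uses for service accounts, `system:serviceaccount:<namespace>:<name>`. A small sketch makes the parts explicit; the helper `service_account_user` is hypothetical.

```python
def service_account_user(namespace, name="default"):
    """Return the user name under which a service account authenticates."""
    return f"system:serviceaccount:{namespace}:{name}"


print(service_account_user("chaos-operator"))
```

Since the Deployment does not name a service account, its Pod runs as the `default` service account of the `chaos-operator` namespace, which is the subject bound above.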

We apply these manifests:

    $ kubectl apply -f k8s/clusterrole.yml
    clusterrole.rbac.authorization.k8s.io/blackadder created
    $ kubectl apply -f k8s/clusterrolebinding.yml
    clusterrolebinding.rbac.authorization.k8s.io/blackadder created

Once you restart the Pod, you will see it running and doing its job. Note that, in the final version of the controller, the while True loop is moved into a main function, such that the code looks like this:

    # this is docker image tag oz123/blackadder:0.1.1
    def main():
        while True:
            pods = api.list_objects(Pod, exclude_namespaces)
            deployments = api.list_objects(Deployment, exclude_namespaces)
            configmaps = api.list_objects(ConfigMap, exclude_namespaces)
            ...


    if __name__ == "__main__":
        print("This is the blackadder version 0.1.1")
        print("Ready to start a havoc in your cluster")
        main()

When you watch the controller logs, you will see:

    $ kubectl logs -n chaos-operator blackadder-7695b89559-8q4qp
    This is the blackadder version 0.1.1
    Ready to start a havoc in your cluster
    Checking default/kube-root-ca.crt
    Checking kube-node-lease/kube-root-ca.crt
    Checking kube-public/cluster-info
    ...

With that, we are finished implementing the chaos controller part of the Blackadder Operator.

Summary

We have seen that it is easy to create Kubernetes Operators with Python. Creating Operators allows us to extend Kubernetes in ways that fit our needs, and which the original developers of Kubernetes might not have thought of. We can create controller logic for anything that Python can access inside or outside the cluster. By using a CustomResourceDefinition, we can store information in the Kubernetes database, which can be used to configure the Operator or to save data on the objects the controller works on.

I truly hope you enjoyed reading this article and that you are feeling comfortable enough to embark on your own journey to extend Kubernetes with your own Python Operators.
