Using Google’s Kubernetes to build a distributed task management cluster

March 24, 2015

Introduction

Computer clusters have been with us in one form or another for quite a few years now, but several trends have come together to make them incredibly important today. Low-cost commodity hardware, ubiquitous fast networking, and solid distributed systems software have all helped usher in today’s era of server farms and “Big Data”, making clusters a critical tool.

In a cluster, multiple computers (or nodes) are connected together through a network. Each node is nominally independent, with its own CPU and operating system, but together they function as though they are a single system and focus on solving a common task. Clusters are often used for things like parallel processing and load balancing.

There are numerous solutions you can use to create and manage clusters. In fact, Google is working on providing their own solution called Kubernetes (named after the Greek word for “helmsman”). Kubernetes is a fully open source system for managing containerized applications that are spread across multiple nodes in a cluster. Although it’s currently in beta, it’s already functional and has numerous features.

In this post, I’ll guide you through an example that Endocode has contributed to the Kubernetes codebase, namely how to set up a distributed task queue using Celery, RabbitMQ and Flower.

Pods

To get started, you need to know about the most fundamental of Kubernetes’ concepts, namely “pods”. There are a few additional concepts to learn as well, but I’ll explain those below as we encounter them.

A pod is a co-located group of applications running within a shared context and with access to shared volumes. Those applications are tightly coupled, and in a more classic setup they would have executed on the same physical or virtual host. In more concrete terms, a pod is a collection of colocated containers. Each pod has an IP address in a flat shared networking namespace that has full communication with other physical computers and containers across the network.
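For illustration, here is a minimal sketch of what a standalone pod specification could look like in the v1beta1 API used throughout this post. The pod name and image here are made up for the sake of the example; the real pods below are defined indirectly, through replication controllers:

{
  "id": "example-pod",
  "kind": "Pod",
  "apiVersion": "v1beta1",
  "desiredState": {
    "manifest": {
      "version": "v1beta1",
      "id": "example-pod",
      "containers": [{
        "name": "example",
        "image": "dockerfile/nginx",
        "ports": [{"containerPort": 80}]
      }]
    }
  },
  "labels": {
    "name": "example-pod"
  }
}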

Pods live on things called “minions”. These are nodes that use container technology (e.g. Docker) to run the pods they host. You can have multiple minions, and each minion can host many pods. The diagram below shows how these pieces fit together.

Diagram: Architecture of Kubernetes

We’re going to use pods when deploying the components of our distributed task queue. Speaking of which…

The Setup

Celery

Celery is a distributed task queue based on message passing. It’s used to create tasks which are then executed on one or more worker nodes, either synchronously or asynchronously.
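As a quick illustration of that model (generic Celery usage, not specific to Kubernetes; it assumes a broker and a worker are already running), a task is defined once and can then be dispatched to whichever worker picks it up:

from celery import Celery

# Generic Celery sketch: the broker address here is just a placeholder
app = Celery('tasks', broker='amqp://guest@localhost//', backend='amqp')

@app.task
def add(x, y):
    return x + y

result = add.delay(2, 3)  # dispatch the task asynchronously to a worker
print(result.get())       # block until the worker has produced the result (5)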

RabbitMQ

Since Celery is based on message passing, it requires some middleware (to handle translation of the message between sender and receiver) called a “message broker”. RabbitMQ is a message broker often used in conjunction with Celery.

Flower

Flower is a web-based tool for monitoring and administering Celery clusters.

Goal

At the end of the example, we will have:

  • Three pods:
    • Celery task queue
    • RabbitMQ message broker
    • Flower frontend
  • A service that provides access to the message broker (more on what a service is later)
  • A basic celery task that can be passed to the worker node

Getting started

When you initialise a Kubernetes cluster, there are a couple of things to decide:

  • What kind of provider will you use?
  • How many minions do you want?

There are several providers, including cloud-based ones like Google Compute Engine and Amazon EC2. If you want to test locally, you can use Vagrant. Choose your provider by setting an environment variable. In this example, we’re going to use Vagrant to create virtual machines on our local host, so we set the variable like so:

export KUBERNETES_PROVIDER=vagrant

To get the most out of this example, ensure that Kubernetes will create more than one minion by setting this environment variable:

export NUM_MINIONS=2

Once that’s done, you can bring up the cluster (see the Kubernetes getting-started documentation for your provider) and interact with it using kubectl, the command-line tool for controlling Kubernetes.
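As a rough sketch (the exact script names and commands depend on your Kubernetes checkout and version), bringing up the Vagrant cluster and checking that the minions are registered might look like this:

$ cluster/kube-up.sh    # provisions the master and minion VMs for the chosen provider
$ kubectl get minions   # lists the registered minions once the cluster is ready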

Step 1: Fire up a RabbitMQ pod

A pod containing the RabbitMQ broker can be created by giving a file containing the specification to Kubernetes. Under examples/celery-rabbitmq/ you’ll find a file called rabbitmq-controller.json containing this:

{
  "id": "rabbitmq-controller",
  "kind": "ReplicationController",
  "apiVersion": "v1beta1",
  "desiredState": {
    "replicas": 1,
    "replicaSelector": {"name": "rabbitmq"},
    "podTemplate": {
      "desiredState": {
        "manifest": {
          "version": "v1beta1",
          "id": "rabbitmq",
          "containers": [{
            "name": "rabbitmq",
            "image": "dockerfile/rabbitmq",
            "cpu": 100,
            "ports": [{"containerPort": 5672, "hostPort": 5672}]
          }]
        }
      },
      "labels": {
        "name": "rabbitmq",
        "app": "taskQueue"
      }
    }
  },
  "labels": {
    "name": "rabbitmq"
  }
}

Running $ kubectl create -f examples/celery-rabbitmq/rabbitmq-controller.json brings up a replication controller that ensures one pod exists which is running a RabbitMQ instance.

Note that bringing up the pod includes pulling down a Docker image, which may take a few moments. When it’s ready, the status will switch from “Pending” to “Running”. The same applies to the other pods in this example.

Replication controllers are another of Kubernetes’ basic concepts. They ensure that a specified number of pods are running at any one time, creating or destroying pods as required. They’re particularly helpful for things like load balancing, but they’re recommended even when you simply want a single pod.
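To keep an eye on the controller and the pod it created (and to watch the status change from “Pending” to “Running”), you can query kubectl; the exact resource names may vary slightly between Kubernetes versions:

$ kubectl get replicationcontrollers   # shows rabbitmq-controller and its replica count
$ kubectl get pods                     # shows the RabbitMQ pod and its current status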

Step 2: Start the RabbitMQ service

The Celery task queue will need to communicate with the RabbitMQ broker. RabbitMQ is running in a separate pod, but it’s important to realise that pods are “ephemeral”: they can be dynamically destroyed and recreated. This means that the IP address of a pod cannot be relied upon to stay the same over time. We therefore need to make sure that any program wanting to connect to the pod can keep track of where it is.

For this, Kubernetes provides things called services. They act like a proxy, providing a layer of indirection between a pod and its clients, thus ensuring the clients can always connect to the pod.

In this example, we need a service that can transparently route requests to RabbitMQ. One is defined in the file examples/celery-rabbitmq/rabbitmq-service.json:

{
  "id": "rabbitmq-service",
  "kind": "Service",
  "apiVersion": "v1beta1",
  "port": 5672,
  "containerPort": 5672,
  "selector": {
    "app": "taskQueue"
  },
  "labels": {
    "name": "rabbitmq-service"
  }
}

To start the service, run:

$ kubectl create -f examples/celery-rabbitmq/rabbitmq-service.json

This service allows other pods to connect to the RabbitMQ instance. To them, RabbitMQ appears to be available on port 5672, while the service routes the traffic to the container (also on port 5672).

Step 3: Fire up Celery

Bringing up the Celery worker is done by running $ kubectl create -f examples/celery-rabbitmq/celery-controller.json. That file contains this:

{
  "id": "celery-controller",
  "kind": "ReplicationController",
  "apiVersion": "v1beta1",
  "desiredState": {
    "replicas": 1,
    "replicaSelector": {"name": "celery"},
    "podTemplate": {
      "desiredState": {
        "manifest": {
          "version": "v1beta1",
          "id": "celery",
          "containers": [{
            "name": "celery",
            "image": "endocode/celery-app-add",
            "cpu": 100,
            "ports": [{"containerPort": 5672, "hostPort": 5672}]
          }]
        }
      },
      "labels": {
        "name": "celery",
        "app": "taskQueue"
      }
    }
  },
  "labels": {
    "name": "celery"
  }
}

There are several things to point out here…

Like the RabbitMQ controller, this controller ensures that there is always a pod running a Celery worker instance. The celery-app-add Docker image is an extension of the standard Celery image. This is the Dockerfile:

FROM dockerfile/celery

ADD celery_conf.py /data/celery_conf.py
ADD run_tasks.py /data/run_tasks.py
ADD run.sh /usr/local/bin/run.sh

ENV C_FORCE_ROOT 1

CMD ["/bin/bash", "/usr/local/bin/run.sh"]

The celery_conf.py file contains the definition of a simple Celery task that adds two numbers, and the final CMD line of the Dockerfile runs run.sh, which starts the Celery worker.

NOTE: ENV C_FORCE_ROOT 1 forces Celery to be run as the root user, which is not recommended in production!
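The run.sh script itself isn’t reproduced here. As a rough sketch of what it might do (the actual script inside the image may differ), it starts a worker for the tasks defined in celery_conf.py and then launches the task dispatcher:

#!/bin/bash
# Hypothetical sketch of run.sh
cd /data
celery worker --app=celery_conf --loglevel=info &   # start the Celery worker in the background
python run_tasks.py                                 # keep dispatching add() tasks (see below)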

The celery_conf.py file contains the following:

import os

from celery import Celery

# Get Kubernetes-provided address of the broker service
broker_service_host = os.environ.get('RABBITMQ_SERVICE_SERVICE_HOST')

app = Celery('tasks', broker='amqp://guest@%s//' % broker_service_host, backend='amqp')

@app.task
def add(x, y):
    return x + y

This example is adapted from the official Celery Getting Started guide. The only thing we’ve really added is this line:

broker_service_host = os.environ.get('RABBITMQ_SERVICE_SERVICE_HOST')

This environment variable contains the IP address of the RabbitMQ service we created in step 2. Kubernetes automatically provides this environment variable to all containers that have the same “app” label as the one defined in the RabbitMQ service (in this case “taskQueue”). In the Python code above, this has the effect of automatically filling in the broker address when the pod is started.
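The variable name is derived from the service name (upper-cased, with dashes turned into underscores), and a matching …_SERVICE_PORT variable should be injected as well. A small, hypothetical sanity check from inside a pod could look like this:

import os

# Hypothetical check of the variables Kubernetes injects for "rabbitmq-service"
host = os.environ.get('RABBITMQ_SERVICE_SERVICE_HOST')  # the service's address
port = os.environ.get('RABBITMQ_SERVICE_SERVICE_PORT')  # should be '5672' here
print('Expecting the broker at %s:%s' % (host, port))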

The other Python script in the image, run_tasks.py, executes the add() task every five seconds with a couple of random numbers.
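That script isn’t reproduced here either, but it boils down to a loop roughly like this (a sketch based on the description above; the real file may differ in its details):

import random
import time

from celery_conf import add

# Dispatch an add() task with two random numbers every five seconds
while True:
    x, y = random.randint(1, 1000), random.randint(1, 1000)
    add.delay(x, y)
    time.sleep(5)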

The question now is, how do you see what’s going on?

Step 4: Put a frontend in place

By connecting a monitoring frontend to the message broker, you can watch the behaviour of all the workers and their tasks in real time. For this purpose, we use Flower.

To bring up the frontend, run $ kubectl create -f examples/celery-rabbitmq/flower-controller.json. This controller is defined like so:

{
  "id": "flower-controller",
  "kind": "ReplicationController",
  "apiVersion": "v1beta1",
  "desiredState": {
    "replicas": 1,
    "replicaSelector": {"name": "flower"},
    "podTemplate": {
      "desiredState": {
        "manifest": {
          "version": "v1beta1",
          "id": "flower",
          "containers": [{
            "name": "flower",
            "image": "endocode/flower",
            "cpu": 100,
            "ports": [{"containerPort": 5555, "hostPort": 5555}]
          }]
        }
      },
      "labels": {
        "name": "flower",
        "app": "taskQueue"
      }
    }
  },
  "labels": {
    "name": "flower"
  }
}

This will bring up a new pod with Flower installed and port 5555 (Flower’s default port) exposed. This image uses the following command to start Flower:

flower --broker=amqp://guest:guest@${RABBITMQ_SERVICE_SERVICE_HOST:localhost}:5672//

Again, it uses the Kubernetes-provided environment variable to obtain the address of the RabbitMQ service.

Once all pods are up and running, kubectl get pods will display something like this:

POD                         IP                  CONTAINER(S)        IMAGE(S)                           HOST                    LABELS                                                STATUS
celery-controller-h3x9k     10.246.1.11         celery              endocode/celery-app-add            10.245.1.3/10.245.1.3   app=taskQueue,name=celery                             Running
flower-controller-cegta     10.246.2.17         flower              endocode/flower                    10.245.1.4/10.245.1.4   app=taskQueue,name=flower                             Running
kube-dns-fplln              10.246.1.3          etcd                quay.io/coreos/etcd:latest         10.245.1.3/10.245.1.3   k8s-app=kube-dns,kubernetes.io/cluster-service=true   Running
                                                kube2sky            kubernetes/kube2sky:1.0                                                                                          
                                                skydns              kubernetes/skydns:2014-12-23-001                                                                                 
rabbitmq-controller-pjzb3   10.246.2.16         rabbitmq            dockerfile/rabbitmq                10.245.1.4/10.245.1.4   app=taskQueue,name=rabbitmq                           Running

Now that you know which host Flower is running on (in this case, 10.245.1.4), you can open your browser and enter the address (e.g. http://10.245.1.4:5555). If you click on the tab called “Tasks”, you should see an ever-growing list of tasks called “celery_conf.add”, which the run_tasks.py script is dispatching.

Summary

So there we have it. We’ve set up a distributed task queue and also put a frontend in place to monitor it, all using a Kubernetes cluster.

This was just one example. On GitHub you can find many more examples of using Kubernetes.