Building a stream processing pipeline with Kafka, Storm and Cassandra – Part 2: Using Docker Containers

April 22, 2015


In case you missed it, part 1 of this series introduced the applications that we’re going to use and explained how they work individually.

In this post, we’ll see how to run Zookeeper, Kafka, Storm and Cassandra clusters inside Docker containers on a single host. We’re going to use Ubuntu 14.04 LTS as the base operating system.

Introducing Docker

Docker is a software platform for packaging and deploying applications, which then run on a host operating system in their own isolated environments. While it occupies the same category as virtual machines (namely ‘virtualization software’), Docker takes a container-based approach, which is a rather different way of achieving virtualization. Instead of simulating a whole separate ‘guest’ OS, containers take advantage of the host OS’s facilities for isolating processes from one another – such an isolated process is said to be ‘containerized’.

In the case of Docker, it uses features of the Linux kernel (namespaces and cgroups) to give containerized applications their own private space in the OS. This is particularly advantageous when building distributed systems, where Docker containers let you deploy multiple applications simply and on demand.
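To get a feel for this isolation, we can start a throwaway container and look at its process table – only the command we ran shows up, running as PID 1, rather than the host’s processes (a quick experiment, assuming Docker is already installed on the host):

$ docker run --rm -ti ubuntu:trusty ps -ef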

VM vs Container

To learn more, check out Docker’s web page.

Deploying Apache Zookeeper

So let’s get building. Since Zookeeper is the coordination service that both Kafka and Storm depend on, we’ll start with it and review how to run Apache Zookeeper in a Docker container. To build a Docker image, we need to write the instructions into a Dockerfile, which we will now do step by step.

Fortunately Ubuntu 14.04 contains Zookeeper version 3.4.5 in its package repository. The Dockerfile for a Zookeeper installation looks like this:

FROM ubuntu:trusty
RUN apt-get update && apt-get dist-upgrade -y
RUN apt-get install -y zookeeper

This Dockerfile tells Docker to use the basic Ubuntu 14.04 container image as a starting point (FROM ubuntu:trusty). It then uses the package manager to install Zookeeper into the image.

We will also change the Zookeeper log destination from file to console by adding this to the Dockerfile:

RUN sed -i 's/ROLLINGFILE/CONSOLE/' /etc/zookeeper/conf/environment

The most basic way to get Zookeeper started is as a single-node instance, which we can do by running zkServer.sh.

CMD ["/usr/share/zookeeper/bin/zkServer.sh", "start-foreground"]

However, we want to start a Zookeeper cluster. The recommended minimum ensemble size is three Zookeeper nodes. Before starting the cluster we should define its topology in /etc/zookeeper/conf/zoo.cfg like this:

server.1=zoo1:2888:3888
server.2=zoo2:2888:3888
server.3=zoo3:2888:3888

And each Zookeeper node should have its own unique ID (a number between 1 and 255) in its myid file, which lives in the Zookeeper data directory – /var/lib/zookeeper/myid on Ubuntu.
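For example, on the first node the file contains nothing but that node’s ID:

$ cat /var/lib/zookeeper/myid
1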

To avoid having to edit the configuration for each Docker container, let’s create a simple start handler script called start.sh  which parses the Docker environment variables, sets the Zookeeper configuration and starts the Zookeeper server in the foreground:

#!/bin/bash
# Both ZK_ID and ZK_SERVERS must be passed in via `docker run -e ...`
if [[ -z "${ZK_ID}" || -z "${ZK_SERVERS}" ]]; then
       echo "Please set ZK_ID and ZK_SERVERS environment variables first."
       exit 1
fi
# Append the space-separated "server.N=host:port:port" entries to zoo.cfg
echo "${ZK_SERVERS}" | tr ' ' '\n' | tee -a /etc/zookeeper/conf/zoo.cfg
# Write this node's ID into the myid file in the data directory
echo "${ZK_ID}" | tee /var/lib/zookeeper/myid
/usr/share/zookeeper/bin/zkServer.sh start-foreground

The final Dockerfile should look like this:

FROM ubuntu:trusty
RUN apt-get update && apt-get dist-upgrade -y
RUN apt-get install -y zookeeper
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN sed -i 's/ROLLINGFILE/CONSOLE/' /etc/zookeeper/conf/environment
ADD start.sh /usr/local/bin/
CMD ["/usr/local/bin/start.sh"]

Now let’s put our Dockerfile and start.sh into the zookeeper_docker directory and build the Docker image:

$ docker build -t endocode/zookeeper zookeeper_docker
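One detail to watch out for: ADD copies start.sh into the image with the permissions it has in the build context, so make sure the script is executable before building:

$ chmod +x zookeeper_docker/start.sh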

Then start the Zookeeper cluster in some Docker containers (throughout these examples we will use 172.17.42.1, the host-side IP address of Docker’s default bridge):

$ docker run --rm -ti -e ZK_SERVERS="server.1=172.17.42.1:2888:3888 server.2=172.17.42.1:2889:3889 server.3=172.17.42.1:2890:3890" -e ZK_ID=1 --publish 2181:2181 --publish 2888:2888 --publish 3888:3888 endocode/zookeeper
$ docker run --rm -ti -e ZK_SERVERS="server.1=172.17.42.1:2888:3888 server.2=172.17.42.1:2889:3889 server.3=172.17.42.1:2890:3890" -e ZK_ID=2 --publish 2182:2181 --publish 2889:2889 --publish 3889:3889 endocode/zookeeper
$ docker run --rm -ti -e ZK_SERVERS="server.1=172.17.42.1:2888:3888 server.2=172.17.42.1:2889:3889 server.3=172.17.42.1:2890:3890" -e ZK_ID=3 --publish 2183:2181 --publish 2890:2890 --publish 3890:3890 endocode/zookeeper

We can check if the Zookeeper cluster works with this command

$ for i in {2181..2183}; do echo mntr | nc 172.17.42.1 $i | grep zk_followers ; done

which should return zk_followers 2 as output. Only the current leader reports the zk_followers metric, so the loop prints a single line even though it queries all three nodes.
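We can also ask each node which role it has taken using the stat four-letter command – one node should report Mode: leader and the other two Mode: follower (which node ends up as leader can vary):

$ for i in {2181..2183}; do echo stat | nc 172.17.42.1 $i | grep Mode; done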

Deploying Apache Kafka

Let’s now write a Dockerfile for a Kafka image. Unlike with Zookeeper, there is no Kafka package in the Ubuntu repository. Instead, we’ll tell Docker to download and unpack a tarball obtained from the Kafka download page.

At the time of writing, the latest Kafka version is 0.8.2.1 and the recommended Scala version is 2.10; the example below uses the Scala 2.10 build of the 0.8.2.0 release. Being a Scala project, Kafka requires the JVM and it should work fine on OpenJDK 7. The Kafka Dockerfile should initially look like this:

FROM ubuntu:trusty
RUN apt-get update && apt-get dist-upgrade -y
RUN apt-get install -y tar openjdk-7-jre-headless wget
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN wget -q http://artfiles.org/apache.org/kafka/0.8.2.0/kafka_2.10-0.8.2.0.tgz -O /tmp/kafka_2.10-0.8.2.0.tgz
RUN tar xfz /tmp/kafka_2.10-0.8.2.0.tgz -C /opt

Similarly to Zookeeper, we will write a start-up handler script (start-kafka.sh) that configures the Kafka container from its environment variables:

#!/bin/bash
# Default the broker ID to the advertised port if none was given
if [[ -z "$KAFKA_BROKER_ID" ]]; then
   export KAFKA_BROKER_ID=$KAFKA_ADVERTISED_PORT
fi
if [[ -z "$KAFKA_LOG_DIRS" ]]; then
   export KAFKA_LOG_DIRS="/kafka/kafka-logs-$KAFKA_BROKER_ID"
fi
# Patch the heap settings directly into the start script, then drop the
# variable so it is not picked up by the KAFKA_* loop below
if [[ -n "$KAFKA_HEAP_OPTS" ]]; then
   sed -r -i "s/(export KAFKA_HEAP_OPTS)=\"(.*)\"/\1=\"$KAFKA_HEAP_OPTS\"/g" $KAFKA_HOME/bin/kafka-server-start.sh
   unset KAFKA_HEAP_OPTS
fi
# Turn every KAFKA_FOO_BAR environment variable (except KAFKA_HOME) into a
# foo.bar entry in server.properties, overriding an existing value if present
for VAR in `env`
do
 if [[ $VAR =~ ^KAFKA_ && ! $VAR =~ ^KAFKA_HOME ]]; then
   kafka_name=`echo "$VAR" | sed -r "s/KAFKA_(.*)=.*/\1/g" | tr '[:upper:]' '[:lower:]' | tr _ .`
   env_var=`echo "$VAR" | sed -r "s/(.*)=.*/\1/g"`
   if egrep -q "(^|^#)$kafka_name=" $KAFKA_HOME/config/server.properties; then
       sed -r -i "s@(^|^#)($kafka_name)=(.*)@\2=${!env_var}@g" $KAFKA_HOME/config/server.properties #note that no config values may contain an '@' char
   else
       echo "$kafka_name=${!env_var}" >> $KAFKA_HOME/config/server.properties
   fi
 fi
done

$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties
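To make the translation loop above concrete: each KAFKA_* variable (other than KAFKA_HOME) is lower-cased, its underscores become dots, and the result is written into server.properties. For example:

KAFKA_ADVERTISED_HOST_NAME=172.17.42.1  becomes  advertised.host.name=172.17.42.1
KAFKA_ZOOKEEPER_CONNECT=172.17.42.1:2181  becomes  zookeeper.connect=172.17.42.1:2181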

The Dockerfile then needs updating so that Kafka’s home directory is set and the start-up script is defined as the main application for this container.

ENV KAFKA_HOME /opt/kafka_2.10-0.8.2.0
ADD start-kafka.sh /usr/local/bin/start-kafka.sh
CMD start-kafka.sh
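As before, put the Dockerfile and start-kafka.sh into a directory of their own (we’ll assume it is called kafka_docker, mirroring the Zookeeper layout) and build the image:

$ docker build -t endocode/kafka kafka_docker   # directory name assumed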

Then, we can start a Kafka cluster of three containers:

$ docker run --rm -ti -e KAFKA_HEAP_OPTS='-Xmx64M -Xms64M' -e KAFKA_BROKER_ID=1 -e KAFKA_ZOOKEEPER_CONNECT="172.17.42.1:2181,172.17.42.1:2182,172.17.42.1:2183" -e KAFKA_ADVERTISED_PORT=9092 -e KAFKA_ADVERTISED_HOST_NAME=172.17.42.1 --publish 9092:9092 endocode/kafka
$ docker run --rm -ti -e KAFKA_HEAP_OPTS='-Xmx64M -Xms64M' -e KAFKA_BROKER_ID=2 -e KAFKA_ZOOKEEPER_CONNECT="172.17.42.1:2181,172.17.42.1:2182,172.17.42.1:2183" -e KAFKA_ADVERTISED_PORT=9093 -e KAFKA_ADVERTISED_HOST_NAME=172.17.42.1 --publish 9093:9092 endocode/kafka
$ docker run --rm -ti -e KAFKA_HEAP_OPTS='-Xmx64M -Xms64M' -e KAFKA_BROKER_ID=3 -e KAFKA_ZOOKEEPER_CONNECT="172.17.42.1:2181,172.17.42.1:2182,172.17.42.1:2183" -e KAFKA_ADVERTISED_PORT=9094 -e KAFKA_ADVERTISED_HOST_NAME=172.17.42.1 --publish 9094:9092 endocode/kafka
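Before creating any topics, we can check that all three brokers have registered themselves in Zookeeper. The Zookeeper image ships the zkCli.sh client alongside zkServer.sh, so we can simply reuse it; listing /brokers/ids should show the IDs 1, 2 and 3:

$ docker run --rm -ti endocode/zookeeper /usr/share/zookeeper/bin/zkCli.sh -server 172.17.42.1:2181 ls /brokers/ids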

Once this cluster is up, we can do several things, including:

  • Create Kafka topics:
$ docker run --rm -ti -e ZK="172.17.42.1:2181,172.17.42.1:2182,172.17.42.1:2183" endocode/kafka bash -c "\$KAFKA_HOME/bin/kafka-topics.sh --create --topic topic --partitions 3 --zookeeper \$ZK --replication-factor 2"
  • Get topic information:
$ docker run --rm -ti -e ZK="172.17.42.1:2181,172.17.42.1:2182,172.17.42.1:2183" endocode/kafka bash -c "\$KAFKA_HOME/bin/kafka-topics.sh --describe --topic topic --zookeeper \$ZK"
  • Write messages to topic (be the producer):
$ docker run --rm -ti -e ZK="172.17.42.1:2181,172.17.42.1:2182,172.17.42.1:2183" endocode/kafka bash -c "\$KAFKA_HOME/bin/kafka-console-producer.sh --topic=topic --broker-list=172.17.42.1:9092"
  • And read messages (be the consumer):
$ docker run --rm -ti -e ZK="172.17.42.1:2181,172.17.42.1:2182,172.17.42.1:2183" endocode/kafka bash -c "\$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper \$ZK --topic topic --from-beginning"

Deploying Apache Storm

Configuring the Dockerfile for Apache Storm is a little more complicated. Each Storm supervisor container should run both storm-supervisor and storm-logviewer (the latter for viewing worker logs), but there is no easy way to run several daemonized processes inside a Docker container.

We will use supervisord, a Python-based process control system, to run several Storm processes inside one Docker container. We’ve adjusted some existing Dockerfiles (available on wurstmeister’s GitHub page) which have similar start-up scripts to handle the environment variables passed to the Docker container.
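To give an idea of how this works, here is a minimal supervisord configuration sketch for a supervisor container; the /opt/storm path and the exact program definitions are placeholders – the actual images use their own layout:

; minimal sketch – paths and options differ in the real images
[supervisord]
nodaemon=true

[program:storm-supervisor]
command=/opt/storm/bin/storm supervisor
autorestart=true

[program:storm-logviewer]
command=/opt/storm/bin/storm logviewer
autorestart=true

With nodaemon=true, supervisord itself stays in the foreground as the container’s main process and starts (and restarts) both Storm daemons underneath it.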

We need to start one storm-nimbus container, one storm-ui container and at least one storm-supervisor container (it is not possible to set a separate port for each Zookeeper node in the storm.yaml configuration file, so we’ll point Storm at only one Zookeeper node in this example):

$ docker run --rm -ti -e NIMBUS_HOST="172.17.42.1" -e ZK="172.17.42.1" --publish 3772:3772 --publish 3773:3773 --publish 6627:6627 endocode/storm-nimbus:0.9.2
$ docker run --rm -ti -e HOST_NAME="172.17.42.1" -e NIMBUS_HOST="172.17.42.1" -e ZK="172.17.42.1:2181,172.17.42.1:2182,172.17.42.1:2183" --publish 8080:8080 endocode/storm-ui:0.9.2
$ docker run --rm -ti -e HOST_NAME="172.17.42.1" -e NIMBUS_HOST="172.17.42.1" -e ZK="172.17.42.1" --publish 6700:6700 --publish 6701:6701 --publish 6702:6702 --publish 6703:6703 --publish 8000:8000 endocode/storm-supervisor:0.9.2

Now we can view our Storm cluster status in the web UI at http://172.17.42.1:8080.

We can also submit an example Storm topology to Nimbus using our own Docker image endocode/devel-node:0.9.2 (with Apache Storm 0.9.2, as https://github.com/Yelp/pyleus doesn’t support Storm 0.9.3 at the time of writing):

$ docker run --rm -ti -e NIMBUS_HOST="172.17.42.1" -e ZK="172.17.42.1" endocode/devel-node:0.9.2 start-shell.sh "pyleus build /root/pyleus/examples/word_count/pyleus_topology.yaml && pyleus submit -n 172.17.42.1 word_count.jar"

endocode/devel-node is our own development container, which has all the tools needed to communicate with the Zookeeper, Kafka, Storm and Cassandra clusters.

Deploying Apache Cassandra

Cassandra packages are available through the DataStax repository, but DataStax Cassandra requires Oracle Java 7. To make this available, the Dockerfile downloads the Oracle JDK tarball, builds a deb package from it and installs Oracle Java 7 like a regular deb package.

The Cassandra container uses its own start handler to parse Docker environment variables. You can see the Cassandra Dockerfile in our demo repo (we’ve adjusted the Spotify Dockerfile available on their GitHub page).

This command starts the initial Cassandra seed node (the start-up script will generate a password for Cassandra JMX; use it when accessing the Cassandra cluster through nodetool):

$ docker run --rm -ti --name cassandra_seed -e CASSANDRA_CLUSTERNAME="cluster" -e CASSANDRA_LOCAL_JMX=no endocode/cassandra

To start additional nodes just run:

$ docker run --rm -ti -e CASSANDRA_CLUSTERNAME="cluster" -e CASSANDRA_SEEDS=`docker inspect --format '{{ .NetworkSettings.IPAddress }}' cassandra_seed` endocode/cassandra

When that’s done, you can check your Cassandra cluster status:


$ docker run --rm -ti --entrypoint=/bin/bash endocode/cassandra nodetool -h`docker inspect --format '{{ .NetworkSettings.IPAddress }}' cassandra_seed` status -u cassandra -pw %GENERATED_JMX_PASSWORD%
Datacenter: datacenter1
=======================                                                  
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address       Load       Tokens  Owns (effective)  Host ID                               Rack
UJ  172.17.0.108  48.64 KB   256     ?                 25f5a86a-c6bb-4b9c-a7d7-12bfc1aa804d  rack1
UN  172.17.0.101  45.85 KB   256     100.0%            252c19ad-ab89-4e94-a447-b1c6d0577890  rack1

You can also play with your Cassandra cluster using the CQL shell. For example, you could create a keyspace and table:


$ docker run --rm -ti --entrypoint=/bin/bash endocode/cassandra /usr/bin/cqlsh `docker inspect --format '{{ .NetworkSettings.IPAddress }}' cassandra_seed`
Connected to cluster at 172.17.0.101:9160.
[cqlsh 4.1.1 | Cassandra 2.0.14 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
cqlsh> CREATE KEYSPACE testkeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
cqlsh> CREATE TABLE testkeyspace.users ( userid text PRIMARY KEY, first_name text, last_name text, emails set<text>, top_scores list<int>, todo map<timestamp, text>);
cqlsh> SELECT * FROM testkeyspace.users;

(0 rows)
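To check that writes work end to end, we can insert a row and read it back (the values are just sample data):

cqlsh> INSERT INTO testkeyspace.users (userid, first_name, last_name, emails) VALUES ('jdoe', 'John', 'Doe', {'jdoe@example.com'});
cqlsh> SELECT userid, first_name, emails FROM testkeyspace.users;

The SELECT should now return the single row we just inserted.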

Summary

In this post, we have seen how to containerize the individual applications that will make up our pipeline using Docker. Even though we can reuse existing Dockerfiles and pre-built Docker images, we saw that it can get rather complicated.

In the next and final post of this series, we will see how we can combine all these applications into one working system, and how this work can be made easier with help from CoreOS. In the end, our pipeline will look something like this:
