Building a stream processing pipeline with Kafka, Storm and Cassandra – Part 3: Using CoreOS

May 6, 2015

CoreOS

In part 2 of this series, we learned about Docker and how you can use it to deploy the individual components of a stream processing pipeline by containerizing them. In the process, we also saw that it can get a little complicated.

This part will show how to tie all the components together using CoreOS. We already introduced CoreOS in part 1 of this series, so go back and take a look if you need to familiarize yourself. You’ll see how to install CoreOS and get it running, as well as how to use it to control the component Docker containers and bring them together to get the pipeline flowing.

Installing CoreOS

There are many ways to obtain CoreOS and get it running, including in the cloud, directly on your machine or in a virtual machine. You can find a list of platforms and instructions for how to run CoreOS on the official website. In this example, we will show you how to run a CoreOS cluster using Vagrant.

Here are the CoreOS technologies we will use in this demo:

  • cloud-config: A single-file YAML-formatted system configuration. It contains settings for all base CoreOS components.

  • etcd: A Zookeeper-like distributed key-value store (see the quick example after this list).

  • fleet: A distributed systemd wrapper/handler for managing units.

  • flannel: An overlay network that gives a subnet to each machine.
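Before moving on, here is a quick, illustrative taste of etcd and fleet from any machine in the cluster. The key /example/greeting and its value are made up for this sketch; any key name would do:

$ etcdctl set /example/greeting "hello cluster"   # write a key to the distributed store
hello cluster
$ etcdctl get /example/greeting                   # read it back, possibly from another machine
hello cluster
$ fleetctl list-machines                          # list the machines fleet can schedule units on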

To make things easier, we provide you with a ready-made project (including many of the components covered in the previous post) available via our GitHub repository.

Running a CoreOS cluster

Before starting, we recommend that your host machines have at least 16 GB of RAM and 30 GB of hard drive space.

Install Vagrant (version 1.6 or greater) and VirtualBox. Our demo project has a Vagrant setup you can use to get started, so clone the demo project to your local machine and tell Vagrant to bring up the machines.

$ git clone https://github.com/endocode/CoreOS-Kafka-Storm-Cassandra-cluster-demo
$ cd CoreOS-Kafka-Storm-Cassandra-cluster-demo/coreos-vagrant
$ vagrant up

Vagrant will create a CoreOS cluster with the following config file:

#cloud-config
---
coreos:
  units:
    - name: etcd.service
      command: start
    - name: fleet.service
      command: start
    - name: flanneld.service
      drop-ins:
        # flanneld configuration. Uses etcd to store the configuration for the flannel virtual overlay network subnet.
        - name: 50-network-config.conf
          content: |
            [Service]
            ExecStartPre=/usr/bin/etcdctl set /coreos.com/network/config '{ "Network": "10.1.0.0/16" }'
      command: start
  etcd:
    discovery: https://discovery.etcd.io/77d8174a8e3d88d513c9f84d9c0c7c68
    # listen address and port for client communication
    addr: $public_ipv4:4001
    # listen address and port for server communication
    peer-addr: $public_ipv4:7001
  fleet:
    # IP on which fleet listens
    public-ip: $public_ipv4
    # set custom metadata for each CoreOS host; here we solve the chicken-and-egg problem by setting the Zookeeper topology before the Zookeeper quorum starts
    metadata: zookeeperid=$zookeeperid
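Note that the discovery URL in the etcd section is specific to our demo. If you spin up your own cluster, you would normally request a fresh discovery token from the public discovery service and paste the returned URL into your cloud-config (the placeholder below stands for whatever token you receive):

$ curl -w "\n" https://discovery.etcd.io/new
https://discovery.etcd.io/<your-new-token>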

Once created, check your CoreOS cluster by running this command:

$ vagrant ssh core-01 -- -A fleetctl list-machines
MACHINE      IP            METADATA
41c56f34...  172.17.8.101  zookeeperid=1
b78e09a5...  172.17.8.102  zookeeperid=2
d992f125...  172.17.8.103  zookeeperid=3

Adapting our Docker commands to fleet

Now, you should recall from the previous post that our applications are Dockerized. When using CoreOS they remain so, but we will need to adapt the Docker commands to the format of a fleet unit.

Let’s start with Zookeeper:

$ docker run --rm -ti -e ZK_SERVERS="server.1=172.17.42.1:2888:3888 server.2=172.17.42.1:2889:3889 server.3=172.17.42.1:2890:3890" -e ZK_ID=1 --publish 2181:2181 --publish 2888:2888 --publish 3888:3888 endocode/zookeeper

The file format of a fleet unit is based on systemd, although there is a slight difference between a fleet unit template and a plain unit file. For Zookeeper we will use a fleet template, since we need to run several instances. The fleet template filename looks like this: servicename@.service
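To make the template specifiers used below concrete: when fleet starts an instance of a template, systemd's %p specifier expands to the part of the unit name before the '@' and %i to the part after it. A hypothetical instantiation of our Zookeeper template:

# Template file:  zookeeper@.service
# Started with:   fleetctl start zookeeper@2.service
# %p expands to "zookeeper" (the prefix) and %i to "2" (the instance),
# so the container started below is named zookeeper-2 and gets ZK_ID=2.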

First of all, let's look at the [Unit] section of our fleet template file:

[Unit]
Description=zookeeper
# systemd should start this unit only after the docker service is started and running
After=docker.service
Requires=docker.service

Then we describe the service itself:

[Service]
# Stop and remove any old container named %p-%i (%p is the unit file name prefix and %i the instance name, from %p@%i.service).
ExecStartPre=-/usr/bin/docker kill %p-%i
ExecStartPre=-/usr/bin/docker rm %p-%i
# Make sure we use the latest Docker image
ExecStartPre=/usr/bin/docker pull endocode/%p
# Configure Zookeeper through the Docker environment variable and start the container
# ZK_SERVERS is derived from the CoreOS instances' metadata
# ZK_ID is derived from the instance name.
ExecStart=/usr/bin/bash -c "ZK_SERVERS=$(/usr/bin/fleetctl list-machines -fields='ip,metadata' -no-legend=true | awk -F'[=\t]' '{print \"server.\"$3\"=\"$1\":2888:3888\"}' | tr '\n' ' ') && exec /usr/bin/docker run --name %p-%i -e JVMFLAGS='-Xmx64m -Xms64M' -e ZK_SERVERS=\"$ZK_SERVERS\" -e ZK_ID=%i --hostname %H --publish 2181:2181 --publish 2888:2888 --publish 3888:3888 endocode/%p"
# Run Docker command to stop the container
ExecStop=/usr/bin/docker stop %p-%i
# Wait up to 15 minutes for the Docker container to start. This is necessary because the Docker image has to be pulled from the registry, which can take some time.
TimeoutStartSec=900s

Then come the fleet-related options:

[X-Fleet]
# Do not run Zookeeper on a host that already has a running Zookeeper instance
Conflicts=%p@*.service
# Run a Zookeeper instance only on the host with the corresponding metadata
MachineMetadata=zookeeperid=%i

Let’s also look at the Cassandra unit template:

[Unit]
Description=cassandra
After=docker.service
Requires=docker.service

[Service]
# Set the Cassandra cluster name and a custom SSL storage port, because port 7001 is already used by the etcd daemon.
Environment=CASSANDRA_CLUSTERNAME=cluster CASSANDRA_SSL_STORAGE_PORT=7002
# Use the /etc/environment file and don't fail (-) if the file is not available.
EnvironmentFile=-/etc/environment
ExecStartPre=-/usr/bin/docker kill %p-%i
ExecStartPre=-/usr/bin/docker rm %p-%i
ExecStartPre=/usr/bin/docker pull endocode/%p
# Register the Cassandra node's IP address in etcd.
ExecStartPre=/usr/bin/bash -c "echo $${COREOS_PUBLIC_IPV4:-$$(hostname -i)} | /usr/bin/etcdctl set /cassandra_%i"
# Wait for the IP of the first Cassandra node, which acts as the seed node
ExecStartPre=/usr/bin/bash -c "while [[ ! $$(/usr/bin/etcdctl get /cassandra_1) ]]; do echo 'Waiting for Cassandra Seed node'; sleep 1; done; echo 'Cassandra Seed node is UP'; /usr/bin/etcdctl get /cassandra_1"
ExecStart=/usr/bin/bash -c "BROADCAST_ADDR=$${COREOS_PUBLIC_IPV4:-$$(hostname -i)} && CASSANDRA_SEEDS=$(/usr/bin/etcdctl get /cassandra_1 | /usr/bin/tr -d '\n') && exec /usr/bin/docker run --rm --name %p-%i -e CASSANDRA_CLUSTERNAME=${CASSANDRA_CLUSTERNAME} -e CASSANDRA_SEEDS=\"$CASSANDRA_SEEDS\" -e BROADCAST_ADDR=$BROADCAST_ADDR -e CASSANDRA_SSL_STORAGE_PORT=$CASSANDRA_SSL_STORAGE_PORT --publish 7000:7000 --publish $CASSANDRA_SSL_STORAGE_PORT:$CASSANDRA_SSL_STORAGE_PORT --publish 9160:9160 --publish 9042:9042 --publish 7199:7199 endocode/%p"
ExecStop=/usr/bin/docker stop %p-%i
# Remove the Cassandra node's IP from etcd
ExecStopPost=/usr/bin/etcdctl rm /cassandra_%i
TimeoutStartSec=900s

[X-Fleet]
# Run only one Cassandra instance per host
Conflicts=%p@*.service
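The seed-node handshake above is plain etcd usage, so once the Cassandra instances are running (we start them in the next section) you can inspect the coordination keys by hand. A small sketch, where the returned IP is whatever your first node registered:

$ etcdctl ls /                # top-level keys, including /cassandra_1, /cassandra_2, /cassandra_3
$ etcdctl get /cassandra_1    # the IP the seed node published, e.g. 172.17.8.101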

We won’t cover the remaining unit files and templates here, but they are available in our demo project in the fleet directory. You can find detailed information about fleet unit files on the CoreOS site.

Running fleet units on CoreOS

Now, we’re going to delve into the cluster to get our apps running and the pipeline flowing.

Log into your first CoreOS instance:

$ vagrant ssh core-01 -- -A

Now you can submit the already prepared fleet units to the CoreOS cluster using the submit command. We defined /tmp/fleet in coreos-vagrant/Vagrantfile as the location on the nodes where the fleet unit files live:

$ fleetctl submit /tmp/fleet/*
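If you want to double-check what fleet now knows about, list the registered unit files; the names should match the files from /tmp/fleet, such as zookeeper@.service, kafka@.service, cassandra@.service, storm-nimbus.service and storm-supervisor@.service:

$ fleetctl list-unit-files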

Then you can start your application instances. Zookeeper should come first:

$ fleetctl start zookeeper@{1..3}.service

This will automatically pull the Docker image to run Zookeeper.

You can watch the progress through the system logs using the ‘journalctl -f’ command, or watch a unit’s logs directly using ‘fleetctl journal -f zookeeper@1.service’. Wait until fleet shows that the instances are active and running:

$ fleetctl list-units
UNIT                 MACHINE                    ACTIVE  SUB
zookeeper@1.service  41c56f34.../172.17.8.101   active  running
zookeeper@2.service  b78e09a5.../172.17.8.102   active  running
zookeeper@3.service  d992f125.../172.17.8.103   active  running

Then run the Kafka and Cassandra instances and wait until they are ready:

$ fleetctl start kafka@{1..3}.service
$ fleetctl start cassandra@{1..3}.service

In the case of Storm, you should run Storm Nimbus first, and then Storm Supervisors:

$ fleetctl start storm-nimbus.service
$ fleetctl start storm-supervisor@{1..3}.service

Once you’ve started all instances, you should enter the development container (it contains all necessary tools to work with Kafka, Storm and Cassandra):

$ docker run --rm -ti -v /home/core/devel:/root/devel -e BROKER_LIST=`fleetctl list-machines -no-legend=true -fields=ip | sed 's/$/:9092/' | tr '\n' ','` -e NIMBUS_HOST=`etcdctl get /storm-nimbus` -e ZK=`fleetctl list-machines -no-legend=true -fields=ip | tr '\n' ','` endocode/devel-node:0.9.2 start-shell.sh bash
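To see what the backticked sub-commands resolve to, here is roughly what the container's environment looks like with the three-machine Vagrant cluster from earlier (the exact values depend on your machines' IPs and on which host ended up running storm-nimbus; note the trailing commas left by tr):

BROKER_LIST=172.17.8.101:9092,172.17.8.102:9092,172.17.8.103:9092,
NIMBUS_HOST=172.17.8.101
ZK=172.17.8.101,172.17.8.102,172.17.8.103,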

From here, you can run the steps that get the data flowing through the pipeline:

Create a Kafka topic and a Cassandra table:

$ $KAFKA_HOME/bin/kafka-topics.sh --create --topic topic --partitions 3 --zookeeper $ZK --replication-factor 2
$ echo "CREATE KEYSPACE testkeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 2 };" | cqlsh 172.17.8.101
$ echo "CREATE TABLE IF NOT EXISTS testkeyspace.meter_data ( id uuid, Timestamp timestamp, P_1 float, P_2 float, P_3 float, Q_1 float, Q_2 float, Q_3 float, HARM list<int>, PRIMARY KEY (id, Timestamp) );" | cqlsh 172.17.8.101

Build and submit a Storm topology:

$ cd ~/kafka_cassandra_topology
$ pyleus build
$ pyleus submit -n $NIMBUS_HOST kafka-cassandra.jar

Run the Kafka random data producer. This generates random data and sends it down the pipeline:

$ ./single_python_producer.py

Monitor your Cassandra cluster in a separate endocode/devel-node:0.9.2 container:

$ cqlsh 172.17.8.101
Connected to cluster at 172.17.8.101:9160.
[cqlsh 4.1.1 | Cassandra 2.0.12 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
cqlsh> SELECT COUNT(*) FROM testkeyspace.meter_data LIMIT 1000000;

As time goes on, you will see the result of this query continually increase as more and more data ends up in your Cassandra cluster.

Summary

This post concludes a three-part series exploring how to use some of the most cutting-edge tools and techniques to build a clustered data processing pipeline. We learned about:

  • Kafka, Storm and Cassandra – the components of our pipeline.
  • The container approach to virtualization and how to containerize our apps using Docker.
  • How to use CoreOS to build and manage the cluster.

As we said right back at the beginning, clusters are powerful tools when speed, scale and availability are essential features of your system. This series has showcased some of the best tools for doing clustering right, tools that we at Endocode pride ourselves on using and developing in our daily business.

Posts in this series