There is apache-spark. Â I installed it examined examples and run them. Worked. Now what?
One way is to go to linkedin and mark yourself now as the expert of spark and forget the topic.
Another way is to create a interesting problem and try to solve it. After that you may go to the linkedin and mark yourself as a expert đ
So here is my prolem
What do I have here. From the left to right. Apache-kafka is receives stream like string “a b c” thous are quadratic equation‘s (ax2Â + bx + c = 0) input.
Apache-Spark is going to resolve quadratic equation and saves input parameters and results x1 and x2 to hbase.
The big picture
As you see now the day is much more interesting đ
On let’s jump into technology.
First I am going to create HBase table with one column family:
hbase(main):027:0> create ‘rootvorrand’, ‘info’
hbase(main):043:0> describe ‘rootvorrand’
L => ‘2147483647’, KEEP_DELETED_CELLS => ‘false’, BLOCKSIZE => ‘65536’, IN_MEMORY => ‘false’, BLOCKCACHE => ‘true’}
1 row(s) in 0.0330 seconds
I can see my new table in UI too
Now I going to create apache-kafka topic with 3 replica and 1 partition
margusja@IRack:~/kafka_2.9.1-$ bin/ –create –topic rootvorrand –partitions 1 –replication-factor 3 –zookeeper vm24:2181
margusja@IRack:~/kafka_2.9.1-$ bin/ –describe –topic rootvorrand –zookeeper
Topic:rootvorrand PartitionCount:1 ReplicationFactor:3 Configs:
Topic: rootvorrand Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1
Add some input data
Now lets set up development environment in Eclipse. I need some external jars. As you can see I am using the latest apache-spark 1.0.0 released about week ago.
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.kafka.*;
import scala.Tuple2;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
public class KafkaSparkHbase {
private static final Pattern SPACE = Pattern.compile(” “);
private static HTable table;
public static void main(String[] args) {
String topic = “rootvorrand”;
int numThreads = 1;
String zkQuorum = “vm38:2181,vm37:2181,vm24:2181”;
String KafkaConsumerGroup = “sparkScript”;
String master = “spark://dlvm2:7077”;
// HBase config
Configuration conf = HBaseConfiguration.create();
conf.set(“hbase.zookeeper.quorum”, “vm24,vm38,vm37”);
conf.set(“”, “2181”);
conf.set(“hbase.rootdir”, “hdfs://vm38:8020/user/hbase/data”);
try {
table = new HTable(conf, “rootvorrand”);
} catch (IOException e) {
// TODO Auto-generated catch block
//SparkConf sparkConf = new SparkConf().setAppName(“KafkaSparkHbase”).setMaster(master).setJars(jars);
JavaStreamingContext jssc = new JavaStreamingContext(master, “KafkaWordCount”,
new Duration(2000), System.getenv(“SPARK_HOME”),
//JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, numThreads);
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, zkQuorum, KafkaConsumerGroup, topicMap);
JavaDStream lines = Function<Tuple2<String, String>, String>() {
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
// resolve quadratic equation
JavaDStream result =
new Function<String, String>()
public String call(String x) throws Exception {
//System.out.println(“Input is: “+ x);
Integer a,b,c,y, d;
String[] splitResult = null;
Integer x1 = null;
Integer x2 = null;
splitResult = SPACE.split(x);
//System.out.println(“Split: “+ splitResult.length);
if (splitResult.length == 3)
a = Integer.valueOf(splitResult[0]);
b = Integer.valueOf(splitResult[1]);
c = Integer.valueOf(splitResult[2]);
d=(int) Math.sqrt(y);
//System.out.println(“discriminant: “+ d);
if (d > 0)
return x + ” “+ x1 + ” “+ x2;
new Function<JavaRDD, Void>() {
public Void call(final JavaRDD x) throws Exception {
List arr = x.toArray();
for (String entry : arr) {
Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
p.add(Bytes.toBytes(“info”), Bytes.toBytes(“record”), Bytes.toBytes(entry));
return null;
pack it and run it:
[root@dlvm2 ~]java -cp kafkasparkhbase-0.1.jar KafkaSparkHbase
Input is: 1 4 3
Split: 3
discriminant: 2
x1: -1 x2: -3
1 4 3 -1 -3
As we can see. Input data from kafka is 1 4 3 and spark output line is 1 4 3 -1 -3 where -1 -3 are roots.
We can also see that there no lag in kafka queue. Spark worker is consumed all data from the queue
Let’s empty our HBase table:
put some input variables to kafka queue
-2 -3 10
2 -4 -10
And scan HBase table rootvorrand
As you can see. There are our input variables and roots.