Apache-kafka and Atmel 328p + enc28j60

I put together a simple hardware

atmega328p – executes programm

enc28j60 – ethernet

programmed it using C code:

// Demo using DHCP and DNS to perform a web client request.
// 2011-06-08 <jc@wippler.nl> http://opensource.org/licenses/mit-license.php

#include <EtherCard.h>

// ethernet interface mac address, must be unique on the LAN
static byte mymac[] = { 0x74,0x69,0x69,0x2D,0x30,0x31 };

byte Ethernet::buffer[700];
static uint32_t timer;
Stash stash;

char website[] PROGMEM = “vm37.dbweb.ee”;
#define PATH “”
#define VARIABLE “test”

// called when the client request is complete
static void my_callback (byte status, word off, word len) {
Serial.println(“>>>”);
Ethernet::buffer[off+300] = 0;
Serial.print((const char*) Ethernet::buffer + off);
Serial.println(“…”);
}

void setup () {
Serial.begin(57600);
Serial.println(“\n[webClient]”);

if (ether.begin(sizeof Ethernet::buffer, mymac) == 0)
Serial.println( “Failed to access Ethernet controller”);
if (!ether.dhcpSetup())
Serial.println(“DHCP failed”);
ether.hisport = 8080;
ether.printIp(“IP: “, ether.myip);
ether.printIp(“GW: “, ether.gwip);
ether.printIp(“DNS: “, ether.dnsip);

if (!ether.dnsLookup(website))
Serial.println(“DNS failed”);

ether.printIp(“SRV: “, ether.hisip);
}

void loop () {
ether.packetLoop(ether.packetReceive());

if (millis() > timer) {
timer = millis() + 10000;

byte sd = stash.create();
stash.print(“{\”messages\”: [{\”value\”:{\”key\”:\”Margusja\”}}]}”);
stash.print(VARIABLE);
stash.print(“&action=Submit”);
stash.save();

// generate the header with payload – note that the stash size is used,
// and that a “stash descriptor” is passed in as argument using “$H”
Stash::prepare(PSTR(“POST /topics/kafkademo1 HTTP/1.1” “\r\n”
“Host: vm37.dbweb.ee:8080” “\r\n”
//”User-Agent: margusja” “\r\n”
“Accept: */*” “\r\n”
“Content-Type: application/json” “\r\n”
“Content-Length: $D” “\r\n”
“\r\n”
“$H”),
website, PSTR(PATH), website, stash.size(), sd);

// send the packet – this also releases all stash buffers once done
ether.tcpSend();
}
}

Uploaded code to MC and powered it up.

2014-05-14 21.46.00

 

 

And after it got data from DHCP I saw nice picture – it sends data into my kafka queue

Screen Shot 2014-05-14 at 21.45.11

A very simple apache-kafka cluster demo

Kafka is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

Try this video in full screen.

Apache-storm

Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate

 

Apache-zookeeper http://zookeeper.apache.org/doc/trunk/index.html

zookeeper on mõeldud hoidma teenuste seadistusi ja staatusi. Näiteks antud juhul on zookeeper serverites talletatud informatsioon, millised storm’i workerid on olemas.

Zookeeper teenus võib olla jaotunud eraldi serverite vahel, mis tagab kõrge veakindluse

zkservice

 

Zookeeper hoiab seadistusi hierarhias

zookeeper2

Näiteks minu testkeskkonnas on üks storm supervisor e worker ja hetkel on üks topoloogia, see kajastub zookeeperis:

[root@sandbox ~]# /usr/lib/zookeeper/bin/zkCli.sh -server 127.0.0.1:2181
Connecting to 127.0.0.1:2181

[zk: 127.0.0.1:2181(CONNECTED) 1] ls /storm
[workerbeats, errors, supervisors, storms, assignments]
[zk: 127.0.0.1:2181(CONNECTED) 2] ls /storm/storms
[engineMessages5-2-1398208863]
[zk: 127.0.0.1:2181(CONNECTED) 3]

Zookeeper võimaldab stormi workereid dünaamiliselt juurde lisada. Storm master e nimbus oskab zookeeper serverist saadud info kohaselt workereid kasutada. Näiteks, kui mõni worker mingil põhjusel ei ole enam kättesaadav, siis zookeeper saab sellest teada, kuna heardbeate enam ei tule ja nimbus organiseerib voogude teekonnad ringi tekitades kadunud workeri asemel uue, eeldusel, et on kuhugile tekitada ehk on veel vabu supervisoreid.

 

Storm

Storm has many use cases: realtime analytics, online machine learning, continuous computation, distributed RPC, ETL, and more. Storm is fast: a benchmark clocked it at over a million tuples processed per second per node. It is scalable, fault-tolerant, guarantees your     data will be processed, and is easy to set up and operate.

storm-cluster2

 

Nimbus

On master topoloogias, kes kordineerib, kasutades zookeeper-klastris olevat informatsiooni, storm-supervisorite töid ehk tagab voogude läbimise topoloogiast.

 Storm-supervisor ehk worker

Spout(id) ja/või Bolt(id), kes kuuluvad mingisse topoloogiasse. Võivad asuda ühes füüsilises serveris või jaotatud erinevate füüsiliste serverite vahel. Zookeeperi abil annavad nimbusele teada oma olemasolust.

Storm-supervisor versus supervisor (http://supervisord.org/)

Etteruttavalt selgitan, et antud juhul on kasutusel kaks supervisor teenust, mis on erinevad ja mida on vaja lahti seletada.

storm-supervisor – strom worker

supervisor – Process Control System.

On kasutusel, tagamaks, et teenused – nimbus, zookeeper, storm_supervisor (worker) oleksid kiirelt taastatud, kui mõni neist peaks mingil põhjusel seiskuma.

Näide:

Hetkel on minu testkeskkonnas supervisor (mitte storm-supervisor) kontrolli all vajalikud storm teenused

[root@sandbox ~]# supervisorctl
storm-supervisor RUNNING pid 3483, uptime 2:14:55
storm_nimbus RUNNING pid 3765, uptime 1:44:23
storm_ui RUNNING pid 3672, uptime 2:13:09
zookeeper RUNNING pid 3484, uptime 2:14:55
supervisor>

Peatades näiteks storm_nimbus protsessi 3765

[root@sandbox ~]# kill -9 3765

supervisord logis:

2014-04-22 17:53:20,884 INFO exited: storm_nimbus (terminated by SIGKILL; not expected)
2014-04-22 17:53:20,884 INFO received SIGCLD indicating a child quit
2014-04-22 17:53:21,894 INFO spawned: ‘storm_nimbus’ with pid 4604
2014-04-22 17:53:22,898 INFO success: storm_nimbus entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)

Kontrollime supervusord statust

supervisor> status
storm-supervisor RUNNING pid 3483, uptime 2:30:50
storm_nimbus RUNNING pid 4604, uptime 0:00:38
storm_ui RUNNING pid 3672, uptime 2:29:04
zookeeper RUNNING pid 3484, uptime 2:30:50

On näha, et just on uus protsess käivitatud.

Toodangusüsteemides on soovitatav jaotada storm komponendid nii, et nimbus, ui ja üks zookeeper server  on ühes masinas ja teistes asuvad zookeeper server ja storm-supervisor. Komplekte zookeeper-server ja storm-supervusor võib dünaamiliselt hiljem lisada.

Vahemärkusena, et tegelikult ei pea storm-supervisor ja zookeeper ühes füüsilises serveris asuma. Piisab, kui storm-supervisor teab, kus asub zookeeper server, et sinna oma staatus teatada.

Kui mingil põhjusel peaks üks  storm-supervusor kättesaamatuks muutuma, siis nimbus saab sellest teada ja organiseerib topoloogia niimoodi, et voog oleks täielik.

Kui mingil põhjusel peaks muutuma mittekättesaadavaks nimbus, siis topoloogia on terviklik ja vood täätavad edasi.

Kui mingil põhjusel peaks muutuma korraga mittekättesaadavaks nimbus ja mõni hetkel topoloogias aktiivselt osalev storm-supervisor, siis tekib esimene reaalne probleem. Samas ka siin ei kao voos liikuvad andmed vaid õige seadistuse puhul iga topoloogias olev Spout registreerib voos olevate sõnumite mittekohalejõudmise ja kui nüüd taastatakse nimbus ja/või storm-supervisor, siis Spout saadab sõnumi uuesti.

Kujutame ette, et meil on allpool toodud topoloogia

topo1

Kõik Bolt’d ja Spout’d asuvad eraldi masinates ehk on srorm-supervusor + zookeeper komplektid, siis juhul, kui peaks tekkima selline olukord

topo2

Siis nimbus saab sellest teada, sest zookeeperisse enam heartbeate ei tule ja nimbus üritab leida zookeeperi kaudu mõnda vaba serverit, kus on storm-supervisor.

Kui nüüd on olemas zookeeperi kaudu nimbusele teada mõni vaba storm-supervisor, siis topoloogia taastatakse. Kui mõnii sõnum ei jõudnud vahepeal kohale, kuna topoloogia ei olnud täielik, siis Spout on sellest teadlik ja saadab sõnumi uuesti.

topo3

jätkub…