Apache Pig: how to save output into different places

Recently I had a problem: I had many huge files containing timestamps, and I had to split those lines into separate files.

Basically: group by, and save each group to a separate file.

First I tried to do it in Apache Hive, and somehow I got the result, but I didn't like it. Deep inside I felt there had to be a better and cleaner solution.

I can't share the original dataset, but let's generate some sample data and play with it, because the problem is the same.

A frame of my example dataset:

[Screenshot 2014-12-18 12.12.14]

In total there are 1001 rows, including the header.

As you can see, there is a country column. I'm going to use Apache Pig to split the rows so that I can save them into different directories in Hadoop HDFS.

First, let's load our data and describe its schema. chararray is similar to the string type familiar from many other languages.

A = LOAD '/user/margusja/pig_demo_files/MOCK_DATA.csv' USING PigStorage(',') AS (id: int, first_name: chararray, last_name: chararray, email: chararray, country: chararray, ip_address: chararray);

And the final Pig statement is:

STORE A INTO '/user/margusja/pig_demo_out/' USING org.apache.pig.piggybank.storage.MultiStorage('/user/margusja/pig_demo_out', '4', 'none', ',');

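A few more words about the line above. Let me explain MultiStorage's arguments ('/user/margusja/pig_demo_out', '4', 'none', ','). MultiStorage comes from the Piggybank contrib library, so piggybank.jar has to be available to Pig (for example via a REGISTER statement).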

The first argument is the path in HDFS where we will find the generated directories containing each country's files. It should be the same path we use after STORE A INTO …

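The second argument is the index of the column whose values become the directory names. Indexing starts from 0, so '4' is the country column (id is column 0). The third argument, 'none' in our case, selects compression if we'd like to compress the data. The last one is the separator between columns.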
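To make these arguments concrete, here is a small plain-Java sketch of the same idea run locally: group delimited rows by the value in one zero-based column and write each group into its own directory. It is not how MultiStorage is implemented; the class name, the output file name part-0 and the sample rows are all hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class SplitByColumn {

    /**
     * Groups delimited rows by the value of the zero-based column fieldIndex and writes
     * each group to outDir/<value>/part-0 (hypothetical file name). Returns rows written per value.
     */
    public static Map<String, Integer> split(List<String> rows, int fieldIndex,
                                             String delimiter, Path outDir) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String row : rows) {
            String[] fields = row.split(Pattern.quote(delimiter), -1);
            if (fieldIndex >= fields.length || fields[fieldIndex].isEmpty()) {
                continue; // skip rows that have no value in the split column
            }
            groups.computeIfAbsent(fields[fieldIndex], k -> new ArrayList<>()).add(row);
        }
        Map<String, Integer> counts = new LinkedHashMap<>();
        try {
            for (Map.Entry<String, List<String>> group : groups.entrySet()) {
                Path dir = outDir.resolve(group.getKey()); // one directory per column value
                Files.createDirectories(dir);
                Files.write(dir.resolve("part-0"), group.getValue(), StandardCharsets.UTF_8);
                counts.put(group.getKey(), group.getValue().size());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return counts;
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical rows in the MOCK_DATA.csv layout: id,first_name,last_name,email,country,ip_address
        List<String> rows = List.of(
                "1,Ann,Smith,ann@example.com,Estonia,10.0.0.1",
                "2,Bob,Jones,bob@example.com,Finland,10.0.0.2",
                "3,Eve,Brown,eve@example.com,Estonia,10.0.0.3");
        Path out = Files.createTempDirectory("pig_demo_out");
        // 4 selects the country column, like MultiStorage's '4'
        System.out.println(split(rows, 4, ",", out)); // {Estonia=2, Finland=1}
        System.out.println("Written under " + out);
    }
}
```

With the real MOCK_DATA.csv, the header line is just another row for this sketch (and, as far as I know, for PigStorage too), so it would end up in a directory of its own named country unless it is filtered out first.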

So let's run our tiny but very useful Pig script:

> pig -f demo.pig

It starts a MapReduce job on our Hadoop cluster. After it finishes, we can examine the result in HDFS.

Some snapshots of the result via HUE:

[Screenshot 2014-12-18 12.24.10]

If we look into the Estonia directory, we find a file containing only the rows where the country is Estonia:

[Screenshot 2014-12-18 12.26.09]

In my opinion, this is awesome!

Kafka Benchmark – Could not find or load main class org.apache.kafka.clients.tools.ProducerPerformance

OS: Centos 6.5

Kafka: kafka-0.8.1.2.1.4.0-632.el6.noarch from the repo, installed using yum.

When I wanted to use the performance tools:

[server1 kafka]# bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance

Error: Could not find or load main class org.apache.kafka.clients.tools.ProducerPerformance

I tried different workarounds; the following worked for me:

cd /usr/local/

yum install git

git clone https://git-wip-us.apache.org/repos/asf/kafka.git

cd kafka

git checkout -b 0.8 remotes/origin/0.8

./sbt update

./sbt package

./sbt assembly-package-dependency

After that, ./bin/kafka-producer-perf-test.sh works:

[root@server1 kafka]# ./bin/kafka-producer-perf-test.sh --topic kafkatopic --broker-list server1:9092 --messages 1000000 --show-detailed-stats --message-size 1024

start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec

2014-10-07 09:45:13:825, 2014-10-07 09:45:23:781, 0, 1024, 200, 976.56, 98.0878, 1000000, 100441.9446
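The MB.sec and nMsg.sec columns can be checked by hand. This Java sketch assumes the tool divides by the wall-clock time between start.time and end.time and counts 1 MB as 1024 × 1024 bytes; under those assumptions it reproduces the printed figures. The class and method names are hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class PerfRates {
    // The perf tool prints milliseconds after a colon, e.g. 2014-10-07 09:45:13:825
    private static final DateTimeFormatter FMT =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss:SSS");

    /** Seconds elapsed between the start.time and end.time columns. */
    public static double elapsedSeconds(String start, String end) {
        Duration d = Duration.between(LocalDateTime.parse(start, FMT), LocalDateTime.parse(end, FMT));
        return d.toMillis() / 1000.0;
    }

    /** Total payload in MB, assuming 1 MB = 1024 * 1024 bytes. */
    public static double totalMb(long messages, int messageSize) {
        return messages * (double) messageSize / (1024 * 1024);
    }

    public static void main(String[] args) {
        double secs = elapsedSeconds("2014-10-07 09:45:13:825", "2014-10-07 09:45:23:781");
        long messages = 1_000_000L;
        double mb = totalMb(messages, 1024);
        // prints: elapsed=9.956 s, MB=976.56, MB.sec=98.0878, nMsg.sec=100441.9446
        System.out.printf(Locale.ROOT, "elapsed=%.3f s, MB=%.2f, MB.sec=%.4f, nMsg.sec=%.4f%n",
                secs, mb, mb / secs, messages / secs);
    }
}
```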

Hadoop namenode HA and hive metastore location urls


I recently switched the Hadoop NameNode to NameNode HA. Most steps went fine, but Hive was unhappy and tried to locate files via the old URL. I found the tutorial http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.5/bk_system-admin-guide/content/sysadminguides_ha_chap3.html and used those commands, but only some tables were updated after running them.

Then I did it manually, and that helped. If your Hive metastore is in MySQL, connect to the Hive database and run the following, which rewrites the old single-NameNode URI to the HA nameservice URI:

UPDATE SDS SET LOCATION = REPLACE(LOCATION, 'hdfs://namenode:8020', 'hdfs://mycluster');
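Before touching the metastore, it can help to check on a sample value what the rewrite does. This Java sketch uses a hypothetical helper (not part of Hive) that applies the substitution between the two URIs from the UPDATE statement, hdfs://namenode:8020 as the old single-NameNode URI and hdfs://mycluster as the HA nameservice. Unlike MySQL's REPLACE(), which rewrites every occurrence anywhere in the string, it only rewrites a matching prefix.

```java
public class MetastoreLocation {
    /** Replaces oldPrefix with newPrefix only at the start of the location URI. */
    public static String rewrite(String location, String oldPrefix, String newPrefix) {
        if (location.startsWith(oldPrefix)) {
            return newPrefix + location.substring(oldPrefix.length());
        }
        return location; // not under the old URI: leave unchanged
    }

    public static void main(String[] args) {
        String loc = "hdfs://namenode:8020/apps/hive/warehouse/log1";
        // prints hdfs://mycluster/apps/hive/warehouse/log1
        System.out.println(rewrite(loc, "hdfs://namenode:8020", "hdfs://mycluster"));
        // A pattern with a stray leading space never matches, so the value stays as it was
        System.out.println(rewrite(loc, " hdfs://namenode:8020", "hdfs://mycluster"));
    }
}
```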

After that I can switch the active NameNode around, and Hive can still locate files via the metastore.

Maybe this is helpful for someone 🙂

Apache Hive: how to join log files and use SQL queries over the joined data

Let's create two very simple log files. Into log1.txt let's put example user-problem log data, and into log2.txt solution log data:

log1.txt:

user1 | 2014-09-23 | error message 1
user2 | 2014-09-23 | error message 2
user3 | 2014-09-23 | error message 3
user4 | 2014-09-23 | error message 1
user5 | 2014-09-23 | error message 2
user6 | 2014-09-23 | error message 12
user7 | 2014-09-23 | error message 11
user1 | 2014-09-24 | error message 1
user2 | 2014-09-24 | error message 2
user3 | 2014-09-24 | error message 3
user4 | 2014-09-24 | error message 10
user1 | 2014-09-24 | error message 17
user2 | 2014-09-24 | error message 13
user1 | 2014-09-24 | error message 1

log2.txt:
user1 | support2 | solution message 1
user2 | support1 | solution message 2
user3 | support2 | solution message 3
user1 | support1 | solution message 4
user2 | support2 | solution message 5
user4 | support1 | solution message 6
user2 | support2 | solution message 7
user5 | support1 | solution message 8

Create two tables for the datasets above:

hive> create table log1 (user STRING, date STRING, error STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE;

OK

Time taken: 5.968 seconds

hive> LOAD DATA INPATH '/user/margusja/hiveinput/log1.txt' OVERWRITE INTO TABLE log1;

Loading data to table default.log1

rmr: DEPRECATED: Please use 'rm -r' instead.

Moved: 'hdfs://bigdata1.host.int:8020/apps/hive/warehouse/log1' to trash at: hdfs://bigdata1.host.int:8020/user/margusja/.Trash/Current

Table default.log1 stats: [numFiles=1, numRows=0, totalSize=523, rawDataSize=0]

OK

Time taken: 4.687 seconds

hive> create table log2 (user STRING, support STRING, solution STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE;

OK

Time taken: 0.997 seconds

hive> LOAD DATA INPATH '/user/margusja/hiveinput/log2.txt' OVERWRITE INTO TABLE log2;

Loading data to table default.log2

rmr: DEPRECATED: Please use 'rm -r' instead.

Moved: 'hdfs://bigdata1.host.int:8020/apps/hive/warehouse/log2' to trash at: hdfs://bigdata1.host.int:8020/user/margusja/.Trash/Current

Table default.log2 stats: [numFiles=1, numRows=0, totalSize=304, rawDataSize=0]

OK

Time taken: 0.72 seconds

hive>

Now let's run SQL over the two data files stored in HDFS, using Hive:

hive> select log1.user, log1.date, log1.error, log2.support, log2.solution from log2 join log1 on (log1.user = log2.user);

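And the result. We can see how the two separate log files are joined together; for example, user2 had error message 2 on 2014-09-23, and support2 gave user2 solution message 7. Keep in mind that the join is on user only, so every error row of a user is paired with every solution row of that user (user1: 4 × 2, user2: 3 × 3, user3 and user4: 2 × 1 each, user5: 1 × 1), which gives the 22 rows below; user6 and user7 have no solutions and drop out of the inner join.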

user1  2014-09-23  error message 1 support2  solution message 1

user1  2014-09-23  error message 1 support1  solution message 4

user2  2014-09-23  error message 2 support1  solution message 2

user2  2014-09-23  error message 2 support2  solution message 5

user2  2014-09-23  error message 2 support2  solution message 7

user3  2014-09-23  error message 3 support2  solution message 3

user4  2014-09-23  error message 1 support1  solution message 6

user5  2014-09-23  error message 2 support1  solution message 8

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

user2  2014-09-24  error message 2 support1  solution message 2

user2  2014-09-24  error message 2 support2  solution message 5

user2  2014-09-24  error message 2 support2  solution message 7

user3  2014-09-24  error message 3 support2  solution message 3

user4  2014-09-24  error message 10 support1  solution message 6

user1  2014-09-24  error message 17 support2  solution message 1

user1  2014-09-24  error message 17 support1  solution message 4

user2  2014-09-24  error message 13 support1  solution message 2

user2  2014-09-24  error message 13 support2  solution message 5

user2  2014-09-24  error message 13 support2  solution message 7

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

Time taken: 34.561 seconds, Fetched: 22 row(s)
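To see where the 22 rows come from, here is a plain-Java sketch of the same inner equi-join on user. It is a simple in-memory hash join, not how Hive executes the query: it trims the spaces around '|' (Hive keeps them), indexes log2 by user, and pairs each log1 row with every matching log2 row. The class and method names are hypothetical, and the row order may differ from Hive's output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LogJoin {
    // The sample files from above: log1 = user | date | error, log2 = user | support | solution
    public static final List<String> LOG1 = List.of(
            "user1 | 2014-09-23 | error message 1", "user2 | 2014-09-23 | error message 2",
            "user3 | 2014-09-23 | error message 3", "user4 | 2014-09-23 | error message 1",
            "user5 | 2014-09-23 | error message 2", "user6 | 2014-09-23 | error message 12",
            "user7 | 2014-09-23 | error message 11", "user1 | 2014-09-24 | error message 1",
            "user2 | 2014-09-24 | error message 2", "user3 | 2014-09-24 | error message 3",
            "user4 | 2014-09-24 | error message 10", "user1 | 2014-09-24 | error message 17",
            "user2 | 2014-09-24 | error message 13", "user1 | 2014-09-24 | error message 1");
    public static final List<String> LOG2 = List.of(
            "user1 | support2 | solution message 1", "user2 | support1 | solution message 2",
            "user3 | support2 | solution message 3", "user1 | support1 | solution message 4",
            "user2 | support2 | solution message 5", "user4 | support1 | solution message 6",
            "user2 | support2 | solution message 7", "user5 | support1 | solution message 8");

    /** Splits "a | b | c" on '|' and trims each field. */
    static String[] fields(String line) {
        String[] parts = line.split("\\|");
        for (int i = 0; i < parts.length; i++) {
            parts[i] = parts[i].trim();
        }
        return parts;
    }

    /** Inner join of log1 (user, date, error) with log2 (user, support, solution) on user. */
    public static List<String> join(List<String> log1, List<String> log2) {
        Map<String, List<String[]>> solutionsByUser = new HashMap<>(); // build side
        for (String line : log2) {
            String[] s = fields(line);
            solutionsByUser.computeIfAbsent(s[0], k -> new ArrayList<>()).add(s);
        }
        List<String> rows = new ArrayList<>(); // probe side: one output row per matching pair
        for (String line : log1) {
            String[] e = fields(line);
            for (String[] s : solutionsByUser.getOrDefault(e[0], List.of())) {
                rows.add(String.join("\t", e[0], e[1], e[2], s[1], s[2]));
            }
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> rows = join(LOG1, LOG2);
        rows.forEach(System.out::println);
        System.out.println("Fetched: " + rows.size() + " row(s)");
    }
}
```

With the sample data it prints 22 rows, matching Hive's Fetched: 22 row(s).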

More cool things:

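We can select only a specific user. Because the tables are split on '|' alone, the stored values keep the spaces around the delimiter (for example 'user1 '), which is why the filters use LIKE with % wildcards rather than =: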

hive> select log1.user, log1.date, log1.error, log2.support, log2.solution from log2 join log1 on (log1.user = log2.user) where log1.user like '%user1%';

user1  2014-09-23  error message 1 support2  solution message 1

user1  2014-09-23  error message 1 support1  solution message 4

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

user1  2014-09-24  error message 17 support2  solution message 1

user1  2014-09-24  error message 17 support1  solution message 4

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

Time taken: 31.932 seconds, Fetched: 8 row(s)

We can query by date:

hive> select log1.user, log1.date, log1.error, log2.support, log2.solution from log2 join log1 on (log1.user = log2.user) where log1.date like '%2014-09-23%';

user1  2014-09-23  error message 1 support2  solution message 1

user1  2014-09-23  error message 1 support1  solution message 4

user2  2014-09-23  error message 2 support1  solution message 2

user2  2014-09-23  error message 2 support2  solution message 5

user2  2014-09-23  error message 2 support2  solution message 7

user3  2014-09-23  error message 3 support2  solution message 3

user4  2014-09-23  error message 1 support1  solution message 6

user5  2014-09-23  error message 2 support1  solution message 8

Now let's write the result of our join into another table, log3, which will hold the joined data:

hive> create table log3 (user STRING, date STRING, error STRING, support STRING, solution STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;

hive> insert into table log3 select log1.user, log1.date, log1.error, log2.support, log2.solution from log2 join log1 on (log1.user = log2.user);

And now we can use very simple SQL to get the data:

hive> select * from log3;

OK

user1  2014-09-23  error message 1 support2  solution message 1

user1  2014-09-23  error message 1 support1  solution message 4

user2  2014-09-23  error message 2 support1  solution message 2

user2  2014-09-23  error message 2 support2  solution message 5

user2  2014-09-23  error message 2 support2  solution message 7

user3  2014-09-23  error message 3 support2  solution message 3

user4  2014-09-23  error message 1 support1  solution message 6

user5  2014-09-23  error message 2 support1  solution message 8

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

user2  2014-09-24  error message 2 support1  solution message 2

user2  2014-09-24  error message 2 support2  solution message 5

user2  2014-09-24  error message 2 support2  solution message 7

user3  2014-09-24  error message 3 support2  solution message 3

user4  2014-09-24  error message 10 support1  solution message 6

user1  2014-09-24  error message 17 support2  solution message 1

user1  2014-09-24  error message 17 support1  solution message 4

user2  2014-09-24  error message 13 support1  solution message 2

user2  2014-09-24  error message 13 support2  solution message 5

user2  2014-09-24  error message 13 support2  solution message 7

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

Time taken: 0.075 seconds, Fetched: 22 row(s)

hive>

How to start with Hadoop MRv2

[Figure: YARN architecture]

Since Hadoop MRv1 and MRv2 are different, I found a good starting point here: http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1-latest/bk_using-apache-hadoop/content/running_mapreduce_examples_on_yarn.html

[root@vm37 hadoop-yarn]# yarn jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.2.0.2.0.6.0-101.jar 

An example program must be given as the first argument.

Valid program names are:

  aggregatewordcount: An Aggregate based map/reduce program that counts the words in the input files.

  aggregatewordhist: An Aggregate based map/reduce program that computes the histogram of the words in the input files.

  bbp: A map/reduce program that uses Bailey-Borwein-Plouffe to compute exact digits of Pi.

  dbcount: An example job that count the pageview counts from a database.

  distbbp: A map/reduce program that uses a BBP-type formula to compute exact bits of Pi.

  grep: A map/reduce program that counts the matches of a regex in the input.

  join: A job that effects a join over sorted, equally partitioned datasets

  multifilewc: A job that counts words from several files.

  pentomino: A map/reduce tile laying program to find solutions to pentomino problems.

  pi: A map/reduce program that estimates Pi using a quasi-Monte Carlo method.

  randomtextwriter: A map/reduce program that writes 10GB of random textual data per node.

  randomwriter: A map/reduce program that writes 10GB of random data per node.

  secondarysort: An example defining a secondary sort to the reduce.

  sort: A map/reduce program that sorts the data written by the random writer.

  sudoku: A sudoku solver.

  teragen: Generate data for the terasort

  terasort: Run the terasort

  teravalidate: Checking results of terasort

  wordcount: A map/reduce program that counts the words in the input files.

  wordmean: A map/reduce program that counts the average length of the words in the input files.

  wordmedian: A map/reduce program that counts the median length of the words in the input files.

  wordstandarddeviation: A map/reduce program that counts the standard deviation of the length of the words in the input files.

Now let's take the wordcount example. I downloaded an example dataset and put it into HDFS:

[root@vm37 hadoop-mapreduce]# hdfs dfs -put wc.txt /user/margusja/wc/input/

Now execute the MapReduce job:

[root@vm37 hadoop-mapreduce]# yarn jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.2.0.2.0.6.0-101.jar wordcount /user/margusja/wc/input /user/margusja/wc/output

14/08/13 12:34:59 INFO client.RMProxy: Connecting to ResourceManager at vm38.dbweb.ee/192.168.1.72:8032

14/08/13 12:35:00 INFO input.FileInputFormat: Total input paths to process : 1

14/08/13 12:35:01 INFO mapreduce.JobSubmitter: number of splits:1

14/08/13 12:35:01 INFO Configuration.deprecation: user.name is deprecated. Instead, use mapreduce.job.user.name

14/08/13 12:35:01 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar

14/08/13 12:35:01 INFO Configuration.deprecation: mapred.output.value.class is deprecated. Instead, use mapreduce.job.output.value.class

14/08/13 12:35:01 INFO Configuration.deprecation: mapreduce.combine.class is deprecated. Instead, use mapreduce.job.combine.class

14/08/13 12:35:01 INFO Configuration.deprecation: mapreduce.map.class is deprecated. Instead, use mapreduce.job.map.class

14/08/13 12:35:01 INFO Configuration.deprecation: mapred.job.name is deprecated. Instead, use mapreduce.job.name

14/08/13 12:35:01 INFO Configuration.deprecation: mapreduce.reduce.class is deprecated. Instead, use mapreduce.job.reduce.class

14/08/13 12:35:01 INFO Configuration.deprecation: mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir

14/08/13 12:35:01 INFO Configuration.deprecation: mapred.output.dir is deprecated. Instead, use mapreduce.output.fileoutputformat.outputdir

14/08/13 12:35:01 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps

14/08/13 12:35:01 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class

14/08/13 12:35:01 INFO Configuration.deprecation: mapred.working.dir is deprecated. Instead, use mapreduce.job.working.dir

14/08/13 12:35:01 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1407837551751_0037

14/08/13 12:35:01 INFO impl.YarnClientImpl: Submitted application application_1407837551751_0037 to ResourceManager at vm38.dbweb.ee/192.168.1.72:8032

14/08/13 12:35:02 INFO mapreduce.Job: The url to track the job: http://vm38.dbweb.ee:8088/proxy/application_1407837551751_0037/

14/08/13 12:35:02 INFO mapreduce.Job: Running job: job_1407837551751_0037

14/08/13 12:35:11 INFO mapreduce.Job: Job job_1407837551751_0037 running in uber mode : false

14/08/13 12:35:11 INFO mapreduce.Job:  map 0% reduce 0%

14/08/13 12:35:21 INFO mapreduce.Job:  map 100% reduce 0%

14/08/13 12:35:31 INFO mapreduce.Job:  map 100% reduce 100%

14/08/13 12:35:31 INFO mapreduce.Job: Job job_1407837551751_0037 completed successfully

14/08/13 12:35:31 INFO mapreduce.Job: Counters: 43

        File System Counters

                FILE: Number of bytes read=167524

                FILE: Number of bytes written=493257

                FILE: Number of read operations=0

                FILE: Number of large read operations=0

                FILE: Number of write operations=0

                HDFS: Number of bytes read=384341

                HDFS: Number of bytes written=120766

                HDFS: Number of read operations=6

                HDFS: Number of large read operations=0

                HDFS: Number of write operations=2

        Job Counters 

                Launched map tasks=1

                Launched reduce tasks=1

                Data-local map tasks=1

                Total time spent by all maps in occupied slots (ms)=8033

                Total time spent by all reduces in occupied slots (ms)=7119

        Map-Reduce Framework

                Map input records=9488

                Map output records=67825

                Map output bytes=643386

                Map output materialized bytes=167524

                Input split bytes=134

                Combine input records=67825

                Combine output records=11900

                Reduce input groups=11900

                Reduce shuffle bytes=167524

                Reduce input records=11900

                Reduce output records=11900

                Spilled Records=23800

                Shuffled Maps =1

                Failed Shuffles=0

                Merged Map outputs=1

                GC time elapsed (ms)=172

                CPU time spent (ms)=5880

                Physical memory (bytes) snapshot=443211776

                Virtual memory (bytes) snapshot=1953267712

                Total committed heap usage (bytes)=317194240

        Shuffle Errors

                BAD_ID=0

                CONNECTION=0

                IO_ERROR=0

                WRONG_LENGTH=0

                WRONG_MAP=0

                WRONG_REDUCE=0

        File Input Format Counters 

                Bytes Read=384207

        File Output Format Counters 

                Bytes Written=120766

And in my cluster UI I can see the submitted job:

[Screen Shot 2014-08-13 at 12.35.16]

After the job has finished, you can explore the result via the HDFS UI

[Screen Shot 2014-08-13 at 12.41.04] [Screen Shot 2014-08-13 at 12.40.18]

or copy it to your local directory with the hdfs command-line tool:

[root@vm37 hadoop-mapreduce]# hdfs dfs -get /user/margusja/wc/output/part-r-00000

[root@vm37 hadoop-mapreduce]# head part-r-00000 

”       6

“‘Among 2

“‘And   1

“‘Appen 1

“‘Ce    1

“‘Doigts’       1

“‘E’s   1

“‘Ello, 1

“‘Er    1

“‘Er’s  1
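The head of part-r-00000 contains tokens such as “‘Among with the quotes still attached. That is because the example's TokenizerMapper splits each line with java.util.StringTokenizer, i.e. on whitespace only, and the reducer sums the 1s per token; in the counters above, Map output records=67825 is the number of tokens and Reduce output records=11900 the number of distinct ones. The plain-Java sketch below (hypothetical class, no Hadoop dependency) performs the same map and reduce steps in one process. It sorts keys with a TreeMap, which matches Hadoop's byte-wise Text ordering for ASCII but may order some non-ASCII keys differently.

```java
import java.util.List;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

public class LocalWordCount {
    /**
     * Map step: split each line on whitespace with StringTokenizer, so punctuation stays
     * attached to the word. Reduce step: sum the 1s emitted for each distinct token.
     */
    public static Map<String, Integer> count(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted by key, like a reducer's output file
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                counts.merge(itr.nextToken(), 1, Integer::sum); // emit (token, 1) and sum right away
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(List.of("'Ello, 'Ello, 'Ello!", "Among the trees"));
        counts.forEach((word, n) -> System.out.println(word + "\t" + n));
    }
}
```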

The next step might be digging into the source code:

http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/

I'm a lazy person, so I use Maven to manage my Java class dependencies:

[margusja@vm37 ~]$ mvn archetype:generate -DgroupId=com.deciderlab.wordcount -DartifactId=wordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false

[INFO] Scanning for projects...

[INFO]

[INFO] ------------------------------------------------------------------------

[INFO] Building Maven Stub Project (No POM) 1

[INFO] ------------------------------------------------------------------------

[INFO]

[INFO] >>> maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom >>>

[INFO]

[INFO] <<< maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom <<<

[INFO]

[INFO] --- maven-archetype-plugin:2.2:generate (default-cli) @ standalone-pom ---

[INFO] Generating project in Batch mode

[INFO] ------------------------------------------------------------------------

[INFO] Using following parameters for creating project from Old (1.x) Archetype: maven-archetype-quickstart:1.0

[INFO] ------------------------------------------------------------------------

[INFO] Parameter: groupId, Value: com.deciderlab.wordcount

[INFO] Parameter: packageName, Value: com.deciderlab.wordcount

[INFO] Parameter: package, Value: com.deciderlab.wordcount

[INFO] Parameter: artifactId, Value: wordcount

[INFO] Parameter: basedir, Value: /var/www/html/margusja

[INFO] Parameter: version, Value: 1.0-SNAPSHOT

[INFO] project created from Old (1.x) Archetype in dir: /var/www/html/margusja/wordcount

[INFO] ------------------------------------------------------------------------

[INFO] BUILD SUCCESS

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 6.382s

[INFO] Finished at: Wed Aug 13 13:08:59 EEST 2014

[INFO] Final Memory: 14M/105M

[INFO] ------------------------------------------------------------------------

Now you can move the WordCount.java source to your src/main/…[whatever your directory is]

I made some changes in WordCount.java:

package com.deciderlab.wordcount;

Job job = new Job(conf, "Margusja's word count demo");

In pom.xml I added some dependencies:

    <dependency>

      <groupId>org.apache.hadoop</groupId>

      <artifactId>hadoop-common</artifactId>

      <version>2.4.1</version>

    </dependency>

    <dependency>

      <groupId>org.apache.hadoop</groupId>

      <artifactId>hadoop-core</artifactId>

      <version>1.2.1</version>

    </dependency>

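Note that hadoop-core 1.2.1 is the Hadoop 1 (MRv1) artifact; when building for a Hadoop 2 / MRv2 cluster, org.apache.hadoop:hadoop-mapreduce-client-core with the same version as hadoop-common is the more consistent choice. Now build your jar: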

[margusja@vm37 wordcount]$ mvn package

[INFO] Scanning for projects...

[INFO]

[INFO] ------------------------------------------------------------------------

[INFO] Building wordcount 1.0-SNAPSHOT

[INFO] ------------------------------------------------------------------------

[INFO]

[INFO] --- maven-resources-plugin:2.5:resources (default-resources) @ WordCount ---

[debug] execute contextualize

[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!

[INFO] skip non existing resourceDirectory /var/www/html/margusja/wordcount/src/main/resources

[INFO]

[INFO] --- maven-compiler-plugin:2.3.2:compile (default-compile) @ WordCount ---

[INFO] Nothing to compile - all classes are up to date

[INFO]

[INFO] --- maven-resources-plugin:2.5:testResources (default-testResources) @ WordCount ---

[debug] execute contextualize

[WARNING] Using platform encoding (UTF-8 actually) to copy filtered resources, i.e. build is platform dependent!

[INFO] skip non existing resourceDirectory /var/www/html/margusja/wordcount/src/test/resources

[INFO]

[INFO] --- maven-compiler-plugin:2.3.2:testCompile (default-testCompile) @ WordCount ---

[INFO] Nothing to compile - all classes are up to date

[INFO]

[INFO] --- maven-surefire-plugin:2.10:test (default-test) @ WordCount ---

[INFO] Surefire report directory: /var/www/html/margusja/wordcount/target/surefire-reports

-------------------------------------------------------

 T E S T S

-------------------------------------------------------

Running com.deciderlab.wordcount.AppTest

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0, Time elapsed: 0.052 sec

Results :

Tests run: 1, Failures: 0, Errors: 0, Skipped: 0

[INFO]

[INFO] --- maven-jar-plugin:2.3.2:jar (default-jar) @ WordCount ---

[INFO] ------------------------------------------------------------------------

[INFO] BUILD SUCCESS

[INFO] ------------------------------------------------------------------------

[INFO] Total time: 4.552s

[INFO] Finished at: Wed Aug 13 13:58:08 EEST 2014

[INFO] Final Memory: 12M/171M

[INFO] ------------------------------------------------------------------------

[margusja@vm37 wordcount]$ 

Now you are ready to launch your first Hadoop MRv2 job:

[margusja@vm37 wordcount]$ hadoop jar /var/www/html/margusja/wordcount/target/WordCount-1.0-SNAPSHOT.jar com.deciderlab.wordcount.WordCount /user/margusja/wc/input /user/margusja/wc/output4

14/08/13 14:01:38 INFO mapreduce.Job: Running job: job_1407837551751_0040

14/08/13 14:01:47 INFO mapreduce.Job: Job job_1407837551751_0040 running in uber mode : false

14/08/13 14:01:47 INFO mapreduce.Job:  map 0% reduce 0%

14/08/13 14:01:58 INFO mapreduce.Job:  map 100% reduce 0%

14/08/13 14:02:07 INFO mapreduce.Job:  map 100% reduce 100%

14/08/13 14:02:08 INFO mapreduce.Job: Job job_1407837551751_0040 completed successfully

14/08/13 14:02:08 INFO mapreduce.Job: Counters: 43

        File System Counters

                FILE: Number of bytes read=167524

                FILE: Number of bytes written=493091

                FILE: Number of read operations=0

                FILE: Number of large read operations=0

                FILE: Number of write operations=0

                HDFS: Number of bytes read=384341

                HDFS: Number of bytes written=120766

                HDFS: Number of read operations=6

                HDFS: Number of large read operations=0

                HDFS: Number of write operations=2

        Job Counters 

                Launched map tasks=1

                Launched reduce tasks=1

                Data-local map tasks=1

                Total time spent by all maps in occupied slots (ms)=7749

                Total time spent by all reduces in occupied slots (ms)=6591

        Map-Reduce Framework

                Map input records=9488

                Map output records=67825

                Map output bytes=643386

                Map output materialized bytes=167524

                Input split bytes=134

                Combine input records=67825

                Combine output records=11900

                Reduce input groups=11900

                Reduce shuffle bytes=167524

                Reduce input records=11900

                Reduce output records=11900

                Spilled Records=23800

                Shuffled Maps =1

                Failed Shuffles=0

                Merged Map outputs=1

                GC time elapsed (ms)=114

                CPU time spent (ms)=6020

                Physical memory (bytes) snapshot=430088192

                Virtual memory (bytes) snapshot=1945890816

                Total committed heap usage (bytes)=317194240

        Shuffle Errors

                BAD_ID=0

                CONNECTION=0

                IO_ERROR=0

                WRONG_LENGTH=0

                WRONG_MAP=0

                WRONG_REDUCE=0

        File Input Format Counters 

                Bytes Read=384207

        File Output Format Counters 

                Bytes Written=120766

You can examine your running jobs:

[Screen Shot 2014-08-13 at 14.01.48]

And the result in the HDFS UI:

[Screen Shot 2014-08-13 at 14.04.08]

Apache Spark map-reduce: a quick overview

I have a file containing lines of text, and I'd like to count all the characters in it. With a small file this is not a problem, but what if we have a huge text file, for example 1000 TB distributed over Hadoop HDFS?

Apache Spark is one option for that job.

Big picture:

[Figure: Spark map-reduce]

Now let's dig into the details. Our input file, data.txt, contains these lines:


Apache Spark is a fast and general-purpose cluster computing system.
It provides high-level APIs in Java, Scala and Python,
and an optimized engine that supports general execution graphs.
It also supports a rich set of higher-level tools including Shark (Hive on Spark),
Spark SQL for structured data,
MLlib for machine learning,
GraphX for graph processing, and Spark Streaming.

This is the Java (Spark) code:

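import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;

public class SparkReduce {
    public static void main(String[] args) {
        String master = "local[3]"; // run locally with 3 worker threads
        String appName = "SparkReduceDemo";

        SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> file = sc.textFile("data.txt"); // this is our text file

        // map: runs in the tasks, once per line of each partition, and emits the line's length
        JavaRDD<Integer> lineLengths = file.map(new Function<String, Integer>() {
            public Integer call(String s) {
                System.out.println("line: " + s + " " + s.length());
                return s.length();
            }
        });

        // reduce: each task sums its own partition, then the driver combines the partial sums
        int totalLength = lineLengths.reduce(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer a, Integer b) {
                System.out.println("a: " + a);
                System.out.println("b: " + b);
                return a + b;
            }
        });

        System.out.println("Length: " + totalLength);
        sc.stop();
    }
}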

Compile and run it. Let's analyse the result:


14/06/19 10:34:42 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
14/06/19 10:34:42 INFO SecurityManager: Changing view acls to: margusja
14/06/19 10:34:42 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(margusja)
14/06/19 10:34:42 INFO Slf4jLogger: Slf4jLogger started
14/06/19 10:34:42 INFO Remoting: Starting remoting
14/06/19 10:34:43 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@10.10.5.12:52116]
14/06/19 10:34:43 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@10.10.5.12:52116]
14/06/19 10:34:43 INFO SparkEnv: Registering MapOutputTracker
14/06/19 10:34:43 INFO SparkEnv: Registering BlockManagerMaster
14/06/19 10:34:43 INFO DiskBlockManager: Created local directory at /var/folders/vm/5pggdh2x3_s_l6z55brtql3h0000gn/T/spark-local-20140619103443-bc5e
14/06/19 10:34:43 INFO MemoryStore: MemoryStore started with capacity 74.4 MB.
14/06/19 10:34:43 INFO ConnectionManager: Bound socket to port 52117 with id = ConnectionManagerId(10.10.5.12,52117)
14/06/19 10:34:43 INFO BlockManagerMaster: Trying to register BlockManager
14/06/19 10:34:43 INFO BlockManagerInfo: Registering block manager 10.10.5.12:52117 with 74.4 MB RAM
14/06/19 10:34:43 INFO BlockManagerMaster: Registered BlockManager
14/06/19 10:34:43 INFO HttpServer: Starting HTTP Server
14/06/19 10:34:43 INFO HttpBroadcast: Broadcast server started at http://10.10.5.12:52118
14/06/19 10:34:43 INFO HttpFileServer: HTTP File server directory is /var/folders/vm/5pggdh2x3_s_l6z55brtql3h0000gn/T/spark-b5c0fd3b-d197-4318-9d1d-7eda2b856b3c
14/06/19 10:34:43 INFO HttpServer: Starting HTTP Server
14/06/19 10:34:43 INFO SparkUI: Started SparkUI at http://10.10.5.12:4040
2014-06-19 10:34:43.751 java[816:1003] Unable to load realm info from SCDynamicStore
14/06/19 10:34:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
14/06/19 10:34:44 INFO MemoryStore: ensureFreeSpace(146579) called with curMem=0, maxMem=77974732
14/06/19 10:34:44 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 143.1 KB, free 74.2 MB)
14/06/19 10:34:44 INFO FileInputFormat: Total input paths to process : 1
14/06/19 10:34:44 INFO SparkContext: Starting job: reduce at SparkReduce.java:50
14/06/19 10:34:44 INFO DAGScheduler: Got job 0 (reduce at SparkReduce.java:50) with 2 output partitions (allowLocal=false)
14/06/19 10:34:44 INFO DAGScheduler: Final stage: Stage 0(reduce at SparkReduce.java:50)
14/06/19 10:34:44 INFO DAGScheduler: Parents of final stage: List()
14/06/19 10:34:44 INFO DAGScheduler: Missing parents: List()
14/06/19 10:34:44 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at map at SparkReduce.java:43), which has no missing parents
14/06/19 10:34:44 INFO DAGScheduler: Submitting 2 missing tasks from Stage 0 (MappedRDD[2] at map at SparkReduce.java:43)
14/06/19 10:34:44 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
14/06/19 10:34:44 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
14/06/19 10:34:44 INFO TaskSetManager: Serialized task 0.0:0 as 1993 bytes in 3 ms
14/06/19 10:34:44 INFO TaskSetManager: Starting task 0.0:1 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
14/06/19 10:34:44 INFO TaskSetManager: Serialized task 0.0:1 as 1993 bytes in 0 ms
14/06/19 10:34:44 INFO Executor: Running task ID 0
14/06/19 10:34:44 INFO Executor: Running task ID 1
14/06/19 10:34:44 INFO BlockManager: Found block broadcast_0 locally
14/06/19 10:34:44 INFO BlockManager: Found block broadcast_0 locally
14/06/19 10:34:44 INFO HadoopRDD: Input split: file:/Users/margusja/Documents/workspace/Spark_1.0.0/data.txt:0+192
14/06/19 10:34:44 INFO HadoopRDD: Input split: file:/Users/margusja/Documents/workspace/Spark_1.0.0/data.txt:192+193
14/06/19 10:34:44 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/06/19 10:34:44 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
14/06/19 10:34:44 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
14/06/19 10:34:44 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
14/06/19 10:34:44 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
14/06/19 10:34:44 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
line: Apache Spark is a fast and general-purpose cluster computing system. 69
line: It provides high-level APIs in Java, Scala and Python, 55
a: 69
b: 55
line: and an optimized engine that supports general execution graphs. 64
a: 124
b: 64
line: It also supports a rich set of higher-level tools including Shark (Hive on Spark), 83
a: 188
b: 83
line: Spark SQL for structured data, 31
line: MLlib for machine learning, 28
a: 31
b: 28
line: GraphX for graph processing, and Spark Streaming. 49
a: 59
b: 49
14/06/19 10:34:44 INFO Executor: Serialized size of result for 0 is 675
14/06/19 10:34:44 INFO Executor: Serialized size of result for 1 is 675
14/06/19 10:34:44 INFO Executor: Sending result for 1 directly to driver
14/06/19 10:34:44 INFO Executor: Sending result for 0 directly to driver
14/06/19 10:34:44 INFO Executor: Finished task ID 1
14/06/19 10:34:44 INFO Executor: Finished task ID 0
14/06/19 10:34:44 INFO TaskSetManager: Finished TID 0 in 88 ms on localhost (progress: 1/2)
14/06/19 10:34:44 INFO TaskSetManager: Finished TID 1 in 79 ms on localhost (progress: 2/2)
14/06/19 10:34:44 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
14/06/19 10:34:44 INFO DAGScheduler: Completed ResultTask(0, 0)
14/06/19 10:34:44 INFO DAGScheduler: Completed ResultTask(0, 1)
14/06/19 10:34:44 INFO DAGScheduler: Stage 0 (reduce at SparkReduce.java:50) finished in 0.110 s
a: 271
b: 108
14/06/19 10:34:45 INFO SparkContext: Job finished: reduce at SparkReduce.java:50, took 0.2684 s
Lenght: 379

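The a: and b: lines above are printed by the reduce function itself. Each task first reduces the line lengths of its own input split (69 + 55 + 64 + 83 = 271 and 31 + 28 + 49 = 108). After the stage has finished, the driver reduces the two partial results once more (271 + 108 = 379).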
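To make the two levels of reduction visible without a cluster, here is a plain-Java sketch (no Spark involved) that replays them using the line lengths copied from the log. The class and method names are hypothetical and the lengths are hard-coded instead of being read from data.txt. It runs sequentially, so unlike the log the output of the two partitions is not interleaved.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class TwoLevelReduce {

    // Associative reduce function that prints its arguments, like the job in the log
    static final BinaryOperator<Integer> SUM = (a, b) -> {
        System.out.println("a: " + a);
        System.out.println("b: " + b);
        return a + b;
    };

    // Left-to-right fold over one partition, as a single task would do it
    static int reducePartition(List<Integer> lengths) {
        return lengths.stream().reduce(SUM).orElse(0);
    }

    // Reduce every partition, then reduce the partial results (the driver step)
    static int reduceAll(List<List<Integer>> partitions) {
        List<Integer> partials = new ArrayList<>();
        for (List<Integer> partition : partitions) {
            partials.add(reducePartition(partition));
        }
        return reducePartition(partials);
    }

    public static void main(String[] args) {
        // Line lengths of the two input splits, taken from the log above
        List<List<Integer>> splits = Arrays.asList(
                Arrays.asList(69, 55, 64, 83),
                Arrays.asList(31, 28, 49));
        System.out.println("Length: " + reduceAll(splits));
    }
}
```

Running main prints the same a:/b: pairs as the log (one partition after the other) and ends with Length: 379. The final a: 271 / b: 108 pair comes from the driver-side step in reduceAll.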

Why there is Big Data

Even I produce a continuous stream of data every day with homemade temperature sensors at home.

Then let’s send the data out to the “real world”, that is, to the Internet.

And here is the output in Zabbix.

How to motivate yourself to discover new technology

Take apache-spark. I installed it, went through the examples and ran them. They worked. Now what?

One way is to go to LinkedIn, mark yourself as a Spark expert and forget the topic.

Another way is to invent an interesting problem and try to solve it. After that you may go to LinkedIn and mark yourself as an expert 😉

So here is my problem:

What do we have here? From left to right: apache-kafka receives a stream of strings like “a b c”, which are the coefficients of a quadratic equation $ax^2 + bx + c = 0$.

Apache-spark then solves the equation and saves the input coefficients together with the roots x1 and x2 to HBase.
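The per-message step can be sketched in plain Java, with no Spark or Kafka involved, as a function that turns a message like "1 4 3" into the output line. This is a hypothetical helper, not the author's code: the Spark job listed further down uses Java int arithmetic, while this sketch uses double arithmetic so that roots are not truncated. Formatting the roots to three decimals is an arbitrary choice, and "null null" is returned when there is no real root, matching the output format of the job.

```java
import java.util.Locale;

public class QuadraticLine {

    /**
     * Hypothetical helper: parses "a b c" and appends the real roots of
     * a*x^2 + b*x + c = 0, or "null null" when there are none.
     */
    static String solve(String message) {
        String[] parts = message.trim().split("\\s+");
        if (parts.length != 3) {
            return message + " null null";   // not three coefficients
        }
        double a = Double.parseDouble(parts[0]);
        double b = Double.parseDouble(parts[1]);
        double c = Double.parseDouble(parts[2]);

        double discriminant = b * b - 4 * a * c;
        if (a == 0 || discriminant < 0) {
            return message + " null null";   // not quadratic, or no real roots
        }
        double sqrtD = Math.sqrt(discriminant);
        double x1 = (-b + sqrtD) / (2 * a);
        double x2 = (-b - sqrtD) / (2 * a);
        return message + " " + format(x1) + " " + format(x2);
    }

    // Three decimals; Locale.ROOT keeps '.' as the decimal separator
    static String format(double v) {
        return String.format(Locale.ROOT, "%.3f", v);
    }

    public static void main(String[] args) {
        System.out.println(solve("1 4 3"));
        System.out.println(solve("2 -4 -10"));
        System.out.println(solve("1 0 1"));
    }
}
```

For the input 1 4 3 this yields 1 4 3 -1.000 -3.000. For 2 -4 -10 it yields 2 -4 -10 3.449 -1.449, where integer arithmetic would give 3 and -1.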

The big picture

As you can see, the day is now much more interesting 🙂

Now let’s jump into the technology.

First I create an HBase table with one column family:

hbase(main):027:0> create 'rootvorrand', 'info'

hbase(main):043:0> describe 'rootvorrand'
DESCRIPTION                                                                                      ENABLED
'rootvorrand', {NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLICATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}   true

1 row(s) in 0.0330 seconds

I can see the new table in the HBase web UI too.

Now I create an apache-kafka topic with 3 replicas and 1 partition:

margusja@IRack:~/kafka_2.9.1-0.8.1.1$ bin/kafka-topics.sh --create --topic rootvorrand --partitions 1 --replication-factor 3 --zookeeper vm24:2181

margusja@IRack:~/kafka_2.9.1-0.8.1.1$ bin/kafka-topics.sh --describe --topic rootvorrand --zookeeper vm24.dbweb.ee:2181
Topic:rootvorrand PartitionCount:1 ReplicationFactor:3 Configs:
Topic: rootvorrand Partition: 0 Leader: 3 Replicas: 3,2,1 Isr: 3,2,1

Add some input data to the topic.

Now let’s set up the development environment in Eclipse. I need some external jars. I am using the latest apache-spark, 1.0.0, released about a week ago.

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.regex.Pattern;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

import scala.Tuple2;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class KafkaSparkHbase {

    private static final Pattern SPACE = Pattern.compile(" ");
    // HBase table handle; only used on the driver (inside foreachRDD)
    private static HTable table;

    public static void main(String[] args) {

        String topic = "rootvorrand";
        int numThreads = 1;
        String zkQuorum = "vm38:2181,vm37:2181,vm24:2181";
        String kafkaConsumerGroup = "sparkScript";
        String master = "spark://dlvm2:7077";

        // HBase config
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.defaults.for.version", "0.96.0.2.0.6.0-76-hadoop2");
        conf.set("hbase.defaults.for.version.skip", "true");
        conf.set("hbase.zookeeper.quorum", "vm24,vm38,vm37");
        conf.set("hbase.zookeeper.property.clientPort", "2181");
        conf.set("hbase.rootdir", "hdfs://vm38:8020/user/hbase/data");

        try {
            table = new HTable(conf, "rootvorrand");
        } catch (IOException e) {
            // Without the table there is nowhere to store results, so give up
            e.printStackTrace();
            return;
        }

        //SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkHbase").setMaster(master).setJars(jars);

        // 2-second batches; ship the jar containing this class to the workers
        JavaStreamingContext jssc = new JavaStreamingContext(master, "KafkaWordCount",
                new Duration(2000), System.getenv("SPARK_HOME"),
                JavaStreamingContext.jarOfClass(KafkaSparkHbase.class));

        //JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        // Consume the topic with numThreads receiver threads
        Map<String, Integer> topicMap = new HashMap<String, Integer>();
        topicMap.put(topic, numThreads);
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, zkQuorum, kafkaConsumerGroup, topicMap);

        // Keep only the message value, e.g. "1 4 3"
        JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> tuple2) {
                return tuple2._2();
            }
        });

        // Solve a*x^2 + b*x + c = 0 and append the roots to the input line.
        // Integer arithmetic: the square root and the division by 2a are truncated.
        JavaDStream<String> result = lines.map(new Function<String, String>() {
            @Override
            public String call(String x) throws Exception {
                //System.out.println("Input is: " + x);
                Integer x1 = null;
                Integer x2 = null;

                String[] splitResult = SPACE.split(x.trim());
                //System.out.println("Split: " + splitResult.length);
                if (splitResult.length == 3) {
                    int a = Integer.parseInt(splitResult[0]);
                    int b = Integer.parseInt(splitResult[1]);
                    int c = Integer.parseInt(splitResult[2]);

                    int y = (b * b) - (4 * a * c);   // discriminant
                    // Real roots need y >= 0 (y == 0 is a double root); a == 0 is not quadratic
                    if (a != 0 && y >= 0) {
                        int d = (int) Math.sqrt(y);
                        //System.out.println("discriminant: " + d);
                        x1 = (-b + d) / (2 * a);
                        x2 = (-b - d) / (2 * a);
                    }
                }

                return x + " " + x1 + " " + x2;
            }
        });

        // For every batch: write each result line to HBase (runs on the driver)
        result.foreachRDD(new Function<JavaRDD<String>, Void>() {
            @Override
            public Void call(final JavaRDD<String> x) throws Exception {
                List<String> arr = x.collect();
                System.out.println(arr.size());
                for (String entry : arr) {
                    //System.out.println(entry);
                    // Random UUID as row key; the whole line goes into info:record
                    Put p = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
                    p.add(Bytes.toBytes("info"), Bytes.toBytes("record"), Bytes.toBytes(entry));
                    table.put(p);
                }
                table.flushCommits();
                return null;
            }
        });

        //result.print();
        jssc.start();
        jssc.awaitTermination();
    }
}

Package it and run it:

[root@dlvm2 ~]java -cp kafkasparkhbase-0.1.jar KafkaSparkHbase

14/06/17 12:14:48 INFO JobScheduler: Finished job streaming job 1402996488000 ms.0 from job set of time 1402996488000 ms
14/06/17 12:14:48 INFO JobScheduler: Total delay: 0.052 s for time 1402996488000 ms (execution: 0.047 s)
14/06/17 12:14:48 INFO MappedRDD: Removing RDD 134 from persistence list
14/06/17 12:14:48 INFO BlockManager: Removing RDD 134
14/06/17 12:14:48 INFO MappedRDD: Removing RDD 133 from persistence list
14/06/17 12:14:48 INFO BlockManager: Removing RDD 133
14/06/17 12:14:48 INFO BlockRDD: Removing RDD 132 from persistence list
14/06/17 12:14:48 INFO BlockManager: Removing RDD 132
14/06/17 12:14:48 INFO KafkaInputDStream: Removing blocks of RDD BlockRDD[132] at BlockRDD at ReceiverInputDStream.scala:69 of time 1402996488000 ms
14/06/17 12:14:48 INFO BlockManagerInfo: Added input-0-1402996488400 in memory on dlvm2:33264 (size: 78.0 B, free: 294.9 MB)
14/06/17 12:14:48 INFO BlockManagerInfo: Added input-0-1402996488400 in memory on dlvm1:41044 (size: 78.0 B, free: 294.9 MB)
14/06/17 12:14:50 INFO ReceiverTracker: Stream 0 received 1 blocks
14/06/17 12:14:50 INFO JobScheduler: Starting job streaming job 1402996490000 ms.0 from job set of time 1402996490000 ms
14/06/17 12:14:50 INFO JobScheduler: Added jobs for time 1402996490000 ms
14/06/17 12:14:50 INFO SparkContext: Starting job: take at DStream.scala:593
14/06/17 12:14:50 INFO DAGScheduler: Got job 5 (take at DStream.scala:593) with 1 output partitions (allowLocal=true)
14/06/17 12:14:50 INFO DAGScheduler: Final stage: Stage 6(take at DStream.scala:593)
14/06/17 12:14:50 INFO DAGScheduler: Parents of final stage: List()
14/06/17 12:14:50 INFO DAGScheduler: Missing parents: List()
14/06/17 12:14:50 INFO DAGScheduler: Computing the requested partition locally
14/06/17 12:14:50 INFO BlockManager: Found block input-0-1402996488400 remotely
Input is: 1 4 3
Split: 3
discriminant: 2
x1: -1 x2: -3
14/06/17 12:14:50 INFO SparkContext: Job finished: take at DStream.scala:593, took 0.011823803 s
-------------------------------------------
Time: 1402996490000 ms
-------------------------------------------
1 4 3 -1 -3

As we can see, the input from Kafka is 1 4 3 and the Spark output line is 1 4 3 -1 -3, where -1 and -3 are the roots.
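As a quick sanity check of that output, the coefficients 1 4 3 describe the equation below, which factors directly:

$$x^2 + 4x + 3 = (x + 1)(x + 3) = 0 \quad\Rightarrow\quad x_1 = -1,\ x_2 = -3$$

Via the discriminant, as the job computes it: $b^2 - 4ac = 16 - 12 = 4$, so $x_{1,2} = \frac{-4 \pm 2}{2}$, which again gives $-1$ and $-3$.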

We can also see that there is no lag in the Kafka queue: the Spark worker has consumed all the data from it.

Let’s empty our HBase table:

Put some input values into the Kafka queue:

-2 -3 10
2 -4 -10

And scan the HBase table rootvorrand.

The table contains our input values together with their roots.
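For comparison, here are the exact roots of the two sample inputs, worked out with the quadratic formula $x_{1,2} = \frac{-b \pm \sqrt{b^2 - 4ac}}{2a}$ (values rounded to three decimals):

$$-2x^2 - 3x + 10 = 0:\quad b^2 - 4ac = 9 + 80 = 89,\quad x_{1,2} = \frac{3 \pm \sqrt{89}}{-4} \approx -3.108,\ 1.608$$

$$2x^2 - 4x - 10 = 0:\quad b^2 - 4ac = 16 + 80 = 96,\quad x_{1,2} = \frac{4 \pm \sqrt{96}}{4} \approx 3.449,\ -1.449$$

The job as listed works with Java int values: the square root is truncated to 9 in both cases, and each division by 2a is truncated too. It would therefore store the approximations -3 and 1 for the first input and 3 and -1 for the second, rather than the exact roots.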