The CAP theorem

Before we get into the role of NOSQL, we must first understand the CAP theorem. In the theory of computer science, the CAP theorem or Brewer’s theorem talks about distributed consistency. It states that it is impossible to achieve all of the following in a distributed system:

  • Consistency: Every client sees the most recently updated data state
  • Availability: The distributed system functions as expected, even if there are node failures
  • Partition tolerance: Intermediate network failure among nodes does not impact system functioning

    Although all three cannot be achieved at the same time, any two can be achieved by the systems. That means in order to get high availability and partition tolerance, you need to sacrifice consistency. There are three types of systems:

  • CA: Data is consistent between all nodes, and you can read/write from any node, while you cannot afford to let your network go down. (For example: relational databases, columnar relational stores)
  • CP: Data is consistent and maintains tolerance for partitioning and preventing data going out of sync. (For example: Berkeley DB (key-value), MongoDB (document oriented), and HBase (columnar))
  • AP: Nodes are online always, but they may not get you the latest data; however, they sync whenever the lines are up. (For example: Dynamo (key-value), CouchDB (document oriented), and Cassandra (columnar))

    High availability can be achieved through data replication; consistency is achieved by updating multiple nodes for changes in data. Relational databases are designed to achieve CA capabilities. NOSQL databases can either achieve CP or AP.

Source: Scaling Big Data with Hadoop and Solr, H. Karambelkar (2013)
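
To make the CP/AP difference in the quote above concrete, here is a minimal toy sketch (my own illustration, not from the book). It assumes just two in-memory replicas and a flag that simulates a network partition; the class and method names are made up for this example and do not belong to any real database.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model: two replicas of one key-value store and a switchable network partition.
class CapSketch {
    enum Mode { CP, AP }

    static class Replica {
        final Map<String, String> data = new HashMap<>();
    }

    final Replica a = new Replica();
    final Replica b = new Replica();
    final Mode mode;
    boolean partitioned = false;

    CapSketch(Mode mode) { this.mode = mode; }

    // Write through replica A. Returns false if the write is rejected.
    boolean write(String key, String value) {
        if (partitioned) {
            if (mode == Mode.CP) {
                return false;          // CP: refuse rather than let replicas diverge
            }
            a.data.put(key, value);    // AP: accept locally, replica B is now stale
            return true;
        }
        a.data.put(key, value);
        b.data.put(key, value);        // healthy network: update both replicas
        return true;
    }

    // Read from replica B; in AP mode this may be stale during a partition.
    String readFromB(String key) { return b.data.get(key); }

    // The partition heals and B catches up with A.
    void heal() {
        partitioned = false;
        b.data.putAll(a.data);
    }

    public static void main(String[] args) {
        for (Mode m : Mode.values()) {
            CapSketch s = new CapSketch(m);
            s.write("k", "v1");
            s.partitioned = true;
            boolean accepted = s.write("k", "v2");
            System.out.println(m + ": write accepted=" + accepted
                    + ", B reads " + s.readFromB("k"));
            s.heal();
            System.out.println(m + ": after heal B reads " + s.readFromB("k"));
        }
    }
}
```

In CP mode the write during the partition is refused, so both replicas keep agreeing. In AP mode the write is accepted, replica B serves the old value until the partition heals, and then it catches up.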

Hive UDF

Sometimes (often) we need custom functions to work with records. Hive ships with most of the functions you need, but if you find yourself post-processing records in your own programming language after fetching them, it is time to consider a Hive UDF.

As an example, say we need to put the string "Hello Margusja" in front of a field. Yes, there is concat among the Hive string functions, but this is an example of how to build and deploy a UDF. So let's pretend there is no other way to put two strings together and build our own UDF.

The Java code is very simple: you just have to extend org.apache.hadoop.hive.ql.exec.UDF:

package com.margusja.example;

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public final class DemoUDF extends UDF {

    // Prefix put in front of every input value.
    private final String hello = "Hello Margusja";

    // Hive calls evaluate() for each row; a NULL input yields NULL.
    public Text evaluate(final Text s) {
        if (s == null) { return null; }
        return new Text(hello + " " + s);
    }
}
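
If you want to try the string handling before packaging the jar, a plain-Java stand-in is enough. This is only a sketch: DemoUdfLogic is a hypothetical helper of mine, and it uses String instead of Hadoop's Text so that it runs without Hive or Hadoop on the classpath.

```java
// Plain-Java stand-in for DemoUDF.evaluate(), with String in place of Hadoop's Text,
// so the logic can be checked without Hive on the classpath.
class DemoUdfLogic {
    static final String HELLO = "Hello Margusja";

    static String evaluate(String s) {
        if (s == null) {
            return null;               // mirror the UDF: NULL in, NULL out
        }
        return HELLO + " " + s;
    }

    public static void main(String[] args) {
        System.out.println(evaluate("input"));
        System.out.println(evaluate(null));
    }
}
```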

 

Build it and package it into a jar, for example HiveDemoUDF.jar.

Now add it to the classpath from the Hive command line:

hive> add jar /tmp/HiveDemoUDF.jar;

Added /tmp/HiveDemoUDF.jar to class path
Added resource: /tmp/HiveDemoUDF.jar

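Register the class under a function name (my_lower here) and you can use your brand new UDF:

hive> create temporary function my_lower as 'com.margusja.example.DemoUDF';

hive> select my_lower("input");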
Query ID = margusja_20141106153636_564cd6c4-01f1-4daa-841c-4388255135a8
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there’s no reduce operator
Starting Job = job_1414681778119_0094, Tracking URL = http://nn1.server.int:8088/proxy/application_1414681778119_0094/
Kill Command = /usr/lib/hadoop/bin/hadoop job -kill job_1414681778119_0094
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 0
2014-11-06 15:36:21,935 Stage-1 map = 0%, reduce = 0%
2014-11-06 15:36:31,206 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.18 sec
MapReduce Total cumulative CPU time: 1 seconds 180 msec
Ended Job = job_1414681778119_0094
MapReduce Jobs Launched:
Job 0: Map: 1 Cumulative CPU: 1.18 sec HDFS Read: 281 HDFS Write: 21 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 180 msec
OK
Hello Margusja input
Time taken: 21.417 seconds, Fetched: 1 row(s)

Kafka Benchmark – Could not find or load main class org.apache.kafka.clients.tools.ProducerPerformance

OS: Centos 6.5

Kafka: the kafka-0.8.1.2.1.4.0-632.el6.noarch package, installed using yum.

When I tried to use the performance tools:

[server1 kafka]# bin/kafka-run-class.sh org.apache.kafka.clients.tools.ProducerPerformance

Error: Could not find or load main class org.apache.kafka.clients.tools.ProducerPerformance

I tried several workarounds, and the following is what worked for me:

  yum install git

  cd /usr/local/

  git clone https://git-wip-us.apache.org/repos/asf/kafka.git

  cd kafka

  git checkout -b 0.8 remotes/origin/0.8

  ./sbt update

  ./sbt package

  ./sbt assembly-package-dependency

After that ./bin/kafka-producer-perf-test.sh works:

[root@server1 kafka]# ./bin/kafka-producer-perf-test.sh --topic kafkatopic --broker-list server1:9092 --messages 1000000 --show-detailed-stats --message-size 1024

start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, total.data.sent.in.nMsg, nMsg.sec

2014-10-07 09:45:13:825, 2014-10-07 09:45:23:781, 0, 1024, 200, 976.56, 98.0878, 1000000, 100441.9446
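
The MB.sec and nMsg.sec columns can be recomputed from the other columns of that line. The sketch below is my own check, not part of the Kafka tools. It assumes the timestamps have the format shown above (milliseconds after the last colon) and that one MB means 1024 * 1024 bytes, which matches the reported 976.56 MB for 1000000 messages of 1024 bytes.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

// Recomputes throughput figures from one line of producer perf-test output.
class PerfLine {
    // Timestamp layout as printed by the tool above: seconds, then ':' and milliseconds.
    static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss:SSS");

    static double elapsedSeconds(String start, String end) {
        Duration d = Duration.between(LocalDateTime.parse(start, TS),
                                      LocalDateTime.parse(end, TS));
        return d.toMillis() / 1000.0;
    }

    // Assumption: 1 MB = 1024 * 1024 bytes.
    static double mbPerSec(long messages, int messageSizeBytes, double seconds) {
        double totalMb = messages * (double) messageSizeBytes / (1024 * 1024);
        return totalMb / seconds;
    }

    static double msgPerSec(long messages, double seconds) {
        return messages / seconds;
    }

    public static void main(String[] args) {
        double secs = elapsedSeconds("2014-10-07 09:45:13:825", "2014-10-07 09:45:23:781");
        System.out.printf("elapsed: %.3f s%n", secs);
        System.out.printf("MB/sec: %.4f%n", mbPerSec(1_000_000L, 1024, secs));
        System.out.printf("msg/sec: %.4f%n", msgPerSec(1_000_000L, secs));
    }
}
```

The run took 9.956 seconds, which gives roughly 98 MB and 100 thousand messages per second, in line with the numbers reported by the tool.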

Hadoop namenode HA and hive metastore location urls

I recently switched the Hadoop namenode to namenode HA. Most steps went fine, but Hive was unhappy and kept trying to locate files via the old URL. I found the tutorial http://docs.hortonworks.com/HDPDocuments/HDP2/HDP-2.1.5/bk_system-admin-guide/content/sysadminguides_ha_chap3.html and used those commands, but only some tables were updated.

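Then I did it manually and that helped. If your Hive metadata is in MySQL, you can connect to your Hive database and rewrite the stored locations so that the old namenode URL becomes the HA nameservice URL (hdfs://namenode:8020 and hdfs://mycluster here; use your own values):

UPDATE SDS SET LOCATION=REPLACE(LOCATION, 'hdfs://namenode:8020', 'hdfs://mycluster');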

After that I can switch the active namenode around and Hive can still locate files via the metastore.
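
One thing to keep in mind is that SQL REPLACE substitutes the string wherever it occurs in the value, not only at the start. If you prefer to prepare the new locations outside the database, a prefix-only rewrite could look like the sketch below. LocationRewrite is a hypothetical helper, and the example assumes hdfs://namenode:8020 is the old single-namenode URL and hdfs://mycluster is the HA nameservice.

```java
// Rewrites only a leading URL prefix of a metastore location string.
class LocationRewrite {
    static String rewrite(String location, String oldPrefix, String newPrefix) {
        if (location != null && location.startsWith(oldPrefix)) {
            return newPrefix + location.substring(oldPrefix.length());
        }
        return location;   // anything that does not start with the old prefix stays as it is
    }

    public static void main(String[] args) {
        // Assumed example values; substitute the URLs of your own cluster.
        System.out.println(rewrite("hdfs://namenode:8020/apps/hive/warehouse/log1",
                "hdfs://namenode:8020", "hdfs://mycluster"));
    }
}
```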

Maybe this is helpful for someone 🙂

Hadoop HDFS is CORRUPT

There are probably many ways to fix the problem.

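In my case those commands helped:

# su - hdfs

Find corrupt and missing files on HDFS (the egrep drops the progress lines made only of dots, and grep -v eplica drops the lines that mention replicas or replication):

# hadoop fsck / | egrep -v '^\.+$' | grep -v eplica

Let's delete them. Note that fsck -delete removes the corrupted files themselves, so do this only for files you can afford to lose or can restore from elsewhere. A block that has merely lost one replica does not need deleting: HDFS re-replicates it from the copies on other machines.

# hadoop fsck / -delete

And now my HDFS is healthy.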

Apache Hive: how to join log files and use SQL queries over joined data

Let's create two very simple log files. Into log1.txt let's put, for example, a log of user problems, and into log2.txt a log of solutions.

log1.txt:

user1 | 2014-09-23 | error message 1
user2 | 2014-09-23 | error message 2
user3 | 2014-09-23 | error message 3
user4 | 2014-09-23 | error message 1
user5 | 2014-09-23 | error message 2
user6 | 2014-09-23 | error message 12
user7 | 2014-09-23 | error message 11
user1 | 2014-09-24 | error message 1
user2 | 2014-09-24 | error message 2
user3 | 2014-09-24 | error message 3
user4 | 2014-09-24 | error message 10
user1 | 2014-09-24 | error message 17
user2 | 2014-09-24 | error message 13
user1 | 2014-09-24 | error message 1

log2.txt:
user1 | support2 | solution message 1
user2 | support1 | solution message 2
user3 | support2 | solution message 3
user1 | support1 | solution message 4
user2 | support2 | solution message 5
user4 | support1 | solution message 6
user2 | support2 | solution message 7
user5 | support1 | solution message 8

Create two tables for datasets above:

hive> create table log1 (user STRING, date STRING, error STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE;

OK

Time taken: 5.968 seconds

hive> LOAD DATA INPATH '/user/margusja/hiveinput/log1.txt' OVERWRITE INTO TABLE log1;

Loading data to table default.log1

rmr: DEPRECATED: Please use ‘rm -r’ instead.

Moved: ‘hdfs://bigdata1.host.int:8020/apps/hive/warehouse/log1’ to trash at: hdfs://bigdata1.host.int:8020/user/margusja/.Trash/Current

Table default.log1 stats: [numFiles=1, numRows=0, totalSize=523, rawDataSize=0]

OK

Time taken: 4.687 seconds

hive> create table log2 (user STRING, support STRING, solution STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' STORED AS TEXTFILE;

OK

Time taken: 0.997 seconds

hive> LOAD DATA INPATH '/user/margusja/hiveinput/log2.txt' OVERWRITE INTO TABLE log2;

Loading data to table default.log2

rmr: DEPRECATED: Please use ‘rm -r’ instead.

Moved: ‘hdfs://bigdata1.host.int:8020/apps/hive/warehouse/log2’ to trash at: hdfs://bigdata1.host.int:8020/user/margusja/.Trash/Current

Table default.log2 stats: [numFiles=1, numRows=0, totalSize=304, rawDataSize=0]

OK

Time taken: 0.72 seconds

hive>

Now let's run SQL over the two data files placed in HDFS storage using Hive:

hive> select log1.user, log1.date, log1.error, log2.support, log2.solution from log2 join log1 on (log1.user = log2.user);

And the result. We see how two separate log files are joined together, and we can see, for example, that user2 had error message 2 on 2014-09-23 and support2 offered solution message 7.

user1  2014-09-23  error message 1 support2  solution message 1

user1  2014-09-23  error message 1 support1  solution message 4

user2  2014-09-23  error message 2 support1  solution message 2

user2  2014-09-23  error message 2 support2  solution message 5

user2  2014-09-23  error message 2 support2  solution message 7

user3  2014-09-23  error message 3 support2  solution message 3

user4  2014-09-23  error message 1 support1  solution message 6

user5  2014-09-23  error message 2 support1  solution message 8

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

user2  2014-09-24  error message 2 support1  solution message 2

user2  2014-09-24  error message 2 support2  solution message 5

user2  2014-09-24  error message 2 support2  solution message 7

user3  2014-09-24  error message 3 support2  solution message 3

user4  2014-09-24  error message 10 support1  solution message 6

user1  2014-09-24  error message 17 support2  solution message 1

user1  2014-09-24  error message 17 support1  solution message 4

user2  2014-09-24  error message 13 support1  solution message 2

user2  2014-09-24  error message 13 support2  solution message 5

user2  2014-09-24  error message 13 support2  solution message 7

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

Time taken: 34.561 seconds, Fetched: 22 row(s)
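
To see why the join returns 22 rows, here is a minimal sketch of the same inner join on the user field in plain Java. It is only an illustration of the join semantics, not how Hive executes the query: it uses a simple nested loop over the two sample files above, trims the fields, and its row order differs from the Hive output.

```java
import java.util.ArrayList;
import java.util.List;

// Nested-loop inner join of the two sample logs on the first field (user).
class LogJoin {
    static final String[] LOG1 = {
        "user1 | 2014-09-23 | error message 1",
        "user2 | 2014-09-23 | error message 2",
        "user3 | 2014-09-23 | error message 3",
        "user4 | 2014-09-23 | error message 1",
        "user5 | 2014-09-23 | error message 2",
        "user6 | 2014-09-23 | error message 12",
        "user7 | 2014-09-23 | error message 11",
        "user1 | 2014-09-24 | error message 1",
        "user2 | 2014-09-24 | error message 2",
        "user3 | 2014-09-24 | error message 3",
        "user4 | 2014-09-24 | error message 10",
        "user1 | 2014-09-24 | error message 17",
        "user2 | 2014-09-24 | error message 13",
        "user1 | 2014-09-24 | error message 1",
    };

    static final String[] LOG2 = {
        "user1 | support2 | solution message 1",
        "user2 | support1 | solution message 2",
        "user3 | support2 | solution message 3",
        "user1 | support1 | solution message 4",
        "user2 | support2 | solution message 5",
        "user4 | support1 | solution message 6",
        "user2 | support2 | solution message 7",
        "user5 | support1 | solution message 8",
    };

    // Split a line on '|' and trim the spaces around each field.
    static String[] fields(String line) {
        String[] f = line.split("\\|");
        for (int i = 0; i < f.length; i++) {
            f[i] = f[i].trim();
        }
        return f;
    }

    // Every log1 row is paired with every log2 row that has the same user.
    static List<String> join(String[] left, String[] right) {
        List<String> out = new ArrayList<>();
        for (String l : left) {
            String[] a = fields(l);
            for (String r : right) {
                String[] b = fields(r);
                if (a[0].equals(b[0])) {
                    out.add(String.join("\t", a[0], a[1], a[2], b[1], b[2]));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> rows = join(LOG1, LOG2);
        rows.forEach(System.out::println);
        System.out.println("rows: " + rows.size());
    }
}
```

Each user contributes (rows in log1) x (rows in log2) result rows: user1 4 x 2, user2 3 x 3, user3 2 x 1, user4 2 x 1 and user5 1 x 1, which adds up to 22. user6 and user7 have no rows in log2, so an inner join drops them.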

More cool things:

We can select only a specified user:

hive> select log1.user, log1.date, log1.error, log2.support, log2.solution from log2 join log1 on (log1.user = log2.user) where log1.user like '%user1%';

user1  2014-09-23  error message 1 support2  solution message 1

user1  2014-09-23  error message 1 support1  solution message 4

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

user1  2014-09-24  error message 17 support2  solution message 1

user1  2014-09-24  error message 17 support1  solution message 4

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

Time taken: 31.932 seconds, Fetched: 8 row(s)

We can query by date:

hive> select log1.user, log1.date, log1.error, log2.support, log2.solution from log2 join log1 on (log1.user = log2.user) where log1.date like '%2014-09-23%';

user1  2014-09-23  error message 1 support2  solution message 1

user1  2014-09-23  error message 1 support1  solution message 4

user2  2014-09-23  error message 2 support1  solution message 2

user2  2014-09-23  error message 2 support2  solution message 5

user2  2014-09-23  error message 2 support2  solution message 7

user3  2014-09-23  error message 3 support2  solution message 3

user4  2014-09-23  error message 1 support1  solution message 6

user5  2014-09-23  error message 2 support1  solution message 8

Now let's send the output of our awesome join statement to the next table, log3, where we are going to hold the joined data:

hive> create table log3 (user STRING, date STRING, error STRING, support STRING, solution STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE;

hive> insert into table log3 select log1.user, log1.date, log1.error, log2.support, log2.solution from log2 join log1 on (log1.user = log2.user);

And now we can use very simple SQL to get the data:

hive> select * from log3;

OK

user1  2014-09-23  error message 1 support2  solution message 1

user1  2014-09-23  error message 1 support1  solution message 4

user2  2014-09-23  error message 2 support1  solution message 2

user2  2014-09-23  error message 2 support2  solution message 5

user2  2014-09-23  error message 2 support2  solution message 7

user3  2014-09-23  error message 3 support2  solution message 3

user4  2014-09-23  error message 1 support1  solution message 6

user5  2014-09-23  error message 2 support1  solution message 8

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

user2  2014-09-24  error message 2 support1  solution message 2

user2  2014-09-24  error message 2 support2  solution message 5

user2  2014-09-24  error message 2 support2  solution message 7

user3  2014-09-24  error message 3 support2  solution message 3

user4  2014-09-24  error message 10 support1  solution message 6

user1  2014-09-24  error message 17 support2  solution message 1

user1  2014-09-24  error message 17 support1  solution message 4

user2  2014-09-24  error message 13 support1  solution message 2

user2  2014-09-24  error message 13 support2  solution message 5

user2  2014-09-24  error message 13 support2  solution message 7

user1  2014-09-24  error message 1 support2  solution message 1

user1  2014-09-24  error message 1 support1  solution message 4

Time taken: 0.075 seconds, Fetched: 22 row(s)

hive>

WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_xxx is : 1

2014-09-03 14:34:46,574 INFO SecurityLogger.org.apache.hadoop.ipc.Server: Auth successful for appattempt_1409734969311_0027_000001 (auth:SIMPLE)

2014-09-03 14:34:46,773 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Start request for container_1409734969311_0027_01_000001 by user margusja

2014-09-03 14:34:46,810 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl: Creating a new application reference for app application_1409734969311_0027

2014-09-03 14:34:46,816 INFO org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger: USER=margusja IP=90.190.106.48 OPERATION=Start Container Request TARGET=ContainerManageImpl RESULT=SUCCESS APPID=application_1409734969311_0027 CONTAINERID=container_1409734969311_0027_01_000001

2014-09-03 14:34:46,817 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application: Application application_1409734969311_0027 transitioned from NEW to INITING

2014-09-03 14:34:46,818 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application: Adding container_1409734969311_0027_01_000001 to application application_1409734969311_0027

2014-09-03 14:34:46,828 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application: Application application_1409734969311_0027 transitioned from INITING to RUNNING

2014-09-03 14:34:46,838 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1409734969311_0027_01_000001 transitioned from NEW to LOCALIZING

2014-09-03 14:34:46,838 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices: Got event CONTAINER_INIT for appId application_1409734969311_0027

2014-09-03 14:34:46,895 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource: Resource hdfs://h14.dbweb.ee:8020/user/margusja/.staging/job_1409734969311_0027/job.jar transitioned from INIT to DOWNLOADING

2014-09-03 14:34:46,895 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource: Resource hdfs://h14.dbweb.ee:8020/user/margusja/.staging/job_1409734969311_0027/job.splitmetainfo transitioned from INIT to DOWNLOADING

2014-09-03 14:34:46,895 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource: Resource hdfs://h14.dbweb.ee:8020/user/margusja/.staging/job_1409734969311_0027/job.split transitioned from INIT to DOWNLOADING

2014-09-03 14:34:46,895 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource: Resource hdfs://h14.dbweb.ee:8020/user/margusja/.staging/job_1409734969311_0027/job.xml transitioned from INIT to DOWNLOADING

2014-09-03 14:34:46,895 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService: Created localizer for container_1409734969311_0027_01_000001

2014-09-03 14:34:47,105 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ResourceLocalizationService: Writing credentials to the nmPrivate file /tmp/hadoop-yarn/nm-local-dir/nmPrivate/container_1409734969311_0027_01_000001.tokens. Credentials list: 

2014-09-03 14:34:47,109 INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Initializing user margusja

2014-09-03 14:34:47,172 INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Copying from /tmp/hadoop-yarn/nm-local-dir/nmPrivate/container_1409734969311_0027_01_000001.tokens to /tmp/hadoop-yarn/nm-local-dir/usercache/margusja/appcache/application_1409734969311_0027/container_1409734969311_0027_01_000001.tokens

2014-09-03 14:34:47,172 INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: CWD set to /tmp/hadoop-yarn/nm-local-dir/usercache/margusja/appcache/application_1409734969311_0027 = file:/tmp/hadoop-yarn/nm-local-dir/usercache/margusja/appcache/application_1409734969311_0027

2014-09-03 14:34:48,150 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource: Resource hdfs://h14.dbweb.ee:8020/user/margusja/.staging/job_1409734969311_0027/job.jar(->file:/tmp/hadoop-yarn/nm-local-dir/usercache/margusja/appcache/application_1409734969311_0027/filecache/10/job.jar) transitioned from DOWNLOADING to LOCALIZED

2014-09-03 14:34:48,180 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource: Resource hdfs://h14.dbweb.ee:8020/user/margusja/.staging/job_1409734969311_0027/job.splitmetainfo(->file:/tmp/hadoop-yarn/nm-local-dir/usercache/margusja/appcache/application_1409734969311_0027/filecache/11/job.splitmetainfo) transitioned from DOWNLOADING to LOCALIZED

2014-09-03 14:34:48,211 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource: Resource hdfs://h14.dbweb.ee:8020/user/margusja/.staging/job_1409734969311_0027/job.split(->file:/tmp/hadoop-yarn/nm-local-dir/usercache/margusja/appcache/application_1409734969311_0027/filecache/12/job.split) transitioned from DOWNLOADING to LOCALIZED

2014-09-03 14:34:48,249 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalizedResource: Resource hdfs://h14.dbweb.ee:8020/user/margusja/.staging/job_1409734969311_0027/job.xml(->file:/tmp/hadoop-yarn/nm-local-dir/usercache/margusja/appcache/application_1409734969311_0027/filecache/13/job.xml) transitioned from DOWNLOADING to LOCALIZED

2014-09-03 14:34:48,251 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1409734969311_0027_01_000001 transitioned from LOCALIZING to LOCALIZED

2014-09-03 14:34:48,300 INFO org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container: Container container_1409734969311_0027_01_000001 transitioned from LOCALIZED to RUNNING

2014-09-03 14:34:48,310 INFO org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: launchContainer: [nice, -n, 0, bash, /tmp/hadoop-yarn/nm-local-dir/usercache/margusja/appcache/application_1409734969311_0027/container_1409734969311_0027_01_000001/default_container_executor.sh]

2014-09-03 14:34:48,557 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exit code from container container_1409734969311_0027_01_000001 is : 1

2014-09-03 14:34:48,559 WARN org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor: Exception from container-launch with container ID: container_1409734969311_0027_01_000001 and exit code: 1

org.apache.hadoop.util.Shell$ExitCodeException: 

at org.apache.hadoop.util.Shell.runCommand(Shell.java:505)

at org.apache.hadoop.util.Shell.run(Shell.java:418)

at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:650)

at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:195)

at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:300)

at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:81)

at java.util.concurrent.FutureTask.run(FutureTask.java:262)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)

at java.lang.Thread.run(Thread.java:745)

The problem:

  • org.apache.hadoop.mapreduce.v2.app.MRAppMaster was missing!

Solution: you need to install these packages (yum output):

(1/2): hadoop-client | 16 kB 00:00

(2/2): hadoop-mapreduce
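
A quick way to check whether a class such as the missing MRAppMaster is visible on a given classpath is a tiny Java program. This is just a sketch of mine, and ClasspathCheck is a hypothetical helper name. It only tells you about the classpath you start it with, which is not necessarily the one YARN builds for the container.

```java
// Reports whether a class can be found on the classpath this program was started with.
class ClasspathCheck {
    static boolean isOnClasspath(String className) {
        try {
            // 'false' means: locate the class but do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "org.apache.hadoop.mapreduce.v2.app.MRAppMaster";
        System.out.println(name + (isOnClasspath(name) ? " found" : " NOT found"));
    }
}
```

For example, compile it and run it with the output of the hadoop classpath command added to -cp; if it prints NOT found, the MapReduce jars are probably not installed on that node.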