Monthly Archives: October 2014

Build_Jdbc_connection_to_Hive

Before you start, Hive must be installed correctly. That means not only that you can open the Hive shell, but also that Hive can talk to the MySQL database you configured for it.
1. Jars needed. It is easiest to import every .jar file in /Hive/lib. In addition, you need hadoop-common-2.3.0.jar and slf4j-api-1.6.6.jar. The full list:
activation-1.1.jar
ant-1.9.1.jar
ant-launcher-1.9.1.jar
antlr-2.7.7.jar
antlr-runtime-3.4.jar
asm-commons-3.1.jar
asm-tree-3.1.jar
avro-1.7.5.jar
bonecp-0.8.0.RELEASE.jar
commons-cli-1.2.jar
commons-codec-1.4.jar
commons-collections-3.1.jar
commons-compress-1.4.1.jar
commons-httpclient-3.0.1.jar
commons-io-2.4.jar
commons-lang-2.4.jar
commons-lang3-3.1.jar
commons-logging-1.1.3.jar
datanucleus-api-jdo-3.2.6.jar
datanucleus-core-3.2.10.jar
datanucleus-rdbms-3.2.9.jar
derby-10.10.1.1.jar
geronimo-annotation_1.0_spec-1.1.1.jar
geronimo-jaspic_1.0_spec-1.0.jar
geronimo-jta_1.1_spec-1.1.1.jar
groovy-all-2.1.6.jar
guava-11.0.2.jar
hadoop-common-2.3.0.jar
hamcrest-core-1.1.jar
hive-ant-0.13.1.jar
hive-beeline-0.13.1.jar
hive-cli-0.13.1.jar
hive-common-0.13.1.jar
hive-contrib-0.13.1.jar
hive-exec-0.13.1.jar
hive-hbase-handler-0.13.1.jar
hive-hwi-0.13.1.jar
hive-jdbc-0.13.1.jar
hive-metastore-0.13.1.jar
hive-serde-0.13.1.jar
hive-service-0.13.1.jar
hive-shims-0.13.1.jar
hive-shims-0.20-0.13.1.jar
hive-shims-0.20S-0.13.1.jar
hive-shims-0.23-0.13.1.jar
hive-shims-common-0.13.1.jar
hive-shims-common-secure-0.13.1.jar
hive-testutils-0.13.1.jar
httpclient-4.2.5.jar
httpcore-4.2.5.jar
jdo-api-3.0.1.jar
jetty-6.1.26.jar
jetty-all-7.6.0.v20120127.jar
jetty-util-6.1.26.jar
jline-0.9.94.jar
jpam-1.1.jar
jsr305-1.3.9.jar
jta-1.1.jar
junit-4.10.jar
libfb303-0.9.0.jar
libthrift-0.9.0.jar
log4j-1.2.16.jar
mail-1.4.1.jar
oro-2.0.8.jar
paranamer-2.3.jar
servlet-api-2.5-20081211.jar
servlet-api-2.5.jar
slf4j-api-1.6.6.jar
snappy-java-1.0.5.jar
ST4-4.0.4.jar
stax-api-1.0.1.jar
stringtemplate-3.2.1.jar
tempus-fugit-1.1.jar
velocity-1.5.jar
xz-1.0.jar
zookeeper-3.4.5.jar

2. Run the following code, and you can find your result.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class HiveJdbcConnection {
        // Driver and URL scheme of the original HiveServer ("hive --service hiveserver").
        private static String driverName = "org.apache.hadoop.hive.jdbc.HiveDriver";
        private static String url = "jdbc:hive://192.168.1.160:10000/default";
        private static String userName = "";
        private static String password = "";

        public static void main(String[] args) {
            try {
                Class.forName(driverName);
                String sql = "select * from hive_1_1";
                // try-with-resources closes the connection, statement and result set.
                try (Connection conn = DriverManager.getConnection(url, userName, password);
                     Statement stmt = conn.createStatement();
                     ResultSet rs = stmt.executeQuery(sql)) {
                    while (rs.next()) {
                        // hive_1_1 has three columns: id, name, gender.
                        String line = rs.getString(1) + "\t" + rs.getString(2) + "\t" + rs.getString(3);
                        System.out.println(line);
                    }
                }
            } catch (SQLException e) {
                System.out.println("url, userName or password is wrong!");
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                System.out.println("Driver is not found!");
                e.printStackTrace();
            }
        }
    }

3. Result

Debug:
In my case, I got this error message when I ran the code:
org.apache.thrift.transport.TTransportException: java.net.SocketException: Connection reset

Running netstat -tulpn showed that another process was already holding that port. The easiest fix is to kill it and start the Hive server again:
kill -15 PID
hive --service hiveserver &

Or, you can start hiveserver on a different port (and change the port in the JDBC url to match):
hive --service hiveserver -p 10001 &

The symbol '&' starts the process in the background, so you can keep using your shell while hiveserver is running.
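Before running the JDBC code, it can help to check from the client machine whether anything is accepting connections on the port at all. The following is only a minimal sketch with plain Java sockets; the class name PortCheck and the method isListening are made up for this example, and the demo uses a throwaway local listener instead of a real Hive server.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

class PortCheck {
    /** Returns true if something accepts TCP connections on host:port within timeoutMs. */
    static boolean isListening(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, unreachable or timed out: nothing usable on that port.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        // Demo against a temporary local listener on a free port (not a Hive server).
        try (ServerSocket server = new ServerSocket(0)) {
            int port = server.getLocalPort();
            System.out.println("listening: " + isListening("127.0.0.1", port, 500));
        }
    }
}
```

For the setup in this post you would call isListening("192.168.1.160", 10000, 500). A true result only says that some process holds the port; as in my case, it may not be the hiveserver you expect, so netstat -tulpn on the server is still needed to see which process it is.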

Create hadoop work environment in myeclipse

Hadoop needs a lot of jars, and it is not always obvious which ones to include. Sorting that out by hand took me a very long time, and I got tired of it. Finally I found Maven, which sets up the Hadoop environment quickly and easily. Here we go:
1. Create a Maven project


2. Open the pom.xml, put the following code into the <dependencies> part.
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.3.0</version>
</dependency>

<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.3.0</version>
</dependency>

3. Save the pom.xml. While it saves, you can see MyEclipse building the workspace. It will probably download the jar files from the internet.

4. After the workspace build is done, you can see that Maven has prepared the necessary jars automatically under “Maven Dependencies”.

Format of Mapper and Reducer

The formula of MapReduce can be described as follows. Next to each pair of types you can find the Job setting that configures it.
map: (K1, V1) → list(K2, V2)
reduce: (K2, list(V2)) → list(K3, V3)

(K1, V1): in the new API (org.apache.hadoop.mapreduce) the input types come from the InputFormat
job.setInputFormatClass(TextInputFormat.class);    // (K1, V1) = (LongWritable, Text)

list(K2, V2):
job.setMapOutputKeyClass(K2.class);
job.setMapOutputValueClass(V2.class);

list(K3, V3):
job.setOutputKeyClass(K3.class);
job.setOutputValueClass(V3.class);
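To make the two formulas concrete, here is a small in-memory imitation of map, shuffle and reduce in plain Java. It is only a sketch of the data flow and does not use Hadoop at all; MiniMapReduce, MapFn, ReduceFn and wordCount are names invented for this example.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

class MiniMapReduce {
    /** map: (K1, V1) -> list(K2, V2); results are emitted through "out". */
    interface MapFn<K1, V1, K2, V2> {
        void map(K1 key, V1 value, BiConsumer<K2, V2> out);
    }

    /** reduce: (K2, list(V2)) -> list(K3, V3); results are emitted through "out". */
    interface ReduceFn<K2, V2, K3, V3> {
        void reduce(K2 key, List<V2> values, BiConsumer<K3, V3> out);
    }

    static <K1, V1, K2 extends Comparable<K2>, V2, K3, V3> List<Map.Entry<K3, V3>> run(
            List<Map.Entry<K1, V1>> input,
            MapFn<K1, V1, K2, V2> mapper,
            ReduceFn<K2, V2, K3, V3> reducer) {
        // Map phase plus shuffle: collect every V2 under its K2, keys in sorted order.
        Map<K2, List<V2>> groups = new TreeMap<>();
        for (Map.Entry<K1, V1> record : input) {
            mapper.map(record.getKey(), record.getValue(),
                    (k, v) -> groups.computeIfAbsent(k, unused -> new ArrayList<>()).add(v));
        }
        // Reduce phase: one call per distinct K2.
        List<Map.Entry<K3, V3>> output = new ArrayList<>();
        for (Map.Entry<K2, List<V2>> group : groups.entrySet()) {
            reducer.reduce(group.getKey(), group.getValue(),
                    (k, v) -> output.add(new SimpleEntry<>(k, v)));
        }
        return output;
    }

    /** Word count: K1 = line offset, V1 = line, K2 = K3 = word, V2 = V3 = count. */
    static List<Map.Entry<String, Integer>> wordCount(List<String> lines) {
        MapFn<Long, String, String, Integer> tokenize = (offset, line, out) -> {
            for (String word : line.trim().split("\\s+")) {
                if (!word.isEmpty()) out.accept(word, 1);
            }
        };
        ReduceFn<String, Integer, String, Integer> sum = (word, counts, out) -> {
            int total = 0;
            for (int count : counts) total += count;
            out.accept(word, total);
        };
        List<Map.Entry<Long, String>> input = new ArrayList<>();
        long position = 0;
        for (String text : lines) {
            input.add(new SimpleEntry<>(Long.valueOf(position), text));
            position += text.length() + 1;
        }
        return run(input, tokenize, sum);
    }

    public static void main(String[] args) {
        System.out.println(wordCount(Arrays.asList("a b a", "c b a")));
    }
}
```

Running main prints [a=3, b=2, c=1]. The six type parameters of run correspond one to one to K1, V1, K2, V2, K3 and V3 in the formulas, which is the same information the Job setters pass to Hadoop as classes.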

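Normally, (K2, V2) equals (K3, V3), and then only the output classes need to be set because the map output classes default to them. Here is the first version of the code: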

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class Dedup2 {
        public static class Map extends Mapper<Object, Text, Text, Text> {
            @Override
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                // Emit the whole line as the key; identical lines meet in one reduce call.
                context.write(value, new Text(""));
            }
        }

        public static class Reduce extends Reducer<Text, Text, Text, Text> {
            @Override
            public void reduce(Text key, Iterable<Text> values, Context context)
                    throws IOException, InterruptedException {
                // Write each distinct line once.
                context.write(key, new Text(""));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            System.setProperty("HADOOP_USER_NAME", "root");
            String[] otherArgs = {"hdfs://centmaster:9000/input", "hdfs://centmaster:9000/output/debup1"};
            Job job = Job.getInstance(conf, "Data Deduplication");
            job.setJarByClass(Dedup2.class);
            job.setMapperClass(Map.class);
            job.setCombinerClass(Reduce.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

But I wanted (K2, V2) to be different from (K3, V3), so I made the following change:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class Dedup {
        public static class Map extends Mapper<Object, Text, Text, IntWritable> {   //Change Text to IntWritable
            public void map(Object key, Text value, Context context)
                    throws IOException, InterruptedException {
                context.write(value, new IntWritable(0));    //Change Text("") to IntWritable(0)
            }
        }

        public static class Reduce extends Reducer<Text, IntWritable, Text, Text> {     //Change Text to IntWritable
            public void reduce(Text key, Iterable<IntWritable> values, Context context)   //Change Text to IntWritable
                    throws IOException, InterruptedException {
                context.write(key, new Text(""));
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            System.setProperty("HADOOP_USER_NAME", "root");
            String[] otherArgs = {"hdfs://centmaster:9000/input", "hdfs://centmaster:9000/output/debup2"};
            Job job = new Job(conf, "Data Deduplication");
            job.setJarByClass(Dedup.class);
            job.setMapperClass(Map.class);
            job.setCombinerClass(Reduce.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);    //Newly added. Indicates that the Mapper output value is not the default Text, but IntWritable
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

After the change, I ran the code. It did not generate any output. I found out that the Reducer was not running, and Eclipse showed no error information. So I wrapped the Reducer code in try…catch:

    public static class Reduce extends Reducer<Text, IntWritable, Text, Text> {
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            try {
                context.write(key, new Text(""));
                System.out.println("in reducer");
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }

The error information shows:
java.io.IOException: wrong value class: class org.apache.hadoop.io.Text is not class org.apache.hadoop.io.IntWritable

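After googling it, I found the cause: the Combiner should not be set here. A combiner runs on the map output, so it must emit the map output types (K2, V2) = (Text, IntWritable), but Reduce emits (Text, Text). So I commented out this line:
//job.setCombinerClass(Reduce.class);

Ding… The output came out. This bug took me one day to solve.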

Summary:
Back to the original rule of Mapper and Reducer. It looks like this:

(K1, V1): in the new API (org.apache.hadoop.mapreduce) the input types come from the InputFormat
job.setInputFormatClass(TextInputFormat.class);    // (K1, V1) = (LongWritable, Text)

list(K2, V2):
job.setMapOutputKeyClass(K2.class);
job.setMapOutputValueClass(V2.class);

list(K3, V3):
job.setOutputKeyClass(K3.class);
job.setOutputValueClass(V3.class);

But if (K2, V2) is different from (K3, V3), do not reuse the Reducer as the CombinerClass, because a combiner must output (K2, V2). Comment that line out, or write a separate combiner whose output types match the map output.
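The combiner rule can be written down as a type signature. The sketch below is plain Java without Hadoop, with String and Integer standing in for Text and IntWritable; CombinerTypes, ReduceFn, combine and demo are invented names. The point is the parameter type of combine: a combiner reads (K2, V2) and has to write (K2, V2) again.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

class CombinerTypes {
    /** Same shape as a Hadoop Reducer: (KIN, list(VIN)) -> list(KOUT, VOUT). */
    interface ReduceFn<KIN, VIN, KOUT, VOUT> {
        void reduce(KIN key, List<VIN> values, BiConsumer<KOUT, VOUT> out);
    }

    /**
     * Runs a combiner over the output of one map task. The parameter type
     * ReduceFn<K2, V2, K2, V2> states the rule: (K2, V2) in, (K2, V2) out.
     */
    static <K2 extends Comparable<K2>, V2> List<Map.Entry<K2, V2>> combine(
            List<Map.Entry<K2, V2>> mapOutput, ReduceFn<K2, V2, K2, V2> combiner) {
        Map<K2, List<V2>> groups = new TreeMap<>();
        for (Map.Entry<K2, V2> pair : mapOutput) {
            groups.computeIfAbsent(pair.getKey(), unused -> new ArrayList<>()).add(pair.getValue());
        }
        List<Map.Entry<K2, V2>> combined = new ArrayList<>();
        for (Map.Entry<K2, List<V2>> group : groups.entrySet()) {
            combiner.reduce(group.getKey(), group.getValue(),
                    (k, v) -> combined.add(new SimpleEntry<>(k, v)));
        }
        return combined;
    }

    /** Dedup map output (line, 0) of one map task, combined with a type-correct combiner. */
    static List<Map.Entry<String, Integer>> demo() {
        List<Map.Entry<String, Integer>> mapOutput = new ArrayList<>();
        for (String line : Arrays.asList("x", "y", "x")) {
            mapOutput.add(new SimpleEntry<>(line, Integer.valueOf(0)));
        }
        // Valid combiner: (String, Integer) in, (String, Integer) out.
        ReduceFn<String, Integer, String, Integer> dedupCombiner =
                (key, zeros, out) -> out.accept(key, 0);
        // The reducer of this post has the shape ReduceFn<String, Integer, String, String>.
        // Passing it to combine(...) would not compile here; Hadoop only notices the
        // same mismatch at run time, as the "wrong value class" IOException.
        return combine(mapOutput, dedupCombiner);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Running main prints [x=0, y=0]: the duplicate "x" is already collapsed on the map side, and the pairs still have the map output types, so a reducer with different output types can consume them afterwards.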

HA Federation on Hadoop2.0


In Hadoop 1.0, there is only one NameNode in the whole cluster, which brings the risk of a “single point of failure”. To solve this problem, Hadoop 2 introduces an HA (high availability) mechanism.
In the picture, there are 4 DataNodes and 4 NameNodes. 2 NameNodes are active and 2 NameNodes are standby.
Look at the left group of NameNodes: one is active, the other is standby. A cluster has at least 3 JournalNodes. The JournalNodes share the edit log of the active NameNode with the standby NameNode, so the standby can take over. In this way, the active NameNode and the standby NameNode form one HA group, HA1.

On the right, there is another group, HA2 (one active NameNode and one standby NameNode). HA2 manages the same 4 DataNodes too, but HA1 and HA2 hold different files and directories. For example, if you use ls -l on HA1, it may show:
file1
file2
file3

When you use ls -l on HA2, it may show:
file4
file5
file6

In this case, HA1 and HA2 are the two members of a federation. Each one operates its own HDFS namespace separately, but they use the same DataNodes.
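The split between "separate file lists" and "shared DataNodes" can be modelled in a few lines of Java. This is a toy model only, not HDFS code: FederationSketch, DataNode and Namespace are invented names, and block placement is reduced to picking one DataNode by hash.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class FederationSketch {
    /** A DataNode keeps blocks for every namespace, labelled "<namespace>:<file>". */
    static final class DataNode {
        final Set<String> blocks = new TreeSet<>();
    }

    /** One namespace, served by one active/standby NameNode pair, with its own file list. */
    static final class Namespace {
        final String name;
        final List<DataNode> sharedDataNodes;
        final Set<String> files = new TreeSet<>();

        Namespace(String name, List<DataNode> sharedDataNodes) {
            this.name = name;
            this.sharedDataNodes = sharedDataNodes;
        }

        void create(String file) {
            files.add(file); // metadata stays in this namespace only
            // Simplified placement: one block per file, on one DataNode chosen by hash.
            int index = Math.floorMod(file.hashCode(), sharedDataNodes.size());
            sharedDataNodes.get(index).blocks.add(name + ":" + file);
        }
    }

    public static void main(String[] args) {
        List<DataNode> dataNodes = new ArrayList<>();
        for (int i = 0; i < 4; i++) dataNodes.add(new DataNode());
        Namespace ha1 = new Namespace("HA1", dataNodes);
        Namespace ha2 = new Namespace("HA2", dataNodes);
        for (String f : new String[] {"file1", "file2", "file3"}) ha1.create(f);
        for (String f : new String[] {"file4", "file5", "file6"}) ha2.create(f);
        System.out.println("HA1 sees " + ha1.files);
        System.out.println("HA2 sees " + ha2.files);
        for (DataNode dn : dataNodes) System.out.println("DataNode holds " + dn.blocks);
    }
}
```

Each Namespace lists only its own three files, while the four DataNode objects together hold the blocks of both namespaces.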

Hadoop ecosystem


Starting with Hadoop 2.0, YARN is introduced. YARN is a resource manager that takes over the resource management role of the JobTracker. Different computation models can run on YARN, such as MapReduce, Tez, Storm and Spark. Data stores like HBase and Hive are supported on YARN as well.
MapReduce: computes with Map, Reduce and <Key, Value> pairs.
Storm: the computation model receives a constant stream of input. Whenever new input arrives, the result is updated. It is a real-time computation model.
Spark: an in-memory computation model.
Tez: a DAG computation model.

Run remote wordcount mapreduce from eclipse in windows8

I have already deployed a 4-node Hadoop cluster in VMware. I will run the WordCount application in MyEclipse, which runs on Windows 8 on my own laptop.
1. Set up a Hadoop work environment in Eclipse. (You can refer to “Create hadoop work environment in myeclipse” in my blog.)
2. Import hadoop-mapreduce-client-common-2.3.0.jar and hadoop-mapreduce-client-jobclient-2.3.0.jar into the workspace. They are in the HADOOP/share/hadoop/mapreduce directory.

3. You still need a hadoop directory on Windows, with the HADOOP_HOME environment variable pointing to it. This is mandatory.

4. Download hadoop-common-2.2.0-bin-master.rar, extract the files and overwrite those in hadoop/bin. Two of these files are required: hadoop.dll and winutils.exe.

5. I copied file1.txt and file2.txt to centmaster:9000/input.

6. Modify WordCount.java, and run it in Eclipse.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

    public static class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();

        // Emit (word, 1) for every token of the input line.
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());
                context.write(word, one);
            }
        }
    }

    public static class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();

        // Sum the counts of one word. Input and output types are the same,
        // so this class can also serve as the combiner.
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = { "hdfs://centmaster:9000/input",
                "hdfs://centmaster:9000/output" };
        // This is very important: act as the root user of the NameNode (centmaster).
        System.setProperty("HADOOP_USER_NAME", "root");
        Job job = new Job(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
7. You can find the result.


Use cat to check the result:
[root@centmaster bin]# bash hadoop fs -cat /output/part-r-00000
2012-3-1        2
2012-3-2        2
2012-3-3        4
2012-3-4        2
2012-3-5        2
2012-3-6        2
2012-3-7        2
a       4
b       4
c       5
d       3

Some differences from the articles online:
1. Some articles say you should configure hadoop/etc/hadoop/hadoop-env.cmd and set JAVA_HOME. In my case it was not necessary: as a test I renamed “etc” to “etc2”, and MapReduce still ran well.
2. Some articles say you should put hadoop.dll in windows/system32. I copied it there, but I did not test whether it is really necessary. You can try it, but you need to restart your computer afterwards.
3. I did not add %HADOOP_HOME%\bin to the PATH environment variable, and it still ran well.

Debugging experience:
Eclipse does not show much error information. The best way is to look in the logs/hadoop-root-namenode-centmaster.log file on the NameNode, and google what you find there.

HQL3

hive.mapred.mode=strict
By default, “order by” sends all rows to a single reducer. If the data set is huge, it may exhaust the resources of that one reducer, so it is recommended to use the “limit” keyword to limit the output.
When hive.mapred.mode=strict is set, Hive requires “limit” whenever “order by” is used; otherwise it reports an error.
When hive.mapred.mode=strict is set, a join must state its condition with “join..on..” rather than in the “where” clause, because Cartesian products are rejected.
When hive.mapred.mode=strict is set, a query on a partitioned table must filter on a partition field in its “where” clause.

Sort by
When Hive uses more than one reducer to sort the result, “sort by” guarantees that the data within each reducer is sorted.

But the whole result is not guaranteed to be sorted: the final output is the outputs of the different reducers put together, and their value ranges can overlap.
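The difference between a per-reducer sort and a global sort shows up even in a tiny plain-Java imitation. This is a sketch only; SortBySketch and its methods are invented names, and the round-robin distribution of rows over the reducers is an assumption made to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class SortBySketch {
    /** Spreads the rows over the reducers (round-robin, an assumption) and sorts each reducer's rows. */
    static List<List<Integer>> sortBy(List<Integer> rows, int reducers) {
        List<List<Integer>> parts = new ArrayList<>();
        for (int r = 0; r < reducers; r++) parts.add(new ArrayList<>());
        for (int i = 0; i < rows.size(); i++) parts.get(i % reducers).add(rows.get(i));
        for (List<Integer> part : parts) Collections.sort(part);
        return parts;
    }

    /** The final result: reducer outputs simply put one after another. */
    static List<Integer> concat(List<List<Integer>> parts) {
        List<Integer> all = new ArrayList<>();
        for (List<Integer> part : parts) all.addAll(part);
        return all;
    }

    static boolean isSorted(List<Integer> values) {
        for (int i = 1; i < values.size(); i++) {
            if (values.get(i - 1) > values.get(i)) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(5, 1, 4, 2, 3, 6);
        List<List<Integer>> twoReducers = sortBy(rows, 2);
        System.out.println(twoReducers + " globally sorted: " + isSorted(concat(twoReducers)));
        List<List<Integer>> oneReducer = sortBy(rows, 1); // behaves like "order by"
        System.out.println(oneReducer + " globally sorted: " + isSorted(concat(oneReducer)));
    }
}
```

With two reducers the parts are [3, 4, 5] and [1, 2, 6]: each is sorted, but their concatenation is not. With a single reducer the result is fully sorted, which is what “order by” pays for by using only one reducer.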

Create a bucketed table. Buckets make sampling more efficient.
create table emp(id int, name string, salary int,gender string,level string)
partitioned by(date string)
clustered by(id) sorted by(salary asc) into 3 buckets
row format delimited fields terminated by ','
stored as textfile;
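How a row ends up in one of the 3 buckets can be sketched like this. The sketch assumes that the bucket is the non-negative hash of the “clustered by” column modulo the number of buckets, and that the hash of an int column is the value itself; BucketSketch, bucketFor and assign are names invented for the example, not Hive APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class BucketSketch {
    /** Assumed rule: clear the sign bit of the hash, then take it modulo the bucket count. */
    static int bucketFor(int id, int numBuckets) {
        int hash = Integer.hashCode(id); // for an int this is the value itself
        return (hash & Integer.MAX_VALUE) % numBuckets;
    }

    /** Groups ids by bucket number, buckets listed in ascending order. */
    static Map<Integer, List<Integer>> assign(List<Integer> ids, int numBuckets) {
        Map<Integer, List<Integer>> buckets = new TreeMap<>();
        for (int id : ids) {
            buckets.computeIfAbsent(bucketFor(id, numBuckets), b -> new ArrayList<>()).add(id);
        }
        return buckets;
    }

    public static void main(String[] args) {
        System.out.println(assign(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3));
    }
}
```

For the ids 1 to 7 and 3 buckets this prints {0=[3, 6], 1=[1, 4, 7], 2=[2, 5]}. Since every id lands in a predictable bucket, a sample can be taken by reading one bucket instead of scanning the whole table.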

HQL2

Where
select * from partition_table where dt='2014-04-01' and dep='R&D';

Limit. The “limit 1,3” form (offset, count) can’t be used;
select * from partition_table where dt='2014-04-01' and dep='R&D' limit 5;

A “select *” that filters only on partition fields in the where clause does not require MapReduce, which makes the query faster.
For example, salary is not a partition field, so this select requires MapReduce: select * from partition_table where salary>10000;
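Why a filter on partition fields is cheap can be illustrated with a small model in which every partition is one directory, named in the dt=.../dep=... style that Hive uses for partition directories. The sketch is plain Java; PartitionPruningSketch, its methods and the sample values are invented for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class PartitionPruningSketch {
    /** Partition directory, e.g. "dt=2014-04-01/dep=sales", mapped to the salaries stored in it. */
    private final Map<String, List<Integer>> partitions = new LinkedHashMap<>();
    /** Counts how many rows had to be looked at to answer queries. */
    int rowsInspected = 0;

    void load(String dt, String dep, Integer... salaries) {
        partitions.computeIfAbsent("dt=" + dt + "/dep=" + dep, d -> new ArrayList<>())
                .addAll(Arrays.asList(salaries));
    }

    /** where dt=... and dep=...: pick the directory, no row has to be examined. */
    List<Integer> selectPartition(String dt, String dep) {
        return partitions.getOrDefault("dt=" + dt + "/dep=" + dep, Collections.emptyList());
    }

    /** where salary > min: every row of every partition has to be examined. */
    List<Integer> selectSalaryAbove(int min) {
        List<Integer> hits = new ArrayList<>();
        for (List<Integer> rows : partitions.values()) {
            for (int salary : rows) {
                rowsInspected++;
                if (salary > min) hits.add(salary);
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        PartitionPruningSketch table = new PartitionPruningSketch();
        table.load("2014-04-01", "sales", 4000, 12000);
        table.load("2014-04-02", "sales", 9000, 20000);
        System.out.println(table.selectPartition("2014-04-01", "sales") + " inspected " + table.rowsInspected);
        System.out.println(table.selectSalaryAbove(10000) + " inspected " + table.rowsInspected);
    }
}
```

The partition query returns the content of one directory without looking at any row, while the salary query has to inspect all four rows. The second kind of work is what Hive hands to MapReduce.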

Case
select name, salary,
case
when salary<5000 then 'L1-poor'
when salary>=5000 and salary<10000 then 'L2-middle'
when salary>=10000 and salary<15000 then 'L3-rich'
when salary>=15000 then 'L4-super'
else 'L0'
end as salary_level, gender, level, dt, dep from partition_table limit 4;

Group by, Having. They are used the same way as in SQL.
select gender, sum(salary) from partition_table group by gender having count(*)>4;

set hive.map.aggr=true; This setting aggregates on the map side first, which can make the query much more efficient, at the cost of more memory.

Left Join
select partition_table.name, dep, class from partition_table left join student on partition_table.name=student.name;

Map Join. When a small table joins with a big table, name the small table in the mapjoin hint so that it is loaded into memory on the map side. This can improve the efficiency.
select /*+ mapjoin(c) */ * from orders o join cities c on (o.city_id = c.id);
For more, read https://www.facebook.com/notes/facebook-engineering/join-optimization-in-apache-hive/470667928919
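The idea behind a map join is a hash join: the small table is loaded into a hash table in memory, and the big table is streamed past it, so no reduce phase is needed for the join. Below is a minimal plain-Java sketch of that idea, not Hive's implementation; MapJoinSketch and mapJoin are invented names, and the rows are simplified to {order id, city id} pairs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MapJoinSketch {
    /**
     * orders: rows of {orderId, cityId} (the big table, streamed).
     * cities: cityId -> city name (the small table, held in memory).
     * Returns "orderId,cityName" for every order whose city exists (inner join).
     */
    static List<String> mapJoin(List<int[]> orders, Map<Integer, String> cities) {
        // Build phase: the small table becomes an in-memory hash table.
        Map<Integer, String> lookup = new HashMap<>(cities);
        // Probe phase: one pass over the big table, one hash lookup per row.
        List<String> joined = new ArrayList<>();
        for (int[] order : orders) {
            String city = lookup.get(order[1]);
            if (city != null) joined.add(order[0] + "," + city);
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<Integer, String> cities = new HashMap<>();
        cities.put(10, "Paris");
        cities.put(20, "Rome");
        List<int[]> orders = Arrays.asList(new int[] {1, 10}, new int[] {2, 20}, new int[] {3, 30});
        System.out.println(mapJoin(orders, cities));
    }
}
```

Order 3 refers to a city that is not in the small table, so it drops out, just as in an inner join. The approach only works while the small table fits in memory, which is why the hint should name the small table.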

HQL1

Create an internal table:
create table hive_1_1(id string, name string, gender string)
row format delimited fields terminated by ','
stored as textfile;

Create an external table:
create external table hive_1_2(id string, name string, gender string)
row format delimited fields terminated by ','
stored as textfile;

Load data from the local file system:
load data local inpath '/home/hadoop/hive-0.13.1/student.txt' into table hive_1_1;

Load data and overwrite:
load data local inpath '/home/hadoop/hive-0.13.1/student.txt' overwrite into table hive_1_1;

Load data from HDFS (without the "local" keyword). It will move the file from its HDFS location to the Hive directory in HDFS.
load data inpath '/user/hive/data/student.txt' into table hive_1_1;

Delete table:
drop table hive_1_1;    -- hive_1_1 is an internal table: both the schema and the data on HDFS are deleted
drop table hive_1_2;    -- hive_1_2 is an external table: only the schema is deleted, the data on HDFS remains

Create a table with 2 partition fields. The "partitioned by" clause must come before "row format". The partition fields must not appear among the regular table fields.
create table partition_table(
name string,
salary float,
gender string,
level string
)
partitioned by (dt string, dep string)
row format delimited fields terminated by ','
stored as textfile;

Load data into the partition table:
load data local inpath '/home/hadoop/hive-0.13.1/emp.log' [overwrite] into table partition_table partition(dt='2014-04-01',dep='R&D');

List the partitions of the table:
show partitions partition_table;

MapReduce process on Yarn


1. The client submits a job to the ResourceManager (RM).
2. The ApplicationsManager in the RM assigns this job to a NodeManager, which launches an MRApplicationMaster. The MRApplicationMaster on this NodeManager is fully responsible for the job.
3. The MRApplicationMaster calculates how much CPU/memory the job needs, and reports this to the ApplicationsManager in the RM.
4. The Resource Scheduler knows the resource usage of the whole cluster. It tells the MRApplicationMaster which NodeManagers can provide the resources it needs.
5. The MRApplicationMaster sends the tasks to these NodeManagers.
6. Each NodeManager launches MapTasks and ReduceTasks to do the computation. Here we have a concept: the Container. A container describes a bundle of resources, including CPU/memory. MapTasks and ReduceTasks need CPU/memory to run, so each MapTask or ReduceTask runs inside a container.
7. While the tasks are running, each NodeManager reports its current status to the MRApplicationMaster.
8. After all NodeManagers finish their work, the final result is sent to the ApplicationsManager.
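Step 4, where the scheduler decides which NodeManagers can host the requested containers, can be imitated with a first-fit loop. This is a deliberately simple sketch and not how the real YARN schedulers work (they also handle queues, priorities and data locality); SchedulerSketch, Node and allocate are invented names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SchedulerSketch {
    /** Free resources of one NodeManager, as the scheduler sees them. */
    static final class Node {
        final String name;
        int freeMemoryMb;
        int freeVcores;

        Node(String name, int freeMemoryMb, int freeVcores) {
            this.name = name;
            this.freeMemoryMb = freeMemoryMb;
            this.freeVcores = freeVcores;
        }
    }

    /**
     * Places up to "containers" containers of the given size, first fit.
     * Returns the name of the chosen node for every container that could be placed.
     */
    static List<String> allocate(List<Node> nodes, int containers, int memoryMb, int vcores) {
        List<String> placements = new ArrayList<>();
        for (int c = 0; c < containers; c++) {
            Node chosen = null;
            for (Node node : nodes) {
                if (node.freeMemoryMb >= memoryMb && node.freeVcores >= vcores) {
                    chosen = node;
                    break;
                }
            }
            if (chosen == null) break; // cluster is full, the remaining requests have to wait
            chosen.freeMemoryMb -= memoryMb; // the container now owns these resources
            chosen.freeVcores -= vcores;
            placements.add(chosen.name);
        }
        return placements;
    }

    public static void main(String[] args) {
        List<Node> cluster = Arrays.asList(new Node("nm1", 2048, 2), new Node("nm2", 1024, 1));
        // Ask for 4 containers of 1024 MB and 1 vcore each.
        System.out.println(allocate(cluster, 4, 1024, 1));
    }
}
```

The cluster in main has room for only three such containers, so it prints [nm1, nm1, nm2] and the fourth request stays unplaced. Each placed entry stands for one container in which a MapTask or ReduceTask could then run.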