一文彻底了解Hadoop的来龙去脉( 五 )

然后我们创建一个WordCount类 。
在这个类里,首先我们要创建一个Map方法,需要继承MApper类:
public static class WordCountMap extends Mapper<LongWritable, Text, Text, IntWritable> { private final IntWritable one = new IntWritable(1); private Text word = new Text();public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer token = new StringTokenizer(line); while (token.hasMoreTokens()) { word.set(token.nextToken()); context.write(word, one); } }}Mapper<LongWritable, Text, Text, IntWritable>是什么意思呢?
前面两个类参数是输入,后面两个是输出 。
也就是WordCOuntMap方法接收LongWritable,Text的参数,返回<Text,IntWriatable>键值对 。
需要重写map方法,可以看到Context对象即为返回结果,内部其实是<Text,IntWriatable>键值对 。
这里需要注意的是,value的值,value默认是一行数据,你文件中有多少行,map函数就会被调用多少次 。
这我们就看懂了吧,首先拿到一行的数据,使用StringTokenizer根据空格分割字符串,得到token 。遍历token并写入context中返回即可 。
然后我们需要编写reduce方法:同样的,reduce方法继承reduce类 。
public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } context.write(key, new IntWritable(sum)); }}wordCountReduce方法接收<Text, IntWritable>键值对,将键值对组合起来,结果写入另外一个键值对中,返回即可 。
其中最重要是重写reduce方法,同样的context也是返回的结果 。
这里需要注意的是,reduce方法是什么时候调用的呢?是在所有mapTask都被执行完成之后,reduceTask启动了才调用 。
所有reduce方法中接收到的是所有map返回的参数 。所以我们简单的求和写入context中就可以了 。
最后我们编写main方法作为入口,调用两个函数 。
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = new Job(conf);
job.setJarByClass(WordCount.class);
job.setJobName("wordcount");
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
job.setMapperClass(WordCountMap.class);
job.setReducerClass(WordCountReduce.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
这里我们主要是告诉JobTracker,告诉他去调用什么就可以了 。
类都编写好了之后`,我们需要的是jar包,所以我们将程序打包为jar包 。
拿到jar包之后,我们需要将jar包作为作业提交给Hadoop执行 。怎么做呢?
hadoop jar WordCount.jar WordCount input_wordcount output_wordcount
hadoop jar WordCount.jar WordCount这里提交jar包,并且告诉主类在哪 。
后面两个都是我们自定义的参数了 。会在main中获取到,即输入参数为input_wordcount 。输出参数为output_wordcount
执行完成之后可以看到 。
hdfs dfs -ls
Found 2 items
drwxr-xr-x - haoye supergroup 0 2017-05-06 20:34 input_wordcount
drwxr-xr-x - haoye supergroup 0 2017-05-06 20:40 output_wordcount
hdfs dfs -ls output_wordcount
Found 2 items
-rw-r--r-- 3 haoye supergroup 0 2017-05-06 20:40 output_wordcount/_SUCCESS
-rw-r--r-- 3 haoye supergroup 83 2017-05-06 20:40 output_wordcount/part-r-00000
其中part-r-00000为结果文件 。
我们可以查看它的内容
hdfs dfs -cat output_wordcount/part-r-00000
api 1
file 3
free 2
hadoop 7
hello 3
home 1
java 2
new 2
school 1
system 1
world 2
得到结果了吧 。
对于hadoop来说,执行任务需要操作HDFS,需要job对应的jar包 。而jar包中需要编写mapTask和ReduceTask对应的方法 。交给jobTracker执行就可以了 。十分的方便 。




推荐阅读