139 lines
4.2 KiB
Java
139 lines
4.2 KiB
Java
package com.fjy.hadoop.mapreduce;
|
||
|
||
import org.apache.hadoop.conf.Configuration;
|
||
import org.apache.hadoop.fs.FileSystem;
|
||
import org.apache.hadoop.fs.Path;
|
||
import org.apache.hadoop.io.LongWritable;
|
||
import org.apache.hadoop.io.Text;
|
||
import org.apache.hadoop.mapreduce.Job;
|
||
import org.apache.hadoop.mapreduce.Mapper;
|
||
import org.apache.hadoop.mapreduce.Reducer;
|
||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||
|
||
import java.io.IOException;
|
||
|
||
/**
|
||
* 使用MapReduce开发WordCount应用
|
||
* @author F嘉阳
|
||
* @date 2018-04-17
|
||
*/
|
||
public class WordCountApp {
|
||
/**
|
||
* Map:读取输入文件
|
||
* Text:类似字符串
|
||
*/
|
||
public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {
|
||
|
||
LongWritable one = new LongWritable(1);
|
||
|
||
/**
|
||
* @param key 偏移量
|
||
* @param value 每行的字符串
|
||
* @param context 上下文
|
||
* @throws IOException
|
||
* @throws InterruptedException
|
||
*/
|
||
@Override
|
||
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
|
||
/*super.map(key, value, context);*/
|
||
|
||
//接收到每一行数据
|
||
String line = value.toString();
|
||
|
||
//按照指定分隔符进行拆分
|
||
/*line.split("\t");//以Tab分隔*/
|
||
String[] words = line.split(" ");//以空格分隔
|
||
|
||
for (String word : words) {
|
||
//通过上下文把map的处理结果输出
|
||
context.write(new Text(word), one);
|
||
}
|
||
}
|
||
}
|
||
|
||
/**
|
||
* Reduce 归并操作
|
||
* LongWritable:文本出现的次数/求和后的次数
|
||
*/
|
||
public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
|
||
/**
|
||
* @param key
|
||
* @param values 相同偏移量的集合
|
||
* @param context
|
||
* @throws IOException
|
||
* @throws InterruptedException
|
||
*/
|
||
@Override
|
||
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
|
||
//super.reduce(key, values, context);
|
||
|
||
long sum = 0;
|
||
|
||
for (LongWritable value : values) {
|
||
//求key出现的次数和总和
|
||
sum += value.get();
|
||
}
|
||
//统计结果的输出
|
||
context.write(key, new LongWritable(sum));
|
||
|
||
}
|
||
}
|
||
|
||
/**
|
||
* 定义Driver,封装MapReduce作业的所有信息
|
||
*/
|
||
public static void main(String[] args) throws Exception {
|
||
|
||
//创建Configuration
|
||
Configuration configuration = new Configuration();
|
||
|
||
//清理已存在的输出目录
|
||
Path outputPath = new Path(args[1]);
|
||
FileSystem fileSystem = FileSystem.get(configuration);
|
||
if (fileSystem.exists(outputPath)){
|
||
fileSystem.delete(outputPath);
|
||
System.out.println("The exist files had been deleted!");
|
||
}
|
||
|
||
//创建作业
|
||
Job job = Job.getInstance(configuration, "wordcount");
|
||
|
||
//设置作业的主类
|
||
job.setJarByClass(WordCountApp.class);
|
||
|
||
//作业处理的输入路径
|
||
FileInputFormat.setInputPaths(job, new Path(args[0]));
|
||
|
||
//设置map相关参数
|
||
job.setMapperClass(MyMapper.class);
|
||
|
||
//设置map输出的key的类型
|
||
job.setMapOutputKeyClass(Text.class);
|
||
|
||
//设置map输出的value的类型
|
||
job.setMapOutputValueClass(LongWritable.class);
|
||
|
||
//设置Reduce相关参数
|
||
job.setReducerClass(MyReducer.class);
|
||
job.setOutputKeyClass(Text.class);
|
||
job.setOutputValueClass(LongWritable.class);
|
||
|
||
//通过job设置combiner处理类,逻辑上与reduce一致,注意,如果要计算平均数等不能使用Combiner!
|
||
job.setCombinerClass(MyReducer.class);
|
||
|
||
|
||
//设置作业处理输出结果的输出路径
|
||
FileOutputFormat.setOutputPath(job, new Path(args[1]));
|
||
|
||
//作业提交
|
||
job.waitForCompletion(true);
|
||
|
||
//作业完成后退出
|
||
System.exit(job.waitForCompletion(true) ? 0 : 1);
|
||
|
||
}
|
||
|
||
|
||
}
|