package com.fjy.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * MapReduce application demonstrating a custom {@link Partitioner}.
 *
 * <p>Each input line is expected to hold a phone brand followed by a sales count,
 * separated by whitespace (for example {@code xiaomi 200}). The job sums the counts
 * per brand and routes each brand to a dedicated reduce task.
 *
 * @author F嘉阳
 * @date 2018-04-20
 */
public class WordCountPartitionerApp {

    /** Counter group used to report records the mapper had to skip. */
    private static final String COUNTER_GROUP = "WordCountPartitionerApp";

    /** Counter name for input lines that are not of the form {@code <brand> <count>}. */
    private static final String MALFORMED_COUNTER = "MALFORMED_RECORDS";

    /**
     * Mapper: parses one input line into a (brand, sales count) pair.
     *
     * <p>Input key is the offset supplied by the input format, input value is the line text.
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        // Reused across map() calls to avoid allocating two Writables per record.
        private final Text brand = new Text();
        private final LongWritable sales = new LongWritable();

        /**
         * Emits {@code (brand, count)} for a well-formed line. Blank lines are ignored;
         * lines with fewer than two fields or a non-numeric count are skipped and
         * counted under {@code MALFORMED_RECORDS} instead of failing the task.
         *
         * @param key     offset of the line in the input
         * @param value   text of the line
         * @param context task context used to emit output and update counters
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            String line = value.toString().trim();
            if (line.isEmpty()) {
                return;
            }

            // Split on runs of whitespace so repeated spaces or tabs do not yield empty fields.
            String[] fields = line.split("\\s+");
            if (fields.length < 2) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            long count;
            try {
                count = Long.parseLong(fields[1]);
            } catch (NumberFormatException e) {
                context.getCounter(COUNTER_GROUP, MALFORMED_COUNTER).increment(1);
                return;
            }

            brand.set(fields[0]);
            sales.set(count);
            context.write(brand, sales);
        }
    }

    /**
     * Reducer: sums all sales counts emitted for the same brand.
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        /**
         * @param key     brand name
         * @param values  all counts emitted by the mappers for this brand
         * @param context task context used to emit the total
         * @throws IOException          if writing the output fails
         * @throws InterruptedException if the task is interrupted while writing
         */
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                sum += value.get();
            }
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * Partitioner that sends each known brand to its own reduce task:
     * {@code xiaomi} to 0, {@code huawei} to 1, {@code iphone} to 2, everything else to 3.
     */
    public static class MyPartitioner extends Partitioner<Text, LongWritable> {

        /**
         * @param key           brand name
         * @param value         sales count (not used for routing)
         * @param numPartitions number of partitions passed in by the framework
         * @return the partition index for {@code key}, always below {@code numPartitions}
         */
        @Override
        public int getPartition(Text key, LongWritable value, int numPartitions) {
            int partition;
            switch (key.toString()) {
                case "xiaomi":
                    partition = 0;
                    break;
                case "huawei":
                    partition = 1;
                    break;
                case "iphone":
                    partition = 2;
                    break;
                default:
                    partition = 3;
                    break;
            }
            // Fold into range so a job configured with fewer than four partitions
            // does not receive an out-of-range index; a no-op with four or more.
            return partition % numPartitions;
        }
    }

    /**
     * Driver: configures and submits the job.
     *
     * @param args {@code args[0]} is the input path, {@code args[1]} is the output path
     * @throws Exception if the job cannot be configured or submitted
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.err.println("Usage: WordCountPartitionerApp <input path> <output path>");
            System.exit(2);
        }

        Configuration configuration = new Configuration();
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        // Remove a pre-existing output directory before configuring the job.
        // Resolve the file system from the path itself so a fully qualified URI
        // on a non-default file system is handled.
        FileSystem fileSystem = outputPath.getFileSystem(configuration);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
            System.out.println("The exist files had been deleted!");
        }

        Job job = Job.getInstance(configuration, "wordcount");
        job.setJarByClass(WordCountPartitionerApp.class);

        FileInputFormat.setInputPaths(job, inputPath);

        // Map side: class and intermediate key/value types.
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // Reduce side: class and final key/value types.
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // Four reducers, one per partition index produced by MyPartitioner;
        // without them the partitioner setting does not take effect.
        job.setPartitionerClass(MyPartitioner.class);
        job.setNumReduceTasks(4);

        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit once, wait for completion, and map the result to the process exit code.
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}