温度排序示例是一个综合性比较强的Hadoop经典案例,除了基础的MapReduce,还有自定义序列化对象、分区、分组、自定义排序等相关知识,对于刚入门的同学来说,理解起来可能会稍有困难。
假设有多年气温数据,如下:
1949-10-01 14:21:02 34 1949-10-02 15:01:01 36 1949-10-03 15:01:01 39 1949-10-04 17:01:01 38 1949-10-05 18:01:01 42 1950-01-01 11:21:02 32 1950-01-02 12:21:02 37 1950-10-03 12:21:02 27 1950-10-01 12:21:02 37 1950-10-02 12:21:02 41 1951-07-01 12:21:02 45 1951-07-02 13:21:02 41 1951-12-01 12:21:02 20 1951-12-02 13:21:02 27 1951-07-03 11:21:03 47 1951-07-04 14:21:03 39 1951-07-05 14:21:03 43 1951-12-03 12:21:02 23
注意:模拟数据存放在weather.txt
中,时间和温度之间不是空格,而是tab键隔开的。
现在要求找到每年每月的3个最高温度时刻并按照温度进行降序排序,同时由于数据数量可能会很大,为了提高效率,将每一年的数据分别交给不同的reduce执行,产生不同的文件。
1、将weather.txt
上传至HDFS的/input
目录下
2、自定义时间温度的封装类MyKey.java
/** * 自定义时间温度的封装类 */ public class MyKey implements WritableComparable<MyKey> { private int year;//年 private int month;//月 private int temp;//温度 public int getYear() { return year; } public void setYear(int year) { this.year = year; } public int getMonth() { return month; } public void setMonth(int month) { this.month = month; } public int getTemp() { return temp; } public void setTemp(int temp) { this.temp = temp; } //序列化:通过write方法写入序列化数据流 @Override public void write(DataOutput out) throws IOException { out.writeInt(year); out.writeInt(month); out.writeInt(temp); } //反序列化:通过readFields方法从序列化的数据流中读出进行赋值 @Override public void readFields(DataInput in) throws IOException { //注意读取顺序要和序列化顺序保持一致 year = in.readInt(); month = in.readInt(); temp = in.readInt(); } //按照字典顺序进行比较,返回值是一个int型 public int compareTo(MyKey mykey) { return this==mykey?0:-1; } }
3、自定义Mapper
/** * 自定义Mapper * 对k1,v1进行处理,提取年、月和温度,封装成MyKey */ public class MyMapper extends Mapper<LongWritable, Text, MyKey, Text> { @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { //按照tab键切割v1 String[] datas = v1.toString().split("\t"); //获取日期 String date = datas[0]; //获取温度 int temp = Integer.parseInt(datas[1]); //切割日期,获取年和月 String[] dates = date.split("-"); int year = Integer.parseInt(dates[0]); int month = Integer.parseInt(dates[1]); //组装<k2,v2> MyKey k2 = new MyKey(); k2.setYear(year); k2.setMonth(month); k2.setTemp(temp); Text v2 = v1; //写出<k2,v2> context.write(k2, v2); } }
4、自定义分区
/** * 自定义分区:溢写之前对<k2,v2>进行分区 * 要求相同年份的数据在同一个分区,我们这针对1949年、1950年和1951年分成对应 * 3个分区,对应3个reduce task进行处理 */ public class MyPartitioner extends Partitioner<MyKey, Text> { @Override public int getPartition(MyKey k2, Text v2, int numPartitions) { /* * 假如1949年、1950年和1951年这3年: * 1949-1949=0;得到第1个分区 * 1950-1949=1;得到第2个分区 * 1952-1949=2;得到第3个分区 */ return (k2.getYear()-1949)%numPartitions; } }
5、自定义Sort
/** * 自定义排序比较器:在溢写之前,分区之后对分区内<k2,v2>按照k2或者k2的部分进行排序 * 这里我们按照年、月、温度进行排序,先以年做比较,如果年相等就比较月,年不相等 * 就返回比较结果,如果月相等就以温度倒序排序,月不相等就返回比较结果 * 需要继承WritableComparator,重新compare(WritableComparable a, WritableComparable b)方法 */ public class MySort extends WritableComparator { //使用super()调用序列化构造函数 public MySort() { super(MyKey.class,true); } //重写compare @Override public int compare(WritableComparable a, WritableComparable b) { //将WritableComparable强转为MyKey MyKey mykey1 = (MyKey)a; MyKey mykey2 = (MyKey)b; //先比较年 int r1 = Integer.compare(mykey1.getYear(), mykey2.getYear()); //如果年相等,就比较月 if(r1==0) { int r2 = Integer.compare(mykey1.getMonth(), mykey2.getMonth()); //如果月相等,就对温度进行倒序排序,加-,表示倒序排序 if(r2==0) { return -Integer.compare(mykey1.getTemp(), mykey2.getTemp()); } return r2; } return r1; } }
6、自定义Group
/** * 自定义分组比较器:在溢写之前,分区排序之后,对同一分区内排序好的<k2,v2>按照k2或k2的部分进行分组 * 相同key的数据对应的value制作成一个iterable * 这里我们按照年月进行分组,同年同月分一组,先以年做比较,如果年不相等返回比较结果, * 如果年相等,返回月份的比较值 */ public class MyGroup extends WritableComparator { //使用super()调用序列化构造函数 public MyGroup() { super(MyKey.class,true); } //重写compare @Override public int compare(WritableComparable a, WritableComparable b) { //将WritableComparable强转为MyKey MyKey mykey1 = (MyKey)a; MyKey mykey2 = (MyKey)b; //先以年做比较 int r1 = Integer.compare(mykey1.getYear(), mykey2.getYear()); //如果年相等,返回月份的比较值 if(r1==0) { return Integer.compare(mykey1.getMonth(), mykey2.getMonth()); } return r1; } }
7、自定义Reducer
public class MyReducer extends Reducer<MyKey, Text, NullWritable, Text> { @Override protected void reduce(MyKey k2, Iterable<Text> v2s, Context context) throws IOException, InterruptedException { System.out.println(k2.getYear()+"-"+k2.getMonth()+"-"+k2.getTemp()); //遍历v2s,取前3个纪录 int num = 0;//计数器 for(Text v2:v2s) { if(++num>3) {//超过3条,结束循环 break; }else { //k3为空 Text v3 = v2; //写出<k3,v3> context.write(NullWritable.get(), v3); } } } }
8、自定义Job
/** * 自定义温度排序处理任务 */ public class MyRunJob { public static void main(String[] args) { try { if(args.length!=2) { System.exit(1); } Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(MyRunJob.class); job.setMapperClass(MyMapper.class); job.setReducerClass(MyReducer.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //设置自定义分区 job.setPartitionerClass(MyPartitioner.class); //设置自定义排序 job.setSortComparatorClass(MySort.class); //设置自定义分组 job.setGroupingComparatorClass(MyGroup.class); //设置Reducer数量 job.setNumReduceTasks(3); //设置Mapper输出类型 job.setMapOutputKeyClass(MyKey.class); job.setMapOutputValueClass(Text.class); //设置Reducer输出类型 job.setOutputKeyClass(NullWritable.class); job.setOutputValueClass(Text.class); //提交job job.waitForCompletion(true); } catch (IOException | ClassNotFoundException | InterruptedException e) { e.printStackTrace(); } } }
9、生成jar上传运行
hadoop jar weather-0.0.1-SNAPSHOT.jar com.pzy.weather.MyRunJob /input/weather.txt /outputweather