温度排序示例是一个综合性比较强的Hadoop经典案例,除了基础的MapReduce,还有自定义序列化对象、分区、分组、自定义排序等相关知识,对于刚入门的同学来说,理解起来可能会稍有困难。
假设有多年气温数据,如下:
1949-10-01 14:21:02 34 1949-10-02 15:01:01 36 1949-10-03 15:01:01 39 1949-10-04 17:01:01 38 1949-10-05 18:01:01 42 1950-01-01 11:21:02 32 1950-01-02 12:21:02 37 1950-10-03 12:21:02 27 1950-10-01 12:21:02 37 1950-10-02 12:21:02 41 1951-07-01 12:21:02 45 1951-07-02 13:21:02 41 1951-12-01 12:21:02 20 1951-12-02 13:21:02 27 1951-07-03 11:21:03 47 1951-07-04 14:21:03 39 1951-07-05 14:21:03 43 1951-12-03 12:21:02 23
注意:模拟数据存放在weather.txt中,时间和温度之间不是空格,而是tab键隔开的。
现在要求找到每年每月的3个最高温度时刻并按照温度进行降序排序,同时由于数据数量可能会很大,为了提高效率,将每一年的数据分别交给不同的reduce执行,产生不同的文件。
1、将weather.txt上传至HDFS的/input目录下
2、自定义时间温度的封装类MyKey.java
/**
* 自定义时间温度的封装类
*/
public class MyKey implements WritableComparable<MyKey> {
private int year;//年
private int month;//月
private int temp;//温度
public int getYear() {
return year;
}
public void setYear(int year) {
this.year = year;
}
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public int getTemp() {
return temp;
}
public void setTemp(int temp) {
this.temp = temp;
}
//序列化:通过write方法写入序列化数据流
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(year);
out.writeInt(month);
out.writeInt(temp);
}
//反序列化:通过readFields方法从序列化的数据流中读出进行赋值
@Override
public void readFields(DataInput in) throws IOException {
//注意读取顺序要和序列化顺序保持一致
year = in.readInt();
month = in.readInt();
temp = in.readInt();
}
//按照字典顺序进行比较,返回值是一个int型
public int compareTo(MyKey mykey) {
return this==mykey?0:-1;
}
}
3、自定义Mapper
/**
* 自定义Mapper
* 对k1,v1进行处理,提取年、月和温度,封装成MyKey
*/
public class MyMapper extends Mapper<LongWritable, Text, MyKey, Text> {
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//按照tab键切割v1
String[] datas = v1.toString().split("\t");
//获取日期
String date = datas[0];
//获取温度
int temp = Integer.parseInt(datas[1]);
//切割日期,获取年和月
String[] dates = date.split("-");
int year = Integer.parseInt(dates[0]);
int month = Integer.parseInt(dates[1]);
//组装<k2,v2>
MyKey k2 = new MyKey();
k2.setYear(year);
k2.setMonth(month);
k2.setTemp(temp);
Text v2 = v1;
//写出<k2,v2>
context.write(k2, v2);
}
}
4、自定义分区
/**
* 自定义分区:溢写之前对<k2,v2>进行分区
* 要求相同年份的数据在同一个分区,我们这针对1949年、1950年和1951年分成对应
* 3个分区,对应3个reduce task进行处理
*/
public class MyPartitioner extends Partitioner<MyKey, Text> {
@Override
public int getPartition(MyKey k2, Text v2, int numPartitions) {
/*
* 假如1949年、1950年和1951年这3年:
* 1949-1949=0;得到第1个分区
* 1950-1949=1;得到第2个分区
* 1952-1949=2;得到第3个分区
*/
return (k2.getYear()-1949)%numPartitions;
}
}
5、自定义Sort
/**
* 自定义排序比较器:在溢写之前,分区之后对分区内<k2,v2>按照k2或者k2的部分进行排序
* 这里我们按照年、月、温度进行排序,先以年做比较,如果年相等就比较月,年不相等
* 就返回比较结果,如果月相等就以温度倒序排序,月不相等就返回比较结果
* 需要继承WritableComparator,重新compare(WritableComparable a, WritableComparable b)方法
*/
public class MySort extends WritableComparator {
//使用super()调用序列化构造函数
public MySort() {
super(MyKey.class,true);
}
//重写compare
@Override
public int compare(WritableComparable a, WritableComparable b) {
//将WritableComparable强转为MyKey
MyKey mykey1 = (MyKey)a;
MyKey mykey2 = (MyKey)b;
//先比较年
int r1 = Integer.compare(mykey1.getYear(), mykey2.getYear());
//如果年相等,就比较月
if(r1==0) {
int r2 = Integer.compare(mykey1.getMonth(), mykey2.getMonth());
//如果月相等,就对温度进行倒序排序,加-,表示倒序排序
if(r2==0) {
return -Integer.compare(mykey1.getTemp(), mykey2.getTemp());
}
return r2;
}
return r1;
}
}
6、自定义Group
/**
* 自定义分组比较器:在溢写之前,分区排序之后,对同一分区内排序好的<k2,v2>按照k2或k2的部分进行分组
* 相同key的数据对应的value制作成一个iterable
* 这里我们按照年月进行分组,同年同月分一组,先以年做比较,如果年不相等返回比较结果,
* 如果年相等,返回月份的比较值
*/
public class MyGroup extends WritableComparator {
//使用super()调用序列化构造函数
public MyGroup() {
super(MyKey.class,true);
}
//重写compare
@Override
public int compare(WritableComparable a, WritableComparable b) {
//将WritableComparable强转为MyKey
MyKey mykey1 = (MyKey)a;
MyKey mykey2 = (MyKey)b;
//先以年做比较
int r1 = Integer.compare(mykey1.getYear(), mykey2.getYear());
//如果年相等,返回月份的比较值
if(r1==0) {
return Integer.compare(mykey1.getMonth(), mykey2.getMonth());
}
return r1;
}
}
7、自定义Reducer
public class MyReducer extends Reducer<MyKey, Text, NullWritable, Text> {
@Override
protected void reduce(MyKey k2, Iterable<Text> v2s, Context context)
throws IOException, InterruptedException {
System.out.println(k2.getYear()+"-"+k2.getMonth()+"-"+k2.getTemp());
//遍历v2s,取前3个纪录
int num = 0;//计数器
for(Text v2:v2s) {
if(++num>3) {//超过3条,结束循环
break;
}else {
//k3为空
Text v3 = v2;
//写出<k3,v3>
context.write(NullWritable.get(), v3);
}
}
}
}
8、自定义Job
/**
* 自定义温度排序处理任务
*/
public class MyRunJob {
public static void main(String[] args) {
try {
if(args.length!=2) {
System.exit(1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MyRunJob.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//设置自定义分区
job.setPartitionerClass(MyPartitioner.class);
//设置自定义排序
job.setSortComparatorClass(MySort.class);
//设置自定义分组
job.setGroupingComparatorClass(MyGroup.class);
//设置Reducer数量
job.setNumReduceTasks(3);
//设置Mapper输出类型
job.setMapOutputKeyClass(MyKey.class);
job.setMapOutputValueClass(Text.class);
//设置Reducer输出类型
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(Text.class);
//提交job
job.waitForCompletion(true);
} catch (IOException | ClassNotFoundException | InterruptedException e) {
e.printStackTrace();
}
}
}
9、生成jar上传运行
hadoop jar weather-0.0.1-SNAPSHOT.jar com.pzy.weather.MyRunJob /input/weather.txt /outputweather






