Hadoop经典综合性案例—温度排序示例

大数据技术 潘老师 4年前 (2020-11-09) 2063 ℃ (0) 扫码查看

温度排序示例是一个综合性比较强的Hadoop经典案例,除了基础的MapReduce,还有自定义序列化对象、分区、分组、自定义排序等相关知识,对于刚入门的同学来说,理解起来可能会稍有困难。

假设有多年气温数据,如下:

1949-10-01 14:21:02    34
1949-10-02 15:01:01    36
1949-10-03 15:01:01    39
1949-10-04 17:01:01    38
1949-10-05 18:01:01    42
1950-01-01 11:21:02    32
1950-01-02 12:21:02    37
1950-10-03 12:21:02    27
1950-10-01 12:21:02    37
1950-10-02 12:21:02    41
1951-07-01 12:21:02    45
1951-07-02 13:21:02    41
1951-12-01 12:21:02    20
1951-12-02 13:21:02    27
1951-07-03 11:21:03    47
1951-07-04 14:21:03    39
1951-07-05 14:21:03    43
1951-12-03 12:21:02    23

注意:模拟数据存放在weather.txt中,时间和温度之间不是空格,而是tab键隔开的。

现在要求找到每年每月的3个最高温度时刻并按照温度进行降序排序,同时由于数据数量可能会很大,为了提高效率,将每一年的数据分别交给不同的reduce执行,产生不同的文件。

1、将weather.txt上传至HDFS的/input目录下

2、自定义时间温度的封装类MyKey.java

/**
 * 自定义时间温度的封装类
 */
public class MyKey implements WritableComparable<MyKey> {

    private int year;//年
    private int month;//月
    private int temp;//温度
    
    public int getYear() {
        return year;
    }

    public void setYear(int year) {
        this.year = year;
    }

    public int getMonth() {
        return month;
    }

    public void setMonth(int month) {
        this.month = month;
    }

    public int getTemp() {
        return temp;
    }

    public void setTemp(int temp) {
        this.temp = temp;
    }

    //序列化:通过write方法写入序列化数据流
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(year);
        out.writeInt(month);
        out.writeInt(temp);
    }

    //反序列化:通过readFields方法从序列化的数据流中读出进行赋值
    @Override
    public void readFields(DataInput in) throws IOException {
        //注意读取顺序要和序列化顺序保持一致
        year = in.readInt();
        month = in.readInt();
        temp = in.readInt();
    }

    //按照字典顺序进行比较,返回值是一个int型
    public int compareTo(MyKey mykey) {
        return this==mykey?0:-1;
    }
}

3、自定义Mapper

/**
 * 自定义Mapper
 * 对k1,v1进行处理,提取年、月和温度,封装成MyKey
 */
public class MyMapper extends Mapper<LongWritable, Text, MyKey, Text> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        //按照tab键切割v1
        String[] datas = v1.toString().split("\t");
        //获取日期
        String date = datas[0];
        //获取温度
        int temp = Integer.parseInt(datas[1]);
        //切割日期,获取年和月
        String[] dates = date.split("-");
        int year = Integer.parseInt(dates[0]);
        int month = Integer.parseInt(dates[1]);
        //组装<k2,v2>
        MyKey k2 = new MyKey();
        k2.setYear(year);
        k2.setMonth(month);
        k2.setTemp(temp);
        Text v2 = v1;
        //写出<k2,v2>
        context.write(k2, v2);
    }
}

4、自定义分区

/**
 * 自定义分区:溢写之前对<k2,v2>进行分区
 * 要求相同年份的数据在同一个分区,我们这针对1949年、1950年和1951年分成对应
 * 3个分区,对应3个reduce task进行处理
 */
public class MyPartitioner extends Partitioner<MyKey, Text> {

    @Override
    public int getPartition(MyKey k2, Text v2, int numPartitions) {
        /*
         * 假如1949年、1950年和1951年这3年:
         * 1949-1949=0;得到第1个分区
         * 1950-1949=1;得到第2个分区
         * 1952-1949=2;得到第3个分区
         */
        return (k2.getYear()-1949)%numPartitions;
    }

}

5、自定义Sort

/**
 * 自定义排序比较器:在溢写之前,分区之后对分区内<k2,v2>按照k2或者k2的部分进行排序
 * 这里我们按照年、月、温度进行排序,先以年做比较,如果年相等就比较月,年不相等
 * 就返回比较结果,如果月相等就以温度倒序排序,月不相等就返回比较结果
 * 需要继承WritableComparator,重新compare(WritableComparable a, WritableComparable b)方法
 */
public class MySort extends WritableComparator {
    //使用super()调用序列化构造函数
    public MySort() {
        super(MyKey.class,true);
    }
    //重写compare
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //将WritableComparable强转为MyKey
        MyKey mykey1 = (MyKey)a;
        MyKey mykey2 = (MyKey)b;
        //先比较年
        int r1 = Integer.compare(mykey1.getYear(), mykey2.getYear());
        //如果年相等,就比较月
        if(r1==0) {
            int r2 = Integer.compare(mykey1.getMonth(), mykey2.getMonth());
            //如果月相等,就对温度进行倒序排序,加-,表示倒序排序
            if(r2==0) {
                return -Integer.compare(mykey1.getTemp(), mykey2.getTemp());
            }
            return r2;
        }
        return r1;
    }
}

6、自定义Group

/**
 * 自定义分组比较器:在溢写之前,分区排序之后,对同一分区内排序好的<k2,v2>按照k2或k2的部分进行分组
 * 相同key的数据对应的value制作成一个iterable
 * 这里我们按照年月进行分组,同年同月分一组,先以年做比较,如果年不相等返回比较结果,
 * 如果年相等,返回月份的比较值
 */
public class MyGroup extends WritableComparator {
    //使用super()调用序列化构造函数
    public MyGroup() {
        super(MyKey.class,true);
    }
    //重写compare
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        //将WritableComparable强转为MyKey
        MyKey mykey1 = (MyKey)a;
        MyKey mykey2 = (MyKey)b;
        //先以年做比较
        int r1 = Integer.compare(mykey1.getYear(), mykey2.getYear());
        //如果年相等,返回月份的比较值
        if(r1==0) {
            return Integer.compare(mykey1.getMonth(), mykey2.getMonth());
        }
        return r1;
    }
    
}

7、自定义Reducer

public class MyReducer extends Reducer<MyKey, Text, NullWritable, Text> {

    @Override
    protected void reduce(MyKey k2, Iterable<Text> v2s, Context context)
            throws IOException, InterruptedException {
        System.out.println(k2.getYear()+"-"+k2.getMonth()+"-"+k2.getTemp());
        //遍历v2s,取前3个纪录
        int num = 0;//计数器
        for(Text v2:v2s) {
            if(++num>3) {//超过3条,结束循环
                break;
            }else {
                //k3为空
                Text v3 = v2;
                //写出<k3,v3>
                context.write(NullWritable.get(), v3);
            }
        }
    }
}

8、自定义Job

/**
 * 自定义温度排序处理任务
 */
public class MyRunJob {
    public static void main(String[] args) {
        try {
            if(args.length!=2) {
                System.exit(1);
            }
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(MyRunJob.class);
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            //设置自定义分区
            job.setPartitionerClass(MyPartitioner.class);
            //设置自定义排序
            job.setSortComparatorClass(MySort.class);
            //设置自定义分组
            job.setGroupingComparatorClass(MyGroup.class);
            //设置Reducer数量
            job.setNumReduceTasks(3);
            //设置Mapper输出类型
            job.setMapOutputKeyClass(MyKey.class);
            job.setMapOutputValueClass(Text.class);
            //设置Reducer输出类型
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(Text.class);
            //提交job
            job.waitForCompletion(true);
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            e.printStackTrace();
        }
        
    }
}

9、生成jar上传运行

hadoop jar weather-0.0.1-SNAPSHOT.jar com.pzy.weather.MyRunJob /input/weather.txt /outputweather

10、查看结果
Hadoop经典综合性案例—温度排序示例


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/java/bigdata/1586.html
喜欢 (7)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】