Hadoop实战案例—直播数据统计与分析

Hadoop 潘老师 3周前 (11-10) 48 ℃ (0) 扫码查看
文章目录[隐藏]

随着直播行业的崛起,大型互联网直播公司每日都会产生海量的直播数据,为了更好地服务主播与用户,提高直播质量与用户粘性,往往会对大量的数据进行分析与统计,从中挖掘商业价值,我们将通过一个实战案例,来使用Hadoop技术来实现对直播数据的统计与分析。

我这里是简化过后的数据格式,大致如下:

{"id":"1580089010000","uid":"12001002543","nickname":"jack2543","gold":561,"watchnumpv":1697,"follower":1509,"gifter":2920,"watchnumuv":5410,"length":3542,"exp":183}
{"id":"1580089010001","uid":"12001001853","nickname":"jack1853","gold":660,"watchnumpv":8160,"follower":1781,"gifter":551,"watchnumuv":4798,"length":189,"exp":89}
{"id":"1580089010002","uid":"12001003786","nickname":"jack3786","gold":14,"watchnumpv":577,"follower":1759,"gifter":2643,"watchnumuv":8910,"length":1203,"exp":54}

数据格式为json格式,大家可以自己写程序批量随机生成相关数据,当然,这里我也为大家提供模拟数据文件的下载,解压后的文件名为video_20200504.log

文件下载

  文件名称:直播模拟数据  文件大小:206KB
  下载声明:本站资源仅供学习和研究使用,不得用于商业用途。如资源不慎侵犯你的版权请联系博主处理! 本站资源全部采用7z压缩,建议使用360压缩解压,解压密码为www.panziye.com
  下载地址:免费下载

运营部门门需要针对主播每天的开播数据进行分析,统计出来每天受欢迎程度比较高的一些主播,进而对这些主播分发更多流量,挖掘最大价值,我们这里主要做两个具体需求:
1、对数据中的金币数量,总观看pv,粉丝关注数量,视频总开播时长等指标进行统计
2、统计每天开播时长最长的前10名主播及对应的开播时长

具体实现流程我们分为4个大步骤,分别如下:

  • 实现数据清洗
  • 实现指标统计
  • 实现TOP N统计
  • 实现定时任务脚本

1、实现数据清洗

由于原始数据是通过日志方式进行记录的,在使用日志采集工具采集到HDFS之后,还需要对数据进行清洗过滤,丢弃缺失字段的数据,针对异常字段值进行标准化处理。

数据清洗作业:
需求:
1.从原始数据(JSON格式)中过滤出来需要的字段
主播id(uid)、金币数量(gold)、总观看PV(watchnumpv)、粉丝关注数量(follower)、视频开播总时长(length)
2.针对核心字段进行异常值判断
金币数量、总观看PV、粉丝关注数量、视频总开播时长
以上四个字段正常情况下都不应该是负值,也不应该缺失,如果这些字段值为负值,则认为是异常数据,直接丢弃,如果这些字段个别缺失,则认为该字段的值为0
分析:
1:由于原始数据是json格式的,所以可以使用fastison对原始数据进行解析,获取指定字段的内容
2:然后对获取到的数据进行判断,只保留满足条件的数据即可
3:由于不需要聚合过程, 只是一个简单的过滤操作,所以只需要map阶段即可,reduce阶段就不需要了
4:其中map阶段的k1, v1的数据类型是固定的:
k2, v2的数据类型为: k2为空, v2存储核心字段,多个字段中间用\t分割

1)创建Maven项目,pom.xml除了之前的配置,还需要新增fastjson依赖

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>

2)创建清洗Mapper

public class DataCleanMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        //1.将v1转为json对象
        JSONObject json = JSON.parseObject(v1.toString());
        
        //2.提取v1中的核心字段-使用getIntValue而不使用getIntger,因为getIntValue获取不到会返回0
        String uid = json.getString("uid");//主播id(uid)
        int gold = json.getIntValue("gold");//金币数量(gold)
        int watchnumpv = json.getIntValue("watchnumpv");//总观看PV(watchnumpv)
        int follower = json.getIntValue("follower");//粉丝关注数量(follower)
        int length = json.getIntValue("length");//视频开播总时长(length)
        //过滤异常数据
        if(uid!= null && !"".equals(uid) && gold>=0 
                && watchnumpv>=0 && follower>=0 && length>=0) {
            //组装k2,v2
            NullWritable k2 = NullWritable.get();
            Text v2 = new Text(uid+"\t"+gold+"\t"+watchnumpv+"\t"+follower+"\t"+length);
            //将k2,v2写出去
            context.write(k2, v2);
        }
    }
}

3)创建清洗JOB

public class DataCleanJob {
    public static void main(String[] args) {
        try {
            if(args.length!=2) {
                System.exit(1);
            }
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(DataCleanJob.class);
            job.setMapperClass(DataCleanMapper.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            //设置reduce数量为0
            job.setNumReduceTasks(0);
            //设置Mapper输出类型
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            //提交job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4)打包提交运行测试
注意上传的jar包是带有依赖的jar,因为我们要使用fastjson,video log日志文件上传到/data/videoinfo/日志日期目录递归创建/data/videoinfo/日志日期目录语句:

hdfs dfs -mkdir -p /data/videoinfo/20200504

运行

hadoop jar video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.clean.DataCleanJob /data/videoinfo/20200504/video_20200504.log /res0504

查看清洗后的总纪录数:

hdfs dfs -cat /res0504/* | wc -l

Hadoop实战案例—直播数据统计与分析
查看原始纪录数:
Hadoop实战案例—直播数据统计与分析

2、指标统计实现

对数据中的金币数量,总观看pv,粉丝关注数量,视频总开播时长等指标进行统计。
1)自定义bean,封装金币数量,总观看pv,粉丝关注数量,视频总开播时长等统计指标

public class VideoInfoWritable implements Writable {

    private long gold;
    private long watchnumpv;
    private long follower;
    private long length;
    
    public long getGold() {
        return gold;
    }

    public void setGold(long gold) {
        this.gold = gold;
    }

    public long getWatchnumpv() {
        return watchnumpv;
    }

    public void setWatchnumpv(long watchnumpv) {
        this.watchnumpv = watchnumpv;
    }

    public long getFollower() {
        return follower;
    }

    public void setFollower(long follower) {
        this.follower = follower;
    }

    public long getLength() {
        return length;
    }

    public void setLength(long length) {
        this.length = length;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(gold);
        out.writeLong(watchnumpv);
        out.writeLong(follower);
        out.writeLong(length);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        gold = in.readLong();
        watchnumpv = in.readLong();
        follower = in.readLong();
        length = in.readLong();
    }

    @Override
    public String toString() {
        return gold+"\t"+watchnumpv+"\t"+follower+"\t"+length;
    }
}

2)自定义Mapper

/**
 * 统计金币数量,总观看pv,粉丝关注数量,视频总开播时长等指标
 * k1,v1来自清洗后的文件
 */
public class VideoInfoMapper extends Mapper<LongWritable, Text, Text, VideoInfoWritable> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String[] line = v1.toString().split("\t");
        //获取指标
        String uid = line[0];
        long gold = Long.parseLong(line[1]);
        long watchnumpv = Long.parseLong(line[2]);
        long follower = Long.parseLong(line[3]);
        long length = Long.parseLong(line[4]);
        //组装k2,v2
        Text k2 = new Text(uid);
        VideoInfoWritable v2 = new VideoInfoWritable();
        v2.setGold(gold);
        v2.setWatchnumpv(watchnumpv);
        v2.setFollower(follower);
        v2.setLength(length);
        //将k2,v2写出去
        context.write(k2, v2);
    }
}

3)自定义Reducer

public class VideoInfoReducer extends Reducer<Text, VideoInfoWritable, Text, VideoInfoWritable>{

    @Override
    protected void reduce(Text k2, Iterable<VideoInfoWritable> v2s,Context context)
            throws IOException, InterruptedException {
        //遍历v2s,对对应的指标求和
        long goldsum = 0;
        long watchnumpvsum = 0;
        long followersum = 0;
        long lengthsum = 0;
        for(VideoInfoWritable v2:v2s) {
            goldsum += v2.getGold();
            watchnumpvsum += v2.getWatchnumpv();
            followersum += v2.getFollower();
            lengthsum += v2.getLength();
        }
        //组装k3,v3
        Text k3 = k2;
        VideoInfoWritable v3 = new VideoInfoWritable();
        v3.setGold(goldsum);
        v3.setWatchnumpv(watchnumpvsum);
        v3.setFollower(followersum);
        v3.setLength(lengthsum);
        //将k3,v3写出去
        context.write(k3, v3);
    }
}

4)自定义JOB

public class VideoInfoJob {
    public static void main(String[] args) {
        try {
            if(args.length!=2) {
                System.exit(0);
            }
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(VideoInfoJob.class);
            job.setMapperClass(VideoInfoMapper.class);
            job.setReducerClass(VideoInfoReducer.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            //设置Mapper输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(VideoInfoWritable.class);
            //设置Reducer输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(VideoInfoWritable.class);
            //提交job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、实现TOP N统计

统计每天开播时长最 长的前10名主播及对应的开播时长

分析:基于清洗后的数据进行统计TOP10。
1:为了统计每天开播最长的前10名主播信息,需要在map阶段获取数据中每个主播的id和直播时长
2:所以map阶段的<k2,v2> 为<Text, LongWritable>
3:在Reduce端对相同主播的直播时长进行累加求和,把这些数据存储到一 个临时的map集合中
4:在Reduce 端的cleanup函数中对map集合中的数据根据直播时长进行排序
注意:cleanup只执行一次,会在所有的reduce执行完,可以将最终数据进行筛选。
5:最后在cleanup函数中把直播时长最长的前10名主播的信息写出到hdfs.文件中

注意:要求根据输入路径参数获取日期,将日期以yyyy-MM-dd的格式放到k3中一起写出来,方便导入到mysql数据库能看到数据对应的日期。

1)自定义Mapper

public class VideoInfoTop10Mapper extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable k1, Text v1,Context context)
            throws IOException, InterruptedException {
        String[] line = v1.toString().split("\t");
        String uid = line[0];
        long length = Long.parseLong(line[4]);
        //组装k2,v2
        Text k2 = new Text(uid);
        LongWritable v2 = new LongWritable(length);
        //将k2,v2写出去
        context.write(k2, v2);
    }
}

2)自定义Reducer

public class VideoInfoTop10Reducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    //存储最终所有数据
    private Map<String,Long> map = new HashMap<String,Long>();

    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context) throws IOException, InterruptedException {
        long lengthsum = 0;
        for(LongWritable v2:v2s) {
            lengthsum += v2.get();
        }
        map.put(k2.toString(), lengthsum);
    }

    //任务初始化的时候执行一次,仅执行一次,一般在里面做一些初始化资源链接的动作
    @Override
    protected void setup(Context context)
            throws IOException, InterruptedException {
        super.setup(context);
    }
    //任务结束的时候执行次,仅执行一次,做一些关闭资源的操作
    @Override
    protected void cleanup(Context context)
            throws IOException, InterruptedException {
        //获取日期
        Configuration conf = context.getConfiguration();
        String dt = conf.get("dt");
        //对map按照length降序排序
        Map<String,Long> sortedMap = MapUtils.sortByValueDesc(map);
        int count = 1;
        Set<Entry<String, Long>> set = sortedMap.entrySet();
        Iterator<Entry<String, Long>> it = set.iterator();
        while(count<=10 && it.hasNext()) {
            Entry<String, Long> entry =  it.next();
            //封装k3,v3
            Text k3 = new Text(dt+"\t"+entry.getKey());
            LongWritable v3 = new LongWritable(entry.getValue());
            //将k3,v3写出去
            context.write(k3, v3);
            count++;
        }
    }
}

3)MapUtils

package com.pzy.utils;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import org.checkerframework.checker.units.qual.K;

public class MapUtils {
    //降序排序
    public static <K, V extends Comparable<? super V>> Map<K, V> sortByValueDesc(Map<K, V> map)
    {
        List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, V>>(map.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<K, V>>()
        {
            @Override
            public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2)
            {
                int compare = (o1.getValue()).compareTo(o2.getValue());
                return -compare;
            }
        });

        Map<K, V> result = new LinkedHashMap<K, V>();
        for (Map.Entry<K, V> entry : list) {
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
    //升序排序
    public static <K, V extends Comparable<? super V>> Map<K, V> sortByValueAsc(Map<K, V> map)
    {
        List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, V>>(map.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<K, V>>()
        {
            @Override
            public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2)
            {
                int compare = (o1.getValue()).compareTo(o2.getValue());
                return compare;
            }
        });

        Map<K, V> result = new LinkedHashMap<K, V>();
        for (Map.Entry<K, V> entry : list) {
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
}

4)自定义job

public class VideoInfoTop10Job {
    public static void main(String[] args) {
        try {
            if(args.length!=2) {
                System.exit(1);
            }
            //从输入参数中获取日期并转为yyyy-MM-dd格式,方便导入mysql数据库
            String[] fields = args[0].split("/");
            String tempDt = fields[fields.length-1];
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
            SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd");
            String dt = sdf2.format(sdf.parse(tempDt));
            
            Configuration conf = new Configuration();
            //为了方便Map和Reduce使用dt参数,可以将dt存入conf中
            conf.set("dt", dt);
            
            Job job = Job.getInstance(conf);
            job.setJarByClass(VideoInfoTop10Job.class);
            job.setMapperClass(VideoInfoTop10Mapper.class);
            job.setReducerClass(VideoInfoTop10Reducer.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            //设置Mapper输出类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            //设置Reducer输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            //提交job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

4、实现定时任务脚本

1)把任务提交命令进行封装,方便使用,便于定时任务调度
2)脚本开发
步骤:
a.在video项目下新建bin目录,里面新建名为video_handler.sh的的脚本,创建后eclipse提示商店有支持shell的编辑器,我们可以去安装
Hadoop实战案例—直播数据统计与分析
Hadoop实战案例—直播数据统计与分析
b.脚本
在eclipse中创建名为video_handler.sh的脚本文件,代码如下:

注意:if [ “X$1” = “X” ] 中5个空格一个不能少。 if(空格1)[(空格2)”X$1″(空格3)=(空格4)”X”(空格5)]

#!/bin/bash
# 获取环境变量
source /etc/profile

# 判断用户是否输入日期参数,如果输入则处理对应日期数据,否则默认处理昨天数据
if [ "X$1" = "X" ]
then
    yes_time=`date +%Y%m%d --date="1 days ago"`
else
    yes_time=$1
fi
# 定义clean job的输入输出路径
cleanjob_input=hdfs://master:9820/data/videoinfo/${yes_time}
cleanjob_output=hdfs://master:9820/data/videoinfo_clean/${yes_time}

# 定义统计指标任务1输入输出路径
videoinfojob_input=${cleanjob_output}
videoinfojob_output=hdfs://master:9820/res/videoinfojob/${yes_time}

# 定义统计指标任务2输入输出路径
videoinfotop10job_input=${cleanjob_output}
videoinfotop10job_output=hdfs://master:9820/res/videoinfotop10job/${yes_time}

# 定义jar包统一路径
jobs_home=/usr/hadoop/jobs

# 删除输出目录,为了兼容脚本重跑情况
hdfs dfs -rm -r ${cleanjob_output}
hdfs dfs -rm -r ${videoinfojob_output}
hdfs dfs -rm -r ${videoinfotop10job_output}
# 执行数据清洗任务 \表示换行,该指令还未结束
hadoop jar \
${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.clean.DataCleanJob \
${cleanjob_input} ${cleanjob_output}

# 判断数据清洗任务是否执行成功-查看输出目录的_SUCCESS文件
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
# $?指令的返回值如果是0表示上一条指令执行成功,非0则失败
if [ "$?" = "0" ]
then
    echo "cleanjob execute success....."
     # 执行统计指标任务1
    hadoop jar \
    ${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.videoinfo.VideoInfoJob \
    ${videoinfojob_input} ${videoinfojob_output}
 
     #执行统计指标任务2
     hadoop jar \
    ${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.top10.VideoInfoTop10Job \
    ${videoinfotop10job_input} ${videoinfotop10job_output}
else
    echo "cleanjob execute failed.....date is ${yes_time}"
fi

c.将jar传到jobs目录
将脚本新建到jobs目录下:
指令:

sudo vi video_handler.sh

注意:
1)脚本复制内容到linux系统创建(如果上传运行会报错,因为windows下换行符和linux下不一样)。
2)粘贴式要注意可能会有部分代码丢失,你需要去查看,如果丢失就补齐。
3)给 video_handler.sh赋予权限,否则hadoop用户运行时会无权限执行

赋予权限指令:

sudo chmod +x video_handler.sh

d.手工执行脚本
-x 代表调试模式

sh -x video_handler.sh 参数(可选)

e.在hadoop目录下新建joblogs目录,指令:

mkdir joblogs

注意:不能用sudo去创建,否则hadoop执行脚本时会提示无操作权限

f.定时执行脚本,时间每天0点30分,执行日志重定向到/usr/hadoop/joblogs/video_handler.log
需要把下面这行代码:

30 00 * * *  /usr/hadoop/jobs/video_handler.sh >> /usr/hadoop/joblogs/video_handler.log

添加到当前用户的crontab中(添加到/etc/crontab中也可以,不过需要在shell脚本路径前指定执行用户为hadoop),指令:

crontab -e

ps:定时任务执行脚本出现问题,可以去应的mail下去查看:

sudo cat /var/spool/mail/hadoop

版权声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系潘老师进行处理。
喜欢 (1)
请潘老师喝杯Coffee吧!】
分享 (0)

您必须 微信登录 才能发表评论!

登录