With the rise of the livestreaming industry, large livestreaming companies generate massive amounts of broadcast data every day. To better serve streamers and viewers, and to improve stream quality and user stickiness, they analyze and aggregate large volumes of this data to mine its commercial value. In this hands-on case study we will use Hadoop to compute statistics on and analyze livestream data.
The data format used here has been simplified and looks roughly like this:
{"id":"1580089010000","uid":"12001002543","nickname":"jack2543","gold":561,"watchnumpv":1697,"follower":1509,"gifter":2920,"watchnumuv":5410,"length":3542,"exp":183} {"id":"1580089010001","uid":"12001001853","nickname":"jack1853","gold":660,"watchnumpv":8160,"follower":1781,"gifter":551,"watchnumuv":4798,"length":189,"exp":89} {"id":"1580089010002","uid":"12001003786","nickname":"jack3786","gold":14,"watchnumpv":577,"follower":1759,"gifter":2643,"watchnumuv":8910,"length":1203,"exp":54}
The data is in JSON format. You can write a program of your own to generate similar records in bulk at random; a mock data file is also provided for download, which unpacks to a file named video_20200504.log.
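If you prefer to generate the data yourself, a rough sketch of such a generator is shown below (the class name, record count, and field value ranges are illustrative assumptions, not the exact generator behind the provided file):

```java
import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class MockDataGenerator {
    public static void main(String[] args) throws IOException {
        Random r = new Random();
        try (FileWriter fw = new FileWriter("video_20200504.log")) {
            for (int i = 0; i < 1000; i++) {
                int suffix = 1000000 + r.nextInt(9000000);
                // One JSON record per line, matching the simplified format shown above
                String line = "{\"id\":\"158008901" + String.format("%04d", i) + "\""
                        + ",\"uid\":\"1200" + suffix + "\""
                        + ",\"nickname\":\"jack" + suffix % 10000 + "\""
                        + ",\"gold\":" + r.nextInt(1000)
                        + ",\"watchnumpv\":" + r.nextInt(10000)
                        + ",\"follower\":" + r.nextInt(3000)
                        + ",\"gifter\":" + r.nextInt(3000)
                        + ",\"watchnumuv\":" + r.nextInt(10000)
                        + ",\"length\":" + r.nextInt(4000)
                        + ",\"exp\":" + r.nextInt(200) + "}\n";
                fw.write(line);
            }
        }
    }
}
```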
The operations team needs to analyze streamers' daily broadcast data and identify the most popular streamers each day, so that more traffic can be routed to them and their value maximized. We will implement two concrete requirements:
1. Aggregate metrics such as gold-coin count, total watch PV, follower count, and total broadcast length
2. Compute the top 10 streamers by daily broadcast length, together with their broadcast length
The implementation is divided into four major steps:
- Data cleaning
- Metric aggregation
- TOP N statistics
- Scheduled task script
1. Data Cleaning
The raw data is recorded as logs. After it has been collected into HDFS with a log collection tool, it still needs to be cleaned and filtered: records with missing fields are discarded and abnormal field values are normalized.
Requirements:
1. Extract the required fields from the raw data (JSON format):
streamer id (uid), gold-coin count (gold), total watch PV (watchnumpv), follower count (follower), total broadcast length (length)
2. Check the core fields for abnormal values:
gold-coin count, total watch PV, follower count, total broadcast length
None of these four fields should normally be negative or missing. If any of them is negative, the record is treated as abnormal and discarded; if one of them is merely missing, its value is treated as 0.
Analysis:
1. Since the raw data is in JSON format, we can use fastjson to parse it and extract the fields we need.
2. Then check the extracted values and keep only the records that meet the conditions.
3. Since no aggregation is needed and this is just a simple filter, a map phase is sufficient; no reduce phase is required.
4. The data types of k1 and v1 in the map phase are fixed: <LongWritable, Text>. The data types of k2 and v2 are <NullWritable, Text>.
1) Create a Maven project. Besides the configuration from earlier chapters, pom.xml needs a new fastjson dependency:
```xml
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.68</version>
</dependency>
```
2) Create the cleaning Mapper
```java
public class DataCleanMapper extends Mapper<LongWritable, Text, NullWritable, Text> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        // 1. Parse v1 into a JSON object
        JSONObject json = JSON.parseObject(v1.toString());
        // 2. Extract the core fields. Use getIntValue instead of getInteger,
        //    because getIntValue returns 0 when the field is missing.
        String uid = json.getString("uid");               // streamer id (uid)
        int gold = json.getIntValue("gold");              // gold-coin count (gold)
        int watchnumpv = json.getIntValue("watchnumpv");  // total watch PV (watchnumpv)
        int follower = json.getIntValue("follower");      // follower count (follower)
        int length = json.getIntValue("length");          // total broadcast length (length)
        // Filter out abnormal records
        if (uid != null && !"".equals(uid) && gold >= 0 && watchnumpv >= 0
                && follower >= 0 && length >= 0) {
            // Assemble k2, v2
            NullWritable k2 = NullWritable.get();
            Text v2 = new Text(uid + "\t" + gold + "\t" + watchnumpv + "\t" + follower + "\t" + length);
            // Write out k2, v2
            context.write(k2, v2);
        }
    }
}
```
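To make the getIntValue comment concrete, here is a minimal standalone illustration of fastjson's behavior on a missing key (the class name is hypothetical and the snippet is not part of the job code):

```java
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;

public class FastjsonMissingFieldDemo {
    public static void main(String[] args) {
        JSONObject json = JSON.parseObject("{\"uid\":\"12001002543\",\"gold\":561}");
        System.out.println(json.getIntValue("gold"));      // 561
        System.out.println(json.getIntValue("follower"));  // key missing -> 0
        System.out.println(json.getInteger("follower"));   // key missing -> null
    }
}
```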
3) Create the cleaning Job
```java
public class DataCleanJob {
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.exit(1);
            }
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(DataCleanJob.class);
            job.setMapperClass(DataCleanMapper.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Set the number of reduce tasks to 0 (map-only job)
            job.setNumReduceTasks(0);
            // Set the Mapper output types
            job.setMapOutputKeyClass(NullWritable.class);
            job.setMapOutputValueClass(Text.class);
            // Submit the job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
4) Package, upload, and run a test
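Building the *-jar-with-dependencies.jar referenced below is typically done with the maven-assembly-plugin. If your pom.xml from the earlier chapters does not already contain it, a minimal sketch is shown here (this plugin configuration is an assumption, not taken from the original project); run mvn clean package afterwards to produce the jar:

```xml
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```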
Note that the jar you upload must be the jar with dependencies, because we use fastjson. The video log file goes into a dated directory under /data/videoinfo/ on HDFS. Create the dated log directory recursively with:
hdfs dfs -mkdir -p /data/videoinfo/20200504
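Then upload the log file into that directory (assuming video_20200504.log is in the current local directory):
hdfs dfs -put video_20200504.log /data/videoinfo/20200504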
Run the job:
hadoop jar video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.clean.DataCleanJob /data/videoinfo/20200504/video_20200504.log /res0504
Check the total number of records after cleaning:
hdfs dfs -cat /res0504/* | wc -l
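As an optional sanity check (not part of the original steps), running the same count on the raw input directory shows how many records the cleaning step dropped:
hdfs dfs -cat /data/videoinfo/20200504/* | wc -l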
2. Metric Aggregation
Aggregate metrics such as gold-coin count, total watch PV, follower count, and total broadcast length.
1) Define a custom bean that wraps the gold-coin count, total watch PV, follower count, and total broadcast length statistics
```java
public class VideoInfoWritable implements Writable {

    private long gold;
    private long watchnumpv;
    private long follower;
    private long length;

    public long getGold() { return gold; }
    public void setGold(long gold) { this.gold = gold; }
    public long getWatchnumpv() { return watchnumpv; }
    public void setWatchnumpv(long watchnumpv) { this.watchnumpv = watchnumpv; }
    public long getFollower() { return follower; }
    public void setFollower(long follower) { this.follower = follower; }
    public long getLength() { return length; }
    public void setLength(long length) { this.length = length; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(gold);
        out.writeLong(watchnumpv);
        out.writeLong(follower);
        out.writeLong(length);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        gold = in.readLong();
        watchnumpv = in.readLong();
        follower = in.readLong();
        length = in.readLong();
    }

    @Override
    public String toString() {
        return gold + "\t" + watchnumpv + "\t" + follower + "\t" + length;
    }
}
```
2) Custom Mapper
```java
/**
 * Aggregates gold-coin count, total watch PV, follower count, and total broadcast length.
 * k1, v1 come from the cleaned file.
 */
public class VideoInfoMapper extends Mapper<LongWritable, Text, Text, VideoInfoWritable> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String[] line = v1.toString().split("\t");
        // Extract the metrics
        String uid = line[0];
        long gold = Long.parseLong(line[1]);
        long watchnumpv = Long.parseLong(line[2]);
        long follower = Long.parseLong(line[3]);
        long length = Long.parseLong(line[4]);
        // Assemble k2, v2
        Text k2 = new Text(uid);
        VideoInfoWritable v2 = new VideoInfoWritable();
        v2.setGold(gold);
        v2.setWatchnumpv(watchnumpv);
        v2.setFollower(follower);
        v2.setLength(length);
        // Write out k2, v2
        context.write(k2, v2);
    }
}
```
3) Custom Reducer
```java
public class VideoInfoReducer extends Reducer<Text, VideoInfoWritable, Text, VideoInfoWritable> {

    @Override
    protected void reduce(Text k2, Iterable<VideoInfoWritable> v2s, Context context)
            throws IOException, InterruptedException {
        // Iterate over v2s and sum up each metric
        long goldsum = 0;
        long watchnumpvsum = 0;
        long followersum = 0;
        long lengthsum = 0;
        for (VideoInfoWritable v2 : v2s) {
            goldsum += v2.getGold();
            watchnumpvsum += v2.getWatchnumpv();
            followersum += v2.getFollower();
            lengthsum += v2.getLength();
        }
        // Assemble k3, v3
        Text k3 = k2;
        VideoInfoWritable v3 = new VideoInfoWritable();
        v3.setGold(goldsum);
        v3.setWatchnumpv(watchnumpvsum);
        v3.setFollower(followersum);
        v3.setLength(lengthsum);
        // Write out k3, v3
        context.write(k3, v3);
    }
}
```
4) Custom Job
```java
public class VideoInfoJob {
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.exit(1);
            }
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(VideoInfoJob.class);
            job.setMapperClass(VideoInfoMapper.class);
            job.setReducerClass(VideoInfoReducer.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(VideoInfoWritable.class);
            // Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(VideoInfoWritable.class);
            // Submit the job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
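For a quick manual test, this job can be submitted the same way as the clean job, with the clean job's output directory as input (the output path below is an illustrative choice):
hadoop jar video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.videoinfo.VideoInfoJob /res0504 /res0504_videoinfo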
3. TOP N Statistics
Compute the top 10 streamers by broadcast length each day, together with their broadcast length.
1. To find the 10 streamers with the longest daily broadcast time, the map phase extracts each streamer's id and broadcast length from the data.
2. The <k2, v2> types of the map phase are therefore <Text, LongWritable>.
3. On the reduce side, the broadcast lengths of each streamer are summed and the results are stored in a temporary map collection.
4. In the reducer's cleanup method, the entries of that map are sorted by broadcast length.
Note: cleanup runs exactly once, after all reduce calls have finished, so it is the right place to select the final records.
5. Finally, still in cleanup, the 10 streamers with the longest broadcast time are written to a file on HDFS.
1) Custom Mapper
```java
public class VideoInfoTop10Mapper extends Mapper<LongWritable, Text, Text, LongWritable> {

    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        String[] line = v1.toString().split("\t");
        String uid = line[0];
        long length = Long.parseLong(line[4]);
        // Assemble k2, v2
        Text k2 = new Text(uid);
        LongWritable v2 = new LongWritable(length);
        // Write out k2, v2
        context.write(k2, v2);
    }
}
```
2) Custom Reducer
```java
public class VideoInfoTop10Reducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    // Holds the aggregated broadcast length of every streamer
    private Map<String, Long> map = new HashMap<String, Long>();

    @Override
    protected void reduce(Text k2, Iterable<LongWritable> v2s, Context context)
            throws IOException, InterruptedException {
        long lengthsum = 0;
        for (LongWritable v2 : v2s) {
            lengthsum += v2.get();
        }
        map.put(k2.toString(), lengthsum);
    }

    // Runs exactly once when the task starts; usually used to initialize resources/connections
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
    }

    // Runs exactly once when the task ends; usually used to release resources
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        // Get the date
        Configuration conf = context.getConfiguration();
        String dt = conf.get("dt");
        // Sort the map by length in descending order
        Map<String, Long> sortedMap = MapUtils.sortByValueDesc(map);
        int count = 1;
        Set<Entry<String, Long>> set = sortedMap.entrySet();
        Iterator<Entry<String, Long>> it = set.iterator();
        while (count <= 10 && it.hasNext()) {
            Entry<String, Long> entry = it.next();
            // Assemble k3, v3
            Text k3 = new Text(dt + "\t" + entry.getKey());
            LongWritable v3 = new LongWritable(entry.getValue());
            // Write out k3, v3
            context.write(k3, v3);
            count++;
        }
    }
}
```
3) MapUtils utility class
```java
package com.pzy.utils;

import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

public class MapUtils {

    // Sort a map by value in descending order
    public static <K, V extends Comparable<? super V>> Map<K, V> sortByValueDesc(Map<K, V> map) {
        List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, V>>(map.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<K, V>>() {
            @Override
            public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
                int compare = (o1.getValue()).compareTo(o2.getValue());
                return -compare;
            }
        });
        Map<K, V> result = new LinkedHashMap<K, V>();
        for (Map.Entry<K, V> entry : list) {
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }

    // Sort a map by value in ascending order
    public static <K, V extends Comparable<? super V>> Map<K, V> sortByValueAsc(Map<K, V> map) {
        List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, V>>(map.entrySet());
        Collections.sort(list, new Comparator<Map.Entry<K, V>>() {
            @Override
            public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2) {
                int compare = (o1.getValue()).compareTo(o2.getValue());
                return compare;
            }
        });
        Map<K, V> result = new LinkedHashMap<K, V>();
        for (Map.Entry<K, V> entry : list) {
            result.put(entry.getKey(), entry.getValue());
        }
        return result;
    }
}
```
4) Custom Job
```java
public class VideoInfoTop10Job {
    public static void main(String[] args) {
        try {
            if (args.length != 2) {
                System.exit(1);
            }
            // Extract the date from the input path and convert it to yyyy-MM-dd,
            // which is convenient for later import into a MySQL database
            String[] fields = args[0].split("/");
            String tempDt = fields[fields.length - 1];
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
            SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd");
            String dt = sdf2.format(sdf.parse(tempDt));
            Configuration conf = new Configuration();
            // Store dt in the configuration so the Mapper and Reducer can read it
            conf.set("dt", dt);
            Job job = Job.getInstance(conf);
            job.setJarByClass(VideoInfoTop10Job.class);
            job.setMapperClass(VideoInfoTop10Mapper.class);
            job.setReducerClass(VideoInfoTop10Reducer.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            // Set the Mapper output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // Set the Reducer output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // Submit the job
            job.waitForCompletion(true);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```
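When running this job manually, the last segment of the input path must be a yyyyMMdd date, because the job parses it into the dt value that ends up in the output key. An illustrative invocation, using the directory layout of the script in the next section:
hadoop jar video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.top10.VideoInfoTop10Job hdfs://master:9820/data/videoinfo_clean/20200504 hdfs://master:9820/res/videoinfotop10job/20200504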
4. Scheduled Task Script
1) Wrap the job submission commands in a script so they are easy to run and to schedule.
2) Script development
Steps:
a. Create a bin directory under the video project and, inside it, a script named video_handler.sh. After it is created, Eclipse suggests a shell editor from its marketplace, which you can install.
b. Create the script file named video_handler.sh in Eclipse with the following contents:
```shell
#!/bin/bash
# Load environment variables
source /etc/profile

# If the user passes a date argument, process that day's data;
# otherwise default to processing yesterday's data
if [ "X$1" = "X" ]
then
    yes_time=`date +%Y%m%d --date="1 days ago"`
else
    yes_time=$1
fi

# Input and output paths of the clean job
cleanjob_input=hdfs://master:9820/data/videoinfo/${yes_time}
cleanjob_output=hdfs://master:9820/data/videoinfo_clean/${yes_time}
# Input and output paths of metric job 1
videoinfojob_input=${cleanjob_output}
videoinfojob_output=hdfs://master:9820/res/videoinfojob/${yes_time}
# Input and output paths of metric job 2
videoinfotop10job_input=${cleanjob_output}
videoinfotop10job_output=hdfs://master:9820/res/videoinfotop10job/${yes_time}
# Common directory holding the jar
jobs_home=/usr/hadoop/jobs

# Delete the output directories so the script can be re-run safely
hdfs dfs -rm -r ${cleanjob_output}
hdfs dfs -rm -r ${videoinfojob_output}
hdfs dfs -rm -r ${videoinfotop10job_output}

# Run the data cleaning job ("\" continues the command on the next line)
hadoop jar \
${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.clean.DataCleanJob \
${cleanjob_input} ${cleanjob_output}

# Check whether the clean job succeeded by looking for the _SUCCESS file in its output directory
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
# $? holds the return code of the previous command: 0 means success, anything else means failure
if [ "$?" = "0" ]
then
    echo "cleanjob execute success....."
    # Run metric job 1
    hadoop jar \
    ${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.videoinfo.VideoInfoJob \
    ${videoinfojob_input} ${videoinfojob_output}
    # Run metric job 2
    hadoop jar \
    ${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.top10.VideoInfoTop10Job \
    ${videoinfotop10job_input} ${videoinfotop10job_output}
else
    echo "cleanjob execute failed.....date is ${yes_time}"
fi
```
c. Upload the jar to the jobs directory, and create the script under the jobs directory as well. Command:
sudo vi video_handler.sh
1) Create the script directly on the Linux system and paste the contents in (uploading the file from Windows and running it will cause errors, because Windows and Linux use different line endings).
2) When pasting, part of the code may be lost; check the result and fill in anything that is missing.
3) Grant execute permission on video_handler.sh, otherwise the hadoop user will not be allowed to run it.
Command to grant the permission:
sudo chmod +x video_handler.sh
d. Run the script manually
-x enables debug mode:
sh -x video_handler.sh [date]   # the date argument is optional
e. Create a joblogs directory under the hadoop directory. Command:
mkdir joblogs
f. Schedule the script to run every day at 00:30 and redirect its log output to /usr/hadoop/joblogs/video_handler.log.
Add the following line:
30 00 * * * /usr/hadoop/jobs/video_handler.sh >> /usr/hadoop/joblogs/video_handler.log
to the current user's crontab (adding it to /etc/crontab also works, but there you must prefix the script path with hadoop as the executing user). Command:
crontab -e
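Note that >> only captures standard output; if you also want error output from the jobs in the log file (an optional tweak, not part of the original entry), append 2>&1:
30 00 * * * /usr/hadoop/jobs/video_handler.sh >> /usr/hadoop/joblogs/video_handler.log 2>&1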
P.S. If the scheduled task runs into problems, check the corresponding user's mail spool:
sudo cat /var/spool/mail/hadoop