With the rise of the live-streaming industry, large streaming platforms generate massive amounts of broadcast data every day. To serve streamers and viewers better and to improve stream quality and user stickiness, these platforms analyze and aggregate that data at scale to extract business value. In this hands-on case study we will use Hadoop to compute statistics over live-streaming data.
The simplified data format used here looks roughly like this:
{"id":"1580089010000","uid":"12001002543","nickname":"jack2543","gold":561,"watchnumpv":1697,"follower":1509,"gifter":2920,"watchnumuv":5410,"length":3542,"exp":183}
{"id":"1580089010001","uid":"12001001853","nickname":"jack1853","gold":660,"watchnumpv":8160,"follower":1781,"gifter":551,"watchnumuv":4798,"length":189,"exp":89}
{"id":"1580089010002","uid":"12001003786","nickname":"jack3786","gold":14,"watchnumpv":577,"follower":1759,"gifter":2643,"watchnumuv":8910,"length":1203,"exp":54}
The records are in JSON format. You can either write a small program to generate mock records in bulk yourself, or download the prepared sample data file, which unpacks to a file named video_20200504.log.
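If you take the do-it-yourself route, a minimal generator sketch is shown below. It uses fastjson (added to the pom later in this chapter); the record count, value ranges, and output file name are illustrative assumptions, so adjust them as needed.
import com.alibaba.fastjson.JSONObject;

import java.io.FileWriter;
import java.io.IOException;
import java.util.Random;

public class MockDataGenerator {
    public static void main(String[] args) throws IOException {
        Random random = new Random();
        // write one JSON record per line, matching the sample format above
        try (FileWriter writer = new FileWriter("video_20200504.log")) {
            for (int i = 0; i < 1000; i++) {
                JSONObject json = new JSONObject();
                json.put("id", "158008901" + String.format("%04d", i));      // record id
                json.put("uid", "1200100" + (1000 + random.nextInt(9000)));  // streamer id
                json.put("nickname", "jack" + (1000 + random.nextInt(9000)));// nickname
                json.put("gold", random.nextInt(3000));                      // gold coins
                json.put("watchnumpv", random.nextInt(10000));               // total view PV
                json.put("follower", random.nextInt(2000));                  // new followers
                json.put("gifter", random.nextInt(3000));                    // gift senders
                json.put("watchnumuv", random.nextInt(9000));                // total view UV
                json.put("length", random.nextInt(4000));                    // broadcast length
                json.put("exp", random.nextInt(200));                        // experience points
                writer.write(json.toJSONString() + "\n");
            }
        }
    }
}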
The operations team wants to analyze each day's broadcast data and identify the most popular streamers, so that more traffic can be routed to them and their value maximized. We will implement two concrete requirements:
1. Aggregate metrics such as gold coins, total view PV, follower count, and total broadcast length.
2. Find the top 10 streamers by total broadcast length per day, together with that length.
The implementation is split into four major steps:
- Data cleaning
- Metric aggregation
- Top N statistics
- A scheduled task script
1. Implementing data cleaning
Because the raw data is recorded as log lines and collected to HDFS by a log-collection tool, it still has to be cleaned: records with missing fields are dropped and abnormal field values are normalized.
Requirements:
1. Extract the needed fields from the raw JSON records:
streamer id (uid), gold coins (gold), total view PV (watchnumpv), follower count (follower), total broadcast length (length)
2. Validate the core fields:
gold coins, total view PV, follower count, total broadcast length
None of these four fields should normally be negative or missing. If any of them is negative the record is considered abnormal and is discarded; if a field is merely missing, its value is treated as 0.
Analysis:
1: Since the raw data is JSON, we can use fastjson to parse each record and pull out the required fields.
2: We then validate the extracted values and keep only the records that satisfy the rules above.
3: There is no aggregation involved, only a simple filter, so a map phase is sufficient and no reduce phase is needed.
4: The map phase's <k1, v1> types are fixed at <LongWritable, Text> (byte offset and line content);
the <k2, v2> types are <NullWritable, Text>, since we only need to emit the filtered line.
1) Create a Maven project. Besides the configuration from earlier chapters, pom.xml needs the fastjson dependency:
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
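The jar used later in this chapter is the jar-with-dependencies build (video-0.0.1-SNAPSHOT-jar-with-dependencies.jar). If your pom does not already include it from earlier chapters, a typical maven-assembly-plugin configuration looks roughly like this (no manifest main class is needed because the hadoop jar command names the driver class explicitly):
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
Running mvn clean package then produces the *-jar-with-dependencies.jar under target/.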
2) Create the cleaning Mapper
public class DataCleanMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//1. Parse v1 into a JSON object
JSONObject json = JSON.parseObject(v1.toString());
//2. Extract the core fields - use getIntValue rather than getInteger, because getIntValue returns 0 when the field is missing (getInteger would return null)
String uid = json.getString("uid");//streamer id (uid)
int gold = json.getIntValue("gold");//gold coins (gold)
int watchnumpv = json.getIntValue("watchnumpv");//total view PV (watchnumpv)
int follower = json.getIntValue("follower");//follower count (follower)
int length = json.getIntValue("length");//total broadcast length (length)
//discard abnormal records
if(uid != null && !"".equals(uid) && gold>=0
&& watchnumpv>=0 && follower>=0 && length>=0) {
//assemble k2, v2
NullWritable k2 = NullWritable.get();
Text v2 = new Text(uid+"\t"+gold+"\t"+watchnumpv+"\t"+follower+"\t"+length);
//emit k2, v2
context.write(k2, v2);
}
}
}
3) Create the cleaning Job
public class DataCleanJob {
public static void main(String[] args) {
try {
if(args.length!=2) {
System.exit(1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(DataCleanJob.class);
job.setMapperClass(DataCleanMapper.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//map-only job: set the number of reduce tasks to 0
job.setNumReduceTasks(0);
//set the Mapper output types
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
//submit the job and wait for completion
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
4) Package, upload, and run the job
Note that the jar you upload must be the jar-with-dependencies build, because the job depends on fastjson. The video log file goes into a date-named directory under /data/videoinfo/. Create that directory recursively and upload the file:
hdfs dfs -mkdir -p /data/videoinfo/20200504
hdfs dfs -put video_20200504.log /data/videoinfo/20200504
Run the job:
hadoop jar video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.clean.DataCleanJob /data/videoinfo/20200504/video_20200504.log /res0504
Count the records that survive cleaning:
hdfs dfs -cat /res0504/* | wc -l
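Each surviving record is written out as tab-separated uid, gold, watchnumpv, follower, and length. Based on the first two sample records above, the cleaned output looks like this:
12001002543	561	1697	1509	3542
12001001853	660	8160	1781	189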
2. Implementing metric aggregation
Aggregate the gold coin, total view PV, follower, and total broadcast length metrics in the data, per streamer (uid).
1) Define a custom bean that wraps the gold coin, view PV, follower, and broadcast length totals. Note that readFields must read the fields in exactly the same order that write writes them.
public class VideoInfoWritable implements Writable {
private long gold;
private long watchnumpv;
private long follower;
private long length;
public long getGold() {
return gold;
}
public void setGold(long gold) {
this.gold = gold;
}
public long getWatchnumpv() {
return watchnumpv;
}
public void setWatchnumpv(long watchnumpv) {
this.watchnumpv = watchnumpv;
}
public long getFollower() {
return follower;
}
public void setFollower(long follower) {
this.follower = follower;
}
public long getLength() {
return length;
}
public void setLength(long length) {
this.length = length;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeLong(gold);
out.writeLong(watchnumpv);
out.writeLong(follower);
out.writeLong(length);
}
@Override
public void readFields(DataInput in) throws IOException {
gold = in.readLong();
watchnumpv = in.readLong();
follower = in.readLong();
length = in.readLong();
}
@Override
public String toString() {
return gold+"\t"+watchnumpv+"\t"+follower+"\t"+length;
}
}
2) Custom Mapper
/**
* Aggregates gold coins, total view PV, follower count, and total broadcast length per streamer.
* k1, v1 come from the cleaned output files.
*/
public class VideoInfoMapper extends Mapper<LongWritable, Text, Text, VideoInfoWritable> {
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
String[] line = v1.toString().split("\t");
//parse the metrics from the tab-separated line
String uid = line[0];
long gold = Long.parseLong(line[1]);
long watchnumpv = Long.parseLong(line[2]);
long follower = Long.parseLong(line[3]);
long length = Long.parseLong(line[4]);
//assemble k2, v2: uid as key, metrics bean as value
Text k2 = new Text(uid);
VideoInfoWritable v2 = new VideoInfoWritable();
v2.setGold(gold);
v2.setWatchnumpv(watchnumpv);
v2.setFollower(follower);
v2.setLength(length);
//emit k2, v2
context.write(k2, v2);
}
}
3) Custom Reducer
public class VideoInfoReducer extends Reducer<Text, VideoInfoWritable, Text, VideoInfoWritable>{
@Override
protected void reduce(Text k2, Iterable<VideoInfoWritable> v2s,Context context)
throws IOException, InterruptedException {
//iterate over v2s and sum each metric for this streamer
long goldsum = 0;
long watchnumpvsum = 0;
long followersum = 0;
long lengthsum = 0;
for(VideoInfoWritable v2:v2s) {
goldsum += v2.getGold();
watchnumpvsum += v2.getWatchnumpv();
followersum += v2.getFollower();
lengthsum += v2.getLength();
}
//assemble k3, v3
Text k3 = k2;
VideoInfoWritable v3 = new VideoInfoWritable();
v3.setGold(goldsum);
v3.setWatchnumpv(watchnumpvsum);
v3.setFollower(followersum);
v3.setLength(lengthsum);
//emit k3, v3
context.write(k3, v3);
}
}
4) Custom Job
public class VideoInfoJob {
public static void main(String[] args) {
try {
if(args.length!=2) {
System.exit(1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(VideoInfoJob.class);
job.setMapperClass(VideoInfoMapper.class);
job.setReducerClass(VideoInfoReducer.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//set the Mapper output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(VideoInfoWritable.class);
//set the Reducer output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(VideoInfoWritable.class);
//submit the job and wait for completion
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
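The statistics job is packaged and submitted the same way as the cleaning job, with the cleaned output as its input. A typical invocation (the output path /res0504_stats is just an illustrative name) would be:
hadoop jar video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.videoinfo.VideoInfoJob /res0504 /res0504_stats
hdfs dfs -cat /res0504_stats/* | head
Each output line is the uid followed by the four summed totals, as produced by VideoInfoWritable.toString().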
3. Implementing the Top N statistics
Find the top 10 streamers by total broadcast length per day, together with that length.
1: To compute the daily top 10 by broadcast length, the map phase extracts each streamer's id and broadcast length from the cleaned data.
2: So the map phase's <k2, v2> is <Text, LongWritable>.
3: On the reduce side, the broadcast lengths for each streamer are summed, and the per-streamer totals are stored in a temporary in-memory map.
4: In the reducer's cleanup method, that map is sorted by broadcast length.
Note: cleanup runs exactly once, after all reduce() calls have finished, so it is the right place to do the final filtering. This approach relies on a single reduce task (the default), so that every streamer's total ends up in the same in-memory map.
5: Finally, still in cleanup, the top 10 streamers by broadcast length are written out to an HDFS file.
1) Custom Mapper
public class VideoInfoTop10Mapper extends Mapper<LongWritable, Text, Text, LongWritable> {
@Override
protected void map(LongWritable k1, Text v1,Context context)
throws IOException, InterruptedException {
String[] line = v1.toString().split("\t");
String uid = line[0];
long length = Long.parseLong(line[4]);
//assemble k2, v2: uid as key, broadcast length as value
Text k2 = new Text(uid);
LongWritable v2 = new LongWritable(length);
//emit k2, v2
context.write(k2, v2);
}
}
2) Custom Reducer
public class VideoInfoTop10Reducer extends Reducer<Text, LongWritable, Text, LongWritable> {
//holds the total broadcast length of every streamer seen by this reducer
private Map<String,Long> map = new HashMap<String,Long>();
@Override
protected void reduce(Text k2, Iterable<LongWritable> v2s,Context context) throws IOException, InterruptedException {
long lengthsum = 0;
for(LongWritable v2:v2s) {
lengthsum += v2.get();
}
map.put(k2.toString(), lengthsum);
}
//runs exactly once when the task starts; typically used to set up resources or connections
@Override
protected void setup(Context context)
throws IOException, InterruptedException {
super.setup(context);
}
//runs exactly once when the task finishes; typically used to release resources
@Override
protected void cleanup(Context context)
throws IOException, InterruptedException {
//read the date that the job driver stored in the configuration
Configuration conf = context.getConfiguration();
String dt = conf.get("dt");
//sort the map by broadcast length in descending order
Map<String,Long> sortedMap = MapUtils.sortByValueDesc(map);
int count = 1;
Set<Entry<String, Long>> set = sortedMap.entrySet();
Iterator<Entry<String, Long>> it = set.iterator();
while(count<=10 && it.hasNext()) {
Entry<String, Long> entry = it.next();
//assemble k3, v3: date + uid as key, total broadcast length as value
Text k3 = new Text(dt+"\t"+entry.getKey());
LongWritable v3 = new LongWritable(entry.getValue());
//emit k3, v3
context.write(k3, v3);
count++;
}
}
}
3) MapUtils
package com.pzy.utils;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
public class MapUtils {
//sort a map by value in descending order
public static <K, V extends Comparable<? super V>> Map<K, V> sortByValueDesc(Map<K, V> map)
{
List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, V>>(map.entrySet());
Collections.sort(list, new Comparator<Map.Entry<K, V>>()
{
@Override
public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2)
{
int compare = (o1.getValue()).compareTo(o2.getValue());
return -compare;
}
});
Map<K, V> result = new LinkedHashMap<K, V>();
for (Map.Entry<K, V> entry : list) {
result.put(entry.getKey(), entry.getValue());
}
return result;
}
//sort a map by value in ascending order
public static <K, V extends Comparable<? super V>> Map<K, V> sortByValueAsc(Map<K, V> map)
{
List<Map.Entry<K, V>> list = new LinkedList<Map.Entry<K, V>>(map.entrySet());
Collections.sort(list, new Comparator<Map.Entry<K, V>>()
{
@Override
public int compare(Map.Entry<K, V> o1, Map.Entry<K, V> o2)
{
int compare = (o1.getValue()).compareTo(o2.getValue());
return compare;
}
});
Map<K, V> result = new LinkedHashMap<K, V>();
for (Map.Entry<K, V> entry : list) {
result.put(entry.getKey(), entry.getValue());
}
return result;
}
}
4) Custom Job
public class VideoInfoTop10Job {
public static void main(String[] args) {
try {
if(args.length!=2) {
System.exit(1);
}
//parse the date from the input path and convert it to yyyy-MM-dd, which is convenient for a later import into MySQL
String[] fields = args[0].split("/");
String tempDt = fields[fields.length-1];
SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
SimpleDateFormat sdf2 = new SimpleDateFormat("yyyy-MM-dd");
String dt = sdf2.format(sdf.parse(tempDt));
Configuration conf = new Configuration();
//store dt in the configuration so the Mapper and Reducer can read it
conf.set("dt", dt);
Job job = Job.getInstance(conf);
job.setJarByClass(VideoInfoTop10Job.class);
job.setMapperClass(VideoInfoTop10Mapper.class);
job.setReducerClass(VideoInfoTop10Reducer.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//set the Mapper output types
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//set the Reducer output types
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//submit the job and wait for completion
job.waitForCompletion(true);
} catch (Exception e) {
e.printStackTrace();
}
}
}
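Note that this job parses the date from the last segment of the input path, so that segment must be a yyyyMMdd date or the SimpleDateFormat parse will fail. Using the date-named directories that the scheduling script below defines, a manual run would look like:
hadoop jar video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.top10.VideoInfoTop10Job /data/videoinfo_clean/20200504 /res/videoinfotop10job/20200504
Each output line is then the date, the uid, and the summed broadcast length, tab separated.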
4. Implementing the scheduled task script
1) Wrap the job submission commands in a script so they are easy to run and to schedule.
2) Script development
Steps:
a. Create a bin directory under the video project and, inside it, a script named video_handler.sh. After you create it, Eclipse suggests installing a shell editor from its marketplace, which is worth doing.


b. Script content
The video_handler.sh script created in Eclipse looks like this:
#!/bin/bash
# load environment variables
source /etc/profile
# if a date argument is given, process that date's data; otherwise default to yesterday
if [ "X$1" = "X" ]
then
yes_time=`date +%Y%m%d --date="1 days ago"`
else
yes_time=$1
fi
# input/output paths for the clean job
cleanjob_input=hdfs://master:9820/data/videoinfo/${yes_time}
cleanjob_output=hdfs://master:9820/data/videoinfo_clean/${yes_time}
# input/output paths for metric job 1 (overall metrics)
videoinfojob_input=${cleanjob_output}
videoinfojob_output=hdfs://master:9820/res/videoinfojob/${yes_time}
# input/output paths for metric job 2 (top 10 by broadcast length)
videoinfotop10job_input=${cleanjob_output}
videoinfotop10job_output=hdfs://master:9820/res/videoinfotop10job/${yes_time}
# common directory for the job jar
jobs_home=/usr/hadoop/jobs
# remove old output directories so the script can be re-run for the same date
hdfs dfs -rm -r ${cleanjob_output}
hdfs dfs -rm -r ${videoinfojob_output}
hdfs dfs -rm -r ${videoinfotop10job_output}
# run the data cleaning job (a trailing \ means the command continues on the next line)
hadoop jar \
${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.clean.DataCleanJob \
${cleanjob_input} ${cleanjob_output}
# check whether the cleaning job succeeded by looking for the _SUCCESS file in its output directory
hdfs dfs -ls ${cleanjob_output}/_SUCCESS
# $? holds the exit status of the previous command: 0 means success, non-zero means failure
if [ "$?" = "0" ]
then
echo "cleanjob execute success....."
# run metric job 1
hadoop jar \
${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.videoinfo.VideoInfoJob \
${videoinfojob_input} ${videoinfojob_output}
# run metric job 2
hadoop jar \
${jobs_home}/video-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.pzy.top10.VideoInfoTop10Job \
${videoinfotop10job_input} ${videoinfotop10job_output}
else
echo "cleanjob execute failed.....date is ${yes_time}"
fi
c. Upload the jar to the jobs directory
Then create the script in the same jobs directory:
Command:
sudo vi video_handler.sh
1) Create the script on the Linux side by pasting its content into vi; uploading the file from Windows and running it directly will fail because Windows and Linux use different line endings.
2) When pasting, some lines may be lost; review the pasted content and fill in anything missing.
3) Make video_handler.sh executable, otherwise the hadoop user will not have permission to run it.
Permission command:
sudo chmod +x video_handler.sh
d. Run the script manually
-x enables shell debug (trace) mode
sh -x video_handler.sh [date, optional, e.g. 20200504]
e. Create a joblogs directory under the hadoop directory:
mkdir joblogs
f. Schedule the script to run every day at 00:30 and redirect its output to /usr/hadoop/joblogs/video_handler.log.
Add the following line:
30 00 * * * /usr/hadoop/jobs/video_handler.sh >> /usr/hadoop/joblogs/video_handler.log
to the current user's crontab (adding it to /etc/crontab also works, but there you must specify the hadoop user before the script path). Command:
crontab -e
PS: If the scheduled run misbehaves, check the corresponding user's local mail for cron's output:
sudo cat /var/spool/mail/hadoop
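You can also verify a scheduled run by listing yesterday's output directories directly (the paths match the ones defined in the script):
hdfs dfs -ls /res/videoinfotop10job/`date +%Y%m%d --date="1 days ago"`
hdfs dfs -cat /res/videoinfotop10job/`date +%Y%m%d --date="1 days ago"`/*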







