ETL,是Extract-Load-Transform的缩写,用来描述将数据从来源端经过抽取(extract)、(清洗)转换(transform)、加载(load)至目的端的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业管理层的决策提供分析依据,ETL是BI(商业智能)项目重要的一个环节。下面潘老师通过一个简单的数据清洗案例,来带大家初步认识ETL。
1、数据清洗:将不规整数据转化为规整数据。在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求 的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
2、数据清洗目的:刚刚采集到HDFS中的原生数据,我们也称为不规整数据,即目前来说,该数据的格式还无 法满足我们对数据处理的基本要求,需要对其进行预处理,转化为我们后面工作所需要的较 为规整的数据,所以这里的数据清洗,其实指的就是对数据进行基本的预处理,以方便我们 后面的统计分析,所以这一步并不是必须的,需要根据不同的业务需求来进行取舍,只是在 我们的场景中需要对数据进行一定的处理。
1、数据说明
本案例采用简化后的发货信息模拟数据(部分),其文本格式如下(字段之间使用tab键隔开),存放在名为ems.txt文件中:
发货地 收货地 订单平台 发货时间(毫秒值)快递类型 ZheJiang ShangDong TIANMAO 1585105319000 YOUZHENG SiChuan FuJian TAOBAO 1586874652000 SHUNFENG HeiBei GuangDong TIANMAO 1584691816000 YUNDA
2、清洗要求
1)提取字段只要:订单平台 发货时间 快递类型
2)订单平台只获取TAOBAO的
3)对发货时间进行格式化为”yyyy-MM-dd HH:mm:ss”
4)字段之间用”,”隔开
3、过程实现
1)这里我将ems.txt上传至HDFS的/input目录下。
2)实现发货数据清洗类
public class ETLMapper extends Mapper<LongWritable, Text, NullWritable, Text> {
@Override
protected void map(LongWritable k1, Text v1, Context context)
throws IOException, InterruptedException {
//对v1按照空格分隔
String[] datas = v1.toString().split("\t");
String plat = datas[2];
//只获取订单平台 发货时间 快递类型 -平台限制为TAOBAO
if (!"TAOBAO".equals(plat)) {
return;
}
Date date = new Date();
date.setTime(Long.parseLong(datas[3]));
//格式化日期
String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
String ems = datas[4];
//组装k2,v2,中间用逗号隔开,k2为空
Text v2 = new Text(plat + "," + time + "," + ems);
//写出<k2,v2>
context.write(NullWritable.get(), v2);
}
}
3)发货数据清洗任务类
public class ETLJob {
public static void main(String[] args) {
try {
if(args.length!=2) {
System.exit(1);
}
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(ETLJob.class);
job.setMapperClass(ETLMapper.class);
//表示没有Reducer
job.setNumReduceTasks(0);
job.setMapOutputKeyClass(NullWritable.class);
job.setMapOutputValueClass(Text.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
} catch (IOException | ClassNotFoundException | InterruptedException e) {
e.printStackTrace();
}
}
}
4)执行maven install将代码打包成jar,上传执行如下命令运行:
hadoop jar etl-0.0.1-SNAPSHOT.jar com.pzy.etl.ETLJob /input/ems.txt /outputfahuo






