ETL,是Extract-Load-Transform的缩写,用来描述将数据从来源端经过抽取(extract)、(清洗)转换(transform)、加载(load)至目的端的过程,目的是将企业中的分散、零乱、标准不统一的数据整合到一起,为企业管理层的决策提供分析依据,ETL是BI(商业智能)项目重要的一个环节。下面潘老师通过一个简单的数据清洗案例,来带大家初步认识ETL。
1、数据清洗:将不规整数据转化为规整数据。在运行核心业务MapReduce程序之前,往往要先对数据进行清洗,清理掉不符合用户要求 的数据。清理的过程往往只需要运行Mapper程序,不需要运行Reduce程序。
2、数据清洗目的:刚刚采集到HDFS中的原生数据,我们也称为不规整数据,即目前来说,该数据的格式还无 法满足我们对数据处理的基本要求,需要对其进行预处理,转化为我们后面工作所需要的较 为规整的数据,所以这里的数据清洗,其实指的就是对数据进行基本的预处理,以方便我们 后面的统计分析,所以这一步并不是必须的,需要根据不同的业务需求来进行取舍,只是在 我们的场景中需要对数据进行一定的处理。
1、数据说明
本案例采用简化后的发货信息模拟数据(部分),其文本格式如下(字段之间使用tab键隔开),存放在名为ems.txt文件中:
发货地 收货地 订单平台 发货时间(毫秒值)快递类型 ZheJiang ShangDong TIANMAO 1585105319000 YOUZHENG SiChuan FuJian TAOBAO 1586874652000 SHUNFENG HeiBei GuangDong TIANMAO 1584691816000 YUNDA
2、清洗要求
1)提取字段只要:订单平台 发货时间 快递类型
2)订单平台只获取TAOBAO的
3)对发货时间进行格式化为”yyyy-MM-dd HH:mm:ss”
4)字段之间用”,”隔开
3、过程实现
1)这里我将ems.txt上传至HDFS的/input目录下。
2)实现发货数据清洗类
public class ETLMapper extends Mapper<LongWritable, Text, NullWritable, Text> { @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { //对v1按照空格分隔 String[] datas = v1.toString().split("\t"); String plat = datas[2]; //只获取订单平台 发货时间 快递类型 -平台限制为TAOBAO if (!"TAOBAO".equals(plat)) { return; } Date date = new Date(); date.setTime(Long.parseLong(datas[3])); //格式化日期 String time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date); String ems = datas[4]; //组装k2,v2,中间用逗号隔开,k2为空 Text v2 = new Text(plat + "," + time + "," + ems); //写出<k2,v2> context.write(NullWritable.get(), v2); } }
3)发货数据清洗任务类
public class ETLJob { public static void main(String[] args) { try { if(args.length!=2) { System.exit(1); } Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(ETLJob.class); job.setMapperClass(ETLMapper.class); //表示没有Reducer job.setNumReduceTasks(0); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); job.waitForCompletion(true); } catch (IOException | ClassNotFoundException | InterruptedException e) { e.printStackTrace(); } } }
4)执行maven install
将代码打包成jar,上传执行如下命令运行:
hadoop jar etl-0.0.1-SNAPSHOT.jar com.pzy.etl.ETLJob /input/ems.txt /outputfahuo