Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析

大数据技术 潘老师 4年前 (2020-07-24) 2794 ℃ (0) 扫码查看

Hadoop入门学习最经典的案例就是WordCount单词统计实战案例,这个案例可以帮你迅速理解hadoop中mapreduce执行过程及其原理。下面潘老师通过代码的实现并结合图文说明,给大家详解下具体内容。

1、保证Hadoop伪分布式环境正常启动。如果还没有搭建好,请参考博文:Hadoop3.x伪分布式环境搭建图文详解教程,让你快速入门大数据开发
2、保证Eclipse正确整合了Maven。如果还没有整合,请参考博文:Maven安装与配置,学习整合Eclipse构建第一个Maven项目

首选我们有一个文本文件,里面的内容很简单,就3行共6个单词,每个单词之间用空格隔开,具体内容如下,你可以复制一下内容粘贴到自己创建的words.txt中。

Deer Bear River
Car Car River
Deer Car Bear

我们的需求是将每个单词在文本中出现的次数统计出来,统计结果格式如下:

Bear 2
Car 3
Deer 2
River 2

先直接上执行过程原理图:
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析

图中整个MapReduce统计过程主要分为Split、Map、Shuffle和Reduce 4个阶段,每个阶段作用如下:
1)Split:首先大文件会被切分成多份,假设这里被切为3份,每一行代表一份
2)Map:解析出每个单词,并在后面记上数字1
3)Shuffle:将每一份中的单词分组到一起,并默认按照字母顺序进行排序
4)Reduce:将相同的单词进行;累加
5)将结果输出结果到HDFS

1、首先启动Hadoop,然后使用rz指令先将words.txt文件上传至/usr/hadoop目录(也可以自定义目录),然后ls查看下。
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
2、使用hdfs指令hdfs dfs -mkdir /input在HDFS根目录下新建在input文件夹(名称可自定义),然后执行查看指令hdfs dfs -ls /
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
3、使用hdfs指令hdfs dfs -put /usr/hadoop/words.txt /input将words.txt上传到input目录。
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
4、使用指令hdfs dfs -cat /input/words.txt查看上传的文本内容
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
5、在Eclipse中新建Maven项目wordconut,并删除App.java和AppTest.java文件。
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
6、在pom.xml的dependencies添加hadoop包依赖(也可顺便删除junit依赖,因为我们这里用不到),具体如下:

注意:这里的hadoop版本要和你的虚拟机上的hadoop版本保持一致,我这里是hadoop3.2.1版本
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.1</version>
    </dependency>

7、在wordcount项目中自定义Mapper类,具体代码如下:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
/***
 * 自定义Mapper类,需要继承Mapper类
 *     在这里原来的Long变为了LongWritable类型
 *  字符串变为了Text类型,int变为了IntWritable类型
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * 需要重写map方法
     * 该方法输入k1,v1参数,输出k2,v2参数
     */
    @Override
    protected void map(LongWritable k1, Text v1, Context context)
            throws IOException, InterruptedException {
        //k1代表每一行行首的偏移量,v1代表每一行的内容
        //对每一行内容进行切割,把单词切割出来
        String[] words = v1.toString().split(" ");
        //迭代切割出来的单词数组
        for(String word:words) {
            //把迭代出来的单词封装成<k2,v2>形式
            Text k2 = new Text(word);
            IntWritable v2 = new IntWritable(1);
            //将<k2,v2>写出去
            context.write(k2, v2);
        }
    }
}

8、在wordcount项目中自定义Reducer类,具体代码如下:

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
 * 自定义Reducer类,继承Reducer类
 */
public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * 需要重写reduce方法,针对v2s数据进行累加求和
     * 并最终把数据转换为<k3,v3>写出去
     */
    @Override
    protected void reduce(Text k2, Iterable<IntWritable> v2s,Context context) throws IOException, InterruptedException {
        //创建sum变量,保存v2s和
        int sum = 0;
        //迭代v2s,累加求和
        for(IntWritable v2:v2s) {
            sum += v2.get();
        }
        //组装<k3,v3>
        Text k3 = k2;
        IntWritable v3 = new IntWritable(sum);
        //将<k3,v3>写出去
        context.write(k3, v3);
    }
}

9、在wordcount项目中自定义Job类,具体代码如下:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


/**
 * 单词统计任务类
 */
public class WordCountJob {
    //组装job=map+reduce
    public static void main(String[] args) {
        try {
            if(args.length!=2) {
                //如果传递的参数个数不够,直接退出
                System.exit(1);
            }
            //job需要配置的参数
            Configuration conf = new Configuration();
            //创建一个job
            Job job = Job.getInstance(conf);
            
            //设置主方法-这一行必须设置,否则在集群中执行无法找到WordCountJob这个类
            job.setJarByClass(WordCountJob.class);
            //指定输入路径(可以是文件也可以是目录)
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            //指定输出路径(只能是一个不存在的目录)
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            //指定Mapper相关代码
            job.setMapperClass(WordCountMapper.class);
            //指定k2的类型
            job.setMapOutputKeyClass(Text.class);
            //指定v2的类型
            job.setMapOutputValueClass(IntWritable.class);
            //指定Reducer相关代码
            job.setReducerClass(WordCountReducer.class);
            //指定k3类型
            job.setOutputKeyClass(Text.class);
            //指定v3类型
            job.setOutputValueClass(IntWritable.class);
            //提交job
            job.waitForCompletion(true);
            
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

10、在pom.xml添加编译打包配置,具体在dependencies依赖配置同级别添加如下配置:

注意:这里的JDK版本,依据自己安装的JDK版本来设置
<build>
    <plugins>
        <!-- compiler插件, 设定JDK版本 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <encoding>UTF-8</encoding>
                <source>1.8</source>
                <target>1.8</target>
                <showWarnings>true</showWarnings>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
                <archive>
                    <manifest>
                        <mainClass></mainClass>
                    </manifest>
                </archive>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

11、修改hadoop依赖,限制使用范围,变成如下配置:

<dependencies>
    <!-- hadoop-client依赖 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.1</version>
        <!-- 表示只在编译时使用这个依赖,在执行及打包时都不依赖 -->
        <scope>provided</scope>
    </dependency>
  </dependencies>

12、右键wordcount项目->run as->maven install,执行打包操作。
注意,如果出现如图所示错误:
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
则需要检查Window->Preferences->Installed JREs配置的是否是jdk目录,而不是jre目录,确保是jdk目录,然后重新执行上面的指令。
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
13、执行成功后,在wordcount项目的target文件夹下,会生成两个jar包,一个是带依赖的,一个是不带依赖的,这里我们使用不带依赖的,也就是第二个。
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
14、将wordcount-0.0.1-SNAPSHOT.jar上传到/usr/hadoop目录。
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
15、执行任务指令:
hadoop jar wordcount-0.0.1-SNAPSHOT.jar com.pzy.wordcount.WordCountJob /input/words.txt /outputwords

注意指令格式(包路径一定不能少,输出路径一定是HDFS中不存在的文件夹):
hadoop jar XX.jar XX全类名 参数1(数据输入路径) 参数2(数据输出路径) 参数3 参数4…

16、耐心等待执行完成,直到map 100%,reduce 100%。
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
也可去浏览器访问YARN的ResourceManager界面,我这里地址是http://192.168.217.100:8088/cluster,查看任务状态。
Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析
17、使用如下两个指令查看统计结果:
hdfs dfs -ls /outputwords
hdfs dfs -cat /outputwords/part-r-00000

Hadoop经典入门案例实战-WordCount单词统计代码实现及原理分析

相信大家通过这个经典入门案例的学习,一定对hadoop大数据处理有了新的认识,对MapReduce执行原理也有了进一步的理解。


版权声明:本站文章,如无说明,均为本站原创,转载请注明文章来源。如有侵权,请联系博主删除。
本文链接:https://www.panziye.com/java/bigdata/359.html
喜欢 (6)
请潘老师喝杯Coffee吧!】
分享 (0)
用户头像
发表我的评论
取消评论
表情 贴图 签到 代码

Hi,您需要填写昵称和邮箱!

  • 昵称【必填】
  • 邮箱【必填】
  • 网址【可选】