Hadoop-MapReduce详解-3

Hadoop-MapReduce详解-3

[TOC]

Hadoop-MapReduce详解-3

第4章 Hadoop数据压缩

压缩概述

压缩技术能够有效减少底层存储系统(HDFS)读写字节数。压缩提高了网络带宽和磁盘空间的效率。在运行MR程序时, I/O操作、网络数据传输、Shufle和Merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,因此,使用数据压缩显得非常重要。

鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。可以在任意MapReduce阶段启用压缩。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价。

压缩是提高Hadoop运行效率的一种优化策略。

通过对Mapper、Reducer运行过程的数据进行压缩,以减少磁盘IO,提高MR程序运行速度。

注意:采用压缩技术减少了磁盘IO,但同时增加了CPU运算负担。所以,压缩特性运用得当能提高性能,但运用不当也可能降低性能。

  • 压缩基本原则:

(1) 运算密集型的job,少用压缩(涉及大量公式,算法)
(2) IO密集型的job,多用压缩(数据传输)

MR支持的压缩编码

压缩格式 hadoop自带? 算法 文件扩展名 是否可切分 换成压缩格式后,原来的程序是否需要修改
DEFLATE 是,直接使用 DEFLATE .deflate 和文本处理一样,不需要修改
Gzip 是,直接使用 DEFLATE .gz 和文本处理一样,不需要修改
bzip2 是,直接使用 bzip2 .bz2 和文本处理一样,不需要修改
LZO 否,需要安装 LZO .lzo 需要建索引,还需要指定输入格式
Snappy 否,需要安装 Snappy .snappy 和文本处理一样,不需要修改

为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示。

压缩格式 对应的编码/解码器
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
Snappy org.apache.hadoop.io.compress.SnappyCodec

压缩性能的比较

压缩算法 原始文件大小 压缩文件大小 压缩速度 解压速度
gzip 8.3GB 1.8GB 17.5MB/s 58MB/s
bzip2 8.3GB 1.1GB 2.4MB/s 9.5MB/s
LZO 8.3GB 2.9GB 49.3MB/s 74.6MB/s

Google/Snappy
github/Snappy

压缩方式选择

Gzip压缩

  • 优点

压缩率比较高,而且压缩解压速度也比较快;
Hadoop本身支持, 在应用中处理Gzip格式的文件就和直接处理文本一样;
大部分Linux系统都自带Gzip命令,使用方便。

  • 缺点

不支持Split。

  • 应用场景

当每个文件压缩之后在130M以内的(1个块大小内) ,都可以考虑用Gzip压缩格式。例如说一天或者一个小时的日志压缩成一个Gzip文件。

Bzip2压缩

  • 优点

支持Split;
具有很高的压缩率,比Gzip压缩率都高;
Hadoop本身自带,使用方便。

  • 缺点

压缩/解压速度慢。

  • 应用场景

适合对速度要求不高,但需要较高的压缩率的时候;
或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况(历史数据备份);
或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持Split,而且兼容之前的应用程序的情况。

Lzo压缩

  • 优点

压缩/解压速度也比较快,合理的压缩率;
支持Split,是Hadoop中最流行的压缩格式;
可以在Linux系统下安装Izop命令,使用方便。

  • 缺点

压缩率比Gzip要低一些;
Hadoop本身不支持,需要安装;
在应用中对Lzo格式的文件需要做一些特殊处理(为了支持Split需要建索引,还需要指定InputFormat为Lzo格式)

  • 应用场景

一个很大的文本文件,压缩之后还大于200M以,上的可以考虑,而且单个文件越大,Lzo优点越越明显。

Snappy压缩

  • 优点

高速压缩速度和合理的压缩率。

  • 缺点

不支持Split;
压缩率比Gzip要低;
Hadoop本身不支持, 需要安装。

  • 应用场景

当NapReduce作业的Map输出的数据比较大的时候, 作为Map到Redunce的中间数据的压缩格式;
或者作为一个MapRednce作业的输出和另外一个MapReduce作业的输入。

压缩位置选择

输入端采用压缩

在有大量数据并计划重复处理的情况下,应该考虑对输入进行压缩。然而,你无须显示指定使用的编解码方式。
Hadoop自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压。
否则,Hadoop就不会使用任何编解码器。

Mapper输出采用压缩

当Map任务输出的中间数据量很大时,应考虑在此阶段采用压缩技术。这能显著改善内部数据Shufne过程,而Shffle过程在Hadoop处理过程中是资源肖耗最多的环节。
如果发现数据量大造成网络传输缓慢,应该考虑使用压缩技术。
可用于压缩Mapper 输出的快速编解码器包括LZO或者Snappy。

注: LZO是供Hadoop压缩数据用的通用压缩编解码器。其设计目标是达到与硬盘读取速度相当的压缩速度,因此速度是优先考虑的因素,而不是压缩率。与Gzip编解码器相比,它的压缩速度是Gzip的5倍,而解压速度是Gzp的2倍。同一个文件用LZO压缩后比用Gzip压缩后大50%,但比压缩前小25%~ 50%。这对改善性能非常有利。Map阶段完成时间快4倍。

Reducer输出采用压缩

在此阶段启用压缩技术能够减少要存储的数据量,因此降低所需的磁盘空间。当MapReduce作业形成作业链条时,因为第二个作业的输入也已压缩,所以启用压缩同样有效。

压缩参数配置

要在Hadoop中启用压缩,可以配置如下参数:

参数 默认值 阶段 建议
io.compression.codecs(在core-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec 输入压缩 Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress(在mapred-site.xml中配置) false mapper输出 这个参数设为true启用压缩
mapreduce.map.output.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec mapper输出 企业多使用LZO或Snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置) false reducer输出 这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置) org.apache.hadoop.io.compress.DefaultCodec reducer输出 使用标准工具或者编解码器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置) RECORD reducer输出 SequenceFile输出使用的压缩类型:NONE和BLOCK

压缩实操案例

数据流的压缩和解压缩

CompressionCodec有两个方法可以用于轻松地压缩或解压缩数据。

要想对正在被写入一个输出流的数据进行压缩,我们可以使用reateOutputStream(OutputStreamout)方法创建一个CompressionOutputStream,将其以压缩格式写入底层的流。

相反,要想对从输入流读取而来的数据进行解压缩,则调用createInputStream(InputStreamin)函数,从而获得一个CompressionInputStream, 从而从底层的流读取未压缩的数据。

测试

DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec

TestCompress.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package com.atguigu.mr.compress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionInputStream;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.*;

public class TestCompress {
public static void main(String[] args) throws IOException, ClassNotFoundException {
// compress("f:/IDEAWS/dashju/compressinput/hello.txt", "org.apache.hadoop.io.compress.BZip2Codec");
// compress("f:/IDEAWS/dashju/compressinput/hello.txt", "org.apache.hadoop.io.compress.GzipCodec");
// compress("f:/IDEAWS/dashju/compressinput/hello.txt", "org.apache.hadoop.io.compress.DefaultCodec");

decompress("f:/IDEAWS/dashju/compressinput/hello.txt.bz2");
}

// 压缩
private static void compress(String filename, String method) throws IOException, ClassNotFoundException {
// 1 获取输入流
FileInputStream fis = new FileInputStream(new File(filename));

// 2 获取输出流
// 反射获取流
Class codeClass = Class.forName(method);
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass, new Configuration());

FileOutputStream fos = new FileOutputStream(new File(filename + codec.getDefaultExtension()));

// 包装输出流
CompressionOutputStream cos = codec.createOutputStream(fos);

// 3 流的对拷
IOUtils.copyBytes(fis, cos, 1024 * 1024 * 5, false);

// 4 关闭资源
IOUtils.closeStream(cos);
IOUtils.closeStream(fos);
IOUtils.closeStream(fis);
}

// 解压缩
private static void decompress(String filename) throws IOException {
// 1校验是否可以解压缩
CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory(new Configuration());
CompressionCodec compressionCodec = compressionCodecFactory.getCodec(new Path(filename));

if (null == compressionCodec) {
System.out.println("不能解压缩");
return;
}

// 2 获取输入流
FileInputStream fis = new FileInputStream(new File(filename));
CompressionInputStream cis = compressionCodec.createInputStream(fis);

// 3 获取输出流
FileOutputStream fos = new FileOutputStream(new File(filename + "decode"));

// 4 流的对拷
IOUtils.copyBytes(cis, fos, 1024 * 1024 * 5, false);

// 5 关闭资源
IOUtils.closeStream(fos);
IOUtils.closeStream(cis);
IOUtils.closeStream(fis);
}
}

Map输出端采用压缩

即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置。

不影响输出结果,只是提高IO效率。

在wordcount案例中对WordCountDriver.java修改配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Configuration configuration = new Configuration();

// 开启map端输出压缩
configuration.setBoolean("mapreduce.map.output.compress", true);
// 设置map端输出压缩方式
configuration.setClass("mapreduce.map.output.compress.codec", BZip2Codec.class, CompressionCodec.class);

Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
boolean result = job.waitForCompletion(true);
System.exit(result ? 1 : 0);

Reduce输出端采用压缩

最终输出文件为压缩文件。

在wordcount案例中对WordCountDriver.java修改配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);
job.setJarByClass(WordCountDriver.class);
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 设置reduce端输出压缩开启
FileOutputFormat.setCompressOutput(job, true);

// 设置压缩的方式
FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class);

//修改map端输出和reduce输出,采用不一样的格式,最终输出格式由reduce端输出决定
//FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
//FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);

boolean result = job.waitForCompletion(true);
System.exit(result?1:0);

第5章 Yarn资源调度器

Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。

Yarn基本架构

YARN主要由ResourceManagerNodeManagerApplicationMasterContainer等组件构成。

Yarn工作机制

(1)MR程序提交到客户端所在的节点。
(2)YarnRunner向ResourceManager申请一个Application。
(3)RM将该应用程序的资源路径返回给YarnRunner。
(4)该程序将运行所需资源提交到HDFS上。
(5)程序资源提交完毕后,申请运行mrAppMaster。
(6)RM将用户的请求初始化成一个Task。
(7)其中一个NodeManager领取到Task任务。
(8)该NodeManager创建容器Container,并产生MRAppmaster。
(9)Container从HDFS上拷贝资源到本地。
(10)MRAppmaster向RM 申请运行MapTask资源。
(11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
(13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
(14)ReduceTask向MapTask获取相应分区的数据。
(15)程序运行完毕后,MR会向RM申请注销自己。

作业提交全过程

Yarn阶段

作业提交全过程详解

(1)作业提交

第1步:Client调用job.waitForCompletion方法,向整个集群提交MapReduce作业。
第2步:Client向RM申请一个作业id。
第3步:RM给Client返回该job资源的提交路径和作业id。
第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。
第5步:Client提交完资源后,向RM申请运行MrAppMaster。

(2)作业初始化

第6步:当RM收到Client的请求后,将该job添加到容量调度器中。
第7步:某一个空闲的NM领取到该Job。
第8步:该NM创建Container,并产生MRAppmaster。
第9步:下载Client提交的资源到本地。

(3)任务分配

第10步:MrAppMaster向RM申请运行多个MapTask任务资源。
第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。

(4)任务运行

第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。
第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。
第14步:ReduceTask向MapTask获取相应分区的数据。
第15步:程序运行完毕后,MR会向RM申请注销自己。

(5)进度和状态更新

YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。

(6)作业完成

除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。

MapReduce阶段

资源调度器

目前,Hadoop作业调度器主要有三种:FIFOCapacity SchedulerFair Scheduler
Hadoop2.7.2默认的资源调度器是Capacity Scheduler。

具体设置详见:yarn-default.xml文件

1
2
3
4
5
<property>
<description>The class to use as the resource scheduler.</description>
<name>yarn.resourcemanager.scheduler.class</name>
<value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
</property>

先进先出调度器(FIFO)

容量调度器(Capacity Scheduler)

为多个FIFO调度器组合,并发度较高。

1、支持多个队列,每个队列可配置一定的资源量,每个队列采用FIFO调度策略。

2、为了防止同一个用户的作业独占队列中的资源,该调庶器会对同一用户提交的作业所占资源虽进行限定。

3、首先,计算每个队列中正在运行的任务数与其应该分得的计算资原之间的比值,选择一个该比值最小的队列——最闲的。

4、其次,按照作业优先级和提交时间顺字,同时考虑用户资源量限制和内存限制对队列内任务排序。

5、三个队列同时按照任务的先后顺序依次执行,比如,jobl1、 job21和jb3l分别排在队列最前面,先运行,也是并行运行。

公平调度器(Fair Scheduler)

缺额排序,多个FIFO调度器组合。

支持多队列多用户,每个队列中的资源量可以配置,同一队列中的作业公平共享队列中所有资源。

比如有三个队列: queueA、 queueB和queueC, 每个队列中的job按照优先级分配资源,优先级越高分配的资源越多,但是每个job都会分配到资源以确保公平。

在资源有限的情况下,每个job理想情兄下获得的计算资源与实际获得的计算资源存在一种差距,这个差距就叫做缺额

在同一个队列中,job的资源缺额越大,越先获得资源优先执行。作业是按照缺额的高低来先后执行的,而且可以看到上图有多个作业同时运行。

任务的推测执行

1.作业完成时间取决于最慢的任务完成时间

一个作业由若干个Map任务和Reduce任务构成。因硬件老化、软件Bug等,某些任务可能运行非常慢。

思考:系统中有99%的Map任务都完成了,只有少数几个Map老是进度很慢,完不成,怎么办?

2.推测执行机制

发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。

  1. 执行推测任务的前提条件

(1)每个Task只能有一个备份任务
(2)当前Job已完成的Task必须不小于0.05(5%)
(3)开启推测执行参数设置。mapred-site.xml文件中默认是打开的。

1
2
3
4
5
6
7
8
9
10
11
<property>
<name>mapreduce.map.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some map tasks may be executed in parallel.</description>
</property>

<property>
<name>mapreduce.reduce.speculative</name>
<value>true</value>
<description>If true, then multiple instances of some reduce tasks may be executed in parallel.</description>
</property>
  1. 不能启用推测执行机制情况

(1)任务间存在严重的负载倾斜;
(2)特殊任务,比如任务向数据库中写数据。

  1. 算法原理

假设某一时刻,任务T的执行进度为progress,则可通过一定的算法推测出该任务的最终完成时刻estimateEndTime。另一方面,如果此刻为该任务启动一个备份任务,则可推断出它可能的完成时刻estimateEndTime2;于是可得出以下几个公式:

1
2
estimatedRunTime = (currentTimestamp - taskStartTime) / progress
推测运行时间(60s) = (当前时刻(6) - 任务启动时刻(0)) / 任务运行比例(10%)
1
2
estimateEndTime = estimatedRunTime + taskStartTime
推测执行完时刻(60) = 推测运行时间(60s) + 任务启动时刻(0)
1
2
estimateEndTime2 = currentTimestamp + averageRunTime
备份任务推测完成时刻(16) = 当前时刻(6) + 运行完成任务的平均时间(10s)

(1)MR总是选择(estimateEndTime - estimateEndTime2)差值最大的任务,并为之启动备份任务。

(2)为了防止大量任务同时启动备份任务造成的资源浪费,MR为每个作业设置了同时启动的备份任务数目上限。

(3)推测执行机制实际上采用了经典的优化算法:以空间换时间,它同时启动多个相同任务处理相同的数据,并让这些任务竞争以缩短数据处理时间。显然,这种方法需要占用更多的计算资源。在集群资源紧缺的情况下,应合理使用该机制,争取在多用少量资源的情况下,减少作业的计算时间。

第6章 Hadoop企业优化

MapReduce跑得慢的原因

MapReduce程序效率的瓶颈在于两点

  • 计算机性能

CPU、内存、磁盘健康、网络

  • 1/O 操作优化

(1)数据倾斜;
(2)Map和Reduce数设置不合理;
(3)Map运行时间太长,导致Reduce等待过久;
(4)小文件过多;
(5)大量的不可分块的超大文件;
(6)Spill次数过多;
(7)Merge次数过多。

MapReduce优化方法

MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce 阶段、Io传输、数据倾斜问题和常用的调优参数。

数据输入

(1)合并小文件在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢。

(2)采用CombineTextInputFormat来作为输入,解决输入端大小文件场景。

Map阶段

(1)减少溢写(Spill)次数

通过调整io.sort.mb(环形缓冲区)及sort.spill.percent(到达多少开始溢写)参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO。

(2)减少合并(Merge)次数

通过调整io.sort.factor参数,增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间。

(3)在Map之后,不影响业务逻辑前提下,先进行Combine处理,减少I/O。

Reduce阶段

(1)合理设置Map和Reduce数

两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致Map、Reduce任务间竞争资源,造成处理超时等错误。

(2)设置Map、Reduce共存

调整slowstart.completedmaps参数, 使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。

(3)规避使用Reduce

因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。

(4)合理设置Reduce端的Buffer

默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说,Buffen和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的一 分数据可以直接输送到Reduce,从而减少IO开销:mapreduce.reduce.input.bufer.percent,默认为0.0。当值大于0的时候,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用。这样一来,设置Buffer需要内存,读取数据需要内存,Reduce计算也要内存,所以要根据作业的运行情况进行调整。

IO传输

(1)采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZO压缩编码器。

(2)使用SequenceFile二进制文件。

数据倾斜问题

数据倾斜现象

数据频率倾斜:某一个区域的数据要远远大于其他区域。

数据大小倾斜:部分记录的大小远远大于平均值。

减少数据倾斜的方法

  • 方法1:抽样和范围分区

可以通过对原始数据进行抽样得到的结果集来预设分区边界值。

  • 方法2:自定义分区

基于输出键的背景知识进行自定义分区。例如,如果Map输出键的单词来源于一本书。且其中某几个专业司汇较多。那么就可以自定义分区将这这些专业司汇发送给固定的一部分Reduce实例。而将其他的都发送给剩余的Reduce实例。

  • 方法3:Combine

使用Combine可以大量地减小数据倾斜。在可能的情况下,Cobine的目的就是聚合并精简数据。

  • 方法4:采用Map Join,尽量避免Reduce Join。

常用的调优参数

资源相关参数

(1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml

配置参数 参数说明
mapreduce.map.memory.mb 一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。(开发中2-4G)
mapreduce.reduce.memory.mb 一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.map.cpu.vcores 每个MapTask可使用的最多cpu core数目,默认值:1。
mapreduce.reduce.cpu.vcores 每个ReduceTask可使用的最多cpu core数目,默认值:1。
mapreduce.reduce.shuffle.parallelcopies 每个Reduce去Map中取数据的并行数,默认值:5。
mapreduce.reduce.shuffle.merge.percent Buffer中的数据达到多少比例开始写入磁盘,默认值:0.66。
mapreduce.reduce.shuffle.input.buffer.percent Buffer大小占Reduce可用内存的比例,默认值:0.7。
mapreduce.reduce.input.buffer.percent 指定多少比例的内存用来存放Buffer中的数据,默认值:0。

(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml

配置参数 参数说明
yarn.scheduler.minimum-allocation-mb 给应用程序Container分配的最小内存,默认值:1024。
yarn.scheduler.maximum-allocation-mb 给应用程序Container分配的最大内存,默认值:8192。
yarn.scheduler.minimum-allocation-vcores 每个Container申请的最小CPU核数,默认值:1。
yarn.scheduler.maximum-allocation-vcores 每个Container申请的最大CPU核数,默认值:32。
yarn.nodemanager.resource.memory-mb 给Containers分配的最大物理内存,默认值:8192。

(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml

配置参数 参数说明
mapreduce.task.io.sort.mb Shuffle的环形缓冲区大小,默认值:100M。
mapreduce.map.sort.spill.percent 环形缓冲区溢出的阈值,默认值:80%。

容错相关参数(MapReduce性能优化)

配置参数 参数说明
mapreduce.map.maxattempts 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.reduce.maxattempts 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.task.timeout Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.

HDFS小文件优化方法

HDFS小文件弊端

HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。

HDFS小文件解决方案

小文件的优化无非以下几种方式:

(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。
(2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。
(3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。

    1. Hadoop Archive

是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样就减少了NameNode的内存使用。

    1. SequenceFile

SequenceFile由一系列的二进制key/value组成,如果key为文件名,value为文件内容,则可以将大批小文件合并成一个大文件。

    1. CombineFileInputFormat

CombineFileInputFormat是一种新的InputFomat, 用于将多个文件台并成一个单独的Split,另外,它会考虑数据的存储位置。

    1. 开启JVM重用

对于大量小文件Job,可以开启JVM重用会减少45%运行时间。

JVM重用原理:一个Map运行在一个JVM上,开启重用的话,该Map在JVM上运行完毕后,JVM继续运行其他Map。

具体设置: mapreduce.job.jvm.numtasks(JVM线程池)值在10-20之间。

第7章 MapReduce扩展案例

倒排索引案例(多job串联)

需求分析

  • 需求

有大量的文本(文档、网页),需要建立搜索索引。

  • 数据输入

a.txt

1
2
3
atguigu pingping
atguigu ss
atguigu ss

b.txt

1
2
3
atguigu pingping
atguigu pingping
atguigu ss

c.txt

1
2
atguigu ss
atguigu pingping
  • 期望输出
1
2
3
atguigu	c.txt-->2	b.txt-->2	a.txt-->3	
pingping c.txt-->1 b.txt-->3 a.txt-->1
ss c.txt-->1 b.txt-->1 a.txt-->2

代码实现

第一次处理

(1)第一次处理,编写OneIndexMapper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.atguigu.mr.index;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class OneIndexMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
String name;
IntWritable v = new IntWritable(1);
Text k = new Text();

@Override
protected void setup(Context context) throws IOException, InterruptedException {
//获取文件名称
FileSplit fileSplit = (FileSplit)context.getInputSplit();
name = fileSplit.getPath().getName();
}

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();

// 2 切割
String[] fields = line.split(" ");

for (String word : fields) {
// 3 拼接
k.set(word + "--" + name);

// 4 写出
context.write(k, v);
}
}
}

(2)第一次处理,编写OneIndexReducer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.atguigu.mr.index;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable v = new IntWritable();

@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;

// 1 累加求和
for (IntWritable value : values) {
sum += value.get();
}

v.set(sum);

// 2 写出
context.write(key, v);
}
}

(3)第一次处理,编写OneIndexDriver类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.atguigu.mr.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class OneIndexDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[]{"f:/IDEAWS/dashju/oneindexinput", "f:/IDEAWS/dashju/oneindexoutput"};

Configuration conf = new Configuration();

Job job = Job.getInstance(conf);
job.setJarByClass(OneIndexDriver.class);

job.setMapperClass(OneIndexMapper.class);
job.setReducerClass(OneIndexReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

job.waitForCompletion(true);
}
}

(4)查看第一次输出结果

1
2
3
4
5
6
7
8
9
atguigu--a.txt	3
atguigu--b.txt 3
atguigu--c.txt 2
pingping--a.txt 1
pingping--b.txt 2
pingping--c.txt 1
ss--a.txt 2
ss--b.txt 1
ss--c.txt 1

第二次处理

(1)第二次处理,编写TwoIndexMapper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.atguigu.mr.index;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class TwoIndexMapper extends Mapper<LongWritable, Text, Text, Text> {
Text k = new Text();
Text v = new Text();

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();

// 2 用“--“切割
String[] fields = line.split("--");

k.set(fields[0]);
v.set(fields[1]);

// 3 输出
context.write(k, v);
}
}

(2)第二次处理,编写TwoIndexReducer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.atguigu.mr.index;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class TwoIndexReducer extends Reducer<Text, Text, Text, Text> {
Text v = new Text();

@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
StringBuilder sb = new StringBuilder();
// 1 拼接
for (Text value : values) {
sb.append(value.toString().replace("\t", "-->") + "\t");
}

v.set(sb.toString());

// 2 写出
context.write(key, v);
}
}

(3)第二次处理,编写TwoIndexDriver类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.atguigu.mr.index;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TwoIndexDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 输入输出路径需要根据自己电脑上实际的输入输出路径设置
args = new String[]{"f:/IDEAWS/dashju/twoindexinput", "f:/IDEAWS/dashju/twoindexoutput"};

Configuration config = new Configuration();
Job job = Job.getInstance(config);

job.setJarByClass(TwoIndexDriver.class);
job.setMapperClass(TwoIndexMapper.class);
job.setReducerClass(TwoIndexReducer.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

(4)第二次查看最终结果

1
2
3
atguigu	c.txt-->2	b.txt-->3	a.txt-->3	
pingping c.txt-->1 b.txt-->2 a.txt-->1
ss c.txt-->1 b.txt-->1 a.txt-->2

TopN案例

需求分析

需求

对需求序列化输出结果进行加工,输出流量使用量在前10的用户信息。

输入数据

手机号,上行流量,下行流量,总流量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
13470253144	180	180	360
13509468723 7335 110349 117684
13560439638 918 4938 5856
13568436656 3597 25635 29232
13590439668 1116 954 2070
13630577991 6960 690 7650
13682846555 1938 2910 4848
13729199489 240 0 240
13736230513 2481 24681 27162
13768778790 120 120 240
13846544121 264 0 264
13956435636 132 1512 1644
13966251146 240 0 240
13975057813 11058 48243 59301
13992314666 3008 3720 6728
15043685818 3659 3538 7197
15910133277 3156 2936 6092
15959002129 1938 180 2118
18271575951 1527 2106 3633
18390173782 9531 2412 11943
84188413 4116 1432 5548

输出数据

1
2
3
4
5
6
7
8
9
10
13509468723	7335	110349	117684
13975057813 11058 48243 59301
13568436656 3597 25635 29232
13736230513 2481 24681 27162
18390173782 9531 2412 11943
13630577991 6960 690 7650
15043685818 3659 3538 7197
13992314666 3008 3720 6728
15910133277 3156 2936 6092
13560439638 918 4938 5856

代码实现

(1)编写FlowBean类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
package com.atguigu.mr.topN;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements WritableComparable<FlowBean> {
private long upFlow;
private long downFlow;
private long sumFlow;

public FlowBean() {
super();
}

public FlowBean(long upFlow, long downFlow, long sumFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.sumFlow = sumFlow;
}

public void setUpFlow(long upFlow) {
this.upFlow = upFlow;
}

public void setDownFlow(long downFlow) {
this.downFlow = downFlow;
}

public void setSumFlow(long sumFlow) {
this.sumFlow = sumFlow;
}

public long getUpFlow() {
return upFlow;
}

public long getDownFlow() {
return downFlow;
}

public long getSumFlow() {
return sumFlow;
}

public void set(long downFlow2, long upFlow2) {
downFlow = downFlow2;
upFlow = upFlow2;
sumFlow = downFlow2 + upFlow2;
}

@Override
public String toString() {
return upFlow + "\t" + downFlow + "\t" + sumFlow;
}

@Override
public int compareTo(FlowBean o) {
int result;

if (this.sumFlow > o.getSumFlow()) {
result = -1;
} else if (this.sumFlow < o.getSumFlow()) {
result = 1;
} else {
result = 0;
}
return result;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(sumFlow);
}

@Override
public void readFields(DataInput in) throws IOException {
upFlow = in.readLong();
downFlow = in.readLong();
sumFlow = in.readLong();
}
}

(2)编写TopNMapper类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.atguigu.mr.topN;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

public class TopNMapper extends Mapper<LongWritable, Text, FlowBean, Text> {
private TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();
private FlowBean kBean;

@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
kBean = new FlowBean();
Text v = new Text();

// 1 获取一行
String line = value.toString();

// 2 切割
String[] fields = line.split("\t");

// 3 数据封装
String phoneNum = fields[0];
Long upFlow = Long.parseLong(fields[1]);
Long downFlow = Long.parseLong(fields[2]);
Long sumFlow = Long.parseLong(fields[3]);

v.set(phoneNum);

kBean.setUpFlow(upFlow);
kBean.setDownFlow(downFlow);
kBean.setSumFlow(sumFlow);

// 4 向TreeMap中添加数据
flowMap.put(kBean, v);

// 5 限制TreeMap的数据量,超过10条就删除掉流量最小的一条数据
if (flowMap.size() > 10) {
flowMap.remove(flowMap.lastKey());
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 6 遍历treeMap集合,输出数据
Iterator<FlowBean> bean = flowMap.keySet().iterator();

while (bean.hasNext()) {
FlowBean k = bean.next();
context.write(k, flowMap.get(k));
}
}
}

(3)编写TopNReducer类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.atguigu.mr.topN;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;
import java.util.TreeMap;

public class TopNReducer extends Reducer<FlowBean, Text, Text, FlowBean> {
// 定义一个TreeMap作为存储数据的容器(天然按key排序)
TreeMap<FlowBean, Text> flowMap = new TreeMap<FlowBean, Text>();

@Override
protected void reduce(FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
for (Text value : values) {
FlowBean bean = new FlowBean();
bean.set(key.getDownFlow(), key.getUpFlow());

// 1 向treeMap集合中添加数据
flowMap.put(bean, new Text(value));

// 2 限制TreeMap数据量,超过10条就删除掉流量最小的一条数据
if (flowMap.size() > 10) {
flowMap.remove(flowMap.firstKey());
}
}
}

@Override
protected void cleanup(Context context) throws IOException, InterruptedException {
// 3 遍历集合,输出数据
Iterator<FlowBean> it = flowMap.keySet().iterator();

while (it.hasNext()) {
FlowBean v = it.next();

context.write(new Text(flowMap.get(v)), v);
}
}
}

(4)编写TopNDriver类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.atguigu.mr.topN;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class TopNDriver {
public static void main(String[] args) throws InterruptedException, IOException, ClassNotFoundException {
args = new String[]{"f:/IDEAWS/dashju/topNinput", "f:/IDEAWS/dashju/topNoutput"};

// 1 获取配置信息,或者job对象实例
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 6 指定本程序的jar包所在的本地路径
job.setJarByClass(TopNDriver.class);

// 2 指定本业务job要使用的mapper/Reducer业务类
job.setMapperClass(TopNMapper.class);
job.setReducerClass(TopNReducer.class);

// 3 指定mapper输出数据的kv类型
job.setMapOutputKeyClass(FlowBean.class);
job.setMapOutputValueClass(Text.class);

// 4 指定最终输出的数据的kv类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

// 5 指定job的输入原始文件所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

找博客共同粉丝案例

需求分析

需求

以下是博客的粉丝列表数据,冒号前是一个用户,冒号后是该用户的所有粉丝(数据中的粉丝关系是单向的)
求出哪些人两两之间有共同粉丝,及他俩粉的谁?

数据输入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

流程分析

先求出A、B、C、….等是谁的粉丝
第一次输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
A	I,K,C,B,G,F,H,O,D,
B A,F,J,E,
C A,E,B,H,F,G,K,
D G,C,K,A,L,F,E,H,
E G,M,L,H,A,F,B,D,
F L,M,D,C,G,A,
G M,
H O,
I O,C,
J O,
K B,
L D,E,
M E,F,
O A,H,I,J,F,

第二次输出结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
A-B	E C 
A-C D F
A-D E F
A-E D B C
A-F O B C D E
A-G F E C D
A-H E C D O
A-I O
A-J O B
A-K D C
A-L F E D
A-M E F
B-C A
B-D A E
B-E C
B-F E A C
B-G C E A
B-H A E C
B-I A
B-K C A
B-L E
B-M E
B-O A
C-D A F
C-E D
C-F D A
C-G D F A
C-H D A
C-I A
C-K A D
C-L D F
C-M F
C-O I A
D-E L
D-F A E
D-G E A F
D-H A E
D-I A
D-K A
D-L E F
D-M F E
D-O A
E-F D M C B
E-G C D
E-H C D
E-J B
E-K C D
E-L D
F-G D C A E
F-H A D O E C
F-I O A
F-J B O
F-K D C A
F-L E D
F-M E
F-O A
G-H D C E A
G-I A
G-K D A C
G-L D F E
G-M E F
G-O A
H-I O A
H-J O
H-K A C D
H-L D E
H-M E
H-O A
I-J O
I-K A
I-O A
K-L D
K-O A
L-M E F

代码实现

  • (1)第一次Mapper类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.atguigu.mr.friends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class ShareFriendsOneMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行 A:B,C,D,F,E,O
String line = value.toString();

// 2 切割
String[] fields = line.split(":");

// 3 获取博主和粉丝
String person = fields[0]; //A
String[] friends = fields[1].split(","); //B,C,D,F,E,O

// 4 写出
for (String friend : friends) {
// 输出<粉丝,博主>
// <(B,C,D,F,E,O),A> -> <B,A> <C,A> <D,A> <F,A> <E,A> <O,A> | <粉丝,博主>
context.write(new Text(friend), new Text(person));
}
}
}
  • (2)第一次Reducer类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.atguigu.mr.friends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ShareFriendsOneReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// <(B,C,D,F,E,O),A> -> <B,A> <C,A> <D,A> <F,A> <E,A> <O,A> | <粉丝,博主>

StringBuffer sb = new StringBuffer();

// 1 拼接
// 每个粉丝后边拼接所关注的博主
for (Text person : values) {
sb.append(person).append(",");
}

// 2 写出
context.write(key, new Text(sb.toString()));
}
}
  • (3)第一次Driver类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.atguigu.mr.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ShareFriendsOneDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"F:/IDEAWS/dashju/ShareFriendsinput1", "F:/IDEAWS/dashju/ShareFriendsoutput1"};

// 1 获取job对象
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2 指定jar包运行的路径
job.setJarByClass(ShareFriendsOneDriver.class);

// 3 指定map/reduce使用的类
job.setMapperClass(ShareFriendsOneMapper.class);
job.setReducerClass(ShareFriendsOneReducer.class);

// 4 指定map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

// 5 指定最终输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// 6 指定job的输入原始所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 提交
boolean result = job.waitForCompletion(true);

System.exit(result ? 0 : 1);
}
}
  • (4)第二次Mapper类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.atguigu.mr.friends;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.Arrays;

public class ShareFriendsTwoMapper extends Mapper<LongWritable, Text, Text, Text> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 粉丝,博主,博主,博主
// A I,K,C,B,G,F,H,O,D,
// B A,F,J,E,

// 1 获取一行
String line = value.toString();

// 2 切割
String[] fans_posters = line.split("\t");
String fans = fans_posters[0]; // A
String[] posters = fans_posters[1].split(","); // I,K,C,B,G,F,H,O,D,

Arrays.sort(posters); // B,C,D,F,G,H,I,K,O

// 共同粉丝迭代
// 对于相同的博主-博主,依次写出
for (int i = 0; i < posters.length - 1; i++) {
for (int j = 1; j < posters.length; j++) {
// 发出<博主-博主,粉丝>,相同的"博主-博主"对的所有好友就会到同一个reduce中去。
context.write(new Text(posters[i] + "-" + posters[j]), new Text(fans));
// (B-B,A) (B-C,A) (B-D,A)....
}
}
}
}
  • (5)第二次Reducer类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.atguigu.mr.friends;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class ShareFriendsTwoReducer extends Reducer<Text, Text, Text, Text> {
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
// <(博主-博主),粉丝>
// (B-B,A) (B-C,A) (B-D,A)....

StringBuffer sb = new StringBuffer();

// 1 汇总
// 以(博主-博主)为key,粉丝作为value
for (Text fans : values) {
sb.append(fans).append(" ");
}

// 2 写出
context.write(key, new Text(sb.toString()));
}
}
  • (6)第二次Driver类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
package com.atguigu.mr.friends;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ShareFriendsTwoDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"F:/IDEAWS/dashju/ShareFriendsoutput1", "F:/IDEAWS/dashju/ShareFriendsoutput2"};

// 1 获取job对象
Configuration configuration = new Configuration();
Job job = Job.getInstance(configuration);

// 2 指定jar包运行的路径
job.setJarByClass(ShareFriendsTwoDriver.class);

// 3 指定map/reduce使用的类
job.setMapperClass(ShareFriendsTwoMapper.class);
job.setReducerClass(ShareFriendsTwoReducer.class);

// 4 指定map输出的数据类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);

// 5 指定最终输出的数据类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// 6 指定job的输入原始所在目录
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));

// 7 提交
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}

第8章 常见错误及解决方案

1)导包容易出错。尤其Text和CombineTextInputFormat。

2)Mapper中第一个输入的参数必须是LongWritable或者NullWritable,不可以是IntWritable. 报的错误是类型转换异常。

3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656 (4),说明Partition和ReduceTask个数没对上,调整ReduceTask个数。

4)如果分区数不是1,但是reducetask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。

5)在Windows环境编译的jar包导入到Linux环境中运行

1
hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/ /user/atguigu/output

报如下错误:

1
Exception in thread "main" java.lang.UnsupportedClassVersionError: com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0

原因是Windows环境用的jdk1.7,Linux环境用的jdk1.8。

解决方案:统一jdk版本。

6)缓存pd.txt小文件案例中,报找不到pd.txt文件

原因:大部分为路径书写错误。还有就是要检查pd.txt.txt的问题。还有个别电脑写相对路径找不到pd.txt,可以修改为绝对路径。

7)报类型转换异常。

通常都是在驱动函数中设置Map输出和最终输出时编写错误。
Map输出的key如果没有排序,也会报类型转换异常。

8)集群中运行wc.jar时出现了无法获得输入文件。

原因:WordCount案例的输入文件不能放用HDFS集群的根目录。

9)出现了如下相关异常

1
2
3
4
5
6
7
8
Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609)
at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977)
java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)

解决方案:拷贝hadoop.dll文件到Windows目录C:\Windows\System32。个别电脑还需要修改Hadoop源码。

10)自定义Outputformat时,注意在RecordWirter中的close方法必须关闭流资源。否则输出的文件内容中数据为空。

1
2
3
4
5
6
7
8
9
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
if (atguigufos != null) {
atguigufos.close();
}
if (otherfos != null) {
otherfos.close();
}
}
文章作者: HibisciDai
文章链接: http://hibiscidai.com/2020/11/09/Hadoop-MapReduce详解-3/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 HibisciDai
好用、实惠、稳定的梯子,点击这里