Hadoop-MapReduce详解-3
[TOC]
Hadoop-MapReduce详解-3 第4章 Hadoop数据压缩 压缩概述 压缩技术能够有效减少底层存储系统(HDFS)读写字节数。压缩提高了网络带宽和磁盘空间的效率。在运行MR程序时, I/O操作、网络数据传输、Shufle和Merge要花大量的时间,尤其是数据规模很大和工作负载密集的情况下,因此,使用数据压缩显得非常重要。
鉴于磁盘I/O和网络带宽是Hadoop的宝贵资源,数据压缩对于节省资源、最小化磁盘I/O和网络传输非常有帮助。可以在任意MapReduce阶段启用压缩。不过,尽管压缩与解压操作的CPU开销不高,其性能的提升和资源的节省并非没有代价。
压缩是提高Hadoop运行效率的一种优化策略。
通过对Mapper、Reducer运行过程的数据进行压缩,以减少磁盘IO,提高MR程序运行速度。
注意:采用压缩技术减少了磁盘IO,但同时增加了CPU运算负担。所以,压缩特性运用得当能提高性能,但运用不当也可能降低性能。
(1) 运算密集型的job,少用压缩(涉及大量公式,算法) (2) IO密集型的job,多用压缩(数据传输)
MR支持的压缩编码
压缩格式
hadoop自带?
算法
文件扩展名
是否可切分
换成压缩格式后,原来的程序是否需要修改
DEFLATE
是,直接使用
DEFLATE
.deflate
否
和文本处理一样,不需要修改
Gzip
是,直接使用
DEFLATE
.gz
否
和文本处理一样,不需要修改
bzip2
是,直接使用
bzip2
.bz2
是
和文本处理一样,不需要修改
LZO
否,需要安装
LZO
.lzo
是
需要建索引,还需要指定输入格式
Snappy
否,需要安装
Snappy
.snappy
否
和文本处理一样,不需要修改
为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器,如下表所示。
压缩格式
对应的编码/解码器
DEFLATE
org.apache.hadoop.io.compress.DefaultCodec
gzip
org.apache.hadoop.io.compress.GzipCodec
bzip2
org.apache.hadoop.io.compress.BZip2Codec
LZO
com.hadoop.compression.lzo.LzopCodec
Snappy
org.apache.hadoop.io.compress.SnappyCodec
压缩性能的比较
压缩算法
原始文件大小
压缩文件大小
压缩速度
解压速度
gzip
8.3GB
1.8GB
17.5MB/s
58MB/s
bzip2
8.3GB
1.1GB
2.4MB/s
9.5MB/s
LZO
8.3GB
2.9GB
49.3MB/s
74.6MB/s
Google/Snappy github/Snappy
压缩方式选择 Gzip压缩
压缩率比较高,而且压缩解压速度也比较快; Hadoop本身支持, 在应用中处理Gzip格式的文件就和直接处理文本一样; 大部分Linux系统都自带Gzip命令,使用方便。
不支持Split。
当每个文件压缩之后在130M以内的(1个块大小内) ,都可以考虑用Gzip压缩格式。例如说一天或者一个小时的日志压缩成一个Gzip文件。
Bzip2压缩
支持Split; 具有很高的压缩率,比Gzip压缩率都高; Hadoop本身自带,使用方便。
压缩/解压速度慢。
适合对速度要求不高,但需要较高的压缩率的时候; 或者输出之后的数据比较大,处理之后的数据需要压缩存档减少磁盘空间并且以后数据用得比较少的情况(历史数据备份); 或者对单个很大的文本文件想压缩减少存储空间,同时又需要支持Split,而且兼容之前的应用程序的情况。
Lzo压缩
压缩/解压速度也比较快,合理的压缩率; 支持Split,是Hadoop中最流行的压缩格式; 可以在Linux系统下安装Izop命令,使用方便。
压缩率比Gzip要低一些; Hadoop本身不支持,需要安装; 在应用中对Lzo格式的文件需要做一些特殊处理(为了支持Split需要建索引,还需要指定InputFormat为Lzo格式)
一个很大的文本文件,压缩之后还大于200M以,上的可以考虑,而且单个文件越大,Lzo优点越越明显。
Snappy压缩
高速压缩速度和合理的压缩率。
不支持Split; 压缩率比Gzip要低; Hadoop本身不支持, 需要安装。
当NapReduce作业的Map输出的数据比较大的时候, 作为Map到Redunce的中间数据的压缩格式; 或者作为一个MapRednce作业的输出和另外一个MapReduce作业的输入。
压缩位置选择 输入端采用压缩 在有大量数据并计划重复处理的情况下,应该考虑对输入进行压缩。然而,你无须显示指定使用的编解码方式。 Hadoop自动检查文件扩展名,如果扩展名能够匹配,就会用恰当的编解码方式对文件进行压缩和解压。 否则,Hadoop就不会使用任何编解码器。
Mapper输出采用压缩 当Map任务输出的中间数据量很大时,应考虑在此阶段采用压缩技术。这能显著改善内部数据Shufne过程,而Shffle过程在Hadoop处理过程中是资源肖耗最多的环节。 如果发现数据量大造成网络传输缓慢,应该考虑使用压缩技术。 可用于压缩Mapper 输出的快速编解码器包括LZO或者Snappy。
注: LZO是供Hadoop压缩数据用的通用压缩编解码器。其设计目标是达到与硬盘读取速度相当的压缩速度,因此速度是优先考虑的因素,而不是压缩率。与Gzip编解码器相比,它的压缩速度是Gzip的5倍,而解压速度是Gzp的2倍。同一个文件用LZO压缩后比用Gzip压缩后大50%,但比压缩前小25%~ 50%。这对改善性能非常有利。Map阶段完成时间快4倍。
Reducer输出采用压缩 在此阶段启用压缩技术能够减少要存储的数据量,因此降低所需的磁盘空间。当MapReduce作业形成作业链条时,因为第二个作业的输入也已压缩,所以启用压缩同样有效。
压缩参数配置 要在Hadoop中启用压缩,可以配置如下参数:
参数
默认值
阶段
建议
io.compression.codecs(在core-site.xml中配置)
org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec
输入压缩
Hadoop使用文件扩展名判断是否支持某种编解码器
mapreduce.map.output.compress(在mapred-site.xml中配置)
false
mapper输出
这个参数设为true启用压缩
mapreduce.map.output.compress.codec(在mapred-site.xml中配置)
org.apache.hadoop.io.compress.DefaultCodec
mapper输出
企业多使用LZO或Snappy编解码器在此阶段压缩数据
mapreduce.output.fileoutputformat.compress(在mapred-site.xml中配置)
false
reducer输出
这个参数设为true启用压缩
mapreduce.output.fileoutputformat.compress.codec(在mapred-site.xml中配置)
org.apache.hadoop.io.compress.DefaultCodec
reducer输出
使用标准工具或者编解码器,如gzip和bzip2
mapreduce.output.fileoutputformat.compress.type(在mapred-site.xml中配置)
RECORD
reducer输出
SequenceFile输出使用的压缩类型:NONE和BLOCK
压缩实操案例 数据流的压缩和解压缩 CompressionCodec有两个方法可以用于轻松地压缩或解压缩数据。
要想对正在被写入一个输出流的数据进行压缩,我们可以使用reateOutputStream(OutputStreamout)
方法创建一个CompressionOutputStream
,将其以压缩格式写入底层的流。
相反,要想对从输入流读取而来的数据进行解压缩,则调用createInputStream(InputStreamin)
函数,从而获得一个CompressionInputStream
, 从而从底层的流读取未压缩的数据。
测试
DEFLATE
org.apache.hadoop.io.compress.DefaultCodec
gzip
org.apache.hadoop.io.compress.GzipCodec
bzip2
org.apache.hadoop.io.compress.BZip2Codec
TestCompress.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 package com.atguigu.mr.compress;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IOUtils;import org.apache.hadoop.io.compress.CompressionCodec;import org.apache.hadoop.io.compress.CompressionCodecFactory;import org.apache.hadoop.io.compress.CompressionInputStream;import org.apache.hadoop.io.compress.CompressionOutputStream;import org.apache.hadoop.util.ReflectionUtils;import java.io.*;public class TestCompress { public static void main (String[] args) throws IOException, ClassNotFoundException { decompress("f:/IDEAWS/dashju/compressinput/hello.txt.bz2" ); } private static void compress (String filename, String method) throws IOException, ClassNotFoundException { FileInputStream fis = new FileInputStream (new File (filename)); Class codeClass = Class.forName(method); CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass, new Configuration ()); FileOutputStream fos = new FileOutputStream (new File (filename + codec.getDefaultExtension())); CompressionOutputStream cos = codec.createOutputStream(fos); IOUtils.copyBytes(fis, cos, 1024 * 1024 * 5 , false ); IOUtils.closeStream(cos); IOUtils.closeStream(fos); IOUtils.closeStream(fis); } private static void decompress (String filename) throws IOException { CompressionCodecFactory compressionCodecFactory = new CompressionCodecFactory (new Configuration ()); CompressionCodec compressionCodec = compressionCodecFactory.getCodec(new Path (filename)); if (null == compressionCodec) { System.out.println("不能解压缩" ); return ; } FileInputStream fis = new FileInputStream (new File (filename)); CompressionInputStream cis = compressionCodec.createInputStream(fis); FileOutputStream fos = new FileOutputStream (new File (filename + "decode" )); IOUtils.copyBytes(cis, fos, 1024 * 1024 * 5 , false ); IOUtils.closeStream(fos); IOUtils.closeStream(cis); IOUtils.closeStream(fis); } }
Map输出端采用压缩 即使你的MapReduce的输入输出文件都是未压缩的文件,你仍然可以对Map任务的中间结果输出做压缩,因为它要写在硬盘并且通过网络传输到Reduce节点,对其压缩可以提高很多性能,这些工作只要设置两个属性即可,我们来看下代码怎么设置。
不影响输出结果,只是提高IO效率。
在wordcount案例中对WordCountDriver.java
修改配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 Configuration configuration = new Configuration ();configuration.setBoolean("mapreduce.map.output.compress" , true ); configuration.setClass("mapreduce.map.output.compress.codec" , BZip2Codec.class, CompressionCodec.class); Job job = Job.getInstance(configuration);job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); boolean result = job.waitForCompletion(true );System.exit(result ? 1 : 0 );
Reduce输出端采用压缩 最终输出文件为压缩文件。
在wordcount案例中对WordCountDriver.java
修改配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 Configuration configuration = new Configuration ();Job job = Job.getInstance(configuration);job.setJarByClass(WordCountDriver.class); job.setMapperClass(WordCountMapper.class); job.setReducerClass(WordCountReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); FileOutputFormat.setCompressOutput(job, true ); FileOutputFormat.setOutputCompressorClass(job, BZip2Codec.class); boolean result = job.waitForCompletion(true );System.exit(result?1 :0 );
第5章 Yarn资源调度器 Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而MapReduce等运算程序则相当于运行于操作系统之上的应用程序。
Yarn基本架构 YARN主要由ResourceManager
、NodeManager
、ApplicationMaster
和Container
等组件构成。
Yarn工作机制
(1)MR程序提交到客户端所在的节点。 (2)YarnRunner向ResourceManager申请一个Application。 (3)RM将该应用程序的资源路径返回给YarnRunner。 (4)该程序将运行所需资源提交到HDFS上。 (5)程序资源提交完毕后,申请运行mrAppMaster。 (6)RM将用户的请求初始化成一个Task。 (7)其中一个NodeManager领取到Task任务。 (8)该NodeManager创建容器Container,并产生MRAppmaster。 (9)Container从HDFS上拷贝资源到本地。 (10)MRAppmaster向RM 申请运行MapTask资源。 (11)RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。 (12)MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。 (13)MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。 (14)ReduceTask向MapTask获取相应分区的数据。 (15)程序运行完毕后,MR会向RM申请注销自己。
作业提交全过程 Yarn阶段 作业提交全过程详解
(1)作业提交
第1步:Client调用job.waitForCompletion
方法,向整个集群提交MapReduce作业。 第2步:Client向RM申请一个作业id。 第3步:RM给Client返回该job资源的提交路径和作业id。 第4步:Client提交jar包、切片信息和配置文件到指定的资源提交路径。 第5步:Client提交完资源后,向RM申请运行MrAppMaster。
(2)作业初始化
第6步:当RM收到Client的请求后,将该job添加到容量调度器中。 第7步:某一个空闲的NM领取到该Job。 第8步:该NM创建Container,并产生MRAppmaster。 第9步:下载Client提交的资源到本地。
(3)任务分配
第10步:MrAppMaster向RM申请运行多个MapTask任务资源。 第11步:RM将运行MapTask任务分配给另外两个NodeManager,另两个NodeManager分别领取任务并创建容器。
(4)任务运行
第12步:MR向两个接收到任务的NodeManager发送程序启动脚本,这两个NodeManager分别启动MapTask,MapTask对数据分区排序。 第13步:MrAppMaster等待所有MapTask运行完毕后,向RM申请容器,运行ReduceTask。 第14步:ReduceTask向MapTask获取相应分区的数据。 第15步:程序运行完毕后,MR会向RM申请注销自己。
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval
设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外, 客户端每5秒都会通过调用waitForCompletion()
来检查作业是否完成。时间间隔可以通过mapreduce.client.completion.pollinterval
来设置。作业完成之后, 应用管理器和Container会清理工作状态。作业的信息会被作业历史服务器存储以备之后用户核查。
MapReduce阶段
资源调度器 目前,Hadoop作业调度器主要有三种:FIFO
、Capacity Scheduler
和Fair Scheduler
。 Hadoop2.7.2默认的资源调度器是Capacity Scheduler。
具体设置详见:yarn-default.xml
文件
1 2 3 4 5 <property > <description > The class to use as the resource scheduler.</description > <name > yarn.resourcemanager.scheduler.class</name > <value > org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value > </property >
先进先出调度器(FIFO)
容量调度器(Capacity Scheduler) 为多个FIFO调度器组合,并发度较高。
1、支持多个队列,每个队列可配置一定的资源量,每个队列采用FIFO调度策略。
2、为了防止同一个用户的作业独占队列中的资源,该调庶器会对同一用户提交的作业所占资源虽进行限定。
3、首先,计算每个队列中正在运行的任务数与其应该分得的计算资原之间的比值,选择一个该比值最小的队列——最闲的。
4、其次,按照作业优先级和提交时间顺字,同时考虑用户资源量限制和内存限制对队列内任务排序。
5、三个队列同时按照任务的先后顺序依次执行,比如,jobl1、 job21和jb3l分别排在队列最前面,先运行,也是并行运行。
公平调度器(Fair Scheduler) 缺额排序,多个FIFO调度器组合。
支持多队列多用户,每个队列中的资源量可以配置,同一队列中的作业公平共享队列中所有资源。
比如有三个队列: queueA、 queueB和queueC, 每个队列中的job按照优先级分配资源,优先级越高分配的资源越多,但是每个job都会分配到资源以确保公平。
在资源有限的情况下,每个job理想情兄下获得的计算资源与实际获得的计算资源存在一种差距,这个差距就叫做缺额
。
在同一个队列中,job的资源缺额越大,越先获得资源优先执行。作业是按照缺额的高低来先后执行的,而且可以看到上图有多个作业同时运行。
任务的推测执行 1.作业完成时间取决于最慢的任务完成时间
一个作业由若干个Map任务和Reduce任务构成。因硬件老化、软件Bug等,某些任务可能运行非常慢。
思考:系统中有99%的Map任务都完成了,只有少数几个Map老是进度很慢,完不成,怎么办?
2.推测执行机制
发现拖后腿的任务,比如某个任务运行速度远慢于任务平均速度。为拖后腿任务启动一个备份任务,同时运行。谁先运行完,则采用谁的结果。
执行推测任务的前提条件
(1)每个Task只能有一个备份任务 (2)当前Job已完成的Task必须不小于0.05(5%) (3)开启推测执行参数设置。mapred-site.xml
文件中默认是打开的。
1 2 3 4 5 6 7 8 9 10 11 <property > <name > mapreduce.map.speculative</name > <value > true</value > <description > If true, then multiple instances of some map tasks may be executed in parallel.</description > </property > <property > <name > mapreduce.reduce.speculative</name > <value > true</value > <description > If true, then multiple instances of some reduce tasks may be executed in parallel.</description > </property >
不能启用推测执行机制情况
(1)任务间存在严重的负载倾斜; (2)特殊任务,比如任务向数据库中写数据。
算法原理
假设某一时刻,任务T的执行进度为progress
,则可通过一定的算法推测出该任务的最终完成时刻estimateEndTime
。另一方面,如果此刻为该任务启动一个备份任务,则可推断出它可能的完成时刻estimateEndTime2
;于是可得出以下几个公式:
1 2 estimatedRunTime = (currentTimestamp - taskStartTime) / progress 推测运行时间(60s) = (当前时刻(6) - 任务启动时刻(0)) / 任务运行比例(10%)
1 2 estimateEndTime = estimatedRunTime + taskStartTime 推测执行完时刻(60) = 推测运行时间(60s) + 任务启动时刻(0)
1 2 estimateEndTime2 = currentTimestamp + averageRunTime 备份任务推测完成时刻(16) = 当前时刻(6) + 运行完成任务的平均时间(10s)
(1)MR总是选择(estimateEndTime - estimateEndTime2
)差值最大的任务,并为之启动备份任务。
(2)为了防止大量任务同时启动备份任务造成的资源浪费,MR为每个作业设置了同时启动的备份任务数目上限。
(3)推测执行机制实际上采用了经典的优化算法:以空间换时间,它同时启动多个相同任务处理相同的数据,并让这些任务竞争以缩短数据处理时间。显然,这种方法需要占用更多的计算资源。在集群资源紧缺的情况下,应合理使用该机制,争取在多用少量资源的情况下,减少作业的计算时间。
第6章 Hadoop企业优化 MapReduce跑得慢的原因 MapReduce程序效率的瓶颈在于两点
CPU、内存、磁盘健康、网络
(1)数据倾斜; (2)Map和Reduce数设置不合理; (3)Map运行时间太长,导致Reduce等待过久; (4)小文件过多; (5)大量的不可分块的超大文件; (6)Spill次数过多; (7)Merge次数过多。
MapReduce优化方法 MapReduce优化方法主要从六个方面考虑:数据输入、Map阶段、Reduce 阶段、Io传输、数据倾斜问题和常用的调优参数。
数据输入 (1)合并小文件在执行MR任务前将小文件进行合并,大量的小文件会产生大量的Map任务,增大Map任务装载次数,而任务的装载比较耗时,从而导致MR运行较慢。
(2)采用CombineTextInputFormat
来作为输入,解决输入端大小文件场景。
Map阶段 (1)减少溢写(Spill)次数
通过调整io.sort.mb
(环形缓冲区)及sort.spill.percent
(到达多少开始溢写)参数值,增大触发Spill的内存上限,减少Spill次数,从而减少磁盘IO。
(2)减少合并(Merge)次数
通过调整io.sort.factor
参数,增大Merge的文件数目,减少Merge的次数,从而缩短MR处理时间。
(3)在Map之后,不影响业务逻辑前提下,先进行Combine处理,减少I/O。
Reduce阶段 (1)合理设置Map和Reduce数
两个都不能设置太少,也不能设置太多。太少,会导致Task等待,延长处理时间;太多,会导致Map、Reduce任务间竞争资源,造成处理超时等错误。
(2)设置Map、Reduce共存
调整slowstart.completedmaps
参数, 使Map运行到一定程度后,Reduce也开始运行,减少Reduce的等待时间。
(3)规避使用Reduce
因为Reduce在用于连接数据集的时候将会产生大量的网络消耗。
(4)合理设置Reduce端的Buffer
默认情况下,数据达到一个阈值的时候,Buffer中的数据就会写入磁盘,然后Reduce会从磁盘中获得所有的数据。也就是说,Buffen和Reduce是没有直接关联的,中间多次写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得Buffer中的一 分数据可以直接输送到Reduce,从而减少IO开销:mapreduce.reduce.input.bufer.percent
,默认为0.0。当值大于0的时候,会保留指定比例的内存读Buffer中的数据直接拿给Reduce使用。这样一来,设置Buffer需要内存,读取数据需要内存,Reduce计算也要内存,所以要根据作业的运行情况进行调整。
IO传输 (1)采用数据压缩的方式,减少网络IO的的时间。安装Snappy和LZO压缩编码器。
(2)使用SequenceFile二进制文件。
数据倾斜问题 数据倾斜现象 数据频率倾斜:某一个区域的数据要远远大于其他区域。
数据大小倾斜:部分记录的大小远远大于平均值。
减少数据倾斜的方法
可以通过对原始数据进行抽样得到的结果集来预设分区边界值。
基于输出键的背景知识进行自定义分区。例如,如果Map输出键的单词来源于一本书。且其中某几个专业司汇较多。那么就可以自定义分区将这这些专业司汇发送给固定的一部分Reduce实例。而将其他的都发送给剩余的Reduce实例。
使用Combine可以大量地减小数据倾斜。在可能的情况下,Cobine的目的就是聚合并精简数据。
方法4:采用Map Join,尽量避免Reduce Join。
常用的调优参数 资源相关参数 (1)以下参数是在用户自己的MR应用程序中配置就可以生效(mapred-default.xml
)
配置参数
参数说明
mapreduce.map.memory.mb
一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。(开发中2-4G)
mapreduce.reduce.memory.mb
一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。
mapreduce.map.cpu.vcores
每个MapTask可使用的最多cpu core数目,默认值:1。
mapreduce.reduce.cpu.vcores
每个ReduceTask可使用的最多cpu core数目,默认值:1。
mapreduce.reduce.shuffle.parallelcopies
每个Reduce去Map中取数据的并行数,默认值:5。
mapreduce.reduce.shuffle.merge.percent
Buffer中的数据达到多少比例开始写入磁盘,默认值:0.66。
mapreduce.reduce.shuffle.input.buffer.percent
Buffer大小占Reduce可用内存的比例,默认值:0.7。
mapreduce.reduce.input.buffer.percent
指定多少比例的内存用来存放Buffer中的数据,默认值:0。
(2)应该在YARN启动之前就配置在服务器的配置文件中才能生效(yarn-default.xml
)
配置参数
参数说明
yarn.scheduler.minimum-allocation-mb
给应用程序Container分配的最小内存,默认值:1024。
yarn.scheduler.maximum-allocation-mb
给应用程序Container分配的最大内存,默认值:8192。
yarn.scheduler.minimum-allocation-vcores
每个Container申请的最小CPU核数,默认值:1。
yarn.scheduler.maximum-allocation-vcores
每个Container申请的最大CPU核数,默认值:32。
yarn.nodemanager.resource.memory-mb
给Containers分配的最大物理内存,默认值:8192。
(3)Shuffle性能优化的关键参数,应在YARN启动之前就配置好(mapred-default.xml
)
配置参数
参数说明
mapreduce.task.io.sort.mb
Shuffle的环形缓冲区大小,默认值:100M。
mapreduce.map.sort.spill.percent
环形缓冲区溢出的阈值,默认值:80%。
容错相关参数(MapReduce性能优化)
配置参数
参数说明
mapreduce.map.maxattempts
每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.reduce.maxattempts
每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
mapreduce.task.timeout
Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.
。
HDFS小文件优化方法 HDFS小文件弊端 HDFS上每个文件都要在NameNode上建立一个索引,这个索引的大小约为150byte,这样当小文件比较多的时候,就会产生很多的索引文件,一方面会大量占用NameNode的内存空间,另一方面就是索引文件过大使得索引速度变慢。
HDFS小文件解决方案 小文件的优化无非以下几种方式:
(1)在数据采集的时候,就将小文件或小批数据合成大文件再上传HDFS。 (2)在业务处理之前,在HDFS上使用MapReduce程序对小文件进行合并。 (3)在MapReduce处理时,可采用CombineTextInputFormat提高效率。
是一个高效地将小文件放入HDFS块中的文件存档工具,它能够将多个小文件打包成一个HAR文件,这样就减少了NameNode的内存使用。
SequenceFile由一系列的二进制key/value组成,如果key为文件名,value为文件内容,则可以将大批小文件合并成一个大文件。
CombineFileInputFormat是一种新的InputFomat, 用于将多个文件台并成一个单独的Split,另外,它会考虑数据的存储位置。
对于大量小文件Job,可以开启JVM重用会减少45%运行时间。
JVM重用原理:一个Map运行在一个JVM上,开启重用的话,该Map在JVM上运行完毕后,JVM继续运行其他Map。
具体设置: mapreduce.job.jvm.numtasks
(JVM线程池)值在10-20之间。
第7章 MapReduce扩展案例 倒排索引案例(多job串联) 需求分析
有大量的文本(文档、网页),需要建立搜索索引。
a.txt
1 2 3 atguigu pingping atguigu ss atguigu ss
b.txt
1 2 3 atguigu pingping atguigu pingping atguigu ss
c.txt
1 2 atguigu ss atguigu pingping
1 2 3 atguigu c.txt-->2 b.txt-->2 a.txt-->3 pingping c.txt-->1 b.txt-->3 a.txt-->1 ss c.txt-->1 b.txt-->1 a.txt-->2
代码实现 第一次处理 (1)第一次处理,编写OneIndexMapper类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 package com.atguigu.mr.index;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;public class OneIndexMapper extends Mapper <LongWritable, Text, Text, IntWritable> { String name; IntWritable v = new IntWritable (1 ); Text k = new Text (); @Override protected void setup (Context context) throws IOException, InterruptedException { FileSplit fileSplit = (FileSplit)context.getInputSplit(); name = fileSplit.getPath().getName(); } @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(" " ); for (String word : fields) { k.set(word + "--" + name); context.write(k, v); } } }
(2)第一次处理,编写OneIndexReducer类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package com.atguigu.mr.index;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class OneIndexReducer extends Reducer <Text, IntWritable, Text, IntWritable> { IntWritable v = new IntWritable (); @Override protected void reduce (Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0 ; for (IntWritable value : values) { sum += value.get(); } v.set(sum); context.write(key, v); } }
(3)第一次处理,编写OneIndexDriver类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.atguigu.mr.index;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class OneIndexDriver { public static void main (String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String []{"f:/IDEAWS/dashju/oneindexinput" , "f:/IDEAWS/dashju/oneindexoutput" }; Configuration conf = new Configuration (); Job job = Job.getInstance(conf); job.setJarByClass(OneIndexDriver.class); job.setMapperClass(OneIndexMapper.class); job.setReducerClass(OneIndexReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(IntWritable.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); job.waitForCompletion(true ); } }
(4)查看第一次输出结果
1 2 3 4 5 6 7 8 9 atguigu--a.txt 3 atguigu--b.txt 3 atguigu--c.txt 2 pingping--a.txt 1 pingping--b.txt 2 pingping--c.txt 1 ss--a.txt 2 ss--b.txt 1 ss--c.txt 1
第二次处理 (1)第二次处理,编写TwoIndexMapper类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.atguigu.mr.index;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class TwoIndexMapper extends Mapper <LongWritable, Text, Text, Text> { Text k = new Text (); Text v = new Text (); @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("--" ); k.set(fields[0 ]); v.set(fields[1 ]); context.write(k, v); } }
(2)第二次处理,编写TwoIndexReducer类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package com.atguigu.mr.index;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class TwoIndexReducer extends Reducer <Text, Text, Text, Text> { Text v = new Text (); @Override protected void reduce (Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuilder sb = new StringBuilder (); for (Text value : values) { sb.append(value.toString().replace("\t" , "-->" ) + "\t" ); } v.set(sb.toString()); context.write(key, v); } }
(3)第二次处理,编写TwoIndexDriver类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 package com.atguigu.mr.index;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TwoIndexDriver { public static void main (String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String []{"f:/IDEAWS/dashju/twoindexinput" , "f:/IDEAWS/dashju/twoindexoutput" }; Configuration config = new Configuration (); Job job = Job.getInstance(config); job.setJarByClass(TwoIndexDriver.class); job.setMapperClass(TwoIndexMapper.class); job.setReducerClass(TwoIndexReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); boolean result = job.waitForCompletion(true ); System.exit(result ? 0 : 1 ); } }
(4)第二次查看最终结果
1 2 3 atguigu c.txt-->2 b.txt-->3 a.txt-->3 pingping c.txt-->1 b.txt-->2 a.txt-->1 ss c.txt-->1 b.txt-->1 a.txt-->2
TopN案例 需求分析 需求 对需求序列化输出结果进行加工,输出流量使用量在前10的用户信息。
输入数据 手机号,上行流量,下行流量,总流量
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 13470253144 180 180 360 13509468723 7335 110349 117684 13560439638 918 4938 5856 13568436656 3597 25635 29232 13590439668 1116 954 2070 13630577991 6960 690 7650 13682846555 1938 2910 4848 13729199489 240 0 240 13736230513 2481 24681 27162 13768778790 120 120 240 13846544121 264 0 264 13956435636 132 1512 1644 13966251146 240 0 240 13975057813 11058 48243 59301 13992314666 3008 3720 6728 15043685818 3659 3538 7197 15910133277 3156 2936 6092 15959002129 1938 180 2118 18271575951 1527 2106 3633 18390173782 9531 2412 11943 84188413 4116 1432 5548
输出数据 1 2 3 4 5 6 7 8 9 10 13509468723 7335 110349 117684 13975057813 11058 48243 59301 13568436656 3597 25635 29232 13736230513 2481 24681 27162 18390173782 9531 2412 11943 13630577991 6960 690 7650 15043685818 3659 3538 7197 13992314666 3008 3720 6728 15910133277 3156 2936 6092 13560439638 918 4938 5856
代码实现 (1)编写FlowBean类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 package com.atguigu.mr.topN;import org.apache.hadoop.io.Writable;import org.apache.hadoop.io.WritableComparable;import java.io.DataInput;import java.io.DataOutput;import java.io.IOException;public class FlowBean implements WritableComparable <FlowBean> { private long upFlow; private long downFlow; private long sumFlow; public FlowBean () { super (); } public FlowBean (long upFlow, long downFlow, long sumFlow) { this .upFlow = upFlow; this .downFlow = downFlow; this .sumFlow = sumFlow; } public void setUpFlow (long upFlow) { this .upFlow = upFlow; } public void setDownFlow (long downFlow) { this .downFlow = downFlow; } public void setSumFlow (long sumFlow) { this .sumFlow = sumFlow; } public long getUpFlow () { return upFlow; } public long getDownFlow () { return downFlow; } public long getSumFlow () { return sumFlow; } public void set (long downFlow2, long upFlow2) { downFlow = downFlow2; upFlow = upFlow2; sumFlow = downFlow2 + upFlow2; } @Override public String toString () { return upFlow + "\t" + downFlow + "\t" + sumFlow; } @Override public int compareTo (FlowBean o) { int result; if (this .sumFlow > o.getSumFlow()) { result = -1 ; } else if (this .sumFlow < o.getSumFlow()) { result = 1 ; } else { result = 0 ; } return result; } @Override public void write (DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(downFlow); out.writeLong(sumFlow); } @Override public void readFields (DataInput in) throws IOException { upFlow = in.readLong(); downFlow = in.readLong(); sumFlow = in.readLong(); } }
(2)编写TopNMapper类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 package com.atguigu.mr.topN;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;import java.util.Iterator;import java.util.TreeMap;public class TopNMapper extends Mapper <LongWritable, Text, FlowBean, Text> { private TreeMap<FlowBean, Text> flowMap = new TreeMap <FlowBean, Text>(); private FlowBean kBean; @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { kBean = new FlowBean (); Text v = new Text (); String line = value.toString(); String[] fields = line.split("\t" ); String phoneNum = fields[0 ]; Long upFlow = Long.parseLong(fields[1 ]); Long downFlow = Long.parseLong(fields[2 ]); Long sumFlow = Long.parseLong(fields[3 ]); v.set(phoneNum); kBean.setUpFlow(upFlow); kBean.setDownFlow(downFlow); kBean.setSumFlow(sumFlow); flowMap.put(kBean, v); if (flowMap.size() > 10 ) { flowMap.remove(flowMap.lastKey()); } } @Override protected void cleanup (Context context) throws IOException, InterruptedException { Iterator<FlowBean> bean = flowMap.keySet().iterator(); while (bean.hasNext()) { FlowBean k = bean.next(); context.write(k, flowMap.get(k)); } } }
(3)编写TopNReducer类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.atguigu.mr.topN;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;import java.util.Iterator;import java.util.TreeMap;public class TopNReducer extends Reducer <FlowBean, Text, Text, FlowBean> { TreeMap<FlowBean, Text> flowMap = new TreeMap <FlowBean, Text>(); @Override protected void reduce (FlowBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException { for (Text value : values) { FlowBean bean = new FlowBean (); bean.set(key.getDownFlow(), key.getUpFlow()); flowMap.put(bean, new Text (value)); if (flowMap.size() > 10 ) { flowMap.remove(flowMap.firstKey()); } } } @Override protected void cleanup (Context context) throws IOException, InterruptedException { Iterator<FlowBean> it = flowMap.keySet().iterator(); while (it.hasNext()) { FlowBean v = it.next(); context.write(new Text (flowMap.get(v)), v); } } }
(4)编写TopNDriver类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.atguigu.mr.topN;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class TopNDriver { public static void main (String[] args) throws InterruptedException, IOException, ClassNotFoundException { args = new String []{"f:/IDEAWS/dashju/topNinput" , "f:/IDEAWS/dashju/topNoutput" }; Configuration configuration = new Configuration (); Job job = Job.getInstance(configuration); job.setJarByClass(TopNDriver.class); job.setMapperClass(TopNMapper.class); job.setReducerClass(TopNReducer.class); job.setMapOutputKeyClass(FlowBean.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); boolean result = job.waitForCompletion(true ); System.exit(result ? 0 : 1 ); } }
找博客共同粉丝案例 需求分析 需求 以下是博客的粉丝列表数据,冒号前是一个用户,冒号后是该用户的所有粉丝(数据中的粉丝关系是单向的) 求出哪些人两两之间有共同粉丝,及他俩粉的谁?
数据输入 1 2 3 4 5 6 7 8 9 10 11 12 13 14 A:B,C,D,F,E,O B:A,C,E,K C:F,A,D,I D:A,E,F,L E:B,C,D,M,L F:A,B,C,D,E,O,M G:A,C,D,E,F H:A,C,D,E,O I:A,O J:B,O K:A,C,D L:D,E,F M:E,F,G O:A,H,I,J
流程分析 先求出A、B、C、….等是谁的粉丝 第一次输出结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 A I,K,C,B,G,F,H,O,D, B A,F,J,E, C A,E,B,H,F,G,K, D G,C,K,A,L,F,E,H, E G,M,L,H,A,F,B,D, F L,M,D,C,G,A, G M, H O, I O,C, J O, K B, L D,E, M E,F, O A,H,I,J,F,
第二次输出结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 A-B E C A-C D F A-D E F A-E D B C A-F O B C D E A-G F E C D A-H E C D O A-I O A-J O B A-K D C A-L F E D A-M E F B-C A B-D A E B-E C B-F E A C B-G C E A B-H A E C B-I A B-K C A B-L E B-M E B-O A C-D A F C-E D C-F D A C-G D F A C-H D A C-I A C-K A D C-L D F C-M F C-O I A D-E L D-F A E D-G E A F D-H A E D-I A D-K A D-L E F D-M F E D-O A E-F D M C B E-G C D E-H C D E-J B E-K C D E-L D F-G D C A E F-H A D O E C F-I O A F-J B O F-K D C A F-L E D F-M E F-O A G-H D C E A G-I A G-K D A C G-L D F E G-M E F G-O A H-I O A H-J O H-K A C D H-L D E H-M E H-O A I-J O I-K A I-O A K-L D K-O A L-M E F
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 package com.atguigu.mr.friends;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;public class ShareFriendsOneMapper extends Mapper <LongWritable, Text, Text, Text> { @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split(":" ); String person = fields[0 ]; String[] friends = fields[1 ].split("," ); for (String friend : friends) { context.write(new Text (friend), new Text (person)); } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 package com.atguigu.mr.friends;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class ShareFriendsOneReducer extends Reducer <Text, Text, Text, Text> { @Override protected void reduce (Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer (); for (Text person : values) { sb.append(person).append("," ); } context.write(key, new Text (sb.toString())); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.atguigu.mr.friends;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ShareFriendsOneDriver { public static void main (String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String []{"F:/IDEAWS/dashju/ShareFriendsinput1" , "F:/IDEAWS/dashju/ShareFriendsoutput1" }; Configuration configuration = new Configuration (); Job job = Job.getInstance(configuration); job.setJarByClass(ShareFriendsOneDriver.class); job.setMapperClass(ShareFriendsOneMapper.class); job.setReducerClass(ShareFriendsOneReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); boolean result = job.waitForCompletion(true ); System.exit(result ? 0 : 1 ); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.atguigu.mr.friends;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;import java.util.Arrays;public class ShareFriendsTwoMapper extends Mapper <LongWritable, Text, Text, Text> { @Override protected void map (LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fans_posters = line.split("\t" ); String fans = fans_posters[0 ]; String[] posters = fans_posters[1 ].split("," ); Arrays.sort(posters); for (int i = 0 ; i < posters.length - 1 ; i++) { for (int j = 1 ; j < posters.length; j++) { context.write(new Text (posters[i] + "-" + posters[j]), new Text (fans)); } } } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.atguigu.mr.friends;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;public class ShareFriendsTwoReducer extends Reducer <Text, Text, Text, Text> { @Override protected void reduce (Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException { StringBuffer sb = new StringBuffer (); for (Text fans : values) { sb.append(fans).append(" " ); } context.write(key, new Text (sb.toString())); } }
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.atguigu.mr.friends;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class ShareFriendsTwoDriver { public static void main (String[] args) throws IOException, ClassNotFoundException, InterruptedException { args = new String []{"F:/IDEAWS/dashju/ShareFriendsoutput1" , "F:/IDEAWS/dashju/ShareFriendsoutput2" }; Configuration configuration = new Configuration (); Job job = Job.getInstance(configuration); job.setJarByClass(ShareFriendsTwoDriver.class); job.setMapperClass(ShareFriendsTwoMapper.class); job.setReducerClass(ShareFriendsTwoReducer.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); FileInputFormat.setInputPaths(job, new Path (args[0 ])); FileOutputFormat.setOutputPath(job, new Path (args[1 ])); boolean result = job.waitForCompletion(true ); System.exit(result ? 0 : 1 ); } }
第8章 常见错误及解决方案 1)导包容易出错。尤其Text和CombineTextInputFormat。
2)Mapper中第一个输入的参数必须是LongWritable或者NullWritable,不可以是IntWritable. 报的错误是类型转换异常。
3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656 (4),说明Partition和ReduceTask个数没对上,调整ReduceTask个数。
4)如果分区数不是1,但是reducetask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。
5)在Windows环境编译的jar包导入到Linux环境中运行
1 hadoop jar wc.jar com.atguigu.mapreduce.wordcount.WordCountDriver /user/atguigu/ /user/atguigu/output
报如下错误:
1 Exception in thread "main" java.lang.UnsupportedClassVersionError: com/atguigu/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0
原因是Windows环境用的jdk1.7,Linux环境用的jdk1.8。
解决方案:统一jdk版本。
6)缓存pd.txt小文件案例中,报找不到pd.txt文件
原因:大部分为路径书写错误。还有就是要检查pd.txt.txt的问题。还有个别电脑写相对路径找不到pd.txt,可以修改为绝对路径。
7)报类型转换异常。
通常都是在驱动函数中设置Map输出和最终输出时编写错误。 Map输出的key如果没有排序,也会报类型转换异常。
8)集群中运行wc.jar时出现了无法获得输入文件。
原因:WordCount案例的输入文件不能放用HDFS集群的根目录。
9)出现了如下相关异常
1 2 3 4 5 6 7 8 Exception in thread "main" java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Ljava/lang/String;I)Z at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method) at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:609) at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:977) java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop binaries. at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:356) at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:371) at org.apache.hadoop.util.Shell.<clinit>(Shell.java:364)
解决方案:拷贝hadoop.dll文件到Windows目录C:\Windows\System32。个别电脑还需要修改Hadoop源码。
10)自定义Outputformat时,注意在RecordWirter中的close方法必须关闭流资源。否则输出的文件内容中数据为空。
1 2 3 4 5 6 7 8 9 @Override public void close (TaskAttemptContext context) throws IOException, InterruptedException { if (atguigufos != null ) { atguigufos.close(); } if (otherfos != null ) { otherfos.close(); } }