<small id='3TGfHmL'></small> <noframes id='QXwe86KBjm'>

  • <tfoot id='DwrKuq'></tfoot>

      <legend id='m3AlFgbs'><style id='tscEJRH7Nn'><dir id='sMlhtIQcd'><q id='h0cuXgKM2j'></q></dir></style></legend>
      <i id='W6frunmezP'><tr id='RB8TGnomSh'><dt id='VLf6js'><q id='caD1Szm'><span id='yT0I'><b id='wCGPan'><form id='Q0jX5mnga'><ins id='SNW0lAK'></ins><ul id='MOXrkwG'></ul><sub id='Yavg0'></sub></form><legend id='79WzjkH'></legend><bdo id='xcCfYmBy'><pre id='yrELTv37'><center id='etI2TOciDo'></center></pre></bdo></b><th id='KxJBv9cY'></th></span></q></dt></tr></i><div id='Y9wIN'><tfoot id='CxVzJI'></tfoot><dl id='dgV7lHr8f4'><fieldset id='oLSjwvg'></fieldset></dl></div>

          <bdo id='EsbtfH'></bdo><ul id='NJmgWR9D'></ul>

          1. <li id='MDyJ'></li>
            登陆

            章鱼彩票电脑-从 PageRank Example 谈 Spark 应用程序调优

            admin 2020-02-14 140人围观 ,发现0个评论

            大数据技能与架构

            点击右侧重视,大数据开发范畴最强大众号!

            暴走大数据

            点击右侧重视,暴走大数据!

            最近做了关于Spark Cache功用测验,开端是拿BigData-Benchmark中Spark KMeans来作为测验基准,别离测验各种Cache下运用程序的运转速度,终究运用Spark PageRank Example来验证。在做PageRank测验时,发现有许多风趣的调长处,想到这些调长处或许对用户来说是遍及有用的,现把它收拾出来逐个剖析,以供咱们参阅。

            BigData-Benchmark中的Spark PageRank选用的是Spark开源代码里的PageRank样例代码,原理及代码完成都比较简略,下面我简略地介绍下。

            PageRank根本原理介绍

            PageRank的效果是点评网页的重要性,除了运用于查找成果的排序之外,在其他范畴也有广泛的运用,例如图算法中的节点重要度等。假定一个由4个页面组成的网络如下图所示,B链接到A、C,C链接到A,D链接到一切页面。

            那么A的PR(PageRank)值别离来自B、C、D的奉献之和,因为B除了链接到A还链接到C,D除了链接到A还链接B、C,所以它们对A的奉献需求平摊,核算公式为:

            简略来说,便是依据链出总数平分一个页面的PR值:

            关于上图中的A页面来说,它没有外链,这样核算迭代下去,PR值会悉数收敛到A上去,所以实践上需求对这类没有外链的页面加上系数:

            Spark PageRank Example

            Spark Examples中给出了一个简易的完成,后续评论的相关优化都是依据该简易完成,所以并不必定能够用来处理实践PageRank问题,这儿仅用于引出关于Spark调优的考虑。下面是原始版别的完成代码,因为KM对代码排版极端丑恶,或许影响读者心境,这儿以截图办法展示,完好的代码见PageRank.scala中的runV1。

            上面的代码应该不难了解,它首要经过groupByKey得到每个url链接的urls列表,初始化每个url的初始rank为1.0,然后经过join将每个url的rank均摊到其链接的urls上,终究经过reduceByKey规约来自每个url奉献的rank,经过若干次迭代后得到终究的ranks,为了便利测验,上面代码29行我改成了一个空操作的action,用于触发核算。

            优化一(Cache&Checkpoint)

            从原始版别的代码来看,有些童鞋或许会觉得有必要对ranks做cache,防止每次迭代重核算,咱们无妨先运转下原始代码,看看是否真的有必要,下图是指定迭代次数为3时的Job DAG图,其间蓝色的点表明被cache过。

            从上图能够看到,3次迭代核算是在一个job里趁热打铁的,所以没必要对ranks做cache,因为从整个代码来看,在迭代循环里没有呈现action办法,所以迭代循环中不会触发job,只是是安排rdd之间的依靠联系。

            可是,一般来说迭代次数都比较大,假如迭代1000乃至10000次,上述rdd依靠联系将变得十分长。一方面会添加driver的保护压力,很或许导致driver oom;另一方面或许导致失利重算,单个task失利后,会依据rdd的依靠链从头开端核算。英文歌曲所以从容错以及可用性来说,上述代码完成是不可取的。所幸,spark供给了checkpoint机制,来完成断链及中心成果耐久化。

            运用checkpoint,咱们来改造上述迭代循环,在每迭代若干次后做一次checkpoint,保存中心成果状况,并堵截rdd依靠联系链,迭代循环代码改造如下:

            上述代码中每隔10次迭代,做一次checkpoint,并强制触发核算。必定要留意,在做checkpoint前,必定要对要checkpoint的rdd做cache,否则会重核算。这儿简略描绘下checkpoint的核算流程: 调用rdd.checkpoint()只是是符号该rdd需求做checkpoint,并不会触发核算,只要在遇到action办法后,才会触发核算,在job履行结束后,会发动checkpoint核算,假如rdd依靠链中有rdd被符号为checkpoint,则会对这个rdd再次触发一个job履行checkpoint核算。所以在checkpoint前,对rdd做cache,能够防止checkpoint核算进程中从头依据rdd依靠链核算。在上述代码中变量lastCheckpointRanks记载上一次checkpoint的成果,在一次迭代结束后,删去上一次checkpoint章鱼彩票电脑-从 PageRank Example 谈 Spark 应用程序调优的成果,并更新变量lastCheckpointRanks。

            为了便利测验,我每隔3次迭代做一次checkpoint,一共迭代5次,运转上述代码,整个核算进程中会有一次checkpoint,依据前面checkpoint的核算描绘可知,在代码15行处会有两个job,一个是惯例核算,一个是checkpoint核算,checkpoint核算是直接从缓存中拿数据写到hdfs,所以核算开支是很小的。加上终究的一个job,整个核算进程中一共有3个job,下面是测验进程中job的截图,留意图中对应的行号跟上面贴的代码没有对应联系哦。

            第一个job履行3次迭代核算,并将成果缓存起来,下面是第一个job的DAG。

            第二个job做checkpoint,因为需求checkpoint的rdd现已缓存了,所以不会从头核算,它会越过依靠链中前面的rdd,直接从缓存中读取数据写到hdfs,所以前面的依靠链显现是灰色的。

            第三个job履行剩余的2次迭代核算,因为前3次迭代的成果现已做过checkpoint,所以这儿的依靠链中不包括前3次迭代核算的依靠链,也便是说checkpoint起到了断链效果,这样driver保护的依靠链就不会越变越长了。

            到这儿,咱们有一个略微比较稳定的版别了,完好的代码见PageRank.scala中的runV2。可是,一般实践场景中,links或许会特别大,例如老友联系,就有近10亿的key,每个key对应的value均匀应该也有100-200,不必定能悉数缓存到内存,从之前文章Spark Cache功用测验的定论可知,咱们能够挑选带紧缩的MEMORY_ONLY_SER或DISK_ONLY的缓存办法来削减内存的运用,因为在YARN集群环境中磁盘资源是没有被阻隔的,也便是说一台机器上的磁盘资源是多使命同享的,所以运用DISK_ONLY存在磁盘溢出或被其他使命影响的危险,仍是主张运用带紧缩的MEMORY_ONLY_SER,这样能够大大下降内存的运用,一同功用不至于丢失太多。在上面加了checkpoint的代码根底上,把一切运用cache的当地悉数改成如下办法。

            相同资源和参数下别离运用默许的MEMORY_ONLY和带紧缩的MEMORY_ONLY_SER测验3次迭代的功用,下图是运用默许的MEMORY_ONLY办法缓存时,links在内存中的巨细,能够看到links缓存后占用了6.6G内存。

            改用带紧缩的MEMORY_ONLY_SER的缓存办法后,links缓存后只占用了861.8M内存,仅为之前6.6G的12%。

            经过在日志中打印运转时刻,得到运用MEMORY_ONLY时运转时刻为333s,运用MEMORY_ONLY_SER时运转时刻为391s,功用献身了17%左右,所以运用MEMORY_ONLY_SER是以献身CPU价值来交换内存的一种较为保险的计划,在实践运用进程中需求权衡功用以及内存资源状况,其实关于长时刻运转的离线使命来说,他们之间的功用不同不是特别显着。

            优化二(数据结构)

            在上述PageRank代码完成中,links中的记载为url -> urls,url类型为String,一般状况下,String占用的内存比Int、 Long等原生类型要多,在上述代码完成中,url完全能够被编码成一个Long型,因为在整个核算进程中底子没有用到url中的内容,这章鱼彩票电脑-从 PageRank Example 谈 Spark 应用程序调优样就能够必定程度上削减links缓存时的内存占用。因为在我的测验数据中,url自身是由数字来表明的,所以在前面代码的根底上再将links的界说改为如下代码:

            完好的代码见PageRank.scala中的runV3。经过测验发现,url改成Long型后,运用MEMORY_ONLY缓存办法时,如下图所示,links仅占用2.5G,相比为String类型时的6.6G,缩小了一半多。此外,url改成Long型后,运转3次迭代的时刻为278s,相比为String类型时的333s,功用提升了17%左右。

            运用带紧缩的MEMORY_ONLY_SER缓存办法时,如下图所示,links仅占用549.5M,相比为String类型时的861.8M,也缩小了近一半。此外,url改成Long型后,运转3次迭代的时刻为306s,相比为String类型时的391s,功用提升了21%左右。

            优化三(数据歪斜)

            经过前面两个优化后,根本能够运用到线上跑了,可是,或许还不行,假如咱们的数据会集有少量url链接的urls特别多,那么在运用groupByKey初始化links时,少量记载的value(urls)或许会有溢出危险,因为groupByKey底层是用一个Array保存value,假如一个节点链接了数十万个节点,那么要开一个超大的数组,即便不溢出,很或许因为没有足够大的接连内存,导致频频GC,从而引发OOM等致命性过错,一般咱们把这类问题称之为数据歪斜问题。此外,在后续迭代循环中links和ranks的join也或许因为数据歪斜导致部分task十分慢乃至引发OOM,下图是groupByKey和join的示意图,左面是groupByKey后得到每个url链接的urls,底层用数组保存,在join时,shuffle阶段会将来自两个rdd相同key的记载经过网络拉到一个partition中,右边显现对url1的shuffle read,假如url1对应的urls特别多,join进程将会十分慢。

            对key进行分桶

            首要咱们应该考虑防止运用groupByKey,这是导致后续数据歪斜的源头。已然或许存在单个key对应的value(urls)特别多,那么能够将key做一个随机化处理,例如将具有相同key的记载随机分配到10个桶中,这样就适当于把数据歪斜的记载给打散了,其大约原理如下图所示。

            依据上面的理论根底,咱们先得到不必groupByKey的links:

            再剖析前面代码里的迭代循环,发现咱们之前运用groupByKey很大一部分原因是想要得到每个key对应的urls size,咱们能够独自经过reduceByKey来得到,reduceByKey会做本地combine,这个操作shuffle开支很小的:

            现在咱们就能够运用cogroup将links、outCnts以及ranks三者join起来了,很快咱们会想到运用如下代码:

            可是!可是!可是!这样做仍是会跟之前相同呈现数据歪斜,因为cogroup履行进程中,在shuffle阶段仍是会把links中相同key的记载分到同一个partition,也就说上面代码pair._1.iterator也或许十分大,这个iterator底层也是Array,面对的问题根本没处理。

            所以咱们就要考虑运用前面介绍的分桶办法了,对links中的每条记载都随机打散到10个桶中,那么相同key的记载就会被随机分到不同桶中了:

            可章鱼彩票电脑-从 PageRank Example 谈 Spark 应用程序调优是,cogroup是依照key进行join的,便是说它把来自多个rdd具有相同key的记载会聚到一同核算,已然links的key现已被咱们改变了,那么outCnts和ranks也要变成跟links相同的办法,才干join到一同去核算:

            有了这个根底后,咱们就能够将前面的cogroup逻辑修正一下,让他们能够顺畅join到一块儿去:

            完好的代码见PageRank.scala中的runV4。将上述逻辑收拾成如下图,能够看到,其实咱们对outCnts和ranks做了胀大处理,才干确保cogroupshuffle阶段关于links中的每条记载,都能找到与之对应的outCnts和ranks记载。

            其实这种做法会极大地丢失功用,尽管这样做或许把之前OOM的问题搞定,能够不犯错的跑完,可是因为数据胀大,实践跑起来是十分慢的,不主张选用这种办法处理数据歪斜问题,这儿只是引出一些问题让咱们更多地去考虑。

            拆分发作歪斜的key

            有了前面的剖析根底,咱们知道对key分桶的办法,是不加区分地对一切key都一股脑地处理了,把不歪斜的key也作为歪斜来处理了,其实大部分实践状况下,只要少量key有歪斜,假如大部分key都歪斜那就不叫数据歪斜,那叫数据量特别大。所以咱们能够考虑对歪斜的key和不歪斜的key别离用不同的处理逻辑,对不歪斜的key,仍是用本来groupByKey和join办法来处理,对歪斜的key能够考虑运用broadcast来完成map join,因为歪斜的key一般来说是可数的,其对应的outCnts和ranks信息在咱们PageRank场景里也不会很大,所以能够运用播送。

            首要咱们把链接的urls个数超越1000000的key界说为歪斜key,运用下面代码将links切分为两部分:

            首要核算出链接数超越1000000的key,播送到每个核算节点,然后过滤links,章鱼彩票电脑-从 PageRank Example 谈 Spark 应用程序调优假如key在播送变量中则为歪斜的数据,否则为非歪斜的数据,过滤结束后原始links被毁掉。下面就能够在迭代循环中别离处理歪斜的数据skewed和非歪斜的数据noSkewed了。

            对noSkewed运用本来的办法:

            对skewed运用broadcast办法完成map join,类似地,要把歪斜的key对应的rank搜集起来播送,之前的cogroup中的outCnts和ranks在这儿就都被播送了,所以能够直接在map操作里完成对skewed中的数据处理:

            终究将两部分的处理成果union一下:

            后边的逻辑就跟前面相同了,完好的代码见PageRank.scala中的runV5。别离测验runV3和runV5版别,迭代3次,在没有数据歪斜的状况下,相同数据、资源和参数下runV3运转时刻306s,runV5运转时刻311s,可是在有数据歪斜的状况下,相同数据、资源和参数下runV3运转时刻722s并伴有严峻的GC,runV5运转时刻472s。能够发现runV5版别在不献身功用的状况能够处理数据歪斜问题,一同还能以runV3相同的功用处理不歪斜的数据集,所以说runV5版别鲁棒性和可用性更强。

            优化四(资源运用最大化)

            经过前面几个优化操作后,PageRank.scala中的runV5版别根本能够用于线上例行化跑作业了,可是布置到线上集群,咱们应当考虑怎么让资源运用最大化。为了测验便利,测验数据会集没有数据歪斜,下面就拿PageRank.scalarunV5来测验并监控资源运用状况。

            原始测验数据(运用带紧缩的MEMORY_ONLY_SER缓存办法)状况如下表:

            磁盘中巨细

            links缓存巨细

            分区数

            1.5G

            549.5M

            20

            运转3次迭代,一开端大约估量运用如下资源,运用5个executor,每个executor配2个core,一次并行运转10个partition,20个partition 2轮task就能够跑完:

            driver_mem

            num_executor

            executor_mem

            executor_cores

            4G

            5

            2G

            2

            在提交参数中加上如下额定jvm参数,表明别离对driver和executor在运转期间敞开Java Flight Recorder:

            运转结束后,核算运转时刻为439s,将driver.jfr和excutor.jfr拿到开发机上来,翻开jmc剖析东西(坐落java装置目录bin/下面),首要咱们看driver的监控信息,主页如下图所示,能够看到driver的cpu占用是很小的,不存在瓶颈。

            切到内存tab,把物理内存的两个勾选去掉,能够看到driver的内存运用曲线,咱们给了4g,可是实践上最大也就用了差不多1g,看下图中的GC核算信息,没有什么瓶颈。

            所以给driver分配4g是糟蹋的,咱们把它调到章鱼彩票电脑-从 PageRank Example 谈 Spark 应用程序调优2g,尽管实践上只用了大约1g,这儿多给driver留点地步,其他装备不变,从头提交程序,核算运转时刻为443s,跟4g时运转时刻439s差不多。

            再来看executor的监控信息,主页如下图所示,能够看到executor的cpu运用显着比driver多,因为要做序列化、紧缩、排序等操作。

            再切到内存tab,如下图所示,能够看到executor的内存运用动摇较大,最大内存运用差不多1.75g,咱们给了2g,仍是适当适宜的。可是看下面的GC核算信息,发现最长暂停4s多,并且废物收回次数也较多。

            为此,咱们切到"GC时刻"tab,能够看到,GC仍是比较频频的,还有一次暂停4s多的GC,看右边GC类型,对最长暂停时刻从大到小排序,居然有几个SerialOld类型的GC,其他一部分是ParNew类型GC,一部分是CMS类型的GC,没有呈现FULL GC,下面先剖析内存占用状况,回过头来再剖析这儿呈现的怪异SerialOld。

            咱们再看下堆内存大目标占用状况,大目标主要是在ExternalAppendOnlyMap和ExternalSorter中,ExternalAppendOnlyMap用于寄存shuffle read的数据,ExternalSorter用于寄存shuffle write的数据,意图是对记载排序,这两个数据结构底层运用Array(表现为大目标)存储kv记载。

            切换到TLAB,再细化到小目标,能够看到大部分是Long型(url),翻开仓库盯梢,大部分是用在shuffle阶段,因为在join时,一方面会读取groupByKey后的links,用于做shuffle write,一方面在shuffle read阶段,将相同key的links和ranks拉到一同做join核算。

            所以全体来说,内存状况是契合事务逻辑的,没有呈现不可思议的内存占用。让人有点摸不清脑筋的是,GC信息中有SerialOld这玩意儿,我分明用了CMS废物收回办法,经过一番Google查阅材料,说"Concurrent Mode Failure"或许导致Serial Old的呈现,原因是当CMS GC正进行时,此刻有新的目标要进入老时代,可是老时代空间缺乏。仔细剖析,个人觉得或许是因为CMS GC后存在较多的内存碎片,而咱们的程序在shuffle阶段底层运用Array,需求接连内存,导致CMS 章鱼彩票电脑-从 PageRank Example 谈 Spark 应用程序调优GC进程中呈现了"Concurrent Mode Failure",才退化到Serial Old,Serial Old是选用符号收拾收回算法,收回进程中会收拾内存碎片。这样看来,应该是CMS GC进程中,老时代空间缺乏导致的,从两个方面考虑优化下,一是添加老时代内存占比,二是减小参数-XX:CMSInitiatingOccupancyFraction,下降触发CMS GC的阈值,让CMS GC及早收回老时代。

            首要咱们添加老时代内存占比,也便是下降新生代内存占比,默许-XX:NewRatio=2,咱们把它改成-XX:NewRatio=3,将老时代内存占比由2/3提升到3/4,从头提交程序,得到executor.jfr,翻开GC监控信息,发现有很大的改进,不在呈现Serial Old类型的GC了,最长暂停时刻从本来的4s下降到600ms左右,全体运转时刻从448s下降到436s。

            把上述-XX:NewRatio=3去掉,设置参数-XX:CMSInitiatingOccupancyFraction=50(前面是设置了60),从头提交程序,得到executor GC的监控信息,发现GC最大暂停时刻也降下来了,可是因为老时代GC的频率加大了,全体运转时刻为498s,比本来的436s还要长。

            归纳考虑以上信息,添加executor的jvm发动参数-XX:NewRatio=3,能把gc状况调整到一个较优的状况。

            值得一提的是,现在咱们渠道还没开放给用户装备jvm参数的功用,以上监控信息暂时只能内部开发人员才干拿的到,这儿拿出来仅作为相关调优考虑,考虑渠道的通用性,咱们现已装备好了较为平衡的jvm参数,后续探究怎么向用户供给这些监控信息作为参阅。

            总结

            Spark给咱们供给了一种简略灵敏的大数据编程结构,可是关于许多实践问题的处理,还应该多考虑下怎么让咱们写出来的运用程序更高效更节省。除了终究关于功用监控外,以上其他几个调长处是能够推行到其他运用的,在咱们编写spark运用程序时,经过这种考虑也能够加深咱们对spark的了解。

            欢迎点赞+保藏+转发朋友圈本质三连

            文章不错?点个【在看】吧!

            请关注微信公众号
            微信二维码
            不容错过
            Powered By Z-BlogPHP