<small id='JLkuN4Z'></small> <noframes id='8bSkolIyu'>

  • <tfoot id='uV8FwXvR'></tfoot>

      <legend id='M5pXPhYC3v'><style id='bDEkSm0rZ'><dir id='zeZEXRSd5o'><q id='flx9AJUW5R'></q></dir></style></legend>
      <i id='Kw94Xb2QA'><tr id='7ln04Y'><dt id='LC0SqX58A'><q id='gn91p8Q'><span id='Q6Z3'><b id='wHtbxiRKES'><form id='rCBD'><ins id='Rq7GEKz'></ins><ul id='XNsJQhg'></ul><sub id='UCD1fTv'></sub></form><legend id='LluS'></legend><bdo id='rKjdXID'><pre id='z6b0g'><center id='xKPH'></center></pre></bdo></b><th id='K8EgLUta2'></th></span></q></dt></tr></i><div id='05G8Bj4w'><tfoot id='hAZmn72IEl'></tfoot><dl id='uomKvF'><fieldset id='FCcB1XxS4l'></fieldset></dl></div>

          <bdo id='b2aOv7A'></bdo><ul id='MDdg'></ul>

          1. <li id='6pIDtEs'></li>
            登陆

            章鱼彩票电脑-Kafka+Spark Streaming办理offset的几种办法

            admin 2020-02-14 177人围观 ,发现0个评论

            来历:大数据技能与架构作者:王知无

            大数据技能与架构

            点击右侧重视,大数据开发范畴最强大众号!

            暴走大数据

            点击右侧重视,暴走大数据!

            By 大数据技能与架构

            场景描绘:Kafka合作Spark Streaming是大数据范畴常见的黄金搭档之一,首要是用于数据实时入库或剖析。为了应对或许呈现的引起Streaming程序溃散的异常情况,咱们一般都需求手动办理好Kafka的offset,而不是让它主动提交,即需求将enable.auto.commit设为false。只要办理好offset,才能使整个流式体系最大极限地挨近exactly once语义。

            关键词:offset Spark Streaming

            Kafka+Spark Streaming首要用于实时流处理。到目前为止,在大数据范畴中是一种十分常见的架构。Kafka在其间首要起着一个缓冲的效果,一切的实时数据都会通过kafka。所以对kafka offset的办理是其间至关重要的一环。

            咱们一般都需求手动办理好Kafka的offset,而不是让它主动提交,即需求将enable.auto.commit设为false。

            一但办理不善,就会到导致数据丢掉或重复消费。

            offset的办理办法

            一个简略的流程如下:

            • 在Kafka DirectStream初始化时,取得当时一切partition的存量offset,以让DirectStream能够从正确的方位开端读取数据。
            • 读取音讯数据,处理并存储成果。
            • 提交offset,并将其耐久化在牢靠的章鱼彩票电脑-Kafka+Spark Streaming办理offset的几种办法外部存储中。
            • 图中的“process and store results”及“commit offsets”两项,都能够施加更强的约束,比方存储成果时保证幂等性,或许提交offset时选用原子操作。

            保存offset的办法

            Checkpoint:

            Spark Streaming的checkpoints是最基本的存储状况信息的办法,一般是保存在HDFS中。可是最大的问题是假如streaming程序晋级的话,checkpoints的数据无法运用,所以简直没人运用。

            offset的三种办理办法:

            主动提交offset:

            • enable.auto.commit=true。
            • 一但consumer挂掉,就会导致数据丢掉或重复消费。
            • offset不可控。

            Kafka本身的offset办理:

            • (归于At-least-once语义,假如做好了幂等性,能够运用这种办法):
            • 在Kafka 0.10+版别中,offset章鱼彩票电脑-Kafka+Spark Streaming办理offset的几种办法的默许存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。
            • Spark Streaming也专门供给了commitAsync() API用于提交offset。
            • 需求将参数修正为enable.auto.commit=false。
            • 在我实践测验中发现,这种offset的办理办法,不会丢掉数据,但会呈现重复消费。
            • 停掉streaming运用程序再次发动后,会再次消费停掉前最终的一个批次数据,应该是由于offset是异步提交的办法导致,offset更新不及时引起的。
            • 因而需求做好数据的幂等性。
            • (修正源码将异步改为同步,应该是能够做到Exactly-once语义的)

            自界说offset:

            • (引荐,选用这种办法,能够做到At-least-once语义):
            • 能够将offset存放在第三方储中,包括RDBMS、Redis、ZK、ES等。
            • 若消费数据存储在带业务的组件上,则强烈引荐将offset存储在一起,凭借业务完成 Exactly-once 语义。

            示例

            Kafka本身办理offset:

            在Kafka 0.10+版别中,offset的默许存储由ZooKeeper移动到了一个自带的topic中,名为__consumer_offsets。所以咱们读写offset的目标正是这个topic,Spark Streaming也专门供给了commitAsync() API用于提交offset。实践上,一切都现已封装好了,直接调用相关API即可。

            stream.foreachRDD { rdd =>
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            // 保证成果都现已正确且幂等地输出了
            stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
            }

            ZooKeeper

            在Spark Streaming衔接Kafka运用中运用Zookeeper来存储offsets也是一种比较牢靠的办法。

            在这个计划中,Spark Streaming使命在发动时会去Zookeeper中读取每个分区的offsets。假如有新的分区呈现,那么他的offset将会设置在最开端的方位。在每批数据处理完之后,用户需求能够挑选存储已处理数据的一个offset或许最终一个offset。此外,新顾客将运用跟旧的Kafka 顾客API相同的格局将offset保存在ZooKeeper中。因而,任何追寻或监控Zookeeper中Kafka Offset的东西依然收效的。

            一个典型的东西类:

            class ZkKafkaOffsetManager(zkUrl: String) {
            private val logger = LoggerFactory.getLogger(classOf[ZkKafkaOffsetManager])
            private val zkClientAndConn = ZkUtils.createZkClientAndConnection(zkUrl, 30000, 30000);
            private val zkUtils = new ZkUtils(zkClientAndConn._1, zkCl英菲迪尼ientAndConn._2, false)
            def readOffsets(topics: Seq[String], groupId: String): Map[TopicPartition, Long] = {
            val offsets = mutable.HashMap.empty[TopicPartition, Long]
            val partitionsForTopics = zkUtils.getPartitionsForTopics(topics)
            // /consumers//offsets//
            partitionsForTopics.foreach(partitions => {
            val topic = partitions._1
            val groupTopicDirs = new ZKGroupTopicDirs(groupId, topic)
            partitions._2.foreach(partition => {
            val path = groupTopicDirs.consumerOffsetDir + "/" + partition
            try {
            val data = zkUtils.readData(path)
            if (data != null) {
            offsets.put(new TopicPartition(topic, partition), data._1.toLong)
            logger.info(
            "Read offset - topic={}, partition={}, offset={}, path={}",
            Seq[AnyRef](topic, partition.toString, data._1, path)
            )
            }
            } catch {
            case ex: Exception =>
            offsets.put(new TopicPartition(topic, partition), 0L)
            logger.info(
            "Read offset - not exist: {}, topic={}, partition={}, path={}",
            Seq[AnyRef](ex.getMessage, topic, partition.toString, path)
            )
            }
            })
            })
            offsets.toMap
            }
            def saveOffsets(offsetRanges: Seq[OffsetRa章鱼彩票电脑-Kafka+Spark Streaming办理offset的几种办法nge], groupId: String): Unit = {
            offsetRanges.foreach(range => {
            val groupTopicDirs = new ZKGroupTopicDirs(groupId, range.topic)
            val path = groupTopicDirs.consumerOffsetDir + "/" + range.partition
            zkUtils.updatePersiste章鱼彩票电脑-Kafka+Spark Streaming办理offset的几种办法ntPath(path, range.untilOffset.toString)
            logger.info(
            "Save offset - topic={}, partition={}, offset={}, path={}",
            Seq[AnyRef](range.topic, range.partition.toString, range.untilOffset.toString, path)
            )
            })
            }
            }

            这样,offset就会被存储在ZK的/consumers/[groupId]/offsets/[topic]/[partition]途径下。当初始化DirectStream时,调用readOffsets()办法取得offset。当数据处理完成后,调用saveOffsets()办法来更新ZK中的值。

            其他介质

            Hbase、Redis乃至Mysql也经常被用作进行offset的存储。办法和上面相似,代码能够去网上搜一搜。

            需求留意的点

            特别需求留意,在转化过程中不能损坏RDD分区与Kafka分区之间的映射联系。亦即像map()/mapPartitions()这样的算子是安全的,而会引起shuffle或许repartition的算子,如reduceByKey()/join()/coalesce()等等都是不安全的。

            对Dstream进行窗口操作后就不能手动提交offset。由于保存offset需求HasOffsetRanges这个类。而HasOffsetRanges是KafkaRDD的一个trait,而CanCommitOffsets是DirectKafkaInputDStream的一个trait。

            Scala Trait(特征) 相当于章鱼彩票电脑-Kafka+Spark Streaming办理offset的几种办法 Java 的接口,实践上它比接口还功能强大。

            与接口不同的是,它还能够界说特点和办法的完成。

            如下:private[spark] class KafkaRDD[K, V](
            sc: SparkContext,
            val kafkaParams: ju.Map[String, Object],
            val offsetRanges: Array[OffsetRange],
            val preferredHosts: ju.Map[TopicPartition, String],
            useConsumerCache: Boolean
            ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges
            private[spark] class DirectKafkaInputDStream[K, V](
            _ssc: StreamingContext,
            locationStrategy: LocationStrategy,
            consumerStrategy: ConsumerStrategy[K, V],
            ppc: PerPartitionConfig
            ) extends InputDStream[ConsumerRecord[K, V]](_ssc) with Logging with CanCommitOffsets {

            不能对stream目标做transformation操作之后的成果进行强制转化(会直接报ClassCastException),由于RDD与DStream的类型都改变了。只要RDD或DStream的包括类型为ConsumerRecord才行。

            欢迎点赞+保藏+转发朋友圈本质三连

            文章不错?点个【在看】吧!

            请关注微信公众号
            微信二维码
            不容错过
            Powered By Z-BlogPHP