Spark】SparkStreaming消费kafka数据的时候如何保证exactly-once消费语义

2020-05-28 09:05发布

2条回答
天天
2楼 · 2020-08-28 12:00

Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。


小橘子
3楼 · 2021-07-01 11:26

在Kafka、Storm、Flink、Spark Streaming等分布式流处理系统中(没错,Kafka本质上是流处理系统,不是单纯的“消息队列”),存在三种消息传递语义(message delivery semantics),分别是:

  • at least once:每条消息会被收到1次或多次。例如发送方S在超时时间内没有收到接收方R的通知(如ack),或者收到了R的报错,就会不断重发消息直至R传回ack。

  • at most once:每条消息会被收到0次或1次。也就是说S只负责向R发送消息,R也没有任何通知机制。无论R最终是否收到,S都不会重发。

  • exactly once:是上面两个的综合,保证S发送的每一条消息,R都会“不重不漏”地恰好收到1次。它是最强最精确的语义,也最难实现。

在我们的日常工作中,90%的流处理业务都是通过Kafka+Spark Streaming+HDFS来实现的(这里Kafka的作用是消息队列了)。本篇探讨保证exactly once语义的方法。

如上面的图所示,一个Spark Streaming程序由三步组成:输入、处理逻辑、输出。要达到exactly once的理想状态,需要三步协同进行,而不是只与处理逻辑有关。Kafka与Spark Streaming集成时有两种方法:旧的基于receiver的方法,新的基于direct stream的方法。下面两张图可以清楚地说明。

基于receiver的方法

基于receiver的方法采用Kafka的高级消费者API,每个executor进程都不断拉取消息,并同时保存在executor内存与HDFS上的预写日志(write-ahead log/WAL)。当消息写入WAL后,自动更新ZooKeeper中的offset。
它可以保证at least once语义,但无法保证exactly once语义。虽然引入了WAL来确保消息不会丢失,但还有可能会出现消息已经写入WAL,但offset更新失败的情况,Kafka就会按上一次的offset重新发送消息。这种方式还会造成数据冗余(Kafka broker中一份,Spark executor中一份),使吞吐量和内存利用率降低。现在基本都使用下面基于direct stream的方法了。

基于direct stream的方法

基于direct stream的方法采用Kafka的简单消费者API,它的流程大大简化了。executor不再从Kafka中连续读取消息,也消除了receiver和WAL。还有一个改进就是Kafka分区与RDD分区是一一对应的,更可控。
driver进程只需要每次从Kafka获得批次消息的offset range,然后executor进程根据offset range去读取该批次对应的消息即可。由于offset在Kafka中能唯一确定一条消息,且在外部只能被Streaming程序本身感知到,因此消除了不一致性,达到了exactly once。
不过,由于它采用了简单消费者API,我们就需要自己来管理offset。否则一旦程序崩溃,整个流只能从earliest或者latest点恢复,这肯定是不稳妥的。offset管理在之前的文章中提到过,这里不再赘述。

Kafka作为输入源可以保证exactly once,那么处理逻辑呢?答案是显然的,Spark Streaming的处理逻辑天生具备exactly once语义。
Spark RDD之所以被称为“弹性分布式数据集”,是因为它具有不可变、可分区、可并行计算、容错的特征。一个RDD只能由稳定的数据集生成,或者从其他RDD转换(transform)得来。如果在执行RDD lineage的过程中失败,那么只要源数据不发生变化,无论重新执行多少次lineage,都一定会得到同样的、确定的结果。

最后,我们还需要保证输出过程也符合exactly once语义。Spark Streaming的输出一般是靠foreachRDD()算子来实现,它默认是at least once的。如果输出过程中途出错,那么就会重复执行直到写入成功。为了让它符合exactly once,可以施加两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。

  • 幂等性写入
    幂等原来是数学里的概念,即f(f(x))=f(x)。幂等写入就是写入多次与写入一次的结果完全相同,可以自动将at least once转化为exactly once。这对于自带主键或主键组的业务比较合适(比如各类日志、MySQL binlog等),并且实现起来比较简单。
    但是它要求处理逻辑是map-only的,也就是只能包含转换、过滤等操作,不能包含shuffle、聚合等操作。如果条件更严格,就只能采用事务性写入方法。

stream.foreachRDD { rdd =>rdd.foreachPartition { iter =>// make sure connection pool is set up on the executor before writingSetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)iter.foreach { case (key, msg) =>DB.autoCommit { implicit session =>// the unique key for idempotency is just the text of the message itself, for example purposessql"insert into idem_data(msg) values (${msg})".update.apply}}}}

事务性写入
这里的事务与DBMS中的事务含义基本相同,就是对数据进行一系列访问与更新操作所组成的逻辑块。为了符合事务的ACID特性(https://en.wikipedia.org/wiki/ACID_(computer_science)),必须引入一个唯一ID标识当前的处理逻辑,并且将计算结果与该ID一起落盘。ID可以由主题、分区、时间、offset等共同组成。
事务操作可以在foreachRDD()时进行。如果数据写入失败,或者offset写入与当前offset range不匹配,那么这一批次数据都将失败并且回滚。

// localTx is transactional, if metric update or offset update fails, neither will be committedDB.localTx { implicit session =>// store metric dataval metricRows = sql"""    update txn_data set metric = metric + ${metric}      where topic = ${osr.topic}    """.update.apply()if (metricRows != 1) {throw new Exception("...")}// store offsetsval offsetRows = sql"""    update txn_offsets set off = ${osr.untilOffset}      where topic = ${osr.topic} and part = ${osr.partition} and off = ${osr.fromOffset}    """.update.apply()if (offsetRows != 1) {throw new Exception("...")}}

相关问题推荐

  • 回答 1

    自己随意编写一份测试数据,所用的测试数据如下,需求是按照第一列的字母分组,然后按照第二列数据取出每一组内前N个数据,后面我分别列出了我使用的三种方案来实现该需求,不同方案在不同的场景下会有各自的优势 a 25b 36c 24d 45e 60a 33b 26c 47d 43e 62a...

  • 回答 9

    1. Spark 概述1.1. 什么是 Spark(官网:http://spark.apache.org)spark 中文官网:http://spark.apachecn.orgSpark 是一种快速、通用、可扩展的大数据分析引擎,2009 年诞生于加州大学伯克利分校AMPLab,2010 年开源,2013 年 6 月成为 Apache 孵化项目,20...

  • 回答 2

    搭建高可用模式用的协同处理框架。

  • 回答 1

    多数是版本不对、、、、、、引用错误,,,等

  • 回答 1

    目前,还是选择saprkflink还有成长的空间

  • 回答 2

            org.springframework.boot         spring-boot-starter-parent         1.3.2.RELEASE                             2.10.4         1.6.2                       ...

  • 回答 1

    一、图概念术语1.1 基本概念图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种数据结构。这里的图并非指代数中的图。图可以对事物以及事物之间的关系建模,图可以用来表示自然发生的连接数据,如:社交网络、互联网web页面常用的应用有:在地图...

  • 回答 4

    该模式被称为 Local[N] 模式,是用单机的多个线程来模拟 Spark 分布式计算,通常用来验证开发出来的应用程序逻辑上有没有问题其中N代表可以使用 N 个线程,每个线程拥有一个 core 。如果不指定 N,则默认是1个线程(该线程有1个 core )。如果是 local[*],则...

  • 回答 17

    flume和kafka的侧重点不同,flume追求的是数据和数据源、数据流向的多样性,适合多个生产者的场景;flume有自己内置的多种source和sink组件,具体操作方式是编写source、channel和sink的.conf配置文件,开启flume组件的时候用命令关联读取配置文件实现kafka追...

  • 回答 2

    探究的是kafka的数据生产出来之后究竟落到了哪一个分区里面去了第一种分区策略:给定了分区号,直接将数据发送到指定的分区里面去第二种分区策略:没有给定分区号,给定数据的key值,通过key取上hashCode进行分区第三种分区策略:既没有给定分区号,也没有给...

  • 回答 2

    读取数据的过程中,数据是属于某一个topic的某一个partition对应的某一个segment文件中的某一条记录。①定位到具体的segment日志文件②计算查找的offset在日志文件的相对偏移量

  • 回答 3

    消息丢失解决方案:        首先对kafka进行限速, 其次启用重试机制,重试间隔时间设置长一些,最后Kafka设置acks=all,即需要相应的所有处于ISR的分区都确认收到该消息后,才算发送成功。消息重复解决方案:        消息可以使用唯一id标识 ...

  • 回答 4

    kafka数据推送失败,如果是生产者推送kafka数据失败,有可能是网络端口问题,配置网络ip的域名映射,防火墙问题可以做防火墙策略,还有配置的ip和端口是否正确,还要确定kafka是否启动服务。kafka推送数据到其他系统,可以使用flume以及消费者来做...

  • 回答 5

    遇到这种问题,基本上是心跳或offset更新不及时导致。在kafka环境中,有以下几个参数对于数据重复有很好的效果。auto.commit.interval.msconsumer向zookeeper提交offset的频率,单位是秒,默认60*1000此值太大会导致数据重复消费,将其调小可避免重复数据。建...

  • 回答 3

    数据一致性可以理解为数据前后是否一样,比如说数据的丢失或者重复都回造成数据的不一致!kafka的0.11之后引入了幂等性可以解决单次会话、单个partition的数据不重复,生产者可以利用其0.11后引入的事务来保证其数据一致性,消费者的话其实事务是无法保证的,...

没有解决我的问题,去提问