Flink kafka的offset怎么保存

2020-07-24 10:12发布

2条回答
王超
2楼 · 2020-07-24 10:12

flink对接kafka这方面对接的比较好,由kafka自身维护就可以,外部并设置了checkpoint保存 高版本 低版本的kafka保证村早ZK中

乔治与佩奇
3楼 · 2021-12-03 11:32

Blog Address:http://blog.csdn.net/jsjsjs1789 https://blog.csdn.net/jsjsjs1789/article/details/88956080

Flink对Offset的管理,有两种方式: 1.Checkpointing disabled 完全依赖于kafka自身的API 2.Checkpointing enabled 当checkpoint做完的时候,会将offset提交给kafka or zk 本文只针对于第二种,Checkpointing enabled

FlinkKafkaConsumerBase中的 notifyCheckpointComplete

@Override//当checkpoint完成的时候,此方法会被调用
 public final void notifyCheckpointComplete(long checkpointId) throws Exception {
  if (!running) {
   LOG.debug("notifyCheckpointComplete() called on closed source");
   return;
  }

  final AbstractFetcher fetcher = this.kafkaFetcher;
  if (fetcher == null) {
   LOG.debug("notifyCheckpointComplete() called on uninitialized source");
   return;
  }

  if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
   // only one commit operation must be in progress
   if (LOG.isDebugEnabled()) {
    LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
   }

   try {
    final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
    if (posInMap == -1) {
     LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
     return;
    }

    @SuppressWarnings("unchecked")
    Map offsets =
     (Map) pendingOffsetsToCommit.remove(posInMap);

    // remove older checkpoints in map
    for (int i = 0; i < posInMap; i++) {
     pendingOffsetsToCommit.remove(0);
    }

    if (offsets == null || offsets.size() == 0) {
     LOG.debug("Checkpoint state was empty.");
     return;
    }

   //通过kafkaFetcher提交offset fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
   } catch (Exception e) {
    if (running) {
     throw e;
    }
    // else ignore exception if we are no longer running
   }
  }
 }

跳转到kafkaFetcher

@Override protected void doCommitInternalOffsetsToKafka(
  Map offsets,
  @Nonnull KafkaCommitCallback commitCallback) throws Exception {

  @SuppressWarnings("unchecked")
  List> partitions = subscribedPartitionStates();

  Map offsetsToCommit = new HashMap<>(partitions.size());

  for (KafkaTopicPartitionState partition : partitions) {
   Long lastProcessedOffset = offsets.get(partition.getKafkaTopicPartition());
   if (lastProcessedOffset != null) {
    checkState(lastProcessedOffset >= 0, "Illegal offset value to commit");

    // committed offsets through the KafkaConsumer need to be 1 more than the last processed offset.
    // This does not affect Flink's checkpoints/saved state.
    long offsetToCommit = lastProcessedOffset + 1;

    offsetsToCommit.put(partition.getKafkaPartitionHandle(), new OffsetAndMetadata(offsetToCommit));
    partition.setCommittedOffset(offsetToCommit);
   }
  }

  // record the work to be committed by the main consumer thread and make sure the consumer notices that
  consumerThread.setOffsetsToCommit(offsetsToCommit, commitCallback);
 }

可以看到调用consumerThread.setOffsetsToCommit方法

void setOffsetsToCommit(
   Map offsetsToCommit,
   @Nonnull KafkaCommitCallback commitCallback) {

  // record the work to be committed by the main consumer thread and make sure the consumer notices that
  /*
  !=null的时候,说明kafkaConsumerThread更新的太慢了,新的将会覆盖old
  当此处执行的时候,kafkaconsumerThread中consumer.commitAsync()
  
这个方法还是关键的方法,直接给nextOffsetsToCommit赋值了
nextOffsetsToCommit,我们可以看到是AtomicReference,可以原子更新对象的引用
   */
 
  if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
   log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
     "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
     "This does not compromise Flink's checkpoint integrity.");
  }

  // if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
  handover.wakeupProducer();

  synchronized (consumerReassignmentLock) {
   if (consumer != null) {
    consumer.wakeup();
   } else {
    // the consumer is currently isolated for partition reassignment;
    // set this flag so that the wakeup state is restored once the reassignment is complete
    hasBufferedWakeup = true;
   }
  }
 }

nextOffsetsToCommit已经有值了,接下我们来看一下KafkaConsumerThread的run方法

@Override public void run() {
  // early exit check
  if (!running) {
   return;
  }

  ......
   // main fetch loop
   while (running) {

    // check if there is something to commit//default false
    if (!commitInProgress) {
     // get and reset the work-to-be committed, so we don't repeatedly commit the same//setCommittedOffset方法已经给nextOffsetsToCommit赋值了,这里进行获取,所以commitOffsetsAndCallback is not null
     final Tuple2, KafkaCommitCallback> commitOffsetsAndCallback =
       nextOffsetsToCommit.getAndSet(null);

     if (commitOffsetsAndCallback != null) {
      log.debug("Sending async offset commit request to Kafka broker");

      // also record that a commit is already in progress
      // the order here matters! first set the flag, then send the commit command.
      commitInProgress = true;
      consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
     }
    }

    ....
 }

至此offset就更新完毕了,我们可以很清楚的看到,当checkpoint完成时,调用相关的commit方法,将kafka offset提交至kafka broker


相关问题推荐

  • 什么是大数据时代?2021-01-13 21:23
    回答 100

    大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

  • 回答 84

    Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

  • 回答 52
    已采纳

    学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

  • 回答 29

    简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

  • 回答 14

    tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

  • 回答 18

    您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

  • 回答 33

    大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

  • 回答 17

    最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

  • 回答 5

    MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

  • mysql安装步骤mysql 2022-05-07 18:01
    回答 2

    mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

  • 回答 5

    1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike&#39;port&#39;;4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

  • 回答 5

    CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

  • 回答 9

    学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

  • 回答 7

    添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

  • 回答 5

    看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

  • 回答 7

    查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

没有解决我的问题,去提问