Flink中的watermark有哪些特点

2020-08-19 09:49发布

4条回答
小猪仔
2楼 · 2020-08-19 11:03

1、是一条通过自定义函数生成的特殊数据记录,直接插入到数据流中的。

2、必须单调递增,因为watermark是用来表示系统语义上的时间,既然是时间,那么就必须递增。

对于乱序数据,怎么做到单调递增?

很简单,就是一直把已有的watermark与新生成watermark的取最大值返回。

3、watermark与数据时间相关,是通过数据流中的数据时间产生的。


金喆
3楼 · 2020-08-19 18:48

最近在慢慢看flink的知识,我们都知道,flink和sparkstreaming的一大区别就是flink支持多种时间类型以及增加了watermark水位的概念,那么flink增加的这些功能有什么好处呢?

文章目录

  • 时间类型

  • watermark


时间类型

对于流式数据,最大的特点就是数据上带有时间属性特征,flink根据时间产生的位置不同,将时间分为三种概念。

1.Event Time 事件生成时间 (事件产生的时间)
2.Ingestion Time 事件接入时间 (事件刚接入到flink系统的事件)
3.Processing Time 事件处理时间 (事件经过flink处理,处理算子的实例所在的系统时间)

那么flink为什么要分这三种时间呢?我们直接拿实际生产中的数据来说一下
Event1:

{“logId”:“c171bc53-75f4-41b2-9a8f-065813cd4960”,“responseId”:“c171bc53-75f4-41b2-9a8f-065813cd4960”,“logTime”:1567493296000,“environment”:“prod”,“code”:200,“createTime”:1567493430000}

LogTime:1567493296000 (2019-09-03 14:48:16) 事件产生时间
CreatTime: 1567493430000 (2019-09-03 14:50:30) 事件达到服务端的时间

Event2:

{“logId”:“c171bc53-75f4-41b2-9a8f-065813cd4960”,“responseId”:“c171bc53-75f4-41b2-9a8f-065813cd4960”,“logTime”:1567493361715,“environment”:“prod”,“code”:200,“createTime”:1567493392000}

LogTime:1567493361715 (2019-09-03 14:49:21) 事件产生时间
CreatTime: 1567493392000 (2019-09-03 14:49:52) 事件达到服务端的时间

我们可以看到实际生产中,Event1的产生的时间是要比Event2早的,可是因为种种原因,Event1到达服务端的时间却要比Event2要晚,这样就会产生一个事件乱序的问题,如果对一个时间精度很高的需求,就要避免这种乱序事件发生。

所以我们在实际开发中就要根据不同需求,选择时间类型:
1.Event Time 可以处理数据乱序以及分布式机器时钟不统一的情况
2.Ingestion Time 可以处理分布式机器时钟不统一的情况但处理不了数据乱序的问题
3.Processing Time 都处理不了但在性能和易用性的角度有优势

watermark

这里会有个小疑问,刚接触的人可能会想,那我有了这个事件时间类型就可以避免了数据乱序以及分布式机器时钟不统一的情况,那这个watermark是不是多余呢?显然是不是的,如下图就说明了没有水位,但只有事件时间的弊处。
理解flink的时间概念和watermark_第1张图片
上图表示一个滑动窗口,红色的a是第13s产生的消息,但却因为网络延时,在第19秒到达flink的系统,我们这里采用了eventTime但没有用到watermark,理应第13秒的消息应该在window1和window2中,但是因为没有水位,在时间到达15s钟的时候,window1已经处理了,所以当延迟到19s的消息到达时,虽然这条消息是13s产生的,但是window1已经关闭,所以只能存在于window2里了。

所以我们这里必须要引入watermark,watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

我们用四个问题简单明了的解释watermark(水位)
Q1.什么是watermark?
A1.watermark可以理解为就是一个时间戳,他是每条数据都会带有的一个隐藏属性。

Q2.watermark的含义?
A2.表示在时间上,小于这个watermark时间戳的数据已经全部达到

Q3.watermark有什么用?
A3.正如上面举的例子,对于有延迟的数据,watermark可以保证该数据在正确的时间处理,
并避免了处理数据时无限期的等下去。

Q4.如何使用watermark?
目前flink支持两种方式指定Timestamps和watermark,第一种是在DataStream Source算子接口的Source Function中定义。因为我们在使用中,我们读数据都是对接外部数据源的,所以此种方法就不多说了。(补充:读kafak数据源支持在source中定义)
另一种就是自定义Timestamps Assigner和Watermark Generator。我们有两种自定义方法,每种自定义方法又根据watermarks生成形式不同分为两种类型,如下所示:
理解flink的时间概念和watermark_第2张图片
我们平时最常用的就是打星号的方法,这两种方法的区别就是第一种的水位是传入的最大时间戳减去固定时长,第二种则是可以自定义水位时间戳
代码一:flink自带的以固定时长生成watermark的方法

import org.apache.flink.api.java.utils.ParameterToolimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractorimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindowsimport org.apache.flink.streaming.api.windowing.time.Time

object watermarkTest {

  def main(args: Array[String]): Unit = {

    val parameter = ParameterTool.fromArgs(args)
    val host = parameter.get("host")
    val port = parameter.getInt("port")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val sockStream = env.socketTextStream(host,port)
    val resultDS = sockStream.map(x=>(x.split(",")(0),x.split(",")(1),1))
    val watermark = resultDS.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[(String, String,Int)](Time.seconds(2)) {
      override def extractTimestamp(element: (String, String,Int)): Long = element._2.toLong    })
    val result = watermark.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(2)
    result.print()

    env.execute("test")

  }}

代码二:实现接口自定义生成watermark的方法

import org.apache.flink.api.java.utils.ParameterToolimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarksimport org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.watermark.Watermarkimport org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindowsimport org.apache.flink.streaming.api.windowing.time.Time

object watermarkTest1 {

  def main(args: Array[String]): Unit = {

    val parameter = ParameterTool.fromArgs(args)
    val host = parameter.get("host")
    val port = parameter.getInt("port")
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1)
    val sockStream = env.socketTextStream(host,port)
    val resultDS = sockStream.map(x=>(x.split(",")(0),x.split(",")(1).toLong,1))
    val watermark = resultDS.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String,Long,Int)] {

      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 2000L//最大允许的乱序时间是10s

      var a : Watermark = _

      override def getCurrentWatermark: Watermark = {
        a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        a      }

      override def extractTimestamp(t: (String, Long, Int), previousElementTimestamp: Long): Long = {
        val timestamp = t._2
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        timestamp      }
    })

    val result = watermark.keyBy(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).sum(2)

    result.print()

    env.execute("test")

  }}

这两种方法都是采用了Periodic Watermarks的方式,即是根据设定时间间隔周期性的生成watermarks,
这个周期具体由ExecutionConfig.setAutoWatermarkInterval设置,如果没有设置会一直调用getCurrentWatermark方法。


乐xenia
4楼 · 2020-08-25 17:34

由于数据事件在传输中,难免会遇到带宽,网络,存储问题或者通过kafak这种管道(同一topic的同一partition中有序)时,则会导致最终到达flink的数据是乱序的,而有些场景需要对时间的准确性有很高的要求,所以就要对乱序数据进行处理,所谓乱序,指的是事件并没按照数据产生的时间(EventTime)到达flink,所以对乱序数据进行处理,首先需要设置flink的时间语意为EventTime

val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment 
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
env.getConfig.setAutoWatermarkInterval(100L) //默认watermark产生间隔时间为200毫秒,通过这个方法可以自定义设置123

那么问题来了,时间属性被设置为EventTime该怎么推进数据的前进呢,比如这个场景,每隔5秒计算之中的最小值,该怎么判断这个5秒窗口的关闭呢,触发窗口计算呢???,此时watermark就登场了,

watermark用来推进EventTime数据的前进,结合window用来处理乱序数据(遇到一个时间戳达到了窗口关闭时间,不应该立刻触发窗口计算,而是等待一段时间,等迟到的数据来了再关闭窗口),

watermark是一种衡量eventTime进展的机制,可以设定延迟触发(比如设置延时时间为一秒,此时5秒的数据到达,因为设置延时为一秒,watermark=maxTimestamp-bound,watermark=5-1=4) 则认为4秒之前的数据都已经到达了

数据流中的watermark用于表示timestamp小于watermark的数据都已经到达了,计算公式为当前最大的时间戳减去延时

watermark的延时设置:太久收到结果的速度就会变慢,实时性不够好,解决办法在水位线到达之前输出一个近似结果

太早,则会导致收到的结果不正确,一些数据没有收集进来,解决办法:可以通过侧输出流来解决

有两种方式生成watermar

1 继承AssignerWithPeriodicWatermarks

2 继承AssignerWithPunctuatedWatermarks

两者区别 前者定时生成watermark默认周期200毫秒按照生成watermark的方式定时插入到数据流中

后者特定事件才产生watermark,一般不常用

class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading]{

//定义固定延迟为3秒,毫秒展示

val bound:Long=3*1000L

//定义当前收到的最大的时间戳

var maxTs:Long=Long.MinValue

Override def getCurrentWatermark:Watermark={

//生成watermark的方式,watermark=当前最大的时间戳-延时,比如当前最大的时间戳为5,延时为一秒,watermark 为4,则认为4秒之前的数据已经全部到达

new Watermark(maxTs-bound)

}

Override def extractTimestamp(element:SensorReading,previousElementTimestamp:Long):Long={

maxTs=maxTs.max(element.timestamp*1000L)

Element.timestamp*1000L

}

}

但是上面实现方式过于烦碎,一般采用其简化实现的方法,直接new实例化 new BoundedOutOfOrdernessTimestampExtractor 并且在原始数据进行一次map 或者filter操作之后立刻产生水印watermark,例子如下

val dataStream = inputStream
.map(
data => {
val dataArray = data.split(",")
SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
}
)
//        .assignAscendingTimestamps(.timestamp * 1000L)
.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorSensorReading {
override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
})
//      .assignTimestampsAndWatermarks( new MyAssigner() )
.map(data => (data.id, data.temperature))
.keyBy(
._1)
//        .process( new MyProcess() )
.timeWindow(Time.seconds(10), Time.seconds(3))
.reduce((result, data) => (data._1, result._2.min(data._2))) // 统计10秒内的最低温度值


完整代码如下

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.time.Time
//定义传感器样例类
case class SensorReading(id: String, timeStamp: Long, temperature: Double)

object windowTest {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.getConfig.setAutoWatermarkInterval(100L)

    val stream: DataStream[String] = env.socketTextStream("localhost", 999)

    val dataStream: DataStream[SensorReading] = stream.map(data => {
      val dataArray: Array[String] = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
        override def extractTimestamp(t: SensorReading): Long =t.timeStamp*1000L
      })
//      .assignTimestampsAndWatermarks(new MyAssigner())

    val minTempWindowStream: DataStream[(String, Double)] = dataStream
      .map(data => (data.id, data.temperature))
      .keyBy(0)
      .timeWindow(Time.seconds(10))
      .reduce((d1, d2) => (d1._1, d1._2.min(d2._2)))

    minTempWindowStream.print("min temp")
    dataStream.print("input data")

    env.execute("window test")
  }

  class MyAssigner() extends AssignerWithPeriodicWatermarks[SensorReading]{
    val bound:Long=10000
    var maxTs=Long.MinValue

    override def getCurrentWatermark: Watermark =new Watermark(maxTs-bound)

    override def extractTimestamp(t: SensorReading, l: Long): Long = {
      maxTs=maxTs.max(t.timeStamp)
      t.timeStamp*1000


我的网名不再改
5楼 · 2020-09-04 12:07

背景

image

实时计算中,数据时间比较敏感。有eventTime和processTime区分,一般来说eventTime是从原始的消息中提取过来的,processTime是Flink自己提供的,Flink中一个亮点就是可以基于eventTime计算,这个功能很有用,因为实时数据可能会经过比较长的链路,多少会有延时,并且有很大的不确定性,对于一些需要精确体现事件变化趋势的场景中,单纯使用processTime显然是不合理的。

概念

watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp.watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。

流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

window划分

window的设定无关数据本身,而是系统定义好了的。
window是flink中划分数据一个基本单位,window的划分方式是固定的,默认会根据自然时间划分window,并且划分方式是前闭后开。

window划分w1w2w3
3s[00:00:00~00:00:03)[00:00:03~00:00:06)[00:00:06~00:00:09)
5s[00:00:00~00:00:05)[00:00:05~00:00:10)[00:00:10~00:00:15)
10s[00:00:00~00:00:10)[00:00:10~00:00:20)[00:00:20~00:00:30)
1min[00:00:00~00:01:00)[00:01:00~00:02:00)[00:02:00~00:03:00)

示例

如果设置最大允许的乱序时间是10s,滚动时间窗口为5s

{"datetime":"2019-03-26 16:25:24","name":"zhangsan"}//currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]

触达改记录的时间窗口应该为2019-03-26 16:25:20~2019-03-26 16:25:25
即当有数据eventTime >= 2019-03-26 16:25:35 时

{"datetime":"2019-03-26 16:25:35","name":"zhangsan"}//currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25]//(zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)

后面会详细讲解。_

提取watermark

watermark的提取工作在taskManager中完成,意味着这项工作是并行进行的的,而watermark是一个全局的概念,就是一个整个Flink作业之后一个warkermark。

AssignerWithPeriodicWatermarks

定时提取watermark,这种方式会定时提取更新wartermark。

//默认200mspublic void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {
    this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
    if (characteristic == TimeCharacteristic.ProcessingTime) {
        getConfig().setAutoWatermarkInterval(0);
    } else {
        getConfig().setAutoWatermarkInterval(200);
    }}

AssignerWithPunctuatedWatermarks

伴随event的到来就提取watermark,就是每一个event到来的时候,就会提取一次Watermark。
这样的方式当然设置watermark更为精准,但是当数据量大的时候,频繁的更新wartermark会比较影响性能。
通常情况下采用定时提取就足够了。

使用

设置数据流时间特征

//设置为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

默认为TimeCharacteristic.ProcessingTime,默认水位线更新每隔200ms

入口文件

val env = StreamExecutionEnvironment.getExecutionEnvironment//便于测试,并行度设置为1env.setParallelism(1)//env.getConfig.setAutoWatermarkInterval(9000)//设置为事件时间env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)//设置source 本地socketval text: DataStream[String] = env.socketTextStream("localhost", 9000)val lateText = new OutputTag[(String, String, Long, Long)]("late_data")val value = text.filter(new MyFilterNullOrWhitespace).flatMap(new MyFlatMap).assignTimestampsAndWatermarks(new MyWaterMark).map(x => (x.name, x.datetime, x.timestamp, 1L)).keyBy(_._1).window(TumblingEventTimeWindows.of(Time.seconds(5))).sideOutputLateData(lateText)//.sum(2).apply(new MyWindow)//.window(TumblingEventTimeWindows.of(Time.seconds(3)))//.apply(new MyWindow)value.getSideOutput(lateText).map(x => {"延迟数据|name:" + x._1 + "|datetime:" + x._2}).print()value.print()env.execute("watermark test")

class MyWaterMark extends AssignerWithPeriodicWatermarks[EventObj] {

  val maxOutOfOrderness = 10000L // 3.0 seconds
  var currentMaxTimestamp = 0L

  val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")

  /**
    * 用于生成新的水位线,新的水位线只有大于当前水位线才是有效的
    *
    * 通过生成水印的间隔(每n毫秒)定义 ExecutionConfig.setAutoWatermarkInterval(...)。
    * getCurrentWatermark()每次调用分配器的方法,如果返回的水印非空并且大于先前的水印,则将发出新的水印。
    *
    * @return
    */
  override def getCurrentWatermark: Watermark = {
    new Watermark(this.currentMaxTimestamp - this.maxOutOfOrderness)
  }

  /**
    * 用于从消息中提取事件时间
    *
    * @param element                  EventObj
    * @param previousElementTimestamp Long
    * @return
    */
  override def extractTimestamp(element: EventObj, previousElementTimestamp: Long): Long = {

    currentMaxTimestamp = Math.max(element.timestamp, currentMaxTimestamp)

    val id = Thread.currentThread().getId
    println("currentThreadId:" + id + ",key:" + element.name + ",eventTime:[" + element.datetime + "],currentMaxTimestamp:[" + sdf.format(currentMaxTimestamp) + "],watermark:[" + sdf.format(getCurrentWatermark().getTimestamp) + "]")

    element.timestamp  }}

代码详解

  1. 设置为事件时间

  2. 接受本地socket数据

  3. 抽取timestamp生成watermark,打印(线程id,key,eventTime,currentMaxTimestamp,watermark)

  4. event time每隔3秒触发一次窗口,打印(key,窗口内元素个数,窗口内最早元素的时间,窗口内最晚元素的时间,窗口自身开始时间,窗口自身结束时间)

试验

第一次

数据

{"datetime":"2019-03-26 16:25:24","name":"zhangsan"}

输出

|currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:24],currentMaxTimestamp:[2019-03-26 16:25:24],watermark:[2019-03-26 16:25:14]

汇总

KeyEventTimecurrentMaxTimestampWatermark
zhangsan2019-03-26 16:25:242019-03-26 16:25:242019-03-26 16:25:14

第二次

数据

{"datetime":"2019-03-26 16:25:27","name":"zhangsan"}

输出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:27],currentMaxTimestamp:[2019-03-26 16:25:27],watermark:[2019-03-26 16:25:17]

汇总

KeyEventTimecurrentMaxTimestampWatermark
zhangsan2019-03-26 16:25:242019-03-26 16:25:242019-03-26 16:25:14
zhangsan2019-03-26 16:25:272019-03-26 16:25:272019-03-26 16:25:17

随着EventTime的升高,Watermark升高。

第三次

数据

{"datetime":"2019-03-26 16:25:34","name":"zhangsan"}

输出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:34],currentMaxTimestamp:[2019-03-26 16:25:34],watermark:[2019-03-26 16:25:24]

汇总

KeyEventTimecurrentMaxTimestampWatermark
zhangsan2019-03-26 16:25:242019-03-26 16:25:242019-03-26 16:25:14
zhangsan2019-03-26 16:25:272019-03-26 16:25:272019-03-26 16:25:17
zhangsan2019-03-26 16:25:342019-03-26 16:25:342019-03-26 16:25:24

到这里,window仍然没有被触发,此时watermark的时间已经等于了第一条数据的Event Time了。

第四次

数据

{"datetime":"2019-03-26 16:25:35","name":"zhangsan"}

image

输出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:35],currentMaxTimestamp:[2019-03-26 16:25:35],watermark:[2019-03-26 16:25:25](zhangsan,1,2019-03-26 16:25:24,2019-03-26 16:25:24,2019-03-26 16:25:20,2019-03-26 16:25:25)

image

汇总

KeyEventTimecurrentMaxTimestampWatermarkWindowStartTimeWindowEndTime
zhangsan2019-03-26 16:25:242019-03-26 16:25:242019-03-26 16:25:14

zhangsan2019-03-26 16:25:272019-03-26 16:25:272019-03-26 16:25:17

zhangsan2019-03-26 16:25:342019-03-26 16:25:342019-03-26 16:25:24

zhangsan2019-03-26 16:25:352019-03-26 16:25:352019-03-26 16:25:25[2019-03-26 16:25:202019-03-26 16:25:25)

直接证明了window的设定无关数据本身,而是系统定义好了的。
输入的数据中,根据自身的Event Time,将数据划分到不同的window中,如果window中有数据,则当watermark时间>=Event Time时,就符合了window触发的条件了,最终决定window触发,还是由数据本身的Event Time所属的window中的window_end_time决定。

当最后一条数据16:25:35到达是,Watermark提升到16:25:25,此时窗口16:25:20~16:25:25中有数据,Window被触发。

第五次

数据

{"datetime":"2019-03-26 16:25:37","name":"zhangsan"}

输出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:37],currentMaxTimestamp:[2019-03-26 16:25:37],watermark:[2019-03-26 16:25:27]

汇总

KeyEventTimecurrentMaxTimestampWatermarkWindowStartTimeWindowEndTime
zhangsan2019-03-26 16:25:242019-03-26 16:25:242019-03-26 16:25:14

zhangsan2019-03-26 16:25:272019-03-26 16:25:272019-03-26 16:25:17

zhangsan2019-03-26 16:25:342019-03-26 16:25:342019-03-26 16:25:24

zhangsan2019-03-26 16:25:352019-03-26 16:25:352019-03-26 16:25:25[2019-03-26 16:25:202019-03-26 16:25:25)
zhangsan2019-03-26 16:25:372019-03-26 16:25:372019-03-26 16:25:27

此时,watermark时间虽然已经达到了第二条数据的时间,但是由于其没有达到第二条数据所在window的结束时间,所以window并没有被触发。

第二条数据所在的window时间是:[2019-03-26 16:25:25,2019-03-26 16:25:30)

第六次

数据

{"datetime":"2019-03-26 16:25:40","name":"zhangsan"}

输出

currentThreadId:38,key:zhangsan,eventTime:[2019-03-26 16:25:40],currentMaxTimestamp:[2019-03-26 16:25:40],watermark:[2019-03-26 16:25:30](zhangsan,1,2019-03-26 16:25:27,2019-03-26 16:25:27,2019-03-26 16:25:25,2019-03-26 16:25:30)

汇总

KeyEventTimecurrentMaxTimestampWatermarkWindowStartTimeWindowEndTime
zhangsan2019-03-26 16:25:242019-03-26 16:25:242019-03-26 16:25:14

zhangsan2019-03-26 16:25:272019-03-26 16:25:272019-03-26 16:25:17

zhangsan2019-03-26 16:25:342019-03-26 16:25:342019-03-26 16:25:24

zhangsan2019-03-26 16:25:352019-03-26 16:25:352019-03-26 16:25:25[2019-03-26 16:25:202019-03-26 16:25:25)
zhangsan2019-03-26 16:25:372019-03-26 16:25:372019-03-26 16:25:27

zhangsan2019-03-26 16:25:402019-03-26 16:25:402019-03-26 16:25:30[2019-03-26 16:25:252019-03-26 16:25:30)

结论

window的触发要符合以下几个条件:

  1. watermark时间 >= window_end_time

  2. 在[window_start_time,window_end_time)中有数据存在

同时满足了以上2个条件,window才会触发。
watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加.

多并行度

image

总结

Flink如何处理乱序?

watermark+window机制。window中可以对input进行按照Event Time排序,使得完全按照Event Time发生的顺序去处理数据,以达到处理乱序数据的目的。

Flink何时触发window?

对于late element太多的数据而言

  1. Event Time < watermark>

对于out-of-order以及正常的数据而言

  1. watermark时间 >= window_end_time

  2. 在[window_start_time,window_end_time)中有数据存在

Flink应该如何设置最大乱序时间?

结合自己的业务以及数据情况去设置。



相关问题推荐

  • 什么是大数据时代?2021-01-13 21:23
    回答 100

    大数据(big data)一词越来越多地被提及,人们用它来描述和定义信息爆炸时代产生的海量数据,而这个海量数据的时代则被称为大数据时代。随着云时代的来临,大数据(Big data)也吸引了越来越多的关注。大数据(Big data)通常用来形容一个公司创造的大量非结...

  • 回答 84

    Java和大数据的关系:Java是计算机的一门编程语言;可以用来做很多工作,大数据开发属于其中一种;大数据属于互联网方向,就像现在建立在大数据基础上的AI方向一样,他两不是一个同类,但是属于包含和被包含的关系;Java可以用来做大数据工作,大数据开发或者...

  • 回答 52
    已采纳

    学完大数据可以从事很多工作,比如说:hadoop 研发工程师、大数据研发工程师、大数据分析工程师、数据库工程师、hadoop运维工程师、大数据运维工程师、java大数据工程师、spark工程师等等都是我们可以从事的工作岗位!不同的岗位,所具备的技术知识也是不一样...

  • 回答 29

    简言之,大数据是指大数据集,这些数据集经过计算分析可以用于揭示某个方面相关的模式和趋势。大数据技术的战略意义不在于掌握庞大的数据信息,而在于对这些含有意义的数据进行专业化处理。大数据的特点:数据量大、数据种类多、 要求实时性强、数据所蕴藏的...

  • 回答 14

    tail -f的时候,发现一个奇怪的现象,首先 我在一个窗口中 tail -f test.txt 然后在另一个窗口中用vim编辑这个文件,增加了几行字符,并保存,这个时候发现第一个窗口中并没有变化,没有将最新的内容显示出来。tail -F,重复上面的实验过程, 发现这次有变化了...

  • 回答 18

    您好针对您的问题,做出以下回答,希望有所帮助!1、大数据行业还是有非常大的人才需求的,对于就业也有不同的岗位可选,比如大数据工程师,大数据运维,大数据架构师,大数据分析师等等,就业难就难在能否找到适合的工作,能否与你的能力和就业预期匹配。2、...

  • 回答 17

    最小的基本单位是Byte应该没多少人不知道吧,下面先按顺序给出所有单位:Byte、KB、MB、GB、TB、PB、EB、ZB、YB、DB、NB,按照进率1024(2的十次方)计算:1Byte = 8 Bit1 KB = 1,024 Bytes 1 MB = 1,024 KB = 1,048,576 Bytes 1 GB = 1,024 MB = 1,048,576...

  • 回答 33

    大数据的定义。大数据,又称巨量资料,指的是所涉及的数据资料量规模巨大到无法通过人脑甚至主流软件工具,在合理时间内达到撷取、管理、处理、并整理成为帮助企业经营决策更积极目的的资讯。大数据是对大量、动态、能持续的数据,通过运用新系统、新工具、新...

  • 回答 5

    MySQL是一种关系型数据库管理系统,关系数据库将数据保存在不同的表中,而不是将所有数据放在一个大仓库内,这样就增加了速度并提高了灵活性。MySQL的版本:针对不同的用户,MySQL分为两种不同的版本:MySQL Community Server社区版本,免费,但是Mysql不提供...

  • mysql安装步骤mysql 2022-05-07 18:01
    回答 2

    mysql安装需要先使用yum安装mysql数据库的软件包 ;然后启动数据库服务并运行mysql_secure_installation去除安全隐患,最后登录数据库,便可完成安装

  • 回答 5

    1.查看所有数据库showdatabases;2.查看当前使用的数据库selectdatabase();3.查看数据库使用端口showvariableslike&#39;port&#39;;4.查看数据库编码showvariableslike‘%char%’;character_set_client 为客户端编码方式; character_set_connection 为建立连接...

  • 回答 5

    CREATE TABLE IF NOT EXISTS `runoob_tbl`(    `runoob_id` INT UNSIGNED AUTO_INCREMENT,    `runoob_title` VARCHAR(100) NOT NULL,    `runoob_author` VARCHAR(40) NOT NULL,    `submission_date` DATE,    PRI...

  • 回答 9

    学习多久,我觉得看你基础情况。1、如果原来什么语言也没有学过,也没有基础,那我觉得最基础的要先选择一种语言来学习,是VB,C..,pascal,看个人的喜好,一般情况下,选择C语言来学习。2、如果是有过语言的学习,我看应该一个星期差不多,因为语言的理念互通...

  • 回答 7

    添加语句 INSERT插入语句:INSERT INTO 表名 VALUES (‘xx’,‘xx’)不指定插入的列INSERT INTO table_name VALUES (值1, 值2,…)指定插入的列INSERT INTO table_name (列1, 列2,…) VALUES (值1, 值2,…)查询插入语句: INSERT INTO 插入表 SELECT * FROM 查...

  • 回答 5

    看你什么岗位吧。如果是后端,只会CRUD。应该是可以找到实习的,不过公司应该不会太好。如果是数据库开发岗位,那这应该是不会找到的。

  • 回答 7

    查找数据列 SELECT column1, column2, … FROM table_name; SELECT column_name(s) FROM table_name 

没有解决我的问题,去提问