如何在代码中定义flink的滑动窗口

2020-08-19 10:43发布

1条回答
yangzp
2020-08-26 13:53

滑动窗口(Sliding Windows

滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度滑动间隔组成。一次数据统计的时间长度 每次统计移动多长的时间

特点:时间对齐,窗口长度固定,可以有重叠。一个数据可以被统计多次,滑动间隔窗口长度某个数值的整数倍

滑动窗口分配器将元素分配到固定长度的窗口中,与滚动窗口类似,窗口的大小由窗口大小参数来配置,另一个窗口滑动参数控制滑动窗口开始的频率。因此,滑动窗口如果滑动参数小于窗口大小的话,窗口是可以重叠的,在这种情况下元素会被分配到多个窗口中。

例如,你有 10 分钟的窗口和 5 分钟的滑动,那么每个窗口中 5 分钟的窗口里包含着上个 10 分钟产生的数据,如下图所示:

image.png 

适用场景:对最近一个时间段内的统计(求某接口最近 5min 的失败率来决定是否要报警)。

滑动窗口和滚动窗口的函数名是完全一致的,只是在传参数时需要传入两个参数,一个

window_size,一个是 sliding_size

下面代码中的 sliding_size 设置为了 5s,也就是说,窗口每 5s 就计算一次,每一次计

算的 window 范围是 15s 内的所有元素。

object Windows {

  def main(args: Array[String]): Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val dataStream: DataStream[String] = env.socketTextStream("linux01", 9999)

    val countStream: DataStream[(String, Int)] = dataStream.map(r => (r, 1)).keyBy(0).timeWindow(Time.milliseconds(5000),Time.milliseconds(1000)).sum(1)

    countStream.print()

    env.execute("Windows is runned")

  }

}

时间间隔可以通过 Time.milliseconds(x)Time.seconds(x)Time.minutes(x)等其中的一个来指定。




一周热门 更多>