窗口(window)

image.png

  • 一般真实的流都是无界的,怎样处理无界的数据?
  • 可以把无限的数据流进行切分,得到有限的数据集进行处理-也就是得到有界流
  • 窗口(window)就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析

window类型

  • 时间窗口(Time Window)
    • 滚动时间窗口
    • 滑动时间窗口
    • 会话窗口
  • 计数窗口(Count Window)
    • 滚动计数窗口
    • 滑动计数窗口

滚动窗口(Tumbling Windows)

image.png

  • 将数据有固定的窗口长度对数据进行切分
  • 时间对齐,窗口长度固定,没有重叠
  • 时间按照左闭右开进行划分[t1,t1 + window size)

滑动窗口(Sliding Windows)

image.png

  • 滑动窗口是固定窗口的更广义的一种形式,滑动窗口由固定的窗口长度(window size)和滑动间隔(window slide)组成
  • 窗口长度固定,可以有重叠

会话窗口(Session Windows)

image.png

  • 由一系列事件组合一个指定时间长度的timeout间隙组成,也就是一段时间没有接收到的新数据就会生成新的窗口
  • 特点:事件无对齐

Window Api

  • 窗口分配器 - window() 方法
    • 我们可以用.window()来定义一个窗口,然后基于这个window去做一些聚合或者其他处理操作。注意window方法必须在keyBy之后使用
    • Flink提供了更加简单的.timeWindow.countWindow方法,用于定义时间窗口和计数窗口。
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val ds = env.fromCollection(Seq(
      (18, "zhangsan"),
      (19, "lisi"),
      (18, "wangwu")
    ))
    ds.keyBy(x => x._1).window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
    // 计数窗口
    // ds.keyBy(x => x._1) .countWindow(10,15)
    env.execute()

窗口函数(window function)

  • window function 定义了要对窗口中手机的数据做的计算操作
    • 增量聚合函数(incremental aggregation functions)
      • 每条数据到来就进行计算,保持一个简单的状态
      • ReduceFunction,AggregateFunction
    • 全窗口函数(full window functions)
      • 先把窗口所有数据收集起来,等到计算的时候遍历所有数据
      • ProcessWindowFunction,WindowFunction

aggregate

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val d1 = env.socketTextStream("localhost", 9999, '\n')
      .keyBy(_.length).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    d1.aggregate(new AggregateFunction[String, Int, Int] {
      override def createAccumulator(): Int = 0

      override def add(value: String, accumulator: Int): Int = accumulator + 1

      override def getResult(accumulator: Int): Int = accumulator

      override def merge(a: Int, b: Int): Int = a + b
    }).print("aggregate")
    env.execute()

image.png

apply

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val d1 = env.socketTextStream("localhost", 9999, '\n')
      .keyBy(_.length).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
    // WindowFunction泛型 => IN, OUT, KEY, W <: Window
    d1.apply(new WindowFunction[String, (Int, Int), Int, TimeWindow] {
      override def apply(key: Int, window: TimeWindow, input: Iterable[String], out: Collector[(Int, Int)]): Unit = {
        val size = input.foldLeft(0)((u, v) => u + 1)
        out.collect((key, size))
      }
    }).print("apply")
    env.execute()

image.png

process

  • 每3秒统计一次
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val d1 = env.socketTextStream("localhost", 9999, '\n')
      .keyBy(_.length).window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
    // ProcessWindowFunction泛型 => IN, OUT, KEY, W <: Window
    d1.process(new ProcessWindowFunction[String, (String, Int), Int, TimeWindow] {
      override def process(key: Int, context: Context, elements: Iterable[String], out: Collector[(String, Int)]): Unit = {
        val size = elements.foldLeft(0)((u, v) => u + 1)
        val now = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
        out.collect((now, size))
      }
    }).print("process")
    env.execute()

image.png

  • 每3次统计一次
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val d1 = env.socketTextStream("localhost", 9999, '\n')
      .keyBy(_.length).countWindow(3, 3)
    // ProcessWindowFunction泛型 => IN, OUT, KEY, W <: Window
    d1.process(new ProcessWindowFunction[String, (String, Int, Int), Int, GlobalWindow] {
      override def process(key: Int, context: Context, elements: Iterable[String], out: Collector[(String, Int, Int)]): Unit = {
        val size = elements.foldLeft(0)((u, v) => u + 1)
        val now = new DateTime().toString("yyyy-MM-dd HH:mm:ss")
        out.collect((now, key, size))
      }
    }).print("process")
    env.execute()

image.png

其它可选API

  • trigger 触发器
    • 定义window什么时候关闭,触发计算并输出结果
  • evictor 移除器
    • 定义移除某些数据的逻辑
  • allowedLateness 允许处理迟到的数据
  • sideOutputLateData 将迟到的数据放入侧输出流
    • 后续通过getSideOutput获取侧输出流
  • getSideOutput 获取输出流

window API总览

image.png