Windows

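- Real-world streams are generally unbounded. How can unbounded data be processed?
- Split the infinite stream into finite datasets and process those, i.e. turn it into bounded streams
- A window is one way of cutting an infinite stream into finite pieces: it distributes the stream's data into finite-size buckets for analysis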
Window types
- Time Window
- Count Window
Tumbling Windows

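- The data is split into windows of a fixed length
- Windows are aligned in time, have a fixed length, and do not overlap
- Windows are left-closed and right-open: [t1, t1 + window size)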
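To make the [t1, t1 + window size) rule concrete, here is a minimal pure-Scala sketch (not the Flink API) of how a timestamp maps to its tumbling window. It assumes non-negative millisecond timestamps and an offset of 0; the names `TumblingSketch`, `windowStart` and `assign` are hypothetical. To our understanding, the start computation matches the arithmetic Flink's `TimeWindow.getWindowStartWithOffset` applies to such timestamps.

```scala
// Pure-Scala sketch of tumbling-window assignment (not the Flink API).
// Assumptions: non-negative millisecond timestamps, offset = 0.
object TumblingSketch {
  // Start of the window containing ts: ts - (ts - offset + size) % size
  def windowStart(ts: Long, size: Long, offset: Long = 0L): Long =
    ts - (ts - offset + size) % size

  // Half-open window [start, start + size) that ts falls into
  def assign(ts: Long, size: Long): (Long, Long) = {
    val start = windowStart(ts, size)
    (start, start + size)
  }

  def main(args: Array[String]): Unit =
    Seq(0L, 4999L, 5000L, 12345L).foreach(ts => println(s"$ts -> ${assign(ts, 5000L)}"))
}
```

With a 5000 ms window, timestamps 0 and 4999 both land in [0, 5000), while 5000 opens the next window. That is the left-closed, right-open rule.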
Sliding Windows

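- A sliding window is a generalization of the fixed (tumbling) window, defined by a fixed window length (window size) and a slide interval (window slide)
- Windows have a fixed length and can overlap: when the slide is smaller than the size, each element belongs to several windows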
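A sliding window assigns each element to every window whose range covers it. The pure-Scala sketch below (not the Flink API; hypothetical names; non-negative timestamps and zero offset assumed) starts from the latest window start at or before the timestamp and walks back one slide at a time. This is similar in spirit to how Flink's sliding assigners enumerate windows.

```scala
// Pure-Scala sketch of sliding-window assignment (not the Flink API).
// Assumptions: non-negative timestamps, offset = 0.
object SlidingSketch {
  // Every window [start, start + size) that contains ts, latest start first
  def assign(ts: Long, size: Long, slide: Long): List[(Long, Long)] = {
    val lastStart = ts - ts % slide // latest window start at or before ts
    Iterator
      .iterate(lastStart)(_ - slide) // step back one slide at a time
      .takeWhile(start => start > ts - size) // stop once the window no longer covers ts
      .map(start => (start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit =
    println(assign(12L, 10L, 5L))
}
```

With size 10 and slide 5, timestamp 12 belongs to [10, 20) and [5, 15), which is size / slide = 2 overlapping windows.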
Session Windows

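- A session window groups a sequence of events separated by gaps shorter than a specified timeout. Once no new data arrives for longer than the timeout, the current window closes and the next event starts a new one
- Characteristic: windows are not aligned in time (each key's sessions start and end independently)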
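Session windows can be illustrated by grouping sorted timestamps. Each event opens a candidate window [ts, ts + timeout), and an event that falls inside the current session extends it. This is a simplified pure-Scala sketch under that assumption, not Flink's window-merging implementation. The names are hypothetical, and the boundary case (an event exactly `timeout` after the previous one) may be handled differently by Flink.

```scala
// Simplified pure-Scala sketch of session windows (not Flink's merging code).
object SessionSketch {
  // Each event opens [ts, ts + timeout); an event inside the current session extends it.
  def sessions(timestamps: Seq[Long], timeout: Long): List[(Long, Long)] =
    timestamps.sorted
      .foldLeft(List.empty[(Long, Long)]) {
        // ts arrives before the current session expires: extend its end
        case ((start, end) :: rest, ts) if ts < end => (start, ts + timeout) :: rest
        // no session yet, or the gap was at least `timeout`: start a new session
        case (acc, ts) => (ts, ts + timeout) :: acc
      }
      .reverse

  def main(args: Array[String]): Unit =
    println(sessions(Seq(10L, 1L, 30L, 3L, 11L), 5L))
}
```

With timeout 5, events at 1, 3, 10, 11 and 30 form three sessions: [1, 8), [10, 16) and [30, 35).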
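Window API
- Window assigner: the window() method
- Use .window() to define a window, then apply an aggregation or other processing to it. Note that .window() must be called after keyBy; on a non-keyed stream, use .windowAll() instead
- Flink also provides the simpler .timeWindow and .countWindow shorthands for defining time windows and count windows (.timeWindow has been deprecated since Flink 1.12 in favor of .window with an explicit assigner)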
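import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
val ds = env.fromCollection(Seq(
  (18, "zhangsan"),
  (19, "lisi"),
  (18, "wangwu")
))
// 15 s tumbling processing-time window per key (age). A window function
// (here reduce) plus a sink is required, otherwise there is nothing to execute.
// With this bounded source the job may finish before the window fires.
ds.keyBy(x => x._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))
  .reduce((a, b) => (a._1, a._2 + "," + b._2))
  .print()
// Count window: countWindow(size = 10, slide = 15)
// ds.keyBy(x => x._1).countWindow(10, 15)
env.execute()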
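Window Functions
- A window function defines the computation to perform on the data collected in a window
- Incremental aggregation functions
- Compute as each element arrives, keeping only a small piece of state
- ReduceFunction, AggregateFunction
- Full window functions
- First collect all of the window's data, then iterate over all of it when the window is evaluated
- ProcessWindowFunction, WindowFunction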
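The practical difference between the two kinds of window function is the state kept until the window fires. The minimal pure-Scala sketch below uses hypothetical classes, not Flink types, and computes the total length of the strings in one window in both styles.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical classes contrasting the two window-function styles (not Flink types).
object WindowFunctionStyles {
  // Incremental style (like ReduceFunction/AggregateFunction): one running total
  final class IncrementalLengthSum {
    private var total = 0L
    def add(value: String): Unit = total += value.length
    def result: Long = total
  }

  // Full-window style (like ProcessWindowFunction): buffer everything, compute on fire
  final class FullWindowLengthSum {
    private val buffer = ArrayBuffer.empty[String]
    def add(value: String): Unit = buffer += value
    def bufferedElements: Int = buffer.size
    def result: Long = buffer.iterator.map(_.length.toLong).sum
  }

  def main(args: Array[String]): Unit = {
    val inc = new IncrementalLengthSum
    val full = new FullWindowLengthSum
    Seq("a", "bb", "ccc").foreach { s => inc.add(s); full.add(s) }
    println(s"incremental=${inc.result}, full=${full.result}, buffered=${full.bufferedElements}")
  }
}
```

Both styles produce the same result, but the incremental version holds a single number while the full-window version holds every element. In exchange, a full-window function sees all elements at once, for example to sort them or to read window metadata.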
aggregate
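import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Key each line by its length; 1 s tumbling processing-time windows
val d1 = env.socketTextStream("localhost", 9999, '\n')
  .keyBy(_.length)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
// AggregateFunction[IN, ACC, OUT]: count the lines in each window incrementally
d1.aggregate(new AggregateFunction[String, Int, Int] {
  override def createAccumulator(): Int = 0
  override def add(value: String, accumulator: Int): Int = accumulator + 1
  override def getResult(accumulator: Int): Int = accumulator
  override def merge(a: Int, b: Int): Int = a + b
}).print("aggregate")
env.execute()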
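The four AggregateFunction methods form a small lifecycle. The runtime creates an empty accumulator, adds each element, merges accumulators when needed (for example, when session windows merge), and finally turns the accumulator into a result. The sketch below mirrors that lifecycle in plain Scala with a hypothetical `AggregateLike` trait rather than Flink's interface. It computes the average line length with a (sum, count) accumulator.

```scala
// Hypothetical trait mirroring the four AggregateFunction methods (not the Flink interface).
trait AggregateLike[IN, ACC, OUT] {
  def createAccumulator(): ACC
  def add(value: IN, acc: ACC): ACC
  def getResult(acc: ACC): OUT
  def merge(a: ACC, b: ACC): ACC
}

// Average string length; the accumulator is (sum of lengths, element count)
object AverageLength extends AggregateLike[String, (Long, Long), Double] {
  def createAccumulator(): (Long, Long) = (0L, 0L)
  def add(value: String, acc: (Long, Long)): (Long, Long) = (acc._1 + value.length, acc._2 + 1)
  def getResult(acc: (Long, Long)): Double = if (acc._2 == 0) 0.0 else acc._1.toDouble / acc._2
  def merge(a: (Long, Long), b: (Long, Long)): (Long, Long) = (a._1 + b._1, a._2 + b._2)
}

object AggregateDemo {
  // Run one window's elements through create -> add* -> getResult
  def run[IN, ACC, OUT](f: AggregateLike[IN, ACC, OUT], window: Seq[IN]): OUT =
    f.getResult(window.foldLeft(f.createAccumulator())((acc, v) => f.add(v, acc)))

  def main(args: Array[String]): Unit =
    println(run(AverageLength, Seq("ab", "abcd")))
}
```

Merging two partial accumulators, one for "ab" and one for "abcd", yields the same average (3.0) as folding both strings into a single accumulator. That property is what allows windows to be merged.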

apply
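import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
val d1 = env.socketTextStream("localhost", 9999, '\n')
  .keyBy(_.length).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
// WindowFunction type parameters => IN, OUT, KEY, W <: Window
d1.apply(new WindowFunction[String, (Int, Int), Int, TimeWindow] {
  override def apply(key: Int, window: TimeWindow, input: Iterable[String], out: Collector[(Int, Int)]): Unit = {
    // Full-window function: every element of the window is available in `input`
    out.collect((key, input.size))
  }
}).print("apply")
env.execute()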

process
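import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
val d1 = env.socketTextStream("localhost", 9999, '\n')
  .keyBy(_.length).window(TumblingProcessingTimeWindows.of(Time.seconds(3)))
// ProcessWindowFunction type parameters => IN, OUT, KEY, W <: Window
d1.process(new ProcessWindowFunction[String, (String, Int), Int, TimeWindow] {
  override def process(key: Int, context: Context, elements: Iterable[String], out: Collector[(String, Int)]): Unit = {
    // Unlike apply, process also receives a Context (window, time, state, side outputs).
    // java.time is used instead of Joda-Time, so no extra dependency is needed.
    val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
    out.collect((now, elements.size))
  }
}).print("process")
env.execute()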

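process with countWindow
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow
import org.apache.flink.util.Collector

val env = StreamExecutionEnvironment.getExecutionEnvironment
val d1 = env.socketTextStream("localhost", 9999, '\n')
  .keyBy(_.length).countWindow(3, 3) // size 3, slide 3: fires every 3 elements per key
// Count windows use GlobalWindow, so W = GlobalWindow
d1.process(new ProcessWindowFunction[String, (String, Int, Int), Int, GlobalWindow] {
  override def process(key: Int, context: Context, elements: Iterable[String], out: Collector[(String, Int, Int)]): Unit = {
    val now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"))
    out.collect((now, key, elements.size))
  }
}).print("process")
env.execute()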
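countWindow(size, slide) evaluates, for each key, the most recent `size` elements each time `slide` new elements have arrived. With size equal to slide, as in countWindow(3, 3), it behaves like a tumbling count window. To our understanding, Flink builds it from GlobalWindows with a count trigger and a count evictor. The pure-Scala sketch below (hypothetical names) models only the firing pattern for a single key, not that machinery.

```scala
// Pure-Scala sketch of countWindow(size, slide) firing for a single key (not the Flink API).
object CountWindowSketch {
  // After every `slide` elements, emit the most recent `size` elements seen so far
  def firings[A](elements: Seq[A], size: Int, slide: Int): List[Seq[A]] =
    (slide to elements.length by slide)
      .map(seen => elements.take(seen).takeRight(size))
      .toList

  def main(args: Array[String]): Unit =
    firings((1 to 7).toList, 3, 2).foreach(println)
}
```

With size 3 and slide 2 over elements 1 to 7, the window fires after the 2nd, 4th and 6th elements, emitting (1, 2), (2, 3, 4) and (4, 5, 6). The 7th element waits for the next firing.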

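Other optional APIs
- trigger: decides when a window is evaluated (fired)
- evictor: removes elements from a window before and/or after the window function is applied
- allowedLateness: keeps a window open for late data for an extra period of time
- sideOutputLateData: sends late data to a side output stream
- getSideOutput: retrieves the side output stream from the window result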
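For event-time windows, allowedLateness and sideOutputLateData together decide what happens to an element that arrives after its window has ended. The sketch below is a simplified pure-Scala model of that decision, assuming the default event-time trigger and a window [start, end) whose last timestamp is end - 1. The names `LatenessSketch`, `OnTime`, `LateFiring` and `DroppedOrSideOutput` are hypothetical.

```scala
// Simplified pure-Scala model of late-data handling for one event-time window (not the Flink API).
object LatenessSketch {
  sealed trait Route
  case object OnTime extends Route              // the window has not fired yet
  case object LateFiring extends Route          // fired already, but within allowed lateness
  case object DroppedOrSideOutput extends Route // too late: dropped, or sent to the late-data side output

  // windowEnd is exclusive, so the window's last timestamp is windowEnd - 1
  def route(windowEnd: Long, allowedLateness: Long, watermark: Long): Route = {
    val maxTimestamp = windowEnd - 1
    if (watermark < maxTimestamp) OnTime
    else if (watermark < maxTimestamp + allowedLateness) LateFiring
    else DroppedOrSideOutput
  }

  def main(args: Array[String]): Unit =
    Seq(5000L, 9999L, 11999L).foreach(wm => println(s"watermark $wm -> ${route(10000L, 2000L, wm)}"))
}
```

Consider a window ending at 10000 with 2000 ms of allowed lateness. Once the watermark reaches 9999 the window has fired, but a late element is still accepted and causes a late firing. From watermark 11999 onward, the element is dropped, or routed to the side output if sideOutputLateData is configured.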
Window API overview
