# Windowing Support in Core Storm
Storm core 支持处理落在窗口内的一组元组。窗口操作指定了一下两个参数
```
1.窗口的长度 - 窗口的长度或持续时间
2.滑动间隔 - 窗口滑动的时间间隔
```
## 滑动窗口
元组被分组在窗口和每个滑动间隔窗口中。 一个元组可以属于多个窗口。
例如一个持续时间长度为 10 秒和滑动间隔 5 秒的滑动窗口。 `........| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |... -5 0 5 10 15 -> time |<------- w1 -->| |<---------- w2 ----->| |<-------------- w3 ---->|` 窗口每5秒进行一次评估,第一个窗口中的某些元组与第二个窗口重叠。
注意:窗口第一次滑动在 t = 5s,并且将包含在前 5 秒钟内收到的事件。
## Tumbling Window
元组根据时间或数量被分组在一个窗口中。任何元组只属于其中一个窗口。
例如一个持续时间长度为 5s 的 tumbling window。
```
| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0 5 10 15 -> time
w1 w2 w3
```
窗口每五秒进行一次评估,并且没有窗口重叠。
Storm 支持指定窗口长度和滑动间隔作为元组数的计数或持续时间。 bolt 接口 `IWindowedBolt` 需要由窗口支持的bolts来实现。
```
public interface IWindowedBolt extends IComponent {
void prepare(Map stormConf, TopologyContext context, OutputCollector collector);
/**
* Process tuples falling within the window and optionally emit
* new tuples based on the tuples in the input window.
*/
void execute(TupleWindow inputWindow);
void cleanup();
}
```
每次窗口激活时,都会调用 `execute` 方法。TupleWindow 的参数允许访问窗口中的当前元组,过期的元组以及自上次窗口计算后添加的新元组,这对于高效的窗口计算将是有用的。
需要窗口支持的 Bolts 一般会扩展为 `BaseWindowedBolt`,它有用来指定窗口长度和滑动间隔的apis.
例如
```
public class SlidingWindowBolt extends BaseWindowedBolt {
private OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(TupleWindow inputWindow) {
for(Tuple tuple: inputWindow.get()) {
// do the windowing computation
...
}
// emit the results
collector.emit(new Values(computedValue));
}
}
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 1);
builder.setBolt("slidingwindowbolt",
new SlidingWindowBolt().withWindow(new Count(30), new Count(10)),
1).shuffleGrouping("spout");
Config conf = new Config();
conf.setDebug(true);
conf.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
```
支持以下窗口配置
```
withWindow(Count windowLength, Count slidingInterval)
基于元组计数的滑动窗口,在多个tuples进行 `slidingInterval`滑动之后。
withWindow(Count windowLength)
基于元组计数的窗口,它与每个传入的元组一起滑动。
withWindow(Count windowLength, Duration slidingInterval)
基于元组计数的滑动窗口,在`slidingInterval`持续时间滑动之后。
withWindow(Duration windowLength, Duration slidingInterval)
基于持续时间的滑动窗口,在`slidingInterval`持续时间滑动之后。
withWindow(Duration windowLength)
基于持续时间的窗口,它与每个传入的元组一起滑动。
withWindow(Duration windowLength, Count slidingInterval)
基于时间的滑动窗口配置在`slidingInterval`多个元组之后滑动。
withTumblingWindow(BaseWindowedBolt.Count count)
计数的tumbling窗口在指定的元组数之后tumbles.
withTumblingWindow(BaseWindowedBolt.Duration duration)
基于持续时间的tumbling窗口在指定的持续时间后tumbles。
```
## 元组时间戳和乱序元组
默认情况下,在窗口中追踪的时间戳是 bolt 处理元组的时间。窗口计算是根据正在处理的时间戳进行的。 Storm 支持基于源生成的时间戳的追踪窗口。
```
/**
* Specify a field in the tuple that represents the timestamp as a long value. If this
* field is not present in the incoming tuple, an {@link IllegalArgumentException} will be thrown.
*
* @param fieldName the name of the field that contains the timestamp
*/
public BaseWindowedBolt withTimestampField(String fieldName)
```
上述`fieldName`的值将从传入的元组中查找并考虑进行窗口计算。如果该元组中不存在该字段,将抛出异常。或者,[TimestampExtractor](../storm-core/src/jvm/org/apache/storm/windowing/TimestampExtractor.java)可以用于从元组导出时间戳值(例如,从元组中的嵌套字段提取时间戳)。
```
/**
* Specify the timestamp extractor implementation.
*
* @param timestampExtractor the {@link TimestampExtractor} implementation
*/
public BaseWindowedBolt withTimestampExtractor(TimestampExtractor timestampExtractor)
```
与时间戳字段 name/extractor 一起,可以指定一个时间滞后参数,它指示具有无序时间戳的元组的最大时间限制。
```
/**
* Specify the maximum time lag of the tuple timestamp in milliseconds. It means that the tuple timestamps
* cannot be out of order by more than this amount.
*
* @param duration the max lag duration
*/
public BaseWindowedBolt withLag(Duration duration)
```
例如:如果滞后是5秒,并且元组`t1`到达时间戳为`06:00:05`没有元组可能会在早于`06:00:00`的元组时间戳到达。 如果一个元组在`t1`之后到达时间戳`05:59:59`,并且窗口已经移动过`t1`了,它将被视为迟到的元组。 默认情况下不处理迟到的元组,只需在INFO级别打印到工作日志文件。 ```java /** * Specify a stream id on which late tuples are going to be emitted. They are going to be accessible via the * {@link org.apache.storm.topology.WindowedBoltExecutor#LATE_TUPLE_FIELD} field. * It must be defined on a per-component basis, and in conjunction with the * {@link BaseWindowedBolt#withTimestampField}, otherwise {@link IllegalArgumentException} will be thrown. * * @param streamId the name of the stream used to emit late tuples on */ public BaseWindowedBolt withLateTupleStream(String streamId)
```
通过指定上述 `streamId` 来更改此行为。 在这种情况下,迟到的元组将在指定的流中发出并可通过`WindowedBoltExecutor.LATE_TUPLE_FIELD` 访问
字段。
### Watermarks
为了处理具有时间戳字段的元组,storm 根据传入的元组时间戳内部计算 watermarks。Watermark 是所有输入流中最新的元组时间戳(减去滞后)的最小值。在较高级别,watermark类似于 Flink 和 Google 的 MillWheel 用于跟踪基于事件的时间戳的概念。
定期的(默认每秒),watermark时间戳被发出,如果基于元组的时间戳被使用,这被认为是窗口计算的 clock tick(时钟勾)。可以用下面的api来改变发出 watermarks 的时间间隔。
```java
/**
* Specify the watermark event generation interval. For tuple based timestamps, watermark events
* are used to track the progress of time
*
* @param interval the interval at which watermark events are generated
*/
public BaseWindowedBolt withWatermarkInterval(Duration interval)
```
当接收到watermark时,将对所有时间戳记进行评估。
例如,考虑具有以下窗口参数基于元组的时间戳处理,
`Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s`
```
|-----|-----|-----|-----|-----|-----|-----|
0 10 20 30 40 50 60 70
```
当前 ts = `09:00:00`
在`9:00:00`到`9:00:01`收到的元组`e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)`
在time t = `09:00:01`, watermark w1 = `6:00:31`被发出,没有早于`6:00:31`的元组可以到达。
三个窗口将被评估。通过采取最早的事件时间戳(06:00:03)并基于滑动间隔(10s)计算上限来计算第一个窗口结束在 ts(06:00:10)。
1. `5:59:50 - 06:00:10` 有元组 e1, e2, e3
2. `6:00:00 - 06:00:20` 有元组 e1, e2, e3, e4
3. `6:00:10 - 06:00:30` 有元组 e4, e5
e6未被评估,因为 watermark 时间戳`6:00:31`比元组 ts`6:00:36`更旧。
在`9:00:01`和 `9:00:02`之间,接收到的元组`e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)`
在 time t = `09:00:02`另一个 watermark w2 = `08:00:34`被发出,没有元组比`8:00:34`更早到达。
三个窗口将被评估
1. `6:00:20 - 06:00:40` 有元组 e5, e6 (从早期批次)
2. `6:00:30 - 06:00:50` 有元组 e6 (从早期批次)
3. `8:00:10 - 08:00:30` 有元组 e7, e8, e9
e10 不被评估,因为元组 ts `8:00:39`超出了watermark time `8:00:34`.
窗口计算考虑时间间隔,并基于元组时间戳计算窗口。
## Guarantees
storm core的窗口功能目前提供一致性保证。`执行(TupleWindow inputWindow)`方法发出的值将自动锁定到 inputWindow 中的所有元组。预计下游 bolts 将确认接收的元组(即从窗口 bolt 发出的元组)以完成元组树。如果不是,元组将重播,并且重新评估窗口计算。
窗口中的元组会在过期后被自动确认,即当它们在`windowLength + slidingInterval`之后从窗口中滑落出来。请注意,配置`topology.message.timeout.secs`应该远远超过基于时间窗口的`windowLength + slidingInterval`; 否则元组将超时并重播,并可能导致重复的评估。对于基于计数的窗口,应该调整配置,使得在超时时间段内可以接收到`windowLength + slidingInterval`元组。
## 拓扑示例
示例拓扑`滑动窗口拓扑`显示了如何使用apis来计算滑动窗口总和和滚动窗口平均值。
- Storm 基础
- 概念
- Scheduler(调度器)
- Configuration
- Guaranteeing Message Processing
- 守护进程容错
- 命令行客户端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 综述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 语言参考
- Storm SQL 内部实现
- Flux
- Storm 安装和部署
- 设置Storm集群
- 本地模式
- 疑难解答
- 在生产集群上运行 Topology
- Maven
- 安全地运行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 资源感知调度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各种内部行为的 Metrics
- Windows 用户指南
- Storm 中级
- 序列化
- 常见 Topology 模式
- Clojure DSL
- 使用没有jvm的语言编辑storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 状态管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 调试
- 动态日志级别设置
- Storm Logs
- 动态员工分析
- 拓扑事件检查器
- Storm 与外部系统, 以及其它库的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高级
- 针对 Storm 定义一个不是 JVM 的 DSL
- 多语言协议
- Storm 内部实现
- 翻译进度