# Joining Streams in Storm Core
Storm 支持通过 JoinBolt 来 join 多个 data streams 变成一个 stream. `JoinBolt`是一个 Windowed bolt。`JoinBolt` 会等待配置的窗口时间来匹配被join 的streams的tuples。这有助于通过窗口边界生成streams.
`JoinBolt` 每个进来的 data streams 必须基于一个字段进行 Field Group。stream只能使用被 FieldsGrouped的字段 join 其他stream。
## Performing Joins
考虑下面的 SQL join,设计四张表:
```
select userId, key4, key2, key3
from table1
inner join table2 on table2.userId = table1.key1
inner join table3 on table3.key3 = table2.userId
left join table4 on table4.key4 = table3.key3
```
相同的可以使用`JoinBolt`,join四个spouts,生成想要的tuples:
```
JoinBolt jbolt = new JoinBolt("spout1", "key1") // from spout1
.join ("spout2", "userId", "spout1") // inner join spout2 on spout2.userId = spout1.key1
.join ("spout3", "key3", "spout2") // inner join spout3 on spout3.key3 = spout2.userId
.leftJoin ("spout4", "key4", "spout3") // left join spout4 on spout4.key4 = spout3.key3
.select ("userId, key4, key2, spout3:key3") // chose output fields
.withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
topoBuilder.setBolt("joiner", jbolt, 1)
.fieldsGrouping("spout1", new Fields("key1") )
.fieldsGrouping("spout2", new Fields("userId") )
.fieldsGrouping("spout3", new Fields("key3") )
.fieldsGrouping("spout4", new Fields("key4") );
```
bolt 构造器需要两个参数.第一个参数介绍了第一个stream来自于 `spout1`,并指定了通过key1来和其他 streams 连接.组件的名称必须根据直接连接 Join bolt的 `spout` 或者bolt来设置.这里,来自于spout1的数据必须根据 `key1` 来 field group。同样的,调用 `leftJoin()` 和 `join()` 方法的时候,也会通过这个字段来join.根据上面的例子,FieldGrouping 要求也适用于其他spout 的streams。第三个参数表示streams要和哪个spout的streams连接.
`select()` 方法用来指定 output fields。`select` 参数是逗号分隔的字段列表。单个字段可以通过 stream名称作为前缀,来区别不同streams中相同的字段,像这样:`.select("spout3:key3, spout4:key3")`.嵌套的tuple 类型是支持的.例如,`outer.inner.innermost` 就是一个字段嵌套三层深度,`outer` 和 `inner` 是 `Map` 的类型.
join 参数中的字段不允许用 stream 名称作为前缀,但是支持嵌套字段.
上面调用 `withTumblingWindow()`方法,将join 窗口配置成10分钟的翻滚窗口.由于 `JoinBolt` 是一个窗口 spout,我们还可以使用 `withWindow` 方法将其配置为滑动窗口(参考下面的提示部分).
## Stream names and Join order
* Streams name 在引用之前必须声明,在构造函数和 join方法的第一个参数都需要 Streams name,join方法的第三个参数会用到Stream name.像下面这样引用stream name是不允许的:
```
new JoinBolt( "spout1", "key1")
.join ( "spout2", "userId", "spout3") //not allowed. spout3 not yet introduced
.join ( "spout3", "key3", "spout1")
```
* 在内部,join将按照用户所表示的顺序执行。
## Joining based on Stream names
为了简单起见,Storm topology(拓扑)经常使用 `default` stream 。拓扑也可以使用命名的stream 而不是`default` stream。为了支持这种 topology(拓扑),可以通过构造函数的第一个参数将 `JoinBolt` 配置为使用 stream name 而不是源组件(spout / bolt)名称:
```
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
.join("stream2", "key2")
...
```
第一个参数 `JoinBolt.Selector.STREAM` 通知 bolt `stream1/2/3/4` 引用 named stream (而不是上游 spouts/bolts的名称)。
以下示例从四个 spouts 连接两个命名流:
```
new JoinBolt(JoinBolt.Selector.STREAM, "stream1", "key1")
.join ("stream2", "userId", "stream1" )
.select ("userId, key1, key2")
.withTumblingWindow( new Duration(10, TimeUnit.MINUTES) ) ;
topoBuilder.setBolt("joiner", jbolt, 1)
.fieldsGrouping("bolt1", "stream1", new Fields("key1") )
.fieldsGrouping("bolt2", "stream1", new Fields("key1") )
.fieldsGrouping("bolt3", "stream2", new Fields("userId") )
.fieldsGrouping("bolt4", "stream1", new Fields("key1") );
```
在上述示例中,例如,spout1也可能发送其他 stream。但是连接 bolt 只是从不同的 bolts 订阅了`stream1`&stream2。来自`bolt1`,`bolt2`和`bolt4`的`stream1`被视为单个 stream,并且与`bolt3`相连接。
## Limitations:
1.当前值支持 INNER 和LEFT join.
1. 不同于SQL,它允许通过不同的 keys 将相同的表和不同的表连接,这里必须在stream 上使用相同的字段. Fields Grouping 保证tuples被正确连接到 JoinBolt的实例.因此,FieldsGrouping字段必须与 join 字段相同,以获得正确的结果.要在多个字段上执行 join,可以将这些字段组着成一个字段,然后发送到 Join Bolt。
## Tips:
1. Join 可以是CPU和内存密集型.当前窗口中积累的数据越大(与窗口长度成正比),join所需要的时间就越长。滑动间隔很短(例如几秒钟)会触发频繁的连接.因此,如果使用大的窗口长度或者小的滑动间隔,则性能可能受损.
2. 使用滑动窗口时,跨窗口重复 join 记录。这是因为使用滑动窗口时,tuples 在多个窗口中继续存在。
3. 如果启用了消息超时,请确保超时设置(topology.message.timeout.secs)足够大以舒适地适应窗口大小,以及其他 spouts 和 bolts 的附加处理。
4. 在最坏的情况下,连接一个具有M和N个元素的两个 streams 的窗口,可以产生每个输出元组的MxN元素,每个输出 tuple 从每个输入流锚定到一个 tuple 。这可能意味着来自JoinBolt的大量输出元组和甚至更多的ACK用于下游 spout 发出。这可能会对消息传递系统造成重大压力,如果不小心,则会大大减缓 topology(拓扑)结构。要管理消息传递子系统上的负载,建议:
* 增加 worker 堆(topology.worker.max.heap.size.mb)。
* 如果您的 topology(拓扑)不需要ACK,则禁用ACKers(topology.acker.executors = 0)。
* 禁用事件记录器(topology.eventlogger.executors = 0)。
* 打开拓扑调试(topology.debug = false)。
* 将topology.max.spout设置为一个足够大的值,以容纳估计的全窗口值的 tuple 加上一些更多的余量。这有助于减少在消息传递子系统遇到过载时发出过多元组的端口的可能性。当它的值设置为null时,可能会发生这种情况。
* 最后,将窗口大小保持在解决手头问题所需的最小值。
- Storm 基础
- 概念
- Scheduler(调度器)
- Configuration
- Guaranteeing Message Processing
- 守护进程容错
- 命令行客户端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 综述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 语言参考
- Storm SQL 内部实现
- Flux
- Storm 安装和部署
- 设置Storm集群
- 本地模式
- 疑难解答
- 在生产集群上运行 Topology
- Maven
- 安全地运行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 资源感知调度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各种内部行为的 Metrics
- Windows 用户指南
- Storm 中级
- 序列化
- 常见 Topology 模式
- Clojure DSL
- 使用没有jvm的语言编辑storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 状态管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 调试
- 动态日志级别设置
- Storm Logs
- 动态员工分析
- 拓扑事件检查器
- Storm 与外部系统, 以及其它库的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高级
- 针对 Storm 定义一个不是 JVM 的 DSL
- 多语言协议
- Storm 内部实现
- 翻译进度