# 概念
本页列出了Storm 的主要概念, 以及可以获取到更多信息的资源链接, 概念如下:
1. Topologies(拓扑)
2. Streams(流)
3. Spouts
4. Bolts
5. Stream groupings(流分组)
6. Reliability(可靠性)
7. Tasks
8. Workers
### Topologies(拓扑)
实时应用程序的逻辑被封装在 Storm topology(拓扑)中. Storm topology(拓扑)类似于 MapReduce 作业. 两者之间关键的区别是 MapReduce 作业最终会完成, 而 topology(拓扑)任务会永远运行(除非 kill 掉它). 一个拓扑是 Spout 和 Bolt 通过 stream groupings 连接起来的有向无环图.这些概念会在下面的段落中具体描述.
**相关资料:**
* [TopologyBuilder](javadocs/org/apache/storm/topology/TopologyBuilder.html): Java中使用这个类来构建 topology(拓扑)
* [如何在生产集群上运行 topologies(拓扑)](Running-topologies-on-a-production-cluster.html)
* [如何使用 local 模式](Local-mode.html): 学习如何用本地模式开发和测试 topology(拓扑)
### Streams(流)
stream 是 Storm 中的核心概念.一个 stream 是一个无界的、以分布式方式并行创建和处理的 Tuple 序列. stream 以一个 schema 来定义, 这个 schema 用来命名 stream tuple(元组)中的字段.默认情况下 Tuple 可以包含 integers, longs, shorts, bytes, strings, doubles, floats, booleans, and byte arrays 等数据类型.你也可以定义自己的 serializers, 以至于可以在 Tuple 中使用自定义的类型.
每一个流在声明的时候会赋予一个 ID. 由于只包含一个 stream 的 Spout 和 Bolt 比较常见, [OutputFieldsDeclarer](javadocs/org/apache/storm/topology/OutputFieldsDeclarer.html) 有更方便的方法可以定义一个单一的 stream 而不用指定ID. 这个 stream 被赋予一个默认的 ID, "default".
**相关资料:**
* [Tuple](javadocs/org/apache/storm/tuple/Tuple.html): stream 由一系列连续的 Tuple 组成
* [OutputFieldsDeclarer](javadocs/org/apache/storm/topology/OutputFieldsDeclarer.html): 用于声明 streams 和它的 schemas.
* [Serialization](Serialization.html): Storm Tuple 的动态类型,和自定义 serializations 的相关信息
### Spouts
Spout 是一个 topology(拓扑)中 streams 的源头. 通常 Spout 会从外部数据源读取 Tuple,然后把他们发送到拓扑中(如 Kestel 队列, 或者 Twitter API). Spout 可以是 **可靠的** 或 **不可靠的**. 可靠的 Spout 在 Storm 处理失败的时候能够重放 Tuple, 不可靠的 Spout 一旦把一个 Tuple 发送出去就撒手不管了.
Spout 可以发送多个流. 可以使用 [OutputFieldsDeclarer](javadocs/org/apache/storm/topology/OutputFieldsDeclarer.html) 的 declareStream 方法定义多个流, 在 [SpoutOutputCollector](javadocs/org/apache/storm/spout/SpoutOutputCollector.html) 对象的 emit 方法中指定要发送到的 stream .
Spout 中的最主要的方法是 `nextTuple`. `nextTuple` 要么向 topology(拓扑)中发送一个新的 Tuple, 要么在没有 Tuple 需要发送的情况下直接返回. 对于任何 Spout 实现, `nextTuple` 方法都必须非阻塞的, 因为 Storm 在一个线程中调用所有的 Spout 方法.
Spout 的另外几个重要的方法是 `ack` 和 `fail`. 这些方法在 Storm 检测到 Spout 发送出去的 Tuple 被成功处理或者处理失败的时候调用. `ack`和`fail`只会在可靠的 Spout 中调用. 更多相关信息, 请参见 [the Javadoc](javadocs/org/apache/storm/spout/ISpout.html).
**相关资料:**
* [IRichSpout](javadocs/org/apache/storm/topology/IRichSpout.html): 创建 Spout 时必须实现的接口
* [Guaranteeing message processing](Guaranteeing-message-processing.html)
### Bolts
拓扑中所有的业务处理都在 Bolts 中完成. Bolt 可以做很多事情,过滤, 函数, 聚合, 关联, 与数据库交互等.
Bolt 可以做简单 stream 转换. 复杂的 stream 转换一般需要多个步骤,因此也就要多个 Bolt 协同工作. 如, 转换一个 tweets stream 为一个 trending images stream 需要两个步骤:一个 Bolt 做每个图片被收藏 的滚动计数,同时一个或者多个 Bolt 输出被收藏 Top X 的图片 (你可以使用更具弹性的方式处理这个 stream 转换, 用3个 Bolt 而不是先前的2个 Bolt ).
Bolt 可以发送多个 stream. 可以使用 [OutputFieldsDeclarer](javadocs/org/apache/storm/topology/OutputFieldsDeclarer.html) 的 `declareStream` 方法定义多个 streams, 并且在使用 [OutputCollector](javadocs/org/apache/storm/task/OutputCollector.html) `emit` 方法的时候指定要发送的 stream.
当你声明一个 Bolt 的 input stream,你总是会订阅其他组件特定的 stream .如果你想要订阅其他组件所有的 streams,你必须一个个的订阅. [InputDeclarer](javadocs/org/apache/storm/topology/InputDeclarer.html) 有语法可以订阅默认 stream-id 的 stream,代码:`declarer.shuffleGrouping ("1")`,意思是: 订阅组件 “1” 的默认 stream, 等价于 `declarer.shuffleGrouping("1", DEFAULT_STREAM_ID)`.
Bolt 中最主要的方法是 `execute` 方法, 当有一个新 Tuple 输入的时候会进入这个方法. Bolt 使用[OutputCollector](javadocs/org/apache/storm/task/OutputCollector.html) 对象发送新的 Tuple. Bolt 必须在每一个 Tuple 处理完以后调用 `OutputCollector` 上的 `ack` 方法, Storm 就会知道 tuple 什么时候完成 (最终可以确定 调用源 Spout Tuple 是没有问题的). 当处理一个输入的 Tuple:会基于这个 Tuple 产生零个或者多个 Tuple 发送出去,当所有的tuple 完成后,会调用 acking. Storm 提供了 [IBasicBolt](javadocs/org/apache/storm/topology/IBasicBolt.html) 接口会自动执行 acking .
最好在 Bolt 中启动新的线程异步处理 tuples. [OutputCollector](javadocs/org/apache/storm/task/OutputCollector.html) 是线程安全的, 并且可以在任何时刻调用.
**相关资料:**
* [IRichBolt](javadocs/org/apache/storm/topology/IRichBolt.html): Bolts 的通用接口
* [IBasicBolt](javadocs/org/apache/storm/topology/IBasicBolt.html): 一个可以使用过滤或者一些简单功能的 Bolt 的接口
* [OutputCollector](javadocs/org/apache/storm/task/OutputCollector.html): Bolts 使用这个类的实例发送 Tuple 到他们的输出流
* [Guaranteeing message processing](Guaranteeing-message-processing.html)
### Stream groupings
topology(拓扑)定义中有一部分是为每一个 bolt 指定输入的 streams . stream grouping 定义了stream 如何在 Bolts tasks 之间分区.
Storm 中一共有8个内置的 Stream Grouping. 可以通过实现 [CustomStreamGrouping](javadocs/org/apache/storm/grouping/CustomStreamGrouping.html) 接口来自定义 Stream groupings.
1. **Shuffle grouping**: Tuple 随机的分发到 Bolt Task, 每个 Bolt 获取到等量的 Tuple.
2. **Fields grouping**: streams 通过 grouping 指定的字段来分区. 例如流通过 "user-id" 字段分区, 具有相同 "user-id" 的 Tuple 会发送到同一个task, 不同 "user-id" 的 Tuple 可能会流入到不同的 tasks.
3. **Partial Key grouping**: stream 通过 grouping 中指定的 field 来分组, 与 Fields Grouping 相似. 但是对于 2 个下游的 Bolt 来说是负载均衡的, 可以在输入数据不平均的情况下提供更好的优化. 以下地址 [This paper](https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf) 更好的解释了它是如何工作的及它的优势.
4. **All grouping**: stream 在所有的 Bolt Tasks之间复制. 这个 Grouping 小心使用.
5. **Global grouping**: 整个 stream 会进入 Bolt 其中一个任务.特别指出, 它会进入 id 最小的 task.
6. **None grouping**: 这个 grouping , 你不需要关心 stream 如何分组. 当前, None grouping 和 Shuffle grouping 等价. 同时, Storm 将使用 None grouping 的 bolts 和上游订阅的 bolt和spout 运行在同一个线程 (when possible).
7. **Direct grouping**: 这是一种特殊的 grouping 方式. stream 用这个方式 group 意味着由这个 Tuple 的 **生产者** 来决定哪个 **消费者** 来接收它. Direct grouping 只能被用于 direct streams . 被发射到 direct stream 的 tuple 必须使用 [emitDirect](javadocs/org/apache/storm/task/OutputCollector.html#emitDirect)(int, int, java.util.List) 方法来发送. Bolt 可以使用 [TopologyContext](javadocs/org/apache/storm/task/TopologyContext.html) 或者通过保持对[OutputCollector](javadocs/org/apache/storm/task/OutputCollector.html)(返回 Tuple 被发送到的目标 task id) 中的 `emit` 方法输出的跟踪,获取到它的所有消费者的 ID .
8. **Local or shuffle grouping**: 如果目标 Bolt 有多个 task 和 streams源 在同一个 woker 进程中, Tuple 只会 shuffle 到相同 worker 的任务.否则, 就和 shuffle goruping 一样.
**相关资料:**
* [TopologyBuilder](javadocs/org/apache/storm/topology/TopologyBuilder.html): 使用这个类来定义一个拓扑
* [InputDeclarer](javadocs/org/apache/storm/topology/InputDeclarer.html): 当在 `TopologyBuilder`上调用 `setBolt` 方法的时候返回这个对象, 用于声明一个 Bolt 的 input streams 以及这些 streams 如何分组. ### 可靠性
Storm 保障每一个 Spout 的 Tuple 都会被 topology(拓扑)处理.通过跟踪 tuples tree,每个 spout tuple 都会触发 tree , 确保 tuples tree 成功完成. 每一个拓扑都有一个关联的“message timeout”. 如果 Storm 检测到一个 Spout Tuple 没有在这个超时时间内被处理完成, 则判定这个 Tuple 失败, 稍后重新执行.
要利用这个可靠性的功能, 当在 Tuple tree 中创建一个新的 edge ,必须告诉Storm,并且在一个单独的 tuple 完成时也要通知 Storm. 以上操作在 Bolt 用于发送 Tuple 的 [OutputCollector](javadocs/org/apache/storm/task/OutputCollector.html) 对象中完成这个操作. Anchoring(锚点)在 `emit` 方法中完成, 使用 `ack` 方法来声明你已经成功完成了一个 Tuple 的处理.
更详细的说明, 请参阅 [保证消息容错](Guaranteeing-message-processing.html).
### Tasks
每个 Spout 或者 Bolt 都以跨集群的多个 Task 方式执行. 每个 Task 对应一个 execution 的线程, stream groupings 定义如何从一个 Task 发送 Tuple 到另一个 Task. 可以在 [TopologyBuilder](http://storm.apache.org/releases/1.1.1/javadocs/org/apache/storm/topology/TopologyBuilder.html) 的`setSpout` 和 `setBolt` 方法中为每个 Spout 或者 Bolt 设置并行度,.
### Workers
Topologies (拓扑)在一个或者跨多个 worker 执行. 每个 Worker 进程是一个物理的 JVM, 执行 topology(拓扑) Tasks 中的一个子集. 例如, 如果一个拓扑的并行度是 300, 共有 50 个 Worker 在运行, 每个 Worker 会分配到 6 个 Task(作为 Worker 中的线程). Storm 会尽量把所有 Task 均匀的分配到所有的 Worker 上.
**相关资料:**
* [Config.TOPOLOGY_WORKERS](javadocs/org/apache/storm/Config.html#TOPOLOGY_WORKERS): 这个配置项设置用于运行 topology(拓扑)的 worker 数量.
- Storm 基础
- 概念
- Scheduler(调度器)
- Configuration
- Guaranteeing Message Processing
- 守护进程容错
- 命令行客户端
- Storm UI REST API
- 理解 Storm Topology 的 Parallelism(并行度)
- FAQ
- Layers on Top of Storm
- Storm Trident
- Trident 教程
- Trident API 综述
- Trident State
- Trident Spouts
- Trident RAS API
- Storm SQL
- Storm SQL 集成
- Storm SQL 示例
- Storm SQL 语言参考
- Storm SQL 内部实现
- Flux
- Storm 安装和部署
- 设置Storm集群
- 本地模式
- 疑难解答
- 在生产集群上运行 Topology
- Maven
- 安全地运行 Apache Storm
- CGroup Enforcement
- Pacemaker
- 资源感知调度器 (Resource Aware Scheduler)
- 用于分析 Storm 的各种内部行为的 Metrics
- Windows 用户指南
- Storm 中级
- 序列化
- 常见 Topology 模式
- Clojure DSL
- 使用没有jvm的语言编辑storm
- Distributed RPC
- Transactional Topologies
- Hooks
- Storm Metrics
- Storm 状态管理
- Windowing Support in Core Storm
- Joining Streams in Storm Core
- Storm Distributed Cache API
- Storm 调试
- 动态日志级别设置
- Storm Logs
- 动态员工分析
- 拓扑事件检查器
- Storm 与外部系统, 以及其它库的集成
- Storm Kafka Integration
- Storm Kafka 集成(0.10.x+)
- Storm HBase Integration
- Storm HDFS Integration
- Storm Hive 集成
- Storm Solr 集成
- Storm Cassandra 集成
- Storm JDBC 集成
- Storm JMS 集成
- Storm Redis 集成
- Azue Event Hubs 集成
- Storm Elasticsearch 集成
- Storm MQTT(Message Queuing Telemetry Transport, 消息队列遥测传输) 集成
- Storm MongoDB 集成
- Storm OpenTSDB 集成
- Storm Kinesis 集成
- Storm Druid 集成
- Storm and Kestrel
- Container, Resource Management System Integration
- Storm 高级
- 针对 Storm 定义一个不是 JVM 的 DSL
- 多语言协议
- Storm 内部实现
- 翻译进度