🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
[TOC] # 并行度 当storm程序中某一个环节的计算能力跟不上的时候,需要增加多个线程同时执行,由此会产生一个概念叫并行度 **Bolt的个数** ![](https://box.kancloud.cn/ce5cb0738a22af9b09c9927c3421061e_1118x112.png) **work的个数(也就是几个jvm)** ~~~ config.setNumWorkers(2); ~~~ 如果设置了work和Bolt的个数,Bolt会均匀的分布在work上 ## 并行度和work数设置现象 ![](https://box.kancloud.cn/fe6d65621fddbf6aed4b36af50b7b1d0_1612x592.png) ![](https://box.kancloud.cn/d0f2d75388ca2b4e81163dc9bf716b09_1767x183.png) 可以点进去看executors是线程 ![](https://box.kancloud.cn/0742e50459580a44312060f9e4179929_538x380.png) ![](https://box.kancloud.cn/917133fcecb66e42f32377901c1f0e2c_130x167.png) 一般将多个并行度中的实例,叫做task. 默认情况下,一个bolt的并行度是4,代表了4个task # Stream grouping Stream grouping:即消息的partition方法。 Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型: 1. 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。 2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。 3. 全复制分组(All grouping):所有tuple被复制到所有bolt的所有任务。这种类型需要谨慎使用。每个订阅的数据流的task都会接收到tuple的拷贝. 4. 全局分组(Global grouping):将所有的tuples路由到唯一一个task上。明确地说,是分配给ID最小的那个task。注意当使用全局分组方式 5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。 6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。 # 并发机制 ## 概念 * Workers (JVMs): 在一个物理节点上可以运行一个或多个独立的JVM 进程。一个Topology可以包含一个或多个worker(并行的跑在不同的物理机上), 所以worker process就是执行一个topology的子集, 并且worker只能对应于一个topology  * Executors (threads): 在一个worker JVM进程中运行着多个Java线程。一个executor线程可以执行一个或多个tasks。但一般默认每个executor只执行一个task。一个worker可以包含一个或多个executor, 每个component (spout或bolt)至少对应于一个executor, 所以可以说executor执行一个compenent的子集, 同时一个executor只能对应于一个component。  * Tasks(bolt/spout instances):Task就是具体的处理逻辑对象,每一个Spout和Bolt会被当作很多task在整个集群里面执行。每一个task对应到一个线程,而stream grouping则是定义怎么从一堆task发射tuple到另外一堆task。你可以调用TopologyBuilder.setSpout和TopBuilder.setBolt来设置并行度 — 也就是有多少个task。 ## 配置并行度 * 对于并发度的配置, 在storm里面可以在多个地方进行配置, 优先级为:defaults.yaml < storm.yaml < topology-specific configuration < internal component-specific configuration < external component-specific configuration  * worker processes的数目, 可以通过配置文件和代码中配置, worker就是执行进程, 所以考虑并发的效果, 数目至少应该大于machines的数目  * executor的数目, component的并发线程数,只能在代码中配置(通过setBolt和setSpout的参数), 例如, setBolt("green-bolt", new GreenBolt(), 2)  * tasks的数目, 可以不配置, 默认和executor1:1, 也可以通过setNumTasks()配置Topology的worker数通过config设置,即执行该topology的worker(java)进程数。它可以通过 storm rebalance 命令任意调整。 ![](https://box.kancloud.cn/19a8a6e15141a75194401f1e2843e001_754x364.png) ![](https://box.kancloud.cn/bb81f4d8f1cb14346ffd71543b04b9fd_825x764.png) 3个组件的并发度加起来是10,就是说拓扑一共有10个executor,一共有2个worker,每个worker产生10 / 2 = 5条线程。 绿色的bolt配置成2个executor和4个task。为此每个executor为这个bolt运行2个task。 ## 动态改变并行度 * 动态的改变并行度 Storm支持在不 restart topology 的情况下, 动态的改变(增减) worker processes 的数目和 executors 的数目, 称为rebalancing. 通过Storm web UI,或者通过storm rebalance命令实现:  ~~~ storm rebalance mytopology -n 5 -e blue-spout=3 -e yellow-bolt=10 ~~~