ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
## **数据流组** 设计一个拓扑时,你要做的最重要的事情之一就是定义如何在各组件之间交换数据(数据流是如何被*bolts*消费的)。一个*数据流组*指定了每个*bolt*会消费哪些数据流,以及如何消费它们。 **NOTE**:一个节点能够发布一个以上的数据流,一个数据流组允许我们选择接收哪个。 数据流组在定义拓扑时设置,就像我们在[第二章](http://ifeve.com/getting-started-with-storm-2/#more-10677)看到的: ~~~ ··· builder.setBolt("word-normalizer", new WordNormalizer()) .shuffleGrouping("word-reader"); ··· ~~~ 在前面的代码块里,一个*bolt*由**TopologyBuilder**对象设定, 然后使用随机数据流组指定数据源。数据流组通常将数据源组件的ID作为参数,取决于数据流组的类型不同还有其它可选参数。 **NOTE:**每个**InputDeclarer**可以有一个以上的数据源,而且每个数据源可以分到不同的组。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E9%9A%8F%E6%9C%BA%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**随机数据流组** 随机流组是最常用的数据流组。它只有一个参数(数据源组件),并且数据源会向随机选择的*bolt*发送元组,保证每个消费者收到近似数量的元组。 随机数据流组用于数学计算这样的原子操作。然而,如果操作不能被随机分配,就像[第二章](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter2/Getting%20Started.md)为单词计数的例子,你就要考虑其它分组方式了。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E5%9F%9F%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**域数据流组** 域数据流组允许你基于元组的一个或多个域控制如何把元组发送给*bolts*。它保证拥有相同域组合的值集发送给同一个*bolt*。回到单词计数器的例子,如果你用*word*域为数据流分组,**word-normalizer** *bolt*将只会把相同单词的元组发送给同一个**word-counter***bolt*实例。 ~~~ ··· builder.setBolt("word-counter", new WordCounter(),2) .fieldsGrouping("word-normalizer", new Fields("word")); ··· ~~~ **NOTE:** 在域数据流组中的所有域集合必须存在于数据源的域声明中。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E5%85%A8%E9%83%A8%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**全部数据流组** 全部数据流组,为每个接收数据的实例复制一份元组副本。这种分组方式用于向*bolts*发送信号。比如,你要刷新缓存,你可以向所有的*bolts*发送一个*刷新缓存信号*。在单词计数器的例子里,你可以使用一个全部数据流组,添加清除计数器缓存的功能(见[拓扑示例](https://github.com/storm-book/examples-ch03-topologies)) ~~~ public void execute(Tuple input) { String str = null; try{ if(input.getSourceStreamId().equals("signals")){ str = input.getStringByField("action"); if("refreshCache".equals(str)) counters.clear(); } }catch (IllegalArgumentException e){ //什么也不做 } ··· } ~~~ 我们添加了一个if分支,用来检查源数据流。Storm允许我们声明具名数据流(如果你不把元组发送到一个具名数据流,默认发送到名为”**default**“的数据流)。这是一个识别元组的极好的方式,就像这个例子中,我们想识别**signals**一样。 在拓扑定义中,你要向**word-counter** *bolt*添加第二个数据流,用来接收从**signals-spout**数据流发送到所有*bolt*实例的每一个元组。 ~~~ builder.setBolt("word-counter", new WordCounter(),2) .fieldsGroupint("word-normalizer",new Fields("word")) .allGrouping("signals-spout","signals"); ~~~ **signals-spout**的实现请参考[git仓库](https://github.com/storm-book/examples-ch03-topologies)。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E8%87%AA%E5%AE%9A%E4%B9%89%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**自定义数据流组** 你可以通过实现**backtype.storm.grouping.CustormStreamGrouping**接口创建自定义数据流组,让你自己决定哪些*bolt*接收哪些元组。 让我们修改单词计数器示例,使首字母相同的单词由同一个*bolt*接收。 ~~~ public class ModuleGrouping mplents CustormStreamGrouping, Serializable{ int numTasks = 0; @Override public List<Integer> chooseTasks(List<Object> values) { List<Integer> boltIds = new ArrayList<Integer>(); if(values.size()>0){ String str = values.get(0).toString(); if(str.isEmpty()){ boltIds.add(0); }else{ boltIds.add(str.charAt(0) % numTasks); } } return boltIds; } @Override public void prepare(TopologyContext context, Fields outFields, List<Integer> targetTasks) { numTasks = targetTasks.size(); } } ~~~ 这是一个**CustomStreamGrouping**的简单实现,在这里我们采用单词首字母字符的整数值与任务数的余数,决定接收元组的*bolt*。 按下述方式**word-normalizer**修改即可使用这个自定义数据流组。 ~~~ builder.setBolt("word-normalizer", new WordNormalizer()) .customGrouping("word-reader", new ModuleGrouping()); ~~~ ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E7%9B%B4%E6%8E%A5%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**直接数据流组** 这是一个特殊的数据流组,数据源可以用它决定哪个组件接收元组。与前面的例子类似,数据源将根据单词首字母决定由哪个*bolt*接收元组。要使用直接数据流组,在**WordNormalizer** *bolt*中,使用**emitDirect**方法代替**emit**。 ~~~ public void execute(Tuple input) { ... for(String word : words){ if(!word.isEmpty()){ ... collector.emitDirect(getWordCountIndex(word),new Values(word)); } } //对元组做出应答 collector.ack(input); } public Integer getWordCountIndex(String word) { word = word.trim().toUpperCase(); if(word.isEmpty()){ return 0; }else{ return word.charAt(0) % numCounterTasks; } } ~~~ 在**prepare**方法中计算任务数 ~~~ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { this.collector = collector; this.numCounterTasks = context.getComponentTasks("word-counter"); } ~~~ 在拓扑定义中指定数据流将被直接分组: ~~~ builder.setBolt("word-counter", new WordCounter(),2) .directGrouping("word-normalizer"); ~~~ ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E5%85%A8%E5%B1%80%E6%95%B0%E6%8D%AE%E6%B5%81%E7%BB%84)**全局数据流组** 全局数据流组把所有数据源创建的元组发送给单一目标实例(即拥有最低ID的任务)。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#%E4%B8%8D%E5%88%86%E7%BB%84)**不分组** 写作本书时(Stom0.7.1版),这个数据流组相当于随机数据流组。也就是说,使用这个数据流组时,并不关心数据流是如何分组的。 ### [](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/chapter3/%E6%8B%93%E6%89%91.md#localcluster-vs-stormsubmitter)**LocalCluster VS StormSubmitter** 到目前为止,你已经用一个叫做**LocalCluster**的工具在你的本地机器上运行了一个拓扑。Storm的基础工具,使你能够在自己的计算机上方便的运行和调试不同的拓扑。但是你怎么把自己的拓扑提交给运行中的Storm集群呢?Storm有一个有趣的功能,在一个真实的集群上运行自己的拓扑是很容易的事情。要实现这一点,你需要把**LocalCluster**换成**StormSubmitter**并实现**submitTopology**方法, 它负责把拓扑发送给集群。 下面是修改后的代码: ~~~ //LocalCluster cluster = new LocalCluster(); //cluster.submitTopology("Count-Word-Topology-With-Refresh-Cache", conf, //builder.createTopology()); StormSubmitter.submitTopology("Count-Word-Topology-With_Refresh-Cache", conf, builder.createTopology()); //Thread.sleep(1000); //cluster.shutdown(); ~~~ **NOTE:** 当你使用**StormSubmitter**时,你就不能像使用**LocalCluster**时一样通过代码控制集群了。 接下来,把源码压缩成一个jar包,运行Storm客户端命令,把拓扑提交给集群。如果你已经使用了Maven, 你只需要在命令行进入源码目录运行:**mvn package**。 现在你生成了一个jar包,使用**storm jar**命令提交拓扑(关于如何安装Storm客户端请参考[附录A](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/appendix/A.md))。命令格式:**storm jar allmycode.jar org.me.MyTopology arg1 arg2 arg3**。 对于这个例子,在拓扑工程目录下面运行: ~~~ storm jar target/Topologies-0.0.1-SNAPSHOT.jar countword.TopologyMain src/main/resources/words.txt ~~~ 通过这些命令,你就把拓扑发布集群上了。 如果想停止或杀死它,运行: ~~~ storm kill Count-Word-Topology-With-Refresh-Cache ~~~ **NOTE:**拓扑名称必须保证惟一性。 **NOTE:**如何安装Storm客户端,参考[附录A](https://github.com/runfriends/GettingStartedWithStorm-cn/blob/master/appendix/A.md) ### **DRPC拓扑** 有一种特殊的拓扑类型叫做分布式远程过程调用(DRPC),它利用Storm的分布式特性执行远程过程调用(RPC)(见下图)。Storm提供了一些用来实现DRPC的工具。第一个是DRPC服务器,它就像是客户端和Storm拓扑之间的连接器,作为拓扑的*spout*的数据源。它接收一个待执行的函数和函数参数,然后对于函数操作的每一个数据块,这个服务器都会通过拓扑分配一个请求ID用来识别RPC请求。拓扑执行最后的*bolt*时,它必须分配RPC请求ID和结果,使DRPC服务器把结果返回正确的客户端。 [![](https://box.kancloud.cn/2015-09-21_55ffed6591b64.png)](http://ifeve.com/getting-started-with-storm-3/figure3-1-drpc-topology-schema/) **NOTE:**单实例DRPC服务器能够执行许多函数。每个函数由一个惟一的名称标识。 Storm提供的第二个工具(已在例子中用过)是**LineDRPCTopologyBuilder**,一个辅助构建DRPC拓扑的抽象概念。生成的拓扑创建**DRPCSpouts**——它连接到DRPC服务器并向拓扑的其它部分分发数据——并包装*bolts*,使结果从最后一个*bolt*返回。依次执行所有添加到**LinearDRPCTopologyBuilder**对象的*bolts*。 作为这种类型的拓扑的一个例子,我们创建了一个执行加法运算的进程。虽然这是一个简单的例子,但是这个概念可以扩展到复杂的分布式计算。 *bolt*按下面的方式声明输出: ~~~ public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id","result")); } ~~~ 因为这是拓扑中惟一的*bolt*,它必须发布RPC ID和结果。**execute**方法负责执行加法运算。 ~~~ public void execute(Tuple input) { String[] numbers = input.getString(1).split("\\+"); Integer added = 0; if(numbers.length<2){ throw new InvalidParameterException("Should be at least 2 numbers"); } for(String num : numbers){ added += Integer.parseInt(num); } collector.emit(new Values(input.getValue(0),added)); } ~~~ 包含加法*bolt*的拓扑定义如下: ~~~ public static void main(String[] args) { LocalDRPC drpc = new LocalDRPC(); LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("add"); builder.addBolt(AdderBolt(),2); Config conf = new Config(); conf.setDebug(true); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpcder-topology", conf, builder.createLocalTopology(drpc)); String result = drpc.execute("add", "1+-1"); checkResult(result,0); result = drpc.execute("add", "1+1+5+10"); checkResult(result,17); cluster.shutdown(); drpc.shutdown(); } ~~~ 创建一个**LocalDRPC**对象在本地运行DRPC服务器。接下来,创建一个拓扑构建器(译者注:LineDRpctopologyBuilder对象),把*bolt*添加到拓扑。运行DRPC对象(LocalDRPC对象)的**execute**方法测试拓扑。 **NOTE:**使用**DRPCClient**类连接远程DRPC服务器。DRPC服务器暴露了[Thrift API](http://thrift.apache.org/),因此可以跨语言编程;并且不论是在本地还是在远程运行DRPC服务器,它们的API都是相同的。 对于采用Storm配置的DRPC配置参数的Storm集群,调用构建器对象的**createRemoteTopology**向Storm集群提交一个拓扑,而不是调用**createLocalTopology**。 **原创文章,转载请注明:** 转载自[并发编程网 – ifeve.com](http://ifeve.com/) **本文链接地址:** [Storm入门之第三章拓扑](http://ifeve.com/getting-started-with-storm-3/)