[TOC] # Storm简介及安装 ## 1 概述 > 在过去的十年里,数据处理发生了革命性的变化。MapReduce,Hadoop,以及相关的技术使我们可以存储和处理以前不可想象规模的数据。很遗憾,这些数据处理系统都不是实时系统,命中注定也不是它们。根本没办法把Hadoop变成一个实时系统;实时数据处理和批处理的许多要求在根本上有很大不同。 > 然而,企业对大规模实时数据处理要求越来越多。缺乏“实时Hadoop”是数据处理生态系统中最大的窘境。 > Storm解决了这个窘境。 > Storm之前,你通常必须手动建立一个由许多队列和许多worker组成的网络来实现实时处理。worker处理队列消息,更新数据库,发送新消息给其它队列以供后续处理。很遗憾, > 这种方法有很大的局限性: 1. 乏味:你大部份开发时间花费在配置消息发送,部署worker,部署中间队列。你关心的实时处理逻辑对应到你的代码的比例相对较小 。 2. 脆弱:没有多少容错。你负责保持每个worker和队列正常工作。 3. 痛苦伸缩:当单个worker或队列的消息吞吐量太高时,你需要分区,即数据如何分散。你需要重新配置其它worker,让它们发送消息到新位置。这导致删除或添加部件都可能失败。 4. 虽然队列+workers的范式能解决大量的消息,消息处理显然是实时计算的基本范式。 > 问题是:你要怎么做,才能在某种程度上保证数据不会丢失,对海量消息轻松扩容,并且使用和运营工作都超级简单呢? > Storm满足这些目标。 > Storm公开(expose)一组实时计算原语。类似MapReduce极大地简化了编写并行批处理程序,storm的原语极大地简化了编写并行实时计算程序。 > Storm的关键特性: 1. 用例非常广泛:Storm可用于处理消息和更新数据库(流处理),在数据流上进行持续查询,并以流的形式返回结果到客户端(持续计算),并行化一个类似实时查询的热点查询(分布式的RPC),还有更多的用例。Storm的一组很小的原语满足了惊人数量的用例。 2. 可伸缩:Storm随时都可对大规模消息进行扩容。扩容一个拓扑,你只需要添加机器和增加的拓扑结构的并行设置。看一个storm规模的例子,一个storm集群有10个节点,一个最初的Storm应用每秒可以处理1,000,000个消息(指spout和bolt总共发射的消息总和),拓扑的其中一部分每秒数有数百个数据库调用。Storm使用Zookeeper协调集群,使其集群可以扩容到非常大。 3. 保证数据不丢失:实时系统必须对成功处理数据提供有力保证 。系统丢弃数据的用例非常有限。Storm保证每个消息都被处理,这直接与其它系统截然不同,如S4。 4. 非常健壮:Storm与Hadoop不同,Hadoop难于管理早已臭名昭著,Storm集群只是干活。使用户尽可能方便地管理storm集群是storm项目的一个明确目标。 5. 容错:计算的执行过程中如果发生故障,Storm将在必要时重新分配任务。Storm确保计算永远运行(或者直到你kill此计算) 。 6. 编程语言无关性:健壮和可伸缩的实时处理不应仅限于一个单一的平台。Storm的拓扑结构和处理组件可以用任何语言定义,对任何人而言,Storm都是易接受的。 > 更多信息请参考:http://www.maoxiangyi.cn/index.php/archives/337 ## 2 Storm ### 1 特性 >   对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。同Hadoop一样Storm也可以处理大批量的数据,然而Storm在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm同样还具备容错和分布计算这些特性,这就让Storm可以扩展到不同的机器上进行大批量的数据处理。他同样还有以下的这些特性: 1.   ·易于扩展。对于扩展,你只需要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样可以充分的保证大型集群的良好运行。 2.   ·每条信息的处理都可以得到保证。 3.   ·Storm集群管理简易。 4.   ·Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm重新分配任务。 5.   ·尽管通常使用Java,Storm中的topology可以用任何语言设计。 6.   当然为了更好的理解文章,你首先需要安装和设置Storm。需要通过以下几个简单的步骤: 7.   ·从Storm官方下载Storm安装文件 8.   ·将bin/directory解压到你的PATH上,并保证bin/storm脚本是可执行的。 ### 2 Storm组件 > Storm集群主要由一个主节点和一群工作节点(worker node)组成,通过 Zookeeper进行协调。 >  主节点: *   主节点通常运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。 > 工作节点: *   工作节点同样会运行一个后台程序 —— Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则通过Zookeeper系统或者集群。 ### 4 Zookeeper > Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。topology则是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。 ### 4 编程模型 spout、Bolt、Stream Groupings >   Spout *   简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。 >   Bolt *   Topology中所有的处理都由Bolt完成。Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。 >   Stream Groupings *   Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型: 1. 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。 2. 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。 3. 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。 4. 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。 5. 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。 6. 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。 >  当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。 ## 3 Storm 伪分布式环境搭建 ### 1、下载安装包 ~~~ wget http://124.202.164.6/files/1139000006794ECA/apache.fayea.com/storm/apache-storm-0.9.5/apache-storm-0.9.5.tar.gz ~~~ ### 2、解压安装包 ~~~ tar -zxvf apache-storm-0.9.5.tar.gz -C /export/servers/ cd /export/servers/ ln -s apache-storm-0.9.5 storm ~~~ ### 3、修改配置文件 ~~~ mv /export/servers/storm/conf/storm.yaml /export/servers/storm/conf/storm.yaml.bak vi /export/servers/storm/conf/storm.yaml ~~~ > 输入以下内容: ![](https://box.kancloud.cn/453c16701cfd11896fe9677c8d630b44_554x252.png) ~~~ #指定storm使用的zk集群 storm.zookeeper.servers: - "zk01" - "zk02" - "zk03" #指定storm集群中的nimbus节点所在的服务器 nimbus.host: "storm01" #指定nimbus启动JVM最大可用内存大小 nimbus.childopts: "-Xmx1024m" #指定supervisor启动JVM最大可用内存大小 supervisor.childopts: "-Xmx1024m" #指定supervisor节点上,每个worker启动JVM最大可用内存大小 worker.childopts: "-Xmx768m" #指定ui启动JVM最大可用内存大小,ui服务一般与nimbus同在一个节点上。 ui.childopts: "-Xmx768m" #指定supervisor节点上,启动worker时对应的端口号,每个端口对应槽,每个槽位对应一个worker supervisor.slots.ports: - 6700 - 6701 - 6702 - 6703 ~~~ > 注:配置文件每一行前面必须加一个‘空格’不然会有问题 ### 4、分发安装包 ~~~ scp -r /export/servers/apache-storm-0.9.5 storm02:/export/servers ~~~ > 然后分别在各机器上创建软连接 ~~~ cd /export/servers/ ln -s apache-storm-0.9.5 storm ~~~ ### 5、启动集群 * 在nimbus.host所属的机器上启动 nimbus服务 ~~~ cd /export/servers/storm/bin/ nohup ./storm nimbus & ~~~ * 在nimbus.host所属的机器上启动ui服务 ~~~ cd /export/servers/storm/bin/ nohup ./storm ui & ~~~ * 在其它个点击上启动supervisor服务 ~~~ cd /export/servers/storm/bin/ nohup ./storm supervisor & ~~~ ### 6、查看集群 > 访问nimbus.host:/8080,即可看到storm的ui界面。 ![](https://box.kancloud.cn/887ae8a07ad50e341dd1aa037152474e_554x262.png) ### 7、Storm常用操作命令 > 有许多简单且有用的命令可以用来管理拓扑,它们可以提交、杀死、禁用、再平衡拓扑。 1. 提交任务命令格式:storm jar 【jar路径】 【拓扑包名.拓扑类名】 【拓扑名称】 ~~~ bin/storm jar examples/storm-starter/storm-starter-topologies-0.10.0.jar storm.starter.WordCountTopology wordcount ~~~ 2) 杀死任务命令格式:storm kill 【拓扑名称】 -w 10(执行kill命令时可以通过-w [等待秒数]指定拓扑停用以后的等待时间) ~~~ storm kill topology-name -w 10 ~~~ 3) 停用任务命令格式:storm deactivte 【拓扑名称】 ~~~ storm deactivte topology-name ~~~ > 我们能够挂起或停用运行中的拓扑。当停用拓扑时,所有已分发的元组都会得到处理,但是spouts的nextTuple方法不会被调用。销毁一个拓扑,可以使用kill命令。它会以一种安全的方式销毁一个拓扑,首先停用拓扑,在等待拓扑消息的时间段内允许拓扑完成当前的数据流。 4) 启用任务命令格式:storm activate【拓扑名称】 ~~~ storm activate topology-name ~~~ 5) 重新部署任务命令格式:storm rebalance 【拓扑名称】 ~~~ storm rebalance topology-name ~~~ > 再平衡使你重分配集群任务。这是个很强大的命令。比如,你向一个运行中的集群增加了节点。再平衡命令将会停用拓扑,然后在相应超时时间之后重分配worker,并重启拓扑。 ## Storm编程快速入门资源图片 ![](https://box.kancloud.cn/c2bd9237197b61fc49e0f0e9f70f010f_3000x11000.png)