[TOC]
# 数据样本
~~~
i am jdxia
i am xjd
i am jdxia
i am jelly
~~~
# jar包
~~~
<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-core</artifactId>
<!--<scope>provided</scope>-->
<version>0.9.5</version>
</dependency>
~~~
安装log4j
# 数据获取
~~~
package com.learnstorm;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import org.apache.commons.lang.StringUtils;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
//数据获取
public class MyLocalFileSpout extends BaseRichSpout {
private SpoutOutputCollector collector;
private BufferedReader bufferedReader;
//初始化方法
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
try {
//定义这个去读取数据
this.bufferedReader = new BufferedReader(new FileReader(new File("/Users/jdxia/Desktop/MyFile/i.txt")));
} catch (FileNotFoundException e) {
e.printStackTrace();
}
}
//storm流式计算的特征就是数据一条一条的处理
// while(true) {
// this.nextTuple();
// }
//这个方法会被循环调用
@Override
public void nextTuple() {
//每被调用一次就会发送一条数据出去
try {
//读取一行
String line = bufferedReader.readLine();
//如果不是空的话
if (StringUtils.isNotBlank(line)) {
List<Object> arrayList = new ArrayList<Object>();
//把数据放到ArrayList中
arrayList.add(line);
//把数据发出去
collector.emit(arrayList);
}
} catch (IOException e) {
e.printStackTrace();
}
}
//定义下我的输出
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("juzi"));
}
}
~~~
# 数据截取
~~~
package com.learnstorm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
//相当于map-->world,1
//业务逻辑
//对句子进行切割
public class MySplitBolt extends BaseBasicBolt {
//处理函数
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//1.数据如何获取,用tuple获取
//强转为string,juzi是上一步定义的
String juzi = (String) tuple.getValueByField("juzi");
//2.进行切割
String[] strings = juzi.split(" ");
//3.发送数据
for (String word : strings) {
//我们之前用ArrayList存储,这边怎么变为Values
//可以看下Values的源码,他是继承了ArrayList,他存的时候用了一个循环
basicOutputCollector.emit(new Values(word, 1));
}
}
//定义下我的输出
//单词world和他的次数
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declare(new Fields("word", "num"));
}
}
~~~
# 单词统计
~~~
package com.learnstorm;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import java.util.HashMap;
import java.util.Map;
//打印
public class MyWordCountAndPrintBolt extends BaseBasicBolt {
private Map<String, Integer> wordCountMap = new HashMap<String, Integer>();
//处理函数
@Override
public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
//根据之前定义的word和num
//强转为string
String word = (String) tuple.getValueByField("word");
Integer num = (Integer) tuple.getValueByField("num");
//1.查看单词对应的value是否存在
Integer integer = wordCountMap.get(word);
if (integer == null || integer.intValue() == 0) {
//如果不存在就直接放入新的
wordCountMap.put(word, num);
} else {
//如果之前已经有了,就把对应统计加上
wordCountMap.put(word, integer.intValue() + num);
}
System.out.println(wordCountMap);
}
//不需要定义输出字段了
@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
}
~~~
# 任务描述
这边写的是本地提交到集群
~~~
package com.learnstorm;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
public class StormTopologyDriver {
public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
//1. 描述任务
TopologyBuilder topologyBuilder = new TopologyBuilder();
//任务的名字自己定义
topologyBuilder.setSpout("mySpout", new MyLocalFileSpout());
//shuffleGrouping和前一个任务关联
topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("mySpout");
topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1");
//2. 任务提交
//提交给谁?提交什么内容?
Config config = new Config();
StormTopology stormTopology = topologyBuilder.createTopology();
//本地模式
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("wordCount", config, stormTopology);
//集群模式
// StormSubmitter.submitTopology("worldCount1", config, stormTopology);
}
}
~~~
- linux
- 常用命令
- 高级文本命令
- 面试题
- redis
- String
- list
- hash
- set
- sortedSet
- 案例-推荐
- java高级特性
- 多线程
- 实现线程的三种方式
- 同步关键词
- 读写锁
- 锁的相关概念
- 多线程的join
- 有三个线程T1 T2 T3,保证顺序执行
- java五种线程池
- 守护线程与普通线程
- ThreadLocal
- BlockingQueue消息队列
- JMS
- 反射
- volatile
- jvm
- IO
- nio
- netty
- netty简介
- 案例一发送字符串
- 案例二发送对象
- 轻量级RPC开发
- 简介
- spring(IOC/AOP)
- spring初始化顺序
- 通过ApplicationContextAware加载Spring上下文
- InitializingBean的作用
- 结论
- 自定义注解
- zk在框架中的应用
- hadoop
- 简介
- hadoop集群搭建
- hadoop单机安装
- HDFS简介
- hdfs基本操作
- hdfs环境搭建
- 常见问题汇总
- hdfs客户端操作
- mapreduce工作机制
- 案列-单词统计
- 局部聚合Combiner
- 案列-流量统计(分区,排序,比较)
- 案列-倒排索引
- 案例-共同好友
- 案列-join算法实现
- 案例-求topN(分组)
- 自定义inputFormat
- 自定义outputFormat
- 框架运算全流程
- mapreduce的优化方案
- HA机制
- Hive
- 安装
- DDL操作
- 创建表
- 修改表
- DML操作
- Load
- insert
- select
- join操作
- 严格模式
- 数据类型
- shell参数
- 函数
- 内置运算符
- 内置函数
- 自定义函数
- Transform实现
- 特殊分割符处理
- 案例
- 级联求和accumulate
- flume
- 简介
- 安装
- 常用的组件
- 拦截器
- 案例
- 采集目录到HDFS
- 采集文件到HDFS
- 多个agent串联
- 日志采集和汇总
- 自定义拦截器
- 高可用配置
- 使用注意
- sqoop
- 安装
- 数据导入
- 导入数据到HDFS
- 导入关系表到HIVE
- 导入表数据子集
- 增量导入
- 数据导出
- 作业
- 原理
- azkaban
- 简介
- 安装
- 案例
- 简介
- command类型单一job
- command类型多job工作流flow
- HDFS操作任务
- mapreduce任务
- hive脚本任务
- hbase
- 简介
- 安装
- 命令行
- 基本CURD
- 过滤器查询
- 系统架构
- 物理存储
- 寻址机制
- 读写过程
- Region管理
- master工作机制
- 建表高级属性
- 与mapreduce结合
- 协处理器
- 点击流平台开发
- 简介
- storm
- 简介
- 安装
- 集群启动及任务过程分析
- 单词统计
- 并行度
- ACK容错机制
- ACK简介