[TOC]

# Data Acquisition

~~~
package com.jdxia.storm;

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class MySplitBolt extends BaseBasicBolt {

    //Processing method
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //1. Read the data from the tuple
        //When the data comes in from Kafka, the field name is "bytes"
        byte[] juzi = (byte[]) tuple.getValueByField("bytes");

        //2. Split the sentence on spaces
        String[] strings = new String(juzi).split(" ");

        //3. Emit one tuple per word
        for (String string : strings) {
            basicOutputCollector.emit(new Values(string, 1));
        }
    }

    //Declare the output fields
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(new Fields("word", "num"));
    }
}
~~~

# Data Computation

~~~
package com.jdxia.storm;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;

import java.util.HashMap;
import java.util.Map;

public class MyWordCountAndPrintBolt extends BaseBasicBolt {

    private Map<String, String> wordCountMap = new HashMap<String, String>();
    private Jedis jedis;

    //Initialize the Redis connection
    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        //Open the connection
        jedis = new Jedis("0.0.0.0", 6379);
        jedis.auth("root");
        //Call the parent implementation
        super.prepare(stormConf, context);
    }

    //Processing method
    @Override
    public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) {
        //Read the "word" and "num" fields declared by the upstream bolt
        String word = (String) tuple.getValueByField("word");
        Integer num = (Integer) tuple.getValueByField("num");

        //1. Look up the running count for this word (0 if absent)
        Integer integer = wordCountMap.get(word) == null ? 0 : Integer.parseInt(wordCountMap.get(word));

        //2. Add the incoming count and store the new total
        wordCountMap.put(word, (integer + num) + "");

        //3. Persist the whole map to Redis
        //Redis key "wordCount" -> hash of word -> count
        jedis.hmset("wordCount", wordCountMap);
    }

    //This is the last bolt, so there are no output fields to declare
    @Override
    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }
}
~~~
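Since the bolt stores every count in a single Redis hash under the key "wordCount", the results can be read back with hgetAll. Below is a minimal verification sketch; the class name WordCountReader is hypothetical, and the host assumes Redis is reachable on localhost with the same password the bolt uses:

~~~
package com.jdxia.storm;

import redis.clients.jedis.Jedis;

import java.util.Map;

public class WordCountReader {

    public static void main(String[] args) {
        //Connect to the same Redis instance the bolt writes to (assumed to be on localhost)
        Jedis jedis = new Jedis("127.0.0.1", 6379);
        jedis.auth("root");

        //Fetch the whole "wordCount" hash: word -> count
        Map<String, String> counts = jedis.hgetAll("wordCount");
        for (Map.Entry<String, String> entry : counts.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
        }

        jedis.close();
    }
}
~~~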
# Topology Submission

~~~
package com.jdxia.storm;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.generated.StormTopology;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.ZkHosts;

public class StormTopologyDriver {

    public static void main(String[] args) {
        //1. Describe the topology; component names are arbitrary
        TopologyBuilder topologyBuilder = new TopologyBuilder();

        //SpoutConfig arguments:
        //arg 1: the ZooKeeper ensemble the Kafka brokers register with
        //arg 2: the topic to consume
        //arg 3: zkRoot, the ZooKeeper path where offsets are stored
        //arg 4: an id (child node under zkRoot) so different consumers don't overwrite each other
        topologyBuilder.setSpout("kafkaSpout", new KafkaSpout(new SpoutConfig(new ZkHosts("master:2181"), "wordCount", "/wc", "wc")));

        //shuffleGrouping wires a bolt to the upstream component; a bolt can subscribe to several
        topologyBuilder.setBolt("bolt1", new MySplitBolt()).shuffleGrouping("kafkaSpout");
        topologyBuilder.setBolt("bolt2", new MyWordCountAndPrintBolt()).shuffleGrouping("bolt1");

        //2. Submit the topology
        //Config extends HashMap
        Config config = new Config();
        //Number of workers (JVMs) to run on; defaults to one if not set
        // config.setNumWorkers(2);
        StormTopology stormTopology = topologyBuilder.createTopology();

        //Local mode
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("wordCount", config, stormTopology);

        //Cluster mode
        // StormSubmitter.submitTopology("wordCount1", config, stormTopology);
    }
}
~~~

# Testing

Create the matching topic, then write data into it, with the words separated by spaces; a minimal producer sketch follows below.
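One way to feed the topic is a small standalone producer. The sketch below assumes the org.apache.kafka.clients producer API is on the classpath and a broker is reachable at master:9092; the class name WordCountProducer and the sample sentence are hypothetical:

~~~
package com.jdxia.storm;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class WordCountProducer {

    public static void main(String[] args) {
        Properties props = new Properties();
        //Broker list; "master:9092" assumes the broker runs on the same host as ZooKeeper
        props.put("bootstrap.servers", "master:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        //Words are separated by spaces, matching the split logic in MySplitBolt
        producer.send(new ProducerRecord<String, String>("wordCount", "hello storm hello kafka"));

        producer.close();
    }
}
~~~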