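# keyBy

### Using tuples

`keyBy(0)` groups the stream by the first field of the tuple (`f0`), so all records with the same value are sent to the same partition.

```java
import java.util.Arrays;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3));

        // Turn every element into a (word, 1) pair
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapped = streamSource
                .map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<Integer, Integer> map(Integer item) throws Exception {
                        return Tuple2.of(item, 1);
                    }
                })
                .returns(Types.TUPLE(Types.INT, Types.INT));

        // Group by tuple position 0 (the word)
        KeyedStream<Tuple2<Integer, Integer>, Tuple> stream = mapped.keyBy(0);
        stream.print();

        env.execute("WordCountStreamingJob");
    }
}
```

### Using a custom Bean

##### Define a `Bean` named `WordCount`

```java
package com.gosuncn;

// Flink treats this class as a POJO: public class, public no-arg constructor, public fields
public class WordCount {
    public Integer word;
    public Integer count;

    public WordCount() {
    }

    public WordCount(Integer word, Integer count) {
        this.word = word;
        this.count = count;
    }

    public static WordCount of(Integer word, Integer count) {
        return new WordCount(word, count);
    }

    @Override
    public String toString() {
        return "WordCount{" + "word=" + word + ", count=" + count + '}';
    }
}
```

##### Group by the `word` field of `WordCount`. Positional indices such as `0 1 2 3 ...` no longer work here.

```java
import java.util.Arrays;

import com.gosuncn.WordCount;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3));

        // Turn every element into a WordCount(word, 1)
        SingleOutputStreamOperator<WordCount> mapped = streamSource
                .map(new MapFunction<Integer, WordCount>() {
                    @Override
                    public WordCount map(Integer item) throws Exception {
                        return WordCount.of(item, 1);
                    }
                })
                .returns(WordCount.class);

        // Group by the POJO field name instead of a position
        KeyedStream<WordCount, Tuple> stream = mapped.keyBy("word");
        stream.print();

        env.execute("WordCountStreamingJob");
    }
}
```

### Grouping by multiple fields

`keyBy(0, 1)` uses the combination of fields 0 and 1 (province and city) as the key, and `sum(2)` keeps a running total of field 2 for each key.

```java
import java.util.Arrays;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> streamSource = env.fromCollection(Arrays.asList(
                "江苏 南京", "江苏 徐州", "湖南 长沙", "广东 广州", "辽宁 沈阳",
                "广东 广州", "湖南 长沙", "江苏 南京", "辽宁 沈阳", "湖南 张家界"));

        // "province city" -> (province, city, 1)
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> mapped = streamSource
                .map(new MapFunction<String, Tuple3<String, String, Integer>>() {
                    @Override
                    public Tuple3<String, String, Integer> map(String item) throws Exception {
                        String[] parts = item.split(" ");
                        return Tuple3.of(parts[0], parts[1], 1);
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.STRING, Types.INT));

        // Key on positions 0 and 1; type parameters are <element type, key type>
        KeyedStream<Tuple3<String, String, Integer> /* element type */, Tuple /* key type */> keyed = mapped.keyBy(0, 1);
        // Running sum of position 2 per (province, city) key
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> summed = keyed.sum(2);
        summed.print();

        env.execute("WordCountStreamingJob");
    }
}
```

**Likewise, with a custom Bean you cannot use positional indices; use the Bean's field names instead.**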
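The rolling behavior of `keyBy(0, 1).sum(2)` can be checked without a cluster. Below is a minimal plain-Java sketch, assuming single-threaded, in-order processing. For each record it updates the running total for its (province, city) key and emits the updated record, which is what the multi-field job should print for each input element. `KeySimulation`-style names here (`KeyBySumSimulation`, `rollingSum`) are hypothetical and used only for illustration; no Flink API is involved. In the real job the operator runs in parallel, so lines for different keys may interleave differently, and `print()` prefixes each line with the subtask number (for example `2> `) when parallelism is greater than 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical single-threaded simulation of keyBy(0, 1).sum(2); no Flink involved.
public class KeyBySumSimulation {

    // For each "province city" line: build the record (province, city, 1),
    // add 1 to the running total of its (province, city) key, and emit the
    // updated record, formatted like Flink's Tuple3.toString(): (f0,f1,f2).
    public static List<String> rollingSum(List<String> lines) {
        Map<List<String>, Integer> totals = new HashMap<>(); // composite key -> running sum
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            List<String> key = Arrays.asList(parts[0], parts[1]); // plays the role of keyBy(0, 1)
            int sum = totals.merge(key, 1, Integer::sum);        // plays the role of sum(2)
            emitted.add("(" + parts[0] + "," + parts[1] + "," + sum + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "江苏 南京", "江苏 徐州", "湖南 长沙", "广东 广州", "辽宁 沈阳",
                "广东 广州", "湖南 长沙", "江苏 南京", "辽宁 沈阳", "湖南 张家界");
        for (String record : rollingSum(input)) {
            System.out.println(record);
        }
    }
}
```

For the sample input, the first `江苏 南京` record yields `(江苏,南京,1)` and its second occurrence yields `(江苏,南京,2)`, while `(湖南,张家界,1)` stays at 1 because that key appears only once. Newer Flink releases also deprecate the index- and field-name overloads of `keyBy` in favor of a `KeySelector`, for example `keyBy(wc -> wc.word)` for the `WordCount` Bean.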