# keyBy
### Tuple approach
```java
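import java.util.Arrays;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3));
        // Turn each number into a (word, 1) tuple.
        SingleOutputStreamOperator<Tuple2<Integer, Integer>> mapped = streamSource.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
            @Override
            public Tuple2<Integer, Integer> map(Integer item) throws Exception {
                return Tuple2.of(item, 1);
            }
        }).returns(Types.TUPLE(Types.INT, Types.INT));
        // Key by tuple position 0 (index-based keyBy is deprecated in later Flink releases).
        KeyedStream<Tuple2<Integer, Integer>, Tuple> stream = mapped.keyBy(0);
        stream.print();
        env.execute("WordCountStreamingJob");
    }
}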
```
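To make the effect of `keyBy` more concrete, here is a minimal plain-Java sketch of the idea that records with the same key are always routed to the same partition. It does not use Flink at all: the class `KeyByPartitionSketch` and its methods are hypothetical names, and choosing the partition with `hashCode() % parallelism` is a deliberate simplification (Flink's real assignment goes through key groups and differs in detail).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java illustration only; none of these names are Flink APIs.
class KeyByPartitionSketch {

    // Assumed simplification: pick a partition from the key's hash code.
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Routes each (word, 1) pair to a partition chosen from field 0, the key.
    static Map<Integer, List<int[]>> partition(List<Integer> words, int parallelism) {
        Map<Integer, List<int[]>> partitions = new TreeMap<>();
        for (Integer word : words) {
            int[] record = {word, 1}; // stands in for Tuple2.of(word, 1)
            partitions.computeIfAbsent(partitionFor(word, parallelism), p -> new ArrayList<>()).add(record);
        }
        return partitions;
    }

    public static void main(String[] args) {
        Map<Integer, List<int[]>> partitions = partition(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3), 2);
        partitions.forEach((p, records) -> {
            StringBuilder sb = new StringBuilder("partition " + p + ":");
            for (int[] r : records) {
                sb.append(" (").append(r[0]).append(",").append(r[1]).append(")");
            }
            System.out.println(sb);
        });
    }
}
```

With two partitions in this sketch, every record with key 2 lands in partition 0, while keys 1 and 3 share partition 1. The same key never spreads across partitions, but one partition may serve several keys.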
### Custom Bean approach
##### Define a `Bean` named `WordCount`
```java
package com.gosuncn;

public class WordCount {
    // Public fields so that Flink can treat this class as a POJO.
    public Integer word;
    public Integer count;

    // Flink's POJO rules require a public no-argument constructor.
    public WordCount() {
    }

    public WordCount(Integer word, Integer count) {
        this.word = word;
        this.count = count;
    }

    public static WordCount of(Integer word, Integer count) {
        return new WordCount(word, count);
    }

    @Override
    public String toString() {
        return "WordCount{" +
                "word=" + word +
                ", count=" + count +
                '}';
    }
}
```
##### Group by the `word` field of `WordCount`; positional indexes (`0 1 2 3 ...`) can no longer be used.
```java
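import java.util.Arrays;

import com.gosuncn.WordCount;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3));
        SingleOutputStreamOperator<WordCount> mapped = streamSource.map(new MapFunction<Integer, WordCount>() {
            @Override
            public WordCount map(Integer item) throws Exception {
                return WordCount.of(item, 1);
            }
        }).returns(WordCount.class);
        // Key by the Bean's field name instead of a positional index.
        KeyedStream<WordCount, Tuple> stream = mapped.keyBy("word");
        stream.print();
        env.execute("WordCountStreamingJob");
    }
}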
```
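A field name passed as a string is not checked by the compiler, so a misspelling only surfaces when the program runs. Flink also provides a `KeySelector` interface that extracts the key through typed code. The sketch below imitates that idea in plain Java without Flink; `KeySelectorSketch`, its nested `WordCount` stand-in, and `groupByKey` are hypothetical names used for illustration only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java illustration only; it mimics the idea of a key selector without using Flink.
class KeySelectorSketch {

    // Minimal stand-in for the WordCount Bean.
    static class WordCount {
        final Integer word;
        final Integer count;

        WordCount(Integer word, Integer count) {
            this.word = word;
            this.count = count;
        }
    }

    // Groups records by whatever key the selector extracts, keeping first-seen key order.
    static <T, K> Map<K, List<T>> groupByKey(List<T> records, Function<T, K> keySelector) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T record : records) {
            groups.computeIfAbsent(keySelector.apply(record), k -> new ArrayList<>()).add(record);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<WordCount> records = new ArrayList<>();
        for (Integer word : Arrays.asList(1, 2, 3, 1, 2, 3, 1, 2, 3)) {
            records.add(new WordCount(word, 1));
        }
        // The key is selected by a typed lambda, so a misspelled field fails to compile.
        Map<Integer, List<WordCount>> groups = groupByKey(records, wc -> wc.word);
        groups.forEach((word, group) -> System.out.println("word=" + word + " records=" + group.size()));
    }
}
```

In this sketch each of the keys 1, 2, and 3 ends up with three records, mirroring how the job groups the stream by `word`.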
### Grouping by multiple fields
```java
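import java.util.Arrays;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Each element is "province city", separated by a single space.
        DataStreamSource<String> streamSource = env.fromCollection(Arrays.asList("Jiangsu Nanjing", "Jiangsu Xuzhou", "Hunan Changsha", "Guangdong Guangzhou", "Liaoning Shenyang", "Guangdong Guangzhou", "Hunan Changsha", "Jiangsu Nanjing", "Liaoning Shenyang", "Hunan Zhangjiajie"));
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> mapped = streamSource.map(new MapFunction<String, Tuple3<String, String, Integer>>() {
            @Override
            public Tuple3<String, String, Integer> map(String item) throws Exception {
                String[] parts = item.split(" ");
                return Tuple3.of(parts[0], parts[1], 1);
            }
        }).returns(Types.TUPLE(Types.STRING, Types.STRING, Types.INT));
        // First type parameter: element type after grouping; second: type of the grouping key.
        KeyedStream<Tuple3<String, String, Integer>, Tuple> keyed = mapped.keyBy(0, 1);
        SingleOutputStreamOperator<Tuple3<String, String, Integer>> summed = keyed.sum(2);
        summed.print();
        env.execute("WordCountStreamingJob");
    }
}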
```
**Likewise, with a custom Bean you cannot use positional indexes; use the Bean's field names instead.**
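To illustrate what grouping by two fields followed by a sum produces, the following plain-Java sketch imitates the multi-field example without using Flink. It treats the (province, city) pair as a composite key and emits a running total for every incoming record, which is how a keyed `sum` behaves on a stream. The class `CompositeKeySumSketch` and the method `runningSums` are hypothetical names, and the input uses the province and city pairs from the example above in romanized form.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration only; it imitates keyBy(0, 1).sum(2) without using Flink.
class CompositeKeySumSketch {

    // Returns one output line per input record, carrying the running total for its key.
    static List<String> runningSums(List<String> lines) {
        Map<List<String>, Integer> totals = new HashMap<>();
        List<String> output = new ArrayList<>();
        for (String line : lines) {
            String[] parts = line.split(" ");
            // The composite key is the (province, city) pair; a List compares by value.
            List<String> key = Arrays.asList(parts[0], parts[1]);
            int total = totals.merge(key, 1, Integer::sum);
            output.add("(" + parts[0] + "," + parts[1] + "," + total + ")");
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "Jiangsu Nanjing", "Jiangsu Xuzhou", "Hunan Changsha", "Guangdong Guangzhou",
                "Liaoning Shenyang", "Guangdong Guangzhou", "Hunan Changsha", "Jiangsu Nanjing",
                "Liaoning Shenyang", "Hunan Zhangjiajie");
        runningSums(input).forEach(System.out::println);
    }
}
```

The sketch processes records strictly in input order. In a real job running with a parallelism above 1, the relative order of output lines for different keys can vary.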