### reduce: to decrease, lower, shrink, cut, trim, compress, simplify, streamline ...
In short, reduce means turning many into fewer.
```java
public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}
```
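The `reduce` method takes two values of the same type, `value1` and `value2`, and returns a single value of that type. More goes in than comes out, which is exactly what "reduce" means.

---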
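To make the two-in, one-out contract of `reduce` concrete, here is a minimal plain-Java sketch. It does not use Flink: the `Reducer` interface and the `reduceAll` helper are hypothetical names introduced only for illustration, and the checked exception from the real `ReduceFunction` signature is left out for brevity.

```java
import java.util.Arrays;
import java.util.List;

class ReduceContractSketch {
    // Hypothetical stand-in with the same shape as ReduceFunction<T>:
    // two values of type T go in, one value of type T comes out.
    interface Reducer<T> {
        T reduce(T value1, T value2);
    }

    // Combines the elements pairwise, left to right, until one value remains.
    static <T> T reduceAll(List<T> values, Reducer<T> reducer) {
        if (values.isEmpty()) {
            throw new IllegalArgumentException("nothing to reduce");
        }
        T result = values.get(0);
        for (T value : values.subList(1, values.size())) {
            result = reducer.reduce(result, value);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
        Integer sum = reduceAll(numbers, (a, b) -> a + b);
        System.out.println(sum); // prints 10
    }
}
```

Four values go in and one comes out, because each call to `reduce` replaces two values with one.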
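```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromElements("Apollo", "Paul", "Tom", "Paul", "Apollo", "Tom", "Marry");
        // Map each word to (word, 1); returns(...) supplies the generic type erased from the lambda.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = dataStreamSource.map(item -> Tuple2.of(item, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Group by tuple field 0 (the word). Newer Flink releases deprecate positional keys in favor of a KeySelector.
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = mapped.keyBy(0);
        // Merge two tuples that share a key by adding their counts.
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1));
        result.print();
        env.execute("WordCountStreamingJob");
    }
}
```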
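In this example, `reduce` receives two elements of type `Tuple2<String, Integer>` and returns one element of the same type.
Both inputs have the same `f0` (the key they were grouped by), and their `f1` values are added together to form the result.

---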
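The word-count reduce above can be pictured as a running total kept per key. The following is a plain-Java simulation, not Flink code: `RollingReduceSketch` and `rollingCounts` are hypothetical names, and a `HashMap` stands in for the per-key state that the framework would manage. It assumes elements are processed one at a time in input order; in a real job, the order across different keys may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RollingReduceSketch {
    // For every incoming word, combine (word, 1) with the running count
    // stored for that key and emit the updated (word, count) pair.
    static List<String> rollingCounts(List<String> words) {
        Map<String, Integer> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            // merge stores 1 for a new key, otherwise adds 1 to the stored count
            int updated = state.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Apollo", "Paul", "Tom", "Paul", "Apollo", "Tom", "Marry");
        // prints [(Apollo,1), (Paul,1), (Tom,1), (Paul,2), (Apollo,2), (Tom,2), (Marry,1)]
        System.out.println(rollingCounts(words));
    }
}
```

One result is emitted per input element, so a word that appears twice shows up first with a count of 1 and then with a count of 2.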
### Max and Min: aggregation functions applied after grouping
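```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromElements("Apollo 1", "Paul 3", "Tom 2", "Paul 1", "Apollo 2", "Tom 4", "Marry 6");
        // Parse "name number" into (name, number).
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = dataStreamSource.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String item) throws Exception {
                String[] parts = item.split(" ");
                return Tuple2.of(parts[0], Integer.valueOf(parts[1]));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Group by tuple field 0 (the name).
        KeyedStream<Tuple2<String, Integer>, Tuple> keyed = mapped.keyBy(0);
        // Within each group, track the maximum of tuple field 1 (the number).
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.max(1);
        result.print();
        env.execute("WordCountStreamingJob");
    }
}
```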
`keyed.max(1)` **computes the maximum of the second field (index 1) within each key group.** `keyed.min(1)` does the same for the minimum.
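
As a rough illustration of what a per-key `max` or `min` produces on the sample input, here is a plain-Java simulation. It is not Flink code: `RollingAggregateSketch` and `rolling` are hypothetical names, a `HashMap` stands in for the per-key state, and elements are assumed to be processed one at a time in input order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class RollingAggregateSketch {
    // Parses "name number" items, keeps one running value per name, and emits
    // the updated value after each element. Pass Integer::max or Integer::min.
    static List<String> rolling(List<String> items, BinaryOperator<Integer> combiner) {
        Map<String, Integer> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String item : items) {
            String[] parts = item.split(" ");
            String name = parts[0];
            int number = Integer.parseInt(parts[1]);
            // First value for a key is stored as-is; later values are combined with it.
            int updated = state.merge(name, number, combiner);
            emitted.add("(" + name + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("Apollo 1", "Paul 3", "Tom 2", "Paul 1", "Apollo 2", "Tom 4", "Marry 6");
        // prints [(Apollo,1), (Paul,3), (Tom,2), (Paul,3), (Apollo,2), (Tom,4), (Marry,6)]
        System.out.println(rolling(items, Integer::max));
        // prints [(Apollo,1), (Paul,3), (Tom,2), (Paul,1), (Apollo,1), (Tom,2), (Marry,6)]
        System.out.println(rolling(items, Integer::min));
    }
}
```

With `Integer::max`, the second `Paul` element still yields 3 because the earlier value was larger; with `Integer::min`, it yields 1.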