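### reduce: to lessen, lower, shrink, cut, compress, simplify, trim ...

In short, reduce means turning many into fewer.

```java
public interface ReduceFunction<T> extends Function, Serializable {
    T reduce(T value1, T value2) throws Exception;
}
```

The `reduce` method takes two values of the same type, `value1` and `value2`, and returns a single value of that type. More goes in than comes out, which is exactly what "reduce" means.

---

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource =
                env.fromElements("Apollo", "Paul", "Tom", "Paul", "Apollo", "Tom", "Marry");

        // Turn each word into (word, 1); returns(...) supplies the type information
        // that Java erases from the lambda.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = dataStreamSource
                .map(item -> Tuple2.of(item, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by the word. A KeySelector replaces the original positional keyBy(0),
        // which newer Flink versions deprecate.
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapped.keyBy(t -> t.f0);

        // Combine two tuples that share a key into one by adding their counts.
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduced =
                keyed.reduce((value1, value2) -> Tuple2.of(value1.f0, value1.f1 + value2.f1));

        reduced.print();
        env.execute("WordCountStreamingJob");
    }
}
```

In this example, two `Tuple2<String, Integer>` elements go in and one `Tuple2<String, Integer>` element comes out. Both inputs have the same `f0`, because the stream is keyed on that field; their `f1` values are added and the sum is returned.

---

### Max and Min: aggregation functions applied after grouping

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WordCountStreamingJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource =
                env.fromElements("Apollo 1", "Paul 3", "Tom 2", "Paul 1", "Apollo 2", "Tom 4", "Marry 6");

        // Parse "name score" into (name, score), splitting each element only once.
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped = dataStreamSource
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String item) throws Exception {
                        String[] parts = item.split(" ");
                        return Tuple2.of(parts[0], Integer.valueOf(parts[1]));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT));

        // Key by the name. A KeySelector replaces the original positional keyBy(0),
        // which newer Flink versions deprecate.
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapped.keyBy(t -> t.f0);

        // Running maximum of the field at index 1, tracked separately for each key.
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.max(1);

        result.print();
        env.execute("WordCountStreamingJob");
    }
}
```

`SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyed.max(1);` **computes, for each key, the running maximum of the second field (index 1).** `min(1)` works the same way for the minimum.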
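To make the "two values in, one value out" idea concrete, here is a minimal plain-Java sketch of what a keyed rolling aggregation does with the sample data above. It does not use Flink: the class `RollingReduceSketch` and the method `rollingReduce` are hypothetical names invented for this illustration, and the sketch ignores parallelism and output ordering. It only assumes that one accumulator is kept per key and that one result is emitted for every incoming element.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class RollingReduceSketch {

    /**
     * Simulates a keyed rolling reduce in plain Java (not Flink API).
     * keys[i] and values[i] together form the i-th input element.
     */
    static List<String> rollingReduce(String[] keys, int[] values, BinaryOperator<Integer> fn) {
        Map<String, Integer> state = new HashMap<>(); // one accumulator per key
        List<String> emitted = new ArrayList<>();
        for (int i = 0; i < keys.length; i++) {
            // The first value seen for a key is stored as-is;
            // every later value is combined with the stored accumulator via fn.
            Integer updated = state.merge(keys[i], values[i], fn);
            emitted.add("(" + keys[i] + "," + updated + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] names = {"Apollo", "Paul", "Tom", "Paul", "Apollo", "Tom", "Marry"};

        // Word count: every element carries 1, and the reduce step adds the counts.
        int[] ones = {1, 1, 1, 1, 1, 1, 1};
        System.out.println(rollingReduce(names, ones, Integer::sum));
        // prints [(Apollo,1), (Paul,1), (Tom,1), (Paul,2), (Apollo,2), (Tom,2), (Marry,1)]

        // Running maximum: the reduce step keeps the larger of the two values.
        int[] scores = {1, 3, 2, 1, 2, 4, 6};
        System.out.println(rollingReduce(names, scores, Integer::max));
        // prints [(Apollo,1), (Paul,3), (Tom,2), (Paul,3), (Apollo,2), (Tom,4), (Marry,6)]
    }
}
```

Seen this way, the running maximum in this two-field example is just a reduce whose combining function is "keep the larger value", while word count is a reduce whose combining function is addition.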