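```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.AllWindowedStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindowAll {
    private static final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // Each line read from the socket is expected to hold a single integer.
    private static final DataStreamSource<String> stream =
            env.socketTextStream("192.168.8.111", 8888);

    public static void main(String[] args) throws Exception {
        SingleOutputStreamOperator<Integer> mapped =
                stream.map((MapFunction<String, Integer>) Integer::valueOf).returns(Types.INT);
        // Non-keyed count window: counts elements across the whole stream.
        AllWindowedStream<Integer, GlobalWindow> countWindowAll = mapped.countWindowAll(5);
        // For a non-tuple stream, position 0 refers to the element itself.
        SingleOutputStreamOperator<Integer> summed = countWindowAll.sum(0);
        summed.print();
        env.execute("CountWindowAll");
    }
}
```

`mapped.countWindowAll(5)` creates a window with a size of `5` elements: the window fires and emits the sum each time 5 elements have arrived on the stream as a whole.

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

public class CountWindow {
    private static final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
    // Each line read from the socket is expected to look like "key value", e.g. "a 1".
    private static final DataStreamSource<String> stream =
            env.socketTextStream("192.168.8.111", 8888);

    public static void main(String[] args) throws Exception {
        SingleOutputStreamOperator<Tuple2<String, Integer>> mapped =
                stream.map((MapFunction<String, Tuple2<String, Integer>>) item -> {
                    String[] data = item.split(" ");
                    return Tuple2.of(data[0], Integer.valueOf(data[1]));
                }).returns(Types.TUPLE(Types.STRING, Types.INT));
        // Key by the first tuple field; a key selector replaces the deprecated keyBy(0).
        KeyedStream<Tuple2<String, Integer>, String> keyed = mapped.keyBy(t -> t.f0);
        // Keyed count window: every key keeps its own element count.
        WindowedStream<Tuple2<String, Integer>, String, GlobalWindow> countWindow =
                keyed.countWindow(5);
        // Sum the second tuple field (position 1) within each window.
        SingleOutputStreamOperator<Tuple2<String, Integer>> summed = countWindow.sum(1);
        summed.print();
        env.execute("CountWindow");
    }
}
```

After the stream is grouped with `keyBy`, a `CountWindow` fires only when the number of elements in a given group reaches the configured count (`5` here). Each group is counted independently, so one group receiving many elements does not trigger the window of another group.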
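To make the difference between the two windows concrete, here is a minimal plain-Java sketch that mimics the firing behavior described above without using Flink at all. It is only an illustration under simplifying assumptions, not how Flink implements windows: the class `CountWindowSketch` and its two methods are hypothetical names, the input is a finite list instead of a socket stream, and the output strings only imitate how a `Tuple2` prints.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class: simulates count-window firing, not Flink's implementation.
public class CountWindowSketch {

    // Non-keyed case: one counter for the whole input.
    // Emits the sum each time `size` elements have been seen, then starts over.
    public static List<Integer> countWindowAll(List<Integer> input, int size) {
        List<Integer> results = new ArrayList<>();
        int count = 0;
        int sum = 0;
        for (int value : input) {
            sum += value;
            count++;
            if (count == size) {
                results.add(sum);
                count = 0;
                sum = 0;
            }
        }
        // Leftover elements that never filled a window produce no output.
        return results;
    }

    // Keyed case: every key has its own counter and running sum.
    // Each input line is "key value"; emits "(key,sum)" when that key reaches `size` elements.
    public static List<String> countWindow(List<String> lines, int size) {
        Map<String, int[]> state = new HashMap<>(); // key -> {count, sum}
        List<String> results = new ArrayList<>();
        for (String line : lines) {
            String[] data = line.split(" ");
            String key = data[0];
            int value = Integer.parseInt(data[1]);
            int[] s = state.computeIfAbsent(key, k -> new int[2]);
            s[0]++;
            s[1] += value;
            if (s[0] == size) {
                results.add("(" + key + "," + s[1] + ")");
                state.remove(key); // the next element of this key starts a new window
            }
        }
        return results;
    }

    public static void main(String[] args) {
        // Seven elements, window size 5: only the first five form a complete window.
        System.out.println(countWindowAll(List.of(1, 2, 3, 4, 5, 6, 7), 5)); // [15]

        // Key "a" reaches 5 elements and fires; key "b" only has 3 and stays silent.
        System.out.println(countWindow(
                List.of("a 1", "b 10", "a 2", "a 3", "b 20", "a 4", "a 5", "b 30"), 5)); // [(a,15)]
    }
}
```

In the keyed run, eight elements arrive in total, yet only key `a` produces a result, because the count is tracked per group rather than across the whole stream.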