🔥码云GVP开源项目 12k star Uniapp+ElementUI 功能强大 支持多语言、二开方便! 广告
```java public class TumblingWindowAll { private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888); public static void main(String[] args) throws Exception { SingleOutputStreamOperator<Integer> mapped = stream.map((MapFunction<String, Integer>) Integer::valueOf).returns(Types.INT); AllWindowedStream<Integer, TimeWindow> timeWindowAll = mapped.timeWindowAll(Time.seconds(5)); SingleOutputStreamOperator<Integer> summed = timeWindowAll.sum(0); summed.print(); env.execute("TumblingWindowAll"); } } ``` 窗口大小是5秒,时间间隔是5秒。换句话说,就是每隔五秒,统计五秒内的结果。 ```java public class TumblingWindow { private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888); public static void main(String[] args) throws Exception { SingleOutputStreamOperator<Tuple2> mapped = stream.map((MapFunction<String, Tuple2>) item -> { String[] data = item.split(" "); return Tuple2.of(data[0], Integer.valueOf(data[1])); }).returns(Types.TUPLE(Types.STRING, Types.INT)); KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0); WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow = keyed.timeWindow(Time.seconds(5)); SingleOutputStreamOperator<Tuple2> summed = timeWindow.sum(1); summed.print(); env.execute("TumblingWindow"); } } ``` 分组后,每隔五秒,统计一次各个分组的情况。若某些分组数据无变化,则不打印无变化的分组。 ```java public class TumblingProcessingTimeWindowsTest { private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888); public static void main(String[] args) throws Exception { SingleOutputStreamOperator<Tuple2> mapped = stream.map((MapFunction<String, Tuple2>) item -> { String[] data = item.split(" "); return Tuple2.of(data[0], Integer.valueOf(data[1])); }).returns(Types.TUPLE(Types.STRING, Types.INT)); KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0); WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow = keyed.window(TumblingProcessingTimeWindows.of(Time.seconds(5))); SingleOutputStreamOperator<Tuple2> summed = timeWindow.sum(1); summed.print(); env.execute("TumblingWindow"); } } ```