💎一站式轻松地调用各大LLM模型接口,支持GPT4、智谱、星火、月之暗面及文生图 广告
```java public class SlidingWindowAll { private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888); public static void main(String[] args) throws Exception { SingleOutputStreamOperator<Integer> mapped = stream.map((MapFunction<String, Integer>) Integer::valueOf).returns(Types.INT); AllWindowedStream<Integer, TimeWindow> timeWindowAll = mapped.timeWindowAll(Time.seconds(5), Time.seconds(1)); SingleOutputStreamOperator<Integer> summed = timeWindowAll.sum(0); summed.print(); env.execute("SlidingWindowAll"); } } ``` `mapped.timeWindowAll(Time.seconds(5), Time.seconds(1));` 每隔一秒,计算出五秒内的数据。 ```java public class SlidingWindow { private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888); public static void main(String[] args) throws Exception { SingleOutputStreamOperator<Tuple2> mapped = stream.map((MapFunction<String, Tuple2>) item -> { String[] data = item.split(" "); return Tuple2.of(data[0], Integer.valueOf(data[1])); }).returns(Types.TUPLE(Types.STRING, Types.INT)); KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0); WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow = keyed.timeWindow(Time.seconds(5), Time.seconds(1)); SingleOutputStreamOperator<Tuple2> summed = timeWindow.sum(1); summed.print(); env.execute("SlidingWindow"); } } ``` 这是分组的情况。 timeWindow方法,传递一个参数,是滚动窗口。传入两个窗口,是滑动窗口。 也可以用Window方法: ```java public class SlidingProcessingTimeWindowsTest { private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888); public static void main(String[] args) throws Exception { SingleOutputStreamOperator<Tuple2> mapped = stream.map((MapFunction<String, Tuple2>) item -> { String[] data = item.split(" "); return Tuple2.of(data[0], Integer.valueOf(data[1])); }).returns(Types.TUPLE(Types.STRING, Types.INT)); KeyedStream<Tuple2, Tuple> keyed = mapped.keyBy(0); WindowedStream<Tuple2, Tuple, TimeWindow> timeWindow = keyed.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1))); SingleOutputStreamOperator<Tuple2> summed = timeWindow.sum(1); summed.print(); env.execute("SlidingProcessingTimeWindowsTest"); } } ```