多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
假定Session时间设为5秒,那么只要5秒内保持数据流入,那么就认为数据Session没有中断,否则就划分一个Session。 ``` 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 ``` 两个数据之间的距离不超过5秒,就在一个Session中。 ```java public class SessionWindowAll { private static final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); private static final DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888); public static void main(String[] args) throws Exception { SingleOutputStreamOperator<Integer> mapped = stream.map((MapFunction<String, Integer>) Integer::valueOf).returns(Types.INT); AllWindowedStream<Integer, TimeWindow> processingTimeSessionWindows = mapped.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(5))); SingleOutputStreamOperator<Integer> summed = processingTimeSessionWindows.sum(0); summed.print(); env.execute("SessionWindowAll"); } } ``` 换句话说,输入一个数据超过`5`秒,就会计算结果。