企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
按照数据所携带的时间来划分窗口

```java
/**
 * Example job: reads comma-separated lines from a socket, assigns each line an event-time
 * timestamp taken from its first field, groups ALL records (non-keyed) into event-time
 * session windows with a 5-second gap, and prints the per-session sum of the third field.
 *
 * <p>Expected input line format, e.g. {@code 1000,spark,1}:
 * <ol>
 *   <li>field 0 - timestamp, parsed with {@link Long#parseLong(String)}</li>
 *   <li>field 1 - a word</li>
 *   <li>field 2 - a count, parsed with {@link Integer#valueOf(String)}</li>
 * </ol>
 *
 * <p>Malformed lines are not handled: a line with fewer than three fields or with
 * non-numeric values in fields 0 or 2 makes the parsing code throw.
 */
public class EventTimeSessionWindowAll {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Build the environment and the source inside main rather than in static initializers,
        // so that setup failures surface as ordinary exceptions from main.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Use event time (the time carried by the records) as the time standard.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStreamSource<String> stream = env.socketTextStream("192.168.8.111", 8888);

        // Extract the time field from each record and use it as the event time.
        // This only attaches a timestamp; the record itself is passed on unchanged.
        // The allowed out-of-orderness is zero seconds.
        SingleOutputStreamOperator<String> dataStream = stream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(0)) {
                    @Override
                    public long extractTimestamp(String item) {
                        // Sample record: 1000,spark,1
                        String[] data = item.split(",");
                        return Long.parseLong(data[0]);
                    }
                });

        // Parse each line into a typed (timestamp, word, count) tuple. The explicit returns(...)
        // hint supplies the tuple's field types, which the lambda alone does not carry.
        SingleOutputStreamOperator<Tuple3<Long, String, Integer>> mapped = dataStream
                .map((MapFunction<String, Tuple3<Long, String, Integer>>) item -> {
                    String[] data = item.split(",");
                    return Tuple3.of(Long.parseLong(data[0]), data[1], Integer.valueOf(data[2]));
                })
                .returns(Types.TUPLE(Types.LONG, Types.STRING, Types.INT));

        // Non-keyed session windows: a session is closed by a 5-second gap in event time.
        AllWindowedStream<Tuple3<Long, String, Integer>, TimeWindow> eventTimeSessionWindowAll =
                mapped.windowAll(EventTimeSessionWindows.withGap(Time.seconds(5)));

        // Sum tuple field index 2 (the count) within each session window and print the result.
        eventTimeSessionWindowAll.sum(2).print();

        env.execute("EventTimeSessionWindowAll");
    }
}
```