ThinkChat2.0新版上线,更智能更精彩,支持会话、画图、阅读、搜索等,送10W Token,即刻开启你的AI之旅 广告
```java StreamExecutionEnvironment.getExecutionEnvironment(); ``` 根据环境判断是本地环境还是集群环境,来创建运行环境。 ```java DataStream<String> lines = env.socketTextStream("192.168.8.111", 8888); ``` DataStreamSource是DataStream的实现类。 ![](https://img.kancloud.cn/ae/09/ae091933d705118a6f167f5353876d39_327x393.png) DataStream是抽象的数据集,不实际装数据,只是数据集的描述。 通过转换方法可以被转换成其他的DataStream。 ```java DataStreamSource<String> lines = env.fromElements(); ``` fromElements方法,通常用来做实验的。(这只是一个玩具 ^_^ ) ```java DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8); ``` 同理,fromCollection和fromElements方法类似,只不过它是个集合。 ```java DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8)); ``` 获取并行度 [getParallelism方法] ```java streamSource.getParallelism() ``` ```java public class WordCountStreamingJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14)); /** * fromCollection返回的DataStreamSource并行度为1 */ System.out.println(streamSource.getParallelism()); SingleOutputStreamOperator<Integer> filtered = streamSource.filter(new FilterFunction<Integer>() { @Override public boolean filter(Integer value) throws Exception { return value % 2 == 0; } }); /** * filter返回的DataStreamSource并行度为12 */ System.out.println(filtered.getParallelism()); filtered.print(); env.execute("WordCountStreamingJob"); } } ``` 并行度在程序执行前,程序已经知道了,以为它只是一个描述信息,已经知道了又几个并行。 ```java env.socketTextStream("192.168.8.111", 8888); // 并行度也为1 ``` --- 综上所述: socketTextStream、fromElements、fromCollection返回DataStream的并行度默认均为1。 可以通过<u>setParallelism</u>方法进行设置并行度。 ```java filtered.setParallelism(6); ```