```java
StreamExecutionEnvironment.getExecutionEnvironment();
```
根据环境判断是本地环境还是集群环境,来创建运行环境。
```java
DataStream<String> lines = env.socketTextStream("192.168.8.111", 8888);
```
DataStreamSource是DataStream的实现类。
![](https://img.kancloud.cn/ae/09/ae091933d705118a6f167f5353876d39_327x393.png)
DataStream是抽象的数据集,不实际装数据,只是数据集的描述。
通过转换方法可以被转换成其他的DataStream。
```java
DataStreamSource<String> lines = env.fromElements();
```
fromElements方法,通常用来做实验的。(这只是一个玩具 ^_^ )
```java
DataStreamSource<Integer> streamSource = env.fromElements(1, 2, 3, 4, 5, 6, 7, 8);
```
同理,fromCollection和fromElements方法类似,只不过它是个集合。
```java
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
```
获取并行度 [getParallelism方法]
```java
streamSource.getParallelism()
```
```java
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Integer> streamSource = env.fromCollection(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14));
/**
* fromCollection返回的DataStreamSource并行度为1
*/
System.out.println(streamSource.getParallelism());
SingleOutputStreamOperator<Integer> filtered = streamSource.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value % 2 == 0;
}
});
/**
* filter返回的DataStreamSource并行度为12
*/
System.out.println(filtered.getParallelism());
filtered.print();
env.execute("WordCountStreamingJob");
}
}
```
并行度在程序执行前,程序已经知道了,以为它只是一个描述信息,已经知道了又几个并行。
```java
env.socketTextStream("192.168.8.111", 8888); // 并行度也为1
```
---
综上所述:
socketTextStream、fromElements、fromCollection返回DataStream的并行度默认均为1。
可以通过<u>setParallelism</u>方法进行设置并行度。
```java
filtered.setParallelism(6);
```
- Flink简介
- flink搭建standalone模式与测试
- flink提交任务(界面方式)
- Flink项目初始化
- Java版WordCount(匿名类)
- Java版WordCount(lambda)
- Scala版WordCount
- Java版WordCount[批处理]
- Scala版WordCount[批处理]
- 流处理非并行的Source
- 流处理可并行的Source
- kafka的Source
- Flink算子(Map,FlatMap,Filter)
- Flink算子KeyBy
- Flink算子Reduce和Max与Min
- addSink自定义Sink
- startNewChain和disableChaining
- 资源槽slotSharingGroup
- 计数窗口
- 滚动窗口
- 滑动窗口
- Session窗口
- 按照EventTime作为标准