```java
DataStreamSource<Long> streamSource = env.fromParallelCollection(new NumberSequenceIterator(1, 10), Long.class);
```
```java
DataStreamSource<Long> streamSource = env.fromParallelCollection(new NumberSequenceIterator(1, 10), TypeInformation.of(Long.TYPE));
```
查看并行度
```java
System.out.println(streamSource.getParallelism()); // 并行度是 12
```
可以设置并行度
```java
streamSource.setParallelism(4);
```
1到100的序列
```java
DataStreamSource<Long> streamSource = env.generateSequence(1, 100);
```
generateSequence得到的DataStreamSource的并行度是12。
以上的Source都是玩具,实际上不会使用的。
```java
env.readTextFile("C:\\Users\\Administrator\\Desktop\\data");
```
readTextFile的并行度是12。
```java
public class WordCountStreamingJob {
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> streamSource = env.readTextFile("C:\\Users\\Administrator\\Desktop\\data");
System.out.println(streamSource.getParallelism());
SingleOutputStreamOperator<Tuple2> wordAndOne = streamSource.flatMap((FlatMapFunction<String, Tuple2>) (line, out) -> {
for (String word : line.split(" ")) {
out.collect(Tuple2.of(word, 1));
}
}).returns(Types.TUPLE(Types.STRING, Types.INT));
System.out.println(wordAndOne.getParallelism());
wordAndOne.keyBy(0).sum(1).print();
env.execute("WordCountStreamingJob");
}
}
```
综上所述:
readTextFile、fromParallelCollection、generateSequence它们的并行度是跟可用的逻辑核数是相关的。可以多并行的。
readTextFile也不是可以一直运行的。socketTextStream是可以一直运行的,但是并行度是1。
- Flink简介
- flink搭建standalone模式与测试
- flink提交任务(界面方式)
- Flink项目初始化
- Java版WordCount(匿名类)
- Java版WordCount(lambda)
- Scala版WordCount
- Java版WordCount[批处理]
- Scala版WordCount[批处理]
- 流处理非并行的Source
- 流处理可并行的Source
- kafka的Source
- Flink算子(Map,FlatMap,Filter)
- Flink算子KeyBy
- Flink算子Reduce和Max与Min
- addSink自定义Sink
- startNewChain和disableChaining
- 资源槽slotSharingGroup
- 计数窗口
- 滚动窗口
- 滑动窗口
- Session窗口
- 按照EventTime作为标准