# **任务:**
currentTime=$(date "+%Y%m%d%H%M%S")
source /etc/profile &> /dev/null
FLINK\_HOME=/var/lib/hadoop-hdfs/softwares/flink
${FLINK\_HOME}/bin/flink run -t yarn-per-job \\
\--detached \\
\-c com.sobot.icall.analysis.job.ICallDataAnalysisJob \\
\-p 2 \\
\-D taskmanager.numberOfTaskSlots=1 \\
\-D yarn.appmaster.vcores=1 \\
\-D yarn.containers.vcores=1 \\
\-D yarn.application.name=ICallDataAnalysisJob \\
\-D taskmanager.memory.process.size=1024m \\
\-D jobmanager.memory.process.size=1024m \\
\-s hdfs://cdh.test.ten.sobot.com:8020/data/flink/stream/icall-data-anasysis/savepoint/savepoint-92d9ba-63e37de2e4d1 \\
\--allowNonRestoredState \\
/var/lib/hadoop-hdfs/flinkJob/icall-data-analysis.jar \\
\--systemScheme hdfs://cdh.test.ten.sobot.com:8020 \\
\--kafkaParallelism 3 \\
\> /var/lib/hadoop-hdfs/flinkJob/icall-data-analysis/logs/icall-data-analysis-${currentTime}.log 2>&1 &
参数解释:
\-c == --class 指定jar包运行类
\-p == --parallelism 指定并行度
\-D 设定任务运行相关参数
\-s == --fromSavepoint 使用原有保存快照来重起工作
\--allowNonRestoredState只允许 job从状态快照(保存点或 checkpoints)启动,该快照包含在正在启动的 job中无处可还原的状态。换言之,一些 state被撤销了。
\--kafkaParallelism 3 写入Kafka并行度 ,jar包自定义传入参数
2>&1 : 对于command>a 2>&1这条命令,等价于command 1>a 2>&1
2就是标准错误,1是标准输出,那么这条命令不就是相当于把标准错误重定向到标准输出(0 表示stdin标准输入,1 表示stdout标准输出,2 表示stderr标准错误)
# **1、日志的配置:**
log4j-cli.properties:由 Flink 命令行客户端使用(例如 flink run)(不包括在集群上执行的代码)。这个文件是我们使用flink run提交任务时,任务提交到集群前打印的日志所需的配置。
log4j-session.properties:Flink 命令行客户端在启动 YARN 或 Kubernetes session 时使用(yarn-session.sh,kubernetes-session.sh)。
log4j.properties:作为 JobManager/TaskManager 日志配置使用(standalone 和 YARN 两种模式下皆使用)
日志文件可以设置滚动策略,具体设置如下:(log4j.properties)
\# Allows this configuration to be modified at runtime. The file will be checked every 60 seconds.
monitorInterval=60
\# 滚动日志的配置
\# This affects logging for both user code and Flink
rootLogger.level = DEBUG
rootLogger.appenderRef.rolling.ref = RollingFileAppender
\# Uncomment this if you want to \_only\_ change Flink's logging
#logger.flink.name = org.apache.flink
#logger.flink.level = INFO
logger.sobot.name = com.sobot
logger.sobot.level = INFO
logger.sobot.additivity = false
\# The following lines keep the log level of common libraries/connectors on
\# log level INFO. The root logger does not override this. You have to manually
\# change the log levels here.
logger.akka.name = akka
logger.akka.level = INFO
logger.kafka.name= org.apache.kafka
logger.kafka.level = INFO
logger.hadoop.name = org.apache.hadoop
logger.hadoop.level = INFO
logger.zookeeper.name = org.apache.zookeeper
logger.zookeeper.level = INFO
\# Log all infos in the given rolling file
appender.rolling.name = RollingFileAppender
appender.rolling.type = RollingFile
appender.rolling.append = false
#日志文件名
appender.rolling.fileName = ${sys:log.file}
#指定当发生文件滚动时,文件重命名规则
appender.rolling.filePattern = ${sys:log.file}.%i
appender.rolling.layout.type = PatternLayout
\# 输出模板
appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\# 指定记录文件的保存策略,该策略主要是完成周期性的日志文件保存工作
appender.rolling.policies.type = Policies
\# 基于日志文件大小的触发策略
appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
\# 当日志文件大小大于size指定的值时,触发滚动
appender.rolling.policies.size.size = 5M
\# 文件保存的覆盖策略
appender.rolling.strategy.type = DefaultRolloverStrategy
\# 生成分割(保存)文件的个数,默认为5(-1,-2,-3,-4,-5)
appender.rolling.strategy.max = 10
\# Suppress the irrelevant (wrong) warnings from the Netty channel handler
logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
logger.netty.level = OFF
## **2、yarn中需要占用多少资源是怎么决定的:**
给定参数,指定taskmanager拥有多少slot和内存,然后在根据任务的共享分组数和算子的并行度来分配资源,算子未设置分组和并行度就采用全局默认值。同一分组可以共用slot,算子并行度要小于等于分组内slot数量,否则重分配slot给该分组。任务链条的分配与前后并行度不同和shuffle操作有关。
## **3、Flink UI中需要使用启动几个Slot是如何决定的:**
Slot数量与任务并行度和设置的共享组有关,同一任务在同一共享组可以共用slot,且该共享组solt数量与它运行的任务并行度相等,不同共享组需要不同solt,一个处理流程可能不同算子设置不同共享组,那么slot总数是共享组solt数之和。
## **4、任务提交(on yarn)流程:**
![](images/screenshot_1639971129678.png)
1.Client先上传flink任务所需jar包至HDFS,便于JobManager和TaskManager共享HDFS的数据。
2.Client接着向Yarn ResourceManager 提交任务,ResouceManager接到请求后,先分配container资源,然后选取某一个NodeManager启动ApplicationMaster。
3.ApplicationMaster启动后加载flink的jar包和配置构建环境,然后启动JobManager,然后JobManager会分析当前的作业图,将它转化成执行图(包含了所有可以并发执行的任务),从而知道当前需要的具体资源。
4.JobManager会向ResourceManager申请资源,ResouceManager接到请求后,继续分配container资源,然后通知ApplictaionMaster启动更多的TaskManager(先分配好container资源,再到指定NodeManager启动TaskManager)。NodeManager在启动TaskManager时也会从HDFS加载数据。
5.最后,TaskManager启动后,会向JobManager发送心跳包。JobManager向TaskManager分配任务。
# **5、状态后端:**
## **State Backends 的作用**
有状态的流计算是Flink的一大特点,状态本质上是数据,数据是需要维护的,例如数据库就是维护数据的一种解决方案。State Backends 的作用就是用来维护State的。一个 State Backend 主要负责两件事:Local State Management(本地状态管理) 和 Remote State Checkpointing(远程状态备份)。
## **Local State Management(本地状态管理)**
State Management 的主要任务是确保状态的更新和访问。类似于数据库系统对数据的管理,State Backends 的状态管理就是提供对 State 的访问或更新操作,从这一点上看,State Backends 与数据库很相似。Flink 提供的 State Backends 主要有两种形式的状态管理:
·直接将 State 以对象的形式存储到JVM的堆上面。
·将 State 对象序列化后存储到 RocksDB 中(RocksDB会写到本地的磁盘上)
以上两种方式,第一种存储到JVM堆中,因为是在内存中读写,延迟会很低,但State的大小受限于内存的大小;第二种方式存储到State Backends上(本地磁盘上),读写较内存会慢一些,但不受内存大小的限制,同时因为state存储在磁盘上,可以减少应用程序对内存的占用。根据使用经验,对延迟不是特别敏感的应用,选择第二种方式较好,尤其是State比较大的情况下。
## **Remote State Checkpointing(远程状态备份)**
Flink程序是分布式运行的,而State都是存储到各个节点上的,一旦TaskManager节点出现问题,就会导致State的丢失。State Backend 提供了 State Checkpointing 的功能,将 TaskManager 本地的 State 的备份到远程的存储介质上,可以是分布式的存储系统或者数据库。不同的 State Backends 备份的方式不同,会有效率高低的区别。
## **FLink 目前提供了三种状态后端:**
### **1、MemoryStateBackend**
对于状态管理,MemoryStateBackend直接将State对象存储到TaskManager的JVM堆上,如MapState会被存储为一个HashMap对象。can suffer from garbage collection pauses because it puts many long-lived objects on the heap.
对于远程备份,MemoryStateBackend会将State备份到JobManager的堆内存上,这种方式是非常不安全的,且受限于JobManager的内存大小。
### **2、FsStateBackend**
对于状态管理,FsStateBackend与MemoryStateBackend一样,将State存储到TaskManager的JVM堆上。
对于远程备份,FsStateBackend会将State写入到远程的文件系统,如HDFS中。
### **3、RocksDBStateBackend**
对于状态管理,RocksDBStateBackend将state存储到TaskManager节点上的RocksDB数据库实例上。
对于远程备份,RocksDBstateBackend会将State备份到远程的存储系统中。
# **6、Flink Checkpoint 深入理解**
## **1、flink中state(状态)**
·state泛指:flink中有状态函数和运算符在各个元素(element)/事件(event)的处理过程中存储的数据(注意:状态数据可以修改和查询,可以自己维护,根据自己的业务场景,保存历史数据或者中间结果到状态(state)中);
使用状态计算的例子:
·当应用程序搜索某些事件模式时,状态将存储到目前为止遇到的事件序列。
·在每分钟/小时/天聚合事件时,状态保存待处理的聚合。
·当在数据点流上训练机器学习模型时,状态保持模型参数的当前版本。
·当需要管理历史数据时,状态允许有效访问过去发生的事件。
## **为什么需要state管理**
流式作业的特点是7\*24小时运行,数据不重复消费,不丢失,保证只计算一次,数据实时产出不延迟,但是当状态很大,内存容量限制,或者实例运行奔溃,或需要扩展并发度等情况下,如何保证状态正确的管理,在任务重新执行的时候能正确执行,状态管理就显得尤为重要。
## **2、什么是checkpoint**
·checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够通过checkpoint将整个应用流图的状态恢复到故障之前的某一状态,保证应用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。 (分布式快照算)
·每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。
## **3、checkpoint的过程**
1.JobManager端的 CheckPointCoordinator向 所有SourceTask发送CheckPointTrigger,Source Task会在数据流中安插CheckPoint barrier
2.当task收到所有的barrier后,向自己的下游继续传递barrier,然后自身执行快照,并将自己的状态异步写入到持久化存储中。增量CheckPoint只是把最新的一部分更新写入到 外部存储;为了下游尽快做CheckPoint,所以会先发送barrier到下游,自身再同步进行快照
3.当task完成备份后,会将备份数据的地址(state handle)通知给JobManager的CheckPointCoordinator;如果CheckPoint的持续时长超过 了CheckPoint设定的超时时间,CheckPointCoordinator 还没有收集完所有的 State Handle,CheckPointCoordinator就会认为本次CheckPoint失败,会把这次CheckPoint产生的所有 状态数据全部删除。
4.最后 CheckPoint Coordinator 会把整个 StateHandle 封装成 completed CheckPoint Meta,写入到hdfs或其他文件系统。
## **4、从checkpoint的恢复**
当我们任务执行过程中出现失败后仍能从设定的checkpoint地址恢复任务失败前状态。默认情况下,如果设置了Checkpoint选项,则Flink只保留最近成功生成的1个Checkpoint,而当Flink程序失败时,可以从最近的这个Checkpoint来进行恢复。如果我们希望保留多个Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前。(但因有一定的时间间隔,每次checkpoint的间隙势必出现重复消费处理数据的情况,设置极短的间隙则会产生大量的check文件和I/O开销,增加执行任务负担,故要合理设置时间和保留适量check文件)
## **开启checkpoint例子:**
// 设置保存点的保存路径,这里是保存在hdfs中
env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints"));
CheckpointConfig config = env.getCheckpointConfig();
// 任务流取消和故障应保留检查点
config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN\_ON\_CANCELLATION);
// 保存点模式:exactly\_once
config.setCheckpointingMode(CheckpointingMode.EXACTLY\_ONCE);
// 触发保存点的时间间隔
config.setCheckpointInterval(60000);
## **为了确保作业在失败后能自动恢复,我们可以设置重启策略,例如失败后最多重启3次,每次重启间隔10s:**
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000));
## **那如果重启3次后仍失败呢?**
## **这个时候因为设置了任务完全失败后保留check文件**
// 任务流取消和故障应保留检查点 config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN\_ON\_CANCELLATION);
我们可以找到对应的check文件
Shell模式下:bin/flink run** \-s (指定文件地址如:hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/\_metadata)**
## **5、checkpoint与savepoint**
1、checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场。
2、savepoint是“通过checkpoint机制”创建的,所以savepoint本质上是特殊的checkpoint。
3、checkpoint面向Flink Runtime本身,由Flink的各个TaskManager定时触发快照并自动清理,一般不需要用户干预;savepoint面向用户,完全根据用户的需要触发与清理。
4、checkpoint的频率往往比较高(因为需要尽可能保证作业恢复的准确度),所以checkpoint的存储格式非常轻量级,但作为trade-off牺牲了一切可移植(portable)的东西,比如不保证改变并行度和升级的兼容性。savepoint则以二进制形式存储所有状态数据和元数据,执行起来比较慢而且“贵”,但是能够保证portability,如并行度改变或代码升级之后,仍然能正常恢复。
5、checkpoint是支持增量的(通过RocksDB),特别是对于超大状态的作业而言可以降低写入成本。savepoint并不会连续自动触发,所以savepoint没有必要支持增量。
## **6、什么是barrier对齐与不对齐**
![](images/screenshot_1639971438145.png)
·一旦Operator从输入流接收到CheckPoint barrier n,它就不能处理来自该流的任何数据记录,直到它从其他所有输入接收到barrier n为止。否则,它会混合属于快照n的记录和属于快照n + 1的记录;
·上图中第2个图,虽然数字流对应的barrier n已经到达了,但是barrier n之后的1、2、3这些数据只能放到buffer中,等待字母流的barrier n到达;
·一旦最后所有输入流都接收到barrier n,Operator就会把缓冲区中pending 的输出数据发出去,然后把CheckPoint barrier n+1接着往下游发送。
·这里还会对自身进行快照;之后,Operator将继续处理来自所有输入流的记录,在处理来自流的记录之前先处理来自输入缓冲区的记录。
·上述图2中,当还有其他输入流的barrier还没有到达时,会把已到达的barrier之后的数据1、2、3搁置在缓冲区,等待其他流的barrier到达后才能处理,这便是barrier对齐了。
·barrier不对齐就是指当还有其他流的barrier还没到达时,为了不影响性能,也不用理会,直接处理barrier之后的数据。等到所有流的barrier的都到达后,就可以对该Operator做CheckPoint了;
## **为什么要进行barrier对齐?不对齐到底行不行?**
答:Exactly Once(精准一次)时必须barrier对齐,barrier不对齐就变成了At Least Once(至少一次);
**详情来源:**[**https://blog.csdn.net/qq\_43081842/article/details/112161557**](https://blog.csdn.net/qq_43081842/article/details/112161557)
# 7、**算子相关**
### **Dataset与Dataframe很多算子相同,不过一个偏向批处理,一个则是流处理。**
**流处理是很难对全部数据进行全局操作的,除开窗或处理离线数据即批处理。**
## **Source 算子:**
**fromCollection**:从本地集合读取数据,参数可以是可迭代的对象或能转迭代的集合类。(流批处理)
**fromElements**: 直接按个的输入任意类型数据,可以用作简单示例。(流批处理)
**fromSource**: 指定数据来源,可以是数据库,消息队列,socket等(原:addSource)(流批处理)
**readTextFile**: 从指定文件路径读取文件。可以是hdfs或本地等等。(流批处理)
**readFile**: 直接读取文件,需要提前获取文件当参数。(流批处理)
## **Transform转换算子:**
**Map**: map是最基本的数据转换算子,可以用清洗数据与转换。参数可以是自定义实现的MapFunction或匿名(Lamaba)函数。(流批处理)
**Filter**: filter是过滤筛选算子,可以用作筛选符合要求的数据。参数可以是自定义实现的FilterFunct或匿名(Lamaba)函数。(流批处理)
**FlatMap**: 与map算子类似,属于转换算子,用于进行数据扁平化(降维)处理。常作为多维数据转一维且也能起到数据过滤或减少的作用。参数如以上......(流批处理)
**mapPartition**: 同map功能,不同在于它按分区来组织数据放入迭代器传给一个函数处理。map则是每个数据应用一次函数处理。优点是减少开辟函数,缺点是数据收集到迭代器需要很大的内存空间,容易OOM。(批处理
**Reduce**: 可以对一个dataset或者一个group指定字段来进行聚合计算,最终同字段聚合成一个元素。可以用作数据统计或合并。(Key流批处理)
**Fold**: 具有初始值的被Keys化数据流上的“滚动”折叠。将当前数据元与最后折叠的值组合并发出新值 (Key流处理)
**ReduceGroup**: 同Reduce算子,不同点在于它会先在各计算(task)节点上进行分组reduce,在将所有数据做整体reduce。这样做的好处就是可以减少网络IO。(批处理)
**KeyBy**: 逻辑上将流分区为不相交的分区。具有相同Keys的所有记录都分配给同一分区。在内部,keyBy是使用散列分区实现的。指定键有不同的方法。(流批处理)
**SortPartition**: 根据指定的字段值进行分区的排序,会将同key放至同分区。(批处理)
**minBy和maxBy**: 选择具有最小值或最大值的元素。(Key流批处理)
**Distinct**: 去除重复的数据(批处理)
**First:** 返回前n条数据(批处理)
**Join**: 将两个DataSet按照一定条件连接到一起,形成新的DataSet,类似表关联。还有leftOuterJoin:左外连接,
**rightOuterJoin**:右外连接,fullOuterJoin:全连接 (批处理)
**Cross**: 交叉操作,通过形成这个数据集和其他数据集的笛卡尔积,创建一个新的数据集和join类似,但是这种交叉操作会产生笛卡尔积,在数据比较大的时候,是非常消耗内存的操作。(批处理)
**Window**: 可以在已经分区的KeyedStream上定义Windows。Windows根据某些特征(例如,在最后5秒内到达的数据)对每个Keys中的数据进行分窗。(Key流处理)
**WindowAll**: Windows可以在常规DataStream上定义。Windows根据某些特征(例如,在最后5秒内到达的数据)对所有流事件进行分窗。(流处理)
**Connect**: “连接”两个保存其类型的数据流。连接允许两个流之间的共享状态(流批处理)
**Select**: 从可拆分流中选择一个或多个流(流批处理)
**Union**: 联合操作,创建包含来自该数据集和其他数据集的元素的新数据集,不会去重(流批处理)
**Rebalance**: 数据重分区,内部使用round robin方法将数据均匀打散。这对于数据倾斜时是很好的选择。(流批处理)
## **Sink算子:**
**Collect**: 将数据输出到本地集合(批处理)
**writeAsText**: 将数据输出到文件,包括本地文件,hdfs文件等(流批处理)
**addSink**: 添加sink接收数据,可以自定义接收器或使用指定sinkUtils。如JdbcSink...(流批处理)
**sinkTo**: 给定一个已经构建好的sink。(流批处理)
# **8、Flink时间机制**
Flink在流处理程序支持不同的时间概念。分别为Event Time/Processing Time/Ingestion Time,也就是事件时间、处理时间、提取时间。
从时间序列角度来说,发生的先后顺序是:
**事件时间(Event Time)----> 摄入时间(Ingestion Time)----> 处理时间(Processing Time)**
**处理时间**
是数据流入到具体某个算子时候相应的系统时间。
这个系统时间指的是执行相应操作的机器的系统时间。当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。
ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境或者异步环境中,ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。因为它容易受到从记录到达系统的速度(例如从消息队列)到记录在系统内的operator之间流动的速度的影响(停电,调度或其他)。
**提取时间**
IngestionTime是数据进入Apache Flink框架的时间,是在Source Operator中设置的。每个记录将源的当前时间作为时间戳,并且后续基于时间的操作(如时间窗口)引用该时间戳。
提取时间在概念上位于事件时间和处理时间之间。与处理时间相比,它稍早一些。IngestionTime与ProcessingTime相比可以提供更可预测的结果,因为IngestionTime的时间戳比较稳定(在源处只记录一次),所以同一数据在流经不同窗口操作时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。
与事件时间相比,提取时间程序无法处理任何无序事件或后期数据,但程序不必指定如何生成水位线。
在内部,提取时间与事件时间非常相似,但具有自动时间戳分配和自动水位线生成功能。
**事件时间**
事件时间就是事件在真实世界的发生时间,即每个事件在产生它的设备上发生的时间(当地时间)。比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。
在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。
基于事件时间处理的强大之处在于即使在乱序事件,延迟事件,历史数据以及从备份或持久化日志中的重复数据也能获得正确的结果。对于事件时间,时间的进度取决于数据,而不是任何时钟。
事件时间程序必须指定如何生成事件时间的Watermarks,这是表示事件时间进度的机制。
现在假设我们正在创建一个排序的数据流。这意味着应用程序处理流中的乱序到达的事件,并生成同样事件但按时间戳(事件时间)排序的新数据流。
比如:
有1~10个事件。
乱序到达的序列是:1,2,4,5,6,3,8,9,10,7
经过按 事件时间 处理后的序列是:1,2,3,4,5,6,7,8,9,10
为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每条数据都需要分配其事件时间戳。这通常通过提取每条数据中的固定字段来完成时间戳的获取。
# **9、Watermark机制**
**先看下flink是如何处理流式数据中的按时分段统计情况:**
**聚合类的处理:** Flink可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。所以Flink引入了窗口概念。
**窗口:** 窗口的作用为了周期性的获取数据。就是把传入的原始数据流切分成多个buckets,所有计算都在单一的buckets中进行。窗口(window)就是从 Streaming 到 Batch 的一个桥梁。
**带来的问题:** 聚合类处理带来了新的问题,比如乱序/延迟。其解决方案就是 Watermark / allowLateNess / sideOutPut 这一组合拳。
**三者的作用:**
**1、 Watermark **的作用是防止 数据乱序 / 指定时间内获取不到全部数据。
**2、 allowLateNess **是将窗口关闭时间再延迟一段时间。
**3、 sideOutPut**是最后兜底操作,当指定窗口已经彻底关闭后,就会把所有过期延迟数据放到侧输出流,让用户决定如何处理。
**一句话: 用Windows把流数据分块处理,用Watermark确定什么时候不再等待更早的数据/触发窗口进行计算,用allowLateNess 将窗口关闭时间再延迟一段时间。用sideOutPut 最后兜底把数据导出到其他地方。**
**三者的理解:**
**1、 Watermark** 标识的是当前流的时间段,可以通过在最大事件时间上设定延迟时间来改变窗口认为的当前流时间。**例如实际到达12.05的数据,设定延迟1分钟,那么对应窗口则认为流还处在12.04的时间段。只有当12.06事件数据到了后才认为这个窗口接收最后12.05的数据并触发窗口计算。**
**2、 allowLateNess **设置的时间是真正允许迟到的时间,**Watermark** 设定的延迟时间只是假设流可能会迟到,但认为延迟时间后流一定能到。**allowLateNess 允许在窗口收到对应Watermark** 要触发计算时仍保留一定时间的状态继续等待数据,超过等待时间后才开始真正计算。
**3、 sideOutPut **是最后兜底操作,当指定窗口已经彻底关闭后,就会把所有过期延迟数据放到侧输出流,让用户决定如何处理。
# **10、Flink迟到事件**
虽说水位线表明着早于它的事件不应该再出现,可以起到一定的数据乱序整理。但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。
迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:
·重新激活已经关闭的窗口并重新计算以修正结果。
·将迟到事件收集起来另外处理。
·将迟到事件视为错误消息并丢弃。
Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用**Side Output**和**Allowed Lateness。**
Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。
# **11、旁路输出(sideOutPut)**
除了来自数据流算子的主流结果输出之外,可以产生任意数量的流旁路输出结果。旁路输出结果数据类型与主流结果的数据类型以及其他旁路输出结果数据类型可以是完全不同的。当你需要分割数据流时,这个算子非常有用。**通常需要复制流,然后从每个数据流中过滤掉不需要的数据。也常用来处理延迟乱序数据,通过截取延迟数据另外计算在合并到最终数据流并依照原有时间顺序排序。**
可以通过以下函数发射数据到旁路输出。
·[ ProcessFunction](https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/stream/operators/process_function.html)
·CoProcessFunction
·[ ProcessWindowFunction](#processwindowfunction)
·ProcessAllWindowFunction
例子:
val input: DataStream\[Int\] = ...
val outputTag = OutputTag\[String\]("side-output")
val mainDataStream = input
.process(new ProcessFunction\[Int, Int\] {
override def processElement(
value: Int,
ctx: ProcessFunction\[Int, Int\]#Context,
out: Collector\[Int\]): Unit = {
// emit data to regular output
out.collect(value)
// emit data to side output
ctx.output(outputTag, "sideout-" + String.valueOf(value))
}
})
要读取旁路输出流,在数据流运算后使用**getSideOutput(OutputTag)**。此时将会获得键入旁路输出流的结果。
val sideOutputStream: DataStream\[String\] = mainDataStream.getSideOutput(outputTag)
# **12、Flink窗口机制**
窗口分类可以分成:翻滚窗口(Tumbling Window,无重叠),滑动窗口(Sliding Window,有重叠),和会话窗口,(Session Window,有间隙)
**滚动窗口**
滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。例如,如果指定大小为5分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新窗口。
![](images/screenshot_1639972321287.png)
**滑动窗口**
滑动窗口与滚动窗口的区别就是滑动窗口有重复的计算部分。
滑动窗口分配器将每个元素分配给固定窗口大小的窗口。类似于滚动窗口分配器,窗口的大小由窗口大小参数配置。另外一个窗口滑动参数控制滑动窗口的启动频率(how frequently a sliding window is started)。因此,如果滑动大小小于窗口大小,滑动窗可以重叠。在这种情况下,元素被分配到多个窗口。
例如,你可以使用窗口大小为10分钟的窗口,滑动大小为5分钟。这样,每5分钟会生成一个窗口,包含最后10分钟内到达的事件。
![](images/screenshot_1639972312665.png)
**会话窗口**
会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭。
例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。
![](images/screenshot_1639972362641.png)
**更多相关:**
[https://www.cnblogs.com/rossiXYZ/p/12286407.html、https://mp.weixin.qq.com/s/S-RmP5OWiGqwn-C\_TZNO5A](https://www.cnblogs.com/rossiXYZ/p/12286407.html、https:/mp.weixin.qq.com/s/S-RmP5OWiGqwn-C_TZNO5A)
# **13、Flink异步IO**
**将Flink用于流计算时,若涉及到和外部系统进行交互,如利用Flink从数据库中读取数据,这种需要获取I/O的场景时,我们需要考虑交互所带来的时延问题。**
![](images/screenshot_1639972409072.png)
若图1虚线左侧所示,请求a发送到database后,MapFunction等待回复后才进行下发送下一个请求b,期间,I/O处于空闲状态,请求b又开始重复此过程,这样在两个来回的时间内(发送请求-收到结果为一个来回),只处理两个请求。如图1虚线右侧所示,同样是在两个来回的时间内,以异步的形式进行交互,请求a发出去后,在等待回复时,请求b,c,d依次发出,这样既可以处理4个请求了。(好比你煲饭时可以继续炒菜)
**在某些场景下,为了提高系统的吞吐能力,可以仅通过增大MapFunction的并发度以达目的,但是随之而来是资源的大量消耗且利用率仍然未提高。**
**注意点:**
1、**为了实现以异步I/O访问数据库或K/V存储,数据库等需要有能支持异步请求的client;若是没有,可以通过创建多个同步的client并使用线程池处理同步call的方式实现类似并发的client,但是这方式没有异步I/O的性能好。**
2、**AsyncFunction不是以多线程方式调用的,一个AsyncFunction实例按顺序为每个独立消息发送请求。只是不用一直等请求回应才能继续操作而已,可以通过请求回应后回调函数取回结果。**
**异步结果:**
**由于请求响应的快慢可能不一样,AsyncFunction的“并发”请求可能导致结果的乱序 。如图1中虚线右侧所示,若请求b发出之后,其结果在请求a的之前返回,这样异步I/O算子前后的消息顺序就不一致了。**
**为了控制结果的返回顺序,Flink提供了两种模式:**
1)**Unordered**:当异步的请求完成时,其结果立马返回,不考虑结果顺序即乱序模式。当以processing time作为时间属性时,该模式可以获得最小的延时和最小的开销,使用方式:**AsyncDataStream.unorderedWait**(...);
2)**Ordered**:该模式下,消息在异步I/O算子前后的顺序一致,先请求的先返回,即有序模式。为实现有序模式,算子将请求返回的结果放入缓存,直到该请求之前的结果全部返回或超时。该模式通常情况下回引入额外的时延以及在checkpoint过程中会带来开销,这是因为,和无序模式相比,消息和请求返回的结果都会在checkpoint的状态中维持更长时间。使用方式:**AsyncDataStream.orderedWai**t(...);
在此,我们需要针对流任务和**event time**相结合的情况进行补充说明。为什么?是因为**watermark**和消息的整体相对位置是不会变的,什么意思了?发生在某个**watermark**之后的消息,只能在**watermark**被发出之后发出,其请求结果也是。换句话说,两个**watermark**之间的消息整体与**watermark**是有序的。当然这个区间内消息之间是否有序这得根据使用的模式来分析。
1)对**Ordered**模式,因为消息本身是有序的,所以**watermark**和消息之间也是有序的,和**processing time**相比,其不需要引入额外的开销;
2)对**Unordered**模式,其模式是先响应先返回,但在与**event time**结合的情况里,消息或结果都需在特定**watermark**发出之后才能发出,此时,就会引入延时和开销,其开销的大小取决于**watermark**的频率
13、**FlinkCDC**
### **CDC (Change Data Capture)****:**简称 改变数据捕获
flink自己实现的实时同步MySQL的binlog日志,从而监测数据的变化与获取变化后的数据。能够较快的响应数据的变更,对于要求实时性通知变更的场景非常适用。原有方案是通过中间件kafka不断获取新数据来变更通知,CDC则帮助这个场景减少了对Kafka的依赖。
**前提MySQL授予flink使用的用户读取binlog权限**
# **14、Flink自定义数据类型TypeInformation**
![](images/screenshot_1639972442241.png)
![](images/screenshot_1639972433148.png)
# **15、Flink广播(变量)数据流**
创建广播流需要指定MapStateDescriptor(它描述了用于存储广播流名称与广播流本身的map 存储结构)
![](images/screenshot_1639972469684.png)
为了关联一个非广播流(keyed 或者 non-keyed)与一个广播流(BroadcastStream),我们可以调用非广播流的方法 connect(),并将 BroadcastStream 当做参数传入。 这个方法的返回参数是 BroadcastConnectedStream,具有类型方法 process(),传入一个特殊的 ProcessFunction 来书写我们的模式识别逻辑。 具体传入 process() 的是哪个类型取决于非广播流的类型:
* 如果流是一个 keyed 流,那就是 KeyedBroadcastProcessFunction 类型;
* 如果流是一个 non-keyed 流,那就是 BroadcastProcessFunction 类型。
* 注册一个定时器只能在 对应ProcessFunction 的 processElement() 方法中进行。 在 processBroadcastElement() 方法中不能注册定时器,因为广播的元素中并没有关联的 key。但广播流状态只能在processBroadcastElement()中进行改变,processElement()中只能读取。
![](images/screenshot_1639972489661.png)
![](images/screenshot_1639972500195.png)
广播的好处: 根据广播流或广播变量对主流数据进行相关更新等动态操作,并且广播流自身也是动态流数据,可以实现不同状态下的规则变更。
# **16、Flink广播与累加器区别**
广播变量只能在Driver端定义,不能在Executor端定义,在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值
累加器只能在Driver端定义赋予初始值,只能在在Driver端读取,在 Excutor 端更新
Flink广播变量,使用广播变量的好处:每个节点的executor有一个副本,不是每个task有一个副本,可以优化资源提高性能。是将一个公用的小数据集通过广播变量,发送到每个TaskManager中,作为公共只读变量使用,供每个task共享使用,以减少复制变量到每个task的次数,降低资源开销,从而提高性能。
累加器:累加器可以在各个executor之间共享,修改,需要注意,累加器只能在Driver端定义赋予初始值,只能在在Driver端读取,在 Excutor 端更新。Flink现有的内置累加器为以下几个:
IntCounter,LongCounter,DoubleCounter
# **17、**