# **任务:**
#!/bin/bash
workPath=/data/bigdata/job/callLog-export
dataDate=`date +"%Y-%m-%d" -d "-1 days"`
if \[ -n "$1" \] ;then
dataDate=$1
fi
readFileName=/data/data\_center/cus/call\_log\_record/call\_log\_record-${dataDate}\*
resultDataDir=/tmp/sunzm/callLog-tmp/${dataDate}/
companyId=0302fbb6bf6b4fb1bad1416ab641202c
echo "处理的数据日期: $collechDate"
echo "读取的文件为: ${readFileName}"
echo "处理结果保存路径: ${resultDataDir}"
echo "处理的公司Id为: ${companyId}"
/var/lib/hadoop-hdfs/softwares/spark/bin/spark-submit \\
\-class com.sobot.offlineJobProcess.handler.temporary.job.hdfstocsv.HDFSDataToCSVJob \\
\-master spark://192.168.30.178:7077 \\
\-total-executor-cores 18 \\
\-executor-memory 4G \\
\-executor-cores 3 \\
\-driver-cores 3 \\
\-driver-memory 4G \\
\-conf spark.sql.shuffle.partitions=16 \\
\-conf spark.default.parallelism=16 \\
${workPath}/offlineJobProcess-es6-1.0-SNAPSHOT.jar \\
\-readFileName ${readFileName} \\
\-resultFileDir ${resultDataDir} \\
\-companyId ${companyId} \\
* \-companyIdFieldName${companyIdFieldName} \\
\-fields startTime,caller,voiceAliyunUrl \\
\-headers 通话开始时间,主叫号码,录音地址 \\
\-whereCondition callFlag=1 \\
\-selectExpr "date\_format(cast(startTime/1000 as timestamp), 'yyyy-MM-dd HH:mm:ss') AS startTime,caller,voiceAliyunUrl" \\
> ${workPath}/logs/callLog-filter.log
# **Spark任务提交:**
**StandaLone与Yarn的任务执行情况不同在于资源分配管理者不同,独立模式下是有Master负责管理,yarn模式是ResourceManager负责调度。**
1、当集群启动后,Worker会向Master汇报资源,然后Master就会掌握Worker的集群信息。
2、当启动集群后,NodeManager会向RsourceManager汇报资源,而RM就掌握了集群的资源。
任务提交模式有一种是client客户端,另一种是cluster集群方式,由shell命令指定模式。
## **Client:**
1.当在客户端提交SparkApplication时,Driver会在客户端启动,客户端会向RM申请启动ApplicationMaster。
2.RM 收到请求会向随机找一个满足资源的NM启动Application Master,AM启动后,会向RM申请资源用于启动executor,RM会返回一批NM节点给AM,AM收到返回结果后,会真正的向NM中去启动executor,每个executor中会有线程池。
3.executor启动后会向Driver注册,Driver会向executor发送task,并且监控task执行,收回task执行的结果。
它的问题在于:因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加,可以在客户端看到task的执行和结果.Yarn-client模式同样是适用于测试。
## **Cluster:**
1.当客户端提交Spark Appliction。会向RM申请启动ApplictaionMaster,而RM会随机找到一个满足资源的NM去启动AM。
2.当AM启动之后它负责任务调度,所以这里就不启动Driver,而AM就相当于Driver一样的功能存在。
3.AM启动后会向RM申请启动Executor,每个Executor会由线程池,RM会返回一批满足资源的NM节点。
4.AM接收到返回结果会找到相应的NM,启动Executor,executor启动后会向AM注册,而AM会将task发送到executor去执行,并且监控task,回收task'处理的结果。
它的问题在于:Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。
# **spark算子相关**
## **UpdateStateByKey(基于磁盘读写)**
UpdateStateBykey会统计全局的key的状态,不管有没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateBykey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数操作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会删除(state可以是任意类型的数据结构)。
### **适用场景:**
UpdataStateBykey可以用来统计历史数据,每次输出所有的key值。列如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的返回量等指标。
### **适用实例条件:**
1. 首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加。
2. updataStateByKey要求必须设置checkpoint点(设置中间结果文件夹)
3. updataStateByKey方法中updataFunc就要传入的参数,Seq\[V\]表示当前key对应的所有值,Option\[S\]是当前key的历史状态,返回的是新的封装的数据。
## **MapWithState(基于磁盘存储+缓存)**
mapWithState也是用于对于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,类型于增量的感觉。使用场景mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里余额信息。
### **适用实例条件:**
1. 如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值
2. 还可以指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在fun中实现,必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,则会报错。3. checkpoint不会必须的
### **区别:**
updataeStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。
mapWithState只返回变化后的key的值,这样做的好处是,我们可以只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key 的数据。这样的话,即使数据量很大,checkpint也不会updateBykey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。
详细使用:https://www.jianshu.com/p/a54b142067e5
## **Map和MapPartition的区别:**
map是对RDD的每一个元素使用一个方法操作,mapPartitions是对每个partition的迭代器使用一个方法操作。
## **MapPartitions的优点:**
使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。通常体现在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。SparkSql或DataFrame默认会对程序进行mapPartition的优化。
## **MapPartitions的缺点:**
如果是普通的map操作,一次function的执行就处理一条数据,可以将已经处理完的数据从内存里面释放掉。所以说普通的map操作通常不会导致内存的OOM异常。 但是MapPartitions操作,对于大量数据来说,如果直接将迭代器中数据取出来放内存,可能就OOM(内存溢出)。
## **Foreach和ForeachPartition的区别:**
### **foreachPartition:**
foreachPartition是spark-core的action算子,foreachPartition是对每个partition中的iterator分别处理,通过将iterator传入function对进行数据的处理,也就是说在foreachPartition中函数处理的是分区迭代器,而非具体的数据,源码中的注释是:Applies a function func to each parition of this RDD.(将函数func应用于此RDD的每个分区)
### **foreach:**
foreach也是spark-core的action算子,与foreachPartition类似的是,foreach也是对每个partition中的iterator分别处理,通过对每个iterator迭代获取数据传给function进行数据的处理,也就是说在foreach中函数处理的是具体的数据,源码中的注释是:Applies a function fun to all elements of this RDD.(将函数func用于此RDD的所有元素).
### **foreachRDD与上面两个的区别:**
foreachRDD是sparkStreaming的OutputOperation算子。但是foreachRDD并不会触发立即处理,必须在碰到sparkcore的foreach或者foreachPartition算子后,才会触发action动作。同时要注意,function的应用在的driver端进行,而不是Executor端进行。
## **GoupByKey和ReduceByKey的区别:**
![](images/screenshot_1639970836485.png) ![](images/screenshot_1639970842720.png)
### GroupByKey:只是将键相同的值给归纳到一个序列,没有其它函数操作。不同分区数据直接shuffle到新分区在聚合,不会先再shuffle前聚合,造成shuffle过程IO压力更大
ProcessRDD.GroupByKey()
### ReduceByKey:将键相同的值使用同一个函数聚合操作,先再shuffle前使用函数局部聚合,再shuffle到不同分区后继续使用函数聚合
ProcessRDD.ReduceByKey(“函数(U,U) => U”)
aggregateByKey:将键相同的值在两端使用不同函数操作,一端是shuffle前局部聚合使用的一个函数,另一端是shuffle后聚合使用的另一个函数,相对ReduceBykey可以提前指定初始值,并依据初始值类型返回值
### ProcessRDD.aggregateByKey("初始值K类型")(“函数1(K,U) => K”, “函数2(K,K) => K”)
**这三种都可以指定shuffle时的分区数,假如分区数不够,实际会出现不同键在同一物理分区,但spark处理时还是认为不同键是属于不同区的。既aggregateByKey初始值K是会给每个键初始时加上的。**
# **SparkSQL相关:**
## **SQL 解析:**
SQL Query,需要经过词法和语法解析,由字符串转换为,树形的抽象语法树。
* 通过遍历抽象语法树生成未解析的逻辑语法树(unresolved logic plan),对应SQL解析后的一种树形结构,本身不包含任务数据信息。
* 需要经过一次遍历之后,转换成成包含解析后的逻辑算子树(Analyzed LogicPlan),本身携带了各种信息。
* 最后经过优化后得到最终的逻辑语法树(Optimized LogicPlan)。
不管解析被划分为几步,在Spark 执行环境中,都要转化成RDD的调用代码,才能被spark core所执行,示意图如下:
## **创建视图:**
createOrReplaceTempView 的作用是创建一个临时的表 , 一旦创建这个表的会话关闭 , 这个表也会立马消失 其他的SparkSession 不能共享应已经创建的临时表createOrReplaceGlobalTempView创建一个全局的临时表 , 这个表的生命周期是 整个Spark应用程序 ,只要Spark 应用程序不关闭 , 那么. 这个临时表依然是可以使用的 ,并且这个表对其他的SparkSession共享(**要 global\_temp.‘tablename’ 使用**)
## **分组语句:**
GROUP BY : 使用时不像MySQL分组,MySQL可以返回不在group by条件中的列的第一条数据作为该列返回值,spark sql 和 hive sql 类似,不能这样随机返回,能返回聚合字段和聚合函数,但可以通过指定Frist(col)或者 Last(col)。
## **窗口函数:**
function OVER (PARITION BY … ORDER BY … FRAME\_TYPE BETWEEN … AND …)
function :对窗口内所有行都处理的函数
PARITION BY : 依据指定列进行分窗处理
ORDER BY : 窗口内依据指定字段排序
FRAME\_TYPE :FRAME是当前分区的一个子集,子句用来定义子集的规则,通常用来作为滑动窗口使用。主要用来控制每行数据在应用窗口函数时,这个窗口函数的作用范围。
(
FRAME\_TYPE rows 指定函数基于当前行的窗口范围
rows between …T… and …T…
T 如下:
unbounded preceding 前面所有行
unbounded following 后面所有行
current row 当前行
n following 后面n行
n preceding 前面n行
)