企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# **任务:** #!/bin/bash workPath=/data/bigdata/job/callLog-export dataDate=`date  +"%Y-%m-%d" -d  "-1 days"` if \[ -n "$1" \] ;then dataDate=$1 fi readFileName=/data/data\_center/cus/call\_log\_record/call\_log\_record-${dataDate}\* resultDataDir=/tmp/sunzm/callLog-tmp/${dataDate}/ companyId=0302fbb6bf6b4fb1bad1416ab641202c echo "处理的数据日期: $collechDate" echo "读取的文件为: ${readFileName}" echo "处理结果保存路径: ${resultDataDir}" echo "处理的公司Id为: ${companyId}" /var/lib/hadoop-hdfs/softwares/spark/bin/spark-submit \\ \-class com.sobot.offlineJobProcess.handler.temporary.job.hdfstocsv.HDFSDataToCSVJob \\ \-master spark://192.168.30.178:7077 \\ \-total-executor-cores 18 \\ \-executor-memory 4G \\ \-executor-cores 3 \\ \-driver-cores 3 \\ \-driver-memory 4G \\ \-conf spark.sql.shuffle.partitions=16 \\ \-conf spark.default.parallelism=16 \\ ${workPath}/offlineJobProcess-es6-1.0-SNAPSHOT.jar \\ \-readFileName ${readFileName} \\ \-resultFileDir ${resultDataDir} \\ \-companyId ${companyId} \\ * \-companyIdFieldName${companyIdFieldName} \\ \-fields startTime,caller,voiceAliyunUrl \\ \-headers 通话开始时间,主叫号码,录音地址 \\ \-whereCondition callFlag=1 \\ \-selectExpr "date\_format(cast(startTime/1000 as timestamp), 'yyyy-MM-dd HH:mm:ss') AS startTime,caller,voiceAliyunUrl" \\ > ${workPath}/logs/callLog-filter.log # **Spark任务提交:** **StandaLone与Yarn的任务执行情况不同在于资源分配管理者不同,独立模式下是有Master负责管理,yarn模式是ResourceManager负责调度。** 1、当集群启动后,Worker会向Master汇报资源,然后Master就会掌握Worker的集群信息。 2、当启动集群后,NodeManager会向RsourceManager汇报资源,而RM就掌握了集群的资源。 任务提交模式有一种是client客户端,另一种是cluster集群方式,由shell命令指定模式。 ## **Client:** 1.当在客户端提交SparkApplication时,Driver会在客户端启动,客户端会向RM申请启动ApplicationMaster。 2.RM 收到请求会向随机找一个满足资源的NM启动Application Master,AM启动后,会向RM申请资源用于启动executor,RM会返回一批NM节点给AM,AM收到返回结果后,会真正的向NM中去启动executor,每个executor中会有线程池。 3.executor启动后会向Driver注册,Driver会向executor发送task,并且监控task执行,收回task执行的结果。 它的问题在于:因为Driver运行在本地,Driver会与yarn集群中的Executor进行大量的通信,会造成客户机网卡流量的大量增加,可以在客户端看到task的执行和结果.Yarn-client模式同样是适用于测试。 ## **Cluster:** 1.当客户端提交Spark Appliction。会向RM申请启动ApplictaionMaster,而RM会随机找到一个满足资源的NM去启动AM。 2.当AM启动之后它负责任务调度,所以这里就不启动Driver,而AM就相当于Driver一样的功能存在。 3.AM启动后会向RM申请启动Executor,每个Executor会由线程池,RM会返回一批满足资源的NM节点。 4.AM接收到返回结果会找到相应的NM,启动Executor,executor启动后会向AM注册,而AM会将task发送到executor去执行,并且监控task,回收task'处理的结果。 它的问题在于:Yarn-Cluster主要用于生产环境中,因为Driver运行在Yarn集群中某一台nodeManager中,每次提交任务的Driver所在的机器都是随机的,不会产生某一台机器网卡流量激增的现象,缺点是任务提交后不能看到日志。只能通过yarn查看日志。 # **spark算子相关** ## **UpdateStateByKey(基于磁盘读写)** UpdateStateBykey会统计全局的key的状态,不管有没有数据输入,它会在每一个批次间隔返回之前的key的状态。updateStateBykey会对已存在的key进行state的状态更新,同时还会对每个新出现的key执行相同的更新函数操作。如果通过更新函数对state更新后返回来为none,此时刻key对应的state状态会删除(state可以是任意类型的数据结构)。 ### **适用场景:** UpdataStateBykey可以用来统计历史数据,每次输出所有的key值。列如统计不同时间段用户平均消费金额,消费次数,消费总额,网站的不同时间段的返回量等指标。 ### **适用实例条件:** 1. 首先会以DStream中的数据进行按key做reduce操作,然后再对各个批次的数据进行累加。 2. updataStateByKey要求必须设置checkpoint点(设置中间结果文件夹) 3. updataStateByKey方法中updataFunc就要传入的参数,Seq\[V\]表示当前key对应的所有值,Option\[S\]是当前key的历史状态,返回的是新的封装的数据。 ## **MapWithState(基于磁盘存储+缓存)** mapWithState也是用于对于全局统计key的状态,但是它如果没有数据输入,便不会返回之前的key的状态,类型于增量的感觉。使用场景mapWithState可以用于一些实时性较高,延迟较少的一些场景,例如你在某宝上下单买了个东西,付款之后返回你账户里余额信息。 ### **适用实例条件:** 1. 如果有初始化的值得需要,可以使用initialState(RDD)来初始化key的值 2. 还可以指定timeout函数,该函数的作用是,如果一个key超过timeout设定的时间没有更新值,那么这个key将会失效。这个控制需要在fun中实现,必须使用state.isTimingOut()来判断失效的key值。如果在失效时间之后,这个key又有新的值了,则会重新计算。如果没有使用isTimingOut,则会报错。3. checkpoint不会必须的 ### **区别:** updataeStateByKey可以在指定的批次间隔内返回之前的全部历史数据,包括新增的,改变的和没有改变的。由于updateStateByKey在使用的时候一定要做checkpoint,当数据量过大的时候,checkpoint会占据庞大的数据量,会影响性能,效率不高。 mapWithState只返回变化后的key的值,这样做的好处是,我们可以只关心那些已经发生的变化的key,对于没有数据输入,则不会返回那些没有变化的key 的数据。这样的话,即使数据量很大,checkpint也不会updateBykey那样,占用太多的存储,效率比较高(再生产环境中建议使用这个)。 详细使用:https://www.jianshu.com/p/a54b142067e5 ## **Map和MapPartition的区别:** map是对RDD的每一个元素使用一个方法操作,mapPartitions是对每个partition的迭代器使用一个方法操作。 ## **MapPartitions的优点:** 使用MapPartitions操作之后,一个task仅仅会执行一次function,function一次接收所有的partition数据。只要执行一次就可以了,性能比较高。通常体现在map过程中需要频繁创建额外的对象(例如将rdd中的数据通过jdbc写入数据库,map需要为每个元素创建一个链接而mapPartition为每个partition创建一个链接),则mapPartitions效率比map高的多。SparkSql或DataFrame默认会对程序进行mapPartition的优化。 ## **MapPartitions的缺点:** 如果是普通的map操作,一次function的执行就处理一条数据,可以将已经处理完的数据从内存里面释放掉。所以说普通的map操作通常不会导致内存的OOM异常。 但是MapPartitions操作,对于大量数据来说,如果直接将迭代器中数据取出来放内存,可能就OOM(内存溢出)。 ## **Foreach和ForeachPartition的区别:** ### **foreachPartition:** foreachPartition是spark-core的action算子,foreachPartition是对每个partition中的iterator分别处理,通过将iterator传入function对进行数据的处理,也就是说在foreachPartition中函数处理的是分区迭代器,而非具体的数据,源码中的注释是:Applies a function func to each parition of this RDD.(将函数func应用于此RDD的每个分区) ### **foreach:** foreach也是spark-core的action算子,与foreachPartition类似的是,foreach也是对每个partition中的iterator分别处理,通过对每个iterator迭代获取数据传给function进行数据的处理,也就是说在foreach中函数处理的是具体的数据,源码中的注释是:Applies a function fun to all elements of this RDD.(将函数func用于此RDD的所有元素). ### **foreachRDD与上面两个的区别:** foreachRDD是sparkStreaming的OutputOperation算子。但是foreachRDD并不会触发立即处理,必须在碰到sparkcore的foreach或者foreachPartition算子后,才会触发action动作。同时要注意,function的应用在的driver端进行,而不是Executor端进行。 ## **GoupByKey和ReduceByKey的区别:** ![](images/screenshot_1639970836485.png) ![](images/screenshot_1639970842720.png) ### GroupByKey:只是将键相同的值给归纳到一个序列,没有其它函数操作。不同分区数据直接shuffle到新分区在聚合,不会先再shuffle前聚合,造成shuffle过程IO压力更大 ProcessRDD.GroupByKey() ### ReduceByKey:将键相同的值使用同一个函数聚合操作,先再shuffle前使用函数局部聚合,再shuffle到不同分区后继续使用函数聚合 ProcessRDD.ReduceByKey(“函数(U,U) => U”) aggregateByKey:将键相同的值在两端使用不同函数操作,一端是shuffle前局部聚合使用的一个函数,另一端是shuffle后聚合使用的另一个函数,相对ReduceBykey可以提前指定初始值,并依据初始值类型返回值 ### ProcessRDD.aggregateByKey("初始值K类型")(“函数1(K,U) => K”, “函数2(K,K) => K”) **这三种都可以指定shuffle时的分区数,假如分区数不够,实际会出现不同键在同一物理分区,但spark处理时还是认为不同键是属于不同区的。既aggregateByKey初始值K是会给每个键初始时加上的。** # **SparkSQL相关:** ## **SQL 解析:** SQL Query,需要经过词法和语法解析,由字符串转换为,树形的抽象语法树。 * 通过遍历抽象语法树生成未解析的逻辑语法树(unresolved logic plan),对应SQL解析后的一种树形结构,本身不包含任务数据信息。 * 需要经过一次遍历之后,转换成成包含解析后的逻辑算子树(Analyzed LogicPlan),本身携带了各种信息。 * 最后经过优化后得到最终的逻辑语法树(Optimized LogicPlan)。 不管解析被划分为几步,在Spark 执行环境中,都要转化成RDD的调用代码,才能被spark core所执行,示意图如下: ## **创建视图:** createOrReplaceTempView 的作用是创建一个临时的表 , 一旦创建这个表的会话关闭 , 这个表也会立马消失 其他的SparkSession 不能共享应已经创建的临时表createOrReplaceGlobalTempView创建一个全局的临时表 , 这个表的生命周期是 整个Spark应用程序 ,只要Spark 应用程序不关闭 , 那么. 这个临时表依然是可以使用的 ,并且这个表对其他的SparkSession共享(**要 global\_temp.‘tablename’ 使用**) ## **分组语句:** GROUP BY : 使用时不像MySQL分组,MySQL可以返回不在group by条件中的列的第一条数据作为该列返回值,spark sql 和 hive sql 类似,不能这样随机返回,能返回聚合字段和聚合函数,但可以通过指定Frist(col)或者 Last(col)。 ## **窗口函数:** function OVER (PARITION BY … ORDER BY … FRAME\_TYPE   BETWEEN … AND …) function :对窗口内所有行都处理的函数 PARITION BY : 依据指定列进行分窗处理 ORDER BY : 窗口内依据指定字段排序 FRAME\_TYPE :FRAME是当前分区的一个子集,子句用来定义子集的规则,通常用来作为滑动窗口使用。主要用来控制每行数据在应用窗口函数时,这个窗口函数的作用范围。 ( FRAME\_TYPE rows 指定函数基于当前行的窗口范围 rows between …T… and …T… T 如下: unbounded preceding 前面所有行 unbounded following 后面所有行 current row 当前行 n following 后面n行 n preceding 前面n行 )