企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
[TOC] ## 一、 引入 Spark Spark 1.2.0 使用 Scala 2.10 写应用程序,你需要使用一个兼容的 Scala 版本(例如:2.10.X)。写 Spark 应用程序时,你需要添加 Spark 的 Maven 依赖,Spark 可以通过 Maven 中心仓库来获得: ``` <dependency> <groupId>org.scala-lang</groupId> <artifactId>scala-library</artifactId> <version>2.10.5</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.6.2</version> </dependency> ``` 另外,如果你希望访问 HDFS 集群,你需要根据你的 HDFS 版本添加 `hadoop-client` 的依赖。一些公共的 HDFS 版本 tags 在[第三方发行页面](https://spark.apache.org/docs/latest/hadoop-third-party-distributions.html)中被列出。 ``` groupId = org.apache.hadoop artifactId = hadoop-client version = <your-hdfs-version> ``` 最后,你需要导入一些 Spark 的类和隐式转换到你的程序,添加下面的行就可以了: ```scala import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf ``` ## 二、初始化 Spark Spark 编程的第一步是需要创建一个 [SparkContext](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkContext) 对象,用来告诉 Spark 如何访问集群。在创建 `SparkContext` 之前,你需要构建一个 [SparkConf](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.SparkConf) 对象, SparkConf 对象包含了一些你应用程序的信息。 ```scala val conf = new SparkConf().setAppName(appName).setMaster(master) new SparkContext(conf) ``` `appName` 参数是你程序的名字,它会显示在 cluster UI 上。`master` 是 [Spark, Mesos 或 YARN 集群的 URL](https://spark.apache.org/docs/latest/submitting-applications.html#master-urls),或运行在本地模式时,使用专用字符串 “local”。在实践中,当应用程序运行在一个集群上时,你并不想要把 `master` 硬编码到你的程序中,你可以[用 spark-submit 启动你的应用程序](https://spark.apache.org/docs/latest/submitting-applications.html)的时候传递它。然而,你可以在本地测试和单元测试中使用 “local” 运行 Spark 进程。 ### 使用 Shell 在 Spark shell 中,有一个专有的 SparkContext 已经为你创建好。在变量中叫做 `sc`。你自己创建的 SparkContext 将无法工作。可以用 `--master` 参数来设置 SparkContext 要连接的集群,用 `--jars` 来设置需要添加到 classpath 中的 JAR 包,如果有多个 JAR 包使用**逗号**分割符连接它们。例如:在一个拥有 4 核的环境上运行 `bin/spark-shell`,使用: ``` $ ./bin/spark-shell --master local[4] ``` 或在 classpath 中添加 `code.jar`,使用: ``` $ ./bin/spark-shell --master local[4] --jars code.jar ``` 执行 `spark-shell --help` 获取完整的选项列表。在这之后,调用 `spark-shell` 会比 [spark-submit 脚本](https://spark.apache.org/docs/latest/submitting-applications.html)更为普遍。 ## 三、并行集合 并行集合 (_Parallelized collections_) 的创建是通过在一个已有的集合(Scala `Seq`)上调用 SparkContext 的 `parallelize` 方法实现的。集合中的元素被复制到一个可并行操作的分布式数据集中。例如,这里演示了如何在一个包含 1 到 5 的数组中创建并行集合: ```scala val data = Array(1, 2, 3, 4, 5) val distData = sc.parallelize(data) ``` 一旦创建完成,这个分布式数据集(`distData`)就可以被并行操作。例如,我们可以调用 `distData.reduce((a, b) => a + b)` 将这个数组中的元素相加。我们以后再描述在分布式上的一些操作。 并行集合一个很重要的参数是切片数(_slices_),表示一个数据集切分的份数。Spark 会在集群上为每一个切片运行一个任务。你可以在集群上为每个 CPU 设置 2-4 个切片(slices)。正常情况下,Spark 会试着基于你的集群状况自动地设置切片的数目。然而,你也可以通过 `parallelize` 的第二个参数手动地设置(例如:`sc.parallelize(data, 10)`)。