企业🤖AI智能体构建引擎,智能编排和调试,一键部署,支持私有化部署方案 广告
# 快速入门 - [安全](https://spark.apache.org/docs/latest/quick-start.html#security) * [使用 Spark Shell 进行交互式分析](#使用-spark-shell-进行交互式分析) * [基础](#基础) * [Dataset 上的更多操作](#dataset-上的更多操作) * [缓存](#缓存) * [独立的应用](#独立的应用) * [快速跳转](#快速跳转) 本教程提供了如何使用 Spark 的快速入门介绍。首先通过运行 Spark 交互式的 shell(在 Python 或 Scala 中)来介绍 API,然后展示如何使用 Java,Scala 和 Python 来编写应用程序。 为了继续阅读本指南,首先从 [Spark 官网](http://spark.apache.org/downloads.html) 下载 Spark 的发行包。因为我们不使用 HDFS,所以你可以下载一个任何 Hadoop 版本的软件包。 请注意,在 Spark 2.0 之前,Spark 的主要编程接口是弹性分布式数据集(RDD)。 在 Spark 2.0 之后,RDD 被 Dataset 替换,它是像RDD 一样的 strongly-typed(强类型),但是在引擎盖下更加优化。 RDD 接口仍然受支持,您可以在 [RDD 编程指南](rdd-programming-guide.html) 中获得更完整的参考。 但是,我们强烈建议您切换到使用 Dataset(数据集),其性能要更优于 RDD。 请参阅 [SQL 编程指南](sql-programming-guide.html) 获取更多有关 Dataset 的信息。 # 安全 默认情况下,Spark中的安全性处于关闭状态。这意味着您默认情况下容易受到攻击。在下载和运行Spark之前,请参阅[Spark Security](https://spark.apache.org/docs/latest/security.html)。 # 使用 Spark Shell 进行交互式分析 ## 基础 Spark shell 提供了一种来学习该 API 比较简单的方式,以及一个强大的来分析数据交互的工具。在 Scala(运行于 Java 虚拟机之上,并能很好的调用已存在的 Java 类库)或者 Python 中它是可用的。通过在 Spark 目录中运行以下的命令来启动它: ``` ./bin/spark-shell ``` Spark 的主要抽象是一个称为 Dataset 的分布式的 item 集合。Datasets 可以从 Hadoop 的 InputFormats(例如 HDFS文件)或者通过其它的 Datasets 转换来创建。让我们从 Spark 源目录中的 README 文件来创建一个新的 Dataset: ``` scala> val textFile = spark.read.textFile("README.md") textFile: org.apache.spark.sql.Dataset[String] = [value: string] ``` 您可以直接从 Dataset 中获取 values(值),通过调用一些 actions(动作),或者 transform(转换)Dataset 以获得一个新的。更多细节,请参阅 _[API doc](api/scala/index.html#org.apache.spark.sql.Dataset)_。 ``` scala> textFile.count() // Number of items in this Dataset res0: Long = 126 // May be different from yours as README.md will change over time, similar to other outputs scala> textFile.first() // First item in this Dataset res1: String = # Apache Spark ``` 现在让我们 transform 这个 Dataset 以获得一个新的 。我们调用 `filter` 以返回一个新的 Dataset,它是文件中的 items 的一个子集。 ``` scala> val linesWithSpark = textFile.filter(line => line.contains("Spark")) linesWithSpark: org.apache.spark.sql.Dataset[String] = [value: string] ``` 我们可以链式操作 transformation(转换)和 action(动作): ``` scala> textFile.filter(line => line.contains("Spark")).count() // How many lines contain "Spark"? res3: Long = 15 ``` ``` ./bin/pyspark ``` Spark有一个主要的抽象概念叫做 Dataset 的分布式集合类。Dataset 可以从Hadoop InputFormats(例如HDFS文件)或通过 transforming 其他数据集来创建数据集。 由于Python的动态特性,我们不需要在Python中定义强类型的 Dataset。 因此,Python中的所有数据集都是 Dataset[Row],我们称之为"DataFrame" 来与 Pandas 和R中的数据框概念一致。让我们从Spark源文件中的README文件中创建一个新的 DataFrame 目录: ``` >>> textFile = spark.read.text("README.md") ``` 您可以通过调用某些action直接从DataFrame获取值,也可以transform DataFrame以获取新的DataFrame。 有关详细信息,请阅读 _[API doc](api/python/index.html#pyspark.sql.DataFrame)_. ``` >>> textFile.count() # 这个DataFrame 有多少行 126 >>> textFile.first() # DataFrame的第一行 Row(value=u'# Apache Spark') ``` 现在让我们 transform(转换) 这个DataFrame来获得一个新的DataFrame. 我们调用 `filter` 方法来返回文件中的一个子集. ``` >>> linesWithSpark = textFile.filter(textFile.value.contains("Spark")) ``` 我们可以把 transform 和 acition 连在一起用: ``` >>> textFile.filter(textFile.value.contains("Spark")).count() # 统计文件中 "Spark" 字符串有多少个 15 ``` ## Dataset 上的更多操作 Dataset actions(操作)和 transformations(转换)可以用于更复杂的计算。例如,统计出现次数最多的行 : ``` scala> textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) res4: Long = 15 ``` 第一个 map 操作创建一个新的 Dataset,将一行数据 map 为一个整型值。在 Dataset 上调用 `reduce` 来找到最大的行计数。参数 `map` 与 `reduce` 是 Scala 函数(closures),并且可以使用 Scala/Java 库的任何语言特性。例如,我们可以很容易地调用函数声明,我们将定义一个 max 函数来使代码更易于理解 : ``` scala> import java.lang.Math import java.lang.Math scala> textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b)) res5: Int = 15 ``` 一种常见的数据流模式是被 Hadoop 所推广的 MapReduce。Spark 可以很容易实现 MapReduce: ``` scala> val wordCounts = textFile.flatMap(line => line.split(" ")).groupByKey(identity).count() wordCounts: org.apache.spark.sql.Dataset[(String, Long)] = [value: string, count(1): bigint] ``` 在这里,我们调用了 `flatMap` 以 transform 一个 lines 的 Dataset 为一个 words 的 Dataset,然后结合 `groupByKey` 和 `count` 来计算文件中每个单词的 counts 作为一个 (String, Long) 的 Dataset pairs。要在 shell 中收集 word counts,我们可以调用 `collect`: ``` scala> wordCounts.collect() res6: Array[(String, Int)] = Array((means,1), (under,2), (this,3), (Because,1), (Python,2), (agree,1), (cluster.,1), ...) ``` ``` >>> from pyspark.sql.functions import * >>> textFile.select(size(split(textFile.value, "\s+")).name("numWords")).agg(max(col("numWords"))).collect() [Row(max(numWords)=15)] ``` 这首先将一行映射为一个整数值并且别名为 "numWords" ,从中创建一个新的DataFrame. 在该 DataFrame 上调用 `agg` 函数是为了找到最大词数(word count) . `select` 和 `agg` 的参数都是 _[Column](api/python/index.html#pyspark.sql.Column)_ 里的,我们也可以通过 `df.colName` 来获得该 DataFrame 的一列. 我们也可以导入 pyspark.sql.functions 来提供了许多方便的功能来从旧的列构建一个新的列, 一个常见的数据流模式是MapReduce,由Hadoop推广。 Spark可以轻松实现MapReduce流程: ``` >>> wordCounts = textFile.select(explode(split(textFile.value, "\s+")).as("word")).groupBy("word").count() ``` 在这里我们在 `select` 中使用 `explode` 函数来将一个 Dataset 的所有行转换成一个词的数据集,然后组合使用 `groupBy` 和 `count` 来计算文件中各个单词的计数作为 DataFrame 的两列: "word" 和 "count".要在我们的shell中统计单词的词频,我们可以调用 `collect`: ``` >>> wordCounts.collect() [Row(word=u'online', count=1), Row(word=u'graphs', count=1), ...] ``` ## 缓存 Spark 还支持 Pulling(拉取)数据集到一个群集范围的内存缓存中。例如当查询一个小的 “hot” 数据集或运行一个像 PageRANK 这样的迭代算法时,在数据被重复访问时是非常高效的。举一个简单的例子,让我们标记我们的 `linesWithSpark` 数据集到缓存中: ``` scala> linesWithSpark.cache() res7: linesWithSpark.type = [value: string] scala> linesWithSpark.count() res8: Long = 15 scala> linesWithSpark.count() res9: Long = 15 ``` 使用 Spark 来探索和缓存一个 100 行的文本文件看起来比较愚蠢。有趣的是,即使在他们跨越几十或者几百个节点时,这些相同的函数也可以用于非常大的数据集。您也可以像 [编程指南](rdd-programming-guide.html#using-the-shell). 中描述的一样通过连接 `bin/spark-shell` 到集群中,使用交互式的方式来做这件事情。 ``` >>> linesWithSpark.cache() >>> linesWithSpark.count() 15 >>> linesWithSpark.count() 15 ``` 使用Spark探索和缓存100行文本文件似乎很愚蠢. T有趣的是,这些相同的功能可用于非常大的数据集,即使它们跨越数十个或数百个节点交错着, 你也可以通过`bin/pyspark` 连接到集群来进行交互,详细描述在 [RDD programming guide](rdd-programming-guide.html#using-the-shell). # 独立的应用 假设我们希望使用 Spark API 来创建一个独立的应用程序。我们在 Scala(SBT),Java(Maven)和 Python 中练习一个简单应用程序。 我们将在 Scala 中创建一个非常简单的 Spark 应用程序 - 很简单的,事实上,它名为 `SimpleApp.scala`: ``` /* SimpleApp.scala */ import org.apache.spark.sql.SparkSession object SimpleApp { def main(args: Array[String]) { val logFile = "YOUR_SPARK_HOME/README.md" // Should be some file on your system val spark = SparkSession.builder.appName("Simple Application").getOrCreate() val logData = spark.read.textFile(logFile).cache() val numAs = logData.filter(line => line.contains("a")).count() val numBs = logData.filter(line => line.contains("b")).count() println(s"Lines with a: $numAs, Lines with b: $numBs") spark.stop() } } ``` 注意,这个应用程序我们应该定义一个 `main()` 方法而不是去扩展 `scala.App`。使用 `scala.App` 的子类可能不会正常运行。 该程序仅仅统计了 Spark README 文件中每一行包含 ‘a’ 的数量和包含 ‘b’ 的数量。注意,您需要将 YOUR_SPARK_HOME 替换为您 Spark 安装的位置。不像先前使用 spark shell 操作的示例,它们初始化了它们自己的 SparkContext,我们初始化了一个 SparkContext 作为应用程序的一部分。 我们调用 `SparkSession.builder` 以构造一个 [[SparkSession]],然后设置 application name(应用名称),最终调用 `getOrCreate` 以获得 [[SparkSession]] 实例。 我们的应用依赖了 Spark API,所以我们将包含一个名为 `build.sbt` 的 sbt 配置文件,它描述了 Spark 的依赖。该文件也会添加一个 Spark 依赖的 repository: ``` name := "Simple Project" version := "1.0" scalaVersion := "2.12.8" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.4" ``` 为了让 sbt 正常的运行,我们需要根据经典的目录结构来布局 `SimpleApp.scala` 和 `build.sbt` 文件。在成功后,我们可以创建一个包含应用程序代码的 JAR 包,然后使用 `spark-submit` 脚本来运行我们的程序。 ``` # Your directory layout should look like this $ find . . ./build.sbt ./src ./src/main ./src/main/scala ./src/main/scala/SimpleApp.scala # Package a jar containing your application $ sbt package ... [info] Packaging {..}/{..}/target/scala-2.12/simple-project_2.12-1.0.jar # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/scala-2.12/simple-project_2.12-1.0.jar ... Lines with a: 46, Lines with b: 23 ``` 这个例子使用Maven来编译成一个jar应用程序,其他的构建系统(如Ant、Gradle,译者注)也可以。 我们会创建一个非常简单的Spark应用,`SimpleApp.java`: ``` /* SimpleApp.java */ import org.apache.spark.sql.SparkSession; public class SimpleApp { public static void main(String[] args) { String logFile = "YOUR_SPARK_HOME/README.md"; // Should be some file on your system SparkSession spark = SparkSession.builder().appName("Simple Application").getOrCreate(); Dataset<String> logData = spark.read.textFile(logFile).cache(); long numAs = logData.filter(s -> s.contains("a")).count(); long numBs = logData.filter(s -> s.contains("b")).count(); System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs); spark.stop(); } } ``` 这个程序计算Spark README文档中包含字母’a’和字母’b’的行数。注意把YOUR_SPARK_HOME修改成你的Spark的安装目录。 跟之前的Spark shell不同,我们需要初始化SparkSession。 把Spark依赖添加到Maven的`pom.xml`文件里。 注意Spark的artifacts使用Scala版本进行标记。 ``` <project> <groupId>edu.berkeley</groupId> <artifactId>simple-project</artifactId> <modelVersion>4.0.0</modelVersion> <name>Simple Project</name> <packaging>jar</packaging> <version>1.0</version> <dependencies> <dependency> <!-- Spark dependency --> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.12</artifactId> <version>2.4.4</version> <scope>provided</scope> </dependency> </dependencies> </project> ``` 我们按照Maven经典的目录结构组织这些文件: ``` $ find . ./pom.xml ./src ./src/main ./src/main/java ./src/main/java/SimpleApp.java ``` 现在我们用Maven打包这个应用,然后用`./bin/spark-submit`执行它。 ``` # 打包包含应用程序的JAR $ mvn package ... [INFO] Building jar: {..}/{..}/target/simple-project-1.0.jar # 用spark-submit来运行程序 $ YOUR_SPARK_HOME/bin/spark-submit \ --class "SimpleApp" \ --master local[4] \ target/simple-project-1.0.jar ... Lines with a: 46, Lines with b: 23 ``` 现在我们来展示如何用python API 来写一个应用 (pyspark). 如果要构建打包的PySpark应用程序或库,则可以添加以下内容到setup.py文件中: ``` install_requires=[ 'pyspark=={site.SPARK_VERSION}' ] ``` 我们以一个简单的例子为例,创建一个简单的pyspark 应用 `SimpleApp.py`: ``` """SimpleApp.py""" from pyspark.sql import SparkSession logFile = "YOUR_SPARK_HOME/README.md" # Should be some file on your system spark = SparkSession.builder().appName(appName).master(master).getOrCreate() logData = spark.read.text(logFile).cache() numAs = logData.filter(logData.value.contains('a')).count() numBs = logData.filter(logData.value.contains('b')).count() print("Lines with a: %i, lines with b: %i" % (numAs, numBs)) spark.stop() ``` 该程序只是统计计算在该文本中包含a字母和包含b字母的行数. 请注意你需要将 YOUR_SPARK_HOME 替换成你的spark路径.就像scala 示例和java示例一样,我们使用 SparkSession 来创建数据集, 对于使用自定义类护着第三方库的应用程序,我们还可以通过 `spark-submit` 带着 `--py-files` 来添加代码依赖 , 我们也可以通过把代码打成zip包来进行依赖添加 (详细请看 `spark-submit --help` ). `SimpleApp` 是个简单的例子我们不需要添加特别的代码或自定义类. 我们可以通过 `bin/spark-submit` 脚本来运行应用: ``` # Use spark-submit to run your application $ YOUR_SPARK_HOME/bin/spark-submit \ --master local[4] \ SimpleApp.py ... Lines with a: 46, Lines with b: 23 ``` 如果您的环境中已安装PySpark pip(例如pip install pyspark),则可以使用常规Python解释器运行应用程序,也可以根据需要使用前面的“ spark-submit”。 ``` # Use the Python interpreter to run your application $ python SimpleApp.py ... Lines with a: 46, Lines with b: 23 ``` # 快速跳转 恭喜您成功的运行了您的第一个 Spark 应用程序! * 更多 API 的深入概述,从 [RDD programming guide](rdd-programming-guide.html) 和 [SQL programming guide](sql-programming-guide.html) 这里开始,或者看看 “编程指南” 菜单中的其它组件。 * 为了在集群上运行应用程序,请前往 [deployment overview](cluster-overview.html). * 最后,在 Spark 的 `examples` 目录中包含了一些 ([Scala](https://github.com/apache/spark/tree/master/examples/src/main/scala/org/apache/spark/examples),[Java](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples),[Python](https://github.com/apache/spark/tree/master/examples/src/main/python),[R](https://github.com/apache/spark/tree/master/examples/src/main/r)) 示例。您可以按照如下方式来运行它们: ``` # 针对 Scala 和 Java,使用 run-example: ./bin/run-example SparkPi # 针对 Python 示例,直接使用 spark-submit: ./bin/spark-submit examples/src/main/python/pi.py # 针对 R 示例,直接使用 spark-submit: ./bin/spark-submit examples/src/main/r/dataframe.R ```