多应用+插件架构,代码干净,二开方便,首家独创一键云编译技术,文档视频完善,免费商用码云13.8K 广告
# 教程 原文链接 : [http://zeppelin.apache.org/docs/0.7.2/quickstart/tutorial.html](http://zeppelin.apache.org/docs/0.7.2/quickstart/tutorial.html) 译文链接 : [http://www.apache.wiki/pages/viewpage.action?pageId=10030571](http://www.apache.wiki/pages/viewpage.action?pageId=10030571) 贡献者 : [片刻](/display/~jiangzhonglian) [ApacheCN](/display/~apachecn) [Apache中文网](/display/~apachechina) 本教程将引导您了解Zeppelin的一些基本概念。我们假设你已经安装了Zeppelin。如果没有,请先看[这里](http://zeppelin.apache.org/docs/0.7.1/install/install.html)。 Zeppelin当前的主要后端处理引擎是[Apache Spark](https://spark.apache.org/)。如果您刚刚接触到该系统,您可能希望首先了解如何处理数据以充分利用Zeppelin。 ## 本地文件教程 ### 数据优化 在开始Zeppelin教程之前,您需要下载[bank.zip](http://archive.ics.uci.edu/ml/machine-learning-databases/00222/bank.zip)。 首先,将csv格式的数据转换成RDD `Bank`对象,运行以下脚本。这也将使用`filter`功能删除标题。 ``` val bankText = sc.textFile("yourPath/bank/bank-full.csv") case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer) // split each line, filter out header (starts with "age"), and map it into Bank case class val bank = bankText.map(s=>s.split(";")).filter(s=>s(0)!="\"age\"").map( s=>Bank(s(0).toInt, s(1).replaceAll("\"", ""), s(2).replaceAll("\"", ""), s(3).replaceAll("\"", ""), s(5).replaceAll("\"", "").toInt ) ) // convert to DataFrame and create temporal table bank.toDF().registerTempTable("bank") ``` ### 数据检索 假设我们想看到年龄分布`bank`。为此,运行: ``` %sql select age, count(1) from bank where age < 30 group by age order by age ```  您可以输入框通过更换设置年龄条件`30`用`${maxAge=30}`。 ``` %sql select age, count(1) from bank where age < ${maxAge=30} group by age order by age ```  现在我们要看到具有某种婚姻状况的年龄分布,并添加组合框来选择婚姻状况。跑: ``` %sql select age, count(1) from bank where marital="${marital=single,single|divorced|married}" group by age order by age ``` ## 具有流数据的教程 ### 数据优化 由于本教程基于Twitter的示例tweet流,您必须使用Twitter帐户配置身份验证。要做到这一点,看看[Twitter Credential Setup](https://databricks-training.s3.amazonaws.com/realtime-processing-with-spark-streaming.html#twitter-credential-setup)。当您得到API密钥,您应填写证书相关的值(`apiKey`,`apiSecret`,`accessToken`,`accessTokenSecret`与下面的脚本您的API密钥)。 这将创建一个`Tweet`对象的RDD 并将这些流数据注册为一个表: ``` import org.apache.spark.streaming._ import org.apache.spark.streaming.twitter._ import org.apache.spark.storage.StorageLevel import scala.io.Source import scala.collection.mutable.HashMap import java.io.File import org.apache.log4j.Logger import org.apache.log4j.Level import sys.process.stringSeqToProcess /** Configures the Oauth Credentials for accessing Twitter */ def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) { val configs = new HashMap[String, String] ++= Seq( "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret) println("Configuring Twitter OAuth") configs.foreach{ case(key, value) => if (value.trim.isEmpty) { throw new Exception("Error setting authentication - value for " + key + " not set") } val fullKey = "twitter4j.oauth." + key.replace("api", "consumer") System.setProperty(fullKey, value.trim) println("\tProperty " + fullKey + " set as [" + value.trim + "]") } println() } // Configure Twitter credentials val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx" val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret) import org.apache.spark.streaming.twitter._ val ssc = new StreamingContext(sc, Seconds(2)) val tweets = TwitterUtils.createStream(ssc, None) val twt = tweets.window(Seconds(60)) case class Tweet(createdAt:Long, text:String) twt.map(status=> Tweet(status.getCreatedAt().getTime()/1000, status.getText()) ).foreachRDD(rdd=> // Below line works only in spark 1.3.0. // For spark 1.1.x and spark 1.2.x, // use rdd.registerTempTable("tweets") instead. rdd.toDF().registerAsTable("tweets") ) twt.print ssc.start()  ``` ### 数据检索 对于每个以下脚本,每次单击运行按钮,您将看到不同的结果,因为它是基于实时数据。 我们开始提取包含单词**girl的**最多10个tweets 。 ``` %sql select * from tweets where text like '%girl%' limit 10 ```  这次假设我们想看看在过去60秒内每秒创建的tweet有多少。为此,运行: ``` %sql select createdAt, count(1) from tweets group by createdAt order by createdAt ```  您可以在Spark SQL中进行用户定义的功能并使用它们。让我们通过命名函数来尝试`sentiment`。该功能将返回参数中的三种态度之一(正,负,中性)。 ``` def sentiment(s:String) : String = { val positive = Array("like", "love", "good", "great", "happy", "cool", "the", "one", "that") val negative = Array("hate", "bad", "stupid", "is") var st = 0; val words = s.split(" ") positive.foreach(p => words.foreach(w => if(p==w) st = st+1 ) ) negative.foreach(p=> words.foreach(w=> if(p==w) st = st-1 ) ) if(st>0) "positivie" else if(st<0) "negative" else "neutral" } // Below line works only in spark 1.3.0. // For spark 1.1.x and spark 1.2.x, // use sqlc.registerFunction("sentiment", sentiment _) instead. sqlc.udf.register("sentiment", sentiment _) ```  要检查人们如何看待使用`sentiment`上述功能的女孩,请运行以下操作: ``` %sql select sentiment(text), count(1) from tweets where text like '%girl%' group by sentiment(text) ```