原文链接 : http://zeppelin.apache.org/docs/0.7.2/quickstart/tutorial.html

译文链接 : http://cwiki.apachecn.org/pages/viewpage.action?pageId=10030571

贡献者 : 片刻 ApacheCN Apache中文网

本教程将引导您了解Zeppelin的一些基本概念。我们假设你已经安装了Zeppelin。如果没有,请先看这里

Zeppelin当前的主要后端处理引擎是 Apache Spark。如果您刚刚接触到该系统,您可能希望首先了解如何处理数据以充分利用Zeppelin。

本地文件教程

数据优化

在开始Zeppelin教程之前,您需要下载bank.zip

首先,将csv格式的数据转换成RDD Bank对象,运行以下脚本。这也将使用filter功能删除标题。

  1. val bankText = sc.textFile("yourPath/bank/bank-full.csv")
  2.  
  3. case class Bank(age:Integer, job:String, marital : String, education : String, balance : Integer)
  4.  
  5. // split each line, filter out header (starts with "age"), and map it into Bank case class
  6. val bank = bankText.map(s=>s.split(";")).filter(s=>s(0)!="\"age\"").map(
  7. s=>Bank(s(0).toInt,
  8. s(1).replaceAll("\"", ""),
  9. s(2).replaceAll("\"", ""),
  10. s(3).replaceAll("\"", ""),
  11. s(5).replaceAll("\"", "").toInt
  12. )
  13. )
  14.  
  15. // convert to DataFrame and create temporal table
  16. bank.toDF().registerTempTable("bank")

数据检索

假设我们想看到年龄分布bank。为此,运行:

  1. %sql select age, count(1) from bank where age < 30 group by age order by age

您可以输入框通过更换设置年龄条件30${maxAge=30}

  1. %sql select age, count(1) from bank where age < ${maxAge=30} group by age order by age

现在我们要看到具有某种婚姻状况的年龄分布,并添加组合框来选择婚姻状况。跑:

  1. %sql select age, count(1) from bank where marital="${marital=single,single|divorced|married}" group by age order by age

具有流数据的教程

数据优化

由于本教程基于Twitter的示例tweet流,您必须使用Twitter帐户配置身份验证。要做到这一点,看看Twitter Credential Setup。当您得到API密钥,您应填写证书相关的值(apiKeyapiSecretaccessTokenaccessTokenSecret与下面的脚本您的API密钥)。

这将创建一个Tweet对象的RDD 并将这些流数据注册为一个表:

  1. import org.apache.spark.streaming._
  2. import org.apache.spark.streaming.twitter._
  3. import org.apache.spark.storage.StorageLevel
  4. import scala.io.Source
  5. import scala.collection.mutable.HashMap
  6. import java.io.File
  7. import org.apache.log4j.Logger
  8. import org.apache.log4j.Level
  9. import sys.process.stringSeqToProcess
  10.  
  11. /** Configures the Oauth Credentials for accessing Twitter */
  12. def configureTwitterCredentials(apiKey: String, apiSecret: String, accessToken: String, accessTokenSecret: String) {
  13. val configs = new HashMap[String, String] ++= Seq(
  14. "apiKey" -> apiKey, "apiSecret" -> apiSecret, "accessToken" -> accessToken, "accessTokenSecret" -> accessTokenSecret)
  15. println("Configuring Twitter OAuth")
  16. configs.foreach{ case(key, value) =>
  17. if (value.trim.isEmpty) {
  18. throw new Exception("Error setting authentication - value for " + key + " not set")
  19. }
  20. val fullKey = "twitter4j.oauth." + key.replace("api", "consumer")
  21. System.setProperty(fullKey, value.trim)
  22. println("\tProperty " + fullKey + " set as [" + value.trim + "]")
  23. }
  24. println()
  25. }
  26.  
  27. // Configure Twitter credentials
  28. val apiKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
  29. val apiSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  30. val accessToken = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  31. val accessTokenSecret = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
  32. configureTwitterCredentials(apiKey, apiSecret, accessToken, accessTokenSecret)
  33.  
  34. import org.apache.spark.streaming.twitter._
  35. val ssc = new StreamingContext(sc, Seconds(2))
  36. val tweets = TwitterUtils.createStream(ssc, None)
  37. val twt = tweets.window(Seconds(60))
  38.  
  39. case class Tweet(createdAt:Long, text:String)
  40. twt.map(status=>
  41. Tweet(status.getCreatedAt().getTime()/1000, status.getText())
  42. ).foreachRDD(rdd=>
  43. // Below line works only in spark 1.3.0.
  44. // For spark 1.1.x and spark 1.2.x,
  45. // use rdd.registerTempTable("tweets") instead.
  46. rdd.toDF().registerAsTable("tweets")
  47. )
  48.  
  49. twt.print
  50.  
  51. ssc.start()

数据检索

对于每个以下脚本,每次单击运行按钮,您将看到不同的结果,因为它是基于实时数据。

我们开始提取包含单词girl的最多10个tweets 。

  1. %sql select * from tweets where text like '%girl%' limit 10

这次假设我们想看看在过去60秒内每秒创建的tweet有多少。为此,运行:

  1. %sql select createdAt, count(1) from tweets group by createdAt order by createdAt

您可以在Spark SQL中进行用户定义的功能并使用它们。让我们通过命名函数来尝试sentiment。该功能将返回参数中的三种态度之一(正,负,中性)。

  1. def sentiment(s:String) : String = {
  2. val positive = Array("like", "love", "good", "great", "happy", "cool", "the", "one", "that")
  3. val negative = Array("hate", "bad", "stupid", "is")
  4.  
  5. var st = 0;
  6.  
  7. val words = s.split(" ")
  8. positive.foreach(p =>
  9. words.foreach(w =>
  10. if(p==w) st = st+1
  11. )
  12. )
  13.  
  14. negative.foreach(p=>
  15. words.foreach(w=>
  16. if(p==w) st = st-1
  17. )
  18. )
  19. if(st>0)
  20. "positivie"
  21. else if(st<0)
  22. "negative"
  23. else
  24. "neutral"
  25. }
  26.  
  27. // Below line works only in spark 1.3.0.
  28. // For spark 1.1.x and spark 1.2.x,
  29. // use sqlc.registerFunction("sentiment", sentiment _) instead.
  30. sqlc.udf.register("sentiment", sentiment _)

要检查人们如何看待使用sentiment上述功能的女孩,请运行以下操作:

  1. %sql select sentiment(text), count(1) from tweets where text like '%girl%' group by sentiment(text)