输入1: TCP
安装netcat
nc -l -p 9999
代码
~~~
def main(args: Array[String]): Unit = {
val conf=new SparkConf().setMaster("local").setAppName("WordCount")
val ssc=new StreamingContext(conf,Seconds(10));
//通过 ssc 创建 Dstream 对象,得到 lines 变量从服务器获得数据,每条记录表示一行文本数据
val lines=ssc.socketTextStream("localhost",9999)
val words=lines.flatMap(_.split(" "))
//统计单词个数,words 由 map+reduceByKey 两个操作,pairs 类型 DStream
import org.apache.spark.streaming.StreamingContext._
val pairs=words.map(w=>(w,1))
val wordCounts=pairs.reduceByKey(_+_)
wordCounts.print()
//启动上面计算并等待终止
ssc.start()
ssc.awaitTermination()
}
~~~
输入2:文件
~~~
def main(args: Array[String]): Unit = {
println("heelllsssssssssssssssssssssssssssssssssssssss")
val conf=new SparkConf().setMaster("local").setAppName("WordCount")
val ssc=new StreamingContext(conf,Seconds(10));
//通过 ssc 创建 Dstream 对象,得到 lines 变量从服务器获得数据,每条记录表示一行文本数据
val lines=ssc.textFileStream("d:/aa")
val words=lines.flatMap(_.split(" "))
//统计单词个数,words 由 map+reduceByKey 两个操作,pairs 类型 DStream
import org.apache.spark.streaming.StreamingContext._
val pairs=words.map(w=>(w,1))
val wordCounts=pairs.reduceByKey(_+_)
wordCounts.print()
//启动上面计算并等待终止
ssc.start()
ssc.awaitTermination()
}
~~~
输入3:kafaka
1) 启动zookeeper zkServer.sh start
查看zookeeper状态 zkServer.sh status
2) 启动kafka
bin/kafka-server-start.sh –daemon config/server.properties
3) 创建kafka topic
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 2 --partitions 4 --topic test_1
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic test_1
![](https://box.kancloud.cn/4c2987f3c2ae07507d113b8460a13983_576x61.png)
查看topic
bin/kafka-topics.sh --list --zookeeper master:2181
4) 启动生产者消费者
在启动 kafka-server 之后启动,运行producer:
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_1
在另一个终端运行 consumer:
$ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_1
5) 使用程序替换消费者
~~~
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Duration, StreamingContext}
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setMaster("local").setAppName("kafka-spark-demo")
val scc = new StreamingContext(sparkConf, Duration(5000))
scc.sparkContext.setLogLevel("ERROR")
scc.checkpoint(".") // 因为使用到了updateStateByKey,所以必须要设置checkpoint
val topics = Set("test") //我们需要消费的kafka数据的topic
val brokers = "192.168.80.3:9092"
val kafkaParam = Map[String, String](
"zookeeper.connect" -> "192.168.80.3:2181",
// "group.id" -> "test-consumer-group",
"metadata.broker.list" -> brokers, // kafka的broker list地址
"serializer.class" -> "kafka.serializer.StringEncoder"
)
val stream:InputDStream[(String, String)] = createStream(scc, kafkaParam, topics)
stream.map(_._2) //取出value
.flatMap(_.split(" ")) // 将字符串使用空格分隔
.map(r => (r, 1)) // 每个单词映射成一个pair
.updateStateByKey[Int](updateFunc) // 用当前batch的数据区更新已有的数据
.print() // 打印前10个数据
scc.start() // 真正启动程序
scc.awaitTermination() //阻塞等待
}
val updateFunc = (currentValues: Seq[Int], preValue: Option[Int]) => {
val curr = currentValues.sum
val pre = preValue.getOrElse(0)
Some(curr + pre)
}
def createStream(scc: StreamingContext, kafkaParam: Map[String, String], topics: Set[String]) = {
KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](scc, kafkaParam, topics)
}
~~~
- 空白目录
- 第一章 Linux虚拟机安装
- 第二章 SSH配置
- 第三章 jdk配置
- 第四章 Hadoop配置-单机
- 第五章 Hadoop配置-集群
- 第六章 HDFS
- 第七章 MapReduce
- 7.1 MapReduce(上)
- 7.2 MapReduce(下)
- 7.3 MapReduce实验1 去重
- 7.4 MapReduce实验2 单例排序
- 7.5 MapReduce实验3 TopK
- 7.6 MapReduce实验4 倒排索引
- 第八章 Hive
- Hive安装
- 数据定义
- 数据操作
- 第九章 HBase
- 第十章 SaCa RealRec数据科学平台
- 第十一章 Spark Core
- 第十二章 Spark Streaming
- 第十章 Spark测试题