实验目的
1.了解Spark Streaming的框架结构
2.准确理解Spark Streaming的实现原理
3.熟练掌握Spark Streaming进行WordCount的实验流程
实验原理
Spark是一个类似于MapReduce的分布式计算框架,其核心是弹性分布式数据集,提供了比MapReduce更丰富的模型,可以快速在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。
1.Spark Streaming的优势
(1)能运行在100个以上的结点上,并达到秒级延迟;
(2)使用基于内存的Spark作为执行引擎,具有高效和容错的特性;
(3)能集成Spark的批处理和交互查询;
(4)为实现复杂的算法提供了与批处理类似的简单接口。
2.基于Spark on Yarn的Spark Streaming总体架构如下图所示:
Spark on Yarn启动后,由Spark AppMaster把Receiver作为一个Task提交给某一个Spark Executor;Receive启动后输入数据,生成数据块,然后通知Spark AppMaster;Spark AppMaster会根据数据块生成相应的Job,并把Job的Task提交给空闲Spark Executor 执行。图中粗箭头显示被处理的数据流,输入数据流可以是磁盘、网络和HDFS等,输出可以是HDFS,数据库等。
3.Spark Streaming的基本原理
将输入数据流以时间片(秒级)为单位进行拆分,然后以类似批处理的方式处理每个时间片数据,其基本原理如下图所示。
首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。
4.Spark Streaming内部实现原理
使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD(Resilient Distributed Datasets弹性分布式数据集)提供的接口,如map、reduce、filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。
实验环境
Linux Ubuntu 16.04
jdk-7u75-linux-x64
scala-2.10.5
spark-1.6.0-bin-hadoop2.6
hadoop-2.6.0-cdh5.4.5
hadoop-2.6.0-eclipse-cdh5.4.5.jar
eclipse-java-juno-SR2-linux-gtk-x86_64
实验内容
下图为项目的流程图,通过nc命令,向9999端口持续发送消息,并使用spark streaming对从9999端口发来的数据进行统计,将统计的结果输出到console界面上。
实验步骤
1.使用jps查看HDFS以及Spark是否已经启动,若未启动,则切换对应目录下,启动Hadoop及Spark。
- jps
- cd /apps/hadoop/sbin
- ./start-dfs.sh
- cd /apps/spark/sbin
- ./start-all.sh
2.使用nc命令,向9999端口,发送数据。
- nc -lk 9999
nc 为netcat,一般多用于在局域网之间传输文件。在执行nc -lk 9999后,界面会进入持续等待输入内容状态。我们可以随便输入一些文字,来作为输入,发给9999端口。
3.打开一个新的连接窗口,并切换目录到/apps/spark目录下,调用spark的example中,自带的wordcount程序。
- cd /apps/spark
- bin/run-example org.apache.spark.examples.streaming.JavaNetworkWordCount localhost 9999
这里,可以调用example中,使用java程序编写的wordcount。在这里localhost和9999为spark steaming接收数据的主机和端口。在此作为参数,传递给wordcount程序。
可以看到wordcount程序,一直等待接受数据。
4.在执行nc -lk 9999的界面中,输入一些文本,并按回车。
- hello spark streaming hello hadoop
再次切换到执行wordcount程序的终端界面,可以看到spark streaming对输入的数据,进行统计,得到单词个数如下:
此程序可以表明,Spark Streaming数据处理流程。
在两个终端界面中,输入CTRL+C,可以终止程序的运行。
5.使用Spark Streaming的Scala API编写wordcount程序,以实现对单词个数的统计。
首先在Linux本地,新建/data/spark7目录,用于存放所需文件。
- mkdir -p /data/spark7
切换目录到/data/spark7下,使用wget命令,下载项目所需jar包spark-assembly-1.6.0-hadoop2.6.0.jar。
- cd /data/spark7
- wget http://172.16.103.12:60000/allfiles/spark7/spark-assembly-1.6.0-hadoop2.6.0.jar
创建一个Scala项目,命名为spark7。
在spark7项目下创建包,包名为my.sparkstreaming。
在my.sparkstreaming包下创建Scala Object,名为NetworkWordCount。
6.右键单击项目,新建一个目录,名为spark7lib,用于存放项目所需的jar包
将/data/spark7目录下的spark-assembly-1.6.0-hadoop2.6.0.jar包,拷贝到eclipse中spark7项目的spark7lib目录下
选中spark7lib目录下所有jar包,单击右键,选择Build Path→Add to Build Path。
7.编写sparkstreaming的wordcount代码。
- if (args.length < 2) {
- System.err.println("Usage: NetworkWordCount
" - System.exit(1)
- }
首先传递两个参数,第一个参数为发送数据的地址,第二个参数为发送数据的端口号。
- val sparkConf = new SparkConf().setAppName("networkwordcount").setMaster("spark://localhost:7077")
创建一个sparkconf对象。
- val ssc = new StreamingContext(sparkConf, Seconds(1) )
设置监听数据的时间窗口为1分钟。
- val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
Spark接受数据,并对数据进行存储。
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map( wd => (wd, 1)).reduceByKey( _ + _ )
- wordCounts.print();
对接收到数据,放置在一行,并对数据以空格分隔。进行map和reduce操作。得到每个单词出现的个数,并进行打印输出。
- ssc.start();
- ssc.awaitTermination();
最后这两行代码,是开始执行,前面所定义的Spark Streaming的任务。
完整代码如下:
- package my.sparkstreaming
- import org.apache.spark.SparkConf
- import org.apache.spark.streaming.StreamingContext
- import org.apache.spark.streaming.Seconds
- import org.apache.spark.storage.StorageLevel
- object NetworkWordCount {
- def main(args: Array[String]) {
- if (args.length < 2) {
- System.err.println("Usage: NetworkWordCount
" - System.exit(1)
- }
- val sparkConf = new SparkConf().setAppName("networkwordcount").setMaster("spark://localhost:7077")
- val ssc = new StreamingContext(sparkConf, Seconds(1) )
- val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
- val words = lines.flatMap(_.split(" "))
- val wordCounts = words.map( wd => (wd, 1)).reduceByKey( _ + _ )
- wordCounts.print();
- ssc.start();
- ssc.awaitTermination();
- }
- }
8.切换到Linux本地,执行nc命令,发送数据。
- nc -lk 9999
使用Spark Streaming处理发来的数据。
在NetworkWordCount.scala类上,单击右键选择Run As=>Run Configurations=>Arguments
在Program arguments后面的文本框中,输入执行nc命令发送数据的ip和端口。
- localhost 9999
然后点击Main,进入下面界面,查看项目名和主类名是否与程序的项目名和主类名对应,
若不对应,则在Project下面的文本框,输入本程序的项目名为
- spark7
在Main class下面的文本框中,输入本程序的包名.类名
- my.sparkstreaming.NetworkWordCount
点击Run 执行。
9.在nc窗口,输入数据"hello world hello hadoop"。
- hello word hello hadoop
可以看到程序的console界面,输出为:
实验结论及心得
1. 了解了Spark Streaming的框架结构,包括Spark Streaming的总体架构和内部实现原理。
2. 对Spark Streaming的实现原理有了准确的理解,了解了将输入数据流以时间片为单位拆分并使用RDD操作处理的基本原理。
3. 熟练掌握了使用Spark Streaming进行WordCount的实验流程。
4. Spark Streaming是一种强大的实时计算框架,通过扩展Spark的能力,可以快速处理大规模流式数据。
5. Spark Streaming的优势在于可以运行在大规模集群上并实现秒级延迟,同时结合了Spark的高效和容错特性。
6. 理解Spark Streaming的框架结构和实现原理对于正确使用和优化Spark Streaming应用程序非常重要。
7. 在实验中,学会了使用Spark Streaming进行WordCount实验的流程,这是Spark Streaming的入门应用,也是理解Spark Streaming基本原理的一种方式。
通过这次实验,对Spark Streaming有了更深入的了解,并且掌握了相关操作的基本流程。这将有助于在实际应用中更好地使用Spark Streaming进行实时计算和数据处理任务。
评论记录:
回复评论: