首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

4.Spark Streaming:实时wordcount程序开发

  • 25-03-07 20:02
  • 2715
  • 8740
blog.csdn.net

1、安装nc工具:yum install nc,然后运行nc -lk 9999

2、开发实时wordcount程序

java版本

  1. package cn.spark.study.streaming;
  2. import java.util.Arrays;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.function.FlatMapFunction;
  5. import org.apache.spark.api.java.function.Function2;
  6. import org.apache.spark.api.java.function.PairFunction;
  7. import org.apache.spark.streaming.Durations;
  8. import org.apache.spark.streaming.api.java.JavaDStream;
  9. import org.apache.spark.streaming.api.java.JavaPairDStream;
  10. import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  11. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  12. import scala.Tuple2;
  13. /**
  14. * 实时wordcount程序
  15. * @author Administrator
  16. *
  17. */
  18. public class WordCount {
  19. public static void main(String[] args) throws Exception {
  20. // 创建SparkConf对象
  21. // 但是这里有一点不同,我们是要给它设置一个Master属性,但是我们测试的时候使用local模式
  22. // local后面必须跟一个方括号,里面填写一个数字,数字代表了,我们用几个线程来执行我们的
  23. // Spark Streaming程序
  24. SparkConf conf = new SparkConf()
  25. .setMaster("local[2]")
  26. .setAppName("WordCount");
  27. // 创建JavaStreamingContext对象
  28. // 该对象,就类似于Spark Core中的JavaSparkContext,就类似于Spark SQL中的SQLContext
  29. // 该对象除了接收SparkConf对象对象之外
  30. // 还必须接收一个batch interval参数,就是说,每收集多长时间的数据,划分为一个batch,进行处理
  31. // 这里设置一秒
  32. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
  33. // 首先,创建输入DStream,代表了一个从数据源(比如kafka、socket)来的持续不断的实时数据流
  34. // 调用JavaStreamingContext的socketTextStream()方法,可以创建一个数据源为Socket网络端口的
  35. // 数据流,JavaReceiverInputStream,代表了一个输入的DStream
  36. // socketTextStream()方法接收两个基本参数,第一个是监听哪个主机上的端口,第二个是监听哪个端口
  37. JavaReceiverInputDStream lines = jssc.socketTextStream("localhost", 9999);
  38. // 到这里为止,你可以理解为JavaReceiverInputDStream中的,每隔一秒,会有一个RDD,其中封装了
  39. // 这一秒发送过来的数据
  40. // RDD的元素类型为String,即一行一行的文本
  41. // 所以,这里JavaReceiverInputStream的泛型类型,其实就代表了它底层的RDD的泛型类型
  42. // 开始对接收到的数据,执行计算,使用Spark Core提供的算子,执行应用在DStream中即可
  43. // 在底层,实际上是会对DStream中的一个一个的RDD,执行我们应用在DStream上的算子
  44. // 产生的新RDD,会作为新DStream中的RDD
  45. JavaDStream words = lines.flatMap(new FlatMapFunction() {
  46. private static final long serialVersionUID = 1L;
  47. @Override
  48. public Iterable call(String line) throws Exception {
  49. return Arrays.asList(line.split(" "));
  50. }
  51. });
  52. // 这个时候,每秒的数据,一行一行的文本,就会被拆分为多个单词,words DStream中的RDD的元素类型
  53. // 即为一个一个的单词
  54. // 接着,开始进行flatMap、reduceByKey操作
  55. JavaPairDStream pairs = words.mapToPair(
  56. new PairFunction() {
  57. private static final long serialVersionUID = 1L;
  58. @Override
  59. public Tuple2 call(String word)
  60. throws Exception {
  61. return new Tuple2(word, 1);
  62. }
  63. });
  64. // 这里,正好说明一下,其实大家可以看到,用Spark Streaming开发程序,和Spark Core很相像
  65. // 唯一不同的是Spark Core中的JavaRDD、JavaPairRDD,都变成了JavaDStream、JavaPairDStream
  66. JavaPairDStream wordCounts = pairs.reduceByKey(
  67. new Function2() {
  68. private static final long serialVersionUID = 1L;
  69. @Override
  70. public Integer call(Integer v1, Integer v2) throws Exception {
  71. return v1 + v2;
  72. }
  73. });
  74. // 到此为止,我们就实现了实时的wordcount程序了
  75. // 大家总结一下思路,加深一下印象
  76. // 每秒中发送到指定socket端口上的数据,都会被lines DStream接收到
  77. // 然后lines DStream会把每秒的数据,也就是一行一行的文本,诸如hello world,封装为一个RDD
  78. // 然后呢,就会对每秒中对应的RDD,执行后续的一系列的算子操作
  79. // 比如,对lines RDD执行了flatMap之后,得到一个words RDD,作为words DStream中的一个RDD
  80. // 以此类推,直到生成最后一个,wordCounts RDD,作为wordCounts DStream中的一个RDD
  81. // 此时,就得到了,每秒钟发送过来的数据的单词统计
  82. // 但是,一定要注意,Spark Streaming的计算模型,就决定了,我们必须自己来进行中间缓存的控制
  83. // 比如写入redis等缓存
  84. // 它的计算模型跟Storm是完全不同的,storm是自己编写的一个一个的程序,运行在节点上,相当于一个
  85. // 一个的对象,可以自己在对象中控制缓存
  86. // 但是Spark本身是函数式编程的计算模型,所以,比如在words或pairs DStream中,没法在实例变量中
  87. // 进行缓存
  88. // 此时就只能将最后计算出的wordCounts中的一个一个的RDD,写入外部的缓存,或者持久化DB
  89. // 最后,每次计算完,都打印一下这一秒钟的单词计数情况
  90. // 并休眠5秒钟,以便于我们测试和观察
  91. try
  92. {
  93. Thread.sleep(5000);
  94. } catch (Exception e) {
  95. // TODO: handle exception
  96. }
  97. wordCounts.print();
  98. // 首先对JavaSteamingContext进行一下后续处理
  99. // 必须调用JavaStreamingContext的start()方法,整个Spark Streaming Application才会启动执行
  100. // 否则是不会执行的
  101. jssc.start();
  102. jssc.awaitTermination();
  103. jssc.close();
  104. }
  105. }

scala版本

package cn.spark.study.streaming
 
import org.apache.spark.SparkConf;
import org.apache.spark.streaming;
import org.apache.spark.streaming.Seconds;
import org.apache.spark.streaming.StreamingContext;
 
object WordCount {
  def main(args:Array[String])
  {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("WordCount")
    // Scala中,创建的是StreamingContext
    val ssc = new StreamingContext(conf, Seconds(1))
    
    val lines = ssc.socketTextStream("localhost", 9999)
    val words = lines.flatMap{ _.split(" ")}
    val pairs = words.map{ word => (word, 1)}
    val wordCounts = pairs.reduceByKey(_ + _)
    
    Thread.sleep(5000)
    wordCounts.print()
    
    ssc.start()
    ssc.awaitTermination()
  }
}

运行步骤:

打包,上传到linux中;编写spark-submit脚本;运行脚本;启动nc

 

运行结果:

 


文章最后,给大家推荐一些受欢迎的技术博客链接:

  1. Hadoop相关技术博客链接
  2. Spark 核心技术链接
  3. JAVA相关的深度技术博客链接
  4. 超全干货--Flink思维导图,花了3周左右编写、校对
  5. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
  6. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
  7. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

 


欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

 

注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78555654"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top