首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

13.window滑动窗口以及热点搜索词滑动统计案例实战

  • 25-03-07 20:21
  • 4189
  • 7703
blog.csdn.net

window滑动窗口

Spark Streaming提供了滑动窗口操作的支持,从而让我们可以对一个滑动窗口内的数据执行计算操作。每次掉落在窗口内的RDD的数据,会被聚合起来执行计算操作,然后生成的RDD,会作为window DStream的一个RDD。比如下图中,就是对每三秒钟的数据执行一次滑动窗口计算,这3秒内的3个RDD会被聚合起来进行处理,然后过了两秒钟,又会对最近三秒内的数据执行滑动窗口计算。所以每个滑动窗口操作,都必须指定两个参数,窗口长度以及滑动间隔,而且这两个参数值都必须是batch间隔的整数倍。(Spark Streaming对滑动窗口的支持,是比Storm更加完善和强大的)

Transform

意义

window

对每个滑动窗口的数据执行自定义的计算

countByWindow

对每个滑动窗口的数据执行count操作

reduceByWindow

对每个滑动窗口的数据执行reduce操作

reduceByKeyAndWindow

对每个滑动窗口的数据执行reduceByKey操作

countByValueAndWindow

对每个滑动窗口的数据执行countByValue操作

案例:热点搜索词滑动统计,每隔10秒钟,统计最近60秒钟的搜索词的搜索频次,并打印出排名最靠前的3个搜索词以及出现次数

 

java版本:

 

  1. package cn.spark.study.streaming;
  2. import java.util.List;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.JavaPairRDD;
  5. import org.apache.spark.api.java.function.Function;
  6. import org.apache.spark.api.java.function.Function2;
  7. import org.apache.spark.api.java.function.PairFunction;
  8. import org.apache.spark.streaming.Durations;
  9. import org.apache.spark.streaming.api.java.JavaDStream;
  10. import org.apache.spark.streaming.api.java.JavaPairDStream;
  11. import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  12. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  13. import scala.Tuple2;
  14. /**
  15. * 基于滑动窗口的热点搜索词实时统计
  16. * @author Administrator
  17. *
  18. */
  19. public class WindowHotWord {
  20. public static void main(String[] args) {
  21. SparkConf conf = new SparkConf()
  22. .setMaster("local[2]")
  23. .setAppName("WindowHotWord");
  24. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
  25. // 说明一下,这里的搜索日志的格式
  26. // leo hello
  27. // tom world
  28. JavaReceiverInputDStream searchLogsDStream = jssc.socketTextStream("spark1", 9999);
  29. // 将搜索日志给转换成,只有一个搜索词,即可
  30. JavaDStream searchWordsDStream = searchLogsDStream.map(new Function() {
  31. private static final long serialVersionUID = 1L;
  32. @Override
  33. public String call(String searchLog) throws Exception {
  34. return searchLog.split(" ")[1];
  35. }
  36. });
  37. // 将搜索词映射为(searchWord, 1)的tuple格式
  38. JavaPairDStream searchWordPairDStream = searchWordsDStream.mapToPair(
  39. new PairFunction() {
  40. private static final long serialVersionUID = 1L;
  41. @Override
  42. public Tuple2 call(String searchWord)
  43. throws Exception {
  44. return new Tuple2(searchWord, 1);
  45. }
  46. });
  47. // 针对(searchWord, 1)的tuple格式的DStream,执行reduceByKeyAndWindow,滑动窗口操作
  48. // 第二个参数,是窗口长度,这里是60秒
  49. // 第三个参数,是滑动间隔,这里是10秒
  50. // 也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续
  51. // 计算
  52. // 所以说,这里的意思,就是,之前的searchWordPairDStream为止,其实,都是不会立即进行计算的
  53. // 而是只是放在那里
  54. // 然后,等待我们的滑动间隔到了以后,10秒钟到了,会将之前60秒的RDD,因为一个batch间隔是,5秒,所以之前
  55. // 60秒,就有12个RDD,给聚合起来,然后,统一执行redcueByKey操作
  56. // 所以这里的reduceByKeyAndWindow,是针对每个窗口执行计算的,而不是针对某个DStream中的RDD
  57. JavaPairDStream searchWordCountsDStream =
  58. searchWordPairDStream.reduceByKeyAndWindow(new Function2() {
  59. private static final long serialVersionUID = 1L;
  60. @Override
  61. public Integer call(Integer v1, Integer v2) throws Exception {
  62. return v1 + v2;
  63. }
  64. }, Durations.seconds(60), Durations.seconds(10));
  65. // 到这里为止,就已经可以做到,每隔10秒钟,出来,之前60秒的收集到的单词的统计次数
  66. // 执行transform操作,因为,一个窗口,就是一个60秒钟的数据,会变成一个RDD,然后,对这一个RDD
  67. // 根据每个搜索词出现的频率进行排序,然后获取排名前3的热点搜索词
  68. JavaPairDStream finalDStream = searchWordCountsDStream.transformToPair(
  69. new Function, JavaPairRDD>() {
  70. private static final long serialVersionUID = 1L;
  71. @Override
  72. public JavaPairRDD call(
  73. JavaPairRDD searchWordCountsRDD) throws Exception {
  74. // 执行搜索词和出现频率的反转
  75. JavaPairRDD countSearchWordsRDD = searchWordCountsRDD
  76. .mapToPair(new PairFunction, Integer, String>() {
  77. private static final long serialVersionUID = 1L;
  78. @Override
  79. public Tuple2 call(
  80. Tuple2 tuple)
  81. throws Exception {
  82. return new Tuple2(tuple._2, tuple._1);
  83. }
  84. });
  85. // 然后执行降序排序
  86. JavaPairRDD sortedCountSearchWordsRDD = countSearchWordsRDD
  87. .sortByKey(false);
  88. // 然后再次执行反转,变成(searchWord, count)的这种格式
  89. JavaPairRDD sortedSearchWordCountsRDD = sortedCountSearchWordsRDD
  90. .mapToPair(new PairFunction, String, Integer>() {
  91. private static final long serialVersionUID = 1L;
  92. @Override
  93. public Tuple2 call(
  94. Tuple2 tuple)
  95. throws Exception {
  96. return new Tuple2(tuple._2, tuple._1);
  97. }
  98. });
  99. // 然后用take(),获取排名前3的热点搜索词
  100. List> hogSearchWordCounts =
  101. sortedSearchWordCountsRDD.take(3);
  102. for(Tuple2 wordCount : hogSearchWordCounts) {
  103. System.out.println(wordCount._1 + ": " + wordCount._2);
  104. }
  105. return searchWordCountsRDD;
  106. }
  107. });
  108. // 这个无关紧要,只是为了触发job的执行,所以必须有output操作
  109. finalDStream.print();
  110. jssc.start();
  111. jssc.awaitTermination();
  112. jssc.close();
  113. }
  114. }


scala版本:

package cn.spark.study.streaming


import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds


/**
 * @author Administrator
 */
object WindowHotWord {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setMaster("local[2]")  
        .setAppName("WindowHotWord")
    val ssc = new StreamingContext(conf, Seconds(1))
    
    val searchLogsDStream = ssc.socketTextStream("spark1", 9999)  
    val searchWordsDStream = searchLogsDStream.map { _.split(" ")(1) }  
    val searchWordPairsDStream = searchWordsDStream.map { searchWord => (searchWord, 1) }  
    val searchWordCountsDSteram = searchWordPairsDStream.reduceByKeyAndWindow(
        (v1: Int, v2: Int) => v1 + v2, 
        Seconds(60), 
        Seconds(10))  
        
    val finalDStream = searchWordCountsDSteram.transform(searchWordCountsRDD => {
      val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))  
      val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)  
      val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))
      
      val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3)
      for(tuple <- top3SearchWordCounts) {
        println(tuple)
      }
      
      searchWordCountsRDD
    })
    
    finalDStream.print()
    
    ssc.start()
    ssc.awaitTermination()
  }
}


运行步骤:
1.启动nc 
nc -lk 9999
2.本地运行,直接在IDE中运行程序

运行结果:

 

 


文章最后,给大家推荐一些受欢迎的技术博客链接:

  1. Hadoop相关技术博客链接
  2. Spark 核心技术链接
  3. JAVA相关的深度技术博客链接
  4. 超全干货--Flink思维导图,花了3周左右编写、校对
  5. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
  6. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
  7. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

 


欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

 

注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78571519"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top