首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

11.updateStateByKey以及基于缓存的实时wordcount程序

  • 25-03-07 20:21
  • 4722
  • 11182
blog.csdn.net

updateStateByKey

updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。

1、首先,要定义一个state,可以是任意的数据类型;

2、其次,要定义state更新函数——指定一个函数如何使用之前的state和新值来更新state。

对于每个batch,Spark都会为每个之前已经存在的key去应用一次state更新函数,无论这个key在batch中是否有新的数据。如果state更新函数返回none,那么key对应的state就会被删除。

当然,对于每个新出现的key,也会执行state更新函数。

注意,updateStateByKey操作,要求必须开启Checkpoint机制。

案例:基于缓存的实时wordcount程序(在实际业务场景中,这个是非常有用的)

java版本代码:

  1. package cn.spark.study.streaming;
  2. import java.util.Arrays;
  3. import java.util.List;
  4. import org.apache.spark.SparkConf;
  5. import org.apache.spark.api.java.function.FlatMapFunction;
  6. import org.apache.spark.api.java.function.Function2;
  7. import org.apache.spark.api.java.function.PairFunction;
  8. import org.apache.spark.streaming.Durations;
  9. import org.apache.spark.streaming.api.java.JavaDStream;
  10. import org.apache.spark.streaming.api.java.JavaPairDStream;
  11. import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  12. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  13. import com.google.common.base.Optional;
  14. import scala.Tuple2;
  15. /**
  16. * 基于updateStateByKey算子实现缓存机制的实时wordcount程序
  17. * @author Administrator
  18. *
  19. */
  20. public class UpdateStateByKeyWordCount {
  21. public static void main(String[] args) {
  22. SparkConf conf = new SparkConf()
  23. .setMaster("local[2]")
  24. .setAppName("UpdateStateByKeyWordCount");
  25. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  26. // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制
  27. // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份
  28. // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在
  29. // 内存数据丢失的时候,可以从checkpoint中恢复数据
  30. // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可
  31. jssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint");
  32. // 然后先实现基础的wordcount逻辑
  33. JavaReceiverInputDStream lines = jssc.socketTextStream("localhost", 9999);
  34. JavaDStream words = lines.flatMap(new FlatMapFunction() {
  35. private static final long serialVersionUID = 1L;
  36. @Override
  37. public Iterable call(String line) throws Exception {
  38. return Arrays.asList(line.split(" "));
  39. }
  40. });
  41. JavaPairDStream pairs = words.mapToPair(
  42. new PairFunction() {
  43. private static final long serialVersionUID = 1L;
  44. @Override
  45. public Tuple2 call(String word)
  46. throws Exception {
  47. return new Tuple2(word, 1);
  48. }
  49. });
  50. // 到了这里,就不一样了,之前的话,是不是直接就是pairs.reduceByKey
  51. // 然后,就可以得到每个时间段的batch对应的RDD,计算出来的单词计数
  52. // 然后,可以打印出那个时间段的单词计数
  53. // 但是,有个问题,你如果要统计每个单词的全局的计数呢?
  54. // 就是说,统计出来,从程序启动开始,到现在为止,一个单词出现的次数,那么就之前的方式就不好实现
  55. // 就必须基于redis这种缓存,或者是mysql这种db,来实现累加
  56. // 但是,我们的updateStateByKey,就可以实现直接通过Spark维护一份每个单词的全局的统计次数
  57. JavaPairDStream wordCounts = pairs.updateStateByKey(
  58. // 这里的Optional,相当于Scala中的样例类,就是Option,可以这么理解
  59. // 它代表了一个值的存在状态,可能存在,也可能不存在
  60. new Function2, Optional, Optional>() {
  61. private static final long serialVersionUID = 1L;
  62. // 这里两个参数
  63. // 实际上,对于每个单词,每次batch计算的时候,都会调用这个函数
  64. // 第一个参数,values,相当于是这个batch中,这个key的新的值,可能有多个吧
  65. // 比如说一个hello,可能有2个1,(hello, 1) (hello, 1),那么传入的是(1,1)
  66. // 第二个参数,就是指的是这个key之前的状态,state,其中泛型的类型是你自己指定的
  67. @Override
  68. public Optional call(List values,
  69. Optional state) throws Exception {
  70. // 首先定义一个全局的单词计数
  71. Integer newValue = 0;
  72. // 其次,判断,state是否存在,如果不存在,说明是一个key第一次出现
  73. // 如果存在,说明这个key之前已经统计过全局的次数了
  74. if(state.isPresent()) {
  75. newValue = state.get();
  76. }
  77. // 接着,将本次新出现的值,都累加到newValue上去,就是一个key目前的全局的统计
  78. // 次数
  79. for(Integer value : values) {
  80. newValue += value;
  81. }
  82. return Optional.of(newValue);
  83. }
  84. });
  85. // 到这里为止,相当于是,每个batch过来是,计算到pairs DStream,就会执行全局的updateStateByKey
  86. // 算子,updateStateByKey返回的JavaPairDStream,其实就代表了每个key的全局的计数
  87. // 打印出来
  88. wordCounts.print();
  89. jssc.start();
  90. jssc.awaitTermination();
  91. jssc.close();
  92. }
  93. }

scala版本代码:

 

package cn.spark.study.streaming
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
 
/**
 * @author Administrator
 */
object UpdateStateByKeyWordCount {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setMaster("local[2]")  
        .setAppName("UpdateStateByKeyWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint")  
    
    val lines = ssc.socketTextStream("spark1", 9999)
    val words = lines.flatMap { _.split(" ") }   
    val pairs = words.map { word => (word, 1) }
    val wordCounts = pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
      var newValue = state.getOrElse(0)    
      for(value <- values) {
        newValue += value
      }
      Option(newValue)  
    })
    
    wordCounts.print()  
    
    ssc.start()
    ssc.awaitTermination()
  }
}

运行步骤:

1.在hdfs文件系统创建wordcount_checkpoint

hadoop fs -mkdir /wordcount_checkpoint

2.启动nc

nc -lk 9999

3.启动运行程序

运行结果:

将启动到结束过程中的结果都进行缓存

 

 


文章最后,给大家推荐一些受欢迎的技术博客链接:

  1. Hadoop相关技术博客链接
  2. Spark 核心技术链接
  3. JAVA相关的深度技术博客链接
  4. 超全干货--Flink思维导图,花了3周左右编写、校对
  5. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
  6. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
  7. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

 


欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

 

注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78571456"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top