首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

14.DStream的output操作以及foreachRDD详解

  • 25-03-07 20:21
  • 3798
  • 12676
blog.csdn.net

output操作概览

Output

Meaning

print

打印每个batch中的前10个元素,主要用于测试,或者是不需要执行什么output操作时,用于简单触发一下job。

saveAsTextFile(prefix, [suffix])

将每个batch的数据保存到文件中。每个batch的文件的命名格式为:prefix-TIME_IN_MS[.suffix]

saveAsObjectFile

同上,但是将每个batch的数据以序列化对象的方式,保存到SequenceFile中。

saveAsHadoopFile

同上,将数据保存到Hadoop文件中

foreachRDD

最常用的output操作,遍历DStream中的每个产生的RDD,进行处理。可以将每个RDD中的数据写入外部存储,比如文件、数据库、缓存等。通常在其中,是针对RDD执行action操作的,比如foreach。

output操作

 

DStream中的所有计算,都是由output操作触发的,比如print()。如果没有任何output操作,那么,压根儿就不会执行定义的计算逻辑。

此外,即使你使用了foreachRDDoutput操作,也必须在里面对RDD执行action操作,才能触发对每一个batch的计算逻辑。否则,光有foreachRDD output操作,在里面没有对RDD执行action操作,也不会触发任何逻辑。

foreachRDD详解

通常在foreachRDD中,都会创建一个Connection,比如JDBC Connection,然后通过Connection将数据写入外部存储。

误区一:在RDD的foreach操作外部,创建Connection

这种方式是错误的,因为它会导致Connection对象被序列化后传输到每个Task中。而这种Connection对象,实际上一般是不支持序列化的,也就无法被传输。

  1. dstream.foreachRDD {rdd =>
  2.   val connection = createNewConnection()
  3.   rdd.foreach { record =>connection.send(record)
  4.   }
  5. }

误区二:在RDD的foreach操作内部,创建Connection

这种方式是可以的,但是效率低下。因为它会导致对于RDD中的每一条数据,都创建一个Connection对象。而通常来说,Connection的创建,是很消耗性能的。

  1. dstream.foreachRDD {rdd =>
  2.   rdd.foreach { record =>
  3.     val connection = createNewConnection()
  4.     connection.send(record)
  5.     connection.close()
  6.   }
  7. }

合理方式一:使用RDD的foreachPartition操作,并且在该操作内部,创建Connection对象,这样就相当于是,为RDD的每个partition创建一个Connection对象,节省资源的多了。

  1. dstream.foreachRDD {rdd =>
  2.   rdd.foreachPartition { partitionOfRecords=>
  3.     val connection = createNewConnection()
  4.     partitionOfRecords.foreach(record =>connection.send(record))
  5.     connection.close()
  6.   }
  7. }

合理方式二:自己手动封装一个静态连接池,使用RDD的foreachPartition操作,并且在该操作内部,从静态连接池中,通过静态方法,获取到一个连接,使用之后再还回去。这样的话,甚至在多个RDD的partition之间,也可以复用连接了。而且可以让连接池采取懒创建的策略,并且空闲一段时间后,将其释放掉。

  1. dstream.foreachRDD {rdd =>
  2.   rdd.foreachPartition { partitionOfRecords=>
  3.     val connection = ConnectionPool.getConnection()
  4.     partitionOfRecords.foreach(record =>connection.send(record))
  5.     ConnectionPool.returnConnection(connection) 
  6.   }
  7. }

foreachRDD实战

 

案例:改写UpdateStateByKeyWordCount,将每次统计出来的全局的单词计数,写入一份,到MySQL数据库中。

建表语句

  1. create table wordcount(
  2.   id integer auto_increment primary key,
  3.   updated_time timestamp NOT NULL defaultCURRENT_TIMESTAMP on update CURRENT_TIMESTAMP,
  4.   word varchar(255),
  5.   count integer
  6. );

java版本代码:

  1. package cn.spark.study.sql;
  2. import java.sql.Connection;
  3. import java.sql.DriverManager;
  4. import java.sql.Statement;
  5. import java.util.ArrayList;
  6. import java.util.HashMap;
  7. import java.util.List;
  8. import java.util.Map;
  9. import org.apache.spark.SparkConf;
  10. import org.apache.spark.api.java.JavaPairRDD;
  11. import org.apache.spark.api.java.JavaRDD;
  12. import org.apache.spark.api.java.JavaSparkContext;
  13. import org.apache.spark.api.java.function.Function;
  14. import org.apache.spark.api.java.function.PairFunction;
  15. import org.apache.spark.api.java.function.VoidFunction;
  16. import org.apache.spark.sql.DataFrame;
  17. import org.apache.spark.sql.Row;
  18. import org.apache.spark.sql.RowFactory;
  19. import org.apache.spark.sql.SQLContext;
  20. import org.apache.spark.sql.types.DataTypes;
  21. import org.apache.spark.sql.types.StructField;
  22. import org.apache.spark.sql.types.StructType;
  23. import scala.Tuple2;
  24. /**
  25. * JDBC数据源
  26. * @author leizq120310
  27. *
  28. */
  29. public class JDBCDataSource {
  30. public static void main(String[] args) {
  31. SparkConf conf = new SparkConf()
  32. .setAppName("JDBCDataSource");
  33. JavaSparkContext sc = new JavaSparkContext(conf);
  34. SQLContext sqlContext = new SQLContext(sc);
  35. // 总结一下
  36. // jdbc数据源
  37. // 首先,是通过SQLContext的read系列方法,将mysql中的数据加载为DataFrame
  38. // 然后可以将DataFrame转换为RDD,使用Spark Core提供的各种算子进行操作
  39. // 最后可以将得到的数据结果,通过foreach()算子,写入mysql、hbase、redis等等db/ cache中
  40. // 分别将mysql中两张表的数据加载为DataFrame
  41. Map options = new HashMap();
  42. options.put("url", "jdbc:mysql://spark1:3306/testdb");
  43. options.put("dbtable", "student_infos");
  44. DataFrame studentInfosDF = sqlContext.read().format("jdbc").options(options).load();
  45. options.clear();
  46. options.put("url", "jdbc:mysql://spark1:3306/testdb");
  47. options.put("dbtable", "student_scores");
  48. DataFrame studentScoresDF = sqlContext.read().format("jdbc").options(options).load();
  49. // 将两个DataFrame转换为JavaPairRDD,执行join操作
  50. JavaPairRDD> studentsRDD = studentInfosDF.javaRDD().mapToPair(new PairFunction() {
  51. private static final long serialVersionUID = 1L;
  52. @Override
  53. public Tuple2 call(Row row) throws Exception {
  54. return new Tuple2
  55. (row.getString(0), Integer.valueOf(String.valueOf(row.get(1))));
  56. }
  57. }).join(studentScoresDF.javaRDD().mapToPair(new PairFunction() {
  58. private static final long serialVersionUID = 1L;
  59. @Override
  60. public Tuple2 call(Row row) throws Exception {
  61. return new Tuple2
  62. (row.getString(0), Integer.valueOf(String.valueOf(row.get(1))));
  63. }
  64. }));
  65. // 将JavaPairRDD转换为JavaRDD
  66. JavaRDD studentRowsRDD = studentsRDD.map(new Function>, Row>() {
  67. private static final long serialVersionUID = 1L;
  68. @Override
  69. public Row call(Tuple2> tuple) throws Exception {
  70. return RowFactory.create(tuple._1, tuple._2._1, tuple._2._2);
  71. }
  72. });
  73. // 过滤出分数大于80分的数据
  74. JavaRDD filteredStudentRowsRDD = studentRowsRDD.filter(new Function() {
  75. private static final long serialVersionUID = 1L;
  76. @Override
  77. public Boolean call(Row row) throws Exception {
  78. if (row.getInt(2) > 80) {
  79. return true;
  80. }
  81. return false;
  82. }
  83. });
  84. // 转换为DataFrame
  85. List structFields = new ArrayList();
  86. structFields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
  87. structFields.add(DataTypes.createStructField("age", DataTypes.IntegerType, true));
  88. structFields.add(DataTypes.createStructField("score", DataTypes.IntegerType, true));
  89. StructType structType = DataTypes.createStructType(structFields);
  90. DataFrame studentsDF = sqlContext.createDataFrame(filteredStudentRowsRDD, structType);
  91. Row[] rows = studentsDF.collect();
  92. for (Row row : rows)
  93. {
  94. System.out.println(row);
  95. }
  96. // 将DataFrame中的数据保存到mysql表中
  97. // 这种方式是在企业里很常用的,有可能是插入mysql、有可能是插入hbase,还有可能是插入redis缓存
  98. studentsDF.javaRDD().foreach(new VoidFunction() {
  99. private static final long serialVersionUID = 1L;
  100. @Override
  101. public void call(Row row) throws Exception {
  102. String sql = "insert into good_student_infos values("
  103. + "'" + String.valueOf(row.getString(0)) + "',"
  104. + Integer.valueOf(String.valueOf(row.get(1))) + ","
  105. + Integer.valueOf(String.valueOf(row.get(2))) + ")";
  106. Class.forName("com.mysql.jdbc.Driver");
  107. Connection conn = null;
  108. Statement stmt = null;
  109. try{
  110. conn = DriverManager.getConnection(
  111. "jdbc:mysql://spark1:3306/testdb", "", "");
  112. stmt = conn.createStatement();
  113. stmt.executeUpdate(sql);
  114. } catch (Exception e) {
  115. e.printStackTrace();
  116. } finally{
  117. if (stmt != null){
  118. stmt.close();
  119. }
  120. if (conn != null){
  121. conn.close();
  122. }
  123. }
  124. }
  125. });
  126. sc.close();
  127. }
  128. }

操作步骤:

 

1.进入linux系统平台下的mysql操作中,创建wordcount表


文章最后,给大家推荐一些受欢迎的技术博客链接:

  1. Hadoop相关技术博客链接
  2. Spark 核心技术链接
  3. 超全干货--Flink思维导图,花了3周左右编写、校对
  4. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
  5. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
  6. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

 


欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

 

 

 

注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78578776"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top