首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐
2025年5月9日 星期五 6:08am

9.输入DStream之Kafka数据源实战(基于Direct的方式)

  • 25-03-07 20:20
  • 3652
  • 14164
blog.csdn.net

基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

 

这种方式有如下优点:

1、简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

2、高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。

3、一次且仅一次的事务机制:

    基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。

    基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。

 JavaPairReceiverInputDStream<String, String> directKafkaStream =  KafkaUtils.createDirectStream(streamingContext, [key class], [value class], [key decoder class], [value decoder class],[map of Kafka parameters], [set of topics to consume]);

Kafka命令

bin/kafka-topics.sh --zookeeper 172.20.10.117:2181, 172.20.10.118:2181, 172.20.10.119:2181 --topic TestTopic --replication-factor 1 --partitions 1 --create
bin/kafka-console-producer.sh --broker-list 172.20.10.117:9092, 172.20.10.118:9092, 172.20.10.119:9092 --topic WordCount

metadata.broker.list

java版本代码

  1. package cn.spark.study.streaming;
  2. import java.util.Arrays;
  3. import java.util.HashMap;
  4. import java.util.HashSet;
  5. import java.util.Map;
  6. import java.util.Set;
  7. import kafka.serializer.StringDecoder;
  8. import org.apache.spark.SparkConf;
  9. import org.apache.spark.api.java.function.FlatMapFunction;
  10. import org.apache.spark.api.java.function.Function2;
  11. import org.apache.spark.api.java.function.PairFunction;
  12. import org.apache.spark.streaming.Durations;
  13. import org.apache.spark.streaming.api.java.JavaDStream;
  14. import org.apache.spark.streaming.api.java.JavaPairDStream;
  15. import org.apache.spark.streaming.api.java.JavaPairInputDStream;
  16. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  17. import org.apache.spark.streaming.kafka.KafkaUtils;
  18. import scala.Tuple2;
  19. /**
  20. * 基于Kafka Direct方式的实时wordcount程序
  21. * @author Administrator
  22. *
  23. */
  24. public class KafkaDirectWordCount {
  25. public static void main(String[] args) {
  26. SparkConf conf = new SparkConf()
  27. .setMaster("local[2]")
  28. .setAppName("KafkaDirectWordCount");
  29. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  30. // 首先,要创建一份kafka参数map
  31. Map kafkaParams = new HashMap();
  32. kafkaParams.put("metadata.broker.list",
  33. "172.20.10.117:9092,172.20.10.118:9092,172.20.10.119:9092");
  34. // 然后,要创建一个set,里面放入,你要读取的topic
  35. // 这个,就是我们所说的,它自己给你做的很好,可以并行读取多个topic
  36. Set topics = new HashSet();
  37. topics.add("WordCount");
  38. // 创建输入DStream
  39. JavaPairInputDStream lines = KafkaUtils.createDirectStream(
  40. jssc,
  41. String.class,
  42. String.class,
  43. StringDecoder.class,
  44. StringDecoder.class,
  45. kafkaParams,
  46. topics);
  47. // 执行wordcount操作
  48. JavaDStream words = lines.flatMap(
  49. new FlatMapFunction, String>() {
  50. private static final long serialVersionUID = 1L;
  51. @Override
  52. public Iterable call(Tuple2 tuple)
  53. throws Exception {
  54. return Arrays.asList(tuple._2.split(" "));
  55. }
  56. });
  57. JavaPairDStream pairs = words.mapToPair(
  58. new PairFunction() {
  59. private static final long serialVersionUID = 1L;
  60. @Override
  61. public Tuple2 call(String word) throws Exception {
  62. return new Tuple2(word, 1);
  63. }
  64. });
  65. JavaPairDStream wordCounts = pairs.reduceByKey(
  66. new Function2() {
  67. private static final long serialVersionUID = 1L;
  68. @Override
  69. public Integer call(Integer v1, Integer v2) throws Exception {
  70. return v1 + v2;
  71. }
  72. });
  73. wordCounts.print();
  74. jssc.start();
  75. jssc.awaitTermination();
  76. jssc.close();
  77. }
  78. }

运行步骤及运行结果(java本地运行):

 

 


文章最后,给大家推荐一些受欢迎的技术博客链接:

  1. Hadoop相关技术博客链接
  2. Spark 核心技术链接
  3. JAVA相关的深度技术博客链接
  4. 超全干货--Flink思维导图,花了3周左右编写、校对
  5. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
  6. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
  7. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

 


欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78556276"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top