首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

8.输入DStream之Kafka数据源实战(基于Receiver的方式)

  • 25-03-07 20:20
  • 2535
  • 13787
blog.csdn.net

基于Receiver的方式

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。

然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。

如何进行Kafka数据源连接

1、在maven添加依赖

groupId = org.apache.spark
artifactId = spark-streaming-kafka_2.10
version = 1.5.1

2、使用第三方工具类创建输入DStream

 JavaPairReceiverInputDStream<String, String> kafkaStream =
     KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume]);

需要注意的要点 

  1. Kafka中的topic的partition,与Spark中的RDD的partition是没有关系的。所以,在KafkaUtils.createStream()中,提高partition的数量,只会增加一个Receiver中,读取partition的线程的数量。不会增加Spark处理数据的并行度。
  2. 可以创建多个Kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
  3. 如果基于容错的文件系统,比如HDFS,启用了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。

Kafka命令(进入到cd  /usr/local/kafka)

// 创建topic

bin/kafka-topics.sh --zookeeper 172.20.10.117:2181,172.20.10.118:2181,172.20.10.119:2181 --topic MyTestWordCount --replication-factor 1 --partitions 1 --create

// 创建consumer

bin/kafka-console-producer.sh --broker-list 172.20.10.117:9092, 172.20.10.118:9092, 172.20.10.119:9092 --topic MyTestWordCount

java版本代码:

  1. package cn.spark.study.streaming;
  2. import java.util.Arrays;
  3. import java.util.HashMap;
  4. import java.util.Map;
  5. import org.apache.spark.SparkConf;
  6. import org.apache.spark.api.java.function.FlatMapFunction;
  7. import org.apache.spark.api.java.function.Function2;
  8. import org.apache.spark.api.java.function.PairFunction;
  9. import org.apache.spark.streaming.Durations;
  10. import org.apache.spark.streaming.api.java.JavaDStream;
  11. import org.apache.spark.streaming.api.java.JavaPairDStream;
  12. import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
  13. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  14. import org.apache.spark.streaming.kafka.KafkaUtils;
  15. import scala.Tuple2;
  16. /**
  17. * 基于Kafka receiver方式的实时wordcount程序
  18. * @author Administrator
  19. *
  20. */
  21. public class KafkaReceiverWordCount {
  22. public static void main(String[] args) {
  23. SparkConf conf = new SparkConf()
  24. .setMaster("local[2]")
  25. .setAppName("KafkaWordCount");
  26. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  27. // 使用KafkaUtils.createStream()方法,创建针对Kafka的输入数据流
  28. Map topicThreadMap = new HashMap();
  29. topicThreadMap.put("MyTestWordCount", 1);
  30. JavaPairReceiverInputDStream lines = KafkaUtils.createStream(
  31. jssc,
  32. "172.20.10.117:2181,172.20.10.118:2181,172.20.10.119:2181",
  33. "DefaultConsumerGroup",
  34. topicThreadMap);
  35. // 然后开发wordcount逻辑
  36. JavaDStream words = lines.flatMap(
  37. new FlatMapFunction, String>() {
  38. private static final long serialVersionUID = 1L;
  39. @Override
  40. public Iterable call(Tuple2 tuple)
  41. throws Exception {
  42. return Arrays.asList(tuple._2.split(" "));
  43. }
  44. });
  45. JavaPairDStream pairs = words.mapToPair(
  46. new PairFunction() {
  47. private static final long serialVersionUID = 1L;
  48. @Override
  49. public Tuple2 call(String word)
  50. throws Exception {
  51. return new Tuple2(word, 1);
  52. }
  53. });
  54. JavaPairDStream wordCounts = pairs.reduceByKey(
  55. new Function2() {
  56. private static final long serialVersionUID = 1L;
  57. @Override
  58. public Integer call(Integer v1, Integer v2) throws Exception {
  59. return v1 + v2;
  60. }
  61. });
  62. wordCounts.print();
  63. jssc.start();
  64. jssc.awaitTermination();
  65. jssc.close();
  66. }
  67. }

运行步骤及运行结果(java本地运行):

 

 


文章最后,给大家推荐一些受欢迎的技术博客链接:

  1. Hadoop相关技术博客链接
  2. Spark 核心技术链接
  3. JAVA相关的深度技术博客链接
  4. 超全干货--Flink思维导图,花了3周左右编写、校对
  5. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
  6. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
  7. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

 


欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

 

注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78556230"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top