首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

7.Spark Streaming:输入DStream之基础数据源以及基于HDFS的实时wordcount程序

  • 25-03-07 20:20
  • 2900
  • 12121
blog.csdn.net

输入DStream之基础数据源

HDFS文件:基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。

streamingContext.fileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此不会占用一个cpu core。

 

java版本

  1. package cn.spark.study.streaming;
  2. import java.util.Arrays;
  3. import org.apache.spark.SparkConf;
  4. import org.apache.spark.api.java.function.FlatMapFunction;
  5. import org.apache.spark.api.java.function.Function2;
  6. import org.apache.spark.api.java.function.PairFunction;
  7. import org.apache.spark.streaming.Durations;
  8. import org.apache.spark.streaming.api.java.JavaDStream;
  9. import org.apache.spark.streaming.api.java.JavaPairDStream;
  10. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  11. import scala.Tuple2;
  12. /**
  13. * 基于HDFS文件的实时wordcount程序
  14. * @author Administrator
  15. *
  16. */
  17. public class HDFSWordCount {
  18. public static void main(String[] args) {
  19. SparkConf conf = new SparkConf()
  20. .setMaster("local[2]")
  21. .setAppName("HDFSWordCount");
  22. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  23. // 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流
  24. JavaDStream lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");
  25. // 执行wordcount操作
  26. JavaDStream words = lines.flatMap(new FlatMapFunction() {
  27. private static final long serialVersionUID = 1L;
  28. @Override
  29. public Iterable call(String line) throws Exception {
  30. return Arrays.asList(line.split(" "));
  31. }
  32. });
  33. JavaPairDStream pairs = words.mapToPair(
  34. new PairFunction() {
  35. private static final long serialVersionUID = 1L;
  36. @Override
  37. public Tuple2 call(String word)
  38. throws Exception {
  39. return new Tuple2(word, 1);
  40. }
  41. });
  42. JavaPairDStream wordCounts = pairs.reduceByKey(
  43. new Function2() {
  44. private static final long serialVersionUID = 1L;
  45. @Override
  46. public Integer call(Integer v1, Integer v2) throws Exception {
  47. return v1 + v2;
  48. }
  49. });
  50. wordCounts.print();
  51. jssc.start();
  52. jssc.awaitTermination();
  53. jssc.close();
  54. }
  55. }

scala版本

package cn.spark.study.streaming
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
 
/**
 * @author Administrator
 */
object HDFSWordCount {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setMaster("local[2]")  
        .setAppName("HDFSWordCount")
    val ssc = new StreamingContext(conf, Seconds(5))
    
    val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")  
    val words = lines.flatMap { _.split(" ") }  
    val pairs = words.map { word => (word, 1) }  
    val wordCounts = pairs.reduceByKey(_ + _)  
    
    wordCounts.print()  
    
    ssc.start()
    ssc.awaitTermination()
  }
  
}

运行步骤:

 

打包,上传到linux中;编写spark-submit脚本;运行脚本;上传文件到hdfs://spark1:9000/wordcount_dir/下。

hadoop fs -put t1.txt /wordcount_dir/tt1.txt

 

 

运行结果:

 

 


文章最后,给大家推荐一些受欢迎的技术博客链接:

  1. Hadoop相关技术博客链接
  2. Spark 核心技术链接
  3. JAVA相关的深度技术博客链接
  4. 超全干货--Flink思维导图,花了3周左右编写、校对
  5. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
  6. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
  7. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

 


欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78556187"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top