输入DStream之基础数据源
HDFS文件:基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。
streamingContext.fileStream(dataDirectory)
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
Spark Streaming会监视指定的HDFS目录,并且处理出现在目录中的文件。要注意的是,所有放入HDFS目录中的文件,都必须有相同的格式;必须使用移动或者重命名的方式,将文件移入目录;一旦处理之后,文件的内容即使改变,也不会再处理了;基于HDFS文件的数据源是没有Receiver的,因此不会占用一个cpu core。
java版本
- package cn.spark.study.streaming;
-
- import java.util.Arrays;
-
- import org.apache.spark.SparkConf;
- import org.apache.spark.api.java.function.FlatMapFunction;
- import org.apache.spark.api.java.function.Function2;
- import org.apache.spark.api.java.function.PairFunction;
- import org.apache.spark.streaming.Durations;
- import org.apache.spark.streaming.api.java.JavaDStream;
- import org.apache.spark.streaming.api.java.JavaPairDStream;
- import org.apache.spark.streaming.api.java.JavaStreamingContext;
-
- import scala.Tuple2;
-
- /**
- * 基于HDFS文件的实时wordcount程序
- * @author Administrator
- *
- */
- public class HDFSWordCount {
-
- public static void main(String[] args) {
- SparkConf conf = new SparkConf()
- .setMaster("local[2]")
- .setAppName("HDFSWordCount");
- JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
-
- // 首先,使用JavaStreamingContext的textFileStream()方法,针对HDFS目录创建输入数据流
- JavaDStream
lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir"); -
- // 执行wordcount操作
- JavaDStream
words = lines.flatMap(new FlatMapFunction() { -
- private static final long serialVersionUID = 1L;
-
- @Override
- public Iterable
call(String line) throws Exception { - return Arrays.asList(line.split(" "));
- }
-
- });
-
- JavaPairDStream
pairs = words.mapToPair( -
- new PairFunction
() { -
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple2
call(String word) - throws Exception {
- return new Tuple2
(word, 1); - }
-
- });
-
- JavaPairDStream
wordCounts = pairs.reduceByKey( -
- new Function2
() { -
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer call(Integer v1, Integer v2) throws Exception {
- return v1 + v2;
- }
-
- });
-
- wordCounts.print();
-
- jssc.start();
- jssc.awaitTermination();
- jssc.close();
- }
- }
scala版本
package cn.spark.study.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
/**
* @author Administrator
*/
object HDFSWordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("HDFSWordCount")
val ssc = new StreamingContext(conf, Seconds(5))
val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")
val words = lines.flatMap { _.split(" ") }
val pairs = words.map { word => (word, 1) }
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
运行步骤:
打包,上传到linux中;编写spark-submit脚本;运行脚本;上传文件到hdfs://spark1:9000/wordcount_dir/下。
hadoop fs -put t1.txt /wordcount_dir/tt1.txt
运行结果:
文章最后,给大家推荐一些受欢迎的技术博客链接:
- Hadoop相关技术博客链接
- Spark 核心技术链接
- JAVA相关的深度技术博客链接
- 超全干货--Flink思维导图,花了3周左右编写、校对
- 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
- 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
- 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂
欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!
评论记录:
回复评论: