class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="3"> class="hljs-ln-code"> class="hljs-ln-line">import java.util.Arrays;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="4"> class="hljs-ln-code"> class="hljs-ln-line">import java.util.List;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="5"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="6"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.SparkConf;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="7"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.FlatMapFunction;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="8"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.Function2;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="9"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.PairFunction;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="10"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.Durations;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="11"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaDStream;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="12"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaPairDStream;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="13"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="14"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="15"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="16"> class="hljs-ln-code"> class="hljs-ln-line">import com.google.common.base.Optional;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="17"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="18"> class="hljs-ln-code"> class="hljs-ln-line">import scala.Tuple2;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="19"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="20"> class="hljs-ln-code"> class="hljs-ln-line">/**
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="21"> class="hljs-ln-code"> class="hljs-ln-line"> * 基于updateStateByKey算子实现缓存机制的实时wordcount程序
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="22"> class="hljs-ln-code"> class="hljs-ln-line"> * @author Administrator
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="23"> class="hljs-ln-code"> class="hljs-ln-line"> *
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="24"> class="hljs-ln-code"> class="hljs-ln-line"> */
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="25"> class="hljs-ln-code"> class="hljs-ln-line">public class UpdateStateByKeyWordCount {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="26"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="27"> class="hljs-ln-code"> class="hljs-ln-line"> public static void main(String[] args) {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="28"> class="hljs-ln-code"> class="hljs-ln-line"> SparkConf conf = new SparkConf()
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="29"> class="hljs-ln-code"> class="hljs-ln-line"> .setMaster("local[2]")
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="30"> class="hljs-ln-code"> class="hljs-ln-line"> .setAppName("UpdateStateByKeyWordCount");
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="31"> class="hljs-ln-code"> class="hljs-ln-line"> JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="32"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="33"> class="hljs-ln-code"> class="hljs-ln-line"> // 第一点,如果要使用updateStateByKey算子,就必须设置一个checkpoint目录,开启checkpoint机制
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="34"> class="hljs-ln-code"> class="hljs-ln-line"> // 这样的话才能把每个key对应的state除了在内存中有,那么是不是也要checkpoint一份
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="35"> class="hljs-ln-code"> class="hljs-ln-line"> // 因为你要长期保存一份key的state的话,那么spark streaming是要求必须用checkpoint的,以便于在
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="36"> class="hljs-ln-code"> class="hljs-ln-line"> // 内存数据丢失的时候,可以从checkpoint中恢复数据
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="37"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="38"> class="hljs-ln-code"> class="hljs-ln-line"> // 开启checkpoint机制,很简单,只要调用jssc的checkpoint()方法,设置一个hdfs目录即可
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="39"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint");
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="40"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="41"> class="hljs-ln-code"> class="hljs-ln-line"> // 然后先实现基础的wordcount逻辑
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="42"> class="hljs-ln-code"> class="hljs-ln-line"> JavaReceiverInputDStream lines = jssc.socketTextStream("localhost", 9999);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="43"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="44"> class="hljs-ln-code"> class="hljs-ln-line"> JavaDStream words = lines.flatMap(new FlatMapFunction() {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="45"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="46"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="47"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="48"> class="hljs-ln-code"> class="hljs-ln-line"> @Override
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="49"> class="hljs-ln-code"> class="hljs-ln-line"> public Iterable call(String line) throws Exception {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="50"> class="hljs-ln-code"> class="hljs-ln-line"> return Arrays.asList(line.split(" "));
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="51"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="52"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="53"> class="hljs-ln-code"> class="hljs-ln-line"> });
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="54"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="55"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairDStream pairs = words.mapToPair(
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="56"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="57"> class="hljs-ln-code"> class="hljs-ln-line"> new PairFunction() {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="58"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="59"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="60"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="61"> class="hljs-ln-code"> class="hljs-ln-line"> @Override
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="62"> class="hljs-ln-code"> class="hljs-ln-line"> public Tuple2 call(String word)
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="63"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="64"> class="hljs-ln-code"> class="hljs-ln-line"> return new Tuple2(word, 1);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="65"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="66"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="67"> class="hljs-ln-code"> class="hljs-ln-line"> });
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="68"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="69"> class="hljs-ln-code"> class="hljs-ln-line"> // 到了这里,就不一样了,之前的话,是不是直接就是pairs.reduceByKey
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="70"> class="hljs-ln-code"> class="hljs-ln-line"> // 然后,就可以得到每个时间段的batch对应的RDD,计算出来的单词计数
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="71"> class="hljs-ln-code"> class="hljs-ln-line"> // 然后,可以打印出那个时间段的单词计数
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="72"> class="hljs-ln-code"> class="hljs-ln-line"> // 但是,有个问题,你如果要统计每个单词的全局的计数呢?
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="73"> class="hljs-ln-code"> class="hljs-ln-line"> // 就是说,统计出来,从程序启动开始,到现在为止,一个单词出现的次数,那么就之前的方式就不好实现
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="74"> class="hljs-ln-code"> class="hljs-ln-line"> // 就必须基于redis这种缓存,或者是mysql这种db,来实现累加
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="75"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="76"> class="hljs-ln-code"> class="hljs-ln-line"> // 但是,我们的updateStateByKey,就可以实现直接通过Spark维护一份每个单词的全局的统计次数
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="77"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairDStream wordCounts = pairs.updateStateByKey(
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="78"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="79"> class="hljs-ln-code"> class="hljs-ln-line"> // 这里的Optional,相当于Scala中的样例类,就是Option,可以这么理解
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="80"> class="hljs-ln-code"> class="hljs-ln-line"> // 它代表了一个值的存在状态,可能存在,也可能不存在
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="81"> class="hljs-ln-code"> class="hljs-ln-line"> new Function2, Optional, Optional>() {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="82"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="83"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="84"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="85"> class="hljs-ln-code"> class="hljs-ln-line"> // 这里两个参数
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="86"> class="hljs-ln-code"> class="hljs-ln-line"> // 实际上,对于每个单词,每次batch计算的时候,都会调用这个函数
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="87"> class="hljs-ln-code"> class="hljs-ln-line"> // 第一个参数,values,相当于是这个batch中,这个key的新的值,可能有多个吧
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="88"> class="hljs-ln-code"> class="hljs-ln-line"> // 比如说一个hello,可能有2个1,(hello, 1) (hello, 1),那么传入的是(1,1)
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="89"> class="hljs-ln-code"> class="hljs-ln-line"> // 第二个参数,就是指的是这个key之前的状态,state,其中泛型的类型是你自己指定的
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="90"> class="hljs-ln-code"> class="hljs-ln-line"> @Override
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="91"> class="hljs-ln-code"> class="hljs-ln-line"> public Optional call(List values,
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="92"> class="hljs-ln-code"> class="hljs-ln-line"> Optional state) throws Exception {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="93"> class="hljs-ln-code"> class="hljs-ln-line"> // 首先定义一个全局的单词计数
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="94"> class="hljs-ln-code"> class="hljs-ln-line"> Integer newValue = 0;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="95"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="96"> class="hljs-ln-code"> class="hljs-ln-line"> // 其次,判断,state是否存在,如果不存在,说明是一个key第一次出现
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="97"> class="hljs-ln-code"> class="hljs-ln-line"> // 如果存在,说明这个key之前已经统计过全局的次数了
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="98"> class="hljs-ln-code"> class="hljs-ln-line"> if(state.isPresent()) {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="99"> class="hljs-ln-code"> class="hljs-ln-line"> newValue = state.get();
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="100"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="101"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="102"> class="hljs-ln-code"> class="hljs-ln-line"> // 接着,将本次新出现的值,都累加到newValue上去,就是一个key目前的全局的统计
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="103"> class="hljs-ln-code"> class="hljs-ln-line"> // 次数
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="104"> class="hljs-ln-code"> class="hljs-ln-line"> for(Integer value : values) {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="105"> class="hljs-ln-code"> class="hljs-ln-line"> newValue += value;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="106"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="107"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="108"> class="hljs-ln-code"> class="hljs-ln-line"> return Optional.of(newValue);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="109"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="110"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="111"> class="hljs-ln-code"> class="hljs-ln-line"> });
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="112"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="113"> class="hljs-ln-code"> class="hljs-ln-line"> // 到这里为止,相当于是,每个batch过来是,计算到pairs DStream,就会执行全局的updateStateByKey
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="114"> class="hljs-ln-code"> class="hljs-ln-line"> // 算子,updateStateByKey返回的JavaPairDStream,其实就代表了每个key的全局的计数
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="115"> class="hljs-ln-code"> class="hljs-ln-line"> // 打印出来
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="116"> class="hljs-ln-code"> class="hljs-ln-line"> wordCounts.print();
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="117"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="118"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.start();
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="119"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.awaitTermination();
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="120"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.close();
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="121"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="122"> class="hljs-ln-code"> class="hljs-ln-line">}
  • class="hide-preCode-box"> class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}" onclick="hljs.signin(event)">

    scala版本代码:

     

    package cn.spark.study.streaming
     
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
     
    /**
     * @author Administrator
     */
    object UpdateStateByKeyWordCount {
      
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setMaster("local[2]")  
            .setAppName("UpdateStateByKeyWordCount")
        val ssc = new StreamingContext(conf, Seconds(5))
        ssc.checkpoint("hdfs://spark1:9000/wordcount_checkpoint")  
        
        val lines = ssc.socketTextStream("spark1", 9999)
        val words = lines.flatMap { _.split(" ") }   
        val pairs = words.map { word => (word, 1) }
        val wordCounts = pairs.updateStateByKey((values: Seq[Int], state: Option[Int]) => {
          var newValue = state.getOrElse(0)    
          for(value <- values) {
            newValue += value
          }
          Option(newValue)  
        })
        
        wordCounts.print()  
        
        ssc.start()
        ssc.awaitTermination()
      }
    } class="hide-preCode-box">

    运行步骤:

    1.在hdfs文件系统创建wordcount_checkpoint

    hadoop fs -mkdir /wordcount_checkpoint

    2.启动nc

    nc -lk 9999

    3.启动运行程序

    运行结果:

    将启动到结束过程中的结果都进行缓存

     

     


    文章最后,给大家推荐一些受欢迎的技术博客链接

    1. Hadoop相关技术博客链接
    2. Spark 核心技术链接
    3. JAVA相关的深度技术博客链接
    4. 超全干货--Flink思维导图,花了3周左右编写、校对
    5. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
    6. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
    7. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

     


    欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                               

           

     

    >>
    注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78571456"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
    复制链接

    评论记录:

    未查询到任何数据!