• class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="2">
  • class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="3"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="4"> class="hljs-ln-code"> class="hljs-ln-line">import java.util.List;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="5"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="6"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="7"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.SparkConf;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="8"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.JavaPairRDD;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="9"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.Function;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="10"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.Function2;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="11"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.PairFunction;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="12"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.Durations;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="13"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaDStream;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="14"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaPairDStream;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="15"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="16"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaStreamingContext;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="17"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="18"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="19"> class="hljs-ln-code"> class="hljs-ln-line">import scala.Tuple2;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="20"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="21"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="22"> class="hljs-ln-code"> class="hljs-ln-line">/**
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="23"> class="hljs-ln-code"> class="hljs-ln-line"> * 基于滑动窗口的热点搜索词实时统计
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="24"> class="hljs-ln-code"> class="hljs-ln-line"> * @author Administrator
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="25"> class="hljs-ln-code"> class="hljs-ln-line"> *
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="26"> class="hljs-ln-code"> class="hljs-ln-line"> */
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="27"> class="hljs-ln-code"> class="hljs-ln-line">public class WindowHotWord {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="28"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="29"> class="hljs-ln-code"> class="hljs-ln-line"> public static void main(String[] args) {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="30"> class="hljs-ln-code"> class="hljs-ln-line"> SparkConf conf = new SparkConf()
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="31"> class="hljs-ln-code"> class="hljs-ln-line"> .setMaster("local[2]")
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="32"> class="hljs-ln-code"> class="hljs-ln-line"> .setAppName("WindowHotWord");
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="33"> class="hljs-ln-code"> class="hljs-ln-line"> JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="34"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="35"> class="hljs-ln-code"> class="hljs-ln-line"> // 说明一下,这里的搜索日志的格式
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="36"> class="hljs-ln-code"> class="hljs-ln-line"> // leo hello
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="37"> class="hljs-ln-code"> class="hljs-ln-line"> // tom world
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="38"> class="hljs-ln-code"> class="hljs-ln-line"> JavaReceiverInputDStream searchLogsDStream = jssc.socketTextStream("spark1", 9999);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="39"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="40"> class="hljs-ln-code"> class="hljs-ln-line"> // 将搜索日志给转换成,只有一个搜索词,即可
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="41"> class="hljs-ln-code"> class="hljs-ln-line"> JavaDStream searchWordsDStream = searchLogsDStream.map(new Function() {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="42"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="43"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="44"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="45"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="46"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="47"> class="hljs-ln-code"> class="hljs-ln-line"> @Override
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="48"> class="hljs-ln-code"> class="hljs-ln-line"> public String call(String searchLog) throws Exception {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="49"> class="hljs-ln-code"> class="hljs-ln-line"> return searchLog.split(" ")[1];
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="50"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="51"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="52"> class="hljs-ln-code"> class="hljs-ln-line"> });
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="53"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="54"> class="hljs-ln-code"> class="hljs-ln-line"> // 将搜索词映射为(searchWord, 1)的tuple格式
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="55"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairDStream searchWordPairDStream = searchWordsDStream.mapToPair(
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="56"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="57"> class="hljs-ln-code"> class="hljs-ln-line"> new PairFunction() {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="58"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="59"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="60"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="61"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="62"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="63"> class="hljs-ln-code"> class="hljs-ln-line"> @Override
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="64"> class="hljs-ln-code"> class="hljs-ln-line"> public Tuple2 call(String searchWord)
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="65"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="66"> class="hljs-ln-code"> class="hljs-ln-line"> return new Tuple2(searchWord, 1);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="67"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="68"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="69"> class="hljs-ln-code"> class="hljs-ln-line"> });
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="70"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="71"> class="hljs-ln-code"> class="hljs-ln-line"> // 针对(searchWord, 1)的tuple格式的DStream,执行reduceByKeyAndWindow,滑动窗口操作
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="72"> class="hljs-ln-code"> class="hljs-ln-line"> // 第二个参数,是窗口长度,这里是60秒
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="73"> class="hljs-ln-code"> class="hljs-ln-line"> // 第三个参数,是滑动间隔,这里是10秒
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="74"> class="hljs-ln-code"> class="hljs-ln-line"> // 也就是说,每隔10秒钟,将最近60秒的数据,作为一个窗口,进行内部的RDD的聚合,然后统一对一个RDD进行后续
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="75"> class="hljs-ln-code"> class="hljs-ln-line"> // 计算
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="76"> class="hljs-ln-code"> class="hljs-ln-line"> // 所以说,这里的意思,就是,之前的searchWordPairDStream为止,其实,都是不会立即进行计算的
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="77"> class="hljs-ln-code"> class="hljs-ln-line"> // 而是只是放在那里
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="78"> class="hljs-ln-code"> class="hljs-ln-line"> // 然后,等待我们的滑动间隔到了以后,10秒钟到了,会将之前60秒的RDD,因为一个batch间隔是,5秒,所以之前
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="79"> class="hljs-ln-code"> class="hljs-ln-line"> // 60秒,就有12个RDD,给聚合起来,然后,统一执行redcueByKey操作
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="80"> class="hljs-ln-code"> class="hljs-ln-line"> // 所以这里的reduceByKeyAndWindow,是针对每个窗口执行计算的,而不是针对某个DStream中的RDD
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="81"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairDStream searchWordCountsDStream =
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="82"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="83"> class="hljs-ln-code"> class="hljs-ln-line"> searchWordPairDStream.reduceByKeyAndWindow(new Function2() {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="84"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="85"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="86"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="87"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="88"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="89"> class="hljs-ln-code"> class="hljs-ln-line"> @Override
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="90"> class="hljs-ln-code"> class="hljs-ln-line"> public Integer call(Integer v1, Integer v2) throws Exception {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="91"> class="hljs-ln-code"> class="hljs-ln-line"> return v1 + v2;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="92"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="93"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="94"> class="hljs-ln-code"> class="hljs-ln-line"> }, Durations.seconds(60), Durations.seconds(10));
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="95"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="96"> class="hljs-ln-code"> class="hljs-ln-line"> // 到这里为止,就已经可以做到,每隔10秒钟,出来,之前60秒的收集到的单词的统计次数
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="97"> class="hljs-ln-code"> class="hljs-ln-line"> // 执行transform操作,因为,一个窗口,就是一个60秒钟的数据,会变成一个RDD,然后,对这一个RDD
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="98"> class="hljs-ln-code"> class="hljs-ln-line"> // 根据每个搜索词出现的频率进行排序,然后获取排名前3的热点搜索词
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="99"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairDStream finalDStream = searchWordCountsDStream.transformToPair(
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="100"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="101"> class="hljs-ln-code"> class="hljs-ln-line"> new Function, JavaPairRDD>() {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="102"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="103"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="104"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="105"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="106"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="107"> class="hljs-ln-code"> class="hljs-ln-line"> @Override
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="108"> class="hljs-ln-code"> class="hljs-ln-line"> public JavaPairRDD call(
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="109"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD searchWordCountsRDD) throws Exception {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="110"> class="hljs-ln-code"> class="hljs-ln-line"> // 执行搜索词和出现频率的反转
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="111"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD countSearchWordsRDD = searchWordCountsRDD
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="112"> class="hljs-ln-code"> class="hljs-ln-line"> .mapToPair(new PairFunction, Integer, String>() {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="113"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="114"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="115"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="116"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="117"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="118"> class="hljs-ln-code"> class="hljs-ln-line"> @Override
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="119"> class="hljs-ln-code"> class="hljs-ln-line"> public Tuple2 call(
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="120"> class="hljs-ln-code"> class="hljs-ln-line"> Tuple2 tuple)
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="121"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="122"> class="hljs-ln-code"> class="hljs-ln-line"> return new Tuple2(tuple._2, tuple._1);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="123"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="124"> class="hljs-ln-code"> class="hljs-ln-line"> });
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="125"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="126"> class="hljs-ln-code"> class="hljs-ln-line"> // 然后执行降序排序
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="127"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD sortedCountSearchWordsRDD = countSearchWordsRDD
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="128"> class="hljs-ln-code"> class="hljs-ln-line"> .sortByKey(false);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="129"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="130"> class="hljs-ln-code"> class="hljs-ln-line"> // 然后再次执行反转,变成(searchWord, count)的这种格式
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="131"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD sortedSearchWordCountsRDD = sortedCountSearchWordsRDD
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="132"> class="hljs-ln-code"> class="hljs-ln-line"> .mapToPair(new PairFunction, String, Integer>() {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="133"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="134"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="135"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="136"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="137"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="138"> class="hljs-ln-code"> class="hljs-ln-line"> @Override
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="139"> class="hljs-ln-code"> class="hljs-ln-line"> public Tuple2 call(
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="140"> class="hljs-ln-code"> class="hljs-ln-line"> Tuple2 tuple)
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="141"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="142"> class="hljs-ln-code"> class="hljs-ln-line"> return new Tuple2(tuple._2, tuple._1);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="143"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="144"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="145"> class="hljs-ln-code"> class="hljs-ln-line"> });
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="146"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="147"> class="hljs-ln-code"> class="hljs-ln-line"> // 然后用take(),获取排名前3的热点搜索词
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="148"> class="hljs-ln-code"> class="hljs-ln-line"> List> hogSearchWordCounts =
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="149"> class="hljs-ln-code"> class="hljs-ln-line"> sortedSearchWordCountsRDD.take(3);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="150"> class="hljs-ln-code"> class="hljs-ln-line"> for(Tuple2 wordCount : hogSearchWordCounts) {
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="151"> class="hljs-ln-code"> class="hljs-ln-line"> System.out.println(wordCount._1 + ": " + wordCount._2);
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="152"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="153"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="154"> class="hljs-ln-code"> class="hljs-ln-line"> return searchWordCountsRDD;
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="155"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="156"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="157"> class="hljs-ln-code"> class="hljs-ln-line"> });
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="158"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="159"> class="hljs-ln-code"> class="hljs-ln-line"> // 这个无关紧要,只是为了触发job的执行,所以必须有output操作
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="160"> class="hljs-ln-code"> class="hljs-ln-line"> finalDStream.print();
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="161"> class="hljs-ln-code"> class="hljs-ln-line">
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="162"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.start();
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="163"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.awaitTermination();
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="164"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.close();
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="165"> class="hljs-ln-code"> class="hljs-ln-line"> }
  • class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="166"> class="hljs-ln-code"> class="hljs-ln-line">}
  • class="hide-preCode-box"> class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}" onclick="hljs.signin(event)">


    scala版本:

    package cn.spark.study.streaming
    
    
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    import org.apache.spark.streaming.Seconds
    
    
    /**
     * @author Administrator
     */
    object WindowHotWord {
      
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
            .setMaster("local[2]")  
            .setAppName("WindowHotWord")
        val ssc = new StreamingContext(conf, Seconds(1))
        
        val searchLogsDStream = ssc.socketTextStream("spark1", 9999)  
        val searchWordsDStream = searchLogsDStream.map { _.split(" ")(1) }  
        val searchWordPairsDStream = searchWordsDStream.map { searchWord => (searchWord, 1) }  
        val searchWordCountsDSteram = searchWordPairsDStream.reduceByKeyAndWindow(
            (v1: Int, v2: Int) => v1 + v2, 
            Seconds(60), 
            Seconds(10))  
            
        val finalDStream = searchWordCountsDSteram.transform(searchWordCountsRDD => {
          val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))  
          val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)  
          val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))
          
          val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3)
          for(tuple <- top3SearchWordCounts) {
            println(tuple)
          }
          
          searchWordCountsRDD
        })
        
        finalDStream.print()
        
        ssc.start()
        ssc.awaitTermination()
      }
    } class="hide-preCode-box">


    运行步骤:
    1.启动nc 
    nc -lk 9999
    2.本地运行,直接在IDE中运行程序

    运行结果:

     

     


    文章最后,给大家推荐一些受欢迎的技术博客链接

    1. Hadoop相关技术博客链接
    2. Spark 核心技术链接
    3. JAVA相关的深度技术博客链接
    4. 超全干货--Flink思维导图,花了3周左右编写、校对
    5. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
    6. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
    7. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

     


    欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                               

           

     

    >>
    注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78571519"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
    复制链接

    评论记录:

    未查询到任何数据!