class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="2"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="3"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="4"> class="hljs-ln-code"> class="hljs-ln-line">import java.util.List; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="5"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="6"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="7"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.SparkConf; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="8"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.JavaPairRDD; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="9"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.Function; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="10"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.Function2; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="11"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.PairFunction; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="12"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.Durations; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="13"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaDStream; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="14"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaPairDStream; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="15"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="16"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaStreamingContext; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="17"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="18"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="19"> class="hljs-ln-code"> class="hljs-ln-line">import scala.Tuple2; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="20"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="21"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="22"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="23"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="24"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="25"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="26"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="27"> class="hljs-ln-code"> class="hljs-ln-line">public class WindowHotWord { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="28"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="29"> class="hljs-ln-code"> class="hljs-ln-line"> public static void main(String[] args) { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="30"> class="hljs-ln-code"> class="hljs-ln-line"> SparkConf conf = new SparkConf() class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="31"> class="hljs-ln-code"> class="hljs-ln-line"> .setMaster("local[2]") class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="32"> class="hljs-ln-code"> class="hljs-ln-line"> .setAppName("WindowHotWord"); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="33"> class="hljs-ln-code"> class="hljs-ln-line"> JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1)); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="34"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="35"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="36"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="37"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="38"> class="hljs-ln-code"> class="hljs-ln-line"> JavaReceiverInputDStream searchLogsDStream = jssc.socketTextStream("spark1", 9999); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="39"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="40"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="41"> class="hljs-ln-code"> class="hljs-ln-line"> JavaDStream searchWordsDStream = searchLogsDStream.map(new Function() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="42"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="43"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="44"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="45"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="46"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="47"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="48"> class="hljs-ln-code"> class="hljs-ln-line"> public String call(String searchLog) throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="49"> class="hljs-ln-code"> class="hljs-ln-line"> return searchLog.split(" ")[1]; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="50"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="51"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="52"> class="hljs-ln-code"> class="hljs-ln-line"> }); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="53"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="54"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="55"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairDStream searchWordPairDStream = searchWordsDStream.mapToPair( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="56"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="57"> class="hljs-ln-code"> class="hljs-ln-line"> new PairFunction() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="58"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="59"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="60"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="61"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="62"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="63"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="64"> class="hljs-ln-code"> class="hljs-ln-line"> public Tuple2 call(String searchWord) class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="65"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="66"> class="hljs-ln-code"> class="hljs-ln-line"> return new Tuple2(searchWord, 1); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="67"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="68"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="69"> class="hljs-ln-code"> class="hljs-ln-line"> }); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="70"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="71"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="72"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="73"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="74"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="75"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="76"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="77"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="78"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="79"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="80"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="81"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairDStream searchWordCountsDStream = class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="82"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="83"> class="hljs-ln-code"> class="hljs-ln-line"> searchWordPairDStream.reduceByKeyAndWindow(new Function2() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="84"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="85"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="86"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="87"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="88"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="89"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="90"> class="hljs-ln-code"> class="hljs-ln-line"> public Integer call(Integer v1, Integer v2) throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="91"> class="hljs-ln-code"> class="hljs-ln-line"> return v1 + v2; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="92"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="93"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="94"> class="hljs-ln-code"> class="hljs-ln-line"> }, Durations.seconds(60), Durations.seconds(10)); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="95"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="96"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="97"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="98"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="99"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairDStream finalDStream = searchWordCountsDStream.transformToPair( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="100"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="101"> class="hljs-ln-code"> class="hljs-ln-line"> new Function, JavaPairRDD>() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="102"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="103"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="104"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="105"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="106"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="107"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="108"> class="hljs-ln-code"> class="hljs-ln-line"> public JavaPairRDD call( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="109"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD searchWordCountsRDD) throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="110"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="111"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD countSearchWordsRDD = searchWordCountsRDD class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="112"> class="hljs-ln-code"> class="hljs-ln-line"> .mapToPair(new PairFunction, Integer, String>() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="113"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="114"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="115"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="116"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="117"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="118"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="119"> class="hljs-ln-code"> class="hljs-ln-line"> public Tuple2 call( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="120"> class="hljs-ln-code"> class="hljs-ln-line"> Tuple2 tuple) class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="121"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="122"> class="hljs-ln-code"> class="hljs-ln-line"> return new Tuple2(tuple._2, tuple._1); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="123"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="124"> class="hljs-ln-code"> class="hljs-ln-line"> }); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="125"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="126"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="127"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD sortedCountSearchWordsRDD = countSearchWordsRDD class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="128"> class="hljs-ln-code"> class="hljs-ln-line"> .sortByKey(false); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="129"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="130"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="131"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD sortedSearchWordCountsRDD = sortedCountSearchWordsRDD class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="132"> class="hljs-ln-code"> class="hljs-ln-line"> .mapToPair(new PairFunction, String, Integer>() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="133"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="134"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="135"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="136"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="137"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="138"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="139"> class="hljs-ln-code"> class="hljs-ln-line"> public Tuple2 call( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="140"> class="hljs-ln-code"> class="hljs-ln-line"> Tuple2 tuple) class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="141"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="142"> class="hljs-ln-code"> class="hljs-ln-line"> return new Tuple2(tuple._2, tuple._1); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="143"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="144"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="145"> class="hljs-ln-code"> class="hljs-ln-line"> }); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="146"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="147"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="148"> class="hljs-ln-code"> class="hljs-ln-line"> List> hogSearchWordCounts = class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="149"> class="hljs-ln-code"> class="hljs-ln-line"> sortedSearchWordCountsRDD.take(3); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="150"> class="hljs-ln-code"> class="hljs-ln-line"> for(Tuple2 wordCount : hogSearchWordCounts) { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="151"> class="hljs-ln-code"> class="hljs-ln-line"> System.out.println(wordCount._1 + ": " + wordCount._2); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="152"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="153"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="154"> class="hljs-ln-code"> class="hljs-ln-line"> return searchWordCountsRDD; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="155"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="156"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="157"> class="hljs-ln-code"> class="hljs-ln-line"> }); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="158"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="159"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="160"> class="hljs-ln-code"> class="hljs-ln-line"> finalDStream.print(); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="161"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="162"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.start(); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="163"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.awaitTermination(); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="164"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.close(); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="165"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="166"> class="hljs-ln-code"> class="hljs-ln-line">} class="hide-preCode-box">
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}" onclick="hljs.signin(event)">
scala版本:
package cn.spark.study.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
/**
object WindowHotWord {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("WindowHotWord")
val ssc = new StreamingContext(conf, Seconds(1))
val searchLogsDStream = ssc.socketTextStream("spark1", 9999)
val searchWordsDStream = searchLogsDStream.map { _.split(" ")(1) }
val searchWordPairsDStream = searchWordsDStream.map { searchWord => (searchWord, 1) }
val searchWordCountsDSteram = searchWordPairsDStream.reduceByKeyAndWindow(
(v1: Int, v2: Int) => v1 + v2,
Seconds(60),
Seconds(10))
val finalDStream = searchWordCountsDSteram.transform(searchWordCountsRDD => {
val countSearchWordsRDD = searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))
val sortedCountSearchWordsRDD = countSearchWordsRDD.sortByKey(false)
val sortedSearchWordCountsRDD = sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))
val top3SearchWordCounts = sortedSearchWordCountsRDD.take(3)
for(tuple <- top3SearchWordCounts) {
println(tuple)
}
searchWordCountsRDD
})
finalDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
class="hide-preCode-box">
运行步骤:
1.启动nc
nc -lk 9999
2.本地运行,直接在IDE中运行程序
运行结果:

文章最后,给大家推荐一些受欢迎的技术博客链接:
- Hadoop相关技术博客链接
- Spark 核心技术链接
- JAVA相关的深度技术博客链接
- 超全干货--Flink思维导图,花了3周左右编写、校对
- 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
- 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
- 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂
欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!


>>
评论记录:
回复评论: