class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="3"> class="hljs-ln-code"> class="hljs-ln-line">import java.util.ArrayList; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="4"> class="hljs-ln-code"> class="hljs-ln-line">import java.util.List; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="5"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="6"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.SparkConf; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="7"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.JavaPairRDD; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="8"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.JavaRDD; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="9"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.Function; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="10"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.api.java.function.PairFunction; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="11"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.Durations; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="12"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaDStream; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="13"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaPairDStream; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="14"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="15"> class="hljs-ln-code"> class="hljs-ln-line">import org.apache.spark.streaming.api.java.JavaStreamingContext; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="16"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="17"> class="hljs-ln-code"> class="hljs-ln-line">import com.google.common.base.Optional; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="18"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="19"> class="hljs-ln-code"> class="hljs-ln-line">import scala.Tuple2; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="20"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="21"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="22"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="23"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="24"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="25"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="26"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="27"> class="hljs-ln-code"> class="hljs-ln-line">public class TransformBlacklist { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="28"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="29"> class="hljs-ln-code"> class="hljs-ln-line"> @SuppressWarnings("deprecation") class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="30"> class="hljs-ln-code"> class="hljs-ln-line"> public static void main(String[] args) { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="31"> class="hljs-ln-code"> class="hljs-ln-line"> SparkConf conf = new SparkConf() class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="32"> class="hljs-ln-code"> class="hljs-ln-line"> .setMaster("local[2]") class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="33"> class="hljs-ln-code"> class="hljs-ln-line"> .setAppName("TransformBlacklist"); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="34"> class="hljs-ln-code"> class="hljs-ln-line"> JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="35"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="36"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="37"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="38"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="39"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="40"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="41"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="42"> class="hljs-ln-code"> class="hljs-ln-line"> List> blacklist = new ArrayList>(); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="43"> class="hljs-ln-code"> class="hljs-ln-line"> blacklist.add(new Tuple2("tom", true)); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="44"> class="hljs-ln-code"> class="hljs-ln-line"> final JavaPairRDD blacklistRDD = jssc.sc().parallelizePairs(blacklist); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="45"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="46"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="47"> class="hljs-ln-code"> class="hljs-ln-line"> JavaReceiverInputDStream adsClickLogDStream = jssc.socketTextStream("spark1", 9999); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="48"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="49"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="50"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="51"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairDStream userAdsClickLogDStream = adsClickLogDStream.mapToPair( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="52"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="53"> class="hljs-ln-code"> class="hljs-ln-line"> new PairFunction() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="54"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="55"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="56"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="57"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="58"> class="hljs-ln-code"> class="hljs-ln-line"> public Tuple2 call(String adsClickLog) class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="59"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="60"> class="hljs-ln-code"> class="hljs-ln-line"> return new Tuple2( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="61"> class="hljs-ln-code"> class="hljs-ln-line"> adsClickLog.split(" ")[1], adsClickLog); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="62"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="63"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="64"> class="hljs-ln-code"> class="hljs-ln-line"> }); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="65"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="66"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="67"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="68"> class="hljs-ln-code"> class="hljs-ln-line"> JavaDStream validAdsClickLogDStream = userAdsClickLogDStream.transform( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="69"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="70"> class="hljs-ln-code"> class="hljs-ln-line"> new Function, JavaRDD>() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="71"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="72"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="73"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="74"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="75"> class="hljs-ln-code"> class="hljs-ln-line"> public JavaRDD call(JavaPairRDD userAdsClickLogRDD) class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="76"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="77"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="78"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="79"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="80"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="81"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="82"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="83"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD>> joinedRDD = class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="84"> class="hljs-ln-code"> class="hljs-ln-line"> userAdsClickLogRDD.leftOuterJoin(blacklistRDD); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="85"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="86"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="87"> class="hljs-ln-code"> class="hljs-ln-line"> JavaPairRDD>> filteredRDD = class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="88"> class="hljs-ln-code"> class="hljs-ln-line"> joinedRDD.filter( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="89"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="90"> class="hljs-ln-code"> class="hljs-ln-line"> new Function class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="91"> class="hljs-ln-code"> class="hljs-ln-line"> Tuple2>>, Boolean>() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="92"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="93"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="94"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="95"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="96"> class="hljs-ln-code"> class="hljs-ln-line"> public Boolean call( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="97"> class="hljs-ln-code"> class="hljs-ln-line"> Tuple2 class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="98"> class="hljs-ln-code"> class="hljs-ln-line"> Tuple2>> tuple) class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="99"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="100"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="101"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="102"> class="hljs-ln-code"> class="hljs-ln-line"> if(tuple._2._2().isPresent() && class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="103"> class="hljs-ln-code"> class="hljs-ln-line"> tuple._2._2.get()) { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="104"> class="hljs-ln-code"> class="hljs-ln-line"> return false; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="105"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="106"> class="hljs-ln-code"> class="hljs-ln-line"> return true; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="107"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="108"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="109"> class="hljs-ln-code"> class="hljs-ln-line"> }); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="110"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="111"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="112"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="113"> class="hljs-ln-code"> class="hljs-ln-line"> JavaRDD validAdsClickLogRDD = filteredRDD.map( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="114"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="115"> class="hljs-ln-code"> class="hljs-ln-line"> new Function>>, String>() { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="116"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="117"> class="hljs-ln-code"> class="hljs-ln-line"> private static final long serialVersionUID = 1L; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="118"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="119"> class="hljs-ln-code"> class="hljs-ln-line"> @Override class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="120"> class="hljs-ln-code"> class="hljs-ln-line"> public String call( class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="121"> class="hljs-ln-code"> class="hljs-ln-line"> Tuple2>> tuple) class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="122"> class="hljs-ln-code"> class="hljs-ln-line"> throws Exception { class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="123"> class="hljs-ln-code"> class="hljs-ln-line"> return tuple._2._1; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="124"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="125"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="126"> class="hljs-ln-code"> class="hljs-ln-line"> }); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="127"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="128"> class="hljs-ln-code"> class="hljs-ln-line"> return validAdsClickLogRDD; class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="129"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="130"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="131"> class="hljs-ln-code"> class="hljs-ln-line"> }); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="132"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="133"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="134"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="135"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="136"> class="hljs-ln-code"> class="hljs-ln-line"> validAdsClickLogDStream.print(); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="137"> class="hljs-ln-code"> class="hljs-ln-line"> class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="138"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.start(); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="139"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.awaitTermination(); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="140"> class="hljs-ln-code"> class="hljs-ln-line"> jssc.close(); class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="141"> class="hljs-ln-code"> class="hljs-ln-line"> } class="hljs-ln-numbers"> class="hljs-ln-line hljs-ln-n" data-line-number="142"> class="hljs-ln-code"> class="hljs-ln-line">} class="hide-preCode-box">
class="hljs-button signin active" data-title="登录复制" data-report-click="{"spm":"1001.2101.3001.4334"}" onclick="hljs.signin(event)">
scala版本:
package cn.spark.study.streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
/**
object TransformBlacklist {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[2]")
.setAppName("TransformBlacklist")
val ssc = new StreamingContext(conf, Seconds(5))
val blacklist = Array(("tom", true))
val blacklistRDD = ssc.sparkContext.parallelize(blacklist, 5)
val adsClickLogDStream = ssc.socketTextStream("spark1", 9999)
val userAdsClickLogDStream = adsClickLogDStream
.map { adsClickLog => (adsClickLog.split(" ")(1), adsClickLog) }
val validAdsClickLogDStream = userAdsClickLogDStream.transform(userAdsClickLogRDD => {
val joinedRDD = userAdsClickLogRDD.leftOuterJoin(blacklistRDD)
val filteredRDD = joinedRDD.filter(tuple => {
if(tuple._2._2.getOrElse(false)) {
false
} else {
true
}
})
val validAdsClickLogRDD = filteredRDD.map(tuple => tuple._2._1)
validAdsClickLogRDD
})
validAdsClickLogDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
class="hide-preCode-box">
运行步骤:
1.启动nc
nc -lk 9999
2.本地运行,直接在IDE中运行程序
运行结果:

文章最后,给大家推荐一些受欢迎的技术博客链接:
- Hadoop相关技术博客链接
- Spark 核心技术链接
- JAVA相关的深度技术博客链接
- 超全干货--Flink思维导图,花了3周左右编写、校对
- 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
- 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
- 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂
欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!


>>
评论记录:
回复评论: