首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

12.transform以及实时黑名单过滤案例实战

  • 25-03-07 20:21
  • 4212
  • 5645
blog.csdn.net

transform以及实时黑名单过滤案例实战

transform操作,应用在DStream上时,可以用于执行任意的RDD到RDD的转换操作。它可以用于实现,DStream API中所没有提供的操作。比如说,DStream API中,并没有提供将一个DStream中的每个batch,与一个特定的RDD进行join的操作。但是我们自己就可以使用transform操作来实现该功能。

DStream.join(),只能join其他DStream。在DStream每个batch的RDD计算出来之后,会去跟其他DStream的RDD进行join。

案例:广告计费日志实时黑名单过滤

java版本:

  1. package cn.spark.study.streaming;
  2. import java.util.ArrayList;
  3. import java.util.List;
  4. import org.apache.spark.SparkConf;
  5. import org.apache.spark.api.java.JavaPairRDD;
  6. import org.apache.spark.api.java.JavaRDD;
  7. import org.apache.spark.api.java.function.Function;
  8. import org.apache.spark.api.java.function.PairFunction;
  9. import org.apache.spark.streaming.Durations;
  10. import org.apache.spark.streaming.api.java.JavaDStream;
  11. import org.apache.spark.streaming.api.java.JavaPairDStream;
  12. import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
  13. import org.apache.spark.streaming.api.java.JavaStreamingContext;
  14. import com.google.common.base.Optional;
  15. import scala.Tuple2;
  16. /**
  17. * 基于transform的实时广告计费日志黑名单过滤
  18. * 这里案例,完全脱胎于真实的广告业务的大数据系统,业务是真实的,实用
  19. * @author Administrator
  20. *
  21. */
  22. public class TransformBlacklist {
  23. @SuppressWarnings("deprecation")
  24. public static void main(String[] args) {
  25. SparkConf conf = new SparkConf()
  26. .setMaster("local[2]")
  27. .setAppName("TransformBlacklist");
  28. JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
  29. // 用户对我们的网站上的广告可以进行点击
  30. // 点击之后,是不是要进行实时计费,点一下,算一次钱
  31. // 但是,对于那些帮助某些无良商家刷广告的人,那么我们有一个黑名单
  32. // 只要是黑名单中的用户点击的广告,我们就给过滤掉
  33. // 先做一份模拟的黑名单RDD
  34. List> blacklist = new ArrayList>();
  35. blacklist.add(new Tuple2("tom", true));
  36. final JavaPairRDD blacklistRDD = jssc.sc().parallelizePairs(blacklist);
  37. // 这里的日志格式,就简化一下,就是date username的方式
  38. JavaReceiverInputDStream adsClickLogDStream = jssc.socketTextStream("spark1", 9999);
  39. // 所以,要先对输入的数据,进行一下转换操作,变成,(username, date username)
  40. // 以便于,后面对每个batch RDD,与定义好的黑名单RDD进行join操作
  41. JavaPairDStream userAdsClickLogDStream = adsClickLogDStream.mapToPair(
  42. new PairFunction() {
  43. private static final long serialVersionUID = 1L;
  44. @Override
  45. public Tuple2 call(String adsClickLog)
  46. throws Exception {
  47. return new Tuple2(
  48. adsClickLog.split(" ")[1], adsClickLog);
  49. }
  50. });
  51. // 然后,就可以执行transform操作了,将每个batch的RDD,与黑名单RDD进行join、filter、map等操作
  52. // 实时进行黑名单过滤
  53. JavaDStream validAdsClickLogDStream = userAdsClickLogDStream.transform(
  54. new Function, JavaRDD>() {
  55. private static final long serialVersionUID = 1L;
  56. @Override
  57. public JavaRDD call(JavaPairRDD userAdsClickLogRDD)
  58. throws Exception {
  59. // 这里为什么用左外连接?
  60. // 因为,并不是每个用户都存在于黑名单中的
  61. // 所以,如果直接用join,那么没有存在于黑名单中的数据,会无法join到
  62. // 就给丢弃掉了
  63. // 所以,这里用leftOuterJoin,就是说,哪怕一个user不在黑名单RDD中,没有join到
  64. // 也还是会被保存下来的
  65. JavaPairRDD>> joinedRDD =
  66. userAdsClickLogRDD.leftOuterJoin(blacklistRDD);
  67. // 连接之后,执行filter算子
  68. JavaPairRDD>> filteredRDD =
  69. joinedRDD.filter(
  70. new Function
  71. Tuple2>>, Boolean>() {
  72. private static final long serialVersionUID = 1L;
  73. @Override
  74. public Boolean call(
  75. Tuple2
  76. Tuple2>> tuple)
  77. throws Exception {
  78. // 这里的tuple,就是每个用户,对应的访问日志,和在黑名单中
  79. // 的状态
  80. if(tuple._2._2().isPresent() &&
  81. tuple._2._2.get()) {
  82. return false;
  83. }
  84. return true;
  85. }
  86. });
  87. // 此时,filteredRDD中,就只剩下没有被黑名单过滤的用户点击了
  88. // 进行map操作,转换成我们想要的格式
  89. JavaRDD validAdsClickLogRDD = filteredRDD.map(
  90. new Function>>, String>() {
  91. private static final long serialVersionUID = 1L;
  92. @Override
  93. public String call(
  94. Tuple2>> tuple)
  95. throws Exception {
  96. return tuple._2._1;
  97. }
  98. });
  99. return validAdsClickLogRDD;
  100. }
  101. });
  102. // 打印有效的广告点击日志
  103. // 其实在真实企业场景中,这里后面就可以走写入kafka、ActiveMQ等这种中间件消息队列
  104. // 然后再开发一个专门的后台服务,作为广告计费服务,执行实时的广告计费,这里就是只拿到了有效的广告点击
  105. validAdsClickLogDStream.print();
  106. jssc.start();
  107. jssc.awaitTermination();
  108. jssc.close();
  109. }
  110. }

scala版本:

 

package cn.spark.study.streaming
 
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
 
/**
 * @author Administrator
 */
object TransformBlacklist {
  
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
        .setMaster("local[2]")  
        .setAppName("TransformBlacklist")
    val ssc = new StreamingContext(conf, Seconds(5))
    
    val blacklist = Array(("tom", true))  
    val blacklistRDD = ssc.sparkContext.parallelize(blacklist, 5)  
    
    val adsClickLogDStream = ssc.socketTextStream("spark1", 9999)   
    val userAdsClickLogDStream = adsClickLogDStream
        .map { adsClickLog => (adsClickLog.split(" ")(1), adsClickLog) }
    
    val validAdsClickLogDStream = userAdsClickLogDStream.transform(userAdsClickLogRDD => {
      val joinedRDD = userAdsClickLogRDD.leftOuterJoin(blacklistRDD)
      val filteredRDD = joinedRDD.filter(tuple => {
        if(tuple._2._2.getOrElse(false)) {  
          false
        } else {
          true
        }
      })
      val validAdsClickLogRDD = filteredRDD.map(tuple => tuple._2._1)
      validAdsClickLogRDD
    })
    
    validAdsClickLogDStream.print()
    
    ssc.start()
    ssc.awaitTermination()
  }
}

运行步骤:

 

1.启动nc

nc -lk 9999

2.本地运行,直接在IDE中运行程序

运行结果:

 

 


文章最后,给大家推荐一些受欢迎的技术博客链接:

  1. Hadoop相关技术博客链接
  2. Spark 核心技术链接
  3. JAVA相关的深度技术博客链接
  4. 超全干货--Flink思维导图,花了3周左右编写、校对
  5. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
  6. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
  7. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂

 


欢迎扫描下方的二维码或 搜索 公众号“10点进修”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/78571497"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top