首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

Flink实战 —— 读取Kafka数据并与MySQL数据关联【附源码】

  • 25-03-07 20:21
  • 4463
  • 11793
blog.csdn.net

目录

一、功能需求说明

二、前期准备工作

2.1 需要导入mysql驱动

2.2 mysql建立表及插入数据

2.3 启动Kafka的topic

2.4 编写 Flink相关应用代码

三、 启动Flink 应用程序及向Kafka生产数据 

3.1 可以在idea本地启动 C01_QueryActivityName 

3.2 通过向Kafka-producer生产数据


一、功能需求说明

Flink 从 Kafka消息中间件中取数据,有活动id 无活动名称Name,Flink 通过与MySQL关联到活动名称返回

二、前期准备工作

2.1 需要导入mysql驱动

  1. mysql
  2. mysql-connector-java
  3. 5.1.44

2.2 mysql建立表及插入数据

  1. DROP DATABASE IF EXISTS `flink_big_data`; -- 库名与项目名保持一致
  2. CREATE DATABASE IF NOT EXISTS `flink_big_data` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;
  3. USE `flink_big_data`;
  4. -- 活动列表
  5. DROP TABLE IF EXISTS `t_activities`;
  6. CREATE TABLE `t_activities` (
  7. `id` BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键id, 必备字段',
  8. `gmt_create` DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间, 必备字段',
  9. `gmt_modified` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间, 必备字段',
  10. `a_id` VARCHAR(100) NOT NULL COMMENT '活动id',
  11. `name` VARCHAR(100) NOT NULL COMMENT '活动名称',
  12. `last_update` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间, 必备字段',
  13. PRIMARY KEY (`id`)
  14. ) ENGINE=INNODB DEFAULT CHARSET=utf8;
  15. -- 插入数据
  16. INSERT INTO `t_activities` (`a_id`, `name`) VALUES ('A1', '新人礼包');
  17. INSERT INTO `t_activities` (`a_id`, `name`) VALUES ('A2', '月末活动');
  18. INSERT INTO `t_activities` (`a_id`, `name`) VALUES ('A3', '周末促销');
  19. INSERT INTO `t_activities` (`a_id`, `name`) VALUES ('A4', '年度促销');

2.3 启动Kafka的topic

  1. # 创建topic
  2. kafka-topics --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 2 --partitions 3 --topic activity10
  3. # 创建生产者
  4. kafka-console-producer --broker-list node-01:9092,node-02:9092,node-03:9092 --topic activity10

2.4 编写 Flink相关应用代码

  1. public class ActivityBean {
  2. public String uid; // userId
  3. public String aid; // activityId
  4. public String activityName;
  5. public String time;
  6. public int eventType;
  7. public double longitude;
  8. public double latitude;
  9. public String province;
  10. public int count = 1;
  11. public ActivityBean() {
  12. }
  13. public ActivityBean(String uid, String aid, String activityName, String time, int eventType, String province) {
  14. this.uid = uid;
  15. this.aid = aid;
  16. this.activityName = activityName;
  17. this.time = time;
  18. this.eventType = eventType;
  19. this.province = province;
  20. }
  21. public ActivityBean(String uid, String aid, String activityName, String time, int eventType, double longitude, double latitude, String province) {
  22. this.uid = uid;
  23. this.aid = aid;
  24. this.activityName = activityName;
  25. this.time = time;
  26. this.eventType = eventType;
  27. this.longitude = longitude;
  28. this.latitude = latitude;
  29. this.province = province;
  30. }
  31. @Override
  32. public String toString() {
  33. return "ActivityBean{" +
  34. "uid='" + uid + '\'' +
  35. ", aid='" + aid + '\'' +
  36. ", activityName='" + activityName + '\'' +
  37. ", time='" + time + '\'' +
  38. ", eventType=" + eventType +
  39. ", longitude=" + longitude +
  40. ", latitude=" + latitude +
  41. ", province='" + province + '\'' +
  42. ", count=" + count +
  43. '}';
  44. }
  45. public static ActivityBean of(String uid, String aid, String activityName, String time, int eventType, String province) {
  46. return new ActivityBean(uid, aid, activityName, time, eventType, province);
  47. }
  48. public static ActivityBean of(String uid, String aid, String activityName, String time, int eventType, double longitude, double latitude, String province) {
  49. return new ActivityBean(uid, aid, activityName, time, eventType, longitude, latitude, province);
  50. }
  51. }
  1. public class C01_DataToActivityBeanFunction extends RichMapFunction {
  2. private Connection connection = null;
  3. @Override
  4. public void open(Configuration parameters) throws Exception {
  5. super.open(parameters);
  6. // 创建MySQL连接
  7. // 这里不应该对异常进行捕获,让Flink自行处理,比如重启之类的
  8. // 如果捕获异常了,则Flink无法捕获到该异常
  9. String url = "jdbc:mysql://localhost:3306/flink_big_data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false";
  10. String user = "root";
  11. String password = "1234";
  12. connection = DriverManager.getConnection(url, user, password);
  13. }
  14. @Override
  15. public ActivityBean map(String line) throws Exception {
  16. String[] fields = line.split(",");
  17. String uid = fields[0];
  18. String aid = fields[1];
  19. // 根据aid作为查询条件查询出name
  20. // 最好使用简单的关联查询,MySQL也可以进行关联查询
  21. PreparedStatement preparedStatement = connection.prepareStatement("SELECT name FROM t_activities WHERE a_id = ?");
  22. preparedStatement.setString(1, aid);
  23. ResultSet resultSet = preparedStatement.executeQuery();
  24. String name = null;
  25. while (resultSet.next()) {
  26. name = resultSet.getString(1);
  27. }
  28. String time = fields[2];
  29. int eventType = Integer.parseInt(fields[3]);
  30. String province = fields[4];
  31. return ActivityBean.of(uid, aid, name, time, eventType, province);
  32. }
  33. @Override
  34. public void close() throws Exception {
  35. super.close();
  36. if (connection != null) {
  37. connection.close();
  38. }
  39. }
  40. }
  1. public class C01_QueryActivityName {
  2. public static void main(String[] args) throws Exception {
  3. // topic:activity10 分区3,副本2
  4. // # 创建topic
  5. // kafka-topics --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 2 --partitions 3 --topic activity10
  6. // # 创建生产者
  7. // kafka-console-producer --broker-list node-01:9092,node-02:9092,node-03:9092 --topic activity10
  8. // 输入参数:activity10 group_id_flink node-01:9092,node-02:9092,node-03:9092
  9. DataStream lines = FlinkUtilsV1.createKafkaStream(args, new SimpleStringSchema());
  10. SingleOutputStreamOperator beans = lines.map(new C01_DataToActivityBeanFunction());
  11. beans.print();
  12. FlinkUtilsV1.getEnv().execute("C01_QueryActivityName");
  13. }
  14. }

三、 启动Flink 应用程序及向Kafka生产数据 

3.1 可以在idea本地启动 C01_QueryActivityName 

3.2 通过向Kafka-producer生产数据

  1. u001,A1,2019-09-02 10:10:11,1,北京市
  2. u001,A2,2019-09-02 10:10:11,1,北京市
  3. u001,A3,2019-09-02 10:10:11,1,北京市
  4. u001,A4,2019-09-02 10:10:11,1,北京市
  5. u002,A1,2019-09-02 10:11:11,1,辽宁省
  6. u001,A1,2019-09-02 10:11:11,2,北京市
  7. u001,A1,2019-09-02 10:11:30,3,北京市
  8. u002,A1,2019-09-02 10:12:11,2,辽宁省
  9. u001,A1,2019-09-02 10:10:11,1,北京市
  10. u001,A1,2019-09-02 10:10:11,1,北京市
  11. u001,A1,2019-09-02 10:10:11,1,北京市
  12. u001,A1,2019-09-02 10:10:11,1,北京市

运行结果如下: 发送的数据是活动id,Flink 通过活动id从MySQL获取活动名称

 


文章最后,给大家推荐一些受欢迎的技术博客链接:

  1. JAVA相关的深度技术博客链接
  2. Flink 相关技术博客链接
  3. Spark 核心技术链接
  4. 设计模式 —— 深度技术博客链接
  5. 机器学习 —— 深度技术博客链接
  6. Hadoop相关技术博客链接
  7. 超全干货--Flink思维导图,花了3周左右编写、校对
  8. 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
  9. 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
  10. 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂
  11. 深入聊聊Java 垃圾回收机制【附原理图及调优方法】

欢迎扫描下方的二维码或 搜索 公众号“大数据高级架构师”,我们会有更多、且及时的资料推送给您,欢迎多多交流!

                                           

       

 

注:本文转载自blog.csdn.net的不埋雷的探长的文章"https://blog.csdn.net/weixin_32265569/article/details/108367968"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top