目录
3.1 可以在idea本地启动 C01_QueryActivityName
一、功能需求说明
Flink 从 Kafka 消息中间件读取数据，数据中只有活动 id（aid），没有活动名称（name）。Flink 需要通过查询 MySQL 关联出对应的活动名称，并一起返回。
二、前期准备工作
2.1 导入 MySQL 驱动依赖
<!-- MySQL JDBC driver, used by the Flink job to look up activity names -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.44</version>
</dependency>
2.2 在 MySQL 中建表并插入数据
-- Database name matches the project name
DROP DATABASE IF EXISTS `flink_big_data`;
CREATE DATABASE IF NOT EXISTS `flink_big_data` DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

USE `flink_big_data`;

-- Activity dictionary: maps an activity id (a_id) to its display name
DROP TABLE IF EXISTS `t_activities`;
CREATE TABLE `t_activities` (
    `id`           BIGINT UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '主键id, 必备字段',
    `gmt_create`   DATETIME DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间, 必备字段',
    `gmt_modified` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间, 必备字段',

    `a_id`         VARCHAR(100) NOT NULL COMMENT '活动id',
    `name`         VARCHAR(100) NOT NULL COMMENT '活动名称',
    `last_update`  DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间, 必备字段',
    PRIMARY KEY (`id`),
    -- The Flink job looks up `name` by `a_id` for every incoming record:
    -- index the column, and keep it unique so each id resolves to exactly one name.
    UNIQUE KEY `uk_a_id` (`a_id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8;

-- Seed data
INSERT INTO `t_activities` (`a_id`, `name`) VALUES
    ('A1', '新人礼包'),
    ('A2', '月末活动'),
    ('A3', '周末促销'),
    ('A4', '年度促销');
2.3 创建 Kafka topic 并启动控制台生产者
- # 创建topic
- kafka-topics --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 2 --partitions 3 --topic activity10
-
- # 创建生产者
- kafka-console-producer --broker-list node-01:9092,node-02:9092,node-03:9092 --topic activity10
2.4 编写 Flink 应用代码
/**
 * One user activity event, optionally enriched with the activity's display name.
 *
 * <p>Uses public fields and a public no-arg constructor, which is the shape Flink's POJO type
 * handling expects.
 */
public class ActivityBean {

    /** User id. */
    public String uid;
    /** Activity id. */
    public String aid;
    /** Display name of the activity; may be null when it is unknown. */
    public String activityName;
    public String time;
    public int eventType;
    public double longitude;
    public double latitude;
    public String province;
    /** Occurrence counter, initialised to 1 for a single event. */
    public int count = 1;

    /** No-arg constructor required for POJO-style (de)serialization. */
    public ActivityBean() {
    }

    /** Creates an event without coordinates (longitude/latitude stay 0.0). */
    public ActivityBean(String uid, String aid, String activityName, String time, int eventType, String province) {
        this(uid, aid, activityName, time, eventType, 0.0, 0.0, province);
    }

    /** Creates a fully populated event including coordinates. */
    public ActivityBean(String uid, String aid, String activityName, String time, int eventType,
                        double longitude, double latitude, String province) {
        this.uid = uid;
        this.aid = aid;
        this.activityName = activityName;
        this.time = time;
        this.eventType = eventType;
        this.longitude = longitude;
        this.latitude = latitude;
        this.province = province;
    }

    @Override
    public String toString() {
        return new StringBuilder("ActivityBean{")
                .append("uid='").append(uid).append('\'')
                .append(", aid='").append(aid).append('\'')
                .append(", activityName='").append(activityName).append('\'')
                .append(", time='").append(time).append('\'')
                .append(", eventType=").append(eventType)
                .append(", longitude=").append(longitude)
                .append(", latitude=").append(latitude)
                .append(", province='").append(province).append('\'')
                .append(", count=").append(count)
                .append('}')
                .toString();
    }

    /** Static factory mirroring the 6-argument constructor. */
    public static ActivityBean of(String uid, String aid, String activityName, String time, int eventType, String province) {
        return new ActivityBean(uid, aid, activityName, time, eventType, province);
    }

    /** Static factory mirroring the 8-argument constructor. */
    public static ActivityBean of(String uid, String aid, String activityName, String time, int eventType,
                                  double longitude, double latitude, String province) {
        return new ActivityBean(uid, aid, activityName, time, eventType, longitude, latitude, province);
    }
}
- public class C01_DataToActivityBeanFunction extends RichMapFunction
{ -
- private Connection connection = null;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
-
- // 创建MySQL连接
- // 这里不应该对异常进行捕获,让Flink自行处理,比如重启之类的
- // 如果捕获异常了,则Flink无法捕获到该异常
- String url = "jdbc:mysql://localhost:3306/flink_big_data?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&failOverReadOnly=false";
- String user = "root";
- String password = "1234";
- connection = DriverManager.getConnection(url, user, password);
- }
-
- @Override
- public ActivityBean map(String line) throws Exception {
- String[] fields = line.split(",");
-
- String uid = fields[0];
- String aid = fields[1];
-
- // 根据aid作为查询条件查询出name
- // 最好使用简单的关联查询,MySQL也可以进行关联查询
- PreparedStatement preparedStatement = connection.prepareStatement("SELECT name FROM t_activities WHERE a_id = ?");
- preparedStatement.setString(1, aid);
- ResultSet resultSet = preparedStatement.executeQuery();
- String name = null;
- while (resultSet.next()) {
- name = resultSet.getString(1);
- }
-
- String time = fields[2];
- int eventType = Integer.parseInt(fields[3]);
- String province = fields[4];
-
- return ActivityBean.of(uid, aid, name, time, eventType, province);
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (connection != null) {
- connection.close();
- }
- }
- }
- public class C01_QueryActivityName {
- public static void main(String[] args) throws Exception {
- // topic:activity10 分区3,副本2
- // # 创建topic
- // kafka-topics --create --zookeeper node-01:2181,node-02:2181,node-03:2181 --replication-factor 2 --partitions 3 --topic activity10
-
- // # 创建生产者
- // kafka-console-producer --broker-list node-01:9092,node-02:9092,node-03:9092 --topic activity10
-
- // 输入参数:activity10 group_id_flink node-01:9092,node-02:9092,node-03:9092
- DataStream
lines = FlinkUtilsV1.createKafkaStream(args, new SimpleStringSchema()); -
- SingleOutputStreamOperator
beans = lines.map(new C01_DataToActivityBeanFunction()); -
- beans.print();
-
- FlinkUtilsV1.getEnv().execute("C01_QueryActivityName");
-
- }
- }
三、启动 Flink 应用程序并向 Kafka 生产数据
3.1 在 IDEA 中本地启动 C01_QueryActivityName（程序参数：activity10 group_id_flink node-01:9092,node-02:9092,node-03:9092）
3.2 通过 Kafka 控制台生产者（kafka-console-producer）发送以下数据
- u001,A1,2019-09-02 10:10:11,1,北京市
- u001,A2,2019-09-02 10:10:11,1,北京市
- u001,A3,2019-09-02 10:10:11,1,北京市
- u001,A4,2019-09-02 10:10:11,1,北京市
- u002,A1,2019-09-02 10:11:11,1,辽宁省
- u001,A1,2019-09-02 10:11:11,2,北京市
- u001,A1,2019-09-02 10:11:30,3,北京市
- u002,A1,2019-09-02 10:12:11,2,辽宁省
- u001,A1,2019-09-02 10:10:11,1,北京市
- u001,A1,2019-09-02 10:10:11,1,北京市
- u001,A1,2019-09-02 10:10:11,1,北京市
- u001,A1,2019-09-02 10:10:11,1,北京市
运行结果如下：发送的数据中只包含活动 id，Flink 根据活动 id 从 MySQL 查询到了对应的活动名称。
文章最后,给大家推荐一些受欢迎的技术博客链接:
- JAVA相关的深度技术博客链接
- Flink 相关技术博客链接
- Spark 核心技术链接
- 设计模式 —— 深度技术博客链接
- 机器学习 —— 深度技术博客链接
- Hadoop相关技术博客链接
- 超全干货--Flink思维导图,花了3周左右编写、校对
- 深入JAVA 的JVM核心原理解决线上各种故障【附案例】
- 请谈谈你对volatile的理解?--最近小李子与面试官的一场“硬核较量”
- 聊聊RPC通信,经常被问到的一道面试题。源码+笔记,包懂
- 深入聊聊Java 垃圾回收机制【附原理图及调优方法】
欢迎扫描下方的二维码或 搜索 公众号“大数据高级架构师”,我们会有更多、且及时的资料推送给您,欢迎多多交流!
评论记录:
回复评论: