首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

5、Flink 的 source、transformations、sink的详细示例(一)Flink 系列文章一、source-transformations-sink示例

  • 23-09-04 15:56
  • 2306
  • 11260
blog.csdn.net

Flink 系列文章

1、Flink1.12.7或1.13.5详细介绍及本地安装部署、验证
2、Flink1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式(前两种及session和per-job)验证详细步骤
3、flink重要概念(api分层、角色、执行流程、执行图和编程模型)及dataset、datastream详细示例入门和提交任务至on yarn运行
4、介绍Flink的流批一体、transformations的18种算子详细介绍、Flink与Kafka的source、sink介绍
5、Flink 的 source、transformations、sink的详细示例(一)
5、Flink的source、transformations、sink的详细示例(二)-source和transformation示例
5、Flink的source、transformations、sink的详细示例(三)-sink示例
6、Flink四大基石之Window详解与详细示例(一)
6、Flink四大基石之Window详解与详细示例(二)
7、Flink四大基石之Time和WaterMaker详解与详细示例(watermaker基本使用、kafka作为数据源的watermaker使用示例以及超出最大允许延迟数据的接收实现)
8、Flink四大基石之State概念、使用场景、持久化、批处理的详解与keyed state和operator state、broadcast state使用和详细示例
9、Flink四大基石之Checkpoint容错机制详解及示例(checkpoint配置、重启策略、手动恢复checkpoint和savepoint)
10、Flink的source、transformations、sink的详细示例(二)-source和transformation示例【补充示例】
11、Flink配置flink-conf.yaml详细说明(HA配置、checkpoint、web、安全、zookeeper、historyserver、workers、zoo.cfg)
12、Flink source和sink 的 clickhouse 详细示例

13、Flink 的table api和sql的介绍、示例等系列综合文章链接


文章目录

  • Flink 系列文章
  • 一、source-transformations-sink示例
    • 1、maven依赖
    • 2、source示例
      • 1)、基于Collection
      • 2)、基于文件
      • 3)、基于socket
    • 3、transformations示例
    • 4、sink示例


本文介绍了source、transformations和sink的基本用法,下一篇将介绍各自的自定义用法。

一、source-transformations-sink示例

1、maven依赖

下文中所有示例都是用该maven依赖,除非有特殊说明的情况。

<properties>
        <encoding>UTF-8encoding>
        <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
        <maven.compiler.source>1.8maven.compiler.source>
        <maven.compiler.target>1.8maven.compiler.target>
        <java.version>1.8java.version>
        <scala.version>2.12scala.version>
        <flink.version>1.12.0flink.version>
    properties>

    <dependencies>
            <dependency>
            <groupId>jdk.toolsgroupId>
            <artifactId>jdk.toolsartifactId>
            <version>1.8version>
            <scope>systemscope>
            <systemPath>${JAVA_HOME}/lib/tools.jarsystemPath>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-clients_2.12artifactId>
            <version>${flink.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-scala_2.12artifactId>
            <version>${flink.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-javaartifactId>
            <version>${flink.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-streaming-scala_2.12artifactId>
            <version>${flink.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-streaming-java_2.12artifactId>
            <version>${flink.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-table-api-scala-bridge_2.12artifactId>
            <version>${flink.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-table-api-java-bridge_2.12artifactId>
            <version>${flink.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-table-planner-blink_2.12artifactId>
            <version>${flink.version}version>
        dependency>
        <dependency>
            <groupId>org.apache.flinkgroupId>
            <artifactId>flink-table-commonartifactId>
            <version>${flink.version}version>
        dependency>

        
        <dependency>
            <groupId>org.slf4jgroupId>
            <artifactId>slf4j-log4j12artifactId>
            <version>1.7.7version>
            <scope>runtimescope>
        dependency>
        <dependency>
            <groupId>log4jgroupId>
            <artifactId>log4jartifactId>
            <version>1.2.17version>
            <scope>runtimescope>
        dependency>

        <dependency>
            <groupId>org.projectlombokgroupId>
            <artifactId>lombokartifactId>
            <version>1.18.2version>
            <scope>providedscope>
        dependency>
        <dependency>
			<groupId>org.apache.hadoopgroupId>
			<artifactId>hadoop-commonartifactId>
			<version>3.1.4version>
		dependency>
		<dependency>
			<groupId>org.apache.hadoopgroupId>
			<artifactId>hadoop-clientartifactId>
			<version>3.1.4version>
		dependency>
		<dependency>
			<groupId>org.apache.hadoopgroupId>
			<artifactId>hadoop-hdfsartifactId>
			<version>3.1.4version>
		dependency>
    dependencies>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100

2、source示例

1)、基于Collection

import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class Source_Collection {

    /**
     * 一般用于学习测试时使用 
     * 1.env.fromElements(可变参数); 
     * 2.env.fromColletion(各种集合);
     * 3.env.generateSequence(开始,结束); 
     * 4.env.fromSequence(开始,结束);
     * 
     * @param args 基于集合
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds1 = env.fromElements("i am alanchan", "i like flink");
        DataStream<String> ds2 = env.fromCollection(Arrays.asList("i am alanchan", "i like flink"));
        DataStream<Long> ds3 = env.generateSequence(1, 10);//已过期,使用fromSequence方法
        DataStream<Long> ds4 = env.fromSequence(1, 10);

        // transformation

        // sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // execute
        env.execute();
    }

}
//运行结果
3> 3
10> 10
6> 6
1> 1

9> 9

4> 4
2> 2

7> 7

5> 5

14> 1
14> 2
14> 3
14> 4
14> 5

14> 6
14> 7

11> 8
11> 9
11> 10

8> 8

11> i am alanchan
12> i like flink
10> i like flink
9> i am alanchan
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

2)、基于文件

//自己创建测试的文件,内容都会如实的展示了,故本示例不再提供
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class Source_File {

    /**
     * env.readTextFile(本地/HDFS文件/文件夹),压缩文件也可以
     * 
     * 
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds1 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
        DataStream<String> ds2 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/input/distribute_cache_student");
        DataStream<String> ds3 = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.tar.gz");
        DataStream<String> ds4 = env.readTextFile("hdfs://server2:8020///flinktest/wc-1688627439219");

        // transformation

        // sink
        ds1.print();
        ds2.print();
        ds3.print();
        ds4.print();

        // execute
        env.execute();

    }

}
// 运行结果
//单个文件
8> i am alanchan
12> i like flink
1> and you ?

//目录
15> 1, "英文", 90
1> 2, "数学", 70
11> and you ?
8> i am alanchan
9> i like flink
3> 3, "英文", 86
13> 1, "数学", 90
5> 1, "语文", 50

//tar.gz
12> i am alanchan
12> i like flink
12> and you ?

//hdfs
6> (hive,2)
10> (flink,4)
15> (hadoop,3)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68

3)、基于socket

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *         在192.168.10.42上使用nc -lk 9999 向指定端口发送数据
 *         nc是netcat的简称,原本是用来设置路由器,我们可以利用它向某个端口发送数据 
 *         如果没有该命令可以下安装 yum install -y nc
 *         
 */
public class Source_Socket {

    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        //env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        //source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);
        
        //transformation
        

        //sink
        lines.print();

        //execute
        env.execute();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

验证步骤:
1、先启动该程序
2、192.168.10.42上输入
在这里插入图片描述
3、观察应用程序的控制台输出
在这里插入图片描述

3、transformations示例

对流数据中的单词进行统计,排除敏感词vx:alanchanchn

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * @author alanchan
 *
 */
public class TransformationDemo {

    /**
     * @param args
     * @throws Exception 
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        DataStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String value, Collector<String> out) throws Exception {
                String[] arr = value.split(",");
                for (String word : arr) {
                    out.collect(word);
                }
            }
        });

        DataStream<String> filted = words.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                return !value.equals("vx:alanchanchn");// 如果是vx:alanchanchn则返回false表示过滤掉
            }
        });

        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOne = filted
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        return Tuple2.of(value, 1);
                    }
                });
        KeyedStream<Tuple2<String, Integer>, String> grouped = wordAndOne.keyBy(t -> t.f0);

        SingleOutputStreamOperator<Tuple2<String, Integer>> result = grouped.sum(1);



        // sink
        result.print();

        // execute
        env.execute();

    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70

验证步骤
1、启动nc
nc -lk 9999
2、启动应用程序
3、在192.168.10.42中输入测试数据,如下
在这里插入图片描述
4、观察应用程序的控制台,截图如下
在这里插入图片描述

4、sink示例

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan 
 */
public class SinkDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> ds = env.readTextFile("D:/workspace/flink1.12-java/flink1.12-java/source_transformation_sink/src/main/resources/words.txt");
        System.setProperty("HADOOP_USER_NAME", "alanchan");
        // transformation
  
        // sink
//        ds.print();
//        ds.print("输出标识");

        // 并行度与写出的文件个数有关,一个并行度写一个文件,多个并行度写多个文件

        ds.writeAsText("hdfs://server2:8020///flinktest/words").setParallelism(1);

        // execute
        env.execute();
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

并行度为1的输出,到hdfs看结果
在这里插入图片描述
并行度为2的时候,生成了2个文件,内容如下
在这里插入图片描述
在这里插入图片描述
以上,简单的介绍了source、transformations和sink的使用示例。

注:本文转载自blog.csdn.net的一瓢一瓢的饮 alanchan的文章"https://blog.csdn.net/chenwewi520feng/article/details/131592673"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top