Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
22、Flink 的table api与sql之创建表的DDL
文章目录
本文介绍了Table Api和Sql的概念、使用示例,接下来将针对该部分继续进行概念介绍,最后会给出综合使用示例。
本文是该系列中介绍Table API与SQL的第一篇,接下来将通过9篇内容介绍该部分内容。
本文分为3个部分,即table api和sql的概要介绍、概念及api和简单的入门示例。
一、Table API & SQL介绍
1、Table API & SQL 介绍
Apache Flink 有两种关系型 API 来做流批统一处理:Table API 和 SQL。
Table API 是用于 Scala 和 Java 语言的查询API,它可以用一种非常直观的方式来组合使用选取、过滤、join 等关系型算子。
Flink SQL 是基于 Apache Calcite 来实现的标准 SQL。无论输入是连续的(流式)还是有界的(批处理),在两个接口中指定的查询都具有相同的语义,并指定相同的结果。
Table API 和 SQL 两种 API 是紧密集成的,以及 DataStream API。可以在这些 API 之间,以及一些基于这些 API 的库之间轻松的切换。 比如,可以先用 CEP 从 DataStream 中做模式匹配,然后用 Table API 来分析匹配的结果;或者可以用 SQL 来扫描、过滤、聚合一个批式的表,然后再跑一个 Gelly 图算法 来处理已经预处理好的数据。
Flink的Table模块包括 Table API 和 SQL:
Table API 是一种类SQL的API,通过Table API,用户可以像操作表一样操作数据,非常直观和方便
SQL作为一种声明式语言,有着标准的语法和规范,用户可以不用关心底层实现即可进行数据的处理,非常易于上手
Flink Table API 和 SQL 的实现上有80%左右的代码是公用的。作为一个流批统一的计算引擎,Flink 的 Runtime 层是统一的。
2、maven依赖
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-api-java-bridge_2.11artifactId>
<version>1.12.7version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-planner-blink_2.11artifactId>
<version>1.12.7version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-scala_2.11artifactId>
<version>1.12.7version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-commonartifactId>
<version>1.12.7version>
<scope>providedscope>
dependency>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
3、table api与sql章节介绍
1、公共概念和 API: Table API 和 SQL 公共概念以及 API
2、数据类型: 内置数据类型以及它们的属性
3、流式概念: Table API 和 SQL 中流式相关的文档,比如配置时间属性和如何处理更新结果
4、连接外部系统: 读写外部系统的连接器和格式
5、Table API: Table API 支持的操作
6、SQL: SQL 支持的操作和语法
7、内置函数: Table API 和 SQL 中的内置函数
8、SQL Client: 不用编写代码就可以尝试 Flink SQL,可以直接提交 SQL 任务到集群上
9、table api与sql示例
二、概念与通用 API
1、两种计划器(Planner)的主要区别
两种planner是指flink1.9版本之前使用的flink-table-planner和1.11版本以后默认使用的flink-table-planner-blink。一般称为该两种planner为old planner和blink planner。
- Blink 将批处理作业视作流处理的一种特例。严格来说,Table 和 DataSet 之间不支持相互转换,并且批处理作业也不会转换成
DataSet 程序而是转换成 DataStream 程序,流处理作业也一样。 - Blink 计划器不支持 BatchTableSource,而是使用有界的 StreamTableSource 来替代。
- 旧计划器和 Blink 计划器中 FilterableTableSource 的实现是不兼容的。旧计划器会将 PlannerExpression 下推至FilterableTableSource,而 Blink 计划器则是将 Expression 下推。
- 基于字符串的键值配置选项仅在 Blink 计划器中使用。
- PlannerConfig 在两种计划器中的实现(CalciteConfig)是不同的。
- Blink 计划器会将多sink(multiple-sinks)优化成一张有向无环图(DAG),TableEnvironment 和
StreamTableEnvironment 都支持该特性。旧计划器总是将每个sink都优化成一个新的有向无环图,且所有图相互独立。 - 旧计划器目前不支持 catalog 统计数据,而 Blink 支持。
2、Table API 和 SQL 程序的结构
示例代码
// create a TableEnvironment for specific planner batch or streaming
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an input Table
tableEnv.executeSql("CREATE TEMPORARY TABLE tablename_test... WITH ( 'connector' = ... )");
// register an output Table
tableEnv.executeSql("CREATE TEMPORARY TABLE outputTable_test ... WITH ( 'connector' = ... )");
// create a Table object from a Table API query
Table table2 = tableEnv.from("tablename_test").select(...);
// create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT ... FROM tablename_test... ");
// emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.executeInsert("outputTable_test ");
tableResult...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
Table API 和 SQL 查询可以很容易地集成并嵌入到 DataStream 或 DataSet 程序中。关于转换参考本文后面的章节。
3、创建 TableEnvironment
TableEnvironment 是 Table API 和 SQL 的核心概念。它负责:
- 在内部的 catalog 中注册 Table
- 注册外部的 catalog
- 加载可插拔模块
- 执行 SQL 查询
- 注册自定义函数(scalar、table 或 aggregation)
- 将 DataStream 或 DataSet 转换成 Table
- 持有对 ExecutionEnvironment 或 StreamExecutionEnvironment 的引用
Table 总是与特定的 TableEnvironment 绑定。不能在同一条查询中使用不同 TableEnvironment 中的表,例如,对它们进行 join 或 union 操作。
TableEnvironment 可以通过静态方法 BatchTableEnvironment.create() 或者 StreamTableEnvironment.create() 在 StreamExecutionEnvironment 或者 ExecutionEnvironment 中创建,TableConfig 是可选项。TableConfig可用于配置TableEnvironment或定制的查询优化和转换过程(参见 本文查询优化章节)。
请确保选择与你的编程语言匹配的特定的计划器BatchTableEnvironment/StreamTableEnvironment。
如果两种计划器的 jar 包都在 classpath 中(默认行为),你应该明确地设置要在当前程序中使用的计划器。
- Flink query
// **********************
// FLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);
// or TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);
// ******************
// FLINK BATCH QUERY
// ******************
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- Blink query
// **********************
// BLINK STREAMING QUERY
// **********************
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
// or TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);
// ******************
// BLINK BATCH QUERY
// ******************
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
注意: 如果/lib目录中只有一种计划器的 jar 包,则可以使用useAnyPlanner创建 EnvironmentSettings。
4、在 Catalog 中创建表
TableEnvironment 维护着一个由标识符(identifier)创建的表 catalog 的映射。标识符由三个部分组成:catalog 名称、数据库名称以及对象名称。如果 catalog 或者数据库没有指明,就会使用当前默认值(参见表标识符扩展章节中的例子)。
Table 可以是虚拟的(视图 VIEWS)也可以是常规的(表 TABLES)。视图 VIEWS可以从已经存在的Table中创建,一般是 Table API 或者 SQL 的查询结果。 表TABLES描述的是外部数据,例如文件、数据库表或者消息队列。
1)、临时表(Temporary Table)和永久表(Permanent Table)
表可以是临时的,并与单个 Flink 会话(session)的生命周期相关,也可以是永久的,并且在多个 Flink 会话和群集(cluster)中可见。
永久表需要 catalog(例如 Hive Metastore)以维护表的元数据。一旦永久表被创建,它将对任何连接到 catalog 的 Flink 会话可见且持续存在,直至被明确删除。
另一方面,临时表通常保存于内存中并且仅在创建它们的 Flink 会话持续期间存在。这些表对于其它会话是不可见的。它们不与任何 catalog 或者数据库绑定但可以在一个命名空间(namespace)中创建。即使它们对应的数据库被删除,临时表也不会被删除。
可以使用与已存在的永久表相同的标识符去注册临时表。临时表会屏蔽永久表,并且只要临时表存在,永久表就无法访问。所有使用该标识符的查询都将作用于临时表。
这可能对实验(experimentation)有用。它允许先对一个临时表进行完全相同的查询,例如只有一个子集的数据,或者数据是不确定的。一旦验证了查询的正确性,就可以对实际的生产表进行查询。
2)、创建表
- 虚拟表-视图
在 SQL 的术语中,Table API 的对象对应于视图(虚拟表)。它封装了一个逻辑查询计划。它可以通过以下方法在 catalog 中创建:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// table is the result of a simple projection query
Table tableName= tableEnv.from("X").select(...);
// register the Table projTable as table "projectedTable"
tableEnv.createTemporaryView("projectedTable", tableName);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
注意: 从传统数据库系统的角度来看,Table 对象与 VIEW 视图非常像。也就是,定义了 Table 的查询是没有被优化的, 而且会被内嵌到另一个引用了这个注册了的 Table的查询中。如果多个查询都引用了同一个注册了的Table,那么它会被内嵌每个查询中并被执行多次, 也就是说注册了的Table的结果不会被共享(注:Blink 计划器的TableEnvironment会优化成只执行一次)。
- Connector Tables
另外一个方式去创建 TABLE 是通过 connector 声明。Connector 描述了存储表数据的外部系统。存储系统例如 Apache Kafka 或者常规的文件系统都可以通过这种方式来声明。
tableEnvironment
.connect(...)
.withFormat(...)
.withSchema(...)
.inAppendMode()
.createTemporaryTable("tableName")
- 1
- 2
- 3
- 4
- 5
- 6
3)、扩展表标识符
表总是通过三元标识符注册,包括 catalog 名、数据库名和表名。
用户可以指定一个 catalog 和数据库作为 “当前catalog” 和”当前数据库”。有了这些,那么刚刚提到的三元标识符的前两个部分就可以被省略了。如果前两部分的标识符没有指定, 那么会使用当前的 catalog 和当前数据库。用户也可以通过 Table API 或 SQL 切换当前的 catalog 和当前的数据库。
标识符遵循 SQL 标准,因此使用时需要用反引号(`)进行转义。
TableEnvironment tEnv = ...;
tEnv.useCatalog("custom_catalog");
tEnv.useDatabase("custom_database");
Table table = ...;
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("view_Name", table);
// register the view named 'exampleView' in the catalog named 'custom_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_database.view_Name", table);
// register the view named 'example.View' in the catalog named 'custom_catalog'
// in the database named 'custom_database'
tableEnv.createTemporaryView("`example.View`", table);
// register the view named 'exampleView' in the catalog named 'other_catalog'
// in the database named 'other_database'
tableEnv.createTemporaryView("other_catalog.other_database.viewName", table);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
5、查询表
1)、Table API
Table API 是关于 Scala 和 Java 的集成语言式查询 API。与 SQL 相反,Table API 的查询不是由字符串指定,而是在宿主语言中逐步构建。
Table API 是基于 Table 类的,该类表示一个表(流或批处理),并提供使用关系操作的方法。这些方法返回一个新的 Table 对象,该对象表示对输入 Table 进行关系操作的结果。 一些关系操作由多个方法调用组成,例如 table.groupBy(…).select(),其中 groupBy(…) 指定 table 的分组,而 select(…) 在 table 分组上的投影。
该链接中列出了所有支持的算子操作:17、Flink 的table api与sql之Table API: Table API 支持的操作
以下示例展示了一个简单的 Table API 聚合查询:
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// scan registered Orders table
Table orders = tableEnv.from("student");
// compute revenue for all customers from France
Table revenue = orders
.filter($("name").isEqual("alanchan"))
.groupBy($("id"), $("name")
.select($("id"), $("name"), $("chinese").sum().as("sum_c"));
// emit or convert Table
// execute query
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
2)、SQL
Flink SQL 是基于实现了SQL标准的 Apache Calcite 的。SQL 查询由常规字符串指定。
该链接 描述了Flink对流处理和批处理表的SQL支持:
26、Flink 的SQL之概览与入门
27、Flink 的SQL之SELECT (Queries)
28、Flink 的SQL之DROP 语句、ALTER 语句、INSERT 语句、ANALYZE 语句
29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE
30、Flink SQL之SQL 客户端
下面的示例演示了如何指定查询并将结果作为 Table 对象返回。
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register Orders table
// compute revenue for all customers from France
Table revenue = tableEnv.sqlQuery(
"SELECT id, name, SUM(chinese) AS sum_c" +
"FROM student" +
"WHERE name= 'alanchan' " +
"GROUP BY id, name"
);
// emit or convert Table
// execute query
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
如下的示例展示了如何指定一个更新查询,将查询的结果插入到已注册的表中
// get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// register "Orders" table
// register "RevenueFrance" output table
// compute revenue for all customers from France and emit to "RevenueFrance"
tableEnv.executeSql(
"INSERT INTO student_sumscore " +
"SELECT id, name, SUM(chinese) AS sum_c" +
"FROM student" +
"WHERE name= 'alanchan' " +
"GROUP BY id, name"
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
3)、混用 Table API 和 SQL
Table API 和 SQL 查询的混用非常简单因为它们都返回 Table 对象:
- 可以在 SQL 查询返回的 Table 对象上定义 Table API 查询。
- 在 TableEnvironment 中注册的结果表可以在 SQL 查询的 FROM 子句中引用,通过这种方法就可以在 Table API 查询的结果上定义 SQL 查询。
6、输出表
Table 通过写入 TableSink 输出。TableSink 是一个通用接口,用于支持多种文件格式(如 CSV、Apache Parquet、Apache Avro)、存储系统(如 JDBC、Apache HBase、Apache Cassandra、Elasticsearch)或消息队列系统(如 Apache Kafka、RabbitMQ)。
批处理 Table 只能写入 BatchTableSink,而流处理 Table 需要指定写入 AppendStreamTableSink,RetractStreamTableSink 或者 UpsertStreamTableSink。
该链接可以获取更多关于可用 Sink 的信息以及如何自定义 TableSink:http://iyenn.com/index/link?url=http://iyenn.com/index/link?url=https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/table/sourceSinks.html
方法 Table.executeInsert(String tableName) 将 Table 发送至已注册的 TableSink。该方法通过名称在 catalog 中查找 TableSink 并确认Table schema 和 TableSink schema 一致。
下面的示例演示如何输出 Table:
/ get a TableEnvironment
TableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// create an output Table
final Schema schema = new Schema()
.field("a", DataTypes.INT())
.field("b", DataTypes.STRING())
.field("c", DataTypes.BIGINT());
tableEnv.connect(new FileSystem().path("/usr/local/bigdata/testdata"))
.withFormat(new Csv().fieldDelimiter('|').deriveSchema())
.withSchema(schema)
.createTemporaryTable("CsvSinkTable");
// compute a result Table using Table API operators and/or SQL queries
Table result = ...
// emit the result Table to the registered TableSink
result.executeInsert("CsvSinkTable");
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
7、翻译与执行查询
两种计划器翻译和执行查询的方式是不同的。
1)、Blink planner(flink-table-planner-blink)
不论输入数据源是流式的还是批式的,Table API 和 SQL 查询都会被转换成 DataStream 程序。查询在内部表示为逻辑查询计划,并被翻译成两个阶段:
- 优化逻辑执行计划
- 翻译成 DataStream 程序
Table API 或者 SQL 查询在下列情况下会被翻译:
- 当 TableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
- 当 Table.executeInsert() 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
- 当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
- 当 StatementSet.execute() 被调用时。Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中,StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
- 当 Table 被转换成 DataStream 时(本文下面的与 DataStream 和 DataSet API 结合)。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 StreamExecutionEnvironment.execute() 时被执行。
从 1.11 版本开始,sqlUpdate 方法 和 insertInto 方法被废弃,从这两个方法构建的 Table 程序必须通过 StreamTableEnvironment.execute() 方法执行,而不能通过 StreamExecutionEnvironment.execute() 方法来执行
2)、Flink old planner(flink-table-planner)
Table API 和 SQL 查询会被翻译成 DataStream 或者 DataSet 程序, 这取决于它们的输入数据源是流式的还是批式的。查询在内部表示为逻辑查询计划,并被翻译成两个阶段:
- 优化逻辑执行计划
- 翻译成 DataStream 或 DataSet 程序
Table API 或者 SQL 查询在下列情况下会被翻译:
- 当 TableEnvironment.executeSql() 被调用时。该方法是用来执行一个 SQL 语句,一旦该方法被调用, SQL 语句立即被翻译。
- 当 Table.executeInsert() 被调用时。该方法是用来将一个表的内容插入到目标表中,一旦该方法被调用, TABLE API 程序立即被翻译。
- 当 Table.execute() 被调用时。该方法是用来将一个表的内容收集到本地,一旦该方法被调用, TABLE API 程序立即被翻译。
- 当 StatementSet.execute() 被调用时。Table (通过 StatementSet.addInsert() 输出给某个 Sink)和 INSERT 语句 (通过调用 StatementSet.addInsertSql())会先被缓存到 StatementSet 中,StatementSet.execute() 方法被调用时,所有的 sink 会被优化成一张有向无环图。
- 对于 Streaming 而言,当Table 被转换成 DataStream 时(参阅与 DataStream 和 DataSet API 结合)触发翻译。转换完成后,它就成为一个普通的 DataStream 程序,并会在调用 StreamExecutionEnvironment.execute() 时被执行。对于 Batch 而言,Table 被转换成 DataSet 时(参阅与 DataStream 和 DataSet API 结合)触发翻译。转换完成后,它就成为一个普通的 DataSet 程序,并会在调用 ExecutionEnvironment.execute() 时被执行。
从 1.11 版本开始,sqlUpdate 方法 和 insertInto 方法被废弃。对于 Streaming 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 StreamTableEnvironment.execute() 方法执行,而不能通过 StreamExecutionEnvironment.execute() 方法执行;对于 Batch 而言,如果一个 Table 程序是从这两个方法构建出来的,必须通过 BatchTableEnvironment.execute() 方法执行,而不能通过 ExecutionEnvironment.execute() 方法执行
8、与 DataStream 和 DataSet API 结合
在流处理方面两种计划器都可以与 DataStream API 结合。只有旧计划器可以与 DataSet API 结合。在批处理方面,Blink 计划器不能同两种计划器中的任何一个结合。
注意: 下文讨论的 DataSet API 只与旧计划起有关。
Table API 和 SQL 可以被很容易地集成并嵌入到 DataStream 和 DataSet 程序中。例如,可以查询外部表(例如从 RDBMS),进行一些预处理,例如过滤,投影,聚合或与元数据 join,然后使用 DataStream 或 DataSet API(以及在这些 API 之上构建的任何库,例如 CEP 或 Gelly)。相反,也可以将 Table API 或 SQL 查询应用于 DataStream 或 DataSet 程序的结果。
这种交互可以通过 DataStream 或 DataSet 与 Table 的相互转化实现。本节介绍这些转化是如何实现的。
1)、通过 DataSet 或 DataStream 创建视图
在 TableEnvironment 中可以将 DataStream 或 DataSet 注册成视图。结果视图的 schema 取决于注册的 DataStream 或 DataSet 的数据类型。请参阅文档 数据类型到 table schema 的映射获取详细信息。
注意: 通过 DataStream 或 DataSet 创建的视图只能注册成临时视图。
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// register the DataStream as View "myTable" with fields "f0", "f1"
tableEnv.createTemporaryView("table_name", stream);
// register the DataStream as View "myTable2" with fields "myLong", "myString"
tableEnv.createTemporaryView("table_name2", stream, $("id"), $("name"));
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
2)、将 DataStream 或 DataSet 转换成表
与在 TableEnvironment 注册 DataStream 或 DataSet 不同,DataStream 和 DataSet 还可以直接转换成 Table。
// get StreamTableEnvironment
// registration of a DataSet in a BatchTableEnvironment is equivalent
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Tuple2<Long, String>> stream = ...
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(stream);
// Convert the DataStream into a Table with fields "myLong", "myString"
Table table2 = tableEnv.fromDataStream(stream, $("id"), $("name"));
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
3)、将表转换成 DataStream 或 DataSet
Table 可以被转换成 DataStream 或 DataSet。通过这种方式,定制的 DataSet 或 DataStream 程序就可以在 Table API 或者 SQL 的查询结果上运行了。
将 Table 转换为 DataStream 或者 DataSet 时,你需要指定生成的 DataStream 或者 DataSet 的数据类型,即Table 的每行数据要转换成的数据类型。通常最方便的选择是转换成 Row 。以下列表概述了不同选项的功能:
- Row: 字段按位置映射,字段数量任意,支持 null 值,无类型安全(type-safe)检查。
- POJO: 字段按名称映射(POJO 必须按Table 中字段名称命名),字段数量任意,支持 null 值,无类型安全检查。
- Case Class: 字段按位置映射,不支持 null 值,有类型安全检查。
- Tuple: 字段按位置映射,字段数量少于 22(Scala)或者 25(Java),不支持 null 值,无类型安全检查。
- Atomic Type: Table 必须有一个字段,不支持 null 值,有类型安全检查。
1、将表转换成 DataStream
流式查询(streaming query)的结果表会动态更新,即,当新纪录到达查询的输入流时,查询结果会改变。因此,像这样将动态查询结果转换成 DataStream 需要对表的更新方式进行编码。
将 Table 转换为 DataStream 有两种模式:
- Append Mode: 仅当动态 Table 仅通过INSERT更改进行修改时,才可以使用此模式,即,它仅是追加操作,并且之前输出的结果永远不会更新。
- Retract Mode: 任何情形都可以使用此模式。它使用 boolean 值对 INSERT 和 DELETE 操作的数据进行标记。
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// convert the Table into an append DataStream of Tuple2
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
// DataStream> result = tenv.toRetractStream(resultTable, Student.class);
// DataStream> result = tenv.toRetractStream(resultTable, Row.class);
// DataStream> result = tenv.toRetractStream(resultTable, Result.class);
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
private Long id;
private String name;
private double chinese;
private double english;
private double math;
}
@Data
public static class Result {
private Long id;
private Double sum_c;
private Double sum_m;
private Double sum_e;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
关于动态表dynamic tables请参考链接:15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置中的动态表部分
一旦 Table 被转化为 DataStream,必须使用 StreamExecutionEnvironment 的 execute 方法执行该 DataStream 作业。
2、将表转换成 DataSet
将 Table 转换成 DataSet 的过程如下:
// get BatchTableEnvironment
BatchTableEnvironment tableEnv = BatchTableEnvironment.create(env);
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into a DataSet of Row by specifying a class
DataSet<Row> dsRow = tableEnv.toDataSet(table, Row.class);
// convert the Table into a DataSet of Tuple2 via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataSet<Tuple2<String, Integer>> dsTuple =
tableEnv.toDataSet(table, tupleType);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
一旦 Table 被转化为 DataSet,必须使用 ExecutionEnvironment 的 execute 方法执行该 DataSet 作业。
4)、数据类型到 Table Schema 的映射
Flink 的 DataStream 和 DataSet APIs 支持多样的数据类型。例如 Tuple(Scala 内置以及Flink Java tuple)、POJO 类型、Scala case class 类型以及 Flink 的 Row 类型等允许嵌套且有多个可在表的表达式中访问的字段的复合数据类型。其他类型被视为原子类型。下面,我们讨论 Table API 如何将这些数据类型类型转换为内部 row 表示形式,并提供将 DataStream 转换成 Table 的样例。
数据类型到 table schema 的映射有两种方式:基于字段位置或基于字段名称。
- 基于字段位置
基于位置的映射可在保持字段顺序的同时为字段提供更有意义的名称。这种映射方式可用于具有特定的字段顺序的复合数据类型以及原子类型。如 tuple、row 以及 case class 这些复合数据类型都有这样的字段顺序。然而,POJO 类型的字段则必须通过名称映射(参见下一章)。可以将字段投影出来,但不能使用as重命名。
定义基于位置的映射时,输入数据类型中一定不能存在指定的名称,否则 API 会假定应该基于字段名称进行映射。如果未指定任何字段名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用 f0 表示原子类型。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
DataStream<Tuple2<Long, String>> stream = env.fromCollection(
Arrays.asList(new Tuple2(1L, "alan"),
new Tuple2(2L, "alanchan"),
new Tuple2(3L, "alanchanchn"),
new Tuple2(4L, "alanalan_chn"),
new Tuple2(5L, "alan_chan_chn")));
// convert DataStream into Table with default field names "f0" and "f1"
Table t_name_row = tenv.fromDataStream(stream);
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(t_name_row, Row.class);
// convert DataStream into Table with field "id" only
Table table_f0 = tenv.fromDataStream(stream, $("f0"));
DataStream<Tuple2<Boolean, Row>> result2 = tenv.toRetractStream(table_f0, Row.class);
// convert DataStream into Table with field names "id" and "name"
Table table_f = tenv.fromDataStream(stream, $("f0"), $("f1"));
DataStream<Tuple2<Boolean, Row>> result3 = tenv.toRetractStream(table_f, Row.class);
result.print();
result2.print();
result3.print();
env.execute();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 基于字段名称
基于名称的映射适用于任何数据类型包括 POJO 类型。这是定义 table schema 映射最灵活的方式。映射中的所有字段均按名称引用,并且可以通过 as 重命名。字段可以被重新排序和映射。
若果没有指定任何字段名称,则使用默认的字段名称和复合数据类型的字段顺序,或者使用 f0 表示原子类型。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
DataStream<Tuple2<Long, String>> stream = env.fromCollection(
Arrays.asList(new Tuple2(1L, "alan"),
new Tuple2(2L, "alanchan"),
new Tuple2(3L, "alanchanchn"),
new Tuple2(4L, "alanalan_chn"),
new Tuple2(5L, "alan_chan_chn")));
// convert DataStream into Table with default field names "f0" and "f1"
Table t_name_row = tenv.fromDataStream(stream);
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(t_name_row, Row.class);
// convert DataStream into Table with field "f0" only
Table table_f0 = tenv.fromDataStream(stream, $("f0"));
DataStream<Tuple2<Boolean, Row>> result2 = tenv.toRetractStream(table_f0, Row.class);
// convert DataStream into Table with field names "id" and "name"
Table table_f = tenv.fromDataStream(stream, $("f0").as("id"), $("f1").as("name"));
DataStream<Tuple2<Boolean, Row>> result3 = tenv.toRetractStream(table_f, Row.class);
result.print();
result2.print();
result3.print();
env.execute();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
1、原子类型
Flink 将基础数据类型(Integer、Double、String)或者通用数据类型(不可再拆分的数据类型)视为原子类型。原子类型的 DataStream 或者 DataSet 会被转换成只有一条属性的 Table。属性的数据类型可以由原子类型推断出,还可以重新命名属性。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
DataStream<Long> stream = ...
// convert DataStream into Table with default field name "f0"
Table table = tableEnv.fromDataStream(stream);
// convert DataStream into Table with field name "fo"
Table table = tableEnv.fromDataStream(stream, $("f"));
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
2、Tuple类型(Scala 和 Java)和 Case Class类型(仅 Scala)
Flink 支持 Scala 的内置 tuple 类型并给 Java 提供自己的 tuple 类型。两种 tuple 的 DataStream 和 DataSet 都能被转换成表。可以通过提供所有字段名称来重命名字段(基于位置映射)。如果没有指明任何字段名称,则会使用默认的字段名称。如果引用了原始字段名称(对于 Flink tuple 为f0、f1 … …,对于 Scala tuple 为_1、_2 … …),则 API 会假定映射是基于名称的而不是基于位置的。基于名称的映射可以通过 as 对字段和投影进行重新排序。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
// source
DataStream<Tuple2<Long, String>> stream = env.fromCollection(
Arrays.asList(new Tuple2(1L, "alan"), new Tuple2(2L, "alanchan"), new Tuple2(3L, "alanchanchn"), new Tuple2(4L, "alanalan_chn"), new Tuple2(5L, "alan_chan_chn")));
// convert DataStream into Table with default field names "f0", "f1"
Table t_name_row = tenv.fromDataStream(stream);
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(t_name_row, Row.class);
result.print();
// convert DataStream into Table with renamed field names "myLong", "myString" (position-based)
Table table_f0 = tenv.fromDataStream(stream, $("id"), $("name"));
DataStream<Tuple2<Boolean, Row>> result2 = tenv.toRetractStream(table_f0, Row.class);
result2.print();
// convert DataStream into Table with reordered fields "f1", "f0" (name-based)
Table table_f = tenv.fromDataStream(stream, $("f1"), $("f0"));
DataStream<Tuple2<Boolean, Row>> result4 = tenv.toRetractStream(table_f, Row.class);
result4.print();
// convert DataStream into Table with reordered and aliased fields "myString", "myLong" (name-based)
Table table_f_as= tenv.fromDataStream(stream, $("f0").as("id"), $("f1").as("name"));
DataStream<Tuple2<Boolean, Row>> result3 = tenv.toRetractStream(table_f_as, Row.class);
result3.print();
// execute
env.execute();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
3、POJO 类型 (Java 和 Scala)
Flink 支持 POJO 类型作为复合类型。确定 POJO 类型的规则记录参看链接:https://nightlies.apache.org/flink/flink-docs-release-1.12/zh/dev/types_serialization.html#pojos
在不指定字段名称的情况下将 POJO 类型的 DataStream 或 DataSet 转换成 Table 时,将使用原始 POJO 类型字段的名称。名称映射需要原始名称,并且不能按位置进行。字段可以使用别名(带有 as 关键字)来重命名,重新排序和投影。
// get a StreamTableEnvironment, works for BatchTableEnvironment equivalently
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
// source
DataStream<Student> stream = env.fromCollection(Arrays.asList(
new Student(1L, "alan", 10, 20, 30),
new Student(2L, "alanchan", 60, 70, 80),
new Student(3L, "alanchanchn", 70, 80, 90),
new Student(4L, "alanchn", 100, 100, 100)));
// convert DataStream into Table with default field names "age", "name" (fields are ordered by name!)
Table table1 = tenv.fromDataStream(stream);
DataStream<Tuple2<Boolean, Student>> result = tenv.toRetractStream(table1, Student.class);
result.print();
// convert DataStream into Table with renamed fields "myAge", "myName" (name-based)
Table table2 = tenv.fromDataStream(stream, $("id").as("r_id"), $("name").as("r_name"), $("chinese").as("r_chinese"), $("english").as("r_english"), $("math").as("r_math"));
DataStream<Tuple2<Boolean, Row>> result2 = tenv.toRetractStream(table2, Row.class);
result2.print();
// convert DataStream into Table with projected field "name" (name-based)
Table table3 = tenv.fromDataStream(stream, $("name"));
DataStream<Tuple2<Boolean, Row>> result3 = tenv.toRetractStream(table3, Row.class);
result3.print();
// convert DataStream into Table with projected and renamed field "myName" (name-based)
Table table4 = tenv.fromDataStream(stream, $("name").as("NAME"),$("chinese").as("CHINESE"));
DataStream<Tuple2<Boolean, Row>> result4 = tenv.toRetractStream(table4, Row.class);
result4.print();
// execute
env.execute();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
4、Row类型
Row 类型支持任意数量的字段以及具有 null 值的字段。字段名称可以通过 RowTypeInfo 指定,也可以在将 Row 的 DataStream 或 DataSet 转换为 Table 时指定。
Row 类型的字段映射支持基于名称和基于位置两种方式。字段可以通过提供所有字段的名称的方式重命名(基于位置映射)或者分别选择进行投影/排序/重命名(基于名称映射)。
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
// source
DataStream<Row> stream = env.fromCollection(Arrays.asList(
Row.of(1L, "alan", 10, 20, 30),
Row.of(2L, "alanchan", 60, 70, 80),
Row.of(3L, "alanchanchn", 70, 80, 90),
Row.of(4L, "alanchn", 100, 100, 100)));
// Convert DataStream into Table with renamed field names "myName", "myAge" (position-based)
Table table1 = tenv.fromDataStream(stream,$("id"),$("name"),$("chinese"),$("english"),$("math"));
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table1, Row.class);
result.print();
// Convert DataStream into Table with renamed fields "myName", "myAge" (name-based)
//Table table2 = tenv.fromDataStream(stream, $("id").as("r_id"), $("name").as("r_name"), $("chinese").as("r_chinese"), $("english").as("r_english"), $("math").as("r_math"));
RowTypeInfo rowTypeInfo = new RowTypeInfo(new TypeInformation<?>[] { Types.LONG, Types.STRING, Types.INT, Types.INT, Types.INT },
new String[] { "id_", "name_", "chinese_", "english_", "math_" });
DataStream<Row> processedStream = stream.process(new ProcessFunction<Row, Row>() {
@Override
public void processElement(Row input, Context context, Collector<Row> output) {
output.collect(input);
}
}).returns(rowTypeInfo);
Table resultTable = tenv.fromDataStream(processedStream);
resultTable.printSchema();
DataStream<Tuple2<Boolean, Row>> result2 = tenv.toRetractStream(resultTable,Row.class);
result2.print();
// convert DataStream into Table with projected field "name" (name-based)
Table table3 = tenv.fromDataStream(stream, $("name"));
DataStream<Tuple2<Boolean, Row>> result3 = tenv.toRetractStream(table3, Row.class);
result3.print();
// execute
env.execute();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
9、查询优化
1)、Blink planner(flink-table-planner-blink)
Apache Flink 使用并扩展了 Apache Calcite 来执行复杂的查询优化。 这包括一系列基于规则和成本的优化,例如:
- 基于 Apache Calcite 的子查询解相关
- 投影剪裁
- 分区剪裁
- 过滤器下推
- 子计划消除重复数据以避免重复计算
- 特殊子查询重写,包括两部分:
1、将 IN 和 EXISTS 转换为 left semi-joins
2、将 NOT IN 和 NOT EXISTS 转换为 left anti-join - 可选 join 重新排序
通过 table.optimizer.join-reorder-enabled 启用
注意: 当前仅在子查询重写的结合条件下支持 IN / EXISTS / NOT IN / NOT EXISTS。
优化器不仅基于计划,而且还基于可从数据源获得的丰富统计信息以及每个算子(例如 io,cpu,网络和内存)的细粒度成本来做出明智的决策。
高级用户可以通过 CalciteConfig 对象提供自定义优化,可以通过调用 TableEnvironment#getConfig#setPlannerConfig 将其提供给 TableEnvironment。
2)、Flink planner(flink-table-planner)
Apache Flink 利用 Apache Calcite 来优化和翻译查询。当前执行的优化包括投影和过滤器下推,子查询消除以及其他类型的查询重写。原版计划程序尚未优化 join 的顺序,而是按照查询中定义的顺序执行它们(FROM 子句中的表顺序和/或 WHERE 子句中的 join 谓词顺序)。
通过提供一个 CalciteConfig 对象,可以调整在不同阶段应用的优化规则集合。这个对象可以通过调用构造器 CalciteConfig.createBuilder() 创建,并通过调用 tableEnv.getConfig.setPlannerConfig(calciteConfig) 提供给 TableEnvironment。
10、解释表
Table API 提供了一种机制来解释计算 Table 的逻辑和优化查询计划。 这是通过 Table.explain() 方法或者 StatementSet.explain() 方法来完成的。Table.explain() 返回一个 Table 的计划。StatementSet.explain() 返回多 sink 计划的结果。它返回一个描述三种计划的字符串:
- 关系查询的抽象语法树(the Abstract Syntax Tree),即未优化的逻辑查询计划,
- 优化的逻辑查询计划,以及
- 物理执行计划。
可以用 TableEnvironment.explainSql() 方法和 TableEnvironment.executeSql() 方法支持执行一个 EXPLAIN 语句获取逻辑和优化查询计划,请参阅 29、Flink SQL之DESCRIBE、EXPLAIN、USE、SHOW、LOAD、UNLOAD、SET、RESET、JAR、JOB Statements、UPDATE、DELETE。
以下代码展示了一个示例以及对给定 Table 使用 Table.explain() 方法的相应输出:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
// source
DataStream<Tuple2<Long, String>> stream1 = env.fromCollection(
Arrays.asList(
new Tuple2(1L, "alan"),
new Tuple2(2L, "alanchan"),
new Tuple2(3L, "alanchanchn")));
DataStream<Tuple2<Long, String>> stream2 = env.fromCollection(
Arrays.asList(
new Tuple2(4L, "alanalan_chn"),
new Tuple2(5L, "alan_chan_chn")));
// explain Table API
Table table1 = tenv.fromDataStream(stream1, $("id"), $("name"));
Table table2 = tenv.fromDataStream(stream2, $("id"), $("name"));
Table table = table1
.where($("name").like("alan%"))
.unionAll(table2);
System.out.println(table.explain());
DataStream<Row> result2 = tenv.toChangelogStream(table);
result2.print();
// execute
env.execute();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
上述例子的结果是:
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'alan%')])
: +- LogicalTableScan(table=[[Unregistered_DataStream_1]])
+- LogicalTableScan(table=[[Unregistered_DataStream_2]])
== Optimized Physical Plan ==
Union(all=[true], union=[id, name])
:- Calc(select=[id, name], where=[LIKE(name, _UTF-16LE'alan%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[id, name])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[id, name])
== Optimized Execution Plan ==
Union(all=[true], union=[id, name])
:- Calc(select=[id, name], where=[LIKE(name, _UTF-16LE'alan%')])
: +- DataStreamScan(table=[[Unregistered_DataStream_1]], fields=[id, name])
+- DataStreamScan(table=[[Unregistered_DataStream_2]], fields=[id, name])
5> +I[3, alanchanchn]
3> +I[1, alan]
1> +I[4, alanalan_chn]
2> +I[5, alan_chan_chn]
4> +I[2, alanchan]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
以下代码展示了一个示例以及使用 StatementSet.explain() 的多 sink 计划的相应输出:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
final Schema schema = new Schema()
.field("count", DataTypes.INT())
.field("word", DataTypes.STRING());
tEnv.connect(new FileSystem().path("/source/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource1");
tEnv.connect(new FileSystem().path("/source/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySource2");
tEnv.connect(new FileSystem().path("/sink/path1"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink1");
tEnv.connect(new FileSystem().path("/sink/path2"))
.withFormat(new Csv().deriveSchema())
.withSchema(schema)
.createTemporaryTable("MySink2");
StatementSet stmtSet = tEnv.createStatementSet();
Table table1 = tEnv.from("MySource1").where($("word").like("F%"));
stmtSet.addInsert("MySink1", table1);
Table table2 = table1.unionAll(tEnv.from("MySource2"));
stmtSet.addInsert("MySink2", table2);
String explanation = stmtSet.explain();
System.out.println(explanation);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
多 sink 计划的结果是:
== Abstract Syntax Tree ==
LogicalLegacySink(name=[MySink1], fields=[count, word])
+- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
LogicalLegacySink(name=[MySink2], fields=[count, word])
+- LogicalUnion(all=[true])
:- LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
: +- LogicalTableScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]])
+- LogicalTableScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]])
== Optimized Logical Plan ==
Calc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')], reuse_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
LegacySink(name=[MySink1], fields=[count, word])
+- Reused(reference_id=[1])
LegacySink(name=[MySink2], fields=[count, word])
+- Union(all=[true], union=[count, word])
:- Reused(reference_id=[1])
+- TableSourceScan(table=[[default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]]], fields=[count, word])
== Physical Execution Plan ==
Stage 1 : Data Source
content : collect elements with CollectionInputFormat
Stage 2 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 3 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource1, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 4 : Operator
content : Calc(where: (word LIKE _UTF-16LE'F%'), select: (count, word))
ship_strategy : FORWARD
Stage 5 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 6 : Operator
content : Map
ship_strategy : FORWARD
Stage 8 : Data Source
content : collect elements with CollectionInputFormat
Stage 9 : Operator
content : CsvTableSource(read fields: count, word)
ship_strategy : REBALANCE
Stage 10 : Operator
content : SourceConversion(table:Buffer(default_catalog, default_database, MySource2, source: [CsvTableSource(read fields: count, word)]), fields:(count, word))
ship_strategy : FORWARD
Stage 12 : Operator
content : SinkConversionToRow
ship_strategy : FORWARD
Stage 13 : Operator
content : Map
ship_strategy : FORWARD
Stage 7 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD
Stage 14 : Data Sink
content : Sink: CsvTableSink(count, word)
ship_strategy : FORWARD
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
三、示例1:将DataStream数据转Table然后使用sql进行查询
1、maven依赖
<properties>
<encoding>UTF-8encoding>
<project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
<maven.compiler.source>1.8maven.compiler.source>
<maven.compiler.target>1.8maven.compiler.target>
<java.version>1.8java.version>
<scala.version>2.12scala.version>
<flink.version>1.12.0flink.version>
properties>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-api-scala-bridge_2.12artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-api-java-bridge_2.12artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-planner_2.12artifactId>
<version>${flink.version}version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-planner-blink_2.12artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-commonartifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
2、实现
- java bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
private Long id;
private String name;
private double chinese;
private double english;
private double math;
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 实现
import static org.apache.flink.table.api.Expressions.$;
import java.util.Arrays;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* @author alanchan
*
*/
public class DataStream2Table {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env, settings);
// source
DataStream<Student> studentDS = env.fromCollection(Arrays.asList(
new Student(1L, "alan", 10, 20, 30),
new Student(2L, "alanchan", 60, 70, 80),
new Student(3L, "alanchanchn", 70, 80, 90),
new Student(4L, "alanchn", 100, 100, 100)));
// transformation
// 将DataStream数据转Table,然后查询
Table tableStudent = tenv.fromDataStream(studentDS, $("id"), $("name"), $("chinese"), $("english"), $("math"));
String sql = "select * from " + tableStudent + " where english > 20";
Table resultTable = tenv.sqlQuery(sql);
DataStream<Tuple2<Boolean, Student>> result = tenv.toRetractStream(resultTable, Student.class);
// sink
result.print();
// execute
env.execute();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
3、验证结果
15> (true,Student(id=2, name=alanchan, chinese=60.0, english=70.0, math=80.0))
1> (true,Student(id=4, name=alanchn, chinese=100.0, english=100.0, math=100.0))
16> (true,Student(id=3, name=alanchanchn, chinese=70.0, english=80.0, math=90.0))
- 1
- 2
- 3
以上,介绍了Table Api和Sql的概念、使用示例,接下来将针对该部分继续进行概念介绍,最后会给出综合使用示例。
评论记录:
回复评论: