Flink 系列文章
1、Flink 部署、概念介绍、source、transformation、sink使用示例、四大基石介绍和示例等系列综合文章链接
13、Flink 的table api与sql的基本概念、通用api介绍及入门示例
14、Flink 的table api与sql之数据类型: 内置数据类型以及它们的属性
15、Flink 的table api与sql之流式概念-详解的介绍了动态表、时间属性配置(如何处理更新结果)、时态表、流上的join、流上的确定性以及查询配置
22、Flink 的table api与sql之创建表的DDL
30、Flink SQL之SQL 客户端(通过kafka和filesystem的例子介绍了配置文件使用-表、视图等)
本文介绍了Flink 的table api和sql中的DDL操作与示例。
本文比较简单,仅仅是介绍Flink 的DDL。
一、DDL概述
CREATE 语句用于向当前或指定的 Catalog 中注册表、视图或函数。注册后的表、视图和函数可以在 SQL 查询中使用。
目前 Flink SQL 支持下列 CREATE 语句:
- CREATE TABLE
- CREATE CATALOG
- CREATE DATABASE
- CREATE VIEW
- CREATE FUNCTION
二、执行 CREATE 语句
可以使用 TableEnvironment 中的 executeSql() 方法执行 CREATE 语句。 若 CREATE 操作执行成功,executeSql() 方法返回 ‘OK’,否则会抛出异常。
1、java
以下的例子展示了如何在 TableEnvironment 中执行一个 CREATE 语句。
EnvironmentSettings settings = EnvironmentSettings.newInstance()...
TableEnvironment tableEnv = TableEnvironment.create(settings);
// 对已注册的表进行 SQL 查询
// 注册名为 “Orders” 的表
tableEnv.executeSql("CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...)");
// 在表上执行 SQL 查询,并把得到的结果作为一个新的表
Table result = tableEnv.sqlQuery(
"SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
// 对已注册的表进行 INSERT 操作
// 注册 TableSink
tableEnv.executeSql("CREATE TABLE RubberOrders(product STRING, amount INT) WITH (...)");
// 在表上执行 INSERT 语句并向 TableSink 发出结果
tableEnv.executeSql(
"INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%'");
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
2、SQL Cli
Flink SQL> CREATE TABLE Orders (`user` BIGINT, product STRING, amount INT) WITH (...);
[INFO] Table has been created.
Flink SQL> CREATE TABLE RubberOrders (product STRING, amount INT) WITH (...);
[INFO] Table has been created.
Flink SQL> INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product LIKE '%Rubber%';
[INFO] Submitting SQL update statement to the cluster...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
三、CREATE TABLE语法
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] | AS select_query ]
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
根据指定的表名创建一个表,如果同名表已经在 catalog 中存在了,则无法注册。
1、Columns
1、Physical / Regular Columns
物理列是数据库中已知的常规列。它们定义物理数据中字段的名称、类型和顺序。因此,物理列表示从外部系统读取和写入的有效负载。连接器和格式使用这些列(按定义的顺序)来配置自身。可以在物理列之间声明其他类型的列,但不会影响最终的physical schema。
以下语句创建一个仅包含常规列的表:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING
) WITH (
...
);
- 1
- 2
- 3
- 4
- 5
- 6
2、Metadata Columns
元数据列是SQL标准的扩展,允许访问连接器和/或格式化表的每一行的特定字段。元数据列由元数据关键字指示。例如,元数据列可用于从 Kafka 记录读取和写入时间戳,以便进行基于时间的操作。连接器和格式文档列出了每个组件的可用元数据字段。但是,在table’s schema中声明元数据列是可选的。
以下语句创建一个表,其中包含引用元数据字段时间戳的附加元数据列:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`record_time` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' -- reads and writes a Kafka record's timestamp
) WITH (
'connector' = 'kafka'
...
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
每个元数据字段都由基于字符串的键标识,并具有记录的数据类型。例如,Kafka 连接器公开一个元数据字段,其中包含可用于读取和写入记录的键时间戳和数据类型 TIMESTAMP_LTZ(3)。
在上面的示例中,元数据列record_time成为table’s schema的一部分,并且可以像常规列一样进行转换和存储:
INSERT INTO MyTable SELECT user_id, name, record_time + INTERVAL '1' SECOND FROM MyTable;
- 1
为方便起见,如果列名应用作标识元数据键,则可以省略 FROM 子句:
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` TIMESTAMP_LTZ(3) METADATA -- use column name as metadata key
) WITH (
'connector' = 'kafka'
...
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
为方便起见,如果列的数据类型与元数据字段的数据类型不同,运行时将执行显式强制转换。当然,这要求两种数据类型兼容。
CREATE TABLE MyTable (
`user_id` BIGINT,
`name` STRING,
`timestamp` BIGINT METADATA -- cast the timestamp as BIGINT
) WITH (
'connector' = 'kafka'
...
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
默认情况下,计划器假定元数据列可用于读取和写入。但是,在许多情况下,外部系统提供的只读元数据字段多于可写字段。因此,可以使用 VIRTUAL 关键字从持久保留中排除元数据列。
CREATE TABLE MyTable (
`timestamp` BIGINT METADATA, -- part of the query-to-sink schema
`offset` BIGINT METADATA VIRTUAL, -- not part of the query-to-sink schema
`user_id` BIGINT,
`name` STRING,
) WITH (
'connector' = 'kafka'
...
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
在上面的示例中,偏移量是只读元数据列, 在query-to-sink schema中排除。因此,source-to-query schema(对于 SELECT)和query-to-sink (对于INSERT INTO)架构不同:
- source-to-query schema:
MyTable(`timestamp` BIGINT, `offset` BIGINT, `user_id` BIGINT, `name` STRING)
- 1
- query-to-sink schema:
MyTable(`timestamp` BIGINT, `user_id` BIGINT, `name` STRING)
- 1
3、Computed Columns
计算列是使用 AS computed_column_expression语法column_name生成的虚拟列。
计算列计算可引用同一表中声明的其他列的表达式。可以访问物理列和元数据列。列本身不以物理方式存储在表中。列的数据类型是从给定表达式自动派生的,不必手动声明。
计划器将在源之后将计算列转换为常规投影。对于优化或水印策略下推,评估可能会分布在运算符之间、多次执行或在给定查询不需要时跳过。
例如,计算列可以定义为:
CREATE TABLE MyTable (
`user_id` BIGINT,
`price` DOUBLE,
`quantity` DOUBLE,
`cost` AS price * quanitity, -- evaluate expression and supply the result to queries
) WITH (
'connector' = 'kafka'
...
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
表达式可以包含列、常量或函数的任意组合。表达式不能包含子查询。
计算列在 Flink 中通常用于定义 CREATE TABLE 语句中的时间属性。
处理时间属性可以通过 proc AS PROCTIME() 使用系统的 PROCTIME() 函数轻松定义。
可以在 WATERMARK 声明之前预处理事件时间属性时间戳。例如,如果原始字段不是 TIMESTAMP(3) 类型或嵌套在 JSON 字符串中,则可以使用计算列。
与虚拟元数据列类似,计算列从持久化中排除。因此,计算列不能是 INSERT INTO 语句的目标。因此,source-to-query schema(对于 SELECT)和query-to-sink (for INSERT INTO) schema不同:
- source-to-query schema:
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE, `cost` DOUBLE)
- 1
- query-to-sink schema:
MyTable(`user_id` BIGINT, `price` DOUBLE, `quantity` DOUBLE)
- 1
2、WATERMARK
WATERMARK 定义了表的事件时间属性,其形式为 WATERMARK FOR rowtime_column_name AS watermark_strategy_expression 。
rowtime_column_name 把一个现有的列定义为一个为表标记事件时间的属性。该列的类型必须为 TIMESTAMP(3),且是 schema 中的顶层列,它也可以是一个计算列。
watermark_strategy_expression 定义了 watermark 的生成策略。它允许使用包括计算列在内的任意非查询表达式来计算 watermark ;表达式的返回类型必须是 TIMESTAMP(3),表示了从 Epoch 以来的经过的时间。 返回的 watermark 只有当其不为空且其值大于之前发出的本地 watermark 时才会被发出(以保证 watermark 递增)。每条记录的 watermark 生成表达式计算都会由框架完成。 框架会定期发出所生成的最大的 watermark ,如果当前 watermark 仍然与前一个 watermark 相同、为空、或返回的 watermark 的值小于最后一个发出的 watermark ,则新的 watermark 不会被发出。 Watermark 根据 pipeline.auto-watermark-interval 中所配置的间隔发出。 若 watermark 的间隔是 0ms ,那么每条记录都会产生一个 watermark,且 watermark 会在不为空并大于上一个发出的 watermark 时发出。
使用事件时间语义时,表必须包含事件时间属性和 watermark 策略。
Flink 提供了三种常用的 watermark 策略。
-
严格递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column。
发出到目前为止已观察到的最大时间戳的 watermark ,时间戳大于最大时间戳的行被认为没有迟到。 -
递增时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘0.001’ SECOND。
发出到目前为止已观察到的最大时间戳减 1 的 watermark ,时间戳大于或等于最大时间戳的行被认为没有迟到。 -
有界乱序时间戳: WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘string’ timeUnit。
发出到目前为止已观察到的最大时间戳减去指定延迟的 watermark ,例如, WATERMARK FOR rowtime_column AS rowtime_column - INTERVAL ‘5’ SECOND 是一个 5 秒延迟的 watermark 策略。
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH ( . . . );
- 1
- 2
- 3
- 4
- 5
- 6
3、PRIMARY KEY
主键用作 Flink 优化的一种提示信息。主键限制表明一张表或视图的某个(些)列是唯一的并且不包含 Null 值。 主键声明的列都是非 nullable 的。因此主键可以被用作表行级别的唯一标识。
主键可以和列的定义一起声明,也可以独立声明为表的限制属性,不管是哪种方式,主键都不可以重复定义,否则 Flink 会报错。
SQL 标准主键限制可以有两种模式:ENFORCED 或者 NOT ENFORCED。 它申明了是否输入/出数据会做合法性检查(是否唯一)。Flink 不存储数据因此只支持 NOT ENFORCED 模式,即不做检查,用户需要自己保证唯一性。
Flink 假设声明了主键的列都是不包含 Null 值的,Connector 在处理数据时需要自己保证语义正确。
在 CREATE TABLE 语句中,创建主键会修改列的 nullable 属性,主键声明的列默认都是非 Nullable 的。
4、PARTITIONED BY
根据指定的列对已经创建的表进行分区。若表使用 filesystem sink ,则将会为每个分区创建一个目录。
5、WITH Options
表属性用于创建 table source/sink ,一般用于寻找和创建底层的连接器。
表达式 key1=val1 的键和值必须为字符串文本常量。请参考 连接外部系统Flink(十六)Flink 的table api与sql之连接外部系统: 读写外部系统的连接器和格式
表名可以为以下三种格式
- catalog_name.db_name.table_name ,使用catalog_name.db_name.table_name 的表将会与名为 “catalog_name” 的 catalog 和名为 “db_name” 的数据库一起注册到 metastore 中
- db_name.table_name ,使用 db_name.table_name 的表将会被注册到当前执行的 table environment 中的 catalog 且数据库会被命名为 “db_name”
- table_name,对于 table_name, 数据表将会被注册到当前正在运行的catalog和数据库中
使用 CREATE TABLE 语句注册的表均可用作 table source 和 table sink。 在被 DML 语句引用前,我们无法决定其实际用于 source 抑或是 sink。
- 1
6、LIKE
LIKE 子句来源于两种 SQL 特性的变体/组合(Feature T171,“表定义中的 LIKE 语法” 和 Feature T173,“表定义中的 LIKE 语法扩展”)。LIKE 子句可以基于现有表的定义去创建新表,并且可以扩展或排除原始表中的某些部分。与 SQL 标准相反,LIKE 子句必须在 CREATE 语句中定义,并且是基于 CREATE 语句的更上层定义,这是因为 LIKE 子句可以用于定义表的多个部分,而不仅仅是 schema 部分。
可以使用该子句,重用(或改写)指定的连接器配置属性或者可以向外部表添加 watermark 定义,例如可以向 Apache Hive 中定义的表添加 watermark 定义。
示例如下:
CREATE TABLE Orders (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'earliest-offset'
);
CREATE TABLE Orders_with_watermark (
-- 添加 watermark 定义
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
-- 改写 startup-mode 属性
'scan.startup.mode' = 'latest-offset'
)
LIKE Orders;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
结果表 Orders_with_watermark 等效于使用以下语句创建的表:
CREATE TABLE Orders_with_watermark (
`user` BIGINT,
product STRING,
order_time TIMESTAMP(3),
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
'scan.startup.mode' = 'latest-offset'
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
表属性的合并逻辑可以用 like options 来控制。
可以控制合并的表属性如下:
- CONSTRAINTS - 主键和唯一键约束
- GENERATED - 计算列
- OPTIONS - 连接器信息、格式化方式等配置项
- PARTITIONS - 表分区信息
- WATERMARKS - watermark 定义
并且有三种不同的表属性合并策略:
- INCLUDING - 新表包含源表(source table)所有的表属性,如果和源表的表属性重复则会直接失败,例如新表和源表存在相同 key 的属性
- EXCLUDING - 新表不包含源表指定的任何表属性
- OVERWRITING - 新表包含源表的表属性,但如果出现重复项,则会用新表的表属性覆盖源表中的重复表属性,例如,两个表中都存在相同 key 的属性,则会使用当前语句中定义的 key 的属性值
并且你可以使用 INCLUDING/EXCLUDING ALL 这种声明方式来指定使用怎样的合并策略,例如使用 EXCLUDING ALL INCLUDING WATERMARKS,那么代表只有源表的 WATERMARKS 属性才会被包含进新表。
示例如下:
-- 存储在文件系统的源表
CREATE TABLE Orders_in_file (
`user` BIGINT,
product STRING,
order_time_string STRING,
order_time AS to_timestamp(order_time)
)
PARTITIONED BY (`user`)
WITH (
'connector' = 'filesystem',
'path' = '...'
);
-- 对应存储在 kafka 的源表
CREATE TABLE Orders_in_kafka (
-- 添加 watermark 定义
WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
'connector' = 'kafka',
...
)
LIKE Orders_in_file (
-- 排除需要生成 watermark 的计算列之外的所有内容。
-- 去除不适用于 kafka 的所有分区和文件系统的相关属性。
EXCLUDING ALL
INCLUDING GENERATED
);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
如果未提供 like 配置项(like options),默认将使用 INCLUDING ALL OVERWRITING OPTIONS 的合并策略。
您无法选择物理列的合并策略,当物理列进行合并时就如使用了 INCLUDING 策略。
源表 source_table 可以是一个组合 ID。您可以指定不同 catalog 或者 DB 的表作为源表: 例如,my_catalog.my_db.MyTable 指定了源表 MyTable 来源于名为 MyCatalog 的 catalog 和名为 my_db 的 DB ,my_db.MyTable 指定了源表 MyTable 来源于当前 catalog 和名为 my_db 的 DB。
7、AS select_statement
表也可以通过一个 CTAS 语句中的查询结果来创建并填充数据,CTAS 是一种简单、快捷的创建表并插入数据的方法。
CTAS 有两个部分,SELECT 部分可以是 Flink SQL 支持的任何 SELECT 查询。 CREATE 部分从 SELECT 查询中获取列信息,并创建目标表。 与 CREATE TABLE 类似,CTAS 要求必须在目标表的 WITH 子句中指定必填的表属性。
CTAS 的建表操作需要依赖目标 Catalog。比如,Hive Catalog 会自动在 Hive 中创建物理表。但是基于内存的 Catalog 只会将表的元信息注册在执行 SQL 的 Client 的内存中。
示例如下:
CREATE TABLE my_ctas_table
WITH (
'connector' = 'kafka',
...
)
AS SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
- 1
- 2
- 3
- 4
- 5
- 6
结果表 my_ctas_table 等效于使用以下语句创建表并写入数据:
CREATE TABLE my_ctas_table (
id BIGINT,
name STRING,
age INT
) WITH (
'connector' = 'kafka',
...
);
INSERT INTO my_ctas_table SELECT id, name, age FROM source_table WHERE mod(id, 10) = 0;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
注意 CTAS 有如下约束:
- 暂不支持创建临时表。
- 暂不支持指定列信息。
- 暂不支持指定 Watermark。
- 暂不支持创建分区表。
- 暂不支持主键约束。
目前,CTAS 创建的目标表是非原子性的,如果在向表中插入数据时发生错误,该表不会被自动删除。
三、CREATE CATALOG
CREATE CATALOG catalog_name
WITH (key1=val1, key2=val2, ...)
- 1
- 2
使用给定的目录属性创建目录。如果已存在同名目录,则会引发异常。
用于存储与此目录相关的额外信息的目录属性。表达式 key1=val1 的键和值都应该是字符串文本。
关于Catalogs,请参考Flink(二十四)Flink 的table api与sql之Catalogs
四、CREATE DATABASE
CREATE DATABASE [IF NOT EXISTS] [catalog_name.]db_name
[COMMENT database_comment]
WITH (key1=val1, key2=val2, ...)
- 1
- 2
- 3
根据给定的表属性创建数据库。若数据库中已存在同名表会抛出异常。
- 1、IF NOT EXISTS
若数据库已经存在,则不会进行任何操作。
- 2、WITH OPTIONS
数据库属性一般用于存储关于这个数据库额外的信息。 表达式 key1=val1 中的键和值都需要是字符串文本常量。
五、CREATE VIEW
CREATE [TEMPORARY] VIEW [IF NOT EXISTS] [catalog_name.][db_name.]view_name
[{columnName [, columnName ]* }] [COMMENT view_comment]
AS query_expression
- 1
- 2
- 3
根据给定的 query 语句创建一个视图。若数据库中已经存在同名视图会抛出异常.
- 1、TEMPORARY
创建一个有 catalog 和数据库命名空间的临时视图,并覆盖原有的视图。
- 2、IF NOT EXISTS
若该视图已经存在,则不会进行任何操作。
六、CREATE FUNCTION
CREATE [TEMPORARY|TEMPORARY SYSTEM] FUNCTION
[IF NOT EXISTS] [[catalog_name.]db_name.]function_name
AS identifier [LANGUAGE JAVA|SCALA|PYTHON]
[USING JAR '.jar' [, JAR '.jar' ]* ]
- 1
- 2
- 3
- 4
创建一个有 catalog 和数据库命名空间的 catalog function ,需要指定一个 identifier ,可指定 language tag 。 若 catalog 中,已经有同名的函数注册了,则无法注册。
如果 language tag 是 JAVA 或者 SCALA ,则 identifier 是 UDF 实现类的全限定名。关于 JAVA/SCALA UDF 的实现,请参考 Flink(二十五)Flink 的table api与sql之函数。
- TEMPORARY
创建一个有 catalog 和数据库命名空间的临时 catalog function ,并覆盖原有的 catalog function 。
- TEMPORARY SYSTEM
创建一个没有数据库命名空间的临时系统 catalog function ,并覆盖系统内置的函数。
- IF NOT EXISTS
若该函数已经存在,则不会进行任何操作。
- LANGUAGE JAVA|SCALA|PYTHON
Language tag 用于指定 Flink runtime 如何执行这个函数。目前,只支持 JAVA, SCALA 和 PYTHON,且函数的默认语言为 JAVA。
- USING
指定包含该函数的实现及其依赖的 jar 资源列表。该 jar 应该位于 Flink 当前支持的本地或远程文件系统 中,比如 hdfs/s3/oss。
注意 目前只有 JAVA、SCALA 语言支持 USING 子句。
以上,介绍了Flink 的table api和sql中的DDL操作与示例。
评论记录:
回复评论: