Flink article series
1、Flink 1.12.7 / 1.13.5: detailed introduction, local installation, deployment and verification
2、Flink 1.13.5: two deployment modes (Standalone, Standalone HA) and four job submission modes (the first two plus session and per-job), with detailed verification steps
3、Important Flink concepts (API layers, roles, execution flow, execution graph and programming model) with introductory DataSet and DataStream examples, and submitting jobs to run on YARN
4、Flink's unified stream/batch processing, a detailed look at the 18 transformation operators, and the Flink Kafka source and sink
5、Detailed examples of Flink source, transformations and sink (part 1)
5、Detailed examples of Flink source, transformations and sink (part 2): source and transformation examples
5、Detailed examples of Flink source, transformations and sink (part 3): sink examples
6、Window, one of Flink's four cornerstones: explanation and detailed examples (part 1)
6、Window, one of Flink's four cornerstones: explanation and detailed examples (part 2)
7、Time and Watermark, one of Flink's four cornerstones: explanation and detailed examples (basic watermark usage, watermarks with Kafka as the source, and handling data later than the maximum allowed lateness)
8、State, one of Flink's four cornerstones: concepts, use cases, persistence and batch handling, with usage and detailed examples of keyed state, operator state and broadcast state
9、Checkpoint fault tolerance, one of Flink's four cornerstones: explanation and examples (checkpoint configuration, restart strategies, manual recovery from checkpoints and savepoints)
10、Detailed examples of Flink source, transformations and sink (part 2): source and transformation examples (supplementary examples)
11、Detailed explanation of the flink-conf.yaml configuration (HA, checkpoint, web, security, zookeeper, historyserver, workers, zoo.cfg)
12、Detailed examples of Flink source and sink for ClickHouse
13、Links to the series of articles on Flink's Table API and SQL
This article describes the unified stream/batch development workflow, the detailed source, transformation, and sink APIs, and the integration between Flink and Kafka, including descriptions and sample code for the 18 transformation operators.
This article is mostly conceptual; for complete examples see article 5 of this series, the detailed source/transformations/sink examples for Flink.
Some images and text in this article come from the Internet.
The article has two parts: the unified stream/batch API, and Flink with Kafka.
一、Unified Stream and Batch API
Data is naturally produced as streams. Whether it is event data from web servers, trades from a stock exchange, or sensor readings from machines on a factory floor, data originates as a stream. When you analyze data, however, you can organize the processing around two models, bounded streams or unbounded streams, and the choice of model determines how the program executes and processes the data.
- Batch processing is the paradigm for bounded streams. In this mode you can choose to ingest the entire dataset before producing any output, which means you can sort, compute statistics over, or aggregate the whole dataset before emitting results.
- Stream processing, by contrast, deals with unbounded streams. At least in theory, the input never ends, so the program must continuously process records as they arrive.
In Flink, applications are composed of streaming dataflows that are transformed by user-defined operators. These dataflows form directed graphs that start with one or more sources and end in one or more sinks.
Often there is a one-to-one correspondence between the transformations in the program code and the operators in the dataflow, but sometimes one transformation may consist of several operators.
Flink applications can consume real-time data from streaming sources such as message queues or distributed logs (for example Apache Kafka or Kinesis), as well as bounded, historical data from a variety of sources. Likewise, the result streams produced by a Flink application can be sent to a wide range of sinks.
2、DataStream API
The DataStream API supports both streaming and batch execution modes.
Flink's core APIs were originally designed for specific scenarios. Although the Table API / SQL already provides a unified API for stream and batch processing, users working at the lower level still had to choose between two different APIs: DataSet for batch and DataStream for streaming. Since batch processing is a special case of stream processing, merging the two into a single API brings clear benefits, for example:
- Reusability: a job can switch freely between streaming and batch execution modes without rewriting any code, so the same job can process both real-time and historical data.
- Easier maintenance: a unified API means streams and batches share the same set of connectors and the same code base, and it becomes easy to mix the two, for example in backfilling scenarios.
The unified DataStream API supports efficient batch execution (FLIP-134); the DataSet API will be deprecated (FLIP-131) and its functionality will be covered by the DataStream API and the Table API / SQL. A minimal sketch of switching execution modes follows.
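The sketch below assumes a simple word-splitting job and shows how the execution mode can be selected in code with StreamExecutionEnvironment.setRuntimeMode; the mode can alternatively be passed at submission time (for example via the execution.runtime-mode option) so the code stays mode-agnostic.
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ExecutionModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // BATCH runs this bounded job with batch-style scheduling; STREAMING is the default.
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        env.fromElements("flink hadoop hive", "flink hadoop")
           .flatMap((String line, Collector<String> out) -> {
               for (String word : line.split(" ")) {
                   out.collect(word);
               }
           })
           .returns(Types.STRING) // needed because the lambda's type information is erased
           .print();

        env.execute("runtime-mode demo");
    }
}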
3、Flink's Programming Model
4、Programming Steps
Official documentation: Apache Flink 1.12 Documentation: Flink DataStream API Programming Guide
Flink programs look like regular programs that transform DataStreams/DataSets. Each program consists of the same basic parts:
- Obtain an execution environment (env)
- Load/create the initial data (source)
- Specify transformations on this data
- Specify where to put the results of your computations (sink)
- Trigger the program execution
1)、Obtain the execution environment (env)
getExecutionEnvironment() (recommended)
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
Example:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
2)、Load/create the initial data (source)
env can load many kinds of data sources, such as files, sockets, element collections (fromElements), and so on.
Example:
DataStream<String> lines = env.fromElements("flink hadoop hive", "flink hadoop hive", "flink hadoop", "flink");
DataStream<String> text = env.readTextFile("file:///path/to/file");
3)、Specify transformations
Transformation is one of Flink's core capabilities, and many transformation operators are available.
Example:
DataStream<Integer> parsed = input.map(new MapFunction<String, Integer>() {
@Override
public Integer map(String value) {
return Integer.parseInt(value);
}
});
4)、Specify where to put the results (sink)
Flink supports many kinds of sinks, such as relational databases, message queues, HDFS, Redis, and so on.
Example:
writeAsText(String path)
print()
5)、Trigger the program execution
Once you specified the complete program you need to trigger the program execution by calling execute() on the StreamExecutionEnvironment. Depending on the type of the ExecutionEnvironment the execution will be triggered on your local machine or submit your program for execution on a cluster.
The execute() method will wait for the job to finish and then return a JobExecutionResult, this contains execution times and accumulator results.
If you don’t want to wait for the job to finish, you can trigger asynchronous job execution by calling executeAsync() on the StreamExecutionEnvironment. It will return a JobClient with which you can communicate with the job you just submitted. For instance, here is how to implement the semantics of execute() by using executeAsync().
Example:
final JobClient jobClient = env.executeAsync();
final JobExecutionResult jobExecutionResult = jobClient.getJobExecutionResult().get();
That last part about program execution is crucial to understanding when and how Flink operations are executed. All Flink programs are executed lazily: When the program’s main method is executed, the data loading and transformations do not happen directly. Rather, each operation is created and added to a dataflow graph. The operations are actually executed when the execution is explicitly triggered by an execute() call on the execution environment. Whether the program is executed locally or on a cluster depends on the type of execution environment.
The lazy evaluation lets you construct sophisticated programs that Flink executes as one holistically planned unit.
5、Source, Transformations, and Sink
Official documentation: Apache Flink 1.12 Documentation: Flink DataStream API Programming Guide
1)、Sources
A source is where the program reads its input. A source is attached with StreamExecutionEnvironment.addSource(sourceFunction); the supported sources are file-based, socket-based, collection-based, and custom.
Sources are where your program reads its input from. You can attach a source to your program by using StreamExecutionEnvironment.addSource(sourceFunction). Flink comes with a number of pre-implemented source functions, but you can always write your own custom sources by implementing the SourceFunction for non-parallel sources, or by implementing the ParallelSourceFunction interface or extending the RichParallelSourceFunction for parallel sources.
There are several predefined stream sources accessible from the StreamExecutionEnvironment:
1、File-based
Typically used in testing and production scenarios.
path can be a local file (including compressed files), an HDFS file, or a directory. A short sketch follows the notes below.
- readTextFile(path) - Reads text files, i.e. files that respect the TextInputFormat specification, line-by-line and returns them as Strings.
- readFile(fileInputFormat, path) - Reads (once) files as dictated by the specified file input format.
- readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo) - This is the method called internally by the two previous ones. It reads files in the path based on the given fileInputFormat. Depending on the provided watchType, this source may periodically monitor (every interval ms) the path for new data (FileProcessingMode.PROCESS_CONTINUOUSLY), or process once the data currently in the path and exit (FileProcessingMode.PROCESS_ONCE). Using the pathFilter, the user can further exclude files from being processed.
IMPLEMENTATION:
Under the hood, Flink splits the file reading process into two sub-tasks, namely directory monitoring and data reading. Each of these sub-tasks is implemented by a separate entity. Monitoring is implemented by a single, non-parallel (parallelism = 1) task, while reading is performed by multiple tasks running in parallel. The parallelism of the latter is equal to the job parallelism. The role of the single monitoring task is to scan the directory (periodically or only once depending on the watchType), find the files to be processed, divide them in splits, and assign these splits to the downstream readers. The readers are the ones who will read the actual data. Each split is read by only one reader, while a reader can read multiple splits, one-by-one.
IMPORTANT NOTES:
If the watchType is set to FileProcessingMode.PROCESS_CONTINUOUSLY, when a file is modified, its contents are re-processed entirely. This can break the “exactly-once” semantics, as appending data at the end of a file will lead to all its contents being re-processed.
If the watchType is set to FileProcessingMode.PROCESS_ONCE, the source scans the path once and exits, without waiting for the readers to finish reading the file contents. Of course the readers will continue reading until all file contents are read. Closing the source leads to no more checkpoints after that point. This may lead to slower recovery after a node failure, as the job will resume reading from the last checkpoint.
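A minimal sketch of the file-based sources above; the paths are placeholders. readTextFile reads the path once, while readFile with FileProcessingMode.PROCESS_CONTINUOUSLY keeps monitoring the path at the given interval.
import org.apache.flink.api.java.io.TextInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Read the path once, line by line (file or directory; local or HDFS).
DataStream<String> once = env.readTextFile("file:///tmp/input");

// Monitor the path every 10 seconds; modified files are re-processed entirely.
String watchedPath = "file:///tmp/watched-dir";
DataStream<String> continuous = env.readFile(
        new TextInputFormat(new Path(watchedPath)),
        watchedPath,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        10_000L);

once.print("once");
continuous.print("continuous");
env.execute("file source demo");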
2、Socket-based
Typically used for testing.
socketTextStream - Reads from a socket. Elements can be separated by a delimiter.
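A small sketch of the socket source; the host and port are placeholders for a local test (for example, run `nc -lk 9999` first to feed it).
// using the StreamExecutionEnvironment env created as above
DataStream<String> socketLines = env.socketTextStream("localhost", 9999, "\n");
socketLines.print();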
3、Collection-based
Typically used for testing and verification.
- fromCollection(Collection) - Creates a data stream from the Java java.util.Collection. All elements in the collection must be of the same type.
- fromCollection(Iterator, Class) - Creates a data stream from an iterator. The class specifies the data type of the elements returned by the iterator.
- fromElements(T …) - Creates a data stream from the given sequence of objects. All objects must be of the same type.
- fromParallelCollection(SplittableIterator, Class) - Creates a data stream from an iterator, in parallel. The class specifies the data type of the elements returned by the iterator.
- generateSequence(from, to) - Generates the sequence of numbers in the given interval, in parallel. This method has been replaced by fromSequence(from, to). (A short sketch follows this list.)
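A minimal sketch of the collection-based sources above, reusing the env from the earlier snippets.
import java.util.Arrays;
import org.apache.flink.streaming.api.datastream.DataStream;

// From a Java collection; all elements must have the same type.
DataStream<String> fromList = env.fromCollection(Arrays.asList("flink", "hadoop", "hive"));

// From individual elements.
DataStream<Integer> fromElems = env.fromElements(1, 2, 3);

// Bounded sequence of longs; fromSequence replaces the deprecated generateSequence.
DataStream<Long> sequence = env.fromSequence(1, 100);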
4、Custom
Typically used in testing and production scenarios.
- addSource - Attach a new source function. For example, to read from Apache Kafka you can use addSource(new FlinkKafkaConsumer<>(…)). See connectors for more details.
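A minimal sketch of a custom non-parallel source, assuming a simple counter that emits one value per second; implement ParallelSourceFunction or extend RichParallelSourceFunction instead if the source should run with parallelism greater than one.
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CounterSource implements SourceFunction<Long> {
    private volatile boolean running = true;
    private long counter = 0L;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (running) {
            // Hold the checkpoint lock while emitting so that emission and state stay consistent.
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(counter++);
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

// Usage: DataStream<Long> numbers = env.addSource(new CounterSource());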
2)、Transformations
Operators transform one or more DataStreams into a new DataStream; an application can combine multiple transformation operators into a complex dataflow topology.
This part describes the basic transformations of the Flink DataStream API, the data partitioning schemes available after a transformation, and operator chaining strategies.
Broadly, operations on streaming data fall into four categories:
- The first category operates on individual records, for example filtering out records that do not meet a condition (Filter) or transforming each record (Map).
- The second category operates on multiple records, for example summing the volume of all orders placed within one hour. To support this kind of operation, the relevant records must be grouped together with a Window for processing.
- The third category operates on multiple streams and turns them into a single stream, for example merging streams with Union, Join, or Connect. These operations merge streams with different semantics, but they all eventually produce a new, unified stream on which cross-stream operations become possible.
- The fourth category is the opposite of merging: splitting one stream into several streams according to some rule (Split), where each resulting stream is a subset of the original, so that different streams can be processed differently.
1、Map
DataStream → DataStream
Applies a function to each element of the stream and returns the transformed result.
Takes one element and produces one element. A map function that doubles the values of the input stream:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
2、FlatMap
DataStream → DataStream
Turns each element of the stream into zero, one, or more elements and returns the flattened result.
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
3、Filter
DataStream → DataStream
Filters the elements of the stream by a given condition and keeps only those for which the condition returns true.
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
4、KeyBy
DataStream → KeyedStream
Partitions the stream by the specified key.
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, keyBy() is implemented with hash partitioning. There are different ways to specify keys.
This transformation returns a KeyedStream, which is, among other things, required to use keyed state.
dataStream.keyBy(value -> value.getSomeKey()) // Key by field "someKey"
dataStream.keyBy(value -> value.f0) // Key by the first element of a Tuple
A type cannot be a key if:
- it is a POJO type but does not override the hashCode() method and relies on the Object.hashCode() implementation.
- it is an array of any type.
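Following up on the POJO restriction above, here is a minimal sketch of a POJO key that overrides hashCode() and equals() so it can be used with keyBy; the usage line and the event accessor are illustrative only.
import java.util.Objects;

// A POJO used as a key must provide a deterministic hashCode() (and equals()).
public class UserKey {
    public String userId;

    public UserKey() { }
    public UserKey(String userId) { this.userId = userId; }

    @Override
    public boolean equals(Object o) {
        return o instanceof UserKey && Objects.equals(((UserKey) o).userId, userId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(userId);
    }
}

// Usage (illustrative): events.keyBy(event -> new UserKey(event.getUserId()));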
5、Reduce
KeyedStream → DataStream
Aggregates the elements of the keyed stream.
A “rolling” reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
6、Aggregations
KeyedStream → DataStream
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
7、Window
KeyedStream → WindowedStream
Windows can be defined on already partitioned KeyedStreams. Windows group the data in each key according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.
dataStream.keyBy(value -> value.f0).window(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
8、WindowAll
DataStream → AllWindowedStream
Windows can be defined on regular DataStreams. Windows group all the stream events according to some characteristic (e.g., the data that arrived within the last 5 seconds). See windows for a complete description of windows.
WARNING: This is in many cases a non-parallel transformation. All records will be gathered in one task for the windowAll operator.
dataStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5))); // Last 5 seconds of data
9、Window Apply
WindowedStream → DataStream
AllWindowedStream → DataStream
Applies a general function to the window as a whole. Below is a function that manually sums the elements of a window.
Note: If you are using a windowAll transformation, you need to use an AllWindowFunction instead.
windowedStream.apply (new WindowFunction<Tuple2<String,Integer>, Integer, Tuple, Window>() {
public void apply (Tuple tuple,
Window window,
Iterable<Tuple2<String, Integer>> values,
Collector<Integer> out) throws Exception {
int sum = 0;
for (Tuple2<String, Integer> t: values) {
sum += t.f1;
}
out.collect (new Integer(sum));
}
});
10、Window Reduce
WindowedStream → DataStream
Applies a functional reduce function to the window and returns the reduced value.
windowedStream.reduce (new ReduceFunction<Tuple2<String,Integer>>() {
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<String,Integer>(value1.f0, value1.f1 + value2.f1);
}
});
11、Aggregations on windows
WindowedStream → DataStream
Aggregates the contents of a window. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
windowedStream.sum(0);
windowedStream.sum("key");
windowedStream.min(0);
windowedStream.min("key");
windowedStream.max(0);
windowedStream.max("key");
windowedStream.minBy(0);
windowedStream.minBy("key");
windowedStream.maxBy(0);
windowedStream.maxBy("key");
12、Union
The union operator merges two or more data streams of the same type into a new stream of that type, i.e. several DataStream[T] are merged into one new DataStream[T]. Data is merged in first-in-first-out order and duplicates are not removed.
DataStream* → DataStream
Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.
dataStream.union(otherStream1, otherStream2, ...);
13、Window Join
DataStream,DataStream → DataStream
Join two data streams on a given key and a common window.
dataStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
14、Interval Join
KeyedStream,KeyedStream → DataStream
Join two elements e1 and e2 of two keyed streams with a common key over a given time interval, so that e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});
15、Window CoGroup
DataStream,DataStream → DataStream
Cogroups two data streams on a given key and a common window.
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});
16、Connect
connect provides functionality similar to union for joining two data streams, with the following differences:
connect can only connect two data streams, while union can connect more than two.
The two streams passed to connect may have different element types, while all streams passed to union must have the same type.
Connecting two DataStreams yields a ConnectedStreams, on which the two streams can be processed with different functions while sharing state between them.
DataStream,DataStream → ConnectedStreams
“Connects” two data streams retaining their types, allowing shared state between the two streams.
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
17、CoMap, CoFlatMap
ConnectedStreams → DataStream
Similar to map and flatMap on a connected data stream
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
18、Iterate
DataStream → IterativeStream → DataStream
Creates a “feedback” loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream.
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
3)、Sinks
1、Built-in sinks
Data sinks consume DataStreams and forward them to files, sockets, external systems, or print them. Flink comes with a variety of built-in output formats that are encapsulated behind operations on the DataStreams:
- writeAsText() / TextOutputFormat - Writes elements line-wise as Strings. The Strings are obtained by calling the toString() method of each element.
- writeAsCsv(…) / CsvOutputFormat - Writes tuples as comma-separated value files. Row and field delimiters are configurable. The value for each field comes from the toString() method of the objects.
- print() / printToErr() - Prints the toString() value of each element on the standard out / standard error stream. Optionally, a prefix (msg) can be provided which is prepended to the output. This can help to distinguish between different calls to print. If the parallelism is greater than 1, the output will also be prepended with the identifier of the task which produced the output.
- writeUsingOutputFormat() / FileOutputFormat - Method and base class for custom file outputs. Supports custom object-to-bytes conversion.
- writeToSocket - Writes elements to a socket according to a SerializationSchema
- addSink - Invokes a custom sink function. Flink comes bundled with connectors to other systems (such as Apache Kafka) that are implemented as sink functions.
Note that the write*() methods on DataStream are mainly intended for debugging purposes. They do not participate in Flink’s checkpointing, which means these functions usually have at-least-once semantics. The data flushing to the target system depends on the implementation of the OutputFormat, so not all elements sent to the OutputFormat immediately show up in the target system. Also, in failure cases, those records might be lost.
For reliable, exactly-once delivery of a stream into a file system, use the StreamingFileSink. Also, custom implementations through the .addSink(…) method can participate in Flink’s checkpointing for exactly-once semantics.
- ds.print() writes directly to the console (stdout)
- ds.printToErr() writes directly to the console on stderr (shown in red in most IDEs)
- ds.writeAsText("local/HDFS path", WriteMode.OVERWRITE).setParallelism(1)
When writing to a path, the parallelism set beforehand determines the layout:
if parallelism > 1, the path is treated as a directory;
if parallelism = 1, the path is a single file. (A short sketch follows this list.)
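A minimal sketch of the built-in sinks above; the output path is a placeholder, and writeAsText is debug-oriented and not part of checkpointing.
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.streaming.api.datastream.DataStream;

DataStream<String> ds = env.fromElements("flink", "hadoop", "hive");

ds.print("stdout");        // the prefix distinguishes different print() calls
ds.printToErr("stderr");

// parallelism = 1 writes a single file; parallelism > 1 turns the path into a directory
ds.writeAsText("file:///tmp/out", FileSystem.WriteMode.OVERWRITE).setParallelism(1);

env.execute();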
2、Custom sinks
Implement RichSinkFunction or another sink interface/abstract class yourself; a minimal sketch follows.
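A minimal sketch of a custom sink based on RichSinkFunction; open() and close() are where a real implementation would manage connections to the external system.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class LoggingSink extends RichSinkFunction<String> {
    @Override
    public void open(Configuration parameters) throws Exception {
        // open connections / clients to the external system here
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // write one record; replace with a real insert/produce call
        System.out.println("sink received: " + value);
    }

    @Override
    public void close() throws Exception {
        // release resources here
    }
}

// Usage: ds.addSink(new LoggingSink());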
6、DataStream Connectors
The DataStream API supports many connectors; later articles in this series will provide usage examples for them.
1)、JDBC
This connector provides a sink that writes data to a JDBC database.
To use it, add the following dependency to your project (along with your JDBC-driver):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.12.7</version>
</dependency>
Note that the streaming connectors are currently NOT part of the binary distribution. See how to link with them for cluster execution here.
Created JDBC sink provides at-least-once guarantee. Effectively exactly-once can be achieved using upsert statements or idempotent updates.
Example usage:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env
.fromElements(...)
.addSink(JdbcSink.sink(
"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
(ps, t) -> {
ps.setInt(1, t.id);
ps.setString(2, t.title);
ps.setString(3, t.author);
ps.setDouble(4, t.price);
ps.setInt(5, t.qty);
},
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl(getDbMetadata().getUrl())
.withDriverName(getDbMetadata().getDriverClass())
.build()));
env.execute();
2)、Kafka
Official documentation: Kafka | Apache Flink
Flink ships with a number of bundled connectors, such as the Kafka source and sink and the Elasticsearch sink. To read or write Kafka, Elasticsearch, or RabbitMQ you can simply use the corresponding connector API. Although these connectors live in the Flink source repository, they are not really part of the Flink engine and they are not packaged in the binary distribution. So when submitting a job, make sure the job jar bundles the relevant connector classes; otherwise the submission will fail with class-not-found errors or class-initialization exceptions.
The following parameters are all recommended to be set (see the sketch after this list):
- the topics to subscribe to
- the deserialization schema
- consumer property: the cluster (bootstrap server) addresses
- consumer property: the consumer group id (a default is used if unset, but an explicit id is easier to manage)
- consumer property: the offset reset strategy, e.g. earliest/latest…
- dynamic partition discovery (so that Flink detects when the number of Kafka partitions changes or grows)
- if checkpointing is not enabled, auto-commit of offsets can be enabled; once checkpointing is in use, offsets are committed together with checkpoints to the checkpoint state and back to Kafka
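A sketch that sets these parameters with the legacy FlinkKafkaConsumer (deprecated in favor of KafkaSource, which is covered below); the broker address, topic, and interval values are placeholders.
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");                 // cluster address
props.setProperty("group.id", "my-group");                                // consumer group id
props.setProperty("auto.offset.reset", "latest");                         // offset reset strategy
props.setProperty("flink.partition-discovery.interval-millis", "5000");   // dynamic partition discovery
props.setProperty("enable.auto.commit", "true");                          // only used when checkpointing is off
props.setProperty("auto.commit.interval.ms", "2000");

FlinkKafkaConsumer<String> consumer =
        new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
DataStream<String> kafkaStream = env.addSource(consumer);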
Apache Flink ships with a universal Kafka connector which attempts to track the latest version of the Kafka client. The version of the client it uses may change between Flink releases. Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. For details on Kafka compatibility, please refer to the official Kafka documentation.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.17.1</version>
</dependency>
Flink’s streaming connectors are not part of the binary distribution.
二、Flink and Kafka
This part is a more detailed introduction to using Kafka with Flink; the content is taken from the official documentation. Kafka metrics, security, and related topics are not covered here; see the official documentation for more.
Important notes:
FlinkKafkaConsumer is deprecated and will be removed with Flink 1.17, please use KafkaSource instead.
FlinkKafkaProducer is deprecated and will be removed with Flink 1.15, please use KafkaSink instead.
1、Kafka Source
1)、Usage
Kafka source provides a builder class for constructing instance of KafkaSource. The code snippet below shows how to build a KafkaSource to consume messages from the earliest offset of topic “input-topic”, with consumer group “my-group” and deserialize only the value of message as string.
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers(brokers)
.setTopics("input-topic")
.setGroupId("my-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
The following properties are required for building a KafkaSource:
- Bootstrap servers, configured by setBootstrapServers(String)
- Topics / partitions to subscribe, see the following Topic-partition subscription for more details.
- Deserializer to parse Kafka messages, see the following Deserializer for more details.
2)、Topic-partition Subscription
Kafka source provides 3 ways of topic-partition subscription:
- Topic list, subscribing messages from all partitions in a list of topics. For example:
KafkaSource.builder().setTopics("topic-a", "topic-b");
- Topic pattern, subscribing messages from all topics whose name matches the provided regular expression. For example:
KafkaSource.builder().setTopicPattern("topic.*");
- Partition set, subscribing partitions in the provided partition set. For example:
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
new TopicPartition("topic-a", 0), // Partition 0 of topic "topic-a"
new TopicPartition("topic-b", 5))); // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);
3)、Deserializer
A deserializer is required for parsing Kafka messages. Deserializer (Deserialization schema) can be configured by setDeserializer(KafkaRecordDeserializationSchema), where KafkaRecordDeserializationSchema defines how to deserialize a Kafka ConsumerRecord.
If only the value of Kafka ConsumerRecord is needed, you can use setValueOnlyDeserializer(DeserializationSchema) in the builder, where DeserializationSchema defines how to deserialize binaries of Kafka message value.
You can also use a Kafka Deserializer for deserializing Kafka message value. For example using StringDeserializer for deserializing Kafka message value as string:
import org.apache.kafka.common.serialization.StringDeserializer;
KafkaSource.<String>builder()
.setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));
4)、Starting Offset
Kafka source is able to consume messages starting from different offsets by specifying OffsetsInitializer. Built-in initializers include:
KafkaSource.builder()
// Start from committed offset of the consuming group, without reset strategy
.setStartingOffsets(OffsetsInitializer.committedOffsets())
// Start from committed offset, also use EARLIEST as reset strategy if committed offset doesn't exist
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
// Start from the first record whose timestamp is greater than or equals a timestamp (milliseconds)
.setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
// Start from earliest offset
.setStartingOffsets(OffsetsInitializer.earliest())
// Start from latest offset
.setStartingOffsets(OffsetsInitializer.latest());
You can also implement a custom offsets initializer if built-in initializers above cannot fulfill your requirement. (Not supported in PyFlink)
If offsets initializer is not specified, OffsetsInitializer.earliest() will be used by default.
5)、Boundedness
Kafka source is designed to support both streaming and batch running mode. By default, the KafkaSource is set to run in streaming manner, thus never stops until Flink job fails or is cancelled. You can use setBounded(OffsetsInitializer) to specify stopping offsets and set the source running in batch mode. When all partitions have reached their stopping offsets, the source will exit.
You can also set KafkaSource running in streaming mode, but still stop at the stopping offset by using setUnbounded(OffsetsInitializer). The source will exit when all partitions reach their specified stopping offset.
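A sketch using the same builder as the usage snippet above, but bounded so that the job stops once every partition reaches the latest offset observed at startup.
KafkaSource<String> boundedSource = KafkaSource.<String>builder()
        .setBootstrapServers(brokers)
        .setTopics("input-topic")
        .setGroupId("my-group")
        .setStartingOffsets(OffsetsInitializer.earliest())
        // batch mode: stop at the latest offsets seen when the job starts
        .setBounded(OffsetsInitializer.latest())
        // or stay in streaming mode but still stop at a given point:
        // .setUnbounded(OffsetsInitializer.timestamp(1657256176000L))
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();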
6)、Additional Properties
In addition to properties described above, you can set arbitrary properties for KafkaSource and KafkaConsumer by using setProperties(Properties) and setProperty(String, String). KafkaSource has following options for configuration:
- client.id.prefix defines the prefix to use for Kafka consumer’s client ID
- partition.discovery.interval.ms defines the interval in milliseconds for Kafka source to discover new partitions. See Dynamic Partition Discovery below for more details.
- register.consumer.metrics specifies whether to register metrics of KafkaConsumer in Flink metric group
- commit.offsets.on.checkpoint specifies whether to commit consuming offsets to Kafka brokers on checkpoint
For configurations of KafkaConsumer, you can refer to Apache Kafka documentation for more details.
Please note that the following keys will be overridden by the builder even if they are configured:
- key.deserializer is always set to ByteArrayDeserializer
- value.deserializer is always set to ByteArrayDeserializer
- auto.offset.reset.strategy is overridden by OffsetsInitializer#getAutoOffsetResetStrategy() for the starting offsets
- partition.discovery.interval.ms is overridden to -1 when setBounded(OffsetsInitializer) has been invoked
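A short sketch of passing these options through the builder, either as a Properties bundle or one at a time.
Properties sourceProps = new Properties();
sourceProps.setProperty("client.id.prefix", "my-flink-app");
sourceProps.setProperty("commit.offsets.on.checkpoint", "true");
sourceProps.setProperty("register.consumer.metrics", "true");

KafkaSource.<String>builder()
        .setProperties(sourceProps)
        // options can also be set one at a time
        .setProperty("partition.discovery.interval.ms", "10000");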
7)、Dynamic Partition Discovery
In order to handle scenarios like topic scaling-out or topic creation without restarting the Flink job, Kafka source can be configured to periodically discover new partitions under provided topic-partition subscribing pattern. To enable partition discovery, set a non-negative value for property partition.discovery.interval.ms:
//Partition discovery is disabled by default. You need to explicitly set the partition discovery interval to enable this feature.
KafkaSource.builder()
.setProperty("partition.discovery.interval.ms", "10000"); // discover new partitions per 10 seconds
8)、Event Time and Watermarks
By default, the record will use the timestamp embedded in the Kafka ConsumerRecord as the event time. You can define your own WatermarkStrategy to extract the event time from the record itself and emit watermarks downstream:
env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");
This documentation describes details about how to define a WatermarkStrategy. (Not supported in PyFlink)
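A sketch of a custom WatermarkStrategy with bounded out-of-orderness; Event and its getEventTime() accessor are assumed types used for illustration, and kafkaSource is the source built earlier.
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;

WatermarkStrategy<Event> watermarkStrategy = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        // take event time from a field of the record instead of the Kafka timestamp
        .withTimestampAssigner((event, recordTimestamp) -> event.getEventTime());

env.fromSource(kafkaSource, watermarkStrategy, "Kafka Source With Custom Watermark Strategy");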
9)、Idleness
The Kafka Source does not go automatically in an idle state if the parallelism is higher than the number of partitions. You will either need to lower the parallelism or add an idle timeout to the watermark strategy. If no records flow in a partition of a stream for that amount of time, then that partition is considered “idle” and will not hold back the progress of watermarks in downstream operators.
This documentation describes details about how to define a WatermarkStrategy#withIdleness.
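A sketch of adding an idle timeout to the watermark strategy (Event is again an assumed type; the durations are placeholders).
WatermarkStrategy<Event> strategyWithIdleness = WatermarkStrategy
        .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        // partitions with no records for 1 minute are marked idle and stop holding back watermarks
        .withIdleness(Duration.ofMinutes(1));

env.fromSource(kafkaSource, strategyWithIdleness, "Kafka Source With Idleness");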
10)、Consumer Offset Committing
Kafka source commits the current consuming offset when checkpoints are completed, for ensuring the consistency between Flink’s checkpoint state and committed offsets on Kafka brokers.
If checkpointing is not enabled, Kafka source relies on Kafka consumer’s internal automatic periodic offset committing logic, configured by enable.auto.commit and auto.commit.interval.ms in the properties of Kafka consumer.
Note that Kafka source does NOT rely on committed offsets for fault tolerance. Committing offset is only for exposing the progress of consumer and consuming group for monitoring.
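A small sketch tying offset committing to checkpoints (the interval values are placeholders); remember that the committed offsets are only for monitoring, not for fault tolerance.
// With checkpointing enabled, offsets are committed on completed checkpoints.
env.enableCheckpointing(5000);

KafkaSource.<String>builder()
        .setProperty("commit.offsets.on.checkpoint", "true");

// Without checkpointing, fall back to the Kafka consumer's own auto-commit:
// .setProperty("enable.auto.commit", "true")
// .setProperty("auto.commit.interval.ms", "2000")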
2、Kafka Sink
KafkaSink allows writing a stream of records to one or more Kafka topics.
1)、Usage
Kafka sink provides a builder class to construct an instance of a KafkaSink. The code snippet below shows how to write String records to a Kafka topic with a delivery guarantee of at least once.
DataStream<String> stream = ...;
KafkaSink<String> sink = KafkaSink.<String>builder()
.setBootstrapServers(brokers)
.setRecordSerializer(KafkaRecordSerializationSchema.builder()
.setTopic("topic-name")
.setValueSerializationSchema(new SimpleStringSchema())
.build()
)
.setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
.build();
stream.sinkTo(sink);
The following properties are required to build a KafkaSink:
- Bootstrap servers, setBootstrapServers(String)
- Record serializer, setRecordSerializer(KafkaRecordSerializationSchema)
- If you configure the delivery guarantee with DeliveryGuarantee.EXACTLY_ONCE you also have to use setTransactionalIdPrefix(String)
2)、Serializer
You always need to supply a KafkaRecordSerializationSchema to transform incoming elements from the data stream to Kafka producer records. Flink offers a schema builder to provide some common building blocks i.e. key/value serialization, topic selection, partitioning. You can also implement the interface on your own to exert more control.
KafkaRecordSerializationSchema.builder()
.setTopicSelector((element) -> {<your-topic-selection-logic>})
.setValueSerializationSchema(new SimpleStringSchema())
.setKeySerializationSchema(new SimpleStringSchema())
.setPartitioner(new FlinkFixedPartitioner())
.build();
It is required to always set a value serialization method and a topic (selection method). Moreover, it is also possible to use Kafka serializers instead of Flink serializer by using setKafkaKeySerializer(Serializer) or setKafkaValueSerializer(Serializer).
3)、Fault Tolerance
Overall the KafkaSink supports three different DeliveryGuarantees. For DeliveryGuarantee.AT_LEAST_ONCE and DeliveryGuarantee.EXACTLY_ONCE Flink’s checkpointing must be enabled. By default the KafkaSink uses DeliveryGuarantee.NONE. Below you can find an explanation of the different guarantees.
- DeliveryGuarantee.NONE does not provide any guarantees: messages may be lost in case of issues on the Kafka broker and messages may be duplicated in case of a Flink failure.
- DeliveryGuarantee.AT_LEAST_ONCE: The sink will wait for all outstanding records in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint. No messages will be lost in case of any issue with the Kafka brokers, but messages may be duplicated when Flink restarts because Flink reprocesses old input records.
- DeliveryGuarantee.EXACTLY_ONCE: In this mode, the KafkaSink will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint. Thus, if the consumer reads only committed data (see Kafka consumer config isolation.level), no duplicates will be seen in case of a Flink restart. However, this delays record visibility effectively until a checkpoint is written, so adjust the checkpoint duration accordingly. Please ensure that you use a unique transactionalIdPrefix across your applications running on the same Kafka cluster so that multiple running jobs do not interfere in their transactions! Additionally, it is highly recommended to tweak the Kafka transaction timeout (see Kafka producer transaction.timeout.ms) to be much larger than the maximum checkpoint duration plus the maximum restart duration, or data loss may happen when Kafka expires an uncommitted transaction. (A short sketch follows this list.)
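A sketch of an exactly-once sink, mirroring the builder calls from the usage snippet above; the transactional-id prefix, timeout value, and checkpoint interval are placeholders, and brokers/stream come from the earlier snippet.
Properties producerProps = new Properties();
// keep the transaction timeout well above max checkpoint duration + max restart duration
producerProps.setProperty("transaction.timeout.ms", "900000");

env.enableCheckpointing(60_000); // checkpointing is required for EXACTLY_ONCE

KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setKafkaProducerConfig(producerProps)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("topic-name")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        // must be unique per application writing to the same Kafka cluster
        .setTransactionalIdPrefix("my-app-tx")
        .build();

stream.sinkTo(exactlyOnceSink);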
3、Transformations
The transformations here are the same as those introduced above and are not repeated.
To recap, this article covered the unified stream/batch development workflow, the source, transformation, and sink APIs in detail, and the integration between Flink and Kafka.