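Flink article series
1. Flink 1.12.7 or 1.13.5: detailed introduction, local installation and deployment, and verification
2. Flink 1.13.5: two deployment modes (Standalone, Standalone HA) and four job submission modes (the first two plus session and per-job), with detailed verification steps
3. Key Flink concepts (API layers, roles, execution flow, execution graph, and programming model), introductory DataSet and DataStream examples, and submitting jobs to run on YARN
4. Flink's unified stream and batch processing, a detailed look at 18 transformation operators, and Flink's Kafka source and sink
5. Detailed examples of Flink source, transformations, and sink (1)
5. Detailed examples of Flink source, transformations, and sink (2): source and transformation examples
5. Detailed examples of Flink source, transformations, and sink (3): sink examples
6. Flink's four cornerstones: Window explained, with detailed examples (1)
6. Flink's four cornerstones: Window explained, with detailed examples (2)
7. Flink's four cornerstones: Time and Watermark explained, with detailed examples (basic watermark usage, watermarks with Kafka as the data source, and receiving data that exceeds the maximum allowed lateness)
8. Flink's four cornerstones: State concepts, use cases, persistence, and batch processing, with keyed state, operator state, and broadcast state usage and detailed examples
9. Flink's four cornerstones: the Checkpoint fault-tolerance mechanism explained, with examples (checkpoint configuration, restart strategies, manual recovery from checkpoints and savepoints)
10. Detailed examples of Flink source, transformations, and sink (2): source and transformation examples [supplementary examples]
11. Flink configuration: flink-conf.yaml explained in detail (HA configuration, checkpoint, web, security, zookeeper, historyserver, workers, zoo.cfg)
12. Detailed ClickHouse examples for Flink source and sink
13. Flink Table API and SQL: introduction, examples, and links to the related articles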
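This article presents three categories of examples: time-based and count-based tumbling and sliding windows, and session windows. Each comes with detailed verification steps and results.
It is organized in three parts: time-based tumbling and sliding windows, count-based tumbling and sliding windows, and session windows.
II. Example 1: time-based tumbling and sliding windows
1. Tumbling window: counting passengers at subway entrances
Goal: every 10 s, count the passengers entering at each subway entrance over the most recent 10 s.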
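To predict which records are summed together in one 10 s window, it helps to know that tumbling time windows are aligned to the epoch rather than to the moment the job starts. The following is a minimal sketch in plain Java with no Flink dependency. The class `TumblingWindowMath` and its methods are illustrative names, not Flink API, and the arithmetic assumes a zero window offset and non-negative timestamps.

```java
public class TumblingWindowMath {

    // Start (inclusive) of the tumbling window containing the timestamp,
    // assuming epoch-aligned windows with no offset.
    static long windowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // End (exclusive) of that window.
    static long windowEnd(long timestampMs, long sizeMs) {
        return windowStart(timestampMs, sizeMs) + sizeMs;
    }

    public static void main(String[] args) {
        long size = 10_000L; // 10 s, as in the example
        long[] arrivals = {12_345L, 19_999L, 20_000L}; // hypothetical arrival times in ms
        for (long t : arrivals) {
            System.out.println(t + " -> [" + windowStart(t, size) + ", " + windowEnd(t, size) + ")");
        }
    }
}
```

In this sketch, records arriving at 12 345 ms and 19 999 ms land in the same window, while a record at 20 000 ms opens the next one. This is why two lines typed into nc a second apart can still show up in different results.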
1) General implementation
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 * Introductory tumbling-window example.
 * Every 10 s, count the passengers entering at each subway entrance over the most recent 10 s.
 * size = slide
 */
public class TumblingTimeWindowsDemo1 {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source: nc
        // record format: entrance number,passenger count
        // 12,50
        // 11,28
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        DataStream<Tuple2<String, Integer>> subwayExit = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] arr = line.split(",");
                return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // group by subway entrance
        // KeyedStream<Tuple2<String, Integer>, String> keyedDS = subwayExit.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
        //     @Override
        //     public String getKey(Tuple2<String, Integer> value) throws Exception {
        //         return value.f0;
        //     }
        // });

        // alternative grouping by tuple position (keyBy(int) is deprecated; the KeySelector form above is preferred)
        KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = subwayExit.keyBy(0);

        DataStream<Tuple2<String, Integer>> result1 = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                // alternative aggregation with reduce
                // .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                //     @Override
                //     public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
                //         return Tuple2.of(value1.f0, value1.f1 + value2.f1);
                //     }
                // });
                .sum(1);

        // sink
        result1.print();

        // execute
        env.execute();
    }
}
- Verification steps
1. Start nc
nc -lk 9999
2. Start the application
3. Enter the following in the nc console
[alanchan@server2 src]$ nc -lk 9999
no1,1
no2,1
no1,2
no1,3
no2,6
4. Check the application's console output
2) Object-oriented implementation
- bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Subway {
    // subway station entrance
    private String No;
    // passenger count for a given period
    private Integer userCount;
}
- Implementation
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 * Introductory tumbling-window example.
 * Every 10 s, count the passengers entering at each subway entrance over the most recent 10 s.
 * size = slide
 */
public class TumblingTimeWindowsDemo2 {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source: nc
        // record format: entrance number,passenger count
        // 12,50
        // 11,28
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {
            @Override
            public Subway map(String line) throws Exception {
                String[] arr = line.split(",");
                return new Subway(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // group by subway entrance
        KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(new KeySelector<Subway, String>() {
            @Override
            public String getKey(Subway value) throws Exception {
                return value.getNo();
            }
        });

        // "userCount" is the name of the field in Subway
        DataStream<Subway> result = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("userCount");

        // sink
        result.print();

        // execute
        env.execute();
    }
}
- Verification steps
1. Start nc
nc -lk 9999
2. Start the application
3. Enter the following in the nc console
[alanchan@server2 src]$ nc -lk 9999
no,1
no2,1
no2,4
no1,2
4. Check the application's console output
3) Object-oriented implementation with lambdas
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 */
public class TumblingTimeWindowsDemo3 {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source: nc
        // record format: entrance number,passenger count
        // 12,50
        // 11,28
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        // DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {
        //     @Override
        //     public Subway map(String line) throws Exception {
        //         String[] arr = line.split(",");
        //         return new Subway(arr[0], Integer.parseInt(arr[1]));
        //     }
        // });
        DataStream<Subway> subwayExit = lines.map(new Splitter());

        // group by subway entrance
        KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(Subway::getNo);
        // equivalent: subwayExit.keyBy(subway -> subway.getNo())

        DataStream<Subway> result = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("userCount");

        // sink
        result.print();

        // execute
        env.execute();
    }

    // parses "entrance,count" into a Subway object
    public static class Splitter implements MapFunction<String, Subway> {
        @Override
        public Subway map(String value) {
            String[] arr = value.split(",");
            return new Subway(arr[0], Integer.parseInt(arr[1]));
        }
    }
}
- Verification steps
1. Start nc
nc -lk 9999
2. Start the application
3. Enter the following in the nc console
[alanchan@server2 src]$ nc -lk 9999
n1,2
n2,3
n1,4
n1,5
4. Check the application's console output
4) General implementation with lambdas
- Implementation
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 */
public class TumblingTimeWindowsDemo4 {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("192.168.10.42", 9999)
                .map(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1);

        // sink
        dataStream.print();

        // execute
        env.execute();
    }

    // parses "entrance,count" into a (entrance, count) tuple
    public static class Splitter implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String value) {
            String[] arr = value.split(",");
            // Tuple2.of avoids the raw-type warning of "new Tuple2(...)"
            return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
        }
    }
}
- Verification steps
1. Start nc
nc -lk 9999
2. Start the application
3. Enter the following in the nc console
[alanchan@server2 src]$ nc -lk 9999
n3,1
n3,5
n4,6
n4,8
n3,3
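4. Check the application's console output
2. Sliding window: counting passengers at subway entrances
Goal: every 5 s, count the passengers entering at each subway entrance over the most recent 10 s (window size 10 s, slide 5 s, matching the code in this section).
The lambda variants follow the same pattern as above and are not repeated here.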
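With the 10 s size and 5 s slide passed to `SlidingProcessingTimeWindows.of` in the implementations of this section, every record belongs to two overlapping windows, so each input value contributes to two output results. The sketch below reproduces that assignment in plain Java with no Flink dependency. The class `SlidingWindowMath` and the method `assignWindows` are illustrative names, and the arithmetic assumes a zero offset and non-negative timestamps.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowMath {

    // Returns every [start, end) window that contains the timestamp,
    // newest window first, assuming epoch-aligned windows with no offset.
    static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Start of the most recent window that has already begun.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical arrival time of 12 000 ms, size 10 s, slide 5 s.
        for (long[] w : assignWindows(12_000L, 10_000L, 5_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

For an arrival at 12 000 ms the sketch yields the windows [10000, 20000) and [5000, 15000). In general a record falls into size / slide windows when the size is a multiple of the slide.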
1) General implementation
- Implementation
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 * Introductory sliding-window example.
 * Every 5 s, count the passengers entering at each subway entrance over the most recent 10 s.
 * size > slide
 */
public class SlidingProcessingTimeWindowsDemo1 {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source: nc
        // record format: entrance number,passenger count
        // 12,50
        // 11,28
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        DataStream<Tuple2<String, Integer>> subwayExit = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String line) throws Exception {
                String[] arr = line.split(",");
                return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // group by subway entrance
        KeyedStream<Tuple2<String, Integer>, String> keyedDS = subwayExit.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        // window size 10 s, slide 5 s
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);

        // sink
        result.print();

        // execute
        env.execute();
    }
}
- Verification steps
1. Start nc
nc -lk 9999
2. Start the application
3. Enter the following in the nc console
[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
1,4
2,3
2,4,
1,2
1,3
4. Check the application's console output
The output matches expectations:
7> (1,5)
4> (2,3)
7> (1,9)
4> (2,7)
7> (1,6)
4> (2,4)
7> (1,5)
7> (1,3)
2) Object-oriented implementation
- Implementation
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 */
public class SlidingProcessingTimeWindowsDemo2 {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source: nc
        // record format: entrance number,passenger count
        // 12,50
        // 11,28
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {
            @Override
            public Subway map(String line) throws Exception {
                String[] arr = line.split(",");
                return new Subway(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // group by subway entrance
        KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(new KeySelector<Subway, String>() {
            @Override
            public String getKey(Subway value) throws Exception {
                return value.getNo();
            }
        });

        // window size 10 s, slide 5 s
        SingleOutputStreamOperator<Subway> result = keyedDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum("userCount");

        // sink
        result.print();

        // execute
        env.execute();
    }
}
- Verification steps
1. Start nc
nc -lk 9999
2. Start the application
3. Enter the following in the nc console
[alanchan@server2 src]$ nc -lk 9999
2,2
3,3
2,4
2,5
3,5
4,5
3,5
4. Check the application's console output
The output matches expectations:
5> Subway(No=3, userCount=3)
4> Subway(No=2, userCount=2)
4> Subway(No=2, userCount=6)
5> Subway(No=3, userCount=3)
1> Subway(No=4, userCount=5)
5> Subway(No=3, userCount=5)
4> Subway(No=2, userCount=9)
5> Subway(No=3, userCount=10)
4> Subway(No=2, userCount=5)
1> Subway(No=4, userCount=5)
5> Subway(No=3, userCount=5)
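III. Example 2: count-based tumbling and sliding windows
1. Tumbling window: counting passengers at subway entrances
Goal: for each entrance, sum the passenger counts over its most recent 5 messages; a result is emitted each time the same key has appeared 5 times.
This example uses only the object-oriented style; whether to use the general (tuple-based) style in real development depends on the situation.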
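The behaviour described above (nothing is emitted for a key until it has been seen 5 times) can be checked without a cluster. The following is a minimal simulation in plain Java, not Flink code. The class `TumblingCountWindowSketch` and the method `run` are hypothetical names, and the sample input is the one typed into nc in the verification steps of this section.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TumblingCountWindowSketch {

    // Simulates a per-key tumbling count window of the given size.
    // Returns the emitted results as "key=sum" strings, in emission order.
    static List<String> run(String[] lines, int size) {
        Map<String, Integer> counts = new HashMap<>();
        Map<String, Integer> sums = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] arr = line.split(",");
            String key = arr[0];
            int value = Integer.parseInt(arr[1]);
            int count = counts.merge(key, 1, Integer::sum);
            int sum = sums.merge(key, value, Integer::sum);
            if (count == size) {
                // The window for this key is full: emit and start a fresh window.
                emitted.add(key + "=" + sum);
                counts.remove(key);
                sums.remove(key);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[] input = {"1,5", "1,6", "1,2", "1,4", "2,4", "3,6", "23,11", "1,8"};
        System.out.println(run(input, 5)); // prints [1=25]
    }
}
```

Only entrance 1 reaches five messages in this input (5 + 6 + 2 + 4 + 8 = 25); entrances 2, 3, and 23 each appear once, so the simulation emits nothing for them.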
1) Implementation
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 * For each entrance, sum the passenger counts over its most recent 5 messages;
 * a result is emitted each time the same key has appeared 5 times.
 */
public class TumblingCountWindowDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source: nc
        // record format: entrance number,passenger count
        // 12,50
        // 11,28
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        SingleOutputStreamOperator<Subway> subwayDS = lines.map(new MapFunction<String, Subway>() {
            @Override
            public Subway map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Subway(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(Subway::getNo);
        KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(new KeySelector<Subway, String>() {
            @Override
            public String getKey(Subway value) throws Exception {
                return value.getNo();
            }
        });

        // tumbling count window: emit a sum each time the same key has appeared 5 times
        SingleOutputStreamOperator<Subway> result1 = keyedDS.countWindow(5).sum("userCount");

        // sink
        result1.print();

        // execute
        env.execute();
    }
}
2) Verification steps
1. Start nc
nc -lk 9999
2. Start the application
3. Enter the following in the nc console
[alanchan@server2 src]$ nc -lk 9999
1,5
1,6
1,2
1,4
2,4
3,6
23,11
1,8
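4. Check the application's console output
The output matches expectations.
2. Sliding window: counting passengers at subway entrances
Goal: for each entrance, sum the passenger counts over its most recent 5 messages; a result is emitted each time the same key has appeared 3 times.
This example uses only the object-oriented style; whether to use the general (tuple-based) style in real development depends on the situation.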
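A sliding count window combines two numbers: the slide (3) decides when a result is emitted, and the size (5) decides how many of the key's latest messages are summed. The sketch below simulates that per key in plain Java, under the assumption that the window fires on every third message of a key and keeps at most the last five. The class `SlidingCountWindowSketch` and the method `run` are hypothetical names, not Flink API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SlidingCountWindowSketch {

    // Simulates a per-key sliding count window.
    // Returns the emitted results as "key=sum" strings, in emission order.
    static List<String> run(String[] lines, int size, int slide) {
        Map<String, Deque<Integer>> buffers = new HashMap<>();
        Map<String, Integer> sinceLastFire = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] arr = line.split(",");
            String key = arr[0];
            Deque<Integer> buffer = buffers.computeIfAbsent(key, k -> new ArrayDeque<>());
            buffer.addLast(Integer.parseInt(arr[1]));
            int pending = sinceLastFire.merge(key, 1, Integer::sum);
            if (pending == slide) {
                // Keep only the most recent `size` values before summing.
                while (buffer.size() > size) {
                    buffer.removeFirst();
                }
                int sum = 0;
                for (int v : buffer) {
                    sum += v;
                }
                emitted.add(key + "=" + sum);
                sinceLastFire.put(key, 0);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Input from the verification steps of this section.
        String[] input = {"1,2", "1,3", "2,3", "3,4", "1,2"};
        System.out.println(run(input, 5, 3)); // prints [1=7]
    }
}
```

With the verification input, entrance 1 fires on its third message with only three values available (2 + 3 + 2 = 7). Had entrance 1 sent six messages, the second result would cover the last five of them rather than all six.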
1) Implementation
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 */
public class SlidingCountWindowDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source: nc
        // record format: entrance number,passenger count
        // 12,50
        // 11,28
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        SingleOutputStreamOperator<Subway> subwayDS = lines.map(new MapFunction<String, Subway>() {
            @Override
            public Subway map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Subway(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(Subway::getNo);
        KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(new KeySelector<Subway, String>() {
            @Override
            public String getKey(Subway value) throws Exception {
                return value.getNo();
            }
        });

        // sliding count window: sum the most recent 5 messages per key, emitting each time the key has appeared 3 times
        // For reference, KeyedStream.countWindow(size, slide) is implemented as:
        // public WindowedStream<T, KEY, GlobalWindow> countWindow(long size, long slide) {
        //     return window(GlobalWindows.create())
        //             .evictor(CountEvictor.of(size))
        //             .trigger(CountTrigger.of(slide));
        // }
        SingleOutputStreamOperator<Subway> result1 = keyedDS.countWindow(5, 3).sum("userCount");

        // sink
        result1.print();

        // execute
        env.execute();
    }
}
2) Verification steps
1. Start nc
nc -lk 9999
2. Start the application
3. Enter the following in the nc console
[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
2,3
3,4
1,2
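4. Check the application's console output
The output matches expectations.
IV. Example 3: session windows
Goal: set the session gap to 10 s. If the previous window holds data and no new data arrives for that key within 10 s, the previous window's computation is triggered.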
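The session rule above can be sketched for a single key in plain Java. This is a simplified simulation, not Flink code: the class `SessionWindowSketch`, the method `sessionSums`, and the arrival timestamps are all hypothetical, and a silence of at least the gap is treated as closing the session (exact boundary handling in Flink may differ).

```java
import java.util.ArrayList;
import java.util.List;

public class SessionWindowSketch {

    // Splits the events of one key into sessions and returns the sum of each session.
    // timestampsMs must be in ascending order and have the same length as values.
    static List<Integer> sessionSums(long[] timestampsMs, int[] values, long gapMs) {
        List<Integer> sums = new ArrayList<>();
        if (timestampsMs.length == 0) {
            return sums;
        }
        int current = values[0];
        for (int i = 1; i < timestampsMs.length; i++) {
            if (timestampsMs[i] - timestampsMs[i - 1] >= gapMs) {
                // Silence of at least one gap: close the running session.
                sums.add(current);
                current = 0;
            }
            current += values[i];
        }
        sums.add(current); // the last session closes once the gap elapses
        return sums;
    }

    public static void main(String[] args) {
        // Entrance 1 sends 2 and 3 within 3 s, then 2 again after 17 s of silence.
        long[] times = {0L, 3_000L, 20_000L};
        int[] counts = {2, 3, 2};
        System.out.println(sessionSums(times, counts, 10_000L)); // prints [5, 2]
    }
}
```

When verifying the real job, this means the result depends on typing speed: lines for the same entrance entered less than 10 s apart are summed together, and the result only appears after 10 s of silence for that entrance.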
1. Implementation
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 */
public class TimeSessionWindowsDemo {

    /**
     * @param args
     * @throws Exception
     */
    public static void main(String[] args) throws Exception {
        // env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // source
        DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

        // transformation
        SingleOutputStreamOperator<Subway> subwayDS = lines.map(new MapFunction<String, Subway>() {
            @Override
            public Subway map(String value) throws Exception {
                String[] arr = value.split(",");
                return new Subway(arr[0], Integer.parseInt(arr[1]));
            }
        });

        // KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(Subway::getNo);
        KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(new KeySelector<Subway, String>() {
            @Override
            public String getKey(Subway value) throws Exception {
                return value.getNo();
            }
        });

        // session gap of 10 s: if a window holds data and no new data arrives for 10 s, that window is computed
        SingleOutputStreamOperator<Subway> result = keyedDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum("userCount");

        // sink
        result.print();

        // execute
        env.execute();
    }
}
2. Verification steps
1. Start nc
nc -lk 9999
2. Start the application
3. Enter the following in the nc console
[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
2,3
3,4
1,2
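4. Check the application's console output
The output matches expectations.
To summarize, this article presented three categories of examples: time-based and count-based tumbling and sliding windows, and session windows, each with detailed verification steps and results.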