首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

6、Flink四大基石之Window详解与详细示例(二)Flink 系列文章二、示例1:基于时间的滚动和滑动窗口三、示例2:基于数量的滚动窗口与滑动窗口四、示例3:会话窗口

  • 23-09-04 15:56
  • 3319
  • 5685
blog.csdn.net

Flink 系列文章

1、Flink1.12.7或1.13.5详细介绍及本地安装部署、验证
2、Flink1.13.5二种部署方式(Standalone、Standalone HA )、四种提交任务方式(前两种及session和per-job)验证详细步骤
3、flink重要概念(api分层、角色、执行流程、执行图和编程模型)及dataset、datastream详细示例入门和提交任务至on yarn运行
4、介绍Flink的流批一体、transformations的18种算子详细介绍、Flink与Kafka的source、sink介绍
5、Flink 的 source、transformations、sink的详细示例(一)
5、Flink的source、transformations、sink的详细示例(二)-source和transformation示例
5、Flink的source、transformations、sink的详细示例(三)-sink示例
6、Flink四大基石之Window详解与详细示例(一)
6、Flink四大基石之Window详解与详细示例(二)
7、Flink四大基石之Time和WaterMaker详解与详细示例(watermaker基本使用、kafka作为数据源的watermaker使用示例以及超出最大允许延迟数据的接收实现)
8、Flink四大基石之State概念、使用场景、持久化、批处理的详解与keyed state和operator state、broadcast state使用和详细示例
9、Flink四大基石之Checkpoint容错机制详解及示例(checkpoint配置、重启策略、手动恢复checkpoint和savepoint)
10、Flink的source、transformations、sink的详细示例(二)-source和transformation示例【补充示例】
11、Flink配置flink-conf.yaml详细说明(HA配置、checkpoint、web、安全、zookeeper、historyserver、workers、zoo.cfg)
12、Flink source和sink 的 clickhouse 详细示例

13、Flink 的table api和sql的介绍、示例等系列综合文章链接


文章目录

  • Flink 系列文章
  • 二、示例1:基于时间的滚动和滑动窗口
    • 1、滚动窗口实现统计地铁进站口人数
      • 1)、一般实现
      • 2)、面向对象实现
      • 3)、面向对象lambda实现
      • 4)、一般lambda实现
    • 2、滑动窗口实现统计地铁进站口人数
      • 1)、一般实现
      • 2)、面向对象实现
  • 三、示例2:基于数量的滚动窗口与滑动窗口
    • 1、滚动窗口实现地铁进站口人数
      • 1)、实现
      • 2)、验证步骤
    • 2、滑动窗口实现地铁进站口人数
      • 1)、实现
      • 2)、验证步骤
  • 四、示例3:会话窗口
    • 1、实现
    • 2、验证步骤


本文介绍了三个大类的示例,即基于时间和基于数量的滚动窗口与滑动窗口、会话窗口,其中包含详细的验证步骤与验证结果。
本文分为三个部分,即基于时间的滚动与滑动窗口、基于数量的滚动与滑动窗口和会话窗口。

二、示例1:基于时间的滚动和滑动窗口

1、滚动窗口实现统计地铁进站口人数

实现:每10s统计一次地铁进站每个入口人数,最近10s每个进站口的人数

1)、一般实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.api.java.functions.KeySelector;

/**
 * @author alanchan 
 * 基于滚动窗口的入门示例 
 * 每10s统计一次地铁进站每个入口人数,最近10s每个进站口的人数 
 * size=slide
 *
 */
public class TumblingTimeWindowsDemo1 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);
		
		// transformation
		DataStream<Tuple2<String, Integer>> subwayExit = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {

			@Override
			public Tuple2<String, Integer> map(String line) throws Exception {
				String[] arr = line.split(",");

				return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
			}
		});
		
		//按照地铁口分组
//		KeyedStream, String> keyedDS = subwayExit.keyBy(new KeySelector, String>() {
//			@Override
//			public String getKey(Tuple2 value) throws Exception {
//				return value.f0;
//			}
//		});
		//另外一种分组方式
		KeyedStream<Tuple2<String, Integer>, Tuple> keyedDS = subwayExit.keyBy(0);
				
		DataStream<Tuple2<String, Integer>> result1 = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
				//另外一种聚合方式实现
//				.reduce(new ReduceFunction>() {
//
//					@Override
//					public Tuple2 reduce(Tuple2 value1, Tuple2 value2) throws Exception {
//
//						return Tuple2.of(value1.f0, value1.f1 + value2.f1);
//					}
//
//				});
				.sum(1);

		// sink
		result1.print();

		// execute
		env.execute();
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 验证步骤
    1、启动nc
nc -lk 9999
  • 1

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
no1,1
no2,1
no1,2
no1,3
no2,6

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4、查看应用程序控制台输出
在这里插入图片描述

2)、面向对象实现

  • bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @author alanchan
 *
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SubWay {
	// 地铁站进站口
	private String No;
	// 某一时段人数
	private Integer userCount;
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 基于滚动窗口的入门示例 
 * 每10s统计一次地铁进站每个入口人数,最近10s每个进站口的人数 
 * size=slide
 *
 */
public class TumblingTimeWindowsDemo2 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String line) throws Exception {
				String[] arr = line.split(",");

				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 按照地铁口分组
		KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});
		
		//userCount是Subway的属性名称
		DataStream<Subway> result = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("userCount");
		// sink
		result.print();

		// execute
		env.execute();
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 验证步骤
    1、启动nc
nc -lk 9999
  • 1

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
no,1
no2,1
no2,4
no1,2
  • 1
  • 2
  • 3
  • 4
  • 5

4、查看应用程序控制台输出
在这里插入图片描述

3)、面向对象lambda实现

import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.typeinfo.Types;

/**
 * @author alanchan
 *
 */
public class TumblingTimeWindowsDemo3 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
//		DataStream subwayExit = lines.map(new MapFunction() {
//
//			@Override
//			public Subway map(String line) throws Exception {
//				String[] arr = line.split(",");
//				return new Subway(arr[0], Integer.parseInt(arr[1]));
//			}
//		});
		DataStream<Subway> subwayExit = lines.map(new Splitter());

		// 按照地铁口分组
		KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(Subway::getNo);
		// subwayExit.keyBy(subway->subway.getNo())

		DataStream<Subway> result = keyedDS.window(TumblingProcessingTimeWindows.of(Time.seconds(10))).sum("userCount");
		
		// sink
		result.print();

		// execute
		env.execute();
	}

	public static class Splitter implements MapFunction<String, Subway> {
		@Override
		public Subway map(String value) {
			String[] arr = value.split(",");
			return new Subway(arr[0], Integer.parseInt(arr[1]));
		}
	}
}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 验证步骤
    1、启动nc
nc -lk 9999
  • 1

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
n1,2
n2,3
n1,4
n1,5

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

4、查看应用程序控制台输出
在这里插入图片描述

4)、一般lambda实现

  • 实现

import java.util.Arrays;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.api.common.typeinfo.Types;

/**
 * @author alanchan
 *
 */
public class TumblingTimeWindowsDemo4 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("192.168.10.42", 9999)
                .map(new Splitter())
                .keyBy(value -> value.f0)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
                .sum(1);
		// sink
		dataStream.print();

		// execute
		env.execute();
	}

	public static class Splitter implements MapFunction<String, Tuple2<String, Integer>> {
		@Override
		public Tuple2<String, Integer> map(String value) {
			String[] arr = value.split(",");
			return new Tuple2(arr[0], Integer.parseInt(arr[1]));
		}
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 验证步骤
    1、启动nc
nc -lk 9999
  • 1

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
n3,1
n3,5
n4,6
n4,8
n3,3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

4、查看应用程序控制台输出
在这里插入图片描述

2、滑动窗口实现统计地铁进站口人数

每分钟统计一次地铁进站每个入口人数,最近2分钟每个进站口的人数
lambda实现方式不再赘述

1)、一般实现

  • 实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan 
 * 基于滑动窗口的入门示例 
 * 每分钟统计一次地铁进站每个入口人数,最近2分钟每个进站口的人数 
 * size>slide
 * 
 */
public class SlidingProcessingTimeWindowsDemo1 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Tuple2<String, Integer>> subwayExit = lines.map(new MapFunction<String, Tuple2<String, Integer>>() {

			@Override
			public Tuple2<String, Integer> map(String line) throws Exception {
				String[] arr = line.split(",");
				return Tuple2.of(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 按照地铁口分组
		KeyedStream<Tuple2<String, Integer>, String> keyedDS = subwayExit.keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
			@Override
			public String getKey(Tuple2<String, Integer> value) throws Exception {
				return value.f0;
			}
		});

		SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyedDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum(1);

		// sink
		result.print();

		// execute
		env.execute();
	}

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 验证步骤
    1、启动nc
nc -lk 9999
  • 1

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
1,4
2,3
2,4,
1,2
1,3
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

4、查看应用程序控制台输出
通过验证发现输出数据与预期一致

7> (1,5)
4> (2,3)
7> (1,9)
4> (2,7)
7> (1,6)
4> (2,4)
7> (1,5)
7> (1,3)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

2)、面向对象实现

  • 实现
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 *
 */
public class SlidingProcessingTimeWindowsDemo2 {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		DataStream<Subway> subwayExit = lines.map(new MapFunction<String, Subway>() {

			@Override
			public Subway map(String line) throws Exception {
				String[] arr = line.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

		// 按照地铁口分组
		KeyedStream<Subway, String> keyedDS = subwayExit.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});

		SingleOutputStreamOperator<Subway> result = keyedDS.window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5))).sum("userCount");

		// sink
		result.print();

		// execute
		env.execute();

	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 验证步骤
    1、启动nc
nc -lk 9999
  • 1

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
2,2
3,3
2,4
2,5
3,5
4,5
3,5

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4、查看应用程序控制台输出
通过查看输出结果与预期一致。

5> Subway(No=3, userCount=3)
4> Subway(No=2, userCount=2)
4> Subway(No=2, userCount=6)
5> Subway(No=3, userCount=3)
1> Subway(No=4, userCount=5)
5> Subway(No=3, userCount=5)
4> Subway(No=2, userCount=9)
5> Subway(No=3, userCount=10)
4> Subway(No=2, userCount=5)
1> Subway(No=4, userCount=5)
5> Subway(No=3, userCount=5)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

三、示例2:基于数量的滚动窗口与滑动窗口

1、滚动窗口实现地铁进站口人数

实现统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现5次进行统计
本示例仅以面向对象方式实现,一般实现在具体的开发中视情况而定。

1)、实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan 
 * 统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现5次进行统计
 */
public class TumblingCountWindowDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		SingleOutputStreamOperator<Subway> subwayDS = lines.map(new MapFunction<String, Subway>() {
			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

//		KeyedStream keyedDS = subwayDS.keyBy(Subway::getNo);

		KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});

		// 统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现5次进行统计
		SingleOutputStreamOperator<Subway> result1 = keyedDS.countWindow(5).sum("userCount");
		
		// sink
		result1.print();
		
		// execute
		env.execute();

	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61

2)、验证步骤

1、启动nc

nc -lk 9999
  • 1

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
1,5
1,6
1,2
1,4
2,4
3,6
23,11
1,8
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

4、查看应用程序控制台输出
通过查看输出结果与预期一致。
在这里插入图片描述

2、滑动窗口实现地铁进站口人数

统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现3次进行统计
本示例仅以面向对象方式实现,一般实现在具体的开发中视情况而定。

1)、实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class SlidingCountWindowDemo {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {

		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		// nc
		// 数据结构: 入口编号,人数
		// 12,50
		// 11,28
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		SingleOutputStreamOperator<Subway> subwayDS = lines.map(new MapFunction<String, Subway>() {
			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});

//				KeyedStream keyedDS = subwayDS.keyBy(Subway::getNo);

		KeyedStream<Subway, String> keyedDS = subwayDS.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});

		// 统计在最近5条消息中,各自进站口通过的人数数量,相同的key每出现3次进行统计
//		public WindowedStream countWindow(long size, long slide) {
//			return window(GlobalWindows.create())
//					.evictor(CountEvictor.of(size))
//					.trigger(CountTrigger.of(slide));
//		}
		SingleOutputStreamOperator<Subway> result1 = keyedDS.countWindow(5, 3).sum("userCount");

		// sink
		result1.print();

		// execute
		env.execute();
	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66

2)、验证步骤

1、启动nc

nc -lk 9999
  • 1

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
2,3
3,4
1,2

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4、查看应用程序控制台输出
通过查看输出结果与预期一致。
在这里插入图片描述

四、示例3:会话窗口

实现设置会话超时时间为10s,如果上一个窗口有数据,10s内没有数据则触发上个窗口的计算。

1、实现

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * @author alanchan
 *
 */
public class TimeSessionWindowsDemo {

	/**
	 * @param args
	 * @throws Exception 
	 */
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> lines = env.socketTextStream("192.168.10.42", 9999);

		// transformation
		SingleOutputStreamOperator<Subway> carDS = lines.map(new MapFunction<String, Subway>() {
			@Override
			public Subway map(String value) throws Exception {
				String[] arr = value.split(",");
				return new Subway(arr[0], Integer.parseInt(arr[1]));
			}
		});
		
		//KeyedStream keyedDS = subwayDS.keyBy(Subway::getNo);
		KeyedStream<Subway, String> keyedDS = carDS.keyBy(new KeySelector<Subway, String>() {
			@Override
			public String getKey(Subway value) throws Exception {
				return value.getNo();
			}
		});

		// 设置会话超时时间为10s,10s内没有数据则触发上个窗口的计算,如果上一个窗口有数据
		SingleOutputStreamOperator<Subway> result = keyedDS.window(ProcessingTimeSessionWindows.withGap(Time.seconds(10))).sum("userCount");
		
		// sink
		result.print();

		// execute
		env.execute();

	}

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58

2、验证步骤

1、启动nc

nc -lk 9999
  • 1

2、启动应用程序

3、nc控制台输入

[alanchan@server2 src]$ nc -lk 9999
1,2
1,3
2,3
3,4
1,2

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

4、查看应用程序控制台输出
通过查看输出结果与预期一致。
在这里插入图片描述

以上,本文介绍了三个大类的示例,即基于时间和基于数量的滚动窗口与滑动窗口、会话窗口,其中包含详细的验证步骤与验证结果。

注:本文转载自blog.csdn.net的一瓢一瓢的饮 alanchan的文章"https://blog.csdn.net/chenwewi520feng/article/details/131658457"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

101
推荐
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2025 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top