首页 最新 热门 推荐

  • 首页
  • 最新
  • 热门
  • 推荐

零基础入门Flink,掌握基本使用方法

  • 25-04-25 02:01
  • 3630
  • 8180
blog.csdn.net

Flink基本概念

首先来讲,Flink是一个面向数据流处理和批处理的分布式开源计算框架。

那么,流处理和批处理分别处理什么样的数据呢,这就涉及两个概念-无界流和有界流

无界流VS有界流

任何类型的数据都可以形成流数据,比如用户交互记录, 传感器数据,事件日志等等。

Apache Flink 擅长处理无界和有界数据集。 精确的时间控制和有状态的计算,使得 Flink能够运行
任何处理无界流的应用

流数据分为无界流和有界流。

  • 1) 无界流:有定义流的开始,但没有定义流的结束, 会不停地产生数据,无界流采用的是流处理方式。
  • 2) 有界流:有定义流的开始, 也有定义流的结束, 需要在获取所有数据后再进行计算,有界流采用的是批处理方式。

组件结构

DataSet 一般用来处理有界流数据。
DataStream一般用来处理无界流数据。

Flink基础Demo案例

1、基本环境搭建

pom.xml核心配置

  1. <properties>
  2. <project.build.sourceEncoding>UTF-8project.build.sourceEncoding>
  3. <flink.version>1.11.2flink.version>
  4. <java.version>1.8java.version>
  5. <scala.binary.version>2.11scala.binary.version>
  6. <maven.compiler.source>${java.version}maven.compiler.source>
  7. <maven.compiler.target>${java.version}maven.compiler.target>
  8. <log4j.version>2.12.1log4j.version>
  9. <spring.boot.version>2.1.6.RELEASEspring.boot.version>
  10. <mysql.jdbc.version>5.1.47mysql.jdbc.version>
  11. properties>
  12. <dependencies>
  13. <dependency>
  14. <groupId>org.apache.flinkgroupId>
  15. <artifactId>flink-walkthrough-common_${scala.binary.version}artifactId>
  16. <version>${flink.version}version>
  17. dependency>
  18. <dependency>
  19. <groupId>org.apache.flinkgroupId>
  20. <artifactId>flink-streaming-java_${scala.binary.version}artifactId>
  21. <version>${flink.version}version>
  22. dependency>
  23. <dependency>
  24. <groupId>org.apache.flinkgroupId>
  25. <artifactId>flink-clients_${scala.binary.version}artifactId>
  26. <version>${flink.version}version>
  27. dependency>
  28. <dependency>
  29. <groupId>org.projectlombokgroupId>
  30. <artifactId>lombokartifactId>
  31. <version>1.18.8version>
  32. dependency>
  33. dependencies>

2、批处理Demo实现

这个demo实现-----通过批处理方式,统计日志文件中的异常数量。

文件准备order_info.log,文件内容如下

  1. 2019-08-25 16:32:55,626 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$e1b53a9c] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
  2. 2019-08-25 16:32:56,918 [main] INFO [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
  3. 2019-08-25 16:32:57,829 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
  4. 2019-08-25 16:32:57,834 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
  5. 2019-08-25 16:32:57,847 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 5ms. Found 0 repository interfaces.
  6. 2019-08-25 16:32:57,858 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
  7. 2019-08-25 16:32:57,859 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
  8. 2019-08-25 16:32:57,870 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
  9. 2019-08-25 16:32:57,908 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
  10. 2019-08-25 16:32:57,928 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
  11. 2019-08-25 16:32:58,144 [main] INFO [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
  12. 2019-08-25 16:32:58,155 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
  13. 2019-08-25 16:32:58,162 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
  14. 2019-08-25 16:32:58,176 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
  15. 2019-08-25 16:32:58,212 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
  16. 2019-08-25 16:32:58,267 [configOperate_1_2] WARN [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
  17. 2019-08-25 16:32:58,269 [main] WARN [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
  18. 2019-08-25 16:32:58,280 [main] INFO [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -
  19. Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
  20. 2019-08-25 16:32:58,289 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
  21. org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
  22. at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
  23. at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
  24. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
  25. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
  26. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
  27. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
  28. at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
  29. at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
  30. at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
  31. at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
  32. at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
  33. at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
  34. at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
  35. at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
  36. at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
  37. at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
  38. at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
  39. at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
  40. at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
  41. at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
  42. Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
  43. at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
  44. at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
  45. ... 19 common frames omitted
  46. Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
  47. at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
  48. at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
  49. at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
  50. at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
  51. at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
  52. at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
  53. at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.CGLIB$globalTransactionScanner$0(<generated>)
  54. at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba$$FastClassBySpringCGLIB$$ec5dcab5.invoke(<generated>)
  55. at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
  56. at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
  57. at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.globalTransactionScanner(<generated>)
  58. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  59. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  60. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  61. at java.lang.reflect.Method.invoke(Method.java:498)
  62. at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
  63. ... 20 common frames omitted
  64. Caused by: java.lang.IllegalArgumentException: illegal type:null
  65. at io.seata.config.ConfigType.getType(ConfigType.java:62)
  66. at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
  67. ... 35 common frames omitted
  68. 2019-08-25 16:36:05,248 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$ed668c12] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
  69. 2019-08-25 16:36:06,559 [main] INFO [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
  70. 2019-08-25 16:36:07,554 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
  71. 2019-08-25 16:36:07,555 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
  72. 2019-08-25 16:36:07,568 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 6ms. Found 0 repository interfaces.
  73. 2019-08-25 16:36:07,581 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
  74. 2019-08-25 16:36:07,583 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
  75. 2019-08-25 16:36:07,595 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
  76. 2019-08-25 16:36:07,639 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
  77. 2019-08-25 16:36:07,661 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
  78. 2019-08-25 16:36:07,919 [main] INFO [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
  79. 2019-08-25 16:36:07,930 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
  80. 2019-08-25 16:36:07,937 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
  81. 2019-08-25 16:36:07,944 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
  82. 2019-08-25 16:36:07,987 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
  83. 2019-08-25 16:36:10,247 [configOperate_1_2] WARN [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
  84. 2019-08-25 16:36:14,137 [main] WARN [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
  85. 2019-08-25 16:36:14,150 [main] INFO [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -
  86. Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
  87. 2019-08-25 16:36:14,161 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
  88. org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
  89. at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
  90. at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
  91. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
  92. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
  93. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
  94. at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
  95. at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
  96. at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
  97. at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
  98. at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
  99. at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
  100. at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
  101. at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
  102. at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
  103. at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
  104. at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
  105. at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
  106. at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
  107. at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
  108. at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
  109. Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
  110. at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
  111. at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
  112. ... 19 common frames omitted
  113. Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
  114. at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
  115. at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
  116. at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
  117. at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
  118. at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
  119. at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
  120. at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.CGLIB$globalTransactionScanner$0(<generated>)
  121. at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830$$FastClassBySpringCGLIB$$691b278a.invoke(<generated>)
  122. at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
  123. at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
  124. at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.globalTransactionScanner(<generated>)
  125. at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  126. at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  127. at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  128. at java.lang.reflect.Method.invoke(Method.java:498)
  129. at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
  130. ... 20 common frames omitted
  131. Caused by: java.lang.IllegalArgumentException: illegal type:null
  132. at io.seata.config.ConfigType.getType(ConfigType.java:62)
  133. at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
  134. ... 35 common frames omitted
  135. 2019-08-25 16:36:21,421 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean

 代码实现

  1. public class BatchProcessorApplication {
  2. public static void main(String[] args) throws Exception{
  3. //1.定义Flink运行环境
  4. ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
  5. //2.读取数据源(日志文件信息)
  6. DataSource logData = env.readTextFile("./data/order_info.log");
  7. //3.清洗转换数据
  8. logData.flatMap(new FlatMapFunction>() {
  9. @Override
  10. public void flatMap(String value, Collector> collector) throws Exception {
  11. // 1) 根据正则,提取每行日志的级别
  12. Pattern pattern = Pattern.compile("\\[main\\](.*?)\\[");
  13. Matcher matcher = pattern.matcher(value);
  14. if (matcher.find()){
  15. // 2) 如果匹配符合规则,放置元组,输出数据
  16. collector.collect(new Tuple2<>(matcher.group(1).trim(),1));
  17. }
  18. }
  19. //groupBy(0)代表对collector中每个tuple2的进行分组,sum(1)代表对tuple2中的Integer进行求和
  20. }).groupBy(0).sum(1).print();
  21. }
  22. }

3、流处理Demo实现

本地模拟socket请求

代码实现

  1. public class StreamProcessorApplication {
  2. public static void main(String[] args) throws Exception{
  3. //1.定义Flink执行环境
  4. StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
  5. //2.从socket数据流中读取实时流数据
  6. DataStreamSource dataStreamSource = streamEnv.socketTextStream("127.0.0.1", 9999);
  7. dataStreamSource.flatMap(new FlatMapFunction>() {
  8. @Override
  9. public void flatMap(String line, Collector> collector) throws Exception {
  10. String[] split = line.split("\t");
  11. collector.collect(new Tuple2<>(split[0],1));
  12. }
  13. // setParallelism设置并行流计算有多少个线程
  14. }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
  15. streamEnv.execute();
  16. }
  17. }

Flink部署安装配置

首先确保你的Linux系统已经安装好了JDK

解压flink安装包

tar -zxvf flink-1.11.2-bin-scala_2.11.tgz

进入flink配置目录

masters 文件用于指定 Flink 集群中的主节点(JobManager)地址。它帮助集群中的各个节点知道主节点的位置,从而能够正确地连接和通信。

 flink-conf.yaml 文件是 Flink 的主要配置文件,用于配置 Flink 集群的各种参数。这个文件包含了丰富的配置选项,涵盖了从基本的集群设置到高级的性能调优。

其中的常见配置:

  • jobmanager.rpc.address:

    • 描述: 指定 JobManager 的主机名或IP地址。
    • 示例: jobmanager.rpc.address: localhost
  • jobmanager.rpc.port:

    • 描述: 指定 JobManager 的RPC端口号。
    • 示例: jobmanager.rpc.port: 6123
  • jobmanager.memory.process.size:

    • 描述: 指定 JobManager 的总内存大小。
    • 示例: jobmanager.memory.process.size: 1600m
  • taskmanager.memory.process.size:

    • 描述: 指定 TaskManager 的总内存大小。
    • 示例: taskmanager.memory.process.size: 1600m
  • taskmanager.numberOfTaskSlots:

    • 描述: 指定每个 TaskManager 的任务槽位数。
    • 示例: taskmanager.numberOfTaskSlots: 4
  • parallelism.default:

    • 描述: 指定默认的并行度。
    • 示例: parallelism.default: 4
  • state.backend:

    • 描述: 指定状态后端,可以选择 MemoryStateBackend、FsStateBackend 或 RocksDBStateBackend。
    • 示例: state.backend: rocksdb
  • rest.address:

    • 描述: 指定 REST API 的主机名或IP地址。
    • 示例: rest.address: localhost
  • rest.port:

    • 描述: 指定 REST API 的端口号。
    • 示例: rest.port: 8081

启动flink,进入到bin目录

 

 

访问8081端口

到此flink安装完毕 

Flink任务提交

在pom.xml文件中配置打包插件

  1. <build>
  2. <plugins>
  3. <plugin>
  4. <groupId>org.apache.maven.pluginsgroupId>
  5. <artifactId>maven-compiler-pluginartifactId>
  6. <version>3.5.1version>
  7. <configuration>
  8. <source>1.8source>
  9. <target>1.8target>
  10. configuration>
  11. plugin>
  12. <plugin>
  13. <groupId>org.apache.maven.pluginsgroupId>
  14. <artifactId>maven-shade-pluginartifactId>
  15. <version>2.3version>
  16. <executions>
  17. <execution>
  18. <phase>packagephase>
  19. <goals>
  20. <goal>shadegoal>
  21. goals>
  22. <configuration>
  23. <filters>
  24. <filter>
  25. <artifact>*:*artifact>
  26. <excludes>
  27. <exclude>META-INF/*.SFexclude>
  28. <exclude>META-INF/*.DSAexclude>
  29. <exclude>META-INF/*.RSAexclude>
  30. excludes>
  31. filter>
  32. filters>
  33. <transformers>
  34. <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
  35. <mainClass>com.demo.flink.usage.stream.StreamProcessorApplicationmainClass>
  36. transformer>
  37. transformers>
  38. configuration>
  39. execution>
  40. executions>
  41. plugin>
  42. plugins>
  43. build>

点击package打成jar包

界面提交

上传jar包,信息无误后,点击提交

在这里就能看到输出结果了

命令行提交

上传jar包至linux

进入flink的bin目录

执行命令

./flink run -c com.demo.flink.usage.stream.StreamProcessorApplication /usr/local/flink-usage-1.0-SNAPSHOT.jar

 

注:本文转载自blog.csdn.net的Java 第一深情的文章"https://blog.csdn.net/qq_46248151/article/details/143885601"。版权归原作者所有,此博客不拥有其著作权,亦不承担相应法律责任。如有侵权,请联系我们删除。
复制链接
复制链接
相关推荐
发表评论
登录后才能发表评论和回复 注册

/ 登录

评论记录:

未查询到任何数据!
回复评论:

分类栏目

后端 (14832) 前端 (14280) 移动开发 (3760) 编程语言 (3851) Java (3904) Python (3298) 人工智能 (10119) AIGC (2810) 大数据 (3499) 数据库 (3945) 数据结构与算法 (3757) 音视频 (2669) 云原生 (3145) 云平台 (2965) 前沿技术 (2993) 开源 (2160) 小程序 (2860) 运维 (2533) 服务器 (2698) 操作系统 (2325) 硬件开发 (2492) 嵌入式 (2955) 微软技术 (2769) 软件工程 (2056) 测试 (2865) 网络空间安全 (2948) 网络与通信 (2797) 用户体验设计 (2592) 学习和成长 (2593) 搜索 (2744) 开发工具 (7108) 游戏 (2829) HarmonyOS (2935) 区块链 (2782) 数学 (3112) 3C硬件 (2759) 资讯 (2909) Android (4709) iOS (1850) 代码人生 (3043) 阅读 (2841)

热门文章

111
大数据
关于我们 隐私政策 免责声明 联系我们
Copyright © 2020-2024 蚁人论坛 (iYenn.com) All Rights Reserved.
Scroll to Top