介绍
概览
Apache Flume是一个分布式、可靠且可用的系统,用于高效地从多个不同源收集、聚合和移动大量日志数据到集中式数据存储。
Apache Flume的使用不仅限于日志数据聚合。由于数据源是可定制的,Flume可以用来传输包括但不限于网络流量数据、社交媒体生成的数据、电子邮件消息以及几乎任何可能的数据源的大量事件数据。
Apache Flume是Apache软件基金会的顶级项目。
系统要求
- Java运行时环境 - Java 1.8或更高版本
- 内存 - 足够的内存用于源、通道或接收器的配置
- 磁盘空间 - 足够的磁盘空间用于通道或接收器的配置
- 目录权限 - 代理使用的目录的读写权限
架构
数据流模型
Flume事件被定义为具有字节有效载荷和可选的一组字符串属性的数据流单元。Flume代理是一个(JVM)进程,它托管了事件从外部源流向下一个目的地(跳转)的组件。
Flume源通过外部源(如Web服务器)接收事件。外部源以Flume目标源识别的格式向Flume发送事件。例如,可以使用Avro Flume源从Avro客户端或其他在流中发送事件的Flume代理的Avro接收器接收Avro事件。可以使用Thrift Flume源以类似的方式从Thrift接收器或Flume Thrift Rpc客户端接收事件,这些客户端是用Flume thrift协议生成的任何语言编写的。当Flume源接收到事件时,它会将其存储到一个或多个通道中。通道是一个被动存储,它保留事件直到被Flume接收器消费。文件通道是一个例子 - 它由本地文件系统支持。接收器从通道中移除事件并将其放入外部存储库,如HDFS(通过Flume HDFS接收器)或转发给下一个Flume代理(下一个跳转)的Flume源。给定代理中的源和接收器与通道中的舞台事件异步运行。
复杂流程
Flume允许用户构建多跳流程,其中事件在到达最终目的地之前通过多个代理传输。它还允许扇入和扇出流程、上下文路由和备份路由(故障转移)。
可靠性
事件在每个代理的通道中被暂存。然后,事件被传递到下一个代理或流程中的终端存储库(如HDFS)。只有在事件被存储在下一个代理的通道或终端存储库中后,才从通道中移除。这就是Flume提供端到端流程可靠性的单跳消息传递语义的方式。
可恢复性
事件在通道中被暂存,通道管理从故障中恢复。Flume支持持久的文件通道,它由本地文件系统支持。还有一个内存通道,它简单地将事件存储在内存队列中,这更快,但在代理进程死亡时,内存通道中仍然留下的任何事件都无法恢复。
Flume的KafkaChannel使用Apache Kafka来暂存事件。使用复制的Kafka主题作为通道有助于避免磁盘故障时的事件丢失。
标准设置
本节记录了如何使用Flume的长期配置技术(使用属性文件)配置和连接Flume组件。请参阅下一节,了解如何使用Spring Boot创建Flume应用程序。
设置代理
Flume代理配置存储在一个或多个配置文件中,这些文件遵循Java属性文件格式。这些配置文件可以指定一个或多个代理的配置。配置包括代理中每个源、接收器和通道的属性以及它们如何连接以形成数据流。
配置单个组件
流程中的每个组件(源、接收器或通道)都有一个名称、类型以及特定于类型和实例化的属性集。例如,Avro源需要一个主机名(或IP地址)和一个端口号来接收数据。内存通道可以有最大队列大小(“容量”),HDFS接收器需要知道文件系统URI、创建文件的路径、文件轮换频率(“hdfs.rollInterval”)等。所有这些属性都需要在托管Flume代理的属性文件中设置。
将各个部分连接在一起
代理需要知道要加载哪些单个组件以及它们是如何连接的,以构成流程。这是通过列出代理中的每个源、接收器和通道的名称,然后为每个接收器和源指定连接通道来完成的。例如,一个代理通过名为file-channel的文件通道将事件从名为avroWeb的Avro源流向名为hdfs-cluster1的HDFS接收器。配置文件将包含这些组件的名称,并将file-channel作为avroWeb源和hdfs-cluster1接收器的共享通道。
启动代理
使用位于Flume发行版bin目录中的名为flume-ng的shell脚本启动代理。您需要在命令行上指定代理名称、配置目录和配置文件:
shell 代码解读复制代码sh
$ bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template
现在代理将启动在给定属性文件中配置的源和接收器。
一个简单的例子
这里,我们提供一个示例配置文件,描述了单节点Flume部署。此配置允许用户生成事件,然后将它们记录到控制台。
ini 代码解读复制代码properties
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
此配置定义了一个名为a1的单个代理。a1有一个在44444端口上侦听数据的源,一个在内存中缓冲事件数据的通道,以及一个将事件数据记录到控制台的接收器。配置文件命名了各种组件,然后描述了它们的类型和配置参数。给定的配置文件可能定义了几个命名代理;当启动给定的Flume进程时,会传递一个标志告诉它要体现哪个命名代理。
给定此配置文件,我们可以如下启动Flume:
css 代码解读复制代码sh
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1
请注意,在完整的部署中,我们通常会包含一个额外的选项:--conf=。目录将包括一个shell脚本flume-env.sh和可能的log4j配置文件。
从另一个终端,我们可以通过telnet端口44444向Flume发送事件:
sql 代码解读复制代码sh
$ telnet localhost 44444
Trying 127.0.0.1...
Connected to localhost.localdomain (127.0.0.1).
Escape character is '^]'.
Hello world! <ENTER>
OK
原始的Flume终端将输出事件的日志消息。
css 代码解读复制代码plaintext
12/06/19 15:32:19 INFO source.NetcatSource: Source starting
12/06/19 15:32:19 INFO source.NetcatSource: Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
12/06/19 15:32:34 INFO sink.LoggerSink: Event: { headers:{} body: 48 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D Hello world!. }
恭喜你 - 你已经成功配置并部署了一个Flume代理!后续章节将更详细地介绍代理配置。
从URI配置
- 从版本1.10.0开始,Flume支持使用URI而不是仅从本地文件进行配置。包括对HTTP(S)、文件和类路径URI的直接支持。HTTP支持包括使用基本授权进行认证,但其他授权机制可能通过使用
--auth-provider
选项指定实现 AuthorizationProvider 接口的类的完全限定名来支持。HTTP还支持使用轮询重新加载配置文件,如果目标服务器正确响应 If-Modified-Since 头部。
要为HTTP认证添加凭据,请添加:
css 代码解读复制代码--conf-user userid --conf-password password
到启动命令。
多个配置文件
从版本1.10.0开始,Flume支持从多个配置文件而不是仅从一个配置文件进行配置。这更容易根据特定环境覆盖或添加值。每个文件应使用自己的 --conf-file
或 --conf-uri
选项进行配置。然而,所有文件应该要么使用 --conf-file
提供,要么使用 --conf-uri
提供。如果 --conf-file
和 --conf-uri
作为选项一起出现,所有 --conf-uri
配置将在任何 --conf-file
配置合并之前处理。
例如,以下配置:
shell 代码解读复制代码sh
$ bin/flume-ng agent --conf conf --conf-file example.conf --conf-uri http://localhost:80/flume.conf --conf-uri http://localhost:80/override.conf --
将导致首先读取 flume.conf
,然后将 override.conf
与之合并,最后 example.conf
将被最后合并。如果希望 example.conf
成为基本配置,应使用 --conf-uri
选项指定,如下:
--conf-uri classpath://example.conf
或--conf-uri file:///example.conf
取决于它应该如何被访问。
使用环境变量、系统属性或其他属性配置文件
Flume有能力在配置中替换环境变量。 例如:
ini 代码解读复制代码properties
a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = ${env:NC_PORT}
a1.sources.r1.channels = c1
注意:目前它仅适用于值,不适用于键。(即只在配置行的等号“=”的“右侧”。)
从版本1.10.0开始,Flume使用 Apache Commons Text 的 StringSubstitutor 类解析配置值,使用默认的 Lookup 集合以及使用配置文件作为替换值来源的 Lookup。
例如:
ini 代码解读复制代码sh
$ NC_PORT=44444 bin/flume-ng agent –conf conf –conf-file example.conf –name a1
注意:上述只是一个例子,环境变量可以以其他方式配置,包括在 conf/flume-env.sh
中设置。
如上所述,系统属性也得到支持,因此以下配置:
ini 代码解读复制代码properties
a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = ${sys:NC_PORT}
a1.sources.r1.channels = c1
可以使用,并且启动命令可以是:
css 代码解读复制代码sh
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -DNC_PORT=44444
此外,因为允许多个配置文件,第一个文件可能包含:
ini 代码解读复制代码properties
a1.sources = r1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = ${NC_PORT}
a1.sources.r1.channels = c1
并且覆盖文件可能包含:
ini 代码解读复制代码properties
NC_PORT = 44444
在这种情况下,启动命令可以是:
css 代码解读复制代码sh
$ bin/flume-ng agent --conf conf --conf-file example.conf --conf-file override.conf --name a1
请注意,以前版本中指定环境变量的方法仍然有效,但已经不推荐使用,而是为了使用 ${env:varName}
。
使用命令选项文件
从版本1.10.0开始,而不是在命令行上指定所有命令选项,命令选项可以放置在 /etc/flume/flume.opts
或类路径上的 flume.opts
中。一个例子可能是:
ini 代码解读复制代码properties
conf-file = example.conf
conf-file = override.conf
name = a1
记录原始数据
在许多生产环境中,不希望记录通过摄取管道流动的原始数据流,因为这可能导致敏感数据或与安全相关的配置(如密钥)泄露到Flume日志文件中。默认情况下,Flume不会记录此类信息。另一方面,如果数据管道出现问题,Flume将尝试为调试问题提供线索。
调试事件管道问题的一种方法是设置一个附加的内存通道,连接到记录器接收器,它将把所有事件数据输出到Flume日志中。然而,在某些情况下,这种方法可能不够。
为了启用事件和配置相关数据的记录,除了log4j属性外,还必须设置一些Java系统属性。
要启用与配置相关的记录,设置Java系统属性 -Dorg.apache.flume.log.printconfig=true
。这可以在命令行上传递,也可以在 flume-env.sh
中的 JAVA_OPTS
变量中设置。
要启用数据记录,以相同的方式设置Java系统属性 -Dorg.apache.flume.log.rawdata=true
。对于大多数组件,还必须将log4j日志级别设置为DEBUG或TRACE,以便事件特定记录出现在Flume日志中。
以下是启用配置记录和原始数据记录的示例:
c 代码解读复制代码sh
$ bin/flume-ng agent --conf conf --conf-file example.conf --name a1 -Dorg.apache.flume.log.printconfig=true -Dorg.apache.flume.log.rawdata=true
基于Zookeeper的配置
Flume支持通过Zookeeper进行代理配置。这是一个实验性功能。配置文件需要上传到Zookeeper中,在可配置的前缀下。配置文件存储在Zookeeper节点数据中。以下是代理a1和a2的Zookeeper节点树的外观:
bash代码解读复制代码- /flume |- /a1 [代理配置文件] |- /a2 [代理配置文件]
一旦上传了配置文件,使用以下选项启动代理:
shell 代码解读复制代码sh
$ bin/flume-ng agent –conf conf -z zkhost:2181,zkhost1:2181 -p /flume –name a1
参数名称 默认值 描述 z - Zookeeper连接字符串。主机名:端口的逗号分隔列表 p /flume 在Zookeeper中存储代理配置的基本路径
安装第三方插件
Flume具有完全基于插件的架构。虽然Flume提供了许多开箱即用的源、通道、接收器、序列化器等,但许多实现与Flume分开提供。
虽然通过在 flume-env.sh
文件中将它们的jar添加到 FLUME_CLASSPATH 变量中,一直可以包含自定义Flume组件,但Flume现在支持一个名为 plugins.d 的特殊目录,它自动拾取以特定格式打包的插件。这使得插件包装问题的管理更加容易,并且简化了多个类别问题的调试和故障排除,特别是库依赖冲突。
plugins.d目录
plugins.d目录位于 $FLUME_HOME/plugins.d
。在启动时,flume-ng启动脚本会在plugins.d目录中查找符合以下格式的插件,并将它们包含在启动java时的正确路径中。
插件的目录布局
在plugins.d中的每个插件(子目录)可以包含多达三个子目录:
1、lib - 插件的jar(s) 2、libext - 插件的依赖jar(s) 3、native - 任何所需的本地库,如.so文件
插件在plugins.d目录中的例子:
bash代码解读复制代码plugins.d/ plugins.d/custom-source-1/ plugins.d/custom-source-1/lib/my-source.jar plugins.d/custom-source-1/libext/spring-core-2.5.6.jar plugins.d/custom-source-2/ plugins.d/custom-source-2/lib/custom.jar plugins.d/custom-source-2/native/gettext.so
数据摄取
Flume支持多种机制从外部源摄取数据。
RPC
Flume发行版中包含的Avro客户端可以使用avro RPC机制将给定文件发送到Flume Avro源:
shell 代码解读复制代码sh
$ bin/flume-ng avro-client -H localhost -p 41414 -F /usr/logs/log.10
上述命令将把 /usr/logs/log.10
的内容发送到在该端口上侦听的Flume源。
执行命令
Flume 提供了一个 exec 源,它可以执行指定的命令并消费输出。输出的单行数据是指由回车符('\r')或换行符('\n')或两者组合而成的文本。
网络流
Flume 支持以下机制来读取流行的日志流类型数据,例如:
- Avro
- Thrift
- Syslog
- Netcat
设置多代理流程
为了在多个代理或跳转之间流动数据,前一个代理的接收器和当前跳转的源需要是 Avro 类型,接收器指向源的主机名(或 IP 地址)和端口。
合并流程
在日志收集中一个非常常见的场景是大量日志生成客户端将数据发送到少数几个连接到存储子系统的消费者代理。例如,从数百台 Web 服务器收集的日志发送到一打代理,这些代理写入 HDFS 集群。
这可以通过配置多个第一层代理的 Avro 接收器来实现,所有这些都指向单个代理的 Avro 源(同样,您可以在这种情况下使用 Thrift 源/接收器/客户端)。第二层代理的这个源将接收到的事件合并到一个单独的通道中,该通道由接收器消费到其最终目的地。
多路复用流程
Flume 支持将事件流多路复用到一个或多个目的地。这是通过定义一个流多路复用器实现的,它可以复制或选择性地将事件路由到一个或多个通道。
上述示例显示了来自代理“foo”的源将流程扇出到三个不同的通道。这种扇出可以是复制或多路复用。在复制流程的情况下,每个事件被发送到所有三个通道。对于多路复用情况,当事件的属性与预配置值匹配时,事件被交付到可用通道的一个子集。例如,如果事件属性“txnType”设置为“customer”,则它应该进入 channel1 和 channel3,如果是“vendor”,则应该进入 channel2,否则进入 channel3。映射可以在代理的配置文件中设置。
使用 Flume 的 Spring Boot 设置
简介
Apache Flume 是一个分布式、可靠且可用的系统,用于高效地收集、聚合和移动大量日志数据。Flume 的 Spring Boot 模块提供了对使用 Spring Boot 打包和配置应用程序的支持。建议使用 2.0.0 或更高版本的 flume-spring-boot。
Flume 通常遵循一种范式,每个组件实现 Configurable
接口,并必须实现 configure
方法,以通过从其上下文的属性中检索配置属性来配置自己。相反,Spring 和 Spring Boot 通常依赖于依赖注入,其中配置值作为构造函数参数或通过 setter 方法设置到被配置的对象中。
Flume 与 Spring Boot 的集成支持通过应用程序的属性配置 Flume 组件,使用常规的 application.yml
文件。与 Flume 的默认配置方法不同,只有组件属性在 application.yml
中指定。将组件连接在一起,从而定义数据流,则通过 Spring 的 Java 配置来处理。
创建应用程序
Flume 的 Spring Boot 支持提供了一个主类,配置为 org.apache.flume.spring.boot.FlumeApplication
。使用 Spring Boot 的 Flume 应用程序应将 Spring Boot Maven 插件配置为将其作为主类,如下所示:
xml 代码解读复制代码xml
<execution>
<id>repackageid>
<goals>
<goal>repackagegoal>
goals>
<configuration>
<executable>trueexecutable>
<mainClass>org.apache.flume.spring.boot.FlumeApplicationmainClass>
configuration>
execution>
组件扫描
Spring Boot 将自动查找 Flume 提供的所有 Spring 组件。然而,为了使 Flume 应用程序能够配置,Spring 需要应用程序使用的配置和包名称,以便定位这些组件。这可以通过在应用程序中提供一个 META-INF/spring.factories
文件来实现,该文件启用一个类的自动配置,然后提供应用程序其余部分的组件扫描信息。例如:
META-INF/spring.factories:
ini 代码解读复制代码properties
org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.sample.myapp.config.AppConfig
com.sample.config.AppConfig.java:
kotlin 代码解读复制代码java
package com.sample.myapp.config;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
@Configuration
@ComponentScan(basePackages="com.sample.myapp")
public class MyConfiguration {
}
这将导致 Spring 扫描 com.sample.myapp
包及其子包中的所有类,以查找要包含的组件。注意,找到的类也可以使用 Spring 的 @Import
注解来包含其他包中的类。
组件连接
Flume 的 Spring Boot 支持将所有定义的通道、源运行器和接收器自动收集并启动。为此,它们必须首先使用 Spring 的 @Bean
注解在包含 @Configuration
注解的类中创建为 Spring 单例,然后像“正常的” FlumeApplication 类那样初始化它们。要定义这些组件,应用程序应提供一个配置类来创建这些 Flume 组件。一个生成序列号、将其写入内存通道并随后消费这些事件而不发布到任何地方的示例配置如下:
typescript 代码解读复制代码java
@Configuration
@ComponentScan(basePackages="com.sample.myapp")
public class AppConfig extends AbstractFlumeConfiguration {
@Bean
@ConfigurationProperties(prefix = "flume.sources.source1")
public Map<String, String> source1Properties() {
return new HashMap<>();
}
@Bean
@ConfigurationProperties(prefix = "flume.channels.channel1")
public Map<String, String> channel1Properties() {
return new HashMap<>();
}
@Bean
public Channel memoryChannel(Map<String, String> channel1Properties) {
return configureChannel("channel1", MemoryChannel.class, channel1Properties);
}
@Bean
public SourceRunner seqSource(Channel memoryChannel, Map<String, String> source1Properties) {
ChannelSelector selector = new ReplicatingChannelSelector();
selector.setChannels(listOf(memoryChannel));
return configureSource("source1", SequenceGeneratorSource.class, selector,
source1Properties);
}
@Bean
public SinkRunner nullSink(Channel memoryChannel) {
Sink sink = configureSink("null", NullSink.class, memoryChannel, null);
return createSinkRunner(configureSinkProcessor(null, DefaultSinkProcessor.class,
listOf(sink)));
}
}
该配置的配置可能如下所示:
yaml 代码解读复制代码yaml
spring:
application:
name: flume-test
server:
port: 41414
flume:
metrics: http
sources:
source1:
totalEvents: 10000
channels:
channel1:
capacity: 10000
这将导致一个名为“flume-test”的应用程序,在41414端口上监听 /metrics
端点。10,000个事件将被写入通道。这些事件将由 NullSink 消费。配置类应扩展 AbstractFlumeConfiguration
,如上所示,以能够使用构建适当 Flume 组件的辅助类。
请注意,使用这种方式配置的 Flume 应用程序可以访问所有 Spring Boot 的功能。
SinkGroups 和 Sinks 的配置
SinkGroups 和 Sinks 也可以以类似的方式进行配置,如下所示:
yaml 代码解读复制代码yaml
flume:
sinkGroups:
rrobin:
backoff: true
selector: round_robin
"selector.maxTimeOut": 30000
sinks:
avroSinks:
avroSink1:
hostname: 192.168.10.10
port: 4141
batch-size: 100
compression-type: deflate
avroSink2:
hostname: 192.168.10.11
port: 4141
batch-size: 100
compression-type: deflate
这些将在 Java 配置中进行如下配置:
typescript 代码解读复制代码java
@Bean
@ConfigurationProperties(prefix = "flume.sink-groups.rrobin")
public Map<String, String> rrobinProperties() {
return new HashMap<>();
}
@Bean
@ConfigurationProperties(prefix = "flume.sinks.avro-sinks")
public Map<String, AvroSinkConfiguration> avroSinksProperties() {
return new HashMap<>();
}
@Bean
public List<Sink> avroSinks(final Channel avroFileChannel,
final Map<String, AvroSinkConfiguration> avroSinksProperties) {
List<Sink> sinks = new ArrayList<>();
for (Map.Entry<String, AvroSinkConfiguration> entry : avroSinksProperties.entrySet()) {
sinks.add(configureSink(entry.getKey(), AvroSink.class, avroFileChannel,
entry.getValue().getProperties()));
}
return sinks;
}
@Bean
public SinkRunner avroSinkRunner(final Map<String, String> rrobinProperties, final List avroSinks ) {
return createSinkRunner(
configureSinkProcessor(rrobinProperties, LoadBalancingSinkProcessor.class, avroSinks));
}
请注意,为源、通道和接收器组指定的属性名称必须与文档其他部分中组件指定的属性名称匹配。
重要的是要注意,使用具体类来捕获 Avro Sinks 的数据。当使用简单的 Map 时,Spring 似乎会对嵌套的 Maps 感到困惑。AvroSinkConfiguration
类将如下所示:
arduino 代码解读复制代码java
public class AvroSinkConfiguration {
private String hostName;
private int port;
private int batchSize;
private String compressionType;
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public int getPort() {
return port;
}
public void setPort(int port) {
this.port = port;
}
public int getBatchSize() {
return batchSize;
}
public void setBatchSize(int batchSize) {
this.batchSize = batchSize;
}
public String getCompressionType() {
return compressionType;
}
public void setCompressionType(String compressionType) {
this.compressionType = compressionType;
}
public Map<String, String> getProperties() {
Map<String, String> map = new HashMap<>();
map.put("hostname", hostName);
map.put("port", Integer.toString(port));
map.put("batchSize", Integer.toString(batchSize));
map.put(compressionType, compressionType);
return map;
}
}
这个配置示例展示了如何在 Spring Boot 环境中使用 Flume 进行配置和管理。通过合理的配置,您可以轻松地实现数据的采集和传输。
Apache Flume 的配置涉及定义数据流,这通常在配置文件中完成,该文件采用类似于 Java 属性文件的格式,具有分层的属性设置。
定义数据流
要在单个代理中定义数据流,需要通过通道将源和接收器连接起来。需要为给定代理列出源、接收器和通道,然后将源和接收器指向通道。源实例可以指定多个通道,但接收器实例只能指定一个通道。配置格式如下:
xml 代码解读复制代码properties
# 为代理列出源、接收器和通道
<Agent>.sources = <Source>
<Agent>.sinks = <Sink>
<Agent>.channels = <Channel1> <Channel2>
# 为源设置通道
<Agent>.sources.<Source>.channels = <Channel1> <Channel2> ...
# 为接收器设置通道
<Agent>.sinks.<Sink>.channel = <Channel1>
例如,名为 agent_foo
的代理从外部 Avro 客户端读取数据,并通过内存通道将其发送到 HDFS。配置文件 weblog.config
可能如下所示:
ini 代码解读复制代码properties
# 为代理列出源、接收器和通道
agent_foo.sources = avro-appserver-src-1
agent_foo.sinks = hdfs-sink-1
agent_foo.channels = mem-channel-1
# 为源设置通道
agent_foo.sources.avro-appserver-src-1.channels = mem-channel-1
# 为接收器设置通道
agent_foo.sinks.hdfs-sink-1.channel = mem-channel-1
这将使得事件从 avro-AppSrv-source
通过内存通道 mem-channel-1
流向 hdfs-Cluster1-sink
。当代理使用 weblog.config
作为其配置文件启动时,它将实例化该流。
配置单个组件
在定义了流之后,需要为每个源、接收器和通道设置属性。这是通过在相同的层次命名空间中设置组件类型和其他特定于每个组件的属性值来完成的:
xml 代码解读复制代码properties
# 源的属性
<Agent>.sources.<Source>.<someProperty> = <someValue>
# 通道的属性
<Agent>.channels.<Channel>.<someProperty> = <someValue>
# 接收器的属性
<Agent>.sinks.<Sink>.<someProperty> = <someValue>
每个组件的“类型”属性需要设置,以便 Flume 知道它需要成为的对象类型。每种源、接收器和通道类型都有其自己的一组属性,这些属性需要按需设置。在前面的例子中,我们有一个从 avro-AppSrv-source
到 hdfs-Cluster1-sink
通过内存通道 mem-channel-1
的流。以下是配置这些组件的示例:
ini 代码解读复制代码properties
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = hdfs-Cluster1-sink
agent_foo.channels = mem-channel-1
# 设置源和接收器的通道
# avro-AppSrv-source 的属性
agent_foo.sources.avro-AppSrv-source.type = avro
agent_foo.sources.avro-AppSrv-source.bind = localhost
agent_foo.sources.avro-AppSrv-source.port = 10000
# mem-channel-1 的属性
agent_foo.channels.mem-channel-1.type = memory
agent_foo.channels.mem-channel-1.capacity = 1000
agent_foo.channels.mem-channel-1.transactionCapacity = 100
# hdfs-Cluster1-sink 的属性
agent_foo.sinks.hdfs-Cluster1-sink.type = hdfs
agent_foo.sinks.hdfs-Cluster1-sink.hdfs.path = hdfs://namenode/flume/webdata
在代理中添加多个流
单个 Flume 代理可以包含多个独立的流。可以在配置中列出多个源、接收器和通道。这些组件可以连接形成多个流:
xml 代码解读复制代码properties
# 为代理列出源、接收器和通道
<Agent>.sources = <Source1> <Source2>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
然后,可以将源和接收器连接到它们对应的通道(对于源)或通道(对于接收器)来设置两个不同的流。例如,如果需要在代理中设置两个流,一个从外部 Avro 客户端到外部 HDFS,另一个从 tail 输出到 Avro 接收器,那么配置如下:
ini 代码解读复制代码properties
# 列出代理的源、接收器和通道
agent_foo.sources = avro-AppSrv-source1 exec-tail-source2
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# 流 #1 配置
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
# 流 #2 配置
agent_foo.sources.exec-tail-source2.channels = file-channel-2
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
配置多代理流
要设置多层流,需要有第一跳的 Avro/Thrift 接收器指向下一跳的 Avro/Thrift 源。这将导致第一个 Flume 代理将事件转发到下一个 Flume 代理。例如,如果您定期使用 Avro 客户端将文件(每个事件一个文件)发送到本地 Flume 代理,然后这个本地代理可以将其转发到另一个代理,该代理已挂载用于存储。
Weblog 代理配置:
ini 代码解读复制代码properties
# 列出代理的源、接收器和通道
agent_foo.sources = avro-AppSrv-source
agent_foo.sinks = avro-forward-sink
agent_foo.channels = file-channel
# 定义流
agent_foo.sources.avro-AppSrv-source.channels = file-channel
agent_foo.sinks.avro-forward-sink.channel = file-channel
# Avro 接收器属性
agent_foo.sinks.avro-forward-sink.type = avro
agent_foo.sinks.avro-forward-sink.hostname = 10.1.1.100
agent_foo.sinks.avro-forward-sink.port = 10000
# 配置其他部分
# ...
HDFS 代理配置:
ini 代码解读复制代码properties
# 列出代理的源、接收器和通道
agent_foo.sources = avro-collection-source
agent_foo.sinks = hdfs-sink
agent_foo.channels = mem-channel
# 定义流
agent_foo.sources.avro-collection-source.channels = mem-channel
agent_foo.sinks.hdfs-sink.channel = mem-channel
# Avro 源属性
agent_foo.sources.avro-collection-source.type = avro
agent_foo.sources.avro-collection-source.bind = 10.1.1.100
agent_foo.sources.avro-collection-source.port = 10000
# 配置其他部分
# ...
在这里,我们将 weblog 代理的 avro-forward-sink
连接到 hdfs 代理的 avro-collection-source
。这将导致来自外部应用服务器源的事件最终被存储在 HDFS 中。
扇出流
如前一节所讨论的,Flume 支持将流从单个源扇出到多个通道。有两种扇出模式:复制和多路复用。在复制流中,事件被发送到所有配置的通道。在多路复用的情况下,事件只被发送到符合条件的子集通道。要扇出流,需要为源指定通道列表和扇出策略。这通过添加一个通道“选择器”来完成,它可以是复制或多路复用。然后进一步指定选择规则(如果是多路复用器)。如果没有指定选择器,则默认为复制:
xml 代码解读复制代码properties
# 列出代理的源、接收器和通道
<Agent>.sources = <Source1>
<Agent>.sinks = <Sink1> <Sink2>
<Agent>.channels = <Channel1> <Channel2>
# 为源设置通道列表(用空格分隔)
<Agent>.sources.<Source1>.channels = <Channel1> <Channel2>
# 为接收器设置通道
<Agent>.sinks.<Sink1>.channel = <Channel1>
<Agent>.sinks.<Sink2>.channel = <Channel2>
<Agent>.sources.<Source1>.selector.type = replicating
多路复用选择器有一组进一步的属性来分支流。这需要为事件属性指定映射到通道集的映射。选择器检查事件头中为每个配置的属性。如果它匹配指定的值,则该事件被发送到所有映射到该值的通道。如果没有匹配,则事件被发送到配置为默认的通道集:
xml 代码解读复制代码properties复制
# 多路复用选择器映射
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2> #...
<Agent>.sources.<Source1>.selector.default = <Channel2>
映射允许每个值的通道重叠。
以下示例具有单个流,该流多路复用到两个路径。代理名为 agent_foo
的代理具有单个 Avro 源和两个通道,链接到两个接收器:
properties代码解读复制代码# 列出代理的源、接收器和通道 agent_foo.sources = avro-AppSrv-source1 agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2 agent_foo.channels = mem-channel-1 file-channel-2 # 为源设置通道 agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2 # 为接收器设置通道 agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1 agent_foo.sinks.avro-forward-sink2.channel = file-channel-2 # 通道选择器配置 agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing agent_foo.sources.avro-AppSrv-source1.selector.header = State agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1 agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2 agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
选择器检查名为 “State” 的头部。如果值为 “CA”,则发送到 mem-channel-1;如果是 “AZ”,则发送到 file-channel-2;如果是 “NY”,则两者都发送。如果 “State” 头部未设置或不匹配这三个中的任何一个,则发送到指定为 ‘default’ 的 mem-channel-1。
选择器还支持可选通道。要为头部指定可选通道,配置参数 ‘optional’ 如下使用:
ini 代码解读复制代码properties
# 通道选择器配置
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1
选择器首先尝试写入必需的通道,如果任何一个通道未能消费事件,则会失败该事务。事务在所有通道上重试。一旦所有必需的通道都消费了事件,然后选择器将尝试写入可选通道。任何可选通道未能消费事件的失败将被忽略并不重试。
如果对于特定头部,可选通道和必需通道之间存在重叠,该通道被视为必需的,通道的失败将导致整个必需通道集重试。例如,在上述示例中,对于头部 “CA”,mem-channel-1 被视为必需通道,即使它同时被标记为必需和可选,未能写入该通道将导致该事件在所有为选择器配置的通道上重试。
请注意,如果头部没有任何必需通道,则事件将被写入默认通道,并且将尝试将事件写入该头部的可选通道。指定可选通道仍将导致事件被写入默认通道,如果没有指定必需通道。如果没有指定默认通道并且没有必需通道,选择器将尝试将事件写入可选通道。在这种情况下,任何失败都被忽略。
评论记录:
回复评论: