宝塔服务器面板,一键全能部署及管理,送你10850元礼包,点我领取

 目录

1.前言

1.1什么是flume?

1.2Flume特性

2.Flume核心概念

2.1agent

2.2Event:flume内部数据传输的封装形式

2.3Transaction:事务控制机制

2.4拦截器

3.Flume安装部署

3.1参数配置

3.2启动命令

4.Flume入门案例

4.1数据流

 4.2 组件选择

 4.3 部署配置实现

5.Flume常用内置组件详解

6.Flume常用组件详解:Source

6.1netcat source

6.1.1工作机制:

6.1.2配置文件:

6.2 exec source

6.2.1工作机制:

 6.2.2参数详解:

6.2.3配置文件:

6.3 spooldir source

6.3.1工作机制:

6.3.2参数详解:

6.3.3配置文件:

6.3.4启动测试:

6.4 avro source

6.4.1工作机制

6.4.2参数详解

6.4.3配置文件

6.4.4启动测试

6.4.5启动测试利用avro source和avro sink实现agent级联

6.5 kafka source

6.5.1工作机制

6.5.2参数详解

6.5.3配置文件

6.5.4启动测试

6.6 taildir source

6.6.1 工作机制

​​​​​​​6.6.2 参数详解

6.6.3配置文件

​​​​​​​​​​​​​​6.6.4启动测试

7Flume常用组件详解:Interceptor拦截器

7.1timestamp 拦截器

7.1.1作用

7.1.2参数

7.1.3配置示例

7.1.4​​​​​​​​​​​​​​测试

7.2static拦截器

7.2.1作用

​​​​​​​7.2.2参数

​​​​​​​7.2.3配置示例

​​​​​​​7.2.4测试

7.3Host 拦截器

7.3.1作用

​​​​​​​7.3.2参数

7.3.3配置示例

7.3.4测试

7.4 UUID 拦截器

7.4.1作用

7.4.2参数

7.4.3配置

​​​​​​​​​​​​​​7.4.4测试

8 Flume常用组件详解:channel

8.1 memory channel

8.1.1特性

​​​​​​​​​​​​​​8.1.2参数

​​​​​​​​​​​​​​8.1.3配置示例

​​​​​​​​​​​​​​8.1.4测试

​​​​​​​​​​​​​​8.1.5扩展了解

​​​​​​​​​​​8.2 file channel

8.2.1特性

​​​​​​​8.2.3参数

​​​​​​​8.2.4配置示例

​​​​​​​8.2.5​​​​​​​​​​​​​​测试

​​​​​​​8.3kafka channel

​​​​​​​​​​​​​​​​​​​​​8.3.1特性

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​8.3.2参数

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​8.3.3配置测试

9 Flume常用组件详解:sink

9.1.1特性

​​​​​​​​​​​​​​9.1.2参数

​​​​​​​​​​​​​​9.1.3配置示例

9.1.4测试

9.2 kafka sink

9.2.1特性

9.2.2参数

9.3 avro sink

9.3.1 特性

10 Flume常用组件详解:Selector

10.1实践一:replicating selector(复制选择器)

 ​​​​​​​​​​​10.1.1目标场景

​​​​​​​​​​​​​​​​​​​​​​​​​​​​10.1​​​​​​​.2Flume agent配置

10.1​​​​​​​.3 Collector1 配置

10.1.4 Collector2 配置

10.1.5 测试验证

10.2 实践二:multiplexing selector(多路选择器)

10.2.1目标场景

 10.2.2 第一级1 / 2配置

 ​​​​​​​10.2.3 第二级配置

​​​​​​​​​​​​​​ ​​​​​​​10.2.4 测试验证

11 Flume常用组件详解:grouping processor

12Flume自定义扩展组件

12.1自定义Source

12.1.1需求场景

​​​​​​​​​​​​​​12.1.2实现思路

12.1.3代码架构

  ​​​12.1.4 具体实现

12.2 自定义拦截器

12.2.2实现思路

12.2.3 自定义拦截器的开发

13 综合案例

13.1 案例场景

13.2 实现思路

13.4 配置文件

13.5 启动测试

14 面试加强

14.1 flume事务机制

14.2flume agent内部机制

14.3 ganglia及flume监控

14.4 Flume调优


1.前言

flume是由cloudera软件公司产出的可分布式日志收集系统,后与2009年被捐赠了apache软件基金会,为hadoop相关组件之一。尤其近几年随着flume的不断被完善以及升级版本的逐一推出,特别是flume-ng;同时flume内部的各种组件不断丰富,用户在开发的过程中使用的便利性得到很大的改善,现已成为apache top项目之一.

补充:cloudera公司的主打产品是CDH(hadoop的一个企业级商业发行版)

1.1什么是flume?

   Apache Flume 是一个从可以收集例如日志,事件等数据资源,并将这些数量庞大的数据从各项数据资源中集中起来存储的工具/服务。flume具有高可用,分布式和丰富的配置工具,其结构如下图所示:

Flume: 是一个数据采集工具;可以从各种各样的数据源(服务器)上采集数据传输(汇聚)到大数据生态的各种存储系统中(Hdfs、hbase、hive、kafka);

开箱即用!(安装部署、修改配置文件)

Flume知识点全面总结教程-风君子博客

1.2Flume特性

Flume是一个分布式、可靠、和高可用的海量日志采集、汇聚和传输的系统。

Flume可以采集文件,socket数据包(网络端口)、文件夹、kafka、mysql数据库等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中

一般的采集、传输需求,通过对flume的简单配置即可实现;不用开发一行代码!

Flume针对特殊场景也具备良好的自定义扩展能力,因此,flume可以适用于大部分的日常数据采集场景

2.Flume核心概念

2.1agent

Flume中最核心的角色是agent,flume采集系统就是由一个个agent连接起来所形成的一个或简单或复杂的数据传输通道。

对于每一个Agent来说,它就是一个独立的守护进程(JVM),它负责从数据源接收数据,并发往下一个目的地,如下图所示:

Flume知识点全面总结教程-风君子博客

每一个agent相当于一个数据(被封装成Event对象)传递员,内部有三个组件:

Source:采集组件,用于跟数据源对接,以获取数据;它有各种各样的内置实现;

Sink:下沉组件,用于往下一级agent传递数据或者向最终存储系统传递数据

Channel:传输通道组件,用于从source将数据传递到sink

Flume知识点全面总结教程-风君子博客

单个agent采集数据

Flume知识点全面总结教程-风君子博客

 多级agent之间串联

Flume知识点全面总结教程-风君子博客

2.2Eventflume内部数据传输的封装形式

数据在Flum内部中数据以Event的封装形式存在。

因此,Source组件在获取到原始数据后,需要封装成Event放入channel;

Sink组件从channel中取出Event后,需要根据配置要求,转成其他形式的数据输出。

Event封装对象主要有两部分组成: Headers和  Body

Header是一个集合  Map[String,String],用于携带一些KV形式的元数据(标志、描述等)

Boby: 就是一个字节数组;装载具体的数据内容

2018-11-03 18:44:44,913 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO – org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 61 20 61 20 61 61 61 20 61 20 0D                a a aaa a . }

2.3Transaction事务控制机制

Flume的事务机制(类似数据库的事务机制):

Flume使用两个独立的事务分别负责从Soucrce到Channel,以及从Channel到Sink的event传递。比如spooling directory source 为文件的每一个event batch创建一个事务,一旦事务中所有的事件全部传递到Channel且提交成功,那么Soucrce就将event batch标记为完成。

同理,事务以类似的方式处理从Channel到Sink的传递过程,如果因为某种原因使得事件无法记录,那么事务将会回滚,且所有的事件都会保持到Channel中,等待重新传递。

事务机制涉及到如下重要参数:

a1.sources.s1.batchSize =100

a1.sinks.k1.batchSize = 200

a1.channels.c1.transactionCapacity = 300 (应该大于source或者sink的批次大小)

< transactionCapacity 是说,channel中保存的事务的个数>

跟channel的数据缓存空间容量区别开来:

a1.channels.c1.capacity = 10000

Flume知识点全面总结教程-风君子博客

那么事务是如何保证数据的端到端完整性的呢?看下面有两个agent的情况:

Flume知识点全面总结教程-风君子博客

数据流程:

  1. source 1产生Event,通过“put”、“commit”操作将Event放到Channel 1中
  2. sink 1通过“take”操作从Channel 1中取出Event,并把它发送到Source 2中
  3. source 2通过“put”、“commit”操作将Event放到Channel 2中
  4. source 2向sink 1发送成功信号,sink 1“commit”步骤2中的“take”操作(其实就是删除Channel 1中的Event)

说明:在任何时刻,Event至少在一个Channel中是完整有效的

2.4拦截器

拦截器工作在source组件之后,source产生的event会被传入拦截器根据需要进行拦截处理

而且,拦截器可以组成拦截器链!

拦截器在flume中有一些内置的功能比较常用的拦截器

用户也可以根据自己的数据处理需求,自己开发自定义拦截器!

这也是flume的一个可以用来自定义扩展的接口!

3.Flume安装部署

3.1参数配置

Flume的安装非常简单,只需要解压即可,当然,前提是已有hadoop环境

1.上传安装包到数据源所在节点上

然后解压  tar -zxvf apache-flume-1.8.0-bin.tar.gz

2、根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)

3、指定采集方案配置文件,在相应的节点上启动flume agent

3.2启动命令

bin/flume-ng agent -c ./conf ………….

commands:

  help                      显示本帮助信息

  agent                     启动一个agent进程

  avro-client                 启动一个用于测试avro source的客户端(能够发送avro序列化流)

  version                    显示当前flume的版本信息

global options:   全局通用选项

  –conf,-c <conf>          指定flume的系统配置文件所在目录

  –classpath,-C <cp>        添加额外的jar路径

  –dryrun,-d               不去真实启动flume agent,而是打印当前命令

  –plugins-path <dirs>       指定插件(jar)所在路径

  -Dproperty=value          传入java环境参数

  -Xproperty=value          传入所需的JVM配置参数

agent options:

  –name,-n <name>          agent的别名(在用户采集方案配置文件中)

  –conf-file,-f <file>          指定用户采集方案配置文件的路径

  –zkConnString,-z <str>      指定zookeeper的连接地址

  –zkBasePath,-p <path>      指定用户配置文件所在的zookeeper path,比如:/flume/config

  –no-reload-conf            关闭配置文件动态加载

  –help,-h                   display help text

avro-client options:

  –rpcProps,-P <file>   RPC client properties file with server connection params

  –host,-H <host>     avro序列化数据所要发往的目标主机(avro source所在机器)

  –port,-p <port>      avro序列化数据所要发往的目标主机的端口号

  –dirname <dir>      需要被序列化发走的数据所在目录(提前准备好测试数据放在一个文件中)

  –filename,-F <file>   需要被序列化发走的数据所在文件(default: std input)

  –headerFile,-R <file>  存储header key-value的文件

  –help,-h             帮助信息

  Either –rpcProps or both –host and –port must be specified.

Note that if <conf> directory is specified, then it is always included first

in the classpath.

开启内置监控功能

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

4.Flume入门案例

先用一个最简单的例子来测试一下程序环境是否正常

4.1数据流

Flume知识点全面总结教程-风君子博客

 4.2
组件选择

  • Source组件 NetCat:

Flume知识点全面总结教程-风君子博客

  • Channel组件:

Memory Channel

Flume知识点全面总结教程-风君子博客

capacity: 缓存的容量 ,可缓存的event的数量

transactionCapacity: 事务容量。支持出错情况下的event回滚事件数量。

  • Sink组件: logger Sink

Flume知识点全面总结教程-风君子博客

 4.3
部署配置实现

  • 创建部署配置文件

在flume的安装目录下,新建一个文件夹,myconf

# cd myconf

# vi  netcat-logger.conf

# 定义这个agent中各组件的名字

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# 描述和配置source组件:r1

a1.sources.r1.type = netcat

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 9999

# source 和 channel关联

a1.sources.r1.channels = c1  

# 描述和配置sink组件:k1

a1.sinks.k1.type = logger

# sink也要关联channel

a1.sinks.k1.channel = c1

# 描述和配置channel组件,此处使用是内存缓存的方式

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100

flume-ng 命令的模式:

Flume知识点全面总结教程-风君子博客

  •  启动一个采集器:

[root@hdp-01 apache-flume-1.6.0-bin]# bin/flume-ng agent -n a1 -c conf -f myconf/netcat-logger.conf    -Dflume.root.logger=INFO,console

agent    运行一个采集器

-n a1  指定我们这个agent的名字

-c conf   指定flume自身的配置文件所在目录

-f conf/netcat-logger.conf  指定自定义的采集方案

在工作环境中的命令为:

nohup bin/flume-ng agent -n a1 -c conf -f myconf/netcat-logger.conf 1>/dev/null 2>&1 &

  • 测试

往agent的source所监听的端口上发送数据,让agent有数据可采。

通过telnet命令向端口发送消息:

[root@hdp-01 ~]# telnet hdp-01 9999

如果没有telnet命令,用yum安装一个即可:  

yum -y install telnet

就可以通过日志查看:

Flume知识点全面总结教程-风君子博客

注意: 注释不能写在配置的后面,只能单独一行写。

Flume知识点全面总结教程-风君子博客

5.Flume常用内置组件详解

Flume支持众多的source和sink类型,详细手册可参考官方文档

Flume 1.9.0 User Guide — Apache Flume

6.Flume常用组件详解Source

6.1netcat source

6.1.1工作机制:

启动一个socket服务,监听一个端口;

将端口上收到的数据,转成event写入channel;

6.1.2配置文件:

a1.sources = s1

a1.sources.s1.type = netcat

a1.sources.s1.bind = 0.0.0.0

a1.sources.s1.port = 44444

a1.sources.s1.channels = c1

6.2
exec source

6.2.1工作机制:

启动一个用户所指定的linux shell命令;

采集这个linux shell命令的标准输出,作为收集到的数据,转为event写入channel;

Flume知识点全面总结教程-风君子博客

 6.2.2参数详解:

channels

本source要发往的channel

type

本source的类别名称:exec

command

本source所要运行的linux命令,比如: tail -F /path/file

shell

指定运行上述命令所用shell

restartThrottle

10000

命令die了以后,重启的时间间隔

restart

false

命令die了以后,是否要重启

logStdErr

false

是否收集命令的错误输出stderr

batchSize

20

提交的event批次大小

batchTimeout

3000

发往下游没完成前,等待的时间

selector.type

replicating

指定channel选择器:replicating or multiplexing

selector.*

选择器的具体参数

interceptors

指定拦截器

interceptors.*

 指定的拦截器的具体参数

6.2.3配置文件:

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

启动测试:

1.准备一个日志文件

2.写一个脚本模拟往日志文件中持续写入数据

for i in {1..10000}; do echo ${i}————————— >> access.log ; sleep 0.5; done

3.创建一个flume自定义配置文件

4.启动flume采集

注意:通过人为破坏测试,发现这个exec source,不会记录宕机前所采集数据的偏移量位置,重启后可能会造成数据丢失!

6.3
spooldir source

6.3.1工作机制:

监视一个指定的文件夹,如果文件夹下有没采集过的新文件,则将这些新文件中的数据采集,并转成event写入channel;

注意:spooling目录中的文件必须是不可变的,而且是不能重名的!否则,source会loudly fail!

6.3.2参数详解:

Property Name

Default

Description

channels

type

The component type name, needs to be spooldir.

spoolDir

The directory from which to read files from.

fileSuffix

.COMPLETED

采集完成的文件,添加什么后缀名

deletePolicy

never

是否删除采完的文件never or immediate

fileHeader

false

是否将所采集文件的绝对路径添加到header中

fileHeaderKey

file

上述header的key名称

basenameHeader

false

是否将文件名添加到header

basenameHeaderKey

basename

上述header的key名称

includePattern

^.*$

指定需要采集的文件名的正则表达式

ignorePattern

^$

指定要排除的文件名的正则表达式

如果一个文件名即符合includePattern又匹配ignorePattern,则该文件不采

trackerDir

.flumespool

记录元数据的目录所在路径,可以用绝对路径也可以用相对路径(相对于采集目录)

trackingPolicy

rename

采集进度跟踪策略,有两种: “rename” “tracker_dir”. 本参数只在deletePolicy=never时才生效

 “rename”– 采完的文件根据filesuffix重命名

 “tracker_dir” – 采完的文件会在trackerDir目录中生成一个同名的空文件

consumeOrder

oldest

采集顺序: oldestyoungest and random.

oldest和youngest情况下,可能会带来一定效率的损失;(需要对文件夹中所有文件进行一次扫描以寻找最old或最young的)

pollDelay

500

Delay (in milliseconds) used when polling for new files.

recursiveDirectorySearch

false

Whether to monitor sub directories for new files to read.

maxBackoff

4000

The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source will start at a low backoff and increase it exponentially each time the channel throws a ChannelException, upto the value specified by this parameter.

batchSize

100

一次传输到channel的event条数(一批)

inputCharset

UTF-8

Character set used by deserializers that treat the input file as text.

decodeErrorPolicy

FAIL

What to do when we see a non-decodable character in the input file. FAIL: Throw an exception and fail to parse the file. REPLACE: Replace the unparseable character with the “replacement character” char, typically Unicode U+FFFD. IGNORE: Drop the unparseable character sequence.

deserializer

LINE

Specify the deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implementEventDeserializer.Builder.

deserializer.*

Varies per event deserializer.

bufferMaxLines

(Obselete) This option is now ignored.

bufferMaxLineLength

5000

(Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.

selector.type

replicating

replicating or multiplexing

selector.*

Depends on the selector.type value

interceptors

Space-separated list of interceptors

interceptors.*

6.3.3配置文件

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = spooldir

a1.sources.s1.spoolDir = /root/weblog

a1.sources.s1.batchSize = 200

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

6.3.4启动测试

bin/flume-ng agent -n a1 -c conf -f myconf/spooldir-mem-logger.conf -Dflume.root.logger=DEBUG,console

注意:spooldir source 与exec source不同,spooldir source本身是可靠的!会记录崩溃之前的采集位置!

6.4 avro source

Avro source 是通过监听一个网络端口来接受数据,而且接受的数据必须是使用avro序列化框架序列化后的数据;

Avro是一种序列化框架,跨语言的;

扩展:什么是序列化,什么是序列化框架?

序列化: 是将一个有复杂结构的数据块(对象)变成扁平的(线性的)二进制序列

序列化框架: 一套现成的软件,可以按照既定策略,将对象转成二进制序列

比如: jdk就有: ObjectOutputStream

       hadoop就有: Writable

       跨平台的序列化框架: avro

6.4.1工作机制

启动一个网络服务,监听一个端口,收集端口上收到的avro序列化数据流!

该source中拥有avro的反序列化器,能够将收到的二进制流进行正确反序列化,并装入一个event写入channel!​​​​​​

6.4.2参数详解

Property Name

Default

Description

channels

type

本source的别名: avro

bind

要绑定的地址

port

要绑定的端口号

threads

服务的最大线程数

selector.type

selector.*

interceptors

Space-separated list of interceptors

interceptors.*

compression-type

none

压缩类型:跟发过来的数据是否压缩要匹配:none | deflate

ssl

false

Set this to true to enable SSL encryption. If SSL is enabled, you must also specify a “keystore” and a “keystore-password”, either through component level parameters (see below) or as global SSL parameters (see SSL/TLS support section).

keystore

This is the path to a Java keystore file. If not specified here, then the global keystore will be used (if defined, otherwise configuration error).

keystore-password

The password for the Java keystore. If not specified here, then the global keystore password will be used (if defined, otherwise configuration error).

keystore-type

JKS

The type of the Java keystore. This can be “JKS” or “PKCS12”. If not specified here, then the global keystore type will be used (if defined, otherwise the default is JKS).

exclude-protocols

SSLv3

Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.

include-protocols

Space-separated list of SSL/TLS protocols to include. The enabled protocols will be the included protocols without the excluded protocols. If included-protocols is empty, it includes every supported protocols.

exclude-cipher-suites

Space-separated list of cipher suites to exclude.

include-cipher-suites

Space-separated list of cipher suites to include. The enabled cipher suites will be the included cipher suites without the excluded cipher suites. If included-cipher-suites is empty, it includes every supported cipher suites.

ipFilter

false

Set this to true to enable ipFiltering for netty

ipFilterRules

Define N netty ipFilter pattern rules with this config.

6.4.3配置文件

a1.sources = r1

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

6.4.4启动测试

启动agent:

bin/flume-ng agent -c ./conf -f ./myconf/avro-mem-logger.conf -n a1 -Dflume.root.logger=DEBUG,consol

用一个客户端去给启动好的source发送avro序列化数据:

bin/flume-ng avro-client –host c703 –port 4141

6.4.5启动测试利用avro source和avro sink实现agent级联

6.4.5.1需求说明

Flume知识点全面总结教程-风君子博客

6.4.5.2配置文件

  • 上游配置文件

vi  exec-m-avro.conf

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /tmp/logs/access.log

a1.sources.r1.batchSize = 100

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.trasactionCapacity = 100

a1.sinks.k1.channel = c1

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = h3

a1.sinks.k1.port = 4455

  • 下游配置文件

vi  avro-m-log.conf

a1.sources = r1

a1.channels = c1

a1.sinks = k1

a1.sources.r1.channels = c1

a1.sources.r1.type = avro

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4455

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.trasactionCapacity = 100

a1.sinks.k1.channel = c1

a1.sinks.k1.type = logger

6.4.5.3启动测试

  • 先启动下游:

bin/flume-ng agent -n a1 -c conf/ -f avro-m-log.conf -Dflume.root.logger=INFO,console

  • 再启动上游:

bin/flume-ng agent -n a1 -c conf/ -f exec-m-avro.conf

  • 然后写一个脚本在h1上模拟生成数据

while true

do

echo "hello "  >> /tmp/logs/access.log

sleep 0.1

done

6.5 kafka source

6.5.1​​​​​​​工作机制

Kafka source的工作机制:就是用kafka consumer连接kafka,读取数据,然后转换成event,写入channel

Flume知识点全面总结教程-风君子博客

6.5.2​​​​​​​​​​​​​​参数详解

Property Name

Default

Description

channels

 数据发往的channel

type

本source的名称:

org.apache.flume.source.kafka.KafkaSource

kafka.bootstrap.servers

Kafka broker服务器列表,逗号分隔

kafka.consumer.group.id

flume

Kafka消费者组id

kafka.topics

Kafka消息主题列表,逗号隔开

kafka.topics.regex

用正则表达式来指定一批topic;本参数的优先级高于kafka.topics

batchSize

1000

写入channel的event 批,最大消息条数

batchDurationMillis

1000

批次写入channel的最大时长

backoffSleepIncrement

1000

Kafka Topic 显示为空时触发的初始和增量等待时间。

maxBackoffSleep

5000

Kafka Topic 显示为空时触发的最长等待时间

useFlumeEventFormat

false

默认情况下,event 将从Kafka Topic 直接作为字节直接进入event 主体。设置为true以读取event 作为Flume Avro二进制格式。与Kafka Sink上的相同属性或Kafka Channel上的parseAsFlumeEvent属性一起使用时,这将保留在生成端发送的任何Flume标头。

setTopicHeader

true

是否要往header中加入一个kv:topic信息

topicHeader

topic

应上面开关的需求,加入kv:topic =>topic名称

kafka.consumer.security.protocol

PLAINTEXT

Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.

more consumer security props

If using SASL_PLAINTEXT, SASL_SSL or SSL refer to Kafka security for additional properties that need to be set on consumer.

Other Kafka Consumer Properties

本source,允许直接配置任意的kafka消费者参数,格式如下:

For example: kafka.consumer.auto.offset.reset

就是在消费者参数前加统一前缀: kafka.consumer.

​​​​​​​​​​​​​​6.5.3配置文件

a1.sources = s1

a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource

a1.sources.s1.channels = c1

a1.sources.s1.batchSize = 100

a1.sources.s1.batchDurationMillis = 2000

a1.sources.s1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092

a1.sources.s1.kafka.topics = TAOGE

a1.sources.s1.kafka.consumer.group.id = g1

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

​​​​​​​​​​​​​​6.5.4启动测试

1. 首先,操作kafka,准备好topic

# 查看当前kafka集群中的topic:

bin/kafka-topics.sh  –list –zookeeper c701:2181

# 创建一个新的topic

bin/kafka-topics.sh  –create –topic TAOGE –partitions 3 –replication-factor 2 –zookeeper c701:2181

# 查看topic的详细信息

bin/kafka-topics.sh –describe –topic TAOGE –zookeeper c701:2181

# 控制台生产者,向topic中写入数据

bin/kafka-console-producer.sh –broker-list c701:9092,c702:9092,c703:9092 –topic TAOGE

2. 启动flume agent来采集kafka中的数据

bin/flume-ng agent -n a1 -c conf/ -f myconf/kfk-mem-logger.conf  -Dflume.root.logger=INFO,console

注意:

Source往channel中写入数据的批次大小  <=  channel的事务控制容量大小

6.6 taildir source

​​​​​​​6.6.1 工作机制

监视指定目录下的一批文件,只要某个文件中有新写入的行,则会被tail到

Flume知识点全面总结教程-风君子博客

它会记录每一个文件所tail到的位置,记录到一个指定的positionfile保存目录中,格式为json(如果需要的时候,可以人为修改,就可以让source从任意指定的位置开始读取数据)

所以,这个source真的像官网所吹的,是可靠的reliable!

它对采集完成的文件,不会做任何修改(比如重命名,删除…..)

taildir source会把读到的数据成功写入channel后,再更新记录偏移量

这种机制,能保证数据不会漏采(丢失),但是有可能会产生数据重复!

​​​​​​​​​​​​​​​​​​​​​6.6.2 参数详解

Property Name

Default

Description

channels

所要写往的channel

type

本source的别名: TAILDIR.

filegroups

空格分割的组名,每一组代表着一批文件

g1 g2

filegroups.<filegroupName>

每个文件组的绝路路径,文件名可用正则表达式

positionFile

~/.flume/taildir_position.json

记录偏移量位置的文件所在路径

headers.<filegroupName>.<headerKey>

Header value which is the set with header key. Multiple headers can be specified for one file group.

byteOffsetHeader

false

Whether to add the byte offset of a tailed line to a header called ‘byteoffset’.

skipToEnd

false

Whether to skip the position to EOF in the case of files not written on the position file.

idleTimeout

120000

关闭非活动文件的时延。如果被关闭的这个文件又在某个时间有了新增行,会被此source检测到,并重新打开

writePosInterval

3000

3s 记录一次偏移量到positionfile

batchSize

100

提交event到channel的批次最大条数

maxBatchCount

Long.MAX_VALUE

控制在一个文件上连续读取的最大批次个数(如果某个文件正在被高速写入,那就应该让这个参数调为最大值,以让source可以集中精力专采这个文件)

backoffSleepIncrement

1000

The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.

maxBackoffSleep

5000

The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.

cachePatternMatching

true

Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity.

fileHeader

false

Whether to add a header storing the absolute path filename.

fileHeaderKey

file

Header key to use when appending absolute path filename to event header.

​​​​​​​​​​​​​​6.6.3配置文件

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/flumedata/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.ri.maxBatchCount = 1000

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

​​​​​​​​​​​​​​​​​​​​​​​​​​​​6.6.4启动测试

bin/flume-ng agent -n a1 -c conf/ -f myconf/taildir-mem-logger.conf -Dflume.root.logger=DEBUG,console

经过人为破坏测试,发现, this source还是真正挺reliable的!

不会丢失数据,但在极端情况下可能会产生重复数据!

7Flume常用组件详解Interceptor拦截器

拦截器是什么?

就是工作在source之后,它可以从source获得event,做一个逻辑处理,然后再返回处理之后的event

这样一来,就可以让用户不需要改动source代码的情况下,就可以插入一些数据处理逻辑;

Flume supports chaining of interceptors.

Flume知识点全面总结教程-风君子博客

阅读源码,获取的知识:

拦截器的调用顺序:

SourceRunner

ExecSource

ChannelProcessor

SourceRunner -》 source 的start( )方法 –》读到一批数据,调channelProcessor.processEventBatch(events) –> 调拦截器进行拦截处理  –> 调选择器selector获取要发送的channle –> 提交数据

7.1timestamp 拦截器

​​​​​​​7.1.1作用

向event中,写入一个kv到header里

k名称可配置;v就是当前的时间戳(毫秒)

​​​​​​​7.1.2参数

Property Name

Default

Description

type

本拦截器的名称:timestamp

headerName

timestamp

要插入header的key名

preserveExisting

false

如果header中已存在同名key,是否要覆盖

​​​​​​​​​​​​​​7.1.3配置示例

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

​​​​​​​​​​​​​​7.1.4​​​​​​​​​​​​​​测试

2019-06-09 10:24:21,884 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO – org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{timestamp=1560047061012} body: 31 30 32 34 34 2E 2E 2E 2E 2E 2E 2E 2E 2E 2E 2E 10244……….. }

7.2static拦截器

​​​​​​​7.2.1作用

让用户往event中添加一个自定义的header  key-value,当然,这个key-value是在配置文件中配死的;

​​​​​​​​​​​​​​​​​​​​​7.2.2参数

Property Name

Default

Description

type

别名: static

preserveExisting

true

是否覆盖同名kv

key

key

你要插入的key名

value

value

你要插入的value

​​​​​​​​​​​​​​​​​​​​​7.2.3配置示例

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1 i2 i3

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.sources.s1.interceptors.i2.type = host

a1.sources.s1.interceptors.i2.preserveExisting = false

a1.sources.s1.interceptors.i2.useIP = true

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = hero

a1.sources.r1.interceptors.i3.value = TAOGE

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

​​​​​​​​​​​​​​​​​​​​​7.2.4测试

Flume知识点全面总结教程-风君子博客

7.3Host 拦截器

​​​​​​​7.3.1作用

往event的header中插入主机名(ip)信息

​​​​​​​​​​​​​​​​​​​​​7.3.2参数

Property Name

Default

Description

type

本拦截器的别名: host

preserveExisting

false

是否覆盖已存在的hader key-value

useIP

true

插入ip还是主机名

hostHeader

host

要插入header的key名

7.3.3配置示例

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1 i2

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.sources.s1.interceptors.i2.type = host

a1.sources.s1.interceptors.i2.preserveExisting = false

a1.sources.s1.interceptors.i2.useIP = true

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

7.3.4测试

Flume知识点全面总结教程-风君子博客

​​​​​​​​​​​​​​7.4 UUID 拦截器

7.4.1作用

生成uuid放入event的header中

​​​​​​​​​​​​​​7.4.2参数

Property Name

Default

Description

type

全名:org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

headerName

id

Key名称

preserveExisting

true

是否覆盖同名key

prefix

""

Uuid前的前缀

​​​​​​​​​​​​​​7.4.3配置

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1 i2 i3 i4

a1.sources.s1.interceptors.i1.type = timestamp

a1.sources.s1.interceptors.i1.preserveExisting = false

a1.sources.s1.interceptors.i2.type = host

a1.sources.s1.interceptors.i2.preserveExisting = false

a1.sources.s1.interceptors.i2.useIP = true

a1.sources.s1.interceptors.i3.type = static

a1.sources.s1.interceptors.i3.key = hero

a1.sources.s1.interceptors.i3.value = TAOGE

a1.sources.s1.interceptors.i4.type = org.apache.flume.sink.solr.morphline.UUIDInterceptor$Builder

a1.sources.s1.interceptors.i4.headName = duanzong

a1.sources.s1.interceptors.i4.prefix =  666_

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

​​​​​​​​​​​​​​​​​​​​​​​​​​​​7.4.4测试

Flume知识点全面总结教程-风君子博客

Flume常用组件详解channel

channel是agent中用来缓存event的repository(池,仓库)

source往channel中添加event

sink从channel中取并移除event

channel跟事务控制有极大关系;

channel 有容量大小、可靠性级别、事务容量等特性;

8.1 memory channel

​​​​​​​​​​​​​​8.1.1特性

事件被存储在实现配置好容量的内存(队列)中。

速度快,但可靠性较低,有可能会丢失数据

​​​​​​​​​​​​​​​​​​​​​​​​​​​​8.1.2参数

Property Name

Default

Description

type

别名: memory

capacity

100

能存储的最大事件event数

transactionCapacity

100

最大事务控制容量

keep-alive

3

添加或移除event的超时时间

byteCapacityBufferPercentage

20

除了body以外的字节所能占用的容量百分比

byteCapacity

see description

channel中最大的总byte数(只计算body)

​​​​​​​​​​​​​​​​​​​​​​​​​​​​8.1.3配置示例

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 10000

a1.channels.c1.transactionCapacity = 10000

a1.channels.c1.byteCapacityBufferPercentage = 20

a1.channels.c1.byteCapacity = 800000

​​​​​​​​​​​​​​​​​​​​​​​​​​​​8.1.4测试

​​​​​​​​​​​​​​​​​​​​​​​​​​​​8.1.5扩展了解

Memory channel源码阅读

// lock to guard queue, mainly needed to keep it locked down during resizes// it should never be held through a blocking operation

private Object queueLock = new Object();

//queue为Memory Channel中存放Event的地方,这里用了LinkedBlockingDeque来实现

@GuardedBy(value = "queueLock")p

rivate LinkedBlockingDeque<Event> queue;

//下面的两个信号量用来做同步操作,queueRemaining表示queue中的剩余空间,queueStored表示queue中的使用空间

// invariant that tracks the amount of space remaining in the queue(with all uncommitted takeLists deducted)

// we maintain the remaining permits = queue.remaining – takeList.size()// this allows local threads waiting for space in the queue to commit without denying access to the

// shared lock to threads that would make more space on the queue

private Semaphore queueRemaining;

// used to make "reservations" to grab data from the queue.

// by using this we can block for a while to get data without locking all other threads out

// like we would if we tried to use a blocking call on queue

private Semaphore queueStored;

//下面几个变量为配置文件中Memory Channel的配置项

// 一个事务中Event的最大数目

private volatile Integer transCapacity;

// 向queue中添加、移除Event的等待时间

private volatile int keepAlive;

// queue中,所有Event所能占用的最大空间

private volatile int byteCapacity;

private volatile int lastByteCapacity;

// queue中,所有Event的header所能占用的最大空间占byteCapacity的比例

private volatile int byteCapacityBufferPercentage;

// 用于标示byteCapacity中剩余空间的信号量

private Semaphore bytesRemaining;

// 用于记录Memory Channel的一些指标,后面可以通过配置监控来观察Flume的运行情况

private ChannelCounter channelCounter;

然后重点说下MemoryChannel里面的MemoryTransaction,它是Transaction类的子类,从其文档来看,一个Transaction的使用模式都是类似的:

 Channel ch = ...Transaction tx = ch.getTransaction();try {tx.begin();...// ch.put(event) or ch.take()...tx.commit();} catch (ChannelException ex) {tx.rollback();...} finally {tx.close();}

可以看到一个Transaction主要有、put、take、commit、rollback这四个方法,我们在实现其子类时,主要也是实现着四个方法。

Flume官方为了方便开发者实现自己的Transaction,定义了BasicTransactionSemantics,这时开发者只需要继承这个辅助类,并且实现其相应的、doPut、doTake、doCommit、doRollback方法即可,MemoryChannel就是继承了这个辅助类。

private class MemoryTransaction extends BasicTransactionSemantics {//和MemoryChannel一样,内部使用LinkedBlockingDeque来保存没有commit的Eventprivate LinkedBlockingDeque<Event> takeList;private LinkedBlockingDeque<Event> putList;private final ChannelCounter channelCounter;//下面两个变量用来表示put的Event的大小、take的Event的大小private int putByteCounter = 0;private int takeByteCounter = 0;public MemoryTransaction(int transCapacity, ChannelCounter counter) {//用transCapacity来初始化put、take的队列putList = new LinkedBlockingDeque<Event>(transCapacity);takeList = new LinkedBlockingDeque<Event>(transCapacity);channelCounter = counter;}@Overrideprotected void doPut(Event event) throws InterruptedException {//doPut操作,先判断putList中是否还有剩余空间,有则把Event插入到该队列中,同时更新putByteCounter//没有剩余空间的话,直接报ChannelExceptionchannelCounter.incrementEventPutAttemptCount();int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);if (!putList.offer(event)) {throw new ChannelException("Put queue for MemoryTransaction of capacity " +putList.size() + " full, consider committing more frequently, " +"increasing capacity or increasing thread count");}putByteCounter += eventByteSize;}@Overrideprotected Event doTake() throws InterruptedException {//doTake操作,首先判断takeList中是否还有剩余空间channelCounter.incrementEventTakeAttemptCount();if(takeList.remainingCapacity() == 0) {throw new ChannelException("Take list for MemoryTransaction, capacity " +takeList.size() + " full, consider committing more frequently, " +"increasing capacity, or increasing thread count");}//然后判断,该MemoryChannel中的queue中是否还有空间,这里通过信号量来判断if(!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {return null;}Event event;//从MemoryChannel中的queue中取出一个eventsynchronized(queueLock) {event = queue.poll();}Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +"signalling existence of entry");//放到takeList中,然后更新takeByteCounter变量takeList.put(event);int eventByteSize = (int)Math.ceil(estimateEventSize(event)/byteCapacitySlotSize);takeByteCounter += eventByteSize;return event;}@Overrideprotected void doCommit() throws InterruptedException {//该对应一个事务的提交//首先判断putList与takeList的相对大小int remainingChange = takeList.size() - putList.size();//如果takeList小,说明向该MemoryChannel放的数据比取的数据要多,所以需要判断该MemoryChannel是否有空间来放if(remainingChange < 0) {// 1. 首先通过信号量来判断是否还有剩余空间if(!bytesRemaining.tryAcquire(putByteCounter, keepAlive,TimeUnit.SECONDS)) {throw new ChannelException("Cannot commit transaction. Byte capacity " +"allocated to store event body " + byteCapacity * byteCapacitySlotSize +"reached. Please increase heap space/byte capacity allocated to " +"the channel as the sinks may not be keeping up with the sources");}// 2. 然后判断,在给定的keepAlive时间内,能否获取到充足的queue空间if(!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {bytesRemaining.release(putByteCounter);throw new ChannelFullException("Space for commit to queue couldn't be acquired." +" Sinks are likely not keeping up with sources, or the buffer size is too tight");}}int puts = putList.size();int takes = takeList.size();//如果上面的两个判断都过了,那么把putList中的Event放到该MemoryChannel中的queue中。synchronized(queueLock) {if(puts > 0 ) {while(!putList.isEmpty()) {if(!queue.offer(putList.removeFirst())) {throw new RuntimeException("Queue add failed, this shouldn't be able to happen");}}}//清空本次事务中用到的putList与takeList,释放资源putList.clear();takeList.clear();}//更新控制queue大小的信号量bytesRemaining,因为把takeList清空了,所以直接把takeByteCounter加到bytesRemaining中。bytesRemaining.release(takeByteCounter);takeByteCounter = 0;putByteCounter = 0;//因为把putList中的Event放到了MemoryChannel中的queue,所以把puts加到queueStored中去。queueStored.release(puts);//如果takeList比putList大,说明该MemoryChannel中queue的数量应该是减少了,所以把(takeList-putList)的差值加到信号量queueRemainingif(remainingChange > 0) {queueRemaining.release(remainingChange);}if (puts > 0) {channelCounter.addToEventPutSuccessCount(puts);}if (takes > 0) {channelCounter.addToEventTakeSuccessCount(takes);}channelCounter.setChannelSize(queue.size());}@Overrideprotected void doRollback() {//当一个事务失败时,会进行回滚,即调用本方法//首先把takeList中的Event放回到MemoryChannel中的queue中。int takes = takeList.size();synchronized(queueLock) {Preconditions.checkState(queue.remainingCapacity() >= takeList.size(), "Not enough space in memory channel " +"queue to rollback takes. This should never happen, please report");while(!takeList.isEmpty()) {queue.addFirst(takeList.removeLast());}//然后清空putListputList.clear();}//因为清空了putList,所以需要把putList所占用的空间大小添加到bytesRemaining中bytesRemaining.release(putByteCounter);putByteCounter = 0;takeByteCounter = 0;//因为把takeList中的Event回退到queue中去了,所以需要把takeList的大小添加到queueStored中queueStored.release(takes);channelCounter.setChannelSize(queue.size());}}

MemoryChannel的逻辑相对简单,主要是通过MemoryTransaction中的putList、takeList与MemoryChannel中的queue打交道,这里的queue相当于持久化层,只不过放到了内存中,如果是FileChannel的话,会把这个queue放到本地文件中。下面表示了Event在一个使用了MemoryChannel的agent中数据流向:

source —> putList —> queue —> takeList —> sink

还需要注意的一点是,这里的事务可以嵌套使用,如下图:

Flume知识点全面总结教程-风君子博客

 当有两个agent级连时,sink的事务中包含了一个source的事务,这也应证了前面所说的:

在任何时刻,Event至少在一个Channel中是完整有效的

​​​​​​​​​​​​​​​​​​​​​​​​​8.2 file channel

​​​​​​​8.2.1特性

event被缓存在本地磁盘文件中

可靠性高,不会丢失

但在极端情况下可能会重复数据

​​​​​​​​​​​​​​​​​​​​​8.2.3参数

Property Name Default

Description

 

type

别名: file.

checkpointDir

~/.flume/file-channel/checkpoint

Checkpoint信息保存目录

useDualCheckpoints

false

Checkpoint是否双重checkpoint机制

backupCheckpointDir

备份checkpoint的保存目录

dataDirs

~/.flume/file-channel/data

Event数据缓存目录

transactionCapacity

10000

事务管理容量

checkpointInterval

30000

记录checkpoint信息的时间间隔

maxFileSize

2146435071

控制一个数据文件的大小规格

minimumRequiredSpace

524288000

所需的最低磁盘空间,低于则停止接收新数据

capacity

1000000

最大event缓存数

keep-alive

3

等待添加数据的最大时间

​​​​​​​​​​​​​​​​​​​​​8.2.4配置示例

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/taildir_chkp/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.ri.maxBatchCount = 1000

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/flume_chkp

a1.channels.c1.dataDirs = /root/flume_data

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

​​​​​​​​​​​​​​​​​​​​​8.2.5​​​​​​​​​​​​​​测试

在使用taildir source  和  file channel的情况下,经过反复各种人为破坏,发现,没有数据丢失的现象发生;

但是,如果时间点掐的比较好(sink 取了一批数据写出,但还没来得及向channel提交事务),会产生数据重复的现象!

​​​​​​​​​​​​​​​​​​​​​8.3kafka channel

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​8.3.1特性

agent利用kafka作为channel数据缓存

kafka channel要跟 kafka source、 kafka sink区别开来

kafka channel在应用时,可以没有source |  或者可以没有sink

如果是需要把kafka作为最终采集存储,那么就只要  source + kafka channel

如果是把kafka作为数据源,要将kafka中的数据写往hdfs,那么就只要 kafka channel + hdfs sink

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​8.3.2参数

Property Name

Default

Description

type

名字: org.apache.flume.channel.kafka.KafkaChannel

kafka.bootstrap.servers

Kafka服务器地址

kafka.topic

flume-channel

所使用的topic

kafka.consumer.group.id

flume

消费者组id

parseAsFlumeEvent

true

跟上、下游匹配,是否需要将数据解析为Flume的Event格式

pollTimeout

500

从kafka取数据的超时时间

defaultPartitionId

默认指派的partitionid

partitionIdHeader

将一个event指派到某个分区时所使用的header 的key

kafka.consumer.auto.offset.reset

latest

初始化读取偏移量的策略

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​8.3.3配置测试

配置示例1:用exec soure读文件,往kafka channel中写

不带sink!自己用消费者去kafka集群中读取采集到的数据!

a1.sources = r1

# 配置两个channel,为了便于观察

# c1 是kafkachannel ,c2是一个内存channel

a1.channels = c1 c2

a1.sinks = k1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/logs/a.log

a1.sources.r1.channels = c1 c2

# kafka-channel具体配置,该channel没有sink

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092

a1.channels.c1.parseAsFlumeEvent = false

# 内存channel 配置,并对接一个logger sink来观察

a1.channels.c2.type = memory

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c2

运行测试:

1.准备好a.log文件

2.启动好kafka

3.启动agent

4.往a.log写入数据

5.用kafka的控制台消费者消费主题,看是否拿到数据

配置示例2:用logger sink,从kafka channel中取数据

a1.sources = r1

a1.sinks = k1

a1.channels = c1

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/logs/a.log

a1.sources.r1.channels = c1

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel

a1.channels.c1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092

a1.channels.c1.parseAsFlumeEvent = false

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

运行测试:

2.启动好kafka

3.启动flume agent

4.用kafka的控制台生产者向topic中写入数据

5.在flume agent的控制台上观察是否取到数据

9 Flume常用组件详解sink

sink是从channel中获取、移除数据,并输出到下游(可能是下一级agent,也可能是最终目标存储系统)

9.1hdfs sink

​​​​​​​9.1.1特性

数据被最终发往hdfs

可以生成text文件或 sequence 文件,而且支持压缩;

支持生成文件的周期性roll机制:基于文件size,或者时间间隔,或者event数量;

目标路径,可以使用动态通配符替换,比如用%D代表当前日期;

当然,它也能从event的header中,取到一些标记来作为通配符替换;

header:{type=acb}

/weblog/%{type}/%D/  就会被替换成: /weblog/abc/19-06-09/

​​​​​​​​​​​​​​​​​​​​​​​​​​​​9.1.2参数

Name

Default

Description

channel

从哪个channel取数据

type

别名: hdfs

hdfs.path

目标hdfs存储路径(URI

hdfs.filePrefix

FlumeData

指定生成的文件名前缀

hdfs.fileSuffix

后缀

hdfs.inUsePrefix

正在写入的文件的前缀标识

hdfs.inUseSuffix

.tmp

正在写入的文件的后缀标识

hdfs.rollInterval

30

切换文件的条件:间隔时间;为0则不生效

hdfs.rollSize

134217728

切换文件的条件:文件大小;为0则不生效

hdfs.rollCount

10

切换文件的条件:event条数;为0则不生效

hdfs.idleTimeout

0

不活跃文件的关闭超时时长;0则不自动关闭

hdfs.batchSize

100

从channel中取一批数据的最大大小;

hdfs.codeC

压缩编码: gzip, bzip2, lzo, lzop, snappy

hdfs.fileType

SequenceFile

目标文件格式: SequenceFileDataStream or CompressedStream 

注意:DataStream 不能支持压缩

CompressedStream 必须设置压缩编码

SequenceFile 可压缩可不压缩

hdfs.maxOpenFiles

5000

允许同时最多打开的文件数;如果超出,则会关闭最早打开的

hdfs.minBlockReplicas

目标文件的block副本数

hdfs.writeFormat

Writable

指定sequence file中的对象类型;支持Text和Writable

同时请使用Text,否则后续数据处理平台可能无法解析

hdfs.threadsPoolSize

10

操作HDFS时的线程池大小

hdfs.rollTimerPoolSize

1

检查文件是否需要被roll的线程数

hdfs.kerberosPrincipal

Kerberos user principal for accessing secure HDFS

hdfs.kerberosKeytab

Kerberos keytab for accessing secure HDFS

hdfs.proxyUser

hdfs.round

false

目录通配符切换是是否需要切掉尾数

hdfs.roundValue

10

时间尾数切掉多少

hdfs.roundUnit

minute

时间尾数切掉大小的单位– secondminute or hour.

hdfs.timeZone

Local Time

时间通配符所使用的时区

hdfs.useLocalTimeStamp

false

所用的时间是否要从agent sink本地获取

hdfs.closeTries

0

重命名已完成文件的重试次数;0则一直尝试重命名

hdfs.retryInterval

180

关闭一个文件的重试时间间隔

serializer

TEXT

将channel中的event body解析成什么格式:Text| avro_event ; 也可以使用自定义的序列化器

serializer.*

小提示:什么叫做URI

Flume知识点全面总结教程-风君子博客

​​​​​​​​​​​​​​​​​​​​​​​​​​​​9.1.3配置示例

## 定义

a1.sources = r1

a1.sinks = k1

a1.channels = c1

## source

a1.sources.r1.type = exec

a1.sources.r1.command = tail -F /root/logs/a.log

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

## channel

a1.channels.c1.type = memory

a1.channels.c1.capacity = 100000000

## sink

a1.sinks.k1.channel = c1

a1.sinks.k1.type = hdfs

a1.sinks.k1.hdfs.path = hdfs://h1:8020/doitedu/%Y-%m-%d/%H-%M

a1.sinks.k1.hdfs.round = true

a1.sinks.k1.hdfs.roundValue = 10

a1.sinks.k1.hdfs.roundUnit = minute

a1.sinks.k1.hdfs.filePrefix = doit_

a1.sinks.k1.hdfs.fileSuffix = .log.gz

a1.sinks.k1.hdfs.rollInterval = 0

a1.sinks.k1.hdfs.rollSize = 102400

a1.sinks.k1.hdfs.rollCount = 0

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = gzip

a1.sinks.k1.hdfs.writeFormat = Text

​​​​​​​​​​​​​​9.1.4测试

1. 启动hdfs

2. 清除以前的taildirsource产生的偏移量记录文件、filechannel缓存的数据目录和checkpoint目录

3. 启动agent

4. 用for循环脚本往日志文件中不断写入新的数据

5. 到hdfs中观察结果

​​​​​​​9.2 kafka sink

有了kafka channel后,  kafka sink的必要性就降低了。因为我们可以用kafka作为channel来接收source产生的数据!

9.2.1特性

9.2.2参数

Property Name

Default

Description

type

名称: org.apache.flume.sink.kafka.KafkaSink

kafka.bootstrap.servers

Kafka服务器列表

kafka.topic

default-flume-topic

Kafka的topic

flumeBatchSize

100

从channel中取event的批次大小

kafka.producer.acks

1

Kafka生产者消息推送应答级别:

1 : Leader接收到即回应

0 :不等回应 

1:副本同步完成,再回应

useFlumeEventFormat

false

是否使用avro序列化

defaultPartitionId

指定所有event默认发往的分区id;不指定则按kafka生产者的分区器,均匀分发

partitionIdHeader

通过在header中指定分区id,来约束这个event发往的分区

allowTopicOverride

true

允许在event的header中指定要写入的topic

topicHeader

topic

如果上一条开关开启,则在header中放入的key的名称

Other Kafka Producer Properties

可以用kafka.producer.xxx来配置任何kafka producer的参数

​​​​​​​​​​​​​​9.2.3配置示例

a1.sources = r1

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/taildir_chkp/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/access.log

a1.sources.r1.fileHeader = true

a1.sources.r1.maxBatchCount = 1000

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = timestamp

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/flume_chkp

a1.channels.c1.dataDirs = /root/flume_data

a1.sinks = k1

a1.sinks.k1.channel = c1

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

a1.sinks.k1.kafka.topic = mytopic

a1.sinks.k1.kafka.bootstrap.servers = c701:9092,c702:9092,c703:9092

a1.sinks.k1.kafka.flumeBatchSize = 20

a1.sinks.k1.kafka.producer.acks = 1

a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sinks.k1.kafka.producer.compression.type = snappy

​​​​​​​​​​​​​​9.2.4测试

1. 清掉 taildir的偏移量记录文件;清掉filechannel的数据缓存和checkpoint记录;

2. 准备一个日志文件,并不断写入数据

3. 启动flume的agent

4. 用kafka的客户端去观察数据结果

9.3 avro sink

9.3.1 特性

avro sink用来向avro source发送avro序列化数据,这样就可以实现agent之间的级联

​​​​​​​​​​​​​​9.3.2参数

Property Name

Default

Description

channel

type

The component type name, needs to be avro.

hostname

目标avro source的主机

port

目标avro source的绑定端口

batch-size

100

number of event to batch together for send.

connect-timeout

20000

连接超时时间

request-timeout

20000

请求超时时间

reset-connection-interval

none

Amount of time (s) before the connection to the next hop is reset. This will force the Avro Sink to reconnect to the next hop. This will allow the sink to connect to hosts behind a hardware load-balancer when news hosts are added without having to restart the agent.

compression-type

none

This can be “none” or “deflate”. The compression-type must match the compression-type of matching AvroSource

compression-level

6

The level of compression to compress event. 0 = no compression and 1-9 is compression. The higher the number the more compression

ssl

false

Set to true to enable SSL for this AvroSink. When configuring SSL, you can optionally set a “truststore”, “truststore-password”, “truststore-type”, and specify whether to “trust-all-certs”.

trust-all-certs

false

If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked. This should NOT be used in production because it makes it easier for an attacker to execute a man-in-the-middle attack and “listen in” on the encrypted connection.

truststore

The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Avro Source’s SSL authentication credentials should be trusted. If not specified, then the global keystore will be used. If the global keystore not specified either, then the default Java JSSE certificate authority files (typically “jssecacerts” or “cacerts” in the Oracle JRE) will be used.

truststore-password

The password for the truststore. If not specified, then the global keystore password will be used (if defined).

truststore-type

JKS

The type of the Java truststore. This can be “JKS” or other supported Java truststore type. If not specified, then the global keystore type will be used (if defined, otherwise the defautl is JKS).

exclude-protocols

SSLv3

Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.

maxIoWorkers

2 * the number of available processors in the machine

The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

​​​​​​​​​​​​​​​​​​​​​​​​​​​​9.3.3配置示例

级联配置,需要至少两个flume agent来演示

在C703上,配置avro sink 发送者

## c703 ##

a1.sources = s1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.channels = c1

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = c701

a1.sinks.k1.port = 4545

在C701上,配置avro source 接收者

## c701 ##

a1.sources = s1

a1.sources.s1.type = avro

a1.sources.s1.hostname = 0.0.0.0

a1.sources.s1.port = 4545

a1.sources.s1.channel = c1

a1.channels = c1

a1.channels.c1.type = memory

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

​​​​​​​​​​​​​​​​​​​​​​​​​​​​9.3.4启动测试

先在C701上启动接受者avro source(服务)

bin/flume-ng agent -n a1 -c conf/ -f myconf/avro-mem-logger.conf -Dflume.root.logger=INFO,console

再在C703上启动发送者avro sink(客户端)

bin/flume-ng agent -n a1 -c conf/ -f myconf/tail-mem-avro.conf -Dflume.root.logger=INFO,console

10 Flume常用组件详解:Selector

一个source可以对接多个channel

那么,source的数据如何在多个channel之间传递,就由selector来控制

配置应该挂载到source组件上

​​​​​​​10.1实践一:replicating selector复制选择器

replicating selector就是默认的选择器

官网配置参考

Flume知识点全面总结教程-风君子博客

 ​​​​​​​​​​​​​​​​​​​​​​​​​10.1.1目标场景

selector将event复制,分发给所有下游节点

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​10.1​​​​​​​.2Flume agent配置

# Name the components on this agent  a1.sources = r1  a1.sinks = k1 k2  a1.channels = c1 c2  # http source, with replicating selectora1.sources.r1.type = httpa1.sources.r1.port = 6666a1.sources.r1.bind = mastera1.sources.r1.selector.type = replicating  # Describe the sink  a1.sinks.k1.type = avro  a1.sinks.k1.hostname = slave1  # bind to remote host,RPCa1.sinks.k1.port = 6666a1.sinks.k2.type = avro# bind to remote host,PRCa1.sinks.k2.hostname = slave2a1.sinks.k2.port = 6666# 2 channels in selector testa1.channels.c1.type = memory  a1.channels.c1.capacity = 1000  a1.channels.c1.transactionCapacity = 100  a1.channels.c2.type = memory  a1.channels.c2.capacity = 1000  a1.channels.c2.transactionCapacity = 100  # bind source ,sink to channelsa1.sources.r1.channels = c1 c2a1.sinks.k1.channel = c1  a1.sinks.k2.channel = c2

​​​​​​​​​​​​​​10.1​​​​​​​.3 Collector1 配置

# 01 specify agent,source,sink,channela1.sources = r1a1.sinks = k1a1.channels = c1# 02 avro source,connect to local port 6666a1.sources.r1.type = avroa1.sources.r1.bind = slave1a1.sources.r1.port = 6666# 03 logger sinka1.sinks.k1.type = logger# 04 channel,memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# 05 bind source,sink to channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

​​​​​​​10.1.4 Collector2 配置

# 01 specify agent,source,sink,channela1.sources = r1a1.sinks = k1a1.channels = c1# 02 avro source,connect to local port 6666a1.sources.r1.type = avroa1.sources.r1.bind = slave2a1.sources.r1.port = 6666# 03 logger sinka1.sinks.k1.type = logger# 04 channel,memorya1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100# 05 bind source,sink to channela1.sources.r1.channels = c1a1.sinks.k1.channel = c1

​​​​​​​10.1.5 测试验证

​​​​​​​10.2 实践二:multiplexing selector多路选择器

multiplexing selector可以根据event中的一个指定key的value来决定这条消息会写入哪个channel,具体在选择时,需要配置一个映射关系,比如

a1.sources.r1.selector.mapping.CZ=c1  ; 就意味着header中的value为CZ的话,这条消息就会被写入c1这个channel

multiplexing selector官方配置参考

​​​​​​​10.2.1目标场景

Flume知识点全面总结教程-风君子博客

 ​​​​​​​10.2.2 第一级1 / 2配置

a1.sources = r1a1.channels = c1a1.sinks = k1a1.sources.r1.type = execa1.sources.r1.command = tail -F /root/logs/a.loga1.sources.r1.channels = c1a1.sources.r1.interceptors = i1a1.sources.r1.interceptors.i1.type = statica1.sources.r1.interceptors.i1.key = flag# 第一台value=1,另一台value=2a1.sources.r1.interceptors.i1.value = 1a1.channels.c1.type = memorya1.sinks.k1.type = avroa1.sinks.k1.hostname = h3a1.sinks.k1.port = 44444a1.sinks.k1.channel = c1

​​​​​​​​​​​​​​ ​​​​​​​10.2.3 第二级配置

a1.sources = r1a1.channels = c1 c2a1.sinks = k1 k2# source配置a1.sources.r1.channels = c1 c2a1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 44444# source的选择器配置a1.sources.r1.selector.type = multiplexinga1.sources.r1.selector.header = flaga1.sources.r1.selector.default = c2a1.sources.r1.selector.mapping.1 = c1a1.sources.r1.selector.mapping.2 = c2# channle 配置a1.channels.c1 = memorya1.channels.c2 = memory# 两个sink分别对接两个channel的配置a1.sinks.k1.type = loggera1.sinks.k1.channel = c1a1.sinks.k2.type = loggera1.sinks.k2.channel = c2

​​​​​​​​​​​​​​​​​​​​​​​​​​​​ ​​​​​​​10.2.4 测试验证

11 Flume常用组件详解:grouping processor

一个agent中,多个sink可以被组装到一个组,而数据在组内多个sink之间发送,有两种模式:

模式1: Failover Sink Processor  失败切换

一组中只有优先级高的那个sink在工作,另一个是等待中

如果高优先级的sink发送数据失败,则专用低优先级的sink去工作!并且,在配置时间penalty之后,还会尝试用高优先级的去发送数据!

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = failover

## 对两个sink分配不同的优先级

a1.sinkgroups.g1.processor.priority.k1 = 200

a1.sinkgroups.g1.processor.priority.k2 = 100

## 主sink失败后,停用惩罚时间

a1.sinkgroups.g1.processor.maxpenalty = 5000

模式2: Load balancing Sink Processor  负载均衡

允许channel中的数据在一组sink中的多个sink之间进行轮转,策略有:

round-robin(轮着发)

random(随机挑)

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = load_balance

a1.sinkgroups.g1.processor.backoff = true

a1.sinkgroups.g1.processor.selector = random

12Flume自定义扩展组件

12.1自定义Source

​​​​​​​​​​​​​​12.1.1需求场景

什么情况下需要自定义source:

一般是某种数据源,用flume内置的source组件无法解析,比如XML文档

而本教程中的例子:实现文本日志的采集,并能记住偏移量!

​​​​​​​​​​​​​​​​​​​​​​​​​​​​12.1.2实现思路

首先,找到自定义source所要实现或继承的父类/接口

然后,重写方法(插入自己的需求逻辑)

然后,将代码打成jar包,传入flume的lib目录

最后,写配置文件调用自定义的source

​​​​​​​​​​​​​​12.1.3代码架构

source是由 SourceRunner—》EventDrivenSourceRunner来调用

sourceRunner 拿到 source实例对象

然后调 source的start方法

source读数据,然后将数据转成event

然后将event传给channelprocessor.processEvent(event)

processEvent中第一个动作就是调用拦截器拦截这个event,然后再往channel中写入

Flume知识点全面总结教程-风君子博客

  ​​​12.1.4 具体实现

  • 线程池实现版:

package cn.doitedu.flume.custom;

import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDrivenSource;
import org.apache.flume.SystemClock;
import org.apache.flume.channel.ChannelProcessor;
import org.apache.flume.conf.Configurable;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractSource;
import org.apache.flume.source.ExecSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 能够记录读取位置偏移量的自定义source
 */
public class HoldOffesetSource extends AbstractSource implements EventDrivenSource, Configurable {
    private static final Logger logger = LoggerFactory.getLogger(HoldOffesetSource.class);

    private String positionfilepath;
    private String logfile;
    private int batchsize;

    private ExecutorService exec;

    /**
     * 框架调用本方法,开始采集数据
     * 自定义代码去读取数据,转为event
     * 用getChannelProcessor()方法(定义在父类中)去获取框架的channelprocessor(channel处理器)
     * 调用这个channelprocessor将event提交给channel
     */
    @Override
    public synchronized void start() {

        super.start();
        // 用于向channel提交数据的一个处理器
        ChannelProcessor channelProcessor = getChannelProcessor();

        // 获取历史偏移量
        long offset = 0;
        try {
            File positionfile = new File(this.positionfilepath);
            String s = FileUtils.readFileToString(positionfile);
            offset = Long.parseLong(s);
        } catch (IOException e) {
            e.printStackTrace();
        }

        // 构造一个线程池
        exec = Executors.newSingleThreadExecutor();
        // 向线程池提交数据采集任务
        exec.execute(new HoldOffsetRunnable(offset, logfile, channelProcessor, batchsize, positionfilepath));

    }

    /**
     * 停止前要调用的方法
     * 可以在这里做一些资源关闭清理工作
     */
    @Override
    public synchronized void stop() {
        super.stop();

       try{
           exec.shutdown();
       }catch (Exception e){
           exec.shutdown();
       }
    }

    /**
     * 获取配置文件中的参数,来配置本source实例
     * <p>
     * 要哪些参数:
     * 偏移量记录文件所在路径
     * 要采集的文件所在路径
     *
     * @param context
     */
    public void configure(Context context) {

        // 这是我们source用来记录偏移量的文件路径
        this.positionfilepath = context.getString("positionfile", "./");

        // 这是我们source要采集的日志文件的路径
        this.logfile = context.getString("logfile");

        // 这是用户配置的采集事务批次最大值
        this.batchsize = context.getInteger("batchsize", 100);

        // 如果日志文件路径没有指定,则抛异常
        if (StringUtils.isBlank(logfile)) throw new RuntimeException("请配置需要采集的文件路径");

    }

    /**
     * 采集文件的具体工作线程任务类
     */
    private static class HoldOffsetRunnable implements Runnable {

        long offset;
        String logfilepath;
        String positionfilepath;
        ChannelProcessor channelProcessor;  // channel提交器 (里面会调拦截器,会开启写入channel的事务)
        int batchsize; // 批次大小
        List<Event> events = new ArrayList<Event>();  // 用来保存一批事件
        SystemClock systemClock = new SystemClock();

        public HoldOffsetRunnable(long offset, String logfilepath, ChannelProcessor channelProcessor, int batchsize, String positionfilepath) {
            this.offset = offset;
            this.logfilepath = logfilepath;
            this.channelProcessor = channelProcessor;
            this.batchsize = batchsize;
            this.positionfilepath = positionfilepath;
        }

        public void run() {

            try {
                // 先定位到指定的offset
                RandomAccessFile raf = new RandomAccessFile(logfilepath, "r");
                raf.seek(offset);

                // 循环读数据
                String line = null;

                // 记录上一批提交的时间
                long lastBatchTime = System.currentTimeMillis();
                while (true) {
                    line = raf.readLine();
                    if(line == null ){
                        Thread.sleep(2000);
                        continue;
                    }

                    // 将数据转成event
                    Event event = EventBuilder.withBody(line.getBytes());
                    // 装入list batch
                    synchronized (HoldOffesetSource.class) {
                        events.add(event);
                    }

                    // 判断批次大小是否满 或者 时间到了没有
                    if (events.size() >= batchsize || timeout(lastBatchTime)) {
                        // 满足,则提交
                        channelProcessor.processEventBatch(events);

                        // 记录提交时间
                        lastBatchTime = systemClock.currentTimeMillis();

                        // 记录偏移量
                        long offset = raf.getFilePointer();
                        FileUtils.writeStringToFile(new File(positionfilepath), offset + "");

                        // 清空本批event
                        events.clear();

                    }

                    // 不满足,继续读
                }
            } catch (FileNotFoundException e) {
                logger.error("要采集的文件不存在");
            } catch (IOException e) {
                logger.error("我也不知道怎么搞的,不好意思,我罢工了");
            } catch (InterruptedException e) {
                logger.error("线程休眠出问题了");
            }
        }

        // 判断是否批次间隔超时
        private boolean timeout(long lastBatchTime) {
            return systemClock.currentTimeMillis() – lastBatchTime > 2000;
        }
    }
}

  • 单线程实现版

12.1.5 启动测试

代码打成jar包,上传flume的lib

然后写agent配置文件

a1.sources = s1

a1.channels = c1

a1.sinks = k1

# source 配置

a1.sources.s1.type = cn.doitedu.flume.custom.HoldOffesetSource

a1.sources.s1.position_file_path= /root/myposition

a1.sources.s1.data_file_path = /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.batchTime = 5000

a1.sources.s1.channels = c1

a1.channels.c1.type = memory

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

然后启动flume-agent即可

12.2 自定义拦截器

12.2.1需求场景

公司的点击流日志数据在一个目录中不断生成: /var/log/click_streaming.log

日志文件会随着文件的大小达到一定阈值(128M)而被重命名,比如:click_streaming.log.1,并且会生成一个新的click_streaming.log文件继续写入日志流;

日志文件中数据格式如下:

13888776677,click,/baoming,张飞,湖北省

13889976655,click,/xuexi,关羽,山西省

……..

现在需要用flume去日志服务器上采集数据写入HDFS,并且要求,对数据中的手机号、姓名字段进行加密(MD5加密,并将加密结果变成BASE64编码);

​​​​​​​​​​​​​​12.2.2实现思路

采集的目标目录中,会不断生成新文件

那么,我们的source组件可以选择  taildir:

1.可以监控到新文件  

2.可以记录采集的偏移量

channel,为了保证可靠性,可以选择  filechannel :

1.会在磁盘上缓存event  

2.会在磁盘上记录事务状态

目标存储是HDFS,sink自然是选择hdfs sink;

加密需求的解决:

如果从source上解决,那只能修改 taildir组件的源码;

如果从sink上解决,那只能修改hdfs sink组件的源码;

上述两种,都需要修改源码,不是最佳选择!

最佳选择:通过拦截器来实现对数据的加工!而flume中没有现成的内置拦截器可以实现字段加密,我们可以自定义自己的拦截器;

​​​​​​​​​​​​​​12.2.3 自定义拦截器的开发

​​​​​​​​​​​​​​​​​​​​​​​​​​​​12.2.3.1基本套路

框架中,自定义扩展接口的套路:

1. 要实现或者继承框架中提供的接口或父类,实现、重写其中的方法

2. 写好的代码要打成jar包,并放入flume的lib目录

3. 要将自定义的类,写入相关agent配置文件

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​12.2.3.2拦截器设计

应对本场景使用自定义拦截器,还要考虑几个参数的问题:

用户要加密的字段,可能会变化,代码的可配置性需要匹配

1. 可以在配置文件中设计一个参数,来指定要加密的字段:  

indices (要加密的字段索引)

以及索引的切割符idxSplitBy

–》 比如:  a1.sources.interceptors.i1.indices = 0:3

a1.sources.interceptors.i1.idxSplitBy = :

2.为了能够正确切分数据中的字段,还需要一个参数:字段的分隔符dataSplitBy  

–》 比如: a1.sources.interceptors.i1.dataSplitBy= ,

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​12.2.3.3flume中的拦截器接口规范

首先,引入flume的开发依赖

<dependency>
    <groupId>org.apache.flume</groupId>
    <artifactId>flume-ng-core</artifactId>
    <version>1.9.0</version>
    <scope>provided</scope>
</dependency>

自定义拦截器的工作机制,其实很简单:

flume先调自定义拦截器中的一个内部Builder类的config()方法进行参数配置

flume再调Builder类的build()方法获取自定义拦截器的实例对象(可以在构造过程中传递参数)

flume再反复调用拦截器对象的intercept(List<Event> events)方法来修改event

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​12.2.3.4 代码实现

package cn.doitedu.flume.custom;

import org.apache.commons.codec.binary.Base64;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.interceptor.Interceptor;

import java.util.ArrayList;
import java.util.List;

public class EncryptInterceptor  implements Interceptor {
    // 要加密的字段索引s
    String indices;
    // 索引之间的分隔符
    String idxSplitBy;
    // 数据体字段之间的分隔符
    String dataSplitBy;

    /**
     * 构造方法
     * @param indices
     * @param idxSplitBy
     * @param dataSplitBy
     */
    public EncryptInterceptor(String indices, String idxSplitBy, String dataSplitBy) {
        // 0,3
        this.indices = indices;
        this.idxSplitBy = idxSplitBy;
        this.dataSplitBy = dataSplitBy;
    }

    // 这个方法会被框架调用一次,用来做一些初始化工作
    public void initialize() {

    }

    // 拦截方法对一个event进行处理
    public Event intercept(Event event) {

        byte[] body = event.getBody();
        String dataStr = new String(body);

        // 数据的字段数组
        String[] dataFieldsArr = dataStr.split(dataSplitBy);

        // 需要加密的索引的数组
        String[] idxArr = indices.split(idxSplitBy);

        for (String s : idxArr) {

            int index = Integer.parseInt(s);
            // 取出要加密的字段的内容
            String field = dataFieldsArr[index];
            // MD5加密这个字段
            String encryptedField = DigestUtils.md5Hex(field);
            // BASE64编码
            byte[] bytes = Base64.decodeBase64(encryptedField);
            // 替换掉原来的未加密内容
            dataFieldsArr[index] = new String(bytes);
        }

        // 将加密过的字段重新拼接成一条数据,并使用原来的分隔符
        StringBuilder sb = new StringBuilder();
        for (String field : dataFieldsArr) {
            sb.append(field).append(dataSplitBy);
        }

        sb.deleteCharAt(sb.lastIndexOf(dataSplitBy));

        // 返回加密后的字段所封装的event对象
        return EventBuilder.withBody(sb.toString().getBytes());
    }

    // 拦截方法对一批event进行处理
    public List<Event> intercept(List<Event> events) {

        ArrayList<Event> lst = new ArrayList<Event>();

        for (Event event : events) {
            Event eventEncrpt = intercept(event);
            lst.add(eventEncrpt);
        }

        return lst;
    }

    // agent退出前,会调一次该方法,进行需要的清理、关闭操作
    public void close() {

    }

    /**
     * 拦截器的构造器
     */
    public staticc lass EncryptInterceptorBuilder implements Interceptor.Builder{
        // 要加密的字段索引s
        String indices;
        // 索引之间的分隔符
        String idxSplitBy;
        // 数据体字段之间的分隔符
        String dataSplitBy;

        // 构造一个拦截器实例
        public Interceptor build() {

            return new EncryptInterceptor(indices,idxSplitBy,dataSplitBy);
        }

        // 获取配置文件中的拦截器参数
        public void configure(Context context) {
            // 要加密的字段索引s
            this.indices = context.getString(Constants.INDICES);
            // 索引之间的分隔符
            this.idxSplitBy = context.getString(Constants.IDX_SPLIT_BY);
            // 数据体字段之间的分隔符
            this.dataSplitBy = context.getString(Constants.DATA_SPLIT_BY);

        }
    }

    public static class Constants {
        public static final String INDICES = "indices";
        public static final String IDX_SPLIT_BY = "idxSplitBy";
        public static final String DATA_SPLIT_BY = "dataSplitBy";
    }
}

​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​​12.2.3.5 运行测试

1. 先将代码打成jar包

2. 上传到flume安装节点上,并放入flume的lib目录

3. 写采集方案配置文件

a1.sources = s1

a1.sources.s1.channels = c1

a1.sources.s1.type = exec

a1.sources.s1.command = tail -F /root/weblog/access.log

a1.sources.s1.batchSize = 100

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder

a1.sources.s1.interceptors.i1.indices = 0:4

a1.sources.s1.interceptors.i1.idxSplitBy = :

a1.sources.s1.interceptors.i1.dataSplitBy = ,

a1.channels = c1

a1.channels.c1.type = memory

a1.channels.c1.capacity = 200

a1.channels.c1.transactionCapacity = 100

a1.sinks = k1

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

# 写到kafka就用这一段sink配置

# a1.sinks = k1

# a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

# a1.sinks.k1.kafka.bootstrap.servers = h1:9092,h2:9092,h3:9092

# a1.sinks.k1.channel = c1

4. 准备数据日志文件,与你的处理逻辑相符,如下所示

13888776677,click,/baoming,张飞,湖北省

13889976655,click,/xuexi,关羽,山西省

…….

5.运行

bin/flume-ng agent -n a1 -c conf/ -f agentconf/spooldir-myi-m-log.conf -Dflume.root.logger=INFO,console

13 综合案例

​​​​​​​13.1 案例场景

A、B等日志服务机器实时生产日志,日志分为多种类型:

log1/access.log

log2/nginx.log

log3/web.log

现在要求:

 把日志服务器中的各类日志采集汇总到一个中转agent上,然后分类写入hdfs中。

但是在hdfs中要求的目录为:

/source/logs/access/20160101/**

/source/logs/nginx/20160101/**

/source/logs/web/20160101/**

并要求可以按指定的索引,将对应字段内容加密!

​​​​​​​13.2 实现思路

1. 每台日志服务器上部署一个flume agent – – -> level1,每个agent配置3个source对应3类数据

2. leve1_1级的agent在采集数据时,添加一个header,指定数据的类别

3. level_1级的agent要配置两个avro sink,各自对接一个下级的agent

4. level_1还要配置sink processoràfail over  sink processor,控制两个sink中只有一个avro sink在工作,如果失败再切换到另一个avro sink

5.level_1还要配置字段加密拦截器

6. level_2 级配置两个flume agent,使用avro source接收数据

7. level_2 级的hdfs sink,目录配置使用动态通配符,取到event中的类别header,以便于将不同类别数据写入不同hdfs 目录!

​​​​​​​​​​​​​​13.4 配置文件

level_1级配置文件

a1.sources = r1 r2 r3

## source 配置

a1.sources.r1.type = TAILDIR

a1.sources.r1.channels = c1

a1.sources.r1.positionFile = /root/chekp1/taildir_position.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /root/weblog/log1/access.log

a1.sources.r1.maxBatchCount = 1000

a1.sources.r1.interceptors = i1 i2 i3

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder

a1.sources.r1.interceptors.i2.indices = 0:4

a1.sources.r1.interceptors.i2.idxSplitBy = :

a1.sources.r1.interceptors.i2.dataSplitBy = ,

a1.sources.r1.interceptors.i3.type = static

a1.sources.r1.interceptors.i3.key = logtype

a1.sources.r1.interceptors.i3.value = access

a1.sources.r2.type = TAILDIR

a1.sources.r2.channels = c1

a1.sources.r2.positionFile = /root/chekp2/taildir_position.json

a1.sources.r2.filegroups = f1

a1.sources.r2.filegroups.f1 = /root/weblog/log2/nginx.log

a1.sources.r2.maxBatchCount = 1000

a1.sources.r2.interceptors = i1 i2 i3

a1.sources.r2.interceptors.i1.type = timestamp

a1.sources.r2.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder

a1.sources.r2.interceptors.i2.indices = 0:4

a1.sources.r2.interceptors.i2.idxSplitBy = :

a1.sources.r2.interceptors.i2.dataSplitBy = ,

a1.sources.r2.interceptors.i3.type = static

a1.sources.r2.interceptors.i3.key = logtype

a1.sources.r2.interceptors.i3.value = nginx

a1.sources.r3.type = TAILDIR

a1.sources.r3.channels = c1

a1.sources.r3.positionFile = /root/chekp3/taildir_position.json

a1.sources.r3.filegroups = f1

a1.sources.r3.filegroups.f1 = /root/weblog/log3/weblog.log

a1.sources.r3.maxBatchCount = 1000

a1.sources.r3.interceptors = i1 i2 i3

a1.sources.r3.interceptors.i1.type = timestamp

a1.sources.r3.interceptors.i2.type = cn.doitedu.flume.custom.EncryptInterceptor$EncryptInterceptorBuilder

a1.sources.r3.interceptors.i2.indices = 0:4

a1.sources.r3.interceptors.i2.idxSplitBy = :

a1.sources.r3.interceptors.i2.dataSplitBy = ,

a1.sources.r3.interceptors.i3.type = static

a1.sources.r3.interceptors.i3.key = logtype

a1.sources.r3.interceptors.i3.value = weblog

## channel 配置

a1.channels = c1

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/channel_chkp

a1.channels.c1.dataDirs = /root/channel_data

## sink 配置

a1.sinks = k1 k2

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = c704

a1.sinks.k1.port = 4545

a1.sinks.k2.type = avro

a1.sinks.k2.channel = c1

a1.sinks.k2.hostname = c705

a1.sinks.k2.port = 4545

## sink processor – fail over 失败配置

a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = failover

## 对两个sink分配不同的优先级

a1.sinkgroups.g1.processor.priority.k1 = 200

a1.sinkgroups.g1.processor.priority.k2 = 100

## 主sink失败后,停用惩罚时间

a1.sinkgroups.g1.processor.maxpenalty = 5000

level_2配置

a1.sources = s1

a1.channels = c1

a1.sinks = k1

## source 配置

a1.sources.s1.type = avro

a1.sources.s1.bind= 0.0.0.0

a1.sources.s1.port = 4545

a1.sources.s1.channels = c1

a1.sources.s1.interceptors = i1

a1.sources.s1.interceptors.i1.type = timestamp

## channel 配置

a1.channels.c1.type = file

a1.channels.c1.capacity = 1000000

a1.channels.c1.transactionCapacity = 100

a1.channels.c1.checkpointDir = /root/lev2_channel_chkp

a1.channels.c1.dataDirs = /root/lev2_channel_data

## sink配置

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://c701:8020/doitedu/%{logtype}/%Y-%m-%d/%H/

a1.sinks.k1.hdfs.filePrefix = doitedu-

a1.sinks.k1.hdfs.fileSuffix = .log.gz

a1.sinks.k1.hdfs.rollInterval = 0

a1.sinks.k1.hdfs.rollSize = 134217728

a1.sinks.k1.hdfs.rollCount = 0

## 配置压缩

a1.sinks.k1.hdfs.fileType = CompressedStream

a1.sinks.k1.hdfs.codeC = gzip

## 数据格式

a1.sinks.k1.hdfs.serializer = TEXT

​​​​​​​13.5 启动测试

1. 先把自定义拦截器代码jar包放入level_1级(C701/C702/C703)的所有flume的lib目录中;

2. 将各台机器上之前的一些checkpoint、缓存等目录清除;

2. 启动level_2的两个agent(C704、C705上);

3. 在level_1的所有机器上,创建日志数据目录,并写脚本模拟往3类日志中写入日志:

4.在level_1的所有机器上启动level_1的flume agent

5.到hdfs上观察结果

6.尝试kill掉2级的c704 的 agent,看是否能够故障切换

14 面试加强

​​​​​​​14.1 flume事务机制

Flume知识点全面总结教程-风君子博客

二、Delivery 保证

认识 Flume 对事件投递的可靠性保证是非常重要的,它往往是我们是否使用 Flume 来解决问题的决定因素之一。

消息投递的可靠保证有三种:

  1. At-least-once
  2. At-most-once
  3. Exactly-once

基本上所有工具的使用用户都希望工具框架能保证消息 Exactly-once ,这样就不必在设计实现上考虑消息的丢失或者重复的处理场景。但是事实上很少有工具和框架能做到这一点,真正能做到这一点所付出的成本往往很大,或者带来的额外影响反而让你觉得不值得。假设 Flume 真的做到了 Exactly-once ,那势必降低了稳定性和吞吐量,所以 Flume 选择的策略是 At-least-once 。

当然这里的 At-least-once 需要加上引号,并不是说用上 Flume 的随便哪个组件组成一个实例,运行过程中就能保存消息不会丢失。事实上 At-least-once 原则只是说的是 Source 、 Channel 和 Sink 三者之间上下投递消息的保证。而当你选择 MemoryChannel 时,实例如果异常挂了再重启,在 channel 中的未被 sink 所消费的残留数据也就丢失了,从而没办法保证整条链路的 At-least-once。

Flume 的 At-least-once 保证的实现基础是建立了自身的 Transaction 机制。Flume 的 Transaction 有4个生命周期函数,分别是 start、 commit、rollback 和 close。

当 Source 往 Channel 批量投递事件时首先调用 start 开启事务,批量

put 完事件后通过 commit 来提交事务,如果 commit 异常则 rollback ,然后 close 事务,最后 Source 将刚才提交的一批消息事件向源服务 ack(比如 kafka 提交新的 offset )。Sink 消费 Channel 也是相同的模式,唯一的区别就是 Sink 需要在向目标源完成写入之后才对事务进行 commit。两个组件的相同做法都是只有向下游成功投递了消息才会向上游 ack,从而保证了数据能 At-least-once 向下投递。

​​​​​​​14.2flume agent内部机制

Flume知识点全面总结教程-风君子博客

组件:

1、ChannelSelector

ChannelSelector 的作用就是选出 Event 将要被发往哪个 Channel。其共有两种类型,分别是 Replicating(复制)和 Multiplexing(多路复用)。 ReplicatingSelector 会将同一个 Event 发往所有的 Channel,Multiplexing 会根据相应的原则,将不同的 Event 发往不同的 Channel。

2、SinkProcessor

(1) SinkProcessor 共 有 三 种 类 型 , 分 别 是 DefaultSinkProcessor 、

LoadBalancingSinkProcessor 和 FailoverSinkProcessor。

(2) DefaultSinkProcessor 对应的是单个的 Sink,

LoadBalancingSinkProcessor 和 FailoverSinkProcessor 对应的是 Sink Group。

(3) LoadBalancingSinkProcessor 可以实现负载均衡的功能,FailoverSinkProcessor 可以实现故障转移的功能。

​​​​​​​14.3 ganglia及flume监控

开启内置监控功能

-Dflume.monitoring.type=http -Dflume.monitoring.port=34545

将监控数据发往ganglia进行展现

-Dflume.monitoring.type=ganglia -Dflume.monitoring.port=34890

​​​​​​​14.4 Flume调优

flume-ng agent包括source、channel、sink三个部分,这三部分都运行在JVM上,而JVM运行在linux操作系统之上。因此,对于flume的性能调优,就是对这三部分及影响因素调优。

1、source的配置

该项目中采用的是 taildir source,他的读取速度能够跟上命令行写入日志的速度,故并未做特殊的处理。

2、channel的配置

可选的channel配置一般有两种,一是memory channel,二是file channel。

建议在内存足够的情况下,优先选择memory channel。

尝试过相同配置下使用file channel和memory channel,file channel明显速度较慢,并且会生成log的文件,应该是用作缓存,当source已经接收但是还未写入sink时的event都会存在这个文件中。这样的好处是保证数据不会丢失,所以当对数据的丢失情况非常敏感且对实时性没有太大要求的时候,还是使用file memory吧。。

一开始的memory channel配置用的是默认的,然后控制台报出了如下警告:

The channel is full or unexpected failure. The source will try again after 1000 ms

这个是因为当前被采集的文件过大,可以通过增大keep-alive的值解决。深层的原因是文件采集的速度和sink的速度没有匹配好。

所以memory channel有三个比较重要的参数需要配置:

#channel中最多缓存多少

a1.channels.c1.capacity = 5000

#channel一次最多吐给sink多少

a1.channels.c1.transactionCapacity = 2000

#event的活跃时间

a1.channels.c1.keep-alive = 10

3、sink的配置

可以通过压缩来节省空间和网络流量,但是会增加cpu的消耗。

batch:size越大性能越好,但是太大会影响时效性,一般batch size和源数据端的大小相同。

4、java内存的配置

export JAVA_OPTS="-Xms512m -Xmx2048m -Dcom.sun.management.jmxremote"

主要涉及Xms和Xmx两个参数,可以根据实际的服务器的内存大小进行设计。

5、OS内核参数的配置

如果单台服务器启动的flume agent过多的话,默认的内核参数设置偏小,需要调整。(待补充,暂时还未涉及)。