【Flume】TailDirSource源码理解
Flume是一个分布式的可靠、高可用的日志收集和聚合引擎,其核心组件之一就是Source(数据源)组件。而TailDirSource是Flume自带的一个Source组件,其主要作用是实时监控指定目录下的日志文件,读取新增的日志行,将其封装为事件并发送给下一个拦截器或者Sink组件。本文将对TailDirSource组件的源码进行分析和理解。
1、核心逻辑分析
TailDirSource组件的核心逻辑主要包括以下步骤:
1). 首先读取指定目录下的所有日志文件路径。
2). 对每一个日志文件创建一个TailFile对象,该对象会在后台启动一个线程来不断读取指定的文件,将新增的日志行封装成事件并发送给下一个处理节点。
3). TailFile类继承自Runnable接口,当run()方法被调用时,会执行如下逻辑:
a. 根据上一次读取时记录的当前文件末尾指针位置,从当前位置开始读取文件内容。
b. 依次读取每一行内容,根据用户定义的正则表达式(regex)判断该行是否为合法的日志行,如果符合要求,则封装成Flume事件对象并发送给下一个处理节点;否则忽略该行。
c. 如果当前文件读取完毕,判断该文件是否被删除或移动,若是,则关闭当前文件的流,停止该文件的读取线程。
d. 如果当前文件被重命名,则重新打开该文件的读取流,从头开始读取文件内容。
4). 当监控的目录下出现新的日志文件时,会根据用户定义的文件匹配规则识别该文件,创建一个新的TailFile对象并启动一个新的读取线程。
2、源码分析与说明
2.1 TaildirSource的配置文件
TaildirSource源码解析需要先看一下TaildirSource的配置文件的内容,文件内容如下:
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /log_pos/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /data/logs/app.log.*
a1.sources.r1.filegroups.f2 = /data/logs/*.log
a1.sources.r1.headers.f1.headerKey1 = headerValue1
a1.sources.r1.fileHeader = true
a1.sources.r1.batchSize = 1000
a1.sources.r1.skipToEnd = false
a1.sources.r1.skipToEndRegex = ^$
a1.sources.r1.recursiveDirectorySearch = false
a1.sources.r1.fileHeaderKey = file
a1.sources.r1.basenameHeader = true
a1.sources.r1.basenameHeaderKey = basename
a1.sources.r1.quoteChar = \"
a1.sources.r1.ignorePattern = ^\s*$
a1.sources.r1.deletePolicy = never
a1.sources.r1.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer
a1.sources.r1.deserializer.maxLineLength = 1024
a1.sources.r1.selector.type = replicating
configuration中的参数含义:
Parameter Description
type 赋值为TAILDIR,表示这个agent节点的source类型就是TAILDIR
positionFile 状态存储文件,文件保存了每个文件的一个标识状态,这个标识可以是偏移量,也可以记录inode号或者其他文件 标识
filegroups 包含多个filegroup配置,每个filegroup关联一个或多个日志文件
filegroups.(#).XXX 每个filegroup都对应一个以f开头的一个 标识,XXX又有若干个参数,表示关联的日志文件路径,支持ant path格式,即?表示一个字符,*表示0或多个字符,**表示0或多个目录
headers.(#).headerKey 表示该filegroup的 Flume events头信息变量
fileHeader 如果设为true,则每个event都会添加头信息,头信息的键为“file”,值为文件名
batchSize 每批次发送的事件数量
skipToEnd 如果设为true,则不处理已经存在的文件,而是跳到文件最后并从那里开始读取,一般用在传输非实时数据时,指定为true可以快速消费老数据。
skipToEndRegex 指定一个正则表达式,当skipToEnd为true时,当遇到日志行符合该正则表达式时才会停止跳转
recursiveDirectorySearch 是否递归搜索目录。如果设置为true,则会一直遍历所有子目录,查找匹配指定文件路径的文件
fileHeaderKey 即设置的标识文件名的头信息写入的key。默认为file。
basenameHeader basename只保留文件的文件名,去掉父路径,仅保留文件名。当basename不为空时,会覆盖fileHeader头信息,并显示在events中。该参数默认值为false。
basenameHeaderKey basename头信息对应的key值,当basenameHeader设为true并且basenameHeaderKey不为空时,将会把basename写入为该key的头信息。
ignorePattern 忽略所有符合该正则表达式的文件名
deletePolicy 当文件被删除时,执行何种操作,可选值:“never”,”immediate”,”delay” 。
deserializer 反序列化器
deserializer.maxLineLength 设置已读取的每一行的最大长度
selector.type 选择器策略,默认选择策略是replicating,会将消息同时发送到channel中的所有sink中。
2.2 TaildirSource的核心逻辑class
2.2.1 TaildirEventReader
TaildirEventReader是TaildirSource的核心类,它是一个目录监听器,管理多个TailFile,某个TailFile对应一个正在运行的任务。
单个TailFile包含一个监听文件和依次下载并发送指定文件内新出现或者修改的行。
启动一个TailFile的线程是在TaildirSource的start函数中管理的
private Map<String, TailFile> tailFiles = new ConcurrentHashMap<>();
public void start() {
try {
logger.info("TaildirSource source starting with directory: {}", directory);
ensureDirectoryExistence();
tailFiles.clear();
//开始时读取所有的日志文件
List<File> files = getAllFiles(directory);
for (File file : files) {
addTailFile(file);
}
tailSourceCounter.start();
super.start();
logger.debug("Taildir source started");
} catch (Exception ex) {
ex.printStackTrace();
}
}
start函数执行的顺序如下:
1).调用ensureDirectoryExistence()检查数据文件目录是否存在;
2).清空tailFiles
3).调用getAllFiles(directory)获取指定目录下的所有日志文件
4).调用addTailFile()方法,为每一个日志文件创建TailFile对象,启动新的线程来读取文件内容,并加入到tailFiles中进行管理。
5).调用tailSourceCounter.start(),启动监控进程;
6).调用super.start(),启动Source组件的生命周期。
第3~5步是核心逻辑,下面分别分析:
3)调用getAllFiles(directory)获取指定目录下的所有日志文件
private List<File> getAllFiles(String path) throws IOException {
List<File> allFiles = Lists.newArrayList();
PathMatcher pathMatcher = FileSystems.getDefault().getPathMatcher("glob:**");
for (String fileGroup : filegroups) {
PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + fileGroup);
//实现空数组pattern workaround effeciently.
if (matcher.matches(Paths.get(directory))) {
continue;
}
Path p = Paths.get(fileGroup);
String glob = p.getFileName().toString();
Path dir = p.getParent() == null ? Paths.get(".") : p.getParent();
try {
Files.walkFileTree(dir, new SimpleFileVisitor<Path>() {
@Override
public File
