上图的Flume的Architecture,在Flume中,最重要的抽象是data flow(数据流),data flow描述了数据从产生,传输、处理并最终写入目标的一条路径。在上图中,实线描述了data flow。
其中,Agent用于采集数据,agent是flume中产生数据流的地方,同时,agent会将产生的数据流传输到collector。对应的,collector用于对数据进行聚合,往往会产生一个更大的流。
Flume提供了从console(控制台)、RPC(Thrift-RPC)、text(文件)、tail(UNIX tail)、syslog(syslog日志系统,支持TCP和UDP等2种模式),exec(命令执行)等数据源上收集数据的能力。同时,Flume的数据接受方,可以是console(控制台)、text(文件)、dfs(HDFS文件)、RPC(Thrift-RPC)和syslogTCP(TCP syslog日志系统)等。
其中,收集数据有2种主要工作模式,如下:
Push Sources:外部系统会主动地将数据推送到Flume中,如RPC、syslog。
Polling Sources:Flume到外部系统中获取数据,一般使用轮询的方式,如text和exec。
注意,在Flume中,agent和collector对应,而source和sink对应。Source和sink强调发送、接受方的特性(如数据格式、编码等),而agent和collector关注功能。
Flume Master用于管理数据流的配置,如下图。
为了保证可扩展性,Flume采用了多Master的方式。为了保证配置数据的一致性,Flume引入了ZooKeeper,用于保存配置数据,ZooKeeper本身可保证配置数据的一致性和高可用,另外,在配置数据发生变化时,ZooKeeper可以通知Flume Master节点。
Flume Master间使用gossip协议同步数据。
下面简要分析Flume如何支持Reliability、Scalability、Manageability和Extensibility。
Reliability:Flume提供3中数据可靠性选项,包括End-to-end、Store on failure和Best effort。其中End-to-end使用了磁盘日志和接受端Ack的方式,保证Flume接受到的数据会最终到达目的。Store on failure在目的不可用的时候,数据会保持在本地硬盘。和End-to-end不同的是,如果是进程出现问题,Store on failure可能会丢失部分数据。Best effort不做任何QoS保证。
Scalability:Flume的3大组件:collector、master和storage tier都是可伸缩的。需要注意的是,Flume中对事件的处理不需要带状态,它的Scalability可以很容易实现。
Manageability:Flume利用ZooKeeper和gossip,保证配置数据的一致性、高可用。同时,多Master,保证Master可以管理大量的节点。
Extensibility:基于Java,用户可以为Flume添加各种新的功能,如通过继承Source,用户可以实现自己的数据接入方式,实现Sink的子类,用户可以将数据写往特定目标,同时,通过SinkDecorator,用户可以对数据进行一定的预处理。
如上代码可以看到,每条日志过来都需要trigger判断是否达到归档条件。如果达到归档条件则惊醒归档,并且把该条日志写入新的文件中。个人觉得如上绿色和红色代码应该交换顺寻,触发归档的日志应该写入当前文档,而不是写入新创建的文档中,理由有两点:1、日志从产品处发送到flume虽然时间很短,但是触发归档的日志产生时间是大于归档时间的,理论上是属于上个归档规则之内的日志,例如:时间方式归档。public void synchronousAppend(Event e) throws IOException,
InterruptedException {
Preconditions.checkState(curSink != null,
"Attempted to append when rollsink not open");
if (trigger.isTriggered()) {
trigger.reset();
LOG.debug("Rotate started by append... ");
rotate();
LOG.debug("... rotate completed by append.");
}
String tag = trigger.getTagger().getTag();
e.set(A_ROLL_TAG, tag.getBytes());
lock.readLock().lock();
try {
curSink.append(e);
trigger.append(e);
super.append(e);
} finally {
lock.readLock().unlock();
}
}
该线程用于保证当没有日志过来的时候,按时间归档的日志能够及时归档,归档的方式为关闭RollSink中curSink,重新open。while (!isInterrupted()) {
// TODO there should probably be a lock on Roll sink but until we
// handle
// interruptions throughout the code, we cannot because this causes a
// deadlock
if (trigger.isTriggered()) {
trigger.reset();
LOG.debug("Rotate started by triggerthread... ");
rotate();
LOG.debug("Rotate stopped by triggerthread... ");
continue;
}
try {
Clock.sleep(checkLatencyMs);
} catch (InterruptedException e) {
LOG.debug("TriggerThread interrupted");
doneLatch.countDown();
return;
}
}
<property>
<name>flume.collector.dfs.compress.codec</name>
<value>Lzop</value>
<description>Writes formatted data compressed in specified codec to dfs. Value is None, GzipCodec, DefaultCodec (deflate), BZip2Codec, or any other Codec Hadoop is aware of </description>
</property>
<property>
<name>io.compression.codecs</name>
<value>com.hadoop.compression.lzo.LzopCodec</value>
<description>lzop</description>
</property>
<property>
<name>flume.security.kerberos.principal</name>
<value>flume/local@DOMAIN</value>
<description></description>
</property>
<property>
<name>flume.security.kerberos.keytab</name>
<value>/home/ds/flume/flume.keytab</value>
<description></description>
</property>