Flume Series (4): Source Code Analysis of Flume-Sink

SinkRunner

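SinkRunner is the entry point that starts a Sink component. Its start method launches a thread that runs a PollingRunner Runnable. This PollingRunner is implemented the same way as the PollingRunner in PollableSourceRunner from the previous article: it calls the SinkProcessor's process method in a loop.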
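
As a rough illustration of that loop, here is a minimal sketch rather than Flume's actual PollingRunner. The class name PollingLoopSketch, the Supplier-based processor, and the fixed sleep on BACKOFF are all assumptions made for the example; the article only states that process is called repeatedly.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Simplified model of a polling runner; hypothetical names, not the Flume class.
class PollingLoopSketch {
  enum Status { READY, BACKOFF }

  /** Calls the processor until stop is set; returns the number of calls made. */
  static int poll(Supplier<Status> processor, AtomicBoolean stop, long backoffMillis) {
    int calls = 0;
    while (!stop.get()) {
      Status status = processor.get();
      calls++;
      if (status == Status.BACKOFF) {
        // Assumption: pause briefly when there was nothing to deliver.
        try {
          Thread.sleep(backoffMillis);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt(); // preserve the interrupt and leave the loop
          break;
        }
      }
    }
    return calls;
  }
}
```

The stop flag is what lets another thread end the loop cleanly; in this sketch it is checked once per iteration, before each call to the processor.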

SinkProcessor

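The Sink side has the concept of a SinkGroup: placing several Sinks in one SinkGroup and assigning it a SinkProcessor provides failover (failover) or load balancing (load_balance) across those Sinks.
SinkProcessor has three implementations: DefaultSinkProcessor, FailoverSinkProcessor, and LoadBalancingSinkProcessor.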

DefaultSinkProcessor

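When no SinkGroup is specified, a SinkRunner is started for each Sink with a DefaultSinkProcessor as its processor. DefaultSinkProcessor accepts exactly one Sink, and its process method simply delegates to that Sink's process method.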

@Override
public Status process() throws EventDeliveryException {
  return sink.process();
}

@Override
public void setSinks(List<Sink> sinks) { // set by AbstractConfigurationProvider.loadSinkGroups when the configuration is loaded
  Preconditions.checkNotNull(sinks);
  Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
      + "only handle one sink, "
      + "try using a policy that supports multiple sinks");
  sink = sinks.get(0);
}

FailoverSinkProcessor

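When a SinkGroup's processor.type is set to failover, the Sinks in the group fail over to one another. FailoverSinkProcessor keeps healthy Sinks in liveSinks, a TreeMap ordered by priority, and failed Sinks in failedSinks, a priority queue. It takes the highest-priority live Sink and calls its process method. If that call throws, the Sink is removed from the TreeMap and put on the failed queue, and the next live Sink is tried. Failed Sinks whose cooldown period has elapsed are retried at the start of each process call, and when no live Sink remains the call throws an EventDeliveryException. The implementation of FailoverSinkProcessor is shown below.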

@Override
public void configure(Context context) { // configuration
  liveSinks = new TreeMap<Integer, Sink>(); // TreeMap keeps the live sinks ordered by priority
  failedSinks = new PriorityQueue<FailedSink>(); // priority queue holding the failed sinks
  Integer nextPrio = 0;
  String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
  if (maxPenaltyStr == null) {
    maxPenalty = DEFAULT_MAX_PENALTY;
  } else {
    try {
      maxPenalty = Integer.parseInt(maxPenaltyStr);
    } catch (NumberFormatException e) {
      logger.warn("{} is not a valid value for {}",
          new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
      maxPenalty = DEFAULT_MAX_PENALTY;
    }
  }
  for (Entry<String, Sink> entry : sinks.entrySet()) {
    String priStr = PRIORITY_PREFIX + entry.getKey();
    Integer priority; // priority used to order the sinks; a higher priority is activated earlier
    try {
      priority = Integer.parseInt(context.getString(priStr));
    } catch (Exception e) {
      priority = --nextPrio; // no priority configured: assign -1, -2, ... in load order
    }
    if (!liveSinks.containsKey(priority)) {
      liveSinks.put(priority, sinks.get(entry.getKey()));
    } else {
      logger.warn("Sink {} not added to FailverSinkProcessor as priority" +
          "duplicates that of sink {}", entry.getKey(),
          liveSinks.get(priority));
    }
  }
  activeSink = liveSinks.get(liveSinks.lastKey()); // the highest-priority sink becomes the active sink
}

@Override
public Status process() throws EventDeliveryException {
  // Retry any failed sinks that have gone through their "cooldown" period
  Long now = System.currentTimeMillis();
  // While failedSinks is not empty and the refresh time of the sink at its
  // head is earlier than now, retry that failed sink
  while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
    FailedSink cur = failedSinks.poll(); // take the sink at the head of the queue
    Status s;
    try {
      s = cur.getSink().process(); // run the failed sink's process method
      if (s == Status.READY) { // on success, put the sink back into liveSinks and re-select the active sink from liveSinks
        liveSinks.put(cur.getPriority(), cur.getSink());
        activeSink = liveSinks.get(liveSinks.lastKey());
        logger.debug("Sink {} was recovered from the fail list",
            cur.getSink().getName());
      } else {
        // if it's a backoff it needn't be penalized.
        failedSinks.add(cur);
      }
      return s;
    } catch (Exception e) { // on an exception, increment the sink's failure count and put it back on the failed queue
      cur.incFails();
      failedSinks.add(cur);
    }
  }

  Status ret = null;
  while (activeSink != null) { // keep trying while there is an active sink
    try {
      ret = activeSink.process();
      return ret;
    } catch (Exception e) {
      logger.warn("Sink {} failed and has been sent to failover list",
          activeSink.getName(), e);
      // On an exception, retire the current sink and take the next live one.
      // When no live sink is left the loop exits; the next call to this
      // method will run the failed-sink retry above.
      activeSink = moveActiveToDeadAndGetNext();
    }
  }

  throw new EventDeliveryException("All sinks failed to process, " +
      "nothing left to failover to");
}
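
To see the interplay between the live map and the failed queue in isolation, the following is a toy model, not Flume code. Sinks are reduced to names, the set `down` marks which names currently fail, and the class FailoverSketch, its fields, and the fixed cooldown are all hypothetical simplifications (Flume's FailedSink computes its own refresh time from the failure count, which is not reproduced here).

```java
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeMap;

// Toy model of priority-based failover; names are illustrative, not Flume's.
class FailoverSketch {
  // A failed entry remembers its priority and the earliest time it may be retried.
  static final class Failed implements Comparable<Failed> {
    final int priority;
    final String name;
    final long retryAt;
    Failed(int priority, String name, long retryAt) {
      this.priority = priority;
      this.name = name;
      this.retryAt = retryAt;
    }
    @Override
    public int compareTo(Failed other) {
      return Long.compare(retryAt, other.retryAt); // earliest retry time first
    }
  }

  final TreeMap<Integer, String> live = new TreeMap<>();
  final PriorityQueue<Failed> failed = new PriorityQueue<>();
  final Set<String> down = new HashSet<>(); // names that currently "throw"
  final long cooldownMillis;                // assumed fixed and positive

  FailoverSketch(long cooldownMillis) { this.cooldownMillis = cooldownMillis; }

  void add(int priority, String name) { live.put(priority, name); }

  /** Returns the name of the sink that handled the call at time 'now'. */
  String deliver(long now) {
    // 1. Retry failed sinks whose cooldown has elapsed.
    while (!failed.isEmpty() && failed.peek().retryAt < now) {
      Failed f = failed.poll();
      if (!down.contains(f.name)) {
        live.put(f.priority, f.name); // recovered: back into the live map
        return f.name;
      }
      failed.add(new Failed(f.priority, f.name, now + cooldownMillis));
    }
    // 2. Try live sinks, highest priority first.
    while (!live.isEmpty()) {
      Integer top = live.lastKey();
      String name = live.get(top);
      if (!down.contains(name)) {
        return name;
      }
      live.remove(top); // failed: move to the failed queue
      failed.add(new Failed(top, name, now + cooldownMillis));
    }
    throw new IllegalStateException("all sinks failed");
  }
}
```

With sinks k1 (priority 5) and k2 (priority 10), calls go to k2 until it fails, then to k1, and return to k2 only on the first call after k2's cooldown has passed and k2 works again.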

LoadBalancingSinkProcessor

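When a SinkGroup's processor.type is set to load_balance, load is balanced across the Sinks in the group. Load balancing is implemented through the createSinkIterator method of a SinkSelector, which returns an Iterator over the Sinks whose order depends on the strategy. Two strategies are built in, round robin (round_robin) and random (random), and you can implement your own by extending the AbstractSinkSelector class.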

@Override
public Status process() throws EventDeliveryException {
  Status status = null;

  Iterator<Sink> sinkIterator = selector.createSinkIterator(); // the order of sinks in the iterator depends on the load-balancing strategy
  while (sinkIterator.hasNext()) {
    Sink sink = sinkIterator.next();
    try {
      status = sink.process();
      break;
    } catch (Exception ex) {
      selector.informSinkFailed(sink);
      LOGGER.warn("Sink failed to consume event. "
          + "Attempting next sink if available.", ex);
    }
  }

  if (status == null) {
    throw new EventDeliveryException("All configured sinks have failed");
  }

  return status;
}
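
One way a round-robin ordering could be produced is sketched below. This is a standalone illustration, not Flume's selector: the class RoundRobinSketch and its createIterator method are hypothetical names, and the rotation scheme (each call starts one position later) is an assumption about what "round robin" means here.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Illustrative round-robin ordering; not the Flume SinkSelector implementation.
class RoundRobinSketch<T> {
  private final List<T> items;
  private int next = 0; // index the next iterator will start from

  RoundRobinSketch(List<T> items) {
    this.items = new ArrayList<>(items); // defensive copy
  }

  /** Returns all items, rotated so that each call starts one position later. */
  Iterator<T> createIterator() {
    List<T> order = new ArrayList<>(items.size());
    for (int i = 0; i < items.size(); i++) {
      order.add(items.get((next + i) % items.size()));
    }
    if (!items.isEmpty()) {
      next = (next + 1) % items.size();
    }
    return order.iterator();
  }
}
```

Because every iterator still contains all items, a caller that loops over it the way process does above falls through to the remaining sinks when the first one throws.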

Sink

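The individual Sink implementations are not analyzed in detail here. You can write a custom Sink by extending the AbstractSink class; inside the Sink's process method, reads from the Channel must happen within a transaction. Below is the process method of HiveSink.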

public Status process() throws EventDeliveryException {
  // writers used in this Txn

  Channel channel = getChannel();
  Transaction transaction = channel.getTransaction(); // obtain the transaction
  transaction.begin(); // begin the transaction
  boolean success = false;
  try {
    // 1 Enable Heart Beats
    if (timeToSendHeartBeat.compareAndSet(true, false)) {
      enableHeartBeatOnAllWriters();
    }

    // 2 Drain Batch
    int txnEventCount = drainOneBatch(channel); // process one batch of events
    transaction.commit(); // commit the transaction
    success = true;

    // 3 Update Counters
    if (txnEventCount < 1) {
      return Status.BACKOFF;
    } else {
      return Status.READY;
    }
  } catch (InterruptedException err) {
    LOG.warn(getName() + ": Thread was interrupted.", err);
    return Status.BACKOFF;
  } catch (Exception e) {
    throw new EventDeliveryException(e);
  } finally {
    if (!success) {
      transaction.rollback(); // roll back the transaction
    }
    transaction.close(); // close the transaction
  }
}
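
The commit-or-rollback shape of that method can be tried out without Flume on the classpath. The sketch below uses an in-memory stand-in: ToyChannel, Writer, and drain are hypothetical names invented for this example and do not mirror the real Channel or Transaction interfaces. It only shows that events taken during a failed batch are returned to the queue, while a committed batch is gone for good.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of "take a batch inside a transaction"; not the Flume API.
class TxnDrainSketch {
  enum Status { READY, BACKOFF }

  // In-memory stand-in for a channel with a single open transaction.
  static final class ToyChannel {
    final Deque<String> queue = new ArrayDeque<>();
    private final List<String> taken = new ArrayList<>();

    String take() {
      String event = queue.poll();
      if (event != null) {
        taken.add(event); // remember it until commit or rollback
      }
      return event;
    }

    void commit() { taken.clear(); }

    void rollback() {
      // Put the taken events back at the head, preserving their order.
      for (int i = taken.size() - 1; i >= 0; i--) {
        queue.addFirst(taken.get(i));
      }
      taken.clear();
    }
  }

  // Hypothetical destination that may fail while writing.
  interface Writer { void write(String event) throws Exception; }

  static Status drain(ToyChannel channel, Writer out, int batchSize) {
    boolean success = false;
    try {
      int count = 0;
      while (count < batchSize) {
        String event = channel.take();
        if (event == null) {
          break; // channel is empty
        }
        out.write(event);
        count++;
      }
      channel.commit();
      success = true;
      return count < 1 ? Status.BACKOFF : Status.READY;
    } catch (Exception e) {
      throw new IllegalStateException(e);
    } finally {
      if (!success) {
        channel.rollback(); // undo the takes of the failed batch
      }
    }
  }
}
```

As in the HiveSink method, an empty batch is reported as BACKOFF so the caller can slow down, and the success flag in the finally block ensures a rollback on every failure path.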