Flume Series (Part 3): Flume Source Code Analysis - Flume Source

SourceRunner

As covered in the previous article, what a Flume node starts for each configured source is a SourceRunner. SourceRunner has two implementations: PollableSourceRunner and EventDrivenSourceRunner.
PollableSourceRunner's start() method launches a thread that keeps calling the PollableSource's process() method in a while loop. EventDrivenSourceRunner's start() method, in contrast, only calls the EventDrivenSource's start() method once; after that the source delivers events on its own.

public static class PollingRunner implements Runnable {

private PollableSource source;
private AtomicBoolean shouldStop;
private CounterGroup counterGroup;

@Override
public void run() {
logger.debug("Polling runner starting. Source:{}", source);

while (!shouldStop.get()) { // keep invoking the source's process() method until asked to stop
counterGroup.incrementAndGet("runner.polls");

try {
// Call source.process() and check the result; if it returns BACKOFF, sleep for a while.
// The sleep time is the minimum of maxBackoffSleep (default 5000 ms) and
// consecutive * backoffSleepIncrement (default 1000 ms), where the counter
// runner.backoffs.consecutive records the number of consecutive backoffs.
if (source.process().equals(PollableSource.Status.BACKOFF)) {
counterGroup.incrementAndGet("runner.backoffs");

Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
logger.info("Source runner interrupted. Exiting");
counterGroup.incrementAndGet("runner.interruptions");
} catch (EventDeliveryException e) {
logger.error("Unable to deliver event. Exception follows.", e);
counterGroup.incrementAndGet("runner.deliveryErrors");
} catch (Exception e) {
counterGroup.incrementAndGet("runner.errors");
logger.error("Unhandled exception, logging and sleeping for " +
source.getMaxBackOffSleepInterval() + "ms", e);
try {
Thread.sleep(source.getMaxBackOffSleepInterval());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}

logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
}

}
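
For comparison, EventDrivenSourceRunner.start() contains no loop at all: it initializes the ChannelProcessor and calls the source's start() method once, after which the source pushes events by itself (for example from a Jetty handler or an Avro RPC callback). Below is a simplified sketch of what the class does, not the verbatim Flume code; details such as lifecycle state handling are omitted.

// Simplified sketch of EventDrivenSourceRunner (lifecycle state handling omitted).
public class EventDrivenSourceRunner extends SourceRunner {

  @Override
  public void start() {
    Source source = getSource();
    // initialize the ChannelProcessor (and its interceptors) before any event can arrive
    source.getChannelProcessor().initialize();
    source.start(); // called exactly once; from here on the source drives event delivery itself
  }

  @Override
  public void stop() {
    Source source = getSource();
    source.stop();
    source.getChannelProcessor().close();
  }
}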

HTTPSource

HTTPSource is an implementation of EventDrivenSource. Its start() method launches an embedded Jetty server; incoming events are received in doPost() and handed over to the channel. Let's look at the doPost() method of FlumeHTTPServlet:

public void doPost(HttpServletRequest request, HttpServletResponse response)
throws IOException {
List<Event> events = Collections.emptyList(); //create empty list
try {
events = handler.getEvents(request); // extract the events from the request
} catch (HTTPBadRequestException ex) {
LOG.warn("Received bad request from client. ", ex);
response.sendError(HttpServletResponse.SC_BAD_REQUEST,
"Bad request from client. "
+ ex.getMessage());
return;
} catch (Exception ex) {
LOG.warn("Deserializer threw unexpected exception. ", ex);
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Deserializer threw unexpected exception. "
+ ex.getMessage());
return;
}
sourceCounter.incrementAppendBatchReceivedCount();
sourceCounter.addToEventReceivedCount(events.size());
try {
getChannelProcessor().processEventBatch(events); // hand the events to the channel(s) via the ChannelProcessor
} catch (ChannelException ex) { // putting the events into the channel failed; return an error response
LOG.warn("Error appending event to channel. "
+ "Channel might be full. Consider increasing the channel "
+ "capacity or make sure the sinks perform faster.", ex);
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
"Error appending event to channel. Channel might be full."
+ ex.getMessage());
return;
} catch (Exception ex) {
LOG.warn("Unexpected error appending event to channel. ", ex);
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Unexpected error while appending event to channel. "
+ ex.getMessage());
return;
}
response.setCharacterEncoding(request.getCharacterEncoding());
response.setStatus(HttpServletResponse.SC_OK);
response.flushBuffer();
sourceCounter.incrementAppendBatchAcceptedCount();
sourceCounter.addToEventAcceptedCount(events.size());
}
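
The handler.getEvents(request) call above delegates to an HTTPSourceHandler (the default handler is JSONHandler). As a rough illustration of how a custom handler could be written, here is a sketch that turns each non-empty line of the request body into one event; the class name and the line-per-event convention are my own assumptions, not part of Flume:

import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.servlet.http.HttpServletRequest;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.http.HTTPBadRequestException;
import org.apache.flume.source.http.HTTPSourceHandler;

// Illustrative custom HTTPSourceHandler: one event per non-empty request-body line.
public class LinePerEventHandler implements HTTPSourceHandler {

  @Override
  public List<Event> getEvents(HttpServletRequest request)
      throws HTTPBadRequestException, Exception {
    List<Event> events = new ArrayList<Event>();
    BufferedReader reader = request.getReader();
    String line;
    while ((line = reader.readLine()) != null) {
      if (!line.isEmpty()) {
        // each non-empty line becomes one Flume event
        events.add(EventBuilder.withBody(line.getBytes(StandardCharsets.UTF_8)));
      }
    }
    return events;
  }

  @Override
  public void configure(Context context) {
    // nothing to configure in this sketch
  }
}

A handler like this would be plugged in through the source's handler property in the agent configuration.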

KafkaSource

KafkaSource is an implementation of PollableSource. Its doProcess() method pulls data from Kafka, sends it to the channel through the ChannelProcessor, and commits the Kafka offsets once the batch has been handled.

protected Status doProcess() throws EventDeliveryException {
final String batchUUID = UUID.randomUUID().toString();
byte[] kafkaMessage;
String kafkaKey;
Event event;
byte[] eventBody;

try {
// prepare time variables for new batch
final long nanoBatchStartTime = System.nanoTime();
final long batchStartTime = System.currentTimeMillis();
final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;

while (eventList.size() < batchUpperLimit && // stop once the batch reaches batchSize
System.currentTimeMillis() < maxBatchEndTime) { // or once batchDurationMillis has elapsed

if (it == null || !it.hasNext()) { // the ConsumerRecords from the last poll are exhausted, so poll Kafka again
// Obtaining new records
// Poll time is remainder time for current batch.
ConsumerRecords<String, byte[]> records = consumer.poll(
Math.max(0, maxBatchEndTime - System.currentTimeMillis()));
it = records.iterator();

// this flag is set to true in a callback when some partitions are revoked.
// If there are any records we commit them.
// rebalanceFlag is set to true by SourceRebalanceListener while Kafka is rebalancing partitions
if (rebalanceFlag.get()) {
rebalanceFlag.set(false);
break;
}
// check records after poll
if (!it.hasNext()) {
if (log.isDebugEnabled()) {
counter.incrementKafkaEmptyCount();
log.debug("Returning with backoff. No more data to read");
}
// batch time exceeded
break;
}
}

// get next message
ConsumerRecord<String, byte[]> message = it.next();
kafkaKey = message.key();
kafkaMessage = message.value();

if (useAvroEventFormat) {
//Assume the event is in Avro format using the AvroFlumeEvent schema
//Will need to catch the exception if it is not
ByteArrayInputStream in =
new ByteArrayInputStream(message.value());
decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
if (!reader.isPresent()) {
reader = Optional.of(
new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
}
//This may throw an exception but it will be caught by the
//exception handler below and logged at error
AvroFlumeEvent avroevent = reader.get().read(null, decoder);

eventBody = avroevent.getBody().array();
headers = toStringMap(avroevent.getHeaders());
} else {
eventBody = message.value();
headers.clear();
headers = new HashMap<String, String>(4);
}

// Add headers to event (timestamp, topic, partition, key) only if they don't exist
if (!headers.containsKey(KafkaSourceConstants.TIMESTAMP_HEADER)) {
headers.put(KafkaSourceConstants.TIMESTAMP_HEADER,
String.valueOf(System.currentTimeMillis()));
}
// Only set the topic header if setTopicHeader and it isn't already populated
if (setTopicHeader && !headers.containsKey(topicHeader)) {
headers.put(topicHeader, message.topic());
}
if (!headers.containsKey(KafkaSourceConstants.PARTITION_HEADER)) {
headers.put(KafkaSourceConstants.PARTITION_HEADER,
String.valueOf(message.partition()));
}

if (kafkaKey != null) {
headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey);
}

if (log.isTraceEnabled()) {
if (LogPrivacyUtil.allowLogRawData()) {
log.trace("Topic: {} Partition: {} Message: {}", new String[]{
message.topic(),
String.valueOf(message.partition()),
new String(eventBody)
});
} else {
log.trace("Topic: {} Partition: {} Message arrived.",
message.topic(),
String.valueOf(message.partition()));
}
}

event = EventBuilder.withBody(eventBody, headers);
eventList.add(event); // append the event to eventList

if (log.isDebugEnabled()) {
log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
log.debug("Event #: {}", eventList.size());
}

// For each partition store next offset that is going to be read.
tpAndOffsetMetadata.put(new TopicPartition(message.topic(), message.partition()),
new OffsetAndMetadata(message.offset() + 1, batchUUID));
}

if (eventList.size() > 0) {
counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
counter.addToEventReceivedCount((long) eventList.size());
getChannelProcessor().processEventBatch(eventList); // hand eventList over to the channel(s)
counter.addToEventAcceptedCount(eventList.size());
if (log.isDebugEnabled()) {
log.debug("Wrote {} events to channel", eventList.size());
}
eventList.clear(); // reset eventList for the next batch

if (!tpAndOffsetMetadata.isEmpty()) { // commit the Kafka offsets
long commitStartTime = System.nanoTime();
// TODO
// There is a flaw here: if commitSync fails, the same data will be consumed again.
// The eventList has already been handed to the channel, but because the offsets were not
// committed successfully, Kafka will re-deliver those records on the next poll (duplicates).
consumer.commitSync(tpAndOffsetMetadata);
long commitEndTime = System.nanoTime();
counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
tpAndOffsetMetadata.clear(); // reset tpAndOffsetMetadata
}
return Status.READY; // the batch succeeded; return READY
}

return Status.BACKOFF;
} catch (Exception e) { // if processEventBatch fails, the offsets are not committed and the same data will be consumed again
log.error("KafkaSource EXCEPTION, {}", e);
return Status.BACKOFF; // on failure return BACKOFF so that PollableSourceRunner backs off for a while
}
}
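
To tie this back to the PollingRunner loop from the beginning of the article: KafkaSource builds on the AbstractPollableSource template, and the READY/BACKOFF status returned by doProcess() is exactly what drives the sleep logic in that loop. A bare-bones custom pollable source might look roughly like this; it is only a sketch, and the class name, queue field, and configuration are made up for illustration:

import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flume.Context;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.FlumeException;
import org.apache.flume.event.EventBuilder;
import org.apache.flume.source.AbstractPollableSource;

// Minimal sketch of a custom PollableSource; MyQueueSource and its queue are illustrative only.
public class MyQueueSource extends AbstractPollableSource {

  private BlockingQueue<String> queue;

  @Override
  protected void doConfigure(Context context) throws FlumeException {
    queue = new LinkedBlockingQueue<String>(); // stand-in for a real data source
  }

  @Override
  protected void doStart() throws FlumeException { }

  @Override
  protected void doStop() throws FlumeException { }

  @Override
  protected Status doProcess() throws EventDeliveryException {
    String msg = queue.poll();
    if (msg == null) {
      // nothing to read: BACKOFF makes PollingRunner sleep before calling process() again
      return Status.BACKOFF;
    }
    getChannelProcessor().processEvent(
        EventBuilder.withBody(msg.getBytes(StandardCharsets.UTF_8)));
    return Status.READY; // data was delivered; poll again right away
  }
}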

ChannelProcessor

Inside a Source, events are delivered to the channels through ChannelProcessor.processEventBatch()/processEvent(). These methods use a channel transaction to guarantee consistency: within a single call, either all of the events are put into a channel or none of them are. Flume's transaction mechanism will be analyzed in detail in a later article. A ChannelProcessor also contains a ChannelSelector and an InterceptorChain.
The ChannelSelector decides, based on an event's data, which channels the event is sent to; see http://flume.apache.org/FlumeUserGuide.html#flume-channel-selectors
An Interceptor can modify or drop events before they are sent to the channel; see http://flume.apache.org/FlumeUserGuide.html#flume-interceptors

InterceptorChain

A custom Interceptor implements the intercept methods to modify or drop events. One limitation is that "public Event intercept(Event event);" cannot turn one event into multiple events. A problem I ran into in a project was that each event consumed by the Kafka source was a JSON array string, and every array element had to become a separate event before being written out. My workaround was to modify the KafkaSource code and add a Transform object there to do the conversion (a sketch of this idea follows the Interceptor interface below).

class InterceptorChain:
@Override
public Event intercept(Event event) {
for (Interceptor interceptor : interceptors) {
if (event == null) {
return null;
}
event = interceptor.intercept(event); // let each interceptor in turn process the event
}
return event;
}

@Override
public List<Event> intercept(List<Event> events) {
for (Interceptor interceptor : interceptors) {
if (events.isEmpty()) {
return events;
}
events = interceptor.intercept(events); // let each interceptor in turn process the batch
Preconditions.checkNotNull(events,
"Event list returned null from interceptor %s", interceptor);
}
return events;
}
interface Interceptor:
public Event intercept(Event event); // returning null drops the event
public List<Event> intercept(List<Event> events);
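
Returning to the JSON-array problem mentioned above, a sketch of that workaround could look like the following. The Transform interface and the JsonArraySplitTransform class are my own names for code added to a modified KafkaSource, not a Flume API, and the sketch assumes Gson is available on the agent's classpath:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonParser;
import org.apache.flume.Event;
import org.apache.flume.event.EventBuilder;

// Hypothetical hook added inside a modified KafkaSource; not part of Flume.
interface Transform {
  List<Event> transform(Event event);
}

// Splits an event whose body is a JSON array string into one event per array element.
class JsonArraySplitTransform implements Transform {
  @Override
  public List<Event> transform(Event event) {
    List<Event> out = new ArrayList<Event>();
    String body = new String(event.getBody(), StandardCharsets.UTF_8);
    JsonArray array = new JsonParser().parse(body).getAsJsonArray();
    for (JsonElement element : array) {
      // each array element becomes its own event, keeping the original headers
      out.add(EventBuilder.withBody(
          element.toString().getBytes(StandardCharsets.UTF_8), event.getHeaders()));
    }
    return out;
  }
}

In the modified KafkaSource, each consumed message would be run through this transform before its events are appended to eventList.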

ChannelSelector

A ChannelSelector subclass implements the two methods below, returning for a given event the required channels (RequiredChannels) and the optional channels (OptionalChannels).
For a required channel, any exception raised while the source is putting the event into the channel must be reported back to the source (the exception is rethrown to the source).
For an optional channel, exceptions raised during the put are not propagated to the source (they are only logged); Errors, however, are still rethrown to the source.

public List<Channel> getRequiredChannels(Event event);
public List<Channel> getOptionalChannels(Event event);
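
As an illustration, a minimal custom selector could route each event to the channel named by one of its headers. This is only a sketch: it assumes AbstractChannelSelector as the base class (which the built-in replicating and multiplexing selectors also extend), and the "route" header name is made up:

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.channel.AbstractChannelSelector;

// Illustrative header-based selector; the "route" header is a made-up convention.
public class HeaderRoutingChannelSelector extends AbstractChannelSelector {

  private Map<String, Channel> channelByName;

  @Override
  public void configure(Context context) {
    // setChannels() has already been called by the time configure() runs
    channelByName = new HashMap<String, Channel>();
    for (Channel channel : getAllChannels()) {
      channelByName.put(channel.getName(), channel);
    }
  }

  @Override
  public List<Channel> getRequiredChannels(Event event) {
    String route = event.getHeaders().get("route");
    Channel channel = (route == null) ? null : channelByName.get(route);
    // fall back to every configured channel if the header does not name a known one
    return (channel == null) ? getAllChannels() : Collections.singletonList(channel);
  }

  @Override
  public List<Channel> getOptionalChannels(Event event) {
    return Collections.emptyList(); // this sketch uses no optional channels
  }
}

The processEventBatch() method shown next groups events per channel according to exactly these two lists.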

ChannelProcessor.processEventBatch

public void processEventBatch(List<Event> events) {
Preconditions.checkNotNull(events, "Event list must not be null");

events = interceptorChain.intercept(events); // run the events through the interceptor chain first

Map<Channel, List<Event>> reqChannelQueue =
new LinkedHashMap<Channel, List<Event>>();

Map<Channel, List<Event>> optChannelQueue =
new LinkedHashMap<Channel, List<Event>>();

for (Event event : events) {
List<Channel> reqChannels = selector.getRequiredChannels(event); // the required channels for this event

for (Channel ch : reqChannels) { // group the events per required channel
List<Event> eventQueue = reqChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
reqChannelQueue.put(ch, eventQueue);
}
eventQueue.add(event);
}

List<Channel> optChannels = selector.getOptionalChannels(event); // the optional channels for this event

for (Channel ch : optChannels) { // group the events per optional channel
List<Event> eventQueue = optChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
optChannelQueue.put(ch, eventQueue);
}

eventQueue.add(event);
}
}

// Process required channels
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin(); // begin the transaction

List<Event> batch = reqChannelQueue.get(reqChannel); // the events destined for this channel

for (Event event : batch) {
reqChannel.put(event); // put the event into the channel
}

tx.commit(); // all events made it into the channel; commit the transaction
} catch (Throwable t) {
tx.rollback(); // something went wrong: roll back the transaction and rethrow so the source can handle it
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close(); // close the transaction
}
}
}

// Process optional channels
for (Channel optChannel : optChannelQueue.keySet()) {
Transaction tx = optChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin(); // begin the transaction

List<Event> batch = optChannelQueue.get(optChannel); // the events destined for this channel

for (Event event : batch) {
optChannel.put(event); // put the event into the channel
}

tx.commit(); // all events made it into the channel; commit the transaction
} catch (Throwable t) {
tx.rollback(); // something went wrong: roll back the transaction
LOG.error("Unable to put batch on optional channel: " + optChannel, t);
// only Errors are rethrown to the source here; any other exception is just logged
if (t instanceof Error) {
throw (Error) t;
}
} finally {
if (tx != null) {
tx.close(); // close the transaction
}
}
}
}