wForget's blog



Flume Series, Part 5: Source Code Analysis of the Flume Channel

Posted on 2018-09-25

Flume Channel

AbstractConfigurationProvider's loadChannels() method loads the Channel settings from the configuration file. loadChannels() calls getOrCreateChannel(), which uses DefaultChannelFactory.create() to instantiate a different Channel implementation depending on the configured type. The Channels currently shipped with Flume are: Memory Channel, JDBC Channel, Kafka Channel, File Channel, and Spillable Memory Channel (events are kept in memory and spill to disk on overflow; still experimental and not recommended for production). For configuration details see: http://flume.apache.org/FlumeUserGuide.html#flume-channels.
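As a rough illustration of what that factory path amounts to, the sketch below creates and configures a Memory Channel by hand instead of going through DefaultChannelFactory. The property keys are the standard Memory Channel ones and the classes come from flume-ng-core; this is only a self-contained approximation of what loadChannels() ends up doing.

import org.apache.flume.Context;
import org.apache.flume.channel.MemoryChannel;
import org.apache.flume.conf.Configurables;

public class ChannelCreationSketch {
    public static void main(String[] args) {
        // Build a Context the way AbstractConfigurationProvider would from the properties file.
        Context context = new Context();
        context.put("capacity", "100");
        context.put("transactionCapacity", "10");

        // DefaultChannelFactory would pick the class from the "type" property
        // ("memory" -> MemoryChannel); here we instantiate it directly.
        MemoryChannel channel = new MemoryChannel();
        channel.setName("memoryChannel");
        Configurables.configure(channel, context);
        channel.start();

        System.out.println("channel state: " + channel.getLifecycleState());
        channel.stop();
    }
}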

Memory Channel

Here we walk through the Memory Channel source in detail to see how the channel's transaction mechanism is implemented.

private Object queueLock = new Object();		// guards access to queue
@GuardedBy(value = "queueLock")
private LinkedBlockingDeque<Event> queue; // the queue holding the channel's events

private Semaphore queueStored; // coordinates put and take: starts at 0, committed puts release permits, takes tryAcquire them

private Semaphore queueRemaining; // caps the number of events in the channel, initialized from the capacity setting

private Semaphore bytesRemaining; // caps the total bytes in the channel, initialized from the byteCapacity setting

MemoryTransaction

From the earlier articles we know that a Source puts events into a Channel through ChannelProcessor.processEventBatch(), and a Sink takes events from the Channel in its process() method. The Channel's concrete event operations are implemented by MemoryTransaction, which defines two LinkedBlockingDeques (blocking double-ended queues), takeList and putList, to buffer the events taken or put within a single transaction. The putByteCounter and takeByteCounter fields track the byte sizes of those events.
A Channel transaction defines four operations: put, take, commit, and rollback. Concretely:
put appends the event to putList;
take moves an event from queue into takeList and returns it;
commit moves the events in putList into queue and clears takeList;
rollback puts the events in takeList back into queue and clears putList.
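Before diving into MemoryTransaction itself, it helps to see how these four operations surface through the public Transaction API. A minimal sketch follows (it assumes the channel has already been configured and started; the comments map each call to the internals described above):

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.Transaction;
import org.apache.flume.event.EventBuilder;

public class ChannelTransactionSketch {
    // Put one event and take it back, going through the Transaction API that
    // MemoryTransaction implements underneath.
    public static void roundTrip(Channel channel) {
        Transaction putTx = channel.getTransaction();
        putTx.begin();
        try {
            channel.put(EventBuilder.withBody("hello".getBytes())); // lands in putList
            putTx.commit();                                          // commit: putList -> queue
        } catch (RuntimeException e) {                               // e.g. ChannelException when the channel is full
            putTx.rollback();
            throw e;
        } finally {
            putTx.close();
        }

        Transaction takeTx = channel.getTransaction();
        takeTx.begin();
        try {
            Event event = channel.take();                            // take: queue -> takeList
            System.out.println(event == null ? "empty" : new String(event.getBody()));
            takeTx.commit();                                         // commit: takeList is discarded
        } catch (RuntimeException e) {
            takeTx.rollback();                                       // rollback: takeList goes back into queue
            throw e;
        } finally {
            takeTx.close();
        }
    }
}

The MemoryTransaction implementation that backs these calls is shown below.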

private class MemoryTransaction extends BasicTransactionSemantics {
private LinkedBlockingDeque<Event> takeList; // buffers the events taken in this transaction
private LinkedBlockingDeque<Event> putList; // buffers the events put in this transaction
private final ChannelCounter channelCounter; // channel counter (for monitoring)
private int putByteCounter = 0; // total bytes of the events put in this transaction
private int takeByteCounter = 0; // total bytes of the events taken in this transaction

public MemoryTransaction(int transCapacity, ChannelCounter counter) {
// transCapacity is the maximum number of events a single transaction may handle
putList = new LinkedBlockingDeque<Event>(transCapacity);
takeList = new LinkedBlockingDeque<Event>(transCapacity);

channelCounter = counter;
}

// put appends the event to putList
@Override
protected void doPut(Event event) throws InterruptedException {
channelCounter.incrementEventPutAttemptCount();
int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

if (!putList.offer(event)) { // offer the event to putList; throw if it is full
throw new ChannelException(
"Put queue for MemoryTransaction of capacity " +
putList.size() + " full, consider committing more frequently, " +
"increasing capacity or increasing thread count");
}
putByteCounter += eventByteSize;
}

// take moves an event from the channel's event queue into takeList
@Override
protected Event doTake() throws InterruptedException {
channelCounter.incrementEventTakeAttemptCount();
if (takeList.remainingCapacity() == 0) { // throw if takeList is full
throw new ChannelException("Take list for MemoryTransaction, capacity " +
takeList.size() + " full, consider committing more frequently, " +
"increasing capacity, or increasing thread count");
}
if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) { // request a permit (an event must be available)
return null;
}
Event event;
synchronized (queueLock) { // lock the queue and poll one event
event = queue.poll();
}
Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
"signalling existence of entry");
takeList.put(event); // remember the event in takeList

int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
takeByteCounter += eventByteSize;

return event;
}

// commit moves the events in putList into queue and clears takeList
@Override
protected void doCommit() throws InterruptedException {
int remainingChange = takeList.size() - putList.size();
if (remainingChange < 0) {
// check whether putting these events would push the channel past byteCapacity
if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
throw new ChannelException("Cannot commit transaction. Byte capacity " +
"allocated to store event body " + byteCapacity * byteCapacitySlotSize +
"reached. Please increase heap space/byte capacity allocated to " +
"the channel as the sinks may not be keeping up with the sources");
}
// check whether committing this transaction would push the number of events in the channel past capacity
if (!queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS)) {
bytesRemaining.release(putByteCounter);
throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
" Sinks are likely not keeping up with sources, or the buffer size is too tight");
}
}
int puts = putList.size();
int takes = takeList.size();
synchronized (queueLock) { // move the events in putList into queue and clear takeList
if (puts > 0) {
while (!putList.isEmpty()) {
if (!queue.offer(putList.removeFirst())) {
throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
}
}
}
putList.clear();
takeList.clear();
}
bytesRemaining.release(takeByteCounter);
takeByteCounter = 0;
putByteCounter = 0;

queueStored.release(puts);
if (remainingChange > 0) {
queueRemaining.release(remainingChange);
}
if (puts > 0) {
channelCounter.addToEventPutSuccessCount(puts);
}
if (takes > 0) {
channelCounter.addToEventTakeSuccessCount(takes);
}

channelCounter.setChannelSize(queue.size());
}

// rollback puts the events in takeList back into queue and clears putList
@Override
protected void doRollback() {
int takes = takeList.size();
synchronized (queueLock) {
Preconditions.checkState(queue.remainingCapacity() >= takeList.size(),
"Not enough space in memory channel " +
"queue to rollback takes. This should never happen, please report");
while (!takeList.isEmpty()) {
queue.addFirst(takeList.removeLast());
}
putList.clear();
}
putByteCounter = 0;
takeByteCounter = 0;

queueStored.release(takes);
channelCounter.setChannelSize(queue.size());
}

}

Flume Series, Part 4: Source Code Analysis of the Flume Sink

Posted on 2018-09-25

SinkRunner

SinkRunner is the entry point for starting the Sink component. Its start() method launches a thread running a PollingRunner Runnable; this PollingRunner works the same way as the one in PollableSourceRunner described in the previous article, calling the SinkProcessor's process() method in a loop.

SinkProcessor

The Sink side has the concept of a SinkGroup: several Sinks are placed in one group and assigned a SinkProcessor, which enables failover or load balancing (load_balance) across them.
SinkProcessor has three implementations: DefaultSinkProcessor, FailoverSinkProcessor, and LoadBalancingSinkProcessor.

DefaultSinkProcessor

When no SinkGroup is configured, a SinkRunner is started for each Sink with a DefaultSinkProcessor. DefaultSinkProcessor accepts exactly one Sink, and its process() method simply delegates to that Sink's process() method.

@Override
public Status process() throws EventDeliveryException {
return sink.process();
}

@Override
public void setSinks(List<Sink> sinks) { // set when AbstractConfigurationProvider.loadSinkGroups() loads the configuration
Preconditions.checkNotNull(sinks);
Preconditions.checkArgument(sinks.size() == 1, "DefaultSinkPolicy can "
+ "only handle one sink, "
+ "try using a policy that supports multiple sinks");
sink = sinks.get(0);
}

FailoverSinkProcessor

When a SinkGroup's processor.type is set to failover, the Sinks in the group fail over to one another. FailoverSinkProcessor keeps the healthy Sinks in liveSinks, a TreeMap ordered by priority, and the failed Sinks in the priority queue failedSinks. It first takes the highest-priority healthy Sink and calls its process() method in a loop; when an exception occurs, that Sink is removed from the TreeMap and moved to the failed queue, and the next healthy Sink from the TreeMap takes over. When no healthy Sink is left, the sinks in failedSinks are retried. The FailoverSinkProcessor implementation is shown below.

@Override
public void configure(Context context) { // configuration
liveSinks = new TreeMap<Integer, Sink>(); // healthy sinks, kept ordered by priority
failedSinks = new PriorityQueue<FailedSink>(); // priority queue holding the failed sinks
Integer nextPrio = 0;
String maxPenaltyStr = context.getString(MAX_PENALTY_PREFIX);
if (maxPenaltyStr == null) {
maxPenalty = DEFAULT_MAX_PENALTY;
} else {
try {
maxPenalty = Integer.parseInt(maxPenaltyStr);
} catch (NumberFormatException e) {
logger.warn("{} is not a valid value for {}",
new Object[] { maxPenaltyStr, MAX_PENALTY_PREFIX });
maxPenalty = DEFAULT_MAX_PENALTY;
}
}
for (Entry<String, Sink> entry : sinks.entrySet()) {
String priStr = PRIORITY_PREFIX + entry.getKey();
Integer priority; // priority used to order the sinks; higher priorities are activated first
try {
priority = Integer.parseInt(context.getString(priStr));
} catch (Exception e) {
priority = --nextPrio; // if no priority is configured, assign decreasing values from 0 in load order
}
if (!liveSinks.containsKey(priority)) {
liveSinks.put(priority, sinks.get(entry.getKey()));
} else {
logger.warn("Sink {} not added to FailverSinkProcessor as priority" +
"duplicates that of sink {}", entry.getKey(),
liveSinks.get(priority));
}
}
activeSink = liveSinks.get(liveSinks.lastKey()); // the sink with the highest priority becomes the active sink
}

@Override
public Status process() throws EventDeliveryException {
// Retry any failed sinks that have gone through their "cooldown" period
Long now = System.currentTimeMillis();
// retry failed sinks
// while failedSinks is non-empty and the head sink's refresh time has passed, retry it
while (!failedSinks.isEmpty() && failedSinks.peek().getRefresh() < now) {
FailedSink cur = failedSinks.poll(); // take the sink at the head of the queue
Status s;
try {
s = cur.getSink().process(); // run the failed sink's process method
if (s == Status.READY) { // if it succeeded, move the sink back into liveSinks and re-pick the active sink
liveSinks.put(cur.getPriority(), cur.getSink());
activeSink = liveSinks.get(liveSinks.lastKey());
logger.debug("Sink {} was recovered from the fail list",
cur.getSink().getName());
} else {
// if it's a backoff it needn't be penalized.
failedSinks.add(cur);
}
return s;
} catch (Exception e) { // on exception, increment the sink's failure count and put it back into the failed queue
cur.incFails();
failedSinks.add(cur);
}
}

Status ret = null;
while (activeSink != null) { // keep calling the active sink's process method while one is available
try {
ret = activeSink.process();
return ret;
} catch (Exception e) {
logger.warn("Sink {} failed and has been sent to failover list",
activeSink.getName(), e);
// on exception, remove the current sink and get the next live one; when none remain the loop exits, and the next call into this method retries the failed sinks above
activeSink = moveActiveToDeadAndGetNext();
}
}

throw new EventDeliveryException("All sinks failed to process, " +
"nothing left to failover to");
}

LoadBalancingSinkProcessor

When a SinkGroup's processor.type is set to load_balance, the Sinks in the group are load balanced. Load balancing is implemented through the SinkSelector's createSinkIterator() method, which produces iterators with different sink orderings; round_robin and random selectors are already provided, and you can implement your own by extending AbstractSinkSelector (a sketch of a custom selector follows the process() method below).

@Override
public Status process() throws EventDeliveryException {
Status status = null;

Iterator<Sink> sinkIterator = selector.createSinkIterator(); // the sink ordering in this iterator depends on the load-balancing strategy
while (sinkIterator.hasNext()) {
Sink sink = sinkIterator.next();
try {
status = sink.process();
break;
} catch (Exception ex) {
selector.informSinkFailed(sink);
LOGGER.warn("Sink failed to consume event. "
+ "Attempting next sink if available.", ex);
}
}

if (status == null) {
throw new EventDeliveryException("All configured sinks have failed");
}

return status;
}
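As a sketch of that extension point, the selector below re-shuffles the sinks for every batch. It assumes AbstractSinkSelector (a public static nested class of LoadBalancingSinkProcessor) supplies the remaining SinkSelector and lifecycle methods, and that your Flume version accepts a fully qualified class name for the processor.selector property; the class name is illustrative.

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.flume.Sink;
import org.apache.flume.sink.LoadBalancingSinkProcessor;

public class ShuffleSinkSelector extends LoadBalancingSinkProcessor.AbstractSinkSelector {

    private List<Sink> sinks = new ArrayList<Sink>();

    @Override
    public void setSinks(List<Sink> sinks) {
        this.sinks = new ArrayList<Sink>(sinks);   // keep our own copy of the group's sinks
    }

    @Override
    public Iterator<Sink> createSinkIterator() {
        List<Sink> copy = new ArrayList<Sink>(sinks);
        Collections.shuffle(copy);                 // a different sink order on every batch
        return copy.iterator();
    }
}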

Sink

We will not analyze individual Sink implementations in detail. A custom Sink can be written by extending AbstractSink; the Sink's process() method must read from the Channel inside a transaction. Below is the process() method of HiveSink.

public Status process() throws EventDeliveryException {
// writers used in this Txn

Channel channel = getChannel();
Transaction transaction = channel.getTransaction(); // get a transaction
transaction.begin(); // begin the transaction
boolean success = false;
try {
// 1 Enable Heart Beats
if (timeToSendHeartBeat.compareAndSet(true, false)) {
enableHeartBeatOnAllWriters();
}

// 2 Drain Batch
int txnEventCount = drainOneBatch(channel); // drain one batch of events from the channel
transaction.commit(); // commit the transaction
success = true;

// 3 Update Counters
if (txnEventCount < 1) {
return Status.BACKOFF;
} else {
return Status.READY;
}
} catch (InterruptedException err) {
LOG.warn(getName() + ": Thread was interrupted.", err);
return Status.BACKOFF;
} catch (Exception e) {
throw new EventDeliveryException(e);
} finally {
if (!success) {
transaction.rollback(); // roll back the transaction
}
transaction.close(); // close the transaction
}
}
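To make the "extend AbstractSink" route concrete, here is a minimal custom sink sketch that just prints event bodies; it follows the same transaction pattern as the HiveSink code above (the class name is illustrative):

import org.apache.flume.Channel;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.sink.AbstractSink;

public class PrintSink extends AbstractSink {

  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction tx = channel.getTransaction();
    tx.begin();
    try {
      Event event = channel.take();          // may be null if the channel is empty
      if (event == null) {
        tx.commit();
        return Status.BACKOFF;               // nothing to do, let the runner back off
      }
      System.out.println(new String(event.getBody()));
      tx.commit();                           // the take is finalized only on commit
      return Status.READY;
    } catch (Throwable t) {
      tx.rollback();                         // put the taken event back into the channel
      if (t instanceof Error) {
        throw (Error) t;
      }
      throw new EventDeliveryException("Failed to deliver event", t);
    } finally {
      tx.close();
    }
  }
}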

Flume Series, Part 3: Source Code Analysis of the Flume Source

Posted on 2018-09-12

SourceRunner

As seen in the previous article, the Flume Node starts SourceRunners. SourceRunner has two implementations: PollableSourceRunner and EventDrivenSourceRunner.
PollableSourceRunner's start() method launches a thread that calls the PollableSource's process() method in a while loop; EventDrivenSourceRunner's start() method just calls the EventDrivenSource's start() method once.

public static class PollingRunner implements Runnable {

private PollableSource source;
private AtomicBoolean shouldStop;
private CounterGroup counterGroup;

@Override
public void run() {
logger.debug("Polling runner starting. Source:{}", source);

while (!shouldStop.get()) { // loop calling the Source's process method
counterGroup.incrementAndGet("runner.polls");

try {
// run source.process() and check the result; when it returns BACKOFF the runner sleeps for a while.
// The sleep time is the minimum of maxBackoffSleep (default 5000 ms) and consecutive * backoffSleepIncrement (default 1000 ms).
// The runner.backoffs.consecutive counter records the number of consecutive backoffs.
if (source.process().equals(PollableSource.Status.BACKOFF)) {
counterGroup.incrementAndGet("runner.backoffs");

Thread.sleep(Math.min(
counterGroup.incrementAndGet("runner.backoffs.consecutive")
* source.getBackOffSleepIncrement(), source.getMaxBackOffSleepInterval()));
} else {
counterGroup.set("runner.backoffs.consecutive", 0L);
}
} catch (InterruptedException e) {
logger.info("Source runner interrupted. Exiting");
counterGroup.incrementAndGet("runner.interruptions");
} catch (EventDeliveryException e) {
logger.error("Unable to deliver event. Exception follows.", e);
counterGroup.incrementAndGet("runner.deliveryErrors");
} catch (Exception e) {
counterGroup.incrementAndGet("runner.errors");
logger.error("Unhandled exception, logging and sleeping for " +
source.getMaxBackOffSleepInterval() + "ms", e);
try {
Thread.sleep(source.getMaxBackOffSleepInterval());
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
}

logger.debug("Polling runner exiting. Metrics:{}", counterGroup);
}

}

HTTPSource

HTTPSource is an implementation of EventDrivenSource. Its start() method starts a Jetty server, and the servlet's doPost() method receives events and sends them to the channel. Below is FlumeHTTPServlet's doPost() method.

public void doPost(HttpServletRequest request, HttpServletResponse response)
throws IOException {
List<Event> events = Collections.emptyList(); //create empty list
try {
events = handler.getEvents(request); // extract the events from the request
} catch (HTTPBadRequestException ex) {
LOG.warn("Received bad request from client. ", ex);
response.sendError(HttpServletResponse.SC_BAD_REQUEST,
"Bad request from client. "
+ ex.getMessage());
return;
} catch (Exception ex) {
LOG.warn("Deserializer threw unexpected exception. ", ex);
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Deserializer threw unexpected exception. "
+ ex.getMessage());
return;
}
sourceCounter.incrementAppendBatchReceivedCount();
sourceCounter.addToEventReceivedCount(events.size());
try {
getChannelProcessor().processEventBatch(events); // hand the events to the ChannelProcessor, which sends them to the channel(s)
} catch (ChannelException ex) { // sending to the channel failed, return an error response
LOG.warn("Error appending event to channel. "
+ "Channel might be full. Consider increasing the channel "
+ "capacity or make sure the sinks perform faster.", ex);
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE,
"Error appending event to channel. Channel might be full."
+ ex.getMessage());
return;
} catch (Exception ex) {
LOG.warn("Unexpected error appending event to channel. ", ex);
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR,
"Unexpected error while appending event to channel. "
+ ex.getMessage());
return;
}
response.setCharacterEncoding(request.getCharacterEncoding());
response.setStatus(HttpServletResponse.SC_OK);
response.flushBuffer();
sourceCounter.incrementAppendBatchAcceptedCount();
sourceCounter.addToEventAcceptedCount(events.size());
}

KafkaSource

KafkaSource is an implementation of PollableSource. Its doProcess() method pulls data from Kafka, sends it to the Channel through the ChannelProcessor, and commits the offsets once processing succeeds.

protected Status doProcess() throws EventDeliveryException {
final String batchUUID = UUID.randomUUID().toString();
byte[] kafkaMessage;
String kafkaKey;
Event event;
byte[] eventBody;

try {
// prepare time variables for new batch
final long nanoBatchStartTime = System.nanoTime();
final long batchStartTime = System.currentTimeMillis();
final long maxBatchEndTime = System.currentTimeMillis() + maxBatchDurationMillis;

while (eventList.size() < batchUpperLimit && // stop once eventList reaches batchSize
System.currentTimeMillis() < maxBatchEndTime) { // or once the batch has lasted longer than batchDurationMillis

if (it == null || !it.hasNext()) { // if the previous poll's ConsumerRecords are exhausted, poll Kafka again
// Obtaining new records
// Poll time is remainder time for current batch.
ConsumerRecords<String, byte[]> records = consumer.poll(
Math.max(0, maxBatchEndTime - System.currentTimeMillis()));
it = records.iterator();

// this flag is set to true in a callback when some partitions are revoked.
// If there are any records we commit them.
// rebalanceFlag is set to true by SourceRebalanceListener while Kafka is rebalancing partitions
if (rebalanceFlag.get()) {
rebalanceFlag.set(false);
break;
}
// check records after poll
if (!it.hasNext()) {
if (log.isDebugEnabled()) {
counter.incrementKafkaEmptyCount();
log.debug("Returning with backoff. No more data to read");
}
// batch time exceeded
break;
}
}

// get next message
ConsumerRecord<String, byte[]> message = it.next();
kafkaKey = message.key();
kafkaMessage = message.value();

if (useAvroEventFormat) {
//Assume the event is in Avro format using the AvroFlumeEvent schema
//Will need to catch the exception if it is not
ByteArrayInputStream in =
new ByteArrayInputStream(message.value());
decoder = DecoderFactory.get().directBinaryDecoder(in, decoder);
if (!reader.isPresent()) {
reader = Optional.of(
new SpecificDatumReader<AvroFlumeEvent>(AvroFlumeEvent.class));
}
//This may throw an exception but it will be caught by the
//exception handler below and logged at error
AvroFlumeEvent avroevent = reader.get().read(null, decoder);

eventBody = avroevent.getBody().array();
headers = toStringMap(avroevent.getHeaders());
} else {
eventBody = message.value();
headers.clear();
headers = new HashMap<String, String>(4);
}

// Add headers to event (timestamp, topic, partition, key) only if they don't exist
if (!headers.containsKey(KafkaSourceConstants.TIMESTAMP_HEADER)) {
headers.put(KafkaSourceConstants.TIMESTAMP_HEADER,
String.valueOf(System.currentTimeMillis()));
}
// Only set the topic header if setTopicHeader and it isn't already populated
if (setTopicHeader && !headers.containsKey(topicHeader)) {
headers.put(topicHeader, message.topic());
}
if (!headers.containsKey(KafkaSourceConstants.PARTITION_HEADER)) {
headers.put(KafkaSourceConstants.PARTITION_HEADER,
String.valueOf(message.partition()));
}

if (kafkaKey != null) {
headers.put(KafkaSourceConstants.KEY_HEADER, kafkaKey);
}

if (log.isTraceEnabled()) {
if (LogPrivacyUtil.allowLogRawData()) {
log.trace("Topic: {} Partition: {} Message: {}", new String[]{
message.topic(),
String.valueOf(message.partition()),
new String(eventBody)
});
} else {
log.trace("Topic: {} Partition: {} Message arrived.",
message.topic(),
String.valueOf(message.partition()));
}
}

event = EventBuilder.withBody(eventBody, headers);
eventList.add(event); // add the event to eventList

if (log.isDebugEnabled()) {
log.debug("Waited: {} ", System.currentTimeMillis() - batchStartTime);
log.debug("Event #: {}", eventList.size());
}

// For each partition store next offset that is going to be read.
// record, for each Kafka partition, the next offset to read
tpAndOffsetMetadata.put(new TopicPartition(message.topic(), message.partition()),
new OffsetAndMetadata(message.offset() + 1, batchUUID));
}

if (eventList.size() > 0) {
counter.addToKafkaEventGetTimer((System.nanoTime() - nanoBatchStartTime) / (1000 * 1000));
counter.addToEventReceivedCount((long) eventList.size());
getChannelProcessor().processEventBatch(eventList); // send eventList to the channel
counter.addToEventAcceptedCount(eventList.size());
if (log.isDebugEnabled()) {
log.debug("Wrote {} events to channel", eventList.size());
}
eventList.clear(); // clear eventList

if (!tpAndOffsetMetadata.isEmpty()) { // commit the Kafka offsets
long commitStartTime = System.nanoTime();
// TODO
// There is a bug here: if commitSync fails, events are consumed twice.
// eventList has already been handed to the channel, but the offsets were not committed, so the next poll re-reads the same data from Kafka.
consumer.commitSync(tpAndOffsetMetadata);
long commitEndTime = System.nanoTime();
counter.addToKafkaCommitTimer((commitEndTime - commitStartTime) / (1000 * 1000));
tpAndOffsetMetadata.clear(); // clear tpAndOffsetMetadata
}
return Status.READY; // return READY on success
}

return Status.BACKOFF;
} catch (Exception e) { // if processEventBatch fails to send to the channel, the offsets are not committed and the data will be consumed again
log.error("KafkaSource EXCEPTION, {}", e);
return Status.BACKOFF; // return BACKOFF on failure so PollableSourceRunner sleeps for a while
}
}

ChannelProcessor

A Source sends events to channels through ChannelProcessor.processEventBatch()/processEvent(). These methods rely on the transaction mechanism to guarantee consistency: a single processEventBatch()/processEvent() call either puts all of the events into a channel or none of them (Flume's transactions are analyzed in more detail elsewhere in this series). ChannelProcessor also contains a ChannelSelector and an InterceptorChain.
The ChannelSelector decides, based on the event, which channels the event goes to; see: http://flume.apache.org/FlumeUserGuide.html#flume-channel-selectors.
Interceptors can modify or drop events before they are sent to a channel; see: http://flume.apache.org/FlumeUserGuide.html#flume-interceptors.

InterceptorChain

A custom Interceptor implements the intercept methods to modify or drop events. One limitation is that "public Event intercept(Event event);" cannot turn one event into several. In one of my projects, the events consumed by the Kafka source were JSON array strings that had to be split into individual events before being written out; the workaround was to modify the KafkaSource code and add a Transform object there to do the conversion.

class InterceptorChain:
@Override
public Event intercept(Event event) {
for (Interceptor interceptor : interceptors) {
if (event == null) {
return null;
}
event = interceptor.intercept(event); // let each interceptor process the event
}
return event;
}

@Override
public List<Event> intercept(List<Event> events) {
for (Interceptor interceptor : interceptors) {
if (events.isEmpty()) {
return events;
}
events = interceptor.intercept(events); // let each interceptor process the events
Preconditions.checkNotNull(events,
"Event list returned null from interceptor %s", interceptor);
}
return events;
}
interface Interceptor:
public Event intercept(Event event); // return null to drop the event
public List<Event> intercept(List<Event> events);
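As a concrete example of that contract, the interceptor sketch below drops events with an empty body and tags the rest with a header (the class name and header key are illustrative; the nested Builder is how Flume instantiates interceptors from configuration):

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

public class NonEmptyInterceptor implements Interceptor {

  @Override
  public void initialize() {
    // nothing to set up
  }

  @Override
  public Event intercept(Event event) {
    byte[] body = event.getBody();
    if (body == null || body.length == 0) {
      return null;                                  // returning null drops the event
    }
    event.getHeaders().put("checked", "true");      // illustrative header
    return event;
  }

  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> out = new ArrayList<Event>(events.size());
    for (Event event : events) {
      Event intercepted = intercept(event);
      if (intercepted != null) {                    // dropped events are simply omitted
        out.add(intercepted);
      }
    }
    return out;
  }

  @Override
  public void close() {
    // nothing to clean up
  }

  public static class Builder implements Interceptor.Builder {
    @Override
    public Interceptor build() {
      return new NonEmptyInterceptor();
    }

    @Override
    public void configure(Context context) {
      // no properties needed for this sketch
    }
  }
}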

ChannelSelector

A ChannelSelector subclass implements the two methods below, returning for a given event the required channels (RequiredChannels) and the optional channels (OptionalChannels).
For RequiredChannels, an exception that occurs while the source is sending the event to the channel must be reported back to the source (an exception is thrown to the source).
For OptionalChannels, such a failure is not reported back to the source (no exception is thrown, except when an Error occurs, which is still propagated).

public List<Channel> getRequiredChannels(Event event);
public List<Channel> getOptionalChannels(Event event);
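For illustration, here is a sketch of a selector that treats events carrying the header important=true as required for all configured channels and everything else as optional. It assumes the channels configured for the source are reachable through AbstractChannelSelector's getAllChannels(); the class name and header key are illustrative:

import java.util.Collections;
import java.util.List;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.channel.AbstractChannelSelector;

public class HeaderBasedSelector extends AbstractChannelSelector {

  @Override
  public List<Channel> getRequiredChannels(Event event) {
    if ("true".equals(event.getHeaders().get("important"))) {
      return getAllChannels();                 // failures here are thrown back to the source
    }
    return Collections.emptyList();
  }

  @Override
  public List<Channel> getOptionalChannels(Event event) {
    if ("true".equals(event.getHeaders().get("important"))) {
      return Collections.emptyList();
    }
    return getAllChannels();                   // failures here are only logged
  }

  @Override
  public void configure(Context context) {
    // no extra properties for this sketch
  }
}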

ChannelProcessor.processEventBatch

public void processEventBatch(List<Event> events) {
Preconditions.checkNotNull(events, "Event list must not be null");

events = interceptorChain.intercept(events); // run the events through the interceptor chain

Map<Channel, List<Event>> reqChannelQueue =
new LinkedHashMap<Channel, List<Event>>();

Map<Channel, List<Event>> optChannelQueue =
new LinkedHashMap<Channel, List<Event>>();

for (Event event : events) {
List<Channel> reqChannels = selector.getRequiredChannels(event); // get the required channels for this event

for (Channel ch : reqChannels) { // build the per-required-channel event queues
List<Event> eventQueue = reqChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
reqChannelQueue.put(ch, eventQueue);
}
eventQueue.add(event);
}

List<Channel> optChannels = selector.getOptionalChannels(event); // get the optional channels for this event

for (Channel ch : optChannels) { // build the per-optional-channel event queues
List<Event> eventQueue = optChannelQueue.get(ch);
if (eventQueue == null) {
eventQueue = new ArrayList<Event>();
optChannelQueue.put(ch, eventQueue);
}

eventQueue.add(event);
}
}

// Process required channels
for (Channel reqChannel : reqChannelQueue.keySet()) {
Transaction tx = reqChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin(); // begin the transaction

List<Event> batch = reqChannelQueue.get(reqChannel); // the events destined for this channel

for (Event event : batch) {
reqChannel.put(event); // put the event into the channel
}

tx.commit(); // all events were put into the channel, commit the transaction
} catch (Throwable t) {
tx.rollback(); // on error, roll back the transaction and rethrow so the source can handle it
if (t instanceof Error) {
LOG.error("Error while writing to required channel: " + reqChannel, t);
throw (Error) t;
} else if (t instanceof ChannelException) {
throw (ChannelException) t;
} else {
throw new ChannelException("Unable to put batch on required " +
"channel: " + reqChannel, t);
}
} finally {
if (tx != null) {
tx.close(); // close the transaction
}
}
}

// Process optional channels
for (Channel optChannel : optChannelQueue.keySet()) {
Transaction tx = optChannel.getTransaction();
Preconditions.checkNotNull(tx, "Transaction object must not be null");
try {
tx.begin(); // begin the transaction

List<Event> batch = optChannelQueue.get(optChannel); // the events destined for this channel

for (Event event : batch) {
optChannel.put(event); // put the event into the channel
}

tx.commit(); // all events were put into the channel, commit the transaction
} catch (Throwable t) {
tx.rollback(); // on error, roll back the transaction
LOG.error("Unable to put batch on optional channel: " + optChannel, t);
// only Errors are rethrown to the source here; other failures on optional channels are just logged
if (t instanceof Error) {
throw (Error) t;
}
} finally {
if (tx != null) {
tx.close(); // close the transaction
}
}
}
}

Flume Series, Part 2: Source Code Analysis of the Flume Node

Posted on 2018-09-10

Flume Node Source Code Analysis

The class Application.java (org.apache.flume.node.Application) is the main entry point of a Flume Node, so the code analysis starts there. The code analyzed in this article lives in the flume-ng-node and flume-ng-core modules of the Flume source tree.

Reading the Configuration File

The configuration is read by calling AbstractConfigurationProvider's getConfiguration() method. Subclasses of AbstractConfigurationProvider implement getFlumeConfiguration() to obtain the configuration from different places; the "z"/"zkConnString" parameter specifies a ZooKeeper address. If no ZooKeeper address is configured, the configuration file specified by "f"/"conf-file" is read directly.

public MaterializedConfiguration getConfiguration() {
MaterializedConfiguration conf = new SimpleMaterializedConfiguration();
// subclasses implement getFlumeConfiguration to fetch the configuration from different places (ZooKeeper, memory, file, ...)
FlumeConfiguration fconfig = getFlumeConfiguration();
AgentConfiguration agentConf = fconfig.getConfigurationFor(getAgentName());
if (agentConf != null) {
Map<String, ChannelComponent> channelComponentMap = Maps.newHashMap();
Map<String, SourceRunner> sourceRunnerMap = Maps.newHashMap();
Map<String, SinkRunner> sinkRunnerMap = Maps.newHashMap();
try {
loadChannels(agentConf, channelComponentMap); // load the channel configuration
loadSources(agentConf, channelComponentMap, sourceRunnerMap); // load the source configuration
loadSinks(agentConf, channelComponentMap, sinkRunnerMap); // load the sink configuration
Set<String> channelNames = new HashSet<String>(channelComponentMap.keySet());
for (String channelName : channelNames) {
ChannelComponent channelComponent = channelComponentMap.get(channelName);
if (channelComponent.components.isEmpty()) {
LOGGER.warn(String.format("Channel %s has no components connected" +
" and has been removed.", channelName));
channelComponentMap.remove(channelName);
Map<String, Channel> nameChannelMap =
channelCache.get(channelComponent.channel.getClass());
if (nameChannelMap != null) {
nameChannelMap.remove(channelName);
}
} else {
LOGGER.info(String.format("Channel %s connected to %s",
channelName, channelComponent.components.toString()));
conf.addChannel(channelName, channelComponent.channel);
}
}
for (Map.Entry<String, SourceRunner> entry : sourceRunnerMap.entrySet()) {
conf.addSourceRunner(entry.getKey(), entry.getValue());
}
for (Map.Entry<String, SinkRunner> entry : sinkRunnerMap.entrySet()) {
conf.addSinkRunner(entry.getKey(), entry.getValue());
}
} catch (InstantiationException ex) {
LOGGER.error("Failed to instantiate component", ex);
} finally {
channelComponentMap.clear();
sourceRunnerMap.clear();
sinkRunnerMap.clear();
}
} else {
LOGGER.warn("No configuration found for this host:{}", getAgentName());
}
return conf;
}

There is a startup parameter "no-reload-conf" that controls whether the Flume Node should reload when the configuration file changes. For file-based configuration with reloading enabled, a file-watching thread is started that runs the code below, polling the configuration file's modification time.

public class FileWatcherRunnable implements Runnable {

private final File file;
private final CounterGroup counterGroup;

private long lastChange;

public FileWatcherRunnable(File file, CounterGroup counterGroup) {
super();
this.file = file;
this.counterGroup = counterGroup;
this.lastChange = 0L;
}

@Override
public void run() {
LOGGER.debug("Checking file:{} for changes", file);

counterGroup.incrementAndGet("file.checks");

long lastModified = file.lastModified();

if (lastModified > lastChange) {
LOGGER.info("Reloading configuration file:{}", file);

counterGroup.incrementAndGet("file.loads");

lastChange = lastModified;

try {
eventBus.post(getConfiguration());
} catch (Exception e) {
LOGGER.error("Failed to load configuration data. Exception follows.",
e);
} catch (NoClassDefFoundError e) {
LOGGER.error("Failed to start agent because dependencies were not " +
"found in classpath. Error follows.", e);
} catch (Throwable t) {
// caught because the caller does not handle or log Throwables
LOGGER.error("Unhandled error", t);
}
}
}
}

Starting the Components

After the configuration is loaded, start() or startAllComponents() is called to start all components. In these two methods you can see that components are started through the LifecycleSupervisor's supervise() method, which runs a MonitorRunnable; its code is shown below.

public static class MonitorRunnable implements Runnable {

public ScheduledExecutorService monitorService;
public LifecycleAware lifecycleAware; // the Flume component (SourceRunner, Channel, SinkRunner)
public Supervisoree supervisoree; // tracks the component: the status field records its state, the policy field decides how it is run (always restart vs. run once)

@Override
public void run() {
logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
supervisoree);

long now = System.currentTimeMillis();

try {
if (supervisoree.status.firstSeen == null) {
logger.debug("first time seeing {}", lifecycleAware);

supervisoree.status.firstSeen = now;
}

supervisoree.status.lastSeen = now;
synchronized (lifecycleAware) {
if (supervisoree.status.discard) {
// Unsupervise has already been called on this.
logger.info("Component has already been stopped {}", lifecycleAware);
return;
} else if (supervisoree.status.error) {
logger.info("Component {} is in error state, and Flume will not"
+ "attempt to change its state", lifecycleAware);
return;
}

supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

if (!lifecycleAware.getLifecycleState().equals(
supervisoree.status.desiredState)) {

logger.debug("Want to transition {} from {} to {} (failures:{})",
new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
supervisoree.status.desiredState,
supervisoree.status.failures });

switch (supervisoree.status.desiredState) {
case START:
try {
lifecycleAware.start();
} catch (Throwable e) {
logger.error("Unable to start " + lifecycleAware
+ " - Exception follows.", e);
if (e instanceof Error) {
// This component can never recover, shut it down.
supervisoree.status.desiredState = LifecycleState.STOP;
try {
lifecycleAware.stop();
logger.warn("Component {} stopped, since it could not be"
+ "successfully started due to missing dependencies",
lifecycleAware);
} catch (Throwable e1) {
logger.error("Unsuccessful attempt to "
+ "shutdown component: {} due to missing dependencies."
+ " Please shutdown the agent"
+ "or disable this component, or the agent will be"
+ "in an undefined state.", e1);
supervisoree.status.error = true;
if (e1 instanceof Error) {
throw (Error) e1;
}
// Set the state to stop, so that the conf poller can
// proceed.
}
}
supervisoree.status.failures++;
}
break;
case STOP:
try {
lifecycleAware.stop();
} catch (Throwable e) {
logger.error("Unable to stop " + lifecycleAware
+ " - Exception follows.", e);
if (e instanceof Error) {
throw (Error) e;
}
supervisoree.status.failures++;
}
break;
default:
logger.warn("I refuse to acknowledge {} as a desired state",
supervisoree.status.desiredState);
}

if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
logger.error(
"Policy {} of {} has been violated - supervisor should exit!",
supervisoree.policy, lifecycleAware);
}
}
}
} catch (Throwable t) {
logger.error("Unexpected error", t);
}
logger.debug("Status check complete");
}
}

Flume Series, Part 1: Using Flume

Posted on 2018-09-10

Official Documentation

Website: http://flume.apache.org
User guide: http://flume.apache.org/FlumeUserGuide.html
Downloads: http://flume.apache.org/download.html

Flume Architecture

Events

An Event is the unit of data that flows through Flume.

Agent

An Agent is a complete Flume instance, running as an independent JVM process.

Source

A Source ingests data from an external system and produces Events into a Channel.

Channel

A Channel is a data conduit that buffers events between the Source and the Sink.

Sink

A Sink takes Events out of the Channel and writes the data to an external system.

Using Flume

Download

Download and unpack the Flume distribution; I use version 1.8.0 (apache-flume-1.8.0-bin.tar.gz).

Configuring JAVA_HOME and JAVA_OPTS

conf/flume-env.sh.template is the template for Flume's runtime environment; copy it to flume-env.sh and set JAVA_HOME and JAVA_OPTS there.

Flume Agent Configuration

conf/flume-conf.properties.template is the Flume Agent configuration template.
Next I configure an HTTP Source, a Memory Channel and a Kafka Sink, so that log data reported through an HTTP endpoint ends up in a Kafka topic.

Define the agent:

agent.sources = httpSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink

Configure the source.
Reference: http://flume.apache.org/FlumeUserGuide.html#http-source

agent.sources.httpSrc.type = http
agent.sources.httpSrc.port = 5140
agent.sources.httpSrc.handler = org.apache.flume.source.http.JSONHandler
agent.sources.httpSrc.channels = memoryChannel

Configure the channel.
Reference: http://flume.apache.org/FlumeUserGuide.html#memory-channel

agent.channels.memoryChannel.type = memory
# maximum number of events stored in the channel
agent.channels.memoryChannel.capacity = 10
# maximum number of events the channel takes from a source or gives to a sink per transaction
agent.channels.memoryChannel.transactionCapacity = 8

Configure the sink.
Reference: http://flume.apache.org/FlumeUserGuide.html#kafka-sink

agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.topic = wz_test
agent.sinks.kafkaSink.kafka.bootstrap.servers = 192.168.1.180:9092
agent.sinks.kafkaSink.kafka.flumeBatchSize = 10
agent.sinks.kafkaSink.kafka.producer.acks = 1
agent.sinks.kafkaSink.kafka.producer.linger.ms = 1
agent.sinks.kafkaSink.kafka.producer.compression.type = gzip
agent.sinks.kafkaSink.channel = memoryChannel

Start the Agent

# -n: agent name, -c: configuration directory, -f: agent configuration file;
# the -D flags enable HTTP monitoring
bin/flume-ng agent \
-n agent \
-c ./conf \
-f ./conf/flume-conf.properties \
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545
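To verify the pipeline end to end, you can post an event to the HTTP source; JSONHandler expects a JSON array of objects with headers and body fields. A minimal JDK-only sketch (host and port match the source configured above; adjust as needed):

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class PostEventSketch {
  public static void main(String[] args) throws Exception {
    // One event in the format org.apache.flume.source.http.JSONHandler expects:
    // a JSON array of {headers, body} objects.
    String payload = "[{\"headers\":{\"origin\":\"test\"},\"body\":\"hello flume\"}]";

    URL url = new URL("http://localhost:5140");     // the HTTP source port configured above
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream out = conn.getOutputStream()) {
      out.write(payload.getBytes(StandardCharsets.UTF_8));
    }
    System.out.println("HTTP status: " + conn.getResponseCode());  // 200 means the event was accepted
    conn.disconnect();
  }
}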