《Flume系列文章》 五、Flume源码分析之Flume-Channel

Flume Channel

AbstractConfigurationProvider的loadChannels()方法,加载配置文件中Channel的配置。loadChannels()中调用getOrCreateChannel()方法,通过DefaultChannelFactory.create()根据type创建不同的Channel。Flume目前实现的Channel有:Memory Channel、JDBC Channel、Kafka Channel、File Channel、Spillable Memory Channel(events存储在内存中,溢出时存储到磁盘上;目前处于试验阶段,不建议在生产环境使用),具体配置参考:http://flume.apache.org/FlumeUserGuide.html#flume-channels

Memory Channel

这里具体对Memory Channel源码进行分析,学习一下Channel中事务机制的实现。

Memory Channel中的主要成员变量如下:
// Monitor object used to lock access to queue.
private Object queueLock = new Object();		// lock for queue
@GuardedBy(value = "queueLock")
private LinkedBlockingDeque<Event> queue; // queue holding the events stored in the channel

// Semaphore coordinating put and take operations: its initial value is 0, a put
// releases permits into it and a take requests a permit via tryAcquire.
private Semaphore queueStored;

// Semaphore bounding the maximum number of events in the channel; its initial
// value comes from the "capacity" setting.
private Semaphore queueRemaining;

// Semaphore bounding the maximum number of bytes in the channel; its initial
// value comes from the "byteCapacity" setting.
private Semaphore bytesRemaining;

MemoryTransaction

前面的文章中可以知道,Source是通过ChannelProcessor.processEventBatch()向Channel中put events,Sink在process()中从Channel中take events。Channel对Event具体的操作是通过MemoryTransaction实现的,MemoryTransaction中定义了两个LinkedBlockingDeque(阻塞双端队列),takeList和putList,用来缓存一次事务中take或者put操作的events。putByteCounter和takeByteCounter对象是对events的bytes计数。
Channel事务中定义了put、take、commit、rollback四个操作。具体的实现:
put操作把event put到putList中;
take操作从queue中take event到takeList中,并返回event;
commit操作将putList里面的events put到queue中,并清空takeList;
rollback操作是将takeList里面的events put回queue中,并清空putList。

MemoryTransaction的源码如下:
/**
 * Transaction implementation for the memory channel.
 *
 * <p>Events put or taken inside one transaction are staged in two bounded
 * deques ({@code putList} and {@code takeList}). A commit moves the staged
 * puts into the channel queue and discards the staged takes; a rollback
 * pushes the staged takes back to the head of the channel queue and
 * discards the staged puts.
 */
private class MemoryTransaction extends BasicTransactionSemantics {
  /** Events taken from the channel queue during this transaction. */
  private final LinkedBlockingDeque<Event> takeList;
  /** Events put during this transaction, not yet visible in the channel queue. */
  private final LinkedBlockingDeque<Event> putList;
  /** Channel counter used for monitoring. */
  private final ChannelCounter channelCounter;
  /** Accumulated size (in byte-capacity slots) of the staged puts. */
  private int putByteCounter = 0;
  /** Accumulated size (in byte-capacity slots) of the staged takes. */
  private int takeByteCounter = 0;

  /**
   * Creates a transaction whose put and take staging lists are both bounded.
   *
   * @param transCapacity maximum number of events that may be put, and
   *     independently taken, within one transaction
   * @param counter counter that receives attempt/success/size updates
   */
  public MemoryTransaction(int transCapacity, ChannelCounter counter) {
    putList = new LinkedBlockingDeque<Event>(transCapacity);
    takeList = new LinkedBlockingDeque<Event>(transCapacity);

    channelCounter = counter;
  }

  /**
   * Stages an event in {@code putList}; nothing reaches the channel queue
   * until commit.
   *
   * @param event the event to stage
   * @throws ChannelException if {@code putList} is already full
   */
  @Override
  protected void doPut(Event event) throws InterruptedException {
    channelCounter.incrementEventPutAttemptCount();
    // NOTE(review): if estimateEventSize() and byteCapacitySlotSize are both
    // integral, the division truncates before Math.ceil sees it - confirm the
    // operand types. doTake uses the same expression, so the two stay symmetric.
    int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);

    // offer() does not block: a full staging list is reported immediately.
    if (!putList.offer(event)) {
      throw new ChannelException(
          "Put queue for MemoryTransaction of capacity " +
          putList.size() + " full, consider committing more frequently, " +
          "increasing capacity or increasing thread count");
    }
    putByteCounter += eventByteSize;
  }

  /**
   * Moves one event from the channel queue into {@code takeList} and returns it.
   *
   * @return the event taken, or {@code null} if no permit could be obtained
   *     from {@code queueStored} within {@code keepAlive} seconds
   * @throws ChannelException if {@code takeList} is already full
   */
  @Override
  protected Event doTake() throws InterruptedException {
    channelCounter.incrementEventTakeAttemptCount();
    if (takeList.remainingCapacity() == 0) {
      throw new ChannelException("Take list for MemoryTransaction, capacity " +
          takeList.size() + " full, consider committing more frequently, " +
          "increasing capacity, or increasing thread count");
    }
    // Wait for a permit on queueStored before touching the queue.
    if (!queueStored.tryAcquire(keepAlive, TimeUnit.SECONDS)) {
      return null;
    }
    Event event;
    synchronized (queueLock) {
      event = queue.poll();
    }
    Preconditions.checkNotNull(event, "Queue.poll returned NULL despite semaphore " +
        "signalling existence of entry");
    takeList.put(event);

    int eventByteSize = (int) Math.ceil(estimateEventSize(event) / byteCapacitySlotSize);
    takeByteCounter += eventByteSize;

    return event;
  }

  /**
   * Publishes the staged puts to the channel queue and discards the staged takes.
   *
   * <p>When the transaction adds more events than it removes, byte permits and
   * queue-slot permits are acquired first, each waiting up to {@code keepAlive}
   * seconds.
   *
   * @throws ChannelException if the byte permits cannot be acquired in time
   * @throws ChannelFullException if the queue-slot permits cannot be acquired in time
   * @throws InterruptedException if interrupted while waiting for permits
   */
  @Override
  protected void doCommit() throws InterruptedException {
    int puts = putList.size();
    int takes = takeList.size();
    int remainingChange = takes - puts;
    if (remainingChange < 0) {
      // Would the staged puts push the channel past its byte capacity?
      if (!bytesRemaining.tryAcquire(putByteCounter, keepAlive, TimeUnit.SECONDS)) {
        throw new ChannelException("Cannot commit transaction. Byte capacity " +
            "allocated to store event body " + byteCapacity * byteCapacitySlotSize +
            " reached. Please increase heap space/byte capacity allocated to " +
            "the channel as the sinks may not be keeping up with the sources");
      }
      // Would committing push the number of events past the channel capacity?
      boolean slotsAcquired = false;
      try {
        slotsAcquired = queueRemaining.tryAcquire(-remainingChange, keepAlive, TimeUnit.SECONDS);
      } finally {
        // Hand the byte permits back on timeout AND on interruption; otherwise
        // an interrupted commit would shrink the byte capacity permanently.
        if (!slotsAcquired) {
          bytesRemaining.release(putByteCounter);
        }
      }
      if (!slotsAcquired) {
        throw new ChannelFullException("Space for commit to queue couldn't be acquired." +
            " Sinks are likely not keeping up with sources, or the buffer size is too tight");
      }
    }
    synchronized (queueLock) {
      // Drain the staged puts into the channel queue in order.
      while (!putList.isEmpty()) {
        if (!queue.offer(putList.removeFirst())) {
          throw new RuntimeException("Queue add failed, this shouldn't be able to happen");
        }
      }
      putList.clear();
      takeList.clear();
    }
    // The taken events are gone for good now, so their byte permits are returned.
    bytesRemaining.release(takeByteCounter);
    takeByteCounter = 0;
    putByteCounter = 0;

    // Publish the newly stored events to takers.
    queueStored.release(puts);
    if (remainingChange > 0) {
      queueRemaining.release(remainingChange);
    }
    if (puts > 0) {
      channelCounter.addToEventPutSuccessCount(puts);
    }
    if (takes > 0) {
      channelCounter.addToEventTakeSuccessCount(takes);
    }

    channelCounter.setChannelSize(queue.size());
  }

  /**
   * Pushes the staged takes back to the head of the channel queue, in their
   * original order, and discards the staged puts.
   *
   * @throws IllegalStateException if the channel queue lacks room for the
   *     events being returned
   */
  @Override
  protected void doRollback() {
    int takes = takeList.size();
    synchronized (queueLock) {
      Preconditions.checkState(queue.remainingCapacity() >= takes,
          "Not enough space in memory channel " +
          "queue to rollback takes. This should never happen, please report");
      // Last taken goes back first so the head of the queue ends up in take order.
      while (!takeList.isEmpty()) {
        queue.addFirst(takeList.removeLast());
      }
      putList.clear();
    }
    putByteCounter = 0;
    takeByteCounter = 0;

    // The returned events are available again: give back the permits doTake acquired.
    queueStored.release(takes);
    channelCounter.setChannelSize(queue.size());
  }

}