Flume Channel
AbstractConfigurationProvider的loadChannels()方法,加载配置文件中Channel的配置。loadChannels()中调用getOrCreateChannel()方法,通过DefaultChannelFactory.create()根据type创建不同的Channel。Flume目前实现的Channel有:Memory Channel、JDBC Channel、Kafka Channel、File Channel、Spillable Memory Channel(events存储在内存或磁盘上(溢出的时候),试验阶段不建议生成环境使用),具体配置参考:http://flume.apache.org/FlumeUserGuide.html#flume-channels。
Memory Channel
这里具体对Memory Channel源码进行分析,学习一下Channel中事务机制的实现。
1 | private Object queueLock = new Object(); //用于锁定queue |
MemoryTransaction
前面的文章中可以知道,Source是通过ChannelProcessor.processEventBatch()向Channel中put events,Sink在process()中从Channel中take events。Channel对Event具体的操作是通过MemoryTransaction实现的,MemoryTransaction中定义了两个LinkedBlockingDeque(阻塞双端队列),takeList和putList,用来缓存一次事务中take或者put操作的events。putByteCounter和takeByteCounter对象是对events的bytes计数。
Channel事务中定义了put、take、commit、rollback四个操作。具体的实现:
put操作把event put到putList中;
take操作从queue中take event到takeList中,并返回event;
commit操作将putList里面events put到queue中,并清空takeList;
rollback操作是将takeList里面的events put回queue中,并清空putList。
1 | private class MemoryTransaction extends BasicTransactionSemantics { |