SourceRunner
在上片文章中,Flume Node启动的是SourceRunner,SourceRunner的实现类有两个,PollableSourceRunner和EventDrivenSourceRunner。
PollableSourceRunner的start方法会启动一个线程,线程内部通过while不断调用PollableSource的process()方法。EventDrivenSourceRunner的start方法则只调用一次EventDrivenSource的start()方法。
1 | public static class PollingRunner implements Runnable { |
HTTPSource
HTTPSource是EventDrivenSource的一个实现类。start()方法中启动了一个jetty server,在doPost方法中接收events然后发送给channel,下面看一个FlumeHTTPServlet的doPost()方法。
1 | public void doPost(HttpServletRequest request, HttpServletResponse response) |
KafkaSource
KafkaSource是PollableSource的一个实现类。doProcess()方法获取kafka的数据并通过ChannelProcessor发送到Channel中,处理完成后提交offset。
1 | protected Status doProcess() throws EventDeliveryException { |
ChannelProcessor
在Source中是通过ChannelProcessor的processEventBatch/processEvent将events发送到channel中,在方法中通过事务机制保证了数据一致性,保证了一次processEventBatch/processEvent方法要么将全部的events放入channel中要么都不会放入channel中,后面在详细对Flume的事务进行分析。在ChannelProcessor还包含了ChannelSelector和InterceptorChain。
ChannelSelector是根据events的相关数据,将events发送到对应的channel中,参考:http://flume.apache.org/FlumeUserGuide.html#flume-channel-selectors。
Interceptor可以在events发送到channel之前对events进行修改或删除,参考:http://flume.apache.org/FlumeUserGuide.html#flume-interceptors。
InterceptorChain
自定义的Interceptor通过实现intercept方法,可以对events进行修改或删除。不过有一个局限就是”public Event intercept(Event event);”这个方法不能够将一个event转成多个event,在项目中我遇到的一个问题是kafka source中消费的event是一个JSON数组字符串,需要转换成单个event进行落盘。解决方法是修改kafkasource代码,在里面添加一个Transform对象,进行转换操作。
1 | class InterceptorChain: |
ChannelSelector
ChannelSelector的子类需要实现下面两个方法,根据event返回必须的channel(RequiredChannels)和非必须的channel(OptionalChannels)。
RequiredChannels指的是在source发送event到channel的过程中如果发生的异常,需要告知source(向source抛出异常)
OptionalChannels指的是在source发送event到channel的过程中如果发生的异常,不需要告知source(不需要向source抛出异常,不过发生Error错误时会向source抛出)
1 | public List<Channel> getRequiredChannels(Event event); |
ChannelProcessor.processEventBatch
1 | public void processEventBatch(List<Event> events) { |