SinkRunner
SinkRunner是Sink组件启动的入口,start方法里面可以看到启动了一个线程执行PollingRunner Runnable对象,PollingRunner的实现跟上篇PollableSourceRunner里面PollingRunner的实现一样,循环调用SinkProcessor的proces方法。
SinkProcessor
Sink部分有一个SinkGroup的概念,将一些Sink放在一个SinkGroup里面并指定一个SinkProcessor,可以实现多个Sink的故障转移(failover)或负载均衡(load_balance)等。
SinkProcessor有三个实现类:DefaultSinkProcessor、FailoverSinkProcessor、LoadBalancingSinkProcessor。
DefaultSinkProcessor
在没有指定SinkGroup的时候会对每个Sink启动一个SinkRunner并指定为DefaultSinkProcessor。DefaultSinkProcessor只会接收一个Sink,在process方法里面也只是调用了Sink的process方法。
1 | @Override |
FailoverSinkProcessor
SinkGroup的processor.type设置为failover时,对SinkGroup里面的Sink实行故障转移。FailoverSinkProcessor将有效的Sink存在通过优先级进行排序TreeMap结构的liveSinks中,将失效的Sink存在优先级队列failedSinks中。首先在TreeMap中获取第一个有效的Sink,循环执行process方法,当出现异常时,将这个Sink从TreeMap中移除并放入到失效的队列中,再从TreeMap中获取下一个有效的Sink继续执行process方法,当没有有效的Sink时,会对failedSinks中的sink进行刷新。下面看一下FailoverSinkProcessor的实现。
1 | @Override |
LoadBalancingSinkProcessor
SinkGroup的processor.type设置为load_balance时,对SinkGroup里面的Sink实行负载均衡。负载均衡的实现是通过实现SinkSelector对象的createSinkIterator方法,生成Sinks排序不同的的Iterator,已经实现了循环(round_robin)和随机(random)两种方式,也可以通过继承AbstractSinkSelector类自己进行实现。
1 | @Override |
Sink
Sink的实现就不做具体的分析了,可以通过继承AbstractSink类,编写自定义的Sink,Sink的process方法中对Channel数据进行读取需要开启事务操作。下面是HiveSink的Process方法实现。
1 | public Status process() throws EventDeliveryException { |