《Flume系列文章》 二、Flume源码分析之Flume Node

Flume Node源码分析

类Application.java(org.apache.flume.node.Application)是Flume Node的主入口,本文从这个类开始进行代码分析。本文分析的代码位于Flume源码包的flume-ng-node、flume-ng-core两个模块中。

读取配置文件

配置文件的读取是通过调用AbstractConfigurationProvider类的getConfiguration()方法实现的。AbstractConfigurationProvider的子类通过实现getFlumeConfiguration()方法,可以从不同的地方获取配置。启动参数“z”/“zkConnString”用来指定ZooKeeper地址;如果没有配置ZooKeeper地址,则直接读取“f”/“conf-file”指定的配置文件。

public MaterializedConfiguration getConfiguration() {
  MaterializedConfiguration materialized = new SimpleMaterializedConfiguration();
  // Where the raw configuration comes from (ZooKeeper, memory, file, ...) is decided by the
  // subclass through its getFlumeConfiguration() implementation.
  FlumeConfiguration flumeConfig = getFlumeConfiguration();
  AgentConfiguration agentConfig = flumeConfig.getConfigurationFor(getAgentName());
  if (agentConfig == null) {
    LOGGER.warn("No configuration found for this host:{}", getAgentName());
    return materialized;
  }

  Map<String, ChannelComponent> channels = Maps.newHashMap();
  Map<String, SourceRunner> sources = Maps.newHashMap();
  Map<String, SinkRunner> sinks = Maps.newHashMap();
  try {
    // Channels first: loadSources/loadSinks both receive the channel map filled here.
    loadChannels(agentConfig, channels);
    loadSources(agentConfig, channels, sources);
    loadSinks(agentConfig, channels, sinks);

    // Iterate over a snapshot of the names because unused channels are removed from the map.
    for (String channelName : new HashSet<String>(channels.keySet())) {
      ChannelComponent component = channels.get(channelName);
      if (!component.components.isEmpty()) {
        LOGGER.info(String.format("Channel %s connected to %s",
            channelName, component.components.toString()));
        materialized.addChannel(channelName, component.channel);
        continue;
      }
      // No source or sink references this channel: drop it and evict it from the cache.
      LOGGER.warn(String.format("Channel %s has no components connected" +
          " and has been removed.", channelName));
      channels.remove(channelName);
      Map<String, Channel> cachedByName = channelCache.get(component.channel.getClass());
      if (cachedByName != null) {
        cachedByName.remove(channelName);
      }
    }

    for (Map.Entry<String, SourceRunner> source : sources.entrySet()) {
      materialized.addSourceRunner(source.getKey(), source.getValue());
    }
    for (Map.Entry<String, SinkRunner> sink : sinks.entrySet()) {
      materialized.addSinkRunner(sink.getKey(), sink.getValue());
    }
  } catch (InstantiationException ex) {
    LOGGER.error("Failed to instantiate component", ex);
  } finally {
    channels.clear();
    sources.clear();
    sinks.clear();
  }
  return materialized;
}

启动时有一个参数“no-reload-conf”,用于关闭配置文件修改后的自动重新加载(不指定该参数时,默认会自动重新加载配置)。对于文件方式的配置,启用自动重新加载时会启动一个监控文件的线程,定期执行如下操作,通过检查配置文件的修改时间来判断是否需要重新加载配置:

/**
 * Checks a configuration file for modifications and, when its modification time has advanced
 * past the last loaded one, builds a new configuration and posts it to the event bus.
 */
public class FileWatcherRunnable implements Runnable {

  private final File file;
  private final CounterGroup counterGroup;

  /**
   * Modification time of the most recently loaded version of {@link #file}. It starts at 0,
   * so the first check loads any existing file (whose {@code lastModified()} is positive).
   */
  private long lastChange;

  public FileWatcherRunnable(File file, CounterGroup counterGroup) {
    this.file = file;
    this.counterGroup = counterGroup;
    this.lastChange = 0L;
  }

  @Override
  public void run() {
    LOGGER.debug("Checking file:{} for changes", file);
    counterGroup.incrementAndGet("file.checks");

    final long modifiedAt = file.lastModified();
    if (modifiedAt <= lastChange) {
      // Unchanged since the last load (a missing file reports 0 and also lands here).
      return;
    }

    LOGGER.info("Reloading configuration file:{}", file);
    counterGroup.incrementAndGet("file.loads");
    // Recorded before reloading, so a failed reload is not retried until the file changes again.
    lastChange = modifiedAt;
    publishConfiguration();
  }

  /** Builds a fresh configuration and posts it to the event bus; failures are logged, not thrown. */
  private void publishConfiguration() {
    try {
      eventBus.post(getConfiguration());
    } catch (Exception e) {
      LOGGER.error("Failed to load configuration data. Exception follows.",
          e);
    } catch (NoClassDefFoundError e) {
      LOGGER.error("Failed to start agent because dependencies were not " +
          "found in classpath. Error follows.", e);
    } catch (Throwable t) {
      // caught because the caller does not handle or log Throwables
      LOGGER.error("Unhandled error", t);
    }
  }
}

启动组件

加载配置后,会调用start()或者startAllComponents()方法启动所有的组件。从这两个方法中可以看到,组件是通过调用LifecycleSupervisor对象的supervise()方法启动的。supervise()方法会为组件创建一个MonitorRunnable对象,并通过ScheduledExecutorService定期执行它,代码如下:

/**
 * Periodic status check for one supervised Flume component.
 *
 * <p>Each run records when the component was first and last seen. Then, unless the component
 * has been discarded (unsupervised) or is in the error state, it compares the component's
 * current lifecycle state with {@code supervisoree.status.desiredState} and calls
 * {@code start()} or {@code stop()} to move toward the desired state. Failed transitions
 * increment {@code supervisoree.status.failures}.
 *
 * <p>An {@link Error} thrown while starting makes the supervisor give up on the component: the
 * desired state is switched to STOP and the component is stopped. Any throwable that escapes
 * the check is caught and logged at the end of {@link #run()}.
 */
public static class MonitorRunnable implements Runnable {

  /** Presumably the executor that schedules this task; {@link #run()} itself does not use it. */
  public ScheduledExecutorService monitorService;
  /** The supervised Flume component (e.g. SourceRunner, Channel, SinkRunner). */
  public LifecycleAware lifecycleAware;
  /**
   * Supervision record for the component: {@code status} tracks its observed and desired
   * state; {@code policy} specifies how it is run (e.g. always restart vs. run only once).
   */
  public Supervisoree supervisoree;

  @Override
  public void run() {
    logger.debug("checking process:{} supervisoree:{}", lifecycleAware,
        supervisoree);

    long now = System.currentTimeMillis();

    try {
      if (supervisoree.status.firstSeen == null) {
        logger.debug("first time seeing {}", lifecycleAware);

        supervisoree.status.firstSeen = now;
      }

      supervisoree.status.lastSeen = now;
      // Lock on the component itself, presumably so other code synchronizing on it (e.g. an
      // unsupervise call) cannot change the status flags in the middle of a transition.
      synchronized (lifecycleAware) {
        if (supervisoree.status.discard) {
          // Unsupervise has already been called on this.
          logger.info("Component has already been stopped {}", lifecycleAware);
          return;
        } else if (supervisoree.status.error) {
          // Set below when stopping an unrecoverable component failed; leave it alone.
          logger.info("Component {} is in error state, and Flume will not"
              + " attempt to change its state", lifecycleAware);
          return;
        }

        supervisoree.status.lastSeenState = lifecycleAware.getLifecycleState();

        if (!lifecycleAware.getLifecycleState().equals(
            supervisoree.status.desiredState)) {

          logger.debug("Want to transition {} from {} to {} (failures:{})",
              new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                  supervisoree.status.desiredState,
                  supervisoree.status.failures });

          switch (supervisoree.status.desiredState) {
            case START:
              try {
                lifecycleAware.start();
              } catch (Throwable e) {
                logger.error("Unable to start " + lifecycleAware
                    + " - Exception follows.", e);
                if (e instanceof Error) {
                  // This component can never recover, shut it down.
                  // Set the state to stop, so that the conf poller can proceed.
                  supervisoree.status.desiredState = LifecycleState.STOP;
                  try {
                    lifecycleAware.stop();
                    logger.warn("Component {} stopped, since it could not be"
                        + " successfully started due to missing dependencies",
                        lifecycleAware);
                  } catch (Throwable e1) {
                    // The component fills the {} placeholder; e1 is passed last so SLF4J
                    // logs it as the exception (with stack trace).
                    logger.error("Unsuccessful attempt to "
                        + "shutdown component: {} due to missing dependencies."
                        + " Please shutdown the agent"
                        + " or disable this component, or the agent will be"
                        + " in an undefined state.", lifecycleAware, e1);
                    // Future checks will skip this component (see the error branch above).
                    supervisoree.status.error = true;
                    if (e1 instanceof Error) {
                      // Caught and logged by the outer handler; failures is not incremented.
                      throw (Error) e1;
                    }
                  }
                }
                supervisoree.status.failures++;
              }
              break;
            case STOP:
              try {
                lifecycleAware.stop();
              } catch (Throwable e) {
                logger.error("Unable to stop " + lifecycleAware
                    + " - Exception follows.", e);
                if (e instanceof Error) {
                  // Caught and logged by the outer handler; failures is not incremented.
                  throw (Error) e;
                }
                supervisoree.status.failures++;
              }
              break;
            default:
              logger.warn("I refuse to acknowledge {} as a desired state",
                  supervisoree.status.desiredState);
          }

          // A policy violation is only logged here; no action is taken by this task.
          if (!supervisoree.policy.isValid(lifecycleAware, supervisoree.status)) {
            logger.error(
                "Policy {} of {} has been violated - supervisor should exit!",
                supervisoree.policy, lifecycleAware);
          }
        }
      }
    } catch (Throwable t) {
      // Nothing escapes run(); if this runs as a periodic ScheduledExecutorService task, an
      // uncaught throwable would suppress all subsequent executions.
      logger.error("Unexpected error", t);
    }
    logger.debug("Status check complete");
  }
}