《Flume系列文章》 一、Flume的使用

官方文档

官网地址:http://flume.apache.org
文档地址:http://flume.apache.org/FlumeUserGuide.html
下载地址:http://flume.apache.org/download.html

Flume结构

Events

Event是Flume中数据流动的单元。

Agent

完整Flume实例,是一个独立的JVM程序。

Source

Source是从外部系统摄入数据,生成Events到Channel中。

Channel

Channel是一个数据通道,在Source和Channel之间,起到一个缓冲数据的作用。

Sink

Sink是从Channel中取出Events,并将数据落入到外部系统中。

Flume使用

下载

下载Flume的包并解压,我使用的是1.8.0的版本(apache-flume-1.8.0-bin.tar.gz)。

JAVA_HOME和JAVA_OPTS配置

conf/flume-env.sh.template是Flume运行环境的配置模板,可以复制一份flume-env.sh文件,配置JAVA_HOME和JAVA_OPTS。

Flume Agent配置

conf/flume-conf.properties.template是Flume Agent配置模板。
接下来我需要配置一个Http Source、一个Memory Channel和一个Kafka Sink,来实现Http接口上报日志数据落入到Kafka消息队列中。

定义Agent:

1
2
3
agent.sources = httpSrc
agent.channels = memoryChannel
agent.sinks = kafkaSink

配置Source
参考:http://flume.apache.org/FlumeUserGuide.html#http-source

1
2
3
4
agent.sources.httpSrc.type = http
agent.sources.httpSrc.port = 5140
agent.sources.httpSrc.handler = org.apache.flume.source.http.JSONHandler
agent.sources.httpSrc.channels = memoryChannel

配置Channel
参考:http://flume.apache.org/FlumeUserGuide.html#memory-channel

1
2
3
agent.channels.memoryChannel.type = memory
agent.channels.memoryChannel.capacity = 10 # Channel中存储的最大Event数
agent.channels.memoryChannel.transactionCapacity = 8 # Channel每个事务从Source获取Events最大数或Channel每个事务提供给Sink的Events最大数。

配置Sink
参考:http://flume.apache.org/FlumeUserGuide.html#kafka-sink

1
2
3
4
5
6
7
8
agent.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
agent.sinks.kafkaSink.kafka.topic = wz_test
agent.sinks.kafkaSink.kafka.bootstrap.servers = 192.168.1.180:9092
agent.sinks.kafkaSink.kafka.flumeBatchSize = 10
agent.sinks.kafkaSink.kafka.producer.acks = 1
agent.sinks.kafkaSink.kafka.producer.linger.ms = 1
agent.sinks.kafkaSink.kafka.producer.compression.type = gzip
agent.sinks.kafkaSink.channel = memoryChannel

启动Agent

1
2
3
4
5
bin/flume-ng agent \
-n agent \ #agent名称
-c ./conf \ #配置文件目录
-f ./conf/flume-conf.properties \ #agent配置文件
-Dflume.monitoring.type=http -Dflume.monitoring.port=34545 #监控