wForget's blog



Redis Publish/Subscribe

Posted on 2018-11-06

Redis publish/subscribe (pub/sub) is a messaging pattern: publishers (pub) send messages and subscribers (sub) receive them. A Redis client can subscribe to any number of channels.

Consider the channel channel1 and three clients subscribed to it: client2, client5, and client1. When a new message is sent to channel1 with the PUBLISH command, that message is delivered to each of the three subscribing clients.

Redis Pub/Sub Commands

SUBSCRIBE channel [channel …]

Subscribe to one or more channels:

127.0.0.1:6380> SUBSCRIBE testChannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testChannel"
3) (integer) 1

UNSUBSCRIBE [channel [channel …]]

Unsubscribe from one or more channels.

PSUBSCRIBE pattern [pattern …]

Subscribe to all channels matching one or more patterns.

PUNSUBSCRIBE [pattern [pattern …]]

Unsubscribe from all channels matching one or more patterns.

PUBLISH channel message

Publish a message to the specified channel:

# In a new console, publish messages to testChannel
127.0.0.1:6380> publish testChannel "test"
(integer) 1
127.0.0.1:6380> publish testChannel "test2"
(integer) 1

# In the console subscribed to testChannel
127.0.0.1:6380> SUBSCRIBE testChannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "testChannel"
3) (integer) 1
1) "message"
2) "testChannel"
3) "test"
1) "message"
2) "testChannel"
3) "test2"

PUBSUB subcommand [argument [argument …]]

Inspect the state of the pub/sub system:

127.0.0.1:6380> PUBSUB channels
1) "testChannel"

Implementing Redis Pub/Sub in Java

To publish a message to a channel from Java, call Jedis's publish(final String channel, final String message) method directly. To subscribe to channels, call Jedis's subscribe(final JedisPubSub jedisPubSub, final String... channels) method and implement the onMessage() method of JedisPubSub to handle incoming messages.
Jedis's subscribe() method calls JedisPubSub's proceed() method, which subscribes to the channels and then calls process(). The process() method loops continuously, dispatching each incoming message to onMessage(). Because subscribe() therefore blocks, it is usually called from a dedicated thread; otherwise the code after subscribe() never runs.
Below are the implementations of JedisPubSub's proceed() and process() methods:

public void proceed(Client client, String... channels) {
    this.client = client;
    client.subscribe(channels);
    client.flush();
    process(client);
}

private void process(Client client) {

    do {
        List<Object> reply = client.getRawObjectMultiBulkReply();
        final Object firstObj = reply.get(0);
        if (!(firstObj instanceof byte[])) {
            throw new JedisException("Unknown message type: " + firstObj);
        }
        final byte[] resp = (byte[]) firstObj;
        if (Arrays.equals(SUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bchannel = (byte[]) reply.get(1);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            onSubscribe(strchannel, subscribedChannels);
        } else if (Arrays.equals(UNSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bchannel = (byte[]) reply.get(1);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            onUnsubscribe(strchannel, subscribedChannels);
        } else if (Arrays.equals(MESSAGE.raw, resp)) {
            final byte[] bchannel = (byte[]) reply.get(1);
            final byte[] bmesg = (byte[]) reply.get(2);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
            onMessage(strchannel, strmesg);
        } else if (Arrays.equals(PMESSAGE.raw, resp)) {
            final byte[] bpattern = (byte[]) reply.get(1);
            final byte[] bchannel = (byte[]) reply.get(2);
            final byte[] bmesg = (byte[]) reply.get(3);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            final String strchannel = (bchannel == null) ? null : SafeEncoder.encode(bchannel);
            final String strmesg = (bmesg == null) ? null : SafeEncoder.encode(bmesg);
            onPMessage(strpattern, strchannel, strmesg);
        } else if (Arrays.equals(PSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPSubscribe(strpattern, subscribedChannels);
        } else if (Arrays.equals(PUNSUBSCRIBE.raw, resp)) {
            subscribedChannels = ((Long) reply.get(2)).intValue();
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPUnsubscribe(strpattern, subscribedChannels);
        } else if (Arrays.equals(PONG.raw, resp)) {
            final byte[] bpattern = (byte[]) reply.get(1);
            final String strpattern = (bpattern == null) ? null : SafeEncoder.encode(bpattern);
            onPong(strpattern);
        } else {
            throw new JedisException("Unknown message type: " + firstObj);
        }
    } while (isSubscribed());
}

Below is a simple implementation of the publish/subscribe pattern; the full source is on GitHub: PubsubDemo.java

public class PubsubDemo {

    private final String ip;
    private final int port;
    private final String channel;
    private final JedisPool jedisPool;

    public PubsubDemo(String ip, int port, String channel) {
        this.ip = ip;
        this.port = port;
        this.channel = channel;
        JedisPoolConfig config = new JedisPoolConfig();
        jedisPool = new JedisPool(config, ip, port); // initialize the JedisPool
    }

    /**
     * Subscribe thread:
     * subscribes to the channel and prints received messages.
     */
    class Subscribe implements Runnable {

        @Override
        public void run() {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.subscribe(new JedisPubSub() {
                    @Override
                    public void onMessage(String channel, String message) {
                        System.out.println("receive message[ channel: " + channel + ", message: " + message + "]");
                    }
                }, channel);
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
    }

    /**
     * Publish thread:
     * sends an incrementing number to the channel every second.
     */
    class Publish implements Runnable {

        @Override
        public void run() {
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                int i = 0;
                while (true) {
                    jedis.publish(channel, String.valueOf(i++));
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
    }

    // entry point
    public void run() throws Exception {
        // start a Subscribe thread
        Thread subscribeThread = new Thread(new Subscribe(), "SubscribeThread");
        subscribeThread.start();

        // start a Publish thread
        Thread publishThread = new Thread(new Publish(), "PublishThread");
        publishThread.start();

        publishThread.join();
        subscribeThread.join();
    }


    public static void main(String[] args) throws Exception {
        final String ip = "192.168.1.153";
        final int port = 6380;
        final String channel = "testChannel";

        PubsubDemo pubsub = new PubsubDemo(ip, port, channel);
        pubsub.run();
    }
}
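
Pattern subscriptions work the same way on the Java side: Jedis exposes psubscribe(final JedisPubSub jedisPubSub, final String... patterns), and matching messages are delivered to onPMessage(). The following is a minimal sketch of my own (not from the original post); the host, port, and pattern are placeholders:

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class PsubscribeDemo {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6380)) {
            // Like subscribe(), psubscribe() blocks the calling thread,
            // so in real code it would usually run in its own thread.
            jedis.psubscribe(new JedisPubSub() {
                @Override
                public void onPMessage(String pattern, String channel, String message) {
                    System.out.println("receive pmessage[ pattern: " + pattern
                            + ", channel: " + channel + ", message: " + message + "]");
                }
            }, "test*");
        }
    }
}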

Kafka Overview

Posted on 2018-10-16

Introduction

Use cases

  • Building real-time streaming data pipelines that reliably move data between systems or applications (comparable to a message queue).
  • Building real-time streaming applications that transform or react to streams of data (i.e. stream processing; with Kafka Streams, data is transformed as it flows between topics).

Concepts

  • Kafka runs as a cluster on one or more servers.
  • Kafka categorizes the streams of records it stores into topics.
  • Each record consists of a key, a value, and a timestamp.

Four core APIs

  • The Producer API allows an application to publish a stream of records to one or more Kafka topics (see the sketch below).
  • The Consumer API allows an application to subscribe to one or more topics and process the stream of records published to them.
  • The Streams API allows an application to act as a stream processor, consuming an input stream from one or more topics and producing an output stream to one or more topics, effectively transforming input streams into output streams.
  • The Connector API allows building and running reusable producers or consumers that connect Kafka topics to existing applications or data systems; for example, a connector to a relational database might capture every change to a table.
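
To make the Producer API concrete, here is a minimal sketch of my own (not from the original article); the broker address and topic name are placeholders, and the kafka-clients dependency is assumed:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SimpleProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // send() is asynchronous; the record is durable once the broker acknowledges it
            producer.send(new ProducerRecord<>("testTopic", "key1", "hello kafka"));
        }
    }
}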

Configuring Tomcat's APR Run Mode

Posted on 2018-10-12

Tomcat's Three Run Modes

BIO (blocking I/O)

Blocking I/O: Tomcat uses the traditional Java I/O APIs (the java.io package and its subpackages) and creates a thread for every request, so the overhead is high and it is not well suited to high-concurrency scenarios.

NIO (non-blocking I/O)

Based on Java's non-blocking I/O APIs (the java.nio package and its subpackages); it delivers higher concurrency than the traditional I/O approach.

APR (Apache Portable Runtime)

The Apache Portable Runtime is the support library of the Apache HTTP Server. Put simply, Tomcat uses JNI to call the Apache HTTP Server's core native libraries for file reads and network transfers, which greatly improves Tomcat's performance when serving static files. APR is also the preferred mode for running high-concurrency applications on Tomcat.

Installing APR

Install gcc

Check whether gcc is installed with gcc --version; install it if it is missing.

Download Tomcat, APR, APR-util, and OpenSSL

wget https://mirrors.tuna.tsinghua.edu.cn/apache/tomcat/tomcat-7/v7.0.72/bin/apache-tomcat-7.0.72.tar.gz
wget http://mirrors.tuna.tsinghua.edu.cn/apache/apr/apr-1.5.2.tar.gz
wget http://mirrors.tuna.tsinghua.edu.cn/apache/apr/apr-util-1.5.4.tar.gz

wget https://www.openssl.org/source/openssl-1.0.2k.tar.gz

Build APR

sudo mkdir /usr/local/apr
sudo chown hadoop:hadoop /usr/local/apr
cd apr-1.5.2
./configure && make && make install

Build APR-util

cd apr-util-1.5.4
./configure --with-apr=/usr/local/apr/ && make && make install

Build OpenSSL

sudo mkdir /usr/local/ssl
sudo chown hadoop:hadoop /usr/local/ssl
./config -fPIC no-gost no-shared no-zlib && make && make install

Build tomcat-native

cd apache-tomcat-7.0.70/bin/
tar xzvf tomcat-native.tar.gz
cd tomcat-native-1.2.7-src/native/
./configure CFLAGS=-fPIC --with-apr=/usr/local/apr/ --with-java-home=/usr/java/jdk1.8.0_131 --with-ssl=/usr/local/ssl && make install

Configuring Tomcat to Use APR

Edit apache-tomcat-7.0.70/bin/catalina.sh

Add CATALINA_OPTS="$CATALINA_OPTS -Djava.library.path=/usr/local/apr/lib"

Edit apache-tomcat-7.0.70/conf/server.xml

Change the protocol attribute from HTTP/1.1 to org.apache.coyote.http11.Http11AprProtocol:

<Connector port="8080" protocol="org.apache.coyote.http11.Http11AprProtocol"
           connectionTimeout="20000"
           redirectPort="8443" />

Adjust the JVM memory settings (apache-tomcat-7.0.70/bin/catalina.sh):

JAVA_OPTS="-server -Xmx8g -Xms8g -Xss256k -XX:+DisableExplicitGC -XX:LargePageSizeInBytes=128m -XX:+UseFastAccessorMethods -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:InitiatingHeapOccupancyPercent=50 -XX:G1ReservePercent=15"

Load Testing

Run a load test with Apache ab:

ab -kc 1000 -n 10000 http://localhost/

Common Kafka Commands

Posted on 2018-10-09

Start the broker

bin/kafka-server-start.sh  config/server.properties

# start in the background
bin/kafka-server-start.sh -daemon config/server.properties
bin/kafka-server-start.sh config/server.properties 1>/dev/null 2>&1 &

Create a topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 10 --topic testTopic

List all topics

bin/kafka-topics.sh --list --zookeeper 127.0.0.1:2181

Describe a topic

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic testTopic

Add partitions to a topic

bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --alter --partitions 20 --topic testTopic

Delete a topic

bin/kafka-topics.sh --delete --zookeeper 127.0.0.1:2181 --topic testTopic  # topic deletion must be enabled with delete.topic.enable=true

Console producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic testTopic --producer.config config/producer.properties

Console consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic testTopic
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic testTopic --consumer.config config/consumer.properties

List consumer groups using the new consumer (0.9+)

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --list

Show consumption details for a consumer group (0.9+)

bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group testGroup001
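
The console tools above are thin wrappers around the Java client API. For reference, here is a minimal consumer sketch of my own (not from the original post); the broker address, group id, and topic are placeholders, and a 0.9+/0.11-era kafka-clients dependency is assumed:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class SimpleConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "testGroup001");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("testTopic"));
            while (true) {
                // poll() returns whatever records arrived since the previous call
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record.offset() + ": " + record.value());
                }
            }
        }
    }
}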

Kafka Stream TimestampExtractor

Posted on 2018-10-09

The Problem

A Kafka Streams application in production threw the following exception, "has invalid (negative) timestamp.":

Exception in thread "ad-effect-application-001-bf034f5b-9012-4e5e-aab3-128774768e74-StreamThread-2" org.apache.kafka.streams.errors.StreamsException: Input record ConsumerRecord(topic = dmpRawLog7, partition = 5, offset = 375653352, CreateTime = -1, serialized key size = -1, serialized value size = 980, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"data":"{\"type\":\"getad\",\"sid\":\"e52db2495177bd3c\",\"time\":1538208879,\"zone\":{\"id\":\"1xsgiwb0jqfh\",\"resourceid\":\"\",\"agentid\":\"1xsgh2et8jk7\",\"businessid\":\"sndo_sndo\"},\"user\":{\"id\":\"5d4073746afabbf9a11a27fbf9fe33eb\",\"clkip\":\"\",\"clkuseragent\":\"\",\"clkvalid\":0},\"scenario\":{\"page\":\"http://sp360.9idudu.com/sdk/defaultr/app/share/share.html?appId=9214af24213ab9212c5235fafe82c4d4\",\"referrer\":\"http://sp360.9idudu.com/sdk/defaultr/app/share/share.html?appId=9214af24213ab9212c5235fafe82c4d4\\u0026appSecret=a943ff9a630c3c01c3e92444f354f860\\u0026messageId=cmsqukngt1irs\\u0026messageType=NEWS\\u0026parentId=af274d7b4026bcff1c04131321eaf7ec\"},\"device\":{\"ip\":\"171.220.55.140\",\"agent\":\"Mozilla/5.0 (Linux; Android 7.1.2; KINGSUN-F16 Build/N2G47H; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/52.0.2743.100 Mobile Safari/537.36 Imei/434766830973068DCBEA8AFB32463F5F\",\"screen\":\"\"},\"fill\":1,\"appid\":\"\"}"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63)
at org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61)
at org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:493)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:628)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:512)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)

The Fix

After some digging, the cause turned out to be the timestamp. The records I store in Kafka are JSON strings that contain a timestamp field; the records that caused the problem were missing that timestamp, so I implemented my own TimestampExtractor.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

/**
 * Created by hadoop on 2018/9/30.
 */
public class CurrentTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        long timestamp = record.timestamp();
        if (timestamp < 0) {
            // fall back to the current wall-clock time for records without a valid timestamp
            return System.currentTimeMillis();
        } else {
            return timestamp;
        }
    }
}

Configure Kafka Streams to use the custom TimestampExtractor:

kafkaStreamConf.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CurrentTimestampExtractor.class.getName());

Kafka Stream WordCount

Posted on 2018-10-09

Kafka Streams Example

Maven dependency

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.11.0.2</version>
</dependency>

WordCount Demo

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
                .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                .groupBy((key, word) -> word)
                .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

}

Notes on some Kafka Streams configuration options

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); // Kafka Streams application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); // Kafka bootstrap servers
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // key Serde
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); // value Serde

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 1000); // maximum number of records returned by a single poll

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest"); // for a new group, "latest" starts from the newest offsets; the Kafka Streams default is "earliest"

props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 10); // number of stream processing threads

props.put(StreamsConfig.consumerPrefix(ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_CONFIG), false); // whether to exclude internal topics such as the offsets topic __consumer_offsets

Flume Series (6): Writing Custom Flume Plugins

Posted on 2018-10-09

Custom Flume plugins

Most Flume components can be swapped for custom implementations: you can write your own Source, Channel, or Sink, as well as Channel Selectors, Sink Processors, Interceptors, Monitoring hooks, and so on. Each custom component implements the interface Flume defines for that component type; the components that ship with Flume are a good reference for how to implement one.

Create a plugin project and implement the component interface

public class DmpMemoryChannel extends BasicChannelSemantics {
    // TODO
    // Inside a plugin, configuration values from the agent's config file are available
    // through the Context, so custom settings can be defined as needed.
}
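
To make the skeleton a bit more concrete, here is an illustrative sketch of my own (not code from this post or from Flume itself) of what a trivial channel built on BasicChannelSemantics might look like. BasicChannelSemantics asks for a BasicTransactionSemantics implementation, and configure(Context) is where custom settings are read:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.channel.BasicChannelSemantics;
import org.apache.flume.channel.BasicTransactionSemantics;

public class SketchMemoryChannel extends BasicChannelSemantics {

    private BlockingQueue<Event> queue;

    @Override
    public void configure(Context context) {
        // Custom settings come from the agent configuration,
        // e.g. agent.channels.memoryChannel1.capacity = 10000
        int capacity = context.getInteger("capacity", 10000);
        queue = new LinkedBlockingQueue<>(capacity);
    }

    @Override
    protected BasicTransactionSemantics createTransaction() {
        return new BasicTransactionSemantics() {
            @Override
            protected void doPut(Event event) throws InterruptedException {
                // A production channel would stage events per transaction
                // and only make them visible on doCommit().
                queue.put(event);
            }

            @Override
            protected Event doTake() throws InterruptedException {
                return queue.poll();
            }

            @Override
            protected void doCommit() throws InterruptedException {
                // Nothing to do in this simplified sketch.
            }

            @Override
            protected void doRollback() throws InterruptedException {
                // A production channel would return staged events to the queue here.
            }
        };
    }
}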

Build a JAR and place it under plugins.d

Inside plugins.d, create the dmp-flume-plugin/lib, dmp-flume-plugin/libext, and dmp-flume-plugin/native directories. lib holds the plugin JAR, libext holds the plugin's dependency JARs, and native holds any required native libraries, such as .so files.

Configure the component

agent.channels.memoryChannel1.type = org.sndo.dmp.flume.channel.memory.DmpMemoryChannel

Setting Up a Druid Cluster

Posted on 2018-09-28

Download and extract the packages

Download the Druid package and extract it to the installation directory:

wget http://static.druid.io/artifacts/releases/druid-0.12.3-bin.tar.gz

Download the MySQL metadata store extension and extract it into the extensions directory:

wget http://static.druid.io/artifacts/releases/mysql-metadata-storage-0.12.3.tar.gz

Manually create the druid-0.12.3/var/tmp and druid-0.12.3/var/hadoop-tmp directories.

Set up the metadata storage and deep storage

Metadata storage

I use MySQL as the metadata store. In MySQL, create a druid user and a druid database, and grant the druid user all privileges on the druid database.

Deep storage

I use HDFS as Druid's deep storage. Create the /druid directory on HDFS with the following commands:

sudo su hdfs
hadoop fs -mkdir -p /druid
hadoop fs -chown hadoop:supergroup /druid # grant ownership of the directory to hadoop

Cluster layout

The production cluster has 10 machines in total, running 8 data nodes, 2 master nodes, and 2 query nodes.
Data nodes run the Historical and MiddleManager processes.
Master nodes run the Coordinator and Overlord processes.
Query nodes run the Broker and (optionally) Router processes.

Configuration

Create a druid-0.12.3/conf-online directory, copy the configuration from conf/ into conf-online/, and then edit it. The specific settings are as follows:

Common configuration

Configuration file: conf-online/druid/_common/common.runtime.properties

Copy the Hadoop configuration files

Copy the Hadoop configuration XMLs (core-site.xml, hdfs-site.xml, yarn-site.xml, mapred-site.xml) into conf-online/druid/_common.

Configure ZooKeeper

#
# Zookeeper
#

druid.zk.service.host=host01:2181,host02:2181,host03:2181
druid.zk.paths.base=/druid

Add extensions ("druid-kafka-indexing-service", "mysql-metadata-storage", "druid-hdfs-storage")

druid.extensions.loadList=["druid-kafka-eight", "druid-histogram", "druid-datasketches", "druid-lookups-cached-global", "mysql-metadata-storage", "druid-kafka-indexing-service", "druid-hdfs-storage"]

Configure the metadata storage

#
# Metadata storage
#

# For MySQL:
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://host01:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=123456

Configure the deep storage

#
# Deep storage
#

# For HDFS (make sure to include the HDFS extension and that your Hadoop config files in the cp):
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://nameservice1/druid/segments

Configure the indexing service logs

#
# Indexing service logs
#

# For HDFS (make sure to include the HDFS extension and that your Hadoop config files in the cp):
druid.indexer.logs.type=hdfs
druid.indexer.logs.directory=hdfs://nameservice1/druid/indexing-logs

Set the monitoring log level

#
# Monitoring
#
druid.monitoring.monitors=["io.druid.java.util.metrics.JvmMonitor"]
druid.emitter=logging
druid.emitter.logging.logLevel=info

Coordinator configuration

Configuration directory: conf-online/druid/coordinator

jvm.config

Change the time zone:

-server
-Xms3g
-Xmx3g
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
-Dderby.stream.error.file=var/druid/derby.log

runtime.properties

Change the host and port:

druid.service=druid/coordinator
druid.host=192.168.1.120
druid.port=28081

druid.coordinator.startDelay=PT30S
druid.coordinator.period=PT30S

Overlord configuration

Configuration directory: conf-online/druid/overlord

jvm.config

Change the time zone:

-server
-Xms3g
-Xmx3g
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

runtime.properties

Change the host and port:

druid.service=druid/overlord
druid.host=192.168.1.120
druid.port=28090

druid.indexer.queue.startDelay=PT30S

druid.indexer.runner.type=remote
druid.indexer.storage.type=metadata

Historical configuration

Configuration directory: conf-online/druid/historical

jvm.config

Change the time zone and MaxDirectMemorySize:

-server
-Xms8g
-Xmx8g
-XX:MaxDirectMemorySize=20g
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

runtime.properties

Change the host and port.
Adjust druid.server.http.numThreads and druid.processing.numThreads (cores - 1 is recommended).
Note: MaxDirectMemorySize >= druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1).
druid.segmentCache.locations sets where segments are cached locally.
Configure the Historical query cache.

druid.service=druid/historical
druid.host=192.168.1.120
druid.port=28083

# HTTP server threads
druid.server.http.numThreads=25

# Processing threads and buffers
druid.processing.buffer.sizeBytes=1073741824
druid.processing.numMergeBuffers=2
druid.processing.numThreads=15
druid.processing.tmpDir=var/druid/processing

# Segment storage
#druid.segmentCache.locations=[{"path":"var/druid/segment-cache","maxSize"\:130000000000}]
druid.segmentCache.locations=[{"path":"/data01/druid/segment-cache","maxSize"\:100000000000}, {"path":"/data02/druid/segment-cache","maxSize"\:100000000000}]
druid.server.maxSize=200000000000

druid.query.groupBy.maxOnDiskStorage=10737418240

# Cache
druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.historical.cache.unCacheable=["select"]
druid.cache.type=caffeine
druid.cache.sizeInBytes=6000000000

MiddleManager configuration

Configuration directory: conf-online/druid/middleManager

jvm.config

Change the time zone:

-server
-Xms64m
-Xmx64m
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Dhadoop.hadoop.tmp.dir=var/hadoop-tmp
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

runtime.properties

Change the host and port.
Change druid.indexer.runner.javaOpts to set MaxDirectMemorySize.
Adjust the druid.indexer.fork.property.druid.processing.* settings.
Set druid.indexer.task.defaultHadoopCoordinates to the Hadoop version used by Hadoop index tasks.

druid.service=druid/middleManager
druid.host=192.168.1.120
druid.port=28091

# Number of tasks per middleManager
druid.worker.capacity=3

# Task launch parameters
druid.indexer.runner.javaOpts=-server -Xms2g -Xmx2g -XX:MaxDirectMemorySize=4g -Duser.timezone=UTC+0800 -Dfile.encoding=UTF-8 -Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager
druid.indexer.task.baseTaskDir=var/druid/task
druid.indexer.task.restoreTasksOnRestart=true

# HTTP server threads
druid.server.http.numThreads=25

# Processing threads and buffers on Peons
druid.indexer.fork.property.druid.processing.buffer.sizeBytes=536870912
druid.indexer.fork.property.druid.processing.numMergeBuffers=2
druid.indexer.fork.property.druid.processing.numThreads=2
druid.indexer.fork.property.druid.processing.tmpDir=var/druid/processing

# Hadoop indexing
druid.indexer.task.hadoopWorkingPath=var/druid/hadoop-tmp
druid.indexer.task.defaultHadoopCoordinates=["org.apache.hadoop:hadoop-client:2.6.0-mr1-cdh5.13.1"]

Broker configuration

Configuration directory: conf-online/druid/broker

jvm.config

Change the time zone.
Change -Xms, -Xmx, and MaxDirectMemorySize:

-server
-Xms24g
-Xmx24g
-XX:MaxDirectMemorySize=20g
-Duser.timezone=UTC+0800
-Dfile.encoding=UTF-8
-Djava.io.tmpdir=var/tmp
-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager

runtime.properties

Change the host and port.
Adjust druid.server.http.numThreads and druid.processing.numThreads (cores - 1 is recommended).
Note: MaxDirectMemorySize >= druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1).
Set druid.sql.enable to true to enable SQL queries.
Set druid.query.groupBy.maxOnDiskStorage: the maximum amount of disk space (per query) that a result set may spill to disk when the merge buffers or the dictionary fill up.

druid.service=druid/broker
druid.host=192.168.1.120
druid.port=28082

# HTTP server threads
druid.broker.http.numConnections=50
druid.server.http.numThreads=40

# Processing threads and buffers
druid.processing.buffer.sizeBytes=1073741824
druid.processing.numMergeBuffers=4
druid.processing.numThreads=14
druid.processing.tmpDir=var/druid/processing

# SQL
druid.sql.enable = true

# Query cache
#druid.broker.cache.useCache=true
#druid.broker.cache.populateCache=true
#druid.cache.type=local
#druid.cache.sizeInBytes=2000000000

# Query config
# how the Broker routes queries to Historical nodes: either random or connectionCount
druid.broker.balancer.type=connectionCount
druid.query.groupBy.maxOnDiskStorage=10737418240

Start the cluster

Edit bin/node.sh

Add DRUID_CONF_DIR="conf-online/druid" near the top of the file.

Copy the configuration to the other nodes, remembering to change the host on each.

Start the master nodes

bin/coordinator.sh start
bin/overlord.sh start

Start the data nodes

bin/historical.sh start
bin/middleManager.sh start

Start the query nodes

bin/broker.sh start

Install Superset

Superset is an open-source data exploration and visualization tool. It supports Druid as a data source, can visualize the data stored in Druid, and also supports SQL queries.
Project: https://github.com/apache/incubator-superset
Installation guide: https://superset.incubator.apache.org/installation.html

Problems encountered

Not enough direct memory.

Not enough direct memory.  Please adjust -XX:MaxDirectMemorySize, druid.processing.buffer.sizeBytes, druid.processing.numThreads, or druid.processing.numMergeBuffers: maxDirectMemory[1,073,741,824], memoryNeeded[6,979,321,856] = druid.processing.buffer.sizeBytes[536,870,912] * (druid.processing.numMergeBuffers[2] + druid.processing.numThreads[10] + 1)

Increase -XX:MaxDirectMemorySize so that MaxDirectMemorySize >= druid.processing.buffer.sizeBytes * (druid.processing.numMergeBuffers + druid.processing.numThreads + 1).
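
For the values in the error message above, the required direct memory works out to 536,870,912 B × (2 + 10 + 1) = 6,979,321,856 B, roughly 6.5 GiB, so MaxDirectMemorySize has to be raised to at least that (for example -XX:MaxDirectMemorySize=8g; the exact value is my suggestion, not from the original post), or the buffer size and thread counts have to be reduced instead.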

Failed to create directory within 10000 attempts

The path did not exist: Druid has an environment setting, java.io.tmpdir=var/tmp, whose directory must be created manually ahead of time, so create the var/tmp directory by hand.

java.lang.IllegalStateException: Failed to create directory within 10000 attempts (tried 1538122986466-0 to 1538122986466-9999)
at com.google.common.io.Files.createTempDir(Files.java:600) ~[guava-16.0.1.jar:?]
at io.druid.segment.indexing.RealtimeTuningConfig.createNewBasePersistDirectory(RealtimeTuningConfig.java:58) ~[druid-server-0.12.3.jar:0.12.3]
at io.druid.segment.indexing.RealtimeTuningConfig.makeDefaultTuningConfig(RealtimeTuningConfig.java:68) ~[druid-server-0.12.3.jar:0.12.3]
at io.druid.segment.realtime.FireDepartment.<init>(FireDepartment.java:62) ~[druid-server-0.12.3.jar:0.12.3]
at io.druid.indexing.kafka.KafkaIndexTask.run(KafkaIndexTask.java:397) ~[?:?]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:444) [druid-indexing-service-0.12.3.jar:0.12.3]
at io.druid.indexing.overlord.ThreadPoolTaskRunner$ThreadPoolTaskRunnerCallable.call(ThreadPoolTaskRunner.java:416) [druid-indexing-service-0.12.3.jar:0.12.3]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_131]

Druid Architecture

Posted on 2018-09-26

Druid system architecture

Historical

Historical nodes store and serve queries over historical data. They download segments from deep storage, answer the Broker's queries over those segments and return the results to the Broker, announce the segments they serve through ZooKeeper, and watch ZooKeeper for signals to load or drop segments. They do not accept writes.

MiddleManager

MiddleManager nodes ingest new data into the cluster: they read from external data sources and publish new segments.

Broker

Broker nodes receive queries from external clients and forward them to the Historicals and MiddleManagers. When a Broker receives the partial results, it merges them and returns the merged result to the caller. Clients query Druid through the Broker rather than querying Historicals or MiddleManagers directly.

Coordinator

Coordinator nodes watch the Historical nodes. They assign segments to specific servers and keep segments balanced across the Historicals.

Overlord

Overlord nodes watch the MiddleManager nodes and control data ingestion into Druid. They assign ingestion tasks to MiddleManagers and coordinate segment publishing.

Router

The Router is an optional process that provides a unified gateway in front of the Brokers, Overlords, and Coordinators.

External dependencies

Deep storage

Druid uses deep storage to store any data that has been ingested into the system. It is usually a distributed store such as S3, HDFS, or a network mounted filesystem.

Metadata store

The metadata store holds Druid's metadata; it is usually a traditional relational database such as PostgreSQL or MySQL.

ZooKeeper

ZooKeeper is used for internal service discovery, coordination, and leader election.

A Simple HBase MapReduce Example

Posted on 2018-09-26

Background

I used HBase's MapReduce integration in a project, so here is a small example that counts the number of rows in an HBase table.

Mapper

The mapper extends org.apache.hadoop.hbase.mapreduce.TableMapper and overrides the map() method.

public class CounterMapper extends TableMapper<Text, LongWritable> {

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        String tableName = context.getConfiguration().get(TableInputFormat.INPUT_TABLE);
        context.write(new Text(tableName), new LongWritable(1));
    }
}

Reducer

The reducer extends org.apache.hadoop.mapreduce.Reducer and overrides the reduce() method.

public class CounterReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
    private static AtomicLong result = new AtomicLong(0);

    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
        long count = 0;
        for (LongWritable value : values) {
            count += value.get();
        }
        result.addAndGet(count);
        String tableName = context.getConfiguration().get(TableInputFormat.INPUT_TABLE);
        context.write(new Text(tableName), new LongWritable(result.get()));
    }
}

Job

public class CounterJob {
    public static void main(String[] args) throws Exception {
        Configuration config = HBaseConfiguration.create();

        GenericOptionsParser optionsParser = new GenericOptionsParser(config, args);
        String[] remainingArgs = optionsParser.getRemainingArgs();
        if (remainingArgs.length != 2) {
            System.err.println("Usage: CounterJob <tableName> <outPath>");
            System.exit(2);
        }

        String tableName = remainingArgs[0];
        String outpath = remainingArgs[1];

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        Job job = Job.getInstance(config, "HbaseCounterJob");
        job.setJarByClass(CounterJob.class);

        TableMapReduceUtil.initTableMapperJob(tableName, scan, CounterMapper.class, Text.class, LongWritable.class, job);
        job.setReducerClass(CounterReducer.class);
        job.setNumReduceTasks(1);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        FileOutputFormat.setOutputPath(job, new Path(outpath));

        boolean b = job.waitForCompletion(true);
        if (!b) {
            throw new IOException("error with job!");
        }
    }
}