wForget's blog


  • Home

  • About

  • Tags

  • Archives

声明式编程和命令式编程(转载)

Posted on 2019-12-11

声明式编程和命令式编程(转载)

原文地址:声明式编程和命令式编程的比较

先统一一下概念,我们有两种编程方式:命令式和声明式。

我们可以像下面这样定义它们之间的不同:

  • 命令式编程:命令“机器”如何去做事情(how),这样不管你想要的是什么(what),它都会按照你的命令实现。
  • 声明式编程:告诉“机器”你想要的是什么(what),让机器想出如何去做(how)。

声明式编程和命令式编程的代码例子

举个简单的例子,假设我们想让一个数组里的数值翻倍。

我们用命令式编程风格实现,像下面这样:

1
2
3
4
5
6
7
8
9
var numbers = [1,2,3,4,5]

var doubled = []

for(var i = 0; i < numbers.length; i++) {
var newNumber = numbers[i] * 2
doubled.push(newNumber)
}
console.log(doubled) //=> [2,4,6,8,10]

我们直接遍历整个数组,取出每个元素,乘以二,然后把翻倍后的值放入新数组,每次都要操作这个双倍数组,直到计算完所有元素。

而使用声明式编程方法,我们可以用 Array.map 函数,像下面这样:

1
2
3
4
5
6
var numbers = [1,2,3,4,5]

var doubled = numbers.map(function(n) {
return n * 2
})
console.log(doubled) //=> [2,4,6,8,10]

map 利用当前的数组创建了一个新数组,新数组里的每个元素都是经过了传入map的函数(这里是function(n) { return n*2 })的处理。

map函数所作的事情是将直接遍历整个数组的过程归纳抽离出来,让我们专注于描述我们想要的是什么(what)。注意,我们传入map的是一个纯函数;它不具有任何副作用(不会改变外部状态),它只是接收一个数字,返回乘以二后的值。

在一些具有函数式编程特征的语言里,对于list数据类型的操作,还有一些其他常用的声明式的函数方法。例如,求一个list里所有值的和,命令式编程会这样做:

1
2
3
4
5
6
7
8
var numbers = [1,2,3,4,5]

var total = 0

for(var i = 0; i < numbers.length; i++) {
total += numbers[i]
}
console.log(total) //=> 15

而在声明式编程方式里,我们使用 reduce 函数:

1
2
3
4
5
6
var numbers = [1,2,3,4,5]

var total = numbers.reduce(function(sum, n) {
return sum + n
});
console.log(total) //=> 15

reduce 函数利用传入的函数把一个 list 运算成一个值。它以这个函数为参数,数组里的每个元素都要经过它的处理。每一次调用,第一个参数(这里是sum)都是这个函数处理前一个值时返回的结果,而第二个参数(n)就是当前元素。这样下来,每此处理的新元素都会合计到sum中,最终我们得到的是整个数组的和。

同样,reduce 函数归纳抽离了我们如何遍历数组和状态管理部分的实现,提供给我们一个通用的方式来把一个 list 合并成一个值。我们需要做的只是指明我们想要的是什么?

声明式编程语言:SQL

也许你还不能明白,但有一个地方,你也许已经用到了声明式编程,那就是SQL。

你可以把SQL当做一个处理数据的声明式查询语言。完全用SQL写一个应用程序?这不可能。但如果是处理相互关联的数据集,它就显的无比强大了。

像下面这样的查询语句:

1
2
3
4
SELECT * from dogs
INNER JOIN owners

WHERE dogs.owner_id = owners.id

如果我们用命令式编程方式实现这段逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
//dogs = [{name: 'Fido', owner_id: 1}, {...}, ... ]
//owners = [{id: 1, name: 'Bob'}, {...}, ...]

var dogsWithOwners = []
var dog, owner

for(var di=0; di < dogs.length; di++) {
dog = dogs[di]

for(var oi=0; oi < owners.length; oi++) {
owner = owners[oi]
if (owner && dog.owner_id == owner.id) {
dogsWithOwners.push({
dog: dog,
owner: owner
})
}
}}
}

我可没说SQL是一种很容易懂的语言,也没说一眼就能把它们看明白,但基本上还是很整洁的。

SQL代码不仅很短,不不仅容易读懂,它还有更大的优势。因为我们归纳抽离了how,我们就可以专注于what,让数据库来帮我们优化how.

我们的命令式编程代码会运行的很慢,因为需要遍历所有 list 里的每个狗的主人。

而SQL例子里我们可以让数据库来处理how,来替我们去找我们想要的数据。如果需要用到索引(假设我们建了索引),数据库知道如何使用索引,这样性能又有了大的提升。如果在此不久之前它执行过相同的查询,它也许会从缓存里立即找到。通过放手how,让机器来做这些有难度的事,我们不需要掌握数据库原理就能轻松的完成任务。

《Spark知识梳理》 一、Spark基本概念

Posted on 2019-11-19

前言

《Spark 知识梳理》系列文章主要是通过阅读《Spark内核设计的艺术》书籍,整理的一些个人笔记。

Spark 特点

减少磁盘IO

Spark 将 map 端中间结果存在内存中。

增加并行度

对 Job 划分成多个 Stage,有些 Stage 可以并行执行,增加并行度。

避免重复计算

任务失败重新调度时,会过滤掉执行成功的分区任务。

可选的 Shuffle 排序

Spark 可根据不同场景选择在 map 端或是 reduce 端进行 Shuffle 排序

灵活的内存管理

Spark 将内存分为,堆上存储内存、堆外存储内存、堆上执行内存和堆外执行内存,并且提供了存储内存和执行内存之间动态占用空闲区域。

检查点

启用检查点会保存 RDD 计算的结果,失败重建时不需要重新计算 RDD。

Spark 基础知识

RDD

RDD(Resilient Distributed Dataset),弹性分布式数据集。

DAG

DAG(Directed Acycle Graph),有向无环图。反应 RDD 之间的依赖。

Partition

一个 RDD 可以划分成多个 Partition,Spark 根据 Partition 的数量确定 Task 的数量。

NarrowDependency

NarrowDependency(窄依赖),子 RDD 依赖于父 RDD 固定的 Partition。分为 OneToOneDependency 和 RangeDependency。

ShuffleDependency

ShuffleDependency(宽依赖),子 RDD 对父 RDD 中所有的 Partition 都有可能产生依赖,具体依赖取决于分区计算器(Partitioner)的算法。

Job

用户提交的作业。

Stage

Job 执行阶段。DAGScheduler 按照 ShuffleDependency 作为 Stage 划分节点对 RDD 的 DAG 进行 Stage 划分(上游 Stage 作为 ShuffleMapStage)。Stage 分为 ShuffleMapStage 和 ResultStage。

Task

具体执行的任务。一个 Job 在每个 Stage 都会按照 RDD 的 Partition 数量创建 Task。Task 分为 ShuffleMapTask(类似 map) 和 ResultTask(类似 reduce)。

Shuffle

Shuffle 是 map 任务输出到 reduce 任务输入的中间处理过程。将 map 任务的输出按照指定的分区策略(如,按 key 的 hash 值)分配给某个分区的 reduce 任务。

Nginx配置指南

Posted on 2019-04-12

Nginx配置指南

nginx.conf 文件结构

参考:Nginx配置详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
...              #全局块

events { #events块
...
}

http #http块
{
... #http全局块
server #server块
{
... #server全局块
location [PATTERN] #location块
{
...
}
location [PATTERN]
{
...
}
}
server
{
...
}
... #http全局块
}
  1. 全局块:配置影响nginx全局的指令。一般有运行nginx服务器的用户组,nginx进程pid存放路径,日志存放路径,配置文件引入,允许生成worker process数等。
  2. events块:配置影响nginx服务器或与用户的网络连接。有每个进程的最大连接数,选取哪种事件驱动模型处理连接请求,是否允许同时接受多个网路连接,开启多个网络连接序列化等。
  3. http块:可以嵌套多个server,配置代理,缓存,日志定义等绝大多数功能和第三方模块的配置。如文件引入,mime-type定义,日志自定义,是否使用sendfile传输文件,连接超时时间,单连接请求数等。
  4. server块:配置虚拟主机的相关参数,一个http中可以有多个server。
  5. location块:配置请求的路由,以及各种页面的处理情况。

配置详解

全局块

指令 说明
user 配置 worker 进程的用户和组
worker_processes 指定 worker 进程启动数量,用于处理客户端的所有连接。设置该值为 CPU 的核数。(max_clients = worker_processes * worker_connections)
error_log 指定日志文件,如果在其他区段中没有设置其他的 error_log ,那么这个日志文件将会记录所有的错误。该指令的第二个参数指定了被记录错误的级别(debug 、info 、notice 、warn 、error 、crit 、alert 、emerg )。注意 debug 级别的错误只有在编译时配置了 –with-debug 选项才可以使用。
pid 设置记录主进程 ID 的文件

events块

指令 说明
use 选择事件模型,默认 Nginx 会自动使用最适合的事件模型。对于Linux操作系统来说,可供选择的事件驱动模型有poll、select、epoll三种。epoll 是性能最高的一种。
worker_connections 定义每个worker进程可以同时处理的最大连接数

http块

Nginx打印RequestBody

Posted on 2019-04-11

Nginx打印RequestBody

背景

新的日志收集构架中直接去掉 SpringBoot 服务,而是通过 Nginx 作为服务器,收集日志,打印成日志文件,通过 Flume 消费到 Kafka 中。由于日志收集的请求都是 Post JSON 的格式,所以需要在 Nginx 中获取 Post 的 Body。

问题

在配置 log_format 后,发现无法打印 $request_body

1
log_format user_log_format escape=json '{"time": "$msec", "ip": "$remote_addr", "ua": "$http_user_agent", "data": "$request_body"}';

查看官方文档,有如下注释。意思是说 $request_body 需要在带有 proxy_pass, fastcgi_pass, uwsgi_pass, scgi_pass 这些指令的 location 中,当 request_body 被读到内存缓冲区中使用。

1
The variable’s value is made available in locations processed by the proxy_pass, fastcgi_pass, uwsgi_pass, and scgi_pass directives when the request body was read to a memory buffer.

Nginx 安装

由于在解决问题的时候需要使用到 ngx_lua 模块,所以我是直接安装的 OpenResty。
官网:http://openresty.org/cn/
安装文档:http://openresty.org/cn/installation.html

解决

在使用 (proxy_pass, fastcgi_pass, uwsgi_pass, scgi_pass) 指令的 location 中打印日志。

使用 HTTP Echo Module (OpenResty 中已经包括了这个模块)。

HTTP Echo Module 参考文档:https://www.nginx.com/resources/wiki/modules/echo/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
worker_processes  1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include /usr/local/openresty/nginx/conf/mime.types;
default_type application/octet-stream;

# 日志文件格式 access.log nginx默认的日志文件
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent $host "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
# 日志文件格式 user_defined.log 数据会写到这个文件中
log_format user_log_format escape=json '{"time": "$msec", "ip": "$remote_addr", "ua": "$http_user_agent", "data": "$request_body"}';

server {
listen 8080;
set $p_body "";
location / {
access_log /data/nginx/logs/user_defined.log user_log_format;
echo_read_request_body;
echo '';
}
}
}

使用 ngx_lua 模块

Lua 参考文档:https://www.nginx.com/resources/wiki/modules/lua/

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
worker_processes  1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include /usr/local/openresty/nginx/conf/mime.types;
default_type application/octet-stream;

# 日志文件格式 access.log nginx默认的日志文件
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent $host "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
# 日志文件格式 user_defined.log 数据会写到这个文件中
log_format user_log_format escape=json '{"time": "$msec", "ip": "$remote_addr", "ua": "$http_user_agent", "data": "$request_body"}';

server {
listen 8080;
set $p_body "";
location / {
lua_need_request_body on;
content_by_lua 'local s = ngx.req.get_body_data()';

access_log /data/nginx/logs/user_defined.log user_log_format;
# echo ''; # 注意这个不能使用 echo 命令和 return 命令
}
}
}

还可以通过自定义的变量存放 request_body,然后再 log_format 中使用自定义变量打印。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
worker_processes  1;
error_log logs/error.log;
events {
worker_connections 1024;
}
http {
include /usr/local/openresty/nginx/conf/mime.types;
default_type application/octet-stream;

# 日志文件格式 access.log nginx默认的日志文件
log_format main '$remote_addr - $remote_user [$time_local] "$request" '
'$status $body_bytes_sent $host "$http_referer" '
'"$http_user_agent" "$http_x_forwarded_for"';
# 日志文件格式 user_defined.log 数据会写到这个文件中
log_format user_log_format escape=json '{"time": "$msec", "ip": "$remote_addr", "ua": "$http_user_agent", "data": "$p_body"}';

server {
listen 8080;
set $p_body "";
location / {
lua_need_request_body on;
content_by_lua '
local p_body = ngx.req.get_body_data() or ""
ngx.var.p_body = p_body
';
access_log /data/nginx/logs/user_defined.log user_log_format;
# echo ''; # 注意这个不能使用 echo 命令和 return 命令
}
}
}

Redis 分布式锁

Posted on 2019-03-28

Redis 分布式锁

分布式锁就是在分布式环境下获取锁,实现进程以独占资源的方式访问共享资源。

Redis 分布式锁的特性

  • 独享。同一时刻,只能有一个客户端获取到锁。
  • 无死锁。即使持有锁的客户端奔溃或网络断开,锁仍然可以被获取。
  • 容错。只要大部分Redis节点都活着,客户端就可以获取和释放锁。

单 Redis 实例实现分布式锁实现

SET 命令

1
2
3
4
5
SET key value [EX seconds] [PX milliseconds] [NX|XX]
# EX seconds – 设置键key的过期时间,单位时秒
# PX milliseconds – 设置键key的过期时间,单位时毫秒
# NX – 只有键key不存在的时候才会设置key的值
# XX – 只有键key存在的时候才会设置key的值

获取锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
/**
* 获取 分布式锁
* @param lockName 锁的 key
* @return 成功时返回一个随机生成的id,为了正确的释放锁,失败时返回 null
*/
public static String getLock(String lockName) {
String result = null;
try (Jedis jedis = getJedis()) {
String randomId = UUID.randomUUID().toString();
// EX seconds – 设置键key的过期时间,单位时秒。PX milliseconds – 设置键key的过期时间,单位时毫秒。NX – 只有键key不存在的时候才会设置key的值。XX – 只有键key存在的时候才会设置key的值
String status = jedis.set(lockName, randomId, "nx", "px", 10000); // 10s 过期时间
if ("OK".equals(status)) {
result = randomId;
}
} catch (Exception e) {
String msg = ExceptionUtil.stackTraceMsg(e);
logger.error(msg);
}
return result;
}

释放锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
* 释放锁
* @param lockName 锁的名称
* @param randomId 获取锁时返回的的id
* @return
*/
public static boolean releaseLock(String lockName, String randomId) {
boolean result = false;
// Lua 脚本
String script = "if redis.call(\"get\",KEYS[1]) == ARGV[1] then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
try (Jedis jedis = getJedis()) {
Long status = (Long) jedis.eval(script, Collections.singletonList(lockName), Collections.singletonList(randomId));
if (status.longValue() == 1L) {
result = true;
}
} catch (Exception e) {
String msg = ExceptionUtil.stackTraceMsg(e);
logger.error(msg);
}
return result;
}

单 Redis 实例实现分布式锁存在的问题

在单 Redis 实例的分布式锁实现中存在的最严重的问题就是单节点失败的问题,不能保证 Redis 永远不会挂掉。
在主从结构的 Redis 集群中,这种构架也是有问题的,不能实现资源的独享,因为 Redis 的主从同步通常是异步的:

  1. 客户端 A 从 Master 获取到锁
  2. 在 Master 同步到 Slave 之前,Master 节点挂掉了
  3. Slave 节点成为了 Master 节点
  4. 客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。安全失效!

Redis 宕机毕竟是小概率实现,所以在可以忽略宕机的事件时,这个构架还是很实用的。

Redlock

Redlock 是 Antirez 提出的,使用多个完全独立的 Redis 实例,来实现分布式锁的算法。

参考:The Redlock algorithm
参考:Redis集群下的RedLock算法(真分布式锁) 实践

Redisson 分布式锁

参考:分布式锁和同步器

《Phoenix使用总结》 三、二级索引

Posted on 2019-02-22

二级索引

HBase 二级索引方案

HBase 中的行是以 RowKey 的字典序排序存储。所以对于 HBase 的快速查询主要有两种场景,一种是根据 RowKey 快速查询单行数据,另一种是基于 RowKey 的前缀查询。由于 HBase 只有 RowKey 的一级索引,根据其他的字段进行查询效率是很低的,实际业务场景中可能需要引入二级索引。

我所了解的 HBase 二级索引的解决思路有如下几种:

  • 官方的解决方案是使用 Coprocessor(类似触发器),自定义写入逻辑,当有数据写入时同时写入一份索引表。
  • 可以使用其他的工具在外部构建和维护索引关系,索引字段和 RowKey 的对应关系,比如使用 Elasticsearch、MongoDB 等。
  • 使用 Phoenix 构建二级索引。

Coprocessor 的方案由于运行都交给了 Regionserver,侵入性比较强,会对 Regionserver 的性能造成影响。第一版的时候我使用了 MongoDB 维护了索引关系,后期数据量达到 20 亿,对 MongoDB 所在的服务器造成了很大的压力,所以后期决定使用基于 HBase 的 Phoenix 构建二级索引。

Phoenix 二级索引

官方文档:http://phoenix.apache.org/secondary_indexing.html

索引特性

Covered Indexes(覆盖索引)

覆盖索引指的是把查询相关的字段都指定在索引中,这样查询就只需要在索引表中查询就行。建立索引的语句如下,把查询的字段(where条件字段)放在 ON 语句中,把其他需要查询出来的字段(其他 select 字段)放在 INCLUDE 中。

Phoenix 的索引其实就是维护一张 HBase 表(本地索引与原始数据同一张表),把索引的字段作为 RowKey,把 INCLUDE 的字段放在 COLUMN 中。所以索引的顺序也注意,需要把条件中一定有的字段放在前面,可能有的字段放在后面。

1
2
CREATE INDEX my_index ON my_table (v1,v2) INCLUDE(v3);
SELECT v1, v2, v3 FROM my_table WHERE v1='' AND v2=''; // where 条件中 v1、v2 字段放在 ON 语句中,其他查询字段 v3 放在 INCLUDE 语句中

Functional Indexes(函数索引)

索引的字段不局限与列,支持任意的表达式来创建索引。

1
2
CREATE INDEX UPPER_NAME_IDX ON EMP (UPPER(FIRST_NAME||' '||LAST_NAME));
SELECT EMP_ID FROM EMP WHERE UPPER(FIRST_NAME||' '||LAST_NAME)='JOHN DOE'; // 使用函数索引,生成大写的 FIRST_NAME + LAST_NAME 的索引

构建索引

Phoenix 支持两种类型的索引机制,Global Indexes(全局索引) 和 Local Indexes(本地索引),不同的索引机制适用于不同的业务场景,默认是构建全局索引。全局索引是维护了一张新的索引表,本地索引的索引数据存在了原始数据表中(4.8.0之后)。

Global Indexes(全局索引)

Global Indexes(全局索引) 适用于大量读取操作的场景。所有数据的更新和写操作都需要更新相关的索引表,所以开销主要发生在写入阶段,查询是直接查询索引表,所以查询开销较小。由于只会查询索引表,所以在建立索引时需要合理设计 INCLUDE 字段,把需要查询的字段都放进去。

Local Indexes(本地索引)

Local Indexes(本地索引) 适用于大量写入操作的场景。本地索引的索引数据放在了数据表中(从4.8.0开始,我们将所有本地索引数据存储在同一数据表中的单独阴影列族中),索引数据和原始数据是在同一台机器上面,所以没有额外的网络开销。本地索引可以使用在没有全面覆盖 INCLUDE 的场景。

索引实例

异步创建索引

1
2
3
4
5
6
7
8
9
10
11
12
13
# 先创建异步索引
CREATE INDEX async_index ON my_schema.my_table (v) ASYNC

# 再通过 MapReduce 任务异步构建索引,output-path 是一个hdfs路径(存放 MapReduce 任务执行的临时文件)
${HBASE_HOME}/bin/hbase org.apache.phoenix.mapreduce.index.IndexTool
--schema my_schema --data-table my_table --index-table async_index
--output-path ASYNC_IDX_HFILES

# 实例
CREATE LOCAL INDEX match_index_new_qm ON PUBG.MATCH_INDEX_NEW(lowerNickName, type, mode, queueSize, startedAt DESC) INCLUDE (matchId, totalRank) ASYNC
# 构建时发现无法找到 org.apache.phoenix.mapreduce.index.IndexTool 类,索引把 jar 加到 HBASE_CLASSPATH 中
export HBASE_CLASSPATH=/opt/cloudera/parcels/APACHE_PHOENIX/lib/phoenix/phoenix-4.14.0-cdh5.13.2-client.jar
hbase org.apache.phoenix.mapreduce.index.IndexTool --schema PUBG --data-table MATCH_INDEX_NEW --index-table match_index_new_qm --output-path /user/hadoop/wangzhen

创建索引

1
2
3
4
CREATE INDEX my_index ON my_table (v1) INCLUDE (v2);

CREATE INDEX my_index ON my_table (v2 DESC, v1) INCLUDE (v3)
SALT_BUCKETS=10, DATA_BLOCK_ENCODING='NONE';

与CREATE TABLE语句一样,CREATE INDEX语句可以传递属性以应用于基础HBase表,包括对其进行加盐的能力,如果主表被加盐,则索引将以相同的方式自动为全局索引加盐。使用本地索引时,不允许指定SALT_BUCKETS。

创建本地索引

1
CREATE LOCAL INDEX my_index ON my_table (v1)

查询时指定索引

1
SELECT /*+ INDEX(my_table my_index) */ v2 FROM my_table WHERE v1 = 'foo'

《Phoenix使用总结》 二、Spark操作Phoenix

Posted on 2019-02-22

Spark 操作 Phoenix

由于之前的映射数据是存在 MongoDB 中,所以需要将 MongoDB 中的数据导入到 Phoenix 中。这里记录 Spark 对 Phoenix 的操作。

Maven 依赖

1
2
3
4
5
6
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-spark</artifactId>
<version>${phoenix.version}</version>
<scope>provided</scope>
</dependency>

从 Phoenix 读取数据

1
2
3
val rdd: RDD[Map[String, AnyRef]] = sparkContext.phoenixTableAsRDD(
"TABLE1", Seq("ID", "COL1"), zkUrl = Some("phoenix-server:2181")
)

写入 Phoenix

1
2
3
4
5
6
7
8
9
10
11
12
var matchIndexRDD = MongoSpark.load(sparkContext, ReadConfig(Map("uri" -> uri
, "database" -> "pubg"
, "collection" -> "match_index")))
.filter(m => (StringUtils.isNotBlank(m.getString("lowerNickName")) && m.getDate("startedAt") != null))
.map(m => (m.getString("lowerNickName"), m.getDate("startedAt"), m.getString("matchId")
, m.getString("mode"), m.getInteger("queueSize"), m.getInteger("totalRank"), m.getString("type")))

matchIndexRDD.saveToPhoenix(
"PUBG.MATCH_INDEX",
Seq("LOWERNICKNAME", "STARTEDAT", "MATCHID", "MODE", "QUEUESIZE", "TOTALRANK", "TYPE"),
zkUrl = Some("datanode03:2181")
)

《Phoenix使用总结》 一、使用背景

Posted on 2019-02-21

使用背景

项目中有一个 HBase 表需要根据特定的条件进行查询,并且无法通过一次 rowkey 的设计达到很好的效果。第一版,在 MongoDB 中对 HBase 表的数据建立一张映射表,通过查询条件在 MongoDB 中查询出 RowKey,再到 HBase 中通过 RowKey 进行查询。
到后期由于数据量的增大(20亿),MongoDB所在的机器负载太高。后面通过讨论,使用 Phoenix 做二级索引进行查询。

Phoenix 简介

Phoenix 是开源的构建于 HBase 之上的 SQL 层,使我们能够使用标准的 JDBC API 代替 HBase Client API 操作 Hbase 进行创建表、插入数据和查询数据。

Apache Phoenix 结合以下两点实现在 Hadoop 中低延迟的 OLTP 和 operational analytics:

  • 强大的标准SQL和完整 ACID 事务的 JDBC APIs
  • 通过利用 HBase 作为后台存储,提供了NoSQL的late-bound, schema-on-read灵活的功能。

官网:http://phoenix.apache.org/

Quick Start

连接

Maven依赖

1
2
3
4
5
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-core</artifactId>
<version>4.14.0-cdh5.13.2</version>
</dependency>

JDBC 连接

1
2
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
Connection connection = DriverManager.getConnection("jdbc:phoenix:dmp-test01,dmp-test02,dmp-test03:2181");

DBCP 连接

1
2
3
4
BasicDataSource xdataSource = new BasicDataSource();
dataSource.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
dataSource.setUrl("jdbc:phoenix:datanode03,datanode04,datanode05:2181");
Connection connection = dataSource.getConnection();

可以参考我写的工具类:PhoenixDbcpUtils.java

语法

Phoenix 的语法:http://phoenix.apache.org/language/index.html

示例

建表

1
2
3
4
5
6
7
8
9
10
CREATE TABLE IF NOT EXISTS PUBG.MATCH_INDEX (
lowerNickName VARCHAR NOT NULL,
startedAt DATE NOT NULL,
matchId VARCHAR,
mode VARCHAR,
queueSize INTEGER,
totalRank INTEGER,
type VARCHAR,
CONSTRAINT match_index_pk PRIMARY KEY (lowerNickName, startedAt))
SALT_BUCKETS = 100;

HBase 中有一个预分区的概念,防止热数据。Phoenix 建表时预分区的方案有两种,一种是使用 SPLIT ON 语句(与 HBase 的 SPLITS 概念一样),另一种是 Salted Tables ,建表时指定 SALT_BUCKETS 数量,具体可参考:http://phoenix.apache.org/salted.html。

建立本地索引

1
CREATE LOCAL INDEX match_index_qm ON PUBG.MATCH_INDEX(lowerNickName, type, mode, queueSize, startedAt DESC) INCLUDE (matchId, totalRank) 

插入数据

1
UPSERT INTO PUBG.MATCH_INDEX(LOWERNICKNAME, STARTEDAT, MATCHID, MODE, QUEUESIZE, TOTALRANK, TYPE) VALUES (?, ?, ?, ?, ? ,?, ?)

Spring注解总结

Posted on 2018-11-27

Spring注解总结

@Autowired

@Autowired 注解,自动装配,默认按类型匹配的方式,在容器查找匹配的 Bean,当有且仅有一个匹配的 Bean 时,Spring 将其注入 @Autowired 标注的变量中。
@Autowired 注解有一个 required 属性,默认为 true,当没有找到相应的 Bean 对象时会抛出异常,如果不想抛出异常可以将 required 设置为 false。

@Qualifier

如果容器中有一个以上匹配的 Bean,则可以通过 @Qualifier 注解限定Bean的名称,与 @Autowired 配合使用。value 属性指定 Bean 的名称。

1
2
@Autowired
@Qualifier(value="beanName")

@Resource

@Resource 注解与 @Autowired 注解作用非常相似。

@Resource的装配顺序:

  1. @Resource 后面没有任何内容,默认通过 Name 属性去匹配 Bean,找不到再按 Type去匹配
  2. 指定了 Name 或者 Type 则根据指定的 Name 或者 Type 去匹配 Bean
  3. 指定了 Name 和 Type 则根据指定的 Name 和 Type 去匹配 Bean,任何一个不匹配都将报错

@Autowired和@Resource两个注解的区别:

  1. @Autowired 默认按照 byType 方式进行 Bean 匹配,@Resource 默认按照 byName 方式进行 Bean 匹配
  2. @Autowired 是 Spring 的注解,@Resource 是 J2EE 的注解

@Component/@Repository/@Service/@Controller

@Component/@Repository/@Service/@Controller 可以向 Spring 容器注册 Bean。
需要在 applicationContext.xml 中注册 <context:component-scan base-package=”pagkage1[,pagkage2,…,pagkageN]”/>,或者使用 @ComponentScan 注解指定扫描包。

@Component

@Component 是所有受 Spring 管理的组件的通用形式,@Component 注解可以放在类的头上,@Component 不推荐使用。

@Repository

@Repository 对应数据访问层(Dao 层) Bean。

1
2
3
4
@Repository(value="userDao")
public class UserDaoImpl extends BaseDaoImpl<User> {
// TODO
}

@Service

@Service 对应的是业务层(Service 层) Bean。

1
2
3
4
@Service("userService")
public class UserServiceImpl implements UserService {
// TODO
}

@Controller

@Controller 对应表现层(Action 层) Bean。
value 属性用来指定 Bean 的名称,默认的名字为这个类的类名首字母小写。

1
2
3
4
5
@Controller
@Scope("prototype")
public class UserController {
// TODO
}

这里的还使用了 @Scope 注解,@Scope(“prototype”) 表示将 Controller 的范围声明为原型,保证每一个请求都 new 一个 Controller 对象来处理。
Spring 默认 scope 是单例模式(scope=”singleton”),表示只会创建一个对象,每次访问都是同一对象。

@Configuration

@Configuration 用于定义配置类,可替换 xml 配置文件,被注解的类内部包含有一个或多个被 @Bean 注解的方法,这些方法将会被 AnnotationConfigApplicationContext 或 AnnotationConfigWebApplicationContext 类进行扫描,并用于构建 Bean 定义,初始化 Spring 容器。

@ConfigurationProperties/@EnableConfigurationProperties

@ConfigurationProperties 注解主要用来把 Properties 配置文件转化为 Bean 来使用的,而 @EnableConfigurationProperties 注解的作用是 @ConfigurationProperties 注解生效。
如果只配置 @ConfigurationProperties 注解,在 IOC 容器中是获取不到 Properties 配置文件转化的 Bean 的。
@ConfigurationProperties 注解可以把 Properties 文件转化为 Bean,然后使用 @Component 注解把该 Bean 注入到 IOC 容器中。

1
2
3
4
5
6
@Component
/*prefix定义配置文件中属性*/
@ConfigurationProperties(prefix="local")
public class ComponentProperties {
// TODO
}

其他

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration 把一个类作为一个IoC容器,它的某个方法头上如果注册了@Bean,就会作为这个Spring容器中的Bean。
@Lazy(true) 表示延迟初始化
@Service 用于标注业务层组件、
@Controller 用于标注控制层组件(如struts中的action)
@Repository 用于标注数据访问组件,即DAO组件。
@Component 泛指组件,当组件不好归类的时候,我们可以使用这个注解进行标注。
@Scope 用于指定scope作用域的(用在类上)
@PostConstruct 用于指定初始化方法(用在方法上)
@PreDestory 用于指定销毁方法(用在方法上)
@DependsOn 定义Bean初始化及销毁时的顺序
@Primary 自动装配时当出现多个Bean候选者时,被注解为@Primary的Bean将作为首选者,否则将抛出异常
@Autowired 默认按类型装配,如果我们想使用按名称装配,可以结合@Qualifier注解一起使用。如下:
@Autowired @Qualifier("personDaoBean") 存在多个实例配合使用
@Resource 默认按名称装配,当找不到与名称匹配的bean才会按类型装配。
@Async 异步方法调用

Netty学习

Posted on 2018-11-08

Netty学习

Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

官网

https://netty.io/

学习参考

书籍:Netty 实战
开源书籍:

  • Netty 实战(精髓)
  • Netty 4.x 用户指南

项目代码

https://github.com/wForget/NettyDemo

<i class="fa fa-angle-left"></i>12345<i class="fa fa-angle-right"></i>

45 posts
28 tags
GitHub
© 2022 wangz