xiaofang's blog

  • 首页

  • 标签

  • 分类

  • 归档

Kerberos

发表于 2020-02-05 | 评论数: | 阅读次数:

Kerberos

1、 几个概念

  • KDC:秘钥分发中心
  • Realm:kerberos管理的领域标识
  • Principal:用户/服务 向kdc注册时候的身份,形式为,主名称/实例名@领域名

    * 主名称:可以是用户名/服务名字
    * 实例名:可以是用户组/主机ip(host)
    

2、KDC

Key Distribute Center。

  • Database(zhangsan/supergroup hdfs/haddop1)

  • AS: Authentication Server(认证服务器)

    用于初始化认证,并生成Ticket Granting Ticket (TGT)

  • TGS: Ticket Granting Server(票据授权服务器)

    在TGT的基础上生成Service Ticket。一般情况下AS和TGS都在KDC的Server上

3、搭建

1
2
3
4
//重要的三个配置文件
/etc/krb5.conf
/var/kerberos/krb5kdc/kdc.conf
/var/kerberos/krb5kdc/kadm5.acl

部署

  • 安装KDC
install krb5-server krb5-libs krb5-workstation -y```
1
2
3
4

* 每个节点部署客户端(方便其他机器访问KDC)

```yum install krb5-libs krb5-workstation -y

修改配置

1、服务端配置

  • 位置:
    1
    2
    3
    * 可以配置多个realms,一般建议配置一个,大写
    * 配置support_enctype去掉```aes-2560cts:normal```,否则需要下载jar支持这个加密算法。
    * ```max_life=1d```(kinit 后一天内不用认证,超过1d会走的那个认证) ```max_renewable_life=7d```(7d后需要再kinit)

[kdcdefaults]
kdc_ports = 88
kdc_tcp_ports = 88

[realms]
HADOOP.COM = { #是设定的 realms。名字随意。Kerberos 可以支持多个 realms,会增加复杂度。大小写敏感,一般为了识别使用全部大写。这个realms跟机器的host没有大关系。

#master_key_type = aes256-cts

#和supported_enctypes默认使用aes256-cts。由于,JAVA使用aes256-cts验证方式需要安装额外的jar包(后面再做说明)。推荐不使用,并且删除aes256-cts。
kadmind_port = 749
acl_file = /var/kerberos/krb5kdc/kadm5.acl #标注了admin的用户权限,需要用户自己创建。文件格式是:Kerberos_principal permissions [target_principal] [restrictions] 支持通配符等。最简单的写法是*/admin@HADOOP.COM ,代表名称匹配/admin@HADOOP.COM 都认为是admin,权限是 *。代表全部权限。
dict_file = /usr/share/dict/words
database_name = /var/kerberos/krb5kdc/principal
key_stash_file = /var/kerberos/krb5kdc/.k5.HADOOP.COM
admin_keytab = /var/kerberos/krb5kdc/kadm5.keytab #KDC 进行校验的 keytab
max_life = 24h
max_renewable_life = 10d #涉及到是否能进行ticket的renwe必须配置
default_principal_flags = +renewable, +forwardable
supported_enctypes = des3-hmac-sha1:normal arcfour-hmac:normal camellia256-cts:normal camellia128-cts:normal des-hmac-sha1:normal des-cbc-md5:normal des-cbc-crc:normal #支持的校验方式.注意把aes256-cts去掉
}

1
2
3
4
5

#### 2、客户端配置
> 每个客户端都需要配置

* 位置:```/etc/krb5.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# Configuration snippets may be placed in this directory as well
includedir /etc/krb5.conf.d/
[logging] #[logging]:表示 server 端的日志的打印位置
default = FILE:/var/log/krb5libs.log
kdc = FILE:/var/log/krb5kdc.log
admin_server = FILE:/var/log/kadmind.log

[libdefaults] #[libdefaults]:每种连接的默认配置,需要注意以下几个关键的小配置
default_realm = HADOOP.COM #设置Kerberos应用程序的默认领域。如果您有多个领域,只需向[realms]节添加其他的语句
dns_lookup_realm = false
#clockskew = 120 #时钟偏差是不完全符合主机系统时钟的票据时戳的容差,超过此容差将不接受此票据。通常,将时钟扭斜设置为 300 秒(5 分钟)。这意味着从服务器的角度看,票证的时间戳与它的偏差可以是在前后 5 分钟内。~~
ticket_lifetime = 24h #表明凭证生效的时限,一般为24小时
renew_lifetime = 7d #表明凭证最长可以被延期的时限,一般为一个礼拜。当凭证过期之后,对安全认证的服务的后续访问则会失败
forwardable = true #允许转发解析请求
rdns = false
udp_preference_limit = 1 #禁止使用udp可以防止一个Hadoop中的错误

[realms] #列举使用的realm
HADOOP.COM = {
kdc = node1:88 #代表要kdc的位置。格式是机器:端口。测试过程中也可不加端口。
admin_server = node1:749 #代表admin的位置。格式是机器:端口。测试过程中也可不加端口。
default_domain = HADOOP.COM #代表默认的域名。
}
[kdc]
profile=/var/kerberos/krb5kdc/kdc.conf
// 指定哪个域名数据哪个realm,单个realm不需要设置这个映射
[domain_realm]
#.example.com = EXAMPLE.COM
# example.com = EXAMPLE.COM

3、创建Kerberos数据库

在 kdc安装的机器上运行

create [-r HADOOP.COM] -s```,会需要输入密码。生成的文件在```/var/kerberos/krb5kdc```路径下。里面默认生成了kerberos的主体(principals)
1
2
3
4
5


#### 4、启动

* 开启kdc:```systemctl start krb5kdc

  • 开启可远程登陆:

    start kadmin```
    1
    * 开机自启动 ```systemctl enable krb5kdc```、 ```systemctl enable kadmin

  • 检查是否开启自启动:

    is-enabled krb5kdc```、```systemctl is-enabled kadmin```
    1
    2
    3
    4


    #### 5、数据库登陆方式(类似mysql登陆)
    * kdc本地登陆(用户名&密码):```kadmin.local

    • 远程登陆()

6、kdc 账号操作

  • 新增(默认的域可以省略) :addprinc hf/hfgrioup[@HADOOP.COM]
  • 删除:delprinc hf

7、kerberos 主体认证 (互斥)

  • 用户名+密码:
    hf```
    1
    * 秘钥

// 生成秘钥
xst -k /home/username/user.keytab user/ugroup
// 认证
kinit -kt /home/username/user.keytab user/ugroup

1
2
3
4
5
6
7

* 退出 : quit


## CDH 启用kerberos准备

* 为CDH创建管理员主体

kadmin.local -q “addprinc cloudrea-scm/admin”

输入密码

1
2

* 给管理员实例的所有主体授权

vim /var/kerberos/krb5kdc/kadm5.acl

*/admin@HADOOP.COM *

`

  • 在CDH的管控台开启Kerberos。

Alluxio

发表于 2020-02-05 | 分类于 大数据 | 评论数: | 阅读次数:

alluxio简单使用

本文是基于alluxio官网和自己实践整理。

  • Alluxio版本:1.8.1
  • CDH 1.15.2

1、介绍

以内存为中心的分布式虚拟存储系统。Alluxio在上层计算框架和底层存储系统之间架起了桥梁,应用层只需要访问Alluxio即可以访问底层对接了的任意存储系统的数据。作者是李浩源/范斌,都是中国人,所以官网 也提供了中文的文档。

在这里插入图片描述

2、功能简介

  • 灵活的API
  • 兼容Haddop 的HDFS文件系统接口
  • 分级存储,自定义分配和回收策略
  • 统一命名空间
  • 完整的命令行
  • Web UI

3、下载编译

默认从官网下载的执行包,支持的Hadoop 2.2.x,一般我们需要自己编译源码。从gitHub上下载下来,通过以下命令构建适合自己的版本

  • -T 2C install -Phadoop-2 -Dhadoop.version
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14

    * 说明
    * -T 2C:开启多线程编译,每个核cpu开启2个线程
    * -P:hadoop-2 为haddop大版本,可以是hadoop-1、hadoop-3
    * -D:具体细分版本号,我们这里是cdh版本

    ## 4、搭建和部署
    > 部署一般建议和计算框架同置(co-locate)部署。本次以3台做个集群,其中p1机器是alluxio主节点,若要做高可用可引入zk(2.0版本会使用masters自身做高可用,不需要zk),这里没有做高可用配置。

    * 下载/编译出适合自己的执行包
    * 上传到服务器上(3台都要),我将源码文件放在```/usr/local/git```并软链接到```/opt```下,后续我就直接在```/opt```下操作
    * 选取其中一台机器作为主节点master,hostname是p1
    * 在```${ALLUXIO_HOME}/conf```下
    * ```cp conf/alluxio-site.properties.template conf/alluxio-site.properties
    • 1
      2
      3
      4
      5
      6
           * ```alluxio.underfs.address=hdfs://p1:8020/alluxio/home```(namenode地址,即将hdfs跟路径挂载到alluxio下)
      * ```workers```文件里面添加woker主机名字,例如我这里是p2、p3
      * ```masters```文件里面添加master主机名字,例如我这里是p1
      * 利用alluxio提供拷贝命令到其他集群机器

      * 在hdfs 上新建```/alluxio/home```目录,```hdfs dfs -mkdir /alluxio/home
  • 启动Alluxio

    • 用启动hadoop同一用户来启动,例如hdfs
    • ${ALLUXIO_HOME}/bin,然后运行 ./alluxio format ```,只是第一次需要运行,会清空alluxio里面的数据
      1
      2
      3
      * ```cd ${ALLUXIO_HOME}/bin,然后运行 ./alluxio-start.sh all SudoMount ```,过程需要输入几次启动用户的密码
      * SudoMount 只是在第一次启动需要加,目的是挂载```/mnt/ramdisk```给alluxio作为默认的存储,若是一直hang住,检查启动用户是否配置了免密,我是将```hdfs ALL=(ALL) NOPASSWD: ALL```加入了```/etc/sudoers```里面。
      * ```${ALLUXIO_HOME}/bin ./alluxio runTests```测试集群,其实就是上传一些文件到```/alluxio/home
      • 到web UI查看:
        1
        * 执行命令查看集群信息:```cd ${ALLUXIO_HOME}/bin```然后```./alluxio fsadmin report

5、系统架构与原理

5.1 与操作系统文件系统对比

5.2 系统组件

集群组成:master、worker、client、UFS(底层存储)

master

  • 管理集群的元数据
    • 文件inode树
    • 文件到数据块block的映射
    • 数据快block到woker位置的映射
    • woker元数据(worker的状态)
  • 被动响应客户端RPC请求

    • client的对请求文件的操作
    • woker汇报状态心跳
  • 记录文件系统日志(集群重启后可以准确恢复)

secondary master

高可用模式下,集群可以有多个master节点,其中只有一个会被选举为primary mater,其余均为standby状态,称为secondary master,它不接受任何Alluxio组件的请求,只是将文件系统的日志持久化存储,在多个master间共享。

worker

  • 管理本机的存储资源(RAM、SSD、HDD)
  • 和底层存储(UFS)交互,缓存数据
  • 根据配置的缓存替换策略分配保存缓存数据

client

  • 向master发起操作文件的RPC请求
  • 从worker读取写入数据
  • client的jar包在编译后的源码文件
    1
    2
    3
    4
    5
    6
    7
    8
    * client jar 不能单独使用,需要与应用程序在同一个JVM里面,否则会抛异常
    * woker与client在一台机器,会短路读取数据(绕过请求worker的RPC请求,直接用本地文件系统读取woker里数据)

    ### 5.3、读写

    #### 5.3.1 读

    关键配置参数:```alluxio.user.file.readtype.default
值 说明
CACHE_PROMOTE(默认) 将数据块移动到worker最顶层,且缓存一个副本到本机worker
CACHE 将一个副本添加到本地worker中
NO_CACHE 不会创建副本
1.命中worker

命中本地worker(“短路读取”)

此时client直接通过本地文件系统读取存储在worker上的数据,称作为“短路读取”。

  • 此时需要获取本地文件的操作权限
  • 容器化容器里面运行alluxio client 和woker,可以通过 Unix domain socket 方式访问。

    Unix domain socket 又叫 IPC(inter-process communication 进程间通信) 主要用于同一主机上的进程间通信。与主机间的进程通信不同,它不是通过 “IP地址:端口号”的方式进程通信,不需要经过网络协议栈,不需要打包拆包、计算校验和、维护序号和应答等,只是将应用层数据从一个进程拷贝到另一个进程,使用 socket 类型的文件来完成通信。

命中远程worker

  • client 通过RPC连接远程的worker,woker 处理请求返回client数据
  • 并缓存一个副本在本地(发起rpc的机器的worker),这样可以加快下一次访问,但是副本数会增多,引起数据爆炸(但是这也是alluxio的特点,不像hdfs那样设置副本后就是固定死了)我们可以通过设置ReadType为NO_CACHE不缓存副本。
  • 2.0版本里面会有针对某个文件设置缓存的副本数量(但是没有全局的设置副本数量)
2.未命worker
  • 1.8 之前版本,alluxio client 会承担缓存任务,还需要配置读取的数据是部分还是整个,采取缓存/不缓存
  • 1.8 之后,缓存数据的任务交给woker异步执行,不需要关心读取的数据是完整的还是部分,因为所有的动作都在woker这边,默认woker工作机制是这样

    • 客户端顺序完整读取文件,则woker顺便缓存整个文件副本
    • 客户端不是顺序/完整读取,则woker会放弃读取时候顺便缓存,但是客户端会在读取完成后向woker发送异步缓存命令,worker 会继续缓存整个文件。
    • woker节点线程池大小:
      1
      2
      3
      4
      5


      #### 5.3.2 写

      关键配置参数:```alluxio.user.file.writetype.default
  • 写类型

值 说明
MUST_CACHE(默认) 同步将数据存储在Alluxio中(不怕丢), 本地有worker,“短路写”,本地无worker,写入远程woker
THROUGH 同步将数据存储在UFS中(怕丢,但是数据不会立即用到)
CACHE_THROUGH 同步将数据存储在Alluxio中和UFS中(怕丢,且数据会立即用到)
ASYNC_THROUGH(异步) 同步将数据写入到alluxio,所有数据块block会驻留在一个woker上,然后异步地写入底层存储系统。实验性写类型,2.0 版本会稳定些
  • 写定位策略
值 说明
LocalFirstPolicy(默认) 优先使用本地worker,若本地Worker没有足够的容量,从有效的worker列表中随机选择一个
MostAvailableFirstPolicy 使用拥有最多可用容量的worker
RoundRobinPolicy 循环选取存储下一个数据块的worker,若该worker没有足够的容量,跳过
SpecificHostPolicy 返回指定主机名的Worker

6、与HDFS集成

6.1 前提

  • HDFS 集群启动
  • Alluxio编译打包成对应的HDFS版本(参考上述下载编译)
  • 上传Alluxio编译后的源码包到集群机器上,我的位置为
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    * 确定好namenode的地址,我的cdh版本这里是:```hdfs://p1:8020```,p1为我namenode主机的ip


    ### 6.2 集成配置

    #### 配置方式

    * 普通模式:参考上面,搭建和部署目录
    * 高可用模式
    * 将Hadoop目录下的```hds-site.xml、core-site.xml```软链接到```${ALLUXIO_HOME}/conf```下
    * 更改```{ALLUXIO_HOME}/conf```下的```alluxio-site.properties```里面的属性```alluxio.underfs.address=nameservice```,其中nameservice为```core-site.xml```文件里面配置的HDFS服务名称。

    #### 权限

    alluxio文件系统实现了类似POSIX文件系统的用户和权限验证,所以我们需要确保HDFS上的用户、组和访问模式等文件的权限信息与Alluxio里面一致。alluxio提供了用户模拟功能,我们在```{ALLUXIO_HOME}/conf```里的```alluxio-site.properties```添加:

alluxio.master.security.impersonation.hdfs.users=
alluxio.master.security.impersonation.yarn.users=

alluxio.master.security.impersonation.hive.users=
alluxio.master.security.impersonation.root.users=

1
2
3
4
5
6
7


## 7、常用命令

通过上述的步骤,基本的一个基于HDFS存储的Alluxio集群搭建好了,我们一起来试试常用的命令感受下。首先```cd {ALLUXIO_HOME}/alluxio/bin```下。

### 7.1 管理员命令(fsadmin)

[hdfs@p1 bin]$ ./alluxio fsadmin
Usage: alluxio fsadmin [generic options]
[backup [directory] [–local]]
[doctor [category]]
[report [category] [category args]]
[ufs [–mode <noAccess/readOnly/readWrite>] ]
[hdfs@p1 bin]$

1
2

* backup 备份元数据

// 备份到hdfs中
[hdfs@p1 bin]$ ./alluxio fsadmin backup /meta
Successfully backed up journal to hdfs://p1:8020/meta/alluxio-backup-2019-11-13-1573636945711.gz
// 备份到本地文件中
[hdfs@p1 bin]$ ./alluxio fsadmin backup /opt/ –local
Successfully backed up journal to file:///opt/alluxio-backup-2019-11-13-1573637112922.gz on master p1
// 从备份文件中恢复元数据
hdfs@p1 bin]$ ./alluxio-start.sh -i /opt/alluxio-backup-2019-11-13-1573637112922.gz masters
Executing the following command on all master nodes and logging to /usr/local/git/alluxio/logs/task.log: /usr/local/git/alluxio/bin/alluxio-stop.sh master
Waiting for tasks to finish…
All tasks finished
Executing the following command on all master nodes and logging to /usr/local/git/alluxio/logs/task.log: /usr/local/git/alluxio/bin/alluxio-start.sh -i /opt/alluxio-backup-2019-11-13-1573637112922.gz master
Waiting for tasks to finish…
All tasks finished

1
2

* doctor 检查alluxio的配置

[hdfs@p1 bin]$ ./alluxio fsadmin doctor
No server-side configuration errors or warnings.

1
2
	 
* report 报告集群信息

// 有4个可选项,默认集群信息摘要,如:web界面地址,端口,woker数目等
[hdfs@p1 bin]$ ./alluxio fsadmin report -h
report [category] [category args]
Report Alluxio running cluster information.
Where [category] is an optional argument. If no arguments are passed in, summary information will be printed out.
[category] can be one of the following:
capacity worker capacity information
metrics metrics information
summary cluster summary(默认)
ufs under filesystem information

// capacity, wokers的容量信息汇总
[hdfs@p1 bin]$ ./alluxio fsadmin report capacity
Capacity information for all workers:
Total Capacity: 20.68GB
Tier: MEM Size: 20.68GB
Used Capacity: 0B
Tier: MEM Size: 0B
Used Percentage: 0%
Free Percentage: 100%

Worker Name Last Heartbeat Storage MEM
p2 0 capacity 10.34GB
used 0B (0%)
p3 0 capacity 10.34GB
used 0B (0%)
// ufs 集群配置底层存储系统信息
[hdfs@p1 bin]$ ./alluxio fsadmin report ufs
Alluxio under filesystem information:
hdfs://p1:8020/alluxio/home on / (hdfs, capacity=70.64GB, used=1197.66MB(1%), not read-only, not shared, properties={})

1
2

* ufs 存储层文件系统

// 有一个 –mode 可选择项目,下面可以跟三个参数
[hdfs@p1 bin]$ ./alluxio fsadmin ufs -h
Usage: ufs [–mode <noAccess/readOnly/readWrite>]

1
2

### 7.1 普通用户命令(fs)

[hdfs@p1 bin]$ ./alluxio fs
Usage: alluxio fs [generic options]
[cat ]
[checkConsistency [-r] ]
[checksum ]
[chgrp [-R] ]
[chmod [-R] ]
[chown [-R] [:] ]
[copyFromLocal ]
[copyToLocal ]
[count ]
[cp [-R] ]
[createLineage <inputFile1,…> <outputFile1,…> [<cmd_arg1> <cmd_arg2> …]]
[deleteLineage <cascade(true|false)>]
[du ]
[fileInfo ]
[free [-f] ]
[getCapacityBytes]
[getUsedBytes]
[head [-c ] ]
[help []]
[leader]
[listLineages]
[load [–local] ]
[loadMetadata ]
[location ]
[ls [-d|-f|-p|-R|-h|–sort=option|-r] ]
[masterInfo]
[mkdir [path2] … [pathn]]
[mount [–readonly] [–shared] [–option <key=val>] ]
[mv ]
[persist [ …]]
[pin ]
[report ]
[rm [-R] [-U] [–alluxioOnly] ]
[setTtl [–action delete|free] ]
[stat [-f ] ]
[tail [-c ] ]
[test [-d|-f|-e|-s|-z] ]
[touch ]
[unmount ]
[unpin ]
[unsetTtl ]

1
2
3
4
5
6
7
8

命令很多,如果熟悉Linux命令的话,掌握起来不难。我们重点看几个命令

* checkConsistency

对比某个给定路径下Allluxio及底层存储系统的元数据。给出的路径是目录,会比较所有子内容。检查的是目录子树的读锁,在命令完成之前,无法对目录子树文件/目录进行更新或者写操作。

* copyFromLocal

// 将本地文件/目录 拷贝到alluxio里面
[hdfs@p1 bin]$ ./alluxio fs copyFromLocal /opt/fm.text /123
Copied file:///opt/fm.text to /123

1
2
3
4

* free

将文件从释放中释放,前提是这个文件已经持久化到UFS了,不然是没办法释放的。

[hdfs@p1 bin]$ ./alluxio fs free /123
Cannot free file /123 which is not persisted

1
2
3
4
5


* location

显示文件所在的worker

[hdfs@p1 bin]$ ./alluxio fs location /123
/123 with file id 16810770431 is on nodes:
p3

1
2

* mount

//显示所有挂载点
[hdfs@p1 bin]$ ./alluxio fs mount
hdfs://p1:8020 on / (hdfs, capacity=70.64GB, used=1191.04MB(1%), not read-only, not shared, properties={})
// 挂载hdfs://p1:8020/meta 到/meta下
[root@p1 bin]# ./alluxio fs mount /meta hdfs://p1:8020/meta
Mounted hdfs://p1:8020/meta at /meta

1
2
3
4

* unMount

取消挂载点

[root@p1 bin]# ./alluxio fs unmount /meta
Unmounted /meta

1
2

* persist

// 将aluxio的/1234目录持久化到hdfs中
[root@p1 bin]# ./alluxio fs persist /1234
persisted file /1234 with size 46

// 查看hdfs是否持久化了,我们初始化时候是挂载hdfs目录/alluxio/home到alluxio中的
[root@p1 bin]# hdfs dfs -ls /alluxio/home
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/lib/hadoop/lib/alluxio-1.8.1-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 2 items
-rw-r–r– 2 hdfs supergroup 46 2019-11-14 11:36 /alluxio/home/1234
drwxr-xr-x - 0 2019-11-14 10:20 /alluxio/home/default_tests_files

1
2
3
4

* setTtl

* --action deltele 参数(alluxio和ufs里面都会删除)

// 设置5秒后删除(alluxio和hdfs里面都会删除)
[root@p1 bin]# ./alluxio fs setTtl –action delete /1234 5000
TTL of path ‘/1234’ was successfully set to 5000 milliseconds, with expiry action set to DELETE

//5秒后,查看alluxio(/1234 没了)
[root@p1 bin]# ./alluxio fs ls /
46 NOT_PERSISTED 11-14-2019 10:46:46:775 100% /123
46 NOT_PERSISTED 11-14-2019 10:47:18:184 100% /12345
12 PERSISTED 11-14-2019 10:20:41:992 DIR /default_tests_files
46 NOT_PERSISTED 11-14-2019 10:44:37:127 100% /fm.text

// 5秒后,查看hdfs(/1234 没了)
[root@p1 bin]# hdfs dfs -ls /alluxio/home
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/cloudera/parcels/CDH-5.15.2-1.cdh5.15.2.p0.3/lib/hadoop/lib/alluxio-1.8.1-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Found 1 items
drwxr-xr-x - 0 2019-11-14 10:20 /alluxio/home/default_tests_files

1
2
3
4
5
6
7
8
9
10

### 7.3通过hadoop命令来操作Alluxio

Alluxio提供了兼容HDFS的接口,因此我们可以在执行hdfs命令时候,通过alluxio client 传递给allxuio 实现操作alluxio的目的。

* 在cm控制台,修改hadoop-env.sh

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-wfGMhSB5-1574129851147)(/Users/huangfan/Desktop/hadoop-env.png)]

* 操作alluxio里面文件

// 查看allxuio全部文件
[root@p1 bin]# hdfs dfs -ls alluxio://localhost:19998/
Found 4 items
-rw-r–r– 3 46 2019-11-14 10:46 alluxio://localhost:19998/123
-rw-r–r– 3 46 2019-11-14 10:47 alluxio://localhost:19998/12345
drwxr-xr-x - 12 2019-11-14 11:49 alluxio://localhost:19998/default_tests_files
-rw-r–r– 3 46 2019-11-14 11:54 alluxio://localhost:19998/fm.text
// 创建文件并查看
[root@p1 bin]# hdfs dfs -mkdir alluxio://localhost:19998/from-hdfs
[root@p1 bin]# hdfs dfs -ls alluxio://localhost:19998/
Found 5 items
-rw-r–r– 3 46 2019-11-14 10:46 alluxio://localhost:19998/123
-rw-r–r– 3 46 2019-11-14 10:47 alluxio://localhost:19998/12345
drwxr-xr-x - 12 2019-11-14 11:49 alluxio://localhost:19998/default_tests_files
-rw-r–r– 3 46 2019-11-14 11:54 alluxio://localhost:19998/fm.text
drwxrwxrwx - 0 2019-11-14 12:08 alluxio://localhost:19998/from-hdfs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

## 8、与计算框架整合

计算框架使用```alluxio client```需要在同一个JVM里面,且在```classpath```下能够找到```alluxio client```。
编译打包后```alluxio client```在```${ALLUXIO_HOME}/client```下。

### 8.1 与MapReduce整合

#### 8.1.1 整合方式


* ```-libjars```命令,它会把```alluxio client ```放到Hadoop的Distributed Cache中,所有节点均可以访问到。

* 手动将```alluxio client```放到每个MapReduce的```${HADOOP_HOME}/lib```下,对于我的CDH是在```/opt/cloudera/parcels/CDH/lib/hadoop/lib```下。


#### 8.1.2 验证

##### 命令验证

[hdfs@p1 bin]$ pwd
/opt/alluxio/integration/checker/bin
[hdfs@p1 bin]$ ./alluxio-checker.sh mapreduce
… 省略 …
* Integration test passed. *

1
2
3
4

##### wordcount验证

* 准备被统计的文件

// 将${ALLUXIO_HOME}下的LICENSE文件拷贝到alluxio中
[hdfs@p1 lib]$ /opt/alluxio/bin/alluxio fs copyFromLocal /opt/alluxio/LICENSE /input

1
2
 
* wordcount

// 我的 cdh 的hadoop 安装目录在 /opt/cloudera/parcels/CDH/lib
[hdfs@p1 opt]$ cd /opt/cloudera/parcels/CDH/lib
// 执行 wordcount
[hdfs@p1 lib]$ hadoop jar hadoop-mapreduce/hadoop-mapreduce-examples-2.6.0-cdh5.15.2.jar wordcount -libjars /opt/alluxio/client/alluxio-1.8.1-client.jar alluxio://p1:19998/input alluxio://p1:19998/output

1
2
3
4
5
6
7
8
9
10
11
12
13
14
	
* 到alluxio的Web UI统计信息

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MhB1n816-1574129851147)(/Users/huangfan/Desktop/wordcount.png)]

### 8.2 与Hive整合

前提:alluxio与MapReduce整合成功。

我在cm控制台修改hive.env.sh 文件,其他方式请自行找到hive-env.sh 文件修改

添加:

```HIVE_AUX_JARS_PATH=/usr/local/git/alluxio/client/alluxio-1.8.1-client.jar:${HIVE_AUX_JARS_PATH}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-S7BksU1N-1574129851147)(/Users/huangfan/Desktop/hive.env.png)]

8.2.1 存储部分Hive表

场景:常用的表存储在Alluxio中,获取高吞吐量和低延迟。

准备:下载文件 下载ml-100k.zip 文件,上传到服务器上,例如我上传到

/opt```下,解压。拷贝到Alluxios上
1
2


[hdfs@p1 opt]$ alluxio/bin/alluxio fs mkdir /ml-100
[hdfs@p1 opt]$ alluxio/bin/alluxio fs copyFromLocal /opt/ml-100k/u.user alluxio://localhost:19998/ml-100

1
2

* 存储内部表

CREATE TABLE u_user (
userid INT,
age INT,
gender CHAR(1),
occupation STRING,
zipcode STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘|’
LOCATION ‘alluxio://p1:19998/ml-100’;

1
2

* 存储外部表

CREATE EXTERNAL TABLE hive_hdfs (
userid INT,
age INT,
gender CHAR(1),
occupation STRING,
zipcode STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘|’;
LOCATION ‘alluxio://p1:19998/ml-100’;

1
2
3
4

此时hive内部表的存储位置变成了```ml-100```目录下,不是在默认的hdfs下面了。内部表,在hive里面删除表```u_user```时候,alluxio里存储的```/ml-100```也会被删除。外部表,在hive里面删除表```u_user```时候,alluxio里面的```/ml-100```不会被删除。

* 使用hdfs里面的表

hive> alter table u_user set location “alluxio://127.0.0.1:19998/tables/u_user”;
OK
Time taken: 3.572 seconds

1
2

* 恢复到hdfs里面

hive> alter table u_user set location “hdfs://127.0.0.1:8020/alluxio/home”;
OK
Time taken: 1.554 seconds

1
2
3
4
5
6

#### 8.2.1 存储全部Hive表

这种情况是Hive使用Alluxio作为默认文件系统替代hdfs。

* 修改```hive-siet.xml

1
2
3
4
<property>
<name>fs.defaultFS</name>
<value>alluxio://localhost:19998</value>
</property>

案例就不演示了,因为hive底层还是用hdfs好些,节省空间。

8.3 与Presto整合

版本:presto-server-0.228

Prest是从HiveMetaStore里面获取元数据信息,然后通过元数据信息来获取底层ufs(这里是hdfs),它查询数据不是像hive那样提交MapReduce,而是直接操作底层ufs。

8.3.1 下载配置presto

presto下载配置移步

其他基本配置可以参照官网,其中catalog配置是关键,我这

1
2


connector.name=hive-hadoop2
hive.metastore.uri=thrift://p1:9083
// 保证presto可以访问到hdfs
hive.config.resources=/etc/hive/conf/core-site.xml,/etc/hive/conf/hdfs-site.xml
hive.allow-drop-table=false
hive.allow-rename-table=false
hive.allow-add-column=false
hive.allow-rename-column=false
hive.force-local-scheduling=true

1
2

将```${ALLUXIO_HOME}/conf```下的```alluxio-site.properties```文件路径加到presto的```jvm.config```中,这样在allxuio里面设置的属性会应用到presto

-Xbootclasspath/p:/opt/alluxio/conf

1
2
3
4

**做以下几个配置**

* 读写超时配置(```alluxio-site.properties```)

// sec、min、hour、day结尾的配置都可以,从源码看到,代码层做了自适应
alluxio.user.network.netty.timeout=10min

1
2
3
4
 

* 启用Presto中数据本地性(```${PRESTO_HOME/etc/catalog/hive.properties} ```)
> 一般 Presto worker 与 Alluxio worker 同置部署,开启这个属性后,pesto处理分片的工作可以被调度到有该分片的机器上。

hive.force-local-scheduling=true

1
2
3
4

注意:网上很多说,presto调度是基于Alluxio worker的文件块地址与Presto worker地址之间的字符串匹配进行的(没看pesto源码我不确定)

* 设置Presto分布式查询粒度(```${PRESTO_HOME/etc/catalog/hive.properties} ```)

// 默认 alluxio.user.block.size.bytes.default=512M,我们需要将查询分割设置>512MB,减少presto在同一个块上多次并行查询带来相互阻塞。
hive.max-split-size=600MB
1
2
	
* 更改读写类型(```alluxio-site.properties```)
//默认读,首先将数据块从SSD或者HDD移动到MEM,然后再读取MEM中的数据块 alluxio.user.file.readtype.default=CACHE_PROMOTE // 双写(内存和ufs),默认写是MUST_CACHE,只写内存 alluxio.user.file.writetype.default=CACHE_THROUGH
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
	
## 8、配置使用

### 8.1 服务端配置
主要配置都是在```${ALLUXIO_HOME}/conf```下的```alluxio-site.properties```配置里面,集群内所有的机器上都需要设置。可以在alluxio的Web UI 界面看每个属性配置的值/默认值。

![在这里插入图片描述](https://img-blog.csdnimg.cn/2019111910284938.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L2h1YW5nZmFuMzIy,size_16,color_FFFFFF,t_70)

### 8.2 客户端配置

alluxio客户端的初始化是以集群master配置新来初始化的,也就是在```${ALLUXIO_CONF}/conf```下 ```alluxio-site.properties```里面配置的信息会应用到客户端的初始化。例如设置写类型:```alluxio.user.file.writetype.default=CACHE_THROUGH```。
当然这样一刀切的配置肯定不是最优的,可以通过在客户端设置:```alluxio.user.conf.cluster.default.enable=false```来忽略或者覆盖集群范围内默认值,客户端的配置一般是通过设置JVM参数"-D",或者通过api在代码里面设置。


### 8.3 配置工具

alluxi 提供了一些在配置时候提高效率的小工具,说到底就是shell脚本来,具体可以看```${ALLUXIO_HOME}/conf/alluxio```这个脚本内容。


* copyDir

//同步配置到所有worker机器上,不用再傻傻scp了。
./alluxio copyDir [path to alluxio’s conf dir]

1
2

* getConf

// 查看属性值
[hdfs@p1 bin]# ./alluxio getConf alluxio.user.file.writetype.default
CACHE_THROUGH
// 查看属性配置来源
[hdfs@p1 bin]# ./alluxio getConf –source alluxio.user.file.writetype.default
SITE_PROPERTY (/usr/local/git/alluxio/conf/alluxio-site.properties)
// 查看集群默认配置
[root@p1 bin]# ./alluxio getConf –master

alluxio.conf.dir=/usr/local/git/alluxio/conf
alluxio.conf.validation.enabled=true
alluxio.debug=false
alluxio.extensions.dir=/usr/local/git/alluxio/extensions
alluxio.fuse.cached.paths.max=500
alluxio.fuse.debug.enabled=false
alluxio.fuse.fs.name=alluxio-fuse
alluxio.fuse.maxwrite.bytes=128KB
alluxio.home=/usr/local/git/alluxio
alluxio.integration.master.resource.cpu=1
alluxio.integration.master.resource.mem=1024MB
alluxio.integration.mesos.alluxio.jar.url=http://downloads.alluxio.org/downloads/files/1.8.1/alluxio-1.8.1-bin.tar.gz
alluxio.integration.mesos.jdk.path=jdk1.8.0_151
alluxio.integration.mesos.jdk.url=LOCAL
alluxio.integration.mesos.master.name=AlluxioMaster
alluxio.integration.mesos.master.node.count=1
alluxio.integration.mesos.principal=alluxio
alluxio.integration.mesos.role=*
alluxio.integration.mesos.secret=(no value set)
alluxio.integration.mesos.user=(no value set)
alluxio.integration.mesos.worker.name=AlluxioWorker
alluxio.integration.worker.resource.cpu=1
alluxio.integration.worker.resource.mem=1024MB
alluxio.integration.yarn.workers.per.host.max=1
…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

## 9、存储管理

### 9.1 单层模式

不需要设置,默认在集群启动时候,alluxio会为wokers分配ramdisk,


### alluxio与ufs元数据同步

* 客户端

alluxio1.7 之后支持

客户端调用时候,增加参数:```alluxio.user.file.metadata.sync.interval=int```,int<0 永远不同步,int>0 在间隔时间内不同步,int=0 操作之前,代理总是会同步路径的元数据

alluxio fs ls -R -Dalluxio.user.file.metadata.sync.interval=0 /dirpath

1
2
3
4

* 服务端异步

alluxio 2.0 + HDFS 2.7 以上版本

// 启动
./alluxio fs startSync /syncedDirPath

// 关闭
./alluxio fs stopSync /syncedDir

1
2
3
4
5
6
7
8
9
10
11

## 10、异常诊断和调试

### 10.1 日志

在```${ALLUXIO_HOME}/logs```下,```*.log```为log4j 生成的,```*.out```是标准的输出和错误流重定向文件。一般我们查看```master.log 、worker.log、user_${USER}.log```来排查问题。


### 10.2 远程调试

在 ```${ALLUXIO_HOME}/conf```下的```alluxio-env.sh```配置调试的环境变量:

export ALLUXIO_WORKER_JAVA_OPTS=”$ALLUXIO_JAVA_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=6606”
export ALLUXIO_MASTER_JAVA_OPTS=”$ALLUXIO_JAVA_OPTS -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=6607”
export ALLUXIO_USER_DEBUG_JAVA_OPTS=”-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=6609”
`

然后我们就可以在 IntelliJ IDEA 或者Eclipse里面开启Remote Debug 调试了。

Redis常见应用

发表于 2020-02-05 | 评论数: | 阅读次数:

1、分布式锁

分布式锁本质上要实现的目标就是在 Redis 里面占一个“茅坑”,当别的进程也要来占时,发现已经有人蹲在那里了,就只好放弃或者稍后再试。

A、几种方式

  • setnx (set if not exit)
1
2
3
4
5
127.0.0.1:6379> setnx lock true
(integer) 1
127.0.0.1:6379> del lock
(integer) 1
127.0.0.1:6379>

但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样就会陷入死锁,锁永远得不到释放。

  • setnx + expire
1
2
3
4
5
127.0.0.1:6379> setnx lock true
(integer) 1
127.0.0.1:6379> expire lock 5
(integer) 1
127.0.0.1:6379>

但是以上逻辑还有问题。如果在 setnx 和 expire 之间服务器进程突然挂掉了,可能是因为机器掉电或者是被人为杀掉的,就会导致 expire 得不到执行,也会造成死锁。为了解决这个疑难,Redis 开源社区涌现了一堆分布式锁的 library,专门用来解决这个问题。实现方法极为复杂。

  • set [ex seconds] [nx]
1
2
3
4
5
127.0.0.1:6379> set lock  true ex 5 nx
OK
127.0.0.1:6379> del lock
(integer) 1
127.0.0.1:6379>

Redis 2.8 版本中作者加入了 set 指令的扩展参数,使得 setnx 和 expire 指令可以一起执行,彻底解决了分布式锁的乱象。从此以后所有的第三方分布式锁 library 可以休息了。

B、超时问题

Redis 的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行的太长,以至于超出了锁的超时限制,就会出现问题。因为这时候第一个线程持有的锁过期了,临界区的逻辑还没有执行完,这个时候第二个线程就提前重新持有了这把锁,导致临界区代码不能得到严格的串行执行。

  • 最佳实践

    • 不要执行较长时间的任务
    • set 的value设置一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key。
    • 匹配 value 和删除 key 不是一个原子操作
    • lua 脚本解决原子性,以python代码为例

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      tag = random.nextint()  # 随机数
      if redis.set(key, tag, nx=True, ex=5):
      do_something()
      redis.delifequals(key, tag) # 自定义的 delifequals 指令

      # delifequals
      if redis.call("get",KEYS[1]) == ARGV[1] then
      return redis.call("del",KEYS[1])
      else
      return 0
      end

这也不是一个完美的方案,它只是相对安全一点,因为如果真的超时了,当前线程的逻辑没有执行完,其它线程也会乘虚而入。

C、可重入性

可重入性是指线程在持有锁的情况下再次请求加锁,如果一个锁支持同一个线程的多次加锁,那么这个锁就是可重入的。如 Java 里的 ReentrantLock 就是可重入锁。Redis 分布式锁如果要支持可重入,需要对客户端的 set 方法进行包装,我用线程的 Threadlocal 变量存储当前持有锁的计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import redis.clients.jedis.Jedis;

public class RedisWithReentrantLock {

private ThreadLocal<Map<String, Integer>> lockers = new ThreadLocal<>();

private Jedis jedis;

public RedisWithReentrantLock(Jedis jedis) {
this.jedis = jedis;
}

private boolean _lock(String key) {
return jedis.set(key, "", "nx", "ex", 5L) != null;
}

private void _unlock(String key) {
jedis.del(key);
}

private Map<String, Integer> currentLockers() {
Map<String, Integer> refs = lockers.get();
if (refs != null) {
return refs;
}
lockers.set(new HashMap<>());
return lockers.get();
}

public boolean lock(String key) {
Map<String, Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if (refCnt != null) {
refs.put(key, refCnt + 1);
return true;
}
boolean ok = this._lock(key);
if (!ok) {
return false;
}
refs.put(key, 1);
return true;
}

public boolean unlock(String key) {
Map<String, Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if (refCnt == null) {
return false;
}
refCnt -= 1;
if (refCnt > 0) {
refs.put(key, refCnt);
} else {
refs.remove(key);
this._unlock(key);
}
return true;
}

public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisWithReentrantLock redis = new RedisWithReentrantLock(jedis);
System.out.println(redis.lock("fan"));
System.out.println(redis.lock("fan"));
System.out.println(redis.unlock("fan"));
System.out.println(redis.unlock("fan"));
}
}

D、redisson

redis 官方推荐的redis分布式锁库,你想要的基本都有,github地址

2、Bitmap位图

位图不是特殊的数据结构,它的内容其实就是普通的字符串,也就是 byte 数组。我们可以使用普通的 get/set 直接获取和设置整个位图的内容,也可以使用位图操作 getbit/setbit 等将 byte 数组看成「位数组」来处理。

  • 统计活跃用户

    • set结构:sadd key,value (uesrId,1),若用户数很大,则key会很多,不推荐
    • 位图操作:setbit key, offset, value (”activeUser”,userId,1),此时userId 需要为整数不重复,且最好连续自增。
  • 操作命令

    • set key value 这个设置key的值(整体设)
    • setbit key offset bit 设置value二进制中某个位置的值
    • getbit key offset 获取value指定位置的值
    • bitcount key [start end] 统计指定位置范围内 1 的个数,注意这里面的start和end是字符索引
    • bitpos key bit [start] [end] 查找指定范围内出现的第一个 0 或 1
    • bitfield key [get type offset] [set type offset value] [incrby type offset increment] 多个连续位操作(最长64位)

3、HyperLogLog

用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。占用内存12K,但是它的精确度不是很高,我们也无法知道某个key是否已经在其中了。

  • 统计uv(如:key为用户id)

    • set结构:sadd key , scard key 如果数据页面uv很大,则不合适了
    • HyperLogLog: pfadd key value ,pfcount key
    • 多个页面的uv合并,可以使用pfmerge
  • set和HyperLogLog统计页面uv,set 的key为suv,HyperLogLog uv 为huv

1
2
3
4
5
6
7
8
127.0.0.1:6379> sadd suv u1
(integer) 1
127.0.0.1:6379> sadd suv u2
(integer) 1
127.0.0.1:6379> sadd suv u3
(integer) 1
127.0.0.1:6379> scard suv
(integer) 3
1
2
3
4
127.0.0.1:6379> pfadd huv u1 u2 u3
(integer) 1
127.0.0.1:6379> pfcount huv
(integer) 3
1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> pfadd huv u1 u2 u3
(integer) 1
127.0.0.1:6379> pfcount huv
(integer) 3
127.0.0.1:6379> pfadd huv2 u4 u5 u6
(integer) 1
127.0.0.1:6379> pfmerge huv huv2
OK
127.0.0.1:6379> pfcount huv
(integer) 6

HyperLogLog这个数据结构的发明人名字叫Philippe Flajolet ,命令pf为首字母缩写。至于内存占用12K,可以查看这个PPT。

4、BloomFilter布隆过滤器

布隆过滤器可以理解为一个不怎么精确的 set 结构,当你使用它的 contains 方法判断某个对象是否存在时,它可能会误判。当布隆过滤器说某个值存在时,这个值可能不存在;当它说不存在时,那就肯定不存在。

  • 目前布隆过滤器还是基于插件形式,我们直接通过docker 获取带插件的redis

    1
    2
    3
    docker pull redislabs/rebloom
    docker run --name redisBloom -d -p 6379:6379 redislabs/rebloom
    redis-cli
  • 操作命令

    • 单个操作

      1
      2
      3
      4
      5
      6
      7
      8
      9
       127.0.0.1:6379> bf.add news 1
      (integer) 1
      127.0.0.1:6379> bf.add news 2
      (integer) 1
      127.0.0.1:6379> bf.add news 3
      (integer) 1
      127.0.0.1:6379> bf.exists news 1
      (integer) 1
      127.0.0.1:6379> bf.exists news 4
    • 多个操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    127.0.0.1:6379> bf.madd news 11 22 33 44 55
    1) (integer) 1
    2) (integer) 1
    3) (integer) 1
    4) (integer) 1
    5) (integer) 1
    127.0.0.1:6379> bf.mexists news 4 5 6 7 8 9
    1) (integer) 1
    2) (integer) 1
    3) (integer) 1
    4) (integer) 1
    5) (integer) 1
    6) (integer) 1
* 布隆过滤器算法应用
  * 新闻推荐系统,推送去重
  * 爬虫url去重
  * 垃圾邮件过滤
  * HBase、Cassandra、LevelDB等使用它过滤不在的查询

* 原理(粗略)


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FRwmHX90-1570534107291)(/Users/huangfan/Downloads/16464301a0e26416-2.png)]

每个布隆过滤器对应到 Redis 的数据结构里面就是一个大型的位数组和几个不一样的无偏 hash 函数(上图中的f g h)。所谓无偏就是能够把元素的 hash 值算得比较均匀。

* 添加操作:使用多个hash函数对key进行hash计算,得到一个整数索引值,然后再和数组长度取模得到一个位置,再这将这几个位置置为1,就完成了操作。index=hash(key)%/tableSize,table[index]=1。

* 判断key是否存在:把 hash 的几个位置都算出来,看看位数组中这几个位置是否都为 1,只要有一个位为 0,那么说明布隆过滤器中这个 key 不存在。如果都是 1,这并不能说明这个 key 就一定存在,只是极有可能存在,因为这些位被置为 1 可能是因为其它的 key 存在所致。如果这个位数组比较稀疏,判断正确的概率就会很大,如果这个位数组比较拥挤,判断正确的概率就会降低。
* 空间占用估计
     * 输入参数2个:
n为预计元素数量,f为错误率。
1
2
3
4
5
6
     * 输出结果2个:```l,k```,l为位数组长度,k为最佳的hash函数个数。


## 5、限流

* 例子:限制一个用户一段时间发帖频率。关键点用户```userId```,一段时间```period```,发帖这个动作```actionKey```,限制次数```maxCount
* 简单的限流方法,利用zset结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
 public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
String key = String.format("hist:%s:%s", userId, actionKey);
long nowTs = System.currentTimeMillis();
// 保障批量操作的事务性
Pipeline pipe = jedis.pipelined();
pipe.multi();
pipe.zadd(key, nowTs, "" + nowTs);
pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
Response<Long> count = pipe.zcard(key);
pipe.expire(key, period + 1);
pipe.exec();
pipe.close();
return count.get() <= maxCount;
}
  • 漏斗限流方法
    • 漏斗剩余空间:可持续进行的数量
    • 漏斗流水速率:允许行为最大的频率

使用Java代码实现漏斗限流方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class FunnelRateLimiter {

static class Funnel {
int capacity;
float leakingRate;
int leftQuota;
long leakingTs;

public Funnel(int capacity, float leakingRate) {
this.capacity = capacity;//漏斗容量
this.leakingRate = leakingRate;//漏斗流水速率
this.leftQuota = capacity;//漏斗剩余空间
this.leakingTs = System.currentTimeMillis();//上一次漏水时间
}

void makeSpace() {
long nowTs = System.currentTimeMillis();
long deltaTs = nowTs - leakingTs;//距离上一次漏水过去了多久
int deltaQuota = (int) (deltaTs * leakingRate);//可腾出的空间
if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出
this.leftQuota = capacity;
this.leakingTs = nowTs;
return;
}
if (deltaQuota < 1) { // 腾出空间太小,最小单位是1
return;
}
this.leftQuota += deltaQuota;
this.leakingTs = nowTs;
if (this.leftQuota > this.capacity) {
this.leftQuota = this.capacity;
}
}

// quota 倒入漏斗的水容量
boolean watering(int quota) {
makeSpace();
if (this.leftQuota >= quota) {
this.leftQuota -= quota;
return true;
}
return false;
}
}

private Map<String, Funnel> funnels = new HashMap<>();

public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
String key = String.format("%s:%s", userId, actionKey);
Funnel funnel = funnels.get(key);
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
return funnel.watering(1); // 需要1个quota
}
}

Redis 4.0 之后可以安装限流模块,它叫 redis-cell。该模块也使用了漏斗算法,并提供了原子的限流指令。该模块只有1条指令cl.throttle。

  • cl.throttle xiaoming:reply 15 30 60 1
  • 该命令意思:允许小明回复行为的频率是60s最多30次,即2秒1次的速率,漏斗的初始容量15次(即一开始可以连续回复15个帖子),

我下载最新版本redis,操作如下

1
2
3
4
5
6
7
8
9
10
11
// 安装redis
brew install redis

// 到redis的bin下,加载下载的redis-cell模块
./redis-server --loadmodule /usr/local/Cellar/redis/5.0.5/extend/libredis_cell.dylib
127.0.0.1:6379> cl.throttle xiaoming:reply 15 30 60 1
1) (integer) 0 # 0 表示允许,1表示拒绝
2) (integer) 16 # 漏斗容量capacity(15+1,15 是从0开始计数的)
3) (integer) 15 # 漏斗剩余空间left_quota(16-1)
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

5、GeoHash(距离计算)

  • GeoHash 算法

GeoHash 算法将二维的经纬度数据映射到一维的整数,这样所有的元素都将在挂载到一条线上,距离靠近的二维坐标映射到一维后的点之间距离也会很接近。当我们想要计算「附近的人时」,首先将目标位置映射到这条线上,然后在这个一维的线上获取附近的点就行了。

那这个映射算法具体是怎样的呢?它将整个地球看成一个二维平面,然后划分成了一系列正方形的方格,就好比围棋棋盘。所有的地图元素坐标都将放置于唯一的方格中。方格越小,坐标越精确。然后对这些方格进行整数编码,越是靠近的方格编码越是接近。那如何编码呢?一个最简单的方案就是切蛋糕法。设想一个正方形的蛋糕摆在你面前,二刀下去均分分成四块小正方形,这四个小正方形可以分别标记为 00,01,10,11 四个二进制整数。然后对每一个小正方形继续用二刀法切割一下,这时每个小小正方形就可以使用 4bit 的二进制整数予以表示。然后继续切下去,正方形就会越来越小,二进制整数也会越来越长,精确度就会越来越高。

  • Geo命令
    • 新增:geoadd bike 116.562108 39.787602 bike1 116.334255 40.027400 bike2 新增bike1和bike2 的经纬度
    • 距离:geodist bike bike1 bike2 km //计算bike1和bike2之间的距离,单位km
    • 元素附近:georadiusbymember bike bike1 2 km count 3 desc //在bike1附近2km内的3个自行车,距离从远到近
    • 坐标附近:georadius bike 116.334255 40.027400 5 km withdist count 2 asc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
127.0.0.1:6379[2]> geoadd bike 116.562108 39.787602 bike1
(integer) 1
127.0.0.1:6379[2]> geoadd bike 116.334255 40.027400 bike2
(integer) 1
127.0.0.1:6379[2]> geoadd bike 112.334255 30.028400 bike3
(integer) 1
127.0.0.1:6379[2]> geoadd bike 192.334255 10.028400 bike4
(integer) 1
127.0.0.1:6379[2]> geodist bike bike1 bike2
"33004.6915"
127.0.0.1:6379[2]> geodist bike bike1 bike2 km
"33.0047"
127.0.0.1:6379[2]> geodist bike bike1 bike3 km
"1151.5533"
127.0.0.1:6379[2]> georadiusbymember bike bike1 2 km count 3 desc
1) "bike1"
2) "haluo"
127.0.0.1:6379[2]> georadius bike 116.334255 40.027400 5 km withdist count 2 asc
1) 1) "bike2"
2) "0.0002"
2) 1) "mobai"
2) "0.0002"

Alluxio+Presto 查询

发表于 2020-02-05 | 分类于 大数据 | 评论数: | 阅读次数:

1、 总述

主机 cpu 内存 磁盘
p1(master) 16 core 64GB 100GB
P2 8 core 32GB 100GB
P2 8 core 32GB 100GB

之前我基于TPC-DS 做了很多轮测试,数据量也分布了几个维度,从结果行看性能提升并不明显,只是单表查询上略有提升,多表基本无变化。结果懒得贴上去了,具体可以看我提的issue。

TPC-DS 典型的单表查询

  • group by 、count
1
select ss_sold_Date_sk,count(*) as cnt from hive.tpcds_bin_partitioned_orc_50.store_sales group by ss_sold_Date_sk
  • group by、 order by、count
1
select ss_sold_Date_sk,count(*) as cnt from hive.tpcds_bin_partitioned_orc_50.store_sales group by ss_sold_Date_sk order by cnt desc,ss_sold_Date_sk limit 10
  • group by 、order by、count 、avg
1
select ss_sold_Date_sk,ss_wholesale_cost,avg(ss_item_sk) as cnt,count(distinct(ss_sales_price)) as avg1 from hive.tpcds_bin_partitioned_orc_50.store_sales group by ss_sold_Date_sk,ss_wholesale_cost  order by cnt desc,ss_sold_Date_sk limit 10

TPC-DS 典型的多表查询

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
select channel, col_name, d_year, d_qoy, i_category, COUNT(*) sales_cnt, SUM(ext_sales_price) sales_amt FROM (
SELECT 'store' as channel, 'ss_promo_sk' col_name, d_year, d_qoy, i_category, ss_ext_sales_price ext_sales_price
FROM store_sales, item, date_dim
WHERE ss_promo_sk IS NULL
AND ss_sold_date_sk=d_date_sk
AND ss_item_sk=i_item_sk
UNION ALL
SELECT 'web' as channel, 'ws_ship_customer_sk' col_name, d_year, d_qoy, i_category, ws_ext_sales_price ext_sales_price
FROM web_sales, item, date_dim
WHERE ws_ship_customer_sk IS NULL
AND ws_sold_date_sk=d_date_sk
AND ws_item_sk=i_item_sk
UNION ALL
SELECT 'catalog' as channel, 'cs_bill_hdemo_sk' col_name, d_year, d_qoy, i_category, cs_ext_sales_price ext_sales_price
FROM catalog_sales, item, date_dim
WHERE cs_bill_hdemo_sk IS NULL
AND cs_sold_date_sk=d_date_sk
AND cs_item_sk=i_item_sk) foo
GROUP BY channel, col_name, d_year, d_qoy, i_category
ORDER BY channel, col_name, d_year, d_qoy, i_category
limit 10

针对tpc-ds的sql,单表查询聚合函数多个联合使用,多表关联太多且又使用复杂的聚合函数。测试下来总是囫囵吞枣,很难侦查到有无alluxio时性能的变化,我觉得需要更细粒度的单表测试。

2、单表单聚合函数测试

表名:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37

```sql
CREATE TABLE `store_sales`(
`ss_sold_time_sk` bigint,
`ss_item_sk` bigint,
`ss_customer_sk` bigint,
`ss_cdemo_sk` bigint,
`ss_hdemo_sk` bigint,
`ss_addr_sk` bigint,
`ss_store_sk` bigint,
`ss_promo_sk` bigint,
`ss_ticket_number` bigint,
`ss_quantity` int,
`ss_wholesale_cost` decimal(7,2),
`ss_list_price` decimal(7,2),
`ss_sales_price` decimal(7,2),
`ss_ext_discount_amt` decimal(7,2),
`ss_ext_sales_price` decimal(7,2),
`ss_ext_wholesale_cost` decimal(7,2),
`ss_ext_list_price` decimal(7,2),
`ss_ext_tax` decimal(7,2),
`ss_coupon_amt` decimal(7,2),
`ss_net_paid` decimal(7,2),
`ss_net_paid_inc_tax` decimal(7,2),
`ss_net_profit` decimal(7,2))
PARTITIONED BY (
`ss_sold_date_sk` bigint)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'alluxio://p1:19998/user/hive/warehouse/tpcds_bin_partitioned_orc_50.db/store_sales'
TBLPROPERTIES (
'transient_lastDdlTime'='1574907136')

聚合查询

每个查询三次,最后一列为平均时间,单位毫秒。

  • presto+hdfs (sum)
1
2
3
20381	19798	19916	20031
19213 19564 19855 19544
20350 20671 19432 20151
  • presto+hdfs+alluxio(sum)
1
2
3
6479	6711	6816	6668
6669 7759 6179 6869
6465 7507 7567 7179
  • presto+hdfs (count)
1
2
3
19920	20434	19285	19879
19583 19369 19004 19318
19988 20062 20294 20114
  • presto+hdfs+alluxio(count)
1
2
3
6194	5739	5638	5857
6291 6482 5496 6089
6275 5651 5440 5788
  • presto+hdfs (count(xxx))
1
2
3
21166	18758	20217	20047
20514 19281 20184 19993
20574 19703 19994 20090
  • presto+hdfs+alluxio (count(xxx))
1
2
3
6203	6268	5857	6109
6484 6443 5758 6228
5432 6173 5811 5805
  • presto+hdfs (order by)
1
2
3
22183	20706	21496	21461
21398 20386 20828 20870
20357 20878 21721 20985
  • presto+hdfs+alluxio (order by )
1
2
3
14981	15045	14967	14997
14141 14594 14949 14561
14379 15013 14243 14545
  • presto+hdfs (avg)
1
2
3
21916	20552	19876	22114
19675 19231 20083 19663
20904 19721 20155 20260
  • presto+hdfs+alluxio (avg)
1
2
3
6555	5928	6317	6266
6514 6161 6255 6310
5856 6365 6586 6269

3、总结

单表常规聚合, 有alluxio性能大约2-3倍的提升,presto对于join的查询不擅长,join做分析性能没什么提升,具体也可以看我提交的issue。另外对于多个聚合函数的测试可能也会干扰测试结果,建议大家单独的测试,alluxio+presto 做adhoc对sql是有要求的,不是有缓存了,性能就提升了,需要仔细甄别。

presto

发表于 2020-02-05 | 分类于 查询引擎 | 评论数: | 阅读次数:

Presto

1、简介

Hadoop提供了大数据存储和计算一套解决方案,完美地解决了大数据的存储和计算问题。但是Hadoop提供的Map-Reduce计算框架,适用于大数据量的离线和批量计算,它关注的吞吐量不是计算效率,在大数据量快速实时Ad-Hoc查询计算上表现很不友好。继Hive后,facebook公司在2012年开始开发Presto,与2013年正式开源,给Ad-Hoc查询带来了一股清凉的春风。Presto基于Java语言开发,在多数据源支持上,易用性,可扩展性上搜事大数据实时查询计算产品中的佼佼者。

2、特性

特点 说明
多数据源 目前Presto支持Mysql、PostgreSQL、Cassandra、Hive、Kafa等
SQL 完全兼容ANSI SQL,并提供sql shell 给用户
扩展性 基于SPI机制,易开发特定数据源连接器Connector
混合计算 多数据的混合计算
高性能 Presto官方测试是Hive性能的10倍以上
PipeLine 终端用户不用等待结果集处理完才看到结果,从一开始计算就可以产出一部分结果给终端

3、基本概念

3.1 服务进程

  • Coordinator

一般Coordinator部署在集群中一个单独节点,是整个Presto集群的管理节点。它主要用户接受客户端提交的查询,解析查询并生成查询计划,对任务进行调度,对worker管理。

  • Worker

工作节点,一般多个,主要进行数据的处理和Task的执行,它会周期性的向Coordinator进行Restful “沟通”,即心跳。

3.2 Presto模型

名称 说明
Connector Presto是通过Connector来访问不同的数据源的,可以将Connector当作访问不同数据源的驱动程序,每种Connector都实现了Presto里面的SPI接口。当需要使用某种Connnector时候需要在${PRESTO_HOME}/etc/catalog中配置。
Catalog 类似我们常规关系数据库Mysql里面实列。
Schema 类似我们常规关系数据库Mysql里面的database。
Table 类似我们常规关系数据库Mysql里面的table。

3.3 Presto查询模型

3.3.1 模型术语

  • Statement

客户端输入的SQL。

  • Query

客户端输入的SQL在Presto内部的表述(实例)。由Stage、Task、Driver、Split、Operator、DataSource组成。

  • Stage

Query的组成部分,多个有层级关系的Stage组合成Query。

  • Exchage

Stage之间是通过Exchage来相互连接,完成数据交换。

  • Task

正真运行在Worker上的,是Stage的逻辑上的拆分。

  • Driver

组成Task的单位,操作的集合。一个Driver处理一个Split,并生成输入输出。

  • Operator

具体作用在Split上的操作,如:过滤、加权、转换等。

  • Split

分片,大的数据集中一个小的数据集。

3.3.1 查询过程

整个过程大致分为7步

  • 客户端通过Http请求发送查询语句给Coordinator。
  • Coordinator解析查询语句,生成查询计划。根据查询计划依次会生成:SqlQueryExecution、SqlStageExecution、HttpRemoteTask。
  • 分发任务到各个Worker,全程是通过HttpRemoteTask里面的HttpClient将创建或者更新Task的请求发送到数据所在的节点。节点上的TaskResource接收到请求后在worker上启动/更新SqlTaskExecution对象,然后处理Split。
  • 上游(距离数据源近的)Stage里面的Task,通过各种Connector从数据源里面读取数据。
  • 下游Stage里面的Task读取上游Stage里输出的结果,拿到数据后在内存里面做计算和处理。
  • Coordinator从分发Task后就可以从最顶层的Stage(Single Stage)里面获取Task计算结果,缓存到buffer,直到所有计算结束。
  • Client从提交语句之后,会不停从Coordinator中获取本次查询的结果,不是等到查询结果都产生完毕才获取显示。

4、查询流程解析

presto 构建Restful服务使用的airlift框框。

4.1 客户端发起请求

1
2
3
4
5
6
7
8
9
10
11
12
public final class Presto {
private Presto() {
}
public static void main(String[] args) {
Console console = singleCommand(Console.class).parse(args);
if (console.helpOption.showHelpIfRequested() ||
console.versionOption.showVersionIfRequested()) {
return;
}
System.exit(console.run() ? 0 : 1);
}
}

4.2 执行客户端命令

类:Console

1
2
3
4
5
6
7
	public boolean run() {
...
if (hasQuery) {
return executeCommand(queryRunner, query, clientOptions.outputFormat, clientOptions.ignoreErrors);
}
...
}

4.3 发送Http请求

类:StatementClientV1

1
2
3
4
5
6
7
8
9
10
11
12
private Request buildQueryRequest(ClientSession session, String query)
{
HttpUrl url = HttpUrl.get(session.getServer());
if (url == null) {
throw new ClientException("Invalid server URL: " + session.getServer());
}
// 注意路径
url = url.newBuilder().encodedPath("/v1/statement").build();
Request.Builder builder = prepareRequest(url).post(RequestBody.create(MEDIA_TYPE_TEXT, query));
...
return builder.build();
}

4.4 Coordinator处理请求

类:StatementResource

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Path("/v1/statement")
public class StatementResource {
@POST
@Produces(MediaType.APPLICATION_JSON)
public Response createQuery(
String statement,
@HeaderParam(X_FORWARDED_PROTO) String proto,
@Context HttpServletRequest servletRequest,
@Context UriInfo uriInfo) {
...
// 核心
Query query = Query.create(
sessionContext,
statement,
queryManager,
sessionPropertyManager,
exchangeClient,
responseExecutor,
timeoutExecutor,
blockEncodingSerde);
queries.put(query.getQueryId(), query);
// 获取结果
QueryResults queryResults = query.getNextResult(OptionalLong.empty(), uriInfo, proto, DEFAULT_TARGET_RESULT_SIZE);
...
return toResponse(query, queryResults);
}
}

后面逻辑较多,具体怎么个处理,有兴趣的可以参考:一条sql如何倍presto执行的。

5、Hive Connector

Presto查询过程和Hive不同,它是从Hive-Metastore里面获取表的元数据信息和与之关联的数据文件信息,然后利用HdfsEnvironment操作HDFS文件。

  • persto配置的hive.properties文件

    1
    2
    3
    4
    5
    6
    connector.name=hive-hadoop2
    hive.metastore.uri=thrift://p1:9083
    hive.metastore-cache-ttl=0s
    hive.metastore-refresh-interval=0s
    // hdfs信息相关配置文件位置
    hive.config.resources=/etc/hive/conf/core-site.xml,/etc/hive/conf/hdfs-site.xml
  • 获取Hive Metastore元数据

    类:HiveMetastoreClientFactory

    1
    2
    3
    public HiveMetastoreClient create(HostAndPort address)throws TTransportException {
    return new ThriftHiveMetastoreClient(Transport.create(address, sslContext, socksProxy, timeoutMillis, metastoreAuthentication));
    }
  • HiveMetastoreClient 里面方法,curd操作

在这里插入图片描述

  • 加载元数据

类:CachingHiveMetastore

1
2
3
4
5
6
7
8
9
10
@ThreadSafe
public class CachingHiveMetastore implements ExtendedHiveMetastore{
//这里面有2个关键配置项,其他配置可以参看MetastoreClientConfig里面配置
// hive.metastore-refresh-interval,默认0
// hive.metastore-cache-ttl,默认0
public CachingHiveMetastore(...){
// 主要的逻辑都是在构造函数里面完成的,太多了,省略了
...
}
}
  • 获取FileSystem
1
2
3
4
5
6
7
8
9
public FileSystem getFileSystem(String user, Path path, Configuration configuration)
throws IOException {
return hdfsAuthentication.doAs(user, () -> {
// 从我们配置的hive的catalog文件里面读取hdfs相关信息
FileSystem fileSystem = path.getFileSystem(configuration);
fileSystem.setVerifyChecksum(verifyChecksum);
return fileSystem;
});
}

6、插件开发

6.1 Connector开发

Presto目前支持很多的数据源,数据源都是以plugin形式添加的。下面是我安装的presto里面的plugin

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
drwxr-xr-x 2 root root 4096 Oct 29 13:34 tpch
drwxr-xr-x 2 root root 4096 Oct 29 13:34 mongodb
drwxr-xr-x 2 root root 4096 Oct 29 13:34 session-property-managers
drwxr-xr-x 2 root root 4096 Oct 29 13:34 postgresql
drwxr-xr-x 2 root root 4096 Oct 29 13:34 kudu
drwxr-xr-x 2 root root 4096 Oct 29 13:34 geospatial
drwxr-xr-x 2 root root 4096 Oct 29 13:34 example-http
drwxr-xr-x 2 root root 4096 Oct 29 13:34 localfile
drwxr-xr-x 2 root root 4096 Oct 29 13:34 mysql
drwxr-xr-x 2 root root 4096 Oct 29 13:34 atop
drwxr-xr-x 2 root root 4096 Oct 29 13:34 accumulo
drwxr-xr-x 2 root root 4096 Oct 29 13:34 sqlserver
drwxr-xr-x 2 root root 4096 Oct 29 13:34 redis
drwxr-xr-x 2 root root 4096 Oct 29 13:34 presto-elasticsearch
drwxr-xr-x 2 root root 4096 Oct 29 13:34 password-authenticators
drwxr-xr-x 2 root root 4096 Oct 29 13:34 blackhole
drwxr-xr-x 2 root root 4096 Oct 29 13:34 redshift
drwxr-xr-x 2 root root 4096 Oct 29 13:34 kafka
drwxr-xr-x 2 root root 4096 Oct 29 13:34 jmx
drwxr-xr-x 2 root root 4096 Oct 29 13:34 teradata-functions
drwxr-xr-x 2 root root 4096 Oct 29 13:34 resource-group-managers
drwxr-xr-x 2 root root 4096 Oct 29 13:34 presto-thrift
drwxr-xr-x 2 root root 4096 Oct 29 13:34 memory
drwxr-xr-x 2 root root 4096 Oct 29 13:34 tpcds
drwxr-xr-x 2 root root 4096 Oct 29 13:34 raptor
drwxr-xr-x 2 root root 4096 Oct 29 13:34 ml
drwxr-xr-x 2 root root 4096 Oct 29 13:34 cassandra
drwxr-xr-x 2 root root 4096 Nov 25 21:39 hive-hadoop2

步骤

源代码里面有presto-example-http 的自定义connector代码示例。

  • 添加插件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
presto-main/etc/config.properties下

plugin.bundles=\
../presto-blackhole/pom.xml,\
../presto-memory/pom.xml,\
../presto-jmx/pom.xml,\
../presto-raptor/pom.xml,\
../presto-hive-hadoop2/pom.xml,\
//*******官方示例 example-http *********
../presto-example-http/pom.xml,\
../presto-kafka/pom.xml, \
../presto-tpch/pom.xml, \
../presto-local-file/pom.xml, \
../presto-mysql/pom.xml,\
../presto-sqlserver/pom.xml, \
../presto-postgresql/pom.xml, \
../presto-tpcds/pom.xml
  • 需要实现的接口
1
2
3
4
5
6
7
8
9
10
11
12
13
// presto 会加载所有实现了Plugin的实现类
Plugin
// 实例化Connector工厂
ConnectorFactory
// connector连接器,里面有metadata、splitManager、recordSetProvider等实例
Connector
// 元数据管理(schemas、tables等)
ConnectorMetadata
// 数据分片
ConnectorSplitManager
// 数据读取类
ConnectorRecordSetProvider
...
  • 示例代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// 插件
public class ExamplePlugin implements Plugin {
@Override
public Iterable<ConnectorFactory> getConnectorFactories() {
return ImmutableList.of(new ExampleConnectorFactory());
}
}

// 连接器工厂
public class ExampleConnectorFactory implements ConnectorFactory {
...
@Override
public Connector create(String catalogName, Map<String, String> requiredConfig, ConnectorContext context) {
...
// 获取Connector 实例
return injector.getInstance(ExampleConnector.class);
}
}

// 连接器,里面包含元数据管理,数据分片,数据读取类 等实例变量
public class ExampleConnector implements Connector{
private static final Logger log = Logger.get(ExampleConnector.class);

private final LifeCycleManager lifeCycleManager;
private final ExampleMetadata metadata;
private final ExampleSplitManager splitManager;
private final ExampleRecordSetProvider recordSetProvider;
// get/set 方法 ...
}

6.2 函数开发

Presto自定义了很多函数,在

1
2
3
4
5
6
7
8
9

* 函数分类:

* 标量函数(scalar)
* 聚合函数(aggregation)
* 开窗函数(window)


* 代码示例

// 标量函数

@Description(“return array containing elements that match the given predicate”)//函数描述
@ScalarFunction(value = “filter”, deterministic = false)
public final class ArrayFilterFunction{

@TypeParameter("T")
@TypeParameterSpecialization(name = "T", nativeContainerType = long.class) // 参数类型
@SqlType("array(T)")// 返回函数类型
public static Block filterLong(
        @TypeParameter("T") Type elementType,
        @SqlType("array(T)") Block arrayBlock,
        @SqlType("function(T, boolean)") FilterLongLambda function){
     ...
     实现逻辑
     ...
    BlockBuilder resultBuilder = elementType.createBlockBuilder(null, positionCount);
    return resultBuilder.build();
}

}

// 开窗函数

@WindowFunctionSignature(name = “rank”, returnType = “bigint”)
public class RankFunction extends RankingWindowFunction{
private long rank;
private long count;

@Override
public void reset(){
    rank = 0;
    count = 1;
}
@Override
public void processRow(BlockBuilder output, boolean newPeerGroup, int peerGroupCount, int currentPosition){
    if (newPeerGroup) {
        rank += count;
        count = 1;
    }
    else {
        count++;
    }
    BIGINT.writeLong(output, rank);
}

}

// 聚合函数

@AggregationFunction(“count”)
public final class CountAggregation {
private CountAggregation() {
}
// 数据在每个worker上分片输入
@InputFunction
public static void input(@AggregationState LongState state) {
state.setLong(state.getLong() + 1);
}
// worker间数据聚合
@CombineFunction
public static void combine(@AggregationState LongState state, @AggregationState LongState otherState) {
state.setLong(state.getLong() + otherState.getLong());
}
// 规约输出结果
@OutputFunction(StandardTypes.BIGINT)
public static void output(@AggregationState LongState state, BlockBuilder out) {
BIGINT.writeLong(out, state.getLong());
}
}

1
2
3
4
5
6
7
8
9
10
11
12

## 7、性能优化


* Order by + limit

order by 会扫描worker上表数据,耗内存,结合limit 使用减少计算消耗。


* Group By

需要分组的字段,distinct后数量倒序放置

select count(1),uid,age from user_sales group by uid,age;

1
2
3
4

* 使用模糊聚合函数

>用精准误差换性能。

//不推荐
select count(distinct(age)) from user_sales;
//推荐
select count(approx_distinct(age)) from user_sales;

1
2
3


* 合并like

//不推荐
select from user_sales where name like ‘%a%’ or name like ‘%b%’ or name like ‘%c%’;
//推荐
select
from user_sales where name regexp_like(name, a|b|c);

`

  • 大表 join 小表

Presto 使用的是Distributed Hash Join,优先将右表进行hash分区的数据配发到集群中所有woker上(请确保此时的woker内存>右表大小),然后才将左表hash的分区依次传输到相应的集群节点上。即,右表数据的分区会先全部到分布到所有计算节点,左表hash分区是流式传输到相应的计算节点。

  • 关闭 Distributed Hash Join

如果数据存在倾斜,hash join 性能急剧下降,此时可以通过 `set session distributes_join=’false’关闭hash join。那么2表就不会进行hash重分布,右表会被广播到左表Source Stage 的每个节点做join。

  • 使用ORC存储和snappy压缩

针对hive创建表,建议使用orc格式存储,preto在代码层面对orc格式做了很多优化。 数据压缩可以减少节点间数据传输对IO带宽压力,对于即席查询需要快速解压,snappy比较合适。

日常用的一些命令,包括linux,git

发表于 2019-12-05 | 更新于 2020-02-05 | 分类于 命令 | 评论数: | 阅读次数:

总结下自己常用命令
一.linux
1.ssh登陆服务器
ssh username@Ip

2.ssh免密码
查看ssh目录,没有创建sshkey
cd ~/.ssh 若没有:ssh-keygen -t rsa (连续三次回车,即在本地生成了私钥(id_rsa) 与公钥 (id_rsa.pub),不设置密码)
copy本地公钥到服务器 scp ~/.ssh/id_rsa.pub root@192.168.56.210:/root/.ssh/authorized_keys
或者ssh master cat /root/.ssh/authorized_keys > /root/.ssh/authorized_keys

3.查看文件大小
du -h 文件ls -lht

4.查看80端口使用情况
lsof -i:80

5.开启80端口
/sbin/iptables -I INPUT -p tcp –dport 80 -j ACCEPT

6.更改文件所属
chown [-R] 账号名称 文件或目录
chown [-R] 账号名称:用户组名称 文件或目录

7.查看tomcat线程
ps -ef | grep tomcat

8.删除用户
userdel -r name 删除用户和文件夹
userdel XX
groupdel XX

9.查看CPU
lscpu
cat /proc/cpuinfo 或者cat /proc/cpuinfo | grep name | cut -f2 -d: | uniq -c (筛选出来信息简单)

二.git
1.开始使用git
1) git config –global user.name “yourname”
2) git config –global user.email “youremail”
3) ssh-keygen -t rsa -C “youremail”
4) 将~/.ssh目录下生成的id_rsa.pub文件中的key放到服务器上
如果已经存在id_rsa文件,可以生成自己的文件,但是需要在~/.ssh/中加一个config文件,内容如下:
Host git.*.com
IdentityFile ~/.ssh/git_id_rsa(自定义的文件名)
User yourEmail
5) 使用ssh方式迁出项目: git clone ssh地址

2.git 常用别名配置
git config –global alias.co checkout
git config –global alias.br branch
git config –global alias.ci commit
git config –global alias.st status
git config –global alias.unstage “reset HEAD –”
git config –global alias.lg “log –graph –pretty=format:’%Cred%h %Creset - %s %Cgreen(%ad) %Creset %Cblue <%an>’ –date=iso-local”

3.git log美化输出,更好看,更直观
git log –graph –pretty=format:”%Cred%h %Creset - %s %Cgreen(%ad) %Creset %Cblue <%an>” –date=iso-local

4.基本操作
迁出代码 —— git pull
查看代码状态 —— git status
提交代码 —— git commit -a -m “message”
推送代码到服务器 —— git push
添加代码到待提交区 —— git add fileName git add可以有很多含义,比如解决冲突等

5.分支操作常用命令
查看全部分支 —— git branch -a
切换到远程分支 —— git checkout -b 分支本地别名 分支远程名字(上一步看到的名字) 如:git checkout -b dev remotes/origin/dev (本地看不到远程分支的时候需要git pull 一下)
创建本地分支 —— git chckout -b 本地分支名称
切换分支 —— git checkout 分支名(只能是本地分支名)
推送本地分支到服务器 —— git push origin 本地分支名:远程分支名(远程没有就自动创建)
删除本地分支 —— git branch -d 分支名(用于删除已经合并的分支) git branch -D 分支名(删除未合并的分支)
删除远程分支 —— git push origin –delete 远程分支名
合并分支 —— 先切换到主分支,git merge 要合并的分支名
更新代码 —— git pull origin 分支名
推送代码到分支 —— git push origin 分支名

6.标签
打附注标签 —— git tag -a 标签名 -m 注释 例:git tag -a v1.1 -m “version 1.1.0” 附注标签包含作者时间等详细信息,建议打附注标签
查看标签信息 —— git show 标签名 只对附注标签有效
打轻量标签 —— git tag 标签名 只适合作为临时标签使用
根据logID打标签 —— git tag -a 标签名 logId中的一段 例:git tag -a v1.2 9fceb02
推送标签到远程服务器 —— git push origin 标签名
推送所有本地标签到远程服务器 —— git push origin –tags
检出标签 —— git checkout -b 本地标签名 远程标签名 注:git并不能真正的检出标签,而是在标签上新建一个分支

观察者模式

发表于 2019-11-15 | 更新于 2020-02-05 | 分类于 设计模式 | 评论数: | 阅读次数:

观察者模式

当对象间存在一对多关系时,则使用观察者模式(Observer Pattern)。比如,当一个对象被修改时,则会自动通知它的依赖对象。观察者模式属于行为型模式

介绍

  • 意图:定义对象间的一种一对多的依赖关系,当一个对象的状态发生改变时,所有依赖于它的对象都得到通知并被自动更新。
  • 主要解决:一个对象状态改变给其他对象通知的问题,而且要考虑到易用和低耦合,保证高度的协作。
  • 何时使用:一个对象(目标对象)的状态发生改变,所有的依赖对象(观察者对象)都将得到通知,进行广播通知。
  • 如何解决:使用面向对象技术,可以将这种依赖关系弱化。
  • 关键代码:在抽象类里有一个 ArrayList 存放观察者们。
  • 应用实例: 1、拍卖的时候,拍卖师观察最高标价,然后通知给其他竞价者竞价。 2、西游记里面悟空请求菩萨降服红孩儿,菩萨洒了一地水招来一个老乌龟,这个乌龟就是观察者,他观察菩萨洒水这个动作。
  • 优点: 1、观察者和被观察者是抽象耦合的。 2、建立一套触发机制。
  • 缺点: 1、如果一个被观察者对象有很多的直接和间接的观察者的话,将所有的观察者都通知到会花费很多时间。 2、如果在观察者和观察目标之间有循环依赖的话,观察目标会触发它们之间进行循环调用,可能导致系统崩溃。 3、观察者模式没有相应的机制让观察者知道所观察的目标对象是怎么发生变化的,而仅仅只是知道观察目标发生了变化。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
// 抽象观察者
public abstract class Observer {
public abstract void update();
}
// 具体被观察者
public class User implements Observer {

private String name;
private String message;

public User(String name) {
this.name = name;
}

@Override
public void update(String message) {
this.message = message;
read();
}

public void read() {
System.out.println(name + " 收到推送消息: " + message);
}

}

// 抽象被观察者
public interface Observerable {
public void registerObserver(Observer o);
public void removeObserver(Observer o);
public void notifyObserver();

}

// 具体被观察者
public class WechatServer implements Observerable {

//注意到这个List集合的泛型参数为Observer接口,设计原则:面向接口编程而不是面向实现编程
private List<Observer> list;
private String message;

public WechatServer() {
list = new ArrayList<Observer>();
}

@Override
public void registerObserver(Observer o) {
list.add(o);
}

@Override
public void removeObserver(Observer o) {
if(!list.isEmpty())
list.remove(o);
}

//遍历
@Override
public void notifyObserver() {
for(int i = 0; i < list.size(); i++) {
Observer oserver = list.get(i);
oserver.update(message);
}
}

public void setInfomation(String s) {
this.message = s;
System.out.println("微信服务更新消息: " + s);
//消息更新,通知所有观察者
notifyObserver();
}

}

核心思路

  • 观察者
  • 被观察者(里面包含观察者引用)

命令模式

发表于 2019-11-11 | 更新于 2020-02-05 | 分类于 设计模式 | 评论数: | 阅读次数:

命令模式

命令模式(Command Pattern)是一种数据驱动的设计模式,它属于行为型模式。请求以命令的形式包裹在对象中,并传给调用对象。调用对象寻找可以处理该命令的合适的对象,并把该命令传给相应的对象,该对象执行命令。

介绍

  • 意图:将一个请求封装成一个对象,从而使您可以用不同的请求对客户进行参数化。
  • 主要解决:在软件系统中,行为请求者与行为实现者通常是一种紧耦合的关系,但某些场合,比如需要对行为进行记录、撤销或重做、事务等处理时,这种无法抵御变化的紧耦合的设计就不太合适。
  • 关键代码:定义三个角色:1、Receiver 真正的命令执行对象 2、Command 命令3、Invoker 命令调用协调者
  • 流程:调用者→接受者→命令。
  • 优点: 1、降低了系统耦合度。 2、新的命令可以很容易添加到系统中去。
  • 缺点:使用命令模式可能会导致某些系统有过多的具体命令类。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
//抽象命令
public abstract class Command {

public abstract void execute();

}
//具体命令1
public class ConcreteCommand1 extends Command {
private Receiver receiver;

public ConcreteCommand1(Receiver receiver) {
this.receiver = receiver;
}

@Override
public void execute() {
receiver.doSomething();
}
}
// 具体命令2
public class ConcreteCommand2 extends Command {
private Receiver receiver;

public ConcreteCommand2(Receiver receiver) {
this.receiver = receiver;
}

@Override
public void execute() {
receiver.doSomething();
}
}
// 具体命令执行者抽象类
public abstract class Receiver {

public abstract void doSomething();
}
//具体命令执行者1
public class Receiver1 extends Receiver {
@Override
public void doSomething() {

}
}
//具体命令执行者2
public class Receiver2 extends Receiver {
@Override
public void doSomething() {

}
}
//命令调用的协调者
public class Invoker {
private Command command;

public void setCommand(Command command) {
this.command = command;
}

public void action() {
command.execute();
}
}
//test方法
public class Main {
public static void main(String[] args){
Receiver receiver = new Receiver1();
Command command = new ConcreteCommand1(receiver);

Invoker invoker = new Invoker();
invoker.setCommand(command);
invoker.action();
}
}

核心思路

  • 执行者抽象
  • 命令抽象(里面包含命令的执行者)
  • 事务协调者(持有命令对象)

建造者模式

发表于 2019-11-10 | 更新于 2020-02-05 | 分类于 设计模式 | 评论数: | 阅读次数:

建造者模式

建造者模式(Builder Pattern)使用多个简单的对象一步一步构建成一个复杂的对象。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。
一个 Builder 类会一步一步构造最终的对象。该 Builder 类是独立于其他对象的。

介绍
  • 意图:将一个复杂的构建与其表示相分离,使得同样的构建过程可以创建不同的表示。
  • 主要解决:主要解决在软件系统中,有时候面临着”一个复杂对象”的创建工作,其通常由各个部分的子对象用一定的算法构成;由于需求的变化,这个复杂对象的各个部分经常面临着剧烈的变化,但是将它们组合在一起的算法却相对稳定。
  • 何时使用:一些基本部件不会变,而其组合经常变化的时候。
  • 如何解决:将变与不变分离开。
  • 关键代码:建造者:创建和提供实例,导演:管理建造出来的实例的依赖关系。
  • 应用实例: 1、去肯德基,汉堡、可乐、薯条、炸鸡翅等是不变的,而其组合是经常变化的,生成出所谓的”套餐”。 2、JAVA 中的 StringBuilder。
  • 优点: 1、建造者独立,易扩展。 2、便于控制细节风险。
  • 缺点: 1、产品必须有共同点,范围有限制。 2、如内部变化复杂,会有很多的建造类。

例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
public abstract class AbstractBuilder {

abstract void buildPart1();

abstract void buildPart2();

abstract void buildPart3();
}

public class BuilderImpl extends AbstractBuilder {
@Override
void buildPart1() {
System.out.println("构建对象第一步");
}

@Override
void buildPart2() {
System.out.println("构建对象第二步");
}

@Override
void buildPart3() {
System.out.println("构建对象第三步");
}
}

public class Director {

private AbstractBuilder builder;

private Integer condition;

public Director(AbstractBuilder builder, Integer condition) {
this.builder = builder;
this.condition = condition;
}

public void build(AbstractBuilder builder) {
switch (condition) {
case 1:
builder.buildPart1();
break;
case 2:
builder.buildPart2();
break;
case 3:
builder.buildPart3();
break;
default:
builder.buildPart1();
}
}
}

核定思路

  • 对象构建需要若干步骤
  • 构建抽象类,抽象每一步
  • 一个协调者负责协调构建的每一步

抽象工厂模式

发表于 2019-11-05 | 更新于 2020-02-05 | 分类于 设计模式 | 评论数: | 阅读次数:

抽象工厂模式

抽象工厂模式(Abstract Factory Pattern)是围绕一个超级工厂创建其他工厂。该超级工厂又称为其他工厂的工厂。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。
在抽象工厂模式中,接口是负责创建一个相关对象的工厂,不需要显式指定它们的类。每个生成的工厂都能按照工厂模式提供对象。

介绍

  • 意图:提供一个创建一系列相关或相互依赖对象的接口,而无需指定它们具体的类。
  • 主要解决:主要解决接口选择的问题。
  • 何时使用:系统的产品有多于一个的产品族,而系统只消费其中某一族的产品。
  • 如何解决:在一个产品族里面,定义多个产品。
  • 关键代码:在一个工厂里聚合多个同类产品。
  • 应用实例:工作了,为了参加一些聚会,肯定有两套或多套衣服吧,比如说有商务装(成套,一系列具体产品)、时尚装(成套,一系列具体产品),甚至对于一个家庭来说,可能有商务女装、商务男装、时尚女装、时尚男装,这些也都是成套的,即一系列具体产品。
  • 优点:当一个产品族中的多个对象被设计成一起工作时,它能保证客户端始终只使用同一个产品族中的对象。
  • 缺点:产品族扩展非常困难,要增加一个系列的某一产品,既要在抽象的 Creator 里加代码,又要在具体的里面加代码。

例子

实例化若干一个创建不同颜色的形状的图形,颜色:一个工厂来创建;形状:一个工厂来创建。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62

public abstract class AbstractFactory {
public abstract Color getColor(String color);
public abstract Shape getShape(String shape) ;
}

public class ColorFactory extends AbstractFactory {

@Override
public Shape getShape(String shapeType) {
return null;
}

@Override
public Color getColor(String color) {
if (color == null) {
return null;
}
if (color.equalsIgnoreCase("RED")) {
return new Red();
} else if (color.equalsIgnoreCase("GREEN")) {
return new Green();
} else if (color.equalsIgnoreCase("BLUE")) {
return new Blue();
}
return null;
}
}

public class ShapeFactory extends AbstractFactory{
@Override
public Color getColor(String color) {
return null;
}

@Override
public Shape getShape(String shapeType) {
if(shapeType == null){
return null;
}
if(shapeType.equalsIgnoreCase("CIRCLE")){
return new Circle();
} else if(shapeType.equalsIgnoreCase("RECTANGLE")){
return new Rectangle();
} else if(shapeType.equalsIgnoreCase("SQUARE")){
return new Square();
}
return null;
}
}

public class FactoryProducer {

public static AbstractFactory getFactory(String choice) {
if (choice.equalsIgnoreCase("SHAPE")) {
return new ShapeFactory();
} else if (choice.equalsIgnoreCase("COLOR")) {
return new ColorFactory();
}
return null;
}
}

核心思路

  • 一个综合产品是由多个产品组成
  • 每个产品由一个工厂来生产
  • 抽象一个大工厂作为每个产品工厂的工厂
12…4

安迪·梵

keep it simple and stupid
34 日志
13 分类
4 标签
RSS
E-Mail GitHub Weibo
© 2020 安迪·梵
本站总访问量 次 | 有人看过我的博客