Redis常见应用

1、分布式锁

分布式锁本质上要实现的目标就是在 Redis 里面占一个“茅坑”,当别的进程也要来占时,发现已经有人蹲在那里了,就只好放弃或者稍后再试。

A、几种方式

  • setnx (set if not exit)
1
2
3
4
5
127.0.0.1:6379> setnx lock true
(integer) 1
127.0.0.1:6379> del lock
(integer) 1
127.0.0.1:6379>

但是有个问题,如果逻辑执行到中间出现异常了,可能会导致 del 指令没有被调用,这样就会陷入死锁,锁永远得不到释放。

  • setnx + expire
1
2
3
4
5
127.0.0.1:6379> setnx lock true
(integer) 1
127.0.0.1:6379> expire lock 5
(integer) 1
127.0.0.1:6379>

但是以上逻辑还有问题。如果在 setnx 和 expire 之间服务器进程突然挂掉了,可能是因为机器掉电或者是被人为杀掉的,就会导致 expire 得不到执行,也会造成死锁。为了解决这个疑难,Redis 开源社区涌现了一堆分布式锁的 library,专门用来解决这个问题。实现方法极为复杂。

  • set [ex seconds] [nx]
1
2
3
4
5
127.0.0.1:6379> set lock  true ex 5 nx
OK
127.0.0.1:6379> del lock
(integer) 1
127.0.0.1:6379>

Redis 2.8 版本中作者加入了 set 指令的扩展参数,使得 setnx 和 expire 指令可以一起执行,彻底解决了分布式锁的乱象。从此以后所有的第三方分布式锁 library 可以休息了。

B、超时问题

Redis 的分布式锁不能解决超时问题,如果在加锁和释放锁之间的逻辑执行的太长,以至于超出了锁的超时限制,就会出现问题。因为这时候第一个线程持有的锁过期了,临界区的逻辑还没有执行完,这个时候第二个线程就提前重新持有了这把锁,导致临界区代码不能得到严格的串行执行。

  • 最佳实践

    • 不要执行较长时间的任务
    • set 的value设置一个随机数,释放锁时先匹配随机数是否一致,然后再删除 key。
    • 匹配 value 和删除 key 不是一个原子操作
    • lua 脚本解决原子性,以python代码为例

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      tag = random.nextint()  # 随机数
      if redis.set(key, tag, nx=True, ex=5):
      do_something()
      redis.delifequals(key, tag) # 自定义的 delifequals 指令

      # delifequals
      if redis.call("get",KEYS[1]) == ARGV[1] then
      return redis.call("del",KEYS[1])
      else
      return 0
      end

这也不是一个完美的方案,它只是相对安全一点,因为如果真的超时了,当前线程的逻辑没有执行完,其它线程也会乘虚而入。

C、可重入性

可重入性是指线程在持有锁的情况下再次请求加锁,如果一个锁支持同一个线程的多次加锁,那么这个锁就是可重入的。如 Java 里的 ReentrantLock 就是可重入锁。Redis 分布式锁如果要支持可重入,需要对客户端的 set 方法进行包装,我用线程的 Threadlocal 变量存储当前持有锁的计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import redis.clients.jedis.Jedis;

public class RedisWithReentrantLock {

private ThreadLocal<Map<String, Integer>> lockers = new ThreadLocal<>();

private Jedis jedis;

public RedisWithReentrantLock(Jedis jedis) {
this.jedis = jedis;
}

private boolean _lock(String key) {
return jedis.set(key, "", "nx", "ex", 5L) != null;
}

private void _unlock(String key) {
jedis.del(key);
}

private Map<String, Integer> currentLockers() {
Map<String, Integer> refs = lockers.get();
if (refs != null) {
return refs;
}
lockers.set(new HashMap<>());
return lockers.get();
}

public boolean lock(String key) {
Map<String, Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if (refCnt != null) {
refs.put(key, refCnt + 1);
return true;
}
boolean ok = this._lock(key);
if (!ok) {
return false;
}
refs.put(key, 1);
return true;
}

public boolean unlock(String key) {
Map<String, Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if (refCnt == null) {
return false;
}
refCnt -= 1;
if (refCnt > 0) {
refs.put(key, refCnt);
} else {
refs.remove(key);
this._unlock(key);
}
return true;
}

public static void main(String[] args) {
Jedis jedis = new Jedis();
RedisWithReentrantLock redis = new RedisWithReentrantLock(jedis);
System.out.println(redis.lock("fan"));
System.out.println(redis.lock("fan"));
System.out.println(redis.unlock("fan"));
System.out.println(redis.unlock("fan"));
}
}

D、redisson

redis 官方推荐的redis分布式锁库,你想要的基本都有,github地址

2、Bitmap位图

位图不是特殊的数据结构,它的内容其实就是普通的字符串,也就是 byte 数组。我们可以使用普通的 get/set 直接获取和设置整个位图的内容,也可以使用位图操作 getbit/setbit 等将 byte 数组看成「位数组」来处理。

  • 统计活跃用户

    • set结构:sadd key,value (uesrId,1),若用户数很大,则key会很多,不推荐
    • 位图操作:setbit key, offset, value (”activeUser”,userId,1),此时userId 需要为整数不重复,且最好连续自增。
  • 操作命令

    • set key value 这个设置key的值(整体设)
    • setbit key offset bit 设置value二进制中某个位置的值
    • getbit key offset 获取value指定位置的值
    • bitcount key [start end] 统计指定位置范围内 1 的个数,注意这里面的start和end是字符索引
    • bitpos key bit [start] [end] 查找指定范围内出现的第一个 0 或 1
    • bitfield key [get type offset] [set type offset value] [incrby type offset increment] 多个连续位操作(最长64位)

3、HyperLogLog

用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。占用内存12K,但是它的精确度不是很高,我们也无法知道某个key是否已经在其中了。

  • 统计uv(如:key为用户id)

    • set结构:sadd key , scard key 如果数据页面uv很大,则不合适了
    • HyperLogLog: pfadd key value ,pfcount key
    • 多个页面的uv合并,可以使用pfmerge
  • set和HyperLogLog统计页面uv,set 的key为suv,HyperLogLog uv 为huv

1
2
3
4
5
6
7
8
127.0.0.1:6379> sadd suv u1
(integer) 1
127.0.0.1:6379> sadd suv u2
(integer) 1
127.0.0.1:6379> sadd suv u3
(integer) 1
127.0.0.1:6379> scard suv
(integer) 3
1
2
3
4
127.0.0.1:6379> pfadd huv u1 u2 u3
(integer) 1
127.0.0.1:6379> pfcount huv
(integer) 3
1
2
3
4
5
6
7
8
9
10
127.0.0.1:6379> pfadd huv u1 u2 u3
(integer) 1
127.0.0.1:6379> pfcount huv
(integer) 3
127.0.0.1:6379> pfadd huv2 u4 u5 u6
(integer) 1
127.0.0.1:6379> pfmerge huv huv2
OK
127.0.0.1:6379> pfcount huv
(integer) 6

HyperLogLog这个数据结构的发明人名字叫Philippe Flajolet ,命令pf为首字母缩写。至于内存占用12K,可以查看这个PPT。

4、BloomFilter布隆过滤器

布隆过滤器可以理解为一个不怎么精确的 set 结构,当你使用它的 contains 方法判断某个对象是否存在时,它可能会误判。当布隆过滤器说某个值存在时,这个值可能不存在;当它说不存在时,那就肯定不存在。

  • 目前布隆过滤器还是基于插件形式,我们直接通过docker 获取带插件的redis

    1
    2
    3
    docker pull redislabs/rebloom
    docker run --name redisBloom -d -p 6379:6379 redislabs/rebloom
    redis-cli
  • 操作命令

    • 单个操作

      1
      2
      3
      4
      5
      6
      7
      8
      9
       127.0.0.1:6379> bf.add news 1
      (integer) 1
      127.0.0.1:6379> bf.add news 2
      (integer) 1
      127.0.0.1:6379> bf.add news 3
      (integer) 1
      127.0.0.1:6379> bf.exists news 1
      (integer) 1
      127.0.0.1:6379> bf.exists news 4
    • 多个操作
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    127.0.0.1:6379> bf.madd news 11 22 33 44 55
    1) (integer) 1
    2) (integer) 1
    3) (integer) 1
    4) (integer) 1
    5) (integer) 1
    127.0.0.1:6379> bf.mexists news 4 5 6 7 8 9
    1) (integer) 1
    2) (integer) 1
    3) (integer) 1
    4) (integer) 1
    5) (integer) 1
    6) (integer) 1
* 布隆过滤器算法应用
  * 新闻推荐系统,推送去重
  * 爬虫url去重
  * 垃圾邮件过滤
  * HBase、Cassandra、LevelDB等使用它过滤不在的查询

* 原理(粗略)


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FRwmHX90-1570534107291)(/Users/huangfan/Downloads/16464301a0e26416-2.png)]

每个布隆过滤器对应到 Redis 的数据结构里面就是一个大型的位数组和几个不一样的无偏 hash 函数(上图中的f g h)。所谓无偏就是能够把元素的 hash 值算得比较均匀。

* 添加操作:使用多个hash函数对key进行hash计算,得到一个整数索引值,然后再和数组长度取模得到一个位置,再这将这几个位置置为1,就完成了操作。index=hash(key)%/tableSize,table[index]=1。

* 判断key是否存在:把 hash 的几个位置都算出来,看看位数组中这几个位置是否都为 1,只要有一个位为 0,那么说明布隆过滤器中这个 key 不存在。如果都是 1,这并不能说明这个 key 就一定存在,只是极有可能存在,因为这些位被置为 1 可能是因为其它的 key 存在所致。如果这个位数组比较稀疏,判断正确的概率就会很大,如果这个位数组比较拥挤,判断正确的概率就会降低。
* 空间占用估计
     * 输入参数2个:
n为预计元素数量,f为错误率。
1
2
3
4
5
6
     * 输出结果2个:```l,k```,l为位数组长度,k为最佳的hash函数个数。


## 5、限流

* 例子:限制一个用户一段时间发帖频率。关键点用户```userId```,一段时间```period```,发帖这个动作```actionKey```,限制次数```maxCount
* 简单的限流方法,利用zset结构
1
2
3
4
5
6
7
8
9
10
11
12
13
14
 public boolean isActionAllowed(String userId, String actionKey, int period, int maxCount) {
String key = String.format("hist:%s:%s", userId, actionKey);
long nowTs = System.currentTimeMillis();
// 保障批量操作的事务性
Pipeline pipe = jedis.pipelined();
pipe.multi();
pipe.zadd(key, nowTs, "" + nowTs);
pipe.zremrangeByScore(key, 0, nowTs - period * 1000);
Response<Long> count = pipe.zcard(key);
pipe.expire(key, period + 1);
pipe.exec();
pipe.close();
return count.get() <= maxCount;
}
  • 漏斗限流方法
    • 漏斗剩余空间:可持续进行的数量
    • 漏斗流水速率:允许行为最大的频率

使用Java代码实现漏斗限流方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class FunnelRateLimiter {

static class Funnel {
int capacity;
float leakingRate;
int leftQuota;
long leakingTs;

public Funnel(int capacity, float leakingRate) {
this.capacity = capacity;//漏斗容量
this.leakingRate = leakingRate;//漏斗流水速率
this.leftQuota = capacity;//漏斗剩余空间
this.leakingTs = System.currentTimeMillis();//上一次漏水时间
}

void makeSpace() {
long nowTs = System.currentTimeMillis();
long deltaTs = nowTs - leakingTs;//距离上一次漏水过去了多久
int deltaQuota = (int) (deltaTs * leakingRate);//可腾出的空间
if (deltaQuota < 0) { // 间隔时间太长,整数数字过大溢出
this.leftQuota = capacity;
this.leakingTs = nowTs;
return;
}
if (deltaQuota < 1) { // 腾出空间太小,最小单位是1
return;
}
this.leftQuota += deltaQuota;
this.leakingTs = nowTs;
if (this.leftQuota > this.capacity) {
this.leftQuota = this.capacity;
}
}

// quota 倒入漏斗的水容量
boolean watering(int quota) {
makeSpace();
if (this.leftQuota >= quota) {
this.leftQuota -= quota;
return true;
}
return false;
}
}

private Map<String, Funnel> funnels = new HashMap<>();

public boolean isActionAllowed(String userId, String actionKey, int capacity, float leakingRate) {
String key = String.format("%s:%s", userId, actionKey);
Funnel funnel = funnels.get(key);
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
return funnel.watering(1); // 需要1个quota
}
}

Redis 4.0 之后可以安装限流模块,它叫 redis-cell。该模块也使用了漏斗算法,并提供了原子的限流指令。该模块只有1条指令cl.throttle。

  • cl.throttle xiaoming:reply 15 30 60 1
  • 该命令意思:允许小明回复行为的频率是60s最多30次,即2秒1次的速率,漏斗的初始容量15次(即一开始可以连续回复15个帖子),

我下载最新版本redis,操作如下

1
2
3
4
5
6
7
8
9
10
11
// 安装redis
brew install redis

// 到redis的bin下,加载下载的redis-cell模块
./redis-server --loadmodule /usr/local/Cellar/redis/5.0.5/extend/libredis_cell.dylib
127.0.0.1:6379> cl.throttle xiaoming:reply 15 30 60 1
1) (integer) 0 # 0 表示允许,1表示拒绝
2) (integer) 16 # 漏斗容量capacity(15+1,15 是从0开始计数的)
3) (integer) 15 # 漏斗剩余空间left_quota(16-1)
4) (integer) -1 # 如果拒绝了,需要多长时间后再试(漏斗有空间了,单位秒)
5) (integer) 2 # 多长时间后,漏斗完全空出来(left_quota==capacity,单位秒)

5、GeoHash(距离计算)

  • GeoHash 算法

GeoHash 算法将二维的经纬度数据映射到一维的整数,这样所有的元素都将在挂载到一条线上,距离靠近的二维坐标映射到一维后的点之间距离也会很接近。当我们想要计算「附近的人时」,首先将目标位置映射到这条线上,然后在这个一维的线上获取附近的点就行了。

那这个映射算法具体是怎样的呢?它将整个地球看成一个二维平面,然后划分成了一系列正方形的方格,就好比围棋棋盘。所有的地图元素坐标都将放置于唯一的方格中。方格越小,坐标越精确。然后对这些方格进行整数编码,越是靠近的方格编码越是接近。那如何编码呢?一个最简单的方案就是切蛋糕法。设想一个正方形的蛋糕摆在你面前,二刀下去均分分成四块小正方形,这四个小正方形可以分别标记为 00,01,10,11 四个二进制整数。然后对每一个小正方形继续用二刀法切割一下,这时每个小小正方形就可以使用 4bit 的二进制整数予以表示。然后继续切下去,正方形就会越来越小,二进制整数也会越来越长,精确度就会越来越高。

  • Geo命令
    • 新增:geoadd bike 116.562108 39.787602 bike1 116.334255 40.027400 bike2 新增bike1和bike2 的经纬度
    • 距离:geodist bike bike1 bike2 km //计算bike1和bike2之间的距离,单位km
    • 元素附近:georadiusbymember bike bike1 2 km count 3 desc //在bike1附近2km内的3个自行车,距离从远到近
    • 坐标附近:georadius bike 116.334255 40.027400 5 km withdist count 2 asc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
127.0.0.1:6379[2]> geoadd bike 116.562108 39.787602 bike1
(integer) 1
127.0.0.1:6379[2]> geoadd bike 116.334255 40.027400 bike2
(integer) 1
127.0.0.1:6379[2]> geoadd bike 112.334255 30.028400 bike3
(integer) 1
127.0.0.1:6379[2]> geoadd bike 192.334255 10.028400 bike4
(integer) 1
127.0.0.1:6379[2]> geodist bike bike1 bike2
"33004.6915"
127.0.0.1:6379[2]> geodist bike bike1 bike2 km
"33.0047"
127.0.0.1:6379[2]> geodist bike bike1 bike3 km
"1151.5533"
127.0.0.1:6379[2]> georadiusbymember bike bike1 2 km count 3 desc
1) "bike1"
2) "haluo"
127.0.0.1:6379[2]> georadius bike 116.334255 40.027400 5 km withdist count 2 asc
1) 1) "bike2"
2) "0.0002"
2) 1) "mobai"
2) "0.0002"