让 Redis zset 支持多条件排序

一些需求中经常要我们实现一个排行榜,数据量少的话可以使用 RDB 数据库排序,数据量大可以自己实现算法或者使用 NoSQL 数据库排序,NoSQL 数据库中最方便的可能就是利用 Redis 的 zset 来实现了。 例如要实现一个玩家成就点数的排行榜:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
    >zadd r 100 A
    "1"
    >zadd r 200 B
    "1"
    >zadd r 200 C
    "1"
    >zadd r 300 D
    "1"
    >zrange r 0 -1 withscores
    1)  "A"
    2)  "100"
    3)  "B"
    4)  "200"
    5)  "C"
    6)  "200"
    7)  "D"
    8)  "300"       

其中 B 和 C 的分数是一样的,这时我们可能想让先达到对应分数的人排在前面,相当于增加了一个排序维度。这时直接用 score 就不能实现了,需要做一些转换,这里分享几个我会用到的方法。

实现方法

假设我们需要一个三个维度的排序,第一维度是具体的数值,第二个维度是一个是否标志位,第三个维度是时间戳。其中氪金的(0否1是)和时间早的排序靠前。

以下是原始数据:

1
2
3
4
5
6
    玩家  成就  是否氪金  时间戳
    A    100  1        1571819021259
    B    200  0        1571819021259
    C    200  1        1571819021259
    D    400  0        1571819021259
    E    200  1        1571810001259

1. 利用 key 排序

如果后两个维度初始化后就不再变化的话,可以利用 Redis 的排序特性,将不变的维度写到 key 里,这样 score 相同时会用 key 来进行排序。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
    # 这里反转时间戳(9999999999999 - 1571810001259),让时间早的排在前面
    >zadd r 100 A-1-8428180978740
    "1"
    >zadd r 200 B-0-8428180978740
    "1"
    >zadd r 200 C-1-8428180978740
    "1"
    >zadd r 400 D-0-8428180978740
    "1"
    >zadd r 200 E-1-8428189998740
    "1"
    >zrange r 0 -1 withscores
     1)  "A-1-8428180978740"
     2)  "100"
     3)  "B-0-8428180978740"
     4)  "200"
     5)  "C-1-8428180978740"
     6)  "200"
     7)  "E-1-8428189998740"
     8)  "200"
     9)  "D-0-8428180978740"
     10)  "400"

以上就是第一种方法了,实现起来非常简单。


score 存储格式

在介绍下两种方法前先来看看 zset 的 score 是怎么存储的,能保留多少精度,直接看跳表 node 的结构。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
    /*
     * 跳跃表节点
     */
    typedef struct zskiplistNode {
    
        // 成员对象
        robj *obj;
    
        // 分值
        double score;
    
        // 后退指针
        struct zskiplistNode *backward;
    
        // 层
        struct zskiplistLevel {
            // 前进指针
            struct zskiplistNode *forward;
    
            // 跨度
            unsigned int span;
        } level[];
    
    } zskiplistNode;

我们可以看到 score 是用 double 存储的,它是一个 IEEE 754 标准的 double 浮点数。(IEEE 754 标准的详细存储规则可以搜索其他文章,这里重点分析保留的精度)

一个 64 位浮点数存储时分为3段

  • S:符号位,第一段,占1位,0表示正数,1表示负数。
  • Exp:指数字,第二段,占11位,移码方式表示正负数,排除全0和全1表示零和无穷大的特殊情况,指数部分是−1022~+1023加上1023,指数值的大小从1~2046
  • Fraction:有效数字,占52位,定点数,小数点放在尾数最前面
    实际数字 = (-1)^S * Fraction * 2^Exp  (二进制计算)

double 存储的有效数据是第三段的52位,超出会损失精度,由于标准规定小数点是在有效数字最前面,所以实际可以存储 53 位数字。比如有个数字是 111___(53个1),有效数字位存储的是 .11___(52个1),计算时会在个位补1,得到1.11___(53个1),最大值为 2^53 = 9007199254740991,大概 log2^53 = 15.95 位。

可以看到只要保证有效数字小于等于9007199254740991就不会损失精度。

好了,现在有了理论基础,接下来看看我们可以怎么实现。


2. 二进制分段

第二种方法,可以利用二进制 64 位 long 分段存储各个维度。

首先时间戳如果全存储就太长了,可以通过一些计算缩小一些,先忽略毫秒,然后和一个大数计算差值。

1
2
    int MAX_SECOND = 1861891200; // 2029.1.1 0:0:0
    int sts = (MAX_SECOND - (int)(ts / 1000))

这样时间就缩小到了300000000以内。

接下来进行划分,首位标志位不用,剩下63位,然后我划分高33位存分数,1位存标志位,最后29位存时间戳,存储结构是这样的:

如果要不损失精度,第一段可存储位数 = 53 - 1 - 29 = 23。

第一段最大值 = 2^33 = 8589934591,不损失精度 = 2^23 = 8388607。

第三段最大值 = 2^29 = 536870911。

  • 使用时可以适当缩短第三段时间戳的长度,或者不追求时间戳一定精确的话,第一段分数可以超出不损失精度的长度,也只会损失一点时间戳的精度而已。

然后就可以用下面的方法计算 score 了,因为是二进制,可以全用位运算。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
    int size1 = 33;
    int size2 = 1;
    int size3 = 29;
    
    long d1Max = maxBySize(size1);
    long d2Max = maxBySize(size2);
    long d3Max = maxBySize(size3);
    
    // 计算 score
    public long genScore(long d1, long d2, long d3) {
        return ((d1 & d1Max) << (size2 + size3)) | ((d2 & d2Max) << size3) | (d3 & d3Max);
    }
    
    // 计算增加 value 值
    public void incValue(long d1) {    
        return ((d1 & d1Max) << (size2 + size3)) | ((0 & d2Max) << size3) | (0 & d3Max);
    }
    
    // 根据二进制长度计算最大值
    private long maxBySize(int len) {
        long r = 0;
        while (len-- > 0) {
            r = ((r << 1) | 1);
        }
    
        return r;
    }

增加高位分数的话直接把低维度设为0,计算出分数后调用 ZINCRBY 就行了。

改变低位维度值的话就不能调用 ZINCRBY 了,需要调用ZADD,我们用自旋加 CAS 更新,防止覆盖掉新更新的值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
    /**
     * 设置维度值
     * 
     * @param key zset key
     * @param value zset value
     * @param d2 第二位
     * @param d3 第三位
     * @return
     */
    public boolean setD(String key, String value, Long d2, Long d3) {
        long start = System.currentTimeMillis();
        Long oldScore;
        long score;
        byte[] originBytes;
    
        do {
            if ((System.currentTimeMillis() - start) > 2000) {
                return false;
            }
    
            long d1 = 0;
    
            originBytes = zscoreBytes(key, value);
            oldScore = convertZscore(originBytes);
    
            if (originBytes != null) {
                // 根据 score 获取原始值
                d1 = getD1ByScore(oldScore);
            }
            // 如果不设置新值就获取原始值
            if (d2 == null) {
                // 根据 score 获取原始值
                d2 = getD2ByScore(oldScore);
            }
    				// 如果不设置新值就获取原始值
            if (d3 == null) {
                // 根据 score 获取原始值
                d3 = getD3ByScore(oldScore);
            }
    
    				// 生成新值
            score = genScore(d1, d2, d3);
        } while (!compareAndSetDimension(key, value, originBytes, score));
    
        return true;
    }
    
    
    Long SUCCESS = 1L;
    String script = "if (((not (redis.call('zscore', KEYS[1], ARGV[1]))) and (ARGV[2] == '')) " +
                        "or redis.call('zscore', KEYS[1], ARGV[1]) == ARGV[2]) " +
                    " then redis.call('zadd',KEYS[1],ARGV[3],ARGV[1]) return 1 else return 0 end";
    
    // CAS 设置值
    private boolean compareAndSetDimension(String setKey, String key, byte[] oldScore, long newScore) {
        Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
            try {
                return connection.eval(script.getBytes(), ReturnType.fromJavaType(Long.class), 1, setKey.getBytes(),
                        key.getBytes(), oldScore, om.writeValueAsBytes(newScore));
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            }
        });
    
        return SUCCESS.equals(result);
    }
    
    // 获取原始值
    public byte[] zscoreBytes(String key, String value) {
        String script = "return redis.call('zscore', KEYS[1], ARGV[1])";
        return redisTemplate.execute((RedisCallback<byte[]>) connection -> connection.eval(script.getBytes(),
                ReturnType.fromJavaType(String.class),
                1, key.getBytes(), value.getBytes()));
    }

这里初始 score 值获取使用的是 zscoreBytes() 而不是redisTemplate.opsForZSet().score()是为了防止损失精度,这个方法返回的 score 值其实是可能损失精度的,我们看源码分析下:

直接进到最底层的执行方法 zscore

1
2
3
4
5
6
7
    // class RedisCommandBuilder
    
    Command<K, V, Double> zscore(K key, V member) {
        notNullKey(key);
    
        return createCommand(ZSCORE, new DoubleOutput<>(codec), key, member);
    }

DoubleOutput 将 Redis 的返回值转成 Double

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
    // 将 Redis 返回值转成 Double
    public class DoubleOutput<K, V> extends CommandOutput<K, V, Double> {
    
        public DoubleOutput(RedisCodec<K, V> codec) {
            super(codec, null);
        }
    
        // 方法参数 bytes 就是 Redis 返回的原始值了,是个字符串
        @Override
        public void set(ByteBuffer bytes) {
            output = (bytes == null) ? null : parseDouble(decodeAscii(bytes));
        }
    }

Redis 原始的返回值其实是个类似 1.3094266712875436e+18 的字符串,DoubleOutput 在转换成 Double 的时候有可能会损失精度,用 BigDecimal 转的话是不会丢精度的。

1
2
    String a = "1.3094266712875436e+18";
    long value = new BigDecimal(a).longValue();

所以要不损失精度的话,可以修改下原始方法重新打个包或者用 lua 取出来原始的字符串再转成自己需要的值。

最后利用 genScore 方法就可以生成score值并排序了

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
    # genScore(100, 1, 290072179)
    >zadd r 108201125491 A
    "1"
    # genScore(200, 0, 290072179)
    >zadd r 215038436979 B
    "1"
    # genScore(200, 1, 290072179)
    >zadd r 215575307891 C
    "1"
    # genScore(400, 0, 290072179)
    >zadd r 429786801779 D
    "1"
    # genScore(200, 1, 290081199)
    >zadd r 215575316911 E
    "1"
    >zrange r 0 -1 withscores
     1)  "A"
     2)  "108201125491"
     3)  "B"
     4)  "215038436979"
     5)  "C"
     6)  "215575307891"
     7)  "E"
     8)  "215575316911"
     9)  "D"
     10)  "429786801779"

3. 直接拆分 double

可以用二进制拆分当然也可以直接拆分十进制数,为了方便,还可以用小数划分维度,比如将分数放在整数位,标志位和时间戳放在小数位。

1
2
3
4
5
    A 100.1290072179
    B 200.0290072179
    C 200.1290072179
    D 400.0290072179
    E 200.1290081199

不过这样不丢精度存储的分数比二进制拆分小。

原理和二进制拆分一样,就不赘述了。

总结

数据量不大的情况下直接使用 RDB 就可以了,比较方便。用 Redis 的时候,一级以外的维度不变的情况下可以直接用 key 排序,比较简单,如果维度会更新,可以使用拆分二进制或十进制的方法存储,二进制的优点是存储的数比较大,而且可以用位运算,十进制的优点是计算简单,可读性比较好。各个维度的长度还可以做成配置项,这样就可以满足不同的业务需求了。