一些需求中经常要我们实现一个排行榜,数据量少的话可以使用 RDB 数据库排序,数据量大可以自己实现算法或者使用 NoSQL 数据库排序,NoSQL 数据库中最方便的可能就是利用 Redis 的 zset 来实现了。 例如要实现一个玩家成就点数的排行榜:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
>zadd r 100 A
"1"
>zadd r 200 B
"1"
>zadd r 200 C
"1"
>zadd r 300 D
"1"
>zrange r 0 -1 withscores
1) "A"
2) "100"
3) "B"
4) "200"
5) "C"
6) "200"
7) "D"
8) "300"
其中 B 和 C 的分数是一样的,这时我们可能想让先达到对应分数的人排在前面,相当于增加了一个排序维度。这时直接用 score 就不能实现了,需要做一些转换,这里分享几个我会用到的方法。
实现方法假设我们需要一个三个维度的排序,第一维度是具体的数值,第二个维度是一个是否标志位,第三个维度是时间戳。其中氪金的(0否1是)和时间早的排序靠前。
以下是原始数据:
1
2
3
4
5
6
玩家 成就 是否氪金 时间戳
A 100 1 1571819021259
B 200 0 1571819021259
C 200 1 1571819021259
D 400 0 1571819021259
E 200 1 1571810001259
1. 利用 key 排序如果后两个维度初始化后就不再变化的话,可以利用 Redis 的排序特性,将不变的维度写到 key 里,这样 score 相同时会用 key 来进行排序。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
# 这里反转时间戳(9999999999999 - 1571810001259),让时间早的排在前面
>zadd r 100 A-1-8428180978740
"1"
>zadd r 200 B-0-8428180978740
"1"
>zadd r 200 C-1-8428180978740
"1"
>zadd r 400 D-0-8428180978740
"1"
>zadd r 200 E-1-8428189998740
"1"
>zrange r 0 -1 withscores
1) "A-1-8428180978740"
2) "100"
3) "B-0-8428180978740"
4) "200"
5) "C-1-8428180978740"
6) "200"
7) "E-1-8428189998740"
8) "200"
9) "D-0-8428180978740"
10) "400"
以上就是第一种方法了,实现起来非常简单。
score 存储格式在介绍下两种方法前先来看看 zset 的 score 是怎么存储的,能保留多少精度,直接看跳表 node 的结构。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
* 跳跃表节点
*/
typedef struct zskiplistNode {
// 成员对象
robj * obj ;
// 分值
double score ;
// 后退指针
struct zskiplistNode * backward ;
// 层
struct zskiplistLevel {
// 前进指针
struct zskiplistNode * forward ;
// 跨度
unsigned int span ;
} level [];
} zskiplistNode ;
我们可以看到 score 是用 double 存储的,它是一个 IEEE 754 标准的 double 浮点数。(IEEE 754 标准的详细存储规则可以搜索其他文章,这里重点分析保留的精度)
一个 64 位浮点数存储时分为3段
S:符号位,第一段,占1位,0表示正数,1表示负数。 Exp:指数字,第二段,占11位,移码方式表示正负数,排除全0和全1表示零和无穷大的特殊情况,指数部分是−1022~+1023加上1023,指数值的大小从1~2046 Fraction:有效数字,占52位,定点数,小数点放在尾数最前面 实际数字 = (-1)^S * Fraction * 2^Exp (二进制计算)
double 存储的有效数据是第三段的52位,超出会损失精度,由于标准规定小数点是在有效数字最前面,所以实际可以存储 53 位数字。比如有个数字是 111___(53个1)
,有效数字位存储的是 .11___(52个1)
,计算时会在个位补1,得到1.11___(53个1)
,最大值为 2^53 = 9007199254740991
,大概 log2^53 = 15.95
位。
可以看到只要保证有效数字小于等于9007199254740991
就不会损失精度。
好了,现在有了理论基础,接下来看看我们可以怎么实现。
2. 二进制分段第二种方法,可以利用二进制 64 位 long 分段存储各个维度。
首先时间戳如果全存储就太长了,可以通过一些计算缩小一些,先忽略毫秒,然后和一个大数计算差值。
1
2
int MAX_SECOND = 1861891200 ; // 2029.1.1 0:0:0
int sts = ( MAX_SECOND - ( int )( ts / 1000 ))
这样时间就缩小到了300000000以内。
接下来进行划分,首位标志位不用,剩下63位,然后我划分高33位存分数,1位存标志位,最后29位存时间戳,存储结构是这样的:
如果要不损失精度,第一段可存储位数 = 53 - 1 - 29 = 23。
第一段最大值 = 2^33 = 8589934591,不损失精度 = 2^23 = 8388607。
第三段最大值 = 2^29 = 536870911。
使用时可以适当缩短第三段时间戳的长度,或者不追求时间戳一定精确的话,第一段分数可以超出不损失精度的长度,也只会损失一点时间戳的精度而已。 然后就可以用下面的方法计算 score 了,因为是二进制,可以全用位运算。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
int size1 = 33 ;
int size2 = 1 ;
int size3 = 29 ;
long d1Max = maxBySize ( size1 );
long d2Max = maxBySize ( size2 );
long d3Max = maxBySize ( size3 );
// 计算 score
public long genScore ( long d1 , long d2 , long d3 ) {
return (( d1 & d1Max ) << ( size2 + size3 )) | (( d2 & d2Max ) << size3 ) | ( d3 & d3Max );
}
// 计算增加 value 值
public void incValue ( long d1 ) {
return (( d1 & d1Max ) << ( size2 + size3 )) | (( 0 & d2Max ) << size3 ) | ( 0 & d3Max );
}
// 根据二进制长度计算最大值
private long maxBySize ( int len ) {
long r = 0 ;
while ( len -- > 0 ) {
r = (( r << 1 ) | 1 );
}
return r ;
}
增加高位分数的话直接把低维度设为0,计算出分数后调用 ZINCRBY 就行了。
改变低位维度值的话就不能调用 ZINCRBY
了,需要调用ZADD
,我们用自旋加 CAS 更新,防止覆盖掉新更新的值。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
/**
* 设置维度值
*
* @param key zset key
* @param value zset value
* @param d2 第二位
* @param d3 第三位
* @return
*/
public boolean setD ( String key , String value , Long d2 , Long d3 ) {
long start = System . currentTimeMillis ();
Long oldScore ;
long score ;
byte [] originBytes ;
do {
if (( System . currentTimeMillis () - start ) > 2000 ) {
return false ;
}
long d1 = 0 ;
originBytes = zscoreBytes ( key , value );
oldScore = convertZscore ( originBytes );
if ( originBytes != null ) {
// 根据 score 获取原始值
d1 = getD1ByScore ( oldScore );
}
// 如果不设置新值就获取原始值
if ( d2 == null ) {
// 根据 score 获取原始值
d2 = getD2ByScore ( oldScore );
}
// 如果不设置新值就获取原始值
if ( d3 == null ) {
// 根据 score 获取原始值
d3 = getD3ByScore ( oldScore );
}
// 生成新值
score = genScore ( d1 , d2 , d3 );
} while (! compareAndSetDimension ( key , value , originBytes , score ));
return true ;
}
Long SUCCESS = 1L ;
String script = "if (((not (redis.call('zscore', KEYS[1], ARGV[1]))) and (ARGV[2] == '')) " +
"or redis.call('zscore', KEYS[1], ARGV[1]) == ARGV[2]) " +
" then redis.call('zadd',KEYS[1],ARGV[3],ARGV[1]) return 1 else return 0 end" ;
// CAS 设置值
private boolean compareAndSetDimension ( String setKey , String key , byte [] oldScore , long newScore ) {
Long result = redisTemplate . execute (( RedisCallback < Long >) connection -> {
try {
return connection . eval ( script . getBytes (), ReturnType . fromJavaType ( Long . class ), 1 , setKey . getBytes (),
key . getBytes (), oldScore , om . writeValueAsBytes ( newScore ));
} catch ( JsonProcessingException e ) {
throw new RuntimeException ( e );
}
});
return SUCCESS . equals ( result );
}
// 获取原始值
public byte [] zscoreBytes ( String key , String value ) {
String script = "return redis.call('zscore', KEYS[1], ARGV[1])" ;
return redisTemplate . execute (( RedisCallback < byte []>) connection -> connection . eval ( script . getBytes (),
ReturnType . fromJavaType ( String . class ),
1 , key . getBytes (), value . getBytes ()));
}
这里初始 score 值获取使用的是 zscoreBytes()
而不是redisTemplate.opsForZSet().score()
是为了防止损失精度,这个方法返回的 score 值其实是可能损失精度的,我们看源码分析下:
直接进到最底层的执行方法 zscore
1
2
3
4
5
6
7
// class RedisCommandBuilder
Command < K , V , Double > zscore ( K key , V member ) {
notNullKey ( key );
return createCommand ( ZSCORE , new DoubleOutput <>( codec ), key , member );
}
DoubleOutput
将 Redis 的返回值转成 Double
1
2
3
4
5
6
7
8
9
10
11
12
13
// 将 Redis 返回值转成 Double
public class DoubleOutput < K , V > extends CommandOutput < K , V , Double > {
public DoubleOutput ( RedisCodec < K , V > codec ) {
super ( codec , null );
}
// 方法参数 bytes 就是 Redis 返回的原始值了,是个字符串
@Override
public void set ( ByteBuffer bytes ) {
output = ( bytes == null ) ? null : parseDouble ( decodeAscii ( bytes ));
}
}
Redis 原始的返回值其实是个类似 1.3094266712875436e+18
的字符串,DoubleOutput
在转换成 Double 的时候有可能会损失精度,用 BigDecimal
转的话是不会丢精度的。
1
2
String a = "1.3094266712875436e+18" ;
long value = new BigDecimal ( a ). longValue ();
所以要不损失精度的话,可以修改下原始方法重新打个包或者用 lua 取出来原始的字符串再转成自己需要的值。
最后利用 genScore
方法就可以生成score值并排序了
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# genScore(100, 1, 290072179)
>zadd r 108201125491 A
"1"
# genScore(200, 0, 290072179)
>zadd r 215038436979 B
"1"
# genScore(200, 1, 290072179)
>zadd r 215575307891 C
"1"
# genScore(400, 0, 290072179)
>zadd r 429786801779 D
"1"
# genScore(200, 1, 290081199)
>zadd r 215575316911 E
"1"
>zrange r 0 -1 withscores
1) "A"
2) "108201125491"
3) "B"
4) "215038436979"
5) "C"
6) "215575307891"
7) "E"
8) "215575316911"
9) "D"
10) "429786801779"
3. 直接拆分 double可以用二进制拆分当然也可以直接拆分十进制数,为了方便,还可以用小数划分维度,比如将分数放在整数位,标志位和时间戳放在小数位。
1
2
3
4
5
A 100.1290072179
B 200.0290072179
C 200.1290072179
D 400.0290072179
E 200.1290081199
不过这样不丢精度存储的分数比二进制拆分小。
原理和二进制拆分一样,就不赘述了。
总结数据量不大的情况下直接使用 RDB 就可以了,比较方便。用 Redis 的时候,一级以外的维度不变的情况下可以直接用 key 排序,比较简单,如果维度会更新,可以使用拆分二进制或十进制的方法存储,二进制的优点是存储的数比较大,而且可以用位运算,十进制的优点是计算简单,可读性比较好。各个维度的长度还可以做成配置项,这样就可以满足不同的业务需求了。