这一年以来,写了太多的业务代码。是时候要总结一下自己的积累了。本文是redis深度历险的读书笔记,做个记录以及分享给大家。
docker
redis
数据结构
字符串
字符串是一个字符数组
常见用途就是信息JSON序列化成为字符串之后,存入redis,取信息会经过一次反序列化
字符串是动态字符串,可以进行扩容;当字符串长度小于1m的时候,扩容是加倍。当大于1m,每次扩容就增加1m。字符串最大的长度为512m
使用字符串计数的时候,单个值的最大值为signed long,超过就会报错
-
常用命令
set name a get name exists a del a mset a 1 b 2 mget a b expire a 5 setex a 5 1 setnx a 1 incr a incrby a 10
列表
-
相当于java中的LinkedList,一个双向链表。插入和删除操作快,但是索引定位很慢。
节点删除操作 p.pre.next = p.next p.next.pre = p.pre p = null
-
列表可以作为队列,又进左出即可
rpush books java python golang kotlin 4 llen books 4 lpop books java
-
列表可以作为栈,右进右出即可。就是,调整了队列使用的方向
rpush books java python golang kotlin 4 llen books 4 rpop books kotlin
-
使用队列会存在的慢操作有get(index)方法,需要遍历整个链表,性能为O(n)
rpush books java python golang kotlin 4 lindex 1 # 时间为O(n) lindex books 1 "python" lrange books 0 -1 "java" "python" "golang" "kotlin" ltrim books 1 -1 # 设置长度为3的数组,区间外的全部被清除 OK lrange books 0 -1 # 获取所有元素 "python" "golang" "kotlin" llen books 3 ltrim books 1 0 # 删除所有元素 OK
快速列表:在列表元素较少的时候,会使用一块连续的内存,叫ziplist(压缩列表)。当数据量较多的时候,就要变成quicklist(快速列表),多个ziplist使用双向指针串起来使用。
// 快速列表
struct quicklist {
quicklistNode* head;
quicklistNode* tail;
long count; // 元素总数
int nodes; // ziplist 节点的个数
int compressDepth; // LZF 算法压缩深度
...
}
// 快速列表节点
struct quicklistNode {
quicklistNode* prev;
quicklistNode* next;
ziplist* zl; // 指向压缩列表
int32 size; // ziplist 的字节总数
int16 count; // ziplist 中的元素数量
int2 encoding; // 存储形式 2bit,原生字节数组还是 LZF 压缩存储
}
struct ziplist_compressed {
int32 size;
byte[] compressed_data;
}
struct ziplist {
...
}
Hash字典
内部实现结构就是数组+链表的方式进行实现。相同hash值的元素使用链表串联起来。java的链表数量超过6个会变成红黑树,不知道redis的会不会有这个查询优化。
redis里面的值就只能是字符串,便于比较。不像java的hashmap可以存储对象,equals方法的编写可谓五花八门
-
java的hashmap在进行rehash的时候,需要一次性进行rehash,是个耗时操作,多线程还会出现环。redis的是渐进式rehash。rehash的时候,会保留两个hash结构,查询时候会查询两个hash结构,在后续的定时任务和hash指令操作的时候,渐进地将旧的内容迁移到新的hash结构中。当内容迁移完毕,删除旧的hash结构
hset books java "think in java" 1 hset books golang "concurrency in go" 1 hset books python "python cookbook" 1 hgetall books # 返回所有entryies,key和value间隔返回 "java" "think in java" "golang" "concurrency in go" "python" "python cookbook" hlen books 3 hget books java "think in java" hset books java "my java book" 0 hmset books java "" golang "" ok hset user-lixinkun age 25 1 hincrby user-lixinkun 1 26
set
-
相当于java的hashset,确保唯一性
sadd books java 1 sadd books java 0 sadd books python golang smembers books java golang python #顺序可能被打乱
Zset 有序列表
- 类似于java的sortedset和hashmap的结合体。具备set的唯一性原理,也可以给每个值赋一个score,代表它的排序权重。它的内部实现是一个跳跃列表。貌似java也有这个数据结构
*内部排序通过skiplist实现。zset要支持睡觉的插入和删除,不能使用数组来表示。普通的链表数据结构,有新元素插入的时候,需要进行定位,这样才能保证是有序的。通常会通过二分查找来确定位置,但是只有数组才支持快速查找。这时候就需要一个多层级的列表结构,最下面一层的元素都是串联起来的。某个元素存在于所有的层。定位插入点的时候,先在顶层进行定位,然后下潜到下一层,然后一直到最底下一层。redia采用随机策略来决定元素可以兼职到第几层。L0层的概率是100%,L1的是50%。这样去类推
-
数据结构,具体介绍链接redis跳跃列表介绍
/* ZSETs use a specialized version of Skiplists */ typedef struct zskiplistNode { //成员对象 robj *obj; //分值 double score; //后退指针 struct zskiplistNode *backward; //层 struct zskiplistLevel { struct zskiplistNode *forward;//前进指针 unsigned int span;//跨度 } level[]; } zskiplistNode; typedef struct zskiplist { //表头节点和表尾节点 struct zskiplistNode *header, *tail; //表中节点的的数量 unsigned long length; //表中层数最大的节点层数 int level; } zskiplist;
容器的通用规则
create if not exists.如果容器不存在,则先创建容器。
drop if no elements.如果容器没有元素了,就立即删除该容器,释放内存。
容器的过期时间是针对整个对象的,hash的过期是整个进行过期,而不是单个元素
分布式锁
对于不是原子性的操作,就需要通过分布式锁来保证只有一个线程可以获取到锁。具体使用的是setnx和del命令。锁处理完了以后,就要被释放。中间需要解决的问题就是,如果业务逻辑执行之间,出现了异常,可能导致del无法执行,整个业务瘫痪。
或许,可以加一个过期时间,5s以后自动释放。但是逻辑还有问题的,如果expire的时候,命令丢失了,也会死锁。出现这种问题的原因就是setnx和expire两个命令不是原子性的。
在redis2.8的版本中,作者加入了set指令的扩展餐厨,setnx和expire指令可以一起执行了,两个指令是原子性的。
set lock:book true ex 5 nx
del lock:book
超时问题是redis分布式锁不能解决的问题,如果加锁和释放锁之间的逻辑太长,超出了锁的超时时限,那么第二个线程可能获得redis分布式锁。所以,redis分布式锁不要用于较长时间的任务
可重入性。java里面有一个ReentrantLock。Redis分布式锁如果要支持重入,需要对客户端的set方法进行封装,使用线程的ThreadLocal遍历存储当前持有锁的计数。
这个是redis分布式锁的使用和错误案例介绍redis分布式锁使用与错误示例
//错误使用就是使用setnx和expire两个指令
/**
* 尝试获取分布式锁
* @param jedis Redis客户端
* @param lockKey 锁
* @param requestId 请求标识
* @param expireTime 超期时间
* @return 是否获取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
- java版本的可重入锁,核心就是用threadlocal去记录每个线程,某个key的锁计数
public class RedisWithReentrantLock{
//线程锁计数器8
private ThreadLocal<Map<String,Integer>> lockers = new ThreadLocal<>();
private Jedis jedis;
//内部使用的lock
private boolean _lock(String key) {
return jedis.set(key, "", "nx", "ex", 5L) != null;
}
//内部使用的unlock
private void _unlock(String key) {
jedis.del(key);
}
//获取计数器
private Map<String, Integer> currentLockers() {
Map<String, Integer> refs = lockers.get();
if (refs != null) {
return refs;
}
lockers.set(new HashMap<>());
return lockers.get();
}
//可重入加锁
public boolean lock(String key) {
Map<String, Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if (refCnt != null) {
refs.put(key, refCnt + 1);
return true;
}
boolean ok = _lock(key);
if (!ok) {
return false;
}
refs.put(key, 1);
return true;
}
//可重入释放锁
public boolean unlock(String key) {
Map<String, Integer> refs = currentLockers();
Integer refCnt = refs.get(key);
if (refCnt == null) {
return false;
}
refCnt = -1;
if (refCnt > 0) {
refs.put(key, refCnt);
} else {
refs.remove(key);
_unlock(key);
}
return true;
}
}
redis的消息队列,只做特性介绍,实际使用我们目前用rocketmq
使用简单,只有一个消费者
没有ack保证,如果对消息可靠性有极高的要求,推荐使用rocketmq的事务消息
使用list作为异步消息的发送结构,rpush,rpop
延时队列的实现
- 延时队列可以使用zset实现,将消息序列化成为一个字符串,消息的到期时间为score。然后使用多个线程去轮询这个队列。使用多线程是为了保障可用性。一个线程挂了,还有其他线程可以进行处理
位图,节约存储空间
平时进行开发的时候,要对一些bool类型的数据进行存取,如用户一年的签到记录。如果使用key/value方式进行存取,就要记录365个,当用户量上亿的时候,需要的存储结构是非常惊人的。
redis提供了位图数据结构,每个用户每天的签到就只占据一个位。365天大概需要46个字节,大大节约空间
位图就是byte数组,可以使用getbit和setbit进行存储,如果字节是不可以打印字符,redis会展示其16进制。实际来操作一波
零存整取的方式,一个个bit去设置,然后一次性拿出值
通过python获取字母`h`的hash值为0b1101000
setbit s 1 1
setbit s 2 1
setbit s 4 1
get s
"h" # 这样就得到了h的值
零存零取的方式,就是对单个bit进行操作,没有设置的位就是默认为0
set bit w 1 1
get bit w 0
0
整存零取
set w h
getbit w 1
1
不可打印字符
setbit x 0 1
setbit x 1 1
get x
"\xc0"
-
位图的统计和查找:redis提供了对位图的基本统计算
set w hello ok bitcount w 21 bitcount w 0 0 3 bitcount w 0 1 # 统计1的数量 7 bitpos w 0 # 第一个0 0 bitpos w 1 1 1 # 第二个字符算起,第一个1位 魔术指令bitfield bitfield w get u4 0
高级数据结构
HyberLogLog,提供不精确的统计数据
页面的uv,不能使用一个key的incr,因为要进行去重
使用zset也可以做到这个,但是消耗空间太大
HyperLogLog的标准误差是0.81%。对于不精确的统计,可以使用这个数据结构
提供pdadd,pfcount两个指令,pf是Philippe Flajolet教授的名字缩写,这个数据结构是他发明的。这个数据结构占用的内存是12k,使用了2^14个桶去进行计算
pfadd book java
1
pfadd book java
0
pfadd book python
1
pfadd book python
pfcount book
2
布隆过滤器Bloom Filter
这个数据结构主要实现去重的功能
场景使用:广告推送的时候,如果我们看过了一条广告,下一条推送的时候,要对看过的广告进行去重。如果使用关系型数据库,每次推送都要去数据库查询,这样数据库很难抗住这个压力。况且,浏览记录会随着时间线性增长,空间也跟不上
布隆过滤器会有一定的误判概率。这可以比喻成为一个不精确的set结构,调用contains的时候,有可能误判。如果说这个值不存在,那就肯定不存在。如果说这个值存在,优肯是误判,可能,两者长得相像?这样的话,对于实际业务也没有影响,我们可以保证推送给用户的都是最新的内容,只有极少数用户没看过的内容会被误判。经过测试,误判率大概在1%左右。调整参数可以优化这个误判率
布隆过滤器作为redis的一个插件,需要版本为4.0以后,才能使用。
基本指令为bf.add,bf.exists.bf.madd(批量),安装docker,然后去重新安装redis,带这个插件的版本。centos7安装docker方法,安装好以后再安装插件,使用docker启动redis
用户可以通过bf.reserve命令显式制定错误率,key和初始空间。错误率越低,占用空间越大。初始值越大,空间越大,所以,在使用的时候,需要准确预估一下元素的大小。
布隆过滤器是一个大型的位数组。向布隆过滤器添加key的时候,会使用多个hash函数对key进行hash,算得一个整数索引。然后对位数组的长度取模运算得到一个位置,每个hash函数都得到一个不同的位置,再把这几个位置都设置为1,完成了add操作。巧妙地共享可多个key的存储空间。
为何会出现误判呢?就是由于判断key是否存在的时候,如果某个位是0,那就一定不存在。但是,如果各个位置都是1的话,也不能说明是存在的,有可能是其他key存在导致。如果位数组比较稀疏,出现误判的概率就低了。所以,使用的时候,不要让实际的元素大于初始值。如果大于的时候,要对布隆过滤器进行重建。
set存储的是所有的元素,布隆过滤器是存储元素的指纹(hash)。空间节省还是比较明显的.
docker加载插件的命令,docker run -p 6379:6379 --name redis-redisbloom redislabs/rebloom:latest
docker exec -it redis-redisbloom bash
bf.add book java
1
bf.add book java #再次加入java,就被排出了
0
bf.exists book python
0
redis限流
- 后端屏蔽用户的过于频繁的请求,比如1秒内不能请求超过5次。这个限流存在一个滑动时间窗口,zset的score圈出一个时间窗口,我们保留这个时间窗口,对于窗口外面的数据,则可以统统砍掉。zset的value可以为毫秒时间戳。
public class SimpleRateLimiter{
private Jedis jedis;
//用户请求是否允许
//使用乐观锁
//value保证唯一性就好
public boolean isActionAllow(String userId, String actionKey, int period, int maxCount) {
String key = String.format("hist:%s:%s", userId, actionKey);
long now = System.currentTimeMillis();
Pipeline pipe = jedis.pipelined();
pipe.multi();
pipe.zadd(key, now, "", now);
//截取
pipe.zremrangeByScore(key, 0, now - period * 1000);
Response<Long> count = pipe.zcard(key);
pipe.expire(key, period + 1);
pipe.exec();
pipe.close();
return count.get() <= maxCount;
}
}
- 漏斗限流,灵感来源于漏斗,就是漏斗有一个容量,也有一个消耗速率。当灌水速率小于漏水速率,漏斗超过容量以后。灌水行为就要丢弃或者暂停
//单机版的漏斗限流
public class FunnelRateLimiter{
class Funnel{
//容量
int capacity;
//漏水速率
float leakingRate;
//剩余空间
int leftQuota;
//上次触发时间
long leakingTs;
}
//....省略构造方法
void makeSpace() {
long now = System.currentTimeMillis();
long deltaTs = nowTs - leakingTs;
int deltaQuota = (int)(deltaTs * leakingRate);
//如果时间间隔太久,可能会导致int溢出
if (deltaQuota < 0) {
//重设
this.leftQuota = capacity;
this.leakingTs = 0;
return;
}
if (deltaQuota < 1) {
return;
}
this.leftQuota += leftQuota;
this.leakingTs = nowTs;
if (this.leftQuota > this.capacity) {
this.leftQuota = this.capacity;
}
//是否可以接收请求
boolean watering(int quota) {
makeSpace();
if (leftQuota >= quota) {
this.leftQuota -= quota;
return true;
}
return false;
}
}
private Map<String, Funnel> funnels = new HashMap<>();
public boolean isActionAllow(String userId, String actionKey, int capacity, fload leakingRate) {
String key = String.format("funnels:%s:%s", userId, actionKey);
Funnel funnel = funnels.get(key);
if (funnel == null) {
funnel = new Funnel(capacity, leakingRate);
funnels.put(key, funnel);
}
return funnel.watering(1);
}
}
- 这里是redis提供的分布式限流,redis4.0提供了redis-cell。这是一个差价,需要去github下载redis-cell的源码解压以后。进入redis-cli后,执行redis的 module load命令即可加载
wget https://github.com/brandur/redis-cell/releases/download/v0.2.4/redis-cell-v0.2.4-x86_64-unknown-linux-gnu.tar.gz
tar -xzvf redis-cell-v0.2.4-x86_64-unknown-linux-gnu.tar.gz
解压后得到libredis_cess.so
redis-cli
module load /data/libredis_cell.so
我这里是放到了data目录下面,运行redis后加载就行了
//各个参数 key 容量 计算漏斗速率 可选参数
cl.throttle <key> <max_burst> <count per period> <period> [<quantity>]
cl.throttle book 15 30 60 1 # 这个表示容量为15,2秒一个漏水速率(30/60)
0 # 0 允许 -1 拒绝
16 # 漏斗容量
15 # 剩余空间
-1 # 当第一个参数为-1师,这里的值表示需要经过多少时间以后才能再试
2 # 多久时间以后,漏斗可以完全空出来
有疑问加站长微信联系(非本文作者)