redis源码分析——8、RDB持久化
一、RDB触发命令
触发生成rdb的命令有save
和bgsave
两种,他们有一下特性
save
是阻塞式的,当执行save
时客户端发送的命令都会被拒绝。bgsave
是通过子进程执行的,执行bgsave
的时候仍然可以处理客户端请求,但是在执行bgsave
的时候不能再次执行save
和bgsave
。bgsave
用fork创建子进程,那么就存在COW机制,假如在极端情况下,执行bgsave
的同时所有键的value都被修改,那么此时系统的使用内存就会翻倍,很可能触发OOM,因此在运行redis的机器上至少需要保留一半的空闲内存。- 不管
save
还是bgsave
,他们保存的都是当前的数据T0(SNAPSHOTTING),假设生成rdb文件耗时为n,那么在rdb文件中不包含这n时间内的新增数据。
二、RDB相关配置
定时生成rdb,可以定时强制生成rdb,也可以按照更新key的频率来生成rdb。
save <seconds> <changes>
save 900 1 # 900s内至少改变了一个key
save 300 10 # 300s内至少改变了10个key
save 60 10000 # 60s内至少改变了10000个key
save 5 0 # 每5s保存一次
异常时禁写,假设磁盘问题导致生成rdb失败,此时可以开启禁止写命令来减防止数据丢失的可能
stop-writes-on-bgsave-error yes
字符串使用LZF压缩,可以对string压缩存储,生成的rdb文件更小,但是会消耗更多的cpu资源
rdbcompression yes
数据校验,在5以后在rdb文件末尾会加校验和,可以有效防止数据损坏,但是在生成和加载rdb的时候会有10%的性能损耗
rdbchecksum yes
rdb文件名
dbfilename dump.rdb
删除rdb,redis通过rdb来进行全量同步,如果没有开启持久化,在同步完成后是否需要删除rdb
rdb-del-sync-files no
存储路径
dir ./
三、触发流程源码分析
saveparam结构,saveparam对应这配置文件中的save选项
struct saveparam {
time_t seconds;
int changes;
};
rdb相关变量,在主结构redisServer中定义了rdb相关的变量
struct redisServer {
...
/* RDB persistence */
long long dirty; /* Changes to DB from the last save */
long long dirty_before_bgsave; /* Used to restore dirty on failed BGSAVE */
pid_t rdb_child_pid; /* PID of RDB saving child */
struct saveparam *saveparams; /* Save points array for RDB */
int saveparamslen; /* Number of saving points */
char *rdb_filename; /* Name of RDB file */
int rdb_compression; /* Use compression in RDB? */
int rdb_checksum; /* Use RDB checksum? */
int rdb_del_sync_files; /* Remove RDB files used only for SYNC if
the instance does not use persistence. */
time_t lastsave; /* Unix time of last successful save */
time_t lastbgsave_try; /* Unix time of last attempted bgsave */
time_t rdb_save_time_last; /* Time used by last RDB save run. */
time_t rdb_save_time_start; /* Current RDB save start time. */
int rdb_bgsave_scheduled; /* BGSAVE when possible if true. */
int rdb_child_type; /* Type of save by active child. */
int lastbgsave_status; /* C_OK or C_ERR */
int stop_writes_on_bgsave_err; /* Don't allow writes if can't BGSAVE */
...
}
定时触发,redis只有一个定时器serverCron,所有的定时任务都在这个定时器中执行
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
checkChildrenDone();
} else {
/* If there is not a background saving/rewrite in progress check if
* we have to save/rewrite now. */
for (j = 0; j < server.saveparamslen; j++) {
struct saveparam *sp = server.saveparams+j;
/* Save if we reached the given amount of changes,
* the given amount of seconds, and if the latest bgsave was
* successful or if, in case of an error, at least
* CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */
if (server.dirty >= sp->changes &&
server.unixtime-server.lastsave > sp->seconds &&
(server.unixtime-server.lastbgsave_try >
CONFIG_BGSAVE_RETRY_DELAY ||
server.lastbgsave_status == C_OK))
{
serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...",
sp->changes, (int)sp->seconds);
rdbSaveInfo rsi, *rsiptr;
rsiptr = rdbPopulateSaveInfo(&rsi);
rdbSaveBackground(server.rdb_filename,rsiptr);
break;
}
}
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON &&
!hasActiveChildProcess() &&
server.aof_rewrite_perc &&
server.aof_current_size > server.aof_rewrite_min_size)
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground();
}
}
...
}
保存rdb
int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) {
dictIterator *di = NULL;
dictEntry *de;
char magic[10];
int j;
uint64_t cksum;
size_t processed = 0;
if (server.rdb_checksum)
rdb->update_cksum = rioGenericUpdateChecksum;
snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION);
if (rdbWriteRaw(rdb,magic,9) == -1) goto werr;
if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr;
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr;
for (j = 0; j < server.dbnum; j++) {
redisDb *db = server.db+j;
dict *d = db->dict;
if (dictSize(d) == 0) continue;
di = dictGetSafeIterator(d);
/* Write the SELECT DB opcode */
if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr;
if (rdbSaveLen(rdb,j) == -1) goto werr;
/* Write the RESIZE DB opcode. */
uint64_t db_size, expires_size;
db_size = dictSize(db->dict);
expires_size = dictSize(db->expires);
if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr;
if (rdbSaveLen(rdb,db_size) == -1) goto werr;
if (rdbSaveLen(rdb,expires_size) == -1) goto werr;
/* Iterate this DB writing every entry */
while((de = dictNext(di)) != NULL) {
sds keystr = dictGetKey(de);
robj key, *o = dictGetVal(de);
long long expire;
initStaticStringObject(key,keystr);
expire = getExpire(db,&key);
if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr;
/* When this RDB is produced as part of an AOF rewrite, move
* accumulated diff from parent to child while rewriting in
* order to have a smaller final write. */
if (rdbflags & RDBFLAGS_AOF_PREAMBLE &&
rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES)
{
processed = rdb->processed_bytes;
aofReadDiffFromParent();
}
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
/* If we are storing the replication information on disk, persist
* the script cache as well: on successful PSYNC after a restart, we need
* to be able to process any EVALSHA inside the replication backlog the
* master will send us. */
if (rsi && dictSize(server.lua_scripts)) {
di = dictGetIterator(server.lua_scripts);
while((de = dictNext(di)) != NULL) {
robj *body = dictGetVal(de);
if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1)
goto werr;
}
dictReleaseIterator(di);
di = NULL; /* So that we don't release it again on error. */
}
if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr;
/* EOF opcode */
if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr;
/* CRC64 checksum. It will be zero if checksum computation is disabled, the
* loading code skips the check in this case. */
cksum = rdb->cksum;
memrev64ifbe(&cksum);
if (rioWrite(rdb,&cksum,8) == 0) goto werr;
return C_OK;
werr:
if (error) *error = errno;
if (di) dictReleaseIterator(di);
return C_ERR;
}
四、RDB文件结构
每个版本的RDB文件布局略有不同,好在是向下兼容的,以6为例,文件布局如下:
其中开头5字节的"REDIS"是固定的,表示是redis rdb文件。
其后的4字节用来表示RDB_VERSION,RDB的版本号,不是redis的版本号
INFO_AUX是一段数组key-value,用来记录一些环境,比如生成rdb的时间、机器cpu的位数、使用内存等,加载rdb的服务可以对比来判断是否需要架子啊rdb。
MODULE_AUX也是一段数组key-value,用来记录模块信息。
常见的辅助字段如下
在数据块,都有一字节的opcode来表示字段的含义,比如opcode=254表示接下来是数据库编号;opcode=251表示键空间;opcode=252表示过期时间。
KEY_VALUE是具体的键值对,但是redis有多种数据结构,所以在每一个KEY_VALUE开头首先都是一个结构类型字段,紧接着是key,再是value。
常见的opcode定义如下:
#define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
#define RDB_OPCODE_IDLE 248 /* LRU idle time. */
#define RDB_OPCODE_FREQ 249 /* LFU frequency. */
#define RDB_OPCODE_AUX 250 /* RDB aux field. */
#define RDB_OPCODE_RESIZEDB 251 /* Hash table resize hint. */
#define RDB_OPCODE_EXPIRETIME_MS 252 /* Expire time in milliseconds. */
#define RDB_OPCODE_EXPIRETIME 253 /* Old expire time in seconds. */
#define RDB_OPCODE_SELECTDB 254 /* DB number of the following keys. */
#define RDB_OPCODE_EOF 255 /* End of the RDB file. */
键值对结构如下:
所以rdb文件如下:
我们知道,redis不仅有多种数据结构,而且每种结构有多种编码,所在保存类型的时候需要考虑类型和编码两个维度。
五、数据编码
/* Save a key-value pair, with expire time, type, key, value.
* On error -1 is returned.
* On success if the key was actually saved 1 is returned, otherwise 0
* is returned (the key was already expired). */
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) {
int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU;
int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU;
/* Save the expire time */
if (expiretime != -1) {
if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1;
if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1;
}
/* Save the LRU info. */
if (savelru) {
uint64_t idletime = estimateObjectIdleTime(val);
idletime /= 1000; /* Using seconds is enough and requires less space.*/
if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1;
if (rdbSaveLen(rdb,idletime) == -1) return -1;
}
/* Save the LFU info. */
if (savelfu) {
uint8_t buf[1];
buf[0] = LFUDecrAndReturn(val);
/* We can encode this in exactly two bytes: the opcode and an 8
* bit counter, since the frequency is logarithmic with a 0-255 range.
* Note that we do not store the halving time because to reset it
* a single time when loading does not affect the frequency much. */
if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
}
/* Save type, key, value */
if (rdbSaveObjectType(rdb,val) == -1) return -1; /* 保存类型 */
if (rdbSaveStringObject(rdb,key) == -1) return -1; /* 保存key */
if (rdbSaveObject(rdb,val,key) == -1) return -1; /* 保存value */
/* Delay return if required (for testing) */
if (server.rdb_key_save_delay)
usleep(server.rdb_key_save_delay);
return 1;
}
保存类型
int rdbSaveObjectType(rio *rdb, robj *o) {
switch (o->type) {
case OBJ_STRING:
return rdbSaveType(rdb,RDB_TYPE_STRING);
case OBJ_LIST:
if (o->encoding == OBJ_ENCODING_QUICKLIST)
return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST);
else
serverPanic("Unknown list encoding");
case OBJ_SET:
if (o->encoding == OBJ_ENCODING_INTSET)
return rdbSaveType(rdb,RDB_TYPE_SET_INTSET);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_SET);
else
serverPanic("Unknown set encoding");
case OBJ_ZSET:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_SKIPLIST)
return rdbSaveType(rdb,RDB_TYPE_ZSET_2);
else
serverPanic("Unknown sorted set encoding");
case OBJ_HASH:
if (o->encoding == OBJ_ENCODING_ZIPLIST)
return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST);
else if (o->encoding == OBJ_ENCODING_HT)
return rdbSaveType(rdb,RDB_TYPE_HASH);
else
serverPanic("Unknown hash encoding");
case OBJ_STREAM:
return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS);
case OBJ_MODULE:
return rdbSaveType(rdb,RDB_TYPE_MODULE_2);
default:
serverPanic("Unknown object type");
}
return -1; /* avoid warning */
}
这个函数把OBJ_xx转换成了RDB_TYPE,那么在解析rdb的时候需要逆向操作,具体的映射如下:
保存key
key是以字符串的形式保存,其包含两个字段,length和string
其中length是不定长编码,用高2位区分编码规则,规则如下:
- 00xxxxxx—— 高两位为0,则后6位表示长度,长度<64
- 01xxxxxx xxxxxxxx——高两位是01,后14位表示长度,长度<16384
- 10000000 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx——高两位是10,忽略当前字节,后4字节表示长度,长度<UINT32_MAX
- 10000001 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx——高两位是10,且最后一位是1,忽略当前字节,后8字节表示长度,长度<UIN64_MAX
在rdb对字符串还有进一步的压缩——尝试用整数表示
其中也是用type的高2位固定是11,用低2位表示不同编码
- 11000000 xxxxxxxx——低两位是00,后1字节表示UINT8整数
- 11000001 xxxxxxxx xxxxxxxx——低2位是01,后2字节表示UINT16整数
- 11000010 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx——低两位是10,后4字节表示UINT32
由此可见,在解析的时候先判断高两位,如果是11,则说明是整数编码,否则是字符串编码
/* Saves an encoded length. The first two bits in the first byte are used to
* hold the encoding type. See the RDB_* definitions for more information
* on the types of encoding. */
int rdbSaveLen(rio *rdb, uint64_t len) {
unsigned char buf[2];
size_t nwritten;
if (len < (1<<6)) { /* 长度<64, 用后6位表示 */
/* Save a 6 bit len */
buf[0] = (len&0xFF)|(RDB_6BITLEN<<6);
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
nwritten = 1;
} else if (len < (1<<14)) { /* 长度<16384, 当前字节的6位+后一字节,共14位表示*/
/* Save a 14 bit len */
buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6);
buf[1] = len&0xFF;
if (rdbWriteRaw(rdb,buf,2) == -1) return -1;
nwritten = 2;
} else if (len <= UINT32_MAX) { /* 跳过当前字节,后4字节表示*/
/* Save a 32 bit len */
buf[0] = RDB_32BITLEN;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
uint32_t len32 = htonl(len);
if (rdbWriteRaw(rdb,&len32,4) == -1) return -1;
nwritten = 1+4;
} else {
/* Save a 64 bit len */
buf[0] = RDB_64BITLEN;
if (rdbWriteRaw(rdb,buf,1) == -1) return -1;
len = htonu64(len);
if (rdbWriteRaw(rdb,&len,8) == -1) return -1;
nwritten = 1+8;
}
return nwritten;
}
ssize_t rdbSaveStringObject(rio *rdb, robj *obj) {
/* Avoid to decode the object, then encode it again, if the
* object is already integer encoded. */
if (obj->encoding == OBJ_ENCODING_INT) {
return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr);
} else {
serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj));
return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr));
}
}
ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) {
int enclen;
ssize_t n, nwritten = 0;
/* Try integer encoding */
if (len <= 11) {
unsigned char buf[5];
if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) {
if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1;
return enclen;
}
}
/* Try LZF compression - under 20 bytes it's unable to compress even
* aaaaaaaaaaaaaaaaaa so skip it */
if (server.rdb_compression && len > 20) {
n = rdbSaveLzfStringObject(rdb,s,len);
if (n == -1) return -1;
if (n > 0) return n;
/* Return value of 0 means data can't be compressed, save the old way */
}
/* Store verbatim(逐字) */
if ((n = rdbSaveLen(rdb,len)) == -1) return -1;
nwritten += n;
if (len > 0) {
if (rdbWriteRaw(rdb,s,len) == -1) return -1;
nwritten += len;
}
return nwritten;
}
保存value
ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) {
ssize_t n = 0, nwritten = 0;
if (o->type == OBJ_STRING) {
/* Save a string value */
if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1;
nwritten += n;
} else if (o->type == OBJ_LIST) {
/* Save a list value */
if (o->encoding == OBJ_ENCODING_QUICKLIST) {
quicklist *ql = o->ptr;
quicklistNode *node = ql->head;
if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1;
nwritten += n;
while(node) {
if (quicklistNodeIsCompressed(node)) {
void *data;
size_t compress_len = quicklistGetLzf(node, &data);
if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1;
nwritten += n;
} else {
if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1;
nwritten += n;
}
node = node->next;
}
} else {
serverPanic("Unknown list encoding");
}
}
/////////////// 后面还有很多其他类型的 /////////////////////////
}
上一篇: Redis的RESP协议
下一篇: MD5、对称加密、非对称加密