您现在的位置是: 网站首页 > 程序设计  > redis 

redis源码分析——8、RDB持久化

2021年12月15日 00:08 2939人围观

简介redis是纯内存数据库,一旦重启数据就会丢失,这里先介绍redis的第一种持久化方式——RDB

一、RDB触发命令

触发生成rdb的命令有savebgsave两种,他们有一下特性

  • save是阻塞式的,当执行save时客户端发送的命令都会被拒绝。
  • bgsave是通过子进程执行的,执行bgsave的时候仍然可以处理客户端请求,但是在执行bgsave的时候不能再次执行savebgsave
  • bgsavefork创建子进程,那么就存在COW机制,假如在极端情况下,执行bgsave的同时所有键的value都被修改,那么此时系统的使用内存就会翻倍,很可能触发OOM,因此在运行redis的机器上至少需要保留一半的空闲内存。
  • 不管save还是bgsave,他们保存的都是当前的数据T0(SNAPSHOTTING),假设生成rdb文件耗时为n,那么在rdb文件中不包含这n时间内的新增数据。

二、RDB相关配置

定时生成rdb,可以定时强制生成rdb,也可以按照更新key的频率来生成rdb。

save <seconds> <changes> 

save 900 1   # 900s内至少改变了一个key 
save 300 10     # 300s内至少改变了10个key 
save 60 10000   # 60s内至少改变了10000个key 
save 5 0    # 每5s保存一次 

异常时禁写,假设磁盘问题导致生成rdb失败,此时可以开启禁止写命令来减防止数据丢失的可能

stop-writes-on-bgsave-error yes 

字符串使用LZF压缩,可以对string压缩存储,生成的rdb文件更小,但是会消耗更多的cpu资源

rdbcompression yes 

数据校验,在5以后在rdb文件末尾会加校验和,可以有效防止数据损坏,但是在生成和加载rdb的时候会有10%的性能损耗

rdbchecksum yes 

rdb文件名

dbfilename dump.rdb 

删除rdb,redis通过rdb来进行全量同步,如果没有开启持久化,在同步完成后是否需要删除rdb

rdb-del-sync-files no 

存储路径

dir ./ 

三、触发流程源码分析

saveparam结构,saveparam对应这配置文件中的save选项

struct saveparam { 
    time_t seconds; 
    int changes; 
}; 

rdb相关变量,在主结构redisServer中定义了rdb相关的变量

struct redisServer { 
    ... 
    /* RDB persistence */ 
    long long dirty;                /* Changes to DB from the last save */ 
    long long dirty_before_bgsave;  /* Used to restore dirty on failed BGSAVE */ 
    pid_t rdb_child_pid;            /* PID of RDB saving child */ 
    struct saveparam *saveparams;   /* Save points array for RDB */ 
    int saveparamslen;              /* Number of saving points */ 
    char *rdb_filename;             /* Name of RDB file */ 
    int rdb_compression;            /* Use compression in RDB? */ 
    int rdb_checksum;               /* Use RDB checksum? */ 
    int rdb_del_sync_files;         /* Remove RDB files used only for SYNC if 
                                       the instance does not use persistence. */ 
    time_t lastsave;                /* Unix time of last successful save */ 
    time_t lastbgsave_try;          /* Unix time of last attempted bgsave */ 
    time_t rdb_save_time_last;      /* Time used by last RDB save run. */ 
    time_t rdb_save_time_start;     /* Current RDB save start time. */ 
    int rdb_bgsave_scheduled;       /* BGSAVE when possible if true. */ 
    int rdb_child_type;             /* Type of save by active child. */ 
    int lastbgsave_status;          /* C_OK or C_ERR */ 
    int stop_writes_on_bgsave_err;  /* Don't allow writes if can't BGSAVE */ 
    ... 
} 

定时触发,redis只有一个定时器serverCron,所有的定时任务都在这个定时器中执行

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 
    ... 
    /* Check if a background saving or AOF rewrite in progress terminated. */ 
    if (hasActiveChildProcess() || ldbPendingChildren()) 
    { 
        checkChildrenDone(); 
    } else { 
        /* If there is not a background saving/rewrite in progress check if 
         * we have to save/rewrite now. */ 
        for (j = 0; j < server.saveparamslen; j++) { 
            struct saveparam *sp = server.saveparams+j; 

            /* Save if we reached the given amount of changes, 
             * the given amount of seconds, and if the latest bgsave was 
             * successful or if, in case of an error, at least 
             * CONFIG_BGSAVE_RETRY_DELAY seconds already elapsed. */ 
            if (server.dirty >= sp->changes && 
                server.unixtime-server.lastsave > sp->seconds && 
                (server.unixtime-server.lastbgsave_try > 
                 CONFIG_BGSAVE_RETRY_DELAY || 
                 server.lastbgsave_status == C_OK)) 
            { 
                serverLog(LL_NOTICE,"%d changes in %d seconds. Saving...", 
                    sp->changes, (int)sp->seconds); 
                rdbSaveInfo rsi, *rsiptr; 
                rsiptr = rdbPopulateSaveInfo(&rsi); 
                rdbSaveBackground(server.rdb_filename,rsiptr); 
                break; 
            } 
        } 
        /* Trigger an AOF rewrite if needed. */ 
        if (server.aof_state == AOF_ON && 
            !hasActiveChildProcess() && 
            server.aof_rewrite_perc && 
            server.aof_current_size > server.aof_rewrite_min_size) 
        { 
            long long base = server.aof_rewrite_base_size ? 
                server.aof_rewrite_base_size : 1; 
            long long growth = (server.aof_current_size*100/base) - 100; 
            if (growth >= server.aof_rewrite_perc) { 
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); 
                rewriteAppendOnlyFileBackground(); 
            } 
        } 
    ... 
} 

保存rdb

int rdbSaveRio(rio *rdb, int *error, int rdbflags, rdbSaveInfo *rsi) { 
    dictIterator *di = NULL; 
    dictEntry *de; 
    char magic[10]; 
    int j; 
    uint64_t cksum; 
    size_t processed = 0; 

    if (server.rdb_checksum) 
        rdb->update_cksum = rioGenericUpdateChecksum; 
    snprintf(magic,sizeof(magic),"REDIS%04d",RDB_VERSION); 
    if (rdbWriteRaw(rdb,magic,9) == -1) goto werr; 
    if (rdbSaveInfoAuxFields(rdb,rdbflags,rsi) == -1) goto werr; 
    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_BEFORE_RDB) == -1) goto werr; 

    for (j = 0; j < server.dbnum; j++) { 
        redisDb *db = server.db+j; 
        dict *d = db->dict; 
        if (dictSize(d) == 0) continue; 
        di = dictGetSafeIterator(d); 

        /* Write the SELECT DB opcode */ 
        if (rdbSaveType(rdb,RDB_OPCODE_SELECTDB) == -1) goto werr; 
        if (rdbSaveLen(rdb,j) == -1) goto werr; 

        /* Write the RESIZE DB opcode. */ 
        uint64_t db_size, expires_size; 
        db_size = dictSize(db->dict); 
        expires_size = dictSize(db->expires); 
        if (rdbSaveType(rdb,RDB_OPCODE_RESIZEDB) == -1) goto werr; 
        if (rdbSaveLen(rdb,db_size) == -1) goto werr; 
        if (rdbSaveLen(rdb,expires_size) == -1) goto werr; 

        /* Iterate this DB writing every entry */ 
        while((de = dictNext(di)) != NULL) { 
            sds keystr = dictGetKey(de); 
            robj key, *o = dictGetVal(de); 
            long long expire; 

            initStaticStringObject(key,keystr); 
            expire = getExpire(db,&key); 
            if (rdbSaveKeyValuePair(rdb,&key,o,expire) == -1) goto werr; 

            /* When this RDB is produced as part of an AOF rewrite, move 
             * accumulated diff from parent to child while rewriting in 
             * order to have a smaller final write. */ 
            if (rdbflags & RDBFLAGS_AOF_PREAMBLE && 
                rdb->processed_bytes > processed+AOF_READ_DIFF_INTERVAL_BYTES) 
            { 
                processed = rdb->processed_bytes; 
                aofReadDiffFromParent(); 
            } 
        } 
        dictReleaseIterator(di); 
        di = NULL; /* So that we don't release it again on error. */ 
    } 

    /* If we are storing the replication information on disk, persist 
     * the script cache as well: on successful PSYNC after a restart, we need 
     * to be able to process any EVALSHA inside the replication backlog the 
     * master will send us. */ 
    if (rsi && dictSize(server.lua_scripts)) { 
        di = dictGetIterator(server.lua_scripts); 
        while((de = dictNext(di)) != NULL) { 
            robj *body = dictGetVal(de); 
            if (rdbSaveAuxField(rdb,"lua",3,body->ptr,sdslen(body->ptr)) == -1) 
                goto werr; 
        } 
        dictReleaseIterator(di); 
        di = NULL; /* So that we don't release it again on error. */ 
    } 

    if (rdbSaveModulesAux(rdb, REDISMODULE_AUX_AFTER_RDB) == -1) goto werr; 

    /* EOF opcode */ 
    if (rdbSaveType(rdb,RDB_OPCODE_EOF) == -1) goto werr; 

    /* CRC64 checksum. It will be zero if checksum computation is disabled, the 
     * loading code skips the check in this case. */ 
    cksum = rdb->cksum; 
    memrev64ifbe(&cksum); 
    if (rioWrite(rdb,&cksum,8) == 0) goto werr; 
    return C_OK; 

werr: 
    if (error) *error = errno; 
    if (di) dictReleaseIterator(di); 
    return C_ERR; 
} 

四、RDB文件结构

每个版本的RDB文件布局略有不同,好在是向下兼容的,以6为例,文件布局如下:

其中开头5字节的"REDIS"是固定的,表示是redis rdb文件。

其后的4字节用来表示RDB_VERSION,RDB的版本号,不是redis的版本号

INFO_AUX是一段数组key-value,用来记录一些环境,比如生成rdb的时间、机器cpu的位数、使用内存等,加载rdb的服务可以对比来判断是否需要架子啊rdb。

MODULE_AUX也是一段数组key-value,用来记录模块信息。

常见的辅助字段如下

在数据块,都有一字节的opcode来表示字段的含义,比如opcode=254表示接下来是数据库编号;opcode=251表示键空间;opcode=252表示过期时间。

KEY_VALUE是具体的键值对,但是redis有多种数据结构,所以在每一个KEY_VALUE开头首先都是一个结构类型字段,紧接着是key,再是value。

常见的opcode定义如下:

#define RDB_OPCODE_MODULE_AUX 247   /* Module auxiliary data. */ 
#define RDB_OPCODE_IDLE       248   /* LRU idle time. */ 
#define RDB_OPCODE_FREQ       249   /* LFU frequency. */ 
#define RDB_OPCODE_AUX        250   /* RDB aux field. */ 
#define RDB_OPCODE_RESIZEDB   251   /* Hash table resize hint. */ 
#define RDB_OPCODE_EXPIRETIME_MS 252    /* Expire time in milliseconds. */ 
#define RDB_OPCODE_EXPIRETIME 253       /* Old expire time in seconds. */ 
#define RDB_OPCODE_SELECTDB   254   /* DB number of the following keys. */ 
#define RDB_OPCODE_EOF        255   /* End of the RDB file. */ 

键值对结构如下:

所以rdb文件如下:

我们知道,redis不仅有多种数据结构,而且每种结构有多种编码,所在保存类型的时候需要考虑类型和编码两个维度。

五、数据编码

/* Save a key-value pair, with expire time, type, key, value. 
 * On error -1 is returned. 
 * On success if the key was actually saved 1 is returned, otherwise 0 
 * is returned (the key was already expired). */ 
int rdbSaveKeyValuePair(rio *rdb, robj *key, robj *val, long long expiretime) { 
    int savelru = server.maxmemory_policy & MAXMEMORY_FLAG_LRU; 
    int savelfu = server.maxmemory_policy & MAXMEMORY_FLAG_LFU; 

    /* Save the expire time */ 
    if (expiretime != -1) { 
        if (rdbSaveType(rdb,RDB_OPCODE_EXPIRETIME_MS) == -1) return -1; 
        if (rdbSaveMillisecondTime(rdb,expiretime) == -1) return -1; 
    } 

    /* Save the LRU info. */ 
    if (savelru) { 
        uint64_t idletime = estimateObjectIdleTime(val); 
        idletime /= 1000; /* Using seconds is enough and requires less space.*/ 
        if (rdbSaveType(rdb,RDB_OPCODE_IDLE) == -1) return -1; 
        if (rdbSaveLen(rdb,idletime) == -1) return -1; 
    } 

    /* Save the LFU info. */ 
    if (savelfu) { 
        uint8_t buf[1]; 
        buf[0] = LFUDecrAndReturn(val); 
        /* We can encode this in exactly two bytes: the opcode and an 8 
         * bit counter, since the frequency is logarithmic with a 0-255 range. 
         * Note that we do not store the halving time because to reset it 
         * a single time when loading does not affect the frequency much. */ 
        if (rdbSaveType(rdb,RDB_OPCODE_FREQ) == -1) return -1; 
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1; 
    } 

    /* Save type, key, value */ 
    if (rdbSaveObjectType(rdb,val) == -1) return -1; /* 保存类型 */ 
    if (rdbSaveStringObject(rdb,key) == -1) return -1; /* 保存key */ 
    if (rdbSaveObject(rdb,val,key) == -1) return -1; /* 保存value */ 

    /* Delay return if required (for testing) */ 
    if (server.rdb_key_save_delay) 
        usleep(server.rdb_key_save_delay); 

    return 1; 
} 

保存类型

int rdbSaveObjectType(rio *rdb, robj *o) { 
    switch (o->type) { 
    case OBJ_STRING: 
        return rdbSaveType(rdb,RDB_TYPE_STRING); 
    case OBJ_LIST: 
        if (o->encoding == OBJ_ENCODING_QUICKLIST) 
            return rdbSaveType(rdb,RDB_TYPE_LIST_QUICKLIST); 
        else 
            serverPanic("Unknown list encoding"); 
    case OBJ_SET: 
        if (o->encoding == OBJ_ENCODING_INTSET) 
            return rdbSaveType(rdb,RDB_TYPE_SET_INTSET); 
        else if (o->encoding == OBJ_ENCODING_HT) 
            return rdbSaveType(rdb,RDB_TYPE_SET); 
        else 
            serverPanic("Unknown set encoding"); 
    case OBJ_ZSET: 
        if (o->encoding == OBJ_ENCODING_ZIPLIST) 
            return rdbSaveType(rdb,RDB_TYPE_ZSET_ZIPLIST); 
        else if (o->encoding == OBJ_ENCODING_SKIPLIST) 
            return rdbSaveType(rdb,RDB_TYPE_ZSET_2); 
        else 
            serverPanic("Unknown sorted set encoding"); 
    case OBJ_HASH: 
        if (o->encoding == OBJ_ENCODING_ZIPLIST) 
            return rdbSaveType(rdb,RDB_TYPE_HASH_ZIPLIST); 
        else if (o->encoding == OBJ_ENCODING_HT) 
            return rdbSaveType(rdb,RDB_TYPE_HASH); 
        else 
            serverPanic("Unknown hash encoding"); 
    case OBJ_STREAM: 
        return rdbSaveType(rdb,RDB_TYPE_STREAM_LISTPACKS); 
    case OBJ_MODULE: 
        return rdbSaveType(rdb,RDB_TYPE_MODULE_2); 
    default: 
        serverPanic("Unknown object type"); 
    } 
    return -1; /* avoid warning */ 
} 

这个函数把OBJ_xx转换成了RDB_TYPE,那么在解析rdb的时候需要逆向操作,具体的映射如下:

保存key

key是以字符串的形式保存,其包含两个字段,length和string

其中length是不定长编码,用高2位区分编码规则,规则如下:

  • 00xxxxxx—— 高两位为0,则后6位表示长度,长度<64
  • 01xxxxxx xxxxxxxx——高两位是01,后14位表示长度,长度<16384
  • 10000000 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx——高两位是10,忽略当前字节,后4字节表示长度,长度<UINT32_MAX
  • 10000001 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx——高两位是10,且最后一位是1,忽略当前字节,后8字节表示长度,长度<UIN64_MAX

在rdb对字符串还有进一步的压缩——尝试用整数表示

其中也是用type的高2位固定是11,用低2位表示不同编码

  • 11000000 xxxxxxxx——低两位是00,后1字节表示UINT8整数
  • 11000001 xxxxxxxx xxxxxxxx——低2位是01,后2字节表示UINT16整数
  • 11000010 xxxxxxxx xxxxxxxx xxxxxxxx xxxxxxxx——低两位是10,后4字节表示UINT32

由此可见,在解析的时候先判断高两位,如果是11,则说明是整数编码,否则是字符串编码

/* Saves an encoded length. The first two bits in the first byte are used to 
 * hold the encoding type. See the RDB_* definitions for more information 
 * on the types of encoding. */ 
int rdbSaveLen(rio *rdb, uint64_t len) { 
    unsigned char buf[2]; 
    size_t nwritten; 

    if (len < (1<<6)) {  /* 长度<64, 用后6位表示 */ 
        /* Save a 6 bit len */ 
        buf[0] = (len&0xFF)|(RDB_6BITLEN<<6); 
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1; 
        nwritten = 1; 
    } else if (len < (1<<14)) { /* 长度<16384, 当前字节的6位+后一字节,共14位表示*/ 
        /* Save a 14 bit len */ 
        buf[0] = ((len>>8)&0xFF)|(RDB_14BITLEN<<6); 
        buf[1] = len&0xFF; 
        if (rdbWriteRaw(rdb,buf,2) == -1) return -1; 
        nwritten = 2; 
    } else if (len <= UINT32_MAX) { /* 跳过当前字节,后4字节表示*/ 
        /* Save a 32 bit len */ 
        buf[0] = RDB_32BITLEN; 
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1; 
        uint32_t len32 = htonl(len); 
        if (rdbWriteRaw(rdb,&len32,4) == -1) return -1; 
        nwritten = 1+4; 
    } else { 
        /* Save a 64 bit len */ 
        buf[0] = RDB_64BITLEN; 
        if (rdbWriteRaw(rdb,buf,1) == -1) return -1; 
        len = htonu64(len); 
        if (rdbWriteRaw(rdb,&len,8) == -1) return -1; 
        nwritten = 1+8; 
    } 
    return nwritten; 
} 

ssize_t rdbSaveStringObject(rio *rdb, robj *obj) { 
    /* Avoid to decode the object, then encode it again, if the 
     * object is already integer encoded. */ 
    if (obj->encoding == OBJ_ENCODING_INT) { 
        return rdbSaveLongLongAsStringObject(rdb,(long)obj->ptr); 
    } else { 
        serverAssertWithInfo(NULL,obj,sdsEncodedObject(obj)); 
        return rdbSaveRawString(rdb,obj->ptr,sdslen(obj->ptr)); 
    } 
} 

ssize_t rdbSaveRawString(rio *rdb, unsigned char *s, size_t len) { 
    int enclen; 
    ssize_t n, nwritten = 0; 

    /* Try integer encoding */ 
    if (len <= 11) { 
        unsigned char buf[5]; 
        if ((enclen = rdbTryIntegerEncoding((char*)s,len,buf)) > 0) { 
            if (rdbWriteRaw(rdb,buf,enclen) == -1) return -1; 
            return enclen; 
        } 
    } 

    /* Try LZF compression - under 20 bytes it's unable to compress even 
     * aaaaaaaaaaaaaaaaaa so skip it */ 
    if (server.rdb_compression && len > 20) { 
        n = rdbSaveLzfStringObject(rdb,s,len); 
        if (n == -1) return -1; 
        if (n > 0) return n; 
        /* Return value of 0 means data can't be compressed, save the old way */ 
    } 

    /* Store verbatim(逐字) */ 
    if ((n = rdbSaveLen(rdb,len)) == -1) return -1; 
    nwritten += n; 
    if (len > 0) { 
        if (rdbWriteRaw(rdb,s,len) == -1) return -1; 
        nwritten += len; 
    } 
    return nwritten; 
} 

保存value

ssize_t rdbSaveObject(rio *rdb, robj *o, robj *key) { 
    ssize_t n = 0, nwritten = 0; 

    if (o->type == OBJ_STRING) { 
        /* Save a string value */ 
        if ((n = rdbSaveStringObject(rdb,o)) == -1) return -1; 
        nwritten += n; 
    } else if (o->type == OBJ_LIST) { 
        /* Save a list value */ 
        if (o->encoding == OBJ_ENCODING_QUICKLIST) { 
            quicklist *ql = o->ptr; 
            quicklistNode *node = ql->head; 

            if ((n = rdbSaveLen(rdb,ql->len)) == -1) return -1; 
            nwritten += n; 

            while(node) { 
                if (quicklistNodeIsCompressed(node)) { 
                    void *data; 
                    size_t compress_len = quicklistGetLzf(node, &data); 
                    if ((n = rdbSaveLzfBlob(rdb,data,compress_len,node->sz)) == -1) return -1; 
                    nwritten += n; 
                } else { 
                    if ((n = rdbSaveRawString(rdb,node->zl,node->sz)) == -1) return -1; 
                    nwritten += n; 
                } 
                node = node->next; 
            } 
        } else { 
            serverPanic("Unknown list encoding"); 
        } 
    } 
    /////////////// 后面还有很多其他类型的 ///////////////////////// 
}