Redis Source Code Analysis, Part 9: AOF Persistence

December 25, 2021, 01:02

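Besides RDB, Redis offers a second persistence mechanism, AOF (Append Only File). Unlike RDB, which stores a snapshot of the data, AOF logs every write command and restores the data by replaying those commands.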

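Question to keep in mind: when Redis receives a write command, in what order does it execute the command, reply to the client, and write the AOF?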

1. Enabling AOF

Setting appendonly to yes in the configuration file is all it takes to enable AOF. The default file name is "appendonly.aof".

appendonly yes 
appendfilename "appendonly.aof" 

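2. AOF-related configuration

Besides the two basic options above, several other options affect AOF.

2.1 appendfsync

Writing to a file does not put the data on disk directly. The data first lands in an OS buffer, and the OS flushes that buffer to disk once its own trigger conditions are met. This improves throughput, but it also means that when write() returns, the data is only guaranteed to be in the buffer, not on disk. If the machine crashes after write(), that data is lost. The OS provides fsync() to force a flush to disk, and Redis exposes an option that controls how often it is called.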

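# appendfsync always  # fsync after every write; least data loss, slowest 
appendfsync everysec  # fsync about once per second; roughly 1s of data can be lost 
# appendfsync no      # leave flushing to the OS; fastest, most data at risk 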
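
The difference between write() and fsync() can be sketched in a few lines of C. This is only an illustration under stated assumptions: the enum and the log_append helper are hypothetical names made up for this example, not Redis code, and the everysec timer is left out.

```c
#include <assert.h>
#include <fcntl.h>      /* open(2) flags for callers */
#include <unistd.h>

/* Hypothetical policy names for this sketch; Redis uses its own constants. */
enum sync_policy { SYNC_ALWAYS, SYNC_EVERYSEC, SYNC_NO };

/* Append len bytes to fd, then fsync according to the policy.
 * Returns 0 on success, -1 on error. */
int log_append(int fd, const char *data, size_t len, enum sync_policy policy) {
    assert(fd >= 0 && data != NULL);
    size_t off = 0;
    while (off < len) {                 /* write(2) may write fewer bytes */
        ssize_t n = write(fd, data + off, len - off);
        if (n < 0) return -1;
        off += (size_t)n;
    }
    /* After write() the bytes are only known to be in the OS buffer. */
    if (policy == SYNC_ALWAYS && fsync(fd) == -1) return -1;
    /* SYNC_EVERYSEC: a timer would call fsync() about once per second.
     * SYNC_NO: the OS decides when to flush. */
    return 0;
}
```

A caller would open the log with O_WRONLY | O_CREAT | O_APPEND and call log_append once per command. Only the SYNC_ALWAYS path pays the cost of fsync() on every call.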

2.2 no-appendfsync-on-rewrite

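While an AOF rewrite is running, commands are still appended to the old AOF file, so there are two streams of disk I/O at once: the rewrite itself and the regular AOF logging. The two compete for the disk, and the regular logging is done by the main process, which also has to serve clients. If the disk stalls, the main process stalls and normal service suffers. With no-appendfsync-on-rewrite set to yes, the main process skips fsync while a child process is doing I/O, trading durability for lower latency. The default, no, keeps the configured fsync policy.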

no-appendfsync-on-rewrite no 

2.4 aof-load-truncated

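truncated: cut short, with the end missing.

As noted above, data goes to a buffer before it reaches the disk, so the last command in the file can be incomplete, for example when the machine crashes halfway through a write. When Redis restarts and hits that incomplete last command while loading the AOF, should it report an error and refuse to start? aof-load-truncated decides: with yes Redis loads what it can and keeps starting up, with no it aborts with an error.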

aof-load-truncated yes 
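
To make "incomplete last command" concrete, here is a minimal sketch that scans a buffer of commands in the Redis protocol format and returns the offset just past the last complete one. It is not the Redis loader: aof_valid_up_to and read_count are hypothetical names, and the sketch does not distinguish a truncated tail from a malformed one.

```c
#include <assert.h>
#include <stddef.h>

/* Parse "<prefix><digits>\r\n" at buf[*pos]; store the number in *val.
 * Returns 1 on success, 0 if the data is incomplete or malformed. */
static int read_count(const char *buf, size_t len, size_t *pos,
                      char prefix, size_t *val) {
    size_t p = *pos, v = 0, digits = 0;
    if (p >= len || buf[p] != prefix) return 0;
    p++;
    while (p < len && buf[p] >= '0' && buf[p] <= '9') {
        v = v * 10 + (size_t)(buf[p] - '0');
        p++;
        digits++;
    }
    if (digits == 0 || p + 2 > len || buf[p] != '\r' || buf[p + 1] != '\n')
        return 0;
    *pos = p + 2;
    *val = v;
    return 1;
}

/* Returns the offset just past the last complete command in buf. */
size_t aof_valid_up_to(const char *buf, size_t len) {
    assert(buf != NULL);
    size_t valid = 0, pos = 0;
    for (;;) {
        size_t argc, alen, i;
        if (!read_count(buf, len, &pos, '*', &argc)) break;
        for (i = 0; i < argc; i++) {
            if (!read_count(buf, len, &pos, '$', &alen)) break;
            /* Need alen payload bytes plus the trailing \r\n. */
            if (alen > len - pos || len - pos - alen < 2) break;
            if (buf[pos + alen] != '\r' || buf[pos + alen + 1] != '\n') break;
            pos += alen + 2;
        }
        if (i < argc) break;            /* this command was cut short */
        valid = pos;
    }
    return valid;
}
```

A loader that tolerates truncation could keep everything before the returned offset and discard the rest, while a strict loader would treat a return value smaller than the file size as an error.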

2.5 aof-use-rdb-preamble

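preamble: an introduction placed before the main content.

Taken literally the name means "put an RDB in front of the AOF". The option enables a hybrid format: during an AOF rewrite, Redis first dumps the dataset quickly in RDB format, then appends the commands received in the meantime after the RDB part.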

# When rewriting the AOF file, Redis is able to use an RDB preamble in the 
# AOF file for faster rewrites and recoveries. When this option is turned 
# on the rewritten AOF file is composed of two different stanzas: 
# 
#   [RDB file][AOF tail] 
# 
# When loading Redis recognizes that the AOF file starts with the "REDIS" 
# string and loads the prefixed RDB file, and continues loading the AOF 
# tail. 
aof-use-rdb-preamble yes 
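
The configuration comment above says the loader recognizes the hybrid format by the "REDIS" string at the start of the file. A minimal sketch of that check follows; aof_has_rdb_preamble is a hypothetical helper name, not a function from the Redis source.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Returns 1 if the first bytes of an AOF look like an RDB preamble,
 * i.e. the file starts with the 5-byte string "REDIS". */
int aof_has_rdb_preamble(const unsigned char *head, size_t len) {
    assert(head != NULL);
    return len >= 5 && memcmp(head, "REDIS", 5) == 0;
}
```

A plain AOF starts with a command in protocol form (a '*' character), so the two formats can be told apart from the first five bytes alone.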

2.6 aof-rewrite-incremental-fsync

During an AOF rewrite, the file is fsynced after every 32 MB of data written, so that a single huge flush does not stall the disk.

# When a child rewrites the AOF file, if the following option is enabled 
# the file will be fsync-ed every 32 MB of data generated. This is useful 
# in order to commit the file to the disk more incrementally and avoid 
# big latency spikes. 
aof-rewrite-incremental-fsync yes 

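3. AOF

3.1 How AOF works

The principle of AOF is fairly simple: write commands are logged to a file.

The AOF stores individual commands, and recovery only has to replay them. In the ideal case (appendfsync always) a crash loses at most the last command, but that setting costs a lot of performance.

Commands are not written to disk directly. They are first appended to the global aof_buf, and then written and fsynced to disk according to the configured policy.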

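3.2 The AOF write path

a. After executing a command, the server propagates commands that modified the keyspace to the AOF append path

b. The command is stored in aof_buf, a global sds; if a child process is rewriting the AOF at that moment, the command is also stored in the rewrite buffer

c. beforeSleep in the main event loop calls flushAppendOnlyFile

d. The contents of aof_buf are written to the AOF file with write

e. The data is fsynced to disk according to the policy (always, everysec, no)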
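
Step b can be sketched as follows. The AOF stores commands in the Redis protocol format: an argument count line, then a length line and the payload for each argument. The struct abuf type and both functions are hypothetical stand-ins for the sds-based aof_buf, and strlen-based arguments mean this sketch is not binary safe, unlike the real buffer.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical growable buffer standing in for the sds-based aof_buf. */
struct abuf { char *data; size_t len, cap; };

static int abuf_append(struct abuf *b, const char *s, size_t n) {
    if (b->len + n + 1 > b->cap) {
        size_t cap = (b->len + n + 1) * 2;
        char *p = realloc(b->data, cap);
        if (p == NULL) return -1;
        b->data = p;
        b->cap = cap;
    }
    memcpy(b->data + b->len, s, n);
    b->len += n;
    b->data[b->len] = '\0';             /* keep it printable as a C string */
    return 0;
}

/* Append one command: "*<argc>\r\n", then "$<len>\r\n<arg>\r\n" per argument.
 * Returns 0 on success, -1 on allocation failure. */
int abuf_feed_command(struct abuf *b, int argc, const char **argv) {
    assert(b != NULL && argc > 0 && argv != NULL);
    char hdr[32];
    int n = snprintf(hdr, sizeof(hdr), "*%d\r\n", argc);
    if (abuf_append(b, hdr, (size_t)n) == -1) return -1;
    for (int i = 0; i < argc; i++) {
        size_t alen = strlen(argv[i]);
        n = snprintf(hdr, sizeof(hdr), "$%zu\r\n", alen);
        if (abuf_append(b, hdr, (size_t)n) == -1) return -1;
        if (abuf_append(b, argv[i], alen) == -1) return -1;
        if (abuf_append(b, "\r\n", 2) == -1) return -1;
    }
    return 0;
}
```

Starting from a zero-initialized struct abuf, each executed write command would be fed in this way, and steps c to e would later write the accumulated bytes to the file in one go and reset the buffer.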

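4. AOF rewrite

The AOF stores commands as a running log, so the file keeps growing as writes accumulate. Even with a single key, running SET on it 1000 times produces 1000 records, and the file bloats. AOF rewrite exists to shrink it again.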

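4.1 When an AOF rewrite is triggered

A rewrite can be triggered manually with the BGREWRITEAOF command, or automatically based on file size

auto-aof-rewrite-percentage 100  # rewrite once the file has grown by 100% over its size after the last rewrite 
auto-aof-rewrite-min-size 64mb   # never rewrite automatically while the file is no larger than 64mb 

Both conditions must hold at the same time. Growth is measured against aof_rewrite_base_size, the AOF size at startup or after the latest rewrite. If that base is 0 (for example, a freshly created AOF), the growth is effectively unbounded, so the first rewrite fires as soon as the file exceeds 64mb. After that, a rewrite fires when the file is both larger than 64mb and at least twice the size it had right after the previous rewrite.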
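
The trigger rule can be written as a small pure function. This sketch mirrors the check in serverCron quoted in section 5.3; aof_should_rewrite and its parameter names are made up for the example.

```c
#include <assert.h>

/* Returns 1 if an automatic rewrite should start, 0 otherwise.
 * perc == 0 disables automatic rewrites. Sizes are in bytes. */
int aof_should_rewrite(long long current_size, long long base_size,
                       long long min_size, int perc) {
    assert(current_size >= 0 && base_size >= 0 && min_size >= 0);
    if (perc == 0 || current_size <= min_size) return 0;
    long long base = base_size ? base_size : 1;   /* avoid dividing by 0 */
    long long growth = (current_size * 100 / base) - 100;
    return growth >= perc;
}
```

With the defaults (100 and 64mb), a 65 MB file with base size 0 triggers a rewrite, a 100 MB file with a 70 MB base does not (42% growth), and a 140 MB file with a 70 MB base does (100% growth).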

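4.2 How AOF rewrite works

The word "rewrite" is a little misleading. One might assume Redis analyzes and compacts the old AOF to produce the new one, but that is not what happens: the rewrite never reads the old AOF file.

To rewrite the AOF, Redis forks a child process, and the child generates write commands that recreate every key in the keyspace from its current value. During a rewrite there can therefore be two AOF files, the old one and the one being written, and when the rewrite completes the new file replaces the old one.

Note that for some data structures (list, set, zset and so on) with many elements, the output is split into several commands, each carrying at most AOF_REWRITE_ITEMS_PER_CMD items (64 by default). This mainly avoids overflowing the client input buffer when the data is loaded back.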
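
The splitting can be illustrated with a list. The sketch below prints human-readable RPUSH commands rather than the protocol format Redis actually writes, and emit_list_rewrite and ITEMS_PER_CMD are names chosen for this example only.

```c
#include <assert.h>
#include <stdio.h>

#define ITEMS_PER_CMD 64 /* same value as AOF_REWRITE_ITEMS_PER_CMD in the text */

/* Print RPUSH commands that rebuild a list, at most ITEMS_PER_CMD items each.
 * Returns the number of commands emitted. */
int emit_list_rewrite(FILE *out, const char *key, const char **items, int count) {
    assert(out != NULL && key != NULL && count >= 0);
    int cmds = 0;
    for (int i = 0; i < count; i++) {
        if (i % ITEMS_PER_CMD == 0) {   /* start a new command */
            if (i > 0) fputc('\n', out);
            fprintf(out, "RPUSH %s", key);
            cmds++;
        }
        fprintf(out, " %s", items[i]);
    }
    if (count > 0) fputc('\n', out);
    return cmds;
}
```

A list of 130 elements therefore becomes three commands carrying 64, 64 and 2 items.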

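4.3 The AOF rewrite procedure

As mentioned above, the rewrite is carried out by a child process.

a. An AOF rewrite is triggered

b. The server (main process) forks a child, provided no child process is already running

c. From then on the parent also records write commands in a rewrite buffer (the old AOF is still written as usual)

d. The child creates a temporary file, temp-rewriteaof-<child pid>.aof, to hold the rewritten AOF

e. The child generates write commands for every key

f. The child reads the write commands the parent has buffered so far, then sends a "!" message to tell the parent that the keyspace has been rewritten and it should stop sending

g. Once the parent acknowledges, the child reads any remaining buffered commands and appends them to the rewritten AOF

h. fsync to disk, then rename the temporary file to the name passed in by rewriteAppendOnlyFileBackground (temp-rewriteaof-bg-<child pid>.aof)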
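
The "!" handshake in steps f and g relies on one-way pipes between parent and child. The toy program below reproduces only that exchange, with a single fixed diff. run_rewrite_handshake is a hypothetical function for illustration, and the error paths do not close descriptors as real code should.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Runs a toy version of the rewrite handshake between a parent (the server)
 * and a forked child (the rewriter). Returns 0 on success, -1 on failure. */
int run_rewrite_handshake(void) {
    int data[2], to_parent[2], to_child[2];
    if (pipe(data) == -1) return -1;        /* parent -> child: diff   */
    if (pipe(to_parent) == -1) return -1;   /* child -> parent: "stop" */
    if (pipe(to_child) == -1) return -1;    /* parent -> child: ack    */

    const char diff[] = "SET k v\n";
    pid_t pid = fork();
    if (pid == -1) return -1;

    if (pid == 0) {                         /* child: the rewriter */
        char buf[64], byte = 0;
        ssize_t n = read(data[0], buf, sizeof(buf));        /* receive diff */
        if (n != (ssize_t)strlen(diff) || memcmp(buf, diff, (size_t)n) != 0)
            _exit(1);
        if (write(to_parent[1], "!", 1) != 1) _exit(1);     /* ask to stop  */
        if (read(to_child[0], &byte, 1) != 1 || byte != '!') _exit(1);
        _exit(0);                           /* would now finalize the file */
    }

    /* parent: the server */
    char byte = 0;
    int status = 0;
    if (write(data[1], diff, strlen(diff)) != (ssize_t)strlen(diff)) return -1;
    if (read(to_parent[0], &byte, 1) != 1 || byte != '!') return -1;
    if (write(to_child[1], "!", 1) != 1) return -1;         /* acknowledge  */
    if (waitpid(pid, &status, 0) != pid) return -1;
    for (int i = 0; i < 2; i++) {
        close(data[i]);
        close(to_parent[i]);
        close(to_child[i]);
    }
    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}
```

Three pipes are needed because each pipe carries data in one direction only: one for the diff, one for the child's stop request, and one for the parent's acknowledgement.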

5. Source code walkthrough

5.1 Global definitions

All AOF-related definitions live in the redisServer struct

struct redisServer { 
    /* AOF persistence */ 
    int aof_enabled;                /* AOF configuration */ 
    int aof_state;                  /* AOF_(ON|OFF|WAIT_REWRITE) */ 
    int aof_fsync;                  /* Kind of fsync() policy */ 
    char *aof_filename;             /* Name of the AOF file */ 
    int aof_no_fsync_on_rewrite;    /* Don't fsync if a rewrite is in prog. */ 
    int aof_rewrite_perc;           /* Rewrite AOF if % growth is > M and... */ 
    off_t aof_rewrite_min_size;     /* the AOF file is at least N bytes. */ 
    off_t aof_rewrite_base_size;    /* AOF size on latest startup or rewrite. */ 
    off_t aof_current_size;         /* AOF current size. */ 
    off_t aof_fsync_offset;         /* AOF offset which is already synced to disk. */ 
    int aof_flush_sleep;            /* Micros to sleep before flush. (used by tests) */ 
    int aof_rewrite_scheduled;      /* Rewrite once BGSAVE terminates. */ 
    pid_t aof_child_pid;            /* PID if rewriting process */ 
    list *aof_rewrite_buf_blocks;   /* Hold changes during an AOF rewrite: where the main process buffers the diff */ 
    sds aof_buf;      /* AOF buffer, written before entering the event loop; every command is appended here first */ 
    int aof_fd;       /* File descriptor of currently selected AOF file */ 
    int aof_selected_db; /* Currently selected DB in AOF; when the DB changes, a SELECT command is added to the AOF */ 
    time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */ 
    time_t aof_last_fsync;            /* UNIX time of last fsync() */ 
    time_t aof_rewrite_time_last;   /* Time used by last AOF rewrite run. */ 
    time_t aof_rewrite_time_start;  /* Current AOF rewrite start time. */ 
    int aof_lastbgrewrite_status;   /* C_OK or C_ERR */ 
    unsigned long aof_delayed_fsync;  /* delayed AOF fsync() counter */ 
    int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */ 
    int rdb_save_incremental_fsync;   /* fsync incrementally while rdb saving? */ 
    int aof_last_write_status;      /* C_OK or C_ERR */ 
    int aof_last_write_errno;       /* Valid if aof_last_write_status is ERR */ 
    int aof_load_truncated;         /* Don't stop on unexpected AOF EOF. */ 
    int aof_use_rdb_preamble;       /* Use RDB preamble on AOF rewrites. */ 
    /* AOF pipes used to communicate between parent and child during rewrite. */ 
    int aof_pipe_write_data_to_child; 
    int aof_pipe_read_data_from_parent; 
    int aof_pipe_write_ack_to_parent;   /* these 6 pipe fds carry the parent/child communication */ 
    int aof_pipe_read_ack_from_child; 
    int aof_pipe_write_ack_to_child; 
    int aof_pipe_read_ack_from_parent; 
    int aof_stop_sending_diff;     /* If true stop sending accumulated diffs 
                                      to child process. */ 
    sds aof_child_diff;             /* AOF diff accumulator child side. */ 
};

5.2 Writing the AOF

Every command goes through processCommand, which in turn calls call to execute it

int processCommand(client *c) { 
    /***************** 一些其他命令 ************************/ 
    /* Exec the command */ 
    if (c->flags & CLIENT_MULTI && 
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand && 
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) 
    { 
        queueMultiCommand(c); 
        addReply(c,shared.queued); 
    } else { 
        call(c,CMD_CALL_FULL);  /* the function that actually executes the command */ 
        c->woff = server.master_repl_offset; 
        if (listLength(server.ready_keys)) 
            handleClientsBlockedOnKeys(); 
    } 
    return C_OK; 
} 

The main flow of call (abridged) is:

void call(client *c, int flags) { 
    /* Call the command. */ 
    c->cmd->proc(c);  /* invoke the command implementation */ 

    /* Propagate the command into the AOF and replication link */ 
    if (flags & CMD_CALL_PROPAGATE && 
        (c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP) 
    { 
        int propagate_flags = PROPAGATE_NONE; 

        /* Check if the command operated changes in the data set. If so 
         * set for replication / AOF propagation. */ 
        if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL);  /* dirty (its computation is elided in this excerpt) is non-zero when the command changed data */ 

        /* If the client forced AOF / replication of the command, set 
         * the flags regardless of the command effects on the data set. */ 
        if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL; 
        if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF; 

        /* However prevent AOF / replication propagation if the command 
         * implementations called preventCommandPropagation() or similar, 
         * or if we don't have the call() flags to do so. */ 
        if (c->flags & CLIENT_PREVENT_REPL_PROP || 
            !(flags & CMD_CALL_PROPAGATE_REPL)) 
                propagate_flags &= ~PROPAGATE_REPL; 
        if (c->flags & CLIENT_PREVENT_AOF_PROP || 
            !(flags & CMD_CALL_PROPAGATE_AOF)) 
                propagate_flags &= ~PROPAGATE_AOF; 

        /* Call propagate() only if at least one of AOF / replication 
         * propagation is needed. Note that modules commands handle replication 
         * in an explicit way, so we never replicate them automatically. */ 
        if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE)) 
            propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); /* propagate the command if AOF or replication needs it */ 
    } 
} 

propagate: to spread, to pass something on.

propagate calls the AOF interface, which records the command in the aof_buf buffer

/* Propagate the specified command (in the context of the specified database id) 
 * to AOF and Slaves. 
 * 
 * flags are an xor between: 
 * + PROPAGATE_NONE (no propagation of command at all) 
 * + PROPAGATE_AOF (propagate into the AOF file if is enabled) 
 * + PROPAGATE_REPL (propagate into the replication link) 
 * 
 * This should not be used inside commands implementation since it will not 
 * wrap the resulting commands in MULTI/EXEC. Use instead alsoPropagate(), 
 * preventCommandPropagation(), forceCommandPropagation(). 
 * 
 * However for functions that need to (also) propagate out of the context of a 
 * command execution, for example when serving a blocked client, you 
 * want to use propagate(). 
 */ 
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 
               int flags) 
{ 
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) /* AOF */ 
        feedAppendOnlyFile(cmd,dbid,argv,argc); 
    if (flags & PROPAGATE_REPL) /* master-replica replication */ 
        replicationFeedSlaves(server.slaves,dbid,argv,argc); 
} 

feedAppendOnlyFile stores the command in aof_buf (this excerpt omits the code that serializes the command into buf)

void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) { 
    /* Append to the AOF buffer. This will be flushed on disk just before 
     * of re-entering the event loop, so before the client will get a 
     * positive reply about the operation performed. */ 
    if (server.aof_state == AOF_ON) 
        server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); /* append to aof_buf */ 

    /* If a background append only file rewriting is in progress we want to 
     * accumulate the differences between the child DB and the current one 
     * in a buffer, so that when the child process will do its work we 
     * can append the differences to the new append only file. */ 
    if (server.aof_child_pid != -1) 
        aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf)); 
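        /* If an AOF rewrite is running, the command must also go into the rewrite buffer. That function also registers the event handler through which the parent sends the diff over the aof_pipe_write_data_to_child pipe */ 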
} 

In the server's main loop, the beforesleep callback runs on every iteration, before the loop waits for events

void aeMain(aeEventLoop *eventLoop) { 
    eventLoop->stop = 0; 
    while (!eventLoop->stop) { 
        if (eventLoop->beforesleep != NULL) 
            eventLoop->beforesleep(eventLoop); 
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); 
    } 
} 

beforeSleep in turn calls flushAppendOnlyFile

/* This function gets called every time Redis is entering the 
 * main loop of the event driven library, that is, before to sleep 
 * for ready file descriptors. */ 
void beforeSleep(struct aeEventLoop *eventLoop) {     
    /* Write the AOF buffer on disk */ 
    flushAppendOnlyFile(0); 
} 

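flushAppendOnlyFile writes the contents of aof_buf to the OS buffer and fsyncs to disk when the configured policy calls for it.

The function deserves a close read because this is where the actual I/O happens.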

/* Write the append only file buffer on disk. 
 * 
 * Since we are required to write the AOF before replying to the client, 
 * and the only way the client socket can get a write is entering the 
 * event loop, we accumulate all the AOF writes in a memory 
 * buffer and write it on disk using this function just before entering 
 * the event loop again. 
 * (Annotation: the AOF is written before the reply goes out, because the 
 * client socket only becomes writable inside the event loop.) 
 * 
 * About the 'force' argument: 
 * 
 * When the fsync policy is set to 'everysec' we may delay the flush if there 
 * is still an fsync() going on in the background thread, since for instance 
 * on Linux write(2) will be blocked by the background fsync anyway. 
 * When this happens we remember that there is some aof buffer to be 
 * flushed ASAP, and will try to do that in the serverCron() function. 
 * 
 * However if force is set to 1 we'll write regardless of the background 
 * fsync. */ 
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */ 
void flushAppendOnlyFile(int force) { 
    ssize_t nwritten; 
    int sync_in_progress = 0; 
    mstime_t latency; 

    if (sdslen(server.aof_buf) == 0) { 
        /* Check if we need to do fsync even the aof buffer is empty, 
         * because previously in AOF_FSYNC_EVERYSEC mode, fsync is 
         * called only when aof buffer is not empty, so if users 
         * stop write commands before fsync called in one second, 
         * the data in page cache cannot be flushed in time. */ 
        if (server.aof_fsync == AOF_FSYNC_EVERYSEC && /* everysec policy */ 
            server.aof_fsync_offset != server.aof_current_size && /* data written to the file but not yet fsynced */ 
            server.unixtime > server.aof_last_fsync && /* at least 1s since the last fsync */ 
            !(sync_in_progress = aofFsyncInProgress())) { /* no background fsync job is pending */ 
            goto try_fsync; 
        } else { 
            return; 
        } 
    } 

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC) 
        sync_in_progress = aofFsyncInProgress(); 

    if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) { 
        /* With this append fsync policy we do background fsyncing. 
         * If the fsync is still in progress we can try to delay 
         * the write for a couple of seconds. */ 
        if (sync_in_progress) { 
            if (server.aof_flush_postponed_start == 0) { /* no flush has been postponed yet */ 
                /* No previous write postponing, remember that we are 
                 * postponing the flush and return. */ 
                server.aof_flush_postponed_start = server.unixtime; 
                return; 
            } else if (server.unixtime - server.aof_flush_postponed_start < 2) { 
                /* We were already waiting for fsync to finish, but for less 
                 * than two seconds this is still ok. Postpone again. */ 
                return; 
            } 
            /* Otherwise fall trough, and go write since we can't wait 
             * over two seconds. */ 
            server.aof_delayed_fsync++; 
            serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis."); 
        } 
    } 
    /* We want to perform a single write. This should be guaranteed atomic 
     * at least if the filesystem we are writing is a real physical one. 
     * While this will save us against the server being killed I don't think 
     * there is much to do about the whole server stopping for power problems 
     * or alike */ 

    if (server.aof_flush_sleep && sdslen(server.aof_buf)) { 
        usleep(server.aof_flush_sleep); 
    } 

    latencyStartMonitor(latency); 
    nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); /* calls write(2) internally */ 
    latencyEndMonitor(latency); 
    /* We want to capture different events for delayed writes: 
     * when the delay happens with a pending fsync, or with a saving child 
     * active, and when the above two conditions are missing. 
     * We also use an additional event name to save all samples which is 
     * useful for graphing / monitoring purposes. */ 
    if (sync_in_progress) { 
        latencyAddSampleIfNeeded("aof-write-pending-fsync",latency); 
    } else if (hasActiveChildProcess()) { 
        latencyAddSampleIfNeeded("aof-write-active-child",latency); 
    } else { 
        latencyAddSampleIfNeeded("aof-write-alone",latency); 
    } 
    latencyAddSampleIfNeeded("aof-write",latency); 

    /* We performed the write so reset the postponed flush sentinel to zero. */ 
    server.aof_flush_postponed_start = 0; 

    if (nwritten != (ssize_t)sdslen(server.aof_buf)) { 
        static time_t last_write_error_log = 0; 
        int can_log = 0; 

        /* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */ 
        if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) { 
            can_log = 1; 
            last_write_error_log = server.unixtime; 
        } 

        /* Log the AOF write error and record the error code. */ 
        if (nwritten == -1) { 
            if (can_log) { 
                serverLog(LL_WARNING,"Error writing to the AOF file: %s", 
                    strerror(errno)); 
                server.aof_last_write_errno = errno; 
            } 
        } else { 
            if (can_log) { 
                serverLog(LL_WARNING,"Short write while writing to " 
                                       "the AOF file: (nwritten=%lld, " 
                                       "expected=%lld)", 
                                       (long long)nwritten, 
                                       (long long)sdslen(server.aof_buf)); 
            } 

            if (ftruncate(server.aof_fd, server.aof_current_size) == -1) { 
                if (can_log) { 
                    serverLog(LL_WARNING, "Could not remove short write " 
                             "from the append-only file.  Redis may refuse " 
                             "to load the AOF the next time it starts.  " 
                             "ftruncate: %s", strerror(errno)); 
                } 
            } else { 
                /* If the ftruncate() succeeded we can set nwritten to 
                 * -1 since there is no longer partial data into the AOF. */ 
                nwritten = -1; 
            } 
            server.aof_last_write_errno = ENOSPC; 
        } 

        /* Handle the AOF write error. */ 
        if (server.aof_fsync == AOF_FSYNC_ALWAYS) { 
            /* We can't recover when the fsync policy is ALWAYS since the 
             * reply for the client is already in the output buffers, and we 
             * have the contract with the user that on acknowledged write data 
             * is synced on disk. 
             * (Annotation: "contract" here means a promise. With always, an 
             * acknowledged write must already be on disk, and the reply is 
             * already queued, so the only safe reaction to a write error is to exit.) 
             */ 
            serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting..."); 
            exit(1); 
        } else { 
            /* Recover from failed write leaving data into the buffer. However 
             * set an error to stop accepting writes as long as the error 
             * condition is not cleared. */ 
            server.aof_last_write_status = C_ERR; 

            /* Trim the sds buffer if there was a partial write, and there 
             * was no way to undo it with ftruncate(2). */ 
            if (nwritten > 0) { 
                server.aof_current_size += nwritten; 
                sdsrange(server.aof_buf,nwritten,-1); 
            } 
            return; /* We'll try again on the next call... */ 
        } 
    } else { 
        /* Successful write(2). If AOF was in error state, restore the 
         * OK state and log the event. */ 
        if (server.aof_last_write_status == C_ERR) { 
            serverLog(LL_WARNING, 
                "AOF write error looks solved, Redis can write again."); 
            server.aof_last_write_status = C_OK; 
        } 
    } 
    server.aof_current_size += nwritten; 

    /* Re-use AOF buffer when it is small enough. The maximum comes from the 
     * arena size of 4k minus some overhead (but is otherwise arbitrary). */ 
    if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) { 
        sdsclear(server.aof_buf); 
    } else { 
        sdsfree(server.aof_buf); 
        server.aof_buf = sdsempty(); 
    } 

try_fsync: 
    /* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are 
     * children doing I/O in the background. */ 
    if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess()) 
        return; 

    /* Perform the fsync if needed. */ 
    if (server.aof_fsync == AOF_FSYNC_ALWAYS) { 
        /* redis_fsync is defined as fdatasync() for Linux in order to avoid 
         * flushing metadata. */ 
        latencyStartMonitor(latency); 
        redis_fsync(server.aof_fd); /* Let's try to get this data on the disk; with always, fsync is called synchronously */ 
        latencyEndMonitor(latency); 
        latencyAddSampleIfNeeded("aof-fsync-always",latency); 
        server.aof_fsync_offset = server.aof_current_size; 
        server.aof_last_fsync = server.unixtime; 
    } else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC && 
                server.unixtime > server.aof_last_fsync)) { 
        if (!sync_in_progress) { 
            aof_background_fsync(server.aof_fd); /* with everysec, the fsync runs in a separate background thread */ 
            server.aof_fsync_offset = server.aof_current_size; 
        } 
        server.aof_last_fsync = server.unixtime; 
    } 
} 
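
The postponement logic for everysec (the block guarded by sync_in_progress in the listing above) is easier to follow when isolated as a pure function. This is a sketch of that one decision only; the enum and everysec_flush_decision are hypothetical names, and the 'force' argument is assumed to be 0.

```c
#include <assert.h>
#include <time.h>

enum flush_action { FLUSH_WRITE_NOW, FLUSH_POSTPONE, FLUSH_WRITE_DELAYED };

/* Decide what an everysec flush should do when called without 'force'.
 * *postponed_start plays the role of server.aof_flush_postponed_start. */
enum flush_action everysec_flush_decision(int sync_in_progress, time_t now,
                                          time_t *postponed_start) {
    assert(postponed_start != NULL);
    if (!sync_in_progress) {
        *postponed_start = 0;           /* the write happens, reset sentinel */
        return FLUSH_WRITE_NOW;
    }
    if (*postponed_start == 0) {
        *postponed_start = now;         /* first postponement, remember when */
        return FLUSH_POSTPONE;
    }
    if (now - *postponed_start < 2)
        return FLUSH_POSTPONE;          /* waited less than two seconds */
    *postponed_start = 0;               /* waited too long, write anyway */
    return FLUSH_WRITE_DELAYED;
}
```

FLUSH_WRITE_DELAYED corresponds to the branch that increments aof_delayed_fsync and logs the "fsync is taking too long" notice before writing.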

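5.3 AOF rewrite

serverCron checks whether the rewrite conditions are met, provided no child process is currently running. The function is invoked at the frequency set by the hz option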

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 
    /* Check if a background saving or AOF rewrite in progress terminated. */ 
    if (hasActiveChildProcess() || ldbPendingChildren()) 
    { 
        checkChildrenDone(); 
    } else { 
        /* Trigger an AOF rewrite if needed. */ 
        if (server.aof_state == AOF_ON && /* AOF is enabled */ 
            !hasActiveChildProcess() && /* no child process is running */ 
            server.aof_rewrite_perc && /* auto-aof-rewrite-percentage is non-zero */ 
            server.aof_current_size > server.aof_rewrite_min_size) /* the file is larger than auto-aof-rewrite-min-size */ 
        { 
            long long base = server.aof_rewrite_base_size ? 
                server.aof_rewrite_base_size : 1; 
            long long growth = (server.aof_current_size*100/base) - 100; 
            if (growth >= server.aof_rewrite_perc) { 
                serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth); 
                rewriteAppendOnlyFileBackground(); /* start the rewrite */ 
            } 
        } 
    } 
} 

rewriteAppendOnlyFileBackground creates the pipes used for parent/child communication (aofCreatePipes) and then forks a child process, which calls rewriteAppendOnlyFile

/* Create the pipes used for parent - child process IPC during rewrite. 
 * We have a data pipe used to send AOF incremental diffs to the child, 
 * and two other pipes used by the children to signal it finished with 
 * the rewrite so no more data should be written, and another for the 
 * parent to acknowledge it understood this new condition. */ 
int aofCreatePipes(void) { 
    int fds[6] = {-1, -1, -1, -1, -1, -1}; 
    int j; 

    if (pipe(fds) == -1) goto error; /* parent -> children data. */ 
    if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */ 
    if (pipe(fds+4) == -1) goto error; /* parent -> children ack. Three pipes in total */ 
    /* Parent -> children data is non blocking. */ 
    if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error; 
    if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error; 
    if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error; 

    server.aof_pipe_write_data_to_child = fds[1]; 
    server.aof_pipe_read_data_from_parent = fds[0]; 
    server.aof_pipe_write_ack_to_parent = fds[3]; 
    server.aof_pipe_read_ack_from_child = fds[2]; 
    server.aof_pipe_write_ack_to_child = fds[5]; 
    server.aof_pipe_read_ack_from_parent = fds[4]; 
    server.aof_stop_sending_diff = 0; 
    return C_OK; 

error: 
    serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s", 
        strerror(errno)); 
    for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]); 
    return C_ERR; 
} 

/* Write a sequence of commands able to fully rebuild the dataset into 
 * "filename". Used both by REWRITEAOF and BGREWRITEAOF. 
 * 
 * In order to minimize the number of commands needed in the rewritten 
 * log Redis uses variadic commands when possible, such as RPUSH, SADD 
 * and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time 
 * are inserted using a single command. */ 
int rewriteAppendOnlyFile(char *filename) { 
    rio aof; 
    FILE *fp; 
    char tmpfile[256]; 
    char byte; 

    /* Note that we have to use a different temp name here compared to the 
     * one used by rewriteAppendOnlyFileBackground() function. */ 
    snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid()); 
    fp = fopen(tmpfile,"w"); 
    if (!fp) { 
        serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno)); 
        return C_ERR; 
    } 

    server.aof_child_diff = sdsempty(); /* start with an empty diff */ 
    rioInitWithFile(&aof,fp); 

    if (server.aof_rewrite_incremental_fsync) 
        rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); /* fsync every 32 MB */ 

    startSaving(RDBFLAGS_AOF_PREAMBLE); 

    if (server.aof_use_rdb_preamble) { 
        int error; 
        if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) { 
            errno = error; 
            goto werr; 
        } 
    } else { 
        if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; /* the actual rewrite function */ 
    } 

    /* Do an initial slow fsync here while the parent is still sending 
     * data, in order to make the next final fsync faster. */ 
    if (fflush(fp) == EOF) goto werr; /* flush the stdio buffer of fp */ 
    if (fsync(fileno(fp)) == -1) goto werr; /* fsync the underlying fd */ 

    /* Read again a few times to get more data from the parent. 
     * We can't read forever (the server may receive data from clients 
     * faster than it is able to send data to the child), so we try to read 
     * some more data in a loop as soon as there is a good chance more data 
     * will come. If it looks like we are wasting time, we abort (this 
     * happens after 20 ms without new data). */ 
    int nodata = 0; 
    mstime_t start = mstime(); 
    while(mstime()-start < 1000 && nodata < 20) { 
        if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0) 
        { 
            nodata++; 
            continue; 
        } 
        nodata = 0; /* Start counting from zero, we stop on N *contiguous* 
                       timeouts. */ 
        aofReadDiffFromParent(); /* receive the commands accumulated by the parent via aof_pipe_read_data_from_parent */ 
    } 

    /* Ask the master to stop sending diffs. */ 
    if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr; /* tell the server to stop sending the diff */ 
    if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK) /* make the ack pipe non-blocking */ 
        goto werr; 
    /* We read the ACK from the server using a 5 seconds timeout (the 5000 ms 
     * passed to syncRead below). Normally 
     * it should reply ASAP, but just in case we lose its reply, we are sure 
     * the child will eventually get terminated. */ 
    if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 ||  /* wait for the server's acknowledgement */ 
        byte != '!') goto werr; 
    serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF..."); 

    /* Read the final diff if any, so nothing sent before the ack is missed. */ 
    aofReadDiffFromParent(); 
    /* Three pipes are involved. A pipe is one-way, so two-way communication needs several of them: 
     * aof_pipe_write_data_to_child / aof_pipe_read_data_from_parent: carries the accumulated commands 
     * aof_pipe_write_ack_to_parent / aof_pipe_read_ack_from_child: the child tells the parent to stop sending 
     * aof_pipe_write_ack_to_child / aof_pipe_read_ack_from_parent: the parent confirms it received the stop request 
     */ 

    /* Write the received diff to the file. */ 
    serverLog(LL_NOTICE, 
        "Concatenating %.2f MB of AOF diff received from parent.", 
        (double) sdslen(server.aof_child_diff) / (1024*1024)); 
    if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0) 
        goto werr; 

    /* Make sure data will not remain on the OS's output buffers */ 
    if (fflush(fp) == EOF) goto werr; 
    if (fsync(fileno(fp)) == -1) goto werr; 
    if (fclose(fp) == EOF) goto werr; /* close the file */ 

    /* Use RENAME to make sure the DB file is changed atomically only 
     * if the generate DB file is ok. */ 
    if (rename(tmpfile,filename) == -1) { 
        serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno)); 
        unlink(tmpfile); 
        stopSaving(0); 
        return C_ERR; 
    } 
    serverLog(LL_NOTICE,"SYNC append only file rewrite performed"); 
    stopSaving(1); 
    return C_OK; 

werr: 
    serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno)); 
    fclose(fp); 
    unlink(tmpfile); 
    stopSaving(0); 
    return C_ERR; 
}