Redis Source Code Analysis (9): AOF Persistence
Besides RDB, Redis also provides AOF (Append Only File) persistence. Unlike RDB, AOF records a log of every write command, and data is restored by replaying those commands.
Something to think about: when Redis receives a write command, in what order do executing the command, replying to the client, and writing the AOF happen?
1. Enabling AOF
AOF is enabled simply by turning on appendonly in the config file; the default file name is "appendonly.aof":
appendonly yes
appendfilename "appendonly.aof"
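As an aside, AOF can also be switched on at runtime without a restart; the change only survives restarts if it is written back to the config file:
redis-cli CONFIG SET appendonly yes   # enable AOF on a running instance (Redis seeds the file with a background rewrite)
redis-cli CONFIG REWRITE              # optional: persist the change back into redis.conf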
2. AOF-related configuration
Besides the two basic settings above, a few more options affect AOF.
2.1 appendfsync
As we know, writing to a file does not go straight to disk: the data first lands in the OS buffer, and the OS flushes it to disk once its own trigger conditions are met. This is done mainly for efficiency, but it also means that when we call write(), the data has only reached the buffer and is not necessarily on disk. If the machine crashes right after the write(), that data is lost. The OS provides fsync() to force a flush to disk, and Redis exposes an option controlling how often to call it:
# appendfsync always # fsync after every write; least possible data loss
appendfsync everysec # fsync once per second; at most about one second of data can be lost
# appendfsync no # let the OS decide when to flush; fastest, but the most data can be lost
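To make the write()/fsync() distinction concrete, here is a minimal standalone C sketch (not Redis code) of what "appendfsync always" effectively adds on top of a plain append:
#include <fcntl.h>
#include <string.h>
#include <unistd.h>

/* Append one command to a log file. Without the fsync() the data may only
 * live in the OS page cache and can be lost if the machine crashes. */
int append_command(const char *path, const char *cmd) {
    int fd = open(path, O_WRONLY | O_APPEND | O_CREAT, 0644);
    if (fd == -1) return -1;
    ssize_t n = write(fd, cmd, strlen(cmd)); /* reaches the OS buffer, not necessarily the disk */
    if (n == (ssize_t)strlen(cmd) && fsync(fd) == 0) { /* force it onto the disk: the "always" policy */
        close(fd);
        return 0;
    }
    close(fd);
    return -1;
}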
2.2 no-appendfsync-on-rewrite
While an AOF rewrite is running, new commands are still appended to the old AOF file, so there are two streams of disk I/O at the same time: the rewrite's I/O and the main process's AOF appends. The two compete for the disk, and the AOF append (and its fsync) runs in the main process, which also has to serve clients; if that fsync blocks on a busy disk, the main process blocks and normal service suffers. Setting this option to yes makes the main process skip fsync() while a saving child is running.
no-appendfsync-on-rewrite no
2.3 aof-load-truncated
As discussed above, file writes go through a buffer before being flushed to disk, so the last entry in the AOF may be incomplete, for example if the machine crashes when only half of it has reached disk. Should Redis raise an error when it hits that truncated last entry while loading the AOF at startup? That is what aof-load-truncated decides: with yes the truncated tail is dropped with a warning and the server starts, with no the server refuses to start.
aof-load-truncated yes
2.4 aof-use-rdb-preamble
Literally "put an RDB preamble in front of the AOF": this enables the mixed persistence format. During an AOF rewrite, Redis first dumps a compact RDB snapshot and then appends the commands received while the rewrite was running after it.
# When rewriting the AOF file, Redis is able to use an RDB preamble in the
# AOF file for faster rewrites and recoveries. When this option is turned
# on the rewritten AOF file is composed of two different stanzas:
#
# [RDB file][AOF tail]
#
# When loading Redis recognizes that the AOF file starts with the "REDIS"
# string and loads the prefixed RDB file, and continues loading the AOF
# tail.
aof-use-rdb-preamble yes
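Based on the "REDIS" magic string mentioned in the comment above, a loader can tell the two formats apart by peeking at the first bytes of the file. A minimal sketch of that check (the real logic lives in Redis' AOF loading code):
#include <stdio.h>
#include <string.h>

/* Returns 1 if the AOF starts with an RDB preamble, 0 if it is a plain AOF
 * (which starts with '*', the RESP array marker), -1 on error. */
int aof_has_rdb_preamble(const char *path) {
    char sig[5];
    FILE *fp = fopen(path, "r");
    if (!fp) return -1;
    size_t n = fread(sig, 1, sizeof(sig), fp);
    fclose(fp);
    if (n != sizeof(sig)) return -1;
    return memcmp(sig, "REDIS", sizeof(sig)) == 0;
}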
2.5 aof-rewrite-incremental-fsync
When this is enabled, the child performing the AOF rewrite fsyncs the file after every 32 MB of data it writes, instead of flushing a huge amount in one go and stalling the disk.
# When a child rewrites the AOF file, if the following option is enabled
# the file will be fsync-ed every 32 MB of data generated. This is useful
# in order to commit the file to the disk more incrementally and avoid
# big latency spikes.
aof-rewrite-incremental-fsync yes
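The mechanism behind this option is an auto-sync threshold on the rewrite's rio stream (rioSetAutoSync(), which appears later in rewriteAppendOnlyFile()). The idea boils down to something like the following sketch; the names are illustrative, not the real implementation:
#include <stddef.h>
#include <unistd.h>

#define REWRITE_AUTOSYNC_BYTES (32 * 1024 * 1024) /* 32 MB */

static size_t dirty_since_fsync = 0;

/* Illustrative only: fsync after every 32 MB written instead of once at the
 * end, spreading the flush cost over the whole rewrite. */
void rewrite_write(int fd, const void *buf, size_t len) {
    if (write(fd, buf, len) != (ssize_t)len) return; /* error handling elided */
    dirty_since_fsync += len;
    if (dirty_since_fsync >= REWRITE_AUTOSYNC_BYTES) {
        fsync(fd);
        dirty_since_fsync = 0;
    }
}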
3. AOF
3.1 How AOF works
The idea behind AOF is simple: append every write command to a file.
The AOF stores the commands one after another, so restoring the data is just a matter of replaying them. In the ideal case (appendfsync always) AOF loses at most one write on a crash, but that comes at a significant performance cost.
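For reference, the commands are stored in the file as plain Redis protocol (RESP) text. A SET foo bar issued against db 0 ends up in the AOF roughly like this (the SELECT is only emitted when the target db changes, and every line is terminated by \r\n in the real file):
*2
$6
SELECT
$1
0
*3
$3
SET
$3
foo
$3
bar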
AOF persistence does not write each command straight to disk: the commands are first accumulated in the global aof_buf, and are then written with write() and flushed with fsync() according to the configured policy.
3.2 The AOF write path
a. After executing a command, the server propagates every command that changed the keyspace into the AOF append path
b. The command is appended to aof_buf, a global sds; if a child process is currently rewriting the AOF, it is also appended to the rewrite buffer
c. beforeSleep() in the main event loop calls flushAppendOnlyFile()
d. The data in aof_buf is written to the AOF file with write()
e. Depending on the flush policy (always, everysec, no), the data is fsync()ed to disk
4. AOF rewrite
AOF stores commands as an append-only log, so as writes and updates pile up the file keeps growing. Even with a single key, running SET on it 1000 times produces 1000 entries, and the file bloats accordingly.
4.1 When a rewrite is triggered
An AOF rewrite can be triggered manually with the BGREWRITEAOF command, or automatically based on file size:
auto-aof-rewrite-percentage 100 # rewrite when the file has grown by 100% over its size after the last rewrite
auto-aof-rewrite-min-size 64mb # the file must be at least this large before an automatic rewrite is considered
There are two automatic conditions, and in practice they combine like this: if the server has never rewritten the AOF (so there is no meaningful base size yet), a rewrite fires as soon as the file exceeds 64 MB; after that, a rewrite fires when the file is again above 64 MB and has grown to twice its size after the previous rewrite. For example, if the last rewrite left a 40 MB file, the next automatic rewrite happens once the AOF passes 80 MB.
4.2 How the rewrite works
The word "rewrite" is a little misleading: it may sound as if the old AOF is parsed and compacted into a new one, but that is not what Redis does at all. The AOF rewrite has nothing to do with the contents of the old AOF file.
To rewrite the AOF, Redis simply forks a child process and, for every key in the keyspace, generates one write command from the key's current value. During the rewrite there may therefore be two AOF files on disk, the old one and the one being rewritten; when the rewrite finishes, the new file simply replaces the old one.
Note that during the rewrite, for data structures with many elements (list, set, zset, hash and so on), the write is split into several commands, each carrying at most AOF_REWRITE_ITEMS_PER_CMD (default 64) items. This is mainly to avoid overflowing the client input buffer when the data is loaded back.
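For example, rewriting a list that holds 130 elements does not produce one huge command; with the default limit of 64 items per command it comes out as three RPUSH commands (the key and element names here are made up, and on disk they are RESP-encoded like the earlier example):
RPUSH mylist v1 v2 ... v64      # first 64 elements
RPUSH mylist v65 v66 ... v128   # next 64 elements
RPUSH mylist v129 v130          # remaining 2 elements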
4.3 The rewrite procedure
As mentioned above, the rewrite is carried out by a child process.
a. An AOF rewrite is triggered
b. The server (main process) forks a child, provided no child is already rewriting
c. The parent keeps recording subsequent write commands into a buffer (they are still appended to the old AOF as well)
d. The child creates a temp-rewriteaof-bg-<child pid>.aof file to hold the rewritten AOF
e. The child generates one write command for every key
f. The child sends a "!" message to tell the parent that the keyspace dump is finished
g. The child receives the write commands the parent buffered and appends them to the rewritten AOF
h. fsync to disk, then rename
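The parent-side half of the handshake in steps f and g is a small readable-event handler installed on the ack pipe. A lightly simplified version of aofChildPipeReadable() from aof.c:
/* Called by the event loop when the child writes "!" on the ack pipe. */
void aofChildPipeReadable(aeEventLoop *el, int fd, void *privdata, int mask) {
    char byte;
    if (read(fd,&byte,1) == 1 && byte == '!') {
        serverLog(LL_NOTICE,"AOF rewrite child asks to stop sending diffs.");
        server.aof_stop_sending_diff = 1; /* stop feeding the data pipe */
        if (write(server.aof_pipe_write_ack_to_child,"!",1) != 1) {
            /* If the ack cannot be delivered the child will time out on its own. */
            serverLog(LL_WARNING,"Can't send ACK to AOF child: %s", strerror(errno));
        }
    }
    /* This can only happen once per rewrite, so remove the handler. */
    aeDeleteFileEvent(server.el,server.aof_pipe_read_ack_from_child,AE_READABLE);
}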
5. Source code walkthrough
5.1 Global definitions
All the AOF-related state lives in the redisServer struct:
struct redisServer {
/* AOF persistence */
int aof_enabled; /* AOF configuration */
int aof_state; /* AOF_(ON|OFF|WAIT_REWRITE) */
int aof_fsync; /* Kind of fsync() policy */
char *aof_filename; /* Name of the AOF file */
int aof_no_fsync_on_rewrite; /* Don't fsync if a rewrite is in prog. */
int aof_rewrite_perc; /* Rewrite AOF if % growth is > M and... */
off_t aof_rewrite_min_size; /* the AOF file is at least N bytes. */
off_t aof_rewrite_base_size; /* AOF size on latest startup or rewrite. */
off_t aof_current_size; /* AOF current size. */
off_t aof_fsync_offset; /* AOF offset which is already synced to disk. */
int aof_flush_sleep; /* Micros to sleep before flush. (used by tests) */
int aof_rewrite_scheduled; /* Rewrite once BGSAVE terminates. */
pid_t aof_child_pid; /* PID if rewriting process */
list *aof_rewrite_buf_blocks; /* Hold changes during an AOF rewrite; the parent buffers the diff here. */
sds aof_buf; /* AOF buffer, written before entering the event loop; every command is appended here first. */
int aof_fd; /* File descriptor of currently selected AOF file */
int aof_selected_db; /* Currently selected DB in AOF; when it changes, a SELECT command is emitted into the AOF. */
time_t aof_flush_postponed_start; /* UNIX time of postponed AOF flush */
time_t aof_last_fsync; /* UNIX time of last fsync() */
time_t aof_rewrite_time_last; /* Time used by last AOF rewrite run. */
time_t aof_rewrite_time_start; /* Current AOF rewrite start time. */
int aof_lastbgrewrite_status; /* C_OK or C_ERR */
unsigned long aof_delayed_fsync; /* delayed AOF fsync() counter */
int aof_rewrite_incremental_fsync;/* fsync incrementally while aof rewriting? */
int rdb_save_incremental_fsync; /* fsync incrementally while rdb saving? */
int aof_last_write_status; /* C_OK or C_ERR */
int aof_last_write_errno; /* Valid if aof_last_write_status is ERR */
int aof_load_truncated; /* Don't stop on unexpected AOF EOF. */
int aof_use_rdb_preamble; /* Use RDB preamble on AOF rewrites. */
/* AOF pipes used to communicate between parent and child during rewrite. */
int aof_pipe_write_data_to_child;
int aof_pipe_read_data_from_parent;
int aof_pipe_write_ack_to_parent; /* these 6 pipe fds are used for communication between parent and child during a rewrite */
int aof_pipe_read_ack_from_child;
int aof_pipe_write_ack_to_child;
int aof_pipe_read_ack_from_parent;
int aof_stop_sending_diff; /* If true stop sending accumulated diffs
to child process. */
sds aof_child_diff; /* AOF diff accumulator child side. */
};
5.2 Writing the AOF
Every command goes through processCommand(), which calls call() to do the actual execution:
int processCommand(client *c) {
/***************** various other checks elided ************************/
/* Exec the command */
if (c->flags & CLIENT_MULTI &&
c->cmd->proc != execCommand && c->cmd->proc != discardCommand &&
c->cmd->proc != multiCommand && c->cmd->proc != watchCommand)
{
queueMultiCommand(c);
addReply(c,shared.queued);
} else {
call(c,CMD_CALL_FULL); /* enter the real execution path */
c->woff = server.master_repl_offset;
if (listLength(server.ready_keys))
handleClientsBlockedOnKeys();
}
return C_OK;
}
The main flow of call() is as follows:
void call(client *c, int flags) {
/* Call the command. */
c->cmd->proc(c); /* invoke the command's implementation; in the full source a local 'dirty' records server.dirty before and after this call */
/* Propagate the command into the AOF and replication link */
if (flags & CMD_CALL_PROPAGATE &&
(c->flags & CLIENT_PREVENT_PROP) != CLIENT_PREVENT_PROP)
{
int propagate_flags = PROPAGATE_NONE;
/* Check if the command operated changes in the data set. If so
* set for replication / AOF propagation. */
if (dirty) propagate_flags |= (PROPAGATE_AOF|PROPAGATE_REPL); /* this command changed the dataset */
/* If the client forced AOF / replication of the command, set
* the flags regardless of the command effects on the data set. */
if (c->flags & CLIENT_FORCE_REPL) propagate_flags |= PROPAGATE_REPL;
if (c->flags & CLIENT_FORCE_AOF) propagate_flags |= PROPAGATE_AOF;
/* However prevent AOF / replication propagation if the command
* implementations called preventCommandPropagation() or similar,
* or if we don't have the call() flags to do so. */
if (c->flags & CLIENT_PREVENT_REPL_PROP ||
!(flags & CMD_CALL_PROPAGATE_REPL))
propagate_flags &= ~PROPAGATE_REPL;
if (c->flags & CLIENT_PREVENT_AOF_PROP ||
!(flags & CMD_CALL_PROPAGATE_AOF))
propagate_flags &= ~PROPAGATE_AOF;
/* Call propagate() only if at least one of AOF / replication
* propagation is needed. Note that modules commands handle replication
* in an explicit way, so we never replicate them automatically. */
if (propagate_flags != PROPAGATE_NONE && !(c->cmd->flags & CMD_MODULE))
propagate(c->cmd,c->db->id,c->argv,c->argc,propagate_flags); /* if AOF or replication is enabled, propagate the command */
}
}
propagate() calls into the AOF layer to record the command in the aof_buf buffer:
/* Propagate the specified command (in the context of the specified database id)
* to AOF and Slaves.
*
* flags are an xor between:
* + PROPAGATE_NONE (no propagation of command at all)
* + PROPAGATE_AOF (propagate into the AOF file if is enabled)
* + PROPAGATE_REPL (propagate into the replication link)
*
* This should not be used inside commands implementation since it will not
* wrap the resulting commands in MULTI/EXEC. Use instead alsoPropagate(),
* preventCommandPropagation(), forceCommandPropagation().
*
* However for functions that need to (also) propagate out of the context of a
* command execution, for example when serving a blocked client, you
* want to use propagate().
*/
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) /* aof */
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL) /* replication to slaves */
replicationFeedSlaves(server.slaves,dbid,argv,argc);
}
feedAppendOnlyFile() appends the command to aof_buf:
void feedAppendOnlyFile(struct redisCommand *cmd, int dictid, robj **argv, int argc) {
/* (elided here) the command is first serialized into a local sds 'buf', preceded by a SELECT if the target db differs from aof_selected_db */
/* Append to the AOF buffer. This will be flushed on disk just before
* of re-entering the event loop, so before the client will get a
* positive reply about the operation performed. */
if (server.aof_state == AOF_ON)
server.aof_buf = sdscatlen(server.aof_buf,buf,sdslen(buf)); /* append to aof_buf */
/* If a background append only file rewriting is in progress we want to
* accumulate the differences between the child DB and the current one
* in a buffer, so that when the child process will do its work we
* can append the differences to the new append only file. */
if (server.aof_child_pid != -1)
aofRewriteBufferAppend((unsigned char*)buf,sdslen(buf));
/* If an AOF rewrite is in progress, the command must also be recorded in the rewrite buffer; that function additionally registers the write handler the parent uses to push the diff to the child through the aof_pipe_write_data_to_child pipe */
}
In the server's main loop, the beforesleep callback is invoked before every wait for events:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
beforeSleep() then calls flushAppendOnlyFile():
/* This function gets called every time Redis is entering the
* main loop of the event driven library, that is, before to sleep
* for ready file descriptors. */
void beforeSleep(struct aeEventLoop *eventLoop) {
/* Write the AOF buffer on disk */
flushAppendOnlyFile(0);
}
flushAppendOnlyFile() writes the data in aof_buf into the OS buffer and, depending on the configured policy, picks the right moment to fsync it to disk. The function is worth reading in detail, as it handles all the I/O subtleties:
/* Write the append only file buffer on disk.
*
* Since we are required to write the AOF before replying to the client,
* and the only way the client socket can get a write is entering when the
* the event loop, we accumulate all the AOF writes in a memory
* buffer and write it on disk using this function just before entering
* the event loop again.
*
* About the 'force' argument:
*
* When the fsync policy is set to 'everysec' we may delay the flush if there
* is still an fsync() going on in the background thread, since for instance
* on Linux write(2) will be blocked by the background fsync anyway.
* When this happens we remember that there is some aof buffer to be
* flushed ASAP, and will try to do that in the serverCron() function.
*
* However if force is set to 1 we'll write regardless of the background
* fsync. */
#define AOF_WRITE_LOG_ERROR_RATE 30 /* Seconds between errors logging. */
void flushAppendOnlyFile(int force) {
ssize_t nwritten;
int sync_in_progress = 0;
mstime_t latency;
if (sdslen(server.aof_buf) == 0) {
/* Check if we need to do fsync even the aof buffer is empty,
* because previously in AOF_FSYNC_EVERYSEC mode, fsync is
* called only when aof buffer is not empty, so if users
* stop write commands before fsync called in one second,
* the data in page cache cannot be flushed in time.
* Even when aof_buf is empty we may still need to fsync.
*/
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && /* everysec: flush once per second */
server.aof_fsync_offset != server.aof_current_size && /* there is written data not yet fsynced */
server.unixtime > server.aof_last_fsync && /* more than one second since the last fsync */
!(sync_in_progress = aofFsyncInProgress())) { /* aofFsyncInProgress() takes a mutex internally */
goto try_fsync;
} else {
return;
}
}
if (server.aof_fsync == AOF_FSYNC_EVERYSEC)
sync_in_progress = aofFsyncInProgress();
if (server.aof_fsync == AOF_FSYNC_EVERYSEC && !force) {
/* With this append fsync policy we do background fsyncing.
* If the fsync is still in progress we can try to delay
* the write for a couple of seconds. */
if (sync_in_progress) {
if (server.aof_flush_postponed_start == 0) {
/* No previous write postponing, remember that we are
* postponing the flush and return. */
server.aof_flush_postponed_start = server.unixtime;
return;
} else if (server.unixtime - server.aof_flush_postponed_start < 2) {
/* We were already waiting for fsync to finish, but for less
* than two seconds this is still ok. Postpone again. */
return;
}
/* Otherwise fall trough, and go write since we can't wait
* over two seconds. */
server.aof_delayed_fsync++;
serverLog(LL_NOTICE,"Asynchronous AOF fsync is taking too long (disk is busy?). Writing the AOF buffer without waiting for fsync to complete, this may slow down Redis.");
}
}
/* We want to perform a single write. This should be guaranteed atomic
* at least if the filesystem we are writing is a real physical one.
* While this will save us against the server being killed I don't think
* there is much to do about the whole server stopping for power problems
* or alike */
if (server.aof_flush_sleep && sdslen(server.aof_buf)) {
usleep(server.aof_flush_sleep);
}
latencyStartMonitor(latency);
nwritten = aofWrite(server.aof_fd,server.aof_buf,sdslen(server.aof_buf)); /* internally calls write(2) */
latencyEndMonitor(latency);
/* We want to capture different events for delayed writes:
* when the delay happens with a pending fsync, or with a saving child
* active, and when the above two conditions are missing.
* We also use an additional event name to save all samples which is
* useful for graphing / monitoring purposes. */
if (sync_in_progress) {
latencyAddSampleIfNeeded("aof-write-pending-fsync",latency);
} else if (hasActiveChildProcess()) {
latencyAddSampleIfNeeded("aof-write-active-child",latency);
} else {
latencyAddSampleIfNeeded("aof-write-alone",latency);
}
latencyAddSampleIfNeeded("aof-write",latency);
/* We performed the write so reset the postponed flush sentinel to zero. */
server.aof_flush_postponed_start = 0;
if (nwritten != (ssize_t)sdslen(server.aof_buf)) {
static time_t last_write_error_log = 0;
int can_log = 0;
/* Limit logging rate to 1 line per AOF_WRITE_LOG_ERROR_RATE seconds. */
if ((server.unixtime - last_write_error_log) > AOF_WRITE_LOG_ERROR_RATE) {
can_log = 1;
last_write_error_log = server.unixtime;
}
/* Log the AOF write error and record the error code. */
if (nwritten == -1) {
if (can_log) {
serverLog(LL_WARNING,"Error writing to the AOF file: %s",
strerror(errno));
server.aof_last_write_errno = errno;
}
} else {
if (can_log) {
serverLog(LL_WARNING,"Short write while writing to "
"the AOF file: (nwritten=%lld, "
"expected=%lld)",
(long long)nwritten,
(long long)sdslen(server.aof_buf));
}
if (ftruncate(server.aof_fd, server.aof_current_size) == -1) {
if (can_log) {
serverLog(LL_WARNING, "Could not remove short write "
"from the append-only file. Redis may refuse "
"to load the AOF the next time it starts. "
"ftruncate: %s", strerror(errno));
}
} else {
/* If the ftruncate() succeeded we can set nwritten to
* -1 since there is no longer partial data into the AOF. */
nwritten = -1;
}
server.aof_last_write_errno = ENOSPC;
}
/* Handle the AOF write error. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* We can't recover when the fsync policy is ALWAYS since the
* reply for the client is already in the output buffers, and we
* have the contract with the user that on acknowledged write data
* is synced on disk.
* With the 'always' policy the reply to the client is already in the output buffer,
* so the durability promise cannot be kept after a failed write; the only safe option is to exit.
*/
serverLog(LL_WARNING,"Can't recover from AOF write error when the AOF fsync policy is 'always'. Exiting...");
exit(1);
} else {
/* Recover from failed write leaving data into the buffer. However
* set an error to stop accepting writes as long as the error
* condition is not cleared. */
server.aof_last_write_status = C_ERR;
/* Trim the sds buffer if there was a partial write, and there
* was no way to undo it with ftruncate(2). */
if (nwritten > 0) {
server.aof_current_size += nwritten;
sdsrange(server.aof_buf,nwritten,-1);
}
return; /* We'll try again on the next call... */
}
} else {
/* Successful write(2). If AOF was in error state, restore the
* OK state and log the event. */
if (server.aof_last_write_status == C_ERR) {
serverLog(LL_WARNING,
"AOF write error looks solved, Redis can write again.");
server.aof_last_write_status = C_OK;
}
}
server.aof_current_size += nwritten;
/* Re-use AOF buffer when it is small enough. The maximum comes from the
* arena size of 4k minus some overhead (but is otherwise arbitrary). */
if ((sdslen(server.aof_buf)+sdsavail(server.aof_buf)) < 4000) {
sdsclear(server.aof_buf);
} else {
sdsfree(server.aof_buf);
server.aof_buf = sdsempty();
}
try_fsync:
/* Don't fsync if no-appendfsync-on-rewrite is set to yes and there are
* children doing I/O in the background. */
if (server.aof_no_fsync_on_rewrite && hasActiveChildProcess())
return;
/* Perform the fsync if needed. */
if (server.aof_fsync == AOF_FSYNC_ALWAYS) {
/* redis_fsync is defined as fdatasync() for Linux in order to avoid
* flushing metadata. */
latencyStartMonitor(latency);
redis_fsync(server.aof_fd); /* Let's try to get this data on the disk; with 'always' the fsync runs synchronously in the main thread */
latencyEndMonitor(latency);
latencyAddSampleIfNeeded("aof-fsync-always",latency);
server.aof_fsync_offset = server.aof_current_size;
server.aof_last_fsync = server.unixtime;
} else if ((server.aof_fsync == AOF_FSYNC_EVERYSEC &&
server.unixtime > server.aof_last_fsync)) {
if (!sync_in_progress) {
aof_background_fsync(server.aof_fd); /* with 'everysec' the fsync is done in a separate (bio) thread */
server.aof_fsync_offset = server.aof_current_size;
}
server.aof_last_fsync = server.unixtime;
}
}
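The aof_background_fsync() used in the everysec branch does not fsync inline; it just queues a job for the bio (background I/O) thread. In the version this series follows it is essentially a one-liner:
/* Starts a background task that performs fsync() against the specified
 * file descriptor (the one of the AOF file) in another thread. */
void aof_background_fsync(int fd) {
    bioCreateBackgroundJob(BIO_AOF_FSYNC,(void*)(long)fd,NULL,NULL);
}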
5.3 AOF rewrite
serverCron() checks whether the rewrite conditions are met, provided no child process is already rewriting. It runs at the frequency configured by the hz option:
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Check if a background saving or AOF rewrite in progress terminated. */
if (hasActiveChildProcess() || ldbPendingChildren())
{
checkChildrenDone();
} else {
/* Trigger an AOF rewrite if needed. */
if (server.aof_state == AOF_ON && /* AOF is enabled */
!hasActiveChildProcess() && /* no child process is currently running */
server.aof_rewrite_perc && /* auto-aof-rewrite-percentage is non-zero */
server.aof_current_size > server.aof_rewrite_min_size) /* the file exceeds auto-aof-rewrite-min-size */
{
long long base = server.aof_rewrite_base_size ?
server.aof_rewrite_base_size : 1;
long long growth = (server.aof_current_size*100/base) - 100;
if (growth >= server.aof_rewrite_perc) {
serverLog(LL_NOTICE,"Starting automatic rewriting of AOF on %lld%% growth",growth);
rewriteAppendOnlyFileBackground(); /* kick off the rewrite */
}
}
}
}
rewriteAppendOnlyFileBackground() first creates the pipes needed for parent/child communication and then forks a child; inside the child, rewriteAppendOnlyFile() does the actual work:
/* Create the pipes used for parent - child process IPC during rewrite.
* We have a data pipe used to send AOF incremental diffs to the child,
* and two other pipes used by the children to signal it finished with
* the rewrite so no more data should be written, and another for the
* parent to acknowledge it understood this new condition. */
int aofCreatePipes(void) {
int fds[6] = {-1, -1, -1, -1, -1, -1};
int j;
if (pipe(fds) == -1) goto error; /* parent -> children data. */
if (pipe(fds+2) == -1) goto error; /* children -> parent ack. */
if (pipe(fds+4) == -1) goto error; /* parent -> children ack. (3 pipe pairs in total) */
/* Parent -> children data is non blocking. */
if (anetNonBlock(NULL,fds[0]) != ANET_OK) goto error;
if (anetNonBlock(NULL,fds[1]) != ANET_OK) goto error;
if (aeCreateFileEvent(server.el, fds[2], AE_READABLE, aofChildPipeReadable, NULL) == AE_ERR) goto error;
server.aof_pipe_write_data_to_child = fds[1];
server.aof_pipe_read_data_from_parent = fds[0];
server.aof_pipe_write_ack_to_parent = fds[3];
server.aof_pipe_read_ack_from_child = fds[2];
server.aof_pipe_write_ack_to_child = fds[5];
server.aof_pipe_read_ack_from_parent = fds[4];
server.aof_stop_sending_diff = 0;
return C_OK;
error:
serverLog(LL_WARNING,"Error opening /setting AOF rewrite IPC pipes: %s",
strerror(errno));
for (j = 0; j < 6; j++) if(fds[j] != -1) close(fds[j]);
return C_ERR;
}
/* Write a sequence of commands able to fully rebuild the dataset into
* "filename". Used both by REWRITEAOF and BGREWRITEAOF.
*
* In order to minimize the number of commands needed in the rewritten
* log Redis uses variadic commands when possible, such as RPUSH, SADD
* and ZADD. However at max AOF_REWRITE_ITEMS_PER_CMD items per time
* are inserted using a single command. */
int rewriteAppendOnlyFile(char *filename) {
rio aof;
FILE *fp;
char tmpfile[256];
char byte;
/* Note that we have to use a different temp name here compared to the
* one used by rewriteAppendOnlyFileBackground() function. */
snprintf(tmpfile,256,"temp-rewriteaof-%d.aof", (int) getpid());
fp = fopen(tmpfile,"w");
if (!fp) {
serverLog(LL_WARNING, "Opening the temp file for AOF rewrite in rewriteAppendOnlyFile(): %s", strerror(errno));
return C_ERR;
}
server.aof_child_diff = sdsempty(); /* reset the accumulated diff */
rioInitWithFile(&aof,fp);
if (server.aof_rewrite_incremental_fsync)
rioSetAutoSync(&aof,REDIS_AUTOSYNC_BYTES); /* fsync every 32 MB */
startSaving(RDBFLAGS_AOF_PREAMBLE);
if (server.aof_use_rdb_preamble) {
int error;
if (rdbSaveRio(&aof,&error,RDBFLAGS_AOF_PREAMBLE,NULL) == C_ERR) {
errno = error;
goto werr;
}
} else {
if (rewriteAppendOnlyFileRio(&aof) == C_ERR) goto werr; /* the function that actually dumps the keyspace */
}
/* Do an initial slow fsync here while the parent is still sending
* data, in order to make the next final fsync faster. */
if (fflush(fp) == EOF) goto werr; /* flush the stdio buffer */
if (fsync(fileno(fp)) == -1) goto werr; /* fsync the underlying fd */
/* Read again a few times to get more data from the parent.
* We can't read forever (the server may receive data from clients
* faster than it is able to send data to the child), so we try to read
* some more data in a loop as soon as there is a good chance more data
* will come. If it looks like we are wasting time, we abort (this
* happens after 20 ms without new data). */
int nodata = 0;
mstime_t start = mstime();
while(mstime()-start < 1000 && nodata < 20) {
if (aeWait(server.aof_pipe_read_data_from_parent, AE_READABLE, 1) <= 0)
{
nodata++;
continue;
}
nodata = 0; /* Start counting from zero, we stop on N *contiguous*
timeouts. */
aofReadDiffFromParent(); /* receive the commands the parent has accumulated, via aof_pipe_read_data_from_parent */
}
/* Ask the master to stop sending diffs. */
if (write(server.aof_pipe_write_ack_to_parent,"!",1) != 1) goto werr; /* tell the parent to stop sending diffs */
if (anetNonBlock(NULL,server.aof_pipe_read_ack_from_parent) != ANET_OK) /* make the ack pipe non-blocking before waiting for the parent's reply */
goto werr;
/* We read the ACK from the server using a 10 seconds timeout. Normally
* it should reply ASAP, but just in case we lose its reply, we are sure
* the child will eventually get terminated. */
if (syncRead(server.aof_pipe_read_ack_from_parent,&byte,1,5000) != 1 || /* wait for the parent's acknowledgement */
byte != '!') goto werr;
serverLog(LL_NOTICE,"Parent agreed to stop sending diffs. Finalizing AOF...");
/* Read the final diff if any; one last read so nothing is left behind. */
aofReadDiffFromParent();
/* Three pipe pairs are involved here; pipes are one-way, so bidirectional communication needs several of them:
* aof_pipe_write_data_to_child / aof_pipe_read_data_from_parent: the parent pushes accumulated commands to the child
* aof_pipe_write_ack_to_parent / aof_pipe_read_ack_from_child: the child asks the parent to stop sending
* aof_pipe_write_ack_to_child / aof_pipe_read_ack_from_parent: the parent acknowledges the child's stop request
*/
/* Write the received diff to the file. */
serverLog(LL_NOTICE,
"Concatenating %.2f MB of AOF diff received from parent.",
(double) sdslen(server.aof_child_diff) / (1024*1024));
if (rioWrite(&aof,server.aof_child_diff,sdslen(server.aof_child_diff)) == 0)
goto werr;
/* Make sure data will not remain on the OS's output buffers */
if (fflush(fp) == EOF) goto werr;
if (fsync(fileno(fp)) == -1) goto werr;
if (fclose(fp) == EOF) goto werr; /* close the file */
/* Use RENAME to make sure the DB file is changed atomically only
* if the generate DB file is ok. */
if (rename(tmpfile,filename) == -1) {
serverLog(LL_WARNING,"Error moving temp append only file on the final destination: %s", strerror(errno));
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}
serverLog(LL_NOTICE,"SYNC append only file rewrite performed");
stopSaving(1);
return C_OK;
werr:
serverLog(LL_WARNING,"Write error writing append only file on disk: %s", strerror(errno));
fclose(fp);
unlink(tmpfile);
stopSaving(0);
return C_ERR;
}