redis源码分析——10、主从同步
为什么要主从复制?一方面是为了读写分离,而且由于持久化对性能有损耗,可以在备机上持久化。除此也是为了数据的容灾,redis的内存数据库,一旦服务或机器宕机,数据就丢失了,可以通过增加副本来让数据多几个拷贝,提高安全性。
思考:
- 我们知道redis有一些命令的操作是不确定的,比如
spop
是随机删除一个元素,对于这种命令,主从同步的时候是怎么传递的? - 如果备机的机器时间比主机的快,在备机上能读到自己认为已经过期的数据吗?
一、主从复制的原理
主从复制的过程可以分为两大步——全量同步和命令传播
全量同步指的是将备机的数据更新到和主机数据一致的状态
命令同步指的是对于可能引起数据变化的key,主机会将操作命令同步到备机上
1、旧版复制
旧版(2.8以前)的复制严格按照上面两个过来的
a. 备机向主机发送一个sync
命令
b. 主机收到sync
后开始执行bgsave
命令生成一个rdb文件,同时用一个缓冲区记录该时间段的写命令
c. 主服务将生成的rdb文件发送到备机上,备机加载rdb文件,将数据更新到主机生成rdb文件时刻状态
d. 主机将缓冲区的写命令发送到备机,主备之间数据完全一致
2、旧版复制的缺陷
网络是不稳定的,主备之间的网络可能会由于网络的波动二断开,而一点断开重连,那又要执行一次完整的同步,而生成rdb是和耗资源的,但是网络波动在实际情况下又很常见
3、新版复制
针对旧版本复制的缺陷,redis在2.8以后引入了部分同步的概念,用psync
代替了sync
来执行同步操作
psync
具有完整重同步和部分重同步两种模式
完整重同步和旧版的同步类似,先发送rdb,在发送命令缓冲区
部分重同步允许主备之间网络重连,主机维护一个命令缓冲区的循环队列,当备机断开重连时主机先判断备机当前的同步时间点,如果最后一次同步命令还在循环队列里,则主机将该时间点后面的key全部发送给备机即可,这样就省去了rdb生成和传输。当然,前提条件是复制偏移量在缓冲区
每个redis实例都有一个字节的RUN_ID,备机在向主机发起同步请求时会带上主机的RUN_ID,主机检查请求的RUN_ID和自己的RUN_ID是否一致,若相不相等,则说明备机更换的主结点,那就会触发全量同步。而现实情况是进程重启很常见,重启后RUN_ID肯定会变化
4、新版复制改进
由上可见,新版复制部分重同步的条件比较严格的,现网环境很难满足
针对新版复制的严格条件,redis在4.0引入了psync2协议,对psync进行了优化
持久化主从复制信息,主机主从复制信息持久化到rdb中(RUN_ID和复制偏移量),当主机重启加载rdb时回复主从复制信息,主从辅助的信息以辅助(aux-field)字段的格式持久化到rdb文件。(这样的假设前提是主机开启了rdb持久化,且主节点宕机后重新拉起还是主节点,这在现网有点。。。)
存储上一个主服务器的复制信息,当主机因故障而发生主备切换时——备机成为了主,原备机会将原主机的复制信息保存到replid2
相关的字段中,如果有其他主机发起部分同步请求,则新主机会根据replid2
相关字段决定是否部分同步
5、sync、psync、psync2对比
总的来说,sync
是最简单也是效率最低的同步,只要网络异常,备机重连上主机后就会执行全量同步。
psync
解决了sync
的痛点,在主机上开辟了一块backlog环形队列还存放同步命令,当备机重连时可以直接从缓冲区中取命令
psync
的的触发有较为严格的条件限制,每个redis实例在运行的时候都会有一个run_id来表示身份,如果备机检测到主机的run_id发生改变,那么备机也会发起全量同步
psync2
的目的在于解决主备切换的情况,比如有A->B、A->C同步链路,假如A、B发生了主备切换,C的主变成了B,那么按照psync
算法应该需要全量同步,但事实上并不需要,所以psync2
中记录的该结点的上一个主结点的run_id,也就是说B和C有着共同的历史主结点,所以B和C可以部分同步
二、相关配置
1、replica-serve-stale-data
# When a replica loses its connection with the master, or when the replication
# is still in progress, the replica can act in two different ways:
#
# 1) if replica-serve-stale-data is set to 'yes' (the default) the replica will
# still reply to client requests, possibly with out of date data, or the
# data set may just be empty if this is the first synchronization.
#
# 2) if replica-serve-stale-data is set to 'no' the replica will reply with
# an error "SYNC with master in progress" to all the kind of commands
# but to INFO, replicaOF, AUTH, PING, SHUTDOWN, REPLCONF, ROLE, CONFIG,
# SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBLISH, PUBSUB,
# COMMAND, POST, HOST: and LATENCY.
replica-serve-stale-data yes
当备机与主机之间的连接断开,或者同步还在进行的时候,备机可以有两种情况:
- 如果开启了该选项(默认开启),则备机将会相应客户端请求
- 如果关闭该选项,备机会给客户端回复一个
SYNC with master in progress
错误,除了一些特殊命令
2、replica-read-only
# You can configure a replica instance to accept writes or not. Writing against
# a replica instance may be useful to store some ephemeral data (because data
# written on a replica will be easily deleted after resync with the master) but
# may also cause problems if clients are writing to it because of a
# misconfiguration.
#
# Since Redis 2.6 by default replicas are read-only.
#
# Note: read only replicas are not designed to be exposed to untrusted clients
# on the internet. It's just a protection layer against misuse of the instance.
# Still a read only replica exports by default all the administrative commands
# such as CONFIG, DEBUG, and so forth. To a limited extent you can improve
# security of read only replicas using 'rename-command' to shadow all the
# administrative / dangerous commands.
replica-read-only yes
可以让备机接收写请求,对于一些临时数据,写备机还是很方便的
只读被并不能防止client的恶意请求,如果想进一步提高安全性,可以用rename-command
来隐藏一些高危命令
3、repl-diskless-sync
# Replication SYNC strategy: disk or socket.
#
# New replicas and reconnecting replicas that are not able to continue the
# replication process just receiving differences, need to do what is called a
# "full synchronization". An RDB file is transmitted from the master to the
# replicas.
#
# The transmission can happen in two different ways:
#
# 1) Disk-backed: The Redis master creates a new process that writes the RDB
# file on disk. Later the file is transferred by the parent
# process to the replicas incrementally.
# 2) Diskless: The Redis master creates a new process that directly writes the
# RDB file to replica sockets, without touching the disk at all.
#
# With disk-backed replication, while the RDB file is generated, more replicas
# can be queued and served with the RDB file as soon as the current child
# producing the RDB file finishes its work. With diskless replication instead
# once the transfer starts, new replicas arriving will be queued and a new
# transfer will start when the current one terminates.
#
# When diskless replication is used, the master waits a configurable amount of
# time (in seconds) before starting the transfer in the hope that multiple
# replicas will arrive and the transfer can be parallelized(并行).
#
# With slow disks and fast (large bandwidth) networks, diskless replication
# works better.
repl-diskless-sync no
有两种同步方式——磁盘和socket
在新副本在上来的时候不能提供增量同步,只能用全量同步的方法先同步一次,全量同步的时候需要发送rdb快照
在发送rdb快照的时候可以有两种方法
- 落磁盘,主机新fork一个子进程,然后把rdb保存在磁盘上,然后父进程再逐渐把rdb发送到备机上
- 不落盘,主机fork一个子进程,子进程直接把rdb数据通过socket发送到备机,rdb在内存中,不落盘
落磁盘的方式有个好处就是一旦rdb落盘了,那么这个rdb可以供多个备机使用。否则,不落盘的时候如果有多个备机要求同步,则这些备机需要排队
不落盘的时候,主机在同步之前会稍微等待一会,如果有多个备机同步,那么可以并行发送
如果磁盘速度慢而网络带宽很大的时候,不落盘的方法会更好点
4、repl-diskless-sync-delay
# When diskless replication is enabled, it is possible to configure the delay
# the server waits in order to spawn the child that transfers the RDB via socket
# to the replicas.
#
# This is important since once the transfer starts, it is not possible to serve
# new replicas arriving, that will be queued for the next RDB transfer, so the
# server waits a delay in order to let more replicas arrive.
#
# The delay is specified in seconds, and by default is 5 seconds. To disable
# it entirely just set it to 0 seconds and the transfer will start ASAP.
repl-diskless-sync-delay 5
对于无盘复制,可以设置一个等待时间,尽量同时向多个备机同步,因为一旦开始复制,直到结束前都不能为新备机同步数据了
5、repl-diskless-load
# -----------------------------------------------------------------------------
# WARNING: RDB diskless load is experimental. Since in this setup the replica
# does not immediately store an RDB on disk, it may cause data loss during
# failovers. RDB diskless load + Redis modules not handling I/O reads may also
# cause Redis to abort in case of I/O errors during the initial synchronization
# stage with the master. Use only if your do what you are doing.
# -----------------------------------------------------------------------------
#
# Replica can load the RDB it reads from the replication link directly from the
# socket, or store the RDB to a file and read that file after it was completely
# recived from the master.
#
# In many cases the disk is slower than the network, and storing and loading
# the RDB file may increase replication time (and even increase the master's
# Copy on Write memory and salve buffers).
# However, parsing the RDB file directly from the socket may mean that we have
# to flush the contents of the current database before the full rdb was
# received. For this reason we have the following options:
#
# "disabled" - Don't use diskless load (store the rdb file to the disk first)
# "on-empty-db" - Use diskless load only when it is completely safe.
# "swapdb" - Keep a copy of the current db contents in RAM while parsing
# the data directly from the socket. note that this requires
# sufficient memory, if you don't have it, you risk an OOM kill.
repl-diskless-load disabled
在无盘复制中,备机可以通过配置来决定是否将rdb落到本地磁盘,如果落盘,那么同步时间就会变长,甚至会使主节点内存增大
6、repl-ping-replica-period
# Replicas send PINGs to server in a predefined interval. It's possible to
# change this interval with the repl_ping_replica_period option. The default
# value is 10 seconds.
#
# repl-ping-replica-period 10
备机向主机发送ping的时间周期,默认是10s
7、repl-timeout
# The following option sets the replication timeout for:
#
# 1) Bulk transfer I/O during SYNC, from the point of view of replica.
# 2) Master timeout from the point of view of replicas (data, pings).
# 3) Replica timeout from the point of view of masters (REPLCONF ACK pings).
#
# It is important to make sure that this value is greater than the value
# specified for repl-ping-replica-period otherwise a timeout will be detected
# every time there is low traffic between the master and the replica.
#
# repl-timeout 60
复制同步过程中的超时时间
备机收到rdb的超时时间
备机接收数据或者主机ping的超时时间
主机接收备机的REPLCONF ACK ping超时时间
8、repl-disable-tcp-nodelay
# Disable TCP_NODELAY on the replica socket after SYNC?
#
# If you select "yes" Redis will use a smaller number of TCP packets and
# less bandwidth to send data to replicas. But this can add a delay for
# the data to appear on the replica side, up to 40 milliseconds with
# Linux kernels using a default configuration.
#
# If you select "no" the delay for data to appear on the replica side will
# be reduced but more bandwidth will be used for replication.
#
# By default we optimize for low latency, but in very high traffic conditions
# or when the master and replicas are many hops away, turning this to "yes" may
# be a good idea.
repl-disable-tcp-nodelay no
如果开启,则会把小包聚合成打包发送,会有40ms延迟,可以降低带宽,但是会造成数据不一致
9、repl-backlog-size
# Set the replication backlog size. The backlog is a buffer that accumulates
# replica data when replicas are disconnected for some time, so that when a
# replica wants to reconnect again, often a full resync is not needed, but a
# partial resync is enough, just passing the portion of data the replica
# missed while disconnected.
#
# The bigger the replication backlog, the longer the time the replica can be
# disconnected and later be able to perform a partial resynchronization.
#
# The backlog is only allocated once there is at least a replica connected.
#
# repl-backlog-size 1mb
同步过程中的命令缓冲区,如果太小,则很难满足部分重同步,这个值越到,允许备机掉线的时间越长
10、repl-backlog-ttl
# After a master has no longer connected replicas for some time, the backlog
# will be freed. The following option configures the amount of seconds that
# need to elapse, starting from the time the last replica disconnected, for
# the backlog buffer to be freed.
#
# Note that replicas never free the backlog for timeout, since they may be
# promoted to masters later, and should be able to correctly "partially
# resynchronize" with the replicas: hence they should always accumulate backlog.
#
# A value of 0 means to never release the backlog.
#
# repl-backlog-ttl 3600
当没有备机同步是缓冲区需要保留的时长,为0表示不释放
11、replica-priority
# The replica priority is an integer number published by Redis in the INFO
# output. It is used by Redis Sentinel in order to select a replica to promote
# into a master if the master is no longer working correctly.
#
# A replica with a low priority number is considered better for promotion, so
# for instance if there are three replicas with priority 10, 100, 25 Sentinel
# will pick the one with priority 10, that is the lowest.
#
# However a special priority of 0 marks the replica as not able to perform the
# role of master, so a replica with priority of 0 will never be selected by
# Redis Sentinel for promotion.
#
# By default the priority is 100.
replica-priority 100
备机的优先级,用来指定备机能否当选主
12、min-replicas-to-write、min-replicas-max-lag
# It is possible for a master to stop accepting writes if there are less than
# N replicas connected, having a lag less or equal than M seconds.
#
# The N replicas need to be in "online" state.
#
# The lag in seconds, that must be <= the specified value, is calculated from
# the last ping received from the replica, that is usually sent every second.
#
# This option does not GUARANTEE that N replicas will accept the write, but
# It is possible for a master to stop accepting writes if there are less than
# N replicas connected, having a lag less or equal than M seconds.
#
# The N replicas need to be in "online" state.
#
# The lag in seconds, that must be <= the specified value, is calculated from
# the last ping received from the replica, that is usually sent every second.
#
# This option does not GUARANTEE that N replicas will accept the write, but
# will limit the window of exposure for lost writes in case not enough replicas
# are available, to the specified number of seconds.
#
# For example to require at least 3 replicas with a lag(滞后) <= 10 seconds use:
#
# min-replicas-to-write 3
# min-replicas-max-lag 10
为了数据的安全性,对于写请求,至少要求的副本数,和同步延迟时间
三、主从复制的过程
在主从复制的过程,主机和备机之间需要多次的交互协商,确保对端状态正常后才同步数据,这个过程成为handshake
源码代码中定义同步状态码
/* Slave replication state. Used in server.repl_state for slaves to remember
* what to do next. */
#define REPL_STATE_NONE 0 /* No active replication */
#define REPL_STATE_CONNECT 1 /* Must connect to master */
#define REPL_STATE_CONNECTING 2 /* Connecting to master */
/* --- Handshake states, must be ordered --- */
#define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */
#define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */
#define REPL_STATE_SEND_PORT 6 /* Send REPLCONF listening-port */
#define REPL_STATE_RECEIVE_PORT 7 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_IP 8 /* Send REPLCONF ip-address */
#define REPL_STATE_RECEIVE_IP 9 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_CAPA 10 /* Send REPLCONF capa */
#define REPL_STATE_RECEIVE_CAPA 11 /* Wait for REPLCONF reply */
#define REPL_STATE_SEND_PSYNC 12 /* Send PSYNC */
#define REPL_STATE_RECEIVE_PSYNC 13 /* Wait for PSYNC reply */
/* --- End of handshake states --- */
#define REPL_STATE_TRANSFER 14 /* Receiving .rdb from master */
#define REPL_STATE_CONNECTED 15 /* Connected to master */
其中handshake部分为协商部分
redis的主从完整的同步过程可以分为7步
1、设置主服务器的地址和端口
当在备机上执行slaveof
命令时,备机上的命令处理函数replicaofCommand
会调用replicationSetMaster**(c->argv[1]->ptr, port)
来保存主机的地址和端口,备机返回给客户端ok
应答,同时同步进入REPL_STATE_CONNECT状态
/* Set replication to the specified master address and port. */
void replicationSetMaster(char *ip, int port) {
int was_master = server.masterhost == NULL;
sdsfree(server.masterhost);
server.masterhost = sdsnew(ip);
server.masterport = port; /*保存主机的地址端口*/
if (server.master) {
freeClient(server.master);
}
disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */
/* Force our slaves to resync with us as well. They may hopefully be able
* to partially resync with us, but we can notify the replid change. */
disconnectSlaves();
cancelReplicationHandshake(); /* 这里断开了已有的同步,所以如果超时时间太短,则每次都会进到这里 */
/* Before destroying our master state, create a cached master using
* our own parameters, to later PSYNC with the new master. */
if (was_master) {
replicationDiscardCachedMaster();
replicationCacheMasterUsingMyself();
}
/* Fire the role change modules event. */
moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED,
REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA,
NULL);
/* Fire the master link modules event. */
if (server.repl_state == REPL_STATE_CONNECTED)
moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE,
REDISMODULE_SUBEVENT_MASTER_LINK_DOWN,
NULL);
server.repl_state = REPL_STATE_CONNECT;
}
2、建立socket连接
上一步设置了主机的地址端口,设置完后命令就返回了,同步进入到REPL_STATE_CONNECT状态
我们知道,redis有一个定时器serverCron
,在这个函数里面,以1s为周期的触发同步的定时器,换句话说,在执行完slaveof
命令后最多要1s才能真正开始同步过程
int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
/* Replication cron function -- used to reconnect to master,
* detect transfer failures, start background RDB transfers and so forth. */
run_with_period(1000) replicationCron();
}
replicationCron
函数内部是整个同步状态的流转
void replicationCron(void) {
/* Check if we should connect to a MASTER */
if (server.repl_state == REPL_STATE_CONNECT) { /* 上一步是CONNECT*/
serverLog(LL_NOTICE,"Connecting to MASTER %s:%d",
server.masterhost, server.masterport);
if (connectWithMaster() == C_OK) { /* 进入下一步,建立连接 */
serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started");
}
}
}
int connectWithMaster(void) {
server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket();
if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport,
NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) { /* 设置了syncWithMaster回调*/
serverLog(LL_WARNING,"Unable to connect to MASTER: %s",
connGetLastError(server.repl_transfer_s));
connClose(server.repl_transfer_s);
server.repl_transfer_s = NULL;
return C_ERR;
}
server.repl_transfer_lastio = server.unixtime;
server.repl_state = REPL_STATE_CONNECTING;
return C_OK;
}
和主机建立socket连接,进入到REPL_STATE_CONNECTING状态
3、发送ping
上一步与主机建立完socket连接后,同步进入到了REPL_STATE_CONNECTING状态,且设置了syncWithMaster
回调处理函数
在connect成功后,备机向主机发送了一个ping
命令,同时备机设置了syncWithMaster
作为读回调的处理函数,同时同步进入到了REPL_STATE_RECEIVE_PONG状态
syncWithMaster
内部实现了同步时的handshake和同步的过程
/* This handler fires when the non blocking connect was able to
* establish a connection with the master. */
void syncWithMaster(connection *conn) {
/* Send a PING to check the master is able to reply without errors. */
if (server.repl_state == REPL_STATE_CONNECTING) {
serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event.");
/* Delete the writable event so that the readable event remains
* registered and we can wait for the PONG reply. */
connSetReadHandler(conn, syncWithMaster); /* 设置读回调仍然是syncWithMaster */
connSetWriteHandler(conn, NULL);
server.repl_state = REPL_STATE_RECEIVE_PONG; /* 进入RECEIVE_PONG状态 */
/* Send the PING, don't check for errors at all, we have the timeout
* that will take care about this. */
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL); /* 发送ping */
if (err) goto write_error;
return;
}
}
4、身份验证
在收到主机的ping
应答时,备机进入REPL_STATE_SEND_AUTH状态
进入REPL_STATE_SEND_AUTH如果开启了auth认证,则备机会发送auth,同步进入到REPL_STATE_RECEIVE_AUTH状态
REPL_STATE_RECEIVE_AUTH状态后备机接收主机的认证结果,同步进入到REPL_STATE_SEND_PORT状态
如果没有开启auth认证,则同步直接进入到REPL_STATE_SEND_PORT状态
void syncWithMaster(connection *conn) {
/* Receive the PONG command. */
if (server.repl_state == REPL_STATE_RECEIVE_PONG) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); /* 收取ping的回包 */
/* We accept only two replies as valid, a positive +PONG reply
* (we just check for "+") or an authentication error.
* Note that older versions of Redis replied with "operation not
* permitted" instead of using a proper error code, so we test
* both. */
if (err[0] != '+' &&
strncmp(err,"-NOAUTH",7) != 0 &&
strncmp(err,"-ERR operation not permitted",28) != 0)
{
serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err);
sdsfree(err);
goto error;
} else {
serverLog(LL_NOTICE,
"Master replied to PING, replication can continue...");
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_AUTH; /* 进入SEND_AUTH状态 */
}
/* AUTH with the master if required. */
/* 紧接着开始auth认证 */
if (server.repl_state == REPL_STATE_SEND_AUTH) {
if (server.masteruser && server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",
server.masteruser,server.masterauth,NULL); /* auth认证 */
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else if (server.masterauth) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",server.masterauth,NULL); /* auth认证 */
if (err) goto write_error;
server.repl_state = REPL_STATE_RECEIVE_AUTH;
return;
} else {
server.repl_state = REPL_STATE_SEND_PORT;
}
}
/* Receive AUTH reply. */
if (server.repl_state == REPL_STATE_RECEIVE_AUTH) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); /* auth认证结果 */
if (err[0] == '-') {
serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err);
sdsfree(err);
goto error;
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PORT;
}
}
5、发送端口信息
为什么需要备机发送一次端口信息呢?主机根据socket连接也是能够获取到备机的连接端口的
进入REPL_STATE_SEND_PORT状态后备机会将自己的端口发送给主机,同步进入到REPL_STATE_RECEIVE_PORT状态
进入REPL_STATE_RECEIVE_PORT状态后,备机接收主机的应答,由于某些版本不支持,所以忽略错误,同步进入到REPL_STATE_SEND_IP状态
void syncWithMaster(connection *conn) {
/* Set the slave port, so that Master's INFO command can list the
* slave listening port correctly. */
if (server.repl_state == REPL_STATE_SEND_PORT) {
int port;
if (server.slave_announce_port) port = server.slave_announce_port;
else if (server.tls_replication && server.tls_port) port = server.tls_port;
else port = server.port;
sds portstr = sdsfromlonglong(port);
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
"listening-port",portstr, NULL); /* 发送端口信息 */
sdsfree(portstr);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_PORT;
return;
}
/* Receive REPLCONF listening-port reply. */
if (server.repl_state == REPL_STATE_RECEIVE_PORT) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF listening-port: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_IP;
}
}
6、发送ip信息
当备机发送端口后同步进入到REPL_STATE_SEND_IP状态
进入REPL_STATE_SEND_IP状态后,如果当前没有设置slave-announce-ip
则跳过ip发送,同步进入到REPL_STATE_SEND_CAPA状态
如果设置了slave-announce-ip
,则发送给主机,备机收取回包后忽略错误,同步进入到REPL_STATE_SEND_CAPA状态
void syncWithMaster(connection *conn) {
/* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */
if (server.repl_state == REPL_STATE_SEND_IP &&
server.slave_announce_ip == NULL)
{
server.repl_state = REPL_STATE_SEND_CAPA; /* 如果没有announce-ip,则直接跳过*/
}
/* Set the slave ip, so that Master's INFO command can list the
* slave IP address port correctly in case of port forwarding or NAT. */
if (server.repl_state == REPL_STATE_SEND_IP) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
"ip-address",server.slave_announce_ip, NULL);
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_IP;
return;
}
/* Receive REPLCONF ip-address reply. */
if (server.repl_state == REPL_STATE_RECEIVE_IP) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF listening-port. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF ip-address: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_CAPA;
}
}
7、同步
进入REPL_STATE_SEND_CAPA状态后,才算开始同步了,但是由于同步算法一直有改进,所以需要先判断一下主机支持的同步算法,对于不支持的同步算法,主机是直接忽略的
在接收REPL_STATE_SEND_CAPA的应答后,同步进入到REPL_STATE_SEND_PSYNC状态
void syncWithMaster(connection *conn) {
/* Inform the master of our (slave) capabilities.
*
* EOF: supports EOF-style RDB transfer for diskless replication.
* PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>.
*
* The master will ignore capabilities it does not understand. */
if (server.repl_state == REPL_STATE_SEND_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF",
"capa","eof","capa","psync2",NULL); /* 是否支持eof和psync2 */
if (err) goto write_error;
sdsfree(err);
server.repl_state = REPL_STATE_RECEIVE_CAPA;
return;
}
/* Receive CAPA reply. */
if (server.repl_state == REPL_STATE_RECEIVE_CAPA) {
err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
/* Ignore the error if any, not all the Redis versions support
* REPLCONF capa. */
if (err[0] == '-') {
serverLog(LL_NOTICE,"(Non critical) Master does not understand "
"REPLCONF capa: %s", err);
}
sdsfree(err);
server.repl_state = REPL_STATE_SEND_PSYNC;
}
/* Try a partial resynchonization. If we don't have a cached master
* slaveTryPartialResynchronization() will at least try to use PSYNC
* to start a full resynchronization so that we get the master run id
* and the global offset, to try a partial resync at the next
* reconnection attempt. */
if (server.repl_state == REPL_STATE_SEND_PSYNC) {
if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) { /* 尝试部分重同步,发送psync请求 */
err = sdsnew("Write error sending the PSYNC command.");
goto write_error;
}
server.repl_state = REPL_STATE_RECEIVE_PSYNC;
return;
}
psync_result = slaveTryPartialResynchronization(conn,1); /* 接收psync的应答,如果是continue,则说明可以部分重同步*/
if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */
/* If the master is in an transient error, we should try to PSYNC
* from scratch later, so go to the error path. This happens when
* the server is loading the dataset or is not connected with its
* master and so forth. */
if (psync_result == PSYNC_TRY_LATER) goto error;
/* Note: if PSYNC does not return WAIT_REPLY, it will take care of
* uninstalling the read handler from the file descriptor. */
if (psync_result == PSYNC_CONTINUE) { /*可以部分重同步*/
serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization.");
if (server.supervised_mode == SUPERVISED_SYSTEMD) {
redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n");
redisCommunicateSystemd("READY=1\n");
}
return;
}
///////////////////////////////////////////////////////////
if (psync_result == PSYNC_NOT_SUPPORTED) { /* 如果主机不支持,则用sync发起全量同步*/
serverLog(LL_NOTICE,"Retrying with SYNC...");
if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) {
serverLog(LL_WARNING,"I/O error writing to MASTER: %s",
strerror(errno));
goto error;
}
}
/* Prepare a suitable temp file for bulk transfer */
if (!useDisklessLoad()) { /* 有盘复制,创建tmp文件 */
while(maxtries--) {
snprintf(tmpfile,256,
"temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid());
dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644);
if (dfd != -1) break;
sleep(1);
}
if (dfd == -1) { /* 无盘复制 */
serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno));
goto error;
}
server.repl_transfer_tmpfile = zstrdup(tmpfile);
server.repl_transfer_fd = dfd;
}
/* Setup the non blocking download of the bulk file. */
/* 在slaveTryPartialResynchronization 中已经重置了readHandler,而且不同的消息对应不同的处理函数 */
if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) /* 全量rdb同步,设置接收回调函数readSyncBulkPayload */
{
char conninfo[CONN_INFO_LEN];
serverLog(LL_WARNING,
"Can't create readable event for SYNC: %s (%s)",
strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo)));
goto error;
}
}
slaveTryPartialResynchronization
函数的判断能否进行部分重同步,并且返回了需要同步的方式
如果需要全量同步,则重新设置读回调readSyncBulkPayload
#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
if (!read_reply) {
/* Issue the PSYNC command */
/* 发送psync请求*/
reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
connSetReadHandler(conn, NULL);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}
/* Reading half */
reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}
connSetReadHandler(conn, NULL);
if (!strncmp(reply,"+FULLRESYNC",11)) {
/* */
return PSYNC_FULLRESYNC;
}
if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted. */
return PSYNC_CONTINUE;
}
return PSYNC_NOT_SUPPORTED;
}
如果需要全量同,则进入FULLRESYNC
8、命令传播
在aof复制的时候了解到,客户端的所有命令会调用propagate
函数来将命令扩散出去
void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc,
int flags)
{
if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF)
feedAppendOnlyFile(cmd,dbid,argv,argc);
if (flags & PROPAGATE_REPL)
replicationFeedSlaves(server.slaves,dbid,argv,argc); // 扩散到备机上
}
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) {
/* Send SELECT command to every slave if needed. */
if (server.slaveseldb != dictid) {
if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); /* 如果数据库变了,先生成一条select db命令*/
/* Send it to slaves. */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
addReply(slave,selectcmd);
}
}
/* Write the command to the replication backlog if any. */
/* 将命令写入到backlog */
if (server.repl_backlog) {
char aux[LONG_STR_SIZE+3];
/* Add the multi bulk reply length. */
aux[0] = '*';
len = ll2string(aux+1,sizeof(aux)-1,argc);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
for (j = 0; j < argc; j++) {
long objlen = stringObjectLen(argv[j]);
/* We need to feed the buffer with the object as a bulk reply
* not just as a plain string, so create the $..CRLF payload len
* and add the final CRLF */
aux[0] = '$';
len = ll2string(aux+1,sizeof(aux)-1,objlen);
aux[len+1] = '\r';
aux[len+2] = '\n';
feedReplicationBacklog(aux,len+3);
feedReplicationBacklogWithObject(argv[j]);
feedReplicationBacklog(aux+len+1,2);
}
}
/* Write the command to every slave. */
/* 将命令发送到所有slave上 */
listRewind(slaves,&li);
while((ln = listNext(&li))) {
client *slave = ln->value;
/* Don't feed slaves that are still waiting for BGSAVE to start. */
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue;
/* Feed slaves that are waiting for the initial SYNC (so these commands
* are queued in the output buffer until the initial SYNC completes),
* or are already in sync with the master. */
/* Add the multi bulk length. */
addReplyArrayLen(slave,argc);
/* Finally any additional argument that was not stored inside the
* static buffer if any (from j to argc). */
for (j = 0; j < argc; j++)
addReplyBulk(slave,argv[j]);
}
}
主机通过replicationFeedSlaves
函数将命令写到backlog,并且发送给slave
四、syncCommand与slaveTryPartialResynchronization
syncCommand
与slaveTryPartialResynchronization
可以说是在主备同步过程中主机和备机的关键实现部分
syncCommand
中实现了SYNC和PSYNC两个命令
/* SYNC and PSYNC command implemenation. */
void syncCommand(client *c) {
/* ignore SYNC if already slave or in monitor mode */
if (c->flags & CLIENT_SLAVE) return;
/* Refuse SYNC requests if we are a slave but the link with our master
* is not ok... */
/* 备机状态还没进入到CONNECTED,则返回错误 */
if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) {
addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n"));
return;
}
/* SYNC can't be issued when the server has pending data to send to
* the client about already issued commands. We need a fresh reply
* buffer registering the differences between the BGSAVE and the current
* dataset, so that we can copy to other slaves if needed. */
/* 备机的一些应答数据包还没有发出去 */
if (clientHasPendingReplies(c)) {
addReplyError(c,"SYNC and PSYNC are invalid with pending output");
return;
}
serverLog(LL_NOTICE,"Replica %s asks for synchronization",
replicationGetSlaveName(c));
/* Try a partial resynchronization if this is a PSYNC command.
* If it fails, we continue with usual full resynchronization, however
* when this happens masterTryPartialResynchronization() already
* replied with:
*
* +FULLRESYNC <replid> <offset>
*
* So the slave knows the new replid and offset to try a PSYNC later
* if the connection with the master is lost. */
/* 如果是psync,尝试部分重同步 */
if (!strcasecmp(c->argv[0]->ptr,"psync")) {
/* masterTryPartialResynchronization 函数中根据replid和offset判断是否可以部分重同步 */
if (masterTryPartialResynchronization(c) == C_OK) {
server.stat_sync_partial_ok++;
return; /* No full resync needed, return. */
} else {
char *master_replid = c->argv[1]->ptr;
/* Increment stats for failed PSYNCs, but only if the
* replid is not "?", as this is used by slaves to force a full
* resync on purpose when they are not albe to partially
* resync. */
if (master_replid[0] != '?') server.stat_sync_partial_err++;
}
} else {
/* If a slave uses SYNC, we are dealing with an old implementation
* of the replication protocol (like redis-cli --slave). Flag the client
* so that we don't expect to receive REPLCONF ACK feedbacks. */
c->flags |= CLIENT_PRE_PSYNC;
}
/* Full resynchronization. */
server.stat_sync_full++;
/* Setup the slave as one waiting for BGSAVE to start. The following code
* paths will change the state if we handle the slave differently. */
c->replstate = SLAVE_STATE_WAIT_BGSAVE_START;
if (server.repl_disable_tcp_nodelay)
connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */
c->repldbfd = -1;
c->flags |= CLIENT_SLAVE;
listAddNodeTail(server.slaves,c); /* 将client添加到slaves列表 */
/* Create the replication backlog if needed. */
/* 如果当前没有backlog,则创建*/
if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) {
/* When we create the backlog from scratch, we always use a new
* replication ID and clear the ID2, since there is no valid
* past history. */
changeReplicationId(); /* 清空replid1、replid2和backlog*/
clearReplicationId2();
createReplicationBacklog();
}
/* CASE 1: BGSAVE is in progress, with disk target. */
/* 如果当前正在执行bgsave,且rdb文件落盘 */
if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_DISK)
{
/* Ok a background save is in progress. Let's check if it is a good
* one for replication, i.e. if there is another slave that is
* registering differences since the server forked to save. */
client *slave;
listNode *ln;
listIter li;
listRewind(server.slaves,&li);
while((ln = listNext(&li))) {
slave = ln->value;
if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break;
}
/* To attach this slave, we check that it has at least all the
* capabilities of the slave that triggered the current BGSAVE. */
if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) {
/* Perfect, the server is already registering differences for
* another slave. Set the right state, and copy the buffer. */
copyClientOutputBuffer(c,slave);
replicationSetupSlaveForFullResync(c,slave->psync_initial_offset);
serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC");
} else {
/* No way, we need to wait for the next BGSAVE in order to
* register differences. */
serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC");
}
/* CASE 2: BGSAVE is in progress, with socket target. */
} else if (server.rdb_child_pid != -1 &&
server.rdb_child_type == RDB_CHILD_TYPE_SOCKET)
{
/* There is an RDB child process but it is writing directly to
* children sockets. We need to wait for the next BGSAVE
* in order to synchronize. */
serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC");
/* CASE 3: There is no BGSAVE is progress. */
/* 新起一个bgsave */
} else {
if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) {
/* Diskless replication RDB child is created inside
* replicationCron() since we want to delay its start a
* few seconds to wait for more slaves to arrive. */
if (server.repl_diskless_sync_delay)
serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC");
} else {
/* Target is disk (or the slave is not capable of supporting
* diskless replication) and we don't have a BGSAVE in progress,
* let's start one. */
if (!hasActiveChildProcess()) {
startBgsaveForReplication(c->slave_capa);
} else {
serverLog(LL_NOTICE,
"No BGSAVE in progress, but another BG operation is active. "
"BGSAVE for replication delayed");
}
}
}
return;
}
备机在slaveTryPartialResynchronization
函数中对主机的同步信令做了解析,判断能否部分同步
#define PSYNC_WRITE_ERROR 0
#define PSYNC_WAIT_REPLY 1
#define PSYNC_CONTINUE 2
#define PSYNC_FULLRESYNC 3
#define PSYNC_NOT_SUPPORTED 4
#define PSYNC_TRY_LATER 5
int slaveTryPartialResynchronization(connection *conn, int read_reply) {
char *psync_replid;
char psync_offset[32];
sds reply;
/* Writing half */
if (!read_reply) {
/* Initially set master_initial_offset to -1 to mark the current
* master run_id and offset as not valid. Later if we'll be able to do
* a FULL resync using the PSYNC command we'll set the offset at the
* right value, so that this information will be propagated to the
* client structure representing the master into server.master. */
server.master_initial_offset = -1;
if (server.cached_master) {
/* cached_master保存的断连之前的master信息 */
psync_replid = server.cached_master->replid;
snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1);
serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset);
} else {
serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)");
/* 如果备机没有同步过,在replid=?,offset=-1*/
psync_replid = "?";
memcpy(psync_offset,"-1",3);
}
/* Issue the PSYNC command */
/* 备机发送psync命令,带上replid和offset*/
reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL);
if (reply != NULL) {
serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply);
sdsfree(reply);
connSetReadHandler(conn, NULL);
return PSYNC_WRITE_ERROR;
}
return PSYNC_WAIT_REPLY;
}
/* Reading half */
/* 主机可能会发送\n,忽略*/
reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL);
if (sdslen(reply) == 0) {
/* The master may send empty newlines after it receives PSYNC
* and before to reply, just to keep the connection alive. */
sdsfree(reply);
return PSYNC_WAIT_REPLY;
}
connSetReadHandler(conn, NULL);
/* 如果主机返回了FUNN-RESYNC,则说明是全量同步,备机重置replid和offset*/
if (!strncmp(reply,"+FULLRESYNC",11)) {
char *replid = NULL, *offset = NULL;
/* FULL RESYNC, parse the reply in order to extract the run id
* and the replication offset. */
replid = strchr(reply,' ');
if (replid) {
replid++;
offset = strchr(replid,' ');
if (offset) offset++;
}
if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) {
serverLog(LL_WARNING,
"Master replied with wrong +FULLRESYNC syntax.");
/* This is an unexpected condition, actually the +FULLRESYNC
* reply means that the master supports PSYNC, but the reply
* format seems wrong. To stay safe we blank the master
* replid to make sure next PSYNCs will fail. */
memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1);
} else {
memcpy(server.master_replid, replid, offset-replid-1);
server.master_replid[CONFIG_RUN_ID_SIZE] = '\0';
server.master_initial_offset = strtoll(offset,NULL,10);
serverLog(LL_NOTICE,"Full resync from master: %s:%lld",
server.master_replid,
server.master_initial_offset);
}
/* We are going to full resync, discard the cached master structure. */
/* 如果备机有cached_master,则释放,因为再用不到了 */
replicationDiscardCachedMaster();
sdsfree(reply);
return PSYNC_FULLRESYNC;
}
/* 返回continue,则说明部分同步*/
if (!strncmp(reply,"+CONTINUE",9)) {
/* Partial resync was accepted. */
serverLog(LL_NOTICE,
"Successful partial resynchronization with master.");
/* Check the new replication ID advertised by the master. If it
* changed, we need to set the new ID as primary ID, and set or
* secondary ID as the old master ID up to the current offset, so
* that our sub-slaves will be able to PSYNC with us after a
* disconnection. */
char *start = reply+10;
char *end = reply+9;
while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++;
if (end-start == CONFIG_RUN_ID_SIZE) {
char new[CONFIG_RUN_ID_SIZE+1];
memcpy(new,start,CONFIG_RUN_ID_SIZE);
new[CONFIG_RUN_ID_SIZE] = '\0';
/* 如果备机发现新主机的replid_id和自己发送的不一样,则备机更新 */
if (strcmp(new,server.cached_master->replid)) {
/* Master ID changed. */
serverLog(LL_WARNING,"Master replication ID changed to %s",new);
/* Set the old ID as our ID2, up to the current offset+1. */
/* 把旧的replid保存在server.replid2当中 */
memcpy(server.replid2,server.cached_master->replid,
sizeof(server.replid2));
server.second_replid_offset = server.master_repl_offset+1;
/* Update the cached master ID and our own primary ID to the
* new one. */
memcpy(server.replid,new,sizeof(server.replid));
memcpy(server.cached_master->replid,new,sizeof(server.replid));
/* Disconnect all the sub-slaves: they need to be notified. */
/* 强制让自己的slave断开重连,这里有无必要?? */
disconnectSlaves();
}
}
/* Setup the replication to continue. */
sdsfree(reply);
replicationResurrectCachedMaster(conn);
/* If this instance was restarted and we read the metadata to
* PSYNC from the persistence file, our replication backlog could
* be still not initialized. Create it. */
if (server.repl_backlog == NULL) createReplicationBacklog();
return PSYNC_CONTINUE;
}
/* If we reach this point we received either an error (since the master does
* not understand PSYNC or because it is in a special state and cannot
* serve our request), or an unexpected reply from the master.
*
* Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise
* return PSYNC_TRY_LATER if we believe this is a transient error. */
if (!strncmp(reply,"-NOMASTERLINK",13) ||
!strncmp(reply,"-LOADING",8))
{
serverLog(LL_NOTICE,
"Master is currently unable to PSYNC "
"but should be in the future: %s", reply);
sdsfree(reply);
return PSYNC_TRY_LATER;
}
if (strncmp(reply,"-ERR",4)) {
/* If it's not an error, log the unexpected event. */
serverLog(LL_WARNING,
"Unexpected reply to PSYNC from master: %s", reply);
} else {
serverLog(LL_NOTICE,
"Master does not support PSYNC or is in "
"error state (reply: %s)", reply);
}
sdsfree(reply);
replicationDiscardCachedMaster();
return PSYNC_NOT_SUPPORTED;
}
关于cached_master的说明:
当主备之间的连接断开时,备机在释放连接的时候会将主机(视为一个client)保存到cached_master中,
void freeClient(client *c) {
/* If it is our master that's beging disconnected we should make sure
* to cache the state to try a partial resynchronization later.
*
* Note that before doing this we make sure that the client is not in
* some unexpected state, by checking its flags. */
if (server.master && c->flags & CLIENT_MASTER) {
serverLog(LL_WARNING,"Connection with master lost.");
if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY|
CLIENT_CLOSE_ASAP|
CLIENT_BLOCKED)))
{
replicationCacheMaster(c);
return;
}
}
}
void replicationCacheMaster(client *c) {
/* Save the master. Server.master will be set to null later by
* replicationHandleMasterDisconnection(). */
server.cached_master = server.master;
}
上一篇: redis源码分析——10、主从同步
下一篇: redis源码分析——8、RDB持久化