Redis Source Code Analysis, Part 10: Master-Replica Synchronization

January 19, 2022, 23:47

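Why replicate from a master to replicas? One reason is read/write splitting; and because persistence costs performance, it can be moved to a replica. The other is disaster recovery: Redis is an in-memory database, so data is lost as soon as the service or the machine goes down. Adding replicas keeps several copies of the data and makes it safer.

Questions to think about:

  1. Some Redis commands are non-deterministic; SPOP, for example, removes a random element. How is such a command passed from master to replica during synchronization?
  2. If the replica's clock runs ahead of the master's, can a client still read from the replica data that the replica itself considers expired?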

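I. How master-replica replication works

Replication can be split into two major steps: full synchronization and command propagation.

Full synchronization brings the replica's data to the same state as the master's.

Command propagation means that the master forwards every command that may change a key to the replica.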

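1. Old-style replication

The old implementation (before 2.8) follows exactly the two steps above:

a. The replica sends a SYNC command to the master.

b. On receiving SYNC, the master runs BGSAVE to generate an RDB file, and meanwhile records in a buffer the write commands executed during that period.

c. The master sends the generated RDB file to the replica; the replica loads it, bringing its data to the state the master was in when the RDB file was generated.

d. The master sends the buffered write commands to the replica, after which master and replica hold exactly the same data.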

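2. Drawbacks of old-style replication

Networks are unstable. The link between master and replica may drop because of a network fluctuation, and once the replica reconnects, a complete synchronization has to run again. Generating an RDB file is expensive, yet network fluctuations are common in practice.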

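3. New-style replication

To address this, Redis introduced partial resynchronization in 2.8 and replaced SYNC with PSYNC.

PSYNC has two modes: full resynchronization and partial resynchronization.

Full resynchronization is similar to the old scheme: send the RDB first, then the buffered commands.

Partial resynchronization lets the replica resume after a reconnect. The master keeps recent commands in a circular buffer (the backlog). When a replica reconnects, the master first checks the replica's replication offset; if the data following that offset is still in the circular buffer, the master sends just the commands after that offset, which avoids generating and transferring an RDB. The precondition, of course, is that the replication offset is still within the buffer.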
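
To make the offset check concrete, here is a minimal sketch of a circular backlog in C. It is not the Redis implementation: the Backlog struct, its field names, the tiny BACKLOG_SIZE, and the three helper functions are all made up for illustration, and the offset convention (the replica asks for the next byte it needs) is an assumption of this sketch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define BACKLOG_SIZE 16  /* tiny on purpose, so that wrap-around is easy to see */

/* Hypothetical, simplified backlog: not the real Redis structure. */
typedef struct {
    char buf[BACKLOG_SIZE];
    size_t idx;            /* next write position inside buf */
    size_t histlen;        /* number of valid bytes currently stored */
    long long master_off;  /* replication offset of the last byte written */
} Backlog;

/* Append len bytes of the replication stream, overwriting the oldest data. */
void backlog_feed(Backlog *b, const char *p, size_t len) {
    for (size_t i = 0; i < len; i++) {
        b->buf[b->idx] = p[i];
        b->idx = (b->idx + 1) % BACKLOG_SIZE;
        if (b->histlen < BACKLOG_SIZE) b->histlen++;
    }
    b->master_off += (long long)len;
}

/* Offset of the oldest byte still held in the backlog. */
long long backlog_first_offset(const Backlog *b) {
    return b->master_off - (long long)b->histlen + 1;
}

/* A replica asks to continue from `offset` (the next byte it needs).
 * Copies the missing bytes into out and returns their count, or -1 when
 * the data has been overwritten and a full resync is needed. */
long long backlog_read_from(const Backlog *b, long long offset,
                            char *out, size_t cap) {
    if (offset < backlog_first_offset(b) || offset > b->master_off + 1)
        return -1;
    size_t n = (size_t)(b->master_off + 1 - offset);
    assert(n <= cap);
    /* Position of the oldest byte inside the ring. */
    size_t start = (b->idx + BACKLOG_SIZE - b->histlen) % BACKLOG_SIZE;
    size_t skip = (size_t)(offset - backlog_first_offset(b));
    for (size_t i = 0; i < n; i++)
        out[i] = b->buf[(start + skip + i) % BACKLOG_SIZE];
    return (long long)n;
}
```

Feeding 8 bytes and then asking for offset 5 yields the last 4 bytes. After 12 more bytes the ring has wrapped, so offset 3 is no longer available and the function reports that a full resync is needed.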

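Every Redis instance has a 40-character RUN_ID. When a replica requests synchronization it includes the RUN_ID of the master it was replicating from, and the master checks whether that RUN_ID matches its own. If they differ, the replica has changed masters, which triggers a full synchronization. In practice, process restarts are common, and the RUN_ID always changes after a restart.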

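4. Improvements to new-style replication

As shown above, the conditions for partial resynchronization are fairly strict and hard to meet in production.

To relax them, Redis 4.0 introduced the PSYNC2 protocol, which improves on PSYNC in two ways.

Persisting replication information: the master's replication information (RUN_ID and replication offset) is persisted into the RDB as auxiliary fields (aux-field), and restored when the master restarts and loads the RDB. (This assumes that the master has RDB persistence enabled and that the node is still the master after it is brought back up, which is a bit of a stretch in production.)

Storing the previous master's replication information: when a failure causes a failover and a replica is promoted to master, the promoted node saves the old master's replication information in the replid2-related fields. If another replica then requests a partial resync, the new master uses those fields to decide whether a partial resync is possible.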

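5. SYNC, PSYNC, and PSYNC2 compared

In short, SYNC is the simplest and least efficient: any network failure means a full synchronization once the replica reconnects.

PSYNC fixes that pain point. The master allocates a circular backlog buffer to hold the propagated commands, and a reconnecting replica can fetch the commands it missed directly from that buffer.

The conditions for triggering a partial resync under PSYNC are strict, though. Every running Redis instance has a run_id that identifies it; if the replica detects that the master's run_id has changed, it falls back to a full synchronization.

PSYNC2 targets failover. Suppose there are replication links A->B and A->C, and B is promoted in place of A, so that C's master becomes B. Under PSYNC this would require a full synchronization, although none is actually needed. PSYNC2 therefore records the run_id of the node's previous master: B and C share a common former master, so B and C can resynchronize partially.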
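
The decision the new master B makes for C can be sketched as below. This is a simplified illustration rather than the Redis source: MasterInfo and can_partial_resync are hypothetical names, and the fields are only loosely modeled on the replid/replid2 idea described above. The assumption is that an old ID is honored only up to the offset at which the switch happened, and that the requested offset must still be inside the backlog.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical, trimmed-down view of the master's replication identity. */
typedef struct {
    const char *replid;              /* current replication ID */
    const char *replid2;             /* ID inherited from the previous master */
    long long second_replid_offset;  /* highest offset valid under replid2 */
    long long backlog_off;           /* offset of the first byte in the backlog */
    long long backlog_histlen;       /* bytes of history held in the backlog */
} MasterInfo;

/* Returns 1 if a partial resync can be served, 0 if a full resync is needed. */
int can_partial_resync(const MasterInfo *m, const char *req_replid,
                       long long req_offset) {
    assert(m != NULL && req_replid != NULL);
    int same_history =
        strcmp(req_replid, m->replid) == 0 ||
        (strcmp(req_replid, m->replid2) == 0 &&
         req_offset <= m->second_replid_offset);
    if (!same_history) return 0;
    /* The requested offset must still be inside the backlog window. */
    return req_offset >= m->backlog_off &&
           req_offset <= m->backlog_off + m->backlog_histlen;
}
```

With B holding the pair ("B-id", "A-id") and a switch offset of 1000, a request from C for "A-id" at offset 900 is accepted, while the same ID at offset 1050 is rejected because that part of the history never existed under A.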

II. Related configuration

1. replica-serve-stale-data

# When a replica loses its connection with the master, or when the replication 
# is still in progress, the replica can act in two different ways: 
# 
# 1) if replica-serve-stale-data is set to 'yes' (the default) the replica will 
#    still reply to client requests, possibly with out of date data, or the 
#    data set may just be empty if this is the first synchronization. 
# 
# 2) if replica-serve-stale-data is set to 'no' the replica will reply with 
#    an error "SYNC with master in progress" to all the kind of commands 
#    but to INFO, REPLICAOF, AUTH, PING, SHUTDOWN, REPLCONF, ROLE, CONFIG, 
#    SUBSCRIBE, UNSUBSCRIBE, PSUBSCRIBE, PUNSUBSCRIBE, PUBLISH, PUBSUB, 
#    COMMAND, POST, HOST: and LATENCY. 
replica-serve-stale-data yes 

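When the replica loses its connection to the master, or while synchronization is still in progress, the replica can behave in one of two ways:

  1. If the option is on (the default), the replica keeps responding to client requests, possibly with stale data.
  2. If the option is off, the replica answers clients with the error SYNC with master in progress, except for a few special commands.

2. replica-read-only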

# You can configure a replica instance to accept writes or not. Writing against 
# a replica instance may be useful to store some ephemeral data (because data 
# written on a replica will be easily deleted after resync with the master) but 
# may also cause problems if clients are writing to it because of a 
# misconfiguration. 
# 
# Since Redis 2.6 by default replicas are read-only. 
# 
# Note: read only replicas are not designed to be exposed to untrusted clients 
# on the internet. It's just a protection layer against misuse of the instance. 
# Still a read only replica exports by default all the administrative commands 
# such as CONFIG, DEBUG, and so forth. To a limited extent you can improve 
# security of read only replicas using 'rename-command' to shadow all the 
# administrative / dangerous commands. 
replica-read-only yes 

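A replica can be configured to accept writes, which is handy for ephemeral data; such data is easily wiped once the replica resyncs with the master.

Read-only mode does not protect against malicious client requests. To tighten security further, rename-command can be used to hide dangerous commands.

3. repl-diskless-sync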

# Replication SYNC strategy: disk or socket. 
# 
# New replicas and reconnecting replicas that are not able to continue the 
# replication process just receiving differences, need to do what is called a 
# "full synchronization". An RDB file is transmitted from the master to the 
# replicas. 
# 
# The transmission can happen in two different ways: 
# 
# 1) Disk-backed: The Redis master creates a new process that writes the RDB 
#                 file on disk. Later the file is transferred by the parent 
#                 process to the replicas incrementally. 
# 2) Diskless: The Redis master creates a new process that directly writes the 
#              RDB file to replica sockets, without touching the disk at all. 
# 
# With disk-backed replication, while the RDB file is generated, more replicas 
# can be queued and served with the RDB file as soon as the current child 
# producing the RDB file finishes its work. With diskless replication instead 
# once the transfer starts, new replicas arriving will be queued and a new 
# transfer will start when the current one terminates. 
# 
# When diskless replication is used, the master waits a configurable amount of 
# time (in seconds) before starting the transfer in the hope that multiple 
# replicas will arrive and the transfer can be parallelized. 
# 
# With slow disks and fast (large bandwidth) networks, diskless replication 
# works better. 
repl-diskless-sync no 

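There are two ways to transfer: via disk or via socket.

A new replica, or a reconnecting one that cannot continue incrementally, must first do a full synchronization, which requires sending an RDB snapshot.

The RDB snapshot can be sent in two ways:

  1. Disk-backed: the master forks a child process that saves the RDB to disk, and the parent process then sends the file to the replicas incrementally.
  2. Diskless: the master forks a child process that writes the RDB data straight to the replica sockets, without touching the disk.

The advantage of the disk-backed approach is that once the RDB is on disk, the same file can serve multiple replicas. With diskless replication, replicas that arrive after a transfer has started have to queue for the next one.

With diskless replication the master waits briefly before starting the transfer, so that if several replicas need to synchronize they can be served in parallel.

When the disk is slow and network bandwidth is plentiful, diskless replication works better.

4. repl-diskless-sync-delay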

# When diskless replication is enabled, it is possible to configure the delay 
# the server waits in order to spawn the child that transfers the RDB via socket 
# to the replicas. 
# 
# This is important since once the transfer starts, it is not possible to serve 
# new replicas arriving, that will be queued for the next RDB transfer, so the 
# server waits a delay in order to let more replicas arrive. 
# 
# The delay is specified in seconds, and by default is 5 seconds. To disable 
# it entirely just set it to 0 seconds and the transfer will start ASAP. 
repl-diskless-sync-delay 5 

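For diskless replication, a waiting time can be configured so that as many replicas as possible are served by the same transfer, because once the transfer starts, no new replica can be served until it finishes.

5. repl-diskless-load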

# ----------------------------------------------------------------------------- 
# WARNING: RDB diskless load is experimental. Since in this setup the replica 
# does not immediately store an RDB on disk, it may cause data loss during 
# failovers. RDB diskless load + Redis modules not handling I/O reads may also 
# cause Redis to abort in case of I/O errors during the initial synchronization 
# stage with the master. Use only if you know what you are doing. 
# ----------------------------------------------------------------------------- 
# 
# Replica can load the RDB it reads from the replication link directly from the 
# socket, or store the RDB to a file and read that file after it was completely 
# received from the master. 
# 
# In many cases the disk is slower than the network, and storing and loading 
# the RDB file may increase replication time (and even increase the master's 
# Copy on Write memory and slave buffers). 
# However, parsing the RDB file directly from the socket may mean that we have 
# to flush the contents of the current database before the full rdb was 
# received. For this reason we have the following options: 
# 
# "disabled"    - Don't use diskless load (store the rdb file to the disk first) 
# "on-empty-db" - Use diskless load only when it is completely safe. 
# "swapdb"      - Keep a copy of the current db contents in RAM while parsing 
#                 the data directly from the socket. note that this requires 
#                 sufficient memory, if you don't have it, you risk an OOM kill. 
repl-diskless-load disabled 

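On the replica side of a diskless transfer, this option decides whether the received RDB is first stored on the local disk. Storing it makes synchronization take longer and can even increase memory usage on the master (copy-on-write memory and replica output buffers).

6. repl-ping-replica-period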

# Replicas send PINGs to server in a predefined interval. It's possible to 
# change this interval with the repl_ping_replica_period option. The default 
# value is 10 seconds. 
# 
# repl-ping-replica-period 10 

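The interval at which PINGs are sent on the replication link; the default is 10s. Note that although the comment above says the replicas send the PINGs, in the replication code it is the master that pings its replicas at this interval.

7. repl-timeout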

# The following option sets the replication timeout for: 
# 
# 1) Bulk transfer I/O during SYNC, from the point of view of replica. 
# 2) Master timeout from the point of view of replicas (data, pings). 
# 3) Replica timeout from the point of view of masters (REPLCONF ACK pings). 
# 
# It is important to make sure that this value is greater than the value 
# specified for repl-ping-replica-period otherwise a timeout will be detected 
# every time there is low traffic between the master and the replica. 
# 
# repl-timeout 60 

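The timeout used during replication. It covers:

  1. the replica's timeout for receiving the RDB during SYNC;
  2. the replica's timeout for receiving data or PINGs from the master;
  3. the master's timeout for receiving REPLCONF ACK pings from the replica.

8. repl-disable-tcp-nodelay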

# Disable TCP_NODELAY on the replica socket after SYNC? 
# 
# If you select "yes" Redis will use a smaller number of TCP packets and 
# less bandwidth to send data to replicas. But this can add a delay for 
# the data to appear on the replica side, up to 40 milliseconds with 
# Linux kernels using a default configuration. 
# 
# If you select "no" the delay for data to appear on the replica side will 
# be reduced but more bandwidth will be used for replication. 
# 
# By default we optimize for low latency, but in very high traffic conditions 
# or when the master and replicas are many hops away, turning this to "yes" may 
# be a good idea. 
repl-disable-tcp-nodelay no 

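When set to yes, small packets are coalesced into larger ones before being sent. This saves bandwidth but can add a delay of up to 40ms, during which the replica's data lags behind the master's.

9. repl-backlog-size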

# Set the replication backlog size. The backlog is a buffer that accumulates 
# replica data when replicas are disconnected for some time, so that when a 
# replica wants to reconnect again, often a full resync is not needed, but a 
# partial resync is enough, just passing the portion of data the replica 
# missed while disconnected. 
# 
# The bigger the replication backlog, the longer the time the replica can be 
# disconnected and later be able to perform a partial resynchronization. 
# 
# The backlog is only allocated once there is at least a replica connected. 
# 
# repl-backlog-size 1mb 

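This is the command buffer used for partial resynchronization. If it is too small, partial resync will rarely be possible; the larger it is, the longer a replica can stay disconnected.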
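
A common rule of thumb (not something stated in redis.conf itself) is to size the backlog as write rate times the longest disconnect you want to survive. The two helpers below only do that arithmetic; their names and the assumption of a steady write rate are mine.

```c
#include <assert.h>

/* Rough estimate: bytes of backlog needed to survive a disconnect,
 * assuming a steady stream of write_bytes_per_sec. */
long long backlog_bytes_needed(long long write_bytes_per_sec,
                               long long max_disconnect_sec) {
    assert(write_bytes_per_sec >= 0 && max_disconnect_sec >= 0);
    return write_bytes_per_sec * max_disconnect_sec;
}

/* The inverse: how many whole seconds a replica may stay away before a
 * backlog of backlog_bytes has been overwritten. */
long long tolerated_disconnect_sec(long long backlog_bytes,
                                   long long write_bytes_per_sec) {
    assert(backlog_bytes >= 0 && write_bytes_per_sec > 0);
    return backlog_bytes / write_bytes_per_sec;
}
```

For example, at roughly 200 KB/s of replicated writes, the 1mb value shown in the comment above covers only about 5 seconds of disconnection, while surviving a 60-second outage would need about 12 MB.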

10. repl-backlog-ttl

# After a master has no longer connected replicas for some time, the backlog 
# will be freed. The following option configures the amount of seconds that 
# need to elapse, starting from the time the last replica disconnected, for 
# the backlog buffer to be freed. 
# 
# Note that replicas never free the backlog for timeout, since they may be 
# promoted to masters later, and should be able to correctly "partially 
# resynchronize" with the replicas: hence they should always accumulate backlog. 
# 
# A value of 0 means to never release the backlog. 
# 
# repl-backlog-ttl 3600 

How long the backlog is kept once no replica is connected; 0 means it is never released.

11. replica-priority

# The replica priority is an integer number published by Redis in the INFO 
# output. It is used by Redis Sentinel in order to select a replica to promote 
# into a master if the master is no longer working correctly. 
# 
# A replica with a low priority number is considered better for promotion, so 
# for instance if there are three replicas with priority 10, 100, 25 Sentinel 
# will pick the one with priority 10, that is the lowest. 
# 
# However a special priority of 0 marks the replica as not able to perform the 
# role of master, so a replica with priority of 0 will never be selected by 
# Redis Sentinel for promotion. 
# 
# By default the priority is 100. 
replica-priority 100 

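The replica's priority, which Redis Sentinel uses when choosing a replica to promote: a lower number is preferred, and a priority of 0 means the replica will never be promoted.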
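
The priority rule from the comment above can be written down in a few lines. This sketch covers only the priority comparison; Sentinel's real selection also weighs other factors, which are left out here, and pick_by_priority is a hypothetical helper, not a Sentinel function.

```c
#include <assert.h>
#include <stddef.h>

/* Returns the index of the replica with the lowest non-zero priority,
 * or -1 if no replica is eligible. Ties keep the earliest index. */
int pick_by_priority(const int *priority, size_t n) {
    assert(priority != NULL || n == 0);
    int best = -1;
    for (size_t i = 0; i < n; i++) {
        if (priority[i] == 0) continue;  /* 0 means never promote */
        if (best == -1 || priority[i] < priority[best]) best = (int)i;
    }
    return best;
}
```

With the priorities 10, 100, 25 from the redis.conf example, the first replica (priority 10) is chosen; if its priority were 0 instead, the choice would fall on the one with priority 25.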

12. min-replicas-to-write, min-replicas-max-lag

# It is possible for a master to stop accepting writes if there are less than 
# N replicas connected, having a lag less or equal than M seconds. 
# 
# The N replicas need to be in "online" state. 
# 
# The lag in seconds, that must be <= the specified value, is calculated from 
# the last ping received from the replica, that is usually sent every second. 
# 
# This option does not GUARANTEE that N replicas will accept the write, but 
# will limit the window of exposure for lost writes in case not enough replicas 
# are available, to the specified number of seconds. 
# 
# For example to require at least 3 replicas with a lag <= 10 seconds use: 
# 
# min-replicas-to-write 3 
# min-replicas-max-lag 10 

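For data safety, these options set the minimum number of replicas a write requires and the maximum replication lag those replicas may have.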
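
As a rough sketch of the check these two options describe: count the replicas whose lag is within the limit and compare that with the required number. The function name and the lag_sec input array are hypothetical; the sketch also assumes that setting either option to 0 switches the check off.

```c
#include <assert.h>
#include <stddef.h>

/* lag_sec[i] is the number of seconds since replica i was last heard from
 * (hypothetical input). Returns 1 if the master should accept writes. */
int master_accepts_writes(const int *lag_sec, size_t n,
                          int min_replicas_to_write,
                          int min_replicas_max_lag) {
    assert(lag_sec != NULL || n == 0);
    /* Assumption: either option set to 0 disables the check. */
    if (min_replicas_to_write == 0 || min_replicas_max_lag == 0) return 1;
    int good = 0;
    for (size_t i = 0; i < n; i++)
        if (lag_sec[i] <= min_replicas_max_lag) good++;
    return good >= min_replicas_to_write;
}
```

With lags of 1, 3, and 12 seconds and the example settings (3 replicas, lag <= 10), only two replicas qualify, so writes would be refused; requiring 2 replicas instead would let them through.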

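III. The replication process

During replication, the master and the replica go through several rounds of negotiation to make sure the peer is healthy before any data is synchronized. This process is called the handshake.

The source code defines the replication state codes: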

/* Slave replication state. Used in server.repl_state for slaves to remember 
 * what to do next. */ 
#define REPL_STATE_NONE 0 /* No active replication */ 
#define REPL_STATE_CONNECT 1 /* Must connect to master */ 
#define REPL_STATE_CONNECTING 2 /* Connecting to master */ 
/* --- Handshake states, must be ordered --- */ 
#define REPL_STATE_RECEIVE_PONG 3 /* Wait for PING reply */ 
#define REPL_STATE_SEND_AUTH 4 /* Send AUTH to master */ 
#define REPL_STATE_RECEIVE_AUTH 5 /* Wait for AUTH reply */ 
#define REPL_STATE_SEND_PORT 6 /* Send REPLCONF listening-port */ 
#define REPL_STATE_RECEIVE_PORT 7 /* Wait for REPLCONF reply */ 
#define REPL_STATE_SEND_IP 8 /* Send REPLCONF ip-address */ 
#define REPL_STATE_RECEIVE_IP 9 /* Wait for REPLCONF reply */ 
#define REPL_STATE_SEND_CAPA 10 /* Send REPLCONF capa */ 
#define REPL_STATE_RECEIVE_CAPA 11 /* Wait for REPLCONF reply */ 
#define REPL_STATE_SEND_PSYNC 12 /* Send PSYNC */ 
#define REPL_STATE_RECEIVE_PSYNC 13 /* Wait for PSYNC reply */ 
/* --- End of handshake states --- */ 
#define REPL_STATE_TRANSFER 14 /* Receiving .rdb from master */ 
#define REPL_STATE_CONNECTED 15 /* Connected to master */ 

The states marked as handshake states make up the negotiation phase.
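
Because the handshake states are ordered, the happy path is essentially "advance to the next state", with two optional stages that can be skipped. The sketch below models only that ordering. The HS_* names, hs_next, and hs_steps are made up for illustration (the values mirror the REPL_STATE_* defines), and has_auth / has_announce_ip stand in for the corresponding configuration being set.

```c
#include <assert.h>

/* Handshake states in the same order as the REPL_STATE_* defines. */
enum HsState {
    HS_RECEIVE_PONG = 3, HS_SEND_AUTH, HS_RECEIVE_AUTH,
    HS_SEND_PORT, HS_RECEIVE_PORT, HS_SEND_IP, HS_RECEIVE_IP,
    HS_SEND_CAPA, HS_RECEIVE_CAPA, HS_SEND_PSYNC, HS_RECEIVE_PSYNC
};

/* Next state after the current step succeeds. */
enum HsState hs_next(enum HsState s, int has_auth, int has_announce_ip) {
    assert(s >= HS_RECEIVE_PONG && s < HS_RECEIVE_PSYNC);
    if (s == HS_SEND_AUTH && !has_auth) return HS_SEND_PORT;      /* skip AUTH */
    if (s == HS_SEND_IP && !has_announce_ip) return HS_SEND_CAPA; /* skip IP */
    return (enum HsState)(s + 1);
}

/* Counts the transitions from waiting for PONG to waiting for the
 * PSYNC reply. */
int hs_steps(int has_auth, int has_announce_ip) {
    int steps = 0;
    for (enum HsState s = HS_RECEIVE_PONG; s != HS_RECEIVE_PSYNC;
         s = hs_next(s, has_auth, has_announce_ip))
        steps++;
    return steps;
}
```

With both optional stages configured the handshake takes 10 transitions; with neither it takes 8, since each skipped stage removes one state from the path.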

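The complete master-replica synchronization can be divided into the following eight steps.

1. Setting the master's address and port

When slaveof is executed on the replica, its command handler replicaofCommand calls replicationSetMaster(c->argv[1]->ptr, port) to store the master's address and port. The replica returns OK to the client, and replication enters the REPL_STATE_CONNECT state.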

/* Set replication to the specified master address and port. */ 
void replicationSetMaster(char *ip, int port) { 
    int was_master = server.masterhost == NULL; 

    sdsfree(server.masterhost); 
    server.masterhost = sdsnew(ip); 
    server.masterport = port;  /* store the master's address and port */ 
    if (server.master) { 
        freeClient(server.master); 
    } 
    disconnectAllBlockedClients(); /* Clients blocked in master, now slave. */ 

    /* Force our slaves to resync with us as well. They may hopefully be able 
     * to partially resync with us, but we can notify the replid change. */ 
    disconnectSlaves(); 
    cancelReplicationHandshake(); /* drops any replication already in progress, so with a too-short timeout we end up here every time */ 
    /* Before destroying our master state, create a cached master using 
     * our own parameters, to later PSYNC with the new master. */ 
    if (was_master) { 
        replicationDiscardCachedMaster(); 
        replicationCacheMasterUsingMyself(); 
    } 

    /* Fire the role change modules event. */ 
    moduleFireServerEvent(REDISMODULE_EVENT_REPLICATION_ROLE_CHANGED, 
                          REDISMODULE_EVENT_REPLROLECHANGED_NOW_REPLICA, 
                          NULL); 

    /* Fire the master link modules event. */ 
    if (server.repl_state == REPL_STATE_CONNECTED) 
        moduleFireServerEvent(REDISMODULE_EVENT_MASTER_LINK_CHANGE, 
                              REDISMODULE_SUBEVENT_MASTER_LINK_DOWN, 
                              NULL); 

    server.repl_state = REPL_STATE_CONNECT; 
} 

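2. Establishing the socket connection

The previous step stored the master's address and port and then returned, leaving replication in the REPL_STATE_CONNECT state.

Redis has a timer function, serverCron, which triggers the replication cron once per second. In other words, after slaveof completes it can take up to 1s before the synchronization actually starts.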

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) { 
    /* Replication cron function -- used to reconnect to master, 
     * detect transfer failures, start background RDB transfers and so forth. */ 
    run_with_period(1000) replicationCron(); 
} 
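
How a cron that fires several times per second can run a task only once per second is worth a small sketch. The function below is modeled on how I understand run_with_period to behave (run when the tick counter is a multiple of period divided by tick length); treat that as an assumption rather than a quote of the macro. should_run, hz, and cronloops are illustrative names.

```c
#include <assert.h>

/* Returns 1 when a task with the given period should run on this tick.
 * hz is how many times per second the cron fires; cronloops counts ticks. */
int should_run(int period_ms, int hz, long long cronloops) {
    assert(period_ms > 0 && hz > 0 && hz <= 1000);
    int tick_ms = 1000 / hz;
    if (period_ms <= tick_ms) return 1;  /* period shorter than a tick */
    return cronloops % (period_ms / tick_ms) == 0;
}
```

At 10 ticks per second, a 1000 ms task runs on ticks 0, 10, 20, and so on, which is where the "up to 1s" delay mentioned above comes from.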

replicationCron drives the transitions of the whole replication state machine.

void replicationCron(void) { 
    /* Check if we should connect to a MASTER */ 
    if (server.repl_state == REPL_STATE_CONNECT) { /* the previous step left us in CONNECT */ 
        serverLog(LL_NOTICE,"Connecting to MASTER %s:%d", 
            server.masterhost, server.masterport); 
        if (connectWithMaster() == C_OK) { /* next step: establish the connection */ 
            serverLog(LL_NOTICE,"MASTER <-> REPLICA sync started"); 
        } 
    } 
} 

int connectWithMaster(void) { 
    server.repl_transfer_s = server.tls_replication ? connCreateTLS() : connCreateSocket(); 
    if (connConnect(server.repl_transfer_s, server.masterhost, server.masterport, 
                NET_FIRST_BIND_ADDR, syncWithMaster) == C_ERR) { /* registers syncWithMaster as the callback */ 
        serverLog(LL_WARNING,"Unable to connect to MASTER: %s", 
                connGetLastError(server.repl_transfer_s)); 
        connClose(server.repl_transfer_s); 
        server.repl_transfer_s = NULL; 
        return C_ERR; 
    } 

    server.repl_transfer_lastio = server.unixtime; 
    server.repl_state = REPL_STATE_CONNECTING; 
    return C_OK; 
} 

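This establishes the socket connection to the master and enters the REPL_STATE_CONNECTING state.

3. Sending PING

After the previous step, replication is in the REPL_STATE_CONNECTING state and syncWithMaster is registered as the callback.

Once the connect succeeds, the replica sends a PING to the master, installs syncWithMaster as the read handler, and enters the REPL_STATE_RECEIVE_PONG state.

syncWithMaster implements both the handshake and the synchronization itself.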

/* This handler fires when the non blocking connect was able to 
 * establish a connection with the master. */ 
void syncWithMaster(connection *conn) { 
    /* Send a PING to check the master is able to reply without errors. */ 
    if (server.repl_state == REPL_STATE_CONNECTING) { 
        serverLog(LL_NOTICE,"Non blocking connect for SYNC fired the event."); 
        /* Delete the writable event so that the readable event remains 
         * registered and we can wait for the PONG reply. */ 
        connSetReadHandler(conn, syncWithMaster);  /* the read handler is still syncWithMaster */ 
        connSetWriteHandler(conn, NULL); 
        server.repl_state = REPL_STATE_RECEIVE_PONG; /* enter the RECEIVE_PONG state */ 
        /* Send the PING, don't check for errors at all, we have the timeout 
         * that will take care about this. */ 
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PING",NULL); /* send PING */ 
        if (err) goto write_error; 
        return; 
    } 
} 

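4. Authentication

On receiving the master's reply to PING, the replica enters the REPL_STATE_SEND_AUTH state.

In REPL_STATE_SEND_AUTH, if authentication is configured, the replica sends AUTH and enters the REPL_STATE_RECEIVE_AUTH state.

In REPL_STATE_RECEIVE_AUTH the replica reads the master's authentication result and then enters the REPL_STATE_SEND_PORT state.

If authentication is not configured, replication goes straight to the REPL_STATE_SEND_PORT state.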

void syncWithMaster(connection *conn) { 
    /* Receive the PONG command. */ 
    if (server.repl_state == REPL_STATE_RECEIVE_PONG) { 
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); /* read the reply to PING */ 

        /* We accept only two replies as valid, a positive +PONG reply 
         * (we just check for "+") or an authentication error. 
         * Note that older versions of Redis replied with "operation not 
         * permitted" instead of using a proper error code, so we test 
         * both. */ 
        if (err[0] != '+' && 
            strncmp(err,"-NOAUTH",7) != 0 && 
            strncmp(err,"-ERR operation not permitted",28) != 0) 
        { 
            serverLog(LL_WARNING,"Error reply to PING from master: '%s'",err); 
            sdsfree(err); 
            goto error; 
        } else { 
            serverLog(LL_NOTICE, 
                "Master replied to PING, replication can continue..."); 
        } 
        sdsfree(err); 
        server.repl_state = REPL_STATE_SEND_AUTH; /* enter the SEND_AUTH state */ 
    } 

    /* AUTH with the master if required. */ 
    /* AUTH follows immediately */ 
    if (server.repl_state == REPL_STATE_SEND_AUTH) { 
        if (server.masteruser && server.masterauth) { 
            err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH", 
                                         server.masteruser,server.masterauth,NULL); /* AUTH with user and password */ 
            if (err) goto write_error; 
            server.repl_state = REPL_STATE_RECEIVE_AUTH; 
            return; 
        } else if (server.masterauth) { 
            err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"AUTH",server.masterauth,NULL); /* AUTH with password only */ 
            if (err) goto write_error; 
            server.repl_state = REPL_STATE_RECEIVE_AUTH; 
            return; 
        } else { 
            server.repl_state = REPL_STATE_SEND_PORT; 
        } 
    } 

    /* Receive AUTH reply. */ 
    if (server.repl_state == REPL_STATE_RECEIVE_AUTH) { 
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); /* result of AUTH */ 
        if (err[0] == '-') { 
            serverLog(LL_WARNING,"Unable to AUTH to MASTER: %s",err); 
            sdsfree(err); 
            goto error; 
        } 
        sdsfree(err); 
        server.repl_state = REPL_STATE_SEND_PORT; 
    } 
} 
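
One detail in the PONG check is easy to miss: an authentication error is treated as a valid reply to PING, because it still proves the master is alive and AUTH is the very next step. The standalone helper below restates that test so it can be tried in isolation; ping_reply_allows_handshake is a name invented for this sketch, and the sample replies in the note are illustrative strings.

```c
#include <assert.h>
#include <string.h>

/* Returns 1 if the reply to PING lets the handshake continue: either a
 * positive reply, or one of the two "authentication required" errors. */
int ping_reply_allows_handshake(const char *reply) {
    assert(reply != NULL);
    return reply[0] == '+' ||
           strncmp(reply, "-NOAUTH", 7) == 0 ||
           strncmp(reply, "-ERR operation not permitted", 28) == 0;
}
```

A reply such as "+PONG" or "-NOAUTH Authentication required." lets the handshake proceed, whereas any other error reply sends syncWithMaster down its error path.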

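5. Sending the port

Why does the replica need to send its port at all, when the master can already read a port from the socket connection? The port visible on the socket is only the source port of the replica's outgoing connection, not the port the replica listens on, and the master needs the listening port so that its INFO output lists the replica correctly.

In REPL_STATE_SEND_PORT the replica sends its port to the master and enters the REPL_STATE_RECEIVE_PORT state.

In REPL_STATE_RECEIVE_PORT the replica reads the master's reply. Because some versions do not support this command, errors are ignored, and replication enters the REPL_STATE_SEND_IP state.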

void syncWithMaster(connection *conn) { 
    /* Set the slave port, so that Master's INFO command can list the 
     * slave listening port correctly. */ 
    if (server.repl_state == REPL_STATE_SEND_PORT) { 
        int port; 
        if (server.slave_announce_port) port = server.slave_announce_port; 
        else if (server.tls_replication && server.tls_port) port = server.tls_port; 
        else port = server.port; 
        sds portstr = sdsfromlonglong(port); 
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF", 
                "listening-port",portstr, NULL); /* send the port */ 
        sdsfree(portstr); 
        if (err) goto write_error; 
        sdsfree(err); 
        server.repl_state = REPL_STATE_RECEIVE_PORT; 
        return; 
    } 

    /* Receive REPLCONF listening-port reply. */ 
    if (server.repl_state == REPL_STATE_RECEIVE_PORT) { 
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); 
        /* Ignore the error if any, not all the Redis versions support 
         * REPLCONF listening-port. */ 
        if (err[0] == '-') { 
            serverLog(LL_NOTICE,"(Non critical) Master does not understand " 
                                "REPLCONF listening-port: %s", err); 
        } 
        sdsfree(err); 
        server.repl_state = REPL_STATE_SEND_IP; 
    } 
} 

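6. Sending the IP

After the port has been sent, replication enters the REPL_STATE_SEND_IP state.

In REPL_STATE_SEND_IP, if slave-announce-ip is not set, the IP is skipped and replication enters the REPL_STATE_SEND_CAPA state.

If slave-announce-ip is set, the replica sends it to the master, ignores any error in the reply, and enters the REPL_STATE_SEND_CAPA state.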

void syncWithMaster(connection *conn) { 
    /* Skip REPLCONF ip-address if there is no slave-announce-ip option set. */ 
    if (server.repl_state == REPL_STATE_SEND_IP && 
        server.slave_announce_ip == NULL) 
    { 
            server.repl_state = REPL_STATE_SEND_CAPA;  /* no announce-ip configured, skip this step */ 
    } 

    /* Set the slave ip, so that Master's INFO command can list the 
     * slave IP address port correctly in case of port forwarding or NAT. */ 
    if (server.repl_state == REPL_STATE_SEND_IP) { 
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF", 
                "ip-address",server.slave_announce_ip, NULL); 
        if (err) goto write_error; 
        sdsfree(err); 
        server.repl_state = REPL_STATE_RECEIVE_IP; 
        return; 
    } 

    /* Receive REPLCONF ip-address reply. */ 
    if (server.repl_state == REPL_STATE_RECEIVE_IP) { 
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); 
        /* Ignore the error if any, not all the Redis versions support 
         * REPLCONF listening-port. */ 
        if (err[0] == '-') { 
            serverLog(LL_NOTICE,"(Non critical) Master does not understand " 
                                "REPLCONF ip-address: %s", err); 
        } 
        sdsfree(err); 
        server.repl_state = REPL_STATE_SEND_CAPA; 
    } 
} 

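7. Synchronization

Synchronization proper begins in the REPL_STATE_SEND_CAPA state. Because the synchronization algorithm has kept evolving, the replica first announces the capabilities it supports; the master simply ignores any capability it does not understand.

After the reply is received in REPL_STATE_RECEIVE_CAPA, replication enters the REPL_STATE_SEND_PSYNC state.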

void syncWithMaster(connection *conn) { 
    /* Inform the master of our (slave) capabilities. 
     * 
     * EOF: supports EOF-style RDB transfer for diskless replication. 
     * PSYNC2: supports PSYNC v2, so understands +CONTINUE <new repl ID>. 
     * 
     * The master will ignore capabilities it does not understand. */ 
    if (server.repl_state == REPL_STATE_SEND_CAPA) { 
        err = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"REPLCONF", 
                "capa","eof","capa","psync2",NULL); /* announce support for eof and psync2 */ 
        if (err) goto write_error; 
        sdsfree(err); 
        server.repl_state = REPL_STATE_RECEIVE_CAPA; 
        return; 
    } 

    /* Receive CAPA reply. */ 
    if (server.repl_state == REPL_STATE_RECEIVE_CAPA) { 
        err = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); 
        /* Ignore the error if any, not all the Redis versions support 
         * REPLCONF capa. */ 
        if (err[0] == '-') { 
            serverLog(LL_NOTICE,"(Non critical) Master does not understand " 
                                  "REPLCONF capa: %s", err); 
        } 
        sdsfree(err); 
        server.repl_state = REPL_STATE_SEND_PSYNC; 
    } 

    /* Try a partial resynchonization. If we don't have a cached master 
     * slaveTryPartialResynchronization() will at least try to use PSYNC 
     * to start a full resynchronization so that we get the master run id 
     * and the global offset, to try a partial resync at the next 
     * reconnection attempt. */ 
    if (server.repl_state == REPL_STATE_SEND_PSYNC) { 
        if (slaveTryPartialResynchronization(conn,0) == PSYNC_WRITE_ERROR) { /* try a partial resync: send the PSYNC request */ 
            err = sdsnew("Write error sending the PSYNC command."); 
            goto write_error; 
        } 
        server.repl_state = REPL_STATE_RECEIVE_PSYNC; 
        return; 
    } 

    psync_result = slaveTryPartialResynchronization(conn,1); /* read the PSYNC reply; CONTINUE means a partial resync is possible */ 
    if (psync_result == PSYNC_WAIT_REPLY) return; /* Try again later... */ 

    /* If the master is in an transient error, we should try to PSYNC 
     * from scratch later, so go to the error path. This happens when 
     * the server is loading the dataset or is not connected with its 
     * master and so forth. */ 
    if (psync_result == PSYNC_TRY_LATER) goto error; 

    /* Note: if PSYNC does not return WAIT_REPLY, it will take care of 
     * uninstalling the read handler from the file descriptor. */ 

    if (psync_result == PSYNC_CONTINUE) { /* partial resync accepted */ 
        serverLog(LL_NOTICE, "MASTER <-> REPLICA sync: Master accepted a Partial Resynchronization."); 
        if (server.supervised_mode == SUPERVISED_SYSTEMD) { 
            redisCommunicateSystemd("STATUS=MASTER <-> REPLICA sync: Partial Resynchronization accepted. Ready to accept connections.\n"); 
            redisCommunicateSystemd("READY=1\n"); 
        } 
        return; 
    } 
    /////////////////////////////////////////////////////////// 
    if (psync_result == PSYNC_NOT_SUPPORTED) { /* the master does not support PSYNC: fall back to SYNC for a full sync */ 
        serverLog(LL_NOTICE,"Retrying with SYNC..."); 
        if (connSyncWrite(conn,"SYNC\r\n",6,server.repl_syncio_timeout*1000) == -1) { 
            serverLog(LL_WARNING,"I/O error writing to MASTER: %s", 
                strerror(errno)); 
            goto error; 
        } 
    } 
    /* Prepare a suitable temp file for bulk transfer */ 
    if (!useDisklessLoad()) {  /* disk-backed load: create a temp file */ 
        while(maxtries--) { 
            snprintf(tmpfile,256, 
                "temp-%d.%ld.rdb",(int)server.unixtime,(long int)getpid()); 
            dfd = open(tmpfile,O_CREAT|O_WRONLY|O_EXCL,0644); 
            if (dfd != -1) break; 
            sleep(1); 
        } 
        if (dfd == -1) { /* could not create the temp file */ 
            serverLog(LL_WARNING,"Opening the temp file needed for MASTER <-> REPLICA synchronization: %s",strerror(errno)); 
            goto error; 
        } 
        server.repl_transfer_tmpfile = zstrdup(tmpfile); 
        server.repl_transfer_fd = dfd; 
    } 

    /* Setup the non blocking download of the bulk file. */ 
    /* slaveTryPartialResynchronization has already reset the read handler; each kind of message gets its own handler */ 
    if (connSetReadHandler(conn, readSyncBulkPayload) == C_ERR) /* full RDB sync: install readSyncBulkPayload as the receive callback */ 
    { 
        char conninfo[CONN_INFO_LEN]; 
        serverLog(LL_WARNING, 
            "Can't create readable event for SYNC: %s (%s)", 
            strerror(errno), connGetInfo(conn, conninfo, sizeof(conninfo))); 
        goto error; 
    }    
} 

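slaveTryPartialResynchronization determines whether a partial resync is possible and returns the kind of synchronization required.

If a full synchronization is needed, the read handler is reset to readSyncBulkPayload.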

#define PSYNC_WRITE_ERROR 0 
#define PSYNC_WAIT_REPLY 1 
#define PSYNC_CONTINUE 2 
#define PSYNC_FULLRESYNC 3 
#define PSYNC_NOT_SUPPORTED 4 
#define PSYNC_TRY_LATER 5 
int slaveTryPartialResynchronization(connection *conn, int read_reply) { 
    if (!read_reply) { 
        /* Issue the PSYNC command */ 
        /* send the PSYNC request */ 
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL); 
        if (reply != NULL) { 
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); 
            sdsfree(reply); 
            connSetReadHandler(conn, NULL); 
            return PSYNC_WRITE_ERROR; 
        } 
        return PSYNC_WAIT_REPLY; 
    } 

    /* Reading half */ 
    reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); 
    if (sdslen(reply) == 0) { 
        /* The master may send empty newlines after it receives PSYNC 
         * and before to reply, just to keep the connection alive. */ 
        sdsfree(reply); 
        return PSYNC_WAIT_REPLY; 
    } 
    connSetReadHandler(conn, NULL); 

    if (!strncmp(reply,"+FULLRESYNC",11)) { 
        /* full resync accepted; the handling of the reply is omitted in this excerpt */ 
        return PSYNC_FULLRESYNC; 
    } 
    if (!strncmp(reply,"+CONTINUE",9)) { 
        /* Partial resync was accepted. */ 
         return PSYNC_CONTINUE; 
    } 
    return PSYNC_NOT_SUPPORTED; 
} 

If a full synchronization is required, the function returns PSYNC_FULLRESYNC.
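
The excerpt above only distinguishes the reply prefixes. To show what the replica can extract from those lines, here is a small standalone parser. It assumes the reply formats "+FULLRESYNC <replid> <offset>" and "+CONTINUE" with an optional new replication ID; parse_psync_reply and the PsyncReply enum are hypothetical names, and real error handling (retries, timeouts) is left out.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

enum PsyncReply { REPLY_FULLRESYNC, REPLY_CONTINUE, REPLY_OTHER };

/* Parses the first line of the master's answer to PSYNC.
 * On +FULLRESYNC fills replid (41 bytes including the NUL) and offset.
 * On +CONTINUE fills replid only if the master sent a new one.
 * Fields that are missing are left as "" and -1. */
enum PsyncReply parse_psync_reply(const char *line, char replid[41],
                                  long long *offset) {
    assert(line != NULL && replid != NULL && offset != NULL);
    replid[0] = '\0';
    *offset = -1;
    if (strncmp(line, "+FULLRESYNC", 11) == 0) {
        if (sscanf(line + 11, " %40s %lld", replid, offset) != 2) {
            replid[0] = '\0';  /* malformed: report no ID and no offset */
            *offset = -1;
        }
        return REPLY_FULLRESYNC;
    }
    if (strncmp(line, "+CONTINUE", 9) == 0) {
        if (sscanf(line + 9, " %40s", replid) != 1) replid[0] = '\0';
        return REPLY_CONTINUE;
    }
    return REPLY_OTHER;
}
```

After a full resync the replica would remember the returned ID and offset, so that its next PSYNC can ask for a partial resync from that point.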

8. Command propagation

As covered in the AOF part of this series, client commands are spread out through the propagate function:

void propagate(struct redisCommand *cmd, int dbid, robj **argv, int argc, 
               int flags) 
{ 
    if (server.aof_state != AOF_OFF && flags & PROPAGATE_AOF) 
        feedAppendOnlyFile(cmd,dbid,argv,argc); 
    if (flags & PROPAGATE_REPL) 
        replicationFeedSlaves(server.slaves,dbid,argv,argc); /* propagate to the replicas */ 
} 
void replicationFeedSlaves(list *slaves, int dictid, robj **argv, int argc) { 
    /* Send SELECT command to every slave if needed. */ 
    if (server.slaveseldb != dictid) { 
        if (server.repl_backlog) feedReplicationBacklogWithObject(selectcmd); /* the database changed: emit a SELECT command first */ 

         /* Send it to slaves. */ 
        listRewind(slaves,&li); 
        while((ln = listNext(&li))) { 
            client *slave = ln->value; 
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; 
            addReply(slave,selectcmd); 
        } 
    } 

    /* Write the command to the replication backlog if any. */ 
    /* write the command into the backlog */ 
    if (server.repl_backlog) { 
        char aux[LONG_STR_SIZE+3]; 

        /* Add the multi bulk reply length. */ 
        aux[0] = '*'; 
        len = ll2string(aux+1,sizeof(aux)-1,argc); 
        aux[len+1] = '\r'; 
        aux[len+2] = '\n'; 
        feedReplicationBacklog(aux,len+3); 

        for (j = 0; j < argc; j++) { 
            long objlen = stringObjectLen(argv[j]); 

            /* We need to feed the buffer with the object as a bulk reply 
             * not just as a plain string, so create the $..CRLF payload len 
             * and add the final CRLF */ 
            aux[0] = '$'; 
            len = ll2string(aux+1,sizeof(aux)-1,objlen); 
            aux[len+1] = '\r'; 
            aux[len+2] = '\n'; 
            feedReplicationBacklog(aux,len+3); 
            feedReplicationBacklogWithObject(argv[j]); 
            feedReplicationBacklog(aux+len+1,2); 
        } 
    } 

    /* Write the command to every slave. */ 
    /* Send the command to all replicas. */ 
    listRewind(slaves,&li); 
    while((ln = listNext(&li))) { 
        client *slave = ln->value; 

        /* Don't feed slaves that are still waiting for BGSAVE to start. */ 
        if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_START) continue; 

        /* Feed slaves that are waiting for the initial SYNC (so these commands 
         * are queued in the output buffer until the initial SYNC completes), 
         * or are already in sync with the master. */ 

        /* Add the multi bulk length. */ 
        addReplyArrayLen(slave,argc); 

        /* Finally any additional argument that was not stored inside the 
         * static buffer if any (from j to argc). */ 
        for (j = 0; j < argc; j++) 
            addReplyBulk(slave,argv[j]); 
    } 
} 

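The master uses replicationFeedSlaves to append each command to the backlog and to send it to its replicas; replicas that are still waiting for the BGSAVE to start are skipped.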
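
The backlog and the replicas both receive the command as a RESP multi-bulk request: a `*<argc>` header followed by one `$<len>` bulk string per argument. The sketch below reproduces that wire format in isolation. `encode_multibulk` is a hypothetical helper, not a Redis function, and it assumes NUL-terminated arguments, whereas Redis works on binary-safe string objects.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Hypothetical helper (not part of Redis): encode argc/argv as a RESP
 * multi-bulk request, the format replicationFeedSlaves feeds to the backlog.
 * Assumes NUL-terminated arguments, so it is not binary safe.
 * Returns the number of bytes written, or -1 if the buffer is too small. */
int encode_multibulk(char *out, size_t cap, int argc, const char **argv) {
    assert(out != NULL && argv != NULL);
    size_t pos = 0;

    /* "*<argc>\r\n" header */
    int n = snprintf(out, cap, "*%d\r\n", argc);
    if (n < 0 || (size_t)n >= cap) return -1;
    pos += (size_t)n;

    for (int j = 0; j < argc; j++) {
        size_t len = strlen(argv[j]);
        /* "$<len>\r\n<payload>\r\n" for every argument */
        n = snprintf(out + pos, cap - pos, "$%zu\r\n%s\r\n", len, argv[j]);
        if (n < 0 || (size_t)n >= cap - pos) return -1;
        pos += (size_t)n;
    }
    return (int)pos;
}
```

Calling it with the three arguments `SET`, `k`, `v1` produces `*3\r\n$3\r\nSET\r\n$1\r\nk\r\n$2\r\nv1\r\n`, which is the byte sequence that advances the replication offset.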

IV. syncCommand and slaveTryPartialResynchronization

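syncCommand and slaveTryPartialResynchronization are the key pieces of the synchronization handshake: the former runs on the master, the latter on the replica.

syncCommand implements both the SYNC and the PSYNC commands.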

/* SYNC and PSYNC command implementation. */ 
void syncCommand(client *c) { 
    /* ignore SYNC if already slave or in monitor mode */ 
    if (c->flags & CLIENT_SLAVE) return; 

    /* Refuse SYNC requests if we are a slave but the link with our master 
     * is not ok... */ 
    /* We are a replica ourselves and have not reached CONNECTED yet: reply with an error. */ 
    if (server.masterhost && server.repl_state != REPL_STATE_CONNECTED) { 
        addReplySds(c,sdsnew("-NOMASTERLINK Can't SYNC while not connected with my master\r\n")); 
        return; 
    } 

    /* SYNC can't be issued when the server has pending data to send to 
     * the client about already issued commands. We need a fresh reply 
     * buffer registering the differences between the BGSAVE and the current 
     * dataset, so that we can copy to other slaves if needed. */ 
    /* The requesting replica still has replies queued that have not been sent. */ 
    if (clientHasPendingReplies(c)) { 
        addReplyError(c,"SYNC and PSYNC are invalid with pending output"); 
        return; 
    } 

    serverLog(LL_NOTICE,"Replica %s asks for synchronization", 
        replicationGetSlaveName(c)); 

    /* Try a partial resynchronization if this is a PSYNC command. 
     * If it fails, we continue with usual full resynchronization, however 
     * when this happens masterTryPartialResynchronization() already 
     * replied with: 
     * 
     * +FULLRESYNC <replid> <offset> 
     * 
     * So the slave knows the new replid and offset to try a PSYNC later 
     * if the connection with the master is lost. */ 
    /* For PSYNC, try a partial resynchronization first. */ 
    if (!strcasecmp(c->argv[0]->ptr,"psync")) { 
        /* masterTryPartialResynchronization decides from the replid and offset whether a partial resync is possible. */ 
        if (masterTryPartialResynchronization(c) == C_OK) { 
            server.stat_sync_partial_ok++; 
            return; /* No full resync needed, return. */ 
        } else { 
            char *master_replid = c->argv[1]->ptr; 

            /* Increment stats for failed PSYNCs, but only if the 
             * replid is not "?", as this is used by slaves to force a full 
             * resync on purpose when they are not able to partially 
             * resync. */ 
            if (master_replid[0] != '?') server.stat_sync_partial_err++; 
        } 
    } else { 
        /* If a slave uses SYNC, we are dealing with an old implementation 
         * of the replication protocol (like redis-cli --slave). Flag the client 
         * so that we don't expect to receive REPLCONF ACK feedbacks. */ 
        c->flags |= CLIENT_PRE_PSYNC; 
    } 

    /* Full resynchronization. */ 
    server.stat_sync_full++; 

    /* Setup the slave as one waiting for BGSAVE to start. The following code 
     * paths will change the state if we handle the slave differently. */ 
    c->replstate = SLAVE_STATE_WAIT_BGSAVE_START; 
    if (server.repl_disable_tcp_nodelay) 
        connDisableTcpNoDelay(c->conn); /* Non critical if it fails. */ 
    c->repldbfd = -1; 
    c->flags |= CLIENT_SLAVE; 
    listAddNodeTail(server.slaves,c); /* add the client to the list of replicas */ 

    /* Create the replication backlog if needed. */ 
    /* If there is no backlog yet, create one. */ 
    if (listLength(server.slaves) == 1 && server.repl_backlog == NULL) { 
        /* When we create the backlog from scratch, we always use a new 
         * replication ID and clear the ID2, since there is no valid 
         * past history. */ 
        changeReplicationId(); /* new replid; the next two calls clear replid2 and create the backlog */ 
        clearReplicationId2(); 
        createReplicationBacklog(); 
    } 

    /* CASE 1: BGSAVE is in progress, with disk target. */ 
    /* A BGSAVE is already running and its RDB file is being written to disk. */ 
    if (server.rdb_child_pid != -1 && 
        server.rdb_child_type == RDB_CHILD_TYPE_DISK) 
    { 
        /* Ok a background save is in progress. Let's check if it is a good 
         * one for replication, i.e. if there is another slave that is 
         * registering differences since the server forked to save. */ 
        client *slave; 
        listNode *ln; 
        listIter li; 

        listRewind(server.slaves,&li); 
        while((ln = listNext(&li))) { 
            slave = ln->value; 
            if (slave->replstate == SLAVE_STATE_WAIT_BGSAVE_END) break; 
        } 
        /* To attach this slave, we check that it has at least all the 
         * capabilities of the slave that triggered the current BGSAVE. */ 
        if (ln && ((c->slave_capa & slave->slave_capa) == slave->slave_capa)) { 
            /* Perfect, the server is already registering differences for 
             * another slave. Set the right state, and copy the buffer. */ 
            copyClientOutputBuffer(c,slave); 
            replicationSetupSlaveForFullResync(c,slave->psync_initial_offset); 
            serverLog(LL_NOTICE,"Waiting for end of BGSAVE for SYNC"); 
        } else { 
            /* No way, we need to wait for the next BGSAVE in order to 
             * register differences. */ 
            serverLog(LL_NOTICE,"Can't attach the replica to the current BGSAVE. Waiting for next BGSAVE for SYNC"); 
        } 

    /* CASE 2: BGSAVE is in progress, with socket target. */ 
    } else if (server.rdb_child_pid != -1 && 
               server.rdb_child_type == RDB_CHILD_TYPE_SOCKET) 
    { 
        /* There is an RDB child process but it is writing directly to 
         * children sockets. We need to wait for the next BGSAVE 
         * in order to synchronize. */ 
        serverLog(LL_NOTICE,"Current BGSAVE has socket target. Waiting for next BGSAVE for SYNC"); 

    /* CASE 3: There is no BGSAVE in progress. */ 
    /* Start a new BGSAVE. */ 
    } else { 
        if (server.repl_diskless_sync && (c->slave_capa & SLAVE_CAPA_EOF)) { 
            /* Diskless replication RDB child is created inside 
             * replicationCron() since we want to delay its start a 
             * few seconds to wait for more slaves to arrive. */ 
            if (server.repl_diskless_sync_delay) 
                serverLog(LL_NOTICE,"Delay next BGSAVE for diskless SYNC"); 
        } else { 
            /* Target is disk (or the slave is not capable of supporting 
             * diskless replication) and we don't have a BGSAVE in progress, 
             * let's start one. */ 
            if (!hasActiveChildProcess()) { 
                startBgsaveForReplication(c->slave_capa); 
            } else { 
                serverLog(LL_NOTICE, 
                    "No BGSAVE in progress, but another BG operation is active. " 
                    "BGSAVE for replication delayed"); 
            } 
        } 
    } 
    return; 
} 
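
The three cases at the end of syncCommand reduce to a small decision table. The sketch below restates it as a pure function so the branches are easier to compare. All names in it (`child_type`, `sync_action`, `full_sync_action` and its parameters) are invented for illustration and do not exist in Redis; the real function acts on server state instead of returning a value.

```c
#include <assert.h>

/* Hypothetical types, introduced only for this sketch. */
typedef enum { CHILD_NONE, CHILD_DISK, CHILD_SOCKET } child_type;
typedef enum {
    ATTACH_TO_CURRENT_BGSAVE, /* reuse the running BGSAVE and copy the peer's output buffer */
    WAIT_FOR_NEXT_BGSAVE,     /* stay in WAIT_BGSAVE_START until a later BGSAVE */
    DELAY_DISKLESS_BGSAVE,    /* replicationCron starts the diskless child later */
    START_BGSAVE_NOW
} sync_action;

/* Mirrors the CASE 1 / CASE 2 / CASE 3 branches of syncCommand.
 * compatible_waiter: another replica is in WAIT_BGSAVE_END and the new
 * replica has at least its capabilities. */
sync_action full_sync_action(child_type rdb_child, int compatible_waiter,
                             int diskless_enabled, int replica_supports_eof,
                             int other_child_active) {
    switch (rdb_child) {
    case CHILD_DISK:   /* CASE 1 */
        return compatible_waiter ? ATTACH_TO_CURRENT_BGSAVE
                                 : WAIT_FOR_NEXT_BGSAVE;
    case CHILD_SOCKET: /* CASE 2: the RDB goes straight to other sockets */
        return WAIT_FOR_NEXT_BGSAVE;
    case CHILD_NONE:   /* CASE 3 */
        if (diskless_enabled && replica_supports_eof)
            return DELAY_DISKLESS_BGSAVE;
        return other_child_active ? WAIT_FOR_NEXT_BGSAVE : START_BGSAVE_NOW;
    default:
        assert(0 && "unknown child type");
        return WAIT_FOR_NEXT_BGSAVE;
    }
}
```

Only the first branch lets a late replica share an RDB that is already being produced, which is why the capability check on `slave_capa` matters there.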

On the replica side, slaveTryPartialResynchronization sends PSYNC, parses the master's reply, and determines whether a partial resynchronization is possible.

#define PSYNC_WRITE_ERROR 0 
#define PSYNC_WAIT_REPLY 1 
#define PSYNC_CONTINUE 2 
#define PSYNC_FULLRESYNC 3 
#define PSYNC_NOT_SUPPORTED 4 
#define PSYNC_TRY_LATER 5 
int slaveTryPartialResynchronization(connection *conn, int read_reply) { 
    char *psync_replid; 
    char psync_offset[32]; 
    sds reply; 

    /* Writing half */ 
    if (!read_reply) { 
        /* Initially set master_initial_offset to -1 to mark the current 
         * master run_id and offset as not valid. Later if we'll be able to do 
         * a FULL resync using the PSYNC command we'll set the offset at the 
         * right value, so that this information will be propagated to the 
         * client structure representing the master into server.master. */ 
        server.master_initial_offset = -1; 

        if (server.cached_master) { 
            /* cached_master holds the state of the master from before the disconnection. */ 
            psync_replid = server.cached_master->replid; 
            snprintf(psync_offset,sizeof(psync_offset),"%lld", server.cached_master->reploff+1); 
            serverLog(LL_NOTICE,"Trying a partial resynchronization (request %s:%s).", psync_replid, psync_offset); 
        } else { 
            serverLog(LL_NOTICE,"Partial resynchronization not possible (no cached master)"); 
            /* If the replica has never synced, send replid=? and offset=-1. */ 
            psync_replid = "?"; 
            memcpy(psync_offset,"-1",3); 
        } 

        /* Issue the PSYNC command */ 
        /* The replica sends PSYNC with the replid and offset. */ 
        reply = sendSynchronousCommand(SYNC_CMD_WRITE,conn,"PSYNC",psync_replid,psync_offset,NULL); 
        if (reply != NULL) { 
            serverLog(LL_WARNING,"Unable to send PSYNC to master: %s",reply); 
            sdsfree(reply); 
            connSetReadHandler(conn, NULL); 
            return PSYNC_WRITE_ERROR; 
        } 
        return PSYNC_WAIT_REPLY; 
    } 

    /* Reading half */ 
    /* The master may send bare newlines as keepalives; ignore them. */ 
    reply = sendSynchronousCommand(SYNC_CMD_READ,conn,NULL); 
    if (sdslen(reply) == 0) { 
        /* The master may send empty newlines after it receives PSYNC 
         * and before to reply, just to keep the connection alive. */ 
        sdsfree(reply); 
        return PSYNC_WAIT_REPLY; 
    } 

    connSetReadHandler(conn, NULL); 

    /* +FULLRESYNC means a full resync: the replica resets its replid and offset. */ 
    if (!strncmp(reply,"+FULLRESYNC",11)) { 
        char *replid = NULL, *offset = NULL; 

        /* FULL RESYNC, parse the reply in order to extract the run id 
         * and the replication offset. */ 
        replid = strchr(reply,' '); 
        if (replid) { 
            replid++; 
            offset = strchr(replid,' '); 
            if (offset) offset++; 
        } 
        if (!replid || !offset || (offset-replid-1) != CONFIG_RUN_ID_SIZE) { 
            serverLog(LL_WARNING, 
                "Master replied with wrong +FULLRESYNC syntax."); 
            /* This is an unexpected condition, actually the +FULLRESYNC 
             * reply means that the master supports PSYNC, but the reply 
             * format seems wrong. To stay safe we blank the master 
             * replid to make sure next PSYNCs will fail. */ 
            memset(server.master_replid,0,CONFIG_RUN_ID_SIZE+1); 
        } else { 
            memcpy(server.master_replid, replid, offset-replid-1); 
            server.master_replid[CONFIG_RUN_ID_SIZE] = '\0'; 
            server.master_initial_offset = strtoll(offset,NULL,10); 
            serverLog(LL_NOTICE,"Full resync from master: %s:%lld", 
                server.master_replid, 
                server.master_initial_offset); 
        } 
        /* We are going to full resync, discard the cached master structure. */ 
        /* Free cached_master if present; it is no longer useful. */ 
        replicationDiscardCachedMaster(); 
        sdsfree(reply); 
        return PSYNC_FULLRESYNC; 
    } 

    /* +CONTINUE means the partial resync was accepted. */ 
    if (!strncmp(reply,"+CONTINUE",9)) { 
        /* Partial resync was accepted. */ 
        serverLog(LL_NOTICE, 
            "Successful partial resynchronization with master."); 

        /* Check the new replication ID advertised by the master. If it 
         * changed, we need to set the new ID as primary ID, and set or 
         * secondary ID as the old master ID up to the current offset, so 
         * that our sub-slaves will be able to PSYNC with us after a 
         * disconnection. */ 
        char *start = reply+10; 
        char *end = reply+9; 
        while(end[0] != '\r' && end[0] != '\n' && end[0] != '\0') end++; 
        if (end-start == CONFIG_RUN_ID_SIZE) { 
            char new[CONFIG_RUN_ID_SIZE+1]; 
            memcpy(new,start,CONFIG_RUN_ID_SIZE); 
            new[CONFIG_RUN_ID_SIZE] = '\0'; 

            /* If the replid advertised by the master differs from the one we sent, update ours. */ 

            if (strcmp(new,server.cached_master->replid)) { 
                /* Master ID changed. */ 
                serverLog(LL_WARNING,"Master replication ID changed to %s",new); 

                /* Set the old ID as our ID2, up to the current offset+1. */ 
                /* Keep the old replid in server.replid2. */ 
                memcpy(server.replid2,server.cached_master->replid, 
                    sizeof(server.replid2)); 
                server.second_replid_offset = server.master_repl_offset+1; 

                /* Update the cached master ID and our own primary ID to the 
                 * new one. */ 
                memcpy(server.replid,new,sizeof(server.replid)); 
                memcpy(server.cached_master->replid,new,sizeof(server.replid)); 

                /* Disconnect all the sub-slaves: they need to be notified. */ 
                /* Force our own sub-replicas to disconnect and reconnect. Is this really necessary? */ 
                disconnectSlaves(); 
            } 
        } 

        /* Setup the replication to continue. */ 
        sdsfree(reply); 
        replicationResurrectCachedMaster(conn); 

        /* If this instance was restarted and we read the metadata to 
         * PSYNC from the persistence file, our replication backlog could 
         * be still not initialized. Create it. */ 
        if (server.repl_backlog == NULL) createReplicationBacklog(); 
        return PSYNC_CONTINUE; 
    } 

    /* If we reach this point we received either an error (since the master does 
     * not understand PSYNC or because it is in a special state and cannot 
     * serve our request), or an unexpected reply from the master. 
     * 
     * Return PSYNC_NOT_SUPPORTED on errors we don't understand, otherwise 
     * return PSYNC_TRY_LATER if we believe this is a transient error. */ 

    if (!strncmp(reply,"-NOMASTERLINK",13) || 
        !strncmp(reply,"-LOADING",8)) 
    { 
        serverLog(LL_NOTICE, 
            "Master is currently unable to PSYNC " 
            "but should be in the future: %s", reply); 
        sdsfree(reply); 
        return PSYNC_TRY_LATER; 
    } 

    if (strncmp(reply,"-ERR",4)) { 
        /* If it's not an error, log the unexpected event. */ 
        serverLog(LL_WARNING, 
            "Unexpected reply to PSYNC from master: %s", reply); 
    } else { 
        serverLog(LL_NOTICE, 
            "Master does not support PSYNC or is in " 
            "error state (reply: %s)", reply); 
    } 
    sdsfree(reply); 
    replicationDiscardCachedMaster(); 
    return PSYNC_NOT_SUPPORTED; 
} 
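
The `+FULLRESYNC <replid> <offset>` branch is mostly pointer arithmetic, so it can help to see the parsing on its own. The sketch below applies the same validation (two spaces, a replid of exactly 40 characters) to a plain C string. `parse_fullresync` and `REPLID_LEN` are names made up for this example; 40 is the value of CONFIG_RUN_ID_SIZE in Redis.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

#define REPLID_LEN 40 /* same value as CONFIG_RUN_ID_SIZE in Redis */

/* Hypothetical helper: parse "+FULLRESYNC <replid> <offset>".
 * Returns 0 and fills replid/offset on success, -1 on malformed input. */
int parse_fullresync(const char *reply, char replid[REPLID_LEN + 1],
                     long long *offset) {
    assert(reply != NULL && replid != NULL && offset != NULL);
    if (strncmp(reply, "+FULLRESYNC", 11) != 0) return -1;

    const char *id = strchr(reply, ' ');  /* space before the replid */
    if (id == NULL) return -1;
    id++;

    const char *off = strchr(id, ' ');    /* space before the offset */
    if (off == NULL || off - id != REPLID_LEN) return -1;
    off++;

    memcpy(replid, id, REPLID_LEN);
    replid[REPLID_LEN] = '\0';
    *offset = strtoll(off, NULL, 10);
    return 0;
}
```

A reply whose replid is not exactly 40 characters is rejected, which corresponds to the "wrong +FULLRESYNC syntax" branch above where the replica blanks master_replid so that later PSYNC attempts fail.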

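A note on cached_master:

When the link between master and replica drops, the replica, while freeing the connection, saves the master (which it models as a client) into cached_master: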

void freeClient(client *c) { 
    /* If it is our master that's being disconnected we should make sure 
     * to cache the state to try a partial resynchronization later. 
     * 
     * Note that before doing this we make sure that the client is not in 
     * some unexpected state, by checking its flags. */ 
    if (server.master && c->flags & CLIENT_MASTER) { 
        serverLog(LL_WARNING,"Connection with master lost."); 
        if (!(c->flags & (CLIENT_CLOSE_AFTER_REPLY| 
                          CLIENT_CLOSE_ASAP| 
                          CLIENT_BLOCKED))) 
        { 
            replicationCacheMaster(c); 
            return; 
        } 
    } 
} 

void replicationCacheMaster(client *c) { 
    /* Save the master. Server.master will be set to null later by 
     * replicationHandleMasterDisconnection(). */ 
    server.cached_master = server.master; 
}
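
The cached master is what makes the "writing half" of slaveTryPartialResynchronization work after a reconnect: its replid and offset become the PSYNC arguments. The sketch below shows that relationship with a stripped-down struct. `master_state` and `build_psync` are hypothetical names, and the request is rendered as an inline text line for readability rather than in the RESP form that is actually sent.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Simplified stand-in for the client struct that represents the master. */
typedef struct {
    char replid[41];   /* 40-character replication ID plus NUL */
    long long reploff; /* last replication offset the replica has applied */
} master_state;

/* Hypothetical helper: build the PSYNC request.
 * With a cached master we ask for the first byte we have not seen yet
 * (reploff + 1); without one we send "? -1", which forces a full resync.
 * Returns the length written, or -1 if the buffer is too small. */
int build_psync(char *out, size_t cap, const master_state *cached) {
    assert(out != NULL);
    int n;
    if (cached != NULL) {
        n = snprintf(out, cap, "PSYNC %s %lld",
                     cached->replid, cached->reploff + 1);
    } else {
        n = snprintf(out, cap, "PSYNC ? -1");
    }
    return (n < 0 || (size_t)n >= cap) ? -1 : n;
}
```

If the cached state was discarded, for example after a full resync or an unsupported reply, the replica falls back to the `? -1` form on its next attempt.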