
Redis Source Code: Tracing a Client Connection Request Step by Step

September 30, 2020, 01:49 · 1,722 views

Summary: Redis is a high-performance in-memory database. From the moment a client connects to the moment a command executes, what actually happens inside Redis?

Redis is a high-performance in-memory database, so how does it go from accepting a client connection to executing a command? In this article we trace the internal flow in a debugger, at the source level. Since Redis 6 introduced multi-threaded network I/O, we will stick with Redis 5 for now to keep the debugging simple.

Server Startup

As a background service, Redis first has to get itself up and running, and then wait for client connections.

As we know, Redis's network model is the classic Reactor pattern: simply put, the main loop is responsible for receiving packets, while request handling goes through callback functions. In a typical Reactor server, such callbacks would be placed in separate worker threads.

Let's start from the main function.

// server.c 
int main(int argc, char **argv) { 
    struct timeval tv; 
    int j; 

    /************** code omitted **************/ 

    server.supervised = redisIsSupervised(server.supervised_mode); 
    int background = server.daemonize && !server.supervised; 
    if (background) daemonize(); 

    initServer(); 
    if (background || server.pidfile) createPidFile(); 
    redisSetProcTitle(argv[0]); 
    redisAsciiArt(); 
    checkTcpBacklogSettings(); 

    /************** code omitted **************/ 

The main function calls initServer to initialize the server.

// server.c 
void initServer(void) { 
    int j; 

    /************** code omitted **************/ 

    createSharedObjects(); 
    adjustOpenFilesLimit(); 
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR); 
    if (server.el == NULL) { 
        serverLog(LL_WARNING, "Failed creating the event loop. Error message: '%s'", 
            strerror(errno)); 
        exit(1); 
    } 

    /************** code omitted **************/ 

    /* Create an event handler for accepting new connections in TCP and Unix 
     * domain sockets. */ 
    for (j = 0; j < server.ipfd_count; j++) { 
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, 
            acceptTcpHandler,NULL) == AE_ERR) 
            { 
                serverPanic("Unrecoverable error creating server.ipfd file event."); 
            } 
    } 
    if (server.sofd > 0 && aeCreateFileEvent(server.el,server.sofd,AE_READABLE, 
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event."); 

Besides initializing server state, the most important thing initServer does is set up listening on the network ports. Redis wraps the OS multiplexing models behind one interface — select, epoll, and others are all available — and picks the best one for the system.

Let's first take a quick look at Redis's event wrapper.

/* State of an event based program */ 
typedef struct aeEventLoop { 
    int maxfd;   /* highest file descriptor currently registered */ 
    int setsize; /* max number of file descriptors tracked */ 
    long long timeEventNextId; 
    time_t lastTime;     /* Used to detect system clock skew */ 
    aeFileEvent *events; /* Registered events */ 
    aeFiredEvent *fired; /* Fired events */ 
    aeTimeEvent *timeEventHead; 
    int stop; 
    void *apidata; /* This is used for polling API specific data */ 
    aeBeforeSleepProc *beforesleep; 
    aeBeforeSleepProc *aftersleep; 
} aeEventLoop; 

This loop can be regarded as the global event loop; the object is created through the aeEventLoop *el; field of the redisServer struct.

Within the loop structure, pay special attention to the events field. At startup Redis can listen on at most 16 addresses by default (defined by CONFIG_BINDADDR_MAX), so the events array holds the file descriptors of those listening sockets, plus the fds of connected clients.
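The fd-indexed layout of events is worth pausing on: because Unix fds are small, dense integers, an array of setsize slots gives O(1) registration and lookup with no hash table at all. A minimal sketch of the idea (miniFileEvent, registerEvent, and SETSIZE are simplified stand-ins, not the real ae.c types):

```c
#include <stddef.h>

#define AE_NONE     0
#define AE_READABLE 1
#define AE_WRITABLE 2
#define SETSIZE     128   /* stand-in for maxclients + CONFIG_FDSET_INCR */

typedef void fileProc(int fd, void *clientData);

/* Simplified stand-in for aeFileEvent (the real struct lives in ae.h). */
typedef struct miniFileEvent {
    int mask;             /* AE_READABLE | AE_WRITABLE; AE_NONE if unused */
    fileProc *rfileProc;  /* read callback */
    void *clientData;     /* opaque pointer handed back to the callback */
} miniFileEvent;

static miniFileEvent events[SETSIZE];

/* Registration is just indexing by fd: O(1), no hashing, mirroring
 * aeCreateFileEvent's "&eventLoop->events[fd]" and its ERANGE bound check. */
static int registerEvent(int fd, int mask, fileProc *proc, void *data) {
    if (fd >= SETSIZE) return -1;
    events[fd].mask |= mask;
    if (mask & AE_READABLE) events[fd].rfileProc = proc;
    events[fd].clientData = data;
    return 0;
}

static void dummyProc(int fd, void *clientData) { (void)fd; (void)clientData; }
```

The trade-off is memory: slots are allocated up front for every possible fd, which is why the array is sized from maxclients at startup.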

aeCreateEventLoop initializes it as follows:

aeEventLoop *aeCreateEventLoop(int setsize) { 
    aeEventLoop *eventLoop; 
    int i; 

    if ((eventLoop = zmalloc(sizeof(*eventLoop))) == NULL) goto err; 
    eventLoop->events = zmalloc(sizeof(aeFileEvent)*setsize); 
    eventLoop->fired = zmalloc(sizeof(aeFiredEvent)*setsize); 
    if (eventLoop->events == NULL || eventLoop->fired == NULL) goto err; 
    eventLoop->setsize = setsize; 
    eventLoop->lastTime = time(NULL); 
    eventLoop->timeEventHead = NULL; 
    eventLoop->timeEventNextId = 0; 
    eventLoop->stop = 0; 
    eventLoop->maxfd = -1; 
    eventLoop->beforesleep = NULL; 
    eventLoop->aftersleep = NULL; 
    if (aeApiCreate(eventLoop) == -1) goto err; 
    /* Events with mask == AE_NONE are not set. So let's initialize the 
     * vector with it. */ 
    for (i = 0; i < setsize; i++) 
        eventLoop->events[i].mask = AE_NONE; 
    return eventLoop; 
    /************** error handling (err label) omitted **************/ 
} 

The function allocates the events and fired arrays, and marks every slot in events as AE_NONE (unused).

The implementation of aeApiCreate differs per multiplexing model. Taking the select model as an example:

static int aeApiCreate(aeEventLoop *eventLoop) { 
    aeApiState *state = zmalloc(sizeof(aeApiState)); 

    if (!state) return -1; 
    FD_ZERO(&state->rfds); 
    FD_ZERO(&state->wfds); 
    eventLoop->apidata = state; 
    return 0; 
} 

As you can see, it initializes the select state (the two fd_sets) and stores it in the loop's apidata field.

With the event loop defined, the next step is registering concrete events. Back to initServer:

    /* Create an event handler for accepting new connections in TCP and Unix 
     * domain sockets. */ 
    for (j = 0; j < server.ipfd_count; j++) { 
        if (aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, 
            acceptTcpHandler,NULL) == AE_ERR) 
            { 
                serverPanic( 
                    "Unrecoverable error creating server.ipfd file event."); 
            } 
    } 

The server.ipfd_count and server.ipfd fields used here were already filled in by listenToPort. As this code shows, an event is created for every listening fd, with acceptTcpHandler set as its callback.

Let's look at aeCreateFileEvent:

// ae.c 
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask, 
        aeFileProc *proc, void *clientData) 
{ 
    if (fd >= eventLoop->setsize) { 
        errno = ERANGE; 
        return AE_ERR; 
    } 
    aeFileEvent *fe = &eventLoop->events[fd]; 

    if (aeApiAddEvent(eventLoop, fd, mask) == -1) 
        return AE_ERR; 
    fe->mask |= mask; 
    if (mask & AE_READABLE) fe->rfileProc = proc; 
    if (mask & AE_WRITABLE) fe->wfileProc = proc; 
    fe->clientData = clientData; 
    if (fd > eventLoop->maxfd) 
        eventLoop->maxfd = fd; 
    return AE_OK; 
} 

The function locates the event slot directly via events[fd] and sets the read/write callbacks on it. aeApiAddEvent adds the fd to the underlying poll set; for the select model it looks like this:

// ae_select.c 
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) { 
    aeApiState *state = eventLoop->apidata; 

    if (mask & AE_READABLE) FD_SET(fd,&state->rfds); 
    if (mask & AE_WRITABLE) FD_SET(fd,&state->wfds); 
    return 0; 
} 

At this point all the listening events are registered. What remains is the while loop that waits for connections, which is the aeMain function.

void aeMain(aeEventLoop *eventLoop) { 
    eventLoop->stop = 0; 
    while (!eventLoop->stop) { 
        if (eventLoop->beforesleep != NULL) 
            eventLoop->beforesleep(eventLoop); 
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); 
    } 
} 

Let's look at aeProcessEvents:

int aeProcessEvents(aeEventLoop *eventLoop, int flags) 
{ 
    int processed = 0, numevents; 

    /* Nothing to do? return ASAP */ 
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0; 

    /************** code omitted **************/ 

        /* Call the multiplexing API, will return only on timeout or when 
         * some event fires. */ 
        numevents = aeApiPoll(eventLoop, tvp); 

        /* After sleep callback. */ 
        if (eventLoop->aftersleep != NULL && flags & AE_CALL_AFTER_SLEEP) 
            eventLoop->aftersleep(eventLoop); 

        for (j = 0; j < numevents; j++) { 
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd]; 
            int mask = eventLoop->fired[j].mask; 
            int fd = eventLoop->fired[j].fd; 
            int fired = 0; /* Number of events fired for current fd. */ 

            /* Normally we execute the readable event first, and the writable 
             * event later. This is useful as sometimes we may be able 
             * to serve the reply of a query immediately after processing the 
             * query. 
             * 
             * However if AE_BARRIER is set in the mask, our application is 
             * asking us to do the reverse: never fire the writable event 
             * after the readable. In such a case, we invert the calls. 
             * This is useful when, for instance, we want to do things 
             * in the beforeSleep() hook, like fsynching a file to disk, 
             * before replying to a client. */ 
            int invert = fe->mask & AE_BARRIER; 

            /* Note the "fe->mask & mask & ..." code: maybe an already 
             * processed event removed an element that fired and we still 
             * have not yet processed, so we check if the event is still valid. 
             * 
             * Fire the readable event if the call sequence is not 
             * inverted. */ 
            if (!invert && fe->mask & mask & AE_READABLE) { 
                fe->rfileProc(eventLoop,fd,fe->clientData,mask); 
                fired++; 
            } 

            /* Fire the writable event. */ 
            if (fe->mask & mask & AE_WRITABLE) { 
                if (!fired || fe->wfileProc != fe->rfileProc) { 
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask); 
                    fired++; 
                } 
            } 

            /* If we have to invert the call, fire the readable event now 
             * after the writable one. */ 
            if (invert && fe->mask & mask & AE_READABLE) { 
                if (!fired || fe->wfileProc != fe->rfileProc) { 
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask); 
                    fired++; 
                } 
            } 

            processed++; 
        } 
    } 
    /* Check time events */ 
    if (flags & AE_TIME_EVENTS) 
        processed += processTimeEvents(eventLoop); 

    return processed; /* return the number of processed file/time events */ 
} 

As shown, the function calls aeApiPoll. In the select model, this returns the number of fds currently ready for reading or writing; note that the function has already selected the ready fds for us and placed them in the fired array.

Client Connection

Continuing from above: in the for loop, each fired fd is mapped back to its event struct. On the listening fds we attached the acceptTcpHandler callback, so when a client attempts to connect to the server, that callback fires — that is, acceptTcpHandler executes.

// networking.c 
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) { 
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL; 
    char cip[NET_IP_STR_LEN]; 
    UNUSED(el); 
    UNUSED(mask); 
    UNUSED(privdata); 

    while(max--) { 
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); 
        if (cfd == ANET_ERR) { 
            if (errno != EWOULDBLOCK) 
                serverLog(LL_WARNING, 
                    "Accepting client connection: %s", server.neterr); 
            return; 
        } 
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport); 
        acceptCommonHandler(cfd,0,cip); 
    } 
} 

acceptTcpHandler first calls the underlying accept (via anetTcpAccept) to take the connection, then immediately attaches a callback handler for it — this is the Reactor pattern at work.

acceptCommonHandler mainly calls createClient to create the client object for the connection.

// networking.c 
client *createClient(int fd) { 
    client *c = zmalloc(sizeof(client)); 

    /* passing -1 as fd it is possible to create a non connected client. 
     * This is useful since all the commands needs to be executed 
     * in the context of a client. When commands are executed in other 
     * contexts (for instance a Lua script) we need a non connected client. */ 
    if (fd != -1) { 
        anetNonBlock(NULL,fd); 
        anetEnableTcpNoDelay(NULL,fd); 
        if (server.tcpkeepalive) 
            anetKeepAlive(NULL,fd,server.tcpkeepalive); 
        if (aeCreateFileEvent(server.el,fd,AE_READABLE, 
            readQueryFromClient, c) == AE_ERR) 
        { 
            close(fd); 
            zfree(c); 
            return NULL; 
        } 
        /************** code omitted **************/ 
    } 
    /************** code omitted **************/ 
    return c; 
} 

As you can see, another event is created for the client's connection fd, and this time the callback is readQueryFromClient — the function that actually receives client requests. When the event is created, its clientData is pointed at the client object itself, which is how the client parameter reaches the callback.
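This clientData plumbing is a standard C callback idiom worth seeing in isolation: the event stores an opaque void* that the loop hands back verbatim, so readQueryFromClient can recover its client* without any global lookup. A minimal sketch (miniClient, miniEvent, and onReadable are illustrative names, not Redis types):

```c
#include <stddef.h>

/* Minimal stand-in for the clientData pattern: the event keeps an opaque
 * pointer and passes it back to the callback unchanged, just as the event
 * loop passes fe->clientData into fe->rfileProc. */
typedef struct miniClient { int fd; int queries; } miniClient;
typedef void fileProc(int fd, void *clientData);

typedef struct miniEvent { fileProc *rfileProc; void *clientData; } miniEvent;

static void onReadable(int fd, void *clientData) {
    miniClient *c = clientData;   /* cast back from void*, as Redis does */
    (void)fd;
    c->queries++;                 /* "handle" one query */
}

/* What the dispatch loop does when the fd becomes readable. */
static void fireReadable(miniEvent *ev, int fd) {
    ev->rfileProc(fd, ev->clientData);
}
```

The void* indirection is what lets one generic event loop serve both listening sockets (clientData is NULL, the callback accepts) and client sockets (clientData is the client struct).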

Client Request

readQueryFromClient uses the read system call to pull the network bytes off the socket; the payload, of course, uses Redis's RESP protocol. Let's set a breakpoint in readQueryFromClient and inspect the received data.

Start gdb, set a breakpoint with b readQueryFromClient, then enter r to run the program, and connect with a Redis client. To make the source easier to follow, enter layout src in gdb; the source view then appears in the terminal.

Note that when the Redis client connects, it first sends a series of requests of its own, and all of them also pass through readQueryFromClient. We skip past those and wait for the client to be fully ready, then type the get name command at the client prompt and step through in gdb.

After the line nread = read(fd, c->querybuf+qblen, readlen); executes, enter p c->querybuf in gdb to print the variable and see the received bytes.

As you can see, c->querybuf now holds our typed command. Note that we initially pre-allocated space for querybuf via sdsMakeRoomFor, but the data actually received may be smaller, so after the read the querybuf length is adjusted to the actual data size. This is also the point where the received network byte stream becomes an sds string.
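What the debugger shows in querybuf is simply the RESP encoding of the command: an array header `*<argc>\r\n` followed by one bulk string `$<len>\r\n<bytes>\r\n` per argument. A small sketch that builds this encoding (respEncode is a hypothetical helper for illustration, not a Redis function):

```c
#include <stdio.h>
#include <string.h>

/* Build the RESP bytes a client sends for a command: an array ("*<n>") of
 * bulk strings ("$<len>\r\n<bytes>\r\n"). For "get name" the result is
 * "*2\r\n$3\r\nget\r\n$4\r\nname\r\n" — exactly what lands in querybuf.
 * Assumes out is large enough for the encoded command. */
static int respEncode(char *out, size_t outlen, int argc, const char **argv) {
    int n = snprintf(out, outlen, "*%d\r\n", argc);
    for (int i = 0; i < argc; i++)
        n += snprintf(out + n, outlen - n, "$%zu\r\n%s\r\n",
                      strlen(argv[i]), argv[i]);
    return n;   /* total bytes written */
}
```

Seeing the wire format makes the gdb output unsurprising: the `\r\n` pairs and length prefixes are what processMultibulkBuffer later walks to split out the arguments.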

Next, processInputBufferAndReplicate is called, which leads into the function that really handles the request: processInputBuffer.

// networking.c 
void processInputBuffer(client *c) { 
    server.current_client = c; 

    /* Keep processing while there is something in the input buffer */ 
    while(c->qb_pos < sdslen(c->querybuf)) { 
        /* Return if clients are paused. */ 
        if (!(c->flags & CLIENT_SLAVE) && clientsArePaused()) break; 

        /* Immediately abort if the client is in the middle of something. */ 
        if (c->flags & CLIENT_BLOCKED) break; 

        /* Don't process input from the master while there is a busy script 
         * condition on the slave. We want just to accumulate the replication 
         * stream (instead of replying -BUSY like we do with other clients) and 
         * later resume the processing. */ 
        if (server.lua_timedout && c->flags & CLIENT_MASTER) break; 

        /* CLIENT_CLOSE_AFTER_REPLY closes the connection once the reply is 
         * written to the client. Make sure to not let the reply grow after 
         * this flag has been set (i.e. don't process more commands). 
         * 
         * The same applies for clients we want to terminate ASAP. */ 
        if (c->flags & (CLIENT_CLOSE_AFTER_REPLY|CLIENT_CLOSE_ASAP)) break; 

        /* Determine request type when unknown. */ 
        if (!c->reqtype) { 
            if (c->querybuf[c->qb_pos] == '*') { 
                c->reqtype = PROTO_REQ_MULTIBULK; 
            } else { 
                c->reqtype = PROTO_REQ_INLINE; 
            } 
        } 

        if (c->reqtype == PROTO_REQ_INLINE) { 
            if (processInlineBuffer(c) != C_OK) break; 
        } else if (c->reqtype == PROTO_REQ_MULTIBULK) { 
            if (processMultibulkBuffer(c) != C_OK) break; 
        } else { 
            serverPanic("Unknown request type"); 
        } 

        /* Multibulk processing could see a <= 0 length. */ 
        if (c->argc == 0) { 
            resetClient(c); 
        } else { 
            /* Only reset the client when the command was executed. */ 
            if (processCommand(c) == C_OK) { 
                if (c->flags & CLIENT_MASTER && !(c->flags & CLIENT_MULTI)) { 
                    /* Update the applied replication offset of our master. */ 
                    c->reploff = c->read_reploff - sdslen(c->querybuf) + c->qb_pos; 
                } 

                /* Don't reset the client structure for clients blocked in a 
                 * module blocking command, so that the reply callback will 
                 * still be able to access the client argv and argc field. 
                 * The client will be reset in unblockClientFromModule(). */ 
                if (!(c->flags & CLIENT_BLOCKED) || c->btype != BLOCKED_MODULE) 
                    resetClient(c); 
            } 
            /* freeMemoryIfNeeded may flush slave output buffers. This may 
             * result into a slave, that may be the active client, to be 
             * freed. */ 
            if (server.current_client == NULL) break; 
        } 
    } 

    /* Trim to pos */ 
    if (server.current_client != NULL && c->qb_pos) { 
        sdsrange(c->querybuf,c->qb_pos,-1); 
        c->qb_pos = 0; 
    } 

    server.current_client = NULL; 
} 

In processInlineBuffer — or, for the RESP array format that redis-cli actually sends, processMultibulkBuffer — the received socket data is split into individual fields and stored in the c->argv array, each argument as an OBJ_STRING object.
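To make the multibulk path concrete, here is a toy parser for the happy path (miniParse is a hypothetical sketch; the real processMultibulkBuffer builds robj string objects, copes with partial reads across calls, and validates every length):

```c
#include <stdlib.h>
#include <string.h>

/* Toy multibulk parser: splits "*2\r\n$3\r\nget\r\n$4\r\nname\r\n" into
 * argument strings, the way processMultibulkBuffer fills c->argv.
 * Assumes complete, well-formed input and arguments shorter than 32 bytes. */
static int miniParse(const char *buf, char argv[][32], int maxargs) {
    if (*buf != '*') return -1;              /* not a RESP array */
    int argc = atoi(buf + 1);                /* "*<argc>" header */
    if (argc > maxargs) return -1;
    const char *p = strstr(buf, "\r\n");
    if (!p) return -1;
    p += 2;                                  /* past "*<argc>\r\n" */
    for (int i = 0; i < argc; i++) {
        if (*p != '$') return -1;            /* expect a bulk string */
        int len = atoi(p + 1);               /* "$<len>" prefix */
        const char *q = strstr(p, "\r\n");
        if (!q || len < 0 || len >= 32) return -1;
        p = q + 2;                           /* past "$<len>\r\n" */
        memcpy(argv[i], p, (size_t)len);
        argv[i][len] = '\0';
        p += len + 2;                        /* past payload + "\r\n" */
    }
    return argc;
}
```

Because every payload is length-prefixed, the parser never scans argument bytes for delimiters — which is also why RESP bulk strings are binary-safe.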

Next comes the real command dispatch function, processCommand:

int processCommand(client *c) { 
    moduleCallCommandFilters(c); 

    /* The QUIT command is handled separately. Normal command procs will 
     * go through checking for replication and QUIT will cause trouble 
     * when FORCE_REPLICATION is enabled and would be implemented in 
     * a regular command proc. */ 
    if (!strcasecmp(c->argv[0]->ptr,"quit")) { 
        addReply(c,shared.ok); 
        c->flags |= CLIENT_CLOSE_AFTER_REPLY; 
        return C_ERR; 
    } 

    /* Now lookup the command and check ASAP about trivial error conditions 
     * such as wrong arity, bad command name and so forth. */ 
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr); 

    /************** code omitted **************/ 

    /* Exec the command */ 
    if (c->flags & CLIENT_MULTI && 
        c->cmd->proc != execCommand && c->cmd->proc != discardCommand && 
        c->cmd->proc != multiCommand && c->cmd->proc != watchCommand) 
    { 
        queueMultiCommand(c); 
        addReply(c,shared.queued); 
    } else { 
        call(c,CMD_CALL_FULL); 
        c->woff = server.master_repl_offset; 
        if (listLength(server.ready_keys)) 
            handleClientsBlockedOnKeys(); 
    } 
    return C_OK; 
} 

Note that the function starts by calling moduleCallCommandFilters. From its description, its job is to run user-defined filters over commands; it applies to all commands and makes them easy to extend.

processCommand first uses lookupCommand to map the input name to its command handler, and internally it performs many special-case checks. Besides client-facing commands like GET/SET, Redis has many commands used for internal communication, and it also supports simple transactions; those are handled somewhat differently from a plain GET/SET. As the end of the function shows, the cases branch: for a plain GET/SET, call(c,CMD_CALL_FULL); is executed.

//server.c 
void call(client *c, int flags) { 
    /************** code omitted **************/ 
    /* Call the command. */ 
    dirty = server.dirty; 
    updateCachedTime(0); 
    start = server.ustime; 
    c->cmd->proc(c); 
    duration = ustime()-start; 
    dirty = server.dirty-dirty; 
    /************** code omitted **************/ 
} 

These are the key lines: c->cmd->proc is the handler for the client's command, and the mapping from name to handler can be found in redisCommandTable. Since we sent GET, the function invoked is getCommand.
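The dispatch mechanism itself is just a name-to-function-pointer table. A toy version (miniCommand and getProc/setProc are stand-ins; the real server loads redisCommandTable into a dict at startup so lookupCommand is an O(1) hash lookup, and the handlers take a client*):

```c
#include <stddef.h>
#include <string.h>
#include <strings.h>   /* strcasecmp: command names are case-insensitive */

/* Toy command-table entry pairing a name with its handler. */
typedef struct miniCommand {
    const char *name;
    const char *(*proc)(void);   /* real handlers are void (*)(client *c) */
} miniCommand;

static const char *getProc(void) { return "getCommand"; }
static const char *setProc(void) { return "setCommand"; }

static miniCommand table[] = { {"get", getProc}, {"set", setProc} };

/* Linear scan for clarity; the real lookupCommand hits a dict instead. */
static miniCommand *lookup(const char *name) {
    for (size_t i = 0; i < sizeof(table) / sizeof(table[0]); i++)
        if (strcasecmp(table[i].name, name) == 0) return &table[i];
    return NULL;   /* unknown command */
}
```

After lookup, dispatch is one indirect call — exactly what `c->cmd->proc(c)` does inside call().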

Internally, getCommand roughly looks up the key and returns its value. One thing to watch for is the special logic for dict rehashing in Redis: while a rehash is in progress, a lookup has to check both hash tables.
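That two-table lookup can be sketched in a few lines (miniDict and the toy hash are illustrative only; the real dict uses siphash, chained entries per bucket, and migrates buckets incrementally on each access):

```c
#include <stddef.h>
#include <string.h>

/* Sketch of why dictFind checks two tables: during incremental rehash,
 * entries live partly in ht[0] (old) and partly in ht[1] (new), so a miss
 * in the old table must fall through to the new one. */
#define SLOTS 4
typedef struct slot { const char *key; const char *val; } slot;

typedef struct miniDict {
    slot ht[2][SLOTS];
    int rehashing;   /* while nonzero, both tables may hold entries */
} miniDict;

static unsigned toyHash(const char *k) {   /* toy hash, not siphash */
    unsigned h = 0;
    while (*k) h = h * 31 + (unsigned char)*k++;
    return h;
}

static const char *miniFind(miniDict *d, const char *key) {
    int tables = d->rehashing ? 2 : 1;     /* only scan ht[1] mid-rehash */
    for (int t = 0; t < tables; t++) {
        slot *s = &d->ht[t][toyHash(key) % SLOTS];
        if (s->key && strcmp(s->key, key) == 0) return s->val;
    }
    return NULL;   /* not in either table */
}
```

Once rehashing finishes, ht[1] becomes the only table and lookups go back to a single probe, so the extra cost exists only during the migration window.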