您是小站的第 35369 位访客,欢迎~

您现在的位置是: 网站首页 > 程序设计  > redis 

redis源码分析——1、网络框架

2021年7月12日 23:40 420人围观

简介:我们知道redis用的是epoll,但是底层的代码到底是怎样一步步跑起来的?本文解读redis的网络框架,一探究竟。

一、 核心数据结构

  1. ConnectionType:ConnectionType定义了网络连接的接口,包含read、write等,具体定义如下。
   /* Table of function pointers through which a connection is operated.
    * Every entry except ae_handler takes the connection as its first argument. */
   typedef struct ConnectionType {
       /* Event-loop callback: receives the loop, the fd, opaque client data and an event mask. */
       void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
       /* Connect to addr:port (optionally from source_addr); connect_handler is the callback supplied by the caller. */
       int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
       /* Write data_len bytes from data / read up to buf_len bytes into buf. */
       int (*write)(struct connection *conn, const void *data, size_t data_len);
       int (*read)(struct connection *conn, void *buf, size_t buf_len);
       void (*close)(struct connection *conn);
       /* Accept step for a new connection; accept_handler is the callback supplied by the caller. */
       int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
       /* Install the write / read callbacks; the write variant additionally takes a barrier flag. */
       int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
       int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
       const char *(*get_last_error)(struct connection *conn);
       /* Variants that take an explicit timeout argument.
        * NOTE(review): timeout unit is not visible in this excerpt - confirm. */
       int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
       ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
       ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
       ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
   } ConnectionType;

在源码中实现了两种connection,分别是CT_Socket和CT_TLS,这两种也是redis支持的连接类型。redis也支持本地unix socket,但这归根结底也是socket,其读写和网络socket是一样的。这两种的实现分别如下:

CT_Socket:

   /* Operation table for plain socket connections. The designated initializers
    * are listed in the same order as the members of ConnectionType. */
   ConnectionType CT_Socket = {
       .ae_handler = connSocketEventHandler,
       .connect = connSocketConnect,
       .write = connSocketWrite,
       .read = connSocketRead,
       .close = connSocketClose,
       .accept = connSocketAccept,
       .set_write_handler = connSocketSetWriteHandler,
       .set_read_handler = connSocketSetReadHandler,
       .get_last_error = connSocketGetLastError,
       .blocking_connect = connSocketBlockingConnect,
       .sync_write = connSocketSyncWrite,
       .sync_read = connSocketSyncRead,
       .sync_readline = connSocketSyncReadLine
   };

CT_TLS:

   /* Operation table for TLS connections: each ConnectionType entry is bound to
    * its connTLS* implementation (tlsEventHandler for the event-loop callback). */
   ConnectionType CT_TLS = {
       .ae_handler = tlsEventHandler,
       .accept = connTLSAccept,
       .connect = connTLSConnect,
       .blocking_connect = connTLSBlockingConnect,
       .read = connTLSRead,
       .write = connTLSWrite,
       .close = connTLSClose,
       .set_write_handler = connTLSSetWriteHandler,
       .set_read_handler = connTLSSetReadHandler,
       .get_last_error = connTLSGetLastError,
       .sync_write = connTLSSyncWrite,
       .sync_read = connTLSSyncRead,
       .sync_readline = connTLSSyncReadLine,
   };

这个结构是核心,后续的各种网络操作,都是通过ConnectionType中的指针调用的。

  1. struct connection:该结构代表一个完整的连接,客户端的fd会被封装成一个connection。该结构与ConnectionType配合使用:可以认为ConnectionType是操作接口(除ae_handler外,所有函数的第一个参数都是connection),而struct connection是操作对象;set_read_handler和set_write_handler设置的都是struct connection中的handler。
   /* One connection object: the operand that the ConnectionType function
    * table works on. */
   struct connection {
       ConnectionType *type;       /* operation table used for this connection */
       ConnectionState state;
       short int flags;
       short int refs;             /* NOTE(review): presumably a reference count - confirm */
       int last_errno;
       void *private_data;         /* opaque pointer owned by the user of the connection */
       ConnectionCallbackFunc conn_handler;
       ConnectionCallbackFunc write_handler;
       ConnectionCallbackFunc read_handler;
       int fd;                     /* underlying file descriptor */
   };
  1. aeEventLoop,redis是单线程非阻塞io,网络的结构封装在aeEventLoop结构中
   /* State of an event based program.
    * events and fired are pointers to arrays that are allocated elsewhere. */
   typedef struct aeEventLoop {
       int maxfd;   /* highest file descriptor currently registered */
       int setsize; /* max number of file descriptors tracked */
       long long timeEventNextId;
       time_t lastTime;     /* Used to detect system clock skew */
       aeFileEvent *events; /* Registered events */
       aeFiredEvent *fired; /* Fired events */
       aeTimeEvent *timeEventHead;
       int stop;
       void *apidata; /* This is used for polling API specific data */
       aeBeforeSleepProc *beforesleep;
       aeBeforeSleepProc *aftersleep;
       int flags;
   } aeEventLoop;

最核心的两个数据就是events和fired,分别代表了在监听的fd和有事件触发的fd。

aeFileEvent:

   /* File event structure: what is registered for one file descriptor. */
   typedef struct aeFileEvent {
       int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
       aeFileProc *rfileProc; /* callback stored when AE_READABLE is registered */
       aeFileProc *wfileProc; /* callback stored when AE_WRITABLE is registered */
       void *clientData;      /* opaque pointer passed to the callbacks */
   } aeFileEvent;

其中rfileProc和wfileProc分别是该事件对应的读、写回调。

aeFiredEvent:

   /* A fired event: a file descriptor plus the event mask reported for it. */
   typedef struct aeFiredEvent {
       int fd;
       int mask;
   } aeFiredEvent;

aeFiredEvent的定义很简单,就一个fd,再加一个该fd是否可读写。

可以看到,在aeEventLoop中定义的events和fired都是指针,指向了外面分配的数组,其大小是server.maxclients+CONFIG_FDSET_INCR。

二、网络框架

  1. 等待连接
   main() 
   initServer() 
    server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR) 
    listenToPort(server.port,server.ipfd,&server.ipfd_count) 
    aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL) 
        // mask为AE_READABLE,因此只有rfileProc被设置为 acceptTcpHandler
   InitServerLast() 
    initThreadedIO() 
        pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) 
   aeMain(server.el) 
    aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); 
        fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的是监听的fd,则走acceptTcpHandler回调
            acceptTcpHandler() 
                anetTcpAccept() 
                   connCreateAcceptedSocket() 
                    connCreateSocket() 
                        conn->type = &CT_Socket;  // 最终创建了ConnectionType为CT_Socket的一个连接 
                acceptCommonHandler() 
                       createClient() // 创建一个真正的client 
                        connSetReadHandler(conn, readQueryFromClient); // 给客户端fd设置 readQueryFromClient 回调,epoll触发时调用
                            set_read_handler->connSocketSetReadHandler(conn, func) 
                                   aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) // 注册fd 
                        linkClient() 
                            raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL); 
                    connAccept(conn, clientAcceptHandler)  
                           connSocketAccept 
                            callHandler(conn, accept_handler) 
                                accept->connSocketAccept 
        fe->wfileProc(eventLoop,fd,fe->clientData,mask); 
  1. 读请求
   aeMain(server.el) 
    aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); 
        fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的是客户端fd,则走 readQueryFromClient 回调 
            readQueryFromClient() 

三、主要函数

  1. aeCreateFileEvent: 该函数将fd加到epoll队列中,并为该fd绑定传入的回调函数proc。对于redis监听的fd,initServer传入的回调是acceptTcpHandler,用于接收客户端的连接请求。
   /* Register `proc` as the callback for the events selected by `mask` on `fd`.
    *
    * Returns AE_ERR with errno set to ERANGE when fd is not below
    * eventLoop->setsize, AE_ERR when aeApiAddEvent() fails, AE_OK otherwise.
    * NOTE(review): a negative fd is not rejected here; callers are presumably
    * expected to pass a valid descriptor - confirm.
    */
   int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
           aeFileProc *proc, void *clientData)
   {
       aeFileEvent *slot;

       /* Descriptors at or beyond setsize have no slot in events[]. */
       if (fd >= eventLoop->setsize) {
           errno = ERANGE;
           return AE_ERR;
       }

       /* Add the fd to the polling backend (epoll in this walkthrough) before
        * any bookkeeping is touched. */
       if (aeApiAddEvent(eventLoop, fd, mask) == -1) {
           return AE_ERR;
       }

       slot = &eventLoop->events[fd];
       slot->mask |= mask;

       /* proc is stored only for the directions present in mask, so a
        * registration with AE_READABLE alone leaves wfileProc untouched. */
       if (mask & AE_READABLE) {
           slot->rfileProc = proc;
       }
       if (mask & AE_WRITABLE) {
           slot->wfileProc = proc;
       }
       slot->clientData = clientData;

       if (eventLoop->maxfd < fd) {
           eventLoop->maxfd = fd;
       }
       return AE_OK;
   }
  1. acceptTcpHandler: 用于接收客户端连接请求,其中anetTcpAccept内部是系统调用accept接口。该函数先通过connCreateAcceptedSocket将客户端的fd转换成一个connection(其内部通过connCreateSocket创建,类型为CT_Socket),再把它交给acceptCommonHandler处理,这是最重要的一步。
   /* Event-loop callback for a listening fd: accepts pending connections, up to
    * MAX_ACCEPTS_PER_CALL per invocation, and hands each accepted fd, wrapped by
    * connCreateAcceptedSocket(), to acceptCommonHandler().
    * The el, privdata and mask arguments are unused. */
   void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
       char peer_ip[NET_IP_STR_LEN];
       int peer_port;
       int client_fd;
       int budget = MAX_ACCEPTS_PER_CALL;

       UNUSED(el);
       UNUSED(mask);
       UNUSED(privdata);

       while (budget-- != 0) {
           /* Calls accept() internally and yields the new fd on success. */
           client_fd = anetTcpAccept(server.neterr, fd, peer_ip, sizeof(peer_ip), &peer_port);
           if (client_fd != ANET_ERR) {
               serverLog(LL_VERBOSE,"Accepted %s:%d", peer_ip, peer_port);
               acceptCommonHandler(connCreateAcceptedSocket(client_fd),0,peer_ip);
               continue;
           }

           /* Accept failed: stop for this invocation. EWOULDBLOCK is the only
            * errno that is not logged. */
           if (errno != EWOULDBLOCK) {
               serverLog(LL_WARNING,
                   "Accepting client connection: %s", server.neterr);
           }
           return;
       }
   }
  1. acceptCommonHandler:根据connection创建一个client,并将该客户端链接到server.clients。在新建client的时候有个重要操作connSetReadHandler,这个操作绑定了客户端的读取实现。
   /* Turn an accepted connection into a client, or reject it.
    *
    * conn  - the accepted connection.
    * flags - OR-ed into the new client's flags.
    * ip    - unused in this function.
    *
    * The connection is closed when the client limit is reached or when
    * createClient() fails; when connAccept() fails the client found through
    * the connection's private data is freed.
    */
   static void acceptCommonHandler(connection *conn, int flags, char *ip) {
       client *c;
       UNUSED(ip);

       /* Admission control will happen before a client is created and connAccept()
        * called, because we don't want to even start transport-level negotiation
        * if rejected.
        */
       if (listLength(server.clients) >= server.maxclients) {
           char *err = "-ERR max number of clients reached\r\n";

           /* That's a best effort error message, don't check write errors.
            * Note that for TLS connections, no handshake was done yet so nothing is written
            * and the connection will just drop.
            */
           if (connWrite(conn,err,strlen(err)) == -1) {
               /* Nothing to do, Just to avoid the warning... */
           }
           server.stat_rejected_conn++;
           connClose(conn);
           return;
       }

       /* Create connection and client */
       if ((c = createClient(conn)) == NULL) {  /* the client is created here; per the article it is also linked into the server.clients list */
           char conninfo[100];
           serverLog(LL_WARNING,
               "Error registering fd event for the new client: %s (conn: %s)",
               connGetLastError(conn),
               connGetInfo(conn, conninfo, sizeof(conninfo)));
           connClose(conn); /* May be already closed, just ignore errors */
           return;
       }

       /* Last chance to keep flags */
       c->flags |= flags;

       /* Initiate accept.
        *
        * Note that connAccept() is free to do two things here:
        * 1. Call clientAcceptHandler() immediately;
        * 2. Schedule a future call to clientAcceptHandler().
        *
        * Because of that, we must do nothing else afterwards.
        */
       if (connAccept(conn, clientAcceptHandler) == C_ERR) {
           char conninfo[100];
           /* Only a connection in the error state is logged; the client is freed either way. */
           if (connGetState(conn) == CONN_STATE_ERROR)
               serverLog(LL_WARNING,
                       "Error accepting a client connection: %s (conn: %s)",
                       connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
           freeClient(connGetPrivateData(conn));
           return;
       }
   }
  1. createClient:最主要的是connSetReadHandler,其实是调用CT_Socket的set_read_handler接口,也就是connSocketSetReadHandler函数;读取请求的回调就是readQueryFromClient函数。
   /* Allocate a client for `conn` (excerpt: the rest of the function body is
    * omitted below). When conn is non-NULL the connection is configured via
    * connNonBlock(), connEnableTcpNoDelay() and, if server.tcpkeepalive is set,
    * connKeepAlive(); readQueryFromClient is installed as its read handler and
    * the new client is stored as the connection's private data. */
   client *createClient(connection *conn) {
       client *c = zmalloc(sizeof(client));

       /* passing NULL as conn it is possible to create a non connected client.
        * This is useful since all the commands needs to be executed
        * in the context of a client. When commands are executed in other
        * contexts (for instance a Lua script) we need a non connected client. */
       if (conn) {
           connNonBlock(conn);
           connEnableTcpNoDelay(conn);
           if (server.tcpkeepalive)
               connKeepAlive(conn,server.tcpkeepalive);
           connSetReadHandler(conn, readQueryFromClient);
           connSetPrivateData(conn, c);
       }
       //////////// code omitted /////////////
  1. connSocketSetReadHandler:该函数的目的是把客户端connection的fd注册到epoll队列中,等待可读事件触发,从而实现了非阻塞。注意注册到事件循环的回调是conn->type->ae_handler,而传入的func(这里是readQueryFromClient)保存在conn->read_handler中。
   /* Install, replace or remove the connection's read handler.
    *
    * Passing the handler that is already installed is a no-op. A NULL `func`
    * removes the AE_READABLE registration of conn->fd; any other value registers
    * conn->fd for AE_READABLE with the connection type's ae_handler as the
    * event-loop callback and the connection itself as client data.
    *
    * Returns C_OK, or C_ERR when aeCreateFileEvent() fails (conn->read_handler
    * has already been updated to `func` at that point).
    */
   static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
       if (conn->read_handler == func) {
           return C_OK;
       }

       conn->read_handler = func;

       if (!func) {
           aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
           return C_OK;
       }

       int rc = aeCreateFileEvent(server.el,conn->fd,
                                  AE_READABLE,conn->type->ae_handler,conn);
       return (rc == AE_ERR) ? C_ERR : C_OK;
   }