
redis source code analysis, part 2: reading, writing and replying

August 1, 2021, 19:21

Introduction: now that we understand the redis network framework, let's look at exactly how a request turns into a reply.

I. Read callback

  1. The client message-handling loop

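As the previous section showed, redis uses epoll to detect incoming client connections. When a connection arrives, acceptTcpHandler is called, which in turn calls createClient to associate the fd with a client and finally appends the client to the global server.clients list. Inside createClient, connSetReadHandler(conn, readQueryFromClient) is called to register the client with epoll.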

In the implementation of the set_read_handler interface, note that the callback bound to the fd's read and write events is ae_handler, that is, connSocketEventHandler.
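To make the registration path concrete, here is a minimal sketch of the idea rather than the redis source. All names (toy_loop, toy_conn, toy_set_read_handler, toy_fire_readable, toy_read_query) are hypothetical, and a fixed-size table stands in for epoll. The point it illustrates: the per-client callback is stored on the connection, while the event loop only ever sees one shared dispatcher.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_READABLE 1
#define TOY_MAX_FD 16

struct toy_conn;
typedef void (*toy_conn_cb)(struct toy_conn *conn);
typedef void (*toy_file_proc)(int fd, void *client_data, int mask);

/* Hypothetical stand-in for the event loop: one slot per fd. */
typedef struct toy_loop {
    toy_file_proc proc[TOY_MAX_FD];
    void *client_data[TOY_MAX_FD];
} toy_loop;

/* Hypothetical stand-in for a connection object. */
typedef struct toy_conn {
    int fd;
    toy_loop *loop;
    toy_conn_cb read_handler;   /* per-client callback */
    int reads_seen;             /* bumped by toy_read_query */
} toy_conn;

/* Shared dispatcher: the only function the loop knows about. */
static void toy_event_handler(int fd, void *client_data, int mask) {
    (void)fd;
    toy_conn *conn = client_data;
    assert(conn != NULL);
    if ((mask & TOY_READABLE) && conn->read_handler) conn->read_handler(conn);
}

/* Store the callback on the connection and register the dispatcher
 * (not the callback itself) with the loop. NULL unregisters. */
static int toy_set_read_handler(toy_conn *conn, toy_conn_cb func) {
    if (conn->fd < 0 || conn->fd >= TOY_MAX_FD) return -1;
    conn->read_handler = func;
    if (func == NULL) {
        conn->loop->proc[conn->fd] = NULL;
        conn->loop->client_data[conn->fd] = NULL;
    } else {
        conn->loop->proc[conn->fd] = toy_event_handler;
        conn->loop->client_data[conn->fd] = conn;
    }
    return 0;
}

/* Simulate the loop reporting that fd is readable.
 * Returns 1 if a registered handler was invoked, 0 otherwise. */
static int toy_fire_readable(toy_loop *loop, int fd) {
    if (fd < 0 || fd >= TOY_MAX_FD || loop->proc[fd] == NULL) return 0;
    loop->proc[fd](fd, loop->client_data[fd], TOY_READABLE);
    return 1;
}

/* Example per-client callback, playing the role of readQueryFromClient. */
static void toy_read_query(toy_conn *conn) { conn->reads_seen++; }
```

Calling toy_set_read_handler(&conn, toy_read_query) and then toy_fire_readable(&loop, conn.fd) reaches toy_read_query only through toy_event_handler, which is the indirection described above.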

   static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask) 
   { 
       UNUSED(el); 
       UNUSED(fd); 
       connection *conn = clientData; 

       if (conn->state == CONN_STATE_CONNECTING && 
               (mask & AE_WRITABLE) && conn->conn_handler) { 

           if (connGetSocketError(conn)) { 
               conn->last_errno = errno; 
               conn->state = CONN_STATE_ERROR; 
           } else { 
               conn->state = CONN_STATE_CONNECTED; 
           } 

           if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE); 

           if (!callHandler(conn, conn->conn_handler)) return; 
           conn->conn_handler = NULL; 
       } 

       /* Normally we execute the readable event first, and the writable 
        * event later. This is useful as sometimes we may be able 
        * to serve the reply of a query immediately after processing the 
        * query. 
        * 
        * However if WRITE_BARRIER is set in the mask, our application is 
        * asking us to do the reverse: never fire the writable event 
        * after the readable. In such a case, we invert the calls. 
        * This is useful when, for instance, we want to do things 
        * in the beforeSleep() hook, like fsync'ing a file to disk, 
        * before replying to a client. */ 
       int invert = conn->flags & CONN_FLAG_WRITE_BARRIER; 

       int call_write = (mask & AE_WRITABLE) && conn->write_handler; 
       int call_read = (mask & AE_READABLE) && conn->read_handler; 

       /* Handle normal I/O flows */ 
       if (!invert && call_read) { 
           if (!callHandler(conn, conn->read_handler)) return; 
       } 
       /* Fire the writable event. */ 
       if (call_write) { 
           if (!callHandler(conn, conn->write_handler)) return; 
       } 
       /* If we have to invert the call, fire the readable event now 
        * after the writable one. */ 
       if (invert && call_read) { 
           if (!callHandler(conn, conn->read_handler)) return; 
       } 
   } 

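connSocketEventHandler contains both the read call and the write call, so we can think of it as the processing hub for a socket connection. In other words, the main framework's epoll loop does not invoke a client's read/write callbacks directly; it hands the epoll event down to connSocketEventHandler, which does the actual dispatch. This effectively decouples the framework from the socket connection. As the previous section mentioned, ConnectionType has two implementations: CT_Socket and CT_TLS.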
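The ordering logic at the end of connSocketEventHandler is easy to check in isolation. The sketch below copies only the invert / call_read / call_write decision into a toy dispatcher and records the order in which the callbacks fire. The names (toy_conn, toy_dispatch, TOY_FLAG_WRITE_BARRIER and so on) are hypothetical, and the connection-state handling of the real function is left out.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_READABLE 1
#define TOY_WRITABLE 2
#define TOY_FLAG_WRITE_BARRIER 1

typedef struct toy_conn toy_conn;
typedef void (*toy_cb)(toy_conn *conn);

struct toy_conn {
    int flags;
    toy_cb read_handler;
    toy_cb write_handler;
    char order[8];   /* 'R' / 'W' appended in the order handlers ran */
    size_t n;        /* number of tags recorded so far */
};

/* Append one tag to the call-order log, keeping it NUL-terminated. */
static void toy_record(toy_conn *conn, char tag) {
    assert(conn->n + 1 < sizeof(conn->order));
    conn->order[conn->n++] = tag;
    conn->order[conn->n] = '\0';
}

static void toy_on_read(toy_conn *conn)  { toy_record(conn, 'R'); }
static void toy_on_write(toy_conn *conn) { toy_record(conn, 'W'); }

/* Same decision structure as the tail of connSocketEventHandler:
 * read first, then write, unless the barrier flag inverts the order. */
static void toy_dispatch(toy_conn *conn, int mask) {
    int invert = conn->flags & TOY_FLAG_WRITE_BARRIER;
    int call_write = (mask & TOY_WRITABLE) && conn->write_handler;
    int call_read = (mask & TOY_READABLE) && conn->read_handler;

    if (!invert && call_read) conn->read_handler(conn);
    if (call_write) conn->write_handler(conn);
    if (invert && call_read) conn->read_handler(conn);
}
```

With both events ready, a connection without the barrier flag runs the read handler and then the write handler; with the flag set, the write handler runs first.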

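II. Write callback

  1. We now know how reads are triggered in redis. So when does the write that replies to the client happen?

In redis, the reply is not sent to the client as soon as the data has been fetched; it is sent in the "next round" of the event loop. Look again at the implementation of aeMain: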

   void aeMain(aeEventLoop *eventLoop) { 
       eventLoop->stop = 0; 
       while (!eventLoop->stop) { 
           if (eventLoop->beforesleep != NULL) 
           eventLoop->beforesleep(eventLoop);  // run beforesleep once before each poll for events; this hook is the beforeSleep function
           aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP); 
       } 
   } 

As we can see, beforeSleep is called once before every epoll wait. The function does quite a lot, so we only look at the parts related to replying.
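The "reply in the next round" idea can be sketched as a pending list that is drained just before the loop goes back to sleep. This is a toy model, not the redis source: toy_server, toy_client, toy_add_reply and toy_before_sleep are hypothetical names, the sent buffer stands in for the socket, and the enqueue step in toy_add_reply is an assumption about how a client ends up on the pending list, since this article does not show that part.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical client: `out` holds queued reply bytes, `sent` stands in
 * for what has actually gone out on the socket. */
typedef struct toy_client {
    char out[64];
    size_t out_len;
    char sent[64];
    size_t sent_len;
    int pending_write;   /* 1 while the client sits on the pending list */
} toy_client;

/* Hypothetical server state: clients with replies waiting to be flushed. */
typedef struct toy_server {
    toy_client *pending[8];
    size_t npending;
} toy_server;

/* Command processing only buffers the reply and queues the client once. */
static void toy_add_reply(toy_server *srv, toy_client *c, const char *msg) {
    size_t len = strlen(msg);
    assert(c->out_len + len <= sizeof(c->out));
    memcpy(c->out + c->out_len, msg, len);
    c->out_len += len;
    if (!c->pending_write) {
        assert(srv->npending < sizeof(srv->pending) / sizeof(srv->pending[0]));
        c->pending_write = 1;
        srv->pending[srv->npending++] = c;
    }
}

/* Runs before the loop blocks again: flush every queued client.
 * Returns how many clients were processed. */
static size_t toy_before_sleep(toy_server *srv) {
    size_t processed = srv->npending;
    for (size_t i = 0; i < srv->npending; i++) {
        toy_client *c = srv->pending[i];
        assert(c->sent_len + c->out_len <= sizeof(c->sent));
        memcpy(c->sent + c->sent_len, c->out, c->out_len);
        c->sent_len += c->out_len;
        c->out_len = 0;
        c->pending_write = 0;
    }
    srv->npending = 0;
    return processed;
}
```

One consequence visible even in this toy: several replies produced for the same client in one iteration are queued once and flushed together in a single pass.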

   /* This function gets called every time Redis is entering the 
    * main loop of the event driven library, that is, before to sleep 
    * for ready file descriptors. */ 
   void beforeSleep(struct aeEventLoop *eventLoop) { 
       UNUSED(eventLoop); 

       /* Handle precise timeouts of blocked clients. */ 
       handleBlockedClientsTimeout(); 

       /* We should handle pending reads clients ASAP after event loop. */ 
       handleClientsWithPendingReadsUsingThreads(); 

    /***************** some code omitted ****************/ 

       /* Handle writes with pending output buffers. */ 
       handleClientsWithPendingWritesUsingThreads(); 

       /* Close clients that need to be closed asynchronous */ 
       freeClientsInAsyncFreeQueue(); 

       /* Before we are going to sleep, let the threads access the dataset by 
        * releasing the GIL. Redis main thread will not touch anything at this 
        * time. */ 
       if (moduleCount()) moduleReleaseGIL(); 
   } 

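Here we see both handleClientsWithPendingReadsUsingThreads and handleClientsWithPendingWritesUsingThreads. We only look at the write side. As an aside, if threaded reads are enabled, the distribution of clients across threads is implemented in handleClientsWithPendingReadsUsingThreads.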

   int handleClientsWithPendingWritesUsingThreads(void) { 
       int processed = listLength(server.clients_pending_write); 
       if (processed == 0) return 0; /* Return ASAP if there are no clients. */ 

       /* If we have just a few clients to serve, don't use I/O threads, but the 
        * boring synchronous code. */ 
       if (stopThreadedIOIfNeeded()) { 
           return handleClientsWithPendingWrites();  // single-threaded mode goes straight down this path 
       } 

       /* Start threads if needed. */ 
       if (!io_threads_active) startThreadedIO(); 

       if (tio_debug) printf("%d TOTAL WRITE pending clients\n", processed); 

       /* Distribute the clients across N different lists. */ 
       listIter li; 
       listNode *ln; 
       listRewind(server.clients_pending_write,&li); 
       int item_id = 0; 
       while((ln = listNext(&li))) { 
           client *c = listNodeValue(ln); 
           c->flags &= ~CLIENT_PENDING_WRITE; 
           int target_id = item_id % server.io_threads_num; 
           listAddNodeTail(io_threads_list[target_id],c); 
           item_id++; 
       } 

       /* Give the start condition to the waiting threads, by setting the 
        * start condition atomic var. */ 
       io_threads_op = IO_THREADS_OP_WRITE; 
       for (int j = 1; j < server.io_threads_num; j++) { 
           int count = listLength(io_threads_list[j]); 
           io_threads_pending[j] = count; 
       } 

       /* Also use the main thread to process a slice of clients. */ 
       listRewind(io_threads_list[0],&li); 
       while((ln = listNext(&li))) { 
           client *c = listNodeValue(ln); 
           writeToClient(c,0); 
       } 
       listEmpty(io_threads_list[0]); 

       /* Wait for all the other threads to end their work. */ 
       while(1) { 
           unsigned long pending = 0; 
           for (int j = 1; j < server.io_threads_num; j++) 
               pending += io_threads_pending[j]; 
           if (pending == 0) break; 
       } 
       if (tio_debug) printf("I/O WRITE All threads finshed\n"); 

       /* Run the list of clients again to install the write handler where 
        * needed. */ 
       listRewind(server.clients_pending_write,&li); 
       while((ln = listNext(&li))) { 
           client *c = listNodeValue(ln); 

           /* Install the write handler if there are pending writes in some 
            * of the clients. */ 
           if (clientHasPendingReplies(c) && 
                   connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)  // for replies not fully sent yet, install a write_handler to send the rest 
           { 
               freeClientAsync(c); 
           } 
       } 
       listEmpty(server.clients_pending_write); 
       return processed; 
   } 
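The distribution loop in handleClientsWithPendingWritesUsingThreads above is a plain round-robin on item_id % server.io_threads_num. A small sketch of just that arithmetic, with a hypothetical helper toy_distribute that counts how many clients each thread would receive instead of building real lists:

```c
#include <assert.h>
#include <stddef.h>

/* Round-robin assignment: client number item_id goes to bucket
 * item_id % nthreads. counts[j] receives the size of bucket j.
 * `counts` must have room for nthreads entries. */
static void toy_distribute(size_t nitems, size_t nthreads, size_t *counts) {
    assert(nthreads > 0);
    for (size_t j = 0; j < nthreads; j++) counts[j] = 0;
    for (size_t item_id = 0; item_id < nitems; item_id++) {
        size_t target_id = item_id % nthreads;
        counts[target_id]++;
    }
}
```

For 10 pending clients and 4 threads this yields bucket sizes 3, 3, 2, 2, so the load differs by at most one client per thread. Bucket 0 is the one the main thread serves itself in the code above.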

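handleClientsWithPendingWrites implements the reply path for the single-threaded model. If the send buffer is full and the data cannot all be sent, it installs a write callback, sendReplyToClient. sendReplyToClient is then ultimately invoked from connSocketEventHandler, the function shown at the beginning of this article.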

   /* This function is called just before entering the event loop, in the hope 
    * we can just write the replies to the client output buffer without any 
    * need to use a syscall in order to install the writable event handler, 
    * get it called, and so forth. */ 
   int handleClientsWithPendingWrites(void) { 
       listIter li; 
       listNode *ln; 
       int processed = listLength(server.clients_pending_write); 

       listRewind(server.clients_pending_write,&li); 
       while((ln = listNext(&li))) { 
           client *c = listNodeValue(ln); 
           c->flags &= ~CLIENT_PENDING_WRITE; 
           listDelNode(server.clients_pending_write,ln); 

           /* If a client is protected, don't do anything, 
            * that may trigger write error or recreate handler. */ 
           if (c->flags & CLIENT_PROTECTED) continue; 

           /* Try to write buffers to the client socket. */ 
           if (writeToClient(c,0) == C_ERR) continue;  // C_ERR means the client is no longer valid (freed after an error); move on to the next client 

           /* If after the synchronous writes above we still have data to 
            * output to the client, we need to install the writable handler. */ 
           if (clientHasPendingReplies(c)) { 
               int ae_barrier = 0; 
               /* For the fsync=always policy, we want that a given FD is never 
                * served for reading and writing in the same event loop iteration, 
                * so that in the middle of receiving the query, and serving it 
                * to the client, we'll call beforeSleep() that will do the 
                * actual fsync of AOF to disk. the write barrier ensures that. */ 
               if (server.aof_state == AOF_ON && 
                   server.aof_fsync == AOF_FSYNC_ALWAYS) 
               { 
                   ae_barrier = 1; 
               } 
               if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) { 
                   freeClientAsync(c); 
               } 
           } 
       } 
       return processed; 
   } 
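The "try once synchronously, fall back to a write handler" pattern can be modelled without any sockets. In this sketch the names (toy_client, toy_write, toy_flush_or_defer, toy_on_writable) are hypothetical, and `room` is a made-up parameter that simulates how many bytes the kernel send buffer will accept. Removing the handler once everything has been sent is my assumption about the write handler's behaviour; that step is not shown in the code above.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical client: only tracks how much reply data is still queued. */
typedef struct toy_client {
    size_t pending;               /* bytes still in the output buffer */
    int write_handler_installed;  /* 1 if we wait for a writable event */
} toy_client;

/* Pretend socket write: accepts at most `room` bytes per call. */
static size_t toy_write(toy_client *c, size_t room) {
    assert(c != NULL);
    size_t n = c->pending < room ? c->pending : room;
    c->pending -= n;
    return n;
}

/* One synchronous attempt before sleeping; if data remains, ask the
 * event loop to call us back when the socket is writable again. */
static void toy_flush_or_defer(toy_client *c, size_t room) {
    toy_write(c, room);
    c->write_handler_installed = (c->pending > 0);
}

/* What the installed handler does on a writable event: keep sending,
 * and uninstall itself once nothing is left (assumed behaviour). */
static void toy_on_writable(toy_client *c, size_t room) {
    toy_write(c, room);
    if (c->pending == 0) c->write_handler_installed = 0;
}
```

A client with 100 queued bytes and room for 64 keeps 36 bytes and gets a handler installed; a small reply that fits in one write never touches the event loop at all, which is the syscall saving the comment above handleClientsWithPendingWrites describes.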

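III. Summary

Like most servers, redis does not reply directly after it finishes processing the data; instead it sends the reply just before the next epoll wait. But redis is single-threaded, so what is the purpose of doing it this way? Is it to be fair to the other clients?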