redis源码分析——2、读写应答
一、读回调
- 客户端的消息处理循环
由上一节可知,redis通过epoll来检测是否有客户端接入,一旦有请求,则会调用acceptTcpHandler
,然后再调用createClient
将fd和client对应起来,最后挂载到全局的server.clients
链表上面。在createClient
里面又调用了connSetReadHandler(conn, readQueryFromClient)
来将client加入到epoll中。
set_read_handler
接口的实现如上,需要注意的是这里给fd的读写绑定的是 ae_handler
也就是connSocketEventHandler
。
/* Event-loop callback bound to a socket connection's fd for both readable
 * and writable events. It dispatches the fired mask to the connection's
 * conn_handler, read_handler and write_handler.
 *
 * el, fd:     unused here; the connection is taken from clientData.
 * clientData: the connection this event was registered for.
 * mask:       the AE_READABLE / AE_WRITABLE bits that fired.
 *
 * Every callHandler() result is checked, and the function returns as soon
 * as one of them is zero without touching conn again (presumably because
 * the connection may no longer be usable at that point). */
static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    UNUSED(el);
    UNUSED(fd);
    connection *conn = clientData;

    /* A pending connect became writable: resolve its outcome first. */
    if (conn->state == CONN_STATE_CONNECTING &&
        (mask & AE_WRITABLE) && conn->conn_handler) {
        /* Record the code returned by connGetSocketError() instead of
         * errno: the pending socket error is reported through the return
         * value, while errno is not guaranteed to carry it and may hold a
         * stale value from an unrelated call. */
        int conn_error = connGetSocketError(conn);
        if (conn_error) {
            conn->last_errno = conn_error;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }

        /* Nobody is waiting to write: stop watching for writability. */
        if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);

        if (!callHandler(conn, conn->conn_handler)) return;
        conn->conn_handler = NULL;
    }

    /* Normally we execute the readable event first, and the writable
     * event later. This is useful as sometimes we may be able
     * to serve the reply of a query immediately after processing the
     * query.
     *
     * However if WRITE_BARRIER is set in the mask, our application is
     * asking us to do the reverse: never fire the writable event
     * after the readable. In such a case, we invert the calls.
     * This is useful when, for instance, we want to do things
     * in the beforeSleep() hook, like fsync'ing a file to disk,
     * before replying to a client. */
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

    /* These are evaluated once, up front: a handler only runs if it was
     * installed before any of the callbacks below were invoked. */
    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;

    /* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}
在connSocketEventHandler
函数中既有读调用,也有写调用,因此,我们可以把connSocketEventHandler
看成是一个socket connection的处理中心。也就是说在主框架的epoll中并不会直接调用客户端的读写回调,而是会把这个epoll触发下发到connSocketEventHandler
中具体处理,这样一来相当于是框架与socket connection解耦了。上节也提到了ConnectionType
有两个实现,一个是CT_Socket
,另一个是CT_TLS
。
二、写回调
- 由上面我们知道了redis的读触发,那回复客户端的写操作是在什么时候做的呢?
在redis中,并不是取到数据后立即回复给客户端的,而是在“下一轮”循环中发送的。再看aeMain
的实现:
/* Main loop of the event library: clears the stop flag, then keeps
 * processing events until eventLoop->stop becomes non-zero. */
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    for (;;) {
        if (eventLoop->stop) {
            break;
        }
        /* Before each round of event polling, run the beforesleep hook
         * if one is installed (this corresponds to the beforeSleep
         * function). */
        if (eventLoop->beforesleep != NULL) {
            eventLoop->beforesleep(eventLoop);
        }
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}
可知,在每次epoll前先要执行一次beforeSleep
调用,这个函数的操作较多,我们只看和回复有关的
/* This function gets called every time Redis is about to enter the
 * main loop of the event driven library, that is, before going to sleep
 * waiting for ready file descriptors.
 *
 * NOTE: this is an abridged excerpt; part of the original body is omitted
 * where marked below. The eventLoop argument is not used. */
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
/* Handle precise timeouts of blocked clients. */
handleBlockedClientsTimeout();
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
/***************** some code omitted ****************/
/* Handle writes with pending output buffers: this is the call that
 * takes care of the queued client replies. */
handleClientsWithPendingWritesUsingThreads();
/* Close clients that need to be closed asynchronously. */
freeClientsInAsyncFreeQueue();
/* Before we are going to sleep, let the threads access the dataset by
 * releasing the GIL. Redis main thread will not touch anything at this
 * time. Only done when moduleCount() is non-zero. */
if (moduleCount()) moduleReleaseGIL();
}
可以看到有handleClientsWithPendingReadsUsingThreads
和handleClientsWithPendingWritesUsingThreads
,我们只看write相关的,顺便提一下,如果开启read多线程,其线程间的路由就是在handleClientsWithPendingReadsUsingThreads
中实现的。
/* Flush the clients queued in server.clients_pending_write, spreading the
 * work across the I/O thread lists. When stopThreadedIOIfNeeded() returns
 * non-zero the work is delegated to handleClientsWithPendingWrites()
 * instead and its result is returned. Otherwise returns the number of
 * clients that were queued on entry. */
int handleClientsWithPendingWritesUsingThreads(void) {
    int queued = listLength(server.clients_pending_write);
    if (queued == 0) {
        return 0; /* Nothing to do: bail out early. */
    }

    /* With only a few clients to serve, skip the I/O threads and run the
     * plain synchronous code (in the single-threaded case this path is
     * taken directly). */
    if (stopThreadedIOIfNeeded()) {
        return handleClientsWithPendingWrites();
    }

    /* Make sure the I/O threads are running. */
    if (!io_threads_active) {
        startThreadedIO();
    }

    if (tio_debug) {
        printf("%d TOTAL WRITE pending clients\n", queued);
    }

    /* Spread the clients round-robin over the per-thread lists. */
    listIter iter;
    listNode *node;
    int position = 0;
    listRewind(server.clients_pending_write,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        client *c = listNodeValue(node);
        int slot = position % server.io_threads_num;

        c->flags &= ~CLIENT_PENDING_WRITE;
        listAddNodeTail(io_threads_list[slot],c);
        position++;
    }

    /* Publish the operation type first, then the per-thread pending
     * counts: setting those counts is the start condition for the
     * waiting threads. Slot 0 is not counted; it is served below. */
    io_threads_op = IO_THREADS_OP_WRITE;
    for (int t = 1; t < server.io_threads_num; t++) {
        int assigned = listLength(io_threads_list[t]);
        io_threads_pending[t] = assigned;
    }

    /* The main thread serves the clients that landed in slot 0. */
    listRewind(io_threads_list[0],&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        client *c = listNodeValue(node);
        writeToClient(c,0);
    }
    listEmpty(io_threads_list[0]);

    /* Spin until every other thread reports no pending work left. */
    unsigned long outstanding;
    do {
        outstanding = 0;
        for (int t = 1; t < server.io_threads_num; t++) {
            outstanding += io_threads_pending[t];
        }
    } while (outstanding != 0);

    if (tio_debug) {
        printf("I/O WRITE All threads finshed\n");
    }

    /* Walk the client list once more: for replies that were not fully
     * sent, install the write handler so the remainder goes out later. */
    listRewind(server.clients_pending_write,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        client *c = listNodeValue(node);

        if (!clientHasPendingReplies(c)) {
            continue;
        }
        if (connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR) {
            freeClientAsync(c);
        }
    }
    listEmpty(server.clients_pending_write);

    return queued;
}
handleClientsWithPendingWrites
中实现了单线程模型的回复请求过程,如果一次没有发完(例如发送缓冲区满了,发不出去了),则设置一个写回调sendReplyToClient
,sendReplyToClient
最后又在文章开头的connSocketEventHandler
中调用。
/* Called right before entering the event loop, hoping the replies can be
 * written straight to the client sockets, so that no syscall is needed to
 * install the writable event handler, have it fired, and so forth.
 *
 * Every client is unlinked from server.clients_pending_write as it is
 * visited. Returns the number of clients that were queued on entry. */
int handleClientsWithPendingWrites(void) {
    int processed = listLength(server.clients_pending_write);
    listIter iter;
    listNode *node;

    listRewind(server.clients_pending_write,&iter);
    for (node = listNext(&iter); node != NULL; node = listNext(&iter)) {
        client *c = listNodeValue(node);

        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,node);

        /* Leave protected clients alone: touching them may trigger a
         * write error or recreate the handler. */
        if (c->flags & CLIENT_PROTECTED) {
            continue;
        }

        /* Try a synchronous write; when writeToClient() reports C_ERR
         * skip this client and move on to the next one. */
        if (writeToClient(c,0) == C_ERR) {
            continue;
        }

        /* Everything went out: no writable handler is needed. */
        if (!clientHasPendingReplies(c)) {
            continue;
        }

        /* Data is still pending, so the writable handler must be
         * installed. For the fsync=always policy, we want that a given FD
         * is never served for reading and writing in the same event loop
         * iteration, so that in the middle of receiving the query, and
         * serving it to the client, we'll call beforeSleep() that will do
         * the actual fsync of AOF to disk. The write barrier ensures
         * that. */
        int ae_barrier = (server.aof_state == AOF_ON &&
                          server.aof_fsync == AOF_FSYNC_ALWAYS);

        if (connSetWriteHandlerWithBarrier(c->conn, sendReplyToClient, ae_barrier) == C_ERR) {
            freeClientAsync(c);
        }
    }
    return processed;
}
三、总结
和大部分服务一样,redis在处理完数据以后并不是直接回复,而是在下一次epoll前处理,可是redis是单线程的,这么做的目的是什么呢?是为了照顾其他客户端吗?
上一篇: 用C++写了个协程库,欢迎star
下一篇: redis源码分析——9、AOF持久化