您现在的位置是:
网站首页
> 程序设计 
> redis 
redis源码分析——1、网络框架
简介我们知道redis用的epoll,但是底层的代码到底是怎样一步步起来的,本文解读redis的网络框架,一探究竟。
一、 核心数据结构
- ConnectionType,
ConnectionType
定义了网络连接的接口,包含read
、write
等,具体定义如下。
typedef struct ConnectionType {
void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
int (*write)(struct connection *conn, const void *data, size_t data_len);
int (*read)(struct connection *conn, void *buf, size_t buf_len);
void (*close)(struct connection *conn);
int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
const char *(*get_last_error)(struct connection *conn);
int (*blocking_connect)(struct connection *conn, const char *addr, int port, long long timeout);
ssize_t (*sync_write)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_read)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
ssize_t (*sync_readline)(struct connection *conn, char *ptr, ssize_t size, long long timeout);
} ConnectionType;
在源码中实现了两种connection,分别是CT_Socket
和CT_TLS
,这两种也是redis支持的连接类型,redis也支持本地unix socket,但这归根也是socket,其读写和网络socket是一样的。这两种的实现分别如下:
CT_Socket:
ConnectionType CT_Socket = {
.ae_handler = connSocketEventHandler,
.close = connSocketClose,
.write = connSocketWrite,
.read = connSocketRead,
.accept = connSocketAccept,
.connect = connSocketConnect,
.set_write_handler = connSocketSetWriteHandler,
.set_read_handler = connSocketSetReadHandler,
.get_last_error = connSocketGetLastError,
.blocking_connect = connSocketBlockingConnect,
.sync_write = connSocketSyncWrite,
.sync_read = connSocketSyncRead,
.sync_readline = connSocketSyncReadLine
};
CT_TLS:
ConnectionType CT_TLS = {
.ae_handler = tlsEventHandler,
.accept = connTLSAccept,
.connect = connTLSConnect,
.blocking_connect = connTLSBlockingConnect,
.read = connTLSRead,
.write = connTLSWrite,
.close = connTLSClose,
.set_write_handler = connTLSSetWriteHandler,
.set_read_handler = connTLSSetReadHandler,
.get_last_error = connTLSGetLastError,
.sync_write = connTLSSyncWrite,
.sync_read = connTLSSyncRead,
.sync_readline = connTLSSyncReadLine,
}
这个结构是核心,后续的各种网络操作,都是通过ConnectionType
中的指针调用的。
- struct connection:该结构的一个完成的连接,客户端的fd封装成一个
connection
。该结构与ConnectionType
配合使用,可以认为ConnectionType
是操作接口(所有函数的第一个参数都是connection
),而struct connection
是操作对象,set_r_w_hander
都是设置struct connection
中的handler
。
struct connection {
ConnectionType *type;
ConnectionState state;
short int flags;
short int refs;
int last_errno;
void *private_data;
ConnectionCallbackFunc conn_handler;
ConnectionCallbackFunc write_handler;
ConnectionCallbackFunc read_handler;
int fd;
};
- aeEventLoop,redis是单线程非阻塞io,网络的结构封装在
aeEventLoop
结构中
/* State of an event based program */
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
aeBeforeSleepProc *aftersleep;
int flags;
} aeEventLoop;
最核心的两个数据就是events
和fired
,分别代表了在监听的fd有事件触发的fd。
aeFileEvent:
/* File event structure */
typedef struct aeFileEvent {
int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
} aeFileEvent;
其中rfileProc
和wfileProc
分别是该事件对应的读写回调。
aeFiredEvent:
/* A fired event */
typedef struct aeFiredEvent {
int fd;
int mask;
} aeFiredEvent;
aeFiredEvent
的定义很简单,就一个fd,再加一个该fd是否可读写。
可以看到,在aeEventLoop
中定义的events
和fired
都是指针,指向了外面分配的一个数组,其大小是server.maxclients+**CONFIG_FDSET_INCR**
二、网络框架
- 等待连接
main()
initServer()
server.el = aeCreateEventLoop(server.maxclients+CONFIG_FDSET_INCR)
listenToPort(server.port,server.ipfd,&server.ipfd_count)
aeCreateFileEvent(server.el, server.ipfd[j], AE_READABLE, acceptTcpHandler,NULL)
// 设置rfileProc和wfileProc都为 acceptTcpHandler
InitServerLast()
initThreadedIO()
pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i)
aeMain(server.el)
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的监听的fd,则走acceptTcpHandler回调
acceptTcpHandler()
anetTcpAccept()
connCreateAcceptedSocket()
connCreateSocket()
conn->type = &CT_Socket; // 最终创建了ConnectionType为CT_Socket的一个连接
acceptCommonHandler()
createClient() // 创建一个真正的client
connSetReadHandler(conn, readQueryFromClient); // 个客户端fd设置 readQueryFromClient 回调,epoll触发时调用
set_read_handler->connSocketSetReadHandler(conn, func)
aeCreateFileEvent(server.el,conn->fd, AE_READABLE,conn->type->ae_handler,conn) // 注册fd
linkClient()
raxInsert(server.clients_index,(unsigned char*)&id,sizeof(id),c,NULL);
connAccept(conn, clientAcceptHandler)
connSocketAccept
callHandler(conn, accept_handler)
accept->connSocketAccept
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
- 读请求
aeMain(server.el)
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
fe->rfileProc(eventLoop,fd,fe->clientData,mask); // 如果触发的是客户端fd,则走 readQueryFromClient 回调
readQueryFromClient()
三、主要函数
- aeCreateFileEvent: 该函数将redis监听的fd都加到epoll队列中,并且对fd绑定了回调函数
acceptTcpHandler
用于接收客户端的连接请求。
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1) // 将监听的fd加到epoll
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE) fe->rfileProc = proc; // 读写回调都是 acceptTcpHandler
if (mask & AE_WRITABLE) fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
- acceptTcpHandler: 用于接收客户端连接请求,其中
anetTcpAccept
内部是系统调用accept
接口。该函数内部的acceptCommonHandler
是重要操作,通过connCreateAcceptedSocket
将客户端的fd转换成一个connection
,在connCreateAcceptedSocket
内部通过connCreateSocket
创建一个connection
,其类型为CT_Socket
。
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport); // 内部调用accept,返回成功fd
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
}
- acceptCommonHandler:根据
connection
创建一个client,并将该客户端链接到server.clients
。在新建client的时候有个重要操作connSetReadHandler
,这个操作绑定了客户端的读取实现。
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
client *c;
UNUSED(ip);
/* Admission control will happen before a client is created and connAccept()
* called, because we don't want to even start transport-level negotiation
* if rejected.
*/
if (listLength(server.clients) >= server.maxclients) {
char *err = "-ERR max number of clients reached\r\n";
/* That's a best effort error message, don't check write errors.
* Note that for TLS connections, no handshake was done yet so nothing is written
* and the connection will just drop.
*/
if (connWrite(conn,err,strlen(err)) == -1) {
/* Nothing to do, Just to avoid the warning... */
}
server.stat_rejected_conn++;
connClose(conn);
return;
}
/* Create connection and client */
if ((c = createClient(conn)) == NULL) { // 这里创建了客户端,并且画挂载到server.clients列表
char conninfo[100];
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (conn: %s)",
connGetLastError(conn),
connGetInfo(conn, conninfo, sizeof(conninfo)));
connClose(conn); /* May be already closed, just ignore errors */
return;
}
/* Last chance to keep flags */
c->flags |= flags;
/* Initiate accept.
*
* Note that connAccept() is free to do two things here:
* 1. Call clientAcceptHandler() immediately;
* 2. Schedule a future call to clientAcceptHandler().
*
* Because of that, we must do nothing else afterwards.
*/
if (connAccept(conn, clientAcceptHandler) == C_ERR) {
char conninfo[100];
if (connGetState(conn) == CONN_STATE_ERROR)
serverLog(LL_WARNING,
"Error accepting a client connection: %s (conn: %s)",
connGetLastError(conn), connGetInfo(conn, conninfo, sizeof(conninfo)));
freeClient(connGetPrivateData(conn));
return;
}
}
- createClient:最主要的是
connSetReadHandler
,其实是调用CT_Socket
的set_read_handler
接口,也就是connSocketSetReadHandler
函数,读取请求就是readQueryFromClient
函数。
client *createClient(connection *conn) {
client *c = zmalloc(sizeof(client));
/* passing NULL as conn it is possible to create a non connected client.
* This is useful since all the commands needs to be executed
* in the context of a client. When commands are executed in other
* contexts (for instance a Lua script) we need a non connected client. */
if (conn) {
connNonBlock(conn);
connEnableTcpNoDelay(conn);
if (server.tcpkeepalive)
connKeepAlive(conn,server.tcpkeepalive);
connSetReadHandler(conn, readQueryFromClient);
connSetPrivateData(conn, c);
}
////////////省略代码/////////////
- connSocketSetReadHandler:该函数的目的是把客户端的
connection
注册到epoll队列中,等待读写触发,从而实现了非阻塞。
/* Register a read handler, to be called when the connection is readable.
* If NULL, the existing handler is removed.
*/
static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
if (func == conn->read_handler) return C_OK;
conn->read_handler = func;
if (!conn->read_handler)
aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
else
if (aeCreateFileEvent(server.el,conn->fd,
AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
return C_OK;
}
上一篇: 用C++写了个协程库,欢迎star
下一篇: redis源码分析——9、AOF持久化