redis源码之线程模型
虽然Redis6引入了多线程,但是命令处理依旧是单线程,所以说多线程是不可能多线程的,这辈子都不可能多线程的。
Redis6的多线程仅仅存在于网络IO中,更确切来说主要是网络写操作,线程模型封装在networking.c
文件中。
#io-threads 1
#
# Setting io-threads to 1 will just use the main thread as usually.
# When I/O threads are enabled, we only use threads for writes, that is
# to thread the write(2) syscall and transfer the client buffers to the
# socket. However it is also possible to enable threading of reads and
# protocol parsing using the following configuration directive, by setting
# it to yes:
#
# io-threads-do-reads no
#
# Usually threading reads doesn't help much.
#
# NOTE 1: This configuration directive cannot be changed at runtime via
# CONFIG SET. Aso this feature currently does not work when SSL is
# enabled.
配置文件中用io-threads
来设置线程数,而且还只对write生效,如果想把read开启多线程,那还需要把io-threads-do-reads
设成yes
。不过最后又来了一句通常来说,把read设为多线程作用不大。
所有Redis6的线程模型如下,主要是把网络IO多线程化,但是命令处理依旧是单线程。
在main
函数中调用initThreadedIO
创建线程
/* Initialize the data structures needed for threaded I/O. */
void initThreadedIO(void) {
io_threads_active = 0; /* We start with threads not active. */
/* Don't spawn any thread if the user selected a single thread:
* we'll handle I/O directly from the main thread. */
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
/* Things we do for all the threads including the main thread. */
io_threads_list[i] = listCreate();
if (i == 0) continue; /* Thread 0 is the main thread. */
/* Things we do only for the additional threads. */
pthread_t tid;
pthread_mutex_init(&io_threads_mutex[i],NULL);
io_threads_pending[i] = 0;
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
io_threads[i] = tid;
}
}
说明几点:
- 当不开启多线程,也就是
io-threads
为1时,逻辑和旧版一样,不会创建线程了。 - 创建线程是时候对于#0线程,也就是主线程,直接跳过了后面的初始化
- 对于每一个线程,都创建了一把锁,而且锁状态是lock状态
可以看到initThreadedIO
中创建了子线程IOThreadMain
, IOThreadMain
函数原型如下:
void *IOThreadMain(void *myid) {
/* The ID is the thread number (from 0 to server.iothreads_num-1), and is
* used by the thread to just manipulate a single sub-array of clients. */
long id = (unsigned long)myid;
char thdname[16];
snprintf(thdname, sizeof(thdname), "io_thd_%ld", id);
redis_set_thread_title(thdname);
redisSetCpuAffinity(server.server_cpulist);
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (io_threads_pending[id] != 0) break;
}
/* Give the main thread a chance to stop this thread. */
if (io_threads_pending[id] == 0) {
pthread_mutex_lock(&io_threads_mutex[id]);
pthread_mutex_unlock(&io_threads_mutex[id]);
continue;
}
serverAssert(io_threads_pending[id] != 0);
if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id]));
/* Process: note that the main thread will never touch our list
* before we drop the pending count to 0. */
listIter li;
listNode *ln;
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
} else {
serverPanic("io_threads_op value is unknown");
}
}
listEmpty(io_threads_list[id]);
io_threads_pending[id] = 0;
if (tio_debug) printf("[%ld] Done\n", id);
}
}
其中io_threads_mutex
, io_threads_pending
都是数组,线程之间独立,不共享。
IOThreadMain
算是真正的线程处理函数,在一个大的while(1)
循环里面,作者用应该for循环代替了sleep,io_threads_pending[id]
表示该线程待处理的客户端数,这个值在handleClientsWithPendingReadsUsingThreads
和handleClientsWithPendingWritesUsingThreads
函数中设置。
我们把IOThreadMain
函数看成消费者,那么handleClientsWithPendingReadsUsingThreads
和handleClientsWithPendingWritesUsingThreads
相当于是生产者,
另外提供了startThreadedIO
和stopThreadedIO
两个接口,用来开启和结束线程,其实就是对上面创建的锁进行lock或者unlock。
接下来再看一下两个比较核心的函数,用户真正读写IO,分别是handleClientsWithPendingWritesUsingThreads
和handleClientsWithPendingReadsUsingThreads
上一篇: 关于高可用服务的优化
下一篇: redis源码之辅助线程