您现在的位置是: 网站首页 > 程序设计  > redis 

redis源码之线程模型

2020年8月8日 04:26 1358人围观

简介redis是单线程的,也是因为单线程,我们可以用redis来实现分布式锁。但是Redis6引入了多线程,那么Redis6的线程模型是怎么样的呢?

    虽然Redis6引入了多线程,但是命令处理依旧是单线程,所以说多线程是不可能多线程的,这辈子都不可能多线程的。 Redis6的多线程仅仅存在于网络IO中,更确切来说主要是网络写操作,线程模型封装在networking.c文件中。

    #io-threads 1 
    # 
    # Setting io-threads to 1 will just use the main thread as usually. 
    # When I/O threads are enabled, we only use threads for writes, that is 
    # to thread the write(2) syscall and transfer the client buffers to the 
    # socket. However it is also possible to enable threading of reads and 
    # protocol parsing using the following configuration directive, by setting 
    # it to yes: 
    # 
    # io-threads-do-reads no 
    # 
    # Usually threading reads doesn't help much. 
    # 
    # NOTE 1: This configuration directive cannot be changed at runtime via 
    # CONFIG SET. Aso this feature currently does not work when SSL is 
    # enabled. 
    

    配置文件中用io-threads来设置线程数,而且还只对write生效,如果想把read开启多线程,那还需要把io-threads-do-reads设成yes。不过最后又来了一句通常来说,把read设为多线程作用不大

    所有Redis6的线程模型如下,主要是把网络IO多线程化,但是命令处理依旧是单线程。

    main函数中调用initThreadedIO创建线程

    /* Initialize the data structures needed for threaded I/O. */ 
    void initThreadedIO(void) { 
        io_threads_active = 0; /* We start with threads not active. */ 
    
        /* Don't spawn any thread if the user selected a single thread: 
         * we'll handle I/O directly from the main thread. */ 
        if (server.io_threads_num == 1) return; 
    
        if (server.io_threads_num > IO_THREADS_MAX_NUM) { 
            serverLog(LL_WARNING,"Fatal: too many I/O threads configured. " 
                                 "The maximum number is %d.", IO_THREADS_MAX_NUM); 
            exit(1); 
        } 
    
        /* Spawn and initialize the I/O threads. */ 
        for (int i = 0; i < server.io_threads_num; i++) { 
            /* Things we do for all the threads including the main thread. */ 
            io_threads_list[i] = listCreate(); 
            if (i == 0) continue; /* Thread 0 is the main thread. */ 
    
            /* Things we do only for the additional threads. */ 
            pthread_t tid; 
            pthread_mutex_init(&io_threads_mutex[i],NULL); 
            io_threads_pending[i] = 0; 
            pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */ 
            if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) { 
                serverLog(LL_WARNING,"Fatal: Can't initialize IO thread."); 
                exit(1); 
            } 
            io_threads[i] = tid; 
        } 
    } 
    

    说明几点:

    • 当不开启多线程,也就是io-threads 为1时,逻辑和旧版一样,不会创建线程了。
    • 创建线程是时候对于#0线程,也就是主线程,直接跳过了后面的初始化
    • 对于每一个线程,都创建了一把锁,而且锁状态是lock状态

    可以看到initThreadedIO中创建了子线程IOThreadMain, IOThreadMain函数原型如下:

    void *IOThreadMain(void *myid) { 
        /* The ID is the thread number (from 0 to server.iothreads_num-1), and is 
         * used by the thread to just manipulate a single sub-array of clients. */ 
        long id = (unsigned long)myid; 
        char thdname[16]; 
    
        snprintf(thdname, sizeof(thdname), "io_thd_%ld", id); 
        redis_set_thread_title(thdname); 
        redisSetCpuAffinity(server.server_cpulist); 
    
        while(1) { 
            /* Wait for start */ 
            for (int j = 0; j < 1000000; j++) { 
                if (io_threads_pending[id] != 0) break; 
            } 
    
            /* Give the main thread a chance to stop this thread. */ 
            if (io_threads_pending[id] == 0) { 
                pthread_mutex_lock(&io_threads_mutex[id]); 
                pthread_mutex_unlock(&io_threads_mutex[id]); 
                continue; 
            } 
    
            serverAssert(io_threads_pending[id] != 0); 
    
            if (tio_debug) printf("[%ld] %d to handle\n", id, (int)listLength(io_threads_list[id])); 
    
            /* Process: note that the main thread will never touch our list 
             * before we drop the pending count to 0. */ 
            listIter li; 
            listNode *ln; 
            listRewind(io_threads_list[id],&li); 
            while((ln = listNext(&li))) { 
                client *c = listNodeValue(ln); 
                if (io_threads_op == IO_THREADS_OP_WRITE) { 
                    writeToClient(c,0); 
                } else if (io_threads_op == IO_THREADS_OP_READ) { 
                    readQueryFromClient(c->conn); 
                } else { 
                    serverPanic("io_threads_op value is unknown"); 
                } 
            } 
            listEmpty(io_threads_list[id]); 
            io_threads_pending[id] = 0; 
    
            if (tio_debug) printf("[%ld] Done\n", id); 
        } 
    } 
    

    其中io_threads_mutex, io_threads_pending都是数组,线程之间独立,不共享。 IOThreadMain算是真正的线程处理函数,在一个大的while(1)循环里面,作者用应该for循环代替了sleep,io_threads_pending[id]表示该线程待处理的客户端数,这个值在handleClientsWithPendingReadsUsingThreadshandleClientsWithPendingWritesUsingThreads函数中设置。

    我们把IOThreadMain函数看成消费者,那么handleClientsWithPendingReadsUsingThreadshandleClientsWithPendingWritesUsingThreads相当于是生产者,

    另外提供了startThreadedIOstopThreadedIO两个接口,用来开启和结束线程,其实就是对上面创建的锁进行lock或者unlock。

    接下来再看一下两个比较核心的函数,用户真正读写IO,分别是handleClientsWithPendingWritesUsingThreadshandleClientsWithPendingReadsUsingThreads