您现在的位置是: 网站首页 > 程序设计  > redis 

redis源码之辅助线程

2020年8月7日 05:56 1697人围观

简介:众所周知,redis是单线程的,但这里的“单线程”只是指处理命令的主逻辑,在后台还有几个常驻线程。

    redis创建了3个后台常驻线程,分别负责关闭文件、aof异步刷盘(fsync)和惰性释放,相关代码在bio.c文件中。首先用gstack看一下redis运行时有哪些线程(用pstree -p pid或者查看/proc/pid/task/目录也可以)。

    可以看到,运行中的redis总共有4个线程,其中一个是主线程,其余三个是通过start_thread启动的线程,而且这三个线程分别传入了0、1、2三个参数,即下面bioInit中循环变量j的取值,表示该线程负责的任务类型。

    /* Initialize the background system: set up the per-job-type mutex,
     * condition variables, job queue and pending counter, then spawn one
     * worker thread for each of the BIO_NUM_OPS job types.
     *
     * The process exits with status 1 if any thread cannot be created. */
    void bioInit(void) {
        pthread_attr_t attr;
        pthread_t thread;
        size_t stacksize;
        int j;

        /* Initialization of state vars and objects */
        for (j = 0; j < BIO_NUM_OPS; j++) {
            pthread_mutex_init(&bio_mutex[j],NULL);
            pthread_cond_init(&bio_newjob_cond[j],NULL);
            pthread_cond_init(&bio_step_cond[j],NULL);
            bio_jobs[j] = listCreate();
            bio_pending[j] = 0;
        }

        /* Set the stack size as by default it may be small in some system.
         * Start from the default and keep doubling it until it is at least
         * REDIS_THREAD_STACK_SIZE. */
        pthread_attr_init(&attr);
        pthread_attr_getstacksize(&attr,&stacksize);
        if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
        while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
        pthread_attr_setstacksize(&attr, stacksize);

        /* Ready to spawn our threads. We use the single argument the thread
         * function accepts in order to pass the job ID the thread is
         * responsible for. */
        for (j = 0; j < BIO_NUM_OPS; j++) {
            void *arg = (void*)(unsigned long) j;
            if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
                serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
                exit(1);
            }
            bio_threads[j] = thread;
        }

        /* The attributes object is only needed while creating the threads.
         * Destroying it has no effect on threads already created with it. */
        pthread_attr_destroy(&attr);
    }
    

    线程执行函数bioProcessBackgroundJobs如下:

    /* Thread entry point for a background job worker.
     *
     * 'arg' is not a real pointer: it carries the job type as an integer cast
     * to void*. The thread only serves the queue bio_jobs[type].
     *
     * Returns NULL right away if the type is out of range. Otherwise the
     * function enters an endless loop and never returns normally. */
    void *bioProcessBackgroundJobs(void *arg) {
        struct bio_job *job;
        unsigned long type = (unsigned long) arg;
        sigset_t sigset;

        /* Check that the type is within the right interval. */
        if (type >= BIO_NUM_OPS) {
            serverLog(LL_WARNING,
                "Warning: bio thread started with wrong type %lu",type);
            return NULL;
        }

        /* Give the thread a title that matches the job type it serves. */
        switch (type) {
        case BIO_CLOSE_FILE:
            redis_set_thread_title("bio_close_file");
            break;
        case BIO_AOF_FSYNC:
            redis_set_thread_title("bio_aof_fsync");
            break;
        case BIO_LAZY_FREE:
            redis_set_thread_title("bio_lazy_free");
            break;
        }

        redisSetCpuAffinity(server.bio_cpulist);

        /* Make the thread killable at any time, so that bioKillThreads()
         * can work reliably. */
        pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL);
        pthread_setcanceltype(PTHREAD_CANCEL_ASYNCHRONOUS, NULL);

        /* Take the queue lock before entering the loop: every iteration
         * below expects to start with the lock already held. */
        pthread_mutex_lock(&bio_mutex[type]);
        /* Block SIGALRM so we are sure that only the main thread will
         * receive the watchdog signal. */
        sigemptyset(&sigset);
        sigaddset(&sigset, SIGALRM);
        if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
            serverLog(LL_WARNING,
                "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

        while(1) {
            listNode *ln;

            /* The loop always starts with the lock held. */
            if (listLength(bio_jobs[type]) == 0) {
                /* pthread_cond_wait() releases the mutex while sleeping and
                 * re-acquires it before returning. Restart the iteration so
                 * the queue length is checked again after every wakeup. */
                pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
                continue;
            }
            /* Fetch the first job in the queue. The node is left linked in the
             * list while the job runs and is only removed after processing. */
            ln = listFirst(bio_jobs[type]);
            job = ln->value;
            /* It is now possible to unlock the background system as we now have
             * a stand alone job structure to process. */
            pthread_mutex_unlock(&bio_mutex[type]);

            /* Process the job accordingly to its type. */
            if (type == BIO_CLOSE_FILE) {
                /* arg1 carries the file descriptor as an integer. */
                close((long)job->arg1);
            } else if (type == BIO_AOF_FSYNC) {
                /* arg1 carries the file descriptor as an integer. */
                redis_fsync((long)job->arg1);
            } else if (type == BIO_LAZY_FREE) {
                /* What we free changes depending on what arguments are set:
                 * arg1 -> free the object at pointer.
                 * arg2 & arg3 -> free two dictionaries (a Redis DB).
                 * only arg3 -> free the skiplist. */
                if (job->arg1)
                    lazyfreeFreeObjectFromBioThread(job->arg1);
                else if (job->arg2 && job->arg3)
                    lazyfreeFreeDatabaseFromBioThread(job->arg2,job->arg3);
                else if (job->arg3)
                    lazyfreeFreeSlotsMapFromBioThread(job->arg3);
            } else {
                serverPanic("Wrong job type in bioProcessBackgroundJobs().");
            }
            /* Release the job structure. The list node 'ln' is still linked
             * and is removed below, once the lock is held again. */
            zfree(job);

            /* Lock again before reiterating the loop, if there are no longer
             * jobs to process we'll block again in pthread_cond_wait(). */
            pthread_mutex_lock(&bio_mutex[type]);
            listDelNode(bio_jobs[type],ln);
            /* The pending counter drops only after the job has completed. */
            bio_pending[type]--;

            /* Unblock threads blocked on bioWaitStepOfType() if any. */
            pthread_cond_broadcast(&bio_step_cond[type]);
        }
    }
    

    可以看到, 线程处理函数是一个死循环,那么这些辅助线程怎么与主线程通信呢?是队列!

    /* Excerpt from the worker loop: every iteration starts with
     * bio_mutex[type] held. When the queue is empty the thread sleeps on the
     * condition variable (pthread_cond_wait() releases the mutex while
     * waiting and re-acquires it before returning), then restarts the
     * iteration so the queue length is checked again. */
    if (listLength(bio_jobs[type]) == 0) {
        pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
        continue;
    }
    

    可以看到,队列为空时线程并不是忙等,而是阻塞在条件变量上等待新任务,被唤醒后再从队列中取出任务处理。同样,redis也提供了向队列添加任务的接口。

    /* Queue a background job of the given type.
     *
     * A new job structure is allocated, stamped with the current time and
     * filled with the three opaque arguments. It is then appended to the
     * queue for 'type' under that queue's mutex, the pending counter is
     * incremented, and the new-job condition variable is signalled. */
    void bioCreateBackgroundJob(int type, void *arg1, void *arg2, void *arg3) {
        struct bio_job *entry;
        pthread_mutex_t *lock = &bio_mutex[type];
        pthread_cond_t *wakeup = &bio_newjob_cond[type];

        /* Build the job description before taking the lock. */
        entry = zmalloc(sizeof(struct bio_job));
        entry->time = time(NULL);
        entry->arg1 = arg1;
        entry->arg2 = arg2;
        entry->arg3 = arg3;

        /* Publish the job and notify a waiter on the new-job condition. */
        pthread_mutex_lock(lock);
        listAddNodeTail(bio_jobs[type], entry);
        bio_pending[type] += 1;
        pthread_cond_signal(wakeup);
        pthread_mutex_unlock(lock);
    }
    

    通过这个接口就可以把任务添加到队列里,主线程调用bioCreateBackgroundJob即可提交任务,比如在惰性删除时会用到如下代码:

    /* Hand the value over to the background thread only when the free effort
     * exceeds LAZYFREE_THRESHOLD and the value has exactly one reference
     * (refcount == 1). */
    if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
        /* Atomically bump the lazyfree_objects counter by one. */
        atomicIncr(lazyfree_objects,1);
        /* Queue a BIO_LAZY_FREE job; the value is passed as arg1. */
        bioCreateBackgroundJob(BIO_LAZY_FREE,val,NULL,NULL);
        /* Clear the value pointer stored in the dictionary entry.
         * NOTE(review): presumably so the entry no longer refers to a value
         * now owned by the background thread -- confirm against the
         * enclosing function, which is not visible here. */
        dictSetVal(db->dict,de,NULL);
    }