如何理解Redis(5.0及以前)的单线程?

2024-02-06

Redis真的是单线程吗?

在Redis 5.0 及以前版本,经常被认为是单线程。具体是什么样的呢?笔者这里从reactor模型和redis源码解读一下Redis的线程模型,以及确认redis启动(默认配置)后的线程数。

本文讨论的版本是5.0及以前。

一. Reactor模型

首先我们要学习一下Reactor模型。netty、tomcat、redis等都使用了Reactor模型。对于Java,主要要用到nio包。 下面简单介绍一下Reactor模型,主要组件是:

  1. Acceptor:监听连接,监听客户端的connect操作,负责处理Accept事件。
  2. Reactor:核心上下文,使用Acceptor监听连接,并将连接的事件(READ、WRITE)分派给具体Handler。
  3. Handler:是具体处理事件的处理器。Handler为了提高性能,通常会创建多个,并且使用线程池等。

通常,Acceptor只有一个,像Netty的MainReactor。 Reactor可以有多个,像Netty的MainReactor是一个线程,只负责连接事件;而SubReactor是两个或者多个,处理读写事件。Handler,通常是多个线程并发处理具体的业务逻辑。

使用一个Reactor,一般成为”单Reactor”;使用多个Reactor,通常成为”主从Reactor”。研究Handler的数量和线程数量时,使用一个Handler线称为“单线程”;多个则称为“多线程”。

一般为了最大提升网络IO的性能,会使用主Reactor(连接处理)+从Reactor(一个或多个,事件分派)+Handler(多个或者线程池,具体对事件的业务处理)。

但是Redis使用了最简单的单reactor+单线程的模型。具体如下图:

single_thead_version

更多Reactor模型内容请参考Doug Lea的这篇PPT (《Scalable IO in Java》)(https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)。

为什么选择这么简单这么原始的模型?我的理解是以下的原因:

  1. 实现简单,编程简单。
  2. 避免线程切换开销,不需要使用锁定或者额外同步操作。
  3. Handler处理极快,我认为这个是主要原因。一般应用是认为Handler需要额外耗时,为了不让Reactor等待,尽量将Handler放入线程池处理。 而Redis直接操作内存,正常普通的存取操作极快(除了部分批量和大key操作),几乎不存在使得Reactor长等待的情况,所以考虑线程池或者多线程的同步开销还不如单线程来的快。

Redis是CPU计算是及其简单的,就是简单的内存存取,绝不是CPU密集的。同时Redis可以处理上万的网络IO请求,是称得上是“IO密集”。因为Redis不操作磁盘也几乎不发远程网络请求,其Handler几乎不存在IO等待的问题。这也就是官方说的的,Redis的瓶颈不在于CPU,而在于内存和网络带宽。

Redis以其简单的编程模型:Acceptor、Reactor、Handler就是一个线程里面(main线程),实现了高并发的缓存服务器。

下面简单解读一下Redis的线程模型的代码。

二. 源码解读

下载官方5.1.4源码,进入main方法。 server.c里面的main:

int main(int argc, char **argv) {
    struct timeval tv;
    int j;
    //......
    }

这里主要是创建、配置和初始化服务器。 其中initServerConfig初始化Server的相关配置。

void initServerConfig(void) {
    int j;

    pthread_mutex_init(&server.next_client_id_mutex,NULL);
    pthread_mutex_init(&server.lruclock_mutex,NULL);
    pthread_mutex_init(&server.unixtime_mutex,NULL);

    updateCachedTime(1);
    getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
    server.runid[CONFIG_RUN_ID_SIZE] = '\0';
    changeReplicationId();
    clearReplicationId2();
    server.timezone = getTimeZone(); /* Initialized by tzset(). */
    server.configfile = NULL;
    server.executable = NULL;
    server.hz = server.config_hz = CONFIG_DEFAULT_HZ;
    server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ;
    server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
    server.port = CONFIG_DEFAULT_SERVER_PORT;
    server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
    server.bindaddr_count = 0;
    server.unixsocket = NULL;
    server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
    server.ipfd_count = 0;
    //......
    }

我们再来到这个 InitServerLast()方法。

void InitServerLast() {
    bioInit();
    server.initial_memory_usage = zmalloc_used_memory();
}

这里创建了BIO的后台线程。

void bioInit(void) {
    pthread_attr_t attr;
    pthread_t thread;
    size_t stacksize;
    int j;

    /* Initialization of state vars and objects */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        pthread_mutex_init(&bio_mutex[j],NULL);
        pthread_cond_init(&bio_newjob_cond[j],NULL);
        pthread_cond_init(&bio_step_cond[j],NULL);
        bio_jobs[j] = listCreate();
        bio_pending[j] = 0;
    }

    /* Set the stack size as by default it may be small in some system */
    pthread_attr_init(&attr);
    pthread_attr_getstacksize(&attr,&stacksize);
    if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
    while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
    pthread_attr_setstacksize(&attr, stacksize);

    /* Ready to spawn our threads. We use the single argument the thread
     * function accepts in order to pass the job ID the thread is
     * responsible of. */
    for (j = 0; j < BIO_NUM_OPS; j++) {
        void *arg = (void*)(unsigned long) j;
        if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
            serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
            exit(1);
        }
        bio_threads[j] = thread;
    }
}

注意看这行代码” if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0)” ,这里创建了BIO_NUM_OPS的数量的Background后台处理线程。 然后这个数量此版本是3个:

/* Background job opcodes */
#define BIO_CLOSE_FILE    0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC     1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE     2 /* Deferred objects freeing. */
#define BIO_NUM_OPS       3

分别是关闭文件、AOF线程、惰性删除。因此有3个后台线程。

再回到server.c的main线程。

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
    }
}

这里就是Reactor的线程。里面会使用epoll的IO多路复用器获得事件。

  /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);
        
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}

然后事件交给具体的Handler去处理。如下:


for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */

            /* Normally we execute the readable event first, and the writable
             * event laster. This is useful as sometimes we may be able
             * to serve the reply of a query immediately after processing the
             * query.
             *
             * However if AE_BARRIER is set in the mask, our application is
             * asking us to do the reverse: never fire the writable event
             * after the readable. In such a case, we invert the calls.
             * This is useful when, for instance, we want to do things
             * in the beforeSleep() hook, like fsynching a file to disk,
             * before replying to a client. */
            int invert = fe->mask & AE_BARRIER;

            /* Note the "fe->mask & mask & ..." code: maybe an already
             * processed event removed an element that fired and we still
             * didn't processed, so we check if the event is still valid.
             *
             * Fire the readable event if the call sequence is not
             * inverted. */
            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }

这些处理都是在aeMain这个方法里面的循环处理的,也就是执行线程的是启动的main线程。

到这里,我们知道了Redis 5.0的线程数量应该是4个:主线程main 和后台线程CLOSE_FILE、AOF_SYNC、LAZY_FREE,下面简单验证一下。

三. 简单测试

从redis 5.0的官方源码编译得到目标文件,进入./redis-5.0.14/src,执行redis-server启动服务器。再启动redis-cli,进入服务器。 做下简单测试如下:

127.0.0.1:6379> get 'mykey'
(nil)
127.0.0.1:6379> set 'mykey' 'v1'
OK
127.0.0.1:6379> set 'mykey2' 'v2'
OK
127.0.0.1:6379> get 'mykey' 
"v1"
127.0.0.1:6379> 

再使用ps查看redis的线程数量,结果如下:

zouhl@zouhl-mint:~/dev/ide/idea2023/bin$ ps -ef|grep  redis
zouhl       5533    1722  0 21:55 ?        00:00:00 ./redis-server *:6379
zouhl       5571    5541  0 21:57 pts/3    00:00:00 ./redis-cli
zouhl       5665    2367  0 21:59 pts/0    00:00:00 grep --color=auto redis
zouhl@zouhl-mint:~/dev/ide/idea2023/bin$ ps -T  -p 5533
    PID    SPID TTY          TIME CMD
   5533    5533 ?        00:00:00 redis-server
   5533    5534 ?        00:00:00 redis-server
   5533    5535 ?        00:00:00 redis-server
   5533    5536 ?        00:00:00 redis-server
zouhl@zouhl-mint:~/dev/ide/idea2023/bin$ 

可以看到默认启动的redis-sever是包含4个线程的。

所以所谓的redis单线程指的是:main线程负责使用IO多路复用器获取事件,然后事件分派给具体的Handler处理器,以及Handler处理器处理具体事件,这三个过程都是main线程处理的。所以redis是单线程,但是仍然有background后台线程。

四. 参考资料

  1. Redis源码(5.1.4)
  2. 《Scalable IO in Java》[https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf]