Redis真的是单线程吗?
在Redis 5.0 及以前版本,经常被认为是单线程。具体是什么样的呢?笔者这里从reactor模型和redis源码解读一下Redis的线程模型,以及确认redis启动(默认配置)后的线程数。
本文讨论的版本是5.0及以前。
一. Reactor模型
首先我们要学习一下Reactor模型。netty、tomcat、redis等都使用了Reactor模型。对于Java,主要要用到nio包。 下面简单介绍一下Reactor模型,主要组件是:
- Acceptor:监听连接,监听客户端的connect操作,负责处理Accept事件。
- Reactor:核心上下文,使用Acceptor监听连接,并将连接的事件(READ、WRITE)分派给具体Handler。
- Handler:是具体处理事件的处理器。Handler为了提高性能,通常会创建多个,并且使用线程池等。
通常,Acceptor只有一个,像Netty的MainReactor。 Reactor可以有多个,像Netty的MainReactor是一个线程,只负责连接事件;而SubReactor是两个或者多个,处理读写事件。Handler,通常是多个线程并发处理具体的业务逻辑。
使用一个Reactor,一般成为”单Reactor”;使用多个Reactor,通常成为”主从Reactor”。研究Handler的数量和线程数量时,使用一个Handler线称为“单线程”;多个则称为“多线程”。
一般为了最大提升网络IO的性能,会使用主Reactor(连接处理)+从Reactor(一个或多个,事件分派)+Handler(多个或者线程池,具体对事件的业务处理)。
但是Redis使用了最简单的单reactor+单线程的模型。具体如下图:
更多Reactor模型内容请参考Doug Lea的这篇PPT (《Scalable IO in Java》)(https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf)。
为什么选择这么简单这么原始的模型?我的理解是以下的原因:
- 实现简单,编程简单。
- 避免线程切换开销,不需要使用锁定或者额外同步操作。
- Handler处理极快,我认为这个是主要原因。一般应用是认为Handler需要额外耗时,为了不让Reactor等待,尽量将Handler放入线程池处理。 而Redis直接操作内存,正常普通的存取操作极快(除了部分批量和大key操作),几乎不存在使得Reactor长等待的情况,所以考虑线程池或者多线程的同步开销还不如单线程来的快。
Redis是CPU计算是及其简单的,就是简单的内存存取,绝不是CPU密集的。同时Redis可以处理上万的网络IO请求,是称得上是“IO密集”。因为Redis不操作磁盘也几乎不发远程网络请求,其Handler几乎不存在IO等待的问题。这也就是官方说的的,Redis的瓶颈不在于CPU,而在于内存和网络带宽。
Redis以其简单的编程模型:Acceptor、Reactor、Handler就是一个线程里面(main线程),实现了高并发的缓存服务器。
下面简单解读一下Redis的线程模型的代码。
二. 源码解读
下载官方5.1.4源码,进入main方法。 server.c里面的main:
int main(int argc, char **argv) {
struct timeval tv;
int j;
//......
}
这里主要是创建、配置和初始化服务器。 其中initServerConfig初始化Server的相关配置。
void initServerConfig(void) {
int j;
pthread_mutex_init(&server.next_client_id_mutex,NULL);
pthread_mutex_init(&server.lruclock_mutex,NULL);
pthread_mutex_init(&server.unixtime_mutex,NULL);
updateCachedTime(1);
getRandomHexChars(server.runid,CONFIG_RUN_ID_SIZE);
server.runid[CONFIG_RUN_ID_SIZE] = '\0';
changeReplicationId();
clearReplicationId2();
server.timezone = getTimeZone(); /* Initialized by tzset(). */
server.configfile = NULL;
server.executable = NULL;
server.hz = server.config_hz = CONFIG_DEFAULT_HZ;
server.dynamic_hz = CONFIG_DEFAULT_DYNAMIC_HZ;
server.arch_bits = (sizeof(long) == 8) ? 64 : 32;
server.port = CONFIG_DEFAULT_SERVER_PORT;
server.tcp_backlog = CONFIG_DEFAULT_TCP_BACKLOG;
server.bindaddr_count = 0;
server.unixsocket = NULL;
server.unixsocketperm = CONFIG_DEFAULT_UNIX_SOCKET_PERM;
server.ipfd_count = 0;
//......
}
我们再来到这个 InitServerLast()方法。
void InitServerLast() {
bioInit();
server.initial_memory_usage = zmalloc_used_memory();
}
这里创建了BIO的后台线程。
void bioInit(void) {
pthread_attr_t attr;
pthread_t thread;
size_t stacksize;
int j;
/* Initialization of state vars and objects */
for (j = 0; j < BIO_NUM_OPS; j++) {
pthread_mutex_init(&bio_mutex[j],NULL);
pthread_cond_init(&bio_newjob_cond[j],NULL);
pthread_cond_init(&bio_step_cond[j],NULL);
bio_jobs[j] = listCreate();
bio_pending[j] = 0;
}
/* Set the stack size as by default it may be small in some system */
pthread_attr_init(&attr);
pthread_attr_getstacksize(&attr,&stacksize);
if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
pthread_attr_setstacksize(&attr, stacksize);
/* Ready to spawn our threads. We use the single argument the thread
* function accepts in order to pass the job ID the thread is
* responsible of. */
for (j = 0; j < BIO_NUM_OPS; j++) {
void *arg = (void*)(unsigned long) j;
if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
exit(1);
}
bio_threads[j] = thread;
}
}
注意看这行代码” if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0)” ,这里创建了BIO_NUM_OPS的数量的Background后台处理线程。 然后这个数量此版本是3个:
/* Background job opcodes */
#define BIO_CLOSE_FILE 0 /* Deferred close(2) syscall. */
#define BIO_AOF_FSYNC 1 /* Deferred AOF fsync. */
#define BIO_LAZY_FREE 2 /* Deferred objects freeing. */
#define BIO_NUM_OPS 3
分别是关闭文件、AOF线程、惰性删除。因此有3个后台线程。
再回到server.c的main线程。
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS|AE_CALL_AFTER_SLEEP);
}
}
这里就是Reactor的线程。里面会使用epoll的IO多路复用器获得事件。
/* Call the multiplexing API, will return only on timeout or when
* some event fires. */
numevents = aeApiPoll(eventLoop, tvp);
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
然后事件交给具体的Handler去处理。如下:
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
/* Normally we execute the readable event first, and the writable
* event laster. This is useful as sometimes we may be able
* to serve the reply of a query immediately after processing the
* query.
*
* However if AE_BARRIER is set in the mask, our application is
* asking us to do the reverse: never fire the writable event
* after the readable. In such a case, we invert the calls.
* This is useful when, for instance, we want to do things
* in the beforeSleep() hook, like fsynching a file to disk,
* before replying to a client. */
int invert = fe->mask & AE_BARRIER;
/* Note the "fe->mask & mask & ..." code: maybe an already
* processed event removed an element that fired and we still
* didn't processed, so we check if the event is still valid.
*
* Fire the readable event if the call sequence is not
* inverted. */
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
这些处理都是在aeMain这个方法里面的循环处理的,也就是执行线程的是启动的main线程。
到这里,我们知道了Redis 5.0的线程数量应该是4个:主线程main 和后台线程CLOSE_FILE、AOF_SYNC、LAZY_FREE,下面简单验证一下。
三. 简单测试
从redis 5.0的官方源码编译得到目标文件,进入./redis-5.0.14/src,执行redis-server启动服务器。再启动redis-cli,进入服务器。 做下简单测试如下:
127.0.0.1:6379> get 'mykey'
(nil)
127.0.0.1:6379> set 'mykey' 'v1'
OK
127.0.0.1:6379> set 'mykey2' 'v2'
OK
127.0.0.1:6379> get 'mykey'
"v1"
127.0.0.1:6379>
再使用ps查看redis的线程数量,结果如下:
zouhl@zouhl-mint:~/dev/ide/idea2023/bin$ ps -ef|grep redis
zouhl 5533 1722 0 21:55 ? 00:00:00 ./redis-server *:6379
zouhl 5571 5541 0 21:57 pts/3 00:00:00 ./redis-cli
zouhl 5665 2367 0 21:59 pts/0 00:00:00 grep --color=auto redis
zouhl@zouhl-mint:~/dev/ide/idea2023/bin$ ps -T -p 5533
PID SPID TTY TIME CMD
5533 5533 ? 00:00:00 redis-server
5533 5534 ? 00:00:00 redis-server
5533 5535 ? 00:00:00 redis-server
5533 5536 ? 00:00:00 redis-server
zouhl@zouhl-mint:~/dev/ide/idea2023/bin$
可以看到默认启动的redis-sever是包含4个线程的。
所以所谓的redis单线程指的是:main线程负责使用IO多路复用器获取事件,然后事件分派给具体的Handler处理器,以及Handler处理器处理具体事件,这三个过程都是main线程处理的。所以redis是单线程,但是仍然有background后台线程。
四. 参考资料
- Redis源码(5.1.4)
- 《Scalable IO in Java》[https://gee.cs.oswego.edu/dl/cpjslides/nio.pdf]