[Redis overview] Redis is a high-performance in-memory database, and it is an unavoidable topic in any server-side development interview. If you want to raise your network-programming skills, this excellent open-source project is well worth studying in depth.
I have wanted to read the redis source ever since graduating, but for one reason or another never got around to it. Today I finally did a deep dive into the redis 4.0 code. As a fast, highly concurrent in-memory database, redis might be expected to use a very elaborate network architecture, but it does not. Its performance comes largely from two things: redis 4.0 runs a single-threaded event loop (redis 6.0 adds optional I/O threads, though command execution remains single-threaded), which avoids the cost of thread switching and synchronization; and all key-value pairs live in memory, managed by redis's own efficient memory-management layer, so reads and writes touch memory directly with no disk I/O on the data path.
1. Preparation
Run the following in a CentOS terminal to download, unpack, and build the Redis source:
wget http://download.redis.io/releases/redis-4.0.11.tar.gz
tar -zxvf redis-4.0.11.tar.gz
cd redis-4.0.11
make -j 5
This builds the Redis source; the resulting redis-server and redis-cli binaries end up under the src/ subdirectory.
Then start redis-server under gdb:
gdb redis-server
r
In the redis build directory, open a second terminal and bring up redis-cli, again under gdb:
gdb redis-cli
r
set hello redis
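To confirm the write landed, you can read the key back from the same redis-cli session (assuming redis-server is still running with its default configuration):
get hello
The expected reply is "redis".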
That completes the preparation: we can now push key-value updates to redis-server at will. Next, let's dissect the server-side source code.
2. Debugging the source
redis-server runs as a standalone process, so it must have an entry point, i.e. a main function. A search of the source tree shows that it lives in server.c.
int main(int argc, char **argv) {
......
// Initialize the server state
initServer();
// Install the beforeSleep/afterSleep callbacks
aeSetBeforeSleepProc(server.el, beforeSleep);
aeSetAfterSleepProc(server.el, afterSleep);
// aeMain starts the event loop
aeMain(server.el);
......
aeDeleteEventLoop(server.el);
return 0;
}
That is the main execution flow of main() in server.c: a single main thread initializes the server, installs a couple of callbacks, and enters the event loop. Let's unpack it step by step, starting with initServer().
Note: initServer() is packed with details worth studying, exactly the kind of details that are easy to overlook when writing your own server code.
/* Global vars */
struct redisServer server; /* Server global state */
void setupSignalHandlers(void) {
struct sigaction act;
/* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
* Otherwise, sa_handler is used. */
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_handler = sigShutdownHandler;
sigaction(SIGTERM, &act, NULL);
sigaction(SIGINT, &act, NULL);
......
return;
}
void initServer(void)
{
int j;
/*
Ignore SIGHUP and SIGPIPE; their default actions would otherwise
terminate the redis process (e.g. SIGPIPE when writing to a closed connection)
*/
signal(SIGHUP, SIG_IGN);
signal(SIGPIPE, SIG_IGN);
// Install the signal handlers set up in setupSignalHandlers() above (SIGTERM, SIGINT).
setupSignalHandlers();
......
/*
The global redisServer object lives as long as the process itself.
It holds the event loop, the client lists, and other server-wide state.
*/
server.pid = getpid();
server.current_client = NULL;
server.clients = listCreate();
server.clients_to_close = listCreate();
server.slaves = listCreate();
server.monitors = listCreate();
//clients_pending_write: clients that have output to flush but no write handler installed yet
server.clients_pending_write = listCreate();
server.slaveseldb = -1;
server.unblocked_clients = listCreate();
server.ready_keys = listCreate();
//clients blocked waiting for replica ACKs (the WAIT command)
server.clients_waiting_acks = listCreate();
server.get_ack_from_slaves = 0;
server.clients_paused = 0;
server.system_memory_size = zmalloc_get_memory_size();
createSharedObjects();
adjustOpenFilesLimit();
/*
Based on the configured limits, allocate the event loop's internal arrays
at the requested size, e.g. the aeFileEvent array holding the per-fd
read/write callbacks:
typedef struct aeFileEvent {
int mask;//readable / writable
aeFileProc *rfileProc;
aeFileProc *wfileProc;
void *clientData;
}aeFileEvent;
*/
//There is a single global redisServer, and it owns exactly one eventLoop
server.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR);
if (server.el == NULL) {
serverLog(LL_WARNING,
"Failed creating the event loop. Error message: '%s'",
strerror(errno));
exit(1);
}
server.db = zmalloc(sizeof(redisDb) * server.dbnum);
//Start listening on the configured TCP port
if (server.port != 0 &&
listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
exit(1);
if (server.unixsocket != NULL) {
unlink(server.unixsocket); /* don't care if this fails */
server.sofd = anetUnixServer(server.neterr,server.unixsocket,
server.unixsocketperm, server.tcp_backlog);
if (server.sofd == ANET_ERR) {
serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
exit(1);
}
//Put the Unix domain socket into non-blocking mode
anetNonBlock(NULL,server.sofd);
}
......
//Create the timer event that drives serverCron's periodic housekeeping
if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create event loop timers.");
exit(1);
}
/*
1. Install acceptTcpHandler as the callback for each listening socket.
Only readable events matter here: a readable event on a listening
socket means a new connection is ready to be accepted.
2. This also registers the listening sockets with the I/O multiplexing backend.
*/
for (j = 0; j < server.ipfd_count; j++) {
if (aeCreateFileEvent(server.el,
server.ipfd[j],
AE_READABLE,
acceptTcpHandler,
NULL) == AE_ERR)
{
serverPanic("Unrecoverable error creating server.ipfd file event.");
}
}
if (server.sofd > 0 && aeCreateFileEvent(server.el,
server.sofd, AE_READABLE,
acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
/* Register the read end of the modules pipe, used to wake up an eventLoop blocked in epoll_wait when a module-blocked client needs attention */
if (aeCreateFileEvent(server.el,
server.module_blocked_pipe[0], AE_READABLE,
moduleBlockedClientPipeReadable, NULL) == AE_ERR) {
serverPanic(
"Error registering the readable event for the module "
"blocked clients subsystem.");
}
......
}
With the main flow in place, how exactly does a listening socket get registered with the I/O multiplexing backend? Let's drill into aeCreateFileEvent.
int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
aeFileProc *proc, void *clientData)
{
if (fd >= eventLoop->setsize) {
errno = ERANGE;
return AE_ERR;
}
aeFileEvent *fe = &eventLoop->events[fd];
if (aeApiAddEvent(eventLoop, fd, mask) == -1)
return AE_ERR;
fe->mask |= mask;
if (mask & AE_READABLE)
fe->rfileProc = proc;
if (mask & AE_WRITABLE)
fe->wfileProc = proc;
fe->clientData = clientData;
if (fd > eventLoop->maxfd)
eventLoop->maxfd = fd;
return AE_OK;
}
static int aeApiAddEvent(aeEventLoop *eventLoop,
int fd, int mask) {
aeApiState *state = eventLoop->apidata;
struct epoll_event ee = {0};
int op = eventLoop->events[fd].mask == AE_NONE ?
EPOLL_CTL_ADD : EPOLL_CTL_MOD;
ee.events = 0;
mask |= eventLoop->events[fd].mask; /* Merge old events */
if (mask & AE_READABLE)
ee.events |= EPOLLIN;
if (mask & AE_WRITABLE)
ee.events |= EPOLLOUT;
ee.data.fd = fd;
//Here we can see that redis (on Linux) uses epoll: the fd is registered on epfd
if (epoll_ctl(state->epfd,op,fd,&ee) == -1)
return -1;
return 0;
}
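For context, the aeApiState that aeApiAddEvent pulls out of eventLoop->apidata is the epoll backend's private state. Roughly (paraphrased from ae_epoll.c, not quoted verbatim), it holds the epoll instance fd and the array that epoll_wait fills in:
typedef struct aeApiState {
    int epfd;                   /* fd returned by epoll_create() */
    struct epoll_event *events; /* buffer handed to epoll_wait() */
} aeApiState;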
When epoll reports a readable event on the listening socket, the main loop returns from epoll_wait and, based on the event type, dispatches to the callback we installed earlier: acceptTcpHandler.
void acceptTcpHandler(aeEventLoop *el, int fd,
void *privdata, int mask) {
int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
char cip[NET_IP_STR_LEN];
UNUSED(el);
UNUSED(mask);
UNUSED(privdata);
while(max--) {
//Call accept() to obtain a connected client socket
cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
if (cfd == ANET_ERR) {
if (errno != EWOULDBLOCK)
serverLog(LL_WARNING,
"Accepting client connection: %s", server.neterr);
return;
}
serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
//Hand the new connection to the common accept path
acceptCommonHandler(cfd,0,cip);
}
}
#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
client *c;
//Create the client object
if ((c = createClient(fd)) == NULL) {
serverLog(LL_WARNING,
"Error registering fd event for the new client: %s (fd=%d)",
strerror(errno),fd);
close(fd); /* May be already closed, just ignore errors */
return;
}
......
}
//Wrap the accepted client socket in a client object
client *createClient(int fd) {
client *c = zmalloc(sizeof(client));
if (fd != -1) {
//Put the client socket into non-blocking mode
anetNonBlock(NULL,fd);
//Disable the Nagle algorithm (TCP_NODELAY)
anetEnableTcpNoDelay(NULL,fd);
//Enable TCP keep-alive on the connection
if (server.tcpkeepalive)
anetKeepAlive(NULL,fd,server.tcpkeepalive);
/*
Register the client socket on epfd and install readQueryFromClient
as the readable-event callback
*/
if (aeCreateFileEvent(server.el, fd, AE_READABLE,
readQueryFromClient, c) == AE_ERR)
{
close(fd);
zfree(c);
return NULL;
}
}
......
}
When the client socket later becomes readable, the main loop dispatches to the callback registered for that socket. Let's look at readQueryFromClient.
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask)
{
client *c = (client*)privdata;
int nread, readlen;
size_t qblen;
UNUSED(el);
UNUSED(mask);
readlen = PROTO_IOBUF_LEN;
if (c->reqtype == PROTO_REQ_MULTIBULK &&
c->multibulklen && c->bulklen != -1
&& c->bulklen >= PROTO_MBULK_BIG_ARG)
{
ssize_t remaining = (size_t)(c->bulklen+2) - sdslen(c->querybuf);
if (remaining < readlen)
readlen = remaining;
}
qblen = sdslen(c->querybuf);
if (c->querybuf_peak < qblen)
c->querybuf_peak = qblen;
c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
nread = read(fd, c->querybuf + qblen, readlen);
if (nread == -1) {
if (errno == EAGAIN) {
//EAGAIN on a non-blocking socket: no data available right now, wait for the next readable event
return;
}
else
{
//Anything else is a real error: log it and drop the client
serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
freeClient(c);
return;
}
}
else if (nread == 0)
{
serverLog(LL_VERBOSE, "Client closed connection");
freeClient(c);
return;
}
else if (c->flags & CLIENT_MASTER)
{
c->pending_querybuf = sdscatlen(c->pending_querybuf,
c->querybuf + qblen, nread);
}
......
if (!(c->flags & CLIENT_MASTER)) {
processInputBuffer(c);
}
else
{
size_t prev_offset = c->reploff;
processInputBuffer(c);
size_t applied = c->reploff - prev_offset;
if (applied) {
replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied);
sdsrange(c->pending_querybuf, applied, -1);
}
}
}
processInputBuffer checks whether the received data begins with an asterisk (*). If it does, the client's reqtype field is set to PROTO_REQ_MULTIBULK and processMultibulkBuffer is called to parse the rest of the buffer. The parsed arguments form a redis command, and redis then executes it according to that command's rules.
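As a concrete illustration of what ends up in c->querybuf, the `set hello redis` command we typed earlier is sent by redis-cli in RESP multibulk form, an array of three bulk strings; the leading '*' is exactly what processInputBuffer keys on (shown here as a C string literal for readability):
/* "set hello redis" as it travels over the wire in RESP form */
const char *req = "*3\r\n$3\r\nset\r\n$5\r\nhello\r\n$5\r\nredis\r\n";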
Since commands have come up, processInputBuffer must eventually hand the parsed arguments to the command-processing entry point, processCommand:
int processCommand(client *c) {
//For the quit command, reply OK and mark the client to be closed after the reply is flushed
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
//Look up the command and execute it; on failure, reply to the client with an error message
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
if (!c->cmd) {
flagTransaction(c);
sds args = sdsempty();
int i;
for (i=1; i < c->argc && sdslen(args) < 128; i++)
args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
(char*)c->argv[0]->ptr, args);
sdsfree(args);
return C_OK;
} else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
(c->argc < -c->cmd->arity)) {
flagTransaction(c);
addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
c->cmd->name);
return C_OK;
}
......
}
Next, the addReply interface:
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK)
return;
if (sdsEncodedObject(obj)) {
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(c,obj);
} else if (obj->encoding == OBJ_ENCODING_INT) {
......
} else {
serverPanic("Wrong obj->encoding in addReply()");
}
}
And prepareClientToWrite:
int prepareClientToWrite(client *c) {
if (c->flags & (CLIENT_LUA|CLIENT_MODULE))
return C_OK;
if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP))
return C_ERR;
if ((c->flags & CLIENT_MASTER) &&
!(c->flags & CLIENT_MASTER_FORCE_REPLY))
return C_ERR;
if (c->fd <= 0)
return C_ERR;
if (!clientHasPendingReplies(c) &&
!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
/*
If the client does not carry the CLIENT_PENDING_WRITE flag yet and has no
pending replies, set the flag and add the client to redisServer's
clients_pending_write list; the actual write is deferred to beforeSleep
*/
c->flags |= CLIENT_PENDING_WRITE;
listAddNodeHead(server.clients_pending_write, c);
}
return C_OK;
}
And finally _addReplyToBuffer:
/*
The key step: append the result of the client's command to the
client's fixed-size output buffer (c->buf).
*/
int _addReplyToBuffer(client *c, const char *s, size_t len)
{
size_t available = sizeof(c->buf) - c->bufpos;
if (c->flags & CLIENT_CLOSE_AFTER_REPLY)
return C_OK;
/*
If the client's reply list already has entries, return C_ERR so the
caller appends this reply to the reply list instead, preserving order
*/
if (listLength(c->reply) > 0)
return C_ERR;
if (len > available)
return C_ERR;
memcpy(c->buf + c->bufpos, s, len);
c->bufpos += len;
return C_OK;
}
//When _addReplyToBuffer returns C_ERR, the reply is appended to the reply list instead
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
_addReplyObjectToList(c, obj);
The heart of redis 4.0 is this main loop:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop,
AE_ALL_EVENTS | AE_CALL_AFTER_SLEEP);
}
}
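aeMain itself does very little; the real work happens in aeProcessEvents, which blocks in aeApiPoll (a thin wrapper over epoll_wait on Linux) and then dispatches every fired fd to the rfileProc/wfileProc callbacks registered earlier through aeCreateFileEvent. A simplified sketch of that dispatch (not the verbatim redis code, which also handles timer events and write-before-read barriers):
int numevents = aeApiPoll(eventLoop, tvp); /* tvp = timeout until the next timer */
for (int j = 0; j < numevents; j++) {
    int fd = eventLoop->fired[j].fd;
    int mask = eventLoop->fired[j].mask;
    aeFileEvent *fe = &eventLoop->events[fd];
    if (fe->mask & mask & AE_READABLE)
        /* e.g. acceptTcpHandler or readQueryFromClient */
        fe->rfileProc(eventLoop, fd, fe->clientData, mask);
    if (fe->mask & mask & AE_WRITABLE)
        /* e.g. sendReplyToClient */
        fe->wfileProc(eventLoop, fd, fe->clientData, mask);
}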
On every iteration the loop first invokes the beforesleep callback. What does beforeSleep actually do? Here is its implementation:
void beforeSleep(struct aeEventLoop *eventLoop) {
UNUSED(eventLoop);
/* Handle writes with pending output buffers. */
handleClientsWithPendingWrites();
......
}
int handleClientsWithPendingWrites(void) {
listIter li;
listNode *ln;
int processed = listLength(server.clients_pending_write);
//Walk clients_pending_write: clients that have output waiting to be flushed
listRewind(server.clients_pending_write, &li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
//Clear the CLIENT_PENDING_WRITE flag
c->flags &= ~CLIENT_PENDING_WRITE;
listDelNode(server.clients_pending_write,ln);
//Write the pending output straight to the socket
if (writeToClient(c->fd, c, 0) == C_ERR)
continue;
//If the client still has replies left to send (the write did not flush everything)
if (clientHasPendingReplies(c)) {
int ae_flags = AE_WRITABLE;
if (server.aof_state == AOF_ON &&
server.aof_fsync == AOF_FSYNC_ALWAYS)
{
ae_flags |= AE_BARRIER;
}
/*
The TCP send window may be too small to take all the data, so some of it
could not be written; register a writable event for the client's fd on
the epoll instance and install sendReplyToClient as the write callback
*/
if (aeCreateFileEvent(server.el, c->fd, ae_flags,
sendReplyToClient, c) == AE_ERR)
{
freeClientAsync(c);
}
}
}
return processed;
}
//sendReplyToClient simply calls writeToClient as well
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
UNUSED(el);
UNUSED(mask);
writeToClient(fd,privdata,1);
}
To sum up: redis's communication model is a single thread driving one main event loop. One global redisServer object holds the linked lists that manage clients: the connected clients, the clients awaiting replies, and the clients with data still to be sent. epoll watches the listening socket and every accepted client socket for readable events. When a client sends a command, redisServer parses it, executes it, and queues the result for that client. If the TCP window is too small to flush the reply immediately, a writable event with the sendReplyToClient callback is registered for that client's fd, and the remaining data is sent once the socket becomes writable again. On top of that, at the start of every loop iteration the server walks the reply list and the clients_pending_write list and, client by client, flushes whatever data is still waiting to go out.
3. Hands-on verification
For a quick test on CentOS 7, start two redis-cli instances at the same time and have each of them send a command to redis-server, one after the other:
set hello world
Now look at redis-server's call stack and its threads:
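One way to do that is to interrupt the gdb session attached to redis-server (Ctrl+C) while both clients are connected, then inspect it, for example:
info threads
bt
Apart from the few background bio threads redis spawns for tasks like lazy freeing and AOF fsync, all client handling should show up on the single main thread, which bt shows parked in epoll_wait / aeProcessEvents between requests. Resume with continue when done.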
redis does not handle client requests with multiple concurrent threads; the single main loop simply iterates over the pending clients and replies to each of them in turn.