Redis 4.0 Communication Module Source Code Analysis (Part 1)

Published: 2025-02-10

[Overview] As a high-performance in-memory database, Redis is an unavoidable topic in server-side development interviews. If you want to sharpen your network programming skills, Redis is an excellent open-source project to analyze and study.

      I have wanted to analyze the Redis source code ever since graduating from university, but for various reasons never got around to it. Today I finally took the time to do a deep dive into Redis 4.0. As an efficient in-memory database that supports high concurrency, Redis might be expected to use a very complex network architecture, but it does not. Redis 4.0 is fast in part because it uses a single-threaded model (Redis 6.0 is no longer purely single-threaded), which avoids the overhead of thread switching and synchronization. In addition, all key-value pairs are stored in memory, Redis implements its own efficient memory management, and reads and writes go straight to memory with no disk I/O on the data path.

1. Preliminary Setup

    Run the following in a CentOS terminal:

wget http://download.redis.io/releases/redis-4.0.11.tar.gz
tar -zxvf redis-4.0.11.tar.gz
cd redis-4.0.11
make -j 5

     These commands download, unpack, and build the Redis source.

      Start redis-server under gdb:

gdb redis-server
(gdb) r


   In the Redis build directory, open another terminal and run the following to start redis-cli:

gdb redis-cli
(gdb) r
127.0.0.1:6379> set hello redis


    That completes the setup: we can now push key-value pairs into redis-server. Next, let's dig into the server-side source code.

2. Debugging the Source

      redis-server runs as a standalone process, and a standalone process must have an entry point, i.e. a main function. A global search of the Redis source shows that main lives in server.c.


int main(int argc, char **argv) {
    ......
    /* Initialize the server */
    initServer();
    /* Install the before/after-sleep callbacks */
    aeSetBeforeSleepProc(server.el, beforeSleep);
    aeSetAfterSleepProc(server.el, afterSleep);
    /* aeMain starts the event loop */
    aeMain(server.el);
    ......
    aeDeleteEventLoop(server.el);
    return 0;
}

   That is the main execution flow of main() in server.c: a single main thread that initializes the server, installs callbacks, and starts the event loop. Let's take it apart step by step, starting with initServer().

    Note: initServer() is full of details worth studying; they are exactly the details that are easy to overlook when writing server programs.

/* Global vars */
struct redisServer server; /* Server global state */

void setupSignalHandlers(void) {
    struct sigaction act;
    /* When the SA_SIGINFO flag is set in sa_flags then sa_sigaction is used.
     * Otherwise, sa_handler is used. */
    sigemptyset(&act.sa_mask);
    act.sa_flags = 0;
    act.sa_handler = sigShutdownHandler;
    sigaction(SIGTERM, &act, NULL);
    sigaction(SIGINT, &act, NULL);
    ......
    return;
}

void initServer(void) 
{
    int j;
    /*
      Ignore SIGHUP and SIGPIPE; either of these signals
      could otherwise kill the Redis process
    */
    signal(SIGHUP, SIG_IGN);
    signal(SIGPIPE, SIG_IGN);
    //install handlers for the remaining signals
    setupSignalHandlers();
    ......
    /*
      The global redisServer object lives as long as the process.
      It holds the event loop, the client lists, and other state.
    */
    server.pid = getpid();
    server.current_client = NULL;
    server.clients = listCreate();
    server.clients_to_close = listCreate();
    server.slaves = listCreate();
    server.monitors = listCreate();
    //clients_pending_write: connected clients with pending output but no write event registered yet
    server.clients_pending_write = listCreate();
    server.slaveseldb = -1; 
    server.unblocked_clients = listCreate();
    server.ready_keys = listCreate();
    //clients that have not yet been sent a reply
    server.clients_waiting_acks = listCreate();
    server.get_ack_from_slaves = 0;
    server.clients_paused = 0;
    server.system_memory_size = zmalloc_get_memory_size();
    createSharedObjects();
    adjustOpenFilesLimit();
     /*
      Allocate the event loop's internal arrays according to the
      configured limits, e.g. the aeFileEvent array that holds the
      read/write callbacks:
      typedef struct aeFileEvent {
          int mask; //readable, writable, ...
          aeFileProc *rfileProc;
          aeFileProc *wfileProc;
          void *clientData;
       } aeFileEvent;
     */
    //there is one global redisServer, and one eventLoop per redisServer
    server.el = aeCreateEventLoop(server.maxclients + CONFIG_FDSET_INCR);
    if (server.el == NULL) {
        serverLog(LL_WARNING,
            "Failed creating the event loop. Error message: '%s'",
            strerror(errno));
        exit(1);
    }
    server.db = zmalloc(sizeof(redisDb) * server.dbnum);
    //start listening
    if (server.port != 0 &&
        listenToPort(server.port,server.ipfd,&server.ipfd_count) == C_ERR)
        exit(1);
    
    if (server.unixsocket != NULL) {
        unlink(server.unixsocket); /* don't care if this fails */
        server.sofd = anetUnixServer(server.neterr,server.unixsocket,
            server.unixsocketperm, server.tcp_backlog);
        if (server.sofd == ANET_ERR) {
        serverLog(LL_WARNING, "Opening Unix socket: %s", server.neterr);
            exit(1);
        }
        //make the socket non-blocking
        anetNonBlock(NULL,server.sofd);
    }
    ......
    
    //create the Redis timer used to run periodic tasks (serverCron)
    if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create event loop timers.");
        exit(1);
    }
    
    /*
       1. Register acceptTcpHandler as the connection-established
       callback for the listening sockets, watching only readable
       events: a readable event on a listening socket means a new
       connection is ready to accept.
       2. This also binds the listening sockets to the I/O
       multiplexing backend.
    */
    for (j = 0; j < server.ipfd_count; j++) {
       if (aeCreateFileEvent(server.el,
            server.ipfd[j],
            AE_READABLE,
            acceptTcpHandler,
            NULL) == AE_ERR)
            {
              serverPanic("Unrecoverable error creating server.ipfd file event.");
            }
    }
    if (server.sofd > 0 && aeCreateFileEvent(server.el,
        server.sofd, AE_READABLE,
        acceptUnixHandler,NULL) == AE_ERR) serverPanic("Unrecoverable error creating server.sofd file event.");
    
    
    /* Register the read end of a pipe used to wake up an eventLoop blocked in epoll_wait */
    if (aeCreateFileEvent(server.el, 
         server.module_blocked_pipe[0], AE_READABLE,
         moduleBlockedClientPipeReadable, NULL) == AE_ERR) {
            serverPanic(
                "Error registering the readable event for the module "
                "blocked clients subsystem.");
    }
    
   ......
}

     Based on this main flow, let's dig one level deeper: how is the listening socket bound to the I/O multiplexing backend? Time to look at aeCreateFileEvent.

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
     aeFileProc *proc, void *clientData)
{
    if (fd >= eventLoop->setsize) {
        errno = ERANGE;
        return AE_ERR;
    }
    
    aeFileEvent *fe = &eventLoop->events[fd];
    if (aeApiAddEvent(eventLoop, fd, mask) == -1)
        return AE_ERR;
        
    fe->mask |= mask;
    if (mask & AE_READABLE) 
        fe->rfileProc = proc;
    if (mask & AE_WRITABLE) 
        fe->wfileProc = proc;
    fe->clientData = clientData;
    if (fd > eventLoop->maxfd)
        eventLoop->maxfd = fd;
    return AE_OK;
}

static int aeApiAddEvent(aeEventLoop *eventLoop,
    int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    int op = eventLoop->events[fd].mask == AE_NONE ?
                     EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    
    ee.events = 0;
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    if (mask & AE_READABLE)
        ee.events |= EPOLLIN;
    if (mask & AE_WRITABLE) 
        ee.events |= EPOLLOUT;
    
    ee.data.fd = fd;
    //this is where we see Redis uses the epoll model: bind the fd to epfd
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) 
        return -1;
    return 0;
}


      When epoll detects a readable event on the listening socket, the main loop returns from epoll_wait and, based on the event type, dispatches to the acceptTcpHandler callback we registered earlier.

void acceptTcpHandler(aeEventLoop *el, int fd, 
    void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    char cip[NET_IP_STR_LEN];
    UNUSED(el);
    UNUSED(mask);
    UNUSED(privdata);
    while(max--) {
        //call accept() to obtain a client socket
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        if (cfd == ANET_ERR) {
            if (errno != EWOULDBLOCK)
                serverLog(LL_WARNING,
               "Accepting client connection: %s", server.neterr);
            return;
        }
        serverLog(LL_VERBOSE,"Accepted %s:%d", cip, cport);
        //hand the new connection over to the common accept path
        acceptCommonHandler(cfd,0,cip);
    }
}

#define MAX_ACCEPTS_PER_CALL 1000
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    //create the client object
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
   ......
}

//wrap the client socket in a client object
client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    if (fd != -1) {
        //make the client socket non-blocking
        anetNonBlock(NULL,fd);
        //disable Nagle's algorithm
        anetEnableTcpNoDelay(NULL,fd);
        //enable TCP keepalive on the connection
        if (server.tcpkeepalive)
            anetKeepAlive(NULL,fd,server.tcpkeepalive);
        /*
          Bind the client socket to epfd and register
          readQueryFromClient as the readable-event callback
         */
        if (aeCreateFileEvent(server.el, fd, AE_READABLE,
            readQueryFromClient, c) == AE_ERR)
         {
            close(fd);
            zfree(c);
            return NULL;
        }
    }
      ......
}

    Now, when a client socket produces a readable event, the main loop dispatches to the callback associated with that socket. Let's look at readQueryFromClient.

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask) 
{
    client *c = (client*)privdata;
    int nread, readlen;
    size_t qblen;
    UNUSED(el);
    UNUSED(mask);
    
    readlen = PROTO_IOBUF_LEN;
    if (c->reqtype == PROTO_REQ_MULTIBULK && 
        c->multibulklen && c->bulklen != -1
        && c->bulklen >= PROTO_MBULK_BIG_ARG)
    {
        ssize_t remaining = (size_t)(c->bulklen+2) - sdslen(c->querybuf);
        if (remaining < readlen)
            readlen = remaining;
    }

    qblen = sdslen(c->querybuf);
    if (c->querybuf_peak < qblen) 
        c->querybuf_peak = qblen;

    c->querybuf = sdsMakeRoomFor(c->querybuf, readlen);
    nread = read(fd, c->querybuf + qblen, readlen);
    if (nread == -1) {
        if (errno == EAGAIN) {
            //no data available right now on the non-blocking socket
            return;
        } else {
            //a real error occurred
            serverLog(LL_VERBOSE, "Reading from client: %s",strerror(errno));
            freeClient(c);
            return;
        }
    } 
    else if (nread == 0) 
    {
        serverLog(LL_VERBOSE, "Client closed connection");
        freeClient(c);
        return;
    } 
    else if (c->flags & CLIENT_MASTER)
    {
        c->pending_querybuf = sdscatlen(c->pending_querybuf, 
            c->querybuf + qblen, nread);
    }

   ......

    if (!(c->flags & CLIENT_MASTER)) {
        processInputBuffer(c);
    } 
    else 
    {
        size_t prev_offset = c->reploff;
        processInputBuffer(c);
        size_t applied = c->reploff - prev_offset;
        if (applied) {
            replicationFeedSlavesFromMasterStream(server.slaves, c->pending_querybuf, applied);
            sdsrange(c->pending_querybuf, applied, -1);
        }
    }
}

    processInputBuffer checks whether the received bytes start with an asterisk (*). If they do, it sets the client's reqtype field to PROTO_REQ_MULTIBULK and calls processMultibulkBuffer to parse the rest of the buffer. The parsed bytes become a Redis command, which Redis then executes according to its rules.

    Since commands have come up, processInputBuffer must eventually reach the command-processing entry point, processCommand:

int processCommand(client *c) {
    //for the quit command, reply OK and close after the reply
    if (!strcasecmp(c->argv[0]->ptr,"quit")) {
        addReply(c,shared.ok);
        c->flags |= CLIENT_CLOSE_AFTER_REPLY;
        return C_ERR;
    }
    //look up and execute the command; on failure, reply with an error message
    c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
    if (!c->cmd) {
        flagTransaction(c);
        sds args = sdsempty();
        int i;
        for (i=1; i < c->argc && sdslen(args) < 128; i++)
            args = sdscatprintf(args, "`%.*s`, ", 128-(int)sdslen(args), (char*)c->argv[i]->ptr);
        addReplyErrorFormat(c,"unknown command `%s`, with args beginning with: %s",
            (char*)c->argv[0]->ptr, args);
        sdsfree(args);
        return C_OK;
    } else if ((c->cmd->arity > 0 && c->cmd->arity != c->argc) ||
               (c->argc < -c->cmd->arity)) {
        flagTransaction(c);
        addReplyErrorFormat(c,"wrong number of arguments for '%s' command",
            c->cmd->name);
        return C_OK;
    }
    ......
}

  Let's continue into addReply:

void addReply(client *c, robj *obj) {
    if (prepareClientToWrite(c) != C_OK) 
        return;
    
    if (sdsEncodedObject(obj)) {
        if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
            _addReplyObjectToList(c,obj);
    } else if (obj->encoding == OBJ_ENCODING_INT) {
       ......
    } else {
        serverPanic("Wrong obj->encoding in addReply()");
    }
}

    And prepareClientToWrite:

int prepareClientToWrite(client *c) {
    if (c->flags & (CLIENT_LUA|CLIENT_MODULE)) 
       return C_OK;
       
    if (c->flags & (CLIENT_REPLY_OFF|CLIENT_REPLY_SKIP)) 
        return C_ERR;
        
    if ((c->flags & CLIENT_MASTER) &&
        !(c->flags & CLIENT_MASTER_FORCE_REPLY)) 
        return C_ERR;
        
    if (c->fd <= 0) 
       return C_ERR; 
       
    if (!clientHasPendingReplies(c) &&
        !(c->flags & CLIENT_PENDING_WRITE) &&
        (c->replstate == REPL_STATE_NONE ||
         (c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
    {
        /*
          If the client does not yet have the CLIENT_PENDING_WRITE flag
          and has no buffered data waiting to be sent, set
          CLIENT_PENDING_WRITE and add the client to the server's
          clients_pending_write list.
        */
        c->flags |= CLIENT_PENDING_WRITE;
        listAddNodeHead(server.clients_pending_write, c);
    }
    return C_OK;
}

      And finally _addReplyToBuffer:

/*
  The key step: append the result of executing the client's command
  to the client's fixed output buffer.
*/
int _addReplyToBuffer(client *c, const char *s, size_t len) 
{
    size_t available = sizeof(c->buf) - c->bufpos;
    if (c->flags & CLIENT_CLOSE_AFTER_REPLY) 
        return C_OK;
    
    /*
      If the client's reply list is already non-empty, return C_ERR
      so the caller appends this reply to the list instead,
      preserving reply order
    */
    if (listLength(c->reply) > 0) 
         return C_ERR;
         
    if (len > available) 
        return C_ERR;
    
    memcpy(c->buf + c->bufpos, s, len);
    c->bufpos += len;
    return C_OK;
}

//if _addReplyToBuffer returns C_ERR, the reply goes to the reply list
if (_addReplyToBuffer(c,obj->ptr,sdslen(obj->ptr)) != C_OK)
     _addReplyObjectToList(c, obj);

   The heart of Redis 4.0 is this main loop:

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, 
           AE_ALL_EVENTS | AE_CALL_AFTER_SLEEP);
    }
}

     Every iteration runs the beforesleep callback first. What does beforeSleep actually do? Here is its implementation:

void beforeSleep(struct aeEventLoop *eventLoop) {
    UNUSED(eventLoop);
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWrites();
    ......
}

int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);
    //process the clients_pending_write list of clients with pending output
    listRewind(server.clients_pending_write, &li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);
        //clear the CLIENT_PENDING_WRITE flag
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write,ln);
        //write directly to the socket
        if (writeToClient(c->fd, c, 0) == C_ERR) 
            continue;
        
        //if this client still has pending replies to send
        if (clientHasPendingReplies(c)) {
            int ae_flags = AE_WRITABLE;
            if (server.aof_state == AOF_ON &&
                server.aof_fsync == AOF_FSYNC_ALWAYS)
            {
                ae_flags |= AE_BARRIER;
            }
            /*
              If the TCP window is too small, the data may not all go
              out, so register the client fd's writable event with the
              epoll model and install sendReplyToClient as the
              writable-event callback
            */
            if (aeCreateFileEvent(server.el, c->fd, ae_flags,
                sendReplyToClient, c) == AE_ERR)
            {
                 freeClientAsync(c);
            }
        }
    }
    return processed;
}

//sendReplyToClient also just calls writeToClient
void sendReplyToClient(aeEventLoop *el, int fd, void *privdata, int mask) {
    UNUSED(el);
    UNUSED(mask);
    writeToClient(fd,privdata,1);
}

     After all this analysis, the Redis communication model comes down to a single thread plus a main loop. A global redisServer object holds several lists that track the connected clients, the clients awaiting replies, and the clients with pending output. The epoll model watches the listening socket and the accepted client sockets for readable events. When a client sends a command, redis-server parses it, executes it, and replies with the result. If the TCP window is too small to send the reply, Redis registers a writable event on the client's fd with sendReplyToClient as the callback, and sendReplyToClient sends the data once the TCP window allows it. In addition, every loop iteration checks the pending-reply list and the clients_pending_write list and flushes any pending data to clients one by one.

3. Hands-on Verification

     Let's verify this on CentOS 7: start two redis-cli instances at the same time and have each send redis-server a command, one after the other:

set hello world

   Then inspect redis-server's stack and its threads:


    Redis does not handle client requests with concurrent threads: it loops over the pending clients and replies to each of them in turn.
