Redis网络部分相关的结构体2 和 绑定回调函数细节

发布于:2024-04-29 ⋅ 阅读:(27) ⋅ 点赞:(0)

目录

1. struct connection

ConnectionType属性

创建connection

2. struct client

3. 绑定客户端回调函数的流程

3.1. 读事件回调函数的设置

3.2. 写事件回调函数的设置

3.3. connSocketEventHandler函数

3.4. Redis5版本的设置回调函数

3.5. 个人的一些想法,修改源码

4. 总结设置客户端回调函数的流程

读事件回调函数的设置

写事件回调函数的设置流程

5. 回调函数的调用流程

客户端

服务器端 


本文章主要内容是下面两部分:

  • 两个结构体:connectionclient
  • 设置回调函数的注意点和函数connSocketEventHandler

1. struct connection

什么时候使用了connection?在创建一个客户端时候,会同时创建一个connection来绑定该fd。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        .............
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

//创建connection
connection *connCreateAcceptedSocket(int fd) {
    connection *conn = connCreateSocket();
    conn->fd = fd;
    conn->state = CONN_STATE_ACCEPTING;
    return conn;
}

该结构是一个完成的连接,客户端的fd封装成一个connection。

struct connection {
    ConnectionType *type;
    ConnectionState state;    //表示该客户端当前的连接状态
    short int flags;
    short int refs;        //该连接被引用的数量
    int last_errno;        //该连接的最终错误
    void *private_data;    //在网络这部分,可以认为是结构体client
    //一些对应的回调函数
    ConnectionCallbackFunc conn_handler;
    ConnectionCallbackFunc write_handler;
    ConnectionCallbackFunc read_handler;
    int fd;    //该客户端对应的fd
};

//回调函数的类型
typedef void (*ConnectionCallbackFunc)(struct connection *conn);

//connection的flags的值
#define CONN_FLAG_CLOSE_SCHEDULED   (1<<0)      /* Closed scheduled by a handler */
#define CONN_FLAG_WRITE_BARRIER     (1<<1)      /* Write barrier requested */
//一般是先执行读事件,之后再执行写事件,但是想要置换顺序的话,其flags置为CONN_FLAG_WRITE_BARRIER,就可以换顺序了

//表示客户端当前的连接状态
typedef enum {
    CONN_STATE_NONE = 0,
    CONN_STATE_CONNECTING,
    CONN_STATE_ACCEPTING,
    CONN_STATE_CONNECTED,
    CONN_STATE_CLOSED,
    CONN_STATE_ERROR
} ConnectionState;

ConnectionType属性

connection有个ConnectionType属性,这里是一堆接口(函数的第一个参数都是connection),而struct connection是操作对象。

那么该结构与ConnectionType配合使用。不同ConnectionType的connection就会有不同的接口。

typedef struct ConnectionType {
    void (*ae_handler)(struct aeEventLoop *el, int fd, void *clientData, int mask);
    int (*connect)(struct connection *conn, const char *addr, int port, const char *source_addr, ConnectionCallbackFunc connect_handler);
    int (*write)(struct connection *conn, const void *data, size_t data_len);
    int (*read)(struct connection *conn, void *buf, size_t buf_len);
    void (*close)(struct connection *conn);
    int (*accept)(struct connection *conn, ConnectionCallbackFunc accept_handler);
    int (*set_write_handler)(struct connection *conn, ConnectionCallbackFunc handler, int barrier);
    int (*set_read_handler)(struct connection *conn, ConnectionCallbackFunc handler);
    .........................
    int (*get_type)(struct connection *conn);
} ConnectionType;

//这里有个要点需要留意:所有的函数类型的参数都是struct connecton* 开头的,
//但是只有ae_handler类型不是,其参数没有connection的,其是在参数clientData位置上

为什么要有这个类型ConnectionType呢?是因为Redis中默认有两种类型的connection。感觉像是面向对象的,继承,不同类型的connection会有不同的方法。

Redis从版本6开始支持SSL / TLS,这是一项可选功能,需要在编译时启用。所以才弄了两种类型。

//要对这些接口有印象,后续就是使用这些接口的

ConnectionType CT_Socket = {
    //这些都是函数,比如把函数connSocketEventHandler赋值给ae_hander
    .ae_handler = connSocketEventHandler,    
    .close = connSocketClose,
    .write = connSocketWrite,
    .read = connSocketRead,
    .accept = connSocketAccept,
    .connect = connSocketConnect,
    .set_write_handler = connSocketSetWriteHandler,
    .set_read_handler = connSocketSetReadHandler,
  ............................................
    .get_type = connSocketGetType
};

//tls.c
#ifdef USE_OPENSSL
//需要定义了USE_OPENSSL,这个CT_TLS才会生效
ConnectionType CT_TLS = {
    .ae_handler = tlsEventHandler,
    .accept = connTLSAccept,
    .connect = connTLSConnect,
    .blocking_connect = connTLSBlockingConnect,
    .read = connTLSRead,
    .write = connTLSWrite,
    .close = connTLSClose,
    .set_write_handler = connTLSSetWriteHandler,
    .set_read_handler = connTLSSetReadHandler
  .........................................................
    .get_type = connTLSGetType
};

创建connection

那编译时候不使用TLS的,那创建的connection的type就是CT_Socket类型。从源码可知,所以后面我们就关注CT_Socket的接口就行。

connection *connCreateAcceptedSocket(int fd) {
    connection *conn = connCreateSocket();
    conn->fd = fd;    //设置对应的fd
    conn->state = CONN_STATE_ACCEPTING;    //设置状态
    return conn;
}

connection *connCreateSocket() {
    connection *conn = zcalloc(sizeof(connection));
    conn->type = &CT_Socket;    //这个重点,是CT_Socket类型
    conn->fd = -1;

    return conn;
}

//tls.c
static connection *createTLSConnection(int client_side) {
    SSL_CTX *ctx = redis_tls_ctx;
    if (client_side && redis_tls_client_ctx)
        ctx = redis_tls_client_ctx;
    tls_connection *conn = zcalloc(sizeof(tls_connection));
    conn->c.type = &CT_TLS;    //这个就是CT_TLS类型的
    conn->c.fd = -1;    
    conn->ssl = SSL_new(ctx);
    return (connection *) conn;
}

2. struct client

什么时候使用到client?还是从创建一个客户端acceptTcpHandler开始。

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    ........................
    acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
}
static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    ..............................
   client *c = createClient(conn);    //创建client
}
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        .........................
        connSetReadHandler(conn, readQueryFromClient);    //设置读事件回调函数
        connSetPrivateData(conn, c);    //把client变量c赋值给conection->privateData
    }
    //初始化client的一些变量
    ................
    if (conn) linkClient(c);    //把该客户端添加到服务器server.client链表中保存
}

Redis使用结构体client存储客户端连接的所有信息。这里面就包括了客户端对应的connection。

//redis5版本的是有fd,而redis6版本的用connection替代了fd
typedef struct client {
    uint64_t id;            /* Client incremental unique ID. */
    connection *conn;        //客户对应的connection
    int resp;               /* RESP protocol version. Can be 2 or 3. */
    redisDb *db;              //select命令选择的数据库对象
    
    //从客户端读取的数据存储的位置,即输入缓冲区
    sds querybuf;           /* Buffer we use to accumulate client queries. */
    size_t qb_pos;          /* The position we have read in querybuf. */
    // 命令和命令参数
    int argc;               /* Num of arguments of current command. */
    robj **argv;            /* Arguments of current command. */

    struct redisCommand *cmd;  //待执行的命令

    int reqtype;            /* Request protocol type: PROTO_REQ_* */

    int multibulklen;       /* Number of multi bulk arguments left to read. */
    long bulklen;           /* Length of bulk argument in multi bulk request. */
    //回复客户端数据的链表
    list *reply;            /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen;         /* Amount of bytes already sent in the current
                               buffer or object being sent. */

    uint64_t flags;         /* 客户端标识,Client flags: CLIENT_* macros. */

    ...............
    /* Response buffer */ //回复客户端数据的地方,即输出缓冲区,若是不够空间,就存放在reply中
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;

3. 绑定客户端回调函数的流程

3.1. 读事件回调函数的设置

void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    ..........
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        ........................
        acceptCommonHandler(connCreateAcceptedSocket(cfd),0,cip);
    }
}

static void acceptCommonHandler(connection *conn, int flags, char *ip) {
    ..............................
    /* Create connection and client */
   client *c = createClient(conn);
}
client *createClient(connection *conn) {
    client *c = zmalloc(sizeof(client));
    if (conn) {
        .........................
        connSetReadHandler(conn, readQueryFromClient);    //设置读事件回调函数
        connSetPrivateData(conn, c);    //把client变量c赋值给conection->privateData
    }
}

static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_read_handler(conn, func);
}

调用函数connSetReadHandler设置读事件回调函数。看到该函数的实现,可能会比较疑惑。所以这时就需要关联上面讲的ConnectionType属性,其是CT_Socket。所以我们查看到CT_Socket的set_read_handler是函数 connSocketSetReadHandler。

那么connSetReadHandler的实现就变成如下。那么其最终也是调用aeCreateFileEvent来创建一个FileEvent,并且绑定func给对应的fileProc

static inline int connSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    //return conn->type->set_read_handler(conn, func);
    //就是调用connSocketSetReadHandler
    return connSocketSetReadHandler(conn, func);
}

static int connSocketSetReadHandler(connection *conn, ConnectionCallbackFunc func) {
    if (func == conn->read_handler) return C_OK;

    conn->read_handler = func;
    if (!conn->read_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_READABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,
                    AE_READABLE,conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

int aeCreateFileEvent(aeEventLoop *eventLoop, int fd, int mask,
        aeFileProc *proc, void *clientData)
{
    aeFileEvent *fe = &eventLoop->events[fd];
    ....................
    fe->mask |= mask;
    if (mask & AE_READABLE) fe->rfileProc = proc;
    if (mask & AE_WRITABLE) fe->wfileProc = proc;
    fe->clientData = clientData;
}

那么又有疑惑了,不是说绑定func的吗?怎么函数aeCreateFileEvent中的参数是conn->type->ae_handler?我们先保留这个疑问,看完写事件回调函数的设置。

3.2. 写事件回调函数的设置

void beforeSleep(struct aeEventLoop *eventLoop) {
    .................
    /* Handle writes with pending output buffers. */
    handleClientsWithPendingWritesUsingThreads();
}
int handleClientsWithPendingWritesUsingThreads(void) {
    .....................
    listRewind(server.clients_pending_write,&li);
    while((ln = listNext(&li))) {
        client *c = listNodeValue(ln);

          //设置写事件回调函数
        if (clientHasPendingReplies(c) &&
                connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
        ..................
    }
}

static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
    return conn->type->set_write_handler(conn, func, 0);
}

conn->type->set_write_handler绑定的是connSocketSetWriteHandler。那么connSetWriteHandler的实现即是:

static inline int connSetWriteHandler(connection *conn, ConnectionCallbackFunc func) {
    //return conn->type->set_write_handler(conn, func, 0);
    return connSocketSetWriteHandler(conn, func, 0);
}

static int connSocketSetWriteHandler(connection *conn, ConnectionCallbackFunc func, int barrier) {
    if (func == conn->write_handler) return C_OK;

    conn->write_handler = func;
    if (barrier)
        conn->flags |= CONN_FLAG_WRITE_BARRIER;
    else
        conn->flags &= ~CONN_FLAG_WRITE_BARRIER;
    if (!conn->write_handler)
        aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
    else
        if (aeCreateFileEvent(server.el,conn->fd,AE_WRITABLE,
                    conn->type->ae_handler,conn) == AE_ERR) return C_ERR;
    return C_OK;
}

设置写事件回调函数的也是使用conn->type->ae_handler。

说明读写事件的回调函数的设置都统一是使用conn->type->ae_handler。ae_handler对应的是connSocketEventHandler

3.3. connSocketEventHandler函数

static void connSocketEventHandler(struct aeEventLoop *el, int fd, void *clientData, int mask)
{
    connection *conn = clientData;
    //创建connection时候,设置了state=CONN_STATE_ACCEPTING,所以这个判断不成立
    if (conn->state == CONN_STATE_CONNECTING && (mask & AE_WRITABLE) && conn->conn_handler) {
        int conn_error = connGetSocketError(conn);
        if (conn_error) {
            conn->last_errno = conn_error;
            conn->state = CONN_STATE_ERROR;
        } else {
            conn->state = CONN_STATE_CONNECTED;
        }
        if (!conn->write_handler) aeDeleteFileEvent(server.el,conn->fd,AE_WRITABLE);
        if (!callHandler(conn, conn->conn_handler)) return;
        conn->conn_handler = NULL;
    }

    //位全为1结果才是1,初始时候flags是0,所以&CONN_FLAG_WRITE_BARRIER后也是0
    //只有后续设置flags=CONN_FLAG_WRITE_BARRIER后,再&结果才是1
    int invert = conn->flags & CONN_FLAG_WRITE_BARRIER;

    int call_write = (mask & AE_WRITABLE) && conn->write_handler;
    int call_read = (mask & AE_READABLE) && conn->read_handler;
    
    //执行对应的回调函数
    /* Handle normal I/O flows */
    if (!invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
    /* Fire the writable event. */
    if (call_write) {
        if (!callHandler(conn, conn->write_handler)) return;
    }
    /* If we have to invert the call, fire the readable event now
     * after the writable one. */
    if (invert && call_read) {
        if (!callHandler(conn, conn->read_handler)) return;
    }
}

static inline int callHandler(connection *conn, ConnectionCallbackFunc handler) {
    connIncrRefs(conn);    //增加refs值以保护连接
    if (handler) handler(conn);    //这里就是执行回调函数,即是执行readQueryFromClient等
    connDecrRefs(conn);    //回调函数执行后,refs--
    if (conn->flags & CONN_FLAG_CLOSE_SCHEDULED) {
        if (!connHasRefs(conn)) connClose(conn);    //如果refs==0,执行延迟关闭
        return 0;
    }
    return 1;
}

static inline void connIncrRefs(connection *conn) {
    conn->refs++;
}

到这里终于知道设置conn->type->ae_handler作为回调函数的原因了。

前面的设置读写回调时候,把readQueryFromClient绑定给conn->read_handler把sendReplyToClient绑定给conn->write_handler

所以,connSocketEventHandler函数中既有读事件回调函数,也有写事件回调函数。所以,我们可以这样认为,connSocketEventHandler是connection的处理中心。在主框架的epoll中并不会直接调用客户端的读写回调函数,而是统一调用connSocketEventHandler这样一来相当于是框架与connection解耦了。

3.4. Redis5版本的设置回调函数

我回头查看了Redis5.0.10版本的,发现,其是直接设置readQueryFromClient作为回调函数这个版本也是没有结构体connection,其是直接使用client的。

//Reids5.0.10版本
void acceptTcpHandler(aeEventLoop *el, int fd, void *privdata, int mask) {
    int cport, cfd, max = MAX_ACCEPTS_PER_CALL;
    while(max--) {
        cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
        ...........
        acceptCommonHandler(cfd,0,cip);
    }
}

static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c = createClient(fd)
    .................
}

client *createClient(int fd) {
    client *c = zmalloc(sizeof(client));
    if (fd != -1) {
       aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c) 
       ......................
    }
}

Redis6后,开始支持SSL / TLS,添加了conneciton,这个connection就是有两种类型。所以才这样弄吧。

3.5. 个人的一些想法,修改源码

当事件就绪时,那都是执行connSocketEventHandler,而其都有读/写事件的回调函数。

我认为,那结构体aeFileEvent可以只拥有一个aeFileProc即可。

typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *rfileProc;
    aeFileProc *wfileProc;
    void *clientData;
} aeFileEvent;

//可以改写成如下
typedef struct aeFileEvent {
    int mask; /* one of AE_(READABLE|WRITABLE|BARRIER) */
    aeFileProc *fileProc;    //只用一个回调函数就行
    void *clientData;
} aeFileEvent;

而在函数aeProcessEvents中不再需要判别是读事件还是写事件了。可以改写成如下:

//展示主体,主要是修改了for循环内部,不管是哪种类型,统一是使用fe->fileProc(....)
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    ...............................
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        struct timeval tv, *tvp;
      
        if (eventLoop->beforesleep != NULL && flags & AE_CALL_BEFORE_SLEEP)
            eventLoop->beforesleep(eventLoop);

        /* Call the multiplexing API, will return only on timeout or when
         * some event fires. */
        numevents = aeApiPoll(eventLoop, tvp);
        for (int j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;

            if (fe->mask)    //表示该fd是有关注的事件类型的,就可以执行对应的读或写
                //该函数就是调用connSocketEventHandler  
                fe->fileProc(eventLoop,fd,fe->clientData,mask);
                   
        }
    }
}

可以这样写的原因,是因为connSocketEventHandler中有写回调和读回调,只要传事件类型进去就知道是使用读回调还是写回调。这种写法就更加统一了。 

那为什么Redis作者不这样做呢?是为了兼容之前版本的,或者是我漏了什么细节是不能这样操作的呢?若有见解,欢迎在评论区讨论指出。

4. 总结设置客户端回调函数的流程

读事件回调函数的设置

创建连接时候,需要设置客户端的读回调。

createClient--->connSetReadHandler--->(conn->type->set_read_handler)--->(connSocketReadHandler,其内部把读回调函数readQueryFromClient赋值给read_handler)--->(aeCreateFileEvent,把conn->type->ae_handler赋值给rfileProc)。

写事件回调函数的设置流程

单线程的情况:

aeProcessEvents--->beforesleep--->handleClientsWithPendingWritesUsingThreads--->handleClientsWithPendingWrites--->connSetWriteHandlerWithBarrier--->(connSocketSetWriteHandler,把sendReplyToClient赋值给write_handler)--->(aeCreateFileEvent,把conn->type->ae_handler赋值给wfileProc)。

5. 回调函数的调用流程

客户端

当事件就绪(假设是读事件),就会执行fe->rfileProc函数,那该函数就是执行connSocketEventHandler,接着其内部会调用callHandler函数,callHandler就会调用conn->read_handler。

其实不管是读事件还是写事件,都是执行connSocketEventHandler

即是aeProcessEvents--->rfileProc--->connSocketEventHandler--->callHandler--->(conn->read_handler,即是readQueryFromClient)。

服务器端 

aeProcessEvents--->rfileProc--->acceptTcpHandler。

对比:

服务器端的是直接调用回调函数acceptTcpHandler。

而客户端的是调用connSocketEventHandler再在connSocketEventHandler内部判断若是读事件,才执行回调函数readQueryFromClient


网站公告

今日签到

点亮在社区的每一天
去签到