深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力( 四 )

aeProcessEvents 就是调用 epoll_wait 来发现事件 。当发现有某个 fd 上事件发生以后 , 则调为其事先注册的事件处理器函数 rfileProc 和 wfileProc 。
3.2 处理新连接请求我们假设现在有新用户连接到达了 。前面在我们看到 listen socket 上的 rfileProc 注册的是 acceptTcpHandler 。也就是说 , 如果有连接到达的时候 , 会回调到 acceptTcpHandler 。
在 acceptTcpHandler 中 , 主要做了几件事情

深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力

文章插图
 
  • 调用 accept 系统调用把用户连接给接收回来
  • 为这个新连接创建一个唯一 redisClient 对象
  • 将这个新连接添加到 epoll , 并注册一个读事件处理函数
接下来让我们看上面这三件事情都分别是如何被处理的 。
//file:src/networking.cvoid acceptTcpHandler(aeEventLoop *el, int fd, ...) {cfd = anetTcpAccept(server.neterr, fd, cip, ...);acceptCommonHandler(cfd,0);}在 anetTcpAccept 中执行非常的简单 , 就是调用 accept 把连接接收回来 。
//file: src/anet.cint anetTcpAccept(......) {anetGenericAccept(err,s,(struct sockaddr*)&sa,&salen)}static int anetGenericAccept(......) {fd = accept(s,sa,len)}接下来在 acceptCommonHandler 为这个新的客户端连接 socket , 创建一个 redisClient 对象 。
//file: src/networking.cstatic void acceptCommonHandler(int fd, int flags) {// 创建 redisClient 对象redisClient *c;c = createClient(fd);......}在 createClient 中 , 创建 client 对象 , 并且为该用户连接注册了读事件处理器 。
//file:src/networking.credisClient *createClient(int fd) {// 为用户连接创建 client 对象redisClient *c = zmalloc(sizeof(redisClient));if (fd != -1) {...// 为用户连接注册读事件处理器aeCreateFileEvent(server.el,fd,AE_READABLE,readQueryFromClient, c)}...}关于 aeCreateFileEvent 的处理过程这里就不赘述了 , 详情参见 2.3 节 。其效果就是将该用户连接 socket fd 对应的读处理函数设置为 readQueryFromClient, 并且设置私有数据为 redisClient c 。
3.3 处理客户连接上的可读事件现在假设该用户连接有命令到达了 , 就假设用户发送了GET XXXXXX_KEY 命令 。那么在 Redis 的时间循环中调用 epoll_wait 发现该连接上有读时间后 , 会调用在上一节中讨论的为其注册的读处理函数 readQueryFromClient 。
深度解析单线程的 Redis 如何做到每秒数万 QPS 的超高处理能力

文章插图
 
在读处理函数 readQueryFromClient 中主要做了这么几件事情 。
  • 解析并查找命令
  • 调用命令处理
  • 添加写任务到队列
  • 将输出写到缓存等待发送
我们来详细地看 readQueryFromClient 的代码 。在 readQueryFromClient 中会调用 processInputBuffer , 然后进入 processCommand 对命令进行处理 。其调用链如下:
//file: src/networking.cvoid readQueryFromClient(aeEventLoop *el, int fd, void *privdata, ...) {redisClient *c = (redisClient*) privdata;processInputBufferAndReplicate(c);}void processInputBufferAndReplicate(client *c) {...processInputBuffer(c);}// 处理客户端输入的命令内容void processInputBuffer(redisClient *c) {// 执行命令 , processCommand(c);}我们再来详细看 processCommand。
//file:int processCommand(redisClient *c) {// 查找命令 , 并进行命令合法性检查 , 以及命令参数个数检查c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);......// 处理命令// 如果是 MULTI 事务 , 则入队 , 否则调用 call 直接处理if (c->flags & CLIENT_MULTI && ...){queueMultiCommand(c);} else {call(c,CMD_CALL_FULL);...}return C_OK;}我们先忽略 queueMultiCommand , 直接看核心命令处理方法 call 。
//file:src/server.cvoid call(client *c, int flags) {// 查找处理命令 , struct redisCommand *real_cmd = c->cmd;// 调用命令处理函数c->cmd->proc(c);......}在 server.c 中定义了每一个命令对应的处理函数
//file:src/server.cstruct redisCommand redisCommandTable[] = {{"module",moduleCommand,-2,"as",0,NULL,0,0,0,0,0},{"get",getCommand,2,"rF",0,NULL,1,1,1,0,0},{"set",setCommand,-3,"wm",0,NULL,1,1,1,0,0},{"setnx",setnxCommand,3,"wmF",0,NULL,1,1,1,0,0},{"setex",setexCommand,4,"wm",0,NULL,1,1,1,0,0},......{"mget",mgetCommand,-2,"rF",0,NULL,1,-1,1,0,0},{"rpush",rpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},{"lpush",lpushCommand,-3,"wmF",0,NULL,1,1,1,0,0},{"rpushx",rpushxCommand,-3,"wmF",0,NULL,1,1,1,0,0},......}


推荐阅读