structredisServerserver;/* Server global state, server.c */ structredisServer {// server.h dict *pubsub_channels; /* Map channels to list of subscribed clients */ };
/* Handler for the subscribe command: subscribes the client to every channel
 * passed in argv[1] .. argv[argc-1], then sets CLIENT_PUBSUB on the client. */
void subscribeCommand(client *c) {
    int j;

    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;
}
/* Subscribe a client to a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that channel.
 *
 * The subscription is recorded in two places: the client's own
 * c->pubsub_channels dictionary and the server-wide
 * server.pubsub_channels dictionary (channel -> list of clients).
 * The client is notified in both cases. */
int pubsubSubscribeChannel(client *c, robj *channel) {
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table. dictAdd() only
     * returns DICT_OK here when this is a new subscription. */
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        de = dictFind(server.pubsub_channels,channel);
        if (de == NULL) {
            /* The channel has no subscribers yet: create a new list and
             * register it in the dictionary under the channel key. The
             * client is appended to the list below. */
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
        } else {
            /* The channel already has one or more subscribers: reuse the
             * existing list. */
            clients = dictGetVal(de);
        }
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReplyPubsubSubscribed(c,channel);
    return retval;
}
/* Handler for the unsubscribe command. Clears CLIENT_PUBSUB once the client
 * has no subscriptions left. */
void unsubscribeCommand(client *c) {
    if (c->argc == 1) {
        /* No arguments given: unsubscribe from all channels. */
        pubsubUnsubscribeAllChannels(c,1);
    } else {
        int j;

        /* Remove the subscription between this client and each channel
         * named on the command line. */
        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribeChannel(c,c->argv[j],1);
    }
    if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
/* Unsubscribe a client from a channel. Returns 1 if the operation succeeded, or
 * 0 if the client was not subscribed to the specified channel.
 *
 * When 'notify' is non-zero the client receives an unsubscribe reply
 * regardless of the return value. */
int pubsubUnsubscribeChannel(client *c, robj *channel, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    int retval = 0;

    /* Remove the channel from the client -> channels hash table.
     * An extra reference is taken first so that 'channel' stays usable until
     * the matching decrRefCount() at the end of the function. */
    incrRefCount(channel);
    if (dictDelete(c->pubsub_channels,channel) == DICT_OK) {
        retval = 1;
        /* Remove the client from the channel -> clients list hash table */
        de = dictFind(server.pubsub_channels,channel);
        serverAssertWithInfo(c,NULL,de != NULL);
        /* 1. Look up the channel's list of clients in the dictionary. */
        clients = dictGetVal(de);
        /* 2. Find client c inside that list. */
        ln = listSearchKey(clients,c);
        serverAssertWithInfo(c,NULL,ln != NULL);
        /* 3. Remove the client from the list. */
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            /* 4. This client was the channel's only subscriber: also drop
             * the channel key from the pubsub_channels dictionary. */
            dictDelete(server.pubsub_channels,channel);
        }
    }
    /* Notify the client */
    if (notify) addReplyPubsubUnsubscribed(c,channel);
    decrRefCount(channel); /* it is finally safe to release it */
    return retval;
}
structredisServerserver;/* Server global state, server.c */ structredisServer { list *pubsub_patterns; /* A list of pubsub_patterns */ dict *pubsub_patterns_dict; /* A dict of pubsub_patterns */ };
/* Subscribe a client to a pattern. Returns 1 if the operation succeeded, or
 * 0 if the client was already subscribed to that pattern.
 *
 * A new subscription is recorded in three places: the client's own
 * c->pubsub_patterns list, the server.pubsub_patterns list (as a
 * pubsubPattern entry) and the server.pubsub_patterns_dict dictionary
 * (pattern -> list of clients). The client is notified in both cases. */
int pubsubSubscribePattern(client *c, robj *pattern) {
    dictEntry *de;
    list *clients;
    int retval = 0;

    if (listSearchKey(c->pubsub_patterns,pattern) == NULL) {
        retval = 1;
        pubsubPattern *pat;
        listAddNodeTail(c->pubsub_patterns,pattern);
        incrRefCount(pattern);
        /* 1. Build a pubsubPattern structure. */
        pat = zmalloc(sizeof(*pat));
        /* 2. Its pattern member is the subscribed pattern. */
        pat->pattern = getDecodedObject(pattern);
        /* 3. Its client member is the subscribing client. */
        pat->client = c;
        /* 4. Append the structure to the tail of the pubsub_patterns list. */
        listAddNodeTail(server.pubsub_patterns,pat);
        /* Add the client to the pattern -> list of clients hash table */
        de = dictFind(server.pubsub_patterns_dict,pattern);
        if (de == NULL) {
            /* Not found: no client has subscribed to this pattern yet, so
             * create a key for it whose value is a new, empty list. */
            clients = listCreate();
            dictAdd(server.pubsub_patterns_dict,pattern,clients);
            incrRefCount(pattern);
        } else {
            clients = dictGetVal(de);
        }
        /* Append client c to the pattern's list of clients. */
        listAddNodeTail(clients,c);
    }
    /* Notify the client */
    addReplyPubsubPatSubscribed(c,pattern);
    return retval;
}
/* Handler for the punsubscribe command. With no arguments every pattern
 * subscription of the client is removed; otherwise only the patterns named in
 * argv[1] .. argv[argc-1]. Clears CLIENT_PUBSUB once the client has no
 * subscriptions left. */
void punsubscribeCommand(client *c) {
    if (c->argc == 1) {
        pubsubUnsubscribeAllPatterns(c,1);
    } else {
        int j;

        for (j = 1; j < c->argc; j++)
            pubsubUnsubscribePattern(c,c->argv[j],1);
    }
    if (clientSubscriptionsCount(c) == 0) c->flags &= ~CLIENT_PUBSUB;
}
/* Unsubscribe a client from a pattern. Returns 1 if the client was subscribed
 * to the pattern and the subscription was removed, 0 otherwise.
 *
 * When 'notify' is non-zero the client receives a punsubscribe reply
 * regardless of the return value. */
int pubsubUnsubscribePattern(client *c, robj *pattern, int notify) {
    dictEntry *de;
    list *clients;
    listNode *ln;
    pubsubPattern pat;
    int retval = 0;

    incrRefCount(pattern); /* Protect the object. May be the same we remove */
    if ((ln = listSearchKey(c->pubsub_patterns,pattern)) != NULL) {
        retval = 1;
        listDelNode(c->pubsub_patterns,ln);
        /* Find this (client, pattern) pair in the server-wide
         * pubsub_patterns list and delete it. */
        pat.client = c;
        pat.pattern = pattern;
        ln = listSearchKey(server.pubsub_patterns,&pat);
        /* listSearchKey() reports a miss with NULL (see the check above), so
         * verify the node exists before deleting it, as the other lookups
         * in this function do. */
        serverAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(server.pubsub_patterns,ln);
        /* Remove the client from the pattern -> clients list hash table */
        de = dictFind(server.pubsub_patterns_dict,pattern);
        serverAssertWithInfo(c,NULL,de != NULL);
        clients = dictGetVal(de);
        /* Locate the client's node in the list stored in
         * pubsub_patterns_dict and delete it. */
        ln = listSearchKey(clients,c);
        serverAssertWithInfo(c,NULL,ln != NULL);
        listDelNode(clients,ln);
        if (listLength(clients) == 0) {
            /* Free the list and associated hash entry at all if this was
             * the latest client. */
            dictDelete(server.pubsub_patterns_dict,pattern);
        }
    }
    /* Notify the client */
    if (notify) addReplyPubsubPatUnsubscribed(c,pattern);
    decrRefCount(pattern);
    return retval;
}
(gdb) bt
#0 0x00007ffff7e5d5b0 in __libc_recv (fd=3, buf=0x7fffffffa200, len=16384, flags=flags@entry=0) at ../sysdeps/unix/sysv/linux/recv.c:28
#1 0x000055555558ec0b in recv(__flags=0, __n=<optimized out>, __buf=<optimized out>, __fd=<optimized out>) at /usr/include/x86_64-linux-gnu/bits/socket2.h:44
#2 redisNetRead(c=0x555555841300, buf=<optimized out>, bufcap=<optimized out>) at net.c:61
#3 0x0000555555587702 in redisBufferRead(c=0x555555841300) at hiredis.c:881
#4 0x0000555555587a92 in redisGetReply(c=0x555555841300, reply=0x7fffffffe280) at hiredis.c:954
#5 0x000055555557fadd in cliReadReply(output_raw_strings=0) at redis-cli.c:1204
#6 0x0000555555581e0d in cliSendCommand(argc=2, argv=0x7ffff780a000, repeat=0) at redis-cli.c:1361
#7 0x0000555555582006 in issueCommandRepeat(argc=2, argv=0x7ffff780a000, repeat=1) at redis-cli.c:1858
#8 0x000055555556dd8d in issueCommand(argv=0x7ffff780a000, argc=<optimized out>) at redis-cli.c:2090
#9 noninteractive(argv=0x7ffff780a000, argc=<optimized out>) at redis-cli.c:2090
#10 main(argc=<optimized out>, argv=<optimized out>) at redis-cli.c:8251
(gdb) bt
// 1、首先通过I/O多路复用器监听这个客户端套接字的可读事件,触发命令请求处理的回调函数readQueryFromClient
#0 connSocketRead (conn=0x7fcbeae150c0, buf=0x7fcbeae7cfc5, buf_len=16384) at connection.c:182
#1 0x000056041195df03 in connRead(buf_len=<optimized out>, buf=<optimized out>, conn=<optimized out>) at connection.h:152
#2 readQueryFromClient(conn=0x7fcbeae150c0) at networking.c:2026
#3 0x00005604119e3e3c in callHandler(handler=<optimized out>, conn=0x7fcbeae150c0) at connhelpers.h:79
#4 connSocketEventHandler(el=<optimized out>, fd=<optimized out>, clientData=0x7fcbeae150c0, mask=<optimized out>) at connection.c:296
#5 0x0000560411942723 in aeProcessEvents(eventLoop=eventLoop@entry=0x7fcbeae0b480, flags=flags@entry=27) at ae.c:479
#6 0x0000560411942a5d in aeMain(eventLoop=0x7fcbeae0b480) at ae.c:539
#7 0x000056041193eed8 in main(argc=<optimized out>, argv=0x7ffc77472d58) at server.c:5498
(gdb) n
int ret = read(conn->fd, buf, buf_len); // 2、调用read()读套接字, 发现返回值ret为0,从而感知到对端已关闭连接。
(gdb) n
if(!ret) {
(gdb) p ret
$2 = 0
/* Schedule a client to be closed later by appending it to
 * server.clients_to_close, instead of freeing it immediately.
 *
 * Does nothing if the client is already flagged CLIENT_CLOSE_ASAP or is
 * flagged CLIENT_LUA. Otherwise CLIENT_CLOSE_ASAP is set before the client
 * is queued. */
void freeClientAsync(client *c) {
    if (c->flags & CLIENT_CLOSE_ASAP || c->flags & CLIENT_LUA) return;
    c->flags |= CLIENT_CLOSE_ASAP;
    if (server.io_threads_num == 1) {
        /* no need to bother with locking if there's just one thread (the main thread) */
        listAddNodeTail(server.clients_to_close,c);
        return;
    }
    /* More than one thread configured: serialize appends to the shared list
     * with a function-local static mutex. */
    static pthread_mutex_t async_free_queue_mutex = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_lock(&async_free_queue_mutex);
    listAddNodeTail(server.clients_to_close,c);
    pthread_mutex_unlock(&async_free_queue_mutex);
}
(gdb) bt
#0 dictGenericDelete (d=0x7fcbeae0d2e0, key=key@entry=0x7fcbeae0e360, nofree=nofree@entry=0) at dict.c:393
#1 0x0000560411944bdf in dictDelete(ht=<optimized out>, key=key@entry=0x7fcbeae0e360) at dict.c:406
#2 0x000056041198e2bf in pubsubUnsubscribeChannel(c=c@entry=0x7fcbeaf4d700, channel=0x7fcbeae0e360, notify=notify@entry=0) at pubsub.c:198
#3 0x000056041198e5bd in pubsubUnsubscribeAllChannels(c=c@entry=0x7fcbeaf4d700, notify=notify@entry=0) at pubsub.c:284
#4 0x0000560411957cf7 in freeClient(c=0x7fcbeaf4d700) at networking.c:1251
#5 0x00005604119584fd in freeClientsInAsyncFreeQueue() at networking.c:1345
#6 0x000056041194634b in beforeSleep(eventLoop=<optimized out>) at server.c:2204
#7 beforeSleep(eventLoop=<optimized out>) at server.c:2117
#8 0x00005604119425e8 in aeProcessEvents(eventLoop=eventLoop@entry=0x7fcbeae0b480, flags=flags@entry=27) at ae.c:443
#9 0x0000560411942a5d in aeMain(eventLoop=0x7fcbeae0b480) at ae.c:539
#10 0x000056041193eed8 in main(argc=<optimized out>, argv=0x7ffc77472d58) at server.c:5498
(gdb) bt
#0 closeListeningSockets (unlink_unix_socket=1) at server.c:3809
#1 0x0000562ee4b31992 in prepareForShutdown(flags=<optimized out>, flags@entry=0) at server.c:3916
#2 0x0000562ee4b4bbdb in shutdownCommand(c=0x7ff3c594dd80) at db.c:1061
#3 0x0000562ee4b30701 in call(c=0x7ff3c594dd80, flags=15) at server.c:3368
#4 0x0000562ee4b311c6 in processCommand(c=c@entry=0x7ff3c594dd80) at server.c:3797
#5 0x0000562ee4b3fca4 in processCommandAndResetClient(c=c@entry=0x7ff3c594dd80) at networking.c:1895
#6 0x0000562ee4b448fa in processInputBuffer(c=0x7ff3c594dd80) at networking.c:1978
#7 0x0000562ee4bcae3c in callHandler(handler=<optimized out>, conn=0x7ff3c58150c0) at connhelpers.h:79
#8 connSocketEventHandler(el=<optimized out>, fd=<optimized out>, clientData=0x7ff3c58150c0, mask=<optimized out>) at connection.c:296
#9 0x0000562ee4b29723 in aeProcessEvents(eventLoop=eventLoop@entry=0x7ff3c580b480, flags=flags@entry=27) at ae.c:479
#10 0x0000562ee4b29a5d in aeMain(eventLoop=0x7ff3c580b480) at ae.c:539
#11 0x0000562ee4b25ed8 in main(argc=<optimized out>, argv=0x7ffe9a269bf8) at server.c:5498
(gdb) bt
#0 0x00007ffff7e5d5b0 in __libc_recv (fd=3, buf=0x7fffffffa200, len=16384, flags=flags@entry=0) at ../sysdeps/unix/sysv/linux/recv.c:28
#1 0x000055555558ec0b in recv (__flags=0, __n=<optimized out>, __buf=<optimized out>, __fd=<optimized out>) at /usr/include/x86_64-linux-gnu/bits/socket2.h:44
#2 redisNetRead (c=0x555555841300, buf=<optimized out>, bufcap=<optimized out>) at net.c:61
#3 0x0000555555587702 in redisBufferRead (c=0x555555841300) at hiredis.c:881
#4 0x0000555555587a92 in redisGetReply (c=0x555555841300, reply=0x7fffffffe280) at hiredis.c:954
#5 0x000055555557fadd in cliReadReply (output_raw_strings=0) at redis-cli.c:1204
#6 0x0000555555581e0d in cliSendCommand (argc=2, argv=0x7ffff780a000, repeat=0) at redis-cli.c:1361
#7 0x0000555555582006 in issueCommandRepeat (argc=2, argv=0x7ffff780a000, repeat=1) at redis-cli.c:1858
#8 0x000055555556dd8d in issueCommand (argv=0x7ffff780a000, argc=<optimized out>) at redis-cli.c:2090
#9 noninteractive (argv=0x7ffff780a000, argc=<optimized out>) at redis-cli.c:2090
#10 main (argc=<optimized out>, argv=<optimized out>) at redis-cli.c:8251