diff --git a/src/blocked.c b/src/blocked.c index 83437dcf144..c8b5ea532a9 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -105,7 +105,7 @@ void freeClientBlockingState(client *c) { * and will be processed when the client is unblocked. */ void blockClient(client *c, int btype) { /* Replicated clients should never be blocked unless pause or module */ - serverAssert(!(isReplicatedClient(c) && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE)); + serverAssert(!(isReplicatedClient(c) && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE && btype != BLOCKED_ASYNC)); initClientBlockingState(c); @@ -228,6 +228,10 @@ void unblockClient(client *c, int queue_for_reprocessing) { c->bstate->postponed_list_node = NULL; } else if (c->bstate->btype == BLOCKED_SHUTDOWN) { /* No special cleanup. */ + } else if (c->bstate->btype == BLOCKED_ASYNC) { + /* Drop any queued deferred unblock so blockedBeforeSleep never touches a stale client pointer. */ + listNode *ln = listSearchKey(server.clients_pending_async_unblock, c); + if (ln) listDelNode(server.clients_pending_async_unblock, ln); } else { serverPanic("Unknown btype in unblockClient()."); } @@ -292,6 +296,11 @@ void replyToBlockedClientTimedOut(client *c) { } } else if (c->bstate->btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c, 0); + } else if (c->bstate->btype == BLOCKED_ASYNC) { + /* Timeout: the operation didn't complete in time. The handle is + * still out there but lookupClientByID will return NULL after + * the client is freed. */ + addReplyError(c, "Operation timed out"); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } @@ -693,6 +702,55 @@ void blockClientShutdown(client *c) { blockClient(c, BLOCKED_SHUTDOWN); } +/* -------------------------------------------------------------------------- + * BLOCKED_ASYNC: async operation handle + * + * An async handle wraps a client ID so that a completion callback can + * safely look up the client even if it disconnected while waiting. 
+ * -------------------------------------------------------------------------- */ + +typedef struct blockedAsyncHandle { + uint64_t client_id; +} blockedAsyncHandle; + +/* Block c as BLOCKED_ASYNC and return a heap handle recording its client ID; consume it to free it. */ +blockedAsyncHandle *blockClientAsync(client *c) { + blockedAsyncHandle *h = zmalloc(sizeof(*h)); + h->client_id = c->id; + blockClient(c, BLOCKED_ASYNC); + return h; +} + +/* Free the handle and return its client, or NULL when the client is gone or no longer blocked in + * BLOCKED_ASYNC (e.g. it already timed out), so a late callback never replies or unblocks twice. */ +client *consumeBlockedClientAsyncHandle(blockedAsyncHandle *handle) { + client *c = lookupClientByID(handle->client_id); + zfree(handle); + return (c && c->flag.blocked && c->bstate->btype == BLOCKED_ASYNC) ? c : NULL; +} + +/* Complete a BLOCKED_ASYNC operation. The caller must have already added + * the reply (addReply / addReplyError) before calling this. + * Synchronous path (inside call()): clears blocked state immediately so + * call() does normal post-processing. + * Async path (timer, read handler, etc.): defers the actual unblock to + * blockedBeforeSleep, matching the pattern used by all other block types. */ +void unblockClientAsync(client *c) { + serverAssert(c->flag.blocked && c->bstate->btype == BLOCKED_ASYNC); + + if (c->flag.executing_command && server.current_client == c) { + /* Synchronous completion (still inside call()): clear the blocked state in place. */ + c->flag.blocked = 0; + c->bstate->btype = BLOCKED_NONE; + removeClientFromTimeoutTable(c); + server.blocked_clients--; + server.blocked_clients_by_type[BLOCKED_ASYNC]--; + } else { + /* Async completion — defer to blockedBeforeSleep. */ + listAddNodeTail(server.clients_pending_async_unblock, c); + } +} + /* Unblock a client once a specific key became available for it. * This function will remove the client from the list of clients blocked on this key * and also remove the key from the dictionary of keys this client is blocked on. @@ -807,6 +865,20 @@ void blockedBeforeSleep(void) { * blocking commands. */ if (moduleCount()) moduleHandleBlockedClients(); + /* Unblock clients pending async completion (e.g. Raft commits). 
*/ + while (listLength(server.clients_pending_async_unblock)) { + listNode *ln = listFirst(server.clients_pending_async_unblock); + client *c = ln->value; + listDelNode(server.clients_pending_async_unblock, ln); + commitDeferredReplyBuffer(c, 0); + updateStatsOnUnblock(c, 0, 0, 0); + unblockClient(c, 1); + if (clientHasPendingReplies(c) && !c->flag.pending_write && c->conn) { + c->flag.pending_write = 1; + listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node); + } + } + /* Try to process pending commands for clients that were just unblocked. */ if (listLength(server.unblocked_clients)) processUnblockedClients(); } diff --git a/src/cluster.c b/src/cluster.c index 351372a2bbc..a6521a4a4e5 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1210,20 +1210,24 @@ void clusterKeySlotCommand(client *c) { * Called after the slot change is applied. This may be called inline * or asynchronously if the change goes through cluster consensus. */ static void clusterAddDelSlotsCallback(void *ctx, const char *error) { - client *c = (client *)ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); - return; + } else { + addReply(c, shared.ok); } - addReply(c, shared.ok); + unblockClientAsync(c); } /* Callback for CLUSTER SETSLOT NODE after the slot change is applied. * Handles replica migration if this shard lost its last slot. */ static void clusterSetSlotNodeCallback(void *ctx, const char *error) { - client *c = (client *)ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); + unblockClientAsync(c); return; } @@ -1254,6 +1258,7 @@ static void clusterSetSlotNodeCallback(void *ctx, const char *error) { } addReply(c, shared.ok); + unblockClientAsync(c); } /* Validate slot assignments for ADDSLOTS/DELSLOTS commands. 
@@ -1510,7 +1515,8 @@ void clusterCommandSetSlot(client *c) { } slotRange range = {slot, slot}; - clusterSlotChange(&range, 1, n, c, clusterSetSlotNodeCallback); + void *h = blockClientAsync(c); + clusterSlotChange(&range, 1, n, h, clusterSetSlotNodeCallback); return; } @@ -1523,36 +1529,44 @@ void clusterCommandSetSlot(client *c) { * post-action work is done and an OK reply is sent. */ static void clusterCommandMeetCompletion(void *ctx, const char *error) { - client *c = ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); } else { addReply(c, shared.ok); } + unblockClientAsync(c); } static void clusterCommandForgetCompletion(void *ctx, const char *error) { - client *c = ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); } else { addReply(c, shared.ok); } + unblockClientAsync(c); } static void clusterCommandReplicateCompletion(void *ctx, const char *error) { - client *c = ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); } else { addReply(c, shared.ok); } + unblockClientAsync(c); } static void clusterCommandPromoteCompletion(void *ctx, const char *error) { - client *c = ctx; + /* c is NULL if the client went away: the promotion is still applied, only the reply is skipped. */ + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); if (error) { - addReplyError(c, error); + if (c) addReplyError(c, error); + if (c) unblockClientAsync(c); return; } flushAllDataAndResetRDB(server.repl_replica_lazy_flush ? 
EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS); @@ -1560,15 +1574,18 @@ static void clusterCommandPromoteCompletion(void *ctx, const char *error) { clusterCloseAllSlots(); clusterCancelManualFailover(); - addReply(c, shared.ok); + if (c) addReply(c, shared.ok); + if (c) unblockClientAsync(c); } static void clusterCommandFailoverCompletion(void *ctx, const char *error) { - client *c = ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); } else { addReply(c, shared.ok); } + unblockClientAsync(c); } void clusterCommand(client *c) { @@ -1684,14 +1701,11 @@ void clusterCommand(client *c) { return; } /* Propose the slot change. The reply is deferred to the callback, - * which is called after the change is applied. - * TODO: If the callback is not called immediately (e.g. when the - * change goes through cluster consensus), the client needs to be - * put in a blocked state so the server can continue processing - * other clients while waiting for the commit. */ + * which is called after the change is applied. */ + void *h = blockClientAsync(c); clusterSlotChange(ranges, numslots, del ? NULL : getMyClusterNode(), - c, clusterAddDelSlotsCallback); + h, clusterAddDelSlotsCallback); zfree(ranges); } else if ((!strcasecmp(objectGetVal(c->argv[1]), "addslotsrange") || !strcasecmp(objectGetVal(c->argv[1]), "delslotsrange")) && c->argc >= 4) { @@ -1735,9 +1749,10 @@ void clusterCommand(client *c) { * change goes through cluster consensus), the client needs to be * put in a blocked state so the server can continue processing * other clients while waiting for the commit. */ + void *h = blockClientAsync(c); clusterSlotChange(ranges, numranges, del ? 
NULL : getMyClusterNode(), - c, clusterAddDelSlotsCallback); + h, clusterAddDelSlotsCallback); zfree(ranges); } else if (!strcasecmp(objectGetVal(c->argv[1]), "flushslots") && c->argc == 2) { /* CLUSTER FLUSHSLOTS */ @@ -1765,8 +1780,9 @@ void clusterCommand(client *c) { } } if (in_range) numranges++; + void *h = blockClientAsync(c); clusterSlotChange(ranges, numranges, NULL, - c, clusterAddDelSlotsCallback); + h, clusterAddDelSlotsCallback); zfree(ranges); } } else if (!strcasecmp(objectGetVal(c->argv[1]), "setslot") && c->argc >= 4) { @@ -1827,7 +1843,8 @@ void clusterCommand(client *c) { serverLog(LL_NOTICE, "Cluster meet %s:%lld (user request from '%s').", (char *)objectGetVal(c->argv[2]), port, cl); sdsfree(cl); - clusterCurrentBus->meet(objectGetVal(c->argv[2]), port, cport, c, clusterCommandMeetCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->meet(objectGetVal(c->argv[2]), port, cport, h, clusterCommandMeetCompletion); } else if (!strcasecmp(objectGetVal(c->argv[1]), "reset") && (c->argc == 2 || c->argc == 3)) { /* CLUSTER RESET [SOFT|HARD] */ int hard = 0; @@ -1909,7 +1926,8 @@ void clusterCommand(client *c) { serverLog(LL_NOTICE, "Manual failover user request accepted (user request from '%s').", cl); } sdsfree(cl); - clusterCurrentBus->failover(force, takeover, c, clusterCommandFailoverCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->failover(force, takeover, h, clusterCommandFailoverCompletion); } else if (!strcasecmp(objectGetVal(c->argv[1]), "replicate") && (c->argc == 3 || c->argc == 4)) { /* CLUSTER REPLICATE ( | NO ONE) */ @@ -1932,7 +1950,8 @@ void clusterCommand(client *c) { "primary (request from '%s').", cl); sdsfree(cl); - clusterCurrentBus->setReplicaOf(NULL, c, clusterCommandPromoteCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->setReplicaOf(NULL, h, clusterCommandPromoteCompletion); return; } /* CLUSTER REPLICATE */ @@ -1961,7 +1980,8 @@ void clusterCommand(client *c) { addReply(c, 
shared.ok); return; } - clusterCurrentBus->setReplicaOf(n, c, clusterCommandReplicateCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->setReplicaOf(n, h, clusterCommandReplicateCompletion); } else if (!strcasecmp(objectGetVal(c->argv[1]), "forget") && c->argc == 3) { /* CLUSTER FORGET */ const char *node_id = objectGetVal(c->argv[2]); @@ -1983,7 +2003,8 @@ void clusterCommand(client *c) { node_id, cl); sdsfree(cl); } - clusterCurrentBus->forgetNode(node_id, id_len, c, clusterCommandForgetCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->forgetNode(node_id, id_len, h, clusterCommandForgetCompletion); } else if (!strcasecmp(objectGetVal(c->argv[1]), "count-failure-reports") && c->argc == 3) { /* CLUSTER COUNT-FAILURE-REPORTS */ clusterNode *n = clusterLookupNode(objectGetVal(c->argv[2]), diff --git a/src/debug.c b/src/debug.c index a494017faa5..c86c609b5c8 100644 --- a/src/debug.c +++ b/src/debug.c @@ -396,6 +396,31 @@ void mallctl_string(client *c, robj **argv, int argc) { } #endif +typedef struct { + blockedAsyncHandle *handle; +} debugBlockAsyncCtx; + +/* Timer event proc for DEBUG SLEEP <seconds> ASYNC. Consumes the handle and, if the client is + * still available, replies OK and unblocks it. Returns AE_NOMORE, so it fires only once. */ +static long long debugBlockAsyncTimeProc(struct aeEventLoop *el, long long id, void *clientData) { + UNUSED(el); + UNUSED(id); + debugBlockAsyncCtx *ctx = clientData; + client *c = consumeBlockedClientAsyncHandle(ctx->handle); + ctx->handle = NULL; + if (c) { + addReply(c, shared.ok); + unblockClientAsync(c); + } + return AE_NOMORE; +} + +/* Timer event finalizer for DEBUG SLEEP <seconds> ASYNC. Frees the debugBlockAsyncCtx only. */ +static void debugBlockAsyncFinalize(struct aeEventLoop *el, void *clientData) { + UNUSED(el); + zfree(clientData); +} + void debugCommand(client *c) { if (c->argc == 2 && !strcasecmp(objectGetVal(c->argv[1]), "help")) { const char *help[] = { @@ -490,8 +515,10 @@ void debugCommand(client *c) { " Default value is 1GB, allows values up to 4GB. 
Setting to 0 restores to default.", "SET-SKIP-CHECKSUM-VALIDATION <0|1>", " Enables or disables checksum checks for RDB files and RESTORE's payload.", - "SLEEP <seconds>", + "SLEEP <seconds> [ASYNC]", " Stop the server for <seconds>. Decimals allowed.", + " With ASYNC, block the client and unblock via a timer event.", + " SLEEP 0 ASYNC unblocks synchronously (inside call()).", "STRINGMATCH-TEST", " Run a fuzz tester against the stringmatchlen() function.", "STRUCTSIZE", @@ -889,6 +916,25 @@ void debugCommand(client *c) { tv.tv_nsec = (utime % 1000000) * 1000; nanosleep(&tv, NULL); addReply(c, shared.ok); + } else if (!strcasecmp(objectGetVal(c->argv[1]), "sleep") && c->argc == 4 && + !strcasecmp(objectGetVal(c->argv[3]), "async")) { + /* Test the BLOCKED_ASYNC infrastructure: block the client and + * unblock it after a timer fires (or immediately if 0). */ + double dtime = valkey_strtod_sds(objectGetVal(c->argv[2]), NULL); + long long ms = (long long)(dtime * 1000); + if (ms == 0) { + blockedAsyncHandle *h = blockClientAsync(c); + client *bc = consumeBlockedClientAsyncHandle(h); + if (bc) { + addReply(bc, shared.ok); + unblockClientAsync(bc); + } + } else { + blockedAsyncHandle *h = blockClientAsync(c); + debugBlockAsyncCtx *ctx = zmalloc(sizeof(*ctx)); + ctx->handle = h; + aeCreateTimeEvent(server.el, ms, debugBlockAsyncTimeProc, ctx, debugBlockAsyncFinalize); + } } else if (!strcasecmp(objectGetVal(c->argv[1]), "set-active-expire") && c->argc == 3) { server.active_expire_enabled = atoi(objectGetVal(c->argv[2])); addReply(c, shared.ok); diff --git a/src/server.c b/src/server.c index 737e5175bab..9fbcf51f62b 100644 --- a/src/server.c +++ b/src/server.c @@ -2950,6 +2950,7 @@ void initServer(void) { server.tracking_pending_keys = listCreate(); server.pending_push_messages = listCreate(); server.clients_waiting_acks = listCreate(); + server.clients_pending_async_unblock = listCreate(); server.get_ack_from_replicas = 0; server.paused_actions = 0; memset(server.client_pause_per_purpose, 0, 
sizeof(server.client_pause_per_purpose)); diff --git a/src/server.h b/src/server.h index 5ce54dfa59f..973717391c0 100644 --- a/src/server.h +++ b/src/server.h @@ -344,6 +344,7 @@ typedef enum blocking_type { BLOCKED_ZSET, /* BZPOP et al. */ BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ BLOCKED_SHUTDOWN, /* SHUTDOWN. */ + BLOCKED_ASYNC, /* Waiting for async completion callback (e.g. Raft commit). */ BLOCKED_NUM, /* Number of blocked states. */ BLOCKED_END /* End of enumeration */ } blocking_type; @@ -2192,8 +2193,9 @@ struct valkeyServer { /* Import Mode */ int import_mode; /* If true, server is in import mode and forbid expiration and eviction. */ /* Synchronous replication. */ - list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ - int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */ + list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ + list *clients_pending_async_unblock; /* BLOCKED_ASYNC clients to unblock in beforeSleep. */ + int get_ack_from_replicas; /* If true we send REPLCONF GETACK. 
*/ /* Limits */ unsigned int maxclients; /* Max number of simultaneous clients */ unsigned long long maxmemory; /* Max number of memory bytes to use */ @@ -3850,6 +3852,10 @@ void handleClientsBlockedOnKeys(void); void signalKeyAsReady(serverDb *db, robj *key, int type); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); void blockClientShutdown(client *c); +typedef struct blockedAsyncHandle blockedAsyncHandle; +blockedAsyncHandle *blockClientAsync(client *c); +client *consumeBlockedClientAsyncHandle(blockedAsyncHandle *handle); +void unblockClientAsync(client *c); void blockPostponeClient(client *c); void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal); void replicationRequestAckFromReplicas(void); diff --git a/tests/unit/blocked.tcl b/tests/unit/blocked.tcl new file mode 100644 index 00000000000..0e90a6385a1 --- /dev/null +++ b/tests/unit/blocked.tcl @@ -0,0 +1,34 @@ +tags {external:skip} { + +test "DEBUG SLEEP ASYNC: basic unblock" { + start_server {} { + set r [srv 0 client] + set result [$r DEBUG SLEEP 0.1 ASYNC] + assert_equal OK $result + set result [$r PING] + assert_equal PONG $result + } +} + +test "DEBUG SLEEP ASYNC: multiple commands after unblock" { + start_server {} { + set r [srv 0 client] + $r DEBUG SLEEP 0.05 ASYNC + $r SET foo bar + assert_equal bar [$r GET foo] + $r DEBUG SLEEP 0.05 ASYNC + assert_equal bar [$r GET foo] + } +} + +test "DEBUG SLEEP 0 ASYNC: synchronous unblock" { + start_server {} { + set r [srv 0 client] + set result [$r DEBUG SLEEP 0 ASYNC] + assert_equal OK $result + set result [$r PING] + assert_equal PONG $result + } +} + +} ;# tags