From 614c8a034e1a63b858fed71985cf207bc86b0392 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Tue, 14 Apr 2026 23:12:45 +0200 Subject: [PATCH 1/2] Add BLOCKED_ASYNC blocking type and async handle API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a new blocking type for async operations (e.g. Raft commit). When a client is blocked with BLOCKED_ASYNC, the operation handle is owned by the caller. On disconnect or timeout, no special cleanup is needed — the handle will be consumed later and the client lookup will return NULL. Add blockedAsyncCreate/blockedAsyncConsume API in blocked.c. An async handle (opaque blockedAsyncHandle struct) wraps a client ID so that a completion callback can safely look up the client even if it disconnected while waiting. Signed-off-by: Viktor Söderqvist --- src/blocked.c | 74 +++++++++++++++++++++++++++++++++++++++++- src/debug.c | 48 ++++++++++++++++++++++++++- src/server.c | 1 + src/server.h | 10 ++++-- tests/unit/blocked.tcl | 34 +++++++++++++++++++ 5 files changed, 163 insertions(+), 4 deletions(-) create mode 100644 tests/unit/blocked.tcl diff --git a/src/blocked.c b/src/blocked.c index 83437dcf144..c8b5ea532a9 100644 --- a/src/blocked.c +++ b/src/blocked.c @@ -105,7 +105,7 @@ void freeClientBlockingState(client *c) { * and will be processed when the client is unblocked. */ void blockClient(client *c, int btype) { /* Replicated clients should never be blocked unless pause or module */ - serverAssert(!(isReplicatedClient(c) && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE)); + serverAssert(!(isReplicatedClient(c) && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE && btype != BLOCKED_ASYNC)); initClientBlockingState(c); @@ -228,6 +228,10 @@ void unblockClient(client *c, int queue_for_reprocessing) { c->bstate->postponed_list_node = NULL; } else if (c->bstate->btype == BLOCKED_SHUTDOWN) { /* No special cleanup. */ + } else if (c->bstate->btype == BLOCKED_ASYNC) { + /* The async handle is owned by the caller. When the operation + * completes, the callback will look up the client by ID and + * find it gone. No cleanup needed here. */ } else { serverPanic("Unknown btype in unblockClient()."); } @@ -292,6 +296,11 @@ void replyToBlockedClientTimedOut(client *c) { } } else if (c->bstate->btype == BLOCKED_MODULE) { moduleBlockedClientTimedOut(c, 0); + } else if (c->bstate->btype == BLOCKED_ASYNC) { + /* Timeout: the operation didn't complete in time. The handle is + * still out there but lookupClientByID will return NULL after + * the client is freed. */ + addReplyError(c, "Operation timed out"); } else { serverPanic("Unknown btype in replyToBlockedClientTimedOut()."); } @@ -693,6 +702,55 @@ void blockClientShutdown(client *c) { blockClient(c, BLOCKED_SHUTDOWN); } +/* -------------------------------------------------------------------------- + * BLOCKED_ASYNC: async operation handle + * + * An async handle wraps a client ID so that a completion callback can + * safely look up the client even if it disconnected while waiting. + * -------------------------------------------------------------------------- */ + +typedef struct blockedAsyncHandle { + uint64_t client_id; +} blockedAsyncHandle; + +blockedAsyncHandle *blockClientAsync(client *c) { + blockedAsyncHandle *h = zmalloc(sizeof(*h)); + h->client_id = c->id; + blockClient(c, BLOCKED_ASYNC); + return h; +} + +client *consumeBlockedClientAsyncHandle(blockedAsyncHandle *handle) { + client *c = lookupClientByID(handle->client_id); + zfree(handle); + return c; +} + +/* Complete a BLOCKED_ASYNC operation. The caller must have already added + * the reply (addReply / addReplyError) before calling this. + * + * Synchronous path (inside call()): clears blocked state immediately so + * call() does normal post-processing. + * + * Async path (timer, read handler, etc.): defers the actual unblock to + * blockedBeforeSleep, matching the pattern used by all other block types. */ +void unblockClientAsync(client *c) { + serverAssert(c->flag.blocked && c->bstate->btype == BLOCKED_ASYNC); + + if (c->flag.executing_command && server.current_client == c) { + /* Synchronous completion — still inside call(). Just clear the + * blocked state so call() does normal post-processing. */ + c->flag.blocked = 0; + c->bstate->btype = BLOCKED_NONE; + removeClientFromTimeoutTable(c); + server.blocked_clients--; + server.blocked_clients_by_type[BLOCKED_ASYNC]--; + } else { + /* Async completion — defer to blockedBeforeSleep. */ + listAddNodeTail(server.clients_pending_async_unblock, c); + } +} + /* Unblock a client once a specific key became available for it. * This function will remove the client from the list of clients blocked on this key * and also remove the key from the dictionary of keys this client is blocked on. @@ -807,6 +865,20 @@ void blockedBeforeSleep(void) { * blocking commands. */ if (moduleCount()) moduleHandleBlockedClients(); + /* Unblock clients pending async completion (e.g. Raft commits). */ + while (listLength(server.clients_pending_async_unblock)) { + listNode *ln = listFirst(server.clients_pending_async_unblock); + client *c = ln->value; + listDelNode(server.clients_pending_async_unblock, ln); + commitDeferredReplyBuffer(c, 0); + updateStatsOnUnblock(c, 0, 0, 0); + unblockClient(c, 1); + if (clientHasPendingReplies(c) && !c->flag.pending_write && c->conn) { + c->flag.pending_write = 1; + listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node); + } + } + /* Try to process pending commands for clients that were just unblocked. */ if (listLength(server.unblocked_clients)) processUnblockedClients(); } diff --git a/src/debug.c b/src/debug.c index a494017faa5..c86c609b5c8 100644 --- a/src/debug.c +++ b/src/debug.c @@ -396,6 +396,31 @@ void mallctl_string(client *c, robj **argv, int argc) { } #endif +typedef struct { + blockedAsyncHandle *handle; +} debugBlockAsyncCtx; + +/* Timer event proc for DEBUG BLOCK-ASYNC. Fires after the requested delay, + * sends the reply, and unblocks the client. */ +static long long debugBlockAsyncTimeProc(struct aeEventLoop *el, long long id, void *clientData) { + UNUSED(el); + UNUSED(id); + debugBlockAsyncCtx *ctx = clientData; + client *c = consumeBlockedClientAsyncHandle(ctx->handle); + ctx->handle = NULL; + if (c) { + addReply(c, shared.ok); + unblockClientAsync(c); + } + return AE_NOMORE; +} + +/* Timer event finalizer for DEBUG BLOCK-ASYNC. Frees the context. */ +static void debugBlockAsyncFinalize(struct aeEventLoop *el, void *clientData) { + UNUSED(el); + zfree(clientData); +} + void debugCommand(client *c) { if (c->argc == 2 && !strcasecmp(objectGetVal(c->argv[1]), "help")) { const char *help[] = { @@ -490,8 +515,10 @@ void debugCommand(client *c) { " Default value is 1GB, allows values up to 4GB. Setting to 0 restores to default.", "SET-SKIP-CHECKSUM-VALIDATION <0|1>", " Enables or disables checksum checks for RDB files and RESTORE's payload.", - "SLEEP ", + "SLEEP [ASYNC]", " Stop the server for . Decimals allowed.", + " With ASYNC, block the client and unblock via a timer event.", + " SLEEP 0 ASYNC unblocks synchronously (inside call()).", "STRINGMATCH-TEST", " Run a fuzz tester against the stringmatchlen() function.", "STRUCTSIZE", @@ -889,6 +916,25 @@ void debugCommand(client *c) { tv.tv_nsec = (utime % 1000000) * 1000; nanosleep(&tv, NULL); addReply(c, shared.ok); + } else if (!strcasecmp(objectGetVal(c->argv[1]), "sleep") && c->argc == 4 && + !strcasecmp(objectGetVal(c->argv[3]), "async")) { + /* Test the BLOCKED_ASYNC infrastructure: block the client and + * unblock it after a timer fires (or immediately if 0). */ + double dtime = valkey_strtod_sds(objectGetVal(c->argv[2]), NULL); + long long ms = (long long)(dtime * 1000); + if (ms == 0) { + blockedAsyncHandle *h = blockClientAsync(c); + client *bc = consumeBlockedClientAsyncHandle(h); + if (bc) { + addReply(bc, shared.ok); + unblockClientAsync(bc); + } + } else { + blockedAsyncHandle *h = blockClientAsync(c); + debugBlockAsyncCtx *ctx = zmalloc(sizeof(*ctx)); + ctx->handle = h; + aeCreateTimeEvent(server.el, ms, debugBlockAsyncTimeProc, ctx, debugBlockAsyncFinalize); + } } else if (!strcasecmp(objectGetVal(c->argv[1]), "set-active-expire") && c->argc == 3) { server.active_expire_enabled = atoi(objectGetVal(c->argv[2])); addReply(c, shared.ok); diff --git a/src/server.c b/src/server.c index 737e5175bab..9fbcf51f62b 100644 --- a/src/server.c +++ b/src/server.c @@ -2950,6 +2950,7 @@ void initServer(void) { server.tracking_pending_keys = listCreate(); server.pending_push_messages = listCreate(); server.clients_waiting_acks = listCreate(); + server.clients_pending_async_unblock = listCreate(); server.get_ack_from_replicas = 0; server.paused_actions = 0; memset(server.client_pause_per_purpose, 0, sizeof(server.client_pause_per_purpose)); diff --git a/src/server.h b/src/server.h index 5ce54dfa59f..973717391c0 100644 --- a/src/server.h +++ b/src/server.h @@ -344,6 +344,7 @@ typedef enum blocking_type { BLOCKED_ZSET, /* BZPOP et al. */ BLOCKED_POSTPONE, /* Blocked by processCommand, re-try processing later. */ BLOCKED_SHUTDOWN, /* SHUTDOWN. */ + BLOCKED_ASYNC, /* Waiting for async completion callback (e.g. Raft commit). */ BLOCKED_NUM, /* Number of blocked states. */ BLOCKED_END /* End of enumeration */ } blocking_type; @@ -2192,8 +2193,9 @@ struct valkeyServer { /* Import Mode */ int import_mode; /* If true, server is in import mode and forbid expiration and eviction. */ /* Synchronous replication. */ - list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ - int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */ + list *clients_waiting_acks; /* Clients waiting in WAIT or WAITAOF. */ + list *clients_pending_async_unblock; /* BLOCKED_ASYNC clients to unblock in beforeSleep. */ + int get_ack_from_replicas; /* If true we send REPLCONF GETACK. */ /* Limits */ unsigned int maxclients; /* Max number of simultaneous clients */ unsigned long long maxmemory; /* Max number of memory bytes to use */ @@ -3850,6 +3852,10 @@ void handleClientsBlockedOnKeys(void); void signalKeyAsReady(serverDb *db, robj *key, int type); void blockForKeys(client *c, int btype, robj **keys, int numkeys, mstime_t timeout, int unblock_on_nokey); void blockClientShutdown(client *c); +typedef struct blockedAsyncHandle blockedAsyncHandle; +blockedAsyncHandle *blockClientAsync(client *c); +client *consumeBlockedClientAsyncHandle(blockedAsyncHandle *handle); +void unblockClientAsync(client *c); void blockPostponeClient(client *c); void blockClientForReplicaAck(client *c, mstime_t timeout, long long offset, long numreplicas, int numlocal); void replicationRequestAckFromReplicas(void); diff --git a/tests/unit/blocked.tcl b/tests/unit/blocked.tcl new file mode 100644 index 00000000000..0e90a6385a1 --- /dev/null +++ b/tests/unit/blocked.tcl @@ -0,0 +1,34 @@ +tags {external:skip} { + +test "DEBUG SLEEP ASYNC: basic unblock" { + start_server {} { + set r [srv 0 client] + set result [$r DEBUG SLEEP 0.1 ASYNC] + assert_equal OK $result + set result [$r PING] + assert_equal PONG $result + } +} + +test "DEBUG SLEEP ASYNC: multiple commands after unblock" { + start_server {} { + set r [srv 0 client] + $r DEBUG SLEEP 0.05 ASYNC + $r SET foo bar + assert_equal bar [$r GET foo] + $r DEBUG SLEEP 0.05 ASYNC + assert_equal bar [$r GET foo] + } +} + +test "DEBUG SLEEP 0 ASYNC: synchronous unblock" { + start_server {} { + set r [srv 0 client] + set result [$r DEBUG SLEEP 0 ASYNC] + assert_equal OK $result + set result [$r PING] + assert_equal PONG $result + } +} + +} ;# tags From ba7564127c5a9b1015a53707ca4fb046e2d41225 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Viktor=20S=C3=B6derqvist?= Date: Sun, 19 Apr 2026 19:14:36 +0200 Subject: [PATCH 2/2] Use blockClientAsync for async cluster command dispatchers MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Block the client using blockClientAsync before calling vtable callbacks that may complete asynchronously: slotChange (ADDSLOTS, DELSLOTS, ADDSLOTSRANGE, DELSLOTSRANGE, FLUSHSLOTS, SETSLOT NODE), meet, setReplicaOf, forgetNode, and failover. The completion callbacks use consumeBlockedClientAsyncHandle and unblockClientAsync. This allows protocol implementations (e.g. Raft) to defer the reply until the operation is committed, while the legacy gossip implementation completes synchronously inside call(). Signed-off-by: Viktor Söderqvist --- src/cluster.c | 67 +++++++++++++++++++++++++++++++++------------------ 1 file changed, 44 insertions(+), 23 deletions(-) diff --git a/src/cluster.c b/src/cluster.c index 351372a2bbc..a6521a4a4e5 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -1210,20 +1210,24 @@ void clusterKeySlotCommand(client *c) { * Called after the slot change is applied. This may be called inline * or asynchronously if the change goes through cluster consensus. */ static void clusterAddDelSlotsCallback(void *ctx, const char *error) { - client *c = (client *)ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); - return; + } else { + addReply(c, shared.ok); } - addReply(c, shared.ok); + unblockClientAsync(c); } /* Callback for CLUSTER SETSLOT NODE after the slot change is applied. * Handles replica migration if this shard lost its last slot. */ static void clusterSetSlotNodeCallback(void *ctx, const char *error) { - client *c = (client *)ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); + unblockClientAsync(c); return; } @@ -1254,6 +1258,7 @@ static void clusterSetSlotNodeCallback(void *ctx, const char *error) { } addReply(c, shared.ok); + unblockClientAsync(c); } /* Validate slot assignments for ADDSLOTS/DELSLOTS commands. @@ -1510,7 +1515,8 @@ void clusterCommandSetSlot(client *c) { } slotRange range = {slot, slot}; - clusterSlotChange(&range, 1, n, c, clusterSetSlotNodeCallback); + void *h = blockClientAsync(c); + clusterSlotChange(&range, 1, n, h, clusterSetSlotNodeCallback); return; } @@ -1523,36 +1529,44 @@ void clusterCommandSetSlot(client *c) { * post-action work is done and an OK reply is sent. */ static void clusterCommandMeetCompletion(void *ctx, const char *error) { - client *c = ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); } else { addReply(c, shared.ok); } + unblockClientAsync(c); } static void clusterCommandForgetCompletion(void *ctx, const char *error) { - client *c = ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); } else { addReply(c, shared.ok); } + unblockClientAsync(c); } static void clusterCommandReplicateCompletion(void *ctx, const char *error) { - client *c = ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); } else { addReply(c, shared.ok); } + unblockClientAsync(c); } static void clusterCommandPromoteCompletion(void *ctx, const char *error) { - client *c = ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); + unblockClientAsync(c); return; } flushAllDataAndResetRDB(server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS); @@ -1560,15 +1574,18 @@ static void clusterCommandPromoteCompletion(void *ctx, const char *error) { clusterCloseAllSlots(); clusterCancelManualFailover(); addReply(c, shared.ok); + unblockClientAsync(c); } static void clusterCommandFailoverCompletion(void *ctx, const char *error) { - client *c = ctx; + client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx); + if (!c) return; if (error) { addReplyError(c, error); } else { addReply(c, shared.ok); } + unblockClientAsync(c); } void clusterCommand(client *c) { @@ -1684,14 +1701,11 @@ void clusterCommand(client *c) { return; } /* Propose the slot change. The reply is deferred to the callback, - * which is called after the change is applied. - * TODO: If the callback is not called immediately (e.g. when the - * change goes through cluster consensus), the client needs to be - * put in a blocked state so the server can continue processing - * other clients while waiting for the commit. */ + * which is called after the change is applied. */ + void *h = blockClientAsync(c); clusterSlotChange(ranges, numslots, del ? NULL : getMyClusterNode(), - c, clusterAddDelSlotsCallback); + h, clusterAddDelSlotsCallback); zfree(ranges); } else if ((!strcasecmp(objectGetVal(c->argv[1]), "addslotsrange") || !strcasecmp(objectGetVal(c->argv[1]), "delslotsrange")) && c->argc >= 4) { @@ -1735,9 +1749,10 @@ void clusterCommand(client *c) { * change goes through cluster consensus), the client needs to be * put in a blocked state so the server can continue processing * other clients while waiting for the commit. */ + void *h = blockClientAsync(c); clusterSlotChange(ranges, numranges, del ? NULL : getMyClusterNode(), - c, clusterAddDelSlotsCallback); + h, clusterAddDelSlotsCallback); zfree(ranges); } else if (!strcasecmp(objectGetVal(c->argv[1]), "flushslots") && c->argc == 2) { /* CLUSTER FLUSHSLOTS */ @@ -1765,8 +1780,9 @@ void clusterCommand(client *c) { } } if (in_range) numranges++; + void *h = blockClientAsync(c); clusterSlotChange(ranges, numranges, NULL, - c, clusterAddDelSlotsCallback); + h, clusterAddDelSlotsCallback); zfree(ranges); } } else if (!strcasecmp(objectGetVal(c->argv[1]), "setslot") && c->argc >= 4) { @@ -1827,7 +1843,8 @@ void clusterCommand(client *c) { serverLog(LL_NOTICE, "Cluster meet %s:%lld (user request from '%s').", (char *)objectGetVal(c->argv[2]), port, cl); sdsfree(cl); - clusterCurrentBus->meet(objectGetVal(c->argv[2]), port, cport, c, clusterCommandMeetCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->meet(objectGetVal(c->argv[2]), port, cport, h, clusterCommandMeetCompletion); } else if (!strcasecmp(objectGetVal(c->argv[1]), "reset") && (c->argc == 2 || c->argc == 3)) { /* CLUSTER RESET [SOFT|HARD] */ int hard = 0; @@ -1909,7 +1926,8 @@ void clusterCommand(client *c) { serverLog(LL_NOTICE, "Manual failover user request accepted (user request from '%s').", cl); } sdsfree(cl); - clusterCurrentBus->failover(force, takeover, c, clusterCommandFailoverCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->failover(force, takeover, h, clusterCommandFailoverCompletion); } else if (!strcasecmp(objectGetVal(c->argv[1]), "replicate") && (c->argc == 3 || c->argc == 4)) { /* CLUSTER REPLICATE ( | NO ONE) */ @@ -1932,7 +1950,8 @@ void clusterCommand(client *c) { "primary (request from '%s').", cl); sdsfree(cl); - clusterCurrentBus->setReplicaOf(NULL, c, clusterCommandPromoteCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->setReplicaOf(NULL, h, clusterCommandPromoteCompletion); return; } /* CLUSTER REPLICATE */ @@ -1961,7 +1980,8 @@ void clusterCommand(client *c) { addReply(c, shared.ok); return; } - clusterCurrentBus->setReplicaOf(n, c, clusterCommandReplicateCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->setReplicaOf(n, h, clusterCommandReplicateCompletion); } else if (!strcasecmp(objectGetVal(c->argv[1]), "forget") && c->argc == 3) { /* CLUSTER FORGET */ const char *node_id = objectGetVal(c->argv[2]); @@ -1983,7 +2003,8 @@ void clusterCommand(client *c) { node_id, cl); sdsfree(cl); } - clusterCurrentBus->forgetNode(node_id, id_len, c, clusterCommandForgetCompletion); + void *h = blockClientAsync(c); + clusterCurrentBus->forgetNode(node_id, id_len, h, clusterCommandForgetCompletion); } else if (!strcasecmp(objectGetVal(c->argv[1]), "count-failure-reports") && c->argc == 3) { /* CLUSTER COUNT-FAILURE-REPORTS */ clusterNode *n = clusterLookupNode(objectGetVal(c->argv[2]),