Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 73 additions & 1 deletion src/blocked.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ void freeClientBlockingState(client *c) {
* and will be processed when the client is unblocked. */
void blockClient(client *c, int btype) {
/* Replicated clients should never be blocked unless pause or module */
serverAssert(!(isReplicatedClient(c) && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE));
serverAssert(!(isReplicatedClient(c) && btype != BLOCKED_MODULE && btype != BLOCKED_POSTPONE && btype != BLOCKED_ASYNC));

initClientBlockingState(c);

Expand Down Expand Up @@ -228,6 +228,10 @@ void unblockClient(client *c, int queue_for_reprocessing) {
c->bstate->postponed_list_node = NULL;
} else if (c->bstate->btype == BLOCKED_SHUTDOWN) {
/* No special cleanup. */
} else if (c->bstate->btype == BLOCKED_ASYNC) {
/* The async handle is owned by the caller. When the operation
* completes, the callback will look up the client by ID and
* find it gone. No cleanup needed here. */
} else {
serverPanic("Unknown btype in unblockClient().");
}
Expand Down Expand Up @@ -292,6 +296,11 @@ void replyToBlockedClientTimedOut(client *c) {
}
} else if (c->bstate->btype == BLOCKED_MODULE) {
moduleBlockedClientTimedOut(c, 0);
} else if (c->bstate->btype == BLOCKED_ASYNC) {
/* Timeout: the operation didn't complete in time. The handle is
* still out there but lookupClientByID will return NULL after
* the client is freed. */
addReplyError(c, "Operation timed out");
} else {
serverPanic("Unknown btype in replyToBlockedClientTimedOut().");
}
Expand Down Expand Up @@ -693,6 +702,55 @@ void blockClientShutdown(client *c) {
blockClient(c, BLOCKED_SHUTDOWN);
}

/* --------------------------------------------------------------------------
* BLOCKED_ASYNC: async operation handle
*
* An async handle wraps a client ID so that a completion callback can
* safely look up the client even if it disconnected while waiting.
* -------------------------------------------------------------------------- */

typedef struct blockedAsyncHandle {
uint64_t client_id;
} blockedAsyncHandle;

blockedAsyncHandle *blockClientAsync(client *c) {
blockedAsyncHandle *h = zmalloc(sizeof(*h));
h->client_id = c->id;
blockClient(c, BLOCKED_ASYNC);
return h;
}

client *consumeBlockedClientAsyncHandle(blockedAsyncHandle *handle) {
client *c = lookupClientByID(handle->client_id);
zfree(handle);
return c;
}

/* Complete a BLOCKED_ASYNC operation. The caller must have already added
* the reply (addReply / addReplyError) before calling this.
*
* Synchronous path (inside call()): clears blocked state immediately so
* call() does normal post-processing.
*
* Async path (timer, read handler, etc.): defers the actual unblock to
* blockedBeforeSleep, matching the pattern used by all other block types. */
void unblockClientAsync(client *c) {
serverAssert(c->flag.blocked && c->bstate->btype == BLOCKED_ASYNC);

if (c->flag.executing_command && server.current_client == c) {
/* Synchronous completion — still inside call(). Just clear the
* blocked state so call() does normal post-processing. */
c->flag.blocked = 0;
c->bstate->btype = BLOCKED_NONE;
removeClientFromTimeoutTable(c);
server.blocked_clients--;
server.blocked_clients_by_type[BLOCKED_ASYNC]--;
} else {
/* Async completion — defer to blockedBeforeSleep. */
listAddNodeTail(server.clients_pending_async_unblock, c);
}
}

/* Unblock a client once a specific key became available for it.
* This function will remove the client from the list of clients blocked on this key
* and also remove the key from the dictionary of keys this client is blocked on.
Expand Down Expand Up @@ -807,6 +865,20 @@ void blockedBeforeSleep(void) {
* blocking commands. */
if (moduleCount()) moduleHandleBlockedClients();

/* Unblock clients pending async completion (e.g. Raft commits). */
while (listLength(server.clients_pending_async_unblock)) {
listNode *ln = listFirst(server.clients_pending_async_unblock);
client *c = ln->value;
listDelNode(server.clients_pending_async_unblock, ln);
commitDeferredReplyBuffer(c, 0);
updateStatsOnUnblock(c, 0, 0, 0);
unblockClient(c, 1);
if (clientHasPendingReplies(c) && !c->flag.pending_write && c->conn) {
c->flag.pending_write = 1;
listLinkNodeHead(server.clients_pending_write, &c->clients_pending_write_node);
}
}

/* Try to process pending commands for clients that were just unblocked. */
if (listLength(server.unblocked_clients)) processUnblockedClients();
}
67 changes: 44 additions & 23 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -1210,20 +1210,24 @@ void clusterKeySlotCommand(client *c) {
* Called after the slot change is applied. This may be called inline
* or asynchronously if the change goes through cluster consensus. */
static void clusterAddDelSlotsCallback(void *ctx, const char *error) {
client *c = (client *)ctx;
client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx);
if (!c) return;
if (error) {
addReplyError(c, error);
return;
} else {
addReply(c, shared.ok);
}
addReply(c, shared.ok);
unblockClientAsync(c);
}

/* Callback for CLUSTER SETSLOT NODE after the slot change is applied.
* Handles replica migration if this shard lost its last slot. */
static void clusterSetSlotNodeCallback(void *ctx, const char *error) {
client *c = (client *)ctx;
client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx);
if (!c) return;
if (error) {
addReplyError(c, error);
unblockClientAsync(c);
return;
}

Expand Down Expand Up @@ -1254,6 +1258,7 @@ static void clusterSetSlotNodeCallback(void *ctx, const char *error) {
}

addReply(c, shared.ok);
unblockClientAsync(c);
}

/* Validate slot assignments for ADDSLOTS/DELSLOTS commands.
Expand Down Expand Up @@ -1510,7 +1515,8 @@ void clusterCommandSetSlot(client *c) {
}

slotRange range = {slot, slot};
clusterSlotChange(&range, 1, n, c, clusterSetSlotNodeCallback);
void *h = blockClientAsync(c);
clusterSlotChange(&range, 1, n, h, clusterSetSlotNodeCallback);
return;
}

Expand All @@ -1523,52 +1529,63 @@ void clusterCommandSetSlot(client *c) {
* post-action work is done and an OK reply is sent. */

static void clusterCommandMeetCompletion(void *ctx, const char *error) {
client *c = ctx;
client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx);
if (!c) return;
if (error) {
addReplyError(c, error);
} else {
addReply(c, shared.ok);
}
unblockClientAsync(c);
}

static void clusterCommandForgetCompletion(void *ctx, const char *error) {
client *c = ctx;
client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx);
if (!c) return;
if (error) {
addReplyError(c, error);
} else {
addReply(c, shared.ok);
}
unblockClientAsync(c);
}

static void clusterCommandReplicateCompletion(void *ctx, const char *error) {
client *c = ctx;
client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx);
if (!c) return;
if (error) {
addReplyError(c, error);
} else {
addReply(c, shared.ok);
}
unblockClientAsync(c);
}

static void clusterCommandPromoteCompletion(void *ctx, const char *error) {
client *c = ctx;
client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx);
if (!c) return;
if (error) {
addReplyError(c, error);
unblockClientAsync(c);
return;
}
flushAllDataAndResetRDB(server.repl_replica_lazy_flush ? EMPTYDB_ASYNC : EMPTYDB_NO_FLAGS);
verifyClusterConfigWithData();
clusterCloseAllSlots();
clusterCancelManualFailover();
addReply(c, shared.ok);
unblockClientAsync(c);
}

static void clusterCommandFailoverCompletion(void *ctx, const char *error) {
client *c = ctx;
client *c = consumeBlockedClientAsyncHandle((blockedAsyncHandle *)ctx);
if (!c) return;
if (error) {
addReplyError(c, error);
} else {
addReply(c, shared.ok);
}
unblockClientAsync(c);
}

void clusterCommand(client *c) {
Expand Down Expand Up @@ -1684,14 +1701,11 @@ void clusterCommand(client *c) {
return;
}
/* Propose the slot change. The reply is deferred to the callback,
* which is called after the change is applied.
* TODO: If the callback is not called immediately (e.g. when the
* change goes through cluster consensus), the client needs to be
* put in a blocked state so the server can continue processing
* other clients while waiting for the commit. */
* which is called after the change is applied. */
void *h = blockClientAsync(c);
clusterSlotChange(ranges, numslots,
del ? NULL : getMyClusterNode(),
c, clusterAddDelSlotsCallback);
h, clusterAddDelSlotsCallback);
zfree(ranges);
} else if ((!strcasecmp(objectGetVal(c->argv[1]), "addslotsrange") || !strcasecmp(objectGetVal(c->argv[1]), "delslotsrange")) &&
c->argc >= 4) {
Expand Down Expand Up @@ -1735,9 +1749,10 @@ void clusterCommand(client *c) {
* change goes through cluster consensus), the client needs to be
* put in a blocked state so the server can continue processing
* other clients while waiting for the commit. */
void *h = blockClientAsync(c);
clusterSlotChange(ranges, numranges,
del ? NULL : getMyClusterNode(),
c, clusterAddDelSlotsCallback);
h, clusterAddDelSlotsCallback);
zfree(ranges);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "flushslots") && c->argc == 2) {
/* CLUSTER FLUSHSLOTS */
Expand Down Expand Up @@ -1765,8 +1780,9 @@ void clusterCommand(client *c) {
}
}
if (in_range) numranges++;
void *h = blockClientAsync(c);
clusterSlotChange(ranges, numranges, NULL,
c, clusterAddDelSlotsCallback);
h, clusterAddDelSlotsCallback);
zfree(ranges);
}
} else if (!strcasecmp(objectGetVal(c->argv[1]), "setslot") && c->argc >= 4) {
Expand Down Expand Up @@ -1827,7 +1843,8 @@ void clusterCommand(client *c) {
serverLog(LL_NOTICE, "Cluster meet %s:%lld (user request from '%s').",
(char *)objectGetVal(c->argv[2]), port, cl);
sdsfree(cl);
clusterCurrentBus->meet(objectGetVal(c->argv[2]), port, cport, c, clusterCommandMeetCompletion);
void *h = blockClientAsync(c);
clusterCurrentBus->meet(objectGetVal(c->argv[2]), port, cport, h, clusterCommandMeetCompletion);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "reset") && (c->argc == 2 || c->argc == 3)) {
/* CLUSTER RESET [SOFT|HARD] */
int hard = 0;
Expand Down Expand Up @@ -1909,7 +1926,8 @@ void clusterCommand(client *c) {
serverLog(LL_NOTICE, "Manual failover user request accepted (user request from '%s').", cl);
}
sdsfree(cl);
clusterCurrentBus->failover(force, takeover, c, clusterCommandFailoverCompletion);
void *h = blockClientAsync(c);
clusterCurrentBus->failover(force, takeover, h, clusterCommandFailoverCompletion);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "replicate") &&
(c->argc == 3 || c->argc == 4)) {
/* CLUSTER REPLICATE (<NODE ID> | NO ONE) */
Expand All @@ -1932,7 +1950,8 @@ void clusterCommand(client *c) {
"primary (request from '%s').",
cl);
sdsfree(cl);
clusterCurrentBus->setReplicaOf(NULL, c, clusterCommandPromoteCompletion);
void *h = blockClientAsync(c);
clusterCurrentBus->setReplicaOf(NULL, h, clusterCommandPromoteCompletion);
return;
}
/* CLUSTER REPLICATE <NODE ID> */
Expand Down Expand Up @@ -1961,7 +1980,8 @@ void clusterCommand(client *c) {
addReply(c, shared.ok);
return;
}
clusterCurrentBus->setReplicaOf(n, c, clusterCommandReplicateCompletion);
void *h = blockClientAsync(c);
clusterCurrentBus->setReplicaOf(n, h, clusterCommandReplicateCompletion);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "forget") && c->argc == 3) {
/* CLUSTER FORGET <NODE ID> */
const char *node_id = objectGetVal(c->argv[2]);
Expand All @@ -1983,7 +2003,8 @@ void clusterCommand(client *c) {
node_id, cl);
sdsfree(cl);
}
clusterCurrentBus->forgetNode(node_id, id_len, c, clusterCommandForgetCompletion);
void *h = blockClientAsync(c);
clusterCurrentBus->forgetNode(node_id, id_len, h, clusterCommandForgetCompletion);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "count-failure-reports") && c->argc == 3) {
/* CLUSTER COUNT-FAILURE-REPORTS <NODE ID> */
clusterNode *n = clusterLookupNode(objectGetVal(c->argv[2]),
Expand Down
48 changes: 47 additions & 1 deletion src/debug.c
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,31 @@ void mallctl_string(client *c, robj **argv, int argc) {
}
#endif

typedef struct {
blockedAsyncHandle *handle;
} debugBlockAsyncCtx;

/* Timer event proc for DEBUG BLOCK-ASYNC. Fires after the requested delay,
* sends the reply, and unblocks the client. */
static long long debugBlockAsyncTimeProc(struct aeEventLoop *el, long long id, void *clientData) {
UNUSED(el);
UNUSED(id);
debugBlockAsyncCtx *ctx = clientData;
client *c = consumeBlockedClientAsyncHandle(ctx->handle);
ctx->handle = NULL;
if (c) {
addReply(c, shared.ok);
unblockClientAsync(c);
}
return AE_NOMORE;
}

/* Timer event finalizer for DEBUG BLOCK-ASYNC. Frees the context. */
static void debugBlockAsyncFinalize(struct aeEventLoop *el, void *clientData) {
UNUSED(el);
zfree(clientData);
}

void debugCommand(client *c) {
if (c->argc == 2 && !strcasecmp(objectGetVal(c->argv[1]), "help")) {
const char *help[] = {
Expand Down Expand Up @@ -490,8 +515,10 @@ void debugCommand(client *c) {
" Default value is 1GB, allows values up to 4GB. Setting to 0 restores to default.",
"SET-SKIP-CHECKSUM-VALIDATION <0|1>",
" Enables or disables checksum checks for RDB files and RESTORE's payload.",
"SLEEP <seconds>",
"SLEEP <seconds> [ASYNC]",
" Stop the server for <seconds>. Decimals allowed.",
" With ASYNC, block the client and unblock via a timer event.",
" SLEEP 0 ASYNC unblocks synchronously (inside call()).",
"STRINGMATCH-TEST",
" Run a fuzz tester against the stringmatchlen() function.",
"STRUCTSIZE",
Expand Down Expand Up @@ -889,6 +916,25 @@ void debugCommand(client *c) {
tv.tv_nsec = (utime % 1000000) * 1000;
nanosleep(&tv, NULL);
addReply(c, shared.ok);
} else if (!strcasecmp(objectGetVal(c->argv[1]), "sleep") && c->argc == 4 &&
!strcasecmp(objectGetVal(c->argv[3]), "async")) {
/* Test the BLOCKED_ASYNC infrastructure: block the client and
* unblock it after a timer fires (or immediately if 0). */
double dtime = valkey_strtod_sds(objectGetVal(c->argv[2]), NULL);
long long ms = (long long)(dtime * 1000);
if (ms == 0) {
blockedAsyncHandle *h = blockClientAsync(c);
client *bc = consumeBlockedClientAsyncHandle(h);
if (bc) {
addReply(bc, shared.ok);
unblockClientAsync(bc);
}
} else {
blockedAsyncHandle *h = blockClientAsync(c);
debugBlockAsyncCtx *ctx = zmalloc(sizeof(*ctx));
ctx->handle = h;
aeCreateTimeEvent(server.el, ms, debugBlockAsyncTimeProc, ctx, debugBlockAsyncFinalize);
}
} else if (!strcasecmp(objectGetVal(c->argv[1]), "set-active-expire") && c->argc == 3) {
server.active_expire_enabled = atoi(objectGetVal(c->argv[2]));
addReply(c, shared.ok);
Expand Down
1 change: 1 addition & 0 deletions src/server.c
Original file line number Diff line number Diff line change
Expand Up @@ -2950,6 +2950,7 @@ void initServer(void) {
server.tracking_pending_keys = listCreate();
server.pending_push_messages = listCreate();
server.clients_waiting_acks = listCreate();
server.clients_pending_async_unblock = listCreate();
server.get_ack_from_replicas = 0;
server.paused_actions = 0;
memset(server.client_pause_per_purpose, 0, sizeof(server.client_pause_per_purpose));
Expand Down
Loading
Loading