diff --git a/src/cluster_bus.h b/src/cluster_bus.h index 5b53ca8aa04..138596d3273 100644 --- a/src/cluster_bus.h +++ b/src/cluster_bus.h @@ -8,6 +8,7 @@ typedef char *sds; struct serverObject; struct client; struct clusterLink; +typedef struct clusterNode clusterNode; /* Interface for cluster bus protocol implementations. * Only includes operations that code outside the protocol @@ -88,6 +89,10 @@ typedef struct clusterBusType { * If NULL, protocol_data is not freed. */ void (*freeNodeData)(clusterNode *node); + /* Protocol-specific cleanup for a node that is about to be deleted. Called by clusterDelNode() + * before the node is removed from its shard and freed. If NULL, no such cleanup is performed. */ + void (*cleanupNode)(clusterNode *node); + /* Slot ownership changes — called from cluster commands and slot migration. * Assigns or unassigns slots specified by an array of slot ranges. If * target is non-NULL, slots are assigned to target. If target is NULL, diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index a7017f955c9..b8d1a1a3758 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -436,7 +436,6 @@ void clusterHandleReplicaMigration(int max_replicas); void clusterSendUpdate(clusterLink *link, clusterNode *node); static void clusterLegacyCancelManualFailover(void); void clusterSetNodeAsPrimary(clusterNode *n); -void clusterDelNode(clusterNode *delnode); static void clusterLegacyResetStats(void); uint64_t clusterGetMaxEpoch(void); int clusterBumpConfigEpochWithoutConsensus(void); @@ -1090,35 +1089,9 @@ static void clusterLegacyFreeNodeData(clusterNode *node) { } -/* Remove a node from the cluster. The function performs the high level - * cleanup, calling freeClusterNode() for the low level cleanup. - * Here we do the following: - * - * 1) Mark all the slots handled by it as unassigned. - * 2) Remove all the failure reports sent by this node and referenced by - * other nodes. 
- * 3) Remove the node from the owning shard - * 4) Free the node with freeClusterNode() that will in turn remove it - * from the hash table and from the list of replicas of its primary, if - * it is a replica node. - */ -void clusterDelNode(clusterNode *delnode) { - serverAssert(delnode != NULL); - serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, humanNodename(delnode)); - - int j; - dictIterator *di; +static void clusterLegacyCleanupNode(clusterNode *delnode) { + dictIterator *di = dictGetSafeIterator(server.cluster->nodes); dictEntry *de; - - /* 1) Mark slots as unassigned. */ - for (j = 0; j < CLUSTER_SLOTS; j++) { - if (getImportingSlotSource(j) == delnode) setImportingSlotSource(j, NULL); - if (getMigratingSlotDest(j) == delnode) setMigratingSlotDest(j, NULL); - if (server.cluster->slots[j] == delnode) clusterDelSlot(j); - } - - /* 2) Remove failure reports. */ - di = dictGetSafeIterator(server.cluster->nodes); while ((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); @@ -1126,32 +1099,9 @@ void clusterDelNode(clusterNode *delnode) { clusterNodeDelFailureReport(node, delnode); } dictReleaseIterator(di); - - /* 3) Remove the node from the owning shard */ - clusterRemoveNodeFromShard(delnode); - - /* 4) Free the node, unlinking it from the cluster. */ - freeClusterNode(delnode); } -/* This is only used after the handshake. When we connect a given IP/PORT - * as a result of CLUSTER MEET we don't have the node name yet, so we - * pick a random one, and will fix it when we receive the PONG request using - * this function. 
*/ -void clusterRenameNode(clusterNode *node, char *newname) { - int retval; - sds s = sdsnewlen(node->name, CLUSTER_NAMELEN); - - serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, humanNodename(node), newname); - retval = dictDelete(server.cluster->nodes, s); - sdsfree(s); - serverAssert(retval == DICT_OK); - memcpy(node->name, newname, CLUSTER_NAMELEN); - clusterAddNode(node); - clusterAddNodeToShard(node->shard_id, node); -} - /* ----------------------------------------------------------------------------- * CLUSTER config epoch handling * -------------------------------------------------------------------------- */ @@ -5501,6 +5451,7 @@ clusterBusType clusterLegacyBus = { .postLoad = clusterLegacyPostLoad, .initNodeData = clusterLegacyInitNodeData, .freeNodeData = clusterLegacyFreeNodeData, + .cleanupNode = clusterLegacyCleanupNode, .slotChange = clusterLegacySlotChange, .cancelManualFailover = clusterLegacyCancelManualFailover, .cancelAutomaticFailover = clusterLegacyCancelAutomaticFailover, diff --git a/src/cluster_state.c b/src/cluster_state.c index ea3c93ed381..ad8d6ade13f 100644 --- a/src/cluster_state.c +++ b/src/cluster_state.c @@ -577,6 +577,56 @@ void freeClusterNode(clusterNode *n) { zfree(n); } +/* Remove a node from the cluster. The function performs the high level + * cleanup, calling freeClusterNode() for the low level cleanup. + * Here we do the following: + * + * 1) Mark all the slots handled by it as unassigned. + * 2) Call the current bus's cleanupNode hook, if set. The legacy bus uses it to remove + * the failure reports sent by this node and referenced by other nodes. + * 3) Remove the node from the owning shard + * 4) Free the node with freeClusterNode() that will in turn remove it + * from the hash table and from the list of replicas of its primary, if + * it is a replica node. 
+ */ +void clusterDelNode(clusterNode *delnode) { + serverAssert(delnode != NULL); + serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, humanNodename(delnode)); + + int j; + + /* 1) Mark slots as unassigned. */ + for (j = 0; j < CLUSTER_SLOTS; j++) { + if (getImportingSlotSource(j) == delnode) setImportingSlotSource(j, NULL); + if (getMigratingSlotDest(j) == delnode) setMigratingSlotDest(j, NULL); + if (server.cluster->slots[j] == delnode) clusterDelSlot(j); + } + + /* 2) Protocol-specific cleanup via the optional bus hook (e.g. failure reports). */ + if (clusterCurrentBus->cleanupNode) { + clusterCurrentBus->cleanupNode(delnode); + } + + /* 3) Remove the node from the owning shard */ + clusterRemoveNodeFromShard(delnode); + + /* 4) Free the node, unlinking it from the cluster. */ + freeClusterNode(delnode); +} + +void clusterRenameNode(clusterNode *node, char *newname) { + int retval; + sds s = sdsnewlen(node->name, CLUSTER_NAMELEN); + + serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, humanNodename(node), newname); + retval = dictDelete(server.cluster->nodes, s); + sdsfree(s); + serverAssert(retval == DICT_OK); + memcpy(node->name, newname, CLUSTER_NAMELEN); + clusterAddNode(node); + clusterAddNodeToShard(node->shard_id, node); +} + /* ----------------------------------------------------------------------------- * Slot assignment * -------------------------------------------------------------------------- */