Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/cluster_bus.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ typedef char *sds;
struct serverObject;
struct client;
struct clusterLink;
typedef struct clusterNode clusterNode;

/* Interface for cluster bus protocol implementations.
* Only includes operations that code outside the protocol
Expand Down Expand Up @@ -88,6 +89,10 @@ typedef struct clusterBusType {
* If NULL, protocol_data is not freed. */
void (*freeNodeData)(clusterNode *node);

/* Clean up any protocol-specific data associated with a node before it is deleted.
* If NULL, no protocol-specific cleanup is performed. */
void (*cleanupNode)(clusterNode *node);

/* Slot ownership changes — called from cluster commands and slot migration.
* Assigns or unassigns slots specified by an array of slot ranges. If
* target is non-NULL, slots are assigned to target. If target is NULL,
Expand Down
55 changes: 3 additions & 52 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -436,7 +436,6 @@ void clusterHandleReplicaMigration(int max_replicas);
void clusterSendUpdate(clusterLink *link, clusterNode *node);
static void clusterLegacyCancelManualFailover(void);
void clusterSetNodeAsPrimary(clusterNode *n);
void clusterDelNode(clusterNode *delnode);
static void clusterLegacyResetStats(void);
uint64_t clusterGetMaxEpoch(void);
int clusterBumpConfigEpochWithoutConsensus(void);
Expand Down Expand Up @@ -1090,68 +1089,19 @@ static void clusterLegacyFreeNodeData(clusterNode *node) {
}


/* Remove a node from the cluster. The function performs the high level
* cleanup, calling freeClusterNode() for the low level cleanup.
* Here we do the following:
*
* 1) Mark all the slots handled by it as unassigned.
* 2) Remove all the failure reports sent by this node and referenced by
* other nodes.
* 3) Remove the node from the owning shard
* 4) Free the node with freeClusterNode() that will in turn remove it
* from the hash table and from the list of replicas of its primary, if
* it is a replica node.
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, humanNodename(delnode));

int j;
dictIterator *di;
static void clusterLegacyCleanupNode(clusterNode *delnode) {
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
dictEntry *de;

/* 1) Mark slots as unassigned. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (getImportingSlotSource(j) == delnode) setImportingSlotSource(j, NULL);
if (getMigratingSlotDest(j) == delnode) setMigratingSlotDest(j, NULL);
if (server.cluster->slots[j] == delnode) clusterDelSlot(j);
}

/* 2) Remove failure reports. */
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (node == delnode) continue;
clusterNodeDelFailureReport(node, delnode);
}
dictReleaseIterator(di);

/* 3) Remove the node from the owning shard */
clusterRemoveNodeFromShard(delnode);

/* 4) Free the node, unlinking it from the cluster. */
freeClusterNode(delnode);
}


/* This is only used after the handshake. When we connect a given IP/PORT
* as a result of CLUSTER MEET we don't have the node name yet, so we
* pick a random one, and will fix it when we receive the PONG request using
* this function. */
void clusterRenameNode(clusterNode *node, char *newname) {
int retval;
sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, humanNodename(node), newname);
retval = dictDelete(server.cluster->nodes, s);
sdsfree(s);
serverAssert(retval == DICT_OK);
memcpy(node->name, newname, CLUSTER_NAMELEN);
clusterAddNode(node);
clusterAddNodeToShard(node->shard_id, node);
}

/* -----------------------------------------------------------------------------
* CLUSTER config epoch handling
* -------------------------------------------------------------------------- */
Expand Down Expand Up @@ -5501,6 +5451,7 @@ clusterBusType clusterLegacyBus = {
.postLoad = clusterLegacyPostLoad,
.initNodeData = clusterLegacyInitNodeData,
.freeNodeData = clusterLegacyFreeNodeData,
.cleanupNode = clusterLegacyCleanupNode,
.slotChange = clusterLegacySlotChange,
.cancelManualFailover = clusterLegacyCancelManualFailover,
.cancelAutomaticFailover = clusterLegacyCancelAutomaticFailover,
Expand Down
50 changes: 50 additions & 0 deletions src/cluster_state.c
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,56 @@ void freeClusterNode(clusterNode *n) {
zfree(n);
}

/* Remove a node from the cluster. The function performs the high level
* cleanup, calling freeClusterNode() for the low level cleanup.
* Here we do the following:
*
* 1) Mark all the slots handled by it as unassigned.
* 2) Remove all the failure reports sent by this node and referenced by
* other nodes via the bus hook.
* 3) Remove the node from the owning shard
* 4) Free the node with freeClusterNode() that will in turn remove it
* from the hash table and from the list of replicas of its primary, if
* it is a replica node.
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, humanNodename(delnode));

int j;

/* 1) Mark slots as unassigned. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (getImportingSlotSource(j) == delnode) setImportingSlotSource(j, NULL);
if (getMigratingSlotDest(j) == delnode) setMigratingSlotDest(j, NULL);
if (server.cluster->slots[j] == delnode) clusterDelSlot(j);
}

/* 2) Remove failure reports via bus hook. */
if (clusterCurrentBus->cleanupNode) {
clusterCurrentBus->cleanupNode(delnode);
}

/* 3) Remove the node from the owning shard */
clusterRemoveNodeFromShard(delnode);

/* 4) Free the node, unlinking it from the cluster. */
freeClusterNode(delnode);
}

void clusterRenameNode(clusterNode *node, char *newname) {
int retval;
sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, humanNodename(node), newname);
retval = dictDelete(server.cluster->nodes, s);
sdsfree(s);
serverAssert(retval == DICT_OK);
memcpy(node->name, newname, CLUSTER_NAMELEN);
clusterAddNode(node);
clusterAddNodeToShard(node->shard_id, node);
}

/* -----------------------------------------------------------------------------
* Slot assignment
* -------------------------------------------------------------------------- */
Expand Down
Loading