Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmake/Modules/SourceFiles.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ set(VALKEY_SERVER_SRCS
${CMAKE_SOURCE_DIR}/src/cluster.c
${CMAKE_SOURCE_DIR}/src/cluster_migrateslots.c
${CMAKE_SOURCE_DIR}/src/cluster_legacy.c
${CMAKE_SOURCE_DIR}/src/cluster_raft.c
${CMAKE_SOURCE_DIR}/src/cluster_link.c
${CMAKE_SOURCE_DIR}/src/cluster_nodes.c
${CMAKE_SOURCE_DIR}/src/cluster_state.c
Expand Down
1 change: 1 addition & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,7 @@ ENGINE_SERVER_OBJ = \
cluster_link.o \
cluster_nodes.o \
cluster_state.o \
cluster_raft.o \
cluster_migrateslots.o \
cluster_slot_stats.o \
commandlog.o \
Expand Down
5 changes: 5 additions & 0 deletions src/cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

/* The active cluster bus protocol implementation. */
extern clusterBusType clusterLegacyBus;
extern clusterBusType clusterRaftBus;
clusterBusType *clusterCurrentBus = &clusterLegacyBus;

static void clusterCommandFlushslot(client *c);
Expand All @@ -61,6 +62,10 @@ static void clusterCommandFlushslot(client *c);
* -------------------------------------------------------------------------- */

void clusterInit(void) {
if (server.cluster_raft_enabled) {
clusterCurrentBus = &clusterRaftBus;
}

server.cluster = zmalloc(sizeof(struct clusterState));
server.cluster->myself = NULL;
server.cluster->state = CLUSTER_FAIL;
Expand Down
5 changes: 5 additions & 0 deletions src/cluster_bus.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ typedef char *sds;
struct serverObject;
struct client;
struct clusterLink;
typedef struct clusterNode clusterNode;

/* Interface for cluster bus protocol implementations.
* Only includes operations that code outside the protocol
Expand Down Expand Up @@ -88,6 +89,10 @@ typedef struct clusterBusType {
* If NULL, protocol_data is not freed. */
void (*freeNodeData)(clusterNode *node);

/* Clean up any protocol-specific data associated with a node before it is deleted.
* If NULL, no protocol-specific cleanup is performed. */
void (*cleanupNode)(clusterNode *node);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I notice this missing piece too.

cluster_link.c calls clusterDelNode which is defined in cluster_legacy.c but declared in cluster_state.h. It's incomplete work from the cluster protocol separation refactoring.

I see you moved clusterDelNode to cluster_state.c and made it call this callback.

It'd be good to have this as a separate commit or PR.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've extracted this part to a separate PR now: #3542


/* Slot ownership changes — called from cluster commands and slot migration.
* Assigns or unassigns slots specified by an array of slot ranges. If
* target is non-NULL, slots are assigned to target. If target is NULL,
Expand Down
48 changes: 3 additions & 45 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -1090,67 +1090,24 @@ static void clusterLegacyFreeNodeData(clusterNode *node) {
}


/* Remove a node from the cluster. The function performs the high level
* cleanup, calling freeClusterNode() for the low level cleanup.
* Here we do the following:
*
* 1) Mark all the slots handled by it as unassigned.
* 2) Remove all the failure reports sent by this node and referenced by
* other nodes.
* 3) Remove the node from the owning shard
* 4) Free the node with freeClusterNode() that will in turn remove it
* from the hash table and from the list of replicas of its primary, if
* it is a replica node.
*/
void clusterDelNode(clusterNode *delnode) {
serverAssert(delnode != NULL);
serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, humanNodename(delnode));

int j;
dictIterator *di;
static void clusterLegacyCleanupNode(clusterNode *delnode) {
dictIterator *di = dictGetSafeIterator(server.cluster->nodes);
dictEntry *de;

/* 1) Mark slots as unassigned. */
for (j = 0; j < CLUSTER_SLOTS; j++) {
if (getImportingSlotSource(j) == delnode) setImportingSlotSource(j, NULL);
if (getMigratingSlotDest(j) == delnode) setMigratingSlotDest(j, NULL);
if (server.cluster->slots[j] == delnode) clusterDelSlot(j);
}

/* 2) Remove failure reports. */
di = dictGetSafeIterator(server.cluster->nodes);
while ((de = dictNext(di)) != NULL) {
clusterNode *node = dictGetVal(de);

if (node == delnode) continue;
clusterNodeDelFailureReport(node, delnode);
}
dictReleaseIterator(di);

/* 3) Remove the node from the owning shard */
clusterRemoveNodeFromShard(delnode);

/* 4) Free the node, unlinking it from the cluster. */
freeClusterNode(delnode);
}


/* This is only used after the handshake. When we connect a given IP/PORT
* as a result of CLUSTER MEET we don't have the node name yet, so we
* pick a random one, and will fix it when we receive the PONG request using
* this function. */
void clusterRenameNode(clusterNode *node, char *newname) {
int retval;
sds s = sdsnewlen(node->name, CLUSTER_NAMELEN);

serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, humanNodename(node), newname);
retval = dictDelete(server.cluster->nodes, s);
sdsfree(s);
serverAssert(retval == DICT_OK);
memcpy(node->name, newname, CLUSTER_NAMELEN);
clusterAddNode(node);
clusterAddNodeToShard(node->shard_id, node);
}

/* -----------------------------------------------------------------------------
* CLUSTER config epoch handling
Expand Down Expand Up @@ -5501,6 +5458,7 @@ clusterBusType clusterLegacyBus = {
.postLoad = clusterLegacyPostLoad,
.initNodeData = clusterLegacyInitNodeData,
.freeNodeData = clusterLegacyFreeNodeData,
.cleanupNode = clusterLegacyCleanupNode,
.slotChange = clusterLegacySlotChange,
.cancelManualFailover = clusterLegacyCancelManualFailover,
.cancelAutomaticFailover = clusterLegacyCancelAutomaticFailover,
Expand Down
Loading
Loading