diff --git a/cmake/Modules/SourceFiles.cmake b/cmake/Modules/SourceFiles.cmake index 6aeef7eff56..05b4102b260 100644 --- a/cmake/Modules/SourceFiles.cmake +++ b/cmake/Modules/SourceFiles.cmake @@ -45,6 +45,7 @@ set(VALKEY_SERVER_SRCS ${CMAKE_SOURCE_DIR}/src/cluster.c ${CMAKE_SOURCE_DIR}/src/cluster_migrateslots.c ${CMAKE_SOURCE_DIR}/src/cluster_legacy.c + ${CMAKE_SOURCE_DIR}/src/cluster_raft.c ${CMAKE_SOURCE_DIR}/src/cluster_link.c ${CMAKE_SOURCE_DIR}/src/cluster_nodes.c ${CMAKE_SOURCE_DIR}/src/cluster_state.c diff --git a/src/Makefile b/src/Makefile index a4191e60915..7fa5068ee4f 100644 --- a/src/Makefile +++ b/src/Makefile @@ -467,6 +467,7 @@ ENGINE_SERVER_OBJ = \ cluster_link.o \ cluster_nodes.o \ cluster_state.o \ + cluster_raft.o \ cluster_migrateslots.o \ cluster_slot_stats.o \ commandlog.o \ diff --git a/src/cluster.c b/src/cluster.c index 351372a2bbc..f618e248afd 100644 --- a/src/cluster.c +++ b/src/cluster.c @@ -52,6 +52,7 @@ /* The active cluster bus protocol implementation. */ extern clusterBusType clusterLegacyBus; +extern clusterBusType clusterRaftBus; clusterBusType *clusterCurrentBus = &clusterLegacyBus; static void clusterCommandFlushslot(client *c); @@ -61,6 +62,10 @@ static void clusterCommandFlushslot(client *c); * -------------------------------------------------------------------------- */ void clusterInit(void) { + if (server.cluster_raft_enabled) { + clusterCurrentBus = &clusterRaftBus; + } + server.cluster = zmalloc(sizeof(struct clusterState)); server.cluster->myself = NULL; server.cluster->state = CLUSTER_FAIL; diff --git a/src/cluster_bus.h b/src/cluster_bus.h index 5b53ca8aa04..138596d3273 100644 --- a/src/cluster_bus.h +++ b/src/cluster_bus.h @@ -8,6 +8,7 @@ typedef char *sds; struct serverObject; struct client; struct clusterLink; +typedef struct clusterNode clusterNode; /* Interface for cluster bus protocol implementations. * Only includes operations that code outside the protocol @@ -88,6 +89,10 @@ typedef struct clusterBusType { * If NULL, protocol_data is not freed. */ void (*freeNodeData)(clusterNode *node); + /* Clean up any protocol-specific data associated with a node before it is deleted. + * If NULL, no protocol-specific cleanup is performed. */ + void (*cleanupNode)(clusterNode *node); + /* Slot ownership changes — called from cluster commands and slot migration. * Assigns or unassigns slots specified by an array of slot ranges. If * target is non-NULL, slots are assigned to target. If target is NULL, diff --git a/src/cluster_legacy.c b/src/cluster_legacy.c index a7017f955c9..5a77c12271a 100644 --- a/src/cluster_legacy.c +++ b/src/cluster_legacy.c @@ -1090,35 +1090,9 @@ static void clusterLegacyFreeNodeData(clusterNode *node) { } -/* Remove a node from the cluster. The function performs the high level - * cleanup, calling freeClusterNode() for the low level cleanup. - * Here we do the following: - * - * 1) Mark all the slots handled by it as unassigned. - * 2) Remove all the failure reports sent by this node and referenced by - * other nodes. - * 3) Remove the node from the owning shard - * 4) Free the node with freeClusterNode() that will in turn remove it - * from the hash table and from the list of replicas of its primary, if - * it is a replica node. - */ -void clusterDelNode(clusterNode *delnode) { - serverAssert(delnode != NULL); - serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, humanNodename(delnode)); - - int j; - dictIterator *di; +static void clusterLegacyCleanupNode(clusterNode *delnode) { + dictIterator *di = dictGetSafeIterator(server.cluster->nodes); dictEntry *de; - - /* 1) Mark slots as unassigned. */ - for (j = 0; j < CLUSTER_SLOTS; j++) { - if (getImportingSlotSource(j) == delnode) setImportingSlotSource(j, NULL); - if (getMigratingSlotDest(j) == delnode) setMigratingSlotDest(j, NULL); - if (server.cluster->slots[j] == delnode) clusterDelSlot(j); - } - - /* 2) Remove failure reports. */ - di = dictGetSafeIterator(server.cluster->nodes); while ((de = dictNext(di)) != NULL) { clusterNode *node = dictGetVal(de); @@ -1126,12 +1100,6 @@ void clusterDelNode(clusterNode *delnode) { clusterNodeDelFailureReport(node, delnode); } dictReleaseIterator(di); - - /* 3) Remove the node from the owning shard */ - clusterRemoveNodeFromShard(delnode); - - /* 4) Free the node, unlinking it from the cluster. */ - freeClusterNode(delnode); } @@ -1139,18 +1107,7 @@ void clusterDelNode(clusterNode *delnode) { * as a result of CLUSTER MEET we don't have the node name yet, so we * pick a random one, and will fix it when we receive the PONG request using * this function. */ -void clusterRenameNode(clusterNode *node, char *newname) { - int retval; - sds s = sdsnewlen(node->name, CLUSTER_NAMELEN); - serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, humanNodename(node), newname); - retval = dictDelete(server.cluster->nodes, s); - sdsfree(s); - serverAssert(retval == DICT_OK); - memcpy(node->name, newname, CLUSTER_NAMELEN); - clusterAddNode(node); - clusterAddNodeToShard(node->shard_id, node); -} /* ----------------------------------------------------------------------------- * CLUSTER config epoch handling @@ -5501,6 +5458,7 @@ clusterBusType clusterLegacyBus = { .postLoad = clusterLegacyPostLoad, .initNodeData = clusterLegacyInitNodeData, .freeNodeData = clusterLegacyFreeNodeData, + .cleanupNode = clusterLegacyCleanupNode, .slotChange = clusterLegacySlotChange, .cancelManualFailover = clusterLegacyCancelManualFailover, .cancelAutomaticFailover = clusterLegacyCancelAutomaticFailover, diff --git a/src/cluster_raft.c b/src/cluster_raft.c new file mode 100644 index 00000000000..e21a2d84ec4 --- /dev/null +++ b/src/cluster_raft.c @@ -0,0 +1,514 @@ +#include "server.h" +#include "cluster.h" +#include "cluster_state.h" +#include "cluster_link.h" +#include "cluster_bus.h" +#include "cluster_raft.h" +#include + +/* Access Raft protocol-specific state from the cluster. */ +#define RAFT_STATE() ((clusterRaftState *)server.cluster->protocol_data) + +/* Access Raft protocol-specific data from a clusterNode. */ +#define RAFT_DATA(n) ((clusterNodeRaftData *)(n)->protocol_data) + +/* Forward declarations */ +clusterMsgSendBlock *createClusterRaftMsgSendBlock(int type, uint32_t payload_len); +void clusterRaftStateMachine(void); +void clusterRaftInit(void); +void clusterRaftFree(void); +void clusterRaftCron(void); +void clusterRaftProcessHandshake(clusterLink *link, uint16_t type, clusterMsgRaftHandshake *req); +void clusterRaftConnectToNode(clusterNode *node); + +/* Dictionary type for applied entries. */ +void raftAppliedEntryFree(void *val) { + raftAppliedEntry *ae = val; + sdsfree(ae->value); + zfree(ae); +} + +static void clusterRaftAppliedEntryDictEntryDestructor(void *entry) { + dictEntry *de = entry; + dictSdsDestructor(dictGetKey(de)); + raftAppliedEntryFree(dictGetVal(de)); + zfree(de); +} + +dictType clusterRaftAppliedEntryDictType = { + .entryGetKey = dictEntryGetKey, + .hashFunction = dictSdsHash, + .keyCompare = dictSdsKeyCompare, + .entryDestructor = clusterRaftAppliedEntryDictEntryDestructor, +}; + +static const char *raftRoleToString(int role) { + switch (role) { + case RAFT_ROLE_FOLLOWER: return "FOLLOWER"; + case RAFT_ROLE_CANDIDATE: return "CANDIDATE"; + case RAFT_ROLE_LEADER: return "LEADER"; + case RAFT_ROLE_HANDSHAKING: return "HANDSHAKING"; + case RAFT_ROLE_NON_MEMBER: return "NON_MEMBER"; + case RAFT_ROLE_JOINING: return "JOINING"; + default: return "UNKNOWN"; + } +} + +static void clusterRaftSetNodeRole(clusterNode *node, int new_role) { + if (RAFT_DATA(node)->role == new_role) return; + serverLog(LL_NOTICE, "Raft: Node %.40s changing role: %s -> %s", node->name, raftRoleToString(RAFT_DATA(node)->role), raftRoleToString(new_role)); + RAFT_DATA(node)->role = new_role; + + /* Keep the CLUSTER_NODE_* flags in sync */ + int flags = node->flags; + if (new_role == RAFT_ROLE_HANDSHAKING || new_role == RAFT_ROLE_NON_MEMBER) { + flags |= CLUSTER_NODE_HANDSHAKE; + } else { + flags &= ~CLUSTER_NODE_HANDSHAKE; + } + node->flags = flags; +} + +void clusterRaftInit(void) { + server.cluster->protocol_data = zcalloc(sizeof(clusterRaftState)); + RAFT_STATE()->enabled = server.cluster_raft_enabled; + RAFT_STATE()->applied_entries = dictCreate(&clusterRaftAppliedEntryDictType); + RAFT_STATE()->term = 0; +} + +static void clusterRaftInitLast(void) { + /* Initialize fields that depend on server.cluster->myself */ + RAFT_STATE()->leader = server.cluster->myself; + clusterRaftSetNodeRole(server.cluster->myself, RAFT_ROLE_LEADER); + + /* Bootstrap our metadata with just the membership information of our 1-node + * group.*/ + raftAppliedEntry *ae = zmalloc(sizeof(*ae)); + ae->index = 1; + ae->term = 0; + ae->value = sdsnewlen(server.cluster->myself->name, CLUSTER_NAMELEN); + dictAdd(RAFT_STATE()->applied_entries, sdsnew("membership"), ae); + RAFT_STATE()->last_applied_index = 1; + RAFT_STATE()->last_applied_term = 0; + + clusterListenerInit(); +} + +void clusterRaftCron(void) { + clusterRaftStateMachine(); +} + +static void clusterRaftBeforeSleep(void) { +} + +static void clusterRaftHandleServerShutdown(bool auto_failover) { + UNUSED(auto_failover); +} + +static uint32_t clusterRaftValidateMessageHeader(char *header) { + clusterRaftHeader *hdr = (clusterRaftHeader *)header; + if (memcmp(hdr->sig, "VCv2", 4) != 0) return 0; + return ntohl(hdr->totlen); +} + +static int clusterRaftProcessMessage(clusterLink *link) { + clusterRaftHeader *hdr = (clusterRaftHeader *)link->rcvbuf; + uint16_t type = ntohs(hdr->type); + void *payload = link->rcvbuf + sizeof(clusterRaftHeader); + clusterNode *sender = link->node; + if (!sender) { + if (type != CLUSTERMSG_TYPE_RAFT_HANDSHAKE_REQUEST) { + serverLog(LL_WARNING, "Raft: Unexpected message type %d from unknown sender", type); + return 0; + } + } + + switch (type) { + case CLUSTERMSG_TYPE_RAFT_HANDSHAKE_REQUEST: + case CLUSTERMSG_TYPE_RAFT_HANDSHAKE_REPLY: + clusterRaftProcessHandshake(link, type, (clusterMsgRaftHandshake *)payload); + break; + default: + serverLog(LL_WARNING, "Unknown Raft message type %d", type); + return 0; + } + return 1; +} + +static void clusterRaftPostConnect(clusterLink *link) { + serverAssert(link->node != NULL); + serverLog(LL_DEBUG, "Raft: Sending handshake to %.40s after connection established", clusterLinkGetNodeName(link)); + + clusterMsgSendBlock *msgblock = createClusterRaftMsgSendBlock(CLUSTERMSG_TYPE_RAFT_HANDSHAKE_REQUEST, sizeof(clusterMsgRaftHandshake)); + clusterMsgRaftHandshake *req = (clusterMsgRaftHandshake *)((char *)msgblock->data + sizeof(clusterRaftHeader)); + + memcpy(req->sender_name, myself->name, CLUSTER_NAMELEN); + req->term = htonu64(RAFT_STATE()->term); + memcpy(req->remote_ip, link->node->ip, NET_IP_STR_LEN); + req->plaintext_port = htons(server.port); + req->tls_port = htons(server.tls_port); + req->cluster_port = htons(myself->cport); + + clusterLinkSendBlock(link, msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); + clusterRaftStateMachine(); +} + +static void clusterRaftOnMyselfUpdated(int old_flags) { + UNUSED(old_flags); +} + +static void clusterRaftPropagatePublish(robj *channel, robj *message, int sharded) { + UNUSED(channel); + UNUSED(message); + UNUSED(sharded); +} + +static int clusterRaftSendModuleMessage(const char *target, uint64_t module_id, uint8_t type, const char *payload, uint32_t len) { + UNUSED(target); + UNUSED(module_id); + UNUSED(type); + UNUSED(payload); + UNUSED(len); + return 0; +} + +static unsigned long clusterRaftGetConnectionsCount(void) { + return 0; +} + +static void clusterRaftResetStats(void) { +} + +static sds clusterRaftAppendInfoFields(sds info) { + if (server.cluster == NULL) return info; + + info = sdscatfmt(info, "raft_node_count:%U\r\n", (unsigned long long)dictSize(server.cluster->nodes)); + info = sdscatfmt(info, "raft_role:%s\r\n", raftRoleToString(RAFT_DATA(myself)->role)); + info = sdscatprintf(info, "raft_node_id:%.40s\r\n", server.cluster->myself->name); + info = sdscatprintf(info, "raft_term:%llu\r\n", (unsigned long long)RAFT_STATE()->term); + info = sdscatprintf(info, "raft_leader:%.40s\r\n", RAFT_STATE()->leader ? RAFT_STATE()->leader->name : ""); + info = sdscatprintf(info, "raft_applied_key_count:%zu\r\n", dictSize(RAFT_STATE()->applied_entries)); + info = sdscatprintf(info, "raft_applied_index:%llu\r\n", (unsigned long long)RAFT_STATE()->last_applied_index); + + return info; +} + +static int clusterRaftGetFailureReportsCount(clusterNode *node) { + UNUSED(node); + return 0; +} + +static void clusterRaftGetNodePingPongEpoch(clusterNode *node, long long *ping_sent, long long *pong_received, uint64_t *config_epoch) { + UNUSED(node); + *ping_sent = 0; + *pong_received = 0; + *config_epoch = 0; +} + +static void clusterRaftSetNodePingPongEpoch(clusterNode *node, int ping_active, int pong_active, uint64_t config_epoch) { + UNUSED(node); + UNUSED(ping_active); + UNUSED(pong_active); + UNUSED(config_epoch); +} + +static void clusterRaftSetNodeFailed(clusterNode *node) { + UNUSED(node); +} + +static sds clusterRaftAppendVarsLine(sds config) { + return config; +} + +static int clusterRaftParseVarsLine(const char *name, const char *value) { + UNUSED(name); + UNUSED(value); + return 0; +} + +static void clusterRaftPostLoad(void) { +} + +void clusterRaftInitNodeData(clusterNode *node) { + node->protocol_data = zcalloc(sizeof(clusterNodeRaftData)); + clusterRaftSetNodeRole(node, RAFT_ROLE_HANDSHAKING); +} + +void clusterRaftFreeNodeData(clusterNode *node) { + if (node->protocol_data) { + zfree(node->protocol_data); + node->protocol_data = NULL; + } +} + +static void clusterRaftSlotChange(slotRange *ranges, int numranges, clusterNode *target, void *ctx, void (*callback)(void *ctx, const char *error)) { + UNUSED(ranges); + UNUSED(numranges); + UNUSED(target); + if (callback) callback(ctx, NULL); +} + +static void clusterRaftCancelManualFailover(void) { +} + +static void clusterRaftCancelAutomaticFailover(void) { +} + +static void clusterRaftForgetNode(const char *node_id, size_t id_len, void *ctx, void (*callback)(void *ctx, const char *error)) { + UNUSED(node_id); + UNUSED(id_len); + if (callback) callback(ctx, NULL); +} + +static void clusterRaftSetReplicaOf(clusterNode *primary, void *ctx, void (*callback)(void *ctx, const char *error)) { + UNUSED(primary); + if (callback) callback(ctx, "Not supported in Raft mode"); +} + +static void clusterRaftFailover(int force, int takeover, void *ctx, void (*callback)(void *ctx, const char *error)) { + UNUSED(force); + UNUSED(takeover); + if (callback) callback(ctx, "Not supported in Raft mode"); +} + +static void clusterRaftMeet(const char *ip, int port, int cport, void *ctx, void (*callback)(void *ctx, const char *error)) { + char norm_ip[NET_IP_STR_LEN]; + struct sockaddr_storage sa; + if (inet_pton(AF_INET, ip, &(((struct sockaddr_in *)&sa)->sin_addr))) { + sa.ss_family = AF_INET; + } else if (inet_pton(AF_INET6, ip, &(((struct sockaddr_in6 *)&sa)->sin6_addr))) { + sa.ss_family = AF_INET6; + } else { + if (callback) callback(ctx, "Invalid node address specified"); + return; + } + inet_ntop(sa.ss_family, sa.ss_family == AF_INET ? (void *)&(((struct sockaddr_in *)&sa)->sin_addr) : (void *)&(((struct sockaddr_in6 *)&sa)->sin6_addr), norm_ip, NET_IP_STR_LEN); + + clusterNode *n = createClusterNode(NULL, CLUSTER_NODE_HANDSHAKE); + memcpy(n->ip, norm_ip, sizeof(n->ip)); + if (server.tls_cluster) { + n->tls_port = port; + } else { + n->tcp_port = port; + } + n->cport = cport; + clusterAddNode(n); + + serverLog(LL_DEBUG, "Raft: Initiating outbound connection to %s:%d", norm_ip, port); + + clusterLink *link = createClusterLink(n); + link->conn = connCreate(connTypeOfCluster()); + connSetPrivateData(link->conn, link); + if (connConnect(link->conn, n->ip, n->cport, server.bind_source_addr, 0, clusterLinkConnectHandler) == C_ERR) { + serverLog(LL_WARNING, "Raft: Failed to connect to %s:%d", n->ip, n->cport); + freeClusterLink(link); + if (callback) callback(ctx, "Failed to initiate connection"); + return; + } + + if (callback) callback(ctx, NULL); +} + +static void clusterRaftReset(int hard) { + UNUSED(hard); +} + +static int clusterRaftProtocolSubcommand(client *c) { + UNUSED(c); + return 0; +} + +void clusterRaftFree(void) { + if (RAFT_STATE()) { + dictRelease(RAFT_STATE()->applied_entries); + zfree(RAFT_STATE()); + server.cluster->protocol_data = NULL; + } +} + +/* Handshake related functions */ + +clusterMsgSendBlock *createClusterRaftMsgSendBlock(int type, uint32_t payload_len) { + uint32_t msglen = sizeof(clusterRaftHeader) + payload_len; + uint32_t blocklen = sizeof(clusterMsgSendBlock) + msglen; + clusterMsgSendBlock *msgblock = zcalloc(blocklen); + msgblock->refcount = 1; + msgblock->totlen = blocklen; + msgblock->len = msglen; + + clusterRaftHeader *hdr = (clusterRaftHeader *)msgblock->data; + memcpy(hdr->sig, "VCv2", 4); + hdr->totlen = htonl(msglen); + hdr->ver = htons(1); + hdr->type = htons(type); + + return msgblock; +} + +void clusterRaftConnectToNode(clusterNode *node) { + if (node->link) return; + clusterLink *link = createClusterLink(node); + link->conn = connCreate(connTypeOfCluster()); + connSetPrivateData(link->conn, link); + if (connConnect(link->conn, node->ip, node->cport, server.bind_source_addr, 0, clusterLinkConnectHandler) == C_ERR) { + serverLog(LL_WARNING, "Raft: Failed to initiate connection to %.40s", node->name); + freeClusterLink(link); + } +} + +void clusterRaftSendHandshakeReply(clusterLink *link, const char *node_name) { + serverLog(LL_DEBUG, "Raft: Sending handshake response to %.40s", node_name); + + clusterMsgSendBlock *msgblock = createClusterRaftMsgSendBlock(CLUSTERMSG_TYPE_RAFT_HANDSHAKE_REPLY, sizeof(clusterMsgRaftHandshake)); + clusterMsgRaftHandshake *resp = (clusterMsgRaftHandshake *)((char *)msgblock->data + sizeof(clusterRaftHeader)); + memcpy(resp->sender_name, server.cluster->myself->name, CLUSTER_NAMELEN); + char ip[NET_IP_STR_LEN] = {0}; + if (nodeIp2String(ip, link, "") == C_OK) { + memcpy(resp->remote_ip, ip, NET_IP_STR_LEN); + } + resp->plaintext_port = htons(server.port); + resp->tls_port = htons(server.tls_port); + resp->cluster_port = htons(myself->cport); + + clusterLinkSendBlock(link, msgblock); + clusterMsgSendBlockDecrRefCount(msgblock); +} + +void clusterRaftProcessHandshake(clusterLink *link, uint16_t type, clusterMsgRaftHandshake *req) { + char *sender_name = req->sender_name; + char *remote_ip = req->remote_ip; + + /* Set my IP if I don't know it */ + if (myself->ip[0] == '\0' && remote_ip[0] != '\0') { + valkey_strlcpy(myself->ip, remote_ip, NET_IP_STR_LEN); + char *colon = strchr(myself->ip, ':'); + if (colon) *colon = '\0'; + serverLog(LL_VERBOSE, "Raft: Discovered my IP: %s", myself->ip); + } + + if (verifyClusterNodeId(sender_name, CLUSTER_NAMELEN) != C_OK) { + serverLog(LL_WARNING, "Raft: Received handshake with invalid sender name"); + return; + } + + clusterNode *sender = clusterLookupNode(sender_name, CLUSTER_NAMELEN); + if (!sender) { + if (link->node && (link->node->flags & CLUSTER_NODE_HANDSHAKE)) { + /* Rename the handshake node to the real name */ + clusterRenameNode(link->node, sender_name); + sender = link->node; + } else { + /* Create new node if we don't have a handshake node for it */ + sender = createClusterNode(sender_name, CLUSTER_NODE_HANDSHAKE); + clusterAddNode(sender); + nodeIp2String(sender->ip, link, ""); + serverLog(LL_DEBUG, "Raft: Created handshake node for %.40s", sender_name); + } + } + + sender->tcp_port = ntohs(req->plaintext_port); + sender->tls_port = ntohs(req->tls_port); + sender->cport = ntohs(req->cluster_port); + + if (link->node && (link->node->flags & CLUSTER_NODE_HANDSHAKE)) { + if (link->node != sender) { + serverLog(LL_DEBUG, "Raft: Moving link from handshake node %.40s to real node %.40s", link->node->name, sender->name); + clusterNode *old_node = link->node; + old_node->link = NULL; + sender->link = link; + link->node = sender; + + /* Delete the old temporary node */ + clusterDelNode(old_node); + } + } else if (!link->node) { + setClusterNodeToInboundClusterLink(sender, link); + if (!sender->link) { + clusterRaftConnectToNode(sender); + } + } + + RAFT_DATA(sender)->member_count = ntohl(req->member_count); + clusterRaftStateMachine(); + + if (type == CLUSTERMSG_TYPE_RAFT_HANDSHAKE_REQUEST) { + clusterRaftSendHandshakeReply(link, sender_name); + } +} + +void clusterRaftStateMachine(void) { + if (server.cluster == NULL || !RAFT_STATE()) return; + + dictIterator *di = dictGetSafeIterator(server.cluster->nodes); + dictEntry *de; + while ((de = dictNext(di)) != NULL) { + clusterNode *node = dictGetVal(de); + if (node == server.cluster->myself) continue; + if (!RAFT_DATA(node)) continue; + + /* Handshake timeout */ + if (node->flags & CLUSTER_NODE_HANDSHAKE) { + mstime_t timeout = server.cluster_node_timeout > 1000 ? server.cluster_node_timeout : 1000; + if (mstime() - node->ctime > timeout) { + serverLog(LL_WARNING, "Raft: Handshake timeout for node %.40s", node->name); + clusterDelNode(node); + continue; + } + } + + switch (RAFT_DATA(node)->role) { + case RAFT_ROLE_HANDSHAKING: + if (node->link && connGetState(node->link->conn) == CONN_STATE_CONNECTED && node->inbound_link != NULL) { + clusterRaftSetNodeRole(node, RAFT_ROLE_NON_MEMBER); + } + break; + default: + break; + } + } + dictReleaseIterator(di); +} + +void clusterRaftGetMetadata(client *c) { + UNUSED(c); +} + +void clusterRaftSetMetadata(client *c) { + UNUSED(c); +} + +clusterBusType clusterRaftBus = { + .init = clusterRaftInit, + .initLast = clusterRaftInitLast, + .cron = clusterRaftCron, + .beforeSleep = clusterRaftBeforeSleep, + .handleServerShutdown = clusterRaftHandleServerShutdown, + .validateMessageHeader = clusterRaftValidateMessageHeader, + .processMessage = clusterRaftProcessMessage, + .postConnect = clusterRaftPostConnect, + .onMyselfUpdated = clusterRaftOnMyselfUpdated, + .propagatePublish = clusterRaftPropagatePublish, + .sendModuleMessage = clusterRaftSendModuleMessage, + .getConnectionsCount = clusterRaftGetConnectionsCount, + .resetStats = clusterRaftResetStats, + .appendInfoFields = clusterRaftAppendInfoFields, + .getFailureReportsCount = clusterRaftGetFailureReportsCount, + .getNodePingPongEpoch = clusterRaftGetNodePingPongEpoch, + .setNodePingPongEpoch = clusterRaftSetNodePingPongEpoch, + .setNodeFailed = clusterRaftSetNodeFailed, + .appendVarsLine = clusterRaftAppendVarsLine, + .parseVarsLine = clusterRaftParseVarsLine, + .postLoad = clusterRaftPostLoad, + .initNodeData = clusterRaftInitNodeData, + .freeNodeData = clusterRaftFreeNodeData, + .slotChange = clusterRaftSlotChange, + .cancelManualFailover = clusterRaftCancelManualFailover, + .cancelAutomaticFailover = clusterRaftCancelAutomaticFailover, + .forgetNode = clusterRaftForgetNode, + .setReplicaOf = clusterRaftSetReplicaOf, + .failover = clusterRaftFailover, + .meet = clusterRaftMeet, + .resetCluster = clusterRaftReset, + .protocolSubcommand = clusterRaftProtocolSubcommand, +}; diff --git a/src/cluster_raft.h b/src/cluster_raft.h new file mode 100644 index 00000000000..308d2d64a23 --- /dev/null +++ b/src/cluster_raft.h @@ -0,0 +1,91 @@ +#ifndef __CLUSTER_RAFT_H +#define __CLUSTER_RAFT_H + +#include "server.h" +#include "sds.h" +#include "cluster.h" + +/* Forward declarations */ +typedef struct clusterMsg clusterMsg; +typedef struct clusterMsgPingExt clusterMsgPingExt; +struct clusterLink; + +/* Raft message types */ +#define CLUSTERMSG_TYPE_RAFT_HANDSHAKE_REQUEST 0 +#define CLUSTERMSG_TYPE_RAFT_HANDSHAKE_REPLY 1 + +/* Raft message header */ +typedef struct { + char sig[4]; /* Signature "VCv2" */ + uint32_t totlen; /* Total length of this message */ + uint16_t ver; /* Protocol version */ + uint16_t type; /* Message type */ +} clusterRaftHeader; + +/* Raft node role state machine: + * + * CLUSTER MEET + * Remote Node + * | + * v + * +-------------+ +-------------+ +-------------+ + * | | Handsh. | | Propos. | | + * | HANDSHAKING |-------->| NON_MEMBER |-------->| JOINING | + * | | Compl. | | | | + * +-------------+ +-------------+ +-------------+ + * ^ | + * +: : : : Join Decision : : : : | Committed + * : v + * +-------------+ +-------------+ +-------------+ + * | | Quorum | | Elect. | | + * Bootstrap --->| LEADER |<--------| CANDIDATE |<--------| FOLLOWER | + * | | Recv. | | Timeout | | + * +-------------+ +-------------+ +-------------+ + * | ^ | + * | | Leader Discov. | + * | +-----------------------+ + * | | + * | Higher Term Seen | + * +-----------------------------------------------+ + */ +#define RAFT_ROLE_UNKNOWN 0 /* Unused outside of node initialization */ +#define RAFT_ROLE_FOLLOWER 1 /* A follower in my Raft group*/ +#define RAFT_ROLE_CANDIDATE 2 /* A candidate to be leader in my Raft group */ +#define RAFT_ROLE_LEADER 3 /* The leader of my Raft group */ +#define RAFT_ROLE_HANDSHAKING 4 /* A non-member node I am handshaking with */ +#define RAFT_ROLE_NON_MEMBER 5 /* A non-member node I have finished handshaking */ +#define RAFT_ROLE_JOINING 6 /* A proposed member node that has not been fully committed.*/ + +typedef struct raftAppliedEntry { + uint64_t index; + uint64_t term; + sds value; +} raftAppliedEntry; + +typedef struct clusterRaftState { + bool enabled; + uint64_t term; /* Current term */ + clusterNode *leader; /* Pointer to the current leader */ + + uint64_t last_applied_index; /* Index of the newest entry in applied_entries */ + uint64_t last_applied_term; /* Term of the newest entry in applied_entries */ + dict *applied_entries; /* Map of key -> raftAppliedEntry */ +} clusterRaftState; + +typedef struct clusterNodeRaftData { + int role; + uint32_t member_count; /* Raft member count of the node */ + mstime_t outbound_link_attempt_time; /* Time of last connection attempt */ +} clusterNodeRaftData; + +typedef struct { + uint64_t term; + char sender_name[CLUSTER_NAMELEN]; + uint32_t member_count; + char remote_ip[NET_IP_STR_LEN]; + uint16_t plaintext_port; + uint16_t tls_port; + uint16_t cluster_port; +} clusterMsgRaftHandshake; + +#endif diff --git a/src/cluster_state.c b/src/cluster_state.c index ea3c93ed381..ad8d6ade13f 100644 --- a/src/cluster_state.c +++ b/src/cluster_state.c @@ -577,6 +577,56 @@ void freeClusterNode(clusterNode *n) { zfree(n); } +/* Remove a node from the cluster. The function performs the high level + * cleanup, calling freeClusterNode() for the low level cleanup. + * Here we do the following: + * + * 1) Mark all the slots handled by it as unassigned. + * 2) Remove all the failure reports sent by this node and referenced by + * other nodes via the bus hook. + * 3) Remove the node from the owning shard + * 4) Free the node with freeClusterNode() that will in turn remove it + * from the hash table and from the list of replicas of its primary, if + * it is a replica node. + */ +void clusterDelNode(clusterNode *delnode) { + serverAssert(delnode != NULL); + serverLog(LL_DEBUG, "Deleting node %.40s (%s) from cluster view", delnode->name, humanNodename(delnode)); + + int j; + + /* 1) Mark slots as unassigned. */ + for (j = 0; j < CLUSTER_SLOTS; j++) { + if (getImportingSlotSource(j) == delnode) setImportingSlotSource(j, NULL); + if (getMigratingSlotDest(j) == delnode) setMigratingSlotDest(j, NULL); + if (server.cluster->slots[j] == delnode) clusterDelSlot(j); + } + + /* 2) Remove failure reports via bus hook. */ + if (clusterCurrentBus->cleanupNode) { + clusterCurrentBus->cleanupNode(delnode); + } + + /* 3) Remove the node from the owning shard */ + clusterRemoveNodeFromShard(delnode); + + /* 4) Free the node, unlinking it from the cluster. */ + freeClusterNode(delnode); +} + +void clusterRenameNode(clusterNode *node, char *newname) { + int retval; + sds s = sdsnewlen(node->name, CLUSTER_NAMELEN); + + serverLog(LL_DEBUG, "Renaming node %.40s (%s) into %.40s", node->name, humanNodename(node), newname); + retval = dictDelete(server.cluster->nodes, s); + sdsfree(s); + serverAssert(retval == DICT_OK); + memcpy(node->name, newname, CLUSTER_NAMELEN); + clusterAddNode(node); + clusterAddNodeToShard(node->shard_id, node); +} + /* ----------------------------------------------------------------------------- * Slot assignment * -------------------------------------------------------------------------- */ diff --git a/src/config.c b/src/config.c index 74e464ced48..bb60c6b39a1 100644 --- a/src/config.c +++ b/src/config.c @@ -3287,6 +3287,7 @@ standardConfig static_configs[] = { createBoolConfig("activedefrag", NULL, DEBUG_CONFIG | MODIFIABLE_CONFIG, server.active_defrag_enabled, CONFIG_ACTIVE_DEFRAG_DEFAULT, isValidActiveDefrag, NULL), createBoolConfig("syslog-enabled", NULL, IMMUTABLE_CONFIG, server.syslog_enabled, 0, NULL, NULL), createBoolConfig("cluster-enabled", NULL, IMMUTABLE_CONFIG, server.cluster_enabled, 0, NULL, NULL), + createBoolConfig("cluster-raft-enabled", NULL, IMMUTABLE_CONFIG, server.cluster_raft_enabled, 0, NULL, NULL), createBoolConfig("appendonly", NULL, MODIFIABLE_CONFIG | DENY_LOADING_CONFIG, server.aof_enabled, 0, NULL, updateAppendOnly), createBoolConfig("cluster-allow-reads-when-down", NULL, MODIFIABLE_CONFIG, server.cluster_allow_reads_when_down, 0, NULL, NULL), createBoolConfig("cluster-allow-pubsubshard-when-down", NULL, MODIFIABLE_CONFIG, server.cluster_allow_pubsubshard_when_down, 1, NULL, NULL), diff --git a/src/server.c b/src/server.c index 737e5175bab..17426bde4d9 100644 --- a/src/server.c +++ b/src/server.c @@ -2346,6 +2346,7 @@ void initServerConfig(void) { server.shutdown_flags = 0; server.shutdown_mstime = 0; server.cluster_module_flags = CLUSTER_MODULE_FLAG_NONE; + server.cluster_raft_enabled = 0; server.migrate_cached_sockets = dictCreate(&migrateCacheDictType); server.next_client_id = 1; /* Client IDs, start from 1 .*/ server.page_size = sysconf(_SC_PAGESIZE); diff --git a/src/server.h b/src/server.h index 5ce54dfa59f..18db426c24a 100644 --- a/src/server.h +++ b/src/server.h @@ -2254,6 +2254,7 @@ struct valkeyServer { unsigned int watching_clients; /* # of clients are watching keys */ /* Cluster */ int cluster_enabled; /* Is cluster enabled? */ + int cluster_raft_enabled; /* Is Raft enabled for metadata? */ int cluster_port; /* Set the cluster port for a node. */ mstime_t cluster_node_timeout; /* Cluster node timeout. */ mstime_t cluster_ping_interval; /* A debug configuration for setting how often cluster nodes send ping messages. */ diff --git a/tests/unit/cluster/raft.tcl b/tests/unit/cluster/raft.tcl new file mode 100644 index 00000000000..e0f6382bfb0 --- /dev/null +++ b/tests/unit/cluster/raft.tcl @@ -0,0 +1,158 @@ +# Test Raft handshake and basic commands + +proc cluster_has_node_handshaking_and_connected {myself_id target_id} { + set target_name [R $target_id CLUSTER MYID] + set nodes [R $myself_id CLUSTER NODES] + foreach line [split $nodes "\n"] { + if {$line eq ""} continue + set fields [split $line " "] + set id [lindex $fields 0] + set flags [lindex $fields 2] + set status [lindex $fields 7] + if {$id eq $target_name} { + if {[string match "*handshake*" $flags] && $status eq "connected"} { + return 1 + } + } + } + return 0 +} + +tags {tls:skip external:skip cluster singledb} { + +set base_conf [list cluster-enabled yes cluster-raft-enabled yes] +start_multiple_servers 2 [list overrides $base_conf] { + +test "Raft nodes are reachable" { + for {set id 0} {$id < [llength $::servers]} {incr id} { + wait_for_condition 1000 50 { + ([catch {R $id ping} ping_reply] == 0) && + ($ping_reply eq {PONG}) + } else { + fail "Node #$id keeps replying to PING." + } + } +} + +test "Raft Handshake test" { + set port1 [srv -1 port] + set host1 [srv -1 host] + + # Node 0 meets Node 1 + R 0 CLUSTER MEET $host1 $port1 + + # Wait for handshake to complete (links connected, but stays in handshake state) + wait_for_condition 5000 100 { + [cluster_has_node_handshaking_and_connected 0 1] + } else { + fail "Handshake did not complete or links not connected" + } +} + +test "CLUSTER INFO shows Raft info" { + set info [R 0 CLUSTER INFO] + assert_match "*raft_role:*" $info + assert_match "*raft_node_count:2*" $info +} + +test "CLUSTER NODES shows connected nodes" { + set port0 [srv 0 port] + set host0 [srv 0 host] + set port1 [srv -1 port] + set host1 [srv -1 host] + set nodes [R 0 CLUSTER NODES] + assert_match "*$host1:$port1*" $nodes + set id0 [R 0 CLUSTER MYID] + set id1 [R 1 CLUSTER MYID] + set found0 0 + set found1 0 + foreach line [split $nodes "\n"] { + if {$line eq ""} continue + set fields [split $line " "] + set id [lindex $fields 0] + if {$id eq $id1} { + set found1 1 + set addr [lindex $fields 1] + set flags [lindex $fields 2] + set status [lindex $fields 7] + + # Verify address contains host and port + assert_match "*$host1:$port1*" $addr + + # Verify flags contain handshake + assert_match "*handshake*" $flags + + # Verify status is connected + assert_equal "connected" $status + } + if {$id eq $id0} { + set found0 1 + set addr [lindex $fields 1] + set flags [lindex $fields 2] + set status [lindex $fields 7] + + # Verify address contains host and port + assert_match "*$host0:$port0*" $addr + + # Verify flags contain myself flag + assert_match "*myself*" $flags + + # Verify status is connected + assert_equal "connected" $status + } + } + assert {$found0 == 1} + assert {$found1 == 1} +} + +} ;# stop servers + +test "Raft HANDSHAKING Node Timeout test" { + start_server {overrides {cluster-enabled yes cluster-raft-enabled yes cluster-node-timeout 1000}} { + set port 9999 ;# Random port likely not in use + + # Meet a non-existent node + r CLUSTER MEET 127.0.0.1 $port + + # Verify node is added and in handshake state + set nodes [r CLUSTER NODES] + assert_match "*handshake*" $nodes + + # Wait for timeout (more than 1000ms) + after 2000 + + # Verify node is removed + set nodes [r CLUSTER NODES] + assert_no_match "*127.0.0.1:$port*" $nodes + } +} + +test "Raft EXTERNAL Node Timeout test" { + set base_conf [list cluster-enabled yes cluster-raft-enabled yes] + start_multiple_servers 2 [list overrides $base_conf] { + set port1 [srv -1 port] + set host1 [srv -1 host] + + # Node 0 meets Node 1 + R 0 CLUSTER MEET $host1 $port1 + + # Wait for handshake to complete (Node 1 becomes EXTERNAL in Node 0) + wait_for_condition 5000 100 { + [cluster_has_node_handshaking_and_connected 0 1] + } else { + fail "Handshake did not complete or links not connected" + } + + # Now set short timeout to trigger it + R 0 CONFIG SET cluster-node-timeout 1000 + + # Wait for timeout (more than 1000ms) + after 2000 + + # Verify Node 1 is removed from Node 0's view + set nodes [R 0 CLUSTER NODES] + assert_no_match "*$host1:$port1*" $nodes + } +} + +} ;# tags diff --git a/tests/unit/introspection.tcl b/tests/unit/introspection.tcl index c788abac0ba..86cea97ace8 100644 --- a/tests/unit/introspection.tcl +++ b/tests/unit/introspection.tcl @@ -1228,6 +1228,7 @@ start_server {tags {"introspection"}} { always-show-logo syslog-enabled cluster-enabled + cluster-raft-enabled disable-thp aclfile unixsocket