Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions src/cluster_legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -2997,6 +2997,16 @@ static void clusterLogSlotRangeMigration(int first_slot,
target_node->name, humanNodename(target_node), target_node->shard_id);
}

/* When iterating through the slot bitmap, group every 64 bits as
* a word to speedup. */
static inline int clusterExtractSlotFromWord(uint64_t *slot_word, size_t slot_word_index) {
/* Get the index of the least-significant set bit, in this 64-bit word */
const unsigned bit = (unsigned)__builtin_ctzll(*slot_word);
const int slot = (int)((slot_word_index << 6) | bit);
*slot_word &= *slot_word - 1; /* clear that bit */
return slot;
}

/* This function is called when we receive a primary configuration via a
* PING, PONG or UPDATE packet. What we receive is a node, a configEpoch of the
* node, and the set of slots claimed under this configEpoch.
Expand Down Expand Up @@ -3047,8 +3057,25 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
int first_migrated_slot = -1, last_migrated_slot = -1;
clusterNode *migration_source_node = NULL;

for (j = 0; j < CLUSTER_SLOTS; j++) {
if (bitmapTestBit(slots, j)) {
for (size_t w = 0; w < CLUSTER_SLOT_WORDS; w++) {
uint64_t msg_word;
uint64_t old_word;

/* Load 64 slots at once from the incoming message */
memcpy(&msg_word, slots + SLOT_WORD_OFFSET(w), sizeof(msg_word));
memrev64ifbe(&msg_word);

/* Load our local record of sender's slots */
memcpy(&old_word, sender->slots + SLOT_WORD_OFFSET(w), sizeof(old_word));
memrev64ifbe(&old_word);

/* Performance: If nothing changed and no slots are involved, skip 64 slots. */
if (msg_word == 0 && old_word == 0) continue;

/* Process only the set bits in the message word */
uint64_t current_msg_word = msg_word;
while (current_msg_word) {
j = clusterExtractSlotFromWord(&current_msg_word, w);
sender_slots++;

/* The slot is already bound to the sender of this message. */
Expand Down Expand Up @@ -3149,7 +3176,12 @@ void clusterUpdateSlotsConfigWith(clusterNode *sender, uint64_t senderConfigEpoc
bitmapClearBit(server.cluster->owner_not_claiming_slot, j);
clusterDoBeforeSleep(CLUSTER_TODO_SAVE_CONFIG | CLUSTER_TODO_UPDATE_STATE | CLUSTER_TODO_FSYNC_CONFIG);
}
} else {
}

/* Logic for slots in this word that are NOT in the message but were in our config */
uint64_t lost_slots = old_word & ~msg_word;
while (lost_slots) {
j = clusterExtractSlotFromWord(&lost_slots, w);
if (server.cluster->slots[j] == sender) {
/* The slot is currently bound to the sender but the sender is no longer
* claiming it. We don't want to unbind the slot yet as it can cause the cluster
Expand Down Expand Up @@ -3792,16 +3824,6 @@ int clusterIsValidPacket(clusterLink *link) {
return 1;
}

/* When iterating through the slot bitmap, group every 64 bits as
* a word to speedup. */
static inline int clusterExtractSlotFromWord(uint64_t *slot_word, size_t slot_word_index) {
/* Get the index of the least-significant set bit, in this 64-bit word */
const unsigned bit = (unsigned)__builtin_ctzll(*slot_word);
const int slot = (int)((slot_word_index << 6) | bit);
*slot_word &= *slot_word - 1; /* clear that bit */
return slot;
}

/* When this function is called, there is a packet to process starting
* at link->rcvbuf. Releasing the buffer is up to the caller, so this
* function should just handle the higher level stuff of processing the
Expand Down
Loading