@@ -23,6 +23,7 @@
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
@@ -47,11 +48,13 @@

import java.io.File;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
@@ -83,6 +86,11 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC

private static final int SNAPSHOT_TRIGGER_THRESHOLD = 64;
private static final int PURGE_GAP = 8;
/**
* Keep the retry budget bounded so that a stuck reconfiguration surfaces as a test failure with
* diagnostics, instead of silently retrying until the outer JUnit timeout aborts the test.
*/
private static final int SET_CONFIGURATION_MAX_ATTEMPTS = 20;
private static final AtomicReference<SnapshotInfo> LEADER_SNAPSHOT_INFO_REF = new AtomicReference<>();

private static final AtomicInteger numSnapshotRequests = new AtomicInteger();
@@ -523,6 +531,106 @@ public void testInstallSnapshotDuringBootstrap() throws Exception {
runWithNewCluster(1, this::testInstallSnapshotDuringBootstrap);
}

/**
* Issue setConfiguration in a bounded retry loop. Each iteration uses a no-retry client so the
* outer loop controls when we log state, sleep, and finally fail with the collected diagnostics.
*/
private void setConfigurationWithBoundedRetry(CLUSTER cluster, RaftPeer[] peersInNewConf, String phase)
throws IOException {
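// Remember the most recent failure so the final diagnostics and rethrow report what actually went wrong.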
IOException lastException = null;
for (int attempt = 1; attempt <= SET_CONFIGURATION_MAX_ATTEMPTS; attempt++) {
final RaftPeerId leaderId;
try {
leaderId = RaftTestUtil.waitForLeader(cluster).getId();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
final InterruptedIOException ioe = new InterruptedIOException(
phase + ": interrupted while waiting for leader before setConfiguration");
ioe.initCause(e);
dumpClusterState(cluster, phase + ": setConfiguration", peersInNewConf, ioe);
throw ioe;
}
LOG.info("{}: setConfiguration attempt #{}/{}, leaderId={}, targetConf={}",
phase, attempt, SET_CONFIGURATION_MAX_ATTEMPTS, leaderId, Arrays.asList(peersInNewConf));
// Disable the client's internal retry loop here. We want one observable RPC attempt per
// outer iteration so that CI logs show which attempt failed and what the cluster looked like.
try (final RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
final RaftClientReply reply = client.admin().setConfiguration(peersInNewConf);
if (reply.isSuccess()) {
LOG.info("{}: setConfiguration succeeded on attempt #{}: {}", phase, attempt, reply);
return;
}
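// An unsuccessful reply is treated like an RPC failure: record it and fall through to the retry path.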
lastException = new IOException("setConfiguration returned unsuccessful reply: " + reply);
} catch (IOException e) {
lastException = e;
}

LOG.warn("{}: setConfiguration attempt #{} failed, targetConf={}, lastException={}, {}",
phase, attempt, Arrays.asList(peersInNewConf),
lastException == null ? null : lastException.toString(), cluster.printServers());
if (attempt < SET_CONFIGURATION_MAX_ATTEMPTS) {
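// Back off briefly between attempts; leader election or log catch-up may still be in progress.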
try {
ONE_SECOND.sleep();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
final InterruptedIOException ioe = new InterruptedIOException(
phase + ": interrupted while sleeping before setConfiguration retry");
ioe.initCause(e);
dumpClusterState(cluster, phase + ": setConfiguration", peersInNewConf, ioe);
throw ioe;
}
}
}

dumpClusterState(cluster, phase + ": setConfiguration", peersInNewConf, lastException);
throw lastException != null? lastException
: new IOException("setConfiguration failed after " + SET_CONFIGURATION_MAX_ATTEMPTS + " attempts");
}

/**
* If the new configuration never becomes stable, dump the same diagnostics we use for
* setConfiguration failures so the CI log shows whether the request failed early or the cluster
* got stuck during convergence.
*/
private void waitAndCheckNewConfWithDiagnostics(CLUSTER cluster, RaftPeer[] peersInNewConf, String phase)
throws Exception {
try {
LOG.info("{}: waitAndCheckNewConf, targetConf={}", phase, Arrays.asList(peersInNewConf));
RaftServerTestUtil.waitAndCheckNewConf(cluster, peersInNewConf, 0, null);
} catch (AssertionError | Exception e) {
dumpClusterState(cluster, phase + ": waitAndCheckNewConf", peersInNewConf, e);
throw e;
}
}

/**
* Print both the summarized cluster view and per-division indices/snapshot state so that a CI
* timeout can be traced to bootstrap, snapshot installation, or configuration-commit lag.
*/
private void dumpClusterState(CLUSTER cluster, String phase, RaftPeer[] peersInNewConf, Throwable cause) {
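// The leader snapshot reference is reset to null at the start of each test, so it may legitimately be absent here.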
final SnapshotInfo leaderSnapshotInfo = LEADER_SNAPSHOT_INFO_REF.get();
LOG.error("{} failed: targetConf={}, numSnapshotRequests={}, numNotifyInstallSnapshotFinished={}, "
+ "leaderSnapshotInfo={}\n{}\n{}",
phase, Arrays.asList(peersInNewConf), numSnapshotRequests.get(), numNotifyInstallSnapshotFinished.get(),
Optional.ofNullable(leaderSnapshotInfo).map(SnapshotInfo::getTermIndex).orElse(null),
cluster.printServers(), cluster.printAllLogs(), cause);

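// Per-division details: comparing applied, committed, and snapshot indices across divisions shows which server is lagging and in which phase.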
for (RaftServer.Division division : cluster.iterateDivisions()) {
final SnapshotInfo snapshot = division.getStateMachine().getLatestSnapshot();
LOG.error("{}: divisionState id={}, role={}, leaderId={}, leaderReady={}, alive={}, term={}, "
+ "lastAppliedIndex={}, snapshot={}, logStartIndex={}, logNextIndex={}, commitIndex={}, conf={}, "
+ "followerNextIndices={}, followerMatchIndices={}",
phase, division.getId(), division.getInfo().getCurrentRole(), division.getInfo().getLeaderId(),
division.getInfo().isLeaderReady(), division.getInfo().isAlive(), division.getInfo().getCurrentTerm(),
division.getInfo().getLastAppliedIndex(),
Optional.ofNullable(snapshot).map(SnapshotInfo::getTermIndex).orElse(null),
division.getRaftLog().getStartIndex(), division.getRaftLog().getNextIndex(),
division.getRaftLog().getLastCommittedIndex(), division.getRaftConf(),
Arrays.toString(division.getInfo().getFollowerNextIndices()),
Arrays.toString(division.getInfo().getFollowerMatchIndices()));
}
}

private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exception {
LEADER_SNAPSHOT_INFO_REF.set(null);
numSnapshotRequests.set(0);
@@ -559,11 +667,12 @@ private void testInstallSnapshotDuringBootstrap(CLUSTER cluster) throws Exceptio
// add two more peers
final MiniRaftCluster.PeerChanges change = cluster.addNewPeers(2, true,
true);
final String phase = "testInstallSnapshotDuringBootstrap";
// trigger setConfiguration
RaftServerTestUtil.runWithMinorityPeers(cluster, Arrays.asList(change.allPeersInNewConf),
peers -> cluster.setConfiguration(peers.toArray(RaftPeer.emptyArray())));
peers -> setConfigurationWithBoundedRetry(cluster, peers.toArray(RaftPeer.emptyArray()), phase));

RaftServerTestUtil.waitAndCheckNewConf(cluster, change.allPeersInNewConf, 0, null);
waitAndCheckNewConfWithDiagnostics(cluster, change.allPeersInNewConf, phase);

// Check the installed snapshot index on each Follower matches with the
// leader snapshot.