Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,13 @@ public class DistributedDoubleBarrier {
private final int memberQty;
private final String ourPath;
private final String readyPath;
private final AtomicBoolean hasBeenNotified = new AtomicBoolean(false);
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
private final Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
connectionLost.set(event.getState() != Event.KeeperState.SyncConnected);
client.runSafe(() -> {
synchronized (DistributedDoubleBarrier.this) {
hasBeenNotified.set(true);
DistributedDoubleBarrier.this.notifyAll();
}
});
Expand Down Expand Up @@ -253,8 +251,7 @@ private void checkDeleteOurPath(boolean shouldExist) throws Exception {
}

private synchronized boolean internalEnter(long startMs, boolean hasMaxWait, long maxWaitMs) throws Exception {
boolean result = true;
do {
while (true) {
List<String> children = getChildrenForEntering();
int count = (children != null) ? children.size() : 0;
if (count >= memberQty) {
Expand All @@ -263,26 +260,19 @@ private synchronized boolean internalEnter(long startMs, boolean hasMaxWait, lon
} catch (KeeperException.NodeExistsException ignore) {
// ignore
}
break;
return true;
}

if (hasMaxWait && !hasBeenNotified.get()) {
if (hasMaxWait) {
long elapsed = System.currentTimeMillis() - startMs;
long thisWaitMs = maxWaitMs - elapsed;
if (thisWaitMs <= 0) {
result = false;
} else {
wait(thisWaitMs);
}

if (!hasBeenNotified.get()) {
result = false;
return false;
}
wait(thisWaitMs);
} else {
wait();
}
} while (false);

return result;
}
}
}