Skip to content
This repository was archived by the owner on Jan 24, 2024. It is now read-only.
Draft
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.streamnative.pulsar.handlers.kop.storage;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
Expand All @@ -31,6 +32,7 @@
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
import org.apache.pulsar.client.api.Message;
Expand All @@ -53,28 +55,69 @@ public class PulsarTopicProducerStateManagerSnapshotBuffer implements ProducerSt

private synchronized CompletableFuture<Reader<ByteBuffer>> ensureReaderHandle() {
if (reader == null) {
reader = pulsarClient.newReaderBuilder()
CompletableFuture<Reader<ByteBuffer>> newReader = pulsarClient.newReaderBuilder()
.topic(topic)
.startMessageId(MessageId.earliest)
.readCompacted(true)
.createAsync();
reader = newReader;

newReader.whenComplete((r, error) -> {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a unit test for this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry for not getting back to you sooner. I found this change may cause NPE problems, I add NPE checks and unit tests, maybe we don't need to merge this PR immediately.

if (error != null) {
discardReader(newReader);
}
});
}
return reader;
}

private synchronized void discardReader(CompletableFuture<Reader<ByteBuffer>> oldReader) {
if (reader == oldReader || (reader != null && reader.isCompletedExceptionally())) {
reader = null;
log.info("discard broken reader for {}", topic);
}
}

@SuppressFBWarnings("NP_NONNULL_PARAM_VIOLATION")
private synchronized void discardReader(Reader<ByteBuffer> oldReader) {
if (reader == null) {
return;
}
if (reader.isCompletedExceptionally() || (reader.isDone()
&& !reader.isCompletedExceptionally()
&& reader.getNow(null) == oldReader)) {
log.info("discard broken reader for {}", topic);
reader = null;
}
}

private synchronized CompletableFuture<Producer<ByteBuffer>> ensureProducerHandle() {
if (producer == null) {
producer = pulsarClient.newProducerBuilder()
CompletableFuture<Producer<ByteBuffer>> newProducer = pulsarClient.newProducerBuilder()
.enableBatching(false)
.topic(topic)
.blockIfQueueFull(true)
.createAsync();

producer = newProducer;

newProducer.whenComplete((r, error) -> {
if (error != null) {
discardProducer(newProducer);
}
});
}
return producer;
}

private synchronized void discardProducer(CompletableFuture<Producer<ByteBuffer>> oldProducer) {
if (producer == oldProducer) {
producer = null;
}
}

private CompletableFuture<Void> readNextMessageIfAvailable(Reader<ByteBuffer> reader) {
return reader
CompletableFuture<Void> result = reader
.hasMessageAvailableAsync()
.thenCompose(hasMessageAvailable -> {
if (hasMessageAvailable == null
Expand All @@ -88,11 +131,19 @@ private CompletableFuture<Void> readNextMessageIfAvailable(Reader<ByteBuffer> re
});
}
});

result.whenComplete((r, error) -> {
if (error != null) {
discardReader(reader);
}
});

return result;
}


private synchronized CompletableFuture<Void> ensureLatestData(boolean beforeWrite) {
if (currentReadHandle != null) {
if (currentReadHandle != null && !currentReadHandle.isCompletedExceptionally()) {
if (beforeWrite) {
// we are inside a write loop, so
// we must ensure that we start to read now
Expand All @@ -113,6 +164,12 @@ private synchronized CompletableFuture<Void> ensureLatestData(boolean beforeWrit
final CompletableFuture<Void> newReadHandle =
readerHandle.thenCompose(this::readNextMessageIfAvailable);
currentReadHandle = newReadHandle;

newReadHandle.exceptionally(___ -> {
endReadLoop(newReadHandle);
return null;
});

return newReadHandle.thenApply((__) -> {
endReadLoop(newReadHandle);
return null;
Expand Down