Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@

package io.streamnative.pulsar.handlers.amqp;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
Expand All @@ -36,11 +39,13 @@ public class AmqpBrokerService {
private ConnectionContainer connectionContainer;
@Getter
private PulsarService pulsarService;
private ScheduledExecutorService exchangeRouteAckExecutor = Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("ex-route-ack"));

public AmqpBrokerService(PulsarService pulsarService) {
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, exchangeRouteAckExecutor);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
Expand All @@ -50,7 +55,7 @@ public AmqpBrokerService(PulsarService pulsarService) {
public AmqpBrokerService(PulsarService pulsarService, ConnectionContainer connectionContainer) {
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, exchangeRouteAckExecutor);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;

import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
Expand All @@ -27,6 +29,7 @@
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
Expand Down Expand Up @@ -74,10 +77,18 @@ protected enum State {
private final Backoff readFailureBackoff = new Backoff(
1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);

protected AmqpExchangeReplicator(PersistentExchange persistentExchange) {
private final ScheduledExecutorService markDeleteExecutor;
private final ConcurrentLinkedDeque<Position> markDeletePositionDeque = new ConcurrentLinkedDeque<>();
private final Backoff markDeleteBackoff = new Backoff(
100, TimeUnit.MILLISECONDS, 1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS);

protected AmqpExchangeReplicator(PersistentExchange persistentExchange,
ScheduledExecutorService exchangeRouteAckExecutor) {
this.persistentExchange = persistentExchange;
this.topic = (PersistentTopic) persistentExchange.getTopic();
this.scheduledExecutorService = topic.getBrokerService().executor();
this.markDeleteExecutor = exchangeRouteAckExecutor;
this.markDeleteExecutor.schedule(this::batchMarkDelete, 100, TimeUnit.MILLISECONDS);
STATE_UPDATER.set(this, AmqpExchangeReplicator.State.Stopped);
this.name = "[AMQP Replicator for " + topic.getName() + " ]";
}
Expand Down Expand Up @@ -198,13 +209,13 @@ public void readEntriesComplete(List<Entry> list, Object o) {
completableFuture.whenComplete((ignored, exception) -> {
if (exception != null) {
log.error("{} Error producing messages", name, exception);
batchMarkDelete();
AmqpExchangeReplicator.this.cursor.rewind();
} else {
if (log.isDebugEnabled()) {
log.debug("{} Route message successfully.", name);
}
AmqpExchangeReplicator.this.cursor
.asyncDelete(entry.getPosition(), this, entry.getPosition());
this.markDeletePositionDeque.add(entry.getPosition());
}
entry.release();

Expand Down Expand Up @@ -245,15 +256,17 @@ public void readEntriesFailed(ManagedLedgerException exception, Object o) {
}

@Override
public void deleteComplete(Object position) {
public void deleteComplete(Object positions) {
if (log.isDebugEnabled()) {
log.debug("{} Deleted message at {}", name, position);
log.debug("{} Deleted message at {}", name, positions);
}
}

@Override
public void deleteFailed(ManagedLedgerException e, Object position) {
log.error("{} Failed to delete message at {}: {}", name, position, e.getMessage(), e);
public void deleteFailed(ManagedLedgerException e, Object positions) {
log.error("{} Failed to delete message at {}: {}", name, positions, e.getMessage(), e);
batchMarkDelete();
this.cursor.rewind();
}

public void stopReplicate() {
Expand All @@ -271,6 +284,10 @@ public void stopReplicate() {
if (cursor != null && (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping)
|| STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping))) {
cursor.setInactive();
if (!this.markDeleteExecutor.isShutdown() && !this.markDeleteExecutor.isTerminated()) {
batchMarkDelete();
this.markDeleteExecutor.shutdown();
}
cursor.asyncClose(new AsyncCallbacks.CloseCallback() {
@Override
public void closeComplete(Object o) {
Expand Down Expand Up @@ -298,4 +315,23 @@ public void closeFailed(ManagedLedgerException e, Object o) {
log.info("[{}] AMQP Exchange Replicator is already stopped. State: {}", name, STATE_UPDATER.get(this));
}

private synchronized void batchMarkDelete() {
if (markDeletePositionDeque.isEmpty()) {
long backoff = this.markDeleteBackoff.next();
markDeleteExecutor.schedule(this::batchMarkDelete, backoff, TimeUnit.MILLISECONDS);
return;
}
int size = markDeletePositionDeque.size();
List<Position> positions = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
Position position = markDeletePositionDeque.pollFirst();
if (position == null) {
break;
}
positions.add(position);
}
this.cursor.asyncDelete(positions, this, positions);
this.markDeleteBackoff.reset();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -33,12 +34,15 @@
@Slf4j
public class ExchangeContainer {

private AmqpTopicManager amqpTopicManager;
private PulsarService pulsarService;
private final AmqpTopicManager amqpTopicManager;
private final PulsarService pulsarService;
private final ScheduledExecutorService exchangeRouteAckExecutor;

protected ExchangeContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService) {
protected ExchangeContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService,
ScheduledExecutorService exchangeRouteAckExecutor) {
this.amqpTopicManager = amqpTopicManager;
this.pulsarService = pulsarService;
this.exchangeRouteAckExecutor = exchangeRouteAckExecutor;
}

@Getter
Expand Down Expand Up @@ -108,7 +112,7 @@ public CompletableFuture<AmqpExchange> asyncGetExchange(NamespaceName namespaceN
amqpExchangeType = AmqpExchange.Type.value(exchangeType);
}
PersistentExchange amqpExchange = new PersistentExchange(exchangeName,
amqpExchangeType, persistentTopic, false);
amqpExchangeType, persistentTopic, false, exchangeRouteAckExecutor);
amqpExchangeCompletableFuture.complete(amqpExchange);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
Expand Down Expand Up @@ -61,13 +62,14 @@ public class PersistentExchange extends AbstractAmqpExchange {
public static final String TYPE = "TYPE";
public static final String TOPIC_PREFIX = "__amqp_exchange__";

private PersistentTopic persistentTopic;
private final PersistentTopic persistentTopic;
private ObjectMapper jsonMapper = new JsonMapper();
private final ConcurrentOpenHashMap<String, ManagedCursor> cursors;
private AmqpExchangeReplicator messageReplicator;
private AmqpEntryWriter amqpEntryWriter;
private final AmqpEntryWriter amqpEntryWriter;

public PersistentExchange(String exchangeName, Type type, PersistentTopic persistentTopic, boolean autoDelete) {
public PersistentExchange(String exchangeName, Type type, PersistentTopic persistentTopic, boolean autoDelete,
ScheduledExecutorService exchangeRouteAckExecutor) {
super(exchangeName, type, new HashSet<>(), true, autoDelete);
this.persistentTopic = persistentTopic;
topicNameValidate();
Expand All @@ -80,7 +82,7 @@ public PersistentExchange(String exchangeName, Type type, PersistentTopic persis
}

if (messageReplicator == null) {
messageReplicator = new AmqpExchangeReplicator(this) {
messageReplicator = new AmqpExchangeReplicator(this, exchangeRouteAckExecutor) {
@Override
public CompletableFuture<Void> readProcess(Entry entry) {
Map<String, Object> props;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.streamnative.pulsar.handlers.amqp.AbstractAmqpExchange;
import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange;
import io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue;
import java.util.concurrent.Executors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
Expand Down Expand Up @@ -49,7 +50,8 @@ public void exchangeTopicNameValidate() {
Mockito.when(managedLedger.getCursors()).thenReturn(new ManagedCursorContainer());
try {
new PersistentExchange(
exchangeName, exchangeType, exchangeTopic1, false);
exchangeName, exchangeType, exchangeTopic1, false,
Executors.newSingleThreadScheduledExecutor());
} catch (IllegalArgumentException e) {
Assert.fail("Failed to new PersistentExchange. errorMsg: " + e.getMessage());
}
Expand All @@ -59,7 +61,8 @@ public void exchangeTopicNameValidate() {
Mockito.when(exchangeTopic2.getManagedLedger()).thenReturn(managedLedger);
try {
new PersistentExchange(
exchangeName, exchangeType, exchangeTopic2, false);
exchangeName, exchangeType, exchangeTopic2, false,
Executors.newSingleThreadScheduledExecutor());
} catch (IllegalArgumentException e) {
Assert.assertNotNull(e);
log.info("This is expected behavior.");
Expand Down