diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java index cdacbd89..3497956a 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpBrokerService.java @@ -14,6 +14,9 @@ package io.streamnative.pulsar.handlers.amqp; +import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import lombok.Getter; import org.apache.pulsar.broker.PulsarService; import org.apache.pulsar.broker.authentication.AuthenticationService; @@ -36,11 +39,13 @@ public class AmqpBrokerService { private ConnectionContainer connectionContainer; @Getter private PulsarService pulsarService; + private ScheduledExecutorService exchangeRouteAckExecutor = Executors.newSingleThreadScheduledExecutor( + new DefaultThreadFactory("ex-route-ack")); public AmqpBrokerService(PulsarService pulsarService) { this.pulsarService = pulsarService; this.amqpTopicManager = new AmqpTopicManager(pulsarService); - this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService); + this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, exchangeRouteAckExecutor); this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer); this.exchangeService = new ExchangeServiceImpl(exchangeContainer); this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer); @@ -50,7 +55,7 @@ public AmqpBrokerService(PulsarService pulsarService) { public AmqpBrokerService(PulsarService pulsarService, ConnectionContainer connectionContainer) { this.pulsarService = pulsarService; this.amqpTopicManager = new AmqpTopicManager(pulsarService); - this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService); + this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, exchangeRouteAckExecutor); this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer); this.exchangeService = new ExchangeServiceImpl(exchangeContainer); this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java index ab11acfe..fbc8784f 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpExchangeReplicator.java @@ -16,8 +16,10 @@ import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS; import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; @@ -27,6 +29,7 @@ import org.apache.bookkeeper.mledger.Entry; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.bookkeeper.mledger.Position; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.client.impl.Backoff; import org.apache.pulsar.common.api.proto.CommandSubscribe; @@ -74,10 +77,18 @@ protected enum State { private final Backoff readFailureBackoff = new Backoff( 1, TimeUnit.SECONDS, 1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS); - protected AmqpExchangeReplicator(PersistentExchange persistentExchange) { + private final ScheduledExecutorService markDeleteExecutor; + private final ConcurrentLinkedDeque markDeletePositionDeque = new ConcurrentLinkedDeque<>(); + private final Backoff markDeleteBackoff = new Backoff( + 100, TimeUnit.MILLISECONDS, 1, TimeUnit.SECONDS, 1, TimeUnit.SECONDS); + + protected AmqpExchangeReplicator(PersistentExchange persistentExchange, + ScheduledExecutorService exchangeRouteAckExecutor) { this.persistentExchange = persistentExchange; this.topic = (PersistentTopic) persistentExchange.getTopic(); this.scheduledExecutorService = topic.getBrokerService().executor(); + this.markDeleteExecutor = exchangeRouteAckExecutor; + this.markDeleteExecutor.schedule(this::batchMarkDelete, 100, TimeUnit.MILLISECONDS); STATE_UPDATER.set(this, AmqpExchangeReplicator.State.Stopped); this.name = "[AMQP Replicator for " + topic.getName() + " ]"; } @@ -198,13 +209,13 @@ public void readEntriesComplete(List list, Object o) { completableFuture.whenComplete((ignored, exception) -> { if (exception != null) { log.error("{} Error producing messages", name, exception); + batchMarkDelete(); AmqpExchangeReplicator.this.cursor.rewind(); } else { if (log.isDebugEnabled()) { log.debug("{} Route message successfully.", name); } - AmqpExchangeReplicator.this.cursor - .asyncDelete(entry.getPosition(), this, entry.getPosition()); + this.markDeletePositionDeque.add(entry.getPosition()); } entry.release(); @@ -245,15 +256,17 @@ public void readEntriesFailed(ManagedLedgerException exception, Object o) { } @Override - public void deleteComplete(Object position) { + public void deleteComplete(Object positions) { if (log.isDebugEnabled()) { - log.debug("{} Deleted message at {}", name, position); + log.debug("{} Deleted message at {}", name, positions); } } @Override - public void deleteFailed(ManagedLedgerException e, Object position) { - log.error("{} Failed to delete message at {}: {}", name, position, e.getMessage(), e); + public void deleteFailed(ManagedLedgerException e, Object positions) { + log.error("{} Failed to delete message at {}: {}", name, positions, e.getMessage(), e); + batchMarkDelete(); + this.cursor.rewind(); } public void stopReplicate() { @@ -271,6 +284,10 @@ public void stopReplicate() { if (cursor != null && (STATE_UPDATER.compareAndSet(this, State.Starting, State.Stopping) || STATE_UPDATER.compareAndSet(this, State.Started, State.Stopping))) { cursor.setInactive(); + if (!this.markDeleteExecutor.isShutdown() && !this.markDeleteExecutor.isTerminated()) { + batchMarkDelete(); + this.markDeleteExecutor.shutdown(); + } cursor.asyncClose(new AsyncCallbacks.CloseCallback() { @Override public void closeComplete(Object o) { @@ -298,4 +315,23 @@ public void closeFailed(ManagedLedgerException e, Object o) { log.info("[{}] AMQP Exchange Replicator is already stopped. State: {}", name, STATE_UPDATER.get(this)); } + private synchronized void batchMarkDelete() { + if (markDeletePositionDeque.isEmpty()) { + long backoff = this.markDeleteBackoff.next(); + markDeleteExecutor.schedule(this::batchMarkDelete, backoff, TimeUnit.MILLISECONDS); + return; + } + int size = markDeletePositionDeque.size(); + List positions = new ArrayList<>(size); + for (int i = 0; i < size; i++) { + Position position = markDeletePositionDeque.pollFirst(); + if (position == null) { + break; + } + positions.add(position); + } + this.cursor.asyncDelete(positions, this, positions); + this.markDeleteBackoff.reset(); + } + } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java index 57a78199..f4cf43ef 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/ExchangeContainer.java @@ -18,6 +18,7 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; @@ -33,12 +34,15 @@ @Slf4j public class ExchangeContainer { - private AmqpTopicManager amqpTopicManager; - private PulsarService pulsarService; + private final AmqpTopicManager amqpTopicManager; + private final PulsarService pulsarService; + private final ScheduledExecutorService exchangeRouteAckExecutor; - protected ExchangeContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService) { + protected ExchangeContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService, + ScheduledExecutorService exchangeRouteAckExecutor) { this.amqpTopicManager = amqpTopicManager; this.pulsarService = pulsarService; + this.exchangeRouteAckExecutor = exchangeRouteAckExecutor; } @Getter @@ -108,7 +112,7 @@ public CompletableFuture asyncGetExchange(NamespaceName namespaceN amqpExchangeType = AmqpExchange.Type.value(exchangeType); } PersistentExchange amqpExchange = new PersistentExchange(exchangeName, - amqpExchangeType, persistentTopic, false); + amqpExchangeType, persistentTopic, false, exchangeRouteAckExecutor); amqpExchangeCompletableFuture.complete(amqpExchange); } } diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java index b75a88b1..3c8991be 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/impl/PersistentExchange.java @@ -31,6 +31,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; @@ -61,13 +62,14 @@ public class PersistentExchange extends AbstractAmqpExchange { public static final String TYPE = "TYPE"; public static final String TOPIC_PREFIX = "__amqp_exchange__"; - private PersistentTopic persistentTopic; + private final PersistentTopic persistentTopic; private ObjectMapper jsonMapper = new JsonMapper(); private final ConcurrentOpenHashMap cursors; private AmqpExchangeReplicator messageReplicator; - private AmqpEntryWriter amqpEntryWriter; + private final AmqpEntryWriter amqpEntryWriter; - public PersistentExchange(String exchangeName, Type type, PersistentTopic persistentTopic, boolean autoDelete) { + public PersistentExchange(String exchangeName, Type type, PersistentTopic persistentTopic, boolean autoDelete, + ScheduledExecutorService exchangeRouteAckExecutor) { super(exchangeName, type, new HashSet<>(), true, autoDelete); this.persistentTopic = persistentTopic; topicNameValidate(); @@ -80,7 +82,7 @@ public PersistentExchange(String exchangeName, Type type, PersistentTopic persis } if (messageReplicator == null) { - messageReplicator = new AmqpExchangeReplicator(this) { + messageReplicator = new AmqpExchangeReplicator(this, exchangeRouteAckExecutor) { @Override public CompletableFuture readProcess(Entry entry) { Map props; diff --git a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java index ae019f27..b2a48f3c 100644 --- a/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java +++ b/amqp-impl/src/test/java/io/streamnative/pulsar/handlers/amqp/test/TopicNameTest.java @@ -19,6 +19,7 @@ import io.streamnative.pulsar.handlers.amqp.AbstractAmqpExchange; import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange; import io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue; +import java.util.concurrent.Executors; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer; import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; @@ -49,7 +50,8 @@ public void exchangeTopicNameValidate() { Mockito.when(managedLedger.getCursors()).thenReturn(new ManagedCursorContainer()); try { new PersistentExchange( - exchangeName, exchangeType, exchangeTopic1, false); + exchangeName, exchangeType, exchangeTopic1, false, + Executors.newSingleThreadScheduledExecutor()); } catch (IllegalArgumentException e) { Assert.fail("Failed to new PersistentExchange. errorMsg: " + e.getMessage()); } @@ -59,7 +61,8 @@ public void exchangeTopicNameValidate() { Mockito.when(exchangeTopic2.getManagedLedger()).thenReturn(managedLedger); try { new PersistentExchange( - exchangeName, exchangeType, exchangeTopic2, false); + exchangeName, exchangeType, exchangeTopic2, false, + Executors.newSingleThreadScheduledExecutor()); } catch (IllegalArgumentException e) { Assert.assertNotNull(e); log.info("This is expected behavior.");