Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -126,4 +126,9 @@ public boolean isAutoDelete() {
return autoDelete;
}

@Override
public void recordDispatchEvent() {

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package io.streamnative.pulsar.handlers.amqp;

import io.streamnative.pulsar.handlers.amqp.metrics.AmqpMetrics;
import lombok.Getter;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.authentication.AuthenticationService;
Expand All @@ -36,22 +37,26 @@ public class AmqpBrokerService {
private ConnectionContainer connectionContainer;
@Getter
private PulsarService pulsarService;
@Getter
private AmqpMetrics amqpMetrics;

public AmqpBrokerService(PulsarService pulsarService) {
public AmqpBrokerService(PulsarService pulsarService, boolean enableMetrics) {
this.amqpMetrics = AmqpMetrics.create(enableMetrics);
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, amqpMetrics);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, amqpMetrics);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
this.connectionContainer = new ConnectionContainer(pulsarService, exchangeContainer, queueContainer);
}

public AmqpBrokerService(PulsarService pulsarService, ConnectionContainer connectionContainer) {
this.amqpMetrics = AmqpMetrics.create(true);
this.pulsarService = pulsarService;
this.amqpTopicManager = new AmqpTopicManager(pulsarService);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer);
this.exchangeContainer = new ExchangeContainer(amqpTopicManager, pulsarService, amqpMetrics);
this.queueContainer = new QueueContainer(amqpTopicManager, pulsarService, exchangeContainer, amqpMetrics);
this.exchangeService = new ExchangeServiceImpl(exchangeContainer);
this.queueService = new QueueServiceImpl(exchangeContainer, queueContainer);
this.connectionContainer = connectionContainer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
completeAndCloseAllChannels();
amqpBrokerService.getConnectionContainer().removeConnection(namespaceName, this);
this.brokerDecoder.close();
if (this.brokerDecoder != null) {
this.brokerDecoder.close();
if (namespaceName != null) {
this.amqpBrokerService.getAmqpMetrics().connectionDec(namespaceName.getLocalName());
}
}
}

@Override
Expand Down Expand Up @@ -345,6 +350,7 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap
// Policies policies = getPolicies(namespaceName);
// if (policies != null) {
this.namespaceName = namespaceName;
this.amqpBrokerService.getAmqpMetrics().connectionInc(namespaceName.getLocalName());

MethodRegistry methodRegistry = getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(virtualHost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ private CompletableFuture<Void> sendMessage(Entry index) {
getRedeliveryTracker().contains(index.getPosition()),
deliveryTag,
AMQShortString.createAMQShortString(consumerTag));
amqpQueue.recordDispatchEvent();
sendFuture.complete(null);
} catch (Exception e) {
log.error("[{}-{}] Failed to send message to consumer.", queueName, consumerTag, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public String getProtocolDataToAdvertise() {
@Override
public void start(BrokerService service) {
brokerService = service;
amqpBrokerService = new AmqpBrokerService(service.getPulsar());
amqpBrokerService = new AmqpBrokerService(service.getPulsar(), amqpConfig.isAmqpEnableMetrics());
if (amqpConfig.isAmqpProxyEnable()) {
ProxyConfiguration proxyConfig = new ProxyConfiguration();
proxyConfig.setAmqpTenant(amqpConfig.getAmqpTenant());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,6 @@ void bindExchange(AmqpExchange exchange, AmqpMessageRouter router, String bindin
boolean isAutoDelete();

Topic getTopic();

void recordDispatchEvent();
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,12 @@ public class AmqpServiceConfiguration extends ServiceConfiguration {
doc = "Whether start amqp protocol handler with proxy"
)
private boolean amqpProxyEnable = false;

@FieldContext(
category = CATEGORY_AMQP,
required = false,
doc = "Whether enable amqp metrics"
)
private boolean amqpEnableMetrics = false;

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.streamnative.pulsar.handlers.amqp;

import io.streamnative.pulsar.handlers.amqp.impl.PersistentExchange;
import io.streamnative.pulsar.handlers.amqp.metrics.AmqpMetrics;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -35,10 +36,13 @@ public class ExchangeContainer {

private AmqpTopicManager amqpTopicManager;
private PulsarService pulsarService;
private AmqpMetrics amqpMetrics;

protected ExchangeContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService) {
protected ExchangeContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService,
AmqpMetrics amqpMetrics) {
this.amqpTopicManager = amqpTopicManager;
this.pulsarService = pulsarService;
this.amqpMetrics = amqpMetrics;
}

@Getter
Expand Down Expand Up @@ -108,7 +112,8 @@ public CompletableFuture<AmqpExchange> asyncGetExchange(NamespaceName namespaceN
amqpExchangeType = AmqpExchange.Type.value(exchangeType);
}
PersistentExchange amqpExchange = new PersistentExchange(exchangeName,
amqpExchangeType, persistentTopic, false);
amqpExchangeType, persistentTopic, false,
amqpMetrics.addExchangeMetrics(namespaceName.getLocalName(), exchangeName));
amqpExchangeCompletableFuture.complete(amqpExchange);
}
}
Expand All @@ -134,6 +139,7 @@ private void removeExchangeFuture(NamespaceName namespaceName, String exchangeNa
if (exchangeMap.containsKey(namespaceName)) {
exchangeMap.get(namespaceName).remove(exchangeName);
}
amqpMetrics.deleteExchangeMetrics(namespaceName.getLocalName(), exchangeName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package io.streamnative.pulsar.handlers.amqp;

import io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue;
import io.streamnative.pulsar.handlers.amqp.metrics.AmqpMetrics;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -36,12 +37,14 @@ public class QueueContainer {
private AmqpTopicManager amqpTopicManager;
private PulsarService pulsarService;
private ExchangeContainer exchangeContainer;
private AmqpMetrics amqpMetrics;

protected QueueContainer(AmqpTopicManager amqpTopicManager, PulsarService pulsarService,
ExchangeContainer exchangeContainer) {
ExchangeContainer exchangeContainer, AmqpMetrics amqpMetrics) {
this.amqpTopicManager = amqpTopicManager;
this.pulsarService = pulsarService;
this.exchangeContainer = exchangeContainer;
this.amqpMetrics = amqpMetrics;
}

@Getter
Expand Down Expand Up @@ -96,7 +99,8 @@ public CompletableFuture<AmqpQueue> asyncGetQueue(NamespaceName namespaceName, S

// TODO: reset connectionId, exclusive and autoDelete
PersistentQueue amqpQueue = new PersistentQueue(queueName, persistentTopic,
0, false, false);
0, false, false,
amqpMetrics.addQueueMetrics(namespaceName.getLocalName(), queueName));
try {
amqpQueue.recoverRoutersFromQueueProperties(properties, exchangeContainer,
namespaceName);
Expand Down Expand Up @@ -132,6 +136,7 @@ private void removeQueueFuture(NamespaceName namespaceName, String queueName) {
if (queueMap.containsKey(namespaceName)) {
queueMap.get(namespaceName).remove(queueName);
}
amqpMetrics.deleteQueueMetrics(namespaceName.getLocalName(), queueName);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.streamnative.pulsar.handlers.amqp.AmqpEntryWriter;
import io.streamnative.pulsar.handlers.amqp.AmqpExchangeReplicator;
import io.streamnative.pulsar.handlers.amqp.AmqpQueue;
import io.streamnative.pulsar.handlers.amqp.metrics.ExchangeMetrics;
import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils;
import io.streamnative.pulsar.handlers.amqp.utils.PulsarTopicMetadataUtils;
import java.io.IOException;
Expand Down Expand Up @@ -66,8 +67,10 @@ public class PersistentExchange extends AbstractAmqpExchange {
private final ConcurrentOpenHashMap<String, ManagedCursor> cursors;
private AmqpExchangeReplicator messageReplicator;
private AmqpEntryWriter amqpEntryWriter;
private ExchangeMetrics exchangeMetrics;

public PersistentExchange(String exchangeName, Type type, PersistentTopic persistentTopic, boolean autoDelete) {
public PersistentExchange(String exchangeName, Type type, PersistentTopic persistentTopic, boolean autoDelete,
ExchangeMetrics exchangeMetrics) {
super(exchangeName, type, new HashSet<>(), true, autoDelete);
this.persistentTopic = persistentTopic;
topicNameValidate();
Expand All @@ -83,6 +86,7 @@ public PersistentExchange(String exchangeName, Type type, PersistentTopic persis
messageReplicator = new AmqpExchangeReplicator(this) {
@Override
public CompletableFuture<Void> readProcess(Entry entry) {
exchangeMetrics.routeInc();
Map<String, Object> props;
try {
MessageImpl<byte[]> message = MessageImpl.deserialize(entry.getDataBuffer());
Expand All @@ -100,17 +104,27 @@ public CompletableFuture<Void> readProcess(Entry entry) {
props);
routeFutureList.add(routeFuture);
}
return FutureUtil.waitForAll(routeFutureList);
return FutureUtil.waitForAll(routeFutureList).whenComplete((__, t) -> {
if (t != null) {
exchangeMetrics.routeFailedInc();
}
});
}
};
messageReplicator.startReplicate();
}
this.amqpEntryWriter = new AmqpEntryWriter(persistentTopic);
this.exchangeMetrics = exchangeMetrics;
}

@Override
public CompletableFuture<Position> writeMessageAsync(Message<byte[]> message, String routingKey) {
return amqpEntryWriter.publishMessage(message);
exchangeMetrics.writeInc();
return amqpEntryWriter.publishMessage(message).whenComplete((__, t) -> {
if (t != null) {
exchangeMetrics.writeFailed();
}
});
}

@Override
Expand All @@ -120,7 +134,8 @@ public CompletableFuture<Entry> readEntryAsync(String queueName, long ledgerId,

@Override
public CompletableFuture<Entry> readEntryAsync(String queueName, Position position) {
CompletableFuture<Entry> future = new CompletableFuture();
exchangeMetrics.readInc();
CompletableFuture<Entry> future = new CompletableFuture<>();
// TODO Temporarily put the creation operation here, and later put the operation in router
ManagedCursor cursor = cursors.get(queueName);
if (cursor == null) {
Expand All @@ -137,6 +152,7 @@ public void readEntryComplete(Entry entry, Object o) {

@Override
public void readEntryFailed(ManagedLedgerException e, Object o) {
exchangeMetrics.readFailed();
future.completeExceptionally(e);
}
}
Expand All @@ -151,6 +167,7 @@ public CompletableFuture<Void> markDeleteAsync(String queueName, long ledgerId,

@Override
public CompletableFuture<Void> markDeleteAsync(String queueName, Position position) {
exchangeMetrics.ackInc();
CompletableFuture<Void> future = new CompletableFuture();
ManagedCursor cursor = cursors.get(queueName);
if (cursor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.streamnative.pulsar.handlers.amqp.AmqpQueueProperties;
import io.streamnative.pulsar.handlers.amqp.ExchangeContainer;
import io.streamnative.pulsar.handlers.amqp.IndexMessage;
import io.streamnative.pulsar.handlers.amqp.metrics.QueueMetrics;
import io.streamnative.pulsar.handlers.amqp.utils.MessageConvertUtils;
import io.streamnative.pulsar.handlers.amqp.utils.PulsarTopicMetadataUtils;
import java.util.ArrayList;
Expand Down Expand Up @@ -61,36 +62,54 @@ public class PersistentQueue extends AbstractAmqpQueue {

private AmqpEntryWriter amqpEntryWriter;

protected QueueMetrics queueMetrics;

public PersistentQueue(String queueName, PersistentTopic indexTopic,
long connectionId,
boolean exclusive, boolean autoDelete) {
boolean exclusive, boolean autoDelete,
QueueMetrics queueMetrics) {
super(queueName, true, connectionId, exclusive, autoDelete);
this.indexTopic = indexTopic;
topicNameValidate();
this.jsonMapper = new ObjectMapper();
this.amqpEntryWriter = new AmqpEntryWriter(indexTopic);
this.queueMetrics = queueMetrics;
}

@Override
public CompletableFuture<Void> writeIndexMessageAsync(String exchangeName, long ledgerId, long entryId) {
try {
queueMetrics.writeInc();
IndexMessage indexMessage = IndexMessage.create(exchangeName, ledgerId, entryId);
MessageImpl<byte[]> message = MessageConvertUtils.toPulsarMessage(indexMessage);
return amqpEntryWriter.publishMessage(message).thenApply(__ -> null);
return amqpEntryWriter.publishMessage(message).whenComplete((__, t) -> {
if (t != null) {
log.error("Failed to publish index messages from exchange {} to queue {}.",
exchangeName, queueName);
queueMetrics.writeFailed();
}
}).thenApply(__ -> null);
} catch (Exception e) {
log.error("Failed to writer index message for exchange {} with position {}:{}.",
exchangeName, ledgerId, entryId);
queueMetrics.writeFailed();
return FutureUtil.failedFuture(e);
}
}

@Override
public CompletableFuture<Entry> readEntryAsync(String exchangeName, long ledgerId, long entryId) {
return getRouter(exchangeName).getExchange().readEntryAsync(getName(), ledgerId, entryId);
return getRouter(exchangeName).getExchange().readEntryAsync(getName(), ledgerId, entryId)
.whenComplete((__, t) -> {
if (t != null) {
queueMetrics.readFailed();
}
});
}

@Override
public CompletableFuture<Void> acknowledgeAsync(String exchangeName, long ledgerId, long entryId) {
queueMetrics.ackInc();
return getRouter(exchangeName).getExchange().markDeleteAsync(getName(), ledgerId, entryId);
}

Expand Down Expand Up @@ -183,4 +202,9 @@ private void topicNameValidate() {
TOPIC_PREFIX, "exchangeName");
}

@Override
public void recordDispatchEvent() {
queueMetrics.dispatchInc();
}

}
Loading