From e2e33ed6280e5970875e0e253bb334e12a05a29d Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Fri, 9 Jan 2026 08:08:49 +0000 Subject: [PATCH 1/2] fix: support vhost config for rabbitmq benchmark and clean resource (merge request !9) Squash merge branch 'fix_rabbitmq' into 'tencent_master' fix: support vhost config for rabbitmq benchmark and clean resource --- .../benchmark/driver/ResourceCreator.java | 4 + .../rabbitmq/RabbitMqBenchmarkDriver.java | 99 ++++++++++--------- 2 files changed, 59 insertions(+), 44 deletions(-) diff --git a/driver-api/src/main/java/io/openmessaging/benchmark/driver/ResourceCreator.java b/driver-api/src/main/java/io/openmessaging/benchmark/driver/ResourceCreator.java index 3bdfe5372..64ef34561 100644 --- a/driver-api/src/main/java/io/openmessaging/benchmark/driver/ResourceCreator.java +++ b/driver-api/src/main/java/io/openmessaging/benchmark/driver/ResourceCreator.java @@ -83,6 +83,10 @@ private List createBlocking(List resources) { return created; } + public void close() { + executor.shutdown(); + } + @SneakyThrows private Map> executeBatch(List batch) { log.debug("Executing batch, size: {}", batch.size()); diff --git a/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java b/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java index 5d639b545..dcfa87c2b 100644 --- a/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java +++ b/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java @@ -46,6 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; import org.apache.bookkeeper.stats.StatsLogger; +import org.apache.commons.lang.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,6 +59,47 @@ public class RabbitMqBenchmarkDriver implements BenchmarkDriver { * back to secondary brokers. */ private final Map connections = new ConcurrentHashMap<>(); + private final ResourceCreator producerResourceCreator = new ResourceCreator<>( + "producer", + config.producerCreationBatchSize, + config.producerCreationDelay, + ps -> ps.stream().collect(toMap(p -> p, p -> createProducer(p.getTopic()))), + fc -> { + try { + return new CreationResult<>(fc.get(), true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + log.debug(e.getMessage()); + return new CreationResult<>(null, false); + } + }); + private final ResourceCreator consumerResourceCreator = new ResourceCreator<>( + "consumer", + config.consumerCreationBatchSize, + config.consumerCreationDelay, + cs -> + cs.stream() + .collect( + toMap( + c -> c, + c -> + createConsumer( + c.getTopic(), + c.getSubscriptionName(), + c.getConsumerCallback()))), + fc -> { + try { + return new CreationResult<>(fc.get(), true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + log.debug(e.getMessage()); + return new CreationResult<>(null, false); + } + }); @Override public void initialize(File configurationFile, StatsLogger statsLogger) throws IOException { @@ -78,6 +120,8 @@ public void close() { } it.remove(); } + producerResourceCreator.close(); + consumerResourceCreator.close(); } @Override @@ -126,53 +170,12 @@ public CompletableFuture createProducer(String topic) { @Override public CompletableFuture> createProducers(List producers) { - return new ResourceCreator( - "producer", - config.producerCreationBatchSize, - config.producerCreationDelay, - ps -> ps.stream().collect(toMap(p -> p, p -> createProducer(p.getTopic()))), - fc -> { - try { - return new CreationResult<>(fc.get(), true); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - log.debug(e.getMessage()); - return new CreationResult<>(null, false); - } - }) - .create(producers); + return producerResourceCreator.create(producers); } @Override public CompletableFuture> createConsumers(List consumers) { - return new ResourceCreator( - "consumer", - config.consumerCreationBatchSize, - config.consumerCreationDelay, - cs -> - cs.stream() - .collect( - toMap( - c -> c, - c -> - createConsumer( - c.getTopic(), - c.getSubscriptionName(), - c.getConsumerCallback()))), - fc -> { - try { - return new CreationResult<>(fc.get(), true); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - log.debug(e.getMessage()); - return new CreationResult<>(null, false); - } - }) - .create(consumers); + return consumerResourceCreator.create(consumers); } @Override @@ -228,12 +231,20 @@ private Connection getOrCreateConnection(String primaryBrokerUri) { try { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setAutomaticRecoveryEnabled(true); - String userInfo = newURI(primaryBrokerUri).getUserInfo(); + URI uri = newURI(primaryBrokerUri); + String userInfo = uri.getUserInfo(); if (userInfo != null) { String[] userInfoElems = userInfo.split(":"); connectionFactory.setUsername(userInfoElems[0]); connectionFactory.setPassword(userInfoElems[1]); } + String path = uri.getPath(); + if (!StringUtils.isBlank(path)) { + if (path.startsWith("/")) { + path = path.substring(1); + } + connectionFactory.setVirtualHost(path); + } return connectionFactory.newConnection(addresses); } catch (Exception e) { throw new RuntimeException("Couldn't establish connection to: " + primaryBrokerUri, e); From ed517b22d9d634c28b53e937c45c7eeb6800e220 Mon Sep 17 00:00:00 2001 From: hqbfzwang Date: Tue, 27 Jan 2026 08:11:44 +0000 Subject: [PATCH 2/2] fix : fix mqtt worker and opti response (merge request !12) Squash merge branch 'opti_response' into 'tencent_master' opti: opti response return --- .../rabbitmq/RabbitMqBenchmarkDriver.java | 92 ++++++++++--------- 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java b/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java index dcfa87c2b..879a96dd2 100644 --- a/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java +++ b/driver-rabbitmq/src/main/java/io/openmessaging/benchmark/driver/rabbitmq/RabbitMqBenchmarkDriver.java @@ -59,51 +59,53 @@ public class RabbitMqBenchmarkDriver implements BenchmarkDriver { * back to secondary brokers. */ private final Map connections = new ConcurrentHashMap<>(); - private final ResourceCreator producerResourceCreator = new ResourceCreator<>( - "producer", - config.producerCreationBatchSize, - config.producerCreationDelay, - ps -> ps.stream().collect(toMap(p -> p, p -> createProducer(p.getTopic()))), - fc -> { - try { - return new CreationResult<>(fc.get(), true); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - log.debug(e.getMessage()); - return new CreationResult<>(null, false); - } - }); - private final ResourceCreator consumerResourceCreator = new ResourceCreator<>( - "consumer", - config.consumerCreationBatchSize, - config.consumerCreationDelay, - cs -> - cs.stream() - .collect( - toMap( - c -> c, - c -> - createConsumer( - c.getTopic(), - c.getSubscriptionName(), - c.getConsumerCallback()))), - fc -> { - try { - return new CreationResult<>(fc.get(), true); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new RuntimeException(e); - } catch (ExecutionException e) { - log.debug(e.getMessage()); - return new CreationResult<>(null, false); - } - }); + private ResourceCreator producerResourceCreator; + private ResourceCreator consumerResourceCreator; @Override public void initialize(File configurationFile, StatsLogger statsLogger) throws IOException { config = mapper.readValue(configurationFile, RabbitMqConfig.class); + producerResourceCreator = new ResourceCreator<>( + "producer", + config.producerCreationBatchSize, + config.producerCreationDelay, + ps -> ps.stream().collect(toMap(p -> p, p -> createProducer(p.getTopic()))), + fc -> { + try { + return new CreationResult<>(fc.get(), true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + log.debug(e.getMessage()); + return new CreationResult<>(null, false); + } + }); + consumerResourceCreator = new ResourceCreator<>( + "consumer", + config.consumerCreationBatchSize, + config.consumerCreationDelay, + cs -> + cs.stream() + .collect( + toMap( + c -> c, + c -> + createConsumer( + c.getTopic(), + c.getSubscriptionName(), + c.getConsumerCallback()))), + fc -> { + try { + return new CreationResult<>(fc.get(), true); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } catch (ExecutionException e) { + log.debug(e.getMessage()); + return new CreationResult<>(null, false); + } + }); } @Override @@ -120,8 +122,12 @@ public void close() { } it.remove(); } - producerResourceCreator.close(); - consumerResourceCreator.close(); + if (producerResourceCreator != null) { + producerResourceCreator.close(); + } + if (consumerResourceCreator != null) { + consumerResourceCreator.close(); + } } @Override