diff --git a/amqp-client-auth/pom.xml b/amqp-client-auth/pom.xml index 41165c440..e598edbcd 100644 --- a/amqp-client-auth/pom.xml +++ b/amqp-client-auth/pom.xml @@ -20,7 +20,7 @@ pulsar-protocol-handler-amqp-parent io.streamnative.pulsar.handlers - 3.3.0-SNAPSHOT + 4.0.10.1 4.0.0 diff --git a/amqp-impl/pom.xml b/amqp-impl/pom.xml index b28e6529b..f5033b3bf 100644 --- a/amqp-impl/pom.xml +++ b/amqp-impl/pom.xml @@ -22,7 +22,7 @@ io.streamnative.pulsar.handlers pulsar-protocol-handler-amqp-parent - 3.3.0-SNAPSHOT + 4.0.10.1 pulsar-protocol-handler-amqp diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java index d599be858..76c71fa59 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpProtocolHandler.java @@ -31,9 +31,9 @@ import org.apache.pulsar.broker.protocol.ProtocolHandler; import org.apache.pulsar.broker.service.BrokerService; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; +import org.eclipse.jetty.ee8.servlet.ServletContextHandler; +import org.eclipse.jetty.ee8.servlet.ServletHolder; import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; import org.glassfish.jersey.servlet.ServletContainer; /** diff --git a/pom.xml b/pom.xml index 2512c7cb5..f9237d2d8 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ io.streamnative.pulsar.handlers pulsar-protocol-handler-amqp-parent - 3.3.0-SNAPSHOT + 4.0.10.1 StreamNative :: Pulsar Protocol Handler :: AoP Parent Parent for AMQP on Pulsar implemented using Pulsar Protocol Handler. @@ -40,10 +40,10 @@ ${maven.compiler.target} - 4.1.0-SNAPSHOT + 4.0.10.1 8.0.0 5.8.0 - 4.1.0-SNAPSHOT + 4.0.10.1 6.4.0 diff --git a/tests-qpid-jms-client/pom.xml b/tests-qpid-jms-client/pom.xml index 1e95ec8ef..636ac762b 100644 --- a/tests-qpid-jms-client/pom.xml +++ b/tests-qpid-jms-client/pom.xml @@ -20,7 +20,7 @@ io.streamnative.pulsar.handlers pulsar-protocol-handler-amqp-parent - 3.3.0-SNAPSHOT + 4.0.10.1 4.0.0 diff --git a/tests-qpid-jms-client/src/test/java/io/streamnative/pulsar/handlers/amqp/qpid/jms_1_1/extensions/sasl/AuthenticationTest.java b/tests-qpid-jms-client/src/test/java/io/streamnative/pulsar/handlers/amqp/qpid/jms_1_1/extensions/sasl/AuthenticationTest.java index 1db8efc0f..8ec1d1491 100644 --- a/tests-qpid-jms-client/src/test/java/io/streamnative/pulsar/handlers/amqp/qpid/jms_1_1/extensions/sasl/AuthenticationTest.java +++ b/tests-qpid-jms-client/src/test/java/io/streamnative/pulsar/handlers/amqp/qpid/jms_1_1/extensions/sasl/AuthenticationTest.java @@ -28,7 +28,7 @@ import io.streamnative.pulsar.handlers.amqp.qpid.core.JmsTestBase; import io.streamnative.pulsar.handlers.amqp.qpid.jms_1_1.extensions.BrokerManagementHelper; import java.io.IOException; -import java.io.OutputStream; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardCopyOption; @@ -43,8 +43,6 @@ import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.naming.NamingException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Protocol; import org.apache.qpid.server.security.FileTrustStore; @@ -59,12 +57,14 @@ import org.apache.qpid.test.utils.tls.PrivateKeyEntry; import org.apache.qpid.test.utils.tls.TlsResource; import org.apache.qpid.test.utils.tls.TlsResourceBuilder; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Response; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; -import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.server.handler.ContextHandler; -import org.eclipse.jetty.server.handler.HandlerCollection; +import org.eclipse.jetty.util.Callback; import org.hamcrest.Matchers; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -109,7 +109,7 @@ public class AuthenticationTest extends JmsTestBase private static final String USER_PASSWORD = "user"; private static final Server CRL_SERVER = new Server(); - private static final HandlerCollection HANDLERS = new HandlerCollection(); + private static final Handler.Collection HANDLERS = new Handler.Sequence(); private static final String CRL_TEMPLATE = "http://localhost:%d/%s"; @@ -832,7 +832,7 @@ public static String createDataUrlForFile(Path file) throws IOException return DataUrlUtils.getDataUrlForBytes(Files.readAllBytes(file)); } - private static class CrlServerHandler extends AbstractHandler + private static class CrlServerHandler extends Handler.Abstract { final Path crlPath; @@ -842,15 +842,13 @@ private static class CrlServerHandler extends AbstractHandler } @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) + public boolean handle(Request request, Response response, Callback callback) throws IOException { final byte[] crlBytes = Files.readAllBytes(crlPath); - response.setStatus(HttpServletResponse.SC_OK); - try (final OutputStream responseBody = response.getOutputStream()) - { - responseBody.write(crlBytes); - } + response.setStatus(HttpStatus.OK_200); + response.write(true, ByteBuffer.wrap(crlBytes), callback); + return true; } } } diff --git a/tests/pom.xml b/tests/pom.xml index 95ee1831c..7180bc811 100644 --- a/tests/pom.xml +++ b/tests/pom.xml @@ -22,7 +22,7 @@ io.streamnative.pulsar.handlers pulsar-protocol-handler-amqp-parent - 3.3.0-SNAPSHOT + 4.0.10.1 io.streamnative.pulsar.handlers diff --git a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/MultiBundlesTest.java b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/MultiBundlesTest.java index df7c04be4..6cf6ee551 100644 --- a/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/MultiBundlesTest.java +++ b/tests/src/test/java/io/streamnative/pulsar/handlers/amqp/MultiBundlesTest.java @@ -16,15 +16,16 @@ import static org.testng.AssertJUnit.assertFalse; import com.rabbitmq.client.AMQP; +import com.rabbitmq.client.AlreadyClosedException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.DefaultConsumer; import com.rabbitmq.client.Envelope; import java.io.IOException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.awaitility.Awaitility; import org.testng.annotations.BeforeClass; @@ -48,63 +49,80 @@ public void setup() throws Exception { @Test public void e2eTest() throws Exception { int port = getAmqpBrokerPortList().get(0); - @Cleanup Connection conn = getConnection("vhost1", port); - @Cleanup Channel channel = conn.createChannel(); + try { + String ex = randExName(); + channel.exchangeDeclare(ex, "direct", false, true, null); - String ex = randExName(); - channel.exchangeDeclare(ex, "direct", false, true, null); + String qu1 = randQuName(); + channel.queueDeclare(qu1, false, false, true, null); + String key1 = "key1"; + channel.queueBind(qu1, ex, key1); - String qu1 = randQuName(); - channel.queueDeclare(qu1, false, false, true, null); - String key1 = "key1"; - channel.queueBind(qu1, ex, key1); + String qu2 = randQuName(); + channel.queueDeclare(qu2, false, false, true, null); + String key2 = "key2"; + channel.queueBind(qu2, ex, key2); - String qu2 = randQuName(); - channel.queueDeclare(qu2, false, false, true, null); - String key2 = "key2"; - channel.queueBind(qu2, ex, key2); - - int messageCount = 100; - for (int i = 0; i < messageCount; i++) { - channel.basicPublish(ex, key1, null, (key1 + "-" + i).getBytes()); - channel.basicPublish(ex, key2, null, (key2 + "-" + i).getBytes()); - } + int messageCount = 100; + for (int i = 0; i < messageCount; i++) { + channel.basicPublish(ex, key1, null, (key1 + "-" + i).getBytes()); + channel.basicPublish(ex, key2, null, (key2 + "-" + i).getBytes()); + } - AtomicInteger receiveCount = new AtomicInteger(); - AtomicBoolean flag1 = new AtomicBoolean(false); - channel.basicConsume(qu1, false, new DefaultConsumer(channel) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, - byte[] body) throws IOException { - receiveCount.incrementAndGet(); - if (!new String(body).contains(key1)) { - flag1.set(true); + AtomicInteger receiveCount = new AtomicInteger(); + AtomicBoolean flag1 = new AtomicBoolean(false); + channel.basicConsume(qu1, false, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { + receiveCount.incrementAndGet(); + if (!new String(body).contains(key1)) { + flag1.set(true); + } + channel.basicAck(envelope.getDeliveryTag(), false); } - channel.basicAck(envelope.getDeliveryTag(), false); - } - }); + }); - AtomicBoolean flag2 = new AtomicBoolean(false); - channel.basicConsume(qu2, false, new DefaultConsumer(channel) { - @Override - public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, - byte[] body) throws IOException { - receiveCount.incrementAndGet(); - if (!new String(body).contains(key2)) { - flag2.set(true); + AtomicBoolean flag2 = new AtomicBoolean(false); + channel.basicConsume(qu2, false, new DefaultConsumer(channel) { + @Override + public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, + byte[] body) throws IOException { + receiveCount.incrementAndGet(); + if (!new String(body).contains(key2)) { + flag2.set(true); + } + channel.basicAck(envelope.getDeliveryTag(), false); } - channel.basicAck(envelope.getDeliveryTag(), false); - } - }); + }); - Awaitility.waitAtMost(5, TimeUnit.SECONDS) - .until(() -> receiveCount.get() == messageCount * 2); - assertFalse(flag1.get()); - assertFalse(flag2.get()); - channel.queueUnbind(qu1, ex, key1); - channel.queueUnbind(qu2, ex, key2); + Awaitility.waitAtMost(5, TimeUnit.SECONDS) + .until(() -> receiveCount.get() == messageCount * 2); + assertFalse(flag1.get()); + assertFalse(flag2.get()); + channel.queueUnbind(qu1, ex, key1); + channel.queueUnbind(qu2, ex, key2); + } finally { + closeQuietly(channel); + closeQuietly(conn); + } } + private void closeQuietly(Channel channel) throws IOException, TimeoutException { + try { + channel.close(); + } catch (AlreadyClosedException ignored) { + log.debug("Channel already closed during cleanup", ignored); + } + } + + private void closeQuietly(Connection conn) throws IOException { + try { + conn.close(); + } catch (AlreadyClosedException ignored) { + log.debug("Connection already closed during cleanup", ignored); + } + } }