Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion amqp-client-auth/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<artifactId>pulsar-protocol-handler-amqp-parent</artifactId>
<groupId>io.streamnative.pulsar.handlers</groupId>
<version>3.3.0-SNAPSHOT</version>
<version>4.0.10.1</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion amqp-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-amqp-parent</artifactId>
<version>3.3.0-SNAPSHOT</version>
<version>4.0.10.1</version>
</parent>

<artifactId>pulsar-protocol-handler-amqp</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@
import org.apache.pulsar.broker.protocol.ProtocolHandler;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
import org.eclipse.jetty.ee8.servlet.ServletContextHandler;
import org.eclipse.jetty.ee8.servlet.ServletHolder;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.glassfish.jersey.servlet.ServletContainer;

/**
Expand Down
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-amqp-parent</artifactId>
<version>3.3.0-SNAPSHOT</version>
<version>4.0.10.1</version>
<name>StreamNative :: Pulsar Protocol Handler :: AoP Parent</name>
<description>Parent for AMQP on Pulsar implemented using Pulsar Protocol Handler.</description>

Expand All @@ -40,10 +40,10 @@
<project.compiler.release>${maven.compiler.target}</project.compiler.release>

<!-- dependencies -->
<pulsar.version>4.1.0-SNAPSHOT</pulsar.version>
<pulsar.version>4.0.10.1</pulsar.version>
<qpid-protocol-plugin.version>8.0.0</qpid-protocol-plugin.version>
<rabbitmq.version>5.8.0</rabbitmq.version>
<sn.bom.version>4.1.0-SNAPSHOT</sn.bom.version>
<sn.bom.version>4.0.10.1</sn.bom.version>

<!-- test dependencies -->
<qpid-client-version>6.4.0</qpid-client-version>
Expand Down
2 changes: 1 addition & 1 deletion tests-qpid-jms-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
<parent>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-amqp-parent</artifactId>
<version>3.3.0-SNAPSHOT</version>
<version>4.0.10.1</version>
</parent>
<modelVersion>4.0.0</modelVersion>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import io.streamnative.pulsar.handlers.amqp.qpid.core.JmsTestBase;
import io.streamnative.pulsar.handlers.amqp.qpid.jms_1_1.extensions.BrokerManagementHelper;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
Expand All @@ -43,8 +43,6 @@
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.naming.NamingException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.security.FileTrustStore;
Expand All @@ -59,12 +57,14 @@
import org.apache.qpid.test.utils.tls.PrivateKeyEntry;
import org.apache.qpid.test.utils.tls.TlsResource;
import org.apache.qpid.test.utils.tls.TlsResourceBuilder;
import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.server.handler.HandlerCollection;
import org.eclipse.jetty.util.Callback;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
Expand Down Expand Up @@ -109,7 +109,7 @@ public class AuthenticationTest extends JmsTestBase
private static final String USER_PASSWORD = "user";

private static final Server CRL_SERVER = new Server();
private static final HandlerCollection HANDLERS = new HandlerCollection();
private static final Handler.Collection HANDLERS = new Handler.Sequence();

private static final String CRL_TEMPLATE = "http://localhost:%d/%s";

Expand Down Expand Up @@ -832,7 +832,7 @@ public static String createDataUrlForFile(Path file) throws IOException
return DataUrlUtils.getDataUrlForBytes(Files.readAllBytes(file));
}

private static class CrlServerHandler extends AbstractHandler
private static class CrlServerHandler extends Handler.Abstract
{
final Path crlPath;

Expand All @@ -842,15 +842,13 @@ private static class CrlServerHandler extends AbstractHandler
}

@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
public boolean handle(Request request, Response response, Callback callback)
throws IOException
{
final byte[] crlBytes = Files.readAllBytes(crlPath);
response.setStatus(HttpServletResponse.SC_OK);
try (final OutputStream responseBody = response.getOutputStream())
{
responseBody.write(crlBytes);
}
response.setStatus(HttpStatus.OK_200);
response.write(true, ByteBuffer.wrap(crlBytes), callback);
return true;
}
}
}
2 changes: 1 addition & 1 deletion tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<parent>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>pulsar-protocol-handler-amqp-parent</artifactId>
<version>3.3.0-SNAPSHOT</version>
<version>4.0.10.1</version>
</parent>

<groupId>io.streamnative.pulsar.handlers</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,16 @@
import static org.testng.AssertJUnit.assertFalse;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AlreadyClosedException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.Cleanup;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.testng.annotations.BeforeClass;
Expand All @@ -48,63 +49,80 @@ public void setup() throws Exception {
@Test
public void e2eTest() throws Exception {
int port = getAmqpBrokerPortList().get(0);
@Cleanup
Connection conn = getConnection("vhost1", port);
@Cleanup
Channel channel = conn.createChannel();
try {
String ex = randExName();
channel.exchangeDeclare(ex, "direct", false, true, null);

String ex = randExName();
channel.exchangeDeclare(ex, "direct", false, true, null);
String qu1 = randQuName();
channel.queueDeclare(qu1, false, false, true, null);
String key1 = "key1";
channel.queueBind(qu1, ex, key1);

String qu1 = randQuName();
channel.queueDeclare(qu1, false, false, true, null);
String key1 = "key1";
channel.queueBind(qu1, ex, key1);
String qu2 = randQuName();
channel.queueDeclare(qu2, false, false, true, null);
String key2 = "key2";
channel.queueBind(qu2, ex, key2);

String qu2 = randQuName();
channel.queueDeclare(qu2, false, false, true, null);
String key2 = "key2";
channel.queueBind(qu2, ex, key2);

int messageCount = 100;
for (int i = 0; i < messageCount; i++) {
channel.basicPublish(ex, key1, null, (key1 + "-" + i).getBytes());
channel.basicPublish(ex, key2, null, (key2 + "-" + i).getBytes());
}
int messageCount = 100;
for (int i = 0; i < messageCount; i++) {
channel.basicPublish(ex, key1, null, (key1 + "-" + i).getBytes());
channel.basicPublish(ex, key2, null, (key2 + "-" + i).getBytes());
}

AtomicInteger receiveCount = new AtomicInteger();
AtomicBoolean flag1 = new AtomicBoolean(false);
channel.basicConsume(qu1, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
receiveCount.incrementAndGet();
if (!new String(body).contains(key1)) {
flag1.set(true);
AtomicInteger receiveCount = new AtomicInteger();
AtomicBoolean flag1 = new AtomicBoolean(false);
channel.basicConsume(qu1, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
receiveCount.incrementAndGet();
if (!new String(body).contains(key1)) {
flag1.set(true);
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
});

AtomicBoolean flag2 = new AtomicBoolean(false);
channel.basicConsume(qu2, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
receiveCount.incrementAndGet();
if (!new String(body).contains(key2)) {
flag2.set(true);
AtomicBoolean flag2 = new AtomicBoolean(false);
channel.basicConsume(qu2, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
receiveCount.incrementAndGet();
if (!new String(body).contains(key2)) {
flag2.set(true);
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
});

Awaitility.waitAtMost(5, TimeUnit.SECONDS)
.until(() -> receiveCount.get() == messageCount * 2);
assertFalse(flag1.get());
assertFalse(flag2.get());
channel.queueUnbind(qu1, ex, key1);
channel.queueUnbind(qu2, ex, key2);
Awaitility.waitAtMost(5, TimeUnit.SECONDS)
.until(() -> receiveCount.get() == messageCount * 2);
assertFalse(flag1.get());
assertFalse(flag2.get());
channel.queueUnbind(qu1, ex, key1);
channel.queueUnbind(qu2, ex, key2);
} finally {
closeQuietly(channel);
closeQuietly(conn);
}
}

private void closeQuietly(Channel channel) throws IOException, TimeoutException {
try {
channel.close();
} catch (AlreadyClosedException ignored) {
log.debug("Channel already closed during cleanup", ignored);
}
}

private void closeQuietly(Connection conn) throws IOException {
try {
conn.close();
} catch (AlreadyClosedException ignored) {
log.debug("Connection already closed during cleanup", ignored);
}
}
}
Loading