Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 18 additions & 11 deletions .github/workflows/pr-qpid-jms-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,22 @@ jobs:

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts

- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
uses: actions/upload-artifact@v3
with:
name: surefire-artifacts
path: artifacts.zip
name: my-artifact
path: tests-qpid-jms-client/target/surefire-reports/ # or path/to/artifact

# - name: package surefire artifacts
# if: failure()
# run: |
# rm -rf artifacts
# mkdir artifacts
# find . -type d -name "*surefire-reports*" -exec cp --parents -R {} artifacts/ \;
# zip -r artifacts.zip artifacts

# - uses: actions/upload-artifact@master
# name: upload surefire-artifacts
# if: failure()
# with:
# name: surefire-artifacts
# path: artifacts.zip
29 changes: 18 additions & 11 deletions .github/workflows/pr-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,22 @@ jobs:

- name: package surefire artifacts
if: failure()
run: |
rm -rf artifacts
mkdir artifacts
find . -type d -name "*surefire*" -exec cp --parents -R {} artifacts/ \;
zip -r artifacts.zip artifacts

- uses: actions/upload-artifact@master
name: upload surefire-artifacts
if: failure()
uses: actions/upload-artifact@v3
with:
name: surefire-artifacts
path: artifacts.zip
name: my-artifact
path: tests/target/surefire-reports/ # or path/to/artifact

# - name: package surefire artifacts
# if: failure()
# run: |
# rm -rf artifacts
# mkdir artifacts
# find . -type d -name "*surefire-reports*" -exec cp --parents -R {} artifacts/ \;
# zip -r artifacts.zip artifacts
#
# - uses: actions/upload-artifact@master
# name: upload surefire-artifacts
# if: failure()
# with:
# name: surefire-artifacts
# path: artifacts.zip
7 changes: 0 additions & 7 deletions amqp-client-auth/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,5 @@
<version>${jjwt.version}</version>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

/**
* Base class of AMQP exchange.
Expand All @@ -38,8 +39,9 @@ protected AbstractAmqpExchange(String exchangeName, AmqpExchange.Type exchangeTy
}

@Override
public void addQueue(AmqpQueue queue) {
public CompletableFuture<Void> addQueue(AmqpQueue queue) {
queues.add(queue);
return CompletableFuture.completedFuture(null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/**
Expand Down Expand Up @@ -87,8 +88,8 @@ public int hashCode() {
}

@Override
public void bindExchange(AmqpExchange exchange, AmqpMessageRouter router, String bindingKey,
Map<String, Object> arguments) {
public CompletableFuture<Void> bindExchange(AmqpExchange exchange, AmqpMessageRouter router, String bindingKey,
Map<String, Object> arguments) {
// The same exchange and queue can have more than one binding,
// if router had been created, get it and add a new bindingKey.
if (isRouterExisted(exchange)) {
Expand All @@ -100,7 +101,7 @@ public void bindExchange(AmqpExchange exchange, AmqpMessageRouter router, String
router.setArguments(arguments);
this.routers.put(router.getExchange().getName(), router);
}
exchange.addQueue(this);
return exchange.addQueue(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static Type value(String type) {
* Add a queue {@link AmqpQueue} to the exchange.
* @param queue AMQP queue.
*/
void addQueue(AmqpQueue queue);
CompletableFuture<Void> addQueue(AmqpQueue queue);

/**
* Remove a queue {@link AmqpQueue} from the exchange.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ CompletableFuture<Void> writeIndexMessageAsync(String exchangeName, long ledgerI
/**
* Bind to a exchange {@link AmqpExchange}.
*/
void bindExchange(AmqpExchange exchange, AmqpMessageRouter router, String bindingKey,
CompletableFuture<Void> bindExchange(AmqpExchange exchange, AmqpMessageRouter router, String bindingKey,
Map<String, Object> arguments);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
Expand All @@ -63,7 +64,7 @@ public class PersistentExchange extends AbstractAmqpExchange {

private PersistentTopic persistentTopic;
private ObjectMapper jsonMapper = new JsonMapper();
private final ConcurrentOpenHashMap<String, ManagedCursor> cursors;
private final ConcurrentOpenHashMap<String, CompletableFuture<ManagedCursor>> cursors;
private AmqpExchangeReplicator messageReplicator;
private AmqpEntryWriter amqpEntryWriter;

Expand All @@ -74,7 +75,7 @@ public PersistentExchange(String exchangeName, Type type, PersistentTopic persis
updateExchangeProperties();
cursors = new ConcurrentOpenHashMap<>(16, 1);
for (ManagedCursor cursor : persistentTopic.getManagedLedger().getCursors()) {
cursors.put(cursor.getName(), cursor);
cursors.put(cursor.getName(), CompletableFuture.completedFuture(cursor));
log.info("PersistentExchange {} recover cursor {}", persistentTopic.getName(), cursor.toString());
cursor.setInactive();
}
Expand Down Expand Up @@ -120,27 +121,20 @@ public CompletableFuture<Entry> readEntryAsync(String queueName, long ledgerId,

@Override
public CompletableFuture<Entry> readEntryAsync(String queueName, Position position) {
CompletableFuture<Entry> future = new CompletableFuture();
// TODO Temporarily put the creation operation here, and later put the operation in router
ManagedCursor cursor = cursors.get(queueName);
if (cursor == null) {
future.completeExceptionally(new ManagedLedgerException("cursor is null"));
return future;
}
ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();

ledger.asyncReadEntry((PositionImpl) position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object o) {
future.complete(entry);
}
CompletableFuture<Entry> future = new CompletableFuture<>();
((ManagedLedgerImpl) persistentTopic.getManagedLedger())
.asyncReadEntry((PositionImpl) position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object o) {
future.complete(entry);
}

@Override
public void readEntryFailed(ManagedLedgerException e, Object o) {
future.completeExceptionally(e);
}
}
, null);
@Override
public void readEntryFailed(ManagedLedgerException e, Object o) {
future.completeExceptionally(e);
}
}, null);
return future;
}

Expand All @@ -151,54 +145,62 @@ public CompletableFuture<Void> markDeleteAsync(String queueName, long ledgerId,

@Override
public CompletableFuture<Void> markDeleteAsync(String queueName, Position position) {
CompletableFuture<Void> future = new CompletableFuture();
ManagedCursor cursor = cursors.get(queueName);
if (cursor == null) {
future.complete(null);
return future;
}
if (((PositionImpl) position).compareTo((PositionImpl) cursor.getMarkDeletedPosition()) < 0) {
future.complete(null);
return future;
}
cursor.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("Mark delete success for position: {}", position);
}
CompletableFuture<Void> future = new CompletableFuture<>();
getCursor(queueName).thenAccept(cursor -> {
if (cursor == null) {
future.completeExceptionally(new RuntimeException("Failed to make delete, the cursor "
+ queueName + " of the exchange " + exchangeName + " is null."));
return;
}
if (((PositionImpl) position).compareTo((PositionImpl) cursor.getMarkDeletedPosition()) < 0) {
future.complete(null);
return;
}
cursor.asyncMarkDelete(position, new AsyncCallbacks.MarkDeleteCallback() {
@Override
public void markDeleteComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] Mark delete success for position: {}", exchangeName, position);
}
future.complete(null);
}

@Override
public void markDeleteFailed(ManagedLedgerException e, Object ctx) {
log.warn("Mark delete success for position: {} with error:",
position, e);
future.completeExceptionally(e);
}
}, null);
@Override
public void markDeleteFailed(ManagedLedgerException e, Object ctx) {
if (((PositionImpl) position).compareTo((PositionImpl) cursor.getMarkDeletedPosition()) < 0) {
log.warn("Mark delete failed for position: {}, {}", position, e.getMessage());
} else {
log.error("Mark delete failed for position: {}", position, e);
}
future.completeExceptionally(e);
}
}, null);
}).exceptionally(t -> {
future.completeExceptionally(t);
return null;
});
return future;
}

@Override
public CompletableFuture<Position> getMarkDeleteAsync(String queueName) {
CompletableFuture<Position> future = new CompletableFuture();
ManagedCursor cursor = cursors.get(queueName);
if (cursor == null) {
future.complete(null);
return future;
}
future.complete(cursor.getMarkDeletedPosition());
return future;
return getCursor(queueName).thenApply(ManagedCursor::getMarkDeletedPosition);
}

private CompletableFuture<ManagedCursor> getCursor(String queueName) {
CompletableFuture<ManagedCursor> cursorFuture = cursors.get(queueName);
if (cursorFuture == null) {
return FutureUtil.failedFuture(new RuntimeException(
"The cursor " + queueName + " of the exchange " + exchangeName + " is not exist."));
}
return cursorFuture;
}

@Override
public void addQueue(AmqpQueue queue) {
public CompletableFuture<Void> addQueue(AmqpQueue queue) {
queues.add(queue);
updateExchangeProperties();
createCursorIfNotExists(queue.getName());

return createCursorIfNotExists(queue.getName()).thenApply(__ -> null);
}

@Override
Expand Down Expand Up @@ -237,46 +239,60 @@ private List<String> getQueueNames() {
return queueNames;
}

private ManagedCursor createCursorIfNotExists(String name) {
private CompletableFuture<ManagedCursor> createCursorIfNotExists(String name) {
CompletableFuture<ManagedCursor> cursorFuture = new CompletableFuture<>();
return cursors.computeIfAbsent(name, cusrsor -> {
ManagedLedgerImpl ledger = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
if (log.isDebugEnabled()) {
log.debug("Create cursor {} for topic {}", name, persistentTopic.getName());
}
ManagedCursor newCursor;
try {
//newCursor = ledger.openCursor(name, CommandSubscribe.InitialPosition.Latest);
newCursor = ledger.newNonDurableCursor(ledger.getLastConfirmedEntry(), name);
} catch (ManagedLedgerException e) {
log.error("Error new cursor for topic {} - {}. will cause fetch data error.",
persistentTopic.getName(), e);
return null;
}
return newCursor;
ledger.asyncOpenCursor(name, CommandSubscribe.InitialPosition.Earliest,
new AsyncCallbacks.OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
cursorFuture.complete(cursor);
}

@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to open cursor. ", name, exception);
cursorFuture.completeExceptionally(exception);
if (cursors.get(name) != null && cursors.get(name).isCompletedExceptionally()
|| cursors.get(name).isCancelled()) {
cursors.remove(name);
}
}
}, null);
return cursorFuture;
});
}

public void deleteCursor(String name) {
ManagedCursor cursor = cursors.remove(name);
if (cursor != null) {
persistentTopic.getManagedLedger().asyncDeleteCursor(cursor.getName(),
new AsyncCallbacks.DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("Cursor {} for topic {} deleted successfully .",
cursor.getName(), persistentTopic.getName());
}
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Error deleting cursor {} for topic {} for reason: {}.",
cursor.getName(), persistentTopic.getName(), exception);
}
}, null);
CompletableFuture<ManagedCursor> cursorFuture = cursors.remove(name);
if (cursorFuture == null) {
log.warn("Failed to delete cursor, the cursor {} of the exchange {} is not exist.", name, exchangeName);
return;
}
cursorFuture.thenAccept(cursor -> {
if (cursor != null) {
persistentTopic.getManagedLedger().asyncDeleteCursor(cursor.getName(),
new AsyncCallbacks.DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("Cursor {} for topic {} deleted successfully .",
cursor.getName(), persistentTopic.getName());
}
}

@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Error deleting cursor for topic {}.",
cursor.getName(), persistentTopic.getName(), exception);
}
}, null);
}
});
}

public static String getExchangeTopicName(NamespaceName namespaceName, String exchangeName) {
Expand Down
Loading