diff --git a/README.md b/README.md
index 1cf1bbc4..8f581b2b 100644
--- a/README.md
+++ b/README.md
@@ -426,6 +426,48 @@ However, if you completely trust your serialized data (for example, your develop
See [transaction-outbox-jackson](transactionoutbox-jackson/README.md), which uses a specially-configured Jackson `ObjectMapper` to achieve this.
+### Flexible retry policy
+
+The default retry policy for tasks is configured using:
+```
+TransactionOutbox outbox = TransactionOutbox.builder()
+ ...
+ // 10 attempts at a task before blocking it.
+ .blockAfterAttempts(10)
+ // Flush once every 15 minutes only
+ .attemptFrequency(Duration.ofMinutes(15))
+ .build();
+```
+and suitable for most use cases.
+
+If you need to override the default retry policy on a per-task basis, you can do so by implementing the `RetryPolicyAware` interface in the class you pass to `outbox.schedule()`:
+```
+public class RetryPolicyAwareService implements RetryPolicyAware {
+ @Override
+ public Duration waitDuration(int attempt, Throwable throwable) {
+ Duration initialInterval = Duration.ofMillis(100);
+ // Exponential backoff using IntervalFunction from resilience4j-core
+ long waitDurationMillis = IntervalFunction.ofExponentialBackoff(initialInterval).apply(attempt);
+ return Duration.ofMillis(waitDurationMillis);
+ }
+
+ @Override
+ public int blockAfterAttempts(int attempt, Throwable throwable) {
+ // Stop retrying and block outbox entry immediately on ServerError
+ if (throwable instanceof ServerError) {
+ return 0;
+ }
+ return 3;
+ }
+
+ public void callExternalService() {
+ // ...
+ }
+}
+
+outbox.schedule(RetryPolicyAwareService.class).callExternalService();
+```
+
### Clustering
The default mechanism for _running_ tasks (either immediately, or when they are picked up by background processing) is via a `java.concurrent.Executor`, which effectively does the following:
diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/RetryPolicyAware.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/RetryPolicyAware.java
new file mode 100644
index 00000000..5ca872fa
--- /dev/null
+++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/RetryPolicyAware.java
@@ -0,0 +1,30 @@
+package com.gruelbox.transactionoutbox;
+
+import java.time.Duration;
+
+/**
+ * Defines a custom retry policy for tasks scheduled in the {@link TransactionOutbox}.
+ *
+ *
Implement this interface in the class that is passed to {@link
+ * TransactionOutbox#schedule(Class)} to override the default retry behavior.
+ */
+public interface RetryPolicyAware {
+ /**
+ * Determines the wait duration before retrying a failed task.
+ *
+ * @param attempt The current retry attempt (starting from 1).
+ * @param throwable The exception that caused the failure.
+ * @return The duration to wait before the next retry.
+ */
+ Duration waitDuration(int attempt, Throwable throwable);
+
+ /**
+ * Specifies the maximum number of retry attempts before blocking the task.
+ *
+ * @param attempt The current retry attempt (starting from 1).
+ * @param throwable The exception that caused the failure.
+ * @return The number of attempts after which the task should be blocked. If the returned value is
+ * less than or equal to {@code attempt}, the task is blocked immediately.
+ */
+ int blockAfterAttempts(int attempt, Throwable throwable);
+}
diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java
index feb5b1bd..631d6677 100644
--- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java
+++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java
@@ -103,7 +103,7 @@ private boolean doFlush(Function
for (var entry : entries) {
log.debug("Triggering {}", entry.description());
try {
- pushBack(transaction, entry);
+ pushBack(transaction, entry, attemptFrequency);
result.add(entry);
} catch (OptimisticLockException e) {
log.debug("Beaten to optimistic lock on {}", entry.description());
@@ -286,6 +286,7 @@ private void submitNow(TransactionOutboxEntry entry) {
public void processNow(TransactionOutboxEntry entry) {
initialize();
Boolean success = null;
+ InvocationInstanceHolder invocationInstanceHolder = new InvocationInstanceHolder();
try {
success =
transactionManager.inTransactionReturnsThrows(
@@ -298,7 +299,8 @@ public void processNow(TransactionOutboxEntry entry) {
.withinMDC(
() -> {
log.info("Processing {}", entry.description());
- invoke(entry, tx);
+ invocationInstanceHolder.instance = getInvocationInstance(entry);
+ invoke(invocationInstanceHolder.instance, entry, tx);
if (entry.getUniqueRequestId() == null) {
persistor.delete(tx, entry);
} else {
@@ -316,9 +318,9 @@ public void processNow(TransactionOutboxEntry entry) {
return true;
});
} catch (InvocationTargetException e) {
- updateAttemptCount(entry, e.getCause());
+ updateAttemptCount(entry, e.getCause(), invocationInstanceHolder.getAsRetryPolicyAware());
} catch (Exception e) {
- updateAttemptCount(entry, e);
+ updateAttemptCount(entry, e, invocationInstanceHolder.getAsRetryPolicyAware());
}
if (success != null) {
if (success) {
@@ -330,13 +332,18 @@ public void processNow(TransactionOutboxEntry entry) {
}
}
- private void invoke(TransactionOutboxEntry entry, Transaction transaction)
+ private Object getInvocationInstance(TransactionOutboxEntry entry) {
+ Object invocationInstance = instantiator.getInstance(entry.getInvocation().getClassName());
+ log.debug("Created instance {}", invocationInstance);
+ return invocationInstance;
+ }
+
+ private void invoke(
+ Object invocationInstance, TransactionOutboxEntry entry, Transaction transaction)
throws NoSuchMethodException, IllegalAccessException, InvocationTargetException {
- Object instance = instantiator.getInstance(entry.getInvocation().getClassName());
- log.debug("Created instance {}", instance);
transactionManager
.injectTransaction(entry.getInvocation(), transaction)
- .invoke(instance, listener);
+ .invoke(invocationInstance, listener);
}
private TransactionOutboxEntry newEntry(
@@ -362,11 +369,12 @@ private TransactionOutboxEntry newEntry(
.build();
}
- private void pushBack(Transaction transaction, TransactionOutboxEntry entry)
+ private void pushBack(
+ Transaction transaction, TransactionOutboxEntry entry, Duration waitDuration)
throws OptimisticLockException {
try {
entry.setLastAttemptTime(clockProvider.get().instant());
- entry.setNextAttemptTime(after(attemptFrequency));
+ entry.setNextAttemptTime(after(waitDuration));
validator.validate(entry);
persistor.update(transaction, entry);
} catch (OptimisticLockException e) {
@@ -380,12 +388,23 @@ private Instant after(Duration duration) {
return clockProvider.get().instant().plus(duration).truncatedTo(MILLIS);
}
- private void updateAttemptCount(TransactionOutboxEntry entry, Throwable cause) {
+ private void updateAttemptCount(
+ TransactionOutboxEntry entry, Throwable cause, RetryPolicyAware retryPolicyAware) {
try {
entry.setAttempts(entry.getAttempts() + 1);
+
+ int blockAfterAttempts =
+ retryPolicyAware == null
+ ? this.blockAfterAttempts
+ : retryPolicyAware.blockAfterAttempts(entry.getAttempts(), cause);
+ Duration waitDuration =
+ retryPolicyAware == null
+ ? this.attemptFrequency
+ : retryPolicyAware.waitDuration(entry.getAttempts(), cause);
+
var blocked = (entry.getTopic() == null) && (entry.getAttempts() >= blockAfterAttempts);
entry.setBlocked(blocked);
- transactionManager.inTransactionThrows(tx -> pushBack(tx, entry));
+ transactionManager.inTransactionThrows(tx -> pushBack(tx, entry, waitDuration));
listener.failure(entry, cause);
if (blocked) {
log.error(
@@ -460,4 +479,16 @@ public T schedule(Class clazz) {
return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast);
}
}
+
+ private static class InvocationInstanceHolder {
+ Object instance;
+
+ RetryPolicyAware getAsRetryPolicyAware() {
+ if (instance instanceof RetryPolicyAware) {
+ return (RetryPolicyAware) instance;
+ } else {
+ return null;
+ }
+ }
+ }
}
diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java
index 878c0c17..8e0c36ee 100644
--- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java
+++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java
@@ -476,16 +476,31 @@ public T requireTransactionReturns(
*/
@Test
final void retryBehaviour() throws Exception {
+ retryBehaviour(new FailingInstantiator(3), Duration.ofMillis(500));
+ }
+
+ /**
+ * Runs a piece of work which will fail several times before working successfully. Ensures that
+ * the work runs eventually. Uses RetryPolicyAware to configure retry policy.
+ */
+ @Test
+ final void retryPolicyAwareRetryBehaviour() throws Exception {
+ retryBehaviour(
+ new FailingRetryPolicyAwareInstantiator(3, 3), Duration.ofHours(1) // should be ignored
+ );
+ }
+
+ private void retryBehaviour(FailingInstantiator instantiator, Duration attemptFrequency)
+ throws Exception {
TransactionManager transactionManager = txManager();
CountDownLatch latch = new CountDownLatch(1);
- AtomicInteger attempts = new AtomicInteger();
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
- .instantiator(new FailingInstantiator(attempts))
+ .instantiator(instantiator)
.submitter(Submitter.withExecutor(singleThreadPool))
- .attemptFrequency(Duration.ofMillis(500))
+ .attemptFrequency(attemptFrequency)
.listener(new LatchListener(latch))
.build();
@@ -549,13 +564,12 @@ final void lastAttemptTime_updatesEveryTime() throws Exception {
TransactionManager transactionManager = txManager();
CountDownLatch successLatch = new CountDownLatch(1);
CountDownLatch blockLatch = new CountDownLatch(1);
- AtomicInteger attempts = new AtomicInteger();
var orderedEntryListener = new OrderedEntryListener(successLatch, blockLatch);
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
- .instantiator(new FailingInstantiator(attempts))
+ .instantiator(new FailingInstantiator(3))
.submitter(Submitter.withExecutor(singleThreadPool))
.attemptFrequency(Duration.ofMillis(500))
.listener(orderedEntryListener)
@@ -601,20 +615,36 @@ final void lastAttemptTime_updatesEveryTime() throws Exception {
*/
@Test
final void blockAndThenUnblockForRetry() throws Exception {
+ blockAndThenUnblockForRetry(new FailingInstantiator(3), 2);
+ }
+
+ /**
+ * Runs a piece of work which will fail enough times to enter a blocked state but will then pass
+ * when re-tried after it is unblocked. Uses RetryPolicyAware to configure the number of max
+ * attempts.
+ */
+ @Test
+ final void retryPolicyAwareBlockAndThenUnblockForRetry() throws Exception {
+ blockAndThenUnblockForRetry(
+ new FailingRetryPolicyAwareInstantiator(3, 2), 100 // should be ignored
+ );
+ }
+
+ private void blockAndThenUnblockForRetry(FailingInstantiator instantiator, int blockAfterAttempts)
+ throws Exception {
TransactionManager transactionManager = txManager();
CountDownLatch successLatch = new CountDownLatch(1);
CountDownLatch blockLatch = new CountDownLatch(1);
LatchListener latchListener = new LatchListener(successLatch, blockLatch);
- AtomicInteger attempts = new AtomicInteger();
TransactionOutbox outbox =
TransactionOutbox.builder()
.transactionManager(transactionManager)
.persistor(Persistor.forDialect(connectionDetails().dialect()))
- .instantiator(new FailingInstantiator(attempts))
+ .instantiator(instantiator)
.submitter(Submitter.withExecutor(singleThreadPool))
.attemptFrequency(Duration.ofMillis(500))
.listener(latchListener)
- .blockAfterAttempts(2)
+ .blockAfterAttempts(blockAfterAttempts)
.build();
clearOutbox();
@@ -695,10 +725,12 @@ protected String createTestTable() {
private static class FailingInstantiator implements Instantiator {
- private final AtomicInteger attempts;
+ protected final int attemptsBeforeSuccess;
+
+ protected final AtomicInteger attempts = new AtomicInteger();
- FailingInstantiator(AtomicInteger attempts) {
- this.attempts = attempts;
+ private FailingInstantiator(int attemptsBeforeSuccess) {
+ this.attemptsBeforeSuccess = attemptsBeforeSuccess;
}
@Override
@@ -707,18 +739,60 @@ public String getName(Class> clazz) {
}
@Override
- public Object getInstance(String name) {
+ public InterfaceProcessor getInstance(String name) {
if (!"BEEF".equals(name)) {
throw new UnsupportedOperationException();
}
- return (InterfaceProcessor)
- (foo, bar) -> {
- LOGGER.info("Processing ({}, {})", foo, bar);
- if (attempts.incrementAndGet() < 3) {
- throw new RuntimeException("Temporary failure");
- }
- LOGGER.info("Processed ({}, {})", foo, bar);
- };
+ return (foo, bar) -> {
+ LOGGER.info("Processing ({}, {})", foo, bar);
+ if (attempts.incrementAndGet() < attemptsBeforeSuccess) {
+ throw new RuntimeException("Temporary failure");
+ }
+ LOGGER.info("Processed ({}, {})", foo, bar);
+ };
+ }
+ }
+
+ private static class FailingRetryPolicyAwareInstantiator extends FailingInstantiator
+ implements Instantiator {
+
+ private final int blockAfterAttempts;
+
+ private FailingRetryPolicyAwareInstantiator(int attemptsBeforeSuccess, int blockAfterAttempts) {
+ super(attemptsBeforeSuccess);
+ this.blockAfterAttempts = blockAfterAttempts;
+ }
+
+ @Override
+ public InterfaceProcessor getInstance(String name) {
+ return new RetryPolicyAwareInterfaceProcessor(super.getInstance(name));
+ }
+
+ private class RetryPolicyAwareInterfaceProcessor
+ implements InterfaceProcessor, RetryPolicyAware {
+
+ private final InterfaceProcessor processor;
+
+ RetryPolicyAwareInterfaceProcessor(InterfaceProcessor processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public Duration waitDuration(int attempt, Throwable throwable) {
+ Duration waitDuration = Duration.ofMillis(100L * attempt);
+ LOGGER.info("Waiting {} for attempt {}", waitDuration, attempt);
+ return waitDuration;
+ }
+
+ @Override
+ public int blockAfterAttempts(int attempt, Throwable throwable) {
+ return blockAfterAttempts;
+ }
+
+ @Override
+ public void process(int foo, String bar) {
+ processor.process(foo, bar);
+ }
}
}