diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java index 5bf5d59f..f148dc46 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultDialect.java @@ -166,6 +166,12 @@ static final class Builder { "Add flush index to support ordering", "CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)")); migrations.put(13, new Migration(13, "Enforce UTF8 collation for outbox messages", null)); + migrations.put( + 14, + new Migration( + 14, + "Add retryOptions column to outbox", + "ALTER TABLE TXNO_OUTBOX ADD COLUMN retryOptions TEXT NULL AFTER invocation")); } Builder setMigration(Migration migration) { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultInvocationSerializer.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultInvocationSerializer.java index 004e780d..03dfc811 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultInvocationSerializer.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultInvocationSerializer.java @@ -65,7 +65,7 @@ public final class DefaultInvocationSerializer implements InvocationSerializer { @Builder DefaultInvocationSerializer(Set> serializableTypes, Integer version) { - this.gson = + var gsonBuilder = new GsonBuilder() .registerTypeAdapter( Invocation.class, @@ -82,7 +82,14 @@ public final class DefaultInvocationSerializer implements InvocationSerializer { .registerTypeAdapter(Period.class, new PeriodTypeAdapter()) .registerTypeAdapter(Year.class, new YearTypeAdapter()) .registerTypeAdapter(YearMonth.class, new YearMonthAdapter()) - .excludeFieldsWithModifiers(Modifier.TRANSIENT, Modifier.STATIC) + .excludeFieldsWithModifiers(Modifier.TRANSIENT, Modifier.STATIC); + this.gson = + gsonBuilder + .create() + .newBuilder() + .registerTypeHierarchyAdapter( + NextRetryStrategy.Options.class, + new NextRetryStrategyOptionsAdapter<>(gsonBuilder.create())) .create(); } @@ -104,6 +111,24 @@ public Invocation deserializeInvocation(Reader reader) throws IOException { } } + @Override + public void serializeRetryOptions(NextRetryStrategy.Options retryOptions, Writer writer) { + try { + gson.toJson(retryOptions, writer); + } catch (Exception e) { + throw new IllegalArgumentException("Cannot serialize " + retryOptions, e); + } + } + + @Override + public NextRetryStrategy.Options deserializeRetryOptions(Reader reader) throws IOException { + try { + return gson.fromJson(reader, NextRetryStrategy.Options.class); + } catch (JsonIOException | JsonSyntaxException exception) { + throw new IOException(exception); + } + } + private static final class InvocationJsonSerializer implements JsonSerializer, JsonDeserializer { @@ -690,4 +715,42 @@ private static int parseInt(String value, int beginIndex, int endIndex) return -result; } } + + private static class NextRetryStrategyOptionsAdapter + extends TypeAdapter { + + private static final String CLASS_NAME = "cn"; + private static final String INSTANCE_FIELDS_VALUES = "ifv"; + private final Gson parentGson; + + NextRetryStrategyOptionsAdapter(Gson parentGson) { + this.parentGson = parentGson; + } + + @Override + public void write(JsonWriter out, T options) throws IOException { + out.beginObject(); + out.name(CLASS_NAME).value(options.getClass().getName()); + out.name(INSTANCE_FIELDS_VALUES).value(parentGson.toJson(options, options.getClass())); + out.endObject(); + } + + @Override + public T read(JsonReader in) { + var json = parentGson.fromJson(in, JsonObject.class); + var jsonData = ((JsonObject) json).get(INSTANCE_FIELDS_VALUES).getAsString(); + var jsonClazz = ((JsonObject) json).get(CLASS_NAME).getAsString(); + var clazz = getObjectClass(jsonClazz); + return (T) parentGson.fromJson(jsonData, clazz); + } + + /****** Helper method to get the className of the object to be deserialized *****/ + Class getObjectClass(String className) { + try { + return Class.forName(className); + } catch (ClassNotFoundException e) { + throw new JsonParseException(e.getMessage()); + } + } + } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java index ff267596..70ce7fa4 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultPersistor.java @@ -1,16 +1,7 @@ package com.gruelbox.transactionoutbox; -import java.io.IOException; -import java.io.Reader; -import java.io.StringWriter; -import java.io.Writer; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.SQLIntegrityConstraintViolationException; -import java.sql.SQLTimeoutException; -import java.sql.Statement; -import java.sql.Timestamp; +import java.io.*; +import java.sql.*; import java.time.Instant; import java.util.ArrayList; import java.util.Collection; @@ -39,7 +30,7 @@ public class DefaultPersistor implements Persistor, Validatable { private static final String ALL_FIELDS = - "id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version"; + "id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, retryOptions"; /** * @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write @@ -116,15 +107,20 @@ public void save(Transaction tx, TransactionOutboxEntry entry) + tableName + " (" + ALL_FIELDS - + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; - var writer = new StringWriter(); - serializer.serializeInvocation(entry.getInvocation(), writer); + + ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"; + var invocationWriter = new StringWriter(); + serializer.serializeInvocation(entry.getInvocation(), invocationWriter); + StringWriter retryWriter = null; + if (entry.getRetryOptions() != null) { + retryWriter = new StringWriter(); + serializer.serializeRetryOptions(entry.getRetryOptions(), retryWriter); + } if (entry.getTopic() != null) { setNextSequence(tx, entry); log.info("Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic()); } PreparedStatement stmt = tx.prepareBatchStatement(insertSql); - setupInsert(entry, writer, stmt); + setupInsert(entry, invocationWriter, retryWriter, stmt); if (entry.getUniqueRequestId() == null) { stmt.addBatch(); log.debug("Inserted {} in batch", entry.description()); @@ -184,7 +180,7 @@ private boolean indexViolation(Exception e) { } private void setupInsert( - TransactionOutboxEntry entry, StringWriter writer, PreparedStatement stmt) + TransactionOutboxEntry entry, StringWriter writer, StringWriter retryWriter, PreparedStatement stmt) throws SQLException { stmt.setString(1, entry.getId()); stmt.setString(2, entry.getUniqueRequestId()); @@ -202,6 +198,11 @@ private void setupInsert( stmt.setBoolean(9, entry.isBlocked()); stmt.setBoolean(10, entry.isProcessed()); stmt.setInt(11, entry.getVersion()); + if (retryWriter == null) { + stmt.setNull(12, Types.VARCHAR); + } else { + stmt.setString(12, retryWriter.toString()); + } } @Override @@ -409,6 +410,11 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio } catch (IOException e) { invocation = new FailedDeserializingInvocation(e); } + var retryOptionsString = rs.getString("retryOptions"); + NextRetryStrategy.Options options = null; + if (retryOptionsString != null) { + options = serializer.deserializeRetryOptions(new StringReader(retryOptionsString)); + } TransactionOutboxEntry entry = TransactionOutboxEntry.builder() .invocation(invocation) @@ -416,6 +422,7 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio .uniqueRequestId(rs.getString("uniqueRequestId")) .topic("*".equals(topic) ? null : topic) .sequence(sequence) + .retryOptions(options) .lastAttemptTime( rs.getTimestamp("lastAttemptTime") == null ? null diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultRetryOptions.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultRetryOptions.java new file mode 100644 index 00000000..a819c081 --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultRetryOptions.java @@ -0,0 +1,22 @@ +package com.gruelbox.transactionoutbox; + +import java.time.Duration; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.Value; + +@Value +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public class DefaultRetryOptions implements NextRetryStrategy.Options { + + Duration attemptFrequency; + + @Override + public String strategyClassName() { + return DefaultRetryStrategy.class.getName(); + } + + public static DefaultRetryOptions withFrequency(Duration attemptFrequency) { + return new DefaultRetryOptions(attemptFrequency); + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultRetryStrategy.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultRetryStrategy.java new file mode 100644 index 00000000..f376a665 --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/DefaultRetryStrategy.java @@ -0,0 +1,11 @@ +package com.gruelbox.transactionoutbox; + +import java.time.Duration; + +public class DefaultRetryStrategy implements NextRetryStrategy { + + @Override + public Duration nextAttemptDelay(DefaultRetryOptions parameters, TransactionOutboxEntry entry) { + return parameters.getAttemptFrequency(); + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java index 6b3d820e..caaed1b0 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/Dialect.java @@ -140,6 +140,7 @@ public interface Dialect { .changeMigration( 11, "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq NUMBER NOT NULL, CONSTRAINT PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))") + .changeMigration(14, "ALTER TABLE TXNO_OUTBOX ADD retryOptions CLOB NULL") .booleanValueFrom(v -> v ? "1" : "0") .createVersionTableBy( connection -> { @@ -216,6 +217,7 @@ public interface Dialect { 11, "CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq INT NOT NULL, CONSTRAINT " + "PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))") + .changeMigration(14, "ALTER TABLE TXNO_OUTBOX ADD COLUMN retryOptions NVARCHAR(MAX) NULL") .createVersionTableBy( connection -> { try (Statement s = connection.createStatement()) { diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/ExponentialBackOffOptions.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/ExponentialBackOffOptions.java new file mode 100644 index 00000000..bb67bade --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/ExponentialBackOffOptions.java @@ -0,0 +1,23 @@ +package com.gruelbox.transactionoutbox; + +import java.time.Duration; +import lombok.AccessLevel; +import lombok.RequiredArgsConstructor; +import lombok.Value; + +@Value +@RequiredArgsConstructor(access = AccessLevel.PRIVATE) +public class ExponentialBackOffOptions implements NextRetryStrategy.Options { + + Duration attemptFrequency; + int backOff; + + public static ExponentialBackOffOptions exponential(Duration attemptFrequency, int backOff) { + return new ExponentialBackOffOptions(attemptFrequency, backOff); + } + + @Override + public String strategyClassName() { + return ExponentialBackOffStrategy.class.getName(); + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/ExponentialBackOffStrategy.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/ExponentialBackOffStrategy.java new file mode 100644 index 00000000..5fde80b2 --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/ExponentialBackOffStrategy.java @@ -0,0 +1,14 @@ +package com.gruelbox.transactionoutbox; + +import java.time.Duration; + +public class ExponentialBackOffStrategy implements NextRetryStrategy { + + @Override + public Duration nextAttemptDelay( + ExponentialBackOffOptions parameters, TransactionOutboxEntry entry) { + return parameters + .getAttemptFrequency() + .multipliedBy(parameters.getBackOff() ^ entry.getAttempts()); + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/InvocationSerializer.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/InvocationSerializer.java index 20af181f..a4224f9c 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/InvocationSerializer.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/InvocationSerializer.java @@ -42,4 +42,12 @@ static InvocationSerializer createDefaultJsonSerializer() { * @return The deserialized invocation. */ Invocation deserializeInvocation(Reader reader) throws IOException; + + default void serializeRetryOptions(NextRetryStrategy.Options retryOptions, Writer writer) { + throw new UnsupportedOperationException("Serialize retry options not implemented"); + } + + default NextRetryStrategy.Options deserializeRetryOptions(Reader reader) throws IOException { + throw new UnsupportedOperationException("Deserialize retry options not implemented"); + } } diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/NextRetryStrategy.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/NextRetryStrategy.java new file mode 100644 index 00000000..a7c1b4f9 --- /dev/null +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/NextRetryStrategy.java @@ -0,0 +1,11 @@ +package com.gruelbox.transactionoutbox; + +import java.time.Duration; + +public interface NextRetryStrategy { + Duration nextAttemptDelay(T parameters, TransactionOutboxEntry entry); + + interface Options { + String strategyClassName(); + } +} diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java index 8f0afe64..996a3633 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutbox.java @@ -166,6 +166,7 @@ abstract class TransactionOutboxBuilder { protected TransactionManager transactionManager; protected Instantiator instantiator; protected Submitter submitter; + protected NextRetryStrategy.Options retryOptions; protected Duration attemptFrequency; protected int blockAfterAttempts; protected int flushBatchSize; @@ -213,6 +214,11 @@ public TransactionOutboxBuilder submitter(Submitter submitter) { return this; } + public TransactionOutboxBuilder retryOptions(NextRetryStrategy.Options options) { + this.retryOptions = options; + return this; + } + /** * @param attemptFrequency How often tasks should be re-attempted. This should be balanced with * {@link #flushBatchSize} and the frequency with which {@link #flush()} is called to @@ -348,6 +354,12 @@ interface ParameterizedScheduleBuilder { */ ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId); + ParameterizedScheduleBuilder retryOptions(NextRetryStrategy.Options options); + + default ParameterizedScheduleBuilder attemptFrequency(Duration attemptFrequency) { + return retryOptions(DefaultRetryOptions.withFrequency(attemptFrequency)); + } + /** * Specifies that the request should be applied in a strictly-ordered fashion within the * specified topic. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java index 01749722..0a4543a8 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxEntry.java @@ -56,6 +56,10 @@ public class TransactionOutboxEntry implements Validatable { @Setter(AccessLevel.PACKAGE) private Invocation invocation; + @Getter + @Setter(AccessLevel.PACKAGE) + private NextRetryStrategy.Options retryOptions; + /** * @param lastAttemptTime The timestamp at which the task was last processed. * @return The timestamp at which the task was last processed. diff --git a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java index 4b018311..6a9ca83e 100644 --- a/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java +++ b/transactionoutbox-core/src/main/java/com/gruelbox/transactionoutbox/TransactionOutboxImpl.java @@ -3,7 +3,6 @@ import static com.gruelbox.transactionoutbox.spi.Utils.logAtLevel; import static com.gruelbox.transactionoutbox.spi.Utils.uncheckedly; import static java.time.temporal.ChronoUnit.MILLIS; -import static java.time.temporal.ChronoUnit.MINUTES; import com.gruelbox.transactionoutbox.spi.ProxyFactory; import com.gruelbox.transactionoutbox.spi.Utils; @@ -36,7 +35,7 @@ final class TransactionOutboxImpl implements TransactionOutbox, Validatable { private final Persistor persistor; private final Instantiator instantiator; private final Submitter submitter; - private final Duration attemptFrequency; + private final NextRetryStrategy.Options retryOptions; private final Level logLevelTemporaryFailure; private final int blockAfterAttempts; private final int flushBatchSize; @@ -55,7 +54,7 @@ public void validate(Validator validator) { validator.valid("persistor", persistor); validator.valid("instantiator", instantiator); validator.valid("submitter", submitter); - validator.notNull("attemptFrequency", attemptFrequency); + validator.notNull("retryOptions", retryOptions); validator.notNull("logLevelTemporaryFailure", logLevelTemporaryFailure); validator.min("blockAfterAttempts", blockAfterAttempts, 1); validator.min("flushBatchSize", flushBatchSize, 1); @@ -82,7 +81,7 @@ public void initialize() { @Override public T schedule(Class clazz) { - return schedule(clazz, null, null, null); + return schedule(clazz, null, null, null, null); } @Override @@ -233,7 +232,11 @@ public boolean unblock(String entryId, Object transactionContext) { } private T schedule( - Class clazz, String uniqueRequestId, String topic, Duration delayForAtLeast) { + Class clazz, + String uniqueRequestId, + String topic, + Duration delayForAtLeast, + NextRetryStrategy.Options retryOptions) { if (!initialized.get()) { throw new IllegalStateException("Not initialized"); } @@ -250,6 +253,7 @@ private T schedule( extracted.getParameters(), extracted.getArgs(), uniqueRequestId, + retryOptions, topic); if (delayForAtLeast != null) { entry.setNextAttemptTime(entry.getNextAttemptTime().plus(delayForAtLeast)); @@ -267,7 +271,8 @@ private T schedule( submitNow(entry); log.debug( "Scheduled {} for post-commit execution", entry.description()); - } else if (delayForAtLeast.compareTo(attemptFrequency) < 0) { + } else if (delayForAtLeast.compareTo(getNextAttemptDelayFrom(entry)) + < 0) { scheduler.schedule( () -> submitNow(entry), delayForAtLeast.toMillis(), @@ -353,6 +358,7 @@ private TransactionOutboxEntry newEntry( Class[] params, Object[] args, String uniqueRequestId, + NextRetryStrategy.Options retryOptions, String topic) { return TransactionOutboxEntry.builder() .id(UUID.randomUUID().toString()) @@ -363,6 +369,7 @@ private TransactionOutboxEntry newEntry( params, args, serializeMdc && (MDC.getMDCAdapter() != null) ? MDC.getCopyOfContextMap() : null)) + .retryOptions(retryOptions) .lastAttemptTime(null) .nextAttemptTime(clockProvider.get().instant()) .uniqueRequestId(uniqueRequestId) @@ -373,8 +380,9 @@ private TransactionOutboxEntry newEntry( private void pushBack(Transaction transaction, TransactionOutboxEntry entry) throws OptimisticLockException { try { + var nextAttemptDelay = getNextAttemptDelayFrom(entry); entry.setLastAttemptTime(clockProvider.get().instant()); - entry.setNextAttemptTime(after(attemptFrequency)); + entry.setNextAttemptTime(after(nextAttemptDelay)); validator.validate(entry); persistor.update(transaction, entry); } catch (OptimisticLockException e) { @@ -384,6 +392,19 @@ private void pushBack(Transaction transaction, TransactionOutboxEntry entry) } } + @SuppressWarnings("rawtypes,unchecked") + private Duration getNextAttemptDelayFrom(TransactionOutboxEntry entry) { + NextRetryStrategy.Options options = + Utils.firstNonNull(entry.getRetryOptions(), () -> this.retryOptions); + NextRetryStrategy retryStrategy; + if (options instanceof DefaultRetryOptions) { + retryStrategy = new DefaultRetryStrategy(); + } else { + retryStrategy = (NextRetryStrategy) instantiator.getInstance(options.strategyClassName()); + } + return retryStrategy.nextAttemptDelay(options, entry); + } + private Instant after(Duration duration) { return clockProvider.get().instant().plus(duration).truncatedTo(MILLIS); } @@ -429,13 +450,20 @@ static class TransactionOutboxBuilderImpl extends TransactionOutboxBuilder { public TransactionOutboxImpl build() { Validator validator = new Validator(this.clockProvider); + NextRetryStrategy.Options retryOptions = this.retryOptions; + if (retryOptions == null) { + retryOptions = + DefaultRetryOptions.withFrequency( + Utils.firstNonNull(attemptFrequency, () -> Duration.ofMinutes(2))); + } + TransactionOutboxImpl impl = new TransactionOutboxImpl( transactionManager, persistor, Utils.firstNonNull(instantiator, Instantiator::usingReflection), Utils.firstNonNull(submitter, Submitter::withDefaultExecutor), - Utils.firstNonNull(attemptFrequency, () -> Duration.of(2, MINUTES)), + retryOptions, Utils.firstNonNull(logLevelTemporaryFailure, () -> Level.WARN), blockAfterAttempts < 1 ? 5 : blockAfterAttempts, flushBatchSize < 1 ? 4096 : flushBatchSize, @@ -459,13 +487,15 @@ private class ParameterizedScheduleBuilderImpl implements ParameterizedScheduleB private String uniqueRequestId; private String ordered; private Duration delayForAtLeast; + private NextRetryStrategy.Options retryOptions; @Override public T schedule(Class clazz) { if (uniqueRequestId != null && uniqueRequestId.length() > 250) { throw new IllegalArgumentException("uniqueRequestId may be up to 250 characters"); } - return TransactionOutboxImpl.this.schedule(clazz, uniqueRequestId, ordered, delayForAtLeast); + return TransactionOutboxImpl.this.schedule( + clazz, uniqueRequestId, ordered, delayForAtLeast, retryOptions); } } } diff --git a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/AbstractTestDefaultInvocationSerializer.java b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/AbstractTestDefaultInvocationSerializer.java index 3ef2f10b..1c708e6c 100644 --- a/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/AbstractTestDefaultInvocationSerializer.java +++ b/transactionoutbox-core/src/test/java/com/gruelbox/transactionoutbox/AbstractTestDefaultInvocationSerializer.java @@ -10,10 +10,15 @@ import java.time.temporal.ChronoUnit; import java.util.*; import lombok.Getter; +import lombok.Value; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +// TODO; add tests. +// confirm default options work as expected (can serialise/ deserialise) +// also a custom impl. + @SuppressWarnings("RedundantCast") @Slf4j abstract class AbstractTestDefaultInvocationSerializer { @@ -216,6 +221,28 @@ void testDeserializationException() { IOException.class, () -> serializer.deserializeInvocation(new StringReader("unparseable"))); } + @Test + void testRetryOptions() { + check(DefaultRetryOptions.withFrequency(Duration.ofSeconds(5))); + check(new RandomRetryOptions()); + } + + void check(NextRetryStrategy.Options options) { + var deserialized = serdeser(options); + Assertions.assertEquals(options, deserialized); + } + + NextRetryStrategy.Options serdeser(NextRetryStrategy.Options options) { + try { + var writer = new StringWriter(); + serializer.serializeRetryOptions(options, writer); + log.info("Serialised as: {}", writer); + return serializer.deserializeRetryOptions(new StringReader(writer.toString())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + void check(Invocation invocation) { Invocation deserialized = serdeser(invocation); Assertions.assertEquals(deserialized, serdeser(invocation)); @@ -267,4 +294,15 @@ public int hashCode() { return Objects.hash(arg1, arg2); } } + + @Value + static class RandomRetryOptions implements NextRetryStrategy.Options { + String testing = "here"; + Map testMap = new HashMap<>(Map.of("A", Duration.ZERO)); + + @Override + public String strategyClassName() { + return "RANDOM_RETRY_OPT"; + } + } } diff --git a/transactionoutbox-jackson/src/test/java/com/gruelbox/transactionoutbox/jackson/TestJacksonInvocationSerializer.java b/transactionoutbox-jackson/src/test/java/com/gruelbox/transactionoutbox/jackson/TestJacksonInvocationSerializer.java index 0ad39a0a..c966e0a4 100644 --- a/transactionoutbox-jackson/src/test/java/com/gruelbox/transactionoutbox/jackson/TestJacksonInvocationSerializer.java +++ b/transactionoutbox-jackson/src/test/java/com/gruelbox/transactionoutbox/jackson/TestJacksonInvocationSerializer.java @@ -19,6 +19,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +// TODO - test serialisation for jackson @SuppressWarnings("RedundantCast") class TestJacksonInvocationSerializer { diff --git a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java index 69f86934..d13da3d3 100644 --- a/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java +++ b/transactionoutbox-testing/src/main/java/com/gruelbox/transactionoutbox/testing/AbstractAcceptanceTest.java @@ -1,5 +1,6 @@ package com.gruelbox.transactionoutbox.testing; +import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static java.util.stream.Collectors.toList; import static java.util.stream.Collectors.toMap; @@ -21,7 +22,10 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; +import java.util.stream.Collectors; import java.util.stream.IntStream; +import lombok.AllArgsConstructor; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.*; @@ -501,6 +505,71 @@ final void retryBehaviour() throws Exception { singleThreadPool); } + @Test + final void customDefaultRetryOptionsBehaviour() throws Exception { + TransactionManager transactionManager = txManager(); + CountDownLatch latch = new CountDownLatch(1); + AtomicInteger attempts = new AtomicInteger(); + + var listener = new ProcessedEntryListener(latch); + TransactionOutbox outbox = + TransactionOutbox.builder() + .transactionManager(transactionManager) + .persistor(Persistor.forDialect(connectionDetails().dialect())) + .instantiator(Instantiator.usingReflection()) + .instantiator( + RegisteredFuncInstantiator.register( + Map.entry( + DefaultRetryOptions.class, + clazz -> Instantiator.usingReflection().getInstance(clazz.getName())), + Map.entry( + DefaultRetryStrategy.class, + clazz -> Instantiator.usingReflection().getInstance(clazz.getName())), + Map.entry( + InterfaceProcessor.class, + clazz -> + (InterfaceProcessor) + (foo, bar) -> { + LOGGER.info("Processing ({}, {})", foo, bar); + if (attempts.incrementAndGet() < foo) { + throw new RuntimeException("Temporary failure"); + } + LOGGER.info("Processed ({}, {})", foo, bar); + }))) + .submitter(Submitter.withExecutor(singleThreadPool)) + .attemptFrequency(Duration.ofMillis(500)) // the og default + .listener(listener) + .blockAfterAttempts(10) + .build(); + + clearOutbox(); + + withRunningFlusher( + outbox, + () -> { + transactionManager.inTransaction( + () -> + outbox + .with() + .retryOptions(DefaultRetryOptions.withFrequency(Duration.ofSeconds(5))) + .schedule(InterfaceProcessor.class) + .process(5, "Whee")); + assertTrue(latch.await(1, MINUTES)); + }, + singleThreadPool); + + var failingEntries = listener.getFailingEntries(); + var nextAttemptAt = + failingEntries.stream().map(TransactionOutboxEntry::getLastAttemptTime).collect(toList()); + var firstNextAttemptTime = nextAttemptAt.get(0); + var counter = 1; + for (var attempt : nextAttemptAt.stream().skip(1).collect(toList())) { + assertTrue( + attempt.isAfter(firstNextAttemptTime.plus(Duration.ofSeconds(5).multipliedBy(counter)))); + counter++; + } + } + @Test final void flushOnlyASpecifiedTopic() throws Exception { TransactionManager transactionManager = txManager(); @@ -813,4 +882,39 @@ public Object getInstance(String name) { } } } + + @AllArgsConstructor + private static class RegisteredFuncInstantiator implements Instantiator { + Map, Function, Object>> registeredItems; + + @Override + public String getName(Class clazz) { + return clazz.getName(); + } + + @Override + public Object getInstance(String name) { + var clazz = getClass(name); + if (!registeredItems.containsKey(clazz)) { + throw new UnsupportedOperationException(); + } + var item = registeredItems.get(clazz); + return item.apply(clazz); + } + + static RegisteredFuncInstantiator register( + Map.Entry, Function, Object>>... clazzFns) { + Map, Function, Object>> map = + Arrays.stream(clazzFns).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + return new RegisteredFuncInstantiator(map); + } + + private Class getClass(String name) { + try { + return Class.forName(name); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + } }