Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ static final class Builder {
"Add flush index to support ordering",
"CREATE INDEX IX_TXNO_OUTBOX_2 ON TXNO_OUTBOX (topic, processed, seq)"));
migrations.put(13, new Migration(13, "Enforce UTF8 collation for outbox messages", null));
migrations.put(
14,
new Migration(
14,
"Add retryOptions column to outbox",
"ALTER TABLE TXNO_OUTBOX ADD COLUMN retryOptions TEXT NULL AFTER invocation"));
}

Builder setMigration(Migration migration) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public final class DefaultInvocationSerializer implements InvocationSerializer {

@Builder
DefaultInvocationSerializer(Set<Class<?>> serializableTypes, Integer version) {
this.gson =
var gsonBuilder =
new GsonBuilder()
.registerTypeAdapter(
Invocation.class,
Expand All @@ -82,7 +82,14 @@ public final class DefaultInvocationSerializer implements InvocationSerializer {
.registerTypeAdapter(Period.class, new PeriodTypeAdapter())
.registerTypeAdapter(Year.class, new YearTypeAdapter())
.registerTypeAdapter(YearMonth.class, new YearMonthAdapter())
.excludeFieldsWithModifiers(Modifier.TRANSIENT, Modifier.STATIC)
.excludeFieldsWithModifiers(Modifier.TRANSIENT, Modifier.STATIC);
this.gson =
gsonBuilder
.create()
.newBuilder()
.registerTypeHierarchyAdapter(
NextRetryStrategy.Options.class,
new NextRetryStrategyOptionsAdapter<>(gsonBuilder.create()))
.create();
}

Expand All @@ -104,6 +111,24 @@ public Invocation deserializeInvocation(Reader reader) throws IOException {
}
}

@Override
public void serializeRetryOptions(NextRetryStrategy.Options retryOptions, Writer writer) {
try {
gson.toJson(retryOptions, writer);
} catch (Exception e) {
throw new IllegalArgumentException("Cannot serialize " + retryOptions, e);
}
}

@Override
public NextRetryStrategy.Options deserializeRetryOptions(Reader reader) throws IOException {
try {
return gson.fromJson(reader, NextRetryStrategy.Options.class);
} catch (JsonIOException | JsonSyntaxException exception) {
throw new IOException(exception);
}
}

private static final class InvocationJsonSerializer
implements JsonSerializer<Invocation>, JsonDeserializer<Invocation> {

Expand Down Expand Up @@ -690,4 +715,42 @@ private static int parseInt(String value, int beginIndex, int endIndex)
return -result;
}
}

private static class NextRetryStrategyOptionsAdapter<T extends NextRetryStrategy.Options>
extends TypeAdapter<T> {

private static final String CLASS_NAME = "cn";
private static final String INSTANCE_FIELDS_VALUES = "ifv";
private final Gson parentGson;

NextRetryStrategyOptionsAdapter(Gson parentGson) {
this.parentGson = parentGson;
}

@Override
public void write(JsonWriter out, T options) throws IOException {
out.beginObject();
out.name(CLASS_NAME).value(options.getClass().getName());
out.name(INSTANCE_FIELDS_VALUES).value(parentGson.toJson(options, options.getClass()));
out.endObject();
}

@Override
public T read(JsonReader in) {
var json = parentGson.fromJson(in, JsonObject.class);
var jsonData = ((JsonObject) json).get(INSTANCE_FIELDS_VALUES).getAsString();
var jsonClazz = ((JsonObject) json).get(CLASS_NAME).getAsString();
var clazz = getObjectClass(jsonClazz);
return (T) parentGson.fromJson(jsonData, clazz);
}

/****** Helper method to get the className of the object to be deserialized *****/
Class<?> getObjectClass(String className) {
try {
return Class.forName(className);
} catch (ClassNotFoundException e) {
throw new JsonParseException(e.getMessage());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,7 @@
package com.gruelbox.transactionoutbox;

import java.io.IOException;
import java.io.Reader;
import java.io.StringWriter;
import java.io.Writer;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.SQLIntegrityConstraintViolationException;
import java.sql.SQLTimeoutException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.io.*;
import java.sql.*;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -39,7 +30,7 @@
public class DefaultPersistor implements Persistor, Validatable {

private static final String ALL_FIELDS =
"id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version";
"id, uniqueRequestId, invocation, topic, seq, lastAttemptTime, nextAttemptTime, attempts, blocked, processed, version, retryOptions";

/**
* @param writeLockTimeoutSeconds How many seconds to wait before timing out on obtaining a write
Expand Down Expand Up @@ -116,15 +107,20 @@ public void save(Transaction tx, TransactionOutboxEntry entry)
+ tableName
+ " ("
+ ALL_FIELDS
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
var writer = new StringWriter();
serializer.serializeInvocation(entry.getInvocation(), writer);
+ ") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
var invocationWriter = new StringWriter();
serializer.serializeInvocation(entry.getInvocation(), invocationWriter);
StringWriter retryWriter = null;
if (entry.getRetryOptions() != null) {
retryWriter = new StringWriter();
serializer.serializeRetryOptions(entry.getRetryOptions(), retryWriter);
}
if (entry.getTopic() != null) {
setNextSequence(tx, entry);
log.info("Assigned sequence number {} to topic {}", entry.getSequence(), entry.getTopic());
}
PreparedStatement stmt = tx.prepareBatchStatement(insertSql);
setupInsert(entry, writer, stmt);
setupInsert(entry, invocationWriter, retryWriter, stmt);
if (entry.getUniqueRequestId() == null) {
stmt.addBatch();
log.debug("Inserted {} in batch", entry.description());
Expand Down Expand Up @@ -184,7 +180,7 @@ private boolean indexViolation(Exception e) {
}

private void setupInsert(
TransactionOutboxEntry entry, StringWriter writer, PreparedStatement stmt)
TransactionOutboxEntry entry, StringWriter writer, StringWriter retryWriter, PreparedStatement stmt)
throws SQLException {
stmt.setString(1, entry.getId());
stmt.setString(2, entry.getUniqueRequestId());
Expand All @@ -202,6 +198,11 @@ private void setupInsert(
stmt.setBoolean(9, entry.isBlocked());
stmt.setBoolean(10, entry.isProcessed());
stmt.setInt(11, entry.getVersion());
if (retryWriter == null) {
stmt.setNull(12, Types.VARCHAR);
} else {
stmt.setString(12, retryWriter.toString());
}
}

@Override
Expand Down Expand Up @@ -409,13 +410,19 @@ private TransactionOutboxEntry map(ResultSet rs) throws SQLException, IOExceptio
} catch (IOException e) {
invocation = new FailedDeserializingInvocation(e);
}
var retryOptionsString = rs.getString("retryOptions");
NextRetryStrategy.Options options = null;
if (retryOptionsString != null) {
options = serializer.deserializeRetryOptions(new StringReader(retryOptionsString));
}
TransactionOutboxEntry entry =
TransactionOutboxEntry.builder()
.invocation(invocation)
.id(rs.getString("id"))
.uniqueRequestId(rs.getString("uniqueRequestId"))
.topic("*".equals(topic) ? null : topic)
.sequence(sequence)
.retryOptions(options)
.lastAttemptTime(
rs.getTimestamp("lastAttemptTime") == null
? null
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package com.gruelbox.transactionoutbox;

import java.time.Duration;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.Value;

@Value
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class DefaultRetryOptions implements NextRetryStrategy.Options {

Duration attemptFrequency;

@Override
public String strategyClassName() {
return DefaultRetryStrategy.class.getName();
}

public static DefaultRetryOptions withFrequency(Duration attemptFrequency) {
return new DefaultRetryOptions(attemptFrequency);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.gruelbox.transactionoutbox;

import java.time.Duration;

public class DefaultRetryStrategy implements NextRetryStrategy<DefaultRetryOptions> {

@Override
public Duration nextAttemptDelay(DefaultRetryOptions parameters, TransactionOutboxEntry entry) {
return parameters.getAttemptFrequency();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ public interface Dialect {
.changeMigration(
11,
"CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq NUMBER NOT NULL, CONSTRAINT PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))")
.changeMigration(14, "ALTER TABLE TXNO_OUTBOX ADD retryOptions CLOB NULL")
.booleanValueFrom(v -> v ? "1" : "0")
.createVersionTableBy(
connection -> {
Expand Down Expand Up @@ -216,6 +217,7 @@ public interface Dialect {
11,
"CREATE TABLE TXNO_SEQUENCE (topic VARCHAR(250) NOT NULL, seq INT NOT NULL, CONSTRAINT "
+ "PK_TXNO_SEQUENCE PRIMARY KEY (topic, seq))")
.changeMigration(14, "ALTER TABLE TXNO_OUTBOX ADD COLUMN retryOptions NVARCHAR(MAX) NULL")
.createVersionTableBy(
connection -> {
try (Statement s = connection.createStatement()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package com.gruelbox.transactionoutbox;

import java.time.Duration;
import lombok.AccessLevel;
import lombok.RequiredArgsConstructor;
import lombok.Value;

@Value
@RequiredArgsConstructor(access = AccessLevel.PRIVATE)
public class ExponentialBackOffOptions implements NextRetryStrategy.Options {

Duration attemptFrequency;
int backOff;

public static ExponentialBackOffOptions exponential(Duration attemptFrequency, int backOff) {
return new ExponentialBackOffOptions(attemptFrequency, backOff);
}

@Override
public String strategyClassName() {
return ExponentialBackOffStrategy.class.getName();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package com.gruelbox.transactionoutbox;

import java.time.Duration;

public class ExponentialBackOffStrategy implements NextRetryStrategy<ExponentialBackOffOptions> {

@Override
public Duration nextAttemptDelay(
ExponentialBackOffOptions parameters, TransactionOutboxEntry entry) {
return parameters
.getAttemptFrequency()
.multipliedBy(parameters.getBackOff() ^ entry.getAttempts());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,12 @@ static InvocationSerializer createDefaultJsonSerializer() {
* @return The deserialized invocation.
*/
Invocation deserializeInvocation(Reader reader) throws IOException;

default void serializeRetryOptions(NextRetryStrategy.Options retryOptions, Writer writer) {
throw new UnsupportedOperationException("Serialize retry options not implemented");
}

default NextRetryStrategy.Options deserializeRetryOptions(Reader reader) throws IOException {
throw new UnsupportedOperationException("Deserialize retry options not implemented");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.gruelbox.transactionoutbox;

import java.time.Duration;

public interface NextRetryStrategy<T extends NextRetryStrategy.Options> {
Duration nextAttemptDelay(T parameters, TransactionOutboxEntry entry);

interface Options {
String strategyClassName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ abstract class TransactionOutboxBuilder {
protected TransactionManager transactionManager;
protected Instantiator instantiator;
protected Submitter submitter;
protected NextRetryStrategy.Options retryOptions;
protected Duration attemptFrequency;
protected int blockAfterAttempts;
protected int flushBatchSize;
Expand Down Expand Up @@ -213,6 +214,11 @@ public TransactionOutboxBuilder submitter(Submitter submitter) {
return this;
}

public TransactionOutboxBuilder retryOptions(NextRetryStrategy.Options options) {
this.retryOptions = options;
return this;
}

/**
* @param attemptFrequency How often tasks should be re-attempted. This should be balanced with
* {@link #flushBatchSize} and the frequency with which {@link #flush()} is called to
Expand Down Expand Up @@ -348,6 +354,12 @@ interface ParameterizedScheduleBuilder {
*/
ParameterizedScheduleBuilder uniqueRequestId(String uniqueRequestId);

ParameterizedScheduleBuilder retryOptions(NextRetryStrategy.Options options);

default ParameterizedScheduleBuilder attemptFrequency(Duration attemptFrequency) {
return retryOptions(DefaultRetryOptions.withFrequency(attemptFrequency));
}

/**
* Specifies that the request should be applied in a strictly-ordered fashion within the
* specified topic.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public class TransactionOutboxEntry implements Validatable {
@Setter(AccessLevel.PACKAGE)
private Invocation invocation;

@Getter
@Setter(AccessLevel.PACKAGE)
private NextRetryStrategy.Options retryOptions;

/**
* @param lastAttemptTime The timestamp at which the task was last processed.
* @return The timestamp at which the task was last processed.
Expand Down
Loading
Loading