Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ static Builder builder(String name) {
@Getter private final String delete;
@Getter private final String selectBatch;
@Getter private final String lock;
@Getter private final String lockBatch;
@Getter private final String checkSql;
@Getter private final String fetchNextInAllTopics;
@Getter private final String fetchNextInSelectedTopics;
@Getter private final String fetchNextBatchInTopics;
@Getter private final String fetchCurrentVersion;
@Getter private final String fetchNextSequence;
private final Collection<Migration> migrations;
Expand Down Expand Up @@ -69,6 +71,8 @@ static final class Builder {
+ "AND blocked = false AND processed = false AND topic = '*' LIMIT {{batchSize}}";
private String lock =
"SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR UPDATE";
private String lockBatch =
"SELECT id, version, invocation FROM {{table}} WHERE (id, version) IN ({{placeholders}}) FOR UPDATE";
private String checkSql = "SELECT 1";
private Map<Integer, Migration> migrations;
private Function<Boolean, String> booleanValueFrom;
Expand All @@ -85,6 +89,13 @@ static final class Builder {
+ " AND seq = ("
+ "SELECT MIN(seq) FROM {{table}} b WHERE b.topic=a.topic AND b.processed = false"
+ ") LIMIT {{batchSize}}";
private String fetchNextBatchInTopics =
"WITH raw AS ("
+ " SELECT {{allFields}}, ROW_NUMBER() OVER (PARTITION BY topic ORDER BY seq) as rn"
+ " FROM {{table}}"
+ " WHERE processed = false AND topic <> '*'"
+ ")"
+ " SELECT * FROM raw WHERE rn <= {{batchSize}} AND nextAttemptTime < ?";
private String fetchCurrentVersion = "SELECT version FROM TXNO_VERSION FOR UPDATE";
private String fetchNextSequence = "SELECT seq FROM TXNO_SEQUENCE WHERE topic = ? FOR UPDATE";

Expand Down Expand Up @@ -188,9 +199,11 @@ Dialect build() {
delete,
selectBatch,
lock,
lockBatch,
checkSql,
fetchNextInAllTopics,
fetchNextInSelectedTopics,
fetchNextBatchInTopics,
fetchCurrentVersion,
fetchNextSequence,
migrations.values()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.AllArgsConstructor;
Expand Down Expand Up @@ -218,6 +220,33 @@ public void delete(Transaction tx, TransactionOutboxEntry entry) throws Exceptio
}
}

@Override
public void deleteBatch(Transaction tx, List<TransactionOutboxEntry> entries) throws Exception {
if (entries == null || entries.isEmpty()) {
return;
}

try (PreparedStatement stmt =
tx.connection().prepareStatement(dialect.getDelete().replace("{{table}}", tableName))) {

for (TransactionOutboxEntry entry : entries) {
stmt.setString(1, entry.getId());
stmt.setInt(2, entry.getVersion());
stmt.addBatch();
}

int[] results = stmt.executeBatch();

for (int i = 0; i < results.length; i++) {
if (results[i] != 1) {
throw new OptimisticLockException();
}
log.debug("Batch deleted {}", entries.get(i).description());
}
log.debug("Batch deleted {} entries", results.length);
}
}

@Override
public void update(Transaction tx, TransactionOutboxEntry entry) throws Exception {
//noinspection resource
Expand Down Expand Up @@ -248,6 +277,45 @@ public void update(Transaction tx, TransactionOutboxEntry entry) throws Exceptio
}
}

@Override
public void updateBatch(Transaction tx, List<TransactionOutboxEntry> entries) throws Exception {
try (PreparedStatement stmt =
tx.connection()
.prepareStatement(
"UPDATE "
+ tableName
+ " SET "
+ "lastAttemptTime = ?, nextAttemptTime = ?, attempts = ?, "
+ "blocked = ?, processed = ?, version = ? "
+ "WHERE id = ? AND version = ?")) {

for (TransactionOutboxEntry entry : entries) {
stmt.setTimestamp(
1,
entry.getLastAttemptTime() == null ? null : Timestamp.from(entry.getLastAttemptTime()));
stmt.setTimestamp(2, Timestamp.from(entry.getNextAttemptTime()));
stmt.setInt(3, entry.getAttempts());
stmt.setBoolean(4, entry.isBlocked());
stmt.setBoolean(5, entry.isProcessed());
stmt.setInt(6, entry.getVersion() + 1);
stmt.setString(7, entry.getId());
stmt.setInt(8, entry.getVersion());
stmt.addBatch();
}

int[] results = stmt.executeBatch();

for (int i = 0; i < results.length; i++) {
if (results[i] != 1) {
throw new OptimisticLockException();
}
entries.get(i).setVersion(entries.get(i).getVersion() + 1);
log.debug("Batch updated {}", entries.get(i).description());
}
log.debug("Batch updated {} entries", results.length);
}
}

@Override
public boolean lock(Transaction tx, TransactionOutboxEntry entry) throws Exception {
//noinspection resource
Expand Down Expand Up @@ -281,6 +349,71 @@ public boolean lock(Transaction tx, TransactionOutboxEntry entry) throws Excepti
}
}

@Override
public boolean lockBatch(Transaction tx, List<TransactionOutboxEntry> entries) throws Exception {
if (entries == null || entries.isEmpty()) {
return true; // Nothing to lock is considered success
}

// Create placeholders for each entry
String placeholders = entries.stream().map(e -> "(?, ?)").collect(Collectors.joining(", "));

// Get the SQL from the dialect, replacing the placeholders
String sql =
dialect
.getLockBatch()
.replace("{{table}}", tableName)
.replace("{{placeholders}}", placeholders);

try (PreparedStatement stmt = tx.connection().prepareStatement(sql)) {
// Set parameters for each entry
int paramIndex = 1;
for (TransactionOutboxEntry entry : entries) {
stmt.setString(paramIndex++, entry.getId());
stmt.setInt(paramIndex++, entry.getVersion());
}

stmt.setQueryTimeout(writeLockTimeoutSeconds);

try {
try (ResultSet rs = stmt.executeQuery()) {
// Build a map of id -> deserialized invocation
Map<String, Invocation> invocationsById = new HashMap<>();

while (rs.next()) {
String id = rs.getString("id");
try (Reader invocationStream = rs.getCharacterStream("invocation")) {
Invocation invocation = serializer.deserializeInvocation(invocationStream);
invocationsById.put(id, invocation);
}
}

// If we didn't get all entries, return false
if (invocationsById.size() != entries.size()) {
log.debug(
"Could only lock {} out of {} entries", invocationsById.size(), entries.size());
return false;
}

// Update each entry with its deserialized invocation
for (TransactionOutboxEntry entry : entries) {
Invocation invocation = invocationsById.get(entry.getId());
if (invocation == null) {
log.error("Could not find result for entry {}", entry.getId());
return false;
}
entry.setInvocation(invocation);
}

return true;
}
} catch (SQLTimeoutException e) {
log.debug("Lock attempt timed out on batch of {} entries", entries.size());
return false;
}
}
}

@Override
public boolean unblock(Transaction tx, String entryId) throws Exception {
//noinspection resource
Expand Down Expand Up @@ -366,6 +499,35 @@ public Collection<TransactionOutboxEntry> selectNextInSelectedTopics(
}
}

/**
* Selects the next batch of entries in topics, maintaining order within each topic. This method
* is used for ordered batch processing and returns multiple entries per topic up to the batch
* size limit.
*
* @param tx The current transaction
* @param batchSize The maximum number of entries to return per topic
* @param now The current time
* @return A collection of entries ordered by topic and sequence
* @throws Exception If an error occurs during selection
*/
public Collection<TransactionOutboxEntry> selectNextBatchInTopics(
Transaction tx, int batchSize, Instant now) throws Exception {
var sql =
dialect
.getFetchNextBatchInTopics()
.replace("{{table}}", tableName)
.replace("{{batchSize}}", Integer.toString(batchSize))
.replace("{{allFields}}", ALL_FIELDS);
log.debug("SQL: {}", sql);
//noinspection resource
try (PreparedStatement stmt = tx.connection().prepareStatement(sql)) {
stmt.setTimestamp(1, Timestamp.from(now));
var results = new ArrayList<TransactionOutboxEntry>();
gatherResults(stmt, results);
return results;
}
}

@Override
public int deleteProcessedAndExpired(Transaction tx, int batchSize, Instant now)
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ public interface Dialect {

String getLock();

/**
* @return Format string for the SQL required to lock a batch of entries using a single statement.
*/
String getLockBatch();

String getCheckSql();

String getFetchNextInAllTopics();
Expand All @@ -28,6 +33,12 @@ public interface Dialect {

String getFetchNextSequence();

/**
* @return Format string for the SQL required to fetch the next batch of ordered items in topics.
* This query should return items ordered by topic and sequence, with a limit per topic.
*/
String getFetchNextBatchInTopics();

String booleanValue(boolean criteriaValue);

void createVersionTableIfNotExists(Connection connection) throws SQLException;
Expand Down Expand Up @@ -60,6 +71,9 @@ public interface Dialect {
.lock(
"SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR "
+ "UPDATE SKIP LOCKED")
.lockBatch(
"SELECT id, version, invocation FROM {{table}} WHERE (id, version) IN ({{placeholders}}) FOR "
+ "UPDATE SKIP LOCKED")
.changeMigration(
13,
"ALTER TABLE TXNO_OUTBOX MODIFY COLUMN invocation mediumtext CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci")
Expand All @@ -74,6 +88,13 @@ public interface Dialect {
"WITH raw AS(SELECT {{allFields}}, (ROW_NUMBER() OVER(PARTITION BY topic ORDER BY seq)) as rn"
+ " FROM {{table}} WHERE processed = false AND topic IN ({{topicNames}}))"
+ " SELECT * FROM raw WHERE rn = 1 AND nextAttemptTime < ? LIMIT {{batchSize}}")
.fetchNextBatchInTopics(
"WITH raw AS ("
+ " SELECT {{allFields}}, ROW_NUMBER() OVER (PARTITION BY topic ORDER BY seq) as rn"
+ " FROM {{table}}"
+ " WHERE processed = false AND topic <> '*'"
+ ")"
+ " SELECT * FROM raw WHERE rn <= {{batchSize}} AND nextAttemptTime < ?")
.deleteExpired(
"DELETE FROM {{table}} WHERE id IN "
+ "(SELECT id FROM {{table}} WHERE nextAttemptTime < ? AND processed = true AND blocked = false LIMIT {{batchSize}})")
Expand All @@ -84,6 +105,9 @@ public interface Dialect {
.lock(
"SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR "
+ "UPDATE SKIP LOCKED")
.lockBatch(
"SELECT id, version, invocation FROM {{table}} WHERE (id, version) IN ({{placeholders}}) FOR "
+ "UPDATE SKIP LOCKED")
.changeMigration(
5, "ALTER TABLE TXNO_OUTBOX ALTER COLUMN uniqueRequestId TYPE VARCHAR(250)")
.changeMigration(6, "ALTER TABLE TXNO_OUTBOX RENAME COLUMN blacklisted TO blocked")
Expand Down Expand Up @@ -117,6 +141,9 @@ public interface Dialect {
.lock(
"SELECT id, invocation FROM {{table}} WHERE id = ? AND version = ? FOR "
+ "UPDATE SKIP LOCKED")
.lockBatch(
"SELECT id, version, invocation FROM {{table}} WHERE (id, version) IN ({{placeholders}}) FOR "
+ "UPDATE SKIP LOCKED")
.checkSql("SELECT 1 FROM DUAL")
.changeMigration(
1,
Expand Down Expand Up @@ -160,6 +187,8 @@ public interface Dialect {
DefaultDialect.builder("MS_SQL_SERVER")
.lock(
"SELECT id, invocation FROM {{table}} WITH (UPDLOCK, ROWLOCK, READPAST) WHERE id = ? AND version = ?")
.lockBatch(
"SELECT id, version, invocation FROM {{table}} WITH (UPDLOCK, ROWLOCK, READPAST) WHERE (id, version) IN ({{placeholders}})")
.selectBatch(
"SELECT TOP ({{batchSize}}) {{allFields}} FROM {{table}} "
+ "WITH (UPDLOCK, ROWLOCK, READPAST) WHERE nextAttemptTime < ? AND topic = '*' "
Expand Down
Loading