Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import java.time.Duration;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import com.gruelbox.transactionoutbox.spi.AbstractProxyFactory;
import lombok.ToString;
import org.slf4j.MDC;
import org.slf4j.event.Level;
Expand Down Expand Up @@ -142,6 +144,7 @@ abstract class TransactionOutboxBuilder {
protected TransactionManager transactionManager;
protected Instantiator instantiator;
protected Submitter submitter;
protected AbstractProxyFactory proxyFactory;
protected Duration attemptFrequency;
protected int blockAfterAttempts;
protected int flushBatchSize;
Expand Down Expand Up @@ -189,6 +192,16 @@ public TransactionOutboxBuilder submitter(Submitter submitter) {
return this;
}

/**
* @param proxyFactory Used for proxy object creation
*
* @return Builder.
*/
public TransactionOutboxBuilder proxyFactory(AbstractProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
return this;
}

/**
* @param attemptFrequency How often tasks should be re-attempted. This should be balanced with
* {@link #flushBatchSize} and the frequency with which {@link #flush()} is called to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static java.time.temporal.ChronoUnit.MILLIS;
import static java.time.temporal.ChronoUnit.MINUTES;

import com.gruelbox.transactionoutbox.spi.AbstractProxyFactory;
import com.gruelbox.transactionoutbox.spi.ProxyFactory;
import com.gruelbox.transactionoutbox.spi.Utils;
import java.lang.reflect.InvocationTargetException;
Expand Down Expand Up @@ -46,7 +47,7 @@ final class TransactionOutboxImpl implements TransactionOutbox, Validatable {
private final Validator validator;
private final Duration retentionThreshold;
private final AtomicBoolean initialized = new AtomicBoolean();
private final ProxyFactory proxyFactory = new ProxyFactory();
private final AbstractProxyFactory proxyFactory;
private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

@Override
Expand All @@ -62,6 +63,7 @@ public void validate(Validator validator) {
validator.notNull("clockProvider", clockProvider);
validator.notNull("listener", listener);
validator.notNull("retentionThreshold", retentionThreshold);
validator.notNull("proxyFactory", proxyFactory);
}

static TransactionOutboxBuilder builder() {
Expand Down Expand Up @@ -431,7 +433,8 @@ public TransactionOutboxImpl build() {
Utils.firstNonNull(listener, () -> TransactionOutboxListener.EMPTY),
serializeMdc == null || serializeMdc,
validator,
retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold);
retentionThreshold == null ? Duration.ofDays(7) : retentionThreshold,
Utils.firstNonNull(proxyFactory, ProxyFactory::new));
validator.validate(impl);
if (initializeImmediately == null || initializeImmediately) {
impl.initialize();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.gruelbox.transactionoutbox.spi;

import com.gruelbox.transactionoutbox.MissingOptionalDependencyException;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.TypeCache;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.util.concurrent.Callable;
import java.util.function.BiFunction;

import static java.lang.reflect.Proxy.newProxyInstance;
import static java.util.Optional.ofNullable;

/**
* @author Ilya Viaznin
*/
@Slf4j
public abstract class AbstractProxyFactory {

private final Objenesis objenesis = setupObjenesis();

private final TypeCache<Class<?>> byteBuddyCache = setupByteBuddyCache();

private static boolean hasDefaultConstructor(Class<?> clazz) {
try {
clazz.getConstructor();
return true;
} catch (NoSuchMethodException e) {
return false;
}
}

protected abstract <T> Callable<Class<?>> byteBuddyProxyCallable(Class<T> clazz);

@SuppressWarnings({"unchecked", "cast"})
public <T> T createProxy(Class<T> clazz, BiFunction<Method, Object[], T> processor) {
return clazz.isInterface()
// Fastest, we can just proxy an interface directly
? (T) newProxyInstance(clazz.getClassLoader(),
new Class[]{clazz},
(proxy, method, args) -> processor.apply(method, args))
: constructProxy(clazz, processor, buildByteBuddyProxyClass(clazz));
}

protected <T> T constructProxy(Class<T> clazz, BiFunction<Method, Object[], T> processor, Class<? extends T> proxy) {
final T instance;
if (hasDefaultConstructor(clazz)) instance = Utils.uncheckedly(() -> proxy.getDeclaredConstructor().newInstance());
else {
if (objenesis == null) throw new MissingOptionalDependencyException("org.objenesis", "objenesis");

var instantiator = objenesis.getInstantiatorOf(proxy);
instance = instantiator.newInstance();
}
Utils.uncheck(
() -> {
var field = instance.getClass().getDeclaredField("handler");
field.set(
instance,
(InvocationHandler) (proxy1, method, args) -> processor.apply(method, args));
});

return instance;
}

@SuppressWarnings({"unchecked", "cast"})
protected <T> Class<? extends T> buildByteBuddyProxyClass(Class<T> clazz) {
return (Class<? extends T>)
ofNullable(byteBuddyCache)
.orElseThrow(() -> new MissingOptionalDependencyException("net.bytebuddy", "byte-buddy"))
.findOrInsert(
clazz.getClassLoader(),
clazz,
byteBuddyProxyCallable(clazz)
);
}

private TypeCache<Class<?>> setupByteBuddyCache() {
try {
return new TypeCache<>(TypeCache.Sort.WEAK);
} catch (NoClassDefFoundError error) {
log.info("ByteBuddy is not on the classpath, so only interfaces can be used with transaction-outbox");
return null;
}
}

private ObjenesisStd setupObjenesis() {
try {
return new ObjenesisStd();
} catch (NoClassDefFoundError error) {
log.info("Objenesis is not on the classpath, so only interfaces or classes with default constructors can be used with transaction-outbox");
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.gruelbox.transactionoutbox.spi;

import lombok.SneakyThrows;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.InvocationHandlerAdapter;
import net.bytebuddy.matcher.ElementMatchers;

import java.lang.invoke.MethodHandles;
import java.lang.reflect.InvocationHandler;
import java.util.concurrent.Callable;

/**
* Proxy factory that uses lookup load method in bytebuddy
* <p>
* Use for JDK 17+
*
* @author Ilya Viaznin
*/
public class LookupProxyFactory extends AbstractProxyFactory {

@Override
@SneakyThrows
protected <T> Callable<Class<?>> byteBuddyProxyCallable(Class<T> clazz) {
var lookup = MethodHandles.privateLookupIn(clazz, MethodHandles.lookup());
return () -> new ByteBuddy()
.subclass(clazz)
.defineField("handler", InvocationHandler.class, Visibility.PUBLIC)
.method(ElementMatchers.isDeclaredBy(clazz))
.intercept(InvocationHandlerAdapter.toField("handler"))
.make()
.load(clazz.getClassLoader(), ClassLoadingStrategy.UsingLookup.of(lookup))
.getLoaded();
}
}
Original file line number Diff line number Diff line change
@@ -1,111 +1,25 @@
package com.gruelbox.transactionoutbox.spi;

import com.gruelbox.transactionoutbox.MissingOptionalDependencyException;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.function.BiFunction;
import lombok.extern.slf4j.Slf4j;
import net.bytebuddy.ByteBuddy;
import net.bytebuddy.TypeCache;
import net.bytebuddy.TypeCache.Sort;
import net.bytebuddy.description.modifier.Visibility;
import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
import net.bytebuddy.implementation.InvocationHandlerAdapter;
import net.bytebuddy.matcher.ElementMatchers;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;
import org.objenesis.instantiator.ObjectInstantiator;

@Slf4j
public class ProxyFactory {

private final Objenesis objenesis = setupObjenesis();
private final TypeCache<Class<?>> byteBuddyCache = setupByteBuddyCache();

private static boolean hasDefaultConstructor(Class<?> clazz) {
try {
clazz.getConstructor();
return true;
} catch (NoSuchMethodException e) {
return false;
}
}

private TypeCache<Class<?>> setupByteBuddyCache() {
try {
return new TypeCache<>(Sort.WEAK);
} catch (NoClassDefFoundError error) {
log.info(
"ByteBuddy is not on the classpath, so only interfaces can be used with transaction-outbox");
return null;
}
}

private ObjenesisStd setupObjenesis() {
try {
return new ObjenesisStd();
} catch (NoClassDefFoundError error) {
log.info(
"Objenesis is not on the classpath, so only interfaces or classes with default constructors can be used with transaction-outbox");
return null;
}
}

@SuppressWarnings({"unchecked", "cast"})
public <T> T createProxy(Class<T> clazz, BiFunction<Method, Object[], T> processor) {
if (clazz.isInterface()) {
// Fastest - we can just proxy an interface directly
return (T)
Proxy.newProxyInstance(
clazz.getClassLoader(),
new Class[] {clazz},
(proxy, method, args) -> processor.apply(method, args));
} else {
Class<? extends T> proxy = buildByteBuddyProxyClass(clazz);
return constructProxy(clazz, processor, proxy);
}
}

private <T> T constructProxy(
Class<T> clazz, BiFunction<Method, Object[], T> processor, Class<? extends T> proxy) {
final T instance;
if (hasDefaultConstructor(clazz)) {
instance = Utils.uncheckedly(() -> proxy.getDeclaredConstructor().newInstance());
} else {
if (objenesis == null) {
throw new MissingOptionalDependencyException("org.objenesis", "objenesis");
}
ObjectInstantiator<? extends T> instantiator = objenesis.getInstantiatorOf(proxy);
instance = instantiator.newInstance();
}
Utils.uncheck(
() -> {
var field = instance.getClass().getDeclaredField("handler");
field.set(
instance,
(InvocationHandler) (proxy1, method, args) -> processor.apply(method, args));
});
return instance;
}

@SuppressWarnings({"unchecked", "cast"})
private <T> Class<? extends T> buildByteBuddyProxyClass(Class<T> clazz) {
if (byteBuddyCache == null) {
throw new MissingOptionalDependencyException("net.bytebuddy", "byte-buddy");
import java.lang.reflect.InvocationHandler;
import java.util.concurrent.Callable;

public class ProxyFactory extends AbstractProxyFactory {

@Override
protected <T> Callable<Class<?>> byteBuddyProxyCallable(Class<T> clazz) {
return () -> new ByteBuddy()
.subclass(clazz)
.defineField("handler", InvocationHandler.class, Visibility.PUBLIC)
.method(ElementMatchers.isDeclaredBy(clazz))
.intercept(InvocationHandlerAdapter.toField("handler"))
.make()
.load(clazz.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
.getLoaded();
}
return (Class<? extends T>)
byteBuddyCache.findOrInsert(
clazz.getClassLoader(),
clazz,
() ->
new ByteBuddy()
.subclass(clazz)
.defineField("handler", InvocationHandler.class, Visibility.PUBLIC)
.method(ElementMatchers.isDeclaredBy(clazz))
.intercept(InvocationHandlerAdapter.toField("handler"))
.make()
.load(clazz.getClassLoader(), ClassLoadingStrategy.Default.INJECTION)
.getLoaded());
}
}
Loading