Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
ece5ac7
wipwipwip
johanandren Jan 22, 2026
afc48ed
es, kv, and workflow test coverage, changes to make it work
johanandren Jan 27, 2026
5ddbc3e
workflow tests passing
johanandren Jan 27, 2026
6fb9a80
component client protobuf input
johanandren Jan 28, 2026
3c15d73
annotation to list es entity event types, serialization handling and …
johanandren Jan 28, 2026
0badef2
multi message type handling for view updaters and consumers
johanandren Feb 2, 2026
2a19d05
allow non-sealed protos
johanandren Feb 6, 2026
ea26eb4
console support by passing protobuf types through SPI
johanandren Feb 6, 2026
a65526c
more proto descriptors through SPI (command handler input and output …
johanandren Feb 9, 2026
9a6ffdc
workflow command/response proto test coverage
johanandren Feb 9, 2026
6978e8c
test coverage for timed action with protobuf input
johanandren Feb 9, 2026
2c63069
test coverage for protobuf in view state, view query param, projected…
johanandren Feb 9, 2026
b6d1050
proto message view state, query input and response
johanandren Feb 9, 2026
39de283
protobuf messages as input and responses from agents
johanandren Feb 9, 2026
56fac16
rebase fixes
johanandren Feb 9, 2026
ada88d2
minimal docs
johanandren Feb 9, 2026
12f7448
some cleanup, optimization of descriptor lookup
johanandren Feb 10, 2026
4ad450e
remove some useless indirection
johanandren Feb 10, 2026
0c34c63
use the same method in streams and regular calls for views, optimize …
johanandren Feb 10, 2026
ed9ff0b
copyrights
johanandren Feb 12, 2026
0eaca07
review feedback
johanandren Feb 16, 2026
d0fbf62
feat: Migration support for consumers and views from Kalix
johanandren Feb 16, 2026
cd306dd
setting was in the wrong block
johanandren Feb 16, 2026
c6abe81
runtime bump
johanandren Feb 16, 2026
d6f1a20
timestamp format docs
johanandren Feb 17, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,28 @@

package akka.javasdk.testkit;

import akka.javasdk.impl.serialization.JsonSerializer;
import akka.javasdk.impl.serialization.Serializer;
import akka.runtime.sdk.spi.BytesPayload;
import java.lang.reflect.Type;

/**
* Internal helper to verify serialization/deserialization of entity events, state, commands, and
* responses. Supports both JSON and Protobuf serialization formats.
*/
final class EntitySerializationChecker {

private static JsonSerializer jsonSerializer = new JsonSerializer();
private static Serializer serializer = new Serializer();

static void verifySerDer(Object object, Object entity) {
try {
BytesPayload bytesPayload = jsonSerializer.toBytes(object);
jsonSerializer.fromBytes(bytesPayload);
BytesPayload bytesPayload = serializer.toBytes(object);
// For protobuf, we need to deserialize with the expected type
// For JSON with type hints, fromBytes(bytesPayload) works
if (serializer.isProtobuf(bytesPayload)) {
serializer.fromBytes(object.getClass(), bytesPayload);
} else {
serializer.fromBytes(bytesPayload);
}
} catch (Exception e) {
fail(object, entity, e);
}
Expand All @@ -24,8 +34,8 @@ static void verifySerDer(Object object, Object entity) {
/** different deserialization for responses, state, and commands */
static void verifySerDerWithExpectedType(Class<?> expectedClass, Object object, Object entity) {
try {
BytesPayload bytesPayload = jsonSerializer.toBytes(object);
jsonSerializer.fromBytes(expectedClass, bytesPayload);
BytesPayload bytesPayload = serializer.toBytes(object);
serializer.fromBytes(expectedClass, bytesPayload);
} catch (Exception e) {
fail(object, entity, e);
}
Expand All @@ -34,8 +44,8 @@ static void verifySerDerWithExpectedType(Class<?> expectedClass, Object object,
/** different deserialization for responses, state, and commands */
static void verifySerDerWithExpectedType(Type expectedType, Object object, Object entity) {
try {
BytesPayload bytesPayload = jsonSerializer.toBytes(object);
jsonSerializer.fromBytes(expectedType, bytesPayload);
BytesPayload bytesPayload = serializer.toBytes(object);
serializer.fromBytes(expectedType, bytesPayload);
} catch (Exception e) {
fail(object, entity, e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,44 @@
import akka.javasdk.testkit.impl.EventSourcedResultImpl;
import akka.javasdk.testkit.impl.TestKitEventSourcedEntityCommandContext;
import akka.javasdk.testkit.impl.TestKitEventSourcedEntityEventContext;
import com.google.protobuf.GeneratedMessageV3;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import scala.jdk.javaapi.CollectionConverters;

/** Extended by generated code, not meant for user extension */
abstract class EventSourcedEntityEffectsRunner<S, E> {

private final Class<?> stateClass;
private final List<Class<? extends GeneratedMessageV3>> allowedProtoEventTypes;
protected EventSourcedEntity<S, E> entity;
private S _state;
private boolean deleted = false;
private List<E> events = new ArrayList<>();

@SuppressWarnings("unchecked")
private static List<Class<? extends GeneratedMessageV3>> getProtoEventTypes(
Class<?> entityClass) {
return new ArrayList<>(
(List<Class<? extends GeneratedMessageV3>>)
(List<?>) CollectionConverters.asJava(Reflect.protoEventTypes(entityClass)));
}

public EventSourcedEntityEffectsRunner(EventSourcedEntity<S, E> entity) {
this.entity = entity;
this.stateClass = Reflect.eventSourcedEntityStateType(entity.getClass());
this.allowedProtoEventTypes = getProtoEventTypes(entity.getClass());
this._state = entity.emptyState();
}

public EventSourcedEntityEffectsRunner(EventSourcedEntity<S, E> entity, S initialState) {
this.entity = entity;
this.stateClass = Reflect.eventSourcedEntityStateType(entity.getClass());
this.allowedProtoEventTypes = getProtoEventTypes(entity.getClass());
verifySerDerWithExpectedType(stateClass, initialState, entity);
this._state = initialState;
}
Expand All @@ -45,6 +59,7 @@ public EventSourcedEntityEffectsRunner(EventSourcedEntity<S, E> entity, List<E>
this.entity = entity;
this._state = entity.emptyState();
this.stateClass = Reflect.eventSourcedEntityStateType(entity.getClass());
this.allowedProtoEventTypes = getProtoEventTypes(entity.getClass());
entity._internalSetCurrentState(this._state, false);
// NB: updates _state
playEventsForEntity(initialEvents);
Expand Down Expand Up @@ -80,6 +95,30 @@ public List<E> getAllEvents() {
return events;
}

/**
* Validates that events are of allowed types when @ProtoEventTypes is used. Throws
* IllegalArgumentException if an event is not one of the declared types.
*/
private void validateProtoEventTypes(List<E> events) {
if (!allowedProtoEventTypes.isEmpty()) {
for (E event : events) {
Class<?> eventClass = event.getClass();
boolean isAllowed =
allowedProtoEventTypes.stream()
.anyMatch(allowed -> allowed.isAssignableFrom(eventClass));
if (!isAllowed) {
String allowedTypesStr =
allowedProtoEventTypes.stream().map(Class::getName).collect(Collectors.joining(", "));
throw new IllegalArgumentException(
String.format(
"Event Sourced Entity [%s] tried to persist event of type [%s] "
+ "which is not declared in @ProtoEventTypes. Allowed types are: [%s]",
entity.getClass().getName(), eventClass.getName(), allowedTypesStr));
}
}
}
}

/**
* creates a command context to run the commands, then creates an event context to run the events,
* and finally, creates a command context to run the side effects. It cleans each context after
Expand All @@ -100,7 +139,10 @@ protected <R> EventSourcedResult<R> interpretEffects(
entity._internalSetCommandContext(Optional.of(commandContext));
entity._internalSetCurrentState(this._state, this.deleted);
effectExecuted = effect.get();
this.events.addAll(EventSourcedResultImpl.eventsOf(effectExecuted));
// Validate proto event types before adding events
List<E> newEvents = EventSourcedResultImpl.eventsOf(effectExecuted);
validateProtoEventTypes(newEvents);
this.events.addAll(newEvents);
} finally {
entity._internalSetCommandContext(Optional.empty());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import akka.actor.typed.ActorSystem;
import akka.annotation.InternalApi;
import akka.javasdk.Metadata;
import akka.javasdk.impl.serialization.JsonSerializer;
import akka.javasdk.impl.serialization.Serializer;
import akka.javasdk.testkit.impl.EventingTestKitImpl;
import akka.javasdk.testkit.impl.OutgoingMessagesImpl;
import akka.javasdk.testkit.impl.TestKitMessageImpl;
Expand All @@ -20,7 +20,7 @@ public interface EventingTestKit {
/** INTERNAL API */
@InternalApi
static EventingTestKit start(
ActorSystem<?> system, String host, int port, JsonSerializer serializer) {
ActorSystem<?> system, String host, int port, Serializer serializer) {
return EventingTestKitImpl.start(system, host, port, serializer);
}

Expand Down Expand Up @@ -213,9 +213,9 @@ interface OutgoingMessages {
}

class MessageBuilder {
private final JsonSerializer serializer;
private final Serializer serializer;

public MessageBuilder(JsonSerializer serializer) {
public MessageBuilder(Serializer serializer) {
this.serializer = serializer;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,46 @@

package akka.javasdk.testkit;

import akka.javasdk.impl.serialization.JsonSerializer;
import akka.javasdk.impl.serialization.Serializer;
import akka.runtime.sdk.spi.BytesPayload;
import akka.util.ByteString;
import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;

/** Helper class for serializing and deserializing objects for testing schema migration. */
/**
* Helper class for serializing and deserializing objects for testing schema migration. Supports
* both JSON and Protobuf serialization formats.
*/
public final class SerializationTestkit {

private record SerializedPayload(String contentType, byte[] bytes) {}

private static final JsonSerializer jsonSerializer = new JsonSerializer();
private static final Serializer serializer = new Serializer();

/**
* Serialize a value to bytes. Automatically uses protobuf format for protobuf messages and JSON
* format for other types.
*/
public static <T> byte[] serialize(T value) {
BytesPayload bytesPayload = jsonSerializer.toBytes(value);
BytesPayload bytesPayload = serializer.toBytes(value);
SerializedPayload serializedPayload =
new SerializedPayload(bytesPayload.contentType(), bytesPayload.bytes().toArray());
try {
return jsonSerializer.objectMapper().writeValueAsBytes(serializedPayload);
return serializer.json().objectMapper().writeValueAsBytes(serializedPayload);
} catch (JsonProcessingException e) {
throw new RuntimeException("Unexpected serialization error", e);
}
}

/**
* Deserialize bytes to the expected type. Automatically detects the content type and uses the
* appropriate deserializer.
*/
public static <T> T deserialize(Class<T> valueClass, byte[] bytes) {
try {
SerializedPayload serializedPayload =
jsonSerializer.objectMapper().readValue(bytes, SerializedPayload.class);
return jsonSerializer.fromBytes(
serializer.json().objectMapper().readValue(bytes, SerializedPayload.class);
return serializer.fromBytes(
valueClass,
new BytesPayload(
ByteString.fromArray(serializedPayload.bytes), serializedPayload.contentType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import akka.javasdk.impl.client.ComponentClientImpl;
import akka.javasdk.impl.grpc.GrpcClientProviderImpl;
import akka.javasdk.impl.http.HttpClientImpl;
import akka.javasdk.impl.serialization.JsonSerializer;
import akka.javasdk.impl.serialization.Serializer;
import akka.javasdk.impl.timer.TimerSchedulerImpl;
import akka.javasdk.keyvalueentity.KeyValueEntity;
import akka.javasdk.testkit.EventingTestKit.IncomingMessages;
Expand Down Expand Up @@ -689,7 +689,7 @@ private void startEventingTestkit() {
// after discovery happens
eventingTestKit =
EventingTestKit.start(
runtimeActorSystem, "0.0.0.0", eventingTestKitPort, new JsonSerializer());
runtimeActorSystem, "0.0.0.0", eventingTestKitPort, new Serializer());
}
}

Expand Down Expand Up @@ -774,7 +774,7 @@ public SpiSettings getSettings() {
final Sdk.StartupContext startupContext =
runner.started().toCompletableFuture().get(20, TimeUnit.SECONDS);
final ComponentClients componentClients = startupContext.componentClients();
final JsonSerializer serializer = startupContext.serializer();
final Serializer serializer = startupContext.serializer();
dependencyProvider =
Optional.ofNullable(startupContext.dependencyProvider().getOrElse(() -> null));
sanitizer = startupContext.sanitizer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import akka.javasdk.JsonSupport
import akka.javasdk.Metadata.{ MetadataEntry => SdkMetadataEntry }
import akka.javasdk.impl.AnySupport
import akka.javasdk.impl.MetadataImpl
import akka.javasdk.impl.serialization.JsonSerializer
import akka.javasdk.impl.serialization.Serializer
import akka.javasdk.testkit.EventingTestKit
import akka.javasdk.testkit.EventingTestKit.IncomingMessages
import akka.javasdk.testkit.EventingTestKit.OutgoingMessages
Expand Down Expand Up @@ -79,7 +79,7 @@ object EventingTestKitImpl {
* The returned testkit can be used to expect and emit events to the proxy as if they came from an actual pub/sub
* event backend.
*/
def start(system: ActorSystem[_], host: String, port: Int, serializer: JsonSerializer): EventingTestKit = {
def start(system: ActorSystem[_], host: String, port: Int, serializer: Serializer): EventingTestKit = {

// Create service handlers
val service = new EventingTestServiceImpl(system, host, port, serializer)
Expand Down Expand Up @@ -161,7 +161,7 @@ object EventingTestKitImpl {
* Implements the EventingTestKit protocol originally defined in proxy
* protocols/testkit/src/main/protobuf/eventing_test_backend.proto
*/
final class EventingTestServiceImpl(system: ActorSystem[_], val host: String, var port: Int, serializer: JsonSerializer)
final class EventingTestServiceImpl(system: ActorSystem[_], val host: String, var port: Int, serializer: Serializer)
extends EventingTestKit {

private val log = LoggerFactory.getLogger(classOf[EventingTestServiceImpl])
Expand Down Expand Up @@ -297,7 +297,7 @@ final class EventingTestServiceImpl(system: ActorSystem[_], val host: String, va
}
}

private[testkit] class IncomingMessagesImpl(val sourcesHolder: ActorRef, val serializer: JsonSerializer)
private[testkit] class IncomingMessagesImpl(val sourcesHolder: ActorRef, val serializer: Serializer)
extends IncomingMessages {

def addSourceProbe(runningSourceProbe: RunningSourceProbe): Unit = {
Expand Down Expand Up @@ -344,9 +344,7 @@ private[testkit] class IncomingMessagesImpl(val sourcesHolder: ActorRef, val ser
"Publishing a delete message is supported only for ValueEntity messages.")
}

private[testkit] class VeIncomingMessagesImpl(
override val sourcesHolder: ActorRef,
override val serializer: JsonSerializer)
private[testkit] class VeIncomingMessagesImpl(override val sourcesHolder: ActorRef, override val serializer: Serializer)
extends IncomingMessagesImpl(sourcesHolder, serializer) {

override def publishDelete(subject: String): Unit = {
Expand All @@ -362,7 +360,7 @@ private[testkit] class VeIncomingMessagesImpl(

private[testkit] class OutgoingMessagesImpl(
private[testkit] val destinationProbe: TestProbe,
protected val serializer: JsonSerializer)
protected val serializer: Serializer)
extends OutgoingMessages {
import EventingTestKitImpl.metadataToSpi

Expand Down Expand Up @@ -474,7 +472,7 @@ private[testkit] object TestKitMessageImpl {
TestKitMessageImpl[ByteString](m.payload, metadata).asInstanceOf[TestKitMessage[ByteString]]
}

def defaultMetadata(message: Any, subject: String, serializer: JsonSerializer): SdkMetadata = {
def defaultMetadata(message: Any, subject: String, serializer: Serializer): SdkMetadata = {
val (contentType, ceType) = message match {
case pbMsg: GeneratedMessageV3 =>
val desc = pbMsg.getDescriptorForType
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (C) 2021-2026 Lightbend Inc. <https://www.lightbend.com>
*/

package akka.javasdk.testkit.eventsourced;

import akka.javasdk.annotations.Component;
import akka.javasdk.annotations.ProtoEventTypes;
import akka.javasdk.eventsourcedentity.EventSourcedEntity;
import com.google.protobuf.Duration;
import com.google.protobuf.GeneratedMessageV3;
import com.google.protobuf.StringValue;
import com.google.protobuf.Timestamp;

/**
* Event sourced entity with @ProtoEventTypes for testing validation in the testkit. Uses standard
* Google protobuf types so no custom proto files are needed.
*/
@Component(id = "test-proto-entity")
@ProtoEventTypes({StringValue.class, Duration.class})
public class ProtoEventSourcedEntity extends EventSourcedEntity<String, GeneratedMessageV3> {

@Override
public String emptyState() {
return "";
}

@Override
public String applyEvent(GeneratedMessageV3 event) {
return switch (event) {
case StringValue sv -> sv.getValue();
case Duration d -> String.valueOf(d.getSeconds());
default -> throw new IllegalArgumentException("Unknown event type: " + event.getClass());
};
}

/** Command handler that persists an allowed event type (StringValue). */
public Effect<String> persistAllowedEvent(String value) {
return effects().persist(StringValue.of(value)).thenReply(__ -> "ok");
}

/** Command handler that persists another allowed event type (Duration). */
public Effect<String> persistAllowedEventDuration(long seconds) {
return effects()
.persist(Duration.newBuilder().setSeconds(seconds).build())
.thenReply(__ -> "ok");
}

/**
* Command handler that tries to persist an event type NOT in @ProtoEventTypes. Timestamp is NOT
* in the annotation, so this should fail with validation error.
*/
public Effect<String> persistNotAllowedEvent(long seconds) {
return effects()
.persist(Timestamp.newBuilder().setSeconds(seconds).build())
.thenReply(__ -> "should not reach here");
}
}
Loading
Loading