diff --git a/src/main/java/li/cil/oc2/api/README.md b/src/main/java/li/cil/oc2/api/README.md index 3f47be16..48b4d335 100644 --- a/src/main/java/li/cil/oc2/api/README.md +++ b/src/main/java/li/cil/oc2/api/README.md @@ -77,12 +77,28 @@ This can be useful for various things. For example: - Close and delete the file in `unmount()`. - Close the file in `suspend()`. -### No Active Back-channel - -Unlike some other computer mods (e.g. OpenComputers and ComputerCraft), there is no *active* back-channel in the -`RPCDevice` API. In other words, it is not possible for `RPCDevices` to raise events in the virtual machines. The only -way to provide data to the virtual machines is as values returned from exposed methods. Programs running in the virtual -machines will always have to poll for changed data. +### Subscriptions and Events + +The `RPCDevice` API also lets devices raise events in the virtual machine, by implementing the `RPCEventSource` +interface. If using `ObjectDevice`, it will automatically notice if the wrapped object implements `RPCEventSource`; if +implementing `RPCDevice` directly you can either implement `RPCEventSource` on the same type, or override the +`asEventSource` method to return the actual event source. The VM will then be able to subscribe to events from the +device, which can be sent by calling `postEvent` on the `IEventSink` passed to `subscribe`. An example of this is +the built-in `RedstoneInterfaceBlockEntity` device. + +#### Note: This is mostly a new feature and there are several caveats to keep in mind when using it. + +- If subscriptions are used on or before OC2R version 2.2.12, they can cause a server crash if too many messages are + sent at once. To be safe, do not send any event in the same tick as another message, or depend on a later minimum + version of OC2R. +- Right now, if a lot of messages are sent and the VM does not listen for them, some may be dropped; in theory the VM + can detect this, but none of the existing libraries do (see next point). You cannot depend on the events being + reliably delivered, and must use some other communication channel if reliability is a requirement. A method call + should not have this issue unless the results and all the (subscribed to) events in that tick put together are more + than 4 KiB, though be aware that if the method call causes a state change, that might itself cause some events. +- The on-the-"wire" event and subscription format is probably stable, but the VM's userspace python and lua libraries do + not easily support them yet, and often silently discard events when expecting a different sort of message. Better + support is actively being worked on. ## The `BlockDeviceProvider` and `ItemDeviceProvider` diff --git a/src/main/java/li/cil/oc2/api/bus/device/object/ObjectDevice.java b/src/main/java/li/cil/oc2/api/bus/device/object/ObjectDevice.java index 920272eb..5b2d7d20 100644 --- a/src/main/java/li/cil/oc2/api/bus/device/object/ObjectDevice.java +++ b/src/main/java/li/cil/oc2/api/bus/device/object/ObjectDevice.java @@ -84,6 +84,7 @@ public ObjectDevice(final Object object) { } /////////////////////////////////////////////////////////////////// + @Override public RPCEventSource asEventSource() { if (object instanceof RPCEventSource res) { return res; diff --git a/src/main/java/li/cil/oc2/api/bus/device/rpc/IEventSink.java b/src/main/java/li/cil/oc2/api/bus/device/rpc/IEventSink.java index 055eab6e..4cd68050 100644 --- a/src/main/java/li/cil/oc2/api/bus/device/rpc/IEventSink.java +++ b/src/main/java/li/cil/oc2/api/bus/device/rpc/IEventSink.java @@ -1,7 +1,6 @@ /* SPDX-License-Identifier: MIT */ package li.cil.oc2.api.bus.device.rpc; -import com.google.gson.JsonElement; import java.util.UUID; /** @@ -10,5 +9,11 @@ */ public interface IEventSink { - void postEvent(UUID sourceid, JsonElement msg); + /** + * Hand a message to the event sink to process + * + * @param sourceid The UUID of the originator, usually given by {@link RPCEventSource#subscribe(IEventSink, UUID)} + * @param msg The message. Should be serializable with gson + */ + void postEvent(UUID sourceid, Object msg); } diff --git a/src/main/java/li/cil/oc2/api/bus/device/rpc/RPCDevice.java b/src/main/java/li/cil/oc2/api/bus/device/rpc/RPCDevice.java index 68942d14..2fdd2178 100644 --- a/src/main/java/li/cil/oc2/api/bus/device/rpc/RPCDevice.java +++ b/src/main/java/li/cil/oc2/api/bus/device/rpc/RPCDevice.java @@ -8,6 +8,8 @@ import java.util.List; +import javax.annotation.Nullable; + /** * Provides an interface for an RPC device, describing the methods that can be * called on it and the type names it can be detected by/is compatible with. @@ -72,6 +74,25 @@ public interface RPCDevice extends Device { */ List getMethodGroups(); + /** + * Get an {@link RPCEventSource} that handles events for this device, if any + * + * By default, returns {@code this} if it's an event source, but can be overriden if the event source and the device + * are different objects. + * + * @return An {@link RPCEventSource} that can be used for subscription and unsubscription, or {@code null} if + * subscriptions are not supported. + */ + @Nullable + default RPCEventSource asEventSource() { + if (this instanceof RPCEventSource res) { + return res; + } + else { + return null; + } + } + /** * Called to start this device. *

diff --git a/src/main/java/li/cil/oc2/api/bus/device/rpc/RPCEventSource.java b/src/main/java/li/cil/oc2/api/bus/device/rpc/RPCEventSource.java index 8389244f..12d14a2b 100644 --- a/src/main/java/li/cil/oc2/api/bus/device/rpc/RPCEventSource.java +++ b/src/main/java/li/cil/oc2/api/bus/device/rpc/RPCEventSource.java @@ -5,10 +5,11 @@ import java.util.*; /** - * Provides an interface for an RPC event source. Blocks which whish to provide - * push notifications via the /dev/hvc0 serial devices should implement this. + * Provides an interface for an RPC event source. Blocks which wish to provide + * push notifications via the RPC bus serial device should implement this. * It is generally recommended to *also* provide documentation and a list of - * events by implementing DocumentedDevice and providing a listEvents() callback + * events by implementing {@link li.cil.oc2.api.bus.device.object.DocumentedDevice + * DocumentedDevice} and providing a {@code listEvents()} callback *

*/ public interface RPCEventSource { diff --git a/src/main/java/li/cil/oc2/common/bus/RPCDeviceBusAdapter.java b/src/main/java/li/cil/oc2/common/bus/RPCDeviceBusAdapter.java index 0268303e..495ec7a4 100644 --- a/src/main/java/li/cil/oc2/common/bus/RPCDeviceBusAdapter.java +++ b/src/main/java/li/cil/oc2/common/bus/RPCDeviceBusAdapter.java @@ -4,7 +4,6 @@ import com.google.gson.Gson; import com.google.gson.JsonArray; -import com.google.gson.JsonElement; import li.cil.ceres.api.Serialized; import li.cil.oc2.api.bus.DeviceBusController; import li.cil.oc2.api.bus.device.Device; @@ -16,8 +15,10 @@ import li.cil.oc2.common.serialization.gson.*; import li.cil.sedna.api.device.Steppable; import li.cil.sedna.api.device.serial.SerialDevice; -import li.cil.oc2.api.bus.device.object.*; import javax.annotation.Nullable; + +import org.apache.logging.log4j.LogManager; + import java.io.ByteArrayInputStream; import java.io.InputStreamReader; import java.nio.ByteBuffer; @@ -35,6 +36,7 @@ public final class RPCDeviceBusAdapter implements Steppable, IEventSink { public static final String ERROR_UNKNOWN_DEVICE = "unknown device"; public static final String ERROR_UNKNOWN_METHOD = "unknown method"; public static final String ERROR_INVALID_PARAMETER_SIGNATURE = "invalid parameter signature"; + public static final String ERROR_SUBSCRIPTIONS_NOT_SUPPORTED = "device does not support subscriptions"; /////////////////////////////////////////////////////////////////// @@ -46,6 +48,7 @@ public final class RPCDeviceBusAdapter implements Steppable, IEventSink { private final Set unmountedDevices = new HashSet<>(); private final Set mountedDevices = new HashSet<>(); private final Lock pauseLock = new ReentrantLock(); + private final Object receiveLock = new Object(); // Lock object for receive buffer private boolean isPaused; private boolean crmode = false; private final ArrayList subscriptions = new ArrayList<>(); @@ -55,6 +58,7 @@ public final class RPCDeviceBusAdapter implements Steppable, IEventSink { @Serialized private final ByteBuffer transmitBuffer; // for data written to device by VM @Serialized private ByteBuffer receiveBuffer; // for data written by device to VM @Serialized private MethodInvocation synchronizedInvocation; // pending main thread invocation + @Serialized private volatile long sequenceNumber = 0; /////////////////////////////////////////////////////////////////// @@ -65,6 +69,7 @@ public RPCDeviceBusAdapter(final SerialDevice serialDevice) { public RPCDeviceBusAdapter(final SerialDevice serialDevice, final int maxMessageSize) { this.serialDevice = serialDevice; this.transmitBuffer = ByteBuffer.allocate(maxMessageSize); + this.receiveBuffer = ByteBuffer.allocate(maxMessageSize); this.gson = RPCMethodParameterTypeAdapters.beginBuildGson() .registerTypeAdapter(byte[].class, new UnsignedByteArrayJsonSerializer()) .registerTypeAdapter(MethodInvocation.class, new MethodInvocationJsonDeserializer()) @@ -107,8 +112,9 @@ public void disposeDevices() { public void reset() { transmitBuffer.clear(); - receiveBuffer = null; + receiveBuffer.clear(); synchronizedInvocation = null; + sequenceNumber = 0; } public void pause() { @@ -122,6 +128,13 @@ public void pause() { } public void resume(final DeviceBusController controller, final boolean didDevicesChange) { + // Fix for upgrade from pre-event-support. Ideally this would be done on deserialization, but Ceres doesn't + // have a hook for that, and this should at least run before the buffer is needed. + if (receiveBuffer == null) { + // Default receive buffer size is the same as transmitBuffer capacity + receiveBuffer = ByteBuffer.allocate(transmitBuffer.capacity()); + } + isPaused = false; if (!didDevicesChange) { @@ -159,10 +172,10 @@ public void resume(final DeviceBusController controller, final boolean didDevice devicesByIdentifier.forEach((identifier, devices) -> { final RPCDeviceList device = new RPCDeviceList(devices); - // If there are no methods we have either no devices at all, or all synthetic - // devices, i.e. devices that only contribute type names, but have no methods - // to call. We do not expose these to avoid cluttering the device list. - if (device.getMethodGroups().isEmpty()) { + // If there are no methods or events we have either no devices at all, or all + // synthetic devices, i.e. devices that only contribute type names, but have + // no functionality. We do not expose these to avoid cluttering the device list. + if (device.getMethodGroups().isEmpty() && device.asEventSource() == null) { return; } @@ -210,7 +223,8 @@ public void tick() { // This is also used to prevent thread from processing messages, so only // reset this when we're done. Otherwise, we may get a race-condition when - // writing back data. + // writing back data, which would not cause interleaved messages but might + // confuse which results go with which method call. synchronizedInvocation = null; } } @@ -242,12 +256,24 @@ private UUID selectIdentifierDeterministically(final ArrayList identifiers } private void readFromDevice() { - // Only ever allow one pending message to avoid giving the VM the - // power of uncontrollably inflating memory usage. Basically any - // method of limiting the write queue size would work, but this is - // the most simple and easy to maintain one I could think of. + // Early return if we don't want to handle a new message. + // 1. Make sure receiveBuffer is empty so we can almost certainly write results back (not a guarantee if events + // are posted at the wrong time, especially if a device posts events while handling a method, but should be fine + // if there is no misbehaving device). + // 2. Make sure there is no pending synchronized method invocation so we only need to deal with one at once and + // we can be sure we respond to methods in the order called. + // Note that a synchronized method invocation is much more likely to have unrelated events post between the call + // and the results. + synchronized (receiveLock) { + if (receiveBuffer.position() != 0 || synchronizedInvocation != null) { + return; + } + } + int value; - while (receiveBuffer == null && synchronizedInvocation == null && (value = serialDevice.read()) >= 0) { + // Only ever read one message at a time. The first early return check *will* be invalidated by processing a + // message and the second also could be + while ((value = serialDevice.read()) >= 0) { if (value == 0 || value == 13) { this.crmode = value == 13; if (transmitBuffer.limit() > 0) { @@ -271,23 +297,34 @@ private void readFromDevice() { } private void writeToDevice() { - if (receiveBuffer == null) { - return; - } + synchronized (receiveLock) { + receiveBuffer.flip(); - while (receiveBuffer.hasRemaining() && serialDevice.canPutByte()) { - serialDevice.putByte(receiveBuffer.get()); - } - - serialDevice.flush(); + while (receiveBuffer.hasRemaining() && serialDevice.canPutByte()) { + serialDevice.putByte(receiveBuffer.get()); + } - if (!receiveBuffer.hasRemaining()) { - receiveBuffer = null; + receiveBuffer.compact(); } + serialDevice.flush(); } private void processMessage(final byte[] messageData) { - if (new String(messageData).trim().isEmpty()) { + // HACK: Linux thinks the RPC bus is a TTY, and when all file descriptors to it are closed and then one is + // opened again, the kernel resets the termios attributes and *turns on echo*. This sends a bunch of mangled + // garbage back down the bus to end up here. Part of the mangling is replacing control characters, including + // replacing the default message delimiter with "^@", so we can try to detect it. This doesn't work in crmode + // and it's a bit of a hack to try to catch here + // + // Medium term solution: keep a file descriptor open on Linux all the time, so the `stty` stuff only needs to be + // done once and echo doesn't turn back on. Also make a human-readable symlink from /dev/oc2r/rpc to /dev/hvc0, + // both because of the benefit of being human-readable (see also: /dev/disk/by-label with udev), and to ease the + // transition for the longer-term solution. + // Longer-term solution: Add an option to VirtIOConsoleDevice to present as a non-tty port; I have a proof of + // concept for that but it needs more work. This would move the bus to /dev/vport0p0, but the symlink can help + // ease the transition. + String messageString = new String(messageData).trim(); + if (messageString.isEmpty() || messageString.startsWith("^@")) { return; } @@ -333,50 +370,40 @@ private void processMessage(final byte[] messageData) { } @Override - public void postEvent(UUID deviceid, JsonElement msg) { + public void postEvent(UUID deviceid, Object msg) { writeMessage(Message.MESSAGE_TYPE_EVENT, new Object[]{deviceid, msg}); } private void subscribe(final UUID deviceId) { RPCDeviceList devices = devicesById.get(deviceId); - if (devices != null) { - for (RPCDevice device : devices.getDevices()) { - if (device instanceof ObjectDevice od) { - RPCEventSource res = od.asEventSource(); - if (res != null) { - res.subscribe(this, deviceId); - subscriptions.add(res); - return; - } - } - if (device instanceof RPCEventSource res) { - res.subscribe(this, deviceId); - subscriptions.add(res); - return; - } - } - writeError("device does not support subscriptions"); + if (devices == null) { + writeError(ERROR_UNKNOWN_DEVICE); + return; } - else { - writeError("unknown device"); + RPCEventSource res = devices.asEventSource(); + if (res == null) { + writeError(ERROR_SUBSCRIPTIONS_NOT_SUPPORTED); } + + res.subscribe(this, deviceId); + subscriptions.add(res); + writeMessage(Message.MESSAGE_TYPE_SUBSCRIBE, null); } + private void unsubscribe(final UUID deviceId) { RPCDeviceList devices = devicesById.get(deviceId); - if (devices != null) { - for (RPCDevice device : devices.getDevices()) { - if (device instanceof RPCEventSource res) { - res.unsubscribe(this); - subscriptions.remove(res); - } - else { - writeError("device does not support subscriptions"); - } - } + if (devices == null) { + writeError(ERROR_UNKNOWN_DEVICE); + return; } - else { - writeError("unknown device"); + RPCEventSource res = devices.asEventSource(); + if (res == null) { + writeError(ERROR_SUBSCRIPTIONS_NOT_SUPPORTED); } + + res.unsubscribe(this); + subscriptions.remove(res); + writeMessage(Message.MESSAGE_TYPE_UNSUBSCRIBE, null); } private void processMethodInvocation(final MethodInvocation methodInvocation, final boolean isMainThread) { @@ -458,35 +485,57 @@ private void writeError(final String message) { } private void writeMessage(final String type, @Nullable final Object data) { - if (receiveBuffer != null) throw new IllegalStateException(); - final String json = gson.toJson(new Message(type, data)); - final byte[] bytes = json.getBytes(); - final ByteBuffer receiveBuffer = ByteBuffer.allocate(bytes.length + MESSAGE_DELIMITER.length * 2); + // Start synchronization here to also include sequenceNumber increment + synchronized (receiveLock) { + final String json = gson.toJson(new Message(type, data, sequenceNumber++)); + final byte[] bytes = json.getBytes(); + final int messageLength = bytes.length + MESSAGE_DELIMITER.length * 2; + if (receiveBuffer.remaining() < messageLength) { + // Decide whether to resize or not + // The current heuristic is to resize for a large message (because + // that is probably intended by a mod author) and not for many small + // messages (because a computer user could use unlimited memory that + // way). + boolean reallocate = (receiveBuffer.capacity() <= messageLength); + LogManager.getLogger().warn( + "Attempted to send {} message without enough space (size {}, remaining {}), {}", + type, messageLength, receiveBuffer.remaining(), + reallocate ? "reallocating" : "ignoring"); + + if (!reallocate) { + // Return without writing anything. We already incremented sequenceNumber, so the VM can know + // something was missed when it reads the next message actually present. + return; + } - // In case we went through a reset and the VM was in the middle of reading - // a message we inject a delimiter up front to cause the truncated message - // to be discarded. - if (this.crmode) { - receiveBuffer.put(MESSAGE_DELIMITER2); - } - else { - receiveBuffer.put(MESSAGE_DELIMITER); - } + ByteBuffer newReceiveBuffer = ByteBuffer.allocate(messageLength * 2); + newReceiveBuffer.put(receiveBuffer.flip()); + receiveBuffer = newReceiveBuffer; + assert(messageLength < receiveBuffer.remaining()); + } - receiveBuffer.put(bytes); + // In case we went through a reset and the VM was in the middle of reading + // a message we inject a delimiter up front to cause the truncated message + // to be discarded. + if (this.crmode) { + receiveBuffer.put(MESSAGE_DELIMITER2); + } + else { + receiveBuffer.put(MESSAGE_DELIMITER); + } - // We follow up each message with a delimiter, too, so the VM knows when the - // message has been completed. This will lead to two delimiters between most - // messages. The VM is expected to ignore such "empty" messages. - if (this.crmode) { - receiveBuffer.put(MESSAGE_DELIMITER2); - } - else { - receiveBuffer.put(MESSAGE_DELIMITER); - } + receiveBuffer.put(bytes); - receiveBuffer.flip(); - this.receiveBuffer = receiveBuffer; + // We follow up each message with a delimiter, too, so the VM knows when the + // message has been completed. This will lead to two delimiters between most + // messages. The VM is expected to ignore such "empty" messages. + if (this.crmode) { + receiveBuffer.put(MESSAGE_DELIMITER2); + } + else { + receiveBuffer.put(MESSAGE_DELIMITER); + } + } } /////////////////////////////////////////////////////////////////// @@ -495,7 +544,7 @@ public record RPCDeviceWithIdentifier(UUID identifier, RPCDevice device) { } public record EmptyMethodGroup(String name) { } - public record Message(String type, @Nullable Object data) { + public record Message(String type, @Nullable Object data, long seq) { // Device -> VM public static final String MESSAGE_TYPE_LIST = "list"; public static final String MESSAGE_TYPE_METHODS = "methods"; diff --git a/src/main/java/li/cil/oc2/common/bus/device/rpc/RPCDeviceList.java b/src/main/java/li/cil/oc2/common/bus/device/rpc/RPCDeviceList.java index 112cdb3b..f52d01fe 100644 --- a/src/main/java/li/cil/oc2/common/bus/device/rpc/RPCDeviceList.java +++ b/src/main/java/li/cil/oc2/common/bus/device/rpc/RPCDeviceList.java @@ -2,13 +2,17 @@ package li.cil.oc2.common.bus.device.rpc; +import li.cil.oc2.api.bus.device.rpc.IEventSink; import li.cil.oc2.api.bus.device.rpc.RPCDevice; +import li.cil.oc2.api.bus.device.rpc.RPCEventSource; import li.cil.oc2.api.bus.device.rpc.RPCMethodGroup; import net.minecraft.nbt.CompoundTag; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; +import java.util.UUID; import java.util.stream.Collectors; public record RPCDeviceList(ArrayList devices) implements RPCDevice { @@ -67,4 +71,29 @@ public CompoundTag serializeNBT() { public void deserializeNBT(final CompoundTag tag) { throw new UnsupportedOperationException(); } + + private record RPCEventSourceList(List sources) implements RPCEventSource { + @Override + public void subscribe(IEventSink dba, UUID sourceid) { + for (RPCEventSource source : sources) { + source.subscribe(dba, sourceid); + } + } + + @Override + public void unsubscribe(IEventSink dba) { + for (RPCEventSource source : sources) { + source.unsubscribe(dba); + } + } + } + + @Override + public RPCEventSource asEventSource() { + List sources = devices.stream() + .map(RPCDevice::asEventSource) + .filter(Objects::nonNull) + .toList(); + return sources.isEmpty() ? null : new RPCEventSourceList(sources); + } } diff --git a/src/main/java/li/cil/oc2/common/serialization/gson/MessageJsonDeserializer.java b/src/main/java/li/cil/oc2/common/serialization/gson/MessageJsonDeserializer.java index ecb41b50..1056f52c 100644 --- a/src/main/java/li/cil/oc2/common/serialization/gson/MessageJsonDeserializer.java +++ b/src/main/java/li/cil/oc2/common/serialization/gson/MessageJsonDeserializer.java @@ -21,7 +21,8 @@ public RPCDeviceBusAdapter.Message deserialize(final JsonElement json, final Typ case RPCDeviceBusAdapter.Message.MESSAGE_TYPE_INVOKE_METHOD -> context.deserialize(jsonObject.getAsJsonObject("data"), RPCDeviceBusAdapter.MethodInvocation.class); default -> throw new JsonParseException(RPCDeviceBusAdapter.ERROR_UNKNOWN_MESSAGE_TYPE + messageType); }; + final long messageSequenceNumber = jsonObject.has("seq") ? jsonObject.getAsJsonPrimitive("seq").getAsLong() : -1; - return new RPCDeviceBusAdapter.Message(messageType, messageData); + return new RPCDeviceBusAdapter.Message(messageType, messageData, messageSequenceNumber); } }