Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.16.0-otp-26
erlang 26.2.1
elixir 1.19-otp-27
erlang 27.3.2
2 changes: 2 additions & 0 deletions guides/Subscriptions.md
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,8 @@ By default a subscription will only allow a single subscriber but you can opt-in

- `buffer_size` limits how many in-flight events will be sent to the subscriber process before acknowledgement of successful processing. This limits the number of messages sent to the subscriber and stops their message queue from getting filled with events. Defaults to one in-flight event.

- `buffer_flush_after` (milliseconds) ensures events are flushed to the subscriber after a period of time even if the buffer size has not been reached. This ensures events are delivered with bounded latency during less busy periods. When set to 0 (default), no time-based flushing is performed and events are only sent when the buffer_size is reached. Each partition has its own independent timer. If a subscriber is at capacity when the timer fires, events remain queued and the timer is automatically restarted to ensure eventual delivery with bounded latency.

- `partition_by` is an optional function used to partition events to subscribers. It can be used to guarantee processing order when multiple subscribers have subscribed to a single subscription as described in [Ordering guarantee](#ordering-guarantee) below. The function is passed a single argument (an `EventStore.RecordedEvent` struct) and must return the partition key. As an example to guarantee events for a single stream are processed serially, but different streams are processed concurrently, you could use the `stream_uuid` as the partition key.

### Ordering guarantee
Expand Down
10 changes: 10 additions & 0 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ defmodule EventStore do
@type transient_subscribe_options :: [transient_subscribe_option]
@type persistent_subscription_option ::
transient_subscribe_option
| {:buffer_flush_after, non_neg_integer()}
| {:buffer_size, pos_integer()}
| {:checkpoint_after, non_neg_integer()}
| {:checkpoint_threshold, pos_integer()}
Expand Down Expand Up @@ -1146,6 +1147,15 @@ defmodule EventStore do
message queue from getting filled with events. Defaults to one in-flight
event.

- `buffer_flush_after` (milliseconds) used to ensure events are flushed
to the subscriber after a period of time even if the buffer size has not
been reached. This ensures events are delivered with bounded latency
during less busy periods. When set to 0 (default), no time-based
flushing is performed and events are only sent when the buffer_size is
reached. Each partition has its own independent timer. If a subscriber
is at capacity when the timer fires, events remain queued and the timer
is automatically restarted to ensure eventual delivery with bounded latency.

- `checkpoint_threshold` determines how frequently a checkpoint is written
to the database for the subscription after events are acknowledged.
Increasing the threshold will reduce the number of database writes for
Expand Down
9 changes: 8 additions & 1 deletion lib/event_store/storage/snapshot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,14 @@ defmodule EventStore.Storage.Snapshot do
end
end

defp to_snapshot_from_row([source_uuid, source_version, source_type, data, metadata, created_at]) do
defp to_snapshot_from_row([
source_uuid,
source_version,
source_type,
data,
metadata,
created_at
]) do
%SnapshotData{
source_uuid: source_uuid,
source_version: source_version,
Expand Down
22 changes: 22 additions & 0 deletions lib/event_store/subscriptions/subscription.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,18 @@ defmodule EventStore.Subscriptions.Subscription do
{:noreply, state}
end

@impl GenServer
def handle_info({:flush_buffer, partition_key}, %Subscription{} = state) do
%Subscription{subscription: subscription} = state

state =
subscription
|> SubscriptionFsm.flush_buffer(partition_key)
|> apply_subscription_to_state(state)

{:noreply, state}
end

@impl GenServer
def handle_info(
{EventStore.AdvisoryLocks, :lock_released, lock_ref, reason},
Expand Down Expand Up @@ -254,6 +266,10 @@ defmodule EventStore.Subscriptions.Subscription do
@impl GenServer
def terminate(_reason, state) do
%Subscription{subscription: subscription} = state
%SubscriptionFsm{data: subscription_data} = subscription

# Cancel all buffer flush timers before terminating
SubscriptionState.cancel_all_buffer_timers(subscription_data)

# Checkpoint subscription if needed before terminating
SubscriptionFsm.checkpoint(subscription)
Expand Down Expand Up @@ -291,6 +307,12 @@ defmodule EventStore.Subscriptions.Subscription do
defp handle_subscription_state(
%Subscription{subscription: %SubscriptionFsm{state: :max_capacity}} = state
) do
Logger.debug(describe(state) <> " at max capacity, continuing to fetch new events")

# Even though subscriber is at capacity, continue fetching events from storage
# and queue them. When subscriber ACKs pending events, queued events will be sent.
:ok = GenServer.cast(self(), :catch_up)

state
end

Expand Down
Loading
Loading