ChronoPubSub: simple pub/sub plugin for ChronoLog (#355)#634
Open
iameneko wants to merge 4 commits into
Open
Conversation
Adds a new client-side plugin that wraps the ChronoLog client with a simple pub/sub interface. Topics map to stories under a default chronicle; publish() appends an event, subscribe()/subscribe_from() spawn a polling thread that replays events from a watermark and dispatches each to a user callback. Mirrors the chronokvs plugin layout: noexcept Create() factories, mapper + client adapter layers, per-plugin LogLevel and macros. Includes a publisher/subscriber example pair, GTest unit coverage for logger and config loading, and a manual integration test that publishes N messages, verifies ordered delivery, exercises live delivery on a second topic, and asserts no callbacks fire after unsubscribe.
…ation test The first run failed because the polling subscriber hammered ReplayStory before keepers had propagated events to a player, getting CL_ERR_QUERY_TIMED_OUT on every attempt. Mirror chronokvs's pattern: publish, flush, sleep through the propagation window, then subscribe and let the polling thread deliver. Also lengthen the test poll interval to 1s and downgrade the mapper's "replay failed" log to WARNING so retries during propagation don't flood stderr.
…le thrash Subscribing before publishing on the same topic causes the polling thread's ReleaseStory to invalidate the publisher's cached handle every cycle; in practice events don't surface in one propagation window. Mirror Phase 3's publish → flush → wait → subscribe ordering on the second topic too.
…nd topic Phase 4 still saw 0/10 deliveries because the Phase 3 subscriber's 1Hz polling loop kept Acquire/ReleaseStory-ing the same chronicle during the second topic's propagation window, measurably delaying when its events became readable. Stop sub_id before publishing on kSecondTopic; verify the re-unsubscribe contract in Phase 5.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Closes #355.
This adds ChronoPubSub, a new client-side plugin that wraps the ChronoLog client with a simple publish/subscribe API. Topics are stories under a single default chronicle:
publish(topic, payload)appends a ChronoLog event, andsubscribe(topic, callback)/subscribe_from(topic, since_ts, callback)spawn a background polling thread that replays the topic from a per-subscription watermark and dispatches every new event to the user callback in timestamp order. The plugin sits entirely on top of the existing client API, no client changes needed, and the constructor pattern matches chronokvs/chronosql so it can target distributed deployments via a JSON config.The design intentionally favors extensibility over performance: the polling cadence is per-subscription (default 100 ms), each subscription runs an independent worker, and unsubscribe is safe to call from inside or outside the callback. Cross-process subscribers see published messages only after the publisher calls
flush()(or destructs); this matches ChronoLog's read-after-release semantics and is documented in the API. Coverage includes a logger unit test, a config-loading unit test (no live stack required), and a manual integration test that publishes N messages, verifies ordered delivery from the beginning of the topic, exercises live delivery on a second topic, and asserts no callbacks fire after unsubscribe. A publisher/subscriber example pair is shipped underPlugins/chronopubsub/examples/.