Container builders for the persist source #36514

Open: antiguru wants to merge 1 commit into MaterializeInc:main from antiguru:persist_source_cb

@antiguru
Member

Motivation

The persist source hardcodes its row and error outputs to `Vec<(_, Timestamp, Diff)>` containers, which forces every downstream operator to consume that exact shape and prevents callers from selecting a more suitable container builder (e.g. for columnar batches).

Description

Generalize `persist_source` over caller-supplied container builders `DCB`/`ECB` (each `ContainerBuilder + PushInto<(_, Timestamp, Diff)>`) and switch the ok/err demux from `ok_err` to a `unary_fallible` operator parameterized over them. All current callers pass `CapacityContainerBuilder<_>` to preserve existing behavior.
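For readers unfamiliar with the pattern, here is a minimal, dependency-free sketch of a demux that is generic over two builders. It is not the code in this PR: the `PushInto` and `ContainerBuilder` traits below are simplified, hypothetical stand-ins for the timely traits of the same name, `VecBuilder` is a toy substitute for `CapacityContainerBuilder`, and `demux` and `demo` are illustrative names only.

```rust
use std::collections::VecDeque;

// Hypothetical, simplified stand-ins for timely's traits; the real
// traits have different signatures and more requirements.
trait PushInto<T> {
    fn push_into(&mut self, item: T);
}

trait ContainerBuilder: Default {
    type Container;
    /// Returns a completed container, if one is ready.
    fn extract(&mut self) -> Option<Self::Container>;
    /// Drains remaining containers, including a partially filled one.
    fn finish(&mut self) -> Option<Self::Container>;
}

/// Toy builder that emits `Vec`s of exactly `CAP` elements (the last may be shorter).
struct VecBuilder<T, const CAP: usize> {
    current: Vec<T>,
    ready: VecDeque<Vec<T>>,
}

impl<T, const CAP: usize> Default for VecBuilder<T, CAP> {
    fn default() -> Self {
        Self { current: Vec::new(), ready: VecDeque::new() }
    }
}

impl<T, const CAP: usize> PushInto<T> for VecBuilder<T, CAP> {
    fn push_into(&mut self, item: T) {
        self.current.push(item);
        if self.current.len() == CAP {
            self.ready.push_back(std::mem::take(&mut self.current));
        }
    }
}

impl<T, const CAP: usize> ContainerBuilder for VecBuilder<T, CAP> {
    type Container = Vec<T>;
    fn extract(&mut self) -> Option<Vec<T>> {
        self.ready.pop_front()
    }
    fn finish(&mut self) -> Option<Vec<T>> {
        if let Some(full) = self.ready.pop_front() {
            return Some(full);
        }
        if self.current.is_empty() { None } else { Some(std::mem::take(&mut self.current)) }
    }
}

type Timestamp = u64;
type Diff = i64;

/// Routes `Ok` updates into `DCB` and `Err` updates into `ECB`,
/// collecting whatever containers each builder chooses to produce.
fn demux<D, E, DCB, ECB>(
    input: Vec<(Result<D, E>, Timestamp, Diff)>,
) -> (Vec<DCB::Container>, Vec<ECB::Container>)
where
    DCB: ContainerBuilder + PushInto<(D, Timestamp, Diff)>,
    ECB: ContainerBuilder + PushInto<(E, Timestamp, Diff)>,
{
    let (mut oks, mut errs) = (DCB::default(), ECB::default());
    let (mut ok_out, mut err_out) = (Vec::new(), Vec::new());
    for (result, time, diff) in input {
        match result {
            Ok(row) => {
                oks.push_into((row, time, diff));
                while let Some(c) = oks.extract() { ok_out.push(c); }
            }
            Err(err) => {
                errs.push_into((err, time, diff));
                while let Some(c) = errs.extract() { err_out.push(c); }
            }
        }
    }
    // Flush partially filled containers at the end of the input.
    while let Some(c) = oks.finish() { ok_out.push(c); }
    while let Some(c) = errs.finish() { err_out.push(c); }
    (ok_out, err_out)
}

type Update = (&'static str, Timestamp, Diff);

/// Example call: the caller, not `demux`, picks the builder types.
fn demo() -> (Vec<Vec<Update>>, Vec<Vec<Update>>) {
    let input = vec![(Ok("a"), 0, 1), (Err("bad"), 0, 1), (Ok("b"), 1, 1), (Ok("c"), 1, -1)];
    demux::<_, _, VecBuilder<Update, 2>, VecBuilder<Update, 2>>(input)
}
```

The point of the sketch is that the container shape is decided at the call site: swapping `VecBuilder<Update, 2>` for another type that satisfies the same bounds changes the output containers without touching `demux`.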

Supporting changes propagate the same generalization through `txn-wal::operator::txns_progress` and `txns_progress_frontiers` (now over `Stream<_, P: Container>`), relax `builder_async`'s output/input handles to accept any `ContainerBuilder`/`Container`, and add a `NoopContainerBuilder` in `mz-timely-util::containers` for the pass-through case.

See the commit message for the full breakdown.

@antiguru antiguru requested review from a team and aljoscha as code owners May 11, 2026 19:45
@antiguru antiguru requested a review from DAlperin May 11, 2026 19:47
@antiguru antiguru force-pushed the persist_source_cb branch from 6e6d122 to 4515d68 Compare May 11, 2026 19:57
Generalize `persist_source` over container builders for the row and
error outputs, replacing the hardcoded `Vec<(_, Timestamp, Diff)>`
containers with caller-supplied `DCB` and `ECB` type parameters that
implement `ContainerBuilder + PushInto<(_, Timestamp, Diff)>`. The
ok/err demux switches from `ok_err` to a `unary_fallible` operator
parameterized over these builders so finished batches flow through the
chosen builder. Existing callers in `storage` and `compute` pass
`CapacityContainerBuilder<_>` to preserve current behavior.

Supporting changes:

* `txn-wal::operator::txns_progress` and `txns_progress_frontiers`
  accept a generic `Stream<_, P: Container>` rather than `StreamVec`,
  and the frontier operator uses the new `NoopContainerBuilder` since
  it forwards whole containers without building from elements.
* Add `NoopContainerBuilder` to `mz-timely-util::containers` for the
  pass-through case where only whole containers exist.
* Relax `AsyncOutputHandle::give_container` and the async input
  handles in `builder_async` to operate over any `ContainerBuilder` /
  `Container`, not just `CapacityContainerBuilder<C>` / `C: Clone`.
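To illustrate the pass-through case, the following is a rough sketch of what a no-op builder can look like when an operator only ever forwards whole containers. It is an assumption-laden illustration, not the `NoopContainerBuilder` added here: the `ContainerBuilder` trait is a simplified, hypothetical stand-in for timely's, and `NoopBuilder`, `Output`, and `forward_all` are made-up names.

```rust
use std::marker::PhantomData;

// Hypothetical, simplified stand-in for timely's `ContainerBuilder`.
trait ContainerBuilder: Default {
    type Container;
    fn extract(&mut self) -> Option<Self::Container>;
    fn finish(&mut self) -> Option<Self::Container>;
}

/// Builder that accepts no elements and never produces a container.
/// It exists only to name the container type `C` for an output.
struct NoopBuilder<C>(PhantomData<C>);

impl<C> Default for NoopBuilder<C> {
    fn default() -> Self {
        NoopBuilder(PhantomData)
    }
}

impl<C> ContainerBuilder for NoopBuilder<C> {
    type Container = C;
    fn extract(&mut self) -> Option<C> { None }
    fn finish(&mut self) -> Option<C> { None }
}

/// Hypothetical output handle that is generic over its builder.
struct Output<CB: ContainerBuilder> {
    builder: CB,
    sent: Vec<CB::Container>,
}

impl<CB: ContainerBuilder> Output<CB> {
    fn new() -> Self {
        Self { builder: CB::default(), sent: Vec::new() }
    }

    /// Forwards a finished container unchanged, bypassing element-wise building.
    fn give_container(&mut self, container: CB::Container) {
        // Drain anything the builder still holds so ordering is preserved;
        // with `NoopBuilder` this loop never runs.
        while let Some(pending) = self.builder.finish() {
            self.sent.push(pending);
        }
        self.sent.push(container);
    }
}

/// Forwards every batch as-is; no bounds on `C` are needed.
fn forward_all<C>(batches: Vec<C>) -> Vec<C> {
    let mut out = Output::<NoopBuilder<C>>::new();
    for batch in batches {
        out.give_container(batch);
    }
    out.sent
}
```

Because the builder never touches elements, `forward_all` places no trait bounds on `C`, which matches the stated motivation of forwarding whole containers without building from elements.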
@antiguru antiguru force-pushed the persist_source_cb branch from 4515d68 to ef9e048 Compare May 11, 2026 20:23