sn/object: Optimize RANGE handling#3967
Conversation
c6358d6 to
c044e08
Compare
Refs #3902. Signed-off-by: cthulhurider <ctulhurider@gmail.com>
c044e08 to
50c02e5
Compare
Closes #3902. Signed-off-by: cthulhurider <ctulhurider@gmail.com>
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3967 +/- ##
==========================================
- Coverage 28.01% 27.95% -0.06%
==========================================
Files 680 682 +2
Lines 45295 45631 +336
==========================================
+ Hits 12691 12758 +67
- Misses 31427 31689 +262
- Partials 1177 1184 +7 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
50c02e5 to
e2b7120
Compare
There was a problem hiding this comment.
Pull request overview
This PR optimizes ObjectService/GetRange handling by reducing re-encoding/copying during RANGE forwarding, and by introducing buffered payload-range reads in local storage paths to avoid extra allocations and redundant parsing.
Changes:
- Added low-level RANGE forwarding that proxies raw protobuf buffers (similar to existing GET forwarding), plus shared helpers for chunk/split-info handling.
- Introduced buffered payload-range reading APIs across blobstor/writecache/shard/engine layers (
ReadPayloadRange,ReadRange,ReadECPartRange) and wired them into GetRange execution. - Added new engine metrics for payload-range reads and updated tests/benchmarks and changelog accordingly.
Reviewed changes
Copilot reviewed 40 out of 40 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/services/object/util.go | New shared helpers for split-info parsing and chunk response re-packaging/signing. |
| pkg/services/object/server.go | Reworked GetRange flow to optionally stream from local buffered range reader; added copyRangeStream. |
| pkg/services/object/range.go | New optimized RANGE forwarder that proxies raw gRPC buffers and trims chunks without full decode. |
| pkg/services/object/proto.go | Generalized payload-chunk shifting to support both GET and RANGE chunk field tags. |
| pkg/services/object/get/util.go | Updated forwarding paths and added local buffered range read integration in storage wrapper. |
| pkg/services/object/get/service_test.go | Updated local storage test stub for new EC part range read method. |
| pkg/services/object/get/service.go | Added buffered EC-part range read path and passed buffer/forwarding options into execution. |
| pkg/services/object/get/prm.go | Extended RangePrm to support local buffer + submit callback and a dedicated RANGE forwarder function. |
| pkg/services/object/get/get.go | Routed RANGE forwarding through a dedicated forwarder and added local-buffer execution option. |
| pkg/services/object/get/forward.go | Added forwardRangeRequest helper for RANGE proxying across node lists. |
| pkg/services/object/get/exec.go | Added exec options/fields for buffered range reads and range forwarding; removed old generic forwarder check. |
| pkg/services/object/get.go | Refactored GET proxy logic to reuse shared helpers (chunk response + split info). |
| pkg/metrics/engine.go | Added histograms and helpers for payload-range and EC-part-range read durations. |
| pkg/local_object_storage/writecache/writecache.go | Extended cache interface to return payload length from GetRangeStream and added ReadPayloadRange. |
| pkg/local_object_storage/writecache/get_test.go | Updated tests for new GetRangeStream signature (payload length return). |
| pkg/local_object_storage/writecache/get.go | Updated GetRangeStream signature; added ReadPayloadRange implementation. |
| pkg/local_object_storage/shard/shard_internal_test.go | Updated mocks/stubs for new range stream signatures and added range read methods. |
| pkg/local_object_storage/shard/range_internal_test.go | Adjusted range tests to use new range-stream semantics and uint64 bounds. |
| pkg/local_object_storage/shard/range.go | Reworked range reads to use underlying storage/cache range APIs; added buffered ReadRange. |
| pkg/local_object_storage/shard/get.go | Added shard-level buffered ReadPayloadRange. |
| pkg/local_object_storage/shard/ec_test.go | Updated EC range tests for uint64 bounds and new range-stream plumbing. |
| pkg/local_object_storage/shard/ec.go | Added buffered ReadECPartRange; refactored EC-part-range logic to reuse range retrieval functions. |
| pkg/local_object_storage/engine/metrics.go | Extended engine metrics interface for new range read durations. |
| pkg/local_object_storage/engine/get.go | Added StorageEngine.ReadPayloadRange with metrics support. |
| pkg/local_object_storage/engine/engine_test.go | Updated mocks/stubs for new range signatures and metrics methods. |
| pkg/local_object_storage/engine/engine.go | Updated shard interface to use uint64 bounds and added buffered range APIs. |
| pkg/local_object_storage/engine/ec_test.go | Updated EC range tests for new uint64 bounds and behavior changes. |
| pkg/local_object_storage/engine/ec.go | Added buffered ReadECPartRange and refactored EC-part-range logic to share metric/error handling. |
| pkg/local_object_storage/blobstor/internal/storagetest/get_range.go | Updated storage conformance tests for new GetRangeStream return signature. |
| pkg/local_object_storage/blobstor/internal/storagetest/delete.go | Updated delete-related tests for new GetRangeStream signature. |
| pkg/local_object_storage/blobstor/fstree/util.go | Added shared range validation/bounds helpers. |
| pkg/local_object_storage/blobstor/fstree/range_bench_test.go | Updated benchmarks for new GetRangeStream signature. |
| pkg/local_object_storage/blobstor/fstree/head.go | Refactored internals (_readObject) to support buffered range reads. |
| pkg/local_object_storage/blobstor/fstree/fstree_test.go | Added tests for ReadPayloadRange and refactored range tests to run against both range APIs. |
| pkg/local_object_storage/blobstor/fstree/fstree.go | Changed GetRangeStream to return payload length; added ReadPayloadRange buffered implementation. |
| pkg/local_object_storage/blobstor/common/storage.go | Updated storage interface: GetRangeStream returns payload length; added ReadPayloadRange. |
| internal/protobuf/seekers.go | Added GetUint64Field helper for scanning uint64 protobuf fields. |
| internal/protobuf/protoscan/messages.go | Added scan scheme for GetRangeResponse.Body oneof parsing. |
| internal/object/wire.go | Added helper to find payload length and payload-field offset from wire bytes. |
| CHANGELOG.md | Documented RANGE forwarding and local RANGE execution optimizations. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // ReadPayloadRange is [Cache.ReadObject] analogue for payload range reading. | ||
| // Zero range means full payload. Returns full payload range length. | ||
| // | ||
| // If given range is out of payload bounds, ReadPayloadRange returns | ||
| // [apistatus.ErrObjectOutOfRange]. | ||
| func (c *cache) ReadPayloadRange(addr oid.Address, off, ln uint64, buf []byte) (io.ReadCloser, error) { |
There was a problem hiding this comment.
Doc comment says "Returns full payload range length", but ReadPayloadRange returns only (io.ReadCloser, error). Update the comment to avoid implying an unavailable return value.
| if err != nil { | ||
| return 0, err | ||
| } | ||
|
|
There was a problem hiding this comment.
GetUint64Field doesn't handle the "field is missing" case: SeekFieldByNumber can return off < 0, which will make buf[off+tagLn:] panic. This contradicts the docstring and can crash callers (e.g., GetPayloadLengthAndFieldOffset when payload length isn't present). Handle off < 0 by returning (0, nil) similar to GetLENFieldBounds, and validate typ before parsing.
| if off < 0 { | |
| return 0, nil | |
| } | |
| if typ != protowire.VarintType { | |
| return 0, fmt.Errorf("field %d: got wire type %d, want %d", num, typ, protowire.VarintType) | |
| } |
| var prog rangeStreamProgress | ||
| for { | ||
| var respBuf mem.BufferSlice | ||
| if err = stream.RecvMsg(&respBuf); err != nil { | ||
| if errors.Is(err, io.EOF) { | ||
| if prog.empty { | ||
| return io.ErrUnexpectedEOF | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
rangeStreamProgress.empty is never set, so the io.EOF path can't reliably detect an empty stream. As written, prog.empty defaults to false, so an immediate EOF (no responses) will be treated as success. Initialize empty=true and set it to false after the first successfully received message (or remove the flag and use a clearer condition).
| opts.InterceptNested = func(num protowire.Number, buffers iprotobuf.BuffersSlice) error { | ||
| switch num { | ||
| default: | ||
| return protoscan.ErrContinue | ||
| case protoobject.FieldGetResponseBodySplitInfo: | ||
| oneofNum, oneofFld = num, buffers | ||
| return nil | ||
| } | ||
| } |
There was a problem hiding this comment.
handleResponseBody intercepts split info using protoobject.FieldGetResponseBodySplitInfo, but the scanned scheme is ObjectGetRangeResponseBodyScheme (which uses FieldRangeResponseBodySplitInfo). This prevents split-info responses from being recognized and will fall through to the "none of the supported oneof fields" error. Use FieldRangeResponseBodySplitInfo in the InterceptNested switch.
| initial = make([]byte, binary.MaxVarintLen64) | ||
| n = copy(initial, initial[pldFldOff:]) |
There was a problem hiding this comment.
In the varint-parse recovery branch, initial = make(...); n = copy(initial, initial[pldFldOff:]) copies from the newly allocated slice (all zeros) instead of the previous buffer, so the payload-length varint will be corrupted and parsing will fail. Preserve the old slice (e.g., tmp := initial) before re-slicing/reallocating and copy from the old data.
| initial = make([]byte, binary.MaxVarintLen64) | |
| n = copy(initial, initial[pldFldOff:]) | |
| tmp := initial | |
| initial = make([]byte, binary.MaxVarintLen64) | |
| n = copy(initial, tmp[pldFldOff:]) |
| } | ||
|
|
||
| extra, err := io.ReadFull(stream, initial[pldFldOff+n:]) | ||
| if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) { | ||
| if stream != nil { | ||
| stream.Close() | ||
| } | ||
| return nil, fmt.Errorf("read stream: %w", err) | ||
| } |
There was a problem hiding this comment.
ReadPayloadRange can call io.ReadFull(stream, ...) even when stream is nil (the function explicitly allows stream to be nil). If ParseVarint returns EOF/UnexpectedEOF and stream is nil, this will panic. Add an explicit stream==nil check in this recovery path and return an error indicating truncated/corrupted object data instead of reading from a nil reader.
| // ReadPayloadRange is [FSTree.ReadObject] analogue for payload range reading. | ||
| // Zero range means full payload. Returns full payload range length. | ||
| // | ||
| // If given range is out of payload bounds, ReadPayloadRange returns | ||
| // [apistatus.ErrObjectOutOfRange]. | ||
| func (t *FSTree) ReadPayloadRange(addr oid.Address, off, ln uint64, hdrBuf []byte) (io.ReadCloser, error) { |
There was a problem hiding this comment.
Doc comment says "Returns full payload range length", but the function signature returns only (io.ReadCloser, error). Please update the comment to match the API (or return the length if that's actually required).
| GetBytes(oid.Address) ([]byte, error) | ||
| Get(oid.Address) (*object.Object, error) | ||
| GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) | ||
| GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error) |
There was a problem hiding this comment.
Technically read-closer should be sufficient. Also, we know payload length from the metabase normally. But I can live with this for now.
| // | ||
| // If given range is out of payload bounds, ReadPayloadRange returns | ||
| // [apistatus.ErrObjectOutOfRange]. | ||
| func (t *FSTree) ReadPayloadRange(addr oid.Address, off, ln uint64, hdrBuf []byte) (io.ReadCloser, error) { |
| func (t *FSTree) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) { | ||
| if ln == 0 && off != 0 { | ||
| return nil, fmt.Errorf("invalid range off=%d,ln=0", off) | ||
| func (t *FSTree) GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error) { |
There was a problem hiding this comment.
How about allocating a buffer and calling ReadPayloadRange internally? It's all the same.
| return nil, apistatus.ErrObjectOutOfRange | ||
| } | ||
|
|
||
| if pldFldOff < 0 { |
There was a problem hiding this comment.
Check it before checkPayloadBounds?
|
|
||
| _, n, err := iprotobuf.ParseVarint(initial[pldFldOff:]) | ||
| if err != nil { | ||
| if !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) { |
There was a problem hiding this comment.
Can this realistically happen for size-limited header? Combined reading? I'd try to avoid retries if possible.
| // similar to [Shard.ReadHeader]. | ||
| func (s *Shard) ReadECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64, buf []byte) (io.ReadCloser, error) { | ||
| var stream io.ReadCloser | ||
| return stream, s.getECPartRangeFunc(cnr, parent, pi, off, ln, func(writeCache writecache.Cache, addr oid.Address, off, ln uint64) error { |
| func (s *Shard) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64) (uint64, io.ReadCloser, error) { | ||
| var pldLen uint64 | ||
| var stream io.ReadCloser | ||
| return pldLen, stream, s.getECPartRangeFunc(cnr, parent, pi, off, ln, func(writeCache writecache.Cache, addr oid.Address, off, ln uint64) error { |
| } | ||
|
|
||
| skipMeta = skipMeta || s.info.Mode.NoMetabase() | ||
| gotMeta, err := s.fetchObjectData(addr, skipMeta, cb, wc) |
There was a problem hiding this comment.
A nice example of more readable thing, btw.
| func (s *Shard) GetRangeStream(cnr cid.ID, id oid.ID, off, ln uint64) (uint64, io.ReadCloser, error) { | ||
| var pldLen uint64 | ||
| var stream io.ReadCloser | ||
| return pldLen, stream, s.getRangeStreamFunc(cnr, id, off, ln, func(writeCache writecache.Cache, addr oid.Address, off, ln uint64) error { |
| // similar to [Shard.ReadHeader]. | ||
| func (s *Shard) ReadRange(cnr cid.ID, id oid.ID, off, ln uint64, buf []byte) (io.ReadCloser, error) { | ||
| var stream io.ReadCloser | ||
| return stream, s.getRangeStreamFunc(cnr, id, off, ln, func(writeCache writecache.Cache, addr oid.Address, off, ln uint64) error { |
|
BTW, does it close #3898? |
No description provided.