Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Changelog for NeoFS Node
- `neofs-cli`, `neofs-node` do not allow searches based on homomorphic hash values (#3847)
- Storage nodes do not calculate homomorphic hashes for objects (#3847)
- Optimized GET/HEAD request forwarding (#3877)
- Optimized local RANGE request execution (#3967)

### Removed
- `policer.max_workers` configuration (#3920)
Expand Down
36 changes: 36 additions & 0 deletions internal/object/wire.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,39 @@ loop:

return idf, sigf, hdrf, nil
}

// GetPayloadLengthAndFieldOffset reads payload length header and seeks payload
// field in buf. If payload field is missing, negative offset returns.
// Otherwise, the offset points to protobuf LV.
//
// If any field is missing, no error is returned.
//
// Message should have ascending field order, otherwise error returns.
func GetPayloadLengthAndFieldOffset(buf []byte) (uint64, int, error) {
// TODO: traverse buffer at once
hf, err := iprotobuf.GetLENFieldBounds(buf, protoobject.FieldObjectHeader)
if err != nil {
return 0, 0, fmt.Errorf("seek header field: %w", err)
}

var pldLen uint64
if !hf.IsMissing() {
pldLen, err = iprotobuf.GetUint64Field(buf[hf.ValueFrom:hf.To], protoobject.FieldHeaderPayloadLength)
if err != nil {
return 0, 0, fmt.Errorf("seek payload length field in header: %w", err)
}
}

off, tagLn, typ, err := iprotobuf.SeekFieldByNumber(buf, protoobject.FieldObjectPayload)
if err != nil {
return 0, 0, fmt.Errorf("seek payload field: %w", err)
}
if off < 0 {
return pldLen, off, nil
}
if typ != protowire.BytesType {
return 0, 0, fmt.Errorf("wrong payload field type: expected %d, got %d", protowire.BytesType, typ)
}

return pldLen, off + tagLn, nil
}
16 changes: 16 additions & 0 deletions internal/protobuf/seekers.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,19 @@ func GetLENFieldBounds(buf []byte, num protowire.Number) (FieldBounds, error) {

return ParseLENFieldBounds(buf, off, tagLn, num, typ)
}

// GetUint64Field seeks uint64 field in buf by number and parses it. If field is
// missing, no error is returned.
//
// Message should have ascending field order, otherwise error returns.
//
// If there is an error, its text contains num.
func GetUint64Field(buf []byte, num protowire.Number) (uint64, error) {
off, tagLn, typ, err := SeekFieldByNumber(buf, num)
if err != nil {
return 0, err
}

Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetUint64Field doesn't handle the "field is missing" case: SeekFieldByNumber can return off < 0, which will make buf[off+tagLn:] panic. This contradicts the docstring and can crash callers (e.g., GetPayloadLengthAndFieldOffset when payload length isn't present). Handle off < 0 by returning (0, nil) similar to GetLENFieldBounds, and validate typ before parsing.

Suggested change
if off < 0 {
return 0, nil
}
if typ != protowire.VarintType {
return 0, fmt.Errorf("field %d: got wire type %d, want %d", num, typ, protowire.VarintType)
}

Copilot uses AI. Check for mistakes.
u, _, err := ParseUint64Field(buf[off+tagLn:], num, typ)
return u, err
}
3 changes: 2 additions & 1 deletion pkg/local_object_storage/blobstor/common/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,12 @@ type Storage interface {
// binary format. Returns [apistatus.ObjectNotFound] if object is missing.
GetBytes(oid.Address) ([]byte, error)
Get(oid.Address) (*object.Object, error)
GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error)
GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically read-closer should be sufficient. Also, we know payload length from the metabase normally. But I can live with this for now.

GetStream(oid.Address) (*object.Object, io.ReadCloser, error)
Head(oid.Address) (*object.Object, error)
ReadHeader(oid.Address, []byte) (int, error)
ReadObject(oid.Address, []byte) (int, io.ReadCloser, error)
ReadPayloadRange(oid.Address, uint64, uint64, []byte) (io.ReadCloser, error)
Exists(oid.Address) (bool, error)
Put(oid.Address, []byte) error
PutBatch(map[oid.Address][]byte) error
Expand Down
186 changes: 175 additions & 11 deletions pkg/local_object_storage/blobstor/fstree/fstree.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"strings"
"time"

objectwire "github.com/nspcc-dev/neofs-node/internal/object"
iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression"
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr"
Expand Down Expand Up @@ -530,46 +532,208 @@ func (t *FSTree) GetStream(addr oid.Address) (*object.Object, io.ReadCloser, err
//
// If the range is out of payload bounds, GetRangeStream returns
// [apistatus.ErrObjectOutOfRange].
func (t *FSTree) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) {
if ln == 0 && off != 0 {
return nil, fmt.Errorf("invalid range off=%d,ln=0", off)
func (t *FSTree) GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about allocating a buffer and calling ReadPayloadRange internally? It's all the same.

if err := verifyRequestedRange(off, ln); err != nil {
return 0, nil, err
}

// TODO: we need only one header field. Consider decoding only it + jumping to payload
hdr, stream, err := t.getObjectStream(addr)
if err != nil {
return nil, err
return 0, nil, err
}

pldLen := hdr.PayloadSize()

if ln == 0 && off == 0 {
return stream, nil
return pldLen, stream, nil
}

if off >= pldLen || pldLen-off < ln {
if !checkPayloadBounds(pldLen, off, ln) {
stream.Close()
return nil, apistatus.ErrObjectOutOfRange
return 0, nil, apistatus.ErrObjectOutOfRange
}

if off > math.MaxInt64 || ln > math.MaxInt64 { // 8 exabytes, amply
if err := checkTooBigRange(off, ln); err != nil {
stream.Close()
return nil, fmt.Errorf("range overflowing int64 is not supported by this server: off=%d,len=%d", off, ln)
return 0, nil, err
}

if off > 0 {
if _, err := stream.Seek(int64(off), io.SeekStart); err != nil {
stream.Close()
return nil, fmt.Errorf("seek offset in payload stream: %w", err)
return 0, nil, fmt.Errorf("seek offset in payload stream: %w", err)
}
}

return readerCloser{
return pldLen, readerCloser{
Reader: io.LimitReader(stream, int64(ln)),
Closer: stream,
}, nil
}

// ReadPayloadRange is [FSTree.ReadObject] analogue for payload range reading.
// Zero range means full payload. Returns full payload range length.
//
// If given range is out of payload bounds, ReadPayloadRange returns
// [apistatus.ErrObjectOutOfRange].
func (t *FSTree) ReadPayloadRange(addr oid.Address, off, ln uint64, hdrBuf []byte) (io.ReadCloser, error) {
Comment on lines +575 to +580
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doc comment says "Returns full payload range length", but the function signature returns only (io.ReadCloser, error). Please update the comment to match the API (or return the length if that's actually required).

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refs. #2925 (dancing with hdrBuf and proto).

if err := verifyRequestedRange(off, ln); err != nil {
return nil, err
}

initial, stream, err := t._readObject(addr, hdrBuf)
if err != nil {
return nil, err
}

// stream can be nil
pldLen, pldFldOff, err := objectwire.GetPayloadLengthAndFieldOffset(initial)
if err != nil {
if stream != nil {
stream.Close()
}
return nil, fmt.Errorf("get payload length and field in read header: %w", err)
}

if ln != 0 && !checkPayloadBounds(pldLen, off, ln) {
if stream != nil {
stream.Close()
}
return nil, apistatus.ErrObjectOutOfRange
}

if pldFldOff < 0 {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check it before checkPayloadBounds?

if pldLen != 0 {
return nil, fmt.Errorf("missing payload field tag in %d bytes header, payload len in header = %d", len(initial), pldLen)
}
return nopReadCloser{}, nil
}

_, n, err := iprotobuf.ParseVarint(initial[pldFldOff:])
if err != nil {
if !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can this realistically happen for size-limited header? Combined reading? I'd try to avoid retries if possible.

if stream != nil {
stream.Close()
}
return nil, fmt.Errorf("parse payload field len: %w", err)
}

if pldFldOff+binary.MaxVarintLen64 > len(initial) {
if pldFldOff >= binary.MaxVarintLen64 {
n = copy(initial[pldFldOff-binary.MaxVarintLen64:], initial[pldFldOff:])
pldFldOff -= binary.MaxVarintLen64
} else {
initial = make([]byte, binary.MaxVarintLen64)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just copy(initial, initial[pldFldOff:])? Maybe it's fine both branches, btw.

n = copy(initial, initial[pldFldOff:])
Comment on lines +627 to +628
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the varint-parse recovery branch, initial = make(...); n = copy(initial, initial[pldFldOff:]) copies from the newly allocated slice (all zeros) instead of the previous buffer, so the payload-length varint will be corrupted and parsing will fail. Preserve the old slice (e.g., tmp := initial) before re-slicing/reallocating and copy from the old data.

Suggested change
initial = make([]byte, binary.MaxVarintLen64)
n = copy(initial, initial[pldFldOff:])
tmp := initial
initial = make([]byte, binary.MaxVarintLen64)
n = copy(initial, tmp[pldFldOff:])

Copilot uses AI. Check for mistakes.
pldFldOff = 0
}
}

extra, err := io.ReadFull(stream, initial[pldFldOff+n:])
if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) {
if stream != nil {
stream.Close()
}
return nil, fmt.Errorf("read stream: %w", err)
}
Comment on lines +631 to +639
Copy link

Copilot AI Apr 30, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ReadPayloadRange can call io.ReadFull(stream, ...) even when stream is nil (the function explicitly allows stream to be nil). If ParseVarint returns EOF/UnexpectedEOF and stream is nil, this will panic. Add an explicit stream==nil check in this recovery path and return an error indicating truncated/corrupted object data instead of reading from a nil reader.

Copilot uses AI. Check for mistakes.

initial = initial[:pldFldOff+n+extra]

_, n, err = iprotobuf.ParseVarint(initial[pldFldOff:])
if err != nil {
if stream != nil {
stream.Close()
}
return nil, fmt.Errorf("parse payload field len: %w", err)
}
}

initial = initial[pldFldOff+n:]

if stream == nil && uint64(len(initial)) != pldLen {
return nil, fmt.Errorf("diff len of object payload: in header %d, in field tag %d", pldLen, len(initial))
}

type readerCloser struct {
io.Reader
io.Closer
}

// check range is already bufferred

if off == 0 {
if ln == 0 { // full
if stream == nil {
return io.NopCloser(bytes.NewReader(initial)), nil
}
if len(initial) == 0 {
return stream, nil
}
return readerCloser{
Reader: io.MultiReader(bytes.NewReader(initial), stream),
Closer: stream,
}, nil
}

if ln <= uint64(len(initial)) {
if stream != nil {
stream.Close()
}
return io.NopCloser(bytes.NewReader(initial[:ln])), nil
}

// stream is non-nil here according to conditions above

if len(initial) == 0 {
return stream, nil
}

if err := checkTooBigRange(off, ln); err != nil {
stream.Close()
return nil, err
}

return readerCloser{
Reader: io.LimitReader(io.MultiReader(bytes.NewReader(initial), stream), int64(ln)),
Closer: stream,
}, nil
}

if stream == nil {
// range is within slice according to conditions above
return io.NopCloser(bytes.NewReader(initial[off:][:ln])), nil
}

if err := checkTooBigRange(off, ln); err != nil {
stream.Close()
return nil, err
}

if off >= uint64(len(initial)) {
if off > uint64(len(initial)) {
if seeker, ok := stream.(io.Seeker); ok {
_, err = seeker.Seek(int64(off)-int64(len(initial)), io.SeekCurrent)
} else {
_, err = io.CopyN(io.Discard, stream, int64(off)-int64(len(initial)))
}
if err != nil {
stream.Close()
return nil, fmt.Errorf("seek payload stream: %w", err)
}
}
return readerCloser{
Reader: io.LimitReader(stream, int64(ln)),
Closer: stream,
}, nil
}

return readerCloser{
Reader: io.LimitReader(io.MultiReader(bytes.NewReader(initial[off:]), stream), int64(ln)),
Closer: stream,
}, nil
}

// Type is fstree storage type used in logs and configuration.
const Type = "fstree"

Expand Down
41 changes: 34 additions & 7 deletions pkg/local_object_storage/blobstor/fstree/fstree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,34 @@ func TestFSTree_GetRangeStream(t *testing.T) {
testGetRangeStream(t, setupFSTree(t))
}

func TestFSTree_ReadPayloadRange(t *testing.T) {
t.Run("compressed", func(t *testing.T) {
comp := compression.Config{Enabled: true}
require.NoError(t, comp.Init())

fst := setupFSTree(t)
fst.SetCompressor(&comp)

testReadPayloadRange(t, fst)
})

testReadPayloadRange(t, setupFSTree(t))
}

func testGetRangeStream(t *testing.T, fst *FSTree) {
testGetRangeStreamFunc(t, fst, func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error) {
_, stream, err := fst.GetRangeStream(addr, off, ln)
return stream, err
})
}

func testReadPayloadRange(t *testing.T, fst *FSTree) {
testGetRangeStreamFunc(t, fst, func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error) {
return fst.ReadPayloadRange(addr, off, ln, make([]byte, 40<<10))
})
}

func testGetRangeStreamFunc(t *testing.T, fst *FSTree, fn func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error)) {
const pldLen = 1024
pld := testutil.RandByteSlice(pldLen)

Expand All @@ -51,14 +78,14 @@ func testGetRangeStream(t *testing.T, fst *FSTree) {

addr := obj.Address()

_, err := fst.GetRangeStream(addr, 0, 0)
_, err := fn(fst, addr, 0, 0)
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
_, err = fst.GetRangeStream(addr, 1, pldLen-1)
_, err = fn(fst, addr, 1, pldLen-1)
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)

require.NoError(t, fst.Put(addr, obj.Marshal()))

_, err = fst.GetRangeStream(addr, 1, 0)
_, err = fn(fst, addr, 1, 0)
require.EqualError(t, err, "invalid range off=1,ln=0")

for _, tc := range []struct{ off, ln uint64 }{
Expand All @@ -67,7 +94,7 @@ func testGetRangeStream(t *testing.T, fst *FSTree) {
{off: 1, ln: pldLen - 1},
{off: pldLen - 1, ln: 1},
} {
stream, err := fst.GetRangeStream(addr, tc.off, tc.ln)
stream, err := fn(fst, addr, tc.off, tc.ln)
require.NoError(t, err, tc)

if tc.off == 0 && tc.ln == 0 {
Expand All @@ -84,15 +111,15 @@ func testGetRangeStream(t *testing.T, fst *FSTree) {
{off: 1, ln: pldLen},
{off: pldLen - 1, ln: 2},
} {
_, err := fst.GetRangeStream(addr, tc.off, tc.ln)
_, err := fn(fst, addr, tc.off, tc.ln)
require.ErrorIs(t, err, apistatus.ErrObjectOutOfRange)
}

require.NoError(t, fst.Delete(addr))

_, err = fst.GetRangeStream(addr, 0, 0)
_, err = fn(fst, addr, 0, 0)
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
_, err = fst.GetRangeStream(addr, 1, pldLen-1)
_, err = fn(fst, addr, 1, pldLen-1)
require.ErrorIs(t, err, apistatus.ErrObjectNotFound)
}

Expand Down
Loading