diff --git a/CHANGELOG.md b/CHANGELOG.md index 9b7702f8ab..84d3954548 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -34,7 +34,8 @@ Changelog for NeoFS Node - `neofs-cli` does not allow searches based on homomorphic hash values (#3847) - `neofs-cli`, `neofs-node` do not allow searches based on homomorphic hash values (#3847) - Storage nodes do not calculate homomorphic hashes for objects (#3847) -- Optimized GET/HEAD request forwarding (#3877) +- Optimized GET/HEAD/RANGE request forwarding (#3877, #3967) +- Optimized local RANGE request execution (#3967) ### Removed - `policer.max_workers` configuration (#3920) diff --git a/internal/object/wire.go b/internal/object/wire.go index 79daa36a71..cae2ce1ff2 100644 --- a/internal/object/wire.go +++ b/internal/object/wire.go @@ -253,3 +253,39 @@ loop: return idf, sigf, hdrf, nil } + +// GetPayloadLengthAndFieldOffset reads payload length header and seeks payload +// field in buf. If payload field is missing, negative offset returns. +// Otherwise, the offset points to protobuf LV. +// +// If any field is missing, no error is returned. +// +// Message should have ascending field order, otherwise error returns. 
+func GetPayloadLengthAndFieldOffset(buf []byte) (uint64, int, error) { + // TODO: traverse buffer at once + hf, err := iprotobuf.GetLENFieldBounds(buf, protoobject.FieldObjectHeader) + if err != nil { + return 0, 0, fmt.Errorf("seek header field: %w", err) + } + + var pldLen uint64 + if !hf.IsMissing() { + pldLen, err = iprotobuf.GetUint64Field(buf[hf.ValueFrom:hf.To], protoobject.FieldHeaderPayloadLength) + if err != nil { + return 0, 0, fmt.Errorf("seek payload length field in header: %w", err) + } + } + + off, tagLn, typ, err := iprotobuf.SeekFieldByNumber(buf, protoobject.FieldObjectPayload) + if err != nil { + return 0, 0, fmt.Errorf("seek payload field: %w", err) + } + if off < 0 { + return pldLen, off, nil + } + if typ != protowire.BytesType { + return 0, 0, fmt.Errorf("wrong payload field type: expected %d, got %d", protowire.BytesType, typ) + } + + return pldLen, off + tagLn, nil +} diff --git a/internal/protobuf/protoscan/messages.go b/internal/protobuf/protoscan/messages.go index 796cac728a..3d0ff755b3 100644 --- a/internal/protobuf/protoscan/messages.go +++ b/internal/protobuf/protoscan/messages.go @@ -372,4 +372,13 @@ var ( protoobject.FieldGetResponseBodySplitInfo: ObjectSplitInfoScheme, }, } + ObjectGetRangeResponseBodyScheme = MessageScheme{ + Fields: map[protowire.Number]MessageField{ + protoobject.FieldRangeResponseBodyChunk: NewMessageField("chunk", FieldTypeBytes), + protoobject.FieldRangeResponseBodySplitInfo: NewMessageField("split info", FieldTypeNestedMessage), + }, + NestedMessageFields: map[protowire.Number]MessageScheme{ + protoobject.FieldRangeResponseBodySplitInfo: ObjectSplitInfoScheme, + }, + } ) diff --git a/internal/protobuf/seekers.go b/internal/protobuf/seekers.go index 12281d329b..f775a9c21c 100644 --- a/internal/protobuf/seekers.go +++ b/internal/protobuf/seekers.go @@ -76,3 +76,19 @@ func GetLENFieldBounds(buf []byte, num protowire.Number) (FieldBounds, error) { return ParseLENFieldBounds(buf, off, tagLn, num, typ) } + +// 
GetUint64Field seeks uint64 field in buf by number and parses it. If field is +// missing, no error is returned. +// +// Message should have ascending field order, otherwise error returns. +// +// If there is an error, its text contains num. +func GetUint64Field(buf []byte, num protowire.Number) (uint64, error) { + off, tagLn, typ, err := SeekFieldByNumber(buf, num) + if err != nil { + return 0, err + } + + u, _, err := ParseUint64Field(buf[off+tagLn:], num, typ) + return u, err +} diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index ce2564aa8b..b9378e6657 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -25,11 +25,12 @@ type Storage interface { // binary format. Returns [apistatus.ObjectNotFound] if object is missing. GetBytes(oid.Address) ([]byte, error) Get(oid.Address) (*object.Object, error) - GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) + GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error) GetStream(oid.Address) (*object.Object, io.ReadCloser, error) Head(oid.Address) (*object.Object, error) ReadHeader(oid.Address, []byte) (int, error) ReadObject(oid.Address, []byte) (int, io.ReadCloser, error) + ReadPayloadRange(oid.Address, uint64, uint64, []byte) (io.ReadCloser, error) Exists(oid.Address) (bool, error) Put(oid.Address, []byte) error PutBatch(map[oid.Address][]byte) error diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 6f074e6ed9..d4dd2d0aaf 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -14,6 +14,8 @@ import ( "strings" "time" + objectwire "github.com/nspcc-dev/neofs-node/internal/object" + iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" 
"github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" @@ -530,46 +532,208 @@ func (t *FSTree) GetStream(addr oid.Address) (*object.Object, io.ReadCloser, err // // If the range is out of payload bounds, GetRangeStream returns // [apistatus.ErrObjectOutOfRange]. -func (t *FSTree) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) { - if ln == 0 && off != 0 { - return nil, fmt.Errorf("invalid range off=%d,ln=0", off) +func (t *FSTree) GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error) { + if err := verifyRequestedRange(off, ln); err != nil { + return 0, nil, err } // TODO: we need only one header field. Consider decoding only it + jumping to payload hdr, stream, err := t.getObjectStream(addr) if err != nil { - return nil, err + return 0, nil, err } pldLen := hdr.PayloadSize() if ln == 0 && off == 0 { - return stream, nil + return pldLen, stream, nil } - if off >= pldLen || pldLen-off < ln { + if !checkPayloadBounds(pldLen, off, ln) { stream.Close() - return nil, apistatus.ErrObjectOutOfRange + return 0, nil, apistatus.ErrObjectOutOfRange } - if off > math.MaxInt64 || ln > math.MaxInt64 { // 8 exabytes, amply + if err := checkTooBigRange(off, ln); err != nil { stream.Close() - return nil, fmt.Errorf("range overflowing int64 is not supported by this server: off=%d,len=%d", off, ln) + return 0, nil, err } if off > 0 { if _, err := stream.Seek(int64(off), io.SeekStart); err != nil { stream.Close() - return nil, fmt.Errorf("seek offset in payload stream: %w", err) + return 0, nil, fmt.Errorf("seek offset in payload stream: %w", err) } } - return readerCloser{ + return pldLen, readerCloser{ Reader: io.LimitReader(stream, int64(ln)), Closer: stream, }, nil } +// ReadPayloadRange is [FSTree.ReadObject] analogue for payload range reading. 
+// Zero range means full payload. Returns full payload range length. +// +// If given range is out of payload bounds, ReadPayloadRange returns +// [apistatus.ErrObjectOutOfRange]. +func (t *FSTree) ReadPayloadRange(addr oid.Address, off, ln uint64, hdrBuf []byte) (io.ReadCloser, error) { + if err := verifyRequestedRange(off, ln); err != nil { + return nil, err + } + + initial, stream, err := t._readObject(addr, hdrBuf) + if err != nil { + return nil, err + } + + // stream can be nil + pldLen, pldFldOff, err := objectwire.GetPayloadLengthAndFieldOffset(initial) + if err != nil { + if stream != nil { + stream.Close() + } + return nil, fmt.Errorf("get payload length and field in read header: %w", err) + } + + if ln != 0 && !checkPayloadBounds(pldLen, off, ln) { + if stream != nil { + stream.Close() + } + return nil, apistatus.ErrObjectOutOfRange + } + + if pldFldOff < 0 { + if pldLen != 0 { + return nil, fmt.Errorf("missing payload field tag in %d bytes header, payload len in header = %d", len(initial), pldLen) + } + return nopReadCloser{}, nil + } + + _, n, err := iprotobuf.ParseVarint(initial[pldFldOff:]) + if err != nil { + if !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) { + if stream != nil { + stream.Close() + } + return nil, fmt.Errorf("parse payload field len: %w", err) + } + + if pldFldOff+binary.MaxVarintLen64 > len(initial) { + if pldFldOff >= binary.MaxVarintLen64 { + n = copy(initial[pldFldOff-binary.MaxVarintLen64:], initial[pldFldOff:]) + pldFldOff -= binary.MaxVarintLen64 + } else { + initial = make([]byte, binary.MaxVarintLen64) + n = copy(initial, initial[pldFldOff:]) + pldFldOff = 0 + } + } + + extra, err := io.ReadFull(stream, initial[pldFldOff+n:]) + if err != nil && !errors.Is(err, io.ErrUnexpectedEOF) && !errors.Is(err, io.EOF) { + if stream != nil { + stream.Close() + } + return nil, fmt.Errorf("read stream: %w", err) + } + + initial = initial[:pldFldOff+n+extra] + + _, n, err = iprotobuf.ParseVarint(initial[pldFldOff:]) + if 
err != nil { + if stream != nil { + stream.Close() + } + return nil, fmt.Errorf("parse payload field len: %w", err) + } + } + + initial = initial[pldFldOff+n:] + + if stream == nil && uint64(len(initial)) != pldLen { + return nil, fmt.Errorf("diff len of object payload: in header %d, in field tag %d", pldLen, len(initial)) + } + + type readerCloser struct { + io.Reader + io.Closer + } + + // check range is already bufferred + + if off == 0 { + if ln == 0 { // full + if stream == nil { + return io.NopCloser(bytes.NewReader(initial)), nil + } + if len(initial) == 0 { + return stream, nil + } + return readerCloser{ + Reader: io.MultiReader(bytes.NewReader(initial), stream), + Closer: stream, + }, nil + } + + if ln <= uint64(len(initial)) { + if stream != nil { + stream.Close() + } + return io.NopCloser(bytes.NewReader(initial[:ln])), nil + } + + // stream is non-nil here according to conditions above + + if len(initial) == 0 { + return stream, nil + } + + if err := checkTooBigRange(off, ln); err != nil { + stream.Close() + return nil, err + } + + return readerCloser{ + Reader: io.LimitReader(io.MultiReader(bytes.NewReader(initial), stream), int64(ln)), + Closer: stream, + }, nil + } + + if stream == nil { + // range is within slice according to conditions above + return io.NopCloser(bytes.NewReader(initial[off:][:ln])), nil + } + + if err := checkTooBigRange(off, ln); err != nil { + stream.Close() + return nil, err + } + + if off >= uint64(len(initial)) { + if off > uint64(len(initial)) { + if seeker, ok := stream.(io.Seeker); ok { + _, err = seeker.Seek(int64(off)-int64(len(initial)), io.SeekCurrent) + } else { + _, err = io.CopyN(io.Discard, stream, int64(off)-int64(len(initial))) + } + if err != nil { + stream.Close() + return nil, fmt.Errorf("seek payload stream: %w", err) + } + } + return readerCloser{ + Reader: io.LimitReader(stream, int64(ln)), + Closer: stream, + }, nil + } + + return readerCloser{ + Reader: 
io.LimitReader(io.MultiReader(bytes.NewReader(initial[off:]), stream), int64(ln)), + Closer: stream, + }, nil +} + // Type is fstree storage type used in logs and configuration. const Type = "fstree" diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_test.go index 6053a5d64f..d9a6970a02 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_test.go @@ -41,7 +41,34 @@ func TestFSTree_GetRangeStream(t *testing.T) { testGetRangeStream(t, setupFSTree(t)) } +func TestFSTree_ReadPayloadRange(t *testing.T) { + t.Run("compressed", func(t *testing.T) { + comp := compression.Config{Enabled: true} + require.NoError(t, comp.Init()) + + fst := setupFSTree(t) + fst.SetCompressor(&comp) + + testReadPayloadRange(t, fst) + }) + + testReadPayloadRange(t, setupFSTree(t)) +} + func testGetRangeStream(t *testing.T, fst *FSTree) { + testGetRangeStreamFunc(t, fst, func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error) { + _, stream, err := fst.GetRangeStream(addr, off, ln) + return stream, err + }) +} + +func testReadPayloadRange(t *testing.T, fst *FSTree) { + testGetRangeStreamFunc(t, fst, func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error) { + return fst.ReadPayloadRange(addr, off, ln, make([]byte, 40<<10)) + }) +} + +func testGetRangeStreamFunc(t *testing.T, fst *FSTree, fn func(fst *FSTree, addr oid.Address, off, ln uint64) (io.ReadCloser, error)) { const pldLen = 1024 pld := testutil.RandByteSlice(pldLen) @@ -51,14 +78,14 @@ func testGetRangeStream(t *testing.T, fst *FSTree) { addr := obj.Address() - _, err := fst.GetRangeStream(addr, 0, 0) + _, err := fn(fst, addr, 0, 0) require.ErrorIs(t, err, apistatus.ErrObjectNotFound) - _, err = fst.GetRangeStream(addr, 1, pldLen-1) + _, err = fn(fst, addr, 1, pldLen-1) require.ErrorIs(t, err, apistatus.ErrObjectNotFound) require.NoError(t, fst.Put(addr, 
obj.Marshal())) - _, err = fst.GetRangeStream(addr, 1, 0) + _, err = fn(fst, addr, 1, 0) require.EqualError(t, err, "invalid range off=1,ln=0") for _, tc := range []struct{ off, ln uint64 }{ @@ -67,7 +94,7 @@ func testGetRangeStream(t *testing.T, fst *FSTree) { {off: 1, ln: pldLen - 1}, {off: pldLen - 1, ln: 1}, } { - stream, err := fst.GetRangeStream(addr, tc.off, tc.ln) + stream, err := fn(fst, addr, tc.off, tc.ln) require.NoError(t, err, tc) if tc.off == 0 && tc.ln == 0 { @@ -84,15 +111,15 @@ func testGetRangeStream(t *testing.T, fst *FSTree) { {off: 1, ln: pldLen}, {off: pldLen - 1, ln: 2}, } { - _, err := fst.GetRangeStream(addr, tc.off, tc.ln) + _, err := fn(fst, addr, tc.off, tc.ln) require.ErrorIs(t, err, apistatus.ErrObjectOutOfRange) } require.NoError(t, fst.Delete(addr)) - _, err = fst.GetRangeStream(addr, 0, 0) + _, err = fn(fst, addr, 0, 0) require.ErrorIs(t, err, apistatus.ErrObjectNotFound) - _, err = fst.GetRangeStream(addr, 1, pldLen-1) + _, err = fn(fst, addr, 1, pldLen-1) require.ErrorIs(t, err, apistatus.ErrObjectNotFound) } diff --git a/pkg/local_object_storage/blobstor/fstree/head.go b/pkg/local_object_storage/blobstor/fstree/head.go index b648fce79e..1ec7b215e1 100644 --- a/pkg/local_object_storage/blobstor/fstree/head.go +++ b/pkg/local_object_storage/blobstor/fstree/head.go @@ -46,18 +46,9 @@ func (t *FSTree) ReadHeader(addr oid.Address, buf []byte) (int, error) { return n, nil } -// ReadObject reads first bytes of the referenced object's binary containing its -// full header from t into buf. Returns number of bytes read and stream of -// remaining bytes. The stream must be finally closed by the caller. -// -// Read part may include payload prefix. -// -// If object is missing, ReadObject returns [apistatus.ErrObjectNotFound]. -// -// Passed buf must have 2*[objectwire.NonPayloadFieldsBufferLength] bytes len at least. 
-func (t *FSTree) ReadObject(addr oid.Address, buf []byte) (int, io.ReadCloser, error) { +func (t *FSTree) _readObject(addr oid.Address, buf []byte) ([]byte, io.ReadCloser, error) { if len(buf) < 2*objectwire.NonPayloadFieldsBufferLength { - return 0, nil, fmt.Errorf("too short buffer %d bytes", len(buf)) + return nil, nil, fmt.Errorf("too short buffer %d bytes", len(buf)) } p := t.treePath(addr) @@ -65,18 +56,31 @@ func (t *FSTree) ReadObject(addr oid.Address, buf []byte) (int, io.ReadCloser, e f, err := os.Open(p) if err != nil { if errors.Is(err, fs.ErrNotExist) { - return 0, nil, logicerr.Wrap(apistatus.ErrObjectNotFound) + return nil, nil, logicerr.Wrap(apistatus.ErrObjectNotFound) } - return 0, nil, fmt.Errorf("read file %q: %w", p, err) + return nil, nil, fmt.Errorf("read file %q: %w", p, err) } initial, stream, err := t.readHeader(addr.Object(), f, buf) if err != nil { stream.Close() - return 0, nil, err + return nil, nil, err } - initial, stream, err = t.preprocessStreamHead(stream, initial) + return t.preprocessStreamHead(stream, initial) +} + +// ReadObject reads first bytes of the referenced object's binary containing its +// full header from t into buf. Returns number of bytes read and stream of +// remaining bytes. The stream must be finally closed by the caller. +// +// Read part may include payload prefix. +// +// If object is missing, ReadObject returns [apistatus.ErrObjectNotFound]. +// +// Passed buf must have 2*[objectwire.NonPayloadFieldsBufferLength] bytes len at least. 
// verifyRequestedRange rejects ranges with zero length and non-zero offset.
// Any other combination, including the all-zero one, is accepted.
func verifyRequestedRange(off, ln uint64) error {
	if off == 0 || ln != 0 {
		return nil
	}
	return fmt.Errorf("invalid range off=%d,ln=0", off)
}

// checkPayloadBounds reports whether range [off, off+ln) starts inside the
// payload of pldLen bytes and does not run past its end. The subtraction form
// keeps the check free of uint64 overflow.
func checkPayloadBounds(pldLen, off, ln uint64) bool {
	if off >= pldLen {
		return false
	}
	return ln <= pldLen-off
}

// checkTooBigRange returns an error if off or ln cannot be represented as
// int64.
func checkTooBigRange(off, ln uint64) error {
	const limit = math.MaxInt64 // 8 exabytes, amply
	if off <= limit && ln <= limit {
		return nil
	}
	return fmt.Errorf("range overflowing int64 is not supported by this server: off=%d,len=%d", off, ln)
}
stream, err := s.GetRangeStream(objects[0].addr, start, stop-start) + pldLen, stream, err := s.GetRangeStream(objects[0].addr, start, stop-start) require.NoError(t, err) + require.EqualValues(t, objects[0].obj.PayloadSize(), pldLen) require.NoError(t, iotest.TestReader(stream, payload[start:stop])) }) t.Run("offset > len(payload)", func(t *testing.T) { - _, err := s.GetRangeStream(objects[0].addr, uint64(len(payload)+10), 10) + _, _, err := s.GetRangeStream(objects[0].addr, uint64(len(payload)+10), 10) require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange)) }) t.Run("offset + length > len(payload)", func(t *testing.T) { - _, err := s.GetRangeStream(objects[0].addr, 10, uint64(len(payload))) + _, _, err := s.GetRangeStream(objects[0].addr, 10, uint64(len(payload))) require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange)) }) t.Run("length is negative when converted to int64", func(t *testing.T) { - _, err := s.GetRangeStream(objects[0].addr, 0, 1<<63) + _, _, err := s.GetRangeStream(objects[0].addr, 0, 1<<63) require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange)) }) t.Run("offset + length overflow uint64", func(t *testing.T) { - _, err := s.GetRangeStream(objects[0].addr, 10, math.MaxUint64-2) + _, _, err := s.GetRangeStream(objects[0].addr, 10, math.MaxUint64-2) require.ErrorAs(t, err, new(apistatus.ObjectOutOfRange)) }) t.Run("zero range", func(t *testing.T) { for i := range objects { - stream, err := s.GetRangeStream(objects[i].addr, 0, 0) + pldLen, stream, err := s.GetRangeStream(objects[i].addr, 0, 0) require.NoError(t, err) + require.EqualValues(t, objects[i].obj.PayloadSize(), pldLen) pld := objects[i].obj.Payload() require.NoError(t, iotest.TestReader(stream, pld)) require.NoError(t, stream.Close()) diff --git a/pkg/local_object_storage/engine/ec.go b/pkg/local_object_storage/engine/ec.go index a03d66aa70..d870771091 100644 --- a/pkg/local_object_storage/engine/ec.go +++ b/pkg/local_object_storage/engine/ec.go @@ -2,9 +2,8 @@ package engine import ( 
"errors" - "fmt" "io" - "math" + "time" iec "github.com/nspcc-dev/neofs-node/internal/ec" ierrors "github.com/nspcc-dev/neofs-node/internal/errors" @@ -144,6 +143,21 @@ loop: return apistatus.ErrObjectNotFound } +// ReadECPartRange is a buffered alternative for [StorageEngine.GetECPartRange] +// similar to [StorageEngine.ReadECPart]. +func (e *StorageEngine) ReadECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64, buf []byte) (io.ReadCloser, error) { + var stream io.ReadCloser + return stream, e.getECPartRangeFunc(cnr, parent, pi, off, ln, MetricRegister.AddReadECPartRangeDuration, func(s shardInterface, cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64) error { + var err error + stream, err = s.ReadECPartRange(cnr, parent, pi, off, ln, buf) + return err + }, func(s shardInterface, cnr cid.ID, partID oid.ID, off, ln uint64) error { + var err error + stream, err = s.ReadRange(cnr, partID, off, ln, buf) + return err + }) +} + // GetECPartRange looks up for object that carries EC part produced within cnr // for parent object and indexed by pi in the underlying shards, checks its // availability and, if available, reads it. Returns full payload len. If zero, @@ -170,18 +184,31 @@ loop: // // Range bounds are limited by int64. 
func (e *StorageEngine) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64) (uint64, io.ReadCloser, error) { - if off > math.MaxInt64 || ln > math.MaxInt64 { // 8 exabytes, amply - return 0, nil, fmt.Errorf("range overflowing int64 is not supported by this server: off=%d,len=%d", off, ln) - } + var pldLen uint64 + var stream io.ReadCloser + return pldLen, stream, e.getECPartRangeFunc(cnr, parent, pi, off, ln, MetricRegister.AddGetECPartRangeDuration, func(s shardInterface, cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64) error { + var err error + pldLen, stream, err = s.GetECPartRange(cnr, parent, pi, off, ln) + return err + }, func(s shardInterface, cnr cid.ID, partID oid.ID, off, ln uint64) error { + var err error + pldLen, stream, err = s.GetRangeStream(cnr, partID, off, ln) + return err + }) +} +func (e *StorageEngine) getECPartRangeFunc(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64, metricFn func(MetricRegister, time.Duration), + resolveFn func(shardInterface, cid.ID, oid.ID, iec.PartInfo, uint64, uint64) error, + rangeFn func(shardInterface, cid.ID, oid.ID, uint64, uint64) error, +) error { if e.metrics != nil { - defer elapsed(e.metrics.AddGetECPartRangeDuration)() + defer elapsed(func(d time.Duration) { metricFn(e.metrics, d) })() } e.blockMtx.RLock() defer e.blockMtx.RUnlock() if e.blockErr != nil { - return 0, nil, e.blockErr + return e.blockErr } s := e.sortShardsFn(e, parent) @@ -189,14 +216,14 @@ func (e *StorageEngine) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInf var partID oid.ID loop: for i := range s { - pldLen, rc, err := s[i].shardIface.GetECPartRange(cnr, parent, pi, int64(off), int64(ln)) + err := resolveFn(s[i].shardIface, cnr, parent, pi, off, ln) switch { case err == nil: - return pldLen, rc, nil + return nil case errors.Is(err, apistatus.ErrObjectAlreadyRemoved), errors.Is(err, apistatus.ErrObjectOutOfRange), errors.As(err, new(*object.SplitInfoError)): - return 0, nil, err + return 
err case errors.Is(err, meta.ErrObjectIsExpired): - return 0, nil, apistatus.ErrObjectNotFound + return apistatus.ErrObjectNotFound case errors.As(err, (*ierrors.ObjectID)(&partID)): if partID.IsZero() { panic("zero object ID returned as error") @@ -218,17 +245,17 @@ loop: } if partID.IsZero() { - return 0, nil, apistatus.ErrObjectNotFound + return apistatus.ErrObjectNotFound } for i := range s { // get an object bypassing the metabase. We can miss deletion or expiration mark. GetECPart behaves like this, so here too. - pldLen, rc, err := s[i].shardIface.GetRangeStream(cnr, partID, int64(off), int64(ln)) + err := rangeFn(s[i].shardIface, cnr, partID, off, ln) switch { case err == nil: - return pldLen, rc, nil + return nil case errors.Is(err, apistatus.ErrObjectOutOfRange): - return 0, nil, err + return err case errors.Is(err, apistatus.ErrObjectNotFound): default: e.log.Info("failed to RANGE EC part in shard bypassing metabase, ignore error", @@ -238,7 +265,7 @@ loop: } } - return 0, nil, apistatus.ErrObjectNotFound + return apistatus.ErrObjectNotFound } // ReadECPartHeader is a buffered alternative for [StorageEngine.HeadECPart] diff --git a/pkg/local_object_storage/engine/ec_test.go b/pkg/local_object_storage/engine/ec_test.go index 4a475c9a9a..22e575cfb6 100644 --- a/pkg/local_object_storage/engine/ec_test.go +++ b/pkg/local_object_storage/engine/ec_test.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "io" - "math" "slices" "strconv" "testing" @@ -525,15 +524,6 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { Index: 456, } - t.Run("too big bounds", func(t *testing.T) { - s := newEngineWithFixedShardOrder([]shardInterface{unimplementedShard{}}) // to ensure shards are not accessed - - _, _, err := s.GetECPartRange(cnr, parentID, pi, math.MaxInt64+1, 1) - require.EqualError(t, err, "range overflowing int64 is not supported by this server: off=9223372036854775808,len=1") - _, _, err = s.GetECPartRange(cnr, parentID, pi, 1, math.MaxInt64+1) - 
require.EqualError(t, err, "range overflowing int64 is not supported by this server: off=1,len=9223372036854775808") - }) - t.Run("blocked", func(t *testing.T) { s := newEngineWithFixedShardOrder([]shardInterface{unimplementedShard{}}) // to ensure shards are not accessed @@ -556,7 +546,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { partID := partObj.GetID() - partShardKey := getECPartRangeKey{cnr: cnr, parent: parentID, pi: pi, off: int64(off), ln: int64(ln)} + partShardKey := getECPartRangeKey{cnr: cnr, parent: parentID, pi: pi, off: off, ln: ln} shardOK := &mockShard{ getECPartRange: map[getECPartRangeKey]getECPartRangeValue{ @@ -620,7 +610,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { partShardKey: {err: apistatus.ErrObjectNotFound}, }, getRangeStream: map[getRangeStreamKey]getRangeStreamValue{ - {cnr: cnr, id: partID, off: int64(off), ln: int64(ln)}: {err: apistatus.ErrObjectNotFound}, + {cnr: cnr, id: partID, off: off, ln: ln}: {err: apistatus.ErrObjectNotFound}, }, } @@ -787,7 +777,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { }, }, &mockShard{ getRangeStream: map[getRangeStreamKey]getRangeStreamValue{ - {cnr: cnr, id: partID, off: int64(off), ln: int64(ln)}: {obj: partObj}, + {cnr: cnr, id: partID, off: off, ln: ln}: {obj: partObj}, }, }}) s.log = l @@ -848,7 +838,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { }, &mockShard{ i: 2, getRangeStream: map[getRangeStreamKey]getRangeStreamValue{ - {cnr: cnr, id: partID, off: int64(off), ln: int64(ln)}: {err: errors.New("some shard error")}, + {cnr: cnr, id: partID, off: off, ln: ln}: {err: errors.New("some shard error")}, }, }}) s.log = l diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index adab654e03..ac1b67da8f 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -45,10 +45,12 @@ type shardInterface interface { ID() common.ID GetStream(oid.Address, bool) 
(*object.Object, io.ReadCloser, error) ReadObject(oid.Address, bool, []byte) (int, io.ReadCloser, error) - GetRangeStream(cnr cid.ID, id oid.ID, off, ln int64) (uint64, io.ReadCloser, error) + GetRangeStream(cnr cid.ID, id oid.ID, off, ln uint64) (uint64, io.ReadCloser, error) + ReadRange(cnr cid.ID, id oid.ID, off, ln uint64, buf []byte) (io.ReadCloser, error) GetECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, io.ReadCloser, error) ReadECPart(cid.ID, oid.ID, iec.PartInfo, []byte) (int, io.ReadCloser, error) - GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln int64) (uint64, io.ReadCloser, error) + GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64) (uint64, io.ReadCloser, error) + ReadECPartRange(cid.ID, oid.ID, iec.PartInfo, uint64, uint64, []byte) (io.ReadCloser, error) Head(oid.Address, bool) (*object.Object, error) ReadHeader(oid.Address, bool, []byte) (int, error) HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error) diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index fbf8945a0a..e9c7f18493 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -205,7 +205,7 @@ func (unimplementedShard) ReadObject(oid.Address, bool, []byte) (int, io.ReadClo panic("unimplemented") } -func (unimplementedShard) GetRangeStream(cid.ID, oid.ID, int64, int64) (uint64, io.ReadCloser, error) { +func (unimplementedShard) GetRangeStream(cid.ID, oid.ID, uint64, uint64) (uint64, io.ReadCloser, error) { panic("unimplemented") } @@ -217,7 +217,7 @@ func (unimplementedShard) ReadECPart(cid.ID, oid.ID, iec.PartInfo, []byte) (int, panic("unimplemented") } -func (unimplementedShard) GetECPartRange(cid.ID, oid.ID, iec.PartInfo, int64, int64) (uint64, io.ReadCloser, error) { +func (unimplementedShard) GetECPartRange(cid.ID, oid.ID, iec.PartInfo, uint64, uint64) (uint64, io.ReadCloser, error) { panic("unimplemented") } @@ 
-237,6 +237,14 @@ func (unimplementedShard) ReadECPartHeader(cid.ID, oid.ID, iec.PartInfo, []byte) panic("unimplemented") } +func (unimplementedShard) ReadRange(cid.ID, oid.ID, uint64, uint64, []byte) (io.ReadCloser, error) { + panic("unimplemented") +} + +func (unimplementedShard) ReadECPartRange(cid.ID, oid.ID, iec.PartInfo, uint64, uint64, []byte) (io.ReadCloser, error) { + panic("unimplemented") +} + type getECPartKey struct { cnr cid.ID parent oid.ID @@ -251,8 +259,8 @@ type getECPartValue struct { type getRangeStreamKey struct { cnr cid.ID id oid.ID - off int64 - ln int64 + off uint64 + ln uint64 } type getRangeStreamValue struct { @@ -264,8 +272,8 @@ type getECPartRangeKey struct { cnr cid.ID parent oid.ID pi iec.PartInfo - off int64 - ln int64 + off uint64 + ln uint64 } type getECPartRangeValue struct { @@ -298,6 +306,7 @@ type headECPartKey = getECPartKey type headECPartValue = headValue type mockShard struct { + unimplementedShard i int eCPartSleep time.Duration getECPart map[getECPartKey]getECPartValue @@ -340,7 +349,7 @@ func (x *mockShard) ReadObject(addr oid.Address, skipMeta bool, buf []byte) (int return copy(buf, val.obj.CutPayload().Marshal()), io.NopCloser(bytes.NewReader(payloadFld)), val.err } -func (x *mockShard) GetRangeStream(cnr cid.ID, id oid.ID, off, ln int64) (uint64, io.ReadCloser, error) { +func (x *mockShard) GetRangeStream(cnr cid.ID, id oid.ID, off, ln uint64) (uint64, io.ReadCloser, error) { val, ok := x.getRangeStream[getRangeStreamKey{ cnr: cnr, id: id, @@ -394,7 +403,7 @@ func (x *mockShard) ReadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo, buf [ return copy(buf, val.obj.CutPayload().Marshal()), io.NopCloser(bytes.NewReader(payloadFld)), val.err } -func (x *mockShard) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln int64) (uint64, io.ReadCloser, error) { +func (x *mockShard) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64) (uint64, io.ReadCloser, error) { time.Sleep(x.eCPartSleep) 
val, ok := x.getECPartRange[getECPartRangeKey{ cnr: cnr, @@ -554,6 +563,14 @@ func (x unimplementedMetrics) AddHeadECPartDuration(time.Duration) { panic("unimplemented") } +func (x unimplementedMetrics) AddReadPayloadRangeDuration(time.Duration) { + panic("unimplemented") +} + +func (x unimplementedMetrics) AddReadECPartRangeDuration(time.Duration) { + panic("unimplemented") +} + func (x unimplementedMetrics) SetObjectCounter(string, string, uint64) { panic("unimplemented") } diff --git a/pkg/local_object_storage/engine/get.go b/pkg/local_object_storage/engine/get.go index 82af1bb5ff..f8c76d8008 100644 --- a/pkg/local_object_storage/engine/get.go +++ b/pkg/local_object_storage/engine/get.go @@ -279,3 +279,29 @@ func (e *StorageEngine) ReadObject(addr oid.Address, buf []byte) (int, io.ReadCl return err }) } + +// ReadPayloadRange is [StorageEngine.ReadObject] analogue for payload range +// reading. Zero range means full payload. +// +// If given range is out of payload bounds, ReadPayloadRange returns +// [apistatus.ErrObjectOutOfRange]. 
+func (e *StorageEngine) ReadPayloadRange(addr oid.Address, off, ln uint64, hdrBuf []byte) (io.ReadCloser, error) { + if e.metrics != nil { + defer elapsed(e.metrics.AddReadPayloadRangeDuration)() + } + + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return nil, e.blockErr + } + + var stream io.ReadCloser + + return stream, e.get(addr, func(s *shard.Shard, ignoreMetadata bool) error { + var err error + stream, err = s.ReadPayloadRange(addr, off, ln, ignoreMetadata, hdrBuf) + return err + }) +} diff --git a/pkg/local_object_storage/engine/metrics.go b/pkg/local_object_storage/engine/metrics.go index 456aed08a3..d853f87b09 100644 --- a/pkg/local_object_storage/engine/metrics.go +++ b/pkg/local_object_storage/engine/metrics.go @@ -14,6 +14,7 @@ type MetricRegister interface { AddHeadDuration(d time.Duration) AddReadHeaderDuration(d time.Duration) AddReadObjectDuration(d time.Duration) + AddReadPayloadRangeDuration(d time.Duration) AddGetStreamDuration(d time.Duration) AddGetRangeStreamDuration(d time.Duration) AddInhumeDuration(d time.Duration) @@ -26,6 +27,7 @@ type MetricRegister interface { AddGetECPartRangeDuration(d time.Duration) AddHeadECPartDuration(d time.Duration) AddReadECPartHeaderDuration(d time.Duration) + AddReadECPartRangeDuration(d time.Duration) SetObjectCounter(shardID, objectType string, v uint64) AddToObjectCounter(shardID, objectType string, delta int) diff --git a/pkg/local_object_storage/shard/ec.go b/pkg/local_object_storage/shard/ec.go index 6047de6bc5..c9f43bf991 100644 --- a/pkg/local_object_storage/shard/ec.go +++ b/pkg/local_object_storage/shard/ec.go @@ -108,6 +108,21 @@ func (s *Shard) getECPartFunc(cnr cid.ID, parent oid.ID, pi iec.PartInfo, writeC return nil } +// ReadECPartRange is a buffered alternative for [Shard.GetECPartRange] +// similar to [Shard.ReadHeader]. 
+func (s *Shard) ReadECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64, buf []byte) (io.ReadCloser, error) { + var stream io.ReadCloser + return stream, s.getECPartRangeFunc(cnr, parent, pi, off, ln, func(writeCache writecache.Cache, addr oid.Address, off, ln uint64) error { + var err error + stream, err = writeCache.ReadPayloadRange(addr, off, ln, buf) + return err + }, func(blobStorage common.Storage, addr oid.Address, off, ln uint64) error { + var err error + stream, err = blobStorage.ReadPayloadRange(addr, off, ln, buf) + return err + }) +} + // GetECPartRange looks up for object that carries EC part produced within cnr // for parent object and indexed by pi in the underlying metabase, checks its // availability and reads it from the underlying BLOB storage. Returns full @@ -134,11 +149,28 @@ func (s *Shard) getECPartFunc(cnr cid.ID, parent oid.ID, pi iec.PartInfo, writeC // // If the range is out of payload bounds, GetECPartRange returns // [apistatus.ErrObjectOutOfRange]. 
-func (s *Shard) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln int64) (uint64, io.ReadCloser, error) { +func (s *Shard) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64) (uint64, io.ReadCloser, error) { + var pldLen uint64 + var stream io.ReadCloser + return pldLen, stream, s.getECPartRangeFunc(cnr, parent, pi, off, ln, func(writeCache writecache.Cache, addr oid.Address, off, ln uint64) error { + var err error + pldLen, stream, err = writeCache.GetRangeStream(addr, off, ln) + return err + }, func(blobStorage common.Storage, addr oid.Address, off, ln uint64) error { + var err error + pldLen, stream, err = blobStorage.GetRangeStream(addr, off, ln) + return err + }) +} + +func (s *Shard) getECPartRangeFunc(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64, + writeCacheFn func(writecache.Cache, oid.Address, uint64, uint64) error, + blobStorageFn func(common.Storage, oid.Address, uint64, uint64) error, +) error { partID, pldLen, err := s.metaBaseIface.ResolveECPartWithPayloadLen(cnr, parent, pi) if err != nil { if !errors.As(err, (*ierrors.ObjectID)(&partID)) { - return 0, nil, fmt.Errorf("resolve part ID and payload len in metabase: %w", err) + return fmt.Errorf("resolve part ID and payload len in metabase: %w", err) } s.log.Warn("EC part ID returned from metabase with error", @@ -146,19 +178,19 @@ func (s *Shard) GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, } else { if ln == 0 && off == 0 { if pldLen == 0 { - return 0, nil, nil + return nil } - } else if uint64(off) >= pldLen || pldLen-uint64(off) < uint64(ln) { - return 0, nil, apistatus.ErrObjectOutOfRange + } else if off >= pldLen || pldLen-off < ln { + return apistatus.ErrObjectOutOfRange } } - pldLen, rc, err := s.GetRangeStream(cnr, partID, off, ln) + err = s.getRangeStreamFunc(cnr, partID, off, ln, writeCacheFn, blobStorageFn) if err != nil { - return 0, nil, fmt.Errorf("get range by ID %w: %w", ierrors.ObjectID(partID), err) + return 
fmt.Errorf("get range by ID %w: %w", ierrors.ObjectID(partID), err) } - return pldLen, rc, nil + return nil } // ReadECPartHeader is a buffered alternative for [Shard.HeadECPart] diff --git a/pkg/local_object_storage/shard/ec_test.go b/pkg/local_object_storage/shard/ec_test.go index fca950b62e..c451fc0709 100644 --- a/pkg/local_object_storage/shard/ec_test.go +++ b/pkg/local_object_storage/shard/ec_test.go @@ -327,8 +327,8 @@ func TestShard_GetECPartRange(t *testing.T) { }, } bs := mockBLOBStore{ - getStream: map[oid.Address]getStreamValue{ - partAddr: {obj: partObj}, + getRangeStream: map[oid.Address]getRangeStreamValue{ + partAddr: {pld: partObj.Payload()}, }, } @@ -379,7 +379,7 @@ func TestShard_GetECPartRange(t *testing.T) { s := newSimpleTestShard(t, &bs, &mb, nil) s.log = l - off, ln := int64(partLen/3), int64(partLen/2) + off, ln := partLen/3, partLen/2 gotLen, rdr, err := s.GetECPartRange(cnr, parentID, pi, off, ln) require.NoError(t, err) assertGetECPartRangeOK(t, partObj, off, ln, gotLen, rdr) @@ -409,7 +409,7 @@ func TestShard_GetECPartRange(t *testing.T) { id: partID, ln: 0, } - _, _, err := s.GetECPartRange(cnr, parentID, pi, int64(rng[0]), int64(rng[1])) + _, _, err := s.GetECPartRange(cnr, parentID, pi, rng[0], rng[1]) require.ErrorIs(t, err, apistatus.ErrObjectOutOfRange, rng) } }) @@ -425,7 +425,7 @@ func TestShard_GetECPartRange(t *testing.T) { } { t.Run("BLOB storage/"+tc.name, func(t *testing.T) { bs := mockBLOBStore{ - getStream: map[oid.Address]getStreamValue{ + getRangeStream: map[oid.Address]getRangeStreamValue{ partAddr: {err: tc.err}, }, } @@ -462,7 +462,7 @@ func TestShard_GetECPartRange(t *testing.T) { l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) wc := mockWriteCache{ - getStream: map[oid.Address]getStreamValue{ + getRangeStream: map[oid.Address]getRangeStreamValue{ oid.NewAddress(cnr, partID): {err: tc.err}, }, } @@ -470,7 +470,7 @@ func TestShard_GetECPartRange(t *testing.T) { s := newSimpleTestShard(t, &bs, &mb, &wc) s.log = 
l - off, ln := int64(partLen/3), int64(partLen/2) + off, ln := partLen/3, partLen/2 gotLen, rdr, err := s.GetECPartRange(cnr, parentID, pi, off, ln) require.NoError(t, err) assertGetECPartRangeOK(t, partObj, off, ln, gotLen, rdr) @@ -480,8 +480,8 @@ func TestShard_GetECPartRange(t *testing.T) { } wc := mockWriteCache{ - getStream: map[oid.Address]getStreamValue{ - partAddr: {obj: partObj}, + getRangeStream: map[oid.Address]getRangeStreamValue{ + partAddr: {pld: partObj.Payload()}, }, } @@ -768,8 +768,8 @@ func TestShard_HeadECPart(t *testing.T) { } func testGetECPartRangeStream(t *testing.T, obj object.Object, parent oid.ID, pi iec.PartInfo, s *Shard) { - full := int64(obj.PayloadSize()) - for _, rng := range [][2]int64{ + full := obj.PayloadSize() + for _, rng := range [][2]uint64{ {0, 0}, {0, 1}, {0, full}, @@ -793,7 +793,7 @@ func assertGetECPartOK(t testing.TB, exp, hdr object.Object, rdr io.ReadCloser) require.Equal(t, exp, hdr) } -func assertGetECPartRangeOK(t testing.TB, exp object.Object, off, ln int64, pldLen uint64, rc io.ReadCloser) { +func assertGetECPartRangeOK(t testing.TB, exp object.Object, off, ln uint64, pldLen uint64, rc io.ReadCloser) { require.EqualValues(t, exp.PayloadSize(), pldLen) if pldLen == 0 { @@ -808,7 +808,7 @@ func assertGetECPartRangeOK(t testing.TB, exp object.Object, off, ln int64, pldL require.NoError(t, err) if off == 0 && ln == 0 { - ln = int64(pldLen) + ln = pldLen } require.Len(t, b, int(ln)) diff --git a/pkg/local_object_storage/shard/get.go b/pkg/local_object_storage/shard/get.go index af67029819..9f4111ccb3 100644 --- a/pkg/local_object_storage/shard/get.go +++ b/pkg/local_object_storage/shard/get.go @@ -249,3 +249,35 @@ func (s *Shard) ReadObject(addr oid.Address, skipMeta bool, buf []byte) (int, io return n, stream, err } + +// ReadPayloadRange is [Shard.ReadObject] analogue for payload range reading. +// Zero range means full payload. 
+// +// If given range is out of payload bounds, ReadPayloadRange returns +// [apistatus.ErrObjectOutOfRange]. +func (s *Shard) ReadPayloadRange(addr oid.Address, off, ln uint64, skipMeta bool, buf []byte) (io.ReadCloser, error) { + s.m.RLock() + defer s.m.RUnlock() + + var stream io.ReadCloser + + cb := func(stor common.Storage) error { + var err error + stream, err = stor.ReadPayloadRange(addr, off, ln, buf) + return err + } + + wc := func(c writecache.Cache) error { + var err error + stream, err = c.ReadPayloadRange(addr, off, ln, buf) + return err + } + + skipMeta = skipMeta || s.info.Mode.NoMetabase() + gotMeta, err := s.fetchObjectData(addr, skipMeta, cb, wc) + if err != nil && gotMeta { + err = fmt.Errorf("%w, %w", err, ErrMetaWithNoObject) + } + + return stream, err +} diff --git a/pkg/local_object_storage/shard/range.go b/pkg/local_object_storage/shard/range.go index 65d14d93cf..2eef863e45 100644 --- a/pkg/local_object_storage/shard/range.go +++ b/pkg/local_object_storage/shard/range.go @@ -25,19 +25,44 @@ import ( // // If the range is out of payload bounds, GetRangeStream returns // [apistatus.ErrObjectOutOfRange]. 
-func (s *Shard) GetRangeStream(cnr cid.ID, id oid.ID, off, ln int64) (uint64, io.ReadCloser, error) { - if off < 0 || ln < 0 { - return 0, nil, fmt.Errorf("invalid range: off=%d,len=%d", off, ln) - } +func (s *Shard) GetRangeStream(cnr cid.ID, id oid.ID, off, ln uint64) (uint64, io.ReadCloser, error) { + var pldLen uint64 + var stream io.ReadCloser + return pldLen, stream, s.getRangeStreamFunc(cnr, id, off, ln, func(writeCache writecache.Cache, addr oid.Address, off, ln uint64) error { + var err error + pldLen, stream, err = writeCache.GetRangeStream(addr, off, ln) + return err + }, func(blobStorage common.Storage, addr oid.Address, off, ln uint64) error { + var err error + pldLen, stream, err = blobStorage.GetRangeStream(addr, off, ln) + return err + }) +} + +// ReadRange is a buffered alternative for [Shard.GetRangeStream] +// similar to [Shard.ReadHeader]. +func (s *Shard) ReadRange(cnr cid.ID, id oid.ID, off, ln uint64, buf []byte) (io.ReadCloser, error) { + var stream io.ReadCloser + return stream, s.getRangeStreamFunc(cnr, id, off, ln, func(writeCache writecache.Cache, addr oid.Address, off, ln uint64) error { + var err error + stream, err = writeCache.ReadPayloadRange(addr, off, ln, buf) + return err + }, func(blobStorage common.Storage, addr oid.Address, off, ln uint64) error { + var err error + stream, err = blobStorage.ReadPayloadRange(addr, off, ln, buf) + return err + }) +} +func (s *Shard) getRangeStreamFunc(cnr cid.ID, id oid.ID, off, ln uint64, + writeCacheFn func(writecache.Cache, oid.Address, uint64, uint64) error, + blobStorageFn func(common.Storage, oid.Address, uint64, uint64) error, +) error { addr := oid.NewAddress(cnr, id) if s.hasWriteCache() { - // TODO: support GetRangeStream https://github.com/nspcc-dev/neofs-node/issues/3593 - hdr, rc, err := s.writeCache.GetStream(addr) - if err == nil { - pldLen := hdr.PayloadSize() - rc, err = slicePayloadReader(rc, pldLen, off, ln) - return pldLen, rc, err + err := writeCacheFn(s.writeCache, addr, 
off, ln) + if err == nil || errors.Is(err, apistatus.ErrObjectOutOfRange) { + return err } if errors.Is(err, apistatus.ErrObjectNotFound) { @@ -49,51 +74,12 @@ func (s *Shard) GetRangeStream(cnr cid.ID, id oid.ID, off, ln int64) (uint64, io } } - // TODO: support GetRangeStream https://github.com/nspcc-dev/neofs-node/issues/3593 - hdr, rc, err := s.blobStor.GetStream(oid.NewAddress(cnr, id)) + err := blobStorageFn(s.blobStor, addr, off, ln) if err != nil { - return 0, nil, fmt.Errorf("get from BLOB storage: %w", err) - } - - pldLen := hdr.PayloadSize() - rc, err = slicePayloadReader(rc, pldLen, off, ln) - return pldLen, rc, err -} - -func slicePayloadReader(rc io.ReadCloser, pldLen uint64, off, ln int64) (io.ReadCloser, error) { - if ln == 0 && off == 0 { - if pldLen == 0 { - rc.Close() - return nil, nil - } - return rc, nil - } - - if uint64(off) >= pldLen || pldLen-uint64(off) < uint64(ln) { - rc.Close() - return nil, apistatus.ErrObjectOutOfRange - } - - if off > 0 { - if s, ok := rc.(io.Seeker); ok { - // TODO: Seems like rc implements io.Seeker in all current cases. Consider extension of resulting interface. 
- if _, err := s.Seek(off, io.SeekStart); err != nil { - rc.Close() - return nil, fmt.Errorf("seek offset in payload stream: %w", err) - } - } else if _, err := io.CopyN(io.Discard, rc, off); err != nil { - rc.Close() - return nil, fmt.Errorf("discard first bytes in payload stream: %w", err) - } + return fmt.Errorf("get from BLOB storage: %w", err) } - return struct { - io.Reader - io.Closer - }{ - Reader: io.LimitReader(rc, ln), - Closer: rc, - }, nil + return nil } // GetRangeStreamWithMetadataLookup reads payload range of the referenced object @@ -121,13 +107,13 @@ func (s *Shard) GetRangeStreamWithMetadataLookup(addr oid.Address, off, ln uint6 cb := func(stor common.Storage) error { var err error - stream, err = stor.GetRangeStream(addr, off, ln) + _, stream, err = stor.GetRangeStream(addr, off, ln) return err } wc := func(c writecache.Cache) error { var err error - stream, err = c.GetRangeStream(addr, off, ln) + _, stream, err = c.GetRangeStream(addr, off, ln) return err } diff --git a/pkg/local_object_storage/shard/range_internal_test.go b/pkg/local_object_storage/shard/range_internal_test.go index f8644e8335..c436156135 100644 --- a/pkg/local_object_storage/shard/range_internal_test.go +++ b/pkg/local_object_storage/shard/range_internal_test.go @@ -7,7 +7,6 @@ import ( "io" "math" "testing" - "testing/iotest" "github.com/nspcc-dev/neofs-node/internal/testutil" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" @@ -35,24 +34,11 @@ func TestShard_GetRangeStream(t *testing.T) { objAddr := obj.Address() bs := mockBLOBStore{ - getStream: map[oid.Address]getStreamValue{ - objAddr: {obj: obj}, + getRangeStream: map[oid.Address]getRangeStreamValue{ + objAddr: {pld: obj.Payload()}, }, } - t.Run("invalid ranges", func(t *testing.T) { - s := newSimpleTestShard(t, unimplementedBLOBStore{}, unimplementedMetabase{}, unimplementedWriteCache{}) - - t.Run("negative offset", func(t *testing.T) { - _, _, err := s.GetRangeStream(cnr, id, -1, 2) - require.EqualError(t, 
err, "invalid range: off=-1,len=2") - }) - t.Run("negative len", func(t *testing.T) { - _, _, err := s.GetRangeStream(cnr, id, 2, -1) - require.EqualError(t, err, "invalid range: off=2,len=-1") - }) - }) - t.Run("BLOB storage failures", func(t *testing.T) { for _, tc := range []struct { name string @@ -63,7 +49,7 @@ func TestShard_GetRangeStream(t *testing.T) { } { t.Run(tc.name, func(t *testing.T) { bs := mockBLOBStore{ - getStream: map[oid.Address]getStreamValue{ + getRangeStream: map[oid.Address]getRangeStreamValue{ objAddr: {err: tc.err}, }, } @@ -77,97 +63,25 @@ func TestShard_GetRangeStream(t *testing.T) { } t.Run("range out of bounds", func(t *testing.T) { - var cf closeFlag - bs := mockBLOBStore{ - getStream: map[oid.Address]getStreamValue{ - objAddr: { - obj: obj, - rc: struct { - io.Reader - io.Closer - }{ - Closer: &cf, - }, - }, + getRangeStream: map[oid.Address]getRangeStreamValue{ + objAddr: {pld: obj.Payload()}, }, } s := newSimpleTestShard(t, &bs, unimplementedMetabase{}, nil) - for _, rng := range [][2]int64{ + for _, rng := range [][2]uint64{ {0, payloadLen + 1}, {payloadLen - 1, 2}, {payloadLen, 0}, {payloadLen, 1}, {math.MaxInt64, math.MaxInt64}, } { - cf = false - _, _, err := s.GetRangeStream(cnr, id, rng[0], rng[1]) require.ErrorIs(t, err, apistatus.ErrObjectOutOfRange, rng) - - require.True(t, bool(cf)) } }) - - t.Run("skip first bytes", func(t *testing.T) { - t.Run("seek", func(t *testing.T) { - var cf closeFlag - seekErr := errors.New("seek error") - - bs := mockBLOBStore{ - getStream: map[oid.Address]getStreamValue{ - objAddr: { - obj: obj, - rc: errSeeker{ - ReadCloser: struct { - io.Reader - io.Closer - }{ - Closer: &cf, - }, - err: seekErr, - }, - }, - }, - } - - s := newSimpleTestShard(t, &bs, unimplementedMetabase{}, nil) - - _, _, err := s.GetRangeStream(cnr, id, 1, 1) - require.ErrorIs(t, err, seekErr) - require.ErrorContains(t, err, "seek offset in payload stream") - - require.True(t, bool(cf)) - }) - - var cf closeFlag - readErr 
:= errors.New("read error") - - bs := mockBLOBStore{ - getStream: map[oid.Address]getStreamValue{ - objAddr: { - obj: obj, - rc: struct { - io.Reader - io.Closer - }{ - Reader: iotest.ErrReader(readErr), - Closer: &cf, - }, - }, - }, - } - - s := newSimpleTestShard(t, &bs, unimplementedMetabase{}, nil) - - _, _, err := s.GetRangeStream(cnr, id, 1, 1) - require.ErrorIs(t, err, readErr) - require.ErrorContains(t, err, "discard first bytes in payload stream") - - require.True(t, bool(cf)) - }) }) t.Run("writecache", func(t *testing.T) { @@ -190,7 +104,7 @@ func TestShard_GetRangeStream(t *testing.T) { l, lb := testutil.NewBufferedLogger(t, zap.DebugLevel) wc := mockWriteCache{ - getStream: map[oid.Address]getStreamValue{ + getRangeStream: map[oid.Address]getRangeStreamValue{ objAddr: {err: tc.err}, }, } @@ -198,7 +112,7 @@ func TestShard_GetRangeStream(t *testing.T) { s := newSimpleTestShard(t, &bs, &unimplementedMetabase{}, &wc) s.log = l - off, ln := int64(payloadLen/2), int64(payloadLen/2) + off, ln := uint64(payloadLen/2), uint64(payloadLen/2) pldLen, rc, err := s.GetRangeStream(cnr, id, off, ln) require.NoError(t, err) assertGetRangeStreamOK(t, obj, off, ln, pldLen, rc) @@ -208,96 +122,24 @@ func TestShard_GetRangeStream(t *testing.T) { } t.Run("range out of bounds", func(t *testing.T) { - var cf closeFlag - wc := mockWriteCache{ - getStream: map[oid.Address]getStreamValue{ - objAddr: { - obj: obj, - rc: struct { - io.Reader - io.Closer - }{ - Closer: &cf, - }, - }, + getRangeStream: map[oid.Address]getRangeStreamValue{ + objAddr: {pld: obj.Payload()}, }, } s := newSimpleTestShard(t, unimplementedBLOBStore{}, unimplementedMetabase{}, &wc) - for _, rng := range [][2]int64{ + for _, rng := range [][2]uint64{ {0, payloadLen + 1}, {payloadLen - 1, 2}, {payloadLen, 0}, {payloadLen, 1}, {math.MaxInt64, math.MaxInt64}, } { - cf = false - _, _, err := s.GetRangeStream(cnr, id, rng[0], rng[1]) require.ErrorIs(t, err, apistatus.ErrObjectOutOfRange, rng) - - require.True(t, 
bool(cf)) - } - }) - - t.Run("skip first bytes", func(t *testing.T) { - t.Run("seek", func(t *testing.T) { - var cf closeFlag - seekErr := errors.New("seek error") - - wc := mockWriteCache{ - getStream: map[oid.Address]getStreamValue{ - objAddr: { - obj: obj, - rc: errSeeker{ - ReadCloser: struct { - io.Reader - io.Closer - }{ - Closer: &cf, - }, - err: seekErr, - }, - }, - }, - } - - s := newSimpleTestShard(t, unimplementedBLOBStore{}, unimplementedMetabase{}, &wc) - - _, _, err := s.GetRangeStream(cnr, id, 1, 1) - require.ErrorIs(t, err, seekErr) - require.ErrorContains(t, err, "seek offset in payload stream") - - require.True(t, bool(cf)) - }) - - var cf closeFlag - readErr := errors.New("read error") - - wc := mockWriteCache{ - getStream: map[oid.Address]getStreamValue{ - objAddr: { - obj: obj, - rc: struct { - io.Reader - io.Closer - }{ - Reader: iotest.ErrReader(readErr), - Closer: &cf, - }, - }, - }, } - - s := newSimpleTestShard(t, unimplementedBLOBStore{}, unimplementedMetabase{}, &wc) - - _, _, err := s.GetRangeStream(cnr, id, 1, 1) - require.ErrorIs(t, err, readErr) - require.ErrorContains(t, err, "discard first bytes in payload stream") - - require.True(t, bool(cf)) }) }) @@ -306,19 +148,9 @@ func TestShard_GetRangeStream(t *testing.T) { obj.SetPayloadSize(0) obj.SetPayload(nil) - var cf closeFlag - wc := mockWriteCache{ - getStream: map[oid.Address]getStreamValue{ - objAddr: { - obj: obj, - rc: struct { - io.Reader - io.Closer - }{ - Closer: &cf, - }, - }, + getRangeStream: map[oid.Address]getRangeStreamValue{ + objAddr: {pld: obj.Payload()}, }, } @@ -327,21 +159,17 @@ func TestShard_GetRangeStream(t *testing.T) { t.Run("non-zero range", func(t *testing.T) { _, _, err := s.GetRangeStream(cnr, id, 0, 1) require.ErrorIs(t, err, apistatus.ErrObjectOutOfRange) - require.True(t, bool(cf)) - cf = false }) pldLen, rc, err := s.GetRangeStream(cnr, id, 0, 0) require.NoError(t, err) require.Zero(t, pldLen) require.Zero(t, rc) - - require.True(t, bool(cf)) }) wc 
:= mockWriteCache{ - getStream: map[oid.Address]getStreamValue{ - objAddr: {obj: obj}, + getRangeStream: map[oid.Address]getRangeStreamValue{ + objAddr: {pld: obj.Payload()}, }, } @@ -354,19 +182,9 @@ func TestShard_GetRangeStream(t *testing.T) { obj.SetPayloadSize(0) obj.SetPayload(nil) - var cf closeFlag - bs := mockBLOBStore{ - getStream: map[oid.Address]getStreamValue{ - objAddr: { - obj: obj, - rc: struct { - io.Reader - io.Closer - }{ - Closer: &cf, - }, - }, + getRangeStream: map[oid.Address]getRangeStreamValue{ + objAddr: {pld: obj.Payload()}, }, } @@ -375,16 +193,12 @@ func TestShard_GetRangeStream(t *testing.T) { t.Run("non-zero range", func(t *testing.T) { _, _, err := s.GetRangeStream(cnr, id, 0, 1) require.ErrorIs(t, err, apistatus.ErrObjectOutOfRange) - require.True(t, bool(cf)) - cf = false }) pldLen, rc, err := s.GetRangeStream(cnr, id, 0, 0) require.NoError(t, err) require.Zero(t, pldLen) require.Zero(t, rc) - - require.True(t, bool(cf)) }) s := newSimpleTestShard(t, &bs, &unimplementedMetabase{}, nil) @@ -392,8 +206,8 @@ func TestShard_GetRangeStream(t *testing.T) { } func testGetRangeStream(t *testing.T, obj object.Object, s *Shard) { - full := int64(obj.PayloadSize()) - for _, rng := range [][2]int64{ + full := obj.PayloadSize() + for _, rng := range [][2]uint64{ {0, 0}, {0, 1}, {0, full}, @@ -410,14 +224,14 @@ func testGetRangeStream(t *testing.T, obj object.Object, s *Shard) { } } -func assertGetRangeStreamOK(t testing.TB, obj object.Object, off, ln int64, pldLen uint64, rc io.ReadCloser) { +func assertGetRangeStreamOK(t testing.TB, obj object.Object, off, ln uint64, pldLen uint64, rc io.ReadCloser) { require.EqualValues(t, obj.PayloadSize(), pldLen) b, err := io.ReadAll(rc) require.NoError(t, err) if off == 0 && ln == 0 { - ln = int64(pldLen) + ln = pldLen } require.True(t, bytes.Equal(obj.Payload()[off:][:ln], b)) diff --git a/pkg/local_object_storage/shard/shard_internal_test.go b/pkg/local_object_storage/shard/shard_internal_test.go index 
d8113d0488..05a3304aa8 100644 --- a/pkg/local_object_storage/shard/shard_internal_test.go +++ b/pkg/local_object_storage/shard/shard_internal_test.go @@ -3,6 +3,7 @@ package shard import ( "bytes" "errors" + "fmt" "io" "testing" @@ -140,29 +141,29 @@ func (x *mockBLOBStore) ReadHeader(addr oid.Address, buf []byte) (int, error) { return copy(buf, val.hdr.Marshal()), nil } -func (x *mockBLOBStore) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) { +func (x *mockBLOBStore) GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error) { val, ok := x.getRangeStream[addr] if !ok { - return nil, errors.New("[test] unexpected object requested") + return 0, nil, fmt.Errorf("[test] unexpected object requested %s", addr) } if val.err != nil { - return nil, val.err + return 0, nil, val.err } pldLen := uint64(len(val.pld)) if off == 0 && ln == 0 { if pldLen == 0 { - return nil, nil + return 0, nil, nil } - return io.NopCloser(bytes.NewReader(val.pld)), nil + return uint64(len(val.pld)), io.NopCloser(bytes.NewReader(val.pld)), nil } if off >= pldLen || pldLen-off < ln { - return nil, apistatus.ErrObjectOutOfRange + return 0, nil, apistatus.ErrObjectOutOfRange } - return io.NopCloser(bytes.NewReader(val.pld[off:][:ln])), nil + return uint64(len(val.pld)), io.NopCloser(bytes.NewReader(val.pld[off:][:ln])), nil } type mockWriteCache struct { @@ -215,29 +216,29 @@ func (x *mockWriteCache) ReadHeader(addr oid.Address, buf []byte) (int, error) { return copy(buf, val.hdr.Marshal()), nil } -func (x *mockWriteCache) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) { +func (x *mockWriteCache) GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error) { val, ok := x.getRangeStream[addr] if !ok { - return nil, errors.New("[test] unexpected object requested") + return 0, nil, errors.New("[test] unexpected object requested") } if val.err != nil { - return nil, val.err + return 0, 
nil, val.err } pldLen := uint64(len(val.pld)) if off == 0 && ln == 0 { if pldLen == 0 { - return nil, nil + return 0, nil, nil } - return io.NopCloser(bytes.NewReader(val.pld)), nil + return uint64(len(val.pld)), io.NopCloser(bytes.NewReader(val.pld)), nil } if off >= pldLen || pldLen-off < ln { - return nil, apistatus.ErrObjectOutOfRange + return 0, nil, apistatus.ErrObjectOutOfRange } - return io.NopCloser(bytes.NewReader(val.pld[off:][:ln])), nil + return uint64(len(val.pld)), io.NopCloser(bytes.NewReader(val.pld[off:][:ln])), nil } type unimplementedBLOBStore struct{} @@ -282,7 +283,11 @@ func (unimplementedBLOBStore) GetStream(oid.Address) (*object.Object, io.ReadClo panic("unimplemented") } -func (unimplementedBLOBStore) GetRangeStream(oid.Address, uint64, uint64) (io.ReadCloser, error) { +func (unimplementedBLOBStore) GetRangeStream(oid.Address, uint64, uint64) (uint64, io.ReadCloser, error) { + panic("unimplemented") +} + +func (unimplementedBLOBStore) ReadPayloadRange(oid.Address, uint64, uint64, []byte) (io.ReadCloser, error) { panic("unimplemented") } @@ -336,7 +341,11 @@ func (unimplementedWriteCache) GetStream(oid.Address) (*object.Object, io.ReadCl panic("unimplemented") } -func (unimplementedWriteCache) GetRangeStream(oid.Address, uint64, uint64) (io.ReadCloser, error) { +func (unimplementedWriteCache) GetRangeStream(oid.Address, uint64, uint64) (uint64, io.ReadCloser, error) { + panic("unimplemented") +} + +func (unimplementedWriteCache) ReadPayloadRange(oid.Address, uint64, uint64, []byte) (io.ReadCloser, error) { panic("unimplemented") } @@ -401,20 +410,3 @@ func (unimplementedMetabase) ResolveECPart(cid.ID, oid.ID, iec.PartInfo) (oid.ID func (unimplementedMetabase) ResolveECPartWithPayloadLen(cid.ID, oid.ID, iec.PartInfo) (oid.ID, uint64, error) { panic("unimplemented") } - -type closeFlag bool - -func (x *closeFlag) Close() error { - *x = true - return nil -} - -// [iotest.ErrReader] analogue. 
-type errSeeker struct { - io.ReadCloser - err error -} - -func (x errSeeker) Seek(int64, int) (int64, error) { - return 0, x.err -} diff --git a/pkg/local_object_storage/writecache/get.go b/pkg/local_object_storage/writecache/get.go index baacb57099..b5fde83aca 100644 --- a/pkg/local_object_storage/writecache/get.go +++ b/pkg/local_object_storage/writecache/get.go @@ -114,19 +114,32 @@ func (c *cache) GetStream(addr oid.Address) (*object.Object, io.ReadCloser, erro // // If the range is out of payload bounds, GetRangeStream returns // [apistatus.ErrObjectOutOfRange]. -func (c *cache) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) { +func (c *cache) GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error) { if ln == 0 && off != 0 { - return nil, fmt.Errorf("invalid range off=%d,ln=0", off) + return 0, nil, fmt.Errorf("invalid range off=%d,ln=0", off) } if !c.objCounters.HasAddress(addr) { - return nil, logicerr.Wrap(apistatus.ErrObjectNotFound) + return 0, nil, logicerr.Wrap(apistatus.ErrObjectNotFound) } - stream, err := c.fsTree.GetRangeStream(addr, off, ln) + pldLen, stream, err := c.fsTree.GetRangeStream(addr, off, ln) if err != nil { - return nil, fmt.Errorf("get range stream from underlying FS tree: %w", err) + return 0, nil, fmt.Errorf("get range stream from underlying FS tree: %w", err) } - return stream, nil + return pldLen, stream, nil +} + +// ReadPayloadRange is [Cache.ReadObject] analogue for payload range reading. +// Zero range means full payload. Unlike GetRangeStream, does not return full payload length. +// +// If given range is out of payload bounds, ReadPayloadRange returns +// [apistatus.ErrObjectOutOfRange]. 
+func (c *cache) ReadPayloadRange(addr oid.Address, off, ln uint64, buf []byte) (io.ReadCloser, error) { + if !c.objCounters.HasAddress(addr) { + return nil, apistatus.ErrObjectNotFound + } + + return c.fsTree.ReadPayloadRange(addr, off, ln, buf) } diff --git a/pkg/local_object_storage/writecache/get_test.go b/pkg/local_object_storage/writecache/get_test.go index b085fe124e..cfc4a3fe08 100644 --- a/pkg/local_object_storage/writecache/get_test.go +++ b/pkg/local_object_storage/writecache/get_test.go @@ -79,14 +79,14 @@ func TestCache_GetRangeStream(t *testing.T) { addr := obj.Address() - _, err := c.GetRangeStream(addr, 0, 0) + _, _, err := c.GetRangeStream(addr, 0, 0) require.ErrorIs(t, err, apistatus.ErrObjectNotFound) - _, err = c.GetRangeStream(addr, 1, pldLen-1) + _, _, err = c.GetRangeStream(addr, 1, pldLen-1) require.ErrorIs(t, err, apistatus.ErrObjectNotFound) require.NoError(t, c.Put(addr, &obj, obj.Marshal())) - _, err = c.GetRangeStream(addr, 1, 0) + _, _, err = c.GetRangeStream(addr, 1, 0) require.EqualError(t, err, "invalid range off=1,ln=0") for _, tc := range []struct{ off, ln uint64 }{ @@ -95,8 +95,9 @@ func TestCache_GetRangeStream(t *testing.T) { {off: 1, ln: pldLen - 1}, {off: pldLen - 1, ln: 1}, } { - stream, err := c.GetRangeStream(addr, tc.off, tc.ln) + gotPldLen, stream, err := c.GetRangeStream(addr, tc.off, tc.ln) require.NoError(t, err, tc) + require.EqualValues(t, pldLen, gotPldLen) b, err := io.ReadAll(stream) require.NoError(t, err) @@ -115,15 +116,15 @@ func TestCache_GetRangeStream(t *testing.T) { {off: 1, ln: pldLen}, {off: pldLen - 1, ln: 2}, } { - _, err := c.GetRangeStream(addr, tc.off, tc.ln) + _, _, err := c.GetRangeStream(addr, tc.off, tc.ln) require.ErrorIs(t, err, apistatus.ErrObjectOutOfRange) } require.NoError(t, c.Delete(addr)) - _, err = c.GetRangeStream(addr, 0, 0) + _, _, err = c.GetRangeStream(addr, 0, 0) require.ErrorIs(t, err, apistatus.ErrObjectNotFound) - _, err = c.GetRangeStream(addr, 1, pldLen-1) + _, _, err = 
c.GetRangeStream(addr, 1, pldLen-1) require.ErrorIs(t, err, apistatus.ErrObjectNotFound) } diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 2b6c5fa24c..2db798db8a 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -28,10 +28,11 @@ type Cache interface { GetBytes(oid.Address) ([]byte, error) // GetStream returns an object and a stream to read its payload. GetStream(oid.Address) (*object.Object, io.ReadCloser, error) - GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.ReadCloser, error) + GetRangeStream(addr oid.Address, off uint64, ln uint64) (uint64, io.ReadCloser, error) Head(oid.Address) (*object.Object, error) ReadHeader(oid.Address, []byte) (int, error) ReadObject(oid.Address, []byte) (int, io.ReadCloser, error) + ReadPayloadRange(oid.Address, uint64, uint64, []byte) (io.ReadCloser, error) // Delete removes object referenced by the given oid.Address from the // Cache. Returns any error encountered that prevented the object to be // removed. 
diff --git a/pkg/metrics/engine.go b/pkg/metrics/engine.go index 01d909002c..fb14edd460 100644 --- a/pkg/metrics/engine.go +++ b/pkg/metrics/engine.go @@ -17,6 +17,7 @@ type ( headDuration prometheus.Histogram readHeaderDuration prometheus.Histogram readObjectDuration prometheus.Histogram + readPayloadRangeDuration prometheus.Histogram getStreamDuration prometheus.Histogram getRangeStreamDuration prometheus.Histogram inhumeDuration prometheus.Histogram @@ -29,6 +30,7 @@ type ( getECPartRangeDuration prometheus.Histogram headECPartDuration prometheus.Histogram readECPartHeaderDuration prometheus.Histogram + readECPartRangeDuration prometheus.Histogram containerSize prometheus.GaugeVec payloadSize prometheus.GaugeVec @@ -103,6 +105,13 @@ func newEngineMetrics() engineMetrics { Help: "Engine 'read object' operations handling time", }) + readPayloadRangeDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: storageNodeNameSpace, + Subsystem: engineSubsystem, + Name: "read_payload_range_time", + Help: "Engine 'read payload range' operations handling time", + }) + getStreamDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ Namespace: storageNodeNameSpace, Subsystem: engineSubsystem, @@ -186,6 +195,13 @@ func newEngineMetrics() engineMetrics { Name: "read_ec_part_header_time", Help: "Engine 'read EC part header' operations handling time", }) + + readECPartRangeDuration = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: storageNodeNameSpace, + Subsystem: engineSubsystem, + Name: "read_ec_part_range_time", + Help: "Engine 'read EC part range' operations handling time", + }) ) var ( @@ -221,6 +237,7 @@ func newEngineMetrics() engineMetrics { headDuration: headDuration, readHeaderDuration: readHeaderDuration, readObjectDuration: readObjectDuration, + readPayloadRangeDuration: readPayloadRangeDuration, getStreamDuration: getStreamDuration, getRangeStreamDuration: getRangeStreamDuration, inhumeDuration: inhumeDuration, @@ -233,6 +250,7 
@@ func newEngineMetrics() engineMetrics { getECPartRangeDuration: getECPartRangeDuration, headECPartDuration: headECPartDuration, readECPartHeaderDuration: readECPartHeaderDuration, + readECPartRangeDuration: readECPartRangeDuration, containerSize: *containerSize, payloadSize: *payloadSize, capacitySize: *capacitySize, @@ -249,6 +267,7 @@ func (m engineMetrics) register() { prometheus.MustRegister(m.headDuration) prometheus.MustRegister(m.readHeaderDuration) prometheus.MustRegister(m.readObjectDuration) + prometheus.MustRegister(m.readPayloadRangeDuration) prometheus.MustRegister(m.getStreamDuration) prometheus.MustRegister(m.getRangeStreamDuration) prometheus.MustRegister(m.inhumeDuration) @@ -261,6 +280,7 @@ func (m engineMetrics) register() { prometheus.MustRegister(m.getECPartRangeDuration) prometheus.MustRegister(m.headECPartDuration) prometheus.MustRegister(m.readECPartHeaderDuration) + prometheus.MustRegister(m.readECPartRangeDuration) prometheus.MustRegister(m.containerSize) prometheus.MustRegister(m.payloadSize) prometheus.MustRegister(m.capacitySize) @@ -302,6 +322,10 @@ func (m engineMetrics) AddReadObjectDuration(d time.Duration) { m.readObjectDuration.Observe(d.Seconds()) } +func (m engineMetrics) AddReadPayloadRangeDuration(d time.Duration) { + m.readPayloadRangeDuration.Observe(d.Seconds()) +} + func (m engineMetrics) AddGetStreamDuration(d time.Duration) { m.getStreamDuration.Observe(d.Seconds()) } @@ -350,6 +374,10 @@ func (m engineMetrics) AddReadECPartHeaderDuration(d time.Duration) { m.readECPartHeaderDuration.Observe(d.Seconds()) } +func (m engineMetrics) AddReadECPartRangeDuration(d time.Duration) { + m.readECPartRangeDuration.Observe(d.Seconds()) +} + func (m engineMetrics) AddToContainerSize(cnrID string, size int64) { m.containerSize.With( prometheus.Labels{ diff --git a/pkg/services/object/get.go b/pkg/services/object/get.go index 5f4bf36868..85f3169d4d 100644 --- a/pkg/services/object/get.go +++ b/pkg/services/object/get.go @@ -4,7 +4,6 
@@ import ( "bytes" "context" "crypto/sha256" - "encoding/binary" "errors" "fmt" "io" @@ -13,8 +12,6 @@ import ( "github.com/nspcc-dev/neofs-node/internal/protobuf/protoscan" aclsvc "github.com/nspcc-dev/neofs-node/pkg/services/object/acl/v2" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" - "github.com/nspcc-dev/neofs-sdk-go/object" - oid "github.com/nspcc-dev/neofs-sdk-go/object/id" protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" protorefs "github.com/nspcc-dev/neofs-sdk-go/proto/refs" protostatus "github.com/nspcc-dev/neofs-sdk-go/proto/status" @@ -268,109 +265,10 @@ func (x *getProxyContext) handleChunkResponse(streamProg *getStreamProgress, res } } - remoteSent := respChunkLen == chunkLen - if !remoteSent { - if respChunkLen <= maxGetResponseChunkLen { - localRespBuf, _ := getBufferForChunkGetResponse() - - chunkBuffers.CopyTo(localRespBuf.SliceBuffer[maxChunkOffsetInGetResponse:]) - - bodyf := shiftPayloadChunkInGetResponseBuffer(localRespBuf.SliceBuffer, maxChunkOffsetInGetResponse, respChunkLen) - - if x.respStream.signResponse { - n, err := x.respStream.srv.signResponse(localRespBuf.SliceBuffer[bodyf.To:], localRespBuf.SliceBuffer[bodyf.ValueFrom:bodyf.To], nil) - if err != nil { - return false, fmt.Errorf("sign chunk response: %w", err) - } - bodyf.To += n - } - - localRespBuf.SetBounds(bodyf.From, bodyf.To) - respBuf = mem.BufferSlice{localRespBuf} - } else { - // TODO: in this case we could make respBuf = mem.BufferSlice{prefix, chunkBuffers}, - // but then we'd have to provide mem.Buffer from iprotobuf.BuffersSlice - bodyFldLen := 1 + protowire.SizeBytes(respChunkLen) - fullLen := 1 + protowire.SizeBytes(bodyFldLen) - if x.respStream.signResponse { - fullLen += maxResponseVerificationHeaderLen - } - - b := make(mem.SliceBuffer, fullLen) - b[0] = iprotobuf.TagBytes1 // body field - off := 1 + binary.PutUvarint(b[1:], uint64(bodyFldLen)) - b[off] = iprotobuf.TagBytes2 // chunk field - off += 1 + binary.PutUvarint(b[off+1:], 
uint64(respChunkLen)) - off += chunkBuffers.CopyTo(b[off:]) - if x.respStream.signResponse { - n, err := x.respStream.srv.signResponse(b[off:], b[:off], nil) - if err != nil { - return false, fmt.Errorf("sign chunk response: %w", err) - } - b = b[:off+n] - } - - respBuf = mem.BufferSlice{b} - } - } - - if err := x.respStream.base.SendMsg(respBuf); err != nil { - return remoteSent, err - } - - streamProg.readPayload += chunkLen - x.respondedPayload += to - from - - return remoteSent, nil + return x.respStream.srv.sendChunkResponse(x.respStream.base, respBuf, chunkBuffers, respChunkLen, chunkLen, + x.respStream.signResponse, iprotobuf.TagBytes2, &streamProg.readPayload, &x.respondedPayload, shiftPayloadChunkInGetResponseBuffer) } func (x *getProxyContext) handleSplitInfo(respBuf mem.BufferSlice, buffers iprotobuf.BuffersSlice) (bool, error) { - var si object.SplitInfo - var opts protoscan.ScanMessageOptions - - compose := !x.req.GetBody().GetRaw() - if compose { - opts.InterceptBytes = func(num protowire.Number, buffers iprotobuf.BuffersSlice) error { - if num == protoobject.FieldSplitInfoSplitID { - id := object.NewSplitIDFromV2(buffers.ReadOnlyData()) - if id == nil { - return errors.New("invalid split ID") - } - si.SetSplitID(id) - } - return nil - } - opts.InterceptNested = func(num protowire.Number, buffers iprotobuf.BuffersSlice) error { - if num != protoobject.FieldSplitInfoLastPart && num != protoobject.FieldSplitInfoLink && num != protoobject.FieldSplitInfoFirstPart { - return protoscan.ErrContinue - } - - var opts protoscan.ScanMessageOptions - opts.InterceptBytes = func(num2 protowire.Number, buffers iprotobuf.BuffersSlice) error { - if num2 == protorefs.FieldObjectIDValue { - switch num { //nolint:exhaustive - case protoobject.FieldSplitInfoLastPart: - si.SetLastPart(oid.ID(buffers.ReadOnlyData())) - case protoobject.FieldSplitInfoLink: - si.SetLink(oid.ID(buffers.ReadOnlyData())) - case protoobject.FieldSplitInfoFirstPart: - 
si.SetFirstPart(oid.ID(buffers.ReadOnlyData())) - } - } - return nil - } - return protoscan.ScanMessage(buffers, protoscan.ObjectIDScheme, opts) - } - } - - err := protoscan.ScanMessage(buffers, protoscan.ObjectSplitInfoScheme, opts) - if err != nil { - return false, fmt.Errorf("handle split info field: %w", err) - } - - if compose { - return false, object.NewSplitInfoError(&si) - } - - return true, x.respStream.base.SendMsg(respBuf) + return handleSplitInfo(x.req.GetBody().GetRaw(), x.respStream.base, respBuf, buffers) } diff --git a/pkg/services/object/get/exec.go b/pkg/services/object/get/exec.go index 9a5cf24d1d..d801354975 100644 --- a/pkg/services/object/get/exec.go +++ b/pkg/services/object/get/exec.go @@ -69,6 +69,11 @@ type execCtx struct { submitHeadResponseFn SubmitHeadResponseFunc forwardGetRequestFn ForwardGetRequestFunc + + forwardRangeRequestFn ForwardRangeRequestFunc + + localRangeBuffer []byte + submitLocalRangeStreamFn SubmitDataStreamFunc } type execOption func(*execCtx) @@ -127,6 +132,19 @@ func withForwardGetRequestFunc(f ForwardGetRequestFunc) execOption { } } +func withForwardRangeRequestFunc(f ForwardRangeRequestFunc) execOption { + return func(ctx *execCtx) { + ctx.forwardRangeRequestFn = f + } +} + +func withLocalRangeBuffer(buf []byte, submitStreamFn SubmitDataStreamFunc) execOption { + return func(ctx *execCtx) { + ctx.localRangeBuffer = buf + ctx.submitLocalRangeStreamFn = submitStreamFn + } +} + func (exec *execCtx) setLogger(l *zap.Logger) { if l.Level() != zap.DebugLevel { exec.log = l @@ -449,12 +467,6 @@ func (exec *execCtx) writeCollectedObject() { } } -// isForwardingEnabled returns true if common execution -// parameters has request forwarding closure set. -func (exec execCtx) isForwardingEnabled() bool { - return exec.prm.forwarder != nil -} - // isRangeHashForwardingEnabled returns true if common execution // parameters has GETRANGEHASH request forwarding closure set. 
func (exec execCtx) isRangeHashForwardingEnabled() bool { diff --git a/pkg/services/object/get/forward.go b/pkg/services/object/get/forward.go index ca239342dd..34be5aecee 100644 --- a/pkg/services/object/get/forward.go +++ b/pkg/services/object/get/forward.go @@ -63,3 +63,27 @@ func (s *Service) forwardHeadRequest(ctx context.Context, sortedNodeLists [][]ne return apistatus.ErrObjectNotFound } + +func (s *Service) forwardRangeRequest(ctx context.Context, sortedNodeLists [][]netmap.NodeInfo, forwardRequestFn ForwardRangeRequestFunc) error { + for i := range sortedNodeLists { + for j := range sortedNodeLists[i] { + conn, node, err := s.conns.(*clientCacheWrapper)._connect(ctx, sortedNodeLists[i][j]) + if err != nil { + s.log.Debug("get conn to remote node", + zap.Stringer("addresses", node.AddressGroup()), zap.Error(err)) + continue + } + + err = forwardRequestFn(ctx, conn) + if err == nil || errors.Is(err, ctx.Err()) { + return err + } + + if !errors.Is(err, apistatus.ErrObjectNotFound) { + s.log.Info("failed to RANGE object from remote node", zap.Error(err)) + } + } + } + + return apistatus.ErrObjectNotFound +} diff --git a/pkg/services/object/get/get.go b/pkg/services/object/get/get.go index 96e2f580fb..5ef80cf3a7 100644 --- a/pkg/services/object/get/get.go +++ b/pkg/services/object/get/get.go @@ -6,7 +6,6 @@ import ( "fmt" iec "github.com/nspcc-dev/neofs-node/internal/ec" - "github.com/nspcc-dev/neofs-node/pkg/services/object/internal" "github.com/nspcc-dev/neofs-node/pkg/util" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" "github.com/nspcc-dev/neofs-sdk-go/netmap" @@ -74,37 +73,6 @@ func (s *Service) Get(ctx context.Context, prm Prm) error { ecRules, ecNodeLists, prm.objWriter) } -func (s *Service) proxyGetRequest(ctx context.Context, sortedNodeLists [][]netmap.NodeInfo, proxyFn RequestForwarder, - req string, headWriter internal.HeaderWriter) error { - for i := range sortedNodeLists { - for j := range sortedNodeLists[i] { - conn, node, err := 
s.conns.(*clientCacheWrapper)._connect(ctx, sortedNodeLists[i][j]) - if err != nil { - s.log.Debug("get conn to remote node", - zap.Stringer("addresses", node.AddressGroup()), zap.Error(err)) - continue - } - - hdr, err := proxyFn(ctx, conn) - if err == nil { - if headWriter != nil { - return headWriter.WriteHeader(hdr) - } - return nil - } - - if errors.Is(err, apistatus.ErrObjectAlreadyRemoved) || errors.Is(err, apistatus.ErrObjectAccessDenied) || - errors.Is(err, apistatus.ErrObjectOutOfRange) || errors.Is(err, ctx.Err()) { - return err - } - - s.log.Info("request proxy failed", zap.String("request", req), zap.Error(err)) - } - } - - return apistatus.ErrObjectNotFound -} - // GetRange serves a request to get an object by address, and returns Streamer instance. func (s *Service) GetRange(ctx context.Context, prm RangePrm) error { pi, err := checkECPartInfoRequest(prm.common.XHeaders(), prm.container) @@ -115,6 +83,15 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error { if pi.RuleIndex >= 0 { // TODO: deny if node is not in the container? + + if prm.localBuffer != nil { + stream, err := s.localObjects.ReadECPartRange(prm.addr.Container(), prm.addr.Object(), pi, prm.rng.GetOffset(), prm.rng.GetLength(), prm.localBuffer) + if err == nil { + prm.submitLocalStreamFn(stream) + } + return err + } + return s.copyLocalECPartRange(prm.objWriter, prm.addr.Container(), prm.addr.Object(), pi, prm.rng.GetOffset(), prm.rng.GetLength()) } @@ -122,7 +99,8 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error { len(prm.container.PlacementPolicy().ECRules()) == 0 && // EC breaks TTL requirements currently. len(prm.container.PlacementPolicy().Replicas()) != 0 { // It handles locality internally. 
- return s.get(ctx, prm.commonPrm, withPayloadRange(prm.rng)).err + bufOpt := withLocalRangeBuffer(prm.localBuffer, prm.submitLocalStreamFn) + return s.get(ctx, prm.commonPrm, withPayloadRange(prm.rng), bufOpt).err } nodeLists, repRules, ecRules, err := s.neoFSNet.GetNodesForObject(prm.addr) @@ -136,7 +114,9 @@ func (s *Service) GetRange(ctx context.Context, prm RangePrm) error { func (s *Service) getRange(ctx context.Context, prm RangePrm, nodeLists [][]netmap.NodeInfo, repRules []uint, ecRules []iec.Rule, hashPrm *RangeHashPrm) error { if len(repRules) > 0 { // REP format does not require encoding - err := s.get(ctx, prm.commonPrm, withPreSortedContainerNodes(nodeLists[:len(repRules)], repRules), withPayloadRange(prm.rng), withHash(hashPrm)).err + bufOpt := withLocalRangeBuffer(prm.localBuffer, prm.submitLocalStreamFn) + forwardOpt := withForwardRangeRequestFunc(prm.forwardRequestFn) + err := s.get(ctx, prm.commonPrm, withPreSortedContainerNodes(nodeLists[:len(repRules)], repRules), withPayloadRange(prm.rng), withHash(hashPrm), bufOpt, forwardOpt).err if len(ecRules) == 0 || !errors.Is(err, apistatus.ErrObjectNotFound) { return err } @@ -151,8 +131,8 @@ func (s *Service) getRange(ctx context.Context, prm RangePrm, nodeLists [][]netm return err } - if prm.forwarder != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) { - return s.proxyGetRequest(ctx, ecNodeLists, prm.forwarder, "RANGE", nil) + if prm.forwardRequestFn != nil && !localNodeInSets(s.neoFSNet, ecNodeLists) { + return s.forwardRangeRequest(ctx, ecNodeLists, prm.forwardRequestFn) } if prm.raw { diff --git a/pkg/services/object/get/prm.go b/pkg/services/object/get/prm.go index daaf05438a..9c08858667 100644 --- a/pkg/services/object/get/prm.go +++ b/pkg/services/object/get/prm.go @@ -19,6 +19,9 @@ import ( // SubmitStreamFunc is a callback for partially read object stream. type SubmitStreamFunc = func(int, io.ReadCloser) +// SubmitDataStreamFunc is a handler of data stream. 
+type SubmitDataStreamFunc = func(io.ReadCloser) + // Prm groups parameters of Get service call. type Prm struct { commonPrm @@ -34,6 +37,11 @@ type RangePrm struct { commonPrm rng *object.Range + + localBuffer []byte + submitLocalStreamFn SubmitDataStreamFunc + + forwardRequestFn ForwardRangeRequestFunc } // RangeHashPrm groups parameters of GetRange service call. @@ -64,6 +72,10 @@ type SubmitHeadResponseFunc = func(mem.BufferSlice, iprotobuf.BuffersSlice) // through passed connection. type ForwardGetRequestFunc = func(context.Context, coreclient.MultiAddressClient) error +// ForwardRangeRequestFunc continues to serve current RANGE request from remote node +// through passed connection. +type ForwardRangeRequestFunc = func(context.Context, coreclient.MultiAddressClient) error + // HeadPrm groups parameters of Head service call. type HeadPrm struct { commonPrm @@ -86,7 +98,6 @@ type commonPrm struct { raw bool - forwarder RequestForwarder rangeForwarder RangeRequestForwarder // signerKey is a cached key that should be used for spawned @@ -144,10 +155,6 @@ func (p *commonPrm) SetCommonParameters(common *util.CommonPrm) { p.common = common } -func (p *commonPrm) SetRequestForwarder(f RequestForwarder) { - p.forwarder = f -} - func (p *commonPrm) SetRangeHashRequestForwarder(f RangeRequestForwarder) { p.rangeForwarder = f } @@ -235,3 +242,24 @@ func (p *HeadPrm) SetSubmitHeadResponseFunc(f SubmitHeadResponseFunc) { func (p *Prm) SetRequestForwarder(f ForwardGetRequestFunc) { p.forwardRequestFn = f } + +// WithBuffer specifies a buffer to use for header reading and a callback for +// payload range stream. If passed, the stream must be finally closed by the +// caller. +func (p *RangePrm) WithBuffer(buffer []byte, submitStreamFn SubmitDataStreamFunc) { + p.localBuffer = buffer + p.submitLocalStreamFn = submitStreamFn +} + +// SetRequestForwarder specifies request transport callback to use for streaming +// responses from remote node. 
+// +// The f should return: +// - nil on completed object transmission +// - [object.SplitInfoError] on OK with corresponding body field +// - [apistatus.ErrObjectNotFound] on 404 status +// - nil on other API statuses +// - any other transport/protocol error otherwise +func (p *RangePrm) SetRequestForwarder(f ForwardRangeRequestFunc) { + p.forwardRequestFn = f +} diff --git a/pkg/services/object/get/service.go b/pkg/services/object/get/service.go index 4368c33643..f151213473 100644 --- a/pkg/services/object/get/service.go +++ b/pkg/services/object/get/service.go @@ -83,6 +83,8 @@ type cfg struct { GetECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64) (uint64, io.ReadCloser, error) // ReadECPart is a buffered alternative for GetECPart similar to ReadObject. ReadECPart(cnr cid.ID, parent oid.ID, pi iec.PartInfo, buf []byte) (int, io.ReadCloser, error) + // ReadECPartRange is a buffered alternative for GetECPartRange similar to ReadECPart. + ReadECPartRange(cnr cid.ID, parent oid.ID, pi iec.PartInfo, off, ln uint64, buf []byte) (io.ReadCloser, error) Head(oid.Address, bool) (*object.Object, error) ReadHeader(oid.Address, bool, []byte) (int, error) // HeadECPart is similar to GetECPart but returns only the header. 
diff --git a/pkg/services/object/get/service_test.go b/pkg/services/object/get/service_test.go index dab3b04159..a98466ba0a 100644 --- a/pkg/services/object/get/service_test.go +++ b/pkg/services/object/get/service_test.go @@ -212,3 +212,7 @@ func (unimplementedLocalStorage) HeadECPart(cid.ID, oid.ID, iec.PartInfo) (objec func (unimplementedLocalStorage) ReadECPartHeader(cid.ID, oid.ID, iec.PartInfo, []byte) (int, error) { panic("unimplemented") } + +func (unimplementedLocalStorage) ReadECPartRange(cid.ID, oid.ID, iec.PartInfo, uint64, uint64, []byte) (io.ReadCloser, error) { + panic("unimplemented") +} diff --git a/pkg/services/object/get/util.go b/pkg/services/object/get/util.go index 906699afcb..2e610655a9 100644 --- a/pkg/services/object/get/util.go +++ b/pkg/services/object/get/util.go @@ -189,9 +189,8 @@ func (c *clientWrapper) getObject(exec *execCtx) (*object.Object, io.ReadCloser, return nil, nil, exec.forwardGetRequestFn(exec.ctx, c.client) } - if exec.isForwardingEnabled() { - obj, err := exec.prm.forwarder(exec.ctx, c.client) - return obj, nil, err + if exec.forwardRangeRequestFn != nil { + return nil, nil, exec.forwardRangeRequestFn(exec.ctx, c.client) } key, err := exec.key() @@ -313,6 +312,13 @@ func (e *storageEngineWrapper) get(exec *execCtx) (*object.Object, io.ReadCloser } if rng := exec.ctxRange(); rng != nil { + if exec.localRangeBuffer != nil { + r, err := e.engine.ReadPayloadRange(exec.address(), rng.GetOffset(), rng.GetLength(), exec.localRangeBuffer) + if err == nil { + exec.submitLocalRangeStreamFn(r) + } + return nil, nil, err + } r, err := e.engine.GetRangeStream(exec.address(), rng.GetOffset(), rng.GetLength()) return nil, r, err } diff --git a/pkg/services/object/proto.go b/pkg/services/object/proto.go index 971c8c4d49..405baf36d5 100644 --- a/pkg/services/object/proto.go +++ b/pkg/services/object/proto.go @@ -217,6 +217,14 @@ func shiftHeaderInGetResponseBuffer(respBuf, hdrBuf []byte) iprotobuf.FieldBound } func 
shiftPayloadChunkInGetResponseBuffer(respBuf []byte, off, ln int) iprotobuf.FieldBounds { + return shiftPayloadChunkInResponseBuffer(respBuf, iprotobuf.TagBytes2, off, ln) +} + +func shiftPayloadChunkInRangeResponseBuffer(respBuf []byte, off, ln int) iprotobuf.FieldBounds { + return shiftPayloadChunkInResponseBuffer(respBuf, iprotobuf.TagBytes1, off, ln) +} + +func shiftPayloadChunkInResponseBuffer(respBuf []byte, chunkFldTag byte, off, ln int) iprotobuf.FieldBounds { bodyFldPrefixLen := 1 + protowire.SizeVarint(uint64(ln)) var bodyf iprotobuf.FieldBounds @@ -228,7 +236,7 @@ func shiftPayloadChunkInGetResponseBuffer(respBuf []byte, off, ln int) iprotobuf respBuf[bodyf.From] = iprotobuf.TagBytes1 // body binary.PutUvarint(respBuf[bodyf.From+1:], uint64(bodyFldPrefixLen+ln)) - respBuf[bodyf.ValueFrom] = iprotobuf.TagBytes2 // chunk + respBuf[bodyf.ValueFrom] = chunkFldTag binary.PutUvarint(respBuf[bodyf.ValueFrom+1:], uint64(ln)) bodyf.To = off + ln diff --git a/pkg/services/object/range.go b/pkg/services/object/range.go new file mode 100644 index 0000000000..bbc40abe7d --- /dev/null +++ b/pkg/services/object/range.go @@ -0,0 +1,175 @@ +package object + +import ( + "context" + "errors" + "fmt" + "io" + + iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" + "github.com/nspcc-dev/neofs-node/internal/protobuf/protoscan" + apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" + protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" + protostatus "github.com/nspcc-dev/neofs-sdk-go/proto/status" + "google.golang.org/grpc" + "google.golang.org/grpc/mem" + "google.golang.org/protobuf/encoding/protowire" +) + +type rangeStreamProgress struct { + empty bool + readPayload int +} + +// returns: +// - nil on completed payload transmission +// - [object.SplitInfoError]/nil on split info response and unset/set raw flag in request +// - [apistatus.ErrObjectNotFound] on 404 status +// - nil on other API statuses +// - any other transport/protocol error 
otherwise +func (s *rangeStream) continueWithConn(ctx context.Context, conn *grpc.ClientConn) error { + stream, err := conn.NewStream(ctx, &protoobject.ObjectService_ServiceDesc.Streams[3], protoobject.ObjectService_GetRange_FullMethodName, + grpc.StaticMethod(), + grpc.ForceCodecV2(iprotobuf.BufferedCodec{}), + ) + if err != nil { + return fmt.Errorf("stream opening failed: %w", err) + } + if err = stream.SendMsg(s.req); err != nil { + return fmt.Errorf("send request: %w", err) + } + if err = stream.CloseSend(); err != nil { + return fmt.Errorf("close send: %w", err) + } + + var prog rangeStreamProgress + for { + var respBuf mem.BufferSlice + if err = stream.RecvMsg(&respBuf); err != nil { + if errors.Is(err, io.EOF) { + if prog.empty { + return io.ErrUnexpectedEOF + } + return nil + } + return fmt.Errorf("reading the response failed: %w", err) + } + + fin, sent, err := s.handleResponse(&prog, respBuf) + if !sent { + respBuf.Free() + } + if err != nil { + return fmt.Errorf("handle next stream message: %w", err) + } + if fin { + return nil + } + } +} + +func (s *rangeStream) handleResponse(streamProg *rangeStreamProgress, respBuf mem.BufferSlice) (bool, bool, error) { + var code uint32 + var body iprotobuf.BuffersSlice + + var opts protoscan.ScanMessageOptions + opts.InterceptNested = func(num protowire.Number, buffers iprotobuf.BuffersSlice) error { + switch num { + default: + return protoscan.ErrContinue + case iprotobuf.FieldResponseBody: + body = buffers + return nil + case iprotobuf.FieldResponseMetaHeader: + var err error + code, err = getStatusCodeFromResponseMetaHeader(buffers) + if err != nil { + return fmt.Errorf("handle meta header: %w", err) + } + return nil + } + } + + err := protoscan.ScanMessage(iprotobuf.NewBuffersSlice(respBuf), protoscan.ResponseScheme, opts) + if err != nil { + return false, false, err + } + + if code == protostatus.ObjectNotFound { + return false, false, apistatus.ErrObjectNotFound + } + + if code != protostatus.OK { + return 
true, true, s.base.SendMsg(respBuf) + } + + sent, err := s.handleResponseBody(streamProg, respBuf, body) + if err != nil { + return false, sent, fmt.Errorf("handle body: %w", err) + } + + return false, sent, nil +} + +func (s *rangeStream) handleResponseBody(streamProg *rangeStreamProgress, respBuf mem.BufferSlice, buffers iprotobuf.BuffersSlice) (bool, error) { + var oneofNum protowire.Number + var oneofFld iprotobuf.BuffersSlice + + var opts protoscan.ScanMessageOptions + opts.InterceptBytes = func(num protowire.Number, buffers iprotobuf.BuffersSlice) error { + if num == protoobject.FieldRangeResponseBodyChunk { + oneofNum, oneofFld = num, buffers + } + return nil + } + opts.InterceptNested = func(num protowire.Number, buffers iprotobuf.BuffersSlice) error { + switch num { + default: + return protoscan.ErrContinue + case protoobject.FieldRangeResponseBodySplitInfo: + oneofNum, oneofFld = num, buffers + return nil + } + } + + err := protoscan.ScanMessage(buffers, protoscan.ObjectGetRangeResponseBodyScheme, opts) + if err != nil { + return false, err + } + + switch oneofNum { + default: + return false, errors.New("none of the supported oneof fields are specified") + case protoobject.FieldRangeResponseBodyChunk: + return s.handleChunkResponse(streamProg, respBuf, oneofFld) + case protoobject.FieldRangeResponseBodySplitInfo: + return s.handleSplitInfo(respBuf, oneofFld) + } +} + +func (s *rangeStream) handleChunkResponse(streamProg *rangeStreamProgress, respBuf mem.BufferSlice, chunkBuffers iprotobuf.BuffersSlice) (bool, error) { + chunkLen := chunkBuffers.Len() + + from, to := chunkBoundsToSend(s.respondedPayload, streamProg.readPayload, chunkLen) + if from == to { + streamProg.readPayload += chunkLen + return false, nil + } + + _, ok := chunkBuffers.MoveNext(from) + if !ok { + return false, fmt.Errorf("seek chunk left bound in response buffers: %w", io.ErrUnexpectedEOF) + } + + chunkBuffers, ok = chunkBuffers.MoveNext(to - from) + if !ok { + return false, 
fmt.Errorf("seek chunk right bound in response buffers: %w", io.ErrUnexpectedEOF) + } + + return s.srv.sendChunkResponse(s.base, respBuf, chunkBuffers, to-from, chunkLen, + s.signResponse, iprotobuf.TagBytes1, &streamProg.readPayload, &s.respondedPayload, shiftPayloadChunkInRangeResponseBuffer) +} + +func (s *rangeStream) handleSplitInfo(respBuf mem.BufferSlice, buffers iprotobuf.BuffersSlice) (bool, error) { + return handleSplitInfo(s.req.GetBody().GetRaw(), s.base, respBuf, buffers) +} diff --git a/pkg/services/object/server.go b/pkg/services/object/server.go index 566f6bb1d1..098ab22716 100644 --- a/pkg/services/object/server.go +++ b/pkg/services/object/server.go @@ -1384,6 +1384,10 @@ type rangeStream struct { base protoobject.ObjectService_GetRangeServer srv *Server req *protoobject.GetRangeRequest + + respondedPayload int + + signResponse bool } func (s *rangeStream) WriteChunk(chunk []byte) error { @@ -1435,10 +1439,13 @@ func (s *Server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject. return s.sendStatusRangeResponse(gStream, err, req) } + needSignResponse := needSignGetResponse(req) + p, err := convertRangePrm(s.signer, reqInfo.Container, req, &rangeStream{ - base: gStream, - srv: s, - req: req, + base: gStream, + srv: s, + req: req, + signResponse: needSignResponse, }) if err != nil { if !errors.Is(err, apistatus.Error) { @@ -1448,13 +1455,73 @@ func (s *Server) GetRange(req *protoobject.GetRangeRequest, gStream protoobject. 
} return s.sendStatusRangeResponse(gStream, err, req) } + + var stream io.ReadCloser + defer func() { + if stream != nil { + stream.Close() + } + }() + + hdrRespBuf, hdrBuf := getBufferForHeadResponse() + + p.WithBuffer(hdrBuf, func(s io.ReadCloser) { stream = s }) + err = s.handlers.GetRange(gStream.Context(), p) + + hdrRespBuf.Free() + + if err != nil { + return s.sendStatusRangeResponse(gStream, err, req) + } + + if stream == nil { + return nil + } + + err = s.copyRangeStream(gStream, stream, needSignResponse) if err != nil { return s.sendStatusRangeResponse(gStream, err, req) } + return nil } +func (s *Server) copyRangeStream(gStream protoobject.ObjectService_GetRangeServer, stream io.Reader, needSignResp bool) error { + for { + // chunk response buffers for GET completely suitable for RANGE + respBuf, buf := getBufferForChunkGetResponse() + + n, err := io.ReadFull(stream, buf) + streamDone := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) + if err != nil && !streamDone { + respBuf.Free() + return fmt.Errorf("read payload stream: %w", err) + } + + if n == 0 { + respBuf.Free() + return nil + } + + bodyf := shiftPayloadChunkInRangeResponseBuffer(respBuf.SliceBuffer, maxChunkOffsetInGetResponse, n) + + if needSignResp { + n, err := s.signResponse(respBuf.SliceBuffer[bodyf.To:], respBuf.SliceBuffer[bodyf.ValueFrom:bodyf.To], nil) + if err != nil { + respBuf.Free() + return fmt.Errorf("sign chunk response: %w", err) + } + bodyf.To += n + } + + respBuf.SetBounds(bodyf.From, bodyf.To) + if err = gStream.SendMsg(respBuf); err != nil || streamDone { + return err + } + } +} + // converts original request into parameters accepted by the internal handler. // Note that the stream is untouched within this call, errors are not reported // into it. 
@@ -1499,12 +1566,11 @@ func convertRangePrm(signer ecdsa.PrivateKey, cnr container.Container, req *prot } var onceResign sync.Once - var respondedPayload int meta := req.GetMetaHeader() if meta == nil { return getsvc.RangePrm{}, errors.New("missing meta header") } - p.SetRequestForwarder(func(ctx context.Context, c client.MultiAddressClient) (*object.Object, error) { + p.SetRequestForwarder(func(ctx context.Context, c client.MultiAddressClient) error { var err error onceResign.Do(func() { req.MetaHeader = &protosession.RequestMetaHeader{ @@ -1515,70 +1581,14 @@ func convertRangePrm(signer ecdsa.PrivateKey, cnr container.Container, req *prot req.VerifyHeader, err = neofscrypto.SignRequestWithBuffer(neofsecdsa.Signer(signer), req, nil) }) if err != nil { - return nil, err + return err } - return nil, c.ForEachGRPCConn(ctx, func(ctx context.Context, conn *grpc.ClientConn) error { - err := continueRangeFromRemoteNode(ctx, conn, req, stream, &respondedPayload) - if errors.Is(err, io.EOF) { - return nil - } - return err // TODO: log error - }) + return c.ForEachGRPCConn(ctx, stream.continueWithConn) }) return p, nil } -func continueRangeFromRemoteNode(ctx context.Context, conn *grpc.ClientConn, req *protoobject.GetRangeRequest, - stream *rangeStream, respondedPayload *int) error { - rangeStream, err := protoobject.NewObjectServiceClient(conn).GetRange(ctx, req) - if err != nil { - return fmt.Errorf("stream opening failed: %w", err) - } - - var readPayload int - for { - resp, err := rangeStream.Recv() - if err != nil { - if errors.Is(err, io.EOF) { - return io.EOF - } - return fmt.Errorf("reading the response failed: %w", err) - } - - if err := checkStatus(resp.GetMetaHeader().GetStatus()); err != nil { - return err - } - - switch v := resp.GetBody().GetRangePart().(type) { - default: - return fmt.Errorf("unexpected range type %T", v) - case *protoobject.GetRangeResponse_Body_Chunk: - fullChunk := v.Chunk - respChunk := chunkToSend(*respondedPayload, readPayload, 
fullChunk) - if len(respChunk) == 0 { - readPayload += len(fullChunk) - continue - } - if err := stream.WriteChunk(respChunk); err != nil { - return fmt.Errorf("could not write object chunk in Get forwarder: %w", err) - } - readPayload += len(fullChunk) - *respondedPayload += len(respChunk) - case *protoobject.GetRangeResponse_Body_SplitInfo: - if v == nil || v.SplitInfo == nil { - return errors.New("nil split info oneof field") - } - si := object.NewSplitInfo() - err := si.FromProtoMessage(v.SplitInfo) - if err != nil { - return err - } - return object.NewSplitInfoError(si) - } - } -} - func (s *Server) Search(_ *protoobject.SearchRequest, _ protoobject.ObjectService_SearchServer) error { return grpcstatus.Error(grpccodes.Unimplemented, "no longer supported, use SearchV2") } @@ -2214,11 +2224,6 @@ func checkStatus(st *protostatus.Status) error { return apistatus.ToError(st) } -func chunkToSend(global, local int, chunk []byte) []byte { - from, to := chunkBoundsToSend(global, local, len(chunk)) - return chunk[from:to] -} - func chunkBoundsToSend(global, local, chunkLen int) (int, int) { if global == local { return 0, chunkLen diff --git a/pkg/services/object/util.go b/pkg/services/object/util.go new file mode 100644 index 0000000000..dbf50dd33a --- /dev/null +++ b/pkg/services/object/util.go @@ -0,0 +1,125 @@ +package object + +import ( + "encoding/binary" + "errors" + "fmt" + + iprotobuf "github.com/nspcc-dev/neofs-node/internal/protobuf" + "github.com/nspcc-dev/neofs-node/internal/protobuf/protoscan" + "github.com/nspcc-dev/neofs-sdk-go/object" + oid "github.com/nspcc-dev/neofs-sdk-go/object/id" + protoobject "github.com/nspcc-dev/neofs-sdk-go/proto/object" + protorefs "github.com/nspcc-dev/neofs-sdk-go/proto/refs" + "google.golang.org/grpc" + "google.golang.org/grpc/mem" + "google.golang.org/protobuf/encoding/protowire" +) + +func handleSplitInfo(raw bool, respStream grpc.ServerStream, respBuf mem.BufferSlice, buffers iprotobuf.BuffersSlice) (bool, error) { + var 
si object.SplitInfo + var opts protoscan.ScanMessageOptions + + if !raw { + opts.InterceptBytes = func(num protowire.Number, buffers iprotobuf.BuffersSlice) error { + if num == protoobject.FieldSplitInfoSplitID { + id := object.NewSplitIDFromV2(buffers.ReadOnlyData()) + if id == nil { + return errors.New("invalid split ID") + } + si.SetSplitID(id) + } + return nil + } + opts.InterceptNested = func(num protowire.Number, buffers iprotobuf.BuffersSlice) error { + if num != protoobject.FieldSplitInfoLastPart && num != protoobject.FieldSplitInfoLink && num != protoobject.FieldSplitInfoFirstPart { + return protoscan.ErrContinue + } + + var opts protoscan.ScanMessageOptions + opts.InterceptBytes = func(num2 protowire.Number, buffers iprotobuf.BuffersSlice) error { + if num2 == protorefs.FieldObjectIDValue { + switch num { //nolint:exhaustive + case protoobject.FieldSplitInfoLastPart: + si.SetLastPart(oid.ID(buffers.ReadOnlyData())) + case protoobject.FieldSplitInfoLink: + si.SetLink(oid.ID(buffers.ReadOnlyData())) + case protoobject.FieldSplitInfoFirstPart: + si.SetFirstPart(oid.ID(buffers.ReadOnlyData())) + } + } + return nil + } + return protoscan.ScanMessage(buffers, protoscan.ObjectIDScheme, opts) + } + } + + err := protoscan.ScanMessage(buffers, protoscan.ObjectSplitInfoScheme, opts) + if err != nil { + return false, fmt.Errorf("handle split info field: %w", err) + } + + if !raw { + return false, object.NewSplitInfoError(&si) + } + + return true, respStream.SendMsg(respBuf) +} + +func (s *Server) sendChunkResponse(respStream grpc.ServerStream, respBuf mem.BufferSlice, chunkBuffers iprotobuf.BuffersSlice, + respChunkLen, chunkLen int, signResponse bool, chunkFldTag byte, readStream, responded *int, shiftFunc func([]byte, int, int) iprotobuf.FieldBounds) (bool, error) { + remoteSent := respChunkLen == chunkLen + if !remoteSent { + if respChunkLen <= maxGetResponseChunkLen { + localRespBuf, _ := getBufferForChunkGetResponse() + + 
chunkBuffers.CopyTo(localRespBuf.SliceBuffer[maxChunkOffsetInGetResponse:]) + + bodyf := shiftFunc(localRespBuf.SliceBuffer, maxChunkOffsetInGetResponse, respChunkLen) + + if signResponse { + n, err := s.signResponse(localRespBuf.SliceBuffer[bodyf.To:], localRespBuf.SliceBuffer[bodyf.ValueFrom:bodyf.To], nil) + if err != nil { + return false, fmt.Errorf("sign chunk response: %w", err) + } + bodyf.To += n + } + + localRespBuf.SetBounds(bodyf.From, bodyf.To) + respBuf = mem.BufferSlice{localRespBuf} + } else { + // TODO: in this case we could make respBuf = mem.BufferSlice{prefix, chunkBuffers}, + // but then we'd have to provide mem.Buffer from iprotobuf.BuffersSlice + bodyFldLen := 1 + protowire.SizeBytes(respChunkLen) + fullLen := 1 + protowire.SizeBytes(bodyFldLen) + if signResponse { + fullLen += maxResponseVerificationHeaderLen + } + + b := make(mem.SliceBuffer, fullLen) + b[0] = iprotobuf.TagBytes1 // body field + off := 1 + binary.PutUvarint(b[1:], uint64(bodyFldLen)) + b[off] = chunkFldTag + off += 1 + binary.PutUvarint(b[off+1:], uint64(respChunkLen)) + off += chunkBuffers.CopyTo(b[off:]) + if signResponse { + n, err := s.signResponse(b[off:], b[:off], nil) + if err != nil { + return false, fmt.Errorf("sign chunk response: %w", err) + } + b = b[:off+n] + } + + respBuf = mem.BufferSlice{b} + } + } + + if err := respStream.SendMsg(respBuf); err != nil { + return remoteSent, err + } + + *readStream += chunkLen + *responded += respChunkLen + + return remoteSent, nil +}