From c8de5b2d53182cbc50dc7b980c3b5a574fc15e2b Mon Sep 17 00:00:00 2001 From: Leonard Lyubich Date: Tue, 19 Dec 2023 20:30:08 +0400 Subject: [PATCH] slicer: Allow to specify known size of the full payload Previously, there was no ability to specify pre-known size of payload of the object to be sliced. The main drawback was the slicer's inability to determine the optimal buffer size needed to read the payload. Therefore, the slicer always allocated a buffer of `MaxObjectSize` size. With this behavior, the smaller the size of the loaded payload (down to 0), the more memory was wasted. For example, this could lead to 64MB allocations for 1K objects which is a 65,000-fold excess. Now the slicer supports an optional fixed payload size via the `Options.SetPayloadSize` method. When used, this option tunes behavior to allocate payload buffer according to the size. The option could be used with files, in-memory slices and other cases to improve application performance. Signed-off-by: Leonard Lyubich --- object/slicer/options.go | 24 ++++++-- object/slicer/slicer.go | 28 +++++---- object/slicer/slicer_test.go | 108 ++++++++++++++++++++++++++++++++++- 3 files changed, 145 insertions(+), 15 deletions(-) diff --git a/object/slicer/options.go b/object/slicer/options.go index 45973641..09900631 100644 --- a/object/slicer/options.go +++ b/object/slicer/options.go @@ -19,6 +19,9 @@ type Options struct { sessionToken *session.Object bearerToken *bearer.Token + + payloadSizeKnown bool + payloadSize uint64 } // SetObjectPayloadLimit specifies data size limit for produced physically @@ -55,16 +58,29 @@ func (x *Options) SetCopiesNumber(copiesNumber uint32) { x.copiesNumber = copiesNumber } -// SetPayloadBuffer sets pre-allocated payloadBuffer to be used to object uploading. -// The payloadBuffer should have length at least MaxObjectSize+1 from NeoFS, -// otherwise, it does not affect anything. +// SetPayloadBuffer sets pre-allocated payloadBuffer to be used to object +// uploading. 
The payloadBuffer should have length at least +// [Options.ObjectPayloadLimit] + 1. If [Options.SetPayloadSize] is called, its +// argument is used as buffer size requirement instead. Any smaller buffer is +// ignored. func (x *Options) SetPayloadBuffer(payloadBuffer []byte) { x.payloadBuffer = payloadBuffer } +// SetPayloadSize allows to specify object's payload size known in advance. If +// the size is fixed, the option is recommended as it improves the performance +// of the application using the [Slicer]. +func (x *Options) SetPayloadSize(size uint64) { + x.payloadSizeKnown = true + x.payloadSize = size +} + // ObjectPayloadLimit returns required max object size. func (x *Options) ObjectPayloadLimit() uint64 { - return x.objectPayloadLimit + if x.objectPayloadLimit > 0 { + return x.objectPayloadLimit + } + return defaultPayloadSizeLimit } // CurrentNeoFSEpoch returns epoch. diff --git a/object/slicer/slicer.go b/object/slicer/slicer.go index b3a7af3b..da6d7ffb 100644 --- a/object/slicer/slicer.go +++ b/object/slicer/slicer.go @@ -148,15 +148,6 @@ func InitPut(ctx context.Context, ow ObjectWriter, header object.Object, signer const defaultPayloadSizeLimit = 1 << 20 -// childPayloadSizeLimit returns configured size limit of the child object's -// payload which defaults to 1MB. 
-func childPayloadSizeLimit(opts Options) uint64 { - if opts.objectPayloadLimit > 0 { - return opts.objectPayloadLimit - } - return defaultPayloadSizeLimit -} - func slice(ctx context.Context, ow ObjectWriter, header object.Object, data io.Reader, signer user.Signer, opts Options) (oid.ID, error) { var rootID oid.ID var n int @@ -259,7 +250,15 @@ func initPayloadStream(ctx context.Context, ow ObjectWriter, header object.Objec stubObject: &stubObject, } - maxObjSize := childPayloadSizeLimit(opts) + maxObjSize := opts.ObjectPayloadLimit() + if opts.payloadSizeKnown { + res.payloadSizeKnown = true + res.payloadSize = opts.payloadSize + + if opts.payloadSize < maxObjSize { + maxObjSize = opts.payloadSize + } + } if uint64(len(opts.payloadBuffer)) > maxObjSize { res.buf = opts.payloadBuffer[:maxObjSize+1] @@ -302,8 +301,13 @@ type PayloadWriter struct { writtenChildren []oid.ID prmObjectPutInit client.PrmObjectPutInit stubObject *object.Object + + payloadSizeKnown bool + payloadSize uint64 } +var errPayloadSizeExceeded = errors.New("payload size exceeded") + // Write writes next chunk of the object data. Concatenation of all chunks forms // the payload of the final object. When the data is over, the PayloadWriter // should be closed. 
@@ -313,6 +317,10 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) { return 0, nil } + if x.payloadSizeKnown && x.rootMeta.length+uint64(len(chunk)) > x.payloadSize { + return 0, errPayloadSizeExceeded + } + n, err := x.currentWriter.Write(chunk) if err == nil || !errors.Is(err, errOverflow) { return n, err diff --git a/object/slicer/slicer_test.go b/object/slicer/slicer_test.go index ab950c44..f86d9108 100644 --- a/object/slicer/slicer_test.go +++ b/object/slicer/slicer_test.go @@ -254,8 +254,17 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options) } func testSlicer(t *testing.T, size, sizeLimit uint64) { + testSlicerWithKnownSize(t, size, sizeLimit, true) + testSlicerWithKnownSize(t, size, sizeLimit, false) +} + +func testSlicerWithKnownSize(t *testing.T, size, sizeLimit uint64, known bool) { in, opts := randomInput(t, size, sizeLimit) + if known { + opts.SetPayloadSize(uint64(len(in.payload))) + } + checker := &slicedObjectChecker{ opts: opts, tb: t, @@ -266,7 +275,7 @@ func testSlicer(t *testing.T, size, sizeLimit uint64) { for i := object.TypeRegular; i <= object.TypeLock; i++ { in.objectType = i - t.Run("slicer with "+i.EncodeToString(), func(t *testing.T) { + t.Run(fmt.Sprintf("slicer with %s,known_size=%t", i.EncodeToString(), known), func(t *testing.T) { testSlicerByHeaderType(t, checker, in, opts) }) } @@ -953,3 +962,100 @@ func BenchmarkReadPayloadBuffer(b *testing.B) { }) } } + +func TestKnownPayloadSize(t *testing.T) { + ctx := context.Background() + t.Run("overflow", func(t *testing.T) { + t.Run("read", func(t *testing.T) { + in, opts := randomInput(t, 1, 1) + obj := objecttest.Object(t) + hdr := *obj.CutPayload() + + opts.SetPayloadSize(20) + r := bytes.NewReader(make([]byte, 21)) + + _, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, in.signer, r, opts) + require.ErrorContains(t, err, "payload size exceeded") + }) + + t.Run("write", func(t *testing.T) { + in, opts := randomInput(t, 1, 1) + obj := 
objecttest.Object(t) + hdr := *obj.CutPayload() + + opts.SetPayloadSize(20) + + w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, in.signer, opts) + require.NoError(t, err) + + for i := byte(0); i < 21; i++ { + _, err = w.Write([]byte{1}) + if i < 20 { + require.NoError(t, err) + } else { + require.ErrorContains(t, err, "payload size exceeded") + } + } + }) + }) +} + +func BenchmarkKnownPayloadSize(b *testing.B) { + ctx := context.Background() + for _, tc := range []struct { + sizeLimit uint64 + size uint64 + }{ + {sizeLimit: 1 << 10, size: 1}, + {sizeLimit: 1 << 10, size: 1 << 10}, + {sizeLimit: 1 << 10, size: 10 << 10}, + } { + b.Run(fmt.Sprintf("limit=%d,size=%d", tc.sizeLimit, tc.size), func(b *testing.B) { + b.Run("read", func(b *testing.B) { + obj := objecttest.Object(b) + hdr := *obj.CutPayload() + signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b)) + payload := make([]byte, tc.size) + rand.Read(payload) + + var opts slicer.Options + opts.SetObjectPayloadLimit(tc.sizeLimit) + opts.SetPayloadSize(tc.size) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + _, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, signer, bytes.NewReader(payload), opts) + require.NoError(b, err) + } + }) + + b.Run("write", func(b *testing.B) { + obj := objecttest.Object(b) + hdr := *obj.CutPayload() + signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b)) + payload := make([]byte, tc.size) + rand.Read(payload) + + var opts slicer.Options + opts.SetObjectPayloadLimit(tc.sizeLimit) + opts.SetPayloadSize(tc.size) + + b.ReportAllocs() + b.ResetTimer() + + for i := 0; i < b.N; i++ { + w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, signer, opts) + require.NoError(b, err) + + _, err = w.Write(payload) + if err == nil { + err = w.Close() + } + require.NoError(b, err) + } + }) + }) + } +}