diff --git a/object/slicer/options.go b/object/slicer/options.go
index 45973641..09900631 100644
--- a/object/slicer/options.go
+++ b/object/slicer/options.go
@@ -19,6 +19,9 @@ type Options struct {
 
 	sessionToken *session.Object
 	bearerToken  *bearer.Token
+
+	payloadSizeKnown bool
+	payloadSize      uint64
 }
 
 // SetObjectPayloadLimit specifies data size limit for produced physically
@@ -55,16 +58,29 @@ func (x *Options) SetCopiesNumber(copiesNumber uint32) {
 	x.copiesNumber = copiesNumber
 }
 
-// SetPayloadBuffer sets pre-allocated payloadBuffer to be used to object uploading.
-// The payloadBuffer should have length at least MaxObjectSize+1 from NeoFS,
-// otherwise, it does not affect anything.
+// SetPayloadBuffer sets a pre-allocated payloadBuffer to be used for object
+// uploading. The payloadBuffer should have a length of at least
+// [Options.ObjectPayloadLimit] + 1. If [Options.SetPayloadSize] is called, its
+// argument is used as the buffer size requirement instead. Any smaller buffer
+// is ignored.
 func (x *Options) SetPayloadBuffer(payloadBuffer []byte) {
 	x.payloadBuffer = payloadBuffer
 }
 
+// SetPayloadSize allows specifying the object's payload size known in
+// advance. If the size is fixed, this option is recommended as it improves
+// the performance of the application using the [Slicer].
+func (x *Options) SetPayloadSize(size uint64) {
+	x.payloadSizeKnown = true
+	x.payloadSize = size
+}
+
 // ObjectPayloadLimit returns required max object size.
 func (x *Options) ObjectPayloadLimit() uint64 {
-	return x.objectPayloadLimit
+	if x.objectPayloadLimit > 0 {
+		return x.objectPayloadLimit
+	}
+	return defaultPayloadSizeLimit
 }
 
 // CurrentNeoFSEpoch returns epoch.
diff --git a/object/slicer/slicer.go b/object/slicer/slicer.go
index b3a7af3b..da6d7ffb 100644
--- a/object/slicer/slicer.go
+++ b/object/slicer/slicer.go
@@ -148,15 +148,6 @@ func InitPut(ctx context.Context, ow ObjectWriter, header object.Object, signer
 
 const defaultPayloadSizeLimit = 1 << 20
 
-// childPayloadSizeLimit returns configured size limit of the child object's
-// payload which defaults to 1MB.
-func childPayloadSizeLimit(opts Options) uint64 {
-	if opts.objectPayloadLimit > 0 {
-		return opts.objectPayloadLimit
-	}
-	return defaultPayloadSizeLimit
-}
-
 func slice(ctx context.Context, ow ObjectWriter, header object.Object, data io.Reader, signer user.Signer, opts Options) (oid.ID, error) {
 	var rootID oid.ID
 	var n int
@@ -259,7 +250,15 @@ func initPayloadStream(ctx context.Context, ow ObjectWriter, header object.Objec
 		stubObject: &stubObject,
 	}
 
-	maxObjSize := childPayloadSizeLimit(opts)
+	maxObjSize := opts.ObjectPayloadLimit()
+	if opts.payloadSizeKnown {
+		res.payloadSizeKnown = true
+		res.payloadSize = opts.payloadSize
+
+		if opts.payloadSize < maxObjSize {
+			maxObjSize = opts.payloadSize
+		}
+	}
 
 	if uint64(len(opts.payloadBuffer)) > maxObjSize {
 		res.buf = opts.payloadBuffer[:maxObjSize+1]
@@ -302,8 +301,13 @@ type PayloadWriter struct {
 	writtenChildren  []oid.ID
 	prmObjectPutInit client.PrmObjectPutInit
 	stubObject       *object.Object
+
+	payloadSizeKnown bool
+	payloadSize      uint64
 }
 
+var errPayloadSizeExceeded = errors.New("payload size exceeded")
+
 // Write writes next chunk of the object data. Concatenation of all chunks forms
 // the payload of the final object. When the data is over, the PayloadWriter
 // should be closed.
@@ -313,6 +317,10 @@ func (x *PayloadWriter) Write(chunk []byte) (int, error) {
 		return 0, nil
 	}
 
+	if x.payloadSizeKnown && x.rootMeta.length+uint64(len(chunk)) > x.payloadSize {
+		return 0, errPayloadSizeExceeded
+	}
+
 	n, err := x.currentWriter.Write(chunk)
 	if err == nil || !errors.Is(err, errOverflow) {
 		return n, err
diff --git a/object/slicer/slicer_test.go b/object/slicer/slicer_test.go
index ab950c44..f86d9108 100644
--- a/object/slicer/slicer_test.go
+++ b/object/slicer/slicer_test.go
@@ -254,8 +254,17 @@ func randomInput(tb testing.TB, size, sizeLimit uint64) (input, slicer.Options)
 }
 
 func testSlicer(t *testing.T, size, sizeLimit uint64) {
+	testSlicerWithKnownSize(t, size, sizeLimit, true)
+	testSlicerWithKnownSize(t, size, sizeLimit, false)
+}
+
+func testSlicerWithKnownSize(t *testing.T, size, sizeLimit uint64, known bool) {
 	in, opts := randomInput(t, size, sizeLimit)
 
+	if known {
+		opts.SetPayloadSize(uint64(len(in.payload)))
+	}
+
 	checker := &slicedObjectChecker{
 		opts: opts,
 		tb:   t,
@@ -266,7 +275,7 @@ func testSlicer(t *testing.T, size, sizeLimit uint64) {
 	for i := object.TypeRegular; i <= object.TypeLock; i++ {
 		in.objectType = i
 
-		t.Run("slicer with "+i.EncodeToString(), func(t *testing.T) {
+		t.Run(fmt.Sprintf("slicer with %s,known_size=%t", i.EncodeToString(), known), func(t *testing.T) {
 			testSlicerByHeaderType(t, checker, in, opts)
 		})
 	}
@@ -953,3 +962,100 @@ func BenchmarkReadPayloadBuffer(b *testing.B) {
 		})
 	}
 }
+
+func TestKnownPayloadSize(t *testing.T) {
+	ctx := context.Background()
+	t.Run("overflow", func(t *testing.T) {
+		t.Run("read", func(t *testing.T) {
+			in, opts := randomInput(t, 1, 1)
+			obj := objecttest.Object(t)
+			hdr := *obj.CutPayload()
+
+			opts.SetPayloadSize(20)
+			r := bytes.NewReader(make([]byte, 21))
+
+			_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, in.signer, r, opts)
+			require.ErrorContains(t, err, "payload size exceeded")
+		})
+
+		t.Run("write", func(t *testing.T) {
+			in, opts := randomInput(t, 1, 1)
+			obj := objecttest.Object(t)
+			hdr := *obj.CutPayload()
+
+			opts.SetPayloadSize(20)
+
+			w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, in.signer, opts)
+			require.NoError(t, err)
+
+			for i := byte(0); i < 21; i++ {
+				_, err = w.Write([]byte{1})
+				if i < 20 {
+					require.NoError(t, err)
+				} else {
+					require.ErrorContains(t, err, "payload size exceeded")
+				}
+			}
+		})
+	})
+}
+
+func BenchmarkKnownPayloadSize(b *testing.B) {
+	ctx := context.Background()
+	for _, tc := range []struct {
+		sizeLimit uint64
+		size      uint64
+	}{
+		{sizeLimit: 1 << 10, size: 1},
+		{sizeLimit: 1 << 10, size: 1 << 10},
+		{sizeLimit: 1 << 10, size: 10 << 10},
+	} {
+		b.Run(fmt.Sprintf("limit=%d,size=%d", tc.sizeLimit, tc.size), func(b *testing.B) {
+			b.Run("read", func(b *testing.B) {
+				obj := objecttest.Object(b)
+				hdr := *obj.CutPayload()
+				signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b))
+				payload := make([]byte, tc.size)
+				rand.Read(payload)
+
+				var opts slicer.Options
+				opts.SetObjectPayloadLimit(tc.sizeLimit)
+				opts.SetPayloadSize(tc.size)
+
+				b.ReportAllocs()
+				b.ResetTimer()
+
+				for i := 0; i < b.N; i++ {
+					_, err := slicer.Put(ctx, discardObject{opts: opts}, hdr, signer, bytes.NewReader(payload), opts)
+					require.NoError(b, err)
+				}
+			})
+
+			b.Run("write", func(b *testing.B) {
+				obj := objecttest.Object(b)
+				hdr := *obj.CutPayload()
+				signer := user.NewSigner(test.RandomSigner(b), usertest.ID(b))
+				payload := make([]byte, tc.size)
+				rand.Read(payload)
+
+				var opts slicer.Options
+				opts.SetObjectPayloadLimit(tc.sizeLimit)
+				opts.SetPayloadSize(tc.size)
+
+				b.ReportAllocs()
+				b.ResetTimer()
+
+				for i := 0; i < b.N; i++ {
+					w, err := slicer.InitPut(ctx, discardObject{opts: opts}, hdr, signer, opts)
+					require.NoError(b, err)
+
+					_, err = w.Write(payload)
+					if err == nil {
+						err = w.Close()
+					}
+					require.NoError(b, err)
+				}
+			})
+		})
+	}
+}
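For illustration only, a minimal usage sketch of the new option (not part of the diff above): the import paths are assumed to be the standard neofs-sdk-go ones, and the ObjectWriter, header and signer are assumed to be prepared by the caller.

// Sketch of using Options.SetPayloadSize when the payload length is known.
// Hypothetical example, not part of this change; import paths are assumed.
package example

import (
	"bytes"
	"context"

	"github.com/nspcc-dev/neofs-sdk-go/object"
	oid "github.com/nspcc-dev/neofs-sdk-go/object/id"
	"github.com/nspcc-dev/neofs-sdk-go/object/slicer"
	"github.com/nspcc-dev/neofs-sdk-go/user"
)

// uploadKnownSize uploads a payload whose length is known in advance via the
// reader-based API.
func uploadKnownSize(ctx context.Context, ow slicer.ObjectWriter, hdr object.Object, signer user.Signer, payload []byte) (oid.ID, error) {
	var opts slicer.Options
	opts.SetObjectPayloadLimit(1 << 20) // split into child objects of at most 1 MiB
	// Declaring the size lets the slicer cap its per-object buffer at the
	// declared size when that is below the object payload limit.
	opts.SetPayloadSize(uint64(len(payload)))

	return slicer.Put(ctx, ow, hdr, signer, bytes.NewReader(payload), opts)
}

// uploadKnownSizeStream does the same via the streaming API. Writing past the
// declared size fails with "payload size exceeded".
func uploadKnownSizeStream(ctx context.Context, ow slicer.ObjectWriter, hdr object.Object, signer user.Signer, payload []byte) error {
	var opts slicer.Options
	opts.SetObjectPayloadLimit(1 << 20)
	opts.SetPayloadSize(uint64(len(payload)))

	w, err := slicer.InitPut(ctx, ow, hdr, signer, opts)
	if err != nil {
		return err
	}
	if _, err = w.Write(payload); err != nil {
		return err
	}
	return w.Close()
}

Both helpers only restate what the diff itself shows: with the size declared, initPayloadStream lowers maxObjSize to the declared size when it is smaller than the per-object limit, and PayloadWriter.Write rejects data beyond the declared size with errPayloadSizeExceeded.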