Skip to content

Commit

Permalink
fix: stewardship with erasure encoding (#4955)
Browse files Browse the repository at this point in the history
  • Loading branch information
acha-bill authored Feb 4, 2025
1 parent ffd67d0 commit cda61e5
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 22 deletions.
61 changes: 61 additions & 0 deletions pkg/api/stewardship_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,27 @@
package api_test

import (
"bytes"
"context"
"encoding/hex"
"fmt"
"net/http"
"strconv"
"testing"
"time"

"github.com/ethersphere/bee/v2/pkg/api"
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/jsonhttp"
"github.com/ethersphere/bee/v2/pkg/jsonhttp/jsonhttptest"
"github.com/ethersphere/bee/v2/pkg/log"
mockpost "github.com/ethersphere/bee/v2/pkg/postage/mock"
"github.com/ethersphere/bee/v2/pkg/steward"
"github.com/ethersphere/bee/v2/pkg/steward/mock"
"github.com/ethersphere/bee/v2/pkg/storage"
mockstorer "github.com/ethersphere/bee/v2/pkg/storer/mock"
"github.com/ethersphere/bee/v2/pkg/swarm"
"gitlab.com/nolash/go-mockbytes"
)

// nolint:paralleltest
Expand Down Expand Up @@ -60,6 +69,58 @@ func TestStewardship(t *testing.T) {
})
}

type localRetriever struct {
getter storage.Getter
}

func (lr *localRetriever) RetrieveChunk(ctx context.Context, addr, sourceAddr swarm.Address) (chunk swarm.Chunk, err error) {
ch, err := lr.getter.Get(ctx, addr)
if err != nil {
return nil, fmt.Errorf("retrieve chunk %s: %w", addr, err)
}
return ch, nil
}

func TestStewardshipWithRedundancy(t *testing.T) {
t.Parallel()

var (
storerMock = mockstorer.New()
localRetrieval = &localRetriever{getter: storerMock.ChunkStore()}
s = steward.New(storerMock, localRetrieval, storerMock.Cache())
client, _, _, _ = newTestServer(t, testServerOptions{
Storer: storerMock,
Logger: log.Noop,
Steward: s,
Post: mockpost.New(mockpost.WithAcceptAll()),
})
)

g := mockbytes.New(0, mockbytes.MockTypeStandard).WithModulus(255)
content, err := g.SequentialBytes(512000) // 500KB
if err != nil {
t.Fatal(err)
}

for _, l := range []redundancy.Level{redundancy.NONE, redundancy.MEDIUM, redundancy.STRONG, redundancy.INSANE, redundancy.PARANOID} {
t.Run(fmt.Sprintf("rLevel-%d", l), func(t *testing.T) {
res := new(api.BytesPostResponse)
jsonhttptest.Request(t, client, http.MethodPost, "/bytes", http.StatusCreated,
jsonhttptest.WithRequestHeader(api.SwarmDeferredUploadHeader, "true"),
jsonhttptest.WithRequestHeader(api.SwarmPostageBatchIdHeader, batchOkStr),
jsonhttptest.WithRequestHeader(api.SwarmRedundancyLevelHeader, strconv.Itoa(int(l))),
jsonhttptest.WithRequestBody(bytes.NewReader(content)),
jsonhttptest.WithUnmarshalJSONResponse(res),
)

time.Sleep(2 * time.Second)
jsonhttptest.Request(t, client, http.MethodGet, "/stewardship/"+res.Reference.String(), http.StatusOK,
jsonhttptest.WithExpectedJSONResponse(api.IsRetrievableResponse{IsRetrievable: true}),
)
})
}
}

func TestStewardshipInvalidInputs(t *testing.T) {
t.Parallel()

Expand Down
42 changes: 20 additions & 22 deletions pkg/file/joiner/joiner.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/ethersphere/bee/v2/pkg/file/redundancy"
"github.com/ethersphere/bee/v2/pkg/file/redundancy/getter"
"github.com/ethersphere/bee/v2/pkg/replicas"
storage "github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/storage"
"github.com/ethersphere/bee/v2/pkg/swarm"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -376,10 +376,6 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
default:
}

eg, ectx := errgroup.WithContext(ctx)

var wg sync.WaitGroup

eSize, err := file.ChunkPayloadSize(data)
if err != nil {
return err
Expand All @@ -399,29 +395,31 @@ func (j *joiner) processChunkAddresses(ctx context.Context, fn swarm.AddressIter
continue
}

wg.Add(1)
eg.Go(func() error {
defer wg.Done()
if j.refLength == encryption.ReferenceSize && i < shardCnt {
addr = swarm.NewAddress(data[cursor : cursor+swarm.HashSize*2])
}

if j.refLength == encryption.ReferenceSize && i < shardCnt {
addr = swarm.NewAddress(data[cursor : cursor+swarm.HashSize*2])
}
ch, err := g.Get(ectx, addr)
if err != nil {
return err
}
// not a shard
if i >= shardCnt {
continue
}

chunkData := ch.Data()[8:]
subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
_, parities := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)
ch, err := g.Get(ctx, addr)
if err != nil {
return err
}

return j.processChunkAddresses(ectx, fn, chunkData, subtrieSpan, parities)
})
chunkData := ch.Data()[8:]
subtrieLevel, subtrieSpan := j.chunkToSpan(ch.Data())
_, parities := file.ReferenceCount(uint64(subtrieSpan), subtrieLevel, j.refLength != swarm.HashSize)

wg.Wait()
err = j.processChunkAddresses(ctx, fn, chunkData, subtrieSpan, parities)
if err != nil {
return err
}
}

return eg.Wait()
return nil
}

func (j *joiner) Size() int64 {
Expand Down

0 comments on commit cda61e5

Please sign in to comment.