diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index f53ecf24ed..809683563c 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -1,8 +1,6 @@ package share import ( - "context" - "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p-core/host" @@ -13,7 +11,7 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/availability/cache" disc "github.com/celestiaorg/celestia-node/share/availability/discovery" - "github.com/celestiaorg/celestia-node/share/service" + "github.com/celestiaorg/celestia-node/share/getters" ) func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery { @@ -40,15 +38,6 @@ func cacheAvailability[A share.Availability](lc fx.Lifecycle, ds datastore.Batch return ca } -func newModule(lc fx.Lifecycle, bServ blockservice.BlockService, avail share.Availability) Module { - serv := service.NewShareService(bServ, avail) - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - return serv.Start(ctx) - }, - OnStop: func(ctx context.Context) error { - return serv.Stop(ctx) - }, - }) - return &module{serv} +func newModule(bServ blockservice.BlockService, avail share.Availability) Module { + return &module{getters.NewIPLDGetter(bServ), avail} } diff --git a/nodebuilder/share/share.go b/nodebuilder/share/share.go index 9ada1d1a62..868c557304 100644 --- a/nodebuilder/share/share.go +++ b/nodebuilder/share/share.go @@ -6,7 +6,6 @@ import ( "github.com/celestiaorg/nmt/namespace" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/service" ) var _ Module = (*API)(nil) @@ -89,9 +88,10 @@ func (api *API) GetSharesByNamespace( } type module struct { - *service.ShareService + share.Getter + share.Availability } -func (m *module) SharesAvailable(ctx context.Context, root *share.Root) error { - return m.ShareService.SharesAvailable(ctx, root) +func (m module) SharesAvailable(ctx context.Context, root *share.Root) error { + return m.Availability.SharesAvailable(ctx, root) } diff --git a/share/availability/cache/availability_test.go b/share/availability/cache/availability_test.go index e4302307ef..00f61b6ba7 100644 --- a/share/availability/cache/availability_test.go +++ b/share/availability/cache/availability_test.go @@ -18,7 +18,6 @@ import ( "github.com/celestiaorg/celestia-node/share" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" ) // TestCacheAvailability tests to ensure that the successful result of a @@ -31,27 +30,27 @@ func TestCacheAvailability(t *testing.T) { lightLocalServ, dah1 := RandLightLocalServiceWithSquare(t, 16) var tests = []struct { - service *service.ShareService - root *share.Root + avail share.Availability + root *share.Root }{ { - service: fullLocalServ, - root: dah0, + avail: fullLocalServ, + root: dah0, }, { - service: lightLocalServ, - root: dah1, + avail: lightLocalServ, + root: dah1, }, } for i, tt := range tests { t.Run(strconv.Itoa(i), func(t *testing.T) { - ca := tt.service.Availability.(*ShareAvailability) + ca := tt.avail.(*ShareAvailability) // ensure the dah isn't yet in the cache exists, err := ca.ds.Has(ctx, rootKey(tt.root)) require.NoError(t, err) assert.False(t, exists) - err = tt.service.SharesAvailable(ctx, tt.root) + err = tt.avail.SharesAvailable(ctx, tt.root) require.NoError(t, err) // ensure the dah was stored properly exists, err = ca.ds.Has(ctx, rootKey(tt.root)) @@ -72,9 +71,8 @@ func TestCacheAvailability_Failed(t *testing.T) { defer cancel() ca := NewShareAvailability(&dummyAvailability{}, sync.MutexWrap(datastore.NewMapDatastore())) - serv := service.NewShareService(mdutils.Bserv(), ca) - err := serv.SharesAvailable(ctx, &invalidHeader) + err := ca.SharesAvailable(ctx, &invalidHeader) require.Error(t, err) // ensure the dah was NOT cached exists, err := ca.ds.Has(ctx, rootKey(&invalidHeader)) @@ -112,10 +110,10 @@ func TestCacheAvailability_MinRoot(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fullLocalServ, _ := RandFullLocalServiceWithSquare(t, 16) + fullLocalAvail, _ := RandFullLocalServiceWithSquare(t, 16) minDAH := da.MinDataAvailabilityHeader() - err := fullLocalServ.SharesAvailable(ctx, &minDAH) + err := fullLocalAvail.SharesAvailable(ctx, &minDAH) assert.NoError(t, err) } diff --git a/share/availability/cache/testing.go b/share/availability/cache/testing.go index 8ef7a1e743..14b715f136 100644 --- a/share/availability/cache/testing.go +++ b/share/availability/cache/testing.go @@ -11,29 +11,28 @@ import ( "github.com/celestiaorg/celestia-node/share/availability/full" "github.com/celestiaorg/celestia-node/share/availability/light" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" ) // RandLightLocalServiceWithSquare is the same as light.RandServiceWithSquare, except // the share.Availability is wrapped with cache availability. -func RandLightLocalServiceWithSquare(t *testing.T, n int) (*service.ShareService, *share.Root) { +func RandLightLocalServiceWithSquare(t *testing.T, n int) (share.Availability, *share.Root) { bServ := mdutils.Bserv() store := dssync.MutexWrap(ds.NewMapDatastore()) avail := NewShareAvailability( light.TestAvailability(bServ), store, ) - return service.NewShareService(bServ, avail), availability_test.RandFillBS(t, n, bServ) + return avail, availability_test.RandFillBS(t, n, bServ) } // RandFullLocalServiceWithSquare is the same as full.RandServiceWithSquare, except // the share.Availability is wrapped with cache availability. -func RandFullLocalServiceWithSquare(t *testing.T, n int) (*service.ShareService, *share.Root) { +func RandFullLocalServiceWithSquare(t *testing.T, n int) (share.Availability, *share.Root) { bServ := mdutils.Bserv() store := dssync.MutexWrap(ds.NewMapDatastore()) avail := NewShareAvailability( full.TestAvailability(bServ), store, ) - return service.NewShareService(bServ, avail), availability_test.RandFillBS(t, n, bServ) + return avail, availability_test.RandFillBS(t, n, bServ) } diff --git a/share/availability/full/availability_test.go b/share/availability/full/availability_test.go index ef3e653d9e..12f341ce04 100644 --- a/share/availability/full/availability_test.go +++ b/share/availability/full/availability_test.go @@ -34,7 +34,7 @@ func TestSharesAvailable_Full(t *testing.T) { defer cancel() // RandServiceWithSquare creates a NewShareAvailability inside, so we can test it - service, dah := RandServiceWithSquare(t, 16) - err := service.SharesAvailable(ctx, dah) + _, availability, dah := RandServiceWithSquare(t, 16) + err := availability.SharesAvailable(ctx, dah) assert.NoError(t, err) } diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go index f072671df0..541ea5e616 100644 --- a/share/availability/full/testing.go +++ b/share/availability/full/testing.go @@ -12,14 +12,14 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/availability/discovery" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" + "github.com/celestiaorg/celestia-node/share/getters" ) -// RandServiceWithSquare provides a service.ShareService filled with 'n' NMT +// RandServiceWithSquare provides a share.Getter filled with 'n' NMT // trees of 'n' random shares, essentially storing a whole square. -func RandServiceWithSquare(t *testing.T, n int) (*service.ShareService, *share.Root) { +func RandServiceWithSquare(t *testing.T, n int) (share.Getter, share.Availability, *share.Root) { bServ := mdutils.Bserv() - return service.NewShareService(bServ, TestAvailability(bServ)), availability_test.RandFillBS(t, n, bServ) + return getters.NewIPLDGetter(bServ), TestAvailability(bServ), availability_test.RandFillBS(t, n, bServ) } // RandNode creates a Full Node filled with a random block of the given size. @@ -31,7 +31,8 @@ func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_t // Node creates a new empty Full Node. func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { nd := dn.NewTestNode() - nd.ShareService = service.NewShareService(nd.BlockService, TestAvailability(nd.BlockService)) + nd.Getter = getters.NewIPLDGetter(nd.BlockService) + nd.Availability = TestAvailability(nd.BlockService) return nd } diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 6cf998bf0b..8a6478a0f5 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -34,8 +34,8 @@ func TestSharesAvailable(t *testing.T) { defer cancel() // RandServiceWithSquare creates a Light ShareAvailability inside, so we can test it - service, dah := RandServiceWithSquare(t, 16) - err := service.SharesAvailable(ctx, dah) + _, avail, dah := RandServiceWithSquare(t, 16) + err := avail.SharesAvailable(ctx, dah) assert.NoError(t, err) } @@ -44,9 +44,9 @@ func TestSharesAvailableFailed(t *testing.T) { defer cancel() // RandServiceWithSquare creates a Light ShareAvailability inside, so we can test it - s, _ := RandServiceWithSquare(t, 16) + _, avail, _ := RandServiceWithSquare(t, 16) empty := header.EmptyDAH() - err := s.SharesAvailable(ctx, &empty) + err := avail.SharesAvailable(ctx, &empty) assert.Error(t, err) } @@ -68,20 +68,15 @@ func TestGetShare(t *testing.T) { defer cancel() n := 16 - serv, dah := RandServiceWithSquare(t, n) - err := serv.Start(ctx) - require.NoError(t, err) + getter, _, dah := RandServiceWithSquare(t, n) for i := range make([]bool, n) { for j := range make([]bool, n) { - sh, err := serv.GetShare(ctx, dah, i, j) + sh, err := getter.GetShare(ctx, dah, i, j) assert.NotNil(t, sh) assert.NoError(t, err) } } - - err = serv.Stop(ctx) - require.NoError(t, err) } func TestService_GetSharesByNamespace(t *testing.T) { @@ -96,7 +91,7 @@ func TestService_GetSharesByNamespace(t *testing.T) { for _, tt := range tests { t.Run("size: "+strconv.Itoa(tt.squareSize), func(t *testing.T) { - serv, bServ := RandService() + getter, _, bServ := RandService() n := tt.squareSize * tt.squareSize randShares := share.RandShares(t, n) idx1 := (n - 1) / 2 @@ -108,7 +103,7 @@ func TestService_GetSharesByNamespace(t *testing.T) { root := availability_test.FillBS(t, bServ, randShares) randNID := randShares[idx1][:8] - shares, err := serv.GetSharesByNamespace(context.Background(), root, randNID) + shares, err := getter.GetSharesByNamespace(context.Background(), root, randNID) require.NoError(t, err) assert.Len(t, shares, tt.expectedShareCount) for _, value := range shares { @@ -128,11 +123,9 @@ func TestGetShares(t *testing.T) { defer cancel() n := 16 - serv, dah := RandServiceWithSquare(t, n) - err := serv.Start(ctx) - require.NoError(t, err) + getter, _, dah := RandServiceWithSquare(t, n) - shares, err := serv.GetShares(ctx, dah) + shares, err := getter.GetShares(ctx, dah) require.NoError(t, err) flattened := make([][]byte, 0, len(shares)*2) @@ -147,16 +140,13 @@ func TestGetShares(t *testing.T) { gotDAH := da.NewDataAvailabilityHeader(eds) require.True(t, dah.Equals(&gotDAH)) - - err = serv.Stop(ctx) - require.NoError(t, err) } func TestService_GetSharesByNamespaceNotFound(t *testing.T) { - serv, root := RandServiceWithSquare(t, 1) + getter, _, root := RandServiceWithSquare(t, 1) root.RowsRoots = nil - shares, err := serv.GetSharesByNamespace(context.Background(), root, []byte{1, 1, 1, 1, 1, 1, 1, 1}) + shares, err := getter.GetSharesByNamespace(context.Background(), root, []byte{1, 1, 1, 1, 1, 1, 1, 1}) assert.Len(t, shares, 0) assert.NoError(t, err) } @@ -173,12 +163,12 @@ func BenchmarkService_GetSharesByNamespace(b *testing.B) { for _, tt := range tests { b.Run(strconv.Itoa(tt.amountShares), func(b *testing.B) { t := &testing.T{} - serv, root := RandServiceWithSquare(t, tt.amountShares) + getter, _, root := RandServiceWithSquare(t, tt.amountShares) randNID := root.RowsRoots[(len(root.RowsRoots)-1)/2][:8] root.RowsRoots[(len(root.RowsRoots) / 2)] = root.RowsRoots[(len(root.RowsRoots)-1)/2] b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := serv.GetSharesByNamespace(context.Background(), root, randNID) + _, err := getter.GetSharesByNamespace(context.Background(), root, randNID) require.NoError(t, err) } }) @@ -186,7 +176,7 @@ func BenchmarkService_GetSharesByNamespace(b *testing.B) { } func TestSharesRoundTrip(t *testing.T) { - serv, store := RandService() + getter, _, store := RandService() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -325,7 +315,7 @@ func TestSharesRoundTrip(t *testing.T) { require.NoError(t, err) dah := da.NewDataAvailabilityHeader(extSquare) - shares, err := serv.GetSharesByNamespace(ctx, &dah, namespace) + shares, err := getter.GetSharesByNamespace(ctx, &dah, namespace) require.NoError(t, err) require.NotEmpty(t, shares) diff --git a/share/availability/light/testing.go b/share/availability/light/testing.go index 9a63198b3a..c3ef0f6a0a 100644 --- a/share/availability/light/testing.go +++ b/share/availability/light/testing.go @@ -12,22 +12,22 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/availability/discovery" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" + "github.com/celestiaorg/celestia-node/share/getters" ) -// RandServiceWithSquare provides a share.Service filled with 'n' NMT +// RandServiceWithSquare provides a share.Getter/share.Availability filled with 'n' NMT // trees of 'n' random shares, essentially storing a whole square. -func RandServiceWithSquare(t *testing.T, n int) (*service.ShareService, *share.Root) { +func RandServiceWithSquare(t *testing.T, n int) (share.Getter, share.Availability, *share.Root) { bServ := mdutils.Bserv() - return service.NewShareService(bServ, TestAvailability(bServ)), availability_test.RandFillBS(t, n, bServ) + return getters.NewIPLDGetter(bServ), TestAvailability(bServ), availability_test.RandFillBS(t, n, bServ) } -// RandService provides an unfilled share.Service with corresponding +// RandService provides an unfilled share.Getter/share.Availability with corresponding // blockservice.BlockService than can be filled by the test. -func RandService() (*service.ShareService, blockservice.BlockService) { +func RandService() (share.Getter, share.Availability, blockservice.BlockService) { bServ := mdutils.Bserv() - return service.NewShareService(bServ, TestAvailability(bServ)), bServ + return getters.NewIPLDGetter(bServ), TestAvailability(bServ), bServ } // RandNode creates a Light Node filled with a random block of the given size. @@ -39,7 +39,8 @@ func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_t // Node creates a new empty Light Node. func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { nd := dn.NewTestNode() - nd.ShareService = service.NewShareService(nd.BlockService, TestAvailability(nd.BlockService)) + nd.Getter = getters.NewIPLDGetter(nd.BlockService) + nd.Availability = TestAvailability(nd.BlockService) return nd } diff --git a/share/availability/test/testing.go b/share/availability/test/testing.go index e9399ddad6..205fe78aee 100644 --- a/share/availability/test/testing.go +++ b/share/availability/test/testing.go @@ -19,7 +19,6 @@ import ( "github.com/celestiaorg/celestia-app/pkg/da" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/service" ) // RandFillBS fills the given BlockService with a random block of a given size. @@ -38,7 +37,8 @@ func FillBS(t *testing.T, bServ blockservice.BlockService, shares []share.Share) type TestNode struct { net *TestDagNet - *service.ShareService + share.Getter + share.Availability blockservice.BlockService host.Host } diff --git a/share/getter.go b/share/getter.go new file mode 100644 index 0000000000..d1cc7d3956 --- /dev/null +++ b/share/getter.go @@ -0,0 +1,19 @@ +package share + +import ( + "context" + + "github.com/celestiaorg/nmt/namespace" +) + +// Getter interface provides a set of accessors for shares by the Root. +type Getter interface { + // GetShare gets a Share by coordinates in EDS. + GetShare(ctx context.Context, root *Root, row, col int) (Share, error) + + // GetShares gets all the shares. + GetShares(context.Context, *Root) ([][]Share, error) + + // GetSharesByNamespace gets all the shares of the given namespace. + GetSharesByNamespace(context.Context, *Root, namespace.ID) ([]Share, error) +} diff --git a/share/service/service.go b/share/getters/ipld.go similarity index 52% rename from share/service/service.go rename to share/getters/ipld.go index 190e68c4ea..1e8feec770 100644 --- a/share/service/service.go +++ b/share/getters/ipld.go @@ -1,4 +1,4 @@ -package service +package getters import ( "context" @@ -12,58 +12,27 @@ import ( "github.com/celestiaorg/celestia-node/share/eds" "github.com/celestiaorg/celestia-node/share/ipld" "github.com/celestiaorg/nmt" + "github.com/celestiaorg/nmt/namespace" ) -// TODO(@Wondertan): Simple thread safety for Start and Stop would not hurt. -type ShareService struct { - share.Availability +var _ share.Getter = (*IPLDGetter)(nil) + +type IPLDGetter struct { rtrv *eds.Retriever bServ blockservice.BlockService - // session is blockservice sub-session that applies optimization for fetching/loading related - // nodes, like shares prefer session over blockservice for fetching nodes. - session blockservice.BlockGetter - cancel context.CancelFunc -} - -// NewService creates a new basic share.Module. -func NewShareService(bServ blockservice.BlockService, avail share.Availability) *ShareService { - return &ShareService{ - rtrv: eds.NewRetriever(bServ), - Availability: avail, - bServ: bServ, - } } -func (s *ShareService) Start(context.Context) error { - if s.session != nil || s.cancel != nil { - return fmt.Errorf("share: service already started") +func NewIPLDGetter(bServ blockservice.BlockService) *IPLDGetter { + return &IPLDGetter{ + rtrv: eds.NewRetriever(bServ), + bServ: bServ, } - - // NOTE: The ctx given as param is used to control Start flow and only needed when Start is - // blocking, but this one is not. - // - // The newer context here is created to control lifecycle of the session and peer discovery. - ctx, cancel := context.WithCancel(context.Background()) - s.cancel = cancel - s.session = blockservice.NewSession(ctx, s.bServ) - return nil -} - -func (s *ShareService) Stop(context.Context) error { - if s.session == nil || s.cancel == nil { - return fmt.Errorf("share: service already stopped") - } - - s.cancel() - s.cancel = nil - s.session = nil - return nil } -func (s *ShareService) GetShare(ctx context.Context, dah *share.Root, row, col int) (share.Share, error) { +func (ig *IPLDGetter) GetShare(ctx context.Context, dah *share.Root, row, col int) (share.Share, error) { root, leaf := ipld.Translate(dah, row, col) - nd, err := share.GetShare(ctx, s.bServ, root, leaf, len(dah.RowsRoots)) + nd, err := share.GetShare(ctx, ig.bServ, root, leaf, len(dah.RowsRoots)) if err != nil { return nil, err } @@ -71,8 +40,8 @@ func (s *ShareService) GetShare(ctx context.Context, dah *share.Root, row, col i return nd, nil } -func (s *ShareService) GetShares(ctx context.Context, root *share.Root) ([][]share.Share, error) { - eds, err := s.rtrv.Retrieve(ctx, root) +func (ig *IPLDGetter) GetShares(ctx context.Context, root *share.Root) ([][]share.Share, error) { + eds, err := ig.rtrv.Retrieve(ctx, root) if err != nil { return nil, err } @@ -91,9 +60,7 @@ func (s *ShareService) GetShares(ctx context.Context, root *share.Root) ([][]sha return shares, nil } -// GetSharesByNamespace iterates over a square's row roots and accumulates the found shares in the -// given namespace.ID. -func (s *ShareService) GetSharesByNamespace( +func (ig *IPLDGetter) GetSharesByNamespace( ctx context.Context, root *share.Root, nID namespace.ID, @@ -118,7 +85,7 @@ func (s *ShareService) GetSharesByNamespace( // shadow loop variables, to ensure correct values are captured i, rootCID := i, rootCID errGroup.Go(func() (err error) { - shares[i], err = share.GetSharesByNamespace(ctx, s.bServ, rootCID, nID, len(root.RowsRoots), nil) + shares[i], err = share.GetSharesByNamespace(ctx, ig.bServ, rootCID, nID, len(root.RowsRoots), nil) return }) } diff --git a/share/ipld/corrupted_data_test.go b/share/ipld/corrupted_data_test.go index 8b14f6d2aa..857a44bfc9 100644 --- a/share/ipld/corrupted_data_test.go +++ b/share/ipld/corrupted_data_test.go @@ -10,7 +10,6 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/availability/full" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" ) // sharesAvailableTimeout is an arbitrarily picked interval of time in which a TestNode is expected @@ -26,7 +25,7 @@ func TestNamespaceHasher_CorruptedData(t *testing.T) { requestor := full.Node(net) provider, mockBS := availability_test.MockNode(t, net) - provider.ShareService = service.NewShareService(provider.BlockService, full.TestAvailability(provider.BlockService)) + provider.Availability = full.TestAvailability(provider.BlockService) net.ConnectAll() // before the provider starts attacking, we should be able to retrieve successfully. We pass a size