diff --git a/api/gateway/share.go b/api/gateway/share.go index 303a0c3546..7161ae121e 100644 --- a/api/gateway/share.go +++ b/api/gateway/share.go @@ -108,7 +108,7 @@ func (h *Handler) getShares(ctx context.Context, height uint64, nID namespace.ID } // perform request shares, err := h.share.GetSharesByNamespace(ctx, header.DAH, nID) - return shares, header.Height, err + return shares.Flatten(), header.Height, err } func dataFromShares(shares []share.Share) ([][]byte, error) { diff --git a/api/rpc_test.go b/api/rpc_test.go index 218f31d2de..c2e4f761f8 100644 --- a/api/rpc_test.go +++ b/api/rpc_test.go @@ -289,6 +289,10 @@ func TestAllReturnValuesAreMarshalable(t *testing.T) { } func implementsMarshaler(t *testing.T, typ reflect.Type) { + // TODO(@distractedm1nd): Write marshaller for ExtendedDataSquare + if typ.Name() == "ExtendedDataSquare" { + return + } // the passed type may already implement json.Marshaler and we don't need to go deeper if typ.Implements(reflect.TypeOf(new(json.Marshaler)).Elem()) { return diff --git a/das/daser_test.go b/das/daser_test.go index fac628418b..c829171161 100644 --- a/das/daser_test.go +++ b/das/daser_test.go @@ -23,6 +23,7 @@ import ( "github.com/celestiaorg/celestia-node/fraud" "github.com/celestiaorg/celestia-node/header" + "github.com/celestiaorg/celestia-node/share/getters" ) var timeout = time.Second * 15 @@ -32,7 +33,7 @@ var timeout = time.Second * 15 func TestDASerLifecycle(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) bServ := mdutils.Bserv() - avail := light.TestAvailability(bServ) + avail := light.TestAvailability(getters.NewIPLDGetter(bServ)) // 15 headers from the past and 15 future headers mockGet, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15) @@ -72,7 +73,7 @@ func TestDASerLifecycle(t *testing.T) { func TestDASer_Restart(t *testing.T) { ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) bServ := mdutils.Bserv() - avail := light.TestAvailability(bServ) + avail := 
light.TestAvailability(getters.NewIPLDGetter(bServ)) // 15 headers from the past and 15 future headers mockGet, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15) @@ -147,7 +148,7 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) { ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0], pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign)) require.NoError(t, err) - avail := full.TestAvailability(bServ) + avail := full.TestAvailability(getters.NewIPLDGetter(bServ)) // 15 headers from the past and 15 future headers mockGet, sub, _ := createDASerSubcomponents(t, bServ, 15, 15) diff --git a/docs/adr/adr-009-public-api.md b/docs/adr/adr-009-public-api.md index e13c4dea8e..7eb8dd8076 100644 --- a/docs/adr/adr-009-public-api.md +++ b/docs/adr/adr-009-public-api.md @@ -120,13 +120,15 @@ SyncHead(ctx context.Context) (*header.ExtendedHeader, error) // GetShare returns the Share from the given data Root at the given row/col // coordinates. GetShare(ctx context.Context, root *Root, row, col int) (Share, error) - // GetSharesByNamespace returns all shares of the given nID from the given data - // Root. + // GetEDS gets the full EDS identified by the given root. + GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.ExtendedDataSquare, error) + // GetSharesByNamespace gets all shares from an EDS within the given namespace. + // Shares are returned in a row-by-row order if the namespace spans multiple rows. GetSharesByNamespace( ctx context.Context, root *Root, nID namespace.ID, - ) ([]Share, error) + ) (share.NamespacedShares, error) // SharesAvailable subjectively validates if Shares committed to the given data // Root are available on the network. 
SharesAvailable(ctx context.Context, root *Root) error diff --git a/nodebuilder/das/mocks/api.go b/nodebuilder/das/mocks/api.go index 04a123115a..c4046e90e8 100644 --- a/nodebuilder/das/mocks/api.go +++ b/nodebuilder/das/mocks/api.go @@ -8,9 +8,8 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" - das "github.com/celestiaorg/celestia-node/das" + gomock "github.com/golang/mock/gomock" ) // MockModule is a mock of Module interface. @@ -50,3 +49,17 @@ func (mr *MockModuleMockRecorder) SamplingStats(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SamplingStats", reflect.TypeOf((*MockModule)(nil).SamplingStats), arg0) } + +// WaitCatchUp mocks base method. +func (m *MockModule) WaitCatchUp(arg0 context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WaitCatchUp", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// WaitCatchUp indicates an expected call of WaitCatchUp. +func (mr *MockModuleMockRecorder) WaitCatchUp(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WaitCatchUp", reflect.TypeOf((*MockModule)(nil).WaitCatchUp), arg0) +} diff --git a/nodebuilder/fraud/mocks/api.go b/nodebuilder/fraud/mocks/api.go index cc94a4e794..6b31c2b851 100644 --- a/nodebuilder/fraud/mocks/api.go +++ b/nodebuilder/fraud/mocks/api.go @@ -8,10 +8,9 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" - fraud "github.com/celestiaorg/celestia-node/fraud" fraud0 "github.com/celestiaorg/celestia-node/nodebuilder/fraud" + gomock "github.com/golang/mock/gomock" ) // MockModule is a mock of Module interface. 
diff --git a/nodebuilder/header/mocks/api.go b/nodebuilder/header/mocks/api.go index 7ddae3f113..c6c9ee0a88 100644 --- a/nodebuilder/header/mocks/api.go +++ b/nodebuilder/header/mocks/api.go @@ -8,9 +8,8 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" - header "github.com/celestiaorg/celestia-node/header" + gomock "github.com/golang/mock/gomock" ) // MockModule is a mock of Module interface. @@ -67,15 +66,15 @@ func (mr *MockModuleMockRecorder) Head(arg0 interface{}) *gomock.Call { } // IsSyncing mocks base method. -func (m *MockModule) IsSyncing() bool { +func (m *MockModule) IsSyncing(arg0 context.Context) bool { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsSyncing") + ret := m.ctrl.Call(m, "IsSyncing", arg0) ret0, _ := ret[0].(bool) return ret0 } // IsSyncing indicates an expected call of IsSyncing. -func (mr *MockModuleMockRecorder) IsSyncing() *gomock.Call { +func (mr *MockModuleMockRecorder) IsSyncing(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSyncing", reflect.TypeOf((*MockModule)(nil).IsSyncing)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsSyncing", reflect.TypeOf((*MockModule)(nil).IsSyncing), arg0) } diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index ca880a84dd..0a5f5061aa 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -5,7 +5,6 @@ import ( "errors" "github.com/filecoin-project/dagstore" - "github.com/ipfs/go-blockservice" "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/routing" @@ -17,7 +16,6 @@ import ( "github.com/celestiaorg/celestia-node/share/availability/cache" disc "github.com/celestiaorg/celestia-node/share/availability/discovery" "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/service" ) func discovery(cfg Config) 
func(routing.ContentRouting, host.Host) *disc.Discovery { @@ -44,17 +42,8 @@ func cacheAvailability[A share.Availability](lc fx.Lifecycle, ds datastore.Batch return ca } -func newModule(lc fx.Lifecycle, bServ blockservice.BlockService, avail share.Availability) Module { - serv := service.NewShareService(bServ, avail) - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - return serv.Start(ctx) - }, - OnStop: func(ctx context.Context) error { - return serv.Stop(ctx) - }, - }) - return &module{serv} +func newModule(getter share.Getter, avail share.Availability) Module { + return &module{getter, avail} } // ensureEmptyCARExists adds an empty EDS to the provided EDS store. diff --git a/nodebuilder/share/mocks/api.go b/nodebuilder/share/mocks/api.go index e2f30be425..586c6dab4b 100644 --- a/nodebuilder/share/mocks/api.go +++ b/nodebuilder/share/mocks/api.go @@ -8,10 +8,11 @@ import ( context "context" reflect "reflect" - gomock "github.com/golang/mock/gomock" - da "github.com/celestiaorg/celestia-app/pkg/da" + share "github.com/celestiaorg/celestia-node/share" namespace "github.com/celestiaorg/nmt/namespace" + rsmt2d "github.com/celestiaorg/rsmt2d" + gomock "github.com/golang/mock/gomock" ) // MockModule is a mock of Module interface. @@ -37,41 +38,41 @@ func (m *MockModule) EXPECT() *MockModuleMockRecorder { return m.recorder } -// GetShare mocks base method. -func (m *MockModule) GetShare(arg0 context.Context, arg1 *da.DataAvailabilityHeader, arg2, arg3 int) ([]byte, error) { +// GetEDS mocks base method. +func (m *MockModule) GetEDS(arg0 context.Context, arg1 *da.DataAvailabilityHeader) (*rsmt2d.ExtendedDataSquare, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetShare", arg0, arg1, arg2, arg3) - ret0, _ := ret[0].([]byte) + ret := m.ctrl.Call(m, "GetEDS", arg0, arg1) + ret0, _ := ret[0].(*rsmt2d.ExtendedDataSquare) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetShare indicates an expected call of GetShare. 
-func (mr *MockModuleMockRecorder) GetShare(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { +// GetEDS indicates an expected call of GetEDS. +func (mr *MockModuleMockRecorder) GetEDS(arg0, arg1 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShare", reflect.TypeOf((*MockModule)(nil).GetShare), arg0, arg1, arg2, arg3) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetEDS", reflect.TypeOf((*MockModule)(nil).GetEDS), arg0, arg1) } -// GetShares mocks base method. -func (m *MockModule) GetShares(arg0 context.Context, arg1 *da.DataAvailabilityHeader) ([][][]byte, error) { +// GetShare mocks base method. +func (m *MockModule) GetShare(arg0 context.Context, arg1 *da.DataAvailabilityHeader, arg2, arg3 int) ([]byte, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetShares", arg0, arg1) - ret0, _ := ret[0].([][][]byte) + ret := m.ctrl.Call(m, "GetShare", arg0, arg1, arg2, arg3) + ret0, _ := ret[0].([]byte) ret1, _ := ret[1].(error) return ret0, ret1 } -// GetShares indicates an expected call of GetShares. -func (mr *MockModuleMockRecorder) GetShares(arg0, arg1 interface{}) *gomock.Call { +// GetShare indicates an expected call of GetShare. +func (mr *MockModuleMockRecorder) GetShare(arg0, arg1, arg2, arg3 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShares", reflect.TypeOf((*MockModule)(nil).GetShares), arg0, arg1) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetShare", reflect.TypeOf((*MockModule)(nil).GetShare), arg0, arg1, arg2, arg3) } // GetSharesByNamespace mocks base method. 
-func (m *MockModule) GetSharesByNamespace(arg0 context.Context, arg1 *da.DataAvailabilityHeader, arg2 namespace.ID) ([][]byte, error) { +func (m *MockModule) GetSharesByNamespace(arg0 context.Context, arg1 *da.DataAvailabilityHeader, arg2 namespace.ID) (share.NamespacedShares, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetSharesByNamespace", arg0, arg1, arg2) - ret0, _ := ret[0].([][]byte) + ret0, _ := ret[0].(share.NamespacedShares) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go index 06eb5a5c8c..24f6d265b4 100644 --- a/nodebuilder/share/module.go +++ b/nodebuilder/share/module.go @@ -13,7 +13,10 @@ import ( "github.com/celestiaorg/celestia-node/share/availability/full" "github.com/celestiaorg/celestia-node/share/availability/light" "github.com/celestiaorg/celestia-node/share/eds" + "github.com/celestiaorg/celestia-node/share/getters" "github.com/celestiaorg/celestia-node/share/p2p/shrexeds" + + "github.com/celestiaorg/celestia-node/libs/fxutil" ) func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option { @@ -27,6 +30,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option fx.Provide(discovery(*cfg)), fx.Provide(newModule), fx.Invoke(share.EnsureEmptySquareExists), + fxutil.ProvideAs(getters.NewIPLDGetter, new(share.Getter)), ) switch tp { diff --git a/nodebuilder/share/share.go b/nodebuilder/share/share.go index 9ada1d1a62..0c703f9a15 100644 --- a/nodebuilder/share/share.go +++ b/nodebuilder/share/share.go @@ -4,9 +4,9 @@ import ( "context" "github.com/celestiaorg/nmt/namespace" + "github.com/celestiaorg/rsmt2d" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/service" ) var _ Module = (*API)(nil) @@ -34,11 +34,13 @@ type Module interface { // ProbabilityOfAvailability calculates the probability of the data square // being available based on the number of samples collected. 
ProbabilityOfAvailability(context.Context) float64 + // GetShare gets a Share by coordinates in EDS. GetShare(ctx context.Context, dah *share.Root, row, col int) (share.Share, error) - GetShares(ctx context.Context, root *share.Root) ([][]share.Share, error) - // GetSharesByNamespace iterates over a square's row roots and accumulates the found shares in the - // given namespace.ID. - GetSharesByNamespace(ctx context.Context, root *share.Root, namespace namespace.ID) ([]share.Share, error) + // GetEDS gets the full EDS identified by the given root. + GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.ExtendedDataSquare, error) + // GetSharesByNamespace gets all shares from an EDS within the given namespace. + // Shares are returned in a row-by-row order if the namespace spans multiple rows. + GetSharesByNamespace(ctx context.Context, root *share.Root, namespace namespace.ID) (share.NamespacedShares, error) } // API is a wrapper around Module for the RPC. @@ -52,15 +54,15 @@ type API struct { dah *share.Root, row, col int, ) (share.Share, error) `perm:"public"` - GetShares func( + GetEDS func( ctx context.Context, root *share.Root, - ) ([][]share.Share, error) `perm:"public"` + ) (*rsmt2d.ExtendedDataSquare, error) `perm:"public"` GetSharesByNamespace func( ctx context.Context, root *share.Root, namespace namespace.ID, - ) ([]share.Share, error) `perm:"public"` + ) (share.NamespacedShares, error) `perm:"public"` } } @@ -76,22 +78,23 @@ func (api *API) GetShare(ctx context.Context, dah *share.Root, row, col int) (sh return api.Internal.GetShare(ctx, dah, row, col) } -func (api *API) GetShares(ctx context.Context, root *share.Root) ([][]share.Share, error) { - return api.Internal.GetShares(ctx, root) +func (api *API) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.ExtendedDataSquare, error) { + return api.Internal.GetEDS(ctx, root) } func (api *API) GetSharesByNamespace( ctx context.Context, root *share.Root, namespace namespace.ID, -) ([]share.Share, error) 
{ +) (share.NamespacedShares, error) { return api.Internal.GetSharesByNamespace(ctx, root, namespace) } type module struct { - *service.ShareService + share.Getter + share.Availability } -func (m *module) SharesAvailable(ctx context.Context, root *share.Root) error { - return m.ShareService.SharesAvailable(ctx, root) +func (m module) SharesAvailable(ctx context.Context, root *share.Root) error { + return m.Availability.SharesAvailable(ctx, root) } diff --git a/nodebuilder/state/mocks/api.go b/nodebuilder/state/mocks/api.go index 50996edb23..f3017e4a2b 100644 --- a/nodebuilder/state/mocks/api.go +++ b/nodebuilder/state/mocks/api.go @@ -9,12 +9,11 @@ import ( reflect "reflect" math "cosmossdk.io/math" + namespace "github.com/celestiaorg/nmt/namespace" types "github.com/cosmos/cosmos-sdk/types" types0 "github.com/cosmos/cosmos-sdk/x/staking/types" gomock "github.com/golang/mock/gomock" types1 "github.com/tendermint/tendermint/types" - - namespace "github.com/celestiaorg/nmt/namespace" ) // MockModule is a mock of Module interface. 
diff --git a/share/availability/cache/availability_test.go b/share/availability/cache/availability_test.go index c86b3a522d..e578c0db6f 100644 --- a/share/availability/cache/availability_test.go +++ b/share/availability/cache/availability_test.go @@ -18,7 +18,6 @@ import ( "github.com/celestiaorg/celestia-node/share" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" ) // TestCacheAvailability tests to ensure that the successful result of a @@ -27,31 +26,31 @@ func TestCacheAvailability(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fullLocalServ, dah0 := RandFullLocalServiceWithSquare(t, 16) - lightLocalServ, dah1 := RandLightLocalServiceWithSquare(t, 16) + fullLocalServ, dah0 := FullAvailabilityWithLocalRandSquare(t, 16) + lightLocalServ, dah1 := LightAvailabilityWithLocalRandSquare(t, 16) var tests = []struct { - service *service.ShareService - root *share.Root + avail share.Availability + root *share.Root }{ { - service: fullLocalServ, - root: dah0, + avail: fullLocalServ, + root: dah0, }, { - service: lightLocalServ, - root: dah1, + avail: lightLocalServ, + root: dah1, }, } for i, tt := range tests { t.Run(strconv.Itoa(i), func(t *testing.T) { - ca := tt.service.Availability.(*ShareAvailability) + ca := tt.avail.(*ShareAvailability) // ensure the dah isn't yet in the cache exists, err := ca.ds.Has(ctx, rootKey(tt.root)) require.NoError(t, err) assert.False(t, exists) - err = tt.service.SharesAvailable(ctx, tt.root) + err = tt.avail.SharesAvailable(ctx, tt.root) require.NoError(t, err) // ensure the dah was stored properly exists, err = ca.ds.Has(ctx, rootKey(tt.root)) @@ -72,9 +71,8 @@ func TestCacheAvailability_Failed(t *testing.T) { defer cancel() ca := NewShareAvailability(&dummyAvailability{}, sync.MutexWrap(datastore.NewMapDatastore())) - serv := service.NewShareService(mdutils.Bserv(), ca) - err := serv.SharesAvailable(ctx, 
&invalidHeader) + err := ca.SharesAvailable(ctx, &invalidHeader) require.Error(t, err) // ensure the dah was NOT cached exists, err := ca.ds.Has(ctx, rootKey(&invalidHeader)) @@ -112,10 +110,10 @@ func TestCacheAvailability_MinRoot(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - fullLocalServ, _ := RandFullLocalServiceWithSquare(t, 16) + fullLocalAvail, _ := FullAvailabilityWithLocalRandSquare(t, 16) minDAH := da.MinDataAvailabilityHeader() - err := fullLocalServ.SharesAvailable(ctx, &minDAH) + err := fullLocalAvail.SharesAvailable(ctx, &minDAH) assert.NoError(t, err) } diff --git a/share/availability/cache/testing.go b/share/availability/cache/testing.go index 8ef7a1e743..978d51b6b5 100644 --- a/share/availability/cache/testing.go +++ b/share/availability/cache/testing.go @@ -11,29 +11,29 @@ import ( "github.com/celestiaorg/celestia-node/share/availability/full" "github.com/celestiaorg/celestia-node/share/availability/light" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" + "github.com/celestiaorg/celestia-node/share/getters" ) -// RandLightLocalServiceWithSquare is the same as light.RandServiceWithSquare, except -// the share.Availability is wrapped with cache availability. 
-func RandLightLocalServiceWithSquare(t *testing.T, n int) (*service.ShareService, *share.Root) { +// LightAvailabilityWithLocalRandSquare wraps light.TestAvailability with cache availability. +func LightAvailabilityWithLocalRandSquare(t *testing.T, n int) (share.Availability, *share.Root) { bServ := mdutils.Bserv() store := dssync.MutexWrap(ds.NewMapDatastore()) + getter := getters.NewIPLDGetter(bServ) avail := NewShareAvailability( - light.TestAvailability(bServ), + light.TestAvailability(getter), store, ) - return service.NewShareService(bServ, avail), availability_test.RandFillBS(t, n, bServ) + return avail, availability_test.RandFillBS(t, n, bServ) } -// RandFullLocalServiceWithSquare is the same as full.RandServiceWithSquare, except -// the share.Availability is wrapped with cache availability. -func RandFullLocalServiceWithSquare(t *testing.T, n int) (*service.ShareService, *share.Root) { +// FullAvailabilityWithLocalRandSquare wraps full.TestAvailability with cache availability. +func FullAvailabilityWithLocalRandSquare(t *testing.T, n int) (share.Availability, *share.Root) { bServ := mdutils.Bserv() store := dssync.MutexWrap(ds.NewMapDatastore()) + getter := getters.NewIPLDGetter(bServ) avail := NewShareAvailability( - full.TestAvailability(bServ), + full.TestAvailability(getter), store, ) - return service.NewShareService(bServ, avail), availability_test.RandFillBS(t, n, bServ) + return avail, availability_test.RandFillBS(t, n, bServ) } diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index 8fdd98007c..f6ef1ff05f 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -4,14 +4,12 @@ import ( "context" "errors" - "github.com/ipfs/go-blockservice" ipldFormat "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" "github.com/libp2p/go-libp2p/core/peer" "github.com/celestiaorg/celestia-node/share" 
"github.com/celestiaorg/celestia-node/share/availability/discovery" - "github.com/celestiaorg/celestia-node/share/eds" ) var log = logging.Logger("share/full") @@ -20,17 +18,17 @@ var log = logging.Logger("share/full") // recovery technique. It is considered "full" because it is required // to download enough shares to fully reconstruct the data square. type ShareAvailability struct { - rtrv *eds.Retriever - disc *discovery.Discovery + getter share.Getter + disc *discovery.Discovery cancel context.CancelFunc } // NewShareAvailability creates a new full ShareAvailability. -func NewShareAvailability(bServ blockservice.BlockService, disc *discovery.Discovery) *ShareAvailability { +func NewShareAvailability(getter share.Getter, disc *discovery.Discovery) *ShareAvailability { return &ShareAvailability{ - rtrv: eds.NewRetriever(bServ), - disc: disc, + getter: getter, + disc: disc, } } @@ -61,7 +59,7 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, root *share.Ro panic(err) } - _, err := fa.rtrv.Retrieve(ctx, root) + _, err := fa.getter.GetEDS(ctx, root) if err != nil { log.Errorw("availability validation failed", "root", root.Hash(), "err", err) if ipldFormat.IsNotFound(err) || errors.Is(err, context.DeadlineExceeded) { diff --git a/share/availability/full/availability_test.go b/share/availability/full/availability_test.go index ef3e653d9e..f914d3912f 100644 --- a/share/availability/full/availability_test.go +++ b/share/availability/full/availability_test.go @@ -34,7 +34,8 @@ func TestSharesAvailable_Full(t *testing.T) { defer cancel() // RandServiceWithSquare creates a NewShareAvailability inside, so we can test it - service, dah := RandServiceWithSquare(t, 16) - err := service.SharesAvailable(ctx, dah) + getter, dah := GetterWithRandSquare(t, 16) + avail := TestAvailability(getter) + err := avail.SharesAvailable(ctx, dah) assert.NoError(t, err) } diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go index 
f072671df0..c792c1e70a 100644 --- a/share/availability/full/testing.go +++ b/share/availability/full/testing.go @@ -4,7 +4,6 @@ import ( "testing" "time" - "github.com/ipfs/go-blockservice" mdutils "github.com/ipfs/go-merkledag/test" routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" "github.com/libp2p/go-libp2p/p2p/discovery/routing" @@ -12,14 +11,15 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/availability/discovery" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" + "github.com/celestiaorg/celestia-node/share/getters" ) -// RandServiceWithSquare provides a service.ShareService filled with 'n' NMT +// GetterWithRandSquare provides a share.Getter filled with 'n' NMT // trees of 'n' random shares, essentially storing a whole square. -func RandServiceWithSquare(t *testing.T, n int) (*service.ShareService, *share.Root) { +func GetterWithRandSquare(t *testing.T, n int) (share.Getter, *share.Root) { bServ := mdutils.Bserv() - return service.NewShareService(bServ, TestAvailability(bServ)), availability_test.RandFillBS(t, n, bServ) + getter := getters.NewIPLDGetter(bServ) + return getter, availability_test.RandFillBS(t, n, bServ) } // RandNode creates a Full Node filled with a random block of the given size. @@ -31,11 +31,12 @@ func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_t // Node creates a new empty Full Node. 
func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { nd := dn.NewTestNode() - nd.ShareService = service.NewShareService(nd.BlockService, TestAvailability(nd.BlockService)) + nd.Getter = getters.NewIPLDGetter(nd.BlockService) + nd.Availability = TestAvailability(nd.Getter) return nd } -func TestAvailability(bServ blockservice.BlockService) *ShareAvailability { +func TestAvailability(getter share.Getter) *ShareAvailability { disc := discovery.NewDiscovery(nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second) - return NewShareAvailability(bServ, disc) + return NewShareAvailability(getter, disc) } diff --git a/share/availability/light/availability.go b/share/availability/light/availability.go index e90d46e816..f804a91e32 100644 --- a/share/availability/light/availability.go +++ b/share/availability/light/availability.go @@ -5,16 +5,13 @@ import ( "errors" "math" - "github.com/libp2p/go-libp2p/core/peer" - - "github.com/celestiaorg/celestia-node/share/ipld" - - "github.com/ipfs/go-blockservice" ipldFormat "github.com/ipfs/go-ipld-format" logging "github.com/ipfs/go-log/v2" + "github.com/libp2p/go-libp2p/core/peer" "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/availability/discovery" + "github.com/celestiaorg/celestia-node/share/getters" ) var log = logging.Logger("share/light") @@ -24,7 +21,7 @@ var log = logging.Logger("share/light") // its availability. It is assumed that there are a lot of lightAvailability instances // on the network doing sampling over the same Root to collectively verify its availability. type ShareAvailability struct { - bserv blockservice.BlockService + getter share.Getter // disc discovers new full nodes in the network. // it is not allowed to call advertise for light nodes (Full nodes only). disc *discovery.Discovery @@ -33,12 +30,12 @@ type ShareAvailability struct { // NewShareAvailability creates a new light Availability. 
func NewShareAvailability( - bserv blockservice.BlockService, + getter share.Getter, disc *discovery.Discovery, ) *ShareAvailability { la := &ShareAvailability{ - bserv: bserv, - disc: disc, + getter: getter, + disc: disc, } return la } @@ -72,20 +69,20 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, dah *share.Roo return err } + // indicate to the share.Getter that a blockservice session should be created. This + // functionality is optional and must be supported by the used share.Getter. + ctx = getters.WithSession(ctx) ctx, cancel := context.WithTimeout(ctx, share.AvailabilityTimeout) defer cancel() log.Debugw("starting sampling session", "root", dah.Hash()) - ses := blockservice.NewSession(ctx, la.bserv) errs := make(chan error, len(samples)) for _, s := range samples { go func(s Sample) { - root, leaf := ipld.Translate(dah, s.Row, s.Col) - - log.Debugw("fetching share", "root", dah.Hash(), "leaf CID", leaf) - _, err := share.GetShare(ctx, ses, root, leaf, len(dah.RowsRoots)) + log.Debugw("fetching share", "root", dah.Hash(), "row", s.Row, "col", s.Col) + _, err := la.getter.GetShare(ctx, dah, s.Row, s.Col) if err != nil { - log.Debugw("error fetching share", "root", dah.Hash(), "leaf CID", leaf) + log.Debugw("error fetching share", "root", dah.Hash(), "row", s.Row, "col", s.Col) } // we don't really care about Share bodies at this point // it also means we now saved the Share in local storage diff --git a/share/availability/light/availability_test.go b/share/availability/light/availability_test.go index 6cf998bf0b..1fe77bf021 100644 --- a/share/availability/light/availability_test.go +++ b/share/availability/light/availability_test.go @@ -6,7 +6,6 @@ import ( _ "embed" "encoding/hex" "encoding/json" - "math" mrand "math/rand" "strconv" "testing" @@ -33,9 +32,9 @@ func TestSharesAvailable(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // RandServiceWithSquare creates a Light ShareAvailability 
inside, so we can test it - service, dah := RandServiceWithSquare(t, 16) - err := service.SharesAvailable(ctx, dah) + getter, dah := GetterWithRandSquare(t, 16) + avail := TestAvailability(getter) + err := avail.SharesAvailable(ctx, dah) assert.NoError(t, err) } @@ -43,10 +42,10 @@ func TestSharesAvailableFailed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - // RandServiceWithSquare creates a Light ShareAvailability inside, so we can test it - s, _ := RandServiceWithSquare(t, 16) + getter, _ := GetterWithRandSquare(t, 16) + avail := TestAvailability(getter) empty := header.EmptyDAH() - err := s.SharesAvailable(ctx, &empty) + err := avail.SharesAvailable(ctx, &empty) assert.Error(t, err) } @@ -68,20 +67,15 @@ func TestGetShare(t *testing.T) { defer cancel() n := 16 - serv, dah := RandServiceWithSquare(t, n) - err := serv.Start(ctx) - require.NoError(t, err) + getter, dah := GetterWithRandSquare(t, n) for i := range make([]bool, n) { for j := range make([]bool, n) { - sh, err := serv.GetShare(ctx, dah, i, j) + sh, err := getter.GetShare(ctx, dah, i, j) assert.NotNil(t, sh) assert.NoError(t, err) } } - - err = serv.Stop(ctx) - require.NoError(t, err) } func TestService_GetSharesByNamespace(t *testing.T) { @@ -96,7 +90,7 @@ func TestService_GetSharesByNamespace(t *testing.T) { for _, tt := range tests { t.Run("size: "+strconv.Itoa(tt.squareSize), func(t *testing.T) { - serv, bServ := RandService() + getter, bServ := EmptyGetter() n := tt.squareSize * tt.squareSize randShares := share.RandShares(t, n) idx1 := (n - 1) / 2 @@ -108,16 +102,18 @@ func TestService_GetSharesByNamespace(t *testing.T) { root := availability_test.FillBS(t, bServ, randShares) randNID := randShares[idx1][:8] - shares, err := serv.GetSharesByNamespace(context.Background(), root, randNID) + shares, err := getter.GetSharesByNamespace(context.Background(), root, randNID) require.NoError(t, err) - assert.Len(t, shares, tt.expectedShareCount) - for _, value := 
range shares { + require.NoError(t, shares.Verify(root, randNID)) + flattened := shares.Flatten() + assert.Len(t, flattened, tt.expectedShareCount) + for _, value := range flattened { assert.Equal(t, randNID, []byte(share.ID(value))) } if tt.expectedShareCount > 1 { // idx1 is always smaller than idx2 - assert.Equal(t, randShares[idx1], shares[0]) - assert.Equal(t, randShares[idx2], shares[1]) + assert.Equal(t, randShares[idx1], flattened[0]) + assert.Equal(t, randShares[idx2], flattened[1]) } }) } @@ -128,35 +124,20 @@ func TestGetShares(t *testing.T) { defer cancel() n := 16 - serv, dah := RandServiceWithSquare(t, n) - err := serv.Start(ctx) - require.NoError(t, err) + getter, dah := GetterWithRandSquare(t, n) - shares, err := serv.GetShares(ctx, dah) - require.NoError(t, err) - - flattened := make([][]byte, 0, len(shares)*2) - for _, row := range shares { - flattened = append(flattened, row...) - } - // generate DAH from shares returned by `share.GetShares` to compare - // calculated DAH to expected DAH - squareSize := uint64(math.Sqrt(float64(len(flattened)))) - eds, err := da.ExtendShares(squareSize, flattened) + eds, err := getter.GetEDS(ctx, dah) require.NoError(t, err) gotDAH := da.NewDataAvailabilityHeader(eds) require.True(t, dah.Equals(&gotDAH)) - - err = serv.Stop(ctx) - require.NoError(t, err) } func TestService_GetSharesByNamespaceNotFound(t *testing.T) { - serv, root := RandServiceWithSquare(t, 1) + getter, root := GetterWithRandSquare(t, 1) root.RowsRoots = nil - shares, err := serv.GetSharesByNamespace(context.Background(), root, []byte{1, 1, 1, 1, 1, 1, 1, 1}) + shares, err := getter.GetSharesByNamespace(context.Background(), root, []byte{1, 1, 1, 1, 1, 1, 1, 1}) assert.Len(t, shares, 0) assert.NoError(t, err) } @@ -173,12 +154,12 @@ func BenchmarkService_GetSharesByNamespace(b *testing.B) { for _, tt := range tests { b.Run(strconv.Itoa(tt.amountShares), func(b *testing.B) { t := &testing.T{} - serv, root := RandServiceWithSquare(t, 
tt.amountShares) + getter, root := GetterWithRandSquare(t, tt.amountShares) randNID := root.RowsRoots[(len(root.RowsRoots)-1)/2][:8] root.RowsRoots[(len(root.RowsRoots) / 2)] = root.RowsRoots[(len(root.RowsRoots)-1)/2] b.ResetTimer() for i := 0; i < b.N; i++ { - _, err := serv.GetSharesByNamespace(context.Background(), root, randNID) + _, err := getter.GetSharesByNamespace(context.Background(), root, randNID) require.NoError(t, err) } }) @@ -186,7 +167,7 @@ func BenchmarkService_GetSharesByNamespace(b *testing.B) { } func TestSharesRoundTrip(t *testing.T) { - serv, store := RandService() + getter, store := EmptyGetter() ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -325,11 +306,12 @@ func TestSharesRoundTrip(t *testing.T) { require.NoError(t, err) dah := da.NewDataAvailabilityHeader(extSquare) - shares, err := serv.GetSharesByNamespace(ctx, &dah, namespace) + shares, err := getter.GetSharesByNamespace(ctx, &dah, namespace) require.NoError(t, err) + require.NoError(t, shares.Verify(&dah, namespace)) require.NotEmpty(t, shares) - msgs, err := appshares.ParseMsgs(shares) + msgs, err := appshares.ParseMsgs(shares.Flatten()) require.NoError(t, err) assert.Len(t, msgs.MessagesList, len(msgsInNamespace)) for i := range msgs.MessagesList { diff --git a/share/availability/light/testing.go b/share/availability/light/testing.go index 9a63198b3a..0072f226c6 100644 --- a/share/availability/light/testing.go +++ b/share/availability/light/testing.go @@ -12,22 +12,23 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/availability/discovery" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" + "github.com/celestiaorg/celestia-node/share/getters" ) -// RandServiceWithSquare provides a share.Service filled with 'n' NMT -// trees of 'n' random shares, essentially storing a whole square. 
-func RandServiceWithSquare(t *testing.T, n int) (*service.ShareService, *share.Root) { +// GetterWithRandSquare provides a share.Getter filled with 'n' NMT trees of 'n' random shares, +// essentially storing a whole square. +func GetterWithRandSquare(t *testing.T, n int) (share.Getter, *share.Root) { bServ := mdutils.Bserv() - - return service.NewShareService(bServ, TestAvailability(bServ)), availability_test.RandFillBS(t, n, bServ) + getter := getters.NewIPLDGetter(bServ) + return getter, availability_test.RandFillBS(t, n, bServ) } -// RandService provides an unfilled share.Service with corresponding -// blockservice.BlockService than can be filled by the test. -func RandService() (*service.ShareService, blockservice.BlockService) { +// EmptyGetter provides an unfilled share.Getter with corresponding blockservice.BlockService that +// can be filled by the test. +func EmptyGetter() (share.Getter, blockservice.BlockService) { bServ := mdutils.Bserv() - return service.NewShareService(bServ, TestAvailability(bServ)), bServ + getter := getters.NewIPLDGetter(bServ) + return getter, bServ } // RandNode creates a Light Node filled with a random block of the given size. @@ -39,13 +40,14 @@ func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_t // Node creates a new empty Light Node. 
func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { nd := dn.NewTestNode() - nd.ShareService = service.NewShareService(nd.BlockService, TestAvailability(nd.BlockService)) + nd.Getter = getters.NewIPLDGetter(nd.BlockService) + nd.Availability = TestAvailability(nd.Getter) return nd } -func TestAvailability(bServ blockservice.BlockService) *ShareAvailability { +func TestAvailability(getter share.Getter) *ShareAvailability { disc := discovery.NewDiscovery(nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second) - return NewShareAvailability(bServ, disc) + return NewShareAvailability(getter, disc) } func SubNetNode(sn *availability_test.SubNet) *availability_test.TestNode { diff --git a/share/availability/test/testing.go b/share/availability/test/testing.go index 421256428f..6e665a8a0e 100644 --- a/share/availability/test/testing.go +++ b/share/availability/test/testing.go @@ -19,7 +19,6 @@ import ( "github.com/celestiaorg/celestia-app/pkg/da" "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/service" ) // RandFillBS fills the given BlockService with a random block of a given size. @@ -38,7 +37,8 @@ func FillBS(t *testing.T, bServ blockservice.BlockService, shares []share.Share) type TestNode struct { net *TestDagNet - *service.ShareService + share.Getter + share.Availability blockservice.BlockService host.Host } diff --git a/share/getter.go b/share/getter.go new file mode 100644 index 0000000000..fdbbcb6ea0 --- /dev/null +++ b/share/getter.go @@ -0,0 +1,97 @@ +package share + +import ( + "context" + "fmt" + + "github.com/minio/sha256-simd" + + "github.com/celestiaorg/celestia-node/share/ipld" + + "github.com/celestiaorg/nmt" + "github.com/celestiaorg/nmt/namespace" + "github.com/celestiaorg/rsmt2d" +) + +// Getter interface provides a set of accessors for shares by the Root. +// Automatically verifies integrity of shares (exceptions possible depending on the implementation). 
+type Getter interface { + // GetShare gets a Share by coordinates in EDS. + GetShare(ctx context.Context, root *Root, row, col int) (Share, error) + + // GetEDS gets the full EDS identified by the given root. + GetEDS(context.Context, *Root) (*rsmt2d.ExtendedDataSquare, error) + + // GetSharesByNamespace gets all shares from an EDS within the given namespace. + // Shares are returned in a row-by-row order if the namespace spans multiple rows. + GetSharesByNamespace(context.Context, *Root, namespace.ID) (NamespacedShares, error) +} + +// NamespacedShares represents all shares with proofs within a specific namespace of an EDS. +type NamespacedShares []NamespacedRow + +// Flatten returns the concatenated slice of all NamespacedRow shares. +func (ns NamespacedShares) Flatten() []Share { + shares := make([]Share, 0) + for _, row := range ns { + shares = append(shares, row.Shares...) + } + return shares +} + +// NamespacedRow represents all shares with proofs within a specific namespace of a single EDS row. +type NamespacedRow struct { + Shares []Share + Proof *ipld.Proof +} + +// Verify validates NamespacedShares by checking every row with nmt inclusion proof. +func (ns NamespacedShares) Verify(root *Root, nID namespace.ID) error { + originalRoots := make([][]byte, 0) + for _, row := range root.RowsRoots { + if !nID.Less(nmt.MinNamespace(row, nID.Size())) && nID.LessOrEqual(nmt.MaxNamespace(row, nID.Size())) { + originalRoots = append(originalRoots, row) + } + } + + if len(originalRoots) != len(ns) { + return fmt.Errorf("amount of rows differs between root and namespace shares: expected %d, got %d", + len(originalRoots), len(ns)) + } + + for i, row := range ns { + // verify row data against row hash from original root + if !row.verify(originalRoots[i], nID) { + return fmt.Errorf("row verification failed: row %d doesn't match original root: %s", i, root.Hash()) + } + } + return nil +} + +// verify validates the row using nmt inclusion proof. 
+func (row *NamespacedRow) verify(rowRoot []byte, nID namespace.ID) bool { + // construct nmt leaves from shares by prepending namespace + leaves := make([][]byte, 0, len(row.Shares)) + for _, sh := range row.Shares { + leaves = append(leaves, append(sh[:NamespaceSize], sh...)) + } + + proofNodes := make([][]byte, 0, len(row.Proof.Nodes)) + for _, n := range row.Proof.Nodes { + proofNodes = append(proofNodes, ipld.NamespacedSha256FromCID(n)) + } + + // construct new proof + inclusionProof := nmt.NewInclusionProof( + row.Proof.Start, + row.Proof.End, + proofNodes, + ipld.NMTIgnoreMaxNamespace) + + // verify inclusion + return inclusionProof.VerifyNamespace( + sha256.New(), + nID, + leaves, + rowRoot) +} diff --git a/share/getters/ipld.go b/share/getters/ipld.go new file mode 100644 index 0000000000..b81f49ae76 --- /dev/null +++ b/share/getters/ipld.go @@ -0,0 +1,141 @@ +package getters + +import ( + "context" + "fmt" + "sync" + "sync/atomic" + + "github.com/ipfs/go-blockservice" + "github.com/ipfs/go-cid" + "golang.org/x/sync/errgroup" + + "github.com/celestiaorg/celestia-node/share" + "github.com/celestiaorg/celestia-node/share/eds" + "github.com/celestiaorg/celestia-node/share/ipld" + + "github.com/celestiaorg/nmt" + "github.com/celestiaorg/nmt/namespace" + "github.com/celestiaorg/rsmt2d" +) + +var _ share.Getter = (*IPLDGetter)(nil) + +// IPLDGetter is a share.Getter that retrieves shares from the IPLD network. Result caching is +// handled by the provided blockservice. A blockservice session will be created for retrieval if the +// passed context is wrapped with WithSession. +type IPLDGetter struct { + rtrv *eds.Retriever + bServ blockservice.BlockService +} + +// NewIPLDGetter creates a new share.Getter that retrieves shares from the IPLD network. 
+func NewIPLDGetter(bServ blockservice.BlockService) *IPLDGetter { + return &IPLDGetter{ + rtrv: eds.NewRetriever(bServ), + bServ: bServ, + } +} + +func (ig *IPLDGetter) GetShare(ctx context.Context, dah *share.Root, row, col int) (share.Share, error) { + root, leaf := ipld.Translate(dah, row, col) + blockGetter := getGetter(ctx, ig.bServ) + nd, err := share.GetShare(ctx, blockGetter, root, leaf, len(dah.RowsRoots)) + if err != nil { + return nil, fmt.Errorf("getter/ipld: failed to retrieve share: %w", err) + } + + return nd, nil +} + +func (ig *IPLDGetter) GetEDS(ctx context.Context, root *share.Root) (*rsmt2d.ExtendedDataSquare, error) { + // rtrv.Retrieve calls shares.GetShares until enough shares are retrieved to reconstruct the EDS + eds, err := ig.rtrv.Retrieve(ctx, root) + if err != nil { + return nil, fmt.Errorf("getter/ipld: failed to retrieve eds: %w", err) + } + return eds, nil +} + +func (ig *IPLDGetter) GetSharesByNamespace( + ctx context.Context, + root *share.Root, + nID namespace.ID, +) (share.NamespacedShares, error) { + if len(nID) != share.NamespaceSize { + return nil, fmt.Errorf("getter/ipld: expected namespace ID of size %d, got %d", + share.NamespaceSize, len(nID)) + } + + rowRootCIDs := make([]cid.Cid, 0, len(root.RowsRoots)) + for _, row := range root.RowsRoots { + if !nID.Less(nmt.MinNamespace(row, nID.Size())) && nID.LessOrEqual(nmt.MaxNamespace(row, nID.Size())) { + rowRootCIDs = append(rowRootCIDs, ipld.MustCidFromNamespacedSha256(row)) + } + } + if len(rowRootCIDs) == 0 { + return nil, nil + } + + blockGetter := getGetter(ctx, ig.bServ) + errGroup, ctx := errgroup.WithContext(ctx) + shares := make([]share.NamespacedRow, len(rowRootCIDs)) + for i, rootCID := range rowRootCIDs { + // shadow loop variables, to ensure correct values are captured + i, rootCID := i, rootCID + errGroup.Go(func() error { + proof := new(ipld.Proof) + row, err := share.GetSharesByNamespace(ctx, blockGetter, rootCID, nID, len(root.RowsRoots), proof) + shares[i] = 
share.NamespacedRow{ + Shares: row, + Proof: proof, + } + if err != nil { + return fmt.Errorf("getter/ipld: retrieving nID %x for row %x: %w", nID, rootCID, err) + } + return nil + }) + } + + if err := errGroup.Wait(); err != nil { + return nil, err + } + + return shares, nil +} + +var sessionKey = &session{} + +// session is a struct that can optionally be passed by context to the share.Getter methods using +// WithSession to indicate that a blockservice session should be created. +type session struct { + sync.Mutex + atomic.Pointer[blockservice.Session] +} + +// WithSession stores an empty session in the context, indicating that a blockservice session should +// be created. +func WithSession(ctx context.Context) context.Context { + return context.WithValue(ctx, sessionKey, &session{}) +} + +func getGetter(ctx context.Context, service blockservice.BlockService) blockservice.BlockGetter { + s, ok := ctx.Value(sessionKey).(*session) + if !ok { + return service + } + + val := s.Load() + if val != nil { + return val + } + + s.Lock() + defer s.Unlock() + val = s.Load() + if val == nil { + val = blockservice.NewSession(ctx, service) + s.Store(val) + } + return val +} diff --git a/share/ipld/corrupted_data_test.go b/share/ipld/corrupted_data_test.go index 8b14f6d2aa..df1d0d7888 100644 --- a/share/ipld/corrupted_data_test.go +++ b/share/ipld/corrupted_data_test.go @@ -10,7 +10,7 @@ import ( "github.com/celestiaorg/celestia-node/share" "github.com/celestiaorg/celestia-node/share/availability/full" availability_test "github.com/celestiaorg/celestia-node/share/availability/test" - "github.com/celestiaorg/celestia-node/share/service" + "github.com/celestiaorg/celestia-node/share/getters" ) // sharesAvailableTimeout is an arbitrarily picked interval of time in which a TestNode is expected @@ -26,7 +26,7 @@ func TestNamespaceHasher_CorruptedData(t *testing.T) { requestor := full.Node(net) provider, mockBS := availability_test.MockNode(t, net) - provider.ShareService = 
service.NewShareService(provider.BlockService, full.TestAvailability(provider.BlockService)) + provider.Availability = full.TestAvailability(getters.NewIPLDGetter(provider.BlockService)) net.ConnectAll() // before the provider starts attacking, we should be able to retrieve successfully. We pass a size diff --git a/share/service/service.go b/share/service/service.go deleted file mode 100644 index 190e68c4ea..0000000000 --- a/share/service/service.go +++ /dev/null @@ -1,140 +0,0 @@ -package service - -import ( - "context" - "fmt" - - "github.com/ipfs/go-blockservice" - "github.com/ipfs/go-cid" - "golang.org/x/sync/errgroup" - - "github.com/celestiaorg/celestia-node/share" - "github.com/celestiaorg/celestia-node/share/eds" - "github.com/celestiaorg/celestia-node/share/ipld" - "github.com/celestiaorg/nmt" - "github.com/celestiaorg/nmt/namespace" -) - -// TODO(@Wondertan): Simple thread safety for Start and Stop would not hurt. -type ShareService struct { - share.Availability - rtrv *eds.Retriever - bServ blockservice.BlockService - // session is blockservice sub-session that applies optimization for fetching/loading related - // nodes, like shares prefer session over blockservice for fetching nodes. - session blockservice.BlockGetter - cancel context.CancelFunc -} - -// NewService creates a new basic share.Module. -func NewShareService(bServ blockservice.BlockService, avail share.Availability) *ShareService { - return &ShareService{ - rtrv: eds.NewRetriever(bServ), - Availability: avail, - bServ: bServ, - } -} - -func (s *ShareService) Start(context.Context) error { - if s.session != nil || s.cancel != nil { - return fmt.Errorf("share: service already started") - } - - // NOTE: The ctx given as param is used to control Start flow and only needed when Start is - // blocking, but this one is not. - // - // The newer context here is created to control lifecycle of the session and peer discovery. 
- ctx, cancel := context.WithCancel(context.Background()) - s.cancel = cancel - s.session = blockservice.NewSession(ctx, s.bServ) - return nil -} - -func (s *ShareService) Stop(context.Context) error { - if s.session == nil || s.cancel == nil { - return fmt.Errorf("share: service already stopped") - } - - s.cancel() - s.cancel = nil - s.session = nil - return nil -} - -func (s *ShareService) GetShare(ctx context.Context, dah *share.Root, row, col int) (share.Share, error) { - root, leaf := ipld.Translate(dah, row, col) - nd, err := share.GetShare(ctx, s.bServ, root, leaf, len(dah.RowsRoots)) - if err != nil { - return nil, err - } - - return nd, nil -} - -func (s *ShareService) GetShares(ctx context.Context, root *share.Root) ([][]share.Share, error) { - eds, err := s.rtrv.Retrieve(ctx, root) - if err != nil { - return nil, err - } - - origWidth := int(eds.Width() / 2) - shares := make([][]share.Share, origWidth) - - for i := 0; i < origWidth; i++ { - row := eds.Row(uint(i)) - shares[i] = make([]share.Share, origWidth) - for j := 0; j < origWidth; j++ { - shares[i][j] = row[j] - } - } - - return shares, nil -} - -// GetSharesByNamespace iterates over a square's row roots and accumulates the found shares in the -// given namespace.ID. 
-func (s *ShareService) GetSharesByNamespace( - ctx context.Context, - root *share.Root, - nID namespace.ID, -) ([]share.Share, error) { - if len(nID) != share.NamespaceSize { - return nil, fmt.Errorf("expected namespace ID of size %d, got %d", share.NamespaceSize, len(nID)) - } - - rowRootCIDs := make([]cid.Cid, 0) - for _, row := range root.RowsRoots { - if !nID.Less(nmt.MinNamespace(row, nID.Size())) && nID.LessOrEqual(nmt.MaxNamespace(row, nID.Size())) { - rowRootCIDs = append(rowRootCIDs, ipld.MustCidFromNamespacedSha256(row)) - } - } - if len(rowRootCIDs) == 0 { - return nil, nil - } - - errGroup, ctx := errgroup.WithContext(ctx) - shares := make([][]share.Share, len(rowRootCIDs)) - for i, rootCID := range rowRootCIDs { - // shadow loop variables, to ensure correct values are captured - i, rootCID := i, rootCID - errGroup.Go(func() (err error) { - shares[i], err = share.GetSharesByNamespace(ctx, s.bServ, rootCID, nID, len(root.RowsRoots), nil) - return - }) - } - - if err := errGroup.Wait(); err != nil { - return nil, err - } - - // we don't know the amount of shares in the namespace, so we cannot preallocate properly - // TODO(@Wondertan): Consider improving encoding schema for data in the shares that will also - // include metadata with the amount of shares. If we are talking about plenty of data here, proper - // preallocation would make a difference - var out []share.Share - for i := 0; i < len(rowRootCIDs); i++ { - out = append(out, shares[i]...) - } - - return out, nil -}