Skip to content

Commit

Permalink
making share.Availability depend on share.Getter
Browse files Browse the repository at this point in the history
  • Loading branch information
distractedm1nd committed Dec 20, 2022
1 parent f5d47bf commit 1ee0ea2
Show file tree
Hide file tree
Showing 10 changed files with 41 additions and 43 deletions.
7 changes: 4 additions & 3 deletions das/daser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/celestiaorg/celestia-node/fraud"
"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/getters"
)

var timeout = time.Second * 15
Expand All @@ -32,7 +33,7 @@ var timeout = time.Second * 15
func TestDASerLifecycle(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := mdutils.Bserv()
avail := light.TestAvailability(bServ)
avail := light.TestAvailability(getters.NewIPLDGetter(bServ))
// 15 headers from the past and 15 future headers
mockGet, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15)

Expand Down Expand Up @@ -72,7 +73,7 @@ func TestDASerLifecycle(t *testing.T) {
func TestDASer_Restart(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
bServ := mdutils.Bserv()
avail := light.TestAvailability(bServ)
avail := light.TestAvailability(getters.NewIPLDGetter(bServ))
// 15 headers from the past and 15 future headers
mockGet, sub, mockService := createDASerSubcomponents(t, bServ, 15, 15)

Expand Down Expand Up @@ -147,7 +148,7 @@ func TestDASer_stopsAfter_BEFP(t *testing.T) {
ps, err := pubsub.NewGossipSub(ctx, net.Hosts()[0],
pubsub.WithMessageSignaturePolicy(pubsub.StrictNoSign))
require.NoError(t, err)
avail := full.TestAvailability(bServ)
avail := full.TestAvailability(getters.NewIPLDGetter(bServ))
// 15 headers from the past and 15 future headers
mockGet, sub, _ := createDASerSubcomponents(t, bServ, 15, 15)

Expand Down
6 changes: 2 additions & 4 deletions nodebuilder/share/constructors.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package share

import (
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/routing"
Expand All @@ -11,7 +10,6 @@ import (
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/cache"
disc "github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/getters"
)

func discovery(cfg Config) func(routing.ContentRouting, host.Host) *disc.Discovery {
Expand All @@ -38,6 +36,6 @@ func cacheAvailability[A share.Availability](lc fx.Lifecycle, ds datastore.Batch
return ca
}

func newModule(bServ blockservice.BlockService, avail share.Availability) Module {
return &module{getters.NewIPLDGetter(bServ), avail}
func newModule(getter share.Getter, avail share.Availability) Module {
return &module{getter, avail}
}
2 changes: 2 additions & 0 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
"github.com/celestiaorg/celestia-node/share/eds"
"github.com/celestiaorg/celestia-node/share/getters"
)

func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option {
Expand All @@ -24,6 +25,7 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
fx.Invoke(share.EnsureEmptySquareExists),
fx.Provide(discovery(*cfg)),
fx.Provide(newModule),
fx.Provide(getters.NewIPLDGetter),
)

switch tp {
Expand Down
7 changes: 5 additions & 2 deletions share/availability/cache/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,17 @@ import (
"github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
availability_test "github.com/celestiaorg/celestia-node/share/availability/test"
"github.com/celestiaorg/celestia-node/share/getters"
)

// RandLightLocalServiceWithSquare is the same as light.RandServiceWithSquare, except
// the share.Availability is wrapped with cache availability.
func RandLightLocalServiceWithSquare(t *testing.T, n int) (share.Availability, *share.Root) {
bServ := mdutils.Bserv()
store := dssync.MutexWrap(ds.NewMapDatastore())
getter := getters.NewIPLDGetter(bServ)
avail := NewShareAvailability(
light.TestAvailability(bServ),
light.TestAvailability(getter),
store,
)
return avail, availability_test.RandFillBS(t, n, bServ)
Expand All @@ -30,8 +32,9 @@ func RandLightLocalServiceWithSquare(t *testing.T, n int) (share.Availability, *
func RandFullLocalServiceWithSquare(t *testing.T, n int) (share.Availability, *share.Root) {
bServ := mdutils.Bserv()
store := dssync.MutexWrap(ds.NewMapDatastore())
getter := getters.NewIPLDGetter(bServ)
avail := NewShareAvailability(
full.TestAvailability(bServ),
full.TestAvailability(getter),
store,
)
return avail, availability_test.RandFillBS(t, n, bServ)
Expand Down
14 changes: 6 additions & 8 deletions share/availability/full/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@ import (
"context"
"errors"

"github.com/ipfs/go-blockservice"
ipldFormat "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/peer"

"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/discovery"
"github.com/celestiaorg/celestia-node/share/eds"
)

var log = logging.Logger("share/full")
Expand All @@ -20,17 +18,17 @@ var log = logging.Logger("share/full")
// recovery technique. It is considered "full" because it is required
// to download enough shares to fully reconstruct the data square.
type ShareAvailability struct {
rtrv *eds.Retriever
disc *discovery.Discovery
getter share.Getter
disc *discovery.Discovery

cancel context.CancelFunc
}

// NewShareAvailability creates a new full ShareAvailability.
func NewShareAvailability(bServ blockservice.BlockService, disc *discovery.Discovery) *ShareAvailability {
func NewShareAvailability(getter share.Getter, disc *discovery.Discovery) *ShareAvailability {
return &ShareAvailability{
rtrv: eds.NewRetriever(bServ),
disc: disc,
getter: getter,
disc: disc,
}
}

Expand Down Expand Up @@ -61,7 +59,7 @@ func (fa *ShareAvailability) SharesAvailable(ctx context.Context, root *share.Ro
panic(err)
}

_, err := fa.rtrv.Retrieve(ctx, root)
_, err := fa.getter.GetShares(ctx, root)
if err != nil {
log.Errorw("availability validation failed", "root", root.Hash(), "err", err)
if ipldFormat.IsNotFound(err) || errors.Is(err, context.DeadlineExceeded) {
Expand Down
10 changes: 5 additions & 5 deletions share/availability/full/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"testing"
"time"

"github.com/ipfs/go-blockservice"
mdutils "github.com/ipfs/go-merkledag/test"
routinghelpers "github.com/libp2p/go-libp2p-routing-helpers"
"github.com/libp2p/go-libp2p/p2p/discovery/routing"
Expand All @@ -19,7 +18,8 @@ import (
// trees of 'n' random shares, essentially storing a whole square.
func RandServiceWithSquare(t *testing.T, n int) (share.Getter, share.Availability, *share.Root) {
bServ := mdutils.Bserv()
return getters.NewIPLDGetter(bServ), TestAvailability(bServ), availability_test.RandFillBS(t, n, bServ)
getter := getters.NewIPLDGetter(bServ)
return getter, TestAvailability(getter), availability_test.RandFillBS(t, n, bServ)
}

// RandNode creates a Full Node filled with a random block of the given size.
Expand All @@ -32,11 +32,11 @@ func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_t
func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
nd := dn.NewTestNode()
nd.Getter = getters.NewIPLDGetter(nd.BlockService)
nd.Availability = TestAvailability(nd.BlockService)
nd.Availability = TestAvailability(nd.Getter)
return nd
}

func TestAvailability(bServ blockservice.BlockService) *ShareAvailability {
func TestAvailability(getter share.Getter) *ShareAvailability {
disc := discovery.NewDiscovery(nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second)
return NewShareAvailability(bServ, disc)
return NewShareAvailability(getter, disc)
}
20 changes: 7 additions & 13 deletions share/availability/light/availability.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,6 @@ import (

"github.com/libp2p/go-libp2p-core/peer"

"github.com/celestiaorg/celestia-node/share/ipld"

"github.com/ipfs/go-blockservice"
ipldFormat "github.com/ipfs/go-ipld-format"
logging "github.com/ipfs/go-log/v2"

Expand All @@ -24,7 +21,7 @@ var log = logging.Logger("share/light")
// its availability. It is assumed that there are a lot of lightAvailability instances
// on the network doing sampling over the same Root to collectively verify its availability.
type ShareAvailability struct {
bserv blockservice.BlockService
getter share.Getter
// disc discovers new full nodes in the network.
// it is not allowed to call advertise for light nodes (Full nodes only).
disc *discovery.Discovery
Expand All @@ -33,12 +30,12 @@ type ShareAvailability struct {

// NewShareAvailability creates a new light Availability.
func NewShareAvailability(
bserv blockservice.BlockService,
getter share.Getter,
disc *discovery.Discovery,
) *ShareAvailability {
la := &ShareAvailability{
bserv: bserv,
disc: disc,
getter: getter,
disc: disc,
}
return la
}
Expand Down Expand Up @@ -76,16 +73,13 @@ func (la *ShareAvailability) SharesAvailable(ctx context.Context, dah *share.Roo
defer cancel()

log.Debugw("starting sampling session", "root", dah.Hash())
ses := blockservice.NewSession(ctx, la.bserv)
errs := make(chan error, len(samples))
for _, s := range samples {
go func(s Sample) {
root, leaf := ipld.Translate(dah, s.Row, s.Col)

log.Debugw("fetching share", "root", dah.Hash(), "leaf CID", leaf)
_, err := share.GetShare(ctx, ses, root, leaf, len(dah.RowsRoots))
log.Debugw("fetching share", "root", dah.Hash(), "row", s.Row, "col", s.Col)
_, err := la.getter.GetShare(ctx, dah, s.Row, s.Col)
if err != nil {
log.Debugw("error fetching share", "root", dah.Hash(), "leaf CID", leaf)
log.Debugw("error fetching share", "root", dah.Hash(), "row", s.Row, "col", s.Col)
}
// we don't really care about Share bodies at this point
// it also means we now saved the Share in local storage
Expand Down
13 changes: 7 additions & 6 deletions share/availability/light/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (
// trees of 'n' random shares, essentially storing a whole square.
func RandServiceWithSquare(t *testing.T, n int) (share.Getter, share.Availability, *share.Root) {
bServ := mdutils.Bserv()

return getters.NewIPLDGetter(bServ), TestAvailability(bServ), availability_test.RandFillBS(t, n, bServ)
getter := getters.NewIPLDGetter(bServ)
return getter, TestAvailability(getter), availability_test.RandFillBS(t, n, bServ)
}

// RandService provides an unfilled share.Getter/share.Availability with corresponding
// blockservice.BlockService than can be filled by the test.
func RandService() (share.Getter, share.Availability, blockservice.BlockService) {
bServ := mdutils.Bserv()
return getters.NewIPLDGetter(bServ), TestAvailability(bServ), bServ
getter := getters.NewIPLDGetter(bServ)
return getter, TestAvailability(getter), bServ
}

// RandNode creates a Light Node filled with a random block of the given size.
Expand All @@ -40,13 +41,13 @@ func RandNode(dn *availability_test.TestDagNet, squareSize int) (*availability_t
func Node(dn *availability_test.TestDagNet) *availability_test.TestNode {
nd := dn.NewTestNode()
nd.Getter = getters.NewIPLDGetter(nd.BlockService)
nd.Availability = TestAvailability(nd.BlockService)
nd.Availability = TestAvailability(nd.Getter)
return nd
}

func TestAvailability(bServ blockservice.BlockService) *ShareAvailability {
func TestAvailability(getter share.Getter) *ShareAvailability {
disc := discovery.NewDiscovery(nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), 0, time.Second, time.Second)
return NewShareAvailability(bServ, disc)
return NewShareAvailability(getter, disc)
}

func SubNetNode(sn *availability_test.SubNet) *availability_test.TestNode {
Expand Down
2 changes: 1 addition & 1 deletion share/getters/ipld.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type IPLDGetter struct {
bServ blockservice.BlockService
}

func NewIPLDGetter(bServ blockservice.BlockService) *IPLDGetter {
func NewIPLDGetter(bServ blockservice.BlockService) share.Getter {
return &IPLDGetter{
rtrv: eds.NewRetriever(bServ),
bServ: bServ,
Expand Down
3 changes: 2 additions & 1 deletion share/ipld/corrupted_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/celestiaorg/celestia-node/share"
"github.com/celestiaorg/celestia-node/share/availability/full"
availability_test "github.com/celestiaorg/celestia-node/share/availability/test"
"github.com/celestiaorg/celestia-node/share/getters"
)

// sharesAvailableTimeout is an arbitrarily picked interval of time in which a TestNode is expected
Expand All @@ -25,7 +26,7 @@ func TestNamespaceHasher_CorruptedData(t *testing.T) {

requestor := full.Node(net)
provider, mockBS := availability_test.MockNode(t, net)
provider.Availability = full.TestAvailability(provider.BlockService)
provider.Availability = full.TestAvailability(getters.NewIPLDGetter(provider.BlockService))
net.ConnectAll()

// before the provider starts attacking, we should be able to retrieve successfully. We pass a size
Expand Down

0 comments on commit 1ee0ea2

Please sign in to comment.