Skip to content

Commit

Permalink
Add NotifyRemoveCids (#9)
Browse files Browse the repository at this point in the history
* add NotifyRemoveCids

* fix interface for NotifyRemoveCar

* minor fix
  • Loading branch information
adlrocha authored Aug 26, 2021
1 parent f373f22 commit 55c890b
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 21 deletions.
54 changes: 36 additions & 18 deletions core/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,6 @@ func (e *Engine) Push(ctx context.Context, indexer peer.ID, cid cid.Cid, metadat
}

func (e *Engine) NotifyPutCids(ctx context.Context, cids []cid.Cid, metadata []byte) (cid.Cid, error) {
latestAdvID, err := e.getLatest(false)
if err != nil {
return cid.Undef, err
}
latestIndexLink, err := e.getLatestIndexLink()
if err != nil {
return cid.Undef, err
Expand All @@ -136,35 +132,34 @@ func (e *Engine) NotifyPutCids(ctx context.Context, cids []cid.Cid, metadata []b
if err != nil {
return cid.Undef, err
}
return e.publishAdvForIndex(ctx, indexLnk)
}

// TODO: We should probably prevent providers from being able to advertise
// the same index several times. It may lead to a lot of duplicate retrievals?

// Update the latest index
iLnk, err := indexLnk.AsLink()
func (e *Engine) NotifyRemoveCids(ctx context.Context, cids []cid.Cid) (cid.Cid, error) {
latestIndexLink, err := e.getLatestIndexLink()
if err != nil {
return cid.Undef, err
}
err = e.putLatestIndex(iLnk.(cidlink.Link).Cid)
if err != nil {
return cid.Undef, err
// Selectors don't like Cid.Undef. The exchange fails if we build
// a link with cid.Undef for the genesis index. To avoid this we
// check if cid.Undef, and if yes we set to nil.
if schema.Link_Index(latestIndexLink).ToCid() == cid.Undef {
latestIndexLink = nil
}
adv, err := schema.NewAdvertisement(e.privKey, latestAdvID.Bytes(), indexLnk, e.host.ID().String())
// Lsys will store the index conveniently here.
_, indexLnk, err := schema.NewIndexFromCids(e.lsys, nil, cids, nil, latestIndexLink)
if err != nil {
return cid.Undef, err
}
return e.Publish(ctx, adv)
}

func (e *Engine) NotifyRemoveCids(ctx context.Context, cids []cid.Cid, metadata []byte) (cid.Cid, error) {
panic("not implemented")
return e.publishAdvForIndex(ctx, indexLnk)
}

func (e *Engine) NotifyPutCar(ctx context.Context, carID cid.Cid, metadata []byte) (cid.Cid, error) {
panic("not implemented")
}

func (e *Engine) NotifyRemoveCar(ctx context.Context, carID cid.Cid, metadata []byte) (cid.Cid, error) {
func (e *Engine) NotifyRemoveCar(ctx context.Context, carID cid.Cid) (cid.Cid, error) {
panic("not implemented")
}

Expand Down Expand Up @@ -193,3 +188,26 @@ func (e *Engine) GetLatestAdv(ctx context.Context) (schema.Advertisement, error)
}
return e.GetAdv(ctx, latestAdv)
}

func (e *Engine) publishAdvForIndex(ctx context.Context, lnk schema.Link_Index) (cid.Cid, error) {
// TODO: We should probably prevent providers from being able to advertise
// the same index several times. It may lead to a lot of duplicate retrievals?
latestAdvID, err := e.getLatest(false)
if err != nil {
return cid.Undef, err
}
// Update the latest index
iLnk, err := lnk.AsLink()
if err != nil {
return cid.Undef, err
}
err = e.putLatestIndex(iLnk.(cidlink.Link).Cid)
if err != nil {
return cid.Undef, err
}
adv, err := schema.NewAdvertisement(e.privKey, latestAdvID.Bytes(), lnk, e.host.ID().String())
if err != nil {
return cid.Undef, err
}
return e.Publish(ctx, adv)
}
17 changes: 16 additions & 1 deletion core/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func TestNotifyPublish(t *testing.T) {
require.Equal(t, ipld.DeepEqual(fAdv, adv), true, "latest fetched advertisement is not equal to published one")
}

func TestNotifyPutCids(t *testing.T) {
func TestNotifyPutAndRemoveCids(t *testing.T) {
ctx := context.Background()
e, err := mkEngine(t)
require.NoError(t, err)
Expand Down Expand Up @@ -220,5 +220,20 @@ func TestNotifyPutCids(t *testing.T) {
if !downstream.Equals(c) {
t.Fatalf("not the right advertisement published %s vs %s", downstream, c)
}
// TODO: Add a sanity-check to see if the list of cids have been set correctly.
}

// NotifyRemove the previous ones
c, err = e.NotifyRemoveCids(ctx, cids)
require.NoError(t, err)
// Check that the update has been published and can be fetched from subscriber
select {
case <-time.After(time.Second * 5):
t.Fatal("timed out waiting for sync to propogate")
case downstream := <-watcher:
if !downstream.Equals(c) {
t.Fatalf("not the right advertisement published %s vs %s", downstream, c)
}
// TODO: Add a sanity-check to see if the list of cids have been set correctly.
}
}
4 changes: 2 additions & 2 deletions core/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ type Interface interface {

// NotifyRemoveCids notifies that a list of CIDs have been removed from the provider
// and generates and publishes the corresponding advertisement.
NotifyRemoveCids(ctx context.Context, cids []cid.Cid, metadata []byte) (cid.Cid, error)
NotifyRemoveCids(ctx context.Context, cids []cid.Cid) (cid.Cid, error)

// NotifyRemoveCAr notifies that a CAR has been removed from the provider
// and generates and publishes the corresponding advertisement.
NotifyRemoveCar(ctx context.Context, carID cid.Cid, metadata []byte) (cid.Cid, error)
NotifyRemoveCar(ctx context.Context, carID cid.Cid) (cid.Cid, error)

// GetAdv gets an advertisement by CID from local storage.
GetAdv(ctx context.Context, id cid.Cid) (schema.Advertisement, error)
Expand Down

0 comments on commit 55c890b

Please sign in to comment.