Skip to content

Commit

Permalink
Implement CAR CID iterator and supplier
Browse files Browse the repository at this point in the history
Implements mechanisms to list the CIDs given a CAR path, regardless of
the CAR's version. The mechanism allows paths to be put and removed, and
supplies CID iterators given a CAR ID.

Implement mechanism to generate CID IDs for a CAR file, regardless of
its version.

Implement a CID iterator reader that given a CID iterator and a
marshaller converts it into a stream of bytes as an `io.ReadCloser`.

Implement tests using sample CAR files in versions 1 and 2.

Run `gofumpt -l -w .` across the whole repo for consistent formatting.

Leave TODOs on undecided or improvable implementations to contain the PR
scope.

Increase the test timeout for flaky engine tests.
  • Loading branch information
masih committed Aug 27, 2021
1 parent 55c890b commit 08d4a5e
Show file tree
Hide file tree
Showing 14 changed files with 571 additions and 6 deletions.
1 change: 0 additions & 1 deletion core/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ func New(ctx context.Context,

// NewFromConfig creates a reference provider engine with the corresponding config.
func NewFromConfig(ctx context.Context, cfg config.Config, ds datastore.Batching, host host.Host) (*Engine, error) {

log.Debugw("Starting new reference provider engine")
privKey, err := cfg.Identity.DecodePrivateKey("")
if err != nil {
Expand Down
9 changes: 4 additions & 5 deletions core/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ func mkEngine(t *testing.T) (*Engine, error) {
store := dssync.MutexWrap(datastore.NewMapDatastore())

return New(context.Background(), priv, h, store, testTopic)

}

func connectHosts(t *testing.T, srcHost, dstHost host.Host) {
Expand All @@ -64,7 +63,7 @@ func connectHosts(t *testing.T, srcHost, dstHost host.Host) {
}

func RandomCids(n int) ([]cid.Cid, error) {
var prng = rand.New(rand.NewSource(time.Now().UnixNano()))
prng := rand.New(rand.NewSource(time.Now().UnixNano()))

res := make([]cid.Cid, n)
for i := 0; i < n; i++ {
Expand Down Expand Up @@ -200,7 +199,7 @@ func TestNotifyPutAndRemoveCids(t *testing.T) {

// Check that the update has been published and can be fetched from subscriber
select {
case <-time.After(time.Second * 5):
case <-time.After(time.Second * 10):
t.Fatal("timed out waiting for sync to propogate")
case downstream := <-watcher:
if !downstream.Equals(c) {
Expand All @@ -214,7 +213,7 @@ func TestNotifyPutAndRemoveCids(t *testing.T) {
require.NoError(t, err)
// Check that the update has been published and can be fetched from subscriber
select {
case <-time.After(time.Second * 5):
case <-time.After(time.Second * 10):
t.Fatal("timed out waiting for sync to propogate")
case downstream := <-watcher:
if !downstream.Equals(c) {
Expand All @@ -228,7 +227,7 @@ func TestNotifyPutAndRemoveCids(t *testing.T) {
require.NoError(t, err)
// Check that the update has been published and can be fetched from subscriber
select {
case <-time.After(time.Second * 5):
case <-time.After(time.Second * 10):
t.Fatal("timed out waiting for sync to propogate")
case downstream := <-watcher:
if !downstream.Equals(c) {
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@ require (
github.com/ipfs/go-datastore v0.4.6
github.com/ipfs/go-ds-leveldb v0.4.2
github.com/ipfs/go-log/v2 v2.3.0
github.com/ipld/go-car/v2 v2.0.3-0.20210827150014-1bac13d05359
github.com/ipld/go-ipld-prime v0.12.0
github.com/lib/pq v1.10.2
github.com/libp2p/go-libp2p v0.15.0-rc.1
github.com/libp2p/go-libp2p-core v0.9.0
github.com/mitchellh/go-homedir v1.1.0
github.com/multiformats/go-multicodec v0.3.0
github.com/multiformats/go-multihash v0.0.16
github.com/stretchr/testify v1.7.0
github.com/urfave/cli/v2 v2.3.0
Expand Down
41 changes: 41 additions & 0 deletions go.sum

Large diffs are not rendered by default.

57 changes: 57 additions & 0 deletions internal/suppliers/car_ciditerator.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package suppliers

import (
"context"
"io"

"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
)

// carCidIterator iterates over the CIDs of a CAR file.
// Instances are created by newCarCidIterator and must be closed via Close once no longer needed.
type carCidIterator struct {
// closer releases the read-only blockstore opened over the CAR file.
closer io.Closer
// cancel cancels the context that was passed to AllKeysChan.
cancel context.CancelFunc
// cidsChan is the channel of CIDs returned by AllKeysChan.
cidsChan <-chan cid.Cid
// errChan carries errors reported through the blockstore's asynchronous error handler.
errChan <-chan error
}

// newCarCidIterator opens the CAR file at the given path as a read-only blockstore and
// returns an iterator over the CIDs it contains.
// The given options are passed through to the blockstore.
//
// Errors encountered asynchronously while listing keys are forwarded to the iterator and
// surfaced by Next. The returned iterator must be closed by the caller.
// An error is returned if the blockstore cannot be opened or the key listing cannot be started.
func newCarCidIterator(path string, opts ...car.ReadOption) (*carCidIterator, error) {
	robs, err := blockstore.OpenReadOnly(path, opts...)
	if err != nil {
		return nil, err
	}

	errChan := make(chan error)
	ctx, cancel := context.WithCancel(context.Background())

	// Capture the done channel before ctx is re-assigned below so the handler does not
	// depend on the mutated variable.
	done := ctx.Done()
	ctx = blockstore.WithAsyncErrorHandler(ctx, func(err error) {
		// errChan is unbuffered: without the cancellation case the reporting goroutine
		// would block forever if the consumer has stopped calling Next or has closed
		// the iterator.
		select {
		case errChan <- err:
		case <-done:
		}
	})

	cidsChan, err := robs.AllKeysChan(ctx)
	if err != nil {
		cancel()
		// The listing error is the one worth reporting; a failure to close adds nothing.
		_ = robs.Close()
		return nil, err
	}

	return &carCidIterator{
		closer:   robs,
		cancel:   cancel,
		cidsChan: cidsChan,
		errChan:  errChan,
	}, nil
}

// Next returns the next CID from the CAR file.
// It returns io.EOF once the CID channel has been closed, and returns any error that was
// reported asynchronously while listing keys.
func (c *carCidIterator) Next() (cid.Cid, error) {
	select {
	case nextCid, open := <-c.cidsChan:
		if !open {
			// The channel is closed: there is nothing left to iterate over.
			return cid.Undef, io.EOF
		}
		return nextCid, nil
	case asyncErr := <-c.errChan:
		return cid.Undef, asyncErr
	}
}

// Close cancels the key-listing context and then closes the underlying closer,
// returning the closer's error, if any.
func (c *carCidIterator) Close() error {
	// Cancel first so the key listing is signalled to stop before the blockstore is released.
	c.cancel()
	closeErr := c.closer.Close()
	return closeErr
}
62 changes: 62 additions & 0 deletions internal/suppliers/car_ciditerator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package suppliers

import (
"context"
"io"
"testing"
"time"

"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2/blockstore"
"github.com/stretchr/testify/require"
)

// TestCarCidIteratorReturnsExpectedCids asserts that the CIDs returned by a carCidIterator
// match, in order, the keys listed by a read-only blockstore opened over the same CAR file,
// for both CARv1 and wrapped CARv2 samples.
func TestCarCidIteratorReturnsExpectedCids(t *testing.T) {
	tests := []struct {
		name    string
		carPath string
	}{
		{
			name:    "CARv1ReturnsExpectedCIDs",
			carPath: "../../testdata/sample-v1.car",
		},
		{
			name:    "CARv2ReturnsExpectedCIDs",
			carPath: "../../testdata/sample-wrapped-v2.car",
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			subject, err := newCarCidIterator(tt.carPath)
			require.NoError(t, err)
			t.Cleanup(func() { require.NoError(t, subject.Close()) })

			// Open ReadOnly blockstore used to provide wanted case for testing.
			want, err := blockstore.OpenReadOnly(tt.carPath)
			require.NoError(t, err)
			// Release the reference blockstore once the test is done. Cleanups run in
			// reverse order, so the context below is cancelled before this close runs.
			t.Cleanup(func() { require.NoError(t, want.Close()) })

			// Wait at most 3 seconds for iteration over wanted CIDs.
			ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)

			// Record a failure if an error is encountered while iterating over CIDs.
			// The handler may run outside the test goroutine, where FailNow (and hence
			// require.Fail) must not be called; t.Errorf is safe from any goroutine.
			ctx = blockstore.WithAsyncErrorHandler(ctx, func(err error) {
				t.Errorf("expected no error while listing wanted CIDs: %v", err)
			})
			t.Cleanup(cancel)

			// Instantiate wanted CIDs channel.
			keysChan, err := want.AllKeysChan(ctx)
			require.NoError(t, err)

			// Assert CIDs are consistent with the iterator.
			for wantCid := range keysChan {
				gotCid, gotErr := subject.Next()
				require.NoError(t, gotErr)
				require.Equal(t, wantCid, gotCid)
			}

			// Assert there are no more CIDs left to consume via the iterator.
			gotCid, gotErr := subject.Next()
			require.Equal(t, io.EOF, gotErr)
			require.Equal(t, cid.Undef, gotCid)
		})
	}
}
187 changes: 187 additions & 0 deletions internal/suppliers/car_supplier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
package suppliers

import (
"crypto/sha256"
"encoding/base64"
"io"
"path/filepath"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipld/go-car/v2"
"github.com/multiformats/go-multicodec"
"github.com/multiformats/go-multihash"
)

// carPathKeyPrefix is prepended to the encoded path hash when building the datastore key
// that maps a CAR path to its ID; see toPathKey.
const carPathKeyPrefix = "car://"

// Compile-time assertions that the supplier and iterator types implement the interfaces
// they are used as.
var (
_ CidIteratorSupplier = (*CarSupplier)(nil)
_ io.Closer = (*CarSupplier)(nil)
_ io.Closer = (*carCidIterator)(nil)
_ CidIterator = (*carCidIterator)(nil)
)

// CarSupplier supplies iterators over the CIDs of CAR files that have been registered via
// Put or PutWithID. The mappings between CAR IDs and CAR paths are kept in a datastore.
type CarSupplier struct {
// ds stores the CAR ID to path and path to CAR ID mappings.
ds datastore.Datastore
// opts are the read options used when opening a CAR to iterate over its CIDs.
opts []car.ReadOption
}

// NewCarSupplier instantiates a CarSupplier that keeps its mappings in the given datastore
// and uses the given read options when opening CAR files.
func NewCarSupplier(ds datastore.Datastore, opts ...car.ReadOption) *CarSupplier {
	supplier := new(CarSupplier)
	supplier.ds = ds
	supplier.opts = opts
	return supplier
}

// Put makes the CAR at given path suppliable by this supplier.
// The returned CID can then be used via Supply to get an iterator over CIDs that belong to the CAR.
// The ID is generated based on the content of the CAR, read using the supplier's read options
// so that ID generation and Supply interpret the CAR file the same way.
// When the CAR ID is already known, PutWithID should be used instead.
// This function accepts both CARv1 and CARv2 formats.
func (cs *CarSupplier) Put(path string) (cid.Cid, error) {
	// Clean path to CAR.
	path = filepath.Clean(path)

	// Generate a CID for the CAR at given path, honouring the configured read options.
	id, err := generateID(path, cs.opts...)
	if err != nil {
		return cid.Undef, err
	}

	return cs.PutWithID(id, path)
}

// PutWithID registers the CAR at the given path with this supplier under the given ID.
// The returned CID is the given ID, which can then be passed to Supply to get an iterator
// over the CIDs that belong to the CAR.
// When the CAR ID is not known, Put should be used instead.
// This function accepts both CARv1 and CARv2 formats.
func (cs *CarSupplier) PutWithID(id cid.Cid, path string) (cid.Cid, error) {
	cleaned := filepath.Clean(path)

	// CAR ID -> path: used to instantiate the CID iterator in Supply.
	err := cs.ds.Put(toCarIdKey(id), []byte(cleaned))
	if err != nil {
		return cid.Undef, err
	}

	// Path -> CAR ID: used to look up the CAR by path when it is removed.
	err = cs.ds.Put(toPathKey(cleaned), id.Bytes())
	if err != nil {
		return cid.Undef, err
	}
	return id, nil
}

// toCarIdKey returns the datastore key under which the path of the CAR with the given ID is stored.
// The key is built from the string form of the CID.
func toCarIdKey(id cid.Cid) datastore.Key {
	encodedID := id.String()
	return datastore.NewKey(encodedID)
}

// Remove unregisters the CAR at the given path so that it is no longer suppliable,
// and returns the ID the CAR was registered under.
// ErrNotFound is returned if no CAR is known for the given path.
// This function accepts both CARv1 and CARv2 formats.
func (cs *CarSupplier) Remove(path string) (cid.Cid, error) {
	pathKey := toPathKey(filepath.Clean(path))

	// Resolve the CAR ID registered for the path, translating the datastore's
	// not-found error into the supplier's own.
	id, err := cs.getCarIDFromPathKey(pathKey)
	switch {
	case err == datastore.ErrNotFound:
		return cid.Undef, ErrNotFound
	case err != nil:
		return cid.Undef, err
	}

	// Delete the CAR ID -> path mapping first, then the path -> CAR ID mapping.
	for _, key := range []datastore.Key{toCarIdKey(id), pathKey} {
		if err := cs.ds.Delete(key); err != nil {
			// TODO improve error handling logic
			// we shouldn't typically get NotFound error here.
			// If we do then a put must have failed prematurely
			// See what we can do to opportunistically heal the datastore.
			return cid.Undef, err
		}
	}
	return id, nil
}

// Supply supplies an iterator over CIDs of the CAR file that corresponds to the given key.
// ErrNotFound is returned if no CAR file is found for the key.
// On any error the returned iterator is a nil interface value.
func (cs *CarSupplier) Supply(key cid.Cid) (CidIterator, error) {
	b, err := cs.ds.Get(toCarIdKey(key))
	if err != nil {
		if err == datastore.ErrNotFound {
			return nil, ErrNotFound
		}
		return nil, err
	}

	// Do not return the constructor's results directly: a nil *carCidIterator would be
	// converted into a non-nil CidIterator interface value on failure.
	iter, err := newCarCidIterator(string(b), cs.opts...)
	if err != nil {
		return nil, err
	}
	return iter, nil
}

// Close closes the datastore backing this supplier and returns the resulting error, if any.
// The supplier must not be used after Close has been called.
func (cs *CarSupplier) Close() error {
	closeErr := cs.ds.Close()
	return closeErr
}

// getCarIDFromPathKey looks up the value stored under the given path key and decodes it as a CID.
// Datastore errors, including not-found, are returned unchanged.
func (cs *CarSupplier) getCarIDFromPathKey(pathKey datastore.Key) (cid.Cid, error) {
	raw, getErr := cs.ds.Get(pathKey)
	if getErr != nil {
		return cid.Undef, getErr
	}

	// Only the decoded CID is of interest; the number of bytes consumed is discarded.
	_, id, decodeErr := cid.CidFromBytes(raw)
	return id, decodeErr
}

// generateID derives an ID, in the form of a CID, for the CAR at the given path.
// The ID is the SHA2-256 multihash of the stream of multihashes of all CIDs in the CAR payload,
// wrapped in a CIDv1 with the DAG-CBOR codec.
// As a consequence, CARs that list the same CIDs in the same order share the same ID regardless
// of version; for example, a CARv1 and the CARv2 that wraps it.
// This function accepts both CARv1 and CARv2 payloads.
func generateID(path string, opts ...car.ReadOption) (cid.Cid, error) {
	// TODO investigate if there is a more efficient and version-agnostic way to generate CID for a CAR file.
	// HINT it will most likely be more efficient to generate the ID using the index of a CAR if it is an indexed CARv2
	// and fall back on current approach otherwise. Note, the CAR index has the multihashes of CIDs not full CIDs,
	// and that should be enough for the purposes of ID generation.

	iter, err := newCarCidIterator(path, opts...)
	if err != nil {
		return cid.Undef, err
	}
	defer iter.Close()

	// Marshal each CID as its multihash only, rather than the entire CID.
	// TODO consider implementing an efficient multihash iterator for cars.
	toMultihash := func(c cid.Cid) ([]byte, error) { return c.Hash(), nil }
	cidBytes := NewCidIteratorReadCloser(iter, toMultihash)

	// Hash the marshalled CID stream.
	digest, err := multihash.SumStream(cidBytes, multihash.SHA2_256, -1)
	if err != nil {
		return cid.Undef, err
	}

	// TODO Figure out what the codec should be.
	// HINT we could use the root CID codec or the first CID's codec.
	carID := cid.NewCidV1(uint64(multicodec.DagCbor), digest)
	return carID, nil
}

// toPathKey returns the datastore key under which the ID of the CAR at the given path is stored.
// The key is carPathKeyPrefix followed by the URL-safe base64 encoding of the SHA-256 digest
// of the path, so its length does not depend on the length of the path.
func toPathKey(path string) datastore.Key {
	// Hash the path to get a fixed length string as key, regardless of how long the path is.
	// Note that hash.Hash.Sum(b) appends the digest of the data written so far to b rather
	// than hashing b, so sha256.Sum256 is used to actually digest the path.
	pathHash := sha256.Sum256([]byte(path))

	// URL-safe encoding keeps '/' out of the encoded hash; '/' is the datastore key
	// separator and would otherwise split the hash into key namespaces.
	encPathHash := base64.URLEncoding.EncodeToString(pathHash[:])

	// Prefix the hash with some constant for namespacing and debug readability.
	return datastore.NewKey(carPathKeyPrefix + encPathHash)
}
Loading

0 comments on commit 08d4a5e

Please sign in to comment.