From 9d0dd571946e1fa63732bdbf5b4e9b480e861d4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Mon, 20 Jul 2020 22:17:44 +0200 Subject: [PATCH] wire a context in most of the data pipeline This only do the wiring. Actual usage for cancellation, logging or tracing is left for a future work. --- filestore.go | 40 ++++++++++++++++++++-------------------- filestore_test.go | 13 ++++++++----- fsrefstore.go | 38 +++++++++++++++++++------------------- util.go | 33 +++++++++++++++++---------------- 4 files changed, 64 insertions(+), 60 deletions(-) diff --git a/filestore.go b/filestore.go index a9c36c5..6382a6d 100644 --- a/filestore.go +++ b/filestore.go @@ -115,13 +115,13 @@ func (f *Filestore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { // blockstore. As expected, in the case of FileManager blocks, only the // reference is deleted, not its contents. It may return // ErrNotFound when the block is not stored. -func (f *Filestore) DeleteBlock(c cid.Cid) error { - err1 := f.bs.DeleteBlock(c) +func (f *Filestore) DeleteBlock(ctx context.Context, c cid.Cid) error { + err1 := f.bs.DeleteBlock(ctx, c) if err1 != nil && err1 != blockstore.ErrNotFound { return err1 } - err2 := f.fm.DeleteBlock(c) + err2 := f.fm.DeleteBlock(ctx, c) // if we successfully removed something from the blockstore, but the // filestore didnt have it, return success @@ -140,13 +140,13 @@ func (f *Filestore) DeleteBlock(c cid.Cid) error { // Get retrieves the block with the given Cid. It may return // ErrNotFound when the block is not stored. -func (f *Filestore) Get(c cid.Cid) (blocks.Block, error) { - blk, err := f.bs.Get(c) +func (f *Filestore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + blk, err := f.bs.Get(ctx, c) switch err { case nil: return blk, nil case blockstore.ErrNotFound: - return f.fm.Get(c) + return f.fm.Get(ctx, c) default: return nil, err } @@ -154,13 +154,13 @@ func (f *Filestore) Get(c cid.Cid) (blocks.Block, error) { // GetSize returns the size of the requested block. It may return ErrNotFound // when the block is not stored. -func (f *Filestore) GetSize(c cid.Cid) (int, error) { - size, err := f.bs.GetSize(c) +func (f *Filestore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + size, err := f.bs.GetSize(ctx, c) switch err { case nil: return size, nil case blockstore.ErrNotFound: - return f.fm.GetSize(c) + return f.fm.GetSize(ctx, c) default: return -1, err } @@ -168,8 +168,8 @@ func (f *Filestore) GetSize(c cid.Cid) (int, error) { // Has returns true if the block with the given Cid is // stored in the Filestore. -func (f *Filestore) Has(c cid.Cid) (bool, error) { - has, err := f.bs.Has(c) +func (f *Filestore) Has(ctx context.Context, c cid.Cid) (bool, error) { + has, err := f.bs.Has(ctx, c) if err != nil { return false, err } @@ -178,15 +178,15 @@ func (f *Filestore) Has(c cid.Cid) (bool, error) { return true, nil } - return f.fm.Has(c) + return f.fm.Has(ctx, c) } // Put stores a block in the Filestore. For blocks of // underlying type FilestoreNode, the operation is // delegated to the FileManager, while the rest of blocks // are handled by the regular blockstore. -func (f *Filestore) Put(b blocks.Block) error { - has, err := f.Has(b.Cid()) +func (f *Filestore) Put(ctx context.Context, b blocks.Block) error { + has, err := f.Has(ctx, b.Cid()) if err != nil { return err } @@ -197,20 +197,20 @@ func (f *Filestore) Put(b blocks.Block) error { switch b := b.(type) { case *posinfo.FilestoreNode: - return f.fm.Put(b) + return f.fm.Put(ctx, b) default: - return f.bs.Put(b) + return f.bs.Put(ctx, b) } } // PutMany is like Put(), but takes a slice of blocks, allowing // the underlying blockstore to perform batch transactions. -func (f *Filestore) PutMany(bs []blocks.Block) error { +func (f *Filestore) PutMany(ctx context.Context, bs []blocks.Block) error { var normals []blocks.Block var fstores []*posinfo.FilestoreNode for _, b := range bs { - has, err := f.Has(b.Cid()) + has, err := f.Has(ctx, b.Cid()) if err != nil { return err } @@ -228,14 +228,14 @@ func (f *Filestore) PutMany(bs []blocks.Block) error { } if len(normals) > 0 { - err := f.bs.PutMany(normals) + err := f.bs.PutMany(ctx, normals) if err != nil { return err } } if len(fstores) > 0 { - err := f.fm.PutMany(fstores) + err := f.fm.PutMany(ctx, fstores) if err != nil { return err } diff --git a/filestore_test.go b/filestore_test.go index 783dc86..3d4b1cb 100644 --- a/filestore_test.go +++ b/filestore_test.go @@ -45,6 +45,7 @@ func makeFile(dir string, data []byte) (string, error) { } func TestBasicFilestore(t *testing.T) { + ctx := context.Background() dir, fs := newTestFilestore(t) buf := make([]byte, 1000) @@ -65,7 +66,7 @@ func TestBasicFilestore(t *testing.T) { Node: dag.NewRawNode(buf[i*10 : (i+1)*10]), } - err := fs.Put(n) + err := fs.Put(ctx, n) if err != nil { t.Fatal(err) } @@ -73,7 +74,7 @@ func TestBasicFilestore(t *testing.T) { } for i, c := range cids { - blk, err := fs.Get(c) + blk, err := fs.Get(ctx, c) if err != nil { t.Fatal(err) } @@ -105,6 +106,7 @@ func TestBasicFilestore(t *testing.T) { } func randomFileAdd(t *testing.T, fs *Filestore, dir string, size int) (string, []cid.Cid) { + ctx := context.Background() buf := make([]byte, size) rand.Read(buf) @@ -122,7 +124,7 @@ func randomFileAdd(t *testing.T, fs *Filestore, dir string, size int) (string, [ }, Node: dag.NewRawNode(buf[i*10 : (i+1)*10]), } - err := fs.Put(n) + err := fs.Put(ctx, n) if err != nil { t.Fatal(err) } @@ -133,11 +135,12 @@ func randomFileAdd(t *testing.T, fs *Filestore, dir string, size int) (string, [ } func TestDeletes(t *testing.T) { + ctx := context.Background() dir, fs := newTestFilestore(t) _, cids := randomFileAdd(t, fs, dir, 100) todelete := cids[:4] for _, c := range todelete { - err := fs.DeleteBlock(c) + err := fs.DeleteBlock(ctx, c) if err != nil { t.Fatal(err) } @@ -145,7 +148,7 @@ func TestDeletes(t *testing.T) { deleted := make(map[string]bool) for _, c := range todelete { - _, err := fs.Get(c) + _, err := fs.Get(ctx, c) if err != blockstore.ErrNotFound { t.Fatal("expected blockstore not found error") } diff --git a/fsrefstore.go b/fsrefstore.go index bc183fc..1f81fa3 100644 --- a/fsrefstore.go +++ b/fsrefstore.go @@ -66,7 +66,7 @@ func NewFileManager(ds ds.Batching, root string) *FileManager { func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { q := dsq.Query{KeysOnly: true} - res, err := f.ds.Query(q) + res, err := f.ds.Query(ctx, q) if err != nil { return nil, err } @@ -100,8 +100,8 @@ func (f *FileManager) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { // DeleteBlock deletes the reference-block from the underlying // datastore. It does not touch the referenced data. -func (f *FileManager) DeleteBlock(c cid.Cid) error { - err := f.ds.Delete(dshelp.MultihashToDsKey(c.Hash())) +func (f *FileManager) DeleteBlock(ctx context.Context, c cid.Cid) error { + err := f.ds.Delete(ctx, dshelp.MultihashToDsKey(c.Hash())) if err == ds.ErrNotFound { return blockstore.ErrNotFound } @@ -112,8 +112,8 @@ func (f *FileManager) DeleteBlock(c cid.Cid) error { // is done in two steps: the first step retrieves the reference // block from the datastore. The second step uses the stored // path and offsets to read the raw block data directly from disk. -func (f *FileManager) Get(c cid.Cid) (blocks.Block, error) { - dobj, err := f.getDataObj(c.Hash()) +func (f *FileManager) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + dobj, err := f.getDataObj(ctx, c.Hash()) if err != nil { return nil, err } @@ -129,8 +129,8 @@ func (f *FileManager) Get(c cid.Cid) (blocks.Block, error) { // // This method may successfully return the size even if returning the block // would fail because the associated file is no longer available. -func (f *FileManager) GetSize(c cid.Cid) (int, error) { - dobj, err := f.getDataObj(c.Hash()) +func (f *FileManager) GetSize(ctx context.Context, c cid.Cid) (int, error) { + dobj, err := f.getDataObj(ctx, c.Hash()) if err != nil { return -1, err } @@ -144,8 +144,8 @@ func (f *FileManager) readDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, error) return f.readFileDataObj(m, d) } -func (f *FileManager) getDataObj(m mh.Multihash) (*pb.DataObj, error) { - o, err := f.ds.Get(dshelp.MultihashToDsKey(m)) +func (f *FileManager) getDataObj(ctx context.Context, m mh.Multihash) (*pb.DataObj, error) { + o, err := f.ds.Get(ctx, dshelp.MultihashToDsKey(m)) switch err { case ds.ErrNotFound: return nil, blockstore.ErrNotFound @@ -261,24 +261,24 @@ func (f *FileManager) readURLDataObj(m mh.Multihash, d *pb.DataObj) ([]byte, err // Has returns if the FileManager is storing a block reference. It does not // validate the data, nor checks if the reference is valid. -func (f *FileManager) Has(c cid.Cid) (bool, error) { +func (f *FileManager) Has(ctx context.Context, c cid.Cid) (bool, error) { // NOTE: interesting thing to consider. Has doesnt validate the data. // So the data on disk could be invalid, and we could think we have it. dsk := dshelp.MultihashToDsKey(c.Hash()) - return f.ds.Has(dsk) + return f.ds.Has(ctx, dsk) } type putter interface { - Put(ds.Key, []byte) error + Put(context.Context, ds.Key, []byte) error } // Put adds a new reference block to the FileManager. It does not check // that the reference is valid. -func (f *FileManager) Put(b *posinfo.FilestoreNode) error { - return f.putTo(b, f.ds) +func (f *FileManager) Put(ctx context.Context, b *posinfo.FilestoreNode) error { + return f.putTo(ctx, b, f.ds) } -func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error { +func (f *FileManager) putTo(ctx context.Context, b *posinfo.FilestoreNode, to putter) error { var dobj pb.DataObj if IsURL(b.PosInfo.FullPath) { @@ -309,24 +309,24 @@ func (f *FileManager) putTo(b *posinfo.FilestoreNode, to putter) error { return err } - return to.Put(dshelp.MultihashToDsKey(b.Cid().Hash()), data) + return to.Put(ctx, dshelp.MultihashToDsKey(b.Cid().Hash()), data) } // PutMany is like Put() but takes a slice of blocks instead, // allowing it to create a batch transaction. -func (f *FileManager) PutMany(bs []*posinfo.FilestoreNode) error { +func (f *FileManager) PutMany(ctx context.Context, bs []*posinfo.FilestoreNode) error { batch, err := f.ds.Batch() if err != nil { return err } for _, b := range bs { - if err := f.putTo(b, batch); err != nil { + if err := f.putTo(ctx, b, batch); err != nil { return err } } - return batch.Commit() + return batch.Commit(ctx) } // IsURL returns true if the string represents a valid URL that the diff --git a/util.go b/util.go index dc860f7..f99c044 100644 --- a/util.go +++ b/util.go @@ -1,6 +1,7 @@ package filestore import ( + "context" "fmt" "sort" @@ -86,42 +87,42 @@ func (r *ListRes) FormatLong(enc func(cid.Cid) string) string { // of the given Filestore and returns a ListRes object with the information. // List does not verify that the reference is valid or whether the // raw data is accesible. See Verify(). -func List(fs *Filestore, key cid.Cid) *ListRes { - return list(fs, false, key.Hash()) +func List(ctx context.Context, fs *Filestore, key cid.Cid) *ListRes { + return list(ctx, fs, false, key.Hash()) } // ListAll returns a function as an iterator which, once invoked, returns // one by one each block in the Filestore's FileManager. // ListAll does not verify that the references are valid or whether // the raw data is accessible. See VerifyAll(). -func ListAll(fs *Filestore, fileOrder bool) (func() *ListRes, error) { +func ListAll(ctx context.Context, fs *Filestore, fileOrder bool) (func() *ListRes, error) { if fileOrder { - return listAllFileOrder(fs, false) + return listAllFileOrder(ctx, fs, false) } - return listAll(fs, false) + return listAll(ctx, fs, false) } // Verify fetches the block with the given key from the Filemanager // of the given Filestore and returns a ListRes object with the information. // Verify makes sure that the reference is valid and the block data can be // read. -func Verify(fs *Filestore, key cid.Cid) *ListRes { - return list(fs, true, key.Hash()) +func Verify(ctx context.Context, fs *Filestore, key cid.Cid) *ListRes { + return list(ctx, fs, true, key.Hash()) } // VerifyAll returns a function as an iterator which, once invoked, // returns one by one each block in the Filestore's FileManager. // VerifyAll checks that the reference is valid and that the block data // can be read. -func VerifyAll(fs *Filestore, fileOrder bool) (func() *ListRes, error) { +func VerifyAll(ctx context.Context, fs *Filestore, fileOrder bool) (func() *ListRes, error) { if fileOrder { - return listAllFileOrder(fs, true) + return listAllFileOrder(ctx, fs, true) } - return listAll(fs, true) + return listAll(ctx, fs, true) } -func list(fs *Filestore, verify bool, key mh.Multihash) *ListRes { - dobj, err := fs.fm.getDataObj(key) +func list(ctx context.Context, fs *Filestore, verify bool, key mh.Multihash) *ListRes { + dobj, err := fs.fm.getDataObj(ctx, key) if err != nil { return mkListRes(key, nil, err) } @@ -131,9 +132,9 @@ func list(fs *Filestore, verify bool, key mh.Multihash) *ListRes { return mkListRes(key, dobj, err) } -func listAll(fs *Filestore, verify bool) (func() *ListRes, error) { +func listAll(ctx context.Context, fs *Filestore, verify bool) (func() *ListRes, error) { q := dsq.Query{} - qr, err := fs.fm.ds.Query(q) + qr, err := fs.fm.ds.Query(ctx, q) if err != nil { return nil, err } @@ -169,9 +170,9 @@ func next(qr dsq.Results) (mh.Multihash, *pb.DataObj, error) { return mhash, dobj, nil } -func listAllFileOrder(fs *Filestore, verify bool) (func() *ListRes, error) { +func listAllFileOrder(ctx context.Context, fs *Filestore, verify bool) (func() *ListRes, error) { q := dsq.Query{} - qr, err := fs.fm.ds.Query(q) + qr, err := fs.fm.ds.Query(ctx, q) if err != nil { return nil, err }