From ffb18adb50cbb542116bdfbe51361bf422f3218b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michael=20Mur=C3=A9?= Date: Tue, 21 Jul 2020 10:23:18 +0200 Subject: [PATCH] wire a context in most of the ipfs data pipeline, connect it --- dht.go | 8 +++---- dht_test.go | 4 ++-- handlers.go | 16 ++++++------- providers/providers_manager.go | 36 +++++++++++++++-------------- providers/providers_manager_test.go | 9 ++++---- records_test.go | 8 +++---- routing.go | 8 +++---- 7 files changed, 46 insertions(+), 43 deletions(-) diff --git a/dht.go b/dht.go index acd451980..ad1d3c65e 100644 --- a/dht.go +++ b/dht.go @@ -525,10 +525,10 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, key string) ( } // getLocal attempts to retrieve the value from the datastore -func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) { +func (dht *IpfsDHT) getLocal(ctx context.Context, key string) (*recpb.Record, error) { logger.Debugw("finding value in datastore", "key", loggableKeyString(key)) - rec, err := dht.getRecordFromDatastore(mkDsKey(key)) + rec, err := dht.getRecordFromDatastore(ctx, mkDsKey(key)) if err != nil { logger.Warnw("get local failed", "key", key, "error", err) return nil, err @@ -544,14 +544,14 @@ func (dht *IpfsDHT) getLocal(key string) (*recpb.Record, error) { } // putLocal stores the key value pair in the datastore -func (dht *IpfsDHT) putLocal(key string, rec *recpb.Record) error { +func (dht *IpfsDHT) putLocal(ctx context.Context, key string, rec *recpb.Record) error { data, err := proto.Marshal(rec) if err != nil { logger.Warnw("failed to put marshal record for local put", "error", err, "key", key) return err } - return dht.datastore.Put(mkDsKey(key), data) + return dht.datastore.Put(ctx, mkDsKey(key), data) } // peerFound signals the routingTable that we've found a peer that diff --git a/dht_test.go b/dht_test.go index 3d7f5838b..e71a38508 100644 --- a/dht_test.go +++ b/dht_test.go @@ -1891,7 +1891,7 @@ func TestProtocolUpgrade(t *testing.T) { // Add record into local DHT only rec := record.MakePutRecord("/v/crow", []byte("caw")) rec.TimeReceived = u.FormatRFC3339(time.Now()) - err = dhtC.putLocal(string(rec.Key), rec) + err = dhtC.putLocal(ctx, string(rec.Key), rec) if err != nil { t.Fatal(err) } @@ -1908,7 +1908,7 @@ func TestProtocolUpgrade(t *testing.T) { // Add record into local DHT only rec = record.MakePutRecord("/v/bee", []byte("buzz")) rec.TimeReceived = u.FormatRFC3339(time.Now()) - err = dhtB.putLocal(string(rec.Key), rec) + err = dhtB.putLocal(ctx, string(rec.Key), rec) if err != nil { t.Fatal(err) } diff --git a/handlers.go b/handlers.go index 4c991fc13..9094b84aa 100644 --- a/handlers.go +++ b/handlers.go @@ -61,7 +61,7 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess // setup response resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) - rec, err := dht.checkLocalDatastore(k) + rec, err := dht.checkLocalDatastore(ctx, k) if err != nil { return nil, err } @@ -89,10 +89,10 @@ func (dht *IpfsDHT) handleGetValue(ctx context.Context, p peer.ID, pmes *pb.Mess return resp, nil } -func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) { +func (dht *IpfsDHT) checkLocalDatastore(ctx context.Context, k []byte) (*recpb.Record, error) { logger.Debugf("%s handleGetValue looking into ds", dht.self) dskey := convertToDsKey(k) - buf, err := dht.datastore.Get(dskey) + buf, err := dht.datastore.Get(ctx, dskey) logger.Debugf("%s handleGetValue looking into ds GOT %v", dht.self, buf) if err == ds.ErrNotFound { @@ -131,7 +131,7 @@ func (dht *IpfsDHT) checkLocalDatastore(k []byte) (*recpb.Record, error) { // may be computationally expensive if recordIsBad { - err := dht.datastore.Delete(dskey) + err := dht.datastore.Delete(ctx, dskey) if err != nil { logger.Error("Failed to delete bad record from datastore: ", err) } @@ -187,7 +187,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess // Make sure the new record is "better" than the record we have locally. // This prevents a record with for example a lower sequence number from // overwriting a record with a higher sequence number. - existing, err := dht.getRecordFromDatastore(dskey) + existing, err := dht.getRecordFromDatastore(ctx, dskey) if err != nil { return nil, err } @@ -213,14 +213,14 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess return nil, err } - err = dht.datastore.Put(dskey, data) + err = dht.datastore.Put(ctx, dskey, data) return pmes, err } // returns nil, nil when either nothing is found or the value found doesn't properly validate. // returns nil, some_error when there's a *datastore* error (i.e., something goes very wrong) -func (dht *IpfsDHT) getRecordFromDatastore(dskey ds.Key) (*recpb.Record, error) { - buf, err := dht.datastore.Get(dskey) +func (dht *IpfsDHT) getRecordFromDatastore(ctx context.Context, dskey ds.Key) (*recpb.Record, error) { + buf, err := dht.datastore.Get(ctx, dskey) if err == ds.ErrNotFound { return nil, nil } diff --git a/providers/providers_manager.go b/providers/providers_manager.go index 20927d2e8..e52b72cf7 100644 --- a/providers/providers_manager.go +++ b/providers/providers_manager.go @@ -33,6 +33,7 @@ var log = logging.Logger("providers") // ProviderManager adds and pulls providers out of the datastore, // caching them in between type ProviderManager struct { + ctx context.Context // all non channel fields are meant to be accessed only within // the run method cache lru.LRUCache @@ -88,6 +89,7 @@ type getProv struct { // NewProviderManager constructor func NewProviderManager(ctx context.Context, local peer.ID, dstore ds.Batching, opts ...Option) (*ProviderManager, error) { pm := new(ProviderManager) + pm.ctx = ctx pm.getprovs = make(chan *getProv) pm.newprovs = make(chan *addProv) pm.dstore = autobatch.NewAutoBatching(dstore, batchBufferSize) @@ -125,7 +127,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) { // don't really care if this fails. _ = gcQuery.Close() } - if err := pm.dstore.Flush(); err != nil { + if err := pm.dstore.Flush(context.Background()); err != nil { log.Error("failed to flush datastore: ", err) } }() @@ -133,7 +135,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) { for { select { case np := <-pm.newprovs: - err := pm.addProv(np.key, np.val) + err := pm.addProv(pm.ctx, np.key, np.val) if err != nil { log.Error("error adding new providers: ", err) continue @@ -144,7 +146,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) { gcSkip[mkProvKeyFor(np.key, np.val)] = struct{}{} } case gp := <-pm.getprovs: - provs, err := pm.getProvidersForKey(gp.key) + provs, err := pm.getProvidersForKey(pm.ctx, gp.key) if err != nil && err != ds.ErrNotFound { log.Error("error reading providers: ", err) } @@ -183,7 +185,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) { fallthrough case gcTime.Sub(t) > ProvideValidity: // or expired - err = pm.dstore.Delete(ds.RawKey(res.Key)) + err = pm.dstore.Delete(pm.ctx, ds.RawKey(res.Key)) if err != nil && err != ds.ErrNotFound { log.Error("failed to remove provider record from disk: ", err) } @@ -197,7 +199,7 @@ func (pm *ProviderManager) run(proc goprocess.Process) { pm.cache.Purge() // Now, kick off a GC of the datastore. - q, err := pm.dstore.Query(dsq.Query{ + q, err := pm.dstore.Query(pm.ctx, dsq.Query{ Prefix: ProvidersKeyPrefix, }) if err != nil { @@ -226,23 +228,23 @@ func (pm *ProviderManager) AddProvider(ctx context.Context, k []byte, val peer.I } // addProv updates the cache if needed -func (pm *ProviderManager) addProv(k []byte, p peer.ID) error { +func (pm *ProviderManager) addProv(ctx context.Context, k []byte, p peer.ID) error { now := time.Now() if provs, ok := pm.cache.Get(string(k)); ok { provs.(*providerSet).setVal(p, now) } // else not cached, just write through - return writeProviderEntry(pm.dstore, k, p, now) + return writeProviderEntry(ctx, pm.dstore, k, p, now) } // writeProviderEntry writes the provider into the datastore -func writeProviderEntry(dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error { +func writeProviderEntry(ctx context.Context, dstore ds.Datastore, k []byte, p peer.ID, t time.Time) error { dsk := mkProvKeyFor(k, p) buf := make([]byte, 16) n := binary.PutVarint(buf, t.UnixNano()) - return dstore.Put(ds.NewKey(dsk), buf[:n]) + return dstore.Put(ctx, ds.NewKey(dsk), buf[:n]) } func mkProvKeyFor(k []byte, p peer.ID) string { @@ -273,8 +275,8 @@ func (pm *ProviderManager) GetProviders(ctx context.Context, k []byte) []peer.ID } } -func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) { - pset, err := pm.getProviderSetForKey(k) +func (pm *ProviderManager) getProvidersForKey(ctx context.Context, k []byte) ([]peer.ID, error) { + pset, err := pm.getProviderSetForKey(ctx, k) if err != nil { return nil, err } @@ -282,13 +284,13 @@ func (pm *ProviderManager) getProvidersForKey(k []byte) ([]peer.ID, error) { } // returns the ProviderSet if it already exists on cache, otherwise loads it from datasatore -func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error) { +func (pm *ProviderManager) getProviderSetForKey(ctx context.Context, k []byte) (*providerSet, error) { cached, ok := pm.cache.Get(string(k)) if ok { return cached.(*providerSet), nil } - pset, err := loadProviderSet(pm.dstore, k) + pset, err := loadProviderSet(ctx, pm.dstore, k) if err != nil { return nil, err } @@ -301,8 +303,8 @@ func (pm *ProviderManager) getProviderSetForKey(k []byte) (*providerSet, error) } // loads the ProviderSet out of the datastore -func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) { - res, err := dstore.Query(dsq.Query{Prefix: mkProvKey(k)}) +func loadProviderSet(ctx context.Context, dstore ds.Datastore, k []byte) (*providerSet, error) { + res, err := dstore.Query(ctx, dsq.Query{Prefix: mkProvKey(k)}) if err != nil { return nil, err } @@ -329,7 +331,7 @@ func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) { fallthrough case now.Sub(t) > ProvideValidity: // or just expired - err = dstore.Delete(ds.RawKey(e.Key)) + err = dstore.Delete(ctx, ds.RawKey(e.Key)) if err != nil && err != ds.ErrNotFound { log.Error("failed to remove provider record from disk: ", err) } @@ -341,7 +343,7 @@ func loadProviderSet(dstore ds.Datastore, k []byte) (*providerSet, error) { decstr, err := base32.RawStdEncoding.DecodeString(e.Key[lix+1:]) if err != nil { log.Error("base32 decoding error: ", err) - err = dstore.Delete(ds.RawKey(e.Key)) + err = dstore.Delete(ctx, ds.RawKey(e.Key)) if err != nil && err != ds.ErrNotFound { log.Error("failed to remove provider record from disk: ", err) } diff --git a/providers/providers_manager_test.go b/providers/providers_manager_test.go index e7e385b22..a83d7a4b6 100644 --- a/providers/providers_manager_test.go +++ b/providers/providers_manager_test.go @@ -93,6 +93,7 @@ func TestProvidersDatastore(t *testing.T) { } func TestProvidersSerialization(t *testing.T) { + ctx := context.Background() dstore := dssync.MutexWrap(ds.NewMapDatastore()) k := u.Hash(([]byte("my key!"))) @@ -101,17 +102,17 @@ func TestProvidersSerialization(t *testing.T) { pt1 := time.Now() pt2 := pt1.Add(time.Hour) - err := writeProviderEntry(dstore, k, p1, pt1) + err := writeProviderEntry(ctx, dstore, k, p1, pt1) if err != nil { t.Fatal(err) } - err = writeProviderEntry(dstore, k, p2, pt2) + err = writeProviderEntry(ctx, dstore, k, p2, pt2) if err != nil { t.Fatal(err) } - pset, err := loadProviderSet(dstore, k) + pset, err := loadProviderSet(ctx, dstore, k) if err != nil { t.Fatal(err) } @@ -206,7 +207,7 @@ func TestProvidesExpire(t *testing.T) { t.Fatal("providers map not cleaned up") } - res, err := ds.Query(dsq.Query{Prefix: ProvidersKeyPrefix}) + res, err := ds.Query(ctx, dsq.Query{Prefix: ProvidersKeyPrefix}) if err != nil { t.Fatal(err) } diff --git a/records_test.go b/records_test.go index 092f5b3c0..00013c69b 100644 --- a/records_test.go +++ b/records_test.go @@ -204,7 +204,7 @@ func TestPubkeyBadKeyFromDHT(t *testing.T) { // Store incorrect public key on node B rec := record.MakePutRecord(pkkey, wrongbytes) rec.TimeReceived = u.FormatRFC3339(time.Now()) - err = dhtB.putLocal(pkkey, rec) + err = dhtB.putLocal(ctx, pkkey, rec) if err != nil { t.Fatal(err) } @@ -243,7 +243,7 @@ func TestPubkeyBadKeyFromDHTGoodKeyDirect(t *testing.T) { // Store incorrect public key on node B rec := record.MakePutRecord(pkkey, wrongbytes) rec.TimeReceived = u.FormatRFC3339(time.Now()) - err = dhtB.putLocal(pkkey, rec) + err = dhtB.putLocal(ctx, pkkey, rec) if err != nil { t.Fatal(err) } @@ -358,7 +358,7 @@ func TestValuesDisabled(t *testing.T) { if err != routing.ErrNotSupported { t.Fatal("get should have failed on node B") } - rec, _ := dhtB.getLocal(pkkey) + rec, _ := dhtB.getLocal(ctx, pkkey) if rec != nil { t.Fatal("node B should not have found the value locally") } @@ -374,7 +374,7 @@ func TestValuesDisabled(t *testing.T) { t.Fatal("node A should not have found the value") } } - rec, _ := dhtA.getLocal(pkkey) + rec, _ := dhtA.getLocal(ctx, pkkey) if rec != nil { t.Fatal("node A should not have found the value locally") } diff --git a/routing.go b/routing.go index b57e0ae84..11d974249 100644 --- a/routing.go +++ b/routing.go @@ -39,7 +39,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts return err } - old, err := dht.getLocal(key) + old, err := dht.getLocal(ctx, key) if err != nil { // Means something is wrong with the datastore. return err @@ -59,7 +59,7 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts rec := record.MakePutRecord(key, value) rec.TimeReceived = u.FormatRFC3339(time.Now()) - err = dht.putLocal(key, rec) + err = dht.putLocal(ctx, key, rec) if err != nil { return err } @@ -273,7 +273,7 @@ func (dht *IpfsDHT) updatePeerValues(ctx context.Context, key string, val []byte go func(p peer.ID) { //TODO: Is this possible? if p == dht.self { - err := dht.putLocal(key, fixupRec) + err := dht.putLocal(ctx, key, fixupRec) if err != nil { logger.Error("Error correcting local dht entry:", err) } @@ -295,7 +295,7 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st logger.Debugw("finding value", "key", loggableKeyString(key)) - if rec, err := dht.getLocal(key); rec != nil && err == nil { + if rec, err := dht.getLocal(ctx, key); rec != nil && err == nil { select { case valCh <- RecvdVal{ Val: rec.GetValue(),