Skip to content

Commit

Permalink
Merge branch 'master' into fix/drop-gogo
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel authored Jan 28, 2025
2 parents 1f64f32 + 811669a commit d2f68ee
Show file tree
Hide file tree
Showing 12 changed files with 86 additions and 50 deletions.
9 changes: 5 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ import (
"google.golang.org/protobuf/proto"
)

const tracer = tracing.Tracer("go-libp2p-kad-dht")
const dhtName = "IpfsDHT"
const (
tracer = tracing.Tracer("go-libp2p-kad-dht")
dhtName = "IpfsDHT"
)

var (
logger = logging.Logger("dht")
Expand Down Expand Up @@ -164,7 +166,7 @@ type IpfsDHT struct {
// Mostly used to filter out localhost and local addresses.
addrFilter func([]ma.Multiaddr) []ma.Multiaddr

onRequestHook func(ctx context.Context, s network.Stream, req pb.Message)
onRequestHook func(ctx context.Context, s network.Stream, req *pb.Message)
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -420,7 +422,6 @@ func makeRoutingTable(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutbound
df, err := peerdiversity.NewFilter(dht.rtPeerDiversityFilter, "rt/diversity", func(p peer.ID) int {
return kb.CommonPrefixLen(dht.selfKey, kb.ConvertPeerID(p))
})

if err != nil {
return nil, fmt.Errorf("failed to construct peer diversity filter: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
)

if dht.onRequestHook != nil {
dht.onRequestHook(ctx, s, req)
dht.onRequestHook(ctx, s, &req)
}

handler := dht.handlerForMsgType(req.GetType())
Expand Down
2 changes: 1 addition & 1 deletion dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,7 @@ func WithCustomMessageSender(messageSenderBuilder func(h host.Host, protos []pro
// incoming DHT protocol message.
// Note: Ensure that the callback executes efficiently, as it will block the
// entire message handler.
func OnRequestHook(f func(ctx context.Context, s network.Stream, req pb.Message)) Option {
func OnRequestHook(f func(ctx context.Context, s network.Stream, req *pb.Message)) Option {
return func(c *dhtcfg.Config) error {
c.OnRequestHook = f
return nil
Expand Down
14 changes: 7 additions & 7 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p-kad-dht/internal/net"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p/core/crypto"
Expand All @@ -36,7 +37,6 @@ import (
test "github.com/libp2p/go-libp2p-kad-dht/internal/testing"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"

u "github.com/ipfs/boxo/util"
"github.com/ipfs/go-cid"
detectrace "github.com/ipfs/go-detect-race"
kb "github.com/libp2p/go-libp2p-kbucket"
Expand All @@ -54,10 +54,10 @@ func init() {
var newCid cid.Cid
switch i % 3 {
case 0:
mhv := u.Hash([]byte(v))
mhv := internal.Hash([]byte(v))
newCid = cid.NewCidV0(mhv)
case 1:
mhv := u.Hash([]byte(v))
mhv := internal.Hash([]byte(v))
newCid = cid.NewCidV1(cid.DagCBOR, mhv)
case 2:
rawMh := make([]byte, 12)
Expand Down Expand Up @@ -858,7 +858,7 @@ func TestRefresh(t *testing.T) {
time.Sleep(time.Microsecond * 50)
}

if u.Debug {
if testing.Verbose() {
// the routing tables should be full now. let's inspect them.
printRoutingTables(dhts)
}
Expand Down Expand Up @@ -1003,7 +1003,7 @@ func TestPeriodicRefresh(t *testing.T) {
}
}

if u.Debug {
if testing.Verbose() {
printRoutingTables(dhts)
}

Expand All @@ -1022,7 +1022,7 @@ func TestPeriodicRefresh(t *testing.T) {
// until the routing tables look better, or some long timeout for the failure case.
waitForWellFormedTables(t, dhts, 7, 10, 20*time.Second)

if u.Debug {
if testing.Verbose() {
printRoutingTables(dhts)
}
}
Expand Down Expand Up @@ -1057,7 +1057,7 @@ func TestProvidesMany(t *testing.T) {
defer cancel()
bootstrap(t, ctxT, dhts)

if u.Debug {
if testing.Verbose() {
// the routing tables should be full now. let's inspect them.
t.Logf("checking routing table of %d", nDHTs)
for _, dht := range dhts {
Expand Down
6 changes: 3 additions & 3 deletions dual/dual_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"testing"
"time"

u "github.com/ipfs/boxo/util"
"github.com/ipfs/go-cid"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/internal"
test "github.com/libp2p/go-libp2p-kad-dht/internal/testing"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -22,8 +22,8 @@ import (
var wancid, lancid cid.Cid

func init() {
wancid = cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("wan cid -- value")))
lancid = cid.NewCidV1(cid.DagCBOR, u.Hash([]byte("lan cid -- value")))
wancid = cid.NewCidV1(cid.DagCBOR, internal.Hash([]byte("wan cid -- value")))
lancid = cid.NewCidV1(cid.DagCBOR, internal.Hash([]byte("lan cid -- value")))
}

type blankValidator struct{}
Expand Down
15 changes: 9 additions & 6 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm"

u "github.com/ipfs/boxo/util"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
Expand Down Expand Up @@ -53,8 +52,10 @@ import (

var logger = logging.Logger("fullrtdht")

const tracer = tracing.Tracer("go-libp2p-kad-dht/fullrt")
const dhtName = "FullRT"
const (
tracer = tracing.Tracer("go-libp2p-kad-dht/fullrt")
dhtName = "FullRT"
)

const rtRefreshLimitsMsg = `Accelerated DHT client was unable to fully refresh its routing table due to Resource Manager limits, which may degrade content routing. Consider increasing resource limits. See debug logs for the "dht-crawler" subsystem for details.`

Expand Down Expand Up @@ -530,7 +531,7 @@ func (dht *FullRT) PutValue(ctx context.Context, key string, value []byte, opts
}

rec := record.MakePutRecord(key, value)
rec.TimeReceived = u.FormatRFC3339(time.Now())
rec.TimeReceived = internal.FormatRFC3339(time.Now())
err = dht.putLocal(ctx, key, rec)
if err != nil {
return err
Expand Down Expand Up @@ -656,7 +657,8 @@ func (dht *FullRT) SearchValue(ctx context.Context, key string, opts ...routing.
}

func (dht *FullRT) searchValueQuorum(ctx context.Context, key string, valCh <-chan RecvdVal, stopCh chan struct{},
out chan<- []byte, nvals int) ([]byte, map[peer.ID]struct{}, bool) {
out chan<- []byte, nvals int,
) ([]byte, map[peer.ID]struct{}, bool) {
numResponses := 0
return dht.processValues(ctx, key, valCh,
func(ctx context.Context, v RecvdVal, better bool) bool {
Expand All @@ -678,7 +680,8 @@ func (dht *FullRT) searchValueQuorum(ctx context.Context, key string, valCh <-ch
}

func (dht *FullRT) processValues(ctx context.Context, key string, vals <-chan RecvdVal,
newVal func(ctx context.Context, v RecvdVal, better bool) bool) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
newVal func(ctx context.Context, v RecvdVal, better bool) bool,
) (best []byte, peersWithBest map[peer.ID]struct{}, aborted bool) {
loop:
for {
if aborted {
Expand Down
5 changes: 2 additions & 3 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
pstore "github.com/libp2p/go-libp2p/p2p/host/peerstore"

u "github.com/ipfs/boxo/util"
ds "github.com/ipfs/go-datastore"
"github.com/libp2p/go-libp2p-kad-dht/internal"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Expand Down Expand Up @@ -115,7 +114,7 @@ func (dht *IpfsDHT) checkLocalDatastore(ctx context.Context, k []byte) (*recpb.R
}

var recordIsBad bool
recvtime, err := u.ParseRFC3339(rec.GetTimeReceived())
recvtime, err := internal.ParseRFC3339(rec.GetTimeReceived())
if err != nil {
logger.Info("either no receive time set on record, or it was invalid: ", err)
recordIsBad = true
Expand Down Expand Up @@ -206,7 +205,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
}

// record the time we receive every record
rec.TimeReceived = u.FormatRFC3339(time.Now())
rec.TimeReceived = internal.FormatRFC3339(time.Now())

data, err := proto.Marshal(rec)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type Config struct {

BootstrapPeers func() []peer.AddrInfo
AddressFilter func([]ma.Multiaddr) []ma.Multiaddr
OnRequestHook func(ctx context.Context, s network.Stream, req pb.Message)
OnRequestHook func(ctx context.Context, s network.Stream, req *pb.Message)

// test specific Config options
DisableFixLowPeers bool
Expand Down
35 changes: 35 additions & 0 deletions internal/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package internal

import (
"time"

mh "github.com/multiformats/go-multihash"
)

// Hash is the global IPFS hash function. uses multihash SHA2_256, 256 bits
func Hash(data []byte) mh.Multihash {
h, err := mh.Sum(data, mh.SHA2_256, -1)
if err != nil {
// this error can be safely ignored (panic) because multihash only fails
// from the selection of hash function. If the fn + length are valid, it
// won't error.
panic("multihash failed to hash using SHA2_256.")
}
return h
}

// ParseRFC3339 parses an RFC3339Nano-formatted time stamp and
// returns the UTC time.
func ParseRFC3339(s string) (time.Time, error) {
t, err := time.Parse(time.RFC3339Nano, s)
if err != nil {
return time.Time{}, err
}
return t.UTC(), nil
}

// FormatRFC3339 returns the string representation of the
// UTC value of the given time in RFC3339Nano format.
func FormatRFC3339(t time.Time) string {
return t.UTC().Format(time.RFC3339Nano)
}
24 changes: 13 additions & 11 deletions providers/providers_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/host/peerstore/pstoremem"

mh "github.com/multiformats/go-multihash"

u "github.com/ipfs/boxo/util"
ds "github.com/ipfs/go-datastore"
dsq "github.com/ipfs/go-datastore/query"
dssync "github.com/ipfs/go-datastore/sync"
Expand All @@ -35,7 +35,7 @@ func TestProviderManager(t *testing.T) {
if err != nil {
t.Fatal(err)
}
a := u.Hash([]byte("test"))
a := internal.Hash([]byte("test"))
p.AddProvider(ctx, a, peer.AddrInfo{ID: peer.ID("testingprovider")})

// Not cached
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestProvidersDatastore(t *testing.T) {
friend := peer.ID("friend")
var mhs []mh.Multihash
for i := 0; i < 100; i++ {
h := u.Hash([]byte(fmt.Sprint(i)))
h := internal.Hash([]byte(fmt.Sprint(i)))
mhs = append(mhs, h)
p.AddProvider(ctx, h, peer.AddrInfo{ID: friend})
}
Expand All @@ -105,7 +105,7 @@ func TestProvidersDatastore(t *testing.T) {
func TestProvidersSerialization(t *testing.T) {
dstore := dssync.MutexWrap(ds.NewMapDatastore())

k := u.Hash(([]byte("my key!")))
k := internal.Hash(([]byte("my key!")))
p1 := peer.ID("peer one")
p2 := peer.ID("peer two")
pt1 := time.Now()
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestProvidesExpire(t *testing.T) {
peers := []peer.ID{"a", "b"}
var mhs []mh.Multihash
for i := 0; i < 10; i++ {
h := u.Hash([]byte(fmt.Sprint(i)))
h := internal.Hash([]byte(fmt.Sprint(i)))
mhs = append(mhs, h)
}

Expand Down Expand Up @@ -235,8 +235,10 @@ func TestProvidesExpire(t *testing.T) {
}
}

var _ = io.NopCloser
var _ = os.DevNull
var (
_ = io.NopCloser
_ = os.DevNull
)

// TestLargeProvidersSet can be used for profiling.
// The datastore can be switched to levelDB by uncommenting the section below and the import above
Expand Down Expand Up @@ -286,7 +288,7 @@ func TestLargeProvidersSet(t *testing.T) {

var mhs []mh.Multihash
for i := 0; i < 1000; i++ {
h := u.Hash([]byte(fmt.Sprint(i)))
h := internal.Hash([]byte(fmt.Sprint(i)))
mhs = append(mhs, h)
for _, pid := range peers {
p.AddProvider(ctx, h, peer.AddrInfo{ID: pid})
Expand All @@ -311,8 +313,8 @@ func TestUponCacheMissProvidersAreReadFromDatastore(t *testing.T) {
defer cancel()

p1, p2 := peer.ID("a"), peer.ID("b")
h1 := u.Hash([]byte("1"))
h2 := u.Hash([]byte("2"))
h1 := internal.Hash([]byte("1"))
h2 := internal.Hash([]byte("2"))
ps, err := pstoremem.NewPeerstore()
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -341,7 +343,7 @@ func TestWriteUpdatesCache(t *testing.T) {
defer cancel()

p1, p2 := peer.ID("a"), peer.ID("b")
h1 := u.Hash([]byte("1"))
h1 := internal.Hash([]byte("1"))
ps, err := pstoremem.NewPeerstore()
if err != nil {
t.Fatal(err)
Expand Down
10 changes: 4 additions & 6 deletions records_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-kad-dht/internal"
"github.com/libp2p/go-libp2p/core/test"

u "github.com/ipfs/boxo/util"
"github.com/ipfs/go-test/random"
record "github.com/libp2p/go-libp2p-record"
tnet "github.com/libp2p/go-libp2p-testing/net"
Expand Down Expand Up @@ -205,7 +205,7 @@ func TestPubkeyBadKeyFromDHT(t *testing.T) {

// Store incorrect public key on node B
rec := record.MakePutRecord(pkkey, wrongbytes)
rec.TimeReceived = u.FormatRFC3339(time.Now())
rec.TimeReceived = internal.FormatRFC3339(time.Now())
err = dhtB.putLocal(ctx, pkkey, rec)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestPubkeyBadKeyFromDHTGoodKeyDirect(t *testing.T) {

// Store incorrect public key on node B
rec := record.MakePutRecord(pkkey, wrongbytes)
rec.TimeReceived = u.FormatRFC3339(time.Now())
rec.TimeReceived = internal.FormatRFC3339(time.Now())
err = dhtB.putLocal(ctx, pkkey, rec)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -317,9 +317,7 @@ func TestValuesDisabled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var (
optsA, optsB []Option
)
var optsA, optsB []Option
optsA = append(optsA, ProtocolPrefix("/valuesMaybeDisabled"))
optsB = append(optsB, ProtocolPrefix("/valuesMaybeDisabled"))

Expand Down
Loading

0 comments on commit d2f68ee

Please sign in to comment.