Skip to content

Commit

Permalink
Merge branch 'master' into fix/drop-gogo
Browse files Browse the repository at this point in the history
  • Loading branch information
guillaumemichel committed Jan 23, 2025
2 parents 68a6c56 + 1331ba7 commit 1e26fb8
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 58 deletions.
57 changes: 57 additions & 0 deletions amino/defaults.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Package amino provides protocol parameters and suggested default values for the [Amino DHT].
//
// [Amino DHT] is an implementation of the Kademlia distributed hash table (DHT) algorithm,
// originally designed for use in the IPFS (InterPlanetary File System) network.
// This package defines key constants and protocol identifiers used in the Amino DHT implementation.
//
// [Amino DHT]: https://probelab.io/ipfs/amino/
package amino

import (
"time"

"github.com/libp2p/go-libp2p/core/protocol"
)

const (
// ProtocolPrefix is the base prefix for Amino DHT protocols.
ProtocolPrefix protocol.ID = "/ipfs"

// ProtocolID is the latest protocol identifier for the Amino DHT.
ProtocolID protocol.ID = "/ipfs/kad/1.0.0"

// DefaultBucketSize is the Amino DHT bucket size (k in the Kademlia paper).
// It represents the maximum number of peers stored in each
// k-bucket of the routing table.
DefaultBucketSize = 20

// DefaultConcurrency is the suggested number of concurrent requests (alpha
// in the Kademlia paper) for a given query path in Amino DHT. It
// determines how many parallel lookups are performed during network
// traversal.
DefaultConcurrency = 10

// DefaultResiliency is the suggested number of peers closest to a target
// that must have responded in order for a given query path to complete in
// Amino DHT. This helps ensure reliable results by requiring multiple
// confirmations.
DefaultResiliency = 3

// DefaultProvideValidity is the default time that a Provider Record should
// last on Amino DHT before it needs to be refreshed or removed. This value
// is also known as the Provider Record Expiration Interval.
DefaultProvideValidity = 48 * time.Hour

// DefaultProviderAddrTTL is the TTL for which the multiaddresses of
// provider peers are kept around. Those addresses are returned alongside
// the provider. After the TTL expires, the returned records will require an
// extra lookup to find the multiaddresses associated with the returned
// peer ID.
DefaultProviderAddrTTL = 24 * time.Hour
)

var (
// Protocols is a slice containing all supported protocol IDs for Amino DHT.
// Currently, it only includes the main ProtocolID, but it's defined as a slice
// to allow for potential future protocol versions or variants.
//
// NOTE(review): this is a mutable package-level slice. Code that assigns it
// directly (rather than copying it) shares its backing array, so callers
// should treat it as read-only.
Protocols = []protocol.ID{ProtocolID}
)
13 changes: 9 additions & 4 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type (
DefaultCrawler struct {
parallelism int
connectTimeout time.Duration
queryTimeout time.Duration
host host.Host
dhtRPC *pb.ProtocolMessenger
dialAddressExtendDur time.Duration
Expand All @@ -59,6 +60,7 @@ func NewDefaultCrawler(host host.Host, opts ...Option) (*DefaultCrawler, error)
return &DefaultCrawler{
parallelism: o.parallelism,
connectTimeout: o.connectTimeout,
queryTimeout: 3 * o.connectTimeout,
host: host,
dhtRPC: pm,
dialAddressExtendDur: o.dialAddressExtendDur,
Expand All @@ -74,7 +76,10 @@ type messageSender struct {

// SendRequest sends a peer a message and waits for its response
func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
s, err := ms.h.NewStream(ctx, p, ms.protocols...)
tctx, cancel := context.WithTimeout(ctx, ms.timeout)
defer cancel()

s, err := ms.h.NewStream(tctx, p, ms.protocols...)
if err != nil {
return nil, err
}
Expand All @@ -85,8 +90,6 @@ func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Me
}

r := pbio.NewDelimitedReader(s, network.MessageSizeMax)
tctx, cancel := context.WithTimeout(ctx, ms.timeout)
defer cancel()
defer func() { _ = s.Close() }()

msg := new(pb.Message)
Expand Down Expand Up @@ -144,7 +147,9 @@ func (c *DefaultCrawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo
go func() {
defer wg.Done()
for p := range jobs {
res := c.queryPeer(ctx, p)
qctx, cancel := context.WithTimeout(ctx, c.queryTimeout)
res := c.queryPeer(qctx, p)
cancel() // do not defer, cleanup after each job
results <- res
}
}()
Expand Down
3 changes: 2 additions & 1 deletion crawler/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package crawler
import (
"time"

"github.com/libp2p/go-libp2p-kad-dht/amino"
"github.com/libp2p/go-libp2p/core/protocol"
)

Expand All @@ -20,7 +21,7 @@ type options struct {
// defaults are the default crawler options. This option will be automatically
// prepended to any options you pass to the crawler constructor.
var defaults = func(o *options) error {
o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"}
o.protocols = amino.Protocols
o.parallelism = 1000
o.connectTimeout = time.Second * 5
o.perMsgTimeout = time.Second * 5
Expand Down
11 changes: 4 additions & 7 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"github.com/libp2p/go-libp2p-kad-dht/internal"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
"github.com/libp2p/go-libp2p-kad-dht/internal/net"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
"github.com/libp2p/go-libp2p-kad-dht/netsize"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
Expand Down Expand Up @@ -206,7 +205,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
dht.disableFixLowPeers = cfg.DisableFixLowPeers

dht.Validator = cfg.Validator
dht.msgSender = net.NewMessageSenderImpl(h, dht.protocols)
dht.msgSender = cfg.MsgSenderBuilder(h, dht.protocols)
dht.protoMessenger, err = pb.NewProtocolMessenger(dht.msgSender)
if err != nil {
return nil, err
Expand Down Expand Up @@ -737,12 +736,10 @@ func (dht *IpfsDHT) FindLocal(ctx context.Context, id peer.ID) peer.AddrInfo {
_, span := internal.StartSpan(ctx, "IpfsDHT.FindLocal", trace.WithAttributes(attribute.Stringer("PeerID", id)))
defer span.End()

switch dht.host.Network().Connectedness(id) {
case network.Connected, network.CanConnect:
if hasValidConnectedness(dht.host, id) {
return dht.peerstore.PeerInfo(id)
default:
return peer.AddrInfo{}
}
return peer.AddrInfo{}
}

// nearestPeersToQuery returns the routing tables closest peers.
Expand Down Expand Up @@ -928,7 +925,7 @@ func (dht *IpfsDHT) newContextWithLocalTags(ctx context.Context, extraTags ...ta

func (dht *IpfsDHT) maybeAddAddrs(p peer.ID, addrs []ma.Multiaddr, ttl time.Duration) {
// Don't add addresses for self or our connected peers. We have better ones.
if p == dht.self || dht.host.Network().Connectedness(p) == network.Connected {
if p == dht.self || hasValidConnectedness(dht.host, p) {
return
}
dht.peerstore.AddAddrs(p, dht.filterAddrs(addrs), ttl)
Expand Down
5 changes: 5 additions & 0 deletions dht_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,8 @@ func inAddrRange(ip net.IP, ipnets []*net.IPNet) bool {

return false
}

// hasValidConnectedness reports whether the host's network currently
// describes its connectedness to id as either network.Connected or
// network.Limited. Every other connectedness state yields false.
func hasValidConnectedness(host host.Host, id peer.ID) bool {
	switch host.Network().Connectedness(id) {
	case network.Connected, network.Limited:
		return true
	default:
		return false
	}
}
24 changes: 18 additions & 6 deletions dht_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ import (
"testing"
"time"

"github.com/libp2p/go-libp2p-kad-dht/amino"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

Expand All @@ -32,7 +35,7 @@ const (
)

// DefaultPrefix is the application specific prefix attached to all DHT protocols by default.
const DefaultPrefix protocol.ID = "/ipfs"
const DefaultPrefix protocol.ID = amino.ProtocolPrefix

type Option = dhtcfg.Option

Expand Down Expand Up @@ -134,7 +137,7 @@ func NamespacedValidator(ns string, v record.Validator) Option {
// ProtocolPrefix sets an application specific prefix to be attached to all DHT protocols. For example,
// /myapp/kad/1.0.0 instead of /ipfs/kad/1.0.0. Prefix should be of the form /myapp.
//
// Defaults to dht.DefaultPrefix
// Defaults to amino.ProtocolPrefix
func ProtocolPrefix(prefix protocol.ID) Option {
return func(c *dhtcfg.Config) error {
c.ProtocolPrefix = prefix
Expand Down Expand Up @@ -165,7 +168,7 @@ func V1ProtocolOverride(proto protocol.ID) Option {

// BucketSize configures the bucket size (k in the Kademlia paper) of the routing table.
//
// The default value is 20.
// The default value is amino.DefaultBucketSize
func BucketSize(bucketSize int) Option {
return func(c *dhtcfg.Config) error {
c.BucketSize = bucketSize
Expand All @@ -175,7 +178,7 @@ func BucketSize(bucketSize int) Option {

// Concurrency configures the number of concurrent requests (alpha in the Kademlia paper) for a given query path.
//
// The default value is 10.
// The default value is amino.DefaultConcurrency
func Concurrency(alpha int) Option {
return func(c *dhtcfg.Config) error {
c.Concurrency = alpha
Expand All @@ -186,15 +189,15 @@ func Concurrency(alpha int) Option {
// Resiliency configures the number of peers closest to a target that must have responded in order for a given query
// path to complete.
//
// The default value is 3.
// The default value is amino.DefaultResiliency
func Resiliency(beta int) Option {
return func(c *dhtcfg.Config) error {
c.Resiliency = beta
return nil
}
}

// LookupInterval configures maximal number of go routines that can be used to
// LookupCheckConcurrency configures the maximum number of goroutines that can be used to
// perform a lookup check operation, before adding a new node to the routing table.
func LookupCheckConcurrency(n int) Option {
return func(c *dhtcfg.Config) error {
Expand Down Expand Up @@ -356,3 +359,12 @@ func AddressFilter(f func([]ma.Multiaddr) []ma.Multiaddr) Option {
return nil
}
}

// WithCustomMessageSender returns an Option that replaces the builder used to
// create the DHT's pb.MessageSender. The supplied messageSenderBuilder is
// stored in the config's MsgSenderBuilder field; it receives a host and a list
// of protocol IDs and must return a pb.MessageSenderWithDisconnect.
func WithCustomMessageSender(messageSenderBuilder func(h host.Host, protos []protocol.ID) pb.MessageSenderWithDisconnect) Option {
	apply := func(cfg *dhtcfg.Config) error {
		cfg.MsgSenderBuilder = messageSenderBuilder
		return nil
	}
	return apply
}
60 changes: 48 additions & 12 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@ import (
"github.com/multiformats/go-multihash"

"github.com/libp2p/go-libp2p-routing-helpers/tracing"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/host/eventbus"
swarm "github.com/libp2p/go-libp2p/p2p/net/swarm"

u "github.com/ipfs/boxo/util"
Expand Down Expand Up @@ -98,6 +100,8 @@ type FullRT struct {
bulkSendParallelism int

self peer.ID

peerConnectednessSubscriber event.Subscription
}

// NewFullRT creates a DHT client that tracks the full network. It takes a protocol prefix for the given network,
Expand Down Expand Up @@ -151,6 +155,11 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful
}
}

sub, err := h.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged), eventbus.Name("fullrt-dht"))
if err != nil {
return nil, fmt.Errorf("peer connectedness subscription failed: %w", err)
}

ctx, cancel := context.WithCancel(context.Background())

self := h.ID()
Expand Down Expand Up @@ -195,14 +204,14 @@ func NewFullRT(h host.Host, protocolPrefix protocol.ID, options ...Option) (*Ful

crawlerInterval: fullrtcfg.crawlInterval,

bulkSendParallelism: fullrtcfg.bulkSendParallelism,

self: self,
bulkSendParallelism: fullrtcfg.bulkSendParallelism,
self: self,
peerConnectednessSubscriber: sub,
}

rt.wg.Add(1)
rt.wg.Add(2)
go rt.runCrawler(ctx)

go rt.runSubscriber()
return rt, nil
}

Expand All @@ -211,6 +220,31 @@ type crawlVal struct {
key kadkey.Key
}

// runSubscriber forwards peer connectedness changes from the host's event bus
// subscription to the message sender, so it can react when a peer is no longer
// connected.
//
// It marks itself done on dht.wg and closes the subscription on exit. If the
// configured message sender does not implement
// dht_pb.MessageSenderWithDisconnect there is nothing to notify and it returns
// immediately. Otherwise it runs until dht.ctx is done or the subscription's
// output channel is closed.
func (dht *FullRT) runSubscriber() {
	defer dht.wg.Done()
	defer dht.peerConnectednessSubscriber.Close()

	ms, ok := dht.messageSender.(dht_pb.MessageSenderWithDisconnect)
	if !ok {
		return
	}
	for {
		select {
		case e, more := <-dht.peerConnectednessSubscriber.Out():
			if !more {
				// A closed channel yields nil events forever; without this
				// check the loop would spin, logging an error on every pass.
				return
			}
			pc, ok := e.(event.EvtPeerConnectednessChanged)
			if !ok {
				logger.Errorf("invalid event message type: %T", e)
				continue
			}

			// Any state other than Connected is reported as a disconnect.
			if pc.Connectedness != network.Connected {
				ms.OnDisconnect(dht.ctx, pc.Peer)
			}
		case <-dht.ctx.Done():
			return
		}
	}
}

func (dht *FullRT) TriggerRefresh(ctx context.Context) error {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1459,8 +1493,7 @@ func (dht *FullRT) FindPeer(ctx context.Context, id peer.ID) (pi peer.AddrInfo,

// Return peer information if we tried to dial the peer during the query or we are (or recently were) connected
// to the peer.
connectedness := dht.h.Network().Connectedness(id)
if connectedness == network.Connected || connectedness == network.CanConnect {
if hasValidConnectedness(dht.h, id) {
return dht.h.Peerstore().PeerInfo(id), nil
}

Expand Down Expand Up @@ -1538,18 +1571,21 @@ func (dht *FullRT) getRecordFromDatastore(ctx context.Context, dskey ds.Key) (*r

// FindLocal looks for a peer with a given ID connected to this dht and returns the peer and the table it was found in.
func (dht *FullRT) FindLocal(id peer.ID) peer.AddrInfo {
switch dht.h.Network().Connectedness(id) {
case network.Connected, network.CanConnect:
if hasValidConnectedness(dht.h, id) {
return dht.h.Peerstore().PeerInfo(id)
default:
return peer.AddrInfo{}
}
return peer.AddrInfo{}
}

func (dht *FullRT) maybeAddAddrs(p peer.ID, addrs []multiaddr.Multiaddr, ttl time.Duration) {
// Don't add addresses for self or our connected peers. We have better ones.
if p == dht.h.ID() || dht.h.Network().Connectedness(p) == network.Connected {
if p == dht.h.ID() || hasValidConnectedness(dht.h, p) {
return
}
dht.h.Peerstore().AddAddrs(p, addrs, ttl)
}

// hasValidConnectedness reports whether the host's network currently
// describes its connectedness to id as either network.Connected or
// network.Limited. Every other connectedness state yields false.
func hasValidConnectedness(host host.Host, id peer.ID) bool {
	switch host.Network().Connectedness(id) {
	case network.Connected, network.Limited:
		return true
	default:
		return false
	}
}
Loading

0 comments on commit 1e26fb8

Please sign in to comment.