Skip to content

Commit

Permalink
feat: add request callback config option
Browse files Browse the repository at this point in the history
  • Loading branch information
dennis-tra committed Jan 10, 2025
1 parent 1331ba7 commit 4e3746b
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 0 deletions.
3 changes: 3 additions & 0 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,8 @@ type IpfsDHT struct {
// addrFilter is used to filter the addresses we put into the peer store.
// Mostly used to filter out localhost and local addresses.
addrFilter func([]ma.Multiaddr) []ma.Multiaddr

onRequestHook func(ctx context.Context, s network.Stream, req pb.Message)
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -306,6 +308,7 @@ func makeDHT(h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
routingTablePeerFilter: cfg.RoutingTable.PeerFilter,
rtPeerDiversityFilter: cfg.RoutingTable.DiversityFilter,
addrFilter: cfg.AddressFilter,
onRequestHook: cfg.OnRequestHook,

fixLowPeersChan: make(chan struct{}, 1),

Expand Down
2 changes: 2 additions & 0 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
metrics.ReceivedBytes.M(int64(msgLen)),
)

dht.onRequestHook(ctx, s, req)

handler := dht.handlerForMsgType(req.GetType())
if handler == nil {
stats.Record(ctx, metrics.ReceivedMessageErrors.M(1))
Expand Down
12 changes: 12 additions & 0 deletions dht_options.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dht

import (
"context"
"fmt"
"testing"
"time"
Expand All @@ -12,6 +13,7 @@ import (
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"

Expand Down Expand Up @@ -368,3 +370,13 @@ func WithCustomMessageSender(messageSenderBuilder func(h host.Host, protos []pro
return nil
}
}

// OnRequestHook registers a callback function that will be invoked
// for every incoming DHT protocol message. Note: Ensure that the
// callback executes efficiently, as it can block the entire message handler.
func OnRequestHook(f func(ctx context.Context, s network.Stream, req pb.Message)) Option {
return func(c *dhtcfg.Config) error {
c.OnRequestHook = f
return nil
}

Check warning on line 381 in dht_options.go

View check run for this annotation

Codecov / codecov/patch

dht_options.go#L377-L381

Added lines #L377 - L381 were not covered by tests
}
5 changes: 5 additions & 0 deletions internal/config/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package config

import (
"context"
"fmt"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/libp2p/go-libp2p-kbucket/peerdiversity"
record "github.com/libp2p/go-libp2p-record"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
ma "github.com/multiformats/go-multiaddr"
Expand Down Expand Up @@ -63,6 +65,7 @@ type Config struct {

BootstrapPeers func() []peer.AddrInfo
AddressFilter func([]ma.Multiaddr) []ma.Multiaddr
OnRequestHook func(ctx context.Context, s network.Stream, req pb.Message)

// test specific Config options
DisableFixLowPeers bool
Expand Down Expand Up @@ -134,6 +137,8 @@ var Defaults = func(o *Config) error {
// MAGIC: It makes sense to set it to a multiple of OptProvReturnRatio * BucketSize. We chose a multiple of 4.
o.OptimisticProvideJobsPoolSize = 60

o.OnRequestHook = func(ctx context.Context, s network.Stream, req pb.Message) {}

return nil
}

Expand Down

0 comments on commit 4e3746b

Please sign in to comment.