Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: peer exchange filter by shard #1026

Merged
merged 2 commits into from
Feb 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions waku/v2/protocol/peer_exchange/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/libp2p/go-msgio/pbio"
"github.com/waku-org/go-waku/waku/v2/peermanager"
"github.com/waku-org/go-waku/waku/v2/peerstore"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange/pb"
"github.com/waku-org/go-waku/waku/v2/service"
Expand Down Expand Up @@ -43,10 +44,16 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts
}

if params.pm != nil && params.selectedPeer == "" {
pubsubTopics := []string{}
if params.clusterID != 0 {
pubsubTopics = append(pubsubTopics,
protocol.NewStaticShardingPubsubTopic(uint16(params.clusterID), uint16(params.shard)).String())
}
selectedPeers, err := wakuPX.pm.SelectPeers(
peermanager.PeerSelectionCriteria{
SelectionType: params.peerSelectionType,
Proto: PeerExchangeID_v20alpha1,
PubsubTopics: pubsubTopics,
SpecificPeers: params.preferredPeers,
Ctx: ctx,
},
Expand Down Expand Up @@ -93,10 +100,10 @@ func (wakuPX *WakuPeerExchange) Request(ctx context.Context, numPeers int, opts

stream.Close()

return wakuPX.handleResponse(ctx, responseRPC.Response)
return wakuPX.handleResponse(ctx, responseRPC.Response, params)
}

func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse) error {
func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb.PeerExchangeResponse, params *PeerExchangeParameters) error {
var discoveredPeers []struct {
addrInfo peer.AddrInfo
enr *enode.Node
Expand All @@ -112,6 +119,16 @@ func (wakuPX *WakuPeerExchange) handleResponse(ctx context.Context, response *pb
return err
}

if params.clusterID != 0 {
wakuPX.log.Debug("clusterID is non zero, filtering by shard")
rs, err := wenr.RelaySharding(enrRecord)
if err != nil || rs == nil || !rs.Contains(uint16(params.clusterID), uint16(params.shard)) {
wakuPX.log.Debug("peer doesn't matches filter", zap.Int("shard", params.shard))
continue
}
wakuPX.log.Debug("peer matches filter", zap.Int("shard", params.shard))
}

enodeRecord, err := enode.New(enode.ValidSchemes, enrRecord)
if err != nil {
wakuPX.log.Error("creating enode record", zap.Error(err))
Expand Down
11 changes: 11 additions & 0 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ type PeerExchangeParameters struct {
preferredPeers peer.IDSlice
pm *peermanager.PeerManager
log *zap.Logger
shard int
clusterID int
}

type PeerExchangeOption func(*PeerExchangeParameters) error
Expand Down Expand Up @@ -77,3 +79,12 @@ func DefaultOptions(host host.Host) []PeerExchangeOption {
WithAutomaticPeerSelection(),
}
}

// FilterByShard returns a PeerExchangeOption that restricts peer-exchange
// results to peers whose ENR advertises the given shard within the given
// cluster. A clusterID of 0 leaves filtering disabled (the request path only
// applies the filter when clusterID != 0).
func FilterByShard(clusterID int, shard int) PeerExchangeOption {
	return func(params *PeerExchangeParameters) error {
		params.shard = shard
		params.clusterID = clusterID
		return nil
	}
}
97 changes: 97 additions & 0 deletions waku/v2/protocol/peer_exchange/waku_peer_exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,13 @@ import (
"time"

"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/waku-org/go-waku/tests"
"github.com/waku-org/go-waku/waku/v2/discv5"
"github.com/waku-org/go-waku/waku/v2/protocol"
wenr "github.com/waku-org/go-waku/waku/v2/protocol/enr"
"github.com/waku-org/go-waku/waku/v2/utils"
)
Expand Down Expand Up @@ -88,3 +90,98 @@ func TestRetrieveProvidePeerExchangePeers(t *testing.T) {
px1.Stop()
px3.Stop()
}

// TestRetrieveFilteredPeerExchangePeers exercises the FilterByShard option:
// a peer-exchange request filtered by a cluster/shard the target peer does
// not advertise must not discover it, while an unfiltered request or one
// matching the advertised shard must.
func TestRetrieveFilteredPeerExchangePeers(t *testing.T) {
	// H1: bootstrap node running discv5; it will later serve peer exchange.
	host1, _, prvKey1 := tests.CreateHost(t)
	udpPort1, err := tests.FindFreePort(t, "127.0.0.1", 3)
	require.NoError(t, err)
	ip1, _ := tests.ExtractIP(host1.Addrs()[0])
	l1, err := tests.NewLocalnode(prvKey1, ip1, udpPort1, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
	require.NoError(t, err)

	discv5PeerConn1 := discv5.NewTestPeerDiscoverer()
	d1, err := discv5.NewDiscoveryV5(prvKey1, l1, discv5PeerConn1, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort1)))
	require.NoError(t, err)
	d1.SetHost(host1)

	// H2: the node to be discovered. Its ENR carries a sharding bit vector
	// for cluster 1 / shard 2, and it bootstraps off H1 so H1 learns about
	// it via discv5.
	host2, _, prvKey2 := tests.CreateHost(t)
	ip2, _ := tests.ExtractIP(host2.Addrs()[0])
	udpPort2, err := tests.FindFreePort(t, "127.0.0.1", 3)
	require.NoError(t, err)
	l2, err := tests.NewLocalnode(prvKey2, ip2, udpPort2, wenr.NewWakuEnrBitfield(false, false, false, true), nil, utils.Logger())
	require.NoError(t, err)
	rs, err := protocol.NewRelayShards(1, 2)
	require.NoError(t, err)
	l2.Set(enr.WithEntry(wenr.ShardingBitVectorEnrField, rs.BitVector()))
	discv5PeerConn2 := discv5.NewTestPeerDiscoverer()
	d2, err := discv5.NewDiscoveryV5(prvKey2, l2, discv5PeerConn2, prometheus.DefaultRegisterer, utils.Logger(), discv5.WithUDPPort(uint(udpPort2)), discv5.WithBootnodes([]*enode.Node{d1.Node()}))
	require.NoError(t, err)
	d2.SetHost(host2)

	// H3: the requesting node; runs no discv5, only the peer-exchange client.
	host3, _, _ := tests.CreateHost(t)

	defer d1.Stop()
	defer d2.Stop()
	defer host1.Close()
	defer host2.Close()
	defer host3.Close()

	err = d1.Start(context.Background())
	require.NoError(t, err)

	err = d2.Start(context.Background())
	require.NoError(t, err)

	time.Sleep(3 * time.Second) // Wait some time for peers to be discovered

	// Mount peer exchange: H1 serves (backed by d1), H3 only requests.
	pxPeerConn1 := discv5.NewTestPeerDiscoverer()
	px1, err := NewWakuPeerExchange(d1, pxPeerConn1, nil, prometheus.DefaultRegisterer, utils.Logger())
	require.NoError(t, err)
	px1.SetHost(host1)

	pxPeerConn3 := discv5.NewTestPeerDiscoverer()
	px3, err := NewWakuPeerExchange(nil, pxPeerConn3, nil, prometheus.DefaultRegisterer, utils.Logger())
	require.NoError(t, err)
	px3.SetHost(host3)

	err = px1.Start(context.Background())
	require.NoError(t, err)

	err = px3.Start(context.Background())
	require.NoError(t, err)

	// Let H3 dial H1's peer-exchange protocol directly.
	host3.Peerstore().AddAddrs(host1.ID(), host1.Addrs(), peerstore.PermanentAddrTTL)
	err = host3.Peerstore().AddProtocols(host1.ID(), PeerExchangeID_v20alpha1)
	require.NoError(t, err)

	// Try with a shard H2 does not advertise (shard 3): H2 must be filtered out.
	err = px3.Request(context.Background(), 1, WithPeer(host1.ID()), FilterByShard(1, 3))
	require.NoError(t, err)

	time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic

	require.False(t, pxPeerConn3.HasPeer(host2.ID()))

	// Try without shard filtering: H2 must now be discovered.

	err = px3.Request(context.Background(), 1, WithPeer(host1.ID()))
	require.NoError(t, err)

	time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic

	require.True(t, pxPeerConn3.HasPeer(host2.ID()))

	// Filter by the shard H2 actually advertises (cluster 1, shard 2):
	// H2 must still be discovered.
	err = px3.Request(context.Background(), 1, WithPeer(host1.ID()), FilterByShard(1, 2))
	require.NoError(t, err)

	time.Sleep(3 * time.Second) // Give the algorithm some time to work its magic

	require.True(t, pxPeerConn3.HasPeer(host2.ID()))

	px1.Stop()
	px3.Stop()
}
8 changes: 8 additions & 0 deletions waku/v2/protocol/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,3 +268,11 @@ func GeneratePubsubToContentTopicMap(pubsubTopic string, contentTopics []string)
}
return pubSubTopicMap, nil
}

// ShardsToTopics converts a cluster ID and a list of shard indices into the
// corresponding static-sharding pubsub topic strings.
//
// The returned slice has one entry per shard, in input order. Values are
// narrowed to uint16 to match NewStaticShardingPubsubTopic's signature.
func ShardsToTopics(clusterID int, shards []int) []string {
	pubsubTopics := make([]string, len(shards))
	for i, shard := range shards {
		pubsubTopics[i] = NewStaticShardingPubsubTopic(uint16(clusterID), uint16(shard)).String()
	}
	return pubsubTopics
}
Loading