Skip to content

Commit

Permalink
Merge pull request #54 from libp2p/feat/283
Browse files Browse the repository at this point in the history
Kbucket refactoring for Content Routing
  • Loading branch information
aarshkshah1992 authored Feb 28, 2020
2 parents 99fef9c + c91e408 commit 2c6281f
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 11 deletions.
16 changes: 8 additions & 8 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import (
"time"
)

// option is the Routing Table functional option type.
type option func(*options) error
// Option is the Routing Table functional option type.
type Option func(*options) error

// options is a structure containing all the functional options that can be used when constructing a Routing Table.
type options struct {
Expand All @@ -18,8 +18,8 @@ type options struct {
}
}

// Apply applies the given options to this option.
func (o *options) Apply(opts ...option) error {
// apply applies the given options to this option.
func (o *options) apply(opts ...Option) error {
for i, opt := range opts {
if err := opt(o); err != nil {
return fmt.Errorf("routing table option %d failed: %s", i, err)
Expand All @@ -30,31 +30,31 @@ func (o *options) Apply(opts ...option) error {

// PeerValidationFnc configures the Peer Validation function used for RT cleanup.
// Not configuring this disables Routing Table cleanup.
func PeerValidationFnc(f PeerValidationFunc) option {
func PeerValidationFnc(f PeerValidationFunc) Option {
return func(o *options) error {
o.tableCleanup.peerValidationFnc = f
return nil
}
}

// PeersForValidationFnc configures the function that will be used to select the peers that need to be validated during cleanup.
func PeersForValidationFnc(f PeerSelectionFunc) option {
func PeersForValidationFnc(f PeerSelectionFunc) Option {
return func(o *options) error {
o.tableCleanup.peersForValidationFnc = f
return nil
}
}

// TableCleanupInterval configures the interval between two runs of the Routing Table cleanup routine.
func TableCleanupInterval(i time.Duration) option {
func TableCleanupInterval(i time.Duration) Option {
return func(o *options) error {
o.tableCleanup.interval = i
return nil
}
}

// PeerValidationTimeout sets the timeout for a single peer validation during cleanup.
func PeerValidationTimeout(timeout time.Duration) option {
func PeerValidationTimeout(timeout time.Duration) Option {
return func(o *options) error {
o.tableCleanup.peerValidationTimeout = timeout
return nil
Expand Down
34 changes: 31 additions & 3 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,10 @@ type RoutingTable struct {
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
// Passing a nil PeerValidationFunc disables periodic table cleanup.
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics,
opts ...option) (*RoutingTable, error) {
opts ...Option) (*RoutingTable, error) {

var cfg options
if err := cfg.Apply(append([]option{Defaults}, opts...)...); err != nil {
if err := cfg.apply(append([]Option{Defaults}, opts...)...); err != nil {
return nil, err
}

Expand Down Expand Up @@ -196,6 +196,34 @@ func (rt *RoutingTable) peersToValidate() []PeerInfo {
return rt.peersForValidationFnc(peers)
}

// NPeersForCPL returns the number of peers we have for a given Cpl
func (rt *RoutingTable) NPeersForCpl(cpl uint) int {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

// it's in the last bucket
if int(cpl) >= len(rt.buckets)-1 {
count := 0
b := rt.buckets[len(rt.buckets)-1]
for _, p := range b.peerIds() {
if CommonPrefixLen(rt.local, ConvertPeerID(p)) == int(cpl) {
count++
}
}
return count
} else {
return rt.buckets[cpl].len()
}
}

// IsBucketFull returns true if the Logical bucket for a given Cpl is full
func (rt *RoutingTable) IsBucketFull(cpl uint) bool {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()

return rt.NPeersForCpl(cpl) == rt.bucketsize
}

// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
Expand Down Expand Up @@ -459,7 +487,7 @@ func (rt *RoutingTable) Print() {
fmt.Printf("\tbucket: %d\n", i)

for e := b.list.Front(); e != nil; e = e.Next() {
p := e.Value.(peer.ID)
p := e.Value.(*PeerInfo).Id
fmt.Printf("\t\t- %s %s\n", p.Pretty(), rt.metrics.LatencyEWMA(p).String())
}
}
Expand Down
45 changes: 45 additions & 0 deletions table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,15 @@ var PeerAlwaysValidFnc = func(ctx context.Context, p peer.ID) bool {
return true
}

func TestPrint(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc))
require.NoError(t, err)
rt.Print()
}

// Test basic features of the bucket struct
func TestBucket(t *testing.T) {
t.Parallel()
Expand Down Expand Up @@ -90,6 +99,42 @@ func TestGenRandPeerID(t *testing.T) {
}
}

func TestNPeersForCpl(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt, err := NewRoutingTable(2, ConvertPeerID(local), time.Hour, m, PeerValidationFnc(PeerAlwaysValidFnc))
require.NoError(t, err)

require.Equal(t, 0, rt.NPeersForCpl(0))
require.Equal(t, 0, rt.NPeersForCpl(1))

// one peer with cpl 1
p, _ := rt.GenRandPeerID(1)
rt.HandlePeerAlive(p)
require.Equal(t, 0, rt.NPeersForCpl(0))
require.Equal(t, 1, rt.NPeersForCpl(1))
require.Equal(t, 0, rt.NPeersForCpl(2))

// one peer with cpl 0
p, _ = rt.GenRandPeerID(0)
rt.HandlePeerAlive(p)
require.Equal(t, 1, rt.NPeersForCpl(0))
require.Equal(t, 1, rt.NPeersForCpl(1))
require.Equal(t, 0, rt.NPeersForCpl(2))

// split the bucket with a peer with cpl 1
p, _ = rt.GenRandPeerID(1)
rt.HandlePeerAlive(p)
require.Equal(t, 1, rt.NPeersForCpl(0))
require.Equal(t, 2, rt.NPeersForCpl(1))
require.Equal(t, 0, rt.NPeersForCpl(2))

p, _ = rt.GenRandPeerID(0)
rt.HandlePeerAlive(p)
require.Equal(t, 2, rt.NPeersForCpl(0))
}

func TestRefreshAndGetTrackedCpls(t *testing.T) {
t.Parallel()
local := test.RandPeerIDFatal(t)
Expand Down

0 comments on commit 2c6281f

Please sign in to comment.