refactor: remove goprocess
Jorropo committed Jun 15, 2023
1 parent 701721f commit cbe39cd
Showing 13 changed files with 328 additions and 380 deletions.
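
The substance of the refactor is a mechanical migration: every goprocess.Process lifecycle becomes a context.Context owned by the IpfsDHT plus a context.CancelFunc, and every goroutine formerly launched with proc.Go registers with a sync.WaitGroup so Close can wait for it to drain. A minimal, self-contained sketch of the pattern, with hypothetical names (service, run) rather than code from this repository:

package main

import (
	"context"
	"sync"
	"time"
)

type service struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func newService() *service {
	s := &service{}
	// The component owns its context; callers no longer pass one in.
	s.ctx, s.cancel = context.WithCancel(context.Background())
	return s
}

// run replaces proc.Go: the goroutine registers with the WaitGroup
// and exits when the service context is cancelled.
func (s *service) run() {
	s.wg.Add(1)
	go func() {
		defer s.wg.Done()
		ticker := time.NewTicker(time.Second)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// periodic work goes here
			case <-s.ctx.Done():
				return
			}
		}
	}()
}

// Close replaces proc.Close: cancel, then wait for every goroutine.
func (s *service) Close() error {
	s.cancel()
	s.wg.Wait()
	return nil
}

func main() {
	s := newService()
	s.run()
	time.Sleep(10 * time.Millisecond)
	_ = s.Close()
}

The same three fields (ctx, cancel, wg) appear on IpfsDHT in the diff below.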
214 changes: 105 additions & 109 deletions dht.go
@@ -33,11 +33,10 @@ import (
"github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log"
"github.com/jbenet/goprocess"
goprocessctx "github.com/jbenet/goprocess/context"
"github.com/multiformats/go-base32"
ma "github.com/multiformats/go-multiaddr"
"go.opencensus.io/tag"
"go.uber.org/multierr"
"go.uber.org/zap"
)

@@ -92,13 +91,12 @@ type IpfsDHT struct {

Validator record.Validator

ctx context.Context
proc goprocess.Process
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

protoMessenger *pb.ProtocolMessenger
msgSender pb.MessageSender

plk sync.Mutex
msgSender pb.MessageSenderWithDisconnect

stripedPutLocks [256]sync.Mutex

@@ -187,7 +185,7 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
return nil, err
}

dht, err := makeDHT(ctx, h, cfg)
dht, err := makeDHT(h, cfg)
if err != nil {
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
}
@@ -225,30 +223,27 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
}

// register for event bus and network notifications
sn, err := newSubscriberNotifiee(dht)
if err != nil {
if err := dht.startNetworkSubscriber(); err != nil {
return nil, err
}
dht.proc.Go(sn.subscribe)
// handle providers
if mgr, ok := dht.providerStore.(interface{ Process() goprocess.Process }); ok {
dht.proc.AddChild(mgr.Process())
}

// go-routine to make sure we ALWAYS have RT peer addresses in the peerstore
// since RT membership is decoupled from connectivity
go dht.persistRTPeersInPeerStore()

dht.proc.Go(dht.rtPeerLoop)
dht.rtPeerLoop()

// Fill routing table with currently connected peers that are DHT servers
dht.plk.Lock()
for _, p := range dht.host.Network().Peers() {
dht.peerFound(dht.ctx, p)
dht.peerFound(p)
}
dht.plk.Unlock()

dht.proc.Go(dht.populatePeers)
dht.rtRefreshManager.Start()

// listens to the fix low peers chan and tries to fix the Routing Table
if !dht.disableFixLowPeers {
dht.runFixLowPeersLoop()
}

return dht, nil
}
@@ -275,7 +270,7 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
func makeDHT(h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
var protocols, serverProtocols []protocol.ID

v1proto := cfg.ProtocolPrefix + kad1
@@ -346,26 +341,19 @@ func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error)
}

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
dht.rtRefreshManager, err = makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
if err != nil {
return nil, fmt.Errorf("failed to construct RT Refresh Manager,err=%s", err)
}
dht.rtRefreshManager = rtRefresh

// create a DHT proc with the given context
dht.proc = goprocessctx.WithContextAndTeardown(ctx, func() error {
return rtRefresh.Close()
})

// create a tagged context derived from the original context
ctxTags := dht.newContextWithLocalTags(ctx)
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)
dht.ctx, dht.cancel = context.WithCancel(dht.newContextWithLocalTags(context.Background()))

if cfg.ProviderStore != nil {
dht.providerStore = cfg.ProviderStore
} else {
dht.providerStore, err = providers.NewProviderManager(dht.ctx, h.ID(), dht.peerstore, cfg.Datastore)
dht.providerStore, err = providers.NewProviderManager(h.ID(), dht.peerstore, cfg.Datastore)
if err != nil {
return nil, fmt.Errorf("initializing default provider manager (%v)", err)
}
@@ -468,50 +456,40 @@ func (dht *IpfsDHT) Mode() ModeOpt {
return dht.auto
}

func (dht *IpfsDHT) populatePeers(_ goprocess.Process) {
if !dht.disableFixLowPeers {
dht.fixLowPeers(dht.ctx)
}
// runFixLowPeersLoop manages simultaneous requests to fixLowPeers
func (dht *IpfsDHT) runFixLowPeersLoop() {
dht.wg.Add(1)
go func() {
defer dht.wg.Done()

if err := dht.rtRefreshManager.Start(); err != nil {
logger.Error(err)
}
dht.fixLowPeers()

// listens to the fix low peers chan and tries to fix the Routing Table
if !dht.disableFixLowPeers {
dht.proc.Go(dht.fixLowPeersRoutine)
}

}
ticker := time.NewTicker(periodicBootstrapInterval)
defer ticker.Stop()

// fixLowPeersRouting manages simultaneous requests to fixLowPeers
func (dht *IpfsDHT) fixLowPeersRoutine(proc goprocess.Process) {
ticker := time.NewTicker(periodicBootstrapInterval)
defer ticker.Stop()
for {
select {
case <-dht.fixLowPeersChan:
case <-ticker.C:
case <-dht.ctx.Done():
return
}

for {
select {
case <-dht.fixLowPeersChan:
case <-ticker.C:
case <-proc.Closing():
return
dht.fixLowPeers()
}

dht.fixLowPeers(dht.Context())
}

}()
}
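
runFixLowPeersLoop wakes on either an explicit trigger (fixLowPeersChan) or the periodic ticker, and exits when dht.ctx is cancelled. On the sending side, the usual idiom for such a trigger channel — a sketch with hypothetical names, not necessarily how this repository's callers do it — is a non-blocking send on a buffered channel, so signalling never blocks and bursts of requests coalesce into a single wake-up:

package main

import "fmt"

// requestFix emits a trigger without blocking the caller: if a trigger
// is already pending, the loop will wake up anyway, so the send is dropped.
func requestFix(trigger chan struct{}) {
	select {
	case trigger <- struct{}{}:
	default:
	}
}

func main() {
	trigger := make(chan struct{}, 1)
	requestFix(trigger)
	requestFix(trigger) // coalesces with the pending trigger
	<-trigger
	fmt.Println("one wake-up for two requests")
}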

// fixLowPeers tries to get more peers into the routing table if we're below the threshold
func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
func (dht *IpfsDHT) fixLowPeers() {
if dht.routingTable.Size() > minRTRefreshThreshold {
return
}

// we try to add all peers we are connected to to the Routing Table
// in case they aren't already there.
for _, p := range dht.host.Network().Peers() {
dht.peerFound(ctx, p)
dht.peerFound(p)
}

// TODO Active Bootstrapping
@@ -528,7 +506,7 @@ func (dht *IpfsDHT) fixLowPeers(ctx context.Context) {
found := 0
for _, i := range rand.Perm(len(bootstrapPeers)) {
ai := bootstrapPeers[i]
err := dht.Host().Connect(ctx, ai)
err := dht.Host().Connect(dht.ctx, ai)
if err == nil {
found++
} else {
@@ -613,54 +591,59 @@ func (dht *IpfsDHT) putLocal(ctx context.Context, key string, rec *recpb.Record)
return dht.datastore.Put(ctx, mkDsKey(key), data)
}

func (dht *IpfsDHT) rtPeerLoop(proc goprocess.Process) {
bootstrapCount := 0
isBootsrapping := false
var timerCh <-chan time.Time
func (dht *IpfsDHT) rtPeerLoop() {
dht.wg.Add(1)
go func() {
defer dht.wg.Done()

var bootstrapCount uint
var isBootsrapping bool
var timerCh <-chan time.Time

for {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case p := <-dht.addPeerToRTChan:
if dht.routingTable.Size() == 0 {
isBootsrapping = true
bootstrapCount = 0
timerCh = nil
}
// queryPeer set to true as we only try to add queried peers to the RT
newlyAdded, err := dht.routingTable.TryAddPeer(p, true, isBootsrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
if bootstrapCount == 2 {
timerCh = time.NewTimer(dht.rtFreezeTimeout).C
}

for {
select {
case <-timerCh:
dht.routingTable.MarkAllPeersIrreplaceable()
case p := <-dht.addPeerToRTChan:
if dht.routingTable.Size() == 0 {
isBootsrapping = true
bootstrapCount = 0
timerCh = nil
}
// queryPeer set to true as we only try to add queried peers to the RT
newlyAdded, err := dht.routingTable.TryAddPeer(p, true, isBootsrapping)
if err != nil {
// peer not added.
continue
}
if !newlyAdded {
// the peer is already in our RT, but we just successfully queried it and so let's give it a
// bump on the query time so we don't ping it too soon for a liveliness check.
dht.routingTable.UpdateLastSuccessfulOutboundQueryAt(p, time.Now())
}
case <-dht.refreshFinishedCh:
bootstrapCount = bootstrapCount + 1
if bootstrapCount == 2 {
timerCh = time.NewTimer(dht.rtFreezeTimeout).C
}
old := isBootsrapping
isBootsrapping = false
if old {
dht.rtRefreshManager.RefreshNoWait()
}

old := isBootsrapping
isBootsrapping = false
if old {
dht.rtRefreshManager.RefreshNoWait()
case <-dht.ctx.Done():
return
}

case <-proc.Closing():
return
}
}
}()
}
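
rtPeerLoop relies on a Go idiom worth noting: timerCh starts out nil, and receiving from a nil channel blocks forever, so the <-timerCh case is disarmed until a timer is actually assigned. A standalone illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	var timerCh <-chan time.Time // nil: this select case can never fire
	events := make(chan string, 2)
	events <- "work"
	events <- "arm"

	for {
		select {
		case <-timerCh:
			fmt.Println("timer fired")
			return
		case e := <-events:
			fmt.Println("event:", e)
			if e == "arm" {
				timerCh = time.NewTimer(10 * time.Millisecond).C // enable the case
			}
		}
	}
}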

// peerFound verifies whether the found peer advertises DHT protocols
// and probe it to make sure it answers DHT queries as expected. If
// it fails to answer, it isn't added to the routingTable.
func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
func (dht *IpfsDHT) peerFound(p peer.ID) {
// if the peer is already in the routing table or the appropriate bucket is
// already full, don't try to add the new peer.ID
if !dht.routingTable.UsefulNewPeer(p) {
Expand All @@ -685,7 +668,7 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
dht.lookupChecksLk.Unlock()

go func() {
livelinessCtx, cancel := context.WithTimeout(ctx, dht.lookupCheckTimeout)
livelinessCtx, cancel := context.WithTimeout(dht.ctx, dht.lookupCheckTimeout)
defer cancel()

// performing a FIND_NODE query
Expand All @@ -701,14 +684,14 @@ func (dht *IpfsDHT) peerFound(ctx context.Context, p peer.ID) {
}

// if the FIND_NODE succeeded, the peer is considered as valid
dht.validPeerFound(ctx, p)
dht.validPeerFound(p)
}()
}
}

// validPeerFound signals the routingTable that we've found a peer that
// supports the DHT protocol, and just answered correctly to a DHT FindPeers
func (dht *IpfsDHT) validPeerFound(ctx context.Context, p peer.ID) {
func (dht *IpfsDHT) validPeerFound(p peer.ID) {
if c := baseLogger.Check(zap.DebugLevel, "peer found"); c != nil {
c.Write(zap.String("peer", p.String()))
}
@@ -852,19 +835,32 @@ func (dht *IpfsDHT) Context() context.Context {
return dht.ctx
}

// Process returns the DHT's process.
func (dht *IpfsDHT) Process() goprocess.Process {
return dht.proc
}

// RoutingTable returns the DHT's routingTable.
func (dht *IpfsDHT) RoutingTable() *kb.RoutingTable {
return dht.routingTable
}

// Close calls Process Close.
func (dht *IpfsDHT) Close() error {
return dht.proc.Close()
dht.cancel()
dht.wg.Wait()

var wg sync.WaitGroup
closes := [...]func() error{
dht.rtRefreshManager.Close,
dht.providerStore.Close,
}
var errors [len(closes)]error
wg.Add(len(errors))
for i, c := range closes {
go func(i int, c func() error) {
defer wg.Done()
errors[i] = c()
}(i, c)
}
wg.Wait()

return multierr.Combine(errors[:]...)
}
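
Close now tears down in two phases: cancel dht.ctx and wait for the WaitGroup to drain, then close the remaining sub-components in parallel, merging their errors with multierr.Combine (which discards nil errors). The fan-out generalizes; a hedged sketch of the same pattern as a helper:

package main

import (
	"fmt"
	"sync"

	"go.uber.org/multierr"
)

// closeAll runs several close functions concurrently and combines
// their errors; a sketch of the pattern used by Close above.
func closeAll(closes ...func() error) error {
	errs := make([]error, len(closes))
	var wg sync.WaitGroup
	wg.Add(len(closes))
	for i, c := range closes {
		go func(i int, c func() error) {
			defer wg.Done()
			errs[i] = c()
		}(i, c)
	}
	wg.Wait()
	return multierr.Combine(errs...) // nil results are dropped
}

func main() {
	err := closeAll(
		func() error { return nil },
		func() error { return fmt.Errorf("provider store: close failed") },
	)
	fmt.Println(err)
}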

func mkDsKey(s string) ds.Key {
4 changes: 2 additions & 2 deletions dht_bootstrap_test.go
@@ -191,8 +191,8 @@ func TestBootstrappersReplacable(t *testing.T) {
require.NoError(t, d.host.Network().ClosePeer(d5.self))
connectNoSync(t, ctx, d, d1)
connectNoSync(t, ctx, d, d5)
d.peerFound(ctx, d5.self)
d.peerFound(ctx, d1.self)
d.peerFound(d5.self)
d.peerFound(d1.self)
time.Sleep(1 * time.Second)

require.Len(t, d.routingTable.ListPeers(), 2)
