multirpc/subpub: fix potential goroutine deadlocks
When the connection to a peer is lost,
broadcastHandler's call to SendMessage fails,
and the entire goroutine stops.

No goroutine is left receiving on the write channel,
and sooner or later, sends to the write channel start blocking.
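
To make the failure mode concrete, here is a minimal, runnable sketch
(not code from this patch; sendMessage is a stand-in for the real
SubPub.SendMessage) of how a send on an unbuffered channel blocks
forever once its only receiver goroutine has exited:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// sendMessage stands in for SubPub.SendMessage; it always fails here,
// simulating a dropped peer connection.
func sendMessage(msg []byte) error {
	return errors.New("connection reset by peer")
}

func main() {
	write := make(chan []byte) // unbuffered, like peerSub.write

	// The old broadcastHandler: on the first error, the goroutine
	// returns, leaving the channel with no receiver at all.
	go func() {
		for msg := range write {
			if err := sendMessage(msg); err != nil {
				fmt.Println("send failed:", err)
				return
			}
		}
	}()

	write <- []byte("first") // received, then the goroutine exits

	// Without a timeout, this second send blocks forever; in IPFSsync
	// the blocked goroutine would also be holding a mutex.
	select {
	case write <- []byte("second"):
		fmt.Println("sent")
	case <-time.After(time.Second):
		fmt.Println("second send blocked: this goroutine would leak")
	}
}
```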

This starts causing deadlocks further up in IPFSsync.
SubPub.Subscribe and SubPub.PeerStreamWrite can now block forever,
and further up the chain in IPFSsync,
that can mean some goroutines hold onto mutexes forever.

On one hand, this chain of events can hang IPFSsync,
stopping it from doing anything useful until a restart.

On the other hand, it causes goroutine leaks.
When more calls to IPFSsync.Handle come through,
each on a new goroutine via the router,
they try to grab the deadlocked mutexes and hang forever.

First, fix the root cause: peerSub now has a "peerClosed" channel,
which peersManager closes when the peer is dropped.
The peer's goroutines, both for reading and writing messages,
now keep running until that channel is closed.
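
The fix relies on Go's close-to-broadcast idiom: closing a channel is
observed by every goroutine selecting on it, which a single value sent
on the channel could not achieve. A runnable sketch of the pattern,
using illustrative stand-in channels rather than the real peerSub
fields:

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	peerClosed := make(chan bool) // closed when the peer drops
	write := make(chan []byte)

	var wg sync.WaitGroup
	// Several goroutines select on the same channel; a single close()
	// releases all of them at once.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-peerClosed:
					fmt.Printf("goroutine %d: peer dropped, exiting\n", id)
					return
				case msg := <-write:
					fmt.Printf("goroutine %d: handling %q\n", id, msg)
				}
			}
		}(i)
	}

	write <- []byte("hello") // handled by exactly one goroutine
	close(peerClosed)        // what peersManager now does on peer removal
	wg.Wait()                // every goroutine exits promptly; no leaks
}
```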

Second, make the symptom of the deadlock less severe:
prevent channel sends from blocking forever.
Any send on the "write" channel now also stops on "peerClosed".
And the send on BroadcastWriter, which could also block forever,
now has a fallback timeout of five minutes.
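
A standalone sketch of that fallback pattern (trySend is a hypothetical
helper, and the timeout is shortened from five minutes so the example
finishes quickly):

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// trySend mirrors the pattern added to SubPubHandle.Send: a channel
// send guarded by a context deadline, so the caller can never block
// forever on a full or abandoned channel.
func trySend(ch chan<- []byte, data []byte, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	select {
	case ch <- data:
		return nil
	case <-ctx.Done():
		return ctx.Err() // context.DeadlineExceeded
	}
}

func main() {
	ch := make(chan []byte) // unbuffered and, here, never read from

	if err := trySend(ch, []byte("payload"), 100*time.Millisecond); err != nil {
		fmt.Println("send gave up:", err) // instead of deadlocking
	}
}
```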

Updates vocdoni#243. Perhaps not a total fix, as there might be other leaks.
mvdan committed Jul 29, 2021
1 parent 2bb4c7f commit e417c72
Showing 4 changed files with 52 additions and 12 deletions.
2 changes: 2 additions & 0 deletions multirpc/subpub/discovery.go
@@ -60,13 +60,15 @@ func (ps *SubPub) Subscribe(ctx context.Context) {
 		case <-ps.close:
 			return
 		case msg := <-ps.BroadcastWriter:
+
 			ps.PeersMu.Lock()
 			for _, peer := range ps.Peers {
 				if peer.write == nil {
 					continue
 				}
 				select {
 				case peer.write <- msg:
+				case <-peer.peerClosed:
 				default:
 					log.Infof("dropping broadcast message for peer %s", peer.id)
 				}
20 changes: 17 additions & 3 deletions multirpc/subpub/peers.go
@@ -14,8 +14,14 @@ import (
 )
 
 type peerSub struct {
-	id    libpeer.ID
+	id libpeer.ID
+
 	write chan []byte
+
+	// peerClosed signals that we've lost the connection with the peer, or
+	// it has been removed by peersManager.
+	// When closed, its goroutines stop.
+	peerClosed chan bool
 }
 
 // PeerStreamWrite looks for an existing connection with peerID and calls the callback function with the writer channel as parameter
@@ -32,8 +38,14 @@ func (ps *SubPub) PeerStreamWrite(peerID string, msg []byte) error {
 	if peerIdx < 0 {
 		return fmt.Errorf("no connection with peer %s, cannot open stream", peerID)
 	}
-	ps.Peers[peerIdx].write <- msg
-	return nil
+
+	peer := ps.Peers[peerIdx]
+	select {
+	case peer.write <- msg:
+		return nil
+	case <-peer.peerClosed:
+		return nil
+	}
 }
 
 // FindTopic opens one or multiple new streams with the peers announcing the namespace.
@@ -103,6 +115,8 @@ func (ps *SubPub) peersManager() {
 		if len(ps.Host.Network().ConnsToPeer(peer.id)) > 0 {
 			continue
 		}
+		close(peer.peerClosed)
+
 		// Remove peer if no active connection
 		ps.Peers[i] = ps.Peers[len(ps.Peers)-1]
 		ps.Peers = ps.Peers[:len(ps.Peers)-1]
29 changes: 21 additions & 8 deletions multirpc/subpub/stream.go
@@ -10,9 +10,11 @@ import (
 )
 
 func (ps *SubPub) handleStream(stream network.Stream) {
+	peerClosed := make(chan bool)
+
 	// First, ensure that any messages read from the stream are sent to the
 	// SubPub.Reader channel.
-	go ps.readHandler(stream)
+	go ps.readHandler(peerClosed, stream)
 
 	// Second, ensure that, from now on, any broadcast message is sent to
 	// this stream as well.
@@ -27,38 +29,50 @@
 	pid := stream.Conn().RemotePeer()
 	ps.PeersMu.Lock()
 	defer ps.PeersMu.Unlock()
-	ps.Peers = append(ps.Peers, peerSub{pid, write}) // TO-DO this should be a map
+	ps.Peers = append(ps.Peers, peerSub{ // TO-DO this should be a map
+		id:         pid,
+		peerClosed: peerClosed,
+		write:      write,
+	})
 	if fn := ps.onPeerAdd; fn != nil {
 		fn(pid)
 	}
 	log.Infof("connected to peer %s: %+v", pid, stream.Conn().RemoteMultiaddr())
-	go ps.broadcastHandler(write, bufio.NewWriter(stream))
+	go ps.broadcastHandler(peerClosed, write, bufio.NewWriter(stream))
 }
 
-func (ps *SubPub) broadcastHandler(write <-chan []byte, w *bufio.Writer) {
+func (ps *SubPub) broadcastHandler(peerClosed <-chan bool, write <-chan []byte, w *bufio.Writer) {
 	for {
 		select {
 		case <-ps.close:
 			return
+		case <-peerClosed:
+			return
 		case msg := <-write:
 			if err := ps.SendMessage(w, msg); err != nil {
 				log.Debugf("error writing to buffer: (%s)", err)
-				return
+				continue
 			}
 			if err := w.Flush(); err != nil {
 				log.Debugf("error flushing write buffer: (%s)", err)
-				return
+				continue
 			}
 		}
 	}
 }
 
-func (ps *SubPub) readHandler(stream network.Stream) {
+func (ps *SubPub) readHandler(peerClosed <-chan bool, stream network.Stream) {
 	r := bufio.NewReader(stream)
+
+	// Ensure that we always close the stream.
+	defer stream.Close()
+
 	for {
 		select {
 		case <-ps.close:
 			return
+		case <-peerClosed:
+			return
 		default:
 			// continues below
 		}
@@ -67,7 +81,6 @@ func (ps *SubPub) readHandler(stream network.Stream) {
 		bare.MaxUnmarshalBytes(bareMaxUnmarshalBytes)
 		if err := bare.UnmarshalReader(io.Reader(r), message); err != nil {
 			log.Debugf("error reading stream buffer %s: %v", stream.Conn().RemotePeer().Pretty(), err)
-			stream.Close()
 			return
 		} else if len(message.Data) == 0 {
 			log.Debugf("no data could be read from stream: %s (%+v)", stream.Conn().RemotePeer().Pretty(), stream.Stat())
13 changes: 12 additions & 1 deletion multirpc/transports/subpubtransport/subpub.go
@@ -86,7 +86,18 @@ func (s *SubPubHandle) ConnectionType() string {
 
 func (s *SubPubHandle) Send(msg transports.Message) error {
 	log.Debugf("sending %d bytes to broadcast channel", len(msg.Data))
-	s.SubPub.BroadcastWriter <- msg.Data
+
+	// Use a fallback timeout of five minutes, to prevent blocking forever
+	// or leaking goroutines.
+	// TODO(mvdan): turn this fallback timeout into a ctx parameter
+	ctx, cancel := context.WithTimeout(context.TODO(), 5*time.Minute)
+	defer cancel()
+
+	select {
+	case s.SubPub.BroadcastWriter <- msg.Data:
+	case <-ctx.Done():
+		return ctx.Err()
+	}
 	return nil
 }

