Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/No-Trust/peerster
Browse files Browse the repository at this point in the history
  • Loading branch information
raja-s committed Dec 14, 2017
2 parents 61d7a8a + 6c0795f commit 1c93f87
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 28 deletions.
3 changes: 2 additions & 1 deletion gossiper/antientropy.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ func antiEntropy(g *Gossiper, etimer uint, wg sync.WaitGroup) {
A := g.peerSet.RandomPeer()
if A != nil {
// send status packet
status := &g.vectorClock

status := g.vectorClock.Copy()
g.standardOutputQueue <- status.AntiEntropyString(&A.Address)

g.gossipOutputQueue <- &Packet{
Expand Down
4 changes: 2 additions & 2 deletions gossiper/gossiper.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
package main

import (
"fmt"
"github.com/No-Trust/peerster/common"
"github.com/dedis/protobuf"
"net"
"fmt"
"sync"
)

Expand Down Expand Up @@ -44,7 +44,7 @@ func NewGossiper(parameters Parameters, peerAddrs []net.UDPAddr) *Gossiper {
peerSet: peerSet,
vectorClock: *NewStatusPacket(peerSet.ToPeerArray(), parameters.Identifier),
messages: Messages{make(map[string]map[uint32]RumorMessage), &sync.Mutex{}},
gossiperWaiters: make(map[string]chan *PeerStatus),
gossiperWaiters: make(map[string]chan *PeerStatus, channelSize),
waitersMutex: &sync.Mutex{},
fileWaiters: make(map[string]chan *DataReply),
fileWaitersMutex: &sync.Mutex{},
Expand Down
6 changes: 3 additions & 3 deletions gossiper/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ func main() {
// from array of string ip:port return the array of UDPAddr
func parsePeers(args []string) []net.UDPAddr {
// parses slice of strings of the form ip:port into slice of UDPAddr
peers := make([]net.UDPAddr, len(args))
for i, v := range args {
peers := make([]net.UDPAddr, 0)
for _, v := range args {
ipS, portS, err := net.SplitHostPort(v)
common.CheckRead(err)
port, err := strconv.Atoi(portS)
Expand All @@ -117,7 +117,7 @@ func parsePeers(args []string) []net.UDPAddr {
common.CheckRead(errors.New("ip address must be correct"))
}
var newPeer net.UDPAddr = net.UDPAddr{ip, port, ""}
peers[i] = newPeer
peers = append(peers, newPeer)
}
return peers
}
29 changes: 20 additions & 9 deletions gossiper/packets.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"github.com/No-Trust/peerster/common"
"net"
"sync"
"fmt"
)

/*
Expand Down Expand Up @@ -115,8 +116,8 @@ func (sp *StatusPacket) GetIndex(i int) PeerStatus {
func (sp *StatusPacket) Copy() *StatusPacket {
// return a copy of the statuspacket

var Wantcopy []PeerStatus
sp.mutex.Lock()
Wantcopy := make([]PeerStatus, len(sp.Want))
copy(Wantcopy, sp.Want)
sp.mutex.Unlock()
spc := StatusPacket{
Expand Down Expand Up @@ -177,18 +178,28 @@ func (sp *StatusPacket) Add(identifier string) {
func NewStatusPacket(peers []common.Peer, identifier string) *StatusPacket {
var Want []PeerStatus

// Adding peers clocks
for _, peer := range peers {
Want = append(Want, PeerStatus{
Identifier: peer.Identifier,
NextID: 1,
})
}
// // Adding peers clocks
// for _, peer := range peers {
// Want = append(Want, PeerStatus{
// Identifier: peer.Identifier,
// NextID: 1,
// })
// }
// Adding its own clock
Want = append(Want, PeerStatus{
Identifier: identifier,
NextID: 1,
})
sp := StatusPacket{Want, &sync.Mutex{}}
return &sp
}

return &StatusPacket{Want, &sync.Mutex{}}
func (sp *StatusPacket) String() string {
sp.mutex.Lock()
str := ""
for _, status := range sp.Want {
str += fmt.Sprintf("id: %s, next msg: %d \n", status.Identifier, status.NextID)
}
sp.mutex.Unlock()
return str
}
10 changes: 5 additions & 5 deletions gossiper/process_rumor.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,12 @@ func (g *Gossiper) processRumor(rumor *RumorMessage, remoteaddr *net.UDPAddr) {

if rumor.isRoute() {
// forward to all peers
//go rumor.broadcastTo(g, otherPeers)
go rumor.broadcastTo(g, otherPeers)

destPeer := (&otherPeers).RandomPeer()
if destPeer != nil {
go g.rumormonger(rumor, destPeer)
}
// destPeer := (&otherPeers).RandomPeer()
// if destPeer != nil {
// go g.rumormonger(rumor, destPeer)
// }

} else {
// start rumormongering
Expand Down
11 changes: 7 additions & 4 deletions gossiper/process_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,17 @@ func (g *Gossiper) processStatus(status *StatusPacket, remoteaddr *net.UDPAddr)

g.waitersMutex.Lock()
c, present := g.gossiperWaiters[ackID]
g.waitersMutex.Unlock()
if present {
if present && c != nil {
// this is an ack
// there is a goroutine waiting for this status message
// so send the status to the goroutine via channel
c <- &peerstatus
select {
case c <- &peerstatus:
default:
}
}
g.waitersMutex.Unlock()

}

// in any case, compare state and proceed accordingly
Expand Down Expand Up @@ -67,7 +71,6 @@ func (g *Gossiper) compareStateAndProcess(rumor *RumorMessage, status *StatusPac
// send status
g.gossipOutputQueue <- &Packet{
GossipPacket: GossipPacket{
Rumor: nil,
Status: cpy,
},
Destination: destPeer.Address,
Expand Down
9 changes: 5 additions & 4 deletions gossiper/rumormongering.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
package main

import (
"fmt"
"github.com/No-Trust/peerster/common"
"time"
)
Expand Down Expand Up @@ -54,9 +53,10 @@ func (g *Gossiper) rumormonger(rumor *RumorMessage, destPeer *common.Peer) {
// timer stops first
// timeout
timer.Stop()
close(statusChannel)
g.waitersMutex.Lock()
close(statusChannel)
g.gossiperWaiters[ackID] = nil
delete(g.gossiperWaiters, ackID)
g.waitersMutex.Unlock()
nextDestPeer := g.peerSet.RandomPeer()
if destPeer != nil {
Expand All @@ -65,10 +65,11 @@ func (g *Gossiper) rumormonger(rumor *RumorMessage, destPeer *common.Peer) {
return
case <-statusChannel:
// received the ack before timeout
fmt.Println("@-@-@-@-@-@-@")
timer.Stop()
close(statusChannel)
g.waitersMutex.Lock()
close(statusChannel)
g.gossiperWaiters[ackID] = nil
delete(g.gossiperWaiters, ackID)
g.gossiperWaiters[ackID] = nil
g.waitersMutex.Unlock()
// rumormonger again with probability 1/2
Expand Down

0 comments on commit 1c93f87

Please sign in to comment.