diff --git a/broadcast/broadcast.go b/broadcast/broadcast.go index b5e43f1..5ddfda9 100644 --- a/broadcast/broadcast.go +++ b/broadcast/broadcast.go @@ -8,4 +8,5 @@ type Configuration struct { type Broadcaster interface { Init() (chan []byte, error) Broadcast([]byte) error + Close() error } diff --git a/broadcast/causal.go b/broadcast/causal.go index bbb600b..387b678 100644 --- a/broadcast/causal.go +++ b/broadcast/causal.go @@ -78,6 +78,10 @@ func (c *causal_broadcast_container) Broadcast(content []byte) error { return nil } +func (c *causal_broadcast_container) Close() error { + return c.broadcaster.Close() +} + func (c *causal_broadcast_container) handle_incoming_messages() { for { var message causal_broadcast_message diff --git a/broadcast/causal_test.go b/broadcast/causal_test.go index 0e20464..c3d0da9 100644 --- a/broadcast/causal_test.go +++ b/broadcast/causal_test.go @@ -6,7 +6,7 @@ import ( ) func TestCausalBroadcast(t *testing.T) { - peer_addrs := []string{"localhost:9991", "localhost:9992"} + peer_addrs := []string{"localhost:9981", "localhost:9982"} messages := [][]byte{randStringBytes(2 << 23), []byte("hello gofret!")} incoming_channels := []chan []byte{make(chan []byte), make(chan []byte)} done_signal := make(chan bool) diff --git a/broadcast/communication.go b/broadcast/communication.go index e6db60a..127c2e7 100644 --- a/broadcast/communication.go +++ b/broadcast/communication.go @@ -6,21 +6,23 @@ import ( "io" "log" "net" + "reflect" ) type communication struct { - listener net.Listener - address string + listener net.Listener + address string + close_signal chan bool } type communicator interface { Listen() (chan []byte, error) - CloseConnection() + CloseConnection() error Send(address string, message []byte) error } func new_communication(address string) communicator { - return &communication{address: address} + return &communication{address: address, close_signal: make(chan bool)} } func (c *communication) Listen() (chan []byte, error) { @@ -48,19 +50,26 @@ func (c *communication) Listen() (chan []byte, error) { go func() { for { - connection, err := l.Accept() - if err != nil { - log.Fatalf("something happened while trying to accept an incoming connection: %v", err) + select { + case <-c.close_signal: + return + default: + connection, err := l.Accept() + if err != nil { + log.Printf("%v, something happened while trying to accept an incoming connection: %v\n", reflect.TypeOf(err), err) + continue + } + go handle_request(connection) } - go handle_request(connection) } }() return incoming_messages, nil } -func (c *communication) CloseConnection() { - c.listener.Close() +func (c *communication) CloseConnection() error { + c.close_signal <- true + return c.listener.Close() } func (c *communication) Send(address string, message []byte) error { diff --git a/broadcast/fifo.go b/broadcast/fifo.go index c9a6be3..d82cb66 100644 --- a/broadcast/fifo.go +++ b/broadcast/fifo.go @@ -62,6 +62,10 @@ func (f *fifo_broadcast_container) Broadcast(content []byte) error { return nil } +func (f *fifo_broadcast_container) Close() error { + return f.broadcaster.Close() +} + func (f *fifo_broadcast_container) handle_incoming_messages() { for { var message fifo_broadcast_message diff --git a/broadcast/unordered.go b/broadcast/unordered.go index 987cb3a..961fa59 100644 --- a/broadcast/unordered.go +++ b/broadcast/unordered.go @@ -36,6 +36,10 @@ func (bc *broadcast_container) Init() (chan []byte, error) { return incoming_messages, nil } +func (bc *broadcast_container) Close() error { + return bc.Communicator.CloseConnection() +} + func UnorderedBroadcast(config Configuration) Broadcaster { new_broadcast := broadcast_container{peer_addrs: config.PeerAddresses, address: config.SelfAddress} return &new_broadcast