From 402d61d762f6197e1bd8759bb81df3415dddb367 Mon Sep 17 00:00:00 2001 From: Christian Hotz-Behofsits Date: Tue, 31 Dec 2013 14:30:50 +0100 Subject: [PATCH] multiconnectin works --- .../src/peers/connections/connection.dart | 8 +- .../peers/connections/jswebrtcconnection.dart | 27 ++- lib/Messenger/src/peers/jswebrtcpeer.dart | 20 ++- .../src/peers/messagepassingpeer.dart | 4 +- lib/Messenger/src/peers/peer.dart | 14 +- tests/jsrtcpeer.dart | 156 +++++++++++++++++- tests/unittests.dart | 2 +- 7 files changed, 199 insertions(+), 32 deletions(-) diff --git a/lib/Messenger/src/peers/connections/connection.dart b/lib/Messenger/src/peers/connections/connection.dart index 9c87058..bddd68d 100644 --- a/lib/Messenger/src/peers/connections/connection.dart +++ b/lib/Messenger/src/peers/connections/connection.dart @@ -10,16 +10,16 @@ abstract class Connection{ ///completer for connection ///TODO: use another generic type - Completer connection_completer; - Completer listen_completer; + Completer connection_completer; + Completer listen_completer; Connection([Logger logger=null]){ if (logger == null) this.log = new Logger("Connection"); else this.log = logger; //init - listen_completer = new Completer(); - connection_completer = new Completer(); + listen_completer = new Completer(); + connection_completer = new Completer(); newMessageController = new StreamController(); } diff --git a/lib/Messenger/src/peers/connections/jswebrtcconnection.dart b/lib/Messenger/src/peers/connections/jswebrtcconnection.dart index 5c34cde..4bc82b0 100644 --- a/lib/Messenger/src/peers/connections/jswebrtcconnection.dart +++ b/lib/Messenger/src/peers/connections/jswebrtcconnection.dart @@ -32,7 +32,10 @@ class JsWebRtcConnection extends Connection{ /* set channel events */ _dc.onmessage = (MessageEvent event)=>newMessageController.add(new NewMessageEvent(new Message.fromString(event.data))); - _dc.onopen = (_)=>changeReadyState(new ReadyState.fromDataChannel(_dc.readyState)); + _dc.onopen = (_){ + changeReadyState(new ReadyState.fromDataChannel(_dc.readyState)); + listen_completer.complete(sc.id); + }; _dc.onclose = (_)=>changeReadyState(new ReadyState.fromDataChannel(_dc.readyState)); _dc.onerror = (x)=>log.shout("rtc error callback: " + x.toString()); @@ -64,7 +67,7 @@ class JsWebRtcConnection extends Connection{ */ case MessageType.PEER_ID: log.fine("PEER_ID received: connection established"); - listen_completer.complete(sc.id.toString()); + //listen_completer.complete(sc.id); sc.send(new Message(this.hashCode.toString(), MessageType.AKN_PEER_ID)); changeReadyState(ReadyState.CONNECTED); @@ -72,9 +75,7 @@ class JsWebRtcConnection extends Connection{ break; case MessageType.AKN_PEER_ID: log.fine("AKN_PEER_ID received: connection established"); - connection_completer.complete(sc.id.toString()); - changeReadyState(ReadyState.CONNECTED); break; case MessageType.WEBRTC_OFFER: log.fine("received sdp offer"); @@ -127,7 +128,7 @@ class JsWebRtcConnection extends Connection{ /** * listen for incoming connections */ - Future listen(SignalingChannel sc){ + Future listen(SignalingChannel sc){ log.finest("start listening"); this.sc = sc; @@ -163,7 +164,7 @@ class JsWebRtcConnection extends Connection{ /** * connect to WebrtcPeer */ - Future connect(SignalingChannel sc){ + Future connect(SignalingChannel sc){ log.finest("try to connect"); //listen for incoming connection @@ -175,7 +176,12 @@ class JsWebRtcConnection extends Connection{ _dc = _rtcPeerConnection.createDataChannel("sendDataChannel", js.map(dataChannelOptions)); log.fine('created new data channel'); - _dc.onopen = (_)=>changeReadyState(new ReadyState.fromDataChannel(_dc.readyState)); + _dc.onopen = (_){ + changeReadyState(new ReadyState.fromDataChannel(_dc.readyState)); + connection_completer.complete(sc.id); + + changeReadyState(ReadyState.CONNECTED); + }; _dc.onclose = (_)=>changeReadyState(_dc.readyState); _dc.onmessage = (MessageEvent event)=>newMessageController.add(new NewMessageEvent(new Message.fromString(event.data))); @@ -204,6 +210,13 @@ class JsWebRtcConnection extends Connection{ send(Message msg){ //serialize + if(_dc == null){ + log.warning("could not send message. No DataChannel open!"); + return; + } + + //TODO: not ready, pipe message. + log.info("send message to : ${sc.id.toString()}"); _dc.send(Message.serialize(msg)); } diff --git a/lib/Messenger/src/peers/jswebrtcpeer.dart b/lib/Messenger/src/peers/jswebrtcpeer.dart index 02ee9f4..7f1b477 100644 --- a/lib/Messenger/src/peers/jswebrtcpeer.dart +++ b/lib/Messenger/src/peers/jswebrtcpeer.dart @@ -11,12 +11,13 @@ class JsWebRtcPeer extends Peer{ Stream listen(SignalingChannel sc){ JsWebRtcConnection c = new JsWebRtcConnection(log); - Future f = c.listen(sc); + Future f = c.listen(sc); //add to list of connections. index is identity of other peer //TODO: test if identity is unique - f.then((String hash){ - _connections[hash] = c; + f.then((int id){ + _connections[id] = c; + log.info("new connection added! (now: ${connections.length.toString()})"); newConnectionController.add(new NewConnectionEvent(c)); //redirect messages @@ -28,10 +29,11 @@ class JsWebRtcPeer extends Peer{ Stream connect(SignalingChannel sc){ JsWebRtcConnection c = new JsWebRtcConnection(log); - Future f = c.connect(sc); + Future f = c.connect(sc); - f.then((String hash) { - _connections[hash] = c; + f.then((int id) { + _connections[id] = c; + log.info("new connection added! (now: ${connections.length.toString()})"); newConnectionController.add(new NewConnectionEvent(c)); //redirect messages @@ -70,13 +72,13 @@ class JsWebRtcPeer extends Peer{ * * @ TODO: check if datachannel open. else throw exception */ - send(String name, Message msg){ + send(int id, Message msg){ log.info("send message!"); - if(!_connections.containsKey(name)) + if(!_connections.containsKey(id)) throw new StateError("list of connections does not contain peer ${name}"); - _connections[name].send(msg.toString()); + _connections[id].send(msg); } } \ No newline at end of file diff --git a/lib/Messenger/src/peers/messagepassingpeer.dart b/lib/Messenger/src/peers/messagepassingpeer.dart index d3eda55..c0f55b1 100644 --- a/lib/Messenger/src/peers/messagepassingpeer.dart +++ b/lib/Messenger/src/peers/messagepassingpeer.dart @@ -5,7 +5,7 @@ import './peer.dart'; import '../message.dart'; import '../events/newmessageevent.dart'; */ - +/* class MessagePassingPeer extends Peer{ connect(MessagePassingPeer p){ //TODO: connect?! @@ -17,4 +17,4 @@ class MessagePassingPeer extends Peer{ } close(){} -} \ No newline at end of file +}*/ \ No newline at end of file diff --git a/lib/Messenger/src/peers/peer.dart b/lib/Messenger/src/peers/peer.dart index d11f89b..099028a 100644 --- a/lib/Messenger/src/peers/peer.dart +++ b/lib/Messenger/src/peers/peer.dart @@ -22,7 +22,7 @@ abstract class Peer{ ///new message event stream StreamController newMessageController; - Map _connections; + Map _connections; static List peers = new List(); @@ -55,7 +55,7 @@ abstract class Peer{ //init newMessageController = new StreamController.broadcast(); newConnectionController = new StreamController.broadcast(); - _connections = new Map(); + _connections = new Map(); listen_completer = new Completer(); connection_completer = new Completer(); @@ -69,7 +69,7 @@ abstract class Peer{ /** * connections getter */ - Map get connections => _connections; + Map get connections => _connections; /** * get identifer of this object @@ -116,18 +116,18 @@ abstract class Peer{ * @param String receiverId receiver of message * @param Message msg is content of message */ - send(String receiverId, Message msg); + send(int receiverId, Message msg); /** * send string to other peer */ - sendString(String receiverId, String msg) => send(receiverId, new Message(msg)); + sendString(int receiverId, String msg) => send(receiverId, new Message(msg)); /** * send message to multiple peers */ - broadcast(List receiverIds, Message msg){ - receiverIds.forEach((String id){ + broadcast(Iterable receiverIds, Message msg){ + receiverIds.forEach((int id){ this.send(id, msg); }); } diff --git a/tests/jsrtcpeer.dart b/tests/jsrtcpeer.dart index 1c10d15..c4aee74 100644 --- a/tests/jsrtcpeer.dart +++ b/tests/jsrtcpeer.dart @@ -128,11 +128,11 @@ void main() { //set callbacks alice.newConnectionController.stream.listen( - expectAsync1((_)=>expect(alice.connections.keys.first, bob_sc.id.toString())) + expectAsync1((_)=>expect(alice.connections.keys.first, bob_sc.id)) ); bob.newConnectionController.stream.listen( - expectAsync1((_)=>expect(bob.connections.keys.first, alice_sc.id.toString())) + expectAsync1((_)=>expect(bob.connections.keys.first, alice_sc.id)) ); //connect peer @@ -141,6 +141,158 @@ void main() { }); + group('multi connections', (){ + + test('three connections at the same time', (){ + JsWebRtcPeer alice = new JsWebRtcPeer("alice_m3", Level.OFF); + JsWebRtcPeer bob = new JsWebRtcPeer("bob_m3", Level.OFF); + JsWebRtcPeer clark = new JsWebRtcPeer("clark_m3", Level.OFF); + + //setup signaling channels + MessagePassing alice_bob_sc = new MessagePassing(); + MessagePassing bob_alice_sc = new MessagePassing(); + + MessagePassing alice_clark_sc = new MessagePassing(); + MessagePassing clark_alice_sc = new MessagePassing(); + + MessagePassing clark_bob_sc = new MessagePassing(); + MessagePassing bob_clark_sc = new MessagePassing(); + + alice_bob_sc.connect(bob_alice_sc.identityMap()); + bob_alice_sc.connect(alice_bob_sc.identityMap()); + + alice_clark_sc.connect(clark_alice_sc.identityMap()); + clark_alice_sc.connect(alice_clark_sc.identityMap()); + + clark_bob_sc.connect(bob_clark_sc.identityMap()); + bob_clark_sc.connect(clark_bob_sc.identityMap()); + + + //set callbacks + + int alice_c = 1; + int bob_c = 1; + int clark_c = 1; + + alice.newConnectionController.stream.listen( + expectAsync1((_)=>expect(alice.connections.length, alice_c++), count:2) + ); + + bob.newConnectionController.stream.listen( + expectAsync1((_)=>expect(bob.connections.length, bob_c++), count:2) + ); + + clark.newConnectionController.stream.listen( + expectAsync1((_)=>expect(clark.connections.length, clark_c++), count:2) + ); + + + //connect clark/bob bob/clark + bob.listen(clark_bob_sc); + clark.connect(bob_clark_sc); + + //connect alice/bob bob/alice + alice.listen(bob_alice_sc); + bob.connect(alice_bob_sc); + + //connect alice/clark clark/alice + alice.listen(clark_alice_sc); + clark.connect(alice_clark_sc); + + }); + + + + test('3 clients send messages', (){ + JsWebRtcPeer alice = new JsWebRtcPeer("alice_s3", Level.ALL); + JsWebRtcPeer bob = new JsWebRtcPeer("bob_s3", Level.ALL); + JsWebRtcPeer clark = new JsWebRtcPeer("clark_s3", Level.ALL); + + //setup signaling channels + MessagePassing alice_bob_sc = new MessagePassing(); + MessagePassing bob_alice_sc = new MessagePassing(); + + MessagePassing alice_clark_sc = new MessagePassing(); + MessagePassing clark_alice_sc = new MessagePassing(); + + MessagePassing clark_bob_sc = new MessagePassing(); + MessagePassing bob_clark_sc = new MessagePassing(); + + alice_bob_sc.connect(bob_alice_sc.identityMap()); + bob_alice_sc.connect(alice_bob_sc.identityMap()); + + alice_clark_sc.connect(clark_alice_sc.identityMap()); + clark_alice_sc.connect(alice_clark_sc.identityMap()); + + clark_bob_sc.connect(bob_clark_sc.identityMap()); + bob_clark_sc.connect(clark_bob_sc.identityMap()); + + + //set callbacks + + String s = "some random string"; + Message tm = new Message(s); + + //each sould receive two messages + alice.newMessageController.stream.listen( + expectAsync1((NewMessageEvent e){ + expect(e.data.msg, s); + }, count: 2) + ); + + bob.newMessageController.stream.listen( + expectAsync1((NewMessageEvent e){ + expect(e.data.msg, s); + }, count: 2) + ); + + clark.newMessageController.stream.listen( + expectAsync1((NewMessageEvent e){ + expect(e.data.msg, s); + }, count: 2) + ); + + + bob.newConnectionController.stream.listen((_){ + if(bob.connections.length > 1){ + logMessage("connected to more than 1 client. looks quite good"); + + bob.multicast(tm); + } + }); + + clark.newConnectionController.stream.listen((_){ + if(clark.connections.length > 1){ + logMessage("connected to more than 1 client. looks quite good"); + + clark.multicast(tm); + } + }); + + alice.newConnectionController.stream.listen((_){ + if(alice.connections.length > 1){ + logMessage("connected to more than 1 client. looks quite good"); + + alice.multicast(tm); + } + }); + + //connect clark/bob bob/clark + bob.listen(clark_bob_sc); + clark.connect(bob_clark_sc); + + //connect alice/bob bob/alice + alice.listen(bob_alice_sc); + bob.connect(alice_bob_sc); + + //connect alice/clark clark/alice + alice.listen(clark_alice_sc); + clark.connect(alice_clark_sc); + + }); + + }); + /** * test DataChannel's readyState opens diff --git a/tests/unittests.dart b/tests/unittests.dart index 12a3b58..1ecc8b8 100644 --- a/tests/unittests.dart +++ b/tests/unittests.dart @@ -16,7 +16,7 @@ void main() { /// setup html environment // override configuration to set custom timeout final HtmlEnhancedConfiguration sc = new HtmlEnhancedConfiguration(false); - sc.timeout = new Duration(seconds: 5); + sc.timeout = new Duration(seconds: 10); unittestConfiguration = sc; /*