Skip to content

Commit

Permalink
feat: add strict typescript config and refactor data channels
Browse files Browse the repository at this point in the history
  • Loading branch information
skyllo committed May 22, 2022
1 parent a414d12 commit f74539e
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 69 deletions.
97 changes: 64 additions & 33 deletions src/peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@ export default class Peer {

private readonly channels = new Map<string, RTCDataChannel>();

private readonly channelsPending = new Map<string, RTCDataChannelInit>();

private readonly emitter = new EventEmitter() as TypedEmitter<PeerEvents>;

private polite = POLITE_DEFAULT_VALUE;

private isActive = false;

private makingOffer = false;

private ignoreOffer = false;
Expand All @@ -46,6 +50,7 @@ export default class Peer {
/** Creates a peer instance */
public constructor(options?: PeerOptions) {
this.options = { ...this.options, ...options };
this.peerConn = this.init();
}

/** Initializes the peer connection */
Expand Down Expand Up @@ -80,7 +85,7 @@ export default class Peer {
clearTimeout(candidatesId);
candidates.push(event.candidate);
// return all candidates if finished gathering
if (this.peerConn && this.peerConn.iceGatheringState === 'complete') {
if (this.peerConn.iceGatheringState === 'complete') {
this.emit('onicecandidates', candidates);
} else {
// create timeout to return candidates after 200ms
Expand Down Expand Up @@ -129,17 +134,22 @@ export default class Peer {

this.peerConn.onnegotiationneeded = async () => {
try {
if (!this.isActive) return;

this.makingOffer = true;

const { channelName, channelOptions, enableDataChannels } = this.options;
if (enableDataChannels) {
// create data channel, needed to add "m=application" to SDP
this.getDataChannel(channelName, channelOptions);
this.addDataChannel(channelName, channelOptions);
}

const offer = await this.peerConn.createOffer(this.options.offerOptions);
if (this.peerConn.signalingState !== 'stable') return;

// add pending data channels
this.createDataChannelsPending();

console.log(`${this.options.name}.onnegotiationneeded()`);
offer.sdp = offer.sdp && this.options.sdpTransform(offer.sdp);
await this.peerConn.setLocalDescription(offer);
Expand All @@ -149,7 +159,9 @@ export default class Peer {
this.emit('signal', this.peerConn.localDescription);
}
} catch (err) {
this.error('Failed in negotiation needed', err);
if (err instanceof Error) {
this.error('Failed in negotiation needed', err);
}
} finally {
this.makingOffer = false;
}
Expand All @@ -160,7 +172,7 @@ export default class Peer {
const { channel } = event;
if (this.options.enableDataChannels) {
this.channels.set(channel.label, channel);
this.addDataChannel(channel);
this.addDataChannelEvents(channel);
}
};

Expand All @@ -170,7 +182,7 @@ export default class Peer {
public start({ polite = POLITE_DEFAULT_VALUE }: { polite?: boolean } = {}) {
try {
// reset peer if only local offer is set
if (this.peerConn?.signalingState === 'have-local-offer') {
if (this.peerConn.signalingState === 'have-local-offer') {
this.destroy();
}
if (this.isClosed()) {
Expand All @@ -179,13 +191,15 @@ export default class Peer {

console.log(`${this.options.name}.start()`);

this.isActive = true;
this.polite = polite;

// ⚡ triggers "negotiationneeded" event if connected
this.peerConn.restartIce();
} catch (err) {
this.error('Failed to start', err);
throw err;
if (err instanceof Error) {
this.error('Failed to start', err);
}
}
}

Expand All @@ -197,6 +211,7 @@ export default class Peer {

console.log(this.options.name, '<-', description.type);

this.isActive = true;
const offerCollision =
description.type === 'offer' &&
(this.makingOffer || this.peerConn.signalingState !== 'stable');
Expand All @@ -209,6 +224,9 @@ export default class Peer {

await this.peerConn.setRemoteDescription(description);
if (description.type === 'offer') {
// add pending data channels
this.createDataChannelsPending();

await this.peerConn.setLocalDescription();
if (this.peerConn.localDescription) {
console.log(this.options.name, '->', this.peerConn.localDescription.type);
Expand All @@ -217,16 +235,18 @@ export default class Peer {
}
this.polite = POLITE_DEFAULT_VALUE;
} catch (err) {
this.error('Failed to set local/remote descriptions', err);
if (err instanceof Error) {
this.error('Failed to set local/remote descriptions', err);
}
}
}

public async addIceCandidate(candidate: RTCIceCandidate) {
try {
console.log(this.options.name, '<-', 'icecandidate');
await this.peerConn?.addIceCandidate(candidate);
await this.peerConn.addIceCandidate(candidate);
} catch (err) {
if (!this.ignoreOffer) {
if (!this.ignoreOffer && err instanceof Error) {
this.error('Failed to addIceCandidate', err);
}
}
Expand All @@ -238,38 +258,45 @@ export default class Peer {
label: string = this.options.channelName
): boolean {
const channel = this.channels.get(label);
if (channel && channel.readyState === 'open' && data) {
if (channel?.readyState === 'open' && data) {
channel.send(data);
this.emit('channelData', { channel, data, source: 'outgoing' });
return true;
}
return false;
}

/** Gets existing open data channels or creates new ones */
public getDataChannel(
label: string = this.options.channelName,
opts: RTCDataChannelInit = {}
): RTCDataChannel | undefined {
/** Create a data channel */
public addDataChannel(label: string = this.options.channelName, opts: RTCDataChannelInit = {}) {
if (!this.options.enableDataChannels) {
this.error('Failed to createDataChannel as "enableDataChannels" is false');
return undefined;
this.error('Failed to addDataChannel as "enableDataChannels" is false');
return;
}
if (this.isClosed()) {
this.error('Failed to createDataChannel as peer connection is closed');
return undefined;
this.error('Failed to addDataChannel as peer connection is closed');
return;
}
if (this.channels.has(label)) {
return this.channels.get(label);
this.channelsPending.set(label, opts);
if (this.isActive) {
this.createDataChannelsPending();
}
// ⚡ triggers "negotiationneeded" event if connected and no other data channels already added
const channel = this.peerConn.createDataChannel(label, opts);
this.channels.set(channel.label, channel);
this.addDataChannel(channel);
return channel;
}

private addDataChannel(channel: RTCDataChannel) {
public getDataChannel(label: string = this.options.channelName) {
return this.channels.get(label);
}

private createDataChannelsPending() {
Array.from(this.channelsPending.entries()).forEach(([key, value]) => {
// ⚡ triggers "negotiationneeded" event if connected and no other data channels already added
const channel = this.peerConn.createDataChannel(key, value);
this.channels.set(channel.label, channel);
this.addDataChannelEvents(channel);
});
this.channelsPending.clear();
}

private addDataChannelEvents(channel: RTCDataChannel) {
// setup data channel events
channel.onopen = () => this.emit('channelOpen', { channel });
channel.onerror = (ev: Event) => {
Expand All @@ -289,8 +316,10 @@ export default class Peer {
public destroy() {
if (!this.isClosed()) {
this.polite = POLITE_DEFAULT_VALUE;
this.isActive = false;
this.makingOffer = false;
this.ignoreOffer = false;
this.channelsPending.clear();
this.peerConn.close();
console.log(`${this.options.name}.disconnected()`);
this.emit('disconnected');
Expand Down Expand Up @@ -335,19 +364,21 @@ export default class Peer {
this.removeTracks(true, true);
}
stream.getTracks().forEach((track) => this.addTrack(track));
return this.streamLocal;
} catch (err) {
this.error('Failed to set local stream', err);
throw err;
if (err instanceof Error) {
this.error('Failed to set local stream', err);
}
}
}

/** Add a track to the local stream */
public addTrack(track: MediaStreamTrack) {
this.streamLocal.addTrack(track);
this.emit('streamLocal', this.streamLocal);
// ⚡ triggers "negotiationneeded" event if connected
this.peerConn?.addTrack(track, this.streamLocal);
if (!this.isClosed()) {
// ⚡ triggers "negotiationneeded" event if connected
this.peerConn.addTrack(track, this.streamLocal);
}
}

/** Removes the local and remote stream of audio and/or video tracks */
Expand Down
93 changes: 61 additions & 32 deletions test/peer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const config: RollupOptions = {
};

test.beforeEach(async ({ page }) => {
if (!Array.isArray(config.output)) return;
// generate code from rollup
const bundle = await rollup(config);
const { output } = await bundle.generate(config.output?.[0]);
Expand Down Expand Up @@ -63,8 +64,8 @@ test('should set the local stream and be active', async ({ page }) => {
const isStreamLocalActive = await page.evaluate(async () => {
const stream = await window.Peer.getUserMedia();
const peer = getPeer();
const streamLocal = peer.addStream(stream);
return streamLocal.active;
peer.addStream(stream);
return peer.getStreamLocal().active;
});

expect(isStreamLocalActive).toEqual(true);
Expand Down Expand Up @@ -119,26 +120,15 @@ test('should not reset a new connection', async ({ page }) => {
expect(isEqual).toEqual(true);
});

test('should reset an stable connection', async ({ page }) => {
const isEqual = await page.evaluate(
() =>
new Promise(async (resolve) => {
const peer1 = getPeer({ name: 'peer1' });
const peer2 = getPeer({ name: 'peer2' });

const stream = await window.Peer.getUserMedia();

peer2.on('connected', () => {
const p1 = peer2.get();
peer2.init();
const p2 = peer2.get();
resolve(p1 !== p2);
});

setupPeers(peer1, peer2, stream);
peer1.start();
})
);
test('should reset a destroyed connection', async ({ page }) => {
const isEqual = await page.evaluate(() => {
const peer = getPeer();
const p1 = peer.get();
peer.destroy();
peer.init();
const p2 = peer.get();
return p1 !== p2;
});

expect(isEqual).toEqual(true);
});
Expand Down Expand Up @@ -231,7 +221,7 @@ test('should fail to replace track on peer', async ({ page }) => {
await peer1.replaceTrack(newTrack, oldTrack);
} catch (err) {
// eslint-disable-next-line @typescript-eslint/no-unsafe-member-access
if (err.name === 'InvalidModificationError') {
if (err instanceof Error && err.name === 'InvalidModificationError') {
resolve();
} else {
reject(err);
Expand Down Expand Up @@ -375,6 +365,45 @@ test('should send data to other peer using data channels', async ({ page }) => {
);
});

test('should send data to other peer using dynamic data channels', async ({ page }) => {
await page.evaluate(
() =>
new Promise<void>(async (resolve, reject) => {
const peer1 = getPeer({
name: 'peer1',
enableDataChannels: true,
});
const peer2 = getPeer({
name: 'peer2',
enableDataChannels: true,
});

const stream = await window.Peer.getUserMedia();

peer1.on('connected', () => {
peer1.addDataChannel('test');
});

peer1.on('channelOpen', () => {
setTimeout(() => {
peer2.send('hello world', 'test');
}, 500);
});

peer1.on('channelData', ({ channel, data, source }) => {
if (channel.label === 'test' && data === 'hello world' && source === 'incoming') {
resolve();
} else {
reject(new Error('did not get correct channel data'));
}
});

setupPeers(peer1, peer2, stream);
peer1.start();
})
);
});

test('should send data to other peer then close using negotiated data channels', async ({
page,
}) => {
Expand All @@ -384,30 +413,30 @@ test('should send data to other peer then close using negotiated data channels',
const peer1 = getPeer({ name: 'peer1', enableDataChannels: true });
const peer2 = getPeer({ name: 'peer2', enableDataChannels: true });

peer1.addDataChannel('extraMessages', { negotiated: true, id: 0 });
peer2.addDataChannel('extraMessages', { negotiated: true, id: 0 });

const stream = await window.Peer.getUserMedia();

peer1.on('channelData', ({ data }) => {
if (data !== 'hello world') {
reject(new Error('did not get correct channel data'));
} else {
const channel = peer1.getDataChannel();
const channel = peer1.getDataChannel('extraMessages');
channel?.close();
}
});

peer1.on('channelClosed', () => {
resolve();
});

peer2.on('connected', () => {
peer1.getDataChannel('extraMessages', { negotiated: true, id: 0 });
peer2.getDataChannel('extraMessages', { negotiated: true, id: 0 });

peer2.on('channelOpen', () => {
setTimeout(() => {
peer2.send('hello world', 'extraMessages');
}, 500);
});

peer1.on('channelClosed', () => {
resolve();
});

setupPeers(peer1, peer2, stream);
peer1.start();

Expand Down
Loading

0 comments on commit f74539e

Please sign in to comment.