Skip to content

Commit

Permalink
Flow control for streams
Browse files Browse the repository at this point in the history
  • Loading branch information
martenrichter committed Jan 28, 2024
1 parent cb908dc commit 3080e22
Show file tree
Hide file tree
Showing 20 changed files with 1,163 additions and 274 deletions.
14 changes: 12 additions & 2 deletions main/lib/dom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,16 @@ export interface WebTransportOptions {
congestionControl?: WebTransportCongestionControl
}

export interface WebTransportSendGroup {
getStats: () => Promise<WebTransportSendStreamStats>
}

export interface WebTransportSendStreamOptions {
sendGroup: WebTransportSendGroup|null
sendOrder?: number
waitUntilAvailable?: boolean
}

export interface WebTransportSession {
getStats: () => Promise<WebTransportStats>
readonly ready: Promise<void>
Expand All @@ -98,11 +108,11 @@ export interface WebTransportSession {
close: (closeInfo?: WebTransportCloseInfo) => void
readonly datagrams: WebTransportDatagramDuplexStream

createBidirectionalStream: () => Promise<WebTransportBidirectionalStream>
createBidirectionalStream: (opts?: WebTransportSendStreamOptions) => Promise<WebTransportBidirectionalStream>
/* a ReadableStream of WebTransportBidirectionalStream objects */
readonly incomingBidirectionalStreams: ReadableStream<WebTransportBidirectionalStream>

createUnidirectionalStream: () => Promise<WebTransportSendStream>
createUnidirectionalStream: (opts?: WebTransportSendStreamOptions) => Promise<WebTransportSendStream>
/* a ReadableStream of WebTransportReceiveStream objects */
readonly incomingUnidirectionalStreams: ReadableStream<WebTransportReceiveStream>
}
Expand Down
11 changes: 10 additions & 1 deletion main/lib/http2/browser/browser.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ export class Http2WebTransportBrowser {
this.initialSessionFlowControlWindow =
args?.initialSessionFlowControlWindow || 16 * 1024 // 16 KB

this.initialBidirectionalStreams =
args?.initialBidirectionalSendStreams || 100
this.initialUnidirectionalStreams =
args?.initialUnidirectionalSendStreams || 100

this.streamShouldAutoTuneReceiveWindow =
args.streamShouldAutoTuneReceiveWindow || false
this.streamFlowControlWindowSizeLimit =
Expand Down Expand Up @@ -203,7 +208,11 @@ export class Http2WebTransportBrowser {
sendWindowOffset: 0,
receiveWindowOffset: this.sessionFlowControlWindowSizeLimit,
shouldAutoTuneReceiveWindow: this.sessionShouldAutoTuneReceiveWindow,
receiveWindowSizeLimit: this.sessionFlowControlWindowSizeLimit
receiveWindowSizeLimit: this.sessionFlowControlWindowSizeLimit,
initialBidirectionalSendStreams: this.initialBidirectionalStreams, // TODO, once supported by node, use initial settings
initialBidirectionalReceiveStreams: this.initialBidirectionalStreams,
initialUnidirectionalSendStreams: this.initialUnidirectionalStreams, // TODO, once supported by node, use initial settings
initialUnidirectionalReceiveStreams: this.initialUnidirectionalStreams
}),
reliable: true
}
Expand Down
72 changes: 36 additions & 36 deletions main/lib/http2/browser/browserparser.js
Original file line number Diff line number Diff line change
Expand Up @@ -116,37 +116,48 @@ export class BrowserParser extends ParserBase {
case ParserBase.WT_STOP_SENDING:
{
const streamid = readVarInt(bufferstate)
const stream = this.wtstreams.get(streamid)
const code = readVarInt(bufferstate)
if (stream && typeof code !== 'undefined')
stream.jsobj.onStreamRecvSignal({
code,
nettask:
if (typeof streamid !== 'undefined') {
const stream = this.wtstreams.get(streamid)
const code = readVarInt(bufferstate)
if (stream && typeof code !== 'undefined') {
stream.onStreamSignal(
type === ParserBase.WT_RESET_STREAM
? 'resetStream'
: 'stopSending'
})
)
stream.jsobj.onStreamRecvSignal({
code: Number(code),
nettask:
type === ParserBase.WT_RESET_STREAM
? 'resetStream'
: 'stopSending'
})
}
}
}
break
case ParserBase.WT_STREAM_WOFIN:
case ParserBase.WT_STREAM_WFIN:
{
const streamid = Number(readVarInt(bufferstate))
const streamid = readVarInt(bufferstate)

if (typeof streamid !== 'undefined') {
let object = this.wtstreams.get(streamid)
if (!object) {
object = this.newStream(streamid)
if (!object) return // stream broken
}
// TODO submit data
if (offsetend - bufferstate.offset >= 0) {
const fin = type === ParserBase.WT_STREAM_WFIN
if (fin) object.onFin()
object.recvData({
data: new Uint8Array(
bufferstate.buffer.buffer,
bufferstate.buffer.byteOffset + bufferstate.offset,
offsetend - bufferstate.offset
),
fin: type === ParserBase.WT_STREAM_WFIN
fin
})
}
}
Expand All @@ -156,46 +167,35 @@ export class BrowserParser extends ParserBase {
this.onMaxData(readVarInt(bufferstate))
break
case ParserBase.WT_MAX_STREAM_DATA:
this.onMaxStreamData(readVarInt(bufferstate), readVarInt(bufferstate))
{
const streamid = readVarInt(bufferstate)
const offset = readVarInt(bufferstate)
if (typeof streamid !== 'undefined' && typeof offset !== 'undefined')
this.onMaxStreamData(streamid, offset)
}
break
case ParserBase.WT_MAX_STREAMS_BIDI:
// this.recvSession({ maxstreams: readVarInt(bufferstate), type })
this.onMaxStreamBiDi(readVarInt(bufferstate))
break
case ParserBase.WT_MAX_STREAMS_UNIDI:
// this.recvSession({ maxstreams: readVarInt(bufferstate), type })
this.onMaxStreamUniDi(readVarInt(bufferstate))
break
case ParserBase.WT_DATA_BLOCKED:
this.onDataBlocked(readVarInt(bufferstate))
break
case ParserBase.WT_STREAM_DATA_BLOCKED:
this.onStreamDataBlocked(
readVarInt(bufferstate),
readVarInt(bufferstate)
)
{
const streamid = readVarInt(bufferstate)
const offset = readVarInt(bufferstate)
if (typeof streamid !== 'undefined' && typeof offset !== 'undefined')
this.onStreamDataBlocked(streamid, offset)
}
break
case ParserBase.WT_STREAMS_BLOCKED_UNIDI:
/* {
const streamid = readVarInt(bufferstate)
const object = this.wtstreams.get(streamid)
if (object)
this.recvStream({
maxstreams: readVarInt(bufferstate),
type,
object
})
} */
this.onStreamsBlockedUnidi(readVarInt(bufferstate))
break
case ParserBase.WT_STREAMS_BLOCKED_BIDI:
/* {
const streamid = readVarInt(bufferstate)
const object = this.wtstreams.get(streamid)
if (object)
this.recvStream({
maxstreams: readVarInt(bufferstate),
type,
streamid
})
} */
this.onStreamsBlockedBidi(readVarInt(bufferstate))
break
case ParserBase.DATAGRAM:
this.session.jsobj.onDatagramReceived({
Expand Down
4 changes: 2 additions & 2 deletions main/lib/http2/flowcontroller.js
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ export class FlowController {

maybeSendBlocked() {
if (
this.sendWindowSize() !== 0 ||
this.sendWindowSize() !== 0n ||
this.lastBlockedSendWindowOffset >= this.sendWindowOffset
) {
return
Expand Down Expand Up @@ -342,7 +342,7 @@ export class FlowController {

sendWindowSize() {
if (this.bytesSent > this.sendWindowOffset) {
return 0
return 0n
}
return this.sendWindowOffset - this.bytesSent
}
Expand Down
109 changes: 58 additions & 51 deletions main/lib/http2/node/capsuleparser.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,28 +121,41 @@ export class Http2CapsuleParser extends ParserBaseHttp2 {
case Http2CapsuleParser.WT_STOP_SENDING:
{
const streamid = readVarInt(bufferstate)
const stream = this.wtstreams.get(streamid)
const code = readVarInt(bufferstate)
if (stream && typeof code !== 'undefined')
stream.jsobj.onStreamRecvSignal({
code,
nettask:
if (typeof streamid !== 'undefined') {
const stream = this.wtstreams.get(streamid)
const code = readVarInt(bufferstate)
if (stream && typeof code !== 'undefined') {
stream.onStreamSignal(
type === Http2CapsuleParser.WT_RESET_STREAM
? 'resetStream'
: 'stopSending'
})
)
stream.jsobj.onStreamRecvSignal({
code: Number(code),
nettask:
type === Http2CapsuleParser.WT_RESET_STREAM
? 'resetStream'
: 'stopSending'
})
}
}
}
break
case Http2CapsuleParser.WT_STREAM_WOFIN:
case Http2CapsuleParser.WT_STREAM_WFIN:
streamid = Number(readVarInt(bufferstate))
streamid = readVarInt(bufferstate)
if (typeof streamid !== 'undefined') {
let object = this.wtstreams.get(streamid)
if (!object) {
object = this.newStream(streamid)
if (!object) return // stream broken
}
// TODO submit data
if (offsetend - bufferstate.offset >= 0) {
const fin =
type === Http2CapsuleParser.WT_STREAM_WFIN &&
bufferstate.size >= length + offsetbegin
if (fin) object.onFin()
object.recvData({
data:
offsetend - bufferstate.offset > 0
Expand All @@ -153,9 +166,7 @@ export class Http2CapsuleParser extends ParserBaseHttp2 {
offsetend - bufferstate.offset
)
: undefined,
fin:
type === Http2CapsuleParser.WT_STREAM_WFIN &&
bufferstate.size >= length + offsetbegin
fin
})
}
}
Expand All @@ -164,49 +175,41 @@ export class Http2CapsuleParser extends ParserBaseHttp2 {
this.onMaxData(readVarInt(bufferstate))
break
case Http2CapsuleParser.WT_MAX_STREAM_DATA:
this.onMaxStreamData(
readVarInt(bufferstate),
readVarInt(bufferstate)
)
{
const streamid = readVarInt(bufferstate)
const offset = readVarInt(bufferstate)
if (
typeof streamid !== 'undefined' &&
typeof offset !== 'undefined'
)
this.onMaxStreamData(streamid, offset)
}
break
case Http2CapsuleParser.WT_MAX_STREAMS_BIDI:
// this.recvSession({ maxstreams: readVarInt(bufferstate), type })
this.onMaxStreamBiDi(readVarInt(bufferstate))
break
case Http2CapsuleParser.WT_MAX_STREAMS_UNIDI:
// this.recvSession({ maxstreams: readVarInt(bufferstate), type })
this.onMaxStreamUniDi(readVarInt(bufferstate))
break
case Http2CapsuleParser.WT_DATA_BLOCKED:
this.onDataBlocked(readVarInt(bufferstate))
break
case Http2CapsuleParser.WT_STREAM_DATA_BLOCKED:
this.onStreamDataBlocked(
readVarInt(bufferstate),
readVarInt(bufferstate)
)
{
const streamid = readVarInt(bufferstate)
const offset = readVarInt(bufferstate)
if (
typeof streamid !== 'undefined' &&
typeof offset !== 'undefined'
)
this.onStreamDataBlocked(streamid, offset)
}
break
case Http2CapsuleParser.WT_STREAMS_BLOCKED_UNIDI:
/* {
const streamid = readVarInt(bufferstate)
const object = this.wtstreams.get(streamid)
if (object)
this.recvStream({
maxstreams: readVarInt(bufferstate),
type,
object
})
} */
this.onStreamsBlockedUnidi(readVarInt(bufferstate))
break
case Http2CapsuleParser.WT_STREAMS_BLOCKED_BIDI:
/* {
const streamid = readVarInt(bufferstate)
const object = this.wtstreams.get(streamid)
if (object)
this.recvStream({
maxstreams: readVarInt(bufferstate),
type,
streamid
})
} */
this.onStreamsBlockedBidi(readVarInt(bufferstate))
break
case Http2CapsuleParser.DATAGRAM:
this.session.jsobj.onDatagramReceived({
Expand Down Expand Up @@ -241,17 +244,21 @@ export class Http2CapsuleParser extends ParserBaseHttp2 {
if (this.rstreamid) {
// TODO submitData
const object = this.wtstreams.get(this.rstreamid)
const fin =
this.rtype === Http2CapsuleParser.WT_STREAM_WFIN &&
this.remainlength === clength
// TODO submit data
object.recvData({
data: new Uint8Array(
bufferstate.buffer.buffer,
bufferstate.buffer.byteOffset + bufferstate.offset,
clength
),
fin:
this.rtype === Http2CapsuleParser.WT_STREAM_WFIN &&
this.remainlength === clength
})
if (object) {
if (fin) object.onFin()
object.recvData({
data: new Uint8Array(
bufferstate.buffer.buffer,
bufferstate.buffer.byteOffset + bufferstate.offset,
clength
),
fin
})
}
}

this.remainlength = this.remainlength - clength
Expand Down
13 changes: 11 additions & 2 deletions main/lib/http2/node/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ export class Http2WebTransportClient {
this.initialSessionFlowControlWindow =
args?.initialSessionFlowControlWindow || 16 * 1024 // 16 KB

this.initialBidirectionalStreams =
args?.initialBidirectionalSendStreams || 100
this.initialUnidirectionalStreams =
args?.initialUnidirectionalSendStreams || 100

this.streamShouldAutoTuneReceiveWindow =
args.streamShouldAutoTuneReceiveWindow || false
this.streamFlowControlWindowSizeLimit =
Expand Down Expand Up @@ -84,8 +89,8 @@ export class Http2WebTransportClient {
0x2b61: this.initialSessionFlowControlWindow, // SETTINGS_WEBTRANSPORT_INITIAL_MAX_DATA
0x2b62: this.initialStreamFlowControlWindow, // SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAM_DATA_UNI
0x2b63: this.initialStreamFlowControlWindow, // SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAM_DATA_BIDI
0x2b64: 0xffffff, // SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAMS_UNI
0x2b65: 0xffffff // SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAMS_BIDI
0x2b64: this.initialUnidirectionalStreams, // SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAMS_UNI
0x2b65: this.initialBidirectionalStreams // SETTINGS_WEBTRANSPORT_INITIAL_MAX_STREAMS_BIDI
}
},
localPort: this.localPort,
Expand Down Expand Up @@ -223,6 +228,10 @@ export class Http2WebTransportClient {
this.streamShouldAutoTuneReceiveWindow,
streamReceiveWindowSizeLimit: this.streamFlowControlWindowSizeLimit
}),
initialBidirectionalSendStreams: this.initialBidirectionalStreams, // TODO, once supported by node, use initial settings
initialBidirectionalReceiveStreams: this.initialBidirectionalStreams,
initialUnidirectionalSendStreams: this.initialUnidirectionalStreams, // TODO, once supported by node, use initial settings
initialUnidirectionalReceiveStreams: this.initialUnidirectionalStreams,
sendWindowOffset: this.sessionFlowControlWindowSizeLimit,
receiveWindowOffset: this.sessionFlowControlWindowSizeLimit,
shouldAutoTuneReceiveWindow: this.sessionShouldAutoTuneReceiveWindow,
Expand Down
Loading

0 comments on commit 3080e22

Please sign in to comment.