Skip to content

Commit

Permalink
only create stream on first read in NativeStreamDataProcessor to prev…
Browse files Browse the repository at this point in the history
…ent concurrent reads when resseting the DataProcessor right after creating it
  • Loading branch information
KurtThiemann committed Oct 4, 2022
1 parent ef31dcb commit 4d10a55
Showing 1 changed file with 36 additions and 6 deletions.
42 changes: 36 additions & 6 deletions src/DataProcessor/NativeStreamDataProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,31 +2,51 @@ import Constants from '../Constants.js';
import AbstractDataProcessor from './AbstractDataProcessor.js';

export default class NativeStreamDataProcessor extends AbstractDataProcessor {
/** @type {ReadableStream} */ stream;
/** @type {TransformStream} */ processor;
/** @type {ReadableStreamDefaultReader} */ streamReader;
/** @type {?boolean} */ static supported = null;
/** @type {?ReadableStream} */ stream = null;
/** @type {?TransformStream} */ processor = null;
/** @type {?ReadableStreamDefaultReader} */ streamReader = null;

/**
* @return {boolean}
*/
static isSupported() {
if (this.supported === null) {
try {
// noinspection JSUnresolvedFunction
new CompressionStream('deflate-raw');
} catch (e) {
this.supported = false;
return this.supported;
}
this.supported = true;
}

return this.supported;
}

/**
* @inheritDoc
*/
constructor(reader, createPreCrc = false, createPostCrc = false) {
super(reader, createPreCrc, createPostCrc);
this.reset();
}

/**
* @inheritDoc
*/
async generate(length) {
if(this.streamReader === null) {
this.resetStreams();
}
let {value} = await this.streamReader.read();
return value ?? null;
}

/**
* @inheritDoc
*/
reset() {
super.reset();
resetStreams() {
this.processor = this.createProcessorStream();
this.stream = new ReadableStream({
pull: async (controller) => {
Expand All @@ -40,6 +60,16 @@ export default class NativeStreamDataProcessor extends AbstractDataProcessor {
this.streamReader = this.stream.pipeThrough(this.processor).getReader();
}

/**
* @inheritDoc
*/
reset() {
super.reset();
this.processor = null;
this.stream = null;
this.streamReader = null;
}

/**
* @return {TransformStream}
* @abstract
Expand Down

0 comments on commit 4d10a55

Please sign in to comment.