Skip to content

Commit

Permalink
feat(generator): create a toolbox (#8295)
Browse files Browse the repository at this point in the history
  • Loading branch information
fbeauchamp authored Feb 18, 2025
1 parent 4b8cdf8 commit 9723075
Show file tree
Hide file tree
Showing 15 changed files with 738 additions and 5 deletions.
58 changes: 58 additions & 0 deletions @vates/generator-toolbox/.USAGE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
A toolbox to ease the use of generator

### Timeout

wrap a source async generator to have it throw an error when timeout is reached
timeout is a positive number in milliseconds

```js
import { Timeout } from '@vates/generator-toolbox'

const wrappedGenerator = new Timeout(sourceGenerator, timeout)
```

### Throttle

wrap a source async generator to have it respect a max speed ( in bytes per seconds).
speed is either a strictly positive number or a function returning a strictly positive number. A speed change will be used for the next emitted packet.

The source generator must yield object with a length property.

Optimized for small yields regarding to the speed, since it won't split incoming packet.

If the generator reached the max speed it will be paused, limiting memory consumption.

```js
import { Throttle } from '@vates/generator-toolbox'

const wrappedGenerator = new Throttle(sourceGenerator, speed)
```

### Synchronized

Fork a generator. The rules ares:

- if the source returns, all the forks returns
- if the forks errors, all the forks errors with the same error
- if a the fork return , it is stopped, but the generator continue with the other
- if a the fork error , it is stopped, but the generator continue with the other
- if all the fork return , the source is stopped
- if all the fork error , the source is errored with the last error
- the source start producing a packet when the fastest forked ask for it
- the source forks get the packet only when all the forks asked for it, no buffer stores in memory

```ts
import { Synchronized } from '@vates/generator-toolbox'

async function consume(generator: AsyncGenerator) {
for await (const val of generator) {
console.log({ val })
}
}
const forker = new Synchronized(generator)
const first = forker.fork('first')
const second = forker.fork('second')
await Promise.all([consume(first), consume(second)])
```

Note: you can stop early a generator by calling `generator.return()`, and you can stop in in error by calling `generator.throw(error)`
1 change: 1 addition & 0 deletions @vates/generator-toolbox/.npmignore
89 changes: 89 additions & 0 deletions @vates/generator-toolbox/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<!-- DO NOT EDIT MANUALLY, THIS FILE HAS BEEN GENERATED -->

# @vates/generator-toolbox

[![Package Version](https://badgen.net/npm/v/@vates/generator-toolbox)](https://npmjs.org/package/@vates/generator-toolbox) ![License](https://badgen.net/npm/license/@vates/generator-toolbox) [![PackagePhobia](https://badgen.net/bundlephobia/minzip/@vates/generator-toolbox)](https://bundlephobia.com/result?p=@vates/generator-toolbox) [![Node compatibility](https://badgen.net/npm/node/@vates/generator-toolbox)](https://npmjs.org/package/@vates/generator-toolbox)

## Install

Installation of the [npm package](https://npmjs.org/package/@vates/generator-toolbox):

```sh
npm install --save @vates/generator-toolbox
```

## Usage

A toolbox to ease the use of generator

### Timeout

wrap a source async generator to have it throw an error when timeout is reached
timeout is a positive number in milliseconds

```js
import { Timeout } from '@vates/generator-toolbox'

const wrappedGenerator = new Timeout(sourceGenerator, timeout)
```

### Throttle

wrap a source async generator to have it respect a max speed ( in bytes per seconds).
speed is either a strictly positive number or a function returning a strictly positive number. A speed change will be used for the next emitted packet.

The source generator must yield object with a length property.

Optimized for small yields regarding to the speed, since it won't split incoming packet.

If the generator reached the max speed it will be paused, limiting memory consumption.

```js
import { Throttle } from '@vates/generator-toolbox'

const wrappedGenerator = new Throttle(sourceGenerator, speed)
```

### Synchronized

Fork a generator. The rules ares:

- if the source returns, all the forks returns
- if the forks errors, all the forks errors with the same error
- if a the fork return , it is stopped, but the generator continue with the other
- if a the fork error , it is stopped, but the generator continue with the other
- if all the fork return , the source is stopped
- if all the fork error , the source is errored with the last error
- the source start producing a packet when the fastest forked ask for it
- the source forks get the packet only when all the forks asked for it, no buffer stores in memory

```ts
import { Synchronized } from '@vates/generator-toolbox'

async function consume(generator: AsyncGenerator) {
for await (const val of generator) {
console.log({val})
}
}
const forker = new Synchronized(generator)
const first = forker.fork('first')
const second = forker.fork('second')
await Promise.all([consume(first), consume(second)])
```

Note: you can stop early a generator by calling `generator.return()`, and you can stop in in error by calling `generator.throw(error)`

## Contributions

Contributions are _very_ welcomed, either on the documentation or on
the code.

You may:

- report any [issue](https://github.com/vatesfr/xen-orchestra/issues)
you've encountered;
- fork and create a pull request.

## License

[MIT](https://spdx.org/licenses/MIT) © [Vates SAS](https://vates.fr)
6 changes: 6 additions & 0 deletions @vates/generator-toolbox/eslint.config.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
// @ts-check

import eslint from '@eslint/js'
import tseslint from 'typescript-eslint'

export default tseslint.config(eslint.configs.recommended, tseslint.configs.recommended)
33 changes: 33 additions & 0 deletions @vates/generator-toolbox/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
{
"name": "@vates/generator-toolbox",
"version": "0.0.0",
"main": "dist/index.mjs",
"license": "MIT",
"private": false,
"type": "module",
"devDependencies": {
"@eslint/js": "^9.19.0",
"@types/node": "^20.6",
"typescript": "~5.6",
"typescript-eslint": "^8.23.0"
},
"scripts": {
"build": "tsc",
"dev": "tsc --watch",
"test": "tsc && node --test **/*.test.mjs"
},
"homepage": "https://github.com/vatesfr/xen-orchestra/tree/master/@vates/generator-toolbox",
"bugs": "https://github.com/vatesfr/xen-orchestra/issues",
"repository": {
"directory": "@vates/generator-toolbox",
"type": "git",
"url": "https://github.com/vatesfr/xen-orchestra.git"
},
"author": {
"name": "Vates SAS",
"url": "https://vates.fr"
},
"engines": {
"node": ">=20.18"
}
}
3 changes: 3 additions & 0 deletions @vates/generator-toolbox/src/index.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
export { Synchronized } from './synchronized.mjs'
export { Throttle } from './throttle.mjs'
export { Timeout } from './timeout.mjs'
148 changes: 148 additions & 0 deletions @vates/generator-toolbox/src/synchronized.mts
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
import assert from 'node:assert'

export class Synchronized<T, TReturn, TNext> {
#source: AsyncGenerator<T, TReturn, TNext>
#forks = new Map<string, Forked<T, TReturn, TNext>>()
#removedForks = new Set<string>()
#waitingForks = new Set<string>()
#started = false

#nextValueForksReady?: {
promise: Promise<IteratorResult<T>>
forksWaitingReject: (error: Error) => void
forksWaitingResolve: () => void
}

constructor(source: AsyncGenerator<T, TReturn, TNext>) {
this.#source = source
}

fork(uid: string): AsyncGenerator {
assert.strictEqual(this.#started, false, `can't create a fork after consuming the data`)
const fork = new Forked<T, TReturn, TNext>(this, uid)
this.#forks.set(uid, fork)
return fork
}

async #resolveWhenAllForksReady(): Promise<IteratorResult<T>> {
if (!this.#nextValueForksReady) {
throw new Error('Can t wait forks if there are noone waiting')
}
const { promise, forksWaitingResolve } = this.#nextValueForksReady
if (this.#waitingForks.size === this.#forks.size) {
// reset value
this.#waitingForks.clear()
this.#nextValueForksReady = undefined
forksWaitingResolve() // for the other forks waiting
}
return promise
}

async next(uid: string): Promise<IteratorResult<T>> {
if (this.#removedForks.has(uid)) {
return { done: true, value: undefined }
}
if (!this.#forks.has(uid)) {
throw new Error(`trying to advance fork ${uid} that is not a fork of this one`)
}

if (this.#waitingForks.has(uid)) {
throw new Error(`Fork ${uid} is already waiting`)
}

this.#started = true
if (this.#nextValueForksReady === undefined) {
let forksWaitingResolve = () => {}
let forksWaitingReject: (reason?: Error) => void = () => {}
const next = this.#source.next().catch(async error => {
const e = new Error(`Error in the source generator ${error.message}`, { cause: error })
forksWaitingReject(e)
// source has failed, kill everything, and stop the forks
for (const uid of [...this.#forks.keys()]) {
await this.remove(uid, error)
}
})
const promise = Promise.all([
next,
new Promise((_resolve, _reject) => {
forksWaitingResolve = () => _resolve(undefined)
forksWaitingReject = _reject
}),
]).then(([_]) => _ as IteratorResult<T>)

this.#nextValueForksReady = { promise, forksWaitingResolve, forksWaitingReject }
}
this.#waitingForks.add(uid)
return this.#resolveWhenAllForksReady()
}

async remove(uid: string, error?: Error): Promise<IteratorResult<T>> {
const fork = this.#forks.get(uid)
if (fork === undefined) {
if (this.#removedForks.has(uid)) {
// already removed
return { done: true, value: undefined }
}
throw new Error(`trying to remove fork wih uid ${uid} that is not a fork of this one`)
}
this.#forks.delete(uid)
this.#waitingForks.delete(uid)
this.#removedForks.add(uid)
try {
if (error === undefined) {
await fork.return()
} else {
await fork.throw(error)
}
} catch (cleaningError) {
console.error('Error while cleaning the forked', {
cleaningError,
sourceError: error,
})
}

if (this.#forks.size === 0) {
if (error === undefined) {
await this.#source.return(undefined as TReturn)
} else {
await this.#source.throw(error)
}
// Reject any pending forks waiting for the next value
if (this.#nextValueForksReady) {
this.#nextValueForksReady.forksWaitingReject(new Error('Source generator terminated.', { cause: error }))
this.#nextValueForksReady = undefined
}
// clear state
this.#removedForks.clear()
this.#waitingForks.clear()
} else {
// this fork was maybe blocking the others
if (this.#nextValueForksReady) {
await this.#resolveWhenAllForksReady()
}
}
return { done: true, value: undefined }
}
}

class Forked<T, TReturn, TNext> implements AsyncGenerator<T, TReturn, TNext> {
#parent: Synchronized<T, TReturn, TNext>
#uid: string
constructor(parent: Synchronized<T, TReturn, TNext>, uid: string) {
this.#parent = parent
this.#uid = uid
}
next(): Promise<IteratorResult<T>> {
return this.#parent.next(this.#uid)
}
async return(): Promise<IteratorResult<T>> {
return this.#parent.remove(this.#uid)
}
async throw(e: Error): Promise<IteratorResult<T>> {
return this.#parent.remove(this.#uid, e)
}

[Symbol.asyncIterator](): AsyncGenerator<T> {
return this
}
}
Loading

0 comments on commit 9723075

Please sign in to comment.