Restore widgets in batches to not exceed the zmq high water mark message limit in the kernel #2765

Draft
wants to merge 10 commits into
base: main
Choose a base branch
from
67 changes: 67 additions & 0 deletions packages/base-manager/src/utils.ts
@@ -298,3 +298,70 @@ export function bufferToBase64(buffer: ArrayBuffer): string {
export function base64ToBuffer(base64: string): ArrayBuffer {
return toByteArray(base64).buffer;
}

/**
* A map that chunks the list into chunkSize pieces and evaluates chunks
* concurrently.
*
* @param list - The list to map over
* @param chunkOptions - The options for chunking and evaluating.
* @param fn - The function to map, with the same arguments as an array map
* @param thisArg - An optional thisArg for the function
* @returns - the equivalent of `Promise.all(list.map(fn, thisArg))`
*/
export async function chunkMap<T, U>(
list: T[],
chunkOptions: chunkMap.IOptions,
fn: (value: T, index: number, array: T[]) => Promise<U> | U,
thisArg?: any
): Promise<U[]> {
// Default to the equivalent of Promise.all(list.map(fn, thisArg))
const chunkSize = chunkOptions.chunkSize ?? list.length;
const concurrency = chunkOptions.concurrency ?? 1;

const results = new Array(list.length);
const chunks: (() => Promise<void>)[] = [];

// Process a single chunk and resolve to the next chunk if available
async function processChunk(chunk: T[], start: number): Promise<void> {
const chunkResult = await Promise.all(
chunk.map((v, i) => fn.call(thisArg, v, start + i, list))
);

// Splice the chunk results into the results array. We use
// chunkResult.length because the last chunk may not be full size.
results.splice(start, chunkResult.length, ...chunkResult);

// Start the next work item by processing it
if (chunks.length > 0) {
return chunks.shift()!();
}
}

// Make closures for each batch of work.
for (let i = 0; i < list.length; i += chunkSize) {
chunks.push(() => processChunk(list.slice(i, i + chunkSize), i));
}

// Start the first concurrent chunks. Each chunk will automatically start
// the next available chunk when it finishes.
await Promise.all(chunks.splice(0, concurrency).map(f => f()));
Very interesting pattern!
This is the place where we can protect against the I/O rate limit, I think (not tested).

Suggested change
await Promise.all(chunks.splice(0, concurrency).map(f => f()));
const rate = chunkOptions.rate ?? Infinity;
const delay = (sec: number) => new Promise(resolve => setTimeout(resolve, sec * 1000));
// Pair each chunk with a minimum delay so that fn is called at most `rate` times per second.
const rateLimit = (f: () => Promise<void>) => Promise.all([f(), delay((concurrency * chunkSize) / rate)]).then(v => v[0]);
await Promise.all(chunks.splice(0, concurrency).map(f => rateLimit(f)));
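
The idea of the suggestion: with the values proposed further down ({ chunkSize: 10, concurrency: 10, rate: 500 }), each wave of concurrent chunks issues at most 10 * 10 = 100 calls to fn and is paired with a minimum delay of 100 / 500 = 0.2 seconds, which is what caps the request rate at roughly `rate` calls per second.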

return results;
}

export namespace chunkMap {
/**
* The options for chunking and evaluating.
*/
export interface IOptions {
/**
* The maximum size of a chunk. Defaults to the list size.
*/
chunkSize?: number;

/**
* The maximum number of chunks to evaluate simultaneously. Defaults to 1.
*/
concurrency?: number;
}
Suggested change
}
/**
* The maximum rate (calls/second) at which we may call fn. Defaults to Infinity.
*/
rate?: number;
}

}
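
For illustration, a minimal sketch of how the new chunkMap helper might be called; the fetchState helper, the id list, and the option values below are hypothetical and only meant to show the batching behavior:

import { chunkMap } from '@jupyter-widgets/base-manager';

// Hypothetical stand-in for a per-comm state request.
async function fetchState(id: number): Promise<string> {
  await new Promise(resolve => setTimeout(resolve, 10));
  return `state-${id}`;
}

async function example(): Promise<void> {
  const ids = Array.from({ length: 100 }, (_, i) => i);

  // With chunkSize 10 and concurrency 2, at most 10 * 2 = 20 requests are in
  // flight at any time; results come back in the original list order.
  const states = await chunkMap(ids, { chunkSize: 10, concurrency: 2 }, id =>
    fetchState(id)
  );
  console.log(states.length); // 100
}

With the default options (chunkSize equal to the list length and concurrency 1), the call behaves like Promise.all(list.map(fn, thisArg)); the chunked settings above simply bound the number of in-flight requests.
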
17 changes: 12 additions & 5 deletions packages/jupyterlab-manager/src/manager.ts
@@ -19,7 +19,8 @@ import {
import {
ManagerBase,
serialize_state,
IStateOptions
IStateOptions,
chunkMap
} from '@jupyter-widgets/base-manager';

import { IDisposable } from '@lumino/disposable';
@@ -144,9 +145,15 @@ export abstract class LabWidgetManager extends ManagerBase<Widget>
}
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async comm_id => {
// For each comm id that we do not know about, create the comm, and
// request the state. We must do this processing in chunks to make sure we
// do not exceed the ZMQ high water mark, which limits messages from the
// kernel. See https://github.com/voila-dashboards/voila/issues/534 for
// more details.
const widgets_info = await chunkMap(
Object.keys(comm_ids),
{ chunkSize: 10, concurrency: 10 },
Suggested change
{ chunkSize: 10, concurrency: 10 },
{ chunkSize: 10, concurrency: 10, rate: 500 },

async (comm_id: string) => {
try {
await this.get_model(comm_id);
// If we successfully get the model, do no more.
@@ -192,7 +199,7 @@ export abstract class LabWidgetManager extends ManagerBase<Widget>

return info.promise;
}
})
}
);

// We put in a synchronization barrier here so that we don't have to
20 changes: 14 additions & 6 deletions widgetsnbextension/src/manager.js
@@ -3,7 +3,7 @@
'use strict';

var base = require('@jupyter-widgets/base');
var ManagerBase = require('@jupyter-widgets/base-manager').ManagerBase;
var baseManager = require('@jupyter-widgets/base-manager');
var widgets = require('@jupyter-widgets/controls');
var outputWidgets = require('./widget_output');
var saveState = require('./save_state');
@@ -74,7 +74,7 @@ function new_comm(
// WidgetManager class
//--------------------------------------------------------------------

export class WidgetManager extends ManagerBase {
export class WidgetManager extends baseManager.ManagerBase {
constructor(comm_manager, notebook) {
super();
// Managers are stored in *reverse* order, so that _managers[0] is the most recent.
@@ -111,8 +111,13 @@ export class WidgetManager extends ManagerBase {
// for the responses (2).
return Promise.all(comm_promises)
.then(function(comms) {
return Promise.all(
comms.map(function(comm) {
// We must do this in chunks to make sure we do not
// exceed the ZMQ high water mark, which limits messages from the kernel. See
// https://github.com/voila-dashboards/voila/issues/534 for more details.
return baseManager.chunkMap(
comms,
{ chunkSize: 10, concurrency: 10 },
Suggested change
{ chunkSize: 10, concurrency: 10 },
{ chunkSize: 10, concurrency: 10, rate: 500 },

function(comm) {
var update_promise = new Promise(function(resolve, reject) {
comm.on_msg(function(msg) {
base.put_buffers(
@@ -137,7 +142,7 @@
that.callbacks()
);
return update_promise;
})
}
);
})
.then(function(widgets_info) {
@@ -411,7 +416,10 @@ export class WidgetManager extends ManagerBase {
* Callback handlers for a specific view
*/
callbacks(view) {
var callbacks = ManagerBase.prototype.callbacks.call(this, view);
var callbacks = baseManager.ManagerBase.prototype.callbacks.call(
this,
view
);
if (view && view.options.iopub_callbacks) {
callbacks.iopub = view.options.iopub_callbacks;
}