Skip to content

Commit

Permalink
Restore widgets in batches to not exceed the zmq high water mark mess…
Browse files Browse the repository at this point in the history
…age limit in the kernel.

Current ZMQ by default limits the kernel’s iopub message send queue to at most 1000 messages (and the real limit can be much lower, see ZMQ_SNDHWM at http://api.zeromq.org/4-3:zmq-setsockopt). We now request comm state in batches to avoid this limit.


See voila-dashboards/voila#534 for more details, including where this was causing real problems.
  • Loading branch information
jasongrout committed Jan 30, 2020
1 parent b3708e1 commit 68e5aa0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 6 deletions.
13 changes: 13 additions & 0 deletions packages/base-manager/src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,16 @@ export function bufferToBase64(buffer: ArrayBuffer): string {
export function base64ToBuffer(base64: string): ArrayBuffer {
return toByteArray(base64).buffer;
}


/**
* Map a function onto a list in batches, resolving each batch of returned
* promises before moving to the next batch.
*/
export async function mapBatch<T, U>(list: T[], step: number, fn: (value: T, index: number, array: T[]) => Promise<U> | U, thisArg?: any): Promise<U[]> {
const results = [];
for(let i = 0; i < list.length; i+=step) {
results.push(...await Promise.all(list.slice(i, i+step).map(fn, thisArg)))
}
return results;
}
14 changes: 8 additions & 6 deletions packages/jupyterlab-manager/src/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ import {
import {
ManagerBase,
serialize_state,
IStateOptions
IStateOptions,
mapBatch
} from '@jupyter-widgets/base-manager';

import { IDisposable } from '@lumino/disposable';
Expand Down Expand Up @@ -144,9 +145,11 @@ export abstract class LabWidgetManager extends ManagerBase<Widget>
}
const comm_ids = await this._get_comm_info();

// For each comm id that we do not know about, create the comm, and request the state.
const widgets_info = await Promise.all(
Object.keys(comm_ids).map(async comm_id => {
// For each comm id that we do not know about, create the comm, and
// request the state. We must do this in batches to make sure we do not
// exceed the ZMQ high water mark limiting messages from the kernel. See
// https://github.com/voila-dashboards/voila/issues/534 for more details.
const widgets_info = await mapBatch(Object.keys(comm_ids), 100, async comm_id => {
try {
await this.get_model(comm_id);
// If we successfully get the model, do no more.
Expand Down Expand Up @@ -192,8 +195,7 @@ export abstract class LabWidgetManager extends ManagerBase<Widget>

return info.promise;
}
})
);
});

// We put in a synchronization barrier here so that we don't have to
// topologically sort the restored widgets. `new_model` synchronously
Expand Down

0 comments on commit 68e5aa0

Please sign in to comment.