-
Notifications
You must be signed in to change notification settings - Fork 947
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Restore widgets in batches to not exceed the zmq high water mark message limit in the kernel #2765
Draft
jasongrout
wants to merge
10
commits into
jupyter-widgets:main
Choose a base branch
from
jasongrout:limitrequests
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from 4 commits
Commits
Show all changes
10 commits
Select commit
Hold shift + click to select a range
a507eae
Restore widgets in batches to not exceed the zmq high water mark mess…
jasongrout e744957
Add explanatory comment to classic widget manager
jasongrout 24cf363
Introduce concurrency and tweak interface.
jasongrout 978fbce
Clean up documentation
jasongrout 08493db
bump chunk size
jasongrout f4abd92
Stop chunk map on error
jasongrout 32c915c
Let’s just use p-map.
jasongrout 6aadcf7
Add throttling to respect the server default iopub rate limit of 1 ms…
jasongrout cc23b08
Fix import error
jasongrout 2b3bd09
Fix integrity issue
jasongrout File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -298,3 +298,70 @@ export function bufferToBase64(buffer: ArrayBuffer): string { | |||||||||||||
export function base64ToBuffer(base64: string): ArrayBuffer { | ||||||||||||||
return toByteArray(base64).buffer; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* A map that chunks the list into chunkSize pieces and evaluates chunks | ||||||||||||||
* concurrently. | ||||||||||||||
* | ||||||||||||||
* @param list - The list to map over | ||||||||||||||
* @param chunkOptions - The options for chunking and evaluating. | ||||||||||||||
* @param fn - The function to map, with the same arguments as an array map | ||||||||||||||
* @param thisArg - An optional thisArg for the function | ||||||||||||||
* @returns - the equivalent of `Promise.all(list.map(fn, thisArg))` | ||||||||||||||
*/ | ||||||||||||||
export async function chunkMap<T, U>( | ||||||||||||||
list: T[], | ||||||||||||||
chunkOptions: chunkMap.IOptions, | ||||||||||||||
fn: (value: T, index: number, array: T[]) => Promise<U> | U, | ||||||||||||||
thisArg?: any | ||||||||||||||
): Promise<U[]> { | ||||||||||||||
// Default to equivalent to Promise.all(list.map(fn, thisarg)) | ||||||||||||||
const chunkSize = chunkOptions.chunkSize ?? list.length; | ||||||||||||||
const concurrency = chunkOptions.concurrency ?? 1; | ||||||||||||||
|
||||||||||||||
const results = new Array(list.length); | ||||||||||||||
const chunks: (() => Promise<void>)[] = []; | ||||||||||||||
|
||||||||||||||
// Process a single chunk and resolve to the next chunk if available | ||||||||||||||
async function processChunk(chunk: any[], start: number): Promise<void> { | ||||||||||||||
const chunkResult = await Promise.all( | ||||||||||||||
chunk.map((v, i) => fn.call(thisArg, v, start + i, list)) | ||||||||||||||
); | ||||||||||||||
|
||||||||||||||
// Splice the chunk results into the results array. We use | ||||||||||||||
// chunkResult.length because the last chunk may not be full size. | ||||||||||||||
results.splice(start, chunkResult.length, ...chunkResult); | ||||||||||||||
|
||||||||||||||
// Start the next work item by processing it | ||||||||||||||
if (chunks.length > 0) { | ||||||||||||||
return chunks.shift()!(); | ||||||||||||||
} | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Make closures for each batch of work. | ||||||||||||||
for (let i = 0; i < list.length; i += chunkSize) { | ||||||||||||||
chunks.push(() => processChunk(list.slice(i, i + chunkSize), i)); | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
// Start the first concurrent chunks. Each chunk will automatically start | ||||||||||||||
// the next available chunk when it finishes. | ||||||||||||||
await Promise.all(chunks.splice(0, concurrency).map(f => f())); | ||||||||||||||
return results; | ||||||||||||||
} | ||||||||||||||
|
||||||||||||||
export namespace chunkMap { | ||||||||||||||
/** | ||||||||||||||
* The options for chunking and evaluating. | ||||||||||||||
*/ | ||||||||||||||
export interface IOptions { | ||||||||||||||
/** | ||||||||||||||
* The maximum size of a chunk. Defaults to the list size. | ||||||||||||||
*/ | ||||||||||||||
chunkSize?: number; | ||||||||||||||
|
||||||||||||||
/** | ||||||||||||||
* The maximum number of chunks to evaluate simultaneously. Defaults to 1. | ||||||||||||||
*/ | ||||||||||||||
concurrency?: number; | ||||||||||||||
} | ||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||||||||||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -19,7 +19,8 @@ import { | |||||
import { | ||||||
ManagerBase, | ||||||
serialize_state, | ||||||
IStateOptions | ||||||
IStateOptions, | ||||||
chunkMap | ||||||
} from '@jupyter-widgets/base-manager'; | ||||||
|
||||||
import { IDisposable } from '@lumino/disposable'; | ||||||
|
@@ -144,9 +145,15 @@ export abstract class LabWidgetManager extends ManagerBase<Widget> | |||||
} | ||||||
const comm_ids = await this._get_comm_info(); | ||||||
|
||||||
// For each comm id that we do not know about, create the comm, and request the state. | ||||||
const widgets_info = await Promise.all( | ||||||
Object.keys(comm_ids).map(async comm_id => { | ||||||
// For each comm id that we do not know about, create the comm, and | ||||||
// request the state. We must do this processing in chunksto make sure we | ||||||
// do not exceed the ZMQ high water mark limiting messages from the | ||||||
// kernel. See https://github.com/voila-dashboards/voila/issues/534 for | ||||||
// more details. | ||||||
const widgets_info = await chunkMap( | ||||||
Object.keys(comm_ids), | ||||||
{ chunkSize: 10, concurrency: 10 }, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
async (comm_id: string) => { | ||||||
try { | ||||||
await this.get_model(comm_id); | ||||||
// If we successfully get the model, do no more. | ||||||
|
@@ -192,7 +199,7 @@ export abstract class LabWidgetManager extends ManagerBase<Widget> | |||||
|
||||||
return info.promise; | ||||||
} | ||||||
}) | ||||||
} | ||||||
); | ||||||
|
||||||
// We put in a synchronization barrier here so that we don't have to | ||||||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -3,7 +3,7 @@ | |||||
'use strict'; | ||||||
|
||||||
var base = require('@jupyter-widgets/base'); | ||||||
var ManagerBase = require('@jupyter-widgets/base-manager').ManagerBase; | ||||||
var baseManager = require('@jupyter-widgets/base-manager'); | ||||||
var widgets = require('@jupyter-widgets/controls'); | ||||||
var outputWidgets = require('./widget_output'); | ||||||
var saveState = require('./save_state'); | ||||||
|
@@ -74,7 +74,7 @@ function new_comm( | |||||
// WidgetManager class | ||||||
//-------------------------------------------------------------------- | ||||||
|
||||||
export class WidgetManager extends ManagerBase { | ||||||
export class WidgetManager extends baseManager.ManagerBase { | ||||||
constructor(comm_manager, notebook) { | ||||||
super(); | ||||||
// Managers are stored in *reverse* order, so that _managers[0] is the most recent. | ||||||
|
@@ -111,8 +111,13 @@ export class WidgetManager extends ManagerBase { | |||||
// for the responses (2). | ||||||
return Promise.all(comm_promises) | ||||||
.then(function(comms) { | ||||||
return Promise.all( | ||||||
comms.map(function(comm) { | ||||||
// We must do this in chunks to make sure we do not | ||||||
// exceed the ZMQ high water mark limiting messages from the kernel. See | ||||||
// https://github.com/voila-dashboards/voila/issues/534 for more details. | ||||||
return baseManager.chunkMap( | ||||||
comms, | ||||||
{ chunkSize: 10, concurrency: 10 }, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
function(comm) { | ||||||
var update_promise = new Promise(function(resolve, reject) { | ||||||
comm.on_msg(function(msg) { | ||||||
base.put_buffers( | ||||||
|
@@ -137,7 +142,7 @@ export class WidgetManager extends ManagerBase { | |||||
that.callbacks() | ||||||
); | ||||||
return update_promise; | ||||||
}) | ||||||
} | ||||||
); | ||||||
}) | ||||||
.then(function(widgets_info) { | ||||||
|
@@ -411,7 +416,10 @@ export class WidgetManager extends ManagerBase { | |||||
* Callback handlers for a specific view | ||||||
*/ | ||||||
callbacks(view) { | ||||||
var callbacks = ManagerBase.prototype.callbacks.call(this, view); | ||||||
var callbacks = baseManager.ManagerBase.prototype.callbacks.call( | ||||||
this, | ||||||
view | ||||||
); | ||||||
if (view && view.options.iopub_callbacks) { | ||||||
callbacks.iopub = view.options.iopub_callbacks; | ||||||
} | ||||||
|
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very intersting pattern!
This is the place where we can protect against the io rate limit I think (not tested).