Skip to content

Commit

Permalink
adding support for splitting sp batches, simplified batching logic fo…
Browse files Browse the repository at this point in the history
…r send promise in sp and graph
  • Loading branch information
patrick-rodgers committed Feb 12, 2025
1 parent 614bae1 commit 0efc612
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 103 deletions.
16 changes: 6 additions & 10 deletions packages/graph/batching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type ParsedGraphResponse = { nextLink?: string; responses: Response[] };
* [3]: The resolve function back to the promise for the original operation
* [4]: The reject function back to the promise for the original operation
*/
type RequestRecord = [Queryable, string, RequestInit, (value: Response | PromiseLike<Response>) => void, (reason?: any) => void];
type RequestRecord = [Queryable, string, RequestInit, (value: Response | PromiseLike<Response>) => void];

const RegistrationCompleteSym = Symbol.for("batch_registration");
const RequestCompleteSym = Symbol.for("batch_request");
Expand All @@ -95,7 +95,7 @@ function BatchParse(): TimelinePipe {

class BatchQueryable extends _GraphQueryable {

constructor(base: IGraphQueryable, public requestBaseUrl = base.toUrl().replace(/[\\|/]v1\.0|beta[\\|/].*$/i || "", "")) {
constructor(base: IGraphQueryable, public requestBaseUrl = base.toUrl().replace(/[\\|/]v1\.0|beta[\\|/].*$/i, "")) {

super(requestBaseUrl, "$batch");

Expand Down Expand Up @@ -183,12 +183,8 @@ export function createBatch(base: IGraphQueryable, props?: IGraphBatchProps): [T
const response: ParsedGraphResponse = await graphPost(batchQuery, body(batchRequest));

for (let index = 0; index < response.responses.length; index++) {
const [, , , resolve, reject] = requests[index + chunkIndex];
try {
resolve(response.responses[index]);
} catch (e) {
reject(e);
}
// this resolves the child request's send promise with the parsed response
requests[index + chunkIndex][3](response.responses[index]);
}
chunkIndex += requestsChunk.length;
}
Expand Down Expand Up @@ -229,8 +225,8 @@ export function createBatch(base: IGraphQueryable, props?: IGraphBatchProps): [T
// we replace the send function with our batching logic
instance.on.send.replace(async function (this: Queryable, url: URL, init: RequestInit) {

const promise = new Promise<Response>((resolve, reject) => {
requests.push([this, url.toString(), init, resolve, reject]);
const promise = new Promise<Response>((resolve) => {
requests.push([this, url.toString(), init, resolve]);
});

this.log(`[batch:${batchId}] (${(new Date()).getTime()}) Adding request ${init.method} ${url.toString()} to batch.`, 0);
Expand Down
184 changes: 92 additions & 92 deletions packages/sp/batching.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ interface ISPBatchProps {
* default: /Accept|Content-Type|IF-Match/i
*/
headersCopyPattern?: RegExp;

/**
* Number of requests to include in each batch, if more than this number are added your requests will
* be completed in multiple batches. This may affect ordered operations.
*/
maxRequests?: number;
}

/**
Expand All @@ -65,7 +71,7 @@ interface ISPBatchProps {
* [3]: The resolve function back to the promise for the original operation
* [4]: The reject function back to the promise for the original operation
*/
type RequestRecord = [Queryable, string, RequestInit, (value: Response | PromiseLike<Response>) => void, (reason?: any) => void];
type RequestRecord = [Queryable, string, RequestInit, (value: Response | PromiseLike<Response>) => void];

/**
* Tracks on a batched instance that registration is complete (the child request has gotten to the send moment and the request is included in the batch)
Expand Down Expand Up @@ -133,14 +139,17 @@ export function createBatch(base: ISPQueryable, props?: ISPBatchProps): [Timelin
const registrationPromises: Promise<void>[] = [];
const completePromises: Promise<void>[] = [];
const requests: RequestRecord[] = [];
const batchId = getGUID();
const batchQuery = new BatchQueryable(base);
// this id will be reused across multiple batches if the number of requests added to the batch
// exceeds the configured maxRequests value
const batchId = getGUID();
// this query is used to copy back the behaviors after the batch executes
// it should not manipulated or have behaviors added.
const refQuery = new BatchQueryable(base);

const { headersCopyPattern } = {
const { headersCopyPattern, maxRequests } = {
headersCopyPattern: /Accept|Content-Type|IF-Match/i,
maxRequests: 20,
...props,
};

Expand All @@ -155,125 +164,116 @@ export function createBatch(base: ISPQueryable, props?: ISPBatchProps): [Timelin
return Promise.all(completePromises).then(() => void (0));
}

const batchBody: string[] = [];
let currentChangeSetId = "";
// create a working copy of our requests
const requestsWorkingCopy = requests.slice();

for (let i = 0; i < requests.length; i++) {
while (requestsWorkingCopy.length > 0) {

const [, url, init] = requests[i];
const requestsChunk = requestsWorkingCopy.splice(0, maxRequests);
const batchBody: string[] = [];
let currentChangeSetId = "";

if (init.method === "GET") {
for (let i = 0; i < requestsChunk.length; i++) {

if (currentChangeSetId.length > 0) {
// end an existing change set
batchBody.push(`--changeset_${currentChangeSetId}--\n\n`);
currentChangeSetId = "";
}
const [, url, init] = requestsChunk[i];

batchBody.push(`--batch_${batchId}\n`);
if (init.method === "GET") {

} else {
if (currentChangeSetId.length > 0) {
// end an existing change set
batchBody.push(`--changeset_${currentChangeSetId}--\n\n`);
currentChangeSetId = "";
}

if (currentChangeSetId.length < 1) {
// start new change set
currentChangeSetId = getGUID();
batchBody.push(`--batch_${batchId}\n`);
batchBody.push(`Content-Type: multipart/mixed; boundary="changeset_${currentChangeSetId}"\n\n`);
}

batchBody.push(`--changeset_${currentChangeSetId}\n`);
}

// common batch part prefix
batchBody.push("Content-Type: application/http\n");
batchBody.push("Content-Transfer-Encoding: binary\n\n");
} else {

// these are the per-request headers
const headers = new Headers(init.headers);
if (currentChangeSetId.length < 1) {
// start new change set
currentChangeSetId = getGUID();
batchBody.push(`--batch_${batchId}\n`);
batchBody.push(`Content-Type: multipart/mixed; boundary="changeset_${currentChangeSetId}"\n\n`);
}

// this is the url of the individual request within the batch
const reqUrl = isUrlAbsolute(url) ? url : combine(batchQuery.requestBaseUrl, url);
batchBody.push(`--changeset_${currentChangeSetId}\n`);
}

if (init.method !== "GET") {
// common batch part prefix
batchBody.push("Content-Type: application/http\n");
batchBody.push("Content-Transfer-Encoding: binary\n\n");

let method = init.method;
// these are the per-request headers
const headers = new Headers(init.headers);

if (headers.has("X-HTTP-Method")) {
method = headers.get("X-HTTP-Method");
headers.delete("X-HTTP-Method");
}
// this is the url of the individual request within the batch
const reqUrl = isUrlAbsolute(url) ? url : combine(batchQuery.requestBaseUrl, url);

batchBody.push(`${method} ${reqUrl} HTTP/1.1\n`);
if (init.method !== "GET") {

} else {
batchBody.push(`${init.method} ${reqUrl} HTTP/1.1\n`);
}
let method = init.method;

// lastly we apply any default headers we need that may not exist
if (!headers.has("Accept")) {
headers.append("Accept", "application/json");
}
if (headers.has("X-HTTP-Method")) {
method = headers.get("X-HTTP-Method");
headers.delete("X-HTTP-Method");
}

if (!headers.has("Content-Type")) {
headers.append("Content-Type", "application/json;charset=utf-8");
}
batchBody.push(`${method} ${reqUrl} HTTP/1.1\n`);

// write headers into batch body
headers.forEach((value: string, name: string) => {
if (headersCopyPattern.test(name)) {
batchBody.push(`${name}: ${value}\n`);
} else {
batchBody.push(`${init.method} ${reqUrl} HTTP/1.1\n`);
}
});

batchBody.push("\n");

if (init.body) {
batchBody.push(`${init.body}\n\n`);
}
}
// lastly we apply any default headers we need that may not exist
if (!headers.has("Accept")) {
headers.append("Accept", "application/json");
}

if (currentChangeSetId.length > 0) {
// Close the changeset
batchBody.push(`--changeset_${currentChangeSetId}--\n\n`);
currentChangeSetId = "";
}
if (!headers.has("Content-Type")) {
headers.append("Content-Type", "application/json;charset=utf-8");
}

batchBody.push(`--batch_${batchId}--\n`);
// write headers into batch body
headers.forEach((value: string, name: string) => {
if (headersCopyPattern.test(name)) {
batchBody.push(`${name}: ${value}\n`);
}
});

const responses: Response[] = await spPost(batchQuery, {
body: batchBody.join(""),
headers: {
"Content-Type": `multipart/mixed; boundary=batch_${batchId}`,
},
});
batchBody.push("\n");

if (responses.length !== requests.length) {
throw Error("Could not properly parse responses to match requests in batch.");
}
if (init.body) {
batchBody.push(`${init.body}\n\n`);
}
}

return new Promise<void>((res, rej) => {
if (currentChangeSetId.length > 0) {
// Close the changeset
batchBody.push(`--changeset_${currentChangeSetId}--\n\n`);
currentChangeSetId = "";
}

try {
batchBody.push(`--batch_${batchId}--\n`);

for (let index = 0; index < responses.length; index++) {
const [, , , resolve, reject] = requests[index];
try {
resolve(responses[index]);
} catch (e) {
reject(e);
}
}

// this small delay allows the promises to resolve correctly in order by dropping this resolve behind
// the other work in the event loop. Feels hacky, but it works so 🤷
setTimeout(res, 0);
const responses: Response[] = await spPost(batchQuery, {
body: batchBody.join(""),
headers: {
"Content-Type": `multipart/mixed; boundary=batch_${batchId}`,
},
});

} catch (e) {
if (responses.length !== requestsChunk.length) {
throw Error("Could not properly parse responses to match requests in batch.");
}

setTimeout(() => rej(e), 0);
for (let index = 0; index < responses.length; index++) {
// resolve the child request's send promise with the parsed response
requestsChunk[index][3](responses[index]);
}

}).then(() => Promise.all(completePromises)).then(() => void (0));
} // end of while (requestsWorkingCopy.length > 0)

await Promise.all(completePromises).then(() => void (0));
};

const register = (instance: ISPQueryable) => {
Expand Down Expand Up @@ -316,9 +316,9 @@ export function createBatch(base: ISPQueryable, props?: ISPBatchProps): [Timelin
this.on.send.replace(async function (this: ISPQueryable, url: URL, init: RequestInit) {

// this is the promise that Queryable will see returned from .emit.send
const promise = new Promise<Response>((resolve, reject) => {
const promise = new Promise<Response>((resolve) => {
// add the request information into the batch
requests.push([this, url.toString(), init, resolve, reject]);
requests.push([this, url.toString(), init, resolve]);
});

this.log(`[batch:${batchId}] (${(new Date()).getTime()}) Adding request ${init.method} ${url.toString()} to batch.`, 0);
Expand Down
1 change: 0 additions & 1 deletion packages/sp/items/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,6 @@ export class _Item extends _SPInstance {
}));

return spPost(Item(this).using(ItemUpdatedParser()), postBody);

}

/**
Expand Down

0 comments on commit 0efc612

Please sign in to comment.