Skip to content

Commit

Permalink
Adding initial fix for bug issue Azure#1967
Browse files Browse the repository at this point in the history
  • Loading branch information
blperf committed Oct 29, 2023
1 parent ad06a02 commit c3e296e
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 2 deletions.
6 changes: 4 additions & 2 deletions src/blob/generated/ExpressMiddlewareFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,15 @@ export default class ExpressMiddlewareFactory extends MiddlewareFactory {
handlers,
this.logger
);
return (req: Request, res: Response, next: NextFunction) => {
return (req: Request, res: Response, next: NextFunction) => {
const request = new ExpressRequestAdapter(req);
const response = new ExpressResponseAdapter(res);
let newContext = new Context(res.locals, this.contextPath, request, response);
handlerMiddlewareFactory.createHandlerMiddleware()(
new Context(res.locals, this.contextPath, request, response),
newContext,
next
);
res.on("close", () => handlers.blobHandler.cleanUpBlob(newContext));
};
}

Expand Down
1 change: 1 addition & 0 deletions src/blob/generated/handlers/IBlobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Context from "../Context";

export default interface IBlobHandler {
download(options: Models.BlobDownloadOptionalParams, context: Context): Promise<Models.BlobDownloadResponse>;
cleanUpBlob(context: Context): Promise<void>;
getProperties(options: Models.BlobGetPropertiesOptionalParams, context: Context): Promise<Models.BlobGetPropertiesResponse>;
delete(options: Models.BlobDeleteMethodOptionalParams, context: Context): Promise<Models.BlobDeleteResponse>;
undelete(options: Models.BlobUndeleteOptionalParams, context: Context): Promise<Models.BlobUndeleteResponse>;
Expand Down
9 changes: 9 additions & 0 deletions src/blob/handlers/BlobHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,15 @@ export default class BlobHandler extends BaseHandler implements IBlobHandler {
}
}

/**
* Clean up file handles for blob to prevent leak.
*
* @memberof BlobHandler
*/
public async cleanUpBlob(context: Context): Promise<void> {
this.extentStore.cleanStreams(context.contextId);
}

/**
* Get blob properties.
*
Expand Down
33 changes: 33 additions & 0 deletions src/common/persistence/FSExtentStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
fdatasync,
mkdir,
open,
ReadStream,
stat,
unlink
} from "fs";
Expand Down Expand Up @@ -304,6 +305,8 @@ export default class FSExtentStore implements IExtentStore {
return this.appendQueue.operate(op, contextId);
}

private streams: Map<string, ReadStream[]> = new Map<string, ReadStream[]>();

/**
* Read data from persistency layer according to the given IExtentChunk.
*
Expand Down Expand Up @@ -353,12 +356,42 @@ export default class FSExtentStore implements IExtentStore {
contextId
);
});

if (contextId != null) {
let existingStreams = this.streams.get(contextId);
if (existingStreams == null) {
let newStreamsArray: ReadStream[] = [stream];
this.streams.set(contextId, newStreamsArray);
}
else {
existingStreams.push(stream);
this.streams.set(contextId, existingStreams);
}
}

resolve(stream);
});

return this.readQueue.operate(op, contextId);
}

public async cleanStreams(contextId?: string): Promise<void> {
this.logger.verbose(
"FSExtentStore:cleanStreams() Response object closed unexpectedly, cleaning up after streams",
contextId
);

if (contextId != null) {
let streamsToCleanup = this.streams.get(contextId);

if (streamsToCleanup != null) {
for (const stream of streamsToCleanup) {
stream.destroy();
}
}
}
}

/**
* Merge several extent chunks to a ReadableStream according to the offset and count.
*
Expand Down
5 changes: 5 additions & 0 deletions src/common/persistence/IExtentStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ export default interface IExtentStore extends IDataStore, ICleaner {
contextId?: string
): Promise<NodeJS.ReadableStream>;

/**
* Clean up file handles from an extent when a response object is closed
*/
cleanStreams(contextId?: string): Promise<void>;

/**
* Merge several extent chunks to a ReadableStream according to the offset and count.
*
Expand Down

0 comments on commit c3e296e

Please sign in to comment.