Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

error handling for read ETIMEDOUT error #2846

Open
wants to merge 13 commits into
base: v6/develop
Choose a base branch
from
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import BaseController from '../base-http-api-controller.js';
import { OPERATION_ID_STATUS } from '../../../constants/constants.js';
import { ERROR_TYPE, OPERATION_ID_STATUS } from '../../../constants/constants.js';

class LocalStoreController extends BaseController {
constructor(ctx) {
Expand Down Expand Up @@ -65,7 +65,18 @@ class LocalStoreController extends BaseController {
)}. Operation id: ${operationId}`,
);

await this.operationIdService.cacheOperationIdData(operationId, cachedAssertions);
try {
await this.operationIdService.cacheOperationIdData(operationId, cachedAssertions);
} catch (err) {
this.operationIdService.updateOperationIdStatus(
operationId,
null,
OPERATION_ID_STATUS.FAILED,
djordjekovac marked this conversation as resolved.
Show resolved Hide resolved
err.message,
ERROR_TYPE.LOCAL_STORE.LOCAL_STORE_ERROR,
);
this.logger.warn(`Error caching operationId: ${operationId} data, ${err}`);
}

const commandSequence = ['validateAssetCommand', 'localStoreCommand'];

Expand Down
2 changes: 1 addition & 1 deletion src/controllers/rpc/publish-rpc-controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ class PublishController extends BaseController {
command.data.keyword = message.data.keyword;
command.data.agreementId = dataSource.agreementId;
command.data.agreementData = dataSource.agreementData;

break;
default:
throw Error('unknown message type');
}

command.data = {
...command.data,
remotePeerId,
Expand Down
73 changes: 52 additions & 21 deletions src/modules/network/implementation/libp2p-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ class Libp2pService {

handleMessage(protocol, handler) {
this.logger.info(`Enabling network protocol: ${protocol}`);

this.node.handle(protocol, async (handlerProps) => {
const { stream } = handlerProps;
const peerIdString = handlerProps.connection.remotePeer.toB58String();
Expand All @@ -205,7 +204,6 @@ class Libp2pService {
this.isRequestValid.bind(this),
peerIdString,
);

this.updateSessionStream(
message.header.operationId,
message.header.keywordUuid,
Expand All @@ -214,38 +212,65 @@ class Libp2pService {
);

if (!valid) {
await this.sendMessageResponse(
protocol,
peerIdString,
NETWORK_MESSAGE_TYPES.RESPONSES.NACK,
message.header.operationId,
message.header.keywordUuid,
{ errorMessage: 'Invalid request message' },
);
try {
await this.sendMessageResponse(
protocol,
peerIdString,
NETWORK_MESSAGE_TYPES.RESPONSES.NACK,
message.header.operationId,
message.header.keywordUuid,
{ errorMessage: 'Invalid request message' },
);
} catch (error) {
this.logger.warn(
djordjekovac marked this conversation as resolved.
Show resolved Hide resolved
`Request message is not valid with operationId: ${message.header.operationId}, peerId: ${peerIdString}, keyword: ${message.header.keywordUuid}, Error: ${error}`,
);
}

this.removeCachedSession(
message.header.operationId,
message.header.keywordUuid,
peerIdString,
);
} else if (busy) {
await this.sendMessageResponse(
protocol,
peerIdString,
NETWORK_MESSAGE_TYPES.RESPONSES.BUSY,
message.header.operationId,
message.header.keywordUuid,
{},
);
try {
await this.sendMessageResponse(
protocol,
peerIdString,
NETWORK_MESSAGE_TYPES.RESPONSES.BUSY,
message.header.operationId,
message.header.keywordUuid,
{},
);
} catch (error) {
this.logger.warn(
`Peer is busy, operationId: ${message.header.operationId}, peerId: ${peerIdString}, keyword: ${message.header.keywordUuid}, Error: ${error}`,
);
}
this.removeCachedSession(
message.header.operationId,
message.header.keywordUuid,
peerIdString,
);
} else {
this.logger.debug(
`Receiving message from ${peerIdString} to ${this.config.id}: protocol: ${protocol}, messageType: ${message.header.messageType};`,
`Receiving message from peerId: ${peerIdString} to ${this.config.id} protocol: ${protocol} with operationId: ${message.header.operationId} and keyword: ${message.header.keywordUuid}, messageType: ${message.header.messageType};`,
);
await handler(message, peerIdString);
try {
await handler(message, peerIdString);
} catch (error) {
await this.sendMessageResponse(
protocol,
peerIdString,
NETWORK_MESSAGE_TYPES.RESPONSES.NACK,
message.header.operationId,
message.header.keywordUuid,
{ errorMessage: 'Unable to handle request' },
);
this.logger.error(
djordjekovac marked this conversation as resolved.
Show resolved Hide resolved
`Error handling message from peerId: ${peerIdString} to ${this.config.id} protocol : ${protocol} with operationId: ${message.header.operationId} and keyword: ${message.header.keywordUuid}, Error: ${error} `,
);
}
}
});
}
Expand Down Expand Up @@ -633,7 +658,13 @@ class Libp2pService {

removeCachedSession(operationId, keywordUuid, peerIdString) {
if (this.sessions[peerIdString]?.[operationId]?.[keywordUuid]?.stream) {
this.sessions[peerIdString][operationId][keywordUuid].stream.close();
try {
this.sessions[peerIdString][operationId][keywordUuid].stream.close();
} catch (error) {
this.logger.error(
`Error closing session stream. OperationId: ${operationId}, peerId: ${peerIdString} Error: ${error.message}`,
);
}
delete this.sessions[peerIdString][operationId];
this.logger.trace(
`Removed session for remotePeerId: ${peerIdString}, operationId: ${operationId}.`,
Expand Down
3 changes: 1 addition & 2 deletions src/service/operation-id-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,10 @@ class OperationIdService {
}

async getCachedOperationIdData(operationId) {
if (this.memoryCachedHandlersData[operationId]) {
if (this.memoryCachedHandlersData[operationId]?.data) {
this.logger.debug(`Reading operation id: ${operationId} cached data from memory`);
return this.memoryCachedHandlersData[operationId].data;
}

this.logger.debug(`Reading operation id: ${operationId} cached data from file`);
const documentPath = this.fileService.getOperationIdDocumentPath(operationId);
let data;
Expand Down