Skip to content

Commit

Permalink
feat: adding polling for new heads. (#2160)
Browse files Browse the repository at this point in the history
* feat: adding polling for new heads.

Signed-off-by: ebadiere <ebadiere@gmail.com>

* fix: Test fix.  Isolated the WS_SUBSCRIPTION_LIMIT setting for the test.

Signed-off-by: ebadiere <ebadiere@gmail.com>

* fix: Fixed lerna version and set WS_NEW_HEADS_ENABLED for localAccpentance to false.

Signed-off-by: ebadiere <ebadiere@gmail.com>

* feat: Added test for number of subscriptions.

Signed-off-by: ebadiere <ebadiere@gmail.com>

* fix: Test bug fix.

Signed-off-by: ebadiere <ebadiere@gmail.com>

---------

Signed-off-by: ebadiere <ebadiere@gmail.com>
  • Loading branch information
ebadiere authored Mar 5, 2024
1 parent 2d8f24a commit 4469f5e
Show file tree
Hide file tree
Showing 7 changed files with 287 additions and 86 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
"acceptancetest:htsprecompilev1": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@htsprecompilev1' --exit",
"acceptancetest:release": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@release' --exit",
"acceptancetest:ws": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@web-socket' --exit",
"acceptancetest:ws_newheads": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@web-socket-newheads' --exit",
"acceptancetest:precompile-calls": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@precompile-calls' --exit",
"acceptancetest:cache-service": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@cache-service' --exit",
"acceptancetest:rpc_api_schema_conformity": "ts-mocha packages/server/tests/acceptance/index.spec.ts -g '@api-conformity' --exit",
Expand Down
6 changes: 5 additions & 1 deletion packages/relay/src/lib/poller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*
* Hedera JSON RPC Relay
*
* Copyright (C) 2023 Hedera Hashgraph, LLC
* Copyright (C) 2023-2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -71,6 +71,10 @@ export class Poller {
filters?.topics || null,
);

poll.lastPolled = this.latestBlock;
} else if (event === 'newHeads' && process.env.WS_NEW_HEADS_ENABLED === 'true') {
data = await this.eth.getBlockByNumber('latest', true);
data.jsonrpc = '2.0';
poll.lastPolled = this.latestBlock;
} else {
this.logger.error(`${LOGGER_PREFIX} Polling for unsupported event: ${event}. Tag: ${poll.tag}`);
Expand Down
21 changes: 0 additions & 21 deletions packages/server/tests/acceptance/ws/subscribe.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -356,27 +356,6 @@ describe('@web-socket Acceptance Tests', async function () {
await new Promise((resolve) => setTimeout(resolve, 500));
});

it('Expect Unsupported Method Error message when subscribing for newHeads method', async function () {
const webSocket = new WebSocket(WS_RELAY_URL);
let response = {};
webSocket.on('message', function incoming(data) {
response = JSON.parse(data);
});
webSocket.on('open', function open() {
webSocket.send('{"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"],"id":1}');
});

// wait 500ms to expect the message
await new Promise((resolve) => setTimeout(resolve, 500));

expect(response.error.code).to.eq(predefined.UNSUPPORTED_METHOD.code);
expect(response.error.name).to.eq(predefined.UNSUPPORTED_METHOD.name);
expect(response.error.message).to.eq(predefined.UNSUPPORTED_METHOD.message);

// close the connection
webSocket.close();
});

it('Expect Unsupported Method Error message when subscribing for newPendingTransactions method', async function () {
const webSocket = new WebSocket(WS_RELAY_URL);
let response = {};
Expand Down
153 changes: 153 additions & 0 deletions packages/server/tests/acceptance/ws/subscribeNewHeads.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*-
*
* Hedera JSON RPC Relay
*
* Copyright (C) 2024 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

// external resources
import { solidity } from 'ethereum-waffle';
import chai, { expect } from 'chai';
import WebSocket from 'ws';
chai.use(solidity);

import { ethers } from 'ethers';
import Assertions from '../../helpers/assertions';
import { predefined } from '@hashgraph/json-rpc-relay';
const WS_RELAY_URL = `${process.env.WS_RELAY_URL}`;

const createSubscription = (ws, subscriptionId) => {
return new Promise((resolve, reject) => {
ws.on('message', function incoming(data) {
const response = JSON.parse(data);
if (response.id === subscriptionId && response.result) {
console.log(`Subscription ${subscriptionId} successful with ID: ${response.result}`);
resolve(response.result); // Resolve with the subscription ID
} else if (response.method === 'eth_subscription') {
console.log(`Subscription ${subscriptionId} received block:`, response.params.result);
// You can add more logic here to handle incoming blocks
}
});

ws.on('open', function open() {
ws.send(
JSON.stringify({
id: subscriptionId,
jsonrpc: '2.0',
method: 'eth_subscribe',
params: ['newHeads'],
}),
);
});

ws.on('error', (error) => {
reject(error);
});
});
};

describe('@web-socket Acceptance Tests', async function () {
this.timeout(240 * 1000); // 240 seconds

let server;

let wsProvider;
let originalWsNewHeadsEnabledValue, originalWsSubcriptionLimitValue;

before(async () => {
const { socketServer } = global;
server = socketServer;

// cache original ENV values
originalWsNewHeadsEnabledValue = process.env.WS_NEW_HEADS_ENABLED;
originalWsSubcriptionLimitValue = process.env.WS_SUBSCRIPTION_LIMIT;
});

beforeEach(async () => {
process.env.WS_NEW_HEADS_ENABLED = originalWsNewHeadsEnabledValue;

process.env.WS_SUBSCRIPTION_LIMIT = '10';

wsProvider = await new ethers.WebSocketProvider(WS_RELAY_URL);
await new Promise((resolve) => setTimeout(resolve, 1000));
if (server) expect(server._connections).to.equal(1);
});

afterEach(async () => {
if (wsProvider) {
await wsProvider.destroy();
await new Promise((resolve) => setTimeout(resolve, 1000));
}
if (server) expect(server._connections).to.equal(0);
process.env.WS_SUBSCRIPTION_LIMIT = originalWsSubcriptionLimitValue;
});

describe('Configuration', async function () {
it('Should return unsupported method when WS_NEW_HEADS_ENABLED is set to false', async function () {
const webSocket = new WebSocket(WS_RELAY_URL);
process.env.WS_NEW_HEADS_ENABLED = 'false';
let response = '';
const messagePromise = new Promise((resolve, reject) => {
webSocket.on('message', function incoming(data) {
try {
const response = JSON.parse(data);
expect(response).to.have.property('error');
expect(response.error).to.have.property('code');
expect(response.error.code).to.equal(-32601);
expect(response.error).to.have.property('message');
expect(response.error.message).to.equal('Unsupported JSON-RPC method');
expect(response.error).to.have.property('name');
expect(response.error.name).to.equal('Method not found');
resolve();
} catch (error) {
reject(error);
}
response = data;
});
webSocket.on('open', function open() {
// send the request for newHeads
webSocket.send('{"id":1,"jsonrpc":"2.0","method":"eth_subscribe","params":["newHeads"]}');
});
webSocket.on('error', (error) => {
reject(error); // Reject the promise on WebSocket error
});
});
await messagePromise;

webSocket.close();
process.env.WS_NEW_HEADS_ENABLED = originalWsNewHeadsEnabledValue;
});

it('Does not allow more subscriptions per connection than the specified limit with newHeads', async function () {
process.env.WS_SUBSCRIPTION_LIMIT = '2';
process.env.WS_NEW_HEADS_ENABLED = 'true';
// Create different subscriptions
for (let i = 0; i < 3; i++) {
if (i === 2) {
const expectedError = predefined.MAX_SUBSCRIPTIONS;
await Assertions.assertPredefinedRpcError(expectedError, wsProvider.send, true, wsProvider, [
'eth_subscribe',
['newHeads'],
]);
} else {
await wsProvider.send('eth_subscribe', ['newHeads']);
}
}

await new Promise((resolve) => setTimeout(resolve, 500));
});
});
});
1 change: 1 addition & 0 deletions packages/server/tests/localAcceptance.env
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,4 @@ DEBUG_API_ENABLED=true
SEND_RAW_TRANSACTION_SIZE_LIMIT=131072
BATCH_REQUESTS_ENABLED=true
TEST_GAS_PRICE_DEVIATION=0.2
WS_NEW_HEADS_ENABLED=false
2 changes: 2 additions & 0 deletions packages/server/tests/previewnetAcceptance.env
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ SUBSCRIPTIONS_ENABLED=true
FILTER_API_ENABLED=true
DEBUG_API_ENABLED=true
TEST_GAS_PRICE_DEVIATION=0.2
WS_NEW_HEADS_ENABLED=true

Loading

0 comments on commit 4469f5e

Please sign in to comment.