Skip to content

Commit

Permalink
Merge pull request #873 from paraswap/feat/curve-v1-pub-sub
Browse files Browse the repository at this point in the history
Feat/curve v1 pub sub
  • Loading branch information
KanievskyiDanylo authored Jan 6, 2025
2 parents dec7b3b + 26f957c commit 77a0667
Show file tree
Hide file tree
Showing 16 changed files with 366 additions and 56 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@paraswap/dex-lib",
"version": "4.0.13",
"version": "4.0.16",
"main": "build/index.js",
"types": "build/index.d.ts",
"repository": "https://github.com/paraswap/paraswap-dex-lib",
Expand Down
6 changes: 2 additions & 4 deletions src/abi/fluid-dex/fluid-dex.abi.json
Original file line number Diff line number Diff line change
Expand Up @@ -381,15 +381,13 @@
},
{
"anonymous": false,
"inputs": [
],
"inputs": [],
"name": "LogPauseSwapAndArbitrage",
"type": "event"
},
{
"anonymous": false,
"inputs": [
],
"inputs": [],
"name": "LogUnpauseSwapAndArbitrage",
"type": "event"
},
Expand Down
21 changes: 21 additions & 0 deletions src/dex-helper/dummy-dex-helper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ class DummyCache implements ICache {
return null;
}

async keys(
dexKey: string,
network: number,
cacheKey: string,
): Promise<string[]> {
return [];
}

async ttl(
dexKey: string,
network: number,
cacheKey: string,
): Promise<number> {
const key = `${network}_${dexKey}_${cacheKey}`.toLowerCase();
return this.storage[key] ? 1 : -1;
}

async rawget(key: string): Promise<string | null> {
return this.storage[key] ? this.storage[key] : null;
return null;
Expand Down Expand Up @@ -139,6 +156,10 @@ class DummyCache implements ICache {
return set.has(key);
}

async smembers(setKey: string): Promise<string[]> {
return Array.from(this.setMap[setKey] ?? []);
}

async hset(mapKey: string, key: string, value: string): Promise<void> {
if (!this.hashStorage[mapKey]) this.hashStorage[mapKey] = {};
this.hashStorage[mapKey][key] = value;
Expand Down
6 changes: 6 additions & 0 deletions src/dex-helper/icache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@ export interface ICache {
cacheKey: string,
): Promise<string | null>;

ttl(dexKey: string, network: number, cacheKey: string): Promise<number>;

keys(dexKey: string, network: number, cacheKey: string): Promise<string[]>;

rawget(key: string): Promise<string | null>;

rawset(key: string, value: string, ttl: number): Promise<string | null>;
Expand Down Expand Up @@ -52,6 +56,8 @@ export interface ICache {

sismember(setKey: string, key: string): Promise<boolean>;

smembers(setKey: string): Promise<string[]>;

hset(mapKey: string, key: string, value: string): Promise<void>;

hdel(mapKey: string, keys: string[]): Promise<number>;
Expand Down
6 changes: 4 additions & 2 deletions src/dex/bebop/bebop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -875,10 +875,11 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
}

async getCachedPrices(): Promise<BebopPricingResponse | null> {
const cachedPrices = await this.dexHelper.cache.get(
const cachedPrices = await this.dexHelper.cache.getAndCacheLocally(
this.dexKey,
this.network,
this.pricesCacheKey,
2,
);

if (cachedPrices) {
Expand All @@ -889,10 +890,11 @@ export class Bebop extends SimpleExchange implements IDex<BebopData> {
}

async getCachedTokens(): Promise<TokenDataMap | null> {
const cachedTokens = await this.dexHelper.cache.get(
const cachedTokens = await this.dexHelper.cache.getAndCacheLocally(
this.dexKey,
this.network,
this.tokensAddrCacheKey,
BEBOP_TOKENS_POLLING_INTERVAL_MS / 1000,
);

if (cachedTokens) {
Expand Down
12 changes: 9 additions & 3 deletions src/dex/cables/cables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -642,30 +642,36 @@ export class Cables extends SimpleExchange implements IDex<any> {
* CACHED UTILS
*/
async getCachedTokens(): Promise<any> {
const cachedTokens = await this.dexHelper.cache.get(
const cachedTokens = await this.dexHelper.cache.getAndCacheLocally(
this.dexKey,
this.network,
this.rateFetcher.tokensCacheKey,
// as local cache just uses passed ttl (instead of getting actual ttl from cache)
// pass shorter interval to avoid getting stale data
// (same logic is used in other places)
CABLES_API_TOKENS_POLLING_INTERVAL_MS / 1000,
);

return cachedTokens ? JSON.parse(cachedTokens) : {};
}

async getCachedPairs(): Promise<any> {
const cachedPairs = await this.dexHelper.cache.get(
const cachedPairs = await this.dexHelper.cache.getAndCacheLocally(
this.dexKey,
this.network,
this.rateFetcher.pairsCacheKey,
CABLES_API_PAIRS_POLLING_INTERVAL_MS / 1000,
);

return cachedPairs ? JSON.parse(cachedPairs) : {};
}

async getCachedPrices(): Promise<any> {
const cachedPrices = await this.dexHelper.cache.get(
const cachedPrices = await this.dexHelper.cache.getAndCacheLocally(
this.dexKey,
this.network,
this.rateFetcher.pricesCacheKey,
CABLES_API_PRICES_POLLING_INTERVAL_MS / 1000,
);

return cachedPrices ? JSON.parse(cachedPrices) : {};
Expand Down
6 changes: 5 additions & 1 deletion src/dex/curve-v1-factory/curve-v1-factory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,12 @@ export class CurveV1Factory

this.poolManager = new CurveV1FactoryPoolManager(
this.dexKey,
// should be the same as we use for FactoryStateHandler (4th param) and others
this.cacheStateKey,
dexHelper.getLogger(`${this.dexKey}-state-manager`),
dexHelper,
allPriceHandlers,
// should be the same as we use for FactoryStateHandler (8th param) and others
this.config.stateUpdatePeriodMs,
);
}
Expand Down Expand Up @@ -707,7 +710,8 @@ export class CurveV1Factory
factoryImplementationFromConfig.name,
implementationAddress.toLowerCase(),
poolAddresses[i],
this.config,
this.config.stateUpdatePeriodMs,
this.config.factories,
factoryAddress,
poolIdentifier,
poolConstants,
Expand Down
18 changes: 17 additions & 1 deletion src/dex/curve-v1-factory/curve-v1-pool-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
CURVE_API_URL,
LIQUIDITY_FETCH_TIMEOUT_MS,
LIQUIDITY_UPDATE_PERIOD_MS,
MAX_ALLOWED_STATE_DELAY_FACTOR,
NETWORK_ID_TO_NAME,
STATE_UPDATE_PERIOD_MS,
STATE_UPDATE_RETRY_PERIOD_MS,
Expand Down Expand Up @@ -48,7 +49,7 @@ export class CurveV1FactoryPoolManager {
'/factory-stable-ng',
]);

private statePollingManager = StatePollingManager;
private statePollingManager: StatePollingManager;
private taskScheduler: TaskScheduler;

private liquidityUpdatedAtMs: number = 0;
Expand All @@ -58,12 +59,27 @@ export class CurveV1FactoryPoolManager {

constructor(
private name: string,
cacheStateKey: string,
private logger: Logger,
private dexHelper: IDexHelper,
private allPriceHandlers: Record<string, PriceHandler>,
stateUpdatePeriodMs: number = STATE_UPDATE_PERIOD_MS,
stateUpdateRetryPeriodMs: number = STATE_UPDATE_RETRY_PERIOD_MS,
maxAllowedStateDelayFactor: number = MAX_ALLOWED_STATE_DELAY_FACTOR,
) {
this.statePollingManager = new StatePollingManager(
dexHelper,
cacheStateKey,
// lower than on PoolPollingBase.isStateUpToDate to increase chances of getting fresh data if pub-sub is lagging
Math.round(
(stateUpdatePeriodMs / 1000) * (maxAllowedStateDelayFactor - 1),
),
);

if (this.dexHelper.config.isSlave) {
this.statePollingManager.subscribe();
}

this.taskScheduler = new TaskScheduler(
this.name,
this.logger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import {
import { BytesLike } from 'ethers/lib/utils';
import { Address } from '@paraswap/core';
import { BigNumber } from 'ethers';
import { DexConfigMap } from '../../../types';

const DEFAULT_2_ZERO_ARRAY = [0n, 0n];
const DEFAULT_4_ZERO_ARRAY = [0n, 0n, 0n, 0n];
Expand All @@ -40,7 +39,8 @@ export class FactoryStateHandler extends PoolPollingBase {
readonly implementationName: FactoryImplementationNames,
implementationAddress: Address,
readonly address: Address,
readonly config: DexParams,
readonly stateUpdatePeriodMs: number,
readonly factories: DexParams['factories'],
readonly factoryAddress: Address,
readonly poolIdentifier: string,
readonly poolConstants: PoolConstants,
Expand All @@ -64,7 +64,7 @@ export class FactoryStateHandler extends PoolPollingBase {
cacheStateKey,
implementationName,
implementationAddress,
config.stateUpdatePeriodMs,
stateUpdatePeriodMs,
poolIdentifier,
poolConstants,
poolContextConstants,
Expand All @@ -84,7 +84,7 @@ export class FactoryStateHandler extends PoolPollingBase {
}

getStateMultiCalldata(): MultiCallParams<MulticallReturnedTypes>[] {
const factoryConfig = this.config.factories?.find(
const factoryConfig = this.factories?.find(
({ address }) =>
address.toLowerCase() === this.factoryAddress.toLowerCase(),
);
Expand Down
53 changes: 32 additions & 21 deletions src/dex/curve-v1-factory/state-polling-pools/polling-manager.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
import { Logger } from 'log4js';
import { Network } from '../../../constants';
import { IDexHelper } from '../../../dex-helper';
import { Utils, _require } from '../../../utils';
import { PoolState } from '../types';
import { MulticallReturnedTypes, PoolPollingBase } from './pool-polling-base';
import { ExpHashPubSub } from '../../../lib/pub-sub';

/*
* Since we are updating all pools state at once, I need some generalized iterator without state,
* just to go for every pool, get multicall requests and apply them into new state
*/

export class StatePollingManager {
static async fetchAndSetStatesFromRPC(
private pubSub: ExpHashPubSub;

constructor(dexHelper: IDexHelper, cacheKey: string, ttl: number) {
this.pubSub = new ExpHashPubSub(dexHelper, cacheKey, ttl);
}

subscribe() {
this.pubSub.subscribe();
}

async fetchAndSetStatesFromRPC(
dexHelper: IDexHelper,
pools: PoolPollingBase[],
blockNumber?: number,
Expand Down Expand Up @@ -94,15 +104,15 @@ export class StatePollingManager {
return newStates;
}

static async masterUpdatePoolsInBatch(
async masterUpdatePoolsInBatch(
logger: Logger,
dexHelper: IDexHelper,
pools: PoolPollingBase[],
blockNumber?: number,
) {
const dexKey = pools.length > 0 ? pools[0].dexKey : 'CurveV1Factory';
try {
const newStates = await StatePollingManager.fetchAndSetStatesFromRPC(
const newStates = await this.fetchAndSetStatesFromRPC(
dexHelper,
pools,
blockNumber,
Expand All @@ -113,6 +123,17 @@ export class StatePollingManager {
{ poolLength: pools.length, newStatesLength: newStates.length },
'newStates.length === pools.length',
);

const dataToPublish: Record<string, PoolState | null> = newStates.reduce(
(acc, state, i) => {
acc[pools[i].poolIdentifier] = state;
return acc;
},
{} as Record<string, PoolState | null>,
);

this.pubSub.publish(dataToPublish);

await Promise.all(
pools.map(async (p, i) => {
if (newStates[i] !== null) {
Expand Down Expand Up @@ -143,7 +164,7 @@ export class StatePollingManager {
}
}

static async slaveUpdatePoolsInBatch(
async slaveUpdatePoolsInBatch(
logger: Logger,
dexHelper: IDexHelper,
pools: PoolPollingBase[],
Expand All @@ -155,15 +176,10 @@ export class StatePollingManager {
await Promise.all(
pools.map(async p => {
try {
const unparsedStateFromCache = await dexHelper.cache.hget(
p.cacheStateKey,
const parsedState = await this.pubSub.getAndCache<PoolState>(
p.poolIdentifier,
);
if (unparsedStateFromCache !== null) {
const parsedState = Utils.Parse(
unparsedStateFromCache,
) as PoolState;

if (parsedState !== null) {
if (p.isStateUpToDate(parsedState)) {
p.setState(parsedState);
return;
Expand Down Expand Up @@ -197,7 +213,7 @@ export class StatePollingManager {
}. Falling back to RPC`,
);

await StatePollingManager.fetchAndSetStatesFromRPC(
await this.fetchAndSetStatesFromRPC(
dexHelper,
poolsForRPCUpdate,
blockNumber,
Expand All @@ -217,7 +233,7 @@ export class StatePollingManager {
}
}

static async updatePoolsInBatch(
async updatePoolsInBatch(
logger: Logger,
dexHelper: IDexHelper,
pools: PoolPollingBase[],
Expand All @@ -233,14 +249,9 @@ export class StatePollingManager {
}

if (dexHelper.config.isSlave) {
await StatePollingManager.slaveUpdatePoolsInBatch(
logger,
dexHelper,
pools,
blockNumber,
);
await this.slaveUpdatePoolsInBatch(logger, dexHelper, pools, blockNumber);
} else {
await StatePollingManager.masterUpdatePoolsInBatch(
await this.masterUpdatePoolsInBatch(
logger,
dexHelper,
pools,
Expand Down
4 changes: 2 additions & 2 deletions src/dex/dexalot/constants.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import BigNumber from 'bignumber.js';

export const DEXALOT_PRICES_CACHES_TTL_S = 3;
export const DEXALOT_PRICES_CACHES_TTL_S = 5;

export const DEXALOT_PAIRS_CACHES_TTL_S = 21 * 60; // 21 mins

export const DEXALOT_TOKENS_CACHES_TTL_S = 21 * 60; // 21 mins

export const DEXALOT_BLACKLIST_CACHES_TTL_S = 180 * 60; // 3 hours

export const DEXALOT_API_PRICES_POLLING_INTERVAL_MS = 1000;
export const DEXALOT_API_PRICES_POLLING_INTERVAL_MS = 2000;

export const DEXALOT_API_PAIRS_POLLING_INTERVAL_MS = 1000 * 60 * 10; // 10 mins

Expand Down
Loading

0 comments on commit 77a0667

Please sign in to comment.