Skip to content

Commit

Permalink
feat: cuckooFilter for subscriptions
Browse files Browse the repository at this point in the history
  • Loading branch information
ZigBalthazar committed Dec 8, 2024
1 parent 0910446 commit c3f64d0
Show file tree
Hide file tree
Showing 11 changed files with 460 additions and 513 deletions.
3 changes: 3 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,6 @@ JWT_EXPIRATION_TIME=""
#== TRY SPEED
TRYSPEED_WEBHOOK_SECRET=
TRYSPEED_API_KEY=

#== REDIS
REDIS_URI=""
2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
"@grpc/grpc-js": "^1.12.2",
"@grpc/proto-loader": "^0.7.13",
"@grpc/reflection": "^1.0.4",
"@nestjs-modules/ioredis": "^2.0.2",
"@nestjs/common": "^10.3.0",
"@nestjs/config": "^3.1.1",
"@nestjs/core": "^10.3.0",
Expand All @@ -48,6 +49,7 @@
"compression": "^1.7.4",
"express": "^4.18.2",
"helmet": "^7.1.0",
"ioredis": "^5.4.1",
"jsonwebtoken": "^9.0.2",
"lodash": "^4.17.21",
"mime-types": "^2.1.35",
Expand Down
1 change: 0 additions & 1 deletion src/modules/grpc/gen/ts/kraken.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion src/modules/grpc/proto/kraken.proto
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ message registerServiceRequest {
message registerServiceResponse {
bool success = 1;
optional string message = 2;
optional string token = 3;
}

message EmptyRequest {}
Expand Down
2 changes: 1 addition & 1 deletion src/modules/health/health.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { HealthCheck, HealthCheckService, MongooseHealthIndicator } from '@nestj
import JwtAuthGuard from '../auth/guards/jwt-auth.guard';

@Controller('health')
@ApiTags("health")
@ApiTags('health')
export default class HealthController {
private readonly startTime = Date.now();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type { Metadata } from '@grpc/grpc-js';
import type { ServerUnaryCall } from '@grpc/grpc-js';
import { Metadata } from '@grpc/grpc-js';
import { Controller } from '@nestjs/common';

import { ApiConfigService } from '../../../../src/shared/services/api-config.service';
import type { registerServiceRequest, registerServiceResponse } from '../../grpc/gen/ts/kraken';
import { KrakenServiceRegistryServiceControllerMethods, ServiceTypeEnum } from '../../grpc/gen/ts/kraken';
import { ServiceType } from '../enums/service-types.enum';
Expand All @@ -9,36 +11,58 @@ import ServiceRegistryService from '../services/service-registry.service';
@Controller()
@KrakenServiceRegistryServiceControllerMethods()
export class ServiceRegistryGrpcController {
constructor(private readonly serviceRegistryService: ServiceRegistryService) {}
constructor(
private readonly serviceRegistryService: ServiceRegistryService,
private readonly apiConfig: ApiConfigService,
) {}

async registerService(
{ heartbeatDurationInSec, type, url }: registerServiceRequest,
metadata?: Metadata,
call?: ServerUnaryCall<unknown, unknown>,
): Promise<registerServiceResponse> {
try {
const isValidServiceAuthToken = this.serviceRegistryService.isValidServiceAuthToken(
metadata?.getMap().token?.toString() ?? '',
);
const token = metadata?.getMap().token?.toString();

if (!token) {
throw new Error('Missing authentication token in metadata.');
}

const isValidServiceAuthToken = this.serviceRegistryService.isValidServiceAuthToken(token);

if (!isValidServiceAuthToken) {
throw new Error('invalid auth token.');
throw new Error('Invalid authentication token.');
}

const serviceTypeKey = ServiceTypeEnum[type];

if (!serviceTypeKey || !(serviceTypeKey in ServiceType)) {
throw new Error(`Invalid service type: ${type}`);
}

const { token } = await this.serviceRegistryService.register({
const { token: newToken } = await this.serviceRegistryService.register({
heartbeatDurationInSec,
url,
type: ServiceType[ServiceTypeEnum[type] as keyof typeof ServiceType],
type: ServiceType[serviceTypeKey as keyof typeof ServiceType],
});

const responseMetadata = new Metadata();
responseMetadata.add('token', newToken);

if (call) {
call.sendMetadata(responseMetadata);
}

return {
success: true,
message: '',
token,
};
} catch (error) {
const err = error as { message: string; stack: string };

return {
success: false,
message: JSON.stringify(error.message),
message: err.message || 'An unknown error occurred.',
...(this.apiConfig.isDevelopment && { details: err.stack }),
};
}
}
Expand Down
10 changes: 7 additions & 3 deletions src/modules/subscriptions/subscriptions.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,12 +71,16 @@ export class SubscriptionsController {
hmac.update(signedPayload, 'utf8');
const expectedSignature = hmac.digest('base64');

const isSignatureValid = signature === expectedSignature;

return isSignatureValid;
return signature === expectedSignature;
} catch (error) {
console.error('Error during signature verification:', error);

throw new UnauthorizedException('Invalid signature');
}
}

@Post('seedRedis')
seedRedis() {
return this.subscriptionService.seedRedis();
}
}
41 changes: 40 additions & 1 deletion src/modules/subscriptions/subscriptions.service.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,21 @@
import { BadRequestException, Injectable, NotFoundException } from '@nestjs/common';
import { Injectable, Logger, NotFoundException } from '@nestjs/common';

Check failure on line 1 in src/modules/subscriptions/subscriptions.service.ts

View workflow job for this annotation

GitHub Actions / ESLint

src/modules/subscriptions/subscriptions.service.ts#L1

Run autofix to sort these imports! (simple-import-sort/imports)
import { InjectRedis } from '@nestjs-modules/ioredis';
import axios from 'axios';
import Redis from 'ioredis';

import { ApiConfigService } from '../../../src/shared/services/api-config.service';
import { ConfigService } from '../config/config.service';
import { SubscriptionRepository } from './subscriptions.repository';
import { MoreThan } from 'typeorm';

@Injectable()
export class SubscriptionsService {
private readonly logger = new Logger(SubscriptionsService.name);

private readonly apiUrl = 'https://api.tryspeed.com/checkout-sessions';

constructor(
@InjectRedis() private readonly redis: Redis,
private readonly apiConfig: ApiConfigService,
private readonly subscriptionRepository: SubscriptionRepository,
private readonly configService: ConfigService,
Expand Down Expand Up @@ -77,6 +83,39 @@ export class SubscriptionsService {
endDate,
});

await this.redis.call('CF.ADD', 'SUBSCRIPTIONS', subscriber);

await this.subscriptionRepository.save(sub);
}

async seedRedis() {
try {
const now = Date.now();
const subscriptions = await this.subscriptionRepository.findAll({
where: {
endDate: MoreThan(now),
},
});

if (subscriptions.length === 0) {
this.logger.warn('No subscriptions found with endDate > now.');

return;
}

this.logger.log(`Found ${subscriptions.length} subscriptions to add to Redis.`);

const pipeline = this.redis.pipeline();

for (const subscription of subscriptions) {
pipeline.call('CF.ADD', 'SUBSCRIPTIONS', subscription.subscriber);
}

const results = await pipeline.exec();

this.logger.log(`Pipeline executed with ${results?.length} commands.`);
} catch (error) {
this.logger.error('An error occurred during seedRedis execution.', (error as { stack: string }).stack);
}
}
}
8 changes: 8 additions & 0 deletions src/shared/services/api-config.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { join } from 'node:path';
import { Injectable } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import type { TypeOrmModuleOptions } from '@nestjs/typeorm';
import type { RedisModuleOptions } from '@nestjs-modules/ioredis';
import { isNil } from 'lodash';

@Injectable()
Expand Down Expand Up @@ -90,6 +91,13 @@ export class ApiConfigService {
};
}

get redisConfig(): RedisModuleOptions {
return {
type: 'single',
url: this.getString('REDIS_URI'),
};
}

get grpcConfig() {
return {
port: this.getString('GRPC_PORT'),
Expand Down
6 changes: 6 additions & 0 deletions src/shared/shared.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import type { Provider } from '@nestjs/common';
import { Global, Module } from '@nestjs/common';
import { ConfigService } from '@nestjs/config';
import { TypeOrmModule } from '@nestjs/typeorm';
import { RedisModule } from '@nestjs-modules/ioredis';

import { ApiConfigService } from './services/api-config.service';

Expand All @@ -16,6 +17,11 @@ const providers: Provider[] = [ConfigService, ApiConfigService];
useFactory: (configService: ApiConfigService) => configService.mongoConfig,
inject: [ApiConfigService],
}),
RedisModule.forRootAsync({
imports: [SharedModule],
useFactory: (configService: ApiConfigService) => configService.redisConfig,
inject: [ApiConfigService],
}),
],
exports: [...providers],
})
Expand Down
Loading

0 comments on commit c3f64d0

Please sign in to comment.