From 9323893475c6825fdc17f86b6664c6dbd154f390 Mon Sep 17 00:00:00 2001 From: Henry Date: Mon, 6 Jan 2025 20:06:04 +0000 Subject: [PATCH] update bullmq --- docker/docker-compose.yml | 13 +++++ docker/worker/docker-compose.yml | 54 +++++++++++++++++++ packages/server/package.json | 2 +- packages/server/src/CachePool.ts | 33 +++++++----- packages/server/src/commands/base.ts | 2 + packages/server/src/commands/worker.ts | 2 +- packages/server/src/queue/PredictionQueue.ts | 3 -- packages/server/src/queue/QueueManager.ts | 28 ++++++---- .../server/src/queue/RedisEventPublisher.ts | 22 +++++++- .../server/src/queue/RedisEventSubscriber.ts | 22 +++++++- packages/server/src/queue/UpsertQueue.ts | 3 -- packages/server/src/utils/rateLimit.ts | 33 +++++++----- 12 files changed, 168 insertions(+), 49 deletions(-) create mode 100644 docker/worker/docker-compose.yml diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index c0a87b7..a9f8eba 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -34,6 +34,19 @@ services: - GLOBAL_AGENT_HTTPS_PROXY=${GLOBAL_AGENT_HTTPS_PROXY} - GLOBAL_AGENT_NO_PROXY=${GLOBAL_AGENT_NO_PROXY} - DISABLED_NODES=${DISABLED_NODES} + - MODE=${MODE} + - WORKER_CONCURRENCY=${WORKER_CONCURRENCY} + - QUEUE_NAME=${QUEUE_NAME} + - QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN} + - REDIS_HOST=${REDIS_HOST} + - REDIS_PORT=${REDIS_PORT} + - REDIS_PASSWORD=${REDIS_PASSWORD} + - REDIS_USERNAME=${REDIS_USERNAME} + - REDIS_TLS=${REDIS_TLS} + - REDIS_CERT=${REDIS_CERT} + - REDIS_KEY=${REDIS_KEY} + - REDIS_CA=${REDIS_CA} + - REDIS_URL=${REDIS_URL} ports: - '${PORT}:${PORT}' volumes: diff --git a/docker/worker/docker-compose.yml b/docker/worker/docker-compose.yml new file mode 100644 index 0000000..01ca208 --- /dev/null +++ b/docker/worker/docker-compose.yml @@ -0,0 +1,54 @@ +version: '3.1' + +services: + flowise: + image: flowiseai/flowise-bullmq + restart: always + environment: + - PORT=${PORT} + - CORS_ORIGINS=${CORS_ORIGINS} + - IFRAME_ORIGINS=${IFRAME_ORIGINS} + - FLOWISE_USERNAME=${FLOWISE_USERNAME} + - FLOWISE_PASSWORD=${FLOWISE_PASSWORD} + - FLOWISE_FILE_SIZE_LIMIT=${FLOWISE_FILE_SIZE_LIMIT} + - DEBUG=${DEBUG} + - DATABASE_PATH=${DATABASE_PATH} + - DATABASE_TYPE=${DATABASE_TYPE} + - DATABASE_PORT=${DATABASE_PORT} + - DATABASE_HOST=${DATABASE_HOST} + - DATABASE_NAME=${DATABASE_NAME} + - DATABASE_USER=${DATABASE_USER} + - DATABASE_PASSWORD=${DATABASE_PASSWORD} + - DATABASE_SSL=${DATABASE_SSL} + - DATABASE_SSL_KEY_BASE64=${DATABASE_SSL_KEY_BASE64} + - APIKEY_STORAGE_TYPE=${APIKEY_STORAGE_TYPE} + - APIKEY_PATH=${APIKEY_PATH} + - SECRETKEY_PATH=${SECRETKEY_PATH} + - FLOWISE_SECRETKEY_OVERWRITE=${FLOWISE_SECRETKEY_OVERWRITE} + - LOG_LEVEL=${LOG_LEVEL} + - LOG_PATH=${LOG_PATH} + - BLOB_STORAGE_PATH=${BLOB_STORAGE_PATH} + - DISABLE_FLOWISE_TELEMETRY=${DISABLE_FLOWISE_TELEMETRY} + - MODEL_LIST_CONFIG_JSON=${MODEL_LIST_CONFIG_JSON} + - GLOBAL_AGENT_HTTP_PROXY=${GLOBAL_AGENT_HTTP_PROXY} + - GLOBAL_AGENT_HTTPS_PROXY=${GLOBAL_AGENT_HTTPS_PROXY} + - GLOBAL_AGENT_NO_PROXY=${GLOBAL_AGENT_NO_PROXY} + - DISABLED_NODES=${DISABLED_NODES} + - MODE=${MODE} + - WORKER_CONCURRENCY=${WORKER_CONCURRENCY} + - QUEUE_NAME=${QUEUE_NAME} + - QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN} + - REDIS_HOST=${REDIS_HOST} + - REDIS_PORT=${REDIS_PORT} + - REDIS_PASSWORD=${REDIS_PASSWORD} + - REDIS_USERNAME=${REDIS_USERNAME} + - REDIS_TLS=${REDIS_TLS} + - REDIS_CERT=${REDIS_CERT} + - REDIS_KEY=${REDIS_KEY} + - REDIS_CA=${REDIS_CA} + - REDIS_URL=${REDIS_URL} + ports: + - '${PORT}:${PORT}' + volumes: + - ~/.flowise:/root/.flowise + entrypoint: /bin/sh -c "sleep 3; flowise worker" diff --git a/packages/server/package.json b/packages/server/package.json index 2e66a6c..8ffbc70 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -1,6 +1,6 @@ { "name": "flowise-bullmq", - "version": "2.2.3", + "version": "2.2.8", "description": "Flowiseai Server", "main": "dist/index", "types": "dist/index.d.ts", diff --git a/packages/server/src/CachePool.ts b/packages/server/src/CachePool.ts index 69dd7ff..36fd1a2 100644 --- a/packages/server/src/CachePool.ts +++ b/packages/server/src/CachePool.ts @@ -11,20 +11,25 @@ export class CachePool { constructor() { if (process.env.MODE === MODE.QUEUE) { - this.redisClient = new Redis({ - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - username: process.env.REDIS_USERNAME || undefined, - password: process.env.REDIS_PASSWORD || undefined, - tls: - process.env.REDIS_TLS === 'true' - ? { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - : undefined - }) + if (process.env.REDIS_URL) { + this.redisClient = new Redis(process.env.REDIS_URL) + } else { + this.redisClient = new Redis({ + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + tls: + process.env.REDIS_TLS === 'true' + ? { + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + : undefined + }) + } + console.log('CachePool connection:', this.redisClient) } } diff --git a/packages/server/src/commands/base.ts b/packages/server/src/commands/base.ts index 4123881..09d4db5 100644 --- a/packages/server/src/commands/base.ts +++ b/packages/server/src/commands/base.ts @@ -57,6 +57,7 @@ export abstract class BaseCommand extends Command { WORKER_CONCURRENCY: Flags.string(), QUEUE_NAME: Flags.string(), QUEUE_REDIS_EVENT_STREAM_MAX_LEN: Flags.string(), + REDIS_URL: Flags.string(), REDIS_HOST: Flags.string(), REDIS_PORT: Flags.string(), REDIS_USERNAME: Flags.string(), @@ -176,6 +177,7 @@ export abstract class BaseCommand extends Command { // Queue if (flags.MODE) process.env.MODE = flags.MODE + if (flags.REDIS_URL) process.env.REDIS_URL = flags.REDIS_URL if (flags.REDIS_HOST) process.env.REDIS_HOST = flags.REDIS_HOST if (flags.REDIS_PORT) process.env.REDIS_PORT = flags.REDIS_PORT if (flags.REDIS_USERNAME) process.env.REDIS_USERNAME = flags.REDIS_USERNAME diff --git a/packages/server/src/commands/worker.ts b/packages/server/src/commands/worker.ts index 583c253..89810b6 100644 --- a/packages/server/src/commands/worker.ts +++ b/packages/server/src/commands/worker.ts @@ -37,7 +37,7 @@ export default class Worker extends BaseCommand { logger.info(`Prediction Worker ${this.predictionWorkerId} created`) const predictionQueueName = predictionQueue.getQueueName() - const queueEvents = new QueueEvents(predictionQueueName) + const queueEvents = new QueueEvents(predictionQueueName, { connection: queueManager.getConnection() }) queueEvents.on('abort', async ({ id }: { id: string }) => { abortControllerPool.abort(id) diff --git a/packages/server/src/queue/PredictionQueue.ts b/packages/server/src/queue/PredictionQueue.ts index f7a7099..dea8bc4 100644 --- a/packages/server/src/queue/PredictionQueue.ts +++ b/packages/server/src/queue/PredictionQueue.ts @@ -1,4 +1,3 @@ -import dotenv from 'dotenv' import { DataSource } from 'typeorm' import { executeFlow } from '../utils/buildChatflow' import { IComponentNodes, IExecuteFlowParams } from '../Interface' @@ -9,8 +8,6 @@ import { AbortControllerPool } from '../AbortControllerPool' import { BaseQueue } from './BaseQueue' import { RedisOptions } from 'bullmq' -dotenv.config() - interface PredictionQueueOptions { appDataSource: DataSource telemetry: Telemetry diff --git a/packages/server/src/queue/QueueManager.ts b/packages/server/src/queue/QueueManager.ts index 9fdbd67..543ea14 100644 --- a/packages/server/src/queue/QueueManager.ts +++ b/packages/server/src/queue/QueueManager.ts @@ -1,4 +1,3 @@ -import dotenv from 'dotenv' import { BaseQueue } from './BaseQueue' import { PredictionQueue } from './PredictionQueue' import { UpsertQueue } from './UpsertQueue' @@ -12,8 +11,6 @@ import { createBullBoard } from 'bull-board' import { BullMQAdapter } from 'bull-board/bullMQAdapter' import { Express } from 'express' -dotenv.config() - const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue' type QUEUE_TYPE = 'prediction' | 'upsert' @@ -25,20 +22,27 @@ export class QueueManager { private bullBoardRouter?: Express private constructor() { + let tlsOpts = undefined + if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) { + tlsOpts = { + rejectUnauthorized: false + } + } else if (process.env.REDIS_TLS === 'true') { + tlsOpts = { + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + } this.connection = { + url: process.env.REDIS_URL || undefined, host: process.env.REDIS_HOST || 'localhost', port: parseInt(process.env.REDIS_PORT || '6379'), username: process.env.REDIS_USERNAME || undefined, password: process.env.REDIS_PASSWORD || undefined, - tls: - process.env.REDIS_TLS === 'true' - ? { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - : undefined + tls: tlsOpts } + console.log('QueueManager connection:', this.connection) } public static getInstance(): QueueManager { @@ -99,6 +103,7 @@ export class QueueManager { abortControllerPool }) this.registerQueue('prediction', predictionQueue) + console.log('predictionQueue', predictionQueue.getQueue()) const upsertionQueueName = `${QUEUE_NAME}-upsertion` const upsertionQueue = new UpsertQueue(upsertionQueueName, this.connection, { @@ -108,6 +113,7 @@ export class QueueManager { appDataSource }) this.registerQueue('upsert', upsertionQueue) + console.log('upsertionQueue', upsertionQueue.getQueue()) const bullboard = createBullBoard([new BullMQAdapter(predictionQueue.getQueue()), new BullMQAdapter(upsertionQueue.getQueue())]) this.bullBoardRouter = bullboard.router diff --git a/packages/server/src/queue/RedisEventPublisher.ts b/packages/server/src/queue/RedisEventPublisher.ts index 841dbc2..65f786c 100644 --- a/packages/server/src/queue/RedisEventPublisher.ts +++ b/packages/server/src/queue/RedisEventPublisher.ts @@ -5,11 +5,31 @@ export class RedisEventPublisher implements IServerSideEventStreamer { private redisPublisher: ReturnType constructor() { - this.redisPublisher = createClient() + if (process.env.REDIS_URL) { + this.redisPublisher = createClient({ + url: process.env.REDIS_URL + }) + } else { + this.redisPublisher = createClient({ + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + socket: { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + tls: process.env.REDIS_TLS === 'true', + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + }) + } + console.log('this.redisPublisher', this.redisPublisher) } async connect() { + console.log('RedisEventPublisher connecting......................') await this.redisPublisher.connect() + console.log('RedisEventPublisher connected') } streamCustomEvent(chatId: string, eventType: string, data: any) { diff --git a/packages/server/src/queue/RedisEventSubscriber.ts b/packages/server/src/queue/RedisEventSubscriber.ts index 5e7d0ad..5d9b16c 100644 --- a/packages/server/src/queue/RedisEventSubscriber.ts +++ b/packages/server/src/queue/RedisEventSubscriber.ts @@ -7,12 +7,32 @@ export class RedisEventSubscriber { private subscribedChannels: Set = new Set() constructor(sseStreamer: SSEStreamer) { - this.redisSubscriber = createClient() + if (process.env.REDIS_URL) { + this.redisSubscriber = createClient({ + url: process.env.REDIS_URL + }) + } else { + this.redisSubscriber = createClient({ + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + socket: { + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + tls: process.env.REDIS_TLS === 'true', + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + }) + } + console.log('this.redisSubscriber', this.redisSubscriber) this.sseStreamer = sseStreamer } async connect() { + console.log('redisSubscriber connecting......................') await this.redisSubscriber.connect() + console.log('redisSubscriber connected') } subscribe(channel: string) { diff --git a/packages/server/src/queue/UpsertQueue.ts b/packages/server/src/queue/UpsertQueue.ts index 4ef30fd..7d9e4b3 100644 --- a/packages/server/src/queue/UpsertQueue.ts +++ b/packages/server/src/queue/UpsertQueue.ts @@ -1,4 +1,3 @@ -import dotenv from 'dotenv' import { DataSource } from 'typeorm' import { IComponentNodes, IExecuteDocStoreUpsert, IExecuteFlowParams, IExecuteProcessLoader, IExecuteVectorStoreInsert } from '../Interface' import { Telemetry } from '../utils/telemetry' @@ -9,8 +8,6 @@ import { executeDocStoreUpsert, insertIntoVectorStore, processLoader } from '../ import { RedisOptions } from 'bullmq' import logger from '../utils/logger' -dotenv.config() - interface UpsertQueueOptions { appDataSource: DataSource telemetry: Telemetry diff --git a/packages/server/src/utils/rateLimit.ts b/packages/server/src/utils/rateLimit.ts index b13deac..8072c96 100644 --- a/packages/server/src/utils/rateLimit.ts +++ b/packages/server/src/utils/rateLimit.ts @@ -13,20 +13,25 @@ export class RateLimiterManager { constructor() { if (process.env.MODE === MODE.QUEUE) { - this.redisClient = new Redis({ - host: process.env.REDIS_HOST || 'localhost', - port: parseInt(process.env.REDIS_PORT || '6379'), - username: process.env.REDIS_USERNAME || undefined, - password: process.env.REDIS_PASSWORD || undefined, - tls: - process.env.REDIS_TLS === 'true' - ? { - cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, - key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, - ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined - } - : undefined - }) + if (process.env.REDIS_URL) { + this.redisClient = new Redis(process.env.REDIS_URL) + } else { + this.redisClient = new Redis({ + host: process.env.REDIS_HOST || 'localhost', + port: parseInt(process.env.REDIS_PORT || '6379'), + username: process.env.REDIS_USERNAME || undefined, + password: process.env.REDIS_PASSWORD || undefined, + tls: + process.env.REDIS_TLS === 'true' + ? { + cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined, + key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined, + ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined + } + : undefined + }) + } + console.log('RateLimiterManager connection:', this.redisClient) } }