Skip to content

Commit

Permalink
update bullmq
Browse files Browse the repository at this point in the history
  • Loading branch information
HenryHengZJ committed Jan 6, 2025
1 parent 0a5d38c commit 9323893
Show file tree
Hide file tree
Showing 12 changed files with 168 additions and 49 deletions.
13 changes: 13 additions & 0 deletions docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,19 @@ services:
- GLOBAL_AGENT_HTTPS_PROXY=${GLOBAL_AGENT_HTTPS_PROXY}
- GLOBAL_AGENT_NO_PROXY=${GLOBAL_AGENT_NO_PROXY}
- DISABLED_NODES=${DISABLED_NODES}
- MODE=${MODE}
- WORKER_CONCURRENCY=${WORKER_CONCURRENCY}
- QUEUE_NAME=${QUEUE_NAME}
- QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN}
- REDIS_HOST=${REDIS_HOST}
- REDIS_PORT=${REDIS_PORT}
- REDIS_PASSWORD=${REDIS_PASSWORD}
- REDIS_USERNAME=${REDIS_USERNAME}
- REDIS_TLS=${REDIS_TLS}
- REDIS_CERT=${REDIS_CERT}
- REDIS_KEY=${REDIS_KEY}
- REDIS_CA=${REDIS_CA}
- REDIS_URL=${REDIS_URL}
ports:
- '${PORT}:${PORT}'
volumes:
Expand Down
54 changes: 54 additions & 0 deletions docker/worker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
version: '3.1'

services:
flowise:
image: flowiseai/flowise-bullmq
restart: always
environment:
- PORT=${PORT}
- CORS_ORIGINS=${CORS_ORIGINS}
- IFRAME_ORIGINS=${IFRAME_ORIGINS}
- FLOWISE_USERNAME=${FLOWISE_USERNAME}
- FLOWISE_PASSWORD=${FLOWISE_PASSWORD}
- FLOWISE_FILE_SIZE_LIMIT=${FLOWISE_FILE_SIZE_LIMIT}
- DEBUG=${DEBUG}
- DATABASE_PATH=${DATABASE_PATH}
- DATABASE_TYPE=${DATABASE_TYPE}
- DATABASE_PORT=${DATABASE_PORT}
- DATABASE_HOST=${DATABASE_HOST}
- DATABASE_NAME=${DATABASE_NAME}
- DATABASE_USER=${DATABASE_USER}
- DATABASE_PASSWORD=${DATABASE_PASSWORD}
- DATABASE_SSL=${DATABASE_SSL}
- DATABASE_SSL_KEY_BASE64=${DATABASE_SSL_KEY_BASE64}
- APIKEY_STORAGE_TYPE=${APIKEY_STORAGE_TYPE}
- APIKEY_PATH=${APIKEY_PATH}
- SECRETKEY_PATH=${SECRETKEY_PATH}
- FLOWISE_SECRETKEY_OVERWRITE=${FLOWISE_SECRETKEY_OVERWRITE}
- LOG_LEVEL=${LOG_LEVEL}
- LOG_PATH=${LOG_PATH}
- BLOB_STORAGE_PATH=${BLOB_STORAGE_PATH}
- DISABLE_FLOWISE_TELEMETRY=${DISABLE_FLOWISE_TELEMETRY}
- MODEL_LIST_CONFIG_JSON=${MODEL_LIST_CONFIG_JSON}
- GLOBAL_AGENT_HTTP_PROXY=${GLOBAL_AGENT_HTTP_PROXY}
- GLOBAL_AGENT_HTTPS_PROXY=${GLOBAL_AGENT_HTTPS_PROXY}
- GLOBAL_AGENT_NO_PROXY=${GLOBAL_AGENT_NO_PROXY}
- DISABLED_NODES=${DISABLED_NODES}
- MODE=${MODE}
- WORKER_CONCURRENCY=${WORKER_CONCURRENCY}
- QUEUE_NAME=${QUEUE_NAME}
- QUEUE_REDIS_EVENT_STREAM_MAX_LEN=${QUEUE_REDIS_EVENT_STREAM_MAX_LEN}
- REDIS_HOST=${REDIS_HOST}
- REDIS_PORT=${REDIS_PORT}
- REDIS_PASSWORD=${REDIS_PASSWORD}
- REDIS_USERNAME=${REDIS_USERNAME}
- REDIS_TLS=${REDIS_TLS}
- REDIS_CERT=${REDIS_CERT}
- REDIS_KEY=${REDIS_KEY}
- REDIS_CA=${REDIS_CA}
- REDIS_URL=${REDIS_URL}
ports:
- '${PORT}:${PORT}'
volumes:
- ~/.flowise:/root/.flowise
entrypoint: /bin/sh -c "sleep 3; flowise worker"
2 changes: 1 addition & 1 deletion packages/server/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "flowise-bullmq",
"version": "2.2.3",
"version": "2.2.8",
"description": "Flowiseai Server",
"main": "dist/index",
"types": "dist/index.d.ts",
Expand Down
33 changes: 19 additions & 14 deletions packages/server/src/CachePool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,25 @@ export class CachePool {

constructor() {
if (process.env.MODE === MODE.QUEUE) {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
if (process.env.REDIS_URL) {
this.redisClient = new Redis(process.env.REDIS_URL)
} else {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
}
console.log('CachePool connection:', this.redisClient)

Check failure on line 32 in packages/server/src/CachePool.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement
}
}

Expand Down
2 changes: 2 additions & 0 deletions packages/server/src/commands/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export abstract class BaseCommand extends Command {
WORKER_CONCURRENCY: Flags.string(),
QUEUE_NAME: Flags.string(),
QUEUE_REDIS_EVENT_STREAM_MAX_LEN: Flags.string(),
REDIS_URL: Flags.string(),
REDIS_HOST: Flags.string(),
REDIS_PORT: Flags.string(),
REDIS_USERNAME: Flags.string(),
Expand Down Expand Up @@ -176,6 +177,7 @@ export abstract class BaseCommand extends Command {

// Queue
if (flags.MODE) process.env.MODE = flags.MODE
if (flags.REDIS_URL) process.env.REDIS_URL = flags.REDIS_URL
if (flags.REDIS_HOST) process.env.REDIS_HOST = flags.REDIS_HOST
if (flags.REDIS_PORT) process.env.REDIS_PORT = flags.REDIS_PORT
if (flags.REDIS_USERNAME) process.env.REDIS_USERNAME = flags.REDIS_USERNAME
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/commands/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ export default class Worker extends BaseCommand {
logger.info(`Prediction Worker ${this.predictionWorkerId} created`)

const predictionQueueName = predictionQueue.getQueueName()
const queueEvents = new QueueEvents(predictionQueueName)
const queueEvents = new QueueEvents(predictionQueueName, { connection: queueManager.getConnection() })

queueEvents.on<CustomListener>('abort', async ({ id }: { id: string }) => {
abortControllerPool.abort(id)
Expand Down
3 changes: 0 additions & 3 deletions packages/server/src/queue/PredictionQueue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import dotenv from 'dotenv'
import { DataSource } from 'typeorm'
import { executeFlow } from '../utils/buildChatflow'
import { IComponentNodes, IExecuteFlowParams } from '../Interface'
Expand All @@ -9,8 +8,6 @@ import { AbortControllerPool } from '../AbortControllerPool'
import { BaseQueue } from './BaseQueue'
import { RedisOptions } from 'bullmq'

dotenv.config()

interface PredictionQueueOptions {
appDataSource: DataSource
telemetry: Telemetry
Expand Down
28 changes: 17 additions & 11 deletions packages/server/src/queue/QueueManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import dotenv from 'dotenv'
import { BaseQueue } from './BaseQueue'
import { PredictionQueue } from './PredictionQueue'
import { UpsertQueue } from './UpsertQueue'
Expand All @@ -12,8 +11,6 @@ import { createBullBoard } from 'bull-board'
import { BullMQAdapter } from 'bull-board/bullMQAdapter'
import { Express } from 'express'

dotenv.config()

const QUEUE_NAME = process.env.QUEUE_NAME || 'flowise-queue'

type QUEUE_TYPE = 'prediction' | 'upsert'
Expand All @@ -25,20 +22,27 @@ export class QueueManager {
private bullBoardRouter?: Express

private constructor() {
let tlsOpts = undefined
if (process.env.REDIS_URL && process.env.REDIS_URL.startsWith('rediss://')) {
tlsOpts = {
rejectUnauthorized: false
}
} else if (process.env.REDIS_TLS === 'true') {
tlsOpts = {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
}
this.connection = {
url: process.env.REDIS_URL || undefined,
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
tls: tlsOpts
}
console.log('QueueManager connection:', this.connection)

Check failure on line 45 in packages/server/src/queue/QueueManager.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement
}

public static getInstance(): QueueManager {
Expand Down Expand Up @@ -99,6 +103,7 @@ export class QueueManager {
abortControllerPool
})
this.registerQueue('prediction', predictionQueue)
console.log('predictionQueue', predictionQueue.getQueue())

Check failure on line 106 in packages/server/src/queue/QueueManager.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement

const upsertionQueueName = `${QUEUE_NAME}-upsertion`
const upsertionQueue = new UpsertQueue(upsertionQueueName, this.connection, {
Expand All @@ -108,6 +113,7 @@ export class QueueManager {
appDataSource
})
this.registerQueue('upsert', upsertionQueue)
console.log('upsertionQueue', upsertionQueue.getQueue())

Check failure on line 116 in packages/server/src/queue/QueueManager.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement

const bullboard = createBullBoard([new BullMQAdapter(predictionQueue.getQueue()), new BullMQAdapter(upsertionQueue.getQueue())])
this.bullBoardRouter = bullboard.router
Expand Down
22 changes: 21 additions & 1 deletion packages/server/src/queue/RedisEventPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,31 @@ export class RedisEventPublisher implements IServerSideEventStreamer {
private redisPublisher: ReturnType<typeof createClient>

constructor() {
this.redisPublisher = createClient()
if (process.env.REDIS_URL) {
this.redisPublisher = createClient({
url: process.env.REDIS_URL
})
} else {
this.redisPublisher = createClient({
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
socket: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
tls: process.env.REDIS_TLS === 'true',
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
})
}
console.log('this.redisPublisher', this.redisPublisher)

Check failure on line 26 in packages/server/src/queue/RedisEventPublisher.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement
}

async connect() {
console.log('RedisEventPublisher connecting......................')

Check failure on line 30 in packages/server/src/queue/RedisEventPublisher.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement
await this.redisPublisher.connect()
console.log('RedisEventPublisher connected')

Check failure on line 32 in packages/server/src/queue/RedisEventPublisher.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement
}

streamCustomEvent(chatId: string, eventType: string, data: any) {
Expand Down
22 changes: 21 additions & 1 deletion packages/server/src/queue/RedisEventSubscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,32 @@ export class RedisEventSubscriber {
private subscribedChannels: Set<string> = new Set()

constructor(sseStreamer: SSEStreamer) {
this.redisSubscriber = createClient()
if (process.env.REDIS_URL) {
this.redisSubscriber = createClient({
url: process.env.REDIS_URL
})
} else {
this.redisSubscriber = createClient({
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
socket: {
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
tls: process.env.REDIS_TLS === 'true',
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
})
}
console.log('this.redisSubscriber', this.redisSubscriber)

Check failure on line 28 in packages/server/src/queue/RedisEventSubscriber.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement
this.sseStreamer = sseStreamer
}

async connect() {
console.log('redisSubscriber connecting......................')

Check failure on line 33 in packages/server/src/queue/RedisEventSubscriber.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement
await this.redisSubscriber.connect()
console.log('redisSubscriber connected')

Check failure on line 35 in packages/server/src/queue/RedisEventSubscriber.ts

View workflow job for this annotation

GitHub Actions / build (ubuntu-latest, 18.15.0)

Unexpected console statement
}

subscribe(channel: string) {
Expand Down
3 changes: 0 additions & 3 deletions packages/server/src/queue/UpsertQueue.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import dotenv from 'dotenv'
import { DataSource } from 'typeorm'
import { IComponentNodes, IExecuteDocStoreUpsert, IExecuteFlowParams, IExecuteProcessLoader, IExecuteVectorStoreInsert } from '../Interface'
import { Telemetry } from '../utils/telemetry'
Expand All @@ -9,8 +8,6 @@ import { executeDocStoreUpsert, insertIntoVectorStore, processLoader } from '../
import { RedisOptions } from 'bullmq'
import logger from '../utils/logger'

dotenv.config()

interface UpsertQueueOptions {
appDataSource: DataSource
telemetry: Telemetry
Expand Down
33 changes: 19 additions & 14 deletions packages/server/src/utils/rateLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,25 @@ export class RateLimiterManager {

constructor() {
if (process.env.MODE === MODE.QUEUE) {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
if (process.env.REDIS_URL) {
this.redisClient = new Redis(process.env.REDIS_URL)
} else {
this.redisClient = new Redis({
host: process.env.REDIS_HOST || 'localhost',
port: parseInt(process.env.REDIS_PORT || '6379'),
username: process.env.REDIS_USERNAME || undefined,
password: process.env.REDIS_PASSWORD || undefined,
tls:
process.env.REDIS_TLS === 'true'
? {
cert: process.env.REDIS_CERT ? Buffer.from(process.env.REDIS_CERT, 'base64') : undefined,
key: process.env.REDIS_KEY ? Buffer.from(process.env.REDIS_KEY, 'base64') : undefined,
ca: process.env.REDIS_CA ? Buffer.from(process.env.REDIS_CA, 'base64') : undefined
}
: undefined
})
}
console.log('RateLimiterManager connection:', this.redisClient)
}
}

Expand Down

0 comments on commit 9323893

Please sign in to comment.