The library provides a worker-queue implementation on top of Java and a database.
There are several reasons to use it:
- You need a simple, efficient and flexible task processing tool that supports delayed job execution.
- You already have a database and don't want to introduce additional tools into your infrastructure (for example Kafka, RabbitMQ, ...).
- Your load is fairly small. This worker-queue mechanism can handle more than 1000 rps on a single database and table. Moreover, you can shard your database to get horizontal scalability. However, we cannot guarantee that it will be easy to autoscale or to handle more than 1000 rps.
- You require strong guarantees for task delivery or processing. The library offers at-least-once delivery semantics.
- You have a task that you want to process later.
- You tell QueueProducer to schedule the task.
- QueueProducer chooses a database shard through QueueShardRouter.
- QueueProducer converts the task payload to a string representation through TaskPayloadTransformer.
- QueueProducer inserts the task into the database through QueueDao.
- ... the task is selected from the database at the specified time ...
- The task payload is converted to a typed representation through TaskPayloadTransformer.
- The task is passed to the QueueConsumer instance in order to be processed.
- You process the task and return the processing result.
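The steps above can be pictured with a small in-memory sketch. It does not use the library: `FlowSketch`, `StoredTask`, `InMemoryShard` and both static methods are hypothetical stand-ins for QueueProducer, QueueShardRouter, TaskPayloadTransformer, QueueDao and QueueConsumer, and a list replaces the database table.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical stand-ins for the library's components; every name here is illustrative.
class FlowSketch {

    /** One stored row: the payload is kept as a string, like the `task` column. */
    static final class StoredTask {
        final String payload;
        final Instant processTime;

        StoredTask(String payload, Instant processTime) {
            this.payload = payload;
            this.processTime = processTime;
        }
    }

    /** In-memory replacement for one database shard. */
    static final class InMemoryShard {
        private final List<StoredTask> rows = new ArrayList<>();

        void insert(StoredTask task) {
            rows.add(task);
        }

        /**
         * Returns the first task whose process time has come.
         * The sketch removes the row right away for brevity; a store that aims at
         * at-least-once processing would keep it until processing succeeds.
         */
        Optional<StoredTask> pick(Instant now) {
            for (int i = 0; i < rows.size(); i++) {
                if (!rows.get(i).processTime.isAfter(now)) {
                    return Optional.of(rows.remove(i));
                }
            }
            return Optional.empty();
        }
    }

    /** Producer side: choose a shard, serialize the payload, insert the row. */
    static <T> void enqueue(List<InMemoryShard> shards, Function<T, String> serializer,
                            T payload, Duration delay, Instant now) {
        // Trivial routing rule: hash of the payload.
        InMemoryShard shard = shards.get(Math.floorMod(payload.hashCode(), shards.size()));
        String serialized = serializer.apply(payload);
        shard.insert(new StoredTask(serialized, now.plus(delay)));
    }

    /** Consumer side: pick a due task, deserialize it, pass it to the handler. */
    static <T> boolean processOne(InMemoryShard shard, Function<String, T> deserializer,
                                  Function<T, Boolean> handler, Instant now) {
        Optional<StoredTask> picked = shard.pick(now);
        if (!picked.isPresent()) {
            return false; // nothing is due yet
        }
        return handler.apply(deserializer.apply(picked.get().payload));
    }
}
```

A task enqueued with a 10 second delay is not returned by `processOne` until the supplied `now` reaches the scheduled time, which mirrors the delayed execution described above.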
- Persistent worker queue.
- Support for PostgreSQL 9.5 or higher.
- Storing queue tasks in separate tables or in the same table (QueueLocation).
- Storing queue tasks in separate databases for horizontal scaling (QueueShardRouter).
- Delayed task execution.
- At-least-once task processing semantics.
- Several retry strategies in case of a task processing error (TaskRetryType).
- Task event listeners (TaskLifecycleListener, ThreadLifecycleListener).
- Strongly typed API for task processing and enqueuing (TaskPayloadTransformer).
- Several task processing modes (ProcessingMode).
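At-least-once semantics means a task may occasionally be delivered more than once, so task handlers are usually written to be idempotent. The following is a minimal sketch of one way to do that, not part of the library: `IdempotentHandler` and the idempotency key are hypothetical, and the in-memory set stands in for a deduplication table that a real service would keep in its database.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical wrapper; the library does not ship this class.
class IdempotentHandler<T> {
    // Keys of tasks that were already handled (in-memory stand-in for a database table).
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();
    private final Consumer<T> delegate;

    IdempotentHandler(Consumer<T> delegate) {
        this.delegate = delegate;
    }

    /** Runs the delegate only the first time a key is seen; returns true if it ran. */
    boolean handle(String idempotencyKey, T payload) {
        if (!processedKeys.add(idempotencyKey)) {
            return false; // duplicate delivery, skip the side effect
        }
        try {
            delegate.accept(payload);
            return true;
        } catch (RuntimeException e) {
            // Forget the key so that a later retry can run the delegate again.
            processedKeys.remove(idempotencyKey);
            throw e;
        }
    }
}
```

With this wrapper, a second delivery of the same key returns `false` without repeating the side effect, while a failed attempt stays retryable.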
The library supports only PostgreSQL as a backing database. However, the library architecture makes it easy to add other databases that have the "skip locked" feature.
Furthermore, the library requires Spring Framework (spring-jdbc and spring-tx) for interacting with the database. Spring-context is used only for alternative configuration and can be safely excluded from runtime dependencies.
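To show why "skip locked" matters, here is a hedged sketch of picking one due task with plain JDBC. The query is illustrative and is not the library's actual SQL; it assumes the `queue_tasks` table from the setup section of this document, and `SkipLockedPick` is a hypothetical class name.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.OptionalLong;

// Hypothetical helper; it only illustrates the PostgreSQL feature.
class SkipLockedPick {

    // Rows locked by other workers are skipped instead of waited for,
    // so concurrent workers pick different tasks without blocking each other.
    static final String PICK_SQL =
            "SELECT id FROM queue_tasks "
          + "WHERE queue_name = ? AND process_time <= now() "
          + "ORDER BY process_time, id DESC "
          + "LIMIT 1 FOR UPDATE SKIP LOCKED";

    /**
     * Returns the id of one due task, if any.
     * Call it inside an open transaction (autocommit off): the row lock is
     * held until that transaction commits or rolls back.
     */
    static OptionalLong pickTaskId(Connection connection, String queueName) throws SQLException {
        try (PreparedStatement ps = connection.prepareStatement(PICK_SQL)) {
            ps.setString(1, queueName);
            try (ResultSet rs = ps.executeQuery()) {
                return rs.next() ? OptionalLong.of(rs.getLong("id")) : OptionalLong.empty();
            }
        }
    }
}
```

`FOR UPDATE SKIP LOCKED` is available in PostgreSQL starting with version 9.5, which matches the minimum version listed above.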
The project uses Semantic Versioning.
The library is available in Bintray's JCenter repository:
<dependency>
<groupId>ru.yandex.money.common</groupId>
<artifactId>db-queue</artifactId>
<version>3.0.0</version>
</dependency>
- Create a table (with an index) where tasks will be stored.
CREATE TABLE queue_tasks (
id BIGSERIAL PRIMARY KEY,
queue_name VARCHAR(128) NOT NULL,
task TEXT,
create_time TIMESTAMP WITH TIME ZONE DEFAULT now(),
process_time TIMESTAMP WITH TIME ZONE DEFAULT now(),
attempt INTEGER DEFAULT 0,
actor VARCHAR(128),
log_timestamp VARCHAR(128)
);
CREATE INDEX queue_tasks_name_time_desc_idx
ON queue_tasks (queue_name, process_time, id DESC);
- Specify a queue configuration through QueueConfig instance (or use QueueConfigsReader).
- Choose a name for the queue.
- Specify betweenTaskTimeout and noTaskTimeout settings.
- Use manual or spring-auto configuration.
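The two timeout settings can be read as the pauses of a polling worker. The sketch below is a simplified model under assumptions: `betweenTaskTimeout` is taken to be the pause after a processed task, `noTaskTimeout` is assumed to be the pause after a poll that found nothing, and `PollingLoopSketch` is a hypothetical class that records the pauses instead of sleeping.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Hypothetical model of a single worker thread; not the library's implementation.
class PollingLoopSketch {

    /**
     * Polls the queue `polls` times and returns the pause taken after each poll.
     * A real worker would sleep for each returned duration.
     */
    static List<Duration> simulate(Queue<Runnable> tasks, Duration betweenTaskTimeout,
                                   Duration noTaskTimeout, int polls) {
        List<Duration> pauses = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            Runnable task = tasks.poll();
            if (task == null) {
                // Nothing to do: back off for the (assumed) no-task timeout.
                pauses.add(noTaskTimeout);
            } else {
                task.run();
                // A task was processed: pause for the between-task timeout.
                pauses.add(betweenTaskTimeout);
            }
        }
        return pauses;
    }
}
```

In this model one thread processes at most one task per `betweenTaskTimeout`, so a smaller value raises throughput, while a larger `noTaskTimeout` reduces polling load on an idle queue.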
Manual configuration may be used in cases where you don't have a Spring context. It offers more flexibility than Spring configuration. Example - example.ManualConfiguration.
The main steps to create a manual configuration:
- Create a QueueDao instance for each shard.
- Implement the QueueShardRouter interface or use SingleShardRouter.
- Implement the TaskPayloadTransformer interface or use NoopPayloadTransformer.
- Implement the QueueProducer interface or use TransactionalProducer.
- Implement the QueueConsumer interface.
- Create a QueueRegistry and register the QueueConsumer and QueueProducer instances.
- Create a QueueExecutionPool and start it.
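Of the steps above, shard routing is the one most likely to need custom logic. The following is a sketch of a key-based routing rule only: `ShardRouter` is a hypothetical interface and does not reproduce the library's QueueShardRouter, whose exact methods should be taken from the library sources or example.ManualConfiguration.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical routing rule; the interface below is NOT the library's QueueShardRouter.
class HashShardRouterSketch {

    /** Maps a task payload to the identifier of the shard that should store it. */
    interface ShardRouter<T> {
        String resolveShard(T payload);
    }

    /**
     * Routes by a key extracted from the payload, so that tasks with the same key
     * (for example the same user) always land on the same shard.
     */
    static <T> ShardRouter<T> byKey(List<String> shardIds, Function<T, ?> keyExtractor) {
        if (shardIds.isEmpty()) {
            throw new IllegalArgumentException("at least one shard is required");
        }
        // floorMod keeps the index non-negative even for negative hash codes.
        return payload -> shardIds.get(
                Math.floorMod(keyExtractor.apply(payload).hashCode(), shardIds.size()));
    }
}
```

Whatever rule is used on the producer side, consumers still have to poll every shard, because a task may have been stored on any of them.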
Spring configuration is more lightweight than manual configuration. Example - example.SpringAutoConfiguration.
Spring configuration can be divided into two parts:
- Base configuration. You may put it in your common code - example.SpringAutoConfiguration.Base.
- Client configuration, which specifies how your queues will work - example.SpringAutoConfiguration.Client.
Base configuration includes several beans:
- SpringQueueConfigContainer - Provides settings for all queues in your Spring context.
- SpringQueueCollector - Collects beans related to Spring configuration.
- SpringQueueInitializer - Wires queue beans to each other and starts queues.
In the client configuration you must use classes with the Spring prefix.
- internal
Internal classes. Not for public use.
Backward compatibility for classes in that package may be broken in any release.
You should provide implementations for the interfaces in that package. The package contains classes that are involved in processing or enqueueing tasks.
Default implementations of the API interfaces. They allow easy configuration in common use cases.
Queue settings.
Additional classes for queue management and statistics retrieval. In common use cases you don't need to use those classes.
Registration and running of queues.
Classes related to Spring configuration.
Default implementation for Spring configuration. It allows easy configuration in common use cases.
You should always analyze your database workload before applying these recommendations. These settings depend heavily on your hardware and load.
You need to set a low fill-factor for the table in order to let the database put row updates on the same page. In that case the database needs fewer random page writes. This technique also prevents fragmentation, so selects become more stable. The same rules apply to indexes. You can safely set the fill-factor for tables and indexes to 70%.
You need to make autovacuum more aggressive in order to eliminate dead tuples. Dead tuples lead to excessive page reads because they occupy space that could be reused by active tuples. Autovacuum can be configured in many ways; for example, you can set the scale-factor to 1%.
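As a hedged illustration, the recommendations above map to PostgreSQL storage parameters that can be applied over JDBC. `QueueTableTuning` is a hypothetical helper, the table and index names are passed in by the caller, and the values 70 and 0.01 simply restate the 70% and 1% figures from the text; measure your own workload before using them.

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper that only builds and runs standard PostgreSQL DDL.
class QueueTableTuning {

    /** Builds the DDL; pass trusted identifiers only, they are concatenated as-is. */
    static List<String> statements(String table, String index) {
        return Arrays.asList(
                // Leave 30% of each table page free for in-page row updates.
                "ALTER TABLE " + table + " SET (fillfactor = 70)",
                // Same idea for the index pages.
                "ALTER INDEX " + index + " SET (fillfactor = 70)",
                // Trigger autovacuum once roughly 1% of the rows are dead.
                "ALTER TABLE " + table + " SET (autovacuum_vacuum_scale_factor = 0.01)");
    }

    /** Executes the statements one by one on the given connection. */
    static void apply(Connection connection, String table, String index) throws SQLException {
        try (Statement st = connection.createStatement()) {
            for (String sql : statements(table, index)) {
                st.execute(sql);
            }
        }
    }
}
```

For the schema shown earlier the call would be `QueueTableTuning.apply(connection, "queue_tasks", "queue_tasks_name_time_desc_idx")`. A changed fill-factor affects only pages written afterwards; existing pages keep their layout until the table or index is rewritten.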
- Retry strategies cannot be defined by a user
In some cases a client may want to use different retry strategies. For example, do the first retry almost immediately and then use the standard behaviour. This strategy can be useful to deal with temporary glitches in the network or database. It is hard to predict what a client needs, so this is a desirable feature.
- Uneven load balancing
One of the hosts can process several tasks in a row very quickly while other hosts are sleeping.
- Max throughput is limited by "between task timeout"
A thread sleeps for the "between task timeout" regardless of the task processing result, although it could pick the next task right after a successful result and process it.
- No support for blue-green deployment
There is no support for blue-green deployment because a task is not bound to a host or to a group of hosts.
- No support for failover
QueueProducer can fail when scheduling a task. We could detect that the failure is caused by the database and try to insert the task on the next shard.
- Hard to write tests
Task processing is asynchronous. Therefore, it is hard to write tests, because you must always keep that in mind and write code accordingly. We could implement some kind of synchronous mode for tests.
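Until such a mode exists, a common way to test asynchronous processing is to block the test thread on a latch until the handler has run. The sketch below shows that pattern in isolation, under the assumption that the handler runs on another thread: `AsyncTestSketch` is hypothetical, and the single-thread executor merely stands in for a queue worker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.UnaryOperator;

// Hypothetical test helper; the executor below stands in for a queue worker thread.
class AsyncTestSketch {

    /** Runs the handler on a background thread and waits until it has finished. */
    static <T> T processAndAwait(T payload, UnaryOperator<T> handler, long timeoutMillis) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<T> result = new AtomicReference<>();
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            worker.execute(() -> {
                try {
                    result.set(handler.apply(payload));
                } finally {
                    done.countDown(); // release the waiting test even if the handler failed
                }
            });
            if (!done.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("task was not processed within " + timeoutMillis + " ms");
            }
            return result.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the task", e);
        } finally {
            worker.shutdownNow();
        }
    }
}
```

In a real test the latch would be counted down inside your own consumer implementation, and the test would enqueue a task and then await the latch with a timeout.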