37 changes: 30 additions & 7 deletions README.md
@@ -14,8 +14,9 @@ Solid Queue can be used with SQL databases such as MySQL, PostgreSQL, or SQLite,
- [Dashboard UI Setup](#dashboard-ui-setup)
- [Incremental adoption](#incremental-adoption)
- [High performance requirements](#high-performance-requirements)
- [Workers, dispatchers, and scheduler](#workers-dispatchers-and-scheduler)
- [Fork vs. async mode](#fork-vs-async-mode)
- [Configuration](#configuration)
- [Workers, dispatchers, and scheduler](#workers-dispatchers-and-scheduler)
- [Queue order and priorities](#queue-order-and-priorities)
- [Queues specification and performance](#queues-specification-and-performance)
- [Threads, processes, and signals](#threads-processes-and-signals)
@@ -179,9 +180,7 @@ end

Solid Queue was designed for the highest throughput when used with MySQL 8+, MariaDB 10.6+, or PostgreSQL 9.5+, as they support `FOR UPDATE SKIP LOCKED`. You can use it with older versions, but in that case, you might run into lock waits if you run multiple workers for the same queue. You can also use it with SQLite on smaller applications.

## Configuration

### Workers, dispatchers, and scheduler
## Workers, dispatchers, and scheduler

We have several types of actors in Solid Queue:

@@ -190,7 +189,17 @@ We have several types of actors in Solid Queue:
- The _scheduler_ manages [recurring tasks](#recurring-tasks), enqueuing jobs for them when they're due.
- The _supervisor_ runs workers and dispatchers according to the configuration, controls their heartbeats, and stops and starts them when needed.

Solid Queue's supervisor will fork a separate process for each supervised worker/dispatcher/scheduler.
### Fork vs. async mode

By default, Solid Queue runs in `fork` mode. This means the supervisor will fork a separate process for each supervised worker/dispatcher/scheduler. This provides the best isolation and performance, but can use more memory. Alternatively, you can run all workers, dispatchers, and schedulers in the same process as the supervisor, in different threads, using `async` mode. You can choose this mode by running `bin/jobs` as:

```
bin/jobs --mode async
```

Alternatively, set the environment variable `SOLID_QUEUE_SUPERVISOR_MODE` to `async`. If you use `async` mode, the `processes` option in the configuration described below will be ignored.
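
To make the difference between the two modes concrete, here is a minimal, self-contained sketch in plain Ruby. It is not Solid Queue's implementation: `run_units` is a hypothetical helper that only shows where each supervised unit ends up running, either in a forked child process or in a thread of the current process.

```ruby
# Illustrative sketch only; `run_units` is a hypothetical helper, not part of Solid Queue.
# It returns the PID under which each supervised unit ran.
def run_units(mode, count)
  case mode
  when :fork
    # One child process per unit; each child reports its own PID through a pipe.
    count.times.map do
      reader, writer = IO.pipe
      pid = fork do
        reader.close
        writer.puts Process.pid
        writer.close
        exit!(0) # leave the child without running the parent's at_exit hooks
      end
      writer.close
      reported = reader.gets.to_i
      reader.close
      Process.wait(pid)
      reported
    end
  when :async
    # One thread per unit, all inside the current process.
    count.times.map { Thread.new { Process.pid } }.map(&:value)
  else
    raise ArgumentError, "unknown mode: #{mode.inspect}"
  end
end

fork_pids  = run_units(:fork, 2)  # two PIDs, both different from the parent's
async_pids = run_units(:async, 2) # the parent's PID, twice
```

In the forked case each unit has its own memory space, which is where the extra isolation and the extra memory usage both come from.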

## Configuration

By default, Solid Queue will try to find your configuration under `config/queue.yml`, but you can set a different path using the environment variable `SOLID_QUEUE_CONFIG` or by using the `-c/--config_file` option with `bin/jobs`, like this:

@@ -254,7 +263,7 @@ Here's an overview of the different options:

- `threads`: this is the max size of the thread pool that each worker will have to run jobs. Each worker will fetch at most this number of jobs from its queue(s) and will post them to the thread pool to be run. By default, this is `3`. Only workers have this setting.
It is recommended to set this value less than or equal to the queue database's connection pool size minus 2, as each worker thread uses one connection, and two additional connections are reserved for polling and heartbeat.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting.
- `processes`: this is the number of worker processes that will be forked by the supervisor with the settings given. By default, this is `1`, just a single process. This setting is useful if you want to dedicate more than one CPU core to a queue or queues with the same configuration. Only workers have this setting. **Note**: this option will be ignored if [running in `async` mode](#fork-vs-async-mode).
- `concurrency_maintenance`: whether the dispatcher will perform the concurrency maintenance work. This is `true` by default, and it's useful if you don't use any [concurrency controls](#concurrency-controls) and want to disable it or if you run multiple dispatchers and want some of them to just dispatch jobs without doing anything else.
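
The sizing guidance for `threads` above can be written as simple arithmetic. The sketch below assumes only what the option description states (one connection per worker thread, plus two reserved for polling and heartbeat); the constant and method names are hypothetical.

```ruby
# Illustrative arithmetic only; these names are hypothetical, not Solid Queue API.
# Two connections are set aside for polling and heartbeat, as described above.
RESERVED_CONNECTIONS = 2

# Largest `threads` value that fits within a given connection pool size.
def max_threads_for(pool_size)
  [pool_size - RESERVED_CONNECTIONS, 0].max
end

# Smallest connection pool size that accommodates a given `threads` value.
def pool_size_for(threads)
  threads + RESERVED_CONNECTIONS
end

max_threads_for(5) # => 3, the default `threads` value fits a pool of 5
pool_size_for(3)   # => 5
```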


@@ -334,7 +343,7 @@ queues: back*

Workers in Solid Queue use a thread pool to run work in multiple threads, configurable via the `threads` parameter above. Besides this, parallelism can be achieved via multiple processes on one machine (configurable via different workers or the `processes` parameter above) or by horizontal scaling.

The supervisor is in charge of managing these processes, and it responds to the following signals:
The supervisor is in charge of managing these processes, and it responds to the following signals when running in its own process via `bin/jobs` or with [the Puma plugin](#puma-plugin) with the default `fork` mode:
- `TERM`, `INT`: starts graceful termination. The supervisor will send a `TERM` signal to its supervised processes, and it'll wait up to `SolidQueue.shutdown_timeout` time until they're done. If any supervised processes are still around by then, it'll send a `QUIT` signal to them to indicate they must exit.
- `QUIT`: starts immediate termination. The supervisor will send a `QUIT` signal to its supervised processes, causing them to exit immediately.
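
The `TERM` then `QUIT` escalation described above follows a common pattern: ask politely, wait up to a timeout, then force the exit. Here is a minimal sketch of that pattern in plain Ruby for a single child process. It is not the supervisor's actual code; `stop_child` and `spawn_child` are hypothetical helpers, and the timeout stands in for `SolidQueue.shutdown_timeout`.

```ruby
# Illustrative sketch only; `stop_child` and `spawn_child` are hypothetical helpers.

# Sends TERM, waits up to `timeout` seconds for the child to exit,
# then escalates to QUIT if it is still running.
def stop_child(pid, timeout:)
  Process.kill(:TERM, pid)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout

  while Process.clock_gettime(Process::CLOCK_MONOTONIC) < deadline
    return :graceful if Process.waitpid(pid, Process::WNOHANG)
    sleep 0.05
  end

  Process.kill(:QUIT, pid)
  Process.waitpid(pid)
  :forced
end

# Forks a child that sleeps forever. It exits on TERM unless told to ignore it,
# and always exits on QUIT.
def spawn_child(ignore_term:)
  reader, writer = IO.pipe
  pid = fork do
    reader.close
    trap(:QUIT) { exit!(1) }
    trap(:TERM, ignore_term ? "IGNORE" : proc { exit!(0) })
    writer.puts "ready" # tell the parent the traps are installed
    writer.close
    sleep
  end
  writer.close
  reader.gets # wait until the child is ready to receive signals
  reader.close
  pid
end

cooperative = stop_child(spawn_child(ignore_term: false), timeout: 2)
stubborn    = stop_child(spawn_child(ignore_term: true), timeout: 0.5)
```

Here `cooperative` ends up as `:graceful` because the child exits on `TERM`, while `stubborn` ends up as `:forced` because the child ignores `TERM` and is only stopped by `QUIT` once the timeout elapses.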

@@ -603,6 +612,20 @@ that you set in production only. This is what Rails 8's default Puma config look

**Note**: phased restarts are not supported currently because the plugin requires [app preloading](https://github.com/puma/puma?tab=readme-ov-file#cluster-mode) to work.

### Running as a fork or asynchronously

By default, the Puma plugin will fork additional processes for each worker and dispatcher so that they run in different processes. This provides the best isolation and performance, but can use more memory.

Alternatively, workers and dispatchers can be run within the same Puma process(es). To do so, configure the plugin as:

```ruby
plugin :solid_queue
solid_queue_mode :async
```

Note that in this case, the `processes` configuration option will be ignored. See also [Fork vs. async mode](#fork-vs-async-mode).
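
As a rough illustration of what "ignored" means here, the sketch below resolves how many worker instances a single worker entry yields in each mode. It mirrors the behaviour described in this README rather than quoting the library; `DEFAULT_PROCESSES` and `worker_count` are hypothetical names.

```ruby
# Illustrative sketch only; these names are hypothetical, not Solid Queue API.
DEFAULT_PROCESSES = 1 # the documented default for `processes`

# Number of worker instances started for one worker entry in the configuration.
def worker_count(mode, worker_options)
  if mode.to_sym == :fork
    worker_options.fetch(:processes, DEFAULT_PROCESSES)
  else
    1 # async mode: `processes` is ignored, one worker per entry
  end
end

worker_count(:fork,  { queues: "*", processes: 3 }) # => 3
worker_count(:async, { queues: "*", processes: 3 }) # => 1
```

If you rely on `processes` for parallelism today, switching to `async` mode reduces each worker entry to a single worker, so `threads` becomes the main knob for concurrency.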


## Jobs and transactional integrity
:warning: Having your jobs in the same ACID-compliant database as your application data enables a powerful yet sharp tool: taking advantage of transactional integrity to ensure some action in your app is not committed unless your job is also committed and vice versa, and ensuring that your job won't be enqueued until the transaction within which you're enqueuing it is committed. This can be very powerful and useful, but it can also backfire if you base some of your logic on this behaviour and later move to another Active Job backend, or simply move Solid Queue to its own database, and the behaviour suddenly changes under you. Because this can be quite tricky and many people shouldn't need to worry about it, by default Solid Queue is configured in a different database from the main app.

88 changes: 67 additions & 21 deletions lib/puma/plugin/solid_queue.rb
@@ -1,41 +1,87 @@
require "puma/plugin"

module Puma
class DSL
def solid_queue_mode(mode = :fork)
@options[:solid_queue_mode] = mode.to_sym
end
end
end

Puma::Plugin.create do
attr_reader :puma_pid, :solid_queue_pid, :log_writer, :solid_queue_supervisor

def start(launcher)
@log_writer = launcher.log_writer
@puma_pid = $$

in_background do
monitor_solid_queue
if launcher.options[:solid_queue_mode] == :async
start_async(launcher)
else
start_forked(launcher)
end
end

if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
private
def start_forked(launcher)
in_background do
monitor_solid_queue
end

if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start(mode: :fork)
end
end

launcher.events.on_stopped { stop_solid_queue_fork }
launcher.events.on_restart { stop_solid_queue_fork }
else
launcher.events.after_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
start_solid_queue(mode: :fork)
end
end

launcher.events.after_stopped { stop_solid_queue_fork }
launcher.events.before_restart { stop_solid_queue_fork }
end
end

launcher.events.on_stopped { stop_solid_queue }
launcher.events.on_restart { stop_solid_queue }
else
launcher.events.after_booted do
@solid_queue_pid = fork do
Thread.new { monitor_puma }
SolidQueue::Supervisor.start
def start_async(launcher)
if Gem::Version.new(Puma::Const::VERSION) < Gem::Version.new("7")
launcher.events.on_booted do
start_solid_queue(mode: :async, standalone: false)
end

launcher.events.on_stopped { solid_queue_supervisor&.stop }

launcher.events.on_restart do
solid_queue_supervisor&.stop
start_solid_queue(mode: :async, standalone: false)
end
else
launcher.events.after_booted do
start_solid_queue(mode: :async, standalone: false)
end

launcher.events.after_stopped { solid_queue_supervisor&.stop }

launcher.events.before_restart do
solid_queue_supervisor&.stop
start_solid_queue(mode: :async, standalone: false)
end
end
end

launcher.events.after_stopped { stop_solid_queue }
launcher.events.before_restart { stop_solid_queue }
def start_solid_queue(**options)
@solid_queue_supervisor = SolidQueue::Supervisor.start(**options)
end
end

private
def stop_solid_queue
def stop_solid_queue_fork
Process.waitpid(solid_queue_pid, Process::WNOHANG)
log "Stopping Solid Queue..."
Process.kill(:INT, solid_queue_pid) if solid_queue_pid
@@ -48,7 +94,7 @@ def monitor_puma
end

def monitor_solid_queue
monitor(:solid_queue_dead?, "Detected Solid Queue has gone away, stopping Puma...")
monitor(:solid_queue_fork_dead?, "Detected Solid Queue has gone away, stopping Puma...")
end

def monitor(process_dead, message)
@@ -62,7 +108,7 @@ def monitor(process_dead, message)
end
end

def solid_queue_dead?
def solid_queue_fork_dead?
if solid_queue_started?
Process.waitpid(solid_queue_pid, Process::WNOHANG)
end
10 changes: 10 additions & 0 deletions lib/solid_queue/app_executor.rb
@@ -17,5 +17,15 @@ def handle_thread_error(error)
SolidQueue.on_thread_error.call(error)
end
end

def create_thread(&block)
Thread.new do
Thread.current.name = name
block.call
rescue Exception => exception
handle_thread_error(exception)
raise
end
end
end
end
50 changes: 50 additions & 0 deletions lib/solid_queue/async_supervisor.rb
@@ -0,0 +1,50 @@
# frozen_string_literal: true

module SolidQueue
class AsyncSupervisor < Supervisor
after_shutdown :terminate_gracefully, unless: :standalone?

def stop
super
@thread&.join
end

private
def supervise
if standalone?
super
else
@thread = create_thread { super }
end
end

def check_and_replace_terminated_processes
terminated_threads = process_instances.select { |thread_id, instance| !instance.alive? }
terminated_threads.each { |thread_id, instance| replace_thread(thread_id, instance) }
end

def replace_thread(thread_id, instance)
SolidQueue.instrument(:replace_thread, supervisor_pid: ::Process.pid) do |payload|
payload[:thread] = instance

# `instance` is the terminated thread-backed process passed to this method
error = Processes::ThreadTerminatedError.new(instance.name)
release_claimed_jobs_by(instance, with_error: error)

start_process(configured_processes.delete(thread_id))
end
end

def perform_graceful_termination
process_instances.values.each(&:stop)

Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? })
end

def perform_immediate_termination
exit!
end

def all_processes_terminated?
process_instances.values.none?(&:alive?)
end
end
end
4 changes: 4 additions & 0 deletions lib/solid_queue/cli.rb
@@ -8,6 +8,10 @@ class Cli < Thor
desc: "Path to config file (default: #{Configuration::DEFAULT_CONFIG_FILE_PATH}).",
banner: "SOLID_QUEUE_CONFIG"

class_option :mode, type: :string, default: "fork", enum: %w[ fork async ],
desc: "Whether to fork processes for workers and dispatchers (fork) or to run these in the same process as the supervisor (async) (default: fork).",
banner: "SOLID_QUEUE_SUPERVISOR_MODE"

class_option :recurring_schedule_file, type: :string,
desc: "Path to recurring schedule definition (default: #{Configuration::DEFAULT_RECURRING_SCHEDULE_FILE_PATH}).",
banner: "SOLID_QUEUE_RECURRING_SCHEDULE"
17 changes: 16 additions & 1 deletion lib/solid_queue/configuration.rb
@@ -56,6 +56,14 @@ def error_messages
end
end

def mode
@options[:mode].to_s.inquiry
end

def standalone?
mode.fork? || @options[:standalone]
end

private
attr_reader :options

@@ -84,6 +92,8 @@ def ensure_correctly_sized_thread_pool

def default_options
{
mode: ENV["SOLID_QUEUE_SUPERVISOR_MODE"] || :fork,
standalone: true,
config_file: Rails.root.join(ENV["SOLID_QUEUE_CONFIG"] || DEFAULT_CONFIG_FILE_PATH),
recurring_schedule_file: Rails.root.join(ENV["SOLID_QUEUE_RECURRING_SCHEDULE"] || DEFAULT_RECURRING_SCHEDULE_FILE_PATH),
only_work: false,
@@ -110,7 +120,12 @@ def skip_recurring_tasks?

def workers
workers_options.flat_map do |worker_options|
processes = worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
processes = if mode.fork?
worker_options.fetch(:processes, WORKER_DEFAULTS[:processes])
else
1
end

processes.times.map { Process.new(:worker, worker_options.with_defaults(WORKER_DEFAULTS)) }
end
end
1 change: 1 addition & 0 deletions lib/solid_queue/dispatcher.rb
@@ -3,6 +3,7 @@
module SolidQueue
class Dispatcher < Processes::Poller
include LifecycleHooks

attr_reader :batch_size

after_boot :run_start_hooks
Expand Down
68 changes: 68 additions & 0 deletions lib/solid_queue/fork_supervisor.rb
@@ -0,0 +1,68 @@
# frozen_string_literal: true

module SolidQueue
class ForkSupervisor < Supervisor
private

def perform_graceful_termination
term_forks

Timer.wait_until(SolidQueue.shutdown_timeout, -> { all_processes_terminated? }) do
reap_terminated_forks
end
end

def perform_immediate_termination
quit_forks
end

def term_forks
signal_processes(process_instances.keys, :TERM)
end

def quit_forks
signal_processes(process_instances.keys, :QUIT)
end

def check_and_replace_terminated_processes
loop do
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

replace_fork(pid, status)
end
end

def reap_terminated_forks
loop do
pid, status = ::Process.waitpid2(-1, ::Process::WNOHANG)
break unless pid

# Parenthesized so a failed exit status is only considered for forks we were tracking
if (terminated_fork = process_instances.delete(pid)) && (!status.exited? || status.exitstatus > 0)
error = Processes::ProcessExitError.new(status)
release_claimed_jobs_by(terminated_fork, with_error: error)
end

configured_processes.delete(pid)
end
rescue SystemCallError
# All children already reaped
end

def replace_fork(pid, status)
SolidQueue.instrument(:replace_fork, supervisor_pid: ::Process.pid, pid: pid, status: status) do |payload|
if terminated_fork = process_instances.delete(pid)
payload[:fork] = terminated_fork
error = Processes::ProcessExitError.new(status)
release_claimed_jobs_by(terminated_fork, with_error: error)

start_process(configured_processes.delete(pid))
end
end
end

def all_processes_terminated?
process_instances.empty?
end
end
end