はじめに
最近はRuby on RailsでDelayed::Job (Gemパッケージ名delayed_job) とDelayedJob ActiveRecord Backend (Gemパッケージ名delayed_job_active_record)を使い始めた。まだ全然使いこなせていないが手元の情報をこのページに記す。
各々のGitHubでは以下のように概要説明がある。
delayed_job
Database based asynchronous priority queue system – Extracted from Shopify
delayed_job_active_record
ActiveRecord backend integration for DelayedJob 3.0+
筆者はRails 7からユーザーとなったのでRails 4.2 (リリース日2014年12月) で導入されたActive Jobが前提にある。RubyGems.orgで確認可能な最古のバージョンを見ると delayed_job は2009年9月にリリースされたv1.8.1であり、 delayed_job_active_record は2011年9月にリリースされたv0.2.0であった。Active JobがないRails 4.2未満のアプリケーションでは、これら2つのGemパッケージを利用してジョブの処理をしていたのだろうと想像する。
そのため、本稿ではActive Jobと連携はせずにdelayed_job v4.1.11とdelayed_job_active_record v4.1.8のみの利用にして、別稿でActive Jobと連携することにしたい。
前提の開発環境
本稿は以下の開発環境を前提とする。
$ uname -m arm64 $ sysctl -n machdep.cpu.brand_string Apple M1 Max $ sw_vers ProductName: macOS ProductVersion: 14.3.1 BuildVersion: 23D60 $ sqlite3 --version 3.43.2 2023-10-10 13:08:14 1b37c146ee9ebb7acd0160c0ab1fd11017a419fa8a3187386ed8cb32b709aapl (64-bit) $ rbenv --version rbenv 1.2.0-62-ga632465 $ ruby --version ruby 3.2.2 (2023-03-30 revision e51014f9c0) [arm64-darwin21] $ bundle --version Bundler version 2.4.22 $ rails --version Rails 7.1.3.2
サンプルプロジェクトの作成
delayed_job と delayed_job_active_record の動作確認用にRailsプロジェクトを作成する。
rails new delayed_job_sample cd delayed_job_sample rails db:create # データベースも先んじて作成しておく
作成したRailsプロジェクト内の Gemfile に以下の記述を加える。
gem 'delayed_job', "4.1.11" gem 'delayed_job_active_record', "4.1.8"
Bundler でGemパッケージをインストールする。
bundle install
GitHubにある delayed_job_active_record のREADMEはInstallationで bundle install 後に以下のコマンド実行を促している。それに従い実行する。
rails g delayed_job:active_record rake db:migrate
サーバーを起動する。
rails server
以上の手順で http://localhost:3000 にアクセスできる。
ジョブの定義
続いてジョブを定義する。
GitHubにある delayed_job のREADMEを見るとCustom Jobsでジョブについて以下のように述べている。
Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table. Job objects are serialized to yaml so that they can later be resurrected by the job runner.
上の内容から筆者は大雑把に以下のように捉えた。
- ジョブは
performメソッドが呼ばれるRubyオブジェクトである - ジョブはジョブテーブルに格納される
- ジョブオブジェクトは
yamlにシリアライズされる - ジョブオブジェクトはジョブランナーによって
yamlから復元される
READMEは続けてコードでジョブ定義とキューイング方法を例示しており、以下がそのコードである。ジョブ定義に NewsletterJob 、キューイングの仕方に Delayed::Job.enqueue を挙げている。READMEのCustom Jobsの冒頭の説明通り、ジョブには perform メソッドがある。 delayed_job は実装者に特定のクラスの継承やモジュールのインクルードを求めないこともわかった。
NewsletterJob = Struct.new(:text, :emails) do
def perform
emails.each { |e| NewsletterMailer.deliver_text_to_email(text, e) }
end
end
Delayed::Job.enqueue NewsletterJob.new('lorem ipsum...', Customers.pluck(:email))
READMEは続けてHooksを説明している。筆者はコールバック相当と捉えた。説明には NewsletterJob の派生クラス ParanoidNewsletterJob が登場する。
例として用いる正常系のジョブ
よりシンプルなジョブが欲しかったため、以下のジョブを定義した。このジョブは正常終了することを想定して記述した。
touch app/models/task_job.rb
class TaskJob
def enqueue(job)
puts "TaskJob#enqueue"
end
def before(job)
puts 'TaskJob#before'
end
def perform
puts "TaskJob#perform"
end
def success(job)
puts "TaskJob#success"
end
def error(job, exception)
puts "TaskJob#error"
end
def failure(job)
puts "TaskJob#failure"
end
def after(job)
puts 'TaskJob#after'
end
end
例として用いる異常系のジョブ
perform メソッドでエラーが起きる場合を想定して BadTaskJob を作成する。
touch app/models/bad_task_job.rb
class BadTaskJob
def enqueue(job)
puts "BadTaskJob#enqueue"
end
def before(job)
puts 'BadTaskJob#before'
end
def perform
puts "BadTaskJob#perform"
raise StandardError.new("Failed to process by something.")
end
def success(job)
puts "BadTaskJob#success"
end
def error(job, exception)
puts "BadTaskJob#error"
end
def failure(job)
puts "BadTaskJob#failure"
end
def after(job)
puts 'BadTaskJob#after'
end
end
ジョブのHookがどのように呼ばれるかについては後ほど触れる。
キューイング
手早くジョブをキューイングするためRailsコンソールを使う。
rails console
ジョブ TaskJob を作成してキューイングする。
Delayed::Job.enqueue TaskJob.new
キューイングしたジョブを確認する。
Delayed::Job.last
Delayed::Job.last の応答結果を見ると、ジョブは delayed_jobs テーブルに格納していることがわかる。
Delayed::Backend::ActiveRecord::Job Load (0.3ms) SELECT "delayed_jobs".* FROM "delayed_jobs" ORDER BY "delayed_jobs"."id" DESC LIMIT ? [["LIMIT", 1]]
その delayed_jobs テーブルも併せて確認する。
rails dbconsole
簡単なSQLで確認を済ます。
SELECT * FROM delayed_jobs;
以下はそのSQLの応答結果である。カラムの run_at はジョブが実行される時刻を示す。
id|priority|attempts|handler|last_error|run_at|locked_at|failed_at|locked_by|queue|created_at|updated_at
1|0|0|--- !ruby/object:TaskJob {}
||2024-03-09 10:50:49.163368||||default|2024-03-09 10:50:49.163400|2024-03-09 10:50:49.163400
ジョブワーカーの起動方法
ターミナルで rake jobs:work を実行するとジョブワーカーが起動してジョブを処理し始める。
rake jobs:work
処理を中断したい場合は Ctrl C を入力する。
delayed_jobの設定
GitHubの delayed_job にあるREADMEのGory Detailsによると、Delayed::Jobの設定は config/initializers/delayed_job_config.rb に記述するようだ。同節には設定ファイルのサンプルが載っている。以下にその内容を引用する。
# config/initializers/delayed_job_config.rb Delayed::Worker.destroy_failed_jobs = false Delayed::Worker.sleep_delay = 60 Delayed::Worker.max_attempts = 3 Delayed::Worker.max_run_time = 5.minutes Delayed::Worker.read_ahead = 10 Delayed::Worker.default_queue_name = 'default' Delayed::Worker.delay_jobs = !Rails.env.test? Delayed::Worker.raise_signal_exceptions = :term Delayed::Worker.logger = Logger.new(File.join(Rails.root, 'log', 'delayed_job.log'))
設定の初期値はソースコードに記載がある。
https://github.com/collectiveidea/delayed_job/blob/v4.1.11/lib/delayed/worker.rb
module Delayed
class Worker # rubocop:disable ClassLength
DEFAULT_LOG_LEVEL = 'info'.freeze
DEFAULT_SLEEP_DELAY = 5
DEFAULT_MAX_ATTEMPTS = 25
DEFAULT_MAX_RUN_TIME = 4.hours
DEFAULT_DEFAULT_PRIORITY = 0
DEFAULT_DELAY_JOBS = true
DEFAULT_QUEUES = [].freeze
DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze
DEFAULT_READ_AHEAD = 5
なお、 max_attempts, max_run_time, destroy_failed_jobs, queue_name, reschedule_at についてはジョブ定義時にメソッドを実装することで指定できるようである。詳しくはGitHub delayed_job のREADMEを参照されたい。
ジョブのHook
ジョブのHookがどのように呼ばれるかについて追う。
対象のHookは enqueue 、 before 、 perform 、 success 、 error 、 failure 、 after である。
正常系のジョブ - TaskJob
正常系のジョブとして TaskJob をキューに投入する。投入直後、 TaskJob#enqueue と出力されるのでHookの enqueue を確認できた。
irb(main):001> Delayed::Job.enqueue TaskJob.new
TaskJob#enqueue
TRANSACTION (0.0ms) begin transaction
Delayed::Backend::ActiveRecord::Job Create (0.3ms) INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING "id" [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:TaskJob {}\n"], ["last_error", nil], ["run_at", "2024-03-10 12:21:55.816135"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2024-03-10 12:21:55.816158"], ["updated_at", "2024-03-10 12:21:55.816158"]]
TRANSACTION (0.1ms) commit transaction
=>
#<Delayed::Backend::ActiveRecord::Job:0x000000011331fc90
id: 1,
priority: 0,
attempts: 0,
handler: "--- !ruby/object:TaskJob {}\n",
last_error: nil,
run_at: Sun, 10 Mar 2024 12:21:55.816135000 UTC +00:00,
locked_at: nil,
failed_at: nil,
locked_by: nil,
queue: "default",
created_at: Sun, 10 Mar 2024 12:21:55.816158000 UTC +00:00,
updated_at: Sun, 10 Mar 2024 12:21:55.816158000 UTC +00:00>
この時点で delayed_job テーブルにジョブのレコードが存在する。
ジョブワーカー (rake jobs:work をしたターミナル) を確認する。ジョブワーカーは TaskJob のジョブを実行して以下の内容を出力した。
[Worker(host:Suppaman.local pid:56408)] Job TaskJob (id=1) (queue=default) RUNNING TaskJob#before TaskJob#perform TaskJob#success TaskJob#after [Worker(host:Suppaman.local pid:56408)] Job TaskJob (id=1) (queue=default) COMPLETED after 0.0012 [Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 200.0000 j/s, 0 failed
上の出力結果からHookの before はperformの前に呼ばれ、performの後にHookの success が呼ばれ、その次にHookの after が呼ばれることがわかった。
なお、performが正常に終了した場合はlib/delayed/worker.rb#L232にある通りジョブのレコードを削除する。
異常系のジョブ - BadTaskJob
次は異常系のジョブとして BadTaskJob を投入する。Hookの enqueue はTaskJobと同じである。
irb(main):002> Delayed::Job.enqueue BadTaskJob.new
BadTaskJob#enqueue
TRANSACTION (0.1ms) begin transaction
Delayed::Backend::ActiveRecord::Job Create (0.7ms) INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING "id" [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:BadTaskJob {}\n"], ["last_error", nil], ["run_at", "2024-03-10 12:32:09.683335"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2024-03-10 12:32:09.683398"], ["updated_at", "2024-03-10 12:32:09.683398"]]
TRANSACTION (0.1ms) commit transaction
=>
#<Delayed::Backend::ActiveRecord::Job:0x0000000108fd5878
id: 2,
priority: 0,
attempts: 0,
handler: "--- !ruby/object:BadTaskJob {}\n",
last_error: nil,
run_at: Sun, 10 Mar 2024 12:32:09.683335000 UTC +00:00,
locked_at: nil,
failed_at: nil,
locked_by: nil,
queue: "default",
created_at: Sun, 10 Mar 2024 12:32:09.683398000 UTC +00:00,
updated_at: Sun, 10 Mar 2024 12:32:09.683398000 UTC +00:00>
ジョブワーカー (rake jobs:work をしたターミナル) を確認する。 BadTaskJob のジョブが実行されると以下の出力がされた。
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) RUNNING BadTaskJob#before BadTaskJob#perform BadTaskJob#error BadTaskJob#after [Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED (0 prior attempts) with StandardError: Failed to process by something. [Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 54.2711 j/s, 1 failed
peformでraiseをしているため、Hookの error が呼ばれたのだと思われる。その次にHookの after が呼ばれる。
config/initializers/delayed_job_config.rb に Delayed::Worker.max_attempts = 3 と記述しているため、ジョブワーカーは再試行上限範囲内と捉えて、数分後に BadTaskJob を実行した。そして以下の出力を得た。Hookの流れは前回のジョブ実行時と同じである。
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) RUNNING BadTaskJob#before BadTaskJob#perform BadTaskJob#error BadTaskJob#after [Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED (1 prior attempts) with StandardError: Failed to process by something. [Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 119.2037 j/s, 1 failed
さらに数分後にジョブワーカーはジョブを再実行した。そして以下の出力を得た。前回のジョブ実行と異なるのはHook after の後にHook failure が呼ばれた点である。
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) RUNNING BadTaskJob#before BadTaskJob#perform BadTaskJob#error BadTaskJob#after [Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED (2 prior attempts) with StandardError: Failed to process by something. [Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED permanently because of 3 consecutive failures BadTaskJob#failure [Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 132.2751 j/s, 1 failed
上の出力結果から当該ジョブの実行回数が Delayed::Worker.max_attempts に到達した場合、Hookの failure が呼ばれるとわかった。
DeserializationError
ジョブをキーに入れると、応答結果に handler を確認でき、値は yaml 形式と推測できる。GitHubにある delayed_job のREADMEを見るとCustom Jobsで述べられているようにジョブオブジェクトが yaml にシリアライズされていることがわかる。おそらくこの yaml 形式のデータはこのあと復元する際に使われるのであろう。
irb(main):001> Delayed::Job.enqueue TaskJob.new
TaskJob#enqueue
TRANSACTION (0.0ms) begin transaction
Delayed::Backend::ActiveRecord::Job Create (0.6ms) INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING "id" [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:TaskJob {}\n"], ["last_error", nil], ["run_at", "2024-03-11 01:53:50.625303"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2024-03-11 01:53:50.625341"], ["updated_at", "2024-03-11 01:53:50.625341"]]
TRANSACTION (0.4ms) commit transaction
=>
#<Delayed::Backend::ActiveRecord::Job:0x000000010765b2f8
id: 3,
priority: 0,
attempts: 0,
handler: "--- !ruby/object:TaskJob {}\n",
last_error: nil,
run_at: Mon, 11 Mar 2024 01:53:50.625303000 UTC +00:00,
locked_at: nil,
failed_at: nil,
locked_by: nil,
queue: "default",
created_at: Mon, 11 Mar 2024 01:53:50.625341000 UTC +00:00,
updated_at: Mon, 11 Mar 2024 01:53:50.625341000 UTC +00:00>
そうだとすると、ジョブワーカーを起動する前に TaskJob のクラス名を GoodTaskJob などに変更するとエラーを引き起こすと思われる。そこでクラス名を変更してからジョブワーカーを起動してみる。
rake jobs:work
ジョブワーカーは以下のように Delayed::DeserializationError を出力した。
[Worker(host:MilleCrepes1.local pid:54199)] Starting job worker
[Worker(host:MilleCrepes1.local pid:54199)] Job TaskJob (id=58) (queue=default) RUNNING
[Worker(host:MilleCrepes1.local pid:54199)] Job TaskJob (id=58) (queue=default) FAILED permanently with Delayed::DeserializationError: Job failed to load: undefined class/module TaskJob. Handler: "--- !ruby/object:TaskJob {}\n"
worker.rb#L240の実装を見たが呼び出し可能ならHookの failure を呼ぶようだ。上の確認例ではyamlからオブジェクトに変換できていないため、 failure は呼ばれていない。
デキューされたジョブの処理フロー
前述までの確認からデキューされたジョブはどのように処理されるか下図のフローチャートにまとめた。

おわりに
Railsで delayed_job と delayed_job_active_record を使った場合の動作確認をRailsコンソールと rake jobs:work で実施した。その動作確認で得た情報からジョブがデキューされた後のフローを示した。
今後にActive Jobと連携する場合の delayed_job について確認したい。
参考資料
- https://github.com/collectiveidea/delayed_job/tree/v4.1.11
- https://github.com/collectiveidea/delayed_job_active_record/tree/v4.1.8
- https://rubygems.org/gems/delayed_job
- https://rubygems.org/gems/delayed_job/versions/1.8.1
- https://rubygems.org/gems/delayed_job_active_record
- https://rubygems.org/gems/delayed_job_active_record/versions/0.2.0
