はじめに
最近はRuby on RailsでDelayed::Job (Gemパッケージ名delayed_job) とDelayedJob ActiveRecord Backend (Gemパッケージ名delayed_job_active_record)を使い始めた。まだ全然使いこなせていないが手元の情報をこのページに記す。
各々のGitHubでは以下のように概要説明がある。
delayed_job
Database based asynchronous priority queue system – Extracted from Shopify
delayed_job_active_record
ActiveRecord backend integration for DelayedJob 3.0+
筆者はRails 7からユーザーとなったのでRails 4.2 (リリース日2014年12月) で導入されたActive Jobが前提にある。RubyGems.orgで確認可能な最古のバージョンを見ると delayed_job
は2009年9月にリリースされたv1.8.1であり、 delayed_job_active_record
は2011年9月にリリースされたv0.2.0であった。Active JobがないRails 4.2未満のアプリケーションでは、これら2つのGemパッケージを利用してジョブの処理をしていたのだろうと想像する。
そのため、本稿ではActive Jobと連携はせずにdelayed_job v4.1.11とdelayed_job_active_record v4.1.8のみの利用にして、別稿でActive Jobと連携することにしたい。
前提の開発環境
本稿は以下の開発環境を前提とする。
$ uname -m arm64 $ sysctl -n machdep.cpu.brand_string Apple M1 Max $ sw_vers ProductName: macOS ProductVersion: 14.3.1 BuildVersion: 23D60 $ sqlite3 --version 3.43.2 2023-10-10 13:08:14 1b37c146ee9ebb7acd0160c0ab1fd11017a419fa8a3187386ed8cb32b709aapl (64-bit) $ rbenv --version rbenv 1.2.0-62-ga632465 $ ruby --version ruby 3.2.2 (2023-03-30 revision e51014f9c0) [arm64-darwin21] $ bundle --version Bundler version 2.4.22 $ rails --version Rails 7.1.3.2
サンプルプロジェクトの作成
delayed_job
と delayed_job_active_record
の動作確認用にRailsプロジェクトを作成する。
rails new delayed_job_sample cd delayed_job_sample rails db:create # データベースも先んじて作成しておく
作成したRailsプロジェクト内の Gemfile
に以下の記述を加える。
gem 'delayed_job', "4.1.11" gem 'delayed_job_active_record', "4.1.8"
Bundler
でGemパッケージをインストールする。
bundle install
GitHubにある delayed_job_active_record
のREADMEはInstallationで bundle install
後に以下のコマンド実行を促している。それに従い実行する。
rails g delayed_job:active_record rake db:migrate
サーバーを起動する。
rails server
以上の手順で http://localhost:3000 にアクセスできる。
ジョブの定義
続いてジョブを定義する。
GitHubにある delayed_job
のREADMEを見るとCustom Jobsでジョブについて以下のように述べている。
Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table. Job objects are serialized to yaml so that they can later be resurrected by the job runner.
上の内容から筆者は大雑把に以下のように捉えた。
- ジョブは
perform
メソッドが呼ばれるRubyオブジェクトである - ジョブはジョブテーブルに格納される
- ジョブオブジェクトは
yaml
にシリアライズされる - ジョブオブジェクトはジョブランナーによって
yaml
から復元される
READMEは続けてコードでジョブ定義とキューイング方法を例示しており、以下がそのコードである。ジョブ定義に NewsletterJob
、キューイングの仕方に Delayed::Job.enqueue
を挙げている。READMEのCustom Jobsの冒頭の説明通り、ジョブには perform
メソッドがある。 delayed_job
は実装者に特定のクラスの継承やモジュールのインクルードを求めないこともわかった。
NewsletterJob = Struct.new(:text, :emails) do def perform emails.each { |e| NewsletterMailer.deliver_text_to_email(text, e) } end end Delayed::Job.enqueue NewsletterJob.new('lorem ipsum...', Customers.pluck(:email))
READMEは続けてHooksを説明している。筆者はコールバック相当と捉えた。説明には NewsletterJob
の派生クラス ParanoidNewsletterJob
が登場する。
例として用いる正常系のジョブ
よりシンプルなジョブが欲しかったため、以下のジョブを定義した。このジョブは正常終了することを想定して記述した。
touch app/models/task_job.rb
class TaskJob def enqueue(job) puts "TaskJob#enqueue" end def before(job) puts 'TaskJob#before' end def perform puts "TaskJob#perform" end def success(job) puts "TaskJob#success" end def error(job, exception) puts "TaskJob#error" end def failure(job) puts "TaskJob#failure" end def after(job) puts 'TaskJob#after' end end
例として用いる異常系のジョブ
perform
メソッドでエラーが起きる場合を想定して BadTaskJob
を作成する。
touch app/models/bad_task_job.rb
class BadTaskJob def enqueue(job) puts "BadTaskJob#enqueue" end def before(job) puts 'BadTaskJob#before' end def perform puts "BadTaskJob#perform" raise StandardError.new("Failed to process by something.") end def success(job) puts "BadTaskJob#success" end def error(job, exception) puts "BadTaskJob#error" end def failure(job) puts "BadTaskJob#failure" end def after(job) puts 'BadTaskJob#after' end end
ジョブのHookがどのように呼ばれるかについては後ほど触れる。
キューイング
手早くジョブをキューイングするためRailsコンソールを使う。
rails console
ジョブ TaskJob
を作成してキューイングする。
Delayed::Job.enqueue TaskJob.new
キューイングしたジョブを確認する。
Delayed::Job.last
Delayed::Job.last
の応答結果を見ると、ジョブは delayed_jobs
テーブルに格納していることがわかる。
Delayed::Backend::ActiveRecord::Job Load (0.3ms) SELECT "delayed_jobs".* FROM "delayed_jobs" ORDER BY "delayed_jobs"."id" DESC LIMIT ? [["LIMIT", 1]]
その delayed_jobs
テーブルも併せて確認する。
rails dbconsole
簡単なSQLで確認を済ます。
SELECT * FROM delayed_jobs;
以下はそのSQLの応答結果である。カラムの run_at
はジョブが実行される時刻を示す。
id|priority|attempts|handler|last_error|run_at|locked_at|failed_at|locked_by|queue|created_at|updated_at 1|0|0|--- !ruby/object:TaskJob {} ||2024-03-09 10:50:49.163368||||default|2024-03-09 10:50:49.163400|2024-03-09 10:50:49.163400
ジョブワーカーの起動方法
ターミナルで rake jobs:work
を実行するとジョブワーカーが起動してジョブを処理し始める。
rake jobs:work
処理を中断したい場合は Ctrl C
を入力する。
delayed_jobの設定
GitHubの delayed_job
にあるREADMEのGory Detailsによると、Delayed::Jobの設定は config/initializers/delayed_job_config.rb
に記述するようだ。同節には設定ファイルのサンプルが載っている。以下にその内容を引用する。
# config/initializers/delayed_job_config.rb Delayed::Worker.destroy_failed_jobs = false Delayed::Worker.sleep_delay = 60 Delayed::Worker.max_attempts = 3 Delayed::Worker.max_run_time = 5.minutes Delayed::Worker.read_ahead = 10 Delayed::Worker.default_queue_name = 'default' Delayed::Worker.delay_jobs = !Rails.env.test? Delayed::Worker.raise_signal_exceptions = :term Delayed::Worker.logger = Logger.new(File.join(Rails.root, 'log', 'delayed_job.log'))
設定の初期値はソースコードに記載がある。
https://github.com/collectiveidea/delayed_job/blob/v4.1.11/lib/delayed/worker.rb
module Delayed class Worker # rubocop:disable ClassLength DEFAULT_LOG_LEVEL = 'info'.freeze DEFAULT_SLEEP_DELAY = 5 DEFAULT_MAX_ATTEMPTS = 25 DEFAULT_MAX_RUN_TIME = 4.hours DEFAULT_DEFAULT_PRIORITY = 0 DEFAULT_DELAY_JOBS = true DEFAULT_QUEUES = [].freeze DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze DEFAULT_READ_AHEAD = 5
なお、 max_attempts
, max_run_time
, destroy_failed_jobs
, queue_name
, reschedule_at
についてはジョブ定義時にメソッドを実装することで指定できるようである。詳しくはGitHub delayed_job
のREADMEを参照されたい。
ジョブのHook
ジョブのHookがどのように呼ばれるかについて追う。
対象のHookは enqueue
、 before
、 perform
、 success
、 error
、 failure
、 after
である。
正常系のジョブ - TaskJob
正常系のジョブとして TaskJob
をキューに投入する。投入直後、 TaskJob#enqueue
と出力されるのでHookの enqueue
を確認できた。
irb(main):001> Delayed::Job.enqueue TaskJob.new TaskJob#enqueue TRANSACTION (0.0ms) begin transaction Delayed::Backend::ActiveRecord::Job Create (0.3ms) INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING "id" [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:TaskJob {}\n"], ["last_error", nil], ["run_at", "2024-03-10 12:21:55.816135"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2024-03-10 12:21:55.816158"], ["updated_at", "2024-03-10 12:21:55.816158"]] TRANSACTION (0.1ms) commit transaction => #<Delayed::Backend::ActiveRecord::Job:0x000000011331fc90 id: 1, priority: 0, attempts: 0, handler: "--- !ruby/object:TaskJob {}\n", last_error: nil, run_at: Sun, 10 Mar 2024 12:21:55.816135000 UTC +00:00, locked_at: nil, failed_at: nil, locked_by: nil, queue: "default", created_at: Sun, 10 Mar 2024 12:21:55.816158000 UTC +00:00, updated_at: Sun, 10 Mar 2024 12:21:55.816158000 UTC +00:00>
この時点で delayed_job
テーブルにジョブのレコードが存在する。
ジョブワーカー (rake jobs:work
をしたターミナル) を確認する。ジョブワーカーは TaskJob
のジョブを実行して以下の内容を出力した。
[Worker(host:Suppaman.local pid:56408)] Job TaskJob (id=1) (queue=default) RUNNING TaskJob#before TaskJob#perform TaskJob#success TaskJob#after [Worker(host:Suppaman.local pid:56408)] Job TaskJob (id=1) (queue=default) COMPLETED after 0.0012 [Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 200.0000 j/s, 0 failed
上の出力結果からHookの before
はperformの前に呼ばれ、performの後にHookの success
が呼ばれ、その次にHookの after
が呼ばれることがわかった。
なお、performが正常に終了した場合はlib/delayed/worker.rb#L232にある通りジョブのレコードを削除する。
異常系のジョブ - BadTaskJob
次は異常系のジョブとして BadTaskJob
を投入する。Hookの enqueue
はTaskJobと同じである。
irb(main):002> Delayed::Job.enqueue BadTaskJob.new BadTaskJob#enqueue TRANSACTION (0.1ms) begin transaction Delayed::Backend::ActiveRecord::Job Create (0.7ms) INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING "id" [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:BadTaskJob {}\n"], ["last_error", nil], ["run_at", "2024-03-10 12:32:09.683335"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2024-03-10 12:32:09.683398"], ["updated_at", "2024-03-10 12:32:09.683398"]] TRANSACTION (0.1ms) commit transaction => #<Delayed::Backend::ActiveRecord::Job:0x0000000108fd5878 id: 2, priority: 0, attempts: 0, handler: "--- !ruby/object:BadTaskJob {}\n", last_error: nil, run_at: Sun, 10 Mar 2024 12:32:09.683335000 UTC +00:00, locked_at: nil, failed_at: nil, locked_by: nil, queue: "default", created_at: Sun, 10 Mar 2024 12:32:09.683398000 UTC +00:00, updated_at: Sun, 10 Mar 2024 12:32:09.683398000 UTC +00:00>
ジョブワーカー (rake jobs:work
をしたターミナル) を確認する。 BadTaskJob
のジョブが実行されると以下の出力がされた。
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) RUNNING BadTaskJob#before BadTaskJob#perform BadTaskJob#error BadTaskJob#after [Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED (0 prior attempts) with StandardError: Failed to process by something. [Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 54.2711 j/s, 1 failed
peformでraiseをしているため、Hookの error
が呼ばれたのだと思われる。その次にHookの after
が呼ばれる。
config/initializers/delayed_job_config.rb
に Delayed::Worker.max_attempts = 3
と記述しているため、ジョブワーカーは再試行上限範囲内と捉えて、数分後に BadTaskJob
を実行した。そして以下の出力を得た。Hookの流れは前回のジョブ実行時と同じである。
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) RUNNING BadTaskJob#before BadTaskJob#perform BadTaskJob#error BadTaskJob#after [Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED (1 prior attempts) with StandardError: Failed to process by something. [Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 119.2037 j/s, 1 failed
さらに数分後にジョブワーカーはジョブを再実行した。そして以下の出力を得た。前回のジョブ実行と異なるのはHook after
の後にHook failure
が呼ばれた点である。
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) RUNNING BadTaskJob#before BadTaskJob#perform BadTaskJob#error BadTaskJob#after [Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED (2 prior attempts) with StandardError: Failed to process by something. [Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED permanently because of 3 consecutive failures BadTaskJob#failure [Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 132.2751 j/s, 1 failed
上の出力結果から当該ジョブの実行回数が Delayed::Worker.max_attempts
に到達した場合、Hookの failure
が呼ばれるとわかった。
DeserializationError
ジョブをキーに入れると、応答結果に handler
を確認でき、値は yaml
形式と推測できる。GitHubにある delayed_job
のREADMEを見るとCustom Jobsで述べられているようにジョブオブジェクトが yaml
にシリアライズされていることがわかる。おそらくこの yaml
形式のデータはこのあと復元する際に使われるのであろう。
irb(main):001> Delayed::Job.enqueue TaskJob.new TaskJob#enqueue TRANSACTION (0.0ms) begin transaction Delayed::Backend::ActiveRecord::Job Create (0.6ms) INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING "id" [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:TaskJob {}\n"], ["last_error", nil], ["run_at", "2024-03-11 01:53:50.625303"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2024-03-11 01:53:50.625341"], ["updated_at", "2024-03-11 01:53:50.625341"]] TRANSACTION (0.4ms) commit transaction => #<Delayed::Backend::ActiveRecord::Job:0x000000010765b2f8 id: 3, priority: 0, attempts: 0, handler: "--- !ruby/object:TaskJob {}\n", last_error: nil, run_at: Mon, 11 Mar 2024 01:53:50.625303000 UTC +00:00, locked_at: nil, failed_at: nil, locked_by: nil, queue: "default", created_at: Mon, 11 Mar 2024 01:53:50.625341000 UTC +00:00, updated_at: Mon, 11 Mar 2024 01:53:50.625341000 UTC +00:00>
そうだとすると、ジョブワーカーを起動する前に TaskJob
のクラス名を GoodTaskJob
などに変更するとエラーを引き起こすと思われる。そこでクラス名を変更してからジョブワーカーを起動してみる。
rake jobs:work
ジョブワーカーは以下のように Delayed::DeserializationError
を出力した。
[Worker(host:MilleCrepes1.local pid:54199)] Starting job worker [Worker(host:MilleCrepes1.local pid:54199)] Job TaskJob (id=58) (queue=default) RUNNING [Worker(host:MilleCrepes1.local pid:54199)] Job TaskJob (id=58) (queue=default) FAILED permanently with Delayed::DeserializationError: Job failed to load: undefined class/module TaskJob. Handler: "--- !ruby/object:TaskJob {}\n"
worker.rb#L240の実装を見たが呼び出し可能ならHookの failure
を呼ぶようだ。上の確認例ではyamlからオブジェクトに変換できていないため、 failure
は呼ばれていない。
デキューされたジョブの処理フロー
前述までの確認からデキューされたジョブはどのように処理されるか下図のフローチャートにまとめた。
おわりに
Railsで delayed_job
と delayed_job_active_record
を使った場合の動作確認をRailsコンソールと rake jobs:work
で実施した。その動作確認で得た情報からジョブがデキューされた後のフローを示した。
今後にActive Jobと連携する場合の delayed_job
について確認したい。
参考資料
- https://github.com/collectiveidea/delayed_job/tree/v4.1.11
- https://github.com/collectiveidea/delayed_job_active_record/tree/v4.1.8
- https://rubygems.org/gems/delayed_job
- https://rubygems.org/gems/delayed_job/versions/1.8.1
- https://rubygems.org/gems/delayed_job_active_record
- https://rubygems.org/gems/delayed_job_active_record/versions/0.2.0