Rails v7でDelayed::Job (delayed_job)を使う

はじめに

最近はRuby on RailsでDelayed::Job (Gemパッケージ名delayed_job) とDelayedJob ActiveRecord Backend (Gemパッケージ名delayed_job_active_record)を使い始めた。まだ全然使いこなせていないが手元の情報をこのページに記す。

各々のGitHubでは以下のように概要説明がある。

delayed_job

Database based asynchronous priority queue system – Extracted from Shopify

delayed_job_active_record

ActiveRecord backend integration for DelayedJob 3.0+

筆者はRails 7からユーザーとなったのでRails 4.2 (リリース日2014年12月) で導入されたActive Jobが前提にある。RubyGems.orgで確認可能な最古のバージョンを見ると delayed_job は2009年9月にリリースされたv1.8.1であり、 delayed_job_active_record は2011年9月にリリースされたv0.2.0であった。Active JobがないRails 4.2未満のアプリケーションでは、これら2つのGemパッケージを利用してジョブの処理をしていたのだろうと想像する。

そのため、本稿ではActive Jobと連携はせずにdelayed_job v4.1.11とdelayed_job_active_record v4.1.8のみの利用にして、別稿でActive Jobと連携することにしたい。

前提の開発環境

本稿は以下の開発環境を前提とする。

$ uname -m
arm64

$ sysctl -n machdep.cpu.brand_string
Apple M1 Max

$ sw_vers
ProductName:		macOS
ProductVersion:		14.3.1
BuildVersion:		23D60

$ sqlite3 --version
3.43.2 2023-10-10 13:08:14 1b37c146ee9ebb7acd0160c0ab1fd11017a419fa8a3187386ed8cb32b709aapl (64-bit)

$ rbenv --version
rbenv 1.2.0-62-ga632465

$ ruby --version
ruby 3.2.2 (2023-03-30 revision e51014f9c0) [arm64-darwin21]

$ bundle --version
Bundler version 2.4.22

$ rails --version
Rails 7.1.3.2

サンプルプロジェクトの作成

delayed_jobdelayed_job_active_record の動作確認用にRailsプロジェクトを作成する。

rails new delayed_job_sample
cd delayed_job_sample
rails db:create # データベースも先んじて作成しておく

作成したRailsプロジェクト内の Gemfile に以下の記述を加える。

gem 'delayed_job', "4.1.11"
gem 'delayed_job_active_record', "4.1.8"

Bundler でGemパッケージをインストールする。

bundle install

GitHubにある delayed_job_active_record のREADMEはInstallationbundle install 後に以下のコマンド実行を促している。それに従い実行する。

rails g delayed_job:active_record
rake db:migrate

サーバーを起動する。

rails server

以上の手順で http://localhost:3000 にアクセスできる。

ジョブの定義

続いてジョブを定義する。

GitHubにある delayed_job のREADMEを見るとCustom Jobsでジョブについて以下のように述べている。

Jobs are simple ruby objects with a method called perform. Any object which responds to perform can be stuffed into the jobs table. Job objects are serialized to yaml so that they can later be resurrected by the job runner.

上の内容から筆者は大雑把に以下のように捉えた。

  • ジョブは perform メソッドが呼ばれるRubyオブジェクトである
  • ジョブはジョブテーブルに格納される
  • ジョブオブジェクトは yaml にシリアライズされる
  • ジョブオブジェクトはジョブランナーによって yaml から復元される

READMEは続けてコードでジョブ定義とキューイング方法を例示しており、以下がそのコードである。ジョブ定義に NewsletterJob 、キューイングの仕方に Delayed::Job.enqueue を挙げている。READMEのCustom Jobsの冒頭の説明通り、ジョブには perform メソッドがある。 delayed_job は実装者に特定のクラスの継承やモジュールのインクルードを求めないこともわかった。

NewsletterJob = Struct.new(:text, :emails) do
  def perform
    emails.each { |e| NewsletterMailer.deliver_text_to_email(text, e) }
  end
end

Delayed::Job.enqueue NewsletterJob.new('lorem ipsum...', Customers.pluck(:email))

READMEは続けてHooksを説明している。筆者はコールバック相当と捉えた。説明には NewsletterJob の派生クラス ParanoidNewsletterJob が登場する。

例として用いる正常系のジョブ

よりシンプルなジョブが欲しかったため、以下のジョブを定義した。このジョブは正常終了することを想定して記述した。

touch app/models/task_job.rb
class TaskJob
  def enqueue(job)
    puts "TaskJob#enqueue"
  end

  def before(job)
    puts 'TaskJob#before'
  end
  
  def perform
    puts "TaskJob#perform"
  end

  def success(job)
    puts "TaskJob#success"
  end

  def error(job, exception)
    puts "TaskJob#error"
  end

  def failure(job)
    puts "TaskJob#failure"
  end
  
  def after(job)
    puts 'TaskJob#after'
  end  
end

例として用いる異常系のジョブ

perform メソッドでエラーが起きる場合を想定して BadTaskJob を作成する。

touch app/models/bad_task_job.rb
class BadTaskJob
  def enqueue(job)
    puts "BadTaskJob#enqueue"
  end

  def before(job)
    puts 'BadTaskJob#before'
  end
  
  def perform
    puts "BadTaskJob#perform"
    raise StandardError.new("Failed to process by something.")
  end

  def success(job)
    puts "BadTaskJob#success"
  end

  def error(job, exception)
    puts "BadTaskJob#error"
  end

  def failure(job)
    puts "BadTaskJob#failure"
  end
  
  def after(job)
    puts 'BadTaskJob#after'
  end  
end

ジョブのHookがどのように呼ばれるかについては後ほど触れる。

キューイング

手早くジョブをキューイングするためRailsコンソールを使う。

rails console

ジョブ TaskJob を作成してキューイングする。

Delayed::Job.enqueue TaskJob.new

キューイングしたジョブを確認する。

Delayed::Job.last

Delayed::Job.last の応答結果を見ると、ジョブは delayed_jobs テーブルに格納していることがわかる。

Delayed::Backend::ActiveRecord::Job Load (0.3ms)  SELECT "delayed_jobs".* FROM "delayed_jobs" ORDER BY "delayed_jobs"."id" DESC LIMIT ?  [["LIMIT", 1]]

その delayed_jobs テーブルも併せて確認する。

rails dbconsole

簡単なSQLで確認を済ます。

SELECT * FROM delayed_jobs;

以下はそのSQLの応答結果である。カラムの run_at はジョブが実行される時刻を示す。

id|priority|attempts|handler|last_error|run_at|locked_at|failed_at|locked_by|queue|created_at|updated_at
1|0|0|--- !ruby/object:TaskJob {}
||2024-03-09 10:50:49.163368||||default|2024-03-09 10:50:49.163400|2024-03-09 10:50:49.163400

ジョブワーカーの起動方法

ターミナルで rake jobs:work を実行するとジョブワーカーが起動してジョブを処理し始める。

rake jobs:work

処理を中断したい場合は Ctrl C を入力する。

delayed_jobの設定

GitHubの delayed_job にあるREADMEのGory Detailsによると、Delayed::Jobの設定は config/initializers/delayed_job_config.rb に記述するようだ。同節には設定ファイルのサンプルが載っている。以下にその内容を引用する。

# config/initializers/delayed_job_config.rb
Delayed::Worker.destroy_failed_jobs = false
Delayed::Worker.sleep_delay = 60
Delayed::Worker.max_attempts = 3
Delayed::Worker.max_run_time = 5.minutes
Delayed::Worker.read_ahead = 10
Delayed::Worker.default_queue_name = 'default'
Delayed::Worker.delay_jobs = !Rails.env.test?
Delayed::Worker.raise_signal_exceptions = :term
Delayed::Worker.logger = Logger.new(File.join(Rails.root, 'log', 'delayed_job.log'))

設定の初期値はソースコードに記載がある。

https://github.com/collectiveidea/delayed_job/blob/v4.1.11/lib/delayed/worker.rb

module Delayed
  class Worker # rubocop:disable ClassLength
    DEFAULT_LOG_LEVEL        = 'info'.freeze
    DEFAULT_SLEEP_DELAY      = 5
    DEFAULT_MAX_ATTEMPTS     = 25
    DEFAULT_MAX_RUN_TIME     = 4.hours
    DEFAULT_DEFAULT_PRIORITY = 0
    DEFAULT_DELAY_JOBS       = true
    DEFAULT_QUEUES           = [].freeze
    DEFAULT_QUEUE_ATTRIBUTES = HashWithIndifferentAccess.new.freeze
    DEFAULT_READ_AHEAD       = 5

なお、 max_attempts, max_run_time, destroy_failed_jobs, queue_name, reschedule_at についてはジョブ定義時にメソッドを実装することで指定できるようである。詳しくはGitHub delayed_job のREADMEを参照されたい。

ジョブのHook

ジョブのHookがどのように呼ばれるかについて追う。

対象のHookは enqueuebeforeperformsuccesserrorfailureafter である。

正常系のジョブ - TaskJob

正常系のジョブとして TaskJob をキューに投入する。投入直後、 TaskJob#enqueue と出力されるのでHookの enqueue を確認できた。

irb(main):001> Delayed::Job.enqueue TaskJob.new
TaskJob#enqueue
  TRANSACTION (0.0ms)  begin transaction
  Delayed::Backend::ActiveRecord::Job Create (0.3ms)  INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING "id"  [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:TaskJob {}\n"], ["last_error", nil], ["run_at", "2024-03-10 12:21:55.816135"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2024-03-10 12:21:55.816158"], ["updated_at", "2024-03-10 12:21:55.816158"]]
  TRANSACTION (0.1ms)  commit transaction
=> 
#<Delayed::Backend::ActiveRecord::Job:0x000000011331fc90
 id: 1,
 priority: 0,
 attempts: 0,
 handler: "--- !ruby/object:TaskJob {}\n",
 last_error: nil,
 run_at: Sun, 10 Mar 2024 12:21:55.816135000 UTC +00:00,
 locked_at: nil,
 failed_at: nil,
 locked_by: nil,
 queue: "default",
 created_at: Sun, 10 Mar 2024 12:21:55.816158000 UTC +00:00,
 updated_at: Sun, 10 Mar 2024 12:21:55.816158000 UTC +00:00>

この時点で delayed_job テーブルにジョブのレコードが存在する。

ジョブワーカー (rake jobs:work をしたターミナル) を確認する。ジョブワーカーは TaskJob のジョブを実行して以下の内容を出力した。

[Worker(host:Suppaman.local pid:56408)] Job TaskJob (id=1) (queue=default) RUNNING
TaskJob#before
TaskJob#perform
TaskJob#success
TaskJob#after
[Worker(host:Suppaman.local pid:56408)] Job TaskJob (id=1) (queue=default) COMPLETED after 0.0012
[Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 200.0000 j/s, 0 failed

上の出力結果からHookの before はperformの前に呼ばれ、performの後にHookの success が呼ばれ、その次にHookの after が呼ばれることがわかった。

なお、performが正常に終了した場合はlib/delayed/worker.rb#L232にある通りジョブのレコードを削除する。

異常系のジョブ - BadTaskJob

次は異常系のジョブとして BadTaskJob を投入する。Hookの enqueue はTaskJobと同じである。

irb(main):002> Delayed::Job.enqueue BadTaskJob.new
BadTaskJob#enqueue
  TRANSACTION (0.1ms)  begin transaction
  Delayed::Backend::ActiveRecord::Job Create (0.7ms)  INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING "id"  [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:BadTaskJob {}\n"], ["last_error", nil], ["run_at", "2024-03-10 12:32:09.683335"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2024-03-10 12:32:09.683398"], ["updated_at", "2024-03-10 12:32:09.683398"]]
  TRANSACTION (0.1ms)  commit transaction
=> 
#<Delayed::Backend::ActiveRecord::Job:0x0000000108fd5878
 id: 2,
 priority: 0,
 attempts: 0,
 handler: "--- !ruby/object:BadTaskJob {}\n",
 last_error: nil,
 run_at: Sun, 10 Mar 2024 12:32:09.683335000 UTC +00:00,
 locked_at: nil,
 failed_at: nil,
 locked_by: nil,
 queue: "default",
 created_at: Sun, 10 Mar 2024 12:32:09.683398000 UTC +00:00,
 updated_at: Sun, 10 Mar 2024 12:32:09.683398000 UTC +00:00>

ジョブワーカー (rake jobs:work をしたターミナル) を確認する。 BadTaskJob のジョブが実行されると以下の出力がされた。

[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) RUNNING
BadTaskJob#before
BadTaskJob#perform
BadTaskJob#error
BadTaskJob#after
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED (0 prior attempts) with StandardError: Failed to process by something.
[Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 54.2711 j/s, 1 failed

peformでraiseをしているため、Hookの error が呼ばれたのだと思われる。その次にHookの after が呼ばれる。

config/initializers/delayed_job_config.rbDelayed::Worker.max_attempts = 3 と記述しているため、ジョブワーカーは再試行上限範囲内と捉えて、数分後に BadTaskJob を実行した。そして以下の出力を得た。Hookの流れは前回のジョブ実行時と同じである。

[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) RUNNING
BadTaskJob#before
BadTaskJob#perform
BadTaskJob#error
BadTaskJob#after
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED (1 prior attempts) with StandardError: Failed to process by something.
[Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 119.2037 j/s, 1 failed

さらに数分後にジョブワーカーはジョブを再実行した。そして以下の出力を得た。前回のジョブ実行と異なるのはHook after の後にHook failure が呼ばれた点である。

[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) RUNNING
BadTaskJob#before
BadTaskJob#perform
BadTaskJob#error
BadTaskJob#after
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED (2 prior attempts) with StandardError: Failed to process by something.
[Worker(host:Suppaman.local pid:56408)] Job BadTaskJob (id=2) (queue=default) FAILED permanently because of 3 consecutive failures
BadTaskJob#failure
[Worker(host:Suppaman.local pid:56408)] 1 jobs processed at 132.2751 j/s, 1 failed

上の出力結果から当該ジョブの実行回数が Delayed::Worker.max_attempts に到達した場合、Hookの failure が呼ばれるとわかった。

DeserializationError

ジョブをキーに入れると、応答結果に handler を確認でき、値は yaml 形式と推測できる。GitHubにある delayed_job のREADMEを見るとCustom Jobsで述べられているようにジョブオブジェクトが yaml にシリアライズされていることがわかる。おそらくこの yaml 形式のデータはこのあと復元する際に使われるのであろう。

irb(main):001> Delayed::Job.enqueue TaskJob.new
TaskJob#enqueue
  TRANSACTION (0.0ms)  begin transaction
  Delayed::Backend::ActiveRecord::Job Create (0.6ms)  INSERT INTO "delayed_jobs" ("priority", "attempts", "handler", "last_error", "run_at", "locked_at", "failed_at", "locked_by", "queue", "created_at", "updated_at") VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) RETURNING "id"  [["priority", 0], ["attempts", 0], ["handler", "--- !ruby/object:TaskJob {}\n"], ["last_error", nil], ["run_at", "2024-03-11 01:53:50.625303"], ["locked_at", nil], ["failed_at", nil], ["locked_by", nil], ["queue", "default"], ["created_at", "2024-03-11 01:53:50.625341"], ["updated_at", "2024-03-11 01:53:50.625341"]]
  TRANSACTION (0.4ms)  commit transaction
=> 
#<Delayed::Backend::ActiveRecord::Job:0x000000010765b2f8
 id: 3,
 priority: 0,
 attempts: 0,
 handler: "--- !ruby/object:TaskJob {}\n",
 last_error: nil,
 run_at: Mon, 11 Mar 2024 01:53:50.625303000 UTC +00:00,
 locked_at: nil,
 failed_at: nil,
 locked_by: nil,
 queue: "default",
 created_at: Mon, 11 Mar 2024 01:53:50.625341000 UTC +00:00,
 updated_at: Mon, 11 Mar 2024 01:53:50.625341000 UTC +00:00>

そうだとすると、ジョブワーカーを起動する前に TaskJob のクラス名を GoodTaskJob などに変更するとエラーを引き起こすと思われる。そこでクラス名を変更してからジョブワーカーを起動してみる。

rake jobs:work

ジョブワーカーは以下のように Delayed::DeserializationError を出力した。

[Worker(host:MilleCrepes1.local pid:54199)] Starting job worker
[Worker(host:MilleCrepes1.local pid:54199)] Job TaskJob (id=58) (queue=default) RUNNING
[Worker(host:MilleCrepes1.local pid:54199)] Job TaskJob (id=58) (queue=default) FAILED permanently with Delayed::DeserializationError: Job failed to load: undefined class/module TaskJob. Handler: "--- !ruby/object:TaskJob {}\n"

worker.rb#L240の実装を見たが呼び出し可能ならHookの failure を呼ぶようだ。上の確認例ではyamlからオブジェクトに変換できていないため、 failure は呼ばれていない。

デキューされたジョブの処理フロー

前述までの確認からデキューされたジョブはどのように処理されるか下図のフローチャートにまとめた。

./images/delayed_job-flowchart.png

おわりに

Railsで delayed_jobdelayed_job_active_record を使った場合の動作確認をRailsコンソールと rake jobs:work で実施した。その動作確認で得た情報からジョブがデキューされた後のフローを示した。

今後にActive Jobと連携する場合の delayed_job について確認したい。