Log目次

【Rails】shoryukenを使った非同期処理の作成

作成日 2021-03-30更新日 2021-03-30

はじめに

shoryukenはAmazonSQSを使用した非同期処理を作成できるGemです。

インストールするgemは下記のものです。

※事前にキューは作成しておきます。今回はfifoのキューを作成して確認します。

gem 'shoryuken' gem 'aws-sdk-sqs'

開発環境

初期設定

config/shoryuken.ymlを作成し、設定情報を記載します。

shoryuken.ymlに設定できる他のオプションについては こちら を参照

concurrency: 1 queues: - hoge.fifo

ActiveJob経由で実行する場合は下記の設定も行います

config.active_job.queue_adapter = :shoryuken

rubyで直接実行する場合

rubyで直接実行する場合について記載します。

実行するコード

class HelloWorker include Shoryuken::Worker shoryuken_options queue: 'hoge.fifo', auto_delete: true def perform(sqs_msg, name) puts "Hello, #{name}" puts "#{Thread.current.object_id}" end end

キューの登録

irb(main):003:0> HelloWorker.perform_async('Msg') => #<struct Aws::SQS::Types::SendMessageResult md5_of_message_body="e1ae09b94f8926587697d0f246bf1050", md5_of_message_attributes="4aec9c0a286bf7cfe8d7ce87131e75d1", md5_of_message_system_attributes=nil, message_id="746f3c1f-249a-4d4a-a8d1-f44c37a40fe1", sequence_number="18860703462460911616">

キューの登録前

キューの登録後

キューが作成されていることを確認できます。(利用可能なメッセージの項目が1になっている)

キューの取得、実行

#コマンド docker-compose exec app bundle exec shoryuken -q hoge.fifo -r ./app/jobs/hello_worker.rb # 実行結果 2021-03-29T14:43:15Z 52 TID-6ig HelloWorker/hoge.fifo/746f3c1f-249a-4d4a-a8d1-f44c37a40fe1 INFO: started at 2021-03-29 14:43:15 +0000 Hello, Msg 8440

SQSの状態を確認するとキューがなくなっていることを確認できます。

ActiveJob経由で実行する場合

ActiveJob経由で実行する場合について記載します。

実行するコード

class HelloJob < ApplicationJob queue_as 'hoge.fifo' def perform(*args) puts "call #{args}" puts Thread.current.object_id end end

キューの登録

irb(main):008:0> HelloJob.perform_later('Msg') Enqueued HelloJob (Job ID: 0dbcdbfe-894a-471f-b92a-107cf3eba981) to Shoryuken(hoge.fifo) with arguments: "Msg" => #<HelloJob:0x00005595b0e71bc0 @arguments=["Msg"], @job_id="0dbcdbfe-894a-471f-b92a-107cf3eba981", @queue_name="hoge.fifo", @priority=nil, @executions=0, @exception_executions={}, @timezone="UTC", @sqs_send_message_parameters={:message_body=>"{\"job_class\":\"HelloJob\",\"job_id\":\"0dbcdbfe-894a-471f-b92a-107cf3eba981\",\"provider_job_id\":null,\"queue_name\":\"hoge.fifo\",\"priority\":null,\"arguments\":[\"Msg\"],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2021-03-29T14:55:52Z\"}", :message_attributes=>{"shoryuken_class"=>{:string_value=>"ActiveJob::QueueAdapters::ShoryukenAdapter::JobWrapper", :data_type=>"String"}}, :message_deduplication_id=>"73cdb5c112767d6f8acdbdbb888f771de4069ba80b8e8777d6c98acbe95955cd", :message_group_id=>"ShoryukenMessage"}>

キューの取得、実行

#コマンド docker-compose exec app bundle exec shoryuken -q hoge.fifo -R #実行結果 2021-03-29T14:57:11Z 134 TID-35c INFO: Starting 2021-03-29T14:57:11Z 134 TID-c4o ActiveJob/HelloJob/hoge.fifo/736b234c-cb7a-474f-8f94-9514f72e3b62 INFO: started at 2021-03-29 14:57:11 +0000 call ["Msg"] 15720

スレッドセーフな実装を心がけよう

上に記載したHelloWorkerのコードで複数のキューを登録して実行結果を確認してみます。

# 実行結果 2021-03-29T15:07:58Z 153 TID-3rk INFO: Starting 2021-03-29T15:07:58Z 153 TID-4ss HelloWorker/hoge.fifo/3ea22c72-eaac-4298-bd00-c0a17c04af27 INFO: started at 2021-03-29 15:07:58 +0000 Hello, Msg1 6220 2021-03-29T15:07:58Z 153 TID-4ss HelloWorker/hoge.fifo/3ea22c72-eaac-4298-bd00-c0a17c04af27 INFO: completed in: 74.606517 ms 2021-03-29T15:07:58Z 153 TID-4ss HelloWorker/hoge.fifo/3b248516-7f18-4136-a71b-4d44231712e3 INFO: started at 2021-03-29 15:07:58 +0000 Hello, Msg2 6220 2021-03-29T15:07:58Z 153 TID-6uo HelloWorker/hoge.fifo/e7338933-e571-4364-9ca2-b0ffd6af2688 INFO: started at 2021-03-29 15:07:58 +0000 Hello, Msg3 8880 2021-03-29T15:07:59Z 153 TID-4ss HelloWorker/hoge.fifo/3b248516-7f18-4136-a71b-4d44231712e3 INFO: completed in: 21.199928 ms 2021-03-29T15:07:59Z 153 TID-884 HelloWorker/hoge.fifo/1f54f9f3-a01f-496a-959c-a922cd56031a INFO: started at 2021-03-29 15:07:59 +0000 Hello, Msg4 10660 2021-03-29T15:07:59Z 153 TID-6uo HelloWorker/hoge.fifo/e7338933-e571-4364-9ca2-b0ffd6af2688 INFO: completed in: 85.155153 ms 2021-03-29T15:07:59Z 153 TID-884 HelloWorker/hoge.fifo/1f54f9f3-a01f-496a-959c-a922cd56031a INFO: completed in: 25.114509 ms 2021-03-29T15:07:59Z 153 TID-9lk HelloWorker/hoge.fifo/ce9c8388-5936-48b1-94e3-7d446af2c16c INFO: started at 2021-03-29 15:07:59 +0000 Hello, Msg5 12440 2021-03-29T15:07:59Z 153 TID-9lk HelloWorker/hoge.fifo/ce9c8388-5936-48b1-94e3-7d446af2c16c INFO: completed in: 28.740350000000003 ms 2021-03-29T15:07:59Z 153 TID-6uo HelloWorker/hoge.fifo/0e11a6cf-6a2f-4f30-95a8-fae3e31cc96d INFO: started at 2021-03-29 15:07:59 +0000 Hello, Msg6 8880 2021-03-29T15:07:59Z 153 TID-9lk HelloWorker/hoge.fifo/17364ad1-77fe-45f4-8ecd-f2608917dc20 INFO: started at 2021-03-29 15:07:59 +0000 Hello, Msg7 12440 2021-03-29T15:07:59Z 153 TID-6uo HelloWorker/hoge.fifo/0e11a6cf-6a2f-4f30-95a8-fae3e31cc96d INFO: completed in: 22.214782 ms 2021-03-29T15:07:59Z 153 TID-9lk HelloWorker/hoge.fifo/17364ad1-77fe-45f4-8ecd-f2608917dc20 INFO: completed in: 30.55334 ms

Hello, Msgの文言の下に出ている数字はスレッドのIDです。

実行結果を見るとわかりますが、同じスレッドIDが出現しています。

これは、処理のたびにスレッドを新規に作成しているのではなく、一度作成してプールしているものを使用して処理の負荷を軽減させています。(スレッドプール)

thread_mattr_accessorのようなスレッドに値を保持するような変数を使用する場合は、初期化しないと別のスレッドで使用していた値をそのまま使ってしまうこともあるので注意が必要です。
#modelに属性追加 class User < ApplicationRecord thread_mattr_accessor :test_value end #thread_mattr_accessorの処理を追加 class HelloWorker include Shoryuken::Worker shoryuken_options queue: 'hoge.fifo', auto_delete: true def perform(sqs_msg, name) puts "Hello, #{name}" puts "#{Thread.current.object_id}" target = User.find(1) if target.test_value puts '値がないので設定します' target.test_value = SecureRandom.uuid else puts '値が設定されているので表示します' puts target.test_value end end end

実行結果 スレッドIDが15720は、最初は値が設定されていないが後続でスレッド再使用された時には値が設定されている。

#railsフラグがないとモデルの参照ができないので追加 docker-compose exec app bundle exec shoryuken -q hoge.fifo -r ./app/jobs/hello_worker.rb --rails 2021-03-29T16:08:53Z 55 TID-35c INFO: Starting 2021-03-29T16:08:53Z 55 TID-c4o HelloWorker/hoge.fifo/515fc4cc-7db4-43f8-aa9a-fecea22ebd3d INFO: started at 2021-03-29 16:08:53 +0000 Hello, Msg1 15720 値がないので設定します 2021-03-29T16:08:53Z 55 TID-c4o HelloWorker/hoge.fifo/515fc4cc-7db4-43f8-aa9a-fecea22ebd3d INFO: completed in: 389.821333 ms 2021-03-29T16:08:53Z 55 TID-c4o HelloWorker/hoge.fifo/22da384a-5e4e-4b39-a033-78d0367d4498 INFO: started at 2021-03-29 16:08:53 +0000 Hello, Msg2 15720 値が設定されているので表示します fbd71abe-b4ab-476d-a939-911327d273f0 2021-03-29T16:08:53Z 55 TID-c4o HelloWorker/hoge.fifo/22da384a-5e4e-4b39-a033-78d0367d4498 INFO: completed in: 21.977642 ms 2021-03-29T16:08:53Z 55 TID-g90 HelloWorker/hoge.fifo/60a16ecc-7521-4a9e-a75e-6af9a129132b INFO: started at 2021-03-29 16:08:53 +0000 Hello, Msg3 21060 値がないので設定します 2021-03-29T16:08:54Z 55 TID-g90 HelloWorker/hoge.fifo/60a16ecc-7521-4a9e-a75e-6af9a129132b INFO: completed in: 28.649026000000003 ms 2021-03-29T16:08:54Z 55 TID-hn0 HelloWorker/hoge.fifo/6db036ce-c018-444a-9cdb-075e827d1a07 INFO: started at 2021-03-29 16:08:54 +0000 Hello, Msg 22860 値がないので設定します 2021-03-29T16:08:54Z 55 TID-hn0 HelloWorker/hoge.fifo/6db036ce-c018-444a-9cdb-075e827d1a07 INFO: completed in: 32.033697 ms 2021-03-29T16:08:54Z 55 TID-hn0 HelloWorker/hoge.fifo/2df3724a-5cd8-4138-a230-7d20c271f444 INFO: started at 2021-03-29 16:08:54 +0000 Hello, Msg4 22860 値が設定されているので表示します e3e3d45f-2087-4bb8-82cf-3613fafd351c 2021-03-29T16:08:54Z 55 TID-hn0 HelloWorker/hoge.fifo/2df3724a-5cd8-4138-a230-7d20c271f444 INFO: completed in: 30.187632999999998 ms 2021-03-29T16:08:54Z 55 TID-g90 HelloWorker/hoge.fifo/8bd18b2e-f367-4649-94c0-a9bf263f7c7b INFO: started at 2021-03-29 16:08:54 +0000 Hello, Msg5 21060 値が設定されているので表示します d689de85-50a5-472a-a6af-a7e7c9cc3d5b 2021-03-29T16:08:54Z 55 TID-g90 HelloWorker/hoge.fifo/8bd18b2e-f367-4649-94c0-a9bf263f7c7b INFO: completed in: 25.281324 ms 2021-03-29T16:08:54Z 55 TID-c4o HelloWorker/hoge.fifo/ff6c54d9-a21d-451e-b75a-eb387f2a2d52 INFO: started at 2021-03-29 16:08:54 +0000 Hello, Msg6 15720 値が設定されているので表示します fbd71abe-b4ab-476d-a939-911327d273f0 2021-03-29T16:08:54Z 55 TID-c4o HelloWorker/hoge.fifo/ff6c54d9-a21d-451e-b75a-eb387f2a2d52 INFO: completed in: 26.295854000000002 ms

非同期時の例外処理について

非同期処理の中で何らかの原因で例外が発生し、例外処理ができなかった場合はSQS上にキューが残り続けます。

例外の原因が一時的なものであればキューが残っているので再度取得した際にちゃんと処理できるかもしれませんが、 原因が一時的なものではない(プログラムバグ等)場合は、ずーっと同じキューを取得し続け、 後続の待ちになっているキューの処理が進まなくなる可能性があります。

エラーになった時、問題のキューを取得し続ける

2021-03-30T02:56:15Z 138 TID-35c INFO: Starting 2021-03-30T02:56:15Z 138 TID-c4o HelloWorker/hoge.fifo/abbfb460-97a9-4857-bc50-45f9671a4f07 INFO: started at 2021-03-30 02:56:15 +0000 Hello, Msg 15720 2021-03-30T02:56:15Z 138 TID-c4o HelloWorker/hoge.fifo/abbfb460-97a9-4857-bc50-45f9671a4f07 INFO: failed in: 331.3035 ms 2021-03-30T02:56:15Z 138 TID-c4o ERROR: Processor failed: エラー 2021-03-30T02:56:45Z 138 TID-463c HelloWorker/hoge.fifo/abbfb460-97a9-4857-bc50-45f9671a4f07 INFO: started at 2021-03-30 02:56:45 +0000 Hello, Msg 194520 2021-03-30T02:56:45Z 138 TID-463c HelloWorker/hoge.fifo/abbfb460-97a9-4857-bc50-45f9671a4f07 INFO: failed in: 4.4712 ms 2021-03-30T02:56:45Z 138 TID-463c ERROR: Processor failed: エラー 2021-03-30T02:57:15Z 138 TID-84p8 HelloWorker/hoge.fifo/abbfb460-97a9-4857-bc50-45f9671a4f07 INFO: started at 2021-03-30 02:57:15 +0000 Hello, Msg 379340 2021-03-30T02:57:15Z 138 TID-84p8 HelloWorker/hoge.fifo/abbfb460-97a9-4857-bc50-45f9671a4f07 INFO: failed in: 3.8105 ms 2021-03-30T02:57:15Z 138 TID-84p8 ERROR: Processor failed: エラー 上記が続く・・

対応方法として考えられるのはプログラム上でちゃんと例外処理をハンドリングすること。

もしくは、SQSでデットレターキューを設定し、問題のキューを除外する方法があります。

ここでは、デッドレターキューの設定についてみてみます。

デッドレターキューの作成

デッドレターキューの設定

デッドレターキューに移動した時

上記のように例外が発生して処理が失敗したキューはデッドレターキューに移動するのでメインのキューに残り続けることはありません。

まとめ

今回は非同期処理をAmazonSQSで行う場合についてまとめました。

SQSを使用しない場合は他にもsidekiqを使用するという選択肢も出てくるかと思います。 あとは、Delayed Job、Resqueとか?

非同期処理はちゃんと考えて設計+実装しないと処理が落ちていることに気づかない、処理されないキューがたまっていくなど問題が発生するので十分に検討、動作確認をした方が良いと思います。

参考