SymfonyでWebアプリケーションを作るとき、HTTPリクエストから直接実行するには重い処理があったらどうするか?
以前の記事 でも書いた通り、非同期処理を実装することが多いですね。
Symfony2時代の非同期処理のスタンダードは JMSJobQueueBundle でした。

JMSJobQueueBundleも一応Symfony4に対応してくれたのですが、不具合があって利用できない時期が長く、メンテナーの方が忙しいようでプルリクエストへの返信も遅れがちでした。乗り換え先をどれにするのが良いかTwitterで聞いてみたところ、 Symfony\Component\Messenger を使うと良いよというアドバイスをもらい、使ってみたらとても良かったので、JMSJobQueueBundleとの違いを軸にご紹介したいと思います。

JMSJobQueueBundleからMessengerComponentへの移行方法

コマンドは作らなくて良い。代わりにMessageとHandlerを作る。

JMSJobQueueBundleは「コマンドを実行する」という縛りがあったので、非同期処理したい処理に関して専用のコマンド(Symfonyの Command を継承したコマンドクラス)を作る必要がありました。

Messengerでは、MessageクラスとHandlerクラスを作ります。
MessageクラスはピュアなPHPのクラスです。何の抽象クラスを継承する必要も何のinterfaceを実装する必要もありません。非同期処理に対して渡したいパラメータの入れ物として使います。Messageオブジェクトはキューに入れた時にシリアライズされ、非同期処理実行時にデシリアライズされるので、シリアライズして安全な値だけをもたせるように設計する必要があります。たとえば、Doctrineのエンティティをそのまま入れるのは危険なので、MessageにはIDだけを持たせて、実行時にEntityManagerからエンティティを取り直す方式が 公式に推奨 されています。
Handlerクラスは Symfony\Component\Messenger\Handler\MessageHandlerInterface を実装したクラスです。Messageを受け取って __invoke メソッドで処理します。Handlerクラスに対しては通常のSymfonyのDependencyInjectionを使って依存を注入することができるので、いろいろな仕事をさせることができます。 messenger.message_handler タグをつけてサービスとして定義しておく必要があります。

エンキューのやり方

JMSJobQueueBundleでは JMS\JobQueueBundle\Entity\Job を新規で作り、エンティティとしてDBに保存することで非同期処理キューに入れる仕組みでした。
MessengerComponentでは、対象のMessageクラスを async Transportに送る設定をした上で、Messageオブジェクトを MessageBus( @messenger.default_bus サービスでDIできます)にdispatchすることで非同期処理にエンキューします。

<?php

$message = new MyMessage();
$messageBus->dispatch($message);

キューのバックエンド

JMSJobQueueBundleではデータベース一択でしたが、MessengerComponentでは他のキューバックエンドにも対応しています。
デフォルトでAMQP、 Redisとデータベース保存の Transport が提供されており、 自作する こともできます。

実際に非同期処理を実行するランナーはどうやって起動するか?

JMSJobQueueBundleでは jms-job-queue:run を常駐させていました。
MessageComponentでは、代わりに messenger:consume を常駐させます。
常駐設定は公式ドキュメントにある通り supervisorを使う のが一般的ですが、一定時間ごとにcronで起動していく方式でも良いと思います。

並列実行したいとき

1つのキューに対して並列で実行したい(同時にN個処理したい)場合です。
JMSJobQueueBundleでは jms-job-queue:run--max-concurrent-jobs=N を渡すことで並列実行させることができました。
MessengerComponentでは並列実行したい場合は messenger:consume コマンドのプロセスを所望の数だけ起動します。supervisorを利用する場合は numprocs=N を指定すれば良いですし、cronの場合はcrontabに同じ設定をN行コピーして設定することで実現できます。

優先順位をつけたい・キュー名を指定したいとき

JMSJobQueueBundleでは JMS\JobQueueBundle\Entity\Job を作る時に $priority$queue を指定することで、コマンド実行時に他より優先させたり、キュー名ごとにランナーを振り分けたりできました。
MessengerComponentでは、 公式ドキュメント にある通り、必要なだけ優先度やキュー名によってTransportを予め分けて定義しておき、MessageクラスをそれぞれのTransportにルーティングします。同じクラスのMessageオブジェクト間で別々のキュー名を利用することはできません。優先度を分けたい・ランナーを分けたい場合はそれぞれMessageのクラスを分けておきましょう。

ある非同期処理を他の非同期処理に依存させたい(他の非同期処理が終わってから発動させたい非同期処理がある)

JMSJobQueueBundleではJobのDependencyを指定することができました。
MessengerComponentにはDependency機能は無いですが、1つ目のMessageのHandlerの最後で次のMessageをMessageBusにdispatchするという方法で実現できます。

非同期処理が成功したか失敗したかを見たい

JMSJobQueueBundleでは JMS\JobQueueBundle\Entity\Job のレコードがジョブ実行完了後も保存されていたので、後からそれを見て成功・失敗をチェックすることができました。Jobは単なるエンティティだったので、アプリケーションのエンティティからリレーションを張っておくことでJobエンティティの成功・失敗を参照する作りにすることもできたほどです。
MessengerComponentではMessageは処理が終わった後は保存されることがないので、成功・失敗は別の箇所(アプリケーションのエンティティに直接 state フィールドを作ってMessageHandlerの側から保存する等)に保存する必要があります。最初は不便に感じましたが、最近では、アプリケーションの側が「非同期処理される」ことに依存する構造を作りにくくなったことで依存関係が混乱することがなくなって良いと思うようになりました。

まとめ

MessengerComponentに移行して数ヶ月経ちますが、全般的にシンプルになっているので、実装も運用は楽だと感じます。特に、JMSJobQueueBundleの頃にあった、暴走して数時間走りっぱなしの非同期コマンドを止めたり、古いJobレコードを定期削除したり…というメンテナンスが不要になったのは大きいです。

なお、私は直近で、MessageのRoutingをasyncに指定するのを忘れたために非同期で処理しているつもりの重い処理がいつまでも同期処理されていたというウッカリがありました :sweat_drops: ルーティング設定は忘れやすいのでそこだけ要注意かなと思います!