こんにちは、開発部の鈴木です。
この記事は Symfony Advent Calendar 2023 の21日目の記事です。
今回はSymfonyアプリケーションの開発における、Messengerの利用シーンと利用方法について紹介します。

はじめに

WEBアプリケーションを開発する際に、ユーザーがアクションを行った数分後に特定の処理を行いたい場合があります。

例えば、外部APIを使用する際には、処理の完了に時間がかかるものや、ステータスの更新が反映されるまでに10~30分かかることがあります。

ユーザーがアプリケーションを操作した後、このような非同期な機能を使用して処理を実行し、その完了を確認した上で、データベースを更新したり、ユーザーにメールを送信したりする必要があります。

このようなケースで、以前はcronを使用して定期的に非同期処理の実行状態を確認していました。しかし、最近SymfonyのMessengerを使用してメッセージの処理を意図的に指定した時間だけ遅延させることができることを知り、実際に実装してみてcronよりもMessengerが適していると感じました。

Messengerの使用方法

まず、Messengerを使用するためには、Messageクラスとそれに対応したMessageHandlerクラスを作成し、MessageBusを使用してメッセージを送信します。

今回はユーザーのアクションで外部APIの非同期のタスクを実行し、そのタスクの実行状態を定期的に確認して完了したらメールを送信するといった実装例を紹介します。

Messageクラスを作成

メッセージの送受信に使用するためのMessageクラスを作成します。

namespace App\Message;

class CheckTaskMessage
{
    public function __construct(
        private int $taskId,
    ) {
    }

    public function getTaskId(): int
    {
        return $this->taskId;
    }
}

MessageHandlerクラスを作成

先程作成したCheckTaskMessageを受け取り、タスクが完了していればメールを送信するMessageHandlerクラスを作成します。

namespace App\MessageHandler;

use App\Message\CheckTaskMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;

#[AsMessageHandler]
class CheckTaskMessageHandler
{
    public function __invoke(CheckTaskMessage $message)
    {
        $taskId = $message->getTaskId();

        // ... $status = 外部APIを使用して$taskIdのタスクの実行状態を取得する

        if ($status === 'completed') {
            // ... メールを送信する
        }
    }
}

メッセージの送信

DIしたMessageBusInterfaceを使用してメッセージを送信します。

namespace App\Controller;

use App\Message\CheckTaskMessage;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\MessageBusInterface;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $messageBus): Response
    {
        
        // ... $taskId = 外部APIを使用しての非同期なタスクの実行を依頼する

        // タスクの実行状態を確認して何らかの処理を行うメッセージを送信する
        $message = new CheckTaskMessage($taskId);
        $messageBus->dispatch($message);

        // ...
    }
}

DelayStampでメッセージの処理を遅延させる

このままでは、非同期なタスクの実行を依頼した直後にタスクの実行状態を確認することしかできません。
タスクの完了には10 ~ 30分ほど時間がかかる見込みとして、EnvelopeとDelayStampを使用してタスクの実行状態を確認するのを10分待ってから行うようにします。

DelayStampにはメッセージの処理を遅延させる時間(ミリ秒)を指定します。

namespace App\Controller;

use App\Message\CheckTaskMessage;
use Symfony\Bundle\FrameworkBundle\Controller\AbstractController;
use Symfony\Component\HttpFoundation\Response;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

class DefaultController extends AbstractController
{
    public function index(MessageBusInterface $messageBus): Response
    {
        
        // ... $taskId = 外部APIを使用しての非同期なタスクの実行を依頼する

        // タスクの実行状態を確認して何らかの処理を行うメッセージを送信する
        $message = new CheckTaskMessage($taskId);
        
        $envelope = new Envelope($message, [
            new DelayStamp(1000 * 60 * 10),
        ]);

        $messageBus->dispatch($envelope);

        // ...
    }
}

再帰的にメッセージを送信する

タスクの実行状態の確認を10分遅延させることができるようになりましたが、このままではタスクの実行状態を1度しか確認することができません。
再帰的にメッセージを送信することで、タスクが完了するまで10分毎に実行状態を確認するようにします。

namespace App\MessageHandler;

use App\Message\CheckTaskMessage;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\MessageBusInterface;
use Symfony\Component\Messenger\Stamp\DelayStamp;

#[AsMessageHandler]
class CheckTaskMessageHandler
{
    public function __construct(
        private readonly MessageBusInterface $messageBus,
    ) {
    }

    public function __invoke(CheckTaskMessage $message)
    {
        $taskId = $message->getTaskId();

        // ... $status = 外部APIを使用して$taskIdのタスクのステータスを取得する

        if ($status === 'completed') {
            // ... メールを送信する
            return;
        }

        // タスクが完了していない場合、また10分後に実行状態を確認する
        $message = new CheckTaskMessage($taskId);
        
        $envelope = new Envelope($message, [
            new DelayStamp(1000 * 60 * 10),
        ]);

        $messageBus->dispatch($envelope);
    }
}

まとめ

今回はSymfonyのMessagerを遅延処理させ再帰的に使用し、非同期なタスクが完了するまで定期的に確認する実装例を紹介しました。

cronを使用して実装した場合と比べて、実行中のタスクがない場合の無駄な定期処理が無くなり、定期処理のスケジュールもシステム都合ではなく、ユーザーがアクションを行った時間を基準に行うことができる点が良いなと思いました。