大規模なWebアプリケーション開発において、「スピード」は非常に重要なポイントです。
ユーザーはアプリケーションに「即応性」を求めていますが、アプリケーションの処理の一部には元々スピードが遅く、その処理を早めることや排除ができない場合があります。
この課題を解決してくれるのが「Message queue(メッセージキュー)」です。一般的なリクエスト-レスポンスフローと並行して、追加のレスポンス経路を作成することで、本来時間のかかる処理スピードを早め、ユーザー満足度を向上させます。
Queues(以下、キュー)とは、先入れ先出し(FIFO)の原則に従って、エンティティを特定の順序で整理するデータ構造のことです。つまり、キューに最初に追加された要素が、最初に削除されることになります。日常生活で例えると、飲食店の列に並んでいる時に、最初に並んだ人から順番にお店に入って、退店していくようなイメージです。
キューの話に戻ると、APIリクエストなどのリクエストを実行する時、メール送信などのタスクを切り離す必要があれば、そのタスクをキューに追加して、メインの処理を続行することができます。そしてそのタスクは、キューに追加された順番で処理されます。
これにより、メインの流れを中断することなく、効率的にタスクを管理することができます。
以下で、上記の図に出てくる用語を解説していきます。
【 ジョブ 】
ジョブとは、通常JSONのような形式で、処理のためにキューに入れられます。
例えば、空港にいる人々をジョブとします。パスポートなどの入国に必要な書類を持って審査待ちの列に並び、順番に審査を受けていきます。
ジョブには処理に必要なデータがそれぞれ含まれており、列に追加され、追加された順に処理されていきます。
【 ジョブプロデューサー 】
ジョブプロデューサーとは、ジョブをキューに追加する役割のあるコードのことです。
先ほどの空港の例に当てはめると、ジョブプロデューサーは、チケットの購入やチェックイン、手荷物検査など空港に居る人たちを適切な列に誘導する警備員です。
マイクロサービスアーキテクチャでは、ジョブプロデューサーはジョブコンシューマーから独立して動作することができます。つまり、あるサービスはジョブをキューに追加することに注力し、そのジョブがいつ、どのように処理されるか気にする必要がないということです。
【 ワーカー(ジョブコンシューマー) 】
ワーカー(ジョブコンシューマー)とは、ジョブを実行するプロセスや機能のことです。
例えば、銀行の窓口に人々が並んでいるとします。最初に来た人が列の先頭となり、順番が来ると窓口の人が呼び出し、要件を伝えて必要な手続きを行います。その間にも次々と人が並んでいきますが、窓口にいる人の対応が終わるまで、次の人の要件を聞くことはできません。
同様に、キューワーカーはキューの最初のジョブを選択し、それを処理してから次のジョブに進みます。
【 失敗したジョブ 】
場合によっては、処理中にジョブが失敗することがあります。失敗の理由には、主に以下のようなものが挙げられます。
・入力データの無効や不足
ジョブが処理するためには特定のデータの処理が必要ですが、そのデータが不足していたり、間違っている場合、ジョブは失敗します。
・タイムアウト
ジョブの完了に時間がかかりすぎる場合、キューシステムによってジョブが終了されることがあります。これは依存関係の問題やその他の問題が原因の可能性もありますが、通常は1つのジョブが無限に実行されることはありません。
・ネットワークやインフラの問題
データベース接続エラーのような問題が原因で、ジョブが失敗することがあります。これらの多くは制御が難しい場合が多いです。
・依存関係の問題
ジョブが正常に実行されるために、外部リソースに頼る場合もあります。このリソースが利用できなかったり失敗すると、ジョブも失敗します。
ジョブが失敗した場合、「すぐ」もしくは「少し時間をおいて」再試行するようにキューシステムを設定できます。また、常に失敗するジョブが際限なく再実行されないよう、再試行の最大回数を設定することも推奨されています。
キューは、マイクロサービス間の信頼性の高い通信チャネルを構築するのに不可欠です。それぞれのサービスが異なるタスクを処理している時も、複数のサービスがシームレスにやり取りできるようになります。
例えば、あるサービスがタスクを完了すると、共有キューにジョブを追加することができます。ワーカーが待機している別のサービスは、そのジョブを受け取り、必要に応じてデータを処理することができます。
キューは、リソースを大量に消費するタスクをメインの処理から切り離すのにも有効です。例えば、メールの送信などの時間のかかるタスクをキューに入れることができます。これにより、メイン処理のレスポンスタイムが低下するのを防ぐことができます。
さらに、キューは単一障害点のリスクを軽減するのに役立ちます。失敗が起きやすくても再試行が可能な場合、キューを使用することでそのタスクを後で再試行できるため、システム全体の耐障害性を向上させることができます。
ジョブをキューに追加し、NestJSBullMQを使って処理する方法を、メール送信のような簡単な例で見てみましょう。
ユーザーがサインアップしたときにメールを送信する必要があるサービスがあるとします。メールを直接送信する代わりに(サインアッププロセスが遅くなる可能性があります)、メール送信ジョブをキューに追加します。以下は、これを示す簡略化したコードです。
【 キューにジョブを追加する 】
import { Injectable } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bullmq';
@Injectable()
export class EmailService {
constructor(
@InjectQueue('emailQueue') private emailQueue: Queue, // Inject the queue
) {}async sendWelcomeEmail(to: string) {
// Adding a job to the emailQueue
await this.emailQueue.add('sendEmail', {
to,subject: 'Welcome to Our Service!',
body: 'Thank you for signing up.',
});console.log(`Job added to queue to send an email to ${to}`);
}
}
・EmailService
このサービスはメールの送信を担当します。
・SendWelcomeEmail
ジョブには、受信者のメールアドレス、件名、メールの本文が含まれます。
・InjectQueue
このデコレータは、キューをサービスに注力し、ジョブをキューに追加できるようにします。
【 キュー内のジョブの処理 】
ジョブがキューに入ったら、それを処理するためのワーカーが必要になります。ワーカーはキューからジョブを取得し、必要なタスク(この場合は電子メールの送信)を実行します。
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bullmq';@Processor('emailQueue')
export class EmailProcessor {
@Process('sendEmail')
async handleEmailJob(job: Job) {
const { to, subject, body } = job.data;// Simulate sending the email
console.log(`Sending email to ${to} with subject: ${subject}`);
console.log(`Email content: ${body}`);// Here you would actually send the email, e.g., using an email service provider
}
}
1 . 非同期処理
・非ブロッキング操作
キューを使用することで、メインアプリケーションを停止させることなく、バックグラウンドでタスクを処理することができます。
例えば、ユーザーがサインアップしてメールを送信する必要がある場合、メールタスクをキューに入れることで、サインアッププロセスをすぐに完了させることができます。ユーザーはメールの送信を待つ必要がなく、全体的な操作がよりスムーズかつ高速になります。
・パフォーマンスの向上
タスクは非同期で処理されるため、アプリケーションは同時に多くのリクエストを処理でき、全体的なパフォーマンスを向上させることができます。
2 . スケーラビリティ
・水平スケーリング
システムの拡大に合わせて、キューからジョブを処理するワーカーを追加できます。需要が急増した場合(大規模なセール中など)、ワーカーの数を素早く増やして、増加した負荷の処理に対応することができます。
・サービスの分離
キューは、システムの様々な部分が独立して機能することを可能にします。例えば、あるサービスがジョブをキューに追加し、別のサービスがそれを処理する時、お互いの操作を意識する必要はありません。この分離により、サービスの拡張と管理が簡単になります。
3 . 信頼性
・ジョブの永続性
キュー内のジョブは、処理されるまで永続的に保存されます。つまり、ワーカーがクラッシュしたりシステムがダウンしても、ジョブは失われず、システムが復旧すれば処理されます。
・自動再試行
キューには一定期間後に自動的にジョブを再試行する仕組みが備わっている場合が多く、一時的な問題(ネットワークタイムアウトなど)が原因でジョブが失敗した場合でも確実にタスクを完了させることができるため、システムの信頼性が向上します。
4 . 負荷分散
・作業の均等な分散
キューは、複数のワーカー間で作業負荷を均等に分散するのに役立ちます。これにより、単一のワーカーが過負荷になり処理速度が低下したり、障害に繋がってしまうことを防ぎます。
・動的な調整
作業負荷が変化すると、キューはタスクを処理するワーカーの数を動的に調整(ピーク時にはワーカーを追加、閑散期にはワーカーを削減)することができます。
5 . フォールトトレランス
・障害の分離
各ジョブは独立しているため、あるジョブが失敗しても、キュー内の他のジョブには影響しません。失敗したジョブを再試行、エラーのログ記録、サポートチームへの警告など、異なる方法で処理するようにシステムを構成することも可能です。
・グレースフルデグラデーション
システムが高負荷状態にある場合や、問題が発生したりしている場合、キューはタスクの処理速度を制御することで、システム全体の安定性を維持するのに役立ちます。ワーカーが過負荷になった場合、キューはバッファとして機能し、タスクが取りこぼされることなく、リソースが利用可能になったときに処理されるようにします。
6 . 柔軟性
・ジョブの優先順位付け
キューは、特定のジョブを他のジョブよりも優先するように設定できます。
・スケジューリング
特定の時間や一定の遅延の後にジョブを処理するようにスケジュールすることができ、特定の順序や特定の時間に実行する必要があるタスクを管理しやすくします。
・発注
顧客が注文すると、注文をすぐに処理する代わりに、注文の詳細がキューに追加されます。これにより、発注処理が高速になり、顧客に迅速な注文が提供されます。
・注文処理
作業者はキューから注文を受け取り、在庫の確認、顧客への請求、発送準備などの注文の処理を行います。
・通知
注文が処理されると、別のジョブがキューに追加され、顧客に確認メールやSMSを送信します。
キューを使用することで、迅速な注文処理とスムーズな顧客体験を提供可能です。また、システムは大量の注文を効率的に処理することができ、必要に応じてスケールアップし、一部のジョブで一時的な問題が発生しても、各タスク(注文処理、通知)が確実に完了させることができます。
キューは、タスクをより効率的に管理し、システムパフォーマンスを向上させ、信頼性とスケーラビリティを確保するのに有効的なツールです。
タスクを非同期処理し、ワークロードを分散し、フォールトトレランスの仕組みを提供することで、キューは最新のソフトウェアアーキテクチャ、特にマイクロサービスやクラウドベースの環境において重要な役割を果たします。