NestJS BullMQでジョブキューを実装する
ユーザー作成時に非同期ジョブをキューに積み、Consumerで処理するBullMQの基本パターン
NestJSテスト入門まで終えている前提。
この記事では、ユーザー作成をトリガーに「ウェルカムメール送信」のジョブをキューに積み、Consumer がそれを非同期で処理する最小パターンを実装する。
この記事でやること
- Redisをローカルで起動する
@nestjs/bullmqを導入し、mailキューを定義するUsersServiceでユーザー作成時にジョブをキューに積む(Producer)MailServiceに実際のメール送信ロジックを書くMailConsumerでジョブを受け取りMailServiceに委譲する(Consumer)POST /usersを叩いてConsumerのログを確認する
触るファイル一覧
api-sample/
└─ src/
├─ app.module.ts # BullModule.forRootを追加、MailModuleをimport
├─ users/
│ ├─ users.module.ts # BullModule.registerQueueを追加
│ └─ users.service.ts # @InjectQueueでキューを注入し、createでジョブを積む
└─ mail/ # 新規作成
├─ mail.module.ts
├─ mail.service.ts # メール送信ロジック
└─ mail.consumer.ts # キューからジョブを受け取りMailServiceに委譲0. Redisを用意する
BullMQはRedisを必要とする。Dockerが入っている場合は次のコマンドで起動できる。
docker run -d --name redis-dev -p 6379:6379 redis:7-alpine停止するときは次のようにする。
docker stop redis-devDockerが無い場合はRedis公式のインストール手順を参照する。
既に docker-compose.yml がある場合(NestJSをDocker化するを済ませている場合)は、次のように services にRedisを追加してもよい。
services:
api:
# ...(既存のまま)
redis:
image: redis:7-alpine
ports:
- "6379:6379".env に接続情報を追加する。
REDIS_HOST=localhost
REDIS_PORT=63791. パッケージを追加する
pnpmの場合。
pnpm add @nestjs/bullmq bullmqnpmの場合。
npm install @nestjs/bullmq bullmq2. AppModule に BullModule.forRoot を追加する
編集するファイルは api-sample/src/app.module.ts。
BullModule.forRoot はアプリ全体のRedis接続を1箇所で設定する。各ModuleはここでつないだRedisを共有する。
import { Module } from "@nestjs/common";
import { BullModule } from "@nestjs/bullmq";
import { UsersModule } from "./users/users.module";
import { AuthModule } from "./auth/auth.module";
import { MailModule } from "./mail/mail.module";
@Module({
imports: [
// Redisへの接続設定をアプリ全体で1回だけ行う
BullModule.forRoot({
connection: {
host: process.env.REDIS_HOST ?? "localhost",
port: parseInt(process.env.REDIS_PORT ?? "6379", 10),
},
}),
UsersModule,
AuthModule,
MailModule, // 後述で作成するMailModuleを追加
],
})
export class AppModule {}3. MailModule / MailService / MailConsumer を作る
3-1. ジョブのデータ型を定義する
ジョブに乗せるデータの型は、ProducerとConsumerの両側から参照する。mail.module.ts に MAIL_QUEUE 定数と合わせて定義するのが最もシンプルでまとまりやすい。
新規作成するファイルは api-sample/src/mail/mail.module.ts。型の定義は後のステップでも参照するため先に置いておく。
import { Module } from "@nestjs/common";
import { BullModule } from "@nestjs/bullmq";
import { MailService } from "./mail.service";
import { MailConsumer } from "./mail.consumer";
// キュー名を定数で管理する。ProducerとConsumer双方で参照する
export const MAIL_QUEUE = "mail";
// ジョブに乗せるデータの型
export interface WelcomeMailJobData {
userId: string;
email: string;
name: string;
}
@Module({
imports: [
// このModuleでConsumerとして使うキューを登録する
BullModule.registerQueue({ name: MAIL_QUEUE }),
],
providers: [MailService, MailConsumer],
})
export class MailModule {}BullModule.registerQueue はキュー名を宣言する。ProducerとConsumerで同じキュー名を登録する必要があるため、定数 MAIL_QUEUE で一元管理する。
3-2. mail.service.ts を作る
新規作成するファイルは api-sample/src/mail/mail.service.ts。
実際のメール送信ロジックをここに集約する。この記事ではダミー実装でログ出力のみ行うが、SendGridやNodemailerへの差し替えはこのファイルだけで済む。
import { Injectable, Logger } from "@nestjs/common";
import { WelcomeMailJobData } from "./mail.module";
@Injectable()
export class MailService {
private readonly logger = new Logger(MailService.name);
async sendWelcome(data: WelcomeMailJobData): Promise<void> {
// 実際のメール送信はここに書く(SendGrid / Nodemailer など)
// この記事ではダミーの非同期処理でログ出力のみ
await new Promise((resolve) => setTimeout(resolve, 100));
this.logger.log(`ウェルカムメールを送信しました: ${data.email}`);
}
}3-3. mail.consumer.ts を作る
新規作成するファイルは api-sample/src/mail/mail.consumer.ts。
Consumerはキューからジョブを受け取り、処理の実体を MailService に委譲する役割に徹する。
import { Processor, WorkerHost, OnWorkerEvent } from "@nestjs/bullmq";
import { Logger } from "@nestjs/common";
import { Job } from "bullmq";
import { MAIL_QUEUE, WelcomeMailJobData } from "./mail.module";
import { MailService } from "./mail.service";
// @Processor にキュー名を渡す。このクラスが MAIL_QUEUE を監視するConsumerになる
@Processor(MAIL_QUEUE)
export class MailConsumer extends WorkerHost {
private readonly logger = new Logger(MailConsumer.name);
constructor(private readonly mailService: MailService) {
super();
}
// キューからジョブを取り出したときに呼ばれるメインの処理
async process(job: Job<WelcomeMailJobData>): Promise<void> {
this.logger.log(
`[${job.name}] ジョブを受信: userId=${job.data.userId}, email=${job.data.email}`,
);
// 送信ロジックはMailServiceに委譲する
await this.mailService.sendWelcome(job.data);
this.logger.log(`[${job.name}] 処理完了: ${job.data.email}`);
}
// ジョブが失敗したときのハンドリング
@OnWorkerEvent("failed")
onFailed(job: Job, error: Error): void {
this.logger.error(`[${job.name}] 失敗: userId=${job.data.userId}, reason=${error.message}`);
}
}WorkerHost を継承し process メソッドを実装するのがBullMQ版の書き方。@OnWorkerEvent で "completed" / "failed" / "active" などのイベントもフックできる。
4. Producer側の実装:UsersModule と UsersService を修正する
ジョブを「積む」側の実装。Serviceからキューへジョブを追加する。
4-1. UsersModule に BullModule.registerQueue を追加する
編集するファイルは api-sample/src/users/users.module.ts。
import { Module } from "@nestjs/common";
import { BullModule } from "@nestjs/bullmq";
import { UsersController } from "./users.controller";
import { UsersService } from "./users.service";
import { PrismaModule } from "../prisma/prisma.module";
import { MAIL_QUEUE } from "../mail/mail.module";
@Module({
imports: [
PrismaModule,
// ジョブを積む側でも同じキュー名を登録する
BullModule.registerQueue({ name: MAIL_QUEUE }),
],
controllers: [UsersController],
providers: [UsersService],
})
export class UsersModule {}ProducerとConsumerで同じ MAIL_QUEUE を registerQueue に渡す。BullMQは同じキュー名をRedis上の同じキューとして扱う。
4-2. UsersService にジョブを積む処理を追加する
編集するファイルは api-sample/src/users/users.service.ts。
追加するimport。
import { InjectQueue } from "@nestjs/bullmq";
import { Queue } from "bullmq";
import { MAIL_QUEUE, WelcomeMailJobData } from "../mail/mail.module";コンストラクタに @InjectQueue を追加する。
既存の PrismaService の注入はそのままに、@InjectQueue(MAIL_QUEUE) を加える。
constructor(
private readonly prisma: PrismaService,
// キュー名を指定してQueueを注入する
@InjectQueue(MAIL_QUEUE) private readonly mailQueue: Queue,
) {}create メソッドにジョブを積む処理を追加する。
ユーザーをDBに保存した直後、mailQueue.add でジョブをキューに投入する。
async create(dto: CreateUserDto) {
// 既存の処理(bcryptによるハッシュ化など)はそのまま
const passwordHash = await bcrypt.hash(dto.password, 10);
const user = await this.prisma.user.create({
data: { email: dto.email, name: dto.name, passwordHash },
});
// ユーザー作成後にウェルカムメールのジョブをキューに積む
const jobData: WelcomeMailJobData = {
userId: user.id,
email: user.email,
name: user.name ?? "",
};
await this.mailQueue.add("send-welcome", jobData);
return { id: user.id, email: user.email, name: user.name };
}mailQueue.add の第1引数はジョブ名、第2引数はジョブのデータ。Consumer側では job.name と job.data で参照できる。
5. 動作確認
アプリを起動する。
pnpm start:dev別ターミナルでユーザーを作成する。
curl -s -X POST http://localhost:3000/users \
-H "Content-Type: application/json" \
-d '{"email":"queue@example.com","name":"Queue Taro","password":"password123"}'Nestのログに次のような出力が出ればConsumerが動いている。
[MailConsumer] [send-welcome] ジョブを受信: userId=xxx, email=queue@example.com
[MailService] ウェルカムメールを送信しました: queue@example.com
[MailConsumer] [send-welcome] 処理完了: queue@example.comAPIのレスポンスは create メソッドがDBへの保存を完了した時点で返る。メールの送信(Consumer側の処理)はレスポンスを返した後に非同期で行われる。
よくあるエラー
Error: connect ECONNREFUSED 127.0.0.1:6379
Redisが起動していない。次のコマンドで状態を確認する。
docker ps | grep redis起動していない場合は docker run -d --name redis-dev -p 6379:6379 redis:7-alpine で起動する。
Missing Queue name for @InjectQueue
@InjectQueue(MAIL_QUEUE) の MAIL_QUEUE が undefined になっている可能性がある。MAIL_QUEUE の定数が mail.module.ts から正しくimportされているか確認する。
Consumerがジョブを処理しない(キューに積まれるが無視される)
MailModule が AppModule の imports に追加されているか確認する。MailModuleを追加し忘れると、Consumerが起動せずキューにジョブが溜まるだけになる。
@InjectQueue でDIエラーになる
UsersModule の imports に BullModule.registerQueue({ name: MAIL_QUEUE }) が入っているか確認する。Producer側のModuleにも registerQueue が必要。
実装タスク(チェックリスト)
- Redisを起動する(
docker run -d --name redis-dev -p 6379:6379 redis:7-alpine) -
.envにREDIS_HOST/REDIS_PORTを追加 -
@nestjs/bullmqとbullmqをインストール -
src/mail/mail.module.tsを作成(MAIL_QUEUE定数・WelcomeMailJobData型、BullModule.registerQueue、MailService/MailConsumerを登録) -
src/mail/mail.service.tsを作成(送信ロジックを実装) -
src/mail/mail.consumer.tsを作成(WorkerHostを継承、processでMailServiceに委譲) -
AppModuleにBullModule.forRootとMailModuleを追加 -
UsersModuleにBullModule.registerQueueを追加 -
UsersServiceに@InjectQueueでキューを注入し、createでジョブを積む -
pnpm start:dev→POST /usersでConsumerのログを確認