awesome-hacks
Docs

NestJS BullMQでジョブキューを実装する

ユーザー作成時に非同期ジョブをキューに積み、Consumerで処理するBullMQの基本パターン

最終更新:2026/05/26

NestJSテスト入門まで終えている前提。
この記事では、ユーザー作成をトリガーに「ウェルカムメール送信」のジョブをキューに積み、Consumer がそれを非同期で処理する最小パターンを実装する。

この記事でやること

  • Redisをローカルで起動する
  • @nestjs/bullmq を導入し、mail キューを定義する
  • UsersService でユーザー作成時にジョブをキューに積む(Producer)
  • MailService に実際のメール送信ロジックを書く
  • MailConsumer でジョブを受け取り MailService に委譲する(Consumer)
  • POST /users を叩いてConsumerのログを確認する

触るファイル一覧

api-sample/
└─ src/
   ├─ app.module.ts                  # BullModule.forRootを追加、MailModuleをimport
   ├─ users/
   │  ├─ users.module.ts             # BullModule.registerQueueを追加
   │  └─ users.service.ts            # @InjectQueueでキューを注入し、createでジョブを積む
   └─ mail/                          # 新規作成
      ├─ mail.module.ts
      ├─ mail.service.ts             # メール送信ロジック
      └─ mail.consumer.ts            # キューからジョブを受け取りMailServiceに委譲

0. Redisを用意する

BullMQはRedisを必要とする。Dockerが入っている場合は次のコマンドで起動できる。

docker run -d --name redis-dev -p 6379:6379 redis:7-alpine

停止するときは次のようにする。

docker stop redis-dev

Dockerが無い場合はRedis公式のインストール手順を参照する。

既に docker-compose.yml がある場合(NestJSをDocker化するを済ませている場合)は、次のように services にRedisを追加してもよい。

services:
  api:
    # ...(既存のまま)
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

.env に接続情報を追加する。

REDIS_HOST=localhost
REDIS_PORT=6379

1. パッケージを追加する

pnpmの場合。

pnpm add @nestjs/bullmq bullmq

npmの場合。

npm install @nestjs/bullmq bullmq

2. AppModuleBullModule.forRoot を追加する

編集するファイルは api-sample/src/app.module.ts

BullModule.forRoot はアプリ全体のRedis接続を1箇所で設定する。各ModuleはここでつないだRedisを共有する。

import { Module } from "@nestjs/common";
import { BullModule } from "@nestjs/bullmq";
import { UsersModule } from "./users/users.module";
import { AuthModule } from "./auth/auth.module";
import { MailModule } from "./mail/mail.module";

@Module({
  imports: [
    // Redisへの接続設定をアプリ全体で1回だけ行う
    BullModule.forRoot({
      connection: {
        host: process.env.REDIS_HOST ?? "localhost",
        port: parseInt(process.env.REDIS_PORT ?? "6379", 10),
      },
    }),
    UsersModule,
    AuthModule,
    MailModule, // 後述で作成するMailModuleを追加
  ],
})
export class AppModule {}

3. MailModule / MailService / MailConsumer を作る

3-1. ジョブのデータ型を定義する

ジョブに乗せるデータの型は、ProducerとConsumerの両側から参照する。mail.module.tsMAIL_QUEUE 定数と合わせて定義するのが最もシンプルでまとまりやすい。

新規作成するファイルは api-sample/src/mail/mail.module.ts。型の定義は後のステップでも参照するため先に置いておく。

import { Module } from "@nestjs/common";
import { BullModule } from "@nestjs/bullmq";
import { MailService } from "./mail.service";
import { MailConsumer } from "./mail.consumer";

// キュー名を定数で管理する。ProducerとConsumer双方で参照する
export const MAIL_QUEUE = "mail";

// ジョブに乗せるデータの型
export interface WelcomeMailJobData {
  userId: string;
  email: string;
  name: string;
}

@Module({
  imports: [
    // このModuleでConsumerとして使うキューを登録する
    BullModule.registerQueue({ name: MAIL_QUEUE }),
  ],
  providers: [MailService, MailConsumer],
})
export class MailModule {}

BullModule.registerQueue はキュー名を宣言する。ProducerとConsumerで同じキュー名を登録する必要があるため、定数 MAIL_QUEUE で一元管理する。

3-2. mail.service.ts を作る

新規作成するファイルは api-sample/src/mail/mail.service.ts

実際のメール送信ロジックをここに集約する。この記事ではダミー実装でログ出力のみ行うが、SendGridやNodemailerへの差し替えはこのファイルだけで済む。

import { Injectable, Logger } from "@nestjs/common";
import { WelcomeMailJobData } from "./mail.module";

@Injectable()
export class MailService {
  private readonly logger = new Logger(MailService.name);

  async sendWelcome(data: WelcomeMailJobData): Promise<void> {
    // 実際のメール送信はここに書く(SendGrid / Nodemailer など)
    // この記事ではダミーの非同期処理でログ出力のみ
    await new Promise((resolve) => setTimeout(resolve, 100));
    this.logger.log(`ウェルカムメールを送信しました: ${data.email}`);
  }
}

3-3. mail.consumer.ts を作る

新規作成するファイルは api-sample/src/mail/mail.consumer.ts

Consumerはキューからジョブを受け取り、処理の実体を MailService に委譲する役割に徹する。

import { Processor, WorkerHost, OnWorkerEvent } from "@nestjs/bullmq";
import { Logger } from "@nestjs/common";
import { Job } from "bullmq";
import { MAIL_QUEUE, WelcomeMailJobData } from "./mail.module";
import { MailService } from "./mail.service";

// @Processor にキュー名を渡す。このクラスが MAIL_QUEUE を監視するConsumerになる
@Processor(MAIL_QUEUE)
export class MailConsumer extends WorkerHost {
  private readonly logger = new Logger(MailConsumer.name);

  constructor(private readonly mailService: MailService) {
    super();
  }

  // キューからジョブを取り出したときに呼ばれるメインの処理
  async process(job: Job<WelcomeMailJobData>): Promise<void> {
    this.logger.log(
      `[${job.name}] ジョブを受信: userId=${job.data.userId}, email=${job.data.email}`,
    );

    // 送信ロジックはMailServiceに委譲する
    await this.mailService.sendWelcome(job.data);

    this.logger.log(`[${job.name}] 処理完了: ${job.data.email}`);
  }

  // ジョブが失敗したときのハンドリング
  @OnWorkerEvent("failed")
  onFailed(job: Job, error: Error): void {
    this.logger.error(`[${job.name}] 失敗: userId=${job.data.userId}, reason=${error.message}`);
  }
}

WorkerHost を継承し process メソッドを実装するのがBullMQ版の書き方。@OnWorkerEvent"completed" / "failed" / "active" などのイベントもフックできる。

4. Producer側の実装:UsersModuleUsersService を修正する

ジョブを「積む」側の実装。Serviceからキューへジョブを追加する。

4-1. UsersModuleBullModule.registerQueue を追加する

編集するファイルは api-sample/src/users/users.module.ts

import { Module } from "@nestjs/common";
import { BullModule } from "@nestjs/bullmq";
import { UsersController } from "./users.controller";
import { UsersService } from "./users.service";
import { PrismaModule } from "../prisma/prisma.module";
import { MAIL_QUEUE } from "../mail/mail.module";

@Module({
  imports: [
    PrismaModule,
    // ジョブを積む側でも同じキュー名を登録する
    BullModule.registerQueue({ name: MAIL_QUEUE }),
  ],
  controllers: [UsersController],
  providers: [UsersService],
})
export class UsersModule {}

ProducerとConsumerで同じ MAIL_QUEUEregisterQueue に渡す。BullMQは同じキュー名をRedis上の同じキューとして扱う。

4-2. UsersService にジョブを積む処理を追加する

編集するファイルは api-sample/src/users/users.service.ts

追加するimport。

import { InjectQueue } from "@nestjs/bullmq";
import { Queue } from "bullmq";
import { MAIL_QUEUE, WelcomeMailJobData } from "../mail/mail.module";

コンストラクタに @InjectQueue を追加する。

既存の PrismaService の注入はそのままに、@InjectQueue(MAIL_QUEUE) を加える。

constructor(
  private readonly prisma: PrismaService,
  // キュー名を指定してQueueを注入する
  @InjectQueue(MAIL_QUEUE) private readonly mailQueue: Queue,
) {}

create メソッドにジョブを積む処理を追加する。

ユーザーをDBに保存した直後、mailQueue.add でジョブをキューに投入する。

async create(dto: CreateUserDto) {
  // 既存の処理(bcryptによるハッシュ化など)はそのまま
  const passwordHash = await bcrypt.hash(dto.password, 10);
  const user = await this.prisma.user.create({
    data: { email: dto.email, name: dto.name, passwordHash },
  });

  // ユーザー作成後にウェルカムメールのジョブをキューに積む
  const jobData: WelcomeMailJobData = {
    userId: user.id,
    email: user.email,
    name: user.name ?? "",
  };
  await this.mailQueue.add("send-welcome", jobData);

  return { id: user.id, email: user.email, name: user.name };
}

mailQueue.add の第1引数はジョブ名、第2引数はジョブのデータ。Consumer側では job.namejob.data で参照できる。

5. 動作確認

アプリを起動する。

pnpm start:dev

別ターミナルでユーザーを作成する。

curl -s -X POST http://localhost:3000/users \
  -H "Content-Type: application/json" \
  -d '{"email":"queue@example.com","name":"Queue Taro","password":"password123"}'

Nestのログに次のような出力が出ればConsumerが動いている。

[MailConsumer] [send-welcome] ジョブを受信: userId=xxx, email=queue@example.com
[MailService] ウェルカムメールを送信しました: queue@example.com
[MailConsumer] [send-welcome] 処理完了: queue@example.com

APIのレスポンスは create メソッドがDBへの保存を完了した時点で返る。メールの送信(Consumer側の処理)はレスポンスを返した後に非同期で行われる。

よくあるエラー

Error: connect ECONNREFUSED 127.0.0.1:6379

Redisが起動していない。次のコマンドで状態を確認する。

docker ps | grep redis

起動していない場合は docker run -d --name redis-dev -p 6379:6379 redis:7-alpine で起動する。

Missing Queue name for @InjectQueue

@InjectQueue(MAIL_QUEUE)MAIL_QUEUEundefined になっている可能性がある。MAIL_QUEUE の定数が mail.module.ts から正しくimportされているか確認する。

Consumerがジョブを処理しない(キューに積まれるが無視される)

MailModuleAppModuleimports に追加されているか確認する。MailModuleを追加し忘れると、Consumerが起動せずキューにジョブが溜まるだけになる。

@InjectQueue でDIエラーになる

UsersModuleimportsBullModule.registerQueue({ name: MAIL_QUEUE }) が入っているか確認する。Producer側のModuleにも registerQueue が必要。

実装タスク(チェックリスト)

  • Redisを起動する(docker run -d --name redis-dev -p 6379:6379 redis:7-alpine
  • .envREDIS_HOST / REDIS_PORT を追加
  • @nestjs/bullmqbullmq をインストール
  • src/mail/mail.module.ts を作成(MAIL_QUEUE 定数・WelcomeMailJobData 型、BullModule.registerQueueMailService / MailConsumer を登録)
  • src/mail/mail.service.ts を作成(送信ロジックを実装)
  • src/mail/mail.consumer.ts を作成(WorkerHost を継承、processMailService に委譲)
  • AppModuleBullModule.forRootMailModule を追加
  • UsersModuleBullModule.registerQueue を追加
  • UsersService@InjectQueue でキューを注入し、create でジョブを積む
  • pnpm start:devPOST /users でConsumerのログを確認

参考