본문 바로가기

Project/기록

Bullmq Document + CS 관점으로 다시 생각

BullMQ는 왜 Queue로 설계가 되어있을까?

Queue는 대표적인 FIFO(First In, First Out) 구조입니다. 비동기 작업처리에 직관적이고 예측이 가능하다는 장점을 갖고 있습니다.장점은 아래와 같습니다.

  • 작업 순서가 명확합니다. 들어오는 순서대로 작업이 처리됩니다. 해당 내용은 큐잉 이론과도 연결됩니다.
  • 생산자와 소비자 패턴을 구현할 때 적합합니다. 생산자는 큐에 넣고, 소비자는 큐에서 작업을 빼내서 처리합니다.
  • 병렬 처리와 동시성 관리입니다. 여러 소비자가 하나의 큐에서 작업을 꺼내서 처리할 수 있어 병렬 처리가 가능하고, 여러명의 소비자가 하나의 큐에 접근해서 작업을 처리할 수 있어 동시성 관리가 가능합니다.
  • 확장성입니다. 큐에 작업이 많이 쌓이면서 Worker를 늘려 처리량을 조절할 수도 있습니다.


BullMQ는 Redis에 저장하는 이유는 무엇일까?

Redis는 메모리 기반으로 빠른 읽기와 쓰기가 가능하고, Persistence 옵션을 통해 디스크 백업도 지원합니다. Redis의 다양한 자료구조인 리스트(LIST), 정렬된 집합(ZSET), 해시(HASH), 스트림(STREAM) 등을 활용해 정렬된 작업 스케줄링, 우선순위 큐, 작업 상태 추적 같은 기능을 빠르고 안전하게 처리할 수 있습니다. 이뿐만 아니라 Redis는 낮은 지연 시간, 고가용성, 복제와 클러스터링 지원 측면에서 다른 대안들보다 적합한 선택입니다.

SoC와 SRP

컴퓨터 과학 이론에서 "Separation of Concerns"와 "Single Responsibility Principle"은 모듈 설계의 핵심 원칙인데, BullMQ는 이 원칙을 따라 큐는 생산자 역할만 담당하고 작업 처리나 소비는 Worker나 Scheduler에서 분리하여 맡도록 했습니다. 이러한 구조는 책임을 명확히 구분해 생산자-소비자 패턴을 엄격히 유지할 수 있고, Node.js의 이벤트 기반 아키텍처와도 잘 맞아 확장성을 높여줍니다.

BullMQ의 기능

delay 옵션은 작업을 특정 시점 이후에 실행하도록 해 타이머 큐나 시간 기반 이벤트 큐의 전형적인 전략을 따릅니다. priority는 운영체제 스케줄러처럼 처리 우선순위가 높은 작업을 빠르게 처리할 수 있게 하고, backoff는 실패 복구 시 부하를 분산하기 위해 지수형이나 선형 등 다양한 재시도 정책을 지원합니다. 이 모든 요소들이 비동기 작업 처리의 안정성과 효율성을 높이는 핵심 메커니즘으로 작용합니다.


BullMQ의 내부 설계

BullMQ의 Queue 클래스는 내부적으로 Redis에 연결을 활용해 작업을 관리합니다. 구체적으로 작업은 우선순위 기반 스케줄링을 위해 wait이라는 정렬 집합(ZSET)에 삽입됩니다.

await this.client.zadd(this.keys.wait, score, jobId)

Queue.add() 메서드는 작업 데이터를 JSON으로 직렬화해 Redis의 해시에 저장하고, 작업 ID를 ZSET이나 리스트 등에 넣습니다. 이때 opts.delay, opts.priority 같은 옵션은 내부적으로 ZSET의 점수(score)로 변환되어 작업 스케줄링에 직접적인 영향을 미칩니다.

BullMQ는 큐 단위로 Redis 키를 생성합니다.

bull:Cars:id // 작업 ID를 관리
bull:Cars:wait는 // 대기 중인 작업의 ZSET
bull:Cars:delayed // 지연된 작업을 위한 ZSET입니다.

Queue 인스턴스가 생성될 때 이러한 키들에 대한 참조 및 구성을 수행하지만, 실제 초기화는 지연(Lazy) 방식으로 처리됩니다. 이 방식은 큐를 불필요하게 초기화하지 않으면서, 이미 존재하는 큐를 그대로 이어받아 사용할 수 있게 해 재시작 시 내구성을 높입니다.

또한, delay와 priority 옵션이 ZSET으로 관리되는 이유는 Redis의 정렬 집합이 우선순위 큐나 지연 큐 구현에 매우 적합하기 때문입니다. ZADD 명령어를 통해 점수를 기반으로 작업을 삽입하고, ZRANGEBYSCORE 명령어로 현재 시간 이전에 실행해야 할 작업을 효율적으로 조회합니다.

ZSET이 우선순위 큐나 지연 큐 구현에 적합한 이유

더보기

1. 자동 정렬

Sorted Set은 각 요소가 점수(score)와 연결되어 있으며, 이 점수를 기준으로 내부적으로 자동으로 정렬됩니다. 이 점수를 우선순위나 처리 시점(예: timestamp)으로 설정하면, 우선순위가 높은 항목이나 처리 시간이 도래한 항목을 쉽게 찾을 수 있습니다.

2. 효율적인 접근

Redis의 Sorted Set은 Skip List 등 효율적인 자료구조를 사용해, 요소의 삽입, 삭제, 조회가 모두 평균적으로 O(log⁡N)의 시간 복잡도를 가집니다. 이는 대량의 데이터에서도 빠른 처리가 가능함을 의미합니다

3. 최소/최대값 접근 용이

우선순위 큐에서 가장 중요한 연산 중 하나는 최소(또는 최대) 우선순위를 가진 항목을 빠르게 찾는 것입니다. Sorted Set은 ZRANGE, ZPOPMIN, ZPOPMAX 등 명령어를 통해 이 연산을 매우 효율적으로 지원합니다.

4. 고유성 보장

Sorted Set은 Set의 특성도 갖고 있어, 동일한 값을 중복해서 저장하지 않습니다. 이는 실수로 중복 작업이 큐에 들어가는 것을 방지할 수 있습니다.

5. 다양한 범위 조회

우선순위 큐나 지연 큐에서 특정 범위의 항목(예: 처리 시간이 임박한 작업)만 조회하거나, 우선순위가 특정 값 이하/이상인 항목만 추출하는 것도 쉽게 할 수 있습니다.

이 구조는 작업 스케줄링의 정확도를 밀리초 단위로 보장하며, 선형 탐색 대신 이진 탐색 기반 조회를 제공해 많은 작업을 지연 상태로 관리하더라도 성능 저하 없이 처리할 수 있습니다.

마지막으로, 왜 Worker가 없어도 작업을 추가할 수 있는가에 대해서는 BullMQ가 생산자와 소비자를 강결합하지 않도록 설계되었기 때문입니다. 즉, 작업 추가 시 워커가 반드시 실행 중일 필요는 없습니다. 작업은 Redis에 안전하게 저장되며, 이후 워커가 연결되면 저장된 상태를 복원해 작업을 처리합니다. 이러한 구조는 장애 복원력(fault-tolerant)을 높이는 핵심적인 아키텍처 설계입니다.


BullMQ의 작업 관리

Redis 내에서 여러 상태별로 구분된 서브 키 공간에 저장됩니다.

  • wait List에는 처리 대기 중인 작업
  • active List에는 현재 처리 중인 작업
  • completed Set에는 완료된 작업
  • failed Set에는 실패한 작업
  • delayed List, ZSET에서 지연된 처리 작업

상태에 따라 저장되는 공간이 다릅니다. 이외에도 stalled, repeat, priority, events, logs 등 다양한 부가 키들이 존재합니다. 각 작업은 별도의 Redis 해시로 저장되고 키 형식은 보통 bull:<queueName>:<jobId> 형태를 띕니다.

removeOnComplete와 removeOnFail

작업 완료 또는 실패 시 자동으로 관련 키를 삭제하는 removeOnComplete 및 removeOnFail 옵션은 Redis 관점에서 중요한 역할을 합니다. 이 옵션을 true로 설정하면, 작업이 완료됨과 동시에 해당 작업의 해시 키(bull:<queue>:<jobId>)와 completed 또는 failed 집합 내의 작업 ID가 Redis에서 삭제되며, 관련 로그와 이벤트도 내부적으로 정리되어 Redis 메모리 부담이 최소화됩니다. Redis는 In Memory이기 때문에 메모리 관리를 잘해야 오류 발생의 위험이 감소합니다.

{ count, age } 형태의 설정

반면 { count, age } 형태로 설정하면, BullMQ는 큐 단위로 주기적인 청소 작업(cleanup)을 수행합니다. 이때 count는 저장할 작업의 최대 개수를 의미하며, 초과된 작업부터 삭제합니다. age는 작업의 생성 시각을 기준으로 설정된 시간(초)보다 오래된 작업을 삭제하는 기준이 됩니다. Redis에서는 ZREMRANGEBYSCORE나 ZREMRANGEBYRANK 명령으로 오래되거나 순위가 낮은 작업 ID를 제거하고, 해당 작업 해시도 삭제합니다. 이 방식은 최신 작업은 유지하면서 오래된 작업만 효율적으로 정리할 수 있습니다.

실제로 Redis에 저장된 작업 키는 KEYS bull:myQueue:* 명령으로 확인할 수 있습니다. 만약 removeOnComplete 옵션을 설정하지 않으면, 작업 ID와 관련된 키들이 계속 쌓여 Redis 메모리 사용량이 불필요하게 증가할 수 있습니다.

해당 키 설정을 하지 않고, 테스트를 하다가 약 20만개 이상의 키가 쌓인 적이 있습니다. 코드 한 두줄을 제외하고 만들었다가 이후에 원인을 파악하지 못하고 문제가 발생할 수 있습니다.

Redis 키는 작업당 1개 이상 생성되므로, 작업 수가 수천에서 수백만 건으로 늘어나면 키 수가 기하급수적으로 증가합니다. 특히 완료된 작업이 과도하게 쌓이면 Redis가 메모리 부족 문제에 직면할 수 있습니다. 이때는 Redis의 LRU 정책 없이도 문제가 발생할 수 있어, 운영 환경에서는 반드시 removeOnComplete와 removeOnFail 옵션을 적절히 설정해 Redis의 장기 안정성을 확보하는 것이 중요합니다.

removeOnComplete와 removeOnFail 옵션은 다음과 같이 메모리 효율성에 차이를 만듭니다.

  • true 설정: 작업 완료/실패 즉시 관련 키를 삭제하므로 메모리 효율이 매우 좋음
  • { count, age } 설정: 백그라운드에서 주기적으로 오래된 작업을 삭제해 좋은 효율 유지
  • 미설정: 모든 작업 상태를 유지해 비효율적이고 메모리 부담이 큼

운영 시 권장하는 전략은 완료된 작업은 removeOnComplete: 1000처럼 최신 1000개만 유지하고, 실패한 작업은 removeOnFail: { age: 86400 }으로 하루 정도 보관하여 디버깅에 활용하는 것입니다. 대량 큐를 운영할 때는 Redis 메모리 상태를 INFO memory, SCAN 명령으로 지속적으로 모니터링하는 것을 권장합니다.


BullMQ에서 멱등성(Idempotence) 이란?

멱등성은 동일한 요청을 여러 번 수행해도 결과가 변하지 않는 성질을 의미합니다. 예를 들어, HTTP의 PUT이나 DELETE 메서드는 멱등성을 가져야 하며, 메시지 큐 시스템에서는 네트워크 재시도 시 같은 작업이 중복 실행되지 않도록 보장해야 합니다. 이는 시스템이 한 번 처리한 작업의 상태를 기억하거나 감지하여 재처리를 방지하는 방식을 뜻합니다.

BullMQ는 이러한 멱등성을 고유한 작업 ID(jobId)를 활용해 구현합니다. 작업을 추가할 때 jobId를 지정하면, 이미 동일한 ID를 가진 작업이 Redis에 존재하는지 검사하고, 존재할 경우 새 작업 추가를 막고 기존 작업을 반환합니다. 이 과정은 Redis의 원자적 연산을 통해 레이스 컨디션 없이 안전하게 처리됩니다.

addJob

protected async addJob(
  name: NameType,
  data: DataType,
  opts?: JobsOptions,
): Promise<Job<DataType, ResultType, NameType>> {
  // ... 
  const jobId = opts?.jobId;
  if (jobId == '0' || jobId?.startsWith('0:')) {
    throw new Error("JobId cannot be '0' or start with 0:");
  }
  const job = await this.Job.create<DataType, ResultType, NameType>(
    this as MinimalQueue,
    name,
    data,
    {
      ...this.jobsOpts,
      ...opts,
      jobId,
    },
  );
  this.emit('waiting', job as JobBase<DataType, ResultType, NameType>);
  return job;
}

Redis Lua Script 

-- KEYS[1]: jobs hash key
-- KEYS[2]: wait list key
-- ARGV[1]: jobId
-- ARGV[2]: job data

if redis.call('HEXISTS', KEYS[1], ARGV[1]) == 1 then
  -- 이미 jobId가 존재하면 nil 반환
  return nil
else
  -- 존재하지 않으면 작업 저장
  redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
  -- 대기 큐 등에 추가
  redis.call('LPUSH', KEYS[2], ARGV[1])
  return ARGV[1]
end

 

removeOnComplete, removeOnFail 옵션 아래에선?

그러나 removeOnComplete 또는 removeOnFail 옵션으로 작업 자동 삭제가 활성화된 경우, 작업이 완료된 뒤 Redis에서 해당 작업 정보가 제거되므로, 동일 jobId를 가진 새 작업은 중복으로 인식되지 않습니다. 즉, 멱등성은 Redis에 작업 정보가 남아있는 동안에만 유효하며, 작업 정보가 삭제되면 멱등성이 보장되지 않는 한계가 있습니다.

이러한 한계를 극복하기 위해 컴퓨터 과학적으로는 Persistent Idempotency Key 패턴이 활용됩니다. 작업 본문이나 비즈니스 키를 해시해 별도의 Redis 키에 TTL(유효기간)을 설정하고, 존재하지 않을 때만 작업을 처리하도록 하여 멱등성을 유지하는 방법입니다. 또한, 데이터베이스나 Redis의 Sorted Set 등을 활용해 처리한 작업 기록을 별도로 관리하면 작업 삭제 이후에도 멱등성을 판별할 수 있습니다.

BullMQ 소스코드에 따르면, jobId가 있을 경우 Redis의 존재 여부를 확인하고 MULTI 트랜잭션 내에서 신규 작업 생성을 스킵하는 방식을 사용합니다. 이는 효율적이지만, 자동 삭제 옵션과 결합되면 영속성 보장이 어려운 점이 있습니다.

더보기
  • 자동 삭제 옵션 설정 시 작업이 완료/실패 후 삭제됨
    BullMQ는 작업이 완료(completed)되거나 실패(failed)된 후, 옵션에 따라 해당 작업을 Redis에서 자동으로 삭제할 수 있습니다(removeOnComplete, removeOnFail)4.
    이때, 해당 jobId를 가진 작업은 Redis에서 완전히 제거됩니다.
  • 삭제된 jobId로 다시 작업 추가 가능
    BullMQ는 Redis에 존재하지 않는 jobId에 대해 중복 검사를 하므로, 자동 삭제 옵션이 활성화된 경우 이미 처리되어 삭제된 jobId로 다시 작업을 추가할 수 있습니다54.
    즉, 동일한 jobId로 작업이 반복적으로 생성될 수 있습니다.
  • 영속성(중복 방지) 보장의 한계
    중복 방지 기능은 오직 Redis에 해당 jobId가 존재하는 동안만 유효합니다.
    작업이 완료되어 삭제되면, 같은 jobId로 다시 작업을 추가할 수 있으므로, jobId의 영속적 중복 방지(예: 한 번만 실행되어야 하는 작업)를 보장할 수 없습니다.
  • 실제 사용 예시
    예를 들어, 한 번만 실행되어야 하는 배치 작업에 jobId를 고정해 사용할 경우, 작업이 완료되어 삭제된 후 동일한 jobId로 다시 작업이 추가될 수 있습니다.
    이 경우, jobId의 중복 방지 기능이 영속적으로 보장되지 않습니다.

요약하면, BullMQ 기본 멱등성 전략은 jobId 중복 검사 기반이며 Redis EXISTS 명령을 사용해 중복을 방지합니다. 그러나 자동 삭제 후에는 멱등성이 무력화되므로, 중요한 비즈니스 작업에는 TTL 기반의 별도 멱등성 키 사용이나 DB 기록 관리 등 보완 전략이 필요합니다.


 


BullMQ Worker의 동작 원리를 Redis 기반

첫째, 작업 처리 흐름을 요약하면, 프로듀서가 Queue.add()를 통해 작업(job)을 생성하고 이를 Redis에 저장합니다.

import { Queue } from 'bullmq';

const queue = new Queue('my-queue', { connection: { host: '127.0.0.1', port: 6379 } });

async function addJob() {
  const job = await queue.add('my-job', { foo: 'bar' });
  console.log('Job added:', job.id);
}
addJob();

워커는 Redis에서 BRPOPLPUSH 또는 Lua 스크립트를 이용해 원자적으로 작업을 가져와 실행합니다.

import { Worker } from 'bullmq';

const worker = new Worker('my-queue', async job => {
  console.log('Processing job:', job.id, job.data);
  // 작업 처리 로직
  return 'done';
}, { connection: { host: '127.0.0.1', port: 6379 } });

// 실제 Worker 내부 동작 흐름
async function processNextJob() {
  // Redis에서 작업을 가져오는 Lua 스크립트 실행
  const jobId = await this.scripts.moveToActive(this.queueName, this.opts);
  if (jobId) {
    // 작업 데이터를 Redis에서 가져옴
    const jobData = await this.getJob(jobId);
    // 작업 실행
    await this.processJob(jobData);
  }
}

작업이 성공적으로 완료되면 상태가 "completed"로, 실패하면 "failed"로 전이되며, 작업 결과나 오류 정보는 옵션에 따라 Redis에 저장되거나 삭제됩니다.

// src/classes/worker.ts의 일부
protected async getNextJob(token: string): Promise<JobJson | void> {
  // moveToActive 스크립트 실행 (Redis에서 작업을 가져오는 Lua 스크립트)
  const jobId = await this.scripts.moveToActive(this.queueName, token, this.opts);
  if (jobId) {
    // 작업 데이터 가져오기
    const job = await Job.fromId(this as MinimalQueue, jobId);
    return job;
  }
}

// src/classes/worker.ts의 processJob 내부 (요약)
protected async processJob(job: Job): Promise<void> {
  try {
    const result = await this.processor(job);
    // 작업 완료 처리
    await job.moveToCompleted(result, this.opts.returnValue, false, token);
  } catch (err) {
    // 작업 실패 처리
    await job.moveToFailed(err as Error, token, false);
  }
}

둘째, 작업을 가져오는 내부 구현 구조를 살펴보면, 워커는 Redis의 waiting 리스트에서 작업을 꺼내 active 상태로 옮겨 실행합니다. 이 과정에서 job.updateProgress() 같은 메서드를 통해 진행 상황을 Redis 해시에 업데이트합니다.

// src/classes/job.ts (요약)
async updateProgress(progress: number | object): Promise<void> {
  await this.queue.scripts.progress(this.queueKeys, this.id, progress);
}

-- progress.lua (요약)
-- KEYS[1]: jobs 해시
-- ARGV[1]: jobId
-- ARGV[2]: progress 데이터

redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])

작업 상태 전이는 Lua 스크립트를 통해 원자적으로 처리되어, 작업이 waiting에서 active로 이동하고 잠금(lock)이 설정되어 중복 실행이나 경쟁 상태(race condition)가 발생하지 않습니다.

-- moveToCompleted.lua (요약)
-- KEYS[1]: active 리스트
-- KEYS[2]: completed 리스트
-- KEYS[3]: jobs 해시
-- ARGV[1]: jobId

-- active에서 작업 제거
redis.call('LREM', KEYS[1], 0, ARGV[1])
-- completed에 작업 추가
redis.call('RPUSH', KEYS[2], ARGV[1])
-- jobs 해시에 결과 저장
redis.call('HSET', KEYS[3], ARGV[1], ...)

셋째, 작업의 실패 및 성공 처리에서는 성공 시 completed 이벤트가 발생하고, 작업 함수가 반환한 값은 job.returnvalue에 저장됩니다. removeOnComplete 옵션이 설정되면 작업 완료 후 자동으로 삭제됩니다. 실패 시에는 failed 이벤트가 발생하며, 실패 이유와 스택 추적 정보가 저장됩니다. attempts와 backoff 정책에 따라 재시도가 이루어지며, Redis 내의 retry 리스트와 delay 리스트가 재처리 타이밍 조절에 활용됩니다.

-- 실패한 작업을 retry 리스트에 추가
redis.call('ZADD', KEYS, retryTimestamp, jobId)
-- 지연(delay) 큐에 작업 추가
redis.call('ZADD', KEYS, delayTimestamp, jobId)

넷째, 워커의 동시성(concurrency)은 워커 생성 시 옵션으로 지정할 수 있어 하나의 워커가 여러 작업을 병렬 처리할 수 있습니다. 예를 들어 new Worker(queueName, processorFn, { concurrency: 5 })는 최대 5개의 작업을 동시에 처리합니다. 내부적으로는 비동기 작업 풀처럼 동작하며, Redis 작업 분배는 락 기반으로 관리되어 중복 처리 문제는 발생하지 않습니다.

set concurrency(concurrency: number) {
  if (
    typeof concurrency !== 'number' ||
    concurrency < 1 ||
    !isFinite(concurrency)
  ) {
    throw new Error('concurrency must be a finite number greater than 0');
  }
  this._concurrency = concurrency;
}

get concurrency() {
  return this._concurrency;
}

다섯째, 기본적으로 워커는 생성되면 즉시 작업 처리를 시작하지만, autorun: false 옵션을 설정하면 직접 worker.run()을 호출해야 작업 처리가 시작됩니다. 이 옵션은 NestJS의 DI 컨테이너 초기화 이후 워커 실행을 지연하거나 테스트 환경에서 실행 시점을 제어할 때, 또는 모니터링 도구와 연동할 때 유용합니다.

여섯째, 작업 함수가 반환하는 값은 자동으로 Redis에 기록되며, 워커에서 completed 이벤트를 청취해 job.returnvalue로 접근할 수 있습니다. 클라이언트 측에서는 작업을 추가하고 job.waitUntilFinished(queueEvents)를 호출해 결과를 받을 수 있습니다. 내부적으로 job:completed:<jobId> 채널을 통해 반환값이 발행되며, QueueEvents가 이를 구독하여 응답합니다.

worker.on('completed', (job, result) => {
  console.log('Job completed:', job.id, 'Result:', result);
  // job.returnvalue에도 결과가 저장됨
  console.log('job.returnvalue:', job.returnvalue);
});
import { Queue, QueueEvents } from 'bullmq';

const queue = new Queue('my-queue');
const queueEvents = new QueueEvents('my-queue');

async function addJobAndWait() {
  const job = await queue.add('my-job', { foo: 'bar' });
  const result = await job.waitUntilFinished(queueEvents);
  console.log('Job result:', result);
}
addJobAndWait();

마지막으로, 실무에서 고려할 사항과 팁을 정리하면 다음과 같습니다. 멀티 프로세스 환경에서 워커를 여러 프로세스로 분산해 CPU 코어를 효율 활용할 수 있으며,

const cluster = require('cluster');
const { Worker } = require('bullmq');
const numCPUs = require('os').cpus().length;

if (cluster.isPrimary) {
  console.log(`Master ${process.pid} is running`);

  // Fork workers equal to CPU cores
  for (let i = 0; i < numCPUs; i++) {
    cluster.fork();
  }

  cluster.on('exit', (worker, code, signal) => {
    console.log(`Worker ${worker.process.pid} died`);
    cluster.fork(); // Auto-restart failed workers
  });
} else {
  // Worker process initialization
  const worker = new Worker('my-queue', async job => {
    // Job processing logic
    console.log(`Worker ${process.pid} processing job ${job.id}`);
  }, { connection: { host: '127.0.0.1', port: 6379 } });
  console.log(`Worker ${process.pid} started`);
}

attempts와 backoff 옵션으로 실패 시 재시도 간격을 조절할 수 있습니다.

await queue.add('my-job', { data: 'payload' }, {
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 1000,
  },
});

autorun: false를 사용하면 워커의 우아한 종료(graceful shutdown)를 구현하기 쉽고, QueueEvents를 통해 실시간 상태 추적이 가능합니다. 병렬성 조절은 concurrency 옵션으로 최적화할 수 있습니다.

const worker = new Worker('my-queue', async job => {
  // Job processing logic
}, { connection: { host: '127.0.0.1', port: 6379 }, autorun: false });

// 필요 시 worker.run()으로 작업 시작
worker.run();

// graceful shutdown 예시
process.on('SIGTERM', async () => {
  await worker.close();
});
const { QueueEvents } = require('bullmq');

const queueEvents = new QueueEvents('my-queue', { connection: { host: '127.0.0.1', port: 6379 } });

queueEvents.on('completed', ({ jobId }) => {
  console.log(`Job ${jobId} completed!`);
});

queueEvents.on('failed', ({ jobId, failedReason }) => {
  console.log(`Job ${jobId} failed: ${failedReason}`);
});
const worker = new Worker('my-queue', async job => {
  // Job processing logic
}, { connection: { host: '127.0.0.1', port: 6379 }, concurrency: 5 });

예를 들어 NestJS 환경에서 워커를 활용할 때는 다음과 같이 모듈 내에서 OnModuleInit 인터페이스를 구현해 워커를 생성 및 실행할 수 있습니다.

@Module({})
export class WorkerModule implements OnModuleInit {
  private readonly worker: Worker;

  onModuleInit() {
    this.worker = new Worker(
      'my-queue',
      async (job: Job) => {
        // Job processing logic
      },
      {
        autorun: false,
        concurrency: 5,
      },
    );
    this.worker.run();
  }
}

이처럼 BullMQ Worker는 Redis의 강력한 원자성 기능을 활용해 안정적이고 효율적인 분산 작업 처리를 지원합니다.


BullMQ의 작업 자동 삭제 기능

BullMQ는 작업이 완료되면 해당 작업 데이터를 Redis에 저장합니다. 작업 상태별로 저장 위치가 다르며, 처리 대기 중인 작업은 리스트 구조(waiting, active)에, 완료된 작업은 집합(Set)과 해시(Hash) 구조로 관리되는 completed 세트에, 실패한 작업은 failed 세트에 영구적으로 보관됩니다. 이러한 구조는 디버깅이나 상태 추적에 매우 유용하지만, 시간이 지남에 따라 Redis 메모리를 지속적으로 차지하는 문제가 발생할 수 있습니다.

이를 해결하기 위해 BullMQ는 작업 생성 시점이나 워커 레벨에서 removeOnComplete, removeOnFail 옵션을 제공하여 자동 삭제를 설정할 수 있습니다. 가장 단순한 형태는 Boolean 값 설정으로, true로 지정하면 작업이 완료되거나 실패할 때 즉시 삭제되고, 기본값인 false는 작업을 무기한 보존합니다.

또한, 정수 값(N)을 설정하여 최근 N개의 완료 또는 실패 작업만 유지하도록 할 수 있습니다. 이 경우 오래된 작업부터 FIFO 방식으로 자동 제거됩니다. 더 고급 설정으로는 함수 형태로 조건부 삭제 로직을 직접 정의할 수도 있어, 실패 이유나 작업 데이터에 따라 삭제 여부를 결정할 수 있습니다.

이러한 옵션은 개별 작업마다 Queue.add() 호출 시 다르게 지정할 수 있고, 워커 생성 시 전체 작업에 공통으로 적용할 수도 있습니다. 단, Queue.add()에서 설정한 옵션이 우선순위를 가집니다.

내부적으로 작업이 완료 또는 실패 상태로 전이된 직후에 Redis Lua 스크립트를 통해 원자적으로 작업 ID를 상태 집합에 추가하고, 삭제 조건에 부합하면 작업 해시 및 상태 세트에서 작업 ID를 제거하는 방식으로 동작합니다. 이 과정에서 삭제 설정이 있더라도 completed 이벤트는 먼저 발생합니다.

실무에서는 개발 환경에서는 작업 결과 확인이 중요하므로 삭제하지 않는 것을 권장하지만, 운영 환경에서는 Redis 메모리 압박을 막기 위해 반드시 자동 삭제 설정을 해야 합니다. 특히 실패 작업 추적이 필요한 경우 removeOnFail은 false 또는 조건부 함수로 관리하고, 삭제 전에 DB나 별도의 로그 시스템에 기록하는 훅(hook)을 두는 것이 좋습니다.

예를 들어 NestJS 환경에서는 다음과 같이 워커 생성 시 옵션을 지정할 수 있습니다.

new Worker('my-queue', async job => {
  // 작업 처리 로직
}, {
  removeOnComplete: 1000,
  removeOnFail: (job, reason, err) => {
    return err.name !== 'CriticalError'; // 조건부 삭제 예시
  },
});

자동 삭제를 활용하면 Queue.getCompleted()나 Queue.getFailed() 호출 시 성능이 향상되고, Redis의 스냅샷 크기가 줄어들어 persistence 처리 부담이 감소합니다. 또한, UI 도구인 bull-board가 쌓인 작업 때문에 느려지는 문제를 방지할 수 있습니다.

요약하자면, removeOnComplete와 removeOnFail 옵션은 다음과 같은 설정을 지원합니다.

  • true: 모든 완료 또는 실패 작업을 즉시 삭제
  • false: 기본값으로 삭제하지 않고 무기한 보존
  • 숫자: 최근 N개 작업만 보존
  • 함수: 커스텀 조건에 따른 삭제

이를 적절히 활용해 Redis 메모리 부담을 관리하고, 안정적인 작업 큐 운영 환경을 구축할 수 있습니다.


removeOnComplete와 removeOnFail 옵션

removeOnComplete와 removeOnFail 옵션에서 { count: 0 } 설정은 작업이 완료되거나 실패하자마자 즉시 관련된 모든 데이터를 Redis에서 삭제한다는 의미입니다. 이 경우 작업의 상세 정보(job:<id> 해시), 상태 집합(completed, failed 세트), 이벤트 스트림 등 작업과 관련된 모든 흔적이 Redis에 남지 않아 메모리와 디스크 사용량을 크게 절약할 수 있습니다.

다만, 이렇게 삭제하면 작업 결과나 에러 메시지 같은 정보를 조회할 수 없어 디버깅이나 테스트 환경에서는 권장되지 않습니다. 필요한 경우, 완료 및 실패 이벤트를 활용해 별도의 외부 로그 저장소에 기록하는 방식을 고려해야 합니다.

예를 들어, 다음과 같이 Worker를 생성할 때 설정할 수 있습니다.

import { Worker } from 'bullmq';

const worker = new Worker(
  'myQueue',
  async (job) => {
    // 작업 처리 로직
    return { status: 'done' };
  },
  {
    connection,
    removeOnComplete: { count: 0 },
    removeOnFail: { count: 0 },
  },
);

실무에서는 보통 완료 작업은 최근 1000개, 실패 작업은 5000개 정도를 유지하는 패턴을 추천합니다. 이렇게 하면 완료된 작업은 간단히 결과 확인용으로, 실패한 작업은 원인 분석 및 반복 문제 탐지용으로 충분히 활용할 수 있습니다.

removeOnComplete: { count: 1000 },
removeOnFail: { count: 5000 },

또한, removeOnXxx 설정 외에 수동으로 오래된 작업을 삭제할 수도 있습니다.

await queue.clean(0, 1000, 'completed'); // 오래된 완료 작업 삭제
await queue.clean(0, 1000, 'failed');    // 오래된 실패 작업 삭제

Redis 내에서 삭제 대상이 되는 주요 키는 다음과 같습니다.

  • job:<id> — 작업 상세 정보
  • bull:<queue>:completed — 완료 작업 집합
  • bull:<queue>:failed — 실패 작업 집합
  • bull:<queue>:events — 이벤트 스트림

요약하자면, removeOnComplete와 removeOnFail 설정은 다음과 같은 효과를 가집니다.

설정값효과
{ count: 0 } 작업 즉시 삭제, 모든 흔적 제거
{ count: N } 최근 N개만 보관, 오래된 순 FIFO 삭제
true 모든 작업 즉시 삭제 (무한 개 삭제와 유사)
false 무제한 보관 (기본값)

BullMQ의 removeOnComplete 및 removeOnFail 옵션에 age와 count 설정 적용

이 옵션은 내부적으로 KeepJobs 타입 객체를 받아 처리하며, 주요 역할은 작업을 최대 보관 기간과 최대 보관 개수 기준으로 자동 삭제하는 것입니다.

  • age: 작업을 최대 n초 동안 보관하고, 그 이후에는 삭제합니다.
  • count: 보관할 작업의 최대 개수를 의미하며, 이 수를 초과하는 오래된 작업부터 삭제됩니다.

두 조건 중 하나라도 초과하면 작업이 삭제되며, AND 조건이 아닌 OR 조건으로 작동합니다.

BullMQ는 완료(completed) 및 실패(failed) 상태의 작업을 Redis의 정렬 집합(ZSET)으로 관리하는데, 각 작업의 타임스탬프가 score로 사용됩니다.

  • age 기준 삭제는 ZREMRANGEBYSCORE 명령으로 타임스탬프를 기준으로 오래된 작업을 제거하고,
  • count 기준 삭제는 ZREMRANGEBYRANK 명령으로 가장 오래된 순서대로 작업을 제거합니다.

이를 통해 Redis 메모리와 키 스페이스(keyspace)를 효율적으로 관리할 수 있습니다.

실무 예시는 다음과 같습니다:

const myWorker = new Worker(
  'mailQueue',
  async job => {
    // 이메일 전송 로직
  },
  {
    connection,
    removeOnComplete: {
      age: 3600,     // 최대 1시간 동안 보관
      count: 5000,   // 최대 5000개 작업만 보관
    },
    removeOnFail: {
      age: 86400,    // 실패 작업은 최대 24시간(1일) 동안 보관
    },
  },
);

이 구성은 성능 안정성과 디버깅 가능성 간 균형을 이룹니다.

주의할 점은, age 기반 삭제는 BullMQ 내부에서 주기적으로 평가되어 즉시 삭제되는 것이 아니며, age와 count가 모두 설정되어도 둘 중 하나만 초과해도 삭제가 진행됩니다. 또한, count는 트래픽 폭증 시 Redis 사용량이 급격히 늘어나는 것을 방지하는 안전장치 역할을 합니다. 개발 및 테스트 환경에서는 removeOn를 사용하지 않거나 count만 설정해도 충분합니다.

타입 선언 예시는 공식 문서 기준 다음과 같습니다:

type KeepJobs = { count?: number; age?: number; };

Worker 생성 시 옵션으로는 아래와 같이 사용합니다.

removeOnComplete?: boolean | number | KeepJobs;
removeOnFail?: boolean | number | KeepJobs;

정리하면, 주요 전략별 특징은 다음과 같습니다:

전략설명사용 시점
count: 0 작업 즉시 삭제 로그 저장 불필요 시
count: N N개만 보관 작업량이 일정할 때
age: T T초 동안만 보관 시간 기반 정리 필요 시
age + count 복합 기준 트래픽 변동 대응 및 안정화 시
 

결론적으로, age와 count를 조합한 설정은 실시간 대규모 시스템에서 BullMQ 작업 관리를 자동화하고 안정적으로 운영하는 데 가장 권장되는 방식 중 하나입니다. Redis 메모리 사용량을 최적화하면서도 디버깅 유연성을 확보할 수 있기 때문입니다.

필요 시 QueueEvents를 활용한 작업 보관 전 외부 로그 저장 전략에 대해서도 추가 설명 가능합니다.


BullMQ 동시성(concurrency) 처리 두가지 전략

첫 번째는 하나의 Worker 인스턴스 내에서 concurrency 값을 높여 여러 작업을 동시에 처리하는 ‘로컬 동시성(Local concurrency)’ 방식이고, 두 번째는 여러 Node.js 프로세스에서 Worker를 각각 실행하는 ‘다중 Worker 인스턴스(Multiple Workers)’ 방식입니다.

로컬 동시성 방식은 설정이 간단하고 단일 인스턴스 내에서 작업을 제어할 수 있다는 장점이 있지만, 단일 장애점(SPOF)이 될 수 있고 Node.js의 단일 스레드 특성상 CPU 자원 소모에 제한이 있습니다. 반면, 다중 Worker 인스턴스 방식은 장애 허용성이 뛰어나고 멀티 머신 확장이 가능하지만 설정이 복잡하고 분산 처리 조율이 필요합니다.

예를 들어, 로컬 동시성은 다음과 같이 설정할 수 있습니다:

import { Worker } from 'bullmq';

const worker = new Worker(
  'emailQueue',
  async job => {
    // 이메일 전송 로직
  },
  {
    connection,
    concurrency: 50 // 동시에 50개 작업 처리
  }
);

Node.js의 이벤트 루프 기반 특성상, 이 방식은 CPU 집약적인 작업보다는 DB, Redis, API 호출 같은 I/O 바운드 작업에 적합하며, 하나의 프로세스 내에서 관리되므로 모니터링과 디버깅이 용이합니다.

다중 Worker 인스턴스는 여러 프로세스를 실행하여 작업을 분산 처리하는 방법으로, 예를 들어 PM2를 사용해 4개의 프로세스를 띄우거나, Docker/Kubernetes 환경에서 여러 복제본을 운영할 수 있습니다:

pm2 start worker.js -i 4

또는 Kubernetes 설정 예시:

replicas: 3

각 프로세스는 독립적인 워커이며, BullMQ는 Redis를 중앙 허브로 활용해 작업 분배를 안전하게 조율합니다.

실무에서는 ‘여러 프로세스 + 각 워커당 고정 concurrency 수’ 조합을 많이 사용합니다. 예를 들어, 3대 서버에서 각각 concurrency 20인 워커 프로세스를 실행하면 총 60개의 작업을 병렬 처리할 수 있습니다.

BullMQ 내부 동작은 Redis의 BLPOP 또는 ZPOPMIN 명령어로 작업을 가져오고, concurrency 수만큼 작업 슬롯을 미리 확보해 병렬 실행합니다. 작업 완료 후 상태를 completed 또는 failed로 변경하며, removeOnComplete 등의 정책에 따라 작업을 삭제합니다.

실무에서는 다음 사항에 주의해야 합니다:

  • CPU 바운드 작업 비중이 높으면 Worker Thread나 샌드박스 프로세서 병행 사용 고려
  • 동일 리소스를 사용하는 작업 간에는 job.key, lockDuration, limiter 설정으로 충돌 방지
  • 작업 지연 시 timeout과 stalledInterval 설정으로 처리 실패 방지
  • 트래픽 급증 시 동시성 조절이나 워커 수 확장, 자동 스케일링 도입 필요

정리하자면, concurrency 값을 1보다 크게 설정하는 로컬 동시성은 단일 서버/프로세스 환경에 적합하고, 다중 Worker 인스턴스는 고가용성과 확장이 필요한 분산 환경에 적합합니다. 운영 환경에서는 이 둘을 결합한 하이브리드 방식이 가장 일반적이며 추천되는 구성입니다.


 

BullMQ를 이해하기 위해서는 Node.js의 이벤트 루프 구조와 CPU 점유, 그리고 Redis 기반의 작업 잠금(lock) 메커니즘을 함께 살펴보는 것이 중요합니다.

먼저, Node.js는 기본적으로 단일 스레드 이벤트 루프 모델로 동작합니다. 모든 작업은 비동기 콜백이나 Promise 기반으로 처리되며, 작업이 완료될 때까지 이벤트 루프가 계속 순회하면서 작업을 관리합니다. 이때 CPU 집약적인 작업(CPU-bound)은 암호화, 압축, 대용량 JSON 파싱처럼 CPU를 계속 점유하는 작업을 말하며, I/O 바운드 작업은 DB나 네트워크, 파일 시스템 등 외부 시스템 응답을 기다리는 시간이 대부분입니다. CPU 집약 작업이 process() 함수 내에서 오래 지속되면 이벤트 루프가 차단되어 BullMQ가 기대하는 주기적인 작업 잠금 갱신(lock renewal)을 하지 못하게 됩니다. 이로 인해 BullMQ는 워커가 비정상적으로 종료되거나 멈췄다고 판단해 작업을 stalled 상태로 간주합니다.

Redis 관점에서 BullMQ는 작업 잠금과 상태 추적을 위해 Redis를 활용합니다. 작업이 워커에 할당되면 bull:<queue>:<jobId>:lock 키에 대해 TTL 기반의 잠금을 설정하며, 기본 잠금 시간(lockDuration)은 30초입니다. 워커는 일정 간격(lockRenewTime)마다 Redis에 "아직 작업을 처리 중"임을 알리는 신호(heartbeat)를 보내 잠금을 연장합니다. 만약 이벤트 루프가 바빠서 이 신호를 보내지 못하면 잠금이 만료되고, 다른 워커가 동일 작업을 처리할 수 있게 되어 작업이 중단(stalled)된 것으로 인식됩니다.

BullMQ 오픈소스 코드를 보면, worker.ts 내 extendLocks(job) 함수가 잠금 갱신을 담당하며, 내부적으로 Redis의 PEXPIRE 명령어로 TTL을 갱신합니다. 이 함수는 runStalledCheck()라는 주기 실행 함수 내에서 반복 호출됩니다. runStalledCheck() 함수는 일정 간격(기본 30초)으로 실행되어 잠금이 갱신되지 않은 작업을 stalled 세트에 추가합니다. 이후 moveStalledJobsToWait() 함수가 호출되어 Redis에서 stalled 상태의 작업을 찾아 다시 waiting 상태로 이동시키며, 재시도 횟수(maxStalledCount)를 초과한 작업은 failed 상태로 전환합니다.

따라서 실무에서는 다음과 같은 전략을 권장합니다. 작업 처리 로직은 반드시 비동기로 구현하고, 동기적 반복문 사용을 최소화하며, CPU 집약 작업은 가능한 피하는 것이 중요합니다. 작업 도중 이벤트 루프가 정기적으로 돌아갈 수 있도록 중간마다 await sleep(0) 또는 Promise.resolve().then() 같은 방식을 사용해 제어권을 반환해야 합니다. CPU 부하가 큰 작업은 Node.js의 worker_threads, child_process 또는 BullMQ가 지원하는 sandboxed processor로 분리하는 것을 고려해야 합니다. 또한 lockDuration, stalledInterval, removeOnFail, maxStalledCount, timeout 등 옵션들을 작업 특성에 맞게 조절해야 하며, Prometheus 같은 모니터링 도구와 연동해 stalled 작업 수와 실패 작업 수를 실시간으로 관측하고 알람을 설정하는 것도 효과적입니다.

정리하면, 컴퓨터 과학적 관점에서는 Node.js의 이벤트 루프와 CPU 점유 문제가 BullMQ 안정성에 직접적인 영향을 미치고, Redis는 TTL 기반 잠금 및 연장으로 작업 상태를 관리합니다. BullMQ 내부 코어는 lock, stalled, heartbeat 메커니즘을 통해 작업 처리의 신뢰성과 내구성을 확보합니다. 따라서 실무에서는 CPU 집약 작업 분리, 이벤트 루프 제어권 반환, 적절한 옵션 튜닝으로 stalled 상태를 최소화하는 전략을 반드시 적용해야 합니다.


Sandboxed Processors

샌드박스 프로세서는 Node.js의 단일 이벤트 루프 구조에서 발생하는 CPU 집약 작업으로 인한 문제를 해결하기 위한 설계입니다. Node.js는 기본적으로 싱글 스레드 이벤트 루프를 사용하기 때문에, CPU를 많이 사용하는 작업이 이벤트 루프에서 실행되면 다른 작업 처리와 I/O 콜백 실행이 지연되어 애플리케이션 전체 응답성이 떨어집니다. 샌드박스 프로세서는 작업 처리 로직을 별도의 Node.js 프로세스나 워커 스레드로 분리하여, CPU 집약 작업이 메인 이벤트 루프를 차단하지 않고 별도의 프로세스에서 실행되도록 합니다. 이 덕분에 메인 프로세스는 계속해서 잠금 갱신(lock renewal)이나 상태 업데이트 같은 BullMQ 내부 작업을 정상적으로 수행할 수 있으며, 중단된 작업(stalled jobs)의 발생 확률을 크게 줄일 수 있습니다.

Redis와 BullMQ 관점에서 보면, BullMQ는 작업 실행 시 해당 작업에 대해 Redis에 락을 설정하고 주기적으로 이 락의 TTL을 갱신합니다.

더보기

BullMQ의 락(lock) 동작 원리

  • 락의 설정
    • 워커가 Redis에서 작업을 가져올 때(QNEXT 또는 내부 Lua 스크립트), 해당 작업에 대해 고유한 락 토큰(token)과 TTL(lockTTL)을 지정해 락을 설정합니다315.
    • 이 락은 Redis 해시에 저장되며, 락 토큰은 워커가 작업을 소유하고 있음을 증명합니다.
  • 락의 목적
    • 중복 실행 방지: 한 번에 하나의 워커만 작업을 처리할 수 있도록 보장합니다.
    • 장애 감지: 워커가 비정상 종료되거나 응답하지 않을 경우, 락의 TTL이 만료되면 BullMQ가 해당 작업을 다시 대기열로 돌려보내거나 실패 처리합니다315.
  • 락의 갱신
    • 워커는 작업을 처리하는 동안 주기적으로 락의 TTL을 갱신(lock renewal)합니다.
    • 이 과정은 lockRenewTime 옵션에 따라 보통 락의 만료 시간(lockDuration)의 절반마다 실행됩니다54.
    • 갱신은 QRELOCK 명령이나 내부 Lua 스크립트를 통해 이루어집니다3.
  • 락의 해제
    • 작업이 성공적으로 완료되거나 실패하면, 워커는 QDONE 명령을 통해 락을 해제합니다3.
    • 락 토큰이 일치해야만 락을 해제할 수 있으므로, 권한이 없는 워커가 작업을 완료/실패 처리할 수 없습니다.

BullMQ의 락 관련 Redis 명령어

  • QNEXT: 작업을 가져올 때 락을 설정합니다.
  • QRELOCK: 락의 TTL을 갱신합니다.
  • QDONE: 작업 완료/실패 시 락을 해제합니다.

장애 상황과 락

  • 락 만료(Stalled)
    • 워커가 락을 갱신하지 못하고 TTL이 만료되면, BullMQ는 해당 작업을 stalled(정지) 상태로 간주하고, 다시 대기열로 돌려보내거나 실패 처리합니다53.
    • 이 과정은 QueueScheduler 또는 Worker의 stalledChecker가 주기적으로 확인합니다4.
  • 중복 처리 방지
    • 한 번에 하나의 워커만 작업을 처리할 수 있도록 보장하지만, 락이 만료되면 작업이 중복 처리될 수 있습니다(이 경우는 "at least once" 전략)5.
    • maxStalledCount 옵션으로 최대 중복 처리 횟수를 제한할 수 있습니다

CPU 집약 작업이 메인 이벤트 루프를 차단하면, 락 갱신 요청이 제때 Redis에 도달하지 못해 락이 만료되고 작업이 stalled 상태로 간주됩니다. 샌드박스 프로세서 구조를 도입하면, 락 갱신과 내부 관리 로직이 메인 워커 프로세스에서 원활히 실행되므로 락 만료 및 stalled 전환이 줄어들어 안정성이 향상됩니다.

BullMQ 오픈소스에서는 Worker를 생성할 때 작업 프로세서를 별도의 파일로 분리해 지정할 수 있습니다. 예를 들어 다음과 같이 설정합니다.

const worker = new Worker(
  queueName,
  './processor.js',
  {
    connection,
    concurrency: 10,
    useWorkerThreads: true, // Worker Threads 옵션 활성화
  },
);

./processor.js 파일은 작업 처리 함수가 포함된 별도의 프로세스로 실행됩니다. BullMQ 내부에서는 이 프로세스를 관리하면서 작업 할당, 락 관리, 결과 수집을 메인 워커 프로세스가 담당합니다. 주요 구현 흐름은 Worker.ts에서 프로세서가 파일 경로나 함수 형태로 주어지면 내부적으로 sandboxedProcess를 생성하는 방식입니다. sandboxedProcess는 Node.js의 child_process.spawn 또는 worker_threads API를 사용해 작업자 프로세스를 띄우고, IPC 통신으로 작업 데이터와 결과를 주고받습니다.

실무에서는 CPU 집약 작업이 많거나 작업 시간이 긴 경우 샌드박스 프로세서 사용을 강력히 권장합니다. 워커가 과부하되지 않도록 적절한 concurrency 설정과 함께 stalledInterval, lockDuration 등의 옵션을 조율해야 합니다. 또한, 대규모 시스템에서는 여러 서버 및 프로세스에 워커를 분산 배치해 안정성과 확장성을 확보하는 것이 바람직합니다.


 

BullMQ의 대기열 일시 중지 기능

작업 처리 흐름을 효과적으로 제어하고 리소스 관리를 최적화하는 중요한 수단입니다. 작업량이 급증하거나 시스템 부하가 심할 때, 혹은 긴급 유지보수나 배포 상황에서 작업 처리를 잠시 멈추고자 할 때 유용하게 활용됩니다. 대기열 전역 일시 중지는 큐 전체에 영향을 미쳐 새로운 작업이 워커에 할당되지 않도록 막아, 리소스 사용량이 불필요하게 늘어나는 것을 방지합니다. 반면, 로컬 일시 중지는 특정 워커 인스턴스에만 적용되므로, 개별 워커 부하를 조절하거나 점진적 배포, 테스트 시나리오에 적합합니다.

Redis와 BullMQ 내부 동작 방식을 보면, BullMQ의 대기열과 워커는 Redis 내의 대기열 리스트 및 여러 관련 키를 통해 작업을 관리합니다. 일시 중지 시에는 Redis에 일시 중지 상태를 표시하는 플래그가 설정되어 워커가 새로운 작업을 가져가지 못하게 합니다. queue.pause() 호출은 Redis에 이 플래그를 설정하여 모든 워커가 새 작업 수신을 중단하도록 만들고, worker.pause()는 워커 내부 상태를 변경해 해당 워커의 작업 수신 루프를 멈춥니다. 워커의 pause() 메서드는 진행 중인 작업이 완료될 때까지 기다렸다가 중단하는 반면, pause(true) 호출 시 즉시 작업 수신을 멈추고 진행 중 작업 결과를 무시하거나 롤백하는 특수 상황에 사용됩니다. 이 과정에서 상태 변화를 외부에서 감지할 수 있도록 paused, resumed 이벤트가 발생합니다.

구체적인 코드 예시는 다음과 같습니다:

// 대기열 전역 일시 중지
await queue.pause(); // 모든 워커가 새 작업 수신 중단

// 워커 단위 일시 중지 - 현재 작업 완료 후 중단
await worker.pause();

// 워커 단위 즉시 일시 중지 - 진행 중 작업 무시
await worker.pause(true);

실무에서는 배포나 유지보수 시 대기열 전체를 일시 중지하고, 작업 완료 여부를 안정적으로 확인한 후 작업을 재개하는 절차가 권장됩니다. 특정 워커에 부하가 집중될 경우에는 해당 워커만 일시 중지하여, 나머지 워커들이 작업을 계속 처리하도록 조절할 수 있습니다. 또한, 대기열과 워커의 일시 중지 상태를 지속적으로 모니터링하며 장애 대응 상황에서 작업 처리 흐름을 유연하게 관리하는 것이 중요합니다.


BullMQ의 자동 작업 삭제(Auto Removal) 기능

먼저 컴퓨터 과학적 관점에서 이 기능의 목적은 Redis 내에 작업 데이터가 무한히 쌓이는 것을 방지해 메모리 누수와 자원 고갈 문제를 예방하는 데 있습니다. 작업 데이터는 생성부터 처리, 완료 혹은 실패, 보관, 그리고 최종 삭제까지의 생명 주기(lifecycle)를 갖는데, 이 주기를 자동화하는 것은 시스템 안정성과 성능 유지에 필수적입니다. 특히 실패한 작업은 장애 원인 분석에 중요하므로 더 오래 보존하는 반면, 완료된 작업은 불필요한 데이터를 줄여 효율성을 높이는 방향으로 보존 정책이 최적화됩니다. 이를 위해 작업 보존 정책을 개수 제한(count)과 시간 제한(age)을 조합하여 세밀하게 관리하는데, 이는 대규모 서비스에서 자원 관리 전략의 핵심 요소입니다.

Redis 내부에서는 BullMQ가 Sorted Set(zset), List, Hash 등 다양한 자료구조를 활용해 작업 상태를 관리합니다. 완료(completed)와 실패(failed) 작업은 각각 별도의 네임스페이스 키에 저장되며, 삭제 시에는 Redis 명령어를 사용해 해당 작업 ID를 상태 집합에서 제거하고, 연관된 메타데이터도 함께 삭제합니다. removeOnComplete나 removeOnFail 옵션에 따라 작업 완료 시점이나 주기적인 클린업 과정에서 삭제가 이루어집니다. 내부적으로는 주기적 클린업 작업(cleanup job)이 설정된 age(시간)나 count(개수) 기준을 초과하는 작업을 찾아 안전하게 삭제합니다. 이를 위해 Redis Lua 스크립트를 활용해 다수 작업 삭제를 원자적으로 수행함으로써 데이터 무결성을 보장합니다.

BullMQ 오픈소스 내부 구현을 보면, Queue 클래스는 작업 추가 시 옵션으로 removeOnComplete, removeOnFail을 받아 저장하며, Worker는 작업 완료 또는 실패 시 이 옵션을 확인해 자동 삭제 명령을 실행하거나, 숫자 혹은 객체 형태의 조건일 경우 주기적 클린업에서 삭제 작업을 수행합니다. 삭제 과정은 Redis 내 관련 키(예: completed 큐, 작업 해시, 이벤트 리스트 등)에서 작업 데이터를 Lua 스크립트로 안전하게 원자적으로 제거합니다. 주요 코드 위치로는 Queue 클래스의 add() 메서드, Worker 클래스 내 작업 완료 핸들러, 그리고 QueueScheduler나 QueueCleaner 컴포넌트가 주기적으로 오래된 작업을 삭제하는 부분이 있습니다.

정리하자면, BullMQ의 자동 작업 삭제 기능은 Redis 메모리 사용 최적화와 시스템 안정성 향상, 장애 분석용 데이터 보존 간 균형 잡힌 리소스 관리 전략입니다. BullMQ는 Redis의 특성을 최대한 활용해 이를 안전하고 효율적으로 구현했으며, 운영 환경에서는 작업량 급증이나 장애 상황에서 Redis 용량 과부하를 방지하기 위해 적절한 removeOnComplete 및 removeOnFail 설정이 필수적입니다.


BullMQ Workers에 대한 이해는 컴퓨터 과학적 관점, Redis 내부 동작 원리, 그리고 오픈소스 구현 관점

  1. 컴퓨터 과학(CS) 관점
    작업자(Worker)는 큐 시스템에서 비동기 작업을 실제로 처리하는 실행자 역할을 합니다. 큐에 쌓인 작업(Job)을 가져와 지정된 로직을 수행하며, Node.js 환경에서는 이벤트 루프가 차단되지 않도록 async/await 또는 Promise 기반으로 비동기 처리합니다. 작업 진행 상황을 주기적으로 업데이트할 수 있어 작업 모니터링과 장애 대응에 유리하며, 작업 성공 시 결과를 반환하고 실패 시 예외 처리를 통해 상태를 체계적으로 관리합니다. Worker 인스턴스는 생성 시 프로세스 또는 스레드 내에서 실행할 작업(process 함수)을 바인딩하며, 필요에 따라 수동으로 실행(run)할 수도 있습니다.
  2. Redis 내부 동작 원리
    BullMQ Worker는 Redis에 저장된 대기 작업 리스트에서 작업을 원자적으로 가져오기 위해 리스트 자료구조와 BLPOP 같은 blocking 명령, 혹은 Lua 스크립트를 사용합니다. 작업 상태는 실행 중(active), 완료(completed), 실패(failed) 상태로 Redis 내 집합 또는 Sorted Set에 반영됩니다. 작업의 진행률(progress)은 Redis의 해시(Hash) 자료구조에 JSON 형태로 저장되며, 작업 결과(returnvalue)는 저장 후 Redis Pub/Sub 채널을 통해 completed, failed 이벤트가 발행되어 실시간 상태를 모니터링할 수 있습니다. 또한 중복 작업 실행을 방지하기 위해 Redis 기반 분산 락 메커니즘(setnx 등)을 내부적으로 활용합니다.
  3. BullMQ 오픈소스 내부 구현
    Worker 클래스는 큐 이름, 프로세서 함수(processor), 그리고 옵션을 받아 생성되며, 내부에서 Redis 클라이언트와 작업 상태 관리용 이벤트 리스너를 등록합니다. 작업을 큐에서 가져와 processor 함수에 Job 객체로 전달하고, Job 클래스는 작업 데이터(payload), 상태, 진행률(progress), 반환값(returnvalue), 메타 정보를 캡슐화합니다. process 함수는 비동기 함수로, 예외 발생 시 Worker가 이를 감지해 실패로 처리합니다. job.updateProgress() 호출 시 내부적으로 Redis에 진행률이 저장되고, 변경 사항은 이벤트로 브로드캐스트됩니다. 기본적으로 Worker는 생성 즉시 작업 수신을 시작하지만, autorun: false 옵션 시 명시적으로 worker.run() 호출이 있어야 시작합니다. Worker는 completed, failed, progress 등 여러 이벤트를 발행하며, 개발자는 이벤트 리스너를 통해 실시간 작업 상태를 추적하고 후처리할 수 있습니다. 주요 구현은 Worker.ts 파일 내에 작업 수신, 상태 변경, 이벤트 발행, 오류 처리 로직이, Job.ts에는 작업 관련 메서드와 진행률 및 결과 반환 처리가 구현되어 있습니다.

요약

관점핵심 내용
CS 관점 Worker는 비동기 작업 처리 실행자이며, 작업 성공/실패 관리 및 진행률 업데이트 지원
Redis 동작 작업 상태 전환, 작업 데이터 및 메타 정보 저장, Pub/Sub 이벤트 발행, 분산 락 사용
BullMQ 코드 Worker 클래스 내 작업 수신, process 함수 실행, 상태 변경 및 이벤트 발행으로 구현
 

이와 같이 BullMQ Worker는 안정적이고 효율적인 분산 작업 처리를 위해 Node.js 비동기 특성과 Redis 데이터 구조, 그리고 견고한 오픈소스 구현 방식을 유기적으로 결합한 구조를 갖추고 있습니다.


BullMQ의 동시성(Concurrency) 기능은 컴퓨터 과학적 관점, Redis 내부 동작, 그리고 오픈소스 구현 관점

  1. 컴퓨터 과학(CS) 관점
    동시성은 여러 작업을 논리적으로 겹쳐 처리하는 것이며, 병렬성은 여러 작업을 물리적으로 동시에 실행하는 것을 의미합니다. Node.js는 싱글 스레드 이벤트 루프 기반이라 동시성 처리로 여러 작업을 겹쳐 수행할 수 있지만, 멀티코어 CPU의 병렬 처리를 위해서는 여러 프로세스나 워커를 활용해야 합니다.
    Worker 내부에서는 concurrency 옵션으로 한 프로세스 내에서 동시에 처리할 작업 개수를 지정합니다. 예를 들어, concurrency: 50이면 최대 50개의 작업을 논리적으로 병렬 처리할 수 있습니다. 비동기 I/O 및 Promise 기반으로 블로킹 없이 처리하며, CPU 집약 작업은 워커 스레드나 별도 프로세스 분리가 필요합니다.
    또한 여러 Node.js 프로세스에서 각각 워커 인스턴스를 실행해 물리적 병렬성을 확보할 수 있으며, 이를 통해 시스템 자원 활용 극대화와 장애 허용성을 높일 수 있습니다. BullMQ는 Redis 기반으로 최소 한 번 실행(at-least-once)을 보장하지만, 작업 순서 보장이 필요하다면 별도의 논리(우선순위, 지연 처리 등)를 구현해야 합니다.
    마지막으로 작업 분배와 로드밸런싱에서는 Redis의 원자적 작업 할당이 중요하며, 워커 과부하 방지와 작업 지연 최소화가 목표가 됩니다.
  2. Redis 내부 동작 관점
    작업은 Redis의 리스트 또는 Sorted Set에 저장되며, 워커는 BLPOP 같은 blocking 리스트 명령어 혹은 Lua 스크립트를 통해 원자적으로 작업을 가져옵니다. 다수 워커가 동시 작업 요청 시 Redis가 작업을 하나씩 원자적으로 할당하여 중복 처리를 방지합니다. Redis는 싱글 스레드지만 매우 빠른 처리 속도로 수십, 수백 워커의 요청도 지연 없이 소화합니다.
    작업 상태는 waiting → active → completed 또는 failed 로 전환되며, 이 과정에서 상태 키와 메타데이터가 빈번히 갱신됩니다. 작업 중복 방지를 위해 SETNX 기반 락을 걸어 동일 작업을 여러 워커가 중복 실행하지 않도록 합니다. 작업 상태 변경 이벤트는 Redis Pub/Sub 채널을 통해 실시간으로 브로드캐스트되어 워커와 모니터링 도구가 구독합니다.
  3. BullMQ 오픈소스 구현 관점
    Worker 클래스는 생성자에서 큐 이름, 프로세서 함수, 옵션을 받아 내부적으로 동시 처리 큐(pool)를 만듭니다. concurrency 옵션(기본값 1)을 지정하면, 그 수만큼 비동기 작업을 동시에 실행하도록 작업 풀을 관리합니다. Worker는 작업을 가져와 processor 함수에 넘겨 처리하고, 작업 완료 시 Promise가 resolve되면 다음 작업을 가져오는 식입니다.
    BullMQ는 멀티 프로세스 워커 실행을 직접 관리하지는 않지만, 여러 Node.js 프로세스에서 Worker를 띄워 사용하는 방식을 공식 권장합니다. 각 워커 인스턴스는 Redis와 독립적으로 통신하며 작업 분배, 상태 변경, 이벤트 수신을 수행합니다.
    현재 Worker 생성 시 concurrency 값 변경 API는 없으나, 커스텀 구현으로 내부 작업 큐 크기를 조절하는 방식도 가능합니다.

주요 구현 위치는 src/classes/worker.ts에서 concurrency 처리 및 작업 실행 로직, src/classes/processor.ts에서 비동기 작업 처리 관리, src/classes/queueEvents.ts에서 Redis와 작업 상태, 이벤트 관리를 담당합니다.


정리

구분내용
CS 관점 동시성은 논리적 겹침, 병렬성은 물리적 동시 실행; 멀티 프로세스 워커로 병렬 처리; 최소 1회 실행 보장 필요
Redis 관점 원자적 작업 할당 및 상태 전환, 락 기반 중복 방지, Pub/Sub 이벤트로 실시간 동기화
BullMQ 구현 Worker concurrency 옵션으로 내부 작업 풀 관리, 멀티 프로세스 Worker 병렬 처리 권장
 

이처럼 BullMQ는 Node.js 특성과 Redis 구조, 견고한 오픈소스 구현을 결합해 안정적이고 확장 가능한 동시 작업 처리를 지원합니다.

'Project > 기록' 카테고리의 다른 글

Log 영속성을 위한 MQ 도입  (1) 2025.06.10
MySQL에서의 Lock 경합 문제 해결과 성능 개선  (1) 2025.06.09
멱등성(Idempotency)이란?  (0) 2025.06.02
비동기 아키텍처 전환  (0) 2025.05.28
내 API는 실패할 수 있다  (0) 2025.05.26