Skip to main content

Queue Workers

BullMQ job processing for the CRM platform.

Worker Architecture

┌─────────────────────────────────────────────────────────────────┐
│ REDIS + BULLMQ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ crm-events │ │ crm-scoring │ │ crm-campaigns │ │
│ │ (event sync) │ │ (engagement) │ │ (execution) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ crm-segments │ │ crm-journeys │ │ crm-social │ │
│ │ (refresh) │ │ (automation) │ │ (publishing) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘


┌───────────────────────────────┐
│ CRM WORKER PODS │
│ (crm-worker deployment) │
└───────────────────────────────┘

Queues

| Queue | Purpose | Concurrency |
| --- | --- | --- |
| crm-events | Event sync from source services | 10 |
| crm-scoring | Engagement/churn scoring | 5 |
| crm-segments | Segment refresh | 2 |
| crm-campaigns | Campaign execution | 5 |
| crm-journeys | Journey step execution | 10 |
| crm-social | Social publishing | 3 |

Job Configuration

Default Settings

/**
 * Baseline options shared by every queue; queue-specific option objects
 * spread this and override individual fields.
 */
const defaultJobOptions: JobsOptions = {
  // Up to three attempts, with exponential backoff starting from a 1000 ms delay
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
  // Trim completed jobs: keep the most recent 1000, or those from the last 24 hours
  removeOnComplete: {
    count: 1000,
    age: 60 * 60 * 24, // 24 hours, expressed in seconds
  },
  // Failed jobs are never removed automatically so they can be debugged
  removeOnFail: false,
};

Queue-Specific Settings

// Events queue: fast processing and high concurrency, so retries start
// from a shorter 500 ms exponential backoff than the default.
const eventQueueOptions = {
  ...defaultJobOptions,
  attempts: 3,
  backoff: {
    type: 'exponential',
    delay: 500,
  },
};

// Scoring queue: background work that runs at a lower priority and is
// held back briefly before it becomes eligible for processing.
const scoringQueueOptions = {
  ...defaultJobOptions,
  // Lower priority
  priority: 10,
  // Wait 5s (in milliseconds) before processing
  delay: 5 * 1000,
};

// Social queue: publishing is rate limited to at most 30 jobs per minute.
// NOTE(review): BullMQ documents rate limiting as a Worker option named
// `limiter` ({ max, duration }); confirm that whatever consumes this object
// maps `rateLimiter` onto it, otherwise the limit may not be applied.
const socialQueueOptions = {
  ...defaultJobOptions,
  rateLimiter: {
    // Max 30 jobs
    max: 30,
    // Per minute (window length in milliseconds)
    duration: 60 * 1000,
  },
};

Worker Implementation

/**
 * Consumes jobs from the `crm-events` queue.
 *
 * For each event it resolves (or creates) the matching customer profile,
 * applies the event to that profile, records an activity, and then enqueues
 * the follow-up scoring, segment and journey jobs.
 */
@Processor('crm-events')
export class CrmEventProcessor {
  constructor(
    private readonly identityService: IdentityResolutionService,
    private readonly profileService: CustomerProfileService,
    private readonly activityService: ActivityService,
    // Downstream queues used by queueAsyncWork(). They must be injected here;
    // without these declarations the `this.*Queue` references are undefined.
    @InjectQueue('crm-scoring') private readonly scoringQueue: Queue,
    @InjectQueue('crm-segments') private readonly segmentQueue: Queue,
    @InjectQueue('crm-journeys') private readonly journeyQueue: Queue,
  ) {}

  /**
   * Handles a single CRM event job.
   *
   * The steps run strictly in order because each later step needs the
   * profile id produced by identity resolution. Any rejection propagates to
   * the caller, so the job fails as a whole.
   *
   * @param job - Queue job whose `data` is the CRM event to process.
   */
  @Process()
  async processEvent(job: Job<CrmEvent>): Promise<void> {
    const event = job.data;

    // 1. Resolve identity
    const profile = await this.identityService.resolveOrCreate(event);

    // 2. Update profile
    await this.profileService.applyEvent(profile.id, event);

    // 3. Create activity
    await this.activityService.createFromEvent(profile.id, event);

    // 4. Queue async work
    await this.queueAsyncWork(profile.id, event);
  }

  /**
   * Enqueues the follow-up jobs triggered by an event.
   *
   * @param profileId - Id of the profile the event was applied to.
   * @param event - The originating event; forwarded only to the journey queue.
   */
  private async queueAsyncWork(profileId: string, event: CrmEvent): Promise<void> {
    // Rescore engagement
    await this.scoringQueue.add('rescore', { profileId });

    // Check segment membership
    await this.segmentQueue.add('check-membership', { profileId });

    // Check journey triggers
    await this.journeyQueue.add('check-triggers', { profileId, event });
  }
}

Error Handling

Retry Logic

/**
 * Runs the job and classifies any failure.
 *
 * Transient errors are rethrown so the job is retried. Any other error is
 * treated as permanent: it is logged, the job payload is added to the dead
 * letter queue, and the method returns normally so that no retry happens.
 *
 * @param job - The queue job to process.
 * @throws Rethrows the original error when `isTransientError` reports it as transient.
 */
@Process()
async processJob(job: Job): Promise<void> {
  try {
    await this.doWork(job.data);
  } catch (error: unknown) {
    if (isTransientError(error)) {
      throw error; // Will retry
    }

    // A caught value is `unknown` and may not be an Error instance, so narrow
    // it before reading `.message` instead of assuming the property exists.
    const message = error instanceof Error ? error.message : String(error);

    // Permanent failure - log and don't retry
    this.logger.error('Job failed permanently', {
      jobId: job.id,
      error: message,
    });

    // Move to DLQ
    await this.dlq.add('failed', {
      originalJob: job.data,
      error: message,
    });
  }
}

Dead Letter Queue

Failed jobs after max retries go to DLQ:

// Manual DLQ processing: fetch the entries in the 'waiting' or 'delayed'
// state and print each payload for inspection.
const dlqJobs = await dlqQueue.getJobs(['waiting', 'delayed']);
dlqJobs.forEach((deadJob) => {
  // Investigate and fix
  console.log(deadJob.data);
});

Monitoring

Queue Metrics

// Get queue stats: the number of jobs in the queue, keyed by job state
const stats = await queue.getJobCounts();
// Example result: { waiting: 10, active: 5, completed: 100, failed: 2 }

Prometheus Metrics

# Active jobs
bullmq_jobs_active{queue="crm-events"}

# Waiting jobs
bullmq_jobs_waiting{queue="crm-events"}

# Failed jobs
bullmq_jobs_failed{queue="crm-events"}

# Processing duration (95th percentile over the last 5 minutes)
# Histogram buckets are cumulative counters, so take rate() over a window and
# aggregate by `le` before computing the quantile; the raw counters would
# describe everything since process start rather than recent behaviour.
histogram_quantile(0.95, sum by (le) (rate(bullmq_job_duration_seconds_bucket[5m])))

Scaling

Horizontal Scaling

# Scale the crm-worker deployment in the crm namespace to 4 replicas
kubectl scale deployment/crm-worker --replicas=4 -n crm

Concurrency Tuning

// Adjust per-queue concurrency.
// Environment variables are always strings, so EVENT_CONCURRENCY is parsed to
// an integer; when it is unset, non-numeric, or not positive the default of
// 10 is used instead of passing a string (or 0) through to the worker.
const requestedConcurrency = Number.parseInt(process.env.EVENT_CONCURRENCY ?? '', 10);
const worker = new Worker('crm-events', processor, {
  concurrency: requestedConcurrency > 0 ? requestedConcurrency : 10,
});

Common Operations

Pause Queue

// Pause the queue; resolves once the pause request has completed
await queue.pause();

Resume Queue

// Resume a previously paused queue
await queue.resume();

Drain Queue

// Remove all jobs
// NOTE(review): obliterate() is more than a drain. Per the BullMQ docs it
// irreversibly destroys the queue and all of its contents, and `force: true`
// proceeds even while jobs are active. If only waiting/delayed jobs should
// be cleared, queue.drain() may be the intended call; confirm before running.
await queue.obliterate({ force: true });

Retry Failed Jobs

// Retry every job returned by getFailed(), one at a time: each retry is
// awaited before the next one starts.
const failed = await queue.getFailed();
for (let index = 0; index < failed.length; index += 1) {
  await failed[index].retry();
}