Queue Workers
BullMQ job processing for the CRM platform.
Worker Architecture
┌─────────────────────────────────────────────────────────────────┐
│ REDIS + BULLMQ │
├─────────────────────────────────────────────────────────────────┤
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ crm-events │ │ crm-scoring │ │ crm-campaigns │ │
│ │ (event sync) │ │ (engagement) │ │ (execution) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
│ ┌───────────────┐ ┌───────────────┐ ┌───────────────┐ │
│ │ crm-segments │ │ crm-journeys │ │ crm-social │ │
│ │ (refresh) │ │ (automation) │ │ (publishing) │ │
│ └───────────────┘ └───────────────┘ └───────────────┘ │
│ │
└─────────────────────────────────────────────────────────────────┘
│
▼
┌───────────────────────────────┐
│ CRM WORKER PODS │
│ (crm-worker deployment) │
└───────────────────────────────┘
Queues
| Queue | Purpose | Concurrency |
|---|---|---|
| crm-events | Event sync from source services | 10 |
| crm-scoring | Engagement/churn scoring | 5 |
| crm-segments | Segment refresh | 2 |
| crm-campaigns | Campaign execution | 5 |
| crm-journeys | Journey step execution | 10 |
| crm-social | Social publishing | 3 |
Job Configuration
Default Settings
// Upper bound on how long a completed job is retained: 24 hours, expressed in seconds.
const COMPLETED_JOB_MAX_AGE_SECONDS = 24 * 60 * 60;

/**
 * Baseline job options shared by every CRM queue: three attempts with
 * exponential backoff, bounded retention of completed jobs, and failed jobs
 * kept indefinitely so they can be inspected.
 */
const defaultJobOptions: JobsOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
  // Completed jobs: keep the most recent 1000, or those from the last 24 hours.
  removeOnComplete: { count: 1000, age: COMPLETED_JOB_MAX_AGE_SECONDS },
  // Failed jobs are never auto-removed; they stay available for debugging.
  removeOnFail: false,
};
Queue-Specific Settings
// Events - fast processing, high concurrency.
// Keeps 3 attempts and exponential backoff, but with a base delay of 500 instead of the default 1000.
const eventQueueOptions = {
...defaultJobOptions,
attempts: 3,
backoff: { type: 'exponential', delay: 500 },
};
// Scoring - background, lower priority.
// Adds a priority and an initial delay on top of the defaults; retry settings are inherited unchanged.
const scoringQueueOptions = {
...defaultJobOptions,
priority: 10, // Lower priority
delay: 5000, // Wait 5s before processing
};
// Social - rate limited (at most 30 jobs per 60000 ms window).
// NOTE(review): BullMQ documents rate limiting as a Worker option (`limiter: { max, duration }`),
// not as a per-job option. Confirm that whatever consumes `rateLimiter` actually applies it.
const socialQueueOptions = {
...defaultJobOptions,
rateLimiter: {
max: 30, // Max 30 jobs
duration: 60000, // Per minute
},
};
Worker Implementation
/**
 * Processor for the `crm-events` queue.
 *
 * For each event it resolves (or creates) the customer profile, applies the
 * event to that profile, records an activity, and finally enqueues follow-up
 * jobs on the scoring, segment and journey queues.
 */
@Processor('crm-events')
export class CrmEventProcessor {
  constructor(
    private readonly identityService: IdentityResolutionService,
    private readonly profileService: CustomerProfileService,
    private readonly activityService: ActivityService,
    // Downstream queues used by queueAsyncWork(). `InjectQueue` and `Queue` are expected to come
    // from the same packages that already provide `Processor` and `Job`.
    @InjectQueue('crm-scoring') private readonly scoringQueue: Queue,
    @InjectQueue('crm-segments') private readonly segmentQueue: Queue,
    @InjectQueue('crm-journeys') private readonly journeyQueue: Queue,
  ) {}

  /**
   * Handles a single CRM event job.
   *
   * The steps run strictly in order and any rejection propagates, failing the job.
   * NOTE(review): a retry re-runs every step, so the earlier steps presumably
   * need to tolerate being applied more than once. Confirm.
   *
   * @param job Job whose `data` is the CRM event to process.
   */
  @Process()
  async processEvent(job: Job<CrmEvent>): Promise<void> {
    const event = job.data;

    // 1. Resolve identity
    const profile = await this.identityService.resolveOrCreate(event);

    // 2. Update profile
    await this.profileService.applyEvent(profile.id, event);

    // 3. Create activity
    await this.activityService.createFromEvent(profile.id, event);

    // 4. Queue async work
    await this.queueAsyncWork(profile.id, event);
  }

  /**
   * Enqueues the follow-up jobs triggered by an event, one after another.
   *
   * @param profileId Id of the profile the event was applied to.
   * @param event The originating event; forwarded only to the journey trigger check.
   */
  private async queueAsyncWork(profileId: string, event: CrmEvent): Promise<void> {
    // Rescore engagement
    await this.scoringQueue.add('rescore', { profileId });

    // Check segment membership
    await this.segmentQueue.add('check-membership', { profileId });

    // Check journey triggers
    await this.journeyQueue.add('check-triggers', { profileId, event });
  }
}
Error Handling
Retry Logic
/**
 * Runs a job and separates transient failures from permanent ones.
 *
 * Transient errors (as classified by `isTransientError`) are rethrown so the
 * queue retries the job. Any other error is logged, recorded on the DLQ, and
 * then swallowed: the method returns normally, so the job is not retried.
 *
 * @param job The job to process; `job.data` is passed to `doWork`.
 */
@Process()
async processJob(job: Job): Promise<void> {
  try {
    await this.doWork(job.data);
  } catch (error: unknown) {
    if (isTransientError(error)) {
      throw error; // Will retry
    }

    // A thrown value is not guaranteed to be an Error instance; narrow once
    // so the log entry and the DLQ record always carry a usable message.
    const message = error instanceof Error ? error.message : String(error);

    // Permanent failure - log and don't retry
    this.logger.error('Job failed permanently', {
      jobId: job.id,
      error: message,
    });

    // Move to DLQ
    await this.dlq.add('failed', {
      originalJob: job.data,
      error: message,
    });
  }
}
Dead Letter Queue
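Permanently failed jobs go to the DLQ. In the retry logic above, a job that fails with a non-transient error is added to the DLQ immediately, while transient errors are rethrown so the job is retried instead: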
// Manual DLQ triage: list the entries still waiting or delayed and print each payload.
const pendingDlqJobs = await dlqQueue.getJobs(['waiting', 'delayed']);
pendingDlqJobs.forEach((dlqJob) => {
  // Investigate and fix
  console.log(dlqJob.data);
});
Monitoring
Queue Metrics
// Get queue stats: job counts for this queue, keyed by job state.
const stats = await queue.getJobCounts();
// Example result: { waiting: 10, active: 5, completed: 100, failed: 2 }
Prometheus Metrics
# Active jobs
bullmq_jobs_active{queue="crm-events"}
# Waiting jobs
bullmq_jobs_waiting{queue="crm-events"}
# Failed jobs
bullmq_jobs_failed{queue="crm-events"}
# Processing duration (p95 over the last 5 minutes, across all series).
# Bucket counters are cumulative, so take rate() over a window and aggregate by `le`
# before computing the quantile.
histogram_quantile(0.95, sum by (le) (rate(bullmq_job_duration_seconds_bucket[5m])))
Scaling
Horizontal Scaling
# Scale worker pods
kubectl scale deployment/crm-worker --replicas=4 -n crm
Concurrency Tuning
// Adjust per-queue concurrency.
// Environment variables are always strings, so parse explicitly; fall back to 10
// when EVENT_CONCURRENCY is unset, empty, or not a positive integer.
const parsedEventConcurrency = Number.parseInt(process.env.EVENT_CONCURRENCY ?? '', 10);
const worker = new Worker('crm-events', processor, {
  concurrency:
    Number.isInteger(parsedEventConcurrency) && parsedEventConcurrency > 0
      ? parsedEventConcurrency
      : 10,
});
Common Operations
Pause Queue
// Pause the queue; undo with `await queue.resume()`.
await queue.pause();
Resume Queue
// Resume a queue that was previously paused with `await queue.pause()`.
await queue.resume();
Drain Queue
// Remove all jobs
// NOTE(review): `obliterate` is not the same call as `queue.drain()`. Per the BullMQ docs it
// irreversibly destroys the queue and everything in it, and `force: true` allows that even
// while jobs are active. Confirm this is what "drain" is meant to do here.
await queue.obliterate({ force: true });
Retry Failed Jobs
// Retry every job returned by queue.getFailed(), one at a time and in order.
for (const failedJob of await queue.getFailed()) {
  await failedJob.retry();
}