Skip to main content

Event Processing

Queue configuration, idempotency, and monitoring.


Queue Configuration

BullMQ Settings

// BullMQ queue settings
/**
 * Configuration for the main CRM event queue.
 * Every job retries up to 3 times with exponential backoff (1s, 2s, 4s);
 * the last 1000 completed and 5000 failed jobs are retained for inspection.
 */
const queueConfig = {
  name: 'crm-events',
  defaultJobOptions: {
    // Retry budget per job before it is considered permanently failed.
    attempts: 3,
    // Exponential backoff: 1s, 2s, 4s between attempts.
    backoff: { type: 'exponential', delay: 1000 },
    // Retention caps so completed/failed job history stays bounded.
    removeOnComplete: 1000,
    removeOnFail: 5000,
  },
};

Priority Queues

Events are processed in priority order:

| Priority | Events | Processing |
| --- | --- | --- |
| High | contactPreference.updated, payment.failed, message.bounced | Immediate |
| Normal | member.*, player.*, booking.* | Within seconds |
| Low | email.opened, email.clicked, score.posted | Batched |
// Priority assignment
/**
 * Maps an event type onto its queue priority.
 * 1 = high (processed first), 2 = normal (default), 3 = low (batched).
 */
function getEventPriority(eventType: string): number {
  const highPrioritySet = new Set([
    'contactPreference.updated',
    'scl.payment.failed',
    'message.bounced',
    'scl.member.status.changed',
  ]);

  const lowPrioritySet = new Set([
    'email.opened',
    'email.clicked',
    'teetime.score.posted',
    'scl.invoice.sent',
  ]);

  if (highPrioritySet.has(eventType)) return 1;
  // Anything not explicitly high or low defaults to normal priority.
  return lowPrioritySet.has(eventType) ? 3 : 2;
}

Dead Letter Queue

Failed events after all retries go to DLQ for manual review.

/**
 * Dead-letter queue configuration.
 * BUG FIX: BullMQ's `removeOnComplete.age` is measured in SECONDS, not
 * milliseconds. The previous value (7 * 24 * 60 * 60 * 1000) would have
 * kept DLQ entries for roughly 19 years instead of 7 days.
 */
const dlqConfig = {
  name: 'crm-events-dlq',
  // Events stay for 7 days before auto-delete
  removeOnComplete: {
    age: 7 * 24 * 60 * 60, // 604,800 seconds = 7 days
  },
};

DLQ Processing

// Manual retry from DLQ
/**
 * Moves one job from the dead-letter queue back onto the main queue.
 * The requeued job keeps its original options but gets a fresh retry
 * budget of 3 attempts. Throws if the job id is unknown.
 * NOTE(review): `dlqQueue` and `mainQueue` come from the surrounding module.
 */
async function retryFromDlq(jobId: string): Promise<void> {
  const deadJob = await dlqQueue.getJob(jobId);
  if (!deadJob) {
    throw new Error('Job not found');
  }

  // Requeue onto the main queue with the attempt counter reset.
  const requeueOptions = { ...deadJob.opts, attempts: 3 };
  await mainQueue.add('event', deadJob.data, requeueOptions);

  // Only drop the DLQ copy once requeueing has succeeded.
  await deadJob.remove();
}

Idempotency

Events are processed idempotently using:

  1. Event ID: Each event has unique ID
  2. SyncLog: Records processed events
  3. Deduplication: Skip if event ID already processed

Implementation

/**
 * BullMQ processor for the 'crm-events' queue.
 *
 * Idempotency contract: every event is recorded in SyncLog keyed by its
 * unique event id; events whose SyncLog entry is already COMPLETED are
 * skipped. Failures are recorded and rethrown so the queue's retry /
 * backoff machinery (and ultimately the DLQ) takes over.
 */
@Processor('crm-events')
export class EventProcessor {
  constructor(
    private readonly prisma: PrismaService,
    private readonly identityService: IdentityResolutionService,
  ) {}

  @Process('event')
  async processEvent(job: Job<CrmEvent>) {
    const event = job.data;

    // Deduplication: skip events that already completed successfully.
    const existing = await this.prisma.syncLog.findUnique({
      where: { eventId: event.id },
    });

    if (existing?.status === 'COMPLETED') {
      return; // Skip duplicate
    }

    // Create or update the sync log. upsert keeps this safe even if two
    // workers race past the findUnique check above.
    const syncLog = await this.prisma.syncLog.upsert({
      where: { eventId: event.id },
      create: {
        eventId: event.id,
        eventType: event.type,
        tenantId: event.tenantId,
        status: 'PROCESSING',
        startedAt: new Date(),
        // FIX: initialize the attempt counter on first sight so it stays
        // consistent with the `increment` in the update branch below.
        attempts: 1,
      },
      update: {
        status: 'PROCESSING',
        startedAt: new Date(),
        attempts: { increment: 1 },
      },
    });

    try {
      await this.handleEvent(event);

      await this.prisma.syncLog.update({
        where: { id: syncLog.id },
        data: {
          status: 'COMPLETED',
          completedAt: new Date(),
        },
      });
    } catch (error: unknown) {
      // FIX: catch variables are `unknown` under strict TS settings —
      // narrow before reading `.message` instead of assuming Error.
      const message = error instanceof Error ? error.message : String(error);
      await this.prisma.syncLog.update({
        where: { id: syncLog.id },
        data: {
          status: 'FAILED',
          error: message,
          failedAt: new Date(),
        },
      });
      // Rethrow so BullMQ applies retry/backoff and eventually the DLQ.
      throw error;
    }
  }

  /** Resolves the event's identity to a profile, then dispatches by type. */
  private async handleEvent(event: CrmEvent): Promise<void> {
    // Resolve identity
    const { profile } = await this.identityService.resolve({
      tenantId: event.tenantId,
      ...event.identity,
    });

    // Route to handler
    switch (event.type) {
      case 'scl.member.created':
        await this.handleMemberCreated(profile, event);
        break;
      case 'scl.member.updated':
        await this.handleMemberUpdated(profile, event);
        break;
      // ... other handlers
    }
  }
}

Concurrency Control

// Worker configuration
/** Worker tuning: 10 jobs in parallel, globally capped at 100 jobs/second. */
const workerConfig = {
  // Number of jobs a single worker processes concurrently.
  concurrency: 10,
  // Global throughput cap: at most `max` jobs per `duration` ms window.
  limiter: { max: 100, duration: 1000 },
};

Per-Tenant Rate Limiting

// Prevent single tenant from monopolizing
// Caps each tenant at 50 jobs per 1-second window, keyed by tenantId.
// NOTE(review): `RateLimiter` is a project-local class — confirm its options.
const perTenantLimiter = new RateLimiter({
  max: 50, // jobs per window, per tenant
  duration: 1000, // window length in ms
  keyPrefix: 'tenant',
  // Partition the limit by the tenant that emitted the job.
  key: (queuedJob) => queuedJob.data.tenantId,
});

Error Handling

Retry Strategy

/** Default retry policy: 3 attempts with exponential backoff. */
const retryStrategy = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
};

// Retry delays: 1s, 2s, 4s

Error Categories

| Category | Retry | Action |
| --- | --- | --- |
| Transient (network) | Yes | Exponential backoff |
| Validation error | No | Log and skip |
| Profile not found | Yes | May be a timing issue |
| Database error | Yes | Backoff |
| Business logic | No | Move to DLQ |
/**
 * Decides whether a failed event should be retried.
 * Permanent failures (validation / business-logic errors) are never
 * retried; every other error is treated as transient.
 */
function shouldRetry(error: Error): boolean {
  switch (error.name) {
    case 'ValidationError':
    case 'InvalidEventError':
    case 'BusinessLogicError':
      return false; // permanent — a retry cannot succeed
    default:
      return true; // transient — let backoff handle it
  }
}

Monitoring & Alerts

Metrics

| Metric | Description |
| --- | --- |
| crm_events_processed_total | Total events processed, by type |
| crm_events_failed_total | Failed events, by type |
| crm_event_processing_duration | Processing time histogram |
| crm_event_queue_depth | Current queue depth |
| crm_identity_resolution_rate | % of events matched to existing profiles |
| crm_dlq_depth | Dead letter queue depth |

Prometheus Metrics

// Metrics collection
// NOTE(review): Counter/Histogram/Gauge look like prom-client constructors —
// confirm the import. Label values must be supplied in the declared
// labelNames order when using positional .labels() calls.

// Counts every processed event, labelled by event type and final status.
const processedCounter = new Counter({
  name: 'crm_events_processed_total',
  help: 'Total events processed',
  labelNames: ['type', 'status'],
});

// End-to-end handling latency per event type, in seconds.
// Buckets span 10ms to 10s, matching the p99 < 10s alert threshold.
const processingDuration = new Histogram({
  name: 'crm_event_processing_duration_seconds',
  help: 'Event processing duration',
  labelNames: ['type'],
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10],
});

// Snapshot of how many jobs are currently waiting, per queue.
const queueDepth = new Gauge({
  name: 'crm_event_queue_depth',
  help: 'Current queue depth',
  labelNames: ['queue'],
});

Alerts

| Alert | Condition | Severity |
| --- | --- | --- |
| High queue depth | Queue > 10,000 | Warning |
| Processing failures | Failure rate > 5% | Critical |
| DLQ growth | DLQ > 100 events | Warning |
| Latency spike | p99 > 10s | Warning |
| Queue stalled | No processing for 5m | Critical |
# Prometheus alert rules
# FIX: restored the YAML indentation (lost in extraction) — without the
# nesting under `groups:`/`rules:` this file is not valid rule config.
groups:
  - name: crm-events
    rules:
      - alert: CrmEventQueueDepthHigh
        expr: crm_event_queue_depth > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CRM event queue depth high"

      - alert: CrmEventProcessingFailures
        expr: rate(crm_events_failed_total[5m]) / rate(crm_events_processed_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "CRM event processing failure rate > 5%"

      - alert: CrmDlqGrowing
        expr: crm_dlq_depth > 100
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "CRM dead letter queue growing"

Logging

Structured Logging

// Event processing log — emitted once per successfully handled event.
// NOTE(review): `logger`, `event`, `profile`, `processingTime`, `resolution`,
// `error`, and `job` come from the surrounding handler scope; this is an
// illustrative snippet, not a standalone function.
logger.info('Event processed', {
  eventId: event.id,
  eventType: event.type,
  tenantId: event.tenantId,
  profileId: profile.id,
  duration: processingTime,
  isNewProfile: resolution.isNew,
});

// Error log — includes the attempt number so individual retries are
// distinguishable when searching logs by eventId.
logger.error('Event processing failed', {
  eventId: event.id,
  eventType: event.type,
  tenantId: event.tenantId,
  error: error.message,
  stack: error.stack,
  attempt: job.attemptsMade,
});

Log Retention

| Log Type | Retention |
| --- | --- |
| Debug | 7 days |
| Info | 30 days |
| Warning | 90 days |
| Error | 1 year |

Operations

Pause Processing

// Pause queue — workers finish jobs already in flight but take no new ones.
await queue.pause();

// Resume queue — processing picks up from the waiting list.
await queue.resume();

Drain Queue

// Drain the queue.
// NOTE(review): the original comment said "process existing, no new jobs",
// but BullMQ's drain() REMOVES all waiting and delayed jobs without
// processing them (active jobs still finish) — confirm intent before use.
await queue.drain();

Clear Queue

// Remove all jobs (careful!)
// FIX: BullMQ's Queue.clean signature is clean(grace, limit, type) — the
// `limit` argument is required, so the original two-argument calls passed
// the job-state string where the limit belongs.
await queue.clean(0, Number.MAX_SAFE_INTEGER, 'completed');
await queue.clean(0, Number.MAX_SAFE_INTEGER, 'failed');
await queue.clean(0, Number.MAX_SAFE_INTEGER, 'delayed');
await queue.clean(0, Number.MAX_SAFE_INTEGER, 'wait');

Replay Events

// Replay events from sync log
/**
 * Re-enqueues events recorded in the sync log within [startDate, endDate].
 *
 * @param startDate inclusive lower bound on SyncLog.createdAt
 * @param endDate   inclusive upper bound on SyncLog.createdAt
 * @param status    optional SyncLog status filter (e.g. 'FAILED') so a
 *                  replay can target only failed events; omitted, the
 *                  original replay-everything behavior is preserved.
 */
async function replayEvents(
  startDate: Date,
  endDate: Date,
  status?: string,
): Promise<void> {
  const events = await prisma.syncLog.findMany({
    where: {
      createdAt: { gte: startDate, lte: endDate },
      ...(status ? { status } : {}),
    },
    // FIX: replay in original arrival order — findMany without orderBy
    // gives no ordering guarantee.
    orderBy: { createdAt: 'asc' },
  });

  for (const event of events) {
    // Stable jobId makes a double-submitted replay idempotent.
    await queue.add('event', event.payload, {
      jobId: `replay-${event.id}`,
    });
  }
}