Event Processing
Queue configuration, idempotency, and monitoring.
Queue Configuration
BullMQ Settings
/**
 * BullMQ main queue configuration.
 *
 * Every job gets 3 attempts with exponential backoff (1s, 2s, 4s).
 * Retention caps (1000 completed / 5000 failed) keep Redis memory bounded.
 */
const queueConfig = {
  name: 'crm-events',
  defaultJobOptions: {
    attempts: 3,
    // Exponential backoff starting at 1s: delays are 1s -> 2s -> 4s.
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: 1000,
    removeOnFail: 5000,
  },
};
Priority Queues
Events are processed in priority order:
| Priority | Events | Processing |
|---|---|---|
| High | contactPreference.updated, scl.payment.failed, message.bounced, scl.member.status.changed | Immediate |
| Normal | member.*, player.*, booking.* | Within seconds |
| Low | email.opened, email.clicked, teetime.score.posted, scl.invoice.sent | Batched |
// High-priority events: compliance- and revenue-critical, processed immediately.
const HIGH_PRIORITY_EVENTS: ReadonlySet<string> = new Set([
  'contactPreference.updated',
  'scl.payment.failed',
  'message.bounced',
  'scl.member.status.changed',
]);

// Low-priority events: high-volume telemetry, safe to batch.
const LOW_PRIORITY_EVENTS: ReadonlySet<string> = new Set([
  'email.opened',
  'email.clicked',
  'teetime.score.posted',
  'scl.invoice.sent',
]);

/**
 * Map an event type to its queue priority.
 *
 * Lower number = higher priority (BullMQ convention).
 * The lookup sets are module-level so they are built once instead of
 * re-allocating two arrays on every call.
 *
 * @param eventType - fully-qualified event name, e.g. 'scl.member.created'
 * @returns 1 (high), 3 (low), or 2 (normal, the default)
 */
function getEventPriority(eventType: string): number {
  if (HIGH_PRIORITY_EVENTS.has(eventType)) return 1;
  if (LOW_PRIORITY_EVENTS.has(eventType)) return 3;
  return 2;
}
Dead Letter Queue
Failed events after all retries go to DLQ for manual review.
/**
 * Dead letter queue configuration.
 * Events land here after exhausting all retries and are kept for
 * 7 days for manual review before auto-deletion.
 */
const dlqConfig = {
  name: 'crm-events-dlq',
  // BullMQ's KeepJobs.age is expressed in SECONDS, not milliseconds.
  // The previous value (7 * 24 * 60 * 60 * 1000) would have kept jobs
  // for ~19 years instead of the intended 7 days.
  // NOTE(review): for a queue-level default this normally belongs under
  // defaultJobOptions — confirm how this object is passed to the Queue.
  removeOnComplete: {
    age: 7 * 24 * 60 * 60, // 7 days, in seconds
  },
};
DLQ Processing
/**
 * Manually re-enqueue a dead-lettered event onto the main queue.
 *
 * @param jobId - id of the job sitting in the DLQ
 * @throws Error when no job with that id exists in the DLQ
 */
async function retryFromDlq(jobId: string): Promise<void> {
  const deadJob = await dlqQueue.getJob(jobId);
  if (!deadJob) {
    throw new Error('Job not found');
  }

  // Put the event back on the main queue with a fresh retry budget,
  // then drop it from the DLQ so it cannot be replayed twice.
  await mainQueue.add('event', deadJob.data, { ...deadJob.opts, attempts: 3 });
  await deadJob.remove();
}
Idempotency
Events are processed idempotently using:
- Event ID: Each event has unique ID
- SyncLog: Records processed events
- Deduplication: Skip if event ID already processed
Implementation
/**
 * BullMQ processor for the 'crm-events' queue.
 *
 * Idempotency: every event carries a unique id, and a SyncLog row records
 * its processing status. An event whose SyncLog is already COMPLETED is
 * skipped as a duplicate delivery.
 */
@Processor('crm-events')
export class EventProcessor {
  constructor(
    private readonly prisma: PrismaService,
    private readonly identityService: IdentityResolutionService,
  ) {}

  /**
   * Process one CRM event job.
   *
   * SyncLog lifecycle: PROCESSING -> COMPLETED | FAILED.
   * Rethrows on failure so BullMQ applies its retry/backoff policy.
   */
  @Process('event')
  async processEvent(job: Job<CrmEvent>) {
    const event = job.data;

    // Idempotency check: skip events that have already completed.
    const existing = await this.prisma.syncLog.findUnique({
      where: { eventId: event.id },
    });
    if (existing?.status === 'COMPLETED') {
      return; // duplicate delivery — nothing to do
    }

    // Create the sync log on first attempt; bump the attempt counter on retries.
    const syncLog = await this.prisma.syncLog.upsert({
      where: { eventId: event.id },
      create: {
        eventId: event.id,
        eventType: event.type,
        tenantId: event.tenantId,
        status: 'PROCESSING',
        startedAt: new Date(),
      },
      update: {
        status: 'PROCESSING',
        startedAt: new Date(),
        attempts: { increment: 1 },
      },
    });

    try {
      await this.handleEvent(event);
      await this.prisma.syncLog.update({
        where: { id: syncLog.id },
        data: {
          status: 'COMPLETED',
          completedAt: new Date(),
        },
      });
    } catch (error) {
      // Under strict TS (useUnknownInCatchVariables) the caught value is
      // `unknown` — narrow before reading .message instead of assuming Error.
      const message = error instanceof Error ? error.message : String(error);
      await this.prisma.syncLog.update({
        where: { id: syncLog.id },
        data: {
          status: 'FAILED',
          error: message,
          failedAt: new Date(),
        },
      });
      throw error; // let BullMQ drive retries / DLQ routing
    }
  }

  /**
   * Resolve the event's identity to a profile, then dispatch to the
   * type-specific handler.
   */
  private async handleEvent(event: CrmEvent): Promise<void> {
    const { profile } = await this.identityService.resolve({
      tenantId: event.tenantId,
      ...event.identity,
    });

    switch (event.type) {
      case 'scl.member.created':
        await this.handleMemberCreated(profile, event);
        break;
      case 'scl.member.updated':
        await this.handleMemberUpdated(profile, event);
        break;
      // ... other handlers
    }
  }
}
Concurrency Control
/**
 * Worker tuning: up to 10 jobs in flight at once, throttled to at most
 * 100 jobs started per 1-second window.
 */
const workerConfig = {
  concurrency: 10,
  limiter: {
    max: 100,
    duration: 1000, // window length in milliseconds
  },
};
Per-Tenant Rate Limiting
// Per-tenant rate limiter: prevents a single tenant from monopolizing
// worker throughput. At most 50 jobs per tenant per 1s window, keyed on
// the job's tenantId (keys namespaced under the 'tenant' prefix).
// NOTE(review): RateLimiter is a project/library type not shown here —
// confirm the option names (max/duration/keyPrefix/key) against its API.
const perTenantLimiter = new RateLimiter({
  max: 50,
  duration: 1000,
  keyPrefix: 'tenant',
  key: (job) => job.data.tenantId,
});
Error Handling
Retry Strategy
/**
 * Shared retry policy: 3 total attempts with exponential backoff.
 * Delay sequence between attempts: 1s, 2s, 4s.
 */
const retryStrategy = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
};
Error Categories
| Category | Retry | Action |
|---|---|---|
| Transient (network) | Yes | Exponential backoff |
| Validation error | No | Log and skip |
| Profile not found | Yes | May be timing issue |
| Database error | Yes | Backoff |
| Business logic | No | Move to DLQ |
/**
 * Decide whether a failed event should be retried.
 *
 * Permanent failures (validation / invalid-event / business-logic errors)
 * are never retried; everything else is assumed transient.
 *
 * @param error - the error thrown by the event handler
 * @returns true when the job should be requeued with backoff
 */
function shouldRetry(error: Error): boolean {
  switch (error.name) {
    case 'ValidationError':
    case 'InvalidEventError':
    case 'BusinessLogicError':
      return false; // permanent — retrying cannot succeed
    default:
      return true; // transient — retry with backoff
  }
}
Monitoring & Alerts
Metrics
| Metric | Description |
|---|---|
| crm_events_processed_total | Total events processed by type |
| crm_events_failed_total | Failed events by type |
| crm_event_processing_duration | Processing time histogram |
| crm_event_queue_depth | Current queue depth |
| crm_identity_resolution_rate | % events matched to existing profiles |
| crm_dlq_depth | Dead letter queue depth |
Prometheus Metrics
// Prometheus metric instruments (prom-client-style constructors —
// Counter/Histogram/Gauge are brought into scope elsewhere in this file).

// Total events processed, labelled by event type and outcome status.
const processedCounter = new Counter({
  name: 'crm_events_processed_total',
  help: 'Total events processed',
  labelNames: ['type', 'status'],
});

// Per-event-type processing latency, in seconds.
// Buckets span 10ms to 10s to cover both the fast path and degraded cases.
const processingDuration = new Histogram({
  name: 'crm_event_processing_duration_seconds',
  help: 'Event processing duration',
  labelNames: ['type'],
  buckets: [0.01, 0.05, 0.1, 0.5, 1, 2, 5, 10],
});

// Current number of jobs waiting per queue.
// NOTE(review): a Gauge must be updated by some collection loop not shown
// here — confirm where queue depth is actually sampled.
const queueDepth = new Gauge({
  name: 'crm_event_queue_depth',
  help: 'Current queue depth',
  labelNames: ['queue'],
});
Alerts
| Alert | Condition | Severity |
|---|---|---|
| High queue depth | Queue > 10,000 | Warning |
| Processing failures | Failure rate > 5% | Critical |
| DLQ growth | DLQ > 100 events | Warning |
| Latency spike | p99 > 10s | Warning |
| Queue stalled | No processing for 5m | Critical |
# Prometheus alert rules for the CRM event pipeline.
# Indentation restored — the flattened form was not valid YAML.
groups:
  - name: crm-events
    rules:
      # Backlog building up faster than workers can drain it.
      - alert: CrmEventQueueDepthHigh
        expr: crm_event_queue_depth > 10000
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "CRM event queue depth high"

      # More than 5% of events failing over a 5-minute window.
      - alert: CrmEventProcessingFailures
        expr: rate(crm_events_failed_total[5m]) / rate(crm_events_processed_total[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "CRM event processing failure rate > 5%"

      # Dead letter queue accumulating events that need manual review.
      - alert: CrmDlqGrowing
        expr: crm_dlq_depth > 100
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "CRM dead letter queue growing"
Logging
Structured Logging
// Structured success log, emitted once per processed event.
// NOTE(review): `event`, `profile`, `resolution`, and `processingTime` are
// free variables in this snippet — it assumes the surrounding processing
// scope; confirm where this is embedded.
logger.info('Event processed', {
  eventId: event.id,
  eventType: event.type,
  tenantId: event.tenantId,
  profileId: profile.id,
  duration: processingTime,
  isNewProfile: resolution.isNew,
});

// Structured failure log, emitted from the catch path; includes the
// BullMQ attempt counter so retries can be correlated across logs.
logger.error('Event processing failed', {
  eventId: event.id,
  eventType: event.type,
  tenantId: event.tenantId,
  error: error.message,
  stack: error.stack,
  attempt: job.attemptsMade,
});
Log Retention
| Log Type | Retention |
|---|---|
| Debug | 7 days |
| Info | 30 days |
| Warning | 90 days |
| Error | 1 year |
Operations
Pause Processing
// Pause the queue: workers finish in-flight jobs but pick up no new ones.
await queue.pause();
// Resume normal processing.
await queue.resume();
Drain Queue
// NOTE(review): BullMQ's drain() DELETES waiting and delayed jobs rather
// than processing them — the original comment ("process existing, no new
// jobs") describes pause(), not drain(). Confirm which behavior is intended.
await queue.drain();
Clear Queue
// Remove all jobs (careful!)
// BullMQ's Queue.clean signature is clean(grace, limit, type) — the
// two-argument form previously used here was the legacy Bull v3 API.
// Pass an explicit per-call job limit so each operation is bounded.
await queue.clean(0, 10000, 'completed');
await queue.clean(0, 10000, 'failed');
await queue.clean(0, 10000, 'delayed');
await queue.clean(0, 10000, 'wait');
Replay Events
// Replay events from sync log
/**
 * Re-enqueue every event logged between startDate and endDate (inclusive).
 *
 * Jobs get a deterministic `replay-<id>` jobId so replaying the same
 * window twice does not double-enqueue (BullMQ de-duplicates on jobId).
 *
 * NOTE(review): the SyncLog writes shown earlier in this file persist
 * eventId/eventType/tenantId but no `payload` column — confirm the schema
 * actually stores the payload read here.
 * NOTE(review): findMany is unbounded; a large date range loads every row
 * into memory — consider cursor-based batching for big replays.
 */
async function replayEvents(startDate: Date, endDate: Date) {
  const events = await prisma.syncLog.findMany({
    where: {
      createdAt: { gte: startDate, lte: endDate },
    },
  });
  for (const event of events) {
    await queue.add('event', event.payload, {
      jobId: `replay-${event.id}`,
    });
  }
}