Event System
Event-driven architecture for the CRM platform.
Overview
The CRM consumes events from source services (SCL, TeeTime, Messaging, Feedback) and produces events for downstream consumers.
Event Transport
- Technology: Redis + BullMQ
- Queue: crm-events
- Retry Policy: Exponential backoff (1s, 2s, 4s)
- Dead Letter Queue: After 3 retries
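
The retry policy above can be written out as a small calculation. The following is a minimal sketch, assuming each retry doubles a 1s base delay; the constant and function names are hypothetical, and the `defaultJobOptions` shape is modelled on BullMQ job options (`attempts`, `backoff`) as an assumption rather than taken from the CRM codebase.

```typescript
// Hypothetical constants derived from the policy listed above.
const BASE_DELAY_MS = 1000; // first retry waits 1s
const MAX_RETRIES = 3; // after the third failed retry the job is dead-lettered

// Delay before retry number `retry` (1-based): base * 2^(retry - 1).
function retryDelayMs(retry: number, baseMs: number = BASE_DELAY_MS): number {
  if (!Number.isInteger(retry) || retry < 1) {
    throw new RangeError("retry must be a positive integer");
  }
  return baseMs * Math.pow(2, retry - 1);
}

// Full schedule for the policy, one entry per retry.
const retrySchedule: number[] = Array.from(
  { length: MAX_RETRIES },
  (_, i) => retryDelayMs(i + 1),
);

// Assumed option shape: `attempts` counts the initial try as well,
// so 3 retries correspond to 4 attempts in total.
const defaultJobOptions = {
  attempts: MAX_RETRIES + 1,
  backoff: { type: "exponential", delay: BASE_DELAY_MS },
};
```

With these values `retrySchedule` holds the 1s, 2s, 4s delays from the list above, expressed in milliseconds.
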
Event Envelope
interface CrmEvent {
  // Envelope metadata
  id: string;        // Unique event ID (UUID)
  type: string;      // e.g., "scl.member.created"
  version: string;   // Schema version
  timestamp: string; // ISO 8601

  // Routing
  tenantId: string;
  source: string;    // Source service

  // Identity hints
  identity: {
    authUserId?: string;
    email?: string;
    phoneNumber?: string;
    sclMemberId?: number;
    teetimePlayerId?: string;
    messagingContactId?: string;
  };

  // Event-specific payload
  payload: Record<string, unknown>;

  // Optional metadata
  metadata?: {
    userId?: string;
    correlationId?: string;
  };
}
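
Because events arrive from other services as untyped JSON, the envelope has to be checked at runtime before it can be treated as a `CrmEvent`. The following is a minimal sketch of such a check; `validateEnvelope`, `isCrmEvent`, and the condensed `CrmEventEnvelope` type are hypothetical names, and the rules (non-empty strings, parseable timestamp) are assumptions rather than the platform's actual schema validation.

```typescript
// Condensed copy of the envelope fields required on every event.
interface CrmEventEnvelope {
  id: string;
  type: string;
  version: string;
  timestamp: string;
  tenantId: string;
  source: string;
  identity: Record<string, unknown>;
  payload: Record<string, unknown>;
}

const REQUIRED_STRING_FIELDS = [
  "id", "type", "version", "timestamp", "tenantId", "source",
] as const;

function isPlainObject(value: unknown): value is Record<string, unknown> {
  return typeof value === "object" && value !== null && !Array.isArray(value);
}

// Returns a list of problems; an empty list means the envelope looks valid.
function validateEnvelope(value: unknown): string[] {
  if (!isPlainObject(value)) return ["event is not an object"];
  const errors: string[] = [];
  for (const field of REQUIRED_STRING_FIELDS) {
    const v = value[field];
    if (typeof v !== "string" || v.length === 0) {
      errors.push(`${field} must be a non-empty string`);
    }
  }
  const ts = value["timestamp"];
  if (typeof ts === "string" && ts.length > 0 && Number.isNaN(Date.parse(ts))) {
    errors.push("timestamp is not a parseable date");
  }
  if (!isPlainObject(value["identity"])) errors.push("identity must be an object");
  if (!isPlainObject(value["payload"])) errors.push("payload must be an object");
  return errors;
}

// Type guard built on top of the validator.
function isCrmEvent(value: unknown): value is CrmEventEnvelope {
  return validateEnvelope(value).length === 0;
}
```

Returning a list of problems instead of a boolean makes the "log and skip" path for validation errors easier to debug, since every failed rule can be logged at once.
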
Consumed Events
SCL Events
| Event | Action |
|---|---|
| scl.member.created | Create/link profile, activity |
| scl.member.updated | Update profile fields |
| scl.tier.changed | Update tier, rescore |
| scl.payment.received | Update LTV, activity |
| scl.payment.failed | Activity, alert |
TeeTime Events
| Event | Action |
|---|---|
| teetime.player.created | Create/link profile |
| teetime.booking.created | Activity, update frequency |
| teetime.booking.completed | Activity, trigger review |
| teetime.competition.published | Auto-promote |
| teetime.handicap.updated | Update handicap |
Messaging Events
| Event | Action |
|---|---|
| messaging.message.sent | Activity |
| messaging.message.delivered | Activity |
| messaging.email.opened | Activity, engagement |
| messaging.link.clicked | Activity, engagement |
| messaging.consent.changed | Update opt-in flags |
Feedback Events
| Event | Action |
|---|---|
| feedback.voted | Create activity (feature vote allocation) |
| feedback.unvoted | Create activity (vote removed) |
| feedback.shipped | Create activity (feature shipped notification) |
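
The event/action tables above amount to a lookup from event type to handler. The following is a minimal sketch of that routing, assuming a plain in-memory registry; the `handlers` map, the `dispatch` function, and the action strings are hypothetical and only a few event types are registered for illustration.

```typescript
// A handler receives the event payload and reports which actions it took.
type Handler = (payload: Record<string, unknown>) => string[];

// Hypothetical registry keyed by the event `type` field.
const handlers = new Map<string, Handler>([
  ["scl.tier.changed", () => ["update tier", "rescore"]],
  ["teetime.booking.created", () => ["activity", "update frequency"]],
  ["messaging.consent.changed", () => ["update opt-in flags"]],
  ["feedback.voted", () => ["create activity"]],
]);

// Routes one event to its handler; unknown types are logged and skipped.
function dispatch(type: string, payload: Record<string, unknown> = {}): string[] {
  const handler = handlers.get(type);
  if (!handler) {
    console.warn(`skipping unknown event type: ${type}`);
    return [];
  }
  return handler(payload);
}
```

Keeping the registry as data means supporting a new consumed event is a one-line addition, and unregistered types fall through to the skip path without throwing.
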
Produced Events
| Event | Trigger | Payload |
|---|---|---|
| crm.profile.created | New profile | Profile ID, tenant |
| crm.profile.merged | Duplicate merge | Source IDs, target ID |
| crm.segment.membership.changed | Refresh | Segment ID, adds/removes |
| crm.campaign.sent | Execution | Campaign ID, stats |
| crm.churn.risk.high | Score > 80 | Profile ID, score |
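
As an illustration of one produced event, the sketch below builds `crm.churn.risk.high` only when the score exceeds the threshold in the table. The `ProducedEvent` type, the `churnRiskEvent` function, and the payload field names `profileId` and `score` are assumptions made for this example.

```typescript
// From the table above: the event is emitted when score > 80.
const CHURN_RISK_THRESHOLD = 80;

// Hypothetical, reduced shape of an outgoing event.
interface ProducedEvent {
  type: string;
  tenantId: string;
  timestamp: string; // ISO 8601
  payload: Record<string, unknown>;
}

// Returns the event to publish, or null when the score is not high enough.
function churnRiskEvent(
  tenantId: string,
  profileId: string,
  score: number,
  now: Date = new Date(),
): ProducedEvent | null {
  if (score <= CHURN_RISK_THRESHOLD) return null;
  return {
    type: "crm.churn.risk.high",
    tenantId,
    timestamp: now.toISOString(),
    payload: { profileId, score },
  };
}
```

A score of exactly 80 produces no event, since the trigger is a strict "greater than".
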
Event Processing Flow
    Event received
           │
           ▼
┌─────────────────────┐
│ Parse envelope      │
│ Validate schema     │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│ Check idempotency   │
│ (SyncLog lookup)    │
└──────────┬──────────┘
           │ New event?
           ▼
┌─────────────────────┐
│ Resolve identity    │
│ Find/create profile │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│ Update profile      │
│ Apply event data    │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│ Create Activity     │
│ Timeline record     │
└──────────┬──────────┘
           │
           ▼
┌─────────────────────┐
│ Queue async work    │
│ - Rescore           │
│ - Segment update    │
│ - Journey trigger   │
└─────────────────────┘
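
The flow above can be sketched as a single function that runs the steps in order and stops early for invalid or duplicate events. All names here (`PipelineDeps`, `processEvent`, the job names) are hypothetical, and the collaborators are injected as an interface so the ordering can be shown without a database or queue.

```typescript
interface IncomingEvent {
  id: string;
  type: string;
  payload: Record<string, unknown>;
}

type AsyncJob = "rescore" | "segment-update" | "journey-trigger";

// Hypothetical collaborators; each corresponds to one box in the flow.
interface PipelineDeps {
  alreadyProcessed(eventId: string): Promise<boolean>;
  resolveProfile(event: IncomingEvent): Promise<string>; // returns a profile ID
  updateProfile(profileId: string, event: IncomingEvent): Promise<void>;
  createActivity(profileId: string, event: IncomingEvent): Promise<void>;
  enqueue(job: AsyncJob, profileId: string): Promise<void>;
}

type Outcome = "processed" | "duplicate" | "invalid";

// Envelope validation reduced to a shape check for this sketch.
function isIncomingEvent(value: unknown): value is IncomingEvent {
  if (typeof value !== "object" || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v["id"] === "string" &&
    typeof v["type"] === "string" &&
    typeof v["payload"] === "object" &&
    v["payload"] !== null
  );
}

async function processEvent(raw: unknown, deps: PipelineDeps): Promise<Outcome> {
  // 1. Parse envelope, validate schema.
  if (!isIncomingEvent(raw)) return "invalid";
  // 2. Check idempotency; only new events continue.
  if (await deps.alreadyProcessed(raw.id)) return "duplicate";
  // 3. Resolve identity, find or create the profile.
  const profileId = await deps.resolveProfile(raw);
  // 4. Apply event data to the profile.
  await deps.updateProfile(profileId, raw);
  // 5. Create the timeline record.
  await deps.createActivity(profileId, raw);
  // 6. Queue the follow-up work.
  const jobs: AsyncJob[] = ["rescore", "segment-update", "journey-trigger"];
  for (const job of jobs) {
    await deps.enqueue(job, profileId);
  }
  return "processed";
}
```

Injecting the steps keeps the ordering logic testable with in-memory stubs; the real services would supply implementations backed by the database and the queue.
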
Idempotency
Events are deduplicated using SyncLog:
// Check whether this event has already been processed.
// findUnique requires eventId to be a unique field on SyncLog.
const existing = await this.prisma.syncLog.findUnique({
  where: { eventId: event.id },
});
if (existing) {
  return; // Skip duplicate
}

// Process event...

// Record successful processing so later deliveries are skipped.
await this.prisma.syncLog.create({
  data: {
    eventId: event.id,
    eventType: event.type,
    status: 'SUCCESS',
    processedAt: new Date(),
  },
});
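
A lookup followed by a later insert leaves a window in which two workers handling the same delivery can both pass the check. One possible variation, which is not the approach shown above, is to insert the log row first and treat a failed insert as a duplicate. The sketch below illustrates this with an in-memory stand-in for a table with a unique `eventId`; `InMemorySyncLog`, `processOnce`, and the `PROCESSING` status are hypothetical.

```typescript
type SyncStatus = "PROCESSING" | "SUCCESS";

// In-memory stand-in for a SyncLog table with a unique constraint on eventId.
class InMemorySyncLog {
  private readonly rows = new Map<string, { eventType: string; status: SyncStatus }>();

  // Returns true only for the caller that inserted the row (won the claim).
  claim(eventId: string, eventType: string): boolean {
    if (this.rows.has(eventId)) return false;
    this.rows.set(eventId, { eventType, status: "PROCESSING" });
    return true;
  }

  markSuccess(eventId: string): void {
    const row = this.rows.get(eventId);
    if (row) row.status = "SUCCESS";
  }

  // Removes the claim so a queue retry can process the event again.
  release(eventId: string): void {
    this.rows.delete(eventId);
  }

  statusOf(eventId: string): SyncStatus | undefined {
    return this.rows.get(eventId)?.status;
  }
}

async function processOnce(
  log: InMemorySyncLog,
  event: { id: string; type: string },
  work: () => Promise<void>,
): Promise<"processed" | "duplicate"> {
  // Claim before doing any work; a second caller sees the row and stops.
  if (!log.claim(event.id, event.type)) return "duplicate";
  try {
    await work();
    log.markSuccess(event.id);
    return "processed";
  } catch (err) {
    log.release(event.id); // failed work must stay retryable
    throw err;
  }
}
```

In a real database the `claim` step would be an insert that relies on the unique constraint to reject the second writer; the in-memory `Map` only imitates that behaviour within a single process.
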
Error Handling
| Error Type | Action |
|---|---|
| Transient (network) | Retry with backoff |
| Validation error | Log and skip |
| Processing error | Retry, then DLQ |
| Unknown event type | Log and skip |
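
The table above can be read as a decision function from error type and retry count to an action. The following is a minimal sketch under two assumptions: the retry limit of 3 is taken from the Event Transport section, and transient errors are also dead-lettered once retries are exhausted, which the table does not state explicitly. `ErrorKind`, `ErrorAction`, and `decideAction` are hypothetical names.

```typescript
type ErrorKind = "transient" | "validation" | "processing" | "unknown-event-type";
type ErrorAction = "retry" | "skip" | "dead-letter";

// Assumed to match the "after 3 retries" dead-letter rule.
const RETRY_LIMIT = 3;

// Maps an error kind and the number of retries already made to an action.
function decideAction(kind: ErrorKind, retriesSoFar: number): ErrorAction {
  switch (kind) {
    case "validation":
    case "unknown-event-type":
      // Logged and dropped: retrying cannot fix a malformed or unknown event.
      return "skip";
    case "transient":
    case "processing":
      // Retried with backoff until the limit, then moved to the DLQ.
      return retriesSoFar < RETRY_LIMIT ? "retry" : "dead-letter";
  }
}
```

For example, a processing error on a job that has already been retried three times yields `"dead-letter"`, while a validation error is skipped on first sight.
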
Monitoring
Queue Metrics
# Queue depth
bullmq_queue_size{queue="crm-events"}
# Processing rate
rate(bullmq_jobs_completed_total{queue="crm-events"}[5m])
# Failed jobs
rate(bullmq_jobs_failed_total{queue="crm-events"}[5m])
SyncLog Queries
-- Failed events
SELECT * FROM "SyncLog" WHERE status = 'FAILED' ORDER BY created_at DESC;
-- Processing by type
SELECT event_type, count(*) FROM "SyncLog" GROUP BY event_type;