Skip to main content

Event System

Event-driven architecture for the CRM platform.

Overview

The CRM consumes events from source services (SCL, TeeTime, Messaging, Feedback) and produces events for downstream consumers.

Event Transport

  • Technology: Redis + BullMQ
  • Queue: crm-events
  • Retry Policy: Exponential backoff (1s, 2s, 4s)
  • Dead Letter Queue: After 3 retries

Event Envelope

interface CrmEvent {
// Envelope metadata
id: string; // Unique event ID (UUID)
type: string; // e.g., "scl.member.created"
version: string; // Schema version
timestamp: string; // ISO 8601

// Routing
tenantId: string;
source: string; // Source service

// Identity hints
identity: {
authUserId?: string;
email?: string;
phoneNumber?: string;
sclMemberId?: number;
teetimePlayerId?: string;
messagingContactId?: string;
};

// Event-specific payload
payload: Record<string, unknown>;

// Optional metadata
metadata?: {
userId?: string;
correlationId?: string;
};
}

Consumed Events

SCL Events

EventAction
scl.member.createdCreate/link profile, activity
scl.member.updatedUpdate profile fields
scl.tier.changedUpdate tier, rescore
scl.payment.receivedUpdate LTV, activity
scl.payment.failedActivity, alert

TeeTime Events

EventAction
teetime.player.createdCreate/link profile
teetime.booking.createdActivity, update frequency
teetime.booking.completedActivity, trigger review
teetime.competition.publishedAuto-promote
teetime.handicap.updatedUpdate handicap

Messaging Events

EventAction
messaging.message.sentActivity
messaging.message.deliveredActivity
messaging.email.openedActivity, engagement
messaging.link.clickedActivity, engagement
messaging.consent.changedUpdate opt-in flags

Feedback Events

EventAction
feedback.votedCreate activity (feature vote allocation)
feedback.unvotedCreate activity (vote removed)
feedback.shippedCreate activity (feature shipped notification)

Produced Events

EventTriggerPayload
crm.profile.createdNew profileProfile ID, tenant
crm.profile.mergedDuplicate mergeSource IDs, target ID
crm.segment.membership.changedRefreshSegment ID, adds/removes
crm.campaign.sentExecutionCampaign ID, stats
crm.churn.risk.highScore > 80Profile ID, score

Event Processing Flow

Event received


┌─────────────────────┐
│ Parse envelope │
│ Validate schema │
└──────────┬──────────┘


┌─────────────────────┐
│ Check idempotency │
│ (SyncLog lookup) │
└──────────┬──────────┘
│ New event?

┌─────────────────────┐
│ Resolve identity │
│ Find/create profile │
└──────────┬──────────┘


┌─────────────────────┐
│ Update profile │
│ Apply event data │
└──────────┬──────────┘


┌─────────────────────┐
│ Create Activity │
│ Timeline record │
└──────────┬──────────┘


┌─────────────────────┐
│ Queue async work │
│ - Rescore │
│ - Segment update │
│ - Journey trigger │
└─────────────────────┘

Idempotency

Events are deduplicated using SyncLog:

// Check if already processed
const existing = await this.prisma.syncLog.findUnique({
where: { eventId: event.id }
});

if (existing) {
return; // Skip duplicate
}

// Process event...

// Record processing
await this.prisma.syncLog.create({
data: {
eventId: event.id,
eventType: event.type,
status: 'SUCCESS',
processedAt: new Date(),
}
});

Error Handling

Error TypeAction
Transient (network)Retry with backoff
Validation errorLog and skip
Processing errorRetry, then DLQ
Unknown event typeLog and skip

Monitoring

Queue Metrics

# Queue depth
bullmq_queue_size{queue="crm-events"}

# Processing rate
rate(bullmq_jobs_completed_total{queue="crm-events"}[5m])

# Failed jobs
rate(bullmq_jobs_failed_total{queue="crm-events"}[5m])

SyncLog Queries

-- Failed events
SELECT * FROM "SyncLog" WHERE status = 'FAILED' ORDER BY created_at DESC;

-- Processing by type
SELECT event_type, count(*) FROM "SyncLog" GROUP BY event_type;