Phase 2: Event Integration
BullMQ consumers, service sync, and activity creation.
Deliverables
- BullMQ queue configuration
- Event processor with identity resolution
- SCL → CRM event publishing
- TeeTime → CRM event publishing
- Messaging → CRM event publishing (consent sync)
- SyncLog recording for audit
1. Queue Configuration
// libs/crm/events/src/crm-events.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bullmq';
// Redis connection settings are read from the environment once, at module load.
// The port falls back to '6379' when REDIS_PORT is unset or empty.
const crmRedisHost = process.env.REDIS_HOST;
const crmRedisPort = parseInt(process.env.REDIS_PORT || '6379');

// Name shared by the queue registration below and the matching @Processor.
const crmEventsQueueName = 'crm-events';

/**
 * Wires BullMQ into the CRM events library.
 *
 * Registers the Redis connection and the `crm-events` queue. Jobs added to
 * the queue default to 3 attempts with exponential backoff starting from a
 * 1000 ms delay; `removeOnComplete` / `removeOnFail` bound how many finished
 * jobs are retained (1000 completed, 5000 failed).
 */
@Module({
  imports: [
    BullModule.forRoot({
      connection: { host: crmRedisHost, port: crmRedisPort },
    }),
    BullModule.registerQueue({
      name: crmEventsQueueName,
      defaultJobOptions: {
        attempts: 3,
        backoff: { type: 'exponential', delay: 1000 },
        removeOnComplete: 1000,
        removeOnFail: 5000,
      },
    }),
  ],
})
export class CrmEventsModule {}
2. Event Processor
// libs/crm/events/src/processors/crm-events.processor.ts
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
import { Logger, Injectable } from '@nestjs/common';
/**
 * Identifiers a source service may attach to an event. Every field is
 * optional; a publisher supplies whichever ones it knows.
 */
interface CrmEventIdentity {
  authUserId?: string;
  email?: string;
  phoneNumber?: string;
  sclMemberId?: number;
  teetimePlayerId?: string;
  messagingContactId?: string;
}

/**
 * Envelope for a job on the `crm-events` queue.
 */
interface CrmEvent {
  /** Identifier of this event, assigned by the publisher. */
  id: string;
  /** Dotted event name, e.g. `scl.member.created`. */
  type: string;
  /** Tenant the event belongs to. */
  tenantId: string;
  /** When the event occurred, as a string (publishers in this document send ISO-8601). */
  timestamp: string;
  /** Identifiers used to find or create the customer profile. */
  identity: CrmEventIdentity;
  /** Event-specific data; its shape depends on `type`. */
  payload: Record<string, unknown>;
}
/**
 * Worker for the `crm-events` queue.
 *
 * For each job it records a sync-log row, resolves the customer profile from
 * the event's identity, stores cross-service ids on the profile, creates an
 * activity (when the event type has an activity config), runs type-specific
 * profile updates, and finally marks the sync log as succeeded or failed.
 *
 * NOTE(review): `getActivityConfig`, `syncTierData` and `syncPlayerData` are
 * called below but are not defined in this excerpt — confirm they exist in
 * the real file.
 */
@Processor('crm-events')
@Injectable()
export class CrmEventsProcessor extends WorkerHost {
  private readonly logger = new Logger(CrmEventsProcessor.name);

  constructor(
    private readonly identityService: IdentityResolutionService,
    private readonly activityService: ActivityService,
    private readonly syncLogService: SyncLogService,
    private readonly prisma: PrismaService,
  ) {
    super();
  }

  /**
   * Processes one CRM event job.
   *
   * @param job BullMQ job whose `data` is the event envelope.
   * @throws Rethrows any processing error after recording it on the sync log,
   *         so the queue can apply its retry policy.
   */
  async process(job: Job<CrmEvent>): Promise<void> {
    const event = job.data;
    this.logger.log(`Processing event ${event.type} (${event.id})`);

    // Log sync attempt
    const syncLog = await this.syncLogService.create({
      tenantId: event.tenantId,
      sourceService: this.extractService(event.type),
      eventType: event.type,
      eventId: event.id,
      eventPayload: event.payload,
    });

    // The sync-log service hands back the existing row untouched when this
    // event already succeeded. Stop here so a redelivered event does not
    // create a second activity or bump counters again.
    if (syncLog.status === 'SUCCESS') {
      this.logger.log(`Skipping event ${event.id}: already processed`);
      return;
    }

    try {
      // 1. Resolve identity
      const { profile } = await this.identityService.resolve({
        tenantId: event.tenantId,
        ...event.identity,
      });

      // 2. Update cross-service links
      await this.updateServiceLinks(profile.id, event);

      // 3. Create activity record (null when the event type has no config)
      const activity = await this.createActivity(profile.id, event);

      // 4. Handle type-specific logic
      await this.handleEventType(profile.id, event);

      // 5. Update sync log
      await this.syncLogService.markSuccess(syncLog.id, profile.id, activity?.id);
      this.logger.log(`Successfully processed event ${event.id} for profile ${profile.id}`);
    } catch (error: unknown) {
      // A thrown value is not guaranteed to be an Error instance.
      const message = error instanceof Error ? error.message : String(error);
      this.logger.error(`Failed to process event ${event.id}: ${message}`);

      try {
        await this.syncLogService.markFailed(syncLog.id, message);
      } catch (logError: unknown) {
        // Never let a sync-log write failure replace the original error.
        const logMessage = logError instanceof Error ? logError.message : String(logError);
        this.logger.error(`Failed to record failure for event ${event.id}: ${logMessage}`);
      }

      throw error;
    }
  }

  /** Returns the text before the first dot of the event type. */
  private extractService(eventType: string): string {
    return eventType.split('.')[0]; // scl.member.created → scl
  }

  /**
   * Copies whichever service ids are present (truthy) on the event identity
   * onto the profile. Issues no update when none are present.
   */
  private async updateServiceLinks(profileId: string, event: CrmEvent): Promise<void> {
    const { sclMemberId, teetimePlayerId, messagingContactId } = event.identity;
    const updates: Record<string, unknown> = {};

    if (sclMemberId) {
      updates.sclMemberId = sclMemberId;
    }
    if (teetimePlayerId) {
      updates.teetimePlayerId = teetimePlayerId;
    }
    if (messagingContactId) {
      updates.messagingContactId = messagingContactId;
    }

    if (Object.keys(updates).length > 0) {
      await this.prisma.customerProfile.update({
        where: { id: profileId },
        data: updates,
      });
    }
  }

  /**
   * Creates the activity for an event, or returns null when the event type
   * has no activity configuration.
   */
  private async createActivity(profileId: string, event: CrmEvent) {
    const activityConfig = this.getActivityConfig(event.type);
    if (!activityConfig) return null;

    return this.activityService.create({
      tenantId: event.tenantId,
      customerId: profileId,
      type: activityConfig.type,
      category: activityConfig.category,
      title: activityConfig.titleFn(event.payload),
      metadata: event.payload,
      sourceService: this.extractService(event.type),
      sourceEventId: event.id,
      occurredAt: new Date(event.timestamp),
    });
  }

  /**
   * Dispatches to the profile-sync routine for the event type. Event types
   * without a case are intentionally left without extra handling.
   */
  private async handleEventType(profileId: string, event: CrmEvent): Promise<void> {
    switch (event.type) {
      case 'scl.member.created':
      case 'scl.member.updated':
        await this.syncMemberData(profileId, event.payload);
        break;
      case 'scl.member.tier.changed':
        await this.syncTierData(profileId, event.payload);
        break;
      case 'teetime.player.created':
      case 'teetime.player.updated':
        await this.syncPlayerData(profileId, event.payload);
        break;
      case 'teetime.handicap.updated':
        await this.syncHandicapData(profileId, event.payload);
        break;
      case 'contactPreference.updated':
        await this.syncConsentData(profileId, event.payload);
        break;
      case 'teetime.booking.created':
        await this.updateBookingMetrics(profileId);
        break;
      default:
        // No type-specific handling for this event type.
        break;
    }
  }

  /**
   * Copies every boolean payload field whose name ends in `OptIn` onto the
   * profile. When a flag is true, the matching `<field>At` column is stamped
   * with the current time. Does nothing when the payload has no such fields.
   */
  private async syncConsentData(profileId: string, payload: Record<string, unknown>): Promise<void> {
    const updates: Record<string, boolean | Date> = {};

    for (const [field, value] of Object.entries(payload)) {
      if (!field.endsWith('OptIn') || typeof value !== 'boolean') continue;
      updates[field] = value;
      if (value) {
        updates[`${field}At`] = new Date();
      }
    }

    // Nothing consent-related in the payload: skip the empty write.
    if (Object.keys(updates).length === 0) return;

    await this.prisma.customerProfile.update({
      where: { id: profileId },
      data: updates,
    });
  }

  /** Mirrors member name, e-mail (lower-cased), tier and status onto the profile and sets the MEMBER stage. */
  private async syncMemberData(profileId: string, payload: any): Promise<void> {
    await this.prisma.customerProfile.update({
      where: { id: profileId },
      data: {
        firstName: payload.firstName,
        lastName: payload.lastName,
        email: payload.email?.toLowerCase(),
        membershipTier: payload.membershipTierName,
        membershipStatus: payload.status,
        lifecycleStage: 'MEMBER',
      },
    });
  }

  /** Stores the new handicap and its source on the profile, stamped with the current time. */
  private async syncHandicapData(profileId: string, payload: any): Promise<void> {
    await this.prisma.customerProfile.update({
      where: { id: profileId },
      data: {
        handicap: payload.newHandicap,
        handicapProvider: payload.source,
        handicapUpdatedAt: new Date(),
      },
    });
  }

  /** Increments the 30-day booking counter and records the booking as the latest activity. */
  private async updateBookingMetrics(profileId: string): Promise<void> {
    // One timestamp so both columns agree exactly.
    const now = new Date();
    await this.prisma.customerProfile.update({
      where: { id: profileId },
      data: {
        bookingCount30d: { increment: 1 },
        lastBookingAt: now,
        lastActivityAt: now,
        lastActivityType: 'TEETIME_BOOKED',
      },
    });
  }
}
3. Activity Configuration
// libs/crm/events/src/config/activity-config.ts
/**
 * Describes how one event type is turned into an activity record.
 */
interface ActivityConfig {
  /** Activity type identifier, e.g. `MEMBER_CREATED`. */
  type: string;
  /** Category the activity is filed under. */
  category: ActivityCategory;
  /** Builds the activity title from the event payload. */
  titleFn: (payload: any) => string;
}
/**
 * Activity definitions keyed by event type.
 *
 * Each entry supplies the activity type, its category, and a function that
 * builds the title from the event payload. Event types that are absent from
 * this map have no entry to look up.
 */
export const ACTIVITY_CONFIG: Record<string, ActivityConfig> = {
  // --- Membership ---
  'scl.member.created': {
    type: 'MEMBER_CREATED',
    category: 'MEMBERSHIP',
    titleFn: (payload) => {
      const tierName = payload.membershipTierName || 'Standard';
      return `Became a member: ${tierName}`;
    },
  },
  'scl.member.tier.changed': {
    type: 'TIER_CHANGED',
    category: 'MEMBERSHIP',
    titleFn: (payload) => {
      return `Tier changed: ${payload.previousTierName} → ${payload.newTierName}`;
    },
  },

  // --- Financial ---
  'scl.payment.received': {
    type: 'PAYMENT_RECEIVED',
    category: 'FINANCIAL',
    titleFn: (payload) => {
      const formattedAmount = formatCurrency(payload.amount, payload.currency);
      return `Payment received: ${formattedAmount}`;
    },
  },

  // --- Bookings and golf ---
  'teetime.booking.created': {
    type: 'TEETIME_BOOKED',
    category: 'BOOKING',
    titleFn: (payload) => {
      return `Booked tee time at ${payload.clubName} - ${payload.courseName}`;
    },
  },
  'teetime.booking.completed': {
    type: 'TEETIME_COMPLETED',
    category: 'BOOKING',
    titleFn: (payload) => {
      return `Completed round at ${payload.clubName}`;
    },
  },
  'teetime.handicap.updated': {
    type: 'HANDICAP_UPDATED',
    category: 'GOLF',
    titleFn: (payload) => {
      return `Handicap updated: ${payload.previousHandicap} → ${payload.newHandicap}`;
    },
  },

  // --- Communication and engagement ---
  'contactPreference.updated': {
    type: 'CONSENT_UPDATED',
    category: 'COMMUNICATION',
    titleFn: (payload) => {
      // List the payload keys that end in `OptIn`, if any.
      const optInFields = Object.keys(payload).filter((name) => name.endsWith('OptIn'));
      if (optInFields.length) {
        return `Consent updated (${optInFields.join(', ')})`;
      }
      return 'Consent updated';
    },
  },
  'message.sent': {
    type: 'MESSAGE_SENT',
    category: 'COMMUNICATION',
    titleFn: (payload) => {
      return `${payload.channel} sent: ${payload.templateName}`;
    },
  },
  'email.opened': {
    type: 'EMAIL_OPENED',
    category: 'ENGAGEMENT',
    titleFn: (payload) => {
      const templateName = payload.templateName || 'Campaign email';
      return `Opened email: ${templateName}`;
    },
  },
};
4. SyncLog Service
// libs/crm/events/src/services/sync-log.service.ts
/**
 * Persists one sync-log row per (sourceService, eventId) so event processing
 * can be audited and repeated deliveries can be recognised.
 */
@Injectable()
export class SyncLogService {
  constructor(private readonly prisma: PrismaService) {}

  /**
   * Starts (or restarts) a processing attempt for an event.
   *
   * - If a row for this event already has status `SUCCESS`, that row is
   *   returned unchanged; callers can check `status` to skip reprocessing.
   * - Otherwise the row is created with `attempts: 1`, or, if it already
   *   exists, set back to `PROCESSING` with `attempts` incremented.
   *
   * @returns The sync-log row for the event.
   */
  async create(data: {
    tenantId: string;
    sourceService: string;
    eventType: string;
    eventId: string;
    eventPayload?: any;
  }) {
    // Compound unique key shared by the lookup and the upsert.
    const eventKey = {
      sourceService_eventId: {
        sourceService: data.sourceService,
        eventId: data.eventId,
      },
    };

    // Check for existing (idempotency)
    const existing = await this.prisma.syncLog.findUnique({ where: eventKey });
    if (existing?.status === 'SUCCESS') {
      return existing; // Already processed
    }

    return this.prisma.syncLog.upsert({
      where: eventKey,
      create: {
        ...data,
        status: 'PROCESSING',
        attempts: 1,
      },
      update: {
        status: 'PROCESSING',
        attempts: { increment: 1 },
      },
    });
  }

  /**
   * Marks an attempt as succeeded and links the resulting profile and
   * (optional) activity.
   *
   * Also clears `error`, so a row that failed on an earlier attempt does not
   * keep a stale failure message once it is `SUCCESS`.
   * NOTE(review): assumes `error` is a nullable column — confirm in the schema.
   */
  async markSuccess(id: string, customerId: string, activityId?: string) {
    return this.prisma.syncLog.update({
      where: { id },
      data: {
        status: 'SUCCESS',
        processedAt: new Date(),
        customerId,
        activityId,
        error: null,
      },
    });
  }

  /** Marks an attempt as failed and stores the error message. */
  async markFailed(id: string, error: string) {
    return this.prisma.syncLog.update({
      where: { id },
      data: {
        status: 'FAILED',
        error,
      },
    });
  }
}
5. Event Publishing from Source Services
SCL Service (Example)
// In SCL service - after member creation
/**
 * Creates a member and publishes a `scl.member.created` event to the CRM queue.
 *
 * The publish is awaited, so a queue error propagates to the caller even
 * though the member row has already been written.
 *
 * @param data Fields for the new member.
 * @returns The created member.
 */
async createMember(data: CreateMemberDto): Promise<Member> {
  // Load the tier relation together with the new row. Without `include`,
  // `member.tier` is not populated and `membershipTierName` below would
  // always be undefined.
  // NOTE(review): assumes `tier` is a relation on the member model — confirm in the schema.
  const member = await this.prisma.member.create({
    data,
    include: { tier: true },
  });

  // Publish to CRM queue
  await this.crmQueue.add('event', {
    id: uuid(),
    type: 'scl.member.created',
    tenantId: member.tenantId,
    timestamp: new Date().toISOString(),
    identity: {
      sclMemberId: member.id,
      email: member.email,
      phoneNumber: member.phoneNumber,
      authUserId: member.authUserId,
    },
    payload: {
      id: member.id,
      firstName: member.firstName,
      lastName: member.lastName,
      email: member.email,
      status: member.status,
      membershipTierId: member.tierId,
      membershipTierName: member.tier?.name,
    },
  });

  return member;
}
6. Dead Letter Queue Handler
// libs/crm/events/src/processors/dlq.processor.ts
/**
 * Worker for the `crm-events-dlq` queue: logs each job and sends a warning
 * alert carrying the event id and type.
 *
 * NOTE(review): nothing in this document shows jobs being added to
 * `crm-events-dlq`; confirm that something forwards exhausted `crm-events`
 * jobs to this queue.
 */
@Processor('crm-events-dlq')
export class CrmEventsDlqProcessor extends WorkerHost {
  private readonly logger = new Logger(CrmEventsDlqProcessor.name);

  // `process` calls `this.alertService`, so it has to be injected here.
  // NOTE(review): the `AlertService` class name is assumed — use the project's actual alerting provider.
  constructor(private readonly alertService: AlertService) {
    super();
  }

  /**
   * Handles one dead-lettered event.
   *
   * @param job Job whose `data` is the original event envelope.
   */
  async process(job: Job<CrmEvent>): Promise<void> {
    const { id: eventId, type: eventType } = job.data;
    this.logger.warn(`Event in DLQ: ${eventType} (${eventId})`);

    // Alert on-call
    await this.alertService.send({
      severity: 'warning',
      message: `CRM event processing failed after max retries`,
      data: {
        eventId,
        eventType,
        // Time this handler ran, not the time of the original failure.
        failedAt: new Date().toISOString(),
      },
    });
  }
}
Next Steps
After completing Phase 2, proceed to Phase 3: Timeline & Notes.