CRM & Marketing Platform Implementation Guide
Version: 2.0 Date: December 2024
Table of Contents
- Overview
- Prerequisites
- Phase 1: Foundation
- Phase 2: Event Integration
- Phase 3: Timeline & Notes
- Phase 4: Marketing Core
- Phase 5: Marketing Automation
- Phase 6: Social Integration
- Phase 7: Backfill
- Phase 8: Intelligence
- Testing Strategy
- Deployment
1. Overview
This guide provides step-by-step instructions for implementing the CRM & Marketing Platform. Follow the phases in order, as each builds on the previous.
Phase Summary
| Phase | Description | Key Deliverables |
|---|---|---|
| 1 | Foundation | Database, Identity Resolution, Basic API |
| 2 | Event Integration | BullMQ consumers, SCL/TeeTime/Messaging sync |
| 3 | Timeline & Notes | Activity timeline, Customer notes |
| 4 | Marketing Core | Campaigns, Segments (CRM-owned), Scheduling |
| 5 | Marketing Automation | Journey builder, Event triggers, A/B testing |
| 6 | Social Integration | OAuth connections, Publisher, Event promotion |
| 7 | Backfill | MCA v1 ZA/UK migration |
| 8 | Intelligence | Engagement scoring, Churn prediction |
2. Prerequisites
Infrastructure
- PostgreSQL database provisioned (CRM database)
- Redis available for event transport and caching
- Kubernetes namespace/deployment slots
- Infisical secrets configured
Codebase Setup
# Create library structure
mkdir -p libs/prisma/crm-client
mkdir -p libs/crm/core
mkdir -p libs/crm/events
mkdir -p libs/crm/campaigns
mkdir -p libs/crm/automation
mkdir -p libs/crm/social
mkdir -p apps/crm/crm-backend
Dependencies
{
"dependencies": {
"@prisma/client": "^6.x",
"@nestjs/common": "^10.x",
"@nestjs/bullmq": "^10.x",
"@nestjs/schedule": "^4.x",
"bullmq": "^5.x",
"cron-parser": "^4.x"
}
}
3. Phase 1: Foundation
3.1 Database Setup
Create Prisma Client Library
cd libs/prisma/crm-client
pnpm prisma init
Schema Setup
Copy the schema from data-model.md to:
libs/prisma/crm-client/prisma/schema.prisma
Initial Migration
# Set database URL in Infisical: CRM_DATABASE_URL=postgresql://...
cd libs/prisma/crm-client
pnpm prisma migrate dev --name init
pnpm prisma generate
3.2 Core Service Module
Project Structure
libs/crm/core/
├── src/
│ ├── index.ts
│ ├── crm-core.module.ts
│ ├── services/
│ │ ├── customer-profile.service.ts
│ │ ├── identity-resolution.service.ts
│ │ └── profile-merge.service.ts
│ ├── repositories/
│ │ ├── customer-profile.repository.ts
│ │ └── identity-alias.repository.ts
│ └── dto/
│ ├── create-customer.dto.ts
│ ├── update-customer.dto.ts
│ └── resolve-identity.dto.ts
├── project.json
└── tsconfig.lib.json
Identity Resolution Service
// libs/crm/core/src/services/identity-resolution.service.ts
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '@digiwedge/crm-client';
export interface IdentityHints {
tenantId: string;
authUserId?: string;
email?: string;
phoneNumber?: string;
sclMemberId?: number;
teetimePlayerId?: string;
messagingContactId?: string;
externalIds?: Record<string, string>;
}
export interface ResolutionResult {
profile: CustomerProfile;
isNew: boolean;
matchedBy: string[];
confidence: number;
}
@Injectable()
export class IdentityResolutionService {
private readonly logger = new Logger(IdentityResolutionService.name);
constructor(private readonly prisma: PrismaService) {}
async resolve(hints: IdentityHints): Promise<ResolutionResult> {
const { tenantId, authUserId, email, phoneNumber } = hints;
// 1. Try authUserId (100% confidence)
if (authUserId) {
const profile = await this.prisma.customerProfile.findUnique({
where: { authUserId },
});
if (profile) {
return { profile, isNew: false, matchedBy: ['authUserId'], confidence: 100 };
}
}
// 2. Try email (95% confidence)
if (email) {
const normalizedEmail = this.normalizeEmail(email);
const profile = await this.prisma.customerProfile.findUnique({
where: { tenantId_email: { tenantId, email: normalizedEmail } },
});
if (profile) {
// Link authUserId if we now have it
if (authUserId && !profile.authUserId) {
await this.prisma.customerProfile.update({
where: { id: profile.id },
data: { authUserId },
});
}
return { profile, isNew: false, matchedBy: ['email'], confidence: 95 };
}
}
// 3. Try phone (85% confidence)
if (phoneNumber) {
const e164Phone = this.normalizePhone(phoneNumber);
const profile = await this.prisma.customerProfile.findUnique({
where: { tenantId_phoneNumber: { tenantId, phoneNumber: e164Phone } },
});
if (profile) {
return { profile, isNew: false, matchedBy: ['phone'], confidence: 85 };
}
}
// 4. Try cross-service links
if (hints.sclMemberId) {
const profile = await this.prisma.customerProfile.findUnique({
where: { tenantId_sclMemberId: { tenantId, sclMemberId: hints.sclMemberId } },
});
if (profile) {
return { profile, isNew: false, matchedBy: ['sclMemberId'], confidence: 100 };
}
}
// 5. No match - create new profile
const newProfile = await this.prisma.customerProfile.create({
data: {
tenantId,
authUserId,
email: email ? this.normalizeEmail(email) : undefined,
phoneNumber: phoneNumber ? this.normalizePhone(phoneNumber) : undefined,
sclMemberId: hints.sclMemberId,
teetimePlayerId: hints.teetimePlayerId,
messagingContactId: hints.messagingContactId,
externalIds: hints.externalIds || {},
status: 'ACTIVE',
lifecycleStage: 'SUBSCRIBER',
},
});
return { profile: newProfile, isNew: true, matchedBy: [], confidence: 100 };
}
private normalizeEmail(email: string): string {
return email.toLowerCase().trim();
}
private normalizePhone(phone: string): string {
const digits = phone.replace(/\D/g, '');
if (digits.startsWith('27') && digits.length === 11) {
return `+${digits}`;
}
if (digits.startsWith('0') && digits.length === 10) {
return `+27${digits.slice(1)}`;
}
return phone;
}
}
3.3 REST API
// apps/crm/crm-backend/src/app/customer/customer.controller.ts
@ApiTags('customers')
@Controller('api/v1/customers')
export class CustomerController {
constructor(
private readonly customerService: CustomerProfileService,
private readonly identityService: IdentityResolutionService,
) {}
@Get()
@ApiOperation({ summary: 'List customers with filters' })
async list(@Query() query: ListCustomersDto) {
return this.customerService.list(query);
}
@Get('resolve')
@ApiOperation({ summary: 'Resolve customer by identity hints' })
async resolve(@Query() hints: ResolveIdentityDto) {
return this.identityService.resolve(hints);
}
@Get(':id')
@ApiOperation({ summary: 'Get customer by ID' })
async get(@Param('id') id: string) {
return this.customerService.getById(id);
}
@Patch(':id')
@ApiOperation({ summary: 'Update customer' })
async update(@Param('id') id: string, @Body() dto: UpdateCustomerDto) {
return this.customerService.update(id, dto);
}
@Post(':id/merge')
@ApiOperation({ summary: 'Merge two customer profiles' })
async merge(@Param('id') winnerId: string, @Body() dto: MergeCustomerDto) {
return this.customerService.merge(winnerId, dto.loserId);
}
}
3.4 Deliverables - Phase 1
- Prisma schema created and migrated
- CustomerProfile CRUD operations
- IdentityResolution service
- Basic REST API for customers
- Unit tests for identity resolution
4. Phase 2: Event Integration
4.1 Event Consumer Setup
// libs/crm/events/src/processors/crm-events.processor.ts
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';
interface CrmEvent {
id: string;
type: string;
tenantId: string;
timestamp: string;
identity: {
authUserId?: string;
email?: string;
phoneNumber?: string;
sclMemberId?: number;
teetimePlayerId?: string;
messagingContactId?: string;
};
payload: Record<string, unknown>;
}
@Processor('crm-events')
export class CrmEventsProcessor extends WorkerHost {
constructor(
private readonly identityService: IdentityResolutionService,
private readonly activityService: ActivityService,
private readonly syncLogService: SyncLogService,
) {
super();
}
async process(job: Job<CrmEvent>): Promise<void> {
const event = job.data;
// Log sync attempt
const syncLog = await this.syncLogService.create({
tenantId: event.tenantId,
sourceService: this.extractService(event.type),
eventType: event.type,
eventId: event.id,
eventPayload: event.payload,
});
try {
// 1. Resolve identity
const { profile, isNew } = await this.identityService.resolve({
tenantId: event.tenantId,
...event.identity,
});
// 2. Update cross-service links
await this.updateServiceLinks(profile.id, event);
// 3. Create activity record
await this.createActivity(profile.id, event);
// 4. Handle type-specific logic
await this.handleEventType(profile.id, event);
// 5. Update sync log
await this.syncLogService.markSuccess(syncLog.id, profile.id);
} catch (error) {
await this.syncLogService.markFailed(syncLog.id, error.message);
throw error;
}
}
private async handleEventType(profileId: string, event: CrmEvent): Promise<void> {
switch (event.type) {
case 'scl.member.updated':
await this.syncMemberData(profileId, event.payload);
break;
case 'teetime.handicap.updated':
await this.syncHandicapData(profileId, event.payload);
break;
case 'contactPreference.updated':
await this.syncConsentData(profileId, event.payload);
break;
}
}
private async syncConsentData(profileId: string, payload: Record<string, unknown>): Promise<void> {
// Consent is owned by Messaging - we only sync the flags
const updates: Record<string, boolean | Date> = {};
for (const [field, value] of Object.entries(payload)) {
if (!field.endsWith('OptIn') || typeof value !== 'boolean') continue;
updates[field] = value;
if (value) {
updates[`${field}At`] = new Date();
}
}
await this.prisma.customerProfile.update({
where: { id: profileId },
data: updates,
});
}
}
4.2 Deliverables - Phase 2
- BullMQ queue configuration
- Event processor with identity resolution
- SCL → CRM event publishing
- TeeTime → CRM event publishing
- Messaging → CRM event publishing (consent sync)
- SyncLog recording for audit
5. Phase 3: Timeline & Notes
5.1 Activity Service
// libs/crm/core/src/services/activity.service.ts
@Injectable()
export class ActivityService {
constructor(private readonly prisma: PrismaService) {}
async create(dto: CreateActivityDto): Promise<Activity> {
// Check for duplicate (idempotency)
if (dto.sourceEventId) {
const existing = await this.prisma.activity.findUnique({
where: {
tenantId_sourceService_sourceEventId: {
tenantId: dto.tenantId,
sourceService: dto.sourceService,
sourceEventId: dto.sourceEventId,
},
},
});
if (existing) return existing;
}
const activity = await this.prisma.activity.create({ data: dto });
// Update profile activity metrics
await this.prisma.customerProfile.update({
where: { id: dto.customerId },
data: {
lastActivityAt: dto.occurredAt,
lastActivityType: dto.type,
activityCount30d: { increment: 1 },
},
});
return activity;
}
async getTimeline(
customerId: string,
filters: TimelineFilters = {},
pagination: { skip?: number; take?: number } = {},
): Promise<{ activities: Activity[]; total: number }> {
const where: Prisma.ActivityWhereInput = { customerId };
if (filters.categories?.length) {
where.category = { in: filters.categories };
}
if (filters.startDate || filters.endDate) {
where.occurredAt = {};
if (filters.startDate) where.occurredAt.gte = filters.startDate;
if (filters.endDate) where.occurredAt.lte = filters.endDate;
}
const [activities, total] = await Promise.all([
this.prisma.activity.findMany({
where,
orderBy: { occurredAt: 'desc' },
skip: pagination.skip ?? 0,
take: pagination.take ?? 50,
}),
this.prisma.activity.count({ where }),
]);
return { activities, total };
}
}
5.2 Notes Service
// libs/crm/core/src/services/customer-notes.service.ts
@Injectable()
export class CustomerNotesService {
constructor(
private readonly prisma: PrismaService,
private readonly activityService: ActivityService,
) {}
async create(dto: CreateNoteDto, userId: string): Promise<CustomerNote> {
const note = await this.prisma.customerNote.create({
data: {
tenantId: dto.tenantId,
customerId: dto.customerId,
content: dto.content,
type: dto.type ?? 'GENERAL',
isPinned: dto.isPinned ?? false,
isPrivate: dto.isPrivate ?? false,
createdBy: userId,
},
});
// Create activity
await this.activityService.create({
tenantId: dto.tenantId,
customerId: dto.customerId,
type: 'NOTE_ADDED',
category: 'SUPPORT',
title: `Note added: ${dto.type || 'General'}`,
sourceService: 'crm',
sourceId: note.id,
occurredAt: new Date(),
});
return note;
}
async list(customerId: string, includePrivate: boolean, userId?: string) {
const where: Prisma.CustomerNoteWhereInput = { customerId };
if (!includePrivate) {
where.OR = [
{ isPrivate: false },
{ isPrivate: true, createdBy: userId },
];
}
return this.prisma.customerNote.findMany({
where,
orderBy: [{ isPinned: 'desc' }, { createdAt: 'desc' }],
});
}
}
5.3 Deliverables - Phase 3
- Activity service with CRUD
- Timeline API with filters
- Customer notes CRUD
- Activity metrics denormalization
6. Phase 4: Marketing Core
6.1 Segmentation Service (CRM Owns Segments)
// libs/crm/campaigns/src/services/segmentation.service.ts
@Injectable()
export class SegmentationService {
constructor(
private readonly prisma: PrismaService,
private readonly messagingClient: MessagingServiceClient,
) {}
async createSegment(dto: CreateSegmentDto): Promise<CustomerSegment> {
const segment = await this.prisma.customerSegment.create({
data: {
tenantId: dto.tenantId,
clubId: dto.clubId,
name: dto.name,
description: dto.description,
type: dto.type,
criteria: dto.criteria,
combinator: dto.combinator ?? 'AND',
refreshFrequency: dto.refreshFrequency ?? 'DAILY',
createdBy: dto.createdBy,
},
});
// Initial refresh for dynamic segments
if (dto.type === 'DYNAMIC') {
await this.refreshSegment(segment.id);
}
return segment;
}
async refreshSegment(segmentId: string): Promise<void> {
const segment = await this.prisma.customerSegment.findUnique({
where: { id: segmentId },
});
if (!segment || segment.type === 'STATIC') return;
// Build query from criteria
const where = this.buildWhereFromCriteria(
segment.tenantId,
segment.criteria as SegmentCriteria,
);
// Find matching profiles
const matchingProfiles = await this.prisma.customerProfile.findMany({
where,
select: { id: true, email: true, phoneNumber: true },
});
// Update membership
await this.prisma.$transaction([
this.prisma.customerSegmentMembership.deleteMany({
where: { segmentId },
}),
this.prisma.customerSegmentMembership.createMany({
data: matchingProfiles.map(p => ({
tenantId: segment.tenantId,
segmentId,
customerId: p.id,
matchedAt: new Date(),
})),
}),
this.prisma.customerSegment.update({
where: { id: segmentId },
data: {
memberCount: matchingProfiles.length,
lastRefreshedAt: new Date(),
},
}),
]);
}
async syncToMessaging(segmentId: string): Promise<void> {
const segment = await this.prisma.customerSegment.findUnique({
where: { id: segmentId },
include: {
memberships: {
include: { customer: true },
},
},
});
if (!segment) throw new NotFoundException('Segment not found');
// Create or update segment in Messaging service
const messagingSegment = await this.messagingClient.upsertSegment({
externalId: segment.id,
tenantId: segment.tenantId,
name: segment.name,
contacts: segment.memberships.map(m => ({
email: m.customer.email,
phoneNumber: m.customer.phoneNumber,
firstName: m.customer.firstName,
lastName: m.customer.lastName,
})),
});
// Store messaging segment ID for reference
await this.prisma.customerSegment.update({
where: { id: segmentId },
data: {
messagingSegmentId: messagingSegment.id,
lastSyncedAt: new Date(),
},
});
}
private buildWhereFromCriteria(
tenantId: string,
criteria: SegmentCriteria,
): Prisma.CustomerProfileWhereInput {
const conditions = criteria.rules.map(rule => this.ruleToCondition(rule));
return {
tenantId,
status: { not: 'MERGED' },
deletedAt: null,
...(criteria.combinator === 'OR' ? { OR: conditions } : { AND: conditions }),
};
}
private ruleToCondition(rule: SegmentRule): Prisma.CustomerProfileWhereInput {
const { field, operator, value } = rule;
const operators: Record<string, any> = {
eq: value,
neq: { not: value },
gt: { gt: value },
gte: { gte: value },
lt: { lt: value },
lte: { lte: value },
contains: { contains: value, mode: 'insensitive' },
in: { in: value },
isNull: null,
isNotNull: { not: null },
};
return { [field]: operators[operator] ?? value };
}
}
6.2 Campaign Service
// libs/crm/campaigns/src/services/campaign.service.ts
@Injectable()
export class CampaignService {
constructor(
private readonly prisma: PrismaService,
private readonly segmentService: SegmentationService,
private readonly schedulerService: CampaignSchedulerService,
) {}
async create(dto: CreateCampaignDto): Promise<Campaign> {
const campaign = await this.prisma.campaign.create({
data: {
tenantId: dto.tenantId,
clubId: dto.clubId,
name: dto.name,
description: dto.description,
type: dto.type,
status: 'DRAFT',
segmentId: dto.segmentId,
channels: dto.channels,
emailTemplateId: dto.emailTemplateId,
smsTemplateId: dto.smsTemplateId,
pushTemplateId: dto.pushTemplateId,
timezone: dto.timezone ?? 'Africa/Johannesburg',
createdBy: dto.createdBy,
},
});
return campaign;
}
async schedule(campaignId: string, scheduledAt: Date): Promise<Campaign> {
const campaign = await this.prisma.campaign.findUnique({
where: { id: campaignId },
include: { segment: true },
});
if (!campaign) throw new NotFoundException('Campaign not found');
if (campaign.status !== 'DRAFT') {
throw new BadRequestException('Can only schedule draft campaigns');
}
// Refresh segment and get target count
if (campaign.segmentId) {
await this.segmentService.refreshSegment(campaign.segmentId);
await this.segmentService.syncToMessaging(campaign.segmentId);
}
const targetCount = campaign.segment?.memberCount ?? 0;
// Update campaign status
const updated = await this.prisma.campaign.update({
where: { id: campaignId },
data: {
status: 'SCHEDULED',
scheduledAt,
targetCount,
},
});
// Schedule the job
await this.schedulerService.scheduleCampaign(updated);
return updated;
}
async execute(campaignId: string): Promise<void> {
const campaign = await this.prisma.campaign.findUnique({
where: { id: campaignId },
include: {
segment: {
include: {
memberships: {
include: { customer: true },
},
},
},
},
});
if (!campaign) throw new NotFoundException('Campaign not found');
await this.prisma.campaign.update({
where: { id: campaignId },
data: { status: 'ACTIVE', startedAt: new Date() },
});
try {
// Get recipients
const recipients = campaign.segment?.memberships.map(m => m.customer) ?? [];
// Send via each channel
for (const channel of campaign.channels) {
await this.sendViaChannel(campaign, recipients, channel);
}
await this.prisma.campaign.update({
where: { id: campaignId },
data: { status: 'COMPLETED', completedAt: new Date() },
});
} catch (error) {
await this.prisma.campaign.update({
where: { id: campaignId },
data: { status: 'PAUSED' },
});
throw error;
}
}
private async sendViaChannel(
campaign: Campaign,
recipients: CustomerProfile[],
channel: Channel,
): Promise<void> {
switch (channel) {
case 'EMAIL':
await this.messagingClient.sendBulkEmail({
templateId: campaign.emailTemplateId,
recipients: recipients
.filter(r => r.emailOptIn && r.email)
.map(r => ({
email: r.email,
data: { firstName: r.firstName, lastName: r.lastName },
})),
campaignId: campaign.id,
});
break;
case 'SMS':
await this.messagingClient.sendBulkSms({
templateId: campaign.smsTemplateId,
recipients: recipients
.filter(r => r.smsOptIn && r.phoneNumber)
.map(r => ({
phone: r.phoneNumber,
data: { firstName: r.firstName },
})),
campaignId: campaign.id,
});
break;
case 'SOCIAL':
// Handled separately via social posts
break;
}
}
}
6.3 Campaign Scheduler
// libs/crm/campaigns/src/services/campaign-scheduler.service.ts
@Injectable()
export class CampaignSchedulerService {
constructor(
@InjectQueue('campaign-execution') private readonly queue: Queue,
private readonly prisma: PrismaService,
) {}
async scheduleCampaign(campaign: Campaign): Promise<void> {
const delay = campaign.scheduledAt.getTime() - Date.now();
if (delay <= 0) {
// Execute immediately
await this.queue.add('execute', { campaignId: campaign.id });
} else {
// Schedule for later
await this.queue.add(
'execute',
{ campaignId: campaign.id },
{ delay, jobId: `campaign-${campaign.id}` },
);
}
}
async cancelScheduledCampaign(campaignId: string): Promise<void> {
const job = await this.queue.getJob(`campaign-${campaignId}`);
if (job) {
await job.remove();
}
}
}
@Processor('campaign-execution')
export class CampaignExecutionProcessor extends WorkerHost {
constructor(private readonly campaignService: CampaignService) {
super();
}
async process(job: Job<{ campaignId: string }>): Promise<void> {
await this.campaignService.execute(job.data.campaignId);
}
}
6.4 Deliverables - Phase 4
- Segmentation service with dynamic rules
- Segment sync to Messaging service
- Campaign CRUD
- Campaign scheduling with BullMQ
- Campaign execution (Email, SMS)
- Campaign stats tracking
7. Phase 5: Marketing Automation
7.1 Journey Service
// libs/crm/automation/src/services/journey.service.ts
@Injectable()
export class JourneyService {
constructor(
private readonly prisma: PrismaService,
private readonly journeyEngine: JourneyEngine,
) {}
async create(dto: CreateJourneyDto): Promise<Journey> {
return this.prisma.journey.create({
data: {
tenantId: dto.tenantId,
clubId: dto.clubId,
name: dto.name,
description: dto.description,
triggerType: dto.triggerType,
triggerConfig: dto.triggerConfig,
allowReentry: dto.allowReentry ?? false,
timezone: dto.timezone ?? 'Africa/Johannesburg',
createdBy: dto.createdBy,
},
});
}
async addStep(journeyId: string, dto: CreateJourneyStepDto): Promise<JourneyStep> {
const journey = await this.prisma.journey.findUnique({
where: { id: journeyId },
include: { steps: true },
});
if (!journey) throw new NotFoundException('Journey not found');
const stepNumber = journey.steps.length + 1;
return this.prisma.journeyStep.create({
data: {
journeyId,
stepNumber,
type: dto.type,
name: dto.name,
config: dto.config,
nextStepId: dto.nextStepId,
nextStepYesId: dto.nextStepYesId,
nextStepNoId: dto.nextStepNoId,
},
});
}
async activate(journeyId: string): Promise<Journey> {
const journey = await this.prisma.journey.findUnique({
where: { id: journeyId },
include: { steps: true },
});
if (!journey) throw new NotFoundException('Journey not found');
if (journey.steps.length === 0) {
throw new BadRequestException('Journey must have at least one step');
}
const updated = await this.prisma.journey.update({
where: { id: journeyId },
data: { status: 'ACTIVE', activatedAt: new Date() },
});
// Register trigger listener
await this.journeyEngine.registerTrigger(updated);
return updated;
}
}
7.2 Journey Engine
// libs/crm/automation/src/services/journey-engine.service.ts
@Injectable()
export class JourneyEngine {
constructor(
private readonly prisma: PrismaService,
@InjectQueue('journey-steps') private readonly stepQueue: Queue,
) {}
async enrollCustomer(
journeyId: string,
customerId: string,
triggerData?: any,
): Promise<JourneyEnrollment> {
const journey = await this.prisma.journey.findUnique({
where: { id: journeyId },
include: { steps: { orderBy: { stepNumber: 'asc' } } },
});
if (!journey || journey.status !== 'ACTIVE') {
throw new BadRequestException('Journey not active');
}
// Check for existing enrollment
if (!journey.allowReentry) {
const existing = await this.prisma.journeyEnrollment.findUnique({
where: { journeyId_customerId: { journeyId, customerId } },
});
if (existing) return existing;
}
const firstStep = journey.steps[0];
const enrollment = await this.prisma.journeyEnrollment.create({
data: {
tenantId: journey.tenantId,
journeyId,
customerId,
status: 'ACTIVE',
currentStepId: firstStep?.id,
currentStepNumber: 1,
triggerData,
},
});
// Update journey stats
await this.prisma.journey.update({
where: { id: journeyId },
data: { totalEnrolled: { increment: 1 } },
});
// Queue first step
if (firstStep) {
await this.queueStep(enrollment.id, firstStep.id);
}
return enrollment;
}
async processStep(enrollmentId: string, stepId: string): Promise<void> {
const enrollment = await this.prisma.journeyEnrollment.findUnique({
where: { id: enrollmentId },
include: { journey: true, customer: true },
});
const step = await this.prisma.journeyStep.findUnique({
where: { id: stepId },
});
if (!enrollment || !step || enrollment.status !== 'ACTIVE') return;
// Record step entry
await this.prisma.journeyEnrollmentStep.create({
data: {
enrollmentId,
stepId,
status: 'IN_PROGRESS',
},
});
try {
const result = await this.executeStep(enrollment, step);
// Record completion
await this.prisma.journeyEnrollmentStep.update({
where: { enrollmentId_stepId: { enrollmentId, stepId } },
data: { status: 'COMPLETED', completedAt: new Date(), result },
});
// Determine next step
const nextStepId = this.determineNextStep(step, result);
if (nextStepId) {
await this.queueStep(enrollmentId, nextStepId);
} else {
// Journey complete
await this.completeEnrollment(enrollmentId);
}
} catch (error) {
await this.prisma.journeyEnrollmentStep.update({
where: { enrollmentId_stepId: { enrollmentId, stepId } },
data: { status: 'FAILED', result: { error: error.message } },
});
}
}
private async executeStep(
enrollment: JourneyEnrollment & { customer: CustomerProfile },
step: JourneyStep,
): Promise<any> {
const config = step.config as any;
switch (step.type) {
case 'SEND':
return this.executeSendStep(enrollment, config);
case 'WAIT':
return this.executeWaitStep(enrollment, config);
case 'CONDITION':
return this.executeConditionStep(enrollment, config);
case 'UPDATE':
return this.executeUpdateStep(enrollment, config);
case 'END':
return { completed: true };
default:
throw new Error(`Unknown step type: ${step.type}`);
}
}
private async executeSendStep(
enrollment: JourneyEnrollment & { customer: CustomerProfile },
config: { channel: string; templateId: string },
): Promise<any> {
// Send via Messaging service
await this.messagingClient.send({
channel: config.channel,
templateId: config.templateId,
recipient: {
email: enrollment.customer.email,
phone: enrollment.customer.phoneNumber,
},
data: {
firstName: enrollment.customer.firstName,
lastName: enrollment.customer.lastName,
},
});
return { sent: true, channel: config.channel };
}
private async executeWaitStep(
enrollment: JourneyEnrollment,
config: { duration: string },
): Promise<any> {
const waitUntil = this.calculateWaitUntil(config.duration);
await this.prisma.journeyEnrollment.update({
where: { id: enrollment.id },
data: { waitingUntil: waitUntil },
});
return { waitingUntil };
}
private async executeConditionStep(
enrollment: JourneyEnrollment & { customer: CustomerProfile },
config: { rules: any[]; combinator: string },
): Promise<{ result: boolean }> {
// Evaluate condition against customer profile
const result = this.evaluateCondition(enrollment.customer, config);
return { result };
}
private determineNextStep(step: JourneyStep, result: any): string | null {
if (step.type === 'CONDITION') {
return result.result ? step.nextStepYesId : step.nextStepNoId;
}
return step.nextStepId;
}
private async queueStep(enrollmentId: string, stepId: string): Promise<void> {
await this.stepQueue.add('process-step', { enrollmentId, stepId });
}
private async completeEnrollment(enrollmentId: string): Promise<void> {
await this.prisma.journeyEnrollment.update({
where: { id: enrollmentId },
data: { status: 'COMPLETED', completedAt: new Date() },
});
const enrollment = await this.prisma.journeyEnrollment.findUnique({
where: { id: enrollmentId },
});
if (enrollment) {
await this.prisma.journey.update({
where: { id: enrollment.journeyId },
data: { totalCompleted: { increment: 1 } },
});
}
}
}
7.3 Event Trigger Handler
// libs/crm/automation/src/services/event-trigger.service.ts
@Injectable()
export class EventTriggerService {
private triggerMap: Map<string, Journey[]> = new Map();
constructor(
private readonly prisma: PrismaService,
private readonly journeyEngine: JourneyEngine,
) {}
async loadTriggers(): Promise<void> {
const journeys = await this.prisma.journey.findMany({
where: { status: 'ACTIVE' },
});
this.triggerMap.clear();
for (const journey of journeys) {
const triggers = this.triggerMap.get(journey.triggerType) || [];
triggers.push(journey);
this.triggerMap.set(journey.triggerType, triggers);
}
}
async handleEvent(event: CrmEvent): Promise<void> {
const journeys = this.triggerMap.get(event.type) || [];
for (const journey of journeys) {
// Check if event matches trigger conditions
if (this.matchesTriggerConditions(event, journey.triggerConfig)) {
// Resolve customer and enroll
const { profile } = await this.identityService.resolve({
tenantId: event.tenantId,
...event.identity,
});
await this.journeyEngine.enrollCustomer(
journey.id,
profile.id,
event.payload,
);
}
}
}
}
7.4 Wait Step Processor (Cron)
// libs/crm/automation/src/jobs/wait-step.job.ts
@Injectable()
export class WaitStepJob {
constructor(
private readonly prisma: PrismaService,
private readonly journeyEngine: JourneyEngine,
) {}
@Cron('* * * * *') // Every minute
async processWaitingEnrollments(): Promise<void> {
const waitingEnrollments = await this.prisma.journeyEnrollment.findMany({
where: {
status: 'ACTIVE',
waitingUntil: { lte: new Date() },
},
include: {
journey: { include: { steps: true } },
},
});
for (const enrollment of waitingEnrollments) {
// Clear wait and move to next step
const currentStep = enrollment.journey.steps.find(
s => s.id === enrollment.currentStepId,
);
if (currentStep?.nextStepId) {
await this.prisma.journeyEnrollment.update({
where: { id: enrollment.id },
data: { waitingUntil: null },
});
await this.journeyEngine.queueStep(enrollment.id, currentStep.nextStepId);
}
}
}
}
7.5 Deliverables - Phase 5
- Journey CRUD
- Journey step management
- Journey engine (enrollment, step execution)
- Event triggers for journey enrollment
- Wait step processing (cron)
- Condition evaluation
- Journey analytics
8. Phase 6: Social Integration
8.1 Social Connection Service
// libs/crm/social/src/services/social-connection.service.ts
@Injectable()
export class SocialConnectionService {
constructor(
private readonly prisma: PrismaService,
private readonly encryptionService: EncryptionService,
) {}
async initiateOAuth(
tenantId: string,
clubId: string,
platform: SocialPlatform,
redirectUri: string,
): Promise<{ authUrl: string; state: string }> {
const state = this.generateState(tenantId, clubId);
const authUrl = this.buildOAuthUrl(platform, redirectUri, state);
return { authUrl, state };
}
async completeOAuth(
platform: SocialPlatform,
code: string,
state: string,
userId: string,
): Promise<SocialConnection> {
const { tenantId, clubId } = this.parseState(state);
// Exchange code for tokens
const tokens = await this.exchangeCodeForTokens(platform, code);
// Get account info
const accountInfo = await this.getAccountInfo(platform, tokens.accessToken);
// Encrypt tokens
const encryptedAccess = this.encryptionService.encrypt(tokens.accessToken);
const encryptedRefresh = tokens.refreshToken
? this.encryptionService.encrypt(tokens.refreshToken)
: null;
return this.prisma.socialConnection.upsert({
where: {
tenantId_platform_accountId: {
tenantId,
platform,
accountId: accountInfo.id,
},
},
update: {
accessToken: encryptedAccess,
refreshToken: encryptedRefresh,
tokenExpiry: tokens.expiresAt,
accountName: accountInfo.name,
permissions: accountInfo.permissions,
isActive: true,
lastError: null,
},
create: {
tenantId,
clubId,
platform,
accountId: accountInfo.id,
accountName: accountInfo.name,
accountType: accountInfo.type,
accessToken: encryptedAccess,
refreshToken: encryptedRefresh,
tokenExpiry: tokens.expiresAt,
permissions: accountInfo.permissions,
connectedBy: userId,
},
});
}
private buildOAuthUrl(
platform: SocialPlatform,
redirectUri: string,
state: string,
): string {
switch (platform) {
case 'FACEBOOK':
return `https://www.facebook.com/v18.0/dialog/oauth?` +
`client_id=${process.env.FACEBOOK_APP_ID}` +
`&redirect_uri=${encodeURIComponent(redirectUri)}` +
`&state=${state}` +
`&scope=pages_manage_posts,pages_read_engagement`;
case 'INSTAGRAM':
return `https://api.instagram.com/oauth/authorize?` +
`client_id=${process.env.INSTAGRAM_APP_ID}` +
`&redirect_uri=${encodeURIComponent(redirectUri)}` +
`&state=${state}` +
`&scope=instagram_basic,instagram_content_publish`;
case 'TWITTER':
return `https://twitter.com/i/oauth2/authorize?` +
`client_id=${process.env.TWITTER_CLIENT_ID}` +
`&redirect_uri=${encodeURIComponent(redirectUri)}` +
`&state=${state}` +
`&scope=tweet.read%20tweet.write%20users.read`;
default:
throw new Error(`Unsupported platform: ${platform}`);
}
}
}
8.2 Social Publisher Service
// libs/crm/social/src/services/social-publisher.service.ts
@Injectable()
export class SocialPublisherService {
constructor(
private readonly prisma: PrismaService,
private readonly connectionService: SocialConnectionService,
@InjectQueue('social-posts') private readonly postQueue: Queue,
) {}
async createPost(dto: CreateSocialPostDto): Promise<SocialPost> {
const post = await this.prisma.socialPost.create({
data: {
tenantId: dto.tenantId,
connectionId: dto.connectionId,
campaignId: dto.campaignId,
content: dto.content,
mediaUrls: dto.mediaUrls ?? [],
linkUrl: dto.linkUrl,
hashtags: dto.hashtags ?? [],
status: 'DRAFT',
createdBy: dto.createdBy,
},
});
return post;
}
async schedulePost(postId: string, scheduledAt: Date): Promise<SocialPost> {
const post = await this.prisma.socialPost.update({
where: { id: postId },
data: { status: 'SCHEDULED', scheduledAt },
});
const delay = scheduledAt.getTime() - Date.now();
await this.postQueue.add(
'publish',
{ postId },
{ delay: Math.max(0, delay), jobId: `post-${postId}` },
);
return post;
}
async publishPost(postId: string): Promise<SocialPost> {
const post = await this.prisma.socialPost.findUnique({
where: { id: postId },
include: { connection: true },
});
if (!post) throw new NotFoundException('Post not found');
await this.prisma.socialPost.update({
where: { id: postId },
data: { status: 'PUBLISHING' },
});
try {
const connection = post.connection;
const accessToken = this.encryptionService.decrypt(connection.accessToken);
const result = await this.publishToPlatform(
connection.platform,
accessToken,
connection.accountId,
{
content: post.content,
mediaUrls: post.mediaUrls,
linkUrl: post.linkUrl,
},
);
return this.prisma.socialPost.update({
where: { id: postId },
data: {
status: 'PUBLISHED',
publishedAt: new Date(),
platformPostId: result.postId,
platformUrl: result.url,
},
});
} catch (error) {
return this.prisma.socialPost.update({
where: { id: postId },
data: {
status: 'FAILED',
error: error.message,
},
});
}
}
private async publishToPlatform(
platform: SocialPlatform,
accessToken: string,
accountId: string,
content: { content: string; mediaUrls: string[]; linkUrl?: string },
): Promise<{ postId: string; url: string }> {
switch (platform) {
case 'FACEBOOK':
return this.publishToFacebook(accessToken, accountId, content);
case 'INSTAGRAM':
return this.publishToInstagram(accessToken, accountId, content);
case 'TWITTER':
return this.publishToTwitter(accessToken, content);
default:
throw new Error(`Unsupported platform: ${platform}`);
}
}
private async publishToFacebook(
accessToken: string,
pageId: string,
content: any,
): Promise<{ postId: string; url: string }> {
const response = await fetch(
`https://graph.facebook.com/v18.0/${pageId}/feed`,
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
message: content.content,
link: content.linkUrl,
access_token: accessToken,
}),
},
);
const data = await response.json();
return {
postId: data.id,
url: `https://facebook.com/${data.id}`,
};
}
}
8.3 Event Promotion Service
// libs/crm/social/src/services/event-promotion.service.ts
@Injectable()
export class EventPromotionService {
constructor(
private readonly prisma: PrismaService,
private readonly campaignService: CampaignService,
private readonly socialPublisher: SocialPublisherService,
) {}
async createPromotion(dto: CreateEventPromotionDto): Promise<EventPromotion> {
return this.prisma.eventPromotion.create({
data: {
tenantId: dto.tenantId,
clubId: dto.clubId,
sourceType: dto.sourceType,
sourceId: dto.sourceId,
autoPromote: dto.autoPromote ?? true,
channels: dto.channels,
announceAt: dto.announceAt,
reminderDays: dto.reminderDays ?? [7, 3, 1],
segmentId: dto.segmentId,
emailTemplateId: dto.emailTemplateId,
smsTemplateId: dto.smsTemplateId,
socialContent: dto.socialContent,
createdBy: dto.createdBy,
},
});
}
async handleCompetitionPublished(event: {
tenantId: string;
clubId: string;
competitionId: string;
competitionName: string;
startDate: Date;
}): Promise<void> {
// Check for existing promotion config
let promotion = await this.prisma.eventPromotion.findUnique({
where: {
tenantId_sourceType_sourceId: {
tenantId: event.tenantId,
sourceType: 'COMPETITION',
sourceId: event.competitionId,
},
},
});
// Create default if auto-promote is enabled for club
if (!promotion) {
const clubConfig = await this.getClubAutoPromoteConfig(event.clubId);
if (clubConfig?.autoPromoteCompetitions) {
promotion = await this.createPromotion({
tenantId: event.tenantId,
clubId: event.clubId,
sourceType: 'COMPETITION',
sourceId: event.competitionId,
autoPromote: true,
channels: clubConfig.defaultChannels,
announceAt: new Date(), // Announce immediately
reminderDays: [7, 3, 1],
createdBy: 'system',
});
}
}
if (promotion?.autoPromote) {
await this.executePromotion(promotion, event);
}
}
private async executePromotion(
promotion: EventPromotion,
eventData: any,
): Promise<void> {
// Create campaign
const campaign = await this.campaignService.create({
tenantId: promotion.tenantId,
clubId: promotion.clubId,
name: `${eventData.competitionName} Promotion`,
type: 'ONE_TIME',
segmentId: promotion.segmentId,
channels: promotion.channels.filter(c => c !== 'SOCIAL'),
emailTemplateId: promotion.emailTemplateId,
smsTemplateId: promotion.smsTemplateId,
createdBy: 'system',
});
// Schedule campaign
await this.campaignService.schedule(campaign.id, promotion.announceAt ?? new Date());
// Create social posts if SOCIAL channel enabled
if (promotion.channels.includes('SOCIAL')) {
const connections = await this.prisma.socialConnection.findMany({
where: {
tenantId: promotion.tenantId,
clubId: promotion.clubId,
isActive: true,
},
});
for (const connection of connections) {
await this.socialPublisher.createPost({
tenantId: promotion.tenantId,
connectionId: connection.id,
campaignId: campaign.id,
content: promotion.socialContent || this.generateDefaultContent(eventData),
createdBy: 'system',
});
}
}
// Schedule reminders
for (const days of promotion.reminderDays) {
const reminderDate = new Date(eventData.startDate);
reminderDate.setDate(reminderDate.getDate() - days);
if (reminderDate > new Date()) {
// Schedule reminder campaign
// ... similar to above
}
}
await this.prisma.eventPromotion.update({
where: { id: promotion.id },
data: { lastPromotedAt: new Date() },
});
}
private generateDefaultContent(eventData: any): string {
return `Join us for ${eventData.competitionName}! ` +
`Starting ${eventData.startDate.toLocaleDateString()}. ` +
`Register now!`;
}
}
8.4 Deliverables - Phase 6
- Social connection OAuth flow
- Token encryption and storage
- Social post CRUD and scheduling
- Facebook publishing
- Instagram publishing
- Twitter publishing
- Event promotion auto-creation
- Reminder scheduling
9. Phase 7: Backfill
See Backfill Strategy for detailed migration plan.
Key Steps
- Export MCA v1 ZA data
- Export MCA v1 UK data
- Transform to CRM schema
- Run identity resolution
- Load into CRM database
- Verify and reconcile
10. Phase 8: Intelligence
10.1 Engagement Scoring
// libs/crm/core/src/services/engagement-scoring.service.ts
@Injectable()
export class EngagementScoringService {
private readonly weights = {
recency: 0.30,
frequency: 0.25,
monetary: 0.20,
breadth: 0.15,
depth: 0.10,
};
async calculateScore(profile: CustomerProfile): Promise<number> {
const [recency, frequency, monetary, breadth, depth] = await Promise.all([
this.calculateRecencyScore(profile),
this.calculateFrequencyScore(profile),
this.calculateMonetaryScore(profile),
this.calculateBreadthScore(profile),
this.calculateDepthScore(profile),
]);
return Math.round(
recency * this.weights.recency +
frequency * this.weights.frequency +
monetary * this.weights.monetary +
breadth * this.weights.breadth +
depth * this.weights.depth
);
}
private calculateRecencyScore(profile: CustomerProfile): number {
if (!profile.lastActivityAt) return 0;
const daysSince = this.daysBetween(profile.lastActivityAt, new Date());
if (daysSince <= 7) return 100;
if (daysSince <= 14) return 80;
if (daysSince <= 30) return 60;
if (daysSince <= 60) return 40;
if (daysSince <= 90) return 20;
return 0;
}
private calculateFrequencyScore(profile: CustomerProfile): number {
const count = profile.activityCount30d ?? 0;
if (count >= 20) return 100;
if (count >= 10) return 80;
if (count >= 5) return 60;
if (count >= 2) return 40;
if (count >= 1) return 20;
return 0;
}
private calculateMonetaryScore(profile: CustomerProfile): number {
const ltv = Number(profile.lifetimeValue ?? 0);
if (ltv >= 100000) return 100;
if (ltv >= 50000) return 80;
if (ltv >= 20000) return 60;
if (ltv >= 10000) return 40;
if (ltv >= 5000) return 20;
return 0;
}
private calculateBreadthScore(profile: CustomerProfile): number {
let channels = 0;
if (profile.emailOptIn) channels++;
if (profile.smsOptIn) channels++;
if (profile.pushOptIn) channels++;
if (profile.whatsappOptIn) channels++;
return channels * 25;
}
private async calculateDepthScore(profile: CustomerProfile): Promise<number> {
const interactions = await this.prisma.campaignInteraction.findMany({
where: {
customerId: profile.id,
occurredAt: { gte: this.daysAgo(30) },
},
});
const sent = interactions.filter(i => i.type === 'SENT').length;
const opened = interactions.filter(i => i.type === 'OPENED').length;
const clicked = interactions.filter(i => i.type === 'CLICKED').length;
if (sent === 0) return 50;
const openRate = opened / sent;
const clickRate = clicked / Math.max(opened, 1);
return Math.round((openRate * 50) + (clickRate * 50));
}
}
10.2 Batch Scoring Job
@Injectable()
export class BatchScoringJob {
@Cron('0 2 * * *') // Run at 2 AM daily
async runBatchScoring(): Promise<void> {
let cursor: string | undefined;
while (true) {
const profiles = await this.prisma.customerProfile.findMany({
where: { status: { not: 'MERGED' }, deletedAt: null },
take: 100,
skip: cursor ? 1 : 0,
cursor: cursor ? { id: cursor } : undefined,
orderBy: { id: 'asc' },
});
if (profiles.length === 0) break;
await Promise.all(
profiles.map(async profile => {
const [engagement, churnRisk] = await Promise.all([
this.engagementService.calculateScore(profile),
this.churnService.calculateChurnRisk(profile),
]);
await this.prisma.customerProfile.update({
where: { id: profile.id },
data: {
engagementScore: engagement,
engagementScoreAt: new Date(),
churnRiskScore: churnRisk,
churnRiskScoreAt: new Date(),
},
});
})
);
cursor = profiles[profiles.length - 1].id;
}
}
}
10.3 Deliverables - Phase 8
- Engagement scoring service
- Churn risk scoring service
- Batch scoring cron job
- Analytics API endpoints
11. Testing Strategy
Unit Tests
- Identity resolution logic
- Scoring calculations
- Segment rule parsing
- Journey step execution
Integration Tests
- Event processing end-to-end
- Campaign execution flow
- Journey enrollment and progression
- Social publishing
Load Tests
- Identity resolution at scale
- Batch scoring performance
- Campaign sending throughput
12. Deployment
Kubernetes Resources
apiVersion: apps/v1
kind: Deployment
metadata:
name: crm-backend
namespace: crm
spec:
replicas: 2
template:
spec:
containers:
- name: crm-backend
image: harbor.digiwedge.dev/crm/backend:latest
env:
- name: CRM_DATABASE_URL
valueFrom:
secretKeyRef:
name: crm-secrets
key: DATABASE_URL
- name: REDIS_URL
valueFrom:
secretKeyRef:
name: crm-secrets
key: REDIS_URL
- name: FACEBOOK_APP_ID
valueFrom:
secretKeyRef:
name: crm-social-secrets
key: FACEBOOK_APP_ID
- name: FACEBOOK_APP_SECRET
valueFrom:
secretKeyRef:
name: crm-social-secrets
key: FACEBOOK_APP_SECRET
Infisical Secrets
/crm/backend:
- CRM_DATABASE_URL
- REDIS_URL
- JWT_SECRET
- ENCRYPTION_KEY
/crm/social:
- FACEBOOK_APP_ID
- FACEBOOK_APP_SECRET
- INSTAGRAM_APP_ID
- INSTAGRAM_APP_SECRET
- TWITTER_CLIENT_ID
- TWITTER_CLIENT_SECRET