Skip to main content

CRM & Marketing Platform Implementation Guide

Version: 2.0 | Date: December 2024


Table of Contents

  1. Overview
  2. Prerequisites
  3. Phase 1: Foundation
  4. Phase 2: Event Integration
  5. Phase 3: Timeline & Notes
  6. Phase 4: Marketing Core
  7. Phase 5: Marketing Automation
  8. Phase 6: Social Integration
  9. Phase 7: Backfill
  10. Phase 8: Intelligence
  11. Testing Strategy
  12. Deployment

1. Overview

This guide provides step-by-step instructions for implementing the CRM & Marketing Platform. Follow the phases in order, as each builds on the previous.

Phase Summary

| Phase | Description | Key Deliverables |
|-------|-------------|------------------|
| 1 | Foundation | Database, Identity Resolution, Basic API |
| 2 | Event Integration | BullMQ consumers, SCL/TeeTime/Messaging sync |
| 3 | Timeline & Notes | Activity timeline, Customer notes |
| 4 | Marketing Core | Campaigns, Segments (CRM-owned), Scheduling |
| 5 | Marketing Automation | Journey builder, Event triggers, A/B testing |
| 6 | Social Integration | OAuth connections, Publisher, Event promotion |
| 7 | Backfill | MCA v1 ZA/UK migration |
| 8 | Intelligence | Engagement scoring, Churn prediction |

2. Prerequisites

Infrastructure

  • PostgreSQL database provisioned (CRM database)
  • Redis available for event transport and caching
  • Kubernetes namespace/deployment slots
  • Infisical secrets configured

Codebase Setup

# Scaffold every CRM library and the backend app directory in a single call
mkdir -p \
  libs/prisma/crm-client \
  libs/crm/core \
  libs/crm/events \
  libs/crm/campaigns \
  libs/crm/automation \
  libs/crm/social \
  apps/crm/crm-backend

Dependencies

{
"dependencies": {
"@prisma/client": "^6.x",
"@nestjs/common": "^10.x",
"@nestjs/bullmq": "^10.x",
"@nestjs/schedule": "^4.x",
"bullmq": "^5.x",
"cron-parser": "^4.x"
}
}

3. Phase 1: Foundation

3.1 Database Setup

Create Prisma Client Library

# Run Prisma's init command from inside the CRM client library
cd libs/prisma/crm-client
pnpm prisma init

Schema Setup

Copy the schema from data-model.md to:

libs/prisma/crm-client/prisma/schema.prisma

Initial Migration

# Set database URL in Infisical: CRM_DATABASE_URL=postgresql://...
cd libs/prisma/crm-client
# Create the first migration, named "init"
pnpm prisma migrate dev --name init
# Generate the Prisma client
pnpm prisma generate

3.2 Core Service Module

Project Structure

libs/crm/core/
├── src/
│ ├── index.ts
│ ├── crm-core.module.ts
│ ├── services/
│ │ ├── customer-profile.service.ts
│ │ ├── identity-resolution.service.ts
│ │ └── profile-merge.service.ts
│ ├── repositories/
│ │ ├── customer-profile.repository.ts
│ │ └── identity-alias.repository.ts
│ └── dto/
│ ├── create-customer.dto.ts
│ ├── update-customer.dto.ts
│ └── resolve-identity.dto.ts
├── project.json
└── tsconfig.lib.json

Identity Resolution Service

// libs/crm/core/src/services/identity-resolution.service.ts

import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '@digiwedge/crm-client';

/**
 * Identifiers that may be known about a person when resolving them to a
 * customer profile. Only `tenantId` is required; every other hint is optional.
 */
export interface IdentityHints {
/** Tenant used to scope the email, phone and SCL member lookups. */
tenantId: string;
/** Auth user id; tried first by the resolver. */
authUserId?: string;
/** Email address; normalised (lower-cased, trimmed) before lookup. */
email?: string;
/** Phone number; normalised before lookup. */
phoneNumber?: string;
/** SCL member id; used as a cross-service link lookup. */
sclMemberId?: number;
/** TeeTime player id; stored on newly created profiles. */
teetimePlayerId?: string;
/** Messaging contact id; stored on newly created profiles. */
messagingContactId?: string;
/** Additional external identifiers; stored on newly created profiles. */
externalIds?: Record<string, string>;
}

/** Outcome of an identity resolution attempt. */
export interface ResolutionResult {
/** The matched or newly created profile. */
profile: CustomerProfile;
/** True when no existing profile matched and one was created. */
isNew: boolean;
/** Names of the hints that produced the match; empty for new profiles. */
matchedBy: string[];
/** Match confidence on a 0-100 scale (the resolver returns 85, 95 or 100). */
confidence: number;
}

/**
 * Maps a set of identity hints onto exactly one customer profile.
 *
 * Hints are tried from strongest to weakest - authUserId, then tenant + email,
 * then tenant + phone number, then tenant + SCL member id - and the first hit
 * wins. When nothing matches, a new ACTIVE / SUBSCRIBER profile is created.
 */
@Injectable()
export class IdentityResolutionService {
  private readonly logger = new Logger(IdentityResolutionService.name);

  constructor(private readonly prisma: PrismaService) {}

  /**
   * Finds the profile matching the given hints, or creates one.
   *
   * @param hints Identifiers known about the person; `tenantId` is required.
   * @returns The profile, whether it was created by this call, which hints
   *          matched, and a 0-100 confidence value.
   */
  async resolve(hints: IdentityHints): Promise<ResolutionResult> {
    const { tenantId, authUserId, sclMemberId } = hints;

    // Normalise once so the lookups and the create below always use the same
    // value. Hints that are blank after normalisation are treated as absent
    // rather than being looked up or stored as empty strings.
    const email = hints.email
      ? this.normalizeEmail(hints.email) || undefined
      : undefined;
    const phoneNumber = hints.phoneNumber
      ? this.normalizePhone(hints.phoneNumber) || undefined
      : undefined;

    // 1. Try authUserId (100% confidence)
    // NOTE(review): this lookup is not tenant-scoped; it assumes authUserId is
    // globally unique - confirm against the schema.
    if (authUserId) {
      const profile = await this.prisma.customerProfile.findUnique({
        where: { authUserId },
      });
      if (profile) {
        return { profile, isNew: false, matchedBy: ['authUserId'], confidence: 100 };
      }
    }

    // 2. Try email (95% confidence)
    if (email) {
      const profile = await this.prisma.customerProfile.findUnique({
        where: { tenantId_email: { tenantId, email } },
      });
      if (profile) {
        // Link authUserId if we now have it, and hand back the updated row so
        // callers do not see a profile that still lacks the link.
        if (authUserId && !profile.authUserId) {
          const linked = await this.prisma.customerProfile.update({
            where: { id: profile.id },
            data: { authUserId },
          });
          return { profile: linked, isNew: false, matchedBy: ['email'], confidence: 95 };
        }
        return { profile, isNew: false, matchedBy: ['email'], confidence: 95 };
      }
    }

    // 3. Try phone (85% confidence)
    if (phoneNumber) {
      const profile = await this.prisma.customerProfile.findUnique({
        where: { tenantId_phoneNumber: { tenantId, phoneNumber } },
      });
      if (profile) {
        return { profile, isNew: false, matchedBy: ['phone'], confidence: 85 };
      }
    }

    // 4. Try cross-service links. Compare against null/undefined rather than
    // truthiness so a member id of 0 is still looked up.
    if (sclMemberId != null) {
      const profile = await this.prisma.customerProfile.findUnique({
        where: { tenantId_sclMemberId: { tenantId, sclMemberId } },
      });
      if (profile) {
        return { profile, isNew: false, matchedBy: ['sclMemberId'], confidence: 100 };
      }
    }

    // 5. No match - create new profile
    const newProfile = await this.prisma.customerProfile.create({
      data: {
        tenantId,
        authUserId,
        email,
        phoneNumber,
        sclMemberId: hints.sclMemberId,
        teetimePlayerId: hints.teetimePlayerId,
        messagingContactId: hints.messagingContactId,
        externalIds: hints.externalIds || {},
        status: 'ACTIVE',
        lifecycleStage: 'SUBSCRIBER',
      },
    });

    return { profile: newProfile, isNew: true, matchedBy: [], confidence: 100 };
  }

  /** Lower-cases and trims an email address. */
  private normalizeEmail(email: string): string {
    return email.toLowerCase().trim();
  }

  /**
   * Converts a phone number to `+<digits>` form where the shape is recognised:
   * - 11 digits starting with 27  -> `+27...`
   * - 10 digits starting with 0   -> `+27...` (leading 0 replaced)
   * - anything written with a leading `+` -> `+` followed by its digits, so
   *   formatted international numbers (spaces, dashes, brackets) compare equal
   * Any other input is returned trimmed but otherwise unchanged.
   */
  private normalizePhone(phone: string): string {
    const trimmed = phone.trim();
    const digits = trimmed.replace(/\D/g, '');

    if (digits.startsWith('27') && digits.length === 11) {
      return `+${digits}`;
    }
    if (digits.startsWith('0') && digits.length === 10) {
      return `+27${digits.slice(1)}`;
    }
    if (trimmed.startsWith('+') && digits.length > 0) {
      return `+${digits}`;
    }
    return trimmed;
  }
}

3.3 REST API

// apps/crm/crm-backend/src/app/customer/customer.controller.ts

/**
 * REST endpoints for customer profiles under `api/v1/customers`.
 * Every handler delegates straight to the profile or identity service.
 */
@ApiTags('customers')
@Controller('api/v1/customers')
export class CustomerController {
  constructor(
    private readonly customerService: CustomerProfileService,
    private readonly identityService: IdentityResolutionService,
  ) {}

  /** GET / - lists customers using the query-string filters. */
  @Get()
  @ApiOperation({ summary: 'List customers with filters' })
  async list(@Query() query: ListCustomersDto) {
    const page = this.customerService.list(query);
    return page;
  }

  /**
   * GET /resolve - resolves identity hints to a profile.
   * Declared before the `:id` route so "resolve" is not captured as an id.
   */
  @Get('resolve')
  @ApiOperation({ summary: 'Resolve customer by identity hints' })
  async resolve(@Query() hints: ResolveIdentityDto) {
    const resolution = this.identityService.resolve(hints);
    return resolution;
  }

  /** GET /:id - fetches a single customer. */
  @Get(':id')
  @ApiOperation({ summary: 'Get customer by ID' })
  async get(@Param('id') id: string) {
    const customer = this.customerService.getById(id);
    return customer;
  }

  /** PATCH /:id - applies the update DTO to a customer. */
  @Patch(':id')
  @ApiOperation({ summary: 'Update customer' })
  async update(@Param('id') id: string, @Body() dto: UpdateCustomerDto) {
    const updated = this.customerService.update(id, dto);
    return updated;
  }

  /** POST /:id/merge - merges the profile named in the body into `:id`. */
  @Post(':id/merge')
  @ApiOperation({ summary: 'Merge two customer profiles' })
  async merge(@Param('id') winnerId: string, @Body() dto: MergeCustomerDto) {
    const merged = this.customerService.merge(winnerId, dto.loserId);
    return merged;
  }
}

3.4 Deliverables - Phase 1

  • Prisma schema created and migrated
  • CustomerProfile CRUD operations
  • IdentityResolution service
  • Basic REST API for customers
  • Unit tests for identity resolution

4. Phase 2: Event Integration

4.1 Event Consumer Setup

// libs/crm/events/src/processors/crm-events.processor.ts

import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Job } from 'bullmq';

/** Shape of a job payload consumed from the `crm-events` queue. */
interface CrmEvent {
/** Event id; recorded on the sync log as `eventId`. */
id: string;
/** Event type, e.g. 'scl.member.updated'; drives type-specific handling. */
type: string;
/** Tenant the event belongs to. */
tenantId: string;
// presumably an ISO-8601 string - TODO confirm with the publishers
timestamp: string;
/** Identity hints forwarded to the identity resolver together with tenantId. */
identity: {
authUserId?: string;
email?: string;
phoneNumber?: string;
sclMemberId?: number;
teetimePlayerId?: string;
messagingContactId?: string;
};
/** Event-specific data; its schema depends on `type`. */
payload: Record<string, unknown>;
}

/**
 * BullMQ worker for the `crm-events` queue.
 *
 * For each event it records a sync-log entry, resolves the customer, updates
 * cross-service links, writes an activity, runs type-specific syncing and
 * finally marks the sync log as succeeded or failed.
 *
 * NOTE: `extractService`, `updateServiceLinks`, `createActivity`,
 * `syncMemberData` and `syncHandicapData` are referenced here but are not
 * shown in this snippet; they must be provided on this class.
 */
@Processor('crm-events')
export class CrmEventsProcessor extends WorkerHost {
  constructor(
    private readonly identityService: IdentityResolutionService,
    private readonly activityService: ActivityService,
    private readonly syncLogService: SyncLogService,
    // Required by syncConsentData, which writes consent flags to the profile.
    private readonly prisma: PrismaService,
  ) {
    super();
  }

  /**
   * Handles one queued event.
   *
   * @throws Re-throws any processing error after marking the sync log failed,
   *         so the queue sees the job as failed.
   */
  async process(job: Job<CrmEvent>): Promise<void> {
    const event = job.data;

    // Log sync attempt
    const syncLog = await this.syncLogService.create({
      tenantId: event.tenantId,
      sourceService: this.extractService(event.type),
      eventType: event.type,
      eventId: event.id,
      eventPayload: event.payload,
    });

    try {
      // 1. Resolve identity
      const { profile } = await this.identityService.resolve({
        tenantId: event.tenantId,
        ...event.identity,
      });

      // 2. Update cross-service links
      await this.updateServiceLinks(profile.id, event);

      // 3. Create activity record
      await this.createActivity(profile.id, event);

      // 4. Handle type-specific logic
      await this.handleEventType(profile.id, event);

      // 5. Update sync log
      await this.syncLogService.markSuccess(syncLog.id, profile.id);
    } catch (error: unknown) {
      // Anything can be thrown, so narrow before reading `.message`.
      const message = error instanceof Error ? error.message : String(error);
      await this.syncLogService.markFailed(syncLog.id, message);
      throw error;
    }
  }

  /** Dispatches to the sync routine for the event type; other types are ignored. */
  private async handleEventType(profileId: string, event: CrmEvent): Promise<void> {
    switch (event.type) {
      case 'scl.member.updated':
        await this.syncMemberData(profileId, event.payload);
        break;
      case 'teetime.handicap.updated':
        await this.syncHandicapData(profileId, event.payload);
        break;
      case 'contactPreference.updated':
        await this.syncConsentData(profileId, event.payload);
        break;
      default:
        // No type-specific handling for this event.
        break;
    }
  }

  /**
   * Copies boolean `*OptIn` flags from the payload onto the profile and stamps
   * `<flag>At` with the current time for flags that are being switched on.
   * Payload entries that are not boolean `*OptIn` fields are skipped.
   */
  private async syncConsentData(profileId: string, payload: Record<string, unknown>): Promise<void> {
    // Consent is owned by Messaging - we only sync the flags
    const updates: Record<string, boolean | Date> = {};
    for (const [field, value] of Object.entries(payload)) {
      if (!field.endsWith('OptIn') || typeof value !== 'boolean') continue;
      updates[field] = value;
      if (value) {
        updates[`${field}At`] = new Date();
      }
    }

    // Nothing consent-related in the payload: avoid a pointless write.
    if (Object.keys(updates).length === 0) return;

    await this.prisma.customerProfile.update({
      where: { id: profileId },
      data: updates,
    });
  }
}

4.2 Deliverables - Phase 2

  • BullMQ queue configuration
  • Event processor with identity resolution
  • SCL → CRM event publishing
  • TeeTime → CRM event publishing
  • Messaging → CRM event publishing (consent sync)
  • SyncLog recording for audit

5. Phase 3: Timeline & Notes

5.1 Activity Service

// libs/crm/core/src/services/activity.service.ts

/** Writes activity records and serves the per-customer activity timeline. */
@Injectable()
export class ActivityService {
  constructor(private readonly prisma: PrismaService) {}

  /**
   * Records an activity and updates the customer's denormalised activity
   * metrics (last activity time/type and the 30-day counter).
   *
   * When `dto.sourceEventId` is set and an activity with the same
   * tenant / source service / source event id already exists, that existing
   * record is returned and nothing is written.
   */
  async create(dto: CreateActivityDto): Promise<Activity> {
    // Check for duplicate (idempotency)
    if (dto.sourceEventId) {
      const existing = await this.prisma.activity.findUnique({
        where: {
          tenantId_sourceService_sourceEventId: {
            tenantId: dto.tenantId,
            sourceService: dto.sourceService,
            sourceEventId: dto.sourceEventId,
          },
        },
      });
      if (existing) return existing;
    }

    // Insert the activity and bump the profile metrics atomically so the
    // counters cannot drift from the activity table if the second write fails.
    const [activity] = await this.prisma.$transaction([
      this.prisma.activity.create({ data: dto }),
      this.prisma.customerProfile.update({
        where: { id: dto.customerId },
        data: {
          lastActivityAt: dto.occurredAt,
          lastActivityType: dto.type,
          activityCount30d: { increment: 1 },
        },
      }),
    ]);

    return activity;
  }

  /**
   * Returns one page of a customer's activities, newest first, together with
   * the total number of activities matching the filters.
   *
   * @param customerId Customer whose timeline is requested.
   * @param filters    Optional category list and inclusive date range.
   * @param pagination `skip` defaults to 0 and `take` to 50.
   */
  async getTimeline(
    customerId: string,
    filters: TimelineFilters = {},
    pagination: { skip?: number; take?: number } = {},
  ): Promise<{ activities: Activity[]; total: number }> {
    const where: Prisma.ActivityWhereInput = { customerId };

    if (filters.categories?.length) {
      where.category = { in: filters.categories };
    }
    if (filters.startDate || filters.endDate) {
      // Build the range as one object literal: assigning `.gte` / `.lte` onto
      // `where.occurredAt` afterwards does not type-check, because that
      // property is a union of a filter object and a plain date value.
      where.occurredAt = {
        ...(filters.startDate ? { gte: filters.startDate } : {}),
        ...(filters.endDate ? { lte: filters.endDate } : {}),
      };
    }

    const [activities, total] = await Promise.all([
      this.prisma.activity.findMany({
        where,
        orderBy: { occurredAt: 'desc' },
        skip: pagination.skip ?? 0,
        take: pagination.take ?? 50,
      }),
      this.prisma.activity.count({ where }),
    ]);

    return { activities, total };
  }
}

5.2 Notes Service

// libs/crm/core/src/services/customer-notes.service.ts

/** Creates and lists free-text notes attached to a customer. */
@Injectable()
export class CustomerNotesService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly activityService: ActivityService,
  ) {}

  /**
   * Stores a note and records a NOTE_ADDED activity for it.
   *
   * @param dto    Note content and flags; type defaults to 'GENERAL', the
   *               pinned and private flags default to false.
   * @param userId Author of the note, stored as `createdBy`.
   */
  async create(dto: CreateNoteDto, userId: string): Promise<CustomerNote> {
    const note = await this.prisma.customerNote.create({
      data: {
        tenantId: dto.tenantId,
        customerId: dto.customerId,
        content: dto.content,
        type: dto.type ?? 'GENERAL',
        isPinned: dto.isPinned ?? false,
        isPrivate: dto.isPrivate ?? false,
        createdBy: userId,
      },
    });

    // Create activity
    await this.activityService.create({
      tenantId: dto.tenantId,
      customerId: dto.customerId,
      type: 'NOTE_ADDED',
      category: 'SUPPORT',
      title: `Note added: ${dto.type || 'General'}`,
      sourceService: 'crm',
      sourceId: note.id,
      occurredAt: new Date(),
    });

    return note;
  }

  /**
   * Lists a customer's notes, pinned first and then newest first.
   *
   * @param customerId     Customer whose notes are listed.
   * @param includePrivate When true, every note is returned. When false, only
   *                       public notes plus the caller's own private notes.
   * @param userId         The caller; without it no private notes are returned
   *                       unless `includePrivate` is true.
   */
  async list(customerId: string, includePrivate: boolean, userId?: string) {
    const where: Prisma.CustomerNoteWhereInput = { customerId };

    if (!includePrivate) {
      if (userId) {
        where.OR = [
          { isPrivate: false },
          { isPrivate: true, createdBy: userId },
        ];
      } else {
        // Prisma ignores `undefined` filter values, so a
        // `{ isPrivate: true, createdBy: undefined }` clause would match every
        // private note. With no known caller, expose public notes only.
        where.isPrivate = false;
      }
    }

    return this.prisma.customerNote.findMany({
      where,
      orderBy: [{ isPinned: 'desc' }, { createdAt: 'desc' }],
    });
  }
}

5.3 Deliverables - Phase 3

  • Activity service with CRUD
  • Timeline API with filters
  • Customer notes CRUD
  • Activity metrics denormalization

6. Phase 4: Marketing Core

6.1 Segmentation Service (CRM Owns Segments)

// libs/crm/campaigns/src/services/segmentation.service.ts

/**
 * Owns customer segments: creation, membership refresh from rule criteria,
 * and pushing segment membership to the Messaging service.
 */
@Injectable()
export class SegmentationService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly messagingClient: MessagingServiceClient,
  ) {}

  /**
   * Creates a segment. Combinator defaults to 'AND' and refresh frequency to
   * 'DAILY'. DYNAMIC segments get an initial membership refresh.
   *
   * Note: the returned object is the row as created, i.e. from before the
   * initial refresh ran.
   */
  async createSegment(dto: CreateSegmentDto): Promise<CustomerSegment> {
    const segment = await this.prisma.customerSegment.create({
      data: {
        tenantId: dto.tenantId,
        clubId: dto.clubId,
        name: dto.name,
        description: dto.description,
        type: dto.type,
        criteria: dto.criteria,
        combinator: dto.combinator ?? 'AND',
        refreshFrequency: dto.refreshFrequency ?? 'DAILY',
        createdBy: dto.createdBy,
      },
    });

    // Initial refresh for dynamic segments
    if (dto.type === 'DYNAMIC') {
      await this.refreshSegment(segment.id);
    }

    return segment;
  }

  /**
   * Recomputes the membership of a non-static segment from its criteria.
   * Unknown and STATIC segments are left untouched. Old memberships are
   * replaced and the member count / refresh time updated in one transaction.
   */
  async refreshSegment(segmentId: string): Promise<void> {
    const segment = await this.prisma.customerSegment.findUnique({
      where: { id: segmentId },
    });

    if (!segment || segment.type === 'STATIC') return;

    // Build query from criteria
    const where = this.buildWhereFromCriteria(
      segment.tenantId,
      segment.criteria as SegmentCriteria,
      segment.combinator,
    );

    // Find matching profiles
    const matchingProfiles = await this.prisma.customerProfile.findMany({
      where,
      select: { id: true, email: true, phoneNumber: true },
    });

    // One timestamp for the whole refresh so all rows agree.
    const refreshedAt = new Date();

    // Update membership
    await this.prisma.$transaction([
      this.prisma.customerSegmentMembership.deleteMany({
        where: { segmentId },
      }),
      this.prisma.customerSegmentMembership.createMany({
        data: matchingProfiles.map(p => ({
          tenantId: segment.tenantId,
          segmentId,
          customerId: p.id,
          matchedAt: refreshedAt,
        })),
      }),
      this.prisma.customerSegment.update({
        where: { id: segmentId },
        data: {
          memberCount: matchingProfiles.length,
          lastRefreshedAt: refreshedAt,
        },
      }),
    ]);
  }

  /**
   * Upserts the segment and its member contacts in the Messaging service and
   * stores the returned messaging segment id and sync time.
   *
   * @throws NotFoundException when the segment does not exist.
   */
  async syncToMessaging(segmentId: string): Promise<void> {
    const segment = await this.prisma.customerSegment.findUnique({
      where: { id: segmentId },
      include: {
        memberships: {
          include: { customer: true },
        },
      },
    });

    if (!segment) throw new NotFoundException('Segment not found');

    // Create or update segment in Messaging service
    const messagingSegment = await this.messagingClient.upsertSegment({
      externalId: segment.id,
      tenantId: segment.tenantId,
      name: segment.name,
      contacts: segment.memberships.map(m => ({
        email: m.customer.email,
        phoneNumber: m.customer.phoneNumber,
        firstName: m.customer.firstName,
        lastName: m.customer.lastName,
      })),
    });

    // Store messaging segment ID for reference
    await this.prisma.customerSegment.update({
      where: { id: segmentId },
      data: {
        messagingSegmentId: messagingSegment.id,
        lastSyncedAt: new Date(),
      },
    });
  }

  /**
   * Translates segment criteria into a profile filter restricted to the
   * tenant, excluding MERGED and soft-deleted profiles.
   *
   * @param fallbackCombinator Combinator stored on the segment row; used only
   *        when the criteria object does not carry its own. Defaults to AND.
   */
  private buildWhereFromCriteria(
    tenantId: string,
    criteria: SegmentCriteria,
    fallbackCombinator?: string | null,
  ): Prisma.CustomerProfileWhereInput {
    // Criteria comes from a JSON column, so tolerate a missing object/rules.
    const rules = criteria?.rules ?? [];
    const conditions = rules.map(rule => this.ruleToCondition(rule));
    const combinator = criteria?.combinator ?? fallbackCombinator ?? 'AND';

    return {
      tenantId,
      status: { not: 'MERGED' },
      deletedAt: null,
      ...(combinator === 'OR' ? { OR: conditions } : { AND: conditions }),
    };
  }

  /**
   * Converts one rule into a single-field Prisma condition.
   *
   * A switch is used instead of an object lookup with `??`: the `isNull`
   * entry is itself `null`, so a nullish-coalescing fallback would replace it
   * with the rule's value, and an object lookup would also resolve inherited
   * keys such as `constructor`. Unrecognised operators fall back to equality.
   */
  private ruleToCondition(rule: SegmentRule): Prisma.CustomerProfileWhereInput {
    const { field, operator, value } = rule;

    switch (operator) {
      case 'neq':
        return { [field]: { not: value } };
      case 'gt':
        return { [field]: { gt: value } };
      case 'gte':
        return { [field]: { gte: value } };
      case 'lt':
        return { [field]: { lt: value } };
      case 'lte':
        return { [field]: { lte: value } };
      case 'contains':
        return { [field]: { contains: value, mode: 'insensitive' } };
      case 'in':
        return { [field]: { in: value } };
      case 'isNull':
        return { [field]: null };
      case 'isNotNull':
        return { [field]: { not: null } };
      case 'eq':
      default:
        return { [field]: value };
    }
  }
}

6.2 Campaign Service

// libs/crm/campaigns/src/services/campaign.service.ts

/** Campaign lifecycle: draft creation, scheduling and execution. */
@Injectable()
export class CampaignService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly segmentService: SegmentationService,
    private readonly schedulerService: CampaignSchedulerService,
    // Used by sendViaChannel to hand bulk sends to the Messaging service.
    private readonly messagingClient: MessagingServiceClient,
  ) {}

  /** Creates a campaign in DRAFT status; timezone defaults to Africa/Johannesburg. */
  async create(dto: CreateCampaignDto): Promise<Campaign> {
    const campaign = await this.prisma.campaign.create({
      data: {
        tenantId: dto.tenantId,
        clubId: dto.clubId,
        name: dto.name,
        description: dto.description,
        type: dto.type,
        status: 'DRAFT',
        segmentId: dto.segmentId,
        channels: dto.channels,
        emailTemplateId: dto.emailTemplateId,
        smsTemplateId: dto.smsTemplateId,
        pushTemplateId: dto.pushTemplateId,
        timezone: dto.timezone ?? 'Africa/Johannesburg',
        createdBy: dto.createdBy,
      },
    });

    return campaign;
  }

  /**
   * Moves a DRAFT campaign to SCHEDULED and enqueues its execution.
   *
   * The target segment (if any) is refreshed and synced to Messaging first,
   * and the campaign's target count is taken from the refreshed segment.
   *
   * @throws NotFoundException   when the campaign does not exist.
   * @throws BadRequestException when the campaign is not in DRAFT status.
   */
  async schedule(campaignId: string, scheduledAt: Date): Promise<Campaign> {
    const campaign = await this.prisma.campaign.findUnique({
      where: { id: campaignId },
    });

    if (!campaign) throw new NotFoundException('Campaign not found');
    if (campaign.status !== 'DRAFT') {
      throw new BadRequestException('Can only schedule draft campaigns');
    }

    // Refresh segment and get target count
    let targetCount = 0;
    if (campaign.segmentId) {
      await this.segmentService.refreshSegment(campaign.segmentId);
      await this.segmentService.syncToMessaging(campaign.segmentId);

      // Read the count only after the refresh; a segment loaded together with
      // the campaign above would still carry the pre-refresh member count.
      const refreshed = await this.prisma.customerSegment.findUnique({
        where: { id: campaign.segmentId },
        select: { memberCount: true },
      });
      targetCount = refreshed?.memberCount ?? 0;
    }

    // Update campaign status
    const updated = await this.prisma.campaign.update({
      where: { id: campaignId },
      data: {
        status: 'SCHEDULED',
        scheduledAt,
        targetCount,
      },
    });

    // Schedule the job
    await this.schedulerService.scheduleCampaign(updated);

    return updated;
  }

  /**
   * Sends the campaign to its segment members over every configured channel.
   *
   * The campaign is marked ACTIVE while sending and COMPLETED afterwards. If
   * any channel send throws, the campaign is set to PAUSED and the error is
   * re-thrown.
   *
   * @throws NotFoundException when the campaign does not exist.
   */
  async execute(campaignId: string): Promise<void> {
    const campaign = await this.prisma.campaign.findUnique({
      where: { id: campaignId },
      include: {
        segment: {
          include: {
            memberships: {
              include: { customer: true },
            },
          },
        },
      },
    });

    if (!campaign) throw new NotFoundException('Campaign not found');

    await this.prisma.campaign.update({
      where: { id: campaignId },
      data: { status: 'ACTIVE', startedAt: new Date() },
    });

    try {
      // Get recipients
      const recipients = campaign.segment?.memberships.map(m => m.customer) ?? [];

      // Send via each channel
      for (const channel of campaign.channels) {
        await this.sendViaChannel(campaign, recipients, channel);
      }

      await this.prisma.campaign.update({
        where: { id: campaignId },
        data: { status: 'COMPLETED', completedAt: new Date() },
      });
    } catch (error) {
      await this.prisma.campaign.update({
        where: { id: campaignId },
        data: { status: 'PAUSED' },
      });
      throw error;
    }
  }

  /**
   * Sends to the recipients reachable on one channel. EMAIL requires
   * `emailOptIn` and an email address; SMS requires `smsOptIn` and a phone
   * number. SOCIAL is a no-op here because social posts are handled separately.
   */
  private async sendViaChannel(
    campaign: Campaign,
    recipients: CustomerProfile[],
    channel: Channel,
  ): Promise<void> {
    switch (channel) {
      case 'EMAIL':
        await this.messagingClient.sendBulkEmail({
          templateId: campaign.emailTemplateId,
          recipients: recipients
            .filter(r => r.emailOptIn && r.email)
            .map(r => ({
              email: r.email,
              data: { firstName: r.firstName, lastName: r.lastName },
            })),
          campaignId: campaign.id,
        });
        break;

      case 'SMS':
        await this.messagingClient.sendBulkSms({
          templateId: campaign.smsTemplateId,
          recipients: recipients
            .filter(r => r.smsOptIn && r.phoneNumber)
            .map(r => ({
              phone: r.phoneNumber,
              data: { firstName: r.firstName },
            })),
          campaignId: campaign.id,
        });
        break;

      case 'SOCIAL':
        // Handled separately via social posts
        break;
    }
  }
}

6.3 Campaign Scheduler

// libs/crm/campaigns/src/services/campaign-scheduler.service.ts

/** Enqueues and cancels campaign execution jobs on the `campaign-execution` queue. */
@Injectable()
export class CampaignSchedulerService {
  constructor(
    @InjectQueue('campaign-execution') private readonly queue: Queue,
    private readonly prisma: PrismaService,
  ) {}

  /**
   * Enqueues the campaign to run at `campaign.scheduledAt`. A missing or past
   * schedule time runs the job immediately (delay 0).
   *
   * Every job uses the id `campaign-<campaignId>`, so immediate jobs can be
   * found by cancelScheduledCampaign and a campaign cannot be queued twice
   * while a job with that id still exists in the queue.
   */
  async scheduleCampaign(campaign: Campaign): Promise<void> {
    const now = Date.now();
    const runAt = campaign.scheduledAt?.getTime() ?? now;

    await this.queue.add(
      'execute',
      { campaignId: campaign.id },
      { delay: Math.max(0, runAt - now), jobId: this.jobIdFor(campaign.id) },
    );
  }

  /** Removes the campaign's queued job, if there is one. */
  async cancelScheduledCampaign(campaignId: string): Promise<void> {
    const job = await this.queue.getJob(this.jobIdFor(campaignId));
    if (job) {
      await job.remove();
    }
  }

  /** Queue job id for a campaign; shared by scheduling and cancellation. */
  private jobIdFor(campaignId: string): string {
    return `campaign-${campaignId}`;
  }
}

/** Worker for the `campaign-execution` queue; runs the campaign named in the job. */
@Processor('campaign-execution')
export class CampaignExecutionProcessor extends WorkerHost {
  constructor(private readonly campaignService: CampaignService) {
    super();
  }

  /** Executes the campaign whose id is carried in the job data. */
  async process(job: Job<{ campaignId: string }>): Promise<void> {
    const { campaignId } = job.data;
    await this.campaignService.execute(campaignId);
  }
}

6.4 Deliverables - Phase 4

  • Segmentation service with dynamic rules
  • Segment sync to Messaging service
  • Campaign CRUD
  • Campaign scheduling with BullMQ
  • Campaign execution (Email, SMS)
  • Campaign stats tracking

7. Phase 5: Marketing Automation

7.1 Journey Service

// libs/crm/automation/src/services/journey.service.ts

/** Journey definition management: create, add steps, activate. */
@Injectable()
export class JourneyService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly journeyEngine: JourneyEngine,
  ) {}

  /**
   * Creates a journey. Re-entry defaults to disallowed and the timezone to
   * Africa/Johannesburg.
   */
  async create(dto: CreateJourneyDto): Promise<Journey> {
    return this.prisma.journey.create({
      data: {
        tenantId: dto.tenantId,
        clubId: dto.clubId,
        name: dto.name,
        description: dto.description,
        triggerType: dto.triggerType,
        triggerConfig: dto.triggerConfig,
        allowReentry: dto.allowReentry ?? false,
        timezone: dto.timezone ?? 'Africa/Johannesburg',
        createdBy: dto.createdBy,
      },
    });
  }

  /**
   * Appends a step to a journey.
   *
   * The new step is numbered one past the highest existing step number (1 for
   * the first step). Deriving it from the step count instead would reuse a
   * number whenever the existing numbering has gaps.
   *
   * @throws NotFoundException when the journey does not exist.
   */
  async addStep(journeyId: string, dto: CreateJourneyStepDto): Promise<JourneyStep> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
      include: { steps: true },
    });

    if (!journey) throw new NotFoundException('Journey not found');

    const highestStepNumber = journey.steps.reduce(
      (highest, step) => Math.max(highest, step.stepNumber),
      0,
    );
    const stepNumber = highestStepNumber + 1;

    return this.prisma.journeyStep.create({
      data: {
        journeyId,
        stepNumber,
        type: dto.type,
        name: dto.name,
        config: dto.config,
        nextStepId: dto.nextStepId,
        nextStepYesId: dto.nextStepYesId,
        nextStepNoId: dto.nextStepNoId,
      },
    });
  }

  /**
   * Marks a journey ACTIVE and registers its trigger with the engine.
   *
   * @throws NotFoundException   when the journey does not exist.
   * @throws BadRequestException when the journey has no steps.
   */
  async activate(journeyId: string): Promise<Journey> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
      include: { steps: true },
    });

    if (!journey) throw new NotFoundException('Journey not found');
    if (journey.steps.length === 0) {
      throw new BadRequestException('Journey must have at least one step');
    }

    const updated = await this.prisma.journey.update({
      where: { id: journeyId },
      data: { status: 'ACTIVE', activatedAt: new Date() },
    });

    // Register trigger listener
    await this.journeyEngine.registerTrigger(updated);

    return updated;
  }
}

7.2 Journey Engine

// libs/crm/automation/src/services/journey-engine.service.ts

/**
 * Runtime for journeys: enrols customers, executes steps pulled from the
 * `journey-steps` queue and advances or completes enrolments.
 *
 * NOTE: `registerTrigger`, `executeUpdateStep`, `calculateWaitUntil` and
 * `evaluateCondition` are referenced by this class or its callers but are not
 * shown in this snippet; they must be provided on this class.
 */
@Injectable()
export class JourneyEngine {
  constructor(
    private readonly prisma: PrismaService,
    @InjectQueue('journey-steps') private readonly stepQueue: Queue,
    // Used by SEND steps to deliver messages through the Messaging service.
    private readonly messagingClient: MessagingServiceClient,
  ) {}

  /**
   * Enrols a customer into an ACTIVE journey and queues its first step.
   *
   * When the journey does not allow re-entry and the customer is already
   * enrolled, the existing enrolment is returned unchanged.
   *
   * NOTE(review): with `allowReentry` the create below still targets the same
   * journeyId/customerId pair that is looked up as a unique key above; confirm
   * the schema permits a second row.
   *
   * @throws BadRequestException when the journey is missing or not ACTIVE.
   */
  async enrollCustomer(
    journeyId: string,
    customerId: string,
    triggerData?: any,
  ): Promise<JourneyEnrollment> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
      include: { steps: { orderBy: { stepNumber: 'asc' } } },
    });

    if (!journey || journey.status !== 'ACTIVE') {
      throw new BadRequestException('Journey not active');
    }

    // Check for existing enrollment
    if (!journey.allowReentry) {
      const existing = await this.prisma.journeyEnrollment.findUnique({
        where: { journeyId_customerId: { journeyId, customerId } },
      });
      if (existing) return existing;
    }

    const firstStep = journey.steps[0];

    const enrollment = await this.prisma.journeyEnrollment.create({
      data: {
        tenantId: journey.tenantId,
        journeyId,
        customerId,
        status: 'ACTIVE',
        currentStepId: firstStep?.id,
        currentStepNumber: 1,
        triggerData,
      },
    });

    // Update journey stats
    await this.prisma.journey.update({
      where: { id: journeyId },
      data: { totalEnrolled: { increment: 1 } },
    });

    // Queue first step
    if (firstStep) {
      await this.queueStep(enrollment.id, firstStep.id);
    }

    return enrollment;
  }

  /**
   * Executes one step for an enrolment and moves the enrolment forward.
   *
   * Does nothing when the enrolment or step is missing or the enrolment is no
   * longer ACTIVE. A failing step is recorded as FAILED with the error
   * message; the error is not re-thrown.
   */
  async processStep(enrollmentId: string, stepId: string): Promise<void> {
    const [enrollment, step] = await Promise.all([
      this.prisma.journeyEnrollment.findUnique({
        where: { id: enrollmentId },
        include: { journey: true, customer: true },
      }),
      this.prisma.journeyStep.findUnique({
        where: { id: stepId },
      }),
    ]);

    if (!enrollment || !step || enrollment.status !== 'ACTIVE') return;

    // Keep the enrolment pointing at the step being run. WaitStepJob resolves
    // the step to resume from via `currentStepId`, so it must not stay on the
    // first step forever.
    await this.prisma.journeyEnrollment.update({
      where: { id: enrollmentId },
      data: { currentStepId: step.id, currentStepNumber: step.stepNumber },
    });

    // Record step entry
    await this.prisma.journeyEnrollmentStep.create({
      data: {
        enrollmentId,
        stepId,
        status: 'IN_PROGRESS',
      },
    });

    try {
      const result = await this.executeStep(enrollment, step);

      // Record completion
      await this.prisma.journeyEnrollmentStep.update({
        where: { enrollmentId_stepId: { enrollmentId, stepId } },
        data: { status: 'COMPLETED', completedAt: new Date(), result },
      });

      // Determine next step
      const nextStepId = this.determineNextStep(step, result);

      if (step.type === 'WAIT' && nextStepId) {
        // The wait has only been scheduled (waitingUntil is set). WaitStepJob
        // queues the successor once the wait has elapsed; queueing it here
        // would skip the wait and run the successor twice.
        return;
      }

      if (nextStepId) {
        await this.queueStep(enrollmentId, nextStepId);
      } else {
        // Journey complete
        await this.completeEnrollment(enrollmentId);
      }
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      await this.prisma.journeyEnrollmentStep.update({
        where: { enrollmentId_stepId: { enrollmentId, stepId } },
        data: { status: 'FAILED', result: { error: message } },
      });
    }
  }

  /**
   * Runs the handler for the step's type and returns its result object.
   *
   * @throws Error for an unrecognised step type.
   */
  private async executeStep(
    enrollment: JourneyEnrollment & { customer: CustomerProfile },
    step: JourneyStep,
  ): Promise<any> {
    const config = step.config as any;

    switch (step.type) {
      case 'SEND':
        return this.executeSendStep(enrollment, config);

      case 'WAIT':
        return this.executeWaitStep(enrollment, config);

      case 'CONDITION':
        return this.executeConditionStep(enrollment, config);

      case 'UPDATE':
        return this.executeUpdateStep(enrollment, config);

      case 'END':
        return { completed: true };

      default:
        throw new Error(`Unknown step type: ${step.type}`);
    }
  }

  /** Sends the configured template to the enrolled customer on the configured channel. */
  private async executeSendStep(
    enrollment: JourneyEnrollment & { customer: CustomerProfile },
    config: { channel: string; templateId: string },
  ): Promise<any> {
    // Send via Messaging service
    await this.messagingClient.send({
      channel: config.channel,
      templateId: config.templateId,
      recipient: {
        email: enrollment.customer.email,
        phone: enrollment.customer.phoneNumber,
      },
      data: {
        firstName: enrollment.customer.firstName,
        lastName: enrollment.customer.lastName,
      },
    });

    return { sent: true, channel: config.channel };
  }

  /** Stores on the enrolment the time until which it must wait, and returns it. */
  private async executeWaitStep(
    enrollment: JourneyEnrollment,
    config: { duration: string },
  ): Promise<any> {
    const waitUntil = this.calculateWaitUntil(config.duration);

    await this.prisma.journeyEnrollment.update({
      where: { id: enrollment.id },
      data: { waitingUntil: waitUntil },
    });

    return { waitingUntil: waitUntil };
  }

  /** Evaluates the step's rules against the enrolled customer's profile. */
  private async executeConditionStep(
    enrollment: JourneyEnrollment & { customer: CustomerProfile },
    config: { rules: any[]; combinator: string },
  ): Promise<{ result: boolean }> {
    // Evaluate condition against customer profile
    const result = this.evaluateCondition(enrollment.customer, config);
    return { result };
  }

  /**
   * Picks the successor step: CONDITION steps branch on the result to the
   * yes/no step; every other type follows `nextStepId`.
   */
  private determineNextStep(step: JourneyStep, result: any): string | null {
    if (step.type === 'CONDITION') {
      return result.result ? step.nextStepYesId : step.nextStepNoId;
    }
    return step.nextStepId;
  }

  /**
   * Adds a `process-step` job for the enrolment/step pair.
   * Public because WaitStepJob calls it to resume enrolments after a wait.
   */
  async queueStep(enrollmentId: string, stepId: string): Promise<void> {
    await this.stepQueue.add('process-step', { enrollmentId, stepId });
  }

  /** Marks the enrolment COMPLETED and increments the journey's completed counter. */
  private async completeEnrollment(enrollmentId: string): Promise<void> {
    // The update already returns the row, so no second read is needed.
    const enrollment = await this.prisma.journeyEnrollment.update({
      where: { id: enrollmentId },
      data: { status: 'COMPLETED', completedAt: new Date() },
    });

    await this.prisma.journey.update({
      where: { id: enrollment.journeyId },
      data: { totalCompleted: { increment: 1 } },
    });
  }
}

7.3 Event Trigger Handler

// libs/crm/automation/src/services/event-trigger.service.ts

/**
 * Routes incoming CRM events to the ACTIVE journeys they trigger.
 *
 * Journeys are cached in memory by trigger type; call loadTriggers() to
 * (re)build the cache. NOTE: `matchesTriggerConditions` is referenced here
 * but is not shown in this snippet; it must be provided on this class.
 */
@Injectable()
export class EventTriggerService {
  private triggerMap: Map<string, Journey[]> = new Map();

  constructor(
    private readonly prisma: PrismaService,
    private readonly journeyEngine: JourneyEngine,
    // Needed by handleEvent to turn the event's identity into a profile.
    private readonly identityService: IdentityResolutionService,
  ) {}

  /** Rebuilds the trigger cache from all journeys currently ACTIVE. */
  async loadTriggers(): Promise<void> {
    const journeys = await this.prisma.journey.findMany({
      where: { status: 'ACTIVE' },
    });

    this.triggerMap.clear();

    for (const journey of journeys) {
      const triggers = this.triggerMap.get(journey.triggerType) ?? [];
      triggers.push(journey);
      this.triggerMap.set(journey.triggerType, triggers);
    }
  }

  /**
   * Enrols the event's customer into every cached journey that is triggered
   * by this event type, belongs to the event's tenant and whose trigger
   * conditions match.
   */
  async handleEvent(event: CrmEvent): Promise<void> {
    const candidates = this.triggerMap.get(event.type) ?? [];

    // The cache holds journeys of every tenant, so restrict to the event's
    // tenant; otherwise a customer could be enrolled into another tenant's
    // journey.
    const triggered = candidates.filter(
      journey =>
        journey.tenantId === event.tenantId &&
        this.matchesTriggerConditions(event, journey.triggerConfig),
    );
    if (triggered.length === 0) return;

    // Resolve the customer once; the identity is the same for every journey.
    const { profile } = await this.identityService.resolve({
      tenantId: event.tenantId,
      ...event.identity,
    });

    for (const journey of triggered) {
      await this.journeyEngine.enrollCustomer(
        journey.id,
        profile.id,
        event.payload,
      );
    }
  }
}

7.4 Wait Step Processor (Cron)

// libs/crm/automation/src/jobs/wait-step.job.ts

/** Cron job that resumes enrolments whose wait period has elapsed. */
@Injectable()
export class WaitStepJob {
  constructor(
    private readonly prisma: PrismaService,
    private readonly journeyEngine: JourneyEngine,
  ) {}

  /**
   * Runs every minute. For each ACTIVE enrolment whose `waitingUntil` is in
   * the past, looks up its current step; if that step has a `nextStepId`, the
   * wait is cleared and the next step queued. Enrolments without a resolvable
   * next step are left as they are.
   */
  @Cron('* * * * *') // Every minute
  async processWaitingEnrollments(): Promise<void> {
    const dueEnrollments = await this.prisma.journeyEnrollment.findMany({
      where: {
        status: 'ACTIVE',
        waitingUntil: { lte: new Date() },
      },
      include: {
        journey: { include: { steps: true } },
      },
    });

    for (const due of dueEnrollments) {
      const activeStep = due.journey.steps.find(
        step => step.id === due.currentStepId,
      );

      // Nothing to advance to: skip without touching the enrolment.
      if (!activeStep?.nextStepId) continue;

      // Clear wait and move to next step
      await this.prisma.journeyEnrollment.update({
        where: { id: due.id },
        data: { waitingUntil: null },
      });

      await this.journeyEngine.queueStep(due.id, activeStep.nextStepId);
    }
  }
}

7.5 Deliverables - Phase 5

  • Journey CRUD
  • Journey step management
  • Journey engine (enrollment, step execution)
  • Event triggers for journey enrollment
  • Wait step processing (cron)
  • Condition evaluation
  • Journey analytics

8. Phase 6: Social Integration

8.1 Social Connection Service

// libs/crm/social/src/services/social-connection.service.ts

/**
 * Connects social media accounts via OAuth and stores their encrypted tokens.
 *
 * NOTE: `generateState`, `parseState`, `exchangeCodeForTokens` and
 * `getAccountInfo` are referenced here but are not shown in this snippet;
 * they must be provided on this class.
 */
@Injectable()
export class SocialConnectionService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly encryptionService: EncryptionService,
  ) {}

  /**
   * Starts an OAuth flow.
   *
   * @returns The provider authorisation URL to redirect the user to, and the
   *          `state` value generated from the tenant and club.
   * @throws Error when the platform is unsupported or its client id is not
   *         configured in the environment.
   */
  async initiateOAuth(
    tenantId: string,
    clubId: string,
    platform: SocialPlatform,
    redirectUri: string,
  ): Promise<{ authUrl: string; state: string }> {
    const state = this.generateState(tenantId, clubId);

    const authUrl = this.buildOAuthUrl(platform, redirectUri, state);

    return { authUrl, state };
  }

  /**
   * Finishes an OAuth flow: exchanges the code for tokens, fetches the
   * account, encrypts the tokens and upserts the connection keyed by
   * tenant / platform / account id.
   *
   * On an existing connection the tokens, expiry, account name and
   * permissions are refreshed, it is re-activated and `lastError` cleared.
   *
   * @param userId Stored as `connectedBy` when a new connection is created.
   */
  async completeOAuth(
    platform: SocialPlatform,
    code: string,
    state: string,
    userId: string,
  ): Promise<SocialConnection> {
    const { tenantId, clubId } = this.parseState(state);

    // Exchange code for tokens
    const tokens = await this.exchangeCodeForTokens(platform, code);

    // Get account info
    const accountInfo = await this.getAccountInfo(platform, tokens.accessToken);

    // Encrypt tokens
    const encryptedAccess = this.encryptionService.encrypt(tokens.accessToken);
    const encryptedRefresh = tokens.refreshToken
      ? this.encryptionService.encrypt(tokens.refreshToken)
      : null;

    return this.prisma.socialConnection.upsert({
      where: {
        tenantId_platform_accountId: {
          tenantId,
          platform,
          accountId: accountInfo.id,
        },
      },
      update: {
        accessToken: encryptedAccess,
        refreshToken: encryptedRefresh,
        tokenExpiry: tokens.expiresAt,
        accountName: accountInfo.name,
        permissions: accountInfo.permissions,
        isActive: true,
        lastError: null,
      },
      create: {
        tenantId,
        clubId,
        platform,
        accountId: accountInfo.id,
        accountName: accountInfo.name,
        accountType: accountInfo.type,
        accessToken: encryptedAccess,
        refreshToken: encryptedRefresh,
        tokenExpiry: tokens.expiresAt,
        permissions: accountInfo.permissions,
        connectedBy: userId,
      },
    });
  }

  /**
   * Builds the provider's authorisation URL for the authorisation-code flow.
   *
   * `client_id`, `redirect_uri` and `state` are URL-encoded so characters such
   * as `&`, `=` or `+` inside them cannot corrupt the query string. Scope
   * strings are emitted exactly as listed per platform below.
   *
   * NOTE(review): Twitter's OAuth 2.0 authorisation-code flow also expects
   * PKCE parameters (`code_challenge`, `code_challenge_method`); those need a
   * verifier stored alongside the state and are not generated here - confirm.
   *
   * @throws Error for an unsupported platform or a missing client id.
   */
  private buildOAuthUrl(
    platform: SocialPlatform,
    redirectUri: string,
    state: string,
  ): string {
    let endpoint: string;
    let clientId: string | undefined;
    let scope: string;

    switch (platform) {
      case 'FACEBOOK':
        endpoint = 'https://www.facebook.com/v18.0/dialog/oauth';
        clientId = process.env.FACEBOOK_APP_ID;
        scope = 'pages_manage_posts,pages_read_engagement';
        break;

      case 'INSTAGRAM':
        endpoint = 'https://api.instagram.com/oauth/authorize';
        clientId = process.env.INSTAGRAM_APP_ID;
        scope = 'instagram_basic,instagram_content_publish';
        break;

      case 'TWITTER':
        endpoint = 'https://twitter.com/i/oauth2/authorize';
        clientId = process.env.TWITTER_CLIENT_ID;
        // Space-separated scopes, already percent-encoded.
        scope = 'tweet.read%20tweet.write%20users.read';
        break;

      default:
        throw new Error(`Unsupported platform: ${platform}`);
    }

    if (!clientId) {
      // Fail here rather than sending the user to a URL with client_id=undefined.
      throw new Error(`OAuth client id is not configured for ${platform}`);
    }

    return (
      `${endpoint}?` +
      `client_id=${encodeURIComponent(clientId)}` +
      `&redirect_uri=${encodeURIComponent(redirectUri)}` +
      `&response_type=code` +
      `&state=${encodeURIComponent(state)}` +
      `&scope=${scope}`
    );
  }
}

8.2 Social Publisher Service

// libs/crm/social/src/services/social-publisher.service.ts

/**
 * Creates, schedules and publishes social posts through a stored
 * social connection.
 *
 * NOTE: `publishToInstagram` and `publishToTwitter` are dispatched to below
 * but are not part of this listing.
 */
@Injectable()
export class SocialPublisherService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly connectionService: SocialConnectionService,
    @InjectQueue('social-posts') private readonly postQueue: Queue,
    // Needed by publishPost to decrypt the stored access token. Appended last
    // so existing positional arguments keep their order.
    private readonly encryptionService: EncryptionService,
  ) {}

  /**
   * Persists a new post in `DRAFT` status. Nothing is queued or published.
   *
   * @param dto Post content and ownership data; `mediaUrls` and `hashtags` default to empty lists.
   * @returns The created post.
   */
  async createPost(dto: CreateSocialPostDto): Promise<SocialPost> {
    const post = await this.prisma.socialPost.create({
      data: {
        tenantId: dto.tenantId,
        connectionId: dto.connectionId,
        campaignId: dto.campaignId,
        content: dto.content,
        mediaUrls: dto.mediaUrls ?? [],
        linkUrl: dto.linkUrl,
        hashtags: dto.hashtags ?? [],
        status: 'DRAFT',
        createdBy: dto.createdBy,
      },
    });

    return post;
  }

  /**
   * Marks a post as `SCHEDULED` and enqueues a delayed `publish` job.
   *
   * @param postId      Post to schedule.
   * @param scheduledAt Desired publish time; a time in the past publishes as soon as possible.
   * @returns The updated post.
   */
  async schedulePost(postId: string, scheduledAt: Date): Promise<SocialPost> {
    const post = await this.prisma.socialPost.update({
      where: { id: postId },
      data: { status: 'SCHEDULED', scheduledAt },
    });

    const jobId = `post-${postId}`;
    const delay = scheduledAt.getTime() - Date.now();

    // BullMQ ignores add() when a job with the same id already exists, so a
    // reschedule would silently keep the old time. Drop any pending job first.
    await this.postQueue.remove(jobId);
    await this.postQueue.add(
      'publish',
      { postId },
      { delay: Math.max(0, delay), jobId },
    );

    return post;
  }

  /**
   * Publishes a post to its connection's platform and records the outcome.
   *
   * The post is moved to `PUBLISHING`, then to `PUBLISHED` (with the platform
   * id and URL) or `FAILED` (with the error message). Publishing errors are
   * captured on the post rather than rethrown.
   *
   * @param postId Post to publish.
   * @returns The post in its final status.
   * @throws NotFoundException when the post does not exist.
   */
  async publishPost(postId: string): Promise<SocialPost> {
    const post = await this.prisma.socialPost.findUnique({
      where: { id: postId },
      include: { connection: true },
    });

    if (!post) throw new NotFoundException('Post not found');

    await this.prisma.socialPost.update({
      where: { id: postId },
      data: { status: 'PUBLISHING' },
    });

    try {
      const connection = post.connection;
      const accessToken = this.encryptionService.decrypt(connection.accessToken);

      const result = await this.publishToPlatform(
        connection.platform,
        accessToken,
        connection.accountId,
        {
          content: post.content,
          mediaUrls: post.mediaUrls,
          linkUrl: post.linkUrl,
        },
      );

      return this.prisma.socialPost.update({
        where: { id: postId },
        data: {
          status: 'PUBLISHED',
          publishedAt: new Date(),
          platformPostId: result.postId,
          platformUrl: result.url,
        },
      });
    } catch (error: unknown) {
      // Anything can be thrown; only Error instances carry a message.
      const message = error instanceof Error ? error.message : String(error);

      return this.prisma.socialPost.update({
        where: { id: postId },
        data: {
          status: 'FAILED',
          error: message,
        },
      });
    }
  }

  /**
   * Dispatches to the platform-specific publisher.
   *
   * @throws Error when the platform is unsupported.
   */
  private async publishToPlatform(
    platform: SocialPlatform,
    accessToken: string,
    accountId: string,
    content: { content: string; mediaUrls: string[]; linkUrl?: string },
  ): Promise<{ postId: string; url: string }> {
    switch (platform) {
      case 'FACEBOOK':
        return this.publishToFacebook(accessToken, accountId, content);
      case 'INSTAGRAM':
        return this.publishToInstagram(accessToken, accountId, content);
      case 'TWITTER':
        return this.publishToTwitter(accessToken, content);
      default:
        throw new Error(`Unsupported platform: ${platform}`);
    }
  }

  /**
   * Posts a message (and optional link) to a Facebook page feed.
   *
   * @throws Error when the Graph API responds with a non-2xx status or
   *         without a post id, so failures are not recorded as published.
   */
  private async publishToFacebook(
    accessToken: string,
    pageId: string,
    content: { content: string; linkUrl?: string | null },
  ): Promise<{ postId: string; url: string }> {
    const response = await fetch(
      `https://graph.facebook.com/v18.0/${pageId}/feed`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          message: content.content,
          link: content.linkUrl,
          access_token: accessToken,
        }),
      },
    );

    const data = (await response.json()) as {
      id?: string;
      error?: { message?: string };
    };

    if (!response.ok || !data.id) {
      const reason = data.error?.message ?? 'no post id returned';
      throw new Error(`Facebook publish failed (HTTP ${response.status}): ${reason}`);
    }

    return {
      postId: data.id,
      url: `https://facebook.com/${data.id}`,
    };
  }
}

8.3 Event Promotion Service

// libs/crm/social/src/services/event-promotion.service.ts

/**
 * Turns published events (e.g. competitions) into marketing campaigns and
 * social posts according to a per-event promotion configuration.
 *
 * NOTE: `getClubAutoPromoteConfig` is called below but is not part of this
 * listing.
 */
@Injectable()
export class EventPromotionService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly campaignService: CampaignService,
    private readonly socialPublisher: SocialPublisherService,
  ) {}

  /**
   * Stores a promotion configuration for a source event.
   *
   * @param dto Promotion settings; `autoPromote` defaults to `true` and
   *            `reminderDays` to `[7, 3, 1]`.
   * @returns The created promotion.
   */
  async createPromotion(dto: CreateEventPromotionDto): Promise<EventPromotion> {
    return this.prisma.eventPromotion.create({
      data: {
        tenantId: dto.tenantId,
        clubId: dto.clubId,
        sourceType: dto.sourceType,
        sourceId: dto.sourceId,
        autoPromote: dto.autoPromote ?? true,
        channels: dto.channels,
        announceAt: dto.announceAt,
        reminderDays: dto.reminderDays ?? [7, 3, 1],
        segmentId: dto.segmentId,
        emailTemplateId: dto.emailTemplateId,
        smsTemplateId: dto.smsTemplateId,
        socialContent: dto.socialContent,
        createdBy: dto.createdBy,
      },
    });
  }

  /**
   * Handles a "competition published" event.
   *
   * Uses the existing promotion for the competition if there is one;
   * otherwise creates a default promotion when the club has auto-promotion of
   * competitions enabled. The promotion is executed only when its
   * `autoPromote` flag is set.
   *
   * @param event Competition details from the publishing system.
   */
  async handleCompetitionPublished(event: {
    tenantId: string;
    clubId: string;
    competitionId: string;
    competitionName: string;
    startDate: Date;
  }): Promise<void> {
    // Check for existing promotion config
    let promotion = await this.prisma.eventPromotion.findUnique({
      where: {
        tenantId_sourceType_sourceId: {
          tenantId: event.tenantId,
          sourceType: 'COMPETITION',
          sourceId: event.competitionId,
        },
      },
    });

    // Create default if auto-promote is enabled for club
    if (!promotion) {
      const clubConfig = await this.getClubAutoPromoteConfig(event.clubId);
      if (clubConfig?.autoPromoteCompetitions) {
        promotion = await this.createPromotion({
          tenantId: event.tenantId,
          clubId: event.clubId,
          sourceType: 'COMPETITION',
          sourceId: event.competitionId,
          autoPromote: true,
          channels: clubConfig.defaultChannels,
          announceAt: new Date(), // Announce immediately
          reminderDays: [7, 3, 1],
          createdBy: 'system',
        });
      }
    }

    if (promotion?.autoPromote) {
      await this.executePromotion(promotion, event);
    }
  }

  /**
   * Creates and schedules the announcement campaign, creates social posts for
   * every active connection of the club when the `SOCIAL` channel is enabled,
   * walks the reminder schedule and stamps `lastPromotedAt`.
   *
   * @param promotion Promotion configuration to execute.
   * @param eventData Event details; `startDate` may arrive as an ISO string
   *                  when the event was JSON-serialised in transit.
   */
  private async executePromotion(
    promotion: EventPromotion,
    eventData: { competitionName: string; startDate: Date | string },
  ): Promise<void> {
    // Create campaign. SOCIAL is handled separately through social posts below.
    const campaign = await this.campaignService.create({
      tenantId: promotion.tenantId,
      clubId: promotion.clubId,
      name: `${eventData.competitionName} Promotion`,
      type: 'ONE_TIME',
      segmentId: promotion.segmentId,
      channels: promotion.channels.filter(c => c !== 'SOCIAL'),
      emailTemplateId: promotion.emailTemplateId,
      smsTemplateId: promotion.smsTemplateId,
      createdBy: 'system',
    });

    // Schedule campaign
    await this.campaignService.schedule(campaign.id, promotion.announceAt ?? new Date());

    // Create social posts if SOCIAL channel enabled
    if (promotion.channels.includes('SOCIAL')) {
      const connections = await this.prisma.socialConnection.findMany({
        where: {
          tenantId: promotion.tenantId,
          clubId: promotion.clubId,
          isActive: true,
        },
      });

      // NOTE(review): these posts are only created here; this method does not
      // schedule or publish them — confirm that is intended.
      for (const connection of connections) {
        await this.socialPublisher.createPost({
          tenantId: promotion.tenantId,
          connectionId: connection.id,
          campaignId: campaign.id,
          content: promotion.socialContent || this.generateDefaultContent(eventData),
          createdBy: 'system',
        });
      }
    }

    // Schedule reminders
    for (const days of promotion.reminderDays) {
      const reminderDate = new Date(eventData.startDate);
      reminderDate.setDate(reminderDate.getDate() - days);

      // Reminders whose date has already passed are skipped.
      if (reminderDate > new Date()) {
        // Schedule reminder campaign
        // ... similar to above
      }
    }

    await this.prisma.eventPromotion.update({
      where: { id: promotion.id },
      data: { lastPromotedAt: new Date() },
    });
  }

  /**
   * Builds the fallback social post text used when the promotion has no
   * custom `socialContent`.
   */
  private generateDefaultContent(
    eventData: { competitionName: string; startDate: Date | string },
  ): string {
    // Normalise first: a serialised payload delivers a string, which has no
    // toLocaleDateString(). Mirrors the handling in executePromotion.
    const startsOn = new Date(eventData.startDate);

    return `Join us for ${eventData.competitionName}! ` +
      `Starting ${startsOn.toLocaleDateString()}. ` +
      `Register now!`;
  }
}

8.4 Deliverables - Phase 6

  • Social connection OAuth flow
  • Token encryption and storage
  • Social post CRUD and scheduling
  • Facebook publishing
  • Instagram publishing
  • Twitter publishing
  • Event promotion auto-creation
  • Reminder scheduling

9. Phase 7: Backfill

See the Backfill Strategy document for the detailed migration plan.

Key Steps

  1. Export MCA v1 ZA data
  2. Export MCA v1 UK data
  3. Transform to CRM schema
  4. Run identity resolution
  5. Load into CRM database
  6. Verify and reconcile

10. Phase 8: Intelligence

10.1 Engagement Scoring

// libs/crm/core/src/services/engagement-scoring.service.ts

/**
 * Computes a 0-100 engagement score for a customer profile as a weighted
 * blend of five 0-100 component scores.
 */
@Injectable()
export class EngagementScoringService {
  // Component weights; they sum to 1.0 so the blended score stays within 0-100.
  private readonly weights = {
    recency: 0.30,
    frequency: 0.25,
    monetary: 0.20,
    breadth: 0.15,
    depth: 0.10,
  };

  // calculateDepthScore queries campaign interactions through Prisma.
  constructor(private readonly prisma: PrismaService) {}

  /**
   * Calculates the overall engagement score.
   *
   * @param profile Customer profile to score.
   * @returns Weighted score rounded to the nearest integer.
   */
  async calculateScore(profile: CustomerProfile): Promise<number> {
    // Only the depth score is asynchronous; Promise.all accepts the plain
    // numbers returned by the other components as-is.
    const [recency, frequency, monetary, breadth, depth] = await Promise.all([
      this.calculateRecencyScore(profile),
      this.calculateFrequencyScore(profile),
      this.calculateMonetaryScore(profile),
      this.calculateBreadthScore(profile),
      this.calculateDepthScore(profile),
    ]);

    return Math.round(
      recency * this.weights.recency +
      frequency * this.weights.frequency +
      monetary * this.weights.monetary +
      breadth * this.weights.breadth +
      depth * this.weights.depth
    );
  }

  /** Scores how recently the customer was active; 0 when never active or inactive for over 90 days. */
  private calculateRecencyScore(profile: CustomerProfile): number {
    if (!profile.lastActivityAt) return 0;
    const daysSince = this.daysBetween(profile.lastActivityAt, new Date());
    if (daysSince <= 7) return 100;
    if (daysSince <= 14) return 80;
    if (daysSince <= 30) return 60;
    if (daysSince <= 60) return 40;
    if (daysSince <= 90) return 20;
    return 0;
  }

  /** Scores the number of activities in the last 30 days (`activityCount30d`). */
  private calculateFrequencyScore(profile: CustomerProfile): number {
    const count = profile.activityCount30d ?? 0;
    if (count >= 20) return 100;
    if (count >= 10) return 80;
    if (count >= 5) return 60;
    if (count >= 2) return 40;
    if (count >= 1) return 20;
    return 0;
  }

  /**
   * Scores lifetime value against fixed thresholds.
   * NOTE(review): the currency/unit of `lifetimeValue` is not visible here — confirm the thresholds.
   */
  private calculateMonetaryScore(profile: CustomerProfile): number {
    // Number(...) because the value may not be a plain number (e.g. a decimal type).
    const ltv = Number(profile.lifetimeValue ?? 0);
    if (ltv >= 100000) return 100;
    if (ltv >= 50000) return 80;
    if (ltv >= 20000) return 60;
    if (ltv >= 10000) return 40;
    if (ltv >= 5000) return 20;
    return 0;
  }

  /** Scores channel reach: 25 points for each of the four opt-ins that is set. */
  private calculateBreadthScore(profile: CustomerProfile): number {
    let channels = 0;
    if (profile.emailOptIn) channels++;
    if (profile.smsOptIn) channels++;
    if (profile.pushOptIn) channels++;
    if (profile.whatsappOptIn) channels++;
    return channels * 25;
  }

  /**
   * Scores campaign responsiveness over the last 30 days: half from the open
   * rate, half from the click-through rate. Returns a neutral 50 when nothing
   * was sent in the window.
   */
  private async calculateDepthScore(profile: CustomerProfile): Promise<number> {
    const interactions = await this.prisma.campaignInteraction.findMany({
      where: {
        customerId: profile.id,
        occurredAt: { gte: this.daysAgo(30) },
      },
    });

    let sent = 0;
    let opened = 0;
    let clicked = 0;
    for (const interaction of interactions) {
      if (interaction.type === 'SENT') sent++;
      else if (interaction.type === 'OPENED') opened++;
      else if (interaction.type === 'CLICKED') clicked++;
    }

    if (sent === 0) return 50;

    // Opens/clicks inside the window can belong to sends outside it (or repeat),
    // so cap both rates at 1 to keep the score within 0-100.
    const openRate = Math.min(opened / sent, 1);
    const clickRate = Math.min(clicked / Math.max(opened, 1), 1);

    return Math.round((openRate * 50) + (clickRate * 50));
  }

  /** Whole days elapsed from `from` to `to` (partial days are truncated). */
  private daysBetween(from: Date, to: Date): number {
    const msPerDay = 24 * 60 * 60 * 1000;
    return Math.floor((to.getTime() - from.getTime()) / msPerDay);
  }

  /** The instant exactly `days` x 24 hours before now. */
  private daysAgo(days: number): Date {
    return new Date(Date.now() - days * 24 * 60 * 60 * 1000);
  }
}

10.2 Batch Scoring Job

/**
 * Nightly job that recomputes engagement and churn-risk scores for all
 * customer profiles that are neither merged nor soft-deleted.
 *
 * NOTE: `prisma`, `engagementService` and `churnService` are used below but
 * their injection (constructor) is not part of this listing.
 */
@Injectable()
export class BatchScoringJob {
  /**
   * Pages through profiles in id order (cursor pagination, 100 per page) and
   * rescores each page concurrently.
   *
   * A profile that fails to score does not stop the run; failures are counted
   * and reported once at the end.
   *
   * @throws Error after all pages are processed if any profile failed.
   */
  @Cron('0 2 * * *') // Run at 2 AM daily
  async runBatchScoring(): Promise<void> {
    const pageSize = 100;
    let cursor: string | undefined;
    let failedCount = 0;
    let firstFailure: unknown;

    while (true) {
      const profiles = await this.prisma.customerProfile.findMany({
        where: { status: { not: 'MERGED' }, deletedAt: null },
        take: pageSize,
        // The cursor row itself was handled on the previous page.
        skip: cursor ? 1 : 0,
        cursor: cursor ? { id: cursor } : undefined,
        orderBy: { id: 'asc' },
      });

      if (profiles.length === 0) break;

      // allSettled instead of all: one bad profile must not abort the page
      // (and with it every remaining page of the nightly run).
      const outcomes = await Promise.allSettled(
        profiles.map(async profile => {
          const [engagement, churnRisk] = await Promise.all([
            this.engagementService.calculateScore(profile),
            this.churnService.calculateChurnRisk(profile),
          ]);

          const scoredAt = new Date();
          await this.prisma.customerProfile.update({
            where: { id: profile.id },
            data: {
              engagementScore: engagement,
              engagementScoreAt: scoredAt,
              churnRiskScore: churnRisk,
              churnRiskScoreAt: scoredAt,
            },
          });
        })
      );

      for (const outcome of outcomes) {
        if (outcome.status === 'rejected') {
          failedCount++;
          firstFailure ??= outcome.reason;
        }
      }

      // A short page is the last page; skip the extra empty query.
      if (profiles.length < pageSize) break;

      cursor = profiles[profiles.length - 1].id;
    }

    if (failedCount > 0) {
      const reason = firstFailure instanceof Error ? firstFailure.message : String(firstFailure);
      throw new Error(
        `Batch scoring failed for ${failedCount} profile(s); first error: ${reason}`,
      );
    }
  }
}

10.3 Deliverables - Phase 8

  • Engagement scoring service
  • Churn risk scoring service
  • Batch scoring cron job
  • Analytics API endpoints

11. Testing Strategy

Unit Tests

  • Identity resolution logic
  • Scoring calculations
  • Segment rule parsing
  • Journey step execution

Integration Tests

  • Event processing end-to-end
  • Campaign execution flow
  • Journey enrollment and progression
  • Social publishing

Load Tests

  • Identity resolution at scale
  • Batch scoring performance
  • Campaign sending throughput

12. Deployment

Kubernetes Resources

# Deployment for the CRM backend API/worker.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: crm-backend
  namespace: crm
spec:
  replicas: 2
  # apps/v1 requires a selector that matches the pod template labels.
  selector:
    matchLabels:
      app: crm-backend
  template:
    metadata:
      labels:
        app: crm-backend
    spec:
      containers:
        - name: crm-backend
          # NOTE(review): consider pinning a version tag instead of ":latest"
          # so rollouts and rollbacks are reproducible.
          image: harbor.digiwedge.dev/crm/backend:latest
          env:
            - name: CRM_DATABASE_URL
              valueFrom:
                secretKeyRef:
                  name: crm-secrets
                  key: DATABASE_URL
            - name: REDIS_URL
              valueFrom:
                secretKeyRef:
                  name: crm-secrets
                  key: REDIS_URL
            # JWT_SECRET and ENCRYPTION_KEY are listed under /crm/backend in Infisical.
            - name: JWT_SECRET
              valueFrom:
                secretKeyRef:
                  name: crm-secrets
                  key: JWT_SECRET
            - name: ENCRYPTION_KEY
              valueFrom:
                secretKeyRef:
                  name: crm-secrets
                  key: ENCRYPTION_KEY
            - name: FACEBOOK_APP_ID
              valueFrom:
                secretKeyRef:
                  name: crm-social-secrets
                  key: FACEBOOK_APP_ID
            - name: FACEBOOK_APP_SECRET
              valueFrom:
                secretKeyRef:
                  name: crm-social-secrets
                  key: FACEBOOK_APP_SECRET
            # The OAuth URL builder reads INSTAGRAM_APP_ID and TWITTER_CLIENT_ID
            # from the environment, so they must be mounted as well.
            - name: INSTAGRAM_APP_ID
              valueFrom:
                secretKeyRef:
                  name: crm-social-secrets
                  key: INSTAGRAM_APP_ID
            - name: INSTAGRAM_APP_SECRET
              valueFrom:
                secretKeyRef:
                  name: crm-social-secrets
                  key: INSTAGRAM_APP_SECRET
            - name: TWITTER_CLIENT_ID
              valueFrom:
                secretKeyRef:
                  name: crm-social-secrets
                  key: TWITTER_CLIENT_ID
            - name: TWITTER_CLIENT_SECRET
              valueFrom:
                secretKeyRef:
                  name: crm-social-secrets
                  key: TWITTER_CLIENT_SECRET

Infisical Secrets

/crm/backend:
- CRM_DATABASE_URL
- REDIS_URL
- JWT_SECRET
- ENCRYPTION_KEY

/crm/social:
- FACEBOOK_APP_ID
- FACEBOOK_APP_SECRET
- INSTAGRAM_APP_ID
- INSTAGRAM_APP_SECRET
- TWITTER_CLIENT_ID
- TWITTER_CLIENT_SECRET