Skip to main content

Phase 4: Marketing Core

Campaigns, segments, and scheduling.


Deliverables

  • Segmentation service with dynamic rules
  • Segment sync to Messaging service
  • Campaign CRUD
  • Campaign scheduling with BullMQ
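  • Campaign execution (Email and SMS; push template IDs are stored on the campaign, but push sending is not implemented in this phase)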
  • Campaign stats tracking

1. Segmentation Service (CRM Owns Segments)

// libs/crm/campaigns/src/services/segmentation.service.ts

/**
 * Owns customer segments on the CRM side: creates them, recomputes the
 * membership of dynamic segments from their stored criteria, and pushes the
 * member list to the Messaging service.
 */
@Injectable()
export class SegmentationService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly messagingClient: MessagingServiceClient,
  ) {}

  /**
   * Persists a new segment and, for DYNAMIC segments, computes its membership
   * straight away.
   *
   * @param dto Segment definition; `combinator` defaults to 'AND' and
   *   `refreshFrequency` to 'DAILY' when omitted.
   * @returns The stored segment. For DYNAMIC segments this is the row as it
   *   looks after the initial refresh, so `memberCount` and `lastRefreshedAt`
   *   are populated.
   */
  async createSegment(dto: CreateSegmentDto): Promise<CustomerSegment> {
    const segment = await this.prisma.customerSegment.create({
      data: {
        tenantId: dto.tenantId,
        clubId: dto.clubId,
        name: dto.name,
        description: dto.description,
        type: dto.type,
        criteria: dto.criteria,
        combinator: dto.combinator ?? 'AND',
        refreshFrequency: dto.refreshFrequency ?? 'DAILY',
        createdBy: dto.createdBy,
      },
    });

    if (dto.type !== 'DYNAMIC') {
      return segment;
    }

    // Initial refresh for dynamic segments
    await this.refreshSegment(segment.id);

    // refreshSegment() writes memberCount/lastRefreshedAt, so the row fetched
    // by create() above is stale at this point; hand back the current one.
    const refreshed = await this.prisma.customerSegment.findUnique({
      where: { id: segment.id },
    });

    return refreshed ?? segment;
  }

  /**
   * Recomputes the membership of a dynamic segment from its criteria.
   *
   * Does nothing when the segment does not exist or is STATIC. Otherwise the
   * existing memberships are replaced and `memberCount` / `lastRefreshedAt`
   * are updated in a single transaction.
   *
   * @param segmentId Id of the segment to refresh.
   */
  async refreshSegment(segmentId: string): Promise<void> {
    const segment = await this.prisma.customerSegment.findUnique({
      where: { id: segmentId },
    });

    if (!segment || segment.type === 'STATIC') return;

    // Build query from criteria
    const where = this.buildWhereFromCriteria(
      segment.tenantId,
      segment.criteria as SegmentCriteria,
      segment.combinator,
    );

    // Find matching profiles
    const matchingProfiles = await this.prisma.customerProfile.findMany({
      where,
      select: { id: true },
    });

    // One timestamp for the whole refresh so every membership row and the
    // segment itself agree on when the refresh happened.
    const refreshedAt = new Date();

    // Update membership in transaction
    await this.prisma.$transaction([
      this.prisma.customerSegmentMembership.deleteMany({
        where: { segmentId },
      }),
      this.prisma.customerSegmentMembership.createMany({
        data: matchingProfiles.map(profile => ({
          tenantId: segment.tenantId,
          segmentId,
          customerId: profile.id,
          matchedAt: refreshedAt,
        })),
      }),
      this.prisma.customerSegment.update({
        where: { id: segmentId },
        data: {
          memberCount: matchingProfiles.length,
          lastRefreshedAt: refreshedAt,
        },
      }),
    ]);
  }

  /**
   * Upserts the segment and its current members in the Messaging service and
   * records the returned messaging segment id and sync time on the segment.
   *
   * @param segmentId Id of the segment to sync.
   * @throws NotFoundException when the segment does not exist.
   */
  async syncToMessaging(segmentId: string): Promise<void> {
    const segment = await this.prisma.customerSegment.findUnique({
      where: { id: segmentId },
      include: {
        memberships: {
          include: { customer: true },
        },
      },
    });

    if (!segment) throw new NotFoundException('Segment not found');

    // Create or update segment in Messaging service
    const messagingSegment = await this.messagingClient.upsertSegment({
      externalId: segment.id,
      tenantId: segment.tenantId,
      name: segment.name,
      contacts: segment.memberships.map(membership => ({
        email: membership.customer.email,
        phoneNumber: membership.customer.phoneNumber,
        firstName: membership.customer.firstName,
        lastName: membership.customer.lastName,
      })),
    });

    // Store messaging segment ID for reference
    await this.prisma.customerSegment.update({
      where: { id: segmentId },
      data: {
        messagingSegmentId: messagingSegment.id,
        lastSyncedAt: new Date(),
      },
    });
  }

  /**
   * Translates segment criteria into a Prisma filter on customer profiles.
   *
   * The filter is always scoped to the tenant and excludes MERGED and
   * soft-deleted profiles.
   *
   * @param tenantId Tenant the segment belongs to.
   * @param criteria Rule tree stored on the segment.
   * @param fallbackCombinator Combinator stored on the segment row; used only
   *   when the criteria object does not carry its own `combinator`.
   */
  private buildWhereFromCriteria(
    tenantId: string,
    criteria: SegmentCriteria,
    fallbackCombinator?: string | null,
  ): Prisma.CustomerProfileWhereInput {
    const conditions = criteria.rules.map(rule => this.ruleToCondition(rule));

    // createSegment() persists the combinator on the segment itself, so honour
    // it when the criteria JSON has none instead of silently assuming AND.
    const combinator = criteria.combinator ?? fallbackCombinator;

    // NOTE(review): an empty rule list produces an empty AND/OR array here;
    // confirm that the resulting match set is what is wanted for such segments.
    return {
      tenantId,
      status: { not: 'MERGED' },
      deletedAt: null,
      ...(combinator === 'OR' ? { OR: conditions } : { AND: conditions }),
    };
  }

  /**
   * Converts one rule (or a nested rule group) into a Prisma condition.
   * Groups recurse and are joined with their own combinator (AND by default).
   */
  private ruleToCondition(rule: SegmentRule): Prisma.CustomerProfileWhereInput {
    // Handle nested rules (groups) first: a group has no field/operator/value.
    if ('rules' in rule) {
      const nestedConditions = rule.rules.map(nested => this.ruleToCondition(nested));
      return rule.combinator === 'OR' ? { OR: nestedConditions } : { AND: nestedConditions };
    }

    const { field, operator, value } = rule;

    // The field name comes from stored criteria, so the key cannot be checked
    // statically against the profile model.
    return {
      [field]: this.operatorToFilter(operator, value),
    } as Prisma.CustomerProfileWhereInput;
  }

  /**
   * Maps a rule operator to the Prisma filter value for a single field.
   *
   * A `switch` is used rather than a lookup table combined with `??`, because
   * the filter for `isNull` is literally `null`: `null ?? value` would discard
   * it and fall back to `value`, so the rule would no longer mean "IS NULL".
   *
   * Unknown operators fall back to plain equality.
   */
  private operatorToFilter(operator: string, value: unknown): unknown {
    switch (operator) {
      case 'eq':
        return value;
      case 'neq':
        return { not: value };
      case 'gt':
        return { gt: value };
      case 'gte':
        return { gte: value };
      case 'lt':
        return { lt: value };
      case 'lte':
        return { lte: value };
      case 'contains':
        return { contains: value, mode: 'insensitive' };
      case 'in':
        return { in: value };
      case 'isNull':
        return null;
      case 'isNotNull':
        return { not: null };
      default:
        return value;
    }
  }
}

2. Campaign Service

// libs/crm/campaigns/src/services/campaign.service.ts

/**
 * Campaign lifecycle: create (DRAFT), schedule, execute over the configured
 * channels, pause, and report interaction statistics.
 */
@Injectable()
export class CampaignService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly segmentService: SegmentationService,
    private readonly schedulerService: CampaignSchedulerService,
    private readonly messagingClient: MessagingServiceClient,
  ) {}

  /**
   * Creates a campaign in DRAFT status.
   *
   * @param dto Campaign definition; `timezone` defaults to
   *   'Africa/Johannesburg' when omitted.
   */
  async create(dto: CreateCampaignDto): Promise<Campaign> {
    return this.prisma.campaign.create({
      data: {
        tenantId: dto.tenantId,
        clubId: dto.clubId,
        name: dto.name,
        description: dto.description,
        type: dto.type,
        status: 'DRAFT',
        segmentId: dto.segmentId,
        channels: dto.channels,
        emailTemplateId: dto.emailTemplateId,
        smsTemplateId: dto.smsTemplateId,
        pushTemplateId: dto.pushTemplateId,
        timezone: dto.timezone ?? 'Africa/Johannesburg',
        createdBy: dto.createdBy,
      },
    });
  }

  /**
   * Moves a DRAFT campaign to SCHEDULED and enqueues its execution job.
   *
   * When the campaign targets a segment, the segment is refreshed and synced
   * to Messaging first, and the campaign's `targetCount` is taken from the
   * refreshed member count.
   *
   * @param campaignId Campaign to schedule.
   * @param scheduledAt When the campaign should run.
   * @throws NotFoundException when the campaign does not exist.
   * @throws BadRequestException when the campaign is not in DRAFT status.
   */
  async schedule(campaignId: string, scheduledAt: Date): Promise<Campaign> {
    const campaign = await this.prisma.campaign.findUnique({
      where: { id: campaignId },
    });

    if (!campaign) throw new NotFoundException('Campaign not found');
    if (campaign.status !== 'DRAFT') {
      throw new BadRequestException('Can only schedule draft campaigns');
    }

    let targetCount = 0;

    // Refresh segment and sync to messaging
    if (campaign.segmentId) {
      await this.segmentService.refreshSegment(campaign.segmentId);
      await this.segmentService.syncToMessaging(campaign.segmentId);

      // Read the member count only after the refresh; a segment loaded
      // together with the campaign would still carry the pre-refresh count.
      const segment = await this.prisma.customerSegment.findUnique({
        where: { id: campaign.segmentId },
        select: { memberCount: true },
      });
      targetCount = segment?.memberCount ?? 0;
    }

    const updated = await this.prisma.campaign.update({
      where: { id: campaignId },
      data: {
        status: 'SCHEDULED',
        scheduledAt,
        targetCount,
      },
    });

    // Schedule the execution job
    await this.schedulerService.scheduleCampaign(updated);

    return updated;
  }

  /**
   * Runs a campaign: marks it ACTIVE, sends through each configured channel
   * in turn, then marks it COMPLETED. If anything fails, the campaign is set
   * to PAUSED and the error is rethrown.
   *
   * NOTE(review): the current status is not checked before sending, so a
   * second invocation for the same campaign sends again — confirm whether a
   * guard is wanted here.
   *
   * @param campaignId Campaign to execute.
   * @throws NotFoundException when the campaign does not exist.
   */
  async execute(campaignId: string): Promise<void> {
    const campaign = await this.prisma.campaign.findUnique({
      where: { id: campaignId },
      include: {
        segment: {
          include: {
            memberships: {
              include: { customer: true },
            },
          },
        },
      },
    });

    if (!campaign) throw new NotFoundException('Campaign not found');

    await this.prisma.campaign.update({
      where: { id: campaignId },
      data: { status: 'ACTIVE', startedAt: new Date() },
    });

    try {
      const recipients = campaign.segment?.memberships.map(m => m.customer) ?? [];

      // Send via each channel
      for (const channel of campaign.channels) {
        await this.sendViaChannel(campaign, recipients, channel);
      }

      await this.prisma.campaign.update({
        where: { id: campaignId },
        data: { status: 'COMPLETED', completedAt: new Date() },
      });
    } catch (error) {
      await this.prisma.campaign.update({
        where: { id: campaignId },
        data: { status: 'PAUSED' },
      });
      throw error;
    }
  }

  /**
   * Sends the campaign over one channel and records a SENT interaction for
   * each customer that was actually included in the send.
   *
   * Customers who have not opted in to the channel, or who lack the contact
   * detail it needs, are excluded from both the send and the SENT records.
   * Channels other than EMAIL and SMS have no sender here, so nothing is sent
   * and nothing is recorded for them.
   */
  private async sendViaChannel(
    campaign: Campaign,
    recipients: CustomerProfile[],
    channel: Channel,
  ): Promise<void> {
    let eligible: CustomerProfile[] = [];

    switch (channel) {
      case 'EMAIL':
        eligible = recipients.filter(r => r.emailOptIn && r.email);
        if (eligible.length > 0) {
          await this.messagingClient.sendBulkEmail({
            templateId: campaign.emailTemplateId,
            recipients: eligible.map(r => ({
              email: r.email,
              data: { firstName: r.firstName, lastName: r.lastName },
            })),
            campaignId: campaign.id,
          });
        }
        break;

      case 'SMS':
        eligible = recipients.filter(r => r.smsOptIn && r.phoneNumber);
        if (eligible.length > 0) {
          await this.messagingClient.sendBulkSms({
            templateId: campaign.smsTemplateId,
            recipients: eligible.map(r => ({
              phone: r.phoneNumber,
              data: { firstName: r.firstName },
            })),
            campaignId: campaign.id,
          });
        }
        break;
    }

    if (eligible.length === 0) return;

    // Record sent interactions in one batch, and only for customers who were
    // part of the send, so the SENT count is not inflated.
    await this.prisma.campaignInteraction.createMany({
      data: eligible.map(recipient => ({
        tenantId: campaign.tenantId,
        campaignId: campaign.id,
        customerId: recipient.id,
        channel,
        type: 'SENT',
      })),
    });
  }

  /**
   * Pauses an ACTIVE or SCHEDULED campaign and cancels its queued job.
   *
   * @param campaignId Campaign to pause.
   * @throws NotFoundException when the campaign does not exist.
   * @throws BadRequestException when the campaign is in any other status.
   */
  async pause(campaignId: string): Promise<Campaign> {
    const campaign = await this.prisma.campaign.findUnique({ where: { id: campaignId } });

    // Report a missing campaign the same way schedule/execute/getStats do.
    if (!campaign) throw new NotFoundException('Campaign not found');

    if (campaign.status !== 'ACTIVE' && campaign.status !== 'SCHEDULED') {
      throw new BadRequestException('Campaign cannot be paused');
    }

    await this.schedulerService.cancelScheduledCampaign(campaignId);

    return this.prisma.campaign.update({
      where: { id: campaignId },
      data: { status: 'PAUSED' },
    });
  }

  /**
   * Aggregates interaction counts for a campaign and derives rates from them.
   *
   * Each interaction type contributes a `<type in lower case>Count` field.
   * `sentCount`, `deliveredCount`, `openedCount` and `clickedCount` are always
   * present (0 when there are no such interactions) so the rates are numbers
   * rather than NaN. Each rate's denominator is clamped to at least 1.
   *
   * @param campaignId Campaign to report on.
   * @throws NotFoundException when the campaign does not exist.
   */
  async getStats(campaignId: string): Promise<CampaignStats> {
    const campaign = await this.prisma.campaign.findUnique({
      where: { id: campaignId },
    });

    if (!campaign) throw new NotFoundException('Campaign not found');

    const interactions = await this.prisma.campaignInteraction.groupBy({
      by: ['type'],
      where: { campaignId },
      _count: true,
    });

    // groupBy only returns rows for types that occurred, so seed the counts
    // the rate formulas read with zeros before overlaying the real numbers.
    const counts = {
      sentCount: 0,
      deliveredCount: 0,
      openedCount: 0,
      clickedCount: 0,
      ...Object.fromEntries(
        interactions.map(i => [i.type.toLowerCase() + 'Count', i._count]),
      ),
    };

    return {
      campaignId,
      targetCount: campaign.targetCount,
      ...counts,
      deliveryRate: counts.deliveredCount / Math.max(counts.sentCount, 1),
      openRate: counts.openedCount / Math.max(counts.deliveredCount, 1),
      clickRate: counts.clickedCount / Math.max(counts.openedCount, 1),
    };
  }
}

3. Campaign Scheduler

// libs/crm/campaigns/src/services/campaign-scheduler.service.ts

/**
 * Enqueues and cancels campaign execution jobs on the 'campaign-execution'
 * queue. Every job is added under a campaign-derived id so it can be found
 * again for cancellation.
 */
@Injectable()
export class CampaignSchedulerService {
  constructor(
    @InjectQueue('campaign-execution') private readonly queue: Queue,
    private readonly prisma: PrismaService,
  ) {}

  /**
   * Enqueues the execution job for a campaign.
   *
   * The job is delayed until `campaign.scheduledAt`; when that moment is now
   * or in the past the delay is 0 and the job runs as soon as a worker is
   * free.
   *
   * NOTE(review): because the job id is fixed per campaign, confirm how the
   * queue treats an add() for an id that is still present (e.g. a retained
   * finished job) if campaigns can ever be queued more than once.
   *
   * @param campaign Campaign to enqueue; must have `scheduledAt` set.
   * @throws Error when the campaign has no `scheduledAt`.
   */
  async scheduleCampaign(campaign: Campaign): Promise<void> {
    if (!campaign.scheduledAt) {
      throw new Error(`Campaign ${campaign.id} has no scheduledAt`);
    }

    const delay = Math.max(campaign.scheduledAt.getTime() - Date.now(), 0);

    // Use the same job id for immediate and delayed runs; a job added without
    // it cannot be looked up by cancelScheduledCampaign().
    await this.queue.add(
      'execute',
      { campaignId: campaign.id },
      { delay, jobId: this.jobIdFor(campaign.id) },
    );
  }

  /**
   * Removes the queued execution job for a campaign, if there is one that
   * has not started yet.
   *
   * @param campaignId Campaign whose job should be cancelled.
   */
  async cancelScheduledCampaign(campaignId: string): Promise<void> {
    const job = await this.queue.getJob(this.jobIdFor(campaignId));
    if (!job) return;

    // A job that a worker is currently processing is locked and remove()
    // rejects for it, so leave it alone.
    if (await job.isActive()) return;

    await job.remove();
  }

  /** Queue job id used for a campaign's execution job. */
  private jobIdFor(campaignId: string): string {
    return `campaign-${campaignId}`;
  }
}

/**
 * Processor registered for the 'campaign-execution' queue. Each job carries a
 * campaign id, which is handed to CampaignService.execute().
 */
@Processor('campaign-execution')
export class CampaignExecutionProcessor extends WorkerHost {
  constructor(private readonly campaignService: CampaignService) {
    super();
  }

  /**
   * Executes the campaign named in the job payload. A rejection from
   * execute() propagates out of this method unchanged.
   */
  async process(job: Job<{ campaignId: string }>): Promise<void> {
    const { campaignId } = job.data;
    await this.campaignService.execute(campaignId);
  }
}

4. Segment Refresh Job

// libs/crm/campaigns/src/jobs/segment-refresh.job.ts

/**
 * Cron-driven refresh of active dynamic segments, grouped by their configured
 * refresh frequency.
 */
@Injectable()
export class SegmentRefreshJob {
  constructor(
    private readonly prisma: PrismaService,
    private readonly segmentService: SegmentationService,
  ) {}

  /** Refreshes every active DYNAMIC segment whose frequency is HOURLY. */
  @Cron('0 * * * *') // Every hour
  async refreshHourlySegments(): Promise<void> {
    await this.refreshByFrequency('HOURLY');
  }

  /** Refreshes every active DYNAMIC segment whose frequency is DAILY. */
  @Cron('0 2 * * *') // Daily at 2 AM
  async refreshDailySegments(): Promise<void> {
    await this.refreshByFrequency('DAILY');
  }

  /**
   * Refreshes, one after another, all active DYNAMIC segments with the given
   * refresh frequency.
   *
   * A failure on one segment does not stop the remaining segments from being
   * refreshed; failures are collected and reported together once every
   * segment has been attempted.
   *
   * @param refreshFrequency Frequency bucket to process.
   * @throws Error listing the segments that failed, if any did.
   */
  private async refreshByFrequency(refreshFrequency: 'HOURLY' | 'DAILY'): Promise<void> {
    const segments = await this.prisma.customerSegment.findMany({
      where: {
        type: 'DYNAMIC',
        refreshFrequency,
        isActive: true,
      },
      // Only the id is needed to trigger a refresh.
      select: { id: true },
    });

    const failures: string[] = [];

    for (const { id } of segments) {
      try {
        await this.segmentService.refreshSegment(id);
      } catch (error: unknown) {
        const reason = error instanceof Error ? error.message : String(error);
        failures.push(`${id}: ${reason}`);
      }
    }

    if (failures.length > 0) {
      throw new Error(
        `Failed to refresh ${failures.length} of ${segments.length} ${refreshFrequency} segment(s): ${failures.join('; ')}`,
      );
    }
  }
}

Next Steps

After completing Phase 4, proceed to Phase 5: Marketing Automation.