Phase 4: Marketing Core
Campaigns, segments, and scheduling.
Deliverables
- Segmentation service with dynamic rules
- Segment sync to Messaging service
- Campaign CRUD
- Campaign scheduling with BullMQ
- Campaign execution (Email, SMS)
- Campaign stats tracking
1. Segmentation Service (CRM Owns Segments)
// libs/crm/campaigns/src/services/segmentation.service.ts
@Injectable()
export class SegmentationService {
constructor(
private readonly prisma: PrismaService,
private readonly messagingClient: MessagingServiceClient,
) {}
async createSegment(dto: CreateSegmentDto): Promise<CustomerSegment> {
const segment = await this.prisma.customerSegment.create({
data: {
tenantId: dto.tenantId,
clubId: dto.clubId,
name: dto.name,
description: dto.description,
type: dto.type,
criteria: dto.criteria,
combinator: dto.combinator ?? 'AND',
refreshFrequency: dto.refreshFrequency ?? 'DAILY',
createdBy: dto.createdBy,
},
});
// Initial refresh for dynamic segments
if (dto.type === 'DYNAMIC') {
await this.refreshSegment(segment.id);
}
return segment;
}
async refreshSegment(segmentId: string): Promise<void> {
const segment = await this.prisma.customerSegment.findUnique({
where: { id: segmentId },
});
if (!segment || segment.type === 'STATIC') return;
// Build query from criteria
const where = this.buildWhereFromCriteria(
segment.tenantId,
segment.criteria as SegmentCriteria,
);
// Find matching profiles
const matchingProfiles = await this.prisma.customerProfile.findMany({
where,
select: { id: true },
});
// Update membership in transaction
await this.prisma.$transaction([
this.prisma.customerSegmentMembership.deleteMany({
where: { segmentId },
}),
this.prisma.customerSegmentMembership.createMany({
data: matchingProfiles.map(p => ({
tenantId: segment.tenantId,
segmentId,
customerId: p.id,
matchedAt: new Date(),
})),
}),
this.prisma.customerSegment.update({
where: { id: segmentId },
data: {
memberCount: matchingProfiles.length,
lastRefreshedAt: new Date(),
},
}),
]);
}
async syncToMessaging(segmentId: string): Promise<void> {
const segment = await this.prisma.customerSegment.findUnique({
where: { id: segmentId },
include: {
memberships: {
include: { customer: true },
},
},
});
if (!segment) throw new NotFoundException('Segment not found');
// Create or update segment in Messaging service
const messagingSegment = await this.messagingClient.upsertSegment({
externalId: segment.id,
tenantId: segment.tenantId,
name: segment.name,
contacts: segment.memberships.map(m => ({
email: m.customer.email,
phoneNumber: m.customer.phoneNumber,
firstName: m.customer.firstName,
lastName: m.customer.lastName,
})),
});
// Store messaging segment ID for reference
await this.prisma.customerSegment.update({
where: { id: segmentId },
data: {
messagingSegmentId: messagingSegment.id,
lastSyncedAt: new Date(),
},
});
}
private buildWhereFromCriteria(
tenantId: string,
criteria: SegmentCriteria,
): Prisma.CustomerProfileWhereInput {
const conditions = criteria.rules.map(rule => this.ruleToCondition(rule));
return {
tenantId,
status: { not: 'MERGED' },
deletedAt: null,
...(criteria.combinator === 'OR' ? { OR: conditions } : { AND: conditions }),
};
}
private ruleToCondition(rule: SegmentRule): Prisma.CustomerProfileWhereInput {
const { field, operator, value } = rule;
// Handle nested rules (groups)
if ('rules' in rule) {
const nestedConditions = rule.rules.map(r => this.ruleToCondition(r));
return rule.combinator === 'OR' ? { OR: nestedConditions } : { AND: nestedConditions };
}
const operators: Record<string, any> = {
eq: value,
neq: { not: value },
gt: { gt: value },
gte: { gte: value },
lt: { lt: value },
lte: { lte: value },
contains: { contains: value, mode: 'insensitive' },
in: { in: value },
isNull: null,
isNotNull: { not: null },
};
return { [field]: operators[operator] ?? value };
}
}
2. Campaign Service
// libs/crm/campaigns/src/services/campaign.service.ts
@Injectable()
export class CampaignService {
constructor(
private readonly prisma: PrismaService,
private readonly segmentService: SegmentationService,
private readonly schedulerService: CampaignSchedulerService,
private readonly messagingClient: MessagingServiceClient,
) {}
async create(dto: CreateCampaignDto): Promise<Campaign> {
return this.prisma.campaign.create({
data: {
tenantId: dto.tenantId,
clubId: dto.clubId,
name: dto.name,
description: dto.description,
type: dto.type,
status: 'DRAFT',
segmentId: dto.segmentId,
channels: dto.channels,
emailTemplateId: dto.emailTemplateId,
smsTemplateId: dto.smsTemplateId,
pushTemplateId: dto.pushTemplateId,
timezone: dto.timezone ?? 'Africa/Johannesburg',
createdBy: dto.createdBy,
},
});
}
async schedule(campaignId: string, scheduledAt: Date): Promise<Campaign> {
const campaign = await this.prisma.campaign.findUnique({
where: { id: campaignId },
include: { segment: true },
});
if (!campaign) throw new NotFoundException('Campaign not found');
if (campaign.status !== 'DRAFT') {
throw new BadRequestException('Can only schedule draft campaigns');
}
// Refresh segment and sync to messaging
if (campaign.segmentId) {
await this.segmentService.refreshSegment(campaign.segmentId);
await this.segmentService.syncToMessaging(campaign.segmentId);
}
const targetCount = campaign.segment?.memberCount ?? 0;
const updated = await this.prisma.campaign.update({
where: { id: campaignId },
data: {
status: 'SCHEDULED',
scheduledAt,
targetCount,
},
});
// Schedule the execution job
await this.schedulerService.scheduleCampaign(updated);
return updated;
}
async execute(campaignId: string): Promise<void> {
const campaign = await this.prisma.campaign.findUnique({
where: { id: campaignId },
include: {
segment: {
include: {
memberships: {
include: { customer: true },
},
},
},
},
});
if (!campaign) throw new NotFoundException('Campaign not found');
await this.prisma.campaign.update({
where: { id: campaignId },
data: { status: 'ACTIVE', startedAt: new Date() },
});
try {
const recipients = campaign.segment?.memberships.map(m => m.customer) ?? [];
// Send via each channel
for (const channel of campaign.channels) {
await this.sendViaChannel(campaign, recipients, channel);
}
await this.prisma.campaign.update({
where: { id: campaignId },
data: { status: 'COMPLETED', completedAt: new Date() },
});
} catch (error) {
await this.prisma.campaign.update({
where: { id: campaignId },
data: { status: 'PAUSED' },
});
throw error;
}
}
private async sendViaChannel(
campaign: Campaign,
recipients: CustomerProfile[],
channel: Channel,
): Promise<void> {
switch (channel) {
case 'EMAIL':
await this.messagingClient.sendBulkEmail({
templateId: campaign.emailTemplateId,
recipients: recipients
.filter(r => r.emailOptIn && r.email)
.map(r => ({
email: r.email,
data: { firstName: r.firstName, lastName: r.lastName },
})),
campaignId: campaign.id,
});
break;
case 'SMS':
await this.messagingClient.sendBulkSms({
templateId: campaign.smsTemplateId,
recipients: recipients
.filter(r => r.smsOptIn && r.phoneNumber)
.map(r => ({
phone: r.phoneNumber,
data: { firstName: r.firstName },
})),
campaignId: campaign.id,
});
break;
}
// Record sent interactions
for (const recipient of recipients) {
await this.prisma.campaignInteraction.create({
data: {
tenantId: campaign.tenantId,
campaignId: campaign.id,
customerId: recipient.id,
channel,
type: 'SENT',
},
});
}
}
async pause(campaignId: string): Promise<Campaign> {
const campaign = await this.prisma.campaign.findUnique({ where: { id: campaignId } });
if (campaign?.status !== 'ACTIVE' && campaign?.status !== 'SCHEDULED') {
throw new BadRequestException('Campaign cannot be paused');
}
await this.schedulerService.cancelScheduledCampaign(campaignId);
return this.prisma.campaign.update({
where: { id: campaignId },
data: { status: 'PAUSED' },
});
}
async getStats(campaignId: string): Promise<CampaignStats> {
const campaign = await this.prisma.campaign.findUnique({
where: { id: campaignId },
});
if (!campaign) throw new NotFoundException('Campaign not found');
const interactions = await this.prisma.campaignInteraction.groupBy({
by: ['type'],
where: { campaignId },
_count: true,
});
const counts = Object.fromEntries(
interactions.map(i => [i.type.toLowerCase() + 'Count', i._count])
);
return {
campaignId,
targetCount: campaign.targetCount,
...counts,
deliveryRate: counts.deliveredCount / Math.max(counts.sentCount, 1),
openRate: counts.openedCount / Math.max(counts.deliveredCount, 1),
clickRate: counts.clickedCount / Math.max(counts.openedCount, 1),
};
}
}
3. Campaign Scheduler
// libs/crm/campaigns/src/services/campaign-scheduler.service.ts
@Injectable()
export class CampaignSchedulerService {
constructor(
@InjectQueue('campaign-execution') private readonly queue: Queue,
private readonly prisma: PrismaService,
) {}
async scheduleCampaign(campaign: Campaign): Promise<void> {
const delay = campaign.scheduledAt.getTime() - Date.now();
if (delay <= 0) {
// Execute immediately
await this.queue.add('execute', { campaignId: campaign.id });
} else {
// Schedule for later
await this.queue.add(
'execute',
{ campaignId: campaign.id },
{ delay, jobId: `campaign-${campaign.id}` },
);
}
}
async cancelScheduledCampaign(campaignId: string): Promise<void> {
const job = await this.queue.getJob(`campaign-${campaignId}`);
if (job) {
await job.remove();
}
}
}
@Processor('campaign-execution')
export class CampaignExecutionProcessor extends WorkerHost {
constructor(private readonly campaignService: CampaignService) {
super();
}
async process(job: Job<{ campaignId: string }>): Promise<void> {
await this.campaignService.execute(job.data.campaignId);
}
}
4. Segment Refresh Job
// libs/crm/campaigns/src/jobs/segment-refresh.job.ts
@Injectable()
export class SegmentRefreshJob {
constructor(
private readonly prisma: PrismaService,
private readonly segmentService: SegmentationService,
) {}
@Cron('0 * * * *') // Every hour
async refreshHourlySegments(): Promise<void> {
const segments = await this.prisma.customerSegment.findMany({
where: {
type: 'DYNAMIC',
refreshFrequency: 'HOURLY',
isActive: true,
},
});
for (const segment of segments) {
await this.segmentService.refreshSegment(segment.id);
}
}
@Cron('0 2 * * *') // Daily at 2 AM
async refreshDailySegments(): Promise<void> {
const segments = await this.prisma.customerSegment.findMany({
where: {
type: 'DYNAMIC',
refreshFrequency: 'DAILY',
isActive: true,
},
});
for (const segment of segments) {
await this.segmentService.refreshSegment(segment.id);
}
}
}
Next Steps
After completing Phase 4, proceed to Phase 5: Marketing Automation.