Phase 5: Marketing Automation
This phase delivers the journey builder, event triggers, and step execution.
Deliverables
- Journey CRUD
- Journey step management
- Journey engine (enrollment, step execution)
- Event triggers for journey enrollment
- Wait step processing (cron)
- Condition evaluation
- Journey analytics
1. Journey Service
// libs/crm/automation/src/services/journey.service.ts
/**
 * Manages journey definitions: creating journeys, appending steps while a
 * journey is still a draft, and switching a journey between ACTIVE and PAUSED.
 */
@Injectable()
export class JourneyService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly journeyEngine: JourneyEngine,
  ) {}

  /**
   * Persists a new journey built from the DTO.
   *
   * `allowReentry` defaults to `false` and `timezone` to
   * `'Africa/Johannesburg'` when the DTO omits them.
   *
   * @param dto Journey attributes supplied by the caller.
   * @returns The created journey row.
   */
  async create(dto: CreateJourneyDto): Promise<Journey> {
    return this.prisma.journey.create({
      data: {
        tenantId: dto.tenantId,
        clubId: dto.clubId,
        name: dto.name,
        description: dto.description,
        triggerType: dto.triggerType,
        triggerConfig: dto.triggerConfig,
        allowReentry: dto.allowReentry ?? false,
        timezone: dto.timezone ?? 'Africa/Johannesburg',
        createdBy: dto.createdBy,
      },
    });
  }

  /**
   * Appends a step to a journey that is still in DRAFT status.
   *
   * @param journeyId Journey that receives the step.
   * @param dto Step type, name, config and outgoing links.
   * @returns The created step.
   * @throws NotFoundException when the journey does not exist.
   * @throws BadRequestException when the journey is not in DRAFT status.
   */
  async addStep(journeyId: string, dto: CreateJourneyStepDto): Promise<JourneyStep> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
      include: { steps: true },
    });
    if (!journey) throw new NotFoundException('Journey not found');
    if (journey.status !== 'DRAFT') {
      throw new BadRequestException('Cannot add steps to active journey');
    }

    // Number from the highest existing step rather than the step count, so a
    // journey that has had a step removed never hands out a duplicate number.
    const highestStepNumber = journey.steps.reduce(
      (highest, existing) => Math.max(highest, existing.stepNumber),
      0,
    );

    return this.prisma.journeyStep.create({
      data: {
        journeyId,
        stepNumber: highestStepNumber + 1,
        type: dto.type,
        name: dto.name,
        config: dto.config,
        nextStepId: dto.nextStepId,
        nextStepYesId: dto.nextStepYesId,
        nextStepNoId: dto.nextStepNoId,
      },
    });
  }

  /**
   * Marks a journey ACTIVE and registers its trigger with the engine.
   *
   * Activating a journey that is already ACTIVE leaves the stored row
   * untouched (so `activatedAt` keeps its original value) but still
   * re-registers the trigger.
   *
   * @param journeyId Journey to activate.
   * @returns The active journey.
   * @throws NotFoundException when the journey does not exist.
   * @throws BadRequestException when the journey has no steps.
   */
  async activate(journeyId: string): Promise<Journey> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
      include: { steps: true },
    });
    if (!journey) throw new NotFoundException('Journey not found');
    if (journey.steps.length === 0) {
      throw new BadRequestException('Journey must have at least one step');
    }

    const active =
      journey.status === 'ACTIVE'
        ? journey
        : await this.prisma.journey.update({
            where: { id: journeyId },
            data: { status: 'ACTIVE', activatedAt: new Date() },
          });

    // Register trigger listener.
    // NOTE(review): the JourneyEngine class in this document does not declare
    // registerTrigger — confirm where it is implemented before relying on it.
    await this.journeyEngine.registerTrigger(active);
    return active;
  }

  /**
   * Marks an ACTIVE journey as PAUSED.
   *
   * Pausing a journey that is already PAUSED is a no-op that returns the
   * stored row, so `pausedAt` keeps the time of the first pause.
   *
   * @param journeyId Journey to pause.
   * @returns The paused journey.
   * @throws NotFoundException when the journey does not exist.
   * @throws BadRequestException when the journey is neither ACTIVE nor PAUSED.
   */
  async pause(journeyId: string): Promise<Journey> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
    });
    if (!journey) throw new NotFoundException('Journey not found');
    if (journey.status === 'PAUSED') return journey;
    if (journey.status !== 'ACTIVE') {
      throw new BadRequestException('Only active journeys can be paused');
    }

    return this.prisma.journey.update({
      where: { id: journeyId },
      data: { status: 'PAUSED', pausedAt: new Date() },
    });
  }
}
2. Journey Engine
// libs/crm/automation/src/services/journey-engine.service.ts
/**
 * Runtime for journeys: enrolls customers, executes individual steps
 * (SEND, WAIT, CONDITION, UPDATE, END) and moves enrollments from step to step
 * through the `journey-steps` queue.
 */
@Injectable()
export class JourneyEngine {
  private readonly logger = new Logger(JourneyEngine.name);

  /** Milliseconds per unit accepted in a WAIT step's `duration`. */
  private static readonly WAIT_UNIT_MS: Readonly<Record<string, number>> = {
    minute: 60 * 1000,
    hour: 60 * 60 * 1000,
    day: 24 * 60 * 60 * 1000,
    week: 7 * 24 * 60 * 60 * 1000,
  };

  /** Profile fields an UPDATE step is never allowed to overwrite. */
  private static readonly PROTECTED_PROFILE_FIELDS: ReadonlySet<string> = new Set([
    'id',
    'tenantId',
    '__proto__',
  ]);

  constructor(
    private readonly prisma: PrismaService,
    @InjectQueue('journey-steps') private readonly stepQueue: Queue,
    private readonly messagingClient: MessagingServiceClient,
  ) {}

  /**
   * Enrolls a customer in an ACTIVE journey and queues its first step.
   *
   * Enrollments are looked up by the `(journeyId, customerId)` unique key, so
   * there is at most one row per customer and journey:
   * - no row yet: a new enrollment is created;
   * - a row exists and re-entry is disabled, or the row is still ACTIVE: the
   *   existing enrollment is returned unchanged;
   * - a row exists, re-entry is enabled and the row is no longer ACTIVE: the
   *   row is reset to the first step instead of inserting a second row, which
   *   the unique key would reject.
   *
   * @param journeyId Journey to enroll into.
   * @param customerId Customer profile being enrolled.
   * @param triggerData Optional payload of the event that caused enrollment.
   * @returns The new, restarted or pre-existing enrollment.
   * @throws BadRequestException when the journey is missing or not ACTIVE.
   */
  async enrollCustomer(
    journeyId: string,
    customerId: string,
    triggerData?: any,
  ): Promise<JourneyEnrollment> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
      include: { steps: { orderBy: { stepNumber: 'asc' } } },
    });
    if (!journey || journey.status !== 'ACTIVE') {
      throw new BadRequestException('Journey not active');
    }

    const existing = await this.prisma.journeyEnrollment.findUnique({
      where: { journeyId_customerId: { journeyId, customerId } },
    });
    if (existing && (!journey.allowReentry || existing.status === 'ACTIVE')) {
      this.logger.log(`Customer ${customerId} already enrolled in journey ${journeyId}`);
      return existing;
    }

    const firstStep = journey.steps[0];
    const enrollment = existing
      ? await this.prisma.journeyEnrollment.update({
          where: { id: existing.id },
          data: {
            status: 'ACTIVE',
            currentStepId: firstStep?.id ?? null,
            currentStepNumber: 1,
            triggerData,
            waitingUntil: null,
            completedAt: null,
          },
        })
      : await this.prisma.journeyEnrollment.create({
          data: {
            tenantId: journey.tenantId,
            journeyId,
            customerId,
            status: 'ACTIVE',
            currentStepId: firstStep?.id,
            currentStepNumber: 1,
            triggerData,
          },
        });

    // Update journey stats
    await this.prisma.journey.update({
      where: { id: journeyId },
      data: { totalEnrolled: { increment: 1 } },
    });

    this.logger.log(`Enrolled customer ${customerId} in journey ${journeyId}`);

    if (!firstStep) {
      // Nothing to execute: close the enrollment instead of leaving it ACTIVE
      // with no step that could ever finish it.
      return this.completeEnrollment(enrollment.id);
    }

    await this.queueStep(enrollment.id, firstStep.id);
    return enrollment;
  }

  /**
   * Executes one step for one enrollment and advances the enrollment.
   *
   * Does nothing when the enrollment or step is missing, the enrollment is not
   * ACTIVE, or the step belongs to a different journey. A WAIT step that has a
   * successor parks the enrollment: `waitingUntil` is stored and the successor
   * is NOT queued, so that whatever polls `waitingUntil` can resume it later.
   * Execution errors are logged and recorded on the step as FAILED; they are
   * not rethrown.
   *
   * @param enrollmentId Enrollment being advanced.
   * @param stepId Step to execute.
   */
  async processStep(enrollmentId: string, stepId: string): Promise<void> {
    const [enrollment, step] = await Promise.all([
      this.prisma.journeyEnrollment.findUnique({
        where: { id: enrollmentId },
        include: { journey: true, customer: true },
      }),
      this.prisma.journeyStep.findUnique({
        where: { id: stepId },
      }),
    ]);
    if (!enrollment || !step || enrollment.status !== 'ACTIVE') return;
    if (step.journeyId !== enrollment.journeyId) {
      this.logger.warn(
        `Step ${stepId} does not belong to journey ${enrollment.journeyId}; skipping enrollment ${enrollmentId}`,
      );
      return;
    }

    // Record step entry
    await this.prisma.journeyEnrollmentStep.upsert({
      where: { enrollmentId_stepId: { enrollmentId, stepId } },
      create: {
        enrollmentId,
        stepId,
        status: 'IN_PROGRESS',
      },
      update: {
        status: 'IN_PROGRESS',
      },
    });

    // Keep the pointer on the step actually running. Callers that invoke
    // processStep directly may not have moved it, and a parked WAIT step is
    // later resumed from this pointer.
    if (enrollment.currentStepId !== stepId) {
      await this.prisma.journeyEnrollment.update({
        where: { id: enrollmentId },
        data: { currentStepId: stepId },
      });
    }

    try {
      const result = await this.executeStep(enrollment, step);

      // Record completion
      await this.prisma.journeyEnrollmentStep.update({
        where: { enrollmentId_stepId: { enrollmentId, stepId } },
        data: { status: 'COMPLETED', completedAt: new Date(), result },
      });

      // A WAIT step has only scheduled the wake-up; advancing now would run the
      // successor immediately and defeat the wait.
      if (step.type === 'WAIT' && step.nextStepId) return;

      // Determine next step
      const nextStepId = this.determineNextStep(step, result);
      if (nextStepId) {
        await this.prisma.journeyEnrollment.update({
          where: { id: enrollmentId },
          data: {
            currentStepId: nextStepId,
            currentStepNumber: { increment: 1 },
          },
        });
        await this.queueStep(enrollmentId, nextStepId);
      } else {
        // Journey complete
        await this.completeEnrollment(enrollmentId);
      }
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      this.logger.error(`Step ${stepId} failed for enrollment ${enrollmentId}: ${message}`);
      await this.prisma.journeyEnrollmentStep.update({
        where: { enrollmentId_stepId: { enrollmentId, stepId } },
        data: { status: 'FAILED', result: { error: message } },
      });
    }
  }

  /**
   * Dispatches to the executor matching `step.type`.
   *
   * @returns The executor's result object, stored on the enrollment step.
   * @throws Error for a step type outside SEND, WAIT, CONDITION, UPDATE, END.
   */
  private async executeStep(
    enrollment: JourneyEnrollment & { customer: CustomerProfile },
    step: JourneyStep,
  ): Promise<any> {
    const config = step.config as any;
    switch (step.type) {
      case 'SEND':
        return this.executeSendStep(enrollment, config);
      case 'WAIT':
        return this.executeWaitStep(enrollment, config);
      case 'CONDITION':
        return this.executeConditionStep(enrollment, config);
      case 'UPDATE':
        return this.executeUpdateStep(enrollment, config);
      case 'END':
        return { completed: true };
      default:
        throw new Error(`Unknown step type: ${step.type}`);
    }
  }

  /**
   * Sends a templated message to the enrolled customer.
   *
   * Template data is the customer's first and last name overlaid with the
   * enrollment's trigger data; trigger data is merged only when it is a plain
   * object, so `null`, strings and arrays are ignored.
   */
  private async executeSendStep(
    enrollment: JourneyEnrollment & { customer: CustomerProfile },
    config: { channel: string; templateId: string },
  ): Promise<any> {
    const rawTriggerData: unknown = enrollment.triggerData;
    const triggerFields =
      rawTriggerData !== null && typeof rawTriggerData === 'object' && !Array.isArray(rawTriggerData)
        ? (rawTriggerData as Record<string, unknown>)
        : {};

    await this.messagingClient.send({
      channel: config.channel,
      templateId: config.templateId,
      recipient: {
        email: enrollment.customer.email,
        phone: enrollment.customer.phoneNumber,
      },
      data: {
        firstName: enrollment.customer.firstName,
        lastName: enrollment.customer.lastName,
        ...triggerFields,
      },
    });
    return { sent: true, channel: config.channel };
  }

  /** Stores the wake-up time derived from `config.duration` on the enrollment. */
  private async executeWaitStep(
    enrollment: JourneyEnrollment,
    config: { duration: string },
  ): Promise<any> {
    const waitUntil = this.calculateWaitUntil(config.duration);
    await this.prisma.journeyEnrollment.update({
      where: { id: enrollment.id },
      data: { waitingUntil: waitUntil },
    });
    return { waitingUntil: waitUntil };
  }

  /**
   * Converts a duration such as `"30 minutes"`, `"2 hours"` or `"1 week"` into
   * an absolute time measured from now. Units are fixed-length (a day is
   * always 24 hours); surrounding whitespace and letter case are ignored.
   *
   * @throws Error when the text does not match `<integer> <unit>[s]`.
   */
  private calculateWaitUntil(duration: string): Date {
    const match = duration.trim().match(/^(\d+)\s*(minute|hour|day|week)s?$/i);
    if (!match) throw new Error(`Invalid duration: ${duration}`);

    const [, amount = '', unit = ''] = match;
    const unitMs = JourneyEngine.WAIT_UNIT_MS[unit.toLowerCase()];
    if (unitMs === undefined) throw new Error(`Unknown unit: ${unit}`);

    return new Date(Date.now() + Number.parseInt(amount, 10) * unitMs);
  }

  /** Evaluates the step's rules against the customer profile. */
  private async executeConditionStep(
    enrollment: JourneyEnrollment & { customer: CustomerProfile },
    config: { rules: any[]; combinator: string },
  ): Promise<{ result: boolean }> {
    const result = this.evaluateCondition(enrollment.customer, config);
    return { result };
  }

  /**
   * Combines the outcome of every rule: any match for combinator `'OR'`,
   * all matches otherwise. A missing rule list is treated as empty.
   */
  private evaluateCondition(
    customer: CustomerProfile,
    config: { rules: any[]; combinator: string },
  ): boolean {
    const outcomes = (config.rules ?? []).map(rule => {
      const value = customer[rule.field as keyof CustomerProfile];
      return this.evaluateRule(value, rule.operator, rule.value);
    });
    return config.combinator === 'OR'
      ? outcomes.some(matched => matched)
      : outcomes.every(matched => matched);
  }

  /**
   * Applies a single comparison operator.
   *
   * Date values are compared by timestamp, and a string compared against a
   * Date is parsed as a date first, so rule values written as ISO strings work
   * against date fields. Ordering operators never match when either side is
   * null or undefined (plain JavaScript would coerce null to 0). Unknown
   * operators evaluate to `false`.
   */
  private evaluateRule(actual: any, operator: string, expected: any): boolean {
    const [left, right] = this.normalizeOperands(actual, expected);

    switch (operator) {
      case 'isNull':
        return actual == null;
      case 'isNotNull':
        return actual != null;
      case 'eq':
        return left === right;
      case 'neq':
        return left !== right;
      default:
        break;
    }

    if (left == null || right == null) return false;

    switch (operator) {
      case 'gt':
        return left > right;
      case 'gte':
        return left >= right;
      case 'lt':
        return left < right;
      case 'lte':
        return left <= right;
      default:
        return false;
    }
  }

  /** Maps Date operands (and date strings facing a Date) to epoch milliseconds. */
  private normalizeOperands(actual: any, expected: any): [any, any] {
    const toComparable = (value: any, counterpart: any): any => {
      if (value instanceof Date) return value.getTime();
      if (counterpart instanceof Date && typeof value === 'string') {
        const parsed = Date.parse(value);
        return Number.isNaN(parsed) ? value : parsed;
      }
      return value;
    };
    return [toComparable(actual, expected), toComparable(expected, actual)];
  }

  /**
   * Writes the configured field updates to the enrolled customer's profile.
   *
   * `tags` updates with `operation: 'add'` are collected and appended in one
   * `push`, so several tag additions in the same step all take effect. A later
   * plain assignment to `tags` replaces earlier additions, matching the order
   * in which the updates are listed.
   *
   * @returns `{ updated }` listing the field names from the config.
   * @throws Error when the config targets a protected field such as `id` or `tenantId`.
   */
  private async executeUpdateStep(
    enrollment: JourneyEnrollment,
    config: { updates: Array<{ field: string; value: any; operation?: string }> },
  ): Promise<any> {
    const requested = config.updates ?? [];
    const updates: Record<string, any> = {};
    let tagsToAdd: any[] = [];

    for (const update of requested) {
      if (JourneyEngine.PROTECTED_PROFILE_FIELDS.has(update.field)) {
        throw new Error(`Field "${update.field}" cannot be modified by a journey step`);
      }
      if (update.field === 'tags' && update.operation === 'add') {
        tagsToAdd.push(...(Array.isArray(update.value) ? update.value : [update.value]));
        delete updates.tags;
      } else {
        if (update.field === 'tags') tagsToAdd = [];
        updates[update.field] = update.value;
      }
    }
    if (tagsToAdd.length > 0) {
      updates.tags = { push: tagsToAdd };
    }

    await this.prisma.customerProfile.update({
      where: { id: enrollment.customerId },
      data: updates,
    });
    return { updated: requested.map(u => u.field) };
  }

  /**
   * Picks the step that follows `step`: the yes/no branch for CONDITION steps,
   * nothing for END steps, `nextStepId` otherwise.
   *
   * @returns The next step id, or `null` when the journey ends here.
   */
  private determineNextStep(step: JourneyStep, result: any): string | null {
    if (step.type === 'CONDITION') {
      return (result.result ? step.nextStepYesId : step.nextStepNoId) ?? null;
    }
    if (step.type === 'END') {
      return null;
    }
    return step.nextStepId ?? null;
  }

  /** Adds a `process-step` job for the given enrollment and step. */
  private async queueStep(enrollmentId: string, stepId: string): Promise<void> {
    await this.stepQueue.add('process-step', { enrollmentId, stepId });
  }

  /**
   * Marks the enrollment COMPLETED, clears any pending wait and increments the
   * journey's `totalCompleted` counter.
   *
   * @returns The updated enrollment.
   */
  private async completeEnrollment(enrollmentId: string): Promise<JourneyEnrollment> {
    const enrollment = await this.prisma.journeyEnrollment.update({
      where: { id: enrollmentId },
      data: { status: 'COMPLETED', completedAt: new Date(), waitingUntil: null },
    });
    await this.prisma.journey.update({
      where: { id: enrollment.journeyId },
      data: { totalCompleted: { increment: 1 } },
    });
    return enrollment;
  }
}
3. Event Trigger Service
// libs/crm/automation/src/services/event-trigger.service.ts
/**
 * Routes incoming CRM events to the active journeys they trigger.
 *
 * Active journeys are cached in memory, grouped by `triggerType`. The cache is
 * filled on module init; `loadTriggers()` has to be called again for journeys
 * activated or paused afterwards to be picked up.
 */
@Injectable()
export class EventTriggerService implements OnModuleInit {
  private triggerMap: Map<string, Journey[]> = new Map();

  constructor(
    private readonly prisma: PrismaService,
    private readonly journeyEngine: JourneyEngine,
    private readonly identityService: IdentityResolutionService,
  ) {}

  /** Loads the trigger cache when the module starts. */
  async onModuleInit(): Promise<void> {
    await this.loadTriggers();
  }

  /** Rebuilds the trigger cache from every journey whose status is ACTIVE. */
  async loadTriggers(): Promise<void> {
    const journeys = await this.prisma.journey.findMany({
      where: { status: 'ACTIVE' },
    });

    // Build the replacement map first and swap it in, so readers never see a
    // half-filled cache.
    const rebuilt = new Map<string, Journey[]>();
    for (const journey of journeys) {
      const sameType = rebuilt.get(journey.triggerType) ?? [];
      sameType.push(journey);
      rebuilt.set(journey.triggerType, sameType);
    }
    this.triggerMap = rebuilt;
  }

  /**
   * Enrolls the customer behind `event` into every cached journey that
   * - is triggered by `event.type`,
   * - belongs to the same tenant as the event, and
   * - has trigger conditions satisfied by the event payload.
   *
   * The customer identity is resolved once per event, and only when at least
   * one journey matches. Each matching journey is attempted even if an earlier
   * enrollment fails; the first enrollment error is rethrown after all
   * journeys have been tried.
   *
   * @param event Incoming CRM event.
   */
  async handleEvent(event: CrmEvent): Promise<void> {
    const matchingJourneys = (this.triggerMap.get(event.type) ?? []).filter(
      journey =>
        // The cache spans all tenants; never enroll across tenant boundaries.
        journey.tenantId === event.tenantId &&
        this.matchesTriggerConditions(event, journey.triggerConfig),
    );
    if (matchingJourneys.length === 0) return;

    const { profile } = await this.identityService.resolve({
      tenantId: event.tenantId,
      ...event.identity,
    });

    let hasFailure = false;
    let firstFailure: unknown;
    for (const journey of matchingJourneys) {
      try {
        await this.journeyEngine.enrollCustomer(journey.id, profile.id, event.payload);
      } catch (error: unknown) {
        if (!hasFailure) {
          hasFailure = true;
          firstFailure = error;
        }
      }
    }
    if (hasFailure) throw firstFailure;
  }

  /**
   * Checks the event payload against `config.conditions`.
   *
   * A config without a non-empty `conditions` array matches every event;
   * otherwise every condition must hold.
   */
  private matchesTriggerConditions(event: CrmEvent, config: any): boolean {
    const conditions: Array<{ field: string; operator: string; value: any }> =
      Array.isArray(config?.conditions) ? config.conditions : [];
    if (conditions.length === 0) return true;

    return conditions.every(condition => {
      const value = event.payload?.[condition.field];
      return this.evaluateCondition(value, condition.operator, condition.value);
    });
  }

  /**
   * Applies one comparison operator (`eq`, `neq`, `gt`, `gte`, `lt`, `lte`,
   * `isNull`, `isNotNull`) to a payload value.
   *
   * Ordering operators never match when either side is null or undefined
   * (plain JavaScript would coerce null to 0). Unknown operators evaluate to
   * `false`.
   */
  private evaluateCondition(actual: any, operator: string, expected: any): boolean {
    switch (operator) {
      case 'isNull':
        return actual == null;
      case 'isNotNull':
        return actual != null;
      case 'eq':
        return actual === expected;
      case 'neq':
        return actual !== expected;
      default:
        break;
    }

    if (actual == null || expected == null) return false;

    switch (operator) {
      case 'gt':
        return actual > expected;
      case 'gte':
        return actual >= expected;
      case 'lt':
        return actual < expected;
      case 'lte':
        return actual <= expected;
      default:
        return false;
    }
  }
}
4. Wait Step Processor
// libs/crm/automation/src/jobs/wait-step.job.ts
/**
 * Cron job that resumes enrollments whose wait has elapsed.
 *
 * Assumes that, while an enrollment is waiting, `currentStepId` points at the
 * WAIT step it is parked on — TODO confirm against the engine.
 */
@Injectable()
export class WaitStepJob {
  constructor(
    private readonly prisma: PrismaService,
    private readonly journeyEngine: JourneyEngine,
  ) {}

  /**
   * Finds ACTIVE enrollments whose `waitingUntil` is in the past and resumes
   * each one:
   * - the wait is claimed with a conditional update, so an overlapping run
   *   that already picked the enrollment up makes this run skip it;
   * - when the current step has a successor, the enrollment is moved onto it
   *   and the successor is processed;
   * - when the current step has no successor, the enrollment is completed
   *   rather than being polled again every minute;
   * - when the current step cannot be found, the wait is cleared and an error
   *   is raised for that enrollment.
   *
   * A failure on one enrollment does not stop the others; the first error is
   * rethrown after the whole batch has been attempted.
   */
  @Cron('* * * * *') // Every minute
  async processWaitingEnrollments(): Promise<void> {
    const dueEnrollments = await this.prisma.journeyEnrollment.findMany({
      where: {
        status: 'ACTIVE',
        waitingUntil: { lte: new Date() },
      },
      include: {
        journey: { include: { steps: true } },
      },
    });

    const failures: unknown[] = [];
    for (const enrollment of dueEnrollments) {
      try {
        const currentStep = enrollment.journey.steps.find(
          s => s.id === enrollment.currentStepId,
        );
        const nextStepId: string | null = currentStep?.nextStepId ?? null;

        // Claim the wait: matches only while waitingUntil still holds the value
        // read above, so a concurrent run cannot resume the same wait twice.
        const claim = await this.prisma.journeyEnrollment.updateMany({
          where: {
            id: enrollment.id,
            status: 'ACTIVE',
            waitingUntil: enrollment.waitingUntil,
          },
          data: nextStepId
            ? {
                waitingUntil: null,
                currentStepId: nextStepId,
                currentStepNumber: { increment: 1 },
              }
            : { waitingUntil: null },
        });
        if (claim.count === 0) continue;

        if (nextStepId) {
          await this.journeyEngine.processStep(enrollment.id, nextStepId);
        } else if (currentStep) {
          // The wait was the last step of the journey.
          await this.prisma.journeyEnrollment.update({
            where: { id: enrollment.id },
            data: { status: 'COMPLETED', completedAt: new Date() },
          });
          await this.prisma.journey.update({
            where: { id: enrollment.journeyId },
            data: { totalCompleted: { increment: 1 } },
          });
        } else {
          throw new Error(
            `Enrollment ${enrollment.id} is waiting on unknown step ${enrollment.currentStepId}`,
          );
        }
      } catch (error: unknown) {
        failures.push(error);
      }
    }

    if (failures.length > 0) throw failures[0];
  }
}
Next Steps
After completing Phase 5, proceed to Phase 6: Social Integration.