Skip to main content

Phase 5: Marketing Automation

Journey builder, event triggers, and step execution.


Deliverables

  • Journey CRUD
  • Journey step management
  • Journey engine (enrollment, step execution)
  • Event triggers for journey enrollment
  • Wait step processing (cron)
  • Condition evaluation
  • Journey analytics

1. Journey Service

// libs/crm/automation/src/services/journey.service.ts

/**
 * CRUD and lifecycle operations (create, add step, activate, pause) for
 * marketing journeys.
 */
@Injectable()
export class JourneyService {
  constructor(
    private readonly prisma: PrismaService,
    private readonly journeyEngine: JourneyEngine,
  ) {}

  /**
   * Persists a new journey.
   *
   * `allowReentry` defaults to `false` and `timezone` to
   * `'Africa/Johannesburg'` when the DTO omits them. No status is written
   * here; presumably the schema defaults it to `DRAFT` — TODO confirm.
   *
   * @param dto Journey attributes supplied by the caller.
   * @returns The created journey record.
   */
  async create(dto: CreateJourneyDto): Promise<Journey> {
    return this.prisma.journey.create({
      data: {
        tenantId: dto.tenantId,
        clubId: dto.clubId,
        name: dto.name,
        description: dto.description,
        triggerType: dto.triggerType,
        triggerConfig: dto.triggerConfig,
        allowReentry: dto.allowReentry ?? false,
        timezone: dto.timezone ?? 'Africa/Johannesburg',
        createdBy: dto.createdBy,
      },
    });
  }

  /**
   * Appends a step to a journey that is still in `DRAFT`.
   *
   * @param journeyId Journey to extend.
   * @param dto Step definition, including its outgoing links.
   * @returns The created step.
   * @throws NotFoundException when the journey does not exist.
   * @throws BadRequestException when the journey is not in `DRAFT`.
   */
  async addStep(journeyId: string, dto: CreateJourneyStepDto): Promise<JourneyStep> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
      include: { steps: true },
    });

    if (!journey) throw new NotFoundException('Journey not found');
    if (journey.status !== 'DRAFT') {
      throw new BadRequestException('Cannot add steps to active journey');
    }

    // Number after the highest existing step rather than after the count, so
    // a gap in the numbering can never produce a duplicate stepNumber. For
    // contiguous 1..n numbering this is identical to `steps.length + 1`.
    const highestStepNumber = journey.steps.reduce(
      (highest, existing) => Math.max(highest, existing.stepNumber),
      0,
    );
    const stepNumber = highestStepNumber + 1;

    return this.prisma.journeyStep.create({
      data: {
        journeyId,
        stepNumber,
        type: dto.type,
        name: dto.name,
        config: dto.config,
        nextStepId: dto.nextStepId,
        nextStepYesId: dto.nextStepYesId,
        nextStepNoId: dto.nextStepNoId,
      },
    });
  }

  /**
   * Marks a journey `ACTIVE`, stamps `activatedAt`, and asks the engine to
   * register its trigger.
   *
   * @param journeyId Journey to activate.
   * @returns The updated journey.
   * @throws NotFoundException when the journey does not exist.
   * @throws BadRequestException when the journey has no steps.
   */
  async activate(journeyId: string): Promise<Journey> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
      include: { steps: true },
    });

    if (!journey) throw new NotFoundException('Journey not found');
    if (journey.steps.length === 0) {
      throw new BadRequestException('Journey must have at least one step');
    }

    const updated = await this.prisma.journey.update({
      where: { id: journeyId },
      data: { status: 'ACTIVE', activatedAt: new Date() },
    });

    // Register trigger listener.
    // NOTE(review): confirm JourneyEngine actually exposes registerTrigger;
    // the in-memory trigger map may live in a different service.
    await this.journeyEngine.registerTrigger(updated);

    return updated;
  }

  /**
   * Marks a journey `PAUSED` and stamps `pausedAt`.
   *
   * @param journeyId Journey to pause.
   * @returns The updated journey.
   * @throws NotFoundException when the journey does not exist.
   */
  async pause(journeyId: string): Promise<Journey> {
    // Look the journey up first so a missing id surfaces as the same
    // NotFoundException used by addStep/activate instead of a raw
    // "record to update not found" error from the ORM.
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
    });
    if (!journey) throw new NotFoundException('Journey not found');

    return this.prisma.journey.update({
      where: { id: journeyId },
      data: { status: 'PAUSED', pausedAt: new Date() },
    });
  }
}

2. Journey Engine

// libs/crm/automation/src/services/journey-engine.service.ts

/**
 * Executes journeys: enrolls customers, runs individual steps, and decides
 * which step comes next.
 */
@Injectable()
export class JourneyEngine {
  private readonly logger = new Logger(JourneyEngine.name);

  constructor(
    private readonly prisma: PrismaService,
    @InjectQueue('journey-steps') private readonly stepQueue: Queue,
    private readonly messagingClient: MessagingServiceClient,
  ) {}

  /**
   * Enrolls a customer in an active journey and queues its first step.
   *
   * When the journey disallows re-entry and an enrollment already exists for
   * this journey/customer pair, that enrollment is returned unchanged.
   *
   * @param journeyId Journey to enroll into.
   * @param customerId Customer profile id.
   * @param triggerData Optional payload stored on the enrollment.
   * @returns The new (or pre-existing) enrollment.
   * @throws BadRequestException when the journey is missing or not `ACTIVE`.
   */
  async enrollCustomer(
    journeyId: string,
    customerId: string,
    triggerData?: any,
  ): Promise<JourneyEnrollment> {
    const journey = await this.prisma.journey.findUnique({
      where: { id: journeyId },
      include: { steps: { orderBy: { stepNumber: 'asc' } } },
    });

    if (!journey || journey.status !== 'ACTIVE') {
      throw new BadRequestException('Journey not active');
    }

    // Check for existing enrollment (unless reentry allowed)
    if (!journey.allowReentry) {
      const existing = await this.prisma.journeyEnrollment.findUnique({
        where: { journeyId_customerId: { journeyId, customerId } },
      });
      if (existing) {
        this.logger.log(`Customer ${customerId} already enrolled in journey ${journeyId}`);
        return existing;
      }
    }

    // Steps are ordered by stepNumber, so index 0 is the entry step.
    const firstStep = journey.steps[0];

    const enrollment = await this.prisma.journeyEnrollment.create({
      data: {
        tenantId: journey.tenantId,
        journeyId,
        customerId,
        status: 'ACTIVE',
        currentStepId: firstStep?.id,
        currentStepNumber: 1,
        triggerData,
      },
    });

    // Update journey stats
    await this.prisma.journey.update({
      where: { id: journeyId },
      data: { totalEnrolled: { increment: 1 } },
    });

    // Queue first step
    if (firstStep) {
      await this.queueStep(enrollment.id, firstStep.id);
    }

    this.logger.log(`Enrolled customer ${customerId} in journey ${journeyId}`);

    return enrollment;
  }

  /**
   * Runs one step for one enrollment and advances the enrollment.
   *
   * Does nothing when the enrollment or step is missing, or when the
   * enrollment is not `ACTIVE`. A WAIT step that has a successor parks the
   * enrollment on that step instead of queueing the successor. Failures are
   * logged and recorded on the enrollment step as `FAILED`; they are not
   * rethrown.
   *
   * @param enrollmentId Enrollment being advanced.
   * @param stepId Step to execute.
   */
  async processStep(enrollmentId: string, stepId: string): Promise<void> {
    const enrollment = await this.prisma.journeyEnrollment.findUnique({
      where: { id: enrollmentId },
      include: { journey: true, customer: true },
    });

    const step = await this.prisma.journeyStep.findUnique({
      where: { id: stepId },
    });

    if (!enrollment || !step || enrollment.status !== 'ACTIVE') return;

    const enrollmentStepKey = { enrollmentId_stepId: { enrollmentId, stepId } };

    // Record step entry
    await this.prisma.journeyEnrollmentStep.upsert({
      where: enrollmentStepKey,
      create: {
        enrollmentId,
        stepId,
        status: 'IN_PROGRESS',
      },
      update: {
        status: 'IN_PROGRESS',
      },
    });

    try {
      const result = await this.executeStep(enrollment, step);

      // Record completion
      await this.prisma.journeyEnrollmentStep.update({
        where: enrollmentStepKey,
        data: { status: 'COMPLETED', completedAt: new Date(), result },
      });

      // A WAIT step only schedules `waitingUntil`. Queueing its successor now
      // would skip the delay entirely, so park the enrollment on this step and
      // leave it to whatever polls for elapsed `waitingUntil` values to resume
      // from `currentStepId`. A WAIT step without a successor has nothing to
      // resume into and falls through to normal completion below.
      if (step.type === 'WAIT' && step.nextStepId) {
        if (enrollment.currentStepId !== step.id) {
          // Keep the resume pointer on the step that is actually waiting.
          await this.prisma.journeyEnrollment.update({
            where: { id: enrollmentId },
            data: { currentStepId: step.id },
          });
        }
        return;
      }

      // Determine next step
      const nextStepId = this.determineNextStep(step, result);

      if (nextStepId) {
        await this.prisma.journeyEnrollment.update({
          where: { id: enrollmentId },
          data: {
            currentStepId: nextStepId,
            currentStepNumber: { increment: 1 },
          },
        });
        await this.queueStep(enrollmentId, nextStepId);
      } else {
        // Journey complete
        await this.completeEnrollment(enrollmentId);
      }
    } catch (error: unknown) {
      // Anything can be thrown; only Error instances are known to carry a message.
      const message = error instanceof Error ? error.message : String(error);
      this.logger.error(`Step ${stepId} failed for enrollment ${enrollmentId}: ${message}`);
      await this.prisma.journeyEnrollmentStep.update({
        where: enrollmentStepKey,
        data: { status: 'FAILED', result: { error: message } },
      });
    }
  }

  /**
   * Dispatches a step to the handler for its type.
   *
   * @returns The handler's result object.
   * @throws Error when the step type is not recognised.
   */
  private async executeStep(
    enrollment: JourneyEnrollment & { customer: CustomerProfile },
    step: JourneyStep,
  ): Promise<any> {
    const config = step.config as any;

    switch (step.type) {
      case 'SEND':
        return this.executeSendStep(enrollment, config);

      case 'WAIT':
        return this.executeWaitStep(enrollment, config);

      case 'CONDITION':
        return this.executeConditionStep(enrollment, config);

      case 'UPDATE':
        return this.executeUpdateStep(enrollment, config);

      case 'END':
        return { completed: true };

      default:
        throw new Error(`Unknown step type: ${step.type}`);
    }
  }

  /**
   * Sends a templated message to the enrolled customer through the messaging
   * client. Trigger data is spread after the name fields, so keys in it
   * override `firstName`/`lastName`.
   */
  private async executeSendStep(
    enrollment: JourneyEnrollment & { customer: CustomerProfile },
    config: { channel: string; templateId: string },
  ): Promise<any> {
    await this.messagingClient.send({
      channel: config.channel,
      templateId: config.templateId,
      recipient: {
        email: enrollment.customer.email,
        phone: enrollment.customer.phoneNumber,
      },
      data: {
        firstName: enrollment.customer.firstName,
        lastName: enrollment.customer.lastName,
        ...enrollment.triggerData,
      },
    });

    return { sent: true, channel: config.channel };
  }

  /**
   * Stores on the enrollment the instant at which the wait elapses.
   *
   * @throws Error when `config.duration` cannot be parsed.
   */
  private async executeWaitStep(
    enrollment: JourneyEnrollment,
    config: { duration: string },
  ): Promise<any> {
    const waitUntil = this.calculateWaitUntil(config.duration);

    await this.prisma.journeyEnrollment.update({
      where: { id: enrollment.id },
      data: { waitingUntil: waitUntil },
    });

    return { waitingUntil };
  }

  /**
   * Converts a duration such as `"30 minutes"` or `"2 days"` into an absolute
   * instant measured from now. Accepted units are minute, hour, day and week,
   * case-insensitive, with an optional trailing `s`.
   *
   * @throws Error when the text does not match or the unit is unknown.
   */
  private calculateWaitUntil(duration: string): Date {
    const now = new Date();
    const match = duration.match(/^(\d+)\s*(minute|hour|day|week)s?$/i);

    if (!match) throw new Error(`Invalid duration: ${duration}`);

    const amount = match[1] ?? '';
    const unit = (match[2] ?? '').toLowerCase();
    const num = parseInt(amount, 10);

    const millisPerUnit: Record<string, number> = {
      minute: 60 * 1000,
      hour: 60 * 60 * 1000,
      day: 24 * 60 * 60 * 1000,
      week: 7 * 24 * 60 * 60 * 1000,
    };

    const unitMillis = millisPerUnit[unit];
    if (unitMillis === undefined) throw new Error(`Unknown unit: ${unit}`);

    return new Date(now.getTime() + num * unitMillis);
  }

  /** Evaluates the step's rules against the customer and wraps the verdict. */
  private async executeConditionStep(
    enrollment: JourneyEnrollment & { customer: CustomerProfile },
    config: { rules: any[]; combinator: string },
  ): Promise<{ result: boolean }> {
    const result = this.evaluateCondition(enrollment.customer, config);
    return { result };
  }

  /**
   * Applies every rule to the matching customer field and combines the
   * outcomes: any-match when the combinator is `'OR'`, all-match otherwise.
   * A missing rule list is treated as empty.
   */
  private evaluateCondition(
    customer: CustomerProfile,
    config: { rules: any[]; combinator: string },
  ): boolean {
    const results = (config.rules ?? []).map(rule => {
      const value = customer[rule.field as keyof CustomerProfile];
      return this.evaluateRule(value, rule.operator, rule.value);
    });

    return config.combinator === 'OR'
      ? results.some(r => r)
      : results.every(r => r);
  }

  /**
   * Compares one actual value with one expected value.
   *
   * Ordering operators return `false` when either side is null/undefined;
   * otherwise JavaScript would coerce `null` to 0 and, for example, report a
   * missing field as "less than 5". Unknown operators return `false`.
   */
  private evaluateRule(actual: any, operator: string, expected: any): boolean {
    const comparable = actual != null && expected != null;

    switch (operator) {
      case 'eq':
        return actual === expected;
      case 'neq':
        return actual !== expected;
      case 'gt':
        return comparable && actual > expected;
      case 'gte':
        return comparable && actual >= expected;
      case 'lt':
        return comparable && actual < expected;
      case 'lte':
        return comparable && actual <= expected;
      case 'isNull':
        return actual == null;
      case 'isNotNull':
        return actual != null;
      default:
        return false;
    }
  }

  /**
   * Writes the configured field updates onto the enrolled customer's profile.
   * A `tags` update with operation `'add'` is sent as a `push`; every other
   * update is a plain assignment.
   *
   * @returns The list of field names named in the configuration.
   */
  private async executeUpdateStep(
    enrollment: JourneyEnrollment,
    config: { updates: Array<{ field: string; value: any; operation?: string }> },
  ): Promise<any> {
    const updates: Record<string, any> = {};

    for (const update of config.updates) {
      if (update.field === 'tags' && update.operation === 'add') {
        updates.tags = { push: update.value };
      } else {
        updates[update.field] = update.value;
      }
    }

    await this.prisma.customerProfile.update({
      where: { id: enrollment.customerId },
      data: updates,
    });

    return { updated: config.updates.map(u => u.field) };
  }

  /**
   * Picks the id of the step that follows `step`: the yes/no branch for a
   * CONDITION step, nothing for an END step, and `nextStepId` otherwise.
   */
  private determineNextStep(step: JourneyStep, result: any): string | null {
    if (step.type === 'CONDITION') {
      return result.result ? step.nextStepYesId : step.nextStepNoId;
    }
    if (step.type === 'END') {
      return null;
    }
    return step.nextStepId;
  }

  /** Adds a `process-step` job for the given enrollment/step pair to the queue. */
  private async queueStep(enrollmentId: string, stepId: string): Promise<void> {
    await this.stepQueue.add('process-step', { enrollmentId, stepId });
  }

  /** Marks the enrollment `COMPLETED` and bumps the journey's completion counter. */
  private async completeEnrollment(enrollmentId: string): Promise<void> {
    const enrollment = await this.prisma.journeyEnrollment.update({
      where: { id: enrollmentId },
      data: { status: 'COMPLETED', completedAt: new Date() },
    });

    await this.prisma.journey.update({
      where: { id: enrollment.journeyId },
      data: { totalCompleted: { increment: 1 } },
    });
  }
}

3. Event Trigger Service

// libs/crm/automation/src/services/event-trigger.service.ts

/**
 * Keeps an in-memory index of active journeys by trigger type and enrolls
 * customers when a matching CRM event arrives.
 */
@Injectable()
export class EventTriggerService implements OnModuleInit {
  private triggerMap: Map<string, Journey[]> = new Map();

  constructor(
    private readonly prisma: PrismaService,
    private readonly journeyEngine: JourneyEngine,
    private readonly identityService: IdentityResolutionService,
  ) {}

  /** Populates the trigger index when the module starts. */
  async onModuleInit() {
    await this.loadTriggers();
  }

  /**
   * Rebuilds the trigger index from all journeys currently `ACTIVE`,
   * discarding whatever was cached before.
   */
  async loadTriggers(): Promise<void> {
    const journeys = await this.prisma.journey.findMany({
      where: { status: 'ACTIVE' },
    });

    this.triggerMap.clear();

    for (const journey of journeys) {
      const triggers = this.triggerMap.get(journey.triggerType) ?? [];
      triggers.push(journey);
      this.triggerMap.set(journey.triggerType, triggers);
    }
  }

  /**
   * Enrolls the event's customer into every cached journey whose trigger type
   * and trigger conditions match the event.
   *
   * The customer identity is resolved at most once per event, on the first
   * matching journey. A failure for one journey does not stop the remaining
   * journeys from being attempted; the first failure is rethrown afterwards.
   *
   * @param event Incoming CRM event.
   * @throws Whatever the first failing resolution or enrollment threw.
   */
  async handleEvent(event: CrmEvent): Promise<void> {
    const journeys = this.triggerMap.get(event.type) ?? [];

    let profileId: string | undefined;
    let hasFailure = false;
    let firstFailure: unknown;

    for (const journey of journeys) {
      if (!this.matchesTriggerConditions(event, journey.triggerConfig)) continue;

      try {
        if (profileId === undefined) {
          // Same identity input for every journey, so resolve it only once.
          const { profile } = await this.identityService.resolve({
            tenantId: event.tenantId,
            ...event.identity,
          });
          profileId = profile.id;
        }

        await this.journeyEngine.enrollCustomer(
          journey.id,
          profileId,
          event.payload,
        );
      } catch (error: unknown) {
        // The cache can hold a journey that is no longer ACTIVE, in which case
        // enrollment throws. Remember the first error and keep going so the
        // other journeys still receive this event.
        if (!hasFailure) {
          hasFailure = true;
          firstFailure = error;
        }
      }
    }

    if (hasFailure) throw firstFailure;
  }

  /**
   * Returns true when the trigger config has no conditions, or when every
   * condition holds for the corresponding field of the event payload.
   */
  private matchesTriggerConditions(event: CrmEvent, config: any): boolean {
    if (!config?.conditions?.length) return true;

    return config.conditions.every(
      (condition: { field: string; operator: string; value: unknown }) => {
        // A payload-less event yields undefined for every field instead of throwing.
        const value = event.payload?.[condition.field];
        return this.evaluateCondition(value, condition.operator, condition.value);
      },
    );
  }

  /**
   * Compares one payload value with the value expected by a trigger condition.
   *
   * Supports eq, neq, gt, gte, lt, lte, isNull and isNotNull. Ordering
   * operators return `false` when either side is null/undefined; unknown
   * operators return `false`.
   */
  private evaluateCondition(actual: any, operator: string, expected: any): boolean {
    const comparable = actual != null && expected != null;

    switch (operator) {
      case 'eq':
        return actual === expected;
      case 'neq':
        return actual !== expected;
      case 'gt':
        return comparable && actual > expected;
      case 'gte':
        return comparable && actual >= expected;
      case 'lt':
        return comparable && actual < expected;
      case 'lte':
        return comparable && actual <= expected;
      case 'isNull':
        return actual == null;
      case 'isNotNull':
        return actual != null;
      default:
        return false;
    }
  }
}

4. Wait Step Processor

// libs/crm/automation/src/jobs/wait-step.job.ts

/**
 * Scheduled job that resumes enrollments whose wait period has elapsed.
 */
@Injectable()
export class WaitStepJob {
  constructor(
    private readonly prisma: PrismaService,
    private readonly journeyEngine: JourneyEngine,
  ) {}

  /**
   * Finds `ACTIVE` enrollments whose `waitingUntil` is now or in the past,
   * clears the wait marker, and runs the step that follows the enrollment's
   * current step.
   *
   * The marker is cleared even when no following step can be resolved, so the
   * enrollment is not selected again on every tick. A failure on one
   * enrollment does not stop the rest of the batch; the first failure is
   * rethrown once the batch has been attempted.
   *
   * @throws Whatever the first failing enrollment threw.
   */
  @Cron('* * * * *') // Every minute
  async processWaitingEnrollments(): Promise<void> {
    const waitingEnrollments = await this.prisma.journeyEnrollment.findMany({
      where: {
        status: 'ACTIVE',
        waitingUntil: { lte: new Date() },
      },
      include: {
        journey: { include: { steps: true } },
      },
    });

    let hasFailure = false;
    let firstFailure: unknown;

    for (const enrollment of waitingEnrollments) {
      try {
        const currentStep = enrollment.journey.steps.find(
          s => s.id === enrollment.currentStepId,
        );
        const nextStepId = currentStep?.nextStepId;

        if (!nextStepId) {
          // Nothing to resume into: drop the marker so this row stops matching
          // the query above on every run.
          await this.prisma.journeyEnrollment.update({
            where: { id: enrollment.id },
            data: { waitingUntil: null },
          });
          continue;
        }

        // Clear the wait and move the enrollment's pointer onto the step about
        // to run, so currentStepId/currentStepNumber describe the step that is
        // actually being processed.
        await this.prisma.journeyEnrollment.update({
          where: { id: enrollment.id },
          data: {
            waitingUntil: null,
            currentStepId: nextStepId,
            currentStepNumber: { increment: 1 },
          },
        });

        await this.journeyEngine.processStep(enrollment.id, nextStepId);
      } catch (error: unknown) {
        // Keep working through the batch; surface the first error at the end.
        if (!hasFailure) {
          hasFailure = true;
          firstFailure = error;
        }
      }
    }

    if (hasFailure) throw firstFailure;
  }
}

Next Steps

After completing Phase 5, proceed to Phase 6: Social Integration.