Skip to main content

ML-Powered Scoring & SMART Segmentation

This document specifies a true machine-learning (ML) scoring system that trains models on historical customer-behavior data, replacing the current rule-based weighted-formula approach.


Executive Summary

The current scoring system uses rule-based weighted formulas (Phase 8 Intelligence). This specification defines a true ML scoring system that:

  1. Trains classification, regression, and clustering models on historical data
  2. Learns patterns from actual churn/engagement outcomes
  3. Powers SMART segment type with predictive segmentation
  4. Improves accuracy over time through continuous learning

Current State vs ML Approach

| Aspect | Rule-Based (Current) | ML-Powered (Target) |
|---|---|---|
| Logic | Hand-crafted weights | Learned from data |
| Churn Detection | Threshold-based (score >= 70) | Probabilistic prediction |
| Engagement Score | Weighted sum of 5 factors | Model trained on outcomes |
| LTV Prediction | Historical sum | Regression model |
| Personalization | Same rules for all | Per-tenant models |
| Improvement | Manual tuning | Continuous learning |

1. Feature Engineering Pipeline

1.1 Feature Categories

/**
 * Model-ready feature vector describing one customer.
 *
 * Rates are fractions in [0, 1]. The three string fields at the end are
 * categorical and are one-hot encoded before they reach a model.
 */
interface CustomerFeatures {
  /* ----- Recency (temporal) ----- */
  daysSinceLastActivity: number;
  daysSinceLastBooking: number;
  daysSinceLastPurchase: number;
  daysSinceLastEmailOpen: number;
  membershipAgeDays: number;

  /* ----- Frequency (rolling windows) ----- */
  activityCount7d: number;
  activityCount30d: number;
  activityCount90d: number;
  bookingCount30d: number;
  bookingCount90d: number;
  emailOpenCount30d: number;
  emailClickCount30d: number;

  /* ----- Monetary ----- */
  totalSpendLifetime: number;
  totalSpend90d: number;
  totalSpend30d: number;
  avgTransactionValue: number;

  /* ----- Behavioral ----- */
  /** Number of distinct activity types in the last 30 days (breadth of engagement). */
  uniqueActivityTypes30d: number;
  /** Hour of day with most activity, 0–23. */
  peakActivityHour: number;
  /** Preferred day of week, 0–6. */
  preferredDayOfWeek: number;
  /** Average number of days between booking and the booked date. */
  bookingLeadTimeDays: number;
  /** Fraction of bookings cancelled, 0.0–1.0. */
  cancellationRate: number;

  /* ----- Engagement (all 0.0–1.0) ----- */
  emailOpenRate: number;
  emailClickRate: number;
  campaignResponseRate: number;
  pushEngagementRate: number;

  /* ----- Social ----- */
  connectedSocialPlatforms: number;
  socialEngagementCount30d: number;

  /* ----- Membership ----- */
  /** Tier as an ordinal: 0 = None, 1 = Bronze, 2 = Silver, and so on. */
  membershipTierOrdinal: number;
  tierUpgradeCount: number;
  tierDowngradeCount: number;

  /* ----- Trends (Δ values) ----- */
  /**
   * (30d − prev30d) / prev30d. Undefined when the previous window is 0;
   * the extractor must pick a fallback for that case.
   */
  activityTrend: number;
  /** Spending momentum. */
  spendTrend: number;
  /** Engagement momentum. */
  engagementTrend: number;

  /* ----- Categorical (one-hot encoded downstream) ----- */
  /** SUBSCRIBER | LEAD | CUSTOMER | ADVOCATE */
  lifecycleStage: string;
  /** EMAIL | SMS | PUSH | WHATSAPP */
  primaryChannel: string;
  /** ORGANIC | REFERRAL | CAMPAIGN | IMPORT */
  acquisitionSource: string;
}

1.2 Feature Extraction Service

// crm/services/src/ml/feature-extraction.service.ts

@Injectable()
export class FeatureExtractionService {
  constructor(private readonly prisma: PrismaService) {}

  /**
   * Extract the current feature vector for a single customer (real-time scoring).
   *
   * @param customerId - CustomerProfile id.
   * @returns Features computed from the profile plus activity, booking and
   *   campaign-interaction metrics.
   * @throws Error if no CustomerProfile exists with this id.
   */
  async extractFeatures(customerId: string): Promise<CustomerFeatures> {
    const [profile, activities, bookings, interactions] = await Promise.all([
      this.prisma.customerProfile.findUnique({ where: { id: customerId } }),
      this.getActivityMetrics(customerId),
      this.getBookingMetrics(customerId),
      this.getCampaignInteractionMetrics(customerId),
    ]);

    // findUnique resolves to null for an unknown id; fail with a clear message
    // instead of letting computeFeatures dereference null.
    if (!profile) {
      throw new Error(`Customer profile not found: ${customerId}`);
    }

    return this.computeFeatures(profile, activities, bookings, interactions);
  }

  /**
   * Extract features for all customers of a tenant.
   *
   * - Without `asOfDate` (batch scoring): currently ACTIVE customers, with
   *   features as of now.
   * - With `asOfDate` (training): every customer that already existed at
   *   `asOfDate`, regardless of status today. Filtering on today's
   *   status = 'ACTIVE' would drop exactly the customers who churned after
   *   `asOfDate`, i.e. the positive examples a churn model needs.
   *
   * NOTE(review): cp.last_activity_at / cp.last_booking_at hold the latest
   * values as of *now*. For a historical `asOfDate` they can lie after it,
   * which leaks future information and yields negative day counts. The full
   * feature SQL should derive recency/frequency from event rows filtered to
   * `<= asOfDate`, or read CustomerFeatureSnapshot instead.
   *
   * @param tenantId - Tenant whose customers are extracted.
   * @param options.asOfDate - Point in time the features should describe.
   */
  async extractBatchFeatures(
    tenantId: string,
    options?: { asOfDate?: Date }
  ): Promise<CustomerFeatureRow[]> {
    const isPointInTime = options?.asOfDate != null;
    const asOf = options?.asOfDate ?? new Date();

    return this.prisma.$queryRaw`
      WITH customer_features AS (
        SELECT
          cp.id as customer_id,
          -- Temporal features
          EXTRACT(DAY FROM (${asOf} - cp.last_activity_at)) as days_since_last_activity,
          EXTRACT(DAY FROM (${asOf} - cp.last_booking_at)) as days_since_last_booking,
          -- ... (full feature SQL)
        FROM customer_profile cp
        WHERE cp.tenant_id = ${tenantId}
          AND (
            -- Training: the population as it existed at asOf.
            -- NOTE(review): assumes a created_at column on customer_profile; confirm.
            (${isPointInTime} AND cp.created_at <= ${asOf})
            -- Scoring: customers that are active now.
            OR (NOT ${isPointInTime} AND cp.status = 'ACTIVE')
          )
      )
      SELECT * FROM customer_features
    `;
  }

  /**
   * Build a labelled training set. Features are taken as of `labelWindowDays`
   * ago; labels describe what happened in the following `labelWindowDays`.
   * Example: features from 90 days ago, label = did the customer churn in the
   * next 90 days?
   *
   * @param tenantId - Tenant to extract.
   * @param labelType - Which outcome to label.
   * @param lookbackDays - NOTE(review): currently unused; only one snapshot (at
   *   `asOfDate`) is extracted. Intended for sampling several snapshot dates
   *   within this window. TODO: implement or remove.
   * @param labelWindowDays - Length of the outcome window; must be a positive integer.
   * @returns Features, labels and the as-of date they were taken at.
   * @throws RangeError if `labelWindowDays` is not a positive integer.
   */
  async extractTrainingData(
    tenantId: string,
    labelType: 'CHURN' | 'HIGH_VALUE' | 'ENGAGEMENT',
    lookbackDays: number = 365,
    labelWindowDays: number = 90
  ): Promise<TrainingDataset> {
    // A non-positive window would put asOfDate at or after now, leaving no
    // observed outcome period for the labels.
    if (!Number.isInteger(labelWindowDays) || labelWindowDays <= 0) {
      throw new RangeError(`labelWindowDays must be a positive integer, got ${labelWindowDays}`);
    }

    const asOfDate = this.daysAgo(labelWindowDays);

    // NOTE(review): features and labels are queried independently; confirm
    // they cover the same customers in the same order.
    const [features, labels] = await Promise.all([
      this.extractBatchFeatures(tenantId, { asOfDate }),
      this.extractLabels(tenantId, labelType, asOfDate, labelWindowDays),
    ]);

    return { features, labels, asOfDate };
  }
}

1.3 Feature Store

// libs/prisma/crm-client/prisma/schema.prisma

/// Point-in-time feature store: one serialized CustomerFeatures object per customer per snapshotDate.
/// NOTE(review): no @relation to CustomerProfile is declared, so deleting a profile does not remove its snapshots.
model CustomerFeatureSnapshot {
id String @id @default(cuid())
customerId String
// Used by the (tenantId, snapshotDate) index for tenant-wide snapshot scans.
tenantId String
// Date the features describe; computedAt separately records when the row was written.
snapshotDate DateTime
features Json // Full CustomerFeatures object
computedAt DateTime @default(now())

// At most one snapshot per customer per snapshotDate.
@@unique([customerId, snapshotDate])
@@index([tenantId, snapshotDate])
}

/// Metadata for one extracted training dataset; the rows themselves live in the file at datasetPath.
model MLTrainingDataset {
id String @id @default(cuid())
tenantId String
modelType MLModelType
// Features are taken as of this date; labels cover the following labelWindowDays.
asOfDate DateTime
labelWindowDays Int
sampleCount Int
featureCount Int
// NOTE(review): a positive-label fraction only makes sense for classifiers; define a convention for regression/cluster datasets.
positiveRate Float // Class balance
datasetPath String // S3/GCS path to parquet file
// NOTE(review): DatasetStatus has no FAILED value, so a failed extraction cannot be recorded.
status DatasetStatus @default(PENDING)
createdAt DateTime @default(now())

@@index([tenantId, modelType])
}

/// Kinds of model the ML scoring system trains (see the Model Types table).
enum MLModelType {
// Binary classifier: probability of churn within the label window.
CHURN_CLASSIFIER
// Regressor: engagement score 0–100.
ENGAGEMENT_REGRESSOR
// Regressor: predicted lifetime value.
LTV_REGRESSOR
// Clustering model: cluster id + name per customer.
CUSTOMER_CLUSTER
}

/// Status of an MLTrainingDataset (default PENDING).
/// NOTE(review): there is no FAILED state for datasets whose extraction or upload fails.
enum DatasetStatus {
PENDING
READY
TRAINING
ARCHIVED
}

// CustomerProfile ML field extensions (add to existing model)
// model CustomerProfile {
// ...existing fields...
//
// // ML Scoring fields
// predictedLTV Int? // Predicted lifetime value in integer cents (round the model output before writing)
// predictedLTVAt DateTime? // When LTV was predicted
// mlClusterId String? // Customer cluster assignment
// mlClusterName String? // Human-readable cluster name
// mlScoredAt DateTime? // Last ML scoring timestamp
// mlModelVersion String? // Version of model used for scoring
// }

2. Model Architecture

2.1 Model Types

| Model | Type | Input | Output | Use Case |
|---|---|---|---|---|
| ChurnClassifier | Binary Classification | Features | P(churn in 90d) | At-risk detection |
| EngagementRegressor | Regression | Features | Score 0-100 | Engagement ranking |
| LTVRegressor | Regression | Features | Predicted LTV | Customer value |
| CustomerCluster | Clustering | Features | Cluster ID + Name | Auto-segmentation |

2.2 Model Definition

// crm/services/src/ml/models/model.interface.ts

/**
 * Contract for a trained, tenant-scoped model.
 *
 * @typeParam TInput - One input record (e.g. a feature vector).
 * @typeParam TOutput - The prediction produced for one input.
 */
interface MLModel<TInput, TOutput> {
  /* Identity */
  readonly modelId: string;
  readonly modelType: MLModelType;
  readonly version: string;
  readonly tenantId: string;

  /** Score a single input. */
  predict(input: TInput): Promise<TOutput>;
  /** Score many inputs at once. */
  predictBatch(inputs: TInput[]): Promise<TOutput[]>;

  /** Importance weight keyed by feature name. */
  getFeatureImportance(): Record<string, number>;
  /** Evaluation metrics recorded for this model. */
  getModelMetrics(): ModelMetrics;
}

/**
 * Evaluation metrics stored for a model version.
 *
 * Classification fields apply to classifiers and regression fields to
 * regressors. The bookkeeping fields are always present.
 */
interface ModelMetrics {
  /* Bookkeeping (always present) */
  trainSamples: number;
  testSamples: number;
  trainedAt: Date;
  validatedAt: Date;

  /* Classification */
  accuracy?: number;
  precision?: number;
  recall?: number;
  f1Score?: number;
  aucRoc?: number;

  /* Regression */
  /** Mean absolute error. */
  mae?: number;
  /** Root mean square error. */
  rmse?: number;
  /** Coefficient of determination (R²). */
  r2Score?: number;
}

/** Discrete churn-risk bucket derived from a churn probability. */
type ChurnRiskLevel = 'LOW' | 'MEDIUM' | 'HIGH' | 'CRITICAL';

/** Churn prediction for one customer, with the factors that drove it. */
interface ChurnPrediction {
  customerId: string;
  /** Probability of churn within the label window (P(churn in 90d)), 0.0–1.0. */
  churnProbability: number;
  /** Bucketed form of `churnProbability`. */
  churnRiskLevel: ChurnRiskLevel;
  /** Features contributing most to this prediction. */
  topRiskFactors: RiskFactor[];
  /** Model confidence. */
  confidence: number;
}

/** One feature's contribution to a prediction, with a human-readable explanation. */
interface RiskFactor {
  /** Feature name. */
  feature: string;
  /** The customer's value for that feature. */
  value: number;
  /** Signed contribution to the prediction (SHAP value or similar). */
  contribution: number;
  /** Explanation for display, e.g. "No bookings in 60 days". */
  humanReadable: string;
}

2.3 Model Registry

/// Registry entry for one trained model version of a tenant/model type.
/// NOTE(review): isProduction duplicates status == PRODUCTION; keep them in sync or derive one from the other.
/// NOTE(review): nothing enforces a single production version per (tenantId, modelType); the index below is not unique.
model MLModelVersion {
id String @id @default(cuid())
tenantId String
modelType MLModelType
version String // Semantic version
status ModelStatus @default(TRAINING)

// Training info
// NOTE(review): plain id, no @relation to MLTrainingDataset.
trainingDatasetId String?
hyperparameters Json?
trainedAt DateTime?

// Validation metrics
metrics Json? // ModelMetrics
validationSetId String?

// Deployment
artifactPath String? // S3/GCS path to model file
deployedAt DateTime?
isProduction Boolean @default(false)

// Lineage
// Id of the version this one was trained to replace (plain id, no @relation).
previousVersionId String?
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt

@@unique([tenantId, modelType, version])
@@index([tenantId, modelType, isProduction])
}

/// Lifecycle state of an MLModelVersion (new versions default to TRAINING).
/// Presumed flow, from the names (confirm with the registry implementation):
/// TRAINING -> VALIDATING -> STAGED -> PRODUCTION -> DEPRECATED, with FAILED on error.
enum ModelStatus {
TRAINING
VALIDATING
STAGED
PRODUCTION
DEPRECATED
FAILED
}

3. Training Infrastructure

3.1 Training Pipeline

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│ Feature         │     │ Training        │     │ Model           │
│ Extraction      │────▶│ Job             │────▶│ Registry        │
│ (batch)         │     │ (ML runtime)    │     │ (versioned)     │
└─────────────────┘     └─────────────────┘     └─────────────────┘
         │                       │                       │
         ▼                       ▼                       ▼
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│ Feature Store   │     │ Experiment      │     │ A/B Test        │
│ (snapshots)     │     │ Tracking        │     │ Deployment      │
└─────────────────┘     └─────────────────┘     └─────────────────┘

3.2 Training Job Service

// crm/services/src/ml/training/training-job.service.ts

@Injectable()
export class MLTrainingJobService {
  constructor(
    private readonly featureService: FeatureExtractionService,
    private readonly modelRegistry: MLModelRegistryService,
    private readonly queue: BullQueueService,
  ) {}

  /**
   * Enqueue an 'ml-training' job for one model type.
   *
   * @param tenantId - Tenant whose data the model is trained on.
   * @param modelType - Model to (re)train.
   * @param config - Optional overrides. Unset fields default to a 365-day
   *   lookback, a 90-day label window, a 0.2 test split and the model type's
   *   default hyperparameters.
   * @returns The generated job id.
   */
  async scheduleTraining(
    tenantId: string,
    modelType: MLModelType,
    config?: TrainingConfig
  ): Promise<string> {
    const jobId = cuid();

    await this.queue.add('ml-training', {
      jobId,
      tenantId,
      modelType,
      config: {
        lookbackDays: config?.lookbackDays ?? 365,
        labelWindowDays: config?.labelWindowDays ?? 90,
        testSplitRatio: config?.testSplitRatio ?? 0.2,
        hyperparameters: config?.hyperparameters ?? this.getDefaultHyperparameters(modelType),
      },
    });

    return jobId;
  }

  /**
   * Execute a training job (called by the queue worker).
   *
   * Steps:
   * 1. Extract labelled data.
   * 2. Persist the dataset.
   * 3. Create a model version.
   * 4. Train on a train split and measure metrics on a held-out split.
   * 5. Save artifacts linked to the dataset.
   * 6. Consider promotion.
   */
  async executeTraining(job: TrainingJob): Promise<void> {
    const { tenantId, modelType, config } = job;

    // 1. Extract training data
    const dataset = await this.featureService.extractTrainingData(
      tenantId,
      this.modelTypeToLabelType(modelType),
      config.lookbackDays,
      config.labelWindowDays
    );

    // 2. Save dataset to feature store
    const datasetRecord = await this.saveDataset(tenantId, modelType, dataset);

    // 3. Create model version
    const modelVersion = await this.modelRegistry.createVersion(tenantId, modelType);

    // 4-6. Train, validate and save artifacts (marks the version FAILED on error)
    const metrics = await this.trainAndSaveVersion(
      modelVersion.id,
      modelType,
      dataset,
      datasetRecord.id,
      config
    );

    // 7. Auto-promote if better than production
    await this.maybePromoteModel(tenantId, modelType, modelVersion.id, metrics);
  }

  /**
   * Train on a train split, validate on the held-out split and save artifacts
   * for one model version.
   *
   * If any step fails, the version is marked FAILED (otherwise it would stay
   * in TRAINING forever) and the error is rethrown so the queue records the
   * job as failed.
   */
  private async trainAndSaveVersion(
    modelVersionId: string,
    modelType: MLModelType,
    dataset: TrainingDataset,
    trainingDatasetId: string,
    config: TrainingJob['config']
  ): Promise<ModelMetrics> {
    try {
      // Hold out a test split so metrics are not measured on training rows.
      const { trainSet, testSet } = this.splitDataset(dataset, config.testSplitRatio);

      // Train model on the training split only (delegates to ML runtime)
      const trainingResult = await this.trainModel(modelType, trainSet, config);

      // Validate on data the model has not seen
      const metrics = await this.validateModel(trainingResult.model, testSet);

      await this.modelRegistry.saveArtifacts(modelVersionId, {
        modelPath: trainingResult.artifactPath,
        metrics,
        featureImportance: trainingResult.featureImportance,
        // Lineage: MLModelVersion.trainingDatasetId
        trainingDatasetId,
      });

      return metrics;
    } catch (error: unknown) {
      // NOTE(review): assumes the registry exposes a transition to ModelStatus.FAILED.
      await this.modelRegistry.markFailed(
        modelVersionId,
        error instanceof Error ? error.message : String(error)
      );
      throw error;
    }
  }

  /**
   * Randomly partition a dataset into train/test subsets.
   *
   * NOTE(review): assumes `features[i]` and `labels[i]` describe the same
   * customer; confirm against extractTrainingData. A split stratified by
   * label would suit imbalanced churn data better.
   *
   * @throws RangeError if the ratio is not in (0, 1) or either subset would be empty.
   */
  private splitDataset(
    dataset: TrainingDataset,
    testRatio: number
  ): { trainSet: TrainingDataset; testSet: TrainingDataset } {
    if (!(testRatio > 0 && testRatio < 1)) {
      throw new RangeError(`testSplitRatio must be between 0 and 1 (exclusive), got ${testRatio}`);
    }

    const order: number[] = dataset.features.map((_: unknown, i: number) => i);
    // Fisher–Yates shuffle
    for (let i = order.length - 1; i > 0; i--) {
      const j = Math.floor(Math.random() * (i + 1));
      [order[i], order[j]] = [order[j], order[i]];
    }

    const testCount = Math.round(order.length * testRatio);
    if (testCount === 0 || testCount === order.length) {
      throw new RangeError(
        `Dataset of ${order.length} rows is too small for a ${testRatio} test split`
      );
    }

    const subset = (indices: number[]): TrainingDataset => ({
      ...dataset,
      features: indices.map((i) => dataset.features[i]),
      labels: indices.map((i) => dataset.labels[i]),
    });

    return {
      testSet: subset(order.slice(0, testCount)),
      trainSet: subset(order.slice(testCount)),
    };
  }

  /**
   * Default hyperparameters per model type.
   *
   * @throws Error for a model type without defaults (exhaustiveness guard).
   */
  private getDefaultHyperparameters(modelType: MLModelType): Hyperparameters {
    switch (modelType) {
      case MLModelType.CHURN_CLASSIFIER:
        return {
          algorithm: 'GRADIENT_BOOSTING', // XGBoost/LightGBM
          maxDepth: 6,
          learningRate: 0.1,
          numTrees: 100,
          minSamplesLeaf: 20,
          classWeight: 'balanced', // Handle imbalanced churn data
        };
      case MLModelType.ENGAGEMENT_REGRESSOR:
      case MLModelType.LTV_REGRESSOR:
        return {
          algorithm: 'GRADIENT_BOOSTING',
          maxDepth: 8,
          learningRate: 0.05,
          numTrees: 200,
        };
      case MLModelType.CUSTOMER_CLUSTER:
        return {
          algorithm: 'KMEANS',
          numClusters: 5,
          maxIterations: 300,
          // Or: algorithm: 'HDBSCAN' for auto cluster discovery
        };
      default: {
        const unsupported: never = modelType;
        throw new Error(`No default hyperparameters for model type: ${String(unsupported)}`);
      }
    }
  }
}

3.3 ML Runtime Options

// crm/services/src/ml/runtime/ml-runtime.interface.ts

/**
 * Pluggable backend that trains models and serves predictions from stored
 * model artifacts.
 */
interface MLRuntime {
  /** Train a model of `modelType` on `dataset` with the given hyperparameters. */
  train(
    modelType: MLModelType,
    dataset: TrainingDataset,
    hyperparameters: Hyperparameters
  ): Promise<TrainedModel>;

  /** Predict for one customer using the artifact at `modelPath`. */
  predict(modelPath: string, features: CustomerFeatures): Promise<Prediction>;

  /** Predict for many customers using the artifact at `modelPath`. */
  predictBatch(
    modelPath: string,
    features: CustomerFeatures[]
  ): Promise<Prediction[]>;
}

// Implementation options. Each class body is a placeholder to be filled in;
// `/* ... */` keeps the snippet parseable (a bare `...` inside a class body is
// a syntax error).

// Option 1: Python microservice with scikit-learn/XGBoost
class PythonMLRuntime implements MLRuntime { /* ... */ }

// Option 2: Cloud ML (AWS SageMaker, GCP Vertex AI)
class SageMakerRuntime implements MLRuntime { /* ... */ }

// Option 3: Embedded ONNX runtime (for simple models)
class ONNXRuntime implements MLRuntime { /* ... */ }

// Option 4: TensorFlow.js (browser/Node compatible)
class TFJSRuntime implements MLRuntime { /* ... */ }

3.4 Training Schedule

// crm/services/src/ml/training/training-scheduler.service.ts

@Injectable()
export class MLTrainingSchedulerService {
  private readonly logger = new Logger(MLTrainingSchedulerService.name);

  constructor(
    private readonly prisma: PrismaService,
    private readonly trainingJobService: MLTrainingJobService,
  ) {}

  /**
   * Weekly retraining: enqueue a training job for every model type of every
   * tenant with `mlEnabled`.
   *
   * A failure to enqueue one tenant/model is logged and does not stop
   * scheduling for the rest.
   */
  @Cron('0 3 * * 0') // Sunday 03:00
  async scheduledRetraining(): Promise<void> {
    const tenants = await this.prisma.tenant.findMany({
      where: { mlEnabled: true },
      select: { id: true },
    });

    for (const tenant of tenants) {
      for (const modelType of Object.values(MLModelType)) {
        try {
          await this.trainingJobService.scheduleTraining(tenant.id, modelType);
        } catch (error: unknown) {
          this.logger.error(
            `Failed to schedule ${modelType} retraining for tenant ${tenant.id}`,
            error instanceof Error ? error.stack : String(error)
          );
        }
      }
    }
  }

  /**
   * Enqueue retraining when the data-drift score exceeds `threshold`.
   *
   * @param tenantId - Tenant to check.
   * @param modelType - Model whose input data is checked.
   * @param threshold - Drift score above which retraining is triggered. The
   *   default 0.15 (15%) matches the "Feature Drift Detected" alert.
   */
  async triggerRetrainingOnDrift(
    tenantId: string,
    modelType: MLModelType,
    threshold: number = 0.15
  ): Promise<void> {
    const driftScore = await this.calculateDataDrift(tenantId, modelType);

    if (driftScore > threshold) {
      this.logger.warn(`Data drift detected for ${modelType}, triggering retraining`);
      await this.trainingJobService.scheduleTraining(tenantId, modelType);
    }
  }
}

4. Scoring Service

4.1 ML Scoring Service

// crm/services/src/ml/scoring/ml-scoring.service.ts

@Injectable()
export class MLScoringService {
  constructor(
    private readonly featureService: FeatureExtractionService,
    private readonly modelRegistry: MLModelRegistryService,
    // NOTE(review): an interface is erased at runtime, so Nest needs an explicit
    // injection token (@Inject(ML_RUNTIME)) for this parameter.
    private readonly runtime: MLRuntime,
    private readonly cache: CacheService,
    private readonly prisma: PrismaService,
  ) {}

  /**
   * Churn prediction for a customer, using the tenant's production churn model.
   *
   * @throws Error if the customer does not exist or the tenant has no
   *   production CHURN_CLASSIFIER model (callers such as a scoring router can
   *   fall back to rules).
   */
  async predictChurn(customerId: string): Promise<ChurnPrediction> {
    const { features, prediction } = await this.predictWithProductionModel(
      customerId,
      MLModelType.CHURN_CLASSIFIER
    );

    return {
      customerId,
      churnProbability: prediction.probability,
      churnRiskLevel: this.probabilityToRiskLevel(prediction.probability),
      topRiskFactors: this.extractTopFactors(prediction, features),
      confidence: prediction.confidence,
    };
  }

  /**
   * Engagement score (rounded) and its percentile within the tenant.
   *
   * @throws Error if the customer does not exist or the tenant has no
   *   production ENGAGEMENT_REGRESSOR model.
   */
  async predictEngagement(customerId: string): Promise<EngagementPrediction> {
    const { tenantId, features, prediction } = await this.predictWithProductionModel(
      customerId,
      MLModelType.ENGAGEMENT_REGRESSOR
    );

    return {
      customerId,
      engagementScore: Math.round(prediction.value),
      percentile: await this.calculatePercentile(tenantId, prediction.value),
      topDrivers: this.extractTopFactors(prediction, features),
    };
  }

  /**
   * Score every ACTIVE customer of a tenant in batches of 100 (nightly job)
   * and persist churn, engagement and LTV on CustomerProfile.
   *
   * A failed profile update is counted in `errors` and does not abort the run.
   *
   * @returns Counts of customers processed, successfully updated, and failed.
   */
  async batchScoreAllCustomers(tenantId: string): Promise<BatchScoringResult> {
    const customers = await this.prisma.customerProfile.findMany({
      where: { tenantId, status: 'ACTIVE' },
      select: { id: true },
    });

    const batchSize = 100;
    let processed = 0;
    let updated = 0;
    let errors = 0;

    for (let i = 0; i < customers.length; i += batchSize) {
      const batch = customers.slice(i, i + batchSize);
      const features = await Promise.all(
        batch.map(c => this.featureService.extractFeatures(c.id))
      );

      // Get all predictions in parallel
      const [churnPredictions, engagementPredictions, ltvPredictions] = await Promise.all([
        this.predictChurnBatch(tenantId, features),
        this.predictEngagementBatch(tenantId, features),
        this.predictLTVBatch(tenantId, features),
      ]);

      // One timestamp per batch so all *At fields of a row agree.
      const scoredAt = new Date();

      // allSettled: one failed update must not reject the whole batch.
      const results = await Promise.allSettled(batch.map((customer, idx) =>
        this.prisma.customerProfile.update({
          where: { id: customer.id },
          data: {
            churnRiskScore: Math.round(churnPredictions[idx].probability * 100),
            churnRiskScoreAt: scoredAt,
            // Regression outputs are floats; the score/LTV columns are Int
            // (predictEngagement rounds the same way).
            engagementScore: Math.round(engagementPredictions[idx].value),
            engagementScoreAt: scoredAt,
            predictedLTV: Math.round(ltvPredictions[idx].value),
            predictedLTVAt: scoredAt,
            mlScoredAt: scoredAt,
          },
        })
      ));

      const failed = results.filter(r => r.status === 'rejected').length;
      processed += batch.length;
      updated += batch.length - failed;
      errors += failed;
    }

    return { processed, updated, errors };
  }

  /**
   * Resolve the tenant, extract features and run the tenant's production model
   * of `modelType` for one customer.
   *
   * @throws Error if the tenant has no production model of that type.
   */
  private async predictWithProductionModel(
    customerId: string,
    modelType: MLModelType
  ): Promise<{ tenantId: string; features: CustomerFeatures; prediction: Prediction }> {
    const [tenantId, features] = await Promise.all([
      this.resolveTenantId(customerId),
      this.featureService.extractFeatures(customerId),
    ]);

    const model = await this.modelRegistry.getProductionModel(tenantId, modelType);
    if (!model?.artifactPath) {
      throw new Error(`No production ${modelType} model for tenant ${tenantId}`);
    }

    const prediction = await this.runtime.predict(model.artifactPath, features);
    return { tenantId, features, prediction };
  }

  /** CustomerFeatures carries no tenantId, so read it from the profile. */
  private async resolveTenantId(customerId: string): Promise<string> {
    const profile = await this.prisma.customerProfile.findUnique({
      where: { id: customerId },
      select: { tenantId: true },
    });
    if (!profile) {
      throw new Error(`Customer profile not found: ${customerId}`);
    }
    return profile.tenantId;
  }

  /** Map a churn probability to a risk bucket (>=0.8, >=0.6, >=0.3, else LOW). */
  private probabilityToRiskLevel(p: number): ChurnRiskLevel {
    if (p >= 0.8) return 'CRITICAL';
    if (p >= 0.6) return 'HIGH';
    if (p >= 0.3) return 'MEDIUM';
    return 'LOW';
  }

  /**
   * The `limit` features with the largest absolute contribution (SHAP values
   * or feature importance), with display text.
   *
   * NOTE(review): one-hot encoded columns may not be keys of CustomerFeatures;
   * their `value` is then undefined.
   */
  private extractTopFactors(
    prediction: Prediction,
    features: CustomerFeatures,
    limit: number = 5
  ): RiskFactor[] {
    // Copy before sorting: Array.prototype.sort mutates, and the
    // contributions belong to the caller's prediction object.
    return [...(prediction.featureContributions ?? [])]
      .sort((a, b) => Math.abs(b.contribution) - Math.abs(a.contribution))
      .slice(0, limit)
      .map(fc => {
        const value = features[fc.featureName as keyof CustomerFeatures];
        return {
          feature: fc.featureName,
          value,
          contribution: fc.contribution,
          humanReadable: this.featureToHumanReadable(fc.featureName, value),
        };
      });
  }

  /** Display text for a feature value; falls back to "name: value". */
  private featureToHumanReadable(feature: string, value: unknown): string {
    const pct = (v: number) => `${(v * 100).toFixed(0)}%`;
    const templates: Record<string, (v: number) => string> = {
      daysSinceLastActivity: (v) => `No activity in ${v} days`,
      daysSinceLastBooking: (v) => `No bookings in ${v} days`,
      emailOpenRate: (v) => `Email open rate: ${pct(v)}`,
      activityTrend: (v) => v < 0 ? `Activity declining by ${pct(-v)}` : `Activity up ${pct(v)}`,
      // ... more templates
    };

    return templates[feature]?.(Number(value)) ?? `${feature}: ${String(value)}`;
  }
}

4.2 Scoring Events

// crm/services/src/crm.events.ts (additions)

/** Domain events emitted by the CRM services. */
export enum CrmEvent {
  // ... existing events

  /* --- ML: per-customer predictions --- */
  MLChurnPredicted = 'crm.ml.churn.predicted',
  MLEngagementPredicted = 'crm.ml.engagement.predicted',
  MLLTVPredicted = 'crm.ml.ltv.predicted',

  /* --- ML: batch jobs --- */
  MLBatchScoringCompleted = 'crm.ml.batch.completed',

  /* --- ML: model lifecycle --- */
  MLModelTrained = 'crm.ml.model.trained',
  MLModelDeployed = 'crm.ml.model.deployed',
  MLModelRolledBack = 'crm.ml.model.rolled_back',

  /* --- ML: monitoring --- */
  MLDataDriftDetected = 'crm.ml.drift.detected',
  MLBiasDetected = 'crm.ml.bias.detected',
}

/** Payload of an ML prediction event (churn, engagement or LTV). */
export interface MLPredictionEvent {
  /* Who was scored */
  tenantId: string;
  customerId: string;

  /* Which model produced the score */
  modelType: MLModelType;
  modelVersion: string;

  /* The score and how it moved */
  prediction: number;
  /** Previous score, when one exists. */
  previousPrediction?: number;
  /** Change relative to `previousPrediction`. */
  change?: number;

  /* Explanation and timing */
  topFactors: RiskFactor[];
  predictedAt: Date;
}

5. SMART Segment Integration

5.1 SMART Segment Definition

SMART segments use ML models for membership instead of rule-based criteria:

// crm/services/src/segments/smart-segment.service.ts

/**
 * Configuration of a SMART (model-driven) segment.
 *
 * For ENGAGEMENT_REGRESSOR, `topPercentile` takes precedence over
 * `bottomPercentile`, which takes precedence over `threshold`.
 */
interface SmartSegmentConfig {
  segmentType: 'SMART';
  mlConfig: {
    modelType: MLModelType;
    /**
     * Cut-off on the model output. The scale depends on the model:
     * - CHURN_CLASSIFIER: probability 0.0–1.0 (default 0.7), compared with
     *   the persisted 0–100 churnRiskScore after multiplying by 100.
     * - ENGAGEMENT_REGRESSOR: score 0–100 (default 70).
     */
    threshold?: number;
    /** How the model output is compared with `threshold` (default 'gte'). */
    thresholdComparison?: 'gt' | 'lt' | 'gte' | 'lte';
    /**
     * Cluster whose members form the segment (CUSTOMER_CLUSTER). Must be set:
     * Prisma ignores an undefined filter value, so leaving it out would match
     * every active customer.
     */
    clusterId?: string;
    /** Percent from the top, e.g. 10 → "Top 10% by predicted LTV". */
    topPercentile?: number;
    /** Percent from the bottom, e.g. 20 → "Bottom 20% by engagement". */
    bottomPercentile?: number;
  };
}

@Injectable()
export class SmartSegmentService {
  constructor(
    private readonly mlScoring: MLScoringService,
    private readonly segmentService: SegmentService,
    private readonly prisma: PrismaService,
  ) {}

  /**
   * Evaluate SMART segment membership from the ML scores persisted on
   * CustomerProfile (churnRiskScore, engagementScore, predictedLTV, mlClusterId).
   *
   * NOTE(review): the evaluators return customer ids (string[]). Confirm that
   * SegmentMembership is compatible, or map the ids here. `combinedRules` from
   * segment definitions are not applied yet.
   *
   * @throws Error if the segment is not SMART, its model type is unsupported,
   *   or its mlConfig is incomplete or out of range.
   */
  async evaluateSmartSegment(segmentId: string): Promise<SegmentMembership[]> {
    const segment = await this.segmentService.getSegment(segmentId);

    if (segment.type !== 'SMART') {
      throw new Error('Not a SMART segment');
    }

    const config = segment.mlConfig as SmartSegmentConfig['mlConfig'];

    switch (config.modelType) {
      case MLModelType.CHURN_CLASSIFIER:
        return this.evaluateChurnSegment(segment.tenantId, config);

      case MLModelType.ENGAGEMENT_REGRESSOR:
        return this.evaluateEngagementSegment(segment.tenantId, config);

      case MLModelType.LTV_REGRESSOR:
        return this.evaluateLTVSegment(segment.tenantId, config);

      case MLModelType.CUSTOMER_CLUSTER:
        return this.evaluateClusterSegment(segment.tenantId, config);

      default: {
        const unsupported: never = config.modelType;
        throw new Error(`Unsupported SMART segment model type: ${String(unsupported)}`);
      }
    }
  }

  /**
   * "Likely to churn": active customers whose churnRiskScore (0–100) compares
   * to threshold × 100.
   *
   * @throws RangeError if the threshold is not a probability in [0, 1]. A
   *   0–100 value would otherwise silently select nobody (or everybody).
   */
  private async evaluateChurnSegment(
    tenantId: string,
    config: SmartSegmentConfig['mlConfig']
  ): Promise<string[]> {
    const threshold = config.threshold ?? 0.7;
    if (!(threshold >= 0 && threshold <= 1)) {
      throw new RangeError(`Churn threshold must be a probability in [0, 1], got ${threshold}`);
    }
    const comparison = config.thresholdComparison ?? 'gte';

    const customers = await this.prisma.customerProfile.findMany({
      where: {
        tenantId,
        status: 'ACTIVE',
        churnRiskScore: this.buildScoreFilter(threshold * 100, comparison),
      },
      select: { id: true },
    });

    return customers.map(c => c.id);
  }

  /** "Top 10% most engaged", "Bottom 20% engagement", or a score threshold (default >= 70). */
  private async evaluateEngagementSegment(
    tenantId: string,
    config: SmartSegmentConfig['mlConfig']
  ): Promise<string[]> {
    if (config.topPercentile) {
      return this.getTopPercentile(tenantId, 'engagementScore', config.topPercentile);
    }
    if (config.bottomPercentile) {
      return this.getBottomPercentile(tenantId, 'engagementScore', config.bottomPercentile);
    }

    const threshold = config.threshold ?? 70;
    return this.getByThreshold(tenantId, 'engagementScore', threshold, config.thresholdComparison ?? 'gte');
  }

  /**
   * Segment by predicted lifetime value: top/bottom percentile, or an
   * explicit threshold on predictedLTV.
   *
   * @throws Error if neither a percentile nor a threshold is configured.
   *   Predicted LTV is an absolute amount (integer cents), so there is no
   *   meaningful default cut-off.
   */
  private async evaluateLTVSegment(
    tenantId: string,
    config: SmartSegmentConfig['mlConfig']
  ): Promise<string[]> {
    if (config.topPercentile) {
      return this.getTopPercentile(tenantId, 'predictedLTV', config.topPercentile);
    }
    if (config.bottomPercentile) {
      return this.getBottomPercentile(tenantId, 'predictedLTV', config.bottomPercentile);
    }
    if (config.threshold === undefined) {
      throw new Error('LTV SMART segments require topPercentile, bottomPercentile or threshold');
    }

    return this.getByThreshold(tenantId, 'predictedLTV', config.threshold, config.thresholdComparison ?? 'gte');
  }

  /**
   * Active customers assigned to `config.clusterId`.
   *
   * @throws Error if clusterId is missing. Prisma drops an undefined filter,
   *   which would turn the segment into "all active customers".
   */
  private async evaluateClusterSegment(
    tenantId: string,
    config: SmartSegmentConfig['mlConfig']
  ): Promise<string[]> {
    if (!config.clusterId) {
      throw new Error('clusterId is required for CUSTOMER_CLUSTER SMART segments');
    }

    const customers = await this.prisma.customerProfile.findMany({
      where: {
        tenantId,
        status: 'ACTIVE',
        mlClusterId: config.clusterId,
      },
      select: { id: true },
    });

    return customers.map(c => c.id);
  }
}

5.2 Pre-built SMART Segments

// System SMART segments created automatically

/**
 * SMART segments every tenant gets automatically (isSystem: true).
 * NOTE(review): SmartSegmentService.evaluateSmartSegment does not read combinedRules.
 */
const systemSmartSegments: SmartSegmentDefinition[] = [
{
name: 'Likely to Churn (ML)',
description: 'Customers predicted to churn in next 90 days',
type: 'SMART',
// Probability 0.7 is compared against the persisted 0-100 churnRiskScore as >= 70.
mlConfig: {
modelType: MLModelType.CHURN_CLASSIFIER,
threshold: 0.7,
thresholdComparison: 'gte',
},
isSystem: true,
autoRefresh: 'DAILY',
},
{
name: 'High Value (Top 10%)',
description: 'Top 10% customers by predicted lifetime value',
type: 'SMART',
mlConfig: {
modelType: MLModelType.LTV_REGRESSOR,
topPercentile: 10,
},
isSystem: true,
autoRefresh: 'WEEKLY',
},
{
name: 'Re-engagement Candidates',
description: 'Previously engaged but declining activity',
type: 'SMART',
// Bottom 20% by predicted engagement; decline is not measured directly here
// (activityTrend/engagementTrend would capture it).
mlConfig: {
modelType: MLModelType.ENGAGEMENT_REGRESSOR,
bottomPercentile: 20,
},
// Additional rule: must have had high engagement before
// NOTE(review): activityCount90d is a CustomerFeatures field; confirm it is queryable on the profile.
combinedRules: [
{ field: 'activityCount90d', operator: 'gte', value: 10 },
],
isSystem: true,
autoRefresh: 'DAILY',
},
];

5.3 Customer Clustering

// crm/services/src/ml/clustering/customer-cluster.service.ts

/** A customer cluster as presented to users and used by SMART segments. */
interface CustomerCluster {
  clusterId: string;
  /** Auto-generated or user-defined display name. */
  name: string;
  /** ML-generated description. */
  description: string;
  /** Number of customers assigned to the cluster. */
  memberCount: number;
  /** Cluster centre, keyed by feature name. */
  centroidFeatures: Record<string, number>;
  /** Features that most distinguish this cluster from the others. */
  topDefiningFeatures: string[];
}

/**
 * Reads and names customer clusters produced by the CUSTOMER_CLUSTER model.
 *
 * TODO(review): MLController also calls getCustomerCluster(customerId) and
 * getCustomersInCluster(tenantId, clusterId, page), which are not defined here.
 */
@Injectable()
export class CustomerClusterService {
  constructor(private readonly modelRegistry: MLModelRegistryService) {}

  /**
   * Clusters described by the tenant's production CUSTOMER_CLUSTER model.
   *
   * @returns The stored clusters, or [] when the tenant has no production
   *   clustering model yet or its metadata lists none.
   *
   * NOTE(review): the MLModelVersion schema declares no `metadata` column;
   * confirm where cluster descriptions are persisted.
   */
  async getClusters(tenantId: string): Promise<CustomerCluster[]> {
    const model = await this.modelRegistry.getProductionModel(
      tenantId,
      MLModelType.CUSTOMER_CLUSTER
    );

    const clusters = model?.metadata?.clusters as CustomerCluster[] | undefined;
    return clusters ?? [];
  }

  /**
   * Turn raw clustering results into named, described clusters, using each
   * cluster's most distinguishing features.
   */
  generateClusterNames(clusters: ClusterResult[]): CustomerCluster[] {
    return clusters.map(cluster => {
      const topFeatures = this.getTopDistinguishingFeatures(cluster);

      return {
        clusterId: cluster.id,
        name: this.generateNameFromFeatures(topFeatures),
        description: this.generateDescription(cluster, topFeatures),
        memberCount: cluster.memberCount,
        centroidFeatures: cluster.centroid,
        topDefiningFeatures: topFeatures.map(f => f.name),
      };
    });
  }

  /**
   * Pick a display name by matching the distinguishing features against
   * known patterns.
   *
   * NOTE(review): matchPattern needs a fallback (e.g. "Cluster <id>") for
   * clusters that match no pattern. Two clusters can also match the same
   * pattern and get the same name.
   */
  private generateNameFromFeatures(features: FeatureAnalysis[]): string {
    // Examples:
    // High bookingCount30d + high spendTrend → "Active High Spenders"
    // Low activityCount30d + high membershipAgeDays → "Dormant Veterans"
    // High emailOpenRate + low bookingCount30d → "Engaged Non-Bookers"

    const patterns: ClusterPattern[] = [
      {
        conditions: [
          { feature: 'bookingCount30d', direction: 'high' },
          { feature: 'totalSpend90d', direction: 'high' },
        ],
        name: 'Active High Spenders',
      },
      {
        conditions: [
          { feature: 'daysSinceLastActivity', direction: 'high' },
          { feature: 'membershipAgeDays', direction: 'high' },
        ],
        name: 'Dormant Veterans',
      },
      // ... more patterns
    ];

    return this.matchPattern(features, patterns);
  }
}

6. API Endpoints

6.1 ML Scoring API

// apps/crm/crm-backend/src/app/ml/ml.controller.ts

/**
 * REST endpoints for ML predictions, clusters, training and model status.
 *
 * SECURITY(review): tenantId comes from the query string or request body, and
 * the customer endpoints do not check tenant ownership. Derive the tenant from
 * the authenticated principal (or verify it against the principal), otherwise
 * any caller can read or retrain another tenant's models.
 */
@ApiTags('ml')
@Controller('api/v1/ml')
export class MLController {
  /** Largest page size accepted by paginated endpoints. */
  private static readonly MAX_PAGE_SIZE = 1000;

  constructor(
    private readonly mlScoring: MLScoringService,
    private readonly clusterService: CustomerClusterService,
    private readonly trainingService: MLTrainingJobService,
    private readonly modelRegistry: MLModelRegistryService,
  ) {}

  @Get('customers/:customerId/predictions')
  @ApiOperation({ summary: 'Get all ML predictions for a customer' })
  async getCustomerPredictions(
    @Param('customerId') customerId: string
  ): Promise<CustomerPredictions> {
    const [churn, engagement, ltv, cluster] = await Promise.all([
      this.mlScoring.predictChurn(customerId),
      this.mlScoring.predictEngagement(customerId),
      this.mlScoring.predictLTV(customerId),
      this.clusterService.getCustomerCluster(customerId),
    ]);

    return { churn, engagement, ltv, cluster };
  }

  @Get('customers/:customerId/churn')
  @ApiOperation({ summary: 'Get churn prediction with explanations' })
  async getChurnPrediction(
    @Param('customerId') customerId: string
  ): Promise<ChurnPrediction> {
    return this.mlScoring.predictChurn(customerId);
  }

  @Get('clusters')
  @ApiOperation({ summary: 'Get customer clusters' })
  async getClusters(
    @Query('tenantId') tenantId: string
  ): Promise<CustomerCluster[]> {
    return this.clusterService.getClusters(tenantId);
  }

  @Get('clusters/:clusterId/customers')
  @ApiOperation({ summary: 'Get customers in a cluster' })
  async getClusterCustomers(
    @Param('clusterId') clusterId: string,
    @Query('tenantId') tenantId: string,
    @Query('limit') limit: number = 100,
    @Query('offset') offset: number = 0
  ): Promise<PaginatedCustomers> {
    // Query parameters arrive as strings unless a transforming pipe is
    // configured, so normalise them and bound the page size.
    const pageLimit = MLController.toBoundedInt(limit, 100, 1, MLController.MAX_PAGE_SIZE);
    const pageOffset = MLController.toBoundedInt(offset, 0, 0, Number.MAX_SAFE_INTEGER);
    return this.clusterService.getCustomersInCluster(tenantId, clusterId, {
      limit: pageLimit,
      offset: pageOffset,
    });
  }

  @Post('training/schedule')
  @ApiOperation({ summary: 'Schedule model retraining' })
  async scheduleTraining(
    @Body() dto: ScheduleTrainingDto
  ): Promise<{ jobId: string }> {
    const jobId = await this.trainingService.scheduleTraining(
      dto.tenantId,
      dto.modelType,
      dto.config
    );
    return { jobId };
  }

  @Get('models')
  @ApiOperation({ summary: 'Get model versions and status' })
  async getModels(
    @Query('tenantId') tenantId: string
  ): Promise<ModelSummary[]> {
    return this.modelRegistry.getModelSummary(tenantId);
  }

  /**
   * Parse a numeric query parameter (number or numeric string) and clamp it to
   * [min, max]. Non-numeric input yields `fallback`.
   */
  private static toBoundedInt(raw: unknown, fallback: number, min: number, max: number): number {
    const parsed = typeof raw === 'number' ? raw : Number.parseInt(String(raw), 10);
    if (!Number.isFinite(parsed)) {
      return fallback;
    }
    return Math.min(Math.max(Math.trunc(parsed), min), max);
  }
}

7. Migration Path

7.1 Phased Rollout

| Phase | Duration | Activities |
|---|---|---|
| Phase 1: Data Collection | 2 weeks | Enable feature snapshots, collect training labels |
| Phase 2: Model Development | 4 weeks | Train initial models, validate accuracy |
| Phase 3: Shadow Mode | 2 weeks | Run ML alongside rule-based, compare |
| Phase 4: Gradual Rollout | 4 weeks | 10% → 50% → 100% of tenants |
| Phase 5: Full ML | Ongoing | Deprecate rule-based, continuous improvement |

7.2 Fallback Strategy

// crm/services/src/ml/scoring/scoring-router.service.ts

/**
 * Routes churn scoring to the ML model when it is enabled and available,
 * and to the existing rule-based scorer otherwise.
 */
@Injectable()
export class ScoringRouterService {
  private readonly logger = new Logger(ScoringRouterService.name);

  constructor(
    private readonly mlScoring: MLScoringService,
    private readonly ruleScoring: EngagementScoringService, // Existing
    private readonly config: ConfigService,
    private readonly prisma: PrismaService,
    private readonly modelRegistry: MLModelRegistryService,
  ) {}

  /**
   * Churn-risk score (0–100) for a customer.
   *
   * Uses the ML model when eligible. Falls back to the rule-based score when
   * ML is disabled or unavailable, or when anything in the ML path fails
   * (including the eligibility check itself).
   */
  async getChurnScore(customerId: string): Promise<number> {
    try {
      if (await this.shouldUseML(customerId)) {
        const prediction = await this.mlScoring.predictChurn(customerId);
        return Math.round(prediction.churnProbability * 100);
      }
    } catch (error: unknown) {
      const message = error instanceof Error ? error.message : String(error);
      this.logger.warn(`ML scoring failed, falling back to rules: ${message}`);
    }

    return this.ruleScoring.calculateChurnRisk(customerId);
  }

  /**
   * ML is used only when the feature flag is on, the customer exists, and the
   * customer's tenant has a production churn model.
   */
  private async shouldUseML(customerId: string): Promise<boolean> {
    if (!this.isMlScoringEnabled()) return false;

    const profile = await this.prisma.customerProfile.findUnique({
      where: { id: customerId },
      select: { tenantId: true },
    });
    if (!profile) return false;

    return this.modelRegistry.hasProductionModel(
      profile.tenantId,
      MLModelType.CHURN_CLASSIFIER
    );
  }

  /**
   * Read the ML_SCORING_ENABLED flag. Env-backed config values are strings,
   * so the string 'false' must not count as enabled.
   */
  private isMlScoringEnabled(): boolean {
    const raw: unknown = this.config.get('ML_SCORING_ENABLED', false);
    if (typeof raw === 'boolean') return raw;
    return typeof raw === 'string' && ['true', '1', 'yes', 'on'].includes(raw.trim().toLowerCase());
  }
}

7.3 A/B Testing ML vs Rule-Based

// Use existing SPLIT step infrastructure for model comparison

/**
 * Journey that A/B tests ML-driven vs rule-based churn targeting via a SPLIT step.
 * Variant weights (50/50) are relative; each variant continues at its own send step.
 */
const mlComparisonJourney = {
name: 'Churn Prevention - ML A/B Test',
steps: [
{
type: 'SPLIT',
config: {
variants: [
{ name: 'Rule-Based', weight: 50, nextStepId: 'rule-send' },
{ name: 'ML-Powered', weight: 50, nextStepId: 'ml-send' },
],
// Stable assignment keeps each customer's outcome attributable to a single arm.
deterministic: true, // Same customer always in same group
},
},
// ... different send steps for each variant
],
};

8. Monitoring & Observability

8.1 Model Performance Dashboard

/** Metrics shown on the ML model-performance dashboard. */
interface MLDashboardMetrics {
  // Model health
  modelAccuracy: number;
  /** NOTE(review): unit not specified (the latency alert uses ms); confirm. */
  predictionLatencyP50: number;
  predictionLatencyP99: number;

  // Drift detection
  featureDriftScore: number;
  labelDriftScore: number;
  lastDriftCheckAt: Date;

  // Business impact (churn model, measurable once the label window has elapsed)
  /**
   * Of the customers predicted to churn, the share that actually churned.
   * Despite the name, this is precision (positive predictive value), not accuracy.
   */
  churnPredictionAccuracy: number;
  /** FP / (FP + TN): share of customers who did NOT churn but were predicted to. */
  falsePositiveRate: number;
  /** FN / (FN + TP): share of customers who churned but were NOT predicted to. */
  falseNegativeRate: number;

  // Coverage
  customersScored: number;
  /** Percentage of active customers scored. */
  scoringCoverage: number;
}

8.2 Alerting

// Alerts to configure:
// Alert definitions. `condition` is a human-readable expression, not parsed code;
// `action` names the intended automated response.
const mlAlerts = [
// NOTE(review): accuracy can stay high on imbalanced churn data while most
// churners are missed; consider aucRoc or recall (both in ModelMetrics).
{
name: 'Model Accuracy Drop',
condition: 'accuracy < 0.7',
severity: 'HIGH',
action: 'RETRAIN',
},
{
name: 'High Prediction Latency',
condition: 'p99_latency > 500ms',
severity: 'MEDIUM',
action: 'SCALE_UP',
},
// Same 0.15 threshold as MLTrainingSchedulerService.triggerRetrainingOnDrift.
{
name: 'Feature Drift Detected',
condition: 'drift_score > 0.15',
severity: 'MEDIUM',
action: 'INVESTIGATE',
},
// The fallback target is the rule-based scorer used by ScoringRouterService.
{
name: 'Model Serving Error Rate',
condition: 'error_rate > 1%',
severity: 'HIGH',
action: 'FALLBACK_TO_RULES',
},
];

9. Data Requirements

9.1 Minimum Data for Training

| Model | Min Customers | Min History | Min Positive Labels |
|---|---|---|---|
| Churn Classifier | 1,000 | 6 months | 50 churned |
| Engagement Regressor | 500 | 3 months | N/A |
| LTV Regressor | 1,000 | 12 months | N/A |
| Customer Cluster | 500 | 3 months | N/A |

9.2 Label Definitions

/**
 * How training labels are defined. The values are literal types, so they
 * document the definition at the type level.
 */
interface LabelDefinitions {
  /**
   * A customer counts as churned when ALL of the following hold:
   * 1. Membership was cancelled or expired.
   * 2. There was no activity for `lookbackDays` (90).
   * 3. The customer had activity before that.
   */
  churn: {
    /** Inactivity window in days. This is not the training lookback. */
    lookbackDays: 90;
    requireMembershipEnd: true;
    requireInactivity: true;
  };

  /** Top 20% by lifetime value (80th percentile and above). */
  highValue: {
    percentile: 80;
    metric: 'lifetimeValue';
  };

  /**
   * Engaged: engagement score >= 70 (the current threshold), or alternatively
   * activity in the last 30 days plus an email opened in the last 14 days.
   */
  engaged: {
    activityRecency: 30;
    emailRecency: 14;
  };
}

10. Implementation Phases

Phase 1: Feature Engineering (Week 1-2)

  • Implement FeatureExtractionService
  • Create CustomerFeatureSnapshot model
  • Build point-in-time feature extraction
  • Add feature snapshot cron job

Phase 2: Training Infrastructure (Week 3-4)

  • Set up ML runtime (Python service or cloud ML)
  • Implement MLTrainingJobService
  • Create model registry
  • Build training data pipeline

Phase 3: Churn Model (Week 5-6)

  • Train initial churn classifier
  • Validate against historical data
  • Implement SHAP explanations
  • Deploy in shadow mode

Phase 4: Full Model Suite (Week 7-8)

  • Train engagement regressor
  • Train LTV regressor
  • Implement customer clustering
  • Generate cluster names/descriptions

Phase 5: SMART Segments (Week 9-10)

  • Implement SmartSegmentService
  • Create system SMART segments
  • Add cluster-based segments
  • Integrate with segment sync

Phase 6: Rollout (Week 11-12)

  • Enable for pilot tenants
  • Monitor and iterate
  • Full rollout
  • Deprecate rule-based scoring (optional)

11. Privacy, Compliance & Governance

11.1 POPIA/GDPR Compliance

/** Per-tenant privacy controls for ML scoring (POPIA/GDPR). */
interface MLPrivacyConfig {
  /* Data minimisation: only use necessary features */
  /** Allow-list of permitted features. */
  allowedFeatures: string[];
  /** Deny-list of features (e.g. race, religion). */
  excludedFeatures: string[];

  /* Right to explanation */
  /** Whether every prediction must come with its reasons. */
  explainabilityRequired: boolean;
  /** Retention limit on stored predictions per customer. */
  maxStoredPredictions: number;

  /* Consent */
  /** Whether ML scoring is opt-in. */
  requireExplicitConsent: boolean;
  /** Name of the field that records ML consent. */
  consentFieldName: string;
}

/**
 * Customer attributes that must never be used as model inputs, for fairness
 * and POPIA/GDPR data minimisation. Includes the protected characteristics
 * cited for MLPrivacyConfig.excludedFeatures (race, religion) and known proxies.
 *
 * Excluding a column does not remove indirect proxies (membership tier, for
 * example, is treated as a socioeconomic proxy in bias detection), so bias
 * monitoring is still required.
 */
const EXCLUDED_FEATURES = [
  'gender', // Protected characteristic
  'dateOfBirth', // Age discrimination risk
  'postalCode', // Proxy for race/income in SA
  'country', // National origin
  'race', // Protected characteristic
  'ethnicity', // Protected characteristic
  'religion', // Protected characteristic
];

11.2 Model Bias Detection

// crm/services/src/ml/fairness/bias-detection.service.ts

/**
 * Group-fairness metrics, each keyed by group name. Within a metric, the
 * values should be similar across groups.
 */
interface BiasMetrics {
  /** Demographic parity: predictions should be similar across groups. */
  demographicParity: Record<string, number>;
  /** Equal opportunity: true-positive rates should be equal across groups. */
  equalOpportunity: Record<string, number>;
  /** Predictive parity: precision should be equal across groups. */
  predictiveParity: Record<string, number>;
}

@Injectable()
export class BiasDetectionService {
  /**
   * Audits a tenant's predictions for disparities between membership tiers,
   * which this check treats as a proxy for socioeconomic status.
   *
   * Churn rate and false-positive rate are computed per `membershipTier`, and
   * the report flags bias when `detectSignificantDisparity` reports a gap
   * above the 0.2 (20%) threshold.
   *
   * @param tenantId - Tenant whose predictions are audited.
   * @param modelType - Model being audited. NOTE(review): this argument is not
   *   forwarded to `getPredictionsByTier`, so the predictions examined are not
   *   scoped to this model here — confirm whether that is intended.
   * @returns The bias flag, the per-tier metrics, and a recommendation string.
   */
  async detectTierBias(
    tenantId: string,
    modelType: MLModelType
  ): Promise<BiasReport> {
    const tierPredictions = await this.getPredictionsByTier(tenantId);
    const groupField = 'membershipTier';
    const disparityThreshold = 0.2;

    const metrics = {
      churnRateByTier: this.calculateChurnRateByGroup(tierPredictions, groupField),
      falsePositiveByTier: this.calculateFPRByGroup(tierPredictions, groupField),
    };

    const hasBias = this.detectSignificantDisparity(metrics, disparityThreshold);
    const recommendation = hasBias
      ? 'Consider retraining with balanced sampling or adversarial debiasing'
      : 'No significant bias detected';

    return { hasBias, metrics, recommendation };
  }
}

11.3 Data Retention Policy

/**
 * Retention rules for ML data. Fields ending in `Days` (and the bare
 * `*Retention` counts) are durations in days; `retentionVersions` counts
 * versions kept.
 */
const ML_DATA_RETENTION = {
  /** Feature snapshots. */
  featureSnapshots: {
    /** One year of snapshots, kept for retraining. */
    retentionDays: 365,
    /** Aggregated statistics kept for two years (days). */
    aggregatedRetention: 730,
  },

  /** Individual predictions. */
  individualPredictions: {
    /** Detailed predictions kept for 90 days. */
    retentionDays: 90,
    /** Audit log kept for one year (days). */
    auditLogRetention: 365,
  },

  /** Training datasets. */
  trainingDatasets: {
    /** Number of most recent dataset versions kept. */
    retentionVersions: 5,
    /** Datasets older than this many days are anonymized. */
    anonymizeAfterDays: 180,
  },

  /** Model artifacts. */
  modelArtifacts: {
    /** Number of most recent model versions kept. */
    retentionVersions: 10,
    /** Deprecated models kept for 90 days. */
    deprecatedRetentionDays: 90,
  },
};

12. Cost Optimization

12.1 Cloud ML Cost Estimates

| Component | Service | Est. Monthly Cost | Notes |
|---|---|---|---|
| Training | SageMaker/Vertex | $50-200/tenant | Weekly retraining |
| Inference | SageMaker Endpoint | $100-500/tenant | Real-time predictions |
| Feature Store | S3/GCS | $10-50/tenant | Snapshot storage |
| Model Registry | S3/GCS | $5-20/tenant | Artifact storage |

12.2 Cost Reduction Strategies

/**
 * Knobs for trading ML scoring latency/accuracy against infrastructure cost.
 */
interface CostOptimizationConfig {
  /** Batch scoring: score in a scheduled batch (true) rather than on demand. */
  batchScoringEnabled: boolean;
  /** How often the batch scoring run happens. */
  batchScoringFrequency: 'DAILY' | 'WEEKLY';

  /** Caching: how long cached predictions are reused, in hours. */
  predictionCacheTTL: number;
  /** Caching: how long extracted features are reused, in hours. */
  featureCacheTTL: number;

  /** Compute: run training jobs on spot instances. */
  useSpotInstances: boolean;
  /** Compute: use serverless inference (intended for low-volume tenants). */
  useServerless: boolean;

  /** Model complexity: upper bound on model size. */
  maxModelSize: 'SMALL' | 'MEDIUM' | 'LARGE';
  /** Model complexity: quantize models to reduce their size. */
  useQuantization: boolean;
}

/**
 * Recommended cost-optimization settings per tenant size band.
 *
 * Each tier is a partial set of `CostOptimizationConfig` fields. Flags a tier
 * omits (e.g. `useSpotInstances` for `small`) are not set here; how they are
 * defaulted is up to the consumer of this table.
 *
 * `maxModelSize` is narrowed with `as const` so it keeps its literal type
 * ('SMALL' | 'MEDIUM' | 'LARGE'). Without it the property widens to `string`,
 * and a tier cannot be assigned to `CostOptimizationConfig`'s `maxModelSize`.
 */
const COST_TIERS = {
  // < 1,000 customers
  small: {
    batchScoringEnabled: true,
    predictionCacheTTL: 24,
    useServerless: true,
    maxModelSize: 'SMALL' as const,
  },
  // 1,000 - 10,000 customers
  medium: {
    batchScoringEnabled: true,
    predictionCacheTTL: 12,
    useSpotInstances: true,
    maxModelSize: 'MEDIUM' as const,
  },
  // > 10,000 customers
  large: {
    batchScoringEnabled: false, // Real-time scoring available
    predictionCacheTTL: 1,
    useServerless: false,
    maxModelSize: 'LARGE' as const,
  },
};

13. Cold Start Handling

13.1 New Tenant (No Historical Data)

@Injectable()
export class ColdStartService {
  /** Days of history below which no ML model is used at all. */
  private static readonly MIN_HISTORY_DAYS_FOR_ML = 90;
  /** Days of history required (together with enough customers) for a tenant-specific model. */
  private static readonly MIN_HISTORY_DAYS_FOR_TENANT_MODEL = 180;
  /** Customer count required (together with enough history) for a tenant-specific model. */
  private static readonly MIN_CUSTOMERS_FOR_TENANT_MODEL = 1000;

  /**
   * Picks the scoring strategy for a tenant based on how much data it has.
   *
   * - Fewer than 90 days of history: `RULE_BASED`, with `estimatedMLReadiness`
   *   set to the date on which the 90-day threshold will be reached.
   * - Fewer than 180 days of history, OR fewer than 1,000 customers:
   *   `TRANSFER_LEARNING` from the `global_churn_v1` base model.
   * - Otherwise: `TENANT_SPECIFIC_ML`.
   *
   * @param tenantId - Tenant to evaluate.
   */
  async handleNewTenant(tenantId: string): Promise<ScoringStrategy> {
    // The two lookups are independent, so issue them concurrently.
    const [customerCount, historyDays] = await Promise.all([
      this.getCustomerCount(tenantId),
      this.getDataHistoryDays(tenantId),
    ]);

    // Phase 1: not enough history (< 3 months) for any ML model.
    if (historyDays < ColdStartService.MIN_HISTORY_DAYS_FOR_ML) {
      return {
        strategy: 'RULE_BASED',
        reason: 'Insufficient historical data for ML training',
        estimatedMLReadiness: this.addDays(
          new Date(),
          ColdStartService.MIN_HISTORY_DAYS_FOR_ML - historyDays
        ),
      };
    }

    // Phase 2: under 6 months of history OR under 1,000 customers. Either
    // shortfall alone is enough to fall back to fine-tuning a global model.
    if (
      historyDays < ColdStartService.MIN_HISTORY_DAYS_FOR_TENANT_MODEL ||
      customerCount < ColdStartService.MIN_CUSTOMERS_FOR_TENANT_MODEL
    ) {
      return {
        strategy: 'TRANSFER_LEARNING',
        reason: 'Using pre-trained model fine-tuned on tenant data',
        baseModel: 'global_churn_v1',
      };
    }

    // Phase 3: enough history and customers for a dedicated model.
    return {
      strategy: 'TENANT_SPECIFIC_ML',
      reason: 'Sufficient data for tenant-specific model',
    };
  }

  /**
   * Returns the global model of the given type from the model registry.
   * Per this spec, it is trained on anonymized multi-tenant data and used as
   * the base for transfer learning.
   */
  async getGlobalBaseModel(modelType: MLModelType): Promise<MLModel> {
    return this.modelRegistry.getGlobalModel(modelType);
  }
}

13.2 New Customer (No Activity History)

/**
 * Fallback scoring for a customer with no activity history, based on the
 * cohort the customer falls into.
 */
interface NewCustomerScoring {
  /** Attributes used to place the customer in a cohort. */
  cohortFeatures: {
    acquisitionSource: string;
    membershipTier: string;
    signupChannel: string;
  };

  /** Default churn risk: the cohort's average churn rate. */
  defaultChurnRisk: number;
  /** Default engagement: the cohort's average engagement. */
  defaultEngagement: number;
  /** Always 'LOW': cohort-based scores are flagged as low confidence. */
  confidenceLevel: 'LOW';
}

/**
 * Scores a customer who has no activity history by falling back to the
 * averages of the cohort their profile matches.
 *
 * The function reads `this.getProfile` and `this.findMatchingCohort`, so it
 * must be invoked as a method of an object that provides both, or via
 * `scoreNewCustomer.call(service, id)`. A bare `scoreNewCustomer(id)` call
 * leaves `this` undefined in strict-mode module code and would throw at
 * runtime. The explicit `this` parameter turns that into a compile error.
 *
 * @param customerId - Customer to score.
 * @returns The cohort's average churn rate and engagement score, a fixed low
 *   confidence of 0.3, and a recommendation to re-score after 30 days.
 */
async function scoreNewCustomer(
  this: {
    getProfile(customerId: string): Promise<unknown>;
    findMatchingCohort(
      profile: unknown
    ): Promise<{ avgChurnRate: number; avgEngagementScore: number }>;
  },
  customerId: string
): Promise<Prediction> {
  const profile = await this.getProfile(customerId);
  const cohort = await this.findMatchingCohort(profile);

  return {
    churnProbability: cohort.avgChurnRate,
    engagementScore: cohort.avgEngagementScore,
    confidence: 0.3, // Low confidence: no customer-specific signal yet
    reason: 'New customer - using cohort averages',
    recommendedAction: 'Re-score after 30 days of activity',
  };
}

14. Feature Caching & Performance

14.1 Multi-Level Cache

/**
 * Configuration for the three-level feature cache: in-memory, then Redis,
 * then database snapshot.
 *
 * Members are declared as types (`number` / `boolean`). Writing values such as
 * `maxSize: 10000` inside an interface creates literal types that admit only
 * that exact value. The recommended values are recorded in the comments.
 */
interface FeatureCacheConfig {
  /** L1: in-memory cache for hot data. */
  l1: {
    /** Maximum number of customers held in memory (recommended: 10000). */
    maxSize: number;
    /** Entry lifetime in seconds (recommended: 300, i.e. 5 minutes). */
    ttlSeconds: number;
  };

  /** L2: Redis cache for warm data. */
  l2: {
    /** Entry lifetime in seconds (recommended: 3600, i.e. 1 hour). */
    ttlSeconds: number;
    /** Whether cached values are compressed (recommended: true). */
    compressionEnabled: boolean;
  };

  /** L3: database for cold data. */
  l3: {
    /** Serve features from the latest daily snapshot. */
    useLatestSnapshot: boolean;
    /** Maximum acceptable snapshot age in hours (recommended: 24). */
    snapshotMaxAge: number;
  };
}

@Injectable()
export class FeatureCacheService {
  /** Lifetime of a Redis feature entry, in seconds (1 hour). */
  private static readonly REDIS_TTL_SECONDS = 3600;

  /**
   * Returns a customer's features from the cheapest available source.
   *
   * Lookup order: the in-memory cache, then Redis (key `features:<id>`), then
   * a fresh `featureService.extractFeatures` call. A Redis hit is copied into
   * memory. A fresh extraction is written to both Redis and memory.
   *
   * @param customerId - Customer whose features are needed.
   */
  async getFeatures(customerId: string): Promise<CustomerFeatures> {
    // L1: memory
    const inMemory = this.memoryCache.get(customerId);
    if (inMemory) return inMemory;

    // L2: Redis
    const redisKey = `features:${customerId}`;
    const stored = await this.redis.get(redisKey);
    if (stored) {
      // Redis values are strings, so parse what was serialized below. A
      // non-string value is passed through in case the client deserializes
      // on its own. NOTE(review): the parsed JSON is not schema-validated.
      const fromRedis: CustomerFeatures =
        typeof stored === 'string' ? JSON.parse(stored) : stored;
      this.memoryCache.set(customerId, fromRedis);
      return fromRedis;
    }

    // L3: compute from the database
    const features = await this.featureService.extractFeatures(customerId);

    // Serialize explicitly. Handing a plain object to SETEX does not round-trip
    // as an object, because Redis stores string values.
    await this.redis.setex(
      redisKey,
      FeatureCacheService.REDIS_TTL_SECONDS,
      JSON.stringify(features)
    );
    this.memoryCache.set(customerId, features);

    return features;
  }
}

14.2 Batch Prediction Optimization

/**
 * Tuning knobs for batch prediction runs.
 */
interface BatchPredictionConfig {
  /** Parallelism: number of workers running at once. */
  workerCount: number;
  /** Parallelism: number of customers in each batch. */
  batchSize: number;

  /** Resource limit: memory allowed per worker, in MB. */
  maxMemoryMB: number;
  /** Resource limit: maximum time allowed for one batch, in seconds. */
  timeoutSeconds: number;

  /** Prioritization: field used to order customers (e.g. so high-value ones are scored first). */
  priorityField: string;
  /** Prioritization: sort direction applied to `priorityField`. */
  priorityOrder: 'ASC' | 'DESC';
}

/**
 * Scores every ACTIVE customer of a tenant, highest `lifetimeValue` first.
 *
 * Customers are split into batches of `batchSize`, and at most `workerCount`
 * batches run concurrently. Each worker takes the next batch from the front of
 * the LTV-ordered queue, so high-value customers are scored first. (Starting
 * every batch at once with `Promise.all` would discard that ordering and
 * impose no concurrency limit.)
 *
 * Reads `this.prisma` and `this.processBatch`, so it must be invoked as a
 * method (or via `.call`) on an object providing both.
 *
 * @param tenantId - Tenant whose customers are scored.
 * @param options.batchSize - Customers per batch (default 100).
 * @param options.workerCount - Maximum concurrently processed batches (default 4).
 * @param options.timeoutMs - Per-batch timeout forwarded to `processBatch` (default 30000).
 */
async function batchScoreWithPriority(
  tenantId: string,
  options: { batchSize?: number; workerCount?: number; timeoutMs?: number } = {}
): Promise<void> {
  const { batchSize = 100, workerCount = 4, timeoutMs = 30000 } = options;

  // Highest-value customers first.
  const customers = await this.prisma.customerProfile.findMany({
    where: { tenantId, status: 'ACTIVE' },
    orderBy: { lifetimeValue: 'desc' },
    select: { id: true },
  });

  // Shared, priority-ordered queue of batches. `shift()` runs synchronously
  // between awaits, so no batch is taken by two workers.
  const queue = chunk(customers, batchSize);
  const worker = async (): Promise<void> => {
    for (let batch = queue.shift(); batch !== undefined; batch = queue.shift()) {
      await this.processBatch(batch, { timeout: timeoutMs });
    }
  };

  // Bounded concurrency: never more workers than batches, and at least one.
  const poolSize = Math.max(1, Math.min(workerCount, queue.length));
  await Promise.all(Array.from({ length: poolSize }, () => worker()));
}

15. MLOps & CI/CD

15.1 Model Deployment Pipeline

Manual pipeline (conceptual):

  1. Validate
    • Pull latest model
    • Run validation suite
    • Check bias metrics
    • Compare to production
  2. Stage
    • Deploy to staging
    • Run shadow predictions
    • Compare accuracy
  3. Deploy
    • Deploy to production
    • Update model registry
    • Notify stakeholders

15.2 Model Versioning Convention

// Model version naming: {modelType}-{major}.{minor}.{patch}-{tenantId}
//
//   major — algorithm change (e.g. XGBoost → neural net)
//   minor — feature-set change
//   patch — retraining with the same configuration
//
// NOTE: if the "{major}.{minor}.{patch}-{tenantId}" part is parsed with a
// SemVer library, the tenant suffix is read as a pre-release tag. Such a
// version ranks below the bare version (e.g. 1.0.0-tenant123 < 1.0.0), so only
// compare versions that share the same model type and tenant.

const modelVersionExamples = [
  'churn-1.0.0-tenant123', // Initial model
  'churn-1.0.1-tenant123', // Retrained
  'churn-1.1.0-tenant123', // Added features
  'churn-2.0.0-tenant123', // New algorithm
];

15.3 Rollback Procedure

@Injectable()
export class ModelRollbackService {
  /**
   * Reverts a tenant's production model of the given type to the version
   * that preceded it in the registry.
   *
   * Steps, in order:
   * 1. Promote the previous version to production.
   * 2. Deprecate the current version with `reason`.
   * 3. Invalidate the tenant's cached features.
   * 4. Emit `CrmEvent.MLModelRolledBack`.
   *
   * @param tenantId - Tenant whose model is rolled back.
   * @param modelType - Which model to roll back.
   * @param reason - Recorded on the deprecated version and in the event.
   * @throws Error if there is no production model, or no previous version.
   */
  async rollback(
    tenantId: string,
    modelType: MLModelType,
    reason: string
  ): Promise<void> {
    // 1. Get current production model
    const current = await this.modelRegistry.getProductionModel(tenantId, modelType);
    if (!current) {
      throw new Error('No production model found to roll back');
    }

    // 2. Get previous version
    const previous = await this.modelRegistry.getPreviousVersion(current.id);
    if (!previous) {
      throw new Error('No previous version available for rollback');
    }

    // 3. Promote previous to production before deprecating the current one,
    //    so a production model is registered at every step.
    await this.modelRegistry.promoteToProduction(previous.id);

    // 4. Mark current as deprecated
    await this.modelRegistry.deprecate(current.id, reason);

    // 5. Invalidate cached data for the tenant.
    // NOTE(review): this clears the *feature* cache. Features do not depend on
    // the model version, but cached predictions do. Confirm whether the
    // prediction cache should be invalidated here as well (or instead).
    await this.featureCache.invalidateTenant(tenantId);

    // 6. Log rollback event
    await this.emitEvent(CrmEvent.MLModelRolledBack, {
      tenantId,
      modelType,
      fromVersion: current.version,
      toVersion: previous.version,
      reason,
    });
  }
}

16. Advanced Features (Future)

16.1 Embedding-Based Clustering

/**
 * Dense vector representation of a customer, for richer clustering than
 * hand-built features allow.
 */
interface CustomerEmbedding {
  /** Customer the vector describes. */
  customerId: string;
  /** The embedding itself (intended to be 128-dimensional). */
  embedding: number[];
  /** Identifier of the model that produced the vector. */
  embeddingModel: string;
  /** When the vector was generated. */
  generatedAt: Date;
}

/**
 * Builds an embedding vector for a customer from their activity sequence.
 *
 * Fetches the sequence with `this.getActivitySequence` and returns whatever
 * `this.embeddingModel.encode` produces for it (the spec intends a
 * transformer encoder). NOTE(review): depends on `this`, so call it as a
 * method, or via `.call`, on an object providing both members.
 */
async function generateActivityEmbedding(customerId: string): Promise<number[]> {
  const sequence = await this.getActivitySequence(customerId);
  return await this.embeddingModel.encode(sequence);
}

16.2 Real-Time Feature Updates

/**
 * Settings for updating features in real time from an event stream.
 */
interface RealtimeFeatureConfig {
  /** Streaming platform the events are read from. */
  streamSource: 'KAFKA' | 'KINESIS' | 'PUBSUB';
  /** Apply each update as it arrives, or group updates into windows. */
  updateLatency: 'IMMEDIATE' | 'BATCHED';
  /** Window length in seconds (presumably only relevant when BATCHED). */
  batchWindowSeconds: number;
}

/**
 * Features recomputed whenever a relevant event arrives, rather than on the
 * batch schedule.
 */
const TRIGGERED_FEATURES = ['daysSinceLastActivity', 'lastActivityType', 'activityCount30d'];

16.3 AutoML Integration

/**
 * Settings for automatic hyperparameter tuning.
 */
interface AutoMLConfig {
  /** Whether tuning runs at all. */
  enabled: boolean;
  /** Upper bound on the number of hyperparameter combinations tried. */
  maxTrials: number;
  /** Evaluation metric used to compare trials. */
  metric: 'AUC_ROC' | 'F1' | 'PRECISION' | 'RECALL';
  /** Time budget for tuning, in minutes. */
  timeoutMinutes: number;
  /** Number of trials run concurrently. */
  parallelTrials: number;
}