Two-tower embedding model for retrieval + gradient-boosted or deep ranking model + FAISS/ScaNN ANN index

| Component | Role | Technology Options | Scaling Strategy |
|---|---|---|---|
| Event Ingestion | Capture user interactions (clicks, views, purchases) in real-time | Kafka, AWS Kinesis, Google Pub/Sub | Partition by user_id; horizontal scaling |
| Event Store | Durable log of all interaction events for replay and retraining | Kafka (retention), S3/GCS Parquet, Delta Lake | Tiered storage (hot/warm/cold) |
| Feature Store | Serve consistent features for training and inference | Feast, Tecton, Vertex AI Feature Store | Online store (Redis/DynamoDB) + offline store (BigQuery/S3) |
| Embedding Service | Generate user and item embeddings via two-tower model | TensorFlow Recommenders, PyTorch, custom | GPU cluster for training; CPU/GPU for inference |
| ANN Index | Fast approximate nearest-neighbor retrieval over item embeddings | FAISS, ScaNN, Milvus, Pinecone, Weaviate | Shard by embedding space; replicate for read throughput |
| Candidate Retrieval | Narrow millions of items to hundreds of candidates in <10ms | Two-tower model + ANN, co-occurrence, popularity | Multiple retrieval sources merged; each independently scalable |
| Ranking Model | Score and re-rank candidates using rich cross-features | XGBoost, LambdaMART, deep ranking network (DCN, DLRM) | Model server (TF Serving, Triton); batch + online |
| Business Rules Engine | Apply hard filters (age-gating, geo-restrictions, already-seen) | Custom service, Drools, OPA | Stateless; horizontal scaling |
| Re-ranking / Diversity | Ensure result diversity, freshness, and business objectives | MMR, DPP, slot-based allocation | Lightweight post-processing; CPU-only |
| A/B Testing Framework | Measure online impact of model changes on business KPIs | Optimizely, Statsig, custom (hashing-based) | Consistent hashing for user bucketing |
| Model Training Pipeline | Retrain models on fresh data (daily or continuous) | Airflow/Kubeflow + Spark + GPU training | Scheduled DAGs; spot/preemptible instances |
| Model Registry | Version, track, and deploy trained models | MLflow, Vertex AI Model Registry, SageMaker | Centralized; blue-green deployment |
| Monitoring & Observability | Track model performance, data drift, and system health | Prometheus, Grafana, custom dashboards, Evidently AI | Alert on metric decay, feature drift, latency spikes |
START
+-- Catalog size >100K items?
| +-- YES -> Multi-stage pipeline required (retrieval + ranking)
| | +-- Have rich item metadata (text, images, categories)?
| | | +-- YES -> Hybrid: content-based retrieval + collaborative ranking
| | | +-- NO -> Pure collaborative filtering (two-tower + interaction data)
| | +-- Cold-start items common (>20% of catalog)?
| | +-- YES -> Content-based tower mandatory for item embedding
| | +-- NO -> ID-based embeddings sufficient for item tower
| +-- NO -> Single-stage model feasible
| +-- Have explicit ratings (1-5 stars)?
| | +-- YES -> Matrix factorization (ALS/SVD) or LightFM
| | +-- NO -> Implicit feedback model (BPR, WARP loss)
| +-- Need real-time updates?
| +-- YES -> Online learning or session-based model (GRU4Rec)
| +-- NO -> Batch retrained daily
+-- Latency budget <50ms?
| +-- YES -> Pre-compute recommendations; serve from cache/KV store
| +-- NO -> Online inference acceptable with feature store lookup
+-- DEFAULT -> Start with popularity + simple collaborative filtering, iterate toward hybrid
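The branching above can be encoded as a small helper for discussion purposes. This is an illustrative sketch, not part of any library: the function name, argument names, and the 100K/20%/50ms thresholds all come straight from the tree.

```python
def choose_architecture(catalog_size, has_metadata, cold_start_frac,
                        has_explicit_ratings, needs_realtime, latency_ms):
    """Illustrative encoding of the decision tree above; thresholds are heuristics."""
    notes = []
    if catalog_size > 100_000:
        notes.append("multi-stage pipeline (retrieval + ranking)")
        notes.append("hybrid retrieval" if has_metadata else "pure collaborative two-tower")
        notes.append("content-based item tower" if cold_start_frac > 0.20
                     else "ID-based item embeddings")
    else:
        notes.append("single-stage model")
        notes.append("matrix factorization (ALS/SVD)" if has_explicit_ratings
                     else "implicit feedback (BPR/WARP)")
        notes.append("session-based / online learning" if needs_realtime
                     else "daily batch retraining")
    notes.append("pre-compute + cache" if latency_ms < 50 else "online inference")
    return notes
```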
Design your event schema to capture every user-item interaction with context. This is the foundation -- bad event data means bad recommendations regardless of model sophistication. [src1]
# Event schema (Avro/Protobuf recommended for production)
interaction_event = {
    "user_id": "u_abc123",        # anonymized user identifier
    "item_id": "item_789",        # catalog item identifier
    "event_type": "click",        # click | view | purchase | add_to_cart | skip
    "timestamp": "2026-02-23T10:30:00Z",
    "context": {
        "device": "mobile",
        "session_id": "sess_456",
        "page": "home_feed",
        "position": 3             # position in the list (for position bias correction)
    }
}
Verify: Check Kafka consumer lag stays <1s and event count matches expected traffic within 5%.
Materialize user features (historical interaction aggregates, demographics) and item features (metadata, popularity scores, freshness) into both offline and online stores. This eliminates training-serving skew. [src2]
# Feast feature definition (feast_repo/features.py)
from feast import Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64, String
from datetime import timedelta
user = Entity(name="user_id", join_keys=["user_id"])

user_features = FeatureView(
    name="user_features",
    entities=[user],
    ttl=timedelta(hours=24),
    schema=[
        Field(name="total_clicks_7d", dtype=Int64),
        Field(name="avg_session_duration", dtype=Float32),
        Field(name="top_category", dtype=String),
        Field(name="interaction_count_30d", dtype=Int64),
    ],
    source=FileSource(path="s3://features/user_features.parquet"),
)
Verify: the Feast feature server (feast serve) responds in <5ms for user feature lookups; validate feature values against a manual SQL query on 10 random users.
Train a two-tower model where the user tower and item tower produce embeddings in the same vector space. Similarity between embeddings determines relevance. [src4] [src5]
# Two-tower model with TensorFlow Recommenders
import tensorflow as tf
import tensorflow_recommenders as tfrs
class TwoTowerModel(tfrs.Model):
    def __init__(self, user_model, item_model, task):
        super().__init__()
        self.user_model = user_model
        self.item_model = item_model
        self.task = task

    def compute_loss(self, features, training=False):
        user_embeddings = self.user_model(features["user_id"])
        item_embeddings = self.item_model(features["item_id"])
        return self.task(user_embeddings, item_embeddings)
Verify: Evaluate recall@100 on held-out test set; target >0.25 for initial model.
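The recall@K computation behind that target can be sketched in a few lines of plain Python/NumPy; the per-user retrieved and held-out relevant id lists are assumed to come from your evaluation pipeline.

```python
import numpy as np

def recall_at_k(retrieved_ids, relevant_ids, k=100):
    """Fraction of a user's held-out relevant items found in the top-k retrieved list."""
    top_k = set(retrieved_ids[:k])
    relevant = set(relevant_ids)
    if not relevant:
        return 0.0
    return len(top_k & relevant) / len(relevant)

def mean_recall_at_k(per_user_retrieved, per_user_relevant, k=100):
    """Average the per-user metric to get the dataset-level recall@k."""
    scores = [recall_at_k(r, rel, k)
              for r, rel in zip(per_user_retrieved, per_user_relevant)]
    return float(np.mean(scores))
```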
Export item embeddings and build an approximate nearest-neighbor index for sub-millisecond retrieval of top-K candidates from the full catalog. [src7]
import faiss
import numpy as np
embedding_dim = 32
nlist = 256 # number of clusters
quantizer = faiss.IndexFlatIP(embedding_dim)
index = faiss.IndexIVFFlat(quantizer, embedding_dim, nlist, faiss.METRIC_INNER_PRODUCT)
index.train(item_embeddings_np)
index.add(item_embeddings_np)
index.nprobe = 16 # search 16 clusters (speed vs recall trade-off)
# Query: get top 200 candidates for a user
scores, indices = index.search(user_embedding.reshape(1, -1), 200)  # FAISS expects a 2D (n_queries, dim) array
Verify: Measure recall@200 vs. brute-force search; target >0.95. Query latency target: <5ms for 1M items.
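One way to run that check is to also query an exact IndexFlatIP baseline and compare the returned id arrays. The comparison itself is just set overlap; a sketch on the two (num_queries, k) result arrays:

```python
import numpy as np

def ann_recall(approx_ids, exact_ids):
    """Mean fraction of exact top-k neighbors that the ANN index also returned.

    approx_ids, exact_ids: (num_queries, k) id arrays as returned by index.search().
    """
    hits = [len(set(a) & set(e)) / len(e)
            for a, e in zip(approx_ids, exact_ids)]
    return float(np.mean(hits))
```

Sweep nprobe upward until ann_recall crosses 0.95, then measure latency at that setting.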
Train a ranking model that takes the retrieved candidates and produces a fine-grained relevance score using rich cross-features between user and item. [src2] [src3]
import xgboost as xgb
dtrain = xgb.DMatrix(train_features, label=train_labels)
dtrain.set_group(group_sizes)  # required for pairwise ranking: candidates per query/user
params = {
    "objective": "rank:pairwise",
    "eval_metric": "ndcg@10",
    "max_depth": 6,
    "eta": 0.1,
}
ranker = xgb.train(params, dtrain, num_boost_round=200)
Verify: Evaluate NDCG@10 on held-out set; target >0.35. Compare against popularity-only baseline.
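For reference, a minimal NDCG@k implementation consistent with that target (a sketch; your evaluation library's version may differ in tie handling and gain function):

```python
import numpy as np

def dcg_at_k(relevances, k):
    """Discounted cumulative gain over the first k positions."""
    rel = np.asarray(relevances, dtype=float)[:k]
    discounts = np.log2(np.arange(2, rel.size + 2))  # positions 1..k -> log2(2..k+1)
    return float(np.sum(rel / discounts))

def ndcg_at_k(relevances, k=10):
    """DCG normalized by the DCG of the ideal (sorted) ordering."""
    ideal = dcg_at_k(sorted(relevances, reverse=True), k)
    return dcg_at_k(relevances, k) / ideal if ideal > 0 else 0.0
```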
After scoring, apply hard business rules (geo-restrictions, already-consumed filtering) and diversity logic to avoid filter bubbles. [src6]
# Maximal Marginal Relevance (MMR) for diversity
selected = []
remaining = list(candidates)  # candidates scored by the ranking stage
lambda_param = 0.7  # 0=max diversity, 1=max relevance
for _ in range(num_results):
    best = max(remaining, key=lambda c:
        lambda_param * c["score"]
        - (1 - lambda_param) * (max_similarity(c, selected) if selected else 0.0))
    selected.append(best)
    remaining.remove(best)  # without this, the same item is picked every iteration
Verify: Check no duplicate categories in top-3 and no already-consumed items appear.
Serve the pipeline behind an A/B testing framework. Monitor model metrics (NDCG, coverage, diversity), system metrics (latency, throughput), and business metrics (CTR, conversion, revenue). [src1]
Verify: Confirm consistent user bucketing (same user always sees same variant). Check metric tracking fires correctly.
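Consistent bucketing can be sketched with a salted hash; the experiment id acts as the salt so re-running an experiment reshuffles users. This is an illustrative implementation, not any particular framework's API.

```python
import hashlib

def assign_variant(user_id, experiment_id, traffic_split=(0.5, 0.5)):
    """Deterministically map a user to a variant index; same inputs, same bucket."""
    digest = hashlib.sha256(f"{experiment_id}:{user_id}".encode()).hexdigest()
    point = int(digest[:8], 16) / 0xFFFFFFFF  # approximately uniform in [0, 1]
    cumulative = 0.0
    for variant, share in enumerate(traffic_split):
        cumulative += share
        if point <= cumulative:
            return variant
    return len(traffic_split) - 1  # guard against floating-point rounding
```

Because the hash depends only on (experiment_id, user_id), the same user sees the same variant on every request with no per-user state to store.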
# Input: CSV of (user_id, item_id, rating) tuples
# Output: Top-N recommendations for a given user
from surprise import SVD, Dataset, Reader
import pandas as pd
reader = Reader(rating_scale=(1, 5))
data = Dataset.load_from_df(
    pd.read_csv("ratings.csv")[["user_id", "item_id", "rating"]], reader
)
algo = SVD(n_factors=100, n_epochs=20, lr_all=0.005, reg_all=0.02)
trainset = data.build_full_trainset()
algo.fit(trainset)
anti_testset = trainset.build_anti_testset()
predictions = algo.test([x for x in anti_testset if x[0] == "u_123"])
top_10 = sorted(predictions, key=lambda x: x.est, reverse=True)[:10]
# Input: user_id at serving time
# Output: consistent feature vector for model inference
from feast import FeatureStore
store = FeatureStore(repo_path="feast_repo/")
features = store.get_online_features(
    entity_rows=[{"user_id": "u_abc123"}],
    features=[
        "user_features:total_clicks_7d",
        "user_features:avg_session_duration",
        "user_features:top_category",
    ],
).to_dict()
# Pass features to ranking model -- guarantees no training-serving skew
# BAD -- treating all events as equal positive signals
train_data = df[["user_id", "item_id"]] # clicks, views, purchases all weight=1
model.fit(train_data)
# GOOD -- stronger signals get higher weight
weights = {"purchase": 5.0, "add_to_cart": 3.0, "click": 1.0, "view": 0.3}
df["weight"] = df["event_type"].map(weights)
model.fit(df[["user_id", "item_id"]], sample_weight=df["weight"])
# BAD -- retrieval model uses cross-features (cannot scale to full catalog)
retrieval_features = ["user_age", "item_category", "user_x_item_category_affinity"]
# Requires computing features for ALL items per request -- O(N) is too slow
# GOOD -- retrieval uses independent towers (pre-computed, ANN lookup)
user_embedding = user_tower(user_features) # O(1)
candidates = ann_index.search(user_embedding, k=200) # O(log N)
ranked = ranking_model.predict(cross_features(user, candidates)) # O(200)
# BAD -- no A/B test, no business metric validation
if new_model_ndcg > old_model_ndcg:
    deploy(new_model)
# GOOD -- offline metrics gate launch; online metrics gate full rollout
if new_model_ndcg > old_model_ndcg * 1.02:  # >2% offline lift
    launch_ab_test(new_model, traffic=0.10)  # start at 10% of users
    # Only promote to 100% if revenue_per_user improves with p<0.05
# BAD -- position 1 gets 10x CTR regardless of relevance
# Training on this data reinforces existing ranking (feedback loop)
train_data = raw_click_log
# GOOD -- correct for position bias in training data
position_bias = {1: 1.0, 2: 0.7, 3: 0.5, 4: 0.35, 5: 0.25}
df["corrected_weight"] = df["clicked"] / df["position"].map(position_bias)
| Use When | Don't Use When | Use Instead |
|---|---|---|
| Catalog >1K items and user interaction data available | Catalog <100 items | Manual curation or simple rule-based sorting |
| Personalization drives business KPI (engagement, revenue) | All users should see the same content (editorial, news homepage) | Content management system with editorial ranking |
| Sufficient interaction volume (>10K interactions/day) | Very sparse data (<1K total interactions) | Content-based filtering only, or popularity-based |
| Need to surface long-tail items users would not find via search | Users know exactly what they want (transactional search) | Search engine with relevance ranking |
| Real-time personalization matters (feeds, homepages) | Batch recommendations suffice (weekly email digest) | Simpler batch job with matrix factorization |