MLOps (Machine Learning Operations) is the practice of deploying, monitoring, and maintaining machine learning models in production. It's DevOps for ML—combining software engineering, data engineering, and ML expertise to build reliable, scalable ML systems.
What You'll Learn
- What MLOps is and why it's critical for production ML
- Core MLOps principles and lifecycle stages
- Key components: data pipelines, model training, deployment, monitoring
- MLOps tools and platforms (MLflow, Kubeflow, AWS SageMaker, etc.)
- Best practices for production ML systems
- Real-world MLOps architecture examples
Why MLOps Matters
Most ML projects never make it to production. MLOps bridges the gap between ML experimentation and production deployment.
- Without MLOps: models decay, pipelines break, nothing is monitored, and deployments are manual
- With MLOps: automated pipelines, continuous monitoring, easy rollbacks, and reproducible results
What is MLOps?
MLOps extends DevOps principles to machine learning, addressing the unique challenges of ML systems.
MLOps vs DevOps
| Aspect | DevOps | MLOps |
|---|---|---|
| Artifact | Code | Code + Data + Model |
| Testing | Unit, integration tests | + Data validation, model performance |
| Deployment | Deploy code | Deploy model + serving infrastructure |
| Monitoring | Uptime, latency, errors | + Model drift, data drift, accuracy |
| Complexity | Deterministic | Non-deterministic, data-dependent |
- Continuous Training: automatically retrain models with new data. Models are retrained weekly or monthly as new data arrives so they stay accurate.
- Model Versioning: track and manage different model versions. Every version is tracked with its code, data, and hyperparameters for reproducibility (a minimal registry sketch follows this list).
- Automated Deployment: deploy models to production automatically. CI/CD pipelines test and deploy models without manual intervention.
- Monitoring & Alerting: track model performance in production. Alerts fire when accuracy drops, data drifts, or predictions become unreliable.
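As a concrete illustration of model versioning and controlled promotion, here is a minimal sketch using the MLflow Model Registry client. The model name customer_churn_model matches the training example later in this article, the version number is hypothetical, and the stage-based workflow shown is one common convention (newer MLflow releases also offer aliases instead of stages).
# Minimal sketch: promoting a registered model version with the MLflow client
from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri="http://mlflow-server:5000")

# List all versions of the registered model
for mv in client.search_model_versions("name='customer_churn_model'"):
    print(mv.version, mv.current_stage, mv.run_id)

# Promote a specific version to Production and archive the old one.
# This also gives you a rollback path: transition the previous version
# back to Production if the new one misbehaves.
client.transition_model_version_stage(
    name="customer_churn_model",
    version=3,  # hypothetical version number
    stage="Production",
    archive_existing_versions=True
)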
MLOps Lifecycle
The complete ML lifecycle from data to deployment and monitoring.
End-to-End MLOps Pipeline: data collection → training → deployment → monitoring, with monitoring triggering retraining when performance degrades (a continuous loop).
Core MLOps Components
1. Data Pipeline
Automated data collection, validation, and preprocessing.
# Data Pipeline with Apache Airflow
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def extract_data():
    """Extract data from source systems"""
    data = fetch_from_database()  # placeholder for your source query
    save_to_staging(data)         # placeholder for your staging writer

def validate_data():
    """Validate data quality"""
    data = load_from_staging()
    # Check for missing values
    assert data.isnull().sum().sum() == 0, "Missing values found"
    # Check data distribution
    assert data['feature'].mean() > 0, "Invalid distribution"
    # Check schema
    expected_columns = ['feature1', 'feature2', 'target']
    assert all(col in data.columns for col in expected_columns)

def transform_data():
    """Transform and feature engineer"""
    data = load_from_staging()
    # Feature engineering
    data['new_feature'] = data['feature1'] * data['feature2']
    # Normalization
    from sklearn.preprocessing import StandardScaler
    scaler = StandardScaler()
    data[['feature1', 'feature2']] = scaler.fit_transform(
        data[['feature1', 'feature2']]
    )
    save_to_processed(data)  # placeholder for your processed-data writer

# Define DAG
dag = DAG(
    'ml_data_pipeline',
    default_args={
        'owner': 'ml-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5)
    },
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1)
)

# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)
validate_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag
)
transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag
)

# Set dependencies
extract_task >> validate_task >> transform_task
2. Model Training & Experiment Tracking
Track experiments, hyperparameters, and model versions.
# Model Training with MLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Set MLflow tracking URI
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer_churn_prediction")

def train_model(data, hyperparameters):
    """Train model with experiment tracking"""
    # Start MLflow run
    with mlflow.start_run(run_name="rf_model_v1"):
        # Log parameters
        mlflow.log_params(hyperparameters)

        # Split data
        X = data.drop('target', axis=1)
        y = data['target']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Train model
        model = RandomForestClassifier(**hyperparameters)
        model.fit(X_train, y_train)

        # Evaluate
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)

        # Log metrics
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("f1_score", f1)

        # Log model
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name="customer_churn_model"
        )

        # Log artifacts
        import matplotlib.pyplot as plt
        from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
        cm = confusion_matrix(y_test, y_pred)
        disp = ConfusionMatrixDisplay(cm)
        disp.plot()
        plt.savefig("confusion_matrix.png")
        mlflow.log_artifact("confusion_matrix.png")

        print(f"Model trained: accuracy={accuracy:.3f}, f1={f1:.3f}")
        return model

# Hyperparameter tuning
hyperparameter_grid = [
    {'n_estimators': 100, 'max_depth': 10},
    {'n_estimators': 200, 'max_depth': 15},
    {'n_estimators': 300, 'max_depth': 20}
]

# Train multiple models (data is assumed to be a pandas DataFrame with a 'target' column)
for params in hyperparameter_grid:
    train_model(data, params)

# Get best model
best_model = mlflow.search_runs(
    experiment_names=["customer_churn_prediction"],
    order_by=["metrics.accuracy DESC"],
    max_results=1
)
print(f"Best model: {best_model['run_id'].values[0]}")
3. Model Deployment
Deploy models as REST APIs or batch inference services (a batch-scoring sketch follows the serving and deployment example below).
# Model Serving with FastAPI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.sklearn
import numpy as np

app = FastAPI(title="ML Model API")

# Load model from the MLflow registry.
# mlflow.sklearn.load_model returns the underlying scikit-learn model, which
# exposes predict_proba; a generic pyfunc model only exposes predict.
model = mlflow.sklearn.load_model("models:/customer_churn_model/Production")

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make prediction"""
    try:
        # Prepare input
        features = np.array(request.features).reshape(1, -1)

        # Make prediction
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0][1]

        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability),
            model_version="v1.2.3"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "model_loaded": model is not None}

# Kubernetes deployment
# deployment.yaml
"""
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: api
        image: ml-model-api:v1.2.3
        ports:
        - containerPort: 8000
        env:
        - name: MLFLOW_TRACKING_URI
          value: "http://mlflow:5000"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
"""
Model Monitoring & Drift Detection
Production models degrade over time. Monitoring detects issues before they impact users.
- Data Drift: the input data distribution changes over time. Example: customer demographics shift, new product categories appear. (See the drift-check sketch after this list.)
- Concept Drift: the relationship between features and the target changes. Example: economic conditions change, user behavior evolves.
- Model Decay: model performance degrades over time. Example: accuracy drops from 95% to 85% over three months.
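Before reaching for a full monitoring framework, a data-drift check can be as simple as comparing feature distributions between training data and recent production data. The sketch below uses a two-sample Kolmogorov-Smirnov test from SciPy on numeric columns; the 0.05 significance threshold is a common but arbitrary choice. The fuller implementation that follows uses Evidently to do the same job across the whole dataset and to feed retraining decisions.
# Minimal data-drift sketch: per-feature two-sample KS test
import pandas as pd
from scipy.stats import ks_2samp

def drifted_columns(reference: pd.DataFrame,
                    current: pd.DataFrame,
                    alpha: float = 0.05) -> list[str]:
    """Return numeric columns whose distribution shifted significantly."""
    drifted = []
    for col in reference.select_dtypes("number").columns:
        # Null hypothesis: both samples come from the same distribution
        _, p_value = ks_2samp(reference[col].dropna(), current[col].dropna())
        if p_value < alpha:
            drifted.append(col)
    return drifted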
# Model Monitoring Implementation
import logging
import time
from datetime import datetime

import numpy as np
import pandas as pd
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset

class ModelMonitor:
    """Monitor model performance and data drift"""

    def __init__(self, reference_data: pd.DataFrame):
        self.reference_data = reference_data
        self.metrics_history = []

    def check_data_drift(self, current_data: pd.DataFrame) -> dict:
        """Detect data drift between reference and current data"""
        # Create drift report
        report = Report(metrics=[
            DataDriftPreset(),
            DataQualityPreset()
        ])
        report.run(
            reference_data=self.reference_data,
            current_data=current_data
        )

        # Extract drift metrics. Look the metrics up by result key rather than
        # by position, since the ordering depends on the Evidently version.
        drift_results = report.as_dict()
        dataset_drift = next(
            (m['result'] for m in drift_results['metrics']
             if isinstance(m.get('result'), dict) and 'dataset_drift' in m['result']),
            None
        )
        drift_detected = bool(dataset_drift and dataset_drift['dataset_drift'])

        if drift_detected:
            drift_table = next(
                (m['result'] for m in drift_results['metrics']
                 if isinstance(m.get('result'), dict) and 'drift_by_columns' in m['result']),
                {'drift_by_columns': {}}
            )
            drifted_features = [
                feature for feature, result in
                drift_table['drift_by_columns'].items()
                if result['drift_detected']
            ]
            return {
                "drift_detected": True,
                "drifted_features": drifted_features,
                "timestamp": datetime.utcnow().isoformat()
            }
        return {"drift_detected": False}

    def monitor_predictions(
        self,
        predictions: np.ndarray,
        actuals: np.ndarray,
        features: pd.DataFrame
    ) -> dict:
        """Monitor model performance"""
        from sklearn.metrics import accuracy_score, precision_score, recall_score

        # Calculate metrics
        accuracy = accuracy_score(actuals, predictions)
        precision = precision_score(actuals, predictions, average='weighted')
        recall = recall_score(actuals, predictions, average='weighted')

        metrics = {
            "timestamp": datetime.utcnow().isoformat(),
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "num_predictions": len(predictions)
        }

        # Store metrics
        self.metrics_history.append(metrics)

        # Check for performance degradation
        if len(self.metrics_history) > 1:
            prev_accuracy = self.metrics_history[-2]["accuracy"]
            accuracy_drop = prev_accuracy - accuracy
            if accuracy_drop > 0.05:  # 5% drop
                metrics["alert"] = f"Accuracy dropped by {accuracy_drop:.2%}"

        # Check for data drift
        drift_result = self.check_data_drift(features)
        if drift_result["drift_detected"]:
            metrics["drift_alert"] = f"Data drift detected in: {drift_result['drifted_features']}"

        return metrics

    def should_retrain(self) -> bool:
        """Determine if model should be retrained"""
        if len(self.metrics_history) < 2:
            return False

        recent_metrics = self.metrics_history[-5:]  # Last 5 periods
        avg_accuracy = sum(m["accuracy"] for m in recent_metrics) / len(recent_metrics)

        # Retrain if average accuracy drops below threshold
        if avg_accuracy < 0.85:
            return True

        # Retrain if drift detected in recent periods
        drift_count = sum(1 for m in recent_metrics if "drift_alert" in m)
        if drift_count >= 3:
            return True

        return False

# Usage in production (training_data, get_recent_data, send_to_dashboard and
# trigger_retraining_pipeline are placeholders for your own data and integrations)
logger = logging.getLogger("model_monitor")
monitor = ModelMonitor(reference_data=training_data)

# Monitor predictions continuously
while True:
    # Get recent predictions and actuals
    predictions, actuals, features = get_recent_data()

    # Monitor performance
    metrics = monitor.monitor_predictions(predictions, actuals, features)

    # Log metrics
    logger.info(f"Model metrics: {metrics}")

    # Send to monitoring dashboard
    send_to_dashboard(metrics)

    # Check if retraining needed
    if monitor.should_retrain():
        logger.warning("Model performance degraded. Triggering retraining...")
        trigger_retraining_pipeline()

    time.sleep(3600)  # Check hourly
MLOps Tools & Platforms
Popular tools for implementing MLOps in your organization.
| Tool | Category | Best For | Deployment |
|---|---|---|---|
| MLflow | Experiment Tracking | Model versioning, tracking experiments | Open source, self-hosted |
| Kubeflow | End-to-End Platform | Complete ML pipelines on Kubernetes | Open source, K8s |
| AWS SageMaker | Cloud Platform | AWS-native ML workflows | Managed, AWS |
| Azure ML | Cloud Platform | Azure-native ML workflows | Managed, Azure |
| Vertex AI | Cloud Platform | GCP-native ML workflows | Managed, GCP |
| Weights & Biases | Experiment Tracking | Deep learning experiments, visualization | SaaS |
| DVC | Data Versioning | Version control for data and models | Open source |
| Evidently AI | Monitoring | Data drift detection, model monitoring | Open source |
| Airflow | Orchestration | Data pipelines, workflow automation | Open source |
Choosing MLOps Tools
- Start simple: Begin with MLflow for experiment tracking, expand as needed (see the minimal autologging sketch after this list)
- Cloud-native: Use SageMaker/Azure ML/Vertex AI if already on that cloud
- Kubernetes: Kubeflow if you have K8s expertise and need full control
- Open source: MLflow + DVC + Airflow for a cost-effective solution
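If you are just getting started, experiment tracking can begin with a handful of lines. The sketch below relies on MLflow autologging, which captures parameters, metrics, and the fitted model for supported frameworks such as scikit-learn; runs land in a local ./mlruns directory unless a tracking URI is configured, and the dataset and run name here are only illustrative.
# Minimal "start simple" sketch: MLflow autologging with scikit-learn
import mlflow
from sklearn.datasets import load_breast_cancer
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

mlflow.autolog()  # logs params, metrics, and the model automatically

X, y = load_breast_cancer(return_X_y=True)
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42)

with mlflow.start_run(run_name="quickstart"):
    model = RandomForestClassifier(n_estimators=100)
    model.fit(X_train, y_train)
    print("test accuracy:", model.score(X_test, y_test))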
MLOps Best Practices
✓ Do
- Version everything: code, data, models, configs
- Automate data validation and quality checks
- Track all experiments with metrics and parameters
- Implement CI/CD for model deployment
- Monitor models continuously in production
- Set up automated retraining pipelines
- Use feature stores for consistency (see the shared-feature-function sketch after these lists)
- Implement model rollback capabilities
- Document model cards and data sheets
- Test models before production deployment
✗ Don't
- Deploy models without monitoring
- Skip data validation in pipelines
- Manually deploy models to production
- Ignore model performance degradation
- Train on production data without validation
- Forget to version training data
- Use different features in training vs serving
- Deploy without A/B testing capability
- Neglect model documentation
- Assume models work forever without retraining
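One practical way to avoid training/serving skew, short of adopting a full feature store, is to put feature engineering in a single function that both the training pipeline and the serving endpoint import. A minimal sketch (the module, function, and column names are illustrative):
# features.py - shared by the training job and the serving API
import pandas as pd

def build_features(raw: pd.DataFrame) -> pd.DataFrame:
    """Single source of truth for feature engineering."""
    out = raw.copy()
    out["tenure_years"] = out["tenure_months"] / 12.0
    out["charges_per_month"] = out["total_charges"] / out["tenure_months"].clip(lower=1)
    return out

# Training: model.fit(build_features(training_df), labels)
# Serving:  model.predict(build_features(request_df))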
Key Takeaways
- MLOps is essential: Most ML projects fail without proper operations practices
- Complete lifecycle: Data collection → Training → Deployment → Monitoring → Retraining
- Automation is key: Automate data pipelines, training, deployment, and monitoring
- Version everything: Code, data, models, and configurations must be versioned
- Monitor continuously: Track data drift, model drift, and performance degradation
- Tools matter: Use MLflow, Kubeflow, or cloud platforms to implement MLOps
- Start simple: Begin with experiment tracking, add complexity as you scale
Start Your MLOps Journey
MLOps transforms ML from experiments to production systems. Start with the basics and scale as you grow.