
What is MLOps? Complete Guide to ML Operations [2025]

Comprehensive guide to MLOps—the practice of deploying, monitoring, and maintaining machine learning models in production. Learn the principles, tools, and best practices.

June 4, 2025
13 min read
MLOps · AI Operations · DevOps · Model Lifecycle · Production AI

MLOps (Machine Learning Operations) is the practice of deploying, monitoring, and maintaining machine learning models in production. It's DevOps for ML—combining software engineering, data engineering, and ML expertise to build reliable, scalable ML systems.

What You'll Learn

  • What MLOps is and why it's critical for production ML
  • Core MLOps principles and lifecycle stages
  • Key components: data pipelines, model training, deployment, monitoring
  • MLOps tools and platforms (MLflow, Kubeflow, AWS SageMaker, etc.)
  • Best practices for production ML systems
  • Real-world MLOps architecture examples

Why MLOps Matters

Most ML projects never make it to production. MLOps bridges the gap between ML experimentation and production deployment.

  • Without MLOps: Models decay, pipelines break, no monitoring, manual deployments
  • With MLOps: Automated pipelines, continuous monitoring, easy rollbacks, reproducible results

What is MLOps?

MLOps extends DevOps principles to machine learning, addressing the unique challenges of ML systems.

MLOps vs DevOps

| Aspect | DevOps | MLOps |
| --- | --- | --- |
| Artifact | Code | Code + Data + Model |
| Testing | Unit, integration tests | + Data validation, model performance |
| Deployment | Deploy code | Deploy model + serving infrastructure |
| Monitoring | Uptime, latency, errors | + Model drift, data drift, accuracy |
| Complexity | Deterministic | Non-deterministic, data-dependent |

Core MLOps Principles

  • Continuous Training: Automatically retrain models with new data. Models are retrained weekly or monthly as new data arrives, so they stay accurate.
  • Model Versioning: Track and manage different model versions. Every model version is tracked with its code, data, and hyperparameters for reproducibility (a registry-based sketch follows this list).
  • Automated Deployment: Deploy models to production automatically. CI/CD pipelines test and deploy models without manual intervention.
  • Monitoring & Alerting: Track model performance in production. Alert when accuracy drops, data drifts, or predictions become unreliable.
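
The versioning and promotion principles map naturally onto a model registry. The sketch below promotes the newest registered version once it clears a quality bar, reusing the customer_churn_model and MLflow server that appear later in this guide; the 0.90 accuracy gate is an illustrative assumption, not a prescribed value.

Model Promotion with the MLflow Registry
python
# Sketch: promote the newest registered model version once it clears a quality bar.
# Assumes an MLflow tracking server and the "customer_churn_model" used later in this guide.
from mlflow.tracking import MlflowClient

client = MlflowClient(tracking_uri="http://mlflow-server:5000")

MODEL_NAME = "customer_churn_model"
ACCURACY_THRESHOLD = 0.90  # illustrative promotion gate

# Latest version that has not been promoted yet, plus the run that produced it
latest = client.get_latest_versions(MODEL_NAME, stages=["None"])[0]
run = client.get_run(latest.run_id)
accuracy = run.data.metrics.get("accuracy", 0.0)

if accuracy >= ACCURACY_THRESHOLD:
    # Promote the candidate and archive the previous version (an easy rollback target)
    client.transition_model_version_stage(
        name=MODEL_NAME,
        version=latest.version,
        stage="Production",
        archive_existing_versions=True,
    )
    print(f"Promoted version {latest.version} (accuracy={accuracy:.3f})")
else:
    print(f"Version {latest.version} below threshold; keeping the current Production model")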

MLOps Lifecycle

The complete ML lifecycle from data to deployment and monitoring.

End-to-End MLOps Pipeline

1. Data Collection
Gather and store training data from various sources
2. Data Preparation
Clean, transform, and validate data for training
3. Model Training
Train models with hyperparameter tuning and experimentation
4. Model Evaluation
Validate model performance on test data
5. Model Deployment
Deploy model to production serving infrastructure
6. Monitoring
Track performance, detect drift, trigger retraining

Continuous Loop: Monitoring triggers retraining when performance degrades
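
The six stages form a loop that can be sketched end to end. The toy example below trains a model on synthetic data, "deploys" it in memory, and retrains when monitored accuracy degrades; the drift simulation (label flipping) and thresholds are illustrative assumptions.

MLOps Loop in Miniature
python
# Sketch: the MLOps loop in miniature on synthetic data.
# Drift is simulated by flipping a fraction of labels; thresholds are illustrative.
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

def collect_data(drift_fraction=0.0, seed=1):
    """1-2. Data collection and preparation (with optional simulated concept drift)."""
    X, y = make_classification(n_samples=2000, n_features=10, class_sep=2.0, random_state=0)
    rng = np.random.default_rng(seed)
    flip = rng.random(len(y)) < drift_fraction
    return X, np.where(flip, 1 - y, y)

def train_and_evaluate(X, y):
    """3-4. Training and evaluation on a held-out split."""
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=42)
    model = LogisticRegression(max_iter=1000).fit(X_tr, y_tr)
    return model, accuracy_score(y_te, model.predict(X_te))

# 5. Initial training and "deployment" (kept in memory for this sketch)
X, y = collect_data()
model, accuracy = train_and_evaluate(X, y)
print(f"Deployed model, accuracy={accuracy:.3f}")

# 6. Monitoring closes the loop: degraded live accuracy triggers retraining
for month, drift in enumerate([0.0, 0.05, 0.3], start=1):
    X_live, y_live = collect_data(drift_fraction=drift, seed=month)
    live_accuracy = accuracy_score(y_live, model.predict(X_live))
    print(f"Month {month}: live accuracy={live_accuracy:.3f}")
    if live_accuracy < 0.85:  # illustrative retraining trigger
        model, accuracy = train_and_evaluate(X_live, y_live)
        print(f"  Retrained, new accuracy={accuracy:.3f}")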

Core MLOps Components

1. Data Pipeline

Automated data collection, validation, and preprocessing.

Automated Data Pipeline
python
# Data Pipeline with Apache Airflow
# (fetch_from_database, save_to_staging, load_from_staging and save_to_processed
#  are placeholders for your own storage helpers)
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

def extract_data():
    """Extract data from source systems"""
    data = fetch_from_database()
    save_to_staging(data)

def validate_data():
    """Validate data quality"""
    data = load_from_staging()
    
    # Check for missing values
    assert data.isnull().sum().sum() == 0, "Missing values found"
    
    # Check data distribution
    assert data['feature'].mean() > 0, "Invalid distribution"
    
    # Check schema
    expected_columns = ['feature1', 'feature2', 'target']
    assert all(col in data.columns for col in expected_columns)

def transform_data():
    """Transform and feature engineer"""
    data = load_from_staging()
    
    # Feature engineering
    data['new_feature'] = data['feature1'] * data['feature2']
    
    # Normalization
    from sklearn.preprocessing import StandardScaler
    scaler = StandardScaler()
    data[['feature1', 'feature2']] = scaler.fit_transform(
        data[['feature1', 'feature2']]
    )
    
    save_to_processed(data)

# Define DAG
dag = DAG(
    'ml_data_pipeline',
    default_args={
        'owner': 'ml-team',
        'retries': 3,
        'retry_delay': timedelta(minutes=5)
    },
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1)
)

# Define tasks
extract_task = PythonOperator(
    task_id='extract_data',
    python_callable=extract_data,
    dag=dag
)

validate_task = PythonOperator(
    task_id='validate_data',
    python_callable=validate_data,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_data',
    python_callable=transform_data,
    dag=dag
)

# Set dependencies
extract_task >> validate_task >> transform_task
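
The assert-based checks in validate_data can also be written declaratively with a schema library, which produces clearer error reports when a check fails. A minimal sketch using pandera; the column names, types, and bounds are illustrative assumptions.

Declarative Data Validation
python
# Sketch: schema-based validation as an alternative to raw asserts.
# Column names, types, and bounds are illustrative.
import pandas as pd
import pandera as pa
from pandera import Column, Check

schema = pa.DataFrameSchema({
    "feature1": Column(float, Check.ge(0), nullable=False),
    "feature2": Column(float, nullable=False),
    "target": Column(int, Check.isin([0, 1])),
})

def validate_dataframe(data: pd.DataFrame) -> pd.DataFrame:
    # Raises pandera.errors.SchemaError with a detailed report if any check fails
    return schema.validate(data)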

2. Model Training & Experiment Tracking

Track experiments, hyperparameters, and model versions.

Model Training with MLflow
python
# Model Training with MLflow
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score

# Set MLflow tracking URI
mlflow.set_tracking_uri("http://mlflow-server:5000")
mlflow.set_experiment("customer_churn_prediction")

def train_model(data, hyperparameters):
    """Train model with experiment tracking"""
    
    # Start MLflow run
    with mlflow.start_run(run_name="rf_model_v1"):
        
        # Log parameters
        mlflow.log_params(hyperparameters)
        
        # Split data
        X = data.drop('target', axis=1)
        y = data['target']
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        # Train model
        model = RandomForestClassifier(**hyperparameters)
        model.fit(X_train, y_train)
        
        # Evaluate
        y_pred = model.predict(X_test)
        accuracy = accuracy_score(y_test, y_pred)
        f1 = f1_score(y_test, y_pred)
        
        # Log metrics
        mlflow.log_metric("accuracy", accuracy)
        mlflow.log_metric("f1_score", f1)
        
        # Log model
        mlflow.sklearn.log_model(
            model,
            "model",
            registered_model_name="customer_churn_model"
        )
        
        # Log artifacts
        import matplotlib.pyplot as plt
        from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
        
        cm = confusion_matrix(y_test, y_pred)
        disp = ConfusionMatrixDisplay(cm)
        disp.plot()
        plt.savefig("confusion_matrix.png")
        mlflow.log_artifact("confusion_matrix.png")
        
        print(f"Model trained: accuracy={accuracy:.3f}, f1={f1:.3f}")
        
        return model

# Hyperparameter tuning
hyperparameter_grid = [
    {'n_estimators': 100, 'max_depth': 10},
    {'n_estimators': 200, 'max_depth': 15},
    {'n_estimators': 300, 'max_depth': 20}
]

# Train multiple models (data: a pandas DataFrame with feature columns and a 'target' column)
for params in hyperparameter_grid:
    train_model(data, params)

# Get best model
best_model = mlflow.search_runs(
    experiment_names=["customer_churn_prediction"],
    order_by=["metrics.accuracy DESC"],
    max_results=1
)
print(f"Best model: {best_model['run_id'].values[0]}")

3. Model Deployment

Deploy models as REST APIs or batch inference services.

Model Deployment API
python
# Model Serving with FastAPI
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import mlflow.sklearn
import numpy as np

app = FastAPI(title="ML Model API")

# Load the sklearn flavor from the MLflow registry so predict_proba is available
model = mlflow.sklearn.load_model("models:/customer_churn_model/Production")

class PredictionRequest(BaseModel):
    features: list[float]

class PredictionResponse(BaseModel):
    prediction: int
    probability: float
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    """Make prediction"""
    try:
        # Prepare input
        features = np.array(request.features).reshape(1, -1)
        
        # Make prediction
        prediction = model.predict(features)[0]
        probability = model.predict_proba(features)[0][1]
        
        return PredictionResponse(
            prediction=int(prediction),
            probability=float(probability),
            model_version="v1.2.3"
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/health")
async def health_check():
    """Health check endpoint"""
    return {"status": "healthy", "model_loaded": model is not None}

# Kubernetes deployment
# deployment.yaml
"""
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ml-model-api
spec:
  replicas: 3
  selector:
    matchLabels:
      app: ml-model
  template:
    metadata:
      labels:
        app: ml-model
    spec:
      containers:
      - name: api
        image: ml-model-api:v1.2.3
        ports:
        - containerPort: 8000
        env:
        - name: MLFLOW_TRACKING_URI
          value: "http://mlflow:5000"
        resources:
          requests:
            memory: "512Mi"
            cpu: "500m"
          limits:
            memory: "1Gi"
            cpu: "1000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8000
          initialDelaySeconds: 30
          periodSeconds: 10
"""

Model Monitoring & Drift Detection

Production models degrade over time. Monitoring detects issues before they impact users.

  • Data Drift: The input data distribution changes over time. Example: customer demographics shift, or new product categories appear (a per-feature check is sketched after this list).
  • Concept Drift: The relationship between features and the target changes. Example: economic conditions change, or user behavior evolves.
  • Model Decay: Model performance degrades over time. Example: accuracy drops from 95% to 85% over three months.
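
For a single numeric feature, data drift can be checked directly with a two-sample Kolmogorov–Smirnov test before reaching for a full monitoring library. A minimal sketch; the significance level and synthetic columns are illustrative assumptions.

Per-Feature Drift Check
python
# Sketch: per-feature data drift check with a two-sample KS test.
# reference and current are DataFrames with the same numeric columns.
import numpy as np
import pandas as pd
from scipy.stats import ks_2samp

def drifted_features(reference: pd.DataFrame, current: pd.DataFrame, alpha: float = 0.05):
    """Return numeric columns whose distributions differ significantly."""
    drifted = []
    for column in reference.select_dtypes(include=np.number).columns:
        statistic, p_value = ks_2samp(reference[column], current[column])
        if p_value < alpha:  # reject "same distribution" at the chosen level
            drifted.append((column, round(statistic, 3)))
    return drifted

# Synthetic example: "spend" is shifted in the current window, "age" is not
rng = np.random.default_rng(0)
reference = pd.DataFrame({"age": rng.normal(40, 10, 5000), "spend": rng.normal(100, 20, 5000)})
current = pd.DataFrame({"age": rng.normal(40, 10, 5000), "spend": rng.normal(130, 20, 5000)})
print(drifted_features(reference, current))  # expect "spend" to be flagged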

Model Monitoring & Drift Detection
python
# Model Monitoring Implementation
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset, DataQualityPreset
import logging
import time
import numpy as np
import pandas as pd
from datetime import datetime

logger = logging.getLogger(__name__)

class ModelMonitor:
    """Monitor model performance and data drift"""
    
    def __init__(self, reference_data: pd.DataFrame):
        self.reference_data = reference_data
        self.metrics_history = []
    
    def check_data_drift(self, current_data: pd.DataFrame) -> dict:
        """Detect data drift between reference and current data"""
        
        # Create drift report
        report = Report(metrics=[
            DataDriftPreset(),
            DataQualityPreset()
        ])
        
        report.run(
            reference_data=self.reference_data,
            current_data=current_data
        )
        
        # Extract drift metrics
        drift_results = report.as_dict()
        
        # Check if drift detected
        drift_detected = drift_results['metrics'][0]['result']['dataset_drift']
        
        if drift_detected:
            drifted_features = [
                feature for feature, result in 
                drift_results['metrics'][0]['result']['drift_by_columns'].items()
                if result['drift_detected']
            ]
            
            return {
                "drift_detected": True,
                "drifted_features": drifted_features,
                "timestamp": datetime.utcnow().isoformat()
            }
        
        return {"drift_detected": False}
    
    def monitor_predictions(
        self,
        predictions: np.ndarray,
        actuals: np.ndarray,
        features: pd.DataFrame
    ) -> dict:
        """Monitor model performance"""
        
        from sklearn.metrics import accuracy_score, precision_score, recall_score
        
        # Calculate metrics
        accuracy = accuracy_score(actuals, predictions)
        precision = precision_score(actuals, predictions, average='weighted')
        recall = recall_score(actuals, predictions, average='weighted')
        
        metrics = {
            "timestamp": datetime.utcnow().isoformat(),
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "num_predictions": len(predictions)
        }
        
        # Store metrics
        self.metrics_history.append(metrics)
        
        # Check for performance degradation
        if len(self.metrics_history) > 1:
            prev_accuracy = self.metrics_history[-2]["accuracy"]
            accuracy_drop = prev_accuracy - accuracy
            
            if accuracy_drop > 0.05:  # 5% drop
                metrics["alert"] = f"Accuracy dropped by {accuracy_drop:.2%}"
        
        # Check for data drift
        drift_result = self.check_data_drift(features)
        if drift_result["drift_detected"]:
            metrics["drift_alert"] = f"Data drift detected in: {drift_result['drifted_features']}"
        
        return metrics
    
    def should_retrain(self) -> bool:
        """Determine if model should be retrained"""
        
        if len(self.metrics_history) < 2:
            return False
        
        recent_metrics = self.metrics_history[-5:]  # Last 5 periods
        avg_accuracy = sum(m["accuracy"] for m in recent_metrics) / len(recent_metrics)
        
        # Retrain if average accuracy drops below threshold
        if avg_accuracy < 0.85:
            return True
        
        # Retrain if drift detected in recent periods
        drift_count = sum(1 for m in recent_metrics if "drift_alert" in m)
        if drift_count >= 3:
            return True
        
        return False

# Usage in production (training_data, get_recent_data, send_to_dashboard and
# trigger_retraining_pipeline are placeholders for your own infrastructure)
monitor = ModelMonitor(reference_data=training_data)

# Monitor predictions continuously
while True:
    # Get recent predictions and actuals
    predictions, actuals, features = get_recent_data()
    
    # Monitor performance
    metrics = monitor.monitor_predictions(predictions, actuals, features)
    
    # Log metrics
    logger.info(f"Model metrics: {metrics}")
    
    # Send to monitoring dashboard
    send_to_dashboard(metrics)
    
    # Check if retraining needed
    if monitor.should_retrain():
        logger.warning("Model performance degraded. Triggering retraining...")
        trigger_retraining_pipeline()
    
    time.sleep(3600)  # Check hourly

MLOps Tools & Platforms

Popular tools for implementing MLOps in your organization.

| Tool | Category | Best For | Deployment |
| --- | --- | --- | --- |
| MLflow | Experiment Tracking | Model versioning, tracking experiments | Open source, self-hosted |
| Kubeflow | End-to-End Platform | Complete ML pipelines on Kubernetes | Open source, K8s |
| AWS SageMaker | Cloud Platform | AWS-native ML workflows | Managed, AWS |
| Azure ML | Cloud Platform | Azure-native ML workflows | Managed, Azure |
| Vertex AI | Cloud Platform | GCP-native ML workflows | Managed, GCP |
| Weights & Biases | Experiment Tracking | Deep learning experiments, visualization | SaaS |
| DVC | Data Versioning | Version control for data and models | Open source |
| Evidently AI | Monitoring | Data drift detection, model monitoring | Open source |
| Airflow | Orchestration | Data pipelines, workflow automation | Open source |

Choosing MLOps Tools

  • Start simple: Begin with MLflow for experiment tracking, expand as needed
  • Cloud-native: Use SageMaker/Azure ML/Vertex AI if already on that cloud
  • Kubernetes: Kubeflow if you have K8s expertise and need full control
  • Open source: MLflow + DVC + Airflow for a cost-effective solution

MLOps Best Practices

✓ Do

  • Version everything: code, data, models, configs
  • Automate data validation and quality checks
  • Track all experiments with metrics and parameters
  • Implement CI/CD for model deployment
  • Monitor models continuously in production
  • Set up automated retraining pipelines
  • Use feature stores for consistency
  • Implement model rollback capabilities
  • Document model cards and data sheets
  • Test models before production deployment (a CI gate sketch follows this list)
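
Several of these practices (CI/CD, testing before deployment, rollback-friendly promotion) meet in a validation gate that CI runs before a model is promoted. A minimal pytest-style sketch; the file names, evaluation set, and thresholds are illustrative assumptions.

Pre-Deployment Validation Gate
python
# Sketch: CI tests that must pass before a candidate model is promoted.
# File names, the evaluation set, and thresholds are illustrative.
import json
import pickle

import pandas as pd
from sklearn.metrics import accuracy_score

MIN_ACCURACY = 0.85     # absolute quality bar
MAX_REGRESSION = 0.02   # allowed drop versus the current production model

def load_eval_set(path: str = "eval_set.parquet"):
    data = pd.read_parquet(path)
    return data.drop(columns=["target"]), data["target"]

def load_candidate(path: str = "candidate_model.pkl"):
    with open(path, "rb") as f:
        return pickle.load(f)

def test_candidate_meets_absolute_threshold():
    X, y = load_eval_set()
    candidate = load_candidate()
    assert accuracy_score(y, candidate.predict(X)) >= MIN_ACCURACY

def test_candidate_not_worse_than_production():
    X, y = load_eval_set()
    candidate = load_candidate()
    with open("production_metrics.json") as f:
        production_accuracy = json.load(f)["accuracy"]
    assert accuracy_score(y, candidate.predict(X)) >= production_accuracy - MAX_REGRESSION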

✗ Don't

  • Deploy models without monitoring
  • Skip data validation in pipelines
  • Manually deploy models to production
  • Ignore model performance degradation
  • Train on production data without validation
  • Forget to version training data
  • Use different features in training vs serving
  • Deploy without A/B testing capability
  • Neglect model documentation
  • Assume models work forever without retraining

Key Takeaways

  • MLOps is essential: Most ML projects never reach production without proper operations practices
  • Complete lifecycle: Data collection → Training → Deployment → Monitoring → Retraining
  • Automation is key: Automate data pipelines, training, deployment, and monitoring
  • Version everything: Code, data, models, and configurations must be versioned
  • Monitor continuously: Track data drift, model drift, and performance degradation
  • Tools matter: Use MLflow, Kubeflow, or cloud platforms to implement MLOps
  • Start simple: Begin with experiment tracking, add complexity as you scale

Start Your MLOps Journey

MLOps transforms ML from experiments to production systems. Start with the basics and scale as you grow.
