Build an AI Anomaly Detection System: Catch Problems Before They Explode

Build a real-time anomaly detection system that monitors metrics, detects unusual patterns, and explains what went wrong. Full Python tutorial.

By EgoistAI
Anomaly detection is the guard dog of production systems. It watches metrics that humans can’t, detects patterns that dashboards miss, and alerts your team before small issues become outages. Traditional anomaly detection relies on static thresholds — “alert if CPU > 90%.” AI anomaly detection learns what normal looks like and flags anything that deviates, adapting automatically as your system evolves.

This tutorial builds a real-time anomaly detection system that monitors time-series metrics, identifies unusual patterns using statistical methods and AI, and generates explanations that help engineers understand what’s actually wrong.

What We’re Building

An anomaly detection system that:

  1. Ingests time-series metrics (CPU, memory, request rates, error rates, custom metrics)
  2. Learns normal patterns automatically (including daily/weekly seasonality)
  3. Detects point anomalies (single unusual values) and collective anomalies (unusual patterns)
  4. Uses AI to generate human-readable explanations of detected anomalies
  5. Correlates anomalies across multiple metrics to identify root causes
  6. Sends prioritized alerts

Tech Stack

  • Python 3.11+
  • scikit-learn for statistical anomaly detection (Isolation Forest, LOF)
  • Claude API for anomaly explanation and correlation analysis
  • InfluxDB or SQLite for time-series storage
  • Streamlit for monitoring dashboard

Step 1: Metric Collection

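from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import psutil
import requests

@dataclass
class MetricPoint:
    name: str
    value: float
    timestamp: datetime
    labels: Optional[dict] = None  # optional key/value tags, e.g. {"host": "web-1"}

def collect_system_metrics() -> list[MetricPoint]:
    now = datetime.now()
    # Read the network counters once so both values come from the same snapshot
    net = psutil.net_io_counters()
    return [
        # The first cpu_percent() call after startup returns a meaningless 0.0
        MetricPoint("cpu_percent", psutil.cpu_percent(), now),
        MetricPoint("memory_percent", psutil.virtual_memory().percent, now),
        MetricPoint("disk_percent", psutil.disk_usage('/').percent, now),
        # These are cumulative counters that only grow; diff consecutive
        # samples to get a rate before feeding them to a detector
        MetricPoint("network_bytes_sent", net.bytes_sent, now),
        MetricPoint("network_bytes_recv", net.bytes_recv, now),
    ]

def collect_app_metrics(endpoint: str = "http://localhost:8080/metrics"):
    """Collect from Prometheus-compatible endpoint."""
    # A timeout keeps a hung endpoint from stalling the collection loop
    resp = requests.get(endpoint, timeout=5)
    resp.raise_for_status()
    # parse_metrics is a helper you must provide: it should turn Prometheus
    # text format or JSON into a list of MetricPoint objects
    metrics = parse_metrics(resp.text)
    return metrics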
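
The collector above calls parse_metrics without showing it. Below is a minimal sketch of one possible version, assuming the endpoint serves the Prometheus text exposition format. The regular expressions and the decision to skip malformed lines are assumptions made for illustration, and MetricPoint is redefined only so the block runs on its own. For production use, a maintained parser is likely a better choice.

```python
import re
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class MetricPoint:
    # Mirrors the tutorial's dataclass so this sketch is self-contained
    name: str
    value: float
    timestamp: datetime
    labels: dict = field(default_factory=dict)


# name, optional {labels}, value, optional integer timestamp
_SAMPLE = re.compile(
    r'^([a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(.*)\})?\s+(\S+)(?:\s+-?\d+)?$'
)
_LABEL = re.compile(r'([a-zA-Z_][a-zA-Z0-9_]*)="((?:[^"\\]|\\.)*)"')


def parse_metrics(text: str) -> list[MetricPoint]:
    """Parse Prometheus text format into MetricPoint objects (sketch)."""
    now = datetime.now()
    points = []
    for line in text.splitlines():
        line = line.strip()
        # Skip blank lines and "# HELP" / "# TYPE" comment lines
        if not line or line.startswith("#"):
            continue
        match = _SAMPLE.match(line)
        if not match:
            continue  # assumption: silently ignore lines we cannot parse
        name, raw_labels, raw_value = match.groups()
        try:
            value = float(raw_value)  # also accepts NaN, +Inf, -Inf
        except ValueError:
            continue
        labels = dict(_LABEL.findall(raw_labels or ""))
        points.append(MetricPoint(name, value, now, labels))
    return points
```

The sample's own timestamp field is ignored here and the collection time is used instead, which keeps all points from one scrape aligned.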

Step 2: Statistical Anomaly Detection

from collections import deque

import numpy as np
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler

class AnomalyDetector:
    def __init__(self, window_size: int = 1000):
        self.windows = {}
        self.models = {}
        self.scalers = {}
        self.window_size = window_size
        # Most recent add_point() result; Step 4 reads it through
        # get_latest_result() and assumes one detector instance per metric
        self.latest_result = None

    def add_point(self, metric_name: str, value: float) -> dict:
        if metric_name not in self.windows:
            self.windows[metric_name] = deque(maxlen=self.window_size)
            self.models[metric_name] = IsolationForest(
                contamination=0.05, random_state=42
            )
            self.scalers[metric_name] = StandardScaler()

        self.windows[metric_name].append(value)
        window = list(self.windows[metric_name])

        # Wait for enough history before judging anything
        if len(window) < 100:
            self.latest_result = {"is_anomaly": False,
                                  "reason": "insufficient_data"}
            return self.latest_result

        # Fit and predict. Refitting on every point is simple but costly;
        # refit every N points instead if throughput matters.
        X = np.array(window).reshape(-1, 1)
        X_scaled = self.scalers[metric_name].fit_transform(X)
        self.models[metric_name].fit(X_scaled)

        # Check latest point
        latest = X_scaled[-1].reshape(1, -1)
        score = self.models[metric_name].decision_function(latest)[0]
        prediction = self.models[metric_name].predict(latest)[0]

        # Calculate z-score against the history (excluding the new point)
        mean = np.mean(window[:-1])
        std = np.std(window[:-1])
        z_score = (value - mean) / std if std > 0 else 0

        # Cast NumPy types to plain Python so the result is JSON-serializable
        self.latest_result = {
            "is_anomaly": bool(prediction == -1),
            "anomaly_score": float(score),
            "z_score": float(z_score),
            "value": float(value),
            "mean": float(mean),
            "std": float(std)
        }
        return self.latest_result

    def get_latest_result(self):
        """Return the most recent add_point() result, or None if no data yet."""
        return self.latest_result

Step 3: AI-Powered Anomaly Explanation

import json
from typing import Optional

from anthropic import Anthropic

# Anthropic() reads the API key from the ANTHROPIC_API_KEY environment variable
client = Anthropic()

def explain_anomaly(metric_name: str, anomaly_data: dict,
                    recent_values: list,
                    correlated_metrics: Optional[dict] = None) -> str:
    # Summarize the anomaly as plain text for the model.
    # anomaly_data must be a full detector result (with mean, std, z_score).
    context = f"""
Metric: {metric_name}
Current value: {anomaly_data['value']:.2f}
Normal range: {anomaly_data['mean']:.2f} +/- {anomaly_data['std']:.2f}
Z-score: {anomaly_data['z_score']:.2f}
Recent values (last 20): {recent_values[-20:]}
"""
    # Add what other metrics looked like at the same moment, if provided
    if correlated_metrics:
        context += "\nOther metrics at the same time:\n"
        for name, data in correlated_metrics.items():
            context += f"  {name}: {data['value']:.2f} (normal: {data['mean']:.2f})\n"

    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=512,
        system="""You are an SRE anomaly analyst. Given metric data, explain:
1. What the anomaly is (in plain English)
2. Likely cause based on the metric type and correlated metrics
3. Severity (info/warning/critical)
4. Recommended action
Be concise and technical.""",
        messages=[{"role": "user", "content": context}]
    )
    return response.content[0].text

Step 4: Multi-Metric Correlation

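def find_correlated_anomalies(detectors: dict, timestamp) -> dict:
    """Find anomalies occurring simultaneously across metrics.

    Uses client and json from Step 3. detectors maps each metric name to
    its detector, which must expose get_latest_result() returning the most
    recent add_point() result.
    """
    # timestamp is not used yet: "simultaneous" here means each detector's
    # latest result is currently flagged
    concurrent_anomalies = []
    for name, detector in detectors.items():
        result = detector.get_latest_result()
        if result and result["is_anomaly"]:
            concurrent_anomalies.append({"name": name, **result})

    if len(concurrent_anomalies) > 1:
        # Ask AI to correlate. default=str guards against values that
        # json cannot encode natively, such as NumPy scalars.
        response = client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=512,
            system="Analyze correlated anomalies and identify the likely root cause.",
            messages=[{"role": "user",
                      "content": json.dumps(concurrent_anomalies,
                                            indent=2, default=str)}]
        )
        return {
            "anomalies": concurrent_anomalies,
            "root_cause_analysis": response.content[0].text
        }
    # Zero or one anomaly: nothing to correlate
    return {"anomalies": concurrent_anomalies, "root_cause_analysis": None}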
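
The function above treats every currently flagged metric as simultaneous. If anomaly events are stored with their detection time, they can instead be grouped by time window before being sent for correlation. The sketch below is one way to do that. It assumes each event dict carries a "timestamp" key holding a datetime, which the detector results in this tutorial do not include by default, and the 60-second window is an arbitrary starting value.

```python
from datetime import datetime, timedelta


def group_concurrent(events: list[dict],
                     window_seconds: float = 60.0) -> list[list[dict]]:
    """Group events that start within window_seconds of a group's first event."""
    groups: list[list[dict]] = []
    for event in sorted(events, key=lambda e: e["timestamp"]):
        if groups:
            first = groups[-1][0]["timestamp"]
            gap = (event["timestamp"] - first).total_seconds()
            if gap <= window_seconds:
                groups[-1].append(event)
                continue
        # Too far from the current group (or no group yet): start a new one
        groups.append([event])
    return groups


if __name__ == "__main__":
    t0 = datetime(2025, 1, 1, 12, 0, 0)
    events = [
        {"name": "cpu_percent", "timestamp": t0},
        {"name": "memory_percent", "timestamp": t0 + timedelta(seconds=30)},
        {"name": "disk_percent", "timestamp": t0 + timedelta(minutes=5)},
    ]
    for group in group_concurrent(events):
        print([e["name"] for e in group])
```

Only groups with more than one event are worth sending to the model for root-cause analysis, matching the `len(concurrent_anomalies) > 1` check above.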

Step 5: Alert Pipeline

Build an alert pipeline with severity-based routing, deduplication (don’t alert on the same anomaly repeatedly), escalation (if unacknowledged after N minutes), and AI-generated summaries for incident channels.
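
The pieces named above (routing, deduplication, escalation) can be sketched in a few dozen lines. Everything here is illustrative: the Alert and AlertPipeline classes, the channel names in ROUTES, and the 10- and 15-minute defaults are assumptions, and delivery is simulated by appending to a list rather than calling a real paging or chat service.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta

# Hypothetical severity-to-channel routing table
ROUTES = {"info": "log", "warning": "chat", "critical": "pager"}


@dataclass
class Alert:
    metric: str
    severity: str
    summary: str          # e.g. the AI-generated explanation
    created_at: datetime
    acknowledged: bool = False
    escalated: bool = False


class AlertPipeline:
    def __init__(self,
                 dedup_window: timedelta = timedelta(minutes=10),
                 escalate_after: timedelta = timedelta(minutes=15)):
        self.dedup_window = dedup_window
        self.escalate_after = escalate_after
        self.last_sent = {}    # (metric, severity) -> time of last delivery
        self.open_alerts = []  # delivered alerts, checked for escalation
        self.sent = []         # (channel, alert) log standing in for delivery

    def submit(self, alert: Alert) -> bool:
        """Route an alert unless the same one was sent recently."""
        key = (alert.metric, alert.severity)
        last = self.last_sent.get(key)
        if last is not None and alert.created_at - last < self.dedup_window:
            return False  # duplicate within the window: suppress
        self.last_sent[key] = alert.created_at
        self.open_alerts.append(alert)
        self.sent.append((ROUTES.get(alert.severity, "log"), alert))
        return True

    def escalate_stale(self, now: datetime) -> list:
        """Page once for each alert left unacknowledged too long."""
        escalated = []
        for alert in self.open_alerts:
            overdue = now - alert.created_at >= self.escalate_after
            if overdue and not alert.acknowledged and not alert.escalated:
                alert.escalated = True
                self.sent.append(("pager", alert))
                escalated.append(alert)
        return escalated
```

Calling escalate_stale on a timer (for example, once a minute) is enough to drive escalation. Setting alert.acknowledged = True from your incident tool stops it.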

Step 6: Monitoring Dashboard

Create a Streamlit dashboard showing:

  • Real-time metric charts with anomaly markers
  • Anomaly timeline showing detected events
  • AI-generated explanations for each anomaly
  • Correlation visualization between metrics
  • Historical accuracy (were flagged anomalies actually problems?)
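
A minimal sketch of the first three items might look like the following. It assumes a history list of dicts with "timestamp", "metric", "value", "is_anomaly", and an optional "explanation" key. That record shape is an assumption for illustration, not something defined earlier in this tutorial. Streamlit is imported inside the render function so the data helper can be used and tested without it.

```python
def anomaly_rows(history: list[dict]) -> list[dict]:
    """Keep only flagged points, shaped as rows for a table."""
    return [
        {
            "time": h["timestamp"],
            "metric": h["metric"],
            "value": h["value"],
            "explanation": h.get("explanation", ""),
        }
        for h in history
        if h.get("is_anomaly")
    ]


def render_dashboard(history: list[dict]) -> None:
    """Draw one metric's chart and its anomaly table (sketch)."""
    import streamlit as st  # local import: only needed when rendering

    st.title("Anomaly Monitor")
    metrics = sorted({h["metric"] for h in history})
    if not metrics:
        st.info("No data collected yet.")
        return

    selected = st.selectbox("Metric", metrics)
    points = [h for h in history if h["metric"] == selected]

    # Values in collection order; anomalies are listed in the table below
    st.line_chart({"value": [p["value"] for p in points]})

    st.subheader("Detected anomalies")
    st.dataframe(anomaly_rows(points))
```

To try it, call render_dashboard with your stored history at the bottom of a script and start it with `streamlit run`. Correlation views and historical accuracy need additional stored data (acknowledgements, incident outcomes) and are left out of this sketch.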

Step 7: Tuning and Optimization

Contamination Parameter

The Isolation Forest’s contamination parameter sets the fraction of points the model expects to be anomalous, and with it the score cutoff for flagging. Start at 0.05 (expect 5% anomalies) and adjust based on your alert volume. Too many false positives? Lower it. Missing real issues? Raise it.
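
To see why lowering contamination reduces alert volume, it helps to picture it as a cutoff on sorted anomaly scores. The sketch below is a conceptual illustration only, not scikit-learn's code: it uses made-up scores and a simple rank-based cutoff, whereas scikit-learn derives its threshold from a percentile of the training scores. The function name is hypothetical.

```python
def flagged_count(scores: list[float], contamination: float) -> int:
    """Count points at or below the cutoff implied by contamination.

    Lower scores mean "more anomalous", matching the sign convention of
    IsolationForest.decision_function.
    """
    ordered = sorted(scores)
    # Number of points the cutoff should capture (at least one)
    k = max(1, round(contamination * len(ordered)))
    cutoff = ordered[k - 1]
    return sum(1 for s in scores if s <= cutoff)


if __name__ == "__main__":
    # 1000 made-up, distinct scores
    scores = [i / 1000 for i in range(1000)]
    for c in (0.01, 0.05, 0.10):
        print(f"contamination={c}: {flagged_count(scores, c)} flagged")
```

On a window of 1000 points, moving from 0.05 to 0.01 cuts the flagged set from about 50 points to about 10, which is the lever to pull when alert volume is too high.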

Seasonal Adjustment

For metrics with daily/weekly patterns (request volume, user counts), decompose the time series into trend + seasonal + residual components. Run anomaly detection on the residual to avoid flagging normal daily peaks as anomalies.
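
A full trend + seasonal + residual decomposition usually comes from a time-series library. As a lightweight stand-in, the sketch below removes only the weekly seasonal component by subtracting the median value for each (weekday, hour) slot. It does not model trend, and the function name and slot granularity are assumptions chosen for illustration.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from statistics import median


def seasonal_residuals(points: list[tuple[datetime, float]]) -> list[float]:
    """Subtract each point's (weekday, hour) median from its value."""
    by_slot = defaultdict(list)
    for ts, value in points:
        by_slot[(ts.weekday(), ts.hour)].append(value)

    # Median is used so a single spike does not shift the baseline much
    baseline = {slot: median(vals) for slot, vals in by_slot.items()}
    return [value - baseline[(ts.weekday(), ts.hour)] for ts, value in points]


if __name__ == "__main__":
    start = datetime(2025, 1, 6)  # a Monday at midnight
    # Three weeks of hourly data: busy 09:00-17:00, quiet otherwise
    points = [
        (start + timedelta(hours=h), 100.0 if 9 <= h % 24 < 17 else 10.0)
        for h in range(24 * 21)
    ]
    residuals = seasonal_residuals(points)
    print(max(abs(r) for r in residuals))  # daily peaks leave no residual
```

Feeding these residuals to the detector instead of raw values means the daily 09:00 ramp no longer looks unusual, while a spike on top of the normal pattern still stands out. You need at least a few weeks of history for the per-slot medians to be meaningful.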

AI Cost Control

Only send anomalies to the AI for explanation; don’t send every data point. Use statistical detection as the first filter and AI as the analyst. This keeps API costs low (under roughly $20/month for most deployments, depending on how many anomalies you explain and current model pricing).
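
One way to enforce this is a small gate in front of the explanation call that rejects normal points and caps how many explanations run per hour. The class below is a sketch under assumptions: the ExplanationBudget name and the default of 20 calls per hour are illustrative, not values from this tutorial.

```python
from collections import deque
from datetime import datetime, timedelta


class ExplanationBudget:
    """Allow at most max_calls AI explanations per rolling window."""

    def __init__(self, max_calls: int = 20,
                 window: timedelta = timedelta(hours=1)):
        self.max_calls = max_calls
        self.window = window
        self.calls = deque()  # timestamps of recent allowed calls

    def allow(self, result: dict, now: datetime) -> bool:
        # First filter: only statistically flagged points qualify
        if not result.get("is_anomaly"):
            return False
        # Forget calls that have aged out of the rolling window
        while self.calls and now - self.calls[0] >= self.window:
            self.calls.popleft()
        # Second filter: stay within the budget
        if len(self.calls) >= self.max_calls:
            return False
        self.calls.append(now)
        return True
```

In the detection loop, call `budget.allow(result, datetime.now())` before explain_anomaly and skip the API call when it returns False. During an incident storm this caps spend while the statistical alerts keep flowing.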

The Bottom Line

AI anomaly detection catches problems that static thresholds miss — gradual degradation, unusual patterns, and correlated failures. The combination of statistical detection (fast, cheap) with AI analysis (contextual, explanatory) gives you the best of both worlds.

Build time: 5-6 hours. Ongoing cost: $15-30/month. Value: catching the performance degradation or security anomaly that would otherwise become a major incident.
