
Build AI-Powered Data Pipelines: Automate ETL with LLMs, Python, and dbt

Traditional ETL is manual drudgery. Here's how to use LLMs to automate data classification, cleaning, enrichment, and transformation — with production code.

By EgoistAI

Data engineers spend an estimated 40% of their time on data cleaning and transformation: the tedious work of standardizing formats, fixing inconsistencies, and enriching records. The parts of that work that involve free text are exactly what LLMs handle well: understanding unstructured text, classifying content, and extracting structured information.

This tutorial shows you how to build data pipelines that use AI for the messy parts while keeping traditional, deterministic tools for the structured parts. The result: pipelines that absorb many real-world data quality issues without manual cleanup.

Architecture Overview

Raw Data Sources
    ↓
[Ingestion Layer]: Airflow/Prefect orchestration
    ↓
[AI Processing Layer]: LLM-powered cleaning, classification, enrichment
    ↓
[Transformation Layer]: dbt models for structured transforms
    ↓
[Output Layer]: Data warehouse / API / Dashboard

The key insight: AI handles unstructured-to-structured conversion. Traditional tools handle structured-to-structured transformation. Don’t use AI where SQL works fine.

Use Case: Customer Feedback Pipeline

We’ll build a pipeline that:

  1. Ingests raw customer feedback from multiple sources (emails, surveys, reviews)
  2. Uses an LLM to classify sentiment, extract topics, and identify urgency
  3. Uses dbt to aggregate and transform the structured output
  4. Outputs a dashboard-ready dataset

Step 1: Ingestion

# src/ingestion.py
import pandas as pd
from pathlib import Path
from datetime import datetime


def load_feedback_sources() -> pd.DataFrame:
    """Load feedback from multiple sources into a unified DataFrame."""
    frames = []

    # Source 1: CSV from survey tool
    survey_df = pd.read_csv("data/raw/survey_responses.csv")
    survey_df["source"] = "survey"
    survey_df = survey_df.rename(columns={"response_text": "feedback_text"})
    frames.append(survey_df[["feedback_text", "source", "timestamp", "customer_id"]])

    # Source 2: JSON from API
    api_df = pd.read_json("data/raw/support_tickets.json")
    api_df["source"] = "support"
    api_df = api_df.rename(columns={"description": "feedback_text"})
    frames.append(api_df[["feedback_text", "source", "timestamp", "customer_id"]])

    # Source 3: App store reviews
    reviews_df = pd.read_csv("data/raw/app_reviews.csv")
    reviews_df["source"] = "app_store"
    reviews_df = reviews_df.rename(columns={"review_body": "feedback_text"})
    reviews_df["customer_id"] = None  # Anonymous reviews
    frames.append(reviews_df[["feedback_text", "source", "timestamp", "customer_id"]])

    combined = pd.concat(frames, ignore_index=True)
    # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
    combined["ingested_at"] = pd.Timestamp.now(tz="UTC").isoformat()

    print(f"Loaded {len(combined)} feedback records from {len(frames)} sources")
    return combined
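Exact duplicates (the same survey answer submitted twice, a ticket synced from two systems) can be collapsed deterministically before any tokens are spent. A minimal sketch, assuming the feedback texts arrive as a plain list; `unique_texts` and `fan_out` are hypothetical helpers, not part of the pipeline above:

```python
def unique_texts(texts):
    """Collapse exact duplicates so each distinct text is classified only once.

    Returns (uniques, positions): uniques holds the distinct texts in first-seen
    order, and positions[i] is the index into uniques for input record i.
    """
    seen = {}
    uniques, positions = [], []
    for text in texts:
        # Whitespace-only differences count as duplicates;
        # missing or blank values all share the empty key
        key = " ".join(text.split()) if isinstance(text, str) else ""
        if key not in seen:
            seen[key] = len(uniques)
            uniques.append(text)
        positions.append(seen[key])
    return uniques, positions


def fan_out(unique_results, positions):
    """Copy each distinct text's result back onto every record that shared it."""
    return [unique_results[pos] for pos in positions]
```

You would pass `uniques` to the classifier (for `process_batch`, wrapped in a one-column DataFrame) and call `fan_out` on its results before merging, so the result list lines up with the original rows again.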

Step 2: AI Processing Layer

This is where LLMs add value. We’ll use Claude to process each feedback record.

# src/ai_processor.py
import anthropic
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from tenacity import retry, stop_after_attempt, wait_exponential


client = anthropic.Anthropic()

CLASSIFICATION_PROMPT = """Analyze this customer feedback and return a JSON object.

<feedback>
{feedback_text}
</feedback>

Return ONLY valid JSON with these fields:
{{
  "sentiment": "positive" | "negative" | "mixed" | "neutral",
  "sentiment_score": <float from -1.0 to 1.0>,
  "topics": [<list of 1-3 topic strings from: "pricing", "performance", "reliability", "ui_ux", "features", "support", "onboarding", "billing", "security", "other">],
  "urgency": "critical" | "high" | "medium" | "low",
  "key_issue": "<one sentence summary of the main issue, or null if positive>",
  "feature_request": "<extracted feature request if mentioned, or null>",
  "language": "<ISO 639-1 language code>"
}}

Rules:
- "critical" urgency: Data loss, security issues, billing errors, service outages
- "high" urgency: Broken features, blocking issues affecting daily work
- "medium" urgency: Performance issues, minor bugs, UX frustrations
- "low" urgency: Feature requests, general feedback, compliments
- If feedback is in a non-English language, still analyze it and set the language field"""


@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10))
def classify_single(feedback_text: str) -> dict:
    """Classify a single feedback record using Claude."""
    # Skip the API call for missing (None/NaN) or near-empty text
    if not isinstance(feedback_text, str) or len(feedback_text.strip()) < 5:
        return {
            "sentiment": "neutral",
            "sentiment_score": 0.0,
            "topics": ["other"],
            "urgency": "low",
            "key_issue": None,
            "feature_request": None,
            "language": "en",
        }

    message = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=512,
        temperature=0.0,
        messages=[{
            "role": "user",
            "content": CLASSIFICATION_PROMPT.format(feedback_text=feedback_text),
        }],
    )

    try:
        result = json.loads(message.content[0].text)
        return result
    except json.JSONDecodeError:
        # Attempt to extract JSON from response
        text = message.content[0].text
        start = text.find("{")
        end = text.rfind("}") + 1
        if start >= 0 and end > start:
            return json.loads(text[start:end])
        raise ValueError(f"Failed to parse JSON from response: {text[:200]}")


def process_batch(df, max_workers: int = 5) -> list[dict]:
    """Process a batch of feedback records in parallel."""
    results = [None] * len(df)

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
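        # Key futures by position (not DataFrame index) so results line up
        # with row order even when the index is not 0..n-1
        future_to_idx = {
            executor.submit(classify_single, text): pos
            for pos, text in enumerate(df["feedback_text"])
        }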

        completed = 0
        for future in as_completed(future_to_idx):
            idx = future_to_idx[future]
            try:
                results[idx] = future.result()
            except Exception as e:
                print(f"Error processing record {idx}: {e}")
                results[idx] = {
                    "sentiment": "unknown",
                    "sentiment_score": 0.0,
                    "topics": ["other"],
                    "urgency": "low",
                    "key_issue": f"Processing error: {str(e)}",
                    "feature_request": None,
                    "language": "unknown",
                }
            completed += 1
            if completed % 100 == 0:
                print(f"Processed {completed}/{len(df)} records")

    return results
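The prompt asks for specific enum values, but nothing in `classify_single` enforces them, so a model reply like `"sentiment": "angry"` or an invented topic would flow straight into dbt. A minimal sketch of a post-parse check, assuming you call it on the dict produced by `json.loads` inside `classify_single`; because tenacity retries on any exception by default, a `ValueError` raised here would trigger the existing retry. `normalize_classification` is a hypothetical helper:

```python
ALLOWED_SENTIMENTS = {"positive", "negative", "mixed", "neutral"}
ALLOWED_URGENCY = {"critical", "high", "medium", "low"}
ALLOWED_TOPICS = {
    "pricing", "performance", "reliability", "ui_ux", "features",
    "support", "onboarding", "billing", "security", "other",
}


def normalize_classification(result: dict) -> dict:
    """Coerce a parsed LLM response into the schema the prompt asked for.

    Raises ValueError for sentiment/urgency outside the allowed sets, clamps
    the score to [-1, 1], and drops topics that are not in the allowed list.
    """
    sentiment = str(result.get("sentiment", "")).lower()
    urgency = str(result.get("urgency", "")).lower()
    if sentiment not in ALLOWED_SENTIMENTS:
        raise ValueError(f"unexpected sentiment: {sentiment!r}")
    if urgency not in ALLOWED_URGENCY:
        raise ValueError(f"unexpected urgency: {urgency!r}")

    score = float(result.get("sentiment_score") or 0.0)
    score = max(-1.0, min(1.0, score))

    raw_topics = result.get("topics") or []
    if isinstance(raw_topics, str):  # model returned a bare string instead of a list
        raw_topics = [raw_topics]
    topics = [t for t in raw_topics if t in ALLOWED_TOPICS][:3] or ["other"]

    return {
        **result,
        "sentiment": sentiment,
        "sentiment_score": score,
        "topics": topics,
        "urgency": urgency,
    }
```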

Cost Estimation

Before running a batch, estimate costs:

def estimate_cost(df, input_price_per_m=3.00, output_price_per_m=15.00):
    """Estimate API costs for processing a DataFrame.

    Default prices are Claude Sonnet's per-million-token rates; pass others
    to estimate a different model.
    """
    avg_input_tokens = 250  # assumed: prompt + average feedback length
    avg_output_tokens = 150  # assumed: JSON response

    total_input = len(df) * avg_input_tokens
    total_output = len(df) * avg_output_tokens

    input_cost = (total_input / 1_000_000) * input_price_per_m
    output_cost = (total_output / 1_000_000) * output_price_per_m
    total_cost = input_cost + output_cost

    print(f"Records: {len(df)}")
    print(f"Estimated input tokens: {total_input:,}")
    print(f"Estimated output tokens: {total_output:,}")
    print(f"Estimated cost: ${total_cost:.2f}")
    return total_cost

# Example: 10,000 records
# Input: 2.5M tokens × $3/M = $7.50
# Output: 1.5M tokens × $15/M = $22.50
# Total: ~$30 for 10,000 classified records

That is roughly $30 to classify 10,000 customer feedback records with sentiment, topics, urgency, and feature extraction. By a rough estimate, a human analyst would need 40-80 hours for the same work; at $30-50/hour, that is $1,200-4,000.
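The 250/150 token averages in `estimate_cost` are assumptions, and the classification prompt on its own already uses a sizable share of that input budget. To ground the input figure in your own data, a rough sketch that renders the full prompt for each record and applies the common "about 4 characters per token" rule of thumb for English text; `estimate_input_tokens` is a hypothetical helper, and for exact counts you would use Anthropic's token-counting endpoint instead:

```python
def estimate_input_tokens(texts, prompt_template, chars_per_token=4.0):
    """Rough average input tokens per record, prompt included.

    Uses a ~4 characters-per-token heuristic, not Claude's actual tokenizer,
    so treat the result as a ballpark for estimate_cost, not a billing figure.
    """
    if len(texts) == 0:
        return 0.0
    # Render the full prompt for each record, as classify_single would send it
    total_chars = sum(len(prompt_template.format(feedback_text=t)) for t in texts)
    return total_chars / len(texts) / chars_per_token
```

For example, `estimate_input_tokens(df["feedback_text"].dropna().tolist(), CLASSIFICATION_PROMPT)` would give a data-driven replacement for the hard-coded 250.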

Step 3: Merge AI Results with Source Data

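# src/merge.py
from pathlib import Path

import pandas as pd


def merge_results(df: pd.DataFrame, ai_results: list[dict]) -> pd.DataFrame:
    """Merge AI classification results back into the source DataFrame."""
    # Results are matched by position, so the counts must agree
    if len(df) != len(ai_results):
        raise ValueError(f"Got {len(ai_results)} results for {len(df)} records")
    ai_df = pd.DataFrame(ai_results)

    # Flatten the topics list into scalar columns that dbt can read from a flat table
    ai_df["primary_topic"] = ai_df["topics"].apply(lambda x: x[0] if x else "other")
    ai_df["all_topics"] = ai_df["topics"].apply(lambda x: ",".join(x) if x else "other")

    merged = pd.concat([df.reset_index(drop=True), ai_df], axis=1)
    merged = merged.drop(columns=["topics"])

    return merged


def save_to_warehouse(df: pd.DataFrame, table_name: str = "feedback_classified"):
    """Write a CSV for the demo; in production, write to the warehouse with a DB connector.

    dbt does not read this file directly: load it into the table that the
    `processed` source in Step 4 points to before running dbt.
    """
    output_path = Path("data/processed") / f"{table_name}.csv"
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_path, index=False)
    print(f"Saved {len(df)} records to {output_path}")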

Step 4: dbt Transformation

Now that AI has converted unstructured feedback into structured data, dbt handles the aggregation.

-- models/staging/stg_feedback.sql
WITH source AS (
    SELECT * FROM {{ source('processed', 'feedback_classified') }}
)

SELECT
    feedback_text,
    source,
    timestamp::timestamp AS feedback_timestamp,
    customer_id,
    sentiment,
    sentiment_score::float AS sentiment_score,
    primary_topic,
    all_topics,
    urgency,
    key_issue,
    feature_request,
    language,
    ingested_at::timestamp AS ingested_at
FROM source
WHERE sentiment != 'unknown'

-- models/marts/feedback_summary.sql
WITH classified AS (
    SELECT * FROM {{ ref('stg_feedback') }}
),

daily_summary AS (
    SELECT
        DATE_TRUNC('day', feedback_timestamp) AS date,
        source,
        primary_topic,
        COUNT(*) AS feedback_count,
        AVG(sentiment_score) AS avg_sentiment,
        COUNT(CASE WHEN urgency = 'critical' THEN 1 END) AS critical_count,
        COUNT(CASE WHEN urgency = 'high' THEN 1 END) AS high_urgency_count,
        COUNT(CASE WHEN feature_request IS NOT NULL THEN 1 END) AS feature_request_count
    FROM classified
    GROUP BY 1, 2, 3
)

SELECT * FROM daily_summary

-- models/marts/feature_requests.sql
WITH classified AS (
    SELECT * FROM {{ ref('stg_feedback') }}
)

SELECT
    feature_request,
    COUNT(*) AS request_count,
    AVG(sentiment_score) AS avg_sentiment_of_requesters,
    ARRAY_AGG(DISTINCT source) AS sources,
    MIN(feedback_timestamp) AS first_requested,
    MAX(feedback_timestamp) AS last_requested
FROM classified
WHERE feature_request IS NOT NULL
GROUP BY 1
ORDER BY request_count DESC
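`GROUP BY 1` in `feature_requests.sql` only merges requests whose text matches exactly, and LLM-extracted phrasing varies in case and punctuation ("Dark mode", "dark mode!"). One option is to canonicalize the column in Python before `save_to_warehouse`. A minimal sketch with a hypothetical helper; requests that differ in wording ("night theme") will still land in separate groups, which would need semantic clustering instead:

```python
import re


def normalize_feature_request(text):
    """Canonicalize an LLM-extracted feature request for grouping.

    Lowercases, turns punctuation into spaces, and collapses whitespace.
    Returns None for missing or empty values so the SQL IS NOT NULL filter
    still excludes them.
    """
    if not isinstance(text, str):
        return None
    cleaned = re.sub(r"[^\w\s]", " ", text.lower())
    cleaned = " ".join(cleaned.split())
    return cleaned or None
```

Applied as `merged["feature_request"] = merged["feature_request"].map(normalize_feature_request)`, it leaves the dbt model unchanged.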

Step 5: Orchestration with Airflow

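# dags/feedback_pipeline.py
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Pipeline modules from Steps 1-3 (assumes src/ is importable by the Airflow workers)
from src.ai_processor import process_batch
from src.ingestion import load_feedback_sources
from src.merge import merge_results, save_to_warehouse

# Tasks hand data off through files: XCom is meant for small metadata, not DataFrames
RAW_PATH = Path("data/staging/feedback_raw.csv")


def run_ingestion():
    """Load every source and persist the combined records for the next task."""
    RAW_PATH.parent.mkdir(parents=True, exist_ok=True)
    load_feedback_sources().to_csv(RAW_PATH, index=False)


def run_ai_processing():
    """Classify the staged records and write the table that dbt reads."""
    df = pd.read_csv(RAW_PATH)
    results = process_batch(df)
    save_to_warehouse(merge_results(df, results))


default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "feedback_pipeline",
    default_args=default_args,
    schedule="@daily",  # Airflow 2.4+; older releases use schedule_interval
    start_date=datetime(2026, 4, 1),
    catchup=False,
) as dag:

    ingest = PythonOperator(
        task_id="ingest_feedback",
        python_callable=run_ingestion,
    )

    process = PythonOperator(
        task_id="ai_classification",
        python_callable=run_ai_processing,
    )

    transform = BashOperator(
        task_id="dbt_transform",
        # "+" also builds upstream models such as stg_feedback
        bash_command="cd /opt/dbt/feedback && dbt run --select +marts",
    )

    test = BashOperator(
        task_id="dbt_test",
        bash_command="cd /opt/dbt/feedback && dbt test",
    )

    ingest >> process >> transform >> test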

Quality Assurance: Don’t Trust AI Blindly

AI classification is probabilistic. You need validation:

# src/validation.py
import pandas as pd


def validate_ai_results(df: pd.DataFrame) -> dict:
    """Run quality checks on AI-classified data."""
    checks = {}

    # Check 1: Few unknown sentiments (records where classification failed)
    unknown_pct = (df["sentiment"] == "unknown").mean()
    checks["unknown_sentiment_rate"] = {
        "value": unknown_pct,
        "threshold": 0.05,
        "passed": unknown_pct < 0.05,
    }

    # Check 2: Sentiment score distribution is reasonable
    score_std = df["sentiment_score"].std()
    checks["sentiment_score_std"] = {
        "value": score_std,
        "threshold": 0.3,  # Should have meaningful variation
        "passed": score_std > 0.3,
    }

    # Check 3: Critical urgency rate is reasonable (1-10%)
    critical_pct = (df["urgency"] == "critical").mean()
    checks["critical_rate"] = {
        "value": critical_pct,
        "threshold": (0.01, 0.10),
        "passed": 0.01 <= critical_pct <= 0.10,
    }

    # Check 4: Topic distribution isn't dominated by "other"
    other_pct = (df["primary_topic"] == "other").mean()
    checks["other_topic_rate"] = {
        "value": other_pct,
        "threshold": 0.30,
        "passed": other_pct < 0.30,
    }

    all_passed = all(c["passed"] for c in checks.values())
    print(f"Quality checks: {'ALL PASSED' if all_passed else 'FAILURES DETECTED'}")
    for name, check in checks.items():
        status = "PASS" if check["passed"] else "FAIL"
        print(f"  [{status}] {name}: {check['value']:.3f} (threshold: {check['threshold']})")

    return checks
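`validate_ai_results` reports failures but does not stop anything, so bad classifications would still flow into dbt. A minimal sketch of a gate, assuming you call it on the returned `checks` dict inside the AI processing task before saving; in Airflow, an exception raised in a `PythonOperator` callable fails the task, and with the default trigger rule the downstream dbt tasks do not run. `DataQualityError` and `enforce_quality` are hypothetical names:

```python
class DataQualityError(Exception):
    """Raised when AI-classified data fails one or more quality checks."""


def enforce_quality(checks: dict) -> None:
    """Raise if any check failed so the orchestrator marks the task as failed."""
    failed = sorted(name for name, check in checks.items() if not check["passed"])
    if failed:
        raise DataQualityError(f"Quality checks failed: {', '.join(failed)}")
```

Keep in mind that the DAG's `retries` setting will rerun the whole task, and with it the paid classification calls, unless results are cached.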

When NOT to Use AI in Pipelines

AI is the wrong tool when:

| Situation | Use instead |
|---|---|
| Standardizing date formats | pd.to_datetime() or dbt macros |
| Joining tables | SQL JOIN |
| Deduplication on exact match | SQL DISTINCT or df.drop_duplicates() |
| Numeric calculations | SQL or pandas aggregations |
| Schema validation | Great Expectations, dbt tests |
| Type conversion | Cast operations |

The rule: if a deterministic function can handle it reliably, don’t use AI. AI should only handle tasks that require natural language understanding, classification of unstructured content, or extraction of information from free text.
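The same rule can be applied record by record: run the deterministic parser first and send only what it cannot handle to a model or a review queue. A minimal sketch for dates, where `KNOWN_FORMATS`, `parse_date`, and `split_for_llm` are hypothetical and the format list is an assumption to replace with the formats your sources actually use:

```python
from datetime import datetime

# Assumed formats for illustration. Ambiguous inputs such as "05/03/2024"
# are read day-first because only "%d/%m/%Y" is listed.
KNOWN_FORMATS = ("%Y-%m-%d", "%Y-%m-%dT%H:%M:%S", "%d/%m/%Y")


def parse_date(value):
    """Return a datetime if any known format matches, else None."""
    for fmt in KNOWN_FORMATS:
        try:
            return datetime.strptime(value.strip(), fmt)
        except ValueError:
            continue
    return None


def split_for_llm(values):
    """Parse what we can deterministically; return the rest as leftovers."""
    parsed, leftovers = {}, []
    for value in values:
        result = parse_date(value)
        if result is None:
            leftovers.append(value)  # only these need an LLM or a human
        else:
            parsed[value] = result
    return parsed, leftovers
```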

Cost Optimization

Batch vs. Real-Time

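For pipelines processing 1,000+ records, batch whenever latency allows. Real-time classification (one API call per record as it arrives) pays full per-token prices and adds per-request overhead and rate-limit pressure, while Anthropic's Message Batches API processes large request sets asynchronously at a 50% discount (at the time of writing), with results returned within 24 hours.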
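One way to batch with Claude is Anthropic's Message Batches API, which accepts many requests in one submission and returns results asynchronously. A minimal sketch of building the request list, assuming the same prompt template and model as `classify_single`; `build_batch_requests` and `submit_batch` are hypothetical helpers, and current batch size limits and pricing should be checked in Anthropic's documentation:

```python
def build_batch_requests(texts, prompt_template,
                         model="claude-sonnet-4-20250514", max_tokens=512):
    """Build one batch request per record; custom_id maps each result back to its row."""
    return [
        {
            "custom_id": f"record-{i}",
            "params": {
                "model": model,
                "max_tokens": max_tokens,
                "temperature": 0.0,
                "messages": [
                    {"role": "user", "content": prompt_template.format(feedback_text=text)}
                ],
            },
        }
        for i, text in enumerate(texts)
    ]


def submit_batch(requests):
    """Submit the requests (needs the anthropic SDK and an API key)."""
    import anthropic  # imported lazily so build_batch_requests works without the SDK

    client = anthropic.Anthropic()
    return client.messages.batches.create(requests=requests)
```

After submitting, you would poll `client.messages.batches.retrieve(batch.id)` until its `processing_status` is `"ended"`, then iterate `client.messages.batches.results(batch.id)` and use each entry's `custom_id` to put results back in row order.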

Model Selection

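| Task complexity | Recommended model | Approx. cost per 1K records |
|---|---|---|
| Simple sentiment (pos/neg) | Claude Haiku | ~$0.10 |
| Multi-label classification | Claude Sonnet | ~$3.00 |
| Complex extraction + reasoning | Claude Opus | ~$15.00 |

These are rough figures. The Sonnet and Opus rows correspond to about 250 input and 150 output tokens per record at $3/$15 and $15/$75 per million tokens respectively; the Haiku row implies a much shorter output, such as a single sentiment label. Prices change between model versions, so recompute with current rates before budgeting.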
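The per-1K figures are simple arithmetic on per-record token counts and per-million-token prices. A small hypothetical helper makes them easy to recompute; the default token counts are the same assumptions `estimate_cost` uses, and the prices are inputs you supply rather than values baked into the code:

```python
def cost_per_1k_records(input_price_per_m, output_price_per_m,
                        input_tokens=250, output_tokens=150):
    """Dollar cost to classify 1,000 records at the given per-million-token prices."""
    records = 1_000
    input_cost = records * input_tokens / 1_000_000 * input_price_per_m
    output_cost = records * output_tokens / 1_000_000 * output_price_per_m
    return input_cost + output_cost
```

With Sonnet's $3/$15 rates this returns about $3.00, matching the table; for a bare positive/negative label, lower `output_tokens` to reflect the shorter reply.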

Start with Haiku. Only upgrade to Sonnet/Opus if accuracy is insufficient.
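Deciding whether accuracy is "insufficient" needs a yardstick. A minimal sketch, assuming you hand-label a sample of records (for example, a few hundred) and run each candidate model over the same sample; `agreement_report` is a hypothetical helper:

```python
from collections import Counter


def agreement_report(predicted, gold):
    """Compare model labels with hand labels for the same records.

    Returns overall accuracy and the most common (gold, predicted) confusions.
    """
    if len(predicted) != len(gold):
        raise ValueError("predicted and gold must be the same length")
    if not gold:
        raise ValueError("need at least one labeled record")
    matches = sum(p == g for p, g in zip(predicted, gold))
    confusions = Counter((g, p) for p, g in zip(predicted, gold) if p != g)
    return {
        "accuracy": matches / len(gold),
        "top_confusions": confusions.most_common(3),
    }
```

Running it once per model on the same sample shows whether moving from Haiku to Sonnet actually fixes the confusions you care about, such as critical issues labeled as medium.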

Caching

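Cache LLM responses for identical inputs. In our feedback pipeline, approximately 5-10% of records are duplicates or near-duplicates. A hash-keyed cache only skips exact repeats: normalizing whitespace and case before hashing also catches trivial variants, while reworded near-duplicates would need fuzzy matching. Including a version tag in the key keeps cached results from going stale when the prompt or model changes.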

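import hashlib
import json
from pathlib import Path

from src.ai_processor import classify_single  # from Step 2

CACHE_DIR = Path("cache/classifications")
CACHE_DIR.mkdir(parents=True, exist_ok=True)

# Bump when the prompt or model changes so stale results are not reused
CACHE_VERSION = "v1"


def classify_with_cache(feedback_text: str) -> dict:
    # Collapse whitespace and case so trivially different copies share a cache entry
    normalized = " ".join(str(feedback_text).split()).lower()
    cache_key = hashlib.sha256(f"{CACHE_VERSION}:{normalized}".encode("utf-8")).hexdigest()
    cache_file = CACHE_DIR / f"{cache_key}.json"

    if cache_file.exists():
        return json.loads(cache_file.read_text())

    result = classify_single(feedback_text)
    cache_file.write_text(json.dumps(result))
    return result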

The Bottom Line

AI-powered data pipelines aren’t about replacing your existing stack. They’re about inserting LLMs at the specific points where unstructured data needs to become structured data. The rest of your pipeline — ingestion, transformation, testing, orchestration — stays deterministic and reliable.

The pattern is simple:

  1. Ingest raw data (traditional tools)
  2. Use AI to classify/extract/enrich unstructured fields
  3. Validate AI output with quality checks
  4. Transform structured output with dbt/SQL
  5. Test everything

Follow this pattern, and you'll spend less of your time on data cleaning and more of it actually using the data.
