Build AI-Powered Data Pipelines: Automate ETL with LLMs, Python, and dbt
Traditional ETL is manual drudgery. Here's how to use LLMs to automate data classification, cleaning, enrichment, and transformation — with production code.
Data engineers spend an estimated 40% of their time on data cleaning and transformation — the tedious work of standardizing formats, fixing inconsistencies, and enriching records. These are exactly the tasks that LLMs handle well: understanding unstructured text, classifying content, and extracting structured information.
This tutorial shows you how to build data pipelines that use AI for the messy parts while keeping traditional, deterministic tools for the structured parts. The result: pipelines that handle messy real-world data with far less manual cleanup, plus quality checks to catch the cases where the model gets it wrong.
Architecture Overview
```text
Raw Data Sources
        ↓
[Ingestion Layer]: Airflow/Prefect orchestration
        ↓
[AI Processing Layer]: LLM-powered cleaning, classification, enrichment
        ↓
[Transformation Layer]: dbt models for structured transforms
        ↓
[Output Layer]: Data warehouse / API / Dashboard
```
The key insight: AI handles unstructured-to-structured conversion. Traditional tools handle structured-to-structured transformation. Don’t use AI where SQL works fine.
Use Case: Customer Feedback Pipeline
We’ll build a pipeline that:
- Ingests raw customer feedback from multiple sources (emails, surveys, reviews)
- Uses an LLM to classify sentiment, extract topics, and identify urgency
- Uses dbt to aggregate and transform the structured output
- Outputs a dashboard-ready dataset
Step 1: Ingestion
```python
# src/ingestion.py
from datetime import datetime, timezone

import pandas as pd

COLUMNS = ["feedback_text", "source", "timestamp", "customer_id"]


def load_feedback_sources() -> pd.DataFrame:
    """Load feedback from multiple sources into a unified DataFrame."""
    frames = []

    # Source 1: CSV from survey tool
    survey_df = pd.read_csv("data/raw/survey_responses.csv")
    survey_df["source"] = "survey"
    survey_df = survey_df.rename(columns={"response_text": "feedback_text"})
    frames.append(survey_df[COLUMNS])

    # Source 2: JSON from API
    api_df = pd.read_json("data/raw/support_tickets.json")
    api_df["source"] = "support"
    api_df = api_df.rename(columns={"description": "feedback_text"})
    frames.append(api_df[COLUMNS])

    # Source 3: App store reviews
    reviews_df = pd.read_csv("data/raw/app_reviews.csv")
    reviews_df["source"] = "app_store"
    reviews_df = reviews_df.rename(columns={"review_body": "feedback_text"})
    reviews_df["customer_id"] = None  # Anonymous reviews
    frames.append(reviews_df[COLUMNS])

    combined = pd.concat(frames, ignore_index=True)
    # Sources may format timestamps differently; parse them all to UTC here,
    # deterministically (format="mixed" requires pandas >= 2.0)
    combined["timestamp"] = pd.to_datetime(combined["timestamp"], utc=True, format="mixed")
    # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
    combined["ingested_at"] = datetime.now(timezone.utc).isoformat()
    print(f"Loaded {len(combined)} feedback records from {len(frames)} sources")
    return combined
```
Step 2: AI Processing Layer
This is where LLMs add value. We’ll use Claude to process each feedback record.
```python
# src/ai_processor.py
import json
from concurrent.futures import ThreadPoolExecutor, as_completed

import anthropic
import pandas as pd
from tenacity import retry, stop_after_attempt, wait_exponential

client = anthropic.Anthropic()  # reads ANTHROPIC_API_KEY from the environment

CLASSIFICATION_PROMPT = """Analyze this customer feedback and return a JSON object.
<feedback>
{feedback_text}
</feedback>
Return ONLY valid JSON with these fields:
{{
"sentiment": "positive" | "negative" | "mixed" | "neutral",
"sentiment_score": <float from -1.0 to 1.0>,
"topics": [<list of 1-3 topic strings from: "pricing", "performance", "reliability", "ui_ux", "features", "support", "onboarding", "billing", "security", "other">],
"urgency": "critical" | "high" | "medium" | "low",
"key_issue": "<one sentence summary of the main issue, or null if positive>",
"feature_request": "<extracted feature request if mentioned, or null>",
"language": "<ISO 639-1 language code>"
}}
Rules:
- "critical" urgency: Data loss, security issues, billing errors, service outages
- "high" urgency: Broken features, blocking issues affecting daily work
- "medium" urgency: Performance issues, minor bugs, UX frustrations
- "low" urgency: Feature requests, general feedback, compliments
- If feedback is in a non-English language, still analyze it and set the language field"""


# reraise=True surfaces the last real exception instead of tenacity's RetryError
@retry(stop=stop_after_attempt(3), wait=wait_exponential(min=1, max=10), reraise=True)
def classify_single(feedback_text: str) -> dict:
    """Classify a single feedback record using Claude."""
    # Skip the API call for missing (None/NaN) or near-empty feedback
    if not isinstance(feedback_text, str) or len(feedback_text.strip()) < 5:
        return {
            "sentiment": "neutral",
            "sentiment_score": 0.0,
            "topics": ["other"],
            "urgency": "low",
            "key_issue": None,
            "feature_request": None,
            "language": "en",
        }

    message = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=512,
        temperature=0.0,
        messages=[{
            "role": "user",
            "content": CLASSIFICATION_PROMPT.format(feedback_text=feedback_text),
        }],
    )
    text = message.content[0].text
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        # The model may wrap the JSON in prose or a code fence; try the outermost {...}
        start = text.find("{")
        end = text.rfind("}") + 1
        if start >= 0 and end > start:
            return json.loads(text[start:end])  # a failure here triggers a retry
        raise ValueError(f"Failed to parse JSON from response: {text[:200]}")


def process_batch(df: pd.DataFrame, max_workers: int = 5) -> list[dict]:
    """Process a batch of feedback records in parallel, returning results in row order."""
    texts = df["feedback_text"].tolist()
    results = [None] * len(texts)
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Key futures by row position, not DataFrame index label, so results
        # line up even when df doesn't have a default 0..n-1 index
        future_to_pos = {
            executor.submit(classify_single, text): pos
            for pos, text in enumerate(texts)
        }
        completed = 0
        for future in as_completed(future_to_pos):
            pos = future_to_pos[future]
            try:
                results[pos] = future.result()
            except Exception as e:
                print(f"Error processing record {pos}: {e}")
                # Sentinel row; stg_feedback filters out sentiment = 'unknown'
                results[pos] = {
                    "sentiment": "unknown",
                    "sentiment_score": 0.0,
                    "topics": ["other"],
                    "urgency": "low",
                    "key_issue": f"Processing error: {str(e)}",
                    "feature_request": None,
                    "language": "unknown",
                }
            completed += 1
            if completed % 100 == 0:
                print(f"Processed {completed}/{len(texts)} records")
    return results
```
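The prompt lists the allowed values, but classify_single accepts whatever JSON comes back. A response such as "Negative" or an invented topic would therefore pass straight through to dbt and split its GROUP BYs. Below is a minimal sketch of a normalizer for the parsed JSON. `normalize_result`, `ALLOWED_VALUES`, and `ALLOWED_TOPICS` are hypothetical names, and the allowed sets are copied from CLASSIFICATION_PROMPT.

```python
import math

# Allowed values copied from CLASSIFICATION_PROMPT (hypothetical constant names)
ALLOWED_VALUES = {
    "sentiment": {"positive", "negative", "mixed", "neutral"},
    "urgency": {"critical", "high", "medium", "low"},
}
ALLOWED_TOPICS = {
    "pricing", "performance", "reliability", "ui_ux", "features",
    "support", "onboarding", "billing", "security", "other",
}


def normalize_result(raw: dict) -> dict:
    """Coerce one parsed model response into the schema the prompt requests.

    Raises ValueError when a field can't be repaired.
    """
    result = dict(raw)

    # Enum fields: tolerate case and surrounding whitespace, reject anything else
    for field, allowed in ALLOWED_VALUES.items():
        value = str(result.get(field, "")).strip().lower()
        if value not in allowed:
            raise ValueError(f"Invalid {field}: {result.get(field)!r}")
        result[field] = value

    # Score: must be a finite number; clamp into [-1.0, 1.0]
    try:
        score = float(result.get("sentiment_score", 0.0))
    except (TypeError, ValueError):
        score = math.nan
    if not math.isfinite(score):
        raise ValueError(f"Invalid sentiment_score: {result.get('sentiment_score')!r}")
    result["sentiment_score"] = max(-1.0, min(1.0, score))

    # Topics: keep known values only, de-duplicated in order, at most 3, default ["other"]
    topics = result.get("topics") or []
    if isinstance(topics, str):
        topics = [topics]
    cleaned = [str(t).strip().lower() for t in topics]
    known = list(dict.fromkeys(t for t in cleaned if t in ALLOWED_TOPICS))
    result["topics"] = known[:3] or ["other"]
    return result
```

You could call this on the parsed JSON inside classify_single. An unrepairable response would then raise ValueError and go through the same tenacity retry as a parse failure. A response that keeps failing ends up as process_batch's "unknown" sentinel row.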
Cost Estimation
Before running a batch, estimate costs:
```python
def estimate_cost(df, input_price_per_m: float = 3.00, output_price_per_m: float = 15.00) -> float:
    """Estimate API costs for processing a DataFrame.

    Defaults are Claude Sonnet's input/output prices per million tokens;
    pass current prices for other models.
    """
    avg_input_tokens = 250  # rough guess: prompt + average feedback length
    avg_output_tokens = 150  # JSON response
    total_input = len(df) * avg_input_tokens
    total_output = len(df) * avg_output_tokens

    input_cost = (total_input / 1_000_000) * input_price_per_m
    output_cost = (total_output / 1_000_000) * output_price_per_m
    total = input_cost + output_cost
    print(f"Records: {len(df)}")
    print(f"Estimated input tokens: {total_input:,}")
    print(f"Estimated output tokens: {total_output:,}")
    print(f"Estimated cost: ${total:.2f}")
    return total

# Example: 10,000 records
# Input: 2.5M tokens × $3/M = $7.50
# Output: 1.5M tokens × $15/M = $22.50
# Total: ~$30 for 10,000 classified records
```
$30 to classify 10,000 customer feedback records with sentiment, topics, urgency, and feature extraction. A human analyst doing this would take 40-80 hours at $30-50/hour ($1,200-4,000).
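The fixed 250-token input average is a rough guess. CLASSIFICATION_PROMPT alone is roughly 1,000 characters. Under the common ~4-characters-per-token rule of thumb for English, that is already around 250 tokens before any feedback is added. Below is a sketch of a slightly better estimate that uses the real text lengths. `estimate_cost_from_texts` is a hypothetical helper. The 4.0 ratio is a heuristic, not a tokenizer count, and the default prices mirror the Sonnet rates above. For exact input counts, the Anthropic SDK's `client.messages.count_tokens()` counts a request's input tokens without running it.

```python
def estimate_cost_from_texts(
    texts: list,
    prompt_template: str,
    avg_output_tokens: int = 150,
    input_price_per_m: float = 3.00,
    output_price_per_m: float = 15.00,
    chars_per_token: float = 4.0,  # heuristic, not an exact tokenizer count
) -> float:
    """Rough USD cost estimate based on actual prompt and feedback lengths."""
    template_chars = len(prompt_template)
    # Approximate input tokens per request: (template + feedback characters) / chars_per_token
    input_tokens = sum((template_chars + len(str(t))) / chars_per_token for t in texts)
    output_tokens = len(texts) * avg_output_tokens
    return (input_tokens * input_price_per_m + output_tokens * output_price_per_m) / 1_000_000
```

For example: `estimate_cost_from_texts(df["feedback_text"].tolist(), CLASSIFICATION_PROMPT)`.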
Step 3: Merge AI Results with Source Data
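```python
# src/merge.py
from pathlib import Path

import pandas as pd


def merge_results(df: pd.DataFrame, ai_results: list[dict]) -> pd.DataFrame:
    """Merge AI classification results back into the source DataFrame."""
    if len(ai_results) != len(df):
        raise ValueError(f"{len(ai_results)} AI results for {len(df)} source records")
    ai_df = pd.DataFrame(ai_results)
    # Flatten the topics list into two scalar columns for dbt; fall back to
    # ["other"] if the model omitted "topics" or returned a non-list
    topics = ai_df["topics"].apply(lambda x: x if isinstance(x, list) and x else ["other"])
    ai_df["primary_topic"] = topics.apply(lambda t: t[0])
    ai_df["all_topics"] = topics.apply(lambda t: ",".join(map(str, t)))
    # Positional concat: ai_results must be in the same row order as df
    return pd.concat([df.reset_index(drop=True), ai_df.drop(columns=["topics"])], axis=1)


def save_to_warehouse(df: pd.DataFrame, table_name: str = "feedback_classified"):
    """Save to a format dbt can consume (CSV for demo: load it into the table behind
    dbt's `processed` source; in production, write with a DB connector)."""
    output_path = Path(f"data/processed/{table_name}.csv")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(output_path, index=False)
    print(f"Saved {len(df)} records to {output_path}")
```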
Step 4: dbt Transformation
Now that AI has converted unstructured feedback into structured data, dbt handles the aggregation.
```sql
-- models/staging/stg_feedback.sql
-- The `processed` source must be declared in a sources .yml file
WITH raw_feedback AS (
    SELECT * FROM {{ source('processed', 'feedback_classified') }}
)

SELECT
    feedback_text,
    source,
    timestamp::timestamp AS feedback_timestamp,
    customer_id,
    sentiment,
    sentiment_score::float AS sentiment_score,
    primary_topic,
    all_topics,
    urgency,
    key_issue,
    feature_request,
    language,
    ingested_at::timestamp AS ingested_at
FROM raw_feedback
WHERE sentiment != 'unknown'  -- drop records that failed AI processing
```

```sql
-- models/marts/feedback_summary.sql
WITH classified AS (
    SELECT * FROM {{ ref('stg_feedback') }}
),

daily_summary AS (
    SELECT
        DATE_TRUNC('day', feedback_timestamp) AS date,
        source,
        primary_topic,
        COUNT(*) AS feedback_count,
        AVG(sentiment_score) AS avg_sentiment,
        COUNT(CASE WHEN urgency = 'critical' THEN 1 END) AS critical_count,
        COUNT(CASE WHEN urgency = 'high' THEN 1 END) AS high_urgency_count,
        COUNT(CASE WHEN feature_request IS NOT NULL THEN 1 END) AS feature_request_count
    FROM classified
    GROUP BY 1, 2, 3
)

SELECT * FROM daily_summary
```

```sql
-- models/marts/feature_requests.sql
WITH classified AS (
    SELECT * FROM {{ ref('stg_feedback') }}
)

SELECT
    -- Free text from the model: differently worded requests form separate groups
    feature_request,
    COUNT(*) AS request_count,
    AVG(sentiment_score) AS avg_sentiment_of_requesters,
    ARRAY_AGG(DISTINCT source) AS sources,
    MIN(feedback_timestamp) AS first_requested,
    MAX(feedback_timestamp) AS last_requested
FROM classified
WHERE feature_request IS NOT NULL
GROUP BY 1
ORDER BY request_count DESC
```
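The model writes feature_request as free text. Two customers asking for the same thing rarely produce identical strings, so `GROUP BY 1` in feature_requests.sql splits them into separate rows. One option is to assign a canonical label in Python before loading. Below is a minimal sketch using greedy fuzzy matching with the standard library's difflib. `group_feature_requests` and the 0.8 threshold are illustrative assumptions. Each request is compared against every existing label, so this suits hundreds of distinct requests, not millions.

```python
import re
from difflib import SequenceMatcher


def _normalize_request(text: str) -> str:
    # Lowercase, strip punctuation, and collapse whitespace
    return " ".join(re.sub(r"[^\w\s]", "", text.lower()).split())


def group_feature_requests(requests, threshold: float = 0.8) -> dict:
    """Map each free-text request to a canonical label via greedy fuzzy matching.

    The first request seen in a group (normalized) becomes that group's label.
    A later request joins the most similar existing label if
    SequenceMatcher.ratio() >= threshold; otherwise it starts a new group.
    """
    labels = []
    mapping = {}
    for request in requests:
        norm = _normalize_request(request)
        best_label, best_score = None, 0.0
        for label in labels:
            score = SequenceMatcher(None, norm, label).ratio()
            if score > best_score:
                best_label, best_score = label, score
        if best_label is not None and best_score >= threshold:
            mapping[request] = best_label
        else:
            labels.append(norm)
            mapping[request] = norm
    return mapping
```

You could then add the label as a column, select it in stg_feedback, and group by it in feature_requests.sql. For example: `df["feature_request_group"] = df["feature_request"].map(group_feature_requests(df["feature_request"].dropna().unique()))`.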
Step 5: Orchestration with Airflow
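```python
# dags/feedback_pipeline.py
from datetime import datetime, timedelta
from pathlib import Path

import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# Assumes the src/ modules from the earlier steps are importable by Airflow
from src.ai_processor import process_batch
from src.ingestion import load_feedback_sources
from src.merge import merge_results, save_to_warehouse
from src.validation import validate_ai_results

# Tasks hand data over through files: a callable's return value is pushed to XCom,
# which is meant for small values, not whole DataFrames
RAW_PATH = Path("data/staging/feedback_raw.csv")  # hypothetical location
PROCESSED_PATH = Path("data/processed/feedback_classified.csv")  # save_to_warehouse's default


def run_ingestion():
    RAW_PATH.parent.mkdir(parents=True, exist_ok=True)
    load_feedback_sources().to_csv(RAW_PATH, index=False)


def run_ai_processing():
    df = pd.read_csv(RAW_PATH)
    save_to_warehouse(merge_results(df, process_batch(df)))


def run_validation():
    checks = validate_ai_results(pd.read_csv(PROCESSED_PATH))
    if not all(c["passed"] for c in checks.values()):
        raise ValueError("AI quality checks failed; stopping before dbt")


default_args = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    "feedback_pipeline",
    default_args=default_args,
    schedule="@daily",  # replaces schedule_interval (deprecated in Airflow 2.4, removed in 3.0)
    start_date=datetime(2026, 4, 1),
    catchup=False,
) as dag:
    ingest = PythonOperator(task_id="ingest_feedback", python_callable=run_ingestion)
    process = PythonOperator(task_id="ai_classification", python_callable=run_ai_processing)
    validate = PythonOperator(task_id="validate_ai_output", python_callable=run_validation)
    transform = BashOperator(
        task_id="dbt_transform",
        # Plain `dbt run` also rebuilds stg_feedback; `--models marts` would not
        bash_command="cd /opt/dbt/feedback && dbt run",
    )
    test = BashOperator(
        task_id="dbt_test",
        bash_command="cd /opt/dbt/feedback && dbt test",
    )

    ingest >> process >> validate >> transform >> test
```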
Quality Assurance: Don’t Trust AI Blindly
AI classification is probabilistic. You need validation:
```python
# src/validation.py
import pandas as pd


def validate_ai_results(df: pd.DataFrame) -> dict:
    """Run quality checks on AI-classified data.

    Thresholds are starting points; tune them to your own data.
    """
    checks = {}

    # Check 1: Few records failed processing (sentiment "unknown" marks failures)
    unknown_pct = (df["sentiment"] == "unknown").mean()
    checks["unknown_sentiment_rate"] = {
        "value": unknown_pct,
        "threshold": 0.05,
        "passed": unknown_pct < 0.05,
    }

    # Check 2: Sentiment score distribution is reasonable
    score_std = df["sentiment_score"].std()
    checks["sentiment_score_std"] = {
        "value": score_std,
        "threshold": 0.3,  # Should have meaningful variation
        "passed": score_std > 0.3,
    }

    # Check 3: Critical urgency rate is reasonable (1-10%)
    critical_pct = (df["urgency"] == "critical").mean()
    checks["critical_rate"] = {
        "value": critical_pct,
        "threshold": (0.01, 0.10),
        "passed": 0.01 <= critical_pct <= 0.10,
    }

    # Check 4: Topic distribution isn't dominated by "other"
    other_pct = (df["primary_topic"] == "other").mean()
    checks["other_topic_rate"] = {
        "value": other_pct,
        "threshold": 0.30,
        "passed": other_pct < 0.30,
    }

    all_passed = all(c["passed"] for c in checks.values())
    print(f"Quality checks: {'ALL PASSED' if all_passed else 'FAILURES DETECTED'}")
    for name, check in checks.items():
        status = "PASS" if check["passed"] else "FAIL"
        print(f"  [{status}] {name}: {check['value']:.3f} (threshold: {check['threshold']})")
    return checks
```
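Aggregate checks catch systematic drift, but not individual misclassifications. A common complement is to have someone review a small sample each run. Below is a minimal sketch that draws a reproducible sample stratified by urgency, so rare classes like "critical" always appear. `review_sample` is a hypothetical helper that works on a list of dicts such as `df.to_dict("records")`.

```python
import random
from collections import defaultdict


def review_sample(records, per_group: int = 5, key: str = "urgency", seed: int = 42) -> list:
    """Draw up to `per_group` records for each distinct value of `key`.

    Stratifying keeps rare classes (e.g. "critical") in every sample; the
    fixed seed makes the same input produce the same sample.
    """
    groups = defaultdict(list)
    for record in records:
        groups[record.get(key)].append(record)
    rng = random.Random(seed)
    sample = []
    for value in sorted(groups, key=str):  # stable group order, even with None keys
        members = groups[value]
        sample.extend(rng.sample(members, min(per_group, len(members))))
    return sample
```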
When NOT to Use AI in Pipelines
AI is the wrong tool when:
| Situation | Use Instead |
|---|---|
| Standardizing date formats | pd.to_datetime() or dbt macros |
| Joining tables | SQL JOIN |
| Deduplication on exact match | SQL DISTINCT or df.drop_duplicates() |
| Numeric calculations | SQL or pandas aggregations |
| Schema validation | Great Expectations, dbt tests |
| Type conversion | Cast operations |
The rule: if a deterministic function can handle it reliably, don’t use AI. AI should only handle tasks that require natural language understanding, classification of unstructured content, or extraction of information from free text.
Cost Optimization
Batch vs. Real-Time
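For pipelines processing 1,000+ records, always batch instead of classifying each record the moment it arrives. The savings come from two places. First, Anthropic’s Message Batches API processes requests asynchronously at 50% of standard per-token prices. Second, packing several feedback items into one prompt spreads the fixed instruction tokens across them. Note that process_batch above still sends one synchronous request per record: the thread pool makes it faster but doesn’t lower the per-record cost.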
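Below is a sketch of preparing the same per-record classification for the Message Batches API. It assumes the `messages.batches` interface of the anthropic Python SDK (`create`, `retrieve`, `results`); check the current SDK reference before relying on the field names. `build_batch_requests`, `submit_batch`, and `collect_batch_text` are hypothetical helpers. Each request reuses the prompt and model settings from classify_single.

```python
from typing import Optional


def build_batch_requests(texts, prompt_template: str,
                         model: str = "claude-sonnet-4-20250514",
                         max_tokens: int = 512) -> list[dict]:
    """One Message Batches request per record; custom_id carries the row position."""
    return [
        {
            # Results can come back in any order, so encode the row position here
            "custom_id": f"rec-{pos}",
            "params": {
                "model": model,
                "max_tokens": max_tokens,
                "temperature": 0.0,
                "messages": [{
                    "role": "user",
                    "content": prompt_template.format(feedback_text=text),
                }],
            },
        }
        for pos, text in enumerate(texts)
    ]


def submit_batch(requests: list[dict]) -> str:
    import anthropic  # imported here so building requests doesn't require the SDK
    client = anthropic.Anthropic()
    return client.messages.batches.create(requests=requests).id


def collect_batch_text(batch_id: str) -> Optional[dict]:
    """Return {custom_id: raw response text} once the batch has ended, else None."""
    import anthropic
    client = anthropic.Anthropic()
    if client.messages.batches.retrieve(batch_id).processing_status != "ended":
        return None  # still processing; poll again later
    return {
        entry.custom_id: entry.result.message.content[0].text
        for entry in client.messages.batches.results(batch_id)
        if entry.result.type == "succeeded"
    }
```

Batches complete asynchronously, so this fits the daily DAG better than an interactive path: submit in one task, poll `collect_batch_text` later, then parse each response with the same JSON handling as classify_single. Records missing from the returned dict (errored or expired) can be resubmitted or marked "unknown".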
Model Selection
| Task Complexity | Recommended Model | Cost per 1K records |
|---|---|---|
| Simple sentiment (pos/neg) | Claude Haiku | ~$0.10 |
| Multi-label classification | Claude Sonnet | ~$3.00 |
| Complex extraction + reasoning | Claude Opus | ~$15.00 |
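Start with Haiku, and upgrade to Sonnet or Opus only if accuracy is insufficient. These figures scale with prompt length, output length, and current per-token pricing (the Sonnet row matches the ~$30 per 10,000 records estimated above), so recompute them for your own prompt before choosing.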
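Deciding whether accuracy is "insufficient" requires a measurement. Below is a minimal sketch. Hand-label a sample of feedback (a few hundred records is a reasonable starting point), run each candidate model on it, and compare per-field agreement. `field_accuracy` is a hypothetical helper. Predictions and labels are lists of dicts in the same order, and a record whose label lacks a field is skipped for that field.

```python
import math


def field_accuracy(predictions, labels, fields=("sentiment", "urgency")) -> dict:
    """Share of labeled records where the model's value matches the human label, per field."""
    if len(predictions) != len(labels):
        raise ValueError("predictions and labels must be the same length")
    scores = {}
    for field in fields:
        pairs = [
            (pred.get(field), gold[field])
            for pred, gold in zip(predictions, labels)
            if field in gold
        ]
        # NaN when no record has a label for this field
        scores[field] = sum(p == g for p, g in pairs) / len(pairs) if pairs else math.nan
    return scores
```

Run it on the same labeled sample once with Haiku's outputs and once with Sonnet's. Upgrade only if the gap matters for the fields your dashboard reports.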
Caching
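Cache LLM responses for identical inputs. In our feedback pipeline, approximately 5-10% of records are duplicates or near-duplicates. A cache keyed on a hash of the exact text only saves the calls for exact duplicates. Near-duplicates that differ in whitespace, casing, or wording still reach the API.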
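```python
# Assumes classify_single and CLASSIFICATION_PROMPT from src/ai_processor.py are in scope
import hashlib
import json
from pathlib import Path

CACHE_DIR = Path("cache/classifications")
CACHE_DIR.mkdir(parents=True, exist_ok=True)


def classify_with_cache(feedback_text: str) -> dict:
    # Hash the prompt together with the text so editing the prompt invalidates old entries
    cache_key = hashlib.sha256(
        (CLASSIFICATION_PROMPT + str(feedback_text)).encode("utf-8")
    ).hexdigest()
    cache_file = CACHE_DIR / f"{cache_key}.json"
    if cache_file.exists():
        return json.loads(cache_file.read_text())
    result = classify_single(feedback_text)
    cache_file.write_text(json.dumps(result))
    return result
```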
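The cache key is a hash of the exact text, so "Great app!" and "great  app!" are cached separately. Within a single batch, a simpler option is to classify each distinct normalized text once and fan the result back out. Below is a minimal sketch. `normalize_for_cache` and `classify_unique` are hypothetical names, and `classify` can be classify_single or classify_with_cache. Lowercasing is a judgment call: it merges more copies but drops emphasis such as "TERRIBLE".

```python
def normalize_for_cache(text) -> str:
    """Lowercase and collapse whitespace so trivially different copies share one key."""
    return " ".join(str(text).lower().split())


def classify_unique(texts, classify):
    """Call `classify` once per distinct normalized text; return results in input order."""
    by_key = {}
    for text in texts:
        key = normalize_for_cache(text)
        if key not in by_key:
            # The first original (un-normalized) text in each group is what gets classified
            by_key[key] = classify(text)
    return [by_key[normalize_for_cache(text)] for text in texts]
```

This calls `classify` sequentially. For large batches, build a DataFrame of the distinct texts and run it through process_batch instead.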
The Bottom Line
AI-powered data pipelines aren’t about replacing your existing stack. They’re about inserting LLMs at the specific points where unstructured data needs to become structured data. The rest of your pipeline — ingestion, transformation, testing, orchestration — stays deterministic and reliable.
The pattern is simple:
- Ingest raw data (traditional tools)
- Use AI to classify/extract/enrich unstructured fields
- Validate AI output with quality checks
- Transform structured output with dbt/SQL
- Test everything
Follow this pattern, and much of the manual cleaning and classification work moves into an automated, validated pipeline step, leaving more time for actually using the data.