Data Engineering: Implementing Robust Data Validation Pipelines
Bad data is silent. It doesn't throw exceptions or crash your system — it just quietly corrupts your analytics, misleads your stakeholders, and eventually destroys trust in your entire data platform. If you've ever had a dashboard show negative revenue or a machine learning model trained on null values, you know exactly what I mean.
Data validation pipelines are how you catch these problems before they become disasters. And right now, interviewers are specifically looking for engineers who know how to build them properly. Let's get into it.
Why Data Validation Isn't Optional
Most data engineers focus on getting data *moving*. ETL pipelines, Spark jobs, Airflow DAGs — the plumbing. Validation feels like extra work until the day your source system starts sending malformed JSON and your entire warehouse silently fills with garbage.
Here's the reality: data quality issues cost organizations an estimated $12.9 million per year on average (Gartner). More practically, a single bad data event can take days to debug and roll back. Validation pipelines shift you from reactive firefighting to proactive quality control.
The three layers you need to care about are schema validation, data quality checks, and anomaly detection.
Layer 1: Schema Validation
Schema validation is your first line of defense. Before any transformation runs, you want to confirm the incoming data matches your contract.
Here's a simple example using Python with jsonschema for validating an incoming API payload:
    from jsonschema import FormatChecker, ValidationError, validate

    schema = {
        "type": "object",
        "properties": {
            "user_id": {"type": "integer"},
            "email": {"type": "string", "format": "email"},
            "signup_date": {"type": "string", "format": "date"},
            "plan": {"type": "string", "enum": ["free", "pro", "enterprise"]},
        },
        "required": ["user_id", "email", "signup_date"],
    }

    def validate_record(record: dict) -> bool:
        """Return True if the record matches the schema; log and return False otherwise."""
        try:
            # "format" keywords are only enforced when a format_checker is supplied.
            validate(instance=record, schema=schema, format_checker=FormatChecker())
            return True
        except ValidationError as e:
            print(f"Validation failed: {e.message}")
            return False

    # Test it
    record = {"user_id": 42, "email": "jane@example.com", "signup_date": "2024-01-15", "plan": "pro"}
    validate_record(record)  # True

    bad_record = {"user_id": "not-an-int", "email": "jane@example.com", "signup_date": "2024-01-15"}
    validate_record(bad_record)  # False; logs the error

For columnar data in Pandas, pandera is a cleaner choice:
    import pandas as pd
    import pandera as pa

    schema = pa.DataFrameSchema({
        "user_id": pa.Column(int, nullable=False),
        "email": pa.Column(str, pa.Check.str_matches(r".+@.+\..+")),
        "revenue": pa.Column(float, pa.Check.greater_than_or_equal_to(0)),
        "country_code": pa.Column(str, pa.Check.isin(["US", "UK", "CA", "AU"])),
    })

    # df is assumed to be a pandas DataFrame loaded earlier in the pipeline.
    try:
        validated_df = schema.validate(df)
    except pa.errors.SchemaError as e:
        print(f"Schema validation failed: {e}")
        raise  # re-raise so the pipeline stops instead of continuing with bad data

Fail fast, fail loudly. If schema validation fails, stop the pipeline and alert — don't let bad data flow downstream.
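Here's one way that "stop and alert" rule could look in plain Python. This is a minimal sketch, not a prescribed pattern: `run_gate`, `ValidationGateError`, and the `has_int_user_id` stand-in validator are hypothetical names invented for illustration, and the alert hook is just a list append where a real pipeline would call its alerting system.

```python
from typing import Callable, Iterable


class ValidationGateError(RuntimeError):
    """Raised to halt the pipeline when a record fails validation."""


def run_gate(
    records: Iterable[dict],
    is_valid: Callable[[dict], bool],
    alert: Callable[[str], None],
) -> list[dict]:
    """Return the records only if every one passes; otherwise alert and raise."""
    passed = []
    for index, record in enumerate(records):
        if not is_valid(record):
            message = f"Record {index} failed validation: {record!r}"
            alert(message)                      # fail loudly
            raise ValidationGateError(message)  # fail fast: nothing flows downstream
        passed.append(record)
    return passed


# Hypothetical stand-in validator: user_id must be an int.
def has_int_user_id(record: dict) -> bool:
    return isinstance(record.get("user_id"), int)


alerts: list[str] = []  # stand-in for a real alerting hook

# A clean batch passes through unchanged.
good = run_gate([{"user_id": 1}, {"user_id": 2}], has_int_user_id, alerts.append)

# A batch with one bad record raises before anything is returned.
try:
    run_gate([{"user_id": 1}, {"user_id": "not-an-int"}], has_int_user_id, alerts.append)
    halted = False
except ValidationGateError:
    halted = True
```

The gate returns nothing on failure, so downstream steps can't accidentally consume a half-validated batch.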
Layer 2: Data Quality Checks with dbt and Great Expectations
Once data is in your warehouse, you need ongoing quality checks baked into your transformation layer. This is where dbt and Great Expectations shine.
dbt Tests
If you're using dbt, you already have a powerful testing framework built in. Generic tests are quick wins:
    # models/schema.yml
    models:
      - name: orders
        columns:
          - name: order_id
            tests:
              - unique
              - not_null
          - name: status
            tests:
              - accepted_values:
                  values: ['placed', 'shipped', 'delivered', 'cancelled']
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: ref('customers')
                  field: customer_id

For more complex checks, write custom singular tests:

    -- tests/assert_revenue_not_negative.sql
    SELECT
        order_id,
        revenue
    FROM {{ ref('orders') }}
    WHERE revenue < 0

If this query returns any rows, dbt marks the test as failed. Simple and effective.
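If the "any returned row is a failure" convention feels backwards, it can help to emulate it outside dbt. The sketch below uses Python's built-in sqlite3 with a made-up in-memory `orders` table; `singular_test_passes` is a hypothetical helper, not part of dbt, and the Jinja `ref()` call is replaced by a plain table name.

```python
import sqlite3

# In-memory database with a few hypothetical orders; order 3 has negative revenue.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (order_id INTEGER, revenue REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, 120.0), (2, 75.5), (3, -40.0)],
)


def singular_test_passes(connection: sqlite3.Connection, query: str):
    """Mimic the singular-test convention: the test passes only if the query returns no rows."""
    failing_rows = connection.execute(query).fetchall()
    return len(failing_rows) == 0, failing_rows


passed, failing_rows = singular_test_passes(
    conn,
    "SELECT order_id, revenue FROM orders WHERE revenue < 0",
)
print(passed)        # False
print(failing_rows)  # [(3, -40.0)]
```

The query selects the offending rows themselves, so a failed test hands you the evidence you need to start debugging.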
Great Expectations
Great Expectations is more powerful when you need rich documentation and complex statistical checks. Here's a basic expectation suite:
    import great_expectations as gx

    # Note: context.sources is the fluent data source API from the 0.x releases
    # (roughly 0.16 to 0.18). GX 1.0 restructured this interface, so check the
    # docs for the version you have installed.
    context = gx.get_context()

    # Define expectations
    validator = context.sources.pandas_default.read_csv("orders.csv")

    validator.expect_column_values_to_not_be_null("order_id")
    validator.expect_column_values_to_be_unique("order_id")
    validator.expect_column_values_to_be_between("revenue", min_value=0, max_value=1_000_000)
    validator.expect_column_median_to_be_between("revenue", min_value=50, max_value=500)
    validator.expect_column_proportion_of_unique_values_to_be_between(
        "customer_id", min_value=0.3, max_value=1.0
    )

    results = validator.validate()
    print(results["success"])  # True/False

The expect_column_median_to_be_between check is particularly useful — it catches the scenario where data looks structurally valid but something has gone statistically wrong (like revenue suddenly dropping 80% because of a source system bug).
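To see why the median check catches what a per-value range check misses, here's a plain-Python sketch. It is not the Great Expectations API: `values_between` and `median_between` are hypothetical stand-ins, and the revenue figures are invented, with the "buggy" day simply set to 20% of the normal one to mirror the 80% drop described above.

```python
from statistics import median


def values_between(values, min_value, max_value):
    """Per-value range check: every value must fall inside the bounds."""
    return all(min_value <= v <= max_value for v in values)


def median_between(values, min_value, max_value):
    """Distribution-level check: the median must fall inside the bounds."""
    return min_value <= median(values) <= max_value


normal_day = [120.0, 95.0, 210.0, 180.0, 150.0]
buggy_day = [24.0, 19.0, 42.0, 36.0, 30.0]  # each value is 20% of normal_day

# Every individual value still looks plausible on both days...
range_ok_normal = values_between(normal_day, 0, 1_000_000)
range_ok_buggy = values_between(buggy_day, 0, 1_000_000)

# ...but only the normal day has a median inside the expected band.
median_ok_normal = median_between(normal_day, 50, 500)
median_ok_buggy = median_between(buggy_day, 50, 500)
```

Both days pass the range check, but only the normal day passes the median check. That gap is exactly where "structurally valid, statistically wrong" data hides.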
Layer 3: Anomaly Detection
Statistical checks catch issues that rule-based validation misses. A column can be non-null, correctly typed, and within range — but still be completely wrong.
A lightweight approach using Z-scores to flag anomalies in daily metrics:
    import pandas as pd
    from scipy import stats

    def detect_anomalies(df: pd.DataFrame, column: str, threshold: float = 3.0) -> pd.DataFrame:
        """Flag rows where values are more than threshold standard deviations from the mean."""
        df = df.copy()
        # nan_policy="omit" keeps the result the same length as df, so nulls cannot misalign rows.
        z_scores = stats.zscore(df[column], nan_policy="omit")
        df["z_score"] = abs(z_scores)
        df["is_anomaly"] = df["z_score"] > threshold
        return df[df["is_anomaly"]]

    # Example: check daily order counts
    daily_orders = pd.DataFrame({
        "date": pd.date_range("2024-01-01", periods=30),
        # 3 is suspicious; the list is truncated here and needs all 30 values to run
        "order_count": [150, 148, 155, 160, 145, 152, 149, 3, 151, 147, ...]
    })

    anomalies = detect_anomalies(daily_orders, "order_count")
    if not anomalies.empty:
        print(f"Anomalies detected on: {anomalies['date'].tolist()}")
        # Trigger alert

For production systems, consider more sophisticated approaches like Facebook Prophet for time-series anomaly detection or Isolation Forest for multivariate anomalies. But Z-scores get you surprisingly far and are easy to explain to stakeholders.
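One caveat before you rely on a fixed threshold: with a population standard deviation (the default in scipy.stats.zscore), no value in a sample of n points can have |z| above sqrt(n - 1). That means a threshold of 3.0 can never fire on ten or fewer points, however extreme the outlier. Here's a stdlib-only sketch of that effect; the helper names are hypothetical and the order counts are made up for illustration.

```python
from math import sqrt
from statistics import fmean, pstdev


def z_scores(values):
    """Population z-scores (ddof=0), matching the scipy.stats.zscore default."""
    mean = fmean(values)
    std = pstdev(values)
    if std == 0:
        return [0.0] * len(values)  # constant series: nothing stands out
    return [(v - mean) / std for v in values]


def flag_anomalies(values, threshold=3.0):
    """Return the indexes whose |z| exceeds the threshold."""
    return [i for i, z in enumerate(z_scores(values)) if abs(z) > threshold]


def max_possible_z(n):
    """Upper bound on |z| for any single point in a sample of size n (ddof=0)."""
    return sqrt(n - 1)


# Hypothetical daily order counts; index 7 is the suspicious day.
order_counts = [150, 148, 155, 160, 145, 152, 149, 3, 151, 147,
                153, 150, 146, 154, 149, 151]

flagged_full = flag_anomalies(order_counts)         # 16 points: the outlier is flagged
flagged_short = flag_anomalies(order_counts[:10])   # 10 points: bound is 3.0, nothing flagged
```

If you're checking short windows, either lower the threshold or use a longer history. Otherwise the check is structurally unable to fail.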
Practical Tips for Production Pipelines
Don't validate everything equally. Tier your checks. Critical business metrics (revenue, user counts) get strict checks and hard failures. Enrichment data gets soft warnings. Not every failed check should stop your pipeline.
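Tiering can be as simple as attaching a severity to each check. The sketch below is one possible shape, not a standard: `Check`, `CriticalCheckFailed`, `run_checks`, and the two example checks are all hypothetical names, and the rows are invented.

```python
from dataclasses import dataclass
from typing import Callable


@dataclass(frozen=True)
class Check:
    name: str
    severity: str  # "error" halts the pipeline, "warn" only reports
    passes: Callable[[list], bool]


class CriticalCheckFailed(RuntimeError):
    """Raised when an error-severity check fails."""


def run_checks(rows: list, checks: list) -> list:
    """Run every check; raise on the first hard failure, collect names of soft failures."""
    soft_failures = []
    for check in checks:
        if check.passes(rows):
            continue
        if check.severity == "error":
            raise CriticalCheckFailed(check.name)
        soft_failures.append(check.name)
    return soft_failures


checks = [
    # Critical business metric: hard failure.
    Check("revenue_not_negative", "error", lambda rows: all(r["revenue"] >= 0 for r in rows)),
    # Enrichment data: soft warning.
    Check("region_present", "warn", lambda rows: all(r.get("region") for r in rows)),
]

rows = [{"revenue": 10.0, "region": "EMEA"}, {"revenue": 25.0, "region": None}]
soft_failures = run_checks(rows, checks)  # the missing region is reported, not fatal

try:
    run_checks([{"revenue": -5.0, "region": "EMEA"}], checks)
    halted = False
except CriticalCheckFailed:
    halted = True
```

Keeping severity next to the check definition makes the tiering decision reviewable in one place, rather than scattered across try/except blocks.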
Log validation results as data. Write your validation outcomes to a table, not just logs. This lets you track data quality trends over time and spot degrading sources before they become critical.
    -- validation_results table
    INSERT INTO data_quality.validation_log (
        run_id, model_name, check_name, status, failed_rows, run_timestamp
    ) VALUES (
        '{{ run_id }}', 'orders', 'revenue_not_negative', 'PASSED', 0, CURRENT_TIMESTAMP
    );

Validate at every stage boundary. Source → Staging, Staging → Intermediate, Intermediate → Mart. Each handoff is a chance for corruption. Each boundary should have a gate.
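The last two tips combine naturally: a gate at each boundary that writes its outcome to a table before deciding whether to let data through. Here's a rough sketch using Python's built-in sqlite3 as a stand-in warehouse. The `gate` helper, the boundary labels, the table layout, and the sample rows are assumptions made for illustration.

```python
import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE validation_log (
        run_id TEXT, boundary TEXT, check_name TEXT,
        status TEXT, failed_rows INTEGER, run_timestamp TEXT
    )
    """
)


def gate(conn, run_id, boundary, check_name, rows, is_bad):
    """Log the check outcome first, then raise if any row failed."""
    failed = sum(1 for row in rows if is_bad(row))
    status = "PASSED" if failed == 0 else "FAILED"
    conn.execute(
        "INSERT INTO validation_log VALUES (?, ?, ?, ?, ?, ?)",
        (run_id, boundary, check_name, status, failed,
         datetime.now(timezone.utc).isoformat()),
    )
    if failed:
        raise ValueError(f"{boundary}: {check_name} failed on {failed} row(s)")
    return rows


# Hypothetical rows; the second one has negative revenue.
source_rows = [{"order_id": 1, "revenue": 120.0}, {"order_id": 2, "revenue": -5.0}]

# First boundary passes, second boundary blocks the batch.
staged = gate(conn, "run-001", "source->staging", "order_id_not_null",
              source_rows, lambda r: r["order_id"] is None)
try:
    gate(conn, "run-001", "staging->mart", "revenue_not_negative",
         staged, lambda r: r["revenue"] < 0)
    blocked = False
except ValueError:
    blocked = True

# Both outcomes are now queryable as data, including the failure.
log = conn.execute(
    "SELECT boundary, status, failed_rows FROM validation_log ORDER BY rowid"
).fetchall()
```

Logging before raising matters. If the gate raised first, the failed runs you most want to trend would never reach the table.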
Make failures observable. Wire your validation failures into your alerting system (PagerDuty, Slack, whatever you use). A validation check nobody sees is the same as no validation check.
Actionable Next Steps
Start with dbt's generic tests. not_null, unique, and accepted_values take 10 minutes to add and catch a huge percentage of common issues.

Data quality is a feature, not an afterthought. Build the validation layer now, before your stakeholders lose confidence in your numbers, because once that trust is gone, it's very hard to get back.