Back to blog
data engineeringdata qualitydata validationdbtgreat expectations

Data Engineering: Implementing Robust Data Validation Pipelines

Bad data is silent. It doesn't throw exceptions or crash your system — it just quietly corrupts your analytics, misleads your stakeholders, and eventually destroys trust in your entire data platform.…

Data Engineering: Implementing Robust Data Validation Pipelines

Bad data is silent. It doesn't throw exceptions or crash your system — it just quietly corrupts your analytics, misleads your stakeholders, and eventually destroys trust in your entire data platform. If you've ever had a dashboard show negative revenue or a machine learning model trained on null values, you know exactly what I mean.

Data validation pipelines are how you catch these problems before they become disasters. And right now, interviewers are specifically looking for engineers who know how to build them properly. Let's get into it.

Why Data Validation Isn't Optional

Most data engineers focus on getting data *moving*. ETL pipelines, Spark jobs, Airflow DAGs — the plumbing. Validation feels like extra work until the day your source system starts sending malformed JSON and your entire warehouse silently fills with garbage.

Here's the reality: data quality issues cost organizations an estimated $12.9 million per year on average (Gartner). More practically, a single bad data event can take days to debug and roll back. Validation pipelines shift you from reactive firefighting to proactive quality control.

The three layers you need to care about:

  • Schema validation — Is the structure what we expect?
  • Data type and constraint checks — Are the values valid?
  • Anomaly detection — Does the data *look* right statistically?
  • Layer 1: Schema Validation

    Schema validation is your first line of defense. Before any transformation runs, you want to confirm the incoming data matches your contract.

    Here's a simple example using Python with jsonschema for validating an incoming API payload:

    from jsonschema import validate, ValidationError

    schema = { "type": "object", "properties": { "user_id": {"type": "integer"}, "email": {"type": "string", "format": "email"}, "signup_date": {"type": "string", "format": "date"}, "plan": {"type": "string", "enum": ["free", "pro", "enterprise"]} }, "required": ["user_id", "email", "signup_date"] }

    def validate_record(record: dict) -> bool: try: validate(instance=record, schema=schema) return True except ValidationError as e: print(f"Validation failed: {e.message}") return False

    Test it

    record = {"user_id": 42, "email": "jane@example.com", "signup_date": "2024-01-15", "plan": "pro"} validate_record(record) # True

    bad_record = {"user_id": "not-an-int", "email": "jane@example.com", "signup_date": "2024-01-15"} validate_record(bad_record) # False — logs the error

    For columnar data in Pandas, pandera is a cleaner choice:

    import pandera as pa
    import pandas as pd

    schema = pa.DataFrameSchema({ "user_id": pa.Column(int, nullable=False), "email": pa.Column(str, pa.Check.str_matches(r".+@.+\..+")), "revenue": pa.Column(float, pa.Check.greater_than_or_equal_to(0)), "country_code": pa.Column(str, pa.Check.isin(["US", "UK", "CA", "AU"])) })

    try: validated_df = schema.validate(df) except pa.errors.SchemaError as e: print(f"Schema validation failed: {e}")

    Fail fast, fail loudly. If schema validation fails, stop the pipeline and alert — don't let bad data flow downstream.

    Layer 2: Data Quality Checks with dbt and Great Expectations

    Once data is in your warehouse, you need ongoing quality checks baked into your transformation layer. This is where dbt and Great Expectations shine.

    dbt Tests

    If you're using dbt, you already have a powerful testing framework built in. Generic tests are quick wins:

    # models/schema.yml
    models:
      - name: orders
        columns:
          - name: order_id
            tests:
              - unique
              - not_null
          - name: status
            tests:
              - accepted_values:
                  values: ['placed', 'shipped', 'delivered', 'cancelled']
          - name: customer_id
            tests:
              - not_null
              - relationships:
                  to: ref('customers')
                  field: customer_id

    For more complex checks, write custom singular tests:

    -- tests/assert_revenue_not_negative.sql
    SELECT
        order_id,
        revenue
    FROM {{ ref('orders') }}
    WHERE revenue < 0

    If this query returns any rows, dbt marks the test as failed. Simple and effective.

    Great Expectations

    Great Expectations is more powerful when you need rich documentation and complex statistical checks. Here's a basic expectation suite:

    import great_expectations as gx

    context = gx.get_context()

    Define expectations

    validator = context.sources.pandas_default.read_csv("orders.csv")

    validator.expect_column_values_to_not_be_null("order_id") validator.expect_column_values_to_be_unique("order_id") validator.expect_column_values_to_be_between("revenue", min_value=0, max_value=1_000_000) validator.expect_column_median_to_be_between("revenue", min_value=50, max_value=500) validator.expect_column_proportion_of_unique_values_to_be_between( "customer_id", min_value=0.3, max_value=1.0 )

    results = validator.validate() print(results["success"]) # True/False

    The expect_column_median_to_be_between check is particularly useful — it catches the scenario where data looks structurally valid but something has gone statistically wrong (like revenue suddenly dropping 80% because of a source system bug).

    Layer 3: Anomaly Detection

    Statistical checks catch issues that rule-based validation misses. A column can be non-null, correctly typed, and within range — but still be completely wrong.

    A lightweight approach using Z-scores to flag anomalies in daily metrics:

    import pandas as pd
    from scipy import stats

    def detect_anomalies(df: pd.DataFrame, column: str, threshold: float = 3.0) -> pd.DataFrame: """Flag rows where values are more than threshold standard deviations from the mean.""" z_scores = stats.zscore(df[column].dropna()) df = df.copy() df["z_score"] = abs(z_scores) df["is_anomaly"] = df["z_score"] > threshold return df[df["is_anomaly"]]

    Example: check daily order counts

    daily_orders = pd.DataFrame({ "date": pd.date_range("2024-01-01", periods=30), "order_count": [150, 148, 155, 160, 145, 152, 149, 3, 151, 147, ...] # 3 is suspicious })

    anomalies = detect_anomalies(daily_orders, "order_count") if not anomalies.empty: print(f"Anomalies detected on: {anomalies['date'].tolist()}") # Trigger alert

    For production systems, consider more sophisticated approaches like Facebook Prophet for time-series anomaly detection or Isolation Forest for multivariate anomalies. But Z-scores get you surprisingly far and are easy to explain to stakeholders.

    Practical Tips for Production Pipelines

    Don't validate everything equally. Tier your checks. Critical business metrics (revenue, user counts) get strict checks and hard failures. Enrichment data gets soft warnings. Not every failed check should stop your pipeline.

    Log validation results as data. Write your validation outcomes to a table, not just logs. This lets you track data quality trends over time and spot degrading sources before they become critical.

    -- validation_results table
    INSERT INTO data_quality.validation_log (
        run_id, model_name, check_name, status, failed_rows, run_timestamp
    ) VALUES (
        '{{ run_id }}', 'orders', 'revenue_not_negative', 'PASSED', 0, CURRENT_TIMESTAMP
    );

    Validate at every stage boundary. Source → Staging, Staging → Intermediate, Intermediate → Mart. Each handoff is a chance for corruption. Each boundary should have a gate.

    Make failures observable. Wire your validation failures into your alerting system (PagerDuty, Slack, whatever you use). A validation check nobody sees is the same as no validation check.

    Actionable Next Steps

  • Audit your current pipelines — identify where data enters your system with zero validation. That's your highest risk.
  • Add dbt generic tests todaynot_null, unique, and accepted_values take 10 minutes to add and catch a huge percentage of common issues.
  • Pick one critical metric and add a statistical check — implement a Z-score check on your most important daily KPI. You'll probably catch something within a week.
  • Set up a validation log table — start tracking your check results as data so you can build a data quality dashboard.
  • Read the Great Expectations docs — specifically the section on Data Docs, which auto-generates quality reports your stakeholders can actually read.
  • Data quality is a feature, not an afterthought. Build the validation layer now, before your stakeholders lose confidence in your numbers — because once that trust is gone, it's very hard to get back.