
Building Data Pipelines That Don’t Break at 3 AM

Every data engineer has the story. Your phone buzzes at 3:14 AM. The nightly ETL job failed. Upstream changed a column name, or the API started returning 429s, or a single malformed record caused the entire batch to abort. You open your laptop in bed, squinting at stack traces, trying to remember what this pipeline even does.

HARBOR SOFTWARE · Engineering Insights

This is not an inevitable cost of data engineering. It is a design failure. Pipelines break at 3 AM because they were built without resilience as a first-class concern. After building and maintaining data pipelines processing north of 50 million records daily across multiple clients, I have a strong opinion: the reliability of a pipeline matters more than its throughput. A pipeline that processes 10 million records but never fails beats one that processes 100 million but crashes weekly.

Here is how we build pipelines at Harbor Software that let us sleep through the night.

The Anatomy of a Fragile Pipeline

Before we talk about solutions, let us dissect why pipelines break. The failure modes fall into predictable categories, and recognizing them is the first step toward building defenses against each one:

  • Schema drift: Upstream data sources change column names, types, or structures without notice. Your SELECT customer_id FROM orders suddenly throws a “column not found” error because someone renamed it to cust_id. This is the most common failure mode in pipelines that consume data from systems maintained by other teams. It happens more often than anyone wants to admit — roughly once per quarter per upstream source in our experience.
  • Volume spikes: Tuesday’s data is 10x larger than Monday’s. Your pipeline, sized for Monday, runs out of memory or exceeds its timeout window. Black Friday, end-of-quarter pushes, marketing campaign launches — all create volume spikes that expose capacity assumptions you did not know you had.
  • Partial failures: 99.8% of records process fine. The 0.2% that fail take down the entire batch because there is no isolation between records. A single null value in a field your code assumed would always be populated throws an exception that propagates up and kills the whole run.
  • Dependency failures: The API you call is down, the database connection pool is exhausted, or the S3 bucket permissions changed. External dependencies are inherently unreliable, and any pipeline that treats them as always-available will fail when they are not.
  • Silent corruption: The pipeline completes successfully but produces wrong results. No error, no alert. You discover it three weeks later when a quarterly report looks off. This is the most dangerous failure mode because it erodes trust in the entire data platform.

Each of these requires a different mitigation strategy. Let us work through them systematically.

Schema Validation at the Boundary

The single most impactful change you can make to any pipeline is to validate schemas at every ingestion point. Not just “is this valid JSON” but “does this match the contract I expect.” The goal is to catch schema drift at the boundary of your system, before bad data propagates through transformation logic and produces confusing errors downstream.

We use Pydantic for Python pipelines and JSON Schema for language-agnostic validation. The pattern looks like this:

from datetime import datetime, timezone

from pydantic import BaseModel, ValidationError, validator

# Note: @validator is the Pydantic v1 decorator. It still works in Pydantic v2
# (with a deprecation warning), where field_validator replaces it.

class OrderRecord(BaseModel):
    order_id: str
    customer_id: str
    total_amount: float
    currency: str
    created_at: datetime
    line_items: list
    status: str

    @validator('total_amount')
    def amount_must_not_be_negative(cls, v):
        if v < 0:
            raise ValueError('total_amount cannot be negative')
        return v

    @validator('currency')
    def currency_must_be_valid(cls, v):
        valid = {'USD', 'EUR', 'GBP', 'PKR'}
        if v not in valid:
            raise ValueError(f'Unknown currency: {v}')
        return v

def ingest_orders(raw_records: list[dict]) -> tuple[list[OrderRecord], list[dict]]:
    """Split raw records into validated models and rejected entries for the DLQ."""
    valid = []
    rejected = []
    for record in raw_records:
        try:
            valid.append(OrderRecord(**record))
        except (ValidationError, TypeError) as e:
            # ValidationError: contract violation. TypeError: record is not a mapping.
            rejected.append({
                'record': record,
                'error': str(e),
                'timestamp': datetime.now(timezone.utc).isoformat()
            })
    return valid, rejected

The critical design choice here: validation produces two streams, not an exception. Valid records continue processing. Invalid records go to a dead-letter queue (DLQ) for inspection. The pipeline never stops because of bad data. It quarantines the bad records and keeps moving.

We typically set alerts when the rejection rate exceeds a threshold. If more than 2% of records in a batch are rejected, something systemic changed upstream and a human should investigate. Below that threshold, it is normal noise — data is always messy, and a handful of malformed records in every batch is expected. The important thing is that the pipeline does not crash because of them.
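The threshold check described above can be sketched in a few lines. This is a minimal illustration, not our production alerting code: the function names and the `threshold` parameter are hypothetical, and the 2% default simply mirrors the figure in the text.

```python
def rejection_rate(valid_count: int, rejected_count: int) -> float:
    """Fraction of a batch that failed validation (0.0 for an empty batch)."""
    total = valid_count + rejected_count
    return rejected_count / total if total else 0.0


def should_alert(valid_count: int, rejected_count: int, threshold: float = 0.02) -> bool:
    """True when the rejection rate is high enough to suggest a systemic upstream change."""
    return rejection_rate(valid_count, rejected_count) > threshold
```

The counts would come from the lengths of the two lists returned by the ingestion step, so the check costs nothing extra per batch.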

One subtlety worth noting: the DLQ itself needs monitoring. We have seen cases where rejected records accumulated for weeks without anyone looking at them. Eventually, someone realized that a critical data source had been silently failing for a month. Set up a daily digest that reports DLQ depth by source, and escalate if the queue grows beyond a configurable threshold.
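A daily digest of this kind might look roughly like the sketch below. It assumes each DLQ entry carries a `source` field, which the ingestion example above does not add; both that field and the `max_depth` default are illustrative assumptions.

```python
from collections import Counter


def dlq_depth_by_source(dlq_entries: list[dict]) -> Counter:
    """Count pending rejected records per upstream source."""
    return Counter(entry.get('source', 'unknown') for entry in dlq_entries)


def sources_to_escalate(depths: Counter, max_depth: int = 1000) -> list[str]:
    """Sources whose DLQ depth exceeds the configured threshold."""
    return sorted(source for source, depth in depths.items() if depth > max_depth)
```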

Idempotency: The Foundation of Reliability

Every operation in your pipeline should be safe to retry. This sounds obvious, but most pipelines violate it in subtle ways. Consider a pipeline that reads 10,000 records from an API, transforms them, and inserts them into a database. If the insert step fails halfway through, you have inserted 5,000 records. Retrying from the beginning will duplicate those 5,000 records. Now you have corrupt data and no error to show for it.

The fix is to make every write operation idempotent. For database inserts, use UPSERT (or INSERT ... ON CONFLICT DO UPDATE in PostgreSQL). For file writes, use deterministic filenames based on the data content, not timestamps. For API calls, use idempotency keys that the receiving system can deduplicate against.

-- PostgreSQL upsert pattern for idempotent writes
INSERT INTO processed_orders (
    order_id, customer_id, total_amount, processed_at
) VALUES (
    %(order_id)s, %(customer_id)s, %(total_amount)s, NOW()
)
ON CONFLICT (order_id) DO UPDATE SET
    customer_id = EXCLUDED.customer_id,
    total_amount = EXCLUDED.total_amount,
    processed_at = NOW();

With idempotent writes, you can retry any step freely. Failed at step 3? Restart from step 1. The already-inserted records will be updated in place rather than duplicated (the business columns are unchanged if the data has not changed; only processed_at is refreshed), and the pipeline completes. This is enormously liberating for operations: the recovery procedure for any failure is simply “run it again.”
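To see the retry-safety in action without a PostgreSQL instance, here is a small sketch using Python's built-in sqlite3 module as a stand-in. SQLite (3.24 and later) supports a similar ON CONFLICT upsert; the table layout and the `load_batch` helper are simplified assumptions for illustration.

```python
import sqlite3

UPSERT = """
INSERT INTO processed_orders (order_id, customer_id, total_amount)
VALUES (:order_id, :customer_id, :total_amount)
ON CONFLICT (order_id) DO UPDATE SET
    customer_id = excluded.customer_id,
    total_amount = excluded.total_amount
"""


def load_batch(conn: sqlite3.Connection, records: list[dict]) -> None:
    """Idempotent load: re-running with the same records leaves one row per order_id."""
    with conn:  # commit on success, roll back on error
        conn.executemany(UPSERT, records)


conn = sqlite3.connect(':memory:')
conn.execute(
    "CREATE TABLE processed_orders ("
    "order_id TEXT PRIMARY KEY, customer_id TEXT, total_amount REAL)"
)
batch = [
    {'order_id': 'o-1', 'customer_id': 'c-1', 'total_amount': 10.0},
    {'order_id': 'o-2', 'customer_id': 'c-2', 'total_amount': 25.5},
]
load_batch(conn, batch)
load_batch(conn, batch)  # simulated retry of the same batch
row_count = conn.execute("SELECT COUNT(*) FROM processed_orders").fetchone()[0]
```

After the simulated retry the table still holds one row per order, which is the property that makes "run it again" a safe recovery procedure.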

Idempotency extends beyond database writes. Consider side effects like sending emails or calling external APIs. If your pipeline sends a notification during processing, a retry will send it again. The fix is to track which notifications have been sent (using the record ID as a deduplication key) and skip already-sent notifications on retry. Alternatively, separate the notification logic into a downstream consumer that processes events from a queue — this naturally handles deduplication through message acknowledgment.
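The deduplication-key approach can be sketched as follows. The `NotificationLog` class is hypothetical, and it keeps sent IDs in an in-memory set purely for illustration; a real pipeline would need a durable store so the record of sent notifications survives a restart.

```python
from typing import Callable


class NotificationLog:
    """Tracks which record IDs have already triggered a notification."""

    def __init__(self) -> None:
        self._sent: set[str] = set()  # assumption: replace with a durable store in production

    def send_once(self, record_id: str, send: Callable[[str], None]) -> bool:
        """Call send(record_id) unless it was already sent. Returns True if sent now."""
        if record_id in self._sent:
            return False
        send(record_id)
        # Marking after a successful send means a crash between these two lines
        # can still cause one duplicate (at-least-once delivery).
        self._sent.add(record_id)
        return True
```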

Chunked Processing with Checkpoints

Processing 10 million records in a single batch is asking for trouble. Memory spikes, timeout limits, and the blast radius of a failure are all proportional to batch size. If your pipeline processes 10 million records as one batch and fails at record 9.5 million, you have wasted the work on 9.5 million records and have to start over.

Instead, chunk your data into manageable pieces and checkpoint after each chunk:

import json
import logging
from datetime import datetime, timezone
from pathlib import Path

logger = logging.getLogger(__name__)

CHECKPOINT_FILE = Path('/tmp/pipeline_checkpoint.json')
CHUNK_SIZE = 5000

# fetch_records, transform_batch, and load_batch are the pipeline's own
# source, transform, and sink functions, defined elsewhere.

def load_checkpoint() -> dict:
    if CHECKPOINT_FILE.exists():
        return json.loads(CHECKPOINT_FILE.read_text())
    return {'last_offset': 0, 'chunks_processed': 0}

def save_checkpoint(offset: int, chunks: int):
    # Write to a temp file and rename it, so a crash mid-write
    # cannot leave a truncated checkpoint behind.
    tmp = CHECKPOINT_FILE.with_suffix('.tmp')
    tmp.write_text(json.dumps({
        'last_offset': offset,
        'chunks_processed': chunks,
        'updated_at': datetime.now(timezone.utc).isoformat()
    }))
    tmp.replace(CHECKPOINT_FILE)

def process_pipeline():
    checkpoint = load_checkpoint()
    offset = checkpoint['last_offset']
    chunks_done = checkpoint['chunks_processed']

    while True:
        records = fetch_records(offset=offset, limit=CHUNK_SIZE)
        if not records:
            break

        transformed = transform_batch(records)
        load_batch(transformed)  # idempotent upsert

        offset += len(records)
        chunks_done += 1
        save_checkpoint(offset, chunks_done)

        logger.info(
            f"Chunk {chunks_done}: processed {len(records)} records, "
            f"offset now {offset}"
        )

    # Clean up on success; the file is absent if there was nothing to process
    CHECKPOINT_FILE.unlink(missing_ok=True)
    logger.info(f"Pipeline complete: {chunks_done} chunks, {offset} total records")

If the pipeline crashes after processing 15 chunks, it restarts from chunk 16 instead of chunk 1. For a 10-million-record pipeline with 5,000-record chunks, the maximum wasted work on failure is 5,000 records — about 0.05% of total work. Compare that to losing all progress on a monolithic batch run.

For production systems, we store checkpoints in Redis rather than local files. This makes them accessible across worker instances and lets them survive container restarts. The Redis checkpoint also enables a nice operational feature: you can inspect the checkpoint to see exactly where a pipeline is in its current run without looking at logs.
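A Redis-backed checkpoint might be structured like the sketch below. The `CheckpointStore` class and the key name are illustrative assumptions. The store only relies on `get`, `set`, and `delete`, which redis-py's client provides, so the example runs here against a small in-memory stand-in instead of a live server.

```python
import json
from datetime import datetime, timezone


class CheckpointStore:
    """Keeps pipeline progress under a single key in a Redis-like client."""

    def __init__(self, client, key: str) -> None:
        self.client = client
        self.key = key

    def load(self) -> dict:
        raw = self.client.get(self.key)
        if raw is None:
            return {'last_offset': 0, 'chunks_processed': 0}
        return json.loads(raw)  # json.loads accepts the bytes redis-py returns

    def save(self, offset: int, chunks: int) -> None:
        self.client.set(self.key, json.dumps({
            'last_offset': offset,
            'chunks_processed': chunks,
            'updated_at': datetime.now(timezone.utc).isoformat(),
        }))

    def clear(self) -> None:
        self.client.delete(self.key)


class InMemoryClient:
    """Stand-in exposing the get/set/delete subset used above."""

    def __init__(self) -> None:
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value):
        self._data[key] = value

    def delete(self, key):
        self._data.pop(key, None)


store = CheckpointStore(InMemoryClient(), 'pipeline:orders:checkpoint')
```

In a deployment, the stand-in would be swapped for a real Redis client, and operators could read the same key to see how far a run has progressed.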

Choosing the right chunk size is important. Too small (100 records) and the overhead of checkpointing and committing dominates processing time. Too large (1 million records) and you lose the resilience benefits. We typically start at 5,000 records and tune based on the ratio of processing time to checkpoint overhead. The goal is chunks that take 10-30 seconds to process — long enough to amortize overhead, short enough to limit wasted work on failure.
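One way to turn that tuning rule into a starting number is to measure a trial chunk and scale it toward the target duration. The helper below is a hypothetical sketch: the 20-second target sits in the middle of the 10-30 second range, and the clamp bounds reuse the "too small" and "too large" figures from the paragraph above.

```python
def suggest_chunk_size(
    records_processed: int,
    elapsed_seconds: float,
    target_seconds: float = 20.0,
    min_size: int = 100,
    max_size: int = 1_000_000,
) -> int:
    """Scale a measured chunk so the next one takes roughly target_seconds."""
    if records_processed <= 0 or elapsed_seconds <= 0:
        raise ValueError('need a positive record count and elapsed time')
    throughput = records_processed / elapsed_seconds  # records per second
    return max(min_size, min(max_size, round(throughput * target_seconds)))
```

A trial chunk of 5,000 records that finishes in 4 seconds suggests raising the chunk size, while one that takes 50 seconds suggests lowering it.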

Circuit Breakers for External Dependencies

When your pipeline depends on an external API, database, or service, you need circuit breakers. The pattern comes from electrical engineering: when current exceeds safe levels, the breaker trips to prevent damage. In software, a circuit breaker tracks failures to an external dependency. After N consecutive failures, it “opens” the circuit and fails fast for subsequent calls instead of waiting for timeouts. After a cooldown period, it allows a single test call through. If that succeeds, the circuit closes and normal operation resumes.

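import time
from enum import Enum

class CircuitBreakerOpen(Exception):
    """Raised when a call is rejected because the circuit is open."""

class CircuitState(Enum):
    CLOSED = 'closed'       # Normal operation
    OPEN = 'open'           # Failing fast
    HALF_OPEN = 'half_open' # Testing recovery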

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.state = CircuitState.CLOSED
        self.failure_count = 0
        self.last_failure_time = None

    def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            # time.monotonic() is unaffected by wall-clock adjustments
            elapsed = time.monotonic() - self.last_failure_time
            if elapsed > self.recovery_timeout:
                self.state = CircuitState.HALF_OPEN  # let one trial call through
            else:
                raise CircuitBreakerOpen(
                    f"Circuit is open, failing fast. "
                    f"Recovery in {self.recovery_timeout - elapsed:.0f}s"
                )

        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_success(self):
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = time.monotonic()
        # failure_count is not reset on HALF_OPEN, so a failed trial call
        # reopens the circuit immediately.
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN

In practice, we combine circuit breakers with exponential backoff and jitter. The first retry waits 1 second, the second waits 2 seconds, the third waits 4 seconds, each with a random jitter of plus or minus 25%. This prevents thundering herd problems when a dependency recovers and all clients retry simultaneously. The combination of circuit breakers and backoff means that when an external service goes down, your pipeline pauses gracefully instead of hammering the failing service with retries.
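The backoff schedule described above (1, 2, then 4 seconds, each with plus or minus 25% jitter) can be sketched like this. The function names and the injectable `sleep` parameter are illustrative assumptions; a real pipeline would also route each attempt through its circuit breaker.

```python
import random
import time


def backoff_delay(attempt: int, base: float = 1.0, jitter: float = 0.25) -> float:
    """Delay before retry number `attempt` (0-based): base * 2^attempt, +/- jitter."""
    delay = base * (2 ** attempt)
    return delay * (1 + random.uniform(-jitter, jitter))


def retry_with_backoff(func, max_retries: int = 3, sleep=time.sleep):
    """Call func, retrying up to max_retries times with jittered exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return func()
        except Exception:
            if attempt == max_retries:
                raise  # out of retries: surface the last error
            sleep(backoff_delay(attempt))
```

Passing `sleep` as a parameter keeps the helper easy to test, since a test can record the requested delays instead of actually waiting.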

We wrap every external dependency in a circuit breaker — not just APIs, but database connections, S3 calls, and even internal service calls. Each breaker is independently configured with thresholds appropriate to the dependency. A third-party API with a known 99.5% SLA gets a breaker with a higher failure threshold (10) and longer recovery timeout (120 seconds) than an internal database that should have 99.99% availability (threshold of 3, recovery of 30 seconds).

Observability: Knowing Before Your Users Do

A pipeline without observability is a ticking bomb. You need three layers of visibility into every pipeline, and each layer serves a different purpose:

Metrics are quantitative measurements sampled over time. Records processed per minute, error rates, latency percentiles (p50, p95, p99), queue depth, memory usage. We use Prometheus with Grafana dashboards. The key metrics we instrument for every pipeline:

  • pipeline_records_processed_total — counter, labeled by stage and status (success/failed)
  • pipeline_stage_duration_seconds — histogram, labeled by stage name
  • pipeline_errors_total — counter, labeled by error_type and stage
  • pipeline_dlq_depth — gauge, how many rejected records are pending review
  • pipeline_last_successful_run_timestamp — gauge, used for staleness alerting
  • pipeline_checkpoint_offset — gauge, progress indicator for long-running jobs

Logs are structured events for debugging. Every log line should be JSON with consistent fields: timestamp, pipeline name, stage, record_id (if applicable), and the message. Use correlation IDs to trace a single record through all pipeline stages. When something goes wrong, you should be able to search for a specific record ID and see every log entry from ingestion through transformation through loading, across all services.
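A structured log line with those fields can be produced with the standard library alone. The `JsonFormatter` class below is a minimal sketch, and the field values in the usage line (pipeline name, record ID, correlation ID) are made up for illustration.

```python
import json
import logging
from datetime import datetime, timezone


class JsonFormatter(logging.Formatter):
    """Render each log record as one JSON object with consistent fields."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            'timestamp': datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
            'level': record.levelname,
            # These come from the `extra` dict passed at the call site
            'pipeline': getattr(record, 'pipeline', None),
            'stage': getattr(record, 'stage', None),
            'record_id': getattr(record, 'record_id', None),
            'correlation_id': getattr(record, 'correlation_id', None),
            'message': record.getMessage(),
        }
        return json.dumps(payload)


handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger('pipeline')
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info('chunk loaded', extra={
    'pipeline': 'orders_nightly', 'stage': 'load',
    'record_id': 'ord-42', 'correlation_id': 'abc123',
})
```

Because every line is a JSON object with the same keys, searching a log store for one `record_id` or `correlation_id` returns the record's full path through the stages.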

Alerts are the bridge between observability and action. Our alert rules follow a philosophy of alerting on symptoms, not causes. We do not alert on every individual error. We alert on rates and trends that indicate systemic problems:

  1. Pipeline has not completed in expected window — the nightly job should finish by 4 AM. If it has not, something is wrong.
  2. Error rate exceeds threshold — more than 2% of records rejected in a batch indicates a systemic issue upstream.
  3. DLQ depth is growing — rejected records are accumulating without being addressed.
  4. Latency regression — p95 processing time increased by more than 50% versus the 7-day rolling average.
  5. Pipeline staleness — the last successful run was more than 2x the expected interval ago.

Notice what is absent from this list: alerts on every individual error. Individual record failures are noise. The signal is in the aggregate behavior. If one record out of a million fails, that is normal. If thirty thousand records out of a million fail, that is 3% of the batch, above the 2% threshold, and it is a problem. The alert rules capture this distinction.
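Two of the rules above, staleness and latency regression, reduce to simple comparisons. In practice they would live in the alerting system's own rule language; the Python below is only a sketch of the logic, with hypothetical function names and the 2x and 50% figures taken from the list.

```python
from datetime import datetime, timedelta, timezone


def is_stale(last_success: datetime, expected_interval: timedelta, now: datetime) -> bool:
    """Rule 5: the last successful run is more than 2x the expected interval old."""
    return now - last_success > 2 * expected_interval


def latency_regressed(p95_now: float, p95_rolling_avg: float, max_increase: float = 0.50) -> bool:
    """Rule 4: current p95 exceeds the 7-day rolling average by more than max_increase."""
    return p95_now > p95_rolling_avg * (1 + max_increase)
```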

Testing Data Pipelines

Testing pipelines is notoriously difficult because the inputs are often messy, real-world data that is hard to synthesize. Here is our layered testing strategy:

Unit tests for transformation logic. These are straightforward: given this input record, assert this output record. Cover edge cases aggressively — null values, Unicode characters, extreme numbers (negative, zero, max float), empty strings, nested nulls, timestamps in unusual formats. We typically write 10-15 unit tests per transformation function, with at least half covering edge cases rather than the happy path.
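As an illustration of the edge-case emphasis, here is a made-up transformation function with a handful of tests. `clean_customer_name` is hypothetical and not part of any pipeline described here; the tests are plain functions with assertions, so they run under pytest as written.

```python
from typing import Optional


def clean_customer_name(raw: Optional[str]) -> Optional[str]:
    """Hypothetical transform: trim, collapse whitespace runs, map empty to None."""
    if raw is None:
        return None
    cleaned = ' '.join(raw.split())
    return cleaned or None


def test_happy_path():
    assert clean_customer_name('Ada Lovelace') == 'Ada Lovelace'


def test_null_input():
    assert clean_customer_name(None) is None


def test_empty_and_whitespace_only():
    assert clean_customer_name('') is None
    assert clean_customer_name(' \t\n ') is None


def test_unicode_is_preserved():
    assert clean_customer_name('  Zoë   Müller ') == 'Zoë Müller'
```

Three of the four tests target edge cases, in line with the ratio suggested above.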

Contract tests for external interfaces. If your pipeline reads from an API, maintain a set of recorded responses (fixtures) and test against them. When the API changes, the fixtures break before your production pipeline does. We use vcrpy or responses in Python to record and replay HTTP interactions.

Integration tests with real databases. Use Docker Compose to spin up PostgreSQL, Redis, and any other dependencies. Run the pipeline end-to-end against a small dataset (1,000-5,000 records). We use pytest-docker-compose for this. These tests catch issues that unit tests miss: connection handling, transaction boundaries, query performance on realistic schemas.

Data quality tests in production. After every pipeline run, execute a suite of assertions against the output data. These are not software tests — they are data tests. They verify invariants that should always hold true:

def test_output_quality(db_connection):
    # No null order_ids in the output table
    result = db_connection.execute(
        "SELECT COUNT(*) FROM processed_orders WHERE order_id IS NULL"
    ).scalar()
    assert result == 0, f"Found {result} null order_ids"

    # Total amount should never be negative (matches the ingestion validator)
    result = db_connection.execute(
        "SELECT COUNT(*) FROM processed_orders WHERE total_amount < 0"
    ).scalar()
    assert result == 0, f"Found {result} negative amounts"

    # Record count should be within 10% of yesterday's count
    today_count = db_connection.execute(
        "SELECT COUNT(*) FROM processed_orders "
        "WHERE processed_at::date = CURRENT_DATE"
    ).scalar()
    yesterday_count = db_connection.execute(
        "SELECT COUNT(*) FROM processed_orders "
        "WHERE processed_at::date = CURRENT_DATE - 1"
    ).scalar()
    if yesterday_count > 0:
        change_pct = abs(today_count - yesterday_count) / yesterday_count
        assert change_pct < 0.10, (
            f"Record count changed by {change_pct:.1%} vs yesterday "
            f"({today_count} vs {yesterday_count})"
        )

    # Referential integrity: all customer_ids exist in the customers table
    result = db_connection.execute(
        "SELECT COUNT(*) FROM processed_orders o "
        "LEFT JOIN customers c ON o.customer_id = c.id "
        "WHERE c.id IS NULL"
    ).scalar()
    assert result == 0, f"Found {result} orders with invalid customer_ids"

These data quality tests run as the final stage of every pipeline execution. A failed quality test does not roll back the data (it is already written), but it triggers an immediate alert so someone can investigate before downstream consumers use the data. We also maintain a quality score dashboard that tracks the trend of data quality over time, which catches gradual degradation that individual test runs might miss.

Putting It All Together: A Reference Architecture

Here is the architecture we use for most data pipelines at Harbor Software:

  1. Ingestion layer: Reads from source (API, file, message queue). Validates schema using Pydantic models. Splits into valid stream and DLQ. Records ingestion metrics (records received, rejected, latency).
  2. Transform layer: Applies business logic to valid records. Chunked processing with Redis-backed checkpoints. Circuit breakers for any external lookups during transformation. Each transformation function is independently unit-tested.
  3. Load layer: Writes to target (database, data warehouse, file storage). All writes are idempotent using upserts or deterministic file naming. Batched for performance (we use psycopg2.extras.execute_values for PostgreSQL bulk inserts at 10-50x the speed of individual inserts).
  4. Quality layer: Runs data quality assertions against the output. Alerts on failures. Logs quality scores for trend analysis.
  5. Observability layer: Prometheus metrics, structured JSON logs, and alert rules across all stages. Grafana dashboards for real-time monitoring and historical analysis.
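The flow through the first four layers can be sketched as a single function. This is a deliberately simplified illustration, not our actual orchestration code: `run_pipeline` and its parameters are hypothetical, and chunking, checkpoints, circuit breakers, and metrics are left out to keep the shape visible.

```python
from typing import Callable


def run_pipeline(
    raw_records: list[dict],
    validate: Callable[[dict], dict],
    transform: Callable[[dict], dict],
    load: Callable[[list[dict]], None],
    quality_checks: dict[str, Callable[[list[dict]], bool]],
) -> dict:
    """Ingest -> transform -> load -> quality, returning a run summary."""
    # Ingestion layer: split into a valid stream and a DLQ
    valid, rejected = [], []
    for record in raw_records:
        try:
            valid.append(validate(record))
        except ValueError as exc:
            rejected.append({'record': record, 'error': str(exc)})

    # Transform layer: business logic on valid records only
    transformed = [transform(record) for record in valid]

    # Load layer: the load callable is expected to be idempotent
    load(transformed)

    # Quality layer: collect the names of failed assertions instead of raising
    failures = [name for name, check in quality_checks.items() if not check(transformed)]
    return {
        'loaded': len(transformed),
        'rejected': len(rejected),
        'quality_failures': failures,
    }
```

Passing each stage in as a callable keeps the stages independently testable, which matches the one-task-per-stage structure.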

An orchestrator manages scheduling, retries, and dependency ordering. We use Prefect over Airflow for most projects because the API is cleaner, the local development experience is dramatically better, and the Python-native approach avoids Airflow's DAG serialization quirks. Each stage is a separate task with its own retry policy. The orchestrator also provides a web UI where operators can see pipeline status, trigger manual runs, and inspect failed tasks without touching the command line.

The entire pipeline is containerized with Docker and deployed on Kubernetes. Each run gets its own pod, which keeps runs isolated from one another. Failed runs leave their logs and checkpoints intact for debugging. Kubernetes resource limits (CPU and memory) bound contention between runs and provide a safety net against runaway jobs consuming cluster resources.

Conclusion

Reliable data pipelines are not built with clever algorithms or cutting-edge technology. They are built with paranoid engineering: validate everything at the boundary, make every operation retryable, chunk large jobs into small pieces with checkpoints, isolate failures from healthy records, wrap every external call in a circuit breaker, and watch everything with metrics and alerts.

The pipeline that processes 10 million records flawlessly every night without waking anyone up is not impressive because of its scale. It is impressive because of its boring, methodical, defense-in-depth approach to every possible failure mode. Each guardrail individually seems like overkill. Together, they create a system that absorbs the constant chaos of real-world data (schema changes, volume spikes, upstream outages, malformed records) without breaking. That is the kind of engineering that lets you sleep at night.
