
Real-Time Data Processing with Python: Lessons from SparkAI

Python is the default language for data processing, but most Python data pipelines are batch-oriented: read a file, transform, write results. When a client needed sub-second processing of streaming sensor data with AI inference at the edge, we had to rethink everything we knew about Python performance. The project, internally called SparkAI, processed 15,000 events per second from IoT sensors, ran anomaly detection models on each event, and pushed alerts to operators in under 800ms end-to-end. Here is how we built it without abandoning Python for Go or Rust.

HARBOR SOFTWARE · Engineering Insights

Why Python for Real-Time (And When Not To)

The honest answer is that Python is not the optimal language for real-time data processing if you measure only by raw throughput and latency. Go, Rust, Java, and even Node.js all offer better latency characteristics for pure data shuffling. But Python has two advantages that outweigh raw performance in many scenarios:

  • Ecosystem: NumPy, pandas, scikit-learn, PyTorch, ONNX Runtime, and the entire ML stack are Python-native. If your pipeline includes ML inference (and in 2024, most interesting pipelines do), writing the pipeline in Python avoids a serialization boundary between your processing logic and your models. Crossing that boundary (Python model server behind a gRPC API called from a Go pipeline) adds 2-5ms of overhead per call and introduces a network failure point that does not exist when everything runs in-process.
  • Team velocity: Most data teams know Python. Forcing them to write Go for the pipeline and Python for the models doubles the cognitive load, doubles the codebase surface area, and creates a skill silo where pipeline bugs require a different engineer than model bugs. For teams under 8 engineers, this overhead is rarely justified.

The rule we follow: if your processing is pure data transformation without ML (ETL, aggregation, routing, format conversion), use Go or Rust. If ML inference is in the critical path, use Python with the optimization techniques below. If you need both, consider a hybrid: Go for ingestion and routing, Python for inference workers, communicating via a message queue.

Architecture: Async Everywhere

The single biggest performance lever in Python real-time processing is eliminating blocking I/O. Every millisecond spent waiting on a network call or disk read is a millisecond your pipeline is not processing events. Python’s Global Interpreter Lock (GIL) means you cannot use threads to overlap CPU work, but async I/O lets you overlap network waits with processing. We use asyncio as the foundation with uvloop as a drop-in replacement for the default event loop. uvloop is a Cython wrapper around libuv (the same event loop that powers Node.js) and is 2-4x faster than the default asyncio loop for I/O-heavy workloads:

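import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass

import asyncpg
import msgpack
import redis.asyncio as aioredis  # the aioredis project now lives in redis-py
import uvloop

# ProcessorConfig, AnomalyDetector and Alert are project-specific
# classes (config object, ONNX model wrapper, alert record), not shown.

uvloop.install()  # use uvloop's event loop for every asyncio loop created

BATCH_SIZE = 64
BATCH_TIMEOUT = 0.01  # max seconds to spend filling one batch

@dataclass
class SensorEvent:
    device_id: str
    timestamp: float
    readings: dict[str, float]

    @classmethod
    def from_msgpack(cls, raw: bytes) -> "SensorEvent":
        data = msgpack.unpackb(raw)
        return cls(data["device_id"], data["timestamp"], data["readings"])

class StreamProcessor:
    def __init__(self, config: "ProcessorConfig"):
        self.config = config  # needed later for config.threshold
        self.redis = aioredis.from_url(config.redis_url)
        self.db_pool = None  # created in start(): create_pool() must be awaited
        self.model = AnomalyDetector.load(config.model_path)
        self.buffer = asyncio.Queue(maxsize=10_000)

    async def start(self):
        """Open async resources that cannot be created in __init__."""
        self.db_pool = await asyncpg.create_pool(
            self.config.postgres_url,
            min_size=5, max_size=20
        )

    async def consume(self, stream: AsyncIterator[bytes]):
        """Read from Kafka/Redis Stream and decode."""
        async for raw in stream:
            event = SensorEvent.from_msgpack(raw)
            # put() waits while the queue is full, pushing back on the source
            await self.buffer.put(event)

    async def process(self):
        """Process events from the buffer in micro-batches."""
        loop = asyncio.get_running_loop()
        while True:
            # Block (no timeout) until the first event of the next batch arrives
            batch = [await self.buffer.get()]
            # One deadline for the whole batch, not a fresh timeout per event
            deadline = loop.time() + BATCH_TIMEOUT
            while len(batch) < BATCH_SIZE:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    event = await asyncio.wait_for(
                        self.buffer.get(), timeout=remaining
                    )
                except asyncio.TimeoutError:
                    break
                batch.append(event)

            await self._process_batch(batch)

    async def _process_batch(self, batch: list[SensorEvent]):
        # Vectorized feature extraction (_extract_features not shown)
        features = self._extract_features(batch)

        # Batched ML inference: a synchronous call that holds the event
        # loop for the duration of the forward pass
        anomaly_scores = self.model.predict_batch(features)

        # Identify alerts
        alerts = []
        for event, score in zip(batch, anomaly_scores):
            if score > self.config.threshold:
                alerts.append(Alert(
                    device_id=event.device_id,
                    score=float(score),
                    timestamp=event.timestamp
                ))

        # Fan out: store results and send alerts concurrently
        # (_store_results and _send_alerts not shown)
        await asyncio.gather(
            self._store_results(batch, anomaly_scores),
            self._send_alerts(alerts)
        )

    async def run(self, stream: AsyncIterator[bytes]):
        """Main entry point: open resources, then consume and process concurrently."""
        await self.start()
        await asyncio.gather(
            self.consume(stream),
            self.process()
        )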

Micro-batching: the key optimization

Processing events one at a time through a Python ML model is catastrophically slow. A single ONNX Runtime forward pass has approximately 5ms of overhead regardless of batch size, due to crossing the Python-to-C boundary, memory allocation, and result marshaling. The actual computation time for a small anomaly detection model is roughly 0.1ms for a single sample but only about 2ms for 64 samples (about 0.03ms per sample), because NumPy and ONNX Runtime use SIMD-vectorized operations that process multiple samples in parallel. By micro-batching (collecting 32-128 events before calling the model), we reduced per-event inference time from 12ms to 0.4ms, a 30x improvement.
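
A simplified cost model makes the amortization explicit. It assumes, as the paragraph above does, a fixed per-call overhead of about 5ms plus batch compute of about 0.1ms for one sample and about 2ms for 64. It deliberately ignores feature extraction, queueing, and the rest of the pipeline, so it illustrates the shape of the effect rather than reproducing the measured 12ms and 0.4ms figures. The function name is hypothetical.

```python
def per_event_ms(call_overhead_ms: float, batch_compute_ms: float, batch_size: int) -> float:
    """Amortized model cost per event for one model call on `batch_size` events."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    # The fixed per-call overhead is paid once and shared by every event in the batch.
    return (call_overhead_ms + batch_compute_ms) / batch_size


single = per_event_ms(5.0, 0.1, 1)    # one event per model call
batched = per_event_ms(5.0, 2.0, 64)  # 64 events per model call
print(f"single: {single:.2f} ms/event, batched: {batched:.3f} ms/event")
```

Under this model a single-event call costs about 5.1ms, almost all of it overhead, while a 64-event call costs about 0.11ms per event because the overhead is split 64 ways.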

The trade-off is added latency: we wait up to 10ms (the timeout=0.01 in asyncio.wait_for) to fill the batch. In practice, at 15,000 events/second, the 64-event batch fills in under 5ms. We dynamically adjust the batch size based on incoming event rate: during quiet periods, we reduce the batch size to 8 to avoid adding unnecessary latency, and during bursts we increase to 128 to maximize throughput. The adaptation uses an exponentially weighted moving average of the inter-event arrival time.
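
The adaptive batch sizing described above can be sketched as follows. Only the EWMA of inter-event arrival time and the 8-to-128 range come from the text; the class name, the smoothing factor alpha=0.1, and the rule of sizing each batch to the number of events expected within the 10ms wait are assumptions for illustration, not the production logic.

```python
from typing import Optional


class AdaptiveBatchSizer:
    """Choose a micro-batch size from an EWMA of inter-event arrival gaps."""

    def __init__(self, max_wait: float = 0.01, min_size: int = 8,
                 max_size: int = 128, alpha: float = 0.1):
        self.max_wait = max_wait  # seconds we are willing to wait per batch
        self.min_size = min_size
        self.max_size = max_size
        self.alpha = alpha        # EWMA smoothing factor (assumed value)
        self._last_arrival: Optional[float] = None
        self._ewma_gap: Optional[float] = None

    def observe(self, arrival_time: float) -> None:
        """Record one event's arrival time (seconds, monotonic clock)."""
        if self._last_arrival is not None:
            gap = max(arrival_time - self._last_arrival, 0.0)
            if self._ewma_gap is None:
                self._ewma_gap = gap  # seed the average with the first gap
            else:
                self._ewma_gap = self.alpha * gap + (1 - self.alpha) * self._ewma_gap
        self._last_arrival = arrival_time

    def batch_size(self) -> int:
        """Events expected within max_wait, clamped to [min_size, max_size]."""
        if self._ewma_gap is None:
            return self.min_size  # no rate estimate yet: favor latency
        if self._ewma_gap == 0:
            return self.max_size  # arrivals faster than the clock resolves
        expected = int(self.max_wait / self._ewma_gap)
        return max(self.min_size, min(self.max_size, expected))
```

In the processor, consume() could call observe(loop.time()) for each decoded event, and process() could read batch_size() at the start of each batch instead of using a fixed 64. At about 15,000 events/second the estimate exceeds 128 and is clamped there; at a few hundred events/second it falls to the floor of 8.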

Serialization: MessagePack Over JSON

JSON serialization and deserialization is one of the most expensive operations in a Python data pipeline. The standard library json module, even with its built-in C speedups, is slow compared with dedicated serialization libraries. For our sensor events, switching from json.loads() to msgpack.unpackb() reduced deserialization time by 73% and wire size by 35%:

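import json
import time

import msgpack
import orjson  # fast JSON library (compiled Rust extension)

# Benchmark: 10,000 sensor events
events = [{
    "device_id": f"sensor_{i}",
    "timestamp": time.time(),
    "readings": {
        "temp": 22.5 + (i % 10) * 0.1,
        "humidity": 45.0 + (i % 20) * 0.5,
        "pressure": 1013.25 + (i % 5) * 0.3,
        "vibration": 0.02 + (i % 100) * 0.001
    }
} for i in range(10_000)]

def bench(dumps, loads):
    """Time serializing and deserializing every event; report total size."""
    t0 = time.perf_counter()
    blobs = [dumps(e) for e in events]
    t1 = time.perf_counter()
    for blob in blobs:
        loads(blob)
    t2 = time.perf_counter()
    size_mb = sum(len(b) for b in blobs) / 1_000_000
    print(f"serialize {(t1 - t0) * 1e3:.0f}ms, "
          f"deserialize {(t2 - t1) * 1e3:.0f}ms, size {size_mb:.2f}MB")

# The comments record our results; absolute timings depend on
# hardware and library versions.

# JSON (standard library): serialize + deserialize
bench(lambda e: json.dumps(e).encode(), json.loads)
# Serialize: 28ms, Deserialize: 35ms, Size: 1.12MB total

# MessagePack: serialize + deserialize
bench(msgpack.packb, msgpack.unpackb)
# Serialize: 8ms, Deserialize: 9ms, Size: 0.73MB total

# orjson: serialize + deserialize
bench(orjson.dumps, orjson.loads)
# Serialize: 5ms, Deserialize: 12ms, Size: 1.12MB total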

If your consumers require JSON format (HTTP APIs, JavaScript frontend clients, third-party integrations), use orjson instead of the standard library json module. It is a compiled extension (written in Rust) that is 3-5x faster for serialization and 2-3x faster for deserialization. It also handles datetime objects natively without a custom encoder, which eliminates a common source of bugs. If you control both ends of the wire (internal service-to-service communication), use MessagePack for the smaller wire format and faster deserialization. The 35% size reduction also means roughly 35% less network bandwidth and Redis memory if you are using Redis Streams as a buffer.
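
A common way to act on this advice is to prefer orjson when it is installed and fall back to the standard library otherwise. This is a minimal sketch under that assumption: dumps() and loads() are names chosen for illustration, and the fallback's default hook only approximates orjson's native datetime output (ISO 8601 strings) rather than matching it in every case.

```python
from datetime import datetime

try:
    import orjson

    def dumps(obj) -> bytes:
        # orjson returns bytes and serializes datetime objects natively
        return orjson.dumps(obj)

    def loads(data: bytes):
        return orjson.loads(data)

except ImportError:  # fall back to the standard library
    import json

    def _default(obj):
        # Approximate orjson's datetime handling with ISO 8601 strings
        if isinstance(obj, datetime):
            return obj.isoformat()
        raise TypeError(f"Type is not JSON serializable: {type(obj).__name__}")

    def dumps(obj) -> bytes:
        # Compact separators (no spaces), like orjson's output
        return json.dumps(obj, separators=(",", ":"), default=_default).encode()

    def loads(data: bytes):
        return json.loads(data)
```

Callers see the same bytes-in, bytes-out interface either way, so the choice of backend stays a deployment detail.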

Memory Management: Object Pooling and Slots

At 15,000 events/second, Python’s garbage collector becomes a visible source of latency jitter. Every event creates a new Python object (with roughly 64 bytes of overhead for the object header, which holds the reference count and type pointer, plus the pointer to its attribute dict). These objects are allocated, used for a few milliseconds during processing, and then become garbage. Reference counting frees most of them immediately, but a high rate of container-object allocations triggers frequent cyclic garbage collections, each of which scans the tracked objects that are still alive, and those pauses show up as latency spikes at p99. Together, the techniques below reduced our GC pause time by 80%:

import numpy as np

# Technique 1: __slots__ eliminates the per-instance __dict__
# (for our class, per-object memory dropped from 352 to 104 bytes)
class SensorEvent:
    __slots__ = ('device_id', 'timestamp', 'readings',
                 '_features', '_score')

    def __init__(self, device_id: str, timestamp: float,
                 readings: dict):
        self.reset(device_id, timestamp, readings)

    def reset(self, device_id: str, timestamp: float, readings: dict):
        """(Re)initialize every slot; shared by __init__ and EventPool."""
        self.device_id = device_id
        self.timestamp = timestamp
        self.readings = readings
        self._features = None
        self._score = None

# Technique 2: Object pooling reuses objects instead of
# allocating/deallocating
class EventPool:
    def __init__(self, size: int = 1000):
        self._max_size = size
        # Free list of pre-allocated instances (__new__ skips __init__)
        self._free = [SensorEvent.__new__(SensorEvent)
                      for _ in range(size)]

    def acquire(self, device_id: str, timestamp: float,
                readings: dict) -> SensorEvent:
        # O(1) pop from the free list; when the pool is exhausted,
        # fall back to a normal allocation
        if self._free:
            obj = self._free.pop()
        else:
            obj = SensorEvent.__new__(SensorEvent)
        obj.reset(device_id, timestamp, readings)
        return obj

    def release(self, obj: SensorEvent):
        # Clear references so pooled objects don't keep payloads alive.
        # Callers must not use obj after releasing it.
        obj.readings = None
        obj._features = None
        obj._score = None
        # O(1) return; surplus objects beyond the pool size are dropped
        if len(self._free) < self._max_size:
            self._free.append(obj)

# Technique 3: Pre-allocated NumPy arrays for feature extraction
class FeatureBuffer:
    def __init__(self, batch_size: int = 128,
                 n_features: int = 12):
        self.buffer = np.zeros(
            (batch_size, n_features), dtype=np.float32
        )

    def fill(self, events: list[SensorEvent]) -> np.ndarray:
        n = len(events)
        if n > self.buffer.shape[0]:
            raise ValueError(
                f"batch of {n} exceeds buffer capacity {self.buffer.shape[0]}"
            )
        for i, event in enumerate(events):
            self.buffer[i, 0] = event.readings['temp']
            self.buffer[i, 1] = event.readings['humidity']
            self.buffer[i, 2] = event.readings['pressure']
            self.buffer[i, 3] = event.readings['vibration']
            # ... derived features ...
        # Returns a view into the shared buffer: the next fill()
        # overwrites it, so run inference before refilling
        return self.buffer[:n]

The __slots__ optimization alone reduced per-object memory from 352 bytes to 104 bytes for our event class. At 15K events/second with a 5-second processing window (75K live objects at any time), that is the difference between 26MB and 7.8MB of live objects in memory. Less memory pressure means fewer GC pauses, which means more predictable latency. We measured a p99 latency drop from 180ms to 45ms after implementing __slots__ and object pooling together.
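
To check the __slots__ saving on your own event class, compare a regular instance plus its attribute dict against a slotted instance. This is a rough sketch: sys.getsizeof counts only the object itself (not the strings, floats, or readings dict it references), and exact byte counts vary by CPython version, so the numbers will not necessarily match the 352 and 104 bytes quoted above. PlainEvent, SlottedEvent and shallow_size are hypothetical names.

```python
import sys


class PlainEvent:
    def __init__(self, device_id, timestamp, readings):
        self.device_id = device_id
        self.timestamp = timestamp
        self.readings = readings
        self._features = None
        self._score = None


class SlottedEvent:
    __slots__ = ("device_id", "timestamp", "readings", "_features", "_score")

    def __init__(self, device_id, timestamp, readings):
        self.device_id = device_id
        self.timestamp = timestamp
        self.readings = readings
        self._features = None
        self._score = None


def shallow_size(obj) -> int:
    """Size of the instance plus its attribute dict, if any (contents excluded)."""
    size = sys.getsizeof(obj)
    attrs = getattr(obj, "__dict__", None)  # slotted instances have none
    if attrs is not None:
        size += sys.getsizeof(attrs)
    return size


readings = {"temp": 22.5}
plain = PlainEvent("sensor_1", 0.0, readings)
slotted = SlottedEvent("sensor_1", 0.0, readings)
print(shallow_size(plain), shallow_size(slotted))
```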

The pre-allocated NumPy array technique avoids creating temporary arrays during feature extraction. Without it, each batch creates a new (64, 12) float32 NumPy array (3KB), uses it for inference, and discards it. At 234 batches per second (15K events / 64 per batch), that is 700KB/second of short-lived allocations and deallocations for the memory allocator to service. With a pre-allocated buffer that we reuse for every batch, the allocation happens once at startup and never again.
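
Whether allocation reductions like these actually shorten collector pauses is worth measuring on your own workload. CPython calls every function in gc.callbacks with the phase "start" and then "stop" around each collection, which is enough to time them. GCPauseTracker is a hypothetical helper written for this sketch, and its p99 is a simple nearest-rank approximation.

```python
import gc
import time


class GCPauseTracker:
    """Record the duration of each cyclic-GC collection via gc.callbacks."""

    def __init__(self):
        self.pauses_ms: list[float] = []
        self._start = 0.0

    def _callback(self, phase: str, info: dict) -> None:
        if phase == "start":
            self._start = time.perf_counter()
        elif phase == "stop":
            self.pauses_ms.append((time.perf_counter() - self._start) * 1e3)

    def __enter__(self):
        gc.callbacks.append(self._callback)
        return self

    def __exit__(self, *exc):
        gc.callbacks.remove(self._callback)
        return False

    def p99_ms(self) -> float:
        """Approximate 99th-percentile pause (nearest rank)."""
        if not self.pauses_ms:
            return 0.0
        ordered = sorted(self.pauses_ms)
        return ordered[min(len(ordered) - 1, int(0.99 * len(ordered)))]


with GCPauseTracker() as tracker:
    junk = [[i] for i in range(10_000)]  # many small container objects
    gc.collect()                         # force at least one collection
print(len(tracker.pauses_ms), f"{tracker.p99_ms():.3f} ms")
```

Wrapping a representative slice of the pipeline in the tracker before and after an optimization gives a direct comparison of pause counts and tail pause length.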

Multiprocessing: Scaling Beyond the GIL

Python’s Global Interpreter Lock limits CPU-bound Python bytecode execution to a single core. For ML inference, this is often not the bottleneck because NumPy, PyTorch, and ONNX Runtime release the GIL during C-level computation. But for Python-heavy preprocessing (string parsing, feature engineering, business rule evaluation, data validation), the GIL is a real constraint that limits you to one core’s worth of Python computation.

Our solution is a process-per-partition architecture. Each Kafka partition is consumed by a separate Python process, with no shared state between processes. Each process runs its own event loop, its own model instance, and its own database connection pool:

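import asyncio
import multiprocessing as mp
import os
import time

import uvloop
from aiokafka import AIOKafkaConsumer, TopicPartition

async def consume_partition(processor, config: dict, partition_id: int):
    """Feed raw message bytes from one Kafka partition into the processor."""
    # aiokafka's consumer is an async iterator, matching the
    # AsyncIterator[bytes] that StreamProcessor.consume() expects.
    # 'bootstrap_servers' is an assumed config key.
    consumer = AIOKafkaConsumer(
        bootstrap_servers=config['bootstrap_servers'],
        group_id=config['group_id'],
        auto_offset_reset='latest'
    )
    await consumer.start()
    consumer.assign([TopicPartition(config['topic'], partition_id)])
    try:
        async def values():
            async for msg in consumer:
                yield msg.value
        await processor.run(values())
    finally:
        await consumer.stop()

def run_processor(partition_id: int, config: dict):
    """Each process handles one Kafka partition."""
    # Set CPU affinity for predictable performance (Linux-only API)
    os.sched_setaffinity(0, {partition_id % os.cpu_count()})

    uvloop.install()
    processor = StreamProcessor(config)
    asyncio.run(consume_partition(processor, config, partition_id))

def start_processor(i: int, config: dict) -> mp.Process:
    p = mp.Process(
        target=run_processor,
        args=(i, config),
        name=f"processor-{i}"
    )
    p.start()
    return p

def main():
    config = load_config()  # project-specific loader, not shown
    num_partitions = 8

    processes = [start_processor(i, config) for i in range(num_partitions)]

    # Monitor child processes, restart on failure
    while True:
        for i, p in enumerate(processes):
            if not p.is_alive():
                print(f"Process {i} died (exit code {p.exitcode}), restarting")
                processes[i] = start_processor(i, config)
        time.sleep(5)

if __name__ == "__main__":  # required when processes are spawned
    main()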

This gives us true parallelism: 8 partitions = 8 processes = 8 cores utilized. The overhead is memory (each process loads the model separately, roughly 200MB per process for our ONNX model). For models under 500MB, this is acceptable. For larger models like transformer-based detectors, we use a shared model server (Triton Inference Server or a simple FastAPI service) that all processes call, trading some latency for memory efficiency.

We also pin each process to a specific CPU core using os.sched_setaffinity. This prevents the OS scheduler from migrating processes between cores, which causes cache invalidation and unpredictable latency spikes. On our 8-core machines, this reduced p99 latency by 15%.
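
One caveat with partition_id % os.cpu_count(): os.cpu_count() reports every CPU on the machine, which can be more than a container or cgroup lets the process use, and asking os.sched_setaffinity for a CPU outside that set raises OSError. A sketch of a safer mapping, assuming Linux (where os.sched_getaffinity and os.sched_setaffinity exist), picks from the allowed set instead. choose_core and pin_to_core are hypothetical helper names.

```python
import os
from typing import Optional


def choose_core(partition_id: int, allowed: set[int]) -> int:
    """Map a partition to one CPU from the set this process may run on."""
    cores = sorted(allowed)
    return cores[partition_id % len(cores)]


def pin_to_core(partition_id: int) -> Optional[int]:
    """Pin the calling process to one allowed core; return it, or None if unsupported."""
    if not hasattr(os, "sched_setaffinity"):  # not available outside Linux
        return None
    core = choose_core(partition_id, os.sched_getaffinity(0))
    os.sched_setaffinity(0, {core})
    return core
```

run_processor could call pin_to_core(partition_id) in place of the direct os.sched_setaffinity call; on a machine with all 8 cores available the mapping is the same as the modulo version.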

Monitoring and Backpressure

Real-time systems fail differently than batch systems. A batch job that runs slow just finishes late. A real-time pipeline that runs slow accumulates unbounded lag, consumes all available memory buffering the backlog, and eventually crashes. When it restarts, the lag is even worse because it has to process the accumulated backlog on top of ongoing events. Backpressure mechanisms are not optional; they are the difference between a system that degrades gracefully and one that falls off a cliff:

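import asyncio
import time

class BackpressureMonitor:
    def __init__(self, max_lag_seconds: float = 30.0):
        self.max_lag = max_lag_seconds
        # PrometheusMetrics: our wrapper around prometheus_client metrics (not shown)
        self.metrics = PrometheusMetrics()
        self._consecutive_overloads = 0
        # Consumers await this event before reading the next message
        self.consumption_allowed = asyncio.Event()
        self.consumption_allowed.set()

    async def check(self, event: SensorEvent) -> bool:
        """Return True to process the event, False to drop it."""
        # Lag compares server time with the sensor's own timestamp,
        # so it assumes reasonably synchronized clocks
        lag = time.time() - event.timestamp
        self.metrics.processing_lag.observe(lag)

        if lag > self.max_lag:
            self._consecutive_overloads += 1

            # Strategy 1: Drop non-critical events
            # (is_critical: per-event priority flag, assumed set upstream)
            if not event.is_critical:
                self.metrics.dropped_events.inc()
                return False  # Do not process

            # Strategy 2: If sustained overload, pause consumption
            if self._consecutive_overloads > 100:
                self.metrics.backpressure_pauses.inc()
                await self.pause_consumption(duration=10)
                self._consecutive_overloads = 0
        else:
            self._consecutive_overloads = 0

        return True  # Process normally

    async def pause_consumption(self, duration: float):
        # Stop consumers from reading new messages, then re-enable them
        # after `duration` seconds. The caller is not blocked, so
        # processing keeps draining the backlog in the meantime.
        self.consumption_allowed.clear()
        asyncio.get_running_loop().call_later(
            duration, self.consumption_allowed.set
        )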

We export four key metrics to Prometheus and display them on a Grafana dashboard: events processed per second (throughput), processing lag in seconds (time between event timestamp and processing completion), buffer occupancy percentage (how full the asyncio queue is), and inference latency per batch (model performance). An alert fires if processing lag exceeds 5 seconds sustained for more than 2 minutes, or if buffer occupancy exceeds 80% for more than 1 minute. These thresholds were calibrated empirically: 5 seconds of lag is recoverable within a few minutes, but 30 seconds of lag at our event rate means 450K events backed up, which takes 30+ minutes to drain even at full throughput.
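
The drain-time estimate follows from a simple relation: while new events keep arriving, only the capacity above the incoming rate works down the backlog. The sketch below encodes that relation. The 15,250 events/second capacity is a hypothetical figure chosen to show what headroom a 30-minute drain implies; the actual headroom is not stated here.

```python
def backlog_events(lag_seconds: float, incoming_rate: float) -> float:
    """Events queued up when the pipeline is `lag_seconds` behind."""
    return lag_seconds * incoming_rate


def drain_seconds(backlog: float, capacity: float, incoming_rate: float) -> float:
    """Time to clear a backlog: only capacity above the incoming rate drains it."""
    headroom = capacity - incoming_rate
    if headroom <= 0:
        return float("inf")  # the pipeline never catches up
    return backlog / headroom


backlog = backlog_events(30, 15_000)              # 450,000 events
minutes = drain_seconds(backlog, 15_250, 15_000) / 60
print(f"{backlog:,.0f} events, {minutes:.0f} minutes to drain")
```

With 250 events/second of spare capacity, the 450,000-event backlog takes 1,800 seconds (30 minutes) to clear; with zero headroom it never clears, which is why sustained lag has to trigger an alert early.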

Error Handling and Dead Letter Queues

In a real-time pipeline, error handling cannot be an afterthought. When an event fails processing (malformed data, model inference error, downstream service unavailable), you have three options: drop the event silently (unacceptable for safety-critical sensor data), retry indefinitely (risks blocking the entire pipeline if the error is permanent), or route to a dead-letter queue for later investigation and replay. We implement a three-strike policy with exponential backoff:

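import asyncio
from datetime import datetime, timezone

class ErrorHandler:
    def __init__(self, dead_letter_queue, max_retries: int = 3):
        self.max_retries = max_retries
        # Any object with an async put(); in production, a writer for
        # the Redis Stream described below
        self.dead_letter_queue = dead_letter_queue
        self.retry_counts: dict[str, int] = {}

    def record_success(self, event: SensorEvent):
        """Forget retry state once an event succeeds, so the dict stays bounded."""
        self.retry_counts.pop(f"{event.device_id}:{event.timestamp}", None)

    async def handle_failure(
        self, event: SensorEvent, error: Exception
    ) -> str:
        event_key = f"{event.device_id}:{event.timestamp}"
        count = self.retry_counts.get(event_key, 0) + 1
        self.retry_counts[event_key] = count

        if count <= self.max_retries:
            delay = 2 ** count  # 2s, 4s, 8s
            # This sleep stalls the calling coroutine for the whole delay
            await asyncio.sleep(delay)
            return "retry"
        else:
            await self.dead_letter_queue.put({
                "event": event.to_dict(),  # serialization helper, not shown
                "error": f"{type(error).__name__}: {error}",
                "attempts": count,
                "failed_at": datetime.now(timezone.utc).isoformat()
            })
            del self.retry_counts[event_key]
            return "dead_lettered"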

The dead-letter queue is a separate Redis Stream that a human operator reviews daily. Each failed event includes the full event payload, the error message, the number of retry attempts, and a timestamp. In our production deployment, DLQ entries fall into three categories: malformed data from a misconfigured sensor (40% of DLQ volume, requires the device operations team to fix the sensor configuration), transient downstream failures that resolved themselves within minutes (35%, these events can be safely replayed from the DLQ after the downstream service recovers), and genuine model failures on edge-case inputs that the anomaly detection model was not trained to handle (25%, requires model retraining with the new edge cases added to the training set).

We also maintain a poison pill detector that identifies events causing repeated failures across the pipeline. If the same device_id produces more than 10 DLQ entries in a single hour, the pipeline automatically quarantines that device: its events are diverted from the real-time pipeline to a lower-priority batch processing queue, and an alert is sent to the operations team via PagerDuty. This prevents a single malfunctioning sensor from degrading the entire pipeline through repeated retry cycles that consume processing capacity meant for healthy events. In one incident, a firmware bug on a batch of 12 sensors caused them to emit readings with timestamps in the year 2038 (a Unix timestamp overflow). Without the poison pill detector, these events would have caused repeated failures in our time-series storage layer, creating a backlog that affected all 3,000 healthy sensors. The quarantine triggered within 90 seconds and isolated the problem devices automatically.
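
The quarantine rule (more than 10 DLQ entries from one device_id within an hour) maps naturally onto a per-device sliding window of timestamps. This is a minimal in-memory sketch; the class and method names are hypothetical, and if a device's events can reach more than one process, the counts would need shared storage rather than one process's memory.

```python
from collections import defaultdict, deque


class PoisonPillDetector:
    """Quarantine a device after more than `threshold` DLQ entries within `window` seconds."""

    def __init__(self, threshold: int = 10, window: float = 3600.0):
        self.threshold = threshold
        self.window = window
        self._entries: dict[str, deque] = defaultdict(deque)
        self.quarantined: set[str] = set()

    def record_dlq_entry(self, device_id: str, now: float) -> bool:
        """Record one DLQ entry; return True only when this call quarantines the device."""
        entries = self._entries[device_id]
        entries.append(now)
        # Evict entries that have fallen out of the sliding window
        while entries and entries[0] <= now - self.window:
            entries.popleft()
        if len(entries) > self.threshold and device_id not in self.quarantined:
            self.quarantined.add(device_id)
            return True  # caller diverts the device's events and pages on-call
        return False

    def is_quarantined(self, device_id: str) -> bool:
        return device_id in self.quarantined
```

The True return marks the single transition into quarantine, which is the natural place to reroute the device to the batch queue and send the PagerDuty alert exactly once.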

Results and Takeaways

The final system processes 15,000 events/second sustained on a 3-node cluster (8 vCPUs, 16GB RAM each), with end-to-end latency (sensor event emission to alert delivery) of 400-800ms. The latency breakdown: 50ms network transit (sensor to Kafka), 5ms Kafka consumer delivery, 10ms deserialization, 150ms average micro-batch fill time, 80ms batched inference, 40ms result storage (async PostgreSQL write), and 20ms alert dispatch (async Redis publish to WebSocket server). The remaining variance is queue wait time and GC pauses.
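
Summing the listed stages shows how much of the observed 400-800ms range is left for queue wait time and GC pauses. The sketch treats every stage as sequential, as the breakdown lists them; because the processor runs result storage and alert dispatch concurrently, the alert path itself may be somewhat shorter than this sum. The names are hypothetical.

```python
# Per-stage latencies (ms) from the breakdown above
STAGES_MS = {
    "network transit (sensor to Kafka)": 50,
    "Kafka consumer delivery": 5,
    "deserialization": 10,
    "micro-batch fill (average)": 150,
    "batched inference": 80,
    "result storage (async PostgreSQL write)": 40,
    "alert dispatch (async Redis publish)": 20,
}


def unaccounted_ms(stages: dict[str, int], observed_low: int,
                   observed_high: int) -> tuple[int, int]:
    """Range of observed end-to-end latency not covered by the listed stages."""
    listed = sum(stages.values())
    return observed_low - listed, observed_high - listed


listed_total = sum(STAGES_MS.values())
gap = unaccounted_ms(STAGES_MS, 400, 800)
print(listed_total, gap)  # 355 (45, 445)
```

The listed stages account for 355ms, so queue wait and GC pauses make up the remaining 45-445ms.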

Python is viable for real-time data processing up to roughly 50,000 events/second per node with the optimizations described here. Beyond that throughput, the per-event Python overhead (even with __slots__, object pooling, and micro-batching) becomes the dominant cost and you are fighting the language rather than leveraging it. The key techniques in order of impact: micro-batching for ML inference (30x throughput improvement), async I/O everywhere (eliminates blocking waits), efficient serialization with MessagePack (3x faster than json), process-per-partition parallelism (linear scaling across cores), and memory optimization with __slots__ and pre-allocated buffers (80% reduction in GC pauses). Skip any one of these and you will hit a wall well below our achieved throughput.
