Redis Beyond Caching: Queues, Pub/Sub, and Rate Limiting
Redis is one of the most misunderstood tools in the modern software stack. Most teams discover it as a cache — a faster alternative to hitting the database for frequently accessed data. They install it, configure a TTL, and move on. That is a valid use case, but it is like buying a Swiss Army knife and only ever using the bottle opener.
Redis is an in-memory data structure server. The key word is “data structure,” not “key-value store” and not “cache.” It natively supports strings, lists, sets, sorted sets, hashes, streams, bitmaps, and HyperLogLogs. Each data structure enables design patterns that would otherwise require a dedicated service, a complex library, or significant custom code. And because Redis executes commands one at a time, every individual command (and every Lua script or MULTI/EXEC transaction) is atomic, so these patterns can be shared by many application instances without most of the locking headaches that plague application-level implementations.
We use Redis extensively at Harbor Software, far beyond caching. Here are the patterns that have become essential to our architecture, with production-ready code and the operational lessons we have learned running them at scale.
Pattern 1: Reliable Task Queues with Redis Lists
Redis lists with LPUSH and BRPOP create a simple task queue. A producer pushes tasks onto one end of the list, and one or more consumers block-pop tasks from the other end. The blocking pop is key: BRPOP puts the consumer to sleep until a task arrives, using zero CPU while waiting. This is the foundation of most background job systems, including Celery (when configured with a Redis broker) and Sidekiq.
The naive implementation has a critical reliability problem: if a consumer pops a task and then crashes (process killed, machine dies, deployment rolls out) before processing it, the task is lost forever. Redis solves this with BLMOVE (Redis 6.2+, the replacement for the deprecated BRPOPLPUSH), which atomically moves an item from one list to another. The “processing” list acts as an in-flight ledger: if a consumer crashes, its tasks are still in the processing list and can be recovered.
import json
import time
import uuid
from typing import Optional

import redis


class ReliableQueue:
    def __init__(self, redis_client: redis.Redis, queue_name: str):
        self.r = redis_client
        self.name = queue_name
        self.queue = f"queue:{queue_name}"
        self.processing = f"queue:{queue_name}:processing"
        self.dead_letter = f"queue:{queue_name}:dead"
        # Hash of task ID -> time the task entered the processing list
        self.started = f"queue:{queue_name}:started"

    def enqueue(self, task: dict) -> str:
        """Add a task to the queue. Returns the task ID."""
        # The random suffix keeps IDs unique within the same millisecond
        task_id = f"{self.name}:{int(time.time() * 1000)}:{uuid.uuid4().hex[:8]}"
        payload = json.dumps({
            'id': task_id,
            'task': task,
            'enqueued_at': time.time(),
            'attempts': 0
        })
        self.r.lpush(self.queue, payload)
        return task_id

    def dequeue(self, timeout: int = 30) -> Optional[str]:
        """Block-pop a task, moving it to processing list atomically."""
        # redis-py names the direction arguments src and dest
        result = self.r.blmove(
            self.queue, self.processing,
            timeout=timeout,
            src='RIGHT',
            dest='LEFT'
        )
        if result is not None:
            # Not atomic with the move; recover_stale covers a crash at this point
            self.r.hset(self.started, json.loads(result)['id'], time.time())
        return result  # raw payload (str with decode_responses=True, else bytes)

    def complete(self, task_payload: str):
        """Mark a task as completed by removing from processing list."""
        if self.r.lrem(self.processing, 1, task_payload):
            self.r.hdel(self.started, json.loads(task_payload)['id'])
        # A return of 0 means the task was already recovered or completed elsewhere

    def fail(self, task_payload: str, max_retries: int = 3):
        """Handle a failed task: retry or dead-letter."""
        task_data = json.loads(task_payload)
        task_data['attempts'] += 1
        task_data['last_error_at'] = time.time()
        # MULTI/EXEC pipeline: the push and the removal are applied together
        pipe = self.r.pipeline()
        if task_data['attempts'] >= max_retries:
            task_data['dead_lettered_at'] = time.time()
            pipe.lpush(self.dead_letter, json.dumps(task_data))
        else:
            # Backoff marker only: dequeue does not enforce it, so the
            # consumer must check retry_after before running the task
            task_data['retry_after'] = time.time() + (2 ** task_data['attempts'])
            pipe.lpush(self.queue, json.dumps(task_data))
        pipe.lrem(self.processing, 1, task_payload)
        pipe.hdel(self.started, task_data['id'])
        pipe.execute()

    def recover_stale(self, max_age_seconds: int = 300) -> int:
        """Recover tasks stuck in processing for too long."""
        items = self.r.lrange(self.processing, 0, -1)
        recovered = 0
        now = time.time()
        for item in items:
            data = json.loads(item)
            # Age is measured from dequeue time, not enqueue time, so a task
            # that waited a long time in the queue is not recovered immediately
            started = self.r.hget(self.started, data['id'])
            if started is None:
                # Worker died before recording a start time: start the clock now
                self.r.hset(self.started, data['id'], now)
            elif now - float(started) > max_age_seconds:
                self.fail(item)
                recovered += 1
        return recovered

    def stats(self) -> dict:
        """Get queue statistics."""
        return {
            'pending': self.r.llen(self.queue),
            'processing': self.r.llen(self.processing),
            'dead_letter': self.r.llen(self.dead_letter)
        }
The recover_stale method is the key reliability mechanism. A periodic health check running every 60 seconds scans the processing list for tasks that have been there too long — indicating a crashed worker that will never complete them. Those tasks are either retried (if under the retry limit) or moved to the dead letter queue for manual investigation.
This pattern handles our document processing pipeline at Harbor: incoming documents are enqueued by the web API, worker processes pop and process them, and the health check recovers any orphaned tasks. We process roughly 15,000 documents per day through this queue with zero lost tasks over the past 14 months of operation.
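To show how the queue methods fit together, here is a minimal worker loop sketch. It assumes a queue object exposing the dequeue, complete, and fail methods of the ReliableQueue class above; run_worker, handler, and max_tasks are illustrative names and not part of our production code.

```python
import json
from typing import Any, Callable, Optional


def run_worker(queue: Any, handler: Callable[[dict], None],
               max_tasks: Optional[int] = None, timeout: int = 30) -> int:
    """Pop tasks until max_tasks were handled or a dequeue times out.

    `queue` is assumed to expose dequeue/complete/fail as in ReliableQueue.
    Returns the number of tasks that completed successfully.
    """
    handled = 0
    succeeded = 0
    while max_tasks is None or handled < max_tasks:
        payload = queue.dequeue(timeout=timeout)
        if payload is None:
            break  # nothing arrived in time; a long-running worker would loop again
        handled += 1
        try:
            handler(json.loads(payload)['task'])
        except Exception:
            # Any handler error sends the task back for retry or dead-lettering
            queue.fail(payload)
        else:
            queue.complete(payload)
            succeeded += 1
    return succeeded
```

A real worker would likely also check the retry_after marker before running a retried task and handle shutdown signals; both are left out to keep the sketch short.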
For more advanced needs, Redis Streams (the XADD / XREADGROUP API introduced in Redis 5.0) provide consumer groups with built-in acknowledgment tracking, persistent message storage, and the ability for multiple independent consumer groups to read the same stream. Streams are the right choice when you need multiple consumers processing the same events independently (e.g., one consumer updates the database while another sends notifications), or when you need a durable audit log of all messages that survives consumer restarts.
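As a rough illustration of the consumer-group flow, the sketch below creates a group, reads new entries, and acknowledges each one after the handler succeeds. It assumes a redis-py client (for example redis.Redis(decode_responses=True)) passed in as client; ensure_group, consume_batch, and the stream and group names in the usage note are hypothetical.

```python
from typing import Any, Callable


def ensure_group(client: Any, stream: str, group: str) -> None:
    """Create the consumer group (and the stream) if it does not exist yet."""
    try:
        client.xgroup_create(stream, group, id='0', mkstream=True)
    except Exception as exc:
        # Redis answers BUSYGROUP when the group already exists; re-raise anything else
        if 'BUSYGROUP' not in str(exc):
            raise


def consume_batch(client: Any, stream: str, group: str, consumer: str,
                  handler: Callable[[dict], None], count: int = 10,
                  block_ms: int = 5000) -> int:
    """Read up to `count` new entries for this group, acking each after success."""
    acked = 0
    # '>' asks for entries never delivered to any consumer in this group
    response = client.xreadgroup(group, consumer, {stream: '>'},
                                 count=count, block=block_ms)
    for _stream_name, entries in response or []:
        for entry_id, fields in entries:
            handler(fields)
            # Entries that are never acked stay in the group's pending list
            client.xack(stream, group, entry_id)
            acked += 1
    return acked
```

A producer would append events with client.xadd('events', {'type': 'document_processed'}). Two groups such as 'db-writers' and 'notifiers' each receive every entry independently, which is the fan-out behavior described above.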
Pattern 2: Pub/Sub for Real-Time Event Propagation
Redis Pub/Sub provides fire-and-forget messaging between services. A publisher sends a message to a named channel, and all subscribers currently listening on that channel receive it immediately. Unlike queues, Pub/Sub messages are not persisted and are not replayed to new subscribers — if no subscriber is listening when a message is published, the message is lost forever.
This “lossy” behavior is actually a feature for the right use cases. Pub/Sub is perfect for real-time events where occasional loss is acceptable and where you need broadcast semantics (one message, multiple receivers):
# Publisher: broadcast competitive intelligence events
import json
import logging
import time

import redis

logger = logging.getLogger(__name__)


def publish_ci_event(r: redis.Redis, event_type: str, data: dict):
    event = {
        'type': event_type,
        'data': data,
        'timestamp': time.time(),
        'source': 'ci-collector'
    }
    subscriber_count = r.publish('ci:events', json.dumps(event))
    return subscriber_count  # 0 if nobody is listening


# Subscriber: react to CI events in real-time
def subscribe_ci_events(r: redis.Redis, handlers: dict):
    pubsub = r.pubsub()
    pubsub.subscribe('ci:events')
    for message in pubsub.listen():
        # Skip subscribe/unsubscribe confirmations
        if message['type'] != 'message':
            continue
        event = json.loads(message['data'])
        handler = handlers.get(event['type'])
        if handler:
            try:
                handler(event['data'])
            except Exception as e:
                # One failing handler must not stop the listen loop
                logger.error(f"Handler failed for {event['type']}: {e}")


# Usage: multiple independent subscribers
# (update_dashboard, send_slack_alert, and enqueue_analysis are application
# functions defined elsewhere)
handlers = {
    'price_change': lambda d: update_dashboard(d),
    'feature_change': lambda d: send_slack_alert(d),
    'competitor_news': lambda d: enqueue_analysis(d)
}
We use Pub/Sub for three distinct purposes in our architecture:
- Cache invalidation broadcast. When any service updates shared data, it publishes an invalidation event on a channel. All services caching that data receive the event and clear their local caches immediately. This is more responsive than TTL-based expiration (which can serve stale data for minutes) and more efficient than polling.
- Real-time dashboard updates. Backend services publish events (new document processed, CI change detected, pipeline stage completed). The dashboard backend subscribes and pushes updates to the browser via WebSocket. This gives users a live view of system activity without polling.
- Service coordination signals. When a long-running pipeline stage completes, it publishes a completion signal. Downstream stages subscribed to that channel start immediately without polling for state changes. This reduces latency between pipeline stages from the polling interval (typically 5-30 seconds) to near-zero.
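The cache invalidation case can be sketched as a small handler that a subscriber loop calls for every message. This is an illustration under assumptions: the JSON body with a "keys" list and the 'cache:invalidate' channel name are conventions invented for the example, and apply_invalidation is a hypothetical helper.

```python
import json
from typing import Any, Dict


def apply_invalidation(local_cache: Dict[str, Any], message: dict) -> int:
    """Drop cached entries named in one Pub/Sub message; return how many were removed.

    `message` has the shape redis-py's PubSub delivers: a dict with 'type' and 'data'.
    The JSON body {"keys": [...]} is an assumed convention for this sketch.
    """
    if message.get('type') != 'message':
        return 0  # subscribe/unsubscribe confirmations carry no payload
    keys = json.loads(message['data']).get('keys', [])
    removed = 0
    for key in keys:
        if key in local_cache:
            del local_cache[key]
            removed += 1
    return removed
```

A writer would publish with r.publish('cache:invalidate', json.dumps({'keys': ['user:42']})), and each service would pass every message from pubsub.listen() to apply_invalidation. Because delivery is best effort, a short TTL on the local cache is still a sensible backstop.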
The critical operational caveat: Pub/Sub is lossy by design. If a subscriber disconnects (network blip, deployment restart) and reconnects, it misses every message published during the gap. For data that cannot be lost, use Redis Streams (which persist messages and track consumer positions) or a proper message broker like RabbitMQ. Use Pub/Sub only for supplementary real-time updates that enhance the user experience but are not required for correctness.
Pattern 3: Distributed Rate Limiting
Rate limiting is essential for any system that calls external APIs (most of which enforce their own rate limits), serves public endpoints (protecting against abuse), or processes shared resources (preventing one tenant from monopolizing capacity). Redis provides the atomic operations needed to implement rate limiting correctly across multiple application instances.
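The most versatile algorithm is the sliding window log, implemented with a sorted set in which each request is a member scored by its timestamp. It tracks the exact number of requests in a rolling time window (e.g., 100 requests per 60 seconds) without the boundary-crossing problems of fixed time windows, at the cost of storing one entry per request: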
import time
import uuid

import redis


class SlidingWindowRateLimiter:
    def __init__(self, redis_client: redis.Redis):
        self.r = redis_client

    def is_allowed(
        self, key: str, max_requests: int, window_seconds: int
    ) -> tuple[bool, int]:
        """
        Check if a request is allowed under the rate limit.
        Returns (allowed: bool, remaining: int).
        Uses a sorted set with timestamps as scores.
        """
        now = time.time()
        window_start = now - window_seconds
        pipe_key = f"ratelimit:{key}"
        pipe = self.r.pipeline()
        # Remove expired entries outside the window
        pipe.zremrangebyscore(pipe_key, '-inf', window_start)
        # Count current entries in the window
        pipe.zcard(pipe_key)
        # Optimistically add this request
        member = f"{now}:{uuid.uuid4().hex[:8]}"
        pipe.zadd(pipe_key, {member: now})
        # Auto-expire the key to prevent memory leaks
        pipe.expire(pipe_key, window_seconds + 10)
        results = pipe.execute()
        current_count = results[1]  # zcard result before our add
        if current_count >= max_requests:
            # Over limit: remove our optimistic addition
            self.r.zrem(pipe_key, member)
            remaining = 0
            return False, remaining
        remaining = max(0, max_requests - current_count - 1)
        return True, remaining

    def reset(self, key: str):
        """Reset the rate limit for a key."""
        self.r.delete(f"ratelimit:{key}")
We use this rate limiter for three categories of protection:
- External API call budgets. Most vendor APIs enforce their own limits; some OpenAI tiers, for example, have capped usage at 60 requests per minute. The rate limiter sits in front of every external API client so that we stay under the published limits instead of triggering 429 responses that complicate error handling.
- Per-tenant throttling in multi-tenant systems. One tenant’s heavy usage (a large batch import, an aggressive API integration) should not degrade service for others. Each tenant gets their own rate limit key with limits proportional to their service tier.
- Abuse prevention on public endpoints. Login attempts limited to 5 per minute per IP address. API endpoint calls limited by API key. Password reset requests limited to 3 per hour per email. These limits are the first line of defense against automated attacks.
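The boundary problem of fixed windows mentioned earlier is easiest to see with numbers. The following in-memory simulation (no Redis involved) sends 100 requests just before a minute boundary and 100 just after it, with a limit of 100 per 60 seconds. FixedWindow and SlidingLog are illustrative classes written for this comparison; SlidingLog mirrors what the sorted set does.

```python
from collections import deque
from typing import Deque, Dict


class FixedWindow:
    """Counts requests per aligned window (0-60s, 60-120s, ...)."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counts: Dict[int, int] = {}

    def allow(self, now: float) -> bool:
        bucket = int(now // self.window_seconds)
        if self.counts.get(bucket, 0) >= self.max_requests:
            return False
        self.counts[bucket] = self.counts.get(bucket, 0) + 1
        return True


class SlidingLog:
    """Keeps one timestamp per accepted request, like the sorted set does."""

    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.log: Deque[float] = deque()

    def allow(self, now: float) -> bool:
        # Drop timestamps that have fallen out of the rolling window
        while self.log and self.log[0] <= now - self.window_seconds:
            self.log.popleft()
        if len(self.log) >= self.max_requests:
            return False
        self.log.append(now)
        return True


def accepted_in_burst(limiter, timestamps) -> int:
    """Count how many of the timestamped requests the limiter lets through."""
    return sum(1 for t in timestamps if limiter.allow(t))


# 100 requests just before the minute boundary, 100 just after it
burst = [59.0 + i * 0.005 for i in range(100)] + [60.0 + i * 0.005 for i in range(100)]
fixed_accepted = accepted_in_burst(FixedWindow(100, 60), burst)    # 200
sliding_accepted = accepted_in_burst(SlidingLog(100, 60), burst)   # 100
```

The fixed window admits all 200 requests within about one and a half seconds because the counter resets at the boundary, while the sliding log admits only the first 100.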
Pattern 4: Distributed Locks for Coordination
When multiple workers or services need exclusive access to a shared resource, Redis provides distributed locking via the SET NX EX pattern — set a key only if it does not already exist, with an automatic expiration time that prevents deadlocks if the lock holder crashes:
import time
import uuid

import redis


class DistributedLock:
    def __init__(self, redis_client: redis.Redis, name: str, ttl: int = 30):
        self.r = redis_client
        self.key = f"lock:{name}"
        self.ttl = ttl
        self.token = str(uuid.uuid4())  # Unique token prevents releasing others' locks

    def acquire(self, timeout: int = 10) -> bool:
        """Attempt to acquire the lock within timeout seconds."""
        deadline = time.time() + timeout
        while time.time() < deadline:
            if self.r.set(self.key, self.token, nx=True, ex=self.ttl):
                return True
            time.sleep(0.1)  # Brief sleep between attempts
        return False

    def release(self):
        """Release the lock ONLY if we still own it (atomic check-and-delete)."""
        lua_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        self.r.eval(lua_script, 1, self.key, self.token)

    def extend(self, additional_seconds: int = 30) -> bool:
        """Reset the lock TTL to additional_seconds from now if we still own it."""
        lua_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('expire', KEYS[1], ARGV[2])
        else
            return 0
        end
        """
        result = self.r.eval(lua_script, 1, self.key, self.token, additional_seconds)
        return result == 1

    def __enter__(self):
        if not self.acquire():
            raise TimeoutError(f"Could not acquire lock: {self.key}")
        return self

    def __exit__(self, *args):
        self.release()
The unique token per lock instance is a critical safety feature. Without it, a slow worker might hold a lock past its TTL, the TTL expires, another worker acquires the lock, and then the first worker finishes and calls release — deleting the second worker’s lock. The Lua script ensures a worker can only release a lock it actually holds, preventing this dangerous race condition.
We use distributed locks for preventing duplicate processing of the same document (lock on document ID), serializing access to APIs that only allow one concurrent request per account, and leader election where one worker in a pool needs to run periodic cleanup tasks.
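The leader election case can be sketched as a renewable lease built on the same SET NX EX primitive. This is a minimal illustration, assuming a redis-py client passed in as client; LeaderLease, try_lead, and the 'lock:leader:cleanup' key are hypothetical names.

```python
import uuid
from typing import Any

# Renew the lease only if the stored token is still ours (runs atomically in Redis)
RENEW_SCRIPT = """
if redis.call('get', KEYS[1]) == ARGV[1] then
    return redis.call('expire', KEYS[1], ARGV[2])
else
    return 0
end
"""


class LeaderLease:
    """One worker at a time holds the key; the lease lapses if it stops renewing."""

    def __init__(self, client: Any, key: str = 'lock:leader:cleanup', ttl: int = 30):
        self.client = client
        self.key = key
        self.ttl = ttl
        self.token = uuid.uuid4().hex  # identifies this worker as the holder

    def try_lead(self) -> bool:
        """Return True if this worker is the leader for the next `ttl` seconds."""
        # First try to take a free lease
        if self.client.set(self.key, self.token, nx=True, ex=self.ttl):
            return True
        # Otherwise renew it, which only succeeds if we already hold it
        renewed = self.client.eval(RENEW_SCRIPT, 1, self.key, self.token, self.ttl)
        return renewed == 1
```

Each worker would call try_lead() on a timer shorter than the TTL and run the periodic cleanup only when it returns True. If the current leader dies, the key expires and another worker takes over on its next attempt.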
Pattern 5: Sorted Sets for Ranking and Time-Series
Redis sorted sets maintain members ordered by a floating-point score, with O(log N) insertions and O(log N + M) range queries. This makes them perfect for any ranking, scheduling, or time-series use case:
import json
import time

import redis


# Performance tracking: record and query processing times
def record_processing_time(r: redis.Redis, doc_id: str, duration_ms: float):
    today = time.strftime('%Y-%m-%d')
    key = f"perf:processing:{today}"
    r.zadd(key, {doc_id: duration_ms})
    r.expire(key, 86400 * 7)  # Keep 7 days of data


def get_slowest_documents(r: redis.Redis, date: str, count: int = 10):
    return r.zrevrange(f"perf:processing:{date}", 0, count - 1, withscores=True)


def get_p95_processing_time(r: redis.Redis, date: str) -> float:
    total = r.zcard(f"perf:processing:{date}")
    if total == 0:
        return 0.0
    # Nearest-rank p95: 0-based index ceil(0.95 * total) - 1, in integer arithmetic
    index = (total * 95 + 99) // 100 - 1
    result = r.zrange(f"perf:processing:{date}", index, index, withscores=True)
    return result[0][1] if result else 0.0


# Delayed job scheduling: sorted set as a priority queue by execution time
# (identical job_data serializes to the same member, so include a unique ID
# in job_data if duplicates must be scheduled separately)
def schedule_job(r: redis.Redis, job_data: dict, execute_at: float):
    r.zadd('scheduled:jobs', {json.dumps(job_data): execute_at})


def get_due_jobs(r: redis.Redis) -> list[dict]:
    now = time.time()
    due = r.zrangebyscore('scheduled:jobs', '-inf', now)
    jobs = []
    for item in due:
        # ZREM returns 1 only for the caller that removed the member, so two
        # pollers racing on the same job cannot both claim it
        if r.zrem('scheduled:jobs', item) == 1:
            jobs.append(json.loads(item))
    return jobs
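The percentile lookup above depends on turning a percentile into a position in the sorted set. One common convention is the nearest-rank method, where the p-th percentile of N ascending values is the value at 1-based rank ceil(p/100 * N). The helper below is a small sketch of that arithmetic; nearest_rank_index is an illustrative name.

```python
def nearest_rank_index(total: int, percentile: int) -> int:
    """0-based index of the nearest-rank percentile among `total` ascending values."""
    if total <= 0:
        raise ValueError("total must be positive")
    if not 0 < percentile <= 100:
        raise ValueError("percentile must be between 1 and 100")
    # Integer ceiling division avoids floating-point rounding surprises
    rank = (total * percentile + 99) // 100
    return rank - 1
```

With 100 recorded durations, the p95 is the 95th smallest value (index 94); with 20 durations it is the 19th smallest (index 18). The resulting index can be passed to ZRANGE as both the start and the stop position.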
Operational Considerations for Production Redis
Using Redis beyond caching means Redis becomes a critical infrastructure dependency. Data loss is not just a performance degradation (as with cache loss) — it means lost tasks, dropped locks, and broken rate limits. A few hard-won operational lessons:
Persistence configuration is critical. For queues and locks, enable AOF (Append Only File) persistence with appendfsync everysec. This bounds the loss on a crash to roughly one second of writes. RDB snapshots alone are not sufficient for queue data because snapshots are typically taken minutes apart (depending on the save settings), and losing several minutes of enqueued tasks is unacceptable.
Memory management is non-negotiable. Set maxmemory and choose an appropriate maxmemory-policy. For queue and lock workloads, use noeviction — you want writes to fail loudly with an error rather than silently evict queue data to make room. Monitor memory usage continuously and alert at 70-80% capacity to allow time for investigation before hitting the limit.
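The persistence and memory settings above can be checked from application code at startup. The sketch below is one way to do that, assuming a redis-py client passed in as client and that the deployment permits CONFIG GET (some managed services disable it); audit_durability and memory_usage_ratio are hypothetical helper names.

```python
from typing import Any, List, Optional


def audit_durability(client: Any) -> List[str]:
    """Return warnings for settings that are risky when Redis holds queues or locks."""

    def setting(name: str) -> str:
        value = client.config_get(name).get(name, '')
        return value.decode() if isinstance(value, bytes) else str(value)

    warnings = []
    if setting('appendonly') != 'yes':
        warnings.append('AOF is disabled: a crash can lose everything since the last snapshot')
    elif setting('appendfsync') == 'no':
        warnings.append('appendfsync is "no": fsync timing is left to the operating system')
    if setting('maxmemory') in ('', '0'):
        warnings.append('maxmemory is not set: memory use is unbounded')
    if setting('maxmemory-policy') != 'noeviction':
        warnings.append('maxmemory-policy may evict queue or lock keys')
    return warnings


def memory_usage_ratio(client: Any) -> Optional[float]:
    """used_memory / maxmemory from INFO memory, or None when no limit is configured."""
    info = client.info('memory')
    limit = int(info.get('maxmemory', 0))
    if limit <= 0:
        return None
    return int(info['used_memory']) / limit
```

A health check could log the warnings from audit_durability once at startup and raise an alert when memory_usage_ratio crosses a threshold in the 0.7 to 0.8 range.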
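High availability with Sentinel or Cluster. A single Redis instance is a single point of failure. Redis Sentinel provides automatic failover with a primary-replica topology. Redis Cluster provides both high availability and horizontal scaling through sharding. For most workloads under 50GB of data, Sentinel with one or two replicas is sufficient and dramatically simpler to operate than Cluster. Keep in mind that Redis replication is asynchronous, so a failover can lose writes that the old primary had already acknowledged; queue consumers and lock holders should tolerate an occasional duplicate or missing entry after a failover.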
Key naming conventions. Use a consistent hierarchical naming scheme: service:resource:identifier. Examples: queue:documents:processing, ratelimit:api:openai:user123, lock:document:doc-456. This makes monitoring, debugging, and key-space scanning dramatically easier. Run redis-cli --bigkeys periodically to identify keys consuming disproportionate memory.
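A small helper can enforce the naming scheme in code, and SCAN can summarize the key space by prefix without blocking the server the way KEYS does. This is a sketch under assumptions: make_key and count_keys_by_prefix are hypothetical helpers, and client is a redis-py client.

```python
from typing import Any, Dict


def make_key(*parts: str) -> str:
    """Join parts with colons, rejecting parts that would break the naming scheme."""
    for part in parts:
        if not part or ':' in part or ' ' in part:
            raise ValueError(f"invalid key part: {part!r}")
    return ':'.join(parts)


def count_keys_by_prefix(client: Any, pattern: str = '*') -> Dict[str, int]:
    """Count keys per top-level prefix by iterating with SCAN."""
    counts: Dict[str, int] = {}
    for key in client.scan_iter(match=pattern, count=1000):
        # Keys arrive as bytes unless the client was created with decode_responses=True
        name = key.decode() if isinstance(key, bytes) else key
        prefix = name.split(':', 1)[0]
        counts[prefix] = counts.get(prefix, 0) + 1
    return counts
```

For example, make_key('lock', 'document', 'doc-456') yields lock:document:doc-456, and count_keys_by_prefix gives a quick view of how many keys each pattern (queue, ratelimit, lock, and so on) contributes.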
Conclusion
Redis is a general-purpose data structure server that happens to be excellent at caching. The patterns described here are reliable task queues, Pub/Sub event propagation, distributed rate limiting, coordination locks, and sorted set rankings. Together they replace what would otherwise require four separate infrastructure components (a message broker, a rate limiting service, a coordination service, and a time-series store) with a single, well-understood tool that your team already knows how to operate.
The operational simplicity of consolidating these patterns into Redis is the real strategic advantage. Instead of operating Redis plus RabbitMQ plus a custom rate limiter plus a coordination service, you operate Redis. One monitoring dashboard, one alerting configuration, one backup strategy, one set of runbooks, one paging rotation. For engineering teams under 20 people, this simplification is worth more than the theoretical advantages of specialized tools that each solve one problem optimally but collectively create an operational burden that exceeds the team’s capacity to manage well.
Start with the pattern you need most urgently — for most teams, that is either reliable queues (for background job processing) or rate limiting (for external API management). Get comfortable operating Redis in that capacity, add monitoring and alerting, establish your backup and recovery procedures. Then layer on additional patterns as needs arise. Each new pattern is just a new key prefix and a few hundred lines of application code, not a new infrastructure component to provision, monitor, and maintain. That compounding simplicity is Redis’s real superpower.