ORM Middleware for Automatic Query Routing
The SLA risk: if 95% of your application's queries are reads, then without explicit routing every one of them still lands on the primary. ORM-level middleware solves this by intercepting every query before pool dispatch, classifying it as a read or write, and forwarding it to the appropriate connection pool. Done correctly, this sits inside the broader connection routing and pooling strategy for your service and gives you per-request consistency controls that a network proxy cannot.
Unlike read/write splitting at the proxy layer, ORM middleware runs in-process, so it can read application context — the current transaction state, a causal-read LSN from a preceding write, or an explicit routing hint set by your business logic — and route accordingly. The trade-off is CPU overhead per query and the need to maintain thread-safe context propagation across every async boundary.
Problem Framing
When replication lag spikes or a replica fails, uncontrolled automatic routing silently routes reads to the primary — or worse, to a stale replica — without the application knowing. The three SLA risks ORM middleware must prevent are: misclassification (a write or SELECT ... FOR UPDATE sent to a read-only replica), stale reads (a read dispatched to a replica before it has applied the preceding write’s WAL), and connection storm (all replicas fail and thousands of requests flood the primary with no backpressure).
This page covers the in-process middleware layer. For a centralized alternative that trades per-request context awareness for operational simplicity, see read/write splitting at the proxy layer.
Concept Definition and Scope
ORM middleware for automatic query routing is an in-process interceptor that sits between your application’s ORM session factory (or connection acquisition path) and the connection pool. It performs three functions in sequence:
- Query classification — is this statement a read or a write?
- Pool selection — given the classification, current transaction state, and any explicit routing hints, which pool should serve this query?
- Context propagation — ensure that routing state (LSN watermarks, sticky hints, circuit-breaker status) is visible to all queries in the same request without leaking across request boundaries.
This is distinct from proxy-layer splitting, which operates at the network layer without access to application context, and from connection pool architecture, which deals with pool sizing and lifecycle rather than routing logic.
Mechanism Deep-Dive
Query Classification
Classification happens before pool acquisition. The three engine options are:
Regex classifier — matches the first non-whitespace token of the statement against a priority-ordered rule set. Latency overhead is sub-microsecond but false positives occur for SELECT ... FOR UPDATE, CTEs with write-side effects (WITH ins AS (INSERT ...)), and stored-procedure calls whose bodies the ORM cannot inspect.
AST parser — parses the full SQL statement and walks the syntax tree to detect write nodes, lock hints, and procedure calls. Eliminates the false-positive categories above at the cost of 50–200 µs per parse (cacheable by normalized statement fingerprint).
Hybrid — applies regex first; if the statement matches a known complex pattern (contains FOR UPDATE, PROCEDURE, INTO, or a known procedure prefix), escalates to AST.
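The hybrid flow described above can be sketched in a few lines of Python. This is a minimal illustration, not a reference implementation: deep_classify is a hypothetical stand-in for a real AST-based classifier, the pattern list simply mirrors the escalation idea, and the cache is keyed on the raw statement text rather than a normalized fingerprint.

```python
import re
from functools import lru_cache

# Patterns that trigger escalation (assumed list, modelled on the triggers above).
ESCALATION = [re.compile(p, re.IGNORECASE | re.DOTALL) for p in (
    r"FOR\s+(UPDATE|SHARE)",
    r"\bCALL\s+",
    r"\bINTO\b",
    r"WITH\s+\w+\s+AS\s+\(\s*(INSERT|UPDATE|DELETE)",
)]
READ_PREFIX = re.compile(r"^\s*SELECT\b", re.IGNORECASE)


def deep_classify(sql: str) -> str:
    """Hypothetical placeholder for an AST-based classifier.

    A real implementation would parse the statement. This stub treats every
    escalated statement as a write, which is safe (the primary can serve
    reads) but pessimistic.
    """
    return "WRITE"


@lru_cache(maxsize=4096)
def classify(sql: str) -> str:
    """Return "READ" or "WRITE" for a SQL statement."""
    # Cheap regex pass first; escalate only when a risky pattern appears.
    if any(p.search(sql) for p in ESCALATION):
        return deep_classify(sql)
    if READ_PREFIX.match(sql):
        return "READ"
    # Anything that is not provably a plain SELECT goes to the primary.
    return "WRITE"
```

The default branch returns "WRITE" on purpose: sending an unrecognized statement to the primary costs capacity, while sending a write to a replica costs correctness.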
[query_classifier]
engine = "hybrid"
# AST escalation triggers — add statement patterns that regex misclassifies
ast_escalation_patterns = [
"FOR\\s+UPDATE",
"FOR\\s+SHARE",
"CALL\\s+",
"\\bINTO\\b",
"WITH\\s+\\w+\\s+AS\\s+\\(\\s*(INSERT|UPDATE|DELETE)"
]
[query_classifier.regex_rules]
# Evaluated in priority order; first match wins
route_replica = ["^\\s*SELECT\\b(?!.*FOR\\s+(UPDATE|SHARE))"]
# A TOML table cannot define the same key twice, so both primary rules share one array
route_primary = [
  "^\\s*(INSERT|UPDATE|DELETE|MERGE|TRUNCATE|CALL|EXEC)",
  "^\\s*WITH\\b"  # CTEs default to primary; AST refines
]
Transaction Boundary Detection
Any query inside an explicit BEGIN / START TRANSACTION block must be pinned to the primary, regardless of its SQL verb. The interceptor checks transaction state from the ORM session before applying the classifier result:
def select_pool(query: str, session_ctx: SessionContext, pools: PoolRegistry) -> Pool:
    # Explicit hint always wins
    if session_ctx.force_primary:
        return pools.primary
    # Open transaction: must stay on primary for write serialization
    if session_ctx.in_transaction:
        return pools.primary
    # Classify before any replica logic so a write can never reach a replica
    if classify(query) != "READ":
        return pools.primary
    # Causal-read guard: wait for a replica to catch up to the last-write LSN
    if session_ctx.last_write_lsn:
        replica = pools.replica_at_or_after(
            session_ctx.last_write_lsn,
            max_wait_ms=session_ctx.causal_read_timeout_ms,
        )
        return replica or pools.primary  # fallback if no replica caught up in time
    # Plain read with no causal constraint
    return pools.least_lag_replica()
Routing Context Propagation
Thread-local or async-local storage scopes routing state to the current request without a global mutex. In Python async code, use contextvars.ContextVar; in Go, pass a context.Context; in Java, use ThreadLocal paired with a servlet filter or Spring HandlerInterceptor that clears state in a finally block.
The pipeline order is strict: Context Propagation → Query Classification → Pool Selection → Execution → Telemetry. Routing must be fully resolved before execution starts: acquiring a routing mutex inside a database transaction can deadlock when the transaction holds a row lock while the routing layer waits on topology data.
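As a minimal sketch of request-scoped routing state in Python async code, the example below sets a ContextVar at the start of a request and resets it in a finally block. The names route_hint and handle_request are assumptions made for illustration; a real middleware would wrap the framework's request handler instead.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical request-scoped hint: "auto" lets the classifier decide.
route_hint: ContextVar[str] = ContextVar("route_hint", default="auto")


async def handle_request(name: str, force_primary: bool, seen: dict) -> None:
    """Stand-in for a request handler wrapped by routing middleware."""
    token = route_hint.set("primary" if force_primary else "auto")
    try:
        await asyncio.sleep(0)         # yield so the other request interleaves
        seen[name] = route_hint.get()  # what a query in this request would see
    finally:
        route_hint.reset(token)        # never leak state past the request


async def main() -> dict:
    seen: dict = {}
    # gather() runs each coroutine as its own task with its own context copy.
    await asyncio.gather(
        handle_request("a", True, seen),
        handle_request("b", False, seen),
    )
    return seen


result = asyncio.run(main())
```

Each request observes only its own hint even though both run interleaved on one thread, and no mutex is involved.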
Trade-Off Comparison Table
| Approach | Routing granularity | Per-query overhead | Consistency controls | Failover control |
|---|---|---|---|---|
| ORM middleware (in-process) | Full request context | 20–250 µs (classifier + pool lookup) | LSN tracking, explicit hints, txn pinning | Application-controlled; can shed load before hitting DB |
| Proxy-layer splitting | Statement-level | ~5 µs (network hop absorbed) | None — proxy cannot see application state | Centralized; requires proxy config change to adjust |
| Dual-URL (app chooses string) | Explicit per call-site | Zero — no interceptor | Application must enforce manually | Per-call-site — no global lever |
| Driver-level (e.g. MySQL ReplicationDriver) | Statement-level | Sub-millisecond | Limited: read-only flag per connection | Driver restarts required for topology changes |
ORM middleware wins on consistency control at the cost of in-process CPU and the need to maintain context propagation code across every async boundary in your stack.
Configuration Runbook
Dual-Pool Bootstrap (generic / framework-agnostic)
# orm_routing.yaml — canonical bootstrap config
orm_routing:
  discovery:
    endpoint: "http://topology-service.internal:8080/api/v1/replicas"
    refresh_interval_ms: 15000
    fallback_cache_ttl_ms: 300000 # serve stale topology for up to 5 min on network partition
  defaults:
    classifier: "hybrid" # "regex" | "ast_parser" | "hybrid"
    max_replication_lag_ms: 500 # replicas lagging beyond this are excluded
    silent_fallback_to_primary: false # NEVER silently reroute; log + alert instead
    connection_timeout_ms: 2000
    causal_read_timeout_ms: 800 # max wait for replica to catch up to write LSN
  circuit_breaker:
    lag_breach_threshold: 3 # consecutive breaches before OPEN
    error_rate_threshold: 0.10 # 10% pool errors trigger OPEN
    cooldown_ms: 5000 # time in OPEN before attempting HALF_OPEN
  connection_pools:
    primary:
      min_idle: 5
      max_active: 50
      connection_timeout_ms: 1000
      tcp_keepalive_idle_sec: 60
      max_lifetime_sec: 1800 # hard recycle to avoid stale post-failover connections
    replicas:
      min_idle: 10
      max_active: 150
      connection_timeout_ms: 500
      validation_interval_ms: 10000 # validate on checkout after this idle period
      max_lifetime_sec: 1800
Django Multi-DB Router
Django’s database router hooks let you implement read/write splitting without modifying any model code. Register the router in settings.py:
# settings.py
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "HOST": "primary.db.internal",
        # ... connection params
    },
    "replica": {
        "ENGINE": "django.db.backends.postgresql",
        "HOST": "replica.db.internal",
        "OPTIONS": {"options": "-c default_transaction_read_only=on"},
        # ... connection params
    },
}
DATABASE_ROUTERS = ["myapp.routers.ReplicaRouter"]
# myapp/routers.py
import threading

_force_primary = threading.local()


class ReplicaRouter:
    """Route reads to replica unless a force-primary context is active."""

    def db_for_read(self, model, **hints):
        if getattr(_force_primary, "active", False):
            return "default"
        # Exclude models that require post-write read-your-writes
        if getattr(model, "_always_primary", False):
            return "default"
        return "replica"

    def db_for_write(self, model, **hints):
        return "default"

    def allow_relation(self, obj1, obj2, **hints):
        return True

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        return db == "default"


class force_primary_reads:
    """Context manager: pin all reads to primary within this block."""

    def __enter__(self):
        # Remember the outer state so nested blocks restore it correctly
        self._previous = getattr(_force_primary, "active", False)
        _force_primary.active = True
        return self

    def __exit__(self, *_):
        _force_primary.active = self._previous
Usage in views where read-your-writes consistency is required immediately after a write:
from myapp.routers import force_primary_reads


def checkout(request):
    order = Order.objects.create(user=request.user, ...)
    with force_primary_reads():
        # This SELECT goes to primary; the replica may not have applied the INSERT yet
        confirmation = Order.objects.get(pk=order.pk)
    return render(request, "confirmation.html", {"order": confirmation})
SQLAlchemy Routing Session
# sqlalchemy_router.py
from contextvars import ContextVar

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

_route_hint: ContextVar[str] = ContextVar("route_hint", default="auto")

primary_engine = create_engine(
    "postgresql+psycopg2://primary.db.internal/app", pool_size=10, max_overflow=40
)
replica_engine = create_engine(
    "postgresql+psycopg2://replica.db.internal/app", pool_size=20, max_overflow=60,
    execution_options={"postgresql_readonly": True},
)


class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kwargs):
        hint = _route_hint.get()
        if hint == "primary":
            return primary_engine
        # Write statements or open transactions always go primary.
        # Note: with autobegin, a session counts as "in transaction" from its first
        # statement until commit/rollback, so later reads in it also use the primary.
        if self.in_transaction() or getattr(clause, "is_dml", False):
            return primary_engine
        return replica_engine


RoutingSessionFactory = sessionmaker(class_=RoutingSession)
Spring Data JPA: AbstractRoutingDataSource
// RoutingDataSource.java
public class RoutingDataSource extends AbstractRoutingDataSource {
    @Override
    protected Object determineCurrentLookupKey() {
        return RoutingContext.isReadOnly() ? "replica" : "primary";
    }
}

// RoutingContext.java (ThreadLocal-backed)
public final class RoutingContext {
    private static final ThreadLocal<Boolean> READ_ONLY = ThreadLocal.withInitial(() -> false);

    public static void setReadOnly(boolean readOnly) { READ_ONLY.set(readOnly); }
    public static boolean isReadOnly() { return READ_ONLY.get(); }
    public static void clear() { READ_ONLY.remove(); } // call in filter finally-block
}

// ReadOnlyInterceptor.java: marks @Transactional(readOnly=true) methods
@Aspect
@Component
@Order(0) // must run before the transaction advice, which is what acquires the connection
public class ReadOnlyInterceptor {
    @Around("@annotation(transactional)")
    public Object route(ProceedingJoinPoint pjp, Transactional transactional) throws Throwable {
        try {
            RoutingContext.setReadOnly(transactional.readOnly());
            return pjp.proceed();
        } finally {
            RoutingContext.clear();
        }
    }
}
Monitoring and Alerting Signals
Routing decisions are opaque without structured telemetry. Emit a structured log entry for every routing decision; sample at 10% under normal load, 100% when the circuit breaker is not in CLOSED state.
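The sampling rule above can be expressed as a small helper. This is a sketch under stated assumptions: should_log and routing_log_entry are hypothetical names, and the field names in the log entry are illustrative rather than a fixed schema.

```python
import json
import random


def should_log(breaker_state: str, sample_rate: float = 0.10, rng=random.random) -> bool:
    """Log every decision while the breaker is not CLOSED, else sample."""
    if breaker_state != "CLOSED":
        return True
    return rng() < sample_rate


def routing_log_entry(destination: str, reason: str,
                      breaker_state: str, replica_lag_ms: float) -> str:
    """Build one structured (JSON) log line for a routing decision."""
    return json.dumps({
        "event": "orm_routing_decision",
        "destination": destination,      # "primary" or "replica"
        "reason": reason,                # e.g. "write", "transaction", "hint"
        "breaker_state": breaker_state,  # "CLOSED", "OPEN" or "HALF_OPEN"
        "replica_lag_ms": replica_lag_ms,
    }, sort_keys=True)
```

Passing rng as a parameter keeps the sampling decision deterministic in tests.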
Key metrics to expose:
- orm_routing_decisions_total{destination="primary|replica", reason="write|transaction|hint|lag_fallback|circuit_open"}: counts by routing outcome
- orm_routing_replica_lag_ms{replica_host}: current lag per replica at decision time
- orm_routing_causal_wait_ms: histogram of how long reads blocked waiting for a replica to catch up to a write LSN
- orm_pool_checkout_ms{pool="primary|replica"}: connection acquisition latency; p99 above 50 ms usually signals exhaustion
- orm_circuit_breaker_state{pool="replica"}: 0=CLOSED, 1=OPEN, 2=HALF_OPEN
Prometheus alert rules:
groups:
  - name: orm_routing
    rules:
      - alert: ReplicaRoutingCircuitOpen
        expr: orm_circuit_breaker_state{pool="replica"} == 1
        for: 60s
        labels:
          severity: warning
        annotations:
          summary: "ORM replica circuit breaker is OPEN: all reads routing to primary"
      - alert: StaleReadRisk
        # The lag metric is a per-replica gauge, so take the quantile over time
        expr: quantile_over_time(0.95, orm_routing_replica_lag_ms[5m]) > 500
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "P95 replica lag at routing time exceeds 500 ms: stale read risk"
      - alert: CausalReadTimeoutSpike
        expr: histogram_quantile(0.99, sum by (le) (rate(orm_routing_causal_wait_ms_bucket[5m]))) > 800
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "Causal reads blocking for >800 ms: replica catch-up latency unacceptable"
      - alert: ReplicaPoolNearExhaustion
        expr: orm_pool_checkout_ms{pool="replica",quantile="0.99"} > 50
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Replica connection pool p99 checkout >50 ms: nearing exhaustion"
Database-side correlation queries (PostgreSQL):
-- Replication lag per replica, run on the primary.
-- pg_last_xact_replay_timestamp() returns NULL on a primary, so use the
-- replay_lag column of pg_stat_replication (PostgreSQL 10+) instead.
SELECT
  client_addr,
  state,
  EXTRACT(EPOCH FROM replay_lag) AS replay_lag_sec,
  pg_wal_lsn_diff(sent_lsn, replay_lsn) AS lsn_behind_bytes
FROM pg_stat_replication
ORDER BY replay_lag_sec DESC NULLS LAST;

-- Active connections broken down by application and state
SELECT
  datname,
  usename,
  application_name,
  state,
  COUNT(*) AS connections
FROM pg_stat_activity
WHERE datname = current_database()
GROUP BY 1, 2, 3, 4
ORDER BY connections DESC;
Failure Modes and Recovery Steps
1. Misrouted write to a read-only replica
Root cause: The classifier matched a CTE containing an INSERT as a read (WITH data AS (INSERT INTO ...) SELECT ...), or a stored procedure call was regex-classified as a SELECT.
Symptom: ERROR: cannot execute INSERT in a read-only transaction (PostgreSQL) or ERROR 1290: The MySQL server is running with the --read-only option (MySQL).
Recovery:
- Add the statement pattern to ast_escalation_patterns so the hybrid classifier escalates to AST.
- For stored procedures, maintain an explicit deny-list: route_primary_procedures = ["proc_name_1", "proc_name_2"].
- As an immediate mitigation, set silent_fallback_to_primary: true temporarily, but alert on every fallback so you can identify and fix the misclassifying rules.
[query_classifier]
# Immediate patch: explicit procedure deny-list
route_primary_procedures = ["update_inventory", "settle_payment"]
ast_escalation_patterns = [
  "WITH\\s+\\w+\\s+AS\\s+\\(\\s*(INSERT|UPDATE|DELETE)"
]
2. Stale read after write (read-your-writes violation)
Root cause: A write completed on the primary, the ORM immediately issued a SELECT, and replication lag caused the replica to serve a snapshot that didn’t yet include the write.
Symptom: Application shows outdated data immediately after a mutation; often surfaces as “order placed but not visible in order list” bugs.
Recovery:
- Capture the primary’s LSN immediately after each write: SELECT pg_current_wal_lsn() (PostgreSQL) or SELECT @@gtid_executed (MySQL GTID).
- Store it in session_ctx.last_write_lsn.
- Before routing the next read, check SELECT pg_last_wal_replay_lsn() >= $1 on each candidate replica; route only to those that return true, or fall back to the primary once causal_read_timeout_ms elapses.
- For critical flows, use the force_primary_reads context manager (see Django example above) rather than the LSN polling path.
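When the comparison is done in application code rather than in SQL, LSNs must be compared numerically, not as strings ("0/A" sorts after "0/10" as text but is the smaller position). The sketch below assumes PostgreSQL's textual LSN format of two hexadecimal halves separated by a slash; parse_lsn and pick_caught_up_replica are hypothetical helper names, and the replay positions are assumed to have been fetched from each replica already.

```python
from typing import Dict, Optional


def parse_lsn(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '16/B374D848' to a 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def pick_caught_up_replica(last_write_lsn: str,
                           replay_lsns: Dict[str, str]) -> Optional[str]:
    """Return the most advanced replica at or past the write LSN, else None.

    replay_lsns maps replica host -> its pg_last_wal_replay_lsn() value.
    A None result means the caller should wait or fall back to the primary.
    """
    target = parse_lsn(last_write_lsn)
    caught_up = {
        host: parse_lsn(lsn)
        for host, lsn in replay_lsns.items()
        if parse_lsn(lsn) >= target
    }
    if not caught_up:
        return None
    return max(caught_up, key=caught_up.get)
```

Polling each replica on every read adds a round trip, which is one reason the force-primary path is simpler for critical flows.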
3. Connection storm when all replicas fail
Root cause: All replicas become unhealthy simultaneously (failover, network partition). Without a circuit breaker, every request attempts to acquire a replica connection, times out, and retries — amplifying primary load.
Symptom: Primary connection pool exhaustion; FATAL: remaining connection slots are reserved (PostgreSQL); request queue depth spikes.
Recovery:
- The circuit breaker must trip after lag_breach_threshold consecutive failures and enter OPEN state, routing all reads to primary immediately.
- Implement request shedding at the backpressure level: if primary pool utilization exceeds 90%, begin rejecting non-critical read requests with HTTP 503 rather than queuing them.
- After cooldown_ms, transition to HALF_OPEN and probe one replica; if healthy, return to CLOSED.
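The state transitions in these steps can be sketched as a small class. This is a simplified illustration, not a production breaker: ReplicaCircuitBreaker is a hypothetical name, only consecutive breaches are tracked (the error-rate threshold is omitted), and HALF_OPEN does not limit traffic to a single probe.

```python
import time


class ReplicaCircuitBreaker:
    """CLOSED -> OPEN after N consecutive breaches; OPEN -> HALF_OPEN after cooldown."""

    def __init__(self, lag_breach_threshold: int = 3, cooldown_ms: int = 5000,
                 clock=time.monotonic):
        self.threshold = lag_breach_threshold
        self.cooldown_s = cooldown_ms / 1000.0
        self.clock = clock          # injectable so tests can control time
        self.state = "CLOSED"
        self.breaches = 0
        self.opened_at = 0.0

    def allow_replica(self) -> bool:
        """Return True if a read may be sent to the replica pool."""
        if self.state == "OPEN" and self.clock() - self.opened_at >= self.cooldown_s:
            self.state = "HALF_OPEN"  # cooldown elapsed: allow a probe
        return self.state != "OPEN"

    def record(self, healthy: bool) -> None:
        """Record the outcome of a replica health check or query."""
        if healthy:
            self.state = "CLOSED"
            self.breaches = 0
            return
        self.breaches += 1
        # A failed probe reopens immediately; otherwise wait for the threshold.
        if self.state == "HALF_OPEN" or self.breaches >= self.threshold:
            self.state = "OPEN"
            self.opened_at = self.clock()
```

While allow_replica() returns False, the caller routes reads to the primary and applies the backpressure rule before queuing more work.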
circuit_breaker:
  lag_breach_threshold: 3
  error_rate_threshold: 0.10
  cooldown_ms: 5000
backpressure:
  primary_pool_reject_threshold: 0.90
  reject_response: "HTTP 503 / DB_OVERLOADED"
4. Routing context leak across async boundaries
Root cause: Thread-local routing state (e.g., force_primary=True) set in one coroutine or thread is read by a different coroutine that inherited the context object.
Symptom: Reads that should go to the replica unexpectedly hit the primary (silent correctness issue, not an error), discovered only by examining orm_routing_decisions_total label counts.
Recovery:
- Use language-native scoped context primitives: contextvars.ContextVar in Python async, context.Context with explicit WithValue in Go, ThreadLocal cleared in a servlet filter finally block in Java.
- Add an assertion in your test suite: after an async boundary (e.g., await, goroutine spawn), verify that _route_hint.get() returns the default.
- Audit every background-task entry point and message-consumer handler to confirm routing context is initialized from headers, not inherited from the producer.
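The inheritance problem and the test assertion can both be shown with a short asyncio example. It is a sketch with hypothetical names (background_job, request_handler); the point is that asyncio.create_task copies the spawning context, so a background task starts with the request's hint unless its entry point sets its own.

```python
import asyncio
from contextvars import ContextVar

_route_hint: ContextVar[str] = ContextVar("route_hint", default="auto")


async def background_job(initialise: bool) -> str:
    """Hypothetical background task spawned from a request handler."""
    if initialise:
        # The entry point sets its own routing state instead of trusting
        # whatever the spawning request left in the copied context.
        _route_hint.set("auto")
    return _route_hint.get()


async def request_handler() -> tuple:
    _route_hint.set("primary")  # the request pins its own reads to the primary
    # create_task copies the current context, so the child starts with "primary".
    leaked = await asyncio.create_task(background_job(initialise=False))
    clean = await asyncio.create_task(background_job(initialise=True))
    # The child's set() happened in its own copy and does not affect the parent.
    return leaked, clean, _route_hint.get()


leaked, clean, parent = asyncio.run(request_handler())
```

A test suite can assert on values like leaked and clean to catch entry points that forget to initialize routing state.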
5. Stale connections post-replica failover
Root cause: After a replica restarts or is replaced, the connection pool holds open TCP connections to the old address. Queries sent on these connections fail with ERROR: connection refused or SSL connection has been closed unexpectedly.
Symptom: Burst of connection errors immediately after a replica failover event; errors clear after max_lifetime_sec as pool recycles connections naturally.
Recovery:
- Trigger an immediate pool flush on topology change events from your discovery endpoint.
- Set max_lifetime_sec: 1800 and validation_interval_ms: 10000; connections idle longer than the validation interval are tested before use.
- Wrap pool acquisition in a retry with jitter (max 2 retries) before escalating to the primary.
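The retry step can be sketched as follows. The function and exception names (acquire_with_retry, PoolUnavailable) and the 50 ms base delay are assumptions for illustration; the two acquire callables stand in for whatever your pool library exposes.

```python
import random
import time


class PoolUnavailable(Exception):
    """Raised by the (hypothetical) replica pool when checkout fails."""


def acquire_with_retry(acquire_replica, acquire_primary, max_retries: int = 2,
                       base_delay_s: float = 0.05,
                       sleep=time.sleep, rng=random.random):
    """Try the replica pool, retrying with jitter, then escalate to the primary."""
    for attempt in range(max_retries + 1):
        try:
            return acquire_replica()
        except PoolUnavailable:
            if attempt == max_retries:
                break
            # Full jitter: sleep a random fraction of an exponentially growing
            # cap so that concurrent requests do not retry in lockstep.
            sleep(rng() * base_delay_s * (2 ** attempt))
    return acquire_primary()
```

With max_retries=2 the replica pool is tried three times in total before the primary is used, which keeps the added latency bounded.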
Related Pages
- How to implement read/write splitting in Spring Data JPA: step-by-step AbstractRoutingDataSource setup with @Transactional(readOnly=true) wiring and connection-pool validation.
FAQ
When should I use ORM-level routing vs a proxy like ProxySQL?
ORM-level routing is the right choice when you need per-request routing hints, transaction-scoped consistency overrides, or lag-aware selection that depends on application context. Proxies centralise routing with lower per-query CPU overhead but cannot see request-level signals like a causal-read LSN or a user-session sticky hint. High-throughput systems often combine both: a proxy for baseline read/write splitting, and ORM middleware for consistency overrides.
How do I prevent stale reads after a write?
Track the primary’s last committed LSN in a request-scoped variable immediately after each write. On the next read, the ORM middleware checks each candidate replica’s replay LSN and only routes there once it has caught up, or falls back to the primary when the configurable causal_read_timeout_ms window expires. See the “Stale read after write” failure mode above for the specific SQL.
What happens to routing when all replicas are lagging?
The circuit breaker trips after a configurable number of consecutive lag-threshold breaches (lag_breach_threshold) and redirects all reads to the primary, then waits cooldown_ms before probing a replica again. Never queue reads indefinitely: shed load or return a controlled error rather than exhausting the primary connection pool.
Can I use ORM middleware with async frameworks like FastAPI or async Django?
Yes, but context propagation requires language-native async context primitives. In Python, use contextvars.ContextVar rather than threading.local(): every coroutine on the same event-loop thread shares one thread-local, so a hint set by one request can leak into another request that runs between its await points. Initialize the ContextVar at the start of each request handler and reset it in a middleware finally block.
How do I test that routing decisions are correct?
Use your ORM’s connection hook to record which pool each query used. In Django, install an instrumentation wrapper with connection.execute_wrapper() on each database alias and record the alias of the connection that ran each statement. Assert in integration tests (against a real replica) that SELECT statements hit the replica pool and INSERT/UPDATE/DELETE hit the primary. Check pg_stat_activity.application_name on each node to confirm the test connection lands where expected.
Related
- Connection Pool Architecture for Read Replicas — dual-pool sizing, idle recycle intervals, and validation strategies that ORM middleware depends on for stable pool selection.
- Implementing Read/Write Splitting at the Proxy Layer — how ProxySQL and similar tools handle routing at the network level, and when to combine them with in-process ORM middleware.
- Managing Sticky Sessions in Distributed Database Reads — session-affinity strategies that complement ORM routing when causal-read LSN tracking is not sufficient.
- Replication Lag & Consistency Management — understanding replication lag measurement, thresholds, and fallback strategies that feed the lag-aware selection logic inside ORM middleware.
- Routing Queries Based on Data Freshness Requirements: data-freshness SLA design patterns that define the max_replication_lag_ms values your ORM middleware should enforce.