ORM Middleware for Automatic Query Routing

The SLA risk: your application issues 95% read queries, but without explicit routing they all land on the primary. ORM-level middleware solves this by intercepting every query before pool dispatch, classifying it as a read or write, and forwarding it to the appropriate connection pool. Done correctly, this sits inside the broader connection routing and pooling strategy for your service and gives you per-request consistency controls that a network proxy cannot.

Unlike read/write splitting at the proxy layer, ORM middleware runs in-process, so it can read application context — the current transaction state, a causal-read LSN from a preceding write, or an explicit routing hint set by your business logic — and route accordingly. The trade-off is CPU overhead per query and the need to maintain thread-safe context propagation across every async boundary.

Figure: ORM middleware routing pipeline. Application code (an ORM session call) passes through a query interceptor that classifies the statement (AST / regex), detects transaction boundaries, and extracts hints. Writes and transactions branch to the primary connection pool (which also serves forced-primary reads); reads branch to the replica connection pool, where a lag monitor / topology feed drives lag-aware replica selection.

Problem Framing

When replication lag spikes or a replica fails, uncontrolled automatic routing silently routes reads to the primary — or worse, to a stale replica — without the application knowing. The three SLA risks ORM middleware must prevent are: misclassification (a write or SELECT ... FOR UPDATE sent to a read-only replica), stale reads (a read dispatched to a replica before it has applied the preceding write’s WAL), and connection storm (all replicas fail and thousands of requests flood the primary with no backpressure).

This page covers the in-process middleware layer. For a centralized alternative that trades per-request context awareness for operational simplicity, see read/write splitting at the proxy layer.

Concept Definition and Scope

ORM middleware for automatic query routing is an in-process interceptor that sits between your application’s ORM session factory (or connection acquisition path) and the connection pool. It performs three functions in sequence:

  1. Query classification — is this statement a read or a write?
  2. Pool selection — given the classification, current transaction state, and any explicit routing hints, which pool should serve this query?
  3. Context propagation — ensure that routing state (LSN watermarks, sticky hints, circuit-breaker status) is visible to all queries in the same request without leaking across request boundaries.

This is distinct from proxy-layer splitting, which operates at the network layer without access to application context, and from connection pool architecture, which deals with pool sizing and lifecycle rather than routing logic.

Mechanism Deep-Dive

Query Classification

Classification happens before pool acquisition. The two engines are:

Regex classifier — matches the first non-whitespace token of the statement against a priority-ordered rule set. Latency overhead is sub-microsecond but false positives occur for SELECT ... FOR UPDATE, CTEs with write-side effects (WITH ins AS (INSERT ...)), and stored-procedure calls whose bodies the ORM cannot inspect.

AST parser — parses the full SQL statement and walks the syntax tree to detect write nodes, lock hints, and procedure calls. Eliminates the false-positive categories above at the cost of 50–200 µs per parse (cacheable by normalized statement fingerprint).

Hybrid — applies regex first; if the statement matches a known complex pattern (contains FOR UPDATE, PROCEDURE, INTO, or a known procedure prefix), escalates to AST.

toml
[query_classifier]
engine = "hybrid"
# AST escalation triggers — add statement patterns that regex misclassifies
ast_escalation_patterns = [
  "FOR\\s+UPDATE",
  "FOR\\s+SHARE",
  "CALL\\s+",
  "\\bINTO\\b",
  "WITH\\s+\\w+\\s+AS\\s+\\(\\s*(INSERT|UPDATE|DELETE)"
]

[query_classifier.regex_rules]
# Evaluated in priority order; first match wins
route_replica  = ["^\\s*SELECT\\b(?!.*FOR\\s+(UPDATE|SHARE))"]
# A TOML key may appear only once per table, so both primary rules share one array
route_primary  = [
  "^\\s*(INSERT|UPDATE|DELETE|MERGE|TRUNCATE|CALL|EXEC)",
  "^\\s*WITH\\b",      # CTEs default to primary; AST refines
]
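
The rules above can be sketched in a few lines of Python. This is a minimal illustration rather than a reference implementation: the names `AST_ESCALATION`, `READ_RULE`, and `classify` are hypothetical, and where a real hybrid engine would hand an escalated statement to an AST parser, this sketch conservatively routes it to the primary.

```python
import re

# Hypothetical rule tables mirroring the TOML config; names are illustrative.
AST_ESCALATION = [
    re.compile(p, re.IGNORECASE | re.DOTALL)
    for p in (
        r"FOR\s+UPDATE",
        r"FOR\s+SHARE",
        r"CALL\s+",
        r"\bINTO\b",
        r"WITH\s+\w+\s+AS\s+\(\s*(INSERT|UPDATE|DELETE)",
    )
]
READ_RULE = re.compile(
    r"^\s*SELECT\b(?!.*FOR\s+(UPDATE|SHARE))", re.IGNORECASE | re.DOTALL
)


def classify(sql: str) -> str:
    """Return "READ" or "WRITE"; anything ambiguous is treated as a write."""
    # Escalation patterns are checked first. A real hybrid engine would parse
    # the statement here; this sketch simply sends it to the primary.
    if any(p.search(sql) for p in AST_ESCALATION):
        return "WRITE"
    if READ_RULE.match(sql):
        return "READ"
    # INSERT/UPDATE/DELETE, CTEs, and unknown verbs all default to the primary.
    return "WRITE"
```

Defaulting to "WRITE" on any doubt costs some primary load but means a misclassification never sends a write to a read-only replica.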

Transaction Boundary Detection

Any query inside an explicit BEGIN / START TRANSACTION block must be pinned to the primary, regardless of its SQL verb. The interceptor checks transaction state from the ORM session before applying the classifier result:

python
def select_pool(query: str, session_ctx: SessionContext, pools: PoolRegistry) -> Pool:
    # Explicit hint always wins
    if session_ctx.force_primary:
        return pools.primary

    # Open transaction — must stay on primary for write serialization
    if session_ctx.in_transaction:
        return pools.primary

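    # Writes (and anything the classifier is unsure about) always go to the primary.
    # This check must precede the causal-read guard, otherwise a second write in
    # the same session would be handed a replica.
    if classify(query) != "READ":
        return pools.primary

    # Causal-read guard: wait for a replica to catch up to the last-write LSN
    if session_ctx.last_write_lsn:
        replica = pools.replica_at_or_after(
            session_ctx.last_write_lsn,
            max_wait_ms=session_ctx.causal_read_timeout_ms,
        )
        return replica or pools.primary   # fallback if no replica caught up in time

    # Plain read with no causal constraint
    return pools.least_lag_replica()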
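
The `pools.least_lag_replica()` call is left undefined above. One way it might work, assuming the lag monitor publishes a per-replica snapshot, is sketched below; `ReplicaStatus` and its fields are hypothetical, and the 500 ms default simply mirrors the `max_replication_lag_ms` setting used elsewhere on this page.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass(frozen=True)
class ReplicaStatus:
    # Hypothetical snapshot published by the lag monitor.
    host: str
    lag_ms: float
    healthy: bool = True


def least_lag_replica(
    replicas: Sequence[ReplicaStatus], max_replication_lag_ms: float = 500
) -> Optional[ReplicaStatus]:
    """Pick the healthy replica with the lowest lag, or None if none qualify."""
    eligible = [
        r for r in replicas if r.healthy and r.lag_ms <= max_replication_lag_ms
    ]
    return min(eligible, key=lambda r: r.lag_ms) if eligible else None
```

A `None` result means no replica is currently safe to read from; the caller then has to choose between a logged fallback to the primary and shedding the request.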

Routing Context Propagation

Thread-local or async-local storage scopes routing state to the current request without a global mutex. In Python async code, use contextvars.ContextVar; in Go, pass a context.Context; in Java, use ThreadLocal paired with a servlet filter or Spring HandlerInterceptor that clears state in a finally block.

The pipeline order is strict: Context Propagation → Query Classification → Pool Selection → Execution → Telemetry. Routing state must be resolved before the transaction opens: acquiring a routing mutex inside a database transaction can deadlock when the transaction holds a row lock while the routing layer waits on topology data.
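
To make the Python case concrete, the sketch below runs two simulated requests concurrently and shows that a hint set in one is not visible in the other. The names `route_hint` and `handle_request` are illustrative only, not part of any framework.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical request-scoped hint; "auto" means "let the classifier decide".
route_hint: ContextVar[str] = ContextVar("route_hint", default="auto")


async def handle_request(name: str, force_primary: bool) -> tuple[str, str]:
    # set() returns a token so the previous value can be restored on exit.
    token = route_hint.set("primary") if force_primary else None
    try:
        await asyncio.sleep(0)  # yield so the two requests interleave
        return name, route_hint.get()
    finally:
        if token is not None:
            route_hint.reset(token)


async def main() -> list[tuple[str, str]]:
    # gather() wraps each coroutine in its own Task, and each Task runs in a
    # copy of the current context, so one request's hint never reaches the other.
    return await asyncio.gather(
        handle_request("a", force_primary=True),
        handle_request("b", force_primary=False),
    )


results = asyncio.run(main())
```

Because a task copies the context of whoever created it, a task spawned from inside a hinted block does inherit the hint; that inheritance is the leak described under failure mode 4.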

Trade-Off Comparison Table

| Approach | Routing granularity | Per-query overhead | Consistency controls | Failover control |
|---|---|---|---|---|
| ORM middleware (in-process) | Full request context | 20–250 µs (classifier + pool lookup) | LSN tracking, explicit hints, txn pinning | Application-controlled; can shed load before hitting DB |
| Proxy-layer splitting | Statement-level | ~5 µs (network hop absorbed) | None; proxy cannot see application state | Centralized; requires proxy config change to adjust |
| Dual-URL (app chooses string) | Explicit per call-site | Zero (no interceptor) | Application must enforce manually | Per call-site; no global lever |
| Driver-level (e.g. MySQL replicationDriver) | Statement-level | Sub-millisecond | Limited: read-only flag per connection | Driver restarts required for topology changes |

ORM middleware wins on consistency control at the cost of in-process CPU and the need to maintain context propagation code across every async boundary in your stack.

Configuration Runbook

Dual-Pool Bootstrap (generic / framework-agnostic)

yaml
# orm_routing.yaml — canonical bootstrap config
orm_routing:
  discovery:
    endpoint: "http://topology-service.internal:8080/api/v1/replicas"
    refresh_interval_ms: 15000
    fallback_cache_ttl_ms: 300000   # serve stale topology for up to 5 min on network partition

  defaults:
    classifier: "hybrid"            # "regex" | "ast_parser" | "hybrid"
    max_replication_lag_ms: 500     # replicas lagging beyond this are excluded
    silent_fallback_to_primary: false  # NEVER silently reroute — log + alert instead
    connection_timeout_ms: 2000
    causal_read_timeout_ms: 800     # max wait for replica to catch up to write LSN

  circuit_breaker:
    lag_breach_threshold: 3         # consecutive breaches before OPEN
    error_rate_threshold: 0.10      # 10% pool errors trigger OPEN
    cooldown_ms: 5000               # time in OPEN before attempting HALF_OPEN

connection_pools:
  primary:
    min_idle: 5
    max_active: 50
    connection_timeout_ms: 1000
    tcp_keepalive_idle_sec: 60
    max_lifetime_sec: 1800          # hard recycle to avoid stale post-failover connections
  replicas:
    min_idle: 10
    max_active: 150
    connection_timeout_ms: 500
    validation_interval_ms: 10000   # validate on checkout after this idle period
    max_lifetime_sec: 1800

Django Multi-DB Router

Django’s database router hooks let you implement read/write splitting without modifying any model code. Register the router in settings.py:

python
# settings.py
DATABASES = {
    "default": {
        "ENGINE": "django.db.backends.postgresql",
        "HOST": "primary.db.internal",
        # ... connection params
    },
    "replica": {
        "ENGINE": "django.db.backends.postgresql",
        "HOST": "replica.db.internal",
        "OPTIONS": {"options": "-c default_transaction_read_only=on"},
        # ... connection params
    },
}

DATABASE_ROUTERS = ["myapp.routers.ReplicaRouter"]

python
# myapp/routers.py
from contextvars import ContextVar

# ContextVar rather than threading.local: isolated per thread under sync
# workers and per task under async views.
_force_primary: ContextVar[bool] = ContextVar("force_primary", default=False)


class ReplicaRouter:
    """Route reads to replica unless a force-primary context is active."""

    def db_for_read(self, model, **hints):
        if _force_primary.get():
            return "default"
        # Models flagged _always_primary need read-your-writes on every read
        if getattr(model, "_always_primary", False):
            return "default"
        return "replica"

    def db_for_write(self, model, **hints):
        return "default"

    def allow_relation(self, obj1, obj2, **hints):
        # Both aliases point at the same logical database
        return True

    def allow_migrate(self, db, app_label, model_name=None, **hints):
        # Run migrations against the primary only
        return db == "default"


class force_primary_reads:
    """Context manager: pin all reads to primary within this block."""

    def __enter__(self):
        self._token = _force_primary.set(True)
        return self

    def __exit__(self, *_):
        # Restore the previous value so nested blocks unwind correctly
        _force_primary.reset(self._token)

Usage in views where read-your-writes consistency is required immediately after a write:

python
from myapp.routers import force_primary_reads

def checkout(request):
    order = Order.objects.create(user=request.user, ...)
    with force_primary_reads():
        # This SELECT goes to primary — replica may not have applied the INSERT yet
        confirmation = Order.objects.get(pk=order.pk)
    return render(request, "confirmation.html", {"order": confirmation})

SQLAlchemy Routing Session

python
# sqlalchemy_router.py
from contextvars import ContextVar
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, Session

_route_hint: ContextVar[str] = ContextVar("route_hint", default="auto")

primary_engine = create_engine("postgresql+psycopg2://primary.db.internal/app", pool_size=10, max_overflow=40)
replica_engine = create_engine("postgresql+psycopg2://replica.db.internal/app", pool_size=20, max_overflow=60,
                               execution_options={"postgresql_readonly": True})

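class RoutingSession(Session):
    def get_bind(self, mapper=None, clause=None, **kwargs):
        if _route_hint.get() == "primary":
            return primary_engine
        # Flushes, DML statements, and open transactions always go to the primary.
        # Caveat: with autobegin, in_transaction() stays True from the session's
        # first statement until commit()/rollback(), so later reads in the same
        # session also use the primary. Use short-lived sessions for replica reads.
        if (
            self._flushing
            or self.in_transaction()
            or getattr(clause, "is_dml", False)
        ):
            return primary_engine
        return replica_engine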

RoutingSessionFactory = sessionmaker(class_=RoutingSession)

Spring Data JPA — AbstractRoutingDataSource

java
// RoutingDataSource.java
public class RoutingDataSource extends AbstractRoutingDataSource {

    @Override
    protected Object determineCurrentLookupKey() {
        return RoutingContext.isReadOnly() ? "replica" : "primary";
    }
}

// RoutingContext.java  (ThreadLocal-backed)
public final class RoutingContext {
    private static final ThreadLocal<Boolean> READ_ONLY = ThreadLocal.withInitial(() -> false);

    public static void setReadOnly(boolean readOnly) { READ_ONLY.set(readOnly); }
    public static boolean isReadOnly() { return READ_ONLY.get(); }
    public static void clear() { READ_ONLY.remove(); }   // call in filter finally-block
}

// ReadOnlyInterceptor.java — marks @Transactional(readOnly=true) methods
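@Aspect
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)   // must run before the transaction advice acquires a connection
public class ReadOnlyInterceptor {
    @Around("@annotation(transactional)")
    public Object route(ProceedingJoinPoint pjp, Transactional transactional) throws Throwable {
        boolean previous = RoutingContext.isReadOnly();
        try {
            RoutingContext.setReadOnly(transactional.readOnly());
            return pjp.proceed();
        } finally {
            // Restore rather than clear so nested @Transactional calls unwind correctly;
            // the request filter still calls RoutingContext.clear() at the end of the request.
            RoutingContext.setReadOnly(previous);
        }
    }
}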

Monitoring and Alerting Signals

Routing decisions are opaque without structured telemetry. Emit a structured log entry for every routing decision; sample at 10% under normal load, 100% when the circuit breaker is not in CLOSED state.
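
A minimal sketch of that sampling rule follows, assuming the breaker state is available as a string at decision time. The function name `routing_log_line` and the JSON field names are hypothetical, and the `rand` parameter exists only so the sampling can be made deterministic in tests.

```python
import json
import random
from typing import Callable, Optional


def routing_log_line(
    destination: str,
    reason: str,
    breaker_state: str,
    sample_rate: float = 0.10,
    rand: Callable[[], float] = random.random,
) -> Optional[str]:
    """Return a JSON log line, or None when this decision is sampled out."""
    # Log everything while the breaker is OPEN or HALF_OPEN; sample otherwise.
    if breaker_state == "CLOSED" and rand() >= sample_rate:
        return None
    return json.dumps(
        {
            "event": "orm_routing_decision",
            "destination": destination,
            "reason": reason,
            "breaker_state": breaker_state,
        },
        sort_keys=True,
    )
```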

Key metrics to expose:

  • orm_routing_decisions_total{destination="primary|replica", reason="write|transaction|hint|lag_fallback|circuit_open"} — counts by routing outcome
  • orm_routing_replica_lag_ms{replica_host} — current lag per replica at decision time
  • orm_routing_causal_wait_ms — histogram of how long reads blocked waiting for a replica to catch up to a write LSN
  • orm_pool_checkout_ms{pool="primary|replica"} — connection acquisition latency; p99 above 50 ms usually signals exhaustion
  • orm_circuit_breaker_state{pool="replica"} — 0=CLOSED, 1=OPEN, 2=HALF_OPEN

Prometheus alert rules:

yaml
groups:
  - name: orm_routing
    rules:
      - alert: ReplicaRoutingCircuitOpen
        expr: orm_circuit_breaker_state{pool="replica"} == 1
        for: 60s
        labels:
          severity: warning
        annotations:
          summary: "ORM replica circuit breaker is OPEN — all reads routing to primary"

      - alert: StaleReadRisk
        # orm_routing_replica_lag_ms is a per-replica gauge, so aggregate across replicas
        expr: quantile(0.95, orm_routing_replica_lag_ms) > 500
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "P95 replica lag at routing time exceeds 500 ms — stale read risk"

      - alert: CausalReadTimeoutSpike
        expr: histogram_quantile(0.99, rate(orm_routing_causal_wait_ms_bucket[5m])) > 800
        for: 3m
        labels:
          severity: critical
        annotations:
          summary: "Causal reads blocking for >800 ms — replica catching-up latency unacceptable"

      - alert: ReplicaPoolNearExhaustion
        expr: orm_pool_checkout_ms{pool="replica",quantile="0.99"} > 50
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "Replica connection pool p99 checkout >50 ms — nearing exhaustion"

Database-side correlation queries (PostgreSQL):

sql
-- Replication lag per replica, run on the primary (replay_lag requires PostgreSQL 10+).
-- pg_last_xact_replay_timestamp() is only meaningful on a standby and returns NULL here.
SELECT
    client_addr,
    state,
    EXTRACT(EPOCH FROM replay_lag) AS replay_lag_sec,
    pg_wal_lsn_diff(sent_lsn, replay_lsn) AS lsn_behind_bytes
FROM pg_stat_replication
ORDER BY replay_lag_sec DESC NULLS LAST;

-- Active connections broken down by application and state
SELECT
    datname,
    usename,
    application_name,
    state,
    COUNT(*)
FROM pg_stat_activity
WHERE datname = current_database()
GROUP BY 1,2,3,4
ORDER BY COUNT(*) DESC;

Failure Modes and Recovery Steps

1. Misrouted write to a read-only replica

Root cause: The classifier matched a CTE containing an INSERT as a read (WITH data AS (INSERT INTO ...) SELECT ...), or a stored procedure call was regex-classified as a SELECT.

Symptom: ERROR: cannot execute INSERT in a read-only transaction (PostgreSQL) or ERROR 1290: The MySQL server is running with the --read-only option (MySQL).

Recovery:

  1. Add the statement pattern to ast_escalation_patterns so the hybrid classifier escalates to AST.
  2. For stored procedures, maintain an explicit deny-list: route_primary_procedures = ["proc_name_1", "proc_name_2"].
  3. As an immediate mitigation, set silent_fallback_to_primary: true temporarily, but alert on every fallback so you can identify and fix the misclassifying rules.

toml
[query_classifier]
# Immediate patch: explicit procedure deny-list
route_primary_procedures = ["update_inventory", "settle_payment"]
ast_escalation_patterns = [
  "WITH\\s+\\w+\\s+AS\\s+\\(\\s*(INSERT|UPDATE|DELETE)"
]

2. Stale read after write (read-your-writes violation)

Root cause: A write completed on the primary, the ORM immediately issued a SELECT, and replication lag caused the replica to serve a snapshot that didn’t yet include the write.

Symptom: Application shows outdated data immediately after a mutation; often surfaces as “order placed but not visible in order list” bugs.

Recovery:

  1. Capture the primary’s LSN immediately after each write: SELECT pg_current_wal_lsn() (PostgreSQL) or SELECT @@gtid_executed (MySQL GTID).
  2. Store it in session_ctx.last_write_lsn.
  3. Before routing the next read, run SELECT pg_last_wal_replay_lsn() >= $1::pg_lsn on each candidate replica; route only to a replica that returns true, and fall back to the primary if none does within causal_read_timeout_ms.
  4. For critical flows, use force_primary_reads context manager (see Django example above) rather than the LSN polling path.
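
If the middleware compares LSNs in-process rather than in SQL, the values must be compared numerically, not as strings. The sketch below assumes PostgreSQL's textual LSN format (two hexadecimal numbers separated by a slash); the helper names are illustrative.

```python
def parse_pg_lsn(lsn: str) -> int:
    """Convert a PostgreSQL LSN such as '16/B374D848' to a 64-bit integer."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def replica_has_caught_up(replay_lsn: str, last_write_lsn: str) -> bool:
    """True when the replica has replayed at least up to the write's LSN."""
    return parse_pg_lsn(replay_lsn) >= parse_pg_lsn(last_write_lsn)
```

A plain string comparison would rank "F/FFFFFFFF" above "10/0" even though it is numerically lower, which would wrongly mark a lagging replica as caught up.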

3. Connection storm when all replicas fail

Root cause: All replicas become unhealthy simultaneously (failover, network partition). Without a circuit breaker, every request attempts to acquire a replica connection, times out, and retries — amplifying primary load.

Symptom: Primary connection pool exhaustion; FATAL: remaining connection slots are reserved (PostgreSQL); request queue depth spikes.

Recovery:

  1. The circuit breaker must trip to OPEN after lag_breach_threshold consecutive lag breaches, or when the pool error rate exceeds error_rate_threshold, and route all reads to the primary immediately.
  2. Apply backpressure by shedding load: if primary pool utilization exceeds 90%, reject non-critical read requests with HTTP 503 rather than queuing them.
  3. After cooldown_ms, transition to HALF_OPEN and probe one replica; if healthy, return to CLOSED.

yaml
circuit_breaker:
  lag_breach_threshold: 3
  error_rate_threshold: 0.10
  cooldown_ms: 5000
  backpressure:
    primary_pool_reject_threshold: 0.90
    reject_response: "HTTP 503 / DB_OVERLOADED"
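
The state transitions described in the recovery steps can be sketched as a small class. This is an illustrative outline under simplifying assumptions: it tracks only consecutive lag breaches (not the error-rate trigger), `ReplicaCircuitBreaker` is a hypothetical name, and the injectable `clock` is there so the cooldown can be tested without sleeping.

```python
import time
from typing import Callable


class ReplicaCircuitBreaker:
    """CLOSED -> OPEN after N consecutive breaches; OPEN -> HALF_OPEN after cooldown."""

    def __init__(
        self,
        lag_breach_threshold: int = 3,
        cooldown_ms: int = 5000,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.lag_breach_threshold = lag_breach_threshold
        self.cooldown_s = cooldown_ms / 1000
        self._clock = clock
        self._state = "CLOSED"
        self._breaches = 0
        self._opened_at = 0.0

    @property
    def state(self) -> str:
        # Lazily promote OPEN to HALF_OPEN once the cooldown has elapsed.
        if self._state == "OPEN" and self._clock() - self._opened_at >= self.cooldown_s:
            self._state = "HALF_OPEN"
        return self._state

    def allow_replica(self) -> bool:
        # HALF_OPEN admits probe traffic; a production breaker would cap it to one probe.
        return self.state != "OPEN"

    def record_success(self) -> None:
        self._breaches = 0
        if self.state == "HALF_OPEN":
            self._state = "CLOSED"

    def record_breach(self) -> None:
        state = self.state
        if state == "OPEN":
            return  # already tripped; let the cooldown run
        self._breaches += 1
        if state == "HALF_OPEN" or self._breaches >= self.lag_breach_threshold:
            self._state = "OPEN"
            self._opened_at = self._clock()
```

A failed probe in HALF_OPEN reopens the breaker immediately and restarts the cooldown, so a still-unhealthy replica set cannot flap back into service.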

4. Routing context leak across async boundaries

Root cause: Thread-local routing state (e.g., force_primary=True) set in one coroutine or thread is read by a different coroutine that inherited the context object.

Symptom: Reads that should go to the replica unexpectedly hit the primary (silent correctness issue, not an error), discovered only by examining orm_routing_decisions_total label counts.

Recovery:

  1. Use language-native scoped context primitives: contextvars.ContextVar in Python async, context.Context with explicit WithValue in Go, ThreadLocal cleared in a servlet filter finally block in Java.
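  2. Add an assertion in your test suite: a new request, or a task started outside the hinted block, must see the default from _route_hint.get(). Note that in Python a task created inside a hinted block (for example via asyncio.create_task) copies the caller's context and therefore inherits the hint, so reset it explicitly at the start of background work.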
  3. Audit every background-task entry point and message-consumer handler to confirm routing context is initialized from headers, not inherited from the producer.

5. Stale connections post-replica failover

Root cause: After a replica restarts or is replaced, the connection pool holds open TCP connections to the old address. Queries sent on these connections fail with ERROR: connection refused or SSL connection has been closed unexpectedly.

Symptom: Burst of connection errors immediately after a replica failover event; without intervention, the errors clear only once max_lifetime_sec has elapsed and the pool has recycled the affected connections.

Recovery:

  1. Trigger an immediate pool flush on topology change events from your discovery endpoint.
  2. Set max_lifetime_sec: 1800 and validation_interval_ms: 10000 — connections idle longer than the validation interval are tested before use.
  3. Wrap pool acquisition in a retry with jitter (max 2 retries) before escalating to the primary.
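
Step 3 can be sketched as follows. The function and parameter names are hypothetical, and the sketch assumes the pool raises `ConnectionError` on a dead socket; real drivers raise their own exception types, so the `except` clause would need adjusting.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def acquire_with_retry(
    acquire_replica: Callable[[], T],
    acquire_primary: Callable[[], T],
    max_retries: int = 2,
    base_delay_s: float = 0.05,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Try the replica pool, retrying with full jitter, then escalate to primary."""
    for attempt in range(max_retries + 1):
        try:
            return acquire_replica()
        except ConnectionError:
            if attempt == max_retries:
                break
            # Full jitter: sleep a random amount up to an exponentially growing cap.
            sleep(random.uniform(0, base_delay_s * (2 ** attempt)))
    return acquire_primary()
```

Randomizing the delay keeps many requests that failed at the same moment from retrying in lockstep. The escalation to the primary should still be logged and counted, in line with the no-silent-fallback default.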

FAQ

When should I use ORM-level routing vs a proxy like ProxySQL?

ORM-level routing is the right choice when you need per-request routing hints, transaction-scoped consistency overrides, or lag-aware selection that depends on application context. Proxies centralise routing with lower per-query CPU overhead but cannot see request-level signals like a causal-read LSN or a user-session sticky hint. High-throughput systems often combine both: a proxy for baseline read/write splitting, and ORM middleware for consistency overrides.

How do I prevent stale reads after a write?

Track the primary’s last committed LSN in a request-scoped variable immediately after each write. On the next read, the ORM middleware checks each candidate replica’s replay LSN and only routes there once it has caught up, or falls back to the primary once the configurable causal_read_timeout_ms window expires. See the “Stale read after write” failure mode above for the specific SQL.

What happens to routing when all replicas are lagging?

The circuit breaker trips after a configurable number of consecutive lag-threshold breaches (lag_breach_threshold) and redirects all reads to the primary, then waits for the cooldown period (cooldown_ms) before probing a replica again. Never queue reads indefinitely; shed load or return a controlled error rather than exhausting the primary connection pool.

Can I use ORM middleware with async frameworks like FastAPI or async Django?

Yes, but context propagation requires language-native async context primitives. In Python, use contextvars.ContextVar rather than threading.local(): all coroutines on an event loop share one thread, so thread-local state set by one request is visible to every other request interleaved on that loop. Set the ContextVar at the start of each request handler and reset it in a middleware finally block.

How do I test that routing decisions are correct?

Use your ORM’s connection hook to record which pool each query used. In Django, install a recording wrapper with connection.execute_wrapper() on each database alias instead of subclassing the database wrapper. Assert in integration tests (against a real replica) that SELECT statements hit the replica pool and INSERT/UPDATE/DELETE hit the primary. Check pg_stat_activity.application_name on each node to confirm the test connection lands where expected.

