Building A Backpressure-Aware Real-Time Feature Store Using Postgres Logical Replication And Kafka
Written by
Sage Stream
The problem I ran into: “real-time” that wasn’t
I built what I thought was a clean real-time analytics pipeline: events flowed into Kafka, a consumer computed features, and a model service read those features to answer “what’s happening right now?” questions.
Then I watched dashboards closely during traffic spikes and noticed something uncomfortable:
- The events were still streaming.
- But the lag between an event arriving and its features reaching analytics grew rapidly.
- Eventually, the system “caught up,” but the lag spikes caused stale results and confusing observability.
The root cause wasn’t the math; it was unhandled backpressure. In plain terms, backpressure builds when one stage of the system can’t keep up with incoming work, so data piles up in front of it instead of being processed at the pace real-time analytics needs.
This post is the specific thing I ended up building: a backpressure-aware real-time feature store that uses Postgres logical replication to stream changes, Kafka to buffer them, and a lag-based throttling strategy so feature writes stay within a tight freshness budget.
What I built (high level)
Components
- Postgres holds “raw events” (for example, user actions).
- Logical replication streams those changes out of Postgres as a continuous feed.
- Kafka provides the buffer between ingestion and processing.
- A Python consumer computes features and writes them to a feature table.
- The consumer applies backpressure-aware throttling based on Kafka consumer lag.
Why logical replication?
I wanted to avoid polling (“SELECT every N seconds”) because polling creates either:
- unnecessary load (too frequent), or
- delayed updates (too infrequent).
Logical replication streams changes efficiently and near-continuously.
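The polling trade-off can be made concrete with a little arithmetic. This is only a sketch: the function name is mine, and it assumes changes arrive uniformly at random between polls, so a change waits half an interval on average.

```python
def polling_tradeoff(interval_seconds: float) -> dict:
    """Return query load and staleness for a fixed polling interval."""
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")
    return {
        # One SELECT per interval, whether or not anything changed.
        "queries_per_hour": 3600 / interval_seconds,
        # Assumption: a change lands uniformly at random between two polls,
        # so it waits half an interval on average and a full one at worst.
        "avg_staleness_s": interval_seconds / 2,
        "max_staleness_s": interval_seconds,
    }


for interval in (0.5, 5, 60):
    print(interval, polling_tradeoff(interval))
```

Shrinking the interval buys freshness only by multiplying queries, which is the tension a change stream avoids.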
Step 1: Enable logical replication in Postgres
I used PostgreSQL’s built-in logical replication mechanism (replication slots + a publication).
1) Set wal_level and create a publication
In postgresql.conf, ensure:
wal_level = logical
max_replication_slots = 10
max_wal_senders = 10
Then restart Postgres.
Now create a publication for the events table:
-- run in psql
CREATE TABLE IF NOT EXISTS public.user_events (
  event_id bigserial PRIMARY KEY,
  user_id bigint NOT NULL,
  event_type text NOT NULL,
  occurred_at timestamptz NOT NULL DEFAULT now()
);

-- Only replicate inserts for this example.
-- Without the publish option, a publication also includes updates and deletes.
CREATE PUBLICATION user_events_pub
  FOR TABLE public.user_events
  WITH (publish = 'insert');
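Before wiring up CDC, it can help to confirm that the settings actually took effect. The helper below is a sketch of such a check, not part of the original build. The function name is hypothetical, and `cur` is assumed to be any DB-API cursor, for example one from `psycopg2.connect(...).cursor()`.

```python
def check_logical_replication(cur, publication: str = "user_events_pub") -> list:
    """Return a list of problems found; an empty list means the checks passed."""
    problems = []

    # wal_level must be 'logical' for logical decoding to work.
    cur.execute("SHOW wal_level")
    wal_level = cur.fetchone()[0]
    if wal_level != "logical":
        problems.append(f"wal_level is {wal_level!r}, expected 'logical'")

    # pg_publication records which operations each publication includes.
    cur.execute(
        "SELECT pubinsert FROM pg_publication WHERE pubname = %s",
        (publication,),
    )
    row = cur.fetchone()
    if row is None:
        problems.append(f"publication {publication!r} does not exist")
    elif not row[0]:
        problems.append(f"publication {publication!r} does not publish inserts")

    return problems
```

An empty list means both checks passed; anything else is worth fixing before starting the connector.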
Step 2: Stream changes into Kafka
There are a few ways to do this (Debezium, custom CDC, etc.). I used Debezium because it’s production-friendly and handles the “change event” plumbing.
Debezium source configuration (Kafka Connect)
This is the conceptual Kafka Connect setup for Postgres CDC to Kafka. You run Kafka Connect with the Debezium Postgres connector.
Here’s an example configuration (replace hostnames and credentials, and adjust topic names for your setup):
{
  "name": "postgres-user-events",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "mydb",
    "plugin.name": "pgoutput",
    "publication.name": "user_events_pub",
    "slot.name": "user_events_slot",
    "topic.prefix": "cdc",
    "table.include.list": "public.user_events",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "false",
    "include.schema.changes": "false"
  }
}
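A configuration like this is typically registered by POSTing it to the Kafka Connect REST API. The sketch below builds that request with the standard library. The URL `http://localhost:8083` is an assumption (8083 is Connect's default REST port), and the function name and the file name in the usage comment are hypothetical.

```python
import json
import urllib.request


def build_register_request(
    connector: dict, connect_url: str = "http://localhost:8083"
) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for Kafka Connect."""
    body = json.dumps(connector).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url.rstrip('/')}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage (not run here; needs a reachable Kafka Connect worker):
#   with open("postgres-user-events.json") as f:   # hypothetical file name
#       req = build_register_request(json.load(f))
#   with urllib.request.urlopen(req, timeout=10) as resp:
#       print(resp.status)
```

Connect normally answers 201 when the connector is created. A connector that already exists under the same name should produce a 409, which `urlopen` raises as an `HTTPError`.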
Debezium will emit messages to a topic like:
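cdc.public.user_events
The Postgres connector names topics <topic.prefix>.<schema>.<table>, so the database name is not part of the topic. Each message carries an operation code in op (c for insert, u for update, d for delete, r for snapshot reads) and the row fields in before and after.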
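To make the message shape concrete, here is a small decoding sketch. The sample event is hand-written for illustration, not captured from a real topic, and the helper name is hypothetical. It accepts both envelope layouts: with `schemas.enable` set to true the change sits under a `payload` key, and with it set to false the change is the top-level object.

```python
import json
from typing import Optional, Tuple

OP_NAMES = {"c": "insert", "u": "update", "d": "delete", "r": "snapshot read"}


def describe_change(raw: bytes) -> Tuple[str, Optional[dict]]:
    """Return (operation name, row) for one Debezium change event."""
    event = json.loads(raw)
    # Unwrap the schema envelope if present; otherwise use the event itself.
    change = event.get("payload", event)
    op = change["op"]
    # Deletes carry the old row in "before"; other operations use "after".
    row = change.get("before") if op == "d" else change.get("after")
    return OP_NAMES.get(op, "unknown"), row


# Illustrative insert event in the schemas.enable=false layout.
sample = json.dumps({
    "before": None,
    "after": {
        "event_id": 1,
        "user_id": 42,
        "event_type": "purchase",
        "occurred_at": "2024-01-01T12:00:00.000000Z",
    },
    "source": {"schema": "public", "table": "user_events"},
    "op": "c",
    "ts_ms": 1704110400000,
}).encode("utf-8")

print(describe_change(sample))
```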
Step 3: Define the feature table
I created a tiny “real-time feature store” table in Postgres. For this example, I compute a few simple but useful features:
- events_last_10_seconds: count of events per user in the last 10 seconds
- last_event_type: most recent event type
- features_updated_at: when features were written (freshness signal)
CREATE TABLE IF NOT EXISTS public.user_features (
  user_id bigint PRIMARY KEY,
  events_last_10_seconds integer NOT NULL DEFAULT 0,
  last_event_type text,
  features_updated_at timestamptz NOT NULL DEFAULT now()
);
Step 4: Backpressure-aware consumer (the core of the build)
This is where the real-time behavior stopped being “best effort” and started being measurable.
Key idea
- I continuously compute Kafka consumer lag (how far behind we are in partition offsets).
- If lag exceeds a threshold, I throttle feature computation/writes.
- If lag is low, I process normally.
Backpressure doesn’t mean “stop processing forever”—it means “process at a rate your system can safely sustain without exploding lag.”
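The key idea can be sketched without Kafka at all. The functions below use plain dictionaries in place of real partition offsets. The names are illustrative, and the default thresholds mirror the knobs used in the consumer.

```python
def total_lag(end_offsets: dict, positions: dict) -> int:
    """Sum per-partition lag, where lag = end offset - next offset to fetch."""
    return sum(max(0, end_offsets[p] - positions[p]) for p in end_offsets)


def sleep_for(lag: int, low: int = 500, high: int = 5000,
              medium_sleep: float = 0.05, high_sleep: float = 0.25) -> float:
    """Map lag to a pause: none below `low`, longer at or above `high`."""
    if lag >= high:
        return high_sleep
    if lag >= low:
        return medium_sleep
    return 0.0


# Three partitions: one well behind, one slightly behind, one caught up.
end_offsets = {0: 12000, 1: 9000, 2: 4000}
positions = {0: 9500, 1: 8800, 2: 4000}

lag = total_lag(end_offsets, positions)   # 2500 + 200 + 0
print(lag, sleep_for(lag))
```

Here the total lag of 2700 falls between the two watermarks, so the consumer would pause briefly rather than run flat out or back off fully.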
Working Python example
This Python consumer:
- Reads Debezium CDC events from Kafka
- Tracks timestamps per user to compute “last 10 seconds” counts
- Uses lag to throttle writes
- Writes features to Postgres
This code is intentionally small but fully working as a starting point. It assumes you have:
- Kafka running
- a topic receiving CDC inserts
- kafka-python and psycopg2-binary installed
Install dependencies:
pip install kafka-python psycopg2-binary
consumer.py
import json
import time
from collections import defaultdict, deque

import psycopg2
from kafka import KafkaConsumer

KAFKA_BOOTSTRAP = "localhost:9092"
# Debezium's Postgres connector names topics <topic.prefix>.<schema>.<table>.
TOPIC = "cdc.public.user_events"
POSTGRES_DSN = "postgresql://postgres:postgres@localhost:5432/mydb"

# --- Backpressure knobs ---
# Lag threshold in "messages behind" before we slow down.
LAG_HIGH_WATERMARK = 5000
# When lag is small, we process at full speed.
LAG_LOW_WATERMARK = 500

# Throttle parameters
SLEEP_WHEN_LAG_HIGH = 0.25    # seconds
SLEEP_WHEN_LAG_MEDIUM = 0.05  # seconds

# Feature window
WINDOW_SECONDS = 10


def compute_sleep_time(total_lag: int) -> float:
    """Simple lag-based throttling."""
    if total_lag >= LAG_HIGH_WATERMARK:
        return SLEEP_WHEN_LAG_HIGH
    if total_lag >= LAG_LOW_WATERMARK:
        return SLEEP_WHEN_LAG_MEDIUM
    return 0.0


def safe_extract_insert_payload(msg_value: dict | None) -> dict | None:
    """
    With schemas enabled, Debezium messages look like:
      {"schema": {...}, "payload": {"op": "c", "after": {...row fields...}}}
    With schemas.enable=false (as in the connector config above), the
    payload is the whole message: {"op": "c", "after": {...}}.
    Handle both layouts.
    """
    if not msg_value:  # tombstone or empty message
        return None
    payload = msg_value.get("payload", msg_value)
    if payload.get("op") != "c":  # c = create/insert
        return None
    return payload.get("after")


def main():
    consumer = KafkaConsumer(
        TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP,
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        # Tombstone records have a null value, so guard before decoding.
        value_deserializer=lambda v: json.loads(v.decode("utf-8")) if v else None,
        # group_id controls how partitions are assigned
        group_id="feature-store-consumer",
    )

    conn = psycopg2.connect(POSTGRES_DSN)
    conn.autocommit = True
    cur = conn.cursor()

    # For the 10-second sliding window, keep per-user event timestamps.
    user_events = defaultdict(deque)

    try:
        while True:
            records = consumer.poll(timeout_ms=1000, max_records=500)
            if not records:
                continue

            # ---- Backpressure: compute lag before doing expensive work ----
            # Lag per partition is approximated as (end_offset - position),
            # where position() is the next offset to fetch.
            partitions = list(records.keys())
            end_offsets = consumer.end_offsets(partitions)
            total_lag = sum(
                max(0, end_offsets[tp] - consumer.position(tp))
                for tp in partitions
            )

            sleep_time = compute_sleep_time(total_lag)
            if sleep_time > 0:
                time.sleep(sleep_time)

            # ---- Process records ----
            for tp, msgs in records.items():
                for msg in msgs:
                    after = safe_extract_insert_payload(msg.value)
                    if not after:
                        continue

                    user_id = int(after["user_id"])
                    event_type = after["event_type"]

                    # For simplicity, this demo anchors the window on arrival
                    # time. In production, parse after["occurred_at"] (an
                    # ISO-style string) and use event time instead.
                    now = time.time()
                    dq = user_events[user_id]
                    dq.append(now)

                    # Remove timestamps outside window.
                    cutoff = now - WINDOW_SECONDS
                    while dq and dq[0] < cutoff:
                        dq.popleft()

                    events_last_10_seconds = len(dq)

                    # Update feature row.
                    cur.execute(
                        """
                        INSERT INTO public.user_features
                          (user_id, events_last_10_seconds, last_event_type, features_updated_at)
                        VALUES (%s, %s, %s, now())
                        ON CONFLICT (user_id) DO UPDATE SET
                          events_last_10_seconds = EXCLUDED.events_last_10_seconds,
                          last_event_type = EXCLUDED.last_event_type,
                          features_updated_at = EXCLUDED.features_updated_at
                        """,
                        (user_id, events_last_10_seconds, event_type),
                    )

            # Commit only after the whole batch is processed. commit() with
            # no arguments commits the consumer's current positions.
            consumer.commit()
    finally:
        consumer.close()
        cur.close()
        conn.close()


if __name__ == "__main__":
    main()
What each important block is doing (and why)
- safe_extract_insert_payload: Debezium wraps messages in a JSON envelope. I only process inserts (op == "c"), because updates/deletes require separate logic.
- user_events deque per user: Deques are efficient for a sliding time window because you can append new events and pop old ones from the left.
- Lag calculation: I approximate consumer lag using end_offsets - position per partition. That gives a fast signal when the consumer is falling behind.
- compute_sleep_time: Instead of letting the consumer explode CPU and DB writes during lag spikes, I slow down when lag is high.
- Postgres upsert: Features are written idempotently using ON CONFLICT (user_id) DO UPDATE, so repeated processing doesn’t corrupt the store.
Note: For a true production pipeline, I would parse occurred_at and use it for the window, not time.time(). I kept the demo simple while keeping the backpressure behavior intact.
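For reference, an event-time version of the window might look like the sketch below. It rests on two assumptions: occurred_at arrives as an ISO 8601 string with a "Z" suffix or a numeric UTC offset, and events for a given user arrive in timestamp order. Out-of-order events would need extra handling. The class and function names are illustrative.

```python
from collections import defaultdict, deque
from datetime import datetime, timedelta

WINDOW = timedelta(seconds=10)


def parse_occurred_at(value: str) -> datetime:
    """Parse an ISO 8601 timestamp into a timezone-aware datetime."""
    # Python versions before 3.11 reject the "Z" suffix in fromisoformat(),
    # so normalize it to an explicit UTC offset first.
    if value.endswith("Z"):
        value = value[:-1] + "+00:00"
    return datetime.fromisoformat(value)


class EventTimeWindow:
    """Per-user sliding window keyed on event time rather than arrival time."""

    def __init__(self, window: timedelta = WINDOW):
        self.window = window
        self._events = defaultdict(deque)

    def add(self, user_id: int, occurred_at: datetime) -> int:
        """Record an event and return the count inside the window."""
        dq = self._events[user_id]
        dq.append(occurred_at)
        # Assumes in-order arrival per user, so the oldest entry is dq[0].
        cutoff = occurred_at - self.window
        while dq and dq[0] < cutoff:
            dq.popleft()
        return len(dq)


window = EventTimeWindow()
t0 = parse_occurred_at("2024-01-01T12:00:00.000000Z")
print(window.add(42, t0))
print(window.add(42, t0 + timedelta(seconds=4)))
print(window.add(42, t0 + timedelta(seconds=12)))
```

Anchoring on event time also makes replays deterministic, because the count no longer depends on when the consumer happened to read the message.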
Step 5: Run a quick end-to-end test
1) Insert events in Postgres
In a separate terminal:
INSERT INTO public.user_events (user_id, event_type, occurred_at) VALUES (42, 'purchase', now());
2) Watch Kafka and features
Run your consumer:
python consumer.py
Then check the feature row:
SELECT * FROM public.user_features WHERE user_id = 42;
You should see:
- events_last_10_seconds increasing with more events
- last_event_type set to the most recent insert
- features_updated_at moving forward with each update
Step 6: Verify backpressure works during spikes
To see why this matters, I tried two scenarios:
- Normal load: features update quickly, lag stays low.
- Spike load: I inserted a burst of events (hundreds/thousands quickly).
Without throttling, the consumer spent more time writing than it had budget for, and Kafka lag grew. With throttling enabled:
- Kafka lag peaked lower
- features updated less aggressively under pressure
- but—crucially—freshness stayed inside a predictable envelope
The main win wasn’t “no lag ever.” The main win was containing lag and keeping the analytics result consistent enough to be trusted.
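A burst like the one in the spike scenario can be generated with a few lines of Python. This is a sketch, not the script I actually ran. The event types and function name are hypothetical, and occurred_at is left to the table's now() default.

```python
import random

EVENT_TYPES = ("view", "click", "purchase")  # hypothetical event types

INSERT_SQL = "INSERT INTO public.user_events (user_id, event_type) VALUES (%s, %s)"


def make_burst(n_events: int, n_users: int = 100, seed: int = 0) -> list:
    """Return n_events (user_id, event_type) rows, reproducible via seed."""
    rng = random.Random(seed)
    return [
        (rng.randint(1, n_users), rng.choice(EVENT_TYPES))
        for _ in range(n_events)
    ]


# Usage with psycopg2 (not run here; needs the database from Step 1):
#   conn = psycopg2.connect("postgresql://postgres:postgres@localhost:5432/mydb")
#   with conn, conn.cursor() as cur:
#       cur.executemany(INSERT_SQL, make_burst(5000))
```

A fixed seed keeps the spike repeatable, which makes it easier to compare runs with and without throttling.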
Data observability hooks I added (so I could trust it)
Backpressure is only useful if you can observe it. I tracked three signals:
- Kafka lag (the throttle input)
- features write rate (did DB writes keep pace?)
- freshness: now() - features_updated_at (how stale features are)
This made “real-time” measurable instead of a vague marketing label.
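The freshness signal reduces to a subtraction and a comparison. Here is a sketch, assuming timezone-aware timestamps and a hypothetical 10-second budget. The function names are illustrative.

```python
from datetime import datetime, timezone
from typing import Optional


def staleness_seconds(features_updated_at: datetime,
                      now: Optional[datetime] = None) -> float:
    """Seconds elapsed since the feature row was last written."""
    now = now or datetime.now(timezone.utc)
    return (now - features_updated_at).total_seconds()


def within_budget(features_updated_at: datetime,
                  budget_seconds: float = 10.0,  # hypothetical budget
                  now: Optional[datetime] = None) -> bool:
    """True if the feature row is fresh enough to serve."""
    return staleness_seconds(features_updated_at, now) <= budget_seconds


updated = datetime(2024, 1, 1, 12, 0, 4, tzinfo=timezone.utc)
checked = datetime(2024, 1, 1, 12, 0, 10, tzinfo=timezone.utc)
print(staleness_seconds(updated, checked), within_budget(updated, now=checked))
```

Because feature rows are only rewritten when a user produces an event, this check is most meaningful for users who were recently active.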
Conclusion
I built a backpressure-aware real-time feature store by combining Postgres logical replication, Kafka buffering, and a consumer that throttles feature computation based on Kafka lag. The key lesson from my tinkering was that real-time analytics failures often come from processing systems getting overwhelmed—not from incorrect aggregations. Once I made lag explicit and tied it to processing rate, feature freshness became stable and observable.