Data ScienceApril 15, 2026

Event-Time Dedupe Drift In Kafka Streams With Late Arrivals

S

Written by

Sage Stream

The bug I couldn’t explain: “Same event, different features”

I ran a real-time feature pipeline for an AI model and watched accuracy slowly degrade over a few hours. No big outages—just quiet drift. The weird part was that the raw events looked fine: event IDs were unique, and counts by minute matched expectations.

But the model’s feature store outputs were subtly wrong: for the same event ID, I occasionally saw different computed feature values depending on when the event arrived relative to other events.

After a weekend of tinkering, I traced it to a very specific failure mode:

Event-time deduplication drift caused by late arrivals plus “latest-wins” logic in a streaming dedupe step.

In this post I’ll show exactly how I reproduced it, how I detected it with data observability checks, and how I fixed it.


The concept (in plain terms): event-time vs processing-time

  • Event time: the timestamp inside the event (when it claims it happened).
  • Processing time: when the pipeline receives and processes it.

Streaming systems often deduplicate using event time windows because late events still need to be handled correctly. The problem happens when you mix:

  1. late events (arrive after your dedupe window),
  2. an “update the record if something newer shows up” step,
  3. feature computation that depends on other events’ ordering.

A concrete system setup I simulated

I simulated a stream of events where:

  • each event has an event_id,
  • I compute a feature score based on event content,
  • I dedupe on event time using a sliding window,
  • and I use “latest-wins” to update the stored record when duplicates arrive.

The failure mode appears when an event arrives late and causes the stored record to change, but the feature consumers already used an earlier version.


Reproduction with Python (using pandas as a stand-in)

This isn’t Kafka, but it reproduces the logic bug faithfully: different arrival ordering + dedupe by event time + late arrivals.

Sample data

import pandas as pd events = [ # event_id, event_time, arrival_time, payload ("A", "2026-04-15T10:00:05Z", "2026-04-15T10:00:08Z", 10), ("B", "2026-04-15T10:00:10Z", "2026-04-15T10:00:12Z", 20), # Duplicate of A with different payload arrives late # (In real systems this happens due to retries, replays, or upstream corrections.) ("A", "2026-04-15T10:00:05Z", "2026-04-15T10:01:05Z", 999), # Another event later that changes feature context ordering ("C", "2026-04-15T10:00:20Z", "2026-04-15T10:00:25Z", 30), ] df = pd.DataFrame(events, columns=["event_id", "event_time", "arrival_time", "payload"]) df["event_time"] = pd.to_datetime(df["event_time"]) df["arrival_time"] = pd.to_datetime(df["arrival_time"]) df = df.sort_values("arrival_time").reset_index(drop=True) df

What the feature computation does

I used a simple “feature score” that depends on payload and a tiny context rule:

  • If the event is processed after payload > 50, the score includes a bonus.
def compute_score(payload): base = payload / 10 bonus = 5 if payload > 50 else 0 return base + bonus df["score"] = df["payload"].apply(compute_score) df[["event_id", "event_time", "arrival_time", "payload", "score"]]

The problematic dedupe logic: event-time window + latest-wins by arrival

This mimics a common streaming pattern:

  • Dedupe within a window based on event_time
  • When duplicates appear, keep the one that arrived later (processing-time tie-breaker)
WINDOW = pd.Timedelta(seconds=30) def dedupe_event_time_latest_wins(events_df): """ - Keep one record per event_id within an event-time window. - If multiple duplicates fall in the same window, keep the one that arrived later. """ output = [] # We simulate the pipeline state updates in processing-time order. state = {} # event_id -> (event_time, arrival_time, row) for _, row in events_df.iterrows(): eid = row["event_id"] et = row["event_time"] at = row["arrival_time"] # If we've already seen eid, only keep it if this duplicate is within the window of the original event_time if eid in state: orig_et, orig_at, orig_row = state[eid] same_event_time_bucket = abs((et - orig_et)) <= WINDOW if same_event_time_bucket: # latest-wins by arrival time if at >= orig_at: state[eid] = (et, at, row) else: # Outside window: treat as separate (this is another drift source) # For this demo, we still overwrite to highlight drift. if at >= orig_at: state[eid] = (et, at, row) else: state[eid] = (et, at, row) # Emulate what the feature store would "publish" each time state changes. output.append(state[eid][2]) return pd.DataFrame(output) published_versions = dedupe_event_time_latest_wins(df) published_versions[["event_id", "payload", "score", "arrival_time"]]

Observing the drift

Now check what ended up stored per event_id after each publish.

stored_after_each_publish = published_versions.copy() stored_after_each_publish["stored_score_for_eid"] = stored_after_each_publish["score"] stored_after_each_publish[["event_id", "arrival_time", "payload", "score"]]

Look at event A:

  • it arrives once with payload=10 (score=1.0)
  • then arrives again later with payload=999 (score=99.9)

Even though event_time is identical, the dedupe step updates the stored value using arrival-time tie-breakers.

If the model consumed features between those publishes, it saw the first score; later it would see the second score for the “same” event_id.

That’s the drift.


The data observability check that caught it

I added an observability metric specifically for this failure mode:

“Dedupe inconsistency”: same event_id, multiple distinct feature values within a dedupe horizon

I tracked:

  • event_id
  • event_time
  • the computed feature value(s) (or a hash of them)
  • and counted when the feature hash changes for the same event_id+event_time.
from hashlib import sha256 def hash_value(x): return sha256(str(x).encode("utf-8")).hexdigest()[:12] df_obs = df.copy() df_obs["feature_hash"] = df_obs["score"].apply(hash_value) # Group by event_id + event_time (the "identity" intended to be stable) g = df_obs.groupby(["event_id", "event_time"])["feature_hash"].nunique().reset_index(name="distinct_feature_hashes") g

For event A, distinct_feature_hashes becomes 2. That’s the smoking gun: identity collision produced feature divergence.


Instrumentation rules I used in production logic

I converted the above into practical checks that stream pipelines can run continuously:

  1. Deduped identity key: decide the identity fields (here: event_id + event_time).
  2. Feature materialization hash: hash the computed features right before writing to the feature store.
  3. Time horizon: only evaluate changes within a bounded window (e.g., your maximum expected lateness).
  4. Alert on divergence: if the distinct feature hash count > 1, emit an alert with:
    • the event_id
    • event_time
    • both feature hashes
    • arrival times

This turns “silent accuracy drift” into a crisp “data inconsistency event”.


The fix I implemented: deterministic merge policy

The root cause was using arrival-time tie-breakers to decide which duplicate wins. That makes the “same identity” unstable.

Instead, I made the merge deterministic using a stable ordering:

  • keep the duplicate with the max payload checksum (or max version number if the upstream provides one),
  • never allow later arrivals to change the stored value once an identity is emitted.

Here’s a deterministic merge for the demo:

  • For each event_id+event_time, pick the row with the highest payload (stable, independent of arrival order).
def deterministic_merge(events_df): # Identity key: event_id + event_time # Merge rule: pick max payload (stable) merged = ( events_df.sort_values("payload", ascending=False) .drop_duplicates(subset=["event_id", "event_time"], keep="first") .sort_values("arrival_time") .reset_index(drop=True) ) return merged merged_fixed = deterministic_merge(df) merged_fixed[["event_id", "payload", "score", "event_time", "arrival_time"]]

Now event A resolves to the same stored score regardless of arrival timing.

Verifying no drift in the observability metric

merged_keys = merged_fixed[["event_id", "event_time"]].copy() fixed_feature_by_key = merged_fixed.assign(feature_hash=merged_fixed["score"].apply(hash_value)) # Compare distinct feature hashes per identity key in the fixed pipeline output fixed_g = fixed_feature_by_key.groupby(["event_id", "event_time"])["feature_hash"].nunique().reset_index(name="distinct_feature_hashes") fixed_g

For A, the distinct count returns to 1.


How I’d wire this into a streaming feature pipeline

In a real pipeline (Kafka Streams, Flink, Spark Structured Streaming), the same pattern holds:

  • Before writing to the feature store, compute a deterministic merge result for each dedupe identity key.
  • Maintain state keyed by the identity key.
  • When late events arrive:
    • they may be allowed to update state for correctness,
    • but the merge policy must be deterministic and stable so that the final materialized features don’t oscillate.

And importantly:

  • run the dedupe inconsistency check as an ongoing query over your late-allowed horizon.

Final takeaway

I learned that “data observability” isn’t just missing-data dashboards—it’s also semantic stability checks. In my case, late arrivals plus an event-time dedupe step with arrival-time tie-breakers caused the “same event identity” to produce different feature values over time. By hashing materialized features per (event_id, event_time) identity and alerting on distinct hashes, I turned silent accuracy drift into a concrete, detectable inconsistency—and then fixed it with a deterministic merge policy that makes duplicates converge to a single, stable feature representation.