Data Science · April 3, 2026

Building A Streaming “Monotonic Timestamp” Guardrail In Kafka Connect

Written by

Sage Stream

A weird class of incidents kept biting me in real-time pipelines: event times were sometimes going backwards. Not just “late-arriving events” in the normal sense—actual non-monotonic timestamps within the same entity stream. That broke rolling window features, caused backfills to double-count, and made model training datasets look haunted.

I wanted a guardrail that fails fast (or at least quarantines) whenever a key’s event timestamps violate monotonicity. What I built is a Kafka Connect SMT (Single Message Transform) that tracks the last seen event timestamp per key and reroutes bad records to a quarantine topic, in the spirit of a dead-letter queue.

Below is what I implemented, how it behaves when you run it, and the exact code.


Problem: timestamps go backwards (within a key)

Imagine you ingest user events like:

  • key: user_id=42
  • event time: 2026-04-03T10:00:01Z
  • next event time for the same key: 2026-04-03T10:00:00Z (oops, earlier)

Even if each machine’s clock looks fine on its own, this can happen because of upstream bugs, clock skew between producers, or incorrect timestamp parsing.

In analytics, rolling features often assume “new events come later.” When the timestamp goes backwards, you get inconsistent aggregates and broken “last event wins” logic.

So I enforced this rule:

For each key, event_timestamp must be >= last seen event_timestamp.

When the rule breaks, I don’t silently accept it—I quarantine the message with metadata explaining why.
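Stripped of Kafka Connect, the rule comes down to a map lookup and one comparison. The following is a minimal sketch of that core check, not the SMT itself; the class name MonotonicRuleSketch and the accept method are hypothetical names used only for illustration.

```java
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

class MonotonicRuleSketch {
    // Last accepted event timestamp per key (a stand-in for the SMT's state).
    private final Map<String, Instant> lastByKey = new HashMap<>();

    /** Returns true if the event is accepted, false if it should be quarantined. */
    boolean accept(String key, Instant ts) {
        Instant last = lastByKey.get(key);
        if (last != null && ts.isBefore(last)) {
            return false; // violation: leave the stored timestamp unchanged
        }
        lastByKey.put(key, ts); // first event for the key, or ts >= last
        return true;
    }

    public static void main(String[] args) {
        MonotonicRuleSketch guard = new MonotonicRuleSketch();
        System.out.println(guard.accept("42", Instant.parse("2026-04-03T10:00:01Z"))); // true
        System.out.println(guard.accept("42", Instant.parse("2026-04-03T10:00:00Z"))); // false
        System.out.println(guard.accept("42", Instant.parse("2026-04-03T10:00:02Z"))); // true
    }
}
```

Note that a rejected event does not move the stored timestamp, so one bad record cannot cause later good records to be rejected.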


Architecture: Kafka Connect + SMT + a dead-letter topic

Kafka Connect processes records through transforms (SMTs). I wrote an SMT that:

  1. Extracts a timestamp field from the record.
  2. Maintains lastTimestampByKey in memory.
  3. If timestamp < lastTimestamp, it:
    • annotates the record with error details
    • forwards it to a quarantine topic

In production, you’d typically persist state (or use a stream processor), but for a guardrail and fast iteration, in-memory state is a great starting point.
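One practical concern with in-memory state is that the map grows with the number of distinct keys. A possible mitigation, sketched here under the assumption that forgetting the least recently used keys is acceptable, is a size-capped LinkedHashMap. The class name BoundedTimestampState and the maxKeys parameter are hypothetical and are not part of the SMT shown later.

```java
import java.time.Instant;
import java.util.LinkedHashMap;
import java.util.Map;

// A size-capped map of last-seen timestamps with least-recently-used eviction.
class BoundedTimestampState extends LinkedHashMap<String, Instant> {
    private final int maxKeys;

    BoundedTimestampState(int maxKeys) {
        super(16, 0.75f, true); // accessOrder=true: get() and put() refresh an entry's recency
        this.maxKeys = maxKeys;
    }

    @Override
    protected boolean removeEldestEntry(Map.Entry<String, Instant> eldest) {
        // Called by LinkedHashMap after each insertion; true evicts the eldest entry.
        return size() > maxKeys;
    }

    public static void main(String[] args) {
        BoundedTimestampState state = new BoundedTimestampState(2);
        state.put("a", Instant.parse("2026-04-03T10:00:00Z"));
        state.put("b", Instant.parse("2026-04-03T10:00:01Z"));
        state.get("a");                                          // "a" is now the most recent
        state.put("c", Instant.parse("2026-04-03T10:00:02Z"));   // evicts "b"
        System.out.println(state.keySet());                      // [a, c]
    }
}
```

Eviction trades accuracy for bounded memory: an evicted key is treated as first-seen on its next event, so a violation right after eviction goes undetected.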

Input record shape

I assumed JSON messages like:

{ "user_id": 42, "event_timestamp": "2026-04-03T10:00:01Z", "event_type": "click" }
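The event_timestamp field is an ISO-8601 instant, which java.time.Instant.parse can read directly. A small sketch of a tolerant parsing helper follows; TimestampParsing and parseEventTimestamp are hypothetical names, and returning an empty Optional on failure is just one way to signal a bad value.

```java
import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.Optional;

class TimestampParsing {
    /** Parses an ISO-8601 instant such as 2026-04-03T10:00:01Z; empty if null or malformed. */
    static Optional<Instant> parseEventTimestamp(Object raw) {
        if (raw == null) {
            return Optional.empty();
        }
        try {
            return Optional.of(Instant.parse(raw.toString()));
        } catch (DateTimeParseException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(parseEventTimestamp("2026-04-03T10:00:01Z").isPresent()); // true
        System.out.println(parseEventTimestamp("2026-04-03 10:00:01").isPresent());  // false
    }
}
```

Values without the T separator or without a zone designator are rejected, so upstream producers that emit local date-times would be flagged rather than silently misread.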

The SMT: MonotonicTimestampGuard (Java)

This SMT implements Kafka Connect’s org.apache.kafka.connect.transforms.Transformation interface.

Step 1: Maven project skeleton

Create a Maven project and include Kafka Connect dependencies.

<!-- pom.xml -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.example</groupId>
  <artifactId>monotonic-timestamp-guard</artifactId>
  <version>1.0.0</version>

  <properties>
    <maven.compiler.release>17</maven.compiler.release>
    <kafka.version>3.7.0</kafka.version>
    <slf4j.version>2.0.13</slf4j.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-transforms</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>connect-api</artifactId>
      <version>${kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-api</artifactId>
      <version>${slf4j.version}</version>
    </dependency>
  </dependencies>
</project>

Step 2: Implement the transform

// src/main/java/com/example/MonotonicTimestampGuard.java
package com.example;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SchemaUtil;
import org.apache.kafka.connect.transforms.util.SimpleConfig;

import java.time.Instant;
import java.time.format.DateTimeParseException;
import java.util.HashMap;
import java.util.Map;

public class MonotonicTimestampGuard<R extends ConnectRecord<R>> implements Transformation<R> {

    // Config keys (they match the transforms.guard.* entries in the connector config).
    public static final String OVERVIEW_KEY_FIELD = "key_field";
    public static final String TIMESTAMP_FIELD = "timestamp_field";
    public static final String QUARANTINE_ERROR_FIELD = "quarantine_error_field";
    public static final String QUARANTINE_TOPIC = "quarantine_topic";
    public static final String DEFAULT_BEHAVIOR = "behavior";

    // behavior:
    //  - "drop"       => drop bad records
    //  - "quarantine" => rewrite the record's topic to the quarantine topic
    private static final String BEHAVIOR_DROP = "drop";
    private static final String BEHAVIOR_QUARANTINE = "quarantine";

    // Connect validates connector configs against this definition, so config() must not return null.
    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define(OVERVIEW_KEY_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                "Value field that identifies the entity.")
        .define(TIMESTAMP_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                "Value field holding an ISO-8601 event timestamp.")
        .define(QUARANTINE_TOPIC, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM,
                "Topic name assigned to quarantined records.")
        .define(QUARANTINE_ERROR_FIELD, ConfigDef.Type.STRING, "quarantine_error", ConfigDef.Importance.LOW,
                "Name of the error field added to quarantined records.")
        .define(DEFAULT_BEHAVIOR, ConfigDef.Type.STRING, BEHAVIOR_QUARANTINE,
                ConfigDef.ValidString.in(BEHAVIOR_DROP, BEHAVIOR_QUARANTINE),
                ConfigDef.Importance.MEDIUM, "What to do with bad records: drop or quarantine.");

    private String keyField;
    private String timestampField;
    private String quarantineTopic;
    private String quarantineErrorField;
    private String behavior;

    // In-memory guardrail state: last seen event timestamp per key.
    private final Map<String, Instant> lastTimestampByKey = new HashMap<>();

    @Override
    public void configure(Map<String, ?> configs) {
        // SimpleConfig applies defaults and fails with a ConfigException on missing required keys.
        SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
        this.keyField = config.getString(OVERVIEW_KEY_FIELD);
        this.timestampField = config.getString(TIMESTAMP_FIELD);
        this.quarantineTopic = config.getString(QUARANTINE_TOPIC);
        this.quarantineErrorField = config.getString(QUARANTINE_ERROR_FIELD);
        this.behavior = config.getString(DEFAULT_BEHAVIOR);

        if (BEHAVIOR_QUARANTINE.equals(behavior) && (quarantineTopic == null || quarantineTopic.isBlank())) {
            throw new ConfigException(QUARANTINE_TOPIC, quarantineTopic, "required when behavior=quarantine");
        }
    }

    @Override
    public R apply(R record) {
        // We assume the value is a Struct (JsonConverter with schemas enabled produces one).
        // If you use byte[] or Map values, adjust this extraction logic.
        if (!(record.value() instanceof Struct value)) {
            return quarantineOrDrop(record, "invalid_record_type", String.valueOf(record.value()));
        }

        Object keyValObj = fieldOrNull(value, keyField);
        if (keyValObj == null) {
            return quarantineOrDrop(record, "missing_key_field", keyField + " was null");
        }
        String keyVal = String.valueOf(keyValObj);

        Object tsObj = fieldOrNull(value, timestampField);
        if (tsObj == null) {
            return quarantineOrDrop(record, "missing_timestamp_field", timestampField + " was null");
        }

        Instant ts;
        try {
            // Supports ISO-8601 timestamps like 2026-04-03T10:00:01Z
            ts = Instant.parse(String.valueOf(tsObj));
        } catch (DateTimeParseException e) {
            return quarantineOrDrop(record, "invalid_timestamp_format", e.getMessage());
        }

        // Monotonic rule: ts must be >= last. The first event for a key is always accepted.
        Instant last = lastTimestampByKey.get(keyVal);
        if (last != null && ts.isBefore(last)) {
            return quarantineOrDrop(record, "non_monotonic_timestamp",
                "timestamp " + ts + " is before last seen " + last + " for key " + keyVal);
        }

        lastTimestampByKey.put(keyVal, ts);
        return record;
    }

    // Struct.get throws for a field name that is not in the schema, so check the schema first.
    private static Object fieldOrNull(Struct struct, String fieldName) {
        return struct.schema().field(fieldName) == null ? null : struct.get(fieldName);
    }

    private R quarantineOrDrop(R record, String errorType, String details) {
        if (BEHAVIOR_DROP.equals(behavior)) {
            // Returning null tells Connect to drop the record.
            return null;
        }

        Schema valueSchema = record.valueSchema();
        Object value = record.value();

        // Annotate Struct values: copy the original fields and add one optional string field.
        if (value instanceof Struct struct) {
            Schema original = struct.schema();
            SchemaBuilder builder = SchemaUtil.copySchemaBasics(original, SchemaBuilder.struct());
            for (Field f : original.fields()) {
                builder.field(f.name(), f.schema());
            }
            builder.field(quarantineErrorField, Schema.OPTIONAL_STRING_SCHEMA);
            Schema annotatedSchema = builder.build();

            Struct annotated = new Struct(annotatedSchema);
            for (Field f : original.fields()) {
                annotated.put(f.name(), struct.get(f));
            }
            annotated.put(quarantineErrorField, errorType + ": " + details);

            valueSchema = annotatedSchema;
            value = annotated;
        }

        // newRecord works for both SourceRecord and SinkRecord and keeps the headers.
        // Values that are not Structs are rerouted without annotation.
        return record.newRecord(quarantineTopic, record.kafkaPartition(),
            record.keySchema(), record.key(), valueSchema, value, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    @Override
    public void close() {
        lastTimestampByKey.clear();
    }
}

Why this approach works

  • The SMT runs inside Kafka Connect, so it happens at ingestion time.
  • State (lastTimestampByKey) lets it enforce monotonicity per key.
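  • Returning null drops the record. Rewriting the topic reroutes it to quarantine when the SMT runs in a source connector, where the record’s topic decides which Kafka topic it is produced to; in a sink connector the rewrite only changes the topic name the sink task sees.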

Step 3: Register the SMT

Kafka Connect loads transforms from the directories listed in the worker’s plugin.path setting, and a connector enables them through its transforms config.

Build the jar:

mvn -q -DskipTests package

Copy the jar to your Connect plugins directory, for example:

cp target/monotonic-timestamp-guard-1.0.0.jar /usr/share/java/kafka-connect/

Step 4: Configure Kafka Connect

Here’s an example sink connector config that uses the SMT.

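Key point: the SMT expects values to arrive as Connect Structs. With JsonConverter that requires value.converter.schemas.enable=true and messages wrapped in a schema/payload envelope; with schemas disabled, values arrive as Maps and the guard flags them as invalid_record_type. Also note that this is a sink connector, so the SMT’s topic rewrite changes only the topic name handed to the sink task and does not produce anything to a Kafka topic.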

{
  "name": "events-sink-with-monotonic-guard",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "tasks.max": "1",
    "topics": "events",
    "file": "/tmp/quarantine-check",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "transforms": "guard",
    "transforms.guard.type": "com.example.MonotonicTimestampGuard",
    "transforms.guard.key_field": "user_id",
    "transforms.guard.timestamp_field": "event_timestamp",
    "transforms.guard.behavior": "quarantine",
    "transforms.guard.quarantine_topic": "events.quarantine",
    "transforms.guard.quarantine_error_field": "quarantine_error"
  }
}

What happens when I run this

  1. A message with a valid timestamp for a key passes through unchanged.
  2. The SMT updates lastTimestampByKey[user_id].
  3. A later message with an earlier event_timestamp:
    • gets a new quarantine_error field like:
      • non_monotonic_timestamp: timestamp ... is before last seen ...
    • has its topic rewritten to events.quarantine.

Step 5: Test with example events

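Produce some test messages to events. The lines after the command are typed on the producer’s stdin, one record per line, in key|value form. Because the connector sets value.converter.schemas.enable=true, JsonConverter expects each value wrapped in a {"schema": ..., "payload": ...} envelope; only the payloads are shown here for brevity:

kafka-console-producer --bootstrap-server localhost:9092 --topic events --property parse.key=true --property "key.separator=|"
42|{"user_id":42,"event_timestamp":"2026-04-03T10:00:01Z","event_type":"click"}
42|{"user_id":42,"event_timestamp":"2026-04-03T10:00:00Z","event_type":"view"}
42|{"user_id":42,"event_timestamp":"2026-04-03T10:00:02Z","event_type":"purchase"}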
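When JsonConverter runs with schemas.enable=true, each message value has to be a JSON object with a schema part and a payload part. The sketch below shows one way to wrap a bare payload for the three fields used in this post. The class name SchemaEnvelope and the chosen field types (int64 for user_id, string for the other two) are assumptions for illustration.

```java
class SchemaEnvelope {
    // JsonConverter schema description for the example record (field types are assumed).
    private static final String SCHEMA =
        "{\"type\":\"struct\",\"fields\":["
        + "{\"field\":\"user_id\",\"type\":\"int64\"},"
        + "{\"field\":\"event_timestamp\",\"type\":\"string\"},"
        + "{\"field\":\"event_type\",\"type\":\"string\"}]}";

    /** Wraps a bare JSON payload in the schema/payload envelope. */
    static String wrap(String payloadJson) {
        return "{\"schema\":" + SCHEMA + ",\"payload\":" + payloadJson + "}";
    }

    public static void main(String[] args) {
        // Prints one envelope that can be pasted as a message value.
        System.out.println(wrap(
            "{\"user_id\":42,\"event_timestamp\":\"2026-04-03T10:00:01Z\",\"event_type\":\"click\"}"));
    }
}
```

The payload string is inserted as-is, so it must already be valid JSON whose fields match the schema.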

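Consume from the quarantine topic (this applies when the SMT runs in a source connector, where the rewritten topic decides where the record is produced):

kafka-console-consumer --bootstrap-server localhost:9092 --topic events.quarantine --from-beginning

You should see only the non-monotonic record, annotated with quarantine_error. With the FileStreamSinkConnector config above, nothing is produced to events.quarantine; all three records are written to /tmp/quarantine-check, and the second one carries the quarantine_error field.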


Data observability payoff: measurable “bad event” rates

Once the guardrail exists, it turns an invisible data quality problem into a visible signal:

  • quarantine topic volume over time
  • breakdown of quarantine_error types
  • per-key repeat offenders (useful for isolating upstream sources)
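As a rough illustration of the breakdown by error type, the sketch below counts quarantine_error values by their prefix. It assumes the "type: details" format used for the error field in this post; QuarantineBreakdown, errorType and countByType are hypothetical names, and reading the values from the quarantine topic is left out.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class QuarantineBreakdown {
    /** Extracts the error type, i.e. the text before the first ": " separator. */
    static String errorType(String quarantineError) {
        int sep = quarantineError.indexOf(": ");
        return sep < 0 ? quarantineError : quarantineError.substring(0, sep);
    }

    /** Counts quarantine_error values per error type, sorted by type name. */
    static Map<String, Long> countByType(List<String> quarantineErrors) {
        Map<String, Long> counts = new TreeMap<>();
        for (String error : quarantineErrors) {
            counts.merge(errorType(error), 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> errors = List.of(
            "non_monotonic_timestamp: timestamp 2026-04-03T10:00:00Z is before last seen 2026-04-03T10:00:01Z for key 42",
            "missing_timestamp_field: event_timestamp was null",
            "non_monotonic_timestamp: timestamp 2026-04-03T09:59:00Z is before last seen 2026-04-03T10:00:02Z for key 42");
        System.out.println(countByType(errors)); // {missing_timestamp_field=1, non_monotonic_timestamp=2}
    }
}
```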

That makes your AI data preparation pipeline more reliable because your training and feature generation logic stops fighting corrupted event time sequences.


Production notes (what I learned the hard way)

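  • State scope matters. In-memory state resets on Connect restart and on task rebalance, and each task keeps its own map, so the check only holds while all records for a key flow through the same task. For robust governance, you’d persist last timestamps (or implement this check in a stream processor with keyed state).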
  • Definition of monotonicity should match your domain. For some pipelines you may allow small backward jitter (e.g., clock skew) by enforcing ts >= last - allowed_lag.
  • Quarantine should be structured. Adding error metadata to the payload is what makes downstream debugging possible without re-running ingestion.
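The relaxed rule ts >= last - allowed_lag from the notes above can be sketched as follows. This is an illustrative variant, not the SMT from this post; LagTolerantGuard and allowedLag are hypothetical names. One design choice worth flagging: the sketch stores the maximum timestamp seen per key rather than the latest one, on the assumption that a run of slightly late events should not drag the threshold backwards.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;

class LagTolerantGuard {
    private final Duration allowedLag;
    // Highest accepted timestamp per key.
    private final Map<String, Instant> highWaterByKey = new HashMap<>();

    LagTolerantGuard(Duration allowedLag) {
        this.allowedLag = allowedLag;
    }

    /** Accepts ts when it is not earlier than (highest seen - allowedLag). */
    boolean accept(String key, Instant ts) {
        Instant high = highWaterByKey.get(key);
        if (high != null && ts.isBefore(high.minus(allowedLag))) {
            return false;
        }
        if (high == null || ts.isAfter(high)) {
            highWaterByKey.put(key, ts); // only ever move the high-water mark forward
        }
        return true;
    }

    public static void main(String[] args) {
        LagTolerantGuard guard = new LagTolerantGuard(Duration.ofSeconds(2));
        System.out.println(guard.accept("42", Instant.parse("2026-04-03T10:00:10Z"))); // true
        System.out.println(guard.accept("42", Instant.parse("2026-04-03T10:00:09Z"))); // true (within lag)
        System.out.println(guard.accept("42", Instant.parse("2026-04-03T10:00:07Z"))); // false
    }
}
```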

Conclusion

I built a Kafka Connect SMT that enforces a per-key monotonic timestamp rule by tracking the last seen event_timestamp and routing violations to a quarantine topic with an explicit quarantine_error. The big win is turning “time went backwards” from a silent, downstream-model-breaking mystery into an immediate, observable data quality event you can measure, debug, and govern.