Core Engineering · April 16, 2026

Designing a Deterministic Outbox for CRDT Event Streams in Go


Written by Maximus Arc

I hit a fun (and annoying) problem while building a service that stores user edits as a stream of CRDT (Conflict-free Replicated Data Type) operations. CRDTs let multiple replicas apply operations in any order and still converge—but only if the exact same operations end up persisted and replayed in the right way.

The bug I chased for days looked like this: after a redeploy, my system sometimes “reordered” a tiny subset of operations. Everything still eventually converged, but some downstream projections (search indexing, audit logs, billing rollups) produced different results for the same logical document.

That led me to a niche idea that ended up being the core fix: a deterministic outbox that turns “whatever order my code happens to enqueue” into “a stable, replayable order based on operation identity.” Below is the design I implemented in Go, including working code you can run.


The symptom: identical CRDTs, different downstream views

I had a pipeline like:

  1. Client sends CRDT ops for a document.
  2. Service validates and appends ops.
  3. Service emits events to other systems (indexer, audit log, etc.).

The data store update and the event emission weren’t atomic. That’s a classic distributed-systems issue: you can persist state but crash before emitting the event, or emit but crash before persisting, etc.

To solve that, I used the Transactional Outbox pattern:

  • In the same database transaction as the state update, I insert an “outbox event” row.
  • A background worker reads outbox rows and publishes them to the message broker.
  • After successful publish, it marks the row as sent.

That pattern fixes “lost events.” It didn’t fix my issue, because my worker could publish outbox rows in a non-deterministic order after restarts (due to concurrency and DB scan order). For most event types that’s fine. For CRDT operation streams, it can make downstream systems temporarily observe different sequences, and some projections treat sequence as meaningful.

So the missing piece was: make the outbox publishing order deterministic across retries and restarts.


The deterministic outbox: stable ordering by operation identity

CRDT operations usually have fields like:

  • docID — which document they apply to
  • actorID — which replica produced it
  • counter — per-actor monotonic counter

A common identity for an operation is (actorID, counter); I used exactly that.
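To make that identity concrete, here is a minimal, self-contained sketch of the ordering it induces. The `CRDTOp` struct mirrors the one used later in the post; `sortOpsByIdentity` is a helper name I made up for illustration:

```go
package main

import (
	"fmt"
	"sort"
)

// CRDTOp mirrors the operation shape used throughout the post.
type CRDTOp struct {
	ActorID string
	Counter int64
	Op      string
}

// sortOpsByIdentity (an illustrative helper) orders ops by their stable
// identity: actorID lexically first, then counter numerically.
func sortOpsByIdentity(ops []CRDTOp) {
	sort.Slice(ops, func(i, j int) bool {
		if ops[i].ActorID != ops[j].ActorID {
			return ops[i].ActorID < ops[j].ActorID
		}
		return ops[i].Counter < ops[j].Counter
	})
}

func main() {
	// A deliberately scrambled batch from two actors.
	ops := []CRDTOp{
		{ActorID: "b", Counter: 2, Op: "insert:X"},
		{ActorID: "a", Counter: 1, Op: "insert:A"},
		{ActorID: "b", Counter: 1, Op: "insert:B"},
	}
	sortOpsByIdentity(ops)
	for _, op := range ops {
		fmt.Printf("%s:%d %s\n", op.ActorID, op.Counter, op.Op)
	}
}
```

However the batch arrives, the sorted result is always the same: every `a` op before every `b` op, and each actor's ops in counter order.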

Key design rule

When publishing events for a given document:

  • Determine a stable sequence key: seqKey = actorID + ":" + zero-padded counter (the padding makes lexicographic order match numeric order, so counter 2 sorts before counter 10)
  • Ensure workers publish events in ascending (docID, seqKey) order.
  • Still keep the outbox reliable: retries are allowed, but ordering remains stable.

Why it works

Even if two replicas interleave operations, their identities define a deterministic total order for projection purposes. Combined with the operation ID, this makes replay idempotent (duplicates are detectable) and consistent (the order never changes across restarts).
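As a sketch of the idempotency half (the names here are illustrative, not from the service): because each actor's counter is monotonic and the outbox delivers each actor's ops in counter order, a downstream projection only needs to remember the highest counter it has applied per actor and skip anything at or below it:

```go
package main

import "fmt"

// Op carries the identity fields of a CRDT operation, per the post.
type Op struct {
	ActorID string
	Counter int64
}

// Projection is an illustrative downstream consumer. Tracking the highest
// applied counter per actor makes replay idempotent when delivery is in
// per-actor counter order, which the deterministic outbox guarantees.
type Projection struct {
	applied map[string]int64 // actorID -> highest counter applied
}

func NewProjection() *Projection {
	return &Projection{applied: make(map[string]int64)}
}

// Apply returns true if the op was applied, false if it was a duplicate.
func (p *Projection) Apply(op Op) bool {
	if op.Counter <= p.applied[op.ActorID] {
		return false // already seen: the redelivery is a no-op
	}
	p.applied[op.ActorID] = op.Counter
	return true
}

func main() {
	proj := NewProjection()
	// {"a", 1} is redelivered, simulating a worker retry after a crash.
	stream := []Op{{"a", 1}, {"b", 1}, {"a", 1}, {"a", 2}}
	for _, op := range stream {
		fmt.Printf("%s:%d applied=%v\n", op.ActorID, op.Counter, proj.Apply(op))
	}
}
```

Note the assumption: this shortcut is safe only because delivery order per actor is stable. With arbitrary reordering, the projection would need a full set of seen op IDs instead of one counter per actor.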


Architecture sketch

Components:

  1. HTTP handler receives CRDT operations
  2. DB transaction:
    • upserts a “document version marker”
    • inserts one outbox row per operation
  3. Outbox worker:
    • reads unsent outbox rows
    • publishes in deterministic order
    • marks rows as sent

For clarity, I used PostgreSQL, Go's database/sql package, and a fake “publisher” that just logs.


Working example (Go + PostgreSQL + outbox worker)

1) Database schema

This schema:

  • stores an outbox row for each operation event
  • has published_at to track success
  • includes a seq_key column so ordering is stable and index-friendly
```sql
-- schema.sql
CREATE TABLE IF NOT EXISTS crdt_outbox (
    id           UUID PRIMARY KEY,
    doc_id       TEXT NOT NULL,
    actor_id     TEXT NOT NULL,
    counter      BIGINT NOT NULL,
    seq_key      TEXT NOT NULL,
    payload      JSONB NOT NULL,
    created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
    published_at TIMESTAMPTZ NULL
);

CREATE INDEX IF NOT EXISTS idx_outbox_unsent_order
    ON crdt_outbox (doc_id, seq_key)
    WHERE published_at IS NULL;

CREATE INDEX IF NOT EXISTS idx_outbox_unsent
    ON crdt_outbox (published_at)
    WHERE published_at IS NULL;
```

2) Go types and the “deterministic outbox” publishing loop

The outbox worker uses a transaction with row locking to avoid duplicate publishes when multiple workers run. It selects a small batch of unsent rows ordered by (doc_id, seq_key), locks them, then publishes.

```go
// main.go
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sort"
	"strings"
	"time"

	"github.com/google/uuid"
	_ "github.com/jackc/pgx/v5/stdlib"
)

type CRDTOp struct {
	ActorID string `json:"actor_id"`
	Counter int64  `json:"counter"`
	Op      string `json:"op"`
}

type OutboxEvent struct {
	DocID  string    `json:"doc_id"`
	Op     CRDTOp    `json:"op"`
	SentAt time.Time `json:"sent_at,omitempty"`
	SeqKey string    `json:"seq_key"`
	OpID   string    `json:"op_id"`
}

func seqKey(actorID string, counter int64) string {
	// Deterministic identity -> deterministic ordering.
	// I kept it simple: actorID lexical order, then counter numeric order.
	// Zero-padding the counter makes lexicographic order match numeric order.
	return actorID + ":" + fmt.Sprintf("%020d", counter)
}

func opID(actorID string, counter int64) string {
	return actorID + ":" + fmt.Sprintf("%d", counter)
}

type Publisher interface {
	Publish(ctx context.Context, event OutboxEvent) error
}

// A fake publisher for demonstration. Replace with Kafka/NATS/etc. in real systems.
type LogPublisher struct{}

func (p LogPublisher) Publish(ctx context.Context, event OutboxEvent) error {
	b, _ := json.Marshal(event)
	log.Printf("[publish] %s", string(b))
	return nil
}

func ensureSchema(ctx context.Context, db *sql.DB) error {
	// For the blog, I keep schema creation explicit.
	_, err := db.ExecContext(ctx, `
		CREATE TABLE IF NOT EXISTS crdt_outbox (
			id           UUID PRIMARY KEY,
			doc_id       TEXT NOT NULL,
			actor_id     TEXT NOT NULL,
			counter      BIGINT NOT NULL,
			seq_key      TEXT NOT NULL,
			payload      JSONB NOT NULL,
			created_at   TIMESTAMPTZ NOT NULL DEFAULT now(),
			published_at TIMESTAMPTZ NULL
		);
		CREATE INDEX IF NOT EXISTS idx_outbox_unsent_order
			ON crdt_outbox (doc_id, seq_key) WHERE published_at IS NULL;
		CREATE INDEX IF NOT EXISTS idx_outbox_unsent
			ON crdt_outbox (published_at) WHERE published_at IS NULL;
	`)
	return err
}

func enqueueOpsTx(ctx context.Context, db *sql.DB, docID string, ops []CRDTOp) error {
	// One transaction:
	// - In a real system, you would also update your CRDT state here.
	// - Then you insert outbox rows for each operation event.
	tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable})
	if err != nil {
		return err
	}
	defer tx.Rollback()

	for _, op := range ops {
		skey := seqKey(op.ActorID, op.Counter)
		e := OutboxEvent{
			DocID:  docID,
			Op:     op,
			SeqKey: skey,
			OpID:   opID(op.ActorID, op.Counter),
		}
		payload, err := json.Marshal(e)
		if err != nil {
			return err
		}
		_, err = tx.ExecContext(ctx, `
			INSERT INTO crdt_outbox (id, doc_id, actor_id, counter, seq_key, payload)
			VALUES ($1, $2, $3, $4, $5, $6)
		`, uuid.New(), docID, op.ActorID, op.Counter, skey, payload)
		if err != nil {
			return err
		}
	}
	return tx.Commit()
}

func publishOutboxDeterministically(ctx context.Context, db *sql.DB, publisher Publisher, workerID string, batchSize int) error {
	// This worker pass:
	// 1) Selects unsent outbox rows ordered by (doc_id, seq_key) and locks them.
	// 2) Publishes them in the exact order they were selected.
	// 3) Marks them as published.
	tx, err := db.BeginTx(ctx, &sql.TxOptions{Isolation: sql.LevelReadCommitted})
	if err != nil {
		return err
	}

	rows, err := tx.QueryContext(ctx, `
		SELECT id, payload
		FROM crdt_outbox
		WHERE published_at IS NULL
		ORDER BY doc_id, seq_key
		LIMIT $1
		FOR UPDATE SKIP LOCKED
	`, batchSize)
	if err != nil {
		tx.Rollback()
		return err
	}
	defer rows.Close()

	type candidate struct {
		id      string
		payload []byte
	}
	var candidates []candidate
	for rows.Next() {
		var id string
		var payload []byte
		if err := rows.Scan(&id, &payload); err != nil {
			tx.Rollback()
			return err
		}
		candidates = append(candidates, candidate{id: id, payload: payload})
	}
	if err := rows.Err(); err != nil {
		tx.Rollback()
		return err
	}

	// Nothing to do.
	if len(candidates) == 0 {
		_ = tx.Rollback()
		return nil
	}

	// Publish in deterministic order: we rely on ORDER BY and keep the slice order.
	for _, c := range candidates {
		var evt OutboxEvent
		if err := json.Unmarshal(c.payload, &evt); err != nil {
			tx.Rollback()
			return err
		}
		evt.SentAt = time.Now().UTC()
		log.Printf("[worker %s] publishing %s for doc=%s seq=%s", workerID, evt.OpID, evt.DocID, evt.SeqKey)
		if err := publisher.Publish(ctx, evt); err != nil {
			tx.Rollback()
			return err
		}
		if _, err := tx.ExecContext(ctx, `
			UPDATE crdt_outbox
			SET published_at = now()
			WHERE id = $1 AND published_at IS NULL
		`, c.id); err != nil {
			tx.Rollback()
			return err
		}
	}
	return tx.Commit()
}

func parseOpsFromJSON(s string) ([]CRDTOp, error) {
	var ops []CRDTOp
	if err := json.Unmarshal([]byte(s), &ops); err != nil {
		return nil, err
	}
	return ops, nil
}

func main() {
	ctx := context.Background()

	dsn := os.Getenv("DATABASE_URL")
	if dsn == "" {
		dsn = "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable"
	}
	db, err := sql.Open("pgx", dsn)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := ensureSchema(ctx, db); err != nil {
		log.Fatal(err)
	}

	publisher := LogPublisher{}

	// Demo workload:
	// I enqueue operations in a deliberately scrambled order,
	// then I run the outbox worker and observe the deterministic publish order.
	//
	// The operations are for the same doc. The worker should publish by (actor_id, counter).
	docID := "doc-123"
	opsScrambled := []CRDTOp{
		{ActorID: "b", Counter: 2, Op: "insert:X"},
		{ActorID: "a", Counter: 1, Op: "insert:A"},
		{ActorID: "a", Counter: 3, Op: "insert:C"},
		{ActorID: "b", Counter: 1, Op: "insert:B"},
	}

	// Show that the input ordering is not sorted.
	{
		var keys []string
		for _, op := range opsScrambled {
			keys = append(keys, seqKey(op.ActorID, op.Counter))
		}
		log.Printf("input seq keys (scrambled): %v", keys)
	}

	if err := enqueueOpsTx(ctx, db, docID, opsScrambled); err != nil {
		log.Fatal(err)
	}

	// Run one worker pass; in production you'd loop continuously.
	if err := publishOutboxDeterministically(ctx, db, publisher, "W1", 100); err != nil {
		log.Fatal(err)
	}

	// Validate empirically: no unsent outbox rows should remain.
	var remaining int
	if err := db.QueryRowContext(ctx, `SELECT count(*) FROM crdt_outbox WHERE published_at IS NULL`).Scan(&remaining); err != nil {
		log.Fatal(err)
	}
	log.Printf("remaining unsent outbox rows: %d", remaining)

	// Optional: clean up (keeps the demo repeatable).
	_, _ = db.ExecContext(ctx, `DELETE FROM crdt_outbox WHERE doc_id = $1`, docID)

	// Additional sanity check: the deterministic ordering function.
	expected := make([]string, 0, len(opsScrambled))
	for _, op := range opsScrambled {
		expected = append(expected, seqKey(op.ActorID, op.Counter))
	}
	sort.Strings(expected)
	log.Printf("expected deterministic order (sorted seq keys): %s", strings.Join(expected, ", "))
}
```

How to run it

1) Start PostgreSQL

For local testing:

docker run --name pg -e POSTGRES_PASSWORD=postgres -e POSTGRES_USER=postgres -e POSTGRES_DB=postgres -p 5432:5432 -d postgres:16

2) Create the table (optional because the Go code does it)

psql "postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable" -f schema.sql

3) Run the Go program

go mod init example.com/outbox
go get github.com/jackc/pgx/v5/stdlib github.com/google/uuid
go run main.go

You’ll see logs like:

  • input seq keys (scrambled): [...]
  • worker publishes events in (doc_id, seq_key) order
  • remaining unsent rows becomes 0

The important part is that even though the input operations were scrambled, the publishing order is stable because:

  • outbox rows carry seq_key
  • worker selects with ORDER BY doc_id, seq_key
  • worker locks rows with FOR UPDATE SKIP LOCKED to avoid duplicate publishes across workers

What I learned (and the design constraints)

  1. Transactional Outbox solves “don’t lose events,” not “publish in the same order.”
    When ordering matters to projections, you must make ordering explicit.

  2. Determinism needs a key, not a vibe.
    I used a concrete operation identity (actor_id, counter) to derive seq_key. That’s stable across restarts and independent of thread scheduling or DB scan behavior.

  3. Don’t rely on concurrency behavior.
    Even with correct transactions, multiple workers and retry paths can reorder delivery. ORDER BY ... plus row locking makes the worker’s action order deterministic.
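A small check for lesson 2: the worker's seqKey pads the counter with fmt.Sprintf("%020d", ...) before concatenating. Without padding, string comparison would put "a:10" before "a:2" and break the order. Here naiveKey is a deliberately wrong variant I wrote only to show the pitfall:

```go
package main

import "fmt"

// seqKey matches the post's key derivation: zero-pad the counter so that
// lexicographic string order equals numeric counter order.
func seqKey(actorID string, counter int64) string {
	return actorID + ":" + fmt.Sprintf("%020d", counter)
}

// naiveKey skips the padding; shown only to demonstrate the pitfall.
func naiveKey(actorID string, counter int64) string {
	return fmt.Sprintf("%s:%d", actorID, counter)
}

func main() {
	// Padded keys sort correctly: counter 2 comes before counter 10.
	fmt.Println(seqKey("a", 2) < seqKey("a", 10)) // true
	// Naive keys sort wrongly: "a:2" > "a:10" lexicographically,
	// because '2' > '1' at the first differing byte.
	fmt.Println(naiveKey("a", 2) < naiveKey("a", 10)) // false
}
```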


Conclusion

I built a deterministic outbox for CRDT operation event streams by combining the Transactional Outbox pattern with explicit, stable ordering derived from operation identity. The end result is that downstream projections see the same sequence after crashes and redeploys, even when worker concurrency and database scan order would otherwise cause reordering.