Core Engineering
April 30, 2026

Lease-Aware In-Process Job Router With Expiring Locks And Exactly-Once-Ish Scheduling

Written by

Maximus Arc

The weekend problem I kept running into

I was building a small system that scheduled short background jobs. Nothing fancy—just a single service, multiple worker threads, and an in-memory queue. The catch was reliability:

  • Jobs arrive continuously.
  • Workers process jobs concurrently.
  • A job might be retried if it fails.
  • A worker crash must not permanently “steal” a job.
  • I didn’t want “exactly-once” in the strongest database sense, but I did want an exactly-once-ish effect: the same job should be processed by at most one worker at a time, and stale locks should recover automatically.

The bug pattern was always the same: workers would grab jobs and “hold” them forever when they crashed mid-processing. Locks existed, but they never expired.

So I implemented a lease-aware in-process job router: workers acquire a lease (a time-bounded lock) before processing; leases automatically expire; retries create new attempts; and a central router keeps the scheduling consistent.

This post documents what I built and the design decisions that made it behave.


The core idea: leases, not forever-locks

A lease is a lock that expires after a fixed duration unless renewed. In practice:

  1. A worker requests a job.
  2. The router chooses a job and issues a lease with an expiration timestamp.
  3. The worker processes the job.
  4. If the worker finishes in time, it completes the job and releases the lease.
  5. If the worker dies or hangs, the lease expires and the job becomes available again.

This pattern is common in distributed systems, but I used it in a single process with multiple worker threads because the failure mode (worker crash) still matters.


Data model: job attempts with dedup by job id

I modeled each job with a stable job_id and an attempt counter:

  • job_id: logical identity for dedup and idempotency at the router level.
  • attempt: increments on retries.
  • status: pending, running, done, or failed.
  • lease_expiry: when the “running” lock expires.

Key property: if a lease expires, the router is allowed to re-run the job by moving it back to pending and increasing attempt.


Working implementation (single process)

Below is a complete, runnable example in Python. It uses:

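  • A router object that tracks job states behind a single lock and reclaims expired leases lazily, when a worker requests a job (there is no separate router thread).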
  • Multiple worker threads that acquire leases by requesting a job from the router.
  • A tiny in-memory store that supports thread-safe transitions.

Full code

```python
import threading
import time
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass
class Job:
    job_id: str
    payload: str
    attempt: int = 0
    status: str = "pending"  # pending, running, done, failed
    lease_expiry: float = 0.0
    lease_owner: Optional[str] = None  # worker id
    last_error: Optional[str] = None
    created_at: float = field(default_factory=time.time)
    updated_at: float = field(default_factory=time.time)


class LeaseAwareRouter:
    """
    In-process job router with expiring leases.

    - Workers call get_job(worker_id) to acquire a lease for a pending job.
    - Router reclaims expired leases lazily (on each get_job and complete_job call).
    - Workers call complete_job(worker_id, job_id, success, error) to finalize.
    """

    def __init__(self, lease_seconds: float = 2.0):
        self.lease_seconds = lease_seconds
        self._lock = threading.RLock()
        self._jobs: Dict[str, Job] = {}  # job_id -> Job
        # For visibility in the demo
        self._events: List[str] = []

    def add_job(self, job_id: str, payload: str) -> None:
        with self._lock:
            now = time.time()
            if job_id not in self._jobs:
                self._jobs[job_id] = Job(job_id=job_id, payload=payload)
                self._events.append(f"[{now:.3f}] add_job job_id={job_id}")
            else:
                # Dedup behavior: keep existing attempt/status; do not enqueue duplicates.
                # In real systems you'd decide whether to merge or ignore.
                self._events.append(f"[{now:.3f}] add_job ignored (dedup) job_id={job_id}")

    def _reclaim_expired_leases_locked(self, now: float) -> None:
        # Any job in running state whose lease has expired becomes pending again.
        # The caller must already hold self._lock.
        for job in self._jobs.values():
            if job.status == "running" and job.lease_expiry <= now:
                job.status = "pending"
                job.lease_owner = None
                job.attempt += 1
                job.lease_expiry = 0.0
                job.last_error = "lease expired; reclaimed by router"
                job.updated_at = now
                self._events.append(
                    f"[{now:.3f}] reclaim job_id={job.job_id} attempt={job.attempt}"
                )

    def get_job(self, worker_id: str) -> Optional[Tuple[str, str, int]]:
        """
        Acquire a lease for one eligible job.

        Returns: (job_id, payload, attempt) or None if no jobs are available.
        """
        with self._lock:
            now = time.time()
            self._reclaim_expired_leases_locked(now)

            # Pick the oldest pending job first.
            candidates = [job for job in self._jobs.values() if job.status == "pending"]
            if not candidates:
                return None

            job = min(candidates, key=lambda j: j.created_at)
            job.status = "running"
            job.lease_owner = worker_id
            job.lease_expiry = now + self.lease_seconds
            job.updated_at = now
            self._events.append(
                f"[{now:.3f}] lease_acquired job_id={job.job_id} owner={worker_id} "
                f"attempt={job.attempt} lease_until={job.lease_expiry:.3f}"
            )
            return job.job_id, job.payload, job.attempt

    def complete_job(
        self, worker_id: str, job_id: str, success: bool, error: Optional[str] = None
    ) -> bool:
        """
        Finalize a job if (and only if) the worker still owns the active lease.

        This prevents a late completion from a stale worker from corrupting state.
        """
        with self._lock:
            now = time.time()
            # Reclaim first, so a lease that expired before this call is never
            # honored, even if no get_job call has noticed the expiry yet.
            self._reclaim_expired_leases_locked(now)

            job = self._jobs.get(job_id)
            if not job:
                return False

            # Lease ownership check: if someone else owns the lease now, ignore.
            if job.status != "running" or job.lease_owner != worker_id:
                self._events.append(
                    f"[{now:.3f}] complete_ignored job_id={job_id} owner={worker_id} "
                    f"(state={job.status}, lease_owner={job.lease_owner})"
                )
                return False

            if success:
                job.status = "done"
                job.last_error = None
                self._events.append(
                    f"[{now:.3f}] complete_success job_id={job_id} owner={worker_id}"
                )
            else:
                job.status = "failed"
                job.last_error = error or "unknown error"
                self._events.append(
                    f"[{now:.3f}] complete_failed job_id={job_id} owner={worker_id} err={error}"
                )
            job.lease_owner = None
            job.lease_expiry = 0.0
            job.updated_at = now
            return True

    def dump_events(self) -> List[str]:
        with self._lock:
            return list(self._events)

    def snapshot(self) -> Dict[str, Job]:
        with self._lock:
            return {k: Job(**vars(v)) for k, v in self._jobs.items()}


def worker_loop(
    router: LeaseAwareRouter,
    worker_id: str,
    stop_event: threading.Event,
    behavior_map: Dict[str, float],
) -> None:
    """
    behavior_map maps job_id -> processing_time_seconds.
    If processing_time is negative, simulate a crash (worker dies mid-job).
    """
    while not stop_event.is_set():
        got = router.get_job(worker_id)
        if got is None:
            time.sleep(0.05)
            continue

        job_id, payload, attempt = got
        process_time = behavior_map.get(job_id, 0.3)
        start = time.time()

        try:
            # Simulate crash: negative time means abrupt stop (no completion call).
            if process_time < 0:
                raise RuntimeError(f"simulated worker crash on job_id={job_id}")

            # Simulate long processing to exceed lease expiry if needed.
            while time.time() - start < process_time:
                time.sleep(0.02)

            router.complete_job(worker_id, job_id, success=True)
        except Exception as e:
            # Crucially: do NOT call complete_job here.
            # The router will reclaim when the lease expires.
            # Only this worker dies; the others keep running.
            print(f"[{time.time():.3f}] {worker_id} crashed: {e}")
            return


def demo():
    router = LeaseAwareRouter(lease_seconds=1.0)

    # Add three jobs. We'll make one job take longer than the lease to force reclamation.
    router.add_job("job-A", "alpha")
    router.add_job("job-B", "bravo")
    router.add_job("job-C", "charlie")

    behavior_map = {
        "job-A": 0.2,   # quick success
        "job-B": 2.0,   # exceeds lease_seconds on every attempt => reclaimed and re-run
        "job-C": -1.0,  # simulate crash mid-job (no completion call)
    }

    stop_event = threading.Event()
    workers = []
    for i in range(2):
        wid = f"worker-{i}"
        t = threading.Thread(
            target=worker_loop,
            args=(router, wid, stop_event, behavior_map),
            daemon=True,
        )
        workers.append(t)

    # Start workers
    for t in workers:
        t.start()

    # Let the demo run long enough for lease expirations and retries to happen.
    time.sleep(4.0)
    stop_event.set()

    # Print router event trace
    print("\n=== Router events ===")
    for ev in router.dump_events():
        print(ev)

    print("\n=== Final snapshot ===")
    snap = router.snapshot()
    for job_id, job in snap.items():
        print(
            f"{job_id}: status={job.status}, attempt={job.attempt}, "
            f"lease_owner={job.lease_owner}, last_error={job.last_error}"
        )


if __name__ == "__main__":
    demo()
```

Walkthrough: what happens when I run it

1) Jobs are added (deduped by job_id)

When I call add_job("job-A", ...), the router stores it with:

  • status="pending"
  • attempt=0

If I add the same job_id again later, the router ignores it. That keeps the demo focused on lease behavior.
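The dedup rule amounts to "first write wins". A stripped-down sketch of just that rule, using a plain dict instead of the router; `add_job_once` is a hypothetical helper for illustration, not part of the code above:

```python
from typing import Dict


def add_job_once(jobs: Dict[str, dict], job_id: str, payload: str) -> bool:
    """Insert the job only if job_id is new; return True when it was added."""
    if job_id in jobs:
        return False  # duplicate: leave the existing attempt/status untouched
    jobs[job_id] = {"payload": payload, "attempt": 0, "status": "pending"}
    return True


jobs: Dict[str, dict] = {}
first = add_job_once(jobs, "job-A", "alpha")
second = add_job_once(jobs, "job-A", "alpha-again")
print(first, second, jobs["job-A"]["payload"])
```

The second call changes nothing, so the stored payload is still the one from the first call.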

2) Workers request jobs and acquire leases

Workers call get_job(worker_id). Inside get_job:

  • The router calls _reclaim_expired_leases_locked(now) to move any expired running jobs back to pending and bump attempt.
  • It selects the oldest pending job.
  • It sets:
    • status="running"
    • lease_owner=worker_id
    • lease_expiry=now + lease_seconds

The event log will show lines like:

  • lease_acquired job_id=job-B owner=worker-0 attempt=0 lease_until=...

3) Job B runs longer than the lease and gets reclaimed

job-B is configured with processing_time_seconds=2.0, while lease_seconds=1.0.

So the worker acquires a lease, starts processing, but the lease expires before completion.

When another get_job() call happens, the router reclaims:

  • status: running -> pending
  • attempt increments
  • lease_owner cleared

Then the job can be leased again to a worker.
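The reclaim sequence is easier to follow with the clock passed in explicitly. The sketch below is a single-threaded toy, not the router itself: `MiniRouter` and its `now` parameters are hypothetical, introduced only so the timeline is deterministic.

```python
from typing import Dict, Optional


class MiniRouter:
    """Single-threaded toy router with an injected clock (illustration only)."""

    def __init__(self, lease_seconds: float) -> None:
        self.lease_seconds = lease_seconds
        self.jobs: Dict[str, dict] = {}

    def add_job(self, job_id: str) -> None:
        self.jobs.setdefault(
            job_id, {"status": "pending", "owner": None, "expiry": 0.0, "attempt": 0}
        )

    def _reclaim(self, now: float) -> None:
        # Expired running jobs go back to pending and get a new attempt number.
        for job in self.jobs.values():
            if job["status"] == "running" and job["expiry"] <= now:
                job.update(status="pending", owner=None, expiry=0.0)
                job["attempt"] += 1

    def get_job(self, worker_id: str, now: float) -> Optional[str]:
        self._reclaim(now)  # lazy: an expired lease is only noticed on a call
        for job_id, job in self.jobs.items():
            if job["status"] == "pending":
                job.update(
                    status="running", owner=worker_id, expiry=now + self.lease_seconds
                )
                return job_id
        return None


router = MiniRouter(lease_seconds=1.0)
router.add_job("job-B")
print(router.get_job("worker-0", now=0.0))  # worker-0 leases job-B until t=1.0
print(router.get_job("worker-1", now=0.5))  # lease still live: nothing to hand out
print(router.get_job("worker-1", now=1.2))  # lease expired: reclaimed and re-leased
print(router.jobs["job-B"]["attempt"], router.jobs["job-B"]["owner"])
```

Because reclamation is lazy, an expired job stays in the running state until some worker next calls into the router.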

4) The “late completion” is ignored (important!)

In this demo, the worker for job-B eventually calls complete_job, well after its lease has lapsed:

  • Once the router has reclaimed the expired lease, job-B is pending again or already leased to another worker, so the original worker no longer owns it.
  • complete_job() checks:
if job.status != "running" or job.lease_owner != worker_id: return False

So a worker that lost the lease cannot accidentally mark the job done after it should have been retried.
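The check quoted above compares ownership only, so it depends on the router having reclaimed the job before the late completion arrives. One way to remove that dependency is to compare the deadline as well. A minimal sketch, assuming a job record with the same three fields; `JobState` and `may_complete` are hypothetical names:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobState:
    status: str
    lease_owner: Optional[str]
    lease_expiry: float


def may_complete(job: JobState, worker_id: str, now: float) -> bool:
    """True only if worker_id holds a lease that is still live on a running job."""
    return (
        job.status == "running"
        and job.lease_owner == worker_id
        and now < job.lease_expiry
    )


job = JobState(status="running", lease_owner="worker-0", lease_expiry=1.0)
print(may_complete(job, "worker-0", now=0.8))  # owner, lease still live
print(may_complete(job, "worker-0", now=2.0))  # owner, but the lease has expired
print(may_complete(job, "worker-1", now=0.8))  # a different worker holds the lease
```

Only the first call is accepted; the other two are exactly the late and foreign completions the router needs to ignore.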

This is the central “exactly-once-ish” safeguard.

5) Job C simulates a crash

For job-C, I set processing time to -1.0, which triggers an exception and exits the worker loop without calling complete_job.

Result: there’s no completion callback. The router can only recover via lease reclamation. That’s why lease expiry is non-negotiable.


Why this design works (and what it does not guarantee)

This router enforces:

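  • No two workers hold a lease on the same job at the same time, because get_job only hands out pending jobs and does so under a single lock. A worker that overruns its lease can still be executing when the next lease holder starts, so the job body itself may overlap.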
  • Automatic recovery from worker crashes, because leases expire.
  • Late completions don’t corrupt state, because completion validates current lease ownership.

It does not provide strict exactly-once delivery semantics, because a job can be reprocessed after lease expiry. That’s intentional. The typical production answer is pairing this with idempotent job execution (e.g., storing results keyed by job_id + attempt or by a deterministic idempotency key).
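As a rough illustration of the idempotency side, the sketch below records one result per key and skips the side effect on a repeat. `ResultStore`, `run_once`, and `send_email` are hypothetical names, and the in-memory dict stands in for whatever durable store a real system would use.

```python
import threading
from typing import Callable, Dict, List


class ResultStore:
    """Hypothetical in-memory store: the job body runs at most once per key."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._results: Dict[str, str] = {}

    def run_once(self, key: str, fn: Callable[[], str]) -> str:
        # Holding the lock across fn() keeps check-and-run atomic, at the cost of
        # serializing all jobs. Fine for a sketch, too coarse for real workloads.
        with self._lock:
            if key not in self._results:
                self._results[key] = fn()
            return self._results[key]


calls: List[str] = []


def send_email() -> str:
    calls.append("sent")  # stands in for a real side effect
    return "ok"


store = ResultStore()
store.run_once("job-B", send_email)  # first attempt performs the side effect
store.run_once("job-B", send_email)  # retry after a reclaim: effect is skipped
print(len(calls))
```

Keying by job_id alone makes every retry a no-op once one attempt has succeeded, which is usually what "exactly-once-ish" is after.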


Extending the router without changing the core

Two small but common evolutions:

  1. Lease renewal (heartbeats)
    Add a renew_lease(worker_id, job_id) method to extend lease_expiry while processing. This helps long-running jobs avoid unnecessary retries.

  2. Backoff and max attempts
    Instead of reclaiming forever, track attempt limits and schedule backoff delays. That prevents hot-loop retries during persistent failures.
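A renewal method could look roughly like the sketch below. It is written as a free function over a small record so it runs on its own; `LeasedJob` is a hypothetical stand-in for the router's job record, and inside the router the same checks would run under the router lock.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class LeasedJob:
    status: str
    lease_owner: Optional[str]
    lease_expiry: float


def renew_lease(
    job: LeasedJob, worker_id: str, now: float, lease_seconds: float
) -> bool:
    """Extend the lease only if worker_id still holds a live one."""
    if job.status != "running" or job.lease_owner != worker_id:
        return False
    if job.lease_expiry <= now:
        return False  # too late: the lease lapsed, so leave it for reclamation
    job.lease_expiry = now + lease_seconds
    return True


job = LeasedJob(status="running", lease_owner="worker-0", lease_expiry=1.0)
print(renew_lease(job, "worker-0", now=0.5, lease_seconds=1.0), job.lease_expiry)
print(renew_lease(job, "worker-1", now=0.6, lease_seconds=1.0))  # not the owner
print(renew_lease(job, "worker-0", now=5.0, lease_seconds=1.0))  # already expired
```

A worker would typically call this at some fraction of lease_seconds while processing, and stop working on the job as soon as a renewal returns False.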

Both improvements keep the same invariant: completion must validate current lease ownership.
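The backoff and max-attempts idea can be sketched as a pure function that decides what happens to a job whose lease just expired. Everything here is an assumption for illustration: the name `on_lease_expired`, the default limits, and a `not_before` timestamp that get_job would have to respect by skipping pending jobs whose `not_before` is still in the future.

```python
from typing import Tuple


def on_lease_expired(
    attempt: int,
    now: float,
    max_attempts: int = 5,
    base_delay: float = 0.5,
    max_delay: float = 30.0,
) -> Tuple[str, int, float]:
    """Return (status, attempt, not_before) for a job whose lease just expired."""
    next_attempt = attempt + 1
    if next_attempt >= max_attempts:
        return "failed", next_attempt, 0.0  # give up instead of retrying forever
    # Exponential backoff, capped: base_delay, 2x, 4x, ... up to max_delay.
    delay = min(max_delay, base_delay * (2 ** attempt))
    return "pending", next_attempt, now + delay


for attempt in range(5):
    print(on_lease_expired(attempt, now=100.0))
```

With these defaults the first four expiries reschedule the job with growing delays, and the fifth marks it failed.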


Conclusion

I built a lease-aware in-process job router that schedules jobs with expiring locks, automatically reclaims abandoned work, and prevents stale workers from corrupting job state via strict lease ownership checks. The key lesson is that “locks” without expiration are reliability bugs waiting to happen—leases turn that into a recoverable, observable workflow.