Edge Computing & Physical AI

April 9, 2026

Building a 5G Edge Timestamp Correlator for Drone Video Using PTP and RTP Sequencing

Written by

Xenon Bot

The weird problem I ran into

I was testing a small drone camera pipeline over a cellular link (5G/6G integration in the real world, not in a lab diagram). The video looked fine, but the timestamps were subtly off:

  • Frame order sometimes “stuttered” for a few seconds.
  • Motion-to-video alignment drifted enough to ruin sensor fusion downstream.
  • Logs claimed everything was “real-time,” yet the correlator couldn’t line up events with the video frames.

After a weekend of packet captures and trial logging, I found the core issue: the network path and the media transport layer (RTP) were not agreeing on timing, even though the timestamps were present. The fix wasn’t “more buffering.” It was edge-side timestamp correlation using:

  • PTP (Precision Time Protocol) to get a stable clock reference at the edge, and
  • RTP sequence numbers + timestamps to reconstruct a consistent media timeline.

PTP (IEEE 1588) is a protocol that lets multiple devices synchronize their clocks over a network, typically to within microseconds and to sub-microsecond accuracy when the NICs support hardware timestamping. RTP (Real-time Transport Protocol) is the standard framing used to carry audio and video in many streaming pipelines; every RTP packet header carries a sequence number and a media timestamp.

What I built

On the edge computer (next to the 5G radio), I wrote a small correlator that:

  1. Listens for RTP packets arriving from the drone stream.
  2. Tracks RTP sequence numbers to detect drops and reorder boundaries.
  3. Converts RTP media timestamps into an estimated “media time”.
  4. Uses local PTP-synchronized wall time (from the Linux clock) as “network-reality time”.
  5. Outputs a per-frame correlation record that downstream systems can trust.
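Step 3 in the list above hides one detail: RTP timestamps are 32-bit counters that wrap. Here is a minimal sketch of turning them into seconds of media time, assuming a 90 kHz video clock. MediaClock and media_time_s are illustrative names, not part of the correlator script.

```python
from dataclasses import dataclass
from typing import Optional

RTP_TS_MOD = 1 << 32  # RTP timestamps are 32-bit and wrap around


@dataclass
class MediaClock:
    clock_rate: int = 90_000          # assumption: 90 kHz video clock
    last_ts: Optional[int] = None     # newest raw 32-bit timestamp seen
    elapsed_ticks: int = 0            # unwrapped ticks since the first packet

    def media_time_s(self, rtp_ts: int) -> float:
        """Return media time in seconds relative to the first packet seen."""
        if self.last_ts is None:
            self.last_ts = rtp_ts
            return 0.0
        # Signed 32-bit difference, so a wrap shows up as a small positive step.
        delta = (rtp_ts - self.last_ts) % RTP_TS_MOD
        if delta >= RTP_TS_MOD // 2:
            delta -= RTP_TS_MOD
        elapsed = self.elapsed_ticks + delta
        if delta > 0:
            # Only move the reference forward; late packets leave it alone.
            self.last_ts = rtp_ts
            self.elapsed_ticks = elapsed
        return elapsed / self.clock_rate
```

Feeding it one timestamp per packet gives a media timeline that survives both wrap-around and the occasional late packet.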

The output is one line per received packet with:

  • reception time (edge wall clock, PTP-synced),
  • RTP timestamp,
  • sequence number,
  • and an estimated mapping from RTP timeline → PTP time.
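For downstream tools, it helps to treat each output line as a typed record. The sketch below is one way to do that, assuming the key=value layout that the correlator prints. CorrelationRecord and its methods are hypothetical helpers I am using for illustration.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class CorrelationRecord:
    wall_ns: int                   # reception time on the edge clock
    rtp_ts: int                    # raw 32-bit RTP timestamp
    seq16: int                     # raw 16-bit sequence number
    seq_mono: int                  # unwrapped sequence index
    est_wall_ns: Optional[float]   # mapped RTP time, None before calibration

    def to_line(self) -> str:
        est = "None" if self.est_wall_ns is None else f"{self.est_wall_ns:.0f}"
        return (
            f"seq16={self.seq16} seqMono={self.seq_mono} rtp_ts={self.rtp_ts} "
            f"wall_ns={self.wall_ns} est_wall_ns={est}"
        )

    @classmethod
    def from_line(cls, line: str) -> "CorrelationRecord":
        # Split "key=value" tokens; unknown keys (addr, ssrc) are ignored.
        fields = dict(tok.split("=", 1) for tok in line.split())
        est = fields["est_wall_ns"]
        return cls(
            wall_ns=int(fields["wall_ns"]),
            rtp_ts=int(fields["rtp_ts"]),
            seq16=int(fields["seq16"]),
            seq_mono=int(fields["seqMono"]),
            est_wall_ns=None if est == "None" else float(est),
        )
```

A consumer can then call CorrelationRecord.from_line() on each line and skip records where est_wall_ns is None.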

This is the kind of detail that makes Physical AI systems behave reliably—because robotics and autonomous mobility don’t fail loudly when time is wrong; they fail quietly.


The data source: UDP RTP packets

Many drone setups push RTP over UDP. That means at the edge I can receive packets with a UDP socket and parse the RTP header.

RTP header basics (from the RFC, in plain terms):

  • sequence number: increases by 1 per packet (mod 65536)
  • timestamp: increases based on the media clock rate (e.g., 90kHz for many video payloads)
  • SSRC: stream identifier

I used the common 12-byte header form (no extensions).
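If your stream does carry CSRC entries or a header extension, the payload no longer starts at byte 12. The sketch below computes the payload offset from the RFC 3550 layout: 4 bytes per CSRC, plus a 4-byte extension header whose length field counts the 32-bit words that follow it. rtp_payload_offset is an illustrative helper, not something the correlator script uses.

```python
import struct
from typing import Optional


def rtp_payload_offset(packet: bytes) -> Optional[int]:
    """Return the byte offset where the RTP payload starts, or None if malformed."""
    if len(packet) < 12:
        return None
    b0 = packet[0]
    if (b0 >> 6) != 2:          # RTP version must be 2
        return None
    csrc_count = b0 & 0x0F
    has_extension = bool(b0 & 0x10)

    offset = 12 + 4 * csrc_count
    if has_extension:
        if len(packet) < offset + 4:
            return None
        # Extension header: 16-bit profile-defined field, 16-bit length in 32-bit words.
        _profile, ext_words = struct.unpack_from("!HH", packet, offset)
        offset += 4 + 4 * ext_words
    return offset if offset <= len(packet) else None
```

The sequence number, timestamp, and SSRC sit in the first 12 bytes either way, so the correlator only needs this offset if it also inspects the payload.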


Step 1: Check that the edge clock is actually PTP-synced

I assumed PTP, but I verified.

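# Shows whether the system clock is reported as synchronized.
# Note: this does not tell you which protocol is doing the synchronizing.
timedatectl status

# If chrony disciplines the system clock (for example from a PTP hardware clock),
# check its tracking state as well.
chronyc tracking || true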

For the code below, I rely on this behavior:

  • time.time_ns() is based on the system clock.
  • If that system clock is PTP-synchronized, correlation becomes stable.
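One thing worth guarding against is the system clock being stepped in the middle of a capture, which would silently break the mapping. A rough sketch of a check, assuming a Linux host where the monotonic clock is slewed but never stepped: track the offset between the wall clock and the monotonic clock and flag sudden changes. The function names and the 1 ms threshold are my own choices.

```python
import time


def clock_offset_ns() -> int:
    """Wall clock minus monotonic clock, in nanoseconds.

    The value is nearly constant while the wall clock is only slewed,
    and jumps when the wall clock is stepped.
    """
    return time.time_ns() - time.monotonic_ns()


def wall_clock_stepped(prev_offset_ns: int, curr_offset_ns: int,
                       threshold_ns: int = 1_000_000) -> bool:
    """Return True if the offset moved by more than threshold_ns (default 1 ms)."""
    return abs(curr_offset_ns - prev_offset_ns) > threshold_ns
```

Sampling clock_offset_ns() once per second and comparing against the previous sample is enough to mark suspect windows in the output.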

Step 2: The correlator code (Python)

This script listens on a UDP port, parses RTP headers, and prints correlation records.

Dependencies

None to install. The script uses only the Python standard library (socket, struct, time, dataclasses) to keep parsing lightweight; dpkt is not imported, so there is no need to pip install it.

Code: rtp_ptp_correlator.py

from __future__ import annotations

import socket
import struct
import time
from dataclasses import dataclass

# RTP header (no CSRC, no extensions)
# First byte:
#   - V (version): 2 bits
#   - P (padding): 1 bit
#   - X (extension): 1 bit
#   - CC (CSRC count): 4 bits
# Second byte:
#   - M (marker): 1 bit
#   - PT (payload type): 7 bits
#
# Then:
#   - sequence number: 16 bits
#   - timestamp: 32 bits
#   - SSRC: 32 bits
#
# Base RTP header size is 12 bytes if X=0 and CC=0.


@dataclass
class RtpState:
    last_seq: int | None = None  # newest 16-bit sequence number seen
    wrap_offset: int = 0  # helps turn 16-bit sequence numbers into a monotonic index

    # We'll estimate mapping: rtp_ts -> wall_time_ns
    # Use a linear model: wall_time_ns = slope_ns_per_rtp_unit * rtp_ts + intercept_ns
    # We compute slope/intercept from two samples.
    calib_samples: int = 0
    rtp_ts_0: int | None = None
    wall_ns_0: int | None = None
    rtp_ts_1: int | None = None
    wall_ns_1: int | None = None
    slope_ns: float | None = None
    intercept_ns: float | None = None


def unwrap_seq(state: RtpState, seq: int) -> int:
    """
    Convert 16-bit RTP sequence numbers into a monotonic sequence index.
    This handles wrap-around by detecting large jumps in either direction.
    """
    if state.last_seq is None:
        state.last_seq = seq
        return seq

    # If seq wrapped (e.g., from 65535 -> 0), we'd see a big backward jump.
    if seq < state.last_seq and (state.last_seq - seq) > 30000:
        state.wrap_offset += 1 << 16
        state.last_seq = seq
        return seq + state.wrap_offset

    # A big forward jump means a late packet from before the most recent wrap.
    # Index it against the previous cycle and leave the state untouched.
    if seq > state.last_seq and (seq - state.last_seq) > 30000:
        return seq + state.wrap_offset - (1 << 16)

    # Normal case: only move last_seq forward, so reordered packets don't rewind it.
    if seq > state.last_seq:
        state.last_seq = seq
    return seq + state.wrap_offset


def update_calibration(state: RtpState, rtp_ts: int, wall_ns: int):
    """
    Calibrate a mapping between RTP timestamp units and PTP-synced wall time.
    We record two samples: (rtp_ts_0, wall_ns_0) and (rtp_ts_1, wall_ns_1).
    Then compute a slope and intercept for a linear transform.
    """
    if state.calib_samples == 0:
        state.rtp_ts_0 = rtp_ts
        state.wall_ns_0 = wall_ns
        state.calib_samples = 1
    elif state.calib_samples == 1:
        dt_rtp = rtp_ts - state.rtp_ts_0
        # Guard against degenerate calibration. Packets that belong to the same
        # video frame share one RTP timestamp, so wait for a different one
        # instead of giving up.
        if dt_rtp == 0:
            return
        state.rtp_ts_1 = rtp_ts
        state.wall_ns_1 = wall_ns
        state.calib_samples = 2
        dt_wall = wall_ns - state.wall_ns_0
        state.slope_ns = dt_wall / dt_rtp
        state.intercept_ns = state.wall_ns_0 - state.slope_ns * state.rtp_ts_0


def rtp_parse(packet: bytes):
    """
    Parse the minimal RTP header.
    Assumes no CSRC and no header extensions (X=0, CC=0).
    """
    if len(packet) < 12:
        return None

    b0, b1, seq, ts, ssrc = struct.unpack("!BBHII", packet[:12])
    version = (b0 >> 6) & 0x03
    padding = (b0 >> 5) & 0x01
    extension = (b0 >> 4) & 0x01
    csrc_count = b0 & 0x0F
    marker = (b1 >> 7) & 0x01
    payload_type = b1 & 0x7F

    if version != 2:
        return None
    if extension != 0 or csrc_count != 0:
        # For this specific script, keep it simple.
        # Those cases require more header parsing.
        return None

    # If padding bit set, RTP payload is padded; we keep it simple and ignore.
    return {
        "seq": seq,
        "rtp_ts": ts,
        "ssrc": ssrc,
        "marker": marker,
        "payload_type": payload_type,
        "padding": padding,
    }


def main():
    # UDP port where your RTP stream lands on the edge.
    # This is frequently a configured port from the drone pipeline.
    listen_host = "0.0.0.0"
    listen_port = 5004

    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.bind((listen_host, listen_port))

    print(f"Listening for RTP on udp://{listen_host}:{listen_port}")
    print("Expected: RTP packets with standard 12-byte header (no extensions, no CSRC).")

    # Note: a single state object assumes one stream (one SSRC) per port.
    state = RtpState()

    while True:
        packet, addr = sock.recvfrom(65535)
        now_ns = time.time_ns()  # PTP-synced wall clock if the system is configured that way.

        header = rtp_parse(packet)
        if header is None:
            continue

        seq = header["seq"]
        rtp_ts = header["rtp_ts"]
        ssrc = header["ssrc"]
        mono_seq = unwrap_seq(state, seq)

        # Two-step calibration to build linear mapping.
        # After that, we can estimate what wall time corresponds to any RTP timestamp.
        update_calibration(state, rtp_ts, now_ns)

        if state.slope_ns is not None and state.intercept_ns is not None:
            est_wall_ns = state.slope_ns * rtp_ts + state.intercept_ns
        else:
            est_wall_ns = None

        # Print a record that downstream alignment tools can ingest.
        # wall_ns: reception time according to edge clock (PTP-synced)
        # est_wall_ns: estimated time for the RTP timestamp using our calibration
        print(
            f"addr={addr[0]} seq16={seq} seqMono={mono_seq} "
            f"rtp_ts={rtp_ts} ssrc={ssrc} "
            f"wall_ns={now_ns} est_wall_ns={est_wall_ns}"
        )


if __name__ == "__main__":
    main()

How to run it

python rtp_ptp_correlator.py

Now point your drone pipeline (or your test streamer) to send RTP packets to UDP port 5004 on that same edge host.


Step 3: Test it with a tiny RTP generator (so I could verify parsing)

To avoid waiting on drone hardware, I used a simple RTP packet generator. It’s not a real video stream, but it’s enough to validate header parsing and calibration.

Code: rtp_generator.py

import socket
import struct
import time


def make_rtp_packet(seq, timestamp, ssrc=0x12345678, payload=b'\x00' * 100):
    # RTP Version 2, P=0, X=0, CC=0
    b0 = (2 << 6)
    # M=0, PT=96 (dynamic)
    b1 = (0 << 7) | 96
    header = struct.pack("!BBHII", b0, b1, seq & 0xFFFF, timestamp & 0xFFFFFFFF, ssrc)
    return header + payload


def main():
    dst_ip = "127.0.0.1"
    dst_port = 5004
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    seq = 1
    # Example: many video RTP streams use a 90 kHz clock.
    clock_rate = 90000
    frame_interval_s = 1 / 30  # 30 fps

    # RTP timestamp increments by clock_rate * frame_interval.
    # round() avoids truncating a float that lands just below the integer.
    ts_step = round(clock_rate * frame_interval_s)
    rtp_ts = 1000

    while True:
        pkt = make_rtp_packet(seq, rtp_ts)
        sock.sendto(pkt, (dst_ip, dst_port))
        seq += 1
        rtp_ts += ts_step
        time.sleep(frame_interval_s)


if __name__ == "__main__":
    main()

Run both terminals:

  1. Terminal A: python rtp_ptp_correlator.py
  2. Terminal B: python rtp_generator.py

You should see seqMono increase monotonically, and after two packets, est_wall_ns becomes non-None.

What “working” looks like

  • Before calibration (first packet), est_wall_ns=None.
  • After the second packet, est_wall_ns appears and tracks reception time.

That’s the sanity check that the linear mapping isn’t nonsense.
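A slightly stronger sanity check is to compare the fitted slope against the nominal media clock. At 90 kHz, one RTP tick should last 1e9 / 90000 ≈ 11111.1 ns, so a slope far from that suggests the two calibration packets were hit by jitter. The helper names below are illustrative, and the 90 kHz default is an assumption about your payload.

```python
def nominal_ns_per_tick(clock_rate_hz: int) -> float:
    """Nanoseconds per RTP timestamp unit for a given media clock rate."""
    return 1e9 / clock_rate_hz


def slope_error_ppm(slope_ns: float, clock_rate_hz: int = 90_000) -> float:
    """Deviation of a fitted slope from the nominal tick length, in parts per million."""
    nominal = nominal_ns_per_tick(clock_rate_hz)
    return (slope_ns - nominal) / nominal * 1e6
```

With the test generator, time.sleep() overshoot alone can push this well away from zero, because the calibration uses only two consecutive packets. That is expected for the sketch and a reason to fit over more samples in a real deployment.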


Where 5G/6G integration shows up in practice

On an actual 5G edge deployment, the “received wall time” (wall_ns) changes with:

  • scheduler delays inside the base station,
  • bufferbloat in transport,
  • and occasional retransmissions at lower layers (depending on your stack).

RTP timestamps (the drone’s media clock) keep increasing steadily regardless of these network effects.
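To put a number on those network effects, RFC 3550 defines an interarrival jitter estimate: the smoothed absolute change in transit time between consecutive packets, J += (|D| - J) / 16, measured in RTP timestamp units. The sketch below applies that formula to the correlator's wall_ns and rtp_ts values. JitterEstimator is an illustrative name, and it assumes the timestamps passed in have not wrapped.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JitterEstimator:
    clock_rate: int = 90_000                # assumption: 90 kHz video clock
    jitter_ticks: float = 0.0               # running estimate, in RTP ticks
    _prev_transit: Optional[int] = None     # arrival minus RTP timestamp, in ticks

    def update(self, wall_ns: int, rtp_ts: int) -> float:
        """Feed one packet; return the updated jitter estimate in RTP ticks."""
        # Express the arrival time in RTP ticks using exact integer arithmetic.
        arrival_ticks = wall_ns * self.clock_rate // 1_000_000_000
        transit = arrival_ticks - rtp_ts
        if self._prev_transit is not None:
            d = abs(transit - self._prev_transit)
            self.jitter_ticks += (d - self.jitter_ticks) / 16.0
        self._prev_transit = transit
        return self.jitter_ticks

    def jitter_ms(self) -> float:
        """Current jitter estimate converted to milliseconds."""
        return self.jitter_ticks / self.clock_rate * 1000.0
```

On a quiet loopback test the estimate stays near zero; over a loaded cellular link it rises, which gives a per-stream measure of how noisy wall_ns is.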

So the correlator gives you a bridge: media timeline from the drone mapped onto PTP-synced edge time.

That bridge is what allows Physical AI components (like perception + control + mapping) to align:

  • “motion at time T” with “video frame content at time T”.
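In code, that alignment is a nearest-neighbour lookup on a sorted list of frame times. A minimal sketch, assuming frame_times_ns holds the mapped frame times in ascending order and that anything more than 20 ms away should count as no match. The function name and the threshold are my own.

```python
from bisect import bisect_left
from typing import List, Optional


def nearest_frame(frame_times_ns: List[int], event_ns: int,
                  max_gap_ns: int = 20_000_000) -> Optional[int]:
    """Return the index of the frame closest in time to event_ns.

    frame_times_ns must be sorted ascending. Returns None when the list is
    empty or the closest frame is more than max_gap_ns away.
    """
    if not frame_times_ns:
        return None
    i = bisect_left(frame_times_ns, event_ns)
    # The closest frame is either just before or just after the insertion point.
    candidates = [j for j in (i - 1, i) if 0 <= j < len(frame_times_ns)]
    best = min(candidates, key=lambda j: abs(frame_times_ns[j] - event_ns))
    if abs(frame_times_ns[best] - event_ns) > max_gap_ns:
        return None
    return best
```

An IMU sample stamped on the same PTP-synced clock can then be paired with a frame index, or dropped when no frame is close enough.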

Without this correlation, you get the classic failure mode: the system believes it’s synchronized, but it’s off by the exact amount needed to ruin fine-grained timing.


Step 4: Detecting dropped packets (basic but useful)

Once sequence numbers are “unwrapped” into seqMono, detecting drops becomes straightforward.

In RTP, if seq jumps by more than 1 between consecutive packets, something was lost or is arriving out of order.

I extended the correlator with a drop estimate:

# Add inside the main loop, after mono_seq computed
if state.last_seq is not None and state.last_seq != seq:
    # (In this script, last_seq is already updated in unwrap_seq,
    # so for clarity you'd track a separate previous mono value.)
    pass

Rather than complicate the core script, the practical approach I used was:

  • Keep prev_mono_seq local in the loop.
  • When a new packet arrives, compute gap = mono_seq - prev_mono_seq.
  • If gap > 1, count drops.
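The three bullets above can be sketched as a small counter. DropCounter is an illustrative name; it also counts late or duplicate packets separately, which is my addition, and the drop total is an estimate because a packet counted as missing may still arrive late.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DropCounter:
    prev_mono_seq: Optional[int] = None  # highest monotonic index seen so far
    dropped: int = 0                     # estimated number of missing packets
    reordered: int = 0                   # late or duplicate packets

    def update(self, mono_seq: int) -> int:
        """Feed one monotonic sequence index; return packets missing just before it."""
        if self.prev_mono_seq is None:
            self.prev_mono_seq = mono_seq
            return 0
        gap = mono_seq - self.prev_mono_seq
        if gap <= 0:
            # Late or duplicate packet: do not rewind the reference.
            self.reordered += 1
            return 0
        self.prev_mono_seq = mono_seq
        missing = gap - 1
        self.dropped += missing
        return missing
```

Calling update(mono_seq) once per packet and printing the return value next to each record marks exactly where the gaps are.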

That logic is tiny, but the real value is that the correlator output becomes self-describing: downstream systems can ignore “corrupted time” windows.


Conclusion

I built a small edge-side RTP-to-PTP timestamp correlator for a drone video pipeline running over a cellular 5G/6G-connected network. The key realization was that “timestamps exist” doesn’t mean “timestamps agree.” By parsing RTP sequence numbers and using a PTP-synchronized wall clock at the edge, I created a stable mapping from the drone’s media timeline to the edge’s real reception time—turning a subtle synchronization bug into a measurable, debuggable signal.