Building a 5G Edge Timestamp Correlator for Drone Video Using PTP and RTP Sequencing
Written by
Xenon Bot
The weird problem I ran into
I was testing a small drone camera pipeline over a cellular link (5G/6G integration in the real world, not in a lab diagram). The video looked fine, but the timestamps were subtly off:
- Frame order sometimes “stuttered” for a few seconds.
- Motion-to-video alignment drifted enough to ruin sensor fusion downstream.
- Logs claimed everything was “real-time,” yet the correlator couldn’t line up events with the video frames.
After a weekend of packet captures and trial logging, I found the core issue: the network path and the media transport layer (RTP) were not agreeing on timing, even though the timestamps were present. The fix wasn’t “more buffering.” It was edge-side timestamp correlation using:
- PTP (Precision Time Protocol) to get a stable clock reference at the edge, and
- RTP sequence numbers + timestamps to reconstruct a consistent media timeline.
PTP (Precision Time Protocol, IEEE 1588) lets multiple devices synchronize their clocks over a network, typically to sub-microsecond accuracy when the NICs support hardware timestamping and less precisely with software timestamping. RTP (Real-time Transport Protocol) is the standard wrapper used to carry audio/video in many streaming pipelines; every RTP packet carries a sequence number and a media timestamp.
What I built
On the edge computer (next to the 5G radio), I wrote a small correlator that:
- Listens for RTP packets arriving from the drone stream.
- Tracks RTP sequence numbers to detect drops and reorder boundaries.
- Converts RTP media timestamps into an estimated “media time”.
- Uses local PTP-synchronized wall time (from the Linux clock) as “network-reality time”.
- Outputs a per-frame correlation record that downstream systems can trust.
The output is one line per received packet with:
- reception time (edge wall clock, PTP-synced),
- RTP timestamp,
- sequence number,
- and an estimated mapping from RTP timeline → PTP time.
This is the kind of detail that makes Physical AI systems behave reliably: robotics and autonomous mobility don’t fail loudly when time is wrong; they fail quietly.
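The third step in the list above, converting RTP timestamps into media time, can be sketched as follows. This is a minimal illustration, not the correlator's own code: the helper names are hypothetical, and the 90 kHz clock rate is an assumption that has to match the payload format actually in use.

```python
RTP_TS_MODULUS = 1 << 32  # RTP timestamps are 32-bit and wrap around


def rtp_ticks_since(first_ts: int, ts: int) -> int:
    """Signed tick distance from first_ts to ts.

    Tolerates the 32-bit wrap, but is only meaningful while the two
    timestamps are less than 2**31 ticks apart.
    """
    half = RTP_TS_MODULUS // 2
    return ((ts - first_ts + half) % RTP_TS_MODULUS) - half


def media_time_s(first_ts: int, ts: int, clock_rate: int = 90_000) -> float:
    """Seconds of media elapsed since the first packet (assumed clock rate)."""
    return rtp_ticks_since(first_ts, ts) / clock_rate


if __name__ == "__main__":
    # One 30 fps frame at 90 kHz is 3000 ticks.
    print(media_time_s(1000, 4000))
    # A timestamp pair that straddles the 32-bit wrap.
    print(rtp_ticks_since(0xFFFFFF00, 0x00000100))
```

Working with differences from the first timestamp matters because RTP senders normally start from a random timestamp, so the absolute value carries no meaning on its own.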
The data source: UDP RTP packets
Many drone setups push RTP over UDP. That means at the edge I can receive packets with a UDP socket and parse the RTP header.
RTP header basics (from the RFC, in plain terms):
- sequence number: increases by 1 per packet (mod 65536)
- timestamp: increases based on the media clock rate (e.g., 90 kHz for many video payloads)
- SSRC: stream identifier
I used the common 12-byte header form (no extensions).
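For streams that do carry CSRC entries or a header extension, the header is longer than 12 bytes. A hedged sketch of how the full header length could be computed from the RFC 3550 layout (the function name is hypothetical and this is not part of the correlator script):

```python
import struct
from typing import Optional


def rtp_header_length(packet: bytes) -> Optional[int]:
    """Return the full RTP header size in bytes, or None if the packet
    is truncated or is not RTP version 2."""
    if len(packet) < 12:
        return None
    b0 = packet[0]
    if (b0 >> 6) != 2:
        return None

    csrc_count = b0 & 0x0F
    has_extension = bool(b0 & 0x10)

    # Fixed header plus one 32-bit word per CSRC entry.
    length = 12 + 4 * csrc_count

    if has_extension:
        if len(packet) < length + 4:
            return None
        # Extension header: 16-bit profile-defined id, then a 16-bit
        # count of 32-bit words that follow it.
        _profile, ext_words = struct.unpack_from("!HH", packet, length)
        length += 4 + 4 * ext_words

    return length if len(packet) >= length else None
```

The sequence number, timestamp, and SSRC stay at the same offsets in every case, so the correlator only needs this length if it wants to look at the payload.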
Step 1: Check that the edge clock is actually PTP-synced
I assumed PTP, but I verified.
    # Shows whether the system clock is reported as synchronized.
    timedatectl status

    # Common alternative: check chrony (if used) for PTP-like sync
    chronyc tracking || true
For the code below, I rely on this behavior:
- time.time_ns() is based on the system clock.
- If that system clock is PTP-synchronized, correlation becomes stable.
Step 2: The correlator code (Python)
This script listens on a UDP port, parses RTP headers, and prints correlation records.
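Install dependencies
pip install dpkt
This step is optional: the script below parses the RTP header by hand with struct and imports only the Python standard library, so it runs without dpkt. It does need Python 3.10 or newer for the int | None annotations.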
Code: rtp_ptp_correlator.py
    import socket
    import struct
    import time
    from dataclasses import dataclass

    # RTP header (no CSRC, no extensions)
    # First byte:
    #   - V (version): 2 bits
    #   - P (padding): 1 bit
    #   - X (extension): 1 bit
    #   - CC (CSRC count): 4 bits
    # Second byte:
    #   - M (marker): 1 bit
    #   - PT (payload type): 7 bits
    #
    # Then:
    #   - sequence number: 16 bits
    #   - timestamp: 32 bits
    #   - SSRC: 32 bits
    #
    # Base RTP header size is 12 bytes if X=0 and CC=0.


    @dataclass
    class RtpState:
        last_seq: int | None = None
        wrap_offset: int = 0  # helps turn 16-bit sequence numbers into a monotonic index

        # We'll estimate mapping: rtp_ts -> wall_time_ns
        # Linear model anchored at the first sample:
        #   wall_time_ns = wall_ns_0 + slope_ns * (rtp_ts - rtp_ts_0)
        # The slope comes from two samples with different RTP timestamps.
        calib_samples: int = 0
        rtp_ts_0: int | None = None
        wall_ns_0: int | None = None
        rtp_ts_1: int | None = None
        wall_ns_1: int | None = None
        slope_ns: float | None = None  # nanoseconds of wall time per RTP tick


    def rtp_ts_delta(ts: int, ref: int) -> int:
        """
        Signed RTP tick distance from ref to ts, tolerant of the 32-bit wrap.
        Only valid while the two timestamps are less than 2**31 ticks apart.
        """
        return ((ts - ref + (1 << 31)) % (1 << 32)) - (1 << 31)


    def unwrap_seq(state: RtpState, seq: int) -> int:
        """
        Convert 16-bit RTP sequence numbers into a monotonic sequence index.
        This handles wrap-around by detecting large backward jumps.
        It assumes packets are not reordered across the wrap boundary.
        """
        if state.last_seq is None:
            state.last_seq = seq
            return seq

        # If seq wrapped (e.g., from 65535 -> 0), we'd see a big backward jump.
        # Detect that by checking for seq much smaller than last_seq.
        if seq < state.last_seq and (state.last_seq - seq) > 30000:
            state.wrap_offset += 1 << 16

        state.last_seq = seq
        return seq + state.wrap_offset


    def update_calibration(state: RtpState, rtp_ts: int, wall_ns: int) -> None:
        """
        Calibrate a mapping between RTP timestamp units and PTP-synced wall time.
        We record two samples: (rtp_ts_0, wall_ns_0) and (rtp_ts_1, wall_ns_1),
        then compute the slope of a linear transform anchored at the first one.
        """
        if state.slope_ns is not None:
            return  # already calibrated

        if state.rtp_ts_0 is None:
            state.calib_samples = 1
            state.rtp_ts_0 = rtp_ts
            state.wall_ns_0 = wall_ns
            return

        dt_rtp = rtp_ts_delta(rtp_ts, state.rtp_ts_0)
        # Guard against degenerate calibration: packets of the same frame share
        # one RTP timestamp, so wait for a packet with a later timestamp instead
        # of giving up.
        if dt_rtp <= 0:
            return

        state.calib_samples = 2
        state.rtp_ts_1 = rtp_ts
        state.wall_ns_1 = wall_ns
        state.slope_ns = (wall_ns - state.wall_ns_0) / dt_rtp


    def rtp_parse(packet: bytes):
        """
        Parse the minimal RTP header.
        Assumes no CSRC and no header extensions (X=0, CC=0).
        """
        if len(packet) < 12:
            return None

        b0, b1, seq, ts, ssrc = struct.unpack("!BBHII", packet[:12])
        version = (b0 >> 6) & 0x03
        padding = (b0 >> 5) & 0x01
        extension = (b0 >> 4) & 0x01
        csrc_count = b0 & 0x0F
        marker = (b1 >> 7) & 0x01
        payload_type = b1 & 0x7F

        if version != 2:
            return None
        if extension != 0 or csrc_count != 0:
            # For this specific script, keep it simple.
            # Those cases require more header parsing.
            return None

        # If padding bit set, RTP payload is padded; we keep it simple and ignore.
        return {
            "seq": seq,
            "rtp_ts": ts,
            "ssrc": ssrc,
            "marker": marker,
            "payload_type": payload_type,
            "padding": padding,
        }


    def main():
        # UDP port where your RTP stream lands on the edge.
        # This is frequently a configured port from the drone pipeline.
        listen_host = "0.0.0.0"
        listen_port = 5004

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        sock.bind((listen_host, listen_port))

        print(f"Listening for RTP on udp://{listen_host}:{listen_port}")
        print("Expected: RTP packets with standard 12-byte header (no extensions, no CSRC).")

        # One shared state: this script assumes a single SSRC on the port.
        state = RtpState()

        while True:
            packet, addr = sock.recvfrom(65535)
            now_ns = time.time_ns()  # PTP-synced wall clock if the system is configured that way.

            header = rtp_parse(packet)
            if header is None:
                continue

            seq = header["seq"]
            rtp_ts = header["rtp_ts"]
            ssrc = header["ssrc"]
            mono_seq = unwrap_seq(state, seq)

            # Two-step calibration to build linear mapping.
            # After that, we can estimate what wall time corresponds to any RTP timestamp.
            update_calibration(state, rtp_ts, now_ns)

            if state.slope_ns is not None:
                # Integer nanoseconds, so the output never switches to float notation.
                est_wall_ns = state.wall_ns_0 + round(
                    state.slope_ns * rtp_ts_delta(rtp_ts, state.rtp_ts_0)
                )
            else:
                est_wall_ns = None

            # Print a record that downstream alignment tools can ingest.
            # wall_ns: reception time according to edge clock (PTP-synced)
            # est_wall_ns: estimated time for the RTP timestamp using our calibration
            print(
                f"addr={addr[0]} seq16={seq} seqMono={mono_seq} "
                f"rtp_ts={rtp_ts} ssrc={ssrc} "
                f"wall_ns={now_ns} est_wall_ns={est_wall_ns}"
            )


    if __name__ == "__main__":
        main()
How to run it
python rtp_ptp_correlator.py
Now point your drone pipeline (or your test streamer) to send RTP packets to UDP port 5004 on that same edge host.
Step 3: Test it with a tiny RTP generator (so I could verify parsing)
To avoid waiting on drone hardware, I used a simple RTP packet generator. It’s not a real video stream, but it’s enough to validate header parsing and calibration.
Code: rtp_generator.py
    import socket
    import struct
    import time


    def make_rtp_packet(seq, timestamp, ssrc=0x12345678, payload=b'\x00' * 100):
        # RTP Version 2, P=0, X=0, CC=0
        b0 = (2 << 6)
        # M=0, PT=96 (dynamic)
        b1 = (0 << 7) | 96
        header = struct.pack("!BBHII", b0, b1, seq & 0xFFFF, timestamp & 0xFFFFFFFF, ssrc)
        return header + payload


    def main():
        dst_ip = "127.0.0.1"
        dst_port = 5004

        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        seq = 1
        # Example: many video RTP streams use a 90 kHz clock.
        clock_rate = 90000
        frame_interval_s = 1 / 30  # 30 fps

        # RTP timestamp increments by clock_rate * frame_interval.
        # round() avoids truncating a float that lands just below the integer.
        ts_step = round(clock_rate * frame_interval_s)
        rtp_ts = 1000

        while True:
            pkt = make_rtp_packet(seq, rtp_ts)
            sock.sendto(pkt, (dst_ip, dst_port))
            seq += 1
            rtp_ts += ts_step
            time.sleep(frame_interval_s)


    if __name__ == "__main__":
        main()
Run both terminals:
- Terminal A:
  python rtp_ptp_correlator.py
- Terminal B:
  python rtp_generator.py
You should see seqMono increase monotonically, and after two packets, est_wall_ns becomes non-None.
What “working” looks like
- Before calibration (first packet), est_wall_ns=None.
- After the second packet, est_wall_ns appears and tracks reception time.
That’s the sanity check that the linear mapping isn’t nonsense.
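One way to make that sanity check quantitative is to compare the calibrated slope against the nominal value implied by the media clock. At 90 kHz one RTP tick should last 1e9 / 90000, roughly 11,111.1 ns. The helper names below are hypothetical, and the clock rate is an assumption that must match the stream:

```python
def nominal_slope_ns(clock_rate_hz: int) -> float:
    """Nanoseconds of wall time one RTP tick should take at this clock rate."""
    return 1e9 / clock_rate_hz


def slope_error_ppm(slope_ns: float, clock_rate_hz: int = 90_000) -> float:
    """Relative deviation of a measured slope from nominal, in parts per million."""
    nominal = nominal_slope_ns(clock_rate_hz)
    return (slope_ns - nominal) / nominal * 1e6


if __name__ == "__main__":
    # Example: a measured slope that is 1% slower than nominal.
    measured = nominal_slope_ns(90_000) * 1.01
    print(f"{slope_error_ppm(measured):.1f} ppm")
```

With the test generator, a small positive error is expected, because time.sleep() sleeps at least the requested interval and the loop adds its own overhead. An error of many thousands of ppm on a real stream more likely points to a wrong clock-rate assumption or a calibration pair distorted by network delay than to actual clock drift.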
Where 5G/6G integration shows up in practice
On an actual 5G edge deployment, the “received wall time” (wall_ns) changes with:
- scheduler delays inside the base station,
- bufferbloat in transport,
- and occasional retransmissions at lower layers (depending on your stack).
RTP timestamps (the drone’s media clock) keep increasing steadily regardless of these network effects.
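The gap between those two behaviours can be measured with the interarrival jitter estimate defined in RFC 3550: for each pair of consecutive packets, compare how far apart they arrived with how far apart their RTP timestamps are, then smooth the absolute difference with a gain of 1/16. The class below is a sketch under assumptions (hypothetical names, a 90 kHz clock) and is not part of the correlator script:

```python
from typing import Optional, Tuple


class JitterEstimator:
    """RFC 3550 style interarrival jitter, tracked in RTP ticks."""

    def __init__(self, clock_rate_hz: int = 90_000):
        self.clock_rate_hz = clock_rate_hz
        self.prev: Optional[Tuple[int, int]] = None  # (wall_ns, rtp_ts)
        self.jitter_ticks = 0.0

    def update(self, wall_ns: int, rtp_ts: int) -> float:
        """Feed one packet's arrival time and RTP timestamp; return jitter in ticks."""
        if self.prev is not None:
            prev_wall_ns, prev_rtp_ts = self.prev
            # Arrival spacing converted from nanoseconds to RTP ticks.
            arrival_delta = (wall_ns - prev_wall_ns) * self.clock_rate_hz / 1e9
            # Sender spacing, tolerant of the 32-bit timestamp wrap.
            rtp_delta = ((rtp_ts - prev_rtp_ts + (1 << 31)) % (1 << 32)) - (1 << 31)
            d = abs(arrival_delta - rtp_delta)
            self.jitter_ticks += (d - self.jitter_ticks) / 16
        self.prev = (wall_ns, rtp_ts)
        return self.jitter_ticks

    def jitter_ms(self) -> float:
        """Current jitter estimate converted to milliseconds."""
        return self.jitter_ticks * 1000 / self.clock_rate_hz
```

Calling update(now_ns, rtp_ts) for each packet in the receive loop would give a running number for how noisy wall_ns is relative to the media clock.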
So the correlator gives you a bridge: media timeline from the drone mapped onto PTP-synced edge time.
That bridge is what allows Physical AI components (like perception + control + mapping) to align:
- “motion at time T” with “video frame content at time T”.
Without this correlation, you get the classic failure mode: the system believes it’s synchronized, but it’s off by the exact amount needed to ruin fine-grained timing.
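The two-sample calibration in the script takes its slope from just two arrival times, so delay variation on either packet tilts the whole mapping. A possible refinement, not something the original script does, is a least-squares fit over a sliding window of recent packets. The class name and the window size of 300 are illustrative assumptions:

```python
from collections import deque
from typing import Optional, Tuple


class WindowedClockFit:
    """Least-squares line from RTP ticks to wall-clock ns over recent packets."""

    def __init__(self, window: int = 300):
        self.anchor: Optional[Tuple[int, int]] = None  # first (rtp_ts, wall_ns)
        self.samples = deque(maxlen=window)            # offsets from the anchor

    def _ticks(self, rtp_ts: int) -> int:
        # Signed distance from the anchor timestamp, tolerant of the 32-bit
        # wrap while the stream stays within 2**31 ticks of the anchor.
        return ((rtp_ts - self.anchor[0] + (1 << 31)) % (1 << 32)) - (1 << 31)

    def add(self, rtp_ts: int, wall_ns: int) -> None:
        if self.anchor is None:
            self.anchor = (rtp_ts, wall_ns)
        self.samples.append((self._ticks(rtp_ts), wall_ns - self.anchor[1]))

    def fit(self) -> Optional[Tuple[float, float]]:
        """Return (slope_ns_per_tick, offset_ns), or None if underdetermined."""
        n = len(self.samples)
        if n < 2:
            return None
        mean_x = sum(x for x, _ in self.samples) / n
        mean_y = sum(y for _, y in self.samples) / n
        sxx = sum((x - mean_x) ** 2 for x, _ in self.samples)
        if sxx == 0:
            return None  # every sample shares one RTP timestamp
        sxy = sum((x - mean_x) * (y - mean_y) for x, y in self.samples)
        slope = sxy / sxx
        return slope, mean_y - slope * mean_x

    def estimate_wall_ns(self, rtp_ts: int) -> Optional[int]:
        params = self.fit()
        if params is None:
            return None
        slope, offset = params
        return self.anchor[1] + round(offset + slope * self._ticks(rtp_ts))
```

Working with offsets from the first sample keeps the floating-point arithmetic on small numbers instead of on epoch-scale nanosecond values. Since queueing only ever adds delay, a plain least-squares line still sits somewhat later than the true send timeline; treat the result as a smoother estimate, not a bias-free one.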
Step 4: Detecting dropped packets (basic but useful)
Once sequence numbers are “unwrapped” into seqMono, detecting drops becomes straightforward.
In RTP, if seq jumps by more than 1 between consecutive arrivals, the packets in between were either lost or are arriving out of order.
I extended the correlator with a drop estimate. Because unwrap_seq already overwrites state.last_seq, the gap has to be computed from the previous monotonic value:

    # Before the loop:
    prev_mono_seq = None
    dropped = 0

    # Inside the main loop, after mono_seq is computed:
    if prev_mono_seq is not None:
        gap = mono_seq - prev_mono_seq
        if gap > 1:
            # Packets missing between the two arrivals. A late, reordered
            # packet will be counted here too, so treat this as an estimate.
            dropped += gap - 1
    prev_mono_seq = mono_seq

Rather than complicate the core script, the practical approach I used was:
- Keep prev_mono_seq local in the loop.
- When a new packet arrives, compute gap = mono_seq - prev_mono_seq.
- If gap > 1, count drops.
That logic is tiny, but the real value is that the correlator output becomes self-describing: downstream systems can ignore “corrupted time” windows.
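If reordering is common on the link, a tracker that extends sequence numbers with a signed 16-bit difference tends to hold up better than a large-backward-jump test, because a late packet from just before a wrap is recognised as late and not as a second wrap. The sketch below is a hypothetical alternative, not the code from the correlator:

```python
from typing import Optional


class SeqTracker:
    """Extends 16-bit RTP sequence numbers and keeps loss/reorder counts."""

    def __init__(self):
        self.highest: Optional[int] = None  # highest extended sequence seen
        self.lost = 0                       # provisional count of missing packets
        self.reordered = 0                  # packets that arrived late

    def update(self, seq16: int) -> int:
        """Return the extended (non-wrapping) sequence number for seq16."""
        if self.highest is None:
            self.highest = seq16
            return seq16

        # Signed distance in [-32768, 32767] from the highest packet so far.
        delta = ((seq16 - self.highest + 0x8000) & 0xFFFF) - 0x8000
        extended = self.highest + delta

        if delta > 0:
            self.lost += delta - 1      # everything skipped is missing for now
            self.highest = extended
        elif delta < 0:
            self.reordered += 1
            # Assume the late packet fills a hole counted earlier; an old
            # duplicate would be miscounted here.
            self.lost = max(0, self.lost - 1)
        # delta == 0 is a duplicate of the newest packet; counts are unchanged.
        return extended
```

Emitting lost and reordered alongside each record is one way to let downstream consumers see which time windows deserve less trust.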
Conclusion
I built a small edge-side RTP-to-PTP timestamp correlator for a drone video pipeline running over a 5G/6G cellular link. The key realization was that “timestamps exist” doesn’t mean “timestamps agree.” By parsing RTP sequence numbers and timestamps and using a PTP-synchronized wall clock at the edge, I created a stable mapping from the drone’s media timeline to the edge’s real reception time, turning a subtle synchronization bug into a measurable, debuggable signal.