Files
maglink-console/parse_rtcm_messages.py
brentperteet 5703c05c1d Initial commit
2026-06-24 11:12:44 -05:00

421 lines
12 KiB
Python

#!/usr/bin/env python3
"""
Scan a binary stream for RTCM v3 messages.
RTCM v3 frames are:
0xD3 | 6 reserved bits + 10-bit payload length | payload | CRC-24Q
This script does not depend on any local project modules. It searches byte-by-byte
for valid frames, verifies the CRC, prints what it finds, and can optionally write
each full RTCM frame to disk.
"""
from __future__ import annotations
import argparse
import csv
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO, Iterable
# Byte value that opens every RTCM v3 frame.
PREAMBLE = 0xD3
# Largest value representable in the 10-bit payload length field.
MAX_RTCM_PAYLOAD_LENGTH = (1 << 10) - 1
# CRC-24Q generator polynomial, written with its x^24 term included.
CRC24Q_POLY = 0x1864CFB
# Selects the low 24 bits of the CRC register.
CRC24Q_MASK = (1 << 24) - 1
@dataclass(frozen=True)
class RtcmMessage:
    """One CRC-verified RTCM v3 frame located in the scanned stream."""

    index: int  # 1-based position among the valid frames found
    offset: int  # byte offset of the preamble within the scanned stream
    message_type: int | None  # 12-bit message number, None if payload < 2 bytes
    payload_length: int  # value of the 10-bit length field
    frame_length: int  # header + payload + CRC, in bytes
    crc: int  # CRC-24Q value stored in the last three frame bytes
    frame: bytes  # the complete frame, preamble through CRC

    @property
    def payload(self) -> bytes:
        """Return the frame without its 3-byte header and 3-byte CRC."""
        return self.frame[3:-3]

    @property
    def payload_hex_preview(self) -> str:
        """Return up to 24 payload bytes as spaced hex, with ' ...' if cut."""
        body = self.payload
        shown = body[:24].hex(" ")
        return shown if len(body) <= 24 else f"{shown} ..."
@dataclass
class ScanStats:
    """Mutable counters collected while scanning a stream for RTCM frames."""

    # Total number of bytes handed to the scanner.
    bytes_read: int = 0
    # Bytes stepped over one at a time while looking for a valid frame.
    bytes_skipped: int = 0
    # Preamble bytes whose following header bytes were rejected.
    invalid_headers: int = 0
    # Candidate frames whose computed CRC did not match the stored CRC.
    crc_failures: int = 0
    # Offset of a candidate frame cut off by the end of input, if any.
    incomplete_tail_offset: int | None = None
@dataclass(frozen=True)
class DechunkResult:
    """Immutable outcome of decoding an HTTP chunked body."""

    # Concatenated chunk payloads with the chunk framing removed.
    data: bytes
    # Number of non-empty chunks that were decoded.
    chunks: int
    # Offset in the input where chunk decoding started.
    start_offset: int
    # Offset in the input just past the last byte the decoder used.
    consumed_bytes: int
class ChunkedDecodeError(ValueError):
    """Raised when input cannot be decoded as an HTTP chunked body."""
def crc24q(data: bytes) -> int:
    """Compute the CRC-24Q checksum used by RTCM v3 over *data*.

    The register starts at zero and each byte is processed most significant
    bit first. The result is reduced to 24 bits before it is returned.
    """
    register = 0
    for octet in data:
        # Fold the next byte into the top eight bits of the 24-bit register.
        register ^= octet << 16
        for _bit in range(8):
            # Bit 23 before the shift is the bit that would land in bit 24
            # after it; the polynomial's x^24 term cancels that bit.
            carry = register & 0x800000
            register <<= 1
            if carry:
                register ^= CRC24Q_POLY
    return register & CRC24Q_MASK
def find_chunked_body_start(data: bytes) -> int:
    """Guess where an HTTP chunked body begins inside *data*.

    Returns the offset just past the first blank line (CRLF CRLF) when the
    text before it mentions both ``transfer-encoding:`` and ``chunked``
    (case-insensitively). Returns 0 in every other case.
    """
    boundary = data.find(b"\r\n\r\n")
    if boundary < 0:
        return 0
    header_text = data[:boundary].decode("iso-8859-1", errors="ignore").lower()
    looks_chunked = all(
        token in header_text for token in ("transfer-encoding:", "chunked")
    )
    return boundary + 4 if looks_chunked else 0
def parse_chunk_size(line: bytes) -> int:
    """Parse the size field of an HTTP chunk-size line.

    Anything from the first ``;`` onward (chunk extensions) is ignored, and
    surrounding whitespace is stripped. The remaining text must consist solely
    of hexadecimal digits.

    Args:
        line: The chunk-size line without its terminating CRLF.

    Returns:
        The chunk size as a non-negative integer.

    Raises:
        ChunkedDecodeError: If the size field is empty or contains anything
            other than hexadecimal digits.
    """
    size_text = line.split(b";", 1)[0].strip()
    if not size_text:
        raise ChunkedDecodeError("empty chunk size")
    # int(..., 16) alone would also accept signs, a "0x" prefix and
    # underscores. A negative size in particular would move the decoder's
    # cursor backwards, so only plain hex digits are allowed through.
    if size_text.translate(None, b"0123456789abcdefABCDEF"):
        raise ChunkedDecodeError(f"invalid chunk size: {line!r}")
    return int(size_text, 16)
def dechunk_http_body(data: bytes, start_offset: int = 0) -> DechunkResult:
    """Strip HTTP chunked framing from ``data[start_offset:]``.

    Decoding finishes either at a zero-size chunk or when the input ends
    exactly after a chunk's trailing CRLF.

    Raises:
        ChunkedDecodeError: If a chunk-size line has no CRLF or cannot be
            parsed, a chunk runs past the end of the input, or a chunk's
            data is not followed by CRLF.
    """
    total = len(data)
    pieces: list[bytes] = []
    cursor = start_offset
    while True:
        size_line_end = data.find(b"\r\n", cursor)
        if size_line_end == -1:
            raise ChunkedDecodeError("missing chunk-size CRLF")
        chunk_size = parse_chunk_size(data[cursor:size_line_end])
        cursor = size_line_end + 2

        if chunk_size == 0:
            # Last chunk: work out how much of the input the terminator and
            # any trailer section occupy.
            blank_line = data.find(b"\r\n\r\n", cursor)
            if blank_line != -1:
                used = blank_line + 4
            else:
                closing_crlf = data.find(b"\r\n", cursor)
                used = total if closing_crlf == -1 else closing_crlf + 2
            return DechunkResult(b"".join(pieces), len(pieces), start_offset, used)

        data_end = cursor + chunk_size
        if data_end + 2 > total:
            raise ChunkedDecodeError("chunk extends beyond input")
        if data[data_end : data_end + 2] != b"\r\n":
            raise ChunkedDecodeError("missing CRLF after chunk data")
        pieces.append(data[cursor:data_end])
        cursor = data_end + 2

        # Input that stops right after a complete chunk is accepted as-is,
        # without requiring a terminating zero-size chunk.
        if cursor == total:
            return DechunkResult(b"".join(pieces), len(pieces), start_offset, cursor)
def prepare_input_stream(data: bytes, mode: str) -> tuple[bytes, DechunkResult | None]:
    """Choose the byte stream to scan according to *mode*.

    * ``"raw"``: return *data* untouched.
    * ``"chunked"``: return the dechunked body; decode errors propagate.
    * anything else (auto): try to dechunk, and use the result only if it
      yields more valid RTCM frames than the raw bytes do. A decode failure
      falls back to the raw bytes.

    Returns:
        ``(stream, result)`` where ``result`` is the ``DechunkResult`` when
        the dechunked stream was chosen and ``None`` otherwise.
    """
    if mode == "raw":
        return data, None

    forced = mode == "chunked"
    body_start = find_chunked_body_start(data)
    try:
        result = dechunk_http_body(data, body_start)
    except ChunkedDecodeError:
        if forced:
            raise
        return data, None

    if not forced:
        # Auto mode: keep whichever interpretation produces more frames,
        # preferring the raw bytes on a tie.
        raw_count = len(scan_rtcm_frames(data)[0])
        dechunked_count = len(scan_rtcm_frames(result.data)[0])
        if dechunked_count <= raw_count:
            return data, None
    return result.data, result
def rtcm_message_type(payload: bytes) -> int | None:
    """Return the 12-bit message number that leads an RTCM payload.

    The number is the first payload byte followed by the upper four bits of
    the second. ``None`` is returned when fewer than two bytes are available.
    """
    if len(payload) >= 2:
        # Read the first 16 bits big-endian and drop the low 4 bits.
        return int.from_bytes(payload[:2], "big") >> 4
    return None
def scan_rtcm_frames(data: bytes) -> tuple[list[RtcmMessage], ScanStats]:
    """Find valid RTCM v3 frames in data.

    The input is searched byte by byte for the preamble. A candidate is
    accepted when the six bits after the preamble are zero, the frame implied
    by the 10-bit length field fits in the input, and the CRC-24Q computed
    over header and payload equals the three stored CRC bytes. After an
    accepted frame the search resumes immediately behind it; after a rejected
    candidate it resumes one byte later.

    A candidate whose declared length runs past the end of the input does not
    stop the search: a stray preamble byte with a large bogus length would
    otherwise hide every real frame that follows it. Such a candidate is only
    reported as the incomplete tail when no valid frame is found after it. In
    that case the counters are reported as they stood when the candidate was
    reached, so the bytes of a truncated final frame are not counted as
    skipped.

    Args:
        data: The byte stream to scan.

    Returns:
        ``(messages, stats)``: the valid frames in stream order (``index``
        starts at 1) and the counters gathered during the scan.
    """
    stats = ScanStats(bytes_read=len(data))
    messages: list[RtcmMessage] = []
    total = len(data)
    # First candidate that ran past the end of the input since the last valid
    # frame, plus the counter values at the moment it was seen.
    tail_offset: int | None = None
    tail_counters = (0, 0, 0)
    pos = 0
    while pos < total:
        if data[pos] != PREAMBLE:
            stats.bytes_skipped += 1
            pos += 1
            continue
        if pos + 3 > total:
            # Fewer than three header bytes remain, so nothing complete can
            # follow; stop here.
            if tail_offset is None:
                tail_offset = pos
                tail_counters = (
                    stats.bytes_skipped,
                    stats.invalid_headers,
                    stats.crc_failures,
                )
            break
        second = data[pos + 1]
        if second & 0xFC:
            # The six bits following the preamble must be zero.
            stats.invalid_headers += 1
            stats.bytes_skipped += 1
            pos += 1
            continue
        # A 10-bit field cannot exceed the maximum payload length, so no
        # separate range check is needed.
        payload_length = ((second & 0x03) << 8) | data[pos + 2]
        frame_length = 3 + payload_length + 3
        end = pos + frame_length
        if end > total:
            # Possibly a truncated final frame, possibly a false preamble.
            # Remember the first such candidate and keep looking.
            if tail_offset is None:
                tail_offset = pos
                tail_counters = (
                    stats.bytes_skipped,
                    stats.invalid_headers,
                    stats.crc_failures,
                )
            stats.bytes_skipped += 1
            pos += 1
            continue
        frame = data[pos:end]
        expected_crc = int.from_bytes(frame[-3:], "big")
        actual_crc = crc24q(frame[:-3])
        if actual_crc != expected_crc:
            stats.crc_failures += 1
            stats.bytes_skipped += 1
            pos += 1
            continue
        payload = frame[3:-3]
        messages.append(
            RtcmMessage(
                index=len(messages) + 1,
                offset=pos,
                message_type=rtcm_message_type(payload),
                payload_length=payload_length,
                frame_length=frame_length,
                crc=expected_crc,
                frame=frame,
            )
        )
        # A valid frame after the remembered candidate shows that candidate
        # was not the tail of the stream.
        tail_offset = None
        pos = end
    if tail_offset is not None:
        (
            stats.bytes_skipped,
            stats.invalid_headers,
            stats.crc_failures,
        ) = tail_counters
        stats.incomplete_tail_offset = tail_offset
    return messages, stats
def write_frames(messages: Iterable[RtcmMessage], out_dir: Path) -> None:
    """Write each message's complete frame to its own ``.bin`` file.

    *out_dir* is created (including parents) if it does not exist. File names
    encode the message index, message type (``unknown`` when it is ``None``)
    and stream offset.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    for message in messages:
        type_label = (
            str(message.message_type)
            if message.message_type is not None
            else "unknown"
        )
        filename = (
            f"rtcm_{message.index:05d}_type_{type_label}_offset_{message.offset}.bin"
        )
        (out_dir / filename).write_bytes(message.frame)
def write_csv(messages: Iterable[RtcmMessage], path: Path) -> None:
    """Write a CSV index of *messages* to *path*, one row per message.

    The file is UTF-8 and starts with a header row. Each row carries the
    message metadata, the CRC as six upper-case hex digits and the short
    payload hex preview.
    """
    columns = [
        "index",
        "offset",
        "message_type",
        "payload_length",
        "frame_length",
        "crc_hex",
        "payload_hex_preview",
    ]

    def as_row(message: RtcmMessage) -> dict:
        return {
            "index": message.index,
            "offset": message.offset,
            "message_type": message.message_type,
            "payload_length": message.payload_length,
            "frame_length": message.frame_length,
            "crc_hex": f"{message.crc:06X}",
            "payload_hex_preview": message.payload_hex_preview,
        }

    with path.open("w", newline="", encoding="utf-8") as handle:
        table = csv.DictWriter(handle, fieldnames=columns)
        table.writeheader()
        table.writerows(as_row(message) for message in messages)
def write_jsonl(messages: Iterable[RtcmMessage], path: Path) -> None:
    """Write *messages* to *path* as JSON Lines, one compact object per line.

    Each object holds the message metadata, the CRC as six upper-case hex
    digits, and the full payload and frame as unspaced hex strings.
    """

    def as_line(message: RtcmMessage) -> str:
        record = {
            "index": message.index,
            "offset": message.offset,
            "message_type": message.message_type,
            "payload_length": message.payload_length,
            "frame_length": message.frame_length,
            "crc_hex": f"{message.crc:06X}",
            "payload_hex": message.payload.hex(),
            "frame_hex": message.frame.hex(),
        }
        return json.dumps(record, separators=(",", ":")) + "\n"

    with path.open("w", encoding="utf-8") as handle:
        handle.writelines(as_line(message) for message in messages)
def print_messages(
    messages: list[RtcmMessage],
    stats: ScanStats,
    show_hex: bool,
    debug_1005: bool,
) -> None:
    """Print one line per message followed by a summary of the scan.

    Args:
        messages: The frames to list.
        stats: Counters to report after the message list.
        show_hex: Append the payload hex preview to every message line.
        debug_1005: Additionally print the full frame as spaced hex for
            messages whose type is 1005.
    """
    for msg in messages:
        type_label = (
            str(msg.message_type) if msg.message_type is not None else "unknown"
        )
        parts = [
            f"#{msg.index:05d} offset={msg.offset:<10} ",
            f"type={type_label:<5} payload={msg.payload_length:<4} ",
            f"frame={msg.frame_length:<4} crc=0x{msg.crc:06X}",
        ]
        if show_hex:
            parts.append(f" payload={msg.payload_hex_preview}")
        print("".join(parts))
        if debug_1005 and msg.message_type == 1005:
            print(f" debug1005 frame_hex={msg.frame.hex(' ')}")

    # The leading empty entry is the blank line between the message list and
    # the summary.
    summary = [
        "",
        f"Valid RTCM messages: {len(messages)}",
        f"Bytes read: {stats.bytes_read}",
        f"Bytes skipped while searching: {stats.bytes_skipped}",
        f"Invalid RTCM-like headers: {stats.invalid_headers}",
        f"CRC failures: {stats.crc_failures}",
    ]
    if stats.incomplete_tail_offset is not None:
        summary.append(
            f"Incomplete trailing candidate at offset: {stats.incomplete_tail_offset}"
        )
    for text in summary:
        print(text)
def read_input(path: Path | None, stdin: BinaryIO) -> bytes:
    """Return the whole input: the file at *path*, or *stdin* when it is None."""
    return path.read_bytes() if path is not None else stdin.read()
def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse the process arguments.

    Returns a namespace with ``input``, ``out_dir``, ``csv``, ``jsonl``,
    ``mode``, ``write_stream``, ``hex`` and ``debug_1005``.
    """
    parser = argparse.ArgumentParser(
        description="Parse RTCM v3 messages from a .bin file or stdin."
    )
    parser.add_argument(
        "input",
        nargs="?",
        type=Path,
        help="Binary file to scan. If omitted, reads from stdin.",
    )

    # Optional output destinations, all taking a path.
    output_options = (
        (
            "--out-dir",
            "Directory where each valid full RTCM frame will be written as a .bin file.",
        ),
        ("--csv", "Write a CSV index of parsed messages."),
        (
            "--jsonl",
            "Write JSON Lines with message metadata and hex payload/frame content.",
        ),
    )
    for flag, description in output_options:
        parser.add_argument(flag, type=Path, help=description)

    mode_help = (
        "How to read the input: auto detects HTTP chunked transfer encoding, "
        "raw scans bytes exactly as stored, chunked forces HTTP dechunking."
    )
    parser.add_argument(
        "--mode",
        choices=["auto", "raw", "chunked"],
        default="auto",
        help=mode_help,
    )
    parser.add_argument(
        "--write-stream",
        type=Path,
        help="Write the reconstructed byte stream that is scanned for RTCM frames.",
    )

    # Boolean switches that only affect console output.
    console_switches = (
        ("--hex", "Show a short payload hex preview in console output."),
        (
            "--debug-1005",
            "Print the full RTCM 1005 frame bytes as hex in the console output.",
        ),
    )
    for flag, description in console_switches:
        parser.add_argument(flag, action="store_true", help=description)

    return parser.parse_args()
def main() -> int:
    """Run the command-line tool.

    Reads the input, optionally dechunks it, scans it for RTCM frames, prints
    the report and writes whichever outputs were requested.

    Returns:
        0 when at least one valid frame was found, 1 when none was found,
        and 2 when the input could not be decoded as a chunked stream.
    """
    options = parse_args()
    raw_bytes = read_input(options.input, sys.stdin.buffer)

    try:
        stream, chunk_info = prepare_input_stream(raw_bytes, options.mode)
    except ChunkedDecodeError as error:
        print(f"Could not decode chunked input: {error}", file=sys.stderr)
        return 2

    if chunk_info:
        print(
            f"Decoded HTTP chunked transfer stream: "
            f"{chunk_info.chunks} chunks, {len(raw_bytes)} input bytes -> {len(stream)} data bytes"
        )
        print()

    if options.write_stream:
        options.write_stream.write_bytes(stream)
        print(f"Wrote scanned byte stream to {options.write_stream}")
        print()

    messages, stats = scan_rtcm_frames(stream)
    print_messages(messages, stats, options.hex, options.debug_1005)

    # Each requested output: destination, writer, confirmation message.
    outputs = (
        (
            options.out_dir,
            write_frames,
            f"Wrote {len(messages)} frame file(s) to {options.out_dir}",
        ),
        (options.csv, write_csv, f"Wrote CSV index to {options.csv}"),
        (options.jsonl, write_jsonl, f"Wrote JSONL details to {options.jsonl}"),
    )
    for destination, writer, confirmation in outputs:
        if destination:
            writer(messages, destination)
            print(confirmation)

    return int(not messages)
# Script entry point: the value returned by main() becomes the exit status.
if __name__ == "__main__":
    sys.exit(main())