#!/usr/bin/env python3
"""
Scan a binary stream for RTCM v3 messages.

RTCM v3 frames are:

    0xD3 | 6 reserved bits + 10-bit payload length | payload | CRC-24Q

This script does not depend on any local project modules. It searches
byte-by-byte for valid frames, verifies the CRC, prints what it finds, and can
optionally write each full RTCM frame to disk.
"""

from __future__ import annotations

import argparse
import csv
import json
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import BinaryIO, Iterable

PREAMBLE = 0xD3
# Largest value the 10-bit length field can hold; kept as a documented constant.
MAX_RTCM_PAYLOAD_LENGTH = 1023
CRC24Q_POLY = 0x1864CFB
CRC24Q_MASK = 0xFFFFFF

# Frame layout: 3 header bytes, then the payload, then 3 CRC bytes.
_HEADER_LENGTH = 3
_CRC_LENGTH = 3

# Only plain hex digits are legal in an HTTP chunk-size field.
_HEX_DIGITS = frozenset(b"0123456789abcdefABCDEF")


def _build_crc24q_table() -> tuple[int, ...]:
    """Precompute the CRC-24Q remainder for every possible leading byte."""
    table = []
    for value in range(256):
        crc = value << 16
        for _ in range(8):
            crc <<= 1
            if crc & 0x1000000:
                crc ^= CRC24Q_POLY
        table.append(crc & CRC24Q_MASK)
    return tuple(table)


_CRC24Q_TABLE = _build_crc24q_table()


@dataclass(frozen=True)
class RtcmMessage:
    """One CRC-verified RTCM v3 frame found in the scanned stream.

    ``index`` is the 1-based position among the valid frames, ``offset`` is the
    byte offset of the preamble in the scanned stream, ``message_type`` is the
    12-bit message number (``None`` when the payload is shorter than 2 bytes),
    ``crc`` is the CRC-24Q value carried by the frame, and ``frame`` holds the
    complete frame bytes (header + payload + CRC).
    """

    index: int
    offset: int
    message_type: int | None
    payload_length: int
    frame_length: int
    crc: int
    frame: bytes

    @property
    def payload(self) -> bytes:
        """Return the frame bytes between the 3-byte header and the 3-byte CRC."""
        return self.frame[_HEADER_LENGTH:-_CRC_LENGTH]

    @property
    def payload_hex_preview(self) -> str:
        """Return the first 24 payload bytes as spaced hex, with ``...`` if cut."""
        preview = self.payload[:24].hex(" ")
        if len(self.payload) > 24:
            return f"{preview} ..."
        return preview


@dataclass
class ScanStats:
    """Counters collected by ``scan_rtcm_frames`` while walking the stream."""

    bytes_read: int = 0
    bytes_skipped: int = 0
    invalid_headers: int = 0
    crc_failures: int = 0
    incomplete_tail_offset: int | None = None


@dataclass(frozen=True)
class DechunkResult:
    """Result of decoding an HTTP chunked body.

    ``data`` is the concatenated chunk content, ``chunks`` the number of data
    chunks decoded, ``start_offset`` where decoding began in the input and
    ``consumed_bytes`` the input offset just past the last byte consumed.
    """

    data: bytes
    chunks: int
    start_offset: int
    consumed_bytes: int


class ChunkedDecodeError(ValueError):
    """Raised when the input is not a well-formed HTTP chunked body."""


def crc24q(data: bytes) -> int:
    """Return the RTCM CRC-24Q value for data.

    Table-driven, MSB-first, initial value 0 and no final XOR; equivalent to
    shifting the message through ``CRC24Q_POLY`` one bit at a time.
    """
    crc = 0
    for byte in data:
        crc = ((crc << 8) & CRC24Q_MASK) ^ _CRC24Q_TABLE[(crc >> 16) ^ byte]
    return crc


def find_chunked_body_start(data: bytes) -> int:
    """Return likely HTTP chunked body start offset.

    When the data begins with a header block (terminated by a blank line) that
    mentions ``Transfer-Encoding`` and ``chunked``, the offset just after the
    blank line is returned; otherwise 0.
    """
    header_end = data.find(b"\r\n\r\n")
    if header_end == -1:
        return 0

    headers = data[:header_end].decode("iso-8859-1", errors="ignore").lower()
    if "transfer-encoding:" in headers and "chunked" in headers:
        return header_end + 4
    return 0


def parse_chunk_size(line: bytes) -> int:
    """Parse the hexadecimal size from an HTTP chunk-size line.

    Anything after the first ``;`` (chunk extensions) is ignored and
    surrounding whitespace is stripped.

    Raises:
        ChunkedDecodeError: if the size is empty or is not made solely of hex
            digits. ``int(..., 16)`` alone would also accept signs, a ``0x``
            prefix and underscores; a negative size would move the decoder
            backwards through the input and could make it loop forever.
    """
    size_text = line.split(b";", 1)[0].strip()
    if not size_text:
        raise ChunkedDecodeError("empty chunk size")
    if not all(byte in _HEX_DIGITS for byte in size_text):
        raise ChunkedDecodeError(f"invalid chunk size: {line!r}")
    return int(size_text, 16)


def dechunk_http_body(data: bytes, start_offset: int = 0) -> DechunkResult:
    """Decode an HTTP chunked body from data[start_offset:].

    Decoding stops at the terminating zero-size chunk (any trailer section is
    consumed) or when the input ends exactly after a complete chunk.

    Raises:
        ChunkedDecodeError: on a missing or malformed chunk-size line, a chunk
            that runs past the end of the input, or a missing CRLF after the
            chunk data.
    """
    pos = start_offset
    decoded = bytearray()
    chunks = 0

    while True:
        line_end = data.find(b"\r\n", pos)
        if line_end == -1:
            raise ChunkedDecodeError("missing chunk-size CRLF")

        size = parse_chunk_size(data[pos:line_end])
        pos = line_end + 2

        if size == 0:
            # Last chunk: consume the optional trailer section up to the blank
            # line, or just the closing CRLF when there are no trailers.
            trailer_end = data.find(b"\r\n\r\n", pos)
            if trailer_end == -1:
                final_end = data.find(b"\r\n", pos)
                consumed = len(data) if final_end == -1 else final_end + 2
            else:
                consumed = trailer_end + 4
            return DechunkResult(bytes(decoded), chunks, start_offset, consumed)

        chunk_end = pos + size
        if chunk_end + 2 > len(data):
            raise ChunkedDecodeError("chunk extends beyond input")
        if data[chunk_end : chunk_end + 2] != b"\r\n":
            raise ChunkedDecodeError("missing CRLF after chunk data")

        decoded.extend(data[pos:chunk_end])
        chunks += 1
        pos = chunk_end + 2

        # A capture may end cleanly after a chunk without the zero-size chunk.
        if pos == len(data):
            return DechunkResult(bytes(decoded), chunks, start_offset, pos)


def prepare_input_stream(data: bytes, mode: str) -> tuple[bytes, DechunkResult | None]:
    """Return the byte stream to scan and, if dechunking was applied, its result.

    ``raw`` returns the data untouched. ``chunked`` always dechunks and lets
    ``ChunkedDecodeError`` propagate. Any other mode (``auto``) tries to
    dechunk and keeps the decoded stream only when it yields strictly more
    valid RTCM frames than the raw bytes; otherwise the raw data is returned.
    """
    if mode == "raw":
        return data, None

    start_offset = find_chunked_body_start(data)
    try:
        dechunked = dechunk_http_body(data, start_offset)
    except ChunkedDecodeError:
        if mode == "chunked":
            raise
        return data, None

    if mode == "chunked":
        return dechunked.data, dechunked

    raw_messages, _ = scan_rtcm_frames(data)
    dechunked_messages, _ = scan_rtcm_frames(dechunked.data)
    if len(dechunked_messages) > len(raw_messages):
        return dechunked.data, dechunked
    return data, None


def rtcm_message_type(payload: bytes) -> int | None:
    """Extract the 12-bit RTCM message number from a payload.

    Returns ``None`` when the payload has fewer than 2 bytes.
    """
    if len(payload) < 2:
        return None
    return (payload[0] << 4) | (payload[1] >> 4)


def scan_rtcm_frames(data: bytes) -> tuple[list[RtcmMessage], ScanStats]:
    """Find valid RTCM v3 frames in data.

    The stream is walked byte by byte. A frame is accepted only when the
    preamble, the six zero reserved bits and the CRC-24Q all check out; after a
    valid frame scanning resumes right behind it, otherwise one byte later.

    A candidate whose declared length runs past the end of the input does not
    stop the scan: it may be a stray 0xD3 byte, so the bytes after it are still
    searched. If no valid frame follows, the counters are rolled back to their
    values at that candidate and its offset is reported in
    ``incomplete_tail_offset``. If a valid frame does follow, the candidate is
    treated as noise and counted as skipped.

    Returns:
        The list of valid messages in stream order and the scan statistics.
    """
    stats = ScanStats(bytes_read=len(data))
    messages: list[RtcmMessage] = []
    size = len(data)
    # Snapshot (offset, bytes_skipped, invalid_headers, crc_failures) taken at
    # the first incomplete candidate not yet followed by a valid frame.
    pending_tail: tuple[int, int, int, int] | None = None
    pos = 0

    while pos < size:
        if data[pos] != PREAMBLE:
            stats.bytes_skipped += 1
            pos += 1
            continue

        if pos + _HEADER_LENGTH > size:
            # Not even a full header left, so no frame can start here or later.
            if pending_tail is None:
                stats.incomplete_tail_offset = pos
            break

        second = data[pos + 1]
        if second & 0xFC:
            # The six reserved bits must be zero.
            stats.invalid_headers += 1
            stats.bytes_skipped += 1
            pos += 1
            continue

        # 10-bit length, so it can never exceed MAX_RTCM_PAYLOAD_LENGTH.
        payload_length = ((second & 0x03) << 8) | data[pos + 2]
        frame_length = _HEADER_LENGTH + payload_length + _CRC_LENGTH
        end = pos + frame_length
        if end > size:
            if pending_tail is None:
                pending_tail = (
                    pos,
                    stats.bytes_skipped,
                    stats.invalid_headers,
                    stats.crc_failures,
                )
            stats.bytes_skipped += 1
            pos += 1
            continue

        frame = data[pos:end]
        expected_crc = int.from_bytes(frame[-_CRC_LENGTH:], "big")
        if crc24q(frame[:-_CRC_LENGTH]) != expected_crc:
            stats.crc_failures += 1
            stats.bytes_skipped += 1
            pos += 1
            continue

        # A valid frame after an incomplete candidate means that candidate was
        # a false preamble, not a truncated tail.
        pending_tail = None
        messages.append(
            RtcmMessage(
                index=len(messages) + 1,
                offset=pos,
                message_type=rtcm_message_type(frame[_HEADER_LENGTH:-_CRC_LENGTH]),
                payload_length=payload_length,
                frame_length=frame_length,
                crc=expected_crc,
                frame=frame,
            )
        )
        pos = end

    if pending_tail is not None:
        # Nothing valid followed: report the candidate as the truncated tail
        # and drop what was counted while looking past it.
        (
            stats.incomplete_tail_offset,
            stats.bytes_skipped,
            stats.invalid_headers,
            stats.crc_failures,
        ) = pending_tail

    return messages, stats


def write_frames(messages: Iterable[RtcmMessage], out_dir: Path) -> None:
    """Write every frame to its own ``.bin`` file in out_dir (created if needed)."""
    out_dir.mkdir(parents=True, exist_ok=True)
    for msg in messages:
        msg_type = "unknown" if msg.message_type is None else str(msg.message_type)
        path = out_dir / f"rtcm_{msg.index:05d}_type_{msg_type}_offset_{msg.offset}.bin"
        path.write_bytes(msg.frame)


def write_csv(messages: Iterable[RtcmMessage], path: Path) -> None:
    """Write a CSV index with one row of metadata per message."""
    with path.open("w", newline="", encoding="utf-8") as fp:
        writer = csv.DictWriter(
            fp,
            fieldnames=[
                "index",
                "offset",
                "message_type",
                "payload_length",
                "frame_length",
                "crc_hex",
                "payload_hex_preview",
            ],
        )
        writer.writeheader()
        for msg in messages:
            writer.writerow(
                {
                    "index": msg.index,
                    "offset": msg.offset,
                    "message_type": msg.message_type,
                    "payload_length": msg.payload_length,
                    "frame_length": msg.frame_length,
                    "crc_hex": f"{msg.crc:06X}",
                    "payload_hex_preview": msg.payload_hex_preview,
                }
            )


def write_jsonl(messages: Iterable[RtcmMessage], path: Path) -> None:
    """Write one compact JSON object per message, including payload and frame hex."""
    with path.open("w", encoding="utf-8") as fp:
        for msg in messages:
            fp.write(
                json.dumps(
                    {
                        "index": msg.index,
                        "offset": msg.offset,
                        "message_type": msg.message_type,
                        "payload_length": msg.payload_length,
                        "frame_length": msg.frame_length,
                        "crc_hex": f"{msg.crc:06X}",
                        "payload_hex": msg.payload.hex(),
                        "frame_hex": msg.frame.hex(),
                    },
                    separators=(",", ":"),
                )
                + "\n"
            )


def print_messages(
    messages: list[RtcmMessage],
    stats: ScanStats,
    show_hex: bool,
    debug_1005: bool,
) -> None:
    """Print one line per message followed by a summary of the scan statistics.

    ``show_hex`` appends a short payload preview to each line; ``debug_1005``
    additionally prints the full frame of every type 1005 message.
    """
    for msg in messages:
        msg_type = "unknown" if msg.message_type is None else str(msg.message_type)
        line = (
            f"#{msg.index:05d} offset={msg.offset:<10} "
            f"type={msg_type:<5} payload={msg.payload_length:<4} "
            f"frame={msg.frame_length:<4} crc=0x{msg.crc:06X}"
        )
        if show_hex:
            line += f" payload={msg.payload_hex_preview}"
        print(line)
        if debug_1005 and msg.message_type == 1005:
            print(f" debug1005 frame_hex={msg.frame.hex(' ')}")

    print()
    print(f"Valid RTCM messages: {len(messages)}")
    print(f"Bytes read: {stats.bytes_read}")
    print(f"Bytes skipped while searching: {stats.bytes_skipped}")
    print(f"Invalid RTCM-like headers: {stats.invalid_headers}")
    print(f"CRC failures: {stats.crc_failures}")
    if stats.incomplete_tail_offset is not None:
        print(f"Incomplete trailing candidate at offset: {stats.incomplete_tail_offset}")


def read_input(path: Path | None, stdin: BinaryIO) -> bytes:
    """Return the whole input: the file at path, or stdin when path is None."""
    if path is None:
        return stdin.read()
    return path.read_bytes()


def parse_args() -> argparse.Namespace:
    """Build the command-line parser and parse ``sys.argv``."""
    parser = argparse.ArgumentParser(
        description="Parse RTCM v3 messages from a .bin file or stdin."
    )
    parser.add_argument(
        "input",
        nargs="?",
        type=Path,
        help="Binary file to scan. If omitted, reads from stdin.",
    )
    parser.add_argument(
        "--out-dir",
        type=Path,
        help="Directory where each valid full RTCM frame will be written as a .bin file.",
    )
    parser.add_argument(
        "--csv",
        type=Path,
        help="Write a CSV index of parsed messages.",
    )
    parser.add_argument(
        "--jsonl",
        type=Path,
        help="Write JSON Lines with message metadata and hex payload/frame content.",
    )
    parser.add_argument(
        "--mode",
        choices=["auto", "raw", "chunked"],
        default="auto",
        help=(
            "How to read the input: auto detects HTTP chunked transfer encoding, "
            "raw scans bytes exactly as stored, chunked forces HTTP dechunking."
        ),
    )
    parser.add_argument(
        "--write-stream",
        type=Path,
        help="Write the reconstructed byte stream that is scanned for RTCM frames.",
    )
    parser.add_argument(
        "--hex",
        action="store_true",
        help="Show a short payload hex preview in console output.",
    )
    parser.add_argument(
        "--debug-1005",
        action="store_true",
        help="Print the full RTCM 1005 frame bytes as hex in the console output.",
    )
    return parser.parse_args()


def main() -> int:
    """Run the scanner.

    Returns the process exit status: 0 when at least one valid frame was
    found, 1 when none was found, 2 when forced dechunking failed.
    """
    args = parse_args()
    data = read_input(args.input, sys.stdin.buffer)
    try:
        stream, dechunked = prepare_input_stream(data, args.mode)
    except ChunkedDecodeError as exc:
        print(f"Could not decode chunked input: {exc}", file=sys.stderr)
        return 2

    if dechunked is not None:
        print(
            f"Decoded HTTP chunked transfer stream: "
            f"{dechunked.chunks} chunks, {len(data)} input bytes -> {len(stream)} data bytes"
        )
        print()

    if args.write_stream:
        args.write_stream.write_bytes(stream)
        print(f"Wrote scanned byte stream to {args.write_stream}")
        print()

    messages, stats = scan_rtcm_frames(stream)
    print_messages(messages, stats, args.hex, args.debug_1005)

    if args.out_dir:
        write_frames(messages, args.out_dir)
        print(f"Wrote {len(messages)} frame file(s) to {args.out_dir}")

    if args.csv:
        write_csv(messages, args.csv)
        print(f"Wrote CSV index to {args.csv}")

    if args.jsonl:
        write_jsonl(messages, args.jsonl)
        print(f"Wrote JSONL details to {args.jsonl}")

    return 0 if messages else 1


if __name__ == "__main__":
    raise SystemExit(main())