#!/usr/bin/env python3 """ Analyze binary file for RTCM3 messages and ASCII data. Searches for RTCM3 message headers (0xD3) and displays message info. Also identifies ASCII text blocks. """ import sys import argparse from pathlib import Path def find_rtcm_messages(data: bytes) -> list[dict]: """Find all RTCM3 messages in binary data.""" messages = [] i = 0 while i < len(data): # RTCM3 messages start with 0xD3 if data[i] == 0xD3 and i + 2 < len(data): # Parse header length = ((data[i+1] & 0x03) << 8) | data[i+2] msg_total_len = 3 + length + 3 # header + payload + CRC if i + msg_total_len <= len(data) and length >= 3: # Extract message type (first 12 bits of payload) msg_type = (data[i+3] << 4) | (data[i+4] >> 4) messages.append({ 'offset': i, 'type': msg_type, 'length': length, 'total_length': msg_total_len, 'header': data[i:i+3], 'payload': data[i+3:i+3+length], 'crc': data[i+3+length:i+3+length+3] if i+3+length+3 <= len(data) else None, }) i += msg_total_len continue i += 1 return messages def find_ascii_blocks(data: bytes, min_length: int = 10) -> list[dict]: """Find blocks of ASCII printable text.""" blocks = [] start = None for i, byte in enumerate(data): is_printable = 32 <= byte < 127 or byte in [9, 10, 13] # Include tab, LF, CR if is_printable: if start is None: start = i else: if start is not None and (i - start) >= min_length: text = data[start:i].decode('ascii', errors='ignore') blocks.append({ 'offset': start, 'length': i - start, 'text': text, }) start = None # Handle final block if start is not None and (len(data) - start) >= min_length: text = data[start:].decode('ascii', errors='ignore') blocks.append({ 'offset': start, 'length': len(data) - start, 'text': text, }) return blocks def hex_dump(data: bytes, offset: int = 0, max_bytes: int = 64) -> str: """Create a hex dump of data.""" lines = [] data = data[:max_bytes] for i in range(0, len(data), 16): chunk = data[i:i+16] hex_part = " ".join(f"{b:02X}" for b in chunk) ascii_part = "".join(chr(b) if 32 <= b < 127 else "." for b in chunk) lines.append(f"{offset + i:08X} {hex_part:<48} {ascii_part}") return "\n".join(lines) def get_message_description(msg_type: int) -> str: """Get human-readable description for RTCM message type.""" descriptions = { 1005: "Stationary RTK Reference Station ARP", 1006: "Stationary RTK Reference Station ARP + Antenna Height", 1007: "Antenna Descriptor", 1008: "Antenna Descriptor & Serial Number", 1019: "GPS Ephemerides", 1020: "GLONASS Ephemerides", 1033: "Receiver and Antenna Descriptors", 1074: "GPS MSM4", 1075: "GPS MSM5", 1077: "GPS MSM7", 1084: "GLONASS MSM4", 1085: "GLONASS MSM5", 1087: "GLONASS MSM7", 1094: "Galileo MSM4", 1095: "Galileo MSM5", 1097: "Galileo MSM7", 1124: "BeiDou MSM4", 1125: "BeiDou MSM5", 1127: "BeiDou MSM7", 1230: "GLONASS Code-Phase Biases", } return descriptions.get(msg_type, f"Type {msg_type}") def analyze_file(filename: str, show_ascii: bool = True, show_messages: bool = True, show_hex: bool = False, filter_type: int = None, max_messages: int = None): """Analyze binary file for RTCM messages and ASCII data.""" path = Path(filename) if not path.exists(): print(f"ERROR: File not found: {filename}") return print(f"Analyzing: {filename}") print(f"File size: {path.stat().st_size:,} bytes") print(f"{'=' * 80}\n") # Read file data = path.read_bytes() # Find ASCII blocks if show_ascii: print(f"ASCII TEXT BLOCKS (10+ chars):") print(f"{'─' * 80}") ascii_blocks = find_ascii_blocks(data, min_length=10) if ascii_blocks: for block in ascii_blocks[:20]: # Show first 20 blocks print(f"Offset: 0x{block['offset']:08X} ({block['offset']}) - Length: {block['length']} bytes") # Show text with visible whitespace text = block['text'].replace('\r', '\\r').replace('\n', '\\n').replace('\t', '\\t') if len(text) > 200: text = text[:200] + "..." print(f" Text: {text}") print() if len(ascii_blocks) > 20: print(f"... and {len(ascii_blocks) - 20} more ASCII blocks\n") else: print(" (none found)\n") print() # Find RTCM messages if show_messages: print(f"RTCM3 MESSAGES:") print(f"{'─' * 80}") messages = find_rtcm_messages(data) if messages: # Group by type type_counts = {} for msg in messages: msg_type = msg['type'] type_counts[msg_type] = type_counts.get(msg_type, 0) + 1 print(f"Found {len(messages)} RTCM3 messages\n") print(f"Message type summary:") for msg_type in sorted(type_counts.keys()): desc = get_message_description(msg_type) print(f" Type {msg_type:4d}: {type_counts[msg_type]:4d} messages - {desc}") print() # Show individual messages print(f"Individual messages:") print(f"{'─' * 80}") shown_count = 0 for i, msg in enumerate(messages): msg_type = msg['type'] # Apply filter if specified if filter_type is not None and msg_type != filter_type: continue # Apply max messages limit if max_messages is not None and shown_count >= max_messages: remaining = sum(1 for m in messages[i:] if filter_type is None or m['type'] == filter_type) if remaining > 0: print(f"\n... and {remaining} more messages (use --max-messages to show more)") break shown_count += 1 desc = get_message_description(msg_type) print(f"\nMessage #{i+1}: Type {msg_type:4d} - {desc}") print(f" Offset: 0x{msg['offset']:08X} ({msg['offset']})") print(f" Payload length: {msg['length']} bytes") print(f" Total length: {msg['total_length']} bytes") # Show hex dump of payload if show_hex: print(f" Payload hex (first 64 bytes):") hex_lines = hex_dump(msg['payload'], msg['offset'] + 3, max_bytes=64) for line in hex_lines.split('\n'): print(f" {line}") # Check for ASCII in payload payload = msg['payload'] printable_count = sum(1 for b in payload if 32 <= b < 127) printable_percent = (printable_count / len(payload) * 100) if len(payload) > 0 else 0 if printable_percent > 50: print(f" Payload is {printable_percent:.0f}% ASCII printable") try: text = payload.decode('ascii', errors='ignore') text_display = text.replace('\r', '\\r').replace('\n', '\\n').replace('\t', '\\t') if len(text_display) > 200: text_display = text_display[:200] + "..." print(f" ASCII: {text_display}") except: pass # Special handling for specific message types if msg_type == 208: print(f" ★ MESSAGE 208 - Proprietary/Custom") else: print(" (no RTCM3 messages found)\n") def main(): parser = argparse.ArgumentParser( description="Analyze binary file for RTCM3 messages and ASCII data", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" Examples: # Basic analysis python analyze_rtcm_binary.py ntrip_raw_20250605_120000.bin # Show hex dumps python analyze_rtcm_binary.py ntrip_raw_20250605_120000.bin --hex # Only show message type 208 python analyze_rtcm_binary.py ntrip_raw_20250605_120000.bin --type 208 # Show only first 10 messages python analyze_rtcm_binary.py ntrip_raw_20250605_120000.bin --max-messages 10 # Skip ASCII blocks, show messages only python analyze_rtcm_binary.py ntrip_raw_20250605_120000.bin --no-ascii """ ) parser.add_argument('filename', help='Binary file to analyze') parser.add_argument('--no-ascii', action='store_true', help='Do not show ASCII blocks') parser.add_argument('--no-messages', action='store_true', help='Do not show RTCM messages') parser.add_argument('--hex', action='store_true', help='Show hex dumps of message payloads') parser.add_argument('--type', type=int, help='Filter to show only specific message type') parser.add_argument('--max-messages', type=int, help='Maximum number of messages to display') args = parser.parse_args() analyze_file( args.filename, show_ascii=not args.no_ascii, show_messages=not args.no_messages, show_hex=args.hex, filter_type=args.type, max_messages=args.max_messages ) if __name__ == "__main__": main()