#!/usr/bin/env python3
"""
Parse NTRIP stream with HTTP chunked transfer encoding.
Handles the chunk headers and extracts clean RTCM messages.
"""

import sys
import argparse
from collections import Counter
from pathlib import Path
from typing import Any, Optional

# Byte values allowed in a chunk-size field (ASCII hex digits only).
_HEX_DIGITS = frozenset(b"0123456789abcdefABCDEF")

# Number of chunks listed individually in the chunk report.
_MAX_CHUNKS_SHOWN = 20

# Human-readable names for the RTCM message types this tool knows about.
_MESSAGE_DESCRIPTIONS = {
    1005: "Stationary RTK Reference Station ARP",
    1006: "Stationary RTK Reference Station ARP + Antenna Height",
    1007: "Antenna Descriptor",
    1008: "Antenna Descriptor & Serial Number",
    1019: "GPS Ephemerides",
    1020: "GLONASS Ephemerides",
    1033: "Receiver and Antenna Descriptors",
    1074: "GPS MSM4",
    1075: "GPS MSM5",
    1077: "GPS MSM7",
    1084: "GLONASS MSM4",
    1085: "GLONASS MSM5",
    1087: "GLONASS MSM7",
    1094: "Galileo MSM4",
    1095: "Galileo MSM5",
    1097: "Galileo MSM7",
    1124: "BeiDou MSM4",
    1125: "BeiDou MSM5",
    1127: "BeiDou MSM7",
    1230: "GLONASS Code-Phase Biases",
}


def _skip_http_header(data: bytes) -> int:
    """Return the offset just past a leading HTTP response header, or 0."""
    if data.startswith(b"HTTP/"):
        header_end = data.find(b"\r\n\r\n")
        if header_end != -1:
            return header_end + 4
    return 0


def _parse_chunk_size(size_line: bytes) -> Optional[int]:
    """Return the chunk size encoded in *size_line*, or None if it is invalid.

    Only plain hex digits are accepted. ``int(..., 16)`` on its own would also
    accept signs, a ``0x`` prefix and underscores; a negative size would move
    the parser backwards and make it loop forever.
    """
    # Anything after ';' is an optional chunk extension and is ignored.
    size_field = size_line.split(b";", 1)[0].strip()
    if not size_field or not _HEX_DIGITS.issuperset(size_field):
        return None
    return int(size_field, 16)


def parse_chunked_stream(data: bytes) -> tuple[bytes, list[dict[str, Any]]]:
    """
    Parse HTTP chunked transfer encoded stream.

    If *data* begins with an HTTP response header ("HTTP/..." terminated by a
    blank line) the header is skipped first, so that header text is never
    mistaken for a chunk-size line. Bytes that do not start a valid chunk-size
    line are skipped one at a time until the parser resynchronises.

    Parsing stops at the terminating 0-size chunk, at an incomplete chunk, or
    when no further CRLF is found.

    Returns:
        (clean_rtcm_data, chunk_log) where chunk_log holds one dict per chunk
        with 'offset', 'size_declared', 'size_actual' and either
        'chunk_data' + 'has_trailing_crlf' (data chunk) or 'is_end' (final
        0-size chunk).
    """
    chunks: list[dict[str, Any]] = []
    rtcm_data = bytearray()
    data_len = len(data)

    i = _skip_http_header(data)
    while i < data_len:
        # A chunk starts with a size line: hex number followed by \r\n.
        line_end = data.find(b'\r\n', i)
        if line_end == -1:
            # No more complete chunks
            break

        chunk_size = _parse_chunk_size(data[i:line_end])
        if chunk_size is None:
            # Not a valid chunk size, skip byte
            i += 1
            continue

        chunk_data_start = line_end + 2  # skip \r\n

        if chunk_size == 0:
            # End of chunks
            chunks.append({
                'offset': i,
                'size_declared': 0,
                'size_actual': 0,
                'is_end': True,
            })
            break

        chunk_data_end = chunk_data_start + chunk_size
        if chunk_data_end + 2 > data_len:
            # Incomplete chunk (payload or its trailing CRLF is truncated)
            break

        chunk_data = data[chunk_data_start:chunk_data_end]
        # A missing trailing CRLF is recorded, not treated as fatal.
        trailing = data[chunk_data_end:chunk_data_end + 2]

        chunks.append({
            'offset': i,
            'size_declared': chunk_size,
            'size_actual': len(chunk_data),
            'chunk_data': chunk_data,
            'has_trailing_crlf': trailing == b'\r\n',
        })
        rtcm_data.extend(chunk_data)

        # Move to next chunk
        i = chunk_data_end + 2  # skip trailing \r\n

    return bytes(rtcm_data), chunks


def parse_rtcm_messages(data: bytes) -> list[dict[str, Any]]:
    """Parse RTCM3 messages from clean data.

    Scans for the 0xD3 preamble, reads the 10-bit length that follows and
    accepts the frame when it fits entirely inside *data* and its payload is
    at least 3 bytes long. The 3 bytes after the payload are counted as part
    of the frame but are not verified.

    Returns:
        One dict per message with 'offset', 'type', 'station_id', 'length'
        (payload bytes), 'total_length' (whole frame) and 'payload'.
    """
    messages: list[dict[str, Any]] = []
    data_len = len(data)
    i = 0

    while i < data_len:
        if data[i] == 0xD3 and i + 2 < data_len:
            length = ((data[i + 1] & 0x03) << 8) | data[i + 2]
            msg_total_len = 3 + length + 3  # header + payload + trailer

            if i + msg_total_len <= data_len and length >= 3:
                payload = data[i + 3:i + 3 + length]
                # First 12 payload bits: message type; next 12: station id.
                msg_type = (payload[0] << 4) | (payload[1] >> 4)
                station_id = ((payload[1] & 0x0F) << 8) | payload[2]

                messages.append({
                    'offset': i,
                    'type': msg_type,
                    'station_id': station_id,
                    'length': length,
                    'total_length': msg_total_len,
                    'payload': payload,
                })
                i += msg_total_len
                continue

        i += 1

    return messages


def get_message_description(msg_type: int) -> str:
    """Get human-readable description for RTCM message type.

    Unknown types are rendered as "Type <number>".
    """
    return _MESSAGE_DESCRIPTIONS.get(msg_type, f"Type {msg_type}")


def _chunk_overhead(size: int) -> int:
    """Estimate framing bytes for one chunk: size digits + CRLF + trailing CRLF."""
    return len(f"{size:x}") + 2 + 2


def _report_chunks(chunks: list[dict[str, Any]], rtcm_len: int, total_len: int) -> None:
    """Print the per-chunk listing and the overhead/efficiency summary."""
    print("CHUNK DETAILS:")
    print(f"{'─' * 80}")

    for index, chunk in enumerate(chunks[:_MAX_CHUNKS_SHOWN], start=1):
        if chunk.get('is_end'):
            print(f"Chunk {index}: END (0-byte chunk)")
            break

        size = chunk['size_declared']
        offset = chunk['offset']
        trailing = "✓" if chunk.get('has_trailing_crlf') else "✗"
        print(f"Chunk {index:3d}: Offset 0x{offset:08X}, Size {size:5d} bytes, Trailing CRLF {trailing}")

    if len(chunks) > _MAX_CHUNKS_SHOWN:
        print(f"... and {len(chunks) - _MAX_CHUNKS_SHOWN} more chunks")

    # The total covers every data chunk, not just the ones listed above.
    total_chunk_overhead = sum(
        _chunk_overhead(chunk['size_declared'])
        for chunk in chunks
        if not chunk.get('is_end')
    )
    print(f"\nTotal chunk overhead: {total_chunk_overhead:,} bytes")
    print(f"Efficiency: {rtcm_len / total_len * 100:.1f}% (data vs. total)")
    print()


def _report_messages(rtcm_data: bytes, max_messages: int) -> None:
    """Print the RTCM type summary and the first *max_messages* messages."""
    print("RTCM MESSAGES:")
    print(f"{'─' * 80}")

    messages = parse_rtcm_messages(rtcm_data)
    print(f"Found {len(messages)} RTCM messages\n")

    # Message type summary
    type_counts = Counter(msg['type'] for msg in messages)
    station_ids = {msg['station_id'] for msg in messages}

    print("Message type summary:")
    for msg_type in sorted(type_counts):
        desc = get_message_description(msg_type)
        count = type_counts[msg_type]
        print(f" Type {msg_type:4d}: {count:5d} messages - {desc}")

    print(f"\nStation IDs: {sorted(station_ids)}")
    print()

    # Show individual messages
    print(f"Individual messages (first {max_messages}):")
    print(f"{'─' * 80}\n")

    for index, msg in enumerate(messages[:max_messages], start=1):
        desc = get_message_description(msg['type'])
        print(f"Message {index}: Type {msg['type']:4d} - {desc}")
        print(f" Offset: 0x{msg['offset']:08X}, Station: {msg['station_id']}, Length: {msg['length']} bytes")

        # Show the payload as text when more than 30% of it is printable ASCII.
        payload = msg['payload']
        printable = sum(1 for b in payload if 32 <= b < 127)
        if printable / len(payload) > 0.3:
            text = payload.decode('ascii', errors='ignore')
            text_clean = text.replace('\r', '\\r').replace('\n', '\\n')
            if len(text_clean) > 100:
                text_clean = text_clean[:100] + "..."
            print(f" ASCII: {text_clean}")

        print()

    if len(messages) > max_messages:
        print(f"... and {len(messages) - max_messages} more messages")


def analyze_file(filename: str, save_clean: bool = False, show_chunks: bool = True,
                 show_messages: bool = True, max_messages: int = 50):
    """Analyze chunked NTRIP file.

    Reads *filename*, strips the HTTP chunked framing and prints a report to
    stdout. Prints an error and returns if the file does not exist.

    Args:
        filename: Path of the raw capture to analyze.
        save_clean: Also write the de-chunked bytes to "<stem>_clean.bin".
        show_chunks: Print the chunk listing and overhead summary.
        show_messages: Print the RTCM message summary and listing.
        max_messages: Number of individual messages to list.
    """
    path = Path(filename)
    if not path.exists():
        print(f"ERROR: File not found: {filename}")
        return

    print(f"Analyzing: {filename}")
    print(f"File size: {path.stat().st_size:,} bytes")
    print(f"{'=' * 80}\n")

    # Read file
    data = path.read_bytes()

    # Parse chunks
    print("Parsing HTTP chunked transfer encoding...")
    rtcm_data, chunks = parse_chunked_stream(data)

    print(f"Found {len(chunks)} chunks")
    print(f"Clean RTCM data: {len(rtcm_data):,} bytes")
    print()

    # A non-empty chunk list implies non-empty data, so the ratio is safe.
    if show_chunks and chunks:
        _report_chunks(chunks, len(rtcm_data), len(data))

    # Save clean RTCM data
    if save_clean:
        # Relative name: the file lands in the current working directory.
        clean_filename = path.stem + "_clean.bin"
        Path(clean_filename).write_bytes(rtcm_data)
        print(f"✓ Saved clean RTCM data to: {clean_filename}\n")

    # Parse RTCM messages
    if show_messages:
        _report_messages(rtcm_data, max_messages)


def main():
    """Command-line entry point: parse arguments and run the analysis."""
    parser = argparse.ArgumentParser(
        description="Parse NTRIP stream with HTTP chunked transfer encoding",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  # Basic analysis
  python parse_chunked_rtcm.py ntrip_raw_20250605_120000.bin

  # Save clean RTCM data (without chunk encoding)
  python parse_chunked_rtcm.py ntrip_raw_20250605_120000.bin --save-clean

  # Show more messages
  python parse_chunked_rtcm.py ntrip_raw_20250605_120000.bin --max-messages 100

  # Skip chunk details, just show messages
  python parse_chunked_rtcm.py ntrip_raw_20250605_120000.bin --no-chunks
"""
    )

    parser.add_argument('filename', help='Binary file to analyze')
    parser.add_argument('--save-clean', action='store_true',
                        help='Save clean RTCM data without chunk encoding')
    parser.add_argument('--no-chunks', action='store_true',
                        help='Skip chunk details')
    parser.add_argument('--no-messages', action='store_true',
                        help='Skip message details')
    parser.add_argument('--max-messages', type=int, default=50,
                        help='Maximum messages to show')

    args = parser.parse_args()

    analyze_file(
        args.filename,
        save_clean=args.save_clean,
        show_chunks=not args.no_chunks,
        show_messages=not args.no_messages,
        max_messages=args.max_messages
    )


if __name__ == "__main__":
    main()