""" RTCM3 message parser for extracting base station information and message details. """ import struct from typing import Any class RTCMParser: """Parse RTCM3 messages to extract base station position and other details.""" def __init__(self): self.base_station_position: dict[str, Any] | None = None self.message_stats: dict[int, int] = {} self.total_bytes = 0 self.message_count = 0 self.chunk_buffer = b"" # Buffer for incomplete chunks def parse_chunked_data(self, data: bytes) -> bytes: """ Parse HTTP chunked transfer encoding and return clean RTCM data. Handles incomplete chunks across multiple calls. """ self.chunk_buffer += data clean_data = bytearray() while True: # Look for chunk size line (hex number followed by \r\n) line_end = self.chunk_buffer.find(b'\r\n') if line_end == -1: # No complete chunk size line yet break chunk_size_line = self.chunk_buffer[:line_end] try: # Parse hex chunk size (may have extensions after semicolon) chunk_size_str = chunk_size_line.decode('ascii', errors='ignore').split(';')[0].strip() chunk_size = int(chunk_size_str, 16) if chunk_size == 0: # End of chunks self.chunk_buffer = self.chunk_buffer[line_end + 2:] break # Check if we have the complete chunk data chunk_data_start = line_end + 2 chunk_data_end = chunk_data_start + chunk_size if chunk_data_end + 2 > len(self.chunk_buffer): # Don't have complete chunk yet, wait for more data break # Extract chunk data chunk_data = self.chunk_buffer[chunk_data_start:chunk_data_end] clean_data.extend(chunk_data) # Move past chunk data and trailing \r\n self.chunk_buffer = self.chunk_buffer[chunk_data_end + 2:] except (ValueError, UnicodeDecodeError): # Not a valid chunk, might be plain RTCM data # Just return what we have and let RTCM parser handle it clean_data.extend(self.chunk_buffer) self.chunk_buffer = b"" break return bytes(clean_data) def parse_messages(self, data: bytes, is_chunked: bool = True) -> list[dict[str, Any]]: """ Parse RTCM3 messages from data stream and return message details. Args: data: Raw data from NTRIP stream is_chunked: If True, parse HTTP chunked encoding first """ # Handle chunked encoding if needed if is_chunked: data = self.parse_chunked_data(data) messages = [] i = 0 while i < len(data): # RTCM3 messages start with 0xD3 if data[i] == 0xD3 and i + 2 < len(data): # Parse header: 0xD3 + 2 bytes (6 bits reserved + 10 bits length) length = ((data[i+1] & 0x03) << 8) | data[i+2] msg_total_len = 3 + length + 3 # header + payload + CRC if i + msg_total_len <= len(data) and length >= 3: # Extract message type (first 12 bits of payload) msg_type = (data[i+3] << 4) | (data[i+4] >> 4) # Extract full message msg_data = data[i+3:i+3+length] self.message_count += 1 self.total_bytes += msg_total_len self.message_stats[msg_type] = self.message_stats.get(msg_type, 0) + 1 # Parse specific message types msg_info: dict[str, Any] = { "type": msg_type, "length": length, "total_length": msg_total_len, "index": self.message_count, "raw_hex": msg_data[:min(50, len(msg_data))].hex(), # First 50 bytes for debug } # RTCM 1005: Stationary RTK reference station ARP if msg_type == 1005: pos = self._parse_1005(msg_data) if pos: msg_info["base_position"] = pos self.base_station_position = pos # RTCM 1006: Stationary RTK reference station ARP with antenna height elif msg_type == 1006: pos = self._parse_1006(msg_data) if pos: msg_info["base_position"] = pos self.base_station_position = pos # RTCM 1033: Receiver and antenna descriptors elif msg_type == 1033: desc = self._parse_1033(msg_data) if desc: msg_info["descriptors"] = desc # Add common observation message types elif msg_type in [1074, 1084, 1094, 1124]: # GPS, GLONASS, Galileo, BeiDou MSM4 msg_info["description"] = self._get_message_description(msg_type) elif msg_type in [1075, 1085, 1095, 1125]: # GPS, GLONASS, Galileo, BeiDou MSM5 msg_info["description"] = self._get_message_description(msg_type) elif msg_type in [1077, 1087, 1097, 1127]: # GPS, GLONASS, Galileo, BeiDou MSM7 msg_info["description"] = self._get_message_description(msg_type) else: msg_info["description"] = self._get_message_description(msg_type) messages.append(msg_info) i += msg_total_len continue i += 1 return messages def _parse_1005(self, data: bytes) -> dict[str, Any] | None: """Parse RTCM 1005 message (Stationary RTK reference station ARP).""" try: if len(data) < 19: return None # Convert payload to bit array for easier bit-level access bit_array = [] for byte in data: for i in range(7, -1, -1): bit_array.append((byte >> i) & 1) def get_bits(start, length): """Extract unsigned value from bit array.""" value = 0 for i in range(length): value = (value << 1) | bit_array[start + i] return value def get_signed_bits(start, length): """Extract signed value from bit array (two's complement).""" value = get_bits(start, length) # Check sign bit if value & (1 << (length - 1)): # Negative number - convert from two's complement value -= (1 << length) return value pos = 0 # DF002: Message Number (12 bits) - skip, already know it's 1005 pos += 12 # DF003: Reference Station ID (12 bits) station_id = get_bits(pos, 12) pos += 12 # DF021: ITRF Realization Year (6 bits) itrf_year = get_bits(pos, 6) pos += 6 # DF022: GPS Indicator (1 bit) pos += 1 # DF023: GLONASS Indicator (1 bit) pos += 1 # DF024: Reserved for Galileo (1 bit) pos += 1 # DF141: Reference-Station Indicator (1 bit) pos += 1 # DF025: Antenna Reference Point ECEF-X (38 bits, signed, 0.0001m LSB) ecef_x_raw = get_signed_bits(pos, 38) ecef_x = ecef_x_raw * 0.0001 pos += 38 # DF142: Single Receiver Oscillator Indicator (1 bit) pos += 1 # Reserved (1 bit) pos += 1 # DF026: Antenna Reference Point ECEF-Y (38 bits, signed, 0.0001m LSB) ecef_y_raw = get_signed_bits(pos, 38) ecef_y = ecef_y_raw * 0.0001 pos += 38 # DF364: Quarter Cycle Indicator (2 bits) pos += 2 # DF027: Antenna Reference Point ECEF-Z (38 bits, signed, 0.0001m LSB) ecef_z_raw = get_signed_bits(pos, 38) ecef_z = ecef_z_raw * 0.0001 pos += 38 # Convert ECEF to LLA lat, lon, alt = self._ecef_to_lla(ecef_x, ecef_y, ecef_z) return { "station_id": station_id, "itrf_year": itrf_year + 1980 if itrf_year else None, "ecef_x": ecef_x, "ecef_y": ecef_y, "ecef_z": ecef_z, "latitude": lat, "longitude": lon, "altitude_m": alt, } except Exception as e: import traceback traceback.print_exc() return None def _parse_1006(self, data: bytes) -> dict[str, Any] | None: """Parse RTCM 1006 message (Stationary RTK reference station ARP with antenna height).""" # First parse the 1005 portion result = self._parse_1005(data) if result: try: # RTCM 1006 has antenna height after the 1005 data # The antenna height starts at bit position 140 (after 1005 fields) # Antenna height is 16 bits unsigned bits = int.from_bytes(data, byteorder='big') bit_pos = 140 # After all 1005 fields antenna_height_raw = (bits >> (len(data) * 8 - bit_pos - 16)) & 0xFFFF result["antenna_height_m"] = antenna_height_raw * 0.0001 # 0.1mm resolution except: pass return result def _parse_1033(self, data: bytes) -> dict[str, Any] | None: """Parse RTCM 1033 message (Receiver and antenna descriptors).""" try: if len(data) < 6: return None # Station ID (12 bits) station_id = ((data[1] & 0x0F) << 8) | data[2] # This is a complex variable-length message with text strings # For simplicity, just return the station ID return { "station_id": station_id, } except Exception: return None def _ecef_to_lla(self, x: float, y: float, z: float) -> tuple[float, float, float]: """Convert ECEF coordinates to latitude, longitude, altitude (WGS84).""" # WGS84 constants a = 6378137.0 # Semi-major axis e2 = 6.69437999014e-3 # First eccentricity squared # Longitude lon = 0.0 if x != 0 or y != 0: import math lon = math.atan2(y, x) # Latitude and altitude (iterative) import math p = math.sqrt(x * x + y * y) lat = math.atan2(z, p * (1 - e2)) for _ in range(10): # Iterate to converge N = a / math.sqrt(1 - e2 * math.sin(lat) ** 2) alt = p / math.cos(lat) - N lat_new = math.atan2(z, p * (1 - e2 * N / (N + alt))) if abs(lat_new - lat) < 1e-12: break lat = lat_new N = a / math.sqrt(1 - e2 * math.sin(lat) ** 2) alt = p / math.cos(lat) - N if abs(math.cos(lat)) > 1e-10 else z / math.sin(lat) - N * (1 - e2) return math.degrees(lat), math.degrees(lon), alt def _get_message_description(self, msg_type: int) -> str: """Get human-readable description for RTCM message type.""" descriptions = { 1001: "GPS L1 RTK Observables", 1002: "GPS L1 RTK Observables (Extended)", 1003: "GPS L1/L2 RTK Observables", 1004: "GPS L1/L2 RTK Observables (Extended)", 1005: "Stationary RTK Reference Station ARP", 1006: "Stationary RTK Reference Station ARP + Antenna Height", 1007: "Antenna Descriptor", 1008: "Antenna Descriptor & Serial Number", 1009: "GLONASS L1 RTK Observables", 1010: "GLONASS L1 RTK Observables (Extended)", 1011: "GLONASS L1/L2 RTK Observables", 1012: "GLONASS L1/L2 RTK Observables (Extended)", 1013: "System Parameters", 1019: "GPS Ephemerides", 1020: "GLONASS Ephemerides", 1033: "Receiver and Antenna Descriptors", 1074: "GPS MSM4 (Multi-Signal)", 1075: "GPS MSM5 (Multi-Signal)", 1076: "GPS MSM6 (Multi-Signal)", 1077: "GPS MSM7 (Multi-Signal)", 1084: "GLONASS MSM4 (Multi-Signal)", 1085: "GLONASS MSM5 (Multi-Signal)", 1086: "GLONASS MSM6 (Multi-Signal)", 1087: "GLONASS MSM7 (Multi-Signal)", 1094: "Galileo MSM4 (Multi-Signal)", 1095: "Galileo MSM5 (Multi-Signal)", 1096: "Galileo MSM6 (Multi-Signal)", 1097: "Galileo MSM7 (Multi-Signal)", 1124: "BeiDou MSM4 (Multi-Signal)", 1125: "BeiDou MSM5 (Multi-Signal)", 1126: "BeiDou MSM6 (Multi-Signal)", 1127: "BeiDou MSM7 (Multi-Signal)", 1230: "GLONASS Code-Phase Biases", } return descriptions.get(msg_type, f"RTCM Type {msg_type}") def get_stats(self) -> dict[str, Any]: """Get parser statistics.""" return { "total_bytes": self.total_bytes, "message_count": self.message_count, "message_types": dict(sorted(self.message_stats.items())), "base_station": self.base_station_position, }