#!/usr/bin/env python3 """ Standalone NTRIP test script to observe RTCM stream behavior. Connects to NTRIP caster and displays parsed RTCM message details. """ import base64 import socket import time from datetime import datetime, timezone import math from rtcm_parser import RTCMParser # NTRIP configuration CASTER_HOST = "truertk.pointonenav.com" CASTER_PORT = 2101 MOUNTPOINT = "AUTO" USERNAME = "9t7fwfbm57" PASSWORD = "96m7bec9g8" # Position for GGA (hardcoded) LAT = 36.1140884 LON = -97.0880663 ALT = 390.0 # Debug options DEBUG_HEX = False # Set to True to see hex dump of first 5 messages DEBUG_1005_1006 = True # Show detailed debug for position messages def build_gga(lat: float, lon: float, alt: float) -> bytes: """Build NMEA GGA sentence.""" now_utc = datetime.now(timezone.utc).strftime("%H%M%S") # Convert to NMEA format lat_hemi = "N" if lat >= 0 else "S" lat_abs = abs(lat) lat_deg = int(lat_abs) lat_min = (lat_abs - lat_deg) * 60.0 lat_str = f"{lat_deg:02d}{lat_min:07.4f}" lon_hemi = "E" if lon >= 0 else "W" lon_abs = abs(lon) lon_deg = int(lon_abs) lon_min = (lon_abs - lon_deg) * 60.0 lon_str = f"{lon_deg:03d}{lon_min:07.4f}" fields = [ "GPGGA", now_utc, lat_str, lat_hemi, lon_str, lon_hemi, "1", # GPS fix "12", # Number of satellites "1.0", # HDOP f"{alt:.1f}", "M", "", "M", "", "", ] core = ",".join(fields) checksum = 0 for char in core: checksum ^= ord(char) sentence = f"${core}*{checksum:02X}\r\n" return sentence.encode("ascii") def make_ntrip_request(host: str, port: int, mount: str, user: str, password: str) -> bytes: """Create NTRIP v2 HTTP request.""" auth = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii") req = ( f"GET /{mount} HTTP/1.1\r\n" f"Host: {host}:{port}\r\n" f"Ntrip-Version: Ntrip/2.0\r\n" f"User-Agent: NTRIP-Test/1.0\r\n" f"Connection: close\r\n" f"Authorization: Basic {auth}\r\n\r\n" ) return req.encode("ascii") def haversine_m(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calculate distance between two points using Haversine formula.""" EARTH_RADIUS_M = 6371008.8 lat1_rad = math.radians(lat1) lat2_rad = math.radians(lat2) delta_lat = math.radians(lat2 - lat1) delta_lon = math.radians(lon2 - lon1) a = math.sin(delta_lat / 2) ** 2 + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2 return 2 * EARTH_RADIUS_M * math.asin(min(1.0, math.sqrt(a))) def main(): print(f"NTRIP Test Client (with HTTP Chunked Encoding Support)") print(f"=" * 80) print(f"Caster: {CASTER_HOST}:{CASTER_PORT}/{MOUNTPOINT}") print(f"Rover Position: {LAT:.7f}°, {LON:.7f}° @ {ALT:.1f}m") print(f"=" * 80) print() parser = RTCMParser() last_gga_time = 0 start_time = time.monotonic() try: # Connect to caster print(f"[{time.strftime('%H:%M:%S')}] Connecting...") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(30) sock.connect((CASTER_HOST, CASTER_PORT)) # Send NTRIP request sock.sendall(make_ntrip_request(CASTER_HOST, CASTER_PORT, MOUNTPOINT, USERNAME, PASSWORD)) # Read HTTP response headers header = b"" while b"\r\n\r\n" not in header: chunk = sock.recv(1) if not chunk: print("ERROR: Caster closed before headers received") return header += chunk header_text = header.decode("iso-8859-1", errors="replace") if "200 OK" not in header_text: print(f"ERROR: NTRIP connection failed:") print(header_text) return print(f"[{time.strftime('%H:%M:%S')}] Connected! Streaming RTCM data...\n") # Main receive loop while True: now = time.monotonic() # Send GGA every 10 seconds if now - last_gga_time >= 10: gga = build_gga(LAT, LON, ALT) sock.sendall(gga) print(f"[{time.strftime('%H:%M:%S')}] → Sent GGA to caster") last_gga_time = now # Receive data data = sock.recv(4096) if not data: print(f"\n[{time.strftime('%H:%M:%S')}] Caster closed connection") break # Debug: show hex dump of raw data for first few messages if DEBUG_HEX and parser.message_count < 5: print(f"\n[DEBUG] Raw data ({len(data)} bytes):") print(" ".join(f"{b:02X}" for b in data[:min(100, len(data))])) if len(data) > 100: print(f"... ({len(data) - 100} more bytes)") print() # Parse RTCM messages messages = parser.parse_messages(data) for msg in messages: timestamp = time.strftime('%H:%M:%S') msg_type = msg["type"] length = msg["length"] index = msg["index"] # Build message info line info = f"[{timestamp}] MSG #{index:4d} | RTCM {msg_type:4d} | {length:4d} bytes" # Add description if available if "description" in msg: info += f" | {msg['description']}" # Highlight position messages even if parsing failed if msg_type in [1005, 1006]: info += " ← BASE POSITION MESSAGE" if "base_position" not in msg: info += " (PARSING FAILED)" if DEBUG_1005_1006 and "raw_hex" in msg: info += f"\n Raw hex: {msg['raw_hex'][:80]}..." elif DEBUG_1005_1006: info += " (PARSED OK)" # Check for base station position if "base_position" in msg: base = msg["base_position"] # Check if this is first time seeing base station is_first = parser.message_count == index and parser.base_station_position == base if is_first: info += f"\n ╔════════════════════════════════════════════════════════════════╗" info += f"\n ║ 🎯 BASE STATION POSITION ACQUIRED ║" info += f"\n ╚════════════════════════════════════════════════════════════════╝" else: info += f"\n └─ BASE STATION INFO" info += f"\n Station ID: {base.get('station_id', 'N/A')}" info += f"\n Position: {base['latitude']:.7f}°, {base['longitude']:.7f}°" info += f"\n Altitude: {base['altitude_m']:.2f} m" # ECEF coordinates if 'ecef_x' in base: info += f"\n ECEF: X={base['ecef_x']:.4f}, Y={base['ecef_y']:.4f}, Z={base['ecef_z']:.4f}" # Calculate baseline distance baseline = haversine_m(LAT, LON, base['latitude'], base['longitude']) info += f"\n 📏 Baseline Distance: {baseline:.2f} m ({baseline/1000:.3f} km)" if "antenna_height_m" in base: info += f"\n 📡 Antenna Height: {base['antenna_height_m']:.4f} m" if "itrf_year" in base and base["itrf_year"]: info += f"\n ITRF Year: {base['itrf_year']}" print(info) # Print stats every 10 messages if parser.message_count % 10 == 0 and parser.message_count > 0: stats = parser.get_stats() elapsed = time.monotonic() - start_time if elapsed > 0: bytes_per_hour = int(stats["total_bytes"] / elapsed * 3600) print(f"\n{'─' * 80}") print(f"STATS: {stats['message_count']} messages | {stats['total_bytes']:,} bytes | {bytes_per_hour:,} bytes/hour") print(f"Message Types: {dict(list(stats['message_types'].items())[:10])}") if stats["base_station"]: base = stats["base_station"] print(f"Base Station: {base['latitude']:.7f}°, {base['longitude']:.7f}° @ {base['altitude_m']:.2f}m") baseline = haversine_m(LAT, LON, base['latitude'], base['longitude']) print(f"Baseline: {baseline:.2f} m ({baseline/1000:.3f} km)") print(f"{'─' * 80}\n") except KeyboardInterrupt: print(f"\n\n[{time.strftime('%H:%M:%S')}] Interrupted by user") except Exception as e: print(f"\nERROR: {e}") finally: if 'sock' in locals(): try: sock.close() except: pass # Print final stats print(f"\n{'=' * 80}") print("FINAL STATISTICS") print(f"{'=' * 80}") stats = parser.get_stats() elapsed = time.monotonic() - start_time print(f"Connected for: {int(elapsed)} seconds") print(f"Total messages: {stats['message_count']}") print(f"Total bytes: {stats['total_bytes']:,}") if elapsed > 0: print(f"Average rate: {int(stats['total_bytes'] / elapsed * 3600):,} bytes/hour") print(f"\nMessage types received:") for msg_type, count in sorted(stats['message_types'].items()): print(f" RTCM {msg_type:4d}: {count:4d} messages") if stats["base_station"]: print(f"\nBase Station Information:") base = stats["base_station"] print(f" Station ID: {base.get('station_id', 'N/A')}") print(f" Position: {base['latitude']:.7f}°, {base['longitude']:.7f}°") print(f" Altitude: {base['altitude_m']:.2f} m") baseline = haversine_m(LAT, LON, base['latitude'], base['longitude']) print(f" Baseline from rover: {baseline:.2f} m ({baseline/1000:.3f} km)") print(f"{'=' * 80}") if __name__ == "__main__": main()