#!/usr/bin/env python3 """ Simple NTRIP client to pull RTCM from a CORS/RTK caster and inject to a GNSS receiver. - Connects to caster with HTTP Basic Auth (NTRIP v2 headers). - Sends NMEA GGA immediately and every GGA_INTERVAL seconds. - Writes RTCM bytes to a serial port (or optional TCP out). """ import base64 import os import socket import sys import time import threading from datetime import datetime, timezone from typing import Optional try: import serial # pip install pyserial except ImportError: serial = None # ========= USER SETTINGS ========= # --- Caster / NTRIP source --- CASTER_HOST = "truertk.pointonenav.com" # e.g. "12.34.56.78" CASTER_PORT = 2101 # e.g. 2101 MOUNTPOINT = "AUTO" # e.g. "RTCM3" USERNAME = "9t7fwfbm57" PASSWORD = "96m7bec9g8" # --- Output to GNSS receiver --- USE_SERIAL_OUT = False SERIAL_PORT = "/dev/tty.ML-NA001-250079" # Windows: "COM5" SERIAL_BAUD = 115200 # Optional: forward RTCM to TCP instead of serial (set USE_SERIAL_OUT=False) USE_TCP_OUT = False TCP_OUT_HOST = "127.0.0.1" TCP_OUT_PORT = 2102 # --- GGA configuration --- SEND_GGA = True GGA_INTERVAL_SEC = 10 # caster-friendly: 5–15 seconds typical # If you have a rough position, put it here (WGS84): GGA_LAT_DEG = 36.1140884 # positive N, negative S GGA_LON_DEG = -97.0880663 # positive E, negative W GGA_ALT_M = 390.0 # orthometric (approx OK) # --- Misc/retry --- RECV_BUF = 4096 RECONNECT_DELAY_S = 5 SOCK_TIMEOUT_S = 30 USER_AGENT = "NTRIP pyclient/1.0" # --- Debug --- DEBUG_RTCM = True # Show RTCM message stats PARSE_NMEA = True # Parse NMEA from receiver (read from serial) USE_RECEIVER_POS = True # Use receiver's actual position for GGA to caster DEBUG_ACCURACY = True # Show receiver accuracy info # ================================= def nmea_checksum(sentence_no_dollar: str) -> str: csum = 0 for ch in sentence_no_dollar: csum ^= ord(ch) return f"{csum:02X}" def format_lat_lon(lat_deg: float, lon_deg: float): """ Convert signed decimal degrees to NMEA ddmm.mmmm, dddmm.mmmm and hemispheres. """ # Latitude lat_hemi = "N" if lat_deg >= 0 else "S" lat_abs = abs(lat_deg) lat_deg_i = int(lat_abs) lat_min = (lat_abs - lat_deg_i) * 60.0 lat_str = f"{lat_deg_i:02d}{lat_min:07.4f}" # Longitude lon_hemi = "E" if lon_deg >= 0 else "W" lon_abs = abs(lon_deg) lon_deg_i = int(lon_abs) lon_min = (lon_abs - lon_deg_i) * 60.0 lon_str = f"{lon_deg_i:03d}{lon_min:07.4f}" return lat_str, lat_hemi, lon_str, lon_hemi def parse_nmea_position(lat_nmea: str, lat_dir: str, lon_nmea: str, lon_dir: str): """Convert NMEA ddmm.mmmm format to decimal degrees.""" try: # Latitude: ddmm.mmmm lat_deg = int(float(lat_nmea) / 100) lat_min = float(lat_nmea) - (lat_deg * 100) lat_decimal = lat_deg + (lat_min / 60.0) if lat_dir == 'S': lat_decimal = -lat_decimal # Longitude: dddmm.mmmm lon_deg = int(float(lon_nmea) / 100) lon_min = float(lon_nmea) - (lon_deg * 100) lon_decimal = lon_deg + (lon_min / 60.0) if lon_dir == 'W': lon_decimal = -lon_decimal return lat_decimal, lon_decimal except (ValueError, ZeroDivisionError): return None, None def parse_gga(sentence: str): """ Parse NMEA GGA sentence. Returns dict with position, quality, sats, hdop, altitude. """ parts = sentence.split(',') if len(parts) < 15: return None try: return { 'time': parts[1], 'lat': parts[2], 'lat_dir': parts[3], 'lon': parts[4], 'lon_dir': parts[5], 'quality': int(parts[6]) if parts[6] else 0, 'num_sats': int(parts[7]) if parts[7] else 0, 'hdop': float(parts[8]) if parts[8] else 0.0, 'altitude': float(parts[9]) if parts[9] else 0.0, } except (ValueError, IndexError): return None def build_gga(lat_deg: float, lon_deg: float, alt_m: float, fix_quality=1, sats=12, hdop=1.0) -> bytes: """ Build a minimal NMEA GGA sentence (UTC time now, fix provided). Returns CRLF-terminated bytes. """ now = datetime.now(timezone.utc).strftime("%H%M%S") lat_str, lat_hemi, lon_str, lon_hemi = format_lat_lon(lat_deg, lon_deg) # GGA fields: # $GPGGA,time,lat,NS,lon,EW,quality,numSV,HDOP,alt,M,sep,M,diffAge,diffStation fields = [ "GPGGA", now, lat_str, lat_hemi, lon_str, lon_hemi, str(fix_quality), # 1 = GPS fix, 4/5 = RTK; for caster seeding 1 is fine f"{sats:02d}", f"{hdop:.1f}", f"{alt_m:.1f}", "M", # altitude + units "", "M", # geoid separation unknown "", "", # DGPS age/station ] core = ",".join(fields) csum = nmea_checksum(core) sentence = f"${core}*{csum}\r\n" return sentence.encode("ascii") def make_ntrip_request(host: str, port: int, mount: str, user: str, password: str) -> bytes: auth = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii") # NTRIP v2-style request. Mountpoint must be URL-encoded if it contains special chars; most are simple. req = ( f"GET /{mount} HTTP/1.1\r\n" f"Host: {host}:{port}\r\n" f"Ntrip-Version: Ntrip/2.0\r\n" f"User-Agent: {USER_AGENT}\r\n" f"Connection: close\r\n" f"Authorization: Basic {auth}\r\n\r\n" ) return req.encode("ascii") class RTCMForwarder: def __init__(self): self.ser = None self.tcp_out_sock = None self.total_bytes = 0 self.msg_count = 0 self.start_time = None # For tracking receiver position/status self.latest_gga = None self.nmea_buffer = "" def open(self): self.start_time = time.monotonic() if USE_SERIAL_OUT: if serial is None: raise RuntimeError("pyserial is not installed. Install with: pip install pyserial") self.ser = serial.Serial(SERIAL_PORT, SERIAL_BAUD, timeout=0) print(f"[OUT] Serial open {SERIAL_PORT} @ {SERIAL_BAUD}") elif USE_TCP_OUT: self.tcp_out_sock = socket.create_connection((TCP_OUT_HOST, TCP_OUT_PORT), timeout=5) print(f"[OUT] TCP forward connected {TCP_OUT_HOST}:{TCP_OUT_PORT}") else: print("[OUT] No output configured; data will be discarded.") def read_nmea(self): """Read and parse NMEA data from the receiver (non-blocking).""" if not self.ser or not PARSE_NMEA: return try: # Read available data (non-blocking because timeout=0) if self.ser.in_waiting > 0: data = self.ser.read(self.ser.in_waiting) self.nmea_buffer += data.decode('ascii', errors='ignore') # Process complete NMEA sentences while '\n' in self.nmea_buffer: line, self.nmea_buffer = self.nmea_buffer.split('\n', 1) line = line.strip() if line.startswith('$') and 'GGA' in line: gga = parse_gga(line) if gga and gga['quality'] > 0: self.latest_gga = gga if DEBUG_ACCURACY: self._display_accuracy(gga) except Exception: # Don't crash on NMEA parse errors pass def _display_accuracy(self, gga): """Display receiver accuracy information.""" quality_map = { 0: "Invalid", 1: "GPS (SPS)", 2: "DGPS", 3: "PPS", 4: "RTK Fixed", 5: "RTK Float", 6: "Estimated", } quality_str = quality_map.get(gga['quality'], f"Unknown({gga['quality']})") # Calculate estimated accuracy uere_map = { 4: 0.01, # RTK Fixed: 1cm 5: 0.3, # RTK Float: 30cm 2: 1.5, # DGPS: 1.5m 1: 5.0, # GPS: 5m 0: 999.0 } uere = uere_map.get(gga['quality'], 10.0) accuracy = gga['hdop'] * uere # Convert position to decimal degrees lat_dd, lon_dd = parse_nmea_position(gga['lat'], gga['lat_dir'], gga['lon'], gga['lon_dir']) indicator = "" if gga['quality'] == 4: indicator = " ← RTK FIXED ✓" elif gga['quality'] == 5: indicator = " ← RTK FLOAT" print(f"[RX] {quality_str:12s} | Sats: {gga['num_sats']:2d} | HDOP: {gga['hdop']:4.1f} | " f"Acc: {accuracy:6.3f}m ({accuracy*100:5.1f}cm){indicator}") if lat_dd and lon_dd: print(f" Position: {lat_dd:11.7f}°, {lon_dd:11.7f}° | Alt: {gga['altitude']:6.1f}m") def get_position(self): """Get the latest position from the receiver, or fallback to configured position.""" if USE_RECEIVER_POS and self.latest_gga: lat_dd, lon_dd = parse_nmea_position( self.latest_gga['lat'], self.latest_gga['lat_dir'], self.latest_gga['lon'], self.latest_gga['lon_dir'] ) if lat_dd and lon_dd: return lat_dd, lon_dd, self.latest_gga['altitude'] # Fallback to configured position return GGA_LAT_DEG, GGA_LON_DEG, GGA_ALT_M def write(self, data: bytes): if not data: return # Track statistics self.total_bytes += len(data) # Debug: parse and display RTCM messages if DEBUG_RTCM: self._debug_rtcm(data) if self.ser: self.ser.write(data) elif self.tcp_out_sock: try: self.tcp_out_sock.sendall(data) except Exception: # attempt to reconnect once try: self.tcp_out_sock.close() except Exception: pass self.tcp_out_sock = socket.create_connection((TCP_OUT_HOST, TCP_OUT_PORT), timeout=5) self.tcp_out_sock.sendall(data) # else: discard def _debug_rtcm(self, data: bytes): """Parse and display RTCM3 message info""" i = 0 while i < len(data): # RTCM3 messages start with 0xD3 if data[i] == 0xD3 and i + 2 < len(data): # Parse header: 0xD3 + 2 bytes (6 bits reserved + 10 bits length) length = ((data[i+1] & 0x03) << 8) | data[i+2] msg_total_len = 3 + length + 3 # header + payload + CRC if i + msg_total_len <= len(data) and length >= 3: # Extract message type (first 12 bits of payload) msg_type = (data[i+3] << 4) | (data[i+4] >> 4) self.msg_count += 1 # Calculate bytes per hour elapsed = time.monotonic() - self.start_time if elapsed > 0: bytes_per_hour = int(self.total_bytes / elapsed * 3600) print(f"[RTCM] Msg #{self.msg_count}: Type {msg_type:4d}, {length:4d} bytes payload, {self.total_bytes:8d} total bytes ({bytes_per_hour:,} bytes/hour)") else: print(f"[RTCM] Msg #{self.msg_count}: Type {msg_type:4d}, {length:4d} bytes payload, {self.total_bytes:8d} total bytes") i += msg_total_len continue i += 1 def close(self): try: if self.ser: self.ser.close() if self.tcp_out_sock: self.tcp_out_sock.close() except Exception: pass def ntrip_loop(): out = RTCMForwarder() out.open() while True: try: print(f"[NTRIP] Connecting to {CASTER_HOST}:{CASTER_PORT} /{MOUNTPOINT}") s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.settimeout(SOCK_TIMEOUT_S) s.connect((CASTER_HOST, CASTER_PORT)) # Send request s.sendall(make_ntrip_request(CASTER_HOST, CASTER_PORT, MOUNTPOINT, USERNAME, PASSWORD)) # Read HTTP response headers header = b"" while b"\r\n\r\n" not in header: chunk = s.recv(1) if not chunk: raise ConnectionError("Caster closed before headers were received.") header += chunk header_text = header.decode("iso-8859-1", errors="replace") if "200 OK" not in header_text: s.close() raise ConnectionError(f"NTRIP error or mount not found:\n{header_text}") print("[NTRIP] 200 OK – streaming RTCM") # Thread to send periodic GGA stop_gga = threading.Event() def gga_sender(): if not SEND_GGA: return # Many casters accept GGA after the HTTP header via the same socket # (Write NMEA sentences to the socket; they are ignored by HTTP and consumed by caster) next_send = 0 while not stop_gga.is_set(): now = time.monotonic() if now >= next_send: # Get position from receiver or use fallback lat, lon, alt = out.get_position() gga = build_gga(lat, lon, alt) try: s.sendall(gga) if USE_RECEIVER_POS and out.latest_gga: print(f"[GGA→] Sent receiver position to caster") else: print(f"[GGA→] Sent fallback position to caster") except Exception: break next_send = now + GGA_INTERVAL_SEC time.sleep(0.5) gga_thread = threading.Thread(target=gga_sender, daemon=True) gga_thread.start() # Main receive loop last_data_time = time.monotonic() while True: # Read NMEA from receiver (non-blocking) out.read_nmea() data = s.recv(RECV_BUF) if not data: raise ConnectionError("Caster closed the connection.") last_data_time = time.monotonic() out.write(data) # Simple idle watchdog if time.monotonic() - last_data_time > SOCK_TIMEOUT_S: raise TimeoutError("No data from caster.") except KeyboardInterrupt: print("\n[EXIT] Interrupted by user.") out.close() try: s.close() except Exception: pass sys.exit(0) except Exception as e: print(f"[WARN] {e}") try: s.close() except Exception: pass print(f"[NTRIP] Reconnecting in {RECONNECT_DELAY_S}s…") time.sleep(RECONNECT_DELAY_S) continue if __name__ == "__main__": # Basic sanity checks if not CASTER_HOST or not MOUNTPOINT or not USERNAME: print("Please fill in CASTER_HOST, MOUNTPOINT, USERNAME, PASSWORD at the top of this script.") sys.exit(1) if USE_SERIAL_OUT is False and USE_TCP_OUT is False: print("No output path enabled. Set USE_SERIAL_OUT=True or USE_TCP_OUT=True.") # continue anyway (discards data) ntrip_loop()