Files
maglink-console/ntrip/bluetooth_nmea_parser.py
brentperteet 5703c05c1d Initial commit
2026-06-24 11:12:44 -05:00

271 lines
8.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Bluetooth Serial NMEA Parser for RTK Receiver
Connects to RTK receiver via Bluetooth and parses accuracy information from NMEA sentences.
"""
import serial
import re
from datetime import datetime
# ========= USER SETTINGS =========
# Serial device name of the receiver; main() passes it to serial.Serial().
# The default below is a macOS-style device path; adjust it for your system.
BLUETOOTH_PORT = "/dev/tty.H11-230621-SerialPort" # macOS Bluetooth serial port
# On Linux, might be: /dev/rfcomm0
# On Windows, might be: COM5
# Baud rate main() passes to serial.Serial() together with BLUETOOTH_PORT.
BAUD_RATE = 115200
# =================================
def parse_gga(sentence):
    """
    Extract fix quality, satellite count, HDOP and raw position from a GGA sentence.

    Expected layout:
    $GPGGA,time,lat,NS,lon,EW,quality,numSV,HDOP,alt,M,sep,M,diffAge,diffSta*CS

    Quality codes: 0 Invalid, 1 GPS fix (SPS), 2 DGPS fix, 3 PPS fix,
    4 RTK Fixed, 5 RTK Float, 6 Estimated (dead reckoning),
    7 Manual input mode, 8 Simulation mode.

    Returns a dict with the keys 'sentence', 'time', 'quality', 'quality_str',
    'num_sats', 'hdop', 'lat', 'lat_dir', 'lon', 'lon_dir', or None when the
    sentence has fewer than 9 comma-separated fields or a numeric field does
    not parse. Latitude/longitude are returned as the raw NMEA strings.
    """
    fields = sentence.split(',')
    if len(fields) < 9:
        return None

    # Only the first nine fields are used; the sentence id itself is ignored.
    (_, utc, latitude, ns, longitude, ew,
     quality_raw, sats_raw, hdop_raw) = fields[:9]

    try:
        fix_quality = int(quality_raw)
        satellites = int(sats_raw) if sats_raw else 0
        horizontal_dop = float(hdop_raw) if hdop_raw else 0.0
    except (ValueError, IndexError):
        return None

    # Index in this tuple == NMEA quality code.
    labels = (
        "Invalid",
        "GPS (SPS)",
        "DGPS",
        "PPS",
        "RTK Fixed",
        "RTK Float",
        "Estimated",
        "Manual",
        "Simulation",
    )
    if 0 <= fix_quality < len(labels):
        label = labels[fix_quality]
    else:
        label = f"Unknown({fix_quality})"

    result = {}
    result['sentence'] = 'GGA'
    result['time'] = utc
    result['quality'] = fix_quality
    result['quality_str'] = label
    result['num_sats'] = satellites
    result['hdop'] = horizontal_dop
    result['lat'] = latitude
    result['lat_dir'] = ns
    result['lon'] = longitude
    result['lon_dir'] = ew
    return result
def parse_gsa(sentence):
    """
    Parse NMEA GSA sentence for DOP values (PDOP, HDOP, VDOP).

    Supported layouts:
    $GPGSA,mode,fix_type,sat1,...,sat12,PDOP,HDOP,VDOP*CS
    $GNGSA,mode,fix_type,sat1,...,sat12,PDOP,HDOP,VDOP,systemId*CS  (NMEA 4.10+)

    The DOP values are read from their fixed positions (fields 15-17) rather
    than from the end of the sentence, so the optional trailing System ID
    field emitted by newer multi-constellation receivers does not shift them.

    Args:
        sentence: One raw NMEA sentence, with or without the "*CS" checksum.

    Returns:
        A dict with the keys 'sentence', 'fix_type', 'fix_str', 'pdop',
        'hdop' and 'vdop', or None when the sentence has fewer than 18
        fields or a numeric field does not parse. Empty numeric fields are
        reported as 0 / 0.0.
    """
    # Drop the checksum up front so it can never be glued to the last field.
    payload = sentence.split('*', 1)[0]
    parts = payload.split(',')
    if len(parts) < 18:
        return None

    try:
        fix_type = int(parts[2]) if parts[2] else 0  # 1=no fix, 2=2D, 3=3D
        # Fields 3-14 are the twelve satellite slots; the DOPs follow them.
        pdop, hdop, vdop = (
            float(field) if field else 0.0 for field in parts[15:18]
        )
    except ValueError:
        return None

    fix_map = {
        1: "No Fix",
        2: "2D Fix",
        3: "3D Fix",
    }
    return {
        'sentence': 'GSA',
        'fix_type': fix_type,
        'fix_str': fix_map.get(fix_type, f"Unknown({fix_type})"),
        'pdop': pdop,
        'hdop': hdop,
        'vdop': vdop,
    }
def calculate_accuracy_estimate(hdop, quality):
    """
    Return a rough horizontal accuracy figure (meters) for a fix.

    The estimate is HDOP × UERE (User Equivalent Range Error), where the
    UERE is picked from the GGA fix-quality code:
    - 4 (RTK Fixed): 0.01 m
    - 5 (RTK Float): 0.3 m
    - 2 (DGPS):      1.5 m
    - 1 (GPS SPS):   5.0 m
    - 0 (Invalid):   999.0 m
    Any other quality code uses a 10.0 m UERE.
    """
    fallback_uere = 10.0
    range_error_by_quality = {
        0: 999.0,  # Invalid
        1: 5.0,    # GPS: 5m UERE
        2: 1.5,    # DGPS: 1.5m UERE
        4: 0.01,   # RTK Fixed: 1cm UERE
        5: 0.3,    # RTK Float: 30cm UERE
    }
    return hdop * range_error_by_quality.get(quality, fallback_uere)
def format_position(lat, lat_dir, lon, lon_dir):
    """
    Convert NMEA ddmm.mmmm / dddmm.mmmm coordinates to signed decimal degrees.

    Returns a (latitude, longitude) tuple; 'S' latitudes and 'W' longitudes
    are negated. Returns (None, None) when either value cannot be parsed as
    a number (for example an empty field).
    """
    def to_decimal(raw, hemisphere, negative_hemisphere):
        # The digits left of the last two integer digits are whole degrees;
        # the remainder is minutes.
        whole_degrees = int(float(raw) / 100)
        minutes = float(raw) - (whole_degrees * 100)
        degrees = whole_degrees + (minutes / 60.0)
        if hemisphere == negative_hemisphere:
            return -degrees
        return degrees

    try:
        latitude = to_decimal(lat, lat_dir, 'S')
        longitude = to_decimal(lon, lon_dir, 'W')
    except (ValueError, ZeroDivisionError):
        return None, None
    return latitude, longitude
def _dop_quality_label(hdop):
    """Map an HDOP value to the descriptive label shown in the DOP report."""
    if hdop < 1.0:
        return "Excellent"
    if hdop < 2.0:
        return "Good"
    if hdop < 5.0:
        return "Moderate"
    if hdop < 10.0:
        return "Fair"
    return "Poor"


def _print_gga_report(gga):
    """Print the position-quality report for one parsed GGA sentence."""
    accuracy = calculate_accuracy_estimate(gga['hdop'], gga['quality'])
    lat_dd, lon_dd = format_position(gga['lat'], gga['lat_dir'],
                                     gga['lon'], gga['lon_dir'])

    print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Position Quality:")
    print(f" Fix Type: {gga['quality_str']}")
    print(f" Satellites: {gga['num_sats']}")
    print(f" HDOP: {gga['hdop']:.2f}")
    print(f" Est. Accuracy: {accuracy:.3f} m", end="")
    if gga['quality'] == 4:
        print(f" ({accuracy*100:.1f} cm) ← RTK FIXED ✓")
    elif gga['quality'] == 5:
        print(f" ({accuracy*100:.1f} cm) ← RTK FLOAT")
    else:
        print()
    # Compare against None explicitly: 0.0 is a valid coordinate (equator /
    # prime meridian) and must not suppress the position line.
    if lat_dd is not None and lon_dd is not None:
        print(f" Position: {lat_dd:.8f}°, {lon_dd:.8f}°")


def _print_gsa_report(gsa):
    """Print the dilution-of-precision report for one parsed GSA sentence."""
    print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Dilution of Precision:")
    print(f" Fix Type: {gsa['fix_str']}")
    print(f" PDOP: {gsa['pdop']:.2f} (Position)")
    print(f" HDOP: {gsa['hdop']:.2f} (Horizontal)")
    print(f" VDOP: {gsa['vdop']:.2f} (Vertical)")
    print(f" DOP Quality: {_dop_quality_label(gsa['hdop'])}")


def main():
    """
    Open the configured serial port and print accuracy reports until stopped.

    Reads NMEA sentences line by line from BLUETOOTH_PORT, prints a position
    quality report for every GGA sentence that parses, and a DOP report for
    every GSA sentence that parses and reports at least a 2D fix.

    Returns:
        0 after Ctrl+C, 1 when the port cannot be opened or the connection
        is lost while reading.
    """
    print(f"Connecting to RTK receiver on {BLUETOOTH_PORT} @ {BAUD_RATE} baud...")
    ser = None
    try:
        try:
            ser = serial.Serial(BLUETOOTH_PORT, BAUD_RATE, timeout=1)
        except serial.SerialException as e:
            print(f"✗ Failed to connect to {BLUETOOTH_PORT}: {e}")
            print("\nTroubleshooting:")
            print(" 1. Check that the Bluetooth device is paired")
            print(" 2. Find the correct port:")
            print(" macOS: ls /dev/tty.* | grep -i bluetooth")
            print(" Linux: ls /dev/rfcomm*")
            print(" Windows: Check Device Manager → Ports (COM & LPT)")
            print(" 3. Update BLUETOOTH_PORT in this script")
            return 1

        print(f"✓ Connected to {BLUETOOTH_PORT}")
        print("=" * 80)
        print("Parsing NMEA stream for accuracy information...")
        print("=" * 80)

        while True:
            # A read failure means the link is gone; retrying would only
            # spin on the same error, so stop instead of treating it as a
            # per-line parse problem.
            try:
                raw = ser.readline()
            except (serial.SerialException, OSError) as e:
                print(f"\n✗ Lost connection to {BLUETOOTH_PORT}: {e}")
                return 1

            # errors='ignore' drops non-ASCII bytes, so decoding cannot raise.
            line = raw.decode('ascii', errors='ignore').strip()
            if not line.startswith('$'):
                continue

            # Dispatch on the address field ("$GPGGA", "$GNGSA", ...) so that
            # "GGA"/"GSA" appearing inside another sentence's data is ignored.
            sentence_id = line.split(',', 1)[0]
            try:
                if sentence_id.endswith('GGA'):
                    gga = parse_gga(line)
                    if gga:
                        _print_gga_report(gga)
                elif sentence_id.endswith('GSA'):
                    gsa = parse_gsa(line)
                    # Only show DOP values once we have a fix.
                    if gsa and gsa['fix_type'] >= 2:
                        _print_gsa_report(gsa)
            except Exception as e:
                # Best effort: one malformed sentence must not stop the stream.
                print(f"Error parsing line: {e}")
    except KeyboardInterrupt:
        print("\n\nDisconnected.")
        return 0
    finally:
        if ser is not None and ser.is_open:
            ser.close()
if __name__ == "__main__":
    # The builtin `exit` is installed by the `site` module for interactive
    # use and can be absent (e.g. `python -S`); SystemExit always exists and
    # still hands main()'s return value to the shell as the exit status.
    raise SystemExit(main())