271 lines
8.4 KiB
Python
271 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Bluetooth Serial NMEA Parser for RTK Receiver
|
||
Connects to RTK receiver via Bluetooth and parses accuracy information from NMEA sentences.
|
||
"""
|
||
|
||
import serial
|
||
import re
|
||
from datetime import datetime
|
||
|
||
|
||
# ========= USER SETTINGS =========
|
||
BLUETOOTH_PORT = "/dev/tty.H11-230621-SerialPort" # macOS Bluetooth serial port
|
||
# On Linux, might be: /dev/rfcomm0
|
||
# On Windows, might be: COM5
|
||
BAUD_RATE = 115200
|
||
# =================================
|
||
|
||
|
||
def parse_gga(sentence):
|
||
"""
|
||
Parse NMEA GGA sentence for position quality and HDOP.
|
||
$GPGGA,time,lat,NS,lon,EW,quality,numSV,HDOP,alt,M,sep,M,diffAge,diffSta*CS
|
||
|
||
Quality values:
|
||
0 = Invalid
|
||
1 = GPS fix (SPS)
|
||
2 = DGPS fix
|
||
3 = PPS fix
|
||
4 = RTK Fixed
|
||
5 = RTK Float
|
||
6 = Estimated (dead reckoning)
|
||
7 = Manual input mode
|
||
8 = Simulation mode
|
||
"""
|
||
parts = sentence.split(',')
|
||
if len(parts) < 9:
|
||
return None
|
||
|
||
try:
|
||
time_utc = parts[1]
|
||
lat = parts[2]
|
||
lat_dir = parts[3]
|
||
lon = parts[4]
|
||
lon_dir = parts[5]
|
||
quality = int(parts[6])
|
||
num_sats = int(parts[7]) if parts[7] else 0
|
||
hdop = float(parts[8]) if parts[8] else 0.0
|
||
|
||
quality_map = {
|
||
0: "Invalid",
|
||
1: "GPS (SPS)",
|
||
2: "DGPS",
|
||
3: "PPS",
|
||
4: "RTK Fixed",
|
||
5: "RTK Float",
|
||
6: "Estimated",
|
||
7: "Manual",
|
||
8: "Simulation"
|
||
}
|
||
|
||
return {
|
||
'sentence': 'GGA',
|
||
'time': time_utc,
|
||
'quality': quality,
|
||
'quality_str': quality_map.get(quality, f"Unknown({quality})"),
|
||
'num_sats': num_sats,
|
||
'hdop': hdop,
|
||
'lat': lat,
|
||
'lat_dir': lat_dir,
|
||
'lon': lon,
|
||
'lon_dir': lon_dir
|
||
}
|
||
except (ValueError, IndexError):
|
||
return None
|
||
|
||
|
||
def parse_gsa(sentence):
|
||
"""
|
||
Parse NMEA GSA sentence for DOP values (PDOP, HDOP, VDOP).
|
||
$GPGSA,mode,fix_type,sat1,...,sat12,PDOP,HDOP,VDOP*CS
|
||
"""
|
||
parts = sentence.split(',')
|
||
if len(parts) < 18:
|
||
return None
|
||
|
||
try:
|
||
mode = parts[1] # M=Manual, A=Automatic
|
||
fix_type = int(parts[2]) if parts[2] else 0 # 1=no fix, 2=2D, 3=3D
|
||
|
||
# DOPs are at the end
|
||
pdop = float(parts[-3]) if parts[-3] else 0.0
|
||
hdop = float(parts[-2]) if parts[-2] else 0.0
|
||
vdop_cs = parts[-1].split('*')[0] # Remove checksum
|
||
vdop = float(vdop_cs) if vdop_cs else 0.0
|
||
|
||
fix_map = {
|
||
1: "No Fix",
|
||
2: "2D Fix",
|
||
3: "3D Fix"
|
||
}
|
||
|
||
return {
|
||
'sentence': 'GSA',
|
||
'fix_type': fix_type,
|
||
'fix_str': fix_map.get(fix_type, f"Unknown({fix_type})"),
|
||
'pdop': pdop,
|
||
'hdop': hdop,
|
||
'vdop': vdop
|
||
}
|
||
except (ValueError, IndexError):
|
||
return None
|
||
|
||
|
||
def calculate_accuracy_estimate(hdop, quality):
|
||
"""
|
||
Estimate horizontal accuracy in meters based on HDOP and fix quality.
|
||
|
||
Rough approximation:
|
||
- RTK Fixed: ~0.01-0.02m (1-2cm)
|
||
- RTK Float: ~0.1-1m (10cm-1m)
|
||
- DGPS: ~1-5m
|
||
- GPS (SPS): ~5-15m
|
||
|
||
Accuracy ≈ HDOP × UERE (User Equivalent Range Error)
|
||
Where UERE varies by fix type
|
||
"""
|
||
uere_map = {
|
||
4: 0.01, # RTK Fixed: 1cm UERE
|
||
5: 0.3, # RTK Float: 30cm UERE
|
||
2: 1.5, # DGPS: 1.5m UERE
|
||
1: 5.0, # GPS: 5m UERE
|
||
0: 999.0 # Invalid
|
||
}
|
||
|
||
uere = uere_map.get(quality, 10.0)
|
||
accuracy = hdop * uere
|
||
|
||
return accuracy
|
||
|
||
|
||
def format_position(lat, lat_dir, lon, lon_dir):
|
||
"""Convert NMEA ddmm.mmmm format to decimal degrees."""
|
||
try:
|
||
# Latitude: ddmm.mmmm
|
||
lat_deg = int(float(lat) / 100)
|
||
lat_min = float(lat) - (lat_deg * 100)
|
||
lat_decimal = lat_deg + (lat_min / 60.0)
|
||
if lat_dir == 'S':
|
||
lat_decimal = -lat_decimal
|
||
|
||
# Longitude: dddmm.mmmm
|
||
lon_deg = int(float(lon) / 100)
|
||
lon_min = float(lon) - (lon_deg * 100)
|
||
lon_decimal = lon_deg + (lon_min / 60.0)
|
||
if lon_dir == 'W':
|
||
lon_decimal = -lon_decimal
|
||
|
||
return lat_decimal, lon_decimal
|
||
except (ValueError, ZeroDivisionError):
|
||
return None, None
|
||
|
||
|
||
def main():
|
||
print(f"Connecting to RTK receiver on {BLUETOOTH_PORT} @ {BAUD_RATE} baud...")
|
||
|
||
try:
|
||
ser = serial.Serial(BLUETOOTH_PORT, BAUD_RATE, timeout=1)
|
||
print(f"✓ Connected to {BLUETOOTH_PORT}")
|
||
print("=" * 80)
|
||
print("Parsing NMEA stream for accuracy information...")
|
||
print("=" * 80)
|
||
|
||
# Track latest values
|
||
latest_gga = None
|
||
latest_gsa = None
|
||
|
||
while True:
|
||
try:
|
||
line = ser.readline().decode('ascii', errors='ignore').strip()
|
||
|
||
if not line.startswith('$'):
|
||
continue
|
||
|
||
# Parse GGA sentences (position quality, HDOP)
|
||
if 'GGA' in line:
|
||
gga = parse_gga(line)
|
||
if gga:
|
||
latest_gga = gga
|
||
|
||
# Calculate estimated accuracy
|
||
accuracy = calculate_accuracy_estimate(gga['hdop'], gga['quality'])
|
||
|
||
# Convert position to decimal degrees
|
||
lat_dd, lon_dd = format_position(gga['lat'], gga['lat_dir'],
|
||
gga['lon'], gga['lon_dir'])
|
||
|
||
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Position Quality:")
|
||
print(f" Fix Type: {gga['quality_str']}")
|
||
print(f" Satellites: {gga['num_sats']}")
|
||
print(f" HDOP: {gga['hdop']:.2f}")
|
||
print(f" Est. Accuracy: {accuracy:.3f} m", end="")
|
||
|
||
if gga['quality'] == 4:
|
||
print(f" ({accuracy*100:.1f} cm) ← RTK FIXED ✓")
|
||
elif gga['quality'] == 5:
|
||
print(f" ({accuracy*100:.1f} cm) ← RTK FLOAT")
|
||
else:
|
||
print()
|
||
|
||
if lat_dd and lon_dd:
|
||
print(f" Position: {lat_dd:.8f}°, {lon_dd:.8f}°")
|
||
|
||
# Parse GSA sentences (PDOP, HDOP, VDOP)
|
||
elif 'GSA' in line:
|
||
gsa = parse_gsa(line)
|
||
if gsa:
|
||
latest_gsa = gsa
|
||
|
||
if gsa['fix_type'] >= 2: # Only show if we have a fix
|
||
print(f"\n[{datetime.now().strftime('%H:%M:%S')}] Dilution of Precision:")
|
||
print(f" Fix Type: {gsa['fix_str']}")
|
||
print(f" PDOP: {gsa['pdop']:.2f} (Position)")
|
||
print(f" HDOP: {gsa['hdop']:.2f} (Horizontal)")
|
||
print(f" VDOP: {gsa['vdop']:.2f} (Vertical)")
|
||
|
||
# DOP quality interpretation
|
||
if gsa['hdop'] < 1.0:
|
||
dop_quality = "Excellent"
|
||
elif gsa['hdop'] < 2.0:
|
||
dop_quality = "Good"
|
||
elif gsa['hdop'] < 5.0:
|
||
dop_quality = "Moderate"
|
||
elif gsa['hdop'] < 10.0:
|
||
dop_quality = "Fair"
|
||
else:
|
||
dop_quality = "Poor"
|
||
|
||
print(f" DOP Quality: {dop_quality}")
|
||
|
||
except UnicodeDecodeError:
|
||
continue
|
||
except KeyboardInterrupt:
|
||
raise
|
||
except Exception as e:
|
||
print(f"Error parsing line: {e}")
|
||
continue
|
||
|
||
except serial.SerialException as e:
|
||
print(f"✗ Failed to connect to {BLUETOOTH_PORT}: {e}")
|
||
print("\nTroubleshooting:")
|
||
print(" 1. Check that the Bluetooth device is paired")
|
||
print(" 2. Find the correct port:")
|
||
print(" macOS: ls /dev/tty.* | grep -i bluetooth")
|
||
print(" Linux: ls /dev/rfcomm*")
|
||
print(" Windows: Check Device Manager → Ports (COM & LPT)")
|
||
print(" 3. Update BLUETOOTH_PORT in this script")
|
||
return 1
|
||
|
||
except KeyboardInterrupt:
|
||
print("\n\nDisconnected.")
|
||
return 0
|
||
|
||
finally:
|
||
if 'ser' in locals() and ser.is_open:
|
||
ser.close()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
exit(main())
|