185 lines
5.9 KiB
Python
185 lines
5.9 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
BLE NTRIP Client for RTK Receiver

Connects to an RTK receiver via Bluetooth Low Energy (BLE) and receives the NMEA
data it sends (printed when the debug options are enabled). Forwarding RTCM
corrections from an NTRIP caster to the receiver is not implemented in this
script yet.

This script uses the bleak library for cross-platform BLE communication.
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
from bleak import BleakClient, BleakScanner
|
|
|
|
# ============================================================================
|
|
# Configuration
|
|
# ============================================================================
|
|
|
|
# Target receiver. A scanned device is accepted when either its advertised
# name or its address matches one of these two values.
DEVICE_NAME = "ML-NA001-250079-BLE"
DEVICE_UUID = "B2DDE9B2-881D-1BE2-5A1B-C44CB646BB1D"

# GATT identifiers on the receiver.
# NOTE(review): SERVICE_UUID is not referenced by the visible code; only the
# characteristic is used when subscribing to notifications.
SERVICE_UUID = "0000fff0-0000-1000-8000-00805f9b34fb"  # Custom service
CHARACTERISTIC_UUID = "0000fff2-0000-1000-8000-00805f9b34fb"  # Read/Notify characteristic for NMEA

# Console diagnostics.
DEBUG_RAW_DATA = True  # Print every notification payload as hex
DEBUG_NMEA = True  # Print each decoded NMEA sentence
|
|
|
|
|
|
# ============================================================================
|
|
# BLE Connection and Data Handler
|
|
# ============================================================================
|
|
|
|
class BLENTRIPClient:
    """BLE client that receives NMEA sentences from an RTK receiver.

    The client scans for the configured device, connects with bleak,
    subscribes to notifications on ``CHARACTERISTIC_UUID`` and reassembles the
    notification payloads into newline-terminated NMEA sentences.

    Attributes are documented next to their assignments in ``__init__``.
    """

    # Upper bound on bytes kept while waiting for a line terminator. If the
    # stream never contains b'\n' (wrong characteristic, binary payload, ...)
    # the buffer would otherwise grow for as long as the connection lives.
    MAX_BUFFER_BYTES = 4096

    def __init__(self):
        """Create an idle client; no BLE activity happens here."""
        # BleakClient for the current session, or None when not connected.
        self.client = None
        # Monitor-loop flag: set by connect_and_monitor(), cleared by stop().
        self.running = False
        # Bytes received so far that do not yet form a complete sentence.
        self.nmea_buffer = bytearray()

    async def find_device(self):
        """Scan for the RTK receiver BLE device.

        Runs a 10 second discovery and prints every device seen. A device
        matches when its address equals ``DEVICE_UUID`` (case-insensitive) or
        its name equals ``DEVICE_NAME``.

        Returns:
            The address of the first matching device, or ``None`` when no
            scanned device matches.
        """
        print(f"Scanning for BLE device: {DEVICE_NAME} ({DEVICE_UUID})...")

        devices = await BleakScanner.discover(timeout=10.0)
        wanted_address = DEVICE_UUID.upper()

        for device in devices:
            print(f"Found: {device.name} ({device.address})")

            # Match by UUID or name
            if device.address.upper() == wanted_address or device.name == DEVICE_NAME:
                print(f"✓ Found target device: {device.name} at {device.address}")
                return device.address

        print(f"✗ Device not found. Scanned {len(devices)} devices.")
        return None

    def notification_handler(self, sender, data):
        """Handle incoming BLE notifications with NMEA data.

        Appends ``data`` to ``nmea_buffer`` and consumes every complete,
        newline-terminated sentence from the front of the buffer. Sentences
        that are not valid ASCII are reported and skipped.

        Args:
            sender: Notification source supplied by bleak; unused.
            data: Payload bytes of the notification.
        """
        if DEBUG_RAW_DATA:
            print(f"Raw data ({len(data)} bytes): {data.hex()}")

        # Add data to buffer
        self.nmea_buffer.extend(data)

        # Process complete NMEA sentences (terminated with \r\n; the '\r' is
        # removed by strip() below, so a bare '\n' is accepted as well).
        while True:
            newline_idx = self.nmea_buffer.find(b'\n')
            if newline_idx == -1:
                break

            sentence_bytes = bytes(self.nmea_buffer[:newline_idx + 1])
            # Drop the consumed bytes in place instead of rebuilding the buffer.
            del self.nmea_buffer[:newline_idx + 1]

            try:
                # Decode NMEA sentence
                sentence = sentence_bytes.decode('ascii').strip()
            except UnicodeDecodeError as e:
                print(f"Failed to decode NMEA data: {e}")
                continue

            if sentence and DEBUG_NMEA:
                print(f"NMEA: {sentence}")

            # TODO: Parse NMEA sentences (GGA, etc.) for position data

        # Whatever is left has no terminator yet. Discard it once it exceeds
        # the cap so a terminator-free stream cannot exhaust memory.
        if len(self.nmea_buffer) > self.MAX_BUFFER_BYTES:
            print(
                f"Discarding {len(self.nmea_buffer)} buffered bytes "
                "without a line terminator"
            )
            self.nmea_buffer.clear()

    async def connect_and_monitor(self):
        """Connect to the BLE device and monitor NMEA data.

        Scans for the device, connects, lists the available services and
        characteristics, subscribes to notifications and then polls once per
        second until ``stop()`` is called or the connection drops.

        Returns:
            ``True`` when monitoring ended because ``stop()`` was called.
            ``False`` when the device was not found, the connection could not
            be established or was lost, or any other error occurred.
        """
        # Find the device
        device_address = await self.find_device()
        if not device_address:
            print("Could not find RTK receiver. Make sure device is powered on and in range.")
            return False

        # Connect to device
        print(f"\nConnecting to {device_address}...")

        connection_lost = False
        try:
            async with BleakClient(device_address) as client:
                self.client = client

                if not client.is_connected:
                    print("✗ Failed to connect")
                    return False

                print(f"✓ Connected to {DEVICE_NAME}")

                # List available services and characteristics
                print("\nAvailable services:")
                for service in client.services:
                    print(f" Service: {service.uuid}")
                    for char in service.characteristics:
                        props = ','.join(char.properties)
                        print(f" Characteristic: {char.uuid} ({props})")

                # Start notifications on NMEA characteristic
                print(f"\nSubscribing to NMEA notifications on {CHARACTERISTIC_UUID}...")
                await client.start_notify(CHARACTERISTIC_UUID, self.notification_handler)
                print("✓ Subscribed to notifications\n")

                # Keep connection alive and monitor data
                self.running = True
                print("Monitoring NMEA data (Ctrl+C to stop)...\n")

                while self.running:
                    await asyncio.sleep(1)

                    # Check connection status
                    if not client.is_connected:
                        print("✗ Connection lost")
                        connection_lost = True
                        break

                # Stop notifications before disconnecting. Skipped when the
                # link is already gone: unsubscribing on a dead connection
                # would only raise and print a second, misleading error.
                if client.is_connected:
                    await client.stop_notify(CHARACTERISTIC_UUID)

        except Exception as e:
            print(f"✗ Error: {e}")
            return False
        finally:
            # The BleakClient is closed once the context manager exits, so do
            # not leave a stale handle or a stale "running" flag behind.
            self.running = False
            self.client = None

        return not connection_lost

    def stop(self):
        """Stop the client.

        Clears the flag polled by ``connect_and_monitor()``; the monitor loop
        exits after its current one-second sleep.
        """
        self.running = False
|
|
|
|
|
|
# ============================================================================
|
|
# Main Entry Point
|
|
# ============================================================================
|
|
|
|
async def main():
    """Main entry point.

    Creates a ``BLENTRIPClient`` and runs ``connect_and_monitor()`` until it
    finishes. Exits the process with status 1 when monitoring reports failure
    (returns a falsy value) or raises an unexpected exception.
    """
    client = BLENTRIPClient()

    try:
        succeeded = await client.connect_and_monitor()
    except KeyboardInterrupt:
        print("\n\nStopping...")
        return
    except Exception as e:
        print(f"\n✗ Fatal error: {e}")
        sys.exit(1)
    finally:
        # Under asyncio.run() a Ctrl+C normally reaches this coroutine as task
        # cancellation rather than KeyboardInterrupt, so the stop request is
        # issued here to cover every exit path.
        client.stop()

    # connect_and_monitor() has already printed the reason for a failure; make
    # sure the exit status reflects it as well.
    if not succeeded:
        sys.exit(1)
|
|
|
|
|
|
if __name__ == "__main__":
    # Startup banner.
    print("=" * 70)
    print("BLE NTRIP Client for RTK Receiver")
    print("=" * 70)
    print()

    # Check if bleak is installed
    # NOTE(review): the module-level `from bleak import ...` fails before this
    # check can run, so the friendly message below is effectively unreachable
    # unless that import is deferred. Kept unchanged here.
    try:
        import bleak  # noqa: F401  (imported only to test availability)
    except ImportError:
        print("✗ Error: 'bleak' library not found")
        print("\nInstall it with:")
        print(" pip install bleak")
        print()
        sys.exit(1)

    # asyncio.run() re-raises KeyboardInterrupt here after cancelling main(),
    # so this is where Ctrl+C has to be handled to avoid a traceback.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\n\nStopping...")
|