kopia lustrzana https://github.com/inmcm/micropyGPS
Refer to 'micropyGPS.py' for the proposed improvements
rodzic
f6c2b76a3b
commit
9f5f9fb868
604
micropyGPS.py
604
micropyGPS.py
|
@ -11,6 +11,24 @@
|
|||
# Dynamically limit sentences types to parse
|
||||
|
||||
from math import floor, modf
|
||||
from typing import Final, Callable
|
||||
|
||||
#
|
||||
# Proposal of improvements 2023-02
|
||||
# - formatted by black
|
||||
# - add new format in date_string ('2023-01-01'): 'l_ymd'
|
||||
# - add new format in speed_string ('3.2f')
|
||||
# - add new 'timestamp_string' helper function
|
||||
# - adjust code for being compliant with typing (pylance & mypy)
|
||||
# - add parser for the antenna status: PGTOP: $PGTOP,11,x
|
||||
# - implement suggestions from https://github.com/inmcm/micropyGPS/issues/14
|
||||
# - use of fstring instead of % formating
|
||||
# - use of lists instead of tuples
|
||||
# - adjust code in 'speed_string' when knot speed is within +-1 range (English grammar to be confirmed)
|
||||
# - add pytest tests for checking the above
|
||||
# - adjust minor clerical errors
|
||||
# Works with python 3.11.2
|
||||
# Works with MicroPython 1.19.1 after stripping annotations (refer to https://github.com/orgs/micropython/discussions/10529#discussioncomment-4764412)
|
||||
|
||||
# Import utime or time for fix time handling
|
||||
try:
|
||||
|
@ -21,27 +39,118 @@ except ImportError:
|
|||
# Should still support millisecond resolution.
|
||||
import time
|
||||
|
||||
L_YMD: Final[str] = "l_ymd"
|
||||
S_MDY: Final[str] = "s_mdy"
|
||||
S_DMY: Final[str] = "s_dmy"
|
||||
LONG: Final[str] = "long"
|
||||
_DATE_FORMAT: tuple = (
|
||||
L_YMD,
|
||||
S_MDY,
|
||||
S_DMY,
|
||||
LONG,
|
||||
) # Not in use; could be checked against, as guard
|
||||
|
||||
S_HMS: Final[str] = "s_hms"
|
||||
_TIME_FORMAT: tuple = (S_HMS,) # Not in use; could be checked against, as guard
|
||||
|
||||
_NORTH: Final[str] = "N"
|
||||
_SOUTH: Final[str] = "S"
|
||||
_EAST: Final[str] = "E"
|
||||
_WEST: Final[str] = "W"
|
||||
|
||||
MPH: Final[str] = "mph"
|
||||
KPH: Final[str] = "km/h"
|
||||
KNOT: Final[str] = "knot"
|
||||
_SPEED_UNIT: tuple = (MPH, KPH, KNOT)
|
||||
|
||||
FULL_FLOAT: Final[str] = "full_float"
|
||||
F2_2: Final[str] = "f2_2"
|
||||
_SPEED_FORMAT: tuple = (
|
||||
FULL_FLOAT,
|
||||
F2_2,
|
||||
) # Not in use; could be checked against, as guard
|
||||
|
||||
DD: Final[str] = "dd"
|
||||
DMS: Final[str] = "dms"
|
||||
DDM: Final[str] = "ddm"
|
||||
_LONG_LAT_FORMAT: tuple = (
|
||||
DD,
|
||||
DMS,
|
||||
DDM,
|
||||
) # Not in use; could be checked against, as guard
|
||||
|
||||
# Month ending formats
|
||||
_FIRST: tuple = (1, 21, 31)
|
||||
_SECOND: tuple = (2, 22)
|
||||
_THIRD: tuple = (3, 23)
|
||||
|
||||
# Specific types
|
||||
t_code_message = dict[str, str]
|
||||
t_antenna = tuple[dict[str, str], ...]
|
||||
t_satellite = tuple[int | None, int | None, int | None]
|
||||
t_satellite_dict = dict[int, t_satellite]
|
||||
|
||||
|
||||
ANTENNA_PA1616S: Final[t_antenna] = (
|
||||
{"code": "1", "message": "Internal"},
|
||||
{"code": "2", "message": "Active"},
|
||||
{"code": "3", "message": "Error. Antenna shorted"},
|
||||
)
|
||||
ANTENNA_PA6H: Final[t_antenna] = (
|
||||
{"code": "2", "message": "Internal"},
|
||||
{"code": "3", "message": "Active"},
|
||||
{"code": "1", "message": "Error. Antenna shorted"},
|
||||
)
|
||||
|
||||
|
||||
class MicropyGPS(object):
|
||||
"""GPS NMEA Sentence Parser. Creates object that stores all relevant GPS data and statistics.
|
||||
Parses sentences one character at a time using update(). """
|
||||
Parses sentences one character at a time using update()."""
|
||||
|
||||
# Max Number of Characters a valid sentence can be (based on GGA sentence)
|
||||
SENTENCE_LIMIT = 90
|
||||
__HEMISPHERES = ('N', 'S', 'E', 'W')
|
||||
__NO_FIX = 1
|
||||
__FIX_2D = 2
|
||||
__FIX_3D = 3
|
||||
__DIRECTIONS = ('N', 'NNE', 'NE', 'ENE', 'E', 'ESE', 'SE', 'SSE', 'S', 'SSW', 'SW', 'WSW', 'W',
|
||||
'WNW', 'NW', 'NNW')
|
||||
__MONTHS = ('January', 'February', 'March', 'April', 'May',
|
||||
'June', 'July', 'August', 'September', 'October',
|
||||
'November', 'December')
|
||||
SENTENCE_LIMIT: Final[int] = 90
|
||||
|
||||
def __init__(self, local_offset=0, location_formatting='ddm'):
|
||||
__HEMISPHERES: Final[tuple[str, str, str, str]] = (_NORTH, _SOUTH, _EAST, _WEST)
|
||||
__NO_FIX = 1
|
||||
# __FIX_2D = 2 # Not used
|
||||
# __FIX_3D = 3 # Not used
|
||||
__DIRECTIONS: Final[tuple[str, ...]] = (
|
||||
"N",
|
||||
"NNE",
|
||||
"NE",
|
||||
"ENE",
|
||||
"E",
|
||||
"ESE",
|
||||
"SE",
|
||||
"SSE",
|
||||
"S",
|
||||
"SSW",
|
||||
"SW",
|
||||
"WSW",
|
||||
"W",
|
||||
"WNW",
|
||||
"NW",
|
||||
"NNW",
|
||||
)
|
||||
__MONTHS: Final[tuple[str, ...]] = (
|
||||
"January",
|
||||
"February",
|
||||
"March",
|
||||
"April",
|
||||
"May",
|
||||
"June",
|
||||
"July",
|
||||
"August",
|
||||
"September",
|
||||
"October",
|
||||
"November",
|
||||
"December",
|
||||
)
|
||||
|
||||
def __init__(self, local_offset: int = 0, location_formatting: str = DDM):
|
||||
"""
|
||||
Setup GPS Object Status Flags, Internal Data Registers, etc
|
||||
local_offset (int): Timzone Difference to UTC
|
||||
local_offset (int): Timezone Difference to UTC
|
||||
location_formatting (str): Style For Presenting Longitude/Latitude:
|
||||
Decimal Degree Minute (ddm) - 40° 26.767′ N
|
||||
Degrees Minutes Seconds (dms) - 40° 26′ 46″ N
|
||||
|
@ -50,12 +159,12 @@ class MicropyGPS(object):
|
|||
|
||||
#####################
|
||||
# Object Status Flags
|
||||
self.sentence_active = False
|
||||
self.active_segment = 0
|
||||
self.process_crc = False
|
||||
self.gps_segments = []
|
||||
self.crc_xor = 0
|
||||
self.char_count = 0
|
||||
self.sentence_active: bool = False
|
||||
self.active_segment: int = 0
|
||||
self.process_crc: bool = False
|
||||
self.gps_segments: list[str] = []
|
||||
self.crc_xor: int = 0
|
||||
self.char_count: int = 0
|
||||
self.fix_time = 0
|
||||
|
||||
#####################
|
||||
|
@ -67,20 +176,20 @@ class MicropyGPS(object):
|
|||
#####################
|
||||
# Logging Related
|
||||
self.log_handle = None
|
||||
self.log_en = False
|
||||
self.log_en: bool = False
|
||||
|
||||
#####################
|
||||
# Data From Sentences
|
||||
# Time
|
||||
self.timestamp = [0, 0, 0.0]
|
||||
self.date = [0, 0, 0]
|
||||
self.timestamp: list[int | float] = [0, 0, 0.0]
|
||||
self.date: list[int | float] = [0, 0, 0]
|
||||
self.local_offset = local_offset
|
||||
|
||||
# Position/Motion
|
||||
self._latitude = [0, 0.0, 'N']
|
||||
self._longitude = [0, 0.0, 'W']
|
||||
self._latitude = [0, 0.0, _NORTH]
|
||||
self._longitude = [0, 0.0, _WEST]
|
||||
self.coord_format = location_formatting
|
||||
self.speed = [0.0, 0.0, 0.0]
|
||||
self.speed: list[float] = [0.0, 0.0, 0.0]
|
||||
self.course = 0.0
|
||||
self.altitude = 0.0
|
||||
self.geoid_height = 0.0
|
||||
|
@ -88,27 +197,33 @@ class MicropyGPS(object):
|
|||
# GPS Info
|
||||
self.satellites_in_view = 0
|
||||
self.satellites_in_use = 0
|
||||
self.satellites_used = []
|
||||
self.satellites_used: list[int] = []
|
||||
self.last_sv_sentence = 0
|
||||
self.total_sv_sentences = 0
|
||||
self.satellite_data = dict()
|
||||
self.satellite_data: t_satellite_dict = dict()
|
||||
self.hdop = 0.0
|
||||
self.pdop = 0.0
|
||||
self.vdop = 0.0
|
||||
self.valid = False
|
||||
self.valid: bool = False
|
||||
self.fix_stat = 0
|
||||
self.fix_type = 1
|
||||
|
||||
# Antenna status
|
||||
self.antenna_status = "Unknown antenna status"
|
||||
|
||||
# Speed format representation
|
||||
self._speed_formatter: Callable = self.full_format
|
||||
|
||||
########################################
|
||||
# Coordinates Translation Functions
|
||||
########################################
|
||||
@property
|
||||
def latitude(self):
|
||||
"""Format Latitude Data Correctly"""
|
||||
if self.coord_format == 'dd':
|
||||
decimal_degrees = self._latitude[0] + (self._latitude[1] / 60)
|
||||
if self.coord_format == DD:
|
||||
decimal_degrees: float = self._latitude[0] + (self._latitude[1] / 60)
|
||||
return [decimal_degrees, self._latitude[2]]
|
||||
elif self.coord_format == 'dms':
|
||||
elif self.coord_format == DMS:
|
||||
minute_parts = modf(self._latitude[1])
|
||||
seconds = round(minute_parts[0] * 60)
|
||||
return [self._latitude[0], int(minute_parts[1]), seconds, self._latitude[2]]
|
||||
|
@ -118,25 +233,30 @@ class MicropyGPS(object):
|
|||
@property
|
||||
def longitude(self):
|
||||
"""Format Longitude Data Correctly"""
|
||||
if self.coord_format == 'dd':
|
||||
decimal_degrees = self._longitude[0] + (self._longitude[1] / 60)
|
||||
if self.coord_format == DD:
|
||||
decimal_degrees = self._longitude[0] + (float(self._longitude[1]) / 60)
|
||||
return [decimal_degrees, self._longitude[2]]
|
||||
elif self.coord_format == 'dms':
|
||||
minute_parts = modf(self._longitude[1])
|
||||
elif self.coord_format == DMS:
|
||||
minute_parts: tuple[float, float] = modf(float(self._longitude[1]))
|
||||
seconds = round(minute_parts[0] * 60)
|
||||
return [self._longitude[0], int(minute_parts[1]), seconds, self._longitude[2]]
|
||||
return [
|
||||
self._longitude[0],
|
||||
int(minute_parts[1]),
|
||||
seconds,
|
||||
self._longitude[2],
|
||||
]
|
||||
else:
|
||||
return self._longitude
|
||||
|
||||
########################################
|
||||
# Logging Related Functions
|
||||
########################################
|
||||
def start_logging(self, target_file, mode="append"):
|
||||
def start_logging(self, target_file, mode="append") -> bool:
|
||||
"""
|
||||
Create GPS data log object
|
||||
"""
|
||||
# Set Write Mode Overwrite or Append
|
||||
mode_code = 'w' if mode == 'new' else 'a'
|
||||
mode_code = "w" if mode == "new" else "a"
|
||||
|
||||
try:
|
||||
self.log_handle = open(target_file, mode_code)
|
||||
|
@ -147,32 +267,40 @@ class MicropyGPS(object):
|
|||
self.log_en = True
|
||||
return True
|
||||
|
||||
def stop_logging(self):
|
||||
def stop_logging(self) -> bool:
|
||||
"""
|
||||
Closes the log file handler and disables further logging
|
||||
"""
|
||||
try:
|
||||
self.log_handle.close()
|
||||
except AttributeError:
|
||||
# try:
|
||||
# self.log_handle.close()
|
||||
# except AttributeError:
|
||||
# print("Invalid Handle")
|
||||
# return False
|
||||
if self.log_handle is None:
|
||||
print("Invalid Handle")
|
||||
return False
|
||||
|
||||
self.log_handle.close()
|
||||
self.log_en = False
|
||||
return True
|
||||
|
||||
def write_log(self, log_string):
|
||||
"""Attempts to write the last valid NMEA sentence character to the active file handler
|
||||
"""
|
||||
try:
|
||||
self.log_handle.write(log_string)
|
||||
except TypeError:
|
||||
def write_log(self, log_string) -> bool:
|
||||
"""Attempts to write the last valid NMEA sentence character to the active file handler"""
|
||||
# try:
|
||||
# self.log_handle.write(log_string)
|
||||
# except TypeError:
|
||||
# return False
|
||||
# return True
|
||||
if self.log_handle is None:
|
||||
print("Invalid Handle")
|
||||
return False
|
||||
self.log_handle.write(log_string)
|
||||
return True
|
||||
|
||||
########################################
|
||||
# Sentence Parsers
|
||||
########################################
|
||||
def gprmc(self):
|
||||
def gprmc(self) -> bool:
|
||||
"""Parse Recommended Minimum Specific GPS/Transit data (RMC)Sentence.
|
||||
Updates UTC timestamp, latitude, longitude, Course, Speed, Date, and fix status
|
||||
"""
|
||||
|
@ -202,16 +330,15 @@ class MicropyGPS(object):
|
|||
day = int(date_string[0:2])
|
||||
month = int(date_string[2:4])
|
||||
year = int(date_string[4:6])
|
||||
self.date = (day, month, year)
|
||||
self.date = [day, month, year]
|
||||
else: # No Date stamp yet
|
||||
self.date = (0, 0, 0)
|
||||
self.date = [0, 0, 0]
|
||||
|
||||
except ValueError: # Bad Date stamp value present
|
||||
return False
|
||||
|
||||
# Check Receiver Data Valid Flag
|
||||
if self.gps_segments[2] == 'A': # Data from Receiver is Valid/Has Fix
|
||||
|
||||
if self.gps_segments[2] == "A": # Data from Receiver is Valid/Has Fix
|
||||
# Longitude / Latitude
|
||||
try:
|
||||
# Latitude
|
||||
|
@ -263,15 +390,15 @@ class MicropyGPS(object):
|
|||
self.new_fix_time()
|
||||
|
||||
else: # Clear Position Data if Sentence is 'Invalid'
|
||||
self._latitude = [0, 0.0, 'N']
|
||||
self._longitude = [0, 0.0, 'W']
|
||||
self._latitude = [0, 0.0, _NORTH]
|
||||
self._longitude = [0, 0.0, _WEST]
|
||||
self.speed = [0.0, 0.0, 0.0]
|
||||
self.course = 0.0
|
||||
self.valid = False
|
||||
|
||||
return True
|
||||
|
||||
def gpgll(self):
|
||||
def gpgll(self) -> bool:
|
||||
"""Parse Geographic Latitude and Longitude (GLL)Sentence. Updates UTC timestamp, latitude,
|
||||
longitude, and fix status"""
|
||||
|
||||
|
@ -291,8 +418,7 @@ class MicropyGPS(object):
|
|||
return False
|
||||
|
||||
# Check Receiver Data Valid Flag
|
||||
if self.gps_segments[6] == 'A': # Data from Receiver is Valid/Has Fix
|
||||
|
||||
if self.gps_segments[6] == "A": # Data from Receiver is Valid/Has Fix
|
||||
# Longitude / Latitude
|
||||
try:
|
||||
# Latitude
|
||||
|
@ -324,13 +450,13 @@ class MicropyGPS(object):
|
|||
self.new_fix_time()
|
||||
|
||||
else: # Clear Position Data if Sentence is 'Invalid'
|
||||
self._latitude = [0, 0.0, 'N']
|
||||
self._longitude = [0, 0.0, 'W']
|
||||
self._latitude = [0, 0.0, _NORTH]
|
||||
self._longitude = [0, 0.0, _WEST]
|
||||
self.valid = False
|
||||
|
||||
return True
|
||||
|
||||
def gpvtg(self):
|
||||
def gpvtg(self) -> bool:
|
||||
"""Parse Track Made Good and Ground Speed (VTG) Sentence. Updates speed and course"""
|
||||
try:
|
||||
course = float(self.gps_segments[1]) if self.gps_segments[1] else 0.0
|
||||
|
@ -339,13 +465,15 @@ class MicropyGPS(object):
|
|||
return False
|
||||
|
||||
# Include mph and km/h
|
||||
self.speed = (spd_knt, spd_knt * 1.151, spd_knt * 1.852)
|
||||
# self.speed = (spd_knt, spd_knt * 1.151, spd_knt * 1.852)
|
||||
self.speed = [spd_knt, spd_knt * 1.151, spd_knt * 1.852]
|
||||
self.course = course
|
||||
return True
|
||||
|
||||
def gpgga(self):
|
||||
def gpgga(self) -> bool:
|
||||
"""Parse Global Positioning System Fix Data (GGA) Sentence. Updates UTC timestamp, latitude, longitude,
|
||||
fix status, satellites in use, Horizontal Dilution of Precision (HDOP), altitude, geoid height and fix status"""
|
||||
fix status, satellites in use, Horizontal Dilution of Precision (HDOP), altitude, geoid height and fix status
|
||||
"""
|
||||
|
||||
try:
|
||||
# UTC Timestamp
|
||||
|
@ -378,7 +506,6 @@ class MicropyGPS(object):
|
|||
|
||||
# Process Location and Speed Data if Fix is GOOD
|
||||
if fix_stat:
|
||||
|
||||
# Longitude / Latitude
|
||||
try:
|
||||
# Latitude
|
||||
|
@ -427,7 +554,7 @@ class MicropyGPS(object):
|
|||
|
||||
return True
|
||||
|
||||
def gpgsa(self):
|
||||
def gpgsa(self) -> bool:
|
||||
"""Parse GNSS DOP and Active Satellites (GSA) sentence. Updates GPS fix type, list of satellites used in
|
||||
fix calculation, Position Dilution of Precision (PDOP), Horizontal Dilution of Precision (HDOP), Vertical
|
||||
Dilution of Precision, and fix status"""
|
||||
|
@ -439,7 +566,7 @@ class MicropyGPS(object):
|
|||
return False
|
||||
|
||||
# Read All (up to 12) Available PRN Satellite Numbers
|
||||
sats_used = []
|
||||
sats_used: list[int] = []
|
||||
for sats in range(12):
|
||||
sat_number_str = self.gps_segments[3 + sats]
|
||||
if sat_number_str:
|
||||
|
@ -473,7 +600,7 @@ class MicropyGPS(object):
|
|||
|
||||
return True
|
||||
|
||||
def gpgsv(self):
|
||||
def gpgsv(self) -> bool:
|
||||
"""Parse Satellites in View (GSV) sentence. Updates number of SV Sentences,the number of the last SV sentence
|
||||
parsed, and data on each satellite present in the sentence"""
|
||||
try:
|
||||
|
@ -485,8 +612,7 @@ class MicropyGPS(object):
|
|||
|
||||
# Create a blank dict to store all the satellite data from this sentence in:
|
||||
# satellite PRN is key, tuple containing telemetry is value
|
||||
satellite_dict = dict()
|
||||
|
||||
satellite_dict: t_satellite_dict = dict()
|
||||
# Calculate Number of Satelites to pull data for and thus how many segment positions to read
|
||||
if num_sv_sentences == current_sv_sentence:
|
||||
# Last sentence may have 1-4 satellites; 5 - 20 positions
|
||||
|
@ -496,27 +622,26 @@ class MicropyGPS(object):
|
|||
|
||||
# Try to recover data for up to 4 satellites in sentence
|
||||
for sats in range(4, sat_segment_limit, 4):
|
||||
|
||||
# If a PRN is present, grab satellite data
|
||||
if self.gps_segments[sats]:
|
||||
try:
|
||||
sat_id = int(self.gps_segments[sats])
|
||||
except (ValueError,IndexError):
|
||||
except (ValueError, IndexError):
|
||||
return False
|
||||
|
||||
try: # elevation can be null (no value) when not tracking
|
||||
elevation = int(self.gps_segments[sats+1])
|
||||
except (ValueError,IndexError):
|
||||
elevation = int(self.gps_segments[sats + 1])
|
||||
except (ValueError, IndexError):
|
||||
elevation = None
|
||||
|
||||
try: # azimuth can be null (no value) when not tracking
|
||||
azimuth = int(self.gps_segments[sats+2])
|
||||
except (ValueError,IndexError):
|
||||
azimuth = int(self.gps_segments[sats + 2])
|
||||
except (ValueError, IndexError):
|
||||
azimuth = None
|
||||
|
||||
try: # SNR can be null (no value) when not tracking
|
||||
snr = int(self.gps_segments[sats+3])
|
||||
except (ValueError,IndexError):
|
||||
snr = int(self.gps_segments[sats + 3])
|
||||
except (ValueError, IndexError):
|
||||
snr = None
|
||||
# If no PRN is found, then the sentence has no more satellites to read
|
||||
else:
|
||||
|
@ -539,23 +664,63 @@ class MicropyGPS(object):
|
|||
|
||||
return True
|
||||
|
||||
def pgtop(self) -> bool:
|
||||
"""
|
||||
Receive antenna status
|
||||
|
||||
# $PGTOP,11,x
|
||||
When issuing one of the following commands:
|
||||
- PGCMD_PERIODIC_ANTENNA_STATUS or PGCMD_NO_PERIODIC_ANTENNA_STATUS to disable
|
||||
periodic output
|
||||
- INQUIRE_ANTENNA_STATUS: Inquiry antenna status - one shot
|
||||
|
||||
The MTK responses with: $PGTOP,11,value*checksum
|
||||
"""
|
||||
try:
|
||||
segment1: str = self.gps_segments[1]
|
||||
except ValueError:
|
||||
return False
|
||||
if segment1 == "11":
|
||||
self.antenna_status = self.gps_segments[2]
|
||||
else:
|
||||
self.antenna_status = "-1" # unknown status
|
||||
return True
|
||||
|
||||
def get_antenna_status(self, GPS_module_type: t_antenna) -> str:
|
||||
"""
|
||||
Get Antenna status, after MTK has responded with: $PGTOP,11,value*checksum.
|
||||
|
||||
:param GPS_module_type holds the data for the relevant GPS module
|
||||
|
||||
:return: status of the antenna connected to the GPS module.
|
||||
"""
|
||||
for antenna in GPS_module_type:
|
||||
if self.antenna_status == antenna["code"]:
|
||||
return antenna["message"]
|
||||
else:
|
||||
return "Error in antenna status"
|
||||
|
||||
##########################################
|
||||
# Data Stream Handler Functions
|
||||
##########################################
|
||||
|
||||
def new_sentence(self):
|
||||
def new_sentence(self) -> None:
|
||||
"""Adjust Object Flags in Preparation for a New Sentence"""
|
||||
self.gps_segments = ['']
|
||||
self.gps_segments = [""]
|
||||
self.active_segment = 0
|
||||
self.crc_xor = 0
|
||||
self.sentence_active = True
|
||||
self.process_crc = True
|
||||
self.char_count = 0
|
||||
|
||||
def update(self, new_char):
|
||||
"""Process a new input char and updates GPS object if necessary based on special characters ('$', ',', '*')
|
||||
def update(self, new_char: str) -> None | str:
|
||||
"""
|
||||
Process a new input char and updates GPS object if necessary based on special characters ('$', ',', '*')
|
||||
|
||||
Function builds a list of received string that are validate by CRC prior to parsing by the appropriate
|
||||
sentence function. Returns sentence type on successful parse, None otherwise"""
|
||||
sentence function.
|
||||
:return: sentence type on successful parse, None otherwise
|
||||
"""
|
||||
|
||||
valid_sentence = False
|
||||
|
||||
|
@ -570,24 +735,23 @@ class MicropyGPS(object):
|
|||
self.write_log(new_char)
|
||||
|
||||
# Check if a new string is starting ($)
|
||||
if new_char == '$':
|
||||
if new_char == "$":
|
||||
self.new_sentence()
|
||||
return None
|
||||
|
||||
elif self.sentence_active:
|
||||
|
||||
# Check if sentence is ending (*)
|
||||
if new_char == '*':
|
||||
if new_char == "*":
|
||||
self.process_crc = False
|
||||
self.active_segment += 1
|
||||
self.gps_segments.append('')
|
||||
self.gps_segments.append("")
|
||||
return None
|
||||
|
||||
# Check if a section is ended (,), Create a new substring to feed
|
||||
# characters to
|
||||
elif new_char == ',':
|
||||
elif new_char == ",":
|
||||
self.active_segment += 1
|
||||
self.gps_segments.append('')
|
||||
self.gps_segments.append("")
|
||||
|
||||
# Store All Other printable character and check CRC when ready
|
||||
else:
|
||||
|
@ -595,10 +759,11 @@ class MicropyGPS(object):
|
|||
|
||||
# When CRC input is disabled, sentence is nearly complete
|
||||
if not self.process_crc:
|
||||
|
||||
if len(self.gps_segments[self.active_segment]) == 2:
|
||||
try:
|
||||
final_crc = int(self.gps_segments[self.active_segment], 16)
|
||||
final_crc = int(
|
||||
self.gps_segments[self.active_segment], 16
|
||||
)
|
||||
if self.crc_xor == final_crc:
|
||||
valid_sentence = True
|
||||
else:
|
||||
|
@ -616,10 +781,8 @@ class MicropyGPS(object):
|
|||
self.sentence_active = False # Clear Active Processing Flag
|
||||
|
||||
if self.gps_segments[0] in self.supported_sentences:
|
||||
|
||||
# parse the Sentence Based on the message type, return True if parse is clean
|
||||
if self.supported_sentences[self.gps_segments[0]](self):
|
||||
|
||||
# Let host know that the GPS object was updated by returning parsed sentence type
|
||||
self.parsed_sentences += 1
|
||||
return self.gps_segments[0]
|
||||
|
@ -631,9 +794,12 @@ class MicropyGPS(object):
|
|||
# Tell Host no new sentence was parsed
|
||||
return None
|
||||
|
||||
def new_fix_time(self):
|
||||
"""Updates a high resolution counter with current time when fix is updated. Currently only triggered from
|
||||
GGA, GSA and RMC sentences"""
|
||||
def new_fix_time(self) -> None:
|
||||
"""
|
||||
Updates a high resolution (soft) counter with current time when fix is updated.
|
||||
|
||||
Currently only triggered from GGA, GSA and RMC sentences
|
||||
"""
|
||||
try:
|
||||
self.fix_time = utime.ticks_ms()
|
||||
except NameError:
|
||||
|
@ -644,36 +810,40 @@ class MicropyGPS(object):
|
|||
# These functions make working with the GPS object data easier
|
||||
#########################################
|
||||
|
||||
def satellite_data_updated(self):
|
||||
def satellite_data_updated(self) -> bool:
|
||||
"""
|
||||
Checks if the all the GSV sentences in a group have been read, making satellite data complete
|
||||
:return: boolean
|
||||
"""
|
||||
if self.total_sv_sentences > 0 and self.total_sv_sentences == self.last_sv_sentence:
|
||||
if (
|
||||
self.total_sv_sentences > 0
|
||||
and self.total_sv_sentences == self.last_sv_sentence
|
||||
):
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def unset_satellite_data_updated(self):
|
||||
def unset_satellite_data_updated(self) -> None:
|
||||
"""
|
||||
Mark GSV sentences as read indicating the data has been used and future updates are fresh
|
||||
"""
|
||||
self.last_sv_sentence = 0
|
||||
|
||||
def satellites_visible(self):
|
||||
def satellites_visible(self) -> list[int]:
|
||||
"""
|
||||
Returns a list of of the satellite PRNs currently visible to the receiver
|
||||
:return: list
|
||||
|
||||
:return: list of satellite ids
|
||||
"""
|
||||
return list(self.satellite_data.keys())
|
||||
|
||||
def time_since_fix(self):
|
||||
"""Returns number of millisecond since the last sentence with a valid fix was parsed. Returns 0 if
|
||||
def time_since_fix(self) -> float:
|
||||
"""Returns number of millisecond since the last sentence with a valid fix was parsed. Returns -1.0 if
|
||||
no fix has been found"""
|
||||
|
||||
# Test if a Fix has been found
|
||||
if self.fix_time == 0:
|
||||
return -1
|
||||
return -1.0
|
||||
|
||||
# Try calculating fix time using utime; if not running MicroPython
|
||||
# time.time() returns a floating point value in secs
|
||||
|
@ -702,129 +872,221 @@ class MicropyGPS(object):
|
|||
|
||||
return final_dir
|
||||
|
||||
def latitude_string(self):
|
||||
def latitude_string(self) -> str:
|
||||
"""
|
||||
Create a readable string of the current latitude data
|
||||
:return: string
|
||||
|
||||
:return: formatted string according to 'self.coord_format'
|
||||
"""
|
||||
if self.coord_format == 'dd':
|
||||
if self.coord_format == DD:
|
||||
formatted_latitude = self.latitude
|
||||
lat_string = str(formatted_latitude[0]) + '° ' + str(self._latitude[2])
|
||||
elif self.coord_format == 'dms':
|
||||
lat_string = f"{formatted_latitude[0]}° {self._latitude[2]}"
|
||||
elif self.coord_format == DMS:
|
||||
formatted_latitude = self.latitude
|
||||
lat_string = str(formatted_latitude[0]) + '° ' + str(formatted_latitude[1]) + "' " + str(formatted_latitude[2]) + '" ' + str(formatted_latitude[3])
|
||||
lat_string = (
|
||||
f"{formatted_latitude[0]}° {formatted_latitude[1]}'"
|
||||
f' {formatted_latitude[2]}" {formatted_latitude[3]}'
|
||||
)
|
||||
else:
|
||||
lat_string = str(self._latitude[0]) + '° ' + str(self._latitude[1]) + "' " + str(self._latitude[2])
|
||||
lat_string = (
|
||||
f"{self._latitude[0]}° {self._latitude[1]}' {self._latitude[2]}"
|
||||
)
|
||||
return lat_string
|
||||
|
||||
def longitude_string(self):
|
||||
def longitude_string(self) -> str:
|
||||
"""
|
||||
Create a readable string of the current longitude data
|
||||
:return: string
|
||||
|
||||
:return: formatted string according to
|
||||
"""
|
||||
if self.coord_format == 'dd':
|
||||
if self.coord_format == DD:
|
||||
formatted_longitude = self.longitude
|
||||
lon_string = str(formatted_longitude[0]) + '° ' + str(self._longitude[2])
|
||||
elif self.coord_format == 'dms':
|
||||
lon_string = f"{formatted_longitude[0]}° {self._longitude[2]}"
|
||||
elif self.coord_format == DMS:
|
||||
formatted_longitude = self.longitude
|
||||
lon_string = str(formatted_longitude[0]) + '° ' + str(formatted_longitude[1]) + "' " + str(formatted_longitude[2]) + '" ' + str(formatted_longitude[3])
|
||||
lon_string = (
|
||||
f"{formatted_longitude[0]}° {formatted_longitude[1]}'"
|
||||
f' {formatted_longitude[2]}" {formatted_longitude[3]}'
|
||||
)
|
||||
else:
|
||||
lon_string = str(self._longitude[0]) + '° ' + str(self._longitude[1]) + "' " + str(self._longitude[2])
|
||||
lon_string = (
|
||||
f"{self._longitude[0]}° {self._longitude[1]}' {self._longitude[2]}"
|
||||
)
|
||||
return lon_string
|
||||
|
||||
def speed_string(self, unit='kph'):
|
||||
def set_speed_formatter(self, formatter: Callable) -> None:
|
||||
"""Initialise the format function for speed representation"""
|
||||
self._speed_formatter = formatter
|
||||
|
||||
def full_format(self, speed: float) -> str:
|
||||
"""
|
||||
Function to format the speed.
|
||||
|
||||
Here: full float format (any length)
|
||||
|
||||
:return: string formatted speed value
|
||||
"""
|
||||
return f"{speed}"
|
||||
|
||||
def f2_2_format(self, speed: float) -> str:
|
||||
"""
|
||||
Function to format the speed.
|
||||
|
||||
Here: '06.78'
|
||||
|
||||
:return: string formatted speed value
|
||||
"""
|
||||
return f"{speed:0>5.2f}"
|
||||
|
||||
def speed_string(self, unit=KPH) -> str:
|
||||
"""
|
||||
Creates a readable string of the current speed data in one of three units
|
||||
:param unit: string of 'kph','mph, or 'knot'
|
||||
:return:
|
||||
|
||||
The 'self._speed_formatter()' object contains the format function representation to apply to the speed
|
||||
|
||||
:param unit: string of SPEED_UNIT
|
||||
:return: speed formatted string according to the SPEED_UNIT
|
||||
"""
|
||||
if unit == 'mph':
|
||||
speed_string = str(self.speed[1]) + ' mph'
|
||||
if unit not in _SPEED_UNIT:
|
||||
print(f"Unknown unit for speed: {unit}")
|
||||
|
||||
elif unit == 'knot':
|
||||
if self.speed[0] == 1:
|
||||
unit_str = ' knot'
|
||||
if unit == MPH:
|
||||
speed_string = f"{self._speed_formatter(self.speed[1])} {MPH}"
|
||||
elif unit == KNOT:
|
||||
if 1 >= self.speed[0] >= -1:
|
||||
speed_string = f"{self._speed_formatter(self.speed[0])} {KNOT}"
|
||||
else:
|
||||
unit_str = ' knots'
|
||||
speed_string = str(self.speed[0]) + unit_str
|
||||
|
||||
speed_string = f"{self._speed_formatter(self.speed[0])} {KNOT}s"
|
||||
else:
|
||||
speed_string = str(self.speed[2]) + ' km/h'
|
||||
speed_string = f"{self._speed_formatter(self.speed[2])} {KPH}"
|
||||
|
||||
return speed_string
|
||||
|
||||
def date_string(self, formatting='s_mdy', century='20'):
|
||||
def date_string(self, formatting=S_MDY, century="20") -> str:
|
||||
"""
|
||||
Creates a readable string of the current date.
|
||||
Can select between long format: Januray 1st, 2014
|
||||
or two short formats:
|
||||
11/01/2014 (MM/DD/YYYY)
|
||||
01/11/2014 (DD/MM/YYYY)
|
||||
:param formatting: string 's_mdy', 's_dmy', or 'long'
|
||||
|
||||
Select between:
|
||||
- long format: Januray 1st, 2014
|
||||
- or two short numeric formats:
|
||||
- 11/01/2014 (MM/DD/YYYY)
|
||||
- 01/11/2014 (DD/MM/YYYY)
|
||||
- or this long numeric one:
|
||||
- 2023-01-01 (YYY-MM-DD)
|
||||
|
||||
:param formatting: string of DATE_FORMAT
|
||||
:param century: int delineating the century the GPS data is from (19 for 19XX, 20 for 20XX)
|
||||
:return: date_string string with long or short format date
|
||||
"""
|
||||
|
||||
# Long Format Januray 1st, 2014
|
||||
if formatting == 'long':
|
||||
# Long Format January 1st, 2014
|
||||
if formatting == LONG:
|
||||
# Retrieve Month string from private set
|
||||
month = self.__MONTHS[self.date[1] - 1]
|
||||
month = self.__MONTHS[int(self.date[1]) - 1]
|
||||
|
||||
# Determine Date Suffix
|
||||
if self.date[0] in (1, 21, 31):
|
||||
suffix = 'st'
|
||||
elif self.date[0] in (2, 22):
|
||||
suffix = 'nd'
|
||||
elif self.date[0] == (3, 23):
|
||||
suffix = 'rd'
|
||||
if self.date[0] in _FIRST:
|
||||
day = f"{self.date[0]}st"
|
||||
elif self.date[0] in _SECOND:
|
||||
day = f"{self.date[0]}nd"
|
||||
elif self.date[0] in _THIRD:
|
||||
day = f"{self.date[0]}rd"
|
||||
else:
|
||||
suffix = 'th'
|
||||
day = f"{self.date[0]}th"
|
||||
|
||||
day = str(self.date[0]) + suffix # Create Day String
|
||||
|
||||
year = century + str(self.date[2]) # Create Year String
|
||||
|
||||
date_string = month + ' ' + day + ', ' + year # Put it all together
|
||||
year = f"{century}{self.date[2]}" # Create Year String
|
||||
date_string = f"{month} {day}, {year}" # Put it all together
|
||||
|
||||
else:
|
||||
# Add leading zeros to day string if necessary
|
||||
if self.date[0] < 10:
|
||||
day = '0' + str(self.date[0])
|
||||
day = f"0{self.date[0]}"
|
||||
else:
|
||||
day = str(self.date[0])
|
||||
day = f"{self.date[0]}"
|
||||
|
||||
# Add leading zeros to month string if necessary
|
||||
if self.date[1] < 10:
|
||||
month = '0' + str(self.date[1])
|
||||
month = f"0{self.date[1]}"
|
||||
else:
|
||||
month = str(self.date[1])
|
||||
month = f"{self.date[1]}"
|
||||
|
||||
# Add leading zeros to year string if necessary
|
||||
if self.date[2] < 10:
|
||||
year = '0' + str(self.date[2])
|
||||
year = f"0{self.date[2]}"
|
||||
else:
|
||||
year = str(self.date[2])
|
||||
year = f"{self.date[2]}"
|
||||
|
||||
# Build final string based on desired formatting
|
||||
if formatting == 's_dmy':
|
||||
date_string = day + '/' + month + '/' + year
|
||||
if formatting == S_DMY:
|
||||
date_string = f"{day}/{month}/{year}"
|
||||
|
||||
else: # Default date format
|
||||
date_string = month + '/' + day + '/' + year
|
||||
elif formatting == L_YMD:
|
||||
date_string = f"{century}{year}-{month}-{day}"
|
||||
|
||||
else: # Default date format (S_MDY)
|
||||
date_string = f"{month}/{day}/{year}"
|
||||
|
||||
return date_string
|
||||
|
||||
def timestamp_string(self, formatting=S_HMS) -> str:
|
||||
"""
|
||||
Creates a readable string of the current timestamp.
|
||||
|
||||
One format hh:mm:ss.us 12:00:23.789
|
||||
|
||||
:param formatting: string 's_hms'
|
||||
:return: timestamp_string string format timestamp
|
||||
"""
|
||||
|
||||
if formatting == S_HMS:
|
||||
# Add leading zeros to hour string if necessary
|
||||
if self.timestamp[0] < 10:
|
||||
hour = f"0{self.timestamp[0]}"
|
||||
else:
|
||||
hour = f"{self.timestamp[0]}"
|
||||
|
||||
# Add leading zeros to minute string if necessary
|
||||
if self.timestamp[1] < 10:
|
||||
minute = f"0{self.timestamp[1]}"
|
||||
else:
|
||||
minute = f"{self.timestamp[1]}"
|
||||
|
||||
# Add leading zeros to second string if necessary
|
||||
if self.timestamp[2] < 10:
|
||||
second = f"0{self.timestamp[2]}"
|
||||
else:
|
||||
second = f"{self.timestamp[2]}"
|
||||
|
||||
# Build final string
|
||||
timestamp_string = f"{hour}:{minute}:{second}"
|
||||
|
||||
else:
|
||||
raise ValueError("Unkown timestamp format")
|
||||
|
||||
return timestamp_string
|
||||
|
||||
# All the currently supported NMEA sentences
|
||||
supported_sentences = {'GPRMC': gprmc, 'GLRMC': gprmc,
|
||||
'GPGGA': gpgga, 'GLGGA': gpgga,
|
||||
'GPVTG': gpvtg, 'GLVTG': gpvtg,
|
||||
'GPGSA': gpgsa, 'GLGSA': gpgsa,
|
||||
'GPGSV': gpgsv, 'GLGSV': gpgsv,
|
||||
'GPGLL': gpgll, 'GLGLL': gpgll,
|
||||
'GNGGA': gpgga, 'GNRMC': gprmc,
|
||||
'GNVTG': gpvtg, 'GNGLL': gpgll,
|
||||
'GNGSA': gpgsa,
|
||||
}
|
||||
supported_sentences = {
|
||||
"GPRMC": gprmc,
|
||||
"GLRMC": gprmc,
|
||||
"GPGGA": gpgga,
|
||||
"GLGGA": gpgga,
|
||||
"GPVTG": gpvtg,
|
||||
"GLVTG": gpvtg,
|
||||
"GPGSA": gpgsa,
|
||||
"GLGSA": gpgsa,
|
||||
"GPGSV": gpgsv,
|
||||
"GLGSV": gpgsv,
|
||||
"GPGLL": gpgll,
|
||||
"GLGLL": gpgll,
|
||||
"GNGGA": gpgga,
|
||||
"GNRMC": gprmc,
|
||||
"GNVTG": gpvtg,
|
||||
"GNGLL": gpgll,
|
||||
"GNGSA": gpgsa,
|
||||
"PGTOP": pgtop,
|
||||
}
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pass
|
||||
|
|
1260
test_micropyGPS.py
1260
test_micropyGPS.py
Plik diff jest za duży
Load Diff
Ładowanie…
Reference in New Issue