Merge pull request #43 from Meisterschueler/parser_classes

Parser classes
pull/45/head
Meisterschueler 2017-12-02 10:43:30 +01:00 zatwierdzone przez GitHub
commit 7d738931e2
Nie znaleziono w bazie danych klucza dla tego podpisu
ID klucza GPG: 4AEE18F83AFDEB23
28 zmienionych plików z 352 dodań i 240 usunięć

Wyświetl plik

@ -10,6 +10,14 @@ It can be used to connect to the OGN-APRS-Servers and to parse APRS-/OGN-Message
A full featured gateway with build-in database is provided by [ogn-python](https://github.com/glidernet/ogn-python).
## Installation
python-ogn-client is available at PyPI. So for installation simply use pip:
```
pip install ogn-client
```
## Example Usage
Parse APRS/OGN packet.

Wyświetl plik

@ -0,0 +1,19 @@
class BaseParser():
def __init__(self):
self.beacon_type = 'unknown'
def parse(self, aprs_comment, aprs_type):
if aprs_type == "position":
data = self.parse_position(aprs_comment)
elif aprs_type == "status":
data = self.parse_status(aprs_comment)
else:
raise ValueError("aprs_type {} unknown".format(aprs_type))
data.update({'beacon_type': self.beacon_type})
return data
def parse_position(self, aprs_comment):
raise NotImplementedError("Position parser for parser '{}' not yet implemented".format(self.beacon_type))
def parse_status(self, aprs_comment):
raise NotImplementedError("Status parser for parser '{}' not yet implemented".format(self.beacon_type))

Wyświetl plik

@ -0,0 +1,31 @@
import re
from ogn.parser.pattern import PATTERN_AIRCRAFT_BEACON
from ogn.parser.utils import fpm2ms
from .base import BaseParser
class FlarmParser(BaseParser):
def __init__(self):
self.beacon_type = 'aircraft_beacon'
@staticmethod
def parse_position(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None,
'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None,
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None,
'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None,
'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None,
'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None,
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None,
'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None}

Wyświetl plik

@ -0,0 +1,6 @@
from .base import BaseParser
class LT24Parser(BaseParser):
def __init__(self):
self.beacon_type = 'lt24_beacon'

Wyświetl plik

@ -0,0 +1,23 @@
import re
from ogn.parser.pattern import PATTERN_NAVITER_BEACON
from ogn.parser.utils import fpm2ms
from .base import BaseParser
class NaviterParser(BaseParser):
def __init__(self):
self.beacon_type = 'naviter_beacon'
@staticmethod
def parse_position(aprs_comment):
match = re.search(PATTERN_NAVITER_BEACON, aprs_comment)
return {'stealth': (int(match.group('details'), 16) & 0b1000000000000000) >> 15 == 1,
'do_not_track': (int(match.group('details'), 16) & 0b0100000000000000) >> 14 == 1,
'aircraft_type': (int(match.group('details'), 16) & 0b0011110000000000) >> 10,
'address_type': (int(match.group('details'), 16) & 0b0000001111110000) >> 4,
'reserved': (int(match.group('details'), 16) & 0b0000000000001111),
'address': match.group('id'),
'climb_rate': int(match.group('climb_rate')) * fpm2ms if match.group('climb_rate') else None,
'turn_rate': float(match.group('turn_rate')) if match.group('turn_rate') else None}

Wyświetl plik

@ -0,0 +1,78 @@
import re
from ogn.parser.utils import fpm2ms
from ogn.parser.pattern import PATTERN_RECEIVER_BEACON, PATTERN_AIRCRAFT_BEACON
from .base import BaseParser
class OgnParser(BaseParser):
def __init__(self):
self.beacon_type = 'depends...'
def parse(self, aprs_comment, aprs_type):
if not aprs_comment:
return {'beacon_type': 'receiver_beacon'}
ac_data = self.parse_aircraft_beacon(aprs_comment)
if ac_data:
ac_data.update({'beacon_type': 'aircraft_beacon'})
return ac_data
rc_data = self.parse_receiver_beacon(aprs_comment)
if rc_data:
rc_data.update({'beacon_type': 'receiver_beacon'})
return rc_data
else:
return {'user_comment': aprs_comment,
'beacon_type': 'receiver_beacon'}
@staticmethod
def parse_aircraft_beacon(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None,
'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None,
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None,
'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None,
'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None,
'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None,
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None,
'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None}
else:
return None
@staticmethod
def parse_receiver_beacon(aprs_comment):
rec_match = re.search(PATTERN_RECEIVER_BEACON, aprs_comment)
if rec_match:
return {'version': rec_match.group('version'),
'platform': rec_match.group('platform'),
'cpu_load': float(rec_match.group('cpu_load')),
'free_ram': float(rec_match.group('ram_free')),
'total_ram': float(rec_match.group('ram_total')),
'ntp_error': float(rec_match.group('ntp_offset')),
'rt_crystal_correction': float(rec_match.group('ntp_correction')),
'voltage': float(rec_match.group('voltage')) if rec_match.group('voltage') else None,
'amperage': float(rec_match.group('amperage')) if rec_match.group('amperage') else None,
'cpu_temp': float(rec_match.group('cpu_temperature')) if rec_match.group('cpu_temperature') else None,
'senders_visible': int(rec_match.group('visible_senders')) if rec_match.group('visible_senders') else None,
'senders_total': int(rec_match.group('senders')) if rec_match.group('senders') else None,
'rec_crystal_correction': int(rec_match.group('rf_correction_manual')) if rec_match.group('rf_correction_manual') else None,
'rec_crystal_correction_fine': float(rec_match.group('rf_correction_automatic')) if rec_match.group('rf_correction_automatic') else None,
'rec_input_noise': float(rec_match.group('signal_quality')) if rec_match.group('signal_quality') else None,
'senders_signal': float(rec_match.group('senders_signal_quality')) if rec_match.group('senders_signal_quality') else None,
'senders_messages': float(rec_match.group('senders_messages')) if rec_match.group('senders_messages') else None,
'good_senders_signal': float(rec_match.group('good_senders_signal_quality')) if rec_match.group('good_senders_signal_quality') else None,
'good_senders': float(rec_match.group('good_senders')) if rec_match.group('good_senders') else None,
'good_and_bad_senders': float(rec_match.group('good_and_bad_senders')) if rec_match.group('good_and_bad_senders') else None}
else:
return None

Wyświetl plik

@ -0,0 +1,42 @@
import re
from ogn.parser.pattern import PATTERN_RECEIVER_POSITION, PATTERN_RECEIVER_STATUS
from .base import BaseParser
class ReceiverParser(BaseParser):
def __init__(self):
self.beacon_type = 'receiver_beacon'
@staticmethod
def parse_position(aprs_comment):
if aprs_comment is None:
return {}
else:
match = re.search(PATTERN_RECEIVER_POSITION, aprs_comment)
return {'user_comment': match.group('user_comment') if match.group('user_comment') else None}
@staticmethod
def parse_status(aprs_comment):
match = re.search(PATTERN_RECEIVER_STATUS, aprs_comment)
return {'version': match.group('version'),
'platform': match.group('platform'),
'cpu_load': float(match.group('cpu_load')),
'free_ram': float(match.group('ram_free')),
'total_ram': float(match.group('ram_total')),
'ntp_error': float(match.group('ntp_offset')),
'rt_crystal_correction': float(match.group('ntp_correction')),
'voltage': float(match.group('voltage')) if match.group('voltage') else None,
'amperage': float(match.group('amperage')) if match.group('amperage') else None,
'cpu_temp': float(match.group('cpu_temperature')) if match.group('cpu_temperature') else None,
'senders_visible': int(match.group('visible_senders')) if match.group('visible_senders') else None,
'senders_total': int(match.group('senders')) if match.group('senders') else None,
'rec_crystal_correction': int(match.group('rf_correction_manual')) if match.group('rf_correction_manual') else None,
'rec_crystal_correction_fine': float(match.group('rf_correction_automatic')) if match.group('rf_correction_automatic') else None,
'rec_input_noise': float(match.group('signal_quality')) if match.group('signal_quality') else None,
'senders_signal': float(match.group('senders_signal_quality')) if match.group('senders_signal_quality') else None,
'senders_messages': float(match.group('senders_messages')) if match.group('senders_messages') else None,
'good_senders_signal': float(match.group('good_senders_signal_quality')) if match.group('good_senders_signal_quality') else None,
'good_senders': float(match.group('good_senders')) if match.group('good_senders') else None,
'good_and_bad_senders': float(match.group('good_and_bad_senders')) if match.group('good_and_bad_senders') else None}

Wyświetl plik

@ -0,0 +1,6 @@
from .base import BaseParser
class SkylinesParser(BaseParser):
def __init__(self):
self.beacon_type = 'skylines_beacon'

Wyświetl plik

@ -0,0 +1,6 @@
from .base import BaseParser
class SpiderParser(BaseParser):
def __init__(self):
self.beacon_type = 'spider_beacon'

Wyświetl plik

@ -0,0 +1,6 @@
from .base import BaseParser
class SpotParser(BaseParser):
def __init__(self):
self.beacon_type = 'spot_beacon'

Wyświetl plik

@ -0,0 +1,44 @@
import re
from ogn.parser.pattern import PATTERN_TRACKER_BEACON_POSITION, PATTERN_TRACKER_BEACON_STATUS
from ogn.parser.utils import fpm2ms
from .base import BaseParser
class TrackerParser(BaseParser):
def __init__(self):
self.beacon_type = "aircraft_beacon"
@staticmethod
def parse_position(aprs_comment):
match = re.search(PATTERN_TRACKER_BEACON_POSITION, aprs_comment)
return {'address_type': int(match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': match.group('id'),
'climb_rate': int(match.group('climb_rate')) * fpm2ms if match.group('climb_rate') else None,
'turn_rate': float(match.group('turn_rate')) if match.group('turn_rate') else None,
'flightlevel': float(match.group('flight_level')) if match.group('flight_level') else None,
'signal_quality': float(match.group('signal_quality')) if match.group('signal_quality') else None,
'error_count': int(match.group('errors')) if match.group('errors') else None,
'frequency_offset': float(match.group('frequency_offset')) if match.group('frequency_offset') else None,
'gps_status': match.group('gps_accuracy') if match.group('gps_accuracy') else None,
'software_version': float(match.group('flarm_software_version')) if match.group('flarm_software_version') else None,
'hardware_version': int(match.group('flarm_hardware_version'), 16) if match.group('flarm_hardware_version') else None}
@staticmethod
def parse_status(aprs_comment):
match = re.search(PATTERN_TRACKER_BEACON_STATUS, aprs_comment)
return {'hardware_version': int(match.group('hardware_version')) if match.group('hardware_version') else None,
'software_version': int(match.group('software_version')) if match.group('software_version') else None,
'gps_satellites': int(match.group('gps_satellites')) if match.group('gps_satellites') else None,
'gps_quality': int(match.group('gps_quality')) if match.group('gps_quality') else None,
'gps_altitude': int(match.group('gps_altitude')) if match.group('gps_altitude') else None,
'pressure': float(match.group('pressure')) if match.group('pressure') else None,
'temperature': float(match.group('temperature')) if match.group('temperature') else None,
'humidity': int(match.group('humidity')) if match.group('humidity') else None,
'voltage': float(match.group('voltage')) if match.group('voltage') else None,
'transmitter_power': int(match.group('transmitter_power')) if match.group('transmitter_power') else None,
'noise_level': float(match.group('noise_level')) if match.group('noise_level') else None,
'relays': int(match.group('relays')) if match.group('relays') else None}

Wyświetl plik

@ -5,16 +5,15 @@ from ogn.parser.utils import createTimestamp, parseAngle, kts2kmh, feet2m
from ogn.parser.pattern import PATTERN_APRS_POSITION, PATTERN_APRS_STATUS
from ogn.parser.exceptions import AprsParseError, OgnParseError
from ogn.parser.parse_ogn import parse_aircraft_beacon, parse_receiver_beacon
from ogn.parser.parse_naviter import parse as parse_naviter_beacon
from ogn.parser.parse_lt24 import parse as parse_lt24_beacon
from ogn.parser.parse_spider import parse as parse_spider_beacon
from ogn.parser.parse_spot import parse as parse_spot_beacon
from ogn.parser.parse_skylines import parse as parse_skylines_beacon
from ogn.parser.parse_tracker import parse_position as parse_tracker_position
from ogn.parser.parse_tracker import parse_status as parse_tracker_status
from ogn.parser.parse_receiver import parse_position as parse_receiver_position
from ogn.parser.parse_receiver import parse_status as parse_receiver_status
from ogn.parser.aprs_comment.ogn_parser import OgnParser
from ogn.parser.aprs_comment.lt24_parser import LT24Parser
from ogn.parser.aprs_comment.naviter_parser import NaviterParser
from ogn.parser.aprs_comment.flarm_parser import FlarmParser
from ogn.parser.aprs_comment.tracker_parser import TrackerParser
from ogn.parser.aprs_comment.receiver_parser import ReceiverParser
from ogn.parser.aprs_comment.skylines_parser import SkylinesParser
from ogn.parser.aprs_comment.spider_parser import SpiderParser
from ogn.parser.aprs_comment.spot_parser import SpotParser
def parse(aprs_message, reference_date=None, reference_time=None):
@ -60,62 +59,21 @@ def parse_aprs(message, reference_date, reference_time=None):
raise AprsParseError(message)
def parse_comment(aprs_comment, dstcall="APRS", aprs_type="position"):
if dstcall == "APRS": # this can be a receiver or an aircraft
if not aprs_comment:
return {'beacon_type': 'receiver_beacon'}
dstcall_parser_mapping = {'APRS': OgnParser(),
'OGFLR': FlarmParser(),
'OGNTRK': TrackerParser(),
'OGNSDR': ReceiverParser(),
'OGLT24': LT24Parser(),
'OGNAVI': NaviterParser(),
'OGSKYL': SkylinesParser(),
'OGSPID': SpiderParser(),
'OGSPOT': SpotParser(),
}
ac_data = parse_aircraft_beacon(aprs_comment)
if ac_data:
ac_data.update({'beacon_type': 'aircraft_beacon'})
return ac_data
rc_data = parse_receiver_beacon(aprs_comment)
if rc_data:
rc_data.update({'beacon_type': 'receiver_beacon'})
return rc_data
else:
return {'user_comment': aprs_comment,
'beacon_type': 'receiver_beacon'}
elif dstcall == "OGFLR":
ac_data = parse_aircraft_beacon(aprs_comment)
ac_data.update({'beacon_type': 'aircraft_beacon'})
return ac_data
elif dstcall == "OGNTRK":
if aprs_type == "position":
data = parse_tracker_position(aprs_comment)
data.update({'beacon_type': 'aircraft_beacon'})
elif aprs_type == "status":
data = parse_tracker_status(aprs_comment)
data.update({'beacon_type': 'aircraft_beacon'})
return data
elif dstcall == "OGNSDR":
if aprs_type == "position":
data = parse_receiver_position(aprs_comment)
data.update({'beacon_type': 'receiver_beacon'})
elif aprs_type == "status":
data = parse_receiver_status(aprs_comment)
data.update({'beacon_type': 'receiver_beacon'})
return data
elif dstcall == "OGLT24":
ac_data = parse_lt24_beacon(aprs_comment)
ac_data.update({'beacon_type': 'lt24_beacon'})
return ac_data
elif dstcall == "OGNAVI":
ac_data = parse_naviter_beacon(aprs_comment)
ac_data.update({'beacon_type': 'naviter_beacon'})
return ac_data
elif dstcall == "OGSKYL":
ac_data = parse_skylines_beacon(aprs_comment)
ac_data.update({'beacon_type': 'skylines_beacon'})
return ac_data
elif dstcall == "OGSPID":
ac_data = parse_spider_beacon(aprs_comment)
ac_data.update({'beacon_type': 'spider_beacon'})
return ac_data
elif dstcall == "OGSPOT":
ac_data = parse_spot_beacon(aprs_comment)
ac_data.update({'beacon_type': 'spot_beacon'})
return ac_data
def parse_comment(aprs_comment, dstcall='APRS', aprs_type="position"):
parser = dstcall_parser_mapping.get(dstcall)
if parser:
return parser.parse(aprs_comment, aprs_type)
else:
raise OgnParseError("No parser for dstcall {} found. APRS comment: {}".format(dstcall, aprs_comment))

Wyświetl plik

@ -1,2 +0,0 @@
def parse(aprs_comment):
raise NotImplementedError("LT24 beacon parser not yet implemented")

Wyświetl plik

@ -1,16 +0,0 @@
import re
from ogn.parser.utils import fpm2ms
from ogn.parser.pattern import PATTERN_NAVITER_BEACON
def parse(aprs_comment):
match = re.search(PATTERN_NAVITER_BEACON, aprs_comment)
return {'stealth': (int(match.group('details'), 16) & 0b1000000000000000) >> 15 == 1,
'do_not_track': (int(match.group('details'), 16) & 0b0100000000000000) >> 14 == 1,
'aircraft_type': (int(match.group('details'), 16) & 0b0011110000000000) >> 10,
'address_type': (int(match.group('details'), 16) & 0b0000001111110000) >> 4,
'reserved': (int(match.group('details'), 16) & 0b0000000000001111),
'address': match.group('id'),
'climb_rate': int(match.group('climb_rate')) * fpm2ms if match.group('climb_rate') else None,
'turn_rate': float(match.group('turn_rate')) if match.group('turn_rate') else None}

Wyświetl plik

@ -1,54 +0,0 @@
import re
from ogn.parser.utils import fpm2ms
from ogn.parser.pattern import PATTERN_RECEIVER_BEACON, PATTERN_AIRCRAFT_BEACON
def parse_aircraft_beacon(aprs_comment):
ac_match = re.search(PATTERN_AIRCRAFT_BEACON, aprs_comment)
if ac_match:
return {'address_type': int(ac_match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(ac_match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(ac_match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': ac_match.group('id'),
'climb_rate': int(ac_match.group('climb_rate')) * fpm2ms if ac_match.group('climb_rate') else None,
'turn_rate': float(ac_match.group('turn_rate')) if ac_match.group('turn_rate') else None,
'flightlevel': float(ac_match.group('flight_level')) if ac_match.group('flight_level') else None,
'signal_quality': float(ac_match.group('signal_quality')) if ac_match.group('signal_quality') else None,
'error_count': int(ac_match.group('errors')) if ac_match.group('errors') else None,
'frequency_offset': float(ac_match.group('frequency_offset')) if ac_match.group('frequency_offset') else None,
'gps_status': ac_match.group('gps_accuracy') if ac_match.group('gps_accuracy') else None,
'software_version': float(ac_match.group('flarm_software_version')) if ac_match.group('flarm_software_version') else None,
'hardware_version': int(ac_match.group('flarm_hardware_version'), 16) if ac_match.group('flarm_hardware_version') else None,
'real_address': ac_match.group('flarm_id') if ac_match.group('flarm_id') else None,
'signal_power': float(ac_match.group('signal_power')) if ac_match.group('signal_power') else None,
'proximity': [hear[4:] for hear in ac_match.group('proximity').split(" ")] if ac_match.group('proximity') else None}
else:
return None
def parse_receiver_beacon(aprs_comment):
rec_match = re.search(PATTERN_RECEIVER_BEACON, aprs_comment)
if rec_match:
return {'version': rec_match.group('version'),
'platform': rec_match.group('platform'),
'cpu_load': float(rec_match.group('cpu_load')),
'free_ram': float(rec_match.group('ram_free')),
'total_ram': float(rec_match.group('ram_total')),
'ntp_error': float(rec_match.group('ntp_offset')),
'rt_crystal_correction': float(rec_match.group('ntp_correction')),
'voltage': float(rec_match.group('voltage')) if rec_match.group('voltage') else None,
'amperage': float(rec_match.group('amperage')) if rec_match.group('amperage') else None,
'cpu_temp': float(rec_match.group('cpu_temperature')) if rec_match.group('cpu_temperature') else None,
'senders_visible': int(rec_match.group('visible_senders')) if rec_match.group('visible_senders') else None,
'senders_total': int(rec_match.group('senders')) if rec_match.group('senders') else None,
'rec_crystal_correction': int(rec_match.group('rf_correction_manual')) if rec_match.group('rf_correction_manual') else None,
'rec_crystal_correction_fine': float(rec_match.group('rf_correction_automatic')) if rec_match.group('rf_correction_automatic') else None,
'rec_input_noise': float(rec_match.group('signal_quality')) if rec_match.group('signal_quality') else None,
'senders_signal': float(rec_match.group('senders_signal_quality')) if rec_match.group('senders_signal_quality') else None,
'senders_messages': float(rec_match.group('senders_messages')) if rec_match.group('senders_messages') else None,
'good_senders_signal': float(rec_match.group('good_senders_signal_quality')) if rec_match.group('good_senders_signal_quality') else None,
'good_senders': float(rec_match.group('good_senders')) if rec_match.group('good_senders') else None,
'good_and_bad_senders': float(rec_match.group('good_and_bad_senders')) if rec_match.group('good_and_bad_senders') else None}
else:
return None

Wyświetl plik

@ -1,35 +0,0 @@
import re
from ogn.parser.pattern import PATTERN_RECEIVER_POSITION, PATTERN_RECEIVER_STATUS
def parse_position(aprs_comment):
if aprs_comment is None:
return {}
else:
match = re.search(PATTERN_RECEIVER_POSITION, aprs_comment)
return {'user_comment': match.group('user_comment') if match.group('user_comment') else None}
def parse_status(aprs_comment):
match = re.search(PATTERN_RECEIVER_STATUS, aprs_comment)
return {'version': match.group('version'),
'platform': match.group('platform'),
'cpu_load': float(match.group('cpu_load')),
'free_ram': float(match.group('ram_free')),
'total_ram': float(match.group('ram_total')),
'ntp_error': float(match.group('ntp_offset')),
'rt_crystal_correction': float(match.group('ntp_correction')),
'voltage': float(match.group('voltage')) if match.group('voltage') else None,
'amperage': float(match.group('amperage')) if match.group('amperage') else None,
'cpu_temp': float(match.group('cpu_temperature')) if match.group('cpu_temperature') else None,
'senders_visible': int(match.group('visible_senders')) if match.group('visible_senders') else None,
'senders_total': int(match.group('senders')) if match.group('senders') else None,
'rec_crystal_correction': int(match.group('rf_correction_manual')) if match.group('rf_correction_manual') else None,
'rec_crystal_correction_fine': float(match.group('rf_correction_automatic')) if match.group('rf_correction_automatic') else None,
'rec_input_noise': float(match.group('signal_quality')) if match.group('signal_quality') else None,
'senders_signal': float(match.group('senders_signal_quality')) if match.group('senders_signal_quality') else None,
'senders_messages': float(match.group('senders_messages')) if match.group('senders_messages') else None,
'good_senders_signal': float(match.group('good_senders_signal_quality')) if match.group('good_senders_signal_quality') else None,
'good_senders': float(match.group('good_senders')) if match.group('good_senders') else None,
'good_and_bad_senders': float(match.group('good_and_bad_senders')) if match.group('good_and_bad_senders') else None}

Wyświetl plik

@ -1,2 +0,0 @@
def parse(aprs_comment):
raise NotImplementedError("Skylines beacon parser not yet implemented")

Wyświetl plik

@ -1,2 +0,0 @@
def parse(aprs_comment):
raise NotImplementedError("Spider beacon parser not yet implemented")

Wyświetl plik

@ -1,2 +0,0 @@
def parse(aprs_comment):
raise NotImplementedError("SPOT beacon parser not yet implemented")

Wyświetl plik

@ -1,37 +0,0 @@
import re
from ogn.parser.utils import fpm2ms
from ogn.parser.pattern import PATTERN_TRACKER_BEACON_POSITION, PATTERN_TRACKER_BEACON_STATUS
def parse_position(aprs_comment):
match = re.search(PATTERN_TRACKER_BEACON_POSITION, aprs_comment)
return {'address_type': int(match.group('details'), 16) & 0b00000011,
'aircraft_type': (int(match.group('details'), 16) & 0b01111100) >> 2,
'stealth': (int(match.group('details'), 16) & 0b10000000) >> 7 == 1,
'address': match.group('id'),
'climb_rate': int(match.group('climb_rate')) * fpm2ms if match.group('climb_rate') else None,
'turn_rate': float(match.group('turn_rate')) if match.group('turn_rate') else None,
'flightlevel': float(match.group('flight_level')) if match.group('flight_level') else None,
'signal_quality': float(match.group('signal_quality')) if match.group('signal_quality') else None,
'error_count': int(match.group('errors')) if match.group('errors') else None,
'frequency_offset': float(match.group('frequency_offset')) if match.group('frequency_offset') else None,
'gps_status': match.group('gps_accuracy') if match.group('gps_accuracy') else None,
'software_version': float(match.group('flarm_software_version')) if match.group('flarm_software_version') else None,
'hardware_version': int(match.group('flarm_hardware_version'), 16) if match.group('flarm_hardware_version') else None}
def parse_status(aprs_comment):
match = re.search(PATTERN_TRACKER_BEACON_STATUS, aprs_comment)
return {'hardware_version': int(match.group('hardware_version')) if match.group('hardware_version') else None,
'software_version': int(match.group('software_version')) if match.group('software_version') else None,
'gps_satellites': int(match.group('gps_satellites')) if match.group('gps_satellites') else None,
'gps_quality': int(match.group('gps_quality')) if match.group('gps_quality') else None,
'gps_altitude': int(match.group('gps_altitude')) if match.group('gps_altitude') else None,
'pressure': float(match.group('pressure')) if match.group('pressure') else None,
'temperature': float(match.group('temperature')) if match.group('temperature') else None,
'humidity': int(match.group('humidity')) if match.group('humidity') else None,
'voltage': float(match.group('voltage')) if match.group('voltage') else None,
'transmitter_power': int(match.group('transmitter_power')) if match.group('transmitter_power') else None,
'noise_level': float(match.group('noise_level')) if match.group('noise_level') else None,
'relays': int(match.group('relays')) if match.group('relays') else None}

Wyświetl plik

@ -0,0 +1,18 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.aprs_comment.lt24_parser import LT24Parser
class TestStringMethods(unittest.TestCase):
@unittest.skip("Not yet implemented")
def test(self):
message = LT24Parser.parse_position("id25387 +000fpm GPS")
self.assertEqual(message['id'], 25387)
self.assertAlmostEqual(message['climb_rate'] * ms2fpm, 0, 2)
self.assertEqual(message['wtf'], 'GPS')
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,12 +1,12 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse_naviter import parse
from ogn.parser.aprs_comment.naviter_parser import NaviterParser
class TestStringMethods(unittest.TestCase):
def test_OGNAVI_1(self):
message = parse("id0440042121 +123fpm +0.5rot")
message = NaviterParser.parse_position("id0440042121 +123fpm +0.5rot")
# id0440042121 == 0b0000 0100 0100 0000 0000 0100 0010 0001 0010 0001
# bit 0: stealth mode

Wyświetl plik

@ -1,15 +1,15 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse import parse_aircraft_beacon
from ogn.parser.aprs_comment.ogn_parser import OgnParser
class TestStringMethods(unittest.TestCase):
def test_invalid_token(self):
self.assertEqual(parse_aircraft_beacon("notAValidToken"), None)
self.assertEqual(OgnParser.parse_aircraft_beacon("notAValidToken"), None)
def test_basic(self):
message = parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
message = OgnParser.parse_aircraft_beacon("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertEqual(message['address_type'], 2)
self.assertEqual(message['aircraft_type'], 2)
@ -27,33 +27,33 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['proximity'][2], 'B598')
def test_stealth(self):
message = parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
message = OgnParser.parse_aircraft_beacon("id0ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertFalse(message['stealth'])
message = parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
message = OgnParser.parse_aircraft_beacon("id8ADD1234 -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertTrue(message['stealth'])
def test_v024(self):
message = parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56")
message = OgnParser.parse_aircraft_beacon("id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h0A rDF0C56")
self.assertEqual(message['software_version'], 6.02)
self.assertEqual(message['hardware_version'], 10)
self.assertEqual(message['real_address'], "DF0C56")
def test_v024_ogn_tracker(self):
message = parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
message = OgnParser.parse_aircraft_beacon("id07353800 +020fpm -14.0rot FL004.43 38.5dB 0e -2.9kHz")
self.assertEqual(message['flightlevel'], 4.43)
def test_v025(self):
message = parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm")
message = OgnParser.parse_aircraft_beacon("id06DDE28D +535fpm +3.8rot 11.5dB 0e -1.0kHz gps2x3 s6.01 h0C +7.4dBm")
self.assertEqual(message['signal_power'], 7.4)
def test_v026(self):
# from 0.2.6 it is sufficent we have only the ID, climb and turn rate or just the ID
message_triple = parse_aircraft_beacon("id093D0930 +000fpm +0.0rot")
message_single = parse_aircraft_beacon("id093D0930")
message_triple = OgnParser.parse_aircraft_beacon("id093D0930 +000fpm +0.0rot")
message_single = OgnParser.parse_aircraft_beacon("id093D0930")
self.assertIsNotNone(message_triple)
self.assertIsNotNone(message_single)

Wyświetl plik

@ -1,14 +1,14 @@
import unittest
from ogn.parser.parse import parse_receiver_beacon
from ogn.parser.aprs_comment.ogn_parser import OgnParser
class TestStringMethods(unittest.TestCase):
def test_fail_validation(self):
self.assertEqual(parse_receiver_beacon("notAValidToken"), None)
self.assertEqual(OgnParser.parse_receiver_beacon("notAValidToken"), None)
def test_v021(self):
message = parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB")
message = OgnParser.parse_receiver_beacon("v0.2.1 CPU:0.8 RAM:25.6/458.9MB NTP:0.1ms/+2.3ppm +51.9C RF:+26-1.4ppm/-0.25dB")
self.assertEqual(message['version'], "0.2.1")
self.assertEqual(message['cpu_load'], 0.8)
@ -23,17 +23,17 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['rec_input_noise'], -0.25)
def test_v022(self):
message = parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
message = OgnParser.parse_receiver_beacon("v0.2.2.x86 CPU:0.5 RAM:669.9/887.7MB NTP:1.0ms/+6.2ppm +52.0C RF:+0.06dB")
self.assertEqual(message['platform'], 'x86')
def test_v025(self):
message = parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]")
message = OgnParser.parse_receiver_beacon("v0.2.5.RPI-GPU CPU:0.8 RAM:287.3/458.7MB NTP:1.0ms/-6.4ppm 5.016V 0.534A +51.9C RF:+55+0.4ppm/-0.67dB/+10.8dB@10km[57282]")
self.assertEqual(message['voltage'], 5.016)
self.assertEqual(message['amperage'], 0.534)
self.assertEqual(message['senders_signal'], 10.8)
self.assertEqual(message['senders_messages'], 57282)
message = parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]")
message = OgnParser.parse_receiver_beacon("v0.2.5.ARM CPU:0.4 RAM:638.0/970.5MB NTP:0.2ms/-1.1ppm +65.5C 14/16Acfts[1h] RF:+45+0.0ppm/+3.88dB/+24.0dB@10km[143717]/+26.7dB@10km[68/135]")
self.assertEqual(message['senders_visible'], 14)
self.assertEqual(message['senders_total'], 16)
self.assertEqual(message['senders_signal'], 24.0)

Wyświetl plik

@ -1,21 +1,21 @@
import unittest
from ogn.parser.parse_receiver import parse_position, parse_status
from ogn.parser.aprs_comment.receiver_parser import ReceiverParser
class TestStringMethods(unittest.TestCase):
def test_position(self):
message = parse_position("Antenna: chinese, on a pylon, 20 meter above ground")
message = ReceiverParser.parse_position("Antenna: chinese, on a pylon, 20 meter above ground")
self.assertEqual(message['user_comment'], "Antenna: chinese, on a pylon, 20 meter above ground")
def test_position_empty(self):
message = parse_position("")
message = ReceiverParser.parse_position("")
self.assertIsNotNone(message)
def test_status(self):
message = parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]")
message = ReceiverParser.parse_status("v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]")
self.assertEqual(message['version'], "0.2.7")
self.assertEqual(message['platform'], 'RPI-GPU')

Wyświetl plik

@ -0,0 +1,17 @@
import unittest
from ogn.parser.aprs_comment.spot_parser import SpotParser
class TestStringMethods(unittest.TestCase):
@unittest.skip("Not yet implemented")
def test(self):
message = SpotParser.parse_position("id0-2860357 SPOT3 GOOD")
self.assertEqual(message['id'], "0-2860357")
self.assertEqual(message['hw_version'], 3)
self.assertEqual(message['wtf'], "GOOD")
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -1,12 +1,12 @@
import unittest
from ogn.parser.utils import ms2fpm
from ogn.parser.parse_tracker import parse_position, parse_status
from ogn.parser.aprs_comment.tracker_parser import TrackerParser
class TestStringMethods(unittest.TestCase):
def test_position_beacon(self):
message = parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5")
message = TrackerParser.parse_position("id072FD00F -058fpm +1.1rot FL003.12 32.8dB 0e -0.8kHz gps3x5")
self.assertEqual(message['address_type'], 3)
self.assertEqual(message['aircraft_type'], 1)
@ -21,7 +21,7 @@ class TestStringMethods(unittest.TestCase):
self.assertEqual(message['gps_status'], '3x5')
def test_status(self):
message = parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min")
message = TrackerParser.parse_status("h00 v00 9sat/1 164m 1002.6hPa +20.2degC 0% 3.34V 14/-110.5dBm 1/min")
self.assertEqual(message['hardware_version'], 0)
self.assertEqual(message['software_version'], 0)