Initial release

pull/1/head
Konstantin Gründger 2015-10-24 23:13:21 +02:00
commit 2f456a0eef
28 zmienionych plików z 834 dodań i 0 usunięć

50
.gitignore vendored 100644
Wyświetl plik

@ -0,0 +1,50 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
bin/
build/
develop-eggs/
dist/
eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
.tox/
.coverage
.cache
nosetests.xml
coverage.xml
# Translations
*.mo
# Mr Developer
.mr.developer.cfg
.project
.pydevproject
# Rope
.ropeproject
# Django stuff:
*.log
*.pot
# Sphinx documentation
docs/_build/

10
.travis.yml 100644
Wyświetl plik

@ -0,0 +1,10 @@
language: python
python:
- 3.4
script:
- nosetests --with-coverage --cover-package=ogn
after_success:
- coveralls

6
README.md 100644
Wyświetl plik

@ -0,0 +1,6 @@
# ogn
[![Build Status](https://travis-ci.org/Meisterschueler/ogn.svg?branch=master)]
(https://travis-ci.org/Meisterschueler/ogn)
[![Coverage Status](https://img.shields.io/coveralls/Meisterschueler/ogn.svg)]
(https://coveralls.io/r/Meisterschueler/ogn)

1
ogn/__init__.py 100644
Wyświetl plik

@ -0,0 +1 @@

32
ogn/aprs_parser.py 100644
Wyświetl plik

@ -0,0 +1,32 @@
from ogn.model.beacon import Beacon
from ogn.model.position import Position
from ogn.model.receiver import Receiver
def parse_aprs(text):
if not isinstance(text, str):
raise Exception("Unknown type: %s" % type(text))
elif text == "":
raise Exception("String is empty")
elif text[0] == "#":
return None
beacon = Beacon()
beacon.parse(text)
# symboltable / symbolcodes used by OGN:
# I&: used as receiver
# /X: helicopter_rotorcraft
# /': glider_or_motorglider
# \^: powered_aircraft
# /g: para_glider
# /O: ?
# /^: ?
# \n: ?
# /z: ?
# /o: ?
if beacon.symboltable == "I" and beacon.symbolcode == "&":
return Receiver(beacon)
else:
return Position(beacon)

36
ogn/aprs_utils.py 100644
Wyświetl plik

@ -0,0 +1,36 @@
from datetime import *
import math
kmh2kts = 0.539957
feet2m = 0.3048
ms2fpm = 196.85
kts2kmh = 1/kmh2kts
m2feet = 1/feet2m
fpm2ms = 1/ms2fpm
def dmsToDeg(dms):
absDms = abs(dms)
d = math.floor(absDms)
m = (absDms - d) * 100 / 60
return (d + m)
def createTimestamp(hhmmss, reference):
hh = int(hhmmss[0:2])
mm = int(hhmmss[2:4])
ss = int(hhmmss[4:6])
if reference is None:
reference = datetime.utcnow()
return datetime(reference.year, reference.month, reference.day, hh, mm, ss)
if (reference.hour == 23) & (hh == 0):
reference = reference + timedelta(days=1)
elif (reference.hour == 0) & (hh == 23):
reference = reference - timedelta(days=1)
elif (abs(reference.hour - hh) > 1):
raise Exception("Time difference is big. Reference time:%s - timestamp:%s" % (reference, hhmmss))
return datetime(reference.year, reference.month, reference.day, hh, mm, ss)

15
ogn/db.py 100644
Wyświetl plik

@ -0,0 +1,15 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from ogn.model.base import Base
# prepare db
#engine = create_engine('sqlite:///:memory:', echo=False)
#engine = create_engine('sqlite:///ogn.db', echo=False)
engine = create_engine('postgresql://postgres:albatross@localhost:5432/ogn')
Base().metadata.create_all(engine)
Session = sessionmaker(bind=engine)
session = Session()

52
ogn/db_utils.py 100644
Wyświetl plik

@ -0,0 +1,52 @@
from urllib.request import urlopen
from ogn.db import session
from ogn.model.address_origin import AddressOrigin
from ogn.model.flarm import *
def get_devices_from_ddb():
session.query(Flarm.address_origin == AddressOrigin.ogn_ddb).delete()
response = urlopen("http://ddb.glidernet.org/download")
lines = response.readlines()
for line in lines:
if (line.decode()[0] == "#"):
continue
flarm = Flarm()
flarm.parse_ogn(line.decode())
session.add(flarm)
session.commit()
def get_devices_from_flarmnet():
session.query(Flarm.address_origin == AddressOrigin.flarmnet).delete()
response = urlopen("http://flarmnet.org/files/data.fln")
lines = response.readlines()
for line in lines:
if (len(line) != FLARMNET_LINE_LENGTH):
continue
flarm = Flarm()
flarm.parse_flarmnet(line.decode())
session.add(flarm)
session.commit()
def put_position_into_db(position):
session.add(position)
session.commit()
def put_receiver_into_db(receiver):
session.add(receiver)
session.commit()
if __name__ == '__main__':
get_devices_from_ddb()
get_devices_from_flarmnet()

Wyświetl plik

@ -0,0 +1 @@

Wyświetl plik

@ -0,0 +1,4 @@
class AddressOrigin:
ogn_ddb = 1
flarmnet = 2
userdefined = 3

Wyświetl plik

@ -0,0 +1,16 @@
class AircraftType:
unknown = 0
glider_or_motor_glider = 1
tow_tug_plane = 2
helicopter_rotorcraft = 3
parachute = 4
drop_plane = 5
hang_glider = 6
para_glider = 7
powered_aircraft = 8
jet_aircraft = 9
flying_saucer = 10
balloon = 11
airship = 12
unmanned_aerial_vehicle = 13
static_object = 15

Wyświetl plik

@ -0,0 +1,4 @@
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()

Wyświetl plik

@ -0,0 +1,63 @@
from datetime import *
import re
from sqlalchemy import Column, String, Integer, Float, DateTime
from sqlalchemy.ext.declarative import AbstractConcreteBase
from ogn.aprs_utils import *
from ogn.model.base import Base
# "original" pattern from OGN: "(.+?)>APRS,.+,(.+?):/(\\d{6})+h(\\d{4}\\.\\d{2})(N|S).(\\d{5}\\.\\d{2})(E|W).((\\d{3})/(\\d{3}))?/A=(\\d{6}).*?"
PATTERN_APRS = r"^(.+?)>APRS,.+,(.+?):/(\d{6})+h(\d{4}\.\d{2})(N|S)(.)(\d{5}\.\d{2})(E|W)(.)((\d{3})/(\d{3}))?/A=(\d{6})\s(.*)$"
prog = re.compile(PATTERN_APRS)
class Beacon(AbstractConcreteBase, Base):
id = Column(Integer, primary_key=True)
# APRS data
name = Column(String)
receiver_name = Column(String(9))
timestamp = Column(DateTime, index=True)
latitude = Column(Float)
symboltable = None
longitude = Column(Float)
symbolcode = None
ground_speed = Column(Float)
track = Column(Integer)
altitude = Column(Integer)
comment = None
def parse(self, text, reference_time=None):
result = prog.match(text)
if result is None:
raise Exception("String is not valid" % text)
self.name = result.group(1)
self.receiver_name = result.group(2)
self.timestamp = createTimestamp(result.group(3), reference_time)
self.latitude = dmsToDeg(float(result.group(4)) / 100)
if result.group(5) == "S":
self.latitude = -self.latitude
self.symboltable = result.group(6)
self.longitude = dmsToDeg(float(result.group(7)) / 100)
if result.group(8) == "W":
self.longitude = -self.longitude
self.symbolcode = result.group(9)
if result.group(10) is not None:
self.ground_speed = int(result.group(11))*kts2kmh
self.track = int(result.group(12))
else:
self.speed = 0
self.track = 0
self.altitude = int(result.group(13))*feet2m
self.comment = result.group(14)

92
ogn/model/flarm.py 100644
Wyświetl plik

@ -0,0 +1,92 @@
import re
from sqlalchemy import Column, Integer, String, Unicode, Boolean, SmallInteger
from ogn.model.address_origin import *
from ogn.model.base import Base
FLARMNET_LINE_LENGTH = 173
class Flarm(Base):
__tablename__ = 'flarm'
id = Column(Integer, primary_key=True)
address_type = None
address = Column(String(6), index=True)
name = Column(Unicode)
airport = Column(String)
aircraft = Column(String)
registration = Column(String(7), index=True)
competition = Column(String(3))
frequency = Column(String)
tracked = Column(Boolean)
identified = Column(Boolean)
address_origin = Column(SmallInteger)
def parse_ogn(self, line):
PATTERN = "\'([FIO])\',\'(.{6})\',\'([^\']+)?\',\'([^\']+)?\',\'([^\']+)?\',\'([YN])\',\'([YN])\'"
ogn_re = re.compile(PATTERN)
result = ogn_re.match(line)
if result is None:
raise Exception("No valid string: %s" % line)
self.address_type = result.group(1)
self.address = result.group(2)
self.aircraft = result.group(3)
self.registration = result.group(4)
self.competition = result.group(5)
self.tracked = result.group(6) == "Y"
self.identified = result.group(7) == "Y"
self.address_origin = AddressOrigin.ogn_ddb
def parse_flarmnet(self, line):
rawString = self.hexToString(line)
self.address_type = None
self.address = rawString[0:6].strip()
self.name = rawString[6:27].strip()
self.airport = rawString[27:48].strip()
self.aircraft = rawString[48:69].strip()
self.registration = rawString[69:76].strip()
self.competition = rawString[76:79].strip()
self.frequency = rawString[79:89].strip()
self.address_origin = AddressOrigin.flarmnet
def hexToString(self, hexString):
result = ''
for i in range(0, len(hexString)-1, 2):
result += chr(int(hexString[i:i+2], 16))
return(result)
def __repr__(self):
return("<Flarm: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % (self.address_type, self.address, self.name, self.airport, self.aircraft, self.registration, self.competition, self.frequency, self.tracked, self.identified))
if __name__ == '__main__':
import urllib.request
response = urllib.request.urlopen("http://ddb.glidernet.org/download")
lines = response.readlines()
for line in lines:
if (line.decode()[0] == "#"):
continue
flarm = Flarm()
flarm.parse_ogn(line.decode())
print(str(flarm))
response = urllib.request.urlopen("http://flarmnet.org/files/data.fln")
lines = response.readlines()
for line in lines:
if (len(line) != FLARMNET_LINE_LENGTH):
continue
flarm = Flarm()
flarm.parse_flarmnet(line.decode())
print(str(flarm))

Wyświetl plik

@ -0,0 +1,98 @@
import re
from sqlalchemy import Column, String, Integer, Float, Boolean, SmallInteger
from ogn.aprs_utils import *
from ogn.model.beacon import Beacon
class Position(Beacon):
__tablename__ = "position"
# Flarm specific data
address_type = Column(SmallInteger)
aircraft_type = Column(SmallInteger)
stealth = Column(Boolean)
address = Column(String, index=True)
climb_rate = Column(Float)
turn_rate = Column(Float)
signal_strength = Column(Float)
error_count = Column(Integer)
frequency_offset = Column(Float)
gps_status = Column(String)
# Pattern
address_pattern = re.compile(r"id(\S{2})(\S{6})")
climb_rate_pattern = re.compile(r"([\+\-]\d+)fpm")
turn_rate_pattern = re.compile(r"([\+\-]\d+\.\d+)rot")
signal_strength_pattern = re.compile(r"(\d+\.\d+)dB")
error_count_pattern = re.compile(r"(\d+)e")
coordinates_extension_pattern = re.compile(r"\!W(.)(.)!")
hear_ID_pattern = re.compile(r"hear(\w{4})")
frequency_offset_pattern = re.compile(r"([\+\-]\d+\.\d+)kHz")
gps_status_pattern = re.compile(r"gps(\d+x\d+)")
def __init__(self, beacon=None):
self.heared_aircraft_IDs = list()
if beacon is not None:
self.name = beacon.name
self.receiver_name = beacon.receiver_name
self.timestamp = beacon.timestamp
self.latitude = beacon.latitude
self.longitude = beacon.longitude
self.ground_speed = beacon.ground_speed
self.track = beacon.track
self.altitude = beacon.altitude
self.comment = beacon.comment
self.parse(beacon.comment)
def parse(self, text):
for part in text.split(' '):
address_match = self.address_pattern.match(part)
climb_rate_match = self.climb_rate_pattern.match(part)
turn_rate_match = self.turn_rate_pattern.match(part)
signal_strength_match = self.signal_strength_pattern.match(part)
error_count_match = self.error_count_pattern.match(part)
coordinates_extension_match = self.coordinates_extension_pattern.match(part)
hear_ID_match = self.hear_ID_pattern.match(part)
frequency_offset_match = self.frequency_offset_pattern.match(part)
gps_status_match = self.gps_status_pattern.match(part)
if address_match is not None:
# Flarm ID type byte in APRS msg: PTTT TTII
# P => stealth mode
# TTTTT => aircraftType
# II => IdType: 0=Random, 1=ICAO, 2=FLARM, 3=OGN
# (see https://groups.google.com/forum/#!msg/openglidernetwork/lMzl5ZsaCVs/YirmlnkaJOYJ).
self.address_type = int(address_match.group(1), 16) & 0b00000011
self.aircraft_type = (int(address_match.group(1), 16) & 0b01111100) >> 2
self.stealth = ((int(address_match.group(1), 16) & 0b10000000) >> 7 == 1)
self.address = address_match.group(2)
elif climb_rate_match is not None:
self.climb_rate = int(climb_rate_match.group(1))*fpm2ms
elif turn_rate_match is not None:
self.turn_rate = float(turn_rate_match.group(1))
elif signal_strength_match is not None:
self.signal_strength = float(signal_strength_match.group(1))
elif error_count_match is not None:
self.error_count = int(error_count_match.group(1))
elif coordinates_extension_match is not None:
dlat = int(coordinates_extension_match.group(1)) / 1000
dlon = int(coordinates_extension_match.group(2)) / 1000
self.latitude = self.latitude + dlat
self.longitude = self.longitude + dlon
elif hear_ID_match is not None:
self.heared_aircraft_IDs.append(hear_ID_match.group(1))
elif frequency_offset_match is not None:
self.frequency_offset = float(frequency_offset_match.group(1))
elif gps_status_match is not None:
self.gps_status = gps_status_match.group(1)
else:
raise Exception("No valid position description: %s" % part)
def __repr__(self):
#return("<Position %s: %s %s %s %s %s %s" % (self.name, self.latitude, self.longitude, self.altitude, self.ground_speed, self.track, self.climb_rate))
return("<Position %s: %s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s>" % (self.name, self.address_type, self.aircraft_type, self.timestamp, self.address_type, self.aircraft_type, self.stealth, self.address, self.climb_rate, self.turn_rate, self.signal_strength, self.error_count, self.frequency_offset, self.gps_status))

Wyświetl plik

@ -0,0 +1,86 @@
import re
from sqlalchemy import Column, String, Integer, Float
from ogn.model.beacon import Beacon
class Receiver(Beacon):
__tablename__ = "receiver"
# Receiver specific data
version = Column(String)
cpu_load = 0
cpu_temp = 0
free_ram = 0
total_ram = 0
ntp_error = 0
rt_crystal_correction = 0
rec_crystal_correction = 0
rec_crystal_correction_fine = 0
rec_input_noise = 0
# Pattern
version_pattern = re.compile(r"v(\d+\.\d+\.\d+)")
cpu_pattern = re.compile(r"CPU:(\d+\.\d+)")
cpu_temp_pattern = re.compile(r"([\+\-]\d+\.\d+)C")
ram_pattern = re.compile(r"RAM:(\d+\.\d+)/(\d+\.\d+)MB")
ntp_pattern = re.compile(r"NTP:(\d+\.\d+)ms/([\+\-]\d+\.\d+)ppm")
rf_pattern_full = re.compile(r"RF:([\+\-]\d+)([\+\-]\d+\.\d+)ppm/([\+\-]\d+\.\d+)dB")
rf_pattern_light1 = re.compile(r"RF:([\+\-]\d+\.\d+)dB")
rf_pattern_light2 = re.compile(r"RF:([\+\-]\d+)([\+\-]\d+\.\d+)ppm")
def __init__(self, beacon=None):
if beacon is not None:
self.name = beacon.name
self.receiver_name = beacon.receiver_name
self.timestamp = beacon.timestamp
self.latitude = beacon.latitude
self.longitude = beacon.longitude
self.ground_speed = beacon.ground_speed
self.track = beacon.track
self.altitude = beacon.altitude
self.comment = beacon.comment
self.parse(beacon.comment)
def parse(self, text):
for part in text.split(' '):
version_match = self.version_pattern.match(part)
cpu_match = self.cpu_pattern.match(part)
cpu_temp_match = self.cpu_temp_pattern.match(part)
ram_match = self.ram_pattern.match(part)
ntp_match = self.ntp_pattern.match(part)
rf_full_match = self.rf_pattern_full.match(part)
rf_light1_match = self.rf_pattern_light1.match(part)
rf_light2_match = self.rf_pattern_light2.match(part)
if version_match is not None:
self.version = version_match.group(1)
elif cpu_match is not None:
self.cpu_load = float(cpu_match.group(1))
elif cpu_temp_match is not None:
self.cpu_temp = float(cpu_temp_match.group(1))
elif ram_match is not None:
self.free_ram = float(ram_match.group(1))
self.total_ram = float(ram_match.group(2))
elif ntp_match is not None:
self.ntp_error = float(ntp_match.group(1))
self.rt_crystal_correction = float(ntp_match.group(2))
elif rf_full_match is not None:
self.rec_crystal_correction = int(rf_full_match.group(1))
self.rec_crystal_correction_fine = float(rf_full_match.group(2))
self.rec_input_noise = float(rf_full_match.group(3))
elif rf_light1_match is not None:
self.rec_input_noise = float(rf_light1_match.group(1))
elif rf_light2_match is not None:
self.rec_crystal_correction = int(rf_light2_match.group(1))
self.rec_crystal_correction_fine = float(rf_light2_match.group(2))
else:
raise Exception("No valid receiver description: %s" % part)
def __repr__(self):
return("<Receiver %s: %s>" % (self.name, self.version))

0
ogn/ogn.db 100644
Wyświetl plik

67
ogn/ognproceed.py 100644
Wyświetl plik

@ -0,0 +1,67 @@
import socket
from time import time
from ogn import db_utils
from ogn import settings
from ogn.aprs_parser import *
def proceed():
# create socket, connect to server, login and make a file object associated with the socket
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
sock.connect((settings.APRS_SERVER_HOST, settings.APRS_SERVER_PORT))
login = 'user %s pass %s vers PyGrabber 1.0 %s\n' % (settings.APRS_USER, settings.APRS_PASSCODE, settings.APRS_FILTER)
sock.send(login.encode())
sock_file = sock.makefile('rw')
keepalive_time = time()
try:
while True:
if time()-keepalive_time > 60:
sock.send("#keepalive".encode())
keepalive_time = time()
# Read packet string from socket
try:
packet_str = sock_file.readline().strip()
except socket.error:
print('Socket error on readline')
continue
# A zero length line should not be return if keepalives are being sent
# A zero length line will only be returned after ~30m if keepalives are not sent
if len(packet_str) == 0:
print('Read returns zero length string. Failure. Orderly closeout')
break
proceed_line(packet_str)
finally:
# close everything
print('Close socket')
sock.shutdown(0)
sock.close()
def proceed_line(line):
try:
result = parse_aprs(line)
except Exception as e:
print('Failed to parse line: %s' % line)
print('Reason: %s' % e)
return
if isinstance(result, Position):
db_utils.put_position_into_db(result)
elif isinstance(result, Receiver):
db_utils.put_receiver_into_db(result)
if __name__ == '__main__':
while True:
try:
print("Start Python_Test")
proceed()
print("Python Test Exit")
except OSError as e:
print("OSError %s" % e)

6
ogn/settings.py 100644
Wyświetl plik

@ -0,0 +1,6 @@
APRS_SERVER_HOST = 'aprs.glidernet.org'
APRS_SERVER_PORT = 14580
APRS_USER = 'PyGrabber'
APRS_PASSCODE = -1 # Read only
APRS_FILTER = "filter r/+50.0000/+10.0000/5000"

3
requirements.txt 100644
Wyświetl plik

@ -0,0 +1,3 @@
SQLAlchemy==1.0.8
nose==1.3.7
coveralls==0.4.4

Wyświetl plik

@ -0,0 +1 @@

Wyświetl plik

@ -0,0 +1 @@

Wyświetl plik

@ -0,0 +1,25 @@
import unittest
from ogn.aprs_utils import *
from ogn.model.beacon import Beacon
class TestStringMethods(unittest.TestCase):
def test_basic(self):
beacon = Beacon()
beacon.parse("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 this is a comment")
self.assertEqual(beacon.name, "FLRDDA5BA")
self.assertEqual(beacon.receiver_name, "LFMX")
self.assertEqual(beacon.timestamp.strftime('%H:%M:%S'), "16:08:29")
self.assertAlmostEqual(beacon.latitude, dmsToDeg(44.1541), 5)
self.assertEqual(beacon.symboltable, '/')
self.assertAlmostEqual(beacon.longitude, dmsToDeg(6.0003), 5)
self.assertEqual(beacon.symbolcode, '\'')
self.assertEqual(beacon.ground_speed, 342*kts2kmh)
self.assertAlmostEqual(beacon.altitude*m2feet, 5524, 5)
self.assertEqual(beacon.comment, "this is a comment")
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -0,0 +1,38 @@
import unittest
from ogn.model.address_origin import *
from ogn.model.flarm import Flarm
class TestStringMethods(unittest.TestCase):
def test_ddb(self):
flarm = Flarm()
flarm.parse_ogn("'F','DD9703','Twin Astir II','D-8203','7G','Y','N'\r\n")
self.assertEqual(flarm.address_type, 'F')
self.assertEqual(flarm.address, 'DD9703')
self.assertEqual(flarm.aircraft, 'Twin Astir II')
self.assertEqual(flarm.registration, 'D-8203')
self.assertEqual(flarm.competition, '7G')
self.assertTrue(flarm.tracked)
self.assertFalse(flarm.identified)
self.assertEqual(flarm.address_origin, AddressOrigin.ogn_ddb)
def test_flarmnet(self):
flarm = Flarm()
flarm.parse_flarmnet('4446304242325265696e686f6c64204dfc6c6c65722020202020204c535a46202020202020202020202020202020202056656e747573203263784d2020202020202020202048422d323532375836203132332e303030')
self.assertEqual(flarm.address, 'DF0BB2')
self.assertEqual(flarm.name, 'Reinhold Müller')
self.assertEqual(flarm.airport, 'LSZF')
self.assertEqual(flarm.aircraft, 'Ventus 2cxM')
self.assertEqual(flarm.registration, 'HB-2527')
self.assertEqual(flarm.competition, 'X6')
self.assertEqual(flarm.frequency, '123.000')
self.assertEqual(flarm.address_origin, AddressOrigin.flarmnet)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -0,0 +1,51 @@
import unittest
from ogn.aprs_utils import *
from ogn.model.beacon import Beacon
from ogn.model.position import Position
class TestStringMethods(unittest.TestCase):
def test_basic(self):
position = Position()
position.parse("id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5 hear1084 hearB597 hearB598")
self.assertFalse(position.stealth)
self.assertEqual(position.address, "DDA5BA")
self.assertAlmostEqual(position.climb_rate*ms2fpm, -454, 2)
self.assertEqual(position.turn_rate, -1.1)
self.assertEqual(position.signal_strength, 8.8)
self.assertEqual(position.error_count, 0)
self.assertEqual(position.frequency_offset, 51.2)
self.assertEqual(position.gps_status, '4x5')
self.assertEqual(len(position.heared_aircraft_IDs), 3)
self.assertEqual(position.heared_aircraft_IDs[0], '1084')
self.assertEqual(position.heared_aircraft_IDs[1], 'B597')
self.assertEqual(position.heared_aircraft_IDs[2], 'B598')
def test_stealth(self):
position = Position()
position.parse("id0ADD1234")
self.assertFalse(position.stealth)
position.parse("id8ADD1234")
self.assertTrue(position.stealth)
@unittest.skip("v0.2.4 not implemented yet")
def test_ver024(self):
position = Position()
position.parse("!W26! id21400EA9 -2454fpm +0.9rot 19.5dB 0e -6.6kHz gps1x1 s6.02 h44 rDF0C56")
def test_copy_constructor(self):
beacon = Beacon()
beacon.parse("FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5")
position = Position(beacon)
self.assertEqual(position.name, 'FLRDDA5BA')
self.assertEqual(position.address, 'DDA5BA')
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -0,0 +1,21 @@
import unittest
from ogn.model.receiver import Receiver
class TestStringMethods(unittest.TestCase):
def test_basic(self):
receiver = Receiver()
receiver.parse("v0.2.2 CPU:0.8 RAM:695.7/4025.5MB NTP:16000.0ms/+0.0ppm +63.0C")
self.assertEqual(receiver.version, '0.2.2')
self.assertEqual(receiver.cpu_load, 0.8)
self.assertEqual(receiver.cpu_temp, 63.0)
self.assertEqual(receiver.free_ram, 695.7)
self.assertEqual(receiver.total_ram, 4025.5)
self.assertEqual(receiver.ntp_error, 16000.0)
self.assertEqual(receiver.rec_crystal_correction, 0.0)
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -0,0 +1,34 @@
import unittest
from ogn.aprs_parser import parse_aprs
class TestStringMethods(unittest.TestCase):
def test_server(self):
parse_aprs("# aprsc 2.0.14-g28c5a6a 10 Apr 2015 18:58:47 GMT GLIDERN1 37.187.40.234:14580")
def test_beacons(self):
lines = list()
lines.append( "FLRDDA5BA>APRS,qAS,LFMX:/160829h4415.41N/00600.03E'342/049/A=005524 id0ADDA5BA -454fpm -1.1rot 8.8dB 0e +51.2kHz gps4x5")
lines.append( "ICA4B0E3A>APRS,qAS,Letzi:/072319h4711.75N\\00802.59E^327/149/A=006498 id154B0E3A -3959fpm +0.5rot 9.0dB 0e -6.3kHz gps1x3")
lines.append( "Lachens>APRS,TCPIP*,qAC,GLIDERN2:/165334h4344.70NI00639.19E&/A=005435 v0.2.1 CPU:0.3 RAM:1764.4/2121.4MB NTP:2.8ms/+4.9ppm +47.0C RF:+0.70dB")
lines.append( "LFGU>APRS,TCPIP*,qAC,GLIDERN2:/190556h4907.63NI00706.41E&/A=000833 v0.2.0 CPU:0.9 RAM:281.3/458.9MB NTP:0.5ms/-19.1ppm +53.0C RF:+0.70dB")
lines.append( "FLRDDB091>APRS,qAS,Letzi:/195831h4740.04N/00806.01EX152/124/A=004881 id06DD8E80 +198fpm +0.0rot 6.5dB 13e +4.0kHz gps3x4")
lines.append( "LSGS>APRS,TCPIP*,qAC,GLIDERN1:/195345h4613.25NI00719.68E&/A=001581 CPU:0.7 RAM:247.9/456.4MB NTP:0.7ms/-11.4ppm +44.4C RF:+53+71.9ppm/+0.4dB")
lines.append( "FLRDDDD33>APRS,qAS,LFNF:/165341h4344.27N/00547.41E'/A=000886 id06DDDD33 +020fpm +0.0rot 20.8dB 0e -14.3kHz gps3x4")
lines.append( "FLRDDE026>APRS,qAS,LFNF:/165341h4358.58N/00553.89E'204/055/A=005048 id06DDE026 +257fpm +0.1rot 7.2dB 0e -0.8kHz gps4x7")
lines.append( "ICA484A9C>APRS,qAS,LFMX:/165341h4403.50N/00559.67E'/A=001460 id05484A9C +000fpm +0.0rot 18.0dB 0e +3.5kHz gps4x7")
lines.append( "WolvesSW>APRS,TCPIP*,qAC,GLIDERN2:/165343h5232.23NI00210.91W&/A=000377 CPU:1.5 RAM:159.9/458.7MB NTP:6.6ms/-36.7ppm +45.5C RF:+130-0.4ppm/-0.1dB")
lines.append( "Oxford>APRS,TCPIP*,qAC,GLIDERN1:/190533h5142.96NI00109.68W&/A=000380 v0.1.3 CPU:0.9 RAM:268.8/458.6MB NTP:0.5ms/-45.9ppm +60.5C RF:+55+2.9ppm/+1.54dB")
lines.append( "OGNE95A16>APRS,qAS,Sylwek:/203641h5001.94N/01956.91E'270/004/A=000000 id07E95A16 +000fpm +0.1rot 37.8dB 0e -0.4kHz")
lines.append( "Salland>APRS,TCPIP*,qAC,GLIDERN2:/201426h5227.93NI00620.03E&/A=000049 v0.2.2 CPU:0.7 RAM:659.3/916.9MB NTP:2.5ms/-75.0ppm RF:+0.41dB")
lines.append( "LSGS>APRS,TCPIP*,qAC,GLIDERN1:/195345h4613.25NI00719.68E&/A=001581 CPU:0.7 RAM:247.9/456.4MB NTP:0.7ms/-11.4ppm +44.4C RF:+53+71.9ppm/+0.4dB")
lines.append("Drenstein>APRS,TCPIP*,qAC,GLIDERN1:/203011h5147.51NI00744.45E&/A=000213 v0.2.2 CPU:0.8 RAM:695.7/4025.5MB NTP:16000.0ms/+0.0ppm +63.0C")
lines.append( "ZK-GSC>APRS,qAS,Omarama:/210202h4429.25S/16959.33E'/A=001407 id05C821EA +020fpm +0.0rot 16.8dB 0e -3.1kHz gps1x3 hear1084 hearB597 hearB598")
for line in lines:
result = parse_aprs(line)
print(str(result), "\n")
if __name__ == '__main__':
unittest.main()

Wyświetl plik

@ -0,0 +1,21 @@
from datetime import datetime
import unittest
from ogn.aprs_utils import createTimestamp
class TestStringMethods(unittest.TestCase):
def test_createTimestamp_seconds_behind(self):
timestamp = createTimestamp('235959', datetime(2015, 10, 16, 0, 0, 1))
self.assertEqual(timestamp, datetime(2015, 10, 15, 23, 59, 59))
def test_createTimestamp_seconds_before(self):
timestamp = createTimestamp('000001', datetime(2015, 10, 15, 23, 59, 59))
self.assertEqual(timestamp, datetime(2015, 10, 16, 0, 0, 1))
def test_createTimestamp_big_difference(self):
with self.assertRaises(Exception):
createTimestamp(datetime(2015, 10, 15, 23, 59, 59), '123456')
if __name__ == '__main__':
unittest.main()