kopia lustrzana https://github.com/glidernet/ogn-python
Bugfixes from RPi
rodzic
03cfd6136a
commit
52c9df4f5e
22
README.md
22
README.md
|
@ -53,13 +53,7 @@ It requires [PostgreSQL](http://www.postgresql.org/), [PostGIS](http://www.postg
|
|||
./flask database init
|
||||
```
|
||||
|
||||
8. Prepare tables for TimescaleDB
|
||||
|
||||
```
|
||||
./flask database init_timescaledb
|
||||
```
|
||||
|
||||
9. Optional: Import world border dataset (needed if you want to know the country a receiver belongs to, etc.)
|
||||
8. Optional: Import world border dataset (needed if you want to know the country a receiver belongs to, etc.)
|
||||
Get the [World Borders Dataset](http://thematicmapping.org/downloads/world_borders.php) and unpack it.
|
||||
Then import it into your database (we use "ogn" as database name).
|
||||
|
||||
|
@ -69,30 +63,30 @@ It requires [PostgreSQL](http://www.postgresql.org/), [PostGIS](http://www.postg
|
|||
psql -d ogn -c "DROP TABLE world_borders_temp;"
|
||||
```
|
||||
|
||||
10. Get world elevation data (needed for AGL calculation)
|
||||
9. Get world elevation data (needed for AGL calculation)
|
||||
Sources: There are many sources for DEM data. It is important that the spatial reference system (SRID) is the same as the database which is 4326.
|
||||
The [GMTED2010 Viewer](https://topotools.cr.usgs.gov/gmted_viewer/viewer.htm) provides data for the world with SRID 4326. Just download the data you need.
|
||||
|
||||
|
||||
11. Import the GeoTIFF into the elevation table:
|
||||
10. Import the GeoTIFF into the elevation table:
|
||||
|
||||
```
|
||||
raster2pgsql *.tif -s 4326 -d -M -C -I -F -t 25x25 public.elevation | psql -d ogn
|
||||
```
|
||||
|
||||
12. Import Airports (needed for takeoff and landing calculation). A cup file is provided under tests:
|
||||
11. Import Airports (needed for takeoff and landing calculation). A cup file is provided under tests:
|
||||
|
||||
```
|
||||
flask database import_airports tests/SeeYou.cup
|
||||
```
|
||||
|
||||
13. Import DDB (needed for registration signs in the logbook).
|
||||
12. Import DDB (needed for registration signs in the logbook).
|
||||
|
||||
```
|
||||
flask database import_ddb
|
||||
```
|
||||
|
||||
14. Optional: Use supervisord
|
||||
13. Optional: Use supervisord
|
||||
You can use [Supervisor](http://supervisord.org/) to control the complete system. In the directory deployment/supervisor
|
||||
we have some configuration files to feed the database (ogn-feed), run the celery worker (celeryd), the celery beat
|
||||
(celerybeatd), the celery monitor (flower), and the python wsgi server (gunicorn). All files assume that
|
||||
|
@ -176,5 +170,9 @@ python3
|
|||
>>>update_takeoff_landings.delay(last_minutes=90)
|
||||
```
|
||||
|
||||
## Notes for Raspberry Pi
|
||||
For matplotlib we need several apt packages installed:
|
||||
apt install libatlas3-base libopenjp2-7 libtiff5
|
||||
|
||||
## License
|
||||
Licensed under the [AGPLv3](LICENSE).
|
||||
|
|
|
@ -112,10 +112,10 @@ def update_takeoff_landings(start, end):
|
|||
.subquery()
|
||||
)
|
||||
|
||||
# get the device id instead of the name and consider them if the are near airports ...
|
||||
# get the sender id instead of the name and consider them if the are near airports ...
|
||||
sq5 = (
|
||||
db.session.query(
|
||||
sq4.c.timestamp, sq4.c.track, sq4.c.is_takeoff, Sender.id.label("device_id"), Airport.id.label("airport_id"), func.ST_DistanceSphere(sq4.c.location, Airport.location_wkt).label("airport_distance")
|
||||
sq4.c.timestamp, sq4.c.track, sq4.c.is_takeoff, Sender.id.label("sender_id"), Airport.id.label("airport_id"), func.ST_DistanceSphere(sq4.c.location, Airport.location_wkt).label("airport_distance"), Airport.country_code
|
||||
)
|
||||
.filter(and_(func.ST_Within(sq4.c.location, Airport.border),
|
||||
between(Airport.style, 2, 5)))
|
||||
|
@ -125,17 +125,16 @@ def update_takeoff_landings(start, end):
|
|||
|
||||
# ... and take the nearest airport
|
||||
sq6 = (
|
||||
db.session.query(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.device_id, sq5.c.airport_id)
|
||||
.distinct(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.device_id)
|
||||
.order_by(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.device_id, sq5.c.airport_distance)
|
||||
db.session.query(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.sender_id, sq5.c.airport_id, sq5.c.country_code)
|
||||
.distinct(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.sender_id)
|
||||
.order_by(sq5.c.timestamp, sq5.c.track, sq5.c.is_takeoff, sq5.c.sender_id, sq5.c.airport_distance)
|
||||
.subquery()
|
||||
)
|
||||
|
||||
# ... add the country
|
||||
takeoff_landing_query = (
|
||||
db.session.query(sq6.c.timestamp, sq6.c.track, sq6.c.is_takeoff, sq6.c.device_id, sq6.c.airport_id, Country.gid)
|
||||
.join(Airport, sq6.c.airport_id==Airport.id)
|
||||
.join(Country, Airport.country_code==Country.iso2, isouter=True)
|
||||
db.session.query(sq6.c.timestamp, sq6.c.track, sq6.c.is_takeoff, sq6.c.sender_id, sq6.c.airport_id, Country.gid)
|
||||
.join(Country, sq6.c.country_code==Country.iso2, isouter=True)
|
||||
.subquery()
|
||||
)
|
||||
|
||||
|
@ -288,6 +287,7 @@ def update_logbook(offset_days=None):
|
|||
Logbook.landing_airport_id==complete_flight_query.c.landing_airport_id
|
||||
)) \
|
||||
.values(takeoff_timestamp=complete_flight_query.c.takeoff_timestamp,
|
||||
takeoff_track=complete_flight_query.c.takeoff_track,
|
||||
takeoff_airport_id=complete_flight_query.c.takeoff_airport_id
|
||||
)
|
||||
result = db.session.execute(upd)
|
||||
|
@ -305,6 +305,7 @@ def update_logbook(offset_days=None):
|
|||
Logbook.landing_airport_id==null()
|
||||
)) \
|
||||
.values(landing_timestamp=complete_flight_query.c.landing_timestamp,
|
||||
landing_track=complete_flight_query.c.landing_track,
|
||||
landing_airport_id=complete_flight_query.c.landing_airport_id
|
||||
)
|
||||
result = db.session.execute(upd)
|
||||
|
|
|
@ -43,29 +43,26 @@ def info():
|
|||
|
||||
@user_cli.command("init")
|
||||
def init():
|
||||
"""Initialize the database."""
|
||||
"""Initialize the database (with PostGIS and TimescaleDB extensions)."""
|
||||
|
||||
from alembic.config import Config
|
||||
from alembic import command
|
||||
|
||||
# Create PostGIS and PostGIS extensions
|
||||
db.session.execute("CREATE EXTENSION IF NOT EXISTS postgis;")
|
||||
db.session.execute("CREATE EXTENSION IF NOT EXISTS btree_gist;")
|
||||
db.session.execute("CREATE EXTENSION IF NOT EXISTS timescaledb;")
|
||||
db.session.commit()
|
||||
|
||||
# Create Scheme
|
||||
db.create_all()
|
||||
|
||||
print("Done.")
|
||||
|
||||
|
||||
@user_cli.command("init_timescaledb")
|
||||
def init_timescaledb():
|
||||
"""Initialize TimescaleDB features."""
|
||||
|
||||
db.session.execute("CREATE EXTENSION IF NOT EXISTS timescaledb;")
|
||||
# Change (sender|receiver)_positions to TimescaleDB table
|
||||
db.session.execute("SELECT create_hypertable('sender_positions', 'reference_timestamp', chunk_time_interval => interval '3 hours', if_not_exists => TRUE);")
|
||||
db.session.execute("SELECT create_hypertable('receiver_positions', 'reference_timestamp', chunk_time_interval => interval '1 day', if_not_exists => TRUE);")
|
||||
db.session.commit()
|
||||
|
||||
print("Done.")
|
||||
print("Initialized the database (with PostGIS and TimescaleDB extensions).")
|
||||
|
||||
|
||||
@user_cli.command("drop")
|
||||
|
|
|
@ -6,6 +6,8 @@ import re
|
|||
import csv
|
||||
import os
|
||||
|
||||
from sqlalchemy.orm.exc import NoResultFound
|
||||
|
||||
from aerofiles.igc import Writer
|
||||
from app.model import SenderPosition, Sender
|
||||
from app import db
|
||||
|
@ -132,18 +134,18 @@ def cup():
|
|||
@click.argument("date")
|
||||
def igc(address, date):
|
||||
"""Export igc file for <address> at <date>."""
|
||||
if not re.match(".{6}", address):
|
||||
print("Address {} not valid.".format(address))
|
||||
if not re.match("[0-9A-F]{6}", address):
|
||||
print(f"Address '{address}' not valid.")
|
||||
return
|
||||
|
||||
try:
|
||||
sender = db.session.query(Sender).filter(Sender.address==address).one()
|
||||
except NoResultFound as e:
|
||||
print(f"No data for '{address}' in the DB")
|
||||
return
|
||||
|
||||
if not re.match(r"\d{4}-\d{2}-\d{2}", date):
|
||||
print("Date {} not valid.".format(date))
|
||||
return
|
||||
|
||||
device_id = db.session.query(Sender.id).filter(Sender.address == address).first()
|
||||
|
||||
if device_id is None:
|
||||
print("Device with address '{}' not found.".format(address))
|
||||
print(f"Date {date} not valid.")
|
||||
return
|
||||
|
||||
with open("sample.igc", "wb") as fp:
|
||||
|
@ -155,27 +157,26 @@ def igc(address, date):
|
|||
"logger_id": "OGN",
|
||||
"date": datetime.date(1987, 2, 24),
|
||||
"fix_accuracy": 50,
|
||||
"pilot": "Konstantin Gruendger",
|
||||
"pilot": "Unknown",
|
||||
"copilot": "",
|
||||
"glider_type": "Duo Discus",
|
||||
"glider_id": "D-KKHH",
|
||||
"firmware_version": "2.2",
|
||||
"hardware_version": "2",
|
||||
"logger_type": "LXNAVIGATION,LX8000F",
|
||||
"gps_receiver": "uBLOX LEA-4S-2,16,max9000m",
|
||||
"pressure_sensor": "INTERSEMA,MS5534A,max10000m",
|
||||
"competition_id": "2H",
|
||||
"competition_class": "Doubleseater",
|
||||
"glider_type": sender.infos[0].aircraft if len(sender.infos) > 0 else '',
|
||||
"glider_id": sender.infos[0].registration if len(sender.infos) > 0 else '',
|
||||
"firmware_version": sender.software_version,
|
||||
"hardware_version": sender.hardware_version,
|
||||
"logger_type": "OGN",
|
||||
"gps_receiver": "unknown",
|
||||
"pressure_sensor": "unknown",
|
||||
"competition_id": sender.infos[0].competition if len(sender.infos) > 0 else '',
|
||||
"competition_class": "unknown",
|
||||
}
|
||||
)
|
||||
|
||||
points = (
|
||||
db.session.query(SenderPosition)
|
||||
.filter(SenderPosition.device_id == device_id)
|
||||
.filter(SenderPosition.timestamp > date + " 00:00:00")
|
||||
.filter(SenderPosition.timestamp < date + " 23:59:59")
|
||||
.order_by(SenderPosition.timestamp)
|
||||
.filter(db.between(SenderPosition.reference_timestamp, f"{date} 00:00:00", f"{date} 23:59:59"))
|
||||
.filter(SenderPosition.name == sender.name)
|
||||
.order_by(SenderPosition.timestamp)
|
||||
)
|
||||
|
||||
for point in points.all():
|
||||
for point in points:
|
||||
writer.write_fix(point.timestamp.time(), latitude=point.location.latitude, longitude=point.location.longitude, valid=True, pressure_alt=point.altitude, gps_alt=point.altitude)
|
||||
|
|
|
@ -16,6 +16,7 @@ class CoverageStatistic(db.Model):
|
|||
messages_count = db.Column(db.Integer)
|
||||
max_distance = db.Column(db.Float(precision=2))
|
||||
max_normalized_quality = db.Column(db.Float(precision=2))
|
||||
coverages_count = db.Column(db.Integer)
|
||||
|
||||
# Relations
|
||||
sender_id = db.Column(db.Integer, db.ForeignKey("senders.id", ondelete="CASCADE"), index=True)
|
||||
|
|
|
@ -59,9 +59,9 @@
|
|||
{% for entry in logbook %}
|
||||
<tr>
|
||||
<td>{{ loop.index }}</td>
|
||||
<td>{% if ns.mydate != entry.reference.strftime('%Y-%m-%d') %}{% set ns.mydate = entry.reference.strftime('%Y-%m-%d') %}{{ ns.mydate }}{% endif %}</td>
|
||||
<td>{% if entry.takeoff_airport is not none %}<a href="{{ url_for('main.logbooks', country=entry.takeoff_airport.country_code, airport_id=entry.takeoff_airport.id, date=entry.reference.strftime('%Y-%m-%d')) }}">{{ entry.takeoff_airport.name }}</a>{% endif %}</td>
|
||||
<td>{% if entry.landing_airport is not none %}<a href="{{ url_for('main.logbooks', country=entry.landing_airport.country_code, airport_id=entry.landing_airport.id, date=entry.reference.strftime('%Y-%m-%d')) }}">{{ entry.landing_airport.name }}</a>{% endif %}</td>
|
||||
<td>{% if ns.mydate != entry.reference_timestamp.strftime('%Y-%m-%d') %}{% set ns.mydate = entry.reference_timestamp.strftime('%Y-%m-%d') %}{{ ns.mydate }}{% endif %}</td>
|
||||
<td>{% if entry.takeoff_airport is not none %}<a href="{{ url_for('main.logbooks', country=entry.takeoff_airport.country_code, airport_id=entry.takeoff_airport.id, date=entry.reference_timestamp.strftime('%Y-%m-%d')) }}">{{ entry.takeoff_airport.name }}</a>{% endif %}</td>
|
||||
<td>{% if entry.landing_airport is not none %}<a href="{{ url_for('main.logbooks', country=entry.landing_airport.country_code, airport_id=entry.landing_airport.id, date=entry.reference_timestamp.strftime('%Y-%m-%d')) }}">{{ entry.landing_airport.name }}</a>{% endif %}</td>
|
||||
<td>{% if entry.takeoff_timestamp is not none %} {{ entry.takeoff_timestamp.strftime('%H:%M') }} {% endif %}</td>
|
||||
<td>{% if entry.landing_timestamp is not none %} {{ entry.landing_timestamp.strftime('%H:%M') }} {% endif %}</td>
|
||||
<td>{% if entry.duration is not none %}{{ entry.duration }}{% endif %}</td>
|
||||
|
|
15
config.py
15
config.py
|
@ -25,12 +25,15 @@ class DefaultConfig(BaseConfig):
|
|||
from datetime import timedelta
|
||||
|
||||
CELERYBEAT_SCHEDULE = {
|
||||
#"update-ddb": {"task": "import_ddb", "schedule": timedelta(hours=1)},
|
||||
#"update-country-codes": {"task": "update_receivers_country_code", "schedule": timedelta(days=1)},
|
||||
#"update-takeoff-and-landing": {"task": "update_takeoff_landings", "schedule": timedelta(hours=1), "kwargs": {"last_minutes": 90}},
|
||||
#"update-logbook": {"task": "update_logbook_entries", "schedule": timedelta(hours=2), "kwargs": {"day_offset": 0}},
|
||||
#"update-max-altitudes": {"task": "update_logbook_max_altitude", "schedule": timedelta(hours=1), "kwargs": {"day_offset": 0}},
|
||||
#"update-logbook-daily": {"task": "update_logbook_entries", "schedule": crontab(hour=1, minute=0), "kwargs": {"day_offset": -1}},
|
||||
"transfer_to_database": {"task": "transfer_to_database", "schedule": timedelta(minutes=1)},
|
||||
"update_statistics": {"task": "update_statistics", "schedule": timedelta(minutes=5)},
|
||||
"update_takeoff_landings": {"task": "update_takeoff_landings", "schedule": timedelta(minutes=1), "kwargs": {"last_minutes": 20}},
|
||||
"update_logbook": {"task": "update_logbook", "schedule": timedelta(minutes=1)},
|
||||
"update_logbook_previous_day": {"task": "update_logbook", "schedule": crontab(hour=1, minute=0), "kwargs": {"day_offset": -1}},
|
||||
|
||||
"update_ddb_daily": {"task": "import_ddb", "schedule": timedelta(days=1)},
|
||||
#"update_logbook_max_altitude": {"task": "update_logbook_max_altitude", "schedule": timedelta(minutes=1), "kwargs": {"offset_days": 0}},
|
||||
|
||||
#"purge_old_data": {"task": "purge_old_data", "schedule": timedelta(hours=1), "kwargs": {"max_hours": 48}},
|
||||
}
|
||||
|
||||
|
|
|
@ -1,37 +0,0 @@
|
|||
server {
|
||||
# listen on port 80 (http)
|
||||
listen 80;
|
||||
server_name _;
|
||||
location / {
|
||||
# redirect any requests to the same URL but on https
|
||||
return 301 https://$host$request_uri;
|
||||
}
|
||||
}
|
||||
server {
|
||||
# listen on port 443 (https)
|
||||
listen 443 ssl;
|
||||
server_name _;
|
||||
|
||||
# location of the self-signed SSL certificate
|
||||
ssl_certificate /home/ubuntu/ddb/certs/cert.pem;
|
||||
ssl_certificate_key /home/ubuntu/ddb/certs/key.pem;
|
||||
|
||||
# write access and error logs to /var/log
|
||||
access_log /var/log/ddb_access.log;
|
||||
error_log /var/log/ddb_error.log;
|
||||
|
||||
location / {
|
||||
# forward application requests to the gunicorn server
|
||||
proxy_pass http://localhost:8000;
|
||||
proxy_redirect off;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
|
||||
}
|
||||
|
||||
location /static {
|
||||
# handle static files directly, without forwarding to the application
|
||||
alias /home/ubuntu/ddb/app/static;
|
||||
expires 30d;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
server {
|
||||
listen 80;
|
||||
server_name api.example.com;
|
||||
|
||||
location / {
|
||||
proxy_pass "http://localhost:5000";
|
||||
proxy_redirect off;
|
||||
proxy_set_header Host $host;
|
||||
proxy_set_header X-Real-IP $remote_addr;
|
||||
fastcgi_read_timeout 300s;
|
||||
proxy_read_timeout 300;
|
||||
}
|
||||
|
||||
location /static {
|
||||
alias /home/pi/ogn-python/app/static/;
|
||||
}
|
||||
|
||||
error_log /var/log/nginx/api-error.log;
|
||||
access_log /var/log/nginx/api-access.log;
|
||||
}
|
1
setup.py
1
setup.py
|
@ -55,6 +55,7 @@ setup(
|
|||
'flower==0.9.5',
|
||||
'tqdm==4.51.0',
|
||||
'requests==2.25.0',
|
||||
'matplotlib=3.3.3'
|
||||
],
|
||||
test_require=[
|
||||
'pytest==5.0.1',
|
||||
|
|
|
@ -1,56 +0,0 @@
|
|||
import os
|
||||
import unittest
|
||||
import datetime
|
||||
from app.model import AircraftBeacon, ReceiverBeacon
|
||||
from app.gateway.bulkimport import DbFeeder
|
||||
|
||||
from tests.base import TestBaseDB, db
|
||||
|
||||
class TestDatabase(TestBaseDB):
|
||||
def test_valid_messages(self):
|
||||
"""This test insert all valid beacons. source: https://github.com/glidernet/ogn-aprs-protocol/valid_messages"""
|
||||
|
||||
path = os.path.join(os.path.dirname(__file__), 'valid_messages')
|
||||
with os.scandir(path) as it:
|
||||
for entry in it:
|
||||
if entry.name.endswith(".txt") and entry.is_file():
|
||||
with DbFeeder() as feeder:
|
||||
print(f"Parsing {entry.name}")
|
||||
with open(entry.path) as file:
|
||||
for line in file:
|
||||
feeder.add(line, datetime.datetime(2020, 5, 1, 13, 22, 1))
|
||||
|
||||
def test_ognsdr_beacons(self):
|
||||
"""This test tests if status+position is correctly merged."""
|
||||
|
||||
aprs_stream = (
|
||||
"LILH>OGNSDR,TCPIP*,qAC,GLIDERN2:/132201h4457.61NI00900.58E&/A=000423\n"
|
||||
"LILH>OGNSDR,TCPIP*,qAC,GLIDERN2:>132201h v0.2.7.RPI-GPU CPU:0.7 RAM:770.2/968.2MB NTP:1.8ms/-3.3ppm +55.7C 7/8Acfts[1h] RF:+54-1.1ppm/-0.16dB/+7.1dB@10km[19481]/+16.8dB@10km[7/13]"
|
||||
)
|
||||
|
||||
with DbFeeder() as feeder:
|
||||
for line in aprs_stream.split('\n'):
|
||||
feeder.add(line, datetime.datetime(2020, 5, 1, 13, 22, 1))
|
||||
|
||||
self.assertEqual(len(db.session.query(ReceiverBeacon).all()), 1)
|
||||
for ab in db.session.query(ReceiverBeacon).all():
|
||||
print(ab)
|
||||
|
||||
def test_oneminute(self):
|
||||
with DbFeeder() as feeder:
|
||||
with open(os.path.dirname(__file__) + '/beacon_data/logs/oneminute.txt') as f:
|
||||
for line in f:
|
||||
timestamp = datetime.datetime.strptime(line[:26], '%Y-%m-%d %H:%M:%S.%f')
|
||||
aprs_string = line[28:]
|
||||
feeder.add(aprs_string, reference_timestamp=timestamp)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
#unittest.main()
|
||||
if True:
|
||||
import cProfile
|
||||
|
||||
from app import create_app
|
||||
app = create_app()
|
||||
with app.app_context():
|
||||
cProfile.run('TestDatabase().test_oneminute()', sort='tottime')
|
|
@ -1,30 +0,0 @@
|
|||
import datetime
|
||||
|
||||
import unittest
|
||||
|
||||
from tests.base import TestBaseDB, db
|
||||
from app.model import Sender, SenderInfo
|
||||
from app.model.device_info_origin import SenderInfoOrigin
|
||||
|
||||
|
||||
class TestStringMethods(TestBaseDB):
|
||||
def test_device_info(self):
|
||||
device = Sender(name="FLRDD0815", address="DD0815")
|
||||
device_info1 = SenderInfo(address="DD0815", address_origin=SenderInfoOrigin.OGN_DDB, registration="D-0815")
|
||||
device_info2 = SenderInfo(address="DD0815", address_origin=SenderInfoOrigin.FLARMNET, registration="15")
|
||||
|
||||
db.session.add(device)
|
||||
db.session.add(device_info1)
|
||||
db.session.add(device_info2)
|
||||
db.session.commit()
|
||||
|
||||
self.assertEqual(device.info, device_info1)
|
||||
|
||||
def test_expiry_date(self):
|
||||
device = Sender(name="FLRDD0815", address="DD0815", software_version=6.42)
|
||||
|
||||
self.assertEqual(device.expiry_date(), datetime.date(2019, 10, 31))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
|
@ -0,0 +1,30 @@
|
|||
import datetime
|
||||
|
||||
import unittest
|
||||
|
||||
from tests.base import TestBaseDB, db
|
||||
from app.model import Sender, SenderInfo, SenderInfoOrigin
|
||||
|
||||
|
||||
class TestStringMethods(TestBaseDB):
|
||||
def test_sender_info(self):
|
||||
sender = Sender(name="FLRDD0815", address="DD0815")
|
||||
sender_info1 = SenderInfo(address="DD0815", address_origin=SenderInfoOrigin.OGN_DDB, registration="D-0815")
|
||||
sender_info2 = SenderInfo(address="DD0815", address_origin=SenderInfoOrigin.FLARMNET, registration="15")
|
||||
|
||||
db.session.add(sender)
|
||||
db.session.add(sender_info1)
|
||||
db.session.add(sender_info2)
|
||||
db.session.commit()
|
||||
|
||||
self.assertEqual(len(sender.infos), 2)
|
||||
self.assertEqual(sender.infos[0], sender_info1)
|
||||
|
||||
def test_expiry_date(self):
|
||||
device = Sender(name="FLRDD0815", address="DD0815", software_version=6.42)
|
||||
|
||||
self.assertEqual(device.expiry_date(), datetime.date(2019, 10, 31))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Ładowanie…
Reference in New Issue