kopia lustrzana https://github.com/glidernet/ogn-python
Fixed celery
rodzic
8d66261d3e
commit
2f577c8c29
16
README.md
16
README.md
|
@ -118,13 +118,13 @@ The following scripts run in the foreground and should be deamonized
|
|||
- Start a task server (make sure redis is up and running)
|
||||
|
||||
```
|
||||
celery -A app.collect worker -l info
|
||||
celery -A celery_app worker -l info
|
||||
```
|
||||
|
||||
- Start the task scheduler (make sure a task server is up and running)
|
||||
|
||||
```
|
||||
celery -A app.collect beat -l info
|
||||
celery -A celery_app beat -l info
|
||||
```
|
||||
|
||||
### Flask - Command Line Interface
|
||||
|
@ -163,14 +163,10 @@ Most commands are command groups, so if you execute this command you will get fu
|
|||
|
||||
### Available tasks
|
||||
|
||||
- `app.collect.celery.update_takeoff_landings` - Compute takeoffs and landings.
|
||||
- `app.collect.celery.update_logbook_entries` - Add/update logbook entries.
|
||||
- `app.collect.celery.update_logbook_max_altitude` - Add max altitudes in logbook when flight is complete (takeoff and landing).
|
||||
- `app.collect.celery.import_ddb` - Import registered devices from the DDB.
|
||||
- `app.collect.celery.update_receivers_country_code` - Update country code in receivers table if None.
|
||||
- `app.collect.celery.purge_old_data` - Delete AircraftBeacons and ReceiverBeacons older than given 'age'.
|
||||
- `app.collect.celery.update_stats` - Create stats and update receivers/devices with stats.
|
||||
- `app.collect.celery.update_ognrange` - Create receiver coverage stats for Melissas ognrange.
|
||||
- `app.tasks.update_takeoff_landings` - Compute takeoffs and landings.
|
||||
- `app.tasks.celery.update_logbook_entries` - Add/update logbook entries.
|
||||
- `app.tasks.celery.update_logbook_max_altitude` - Add max altitudes in logbook when flight is complete (takeoff and landing).
|
||||
- `app.tasks.celery.import_ddb` - Import registered devices from the DDB.
|
||||
|
||||
If the task server is up and running, tasks could be started manually. Here we compute takeoffs and landings for the past 90 minutes:
|
||||
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
from .sql_tasks import update_statistics, update_sender_direction_statistics
|
||||
|
||||
from .orm_tasks import transfer_to_database
|
||||
from .orm_tasks import update_takeoff_landings, update_logbook, update_logbook_max_altitude
|
||||
from .orm_tasks import import_ddb
|
|
@ -0,0 +1,51 @@
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
from app.collect.logbook import update_takeoff_landings as logbook_update_takeoff_landings, update_logbook as logbook_update
|
||||
from app.collect.logbook import update_max_altitudes as logbook_update_max_altitudes
|
||||
|
||||
from app.collect.database import import_ddb as device_infos_import_ddb
|
||||
|
||||
from app.collect.gateway import transfer_from_redis_to_database
|
||||
|
||||
from app import db, celery
|
||||
|
||||
@celery.task(name="transfer_to_database")
|
||||
def transfer_to_database():
|
||||
"""Transfer APRS data from Redis to database."""
|
||||
|
||||
result = transfer_from_redis_to_database()
|
||||
return result
|
||||
|
||||
|
||||
@celery.task(name="update_takeoff_landings")
|
||||
def update_takeoff_landings(last_minutes):
|
||||
"""Compute takeoffs and landings."""
|
||||
|
||||
end = datetime.utcnow()
|
||||
start = end - timedelta(minutes=last_minutes)
|
||||
result = logbook_update_takeoff_landings(start=start, end=end)
|
||||
return result
|
||||
|
||||
|
||||
@celery.task(name="update_logbook")
|
||||
def update_logbook(offset_days=None):
|
||||
"""Add/update logbook entries."""
|
||||
|
||||
result = logbook_update(offset_days=offset_days)
|
||||
return result
|
||||
|
||||
|
||||
@celery.task(name="update_logbook_max_altitude")
|
||||
def update_logbook_max_altitude(offset_days=0):
|
||||
"""Add max altitudes in logbook when flight is complete (takeoff and landing)."""
|
||||
|
||||
result = logbook_update_max_altitudes(offset_days=offset_days)
|
||||
return result
|
||||
|
||||
|
||||
@celery.task(name="import_ddb")
|
||||
def import_ddb():
|
||||
"""Import registered devices from the DDB."""
|
||||
|
||||
result = device_infos_import_ddb()
|
||||
return result
|
|
@ -0,0 +1,126 @@
|
|||
from datetime import datetime, timedelta
|
||||
|
||||
from app import db, celery
|
||||
|
||||
|
||||
@celery.task(name="update_statistics")
|
||||
def update_statistics(date_str=None):
|
||||
""" Update relation_statistics, sender_statistics, receiver_statistics (all depend on coverage_statistics)."""
|
||||
|
||||
if date_str is None:
|
||||
date_str = datetime.utcnow().strftime("%Y-%m-%d")
|
||||
|
||||
# Update relation statistics
|
||||
db.session.execute(f"""
|
||||
DELETE FROM relation_statistics
|
||||
WHERE date = '{date_str}';
|
||||
|
||||
INSERT INTO relation_statistics AS rs (date, sender_id, receiver_id, is_trustworthy, max_distance, max_normalized_quality, messages_count, coverages_count)
|
||||
SELECT
|
||||
tmp.date,
|
||||
tmp.sender_id,
|
||||
tmp.receiver_id,
|
||||
|
||||
is_trustworthy,
|
||||
|
||||
MAX(tmp.max_distance) AS max_distance,
|
||||
MAX(tmp.max_normalized_quality) AS max_normalized_quality,
|
||||
SUM(tmp.messages_count) AS messages_count,
|
||||
COUNT(DISTINCT tmp.location_mgrs_short) AS coverages_count
|
||||
FROM coverage_statistics AS tmp
|
||||
WHERE tmp.date = '{date_str}'
|
||||
GROUP BY date, sender_id, receiver_id, is_trustworthy;
|
||||
""")
|
||||
|
||||
# Update sender statistics
|
||||
db.session.execute(f"""
|
||||
DELETE FROM sender_statistics
|
||||
WHERE date = '{date_str}';
|
||||
|
||||
INSERT INTO sender_statistics AS rs (date, sender_id, is_trustworthy, max_distance, max_normalized_quality, messages_count, coverages_count, receivers_count)
|
||||
SELECT
|
||||
tmp.date,
|
||||
tmp.sender_id,
|
||||
|
||||
is_trustworthy,
|
||||
|
||||
MAX(tmp.max_distance) AS max_distance,
|
||||
MAX(tmp.max_normalized_quality) AS max_normalized_quality,
|
||||
SUM(tmp.messages_count) AS messages_count,
|
||||
COUNT(DISTINCT tmp.location_mgrs_short) AS coverages_count,
|
||||
COUNT(DISTINCT tmp.receiver_id) AS receivers_count
|
||||
FROM coverage_statistics AS tmp
|
||||
WHERE tmp.date = '{date_str}'
|
||||
GROUP BY date, sender_id, is_trustworthy;
|
||||
""")
|
||||
|
||||
# Update receiver statistics
|
||||
db.session.execute(f"""
|
||||
DELETE FROM receiver_statistics
|
||||
WHERE date = '{date_str}';
|
||||
|
||||
INSERT INTO receiver_statistics AS rs (date, receiver_id, is_trustworthy, max_distance, max_normalized_quality, messages_count, coverages_count, senders_count)
|
||||
SELECT
|
||||
tmp.date,
|
||||
tmp.receiver_id,
|
||||
|
||||
is_trustworthy,
|
||||
|
||||
MAX(tmp.max_distance) AS max_distance,
|
||||
MAX(tmp.max_normalized_quality) AS max_normalized_quality,
|
||||
SUM(tmp.messages_count) AS messages_count,
|
||||
COUNT(DISTINCT tmp.location_mgrs_short) AS coverages_count,
|
||||
COUNT(DISTINCT tmp.sender_id) AS senders_count
|
||||
FROM coverage_statistics AS tmp
|
||||
WHERE tmp.date = '{date_str}'
|
||||
GROUP BY date, receiver_id, is_trustworthy;
|
||||
""")
|
||||
|
||||
db.session.commit()
|
||||
|
||||
|
||||
@celery.task(name="update_sender_direction_statistics")
|
||||
def update_sender_direction_statistics():
|
||||
""" Update sender_direction_statistics."""
|
||||
|
||||
db.session.execute("""
|
||||
DELETE FROM sender_direction_statistics;
|
||||
|
||||
INSERT INTO sender_direction_statistics(sender_id, receiver_id, directions_count, messages_count, direction_data)
|
||||
SELECT
|
||||
sq2.sender_id,
|
||||
sq2.receiver_id,
|
||||
COUNT(sq2.*) AS directions_count,
|
||||
SUM(sq2.messages_count) AS messages_count,
|
||||
json_agg(json_build_object('direction', direction, 'messages_count', messages_count, 'max_range', max_range)) AS direction_data
|
||||
FROM (
|
||||
SELECT
|
||||
sq.sender_id,
|
||||
sq.receiver_id,
|
||||
sq.direction,
|
||||
COUNT(sq.*) AS messages_count,
|
||||
MAX(sq.max_range) AS max_range
|
||||
FROM (
|
||||
SELECT
|
||||
s.id AS sender_id,
|
||||
r.id AS receiver_id,
|
||||
10000 * 10^(sp.normalized_quality/20.0) AS max_range,
|
||||
CASE
|
||||
WHEN sp.bearing-sp.track < 0
|
||||
THEN CAST((sp.bearing-sp.track+360)/10 AS INTEGER)*10
|
||||
ELSE CAST((sp.bearing-sp.track)/10 AS INTEGER)*10
|
||||
END AS direction
|
||||
FROM sender_positions AS sp
|
||||
INNER JOIN senders s ON sp.name = s.name
|
||||
INNER JOIN receivers r ON sp.receiver_name = r.name
|
||||
WHERE
|
||||
sp.track IS NOT NULL AND sp.bearing IS NOT NULL AND sp.normalized_quality IS NOT NULL
|
||||
AND sp.agl >= 200
|
||||
AND turn_rate BETWEEN -10.0 AND 10.0
|
||||
AND climb_rate BETWEEN -3.0 AND 3.0
|
||||
) AS sq
|
||||
GROUP BY sq.sender_id, sq.receiver_id, sq.direction
|
||||
ORDER BY sq.sender_id, sq.receiver_id, sq.direction
|
||||
) AS sq2
|
||||
GROUP BY sq2.sender_id, sq2.receiver_id;
|
||||
""")
|
|
@ -33,7 +33,7 @@
|
|||
<td>{{ loop.index }}
|
||||
<td><img src="{{ url_for('static', filename='img/Transparent.gif') }}" class="flag flag-{{ sel_country|lower }}" alt="{{ sel_country }}"/></td>
|
||||
<td>{{ airport|to_html_link|safe }}</td>
|
||||
<td><a href="{{ url_for('main.logbook', country=sel_country, airport_id=airport.id) }}">Logbook</a></td>
|
||||
<td><a href="{{ url_for('main.logbooks', country=sel_country, airport_id=airport.id) }}">Logbook</a></td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</table>
|
||||
|
|
|
@ -74,7 +74,7 @@
|
|||
{% for entry in sender.logbook_entries %}
|
||||
<tr>
|
||||
<td>{{ loop.index }}</td>
|
||||
<td>{% if ns.mydate != entry.reference.strftime('%Y-%m-%d') %}{% set ns.mydate = entry.reference.strftime('%Y-%m-%d') %}{{ ns.mydate }}{% endif %}</td>
|
||||
<td>{% if ns.mydate != entry.reference_timestamp.strftime('%Y-%m-%d') %}{% set ns.mydate = entry.reference_timestamp.strftime('%Y-%m-%d') %}{{ ns.mydate }}{% endif %}</td>
|
||||
<td>{% if entry.takeoff_airport is not none %}<a href="{{ url_for('main.airport_detail', airport_id=entry.takeoff_airport.id) }}">{{ entry.takeoff_airport.name }}</a>{% endif %}</td>
|
||||
<td>{% if entry.landing_airport is not none %}<a href="{{ url_for('main.airport_detail', airport_id=entry.landing_airport.id) }}">{{ entry.landing_airport.name }}</a>{% endif %}</td>
|
||||
<td>{% if entry.takeoff_timestamp is not none %} {{ entry.takeoff_timestamp.strftime('%H:%M') }} {% endif %}</td>
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
from app import init_celery
|
||||
|
||||
app = init_celery()
|
||||
app.conf.imports = app.conf.imports + ("app.tasks",)
|
|
@ -1,7 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
import os
|
||||
from app import celery, create_app
|
||||
from app.collect.celery import *
|
||||
|
||||
app = create_app(os.getenv('FLASK_CONFIG') or 'default')
|
||||
app.app_context().push()
|
2
setup.py
2
setup.py
|
@ -42,7 +42,7 @@ setup(
|
|||
'Flask-WTF==0.14.3',
|
||||
'Flask-Caching==1.9.0',
|
||||
'geopy==2.0.0',
|
||||
'celery==5.0.2',
|
||||
'celery==4.4.7',
|
||||
'Flask-Redis==0.4.0',
|
||||
'redis==3.5.3',
|
||||
'aerofiles==1.0.0',
|
||||
|
|
|
@ -32,13 +32,13 @@ class TestLogbook(TestBaseDB):
|
|||
self.takeoff_ohlstadt_dd4711 = TakeoffLanding(is_takeoff=True, timestamp="2016-06-01 10:00:00", airport_id=self.ohlstadt.id, sender_id=self.dd4711.id)
|
||||
|
||||
def get_logbook_entries(self):
|
||||
return db.session.query(Logbook).order_by(Logbook.takeoff_airport_id, Logbook.reference).all()
|
||||
return db.session.query(Logbook).order_by(Logbook.takeoff_airport_id, Logbook.reference_timestamp).all()
|
||||
|
||||
def test_single_takeoff(self):
|
||||
db.session.add(self.takeoff_koenigsdorf_dd0815)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook()
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 1)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id)
|
||||
|
@ -48,7 +48,7 @@ class TestLogbook(TestBaseDB):
|
|||
db.session.add(self.landing_koenigsdorf_dd0815)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook()
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 1)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, None)
|
||||
|
@ -59,7 +59,7 @@ class TestLogbook(TestBaseDB):
|
|||
db.session.add(self.takeoff_ohlstadt_dd4711)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook()
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 2)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id)
|
||||
|
@ -70,19 +70,19 @@ class TestLogbook(TestBaseDB):
|
|||
db.session.add(self.landing_koenigsdorf_dd0815)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook()
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 1)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id)
|
||||
self.assertEqual(entries[0].landing_airport_id, self.koenigsdorf.id)
|
||||
|
||||
|
||||
def test_takeoff_and_landing_on_different_days(self):
|
||||
db.session.add(self.takeoff_koenigsdorf_dd0815)
|
||||
db.session.add(self.landing_koenigsdorf_dd0815_later)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook(date=datetime.date(2016, 6, 2))
|
||||
update_logbook()
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 2)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id)
|
||||
|
@ -94,7 +94,7 @@ class TestLogbook(TestBaseDB):
|
|||
db.session.add(self.takeoff_koenigsdorf_dd0815)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook(0)
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 1)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id)
|
||||
|
@ -102,7 +102,7 @@ class TestLogbook(TestBaseDB):
|
|||
db.session.add(self.landing_koenigsdorf_dd0815)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook()
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 1)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id)
|
||||
|
@ -111,7 +111,7 @@ class TestLogbook(TestBaseDB):
|
|||
db.session.add(self.takeoff_ohlstadt_dd4711)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook(0)
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 2)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id)
|
||||
|
@ -121,22 +121,22 @@ class TestLogbook(TestBaseDB):
|
|||
db.session.add(self.landing_koenigsdorf_dd0815)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook()
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 1)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, None)
|
||||
self.assertEqual(entries[0].landing_airport_id, self.koenigsdorf.id)
|
||||
self.assertEqual(entries[0].reftime, self.landing_koenigsdorf_dd0815.timestamp)
|
||||
self.assertEqual(entries[0].reference_timestamp, self.landing_koenigsdorf_dd0815.timestamp)
|
||||
|
||||
db.session.add(self.takeoff_koenigsdorf_dd0815)
|
||||
db.session.commit()
|
||||
|
||||
update_logbook(date=datetime.date(2016, 6, 1))
|
||||
update_logbook()
|
||||
entries = self.get_logbook_entries()
|
||||
self.assertEqual(len(entries), 1)
|
||||
self.assertEqual(entries[0].takeoff_airport_id, self.koenigsdorf.id)
|
||||
self.assertEqual(entries[0].landing_airport_id, self.koenigsdorf.id)
|
||||
self.assertEqual(entries[0].reftime, self.takeoff_koenigsdorf_dd0815.timestamp)
|
||||
self.assertEqual(entries[0].reference_timestamp, self.takeoff_koenigsdorf_dd0815.timestamp)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Ładowanie…
Reference in New Issue