Add email notification on detection of an encrypted radisonde

pull/774/head
Mark Jessop 2023-05-18 18:15:47 +09:30
rodzic 6b2ce1da94
commit 139378b864
6 zmienionych plików z 91 dodań i 11 usunięć

Wyświetl plik

@ -897,6 +897,7 @@ def main():
),
launch_notifications=config["email_launch_notifications"],
landing_notifications=config["email_landing_notifications"],
encrypted_sonde_notifications=config["email_encrypted_sonde_notifications"],
landing_range_threshold=config["email_landing_range_threshold"],
landing_altitude_threshold=config["email_landing_altitude_threshold"],
)

Wyświetl plik

@ -748,6 +748,17 @@ def read_auto_rx_config(filename, no_sdr_test=False):
"Config - Did not find system / debug logging options, using defaults (disabled, unless set as a command-line option.)"
)
# 1.6.2 - Encrypted Sonde Email Notifications
try:
auto_rx_config["email_encrypted_sonde_notifications"] = config.getboolean(
"email", "encrypted_sonde_notifications"
)
except:
logging.warning(
"Config - Did not find encrypted_sonde_notifications setting (new in v1.6.2), using default (True)"
)
auto_rx_config["email_encrypted_sonde_notifications"] = True
# If we are being called as part of a unit test, just return the config now.
if no_sdr_test:

Wyświetl plik

@ -23,6 +23,7 @@ from .gps import get_ephemeris, get_almanac
from .sonde_specific import fix_datetime, imet_unique_id
from .fsk_demod import FSKDemodStats
from .sdr_wrappers import test_sdr, get_sdr_iq_cmd, get_sdr_fm_cmd, get_sdr_name
from .email_notification import EmailNotification
# Global valid sonde types list.
VALID_SONDE_TYPES = [
@ -1424,6 +1425,20 @@ class SondeDecoder(object):
"Radiosonde %s has encrypted telemetry (Possible encrypted RS41-SGM)! We cannot decode this, closing decoder."
% _telemetry["id"]
)
# Overwrite the datetime field to make the email notifier happy
_telemetry['datetime_dt'] = datetime.datetime.utcnow()
_telemetry["freq"] = "%.3f MHz" % (self.sonde_freq / 1e6)
# Send this to only the Email Notifier, if it exists.
for _exporter in self.exporters:
try:
if _exporter.__self__.__module__ == EmailNotification.__module__:
_exporter(_telemetry)
except Exception as e:
self.log_error("Exporter Error %s" % str(e))
# Close the decoder.
self.exit_state = "Encrypted"
self.decoder_running = False
return False

Wyświetl plik

@ -43,6 +43,7 @@ class EmailNotification(object):
station_position=None,
launch_notifications=True,
landing_notifications=True,
encrypted_sonde_notifications=True,
landing_range_threshold=50,
landing_altitude_threshold=1000,
landing_descent_trip=10,
@ -60,6 +61,7 @@ class EmailNotification(object):
self.station_position = station_position
self.launch_notifications = launch_notifications
self.landing_notifications = landing_notifications
self.encrypted_sonde_notifications = encrypted_sonde_notifications
self.landing_range_threshold = landing_range_threshold
self.landing_altitude_threshold = landing_altitude_threshold
self.landing_descent_trip = landing_descent_trip
@ -133,18 +135,44 @@ class EmailNotification(object):
}
)
if self.launch_notifications:
if "encrypted" in telemetry:
if telemetry["encrypted"] and self.encrypted_sonde_notifications:
try:
# This is a new Encrypted Radiosonde, send an email.
msg = "Encrypted Radiosonde Detected:\n"
msg += "\n"
if "subtype" in telemetry:
telemetry["type"] = telemetry["subtype"]
msg += "Serial: %s\n" % _id
msg += "Type: %s\n" % telemetry["type"]
msg += "Frequency: %s\n" % telemetry["freq"]
msg += "Time Detected: %sZ\n" % telemetry["datetime_dt"].isoformat()
# Construct subject
_subject = self.mail_subject
_subject = _subject.replace("<id>", telemetry["id"])
_subject = _subject.replace("<type>", telemetry["type"])
_subject = _subject.replace("<freq>", telemetry["freq"])
if "encrypted" in telemetry:
if telemetry["encrypted"] == True:
_subject += " - ENCRYPTED SONDE"
self.send_notification_email(subject=_subject, message=msg)
except Exception as e:
self.log_error("Error sending E-mail - %s" % str(e))
elif self.launch_notifications:
try:
# This is a new sonde. Send the email.
msg = "Sonde launch detected:\n"
msg += "\n"
if "encrypted" in telemetry:
if telemetry["encrypted"] == True:
msg += "ENCRYPTED RADIOSONDE DETECTED!\n"
msg += "Callsign: %s\n" % _id
msg += "Serial: %s\n" % _id
msg += "Type: %s\n" % telemetry["type"]
msg += "Frequency: %s\n" % telemetry["freq"]
msg += "Position: %.5f,%.5f\n" % (
@ -175,10 +203,6 @@ class EmailNotification(object):
_subject = _subject.replace("<type>", telemetry["type"])
_subject = _subject.replace("<freq>", telemetry["freq"])
if "encrypted" in telemetry:
if telemetry["encrypted"] == True:
_subject += " - ENCRYPTED SONDE"
self.send_notification_email(subject=_subject, message=msg)
except Exception as e:
@ -237,7 +261,7 @@ class EmailNotification(object):
msg = "Nearby sonde landing detected:\n\n"
msg += "Callsign: %s\n" % _id
msg += "Serial: %s\n" % _id
msg += "Type: %s\n" % telemetry["type"]
msg += "Frequency: %s\n" % telemetry["freq"]
msg += "Position: %.5f,%.5f\n" % (
@ -433,6 +457,29 @@ if __name__ == "__main__":
}
)
time.sleep(10)
print("Testing encrypted sonde alert.")
_email_notification.add(
{
"id": "R1234557",
"frame": 10,
"lat": 0.0,
"lon": 0.0,
"alt": 0,
"temp": 1.0,
"type": "RS41",
"subtype": "RS41-SGM",
"freq": "401.520 MHz",
"freq_float": 401.52,
"heading": 0.0,
"vel_h": 5.1,
"vel_v": -5.0,
"datetime_dt": datetime.datetime.utcnow(),
"encrypted": True
}
)
# Wait a little bit before shutting down.
time.sleep(5)

Wyświetl plik

@ -362,6 +362,9 @@ launch_notifications = True
# Send e-mails when a radiosonde is detected descending near your station location
landing_notifications = True
# Send e-mails when an encrypted radiosonde is detected.
encrypted_sonde_notifications = True
# Range threshold for Landing notifications (km from your station location)
landing_range_threshold = 30

Wyświetl plik

@ -363,6 +363,9 @@ launch_notifications = True
# Send e-mails when a radiosonde is detected descending near your station location
landing_notifications = True
# Send e-mails when an encrypted radiosonde is detected.
encrypted_sonde_notifications = True
# Range threshold for Landing notifications (km from your station location)
landing_range_threshold = 30