From 69ccd9f00041da32b5f9855e8b932ce74fb40909 Mon Sep 17 00:00:00 2001 From: Bertrik Sikken Date: Thu, 16 Sep 2021 00:48:51 +0200 Subject: [PATCH] Actually enable Helium as a source for LoRaWAN messages --- .../java/nl/sikken/bertrik/TtnHabBridge.java | 32 +++++++++----- .../bertrik/hab/habitat/HabReceiver.java | 10 +++++ .../bertrik/hab/habitat/HabitatUploader.java | 3 -- .../habitat/docs/ListenerInformationDoc.java | 2 +- .../hab/lorawan/HeliumUplinkMessage.java | 12 +++--- .../hab/lorawan/LoraWanUplinkMessage.java | 13 +++++- .../bertrik/hab/lorawan/MqttListener.java | 43 ++++++++----------- .../hab/lorawan/Ttnv3UplinkMessage.java | 9 ++-- .../bertrik/hab/PayloadDecoderTest.java | 4 +- .../hab/lorawan/HeliumUplinkMessageTest.java | 2 +- .../hab/lorawan/Ttnv3UplinkMessageTest.java | 2 +- 11 files changed, 80 insertions(+), 52 deletions(-) diff --git a/ttnhabbridge/src/main/java/nl/sikken/bertrik/TtnHabBridge.java b/ttnhabbridge/src/main/java/nl/sikken/bertrik/TtnHabBridge.java index 5957265..f74ba61 100644 --- a/ttnhabbridge/src/main/java/nl/sikken/bertrik/TtnHabBridge.java +++ b/ttnhabbridge/src/main/java/nl/sikken/bertrik/TtnHabBridge.java @@ -24,9 +24,11 @@ import nl.sikken.bertrik.hab.Sentence; import nl.sikken.bertrik.hab.habitat.HabReceiver; import nl.sikken.bertrik.hab.habitat.HabitatUploader; import nl.sikken.bertrik.hab.habitat.Location; +import nl.sikken.bertrik.hab.lorawan.HeliumUplinkMessage; import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage; import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage.GatewayInfo; import nl.sikken.bertrik.hab.lorawan.MqttListener; +import nl.sikken.bertrik.hab.lorawan.Ttnv3UplinkMessage; /** * Bridge between the-things-network and the habhub network. @@ -37,7 +39,7 @@ public final class TtnHabBridge { private static final Logger LOG = LoggerFactory.getLogger(TtnHabBridge.class); private static final String CONFIG_FILE = "ttnhabbridge.yaml"; - private final MqttListener ttnListener; + private final List mqttListeners = new ArrayList<>(); private final HabitatUploader habUploader; private final PayloadDecoder decoder; private final ExpiringCache gwCache; @@ -72,14 +74,21 @@ public final class TtnHabBridge { return config; } } - + /** * Constructor. * * @param config the application configuration */ private TtnHabBridge(TtnHabBridgeConfig config) { - this.ttnListener = new MqttListener(this::handleTTNMessage, config.getTtnConfig()); + if (!config.getTtnConfig().getUrl().isEmpty()) { + this.mqttListeners + .add(new MqttListener(this::handleMessage, config.getTtnConfig(), Ttnv3UplinkMessage.class)); + } + if (!config.getHeliumConfig().getUrl().isEmpty()) { + this.mqttListeners + .add(new MqttListener(this::handleMessage, config.getHeliumConfig(), HeliumUplinkMessage.class)); + } this.habUploader = HabitatUploader.create(config.getHabitatConfig()); this.decoder = new PayloadDecoder(EPayloadEncoding.parse(config.getPayloadEncoding())); this.gwCache = new ExpiringCache(Duration.ofSeconds(config.getGwCacheExpirationTime())); @@ -95,18 +104,17 @@ public final class TtnHabBridge { // start sub-modules habUploader.start(); - ttnListener.start(); + for (MqttListener listener : mqttListeners) { + listener.start(); + } LOG.info("Started TTN-HAB bridge application"); } /** - * Handles an incoming TTN message - * - * @param textMessage the message contents - * @param now message arrival time + * Handles an incoming LoRaWAN message */ - private void handleTTNMessage(LoraWanUplinkMessage message) { + private void handleMessage(LoraWanUplinkMessage message) { Instant now = Instant.now(); try { Sentence sentence = decoder.decode(message); @@ -117,7 +125,7 @@ public final class TtnHabBridge { for (GatewayInfo gw : message.getGateways()) { String gwName = gw.getId(); Location gwLocation = gw.getLocation(); - HabReceiver receiver = new HabReceiver(gwName, gwLocation); + HabReceiver receiver = new HabReceiver(gwName, gwLocation, message.getNetwork()); receivers.add(receiver); // send listener data only if it has a valid location and hasn't been sent @@ -144,7 +152,9 @@ public final class TtnHabBridge { */ private void stop() { LOG.info("Stopping TTN HAB bridge application"); - ttnListener.stop(); + for (MqttListener listener : mqttListeners) { + listener.stop(); + } habUploader.stop(); LOG.info("Stopped TTN HAB bridge application"); } diff --git a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/HabReceiver.java b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/HabReceiver.java index e5349ef..93acea1 100644 --- a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/HabReceiver.java +++ b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/HabReceiver.java @@ -7,6 +7,7 @@ public final class HabReceiver { private final String callSign; private final Location location; + private final String network; /** * Constructor. @@ -15,8 +16,13 @@ public final class HabReceiver { * @param location the location */ public HabReceiver(String callSign, Location location) { + this(callSign, location, "LoRaWAN"); + } + + public HabReceiver(String callSign, Location location, String network) { this.callSign = callSign; this.location = location; + this.network = network; } public String getCallsign() { @@ -26,6 +32,10 @@ public final class HabReceiver { public Location getLocation() { return location; } + + public String getNetwork() { + return network; + } @Override public String toString() { diff --git a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/HabitatUploader.java b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/HabitatUploader.java index c408bbd..2ee874e 100644 --- a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/HabitatUploader.java +++ b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/HabitatUploader.java @@ -79,8 +79,6 @@ public final class HabitatUploader { */ public void start() { LOG.info("Starting habitat uploader"); - - LOG.info("Started habitat uploader"); } /** @@ -89,7 +87,6 @@ public final class HabitatUploader { public void stop() { LOG.info("Stopping habitat uploader"); executor.shutdown(); - LOG.info("Stopped habitat uploader"); } /** diff --git a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/docs/ListenerInformationDoc.java b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/docs/ListenerInformationDoc.java index 945f48a..c0b3532 100644 --- a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/docs/ListenerInformationDoc.java +++ b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/habitat/docs/ListenerInformationDoc.java @@ -35,7 +35,7 @@ public final class ListenerInformationDoc extends ListenerDoc { protected JsonNode createDataNode() { ObjectNode node = factory().objectNode(); node.set("callsign", factory().textNode(receiver.getCallsign())); - node.set("radio", factory().textNode("TheThingsNetwork")); + node.set("radio", factory().textNode(receiver.getNetwork())); double altitude = receiver.getLocation().getAlt(); if (Double.isFinite(altitude)) { String antenna = String.format(Locale.ROOT, "%.0f m", altitude); diff --git a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/HeliumUplinkMessage.java b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/HeliumUplinkMessage.java index f617273..37226bf 100644 --- a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/HeliumUplinkMessage.java +++ b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/HeliumUplinkMessage.java @@ -7,8 +7,10 @@ import java.util.List; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage.ILoraWanUplink; + @JsonIgnoreProperties(ignoreUnknown = true) -public final class HeliumUplinkMessage { +public final class HeliumUplinkMessage implements ILoraWanUplink { @JsonProperty("app_eui") String appEui = ""; @@ -50,10 +52,10 @@ public final class HeliumUplinkMessage { double snr; } - // convert this message to the common LoRaWAN uplink message - public LoraWanUplinkMessage toUplinkMessage() { - LoraWanUplinkMessage uplink = new LoraWanUplinkMessage(Instant.ofEpochMilli(reportedAt), appEui, name, fcnt, - port, payload); + @Override + public LoraWanUplinkMessage toLoraWanUplinkMessage() { + LoraWanUplinkMessage uplink = new LoraWanUplinkMessage("Helium", Instant.ofEpochMilli(reportedAt), appEui, name, + fcnt, port, payload); for (HotSpot hotSpot : hotSpots) { uplink.addGateway(hotSpot.name.trim(), hotSpot.latitude, hotSpot.longitude, Double.NaN); } diff --git a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/LoraWanUplinkMessage.java b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/LoraWanUplinkMessage.java index e7b6fc5..8dbb5f5 100644 --- a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/LoraWanUplinkMessage.java +++ b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/LoraWanUplinkMessage.java @@ -15,6 +15,7 @@ import nl.sikken.bertrik.hab.habitat.Location; */ public final class LoraWanUplinkMessage { + private final String network; private final Instant time; private final String appId; private final String deviceId; @@ -24,7 +25,8 @@ public final class LoraWanUplinkMessage { private final byte[] payloadRaw; private final List gateways = new ArrayList<>(); - public LoraWanUplinkMessage(Instant time, String appId, String deviceId, int fcnt, int port, byte[] payloadRaw) { + public LoraWanUplinkMessage(String network, Instant time, String appId, String deviceId, int fcnt, int port, byte[] payloadRaw) { + this.network = network; this.time = Instant.from(time); this.appId = appId; this.deviceId = deviceId; @@ -37,6 +39,10 @@ public final class LoraWanUplinkMessage { payloadFields.put(name, value); } + public String getNetwork() { + return network; + } + public Instant getTime() { return Instant.from(time); } @@ -93,4 +99,9 @@ public final class LoraWanUplinkMessage { } + // interface for messages that can convert themselves to a LoraWanUplinkMessage + public interface ILoraWanUplink { + LoraWanUplinkMessage toLoraWanUplinkMessage(); + } + } diff --git a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/MqttListener.java b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/MqttListener.java index e691f11..8a4f00d 100644 --- a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/MqttListener.java +++ b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/MqttListener.java @@ -15,6 +15,9 @@ import org.slf4j.LoggerFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage.ILoraWanUplink; + /** * Listener process for receiving data from an MQTT server. */ @@ -24,44 +27,44 @@ public final class MqttListener { private static final long DISCONNECT_TIMEOUT_MS = 3000; private final IMessageReceived callback; + private final Class clazz; + private final MqttClient mqttClient; private final MqttConnectOptions options; private final ObjectMapper objectMapper = new ObjectMapper(); - public MqttListener(IMessageReceived callback, MqttConfig config) { - this(callback, config.getUrl(), config.getUser(), config.getPass(), config.getTopic()); - } - /** * Constructor. * * @param callback the listener for a received message. - * @param url the URL of the MQTT server - * @param appId the user name - * @param appKey the password - * @param topic the MQTT topic + * @param config the MQTT configuration + * @param clazz the JSON class sent over MQTT */ - public MqttListener(IMessageReceived callback, String url, String appId, String appKey, String topic) { - LOG.info("Creating client for MQTT server '{}' for app '{}'", url, appId); + @SuppressFBWarnings({"EI_EXPOSE_REP2"}) + public MqttListener(IMessageReceived callback, MqttConfig config, Class clazz) { + LOG.info("Creating client for MQTT server '{}' for app '{}'", config.getUrl(), config.getUser()); try { - this.mqttClient = new MqttClient(url, MqttClient.generateClientId(), new MemoryPersistence()); + this.mqttClient = new MqttClient(config.getUrl(), MqttClient.generateClientId(), new MemoryPersistence()); } catch (MqttException e) { throw new IllegalArgumentException(e); } this.callback = callback; - mqttClient.setCallback(new MqttCallbackHandler(mqttClient, topic, this::handleMessage)); + this.clazz = clazz; + + mqttClient.setCallback(new MqttCallbackHandler(mqttClient, config.getTopic(), this::handleMessage)); // create connect options options = new MqttConnectOptions(); - options.setUserName(appId); - options.setPassword(appKey.toCharArray()); + options.setUserName(config.getUser()); + options.setPassword(config.getPass().toCharArray()); options.setAutomaticReconnect(true); } // notify our caller in a thread safe manner private void handleMessage(String topic, String payload) { try { - LoraWanUplinkMessage uplinkMessage = convertMessage(topic, payload); + ILoraWanUplink uplink = objectMapper.readValue(payload, clazz); + LoraWanUplinkMessage uplinkMessage = uplink.toLoraWanUplinkMessage(); callback.messageReceived(uplinkMessage); } catch (JsonProcessingException e) { LOG.warn("Caught {}", e.getMessage()); @@ -71,21 +74,13 @@ public final class MqttListener { } } - // package private for testing - LoraWanUplinkMessage convertMessage(String topic, String payload) throws JsonProcessingException { - Ttnv3UplinkMessage v3message = objectMapper.readValue(payload, Ttnv3UplinkMessage.class); - return v3message.toUplinkMessage(); - } - /** * Starts this module. * * @throws MqttException in case something went wrong with MQTT */ public void start() throws MqttException { - LOG.info("Starting MQTT listener"); - - LOG.info("Connecting to MQTT server"); + LOG.info("Starting MQTT listener {}", mqttClient.getServerURI()); mqttClient.connect(options); } diff --git a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/Ttnv3UplinkMessage.java b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/Ttnv3UplinkMessage.java index c9c917a..7ea85ff 100644 --- a/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/Ttnv3UplinkMessage.java +++ b/ttnhabbridge/src/main/java/nl/sikken/bertrik/hab/lorawan/Ttnv3UplinkMessage.java @@ -7,6 +7,8 @@ import java.util.List; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; +import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage.ILoraWanUplink; + /** * Representation of the TTNv3 uplink message.
*
@@ -14,7 +16,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; * All sub-structures are contained in this file too. */ @JsonIgnoreProperties(ignoreUnknown = true) -public final class Ttnv3UplinkMessage { +public final class Ttnv3UplinkMessage implements ILoraWanUplink { @JsonProperty("end_device_ids") EndDeviceIds endDeviceIds = new EndDeviceIds(); @@ -86,8 +88,9 @@ public final class Ttnv3UplinkMessage { private double altitude = Double.NaN; } - public LoraWanUplinkMessage toUplinkMessage() { - LoraWanUplinkMessage uplink = new LoraWanUplinkMessage(Instant.parse(receivedAt), + @Override + public LoraWanUplinkMessage toLoraWanUplinkMessage() { + LoraWanUplinkMessage uplink = new LoraWanUplinkMessage("TheThingsNetwork", Instant.parse(receivedAt), endDeviceIds.applicationIds.applicationId, endDeviceIds.deviceId, uplinkMessage.fcnt, uplinkMessage.fport, uplinkMessage.payload); for (RxMetadata metadata : uplinkMessage.rxMetadata) { diff --git a/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/PayloadDecoderTest.java b/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/PayloadDecoderTest.java index 8d9c1e9..b77e645 100644 --- a/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/PayloadDecoderTest.java +++ b/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/PayloadDecoderTest.java @@ -29,8 +29,8 @@ public final class PayloadDecoderTest { */ @Test public void testCayenne2() throws DecodeException { - LoraWanUplinkMessage message = new LoraWanUplinkMessage(Instant.parse("2020-02-05T22:00:58.930936Z"), "test", "test", - 123, 1, Base64.getDecoder().decode("AYgH1ecAzV4AC7gCZwArAwIBhg==")); + LoraWanUplinkMessage message = new LoraWanUplinkMessage("LoRaWAN", Instant.parse("2020-02-05T22:00:58.930936Z"), + "test", "test", 123, 1, Base64.getDecoder().decode("AYgH1ecAzV4AC7gCZwArAwIBhg==")); // decode payload PayloadDecoder decoder = new PayloadDecoder(EPayloadEncoding.CAYENNE); Sentence sentence = decoder.decode(message); diff --git a/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/lorawan/HeliumUplinkMessageTest.java b/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/lorawan/HeliumUplinkMessageTest.java index b46017d..b48bfdf 100644 --- a/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/lorawan/HeliumUplinkMessageTest.java +++ b/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/lorawan/HeliumUplinkMessageTest.java @@ -40,7 +40,7 @@ public final class HeliumUplinkMessageTest { Assert.assertEquals(-7.5, hotSpot.snr, 0.1); // decode to LoRaWAN message - LoraWanUplinkMessage lorawan = helium.toUplinkMessage(); + LoraWanUplinkMessage lorawan = helium.toLoraWanUplinkMessage(); Assert.assertEquals(Instant.parse("2021-09-12T14:39:25.832Z"), lorawan.getTime()); Assert.assertEquals("6081F9D16837130E", lorawan.getAppId()); Assert.assertEquals("kissmapper", lorawan.getDevId()); diff --git a/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/lorawan/Ttnv3UplinkMessageTest.java b/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/lorawan/Ttnv3UplinkMessageTest.java index 9c0319a..5d9fc4d 100644 --- a/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/lorawan/Ttnv3UplinkMessageTest.java +++ b/ttnhabbridge/src/test/java/nl/sikken/bertrik/hab/lorawan/Ttnv3UplinkMessageTest.java @@ -20,7 +20,7 @@ public final class Ttnv3UplinkMessageTest { InputStream is = getClass().getClassLoader().getResourceAsStream("ttnv3_uplink.json"); Ttnv3UplinkMessage message = MAPPER.readValue(is, Ttnv3UplinkMessage.class); - LoraWanUplinkMessage uplinkMessage = message.toUplinkMessage(); + LoraWanUplinkMessage uplinkMessage = message.toLoraWanUplinkMessage(); Assert.assertEquals("test2id", uplinkMessage.getAppId()); Assert.assertEquals("v3demo1", uplinkMessage.getDevId());