Actually enable Helium as a source for LoRaWAN messages

master
Bertrik Sikken 2021-09-16 00:48:51 +02:00
rodzic 6a01360214
commit 69ccd9f000
11 zmienionych plików z 80 dodań i 52 usunięć

Wyświetl plik

@ -24,9 +24,11 @@ import nl.sikken.bertrik.hab.Sentence;
import nl.sikken.bertrik.hab.habitat.HabReceiver;
import nl.sikken.bertrik.hab.habitat.HabitatUploader;
import nl.sikken.bertrik.hab.habitat.Location;
import nl.sikken.bertrik.hab.lorawan.HeliumUplinkMessage;
import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage;
import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage.GatewayInfo;
import nl.sikken.bertrik.hab.lorawan.MqttListener;
import nl.sikken.bertrik.hab.lorawan.Ttnv3UplinkMessage;
/**
* Bridge between the-things-network and the habhub network.
@ -37,7 +39,7 @@ public final class TtnHabBridge {
private static final Logger LOG = LoggerFactory.getLogger(TtnHabBridge.class);
private static final String CONFIG_FILE = "ttnhabbridge.yaml";
private final MqttListener ttnListener;
private final List<MqttListener> mqttListeners = new ArrayList<>();
private final HabitatUploader habUploader;
private final PayloadDecoder decoder;
private final ExpiringCache gwCache;
@ -72,14 +74,21 @@ public final class TtnHabBridge {
return config;
}
}
/**
* Constructor.
*
* @param config the application configuration
*/
private TtnHabBridge(TtnHabBridgeConfig config) {
this.ttnListener = new MqttListener(this::handleTTNMessage, config.getTtnConfig());
if (!config.getTtnConfig().getUrl().isEmpty()) {
this.mqttListeners
.add(new MqttListener(this::handleMessage, config.getTtnConfig(), Ttnv3UplinkMessage.class));
}
if (!config.getHeliumConfig().getUrl().isEmpty()) {
this.mqttListeners
.add(new MqttListener(this::handleMessage, config.getHeliumConfig(), HeliumUplinkMessage.class));
}
this.habUploader = HabitatUploader.create(config.getHabitatConfig());
this.decoder = new PayloadDecoder(EPayloadEncoding.parse(config.getPayloadEncoding()));
this.gwCache = new ExpiringCache(Duration.ofSeconds(config.getGwCacheExpirationTime()));
@ -95,18 +104,17 @@ public final class TtnHabBridge {
// start sub-modules
habUploader.start();
ttnListener.start();
for (MqttListener listener : mqttListeners) {
listener.start();
}
LOG.info("Started TTN-HAB bridge application");
}
/**
* Handles an incoming TTN message
*
* @param textMessage the message contents
* @param now message arrival time
* Handles an incoming LoRaWAN message
*/
private void handleTTNMessage(LoraWanUplinkMessage message) {
private void handleMessage(LoraWanUplinkMessage message) {
Instant now = Instant.now();
try {
Sentence sentence = decoder.decode(message);
@ -117,7 +125,7 @@ public final class TtnHabBridge {
for (GatewayInfo gw : message.getGateways()) {
String gwName = gw.getId();
Location gwLocation = gw.getLocation();
HabReceiver receiver = new HabReceiver(gwName, gwLocation);
HabReceiver receiver = new HabReceiver(gwName, gwLocation, message.getNetwork());
receivers.add(receiver);
// send listener data only if it has a valid location and hasn't been sent
@ -144,7 +152,9 @@ public final class TtnHabBridge {
*/
private void stop() {
LOG.info("Stopping TTN HAB bridge application");
ttnListener.stop();
for (MqttListener listener : mqttListeners) {
listener.stop();
}
habUploader.stop();
LOG.info("Stopped TTN HAB bridge application");
}

Wyświetl plik

@ -7,6 +7,7 @@ public final class HabReceiver {
private final String callSign;
private final Location location;
private final String network;
/**
* Constructor.
@ -15,8 +16,13 @@ public final class HabReceiver {
* @param location the location
*/
public HabReceiver(String callSign, Location location) {
this(callSign, location, "LoRaWAN");
}
public HabReceiver(String callSign, Location location, String network) {
this.callSign = callSign;
this.location = location;
this.network = network;
}
public String getCallsign() {
@ -26,6 +32,10 @@ public final class HabReceiver {
public Location getLocation() {
return location;
}
public String getNetwork() {
return network;
}
@Override
public String toString() {

Wyświetl plik

@ -79,8 +79,6 @@ public final class HabitatUploader {
*/
public void start() {
LOG.info("Starting habitat uploader");
LOG.info("Started habitat uploader");
}
/**
@ -89,7 +87,6 @@ public final class HabitatUploader {
public void stop() {
LOG.info("Stopping habitat uploader");
executor.shutdown();
LOG.info("Stopped habitat uploader");
}
/**

Wyświetl plik

@ -35,7 +35,7 @@ public final class ListenerInformationDoc extends ListenerDoc {
protected JsonNode createDataNode() {
ObjectNode node = factory().objectNode();
node.set("callsign", factory().textNode(receiver.getCallsign()));
node.set("radio", factory().textNode("TheThingsNetwork"));
node.set("radio", factory().textNode(receiver.getNetwork()));
double altitude = receiver.getLocation().getAlt();
if (Double.isFinite(altitude)) {
String antenna = String.format(Locale.ROOT, "%.0f m", altitude);

Wyświetl plik

@ -7,8 +7,10 @@ import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage.ILoraWanUplink;
@JsonIgnoreProperties(ignoreUnknown = true)
public final class HeliumUplinkMessage {
public final class HeliumUplinkMessage implements ILoraWanUplink {
@JsonProperty("app_eui")
String appEui = "";
@ -50,10 +52,10 @@ public final class HeliumUplinkMessage {
double snr;
}
// convert this message to the common LoRaWAN uplink message
public LoraWanUplinkMessage toUplinkMessage() {
LoraWanUplinkMessage uplink = new LoraWanUplinkMessage(Instant.ofEpochMilli(reportedAt), appEui, name, fcnt,
port, payload);
@Override
public LoraWanUplinkMessage toLoraWanUplinkMessage() {
LoraWanUplinkMessage uplink = new LoraWanUplinkMessage("Helium", Instant.ofEpochMilli(reportedAt), appEui, name,
fcnt, port, payload);
for (HotSpot hotSpot : hotSpots) {
uplink.addGateway(hotSpot.name.trim(), hotSpot.latitude, hotSpot.longitude, Double.NaN);
}

Wyświetl plik

@ -15,6 +15,7 @@ import nl.sikken.bertrik.hab.habitat.Location;
*/
public final class LoraWanUplinkMessage {
private final String network;
private final Instant time;
private final String appId;
private final String deviceId;
@ -24,7 +25,8 @@ public final class LoraWanUplinkMessage {
private final byte[] payloadRaw;
private final List<GatewayInfo> gateways = new ArrayList<>();
public LoraWanUplinkMessage(Instant time, String appId, String deviceId, int fcnt, int port, byte[] payloadRaw) {
public LoraWanUplinkMessage(String network, Instant time, String appId, String deviceId, int fcnt, int port, byte[] payloadRaw) {
this.network = network;
this.time = Instant.from(time);
this.appId = appId;
this.deviceId = deviceId;
@ -37,6 +39,10 @@ public final class LoraWanUplinkMessage {
payloadFields.put(name, value);
}
public String getNetwork() {
return network;
}
public Instant getTime() {
return Instant.from(time);
}
@ -93,4 +99,9 @@ public final class LoraWanUplinkMessage {
}
// interface for messages that can convert themselves to a LoraWanUplinkMessage
public interface ILoraWanUplink {
LoraWanUplinkMessage toLoraWanUplinkMessage();
}
}

Wyświetl plik

@ -15,6 +15,9 @@ import org.slf4j.LoggerFactory;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage.ILoraWanUplink;
/**
* Listener process for receiving data from an MQTT server.
*/
@ -24,44 +27,44 @@ public final class MqttListener {
private static final long DISCONNECT_TIMEOUT_MS = 3000;
private final IMessageReceived callback;
private final Class<? extends ILoraWanUplink> clazz;
private final MqttClient mqttClient;
private final MqttConnectOptions options;
private final ObjectMapper objectMapper = new ObjectMapper();
public MqttListener(IMessageReceived callback, MqttConfig config) {
this(callback, config.getUrl(), config.getUser(), config.getPass(), config.getTopic());
}
/**
* Constructor.
*
* @param callback the listener for a received message.
* @param url the URL of the MQTT server
* @param appId the user name
* @param appKey the password
* @param topic the MQTT topic
* @param config the MQTT configuration
* @param clazz the JSON class sent over MQTT
*/
public MqttListener(IMessageReceived callback, String url, String appId, String appKey, String topic) {
LOG.info("Creating client for MQTT server '{}' for app '{}'", url, appId);
@SuppressFBWarnings({"EI_EXPOSE_REP2"})
public MqttListener(IMessageReceived callback, MqttConfig config, Class<? extends ILoraWanUplink> clazz) {
LOG.info("Creating client for MQTT server '{}' for app '{}'", config.getUrl(), config.getUser());
try {
this.mqttClient = new MqttClient(url, MqttClient.generateClientId(), new MemoryPersistence());
this.mqttClient = new MqttClient(config.getUrl(), MqttClient.generateClientId(), new MemoryPersistence());
} catch (MqttException e) {
throw new IllegalArgumentException(e);
}
this.callback = callback;
mqttClient.setCallback(new MqttCallbackHandler(mqttClient, topic, this::handleMessage));
this.clazz = clazz;
mqttClient.setCallback(new MqttCallbackHandler(mqttClient, config.getTopic(), this::handleMessage));
// create connect options
options = new MqttConnectOptions();
options.setUserName(appId);
options.setPassword(appKey.toCharArray());
options.setUserName(config.getUser());
options.setPassword(config.getPass().toCharArray());
options.setAutomaticReconnect(true);
}
// notify our caller in a thread safe manner
private void handleMessage(String topic, String payload) {
try {
LoraWanUplinkMessage uplinkMessage = convertMessage(topic, payload);
ILoraWanUplink uplink = objectMapper.readValue(payload, clazz);
LoraWanUplinkMessage uplinkMessage = uplink.toLoraWanUplinkMessage();
callback.messageReceived(uplinkMessage);
} catch (JsonProcessingException e) {
LOG.warn("Caught {}", e.getMessage());
@ -71,21 +74,13 @@ public final class MqttListener {
}
}
// package private for testing
LoraWanUplinkMessage convertMessage(String topic, String payload) throws JsonProcessingException {
Ttnv3UplinkMessage v3message = objectMapper.readValue(payload, Ttnv3UplinkMessage.class);
return v3message.toUplinkMessage();
}
/**
* Starts this module.
*
* @throws MqttException in case something went wrong with MQTT
*/
public void start() throws MqttException {
LOG.info("Starting MQTT listener");
LOG.info("Connecting to MQTT server");
LOG.info("Starting MQTT listener {}", mqttClient.getServerURI());
mqttClient.connect(options);
}

Wyświetl plik

@ -7,6 +7,8 @@ import java.util.List;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import nl.sikken.bertrik.hab.lorawan.LoraWanUplinkMessage.ILoraWanUplink;
/**
* Representation of the TTNv3 uplink message.<br>
* <br>
@ -14,7 +16,7 @@ import com.fasterxml.jackson.annotation.JsonProperty;
* All sub-structures are contained in this file too.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public final class Ttnv3UplinkMessage {
public final class Ttnv3UplinkMessage implements ILoraWanUplink {
@JsonProperty("end_device_ids")
EndDeviceIds endDeviceIds = new EndDeviceIds();
@ -86,8 +88,9 @@ public final class Ttnv3UplinkMessage {
private double altitude = Double.NaN;
}
public LoraWanUplinkMessage toUplinkMessage() {
LoraWanUplinkMessage uplink = new LoraWanUplinkMessage(Instant.parse(receivedAt),
@Override
public LoraWanUplinkMessage toLoraWanUplinkMessage() {
LoraWanUplinkMessage uplink = new LoraWanUplinkMessage("TheThingsNetwork", Instant.parse(receivedAt),
endDeviceIds.applicationIds.applicationId, endDeviceIds.deviceId, uplinkMessage.fcnt,
uplinkMessage.fport, uplinkMessage.payload);
for (RxMetadata metadata : uplinkMessage.rxMetadata) {

Wyświetl plik

@ -29,8 +29,8 @@ public final class PayloadDecoderTest {
*/
@Test
public void testCayenne2() throws DecodeException {
LoraWanUplinkMessage message = new LoraWanUplinkMessage(Instant.parse("2020-02-05T22:00:58.930936Z"), "test", "test",
123, 1, Base64.getDecoder().decode("AYgH1ecAzV4AC7gCZwArAwIBhg=="));
LoraWanUplinkMessage message = new LoraWanUplinkMessage("LoRaWAN", Instant.parse("2020-02-05T22:00:58.930936Z"),
"test", "test", 123, 1, Base64.getDecoder().decode("AYgH1ecAzV4AC7gCZwArAwIBhg=="));
// decode payload
PayloadDecoder decoder = new PayloadDecoder(EPayloadEncoding.CAYENNE);
Sentence sentence = decoder.decode(message);

Wyświetl plik

@ -40,7 +40,7 @@ public final class HeliumUplinkMessageTest {
Assert.assertEquals(-7.5, hotSpot.snr, 0.1);
// decode to LoRaWAN message
LoraWanUplinkMessage lorawan = helium.toUplinkMessage();
LoraWanUplinkMessage lorawan = helium.toLoraWanUplinkMessage();
Assert.assertEquals(Instant.parse("2021-09-12T14:39:25.832Z"), lorawan.getTime());
Assert.assertEquals("6081F9D16837130E", lorawan.getAppId());
Assert.assertEquals("kissmapper", lorawan.getDevId());

Wyświetl plik

@ -20,7 +20,7 @@ public final class Ttnv3UplinkMessageTest {
InputStream is = getClass().getClassLoader().getResourceAsStream("ttnv3_uplink.json");
Ttnv3UplinkMessage message = MAPPER.readValue(is, Ttnv3UplinkMessage.class);
LoraWanUplinkMessage uplinkMessage = message.toUplinkMessage();
LoraWanUplinkMessage uplinkMessage = message.toLoraWanUplinkMessage();
Assert.assertEquals("test2id", uplinkMessage.getAppId());
Assert.assertEquals("v3demo1", uplinkMessage.getDevId());