Move MQTT listener process to its own module.

koppelting
Bertrik Sikken 2017-08-23 12:31:29 +02:00
rodzic 9884c5b949
commit f8cc50d5aa
5 zmienionych plików z 168 dodań i 81 usunięć

Wyświetl plik

@ -11,7 +11,7 @@ public interface ITtnHabBridgeConfig {
String getMqttServerUrl();
String getMqttClientId();
String getMqttUserName();
char[] getMqttPassword();
String getMqttPassword();
String getMqttTopic();
}

Wyświetl plik

@ -3,17 +3,11 @@ package nl.sikken.bertrik;
import java.io.File;
import java.io.IOException;
import java.nio.BufferUnderflowException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -25,39 +19,36 @@ import nl.sikken.bertrik.hab.habitat.HabReceiver;
import nl.sikken.bertrik.hab.habitat.HabitatUploader;
import nl.sikken.bertrik.hab.habitat.IHabitatRestApi;
import nl.sikken.bertrik.hab.habitat.Location;
import nl.sikken.bertrik.hab.ttn.TtnListener;
import nl.sikken.bertrik.hab.ttn.TtnMessage;
import nl.sikken.bertrik.hab.ttn.TtnMessageGateway;
/**
* Bridge between the-things-network and the habhub network.
*
* Possible improvements:
* - put the MQTT functionality in its own module
* - add uncaught exception handler
* - add example systemd startup scripts
* Possible improvements:
* - add uncaught exception handler
*/
public final class TtnHabBridge {
private static final Logger LOG = LoggerFactory.getLogger(TtnHabBridge.class);
private static final String CONFIG_FILE = "ttnhabbridge.properties";
private final ITtnHabBridgeConfig config;
private final HabitatUploader uploader;
private final TtnListener ttnListener;
private final HabitatUploader habUploader;
private final ObjectMapper mapper;
private MqttClient mqttClient;
/**
* Main application entry point.
*
* @param arguments application arguments (none taken)
* @throws IOException in case of a problem reading a config file
* @throws IOException in case of a problem reading a config file
* @throws MqttException in case of a problem starting MQTT client
*/
public static void main(String[] arguments) throws IOException, MqttException {
final ITtnHabBridgeConfig config = readConfig(new File(CONFIG_FILE));
final TtnHabBridge app = new TtnHabBridge(config);
app.start();
Runtime.getRuntime().addShutdownHook(new Thread(app::stop));
}
@ -68,93 +59,70 @@ public final class TtnHabBridge {
* @param config the application configuration
*/
private TtnHabBridge(ITtnHabBridgeConfig config) {
this.config = config;
final IHabitatRestApi restApi = HabitatUploader.newRestClient(config.getHabitatUrl(), config.getHabitatTimeout());
this.uploader = new HabitatUploader(restApi);
this.ttnListener = new TtnListener(this::handleTTNMessage, config.getMqttServerUrl(), config.getMqttUserName(),
config.getMqttPassword(), config.getMqttTopic());
final IHabitatRestApi restApi =
HabitatUploader.newRestClient(config.getHabitatUrl(), config.getHabitatTimeout());
this.habUploader = new HabitatUploader(restApi);
this.mapper = new ObjectMapper();
}
/**
* Starts the application.
*
* @throws MqttException in case of a problem starting MQTT client
*/
private void start() throws MqttException {
LOG.info("Starting TTN-HAB bridge application");
// start sub-modules
uploader.start();
// start MQTT client
this.mqttClient = new MqttClient(config.getMqttServerUrl(), config.getMqttClientId());
final MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(config.getMqttUserName());
options.setPassword(config.getMqttPassword());
options.setAutomaticReconnect(true);
mqttClient.connect(options);
mqttClient.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
final String payload = new String(message.getPayload(), StandardCharsets.US_ASCII);
LOG.info("Message arrived on topic {}: {}", topic, payload);
try {
handleMessageArrived(topic, payload);
} catch (Exception e) {
LOG.info("Exception in message handling: {}", e.getMessage());
}
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
LOG.info("deliveryComplete");
// we don't care
}
@Override
public void connectionLost(Throwable cause) {
LOG.info("connectionLost: {}", cause.getMessage());
}
});
mqttClient.subscribe(config.getMqttTopic());
habUploader.start();
ttnListener.start();
LOG.info("Started TTN-HAB bridge application");
}
private void handleMessageArrived(String topic, String message) {
/**
* Handles an incoming TTN message
*
* @param topic the topic on which the message was received
* @param message the message contents
*/
private void handleTTNMessage(String topic, String message) {
final Date now = new Date();
try {
// try to decode the payload
final TtnMessage data = mapper.readValue(message, TtnMessage.class);
final SodaqOnePayload sodaq = SodaqOnePayload.parse(data.getPayload());
LOG.info("Got SODAQ message: {}", sodaq);
final String callSign = data.getDevId();
final int id = data.getCounter();
final double latitude = sodaq.getLatitude();
final double longitude = sodaq.getLongitude();
final double altitude = sodaq.getAltitude();
// get the payload coordinates and construct a sentence
final Sentence sentence = new Sentence(callSign, id, now, latitude, longitude, altitude);
final String line = sentence.format();
// create listeners
final List<HabReceiver> receivers = new ArrayList<>();
for (TtnMessageGateway gw : data.getMetaData().getMqttGateways()) {
final HabReceiver receiver =
new HabReceiver(gw.getId(), new Location(gw.getLatitude(), gw.getLongitude(), gw.getAltitude()));
final HabReceiver receiver = new HabReceiver(gw.getId(),
new Location(gw.getLatitude(), gw.getLongitude(), gw.getAltitude()));
receivers.add(receiver);
}
// send listener data
for (HabReceiver receiver : receivers) {
uploader.scheduleListenerDataUpload(receiver, now);
habUploader.scheduleListenerDataUpload(receiver, now);
}
// send payload telemetry data
uploader.schedulePayloadTelemetryUpload(line, receivers, now);
habUploader.schedulePayloadTelemetryUpload(line, receivers, now);
} catch (IOException e) {
LOG.warn("JSON unmarshalling exception '{}' for {}", e.getMessage(), message);
} catch (BufferUnderflowException e) {
@ -164,17 +132,13 @@ public final class TtnHabBridge {
/**
* Stops the application.
* @throws MqttException
*
* @throws MqttException
*/
private void stop() {
LOG.info("Stopping TTN HAB bridge application");
try {
mqttClient.close();
} catch (MqttException e) {
// what can we do about this?
LOG.warn("Error closing MQTT client: {}", e.getMessage());
}
uploader.stop();
ttnListener.stop();
habUploader.stop();
}
private static ITtnHabBridgeConfig readConfig(File file) throws IOException {

Wyświetl plik

@ -111,8 +111,8 @@ public final class TtnHabBridgeConfig implements ITtnHabBridgeConfig {
}
@Override
public char[] getMqttPassword() {
return props.get(EConfigItem.MQTT_USER_PASS).toCharArray();
public String getMqttPassword() {
return props.get(EConfigItem.MQTT_USER_PASS);
}
@Override

Wyświetl plik

@ -0,0 +1,16 @@
package nl.sikken.bertrik.hab.ttn;
/**
* Interface of the callback from the TTN listener.
*/
public interface IMessageReceived {
/**
* Indicates that a message was received.
*
* @param topic the topic
* @param message the message
*/
void messageReceived(String topic, String message);
}

Wyświetl plik

@ -0,0 +1,107 @@
package nl.sikken.bertrik.hab.ttn;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Listener process for receiving data from the TTN.
*/
public final class TtnListener {
private static Logger LOG = LoggerFactory.getLogger(TtnListener.class);
private static final long DISCONNECT_TIMEOUT_MS = 3000;
private final String clientId;
private final IMessageReceived callback;
private final String url;
private final String userName;
private final String password;
private final String topic;
private MqttClient mqttClient;
/**
* Constructor.
*
* @param receiveCallback the interface for indicating a received message.
* @param url the URL of the MQTT server
* @param userName the user name
* @param password the password
* @param topic the MQTT topic
*/
public TtnListener(IMessageReceived receiveCallback, String url, String userName, String password, String topic) {
this.callback = receiveCallback;
this.url = url;
this.clientId = UUID.randomUUID().toString();
this.userName = userName;
this.password = password;
this.topic = topic;
}
/**
* Starts this module.
*
* @throws MqttException
*/
public void start() throws MqttException {
LOG.info("Starting TTN listener");
// connect
LOG.info("Connecting as user '{}' to MQTT server {}", userName, url);
this.mqttClient = new MqttClient(url, clientId);
final MqttConnectOptions options = new MqttConnectOptions();
options.setUserName(userName);
options.setPassword(password.toCharArray());
options.setAutomaticReconnect(true);
mqttClient.connect(options);
// subscribe
LOG.info("Subscribing to topic '{}'", topic);
mqttClient.setCallback(new MqttCallback() {
@Override
public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
final String message = new String(mqttMessage.getPayload(), StandardCharsets.US_ASCII);
LOG.info("Message arrived on topic {}: {}", topic, message);
callback.messageReceived(topic, message);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
// we don't care
}
@Override
public void connectionLost(Throwable cause) {
LOG.info("connectionLost: {}", cause.getMessage());
}
});
mqttClient.subscribe(topic);
LOG.info("Started TTN listener");
}
/**
* Stops this module.
*/
public void stop() {
LOG.info("Stopping TTN listener");
try {
mqttClient.disconnect(DISCONNECT_TIMEOUT_MS);
mqttClient.close();
} catch (MqttException e) {
// don't care, just log
LOG.warn("Caught exception while shutting down", e.getMessage());
}
LOG.info("Stopped TTN listener");
}
}