From 37a3fac96ebd352302bbb986f7af5b62b9fe0b43 Mon Sep 17 00:00:00 2001 From: eironside Date: Thu, 12 Mar 2020 16:21:14 -0600 Subject: [PATCH] Fixed Connection Parameter Bug Fixed bug where connection options not being passed on connect. Updated logging Updated paho library to 1.2.2 formatting and minor changes to poms --- mqtt-transport/pom.xml | 83 ++- .../transport/mqtt/MqttClientManager.java | 42 +- .../transport/mqtt/MqttInboundTransport.java | 529 +++++++++--------- .../mqtt/MqttInboundTransportService.java | 22 +- .../transport/mqtt/MqttOutboundTransport.java | 425 +++++++------- .../mqtt/MqttOutboundTransportService.java | 20 +- .../resources/OSGI-INF/blueprint/config.xml | 18 +- .../transport/mqtt-transport.properties | 88 +-- .../transport/mqtt-transport_en_US.properties | 88 +-- .../mqtt-inbound-transport-definition.xml | 61 +- .../mqtt-outbound-transport-definition.xml | 66 +-- pom.xml | 115 ++-- 12 files changed, 772 insertions(+), 785 deletions(-) diff --git a/mqtt-transport/pom.xml b/mqtt-transport/pom.xml index 6d84878..ea99c00 100644 --- a/mqtt-transport/pom.xml +++ b/mqtt-transport/pom.xml @@ -1,45 +1,42 @@ - - 4.0.0 - - com.esri.geoevent.parent - mqtt - 10.4.0 - - - com.esri.geoevent.transport - mqtt-transport - Esri :: GeoEvent :: Transport :: MQTT Transport - bundle - - - - org.eclipse.paho - org.eclipse.paho.client.mqttv3 - 1.1.0 - - - - - - - org.apache.felix - maven-bundle-plugin - true - - - ${project.groupId}.${project.artifactId} - ${contact.address} - ${project.version} - - !com.ibm.mqttdirect.modules.local.bindings;* - - *;scope=compile|runtime;inline=true - - com.esri.geoevent.transport.mqtt - - - - - + + 4.0.0 + + com.esri.geoevent.parent + mqtt + 10.4.0 + + com.esri.geoevent.transport + mqtt-transport + Esri :: GeoEvent :: Transport :: MQTT Transport + bundle + + + org.eclipse.paho + org.eclipse.paho.client.mqttv3 + 1.2.2 + + + + + + org.apache.felix + maven-bundle-plugin + true + + + ${project.groupId}.${project.artifactId} + ${contact.address} + ${project.version} + + !com.ibm.mqttdirect.modules.local.bindings;* + *;scope=compile|runtime;inline=true + + com.esri.geoevent.transport.mqtt + + + + + \ No newline at end of file diff --git a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttClientManager.java b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttClientManager.java index 60ac9f8..c80a781 100644 --- a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttClientManager.java +++ b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttClientManager.java @@ -50,7 +50,7 @@ public class MqttClientManager private String topic; private int qos; private String username; - private char[] password; + private String password; private boolean retain; public MqttClientManager(BundleLogger logger) @@ -97,7 +97,8 @@ public void applyProperties(TransportBase transport) throws Exception String value = (String) transport.getProperty("password").getDecryptedValue(); if (value != null) { - password = value.toCharArray(); + // password = value.toCharArray(); + password = value; } } @@ -114,7 +115,7 @@ public void applyProperties(TransportBase transport) throws Exception } else { - log.warn("Property value for QOS is not valid ({0}). Using default of 0 instead.", value); + log.debug("Property value for QOS is not valid ({0}). Using default of 0 instead.", value); } } catch (NumberFormatException e) @@ -137,6 +138,9 @@ public void applyProperties(TransportBase transport) throws Exception } } } + + if (log.isTraceEnabled()) + log.trace(this.toString()); } /** @@ -167,14 +171,22 @@ public MqttClient createMqttClient(MqttCallback callback) throws MqttException mqttClient.setCallback(callback); } + getMqttClientOptions(); + + // Let the caller connect so they can handle connection failures and reconnects. + return mqttClient; + } + + public MqttConnectOptions getMqttClientOptions() + { MqttConnectOptions options = new MqttConnectOptions(); // Connect with username and password if both are available. - if (username != null && password != null && !username.isEmpty() && password.length > 0) + if (username != null && password != null && !username.isEmpty() && !password.isEmpty()) { log.trace("Connecting to MQTT Broker using credentials. Username={0}", username); options.setUserName(username); - options.setPassword(password); + options.setPassword(password.toCharArray()); } if (ssl) @@ -187,9 +199,7 @@ public MqttClient createMqttClient(MqttCallback callback) throws MqttException } options.setCleanSession(true); - - // Let the caller connect so they can handle connection failures and reconnects. - return mqttClient; + return options; } /** @@ -215,7 +225,7 @@ public void disconnectMqtt(MqttClient mqttClient) } catch (MqttException e) { - log.error("UNABLE_TO_CLOSE", e); + log.debug("UNABLE_TO_CLOSE", e); } finally { @@ -250,22 +260,22 @@ public boolean isTopicValid(String topic) } else { - log.error("GeoEvent TOPIC = {0}. ERROR, the topic must be more than one character long or equal to '/'.", topic); + log.debug("GeoEvent TOPIC = {0}. ERROR, the topic must be more than one character long or equal to '/'.", topic); } } else { - log.error("GeoEvent TOPIC = {0}. ERROR, cannot contain the '$' symbol.", topic); + log.debug("GeoEvent TOPIC = {0}. ERROR, cannot contain the '$' symbol.", topic); } } else { - log.error("GeoEvent TOPIC cannot be EMPTY."); + log.debug("GeoEvent TOPIC cannot be EMPTY."); } } else { - log.error("GeoEvent TOPIC cannot be NULL."); + log.debug("GeoEvent TOPIC cannot be NULL."); } return result; } @@ -332,4 +342,10 @@ public boolean isRetain() { return retain; } + + @Override + public String toString() + { + return "MqttClientManager [log=" + log + ", port=" + port + ", host=" + host + ", ssl=" + ssl + ", topic=" + topic + ", qos=" + qos + ", username=" + username + ", password=" + password + ", retain=" + retain + "]"; + } } diff --git a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttInboundTransport.java b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttInboundTransport.java index cb03bf9..2f21033 100644 --- a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttInboundTransport.java +++ b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttInboundTransport.java @@ -1,263 +1,266 @@ -/* - Copyright 1995-2019 Esri - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - For additional information, contact: - Environmental Systems Research Institute, Inc. - Attn: Contracts Dept - 380 New York Street - Redlands, California, USA 92373 - - email: contracts@esri.com -*/ - -package com.esri.geoevent.transport.mqtt; - -import java.nio.BufferOverflowException; -import java.nio.ByteBuffer; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; -import org.eclipse.paho.client.mqttv3.MqttCallback; -import org.eclipse.paho.client.mqttv3.MqttClient; -import org.eclipse.paho.client.mqttv3.MqttMessage; - -import com.esri.ges.core.component.ComponentException; -import com.esri.ges.core.component.RunningException; -import com.esri.ges.core.component.RunningState; -import com.esri.ges.framework.i18n.BundleLogger; -import com.esri.ges.framework.i18n.BundleLoggerFactory; -import com.esri.ges.transport.InboundTransportBase; -import com.esri.ges.transport.TransportDefinition; - -public class MqttInboundTransport extends InboundTransportBase implements Runnable -{ - - private static final BundleLogger log = BundleLoggerFactory.getLogger(MqttInboundTransport.class); - - private MqttClientManager mqttClientManager = new MqttClientManager(log); - private MqttClient mqttClient; - private ScheduledExecutorService executor; - private boolean isStarted = false; - - public MqttInboundTransport(TransportDefinition definition) throws ComponentException - { - super(definition); - } - - public void start() throws RunningException - { - isStarted = true; - if (getRunningState() == RunningState.STOPPED) - { - log.trace("Starting Transport"); - - this.setRunningState(RunningState.STARTING); - try - { - executor = Executors.newSingleThreadScheduledExecutor(); - executor.scheduleAtFixedRate(this, 1, 3, TimeUnit.SECONDS); - - // STARTED state is set in the thread - } - catch (Exception e) - { - String errormsg = log.translate("{0} {1}", log.translate("INIT_ERROR", "inbound"), e.getMessage()); - log.error(errormsg, e); - setErrorMessage(errormsg); - stop(); - setRunningState(RunningState.ERROR); - } - } - } - - @Override - public void run() - { - if (mqttClient == null || !mqttClient.isConnected()) - { - log.info("Creating new MQTT Client"); - try - { - if (mqttClient != null) - { - try - { - log.info("Disconnecting previous MQTT Client"); - mqttClientManager.disconnectMqtt(mqttClient); - } - finally - { - mqttClient = null; - } - } - - mqttClientManager.applyProperties(this); - mqttClient = mqttClientManager.createMqttClient(new MqttCallback() - { - - @Override - public void messageArrived(String topic, MqttMessage message) throws Exception - { - try - { - log.trace("Message arrived on topic {0}: ( {1} )", topic, message); - receive(message.getPayload()); - } - catch (RuntimeException e) - { - log.warn("ERROR_PUBLISHING", e); - } - } - - @Override - public void deliveryComplete(IMqttDeliveryToken token) - { - // not used - } - - @Override - public void connectionLost(Throwable cause) - { - log.warn("CONNECTION_LOST", cause, cause.getLocalizedMessage()); - } - }); - - mqttClient.connect(); - mqttClient.subscribe(mqttClientManager.getTopic(), mqttClientManager.getQos()); - - setRunningState(RunningState.STARTED); - log.info("Transport started mqtt client. Transport state set to STARTED."); - - } - catch (Throwable ex) - { - String errormsg = log.translate("UNEXPECTED_ERROR", ex.getMessage()); - log.error(errormsg, ex); - setErrorMessage(errormsg); - disconnectClient(); - setRunningState(RunningState.ERROR); - } - } - } - - private void receive(byte[] bytes) - { - if (bytes != null && bytes.length > 0) - { - log.debug("Received {0} bytes", bytes.length); - - String str = new String(bytes); - str = str + '\n'; - - log.trace("Byte String received {0}", str); - - byte[] newBytes = str.getBytes(); - - ByteBuffer bb = ByteBuffer.allocate(newBytes.length); - try - { - bb.put(newBytes); - bb.flip(); - byteListener.receive(bb, ""); - bb.clear(); - - log.trace("{0} received bytes sent on to the adaptor.", newBytes.length); - } - catch (BufferOverflowException boe) - { - log.error("BUFFER_OVERFLOW_ERROR", boe); - bb.clear(); - } - catch (Exception e) - { - log.error("UNEXPECTED_ERROR2", e); - disconnectClient(); - setRunningState(RunningState.ERROR); - setErrorMessage("Unexcpected Error: " + e.getMessage()); - } - } - } - - public synchronized void stop() - { - isStarted = false; - if (getRunningState() != RunningState.STOPPING && getRunningState() != RunningState.STOPPED) - { - setRunningState(RunningState.STOPPING); - log.trace("Stopping Transport"); - - if (executor != null) - { - try - { - executor.shutdownNow(); - executor.awaitTermination(3, TimeUnit.SECONDS); - } - catch (Throwable e) - { // pass - } - finally - { - executor = null; - } - } - - disconnectClient(); - - log.info("Transport stopped mqtt client and worker thread. Transport state set to STOPPED."); - } - setRunningState(RunningState.STOPPED); - } - - @Override - public boolean isClusterable() - { - return false; - } - - private void disconnectClient() - { - try - { - mqttClientManager.disconnectMqtt(mqttClient); - } - catch (Throwable e) - { // pass - } - finally - { - mqttClient = null; - } - } - - @Override - public void afterPropertiesSet() - { - log.info("Setting Prpoerties, resetting client, and updating state"); - super.afterPropertiesSet(); - disconnectClient(); - setErrorMessage(""); - if (isStarted) - { - setRunningState(RunningState.STARTED); - } - else - { - setRunningState(RunningState.STOPPED); - } - } -} +/* + Copyright 1995-2019 Esri + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + For additional information, contact: + Environmental Systems Research Institute, Inc. + Attn: Contracts Dept + 380 New York Street + Redlands, California, USA 92373 + + email: contracts@esri.com +*/ + +package com.esri.geoevent.transport.mqtt; + +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken; +import org.eclipse.paho.client.mqttv3.MqttCallback; +import org.eclipse.paho.client.mqttv3.MqttClient; +import org.eclipse.paho.client.mqttv3.MqttConnectOptions; +import org.eclipse.paho.client.mqttv3.MqttMessage; + +import com.esri.ges.core.component.ComponentException; +import com.esri.ges.core.component.RunningException; +import com.esri.ges.core.component.RunningState; +import com.esri.ges.framework.i18n.BundleLogger; +import com.esri.ges.framework.i18n.BundleLoggerFactory; +import com.esri.ges.transport.InboundTransportBase; +import com.esri.ges.transport.TransportDefinition; + +public class MqttInboundTransport extends InboundTransportBase implements Runnable +{ + + private static final BundleLogger log = BundleLoggerFactory.getLogger(MqttInboundTransport.class); + + private MqttClientManager mqttClientManager = new MqttClientManager(log); + private MqttClient mqttClient; + private ScheduledExecutorService executor; + private boolean isStarted = false; + + public MqttInboundTransport(TransportDefinition definition) throws ComponentException + { + super(definition); + } + + public void start() throws RunningException + { + isStarted = true; + if (getRunningState() == RunningState.STOPPED) + { + log.trace("Starting Transport"); + + this.setRunningState(RunningState.STARTING); + try + { + executor = Executors.newSingleThreadScheduledExecutor(); + executor.scheduleAtFixedRate(this, 1, 3, TimeUnit.SECONDS); + + // STARTED state is set in the thread + } + catch (Exception e) + { + String errormsg = log.translate("{0} {1}", log.translate("INIT_ERROR", "inbound"), e.getMessage()); + log.debug(errormsg, e); + setErrorMessage(errormsg); + stop(); + setRunningState(RunningState.ERROR); + } + } + } + + @Override + public void run() + { + if (mqttClient == null || !mqttClient.isConnected()) + { + log.trace("Creating new MQTT Client"); + try + { + if (mqttClient != null) + { + try + { + log.trace("Disconnecting previous MQTT Client"); + mqttClientManager.disconnectMqtt(mqttClient); + } + finally + { + mqttClient = null; + } + } + + mqttClientManager.applyProperties(this); + MqttConnectOptions connectOptions = mqttClientManager.getMqttClientOptions(); + mqttClient = mqttClientManager.createMqttClient(new MqttCallback() + { + + @Override + public void messageArrived(String topic, MqttMessage message) throws Exception + { + try + { + log.debug("Message arrived on topic {0}: ( {1} )", topic, message); + receive(message.getPayload()); + } + catch (RuntimeException e) + { + log.debug("ERROR_PUBLISHING", e); + } + } + + @Override + public void deliveryComplete(IMqttDeliveryToken token) + { + // not used + } + + @Override + public void connectionLost(Throwable cause) + { + log.debug("CONNECTION_LOST", cause, cause.getLocalizedMessage()); + } + }); + + log.trace("Connecting to mqtt using {}", mqttClientManager); + mqttClient.connect(connectOptions); + mqttClient.subscribe(mqttClientManager.getTopic(), mqttClientManager.getQos()); + + setRunningState(RunningState.STARTED); + log.trace("Transport started mqtt client. Transport state set to STARTED."); + + } + catch (Throwable ex) + { + String errormsg = log.translate("UNEXPECTED_ERROR", ex.getMessage()); + log.debug(errormsg, ex); + setErrorMessage(errormsg); + disconnectClient(); + setRunningState(RunningState.ERROR); + } + } + } + + private void receive(byte[] bytes) + { + if (bytes != null && bytes.length > 0) + { + log.trace("Received {0} bytes", bytes.length); + + String str = new String(bytes); + str = str + '\n'; + + log.trace("Byte String received {0}", str); + + byte[] newBytes = str.getBytes(); + + ByteBuffer bb = ByteBuffer.allocate(newBytes.length); + try + { + bb.put(newBytes); + bb.flip(); + byteListener.receive(bb, ""); + bb.clear(); + + log.trace("{0} received bytes sent on to the adaptor.", newBytes.length); + } + catch (BufferOverflowException boe) + { + log.debug("BUFFER_OVERFLOW_ERROR", boe); + bb.clear(); + } + catch (Exception e) + { + log.debug("UNEXPECTED_ERROR2", e); + disconnectClient(); + setRunningState(RunningState.ERROR); + setErrorMessage("Unexcpected Error: " + e.getMessage()); + } + } + } + + public synchronized void stop() + { + isStarted = false; + if (getRunningState() != RunningState.STOPPING && getRunningState() != RunningState.STOPPED) + { + setRunningState(RunningState.STOPPING); + log.trace("Stopping Transport"); + + if (executor != null) + { + try + { + executor.shutdownNow(); + executor.awaitTermination(3, TimeUnit.SECONDS); + } + catch (Throwable e) + { // pass + } + finally + { + executor = null; + } + } + + disconnectClient(); + + log.trace("Transport stopped mqtt client and worker thread. Transport state set to STOPPED."); + } + setRunningState(RunningState.STOPPED); + } + + @Override + public boolean isClusterable() + { + return false; + } + + private void disconnectClient() + { + try + { + mqttClientManager.disconnectMqtt(mqttClient); + } + catch (Throwable e) + { // pass + } + finally + { + mqttClient = null; + } + } + + @Override + public void afterPropertiesSet() + { + log.trace("Setting Prpoerties, resetting client, and updating state"); + super.afterPropertiesSet(); + disconnectClient(); + setErrorMessage(""); + if (isStarted) + { + setRunningState(RunningState.STARTED); + } + else + { + setRunningState(RunningState.STOPPED); + } + } +} diff --git a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttInboundTransportService.java b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttInboundTransportService.java index 3c6ce22..77b5e69 100644 --- a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttInboundTransportService.java +++ b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttInboundTransportService.java @@ -31,13 +31,17 @@ public class MqttInboundTransportService extends TransportServiceBase { - public MqttInboundTransportService() - { - definition = new XmlTransportDefinition(getResourceAsStream("mqtt-inbound-transport-definition.xml")); - } - - public Transport createTransport() throws ComponentException - { - return new MqttInboundTransport(definition); - } + + public MqttInboundTransportService() + { + XmlTransportDefinition xmlAdapterDefinition = new XmlTransportDefinition(getResourceAsStream("mqtt-inbound-transport-definition.xml")); + definition = xmlAdapterDefinition; + + } + + public Transport createTransport() throws ComponentException + { + return new MqttInboundTransport(definition); + } + } diff --git a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttOutboundTransport.java b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttOutboundTransport.java index df6658f..8be57ce 100644 --- a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttOutboundTransport.java +++ b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttOutboundTransport.java @@ -1,213 +1,212 @@ -/* - Copyright 1995-2019 Esri - - Licensed under the Apache License, Version 2.0 (the "License"); - you may not use this file except in compliance with the License. - You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - - For additional information, contact: - Environmental Systems Research Institute, Inc. - Attn: Contracts Dept - 380 New York Street - Redlands, California, USA 92373 - - email: contracts@esri.com -*/ - -package com.esri.geoevent.transport.mqtt; - -import java.nio.ByteBuffer; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.eclipse.paho.client.mqttv3.MqttClient; - -import com.esri.ges.core.component.ComponentException; -import com.esri.ges.core.component.RunningException; -import com.esri.ges.core.component.RunningState; -import com.esri.ges.core.geoevent.GeoEvent; -import com.esri.ges.framework.i18n.BundleLogger; -import com.esri.ges.framework.i18n.BundleLoggerFactory; -import com.esri.ges.transport.GeoEventAwareTransport; -import com.esri.ges.transport.OutboundTransportBase; -import com.esri.ges.transport.TransportDefinition; - -public class MqttOutboundTransport extends OutboundTransportBase implements GeoEventAwareTransport, Runnable -{ - - private static final BundleLogger log = BundleLoggerFactory.getLogger(MqttOutboundTransport.class); - - private final MqttClientManager mqttClientManager = new MqttClientManager(log); - private MqttClient mqttClient; - private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - private boolean isStarted = false; - - public MqttOutboundTransport(TransportDefinition definition) throws ComponentException - { - super(definition); - } - - @Override - public void start() throws RunningException - { - isStarted = true; - if (getRunningState() == RunningState.STOPPED) - { - log.trace("Starting MQTT Outbound Transport"); - setRunningState(RunningState.STARTING); - - try - { - mqttClientManager.applyProperties(this); - mqttClient = mqttClientManager.createMqttClient(); - mqttClient.connect(); - - setRunningState(RunningState.STARTED); - log.info("Transport started mqtt client. Transport state set to STARTED."); - } - catch (Exception e) - { - String errormsg = log.translate("{0} {1}", log.translate("INIT_ERROR", "outbound"), e.getMessage()); - log.error(errormsg, e); - setRunningState(RunningState.ERROR); - setErrorMessage(errormsg); - } - } - else - { - log.info("Cannot start transport: Not in STOPPED state."); - } - } - - @Override - public void receive(ByteBuffer buffer, String channelId) - { - receive(buffer, channelId, null); - } - - @Override - public void receive(ByteBuffer buffer, String channelID, GeoEvent geoEvent) - { - log.trace("receive {0}: {1}", channelID, geoEvent); - - String topic = mqttClientManager.getTopic(); - if (geoEvent != null && topic.contains("$")) - { - log.trace("received geoEvent, creating output topic from field values using template {0}", topic); - // Do field value substitution like "${field1}/${field2}" - topic = geoEvent.formatString(topic); - } - - log.trace("Publishing outgoing bytes to topic {0}", topic); - if (mqttClientManager.isTopicValid(topic)) - { - try - { - byte[] b = new byte[buffer.remaining()]; - buffer.get(b); - - if (mqttClient == null || !mqttClient.isConnected()) - { - mqttClientManager.disconnectMqtt(mqttClient); - mqttClientManager.applyProperties(this); - mqttClient = mqttClientManager.createMqttClient(); - mqttClient.connect(); - } - mqttClient.publish(topic, b, mqttClientManager.getQos(), mqttClientManager.isRetain()); - setErrorMessage(null); - } - catch (Exception e) - { - try - { - String errormsg = log.translate("ERROR_PUBLISHING", e.getMessage()); - log.error(errormsg, e); - setErrorMessage(errormsg); - mqttClientManager.disconnectMqtt(mqttClient); - setRunningState(RunningState.ERROR); - executor.schedule(this, 3, TimeUnit.SECONDS); - } - finally - { - mqttClient = null; - } - } - } - else - { - log.warn("GeoEvent Topic {0} is not valid, GeoEvent not published to MQTT output: {1}", topic, geoEvent); - } - } - - @Override - public synchronized void stop() - { - isStarted = false; - log.trace("Stopping Transport"); - if (getRunningState() != RunningState.STOPPING && getRunningState() != RunningState.STOPPED) - { - setRunningState(RunningState.STOPPING); - - disconnectClient(); - - log.info("Transport stopped mqtt client. Transport state set to STOPPED."); - } - setRunningState(RunningState.STOPPED); - } - - private void disconnectClient() - { - try - { - mqttClientManager.disconnectMqtt(mqttClient); - } - catch (Throwable e) - { // pass - } - finally - { - mqttClient = null; - } - } - - @Override - public void run() - { - if (getRunningState() == RunningState.ERROR) - { - setRunningState(RunningState.STARTED); - } - } - - /* - * (non-Javadoc) - * - * @see com.esri.ges.transport.TransportBase#afterPropertiesSet() - */ - @Override - public void afterPropertiesSet() - { - log.info("Setting Prpoerties, resetting client, and updating state"); - super.afterPropertiesSet(); - disconnectClient(); - setErrorMessage(""); - if (isStarted) - { - setRunningState(RunningState.STARTED); - } - else - { - setRunningState(RunningState.STOPPED); - } - } - -} +/* + Copyright 1995-2019 Esri + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + For additional information, contact: + Environmental Systems Research Institute, Inc. + Attn: Contracts Dept + 380 New York Street + Redlands, California, USA 92373 + + email: contracts@esri.com +*/ + +package com.esri.geoevent.transport.mqtt; + +import java.nio.ByteBuffer; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import org.eclipse.paho.client.mqttv3.MqttClient; + +import com.esri.ges.core.component.ComponentException; +import com.esri.ges.core.component.RunningException; +import com.esri.ges.core.component.RunningState; +import com.esri.ges.core.geoevent.GeoEvent; +import com.esri.ges.framework.i18n.BundleLogger; +import com.esri.ges.framework.i18n.BundleLoggerFactory; +import com.esri.ges.transport.GeoEventAwareTransport; +import com.esri.ges.transport.OutboundTransportBase; +import com.esri.ges.transport.TransportDefinition; + +public class MqttOutboundTransport extends OutboundTransportBase implements GeoEventAwareTransport, Runnable +{ + + private static final BundleLogger log = BundleLoggerFactory.getLogger(MqttOutboundTransport.class); + + private final MqttClientManager mqttClientManager = new MqttClientManager(log); + private MqttClient mqttClient; + private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); + private boolean isStarted = false; + + public MqttOutboundTransport(TransportDefinition definition) throws ComponentException + { + super(definition); + } + + @Override + public void start() throws RunningException + { + isStarted = true; + if (getRunningState() == RunningState.STOPPED) + { + log.trace("Starting MQTT Outbound Transport"); + setRunningState(RunningState.STARTING); + + try + { + mqttClientManager.applyProperties(this); + mqttClient = mqttClientManager.createMqttClient(); + mqttClient.connect(); + + setRunningState(RunningState.STARTED); + log.trace("Transport started mqtt client. Transport state set to STARTED."); + } + catch (Exception e) + { + String errormsg = log.translate("{0} {1}", log.translate("INIT_ERROR", "outbound"), e.getMessage()); + log.error(errormsg, e); + setRunningState(RunningState.ERROR); + setErrorMessage(errormsg); + } + } + else + { + log.trace("Cannot start transport: Not in STOPPED state."); + } + } + + @Override + public void receive(ByteBuffer buffer, String channelId) + { + receive(buffer, channelId, null); + } + + @Override + public void receive(ByteBuffer buffer, String channelID, GeoEvent geoEvent) + { + String topic = mqttClientManager.getTopic(); + + if (geoEvent != null && topic.contains("$")) + { + log.trace("received geoEvent, creating output topic from field values using template {0}", topic); + // Do field value substitution like "${field1}/${field2}" + topic = geoEvent.formatString(topic); + } + log.debug("Publishing outgoing bytes to topic {0}: {1}", topic, geoEvent); + + if (mqttClientManager.isTopicValid(topic)) + { + try + { + byte[] b = new byte[buffer.remaining()]; + buffer.get(b); + + if (mqttClient == null || !mqttClient.isConnected()) + { + mqttClientManager.disconnectMqtt(mqttClient); + mqttClientManager.applyProperties(this); + mqttClient = mqttClientManager.createMqttClient(); + mqttClient.connect(); + } + mqttClient.publish(topic, b, mqttClientManager.getQos(), mqttClientManager.isRetain()); + setErrorMessage(null); + } + catch (Exception e) + { + try + { + String errormsg = log.translate("ERROR_PUBLISHING", e.getMessage()); + log.debug(errormsg, e); + setErrorMessage(errormsg); + mqttClientManager.disconnectMqtt(mqttClient); + setRunningState(RunningState.ERROR); + executor.schedule(this, 3, TimeUnit.SECONDS); + } + finally + { + mqttClient = null; + } + } + } + else + { + log.debug("GeoEvent Topic {0} is not valid, GeoEvent not published to MQTT output: {1}", topic, geoEvent); + } + } + + @Override + public synchronized void stop() + { + isStarted = false; + log.trace("Stopping Transport"); + if (getRunningState() != RunningState.STOPPING && getRunningState() != RunningState.STOPPED) + { + setRunningState(RunningState.STOPPING); + + disconnectClient(); + + log.trace("Transport stopped mqtt client. Transport state set to STOPPED."); + } + setRunningState(RunningState.STOPPED); + } + + private void disconnectClient() + { + try + { + mqttClientManager.disconnectMqtt(mqttClient); + } + catch (Throwable e) + { // pass + } + finally + { + mqttClient = null; + } + } + + @Override + public void run() + { + if (getRunningState() == RunningState.ERROR) + { + setRunningState(RunningState.STARTED); + } + } + + /* + * (non-Javadoc) + * + * @see com.esri.ges.transport.TransportBase#afterPropertiesSet() + */ + @Override + public void afterPropertiesSet() + { + log.trace("Setting Prpoerties, resetting client, and updating state"); + super.afterPropertiesSet(); + disconnectClient(); + setErrorMessage(""); + if (isStarted) + { + setRunningState(RunningState.STARTED); + } + else + { + setRunningState(RunningState.STOPPED); + } + } + +} diff --git a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttOutboundTransportService.java b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttOutboundTransportService.java index a15a21d..1ec3c1b 100644 --- a/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttOutboundTransportService.java +++ b/mqtt-transport/src/main/java/com/esri/geoevent/transport/mqtt/MqttOutboundTransportService.java @@ -32,15 +32,15 @@ public class MqttOutboundTransportService extends TransportServiceBase { - public MqttOutboundTransportService() - { - definition = new XmlTransportDefinition(getResourceAsStream("mqtt-outbound-transport-definition.xml")); - } - - @Override - public Transport createTransport() throws ComponentException - { - return new MqttOutboundTransport(definition); - } + public MqttOutboundTransportService() + { + definition = new XmlTransportDefinition(getResourceAsStream("mqtt-outbound-transport-definition.xml")); + } + + @Override + public Transport createTransport() throws ComponentException + { + return new MqttOutboundTransport(definition); + } } diff --git a/mqtt-transport/src/main/resources/OSGI-INF/blueprint/config.xml b/mqtt-transport/src/main/resources/OSGI-INF/blueprint/config.xml index 53801e5..142f1aa 100644 --- a/mqtt-transport/src/main/resources/OSGI-INF/blueprint/config.xml +++ b/mqtt-transport/src/main/resources/OSGI-INF/blueprint/config.xml @@ -1,11 +1,11 @@ - - - - - - - - - + + + + + + + + + \ No newline at end of file diff --git a/mqtt-transport/src/main/resources/com/esri/geoevent/transport/mqtt-transport.properties b/mqtt-transport/src/main/resources/com/esri/geoevent/transport/mqtt-transport.properties index 8074e28..ae5ebf1 100644 --- a/mqtt-transport/src/main/resources/com/esri/geoevent/transport/mqtt-transport.properties +++ b/mqtt-transport/src/main/resources/com/esri/geoevent/transport/mqtt-transport.properties @@ -1,45 +1,45 @@ -# Inbound Transport Definition -TRANSPORT_IN_LBL=MQTT Inbound Transport -TRANSPORT_IN_DESC=This inbound transport connects to an MQTT broker and receives message bytes on a specified topic. -TRANSPORT_IN_HOST_LBL=Host -TRANSPORT_IN_HOST_DESC=Host URL of the connection to the MQTT broker, with optional scheme (tcp or ssl, default tcp) and port (default 1883, or 8883 for ssl). -TRANSPORT_IN_QOS_LBL=QOS Level -TRANSPORT_IN_QOS_DESC=Quality of Service level requested for subscription (0=at most once, 1=at least once, 2=exactly once) -TRANSPORT_IN_TOPIC_LBL=Topic -TRANSPORT_IN_TOPIC_DESC=Topic of the connection to the MQTT broker. Each topic must contain at least 1 character and the topic string permits empty spaces. The forward slash alone is a valid topic. The $-symbol in the topic string is not permitted. Topics are case-sensitive; for example, "myhome/temperature" and "MyHome/Temperature" are two different topics. -TRANSPORT_IN_USERNAME_LBL=User name -TRANSPORT_IN_USERNAME_DESC=Name of the user for the connection to the MQTT broker. -TRANSPORT_IN_PASSWORD_LBL=Password -TRANSPORT_IN_PASSWORD_DESC=Password for the connection to the MQTT broker. - -# Outbound Transport Definition -TRANSPORT_OUT_LBL=MQTT Outbound Transport -TRANSPORT_OUT_DESC=This outbound transport connects to an MQTT broker and sends message bytes to a specified topic. -TRANSPORT_OUT_HOST_LBL=Host -TRANSPORT_OUT_HOST_DESC=Host URL of the connection to the MQTT broker, with optional scheme (tcp or ssl, default tcp) and port (default 1883, or 8883 for ssl). -TRANSPORT_OUT_QOS_LBL=QOS Level -TRANSPORT_OUT_QOS_DESC=Quality of Service level specified for delivery (0=at most once, 1=at least once, 2=exactly once) -TRANSPORT_OUT_RETAIN_MESSAGE_LBL=Retain message -TRANSPORT_OUT_RETAIN_MESSAGE_DESC=Instruct broker to retain last message sent on each distinct topic -TRANSPORT_OUT_TOPIC_LBL=Topic -TRANSPORT_OUT_TOPIC_DESC=Topic of the connection to the MQTT broker. Each topic must contain at least 1 character and the topic string permits empty spaces. The forward slash alone is a valid topic. You can use field substitution such as ${Field1}, however the $-symbol in the final topic string is not permitted. Topics are case-sensitive; for example, "myhome/temperature" and "MyHome/Temperature" are two different topics. -TRANSPORT_OUT_USERNAME_LBL=User name -TRANSPORT_OUT_USERNAME_DESC=Name of the user for the connection to the MQTT broker. -TRANSPORT_OUT_PASSWORD_LBL=Password -TRANSPORT_OUT_PASSWORD_DESC=Password for the connection to the MQTT broker. - -# Transport QOS option labels -TRANSPORT_COMMON_QOS_0_LBL=0 (at most once) -TRANSPORT_COMMON_QOS_1_LBL=1 (at least once) -TRANSPORT_COMMON_QOS_2_LBL=2 (exactly once) - -# Log Messages -CONNECTION_LOST=Connection lost: {0} -UNEXPECTED_ERROR=Unexpected error during MQTT receiver initialization. -UNEXPECTED_ERROR_STARTING=Unexpected error when starting the Transport. -BUFFER_OVERFLOW_ERROR=Buffer overflow. Flushing the buffer and continuing. -UNEXPECTED_ERROR2=Unexpected error, stopping the Transport. -UNABLE_TO_CLOSE=Unable to close MQTT client. - -INIT_ERROR=Unable to initialize the {0} transport. +# Inbound Transport Definition +TRANSPORT_IN_LBL=MQTT Inbound Transport +TRANSPORT_IN_DESC=This inbound transport connects to an MQTT broker and receives message bytes on a specified topic. +TRANSPORT_IN_HOST_LBL=Host +TRANSPORT_IN_HOST_DESC=Host URL of the connection to the MQTT broker, with optional scheme (tcp or ssl, default tcp) and port (default 1883, or 8883 for ssl). +TRANSPORT_IN_QOS_LBL=QOS Level +TRANSPORT_IN_QOS_DESC=Quality of Service level requested for subscription (0=at most once, 1=at least once, 2=exactly once) +TRANSPORT_IN_TOPIC_LBL=Topic +TRANSPORT_IN_TOPIC_DESC=Topic of the connection to the MQTT broker. Each topic must contain at least 1 character and the topic string permits empty spaces. The forward slash alone is a valid topic. The $-symbol in the topic string is not permitted. Topics are case-sensitive; for example, "myhome/temperature" and "MyHome/Temperature" are two different topics. +TRANSPORT_IN_USERNAME_LBL=User name +TRANSPORT_IN_USERNAME_DESC=Name of the user for the connection to the MQTT broker. +TRANSPORT_IN_PASSWORD_LBL=Password +TRANSPORT_IN_PASSWORD_DESC=Password for the connection to the MQTT broker. + +# Outbound Transport Definition +TRANSPORT_OUT_LBL=MQTT Outbound Transport +TRANSPORT_OUT_DESC=This outbound transport connects to an MQTT broker and sends message bytes to a specified topic. +TRANSPORT_OUT_HOST_LBL=Host +TRANSPORT_OUT_HOST_DESC=Host URL of the connection to the MQTT broker, with optional scheme (tcp or ssl, default tcp) and port (default 1883, or 8883 for ssl). +TRANSPORT_OUT_QOS_LBL=QOS Level +TRANSPORT_OUT_QOS_DESC=Quality of Service level specified for delivery (0=at most once, 1=at least once, 2=exactly once) +TRANSPORT_OUT_RETAIN_MESSAGE_LBL=Retain message +TRANSPORT_OUT_RETAIN_MESSAGE_DESC=Instruct broker to retain last message sent on each distinct topic +TRANSPORT_OUT_TOPIC_LBL=Topic +TRANSPORT_OUT_TOPIC_DESC=Topic of the connection to the MQTT broker. Each topic must contain at least 1 character and the topic string permits empty spaces. The forward slash alone is a valid topic. You can use field substitution such as ${Field1}, however the $-symbol in the final topic string is not permitted. Topics are case-sensitive; for example, "myhome/temperature" and "MyHome/Temperature" are two different topics. +TRANSPORT_OUT_USERNAME_LBL=User name +TRANSPORT_OUT_USERNAME_DESC=Name of the user for the connection to the MQTT broker. +TRANSPORT_OUT_PASSWORD_LBL=Password +TRANSPORT_OUT_PASSWORD_DESC=Password for the connection to the MQTT broker. + +# Transport QOS option labels +TRANSPORT_COMMON_QOS_0_LBL=0 (at most once) +TRANSPORT_COMMON_QOS_1_LBL=1 (at least once) +TRANSPORT_COMMON_QOS_2_LBL=2 (exactly once) + +# Log Messages +CONNECTION_LOST=Connection lost: {0} +UNEXPECTED_ERROR=Unexpected error during MQTT receiver initialization. +UNEXPECTED_ERROR_STARTING=Unexpected error when starting the Transport. +BUFFER_OVERFLOW_ERROR=Buffer overflow. Flushing the buffer and continuing. +UNEXPECTED_ERROR2=Unexpected error, stopping the Transport. +UNABLE_TO_CLOSE=Unable to close MQTT client. + +INIT_ERROR=Unable to initialize the {0} transport. ERROR_PUBLISHING=ERROR occurred while publishing message: {0} \ No newline at end of file diff --git a/mqtt-transport/src/main/resources/com/esri/geoevent/transport/mqtt-transport_en_US.properties b/mqtt-transport/src/main/resources/com/esri/geoevent/transport/mqtt-transport_en_US.properties index 8074e28..ae5ebf1 100644 --- a/mqtt-transport/src/main/resources/com/esri/geoevent/transport/mqtt-transport_en_US.properties +++ b/mqtt-transport/src/main/resources/com/esri/geoevent/transport/mqtt-transport_en_US.properties @@ -1,45 +1,45 @@ -# Inbound Transport Definition -TRANSPORT_IN_LBL=MQTT Inbound Transport -TRANSPORT_IN_DESC=This inbound transport connects to an MQTT broker and receives message bytes on a specified topic. -TRANSPORT_IN_HOST_LBL=Host -TRANSPORT_IN_HOST_DESC=Host URL of the connection to the MQTT broker, with optional scheme (tcp or ssl, default tcp) and port (default 1883, or 8883 for ssl). -TRANSPORT_IN_QOS_LBL=QOS Level -TRANSPORT_IN_QOS_DESC=Quality of Service level requested for subscription (0=at most once, 1=at least once, 2=exactly once) -TRANSPORT_IN_TOPIC_LBL=Topic -TRANSPORT_IN_TOPIC_DESC=Topic of the connection to the MQTT broker. Each topic must contain at least 1 character and the topic string permits empty spaces. The forward slash alone is a valid topic. The $-symbol in the topic string is not permitted. Topics are case-sensitive; for example, "myhome/temperature" and "MyHome/Temperature" are two different topics. -TRANSPORT_IN_USERNAME_LBL=User name -TRANSPORT_IN_USERNAME_DESC=Name of the user for the connection to the MQTT broker. -TRANSPORT_IN_PASSWORD_LBL=Password -TRANSPORT_IN_PASSWORD_DESC=Password for the connection to the MQTT broker. - -# Outbound Transport Definition -TRANSPORT_OUT_LBL=MQTT Outbound Transport -TRANSPORT_OUT_DESC=This outbound transport connects to an MQTT broker and sends message bytes to a specified topic. -TRANSPORT_OUT_HOST_LBL=Host -TRANSPORT_OUT_HOST_DESC=Host URL of the connection to the MQTT broker, with optional scheme (tcp or ssl, default tcp) and port (default 1883, or 8883 for ssl). -TRANSPORT_OUT_QOS_LBL=QOS Level -TRANSPORT_OUT_QOS_DESC=Quality of Service level specified for delivery (0=at most once, 1=at least once, 2=exactly once) -TRANSPORT_OUT_RETAIN_MESSAGE_LBL=Retain message -TRANSPORT_OUT_RETAIN_MESSAGE_DESC=Instruct broker to retain last message sent on each distinct topic -TRANSPORT_OUT_TOPIC_LBL=Topic -TRANSPORT_OUT_TOPIC_DESC=Topic of the connection to the MQTT broker. Each topic must contain at least 1 character and the topic string permits empty spaces. The forward slash alone is a valid topic. You can use field substitution such as ${Field1}, however the $-symbol in the final topic string is not permitted. Topics are case-sensitive; for example, "myhome/temperature" and "MyHome/Temperature" are two different topics. -TRANSPORT_OUT_USERNAME_LBL=User name -TRANSPORT_OUT_USERNAME_DESC=Name of the user for the connection to the MQTT broker. -TRANSPORT_OUT_PASSWORD_LBL=Password -TRANSPORT_OUT_PASSWORD_DESC=Password for the connection to the MQTT broker. - -# Transport QOS option labels -TRANSPORT_COMMON_QOS_0_LBL=0 (at most once) -TRANSPORT_COMMON_QOS_1_LBL=1 (at least once) -TRANSPORT_COMMON_QOS_2_LBL=2 (exactly once) - -# Log Messages -CONNECTION_LOST=Connection lost: {0} -UNEXPECTED_ERROR=Unexpected error during MQTT receiver initialization. -UNEXPECTED_ERROR_STARTING=Unexpected error when starting the Transport. -BUFFER_OVERFLOW_ERROR=Buffer overflow. Flushing the buffer and continuing. -UNEXPECTED_ERROR2=Unexpected error, stopping the Transport. -UNABLE_TO_CLOSE=Unable to close MQTT client. - -INIT_ERROR=Unable to initialize the {0} transport. +# Inbound Transport Definition +TRANSPORT_IN_LBL=MQTT Inbound Transport +TRANSPORT_IN_DESC=This inbound transport connects to an MQTT broker and receives message bytes on a specified topic. +TRANSPORT_IN_HOST_LBL=Host +TRANSPORT_IN_HOST_DESC=Host URL of the connection to the MQTT broker, with optional scheme (tcp or ssl, default tcp) and port (default 1883, or 8883 for ssl). +TRANSPORT_IN_QOS_LBL=QOS Level +TRANSPORT_IN_QOS_DESC=Quality of Service level requested for subscription (0=at most once, 1=at least once, 2=exactly once) +TRANSPORT_IN_TOPIC_LBL=Topic +TRANSPORT_IN_TOPIC_DESC=Topic of the connection to the MQTT broker. Each topic must contain at least 1 character and the topic string permits empty spaces. The forward slash alone is a valid topic. The $-symbol in the topic string is not permitted. Topics are case-sensitive; for example, "myhome/temperature" and "MyHome/Temperature" are two different topics. +TRANSPORT_IN_USERNAME_LBL=User name +TRANSPORT_IN_USERNAME_DESC=Name of the user for the connection to the MQTT broker. +TRANSPORT_IN_PASSWORD_LBL=Password +TRANSPORT_IN_PASSWORD_DESC=Password for the connection to the MQTT broker. + +# Outbound Transport Definition +TRANSPORT_OUT_LBL=MQTT Outbound Transport +TRANSPORT_OUT_DESC=This outbound transport connects to an MQTT broker and sends message bytes to a specified topic. +TRANSPORT_OUT_HOST_LBL=Host +TRANSPORT_OUT_HOST_DESC=Host URL of the connection to the MQTT broker, with optional scheme (tcp or ssl, default tcp) and port (default 1883, or 8883 for ssl). +TRANSPORT_OUT_QOS_LBL=QOS Level +TRANSPORT_OUT_QOS_DESC=Quality of Service level specified for delivery (0=at most once, 1=at least once, 2=exactly once) +TRANSPORT_OUT_RETAIN_MESSAGE_LBL=Retain message +TRANSPORT_OUT_RETAIN_MESSAGE_DESC=Instruct broker to retain last message sent on each distinct topic +TRANSPORT_OUT_TOPIC_LBL=Topic +TRANSPORT_OUT_TOPIC_DESC=Topic of the connection to the MQTT broker. Each topic must contain at least 1 character and the topic string permits empty spaces. The forward slash alone is a valid topic. You can use field substitution such as ${Field1}, however the $-symbol in the final topic string is not permitted. Topics are case-sensitive; for example, "myhome/temperature" and "MyHome/Temperature" are two different topics. +TRANSPORT_OUT_USERNAME_LBL=User name +TRANSPORT_OUT_USERNAME_DESC=Name of the user for the connection to the MQTT broker. +TRANSPORT_OUT_PASSWORD_LBL=Password +TRANSPORT_OUT_PASSWORD_DESC=Password for the connection to the MQTT broker. + +# Transport QOS option labels +TRANSPORT_COMMON_QOS_0_LBL=0 (at most once) +TRANSPORT_COMMON_QOS_1_LBL=1 (at least once) +TRANSPORT_COMMON_QOS_2_LBL=2 (exactly once) + +# Log Messages +CONNECTION_LOST=Connection lost: {0} +UNEXPECTED_ERROR=Unexpected error during MQTT receiver initialization. +UNEXPECTED_ERROR_STARTING=Unexpected error when starting the Transport. +BUFFER_OVERFLOW_ERROR=Buffer overflow. Flushing the buffer and continuing. +UNEXPECTED_ERROR2=Unexpected error, stopping the Transport. +UNABLE_TO_CLOSE=Unable to close MQTT client. + +INIT_ERROR=Unable to initialize the {0} transport. ERROR_PUBLISHING=ERROR occurred while publishing message: {0} \ No newline at end of file diff --git a/mqtt-transport/src/main/resources/mqtt-inbound-transport-definition.xml b/mqtt-transport/src/main/resources/mqtt-inbound-transport-definition.xml index 54e047e..dfa254a 100644 --- a/mqtt-transport/src/main/resources/mqtt-inbound-transport-definition.xml +++ b/mqtt-transport/src/main/resources/mqtt-inbound-transport-definition.xml @@ -1,40 +1,23 @@ - - ${com.esri.geoevent.transport.mqtt-transport.TRANSPORT_IN_DESC} - - - - - - 0 - 1 - 2 - - - - - - + + Release ${project.release}: ${com.esri.geoevent.transport.mqtt-transport.TRANSPORT_IN_DESC} + + + + + + 0 + 1 + 2 + + + + + + \ No newline at end of file diff --git a/mqtt-transport/src/main/resources/mqtt-outbound-transport-definition.xml b/mqtt-transport/src/main/resources/mqtt-outbound-transport-definition.xml index 61f4220..36e8384 100644 --- a/mqtt-transport/src/main/resources/mqtt-outbound-transport-definition.xml +++ b/mqtt-transport/src/main/resources/mqtt-outbound-transport-definition.xml @@ -1,42 +1,26 @@ - - ${com.esri.geoevent.transport.mqtt-transport.TRANSPORT_OUT_DESC} - - - - - - 0 - 1 - 2 - - - - - - - + + Release ${project.release}: ${com.esri.geoevent.transport.mqtt-transport.TRANSPORT_OUT_DESC} + + + + + + 0 + 1 + 2 + + + + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index b1cf410..a781212 100644 --- a/pom.xml +++ b/pom.xml @@ -1,59 +1,60 @@ - 4.0.0 - - com.esri.geoevent.parent - mqtt - 10.4.0 - pom - - Esri :: GeoEvent :: MQTT - http://www.esri.com - - - UTF-8 - 4.0.0 - 4.8.1 - - - - mqtt-transport - - - - - com.esri.geoevent.sdk - geoevent-sdk - ${project.version} - provided - - - junit - junit - ${junit.version} - test - - - - - - - - org.apache.felix - maven-bundle-plugin - true - ${maven.bundle.plugin.version} - - - org.apache.maven.plugins - maven-compiler-plugin - - 1.8 - 1.8 - - - - - - + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + 4.0.0 + com.esri.geoevent.parent + mqtt + 10.4.0 + pom + Esri :: GeoEvent :: MQTT + http://www.esri.com + + UTF-8 + 4.0.0 + 4.8.1 + 2 + + + mqtt-transport + + + + com.esri.geoevent.sdk + geoevent-sdk + ${project.version} + provided + + + junit + junit + ${junit.version} + test + + + + + + src/main/resources + true + + + + + + org.apache.felix + maven-bundle-plugin + true + ${maven.bundle.plugin.version} + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.8 + 1.8 + + + + + + \ No newline at end of file