Skip to content

Commit

Permalink
Merge pull request #344 from IBMStreams/develop
Browse files Browse the repository at this point in the history
Merge latest changes from develop to master
  • Loading branch information
schulz2 authored Sep 13, 2018
2 parents 5945f4c + d9590ab commit 30954e5
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@
// This class provides configuration data stored in PE
public class PropertyProvider {

private ProcessingElement pe;
private String configurationName;
private ProcessingElement pe = null;
private String configurationName = null;
private Map<String,String> configuration = null;

public PropertyProvider(ProcessingElement pe, String configurationName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import com.ibm.streams.operator.logging.LogLevel;
import com.ibm.streams.operator.logging.TraceLevel;
import com.ibm.streams.operator.metrics.Metric;
import com.ibm.streamsx.messaging.common.PropertyProvider;

Expand Down Expand Up @@ -81,7 +81,7 @@ class JMSConnectionHelper implements ExceptionListener {
private final boolean useClientAckMode;

// JMS message selector
private String messageSelector;
private String messageSelector = null;

// Timestamp of session creation
private long sessionCreationTime;
Expand All @@ -97,12 +97,12 @@ class JMSConnectionHelper implements ExceptionListener {

private PropertyProvider propertyProvider = null;

private String userPropName;
private String userPropName = null;

private String passwordPropName;
private String passwordPropName = null;

// CR queue name
private String destinationCR;
private String destinationCR = null;

private ConnectionDocumentParser connectionDocumentParser = null;

Expand Down Expand Up @@ -226,7 +226,6 @@ private Connection getConnect() {
this(connectionDocumentParser, reconnectionPolicy, reconnectionBound, period, isProducer,
maxMessageRetry, messageRetryDelay, nReconnectionAttempts, logger, useClientAckMode, msgSelectorCR, propertyProvider, userPropName, passwordPropName, destinationCR);
this.nFailedInserts = nFailedInserts;

}


Expand Down Expand Up @@ -255,8 +254,8 @@ public void createInitialConnectionNoRetry() throws ConnectionException {
// this subroutine creates the initial jndi context by taking the mandatory
// and optional parameters

private void createAdministeredObjects()
throws NamingException {
private void createAdministeredObjects() throws NamingException {
tracer.log(TraceLevel.TRACE, "Begin createAdministeredObjects()"); //$NON-NLS-1$

// Create a JNDI API InitialContext object if none exists
// create a properties object and add all the mandatory and optional
Expand Down Expand Up @@ -289,14 +288,18 @@ private void createAdministeredObjects()
destCR = (Destination) jndiContext.lookup(destinationCR);
}

tracer.log(TraceLevel.TRACE, "End createAdministeredObjects()"); //$NON-NLS-1$
return;
}

// this subroutine creates the connection, it always verifies if we have a
// successfull existing connection before attempting to create one.
private synchronized void createConnection() throws ConnectionException,
InterruptedException {
private synchronized void createConnection() throws ConnectionException, InterruptedException {

tracer.log(TraceLevel.TRACE, "Begin createConnection()"); //$NON-NLS-1$

int nConnectionAttempts = 0;

// Check if connection exists or not.
if (!isConnectValid()) {

Expand Down Expand Up @@ -350,16 +353,19 @@ else if (reconnectionPolicy == ReconnectionPolicies.BoundedRetry
}
// sleep for delay interval
Thread.sleep(delay);
// Incremet the metric nReconnectionAttempts
// Increment the metric nReconnectionAttempts
nReconnectionAttempts.incrementValue(1);
}

}

}
tracer.log(TraceLevel.TRACE, "End createConnection()"); //$NON-NLS-1$
}

private synchronized void createConnectionNoRetry() throws ConnectionException {

tracer.log(TraceLevel.TRACE, "Begin createConnectionNoRetry()"); //$NON-NLS-1$

if (!isConnectValid()) {
try {
Expand All @@ -370,11 +376,14 @@ private synchronized void createConnectionNoRetry() throws ConnectionException {
Messages.getString("CONNECTION_TO_JMS_FAILED_NO_RECONNECT_AS_RECONNECT_POLICY_DOES_NOT_APPLY")); //$NON-NLS-1$
}
}

tracer.log(TraceLevel.TRACE, "End createConnectionNoRetry()"); //$NON-NLS-1$
}

// this subroutine creates the connection, producer and consumer, throws a
// JMSException if it fails
private boolean connect(boolean isProducer) throws JMSException {
tracer.log(TraceLevel.TRACE, "Begin connect()"); //$NON-NLS-1$

// Create connection.
if (userPrincipal != null && !userPrincipal.isEmpty() &&
Expand Down Expand Up @@ -434,45 +443,58 @@ private boolean connect(boolean isProducer) throws JMSException {
getConnect().start();
}
tracer.log (LogLevel.INFO, "connection successfully created"); //$NON-NLS-1$
tracer.log(TraceLevel.TRACE, "End connect()"); //$NON-NLS-1$

// create connection is successful, return true
return true;
}

private boolean refreshUserCredential() {

tracer.log(TraceLevel.TRACE, "Begin refreshUserCredential()"); //$NON-NLS-1$
if(propertyProvider == null) {
tracer.log(TraceLevel.TRACE, "End refreshUserCredential() - there is no application configuration"); //$NON-NLS-1$
return false;
}

String userName = propertyProvider.getProperty(userPropName);
String password = propertyProvider.getProperty(passwordPropName, false);

tracer.log(TraceLevel.DEBUG, "User name retrieved from application configuration: " + userName ); //$NON-NLS-1$
if(password != null && !password.isEmpty())
tracer.log(TraceLevel.DEBUG, "Password retrieved from application configuration"); //$NON-NLS-1$
else
tracer.log(TraceLevel.DEBUG, "No password retrieved from application configuration"); //$NON-NLS-1$

// TODO: Aren't the following checks kind of redundant?
if(this.userPrincipal == userName && this.userCredential == password) {
tracer.log(TraceLevel.TRACE, "End refreshUserCredential() - user credentials unchanged"); //$NON-NLS-1$
return false;
}

if((this.userPrincipal != null && userName != null && this.userPrincipal.equals(userName))
&& (this.userCredential != null && password != null && this.userCredential.equals(password))) {
tracer.log(TraceLevel.TRACE, "End refreshUserCredential() - user credentials unchanged"); //$NON-NLS-1$
return false;
}

logger.log(LogLevel.INFO, "USER_CREDENTIALS_UPDATED"); //$NON-NLS-1$
this.userPrincipal = userName;
this.userCredential = password;

tracer.log(TraceLevel.TRACE, "End refreshUserCredential()"); //$NON-NLS-1$
return true;
}

// subroutine which on receiving a message, send it to the
// destination,update the metric
// nFailedInserts if the send fails
boolean sendMessage(Message message) throws ConnectionException, InterruptedException {

boolean sendMessage(Message message) throws ConnectionException,
InterruptedException {
tracer.log(TraceLevel.TRACE, "Begin sendMessage()"); //$NON-NLS-1$

boolean res = false;
int count = 0;

do {

try {
Expand Down Expand Up @@ -509,13 +531,16 @@ boolean sendMessage(Message message) throws ConnectionException,
nFailedInserts.incrementValue(1);
}

tracer.log(TraceLevel.TRACE, "End sendMessage()"); //$NON-NLS-1$
return res;
}

// this subroutine receives messages from a message consumer
// This method supports the receive method with timeout
Message receiveMessage(long timeout) throws ConnectionException, InterruptedException,
JMSException {
Message receiveMessage(long timeout) throws ConnectionException, InterruptedException, JMSException {

tracer.log(TraceLevel.TRACE, "Into receiveMessage()"); //$NON-NLS-1$

try {
// try to receive a message via blocking method
synchronized (getSession()) {
Expand Down Expand Up @@ -548,6 +573,7 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce
// i.e connection problems, this method raise the error back to caller.
// No connection or message retry will be attempted.
void sendMessageNoRetry(Message message) throws JMSException {
tracer.log(TraceLevel.TRACE, "Begin sendMessageNoRetry()"); //$NON-NLS-1$
try {
synchronized (getSession()) {
getProducer().send(message);
Expand All @@ -557,20 +583,26 @@ void sendMessageNoRetry(Message message) throws JMSException {
logger.log(LogLevel.WARN, "ERROR_DURING_SEND", new Object[] { e.toString() }); //$NON-NLS-1$
throw e;
}
tracer.log(TraceLevel.TRACE, "End sendMessageNoRetry()"); //$NON-NLS-1$
}

// send a consistent region message to the consistent region queue
void sendCRMessage(Message message) throws JMSException {

tracer.log(TraceLevel.TRACE, "Begin sendCRMessage()"); //$NON-NLS-1$

synchronized (getSession()) {
getProducerCR().send(message);
}


tracer.log(TraceLevel.TRACE, "End sendCRMessage()"); //$NON-NLS-1$
}

// receive a message from consistent region queue
Message receiveCRMessage(long timeout) throws JMSException {

tracer.log(TraceLevel.TRACE, "Into receiveCRMessage()"); //$NON-NLS-1$

synchronized (getSession()) {
return (getConsumerCR().receive(timeout));
}
Expand All @@ -579,6 +611,8 @@ Message receiveCRMessage(long timeout) throws JMSException {
// Recovers session causing unacknowledged message to be re-delivered
public void recoverSession() throws JMSException, ConnectionException, InterruptedException {

tracer.log(TraceLevel.TRACE, "Begin recoverSession()"); //$NON-NLS-1$

try {
synchronized (getSession()) {
tracer.log(LogLevel.INFO, "recoverSession"); //$NON-NLS-1$
Expand All @@ -596,25 +630,35 @@ public void recoverSession() throws JMSException, ConnectionException, Interrupt
getSession().recover();
}
}

tracer.log(TraceLevel.TRACE, "End recoverSession()"); //$NON-NLS-1$
}

public void commitSession() throws JMSException {

tracer.log(TraceLevel.TRACE, "Begin commitSession()"); //$NON-NLS-1$

synchronized (getSession()) {
getSession().commit();
}

tracer.log(TraceLevel.TRACE, "End commitSession()"); //$NON-NLS-1$
}

public void roolbackSession() throws JMSException {

tracer.log(TraceLevel.TRACE, "Begin roolbackSession()"); //$NON-NLS-1$

synchronized (getSession()) {
getSession().rollback();
}

tracer.log(TraceLevel.TRACE, "End roolbackSession()"); //$NON-NLS-1$
}

// close and invalidate the connection
public void closeConnection() {

tracer.log(TraceLevel.TRACE, "Begin closeConnection()"); //$NON-NLS-1$
if (getSession() != null) {
try {
getSession().close();
Expand All @@ -632,5 +676,6 @@ public void closeConnection() {
setConnect(null);
}
}
tracer.log(TraceLevel.TRACE, "End closeConnection()"); //$NON-NLS-1$
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -176,30 +176,30 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) {
private ConsistentRegionContext consistentRegionContext;

// CR queue name for storing checkpoint information
private String consistentRegionQueueName;
private String consistentRegionQueueName = null;

// variable to keep track of last successful check point sequeuce id.
private long lastSuccessfulCheckpointId = 0;

// unique id to identify messages on CR queue
private String operatorUniqueID;
private String operatorUniqueID = null;

// application configuration name
private String appConfigName;
private String appConfigName = null;

// user property name stored in application configuration
private String userPropName;
private String userPropName = null;

// password property name stored in application configuration
private String passwordPropName;
private String passwordPropName = null;

private String keyStore;
private String keyStore = null;

private String trustStore;
private String trustStore = null;

private String keyStorePassword;
private String keyStorePassword = null;

private String trustStorePassword;
private String trustStorePassword = null;

private boolean sslConnection;

Expand Down Expand Up @@ -340,7 +340,7 @@ public void setConnectionDocument(String connectionDocument) {
}

public String getConnectionDocument() {

if (connectionDocument == null)
{
connectionDocument = getOperatorContext().getPE().getApplicationDirectory() + "/etc/connections.xml"; //$NON-NLS-1$
Expand Down Expand Up @@ -444,6 +444,9 @@ public static void checkCompileTimeConsistentRegion(OperatorContextChecker check
*/
@ContextCheck(compile = false)
public static void checkParametersRuntime(OperatorContextChecker checker) {

tracer.log(TraceLevel.TRACE, "Begin checkParametersRuntime()"); //$NON-NLS-1$

OperatorContext context = checker.getOperatorContext();

if ((context.getParameterNames().contains("reconnectionBound"))) { //$NON-NLS-1$
Expand Down Expand Up @@ -536,6 +539,8 @@ public static void checkParametersRuntime(OperatorContextChecker checker) {

}
}

tracer.log(TraceLevel.TRACE, "End checkParametersRuntime()"); //$NON-NLS-1$
}

// add compile time check for either period or reconnectionBound to be
Expand Down Expand Up @@ -565,6 +570,9 @@ public synchronized void initialize(OperatorContext context)
throws ParserConfigurationException, InterruptedException,
IOException, ParseConnectionDocumentException, SAXException,
NamingException, ConnectionException, Exception {

tracer.log(TraceLevel.TRACE, "Begin initialize()"); //$NON-NLS-1$

super.initialize(context);

JmsClasspathUtil.setupClassPaths(context);
Expand Down Expand Up @@ -694,6 +702,8 @@ public synchronized void initialize(OperatorContext context)

// register for data governance
registerForDataGovernance(connectionDocumentParser.getProviderURL(), connectionDocumentParser.getDestination());

tracer.log(TraceLevel.TRACE, "End initialize()"); //$NON-NLS-1$
}

protected String getAbsolutePath(String filePath) {
Expand Down
Loading

0 comments on commit 30954e5

Please sign in to comment.