diff --git a/modules/core/src/main/java/org/apache/synapse/config/SynapseConfigUtils.java b/modules/core/src/main/java/org/apache/synapse/config/SynapseConfigUtils.java index f1c9aaa59e..ead80377b0 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/SynapseConfigUtils.java +++ b/modules/core/src/main/java/org/apache/synapse/config/SynapseConfigUtils.java @@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.synapse.MessageContext; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; import org.apache.synapse.aspects.AspectConfiguration; @@ -53,12 +54,10 @@ import org.xml.sax.InputSource; import javax.activation.DataHandler; -import javax.net.ssl.HostnameVerifier; import javax.net.ssl.HttpsURLConnection; import javax.net.ssl.KeyManager; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.xml.stream.XMLInputFactory; @@ -86,7 +85,8 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; - +import java.util.regex.Matcher; +import java.util.regex.Pattern; @SuppressWarnings({"UnusedDeclaration"}) public class SynapseConfigUtils { @@ -895,6 +895,32 @@ public static boolean isFailSafeEnabled(String componentName) { return false; } + /** + * Replaces occurrences of a pattern in the input string with corresponding property values from the + * message context. + * + * @param input the input string containing patterns to be replaced + * @param pattern the pattern to be matched in the input string + * @param messageContext the message context containing property values + * @return the resulting string with patterns replaced by property values + */ + public static String replacePatternWithProperties(String input, Pattern pattern, MessageContext messageContext) { + Matcher matcher = pattern.matcher(input); + StringBuilder result = new StringBuilder(); + + int s = 0; + while (matcher.find()) { + String propertyName = matcher.group(1); + Object propertyValue = messageContext.getProperty(propertyName); + if (propertyValue != null) { + result.append(input.substring(s, matcher.start())); + result.append(propertyValue.toString()); + s = matcher.end(); + } + } + result.append(input.substring(s)); + return result.toString(); + } public static SynapseConfiguration getSynapseConfiguration(String tenantDomain){ return lastRegisteredSynapseConfigurationMap.get(tenantDomain); diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointDefinitionFactory.java b/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointDefinitionFactory.java index ae1e925562..c4f36618ed 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointDefinitionFactory.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointDefinitionFactory.java @@ -26,8 +26,11 @@ import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; import org.apache.synapse.aspects.AspectConfiguration; +import org.apache.synapse.config.xml.ValueFactory; import org.apache.synapse.config.xml.XMLConfigConstants; +import org.apache.synapse.endpoints.EPConstants; import org.apache.synapse.endpoints.EndpointDefinition; +import org.apache.synapse.mediators.Value; import org.apache.synapse.util.xpath.SynapseXPath; import org.jaxen.JaxenException; @@ -46,6 +49,7 @@ public class EndpointDefinitionFactory implements DefinitionFactory{ */ public EndpointDefinition createDefinition(OMElement elem) { EndpointDefinition definition = new EndpointDefinition(); + ValueFactory valueFactory = new ValueFactory(); OMAttribute optimize = elem.getAttribute(new QName(XMLConfigConstants.NULL_NAMESPACE, "optimize")); @@ -159,20 +163,14 @@ public EndpointDefinition createDefinition(OMElement elem) { String d = duration.getText(); if (d != null) { try { - Pattern pattern = Pattern.compile("\\{.*\\}"); - if (pattern.matcher(d).matches()) { - d = d.trim().substring(1, d.length() - 1); - SynapseXPath xpath = new SynapseXPath(d); - definition.setDynamicTimeoutExpression(xpath); - } else { - long timeoutMilliSeconds = Long.parseLong(d.trim()); - definition.setTimeoutDuration(timeoutMilliSeconds); + Value timeoutDurationValue = valueFactory.createTextValue(duration); + if (timeoutDurationValue.getKeyValue() != null) { + Long.parseLong(timeoutDurationValue.getKeyValue()); } + definition.setTimeoutDuration(timeoutDurationValue); } catch (NumberFormatException e) { handleException("Endpoint timeout duration expected as a " + "number but was not a number"); - } catch (JaxenException e) { - handleException("Couldn't assign dynamic endpoint timeout as Synapse expression"); } } } @@ -180,15 +178,16 @@ public EndpointDefinition createDefinition(OMElement elem) { OMElement action = timeout.getFirstChildWithName( new QName(XMLConfigConstants.SYNAPSE_NAMESPACE, "responseAction")); if (action != null && action.getText() != null) { - String actionString = action.getText(); - if ("discard".equalsIgnoreCase(actionString.trim())) { - definition.setTimeoutAction(SynapseConstants.DISCARD); - } else if ("fault".equalsIgnoreCase(actionString.trim())) { - definition.setTimeoutAction(SynapseConstants.DISCARD_AND_FAULT); - } else { - handleException("Invalid timeout action, action : " - + actionString + " is not supported"); + Value timeoutActionValue = valueFactory.createTextValue(action); + + if (timeoutActionValue.getKeyValue() != null && + !timeoutActionValue.getKeyValue().equalsIgnoreCase(EPConstants.NEVER) && + !timeoutActionValue.getKeyValue().equalsIgnoreCase(EPConstants.DISCARD) && + !timeoutActionValue.getKeyValue().equalsIgnoreCase(EPConstants.FAULT)) { + handleException("Invalid timeout action, action : " + timeoutActionValue.getKeyValue() + + " is not supported"); } + definition.setTimeoutAction(timeoutActionValue); } } @@ -202,16 +201,20 @@ public EndpointDefinition createDefinition(OMElement elem) { SynapseConstants.SYNAPSE_NAMESPACE, XMLConfigConstants.ERROR_CODES)); if (timeoutCodes != null && timeoutCodes.getText() != null) { - StringTokenizer st = new StringTokenizer(timeoutCodes.getText().trim(), ", "); - while (st.hasMoreTokens()) { - String s = st.nextToken(); - try { - definition.addTimeoutErrorCode(Integer.parseInt(s)); - } catch (NumberFormatException e) { - handleException("The timeout error codes should be specified " + - "as valid numbers separated by commas : " + timeoutCodes.getText(), e); + Value timeoutErrorCodesValue = valueFactory.createTextValue(timeoutCodes); + if (timeoutErrorCodesValue.getKeyValue() != null) { + StringTokenizer st = new StringTokenizer(timeoutCodes.getText().trim(), ","); + while (st.hasMoreTokens()) { + String s = st.nextToken(); + try { + Integer.parseInt(s); + } catch (NumberFormatException e) { + handleException("The timeout error codes should be specified " + + "as valid numbers separated by commas : " + timeoutCodes.getText(), e); + } } } + definition.setTimeoutErrorCodes(timeoutErrorCodesValue); } OMElement retriesBeforeSuspend = markAsTimedOut.getFirstChildWithName(new QName( @@ -219,8 +222,11 @@ public EndpointDefinition createDefinition(OMElement elem) { XMLConfigConstants.RETRIES_BEFORE_SUSPENSION)); if (retriesBeforeSuspend != null && retriesBeforeSuspend.getText() != null) { try { - definition.setRetriesOnTimeoutBeforeSuspend( - Integer.parseInt(retriesBeforeSuspend.getText().trim())); + Value retriesBeforeSuspendValue = valueFactory.createTextValue(retriesBeforeSuspend); + if (retriesBeforeSuspendValue.getKeyValue() != null) { + Integer.parseInt(retriesBeforeSuspendValue.getKeyValue()); + } + definition.setRetriesOnTimeoutBeforeSuspend(retriesBeforeSuspendValue); } catch (NumberFormatException e) { handleException("The retries before suspend [for timeouts] should be " + "specified as a valid number : " + retriesBeforeSuspend.getText(), e); @@ -232,8 +238,11 @@ public EndpointDefinition createDefinition(OMElement elem) { XMLConfigConstants.RETRY_DELAY)); if (retryDelay != null && retryDelay.getText() != null) { try { - definition.setRetryDurationOnTimeout( - Integer.parseInt(retryDelay.getText().trim())); + Value retryDelayValue = valueFactory.createTextValue(retryDelay); + if (retryDelayValue.getKeyValue() != null) { + Integer.parseInt(retryDelayValue.getKeyValue()); + } + definition.setRetryDurationOnTimeout(retryDelayValue); } catch (NumberFormatException e) { handleException("The retry delay for timeouts should be specified " + "as a valid number : " + retryDelay.getText(), e); @@ -248,9 +257,9 @@ public EndpointDefinition createDefinition(OMElement elem) { log.warn("Configuration uses deprecated style for endpoint 'suspendDurationOnFailure'"); try { - definition.setInitialSuspendDuration( - 1000 * Long.parseLong(suspendDurationOnFailure.getText().trim())); - definition.setSuspendProgressionFactor((float) 1.0); + long suspendDurationValue = 1000 * Long.parseLong(suspendDurationOnFailure.getText().trim()); + definition.setInitialSuspendDuration(new Value(String.valueOf(suspendDurationValue))); + definition.setSuspendProgressionFactor(new Value(String.valueOf((float) 1.0))); } catch (NumberFormatException e) { handleException("The initial suspend duration should be specified " + "as a valid number : " + suspendDurationOnFailure.getText(), e); @@ -267,17 +276,20 @@ public EndpointDefinition createDefinition(OMElement elem) { SynapseConstants.SYNAPSE_NAMESPACE, XMLConfigConstants.ERROR_CODES)); if (suspendCodes != null && suspendCodes.getText() != null) { - - StringTokenizer st = new StringTokenizer(suspendCodes.getText().trim(), ", "); - while (st.hasMoreTokens()) { - String s = st.nextToken(); - try { - definition.addSuspendErrorCode(Integer.parseInt(s)); - } catch (NumberFormatException e) { - handleException("The suspend error codes should be specified " + - "as valid numbers separated by commas : " + suspendCodes.getText(), e); + Value suspendErrorCodesValue = valueFactory.createTextValue(suspendCodes); + if (suspendErrorCodesValue.getKeyValue() != null) { + StringTokenizer st = new StringTokenizer(suspendCodes.getText().trim(), ","); + while (st.hasMoreTokens()) { + String s = st.nextToken(); + try { + Integer.parseInt(s); + } catch (NumberFormatException e) { + handleException("The suspend error codes should be specified " + + "as valid numbers separated by commas : " + suspendCodes.getText(), e); + } } } + definition.setSuspendErrorCodes(suspendErrorCodesValue); } OMElement initialDuration = suspendOnFailure.getFirstChildWithName(new QName( @@ -285,8 +297,11 @@ public EndpointDefinition createDefinition(OMElement elem) { XMLConfigConstants.SUSPEND_INITIAL_DURATION)); if (initialDuration != null && initialDuration.getText() != null) { try { - definition.setInitialSuspendDuration( - Integer.parseInt(initialDuration.getText().trim())); + Value initialSuspendDurationValue = valueFactory.createTextValue(initialDuration); + if (initialSuspendDurationValue.getKeyValue() != null) { + Integer.parseInt(initialSuspendDurationValue.getKeyValue()); + } + definition.setInitialSuspendDuration(initialSuspendDurationValue); } catch (NumberFormatException e) { handleException("The initial suspend duration should be specified " + "as a valid number : " + initialDuration.getText(), e); @@ -298,8 +313,11 @@ public EndpointDefinition createDefinition(OMElement elem) { XMLConfigConstants.SUSPEND_PROGRESSION_FACTOR)); if (progressionFactor != null && progressionFactor.getText() != null) { try { - definition.setSuspendProgressionFactor( - Float.parseFloat(progressionFactor.getText().trim())); + Value progressionFactorValue = valueFactory.createTextValue(progressionFactor); + if (progressionFactorValue.getKeyValue() != null) { + Float.parseFloat(progressionFactorValue.getKeyValue()); + } + definition.setSuspendProgressionFactor(progressionFactorValue); } catch (NumberFormatException e) { handleException("The suspend duration progression factor should be specified " + "as a valid float : " + progressionFactor.getText(), e); @@ -311,8 +329,11 @@ public EndpointDefinition createDefinition(OMElement elem) { XMLConfigConstants.SUSPEND_MAXIMUM_DURATION)); if (maximumDuration != null && maximumDuration.getText() != null) { try { - definition.setSuspendMaximumDuration( - Long.parseLong(maximumDuration.getText().trim())); + Value suspendMaximumDurationValue = valueFactory.createTextValue(maximumDuration); + if (suspendMaximumDurationValue.getKeyValue() != null) { + Long.parseLong(suspendMaximumDurationValue.getKeyValue()); + } + definition.setSuspendMaximumDuration(suspendMaximumDurationValue); } catch (NumberFormatException e) { handleException("The maximum suspend duration should be specified " + "as a valid number : " + maximumDuration.getText(), e); @@ -330,7 +351,7 @@ public EndpointDefinition createDefinition(OMElement elem) { if (retryDisabledErrorCodes != null && retryDisabledErrorCodes.getText() != null) { StringTokenizer st = new StringTokenizer( - retryDisabledErrorCodes.getText().trim(), ", "); + retryDisabledErrorCodes.getText().trim(), ","); while (st.hasMoreTokens()) { String s = st.nextToken(); try { diff --git a/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointDefinitionSerializer.java b/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointDefinitionSerializer.java index 0c2e7a50f5..fa54066a6f 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointDefinitionSerializer.java +++ b/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointDefinitionSerializer.java @@ -25,6 +25,7 @@ import org.apache.synapse.SynapseConstants; import org.apache.synapse.aspects.statistics.StatisticsConfigurable; import org.apache.synapse.config.xml.XMLConfigConstants; +import org.apache.synapse.endpoints.EPConstants; import org.apache.synapse.endpoints.EndpointDefinition; public class EndpointDefinitionSerializer { @@ -102,109 +103,141 @@ public void serializeEndpointDefinition(EndpointDefinition endpointDefinition, element.addChild(sec); } - if (endpointDefinition.getTimeoutAction() != SynapseConstants.NONE || - endpointDefinition.getTimeoutDuration() > 0 || endpointDefinition.isDynamicTimeoutEndpoint()) { + if (!endpointDefinition.getTimeoutAction().equals(EPConstants.NEVER) || endpointDefinition.isTimeoutActionDynamic() || + isPositiveNumber(endpointDefinition.getTimeoutDuration()) || endpointDefinition.isDynamicTimeoutEndpoint()) { OMElement timeout = fac.createOMElement( "timeout", SynapseConstants.SYNAPSE_OMNAMESPACE); element.addChild(timeout); - if (endpointDefinition.getTimeoutDuration() > 0 || endpointDefinition.isDynamicTimeoutEndpoint()) { + if (isPositiveNumber(endpointDefinition.getTimeoutDuration()) || endpointDefinition.isDynamicTimeoutEndpoint()) { OMElement duration = fac.createOMElement( "duration", SynapseConstants.SYNAPSE_OMNAMESPACE); if (!endpointDefinition.isDynamicTimeoutEndpoint()) { - duration.setText(Long.toString(endpointDefinition.getTimeoutDuration())); + duration.setText(endpointDefinition.getTimeoutDuration()); } else { - duration.setText('{' + endpointDefinition.getDynamicTimeoutExpression().getExpression() + '}'); + duration.setText('{' + endpointDefinition.getTimeoutDuration() + '}'); } timeout.addChild(duration); } - if (endpointDefinition.getTimeoutAction() != SynapseConstants.NONE) { + if (!endpointDefinition.getTimeoutAction().equals( + EPConstants.NEVER) || endpointDefinition.isTimeoutActionDynamic()) { OMElement action = fac.createOMElement("responseAction", SynapseConstants.SYNAPSE_OMNAMESPACE); - if (endpointDefinition.getTimeoutAction() == SynapseConstants.DISCARD) { - action.setText("discard"); - } else if (endpointDefinition.getTimeoutAction() - == SynapseConstants.DISCARD_AND_FAULT) { - action.setText("fault"); + if (endpointDefinition.isTimeoutActionDynamic()) { + action.setText('{' + endpointDefinition.getTimeoutAction() + '}'); + } else { + if (endpointDefinition.getTimeoutAction().equals(EPConstants.DISCARD)) { + action.setText(EPConstants.DISCARD); + } else if (endpointDefinition.getTimeoutAction().equals(EPConstants.FAULT)) { + action.setText(EPConstants.FAULT); + } } timeout.addChild(action); } } - if (endpointDefinition.getInitialSuspendDuration() != -1 || - !endpointDefinition.getSuspendErrorCodes().isEmpty()) { + if (!endpointDefinition.getInitialSuspendDuration().equals(String.valueOf(-1)) || endpointDefinition.isInitialSuspendDurationDynamic() || + !endpointDefinition.getSuspendErrorCodes().isEmpty() || endpointDefinition.isSuspendErrorCodesDynamic()) { OMElement suspendOnFailure = fac.createOMElement( org.apache.synapse.config.xml.XMLConfigConstants.SUSPEND_ON_FAILURE, SynapseConstants.SYNAPSE_OMNAMESPACE); - if (!endpointDefinition.getSuspendErrorCodes().isEmpty()) { + if (!endpointDefinition.getSuspendErrorCodes().isEmpty() || endpointDefinition.isSuspendErrorCodesDynamic()) { OMElement errorCodes = fac.createOMElement( org.apache.synapse.config.xml.XMLConfigConstants.ERROR_CODES, SynapseConstants.SYNAPSE_OMNAMESPACE); - errorCodes.setText(endpointDefinition.getSuspendErrorCodes(). - toString().replaceAll("[\\[\\] ]", "")); + if (endpointDefinition.isSuspendErrorCodesDynamic()) { + errorCodes.setText('{' + endpointDefinition.getSuspendErrorCodes() + '}'); + } else { + errorCodes.setText(endpointDefinition.getSuspendErrorCodes(). + toString().replaceAll("[\\[\\] ]", "")); + } suspendOnFailure.addChild(errorCodes); } - if (endpointDefinition.getInitialSuspendDuration() != -1) { + if (!endpointDefinition.getInitialSuspendDuration().equals(String.valueOf(-1)) || endpointDefinition.isInitialSuspendDurationDynamic()) { OMElement initialDuration = fac.createOMElement( org.apache.synapse.config.xml.XMLConfigConstants.SUSPEND_INITIAL_DURATION, SynapseConstants.SYNAPSE_OMNAMESPACE); - initialDuration.setText(Long.toString(endpointDefinition.getInitialSuspendDuration())); + if (!endpointDefinition.isInitialSuspendDurationDynamic()){ + initialDuration.setText(endpointDefinition.getInitialSuspendDuration()); + } else { + initialDuration.setText('{' + endpointDefinition.getInitialSuspendDuration() + '}'); + } suspendOnFailure.addChild(initialDuration); } - if (endpointDefinition.getSuspendProgressionFactor() != -1) { + if (!endpointDefinition.getSuspendProgressionFactor().equals(String.valueOf(-1)) || endpointDefinition.isSuspendProgressionFactorDynamic()) { OMElement progressionFactor = fac.createOMElement( org.apache.synapse.config.xml.XMLConfigConstants.SUSPEND_PROGRESSION_FACTOR, SynapseConstants.SYNAPSE_OMNAMESPACE); - progressionFactor.setText(Float.toString(endpointDefinition.getSuspendProgressionFactor())); + if (endpointDefinition.isSuspendProgressionFactorDynamic()) { + progressionFactor.setText('{' + endpointDefinition.getSuspendProgressionFactor() + '}'); + } else { + progressionFactor.setText(endpointDefinition.getSuspendProgressionFactor()); + } suspendOnFailure.addChild(progressionFactor); } - if (endpointDefinition.getSuspendMaximumDuration() != -1 && - endpointDefinition.getSuspendMaximumDuration() != Long.MAX_VALUE) { + if ((!endpointDefinition.getSuspendMaximumDuration().equals(String.valueOf(-1)) && + !endpointDefinition.getSuspendMaximumDuration().equals(String.valueOf(Long.MAX_VALUE))) || endpointDefinition.isSuspendMaximumDurationDynamic()) { OMElement suspendMaximum = fac.createOMElement( org.apache.synapse.config.xml.XMLConfigConstants.SUSPEND_MAXIMUM_DURATION, SynapseConstants.SYNAPSE_OMNAMESPACE); - suspendMaximum.setText(Long.toString(endpointDefinition.getSuspendMaximumDuration())); + if (endpointDefinition.isSuspendMaximumDurationDynamic()) { + suspendMaximum.setText('{' + endpointDefinition.getSuspendMaximumDuration() + '}'); + } else { + suspendMaximum.setText(endpointDefinition.getSuspendMaximumDuration()); + } suspendOnFailure.addChild(suspendMaximum); } element.addChild(suspendOnFailure); } - if (endpointDefinition.getRetryDurationOnTimeout() > 0 || - !endpointDefinition.getTimeoutErrorCodes().isEmpty()) { + if (isPositiveNumber(endpointDefinition.getRetryDurationOnTimeout()) || endpointDefinition.isRetryDurationOnTimeoutDynamic() || + !endpointDefinition.getTimeoutErrorCodes().isEmpty() || endpointDefinition.isTimeoutErrorCodesDynamic()) { OMElement markAsTimedout = fac.createOMElement( org.apache.synapse.config.xml.XMLConfigConstants.MARK_FOR_SUSPENSION, SynapseConstants.SYNAPSE_OMNAMESPACE); - if (!endpointDefinition.getTimeoutErrorCodes().isEmpty()) { + if (!endpointDefinition.getTimeoutErrorCodes().isEmpty() || endpointDefinition.isTimeoutErrorCodesDynamic()) { OMElement errorCodes = fac.createOMElement( org.apache.synapse.config.xml.XMLConfigConstants.ERROR_CODES, SynapseConstants.SYNAPSE_OMNAMESPACE); - errorCodes.setText(endpointDefinition.getTimeoutErrorCodes(). - toString().replaceAll("[\\[\\] ]", "")); + if (endpointDefinition.isTimeoutErrorCodesDynamic()) { + errorCodes.setText('{' + endpointDefinition.getTimeoutErrorCodes() + '}'); + } else { + errorCodes.setText(endpointDefinition.getTimeoutErrorCodes(). + toString().replaceAll("[\\[\\] ]", "")); + } markAsTimedout.addChild(errorCodes); } - if (endpointDefinition.getRetriesOnTimeoutBeforeSuspend() > 0) { + if (isPositiveNumber(endpointDefinition.getRetriesOnTimeoutBeforeSuspend()) || endpointDefinition.isRetriesOnTimeoutBeforeSuspendDynamic()) { OMElement retries = fac.createOMElement( org.apache.synapse.config.xml.XMLConfigConstants.RETRIES_BEFORE_SUSPENSION, SynapseConstants.SYNAPSE_OMNAMESPACE); - retries.setText(Long.toString(endpointDefinition.getRetriesOnTimeoutBeforeSuspend())); + if (endpointDefinition.isRetriesOnTimeoutBeforeSuspendDynamic()) { + retries.setText('{' + endpointDefinition.getRetriesOnTimeoutBeforeSuspend() + '}'); + } else { + retries.setText(endpointDefinition.getRetriesOnTimeoutBeforeSuspend()); + } markAsTimedout.addChild(retries); } - if (endpointDefinition.getRetryDurationOnTimeout() > 0) { + if (isPositiveNumber(endpointDefinition.getRetryDurationOnTimeout()) || endpointDefinition.isRetryDurationOnTimeoutDynamic()) { OMElement retryDelay = fac.createOMElement( org.apache.synapse.config.xml.XMLConfigConstants.RETRY_DELAY, SynapseConstants.SYNAPSE_OMNAMESPACE); - retryDelay.setText(Long.toString(endpointDefinition.getRetryDurationOnTimeout())); + if (endpointDefinition.isRetryDurationOnTimeoutDynamic()) { + retryDelay.setText('{' + endpointDefinition.getRetryDurationOnTimeout() + '}'); + } else { + retryDelay.setText(endpointDefinition.getRetryDurationOnTimeout()); + } markAsTimedout.addChild(retryDelay); } @@ -231,4 +264,16 @@ public void serializeEndpointDefinition(EndpointDefinition endpointDefinition, element.addChild(retryConfig); } } + + private boolean isPositiveNumber(String input) { + if (input == null || input.isEmpty()) { + return false; + } + try { + float number = Float.parseFloat(input); + return number > 0; + } catch (NumberFormatException e) { + return false; + } + } } diff --git a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java index 03abfd2a4e..29d7000b04 100644 --- a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java +++ b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2FlexibleMEPClient.java @@ -579,27 +579,25 @@ public static void send( if (endpoint != null) { // set the timeout time and the timeout action to the callback, so that the // TimeoutHandler can detect timed out callbacks and take appropriate action. + long endpointTimeout; if (!endpoint.isDynamicTimeoutEndpoint()) { - long endpointTimeout = endpoint.getEffectiveTimeout(); - callback.setTimeout(endpointTimeout); - callback.setTimeOutAction(endpoint.getTimeoutAction()); - callback.setTimeoutType(endpoint.getEndpointTimeoutType()); + endpointTimeout = endpoint.getEffectiveTimeout(); if (log.isDebugEnabled()) { log.debug("Setting Timeout for endpoint : " + - getEndpointLogMessage(synapseOutMessageContext, axisOutMsgCtx) + - " to static timeout value : " + endpointTimeout); + getEndpointLogMessage(synapseOutMessageContext, axisOutMsgCtx) + + " to static timeout value : " + endpointTimeout); } } else { - long endpointTimeout = endpoint.evaluateDynamicEndpointTimeout(synapseOutMessageContext); - callback.setTimeout(endpointTimeout); - callback.setTimeOutAction(endpoint.getTimeoutAction()); - callback.setTimeoutType(endpoint.getEndpointTimeoutType()); + endpointTimeout = endpoint.evaluateDynamicEndpointTimeout(synapseOutMessageContext); if (log.isDebugEnabled()) { log.debug("Setting Timeout for endpoint : " + - getEndpointLogMessage(synapseOutMessageContext, axisOutMsgCtx) + - " to dynamic timeout value : " + endpointTimeout); + getEndpointLogMessage(synapseOutMessageContext, axisOutMsgCtx) + + " to dynamic timeout value : " + endpointTimeout); } } + callback.setTimeout(endpointTimeout); + callback.setTimeOutAction(endpoint.getResolvedTimeoutAction(synapseOutMessageContext)); + callback.setTimeoutType(endpoint.getEndpointTimeoutType()); } else { long globalTimeout = synapseOutMessageContext.getEnvironment().getGlobalTimeout(); callback.setTimeout(globalTimeout); diff --git a/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java b/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java index bb9579db7b..1e4574fa29 100644 --- a/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java +++ b/modules/core/src/main/java/org/apache/synapse/core/axis2/SynapseCallbackReceiver.java @@ -596,14 +596,14 @@ private void handleMessage(String messageID ,MessageContext response, } return; } else { - successfulEndpoint.onSuccess(); + successfulEndpoint.onSuccess(synapseOutMsgCtx); if(failOver) { popFailOverEPFromFaultStack(synapseOutMsgCtx); } } } else if(successfulEndpoint != null) { - successfulEndpoint.onSuccess(); + successfulEndpoint.onSuccess(synapseOutMsgCtx); if(failOver) { popFailOverEPFromFaultStack(synapseOutMsgCtx); } diff --git a/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java b/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java index 2ba5b72fc7..d2491a2d5d 100755 --- a/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java +++ b/modules/core/src/main/java/org/apache/synapse/endpoints/AbstractEndpoint.java @@ -468,7 +468,7 @@ protected boolean isTimeout(MessageContext synCtx) { } } if (errorCode != null) { - if (definition.getTimeoutErrorCodes().isEmpty()) { + if (definition.getResolvedTimeoutErrorCodes(synCtx).isEmpty()) { // if timeout codes are not defined, assume only HTTP timeout and connection close boolean isTimeout = SynapseConstants.NHTTP_CONNECTION_TIMEOUT == errorCode; boolean isClosed = SynapseConstants.NHTTP_CONNECTION_CLOSED == errorCode; @@ -482,11 +482,11 @@ protected boolean isTimeout(MessageContext synCtx) { return true; } } else { - if (definition.getTimeoutErrorCodes().contains(errorCode)) { + if (definition.getResolvedTimeoutErrorCodes(synCtx).contains(errorCode)) { if (log.isDebugEnabled()) { log.debug("Encountered a mark for suspension error : " + errorCode + " defined " + "error codes are : " - + definition.getTimeoutErrorCodes()); + + definition.getResolvedTimeoutErrorCodes(synCtx)); } return true; } @@ -542,7 +542,7 @@ protected boolean isRetry(MessageContext synCtx) { protected boolean isSuspendFault(MessageContext synCtx) { Integer errorCode = (Integer) synCtx.getProperty(SynapseConstants.ERROR_CODE); if (errorCode != null) { - if (definition.getSuspendErrorCodes().isEmpty()) { + if (definition.getResolvedSuspendErrorCodes(synCtx).isEmpty()) { // if suspend codes are not defined, any error will be fatal for the endpoint if (log.isDebugEnabled()) { log.debug(this.toString() + " encountered a fatal error : " + errorCode); @@ -550,10 +550,10 @@ protected boolean isSuspendFault(MessageContext synCtx) { return true; } else { - if (definition.getSuspendErrorCodes().contains(errorCode)) { + if (definition.getResolvedSuspendErrorCodes(synCtx).contains(errorCode)) { if (log.isDebugEnabled()) { log.debug("Encountered a suspend error : " + errorCode + - " defined suspend codes are : " + definition.getSuspendErrorCodes()); + " defined suspend codes are : " + definition.getResolvedSuspendErrorCodes(synCtx)); } return true; } @@ -575,7 +575,7 @@ protected boolean isSuspendFault(MessageContext synCtx) { */ public void onFault(MessageContext synCtx) { EndpointDefinition endpointDefinition = getDefinition(); - if (endpointDefinition != null && endpointDefinition.getTimeoutAction() == SynapseConstants.DISCARD) { + if (endpointDefinition != null && endpointDefinition.getResolvedTimeoutAction(synCtx) == SynapseConstants.DISCARD) { log.info("Ignoring fault handlers since the timeout action is set to DISCARD"); } else { logSetter(); @@ -591,6 +591,9 @@ public void onSuccess() { // do nothing } + public void onSuccess(MessageContext messageContext) { + // do nothing + } /** * Should this mediator perform tracing? True if its explicitly asked to diff --git a/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java b/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java index df16a85f48..13ecce36aa 100755 --- a/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java +++ b/modules/core/src/main/java/org/apache/synapse/endpoints/DynamicLoadbalanceEndpoint.java @@ -40,6 +40,7 @@ import org.apache.synapse.endpoints.dispatch.HttpSessionDispatcher; import org.apache.synapse.endpoints.dispatch.SALSessions; import org.apache.synapse.endpoints.dispatch.SessionInformation; +import org.apache.synapse.mediators.Value; import org.apache.synapse.transport.nhttp.NhttpConstants; import java.net.MalformedURLException; @@ -456,7 +457,7 @@ private Endpoint getEndpoint(EndpointReference to, Member member, MessageContext endpoint.setName("DLB:" + member.getHostName() + ":" + member.getPort() + ":" + UUID.randomUUID()); EndpointDefinition definition = new EndpointDefinition(); - definition.setSuspendMaximumDuration(10000); + definition.setSuspendMaximumDuration(new Value("10000")); definition.setReplicationDisabled(true); definition.setAddress(to.getAddress()); endpoint.setDefinition(definition); diff --git a/modules/core/src/main/java/org/apache/synapse/endpoints/EPConstants.java b/modules/core/src/main/java/org/apache/synapse/endpoints/EPConstants.java index 3abd5ab034..b4db4a4b7a 100644 --- a/modules/core/src/main/java/org/apache/synapse/endpoints/EPConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/endpoints/EPConstants.java @@ -25,4 +25,9 @@ public class EPConstants { public static final int SUPER_TENANT_ID = -1234; public static final String TENANT_INFO_ID = "tenant.info.id"; public static final String LOCAL_TRANSPORT_IDENTIFIER = "local://"; + + public static final String DISCARD = "discard"; + public static final String FAULT = "fault"; + public static final String NEVER = "never"; + public static final String EMPTRY_STRING = ""; } diff --git a/modules/core/src/main/java/org/apache/synapse/endpoints/Endpoint.java b/modules/core/src/main/java/org/apache/synapse/endpoints/Endpoint.java index fc5ef86c8b..141ac0ba73 100644 --- a/modules/core/src/main/java/org/apache/synapse/endpoints/Endpoint.java +++ b/modules/core/src/main/java/org/apache/synapse/endpoints/Endpoint.java @@ -71,6 +71,8 @@ public interface Endpoint extends ManagedLifecycle, SynapseArtifact, Nameable { */ public void onSuccess(); + public void onSuccess(MessageContext synCtx); + /** * Returns true to indicate that the endpoint is ready to service requests * @return true if endpoint is ready to service requests diff --git a/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java b/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java index 0d3a6d80e6..b2bcb8a1ea 100644 --- a/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java +++ b/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java @@ -25,6 +25,7 @@ import org.apache.synapse.SynapseException; import org.apache.synapse.config.SynapsePropertiesLoader; import org.apache.synapse.util.MessageHelper; +import org.apache.synapse.MessageContext; import org.apache.synapse.util.Replicator; import java.util.Calendar; @@ -194,12 +195,16 @@ private void recordStatistics(int state) { } } + private void setState(int state) { + setState(state, null); + } + /** * Update the internal state of the endpoint * * @param state the new state of the endpoint */ - private void setState(int state) { + private void setState(int state, MessageContext messageContext) { recordStatistics(state); @@ -209,7 +214,7 @@ private void setState(int state) { switch (state) { case ST_ACTIVE: { Replicator.setAndReplicateState(REMAINING_RETRIES_KEY, - definition.getRetriesOnTimeoutBeforeSuspend(), cfgCtx); + definition.getResolvedRetriesOnTimeoutBeforeSuspend(messageContext), cfgCtx); Replicator.setAndReplicateState(LAST_SUSPEND_DURATION_KEY, null, cfgCtx); Replicator.setAndReplicateState(REMAINING_RETRIES_KEY, maximumRetryLimit, cfgCtx); if (maximumRecursiveRetryLimit != -1) { @@ -221,24 +226,24 @@ private void setState(int state) { Integer retries = (Integer) cfgCtx.getPropertyNonReplicable(REMAINING_RETRIES_KEY); if (retries == null) { - retries = definition.getRetriesOnTimeoutBeforeSuspend(); + retries = definition.getResolvedRetriesOnTimeoutBeforeSuspend(messageContext); } if (retries <= 0) { - log.info("Endpoint : " + endpointName + printEndpointAddress() + + log.info("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " has been marked for SUSPENSION," + " but no further retries remain. Thus it will be SUSPENDED."); - setState(ST_SUSPENDED); + setState(ST_SUSPENDED, messageContext); } else { Replicator.setAndReplicateState( REMAINING_RETRIES_KEY, (retries - 1), cfgCtx); long nextRetry = System.currentTimeMillis() - + definition.getRetryDurationOnTimeout(); + + definition.getResolvedRetryDurationOnTimeout(messageContext); Replicator.setAndReplicateState(NEXT_RETRY_TIME_KEY, nextRetry, cfgCtx); - log.warn("Endpoint : " + endpointName + printEndpointAddress() + + log.warn("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " is marked as TIMEOUT and " + "will be retried : " + (retries - 1) + " more time/s after : " + new Date(nextRetry) + " until its marked SUSPENDED for failure"); @@ -246,14 +251,14 @@ private void setState(int state) { break; } case ST_SUSPENDED: { - computeNextRetryTimeForSuspended(); + computeNextRetryTimeForSuspended(messageContext); break; } case ST_OFF: { // mark as in maintenence, and reset all other information Replicator.setAndReplicateState(REMAINING_RETRIES_KEY, definition == null ? -1 : - definition.getRetriesOnTimeoutBeforeSuspend(), cfgCtx); + definition.getResolvedRetriesOnTimeoutBeforeSuspend(messageContext), cfgCtx); Replicator.setAndReplicateState(LAST_SUSPEND_DURATION_KEY, null, cfgCtx); Replicator.setAndReplicateState(REMAINING_RETRIES_KEY, maximumRetryLimit, cfgCtx); if (maximumRecursiveRetryLimit != -1) { @@ -275,7 +280,7 @@ private void setState(int state) { if (definition == null) return; switch (state) { case ST_ACTIVE: { - localRemainingRetries = definition.getRetriesOnTimeoutBeforeSuspend(); + localRemainingRetries = definition.getResolvedRetriesOnTimeoutBeforeSuspend(messageContext); localLastSuspendDuration = -1; maximumRemainingRetries = maximumRetryLimit; if (maximumRecursiveRetryLimit != -1) { @@ -286,21 +291,21 @@ private void setState(int state) { case ST_TIMEOUT: { int retries = localRemainingRetries; if (retries == -1) { - retries = definition.getRetriesOnTimeoutBeforeSuspend(); + retries = definition.getResolvedRetriesOnTimeoutBeforeSuspend(messageContext); } if (retries <= 0) { - log.info("Endpoint : " + endpointName + printEndpointAddress() + log.info("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " has been marked for SUSPENSION, " + "but no further retries remain. Thus it will be SUSPENDED."); - setState(ST_SUSPENDED); + setState(ST_SUSPENDED, messageContext); } else { localRemainingRetries = retries - 1; localNextRetryTime = - System.currentTimeMillis() + definition.getRetryDurationOnTimeout(); - log.warn("Endpoint : " + endpointName + printEndpointAddress() + System.currentTimeMillis() + definition.getResolvedRetryDurationOnTimeout(messageContext); + log.warn("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " is marked as TIMEOUT and " + "will be retried : " + localRemainingRetries + " more time/s " + "after : " + new Date(localNextRetryTime) @@ -309,13 +314,13 @@ private void setState(int state) { break; } case ST_SUSPENDED: { - computeNextRetryTimeForSuspended(); + computeNextRetryTimeForSuspended(messageContext); break; } case ST_OFF: { // mark as in maintenence, and reset all other information localRemainingRetries = definition == null ? - -1 : definition.getRetriesOnTimeoutBeforeSuspend(); + -1 : definition.getResolvedRetriesOnTimeoutBeforeSuspend(messageContext); localLastSuspendDuration = -1; maximumRemainingRetries = maximumRetryLimit; if (maximumRecursiveRetryLimit != -1) { @@ -332,21 +337,25 @@ private void setState(int state) { * Endpoint has processed a message successfully */ public void onSuccess() { + onSuccess(null); + } + + public void onSuccess(MessageContext messageContext) { if (isClustered) { Integer state = (Integer) cfgCtx.getPropertyNonReplicable(STATE_KEY); if ((state != null) && ((state != ST_ACTIVE) && (state != ST_OFF))) { - log.info("Endpoint : " + endpointName + printEndpointAddress() + log.info("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " currently " + getStateAsString() + " will now be marked active since it processed its last message"); - setState(ST_ACTIVE); + setState(ST_ACTIVE, messageContext); } } else { if (localState != ST_ACTIVE && localState != ST_OFF) { - log.info("Endpoint : " + endpointName + printEndpointAddress() + log.info("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " currently " + getStateAsString() + " will now be marked active since it processed its last message"); - setState(ST_ACTIVE); + setState(ST_ACTIVE, messageContext); } } } @@ -355,28 +364,37 @@ public void onSuccess() { * Endpoint failed processing a message */ public void onFault() { - log.warn("Endpoint : " + endpointName + printEndpointAddress() + + onFault(null); + } + + public void onFault(MessageContext messageContext) { + log.warn("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " will be marked SUSPENDED as it failed"); - setState(ST_SUSPENDED); + setState(ST_SUSPENDED, messageContext); } /** * Endpoint timeout processing a message */ public void onTimeout() { + onTimeout(null); + } + + public void onTimeout(MessageContext messageContext) { if (log.isDebugEnabled()) { - log.debug("Endpoint : " + endpointName + printEndpointAddress() + " will be marked for " + + log.debug("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " will be marked for " + "SUSPENSION due to the occurrence of one of the configured errors"); } - setState(ST_TIMEOUT); + setState(ST_TIMEOUT, messageContext); } /** * Compute the suspension duration according to the geometric series parameters defined */ - private void computeNextRetryTimeForSuspended() { + private void computeNextRetryTimeForSuspended(MessageContext messageContext) { boolean notYetSuspended = true; - long lastSuspendDuration = definition.getInitialSuspendDuration(); + long lastSuspendDuration = definition.getResolvedInitialSuspendDuration(messageContext); + if (isClustered) { Long lastDuration = (Long) cfgCtx.getPropertyNonReplicable(LAST_SUSPEND_DURATION_KEY); if (lastDuration != null) { @@ -389,11 +407,11 @@ private void computeNextRetryTimeForSuspended() { } long nextSuspendDuration = (notYetSuspended ? - definition.getInitialSuspendDuration() : - (long) (lastSuspendDuration * definition.getSuspendProgressionFactor())); + definition.getResolvedInitialSuspendDuration(messageContext) : + (long) (lastSuspendDuration * definition.getResolvedSuspendProgressionFactor(messageContext))); - if (nextSuspendDuration > definition.getSuspendMaximumDuration()) { - nextSuspendDuration = definition.getSuspendMaximumDuration(); + if (nextSuspendDuration > definition.getResolvedSuspendMaximumDuration(messageContext)) { + nextSuspendDuration = definition.getResolvedSuspendMaximumDuration(messageContext); } else if (nextSuspendDuration < 0) { nextSuspendDuration = SynapseConstants.DEFAULT_ENDPOINT_SUSPEND_TIME; } @@ -408,7 +426,7 @@ private void computeNextRetryTimeForSuspended() { localNextRetryTime = nextRetryTime; } - log.warn("Suspending endpoint : " + endpointName + printEndpointAddress() + + log.warn("Suspending endpoint : " + endpointName + printEndpointAddress(messageContext) + (notYetSuspended ? " -" : " - last suspend duration was : " + lastSuspendDuration + "ms and") + " current suspend duration is : " + nextSuspendDuration + "ms - " + @@ -598,11 +616,22 @@ public String toString() { } private String printEndpointAddress() { + return printEndpointAddress(null); + } + + private String printEndpointAddress(MessageContext messageContext) { if(this.definition != null && this.definition.getAddress() != null) { - return " with address " + MessageHelper.maskURLPassword(this.definition.getAddress()); - } else { - return " "; + String address = ""; + if (messageContext != null) { + address = this.definition.getAddress(messageContext); + } else { + address = this.definition.getAddress(); + } + if (address != null) { + return " with address " + MessageHelper.maskURLPassword(address); + } } + return " "; } /** diff --git a/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointDefinition.java b/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointDefinition.java index 3cfe55dcf4..1dad6b2eda 100755 --- a/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointDefinition.java +++ b/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointDefinition.java @@ -19,6 +19,7 @@ package org.apache.synapse.endpoints; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.synapse.MessageContext; @@ -28,10 +29,10 @@ import org.apache.synapse.aspects.AspectConfiguration; import org.apache.synapse.config.SynapseConfigUtils; import org.apache.synapse.config.xml.SynapsePath; +import org.apache.synapse.mediators.Value; import java.util.ArrayList; import java.util.List; -import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -131,11 +132,6 @@ public class EndpointDefinition implements AspectConfigurable { */ private String charSetEncoding; - /** - * The expression to evaluate dynamic timeout. - */ - private SynapsePath dynamicTimeout = null; - /** * Whether endpoint state replication should be disabled or not (only valid in clustered setups) */ @@ -147,7 +143,8 @@ public class EndpointDefinition implements AspectConfigurable { * not set any timeout configuration, default timeout action is set to NONE, which won't do * anything for timeouts. */ - private long timeoutDuration = 0; + private final long defaultTimeoutDuration = 0; + private Value timeoutDuration = new Value(String.valueOf(defaultTimeoutDuration)); /** * Effective timeout interval for the endpoint @@ -157,23 +154,36 @@ public class EndpointDefinition implements AspectConfigurable { /** * action to perform when a timeout occurs (NONE | DISCARD | DISCARD_AND_FAULT) * */ - private int timeoutAction = SynapseConstants.NONE; + private final int defaultTimeoutAction = SynapseConstants.NONE; + private Value timeoutAction = new Value(EPConstants.NEVER); /** The initial suspend duration when an endpoint is marked inactive */ - private long initialSuspendDuration = -1; - /** The suspend duration ratio for the next duration - this is the geometric series multipler */ - private float suspendProgressionFactor = 1; + private final long defaultInitialSuspendDuration = -1; + private Value initialSuspendDuration = new Value(String.valueOf(defaultInitialSuspendDuration)); + + /** + * The suspend duration ratio for the next duration - this is the geometric series multipler + */ + private final float defaultSuspendProgressionFactor = 1; + private Value suspendProgressionFactor = new Value(String.valueOf(defaultSuspendProgressionFactor)); + /** This is the maximum duration for which a node will be suspended */ - private long suspendMaximumDuration = Long.MAX_VALUE; + private final long defaultSuspendMaximumDuration = Long.MAX_VALUE; + private Value suspendMaximumDuration = new Value(String.valueOf(defaultSuspendMaximumDuration)); + /** A list of error codes, which directly puts an endpoint into suspend mode */ - private final List suspendErrorCodes = new ArrayList(); + private Value suspendErrorCodes = new Value(EPConstants.EMPTRY_STRING); /** No of retries to attempt on timeout, before an endpoint is makred inactive */ - private int retriesOnTimeoutBeforeSuspend = 0; + private final int defaultRetriesOnTimeOutBeforeSuspend = 0; + private Value retriesOnTimeoutBeforeSuspend = new Value(String.valueOf(defaultRetriesOnTimeOutBeforeSuspend)); + /** The delay between retries for a timeout out endpoint */ - private int retryDurationOnTimeout = 0; + private final int defaultRetryDurationOnTimeout = 0; + private Value retryDurationOnTimeout = new Value(String.valueOf(defaultRetryDurationOnTimeout)); + /** A list of error codes which puts the endpoint into timeout mode */ - private final List timeoutErrorCodes = new ArrayList(); + private Value timeoutErrorCodes = new Value(EPConstants.EMPTRY_STRING); private AspectConfiguration aspectConfiguration; @@ -202,31 +212,19 @@ public EndpointDefinition() { } } - public void setDynamicTimeoutExpression(SynapsePath expression) { - this.dynamicTimeout = expression; - } - - public SynapsePath getDynamicTimeoutExpression() { - return this.dynamicTimeout; - } - public boolean isDynamicTimeoutEndpoint() { - if (this.dynamicTimeout != null) { - return true; - } else { - return false; - } + + return timeoutDuration.getExpression() != null; } public long evaluateDynamicEndpointTimeout(MessageContext synCtx) { + long timeoutMilliSeconds; try { - String stringValue = dynamicTimeout.stringValueOf(synCtx); + String stringValue = timeoutDuration.evaluateValue(synCtx); if (stringValue != null) { timeoutMilliSeconds = Long.parseLong(stringValue.trim()); } else { - log.warn("Error while evaluating dynamic endpoint timeout expression." + - "Synapse global timeout is taken as effective timeout."); timeoutMilliSeconds = effectiveTimeout; } } catch (NumberFormatException e) { @@ -267,32 +265,19 @@ public String getAddress(MessageContext messageContext) { if (dynamicUrl != null && !dynamicUrl.isEmpty()) { addressString = dynamicUrl; } - boolean matches = false; - int s = 0; - Pattern pattern = Pattern.compile("\\$\\{.*?\\}"); - - StringBuffer computedAddress = new StringBuffer(); - - Matcher matcher = pattern.matcher(addressString); - while (matcher.find()) { - - Object property = messageContext.getProperty( - addressString.substring(matcher.start() + 2, matcher.end() - 1)); - if (property != null) { - computedAddress.append(addressString.substring(s, matcher.start())); - computedAddress.append(property.toString()); - s = matcher.end(); - matches = true; - } + Pattern oldPattern = Pattern.compile("\\$\\{(.*?)\\}"); + String oldProcessedAddress = SynapseConfigUtils.replacePatternWithProperties(addressString, oldPattern, messageContext); + if (!oldProcessedAddress.equals(addressString)) { + return oldProcessedAddress; } - if (!matches) { - return addressString; - } else { - computedAddress.append(addressString.substring(s, addressString.length())); - return computedAddress.toString(); + Pattern newPattern = Pattern.compile("\\{\\+?([^}]+)\\}"); + String newProcessedAddress = SynapseConfigUtils.replacePatternWithProperties(addressString, newPattern, messageContext); + if (!newProcessedAddress.equals(addressString)) { + return newProcessedAddress; } + return addressString; } /** @@ -539,8 +524,12 @@ public void setUseSwa(boolean useSwa) { this.useSwa = useSwa; } - public long getTimeoutDuration() { - return timeoutDuration; + public String getTimeoutDuration() { + + if (timeoutDuration.getKeyValue() != null) { + return timeoutDuration.getKeyValue(); + } + return timeoutDuration.getExpression().getExpression(); } /** @@ -560,20 +549,49 @@ public long getEffectiveTimeout() { * * @param timeoutDuration a duration in milliseconds */ - public void setTimeoutDuration(long timeoutDuration) { + public void setTimeoutDuration(Value timeoutDuration) { + this.timeoutDuration = timeoutDuration; - this.effectiveTimeout = timeoutDuration; + if (timeoutDuration.getKeyValue() != null) { + this.effectiveTimeout = Long.parseLong(timeoutDuration.getKeyValue()); + } this.endpointTimeoutType = SynapseConstants.ENDPOINT_TIMEOUT_TYPE.ENDPOINT_TIMEOUT; } - public int getTimeoutAction() { - return timeoutAction; + public String getTimeoutAction() { + + if (timeoutAction.getKeyValue() != null) { + return timeoutAction.getKeyValue(); + } + return timeoutAction.getExpression().getExpression(); } - public void setTimeoutAction(int timeoutAction) { + public void setTimeoutAction(Value timeoutAction) { + this.timeoutAction = timeoutAction; } + public boolean isTimeoutActionDynamic() { + + return timeoutAction.getExpression() != null; + } + + public int getResolvedTimeoutAction(MessageContext synCtx) { + + int result = defaultTimeoutAction; + String timeoutActionStr = (synCtx != null) ? timeoutAction.evaluateValue(synCtx) : timeoutAction.getKeyValue(); + if (StringUtils.isNotEmpty(timeoutActionStr) && !timeoutActionStr.trim().equalsIgnoreCase(EPConstants.NEVER)) { + if (EPConstants.DISCARD.equalsIgnoreCase(timeoutActionStr)) { + result = SynapseConstants.DISCARD; + } else if (EPConstants.FAULT.equalsIgnoreCase(timeoutActionStr)) { + result = SynapseConstants.DISCARD_AND_FAULT; + } else { + log.warn("Invalid timeout action, action : '" + timeoutActionStr + "' is not supported."); + } + } + return result; + } + public String getFormat() { return format; } @@ -605,8 +623,12 @@ public void setCharSetEncoding(String charSetEncoding) { * * @return suspendOnFailDuration */ - public long getInitialSuspendDuration() { - return initialSuspendDuration; + public String getInitialSuspendDuration() { + + if (initialSuspendDuration.getKeyValue() != null) { + return initialSuspendDuration.getKeyValue(); + } + return initialSuspendDuration.getExpression().getExpression(); } /** @@ -614,56 +636,192 @@ public long getInitialSuspendDuration() { * * @param initialSuspendDuration a duration in milliseconds */ - public void setInitialSuspendDuration(long initialSuspendDuration) { + public void setInitialSuspendDuration(Value initialSuspendDuration) { + this.initialSuspendDuration = initialSuspendDuration; } -// public int getTraceState() { -// return traceState; -// } -// -// public void setTraceState(int traceState) { -// this.traceState = traceState; -// } + public boolean isInitialSuspendDurationDynamic() { - public float getSuspendProgressionFactor() { - return suspendProgressionFactor; + return initialSuspendDuration.getExpression() != null; } - public void setSuspendProgressionFactor(float suspendProgressionFactor) { + public long getResolvedInitialSuspendDuration(MessageContext synCtx) { + + long result = defaultInitialSuspendDuration; + String stringValue = ""; + try { + stringValue = (synCtx != null) ? initialSuspendDuration.evaluateValue(synCtx) : initialSuspendDuration.getKeyValue(); + if (StringUtils.isNotEmpty(stringValue)) { + result = Long.parseLong(stringValue.trim()); + } + } catch (NumberFormatException e) { + log.warn("Error while evaluating initial suspend duration. The resolved value '" + + stringValue + "' should be a valid number. Hence the default value '" + + defaultInitialSuspendDuration + "' is used."); + } + return result; + } + + public String getSuspendProgressionFactor() { + + if (suspendProgressionFactor.getKeyValue() != null) { + return suspendProgressionFactor.getKeyValue(); + } + return suspendProgressionFactor.getExpression().getExpression(); + } + + public void setSuspendProgressionFactor(Value suspendProgressionFactor) { + this.suspendProgressionFactor = suspendProgressionFactor; } - public long getSuspendMaximumDuration() { - return suspendMaximumDuration; + public boolean isSuspendProgressionFactorDynamic() { + + return suspendProgressionFactor.getExpression() != null; } - public void setSuspendMaximumDuration(long suspendMaximumDuration) { + public float getResolvedSuspendProgressionFactor(MessageContext messageContext) { + + float result = defaultSuspendProgressionFactor; + String stringValue = ""; + try { + stringValue = (messageContext != null) ? suspendProgressionFactor.evaluateValue(messageContext) : suspendProgressionFactor.getKeyValue(); + if (StringUtils.isNotEmpty(stringValue)) { + result = Float.parseFloat(stringValue); + } + } catch (NumberFormatException e) { + log.warn("Error while evaluating suspend duration progression factor. The resolved value '" + + stringValue + "' should be a valid float. Hence the default value '" + + defaultSuspendProgressionFactor + "' is used."); + } + return result; + } + + public String getSuspendMaximumDuration() { + + if (suspendMaximumDuration.getKeyValue() != null) { + return suspendMaximumDuration.getKeyValue(); + } + return suspendMaximumDuration.getExpression().getExpression(); + } + + public void setSuspendMaximumDuration(Value suspendMaximumDuration) { + this.suspendMaximumDuration = suspendMaximumDuration; } - public int getRetriesOnTimeoutBeforeSuspend() { - return retriesOnTimeoutBeforeSuspend; + public boolean isSuspendMaximumDurationDynamic() { + + return suspendMaximumDuration.getExpression() != null; + } + + public long getResolvedSuspendMaximumDuration(MessageContext messageContext) { + + long result = defaultSuspendMaximumDuration; + String stringValue = ""; + try { + stringValue = (messageContext != null) ? suspendMaximumDuration.evaluateValue(messageContext) : suspendMaximumDuration.getKeyValue(); + if (StringUtils.isNotEmpty(stringValue)) { + result = Long.parseLong(stringValue); + } + } catch (NumberFormatException e) { + log.warn("Error while evaluating suspend maximum duration. The resolved value '" + + stringValue + "' should be a valid number. Hence the default value '" + + defaultSuspendMaximumDuration + "' is used."); + } + return result; + } + + public String getRetriesOnTimeoutBeforeSuspend() { + + if (retriesOnTimeoutBeforeSuspend.getKeyValue() != null) { + return retriesOnTimeoutBeforeSuspend.getKeyValue(); + } + return retriesOnTimeoutBeforeSuspend.getExpression().getExpression(); } - public void setRetriesOnTimeoutBeforeSuspend(int retriesOnTimeoutBeforeSuspend) { + public void setRetriesOnTimeoutBeforeSuspend(Value retriesOnTimeoutBeforeSuspend) { + this.retriesOnTimeoutBeforeSuspend = retriesOnTimeoutBeforeSuspend; } - public int getRetryDurationOnTimeout() { - return retryDurationOnTimeout; + public boolean isRetriesOnTimeoutBeforeSuspendDynamic() { + + return retriesOnTimeoutBeforeSuspend.getExpression() != null; } - public void setRetryDurationOnTimeout(int retryDurationOnTimeout) { + public int getResolvedRetriesOnTimeoutBeforeSuspend(MessageContext messageContext) { + + int result = defaultRetriesOnTimeOutBeforeSuspend; + String stringValue = ""; + try { + stringValue = (messageContext != null) ? retriesOnTimeoutBeforeSuspend.evaluateValue(messageContext) : retriesOnTimeoutBeforeSuspend.getKeyValue(); + if (StringUtils.isNotEmpty(stringValue)) { + result = Integer.parseInt(stringValue); + } + } catch (NumberFormatException e) { + log.warn("Error while evaluating retries before suspend [for timeouts]. " + + "The resolved value '" + stringValue + "' should be a valid number. Hence the default value '" + + defaultRetriesOnTimeOutBeforeSuspend + "' is used."); + } + return result; + } + + public String getRetryDurationOnTimeout() { + + if (retryDurationOnTimeout.getKeyValue() != null) { + return retryDurationOnTimeout.getKeyValue(); + } + return retryDurationOnTimeout.getExpression().getExpression(); + } + + public void setRetryDurationOnTimeout(Value retryDurationOnTimeout) { + this.retryDurationOnTimeout = retryDurationOnTimeout; } - public List getSuspendErrorCodes() { - return suspendErrorCodes; + public boolean isRetryDurationOnTimeoutDynamic() { + + return retryDurationOnTimeout.getExpression() != null; + } + + public int getResolvedRetryDurationOnTimeout(MessageContext messageContext) { + + int result = defaultRetryDurationOnTimeout; + String stringValue = ""; + try { + stringValue = (messageContext != null) ? retryDurationOnTimeout.evaluateValue(messageContext) : retryDurationOnTimeout.getKeyValue(); + if (StringUtils.isNotEmpty(stringValue)) { + result = Integer.parseInt(stringValue); + } + } catch (NumberFormatException e) { + log.warn("Error while evaluating retry delay for timeouts. The resolved value '" + + stringValue + "' should be a valid number. Hence the default value '" + + defaultRetryDurationOnTimeout + "' is used."); + } + return result; + } + + public String getSuspendErrorCodes() { + + if (suspendErrorCodes.getKeyValue() != null) { + return suspendErrorCodes.getKeyValue(); + } + return suspendErrorCodes.getExpression().getExpression(); } - public List getTimeoutErrorCodes() { - return timeoutErrorCodes; + public boolean isSuspendErrorCodesDynamic() { + + return suspendErrorCodes.getExpression() != null; + } + + public String getTimeoutErrorCodes() { + + if (timeoutErrorCodes.getKeyValue() != null) { + return timeoutErrorCodes.getKeyValue(); + } + return timeoutErrorCodes.getExpression().getExpression(); } public List getRetryDisabledErrorCodes() { @@ -682,12 +840,73 @@ public void setReplicationDisabled(boolean replicationDisabled) { this.replicationDisabled = replicationDisabled; } + public void setSuspendErrorCodes(Value suspendErrorCodes) { + + this.suspendErrorCodes = suspendErrorCodes; + } + public void addSuspendErrorCode(int code) { - suspendErrorCodes.add(code); + + if (suspendErrorCodes.getKeyValue() != null) { + suspendErrorCodes = new Value(suspendErrorCodes.getKeyValue() + "," + code); + } + } + + public List getResolvedSuspendErrorCodes(MessageContext messageContext) { + + List result = new ArrayList<>(); + String stringValue = ""; + try { + stringValue = (messageContext != null) ? suspendErrorCodes.evaluateValue(messageContext) : suspendErrorCodes.getKeyValue(); + if (StringUtils.isNotEmpty(stringValue)) { + String[] errorCodes = stringValue.split(","); + result = new ArrayList(); + for (String errorCode : errorCodes) { + result.add(Integer.parseInt(errorCode.trim())); + } + } + } catch (NumberFormatException e) { + log.warn("Error while evaluating suspend error codes. The resolved value '" + + stringValue + "' should be valid numbers separated by commas."); + } + return result; + } + + public void setTimeoutErrorCodes(Value timeoutErrorCodes) { + + this.timeoutErrorCodes = timeoutErrorCodes; } public void addTimeoutErrorCode(int code) { - timeoutErrorCodes.add(code); + + if (timeoutErrorCodes.getKeyValue() != null) { + timeoutErrorCodes = new Value(timeoutErrorCodes.getKeyValue() + "," + code); + } + } + + public boolean isTimeoutErrorCodesDynamic() { + + return timeoutErrorCodes.getExpression() != null; + } + + public List getResolvedTimeoutErrorCodes(MessageContext messageContext) { + + List result = new ArrayList<>(); + String stringValue = ""; + try { + stringValue = (messageContext != null) ? timeoutErrorCodes.evaluateValue(messageContext) : timeoutErrorCodes.getKeyValue(); + if (StringUtils.isNotEmpty(stringValue)) { + String[] errorCodes = stringValue.split(","); + result = new ArrayList(); + for (String errorCode : errorCodes) { + result.add(Integer.parseInt(errorCode.trim())); + } + } + } catch (NumberFormatException e) { + log.warn("Error while evaluating timeout error codes. The resolved value '" + + stringValue + "' should be valid numbers separated by commas."); + } + return result; } public void addRetryDisabledErrorCode(int code) { diff --git a/modules/core/src/main/java/org/apache/synapse/endpoints/HTTPEndpoint.java b/modules/core/src/main/java/org/apache/synapse/endpoints/HTTPEndpoint.java index ab5387f98b..ff3e1d8b5c 100644 --- a/modules/core/src/main/java/org/apache/synapse/endpoints/HTTPEndpoint.java +++ b/modules/core/src/main/java/org/apache/synapse/endpoints/HTTPEndpoint.java @@ -70,9 +70,9 @@ public void onFault(MessageContext synCtx) { } else { // is this really a fault or a timeout/connection close etc? if (isTimeout(synCtx)) { - getContext().onTimeout(); + getContext().onTimeout(synCtx); } else if (isSuspendFault(synCtx)) { - getContext().onFault(); + getContext().onFault(synCtx); } } @@ -81,9 +81,9 @@ public void onFault(MessageContext synCtx) { super.onFault(synCtx); } - public void onSuccess() { + public void onSuccess(MessageContext synCtx) { if (getContext() != null) { - getContext().onSuccess(); + getContext().onSuccess(synCtx); } } diff --git a/modules/core/src/main/java/org/apache/synapse/message/senders/blocking/BlockingMsgSender.java b/modules/core/src/main/java/org/apache/synapse/message/senders/blocking/BlockingMsgSender.java index 82e5df42d0..a098a3708d 100644 --- a/modules/core/src/main/java/org/apache/synapse/message/senders/blocking/BlockingMsgSender.java +++ b/modules/core/src/main/java/org/apache/synapse/message/senders/blocking/BlockingMsgSender.java @@ -466,7 +466,7 @@ public void send(EndpointDefinition endpointDefinition, MessageContext synapseIn if (faultStack.peek() instanceof AbstractEndpoint) { successfulEndpoint = (AbstractEndpoint) faultStack.pop(); - successfulEndpoint.onSuccess(); + successfulEndpoint.onSuccess(synapseInMsgCtx); } if (successfulEndpoint instanceof OAuthConfiguredHTTPEndpoint) { diff --git a/modules/core/src/test/java/org/apache/synapse/config/xml/endpoints/HTTPEndpointSerializerTest.java b/modules/core/src/test/java/org/apache/synapse/config/xml/endpoints/HTTPEndpointSerializerTest.java index 5747924368..08fbc0c266 100644 --- a/modules/core/src/test/java/org/apache/synapse/config/xml/endpoints/HTTPEndpointSerializerTest.java +++ b/modules/core/src/test/java/org/apache/synapse/config/xml/endpoints/HTTPEndpointSerializerTest.java @@ -33,4 +33,78 @@ public void test() throws Exception { OMElement serializedResponse = HTTPEndpointSerializer.getElementFromEndpoint(endpoint); assertTrue("Endpoint not serialized!", compare(serializedResponse, inputElement)); } + + public void testCreateEndpointDefinitionWithErrorHandling() throws Exception { + + String inputXml = "" + + "" + + "" + + "1000" + + "discard" + + "" + + "" + + "403" + + "1" + + "" + + "" + + ""; + OMElement inputElement = AXIOMUtil.stringToOM(inputXml); + HTTPEndpoint endpoint = (HTTPEndpoint) HTTPEndpointFactory.getEndpointFromElement(inputElement, true, null); + OMElement serializedResponse = HTTPEndpointSerializer.getElementFromEndpoint(endpoint); + assertTrue("Endpoint definition with error handling not serialized!", compare(serializedResponse, inputElement)); + } + + public void testCreateEndpointDefinitionWithCompleteErrorHandling() throws Exception { + + String inputXml = "" + + "" + + "" + + "200" + + "discard" + + "" + + "" + + "403" + + "10" + + "10" + + "2" + + "" + + "" + + "304"+ + "10" + + "5" + + "" + + "" + + ""; + OMElement inputElement = AXIOMUtil.stringToOM(inputXml); + HTTPEndpoint endpoint = (HTTPEndpoint) HTTPEndpointFactory.getEndpointFromElement(inputElement, true, null); + OMElement serializedResponse = HTTPEndpointSerializer.getElementFromEndpoint(endpoint); + assertTrue("Endpoint definition with complete error handling not serialized!", compare(serializedResponse, inputElement)); + } + + public void testCreateEndpointDefinitionWithDynamicErrorHandling() throws Exception { + + String inputXml = "" + + "" + + "" + + "{get-property('timeoutDuration')}" + + "{get-property('timeoutAction')}" + + "" + + "" + + "{get-property('suspendErrorCodes')}" + + "{get-property('suspendInitialDuration')}" + + "{get-property('suspendMaximumDuration')}" + + "{get-property('suspendProgressionFactor')}" + + "" + + "" + + "{get-property('retryErrorCodes')}"+ + "{get-property('retryCount')}" + + "{get-property('retryDelay')}" + + "" + + "" + + ""; + OMElement inputElement = AXIOMUtil.stringToOM(inputXml); + HTTPEndpoint endpoint = (HTTPEndpoint) HTTPEndpointFactory.getEndpointFromElement(inputElement, true, null); + OMElement serializedResponse = HTTPEndpointSerializer.getElementFromEndpoint(endpoint); + assertTrue("Endpoint definition with dynamic error handling not serialized!", compare(serializedResponse, inputElement)); + } } \ No newline at end of file diff --git a/modules/core/src/test/java/org/apache/synapse/endpoints/HttpEndpointTest.java b/modules/core/src/test/java/org/apache/synapse/endpoints/HttpEndpointTest.java index edd33d9bed..8bbf2ce205 100644 --- a/modules/core/src/test/java/org/apache/synapse/endpoints/HttpEndpointTest.java +++ b/modules/core/src/test/java/org/apache/synapse/endpoints/HttpEndpointTest.java @@ -37,11 +37,15 @@ import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.core.axis2.Axis2MessageContext; import org.apache.synapse.core.axis2.Axis2SynapseEnvironment; +import org.json.HTTP; import org.junit.Assert; import org.junit.Test; import org.mockito.Mockito; import org.powermock.api.mockito.PowerMockito; +import java.util.ArrayList; +import java.util.List; + import javax.xml.stream.XMLStreamException; /** @@ -153,6 +157,136 @@ public void testQueryParamsWithLegacyEncoding() throws AxisFault, XMLStreamExcep } + @Test + public void testSetDynamicTimeoutAction() throws XMLStreamException, AxisFault { + + HTTPEndpointFactory httpEndpointFactory = new HTTPEndpointFactory(); + OMElement omElement = AXIOMUtil.stringToOM( + "" + + "" + + "" + + "12" + + "{$ctx:timeoutAction}" + + "" + + ""); + EndpointDefinition ep = httpEndpointFactory.createEndpointDefinition(omElement); + HTTPEndpoint httpEndpoint = new HTTPEndpoint(); + httpEndpoint.setDefinition(ep); + MessageContext messageContext = createMessageContext(); + messageContext.setProperty("timeoutAction", "discard"); + Assert.assertEquals( + httpEndpoint.getDefinition().getResolvedTimeoutAction(messageContext), SynapseConstants.DISCARD); + } + + @Test + public void testSetSuspendOnFailureInitialDuration() throws XMLStreamException, AxisFault { + + HTTPEndpointFactory httpEndpointFactory = new HTTPEndpointFactory(); + OMElement omElement = AXIOMUtil.stringToOM( + "" + + "" + + "10" + + "" + + "" + + "1000" + + "1" + + "" + + "" + + "0" + + "" + + ""); + EndpointDefinition ep1 = httpEndpointFactory.createEndpointDefinition(omElement); + Assert.assertEquals(ep1.getRetriesOnTimeoutBeforeSuspend(), "0"); + Assert.assertEquals(ep1.getInitialSuspendDuration(), "1000"); + } + + @Test + public void testDefaultErrorHandlingValues() throws XMLStreamException, AxisFault { + + HTTPEndpointFactory httpEndpointFactory = new HTTPEndpointFactory(); + OMElement omElement = AXIOMUtil.stringToOM( + ""); + EndpointDefinition ep = httpEndpointFactory.createEndpointDefinition(omElement); + HTTPEndpoint httpEndpoint = new HTTPEndpoint(); + httpEndpoint.setDefinition(ep); + MessageContext messageContext = createMessageContext(); + Assert.assertEquals("Default timeout action should be 100", 100, + httpEndpoint.getDefinition().getResolvedTimeoutAction(messageContext)); + Assert.assertEquals("Default initial suspend duration should be -1", -1, + httpEndpoint.getDefinition().getResolvedInitialSuspendDuration(messageContext)); + Assert.assertEquals("Default maximum suspend duration should be 0", Long.MAX_VALUE, + httpEndpoint.getDefinition().getResolvedSuspendMaximumDuration(messageContext)); + Assert.assertEquals("Default suspend progression factor should be 1", 1f, + httpEndpoint.getDefinition().getResolvedSuspendProgressionFactor(messageContext), 0.0f); + Assert.assertEquals("Default suspend error codes should be empty", new ArrayList<>(), + httpEndpoint.getDefinition().getResolvedSuspendErrorCodes(messageContext)); + Assert.assertEquals("Default timeout error codes should be empty", new ArrayList<>(), + httpEndpoint.getDefinition().getResolvedTimeoutErrorCodes(messageContext)); + Assert.assertEquals("Default retries on timeout before suspend should be 0", 0, + httpEndpoint.getDefinition().getResolvedRetriesOnTimeoutBeforeSuspend(messageContext)); + Assert.assertEquals("Default retry duration on timeout should be 0", 0, + httpEndpoint.getDefinition().getResolvedRetryDurationOnTimeout(messageContext)); + } + + @Test + public void testSetSuspendErrorCodes() throws XMLStreamException, AxisFault { + + HTTPEndpointFactory httpEndpointFactory = new HTTPEndpointFactory(); + OMElement omElement = AXIOMUtil.stringToOM( + "\n" + + " \n" + + " {$ctx:suspendErrorCodes}\n" + + " -1\n" + + " 1\n" + + " \n" + + " \n" + + " 0\n" + + " \n" + + " "); + EndpointDefinition ep = httpEndpointFactory.createEndpointDefinition(omElement); + HTTPEndpoint httpEndpoint = new HTTPEndpoint(); + httpEndpoint.setDefinition(ep); + MessageContext messageContext = createMessageContext(); + messageContext.setProperty("suspendErrorCodes", "101503,101504"); + + List actualSuspendErrorCodes = new ArrayList<>(); + actualSuspendErrorCodes.add(101503); + actualSuspendErrorCodes.add(101504); + List expectedSuspendErrorCodes = + httpEndpoint.getDefinition().getResolvedSuspendErrorCodes(messageContext); + Assert.assertTrue(expectedSuspendErrorCodes.equals(actualSuspendErrorCodes)); + } + + @Test + public void testSetTimeoutErrorCodes() throws XMLStreamException, AxisFault { + + HTTPEndpointFactory httpEndpointFactory = new HTTPEndpointFactory(); + OMElement omElement = AXIOMUtil.stringToOM( + "\n" + + " \n" + + " {$ctx:timeoutErrorCodes}\n" + + " 10\n" + + " \n" + + " "); + EndpointDefinition ep = httpEndpointFactory.createEndpointDefinition(omElement); + HTTPEndpoint httpEndpoint = new HTTPEndpoint(); + httpEndpoint.setDefinition(ep); + MessageContext messageContext = createMessageContext(); + messageContext.setProperty("timeoutErrorCodes", "101503,101504"); + + List actualTimeoutErrorCodes = new ArrayList<>(); + actualTimeoutErrorCodes.add(101503); + actualTimeoutErrorCodes.add(101504); + List expectedTimeoutErrorCodes = + httpEndpoint.getDefinition().getResolvedTimeoutErrorCodes(messageContext); + Assert.assertTrue(expectedTimeoutErrorCodes.equals(actualTimeoutErrorCodes)); + } + /** * Create a mock SynapseEnvironment object * diff --git a/modules/core/src/test/java/org/apache/synapse/endpoints/dynamic/DynamicEndpointTest.java b/modules/core/src/test/java/org/apache/synapse/endpoints/dynamic/DynamicEndpointTest.java index cbd641290c..0010f054a5 100644 --- a/modules/core/src/test/java/org/apache/synapse/endpoints/dynamic/DynamicEndpointTest.java +++ b/modules/core/src/test/java/org/apache/synapse/endpoints/dynamic/DynamicEndpointTest.java @@ -17,25 +17,269 @@ package org.apache.synapse.endpoints.dynamic; import junit.framework.TestCase; +import org.apache.axiom.om.OMElement; +import org.apache.axiom.om.util.AXIOMUtil; import org.apache.synapse.MessageContext; +import org.apache.synapse.SynapseConstants; import org.apache.synapse.TestMessageContext; +import org.apache.synapse.config.xml.ValueFactory; import org.apache.synapse.endpoints.AbstractEndpoint; import org.apache.synapse.endpoints.AddressEndpoint; import org.apache.synapse.endpoints.EndpointDefinition; -import org.apache.synapse.util.xpath.SynapseXPath; + +import java.util.ArrayList; +import java.util.List; public class DynamicEndpointTest extends TestCase { public void testContextProperties() throws Exception { - SynapseXPath xpath = new SynapseXPath("$ctx:timeout"); + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:timeout}"); AbstractEndpoint endpoint = new AddressEndpoint(); EndpointDefinition definition = new EndpointDefinition(); endpoint.setDefinition(definition); - definition.setDynamicTimeoutExpression(xpath); + definition.setTimeoutDuration(new ValueFactory().createTextValue(omElement)); MessageContext synCtx = new TestMessageContext(); synCtx.setProperty("timeout", "90000"); - assertEquals(Long.valueOf((String) xpath.evaluate(synCtx)).longValue(), + assertEquals(90000, endpoint.getDefinition().evaluateDynamicEndpointTimeout(synCtx)); } + public void testContextPropertiesForInitialSuspendDuration() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:suspendInitialDuration}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setInitialSuspendDuration(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("suspendInitialDuration", "90000"); + assertEquals(endpoint.getDefinition().getResolvedInitialSuspendDuration(synCtx), 90000); + } + + public void testContextPropertiesForNoInitialSuspendDuration() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:suspendInitialDuration}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setInitialSuspendDuration(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + assertEquals(-1, endpoint.getDefinition().getResolvedInitialSuspendDuration(synCtx)); + } + + public void testContextPropertiesForSuspendMaximumDuration() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:suspendMaximumDuration}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setSuspendMaximumDuration(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("suspendMaximumDuration", "90000"); + assertEquals(endpoint.getDefinition().getResolvedSuspendMaximumDuration(synCtx), 90000); + } + + public void testContextPropertiesForNoSuspendMaximumDuration() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:suspendMaximumDuration}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setSuspendMaximumDuration(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + assertEquals(Long.MAX_VALUE, endpoint.getDefinition().getResolvedSuspendMaximumDuration(synCtx)); + } + + public void testContextPropertiesForSuspendProgressionFactor() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:suspendProgressionFactor}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setSuspendProgressionFactor(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("suspendProgressionFactor", "2"); + assertEquals(endpoint.getDefinition().getResolvedSuspendProgressionFactor(synCtx), 2.0f); + } + + public void testContextPropertiesForNoSuspendProgressionFactor() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:suspendProgressionFactor}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setSuspendProgressionFactor(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + assertEquals(1.0f, endpoint.getDefinition().getResolvedSuspendProgressionFactor(synCtx)); + } + + public void testContextPropertiesForRetriesOnTimeoutBeforeSuspend() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:retriesOnTimeoutBeforeSuspend}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setRetriesOnTimeoutBeforeSuspend(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("retriesOnTimeoutBeforeSuspend", "3"); + assertEquals(endpoint.getDefinition().getResolvedRetriesOnTimeoutBeforeSuspend(synCtx), 3); + } + + public void testContextPropertiesForNoRetriesOnTimeoutBeforeSuspend() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:retriesOnTimeoutBeforeSuspend}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setRetriesOnTimeoutBeforeSuspend(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + assertEquals(0, endpoint.getDefinition().getResolvedRetriesOnTimeoutBeforeSuspend(synCtx)); + } + + public void testContextPropertiesForRetryDurationOnTimeout() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:retryDurationOnTimeout}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setRetryDurationOnTimeout(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("retryDurationOnTimeout", "90000"); + assertEquals(endpoint.getDefinition().getResolvedRetryDurationOnTimeout(synCtx), 90000); + } + + public void testContextPropertiesForNoRetryDurationOnTimeout() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:retryDurationOnTimeout}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setRetryDurationOnTimeout(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + assertEquals(0, endpoint.getDefinition().getResolvedRetryDurationOnTimeout(synCtx)); + } + + public void testContextPropertiesForTimeoutActionFault() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:timeoutAction}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setTimeoutAction(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("timeoutAction", "fault"); + assertEquals(endpoint.getDefinition().getResolvedTimeoutAction(synCtx), SynapseConstants.DISCARD_AND_FAULT); + } + + public void testContextPropertiesForTimeoutActionDiscard() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{get-property('timeoutAction')}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setTimeoutAction(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("timeoutAction", "discard"); + assertEquals(endpoint.getDefinition().getResolvedTimeoutAction(synCtx), SynapseConstants.DISCARD); + } + + public void testContextPropertiesForNoTimeoutAction() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:timeoutAction}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setTimeoutAction(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + assertEquals(endpoint.getDefinition().getResolvedTimeoutAction(synCtx), SynapseConstants.NONE); + } + + public void testContextPropertiesForSuspendErrorCodes() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:suspendErrorCodes}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setSuspendErrorCodes(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("suspendErrorCodes", "101503,101504"); + + List actualSuspendErrorCodes = new ArrayList<>(); + actualSuspendErrorCodes.add(101503); + actualSuspendErrorCodes.add(101504); + List expectedSuspendErrorCodes = endpoint.getDefinition().getResolvedSuspendErrorCodes(synCtx); + assertTrue(expectedSuspendErrorCodes.equals(actualSuspendErrorCodes)); + } + + public void testContextPropertiesForEmptySuspendErrorCodes() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:suspendErrorCodes}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setSuspendErrorCodes(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("suspendErrorCodes", ""); + + List actualSuspendErrorCodes = new ArrayList<>(); + List expectedSuspendErrorCodes = endpoint.getDefinition().getResolvedSuspendErrorCodes(synCtx); + assertTrue(expectedSuspendErrorCodes.equals(actualSuspendErrorCodes)); + } + + public void testContextPropertiesForNoSuspendErrorCodes() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:suspendErrorCodes}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setSuspendErrorCodes(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + List expectedSuspendErrorCodes = endpoint.getDefinition().getResolvedSuspendErrorCodes(synCtx); + assertTrue(expectedSuspendErrorCodes.isEmpty()); + } + + public void testContextPropertiesForTimeoutErrorCodes() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:timeoutErrorCodes}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setTimeoutErrorCodes(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("timeoutErrorCodes", "101503,101504"); + + List actualTimeoutErrorCodes = new ArrayList<>(); + actualTimeoutErrorCodes.add(101503); + actualTimeoutErrorCodes.add(101504); + List expectedTimeoutErrorCodes = endpoint.getDefinition().getResolvedTimeoutErrorCodes(synCtx); + assertTrue(expectedTimeoutErrorCodes.equals(actualTimeoutErrorCodes)); + } + + public void testContextPropertiesForEmptyTimeoutErrorCodes() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:timeoutErrorCodes}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setTimeoutErrorCodes(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + synCtx.setProperty("timeoutErrorCodes", ""); + + List actualTimeoutErrorCodes = new ArrayList<>(); + List expectedTimeoutErrorCodes = endpoint.getDefinition().getResolvedTimeoutErrorCodes(synCtx); + assertTrue(expectedTimeoutErrorCodes.equals(actualTimeoutErrorCodes)); + } + + public void testContextPropertiesForNoTimeoutErrorCodes() throws Exception { + + OMElement omElement = AXIOMUtil.stringToOM("{$ctx:timeoutErrorCodes}"); + AbstractEndpoint endpoint = new AddressEndpoint(); + EndpointDefinition definition = new EndpointDefinition(); + endpoint.setDefinition(definition); + definition.setTimeoutErrorCodes(new ValueFactory().createTextValue(omElement)); + MessageContext synCtx = new TestMessageContext(); + List expectedTimeoutErrorCodes = endpoint.getDefinition().getResolvedTimeoutErrorCodes(synCtx); + assertTrue(expectedTimeoutErrorCodes.isEmpty()); + } }