From 2492a4e39a45df10176d3651d78cdf1758386e91 Mon Sep 17 00:00:00 2001 From: chathurangaj Date: Mon, 9 Dec 2024 23:44:43 +0530 Subject: [PATCH] update print endpoint address --- .../synapse/config/SynapseConfigUtils.java | 21 ++++++++++- .../synapse/endpoints/EndpointContext.java | 35 ++++++++++++------- .../synapse/endpoints/EndpointDefinition.java | 5 +++ 3 files changed, 48 insertions(+), 13 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/config/SynapseConfigUtils.java b/modules/core/src/main/java/org/apache/synapse/config/SynapseConfigUtils.java index f1c9aaa59e..0701b29f77 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/SynapseConfigUtils.java +++ b/modules/core/src/main/java/org/apache/synapse/config/SynapseConfigUtils.java @@ -32,6 +32,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.synapse.MessageContext; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; import org.apache.synapse.aspects.AspectConfiguration; @@ -86,7 +87,8 @@ import java.util.List; import java.util.Properties; import java.util.concurrent.ConcurrentHashMap; - +import java.util.regex.Matcher; +import java.util.regex.Pattern; @SuppressWarnings({"UnusedDeclaration"}) public class SynapseConfigUtils { @@ -895,6 +897,23 @@ public static boolean isFailSafeEnabled(String componentName) { return false; } + public static String processString(String input, Pattern pattern, MessageContext messageContext) { + Matcher matcher = pattern.matcher(input); + StringBuffer result = new StringBuffer(); + + int s = 0; + while (matcher.find()) { + String propertyName = matcher.group(1); + Object propertyValue = messageContext.getProperty(propertyName); + if (propertyValue != null) { + result.append(input.substring(s, matcher.start())); + result.append(propertyValue.toString()); + s = matcher.end(); + } + } + result.append(input.substring(s)); + return result.toString(); + } public static SynapseConfiguration getSynapseConfiguration(String tenantDomain){ return lastRegisteredSynapseConfigurationMap.get(tenantDomain); diff --git a/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java b/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java index a274d85628..b2bcb8a1ea 100644 --- a/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java +++ b/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointContext.java @@ -230,7 +230,7 @@ private void setState(int state, MessageContext messageContext) { } if (retries <= 0) { - log.info("Endpoint : " + endpointName + printEndpointAddress() + + log.info("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " has been marked for SUSPENSION," + " but no further retries remain. Thus it will be SUSPENDED."); @@ -243,7 +243,7 @@ private void setState(int state, MessageContext messageContext) { + definition.getResolvedRetryDurationOnTimeout(messageContext); Replicator.setAndReplicateState(NEXT_RETRY_TIME_KEY, nextRetry, cfgCtx); - log.warn("Endpoint : " + endpointName + printEndpointAddress() + + log.warn("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " is marked as TIMEOUT and " + "will be retried : " + (retries - 1) + " more time/s after : " + new Date(nextRetry) + " until its marked SUSPENDED for failure"); @@ -295,7 +295,7 @@ private void setState(int state, MessageContext messageContext) { } if (retries <= 0) { - log.info("Endpoint : " + endpointName + printEndpointAddress() + log.info("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " has been marked for SUSPENSION, " + "but no further retries remain. Thus it will be SUSPENDED."); @@ -305,7 +305,7 @@ private void setState(int state, MessageContext messageContext) { localRemainingRetries = retries - 1; localNextRetryTime = System.currentTimeMillis() + definition.getResolvedRetryDurationOnTimeout(messageContext); - log.warn("Endpoint : " + endpointName + printEndpointAddress() + log.warn("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " is marked as TIMEOUT and " + "will be retried : " + localRemainingRetries + " more time/s " + "after : " + new Date(localNextRetryTime) @@ -345,14 +345,14 @@ public void onSuccess(MessageContext messageContext) { Integer state = (Integer) cfgCtx.getPropertyNonReplicable(STATE_KEY); if ((state != null) && ((state != ST_ACTIVE) && (state != ST_OFF))) { - log.info("Endpoint : " + endpointName + printEndpointAddress() + log.info("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " currently " + getStateAsString() + " will now be marked active since it processed its last message"); setState(ST_ACTIVE, messageContext); } } else { if (localState != ST_ACTIVE && localState != ST_OFF) { - log.info("Endpoint : " + endpointName + printEndpointAddress() + log.info("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " currently " + getStateAsString() + " will now be marked active since it processed its last message"); setState(ST_ACTIVE, messageContext); @@ -368,7 +368,7 @@ public void onFault() { } public void onFault(MessageContext messageContext) { - log.warn("Endpoint : " + endpointName + printEndpointAddress() + + log.warn("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " will be marked SUSPENDED as it failed"); setState(ST_SUSPENDED, messageContext); } @@ -382,7 +382,7 @@ public void onTimeout() { public void onTimeout(MessageContext messageContext) { if (log.isDebugEnabled()) { - log.debug("Endpoint : " + endpointName + printEndpointAddress() + " will be marked for " + + log.debug("Endpoint : " + endpointName + printEndpointAddress(messageContext) + " will be marked for " + "SUSPENSION due to the occurrence of one of the configured errors"); } setState(ST_TIMEOUT, messageContext); @@ -426,7 +426,7 @@ private void computeNextRetryTimeForSuspended(MessageContext messageContext) { localNextRetryTime = nextRetryTime; } - log.warn("Suspending endpoint : " + endpointName + printEndpointAddress() + + log.warn("Suspending endpoint : " + endpointName + printEndpointAddress(messageContext) + (notYetSuspended ? " -" : " - last suspend duration was : " + lastSuspendDuration + "ms and") + " current suspend duration is : " + nextSuspendDuration + "ms - " + @@ -616,11 +616,22 @@ public String toString() { } private String printEndpointAddress() { + return printEndpointAddress(null); + } + + private String printEndpointAddress(MessageContext messageContext) { if(this.definition != null && this.definition.getAddress() != null) { - return " with address " + MessageHelper.maskURLPassword(this.definition.getAddress()); - } else { - return " "; + String address = ""; + if (messageContext != null) { + address = this.definition.getAddress(messageContext); + } else { + address = this.definition.getAddress(); + } + if (address != null) { + return " with address " + MessageHelper.maskURLPassword(address); + } } + return " "; } /** diff --git a/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointDefinition.java b/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointDefinition.java index bdfa3f8bd0..4e6b0f68d1 100755 --- a/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointDefinition.java +++ b/modules/core/src/main/java/org/apache/synapse/endpoints/EndpointDefinition.java @@ -287,6 +287,11 @@ public String getAddress(MessageContext messageContext) { } if (!matches) { + Pattern newPattern = Pattern.compile("\\{\\+?([^}]+)\\}"); + String newProcessedAddress = SynapseConfigUtils.processString(addressString, newPattern, messageContext); + if (StringUtils.isNotEmpty(newProcessedAddress)) { + return newProcessedAddress; + } return addressString; } else { computedAddress.append(addressString.substring(s, addressString.length()));