From c27486703e2ee22d4d8247f44916f21be1e569df Mon Sep 17 00:00:00 2001 From: chanikag Date: Mon, 17 Jul 2023 15:50:10 +0530 Subject: [PATCH 1/4] Introduce sequence observers This is to add sequence observers, which gets invoked at the start and end of a sequence execution. Observers can be configured in conf/sequence-observers.xml --- .../apache/synapse/SequenceFlowObserver.java | 40 +++++++ .../org/apache/synapse/SynapseConstants.java | 2 + .../config/SequenceFlowObserversLoader.java | 110 ++++++++++++++++++ .../synapse/core/SynapseEnvironment.java | 15 +++ .../core/axis2/Axis2SynapseEnvironment.java | 24 ++++ .../mediators/AbstractListMediator.java | 16 +++ 6 files changed, 207 insertions(+) create mode 100644 modules/core/src/main/java/org/apache/synapse/SequenceFlowObserver.java create mode 100644 modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java diff --git a/modules/core/src/main/java/org/apache/synapse/SequenceFlowObserver.java b/modules/core/src/main/java/org/apache/synapse/SequenceFlowObserver.java new file mode 100644 index 0000000000..15e17edd7d --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/SequenceFlowObserver.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse; + +public interface SequenceFlowObserver { + + /** + * Set the observer name + * + * @param name handler name + */ + void setName(String name); + + /** + * This method should implement the logic to run at the start of the flow + */ + void start(MessageContext synCtx, String seqName); + + /** + * This method should implement the logic to run at the end of the flow + */ + void complete(MessageContext synCtx, String seqName); + +} diff --git a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java index 93f02bf81e..07c061e189 100644 --- a/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java +++ b/modules/core/src/main/java/org/apache/synapse/SynapseConstants.java @@ -171,6 +171,8 @@ public static final class Axis2Param { /** The name of the synapse handlers file */ public static final String SYNAPSE_HANDLER_FILE = "synapse-handlers.xml"; + public static final String SEQUENCE_OBSERVERS_FILE = "sequence-observers.xml"; + /** the name of the property used for synapse library based class loading */ public static final String SYNAPSE_LIB_LOADER = "synapse.lib.classloader"; /** conf directory name **/ diff --git a/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java b/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java new file mode 100644 index 0000000000..85f775942a --- /dev/null +++ b/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java @@ -0,0 +1,110 @@ +/** + * Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved. + * + * WSO2 LLC. licenses this file to you under the Apache License, + * Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.synapse.config; + +import org.apache.axiom.om.OMElement; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.synapse.SequenceFlowObserver; +import org.apache.synapse.SynapseConstants; +import org.apache.synapse.SynapseException; +import org.apache.synapse.commons.util.MiscellaneousUtil; + +import javax.xml.namespace.QName; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +public class SequenceFlowObserversLoader { + + private static final QName ROOT_Q = new QName("observers"); + private static final QName OBSERVER_Q = new QName("observer"); + private static final QName CLASS_Q = new QName("class"); + private static final QName NAME_ATT = new QName("name"); + + private static Log log = LogFactory.getLog(SequenceFlowObserversLoader.class); + + public static List loadObservers() { + List handlers = new ArrayList<>(); + OMElement observersConfig = + MiscellaneousUtil.loadXMLConfig(SynapseConstants.SEQUENCE_OBSERVERS_FILE); + if (observersConfig != null) { + + if (!ROOT_Q.equals(observersConfig.getQName())) { + handleException("Invalid handler configuration file"); + } + + Iterator iterator = observersConfig.getChildrenWithName(OBSERVER_Q); + while (iterator.hasNext()) { + OMElement observerElem = (OMElement) iterator.next(); + + String name = null; + if (observerElem.getAttribute(NAME_ATT) != null) { + name = observerElem.getAttributeValue(NAME_ATT); + } else { + handleException("Name not defined in one or more handlers"); + } + + if (observerElem.getAttribute(CLASS_Q) != null) { + String className = observerElem.getAttributeValue(CLASS_Q); + if (!"".equals(className)) { + SequenceFlowObserver observer = createObserver(className); + if (observer != null) { + handlers.add(observer); + observer.setName(name); + } + } else { + handleException("Class name is null for handle name : " + name); + } + } else { + handleException("Class name not defined for handler named : " + name); + } + + } + } + return handlers; + } + + private static SequenceFlowObserver createObserver(String classFQName) { + Object obj = null; + try { + obj = Class.forName(classFQName).newInstance(); + } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { + handleException("Error creating Handler for class name : " + classFQName, e); + } + + if (obj instanceof SequenceFlowObserver) { + return (SequenceFlowObserver) obj; + } else { + handleException("Error creating Handler. The Handler should be of type " + + "org.apache.synapse.Handler"); + } + return null; + } + + private static void handleException(String msg) { + log.error(msg); + throw new SynapseException(msg); + } + + private static void handleException(String msg, Exception ex) { + log.error(msg, ex); + throw new SynapseException(msg, ex); + } +} diff --git a/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java b/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java index e5761fb54c..b2572617a8 100644 --- a/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java +++ b/modules/core/src/main/java/org/apache/synapse/core/SynapseEnvironment.java @@ -21,6 +21,7 @@ import org.apache.axiom.util.blob.OverflowBlob; import org.apache.synapse.MessageContext; +import org.apache.synapse.SequenceFlowObserver; import org.apache.synapse.ServerContextInformation; import org.apache.synapse.SynapseHandler; import org.apache.synapse.aspects.flow.statistics.store.MessageDataStore; @@ -232,6 +233,20 @@ public interface SynapseEnvironment { */ public void registerSynapseHandler(SynapseHandler handler); + /** + * Get all sequence observers + * + * @return list of sequence observers + */ + public List getSequenceObservers(); + + /** + * Register a sequence observer to the synapse environment + * + * @param observer sequence observer + */ + public void registerSequenceObservers(SequenceFlowObserver observer); + /** * Get the global timeout interval for callbacks * diff --git a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java index 00d8dd0ec9..5c63c1efa1 100644 --- a/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java +++ b/modules/core/src/main/java/org/apache/synapse/core/axis2/Axis2SynapseEnvironment.java @@ -33,6 +33,7 @@ import org.apache.synapse.ContinuationState; import org.apache.synapse.Mediator; import org.apache.synapse.MessageContext; +import org.apache.synapse.SequenceFlowObserver; import org.apache.synapse.ServerContextInformation; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; @@ -46,6 +47,7 @@ import org.apache.synapse.carbonext.TenantInfoConfigurator; import org.apache.synapse.commons.json.JsonUtil; import org.apache.synapse.commons.util.ext.TenantInfoInitiator; +import org.apache.synapse.config.SequenceFlowObserversLoader; import org.apache.synapse.config.SynapseConfigUtils; import org.apache.synapse.config.SynapseConfiguration; import org.apache.synapse.config.SynapseHandlersLoader; @@ -100,6 +102,7 @@ public class Axis2SynapseEnvironment implements SynapseEnvironment { private SynapseTaskManager taskManager; private RESTRequestHandler restHandler; private List synapseHandlers; + private List sequenceObservers; private long globalTimeout = SynapseConstants.DEFAULT_GLOBAL_TIMEOUT; private SynapseDebugManager synapseDebugManager; @@ -207,6 +210,7 @@ public Axis2SynapseEnvironment(SynapseConfiguration synCfg) { restHandler = new RESTRequestHandler(); synapseHandlers = SynapseHandlersLoader.loadHandlers(); + sequenceObservers = SequenceFlowObserversLoader.loadObservers(); this.globalTimeout = SynapseConfigUtils.getGlobalTimeoutInterval(); @@ -1053,6 +1057,26 @@ public void registerSynapseHandler(SynapseHandler handler) { synapseHandlers.add(handler); } + /** + * Get all sequence observers + * + * @return list of sequence observers + */ + @Override + public List getSequenceObservers() { + return sequenceObservers; + } + + /** + * Register a sequence observer to the synapse environment + * + * @param observer sequence observer + */ + @Override + public void registerSequenceObservers(SequenceFlowObserver observer) { + sequenceObservers.add(observer); + } + @Override public long getGlobalTimeout() { return globalTimeout; diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java index 9ae14b1951..6eadbbfd24 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java @@ -26,6 +26,7 @@ import org.apache.synapse.ManagedLifecycle; import org.apache.synapse.Mediator; import org.apache.synapse.MessageContext; +import org.apache.synapse.SequenceFlowObserver; import org.apache.synapse.SynapseConstants; import org.apache.synapse.SynapseException; import org.apache.synapse.SynapseLog; @@ -34,6 +35,7 @@ import org.apache.synapse.config.SynapsePropertiesLoader; import org.apache.synapse.core.SynapseEnvironment; import org.apache.synapse.core.axis2.Axis2MessageContext; +import org.apache.synapse.mediators.base.SequenceMediator; import org.apache.synapse.transport.passthru.PassThroughConstants; import org.apache.synapse.transport.passthru.util.RelayUtils; import org.apache.synapse.transport.util.MessageHandlerProvider; @@ -80,6 +82,12 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) { // to pass it on; else, do nothing -> i.e. let the parents state flow setEffectiveTraceState(synCtx); int myEffectiveTraceState = synCtx.getTracingState(); + if (this instanceof SequenceMediator) { + List observers = synCtx.getEnvironment().getSequenceObservers(); + for (SequenceFlowObserver observer : observers) { + observer.start(synCtx, ((SequenceMediator) this).getName()); + } + } try { SynapseLog synLog = getLog(synCtx); if (synLog.isTraceOrDebugEnabled()) { @@ -109,6 +117,14 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) { synCtx.setTracingState(myEffectiveTraceState); if (!mediator.mediate(synCtx)) { returnVal = false; + if (i == mediators.size() - 1) { + if (this instanceof SequenceMediator) { + List observers = synCtx.getEnvironment().getSequenceObservers(); + for (SequenceFlowObserver observer : observers) { + observer.complete(synCtx, ((SequenceMediator) this).getName()); + } + } + } break; } } From db17d33f07eb19a8ca12109a72d389fb706a9521 Mon Sep 17 00:00:00 2001 From: chanikag Date: Tue, 22 Aug 2023 15:26:09 +0530 Subject: [PATCH 2/4] Rename handler -> sequence observer --- .../config/SequenceFlowObserversLoader.java | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java b/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java index 85f775942a..e4cff53ae5 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java +++ b/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java @@ -41,13 +41,13 @@ public class SequenceFlowObserversLoader { private static Log log = LogFactory.getLog(SequenceFlowObserversLoader.class); public static List loadObservers() { - List handlers = new ArrayList<>(); + List observers = new ArrayList<>(); OMElement observersConfig = MiscellaneousUtil.loadXMLConfig(SynapseConstants.SEQUENCE_OBSERVERS_FILE); if (observersConfig != null) { if (!ROOT_Q.equals(observersConfig.getQName())) { - handleException("Invalid handler configuration file"); + handleException("Invalid sequence observer configuration file"); } Iterator iterator = observersConfig.getChildrenWithName(OBSERVER_Q); @@ -58,7 +58,7 @@ public static List loadObservers() { if (observerElem.getAttribute(NAME_ATT) != null) { name = observerElem.getAttributeValue(NAME_ATT); } else { - handleException("Name not defined in one or more handlers"); + handleException("Name not defined in one or more sequence observer"); } if (observerElem.getAttribute(CLASS_Q) != null) { @@ -66,19 +66,19 @@ public static List loadObservers() { if (!"".equals(className)) { SequenceFlowObserver observer = createObserver(className); if (observer != null) { - handlers.add(observer); + observers.add(observer); observer.setName(name); } } else { - handleException("Class name is null for handle name : " + name); + handleException("Class name is null for sequence observer name : " + name); } } else { - handleException("Class name not defined for handler named : " + name); + handleException("Class name not defined for sequence observer named : " + name); } } } - return handlers; + return observers; } private static SequenceFlowObserver createObserver(String classFQName) { @@ -86,14 +86,14 @@ private static SequenceFlowObserver createObserver(String classFQName) { try { obj = Class.forName(classFQName).newInstance(); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { - handleException("Error creating Handler for class name : " + classFQName, e); + handleException("Error creating Sequence observer for class name : " + classFQName, e); } if (obj instanceof SequenceFlowObserver) { return (SequenceFlowObserver) obj; } else { - handleException("Error creating Handler. The Handler should be of type " + - "org.apache.synapse.Handler"); + handleException("Error creating Sequence observer. The Sequence observer should be of type " + + "org.apache.synapse.SequenceFlowObserver"); } return null; } From 98d8a1409315cff51a76cc1be7a16b14c8301c99 Mon Sep 17 00:00:00 2001 From: chanikag Date: Wed, 23 Aug 2023 06:20:33 +0530 Subject: [PATCH 3/4] Fix review sugessions --- .../synapse/mediators/AbstractListMediator.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java index 6eadbbfd24..79ce14b6fb 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java @@ -117,15 +117,15 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) { synCtx.setTracingState(myEffectiveTraceState); if (!mediator.mediate(synCtx)) { returnVal = false; - if (i == mediators.size() - 1) { - if (this instanceof SequenceMediator) { - List observers = synCtx.getEnvironment().getSequenceObservers(); - for (SequenceFlowObserver observer : observers) { - observer.complete(synCtx, ((SequenceMediator) this).getName()); - } + break; + } + if (i == mediators.size() - 1) { + if (this instanceof SequenceMediator) { + List observers = synCtx.getEnvironment().getSequenceObservers(); + for (SequenceFlowObserver observer : observers) { + observer.complete(synCtx, ((SequenceMediator) this).getName()); } } - break; } } } From 518f56d5402f5d2e28f8595bea9911f734e7d1f2 Mon Sep 17 00:00:00 2001 From: chanikag Date: Wed, 23 Aug 2023 10:00:43 +0530 Subject: [PATCH 4/4] Add review sugessions --- .../synapse/config/SequenceFlowObserversLoader.java | 3 ++- .../apache/synapse/mediators/AbstractListMediator.java | 10 +++++++++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java b/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java index e4cff53ae5..94ef9cadc1 100644 --- a/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java +++ b/modules/core/src/main/java/org/apache/synapse/config/SequenceFlowObserversLoader.java @@ -19,6 +19,7 @@ package org.apache.synapse.config; import org.apache.axiom.om.OMElement; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.synapse.SequenceFlowObserver; @@ -63,7 +64,7 @@ public static List loadObservers() { if (observerElem.getAttribute(CLASS_Q) != null) { String className = observerElem.getAttributeValue(CLASS_Q); - if (!"".equals(className)) { + if (StringUtils.isNotBlank(className)) { SequenceFlowObserver observer = createObserver(className); if (observer != null) { observers.add(observer); diff --git a/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java b/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java index 79ce14b6fb..ead1f90a90 100644 --- a/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java +++ b/modules/core/src/main/java/org/apache/synapse/mediators/AbstractListMediator.java @@ -82,7 +82,7 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) { // to pass it on; else, do nothing -> i.e. let the parents state flow setEffectiveTraceState(synCtx); int myEffectiveTraceState = synCtx.getTracingState(); - if (this instanceof SequenceMediator) { + if (this instanceof SequenceMediator & mediatorPosition == 0) { List observers = synCtx.getEnvironment().getSequenceObservers(); for (SequenceFlowObserver observer : observers) { observer.start(synCtx, ((SequenceMediator) this).getName()); @@ -112,6 +112,14 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) { returnVal = false; break; } + if (i == mediators.size() - 1) { + if (this instanceof SequenceMediator) { + List observers = synCtx.getEnvironment().getSequenceObservers(); + for (SequenceFlowObserver observer : observers) { + observer.complete(synCtx, ((SequenceMediator) this).getName()); + } + } + } mediator.reportCloseStatistics(synCtx, statisticReportingIndex); } else { synCtx.setTracingState(myEffectiveTraceState);