Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce sequence observers #2086

Merged
merged 4 commits into from
Sep 6, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/**
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse;

public interface SequenceFlowObserver {

/**
* Set the observer name
*
* @param name handler name
*/
void setName(String name);

/**
* This method should implement the logic to run at the start of the flow
*/
void start(MessageContext synCtx, String seqName);

/**
* This method should implement the logic to run at the end of the flow
*/
void complete(MessageContext synCtx, String seqName);

}
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ public static final class Axis2Param {
/** The name of the synapse handlers file */
public static final String SYNAPSE_HANDLER_FILE = "synapse-handlers.xml";

public static final String SEQUENCE_OBSERVERS_FILE = "sequence-observers.xml";

/** the name of the property used for synapse library based class loading */
public static final String SYNAPSE_LIB_LOADER = "synapse.lib.classloader";
/** conf directory name **/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/**
* Copyright (c) 2023, WSO2 LLC. (http://www.wso2.org) All Rights Reserved.
*
* WSO2 LLC. licenses this file to you under the Apache License,
* Version 2.0 (the "License"); you may not use this file except
* in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.synapse.config;

import org.apache.axiom.om.OMElement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.synapse.SequenceFlowObserver;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.commons.util.MiscellaneousUtil;

import javax.xml.namespace.QName;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SequenceFlowObserversLoader {

private static final QName ROOT_Q = new QName("observers");
private static final QName OBSERVER_Q = new QName("observer");
private static final QName CLASS_Q = new QName("class");
private static final QName NAME_ATT = new QName("name");

private static Log log = LogFactory.getLog(SequenceFlowObserversLoader.class);

public static List<SequenceFlowObserver> loadObservers() {
List<SequenceFlowObserver> handlers = new ArrayList<>();
chanikag marked this conversation as resolved.
Show resolved Hide resolved
OMElement observersConfig =
MiscellaneousUtil.loadXMLConfig(SynapseConstants.SEQUENCE_OBSERVERS_FILE);
if (observersConfig != null) {

if (!ROOT_Q.equals(observersConfig.getQName())) {
handleException("Invalid handler configuration file");
chanikag marked this conversation as resolved.
Show resolved Hide resolved
}

Iterator iterator = observersConfig.getChildrenWithName(OBSERVER_Q);
while (iterator.hasNext()) {
OMElement observerElem = (OMElement) iterator.next();

String name = null;
if (observerElem.getAttribute(NAME_ATT) != null) {
name = observerElem.getAttributeValue(NAME_ATT);
} else {
handleException("Name not defined in one or more handlers");
chanikag marked this conversation as resolved.
Show resolved Hide resolved
}

if (observerElem.getAttribute(CLASS_Q) != null) {
String className = observerElem.getAttributeValue(CLASS_Q);
if (!"".equals(className)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we use isNotBlank to improve code readability

Suggested change
if (!"".equals(className)) {
if (StringUtils.isNotBlank(className)) {

SequenceFlowObserver observer = createObserver(className);
if (observer != null) {
handlers.add(observer);
chanikag marked this conversation as resolved.
Show resolved Hide resolved
observer.setName(name);
}
} else {
handleException("Class name is null for handle name : " + name);
chanikag marked this conversation as resolved.
Show resolved Hide resolved
}
} else {
handleException("Class name not defined for handler named : " + name);
chanikag marked this conversation as resolved.
Show resolved Hide resolved
}

}
}
return handlers;
}

private static SequenceFlowObserver createObserver(String classFQName) {
Object obj = null;
try {
obj = Class.forName(classFQName).newInstance();
} catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
handleException("Error creating Handler for class name : " + classFQName, e);
}

if (obj instanceof SequenceFlowObserver) {
return (SequenceFlowObserver) obj;
} else {
handleException("Error creating Handler. The Handler should be of type " +
chanikag marked this conversation as resolved.
Show resolved Hide resolved
"org.apache.synapse.Handler");
chanikag marked this conversation as resolved.
Show resolved Hide resolved
}
return null;
}

private static void handleException(String msg) {
log.error(msg);
throw new SynapseException(msg);
}

private static void handleException(String msg, Exception ex) {
log.error(msg, ex);
throw new SynapseException(msg, ex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import org.apache.axiom.util.blob.OverflowBlob;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SequenceFlowObserver;
import org.apache.synapse.ServerContextInformation;
import org.apache.synapse.SynapseHandler;
import org.apache.synapse.aspects.flow.statistics.store.MessageDataStore;
Expand Down Expand Up @@ -232,6 +233,20 @@ public interface SynapseEnvironment {
*/
public void registerSynapseHandler(SynapseHandler handler);

/**
* Get all sequence observers
*
* @return list of sequence observers
*/
public List<SequenceFlowObserver> getSequenceObservers();

/**
* Register a sequence observer to the synapse environment
*
* @param observer sequence observer
*/
public void registerSequenceObservers(SequenceFlowObserver observer);

/**
* Get the global timeout interval for callbacks
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.synapse.ContinuationState;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SequenceFlowObserver;
import org.apache.synapse.ServerContextInformation;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
Expand All @@ -46,6 +47,7 @@
import org.apache.synapse.carbonext.TenantInfoConfigurator;
import org.apache.synapse.commons.json.JsonUtil;
import org.apache.synapse.commons.util.ext.TenantInfoInitiator;
import org.apache.synapse.config.SequenceFlowObserversLoader;
import org.apache.synapse.config.SynapseConfigUtils;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.SynapseHandlersLoader;
Expand Down Expand Up @@ -100,6 +102,7 @@ public class Axis2SynapseEnvironment implements SynapseEnvironment {
private SynapseTaskManager taskManager;
private RESTRequestHandler restHandler;
private List<SynapseHandler> synapseHandlers;
private List<SequenceFlowObserver> sequenceObservers;
private long globalTimeout = SynapseConstants.DEFAULT_GLOBAL_TIMEOUT;
private SynapseDebugManager synapseDebugManager;

Expand Down Expand Up @@ -207,6 +210,7 @@ public Axis2SynapseEnvironment(SynapseConfiguration synCfg) {
restHandler = new RESTRequestHandler();

synapseHandlers = SynapseHandlersLoader.loadHandlers();
sequenceObservers = SequenceFlowObserversLoader.loadObservers();

this.globalTimeout = SynapseConfigUtils.getGlobalTimeoutInterval();

Expand Down Expand Up @@ -1053,6 +1057,26 @@ public void registerSynapseHandler(SynapseHandler handler) {
synapseHandlers.add(handler);
}

/**
* Get all sequence observers
*
* @return list of sequence observers
*/
@Override
public List<SequenceFlowObserver> getSequenceObservers() {
return sequenceObservers;
}

/**
* Register a sequence observer to the synapse environment
*
* @param observer sequence observer
*/
@Override
public void registerSequenceObservers(SequenceFlowObserver observer) {
sequenceObservers.add(observer);
}

@Override
public long getGlobalTimeout() {
return globalTimeout;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.synapse.ManagedLifecycle;
import org.apache.synapse.Mediator;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SequenceFlowObserver;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
import org.apache.synapse.SynapseLog;
Expand All @@ -34,6 +35,7 @@
import org.apache.synapse.config.SynapsePropertiesLoader;
import org.apache.synapse.core.SynapseEnvironment;
import org.apache.synapse.core.axis2.Axis2MessageContext;
import org.apache.synapse.mediators.base.SequenceMediator;
import org.apache.synapse.transport.passthru.PassThroughConstants;
import org.apache.synapse.transport.passthru.util.RelayUtils;
import org.apache.synapse.transport.util.MessageHandlerProvider;
Expand Down Expand Up @@ -80,6 +82,12 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) {
// to pass it on; else, do nothing -> i.e. let the parents state flow
setEffectiveTraceState(synCtx);
int myEffectiveTraceState = synCtx.getTracingState();
if (this instanceof SequenceMediator) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here we need to check whether mediator position is 0

List<SequenceFlowObserver> observers = synCtx.getEnvironment().getSequenceObservers();
for (SequenceFlowObserver observer : observers) {
observer.start(synCtx, ((SequenceMediator) this).getName());
}
}
try {
SynapseLog synLog = getLog(synCtx);
if (synLog.isTraceOrDebugEnabled()) {
Expand Down Expand Up @@ -109,6 +117,14 @@ public boolean mediate(MessageContext synCtx, int mediatorPosition) {
synCtx.setTracingState(myEffectiveTraceState);
if (!mediator.mediate(synCtx)) {
returnVal = false;
if (i == mediators.size() - 1) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we notifying the completeness if the last mediator returns a true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is fixed

if (this instanceof SequenceMediator) {
List<SequenceFlowObserver> observers = synCtx.getEnvironment().getSequenceObservers();
for (SequenceFlowObserver observer : observers) {
observer.complete(synCtx, ((SequenceMediator) this).getName());
}
}
}
break;
}
}
Expand Down